
How would you organize your goroutine and channel? (P1)

Khoa Pham 30 April, 2019 (3 min read)

P1 TL;DR: keep the state in the message! If you don’t know what that means yet, keep reading.


Goroutines and channels are the embraced model for concurrency in Go. Channel syntax abstracts away the explicit use of locks and helps developers avoid incorrect data flow.

This post, however, focuses on the tedious side of working with goroutines and channels: how to use them correctly, and how to organize them into composable, reusable code.

The examples ramp up in complexity.

Simplest: fire-and-forget (only applies to async operations whose result you don’t collect). Very easy, but don’t forget to set a timeout (especially on I/O calls):

client := &http.Client{
    Timeout: 3 * time.Second,
}
go func() {
    client.Post("http://example.com", "application/json", bytes.NewReader([]byte("data")))
}()

Great… Wait, we don’t need that closure, do we?

go client.Post("http://example.com", "application/json", bytes.NewReader([]byte("data")))

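Much more concise. Be careful when launching goroutines in a loop with different arguments, though. A go statement evaluates its arguments at the point of the call, so the direct form above is safe. A closure that reads the loop variable instead of receiving it as a parameter shares that variable across iterations in Go versions before 1.22, so pass per-iteration values as function arguments.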
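A minimal sketch of the loop case: each goroutine takes its value as a function parameter, so every iteration hands over its own copy. The `fanOut` helper and the string inputs are made up for illustration; in real code the body would be the `client.Post` call from above.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// fanOut is a hypothetical helper: it starts one goroutine per input and
// passes the input as an argument, so each goroutine works on its own copy.
func fanOut(inputs []string) []string {
	var (
		mu   sync.Mutex
		wg   sync.WaitGroup
		seen []string
	)
	for _, in := range inputs {
		wg.Add(1)
		// The argument `in` is evaluated here, in the calling goroutine.
		go func(s string) {
			defer wg.Done()
			mu.Lock()
			seen = append(seen, "sent:"+s)
			mu.Unlock()
		}(in)
	}
	wg.Wait()
	sort.Strings(seen) // goroutines finish in no particular order
	return seen
}

func main() {
	fmt.Println(fanOut([]string{"a", "b", "c"}))
}
```

The `sync.WaitGroup` is only there so the sketch can observe the results; a true fire-and-forget call would skip it.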

Intermediate: long-running goroutines listen on a channel and all run the same logic: process data and send back a result (you may call this a worker pool).

type (
    work struct {
        query string
    }
    result struct {
        answer string
    }
)
var (
    workc   = make(chan work, 4)
    resultc = make(chan result, 4)
    errc    = make(chan error, 4)
)
// Run result collector in background
go func() {
    for {
        // No default case: an empty default would turn this loop into a
        // busy-wait that burns CPU while both channels are empty.
        select {
        case r := <-resultc:
            processResult(r)
        case err := <-errc:
            log.Println(err)
        }
    }
}()
// Run 4 workers in background
for i := 0; i < 4; i++ {
    go func() {
        // Block until work arrives; the loop ends when workc is closed.
        for w := range workc {
            r, err := processWork(w)
            if err != nil {
                errc <- err
                continue
            }
            resultc <- r
        }
    }()
}

Great… Wait, do we really need 3 channels? An error in Go is just a value, so we can combine the error with the result and get down to 2 channels: in and out!

// Combine result and error
type result struct {
    answer string
    err    error
}
// Updated result collector
go func() {
    // With a single channel to read, a plain range is enough:
    // it blocks until a result arrives.
    for r := range resultc {
        if r.err != nil {
            log.Println(r.err)
            continue
        }
        processResult(r)
    }
}()
// Updated workers; processWork now returns a result with err set on failure
for i := 0; i < 4; i++ {
    go func() {
        for w := range workc {
            resultc <- processWork(w)
        }
    }()
}
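For reference, here is a self-contained sketch of the two-channel pool that also shuts down cleanly. The `processWork` body, the `runPool` helper, and the sample queries are placeholders invented for this example, not part of the original snippets.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
	"sync"
)

type work struct{ query string }

type result struct {
	answer string
	err    error
}

// processWork is a stand-in for real logic: it upper-cases the query
// and reports an error for an empty one.
func processWork(w work) result {
	if w.query == "" {
		return result{err: errors.New("empty query")}
	}
	return result{answer: strings.ToUpper(w.query)}
}

// runPool feeds queries to n workers and returns how many results
// succeeded and how many carried an error.
func runPool(n int, queries []string) (ok, failed int) {
	workc := make(chan work, n)
	resultc := make(chan result, n)

	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for w := range workc { // ends when workc is closed
				resultc <- processWork(w)
			}
		}()
	}

	// Producer: send all work, then signal that no more is coming.
	go func() {
		for _, q := range queries {
			workc <- work{query: q}
		}
		close(workc)
	}()

	// Close resultc once every worker has returned.
	go func() {
		wg.Wait()
		close(resultc)
	}()

	for r := range resultc {
		if r.err != nil {
			failed++
			continue
		}
		ok++
	}
	return ok, failed
}

func main() {
	ok, failed := runPool(4, []string{"a", "", "b", "c"})
	fmt.Println(ok, failed)
}
```

Closing `workc` lets the workers exit their `range` loops, and closing `resultc` after `wg.Wait()` lets the collector finish, so nothing is left blocked.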

Another intermediate: long-running goroutines listen on multiple channels. Consider a set of workers that listen on 2 or more channels to do different kinds of work. This is particularly common in many codebases, so I think it’s worth pointing out.

type (
    work1 struct {
        query string
    }
    work2 struct {
        command int
    }
    result1 struct {
        answer string
        err    error
    }
    result2 struct {
        output int
        err    error
    }
)
var (
    workc1   = make(chan work1, 4)
    workc2   = make(chan work2, 4)
    resultc1 = make(chan result1, 4)
    resultc2 = make(chan result2, 4)
)
// Result collector listens to 2 result channels
go func() {
    for {
        // select blocks until one of the channels is ready; an empty
        // default case would make the loop spin.
        select {
        case r := <-resultc1:
            if r.err != nil {
                log.Println(r.err)
                continue
            }
            processResultType1(r)
        case r := <-resultc2:
            if r.err != nil {
                log.Println(r.err)
                continue
            }
            processResultType2(r)
        }
    }
}()
// 4 workers listen to 2 working channels
for i := 0; i < 4; i++ {
    go func() {
        for {
            select {
            case w := <-workc1:
                resultc1 <- processWorkType1(w)
            case w := <-workc2:
                resultc2 <- processWorkType2(w)
            }
        }
    }()
}

Great… Wait, that’s already 4 channels for only 2 types of work, plus a lot of repetitive code! Imagine doing this with 3, 4, or even 10 types of work: we would need twice as many channels as work types.

A channel is just a medium for communication between goroutines. A good medium is stateless: it shouldn’t care what it carries. To reduce the number of channels to a fixed 2 (in and out!), regardless of how many types of work there are, we can do this:

type work struct {
    typ     int
    query   string
    command int
    err     error
}
type result struct {
    typ    int
    answer string
    output int
    err    error
}

Or, even better, treat the in and out data types as plain data and wrap them in a single message type:

type (
    work1 struct {
        query string
    }
    work2 struct {
        command int
    }
    result1 struct {
        answer string
    }
    result2 struct {
        output int
    }
    message struct {
        work1
        work2
        result1
        result2
        typ int
        err error
    }
)
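To illustrate how such a message is built and read, here is a small sketch that relies on Go's struct embedding: fields of the embedded types are promoted, so `m.query` and `m.command` are reachable directly on the message. The `describe` helper and the sample values are hypothetical, and only the work types are included to keep it short.

```go
package main

import "fmt"

type (
	work1   struct{ query string }
	work2   struct{ command int }
	message struct {
		work1
		work2
		typ int
		err error
	}
)

// describe is a hypothetical helper that reads whichever payload typ selects.
func describe(m message) string {
	switch m.typ {
	case 1:
		return "query=" + m.query // promoted from the embedded work1
	case 2:
		return fmt.Sprintf("command=%d", m.command) // promoted from work2
	default:
		return "unknown"
	}
}

func main() {
	fmt.Println(describe(message{typ: 1, work1: work1{query: "ping"}}))
	fmt.Println(describe(message{typ: 2, work2: work2{command: 7}}))
}
```

Promotion only works while the embedded types have distinct field names; two payloads sharing a field name would have to be accessed through the type name, for example `m.work1.query`.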

The only caveat is that the buffer should grow as the message carries more types. For the example above, we can double the buffer to keep the same throughput (in an ideal scenario):

var (
    workc   = make(chan message, 8)
    resultc = make(chan message, 8)
)
// Updated collector logic
go func() {
    // One result channel, so a blocking range replaces the select.
    for msg := range resultc {
        if msg.err != nil {
            log.Println(msg.err)
            continue
        }
        switch msg.typ {
        case 1:
            processResultType1(msg.result1)
        case 2:
            processResultType2(msg.result2)
        }
    }
}()
// Updated workers logic; processWorkType1 and processWorkType2
// now return a message carrying the result (or err)
for i := 0; i < 4; i++ {
    go func() {
        for msg := range workc {
            switch msg.typ {
            case 1:
                resultc <- processWorkType1(msg.work1)
            case 2:
                resultc <- processWorkType2(msg.work2)
            }
        }
    }()
}
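Putting the pieces together, the following is one possible runnable version of the message-based pool. The `handle` and `process` functions, the upper-casing and doubling "work", and the type numbers are assumptions chosen only to make the sketch executable.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

type (
	work1   struct{ query string }
	work2   struct{ command int }
	result1 struct{ answer string }
	result2 struct{ output int }
	message struct {
		work1
		work2
		result1
		result2
		typ int
		err error
	}
)

// handle is a hypothetical dispatcher: it fills in the result that
// matches the message type, or sets err for an unknown type.
func handle(m message) message {
	switch m.typ {
	case 1:
		m.answer = strings.ToUpper(m.query)
	case 2:
		m.output = m.command * 2
	default:
		m.err = fmt.Errorf("unknown message type %d", m.typ)
	}
	return m
}

// process runs msgs through n workers using one in and one out channel.
func process(n int, msgs []message) (answers []string, outputs []int, errs int) {
	workc := make(chan message, n)
	resultc := make(chan message, n)

	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for m := range workc {
				resultc <- handle(m)
			}
		}()
	}

	// Feed the work, then close both channels in order.
	go func() {
		for _, m := range msgs {
			workc <- m
		}
		close(workc)
		wg.Wait()
		close(resultc)
	}()

	for m := range resultc {
		switch {
		case m.err != nil:
			errs++
		case m.typ == 1:
			answers = append(answers, m.answer)
		case m.typ == 2:
			outputs = append(outputs, m.output)
		}
	}
	return answers, outputs, errs
}

func main() {
	msgs := []message{
		{typ: 1, work1: work1{query: "go"}},
		{typ: 2, work2: work2{command: 21}},
		{typ: 9},
	}
	fmt.Println(process(4, msgs))
}
```

Adding a third kind of work here means adding a payload type and a `case`, while the number of channels stays at two.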

Level 9000: complex routing, duplex channels, bouncing messages, and how to turn them into reusable code.

P1 of this post ends here as it’s running long already. I’m still writing up P2. P2 will cover Level 9000 above :)

Let me know in the comments below if this could become your favorite goroutine and channel structure. Happy coding!

Originally published on medium.com