
How would you organize your goroutine and channel? (P1)

Khoa Pham

30 Apr 2019

3 min read

  • Go

P1 TLDR: keep the state in the message! If you don’t know what that means yet, keep reading.


Goroutines and channels are the model Go embraces for concurrency. Channels abstract away the explicit use of locks and help developers avoid incorrect data flow.

But this post focuses on the tedious side of working with goroutines and channels: how to use them correctly, and how to organize them into composable, reusable code.

The post ladders up in complexity.

Simplest: fire-and-forget (only applies to async operations with no results to collect). Very easy, but don’t forget to set a timeout, especially on I/O calls.

client := &http.Client{
    Timeout: 3 * time.Second,
}
go func() {
    client.Post("http://example.com", "application/json", bytes.NewReader([]byte("data")))
}()

Great… Wait, we don’t need that closure, do we?

go client.Post("http://example.com", "application/json", bytes.NewReader([]byte("data")))

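Much shorter. Take care when the call sits in a loop with different arguments, though. The arguments of a go statement are evaluated when the statement runs, so the short form receives the right values. A closure that captures the loop variable directly is the risky one: before Go 1.22 every iteration shared the same variable, so the goroutine could see a later value. If you use a closure in a loop, give it function scope by passing the values in as parameters.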
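A minimal sketch of the loop case, assuming a stand-in `send` method in place of the HTTP call (the `recorder` type and `fireAll` are hypothetical names, not from the original). Passing the loop value as a parameter gives each goroutine its own copy, because the arguments of a go statement are evaluated when the statement runs.

```go
package main

import (
    "fmt"
    "sort"
    "sync"
)

// recorder is a hypothetical stand-in for the HTTP client:
// it records every payload it is asked to send.
type recorder struct {
    mu   sync.Mutex
    seen []string
}

func (r *recorder) send(payload string) {
    r.mu.Lock()
    defer r.mu.Unlock()
    r.seen = append(r.seen, payload)
}

// fireAll starts one goroutine per payload. The loop value is passed
// as a parameter, so every goroutine works on its own copy.
func fireAll(payloads []string) []string {
    r := &recorder{}
    var wg sync.WaitGroup
    for _, p := range payloads {
        wg.Add(1)
        go func(payload string) {
            defer wg.Done()
            r.send(payload)
        }(p)
    }
    // Waiting is only here so the demo can inspect the outcome;
    // a real fire-and-forget caller would not wait.
    wg.Wait()
    sort.Strings(r.seen)
    return r.seen
}

func main() {
    fmt.Println(fireAll([]string{"c", "a", "b"})) // prints "[a b c]"
}
```

The sort is only there to make the output deterministic, since the goroutines may run in any order.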

Intermediate: long-running goroutines listen to a channel and all run the same logic: process data and send back a result (you may call this a worker pool).

type (
    work struct {
        query string
    }
    result struct {
        answer string
    }
)
var (
    workc   = make(chan work, 4)
    resultc = make(chan result, 4)
    errc    = make(chan error, 4)
)
// Run result collector in background
go func() {
    for {
        // No default case: select blocks until a message arrives
        // instead of spinning in a busy loop.
        select {
        case r := <-resultc:
            processResult(r)
        case err := <-errc:
            log.Println(err)
        }
    }
}()
// Run 4 workers in background
for i := 0; i < 4; i++ {
    go func() {
        // range blocks until work arrives and exits once workc is closed
        for w := range workc {
            r, err := processWork(w)
            if err != nil {
                errc <- err
                continue
            }
            resultc <- r
        }
    }()
}

Great… Wait, do we really need 3 channels? An error in Go is just a value, so we can combine the error and the result and get down to 2 channels, in and out!

// Combine result and error
type result struct {
    answer string
    err    error
}
// Updated result collector
go func() {
    // range blocks until a result arrives; no busy loop needed
    for r := range resultc {
        if r.err != nil {
            log.Println(r.err)
            continue
        }
        processResult(r)
    }
}()
// Updated workers
for i := 0; i < 4; i++ {
    go func() {
        for w := range workc {
            // processWork now returns a result that carries its own err
            resultc <- processWork(w)
        }
    }()
}
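A self-contained sketch of the two-channel pool above, assuming a stand-in `processWork` that upper-cases the query and rejects empty ones. The `runPool` helper and the close-based shutdown are additions for the demo, not part of the original snippets.

```go
package main

import (
    "errors"
    "fmt"
    "strings"
    "sync"
)

type work struct {
    query string
}

type result struct {
    answer string
    err    error
}

// processWork is a hypothetical stand-in for the real processing step.
func processWork(w work) result {
    if w.query == "" {
        return result{err: errors.New("empty query")}
    }
    return result{answer: strings.ToUpper(w.query)}
}

// runPool feeds the queries to n workers over one "in" channel and
// collects everything from one "out" channel. It returns how many
// answers and how many errors came back.
func runPool(n int, queries []string) (answers, failures int) {
    workc := make(chan work, n)
    resultc := make(chan result, n)

    var wg sync.WaitGroup
    for i := 0; i < n; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for w := range workc { // exits when workc is closed and drained
                resultc <- processWork(w)
            }
        }()
    }

    // Producer: send all the work, then signal that no more is coming.
    go func() {
        for _, q := range queries {
            workc <- work{query: q}
        }
        close(workc)
    }()

    // Close resultc once every worker has returned, so the collector stops.
    go func() {
        wg.Wait()
        close(resultc)
    }()

    for r := range resultc {
        if r.err != nil {
            failures++
            continue
        }
        answers++
    }
    return answers, failures
}

func main() {
    answers, failures := runPool(4, []string{"a", "", "b", "c"})
    fmt.Println(answers, failures) // prints "3 1"
}
```

Closing `workc` is one possible way to let the workers finish; the long-running version in the post never closes its channels.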

Another intermediate: long-running goroutines listen on multiple channels. Consider a set of workers that listen to 2 or more channels to do different kinds of work. This is particularly common in many codebases, so I think it’s worth pointing out.

type (
    work1 struct {
        query string
    }
    work2 struct {
        command int
    }
    result1 struct {
        answer string
        err    error
    }
    result2 struct {
        output int
        err    error
    }
)
var (
    workc1   = make(chan work1, 4)
    workc2   = make(chan work2, 4)
    resultc1 = make(chan result1, 4)
    resultc2 = make(chan result2, 4)
)
// Result collector listens to 2 result channels
go func() {
    for {
        // No default case: select blocks until one of the channels is ready.
        select {
        case r := <-resultc1:
            if r.err != nil {
                log.Println(r.err)
                continue
            }
            processResultType1(r)
        case r := <-resultc2:
            if r.err != nil {
                log.Println(r.err)
                continue
            }
            processResultType2(r)
        }
    }
}()
// 4 workers listen to 2 working channels
for i := 0; i < 4; i++ {
    go func() {
        for {
            select {
            case w := <-workc1:
                resultc1 <- processWorkType1(w)
            case w := <-workc2:
                resultc2 <- processWorkType2(w)
            }
        }
    }()
}

Great… Wait, that’s 4 channels already, for only 2 types of work! It is also a lot of repetitive code. Imagine doing this with 3, 4 or even 10 types of work (we would need twice as many channels as work types!)

A channel is just a medium for communication between goroutines. A good medium is stateless: it shouldn’t care what it carries. To reduce the number of channels (down to a fixed 2, in and out!) regardless of how many types of work there are, we can do this:

type work struct {
    typ     int
    query   string
    command int
    err     error
}
type result struct {
    typ    int
    answer string
    output int
    err    error
}

Or even better, treat the in and out data types as just data!

type (
    work1 struct {
        query string
    }
    work2 struct {
        command int
    }
    result1 struct {
        answer string
    }
    result2 struct {
        output int
    }
    message struct {
        work1
        work2
        result1
        result2
        typ int
        err error
    }
)
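A small sketch of how such a message is built and read, assuming a hypothetical `describe` helper. Because the work and result types are embedded, their fields are promoted, so `msg.query` and `msg.command` can be read directly from the message.

```go
package main

import "fmt"

type (
    work1   struct{ query string }
    work2   struct{ command int }
    result1 struct{ answer string }
    result2 struct{ output int }
    message struct {
        work1
        work2
        result1
        result2
        typ int
        err error
    }
)

// describe is a hypothetical helper: it looks at typ to decide
// which of the embedded payloads is meaningful.
func describe(msg message) string {
    switch msg.typ {
    case 1:
        return "query=" + msg.query // promoted from the embedded work1
    case 2:
        return fmt.Sprintf("command=%d", msg.command) // promoted from work2
    default:
        return "unknown"
    }
}

func main() {
    m1 := message{typ: 1, work1: work1{query: "ping"}}
    m2 := message{typ: 2, work2: work2{command: 7}}
    fmt.Println(describe(m1)) // prints "query=ping"
    fmt.Println(describe(m2)) // prints "command=7"
}
```

Promotion only works while the embedded types have distinct field names; two embedded structs that both declare the same field name would make the short selector ambiguous.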

The only caveat is that we should increase the buffer if the message carries more types. For the example above, we can bump the buffer up to keep the same throughput (in the ideal scenario):

var (
    workc   = make(chan message, 8)
    resultc = make(chan message, 8)
)
// Updated collector logic
go func() {
    // range blocks until a message arrives; no busy loop needed
    for msg := range resultc {
        if msg.err != nil {
            log.Println(msg.err)
            continue
        }
        switch msg.typ {
        case 1:
            processResultType1(msg.result1)
        case 2:
            processResultType2(msg.result2)
        }
    }
}()
// Updated workers logic
for i := 0; i < 4; i++ {
    go func() {
        for msg := range workc {
            // The process functions now return a message with typ and err set
            switch msg.typ {
            case 1:
                resultc <- processWorkType1(msg.work1)
            case 2:
                resultc <- processWorkType2(msg.work2)
            }
        }
    }()
}
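Putting the pieces together, here is a runnable sketch of the typed-message pool with a fixed pair of channels. The bodies of `processWorkType1` and `processWorkType2`, the `route` helper, the unknown-type branch and the close-based shutdown are all assumptions made for the demo.

```go
package main

import (
    "fmt"
    "strings"
    "sync"
)

type (
    work1   struct{ query string }
    work2   struct{ command int }
    result1 struct{ answer string }
    result2 struct{ output int }
    message struct {
        work1
        work2
        result1
        result2
        typ int
        err error
    }
)

// Hypothetical handlers: each one returns a message tagged with its type.
func processWorkType1(w work1) message {
    return message{typ: 1, result1: result1{answer: strings.ToUpper(w.query)}}
}

func processWorkType2(w work2) message {
    return message{typ: 2, result2: result2{output: w.command * 2}}
}

// route runs n workers over one in-channel and one out-channel. It returns
// the number of type 1 answers, the sum of type 2 outputs and the error count.
func route(n int, in []message) (answers, sum, errs int) {
    workc := make(chan message, 8)
    resultc := make(chan message, 8)

    var wg sync.WaitGroup
    for i := 0; i < n; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for msg := range workc {
                switch msg.typ {
                case 1:
                    resultc <- processWorkType1(msg.work1)
                case 2:
                    resultc <- processWorkType2(msg.work2)
                default: // assumption: report unknown types as errors
                    resultc <- message{typ: msg.typ, err: fmt.Errorf("unknown work type %d", msg.typ)}
                }
            }
        }()
    }
    go func() { // producer
        for _, m := range in {
            workc <- m
        }
        close(workc)
    }()
    go func() { // stop the collector once all workers are done
        wg.Wait()
        close(resultc)
    }()

    for msg := range resultc {
        if msg.err != nil {
            errs++
            continue
        }
        switch msg.typ {
        case 1:
            answers++
        case 2:
            sum += msg.output
        }
    }
    return answers, sum, errs
}

func main() {
    in := []message{
        {typ: 1, work1: work1{query: "a"}},
        {typ: 2, work2: work2{command: 3}},
        {typ: 2, work2: work2{command: 4}},
        {typ: 9},
    }
    fmt.Println(route(4, in)) // prints "1 14 1"
}
```

Adding a third kind of work here means one more embedded type and one more case in each switch; the channel count stays at two.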

Level 9000: Complex routing, duplex channel, bouncing messages and how to make them into reusable code.

P1 of this post ends here as it’s already running long. I’m still writing P2, which will cover Level 9000 above :)

Let me know in the comments below if this could be your favorite goroutine and channel structure. Happy coding!
