Worker Pool in Golang

Worker Pool in Golang

Create multiple worker Goroutines to execute tasks concurrently

Tags: #advance #topic #golang #goroutine #channels #workerpool #threadpool

Often we end up with some work which is so time-consuming that if we're able to assign, multiple person/worker, to do that job the execution time will reduce the time which will save a lot of time for those particular tasks.

Today we're going to solve this problem by creating a worker pool also known as thread pool so that tasks are done by multiple workers concurrently. We're particularly using Golang's lightweight thread also known as Goroutine & Channels.

Prerequisites: Goroutine, Channels

Goroutine

A goroutine is a lightweight thread managed by the Go runtime unlike other languages like Python who's threads are managed by OS and also expensive to run. So goroutines are functions or methods that run concurrently with other functions or methods.

Channels

Channels are ways in which different goroutines communicate with each other. We can understand them as pipes through which you can connect with different concurrent goroutines. The communication is bidirectional by default, meaning that you can send and receive values from the same channel.

Let's define some workers so that we can solve the time issue using goroutines and channels.

Task

func task() {
    time.Sleep(time.Second) // some task to be executed
}

Job

Note: Each job takes 1 second to complete

func job(workerID, jobID int) {
    fmt.Println("worker", workerID, "started  job", jobID)
    task()
    fmt.Println("worker", workerID, "finished job", jobID)
}

Worker

func worker(workerID int, jobs <-chan int, results chan<- int) {
    for j := range jobs {
        job(workerID, j)
        results<- j * 2
    }
}

In Golang, we define a channel with the keyword chan. Anyone get confused when seeing those arrow signs with channels, let's simplified those first...

chan   // read-write
<-chan // read-only
chan<- // write-only

So we can say without any arrow in the chan keyword would mean the channel can read-write which is the default behavior, but if we want a read-only channel we put an arrow sign before the chan keyword like <-chan and if we want a write-only channel we put an arrow sign after the chan keyword like chan<- this.

So for our example above the jobs channel only reads and the results channel only writes data.

So let's continue on our worker pool example...

Our worker function will receive the jobs and send the results of the job in the results channel.

We make the job function to execute the task function to simulate an actual task running by the worker.

In the task function we put a sleep function which will wait for a second so that it behaves like expensive/time-consuming work.

Create a int channel with buffer

func makeIntChannels(buffer int) chan int {
    channel := make(chan int, buffer)
    return channel
}

Worker pool

func execUsingWorkerPool(numOfJobs, numOfWorkers int) {
    defer duration(track("time using worker pool"))

    jobs := makeIntChannels(numOfJobs)
    results := makeIntChannels(numOfJobs)

    for w := 1; w <= numOfWorkers; w++ {
        go worker(w, jobs, results)
    }

    for job := 1; job <= numOfJobs; job++ {
        jobs<- job
    }

    close(jobs) // closing the job channel to indicate that's all the work we have.

    for i := 1; i <= numOfJobs; i++ {
        <-results
    }
}

Without worker pool

func execWithoutUsingWorkerPool(numOfJobs, worker int) {
    defer duration(track("time without using worker pool"))

    for j := 1; j <= numOfJobs; j++ {
        job(worker, j)
    }
}

Calculate execution time

func track(msg string) (string, time.Time) {
    return msg, time.Now()
}

func duration(msg string, start time.Time) {
    log.Printf("%v: %v\n", msg, time.Since(start))
}

whoo!!! lots of code right... Let's go through the main function to understand what's happening

Main function

func main() {
    const numOfJobs = 5
    const numOfWorkers = 3

    execUsingWorkerPool(numOfJobs, numOfWorkers)

    execWithoutUsingWorkerPool(numOfJobs, 1)
}

In the main function, we're defining the number of jobs and workers as a const a value so that we can reuse them in the worker pool function and single worker pool function. Let's check out the execUsingWorkerPool function to understand what's happening.

defer duration(track("time using worker pool"))

In the first line, we use the defer keyword, which means that when execUsingWorkerPool function executes all other statements in the function block & the last command will be executed would be defined in the defer statement, cool right...

duration & track function here is a util function which allows us to track the execution time. track function passed as a parameter in the duration function as in the Golang, this is called higher-order function or first-class citizen which means is a function can be assigned to a variable, pass as a parameter to other function and return a function from another function.

jobs := makeIntChannels(numOfJobs)
results := makeIntChannels(numOfJobs)

Next line we define two int buffer channels as jobs & results. In order to use our pool of workers, we need to send them jobs and collect their results.

for worker := 1; worker <= numOfWorkers; worker++ {
    go worker(worker, jobs, results)
}

Next line This starts up workers, for our example, we use 3 workers, initially blocked because there are no jobs yet.

for job := 1; job <= numOfJobs; job++ {
    jobs<- job
}

close(jobs) // closing the job channel to indicate that's all the work we have.

Next, we send a total of 5 jobs and then close the jobs channel to indicate, that’s all the work we have right now.

for i := 1; i <= numOfJobs; i++ {
    <-results
}

Finally, we collect all the results of the jobs we define. This also ensures that the worker goroutines have finished all the workers.

If you like, you can read the same article on our official blog

You can read my other blog-posts Here

Output

worker 3 started  job 1
worker 1 started  job 2
worker 2 started  job 3
worker 2 finished job 3
worker 2 started  job 4
worker 3 finished job 1
worker 3 started  job 5
worker 1 finished job 2
worker 3 finished job 5
worker 2 finished job 4
2021/03/18 09:25:25 time using worker pool: 2.000943787s
worker 1 started  job 1
worker 1 finished job 1
worker 1 started  job 2
worker 1 finished job 2
worker 1 started  job 3
worker 1 finished job 3
worker 1 started  job 4
worker 1 finished job 4
worker 1 started  job 5
worker 1 finished job 5
2021/03/18 09:25:30 time without using worker pool: 5.001234313s

In Conclusion, we can say using the worker pool, execution time reduces to 2+ seconds where without worker pool, it's taking 5+ seconds. Hopefully, After this, we understand what is a worker pool and how to create one in Golang, and the benefit of using a worker pool.