Concurrency is becoming ever more important. Mark Summerfield looks at the approach of the new language Go.
The Go programming language is in some respects a radical departure from existing compiled languages. Its syntax, although C-ish, is much cleaner and simpler than C or C++’s, and it supports object-orientation through embedding (delegation) and aggregation, rather than by using inheritance. Go has a built-in garbage collector so we never have to worry about deleting/freeing memory – something that can be fiendishly complicated in a multithreaded context. In this article we will focus on another area where Go breaks new ground (at least, compared with other mainstream programming languages): concurrency.
Go has the usual concurrency primitives, such as mutexes, read–write mutexes, and wait conditions, as well as low-level primitives such as atomic adds, loads, and compare-and-swaps. But Go programmers are encouraged to avoid using any of these and instead to use Go’s high-level goroutines and channels.
A goroutine is a very lightweight thread of execution that shares the same address space as the rest of the program. The gc compiler multiplexes one or more goroutines per operating system thread and can realistically support hundreds, thousands, or more goroutines.
A channel is a two-way (or one-way, at our option) communications pipeline. Channels are type safe, and when they are used to pass immutable values (bools, ints, float64s, strings, and structs composed of immutable values), they can be used in multiple goroutines without formality. When it comes to passing pointers or references, we must, of course, ensure that our accesses are synchronized.
Incidentally, goroutines and channels are an implementation of a form of CSP (Communicating Sequential Processes), based on the ideas of computer scientist C. A. R. Hoare.
Go’s mantra for concurrency is:
Do not communicate by sharing memory; instead, share memory by communicating.
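To give a flavour of what this means in practice, here is a minimal sketch (not part of the headcheck program; the names are illustrative) in which a goroutine owns the data it works on and communicates its result over a channel, so no mutex is needed:

// Illustrative sketch only, not from headcheck.
func sum(numbers []int, result chan<- int) {
    total := 0
    for _, n := range numbers {
        total += n
    }
    result <- total // Share the result by communicating it.
}

// In the caller:
//   result := make(chan int)
//   go sum([]int{1, 2, 3, 4}, result)
//   fmt.Println(<-result) // Blocks until the goroutine sends; prints 10.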
In this article we will review a simple concurrent program called headcheck that, given a list of URLs, performs an HTTP HEAD request on each one and reports its results. We will look at a few different ways the program can implement concurrency using Go’s goroutines and channels, to give a flavour of the possibilities.
Listing 1 shows the structs the program will operate on. We made Job a struct because this is syntactically more convenient when giving it methods.
type Result struct {
    url          string
    status       int
    lastModified string
}

type Job struct {
    url string
}

Listing 1
Listing 2 shows the main() function. The built-in make() function is used to create channels (as well as values of the built-in map and slice collection types).
func main() {
    jobs := make(chan Job, nWorkers * 2)
    results := make(chan Result, bufferSize)
    done := make(chan bool, nWorkers)
    go addJobs(jobs)
    for i := 0; i < nWorkers; i++ {
        go doJobs(jobs, results, done)
    }
    go wait(results, done)
    process(results)
}

Listing 2
In Listing 2, both nWorkers and bufferSize are constants (6 and 24; not shown).
The main() function begins by creating three channels: one for passing jobs to the worker goroutines, one for receiving all the results, and another to keep track of when each worker goroutine has finished.
By default, channels are unbuffered (their size is 0), which means that a receive will block until there is a corresponding send, and a send will block until there is a corresponding receive. By buffering we allow a channel to accept as many sends as the size of the buffer before sends are blocked. Similarly, we can do as many receives as there are items in the buffer, only blocking when the buffer is empty. The purpose of buffering is to improve throughput by minimizing the time goroutines spend being blocked.
In this example we have buffered all the channels by giving make() a second, buffer-size argument. We have made the jobs channel large enough to accept (an average of) two jobs per worker goroutine, and made the results channel big enough to accept plenty of results without blocking the workers. The done channel’s buffer size is the same as the number of workers since, as we will see, each worker sends to that channel exactly once.
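The distinction between unbuffered and buffered channels can be seen in a small generic sketch (illustrative names, not headcheck code):

// Illustrative sketch only; assumes it runs inside a function with fmt imported.
unbuffered := make(chan int)     // Capacity 0: every send blocks until a receive.
buffered := make(chan int, 3)    // Capacity 3: up to three sends proceed
                                 // without a matching receive.
buffered <- 1                    // Does not block.
buffered <- 2                    // Does not block.
fmt.Println(<-buffered)          // Receives 1; one item remains in the buffer.
go func() { unbuffered <- 42 }() // Must send from another goroutine...
fmt.Println(<-unbuffered)        // ...because this receive is its only partner.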
To execute code in a separate goroutine we use the go keyword. This keyword must be followed by a function call (which could be a call on a function literal, which is also a closure). The go statement completes ‘immediately’ and the called function is executed in a newly created goroutine. When the function finishes, the Go runtime system automatically gets rid of its goroutine and reclaims the memory it used.
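For instance, a go statement can launch either a named function or a function literal; the literal is a closure that captures variables from its enclosing scope. In this generic sketch (not headcheck code) sayIt is a hypothetical function:

// Generic sketch only; assumes fmt is imported.
message := "hello from a goroutine"
go sayIt(message)        // Named (hypothetical) function called in a new goroutine.
go func() {              // Function literal (a closure): it captures the
    fmt.Println(message) // message variable from the enclosing scope.
}()                      // The trailing () calls the literal.
// The program must keep running (e.g. by waiting on a channel) long enough
// for these goroutines to execute before main() returns.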
Here, the main() function executes the addJobs() function in its own separate goroutine, so execution continues immediately to the for loop. In the for loop six separate goroutines are created, each one executing an instance of the doJobs() function. All the newly created goroutines share the same jobs channel and the same results channel. The for loop completes as soon as the goroutines have been created and started, and then another function is called, wait(), again in its own separate goroutine. And finally, we call the process() function in the current (main) goroutine.
Figure 1 schematically illustrates the relationships between the program’s goroutines and channels.
Figure 1
Once the main goroutine has finished, the program will terminate – even if there are other goroutines still executing. So, we must ensure that all the other goroutines finish their work before we leave main().
The addJobs() function is used to populate the jobs channel and is shown in Listing 3, but with the code for reading in the URLs elided.
func addJobs(jobs chan Job) {
    reader := bufio.NewReader(os.Stdin)
    for {
        ... // Read in a URL
        url = strings.TrimSpace(url)
        if len(url) > 0 {
            jobs <- Job{url}
        }
    }
    close(jobs)
}

Listing 3
Each job simply consists of a URL to check. URLs are read from os.Stdin (e.g., by using redirection on the command line). At each iteration we read both a line and an error value; if the error is io.EOF we have finished and break out of the for loop. (All of this has been elided.)
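The elided reading code is not shown in the article; one plausible shape for the body of the read loop, offered purely as an assumption rather than the author’s version, is:

// Hypothetical sketch of the elided read; assumes bufio, io and strings
// are imported. ReadString returns the line including its trailing newline.
url, err := reader.ReadString('\n')
if err == io.EOF {
    break // No more input: leave the for loop.
} else if err != nil {
    break // Minimal handling; the article does not show what is done here.
}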
Once all the jobs have been added, the jobs channel is closed to signify that there are no more jobs to be added. Sending to a channel is done using the syntax channel <- item. Items can be received from a channel that is non-empty, even if it is closed, so no jobs will be lost. When the addJobs() function has finished, the Go runtime system will take care of removing the goroutine in which it ran and reclaiming its memory.
The doJobs() function is shown in Listing 4. It is simple because it passes all its work on to a method of the Job type (not shown). The Job.Do() method sends one result of type Result to the results channel using the statement results <- result.
func doJobs(jobs chan Job, results chan Result,
    done chan bool) {
    for job := range jobs {
        job.Do(results)
    }
    done <- true
}

Listing 4
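The Job.Do() method itself is not shown in the article. A minimal sketch of what it might look like, assuming it uses the standard library’s http.Head() and the Result fields from Listing 1, is:

// Hypothetical sketch of Job.Do(); assumes net/http is imported.
func (job Job) Do(results chan Result) {
    resp, err := http.Head(job.url) // Issue the HTTP HEAD request.
    if err != nil {
        results <- Result{job.url, 0, ""} // Assumed error convention.
        return
    }
    resp.Body.Close()
    results <- Result{job.url, resp.StatusCode,
        resp.Header.Get("Last-Modified")}
}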
Go’s for ... range statement can iterate over maps (data dictionaries like C++11’s unordered_map), slices (in effect, variable-length arrays), and channels. If the channel has an item, it is received and assigned to the for loop’s variable (here, job); if the channel has no item but isn’t closed, the loop blocks. Of course, this does not hold up the rest of the program: only the goroutine in which the loop is executing is blocked. The loop terminates when the channel is empty and closed.
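These semantics can be seen in a self-contained generic sketch (not headcheck code):

// Generic sketch: range receives until the channel is empty and closed,
// blocking whenever the channel is momentarily empty. Assumes fmt is imported.
ch := make(chan int)
go func() {
    for i := 1; i <= 3; i++ {
        ch <- i
    }
    close(ch) // Without this close, the range loop below would block forever.
}()
for n := range ch {
    fmt.Println(n) // Prints 1, 2, 3, then the loop ends.
}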
Once all the jobs are done the function sends a bool to the done channel. Whether true or false is sent doesn’t matter, since the done channel is used purely to keep the program alive until all the jobs are done.
The headcheck program has one goroutine adding jobs to the jobs channel and six goroutines reading and processing jobs from the same channel, all of them working concurrently. Yet, we don’t have to worry about locking – Go handles all the synchronization for us.
Listing 5 shows the wait() function which was executed by main() in its own goroutine. This function has a regular for loop that iterates once for each worker goroutine that was created, and at each iteration it does a blocking receive using the syntax <-channel (or item := <-channel if we want the value). Notice that it doesn’t matter whether true or false was sent – we only care that something was sent – since we discard the channel’s items. Once all the workers have sent to the done channel we know that there can be no more results added to the results channel, so we close that channel.
func wait(results chan Result, done chan bool) {
    for i := 0; i < nWorkers; i++ {
        <-done
    }
    close(results)
}

Listing 5
Listing 6 shows the process() function which is executed in the main goroutine. This function iterates over the results channel and blocks if no result is available. The for loop terminates when the results channel is empty and closed, which will only happen when the wait() function finishes. This ensures that this function blocks the main goroutine until every result has been received and output.
func process(results chan Result) {
    for result := range results {
        result.Report(os.Stdout)
    }
}

Listing 6
We could replace the wait() and process() functions with a single waitAndProcess() function executed in the main goroutine, as Listing 7 illustrates.
func waitAndProcess(results <-chan Result,
    done <-chan struct{}) {
    for working := nWorkers; working > 0; {
        select { // Blocking
        case result := <-results:
            result.Report(os.Stdout)
        case <-done:
            working--
        }
    }
    for {
        select { // Non-blocking
        case result := <-results:
            result.Report(os.Stdout)
        default:
            return
        }
    }
}

Listing 7
This function begins with a for loop (Go’s equivalent of a while loop) that iterates so long as there is at least one worker still working. The select statement is structurally like a switch statement, only it works in terms of channel communications. A select with no default case is blocking. So, here, the first select blocks until it receives either a result or an empty struct.
Since we don’t care what’s sent to the done channel, only whether something’s sent, we have defined the channel to be of type chan struct{}. This channel’s value type specifies a struct with no fields; there is only one possible value of such a type, and it is written struct{}{}, which means ‘create a (zero) value of type struct{}’. Since such values have no data they are more expressive of our semantics than sending bools whose value we would then ignore.
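In code, a done channel of this kind might be created and used as follows (an illustrative fragment, not a listing from the article):

// Illustrative sketch of signalling with empty structs.
done := make(chan struct{}, nWorkers) // Buffered: one slot per worker.
// In each worker, once it has finished all its jobs:
done <- struct{}{} // Send the only possible value of type struct{}.
// In the waiting code:
<-done // Receive and discard the signal; the value carries no data.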
After each receive the select is broken out of and the loop condition is checked. This causes the main goroutine to block until all the workers have finished sending their results (because they only send to the done channel after they have finished all their jobs).
It is quite possible that after all the worker goroutines have finished there are still unprocessed results in the results channel (after all, we buffered the channel when we created it). So we execute a second for loop (an infinite loop) that uses a non-blocking select. So long as there are results in the results channel, the select will receive each one and finish, and the for loop will iterate again. But once the results channel is empty, the default case will be executed and the function returned from. At this point all the results have been output and the program will terminate.
A pipelining approach
Goroutines and channels are very flexible and versatile, to the extent that we can take some quite different approaches to concurrency. Listing 8 illustrates an alternative headcheck implementation’s main() function.
func main() {
    results := make(chan Result, bufferSize)
    go sink(processImages(processHTML(
        source(results))))
    for result := range results {
        result.Report(os.Stdout)
    }
}

Listing 8
Unlike the previous versions, this headcheck program only reports on URLs for HTML files and images, and ignores anything else. We could always add another pipeline component, say, processRemainder(), if we didn’t want to ignore any URLs.
Figure 2 schematically illustrates the relationships between the program’s goroutines and channels.
Figure 2
The function begins by creating a buffered results channel and then executes a pipeline in a separate goroutine. (And as we will see, each component of the pipeline itself executes in a separate goroutine.) Control immediately passes to the for ... range loop which blocks waiting for results and finishes when the results channel is empty and closed. (The Result type was shown in Listing 1.) The source() function shown in Listing 9 is where the pipeline begins.
func source(results chan Result) (<-chan string,
    chan Result) {
    out := make(chan string, bufferSize)
    go func() {
        reader := bufio.NewReader(os.Stdin)
        for {
            ... // Read in a URL
            out <- url
        }
        close(out)
    }()
    return out, results
}

Listing 9
A Go function’s return type is specified after the closing parenthesis that follows the function’s parameters. Multiple values can be returned simply by enclosing their types in parentheses. Here we return a receive-only channel and a send–receive channel.
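As a small generic example of this syntax (not from headcheck), a function can declare several return types, including directional channel types:

// Generic sketch of multiple return values and channel direction;
// assumes the errors package is imported.
func startCounter(limit int) (<-chan int, error) {
    if limit < 0 {
        return nil, errors.New("limit must not be negative")
    }
    ch := make(chan int) // Bidirectional inside the function...
    go func() {
        for i := 0; i < limit; i++ {
            ch <- i
        }
        close(ch)
    }()
    return ch, nil // ...but returned as receive-only (<-chan int).
}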
The source() function is passed the results channel, which it simply returns to its caller. All the pipeline functions share the same results channel. The function starts by creating a buffered output channel which is initially bi-directional. It then creates a goroutine to populate the out channel with jobs (in this case URL strings), after which the goroutine closes the channel. By closing the channel, future receivers (e.g., using a for ... range loop) will know when to finish. The go func() { ... }() creates a function literal (which is a closure) and executes it in a separate goroutine. So processing immediately continues to the source() function’s last statement, which simply returns the out channel as a receive-only channel (thanks to the way the function’s return type is declared), as well as the results channel.
The processHTML() function shown in Listing 10 has the same structure as all the pipeline component functions except for the source() function. It is passed two channels: a receive-only jobs channel (of type <-chan string) which it calls in (and which was the previous pipeline component’s out channel), and the shared results channel. The function creates a new buffered bi-directional out channel with the same buffer size as the capacity of the in channel the function has been passed. It then creates a goroutine which executes a new function literal. The goroutine reads jobs from the in channel. This channel is closed by the previous pipeline component when it has been fully populated, so this goroutine’s for ... range loop is guaranteed to terminate. For each job (URL) received that this function can process, it performs its processing (in this case a call to a Check() function) and sends the result to the results channel. Jobs the function isn’t concerned with are simply sent on to the (new) out channel. At the end the function returns its out channel and the shared results channel.
func processHTML(in <-chan string,
    results chan Result) (<-chan string, chan Result) {
    out := make(chan string, cap(in))
    go func() {
        for url := range in {
            suffix := strings.ToLower(
                filepath.Ext(url))
            if suffix == ".htm" ||
                suffix == ".html" {
                results <- Check(url)
            } else {
                out <- url
            }
        }
        close(out)
    }()
    return out, results
}

Listing 10
The processImages() function has the same signature and uses the same logic.
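Its body is not shown in the article; a plausible sketch, mirroring Listing 10 and assuming it checks a few common image filename extensions (the extension list is an assumption), is:

// Hypothetical sketch of processImages(); the extension list is assumed.
func processImages(in <-chan string,
    results chan Result) (<-chan string, chan Result) {
    out := make(chan string, cap(in))
    go func() {
        for url := range in {
            suffix := strings.ToLower(filepath.Ext(url))
            if suffix == ".png" || suffix == ".jpg" ||
                suffix == ".jpeg" || suffix == ".gif" {
                results <- Check(url)
            } else {
                out <- url // Pass anything else down the pipeline.
            }
        }
        close(out)
    }()
    return out, results
}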
The sink() function takes a receive-only jobs channel and the shared results channel (which cannot be receive-only, since sink() must close it). It is shown in Listing 11.
func sink(in <-chan string, results chan Result) {
    for _ = range in { // Drain unprocessed URLs
    }
    close(results)
}

Listing 11
The sink() function is the last one in the pipeline. It iterates over the penultimate pipeline component’s jobs channel, draining it until it is empty. (It might be empty in the first place if every job has been done by one or other pipeline component.)
At the end the function closes the results channel. Closing the results channel is essential to ensure that the for ... range loop in main() terminates. But we must be careful not to close the channel while one or more goroutines might still want to send to it. Looking back at the implementations of the source() and processHTML() functions, we can see that each creates its own out channel which it ultimately closes when it has finished processing. The last of these out channels is passed to the sink() function as its in channel – and this channel isn’t closed until all the previous pipeline components have finished reading their in channels and closed their out channels. In view of this we know that once the for ... range loop in the sink() function has finished, all of the pipeline’s preceding components have finished processing and no more results could be sent to the results channel, and hence the channel is safe to close.
The pipeline-based headcheck program ends up with five goroutines: one executing sink(), one started by source(), one started by processHTML(), one started by processImages(), and the main goroutine that main() executes in. All of these goroutines operate concurrently, passing type-safe values via channels, only terminating when their work is done, and with no explicit locks.
Thinking in terms of goroutines and channels is very different from thinking in terms of threads and locks, and may take some getting used to! Also, keep in mind that in this article we have seen just some of the possible ways of using goroutines and channels – many other approaches are possible. Go is a fascinating language, not just for its innovative approach to concurrency, but also for its clean syntax and very different approach to object orientation, and is well worth getting to know.
Further information
The Go Programming Language: http://golang.org/