[[org.clojure/clojure "1.7.0"] [org.clojure/core.async "0.1.346.0-17112a-alpha"]]
Mastering Concurrent Processes with core.async
One day, while you are walking down the street, you will be surprised, intrigued, and a little disgusted to discover a hot dog vending machine. Your scalp tingling with guilty curiosity, you won’t be able to help yourself from pulling out three dollars and seeing if this contraption actually works. After accepting your money with a click and a whir, it pops out a fresh hot dog, bun and all.
The vending machine exhibits simple behavior: when it receives money, it releases a hot dog and then gets ready for the next purchase. When it’s out of hot dogs, it stops. All around us are hot dog vending machines in different guises—independent entities concurrently responding to events in the world. The espresso machine at your favorite coffee shop, the pet hamster you loved as a child—everything can be deconstructed into a set of behaviors that follow the general form “when x happens, do y.” Even the programs we write are just glorified hot dog vending machines, each one an independent process waiting for the next event, whether it’s a keystroke, a timeout, or the arrival of data on a socket.
Clojure’s core.async library allows you to create multiple independent processes within a single program. This chapter describes a useful model for thinking about this style of programming as well as the practical details you need to know to actually write code. You’ll learn how to use channels to communicate between independent processes created by go blocks and thread; a bit about how Clojure manages threads efficiently with parking and blocking; how to use alts!!; and a more straightforward way of creating queues. Finally, you’ll learn how to kick callbacks in the butt with process pipelines.
Getting Started with Processes
At the heart of core.async is the process, a concurrently running unit of logic that responds to events. The process corresponds to our mental model of the real world: entities interact with and respond to each other independently without some kind of central control mechanism pulling the strings. You put your money in the machine, and out comes a hot dog, all without the Illuminati or Big Brother orchestrating the whole thing. This differs from the view of concurrency you’ve been exploring so far, where you’ve defined tasks that are either mere extensions of the main thread of control (for example, achieving data parallelism with pmap) or tasks that you have no interest in communicating with (like one-off tasks created with future).
It might be strange to think of a vending machine as a process: vending machines are noun-y and thing-y, and processes are verb-y and do-y. To get in the right mindset, try defining real-world objects as the sum of their event-driven behavior. When a seed gets watered, it sprouts; when a mother looks at her newborn child, she feels love; and when you watch Star Wars Episode I, you are filled with anger and despair. If you want to get super philosophical, consider whether it’s possible to define every thing’s essence as the set of the events it recognizes and how it responds. Is reality just the composition of hot dog vending machines?
Anyway, enough of my yakking! Let’s move from the theoretical to the concrete by creating some simple processes. First, create a new Leiningen project called playsync with lein new app playsync. Then, open the file project.clj and add core.async to the :dependencies vector so it reads as follows:
:dependencies [[org.clojure/clojure "1.7.0"]
               [org.clojure/core.async "0.1.346.0-17112a-alpha"]]
Note: It’s possible that the core.async version has advanced since I wrote this. For the latest version, check the core.async GitHub project page. But for the purpose of these exercises, please use the version listed here.
Next, open src/playsync/core.clj and make it look like this:
(ns playsync.core
  (:require [clojure.core.async
             :as a
             :refer [>! <! >!! <!! go chan buffer close! thread
                     alts! alts!! timeout]]))
Now when you open this in a REPL, you’ll have the most frequently used core.async functions at your disposal. Great! Before creating something as sophisticated and revolutionary as a hot dog vending machine, create a process that simply prints the message it receives:
(def echo-chan (chan))
(go (println (<! echo-chan)))
(>!! echo-chan "ketchup")
; => true
; => ketchup
At the first line of code, you used the chan function to create a channel named echo-chan. Channels communicate messages. You can put messages on a channel and take messages off a channel. Processes wait for the completion of put and take; these are the events that processes respond to. You can think of processes as having two rules: 1) when trying to put a message on a channel or take a message off of it, wait and do nothing until the put or take succeeds, and 2) when the put or take succeeds, continue executing.
On the next line, you used go to create a new process. Everything within the go expression (called a go block) runs concurrently on a separate thread. Go blocks run your processes on a thread pool that contains a number of threads equal to two plus the number of cores on your machine, which means your program doesn’t have to create a new thread for each process. This often results in better performance because you avoid the overhead associated with creating threads.
In this case, the process (println (<! echo-chan)) expresses “when I take a message from echo-chan, print it.” The process is shunted to another thread, freeing up the current thread and allowing you to continue interacting with the REPL.
In the expression (<! echo-chan), <! is the take function. It listens to the channel you give it as an argument, and the process it belongs to waits until another process puts a message on the channel. When <! retrieves a value, the value is returned and the println expression is executed.
The expression (>!! echo-chan "ketchup") puts the string "ketchup" on echo-chan and returns true. When you put a message on a channel, the process blocks until another process takes the message. In this case, the REPL process didn’t have to wait at all, because there was already a process listening to the channel, waiting to take something off it. However, if you do the following, your REPL will block indefinitely:
(>!! (chan) "mustard")
You’ve created a new channel and put something on it, but there’s no process listening to that channel. Processes don’t just wait to receive messages; they also wait for the messages they put on a channel to be taken.
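If you’d like to watch a put go unanswered without freezing your REPL, one option is to race the put against a timeout channel using alts!!, which is covered later in this chapter. The following is only a sketch: the helper name try-put, the 100-millisecond limit, and the channel names are assumptions made for illustration.

```clojure
(require '[clojure.core.async :refer [chan go <! alts!! timeout]])

(defn try-put
  "Hypothetical helper: tries to put v on channel c but gives up after
  100 milliseconds. Returns :taken if another process took the value
  and :gave-up otherwise."
  [c v]
  (let [[_ winner] (alts!! [[c v] (timeout 100)])]
    (if (= winner c) :taken :gave-up)))

;; Nobody is listening to this channel, so the put never completes.
(def lonely-result (try-put (chan) "mustard"))

;; Here a go process is waiting to take, so the put succeeds.
(def listened-chan (chan))
(go (println (<! listened-chan)))
(def heard-result (try-put listened-chan "ketchup"))

[lonely-result heard-result]
; => [:gave-up :taken]
```

The first call returns only because the timeout channel wins the race; a bare >!! on the same channel would still be waiting.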
It’s worth noting that the previous exercise contained two processes: the one you created with go and the REPL process. These processes don’t have explicit knowledge of each other, and they act independently.
Let’s imagine that these processes take place in a diner. The REPL is the ketchup chef, and when he’s done with a batch, he belts out, “Ketchup!” It’s entirely possible that the rest of the staff is outside admiring the latest batch of oregano in their organic garden, and the chef just sits and waits until someone shows up to take his ketchup. On the flip side, the go process represents one of the staff, and he’s waiting patiently for something to respond to. It could be that nothing ever happens, and he just waits indefinitely until the restaurant closes.
This situation seems a little silly: what self-respecting ketchup chef would just sit and wait for someone to take his latest batch before making more ketchup? To avoid this tragedy, you can create buffered channels:
(def echo-buffer (chan 2))
(>!! echo-buffer "ketchup")
; => true
(>!! echo-buffer "ketchup")
; => true
(>!! echo-buffer "ketchup")
; This blocks because the channel buffer is full
(Be careful evaluating the last (>!! echo-buffer "ketchup") because it will block your REPL. If you’re using a Leiningen REPL, ctrl-C will unblock it.)
In this case, you’ve created a channel with buffer size 2. That means you can put two values on the channel without waiting, but putting a third one on means the process will wait until another process takes a value from the channel. You can also create sliding buffers with sliding-buffer, which makes room for a new value by dropping the oldest one when the buffer is full; and dropping buffers with dropping-buffer, which discards the new value when the buffer is full. Neither of these buffers will ever cause >!! to block.
By using buffers, the master ketchup chef can keep whipping up batches of mouthwatering ketchup without having to wait for his staff to take them away. If he’s using a regular buffer, it’s like he has a shelf to put all his ketchup batches on; once the shelf is full, he’ll still have to wait for space to open up. If he’s using a sliding buffer, he’d throw away the oldest batch of ketchup when the shelf is full, slide all the ketchup down, and put the new batch in the vacant space. With a dropping buffer, he’d leave the full shelf alone and throw away the new batch instead.
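To compare the two buffer types side by side, here is a small sketch that overfills a two-slot shelf and then looks at what is left on it. The helper name shelf-contents and the batch numbers are made up for this example; a/into is the core.async function that collects a channel’s remaining values into a collection once the channel is closed.

```clojure
(require '[clojure.core.async :as a
           :refer [chan >!! <!! close! sliding-buffer dropping-buffer]])

(defn shelf-contents
  "Hypothetical helper: puts batches 1, 2, and 3 on a channel backed by
  buf, closes the channel, and returns whatever is left to take."
  [buf]
  (let [c (chan buf)]
    (doseq [batch [1 2 3]]
      (>!! c batch))        ; never blocks for sliding or dropping buffers
    (close! c)              ; buffered values can still be taken after close
    (<!! (a/into [] c))))   ; collect the remaining values into a vector

(def slid (shelf-contents (sliding-buffer 2)))
; => [2 3], the oldest batch was thrown out

(def dropped (shelf-contents (dropping-buffer 2)))
; => [1 2], the newest batch was thrown out
```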
Buffers are just elaborations of the core model: processes are independent, concurrently executing units of logic that respond to events. You can create processes with go blocks and communicate events over channels.
Blocking and Parking
You may have noticed that the take function <! used only one exclamation point, whereas the put function >!! used two. In fact, both put and take have one-exclamation-point and two-exclamation-point varieties. When do you use which? The simple answer is that you can use one exclamation point inside go blocks, but you have to use two exclamation points outside of them:
| |Inside go block|Outside go block|
|put|>! or >!!|>!!|
|take|<! or <!!|<!!|
It all comes down to efficiency. Because go blocks use a thread pool with a fixed size, you can create 1,000 go processes but use only a handful of threads:
(def hi-chan (chan))
(doseq [n (range 1000)]
  (go (>! hi-chan (str "hi " n))))
To understand how Clojure accomplishes this, we need to explore how processes wait. Waiting is a key aspect of working with core.async processes: we’ve already established that put waits until another process does a take on the same channel, and vice versa. In this example, 1,000 processes are waiting for another process to take from hi-chan.
There are two varieties of waiting: parking and blocking. Blocking is the kind of waiting you’re familiar with: a thread stops execution until a task is complete. Usually this happens when you’re doing some kind of I/O operation. The thread remains alive but doesn’t do any work, so you have to create a new thread if you want your program to continue working. In Chapter 9, you learned how to do this with future.
Parking frees up the thread so it can keep doing work. Let’s say you have one thread and two processes, Process A and Process B. Process A is running on the thread and then waits for a put or take. Clojure moves Process A off the thread and moves Process B onto the thread. If Process B starts waiting and Process A’s put or take has finished, then Clojure will move Process B off the thread and put Process A back on it. Parking allows the instructions from multiple processes to interleave on a single thread, similar to the way that using multiple threads allows interleaving on a single core. The implementation of parking isn’t important; suffice it to say that it’s only possible within go blocks, and it’s only possible when you use >! and <!, or parking put and parking take. >!! and <!! are blocking put and blocking take.
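As a quick check that all of those parked processes really do get their turn, the sketch below starts 1,000 go processes and then drains the channel from the calling thread. The name greetings is an assumption for this example, and the order in which the messages arrive isn’t guaranteed, so the sketch only counts them.

```clojure
(require '[clojure.core.async :refer [chan go >! <!!]])

(def hi-chan (chan))

;; Start 1,000 go processes; each one parks until its put is taken.
(doseq [n (range 1000)]
  (go (>! hi-chan (str "hi " n))))

;; We're outside a go block here, so we use the blocking take.
(def greetings (doall (repeatedly 1000 #(<!! hi-chan))))

(count greetings)
; => 1000
(count (set greetings))
; => 1000, so every process delivered its own message
```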
There are definitely times when you’ll want to use blocking instead of parking, like when your process will take a long time before putting or taking, and for those occasions you should use thread:
(thread (println (<!! echo-chan)))
(>!! echo-chan "mustard")
; => true
; => mustard
thread acts almost exactly like future: it creates a new thread and executes a process on that thread. Unlike future, instead of returning an object that you can dereference, thread returns a channel. When thread’s process stops, the process’s return value is put on the channel that thread returns:
(let [t (thread "chili")]
  (<!! t))
; => "chili"
In this case, the process doesn’t wait for any events; instead, it stops immediately. Its return value is "chili", which gets put on the channel that’s bound to t. We take from t, returning "chili".
The reason you should use thread instead of a go block when you’re performing a long-running task is so you don’t clog your thread pool. Imagine you’re running four processes that download humongous files, save them, and then put the file paths on a channel. While the processes are downloading files and saving these files, Clojure can’t park them. It can park a process only at the last step, when the process puts the files’ paths on a channel. Therefore, if your thread pool has only four threads, all four threads will be used for downloading, and no other process will be allowed to run until one of the downloads finishes.
go, thread, chan, <!, <!!, >!, and >!! are the core tools you’ll use for creating and communicating with processes. Both put and take will cause a process to wait until its complement is performed on the given channel. Using go allows you to use the parking variants of put and take, which could improve performance. You should use the blocking variants, along with thread, if you’re performing long-running tasks before the put or take.
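Here is a rough sketch of the download scenario using thread and the blocking put. Nothing is actually downloaded: slow-download, the example.com URLs, and the /tmp paths are all stand-ins invented for this example, and the 50-millisecond sleep simply plays the part of a long-running task.

```clojure
(require '[clojure.core.async :refer [chan thread >!! <!!]]
         '[clojure.string :as str])

(defn slow-download
  "Hypothetical stand-in for a real download: blocks for 50 ms and
  returns a made-up file path based on the last segment of the URL."
  [url]
  (Thread/sleep 50)
  (str "/tmp/" (last (str/split url #"/"))))

(def paths-chan (chan))

;; Each download gets its own thread, so the go thread pool stays free.
(doseq [url ["http://example.com/a.zip" "http://example.com/b.zip"]]
  (thread
    (>!! paths-chan (slow-download url))))

;; The downloads can finish in either order, so collect them in a set.
(def paths (set [(<!! paths-chan) (<!! paths-chan)]))
; => #{"/tmp/a.zip" "/tmp/b.zip"}
```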
And that should give you everything you need to fulfill your heart’s desire and create a machine that turns money into hot dogs.
The Hot Dog Machine Process You’ve Been Longing For
Behold, your dreams made real!
(defn hot-dog-machine
  []
  (let [in (chan)
        out (chan)]
    (go (<! in)
        (>! out "hot dog"))
    [in out]))
This function creates an in channel for receiving money and an out channel for dispensing a hot dog. It then creates an asynchronous process with go, which waits for money and then dispenses a hot dog. Finally, it returns the in and out channels as a vector.
Time for a hot dog!
(let [[in out] (hot-dog-machine)]
  (>!! in "pocket lint")
  (<!! out))
; => "hot dog"
In this snippet, you use destructuring (covered in Chapter 3) with let to bind the in and out channels to the in and out symbols. You then put "pocket lint" on the in channel. The hot dog machine process waits for something, anything, to arrive on the in channel; once "pocket lint" arrives, the hot dog machine process resumes execution, putting "hot dog" on the out channel.
Wait a minute . . . that’s not right. I mean, yay, free hot dogs, but someone’s bound to get upset that the machine’s accepting pocket lint as payment. Not only that, but this machine will only dispense one hot dog before shutting down. Let’s alter the hot dog machine function so that you can specify how many hot dogs it has and so it only dispenses a hot dog when you give it the number 3:
(defn hot-dog-machine-v2
  [hot-dog-count]
  (let [in (chan)
        out (chan)]
    (go (loop [hc hot-dog-count]
          (if (> hc 0)
            (let [input (<! in)]
            ➊ (if (= 3 input)
                (do (>! out "hot dog")
                    (recur (dec hc)))
                (do (>! out "wilted lettuce")
                    (recur hc))))
          ➋ (do (close! in)
                (close! out)))))
    [in out]))
There’s a lot more code here, but the strategy is straightforward. The new function hot-dog-machine-v2 allows you to specify the hot-dog-count. Within the go block at ➊, it dispenses a hot dog only if the number 3 (meaning three dollars) is placed on the in channel; otherwise, it dispenses wilted lettuce, which is definitely not a hot dog. Once a process has taken the output, the hot dog machine process loops back with an updated hot dog count and is ready to receive money again.
When the machine process runs out of hot dogs, the process closes the channels at ➋. When you close a channel, you can no longer perform puts on it, and once you’ve taken all values off a closed channel, any subsequent takes will return nil.
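The closing rules are easy to check at the REPL. This is a small standalone sketch rather than part of the hot dog machine; the name closed-demo and the buffer size of 1 are choices made only for illustration.

```clojure
(require '[clojure.core.async :refer [chan >!! <!! close!]])

(def closed-demo
  (let [c (chan 1)]          ; buffer of 1 so the first put doesn't wait
    (>!! c "last hot dog")
    (close! c)
    [(>!! c "too late")      ; a put on a closed channel is refused
     (<!! c)                 ; the buffered value is still there to take
     (<!! c)]))              ; nothing left, so the take returns nil

closed-demo
; => [false "last hot dog" nil]
```

Because nil is what a take returns from a closed, empty channel, a process can treat a nil take as its signal to stop.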
Let’s give the upgraded hot dog machine a go in Listing 11-1 by putting in money and pocket lint:
(let [[in out] (hot-dog-machine-v2 2)]
  (>!! in "pocket lint")
  (println (<!! out))

  (>!! in 3)
  (println (<!! out))

  (>!! in 3)
  (println (<!! out))

  (>!! in 3)
  (<!! out))
; => wilted lettuce
; => hot dog
; => hot dog
; => nil
Listing 11-1. Interacting with a robust hot dog vending machine process
First, we try the ol’ pocket lint trick and get wilted lettuce. Next, we put in 3 dollars twice and get a hot dog both times. Then, we try to put in another 3 dollars, but that’s ignored because the channel is closed; the number 3 is not put on the channel. When we try to take from the out channel, we get nil, again because the channel is closed.
You might notice a couple of interesting details about hot-dog-machine-v2. First, it does a put and a take within the same go block. This isn’t that unusual, and it’s one way you can create a pipeline of processes: just make the in channel of one process the out channel of another. The following example does just that, passing a string through a series of processes that perform transformations until the string finally gets printed by the last process:
(let [c1 (chan)
      c2 (chan)
      c3 (chan)]
  (go (>! c2 (clojure.string/upper-case (<! c1))))
  (go (>! c3 (clojure.string/reverse (<! c2))))
  (go (println (<! c3)))
  (>!! c1 "redrum"))
; => MURDER
I’ll have more to say about process pipelines and how you can use them instead of callbacks toward the end of the chapter.
Back to Listing 11-1! Another thing to note is that the hot dog machine doesn’t accept more money until you’ve dealt with whatever it’s dispensed. This allows you to model state-machine-like behavior, where the completion of channel operations triggers state transitions. For example, you can think of the vending machine as having two states: ready to receive money and dispensed item. Inserting money and taking the item trigger transitions between the two.
The core.async function alts!! lets you use the result of the first successful channel operation among a collection of operations. We did something similar to this with delays and futures in “Delays” on page 198. In that example, we uploaded a set of headshots to a headshot-sharing site and notified the headshot owner when the first photo was uploaded. Here’s how you’d do the same with alts!!:
(defn upload
  [headshot c]
  (go (Thread/sleep (rand 100))
      (>! c headshot)))

➊ (let [c1 (chan)
      c2 (chan)
      c3 (chan)]
  (upload "serious.jpg" c1)
  (upload "fun.jpg" c2)
  (upload "sassy.jpg" c3)
➋ (let [[headshot channel] (alts!! [c1 c2 c3])]
    (println "Sending headshot notification for" headshot)))
; => Sending headshot notification for sassy.jpg
The upload function takes a headshot and a channel, and creates a new process that sleeps for a random amount of time (to simulate the upload) and then puts the headshot on the channel. The let bindings and upload function calls beginning at ➊ should make sense: we create three channels and then use them to perform the uploads.
Things get interesting at ➋. The alts!! function takes a vector of channels as its argument. This is like saying, “Try to do a blocking take on each of these channels simultaneously. As soon as a take succeeds, return a vector whose first element is the value taken and whose second element is the winning channel.” In this case, the channel associated with sassy.jpg received a value first. The other channels are still available if you want to take their values and do something with them. All alts!! does is take a value from the first channel to have a value; it doesn’t touch the other channels.
One cool aspect of alts!! is that you can give it a timeout channel, which waits the specified number of milliseconds and then closes. It’s an elegant mechanism for putting a time limit on concurrent operations. Here’s how you could use it with the upload service:
(let [c1 (chan)]
  (upload "serious.jpg" c1)
  (let [[headshot channel] (alts!! [c1 (timeout 20)])]
    (if headshot
      (println "Sending headshot notification for" headshot)
      (println "Timed out!"))))
; => Timed out!
In this case, we set the timeout to 20 milliseconds. Because the upload didn’t finish in that time frame, we got a timeout message.
You can also use alts!! to specify put operations. To do that, place a vector inside the vector you pass to alts!!, like at ➊ in this example:
(let [c1 (chan)
      c2 (chan)]
  (go (<! c2))
➊ (let [[value channel] (alts!! [c1 [c2 "put!"]])]
    (println value)
    (= channel c2)))
; => true
; => true
Here you’re creating two channels and then creating a process that’s waiting to perform a take on c2. The vector that you supply to alts!! tells it, “Try to do a take on c1 and try to put "put!" on c2. If the take on c1 finishes first, return its value and channel. If the put on c2 finishes first, return true if the put was successful and false otherwise.” Finally, the result of value (which is true, because the c2 channel was open) prints and shows that the channel returned was indeed c2.
alts!! has a parking alternative, alts!, which you can use inside go blocks. alts! is a nice way to exercise some choice over which of a group of channels you put or take from. It still performs puts and takes, so the same reasons to use the parking or blocking variation apply.
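For a sense of what the parking variant looks like in practice, here is a minimal sketch that wraps alts! and a timeout channel in a go block. The helper name take-with-limit, the :timed-out keyword, and the 50-millisecond limit are assumptions for this example, not part of core.async.

```clojure
(require '[clojure.core.async :refer [chan go alts! timeout >!! <!!]])

(defn take-with-limit
  "Hypothetical helper: returns a channel that will receive either the
  first value to arrive on c or :timed-out after ms milliseconds."
  [c ms]
  (go
    (let [[value winner] (alts! [c (timeout ms)])]
      (if (= winner c) value :timed-out))))

;; Nothing is ever put on this channel, so the timeout wins.
(def silent (<!! (take-with-limit (chan) 50)))
; => :timed-out

;; A value is already waiting in the buffer, so the take wins.
(def chatty
  (let [c (chan 1)]
    (>!! c "serious.jpg")
    (<!! (take-with-limit c 50))))
; => "serious.jpg"
```

Comparing the winning channel with c, rather than testing the value for nil, keeps the two outcomes distinct even if c itself were closed.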
And that covers the core.async basics! The rest of the chapter explains two common patterns for coordinating processes.
In “Rolling Your Own Queue” on page 202, you wrote a macro that let you queue futures. Processes let you use a similar technique in a more straightforward manner. Let’s say you want to get a bunch of random quotes from a website and write them to a single file. You want to make sure that only one quote is written to a file at a time so the text doesn’t get interleaved, so you put your quotes on a queue. Here’s the full code:
(defn append-to-file
  "Write a string to the end of a file"
  [filename s]
  (spit filename s :append true))

(defn format-quote
  "Delineate the beginning and end of a quote because it's convenient"
  [quote]
  (str "=== BEGIN QUOTE ===\n" quote "=== END QUOTE ===\n\n"))

(defn random-quote
  "Retrieve a random quote and format it"
  []
  (format-quote (slurp "http://www.braveclojure.com/random-quote")))

(defn snag-quotes
  [filename num-quotes]
  (let [c (chan)]
    (go (while true (append-to-file filename (<! c))))
    (dotimes [n num-quotes] (go (>! c (random-quote))))))
append-to-file, format-quote, and random-quote have docstrings that explain what they do. snag-quotes is where the interesting work happens. First, it creates a channel that’s shared between the quote-producing processes and the quote-consuming process. Then it creates a process that uses while true to create an infinite loop. On every iteration of the loop, it waits for a quote to arrive on c and then appends it to a file. Finally, snag-quotes creates a num-quotes number of processes that fetch a quote and then put it on c. If you evaluate (snag-quotes "quotes" 2) and check the quotes file in the directory where you started your REPL, it should have two quotes:
=== BEGIN QUOTE ===
Nobody's gonna believe that computers are intelligent until they start coming in late and lying about it.
=== END QUOTE ===

=== BEGIN QUOTE ===
Give your child mental blocks for Christmas.
=== END QUOTE ===
This kind of queuing differs from the example in Chapter 9. In that example, each task was handled in the order it was created. Here, each quote-retrieving task is handled in the order that it finishes. In both cases, you ensure that only one quote at a time is written to a file.
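If you’d like to try the same queuing idea without hitting the network, the following sketch is one possible variation, not the version used above. It assumes the quotes are already in hand as strings, runs the writer on its own thread because writing a file blocks, and closes the channel so the writer knows when to stop. The names snag-local-quotes, quotes-file, and written are invented for this example.

```clojure
(require '[clojure.core.async :refer [chan go thread >! <!! close!]]
         '[clojure.string :as str])

(defn snag-local-quotes
  "Hypothetical variant of snag-quotes: writes each string in quotes to
  filename, one at a time, and returns once everything is written."
  [filename quotes]
  (let [c (chan)
        ;; One consumer on its own thread, because file I/O blocks.
        writer (thread
                 (loop []
                   (when-some [q (<!! c)]
                     (spit filename (str q "\n") :append true)
                     (recur))))
        ;; One producer per quote; each go block returns a channel that
        ;; yields a value once its put has been taken.
        producers (mapv (fn [q] (go (>! c q))) quotes)]
    (doseq [p producers] (<!! p))  ; wait for every producer to finish
    (close! c)                     ; tell the writer nothing more is coming
    (<!! writer)))                 ; wait for the writer to stop

(def quotes-file (java.io.File/createTempFile "quotes" ".txt"))

(snag-local-quotes (.getPath quotes-file)
                   ["Quote one" "Quote two" "Quote three"])

;; The quotes can be written in any order, so compare them as a set.
(def written (set (str/split-lines (slurp quotes-file))))
; => #{"Quote one" "Quote two" "Quote three"}
```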
Escape Callback Hell with Process Pipelines
In languages without channels, you need to express the idea “when x happens, do y” with callbacks.
The reason it’s called callback hell is that it’s very easy to create dependencies among layers of callbacks that aren’t immediately obvious. They end up sharing state, making it difficult to reason about the state of the overall system as the callbacks get triggered. You can avoid this depressing outcome by creating a process pipeline. That way, each unit of logic lives in its own isolated process, and all communication between units of logic occurs through explicitly defined input and output channels.
In the following example, we create three infinitely looping processes connected through channels, passing the out channel of one process as the in channel of the next process in the pipeline:
(defn upper-caser
  [in]
  (let [out (chan)]
    (go (while true (>! out (clojure.string/upper-case (<! in)))))
    out))

(defn reverser
  [in]
  (let [out (chan)]
    (go (while true (>! out (clojure.string/reverse (<! in)))))
    out))

(defn printer
  [in]
  (go (while true (println (<! in)))))

(def in-chan (chan))
(def upper-caser-out (upper-caser in-chan))
(def reverser-out (reverser upper-caser-out))
(printer reverser-out)

(>!! in-chan "redrum")
; => MURDER

(>!! in-chan "repaid")
; => DIAPER
By handling events using processes like this, it’s easier to reason about the individual steps of the overall data transformation system. You can look at each step and understand what it does without having to refer to what might have happened before it or what might happen after it; each process is as easy to reason about as a pure function.
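Since upper-caser and reverser differ only in the function they apply, you could factor the shared shape into a helper. The sketch below is one way to do that under a few assumptions: the name stage is made up, and unlike the infinite loops above, each stage stops and closes its out channel once its in channel is closed.

```clojure
(require '[clojure.core.async :refer [chan go >! <! >!! <!! close!]]
         '[clojure.string :as str])

(defn stage
  "Hypothetical helper: returns an out channel carrying (f v) for every
  v taken from in, and closes out once in is closed."
  [f in]
  (let [out (chan)]
    (go
      (loop []
        (if-some [v (<! in)]
          (do (>! out (f v))
              (recur))
          (close! out))))
    out))

(def pipeline-result
  (let [in  (chan)
        out (->> in
                 (stage str/upper-case)
                 (stage str/reverse))]
    (>!! in "redrum")
    (close! in)
    ;; The second take returns nil because closing in shuts down each
    ;; stage in turn.
    [(<!! out) (<!! out)]))
; => ["MURDER" nil]
```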
Clojure’s core.async library was largely inspired by Go’s concurrency model, which is based on the work by Tony Hoare in Communicating Sequential Processes and is available at http://www.usingcsp.com/.
Rob Pike, co-creator of Go, has a good talk on concurrency, which is available at https://www.youtube.com/watch?v=f6kdp27TYZs.
ClojureScript, also known as the best thing to happen to the browser, uses core.async. No more callback hell! You can learn about ClojureScript at https://github.com/clojure/clojurescript.
Finally, check out the API docs at http://clojure.github.io/core.async/.
In this chapter, you learned about how core.async allows you to create concurrent processes that respond to the put and take communication events on channels. You learned about how to use go and thread to create concurrent processes that wait for communication events by parking and blocking. You also learned how to create process pipelines by making the out channel of one process the in channel of another, and how this allows you to write code that’s way more intelligible than nested callbacks. Finally, you meditated on whether or not you’re just a fancy hot dog vending machine.