[Translation] Brave Clojure, Chapter 11: Mastering Concurrent Processes with core.async

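This post is my translation of Chapter 11, Mastering Concurrent Processes with core.async, of the Clojure book CLOJURE FOR THE BRAVE AND TRUE. The translation was laid out bilingually, with each English passage followed by its Chinese rendering. Mistakes are inevitable, and corrections are welcome.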

Translations of the other chapters are here.

The chapter begins below.


One day, while you are walking down the street, you will be surprised, intrigued, and a little disgusted to discover a hot dog vending machine. Your scalp tingling with guilty curiosity, you won’t be able to help yourself from pulling out three dollars and seeing if this contraption actually works. After accepting your money with a click and a whir, it pops out a fresh hot dog, bun and all.

The vending machine exhibits simple behavior: when it receives money, it releases a hot dog and then gets ready for the next purchase. When it’s out of hot dogs, it stops. All around us are hot dog vending machines in different guises—independent entities concurrently responding to events in the world. The espresso machine at your favorite coffee shop, the pet hamster you loved as a child—everything can be deconstructed into a set of behaviors that follow the general form “when x happens, do y.” Even the programs we write are just glorified hot dog vending machines, each one an independent process waiting for the next event, whether it’s a keystroke, a timeout, or the arrival of data on a socket.

Clojure’s core.async library allows you to create multiple independent processes within a single program. This chapter describes a useful model for thinking about this style of programming as well as the practical details you need to know to actually write code. You’ll learn how to use channels to communicate between independent processes created by go blocks and thread; a bit about how Clojure manages threads efficiently with parking and blocking; how to use alts!!; and a more straightforward way of creating queues. Finally, you’ll learn how to kick callbacks in the butt with process pipelines.

Getting Started with Processes

At the heart of core.async is the process, a concurrently running unit of logic that responds to events. The process corresponds to our mental model of the real world: entities interact with and respond to each other independently without some kind of central control mechanism pulling the strings. You put your money in the machine, and out comes a hot dog, all without the Illuminati or Big Brother orchestrating the whole thing. This differs from the view of concurrency you’ve been exploring so far, where you’ve defined tasks that are either mere extensions of the main thread of control (for example, achieving data parallelism with pmap) or tasks that you have no interest in communicating with (like one-off tasks created with future).

It might be strange to think of a vending machine as a process: vending machines are noun-y and thing-y, and processes are verb-y and do-y. To get in the right mindset, try defining real-world objects as the sum of their event-driven behavior. When a seed gets watered, it sprouts; when a mother looks at her newborn child, she feels love; and when you watch Star Wars Episode I, you are filled with anger and despair. If you want to get super philosophical, consider whether it’s possible to define every thing’s essence as the set of the events it recognizes and how it responds. Is reality just the composition of hot dog vending machines?

Anyway, enough of my yakking! Let’s move from the theoretical to the concrete by creating some simple processes. First, create a new Leiningen project called playsync with lein new app playsync. Then, open the file project.clj and add core.async to the :dependencies vector so it reads as follows:

[[org.clojure/clojure "1.7.0"]
 [org.clojure/core.async "0.1.346.0-17112a-alpha"]]

Note: It’s possible that the core.async version has advanced since I wrote this. For the latest version, check the core.async GitHub project page. But for the purpose of these exercises, please use the version listed here.

Next, open src/playsync/core.clj and make it look like this:

(ns playsync.core
  (:require [clojure.core.async
             :as a
             :refer [>! <! >!! <!! go chan buffer close! thread
                     alts! alts!! timeout]]))

Now when you open this in a REPL, you’ll have the most frequently used core.async functions at your disposal. Great! Before creating something as sophisticated and revolutionary as a hot dog vending machine, create a process that simply prints the message it receives:

(def echo-chan (chan))
(go (println (<! echo-chan)))
(>!! echo-chan "ketchup")
; => true
; => ketchup

At the first line of code, you used the chan function to create a channel named echo-chan. Channels communicate messages. You can put messages on a channel and take messages off a channel. Processes wait for the completion of put and take—these are the events that processes respond to. You can think of processes as having two rules: 1) when trying to put a message on a channel or take a message off of it, wait and do nothing until the put or take succeeds, and 2) when the put or take succeeds, continue executing.

On the next line, you used go to create a new process. Everything within the go expression—called a go block—runs concurrently on a separate thread. Go blocks run your processes on a thread pool that contains a number of threads equal to two plus the number of cores on your machine, which means your program doesn’t have to create a new thread for each process. This often results in better performance because you avoid the overhead associated with creating threads.

In this case, the process (println (<! echo-chan)) expresses “when I take a message from echo-chan, print it.” The process is shunted to another thread, freeing up the current thread and allowing you to continue interacting with the REPL.
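
One detail the walkthrough does not spell out: according to the core.async documentation, go itself returns a channel that receives the value of the block's last expression. The sketch below assumes core.async is on the classpath; the names answer-chan and answer are made up for illustration, and <!! is the blocking take described later in the chapter.

```clojure
(require '[clojure.core.async :refer [go <!!]])

;; go hands back a channel right away; the body runs on the go thread pool.
(def answer-chan (go (+ 1 2)))

;; A blocking take from the calling thread retrieves the block's result.
(def answer (<!! answer-chan))
;; answer => 3
```

This is one way to get a result back from a process without creating a separate channel by hand.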

In the expression (<! echo-chan), <! is the take function. It listens to the channel you give it as an argument, and the process it belongs to waits until another process puts a message on the channel. When <! retrieves a value, the value is returned and the println expression is executed.

The expression (>!! echo-chan "ketchup") puts the string "ketchup" on echo-chan and returns true. When you put a message on a channel, the process blocks until another process takes the message. In this case, the REPL process didn’t have to wait at all, because there was already a process listening to the channel, waiting to take something off it. However, if you do the following, your REPL will block indefinitely:

(>!! (chan) "mustard")

You’ve created a new channel and put something on it, but there’s no process listening to that channel. Processes don’t just wait to receive messages; they also wait for the messages they put on a channel to be taken.
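
If you want to see this waiting behavior without freezing the REPL, one option is to race the put against a timeout channel using alts!!, which is covered later in the chapter. This is only a sketch under that assumption; lonely-chan, deadline, put-attempt, and gave-up? are illustrative names.

```clojure
(require '[clojure.core.async :refer [chan timeout alts!!]])

(def lonely-chan (chan))       ; unbuffered, and no process ever takes from it
(def deadline (timeout 100))   ; a channel that closes after roughly 100 ms

;; Attempt the put, but give up as soon as the deadline channel closes.
(def put-attempt (alts!! [[lonely-chan "mustard"] deadline]))

;; The second element of the result is the channel whose operation completed.
(def gave-up? (= deadline (second put-attempt)))
;; gave-up? => true, because nobody took "mustard" in time
```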

Buffering

It’s worth noting that the previous exercise contained two processes: the one you created with go and the REPL process. These processes don’t have explicit knowledge of each other, and they act independently.

Let’s imagine that these processes take place in a diner. The REPL is the ketchup chef, and when he’s done with a batch, he belts out, “Ketchup!” It’s entirely possible that the rest of the staff is outside admiring the latest batch of oregano in their organic garden, and the chef just sits and waits until someone shows up to take his ketchup. On the flip side, the go process represents one of the staff, and he’s waiting patiently for something to respond to. It could be that nothing ever happens, and he just waits indefinitely until the restaurant closes.

This situation seems a little silly: what self-respecting ketchup chef would just sit and wait for someone to take his latest batch before making more ketchup? To avoid this tragedy, you can create buffered channels:

(def echo-buffer (chan 2))
(>!! echo-buffer "ketchup")
; => true
(>!! echo-buffer "ketchup")
; => true
(>!! echo-buffer "ketchup")
; This blocks because the channel buffer is full

(Be careful evaluating the last (>!! echo-buffer "ketchup") because it will block your REPL. If you’re using a Leiningen REPL, ctrl-C will unblock it.)

In this case, you’ve created a channel with buffer size 2. That means you can put two values on the channel without waiting, but putting a third one on means the process will wait until another process takes a value from the channel. You can also create sliding buffers with sliding-buffer, which make room for a new value by dropping the oldest one in the buffer; and dropping buffers with dropping-buffer, which keep what they already hold and discard the incoming value when full. Neither of these buffers will ever cause >!! to block.

By using buffers, the master ketchup chef can keep whipping up batches of mouthwatering ketchup without having to wait for his staff to take them away. If he’s using a regular buffer, it’s like he has a shelf to put all his ketchup batches on; once the shelf is full, he’ll still have to wait for space to open up. If he’s using a sliding buffer, he’d throw away the oldest batch of ketchup when the shelf is full, slide all the ketchup down, and put the new batch in the vacant space. With a dropping buffer, he’d leave the full shelf as it is and throw away the new batch instead.
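
A small sketch can make the difference concrete. It relies on the behavior described in the core.async docstrings: when full, a sliding buffer drops its oldest element, while a dropping buffer discards the value being put. The channel names and batch labels are invented for the example.

```clojure
(require '[clojure.core.async :refer [chan >!! <!! sliding-buffer dropping-buffer]])

(def sliding-shelf (chan (sliding-buffer 2)))
(def dropping-shelf (chan (dropping-buffer 2)))

;; Three puts into two-slot buffers; none of them blocks.
(doseq [batch ["batch-1" "batch-2" "batch-3"]]
  (>!! sliding-shelf batch)
  (>!! dropping-shelf batch))

(def sliding-contents [(<!! sliding-shelf) (<!! sliding-shelf)])
(def dropping-contents [(<!! dropping-shelf) (<!! dropping-shelf)])
;; sliding-contents  => ["batch-2" "batch-3"]  (oldest batch was dropped)
;; dropping-contents => ["batch-1" "batch-2"]  (newest batch was discarded)
```

Which one fits depends on whether stale or fresh values matter more to the consumer.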

Buffers are just elaborations of the core model: processes are independent, concurrently executing units of logic that respond to events. You can create processes with go blocks and communicate events over channels.

Blocking and Parking

You may have noticed that the take function <! used only one exclamation point, whereas the put function >!! used two. In fact, both put and take have one-exclamation-point and two-exclamation-point varieties. When do you use which? The simple answer is that you can use one exclamation point inside go blocks, but you have to use two exclamation points outside of them:

        Inside go block    Outside go block
put     >! or >!!          >!!
take    <! or <!!          <!!

It all comes down to efficiency. Because go blocks use a thread pool with a fixed size, you can create 1,000 go processes but use only a handful of threads:

(def hi-chan (chan))
(doseq [n (range 1000)]
  (go (>! hi-chan (str "hi " n))))

To understand how Clojure accomplishes this, we need to explore how processes wait. Waiting is a key aspect of working with core.async processes: we’ve already established that put waits until another process does a take on the same channel, and vice versa. In this example, 1,000 processes are waiting for another process to take from hi-chan.

There are two varieties of waiting: parking and blocking. Blocking is the kind of waiting you’re familiar with: a thread stops execution until a task is complete. Usually this happens when you’re doing some kind of I/O operation. The thread remains alive but doesn’t do any work, so you have to create a new thread if you want your program to continue working. In Chapter 9, you learned how to do this with future.

Parking frees up the thread so it can keep doing work. Let’s say you have one thread and two processes, Process A and Process B. Process A is running on the thread and then waits for a put or take. Clojure moves Process A off the thread and moves Process B onto the thread. If Process B starts waiting and Process A’s put or take has finished, then Clojure will move Process B off the thread and put Process A back on it. Parking allows the instructions from multiple processes to interleave on a single thread, similar to the way that using multiple threads allows interleaving on a single core. The implementation of parking isn’t important; suffice it to say that it’s only possible within go blocks, and it’s only possible when you use >! and <!, or parking put and parking take. >!! and <!! are blocking put and blocking take.
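
To see parked puts being released, here is a hedged sketch that starts 1,000 go blocks and then drains them with blocking takes from the calling thread. The names greeting-chan and greetings are illustrative; the order in which greetings arrive is not guaranteed.

```clojure
(require '[clojure.core.async :refer [chan go >! <!!]])

(def greeting-chan (chan))   ; unbuffered, so every put has to wait for a take

;; 1,000 go blocks park on >! without tying up 1,000 threads.
(doseq [n (range 1000)]
  (go (>! greeting-chan (str "hi " n))))

;; Each blocking take from the calling thread releases one parked put.
(def greetings (doall (repeatedly 1000 #(<!! greeting-chan))))
;; (count greetings) => 1000
```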

thread

There are definitely times when you’ll want to use blocking instead of parking, like when your process will take a long time before putting or taking, and for those occasions you should use thread:

(thread (println (<!! echo-chan)))
(>!! echo-chan "mustard")
; => true
; => mustard

thread acts almost exactly like future: it creates a new thread and executes a process on that thread. Unlike future, instead of returning an object that you can dereference, thread returns a channel. When thread’s process stops, the process’s return value is put on the channel that thread returns:

(let [t (thread "chili")]
  (<!! t))
; => "chili"

In this case, the process doesn’t wait for any events; instead, it stops immediately. Its return value is "chili", which gets put on the channel that’s bound to t. We take from t, returning "chili".
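
For comparison, the following sketch runs the same expression with future and with thread. The var names are made up; the point is only how the result is retrieved, and that the channel returned by thread closes after delivering its value, as the core.async docs describe.

```clojure
(require '[clojure.core.async :refer [thread <!!]])

(def f (future (str "chili" " dog")))   ; result is read by dereferencing
(def t (thread (str "chili" " dog")))   ; result is read by taking from a channel

(def from-future @f)
(def from-thread (<!! t))
(def after-close (<!! t))
;; from-future => "chili dog"
;; from-thread => "chili dog"
;; after-close => nil, since the channel closes once the result is delivered
```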

The reason you should use thread instead of a go block when you’re performing a long-running task is so you don’t clog your thread pool. Imagine you’re running four processes that download humongous files, save them, and then put the file paths on a channel. While the processes are downloading files and saving these files, Clojure can’t park their threads. It can park the thread only at the last step, when the process puts the files’ paths on a channel. Therefore, if your thread pool has only four threads, all four threads will be used for downloading, and no other process will be allowed to run until one of the downloads finishes.
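
The download scenario might be sketched as follows. fake-download is a hypothetical stand-in that sleeps instead of touching the network, and the file names and /tmp paths are invented; the point is that each slow task gets its own thread and only the finished path travels over the channel.

```clojure
(require '[clojure.core.async :refer [chan thread >!! <!!]])

;; Hypothetical stand-in for a slow download: sleeps, then returns a fake path.
(defn fake-download [file-name]
  (Thread/sleep 50)
  (str "/tmp/" file-name))

(def path-chan (chan))

;; Each download runs on its own thread, leaving the go thread pool free.
(doseq [file-name ["a.iso" "b.iso" "c.iso" "d.iso"]]
  (thread (>!! path-chan (fake-download file-name))))

;; Collect the four paths as they arrive; completion order may vary.
(def saved-paths (set (repeatedly 4 #(<!! path-chan))))
;; saved-paths => #{"/tmp/a.iso" "/tmp/b.iso" "/tmp/c.iso" "/tmp/d.iso"}
```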

go, thread, chan, <!, <!!, >!, and >!! are the core tools you’ll use for creating and communicating with processes. Both put and take will cause a process to wait until its complement is performed on the given channel. go allows you to use the parking variants of put and take, which could improve performance. You should use the blocking variants, along with thread, if you’re performing long-running tasks before the put or take.

And that should give you everything you need to fulfill your heart’s desire and create a machine that turns money into hot dogs.

The Hot Dog Machine Process You’ve Been Longing For

Behold, your dreams made real!

(defn hot-dog-machine
  []
  (let [in (chan)
        out (chan)]
    (go (<! in)
        (>! out "hot dog"))
    [in out]))

This function creates an in channel for receiving money and an out channel for dispensing a hot dog. It then creates an asynchronous process with go, which waits for money and then dispenses a hot dog. Finally, it returns the in and out channels as a vector.

Time for a hot dog!

(let [[in out] (hot-dog-machine)]
  (>!! in "pocket lint")
  (<!! out))
; => "hot dog"

In this snippet, you use destructuring (covered in Chapter 3) with let to bind the in and out channels to the in and out symbols. You then put "pocket lint" on the in channel. The hot dog machine process waits for something, anything, to arrive on the in channel; once "pocket lint" arrives, the hot dog machine process resumes execution, putting "hot dog" on the out channel.

Wait a minute . . . that’s not right. I mean, yay, free hot dogs, but someone’s bound to get upset that the machine’s accepting pocket lint as payment. Not only that, but this machine will only dispense one hot dog before shutting down. Let’s alter the hot dog machine function so that you can specify how many hot dogs it has and so it only dispenses a hot dog when you give it the number 3:

(defn hot-dog-machine-v2
  [hot-dog-count]
  (let [in (chan)
        out (chan)]
    (go (loop [hc hot-dog-count]
          (if (> hc 0)
            (let [input (<! in)]
              (if (= 3 input)                  ; ➊
                (do (>! out "hot dog")
                    (recur (dec hc)))
                (do (>! out "wilted lettuce")
                    (recur hc))))
            (do (close! in)                    ; ➋
                (close! out)))))
    [in out]))

There’s a lot more code here, but the strategy is straightforward. The new function hot-dog-machine-v2 allows you to specify the hot-dog-count. Within the go block at ➊, it dispenses a hot dog only if the number 3 (meaning three dollars) is placed on the in channel; otherwise, it dispenses wilted lettuce, which is definitely not a hot dog. Once a process has taken the output, the hot dog machine process loops back with an updated hot dog count and is ready to receive money again.

When the machine process runs out of hot dogs, the process closes the channels at ➋. When you close a channel, you can no longer perform puts on it, and once you’ve taken all values off a closed channel, any subsequent takes will return nil.
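
These closing rules can be checked on a small buffered channel. This is a sketch with invented names; the buffer of size 1 is there only so the first put does not have to wait for a taker.

```clojure
(require '[clojure.core.async :refer [chan close! >!! <!!]])

(def hot-dog-chan (chan 1))

(def accepted-before-close (>!! hot-dog-chan "last hot dog"))
(close! hot-dog-chan)
(def accepted-after-close (>!! hot-dog-chan "too late"))

(def first-take (<!! hot-dog-chan))    ; the buffered value is still available
(def second-take (<!! hot-dog-chan))   ; nothing is left on the closed channel
;; accepted-before-close => true
;; accepted-after-close  => false
;; first-take            => "last hot dog"
;; second-take           => nil
```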

Let’s give the upgraded hot dog machine a go in Listing 11-1 by putting in money and pocket lint:

Listing 11-1:

(let [[in out] (hot-dog-machine-v2 2)]
  (>!! in "pocket lint")
  (println (<!! out))

  (>!! in 3)
  (println (<!! out))

  (>!! in 3)
  (println (<!! out))

  (>!! in 3)
  (<!! out))
; => wilted lettuce
; => hot dog
; => hot dog
; => nil

First, we try the ol’ pocket lint trick and get wilted lettuce. Next, we put in 3 dollars twice and get a hot dog both times. Then, we try to put in another 3 dollars, but that’s ignored because the channel is closed; the number 3 is not put on the channel. When we try to take from the out channel, we get nil, again because the channel is closed.

You might notice a couple of interesting details about hot-dog-machine-v2. First, it does a put and a take within the same go block. This isn’t that unusual, and it’s one way you can create a pipeline of processes: just make the in channel of one process the out channel of another. The following example does just that, passing a string through a series of processes that perform transformations until the string finally gets printed by the last process:

(let [c1 (chan)
      c2 (chan)
      c3 (chan)]
  (go (>! c2 (clojure.string/upper-case (<! c1))))
  (go (>! c3 (clojure.string/reverse (<! c2))))
  (go (println (<! c3)))
  (>!! c1 "redrum"))
; => MURDER

I’ll have more to say about process pipelines and how you can use them instead of callbacks toward the end of the chapter.

Back to Listing 11-1! Another thing to note is that the hot dog machine doesn’t accept more money until you’ve dealt with whatever it’s dispensed. This allows you to model state-machine-like behavior, where the completion of channel operations triggers state transitions. For example, you can think of the vending machine as having two states: ready to receive money and dispensed item. Inserting money and taking the item trigger transitions between the two.

alts!!

The core.async function alts!! lets you use the result of the first successful channel operation among a collection of operations. We did something similar to this with delays and futures in “Delays” on page 198. In that example, we uploaded a set of headshots to a headshot-sharing site and notified the headshot owner when the first photo was uploaded. Here’s how you’d do the same with alts!!:

(defn upload
  [headshot c]
  (go (Thread/sleep (rand-int 100)) ; rand-int gives whole milliseconds, which Thread/sleep expects
      (>! c headshot)))

(let [c1 (chan)                                           ; ➊
      c2 (chan)
      c3 (chan)]
  (upload "serious.jpg" c1)
  (upload "fun.jpg" c2)
  (upload "sassy.jpg" c3)
  (let [[headshot channel] (alts!! [c1 c2 c3])]           ; ➋
    (println "Sending headshot notification for" headshot)))
; => Sending headshot notification for sassy.jpg

Here, the upload function takes a headshot and a channel, and creates a new process that sleeps for a random amount of time (to simulate the upload) and then puts the headshot on the channel. The let bindings and upload function calls beginning at ➊ should make sense: we create three channels and then use them to perform the uploads.

Things get interesting at ➋. The alts!! function takes a vector of channels as its argument. This is like saying, “Try to do a blocking take on each of these channels simultaneously. As soon as a take succeeds, return a vector whose first element is the value taken and whose second element is the winning channel.” In this case, the channel associated with sassy.jpg received a value first. The other channels are still available if you want to take their values and do something with them. All alts!! does is take a value from the first channel to have a value; it doesn’t touch the other channels.
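
Because the upload example is random, the winner changes from run to run. The sketch below makes the race lopsided so the outcome is predictable on any reasonably idle machine, and it also shows that the losing channel still delivers its value afterward. All names and file names are invented; the slow process parks on a timeout channel rather than sleeping.

```clojure
(require '[clojure.core.async :refer [chan go >! <! <!! alts!! timeout]])

(def slow-chan (chan))
(def fast-chan (chan))

(go (<! (timeout 200))            ; park for about 200 ms before putting
    (>! slow-chan "slow.jpg"))
(go (>! fast-chan "fast.jpg"))    ; put right away

(def winner (alts!! [slow-chan fast-chan]))
(def winning-value (first winner))
(def won-by-fast? (= fast-chan (second winner)))

;; alts!! did not touch slow-chan, so its value can still be taken.
(def leftover (<!! slow-chan))
;; winning-value => "fast.jpg"
;; won-by-fast?  => true
;; leftover      => "slow.jpg"
```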

One cool aspect of alts!! is that you can give it a timeout channel, which waits the specified number of milliseconds and then closes. It’s an elegant mechanism for putting a time limit on concurrent operations. Here’s how you could use it with the upload service:

(let [c1 (chan)]
  (upload "serious.jpg" c1)
  (let [[headshot channel] (alts!! [c1 (timeout 20)])]
    (if headshot
      (println "Sending headshot notification for" headshot)
      (println "Timed out!"))))
; => Timed out!

In this case, we set the timeout to 20 milliseconds. Because the upload didn’t finish in that time frame, we got a timeout message.
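
The timeout check can be wrapped in a small helper. take-with-deadline is a hypothetical function, not part of core.async; as a design choice it compares the returned channel with the timeout channel instead of testing the value for nil, which keeps a timeout distinct from a take on a closed channel.

```clojure
(require '[clojure.core.async :refer [chan >!! alts!! timeout]])

(defn take-with-deadline
  "Hypothetical helper: take from c, or return :timed-out after ms milliseconds."
  [c ms]
  (let [deadline (timeout ms)
        [value port] (alts!! [c deadline])]
    (if (= port deadline)
      :timed-out
      value)))

(def empty-chan (chan))        ; nothing is ever put here
(def stocked-chan (chan 1))    ; buffered so the put below does not block
(>!! stocked-chan "serious.jpg")

(def timed-out-result (take-with-deadline empty-chan 20))
(def stocked-result (take-with-deadline stocked-chan 20))
;; timed-out-result => :timed-out
;; stocked-result   => "serious.jpg"
```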

You can also use alts!! to specify put operations. To do that, place a vector inside the vector you pass to alts!!, like at ➊ in this example:

(let [c1 (chan)
      c2 (chan)]
  (go (<! c2))
  (let [[value channel] (alts!! [c1 [c2 "put!"]])]        ; ➊
    (println value)
    (= channel c2)))
; => true
; => true

Here you’re creating two channels and then creating a process that’s waiting to perform a take on c2. The vector that you supply to alts!! tells it, “Try to do a take on c1 and try to put “put!” on c2. If the take on c1 finishes first, return its value and channel. If the put on c2 finishes first, return true if the put was successful and false otherwise.” Finally, the result of value (which is true, because the c2 channel was open) prints and shows that the channel returned was indeed c2.

Like <!! and >!!, alts!! has a parking alternative, alts!, which you can use inside go blocks. alts! is a nice way to exercise some choice over which of a group of channels you put or take from. It still performs puts and takes, so the same reasons to use the parking or blocking variation apply.
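
As a rough illustration of the parking variant, the go block below waits on two channels with alts! and reports which one delivered. The channel names and keywords are invented for this sketch.

```clojure
(require '[clojure.core.async :refer [chan go >! >!! <!! alts!]])

(def ketchup-chan (chan))
(def relish-chan (chan))
(def report-chan (chan))

;; Inside a go block, alts! parks instead of blocking a thread.
(go (let [[value port] (alts! [ketchup-chan relish-chan])]
      (>! report-chan [value (if (= port relish-chan) :relish :ketchup)])))

(>!! relish-chan "relish")
(def report (<!! report-chan))
;; report => ["relish" :relish]
```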

And that covers the core.async basics! The rest of the chapter explains two common patterns for coordinating processes.

Queues

In “Rolling Your Own Queue” on page 202, you wrote a macro that let you queue futures. Processes let you use a similar technique in a more straightforward manner. Let’s say you want to get a bunch of random quotes from a website and write them to a single file. You want to make sure that only one quote is written to a file at a time so the text doesn’t get interleaved, so you put your quotes on a queue. Here’s the full code:

(defn append-to-file
  "Write a string to the end of a file"
  [filename s]
  (spit filename s :append true))

(defn format-quote
  "Delineate the beginning and end of a quote because it's convenient"
  [quote]
  (str "=== BEGIN QUOTE ===\n" quote "=== END QUOTE ===\n\n"))

(defn random-quote
  "Retrieve a random quote and format it"
  []
  (format-quote (slurp "http://www.braveclojure.com/random-quote")))

(defn snag-quotes
  [filename num-quotes]
  (let [c (chan)]
    (go (while true (append-to-file filename (<! c))))
    (dotimes [n num-quotes] (go (>! c (random-quote))))))

The functions append-to-file, format-quote, and random-quote have docstrings that explain what they do. snag-quotes is where the interesting work happens. First, it creates a channel that’s shared between the quote-producing processes and the quote-consuming process. Then it creates a process that uses while true to create an infinite loop. On every iteration of the loop, it waits for a quote to arrive on c and then appends it to a file. Finally, snag-quotes creates a num-quotes number of processes that fetch a quote and then put it on c. If you evaluate (snag-quotes "quotes" 2) and check the quotes file in the directory where you started your REPL, it should have two quotes:

=== BEGIN QUOTE ===
Nobody's gonna believe that computers are intelligent until they start
coming in late and lying about it.
=== END QUOTE ===

=== BEGIN QUOTE ===
Give your child mental blocks for Christmas.
=== END QUOTE ===

This kind of queuing differs from the example in Chapter 9. In that example, each task was handled in the order it was created. Here, each quote-retrieving task is handled in the order that it finishes. In both cases, you ensure that only one quote at a time is written to a file.
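
The completion-order behavior can be seen without a network connection. In this sketch an atom stands in for the quotes file, and three producers park on timeouts of different lengths to simulate fetches of different durations. Every name here is made up for the example.

```clojure
(require '[clojure.core.async :refer [chan go >! <! <!! close! timeout]])

(def queue-chan (chan))
(def written (atom []))     ; stands in for the quotes file
(def done-chan (chan))

;; A single consumer handles exactly one item at a time.
(go (dotimes [_ 3]
      (swap! written conj (<! queue-chan)))
    (close! done-chan))

;; Three producers finish after different simulated delays.
(doseq [[label ms] [["slow" 150] ["fast" 10] ["medium" 70]]]
  (go (<! (timeout ms))
      (>! queue-chan label)))

(<!! done-chan)             ; returns once the consumer has handled all three
;; @written => ["fast" "medium" "slow"]
```

The items are recorded in the order the producers finished, not the order in which they were started.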

Escape Callback Hell with Process Pipelines

In languages without channels, you need to express the idea “when x happens, do y” with callbacks. In a language like JavaScript, callbacks are a way to define code that executes asynchronously once other code finishes. If you’ve worked with JavaScript, you’ve probably spent some time wallowing in callback hell.

The reason it’s called callback hell is that it’s very easy to create dependencies among layers of callbacks that aren’t immediately obvious. They end up sharing state, making it difficult to reason about the state of the overall system as the callbacks get triggered. You can avoid this depressing outcome by creating a process pipeline. That way, each unit of logic lives in its own isolated process, and all communication between units of logic occurs through explicitly defined input and output channels.

In the following example, we create three infinitely looping processes connected through channels, passing the out channel of one process as the in channel of the next process in the pipeline:

(defn upper-caser
  [in]
  (let [out (chan)]
    (go (while true (>! out (clojure.string/upper-case (<! in)))))
    out))

(defn reverser
  [in]
  (let [out (chan)]
    (go (while true (>! out (clojure.string/reverse (<! in)))))
    out))

(defn printer
  [in]
  (go (while true (println (<! in)))))

(def in-chan (chan))
(def upper-caser-out (upper-caser in-chan))
(def reverser-out (reverser upper-caser-out))
(printer reverser-out)

(>!! in-chan "redrum")
; => MURDER

(>!! in-chan "repaid")
; => DIAPER

By handling events using processes like this, it’s easier to reason about the individual steps of the overall data transformation system. You can look at each step and understand what it does without having to refer to what might have happened before it or what might happen after it; each process is as easy to reason about as a pure function.
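
One possible generalization, offered as a sketch rather than the chapter's own code: a single stage function that wraps any one-argument function as a pipeline step. Unlike the while true loops, this version stops when its input channel closes and then closes its output, so shutdown propagates down the pipeline. stage, pipeline-in, and pipeline-out are hypothetical names.

```clojure
(require '[clojure.core.async :refer [chan go >! <! >!! <!! close!]]
         '[clojure.string :as string])

(defn stage
  "Hypothetical helper: applies f to every value taken from in and puts the
  result on a new channel, which is closed once in is closed."
  [f in]
  (let [out (chan)]
    (go (loop []
          (when-some [v (<! in)]
            (>! out (f v))
            (recur)))
        (close! out))
    out))

(def pipeline-in (chan))
(def pipeline-out (->> pipeline-in
                       (stage string/upper-case)
                       (stage string/reverse)))

(>!! pipeline-in "redrum")
(def transformed (<!! pipeline-out))
(close! pipeline-in)
(def after-close (<!! pipeline-out))
;; transformed => "MURDER"
;; after-close => nil, because closing the input closes each stage in turn
```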

Additional Resources

Clojure’s core.async library was largely inspired by Go’s concurrency model, which is based on the work by Tony Hoare in Communicating Sequential Processes and is available at http://www.usingcsp.com/.

Rob Pike, co-creator of Go, has a good talk on concurrency, which is available at https://www.youtube.com/watch?v=f6kdp27TYZs.

ClojureScript, also known as the best thing to happen to the browser, uses core.async. No more callback hell! You can learn about ClojureScript at https://github.com/clojure/clojurescript.

Finally, check out the API docs at http://clojure.github.io/core.async/.

Summary

In this chapter, you learned about how core.async allows you to create concurrent processes that respond to the put and take communication events on channels. You learned about how to use go and thread to create concurrent processes that wait for communication events by parking and blocking. You also learned how to create process pipelines by making the out channel of one process the in channel of another, and how this allows you to write code that’s way more intelligible than nested callbacks. Finally, you meditated on whether or not you’re just a fancy hot dog vending machine.


End of the chapter.