szzxsh 2019-06-20
源文档是 core.async
仓库的一个代码文件, 包含大量的教程性质的注释
https://github.com/clojure/core.async/blob/master/examples/walkthrough.clj
中间不确定的两句留了原文, 有读懂的同学请回复帮我纠正
这份攻略介绍 core.async
核心的一些概念
clojure.core.async
namespace 包含了公开的 API.
(require '[clojure.core.async :as async :refer :all])
数据通过类似队列的 Channel 来传输, Channel 默认不进行 buffer(长度为 0)
需要生产者和消费者进行约定从而在 Channel 当中传送数据
用 chan
可以创建一个不进行 buffer 的 Channel:
(chan)
传一个数字以创建限定了 buffer 大小的 Channel:
(chan 10)
close!
用来关闭 Channel 终结接受消息传入, 已存在的数据依然可以取出
取尽的 Channel 在取值时返回 nil
, nil
是不能直接通过 Channel 发送的!
(let [c (chan)] (close! c))
对在一般的 Thread 中, 使用 >!!
(阻塞的 put) 和 <!!
(阻塞的 take)
与 Channel 进行通信
(let [c (chan 10)] (>!! c "hello") (assert (= "hello" (<!! c))) (close! c))
由于是这些调用是阻塞的, 如果尝试把数据放进没有 buffer 的 Channel, 那么整个 Thread 都会被卡住.
所以需要 thread
(好比 future
) 在线程池当中执行代码主体, 并且通过 Channel 传回数据
例子中启动了一个后台任务把 "hello"
放进 Channel, 然后在主线程读取数据
(let [c (chan)] (thread (>!! c "hello")) (assert (= "hello" (<!! c))) (close! c))
go
代码块和反转控制(IoC) threadgo
是一个宏, 能把它的 body 在特殊的线程池里异步执行
不同的是本来会阻塞的 Channel 操作会暂停, 不会有线程被阻塞
这套机制封装了事件/回调系统当中需要外部代码的反转控制
在 go
block 内部, 我们使用 >!
(put
) 和 <!
(take
)
这里把前面 Channel 的例子转化成 go
block:
(let [c (chan)] (go (>! c "hello")) (assert (= "hello" (<!! (go (<! c))))) (close! c))
这里使用了 go
block 来模拟生产者, 而不是直接用 Thread 和阻塞调用
消费者用 go
block 进行获取, 返回 Channel 作为结果, 对这个 Channel 做阻塞的读取
(原文: The consumer uses a go block to take, then returns a result channel, from which we do a blocking take.)
alts
)Channel 对比队列一个啥手机应用是能够同时等待多个 Channel(像是 socket select)
通过 alts!!
(一般 thread)或者 alts!
(用于 go
block)
可以通过 alts
创建后台线程讲两个任意的 Channel 结合到一起alts!!
获取集合中某个操作的来执行
或者是可以 take
的 Channel, 或者是可以 put
[channel value]
的 Channel
并返回包含具体的值(对于 put
返回 nil
)以及获取成功的 Channel:
(原文: alts!!
takes either a set of operations to perform either a channel to take from a [channel value] to put and returns the value (nil for put) and channel that succeeded:)
(let [c1 (chan) c2 (chan)] (thread (while true (let [[v ch] (alts!! [c1 c2])] (println "Read" v "from" ch)))) (>!! c1 "hi") (>!! c2 "there"))
打印内容(在 stdout, 可能你的 REPL 当中看不到):
从 #<ManyToManyChannel ...>
读取 hi
从 #<ManyToManyChannel ...>
读取 there
使用 alts!
来做和 go
block 一样的事情:
(let [c1 (chan) c2 (chan)] (go (while true (let [[v ch] (alts! [c1 c2])] (println "Read" v "from" ch)))) (go (>! c1 "hi")) (go (>! c2 "there")))
因为 go
block 是轻量级的进程而而不是限于 thread, 可以同时有大量的实例
这里创建 1000 个 go
block 在 1000 个 Channel 里同时发送 hi
它们妥当时用 alts!!
来读取
(let [n 1000 cs (repeatedly n chan) begin (System/currentTimeMillis)] (doseq [c cs] (go (>! c "hi"))) (dotimes [i n] (let [[v c] (alts!! cs)] (assert (= "hi" v)))) (println "Read" n "msgs in" (- (System/currentTimeMillis) begin) "ms"))
timeout
创建 Channel 并等待设定的毫秒时间, 然后关闭:
(let [t (timeout 100) begin (System/currentTimeMillis)] (<!! t) (println "Waited" (- (System/currentTimeMillis) begin)))
可以结合 timeout
和 alts
来做有时限的 Channel 等待
这里是花 100ms 等待数据到达 Channel, 没有的话放弃:
(let [c (chan) begin (System/currentTimeMillis)] (alts!! [c (timeout 100)]) (println "Gave up after" (- (System/currentTimeMillis) begin)))
Channel 可以定制不同的策略来处理 Buffer 填满的情况
这套 API 中提供了两个实用的例子.
使用 dropping-buffer
控制当 buffer 填满时丢弃最新鲜的值:
(chan (dropping-buffer 10))
使用 sliding-buffer
控制当 buffer 填满时丢弃最久远的值:
(chan (sliding-buffer 10))