szzxsh 2019-06-20
源文档是 core.async 仓库的一个代码文件, 包含大量的教程性质的注释
https://github.com/clojure/core.async/blob/master/examples/walkthrough.clj
中间不确定的两句留了原文, 有读懂的同学请回复帮我纠正
这份攻略介绍 core.async 核心的一些概念
clojure.core.async namespace 包含了公开的 API.
(require '[clojure.core.async :as async :refer :all])
数据通过类似队列的 Channel 来传输, Channel 默认不进行 buffer(长度为 0)
需要生产者和消费者进行约定从而在 Channel 当中传送数据
用 chan 可以创建一个不进行 buffer 的 Channel:
(chan)
传一个数字以创建限定了 buffer 大小的 Channel:
(chan 10)
close! 用来关闭 Channel 终结接受消息传入, 已存在的数据依然可以取出
取尽的 Channel 在取值时返回 nil, nil 是不能直接通过 Channel 发送的!
(let [c (chan)] (close! c))
对在一般的 Thread 中, 使用 >!!(阻塞的 put) 和 <!!(阻塞的 take)
与 Channel 进行通信
(let [c (chan 10)] (>!! c "hello") (assert (= "hello" (<!! c))) (close! c))
由于是这些调用是阻塞的, 如果尝试把数据放进没有 buffer 的 Channel, 那么整个 Thread 都会被卡住.
所以需要 thread(好比 future) 在线程池当中执行代码主体, 并且通过 Channel 传回数据
例子中启动了一个后台任务把 "hello" 放进 Channel, 然后在主线程读取数据
(let [c (chan)] (thread (>!! c "hello")) (assert (= "hello" (<!! c))) (close! c))
go 代码块和反转控制(IoC) threadgo 是一个宏, 能把它的 body 在特殊的线程池里异步执行
不同的是本来会阻塞的 Channel 操作会暂停, 不会有线程被阻塞
这套机制封装了事件/回调系统当中需要外部代码的反转控制
在 go block 内部, 我们使用 >!(put) 和 <!(take)
这里把前面 Channel 的例子转化成 go block:
(let [c (chan)] (go (>! c "hello")) (assert (= "hello" (<!! (go (<! c))))) (close! c))
这里使用了 go block 来模拟生产者, 而不是直接用 Thread 和阻塞调用
消费者用 go block 进行获取, 返回 Channel 作为结果, 对这个 Channel 做阻塞的读取
(原文: The consumer uses a go block to take, then returns a result channel, from which we do a blocking take.)
alts)Channel 对比队列一个啥手机应用是能够同时等待多个 Channel(像是 socket select)
通过 alts!!(一般 thread)或者 alts!(用于 go block)
可以通过 alts 创建后台线程讲两个任意的 Channel 结合到一起alts!! 获取集合中某个操作的来执行
或者是可以 take 的 Channel, 或者是可以 put [channel value] 的 Channel
并返回包含具体的值(对于 put 返回 nil)以及获取成功的 Channel:
(原文: alts!! takes either a set of operations to perform either a channel to take from a [channel value] to put and returns the value (nil for put) and channel that succeeded:)
(let [c1 (chan)
c2 (chan)]
(thread (while true
(let [[v ch] (alts!! [c1 c2])]
(println "Read" v "from" ch))))
(>!! c1 "hi")
(>!! c2 "there"))打印内容(在 stdout, 可能你的 REPL 当中看不到):
从 #<ManyToManyChannel ...> 读取 hi
从 #<ManyToManyChannel ...> 读取 there
使用 alts! 来做和 go block 一样的事情:
(let [c1 (chan)
c2 (chan)]
(go (while true
(let [[v ch] (alts! [c1 c2])]
(println "Read" v "from" ch))))
(go (>! c1 "hi"))
(go (>! c2 "there")))因为 go block 是轻量级的进程而而不是限于 thread, 可以同时有大量的实例
这里创建 1000 个 go block 在 1000 个 Channel 里同时发送 hi
它们妥当时用 alts!! 来读取
(let [n 1000
cs (repeatedly n chan)
begin (System/currentTimeMillis)]
(doseq [c cs] (go (>! c "hi")))
(dotimes [i n]
(let [[v c] (alts!! cs)]
(assert (= "hi" v))))
(println "Read" n "msgs in" (- (System/currentTimeMillis) begin) "ms"))timeout 创建 Channel 并等待设定的毫秒时间, 然后关闭:
(let [t (timeout 100)
begin (System/currentTimeMillis)]
(<!! t)
(println "Waited" (- (System/currentTimeMillis) begin)))可以结合 timeout 和 alts 来做有时限的 Channel 等待
这里是花 100ms 等待数据到达 Channel, 没有的话放弃:
(let [c (chan)
begin (System/currentTimeMillis)]
(alts!! [c (timeout 100)])
(println "Gave up after" (- (System/currentTimeMillis) begin)))Channel 可以定制不同的策略来处理 Buffer 填满的情况
这套 API 中提供了两个实用的例子.
使用 dropping-buffer 控制当 buffer 填满时丢弃最新鲜的值:
(chan (dropping-buffer 10))
使用 sliding-buffer 控制当 buffer 填满时丢弃最久远的值:
(chan (sliding-buffer 10))