clojure 中使用 actor

Mybaby0 2019-06-20

背景

Actor 模型 是让 Earlang 声名卓著的关键特性。它是 Erlang 平台实现分布式编程的关键内容,在 Clojure 语言设计时, Rich Hickey 考虑过在 Clojure 语言中是否实现 Actor,他最终认为:这仅仅是适合于分布式编程的一种特征,如果成为语言的本质,将限制 Clojure 成为一种服务器领域语言,因此他决定使用其他更简单直接的异步通信模型。但他也没有排除未来在 Clojure 中引入 Actor 的可能。

简单实现

随着 core.async 库的推出和成熟,我们实际上已经可以用 core.async 来实现一个最简单的 Actor 模型:

(require '[clojure.core.async :as a])

(defn actor [f]
  (let [mail-box (a/chan (a/dropping-buffer 32))]
    (a/go-loop [f f]
      (when-let [v (a/<! mail-box)]
        (recur (f v))))
    mail-box))

(def ! a/put!) ;erlang 的操作符

可以看到,上面的 actor 函数可以将一个 core.async 通道封装成一个主动单元,其中拥有自己的事件循环,不断地在 mail-box 上等待新的消息。

下面我们定义一个简单的调试 actor:

(def debug-actor (actor (fn debug[x] (prn x) debug)))

(! debug-actor "Hello, world!")
;;输出 Hello, world

这最简单的 actor 当然还没有支持分布式编程,但使用这个模型,我们将程序的组件变成了主动的单元,从而在通道的基础上提供了另一种抽象。

与 Transducers 的关系

Clojure 1.7 开始引入的 transducer 被广泛使用,它可以使用在任何连续的数据结构上,例如序列,以及 core.async 的通道,如果我们已经定义好了一个 transducer 或者用它定义的操作 (xf),是否可以用于 Actor 呢?

(defn actor-xf [xf out-actor]
  (let [f-out (fn
                ([b] b)
                ([b itm] (a/put! b itm) b))]
    (fn f-actor
      ([v] (f-actor out-actor v))
      ([acc v]
       (let [acc ((xf f-out) acc v)]
         (partial f-actor acc))))))

这个函数可以用定义好的 xf 生成一个 actor,它将对消息处理后将转化后的消息送往 out-actor。例如:

(def inc-actor (actor (actor-xf (map inc) debug-actor)))
(! inc-actor 6)
;;输出7
(def complex-actor (actor (actor-xf (mapcat #(repeatedly % vector)) debug-actor)))
(! complex-actor 2)
;;[]
;;[]

其实 core.async 库中已经实现了类似的功能,就是 chan 函数本身!我们不过需要用 pipe 函数将内外两个通道连起来就可以了:

(defn actor-xf
  [xf out-ch]
  (let [ch (a/chan (a/dropping-buffer 32) xf)]
    (a/pipe ch out-ch)
    ch))

相关推荐