Clojure:ZeroMQ的入门DEMO

iceyung 2015-01-24

假设你已经知道什么是ZeroMQ(不知道的话可以看这个:http://zh.wikipedia.org/wiki/%C3%98MQ),以下就给出在Clojure中如何使用ZeroMQ(感谢此文作者:http://patternhatch.com/2013/06/12/messaging-using-clojure-and-zeromq/)。
1.    创建一个Clojure项目,这里我们用leiin。lein new app zmq-test
2.    在project.clj文件中添加
[com.rmoquin.bundle/jeromq "0.2.0"]
cheshire "5.3.1"]]
其中jeromq就是我们需要使用的ZeroMQ类库(纯Java实现),cheshire用于双向处理json。
3.    打开core.clj文件,输入如下代码:

(ns zmq-test.core
  (:import [org.jeromq ZMQ])
  (:require (cheshire [core :as c])))

(def ctx (ZMQ/context 1))

;; REQ/REP [Request-Reply] Pattern
;; In REPL, input
;;  (future-call echo-server)
;;  (echo "hi")
;; to run the demo function
(defn echo-server
  []
  (let [s (.socket ctx ZMQ/REP)]
    (.bind s "tcp:// 127.0.0.1:5555")
    (loop [msg (.recv s)]
      (.send s msg)
      (recur (.recv s)))))

(defn echo
  [msg]
  (let [s (.socket ctx ZMQ/REQ)]
    (.connect s "tcp:// 127.0.0.1:5555")
    (.send s msg)
    (println "Server replied:" (String. (.recv s)))
    (.close s)))

;; PUB/SUB [Publish-Subscribe] Pattern
;; In REPL, input
;;  (future-call market-data-publisher)
;;  (get-market-data 100)
;; to run the demo function
(defn market-data-publisher
  []
  (let [s (.socket ctx ZMQ/PUB)
        market-data-event (fn []
                            {:symbol (rand-nth ["CAT" "UTX"])
                            :size (rand-int 1000)
                            :price (format "%.2f" (rand 50.0))})]
    (.bind s "tcp:// 127.0.0.1:6666")
    (while :true
      (.send s (c/generate-string (market-data-event))))))

(defn get-market-data
  [num-events]
  (let [s (.socket ctx ZMQ/SUB)]
    (.subscribe s "")
    (.connect s "tcp://127.0.0.1:6666")
    (dotimes [_ num-events]
      (println (c/parse-string (String. (.recv s)))))
    (.close s)))

;; PUSH/PULL [Pipeline] Pattern
;; In REPL, input
;;  (future-call collector)
;;  (future-call worker)
;;  (future-call worker)
;;  (future-call worker)
;;  (dispatcher 100)
;; to run the demo function
(defn dispatcher
  [jobs]
  (let [s (.socket ctx ZMQ/PUSH)]
    (.bind s "tcp://127.0.0.1:7777")
    (Thread/sleep 1000)
    (dotimes [n jobs]
      (.send s (str n)))
    (.close s)))

(defn worker
  []
  (let [rcv (.socket ctx ZMQ/PULL)
        snd (.socket ctx ZMQ/PUSH)
        id (str (gensym "w"))]
    (.connect rcv "tcp://127.0.0.1:7777")
    (.connect snd "tcp://127.0.0.1:8888")
    (while :true
      (let [job-id (String. (.recv rcv))
            proc-time (rand-int 100)]
        (Thread/sleep proc-time)
        (.send snd (c/generate-string {:worker-id id
                                      :job-id job-id
                                      :processing-time proc-time}))))))

(defn collector
  []
  (let [s (.socket ctx ZMQ/PULL)]
    (.bind s "tcp://127.0.0.1:8888")
    (while :true
      (->> (.recv s)
          (String.)
          (c/parse-string)
          (println "Job completed:")))))

代码中包括了ZeroMQ的三种模式,可以直接在REPL中进行测试。但是这只是很简单的Hello World程序,如果要将ZeroMQ用于实际生产环境中的话,还有很多环节需要考虑和完善。

相关阅读

ZeroMQ 的详细介绍:请点这里
ZeroMQ 的下载地址:请点这里

相关推荐

pfjia / 0评论 2013-08-02