zeroMQ初体验-10.优雅的使用多线程

石头君 2011-03-25

"或许,ZeroMQ是最好的多线程运行环境!"官网如是说。

其实它想要支持的是那种类似erlang信号模式。传统多线程总会伴随各种"锁"出现各种稀奇古怪的问题。而zeroMQ的多线程致力于"去锁化",简单来说,一条数据在同一时刻只允许被一个线程持有(而传统的是:只允许被一个线程操作)。而锁,是因为可能会出现的多线程同时操作一条数据才出现的副产品。从这里就可以很清晰的看出zeromq的切入点了,通过线程间的数据流动来保证同一时刻任何数据都只会被一个线程持有。

这里给出传统的应答模式的例子:

import time
import threading
import zmq

def worker_routine(worker_url, context):
    
    socket = context.socket(zmq.REP)
    
    socket.connect(worker_url)
    
    while True:
        
        string  = socket.recv()
        print("Received request: [%s]\n" % (string))
        time.sleep(1)
        
        socket.send("World")

def main():    
    url_worker = "inproc://workers"
    url_client = "tcp://*:5555"
    
    context = zmq.Context(1)
    
    clients = context.socket(zmq.XREP)
    clients.bind(url_client)
    
    workers = context.socket(zmq.XREQ)
    workers.bind(url_worker)
    
    for i in range(5):
        thread = threading.Thread(target=worker_routine, args=(url_worker, context, ))
        thread.start()
    
    zmq.device(zmq.QUEUE, clients, workers)
    
    clients.close()
    workers.close()
    context.term()
    
if __name__ == "__main__":
    main()

这样的切分还有一个隐性的好处,万一要从多线程转为多进程,可以非常容易的把代码切割过来再利用。

这里还给了一个用多线程不用多进程的理由:

进程开销太大了(话说,python是鼓励多进程替代线程的)。

上面代码给出的例子似乎没有子线程间的通信啊?既然支持用多线程,自然不会忘了这个:

import threading
import zmq

def step1(context):
    sender = context.socket(zmq.PAIR)
    sender.connect("inproc://step2")   
    sender.send("")  

def step2(context):
    receiver = context.socket(zmq.PAIR)
    receiver.bind("inproc://step2")
    
    thread = threading.Thread(target=step1, args=(context, ))
    thread.start()
    
    string = receiver.recv()

    sender = context.socket(zmq.PAIR)
    sender.connect("inproc://step3")
    sender.send("")
    
    return

def main():
    context = zmq.Context(1)
    
    receiver = context.socket(zmq.PAIR)
    receiver.bind("inproc://step3")
    
    thread = threading.Thread(target=step2, args=(context, ))
    thread.start()
    
    string = receiver.recv()
    
    print("Test successful!\n")
    
    receiver.close()
    context.term()
    
    return

if __name__ == "__main__":
    main()

注意:

这里用到了一个新的端口类型:PAIR。专门为进程间通信准备的(文中还列了下为神马么用之前已经出现过的类型比如应答之类的)。这种类型及时,可靠,安全(进程间其实也是可以用的,与应答相似)。

(未完待续)

相关推荐