laijunfeng 2011-03-25
关掉一个进程有很多种方式,而在ZeroMQ中则推崇通过使用信号通知,可控的卸载、关闭进程。在这里,要援引之前的"分而治之"例子(具体可以见这里)。
例图:
显然,信号发送是由能够掌握整个进度的"水槽"(下游)来控制,在原有基础上做少许变更即可。
Worker(数据处理):
import sys import time import zmq context = zmq.Context() receiver = context.socket(zmq.PULL) receiver.connect("tcp://localhost:5557") sender = context.socket(zmq.PUSH) sender.connect("tcp://localhost:5558") controller = context.socket(zmq.SUB) controller.connect("tcp://localhost:5559") controller.setsockopt(zmq.SUBSCRIBE, "") poller = zmq.Poller() poller.register(receiver, zmq.POLLIN) poller.register(controller, zmq.POLLIN) while True: socks = dict(poller.poll()) if socks.get(receiver) == zmq.POLLIN: message = receiver.recv() workload = int(message) # Workload in msecs time.sleep(workload / 1000.0) sender.send(message) sys.stdout.write(".") sys.stdout.flush() if socks.get(controller) == zmq.POLLIN: break
水槽(下游):
import sys import time import zmq context = zmq.Context() receiver = context.socket(zmq.PULL) receiver.bind("tcp://*:5558") controller = context.socket(zmq.PUB) controller.bind("tcp://*:5559") receiver.recv() tstart = time.time() for task_nbr in xrange(100): receiver.recv() if task_nbr % 10 == 0: sys.stdout.write(":") else: sys.stdout.write(".") sys.stdout.flush() tend = time.time() tdiff = tend - tstart total_msec = tdiff * 1000 print "Total elapsed time: %d msec" % total_msec controller.send("KILL") time.sleep(1)
注意:
正常情况下,即使进程被关闭,可能端口并没有被清除(那是有ZeroMQ维护的),原文中调用了这么两句
zmq_close(server)
zmq_term(context)
python中对应为zmq.close(),zmq.term(),不过python的垃圾回收会替俺们解决后顾之忧的~
(未完待续)