talkincode 2019-06-30
gevent 一种异步的方式,基于事件循环.. 跟 asyncio 里的东西运作的差不多
官方手册说的太不清楚 .
自己写了个入门教程.
一个最简单的例子:
spawn 将把你的函数封装成一个个协程对象
# 注意. gevent.sleep 不是 time.sleep . 下一个例子说明 def fuck1(arg): print('我在这: ',fuck1.__code__.co_firstlineno) gevent.sleep(1) return arg g1 = gevent.spawn(fuck1, 123) #产生一个GreenLet 协程 . print(g1 , type(g1)) #看看是杀 g1.join() #等待咯 # 与上面一种完全一样的方式 . gevent.spawn 相当于创建一个GreenLet ,然后start() g2 = gevent.Greenlet(fuck1,456) g2.start() #启动协成 print(g2, type(g2)) g2.join()
下面的例子中. 我不再使用gevent.Greenlet 来自己创建了,比较麻烦.直接spawn了 。
下面的例子里,我也不再使用继承的Greenlet啦.
如果对于GreenLet需要,也可以自己继承Greenlet . 重写 _run (有个下划线) 函数 即可:
class fuckme( gevent.Greenlet): def __init__(self ,*args): gevent.Greenlet.__init__(self) #自己可以弄点属性啥的。我就不弄了 def _run(self): #主要是这个 run前面有个线 i = 0 while i <3 : print(' 我是弱智') i+=1 gevent.sleep(0.3) g = fuckme() g.start() g.join()
用gevent.joinall 等待多个协成对象:
def fuck1(arg): print('我在这: ',fuck1.__code__.co_firstlineno) gevent.sleep(1) #你会发现2个函数几乎同时睡眠. 不再像time.sleep return arg # 将返回一个 list . 里面存放一个个GreenLet , 使用 value 获取返回值 res = gevent.joinall([ gevent.spawn(fuck1 , 123) , #产生一个GreenLet 协成 gevent.spawn(fuck1 , 456) , ]) print(res , type(res)) for v in res: print(v.value) #修改一下,更明显 def fuck1(arg): print('参数 < %s > 我在这: '%arg,fuck1.__code__.co_firstlineno) gevent.sleep(1) print('参数 < %s > 我醒来了 我在这: '%arg, fuck1.__code__.co_firstlineno) return arg cor_list = [gevent.spawn(fuck1 , arg ) for arg in range(5)] res = gevent.joinall(cor_list)
再来一些例子:
交互的运行着.
def fuck1(arg): print('参数 < %s > 我在这: '%arg,fuck1.__code__.co_firstlineno) gevent.sleep(1) print('参数 < %s > 我醒来了 我在这: '%arg, fuck1.__code__.co_firstlineno) return arg def fuck2( arg ): print(fuck2.__code__.co_name , fuck2.__code__.co_firstlineno) gevent.sleep(1) print(fuck2.__code__.co_name + " done") return arg cor_list = [gevent.spawn(fuck1 , arg ) for arg in range(3)] cor_list1 = [gevent.spawn(fuck2 , arg) for arg in range(3)] cor_list.extend(cor_list1) gevent.joinall(cor_list)
看一下同步 和异步的比较:
#用与测试的函数 def job(arg): import random print(' job start :' ,arg) gevent.sleep(random.randint(0,5) * 0.5) #这个时间可以自己修改看看 print(' im done') def sync(): for i in range(3): job(i) def async(): alist = [ gevent.spawn(job , arg) for arg in range(3)] gevent.joinall(alist) print("先来同步:") sync() print('再来异步:') async()
还有一些类死于线程的同步对象 . event啦, semaphore啦 ,queue啦. 这些都用于协程之间交互的, 毕竟单线程
event:
# 我看了下, 在windows中是个 CreateEvent 的手动事件 ,即一旦 set , 所有wait的将全部继续运行. # 附注: windows中有2个事件,一个自动一个手动. 自动的在 WaitForSingleObject后将原子的ResetEvent, 手动的不会. # 相当于 py中的 Event.wait, Event.clear # 那个啥, 这行别看了.py中没那么麻烦 from gevent.event import Event ev = Event() def request(): print(' fetching pages' * 10) gevent.sleep(2) print(' fetching done' * 10) ev.set() #所有wait的将被全部激活 def response(): print('response 已启动') ev.wait() #等待 ev.set 后将运行 print('response 完成') res_list = [gevent.spawn(response) for i in range(5)] #先创建了5个,他们运行到 ev.wait的时候将全部等待 res_list.append(gevent.spawn_later(2, request)) #这里用了 spawn_later .可以预定几秒后 开始运行 gevent.joinall(res_list)
queue: 多生产多消费
我一开始使用queue 的时候常常会碰到一个异常. LockUp.Exit (forever 之类的) 好像是这个.
主要原因是要么在生产者要么在消费者中一定有一个地方,没让协程退出. 所以在joinall 的时候会产生异常
queue.put / get 都是阻塞操作
from gevent.queue import Queue q = Queue(3) # 最多存放3个 def producer(): for i in range(20): print('->>>>>>>> producer put %d'%(i)) q.put(i) print('->' * 20 + ' producer done') def consumer(arg): while True: try: item = q.get(timeout=0.5) #设置了timeout ,用于过了0.5秒一旦queue为空则抛异常.结束此循环 print('consumer %d get %d , queue:%d ' %(arg,item,q.qsize()) ) except Exception as e: break print('consumer %d done' % arg) pro_list = [gevent.spawn(producer) for i in range(5)] #多个生产者 con_list = [gevent.spawn(consumer,i) for i in range(3)] #多个消费者 con_list.extend(pro_list) gevent.joinall(con_list) print(q.empty())
一个失败的例子: 用协程读取文件 . 测试下来速度很慢:
import os from functools import partial EACH_SIZE = 1024 #每次读1024 #eachpart : 每块大小, pos : 从哪里开始读取 def pro_readfile(filepath,eachPart,pos): with open(filepath,'rt') as fd: fd.seek(pos) iterbale = iter(partial(fd.read,EACH_SIZE),'') for text in iterbale: print(text) path = 'D:/360极速浏览器下载/msdn.txt' co_size = 5 #协程数量 filesize = os.path.getsize(path) #文件大小 eachPart = int(filesize/5) +1 #每个协程读多少 be = time.clock() #开始时间 pro_list = [gevent.spawn(pro_readfile,path,eachPart, i*eachPart) for i in range(co_size)] gevent.joinall(pro_list) end = time.clock() #结束 print(end-be)
JoinableQueue:
q = JoinableQueue(50) def doing(arg): print('im doing %d' %arg) gevent.sleep(1) print('im done %d'%arg) q.task_done() def to_do(): while True: func , args= q.get() gevent.spawn(func,args) for i in range(5): gevent.spawn(to_do) for i in range(10): q.put((doing,i)) q.join()
最后介绍一下Pool , 会用Pool ,也就会用Group了 . 附:class Pool(Group)
import gevent.monkey; gevent.monkey.patch_all() #注意导入这个. 如果你的程序涉及了socket pool = Pool(10) #限制在10个协程 def read_from(url): r = requests.get(url) r.encoding = r.apparent_encoding print(r.url, r.headers) return r.status_code urls = ["https://www.qq.com","https://www.baidu.com","http://www.sina.com.cn"] pool.spawn(read_from,urls[0]) #产生一个协程 pool.spawn(read_from,urls[1]) #也可以这样 res = pool.imap(read_from, urls) #跟map 函数类似, 返回一个可迭代 for r in res: print(r) #或者 print('- ' * 50) for r in pool.imap_unordered(read_from,urls): #更好的选择.哪个先完成就返回 print(r) pool.join()