A Detailed Look at asyncio Code in Python

zuanfengxiao 2019-09-07

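Introduction to asyncio

Readers familiar with C# probably know that C# makes asynchronous programming very convenient through async and await. How do you do the same in Python? Python supports asynchronous programming too, usually through the asyncio library. Here is what asyncio is:

asyncio is a library for writing concurrent code using the async/await syntax. asyncio is used as the foundation for multiple Python asynchronous frameworks that provide high-performance network and web servers, database connection libraries, distributed task queues, and so on. asyncio is often a perfect fit for IO-bound and high-level structured network code.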

Basic concepts in asyncio

As you can see, with the asyncio library we can also use async and await in Python code. asyncio has four basic concepts:

Eventloop

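The event loop is the core of every asyncio application, its central controller. An Eventloop instance provides methods for registering, cancelling, and executing tasks and callbacks. Put simply, we register asynchronous functions on the event loop, and the loop runs them in turn, only one at a time. If the function that is currently running is waiting for I/O, the event loop suspends it and runs other functions. Once a function's I/O completes it becomes ready again, and it continues executing the next time the loop gets around to it.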
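To make this scheduling concrete, here is a minimal sketch. The names worker and log are illustrative and not part of the original article, and await asyncio.sleep(0) merely stands in for "waiting on I/O". It suggests how the loop runs one coroutine at a time and switches whenever the running one suspends.

```python
import asyncio

log = []  # records the order in which the steps actually run


async def worker(name: str, steps: int) -> None:
    for i in range(steps):
        log.append(f"{name}{i}")
        # Suspend here so the event loop can run another coroutine.
        await asyncio.sleep(0)


async def main() -> None:
    # Register both coroutines on the loop and run them concurrently.
    await asyncio.gather(worker("a", 2), worker("b", 2))


asyncio.run(main())
print(log)  # → ['a0', 'b0', 'a1', 'b1']
```

The interleaved order shows that neither worker runs to completion before the other starts: each one gives up control at its await, and the loop picks up the next ready coroutine.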

Coroutine

A coroutine is essentially a function, defined with async def:

import asyncio
import time


async def a():
    print('Suspending a')
    await asyncio.sleep(3)  # hands control back to the event loop for 3 seconds
    print('Resuming a')


async def b():
    print('Suspending b')
    await asyncio.sleep(1)
    print('Resuming b')


async def main():
    start = time.perf_counter()
    # Run a() and b() concurrently and wait for both to finish.
    await asyncio.gather(a(), b())
    print(f'{main.__name__} Cost: {time.perf_counter() - start}')


if __name__ == '__main__':
    asyncio.run(main())

Running the code above produces output similar to this:

Suspending a
Suspending b
Resuming b
Resuming a
main Cost: 3.0023356619999997

For a detailed introduction to coroutines, see my earlier article on coroutines in Python. Note that the older style described there, which relies on decorators, is now outdated.
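One detail that is easy to miss: calling an async def function does not run its body. It only creates a coroutine object, which executes once it is awaited or handed to the event loop. The sketch below illustrates this; greet is a hypothetical example function, not something from the original article.

```python
import asyncio
import inspect


async def greet(name: str) -> str:
    await asyncio.sleep(0)
    return f"hello {name}"


coro = greet("asyncio")              # nothing has run yet
is_coro = inspect.iscoroutine(coro)  # it is just a coroutine object
result = asyncio.run(coro)           # the body executes only now
print(is_coro, result)  # → True hello asyncio
```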

Future

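A Future represents a result that will become available in the future, similar to a promise in JavaScript. When the asynchronous operation finishes, its final result is set on the Future object. A Future is a low-level awaitable; its subclass Task is what actually wraps a coroutine.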

>>> import asyncio
>>> def fun():
...  print("inner fun")
...  return 111
... 
>>> loop = asyncio.get_event_loop()
>>> future = loop.run_in_executor(None, fun)  # await is not used here
inner fun
>>> future  # as you can see, the Future for fun is still pending
<Future pending cb=[_chain_future.<locals>._call_check_cancel() at /usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/futures.py:348]>
>>> future.done()  # not finished yet
False
>>> [m for m in dir(future) if not m.startswith('_')]
['add_done_callback', 'cancel', 'cancelled', 'done', 'exception', 'get_loop', 'remove_done_callback', 'result', 'set_exception', 'set_result']
>>> future.result()  # calling result() directly at this point raises an error
Traceback (most recent call last):
 File "<input>", line 1, in <module>
asyncio.base_futures.InvalidStateError: Result is not set.
>>> async def runfun():
...  result=await future
...  print(result)
...  
>>> loop.run_until_complete(runfun())  # loop.run_until_complete(future) would also work; this is only to demonstrate await
111
>>> future
<Future finished result=111>
>>> future.done()
True
>>> future.result()
111
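
The same life cycle (pending, result set, done) can also be sketched as a standalone script. This is an illustrative example rather than the author's code: it creates a bare Future with loop.create_future() and uses loop.call_later to simulate an asynchronous operation that stores its result a little later.

```python
import asyncio


async def main() -> int:
    loop = asyncio.get_running_loop()
    future = loop.create_future()   # a bare Future, initially pending
    assert not future.done()

    # Simulate an asynchronous operation that finishes 0.1 s later
    # and stores its final result on the Future.
    loop.call_later(0.1, future.set_result, 111)

    result = await future           # suspends until the result is set
    assert future.done()
    return result


value = asyncio.run(main())
print(value)  # → 111
```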
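
Task

Besides coroutines, the event loop also accepts two other kinds of objects: Future and Task. A Future object offers many task-related methods (completion callbacks, cancellation, setting the result, and so on), but developers normally do not need to work with such a low-level object. Instead they use Task, a subclass of Future, to schedule coroutines cooperatively and achieve concurrency. So what is a Task? The documentation describes it like this:

A Future-like object that runs a Python coroutine. Not thread-safe. Tasks are used to run coroutines in event loops. If a coroutine awaits on a Future, the Task suspends the execution of the coroutine and waits for the completion of the Future. When the Future is done, the execution of the wrapped coroutine resumes. Event loops use cooperative scheduling: an event loop runs one Task at a time. While a Task awaits the completion of a Future, the event loop runs other Tasks, callbacks, or performs IO operations.

Here is how it is used: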

>>> async def a():
...  print('Suspending a')
...  await asyncio.sleep(3)
...  print('Resuming a')
...  
>>> task = asyncio.ensure_future(a())
>>> loop.run_until_complete(task)
Suspending a
Resuming a
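
The REPL session above creates the Task before any loop is running. Inside an application the same idea is more commonly written from within a coroutine using asyncio.create_task. The following is a minimal sketch under that assumption; it also checks task.done() before and after awaiting to show the state change.

```python
import asyncio


async def a() -> str:
    await asyncio.sleep(0.1)
    return "A"


async def main():
    task = asyncio.create_task(a())  # scheduled to run on the running loop
    done_before = task.done()        # still pending: main() has not yielded yet
    result = await task              # suspend main() until the Task finishes
    return done_before, task.done(), result


states = asyncio.run(main())
print(states)  # → (False, True, 'A')
```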

Differences between some common asyncio usages

asyncio.gather and asyncio.wait

The code above already used asyncio.gather. There is another option, asyncio.wait. Both run multiple coroutines concurrently, so what is the difference between them? Let's take a look.

>>> import asyncio
>>> async def a():
...  print('Suspending a')
...  await asyncio.sleep(3)
...  print('Resuming a')
...  return 'A'
... 
... 
... async def b():
...  print('Suspending b')
...  await asyncio.sleep(1)
...  print('Resuming b')
...  return 'B'
... 
>>> async def fun1():
...  return_value_a, return_value_b = await asyncio.gather(a(), b())
...  print(return_value_a,return_value_b)
...  
>>> asyncio.run(fun1())
Suspending a
Suspending b
Resuming b
Resuming a
A B
>>> async def fun2():
...  done,pending=await asyncio.wait([a(),b()])
...  print(done)
...  print(pending)
...  task=list(done)[0]
...  print(task)
...  print(task.result())
...  
>>> asyncio.run(fun2())
Suspending b
Suspending a
Resuming b
Resuming a
{<Task finished coro=<a() done, defined at <input>:1> result='A'>, <Task finished coro=<b() done, defined at <input>:8> result='B'>}
set()
<Task finished coro=<a() done, defined at <input>:1> result='A'>
A

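From the code above we can see the differences between the two:

asyncio.gather collects the results of the coroutines and returns them in the same order in which the coroutines were passed in. asyncio.wait returns two items: the first is the set of tasks that have completed, and the second is the set of tasks that are still pending.

asyncio.wait accepts a return_when parameter. By default asyncio.wait waits until all tasks have completed (return_when='ALL_COMPLETED'). It also supports FIRST_COMPLETED (return as soon as the first coroutine completes) and FIRST_EXCEPTION (return as soon as the first exception is raised):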

>>> async def fun2():
...  done,pending=await asyncio.wait([a(),b()],return_when=asyncio.tasks.FIRST_COMPLETED)
...  print(done)
...  print(pending)
...  task=list(done)[0]
...  print(task)
...  print(task.result())
...  
>>> asyncio.run(fun2())
Suspending a
Suspending b
Resuming b
{<Task finished coro=<b() done, defined at <input>:8> result='B'>}
{<Task pending coro=<a() running at <input>:3> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x10757bf18>()]>>}
<Task finished coro=<b() done, defined at <input>:8> result='B'>
B
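
The sessions above were recorded on Python 3.7. Passing bare coroutine objects to asyncio.wait was deprecated in Python 3.8 and removed in Python 3.11, so on newer interpreters the coroutines need to be wrapped in Tasks first. The sketch below is one way to write the FIRST_COMPLETED case under that constraint; the shorter sleep times and the cancellation of leftover tasks are choices made for this example, not part of the original.

```python
import asyncio


async def a() -> str:
    await asyncio.sleep(0.3)
    return "A"


async def b() -> str:
    await asyncio.sleep(0.1)
    return "B"


async def main():
    # Wrap the coroutines in Tasks explicitly before handing them to wait().
    tasks = [asyncio.create_task(a()), asyncio.create_task(b())]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    first = [t.result() for t in done]

    # Unfinished tasks keep running; cancel them if they are no longer needed.
    for t in pending:
        t.cancel()
    await asyncio.gather(*pending, return_exceptions=True)
    return first, len(pending)


outcome = asyncio.run(main())
print(outcome)  # → (['B'], 1)
```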

In most cases asyncio.gather is all you need.

asyncio.create_task, loop.create_task, and asyncio.ensure_future

All three can create a Task. Since Python 3.7 you can consistently use the higher-level asyncio.create_task, which itself simply calls loop.create_task. loop.create_task requires its argument to be a coroutine, whereas asyncio.ensure_future accepts not only a coroutine but also a Future object or any other awaitable object:

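  1. If the argument is a coroutine, ensure_future uses loop.create_task under the hood and returns a Task object.
  2. If it is a Future object, it is returned as-is.
  3. If it is any other awaitable object, it is wrapped in a coroutine that awaits the object's __await__ method, and ensure_future is applied once more, so a Task or Future is returned in the end.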
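
The first two cases in the list can be checked with a short sketch. The coroutine name coro is made up for illustration; only asyncio.ensure_future, loop.create_future, and asyncio.Task are real asyncio names.

```python
import asyncio


async def coro() -> str:
    return "done"


async def main():
    loop = asyncio.get_running_loop()

    task = asyncio.ensure_future(coro())   # coroutine -> wrapped in a Task
    future = loop.create_future()
    same = asyncio.ensure_future(future)   # Future -> returned unchanged

    future.set_result(None)
    await task
    return isinstance(task, asyncio.Task), same is future


checks = asyncio.run(main())
print(checks)  # → (True, True)
```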

So the main purpose of ensure_future is to guarantee that you end up with a Future object. In most cases you can simply use asyncio.create_task.

Registering callbacks and running synchronous code

Use add_done_callback to register a callback that is invoked once the task is done:

import asyncio
import functools


def callback(future):
    # Called by the event loop once the task is done.
    print(f'Result: {future.result()}')


def callback2(future, n):
    print(f'Result: {future.result()}, N: {n}')


async def funa():
    await asyncio.sleep(1)
    return "funa"


async def main():
    task = asyncio.create_task(funa())
    task.add_done_callback(callback)
    await task
    # functools.partial lets you pass extra arguments to the callback
    task = asyncio.create_task(funa())
    task.add_done_callback(functools.partial(callback2, n=1))
    await task


if __name__ == '__main__':
    asyncio.run(main())

Running synchronous code

What if you have synchronous logic that you want to run concurrently with asyncio? Take a look:

import asyncio
import time


def a1():
    time.sleep(1)  # blocking call; it would stall the event loop if run directly
    return "A"


async def b1():
    await asyncio.sleep(1)
    return "B"


async def main():
    loop = asyncio.get_running_loop()
    # Run a1 in the default executor so that it does not block b1.
    await asyncio.gather(loop.run_in_executor(None, a1), b1())


if __name__ == '__main__':
    start = time.perf_counter()
    asyncio.run(main())
    print(f'main method Cost: {time.perf_counter() - start}')
# Output: main method Cost: 1.0050589740000002

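loop.run_in_executor hands a synchronous function to an executor and returns an awaitable Future, so the call can be awaited just like a coroutine. Its first argument is a concurrent.futures.Executor instance; passing None selects the default executor.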
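
As a variation, the first argument can be an explicit executor instead of None. The sketch below assumes a ThreadPoolExecutor with two workers and a hypothetical blocking function named blocking; both are choices made for this example rather than something the original prescribes.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def blocking(n: int) -> int:
    time.sleep(0.2)  # stands in for blocking, synchronous work
    return n * 2


async def main():
    loop = asyncio.get_running_loop()
    # Pass an explicit executor instead of None (the default executor).
    with ThreadPoolExecutor(max_workers=2) as pool:
        futures = [loop.run_in_executor(pool, blocking, n) for n in (1, 2)]
        return await asyncio.gather(*futures)


results = asyncio.run(main())
print(results)  # → [2, 4]
```

Because the two calls run on separate worker threads, the total time should be close to one sleep rather than two, though the exact figure depends on the machine.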
