Coroutines and Tasks(翻譯)
這章節概略說明用於協程、任務的高階asyncio的APIs。
Coroutines
使用async/await語法宣告的協程是寫asyncio應用程式最好的方法。舉例來說,下面片段程式碼(需要Python 3.7+)列印"hello",等待1秒,然後列印"world":
>>> import asyncio
>>> async def main():
... print('hello')
... await asyncio.sleep(1)
... print('world')
>>> asyncio.run(main())
hello
world
注意到,單純的呼叫協程並不會調度它被執行:
>>> main()
<coroutine object main at 0x1053bb7c8>
要實際的執行協程,asyncio提供三種機制:
- 使用函數asyncio.run()來執行頂層的入口"main()"函數(見上範例)
- 協程中的等待。下面程式碼片段會在等待1秒之後列印"hello",,然後在等待另外2秒之後列印"world":
import asyncio
import time
async def say_after(delay, what):
await asyncio.sleep(delay)
print(what)
async def main():
print(f"started at {time.strftime('%X')}")
await say_after(1, 'hello')
await say_after(2, 'world')
print(f"finished at {time.strftime('%X')}")
asyncio.run(main())
預期輸出如下:
started at 17:13:52
hello
world
finished at 17:13:55
- 函數asyncio.create_task()作為asyncio.Tasks同時執行協程
讓我們調整上面範例,並且同時執行兩個協程say_after
:
async def main():
task1 = asyncio.create_task(
say_after(1, 'hello'))
task2 = asyncio.create_task(
say_after(2, 'world'))
print(f"started at {time.strftime('%X')}")
# Wait until both tasks are completed (should take
# around 2 seconds.)
await task1
await task2
print(f"finished at {time.strftime('%X')}")
注意到,預期輸出現在顯示出這程式碼片段比之前還要快一秒:
started at 17:14:32
hello
world
finished at 17:14:34
Awaitables
我們說,如果一個物件可以用於await表達示中,那它就是一個可等待物件。許多asyncio APIs被設計為接受等待。
有三種類型的可等待物件:coroutines, Tasks, and Futures。
Coroutines
Python協程是可等待的,因此可以從其它協程中等待:
import asyncio
async def nested():
return 42
async def main():
# Nothing happens if we just call "nested()".
# A coroutine object is created but not awaited,
# so it *won't run at all*.
nested()
# Let's do it differently now and await it:
print(await nested()) # will print "42".
asyncio.run(main())
重要:這文件中的術語"協程"可以用於兩個緊密相關的概念:
- 協程函數:一個async def函數
- 協程物件:透過呼叫協程函數回傳的物件
asyncio亦支援傳統基於生程的協程。
Tasks
Tasks用於並行調度協程。
當協程被包裝到具有像是asyncio.create_task()
函數的任務內時,這個協程會很快的自動調度來執行:
import asyncio
async def nested():
return 42
async def main():
# Schedule nested() to run soon concurrently
# with "main()".
task = asyncio.create_task(nested())
# "task" can now be used to cancel "nested()", or
# can simply be awaited to wait until it is complete:
await task
asyncio.run(main())
Futures
Future是一個特別的低階可等待物件,代表一個非同步(異步)操作的事件結果。
當Future物件為awaited的時候,意味著協程會等待,一直到Future在其它位置被解析。
asyncio中的Future物件需要允許基於回呼程式碼與async/await一起使用。
一般來說,不需要在應用程式級別程式碼中建立Future物件。
Future objects, sometimes exposed by libraries and some asyncio APIs, can be awaited:
Future物件(有些時候會由套件或asyncio APIs公開)可以是awaited:
await function_that_returns_a_future_object()
# this is also valid:
await asyncio.gather(
function_that_returns_a_future_object(),
some_python_coroutine()
)
一個回傳Future物件的低階函數範例為loop.run_in_executor()。
Running an asyncio Program
-
asyncio.run(coro, *, debug=False)
執行協程並回傳結果。這個函數執行一個傳遞過來的協程,並負責管理asyncio事件迴圈以及完成非同步生成器。
當另一個asyncio事件迴圈在相同thread(執行緒)執行的時候,這函數無法被呼叫。
如果
debug=True
,那事件迴圈會在除錯模式中執行。這函數總是建立一個新的事件迴圈,並在最後關閉它。它應該用做為asyncio程式的主要入口點,而且理想情況下應該只調用一次。
範例:async def main(): await asyncio.sleep(1) print('hello') asyncio.run(main())
New in version 3.7.
原始碼可以在Lib/asyncio/runners.py找到
Creating Tasks
-
asyncio.create_task(coro, *, name=None)
將協程包裝到Task中,並調度它的執行。回傳Task物件。如果它的
name
不為None,則使用Task.set_name()來設置task名稱。task在get_running_loop()回傳的迴圈中執行,如果當前的執行緒(thread)沒有正在執行中的迴圈,那就拋出RuntimeError。
這個函數在Python 3.7中被加入。Python 3.7之前的版本,以低階函數asyncio.ensure_future()替代:
async def coro(): ... # In Python 3.7+ task = asyncio.create_task(coro()) ... # This works in all Python versions but is less readable task = asyncio.ensure_future(coro()) ...
New in version 3.7.
Changed in version 3.8:增加參數
name
Sleeping
-
coroutine asyncio.sleep(delay, result=None, *, loop=None)
延遲秒數的阻塞。如果提供結果,那在協程完成的時候會回傳給調用者。
sleep()
總是暫停當前task,允許其它tasks執行。Python 3.8之後不建議使用,將在Python 3.10的時候移除:參數
loop
。協程範例,每秒顯示當前日期5秒:
import asyncio import datetime async def display_date(): loop = asyncio.get_running_loop() end_time = loop.time() + 5.0 while True: print(datetime.datetime.now()) if (loop.time() + 1.0) >= end_time: break await asyncio.sleep(1) asyncio.run(display_date())
Running Tasks Concurrently
-
awaitable asyncio.gather(*aws, loop=None, return_exceptions=False)
在asw序列中同時執行awaitable物件如果任一個awaitable在aws中是一個協程,那就視為Task自動調度。
如果所有的awaitables都成功的完成,彙總清單(list)。其順序相對應於aws中的awaitables的順序。
如果
return_exceptions
為False(預設),那第一個拋出的異常會立即被傳送到在gather()
上等待的task。如果
return_exceptions
為True,那異常會跟成功的結果一樣的處理方法,並彙總於結果清單。如果
gather()
被取消,所有提交的awaitables(未完成的)也會一併被取消。如果任一來自aws序列的Task或Future被取消,那就將它視為引發CancelledError,這種情況下不會取消
gather()
的呼叫。Python 3.8之後不建議使用,將在Python 3.10的時候移除:參數
loop
。範例:
import asyncio async def factorial(name, number): f = 1 for i in range(2, number + 1): print(f"Task {name}: Compute factorial({i})...") await asyncio.sleep(1) f *= i print(f"Task {name}: factorial({number}) = {f}") async def main(): # Schedule three calls *concurrently*: await asyncio.gather( factorial("A", 2), factorial("B", 3), factorial("C", 4), ) asyncio.run(main()) # Expected output: # # Task A: Compute factorial(2)... # Task B: Compute factorial(2)... # Task C: Compute factorial(2)... # Task A: factorial(2) = 2 # Task B: Compute factorial(3)... # Task C: Compute factorial(3)... # Task B: factorial(3) = 6 # Task C: Compute factorial(4)... # Task C: factorial(4) = 24
Changed in version 3.7:如果gather本身被取消,那取消傳播與return_exceptions無關。
Shielding From Cancellation
-
awaitable asyncio.shield(aw, *, loop=None)
保護一個awaitable物件不被取消。如果aw是一個協程,那就視為Task自動調度。
語法:
res = await shield(something())
等價於:
res = await something()
除非包含它的協程被取消,那在something()中執行的Task就不會被取消。從something()的觀點來看,取消沒有發生。儘管其調用者仍然被取消,但"await"表達式依然會拋出CancelledError。
如果
something()
被其它方式取消(由自身內部),那會同時取消shield()
。如果希望完全忽略取消(不建議),那函數
shield()
應該結合try/except,如下:try: res = await shield(something()) except CancelledError: res = None
Python 3.8之後不建議使用,將在Python 3.10的時候移除:參數
loop
。
Timeouts
-
coroutine asyncio.wait_for(aw, timeout, *, loop=None)
等待aw awaitable以超時完成。如果aw是一個協程,那就視為Task自動調度。
timeout
可以是None
或是float或是int做為等待的秒數。如果timeout
是None
,那阻塞會一直到future完成。函數會等待,一直到future確實的取消,因此總等待時間也許會超過
timeout
。Python 3.8之後不建議使用,將在Python 3.10的時候移除:參數
loop
。範例:
async def eternity(): # Sleep for one hour await asyncio.sleep(3600) print('yay!') async def main(): # Wait for at most 1 second try: await asyncio.wait_for(eternity(), timeout=1.0) except asyncio.TimeoutError: print('timeout!') asyncio.run(main()) # Expected output: # # timeout!
Changed in version 3.7:當aw因為timeout而取消,則
wait_for
等待aw被取消。之前的版本會立即拋出asyncio.TimeoutError。
Waiting Primitives
-
coroutine asyncio.wait(aws, *, loop=None, timeout=None, return_when=ALL_COMPLETED)
同時在aws上執行awaitable物件,並阻塞直到return_when
指定的條件為止。回傳兩組Tasks/Futures:
(done, pending)
用法:
done, pending = await asyncio.wait(aws)
timeout
(float or ing),如果指定,可以用來控制回傳前等待的最大秒數。注意,這個函數並不會拋出
asyncio.TimeoutError
。發生timeout未完成的Futures或Tasks僅在第二組中回傳。return_when
指示這函數何時應該回傳。它必須為下面項目之一:Constant Description FIRST_COMPLETED The function will return when any future finishes or is cancelled. FIRST_EXCEPTION The function will return when any future finishes by raising an exception. If no future raises an exception then it is equivalent to ALL_COMPLETED. ALL_COMPLETED The function will return when all futures finish or are cancelled. 不像wait_for(),當發生timeout的時候,
wait
並不會取消futures。Python 3.8之後不建議使用:如果任一個awaitable在aws中是一個協程,那就視為Task自動調度。不建議將協程物件直接傳遞給
wait()
,因為它會導致混亂的行為。Python 3.8之後不建議使用,將在Python 3.10的時候移除:參數
loop
。Note:
wait()
自動的將協程視為Task調度,稍後會回傳隱式建立的Task物件(done, pending)
。因此,下面程式碼將無法如預期般執行:async def foo(): return 42 coro = foo() done, pending = await asyncio.wait({coro}) if coro in done: # This branch will never be run!
修復上面片段程式碼如下:
async def foo(): return 42 task = asyncio.create_task(foo()) done, pending = await asyncio.wait({task}) if task in done: # Everything will work as expected now.
Python 3.8之後不建議使用:不建議直接將協程物件直接傳遞給
wait()
-
asyncio.as_completed(aws, *, loop=None, timeout=None)
同時在aws集中執行awaitable物件。回傳一個Future物件的迭代器。回傳的每一個Future物件表示剩餘的awaitables集的最早結果。如果在所有的Futures完成之前發生timeout,則拋出asyncio.TimeoutError。
Python 3.8之後不建議使用,將在Python 3.10的時候移除:參數
loop
。範例:
for f in as_completed(aws): earliest_result = await f # ...
Scheduling From Other Threads
-
asyncio.run_coroutine_threadsafe(coro, loop)
提交協程到給定的事件迴圈。線程安全。回傳concurrent.futures.Future以等待其它OS線程的結果。
這個函數意思是,從不同於執行事件循環的OS線程中調用。範例:
# Create a coroutine coro = asyncio.sleep(1, result=3) # Submit the coroutine to a given loop future = asyncio.run_coroutine_threadsafe(coro, loop) # Wait for the result with an optional timeout argument assert future.result(timeout) == 3
如果協程中拋出異常,那將通知回傳的Future。這也可以用來取消事件迴圈中的task:
try: result = future.result(timeout) except asyncio.TimeoutError: print('The coroutine took too long, cancelling the task...') future.cancel() except Exception as exc: print(f'The coroutine raised an exception: {exc!r}') else: print(f'The coroutine returned: {result!r}')
參考concurrency and multithreading
不像其它asyncio函數,這個函數要求顯式的傳遞參數
loop
New in version 3.5.1.
Introspection
-
asyncio.current_task(loop=None)
回傳目前執行中的Task實例,如果沒有Task在執行,則回傳None
如果
loop is None
,那就用get_running_loop()來取得當前迴圈。New in version 3.7.
-
asyncio.all_tasks(loop=None)
回傳由迴圈執行的一組未完成的Task物件。如果
loop is None
,那就用get_running_loop()來取得當前迴圈。New in version 3.7.
Task Object
-
class asyncio.Task(coro, *, loop=None, name=None)
執行Python協程的Future-like物件。非線程安全。Tasks用於在事件迴圈中執行協程。如果協程等待Future,那Task會暫停協程的執行並等待Future完成。當Future完成,將繼續執行包裝的協程。
事件迴圈使用協同調度:事件迴圈一次執行一個Task。當Task等待協程完成的時候,事件迴圈會執行其它Task、回呼或執行IO操作。
使用高階函數asyncio.create_task(),或低階函數loop.create_task()或ensure_future()建立Tasks。不建議手動實例化Tasks。
要取消執行中的Task,請使用cancal()方法。呼叫它將導致Task向包裝的協程中拋出CancelledError異常。如果在取消過程中某個Future物件正在等待協程,那這個Future物件將被取消。
calcelled()可以用來確認Task是否已經被取消。如果包裝的協程沒有抑制CancelledError異常並確實的取消,那就回傳True。
asyncio.Task繼承Future所有的APIs,除了Future.set_result()與Future.set_exception()。
Tasks支援contextvars模組。建立Task的時候,它會複製當前的上下文,然後在複製的上下文中執行協程。
Changed in version 3.7: 增加支援contextvars模組。
Changed in version 3.8: 增加參數
name
。Python 3.8之後不建議使用,將在Python 3.10的時候移除:參數
loop
。
cancel()
請求取消Task。
這安排在事件迴圈的下一個週期將CancelledError異常拋出到包裝的協程中。
然後,協程有機會清楚,或甚至拒絕請求,透過利用try except CancelledError finally
區塊來抑制異常。
然而,不同於Future.cancel(),Task.cancel()並不保證Task一定被取消,儘管完全抑制取消並不常見,並且主動阻止。
下面範例說明協程如何攔截取消請求:
async def cancel_me():
print('cancel_me(): before sleep')
try:
# Wait for 1 hour
await asyncio.sleep(3600)
except asyncio.CancelledError:
print('cancel_me(): cancel sleep')
raise
finally:
print('cancel_me(): after sleep')
async def main():
# Create a "cancel_me" Task
task = asyncio.create_task(cancel_me())
# Wait for 1 second
await asyncio.sleep(1)
task.cancel()
try:
await task
except asyncio.CancelledError:
print("main(): cancel_me is cancelled now")
asyncio.run(main())
# Expected output:
#
# cancel_me(): before sleep
# cancel_me(): cancel sleep
# cancel_me(): after sleep
# main(): cancel_me is cancelled now
cancelled()
如果Task已經取消,則回傳True。
當使用calcel()請求取消,Task將被取消,並且包裝的協程將CancelledError異常傳播到Task中。
done()
如果Task完成,則回傳True。
當包裝的協程回傳值,拋出異常、或Task被取消,Task就算完成了。
result()
回傳Task的結果。
如果Task完成,那就回傳包裝的協程的結果(或者如何協程拋出異常,則重新拋出該異常)
如果Task已經被取消,這個方法將拋出CancelledError異常。
如果Task的結果尚不可用,這個方法會拋出InvalidStateError異常。
exception()
回傳Task的異常。
如果包裝的協程拋出異常,則回傳該異常。如果包裝的協程正常回傳,這個方法將回傳None
。
如果Task已經被取消,這個方法將拋出CancelledError異常。
如果Task尚未完成,這個方法會拋出InvalidStateError異常。
add_done_callback(callback, *, context=None)
增加一個callback function給Task完成的時候執行。
這個方法只能在基於低階回呼的程式碼中使用。
更多細節請參考Future.add_done_callback()
remove_done_callback(callback)
從callback清單中取消callback。
這個方法只能在基於低階回呼的程式碼中使用。
更多細節請參考Future.remove_done_callback()
get_stack(*, limit=None)
回傳該Task的堆疊框的清單(list of stack frames)。
如果包裝的協程還沒有完成,那就回傳掛起它的堆疊。如果協程已經成功地完成或被取消,那就回傳空清單(empty list)。如果協程被異常終止,那就回傳回溯框清單(list of traceback frames)。
框(frames)總是依著舊到新的排序。
對懸置的協程,只會回傳一個堆疊框(stack frame)。
選項參數limit
設置回傳的框(frame)的最大數量;預設情況下回傳所有可用的框(frame)。回傳的清單順序依回傳是堆疊或回溯而有所不同:回傳最新的堆疊,但回傳最舊的回溯。(這與回溯模組的行為匹配。)
print_stack(*, limit=None, file=None)
列印該Task的堆棧或回溯。
對於get_stack()檢索到的frames,將生成與回溯模組類似的輸出。
get_coro()
回傳Task包裝的協程。
New in version 3.8.
get_name()
回傳Task的名稱。
如果沒有為Task明確的分配名稱,則預設asyncio Task實現在實例化過程中生成一個預設名稱。
New in version 3.8.
set_name(value)
設置Task的名稱。
參數value
可以是任意物件,然後將它轉為字串。
在預設的Task實現中,名稱將在Task物件的repr()
的輸出看的見。
New in version 3.8.
classmethod all_tasks(loop=None)
回傳事件迴圈的所有Tasks集合。
預設情況下,回傳當前事件迴圈的所有Tasks。如果loop is None
,則使用函數get_event_loop()取得當前迴圈。
版本3.7之後不建議使用,將在版本3.9中移除:不要將此做為task方法調用。改用asyncio.all_tasks()。
classmethod current_task(loop=None)
回傳當前的task或None。
如果loop is None
,則使用函數get_event_loop()取得當前迴圈。
版本3.7之後不建議使用,將在版本3.9中移除:不要將此做為task方法調用。改用asyncio.current_task() 。
沒有留言:
張貼留言