Coroutines and Tasks(翻譯)
這章節概略說明用於協程、任務的高階asyncio的APIs。
Coroutines
使用async/await語法宣告的協程是寫asyncio應用程式最好的方法。舉例來說,下面片段程式碼(需要Python 3.7+)列印"hello",等待1秒,然後列印"world":
>>> import asyncio
>>> async def main():
... print('hello')
... await asyncio.sleep(1)
... print('world')
>>> asyncio.run(main())
hello
world
注意到,單純的呼叫協程並不會調度它被執行:
>>> main()
<coroutine object main at 0x1053bb7c8>
要實際的執行協程,asyncio提供三種機制:
- 使用函數asyncio.run()來執行頂層的入口"main()"函數(見上範例)
- 協程中的等待。下面程式碼片段會在等待1秒之後列印"hello",,然後在等待另外2秒之後列印"world":
import asyncio
import time
async def say_after(delay, what):
await asyncio.sleep(delay)
print(what)
async def main():
print(f"started at {time.strftime('%X')}")
await say_after(1, 'hello')
await say_after(2, 'world')
print(f"finished at {time.strftime('%X')}")
asyncio.run(main())
預期輸出如下:
started at 17:13:52
hello
world
finished at 17:13:55
讓我們調整上面範例,並且同時執行兩個協程say_after
:
async def main():
task1 = asyncio.create_task(
say_after(1, 'hello'))
task2 = asyncio.create_task(
say_after(2, 'world'))
print(f"started at {time.strftime('%X')}")
await task1
await task2
print(f"finished at {time.strftime('%X')}")
注意到,預期輸出現在顯示出這程式碼片段比之前還要快一秒:
started at 17:14:32
hello
world
finished at 17:14:34
Awaitables
我們說,如果一個物件可以用於await表達示中,那它就是一個可等待物件。許多asyncio APIs被設計為接受等待。
有三種類型的可等待物件:coroutines, Tasks, and Futures。
Coroutines
Python協程是可等待的,因此可以從其它協程中等待:
import asyncio
async def nested():
return 42
async def main():
nested()
print(await nested())
asyncio.run(main())
重要:這文件中的術語"協程"可以用於兩個緊密相關的概念:
asyncio亦支援傳統基於生程的協程。
Tasks
Tasks用於並行調度協程。
當協程被包裝到具有像是asyncio.create_task()
函數的任務內時,這個協程會很快的自動調度來執行:
import asyncio
async def nested():
return 42
async def main():
task = asyncio.create_task(nested())
await task
asyncio.run(main())
Futures
Future是一個特別的低階可等待物件,代表一個非同步(異步)操作的事件結果。
當Future物件為awaited的時候,意味著協程會等待,一直到Future在其它位置被解析。
asyncio中的Future物件需要允許基於回呼程式碼與async/await一起使用。
一般來說,不需要在應用程式級別程式碼中建立Future物件。
Future objects, sometimes exposed by libraries and some asyncio APIs, can be awaited:
Future物件(有些時候會由套件或asyncio APIs公開)可以是awaited:
await function_that_returns_a_future_object()
await asyncio.gather(
function_that_returns_a_future_object(),
some_python_coroutine()
)
一個回傳Future物件的低階函數範例為loop.run_in_executor()。
Running an asyncio Program
-
asyncio.run(coro, *, debug=False)
執行協程並回傳結果。
這個函數執行一個傳遞過來的協程,並負責管理asyncio事件迴圈以及完成非同步生成器。
當另一個asyncio事件迴圈在相同thread(執行緒)執行的時候,這函數無法被呼叫。
如果debug=True
,那事件迴圈會在除錯模式中執行。
這函數總是建立一個新的事件迴圈,並在最後關閉它。它應該用做為asyncio程式的主要入口點,而且理想情況下應該只調用一次。
範例:
async def main():
await asyncio.sleep(1)
print('hello')
asyncio.run(main())
New in version 3.7.
Creating Tasks
-
asyncio.create_task(coro, *, name=None)
將協程包裝到Task中,並調度它的執行。回傳Task物件。
如果它的name
不為None,則使用Task.set_name()來設置task名稱。
task在get_running_loop()回傳的迴圈中執行,如果當前的執行緒(thread)沒有正在執行中的迴圈,那就拋出RuntimeError。
這個函數在Python 3.7中被加入。Python 3.7之前的版本,以低階函數asyncio.ensure_future()替代:
async def coro():
...
task = asyncio.create_task(coro())
...
task = asyncio.ensure_future(coro())
...
New in version 3.7.
Changed in version 3.8:增加參數name
Sleeping
-
coroutine asyncio.sleep(delay, result=None, *, loop=None)
延遲秒數的阻塞。
如果提供結果,那在協程完成的時候會回傳給調用者。
sleep()
總是暫停當前task,允許其它tasks執行。
Python 3.8之後不建議使用,將在Python 3.10的時候移除:參數loop
。
協程範例,每秒顯示當前日期5秒:
import asyncio
import datetime
async def display_date():
loop = asyncio.get_running_loop()
end_time = loop.time() + 5.0
while True:
print(datetime.datetime.now())
if (loop.time() + 1.0) >= end_time:
break
await asyncio.sleep(1)
asyncio.run(display_date())
Running Tasks Concurrently
-
awaitable asyncio.gather(*aws, loop=None, return_exceptions=False)
在asw序列中同時執行awaitable物件
如果任一個awaitable在aws中是一個協程,那就視為Task自動調度。
如果所有的awaitables都成功的完成,彙總清單(list)。其順序相對應於aws中的awaitables的順序。
如果return_exceptions
為False(預設),那第一個拋出的異常會立即被傳送到在gather()
上等待的task。
如果return_exceptions
為True,那異常會跟成功的結果一樣的處理方法,並彙總於結果清單。
如果gather()
被取消,所有提交的awaitables(未完成的)也會一併被取消。
如果任一來自aws序列的Task或Future被取消,那就將它視為引發CancelledError,這種情況下不會取消gather()
的呼叫。
Python 3.8之後不建議使用,將在Python 3.10的時候移除:參數loop
。
範例:
import asyncio
async def factorial(name, number):
f = 1
for i in range(2, number + 1):
print(f"Task {name}: Compute factorial({i})...")
await asyncio.sleep(1)
f *= i
print(f"Task {name}: factorial({number}) = {f}")
async def main():
await asyncio.gather(
factorial("A", 2),
factorial("B", 3),
factorial("C", 4),
)
asyncio.run(main())
Changed in version 3.7:如果gather本身被取消,那取消傳播與return_exceptions無關。
Shielding From Cancellation
-
awaitable asyncio.shield(aw, *, loop=None)
保護一個awaitable物件不被取消。
如果aw是一個協程,那就視為Task自動調度。
語法:
res = await shield(something())
等價於:
res = await something()
除非包含它的協程被取消,那在something()中執行的Task就不會被取消。從something()的觀點來看,取消沒有發生。儘管其調用者仍然被取消,但"await"表達式依然會拋出CancelledError。
如果something()
被其它方式取消(由自身內部),那會同時取消shield()
。
如果希望完全忽略取消(不建議),那函數shield()
應該結合try/except,如下:
try:
res = await shield(something())
except CancelledError:
res = None
Python 3.8之後不建議使用,將在Python 3.10的時候移除:參數loop
。
Timeouts
-
coroutine asyncio.wait_for(aw, timeout, *, loop=None)
等待aw awaitable以超時完成。
如果aw是一個協程,那就視為Task自動調度。
timeout
可以是None
或是float或是int做為等待的秒數。如果timeout
是None
,那阻塞會一直到future完成。
避免task取消,可以將task包裝在shield()
函數會等待,一直到future確實的取消,因此總等待時間也許會超過timeout
。
Python 3.8之後不建議使用,將在Python 3.10的時候移除:參數loop
。
範例:
async def eternity():
await asyncio.sleep(3600)
print('yay!')
async def main():
try:
await asyncio.wait_for(eternity(), timeout=1.0)
except asyncio.TimeoutError:
print('timeout!')
asyncio.run(main())
Changed in version 3.7:當aw因為timeout而取消,則wait_for
等待aw被取消。之前的版本會立即拋出asyncio.TimeoutError。
Waiting Primitives
-
coroutine asyncio.wait(aws, *, loop=None, timeout=None, return_when=ALL_COMPLETED)
同時在aws上執行awaitable物件,並阻塞直到return_when
指定的條件為止。
回傳兩組Tasks/Futures:(done, pending)
用法:
done, pending = await asyncio.wait(aws)
timeout
(float or ing),如果指定,可以用來控制回傳前等待的最大秒數。
注意,這個函數並不會拋出asyncio.TimeoutError
。發生timeout未完成的Futures或Tasks僅在第二組中回傳。
return_when
指示這函數何時應該回傳。它必須為下面項目之一:
Constant |
Description |
FIRST_COMPLETED |
The function will return when any future finishes or is cancelled. |
FIRST_EXCEPTION |
The function will return when any future finishes by raising an exception. If no future raises an exception then it is equivalent to ALL_COMPLETED. |
ALL_COMPLETED |
The function will return when all futures finish or are cancelled. |
不像wait_for(),當發生timeout的時候,wait
並不會取消futures。
Python 3.8之後不建議使用:如果任一個awaitable在aws中是一個協程,那就視為Task自動調度。不建議將協程物件直接傳遞給wait()
,因為它會導致混亂的行為。
Python 3.8之後不建議使用,將在Python 3.10的時候移除:參數loop
。
Note:wait()
自動的將協程視為Task調度,稍後會回傳隱式建立的Task物件(done, pending)
。因此,下面程式碼將無法如預期般執行:
async def foo():
return 42
coro = foo()
done, pending = await asyncio.wait({coro})
if coro in done:
修復上面片段程式碼如下:
async def foo():
return 42
task = asyncio.create_task(foo())
done, pending = await asyncio.wait({task})
if task in done:
Python 3.8之後不建議使用:不建議直接將協程物件直接傳遞給wait()
-
asyncio.as_completed(aws, *, loop=None, timeout=None)
同時在aws集中執行awaitable物件。回傳一個Future物件的迭代器。回傳的每一個Future物件表示剩餘的awaitables集的最早結果。
如果在所有的Futures完成之前發生timeout,則拋出asyncio.TimeoutError。
Python 3.8之後不建議使用,將在Python 3.10的時候移除:參數loop
。
範例:
for f in as_completed(aws):
earliest_result = await f
Scheduling From Other Threads
-
asyncio.run_coroutine_threadsafe(coro, loop)
提交協程到給定的事件迴圈。線程安全。
回傳concurrent.futures.Future以等待其它OS線程的結果。
這個函數意思是,從不同於執行事件循環的OS線程中調用。範例:
coro = asyncio.sleep(1, result=3)
future = asyncio.run_coroutine_threadsafe(coro, loop)
assert future.result(timeout) == 3
如果協程中拋出異常,那將通知回傳的Future。這也可以用來取消事件迴圈中的task:
try:
result = future.result(timeout)
except asyncio.TimeoutError:
print('The coroutine took too long, cancelling the task...')
future.cancel()
except Exception as exc:
print(f'The coroutine raised an exception: {exc!r}')
else:
print(f'The coroutine returned: {result!r}')
參考concurrency and multithreading
不像其它asyncio函數,這個函數要求顯式的傳遞參數loop
New in version 3.5.1.
Introspection
-
asyncio.current_task(loop=None)
回傳目前執行中的Task實例,如果沒有Task在執行,則回傳None
如果loop is None
,那就用get_running_loop()來取得當前迴圈。
New in version 3.7.
-
asyncio.all_tasks(loop=None)
回傳由迴圈執行的一組未完成的Task物件。
如果loop is None
,那就用get_running_loop()來取得當前迴圈。
New in version 3.7.
Task Object
cancel()
請求取消Task。
這安排在事件迴圈的下一個週期將CancelledError異常拋出到包裝的協程中。
然後,協程有機會清楚,或甚至拒絕請求,透過利用try except CancelledError finally
區塊來抑制異常。
然而,不同於Future.cancel(),Task.cancel()並不保證Task一定被取消,儘管完全抑制取消並不常見,並且主動阻止。
下面範例說明協程如何攔截取消請求:
async def cancel_me():
print('cancel_me(): before sleep')
try:
await asyncio.sleep(3600)
except asyncio.CancelledError:
print('cancel_me(): cancel sleep')
raise
finally:
print('cancel_me(): after sleep')
async def main():
task = asyncio.create_task(cancel_me())
await asyncio.sleep(1)
task.cancel()
try:
await task
except asyncio.CancelledError:
print("main(): cancel_me is cancelled now")
asyncio.run(main())
cancelled()
如果Task已經取消,則回傳True。
當使用calcel()請求取消,Task將被取消,並且包裝的協程將CancelledError異常傳播到Task中。
done()
如果Task完成,則回傳True。
當包裝的協程回傳值,拋出異常、或Task被取消,Task就算完成了。
result()
回傳Task的結果。
如果Task完成,那就回傳包裝的協程的結果(或者如何協程拋出異常,則重新拋出該異常)
如果Task已經被取消,這個方法將拋出CancelledError異常。
如果Task的結果尚不可用,這個方法會拋出InvalidStateError異常。
exception()
回傳Task的異常。
如果包裝的協程拋出異常,則回傳該異常。如果包裝的協程正常回傳,這個方法將回傳None
。
如果Task已經被取消,這個方法將拋出CancelledError異常。
如果Task尚未完成,這個方法會拋出InvalidStateError異常。
add_done_callback(callback, *, context=None)
增加一個callback function給Task完成的時候執行。
這個方法只能在基於低階回呼的程式碼中使用。
更多細節請參考Future.add_done_callback()
remove_done_callback(callback)
從callback清單中取消callback。
這個方法只能在基於低階回呼的程式碼中使用。
更多細節請參考Future.remove_done_callback()
get_stack(*, limit=None)
回傳該Task的堆疊框的清單(list of stack frames)。
如果包裝的協程還沒有完成,那就回傳掛起它的堆疊。如果協程已經成功地完成或被取消,那就回傳空清單(empty list)。如果協程被異常終止,那就回傳回溯框清單(list of traceback frames)。
框(frames)總是依著舊到新的排序。
對懸置的協程,只會回傳一個堆疊框(stack frame)。
選項參數limit
設置回傳的框(frame)的最大數量;預設情況下回傳所有可用的框(frame)。回傳的清單順序依回傳是堆疊或回溯而有所不同:回傳最新的堆疊,但回傳最舊的回溯。(這與回溯模組的行為匹配。)
print_stack(*, limit=None, file=None)
列印該Task的堆棧或回溯。
對於get_stack()檢索到的frames,將生成與回溯模組類似的輸出。
get_coro()
回傳Task包裝的協程。
New in version 3.8.
get_name()
回傳Task的名稱。
如果沒有為Task明確的分配名稱,則預設asyncio Task實現在實例化過程中生成一個預設名稱。
New in version 3.8.
set_name(value)
設置Task的名稱。
參數value
可以是任意物件,然後將它轉為字串。
在預設的Task實現中,名稱將在Task物件的repr()
的輸出看的見。
New in version 3.8.
classmethod all_tasks(loop=None)
回傳事件迴圈的所有Tasks集合。
預設情況下,回傳當前事件迴圈的所有Tasks。如果loop is None
,則使用函數get_event_loop()取得當前迴圈。
版本3.7之後不建議使用,將在版本3.9中移除:不要將此做為task方法調用。改用asyncio.all_tasks()。
classmethod current_task(loop=None)
回傳當前的task或None。
如果loop is None
,則使用函數get_event_loop()取得當前迴圈。
版本3.7之後不建議使用,將在版本3.9中移除:不要將此做為task方法調用。改用asyncio.current_task() 。
Generator-based Coroutines
Coroutines and Tasks(翻譯)
這章節概略說明用於協程、任務的高階asyncio的APIs。
- Coroutines and Tasks(翻譯)
- Coroutines
- Awaitables
- Coroutines
- Tasks
- Futures
- Running an asyncio Program
- Creating Tasks
- Sleeping
- Running Tasks Concurrently
- Shielding From Cancellation
- Timeouts
- Waiting Primitives
- Scheduling From Other Threads
- Introspection
- Task Object
- cancel()
- cancelled()
- done()
- result()
- exception()
- add_done_callback(callback, *, context=None)
- remove_done_callback(callback)
- get_stack(*, limit=None)
- print_stack(*, limit=None, file=None)
- get_coro()
- get_name()
- set_name(value)
- classmethod all_tasks(loop=None)
- classmethod current_task(loop=None)
- Generator-based Coroutines
Coroutines
使用async/await語法宣告的協程是寫asyncio應用程式最好的方法。舉例來說,下面片段程式碼(需要Python 3.7+)列印"hello",等待1秒,然後列印"world":
>>> import asyncio >>> async def main(): ... print('hello') ... await asyncio.sleep(1) ... print('world') >>> asyncio.run(main()) hello world
注意到,單純的呼叫協程並不會調度它被執行:
>>> main() <coroutine object main at 0x1053bb7c8>
要實際的執行協程,asyncio提供三種機制:
import asyncio import time async def say_after(delay, what): await asyncio.sleep(delay) print(what) async def main(): print(f"started at {time.strftime('%X')}") await say_after(1, 'hello') await say_after(2, 'world') print(f"finished at {time.strftime('%X')}") asyncio.run(main())
預期輸出如下:
讓我們調整上面範例,並且同時執行兩個協程
say_after
:async def main(): task1 = asyncio.create_task( say_after(1, 'hello')) task2 = asyncio.create_task( say_after(2, 'world')) print(f"started at {time.strftime('%X')}") # Wait until both tasks are completed (should take # around 2 seconds.) await task1 await task2 print(f"finished at {time.strftime('%X')}")
注意到,預期輸出現在顯示出這程式碼片段比之前還要快一秒:
Awaitables
我們說,如果一個物件可以用於await表達示中,那它就是一個可等待物件。許多asyncio APIs被設計為接受等待。
有三種類型的可等待物件:coroutines, Tasks, and Futures。
Coroutines
Python協程是可等待的,因此可以從其它協程中等待:
import asyncio async def nested(): return 42 async def main(): # Nothing happens if we just call "nested()". # A coroutine object is created but not awaited, # so it *won't run at all*. nested() # Let's do it differently now and await it: print(await nested()) # will print "42". asyncio.run(main())
重要:這文件中的術語"協程"可以用於兩個緊密相關的概念:
asyncio亦支援傳統基於生程的協程。
Tasks
Tasks用於並行調度協程。
當協程被包裝到具有像是
asyncio.create_task()
函數的任務內時,這個協程會很快的自動調度來執行:import asyncio async def nested(): return 42 async def main(): # Schedule nested() to run soon concurrently # with "main()". task = asyncio.create_task(nested()) # "task" can now be used to cancel "nested()", or # can simply be awaited to wait until it is complete: await task asyncio.run(main())
Futures
Future是一個特別的低階可等待物件,代表一個非同步(異步)操作的事件結果。
當Future物件為awaited的時候,意味著協程會等待,一直到Future在其它位置被解析。
asyncio中的Future物件需要允許基於回呼程式碼與async/await一起使用。
一般來說,不需要在應用程式級別程式碼中建立Future物件。
Future objects, sometimes exposed by libraries and some asyncio APIs, can be awaited:
Future物件(有些時候會由套件或asyncio APIs公開)可以是awaited:
await function_that_returns_a_future_object() # this is also valid: await asyncio.gather( function_that_returns_a_future_object(), some_python_coroutine() )
一個回傳Future物件的低階函數範例為loop.run_in_executor()。
Running an asyncio Program
asyncio.run(coro, *, debug=False)
執行協程並回傳結果。
這個函數執行一個傳遞過來的協程,並負責管理asyncio事件迴圈以及完成非同步生成器。
當另一個asyncio事件迴圈在相同thread(執行緒)執行的時候,這函數無法被呼叫。
如果
debug=True
,那事件迴圈會在除錯模式中執行。這函數總是建立一個新的事件迴圈,並在最後關閉它。它應該用做為asyncio程式的主要入口點,而且理想情況下應該只調用一次。
範例:
async def main(): await asyncio.sleep(1) print('hello') asyncio.run(main())
New in version 3.7.
原始碼可以在Lib/asyncio/runners.py找到
Creating Tasks
asyncio.create_task(coro, *, name=None)
將協程包裝到Task中,並調度它的執行。回傳Task物件。
如果它的
name
不為None,則使用Task.set_name()來設置task名稱。task在get_running_loop()回傳的迴圈中執行,如果當前的執行緒(thread)沒有正在執行中的迴圈,那就拋出RuntimeError。
這個函數在Python 3.7中被加入。Python 3.7之前的版本,以低階函數asyncio.ensure_future()替代:
async def coro(): ... # In Python 3.7+ task = asyncio.create_task(coro()) ... # This works in all Python versions but is less readable task = asyncio.ensure_future(coro()) ...
New in version 3.7.
Changed in version 3.8:增加參數
name
Sleeping
coroutine asyncio.sleep(delay, result=None, *, loop=None)
延遲秒數的阻塞。
如果提供結果,那在協程完成的時候會回傳給調用者。
sleep()
總是暫停當前task,允許其它tasks執行。Python 3.8之後不建議使用,將在Python 3.10的時候移除:參數
loop
。協程範例,每秒顯示當前日期5秒:
import asyncio import datetime async def display_date(): loop = asyncio.get_running_loop() end_time = loop.time() + 5.0 while True: print(datetime.datetime.now()) if (loop.time() + 1.0) >= end_time: break await asyncio.sleep(1) asyncio.run(display_date())
Running Tasks Concurrently
awaitable asyncio.gather(*aws, loop=None, return_exceptions=False)
在asw序列中同時執行awaitable物件
如果任一個awaitable在aws中是一個協程,那就視為Task自動調度。
如果所有的awaitables都成功的完成,彙總清單(list)。其順序相對應於aws中的awaitables的順序。
如果
return_exceptions
為False(預設),那第一個拋出的異常會立即被傳送到在gather()
上等待的task。如果
return_exceptions
為True,那異常會跟成功的結果一樣的處理方法,並彙總於結果清單。如果
gather()
被取消,所有提交的awaitables(未完成的)也會一併被取消。如果任一來自aws序列的Task或Future被取消,那就將它視為引發CancelledError,這種情況下不會取消
gather()
的呼叫。Python 3.8之後不建議使用,將在Python 3.10的時候移除:參數
loop
。範例:
import asyncio async def factorial(name, number): f = 1 for i in range(2, number + 1): print(f"Task {name}: Compute factorial({i})...") await asyncio.sleep(1) f *= i print(f"Task {name}: factorial({number}) = {f}") async def main(): # Schedule three calls *concurrently*: await asyncio.gather( factorial("A", 2), factorial("B", 3), factorial("C", 4), ) asyncio.run(main()) # Expected output: # # Task A: Compute factorial(2)... # Task B: Compute factorial(2)... # Task C: Compute factorial(2)... # Task A: factorial(2) = 2 # Task B: Compute factorial(3)... # Task C: Compute factorial(3)... # Task B: factorial(3) = 6 # Task C: Compute factorial(4)... # Task C: factorial(4) = 24
Changed in version 3.7:如果gather本身被取消,那取消傳播與return_exceptions無關。
Shielding From Cancellation
awaitable asyncio.shield(aw, *, loop=None)
保護一個awaitable物件不被取消。
如果aw是一個協程,那就視為Task自動調度。
語法:
res = await shield(something())
等價於:
res = await something()
除非包含它的協程被取消,那在something()中執行的Task就不會被取消。從something()的觀點來看,取消沒有發生。儘管其調用者仍然被取消,但"await"表達式依然會拋出CancelledError。
如果
something()
被其它方式取消(由自身內部),那會同時取消shield()
。如果希望完全忽略取消(不建議),那函數
shield()
應該結合try/except,如下:try: res = await shield(something()) except CancelledError: res = None
Python 3.8之後不建議使用,將在Python 3.10的時候移除:參數
loop
。Timeouts
coroutine asyncio.wait_for(aw, timeout, *, loop=None)
等待aw awaitable以超時完成。
如果aw是一個協程,那就視為Task自動調度。
timeout
可以是None
或是float或是int做為等待的秒數。如果timeout
是None
,那阻塞會一直到future完成。避免task取消,可以將task包裝在shield()
函數會等待,一直到future確實的取消,因此總等待時間也許會超過
timeout
。Python 3.8之後不建議使用,將在Python 3.10的時候移除:參數
loop
。範例:
async def eternity(): # Sleep for one hour await asyncio.sleep(3600) print('yay!') async def main(): # Wait for at most 1 second try: await asyncio.wait_for(eternity(), timeout=1.0) except asyncio.TimeoutError: print('timeout!') asyncio.run(main()) # Expected output: # # timeout!
Changed in version 3.7:當aw因為timeout而取消,則
wait_for
等待aw被取消。之前的版本會立即拋出asyncio.TimeoutError。Waiting Primitives
coroutine asyncio.wait(aws, *, loop=None, timeout=None, return_when=ALL_COMPLETED)
同時在aws上執行awaitable物件,並阻塞直到
return_when
指定的條件為止。回傳兩組Tasks/Futures:
(done, pending)
用法:
done, pending = await asyncio.wait(aws)
timeout
(float or ing),如果指定,可以用來控制回傳前等待的最大秒數。注意,這個函數並不會拋出
asyncio.TimeoutError
。發生timeout未完成的Futures或Tasks僅在第二組中回傳。return_when
指示這函數何時應該回傳。它必須為下面項目之一:不像wait_for(),當發生timeout的時候,
wait
並不會取消futures。Python 3.8之後不建議使用:如果任一個awaitable在aws中是一個協程,那就視為Task自動調度。不建議將協程物件直接傳遞給
wait()
,因為它會導致混亂的行為。Python 3.8之後不建議使用,將在Python 3.10的時候移除:參數
loop
。Note:
wait()
自動的將協程視為Task調度,稍後會回傳隱式建立的Task物件(done, pending)
。因此,下面程式碼將無法如預期般執行:async def foo(): return 42 coro = foo() done, pending = await asyncio.wait({coro}) if coro in done: # This branch will never be run!
修復上面片段程式碼如下:
async def foo(): return 42 task = asyncio.create_task(foo()) done, pending = await asyncio.wait({task}) if task in done: # Everything will work as expected now.
Python 3.8之後不建議使用:不建議直接將協程物件直接傳遞給
wait()
asyncio.as_completed(aws, *, loop=None, timeout=None)
同時在aws集中執行awaitable物件。回傳一個Future物件的迭代器。回傳的每一個Future物件表示剩餘的awaitables集的最早結果。
如果在所有的Futures完成之前發生timeout,則拋出asyncio.TimeoutError。
Python 3.8之後不建議使用,將在Python 3.10的時候移除:參數
loop
。範例:
for f in as_completed(aws): earliest_result = await f # ...
Scheduling From Other Threads
asyncio.run_coroutine_threadsafe(coro, loop)
提交協程到給定的事件迴圈。線程安全。
回傳concurrent.futures.Future以等待其它OS線程的結果。
這個函數意思是,從不同於執行事件循環的OS線程中調用。範例:
# Create a coroutine coro = asyncio.sleep(1, result=3) # Submit the coroutine to a given loop future = asyncio.run_coroutine_threadsafe(coro, loop) # Wait for the result with an optional timeout argument assert future.result(timeout) == 3
如果協程中拋出異常,那將通知回傳的Future。這也可以用來取消事件迴圈中的task:
try: result = future.result(timeout) except asyncio.TimeoutError: print('The coroutine took too long, cancelling the task...') future.cancel() except Exception as exc: print(f'The coroutine raised an exception: {exc!r}') else: print(f'The coroutine returned: {result!r}')
參考concurrency and multithreading
不像其它asyncio函數,這個函數要求顯式的傳遞參數
loop
New in version 3.5.1.
Introspection
asyncio.current_task(loop=None)
回傳目前執行中的Task實例,如果沒有Task在執行,則回傳
None
如果
loop is None
,那就用get_running_loop()來取得當前迴圈。New in version 3.7.
asyncio.all_tasks(loop=None)
回傳由迴圈執行的一組未完成的Task物件。
如果
loop is None
,那就用get_running_loop()來取得當前迴圈。New in version 3.7.
Task Object
class asyncio.Task(coro, *, loop=None, name=None)
執行Python協程的Future-like物件。非線程安全。
Tasks用於在事件迴圈中執行協程。如果協程等待Future,那Task會暫停協程的執行並等待Future完成。當Future完成,將繼續執行包裝的協程。
事件迴圈使用協同調度:事件迴圈一次執行一個Task。當Task等待協程完成的時候,事件迴圈會執行其它Task、回呼或執行IO操作。
使用高階函數asyncio.create_task(),或低階函數loop.create_task()或ensure_future()建立Tasks。不建議手動實例化Tasks。
要取消執行中的Task,請使用cancal()方法。呼叫它將導致Task向包裝的協程中拋出CancelledError異常。如果在取消過程中某個Future物件正在等待協程,那這個Future物件將被取消。
calcelled()可以用來確認Task是否已經被取消。如果包裝的協程沒有抑制CancelledError異常並確實的取消,那就回傳True。
asyncio.Task繼承Future所有的APIs,除了Future.set_result()與Future.set_exception()。
Tasks支援contextvars模組。建立Task的時候,它會複製當前的上下文,然後在複製的上下文中執行協程。
Changed in version 3.7: 增加支援contextvars模組。
Changed in version 3.8: 增加參數
name
。Python 3.8之後不建議使用,將在Python 3.10的時候移除:參數
loop
。cancel()
請求取消Task。
這安排在事件迴圈的下一個週期將CancelledError異常拋出到包裝的協程中。
然後,協程有機會清楚,或甚至拒絕請求,透過利用
try except CancelledError finally
區塊來抑制異常。然而,不同於Future.cancel(),Task.cancel()並不保證Task一定被取消,儘管完全抑制取消並不常見,並且主動阻止。
下面範例說明協程如何攔截取消請求:
async def cancel_me(): print('cancel_me(): before sleep') try: # Wait for 1 hour await asyncio.sleep(3600) except asyncio.CancelledError: print('cancel_me(): cancel sleep') raise finally: print('cancel_me(): after sleep') async def main(): # Create a "cancel_me" Task task = asyncio.create_task(cancel_me()) # Wait for 1 second await asyncio.sleep(1) task.cancel() try: await task except asyncio.CancelledError: print("main(): cancel_me is cancelled now") asyncio.run(main()) # Expected output: # # cancel_me(): before sleep # cancel_me(): cancel sleep # cancel_me(): after sleep # main(): cancel_me is cancelled now
cancelled()
如果Task已經取消,則回傳True。
當使用calcel()請求取消,Task將被取消,並且包裝的協程將CancelledError異常傳播到Task中。
done()
如果Task完成,則回傳True。
當包裝的協程回傳值,拋出異常、或Task被取消,Task就算完成了。
result()
回傳Task的結果。
如果Task完成,那就回傳包裝的協程的結果(或者如何協程拋出異常,則重新拋出該異常)
如果Task已經被取消,這個方法將拋出CancelledError異常。
如果Task的結果尚不可用,這個方法會拋出InvalidStateError異常。
exception()
回傳Task的異常。
如果包裝的協程拋出異常,則回傳該異常。如果包裝的協程正常回傳,這個方法將回傳
None
。如果Task已經被取消,這個方法將拋出CancelledError異常。
如果Task尚未完成,這個方法會拋出InvalidStateError異常。
add_done_callback(callback, *, context=None)
增加一個callback function給Task完成的時候執行。
這個方法只能在基於低階回呼的程式碼中使用。
更多細節請參考Future.add_done_callback()
remove_done_callback(callback)
從callback清單中取消callback。
這個方法只能在基於低階回呼的程式碼中使用。
更多細節請參考Future.remove_done_callback()
get_stack(*, limit=None)
回傳該Task的堆疊框的清單(list of stack frames)。
如果包裝的協程還沒有完成,那就回傳掛起它的堆疊。如果協程已經成功地完成或被取消,那就回傳空清單(empty list)。如果協程被異常終止,那就回傳回溯框清單(list of traceback frames)。
框(frames)總是依著舊到新的排序。
對懸置的協程,只會回傳一個堆疊框(stack frame)。
選項參數
limit
設置回傳的框(frame)的最大數量;預設情況下回傳所有可用的框(frame)。回傳的清單順序依回傳是堆疊或回溯而有所不同:回傳最新的堆疊,但回傳最舊的回溯。(這與回溯模組的行為匹配。)print_stack(*, limit=None, file=None)
列印該Task的堆棧或回溯。
對於get_stack()檢索到的frames,將生成與回溯模組類似的輸出。
get_coro()
回傳Task包裝的協程。
New in version 3.8.
get_name()
回傳Task的名稱。
如果沒有為Task明確的分配名稱,則預設asyncio Task實現在實例化過程中生成一個預設名稱。
New in version 3.8.
set_name(value)
設置Task的名稱。
參數
value
可以是任意物件,然後將它轉為字串。在預設的Task實現中,名稱將在Task物件的
repr()
的輸出看的見。New in version 3.8.
classmethod all_tasks(loop=None)
回傳事件迴圈的所有Tasks集合。
預設情況下,回傳當前事件迴圈的所有Tasks。如果
loop is None
,則使用函數get_event_loop()取得當前迴圈。版本3.7之後不建議使用,將在版本3.9中移除:不要將此做為task方法調用。改用asyncio.all_tasks()。
classmethod current_task(loop=None)
回傳當前的task或None。
如果
loop is None
,則使用函數get_event_loop()取得當前迴圈。版本3.7之後不建議使用,將在版本3.9中移除:不要將此做為task方法調用。改用asyncio.current_task() 。
Generator-based Coroutines