2020年1月16日 星期四

Coroutines-and-Tasks翻譯

Coroutines and Tasks(翻譯)

這章節概略說明用於協程、任務的高階asyncio的APIs。

Coroutines

使用async/await語法宣告的協程是寫asyncio應用程式最好的方法。舉例來說,下面片段程式碼(需要Python 3.7+)列印"hello",等待1秒,然後列印"world":

>>> import asyncio

>>> async def main():
...     print('hello')
...     await asyncio.sleep(1)
...     print('world')

>>> asyncio.run(main())
hello
world

注意到,單純的呼叫協程並不會調度它被執行:

>>> main()
<coroutine object main at 0x1053bb7c8>

要實際的執行協程,asyncio提供三種機制:

  • 使用函數asyncio.run()來執行頂層的入口"main()"函數(見上範例)
  • 協程中的等待。下面程式碼片段會在等待1秒之後列印"hello",,然後在等待另外2秒之後列印"world":
import asyncio
import time

async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)

async def main():
    print(f"started at {time.strftime('%X')}")

    await say_after(1, 'hello')
    await say_after(2, 'world')

    print(f"finished at {time.strftime('%X')}")

asyncio.run(main())

預期輸出如下:

started at 17:13:52
hello
world
finished at 17:13:55

讓我們調整上面範例,並且同時執行兩個協程say_after

async def main():
    task1 = asyncio.create_task(
        say_after(1, 'hello'))

    task2 = asyncio.create_task(
        say_after(2, 'world'))

    print(f"started at {time.strftime('%X')}")

    # Wait until both tasks are completed (should take
    # around 2 seconds.)
    await task1
    await task2

    print(f"finished at {time.strftime('%X')}")

注意到,預期輸出現在顯示出這程式碼片段比之前還要快一秒:

started at 17:14:32
hello
world
finished at 17:14:34

Awaitables

我們說,如果一個物件可以用於await表達示中,那它就是一個可等待物件。許多asyncio APIs被設計為接受等待。

有三種類型的可等待物件:coroutines, Tasks, and Futures

Coroutines

Python協程是可等待的,因此可以從其它協程中等待:

import asyncio

async def nested():
    return 42

async def main():
    # Nothing happens if we just call "nested()".
    # A coroutine object is created but not awaited,
    # so it *won't run at all*.
    nested()

    # Let's do it differently now and await it:
    print(await nested())  # will print "42".

asyncio.run(main())

重要:這文件中的術語"協程"可以用於兩個緊密相關的概念:

  • 協程函數:一個async def函數
  • 協程物件:透過呼叫協程函數回傳的物件

asyncio亦支援傳統基於生程的協程。

Tasks

Tasks用於並行調度協程。

當協程被包裝到具有像是asyncio.create_task()函數的任務內時,這個協程會很快的自動調度來執行:

import asyncio

async def nested():
    return 42

async def main():
    # Schedule nested() to run soon concurrently
    # with "main()".
    task = asyncio.create_task(nested())

    # "task" can now be used to cancel "nested()", or
    # can simply be awaited to wait until it is complete:
    await task

asyncio.run(main())

Futures

Future是一個特別的低階可等待物件,代表一個非同步(異步)操作的事件結果。

當Future物件為awaited的時候,意味著協程會等待,一直到Future在其它位置被解析。

asyncio中的Future物件需要允許基於回呼程式碼與async/await一起使用。

一般來說,不需要在應用程式級別程式碼中建立Future物件。

Future objects, sometimes exposed by libraries and some asyncio APIs, can be awaited:

Future物件(有些時候會由套件或asyncio APIs公開)可以是awaited:

    await function_that_returns_a_future_object()

    # this is also valid:
    await asyncio.gather(
        function_that_returns_a_future_object(),
        some_python_coroutine()
    )

一個回傳Future物件的低階函數範例為loop.run_in_executor()

Running an asyncio Program

  • asyncio.run(coro, *, debug=False)
    執行協程並回傳結果。

    這個函數執行一個傳遞過來的協程,並負責管理asyncio事件迴圈以及完成非同步生成器。

    當另一個asyncio事件迴圈在相同thread(執行緒)執行的時候,這函數無法被呼叫。

    如果debug=True,那事件迴圈會在除錯模式中執行。

    這函數總是建立一個新的事件迴圈,並在最後關閉它。它應該用做為asyncio程式的主要入口點,而且理想情況下應該只調用一次。
    範例:

    async def main():
        await asyncio.sleep(1)
        print('hello')
    
    asyncio.run(main())
    

    New in version 3.7.

    原始碼可以在Lib/asyncio/runners.py找到

Creating Tasks

  • asyncio.create_task(coro, *, name=None)
    將協程包裝到Task中,並調度它的執行。回傳Task物件。

    如果它的name不為None,則使用Task.set_name()來設置task名稱。

    task在get_running_loop()回傳的迴圈中執行,如果當前的執行緒(thread)沒有正在執行中的迴圈,那就拋出RuntimeError

    這個函數在Python 3.7中被加入。Python 3.7之前的版本,以低階函數asyncio.ensure_future()替代:

    async def coro():
    ...
    
    # In Python 3.7+
    task = asyncio.create_task(coro())
    ...
    
    # This works in all Python versions but is less readable
    task = asyncio.ensure_future(coro())
    ...
    

    New in version 3.7.

    Changed in version 3.8:增加參數name

Sleeping

  • coroutine asyncio.sleep(delay, result=None, *, loop=None)
    延遲秒數的阻塞。

    如果提供結果,那在協程完成的時候會回傳給調用者。

    sleep()總是暫停當前task,允許其它tasks執行。

    Python 3.8之後不建議使用,將在Python 3.10的時候移除:參數loop

    協程範例,每秒顯示當前日期5秒:

    import asyncio
    import datetime
    
    async def display_date():
        loop = asyncio.get_running_loop()
        end_time = loop.time() + 5.0
        while True:
            print(datetime.datetime.now())
            if (loop.time() + 1.0) >= end_time:
                break
            await asyncio.sleep(1)
    
    asyncio.run(display_date())
    

Running Tasks Concurrently

  • awaitable asyncio.gather(*aws, loop=None, return_exceptions=False)
    在asw序列中同時執行awaitable物件

    如果任一個awaitable在aws中是一個協程,那就視為Task自動調度。

    如果所有的awaitables都成功的完成,彙總清單(list)。其順序相對應於aws中的awaitables的順序。

    如果return_exceptions為False(預設),那第一個拋出的異常會立即被傳送到在gather()上等待的task。

    如果return_exceptions為True,那異常會跟成功的結果一樣的處理方法,並彙總於結果清單。

    如果gather()被取消,所有提交的awaitables(未完成的)也會一併被取消。

    如果任一來自aws序列的Task或Future被取消,那就將它視為引發CancelledError,這種情況下不會取消gather()的呼叫。

    Python 3.8之後不建議使用,將在Python 3.10的時候移除:參數loop

    範例:

    import asyncio
    
    async def factorial(name, number):
        f = 1
        for i in range(2, number + 1):
            print(f"Task {name}: Compute factorial({i})...")
            await asyncio.sleep(1)
            f *= i
        print(f"Task {name}: factorial({number}) = {f}")
        
    async def main():
        # Schedule three calls *concurrently*:
        await asyncio.gather(
            factorial("A", 2),
            factorial("B", 3),
            factorial("C", 4),
        )
        
    asyncio.run(main())
    
    # Expected output:
    #
    #     Task A: Compute factorial(2)...
    #     Task B: Compute factorial(2)...
    #     Task C: Compute factorial(2)...
    #     Task A: factorial(2) = 2
    #     Task B: Compute factorial(3)...
    #     Task C: Compute factorial(3)...
    #     Task B: factorial(3) = 6
    #     Task C: Compute factorial(4)...
    #     Task C: factorial(4) = 24
    

    Changed in version 3.7:如果gather本身被取消,那取消傳播與return_exceptions無關。

Shielding From Cancellation

  • awaitable asyncio.shield(aw, *, loop=None)
    保護一個awaitable物件不被取消

    如果aw是一個協程,那就視為Task自動調度。

    語法:

    res = await shield(something())
    

    等價於:

    res = await something()
    

    除非包含它的協程被取消,那在something()中執行的Task就不會被取消。從something()的觀點來看,取消沒有發生。儘管其調用者仍然被取消,但"await"表達式依然會拋出CancelledError

    如果something()被其它方式取消(由自身內部),那會同時取消shield()

    如果希望完全忽略取消(不建議),那函數shield()應該結合try/except,如下:

    try:
        res = await shield(something())
    except CancelledError:
        res = None
    

    Python 3.8之後不建議使用,將在Python 3.10的時候移除:參數loop

Timeouts

  • coroutine asyncio.wait_for(aw, timeout, *, loop=None)
    等待aw awaitable以超時完成。

    如果aw是一個協程,那就視為Task自動調度。

    timeout可以是None或是float或是int做為等待的秒數。如果timeoutNone,那阻塞會一直到future完成。

    避免task取消,可以將task包裝在shield()

    函數會等待,一直到future確實的取消,因此總等待時間也許會超過timeout

    Python 3.8之後不建議使用,將在Python 3.10的時候移除:參數loop

    範例:

    async def eternity():
        # Sleep for one hour
        await asyncio.sleep(3600)
        print('yay!')
    async def main():
        # Wait for at most 1 second
        try:
            await asyncio.wait_for(eternity(), timeout=1.0)
        except asyncio.TimeoutError:
            print('timeout!') 
            
    asyncio.run(main())
    
    # Expected output:
    #
    #     timeout!    
    

    Changed in version 3.7:當aw因為timeout而取消,則wait_for等待aw被取消。之前的版本會立即拋出asyncio.TimeoutError

Waiting Primitives

  • coroutine asyncio.wait(aws, *, loop=None, timeout=None, return_when=ALL_COMPLETED)
    同時在aws上執行awaitable物件,並阻塞直到return_when指定的條件為止。

    回傳兩組Tasks/Futures:(done, pending)

    用法:

    done, pending = await asyncio.wait(aws)
    

    timeout(float or ing),如果指定,可以用來控制回傳前等待的最大秒數。

    注意,這個函數並不會拋出asyncio.TimeoutError。發生timeout未完成的Futures或Tasks僅在第二組中回傳。

    return_when指示這函數何時應該回傳。它必須為下面項目之一:

    Constant Description
    FIRST_COMPLETED The function will return when any future finishes or is cancelled.
    FIRST_EXCEPTION The function will return when any future finishes by raising an exception. If no future raises an exception then it is equivalent to ALL_COMPLETED.
    ALL_COMPLETED The function will return when all futures finish or are cancelled.

    不像wait_for(),當發生timeout的時候,wait並不會取消futures。

    Python 3.8之後不建議使用:如果任一個awaitable在aws中是一個協程,那就視為Task自動調度。不建議將協程物件直接傳遞給wait(),因為它會導致混亂的行為。

    Python 3.8之後不建議使用,將在Python 3.10的時候移除:參數loop

    Note:wait()自動的將協程視為Task調度,稍後會回傳隱式建立的Task物件(done, pending)。因此,下面程式碼將無法如預期般執行:

    async def foo():
        return 42
    
    coro = foo()
    done, pending = await asyncio.wait({coro})
    
    if coro in done:
        # This branch will never be run!
    

    修復上面片段程式碼如下:

    async def foo():
        return 42
    
    task = asyncio.create_task(foo())
    done, pending = await asyncio.wait({task})
    
    if task in done:
        # Everything will work as expected now.
    

    Python 3.8之後不建議使用:不建議直接將協程物件直接傳遞給wait()

  • asyncio.as_completed(aws, *, loop=None, timeout=None)
    同時在aws集中執行awaitable物件。回傳一個Future物件的迭代器。回傳的每一個Future物件表示剩餘的awaitables集的最早結果。

    如果在所有的Futures完成之前發生timeout,則拋出asyncio.TimeoutError

    Python 3.8之後不建議使用,將在Python 3.10的時候移除:參數loop

    範例:

    for f in as_completed(aws):
        earliest_result = await f
        # ... 
    

Scheduling From Other Threads

  • asyncio.run_coroutine_threadsafe(coro, loop)
    提交協程到給定的事件迴圈。線程安全。

    回傳concurrent.futures.Future以等待其它OS線程的結果。

    這個函數意思是,從不同於執行事件循環的OS線程中調用。範例:

    # Create a coroutine
    coro = asyncio.sleep(1, result=3)
    
    # Submit the coroutine to a given loop
    future = asyncio.run_coroutine_threadsafe(coro, loop)
    
    # Wait for the result with an optional timeout argument
    assert future.result(timeout) == 3   
    

    如果協程中拋出異常,那將通知回傳的Future。這也可以用來取消事件迴圈中的task:

    try:
        result = future.result(timeout)
    except asyncio.TimeoutError:
        print('The coroutine took too long, cancelling the task...')
        future.cancel()
    except Exception as exc:
        print(f'The coroutine raised an exception: {exc!r}')
    else:
        print(f'The coroutine returned: {result!r}')
    

    參考concurrency and multithreading

    不像其它asyncio函數,這個函數要求顯式的傳遞參數loop

    New in version 3.5.1.

Introspection

  • asyncio.current_task(loop=None)
    回傳目前執行中的Task實例,如果沒有Task在執行,則回傳None

    如果loop is None,那就用get_running_loop()來取得當前迴圈。

    New in version 3.7.

  • asyncio.all_tasks(loop=None)
    回傳由迴圈執行的一組未完成的Task物件。

    如果loop is None,那就用get_running_loop()來取得當前迴圈。

    New in version 3.7.

Task Object

  • class asyncio.Task(coro, *, loop=None, name=None)
    執行Python協程的Future-like物件。非線程安全。

    Tasks用於在事件迴圈中執行協程。如果協程等待Future,那Task會暫停協程的執行並等待Future完成。當Future完成,將繼續執行包裝的協程。

    事件迴圈使用協同調度:事件迴圈一次執行一個Task。當Task等待協程完成的時候,事件迴圈會執行其它Task、回呼或執行IO操作。

    使用高階函數asyncio.create_task(),或低階函數loop.create_task()ensure_future()建立Tasks。不建議手動實例化Tasks。

    要取消執行中的Task,請使用cancal()方法。呼叫它將導致Task向包裝的協程中拋出CancelledError異常。如果在取消過程中某個Future物件正在等待協程,那這個Future物件將被取消。

    calcelled()可以用來確認Task是否已經被取消。如果包裝的協程沒有抑制CancelledError異常並確實的取消,那就回傳True。

    asyncio.Task繼承Future所有的APIs,除了Future.set_result()Future.set_exception()

    Tasks支援contextvars模組。建立Task的時候,它會複製當前的上下文,然後在複製的上下文中執行協程。

    Changed in version 3.7: 增加支援contextvars模組。

    Changed in version 3.8: 增加參數name

    Python 3.8之後不建議使用,將在Python 3.10的時候移除:參數loop

cancel()

請求取消Task。

這安排在事件迴圈的下一個週期將CancelledError異常拋出到包裝的協程中。

然後,協程有機會清楚,或甚至拒絕請求,透過利用try except CancelledError finally區塊來抑制異常。

然而,不同於Future.cancel()Task.cancel()並不保證Task一定被取消,儘管完全抑制取消並不常見,並且主動阻止。

下面範例說明協程如何攔截取消請求:

async def cancel_me():
    print('cancel_me(): before sleep')

    try:
        # Wait for 1 hour
        await asyncio.sleep(3600)
    except asyncio.CancelledError:
        print('cancel_me(): cancel sleep')
        raise
    finally:
        print('cancel_me(): after sleep')

async def main():
    # Create a "cancel_me" Task
    task = asyncio.create_task(cancel_me())

    # Wait for 1 second
    await asyncio.sleep(1)

    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("main(): cancel_me is cancelled now")

asyncio.run(main())

# Expected output:
#
#     cancel_me(): before sleep
#     cancel_me(): cancel sleep
#     cancel_me(): after sleep
#     main(): cancel_me is cancelled now

cancelled()

如果Task已經取消,則回傳True。

當使用calcel()請求取消,Task將被取消,並且包裝的協程將CancelledError異常傳播到Task中。

done()

如果Task完成,則回傳True。

當包裝的協程回傳值,拋出異常、或Task被取消,Task就算完成了。

result()

回傳Task的結果。

如果Task完成,那就回傳包裝的協程的結果(或者如何協程拋出異常,則重新拋出該異常)

如果Task已經被取消,這個方法將拋出CancelledError異常。

如果Task的結果尚不可用,這個方法會拋出InvalidStateError異常。

exception()

回傳Task的異常。

如果包裝的協程拋出異常,則回傳該異常。如果包裝的協程正常回傳,這個方法將回傳None

如果Task已經被取消,這個方法將拋出CancelledError異常。

如果Task尚未完成,這個方法會拋出InvalidStateError異常。

add_done_callback(callback, *, context=None)

增加一個callback function給Task完成的時候執行。

這個方法只能在基於低階回呼的程式碼中使用。

更多細節請參考Future.add_done_callback()

remove_done_callback(callback)

從callback清單中取消callback。

這個方法只能在基於低階回呼的程式碼中使用。

更多細節請參考Future.remove_done_callback()

get_stack(*, limit=None)

回傳該Task的堆疊框的清單(list of stack frames)。

如果包裝的協程還沒有完成,那就回傳掛起它的堆疊。如果協程已經成功地完成或被取消,那就回傳空清單(empty list)。如果協程被異常終止,那就回傳回溯框清單(list of traceback frames)。

框(frames)總是依著舊到新的排序。

對懸置的協程,只會回傳一個堆疊框(stack frame)。

選項參數limit設置回傳的框(frame)的最大數量;預設情況下回傳所有可用的框(frame)。回傳的清單順序依回傳是堆疊或回溯而有所不同:回傳最新的堆疊,但回傳最舊的回溯。(這與回溯模組的行為匹配。)

列印該Task的堆棧或回溯。

對於get_stack()檢索到的frames,將生成與回溯模組類似的輸出。

get_coro()

回傳Task包裝的協程。

New in version 3.8.

get_name()

回傳Task的名稱。

如果沒有為Task明確的分配名稱,則預設asyncio Task實現在實例化過程中生成一個預設名稱。

New in version 3.8.

set_name(value)

設置Task的名稱。

參數value可以是任意物件,然後將它轉為字串。

在預設的Task實現中,名稱將在Task物件的repr()的輸出看的見。

New in version 3.8.

classmethod all_tasks(loop=None)

回傳事件迴圈的所有Tasks集合。

預設情況下,回傳當前事件迴圈的所有Tasks。如果loop is None,則使用函數get_event_loop()取得當前迴圈。

版本3.7之後不建議使用,將在版本3.9中移除:不要將此做為task方法調用。改用asyncio.all_tasks()

classmethod current_task(loop=None)

回傳當前的task或None。

如果loop is None,則使用函數get_event_loop()取得當前迴圈。

版本3.7之後不建議使用,將在版本3.9中移除:不要將此做為task方法調用。改用asyncio.current_task()

Generator-based Coroutines