隊(duì)列集?

源代碼: Lib/asyncio/queues.py


asyncio 隊(duì)列被設(shè)計(jì)成與 queue 模塊類(lèi)似。盡管 asyncio隊(duì)列不是線程安全的,但是他們是被設(shè)計(jì)專(zhuān)用于 async/await 代碼。

注意asyncio 的隊(duì)列沒(méi)有 timeout 形參;請(qǐng)使用 asyncio.wait_for() 函數(shù)為隊(duì)列添加超時(shí)操作。

參見(jiàn)下面的 Examples 部分。

Queue?

class asyncio.Queue(maxsize=0)?

先進(jìn),先出(FIFO)隊(duì)列

如果 maxsize 小于等于零,則隊(duì)列尺寸是無(wú)限的。如果是大于 0 的整數(shù),則當(dāng)隊(duì)列達(dá)到 maxsize 時(shí), await put() 將阻塞至某個(gè)元素被 get() 取出。

不像標(biāo)準(zhǔn)庫(kù)中的并發(fā)型 queue ,隊(duì)列的尺寸一直是已知的,可以通過(guò)調(diào)用 qsize() 方法返回。

在 3.10 版更改: Removed the loop parameter.

這個(gè)類(lèi)不是線程安全的(not thread safe)。

maxsize?

隊(duì)列中可存放的元素?cái)?shù)量。

empty()?

如果隊(duì)列為空返回 True ,否則返回 False 。

full()?

如果有 maxsize 個(gè)條目在隊(duì)列中,則返回 True 。

如果隊(duì)列用 maxsize=0 (默認(rèn))初始化,則 full() 永遠(yuǎn)不會(huì)返回 True 。

coroutine get()?

從隊(duì)列中刪除并返回一個(gè)元素。如果隊(duì)列為空,則等待,直到隊(duì)列中有元素。

get_nowait()?

立即返回一個(gè)隊(duì)列中的元素,如果隊(duì)列內(nèi)有值,否則引發(fā)異常 QueueEmpty 。

coroutine join()?

阻塞至隊(duì)列中所有的元素都被接收和處理完畢。

當(dāng)條目添加到隊(duì)列的時(shí)候,未完成任務(wù)的計(jì)數(shù)就會(huì)增加。每當(dāng)消費(fèi)協(xié)程調(diào)用 task_done() 表示這個(gè)條目已經(jīng)被回收,該條目所有工作已經(jīng)完成,未完成計(jì)數(shù)就會(huì)減少。當(dāng)未完成計(jì)數(shù)降到零的時(shí)候, join() 阻塞被解除。

coroutine put(item)?

添加一個(gè)元素進(jìn)隊(duì)列。如果隊(duì)列滿(mǎn)了,在添加元素之前,會(huì)一直等待空閑插槽可用。

put_nowait(item)?

不阻塞的放一個(gè)元素入隊(duì)列。

如果沒(méi)有立即可用的空閑槽,引發(fā) QueueFull 異常。

qsize()?

返回隊(duì)列用的元素?cái)?shù)量。

task_done()?

表明前面排隊(duì)的任務(wù)已經(jīng)完成,即get出來(lái)的元素相關(guān)操作已經(jīng)完成。

由隊(duì)列使用者控制。每個(gè) get() 用于獲取一個(gè)任務(wù),任務(wù)最后調(diào)用 task_done() 告訴隊(duì)列,這個(gè)任務(wù)已經(jīng)完成。

如果 join() 當(dāng)前正在阻塞,在所有條目都被處理后,將解除阻塞(意味著每個(gè) put() 進(jìn)隊(duì)列的條目的 task_done() 都被收到)。

如果被調(diào)用的次數(shù)多于放入隊(duì)列中的項(xiàng)目數(shù)量,將引發(fā) ValueError 。

優(yōu)先級(jí)隊(duì)列?

class asyncio.PriorityQueue?

Queue 的變體;按優(yōu)先級(jí)順序取出條目 (最小的先取出)。

條目通常是 (priority_number, data) 形式的元組。

后進(jìn)先出隊(duì)列?

class asyncio.LifoQueue?

Queue 的變體,先取出最近添加的條目(后進(jìn),先出)。

異常?

exception asyncio.QueueEmpty?

當(dāng)隊(duì)列為空的時(shí)候,調(diào)用 get_nowait() 方法而引發(fā)這個(gè)異常。

exception asyncio.QueueFull?

當(dāng)隊(duì)列中條目數(shù)量已經(jīng)達(dá)到它的 maxsize 的時(shí)候,調(diào)用 put_nowait() 方法而引發(fā)的異常。

例子?

隊(duì)列能被用于多個(gè)的并發(fā)任務(wù)的工作量分配:

import asyncio
import random
import time


async def worker(name, queue):
    while True:
        # Get a "work item" out of the queue.
        sleep_for = await queue.get()

        # Sleep for the "sleep_for" seconds.
        await asyncio.sleep(sleep_for)

        # Notify the queue that the "work item" has been processed.
        queue.task_done()

        print(f'{name} has slept for {sleep_for:.2f} seconds')


async def main():
    # Create a queue that we will use to store our "workload".
    queue = asyncio.Queue()

    # Generate random timings and put them into the queue.
    total_sleep_time = 0
    for _ in range(20):
        sleep_for = random.uniform(0.05, 1.0)
        total_sleep_time += sleep_for
        queue.put_nowait(sleep_for)

    # Create three worker tasks to process the queue concurrently.
    tasks = []
    for i in range(3):
        task = asyncio.create_task(worker(f'worker-{i}', queue))
        tasks.append(task)

    # Wait until the queue is fully processed.
    started_at = time.monotonic()
    await queue.join()
    total_slept_for = time.monotonic() - started_at

    # Cancel our worker tasks.
    for task in tasks:
        task.cancel()
    # Wait until all worker tasks are cancelled.
    await asyncio.gather(*tasks, return_exceptions=True)

    print('====')
    print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
    print(f'total expected sleep time: {total_sleep_time:.2f} seconds')


asyncio.run(main())