本次来聊一聊 Python/ target=_blank class=infotextkey>Python 的队列,首先队列是一种特殊的线性表,具有先进先出(FIFO)的特性,这意味着元素的入队顺序和出队顺序是一致的。
队列通常用于存储需要按顺序处理的数据,例如任务调度。当然队列最常见的一个应用场景就是解耦,一个线程不停地生产数据,放到队列里,另一个线程从队列中取数据进行消费。
而 Python 也提供了队列,分别是协程队列和线程队列。
import asyncio
import queue
# 协程队列
coroutine_queue = asyncio.Queue()
# 线程队列
threading_queue = queue.Queue()
如果你的程序基于 asyncio,那么应该使用协程队列,如果你的程序采用了多线程,那么应该使用线程队列。
下面我们来看一看这两种队列的 API,以及底层实现原理。
协程队列
协程队列的具体实现由 asyncio 提供,以下是它的一些用法。
import asyncio
async def mAIn():
# 创建队列时可以指定能够存储的最大元素个数
# 不指定则没有容量限制
queue = asyncio.Queue(maxsize=20)
# 返回容量
print(queue.maxsize)
"""
20
"""
# 添加元素,如果队列满了会阻塞,直到有剩余空间
await queue.put(111)
# 添加元素,如果队列满了会抛异常
# 因为不需要阻塞等待,所以 put_nowait 不是协程函数
queue.put_nowait(222)
# 队列是否已满
print(queue.full())
"""
False
"""
# 返回队列内部的元素个数
print(queue.qsize())
"""
2
"""
# 从队列中获取元素,如果队列为空,会阻塞,直到队列中有可用元素
print(await queue.get())
"""
111
"""
# 从队列中获取元素,如果队列为空,会抛异常
# 因为不需要阻塞等待,所以 put_nowait 不是协程函数
print(queue.get_nowait())
"""
222
"""
# 队列是否为空
print(queue.empty())
"""
True
"""
asyncio.run(main())
所以协程队列的 API 很简单,我们再罗列一下:
图片
然后,协程队列还有两个 API,需要单独说明,分别是 task_done() 和 join()。
首先在协程队列内部有一个 _unfinished_tasks 属性,初始值为 0,每当往队列添加一个元素时,该属性的值就会自增 1。但是从队列取出元素时,该属性不会自动减 1,需要手动调用 task_done() 方法。
所以 _unfinished_tasks 记录了队列中有多少个任务数据需要处理,每来一个自动加 1,但取走一个不会自动减 1,而是需要 task_done 来实现。
然后 join() 的作用是,当 _unfinished_tasks 不为 0 的时候,await queue.join() 会阻塞,直到为 0。
import asyncio
async def consumer(queue, n):
print(f"consumer{n} 开始消费")
await asyncio.sleep(3)
await queue.get()
# 获取数据后,调用 task_done
queue.task_done()
print(f"consumer{n} 消费完毕")
async def main():
queue = asyncio.Queue()
await queue.put(123)
await queue.put(456)
await queue.put(789)
# 队列里面有三个数据,开启三个消费者去消费
await asyncio.gather(
consumer(queue, 1),
consumer(queue, 2),
consumer(queue, 3),
)
# 这里会陷入阻塞,直到 _unfinished_tasks 变为 0
await queue.join()
print("main 解除阻塞")
asyncio.run(main())
"""
consumer1 开始消费
consumer2 开始消费
consumer3 开始消费
consumer1 消费完毕
consumer2 消费完毕
consumer3 消费完毕
main 解除阻塞
"""
还是比较简单的,然后我们来看一下协程队列的具体实现细节。
图片
首先协程队列内部有一个 _queue 属性,它是一个双端队列,负责保存具体的元素。因为要保证两端的操作都是高效的,所以采用双端队列实现。
然后是 _getters 和 _putters 两个属性,它们是做什么的呢?在队列满了的时候,协程往队列添加元素时会陷入阻塞,等到队列有剩余空间时会解除阻塞。同理,在队列为空时,协程从队列获取元素时会陷入阻塞,等到队列有可用元素时会解除阻塞。
那么这个阻塞等待,以及自动唤醒并解除阻塞是怎么实现的呢?在介绍锁和信号量的时候,我们分析过整个实现过程,协程队列与之类似。
假设协程从队列获取元素,但是队列为空,于是会创建一个 Future 对象,并保存起来,当前保存的地方就是 _getters,它也是双端队列。然后 await future,此时就会陷入阻塞,当其它协程往队列中添加元素时,会将 _getters 里面的 future 弹出,设置结果集。因此 await future 的协程就会解除阻塞,因为队列有可用元素了。
同理,协程往队列添加元素也是如此,如果队列满了,同样创建一个 Future 对象,并保存起来,当前保存的地方就是 _putters。然后 await future,陷入阻塞,当其它协程从队列中取出元素,会将 _putters 里面的 future 弹出,设置结果集。因此 await future 的协程就会解除阻塞,因为队列有可用空间了。
图片
三个内部调用的方法,_get 方法负责从队列的头部弹出元素,_put 方法负责从队列的尾部追加元素,比较简单。然后是 _wakeup_next 方法,它负责唤醒阻塞的协程。参数 waiters 要么是 _getters,要么是 _putters,从里面弹出一个 future,设置结果集,让对应的协程解除阻塞。
图片
图片
然后看看 put_nowait 和 get_nowait,首先是 put_nowait,往队列添加元素。
如果添加时发现队列已满,那么抛出异常。如果未满,则调用 _put 方法往 _queue 里面添加元素,因为元素的实际存储是由 self._queue 这个双端队列负责的。
添加完毕后,将 _unfinished_task 加 1。最后从 _getters 里面弹出 future,设置结果集,让因获取不到元素而陷入阻塞的协程解除阻塞(同时会将添加的元素取走)。
get_nowait 的逻辑也很简单,如果队列为空,直接抛异常。如果不为空,则调用 _get 方法从队列中弹出元素。最后从 _putters 里面弹出 future,设置结果集,让因队列已满、无法添加元素而陷入阻塞的协程解除阻塞(同时会将元素添加进队列)。
再来看看 put 方法的实现细节:
图片
结果和我们之前分析的一样,只是源码内部多做了一些异常检测。再来看看 get 方法,它的实现细节和 put 是类似的。
图片
比较简单,还是没什么难度的,最后再来看看 task_done 和 join 两个方法。
图片
协程队列里面使用了 asyncio.Event,它表示事件,如果事件对象没有调用 set 方法设置标志位,那么调用 wait 方法时会陷入阻塞。当事件对象调用 set 方法时,wait 会解除阻塞。
所以协程队列的 join 方法的逻辑就是,当 _unfinished_tasks 大于 0 时,调用事件对象的 wait 方法陷入阻塞。
而 task_done 方法的作用就是将 _unfinished_tasks 减 1,当它的值属性为 0 时,调用事件对象的 set 方法,让 join 解除阻塞。
以上就是整个协程队列的实现细节,具体的元素存储是由 collections.deque 来承载的。并在队列已满或者为空时,通过 Future 对象来实现阻塞等待和自动唤醒。
另外除了先进先出队列之外,还有先进后出队列,一般称为 LIFO 队列,它的效果类似于栈。
图片
这个没什么好说的,因为是先进后出,所以添加和弹出都在同一端,直接使用列表实现即可。并且由于 LifoQueue 继承 Queue,所以它的 API 和普通的协程队列是一样的。
除了先进先出队列,还有一个优先队列。
图片
它的 API 和普通的协程队列也是一致的,只不过优先队列在添加元素时,需要指定一个优先级:(优先级, 元素),优先级的值越低,表示优先级越高。然后在内部,会按照优先级的高低,维护一个小根堆,堆顶元素便是优先级最高的元素。
这几个队列具体使用哪一种,则取决于具体的业务场景。
说完了协程队列,再来看看线程队列,它们的 API 是类似的,但实现细节则不同。因为操作系统感知不到协程,所以协程队列的阻塞等待是基于 Future 实现的,而线程队列的阻塞等待是基于条件变量(和互斥锁)实现的。
还是先来看看线程队列的一些 API,和协程队列是类似的。
from queue import Queue
# 可以指定一个 maxsize 参数,表示队列的容量
# 默认为 0,表示队列的容量无限
queue = Queue(maxsize=20)
# 查看容量
print(queue.maxsize)
"""
20
"""
# 查看队列的元素个数
print(queue.qsize())
"""
0
"""
# 判断队列是否已满
print(queue.full())
"""
False
"""
# 判断队列是否为空
print(queue.empty())
"""
True
"""
# 往队列中添加元素
# block 参数表示是否阻塞,默认为 True,当队列已满时,线程会阻塞
# timeout 表示超时时间,默认为 None,表示会无限等待
# 当然也可以给 timeout 传一个具体的值
# 如果在规定时间内,没有将元素放入队列,那么抛异常
queue.put(123, block=True, timeout=None)
# 也是往队列中添加元素,但是当队列已满时,会直接抛异常
# put_nowait(item) 本质上就是 put(item, block=False)
queue.put_nowait(456)
# 从队列中取出元素
# 同样可以传递 block 和 timeout 参数
# block 默认为 True,当队列为空时会陷入阻塞
# timeout 默认为 None,表示会无限等待
print(queue.get(block=True, timeout=None))
"""
123
"""
# 也是从队列中取出元素,但是当队列为空时,会直接抛异常
# get_nowait() 本质上就是 get(block=False)
print(queue.get_nowait())
"""
456
"""
# task_done(),将 unfinished_tasks 属性的值减 1
print(queue.unfinished_tasks)
"""
2
"""
queue.task_done()
queue.task_done()
print(queue.unfinished_tasks)
"""
0
"""
# join(),当 unfinished_tasks 不为 0 时,陷入阻塞
queue.join()
API 和协程队列是相似的,我们罗列一下:
图片
线程队列的具体使用我们已经知道了,下面来看看它的具体实现。
图片
线程队列的内部依旧使用双端队列进行元素存储,并且还使用了一个互斥锁和三个条件变量。
为了保证数据的一致性和线程安全,当队列在多线程环境中被修改(比如添加或删除元素)时,需要使用互斥锁。任何需要修改队列的操作都必须在获取到互斥锁之后进行,以防止多个线程同时对队列进行修改,否则会导致数据不一致或其它错误。同时,一旦对队列的修改完成,必须立即释放互斥锁,以便其它线程可以访问队列。
然后是 not_empty 条件变量,当一个新元素被添加到队列时,应该向 not_empty发送一个信号。这个动作会通知那些想从队列中获取元素,但因队列为空而陷入阻塞的线程,现在队列中已经有了新的元素,它们可以继续执行获取元素的操作。
接下来是 not_full 条件变量,当从队列中取走一个元素时,应该向 not_full 发送一个信号。这个动作通知那些想往队列添加元素,但因队列已满而陷入阻塞的线程,现在队列中已经有了可用空间,它们可以继续执行添加元素的操作。
最后是 all_tasks_done 条件变量,当处理的任务全部完成,即计数器 unfinished_task 为 0 时,应该向 all_tasks_done 发送一个信号。这个动作会通知那些执行了 join() 方法而陷入阻塞的线程,它们可以继续往下执行了。
图片
因为线程队列采用了双端队列存储元素,所以双端队列的长度就是线程队列的元素个数。如果元素个数为 0,那么队列就是空;如果容量大于 0,并且小于等于元素个数,那么队列就满了。
图片
前面说了,put_nowait 和 get_nowait 本质上就是调用了 put 和 get,所以我们的重点是 put 和 get 两个方法。
图片
以上就是 put 方法的底层实现,不难理解。说完了 put,再来看看 get。
图片
最后是 task_done 和 join 方法,看看它们的内部逻辑。
图片
调用 join 方法,当 unfinished_task 大于 0 时,会陷入阻塞。调用 task_done 方法,会将未完成任务数减 1,如果为 0,那么唤醒阻塞等待的线程。
需要注意的是,唤醒调用的方法不是 notify,而是 notify_all。对于添加元素和获取元素,每次显然只能唤醒一个线程,此时调用 notify。而 unfinished_task 为 0 时,应该要唤醒所有等待的线程,因此要调用 notify_all。
最后线程队列也有相应的 PriorityQueue 和 LifoQueue,它们的用法、实现和协程里面的这两个队列是一样的。
以上便是协程队列和线程队列的具体用法和实现原理,它们本质上都是基于双端队列实现具体的元素存储,并且在队列已满和队列为空时,可以阻塞等待。
只不过协程队列是通过 Future 对象实现的,而线程队列是通过条件变量实现的。
当然,除了协程队列和线程队列,还有进程队列,但进程队列要复杂的多。因此关于进程队列的实现细节,我们以后专门花篇幅去介绍。