您当前的位置:首页 > 电脑百科 > 程序开发 > 编程百科

源码解密协程队列和线程队列的实现原理

时间:2023-12-05 15:39:24  来源:  作者:古明地觉的编程教室

本次来聊一聊 Python/ target=_blank class=infotextkey>Python 的队列,首先队列是一种特殊的线性表,具有先进先出(FIFO)的特性,这意味着元素的入队顺序和出队顺序是一致的。

源码解密协程队列和线程队列的实现原理

队列通常用于存储需要按顺序处理的数据,例如任务调度。当然队列最常见的一个应用场景就是解耦,一个线程不停地生产数据,放到队列里,另一个线程从队列中取数据进行消费。

而 Python 也提供了队列,分别是协程队列和线程队列。

import asyncio
import queue

# 协程队列
coroutine_queue = asyncio.Queue()
# 线程队列
threading_queue = queue.Queue()

如果你的程序基于 asyncio,那么应该使用协程队列,如果你的程序采用了多线程,那么应该使用线程队列。

下面我们来看一看这两种队列的 API,以及底层实现原理。

协程队列

协程队列的具体实现由 asyncio 提供,以下是它的一些用法。

import asyncio

async def mAIn():
    # 创建队列时可以指定能够存储的最大元素个数
    # 不指定则没有容量限制
    queue = asyncio.Queue(maxsize=20)
    # 返回容量
    print(queue.maxsize)
    """
    20
    """
    # 添加元素,如果队列满了会阻塞,直到有剩余空间
    await queue.put(111)
    # 添加元素,如果队列满了会抛异常
    # 因为不需要阻塞等待,所以 put_nowait 不是协程函数
    queue.put_nowait(222)
    # 队列是否已满
    print(queue.full())
    """
    False
    """
    # 返回队列内部的元素个数
    print(queue.qsize())
    """
    2
    """
    # 从队列中获取元素,如果队列为空,会阻塞,直到队列中有可用元素
    print(await queue.get())
    """
    111
    """
    # 从队列中获取元素,如果队列为空,会抛异常
    # 因为不需要阻塞等待,所以 put_nowait 不是协程函数
    print(queue.get_nowait())
    """
    222
    """
    # 队列是否为空
    print(queue.empty())
    """
    True
    """

asyncio.run(main())

所以协程队列的 API 很简单,我们再罗列一下:

源码解密协程队列和线程队列的实现原理图片

然后,协程队列还有两个 API,需要单独说明,分别是 task_done() 和 join()。

首先在协程队列内部有一个 _unfinished_tasks 属性,初始值为 0,每当往队列添加一个元素时,该属性的值就会自增 1。但是从队列取出元素时,该属性不会自动减 1,需要手动调用 task_done() 方法。

所以 _unfinished_tasks 记录了队列中有多少个任务数据需要处理,每来一个自动加 1,但取走一个不会自动减 1,而是需要 task_done 来实现。

然后 join() 的作用是,当 _unfinished_tasks 不为 0 的时候,await queue.join() 会阻塞,直到为 0。

import asyncio

async def consumer(queue, n):
    print(f"consumer{n} 开始消费")
    await asyncio.sleep(3)
    await queue.get()
    # 获取数据后,调用 task_done
    queue.task_done()
    print(f"consumer{n} 消费完毕")

async def main():
    queue = asyncio.Queue()
    await queue.put(123)
    await queue.put(456)
    await queue.put(789)
    # 队列里面有三个数据,开启三个消费者去消费
    await asyncio.gather(
        consumer(queue, 1),
        consumer(queue, 2),
        consumer(queue, 3),
    )
    # 这里会陷入阻塞,直到 _unfinished_tasks 变为 0
    await queue.join()
    print("main 解除阻塞")


asyncio.run(main())
"""
consumer1 开始消费
consumer2 开始消费
consumer3 开始消费
consumer1 消费完毕
consumer2 消费完毕
consumer3 消费完毕
main 解除阻塞
"""

还是比较简单的,然后我们来看一下协程队列的具体实现细节。

源码解密协程队列和线程队列的实现原理图片

首先协程队列内部有一个 _queue 属性,它是一个双端队列,负责保存具体的元素。因为要保证两端的操作都是高效的,所以采用双端队列实现。

然后是 _getters 和 _putters 两个属性,它们是做什么的呢?在队列满了的时候,协程往队列添加元素时会陷入阻塞,等到队列有剩余空间时会解除阻塞。同理,在队列为空时,协程从队列获取元素时会陷入阻塞,等到队列有可用元素时会解除阻塞。

那么这个阻塞等待,以及自动唤醒并解除阻塞是怎么实现的呢?在介绍锁和信号量的时候,我们分析过整个实现过程,协程队列与之类似。

假设协程从队列获取元素,但是队列为空,于是会创建一个 Future 对象,并保存起来,当前保存的地方就是 _getters,它也是双端队列。然后 await future,此时就会陷入阻塞,当其它协程往队列中添加元素时,会将 _getters 里面的 future 弹出,设置结果集。因此 await future 的协程就会解除阻塞,因为队列有可用元素了。

同理,协程往队列添加元素也是如此,如果队列满了,同样创建一个 Future 对象,并保存起来,当前保存的地方就是 _putters。然后 await future,陷入阻塞,当其它协程从队列中取出元素,会将 _putters 里面的 future 弹出,设置结果集。因此 await future 的协程就会解除阻塞,因为队列有可用空间了。

源码解密协程队列和线程队列的实现原理图片

三个内部调用的方法,_get 方法负责从队列的头部弹出元素,_put 方法负责从队列的尾部追加元素,比较简单。然后是 _wakeup_next 方法,它负责唤醒阻塞的协程。参数 waiters 要么是 _getters,要么是 _putters,从里面弹出一个 future,设置结果集,让对应的协程解除阻塞。

源码解密协程队列和线程队列的实现原理图片

  • qsize() 负责返回队列的元素个数;
  • maxsize 负责返回队列的容量;
  • empty() 负责判断队列是否为空;
  • full() 负责判断队列是否已满,如果容量小于等于 0,那么表示容量无限,队列永远不会满。否则判断元素个数是否大于等于容量;

源码解密协程队列和线程队列的实现原理图片

然后看看 put_nowait 和 get_nowait,首先是 put_nowait,往队列添加元素。

如果添加时发现队列已满,那么抛出异常。如果未满,则调用 _put 方法往 _queue 里面添加元素,因为元素的实际存储是由 self._queue 这个双端队列负责的。

添加完毕后,将 _unfinished_task 加 1。最后从 _getters 里面弹出 future,设置结果集,让因获取不到元素而陷入阻塞的协程解除阻塞(同时会将添加的元素取走)。

get_nowait 的逻辑也很简单,如果队列为空,直接抛异常。如果不为空,则调用 _get 方法从队列中弹出元素。最后从 _putters 里面弹出 future,设置结果集,让因队列已满、无法添加元素而陷入阻塞的协程解除阻塞(同时会将元素添加进队列)。

再来看看 put 方法的实现细节:

源码解密协程队列和线程队列的实现原理图片

结果和我们之前分析的一样,只是源码内部多做了一些异常检测。再来看看 get 方法,它的实现细节和 put 是类似的。

源码解密协程队列和线程队列的实现原理图片

比较简单,还是没什么难度的,最后再来看看 task_done 和 join 两个方法。

源码解密协程队列和线程队列的实现原理图片

协程队列里面使用了 asyncio.Event,它表示事件,如果事件对象没有调用 set 方法设置标志位,那么调用 wait 方法时会陷入阻塞。当事件对象调用 set 方法时,wait 会解除阻塞。

所以协程队列的 join 方法的逻辑就是,当 _unfinished_tasks 大于 0 时,调用事件对象的 wait 方法陷入阻塞。

而 task_done 方法的作用就是将 _unfinished_tasks 减 1,当它的值属性为 0 时,调用事件对象的 set 方法,让 join 解除阻塞。

以上就是整个协程队列的实现细节,具体的元素存储是由 collections.deque 来承载的。并在队列已满或者为空时,通过 Future 对象来实现阻塞等待和自动唤醒。

另外除了先进先出队列之外,还有先进后出队列,一般称为 LIFO 队列,它的效果类似于栈。

源码解密协程队列和线程队列的实现原理图片

这个没什么好说的,因为是先进后出,所以添加和弹出都在同一端,直接使用列表实现即可。并且由于 LifoQueue 继承 Queue,所以它的 API 和普通的协程队列是一样的。

除了先进先出队列,还有一个优先队列。

源码解密协程队列和线程队列的实现原理图片

它的 API 和普通的协程队列也是一致的,只不过优先队列在添加元素时,需要指定一个优先级:(优先级, 元素),优先级的值越低,表示优先级越高。然后在内部,会按照优先级的高低,维护一个小根堆,堆顶元素便是优先级最高的元素。

这几个队列具体使用哪一种,则取决于具体的业务场景。

线程队列

说完了协程队列,再来看看线程队列,它们的 API 是类似的,但实现细节则不同。因为操作系统感知不到协程,所以协程队列的阻塞等待是基于 Future 实现的,而线程队列的阻塞等待是基于条件变量(和互斥锁)实现的。

还是先来看看线程队列的一些 API,和协程队列是类似的。

from queue import Queue

# 可以指定一个 maxsize 参数,表示队列的容量
# 默认为 0,表示队列的容量无限
queue = Queue(maxsize=20)

# 查看容量
print(queue.maxsize)
"""
20
"""
# 查看队列的元素个数
print(queue.qsize())
"""
0
"""
# 判断队列是否已满
print(queue.full())
"""
False
"""
# 判断队列是否为空
print(queue.empty())
"""
True
"""
# 往队列中添加元素
# block 参数表示是否阻塞,默认为 True,当队列已满时,线程会阻塞
# timeout 表示超时时间,默认为 None,表示会无限等待
# 当然也可以给 timeout 传一个具体的值
# 如果在规定时间内,没有将元素放入队列,那么抛异常
queue.put(123, block=True, timeout=None)
# 也是往队列中添加元素,但是当队列已满时,会直接抛异常
# put_nowait(item) 本质上就是 put(item, block=False)
queue.put_nowait(456)

# 从队列中取出元素
# 同样可以传递 block 和 timeout 参数
# block 默认为 True,当队列为空时会陷入阻塞
# timeout 默认为 None,表示会无限等待
print(queue.get(block=True, timeout=None))
"""
123
"""
# 也是从队列中取出元素,但是当队列为空时,会直接抛异常
# get_nowait() 本质上就是 get(block=False)
print(queue.get_nowait())
"""
456
"""
# task_done(),将 unfinished_tasks 属性的值减 1
print(queue.unfinished_tasks)  
"""
2
"""
queue.task_done()
queue.task_done()
print(queue.unfinished_tasks)
"""
0
"""
# join(),当 unfinished_tasks 不为 0 时,陷入阻塞
queue.join()

API 和协程队列是相似的,我们罗列一下:

源码解密协程队列和线程队列的实现原理图片

线程队列的具体使用我们已经知道了,下面来看看它的具体实现。

源码解密协程队列和线程队列的实现原理图片

线程队列的内部依旧使用双端队列进行元素存储,并且还使用了一个互斥锁和三个条件变量。

为了保证数据的一致性和线程安全,当队列在多线程环境中被修改(比如添加或删除元素)时,需要使用互斥锁。任何需要修改队列的操作都必须在获取到互斥锁之后进行,以防止多个线程同时对队列进行修改,否则会导致数据不一致或其它错误。同时,一旦对队列的修改完成,必须立即释放互斥锁,以便其它线程可以访问队列。

然后是 not_empty 条件变量,当一个新元素被添加到队列时,应该向 not_empty发送一个信号。这个动作会通知那些想从队列中获取元素,但因队列为空而陷入阻塞的线程,现在队列中已经有了新的元素,它们可以继续执行获取元素的操作。

接下来是 not_full 条件变量,当从队列中取走一个元素时,应该向 not_full 发送一个信号。这个动作通知那些想往队列添加元素,但因队列已满而陷入阻塞的线程,现在队列中已经有了可用空间,它们可以继续执行添加元素的操作。

最后是 all_tasks_done 条件变量,当处理的任务全部完成,即计数器 unfinished_task 为 0 时,应该向 all_tasks_done 发送一个信号。这个动作会通知那些执行了 join() 方法而陷入阻塞的线程,它们可以继续往下执行了。

源码解密协程队列和线程队列的实现原理图片

因为线程队列采用了双端队列存储元素,所以双端队列的长度就是线程队列的元素个数。如果元素个数为 0,那么队列就是空;如果容量大于 0,并且小于等于元素个数,那么队列就满了。

源码解密协程队列和线程队列的实现原理图片

前面说了,put_nowait 和 get_nowait 本质上就是调用了 put 和 get,所以我们的重点是 put 和 get 两个方法。

源码解密协程队列和线程队列的实现原理图片

以上就是 put 方法的底层实现,不难理解。说完了 put,再来看看 get。

源码解密协程队列和线程队列的实现原理图片

最后是 task_done 和 join 方法,看看它们的内部逻辑。

源码解密协程队列和线程队列的实现原理图片

调用 join 方法,当 unfinished_task 大于 0 时,会陷入阻塞。调用 task_done 方法,会将未完成任务数减 1,如果为 0,那么唤醒阻塞等待的线程。

需要注意的是,唤醒调用的方法不是 notify,而是 notify_all。对于添加元素和获取元素,每次显然只能唤醒一个线程,此时调用 notify。而 unfinished_task 为 0 时,应该要唤醒所有等待的线程,因此要调用 notify_all。

最后线程队列也有相应的 PriorityQueue 和 LifoQueue,它们的用法、实现和协程里面的这两个队列是一样的。

小结

以上便是协程队列和线程队列的具体用法和实现原理,它们本质上都是基于双端队列实现具体的元素存储,并且在队列已满和队列为空时,可以阻塞等待。

只不过协程队列是通过 Future 对象实现的,而线程队列是通过条件变量实现的。

当然,除了协程队列和线程队列,还有进程队列,但进程队列要复杂的多。因此关于进程队列的实现细节,我们以后专门花篇幅去介绍。



Tags:队列   点击:()  评论:()
声明:本站部分内容及图片来自互联网,转载是出于传递更多信息之目的,内容观点仅代表作者本人,不构成投资建议。投资者据此操作,风险自担。如有任何标注错误或版权侵犯请与我们联系,我们将及时更正、删除。
▌相关推荐
Linux 6.9-rc1 内核发布:AMD P-State 首选核心、BH 工作队列
IT之家 3 月 25 日消息,Linus Torvalds 宣布,Linux 6.9 内核的首个 RC(候选发布)版 Linux 6.9-rc1 发布。▲ Linux 6.9-rc1Linus 表示,Linux 内核 6.9 看起来是一个“相当正常”...【详细内容】
2024-03-25  Search: 队列  点击:(9)  评论:(0)  加入收藏
如何使用 Redis 实现消息队列
Redis不仅是一个强大的内存数据存储系统,它还可以用作一个高效的消息队列。消息队列是应用程序间或应用程序内部进行异步通信的一种方式,它允许数据生产者将消息放入队列中,然...【详细内容】
2024-03-22  Search: 队列  点击:(17)  评论:(0)  加入收藏
RabbitMQ如何实现延迟队列?
延迟队列是指当消息被发送以后,并不是立即执行,而是等待特定的时间后,消费者才会执行该消息。延迟队列的使用场景有以下几种: 未按时支付的订单,30 分钟过期之后取消订单。 给活...【详细内容】
2024-01-26  Search: 队列  点击:(46)  评论:(0)  加入收藏
一文带你彻底掌握阻塞队列!
一、摘要在之前的文章中,我们介绍了生产者和消费者模型的最基本实现思路,相信大家对它已经有一个初步的认识。在 Java 的并发包里面还有一个非常重要的接口:BlockingQueue。Blo...【详细内容】
2023-12-15  Search: 队列  点击:(130)  评论:(0)  加入收藏
用 C 语言实现一个循环队列
本文将介绍如何使用C语言实现一个循环队列,包括队列的定义、入队、出队、判空和判满等操作。代码实现将遵循专业编程规范,并使用注释进行详细解释。一、引言队列是一种常见的...【详细内容】
2023-12-07  Search: 队列  点击:(205)  评论:(0)  加入收藏
源码解密协程队列和线程队列的实现原理
本次来聊一聊 Python 的队列,首先队列是一种特殊的线性表,具有先进先出(FIFO)的特性,这意味着元素的入队顺序和出队顺序是一致的。队列通常用于存储需要按顺序处理的数据,例如任务...【详细内容】
2023-12-05  Search: 队列  点击:(144)  评论:(0)  加入收藏
消息队列备选架构选择,你选择哪个?
中间件团队的研发人员认为这个方案比较简单,实现成本低,但测试代表认为这个方案测试人力投入较大。运维团队认为这个方案的硬件成本比较高,一个数据分组就需要4台机器(2台服务器...【详细内容】
2023-11-30  Search: 队列  点击:(179)  评论:(0)  加入收藏
四种消息队列,如何选型?
最近发现很多号主发消息队列的文章,质量参差不齐,相关文章我之前也写过,建议直接看这篇。这篇文章,主要讲述 Kafka、RabbitMQ、RocketMQ 和 ActiveMQ 这 4 种消息队列的异同,无论...【详细内容】
2023-11-27  Search: 队列  点击:(193)  评论:(0)  加入收藏
几款主流消息队列之间的差异,我们应该如何选择
为什么需要消息队列消息队列是历史最悠久的中间件之一,它可以和不同的进程进行通信,从而实现上下游之间的消息传递。基于此特性,我们可以在以下三个场景中使用消息队列。 解耦; ...【详细内容】
2023-11-17  Search: 队列  点击:(124)  评论:(0)  加入收藏
常用消息队列框架与技术选型
又是一年双11季,土豪们买买买,程序员看看热闹,聊聊技术。海量的订单、支付请求以及库存更新等任务,离不开分布式架构(SOFAStack)、分布式数据库(OceanBase)、分布式缓存(Tair)、数据处...【详细内容】
2023-11-13  Search: 队列  点击:(204)  评论:(0)  加入收藏
▌简易百科推荐
即将过时的 5 种软件开发技能!
作者 | Eran Yahav编译 | 言征出品 | 51CTO技术栈(微信号:blog51cto) 时至今日,AI编码工具已经进化到足够强大了吗?这未必好回答,但从2023 年 Stack Overflow 上的调查数据来看,44%...【详细内容】
2024-04-03    51CTO  Tags:软件开发   点击:(5)  评论:(0)  加入收藏
跳转链接代码怎么写?
在网页开发中,跳转链接是一项常见的功能。然而,对于非技术人员来说,编写跳转链接代码可能会显得有些困难。不用担心!我们可以借助外链平台来简化操作,即使没有编程经验,也能轻松实...【详细内容】
2024-03-27  蓝色天纪    Tags:跳转链接   点击:(12)  评论:(0)  加入收藏
中台亡了,问题到底出在哪里?
曾几何时,中台一度被当做“变革灵药”,嫁接在“前台作战单元”和“后台资源部门”之间,实现企业各业务线的“打通”和全域业务能力集成,提高开发和服务效率。但在中台如火如荼之...【详细内容】
2024-03-27  dbaplus社群    Tags:中台   点击:(8)  评论:(0)  加入收藏
员工写了个比删库更可怕的Bug!
想必大家都听说过删库跑路吧,我之前一直把它当一个段子来看。可万万没想到,就在昨天,我们公司的某位员工,竟然写了一个比删库更可怕的 Bug!给大家分享一下(不是公开处刑),希望朋友们...【详细内容】
2024-03-26  dbaplus社群    Tags:Bug   点击:(5)  评论:(0)  加入收藏
我们一起聊聊什么是正向代理和反向代理
从字面意思上看,代理就是代替处理的意思,一个对象有能力代替另一个对象处理某一件事。代理,这个词在我们的日常生活中也不陌生,比如在购物、旅游等场景中,我们经常会委托别人代替...【详细内容】
2024-03-26  萤火架构  微信公众号  Tags:正向代理   点击:(10)  评论:(0)  加入收藏
看一遍就理解:IO模型详解
前言大家好,我是程序员田螺。今天我们一起来学习IO模型。在本文开始前呢,先问问大家几个问题哈~什么是IO呢?什么是阻塞非阻塞IO?什么是同步异步IO?什么是IO多路复用?select/epoll...【详细内容】
2024-03-26  捡田螺的小男孩  微信公众号  Tags:IO模型   点击:(8)  评论:(0)  加入收藏
为什么都说 HashMap 是线程不安全的?
做Java开发的人,应该都用过 HashMap 这种集合。今天就和大家来聊聊,为什么 HashMap 是线程不安全的。1.HashMap 数据结构简单来说,HashMap 基于哈希表实现。它使用键的哈希码来...【详细内容】
2024-03-22  Java技术指北  微信公众号  Tags:HashMap   点击:(11)  评论:(0)  加入收藏
如何从头开始编写LoRA代码,这有一份教程
选自 lightning.ai作者:Sebastian Raschka机器之心编译编辑:陈萍作者表示:在各种有效的 LLM 微调方法中,LoRA 仍然是他的首选。LoRA(Low-Rank Adaptation)作为一种用于微调 LLM(大...【详细内容】
2024-03-21  机器之心Pro    Tags:LoRA   点击:(12)  评论:(0)  加入收藏
这样搭建日志中心,传统的ELK就扔了吧!
最近客户有个新需求,就是想查看网站的访问情况。由于网站没有做google的统计和百度的统计,所以访问情况,只能通过日志查看,通过脚本的形式给客户导出也不太实际,给客户写个简单的...【详细内容】
2024-03-20  dbaplus社群    Tags:日志   点击:(4)  评论:(0)  加入收藏
Kubernetes 究竟有没有 LTS?
从一个有趣的问题引出很多人都在关注的 Kubernetes LTS 的问题。有趣的问题2019 年,一个名为 apiserver LoopbackClient Server cert expired after 1 year[1] 的 issue 中提...【详细内容】
2024-03-15  云原生散修  微信公众号  Tags:Kubernetes   点击:(5)  评论:(0)  加入收藏
站内最新
站内热门
站内头条