您当前的位置:首页 > 电脑百科 > 程序开发 > 语言 > Python

Python语法-多进程、多线程、协程(异步IO)

时间:2021-11-30 15:14:05  来源:  作者:Python林路

相关概念

Python语法-多进程、多线程、协程(异步IO)

 

并发和并行

  • 并发:指一个时间段内,在一个CPU(CPU核心)能运行的程序的数量。
  • 并行:指在同一时刻,在多个CPU上运行多个程序,跟CPU(CPU核心)数量有关。

因为

计算机CPU(CPU核心)在同一时刻只能运行一个程序。

同步和异步

  • 同步是指代码调用的时候必须等待执行完成才能执行剩余的逻辑。
  • 异步是指代码在调用的时候,不用等待操作完成,直接执行剩余逻辑。

阻塞和非阻塞

  • 阻塞是指调用函数的时候当前线程被挂起。
  • 非阻塞是指调用函数时当前线程不会被挂起,而是立即返回。

CPU密集型和I/O密集型

CPU密集型(CPU-bound):

CPU密集型又叫做计算密集型,指I/O在很短时间就能完成,CPU需要大量的计算和处理,特点是CPU占用高。

例如:压缩解压缩、加密解密、正则表达式搜索。

IO密集型(I/O-bound):

IO密集型是指系统运行时大部分时间时CPU在等待IO操作(硬盘/内存)的读写操作,特点是CPU占用较低。

例如:文件读写、网络爬虫、数据库读写。

多进程、多线程、多协程的对比

类型

优点

缺点

适用

多进程
Process(multiprocessing)

可以利用CPU多核并行运算

占用资源最多
可启动数目比线程少

CPU密集型计算

多线程
Thread(threading)

相比进程更轻量占用资源少

相比进程,多线程只能并发执行,不能利用多CPU(GIL)
相比协程启动数目有限制,占用内存资源有线程切换开销

IO密集型计算、同时运行的任务要求不多

多协程
Coroutine(asyncio)

内存开销最少,启动协程数量最多

支持库的限制
代码实现复杂

IO密集型计算、同时运行的较多任务

GIL全称Global Interpreter Lock

下图为GIL的运行

Python语法-多进程、多线程、协程(异步IO)

 

Python/ target=_blank class=infotextkey>Python的多线程是伪多线程,同时只能有一个线程运行。

一个进程能够启动N个线程,数量受系统限制。

一个线程能够启动N个协程,数量不受限制。

怎么选择

对于其他语言来说,多线程是能同时利用多CPU(核)的,所以是适用CPU密集型计算的,但是Python由于GIL的限制,只能使用IO密集型计算。

所以对于Python来说:

对于IO密集型来说能用多协程就用多协程,没有库支持才用多线程。

对于CPU密集型就只能用多进程了。

Python语法-多进程、多线程、协程(异步IO)

 

协程(异步IO)

简单示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import asyncio


async def test():
    await asyncio.sleep(3)
    return "123"


async def main():
    result = await test()
    print(result)


if __name__ == '__main__':
    asyncio.run(main())

 

单次请求查看结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import threading
import asyncio


async def myfun(index):
    print(f'[{index}]({threading.currentThread().name})')
    await asyncio.sleep(1)
    return index


def getfuture(future):
    print(f"结果为:{future.result()}")

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    future = asyncio.ensure_future(myfun(1))
    future.add_done_callback(getfuture)
    loop.run_until_complete(future)
    loop.close()

或者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import threading
import asyncio


async def myfun(index):
    print(f'[{index}]({threading.currentThread().name})')
    await asyncio.sleep(1)
    return index

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    future = asyncio.ensure_future(myfun(1))
    loop.run_until_complete(future)
    print(f"结果为:{future.result()}")
    loop.close()

 

多次请求查看结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import threading
import asyncio


async def myfun(index):
    print(f'线程({threading.currentThread().name}) 传入参数({index})')
    await asyncio.sleep(1)
    return index

loop = asyncio.get_event_loop()
future_list = []
for item in range(3):
    future = asyncio.ensure_future(myfun(item))
    future_list.Append(future)
loop.run_until_complete(asyncio.wait(future_list))
for future in future_list:
    print(f"结果为:{future.result()}")
loop.close()

 

asyncio.wait和asyncio.gather

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import threading
import asyncio


async def myfun(index):
    print(f'[{index}]({threading.currentThread().name})')
    await asyncio.sleep(1)


loop = asyncio.get_event_loop()
tasks = [myfun(1), myfun(2)]
loop.run_until_complete(asyncio.wait(tasks))
#loop.run_until_complete(asyncio.gather(*tasks))
loop.close()

asyncio.gather 和asyncio.wait区别:

在内部wait()使用一个set保存它创建的Task实例。因为set是无序的所以这也就是我们的任务不是顺序执行的原因。wait的返回值是一个元组,包括两个集合,分别表示已完成和未完成的任务。wait第二个参数为一个超时值
达到这个超时时间后,未完成的任务状态变为pending,当程序退出时还有任务没有完成此时就会看到如下的错误提示。

gather的使用
gather的作用和wait类似不同的是。

  1. gather任务无法取消。
  2. 返回值是一个结果列表
  3. 可以按照传入参数的 顺序,顺序输出。
Python语法-多进程、多线程、协程(异步IO)

 

协程和多线程结合

同时多个请求

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

import requests


def myquery(url):
    r = requests.get(url)
    print(r.text)
    return r.text


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    executor = ThreadPoolExecutor(3)
    urls = ["https://www.psvmc.cn/userlist.json", "https://www.psvmc.cn/login.json"]
    tasks = []
    start_time = time.time()
    for url in urls:
        task = loop.run_in_executor(executor, myquery, url)
        tasks.append(task)
    loop.run_until_complete(asyncio.wait(tasks))
    print(f"用时{time.time() - start_time}")

结果

1
2
3
{"code":0,"msg":"success","obj":{"name":"小明","sex":"男","token":"psvmc"}}
{"code":0,"msg":"success","obj":[{"name":"小明","sex":"男"},{"name":"小红","sex":"女"},{"name":"小刚","sex":"未知"}]}
用时0.11207175254821777

 

单个请求添加回调

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import asyncio
import threading
import time
from concurrent.futures import ThreadPoolExecutor

import requests


def myquery(url):
    print(f"请求所在线程:{threading.current_thread().name}")
    r = requests.get(url)
    return r.text


def myfuture(future):
    print(f"回调所在线程:{threading.current_thread().name}")
    print(future.result())


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    executor = ThreadPoolExecutor(3)
    url = "https://www.psvmc.cn/userlist.json"
    tasks = []
    start_time = time.time()
    task = loop.run_in_executor(executor, myquery, url)
    future = asyncio.ensure_future(task)
    future.add_done_callback(myfuture)
    loop.run_until_complete(future)
    print(f"用时{time.time() - start_time}")

 

多线程与多进程

多线程

引用模块

1
2
3
4
5
6
7
8
from threading import Thread

def func(num):
    return num

t = Thread(target=func, args=(100,))
t.start()
t.join()

数据通信

1
2
3
4
5
import queue

q = queue.Queue()
q.put(1)
item = q.get()

1
2
3
4
5
from threading import Lock

lock = Lock()
with lock:
    pass

 

池化技术

1
2
3
4
5
6
7
8
from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor() as executor:
    # 方法1
    results = executor.map(func, [1, 2, 3])
    # 方法2
    future = executor.submit(func, 1)
    result = future.result()

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
from concurrent.futures import ThreadPoolExecutor
import threading
import time


# 定义一个准备作为线程任务的函数
def action(num):
    print(threading.current_thread().name)
    time.sleep(num)
    return num + 100


if __name__ == "__main__":
    # 创建一个包含3条线程的线程池
    with ThreadPoolExecutor(max_workers=3) as pool:
        future1 = pool.submit(action, 3)

        future1.result()
        print(f"单个任务返回:{future1.result()}")

        print('------------------------------')
        # 使用线程执行map计算
        results = pool.map(action, (1, 3, 5))
        for r in results:
            print(f"多个任务返回:{r}")

结果

1
2
3
4
5
6
7
8
9
ThreadPoolExecutor-0_0
单个任务返回:103
------------------------------
ThreadPoolExecutor-0_0
ThreadPoolExecutor-0_1
ThreadPoolExecutor-0_2
多个任务返回:101
多个任务返回:103
多个任务返回:105

 

多进程

引用模块

1
2
3
4
5
6
7
8
from multiprocessing import Process

def func(num):
    return num

t = Process(target=func, args=(100,))
t.start()
t.join()

 

数据通信

1
2
3
4
import multiprocessing
q = multiprocessing.Queue()
q.put(1)
item = q.get()

 

1
2
3
4
5
from multiprocessing import Lock

lock = Lock()
with lock:
    pass

 

池化技术

1
2
3
4
5
6
7
8
from concurrent.futures import ProcessPoolExecutor

with ProcessPoolExecutor() as executor:
    # 方法1
    results = executor.map(func, [1, 2, 3])
    # 方法2
    future = executor.submit(func, 1)
    result = future.result()

 

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
from concurrent.futures import ProcessPoolExecutor
import multiprocessing
import time


# 定义一个准备作为进程任务的函数
def action(num):
    print(multiprocessing.current_process().name)
    time.sleep(num)
    return num + 100


if __name__ == "__main__":
    # 创建一个包含3条进程的进程池
    with ProcessPoolExecutor(max_workers=3) as pool:
        future1 = pool.submit(action, 3)

        future1.result()
        print(f"单个任务返回:{future1.result()}")

        print('------------------------------')
        # 使用线程执行map计算
        results = pool.map(action, [1, 3, 5])
        for r in results:
            print(f"多个任务返回:{r}")

结果

1
2
3
4
5
6
7
8
9
SpawnProcess-1
单个任务返回:103
------------------------------
SpawnProcess-2
SpawnProcess-3
SpawnProcess-1
多个任务返回:101
多个任务返回:103
多个任务返回:105

 

多进程/多线程/协程对比

异步 IO(asyncio)、多进程(multiprocessing)、多线程(multithreading)

IO 密集型应用CPU等待IO时间远大于CPU 自身运行时间,太浪费;

常见的 IO 密集型业务包括:浏览器交互、磁盘请求、网络爬虫、数据库请求等

Python 世界对于 IO 密集型场景的并发提升有 3 种方法:多进程、多线程、多协程;

理论上讲asyncio是性能最高的,原因如下:

  1. 进程、线程会有CPU上下文切换
  2. 进程、线程需要内核态和用户态的交互,性能开销大;而协程对内核透明的,只在用户态运行
  3. 进程、线程并不可以无限创建,最佳实践一般是 CPU*2;而协程并发能力强,并发上限理论上取决于操作系统IO多路复用(linux下是 epoll)可注册的文件描述符的极限

那asyncio的实际表现是否如理论上那么强,到底强多少呢?我构建了如下测试场景:

请求10此,并sleep 1s模拟业务查询

  • 方法 1;顺序串行执行
  • 方法 2:多进程
  • 方法 3:多线程
  • 方法 4:asyncio
  • 方法 5:asyncio+uvloop

最后的asyncio+uvloop和官方asyncio 最大不同是用 Cython+libuv 重新实现了asyncio 的事件循环(event loop)部分,

官方测试性能是 node.js的 2 倍,持平 golang。

Python语法-多进程、多线程、协程(异步IO)

 

顺序串行执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import time


def query(num):
    print(num)
    time.sleep(1)


def main():
    for h in range(10):
        query(h)


# main entrance
if __name__ == '__main__':
    start_time = time.perf_counter()
    main()
    end_time = time.perf_counter()
    print(f"时间差:{end_time-start_time}")

 

多进程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
from concurrent import futures
import time


def query(num):
    print(num)
    time.sleep(1)


def main():
    with futures.ProcessPoolExecutor() as executor:
        for future in executor.map(query, range(10)):
            pass


# main entrance
if __name__ == '__main__':
    start_time = time.perf_counter()
    main()
    end_time = time.perf_counter()
    print(f"时间差:{end_time-start_time}")

 

多线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
from concurrent import futures
import time


def query(num):
    print(num)
    time.sleep(1)


def main():
    with futures.ThreadPoolExecutor() as executor:
        for future in executor.map(query, range(10)):
            pass


# main entrance
if __name__ == '__main__':
    start_time = time.perf_counter()
    main()
    end_time = time.perf_counter()
    print(f"时间差:{end_time-start_time}")

 

asyncio

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import asyncio
import time


async def query(num):
    print(num)
    await asyncio.sleep(1)


async def main():
    tasks = [asyncio.create_task(query(num)) for num in range(10)]
    await asyncio.gather(*tasks)


# main entrance
if __name__ == '__main__':
    start_time = time.perf_counter()
    asyncio.run(main())
    end_time = time.perf_counter()
    print(f"时间差:{end_time-start_time}")

 

asyncio+uvloop

注意

windows上不支持uvloop。

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import asyncio
import uvloop
import time


async def query(num):
    print(num)
    await asyncio.sleep(1)


async def main():
    tasks = [asyncio.create_task(query(host)) for host in range(10)]
    await asyncio.gather(*tasks)


# main entrance
if __name__ == '__main__':
    uvloop.install()
    start_time = time.perf_counter()
    asyncio.run(main())
    end_time = time.perf_counter()
    print(f"时间差:{end_time-start_time}")

 

运行时间对比

方式

运行时间

串行

10.0750972s

多进程

1.1638731999999998s

多线程

1.0146456s

asyncio

1.0110082s

asyncio+uvloop

1.01s

 


可以看出: 无论多进程、多线程还是asyncio都能大幅提升IO 密集型场景下的并发,但asyncio+uvloop性能最高!

Python语法-多进程、多线程、协程(异步IO)

 

原文链接:
https://www.psvmc.cn/article/2021-11-24-python-async.html



Tags:Python语法   点击:()  评论:()
声明:本站部分内容及图片来自互联网,转载是出于传递更多信息之目的,内容观点仅代表作者本人,如有任何标注错误或版权侵犯请与我们联系(Email:2595517585@qq.com),我们将及时更正、删除,谢谢。
▌相关推荐
相关概念 并发和并行 并发:指一个时间段内,在一个CPU(CPU核心)能运行的程序的数量。 并行:指在同一时刻,在多个CPU上运行多个程序,跟CPU(CPU核心)数量有关。因为计算机CPU(CPU核心)在同...【详细内容】
2021-11-30  Tags: Python语法  点击:(140)  评论:(0)  加入收藏
最近有许多小伙伴后台联系我,说目前想要学习Python,但是没有一份很好的资料入门。一方面的确现在市面上Python的资料过多,导致新手会不知如何选择,另一个问题很多资料内容也很杂...【详细内容】
2020-09-10  Tags: Python语法  点击:(54)  评论:(0)  加入收藏
Django的基本介绍与特点基本介绍 Django 是一个由 Python 编写的具有完整架站能力的开源Web框架。 使用 Django,只要很少的代码,Python 的程序开发人员就可以轻松地完成一个正...【详细内容】
2020-06-06  Tags: Python语法  点击:(125)  评论:(0)  加入收藏
大多数编程语言都提供了 if...else... 语句,即表示如果满足条件就做这件事,否则就做另外一件事。同时,在 Python 中 else 除了可以与 if 搭配使用,还有其他特别的语法: for&helli...【详细内容】
2019-09-10  Tags: Python语法  点击:(261)  评论:(0)  加入收藏
▌简易百科推荐
近几年 Web3 被炒得火热,但是大部分人可能还不清楚什么是 Web3,今天就让w3cschool编程狮小师妹带你了解下 Web3 是什么?与我们熟知的 Web1 和 Web2 又有什么区别呢?web3.0什么是...【详细内容】
2022-07-15  编程狮W3Cschool    Tags:Web3.0   点击:(2)  评论:(0)  加入收藏
1、让我们一起来看下吧,直接上图。 第一眼看到是不是觉得很高逼格,暗黑画风,这很大佬。其实它就是------AidLearning。一个运行在安卓平台的linux系统,而且还包含了许多非常强大...【详细内容】
2022-07-15  IT智能化专栏    Tags:AidLearning   点击:(2)  评论:(0)  加入收藏
真正的大师,永远都怀着一颗学徒的心! 一、项目简介 今天说的这个软件是一款基于Python+vue的自动化运维、完全开源的云管理平台。二、实现功能 基于RBAC权限系统 录像回放 ...【详细内容】
2022-07-14  菜鸟程序猿    Tags:Python   点击:(3)  评论:(0)  加入收藏
前言今天笔者想和大家来聊聊python接口自动化的MySQL数据连接,废话不多说咱们直接进入主题吧。 一、什么是 PyMySQL?PyMySQL是在Python3.x版本中用于连接MySQL服务器的一个库,P...【详细内容】
2022-07-11  测试架构师百里    Tags:python   点击:(19)  评论:(0)  加入收藏
aiohttp什么是 aiohttp?一个异步的 HTTP 客户端\服务端框架,基于 asyncio 的异步模块。可用于实现异步爬虫,更快于 requests 的同步爬虫。安装pip install aiohttpaiohttp 和 r...【详细内容】
2022-07-11  VT漫步    Tags:aiohttp   点击:(15)  评论:(0)  加入收藏
今天我们学习下 Queue 的进阶用法。生产者消费者模型在并发编程中,比如爬虫,有的线程负责爬取数据,有的线程负责对爬取到的数据做处理(清洗、分类和入库)。假如他们是直接交互的,...【详细内容】
2022-07-06  VT漫步    Tags:Python Queue   点击:(34)  评论:(0)  加入收藏
继承:是面向对象编程最重要的特性之一,例如,我们每个人都从祖辈和父母那里继承了一些体貌特征,但每个人却又不同于父母,有自己独有的一些特性。在面向对象中被继承的类是父类或基...【详细内容】
2022-07-06  至尊小狸子    Tags:python   点击:(25)  评论:(0)  加入收藏
点击上方头像关注我,每周上午 09:00准时推送,每月不定期赠送技术书籍。本文1553字,阅读约需4分钟 Hi,大家好,我是CoCo。在上一篇Python自动化测试系列文章:Python自动化测试之P...【详细内容】
2022-07-05  CoCo的软件测试小栈    Tags:Python   点击:(27)  评论:(0)  加入收藏
第一种方式:res = requests.get(url, params=data, headers = headers)第二种方式:res = requests.get(url, data=data, headers = headers)注意:1.url格式入参只支持第一种方...【详细内容】
2022-07-05  独钓寒江雪之IT    Tags:Python request   点击:(19)  评论:(0)  加入收藏
什么是python类的多态python的多态,可以为不同的类实例,或者说不同的数据处理方式,提供统一的接口。用比喻的方式理解python类的多态比如,同一个苹果(统一的接口)在孩子的眼里(类实...【详细内容】
2022-07-04  写小说的程序员    Tags:python类   点击:(28)  评论:(0)  加入收藏
站内最新
站内热门
站内头条