有效的提高程序执行效率的两种方法是异步和并发,Golang,node.js之所以可以有很高执行效率主要是他们的协程和异步并发机制。实际上异步和并发是每一种现代语言都在追求的特性,当然Python也不例外,今天我们就讲讲Python 3中的异步并发编程。
Python 标准库提供了许多模块来处理异步并发和多进程任务,包括:
本文中,我们主要以于asyncio和 concurrent.futures为主讲讲Python中异步和并发机制及实例。当然你也可能正在使用_thread和threading,但是无疑最应用用multiprocessing和concurrent.futures。
编写并发代码(使用_thread或threading模块)的是为了解决大家对CPU调用和中断的问题(因为CPU中一次只能运行一个线程),这涉及了"CPU 上下文切换"的成本,虽然速度很快,但是也需要消耗资源。而且还要处理"条件竞争","死锁/实时"和"资源不足"(某些线程被过度使用,而其他线程未被充分利用)等等的问题。而这些正是异步任务可以避免的。
asyncio是一个标准库,用于使用asyn/cawait语法编写并发代码。asyncio模块分别提供了高级和低级 API。库和框架开发人员将使用低级 API,同时鼓励使用不同于更传统threading或multiprocess异步代码执行方法。它利用事件循环的来处理异步"任务"的调度,而不是传统的线程或子进程。
需要提及的是asyncio是为了解决 I/O 性能,而不是CPU 绑定操作。因此,asyncio不是所有类型的异步执行的替代品。
asyncio基于"合作多任务"的概念设计的,因此可以完全控制 CPU上下文切换发生的时间(即上下文切换发生在应用程序级别,而不是硬件级别)。
使用asyncio,Python调度器负责管理,因此应用程序可能随时进行上下文切换)。
所以使用asyncio时,还需要使用某种形式的"锁定"机制来防止多个线程访问/更改共享内存(否则可能会破坏程序的线程安全)。
concurrent.futures 模块是为异步执行可调用项提供高级接口,为thread 和multiprocessing模块提供了高级抽象,这就是为什么本文没有详细讨论这些模块的原因。事实上,_thread模块是一个非常低级别的 API,_threading模块本身是建立在它之上的。
前面我们已经提到,异步可以帮助我们避免使用线程,那么,如果它只是线程(和多处理)上的抽象,那么为什么要使用concurrent.futures呢?嗯,因为并非所有库/模块/API都支持异步模型。
例如,如果使用boto3和 AWS S3 并与之交互,就会发现这些是同步操作。可以在多线程代码中包装这些调用,但最好使用concurrent.futures,因为这样不仅受益于传统线程,而且受益于异步友好特性。该模块还设计为和异步事件循环互操作,从而更轻松地在异步驱动应用程序中处理线程/子进程池。
此外,当需要线程池或子进程池时,还需要利用concurrent.futures,同时可以使用干净而现代的 Python API。
实现异步编程的方法有很多种。有事件循环方法(异步实现),这种"回调"风格是 JAVAScript 等单线程语言一直采用的方法。传统上还有一个称为"绿色线程"的概念。
从本质上讲,绿色线程的外观和感觉与普通线程完全一样,只不过线程是由应用程序代码而不是硬件安排的。因此,可以有效地处理(与事件循环一样)的确定性上下文切换问题。但是处理共享内存同样存在问题。
因此,让我们现在快速看看什么是"事件循环",因为它是使异步工作的基础,为什么我们可以避免"回调地狱"和与"绿色线程"固有的问题...
所有异步应用程序的核心元素是"事件循环"。事件循环是计划并运行异步任务的内容,它还包括处理网络 IO 操作和子进程运行。
异步事件循环非常有效,是因为它由Python内部在生成器实现的。生成器使函数能够部分执行,然后在特定点停止,维护一堆对象和异常,然后再恢复。
异步背后的驱动力是计划异步"任务"的能力。Python 中有几个不同类型的对象有助于支持此功能,它们通常通过Awaitables(可等待)进行分组。
如果某物可以在表达式中使用,它是可以等待的。await有三种主要的等待类型:
协程、任务和Future。
注意:Future 是一种低级类型,因此,如果不是库/框架开发人员,则无需过多考虑它。
协程属于可等待对象,因此可以在其他协程中被等待。协程中两个重要的概念:协程函数,定义为async def的函数。
协程对象:通过调用协程函数所返回的对象。
asyncio基于生成器的协程函数修饰函数定义的函数将被async/await语法取代,但将继续支持,直到Python 3.10
任务用于同时安排协同程序。所有异步应用程序通常(至少)具有单个"主"入口点任务,该任务将安排在事件循环上立即运行。这是使用asyncio.run函数完成的。
协同例程函数预计将传递给asyncio.run ,而内部异步将使用帮助器函数coroutines.iscoroutine检查该函数。如果不是协同程序,则引发错误,否则协同例程将传递给 loop.run_until_complete
run_until_complete函数需要一个Future,并使用另一个帮助futures.isfuture函数来检查提供的类型。如果检查不是Future,则低级 API ensure_future用于将协同例程转换为 Future。
在旧版本的 Python 中,如果要手动创建自己的 Future 并将其安排到事件循环上,那么就要使用asyncio.ensure_future(现在被视为低级 API),但使用 Python 3.7+ 时,它被asyncio.create_task所取代。
另外使用 Python 3.7,直接与事件循环交互(例如获取事件循环、创建任务并将其传递到事件循环)的函数已被替换为create_taskasyncio.run。
我们可以通过下面的 API 查看在事件循环上运行的任务的状态:
asyncio.current_task
asyncio.all_tasks
Future 是一个低级await对象,表示异步操作的最终结果。该 API 的存在是为了启用基于回调的代码与async/await一起使用,而loop.run_in_executor是返回 Future 的异步低级 API 函数的示例。
高级 API(根据 Python 3.7+)是:
import asyncio
async def foo():
print("Foo!")
async def hello_cc():
await foo() # waits for `foo()` to complete
print("Hello Chongchong!")
asyncio.run(hello_world())
.run函数始终创建新的事件循环并在末尾关闭它。如果使用的是较低级别的 API,则必须手动处理该API:
loop = asyncio.get_event_loop()
loop.run_until_complete(hello_cc())
loop.close()
在 Python 3.8 之前,我们无法在标准 Python REPL 中执行异步代码(需要改用 IPython REPL)。
要使用最新版本的 Python 执行此操作,运行python -m asyncio 。REPL 启动后,就无需再使用asyncio.run(),只需直接使用await语句。
[Clang 10.0.1 (clang-1001.0.46.4)] on darwin
Use "await" directly instead of "asyncio.run()".
Type "help", "copyright", "credits" or "license" for more information.
>>> import asyncio
>>> async def foo():
... await asyncio.sleep(5)
... print("done")
...
>>> await foo()
done
请注意,REPL 在启动时会自动执行import asyncio,因此我们可以使用任何asyncio函数(如.sleep函数),而无需自行手动键入导入语句。
如果由于种种原因我们不想使用asyncio提供的事件循环(这是纯 Python 实现),则可以将其交换为另一个事件循环,如uvloop。
uvloop 是内置异步事件循环的快速回放替换。uvloop 在中实现,在引擎中使用libuv。根据uvloop的作者基准测试,它在执行性能上可堪比golang。
uvloop的使用也很简单,先使用pip install uvloop 安装,然后添加一个uvloop.install()调用,如下所示:
import asyncio
import uvloop
async def foo():
print("Foo!")
async def hello_cc():
await foo()
print("Hello Chongchong!")
uvloop.install()
asyncio.run(hello_cc())
以下函数有助于协调同时运行函数,并提供根据应用程序需求的不同控制。
asyncio.gather:获取一系列可等待值,返回成功等待的值的聚合列表。
asyncio.shield:防止取消可等待的对象。
asyncio.wait:等待一系列可等待,直到满足给定的"条件"。
asyncio.wait_for:等待单个等待,直到达到给定的"超时"。
asyncio.as_completed: 类似于gather,但返回在结果准备就绪时填充的 Future。
注意:gather有处理错误和取消的特定选项。例如,return_exceptions: False如果随后由其中一个可等待项引发的第一个异常返回到 gather的调用方,其中似乎设置为True,则异常将连同成功结果一起在列表中聚合。如果gather()
被取消,所有提交的等待(尚未完成)也将被取消。
@asyncio.coroutine: Python 3.10删除async def了
asyncio.sleep:将在 Python 3.10 中删除参数loop
注意:这些大多数这些 API 中都使用了一个参数参数loop,让其指示要使用的特定事件循环。Python 3.8 中已经弃用了该参数,并计划在 3.10 中完全删除它。
下面的示例演示如何等待多个异步任务完成。
import asyncio
async def foo(n):
await asyncio.sleep(5) # wait 5s before continuing
print(f"n: {n}!")
async def main():
tasks = [foo(1), foo(2), foo(3)]
await asyncio.gather(*tasks)
asyncio.run(main())
下面的示例使用FIRST_COMPLETED选项,表示无论任务首先完成什么,都将返回什么。
import asyncio
from random import randrange
async def foo(n):
s = randrange(5)
print(f"{n} will sleep for: {s} seconds")
await asyncio.sleep(s)
print(f"n: {n}!")
async def main():
tasks = [foo(1), foo(2), foo(3)]
result = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
print(result)
asyncio.run(main())
此程序的示例输出是:
1 will sleep for: 4 seconds
2 will sleep for: 2 seconds
3 will sleep for: 1 seconds
n: 3!
({<Task finished coro=<foo() done, defined at await.py:5> result=None>}, {<Task pending coro=<foo() running at await.py:8> wait_for=<Future pending cb=[<TaskWakeupMethWrApper object at 0x10322b468>()]>>, <Task pending coro=<foo() running at await.py:8> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x10322b4c8>()]>>})
下面的示例演示如何利用超时来防止无休止地等待异步任务完成。
import asyncio
async def foo(n):
await asyncio.sleep(10)
print(f"n: {n}!")
async def main():
try:
await asyncio.wait_for(foo(1), timeout=5)
except asyncio.TimeoutError:
print("timeout!")
asyncio.run(main())
注意: asyncio.TimeoutError不提供任何额外信息,因此尝试在输出中使用它没有意义(例如 except asyncio.TimeoutError as err: print(err))。
下面的示例演示如何as_complete生成要完成的第一个任务,然后是下一个最快任务,下一个任务在完成之前。
import asyncio
from random import randrange
async def foo(n):
s = randrange(10)
print(f"{n} will sleep for: {s} seconds")
await asyncio.sleep(s)
return f"{n}!"
async def main():
counter = 0
tasks = [foo("a"), foo("b"), foo("c")]
for future in asyncio.as_completed(tasks):
n = "quickest" if counter == 0 else "next quickest"
counter += 1
result = await future
print(f"the {n} result was: {result}")
asyncio.run(main())
此程序的示例输出是:
c will sleep for: 9 seconds
a will sleep for: 1 seconds
b will sleep for: 0 seconds
the quickest result was: b!
the next quickest result was: a!
the next quickest result was: c!
下面的示例演示如何将协同例程转换为任务并将其安排到事件循环上。
import asyncio
async def foo():
await asyncio.sleep(10)
print("Foo!")
async def hello_cc():
task = asyncio.create_task(foo())
print(task)
await asyncio.sleep(5)
print("Hello Chongchong!")
await asyncio.sleep(10)
print(task)
asyncio.run(hello_cc())
从上面的程序中我们可以看到,我们用create_task将协同例程函数转换为任务。这将自动安排在下一个可用刻度处在事件循环上运行的任务。
这与较低级别的 API ensure_future(这是创建新任务的首选方法)不同。ensure_future函数具有特定的逻辑分支,使其可用于更多的输入类型,而不是create_task仅支持将协同例程排到事件循环中并将其包装到任务中(请参阅:ensure_future源代码)。
此程序的输出将是:
<Task pending coro=<foo() running at create_task.py:4>>
Hello Chongchong!
Foo!
<Task finished coro=<foo() done, defined at create_task.py:4> result=None>
让我们回顾一下代码,并比较我们可以看到的上述输出...
我们将转换foo()到任务,然后在创建任务后立即打印返回的任务。因此,当我们打印任务时,我们可以看到其状态显示为"挂起"(因为它尚未执行)。
接下来,我们将sleep五秒钟,因为这将导致foo任务现在运行(因为当前任务hello_world将被视为繁忙)。
在foo任务中,我们也处于sleep状态,但时间比hello_world长,因此事件循环现在上下文将切换回hello_world任务,在睡眠时将传递该任务,我们将打印输出字符串 Hello Chongchong。
最后,我们又sleep十秒钟。这只是为了我们可以给foo任务足够的时间来完成和打印自己的输出。如果我们不这样做,那么hello_world任务将完成并关闭事件循环。最后一行hello_world是打印foo任务,我们将看到foo任务的状态现在将显示为"已完成"。
处理任务(实际上是未来)时,一旦 Future 在任务上设置了值,就可以执行"回调"函数。
以下示例通过修改上一create_task示例代码来演示这一点:
import asyncio
async def foo():
await asyncio.sleep(10)
return "Foo!"
def got_result(future):
print(f"got the result! {future.result()}")
async def hello_cc():
task = asyncio.create_task(foo())
task.add_done_callback(got_result)
print(task)
await asyncio.sleep(5)
print("Hello Chongchong!")
await asyncio.sleep(10)
print(task)
asyncio.run(hello_cc())
请注意,在上面的程序中,我们增加了一个函数,该got_result
函数期望接收"未来"类型,从而调用.result()"未来"。另请注意,要调用此函数,我们会将其传递给.add_done_callback()在create_task返回的任务上调用它。
此程序的输出是:
<Task pending coro=<foo() running at gather.py:4> cb=[got_result() at gather.py:9]>
Hello Chongchong!
got the result! Foo!
<Task finished coro=<foo() done, defined at gather.py:4> result='Foo!'>
在处理大量并发操作时,最好利用线程(和/或子进程)的"池"来防止耗尽应用程序的主机资源。
这也是concurrent.futures模块的来历。它提供了一个称为执行器的概念来帮助它,它可以独立运行或集成到现有的异步事件循环中。
有两种类型的执行器:ThreadPoolExecutor和ProcessPoolExecutor
让我们看一下在其中一个执行器中执行代码的第一种方法,方法是使用异步事件循环来计划执行器的运行。
为此,需要调用事件循环的.run_in_executor()函数,并将执行器类型作为第一个参数传递。如果None被提供,则使用默认执行器(即 ThreadPoolExecutor)下面的实例来自Python官方文档:
import asyncio
import concurrent.futures
def blocking_io():
with open("/dev/urandom", "rb") as f:
return f.read(100)
def cpu_bound():
return sum(i * i for i in range(10 ** 7))
async def main():
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(None, blocking_io)
print("default thread pool", result)
with concurrent.futures.ThreadPoolExecutor() as pool:
result = await loop.run_in_executor(pool, blocking_io)
print("custom thread pool", result)
with concurrent.futures.ProcessPoolExecutor() as pool:
result = await loop.run_in_executor(pool, cpu_bound)
print("custom process pool", result)
在其中一个执行器中执行代码的第二种方法是将要执行的代码直接发送到池。这意味着我们不必获取当前事件循环将池传递到其中(如前面的示例所示),但它附带了一个警告,即父程序不会等待任务完成,除非您明确告诉它(我接下来将演示)。考虑到这一点,让我们来看看这个替代方法。它涉及调用执行者的方法submit():
import concurrent.futures
import time
def slow_op(*args):
print(f"arguments: {args}")
time.sleep(5)
print("slow operation complete")
return 123
def do_something():
with concurrent.futures.ProcessPoolExecutor() as pool:
future = pool.submit(slow_op, "a", "b", "c")
for fut in concurrent.futures.as_completed([future]):
assert future.done() and not future.cancelled()
print(f"got the result from slow_op: {fut.result()}")
if __name__ == "__main__":
print("program started")
do_something()
print("program complete")
这里值得注意的一点是,如果我们没有使用with语句(如上例中所示),则意味着一旦池完成其工作,将不会关闭它,因此(取决于程序是否继续运行),可能会发现资源没有被清理。
要解决此问题,可以调用.shutdown()通过其父类向两种类型的执行器公开的方法。concurrent.futures.Executor
下面是一个这样做的示例,但现在使用线程池执行器:
import concurrent.futures
THREAD_POOL = concurrent.futures.ThreadPoolExecutor(max_workers=5)
def slow_op(*args):
print(f"arguments: {args}")
print("some kind of slow operation")
return 123
def do_something():
future = THREAD_POOL.submit(slow_op, "a", "b", "c")
THREAD_POOL.shutdown()
assert future.done() and not future.cancelled()
print(f"got the result from slow_op: {future.result()}")
if __name__ == "__main__":
print("program started")
do_something()
print("program complete")
现在,我尚未在示例中还未使用time.sleep(),因为我们使用的是线程池,并且time.sleep()是 CPU 绑定操作,如果直接使用将阻止线程完成。
这意味着我们的示例可能总是导致slow_op()函数完成之前,我们开始检查 future.done()。所以,是的,这不是最好的例子。通过合并一个不阻止的真正缓慢的操作,可以更现实地测试它。
但想象一下,我们有一个真正缓慢的操作发生,这意味着任务没有完成,当我们检查future.done()。
在这种情况下,我们应该注意,调用.shutdown()的位置是在我们明确等待计划的任务完成之前,然而当我们断言返回的future是否.done()返回时,我们会发现,无论尝试关闭线程池,任务都标记为"已完成"。
这是因为关闭方wait=True法的默认行为意味着它将等待所有计划的任务完成,然后再关闭执行器池。
因此,该方法是同步调用.shutdown()(即,它确保所有任务在关闭之前都已完成,因此我们可以保证所有结果都可用)。
如果我们传递.shutdown(wait=False)相反,那么调用future.done()将引发一个异常(因为计划的任务仍将在线程池关闭时运行),因此在这种情况下,我们需要确保我们使用另一种机制来获取计划任务的结果(如 concurrent.futures.as_completed或concurrent.futures.wait )。
最后一点要提的是,concurrent.futures.Future对象与asyncio.Future
不同。asyncio.Future用于异步事件循环,并且可等待。concurrent.futures.Future不可等待。
使用事件循环的.run_in_executor()方法将通过将concurrent.futures.Future类型包装到asyncio.wrap_future(有关详细信息,请参阅下一节)来提供两种未来类型之间的必要互操作性。
由于 Python 3.5,我们可以使用asyncio.wrap_future 将 concurrent.futures.Futur转换为easyncio.Future 。下面可以看到这方面的示例...
import asyncio
import random
from concurrent.futures import ThreadPoolExecutor
from time import sleep
def return_after_5_secs(message):
sleep(5)
return message
pool = ThreadPoolExecutor(3)
async def doit():
identify = random.randint(1, 100)
future = pool.submit(return_after_5_secs, (f"result: {identify}"))
awaitable = asyncio.wrap_future(future)
print(f"waiting result: {identify}")
return await awaitable
async def app():
# run some stuff multiple times
tasks = [doit(), doit()]
result = await asyncio.gather(*tasks)
print(result)
print("waiting app")
asyncio.run(app())
此程序的输出将是:
waiting app
waiting result: 62
waiting result: 83
# ...five seconds pass by...
['result: 62', 'result: 83']