我们将讨论如何利用Python/ target=_blank class=infotextkey>Python执行多线程和多进程任务。它们提供了在单个进程或多个进程之间执行并发操作的方法。并行和并发执行可以提高系统的速度和效率。
在讨论多线程和多进程的基础知识之后,我们还将讨论使用Python库实现它们的实际方法。
首先简要讨论并行系统的好处。
并行计算的优势
以上是需要并发或并行执行的一些常见原因。现在,回到主题,即多线程和多进程,并讨论它们的主要区别。
多线程是在单个进程中实现并行性的一种方法,能够执行同时进行的任务。在单个进程内可以创建多个线程,并在该进程内并行执行较小的任务。
单个进程中的线程共享一个公共内存空间,但它们的堆栈跟踪和寄存器是独立的。由于共享内存,它们的计算成本较低。
单线程和多线程Env.
Python中的多线程主要用于执行I/O操作,即如果程序的某个部分正在执行I/O操作,则其余程序可以保持响应。然而,在Python的实现中,由于全局解释器锁(GIL)的存在,多线程无法实现真正的并行性。
简而言之,GIL是一个互斥锁,一次只允许一个线程与Python字节码交互,即使在多线程模式下,一次也只能有一个线程执行字节码。
这样做是为了在CPython中保持线程安全,但它限制了多线程的性能优势。为了解决这个问题,Python有一个单独的多进程库,我们将在之后进行讨论。
不断在后台运行的线程称为守护线程。它们的主要工作是支持主线程或非守护线程。守护线程不会阻塞主线程的执行,甚至会在主线程执行完毕后继续运行。
在Python中,守护线程主要用作垃圾回收器。它会默认销毁所有无用的对象并释放内存,以便主线程可以正常使用和执行。
多进程用于执行多个进程的并行执行。它可以帮助实现真正的并行性,因为可以同时执行不同的进程,并且每个进程都拥有自己的内存空间。它使用CPU的独立核心,并且在执行进程间的数据交换时也很有帮助。
与多线程相比,多进程的计算成本更高,因为不使用共享内存空间。不过,它允许进行独立执行,并克服了全局解释器锁的限制。
多进程环境
上图展示了一个多进程环境,在该环境中,一个主进程创建了两个独立的进程,并为它们分配了不同的工作。
现在,我们使用Python实现一个基本的多线程示例。Python有一个内置的threading模块用于多线程实现。
import threading
import os
这是一个用于计算数字平方的简单函数,它接受一个数字列表作为输入,并输出列表中每个数字的平方,同时输出使用的线程名称和与该线程关联的进程ID。
def calculate_squares(numbers):
for num in numbers:
square = num * num
print(
f"Square of the number {num} is {square} | Thread Name {threading.current_thread().name} | PID of the process {os.getpid()}"
)
本示例有一个数字列表,将其平均分成两半,并分别命名为first_half和second_half。现在,将为这些列表分配两个独立的线程t1和t2。
Thread函数创建一个新线程,该线程接受一个带有参数列表的函数作为输入。还可以为线程分配一个单独的名称。
.start()函数将开始执行这些线程,而.join()函数将阻塞主线程的执行,直到给定的线程完全执行完毕。
if __name__ == "__mAIn__":
numbers = [1, 2, 3, 4, 5, 6, 7, 8]
half = len(numbers) // 2
first_half = numbers[:half]
second_half = numbers[half:]
t1 = threading.Thread(target=calculate_squares, name="t1", args=(first_half,))
t2 = threading.Thread(target=calculate_squares, name="t2", args=(second_half,))
t1.start()
t2.start()
t1.join()
t2.join()
输出:
Square of the number 1 is 1 | Thread Name t1 | PID of the process 345
Square of the number 2 is 4 | Thread Name t1 | PID of the process 345
Square of the number 5 is 25 | Thread Name t2 | PID of the process 345
Square of the number 3 is 9 | Thread Name t1 | PID of the process 345
Square of the number 6 is 36 | Thread Name t2 | PID of the process 345
Square of the number 4 is 16 | Thread Name t1 | PID of the process 345
Square of the number 7 is 49 | Thread Name t2 | PID of the process 345
Square of the number 8 is 64 | Thread Name t2 | PID of the process 345
注意:上述创建的所有线程都是非守护线程。要创建守护线程,需要编写t1.setDaemon(True),将线程t1设置为守护线程。
现在来了解一下上述代码生成的输出结果。可以观察到两个线程的进程ID(即PID)保持不变,这意味着这两个线程属于同一个进程。
还可以观察到输出并非按顺序生成。第一行中可以看到是线程1生成的输出,然后在第三行是线程2生成的输出,接着在第四行,再次是线程1生成的输出。这清楚地表明这些线程是同时工作的。
并发并不意味着这两个线程并行执行,因为一次只有一个线程被执行。它不会减少执行时间,与顺序执行所需的时间相同。CPU开始执行一个线程,但在中途离开,并切换到另一个线程,过一段时间后,又回到主线程,并从上次离开的地方开始执行。
目前对多线程及其实现方式和限制已经有基本的了解。现在,是时候学习多进程的实现以及如何克服这些限制了。
在这里将沿用相同的示例,但不再创建两个独立的线程,而是创建两个独立的进程,并讨论观察结果。
from multiprocessing import Process
import os
本例将使用multiprocessing模块来创建独立的进程。
该函数将保持不变。只是在这里删除了有关线程信息的打印语句。
def calculate_squares(numbers):
for num in numbers:
square = num * num
print(
f"Square of the number {num} is {square} | PID of the process {os.getpid()}"
)
主函数有一些修改。只是创建了一个独立的进程,而不是线程。
if __name__ == "__main__":
numbers = [1, 2, 3, 4, 5, 6, 7, 8]
half = len(numbers) // 2
first_half = numbers[:half]
second_half = numbers[half:]
p1 = Process(target=calculate_squares, args=(first_half,))
p2 = Process(target=calculate_squares, args=(second_half,))
p1.start()
p2.start()
p1.join()
p2.join()
输出:
Square of the number 1 is 1 | PID of the process 1125
Square of the number 2 is 4 | PID of the process 1125
Square of the number 3 is 9 | PID of the process 1125
Square of the number 4 is 16 | PID of the process 1125
Square of the number 5 is 25 | PID of the process 1126
Square of the number 6 is 36 | PID of the process 1126
Square of the number 7 is 49 | PID of the process 1126
Square of the number 8 is 64 | PID of the process 1126
可以观察到,每个列表都由一个独立的进程执行。它们具有不同的进程ID。为了检查进程是否已并行执行,需要创建一个单独的环境,下面我们将讨论这一点。
为了检查是否获得了真正的并行性,在这里将计算使用和不使用多进程的算法运行时间。
为此,需要一个包含超过10^6个整数的大型整数列表。可以使用random库生成一个列表。此处将使用Python的time模块来计算运行时间。下面是实现的代码,代码本身很容易理解,也可以随时查看代码注释。
from multiprocessing import Process
import os
import time
import random
def calculate_squares(numbers):
for num in numbers:
square = num * num
if __name__ == "__main__":
numbers = [
random.randrange(1, 50, 1) for i in range(10000000)
] # 创建一个包含10^7个整数的随机列表。
half = len(numbers) // 2
first_half = numbers[:half]
second_half = numbers[half:]
# ----------------- 创建单进程环境 ------------------------#
start_time = time.time() # 开始计时(不使用多进程)
p1 = Process(
target=calculate_squares, args=(numbers,)
) # 单进程P1执行整个列表
p1.start()
p1.join()
end_time = time.time() # 结束计时(不使用多进程)
print(f"Execution Time Without Multiprocessing: {(end_time-start_time)*10**3}ms")
# ----------------- 创建多进程环境 ------------------------#
start_time = time.time() # 开始计时(使用多进程)
p2 = Process(target=calculate_squares, args=(first_half,))
p3 = Process(target=calculate_squares, args=(second_half,))
p2.start()
p3.start()
p2.join()
p3.join()
end_time = time.time() # 结束计时(使用多进程)
print(f"Execution Time With Multiprocessing: {(end_time-start_time)*10**3}ms")
输出:
Execution Time Without Multiprocessing: 619.8039054870605ms
Execution Time With Multiprocessing: 321.70287895202637ms
可以观察到,使用多进程的时间几乎是不使用多进程时间的一半。这表明这两个进程在同一时间内并行执行,并展示了真正的并行性行为。