解决issue#1008,补充关于多进程的示例
parent
143910a856
commit
41df7e56c3
|
@ -486,4 +486,73 @@ if __name__ == '__main__':
|
|||
```
|
||||
|
||||
比较两段代码的执行结果(在我目前使用的MacBook上,上面的代码需要大概6秒左右的时间,而下面的代码只需要不到1秒的时间,再强调一次我们只是比较了运算的时间,不考虑列表创建及切片操作花费的时间),使用多进程后由于获得了更多的CPU执行时间以及更好的利用了CPU的多核特性,明显的减少了程序的执行时间,而且计算量越大效果越明显。当然,如果愿意还可以将多个进程部署在不同的计算机上,做成分布式进程,具体的做法就是通过`multiprocessing.managers`模块中提供的管理器将`Queue`对象通过网络共享出来(注册到网络上让其他计算机可以访问),这部分内容也留到爬虫的专题再进行讲解。
|
||||
##### 上述例子的补充内容:
|
||||
这部分的代码中的p.start()在启动进程后就马上地开始运算,这样导致后续 start = time()开始记时的时候有部分进程实际已经计算了一些数据。实际的代码这样写没有什么问题,这样写的好处简单易懂,但为了严谨一些,我们可以用下面的思路来解决这个问题。
|
||||
|
||||
我们先需要加入一些必要的库和说明
|
||||
psutil 用于访问系统详情和进程控制,用来显示一些cpu和进程的信息。
|
||||
Process 用于创建新的进程
|
||||
Event 是一个用于进程间通信的同步原语,可以设置信号状态为真,让所有阻塞等待这个事件的进程继续运行。
|
||||
思路:
|
||||
1.我们可以先初始化进程但不启动它们:创建每个进程后,将它们加入到进程列表中,但不立即调用 p.start()。
|
||||
2.接着设置一个全局的 Event:使用 Event 来控制所有进程的启动。所有进程将在 start_event.wait() 调用处阻塞,直到 start_event.set() 被调用。
|
||||
3.统一启动所有进程:在主进程中,首先设置好计时开始的点 start = time(),然后通过 start_event.set() 释放所有子进程的阻塞状态,允许它们开始执行。这样使得所有进程的任务同步开始计算。
|
||||
4.开始并同步所有进程:等待所有任务结束后,合并执行结果。
|
||||
|
||||
代码中还加入查看每个进程任务所花费的用时和以及进程分配的cpu信息(这样保证不同进程都执行差不多的时间),同时也有最终总执行的时间。
|
||||
```python
|
||||
|
||||
import psutil
|
||||
from multiprocessing import Process, Queue, Event
|
||||
from time import time,sleep
|
||||
|
||||
def task_handler(curr_list, result_queue, start_event, cpu_id):
|
||||
psutil.Process().cpu_affinity([cpu_id]) # 尝试设置CPU亲和性
|
||||
start_event.wait() # 等待从主进程发出的开始信号
|
||||
start_time = time() # 记录开始时间
|
||||
total = sum(curr_list)
|
||||
end_time = time() # 记录结束时间
|
||||
# 将结果和时间放入队列
|
||||
result_queue.put((total, start_time, end_time, cpu_id))
|
||||
|
||||
def main():
|
||||
processes = []
|
||||
number_list = [x for x in range(1, 100000001)]
|
||||
result_queue = Queue()
|
||||
start_event = Event() # 创建一个事件用于同步开始计算
|
||||
cpus = psutil.cpu_count(logical=False) # 获取物理核心数
|
||||
index = 0
|
||||
|
||||
# 创建并启动进程,尽量平均分配到不同的CPU核心上
|
||||
for i in range(8):
|
||||
cpu_id = i % cpus
|
||||
p = Process(target=task_handler, args=(number_list[index:index + 12500000], result_queue, start_event, cpu_id))
|
||||
index += 12500000
|
||||
processes.append(p)
|
||||
p.start()
|
||||
|
||||
# sleep(5)
|
||||
start = time()
|
||||
|
||||
# 发送开始计算的信号
|
||||
start_event.set()
|
||||
|
||||
# 等待所有进程完成
|
||||
for p in processes:
|
||||
p.join()
|
||||
|
||||
# 计算完成,合并执行结果
|
||||
total = 0
|
||||
while not result_queue.empty():
|
||||
subtotal, proc_start_time, proc_end_time, cpu_id = result_queue.get()
|
||||
total += subtotal
|
||||
# 正确打印每个进程的执行时间和所在CPU
|
||||
print(f'Process on CPU {cpu_id} took: {proc_end_time - proc_start_time:.3f} seconds')
|
||||
end = time()
|
||||
print(total)
|
||||
print('Total Execution time: %.3fs' % (end - start))
|
||||
print('Total execution time: ', (proc_end_time - proc_start_time), 's', sep='')
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
```
|
||||
|
|
Loading…
Reference in New Issue