From c3977819d8995bd48f91b141b00258ace873dc3f Mon Sep 17 00:00:00 2001 From: xianhu Date: Fri, 9 Dec 2016 19:40:26 +0800 Subject: [PATCH] add python_thread_multiprocess.py --- README.md | 2 + python_thread_multiprocess.py | 115 ++++++++++++++++++++++++++++++++++ 2 files changed, 117 insertions(+) create mode 100644 python_thread_multiprocess.py diff --git a/README.md b/README.md index 6e75767..b8b45a2 100644 --- a/README.md +++ b/README.md @@ -29,6 +29,8 @@ ### python_coroutine.py: Python进阶: 理解Python中的异步IO和协程(Coroutine), 并应用在爬虫中 ### python_aiohttp.py: Python中最好用的异步爬虫库Aiohttp代码实例 + +### python_thread_multiprocess.py: Python进阶: 聊聊IO密集型任务、计算密集型任务,以及多线程、多进程 =================================================================================================== ### 您可以fork该项目,并在修改后提交Pull request diff --git a/python_thread_multiprocess.py b/python_thread_multiprocess.py new file mode 100644 index 0000000..fb5365d --- /dev/null +++ b/python_thread_multiprocess.py @@ -0,0 +1,115 @@ +# _*_ coding: utf-8 _*_ + +""" +python_thread_multiprocee.py by xianhu +""" + +import time +import threading +import multiprocessing + +# 定义全局变量Queue +g_queue = multiprocessing.Queue() +g_search_list = list(range(10000)) + + +# 定义一个IO密集型任务:利用time.sleep() +def task_io(task_id): + print("IOTask[%s] start" % task_id) + while not g_queue.empty(): + time.sleep(1) + try: + data = g_queue.get(block=True, timeout=1) + print("IOTask[%s] get data: %s" % (task_id, data)) + except Exception as excep: + print("IOTask[%s] error: %s" % (task_id, str(excep))) + print("IOTask[%s] end" % task_id) + return + + +# 定义一个计算密集型任务:利用一些复杂加减乘除、列表查找等 +def task_cpu(task_id): + print("CPUTask[%s] start" % task_id) + while not g_queue.empty(): + count = 0 + for i in range(10000): + count += pow(3*2, 3*2) if i in g_search_list else 0 + try: + data = g_queue.get(block=True, timeout=1) + print("CPUTask[%s] get data: %s" % (task_id, data)) + except Exception as excep: + print("CPUTask[%s] error: %s" % (task_id, str(excep))) + print("CPUTask[%s] end" % task_id) + return task_id + + +def init_queue(): + print("init g_queue start") + while not g_queue.empty(): + g_queue.get() + for _index in range(10): + g_queue.put(_index) + print("init g_queue end") + return + + +if __name__ == '__main__': + print("cpu count:", multiprocessing.cpu_count(), "\n") + + print("========== 直接执行IO密集型任务 ==========") + init_queue() + time_0 = time.time() + task_io(0) + print("结束:", time.time() - time_0, "\n") + + print("========== 多线程执行IO密集型任务 ==========") + init_queue() + time_0 = time.time() + thread_list = [threading.Thread(target=task_io, args=(i,)) for i in range(5)] + for t in thread_list: + t.start() + for t in thread_list: + if t.is_alive(): + t.join() + print("结束:", time.time() - time_0, "\n") + + print("========== 多进程执行IO密集型任务 ==========") + init_queue() + time_0 = time.time() + process_list = [multiprocessing.Process(target=task_io, args=(i,)) for i in range(multiprocessing.cpu_count())] + for p in process_list: + p.start() + for p in process_list: + if p.is_alive(): + p.join() + print("结束:", time.time() - time_0, "\n") + + print("========== 直接执行CPU密集型任务 ==========") + init_queue() + time_0 = time.time() + task_cpu(0) + print("结束:", time.time() - time_0, "\n") + + print("========== 多线程执行CPU密集型任务 ==========") + init_queue() + time_0 = time.time() + thread_list = [threading.Thread(target=task_cpu, args=(i,)) for i in range(5)] + for t in thread_list: + t.start() + for t in thread_list: + if t.is_alive(): + t.join() + print("结束:", time.time() - time_0, "\n") + + print("========== 多进程执行cpu密集型任务 ==========") + init_queue() + time_0 = time.time() + process_list = [multiprocessing.Process(target=task_cpu, args=(i,)) for i in range(multiprocessing.cpu_count())] + for p in process_list: + p.start() + for p in process_list: + if p.is_alive(): + p.join() + print("结束:", time.time() - time_0, "\n") + + exit()