Python各类并发模版
总结Python所有并发模式,多进程并发、多线程并发、协程并发、多进程+线程并发、多进程+协程并发,所有的并发以并发池开发,方便可控。希望有所帮助:)
进程并发
from multiprocessing import Pool, Manager
def func(d, results):
res = d + 1
print(res)
results.append(res)
if __name__ == "__main__":
num = 5
data = range(40)
print(data)
pool = Pool(processes=num)
manager = Manager()
results = manager.list()
jobs = []
for d in data:
job = pool.apply_async(func, (d, results))
jobs.append(job)
pool.close()
pool.join()
print(results)
多进程并发,通过Manager类共享资源,传递结果。windows下,manager必须建立在main函数下。为了保持平台兼容性,写成如上。
python3,python2通用,只是python3的执行结果有一定的乱序,需要注意。
$ python2 exp1.py
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40]
# moxiaoxi @ moxiaoxideMacBook-Pro-2 in ~/Desktop/python [14:27:07]
$ python3 exp1.py
range(0, 40)
2
5
3
4
1
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
[2, 3, 5, 1, 4, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 17, 16, 18, 19, 20, 21, 22, 24, 23, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40]
线程并发
from multiprocessing.pool import ThreadPool
def func(d):
res = d + 1
print(res)
return res
def ThreadPools():
num = 5
data = range(40)
print(data)
jobs = []
results = []
pool = ThreadPool(num)
for d in data:
job = pool.apply_async(func, (d,))
jobs.append(job)
pool.close()
pool.join()
for i in jobs:
results.append(i.get())
print(results)
if __name__ == '__main__':
Threadpools()
python的多线程一般比较鸡肋。一般而言,在IO密集型下,协程会比其他并发操作好很多。而在CPU密集型下,进程并发比其他并发操作要好。而线程在IO密集型下,在并发数低的情况下,与协程差不多,并发高时,线程速率远远低于协程。CPU密集型下,线程的速度和协程差不多。
个人而言,线程比多进程更好的一点在于其参数传递回更方便一些。
python3与python2整体执行效果差距不大,如下:
# moxiaoxi @ moxiaoxideMacBook-Pro-2 in ~/Desktop/python [14:34:21]
$ python2 exp2.py
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39]
1
2
3
4
5
6
7
89
10
11
12
13
14
15
16
17
18
19
20
2122
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40]
# moxiaoxi @ moxiaoxideMacBook-Pro-2 in ~/Desktop/python [14:34:24]
$ python3 exp2.py
range(0, 40)
1
2
3
5
7
8
9
10
11
12
13
14
6
16
15
18
19
20
17
21
4
22
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
23
24
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40]
协程并发
协程,又称微线程。与线程的区别在于,其并发时协同式的调度,线程是抢占式调度。协程在处理IO密集型操作时,非常有优势。
import gevent
from gevent import monkey, pool; monkey.patch_all()
from gevent import Timeout
def func(d):
res = d + 1
print(res)
return res
def GeventPools():
num = 8
data = range(40)
print(data)
results = []
p = pool.Pool(num)
timer = Timeout(60 * 1200).start() # Execute up to 120 minutes per coroutine
jobs = []
for d in data:
job = p.spawn(func, d)
jobs.append(job)
try:
gevent.joinall(jobs) # wait all jobs done
except Timeout:
print("[-] Time out....")
except Exception as e:
print("[-] error:{}".format(e))
finally:
pass
for i in jobs:
results.append(i.get())
print(results)
if __name__=='__main__':
GeventPools()
python2版本并发,利用gevent.
# moxiaoxi @ moxiaoxideMacBook-Pro-2 in ~/Desktop/python [14:53:40]
$ python2 exp3.py
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40]
Python3版本:
import asyncio
import asyncpool
import logging
import functools
def func(d):
res = d + 1
print(res)
return res
def asyncmul():
async def worker_coro(data, result_queue):
# print("Processing Value! -> {}".format(data))
results = await loop.run_in_executor(None, functools.partial(func, data))
await result_queue.put(results)
async def result_reader(queue):
while True:
value = await queue.get()
if value is None:
break
results.append(value)
# print("Got value! -> {}".format(value))
async def run():
result_queue = asyncio.Queue()
reader_future = asyncio.ensure_future(result_reader(result_queue), loop=loop)
# Start a worker pool with 10 coroutines, invokes `example_coro` and waits for it to complete or 5 minutes to pass.
async with asyncpool.AsyncPool(loop, num_workers=num, name="WorkerPool",
logger=logging.getLogger("WorkerPool"),
worker_co=worker_coro, max_task_time=5 * 60,
log_every_n=10) as pool:
for d in data:
await pool.push(d, result_queue)
await result_queue.put(None)
await reader_future
num = 8
data = range(40)
print(data)
results = []
loop = asyncio.get_event_loop()
loop.run_until_complete(run())
print(results)
asyncmul()
# moxiaoxi @ moxiaoxideMacBook-Pro-2 in ~/Desktop/python [15:37:36] C:1
$ python3 exp4.py
range(0, 40)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40]
进程+线程并发
from multiprocessing import Pool, Manager
from multiprocessing.pool import ThreadPool
def func(d):
res = d * d
print(res)
return res
def thread_pools(l, results):
num = 3
print(l)
jobs = []
tmp_results = []
pool = ThreadPool(num)
for i in l:
job = pool.apply_async(func, (i,))
jobs.append(job)
pool.close()
pool.join()
for i in jobs:
tmp_results.append(i.get())
print(tmp_results)
results.append(tmp_results)
if __name__ == "__main__":
num = 3
data = [[1, 2, 3], [3, 4, 5], [5, 6, 7], [7, 8, 9], [9, 10, 11]]
print(data)
pool = Pool(processes=num)
manager = Manager()
results = manager.list()
jobs = []
for l in data:
job = pool.apply_async(thread_pools, (l, results))
jobs.append(job)
pool.close()
pool.join()
print(results)
CPU密集型,采用此类并发。
# moxiaoxi @ moxiaoxideMacBook-Pro-2 in ~/Desktop/python [15:50:25]
$ python exp5.py
[[1, 2, 3], [3, 4, 5], [5, 6, 7], [7, 8, 9], [9, 10, 11]]
[1, 2, 3]
1
4
9
[3, 4, 5]
9
16
25
[5, 6, 7]
25
36
49
[1, 4, 9]
[7, 8, 9]
49
64
81
[9, 16, 25]
[9, 10, 11]
81
100
121
[25, 36, 49]
[49, 64, 81]
[81, 100, 121]
[[1, 4, 9], [9, 16, 25], [25, 36, 49], [49, 64, 81], [81, 100, 121]]
进程+协程并发
import gevent
from gevent import monkey, pool; #monkey.patch_all(thread=False)
from gevent import Timeout
from multiprocessing import Pool, Manager
def func(d):
res = d * d
print(res)
return res
def GeventPools(l, results):
print(l)
num = 3
tmp_results = []
p = pool.Pool(num)
timer = Timeout(60 * 1200).start() # Execute up to 120 minutes per coroutine
jobs = []
for i in l:
job = p.spawn(func, i)
jobs.append(job)
try:
gevent.joinall(jobs) # wait all jobs done
except Timeout:
print("[-] Time out....")
except Exception as e:
print("[-] error:{}".format(e))
finally:
pass
for i in jobs:
tmp_results.append(i.get())
print(tmp_results)
results.append(tmp_results)
if __name__ == "__main__":
num = 3
data = [[1, 2, 3], [3, 4, 5], [5, 6, 7], [7, 8, 9], [9, 10, 11]]
print(data)
pool = Pool(processes=num)
manager = Manager()
results = manager.list()
jobs = []
for l in data:
job = pool.apply_async(GeventPools, (l, results))
jobs.append(job)
pool.close()
pool.join()
print(results)
此时,会出现gevent与进程之间的管道冲突,从而阻塞。
具体参考:http://xiaorui.cc/2017/08/01/%E5%A4%9A%E8%BF%9B%E7%A8%8B%E4%B8%8Bgevent%E9%81%87%E5%88%B0%E7%AE%A1%E9%81%93%E5%86%B2%E7%AA%81%E9%97%AE%E9%A2%98/
这里暂时先暴力关闭,后面再看一下。
升级到Python3版本。该版本不会有管道冲突。
import asyncio
import asyncpool
import logging
import functools
from multiprocessing import Pool, Manager
def func(d):
res = d + 1
print(res)
return res
def asyncmul(l,results):
async def worker_coro(data, result_queue):
# print("Processing Value! -> {}".format(data))
results = await loop.run_in_executor(None, functools.partial(func, data))
await result_queue.put(results)
async def result_reader(queue):
while True:
value = await queue.get()
if value is None:
break
results.append(value)
# print("Got value! -> {}".format(value))
async def run():
result_queue = asyncio.Queue()
reader_future = asyncio.ensure_future(result_reader(result_queue), loop=loop)
# Start a worker pool with 10 coroutines, invokes `example_coro` and waits for it to complete or 5 minutes to pass.
async with asyncpool.AsyncPool(loop, num_workers=num, name="WorkerPool",
logger=logging.getLogger("WorkerPool"),
worker_co=worker_coro, max_task_time=5 * 60,
log_every_n=10) as pool:
for d in l:
await pool.push(d, result_queue)
await result_queue.put(None)
await reader_future
# async num workers
num = 8
print(l)
loop = asyncio.get_event_loop()
loop.run_until_complete(run())
if __name__ == "__main__":
num = 3
data = [[1, 2, 3], [3, 4, 5], [5, 6, 7], [7, 8, 9], [9, 10, 11]]
print(data)
pool = Pool(processes=num)
manager = Manager()
results = manager.list()
jobs = []
for l in data:
job = pool.apply_async(asyncmul, (l, results))
jobs.append(job)
pool.close()
pool.join()
print(results)
运行截图
$ python3 exp3.py
[[1, 2, 3], [3, 4, 5], [5, 6, 7], [7, 8, 9], [9, 10, 11]]
[1, 2, 3]
[5, 6, 7]
[3, 4, 5]
2
4
6
7
5
3
8
6
4
[7, 8, 9]
[9, 10, 11]
8
10
9
11
10
12
[2, 6, 4, 3, 7, 5, 4, 6, 8, 8, 10, 9, 10, 11, 12]