Python各类并发模版

Posted by Xiaoxi on March 12, 2019

Python各类并发模版

总结Python所有并发模式:多进程并发、多线程并发、协程并发、多进程+线程并发、多进程+协程并发。所有并发均以并发池方式实现,方便可控。希望有所帮助:)

进程并发

from multiprocessing import Pool, Manager


def func(d, results):
    """Increment *d* by one, print the value, and append it to *results*.

    *results* is expected to be a shared (Manager) list so child processes
    can report back to the parent.
    """
    incremented = d + 1
    print(incremented)
    results.append(incremented)


if __name__ == "__main__":
    # Process-pool demo: fan 0..39 out over 5 worker processes and collect
    # results through a Manager list (works on Windows too, where the
    # Manager must live under the __main__ guard).
    worker_count = 5
    data = range(40)
    print(data)
    pool = Pool(processes=worker_count)
    manager = Manager()
    results = manager.list()
    jobs = [pool.apply_async(func, (d, results)) for d in data]
    pool.close()
    pool.join()
    print(results)

多进程并发,通过Manager类共享资源,传递结果。windows下,manager必须建立在main函数下。为了保持平台兼容性,写成如上。

python3、python2通用,只是python3的执行结果有一定的乱序,需要注意。


$ python2 exp1.py
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40]

# moxiaoxi @ moxiaoxideMacBook-Pro-2 in ~/Desktop/python [14:27:07]
$ python3 exp1.py
range(0, 40)
2
5
3
4
1
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
[2, 3, 5, 1, 4, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 17, 16, 18, 19, 20, 21, 22, 24, 23, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40]

线程并发

from multiprocessing.pool import ThreadPool


def func(d):
    """Thread-pool worker: print and return *d* + 1."""
    successor = d + 1
    print(successor)
    return successor


def ThreadPools():
    """Run ``func`` over 0..39 on a 5-thread pool and print the ordered results.

    Uses ``multiprocessing.pool.ThreadPool`` so results come back via
    AsyncResult objects in submission order, regardless of completion order.
    """
    thread_count = 5
    data = range(40)
    print(data)
    pool = ThreadPool(thread_count)
    jobs = [pool.apply_async(func, (d,)) for d in data]
    pool.close()
    pool.join()
    results = [job.get() for job in jobs]
    print(results)


if __name__ == '__main__':
    # Bug fix: the original called Threadpools() (lowercase 'p'), which raises
    # NameError -- the function defined above is ThreadPools().
    ThreadPools()

python的多线程一般比较鸡肋。一般而言,在IO密集型下,协程会比其他并发操作好很多。而在CPU密集型下,进程并发比其他并发操作要好。而线程在IO密集型下,在并发数低的情况下,与协程差不多,并发高时,线程速率远远低于协程。CPU密集型下,线程的速度和协程差不多。

个人而言,线程比多进程更好的一点在于其参数传递会更方便一些。

python3与python2整体执行效果差距不大,如下:

# moxiaoxi @ moxiaoxideMacBook-Pro-2 in ~/Desktop/python [14:34:21]
$ python2 exp2.py
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39]
1
 2
3
4
5
6
7
 89

10
11
12
13
14
15
16
17
18
19
20
 2122

23
24
25
26
 27
28
29
30
31
32
33
34
35
36
37
38
39
40
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40]

# moxiaoxi @ moxiaoxideMacBook-Pro-2 in ~/Desktop/python [14:34:24]
$ python3 exp2.py
range(0, 40)
1
2
3
5
7
8
9
10
11
12
13
14
6
16
15
18
19
20
17
21
4
22
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
23
24
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40]

协程并发

协程,又称微线程。与线程的区别在于,其并发是协同式的调度,线程是抢占式调度。协程在处理IO密集型操作时,非常有优势。

import gevent
from gevent import monkey, pool; monkey.patch_all()
from gevent import Timeout

def func(d):
    """Gevent worker: print and return the successor of *d*."""
    return_value = d + 1
    print(return_value)
    return return_value


def GeventPools():
    """Run ``func`` over 0..39 on a gevent coroutine pool of 8 and print results.

    Relies on the module-level ``monkey.patch_all()`` so blocking calls yield
    to other greenlets. Results are gathered in submission order via
    ``greenlet.get()``.
    """
    num = 8
    data = range(40)
    print(data)
    results = []
    p = pool.Pool(num)
    timer = Timeout(60 * 1200).start()  # NOTE(review): 60 * 1200 s = 1200 minutes, not the 120 the original comment claimed; the timer is also never cancelled
    
    jobs = []
    for d in data:
        job = p.spawn(func, d)
        jobs.append(job)
    try:
        gevent.joinall(jobs)  # block until every greenlet finishes (or the Timeout fires)
        
    except Timeout:
        print("[-] Time out....")
    except Exception as e:
        print("[-] error:{}".format(e))
    finally:
        pass
    # greenlet.get() returns the worker's return value (re-raises on failure)
    for i in jobs:
        results.append(i.get())
    print(results)

if __name__=='__main__':
    GeventPools()

python2版本并发,利用gevent.

# moxiaoxi @ moxiaoxideMacBook-Pro-2 in ~/Desktop/python [14:53:40]
$ python2 exp3.py
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40]

Python3版本:

import asyncio
import asyncpool
import logging
import functools



def func(d):
    """Executor worker for the asyncio pool: print and return *d* + 1."""
    bumped = d + 1
    print(bumped)
    return bumped


def asyncmul():
    """Python 3 coroutine-pool demo: run ``func`` over 0..39 via asyncpool.

    NOTE(review): ``asyncpool`` is a third-party package. The nested
    coroutines close over ``loop``, ``num``, ``data`` and ``results`` defined
    at the bottom of this function.
    """
    async def worker_coro(data, result_queue):
        # Run the blocking func in the default executor so it doesn't stall the loop.
        
        results = await loop.run_in_executor(None, functools.partial(func, data))
        await result_queue.put(results)

    async def result_reader(queue):
        # Drain the queue until the None sentinel, collecting into the outer `results`.
        while True:
            value = await queue.get()
            if value is None:
                break
            results.append(value)
            # print("Got value! -> {}".format(value))

    async def run():
        result_queue = asyncio.Queue()
        reader_future = asyncio.ensure_future(result_reader(result_queue), loop=loop)
        # Pool of `num` worker coroutines; each task may run at most 5 minutes.
        
        async with asyncpool.AsyncPool(loop, num_workers=num, name="WorkerPool",
                                       logger=logging.getLogger("WorkerPool"),
                                       worker_co=worker_coro, max_task_time=5 * 60,
                                       log_every_n=10) as pool:
            for d in data:
                await pool.push(d, result_queue)

        await result_queue.put(None)  # sentinel: tell the reader to stop
        await reader_future

    num = 8
    data = range(40)
    print(data)
    results = []
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run())
    print(results)

asyncmul()  # NOTE(review): runs at import time; no __main__ guard here
# moxiaoxi @ moxiaoxideMacBook-Pro-2 in ~/Desktop/python [15:37:36] C:1

$ python3 exp4.py
range(0, 40)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40]

进程+线程并发

from multiprocessing import Pool, Manager
from multiprocessing.pool import ThreadPool


def func(d):
    """Square *d*, print the square, and return it."""
    square = d * d
    print(square)
    return square


def thread_pools(l, results):
    """Square every element of *l* on a 3-thread pool.

    Appends the list of squares (in input order) to *results*, which is
    expected to be a shared Manager list when run from a process pool.
    """
    thread_count = 3
    print(l)
    pool = ThreadPool(thread_count)
    jobs = [pool.apply_async(func, (item,)) for item in l]
    pool.close()
    pool.join()
    chunk_results = [job.get() for job in jobs]
    print(chunk_results)
    results.append(chunk_results)


if __name__ == "__main__":
    # Hybrid demo: 3 processes, each running a 3-thread pool over one chunk.
    process_count = 3
    data = [[1, 2, 3], [3, 4, 5], [5, 6, 7], [7, 8, 9], [9, 10, 11]]
    print(data)
    pool = Pool(processes=process_count)
    manager = Manager()
    results = manager.list()
    jobs = [pool.apply_async(thread_pools, (chunk, results)) for chunk in data]
    pool.close()
    pool.join()
    print(results)

CPU密集型,采用此类并发。

# moxiaoxi @ moxiaoxideMacBook-Pro-2 in ~/Desktop/python [15:50:25]

$ python exp5.py
[[1, 2, 3], [3, 4, 5], [5, 6, 7], [7, 8, 9], [9, 10, 11]]
[1, 2, 3]
1
4
9
[3, 4, 5]
9
 16
25
[5, 6, 7]
25
 36
49
[1, 4, 9]
[7, 8, 9]
49
64
81
[9, 16, 25]
[9, 10, 11]
81
100
121
[25, 36, 49]
[49, 64, 81]
[81, 100, 121]
[[1, 4, 9], [9, 16, 25], [25, 36, 49], [49, 64, 81], [81, 100, 121]]

进程+协程并发

import gevent
from gevent import monkey, pool; #monkey.patch_all(thread=False)

from gevent import Timeout
from multiprocessing import Pool, Manager



def func(d):
    """Gevent worker: print and return *d* squared."""
    squared = d * d
    print(squared)
    return squared

def GeventPools(l, results):
    """Square every element of *l* on a 3-greenlet gevent pool.

    Appends the list of squares (in input order) to *results*, the shared
    Manager list, so the parent process can collect per-chunk output.
    """
    print(l)
    num = 3
    tmp_results = []
    p = pool.Pool(num)
    timer = Timeout(60 * 1200).start()  # NOTE(review): 60 * 1200 s = 1200 minutes, not the 120 the original comment claimed; the timer is never cancelled
    
    jobs = []
    for i in l:
        job = p.spawn(func, i)
        jobs.append(job)
    try:
        gevent.joinall(jobs)  # block until every greenlet finishes (or the Timeout fires)
        
    except Timeout:
        print("[-] Time out....")
    except Exception as e:
        print("[-] error:{}".format(e))
    finally:
        pass
    # greenlet.get() returns the worker's return value (re-raises on failure)
    for i in jobs:
        tmp_results.append(i.get())
    print(tmp_results)
    results.append(tmp_results)

if __name__ == "__main__":
    # Hybrid demo: 3 processes, each running a gevent pool over one chunk.
    process_count = 3
    data = [[1, 2, 3], [3, 4, 5], [5, 6, 7], [7, 8, 9], [9, 10, 11]]
    print(data)
    proc_pool = Pool(processes=process_count)
    manager = Manager()
    results = manager.list()
    jobs = [proc_pool.apply_async(GeventPools, (chunk, results)) for chunk in data]
    proc_pool.close()
    proc_pool.join()
    print(results)

此时,会出现gevent与进程之间的管道冲突,从而阻塞。

具体参考:http://xiaorui.cc/2017/08/01/%E5%A4%9A%E8%BF%9B%E7%A8%8B%E4%B8%8Bgevent%E9%81%87%E5%88%B0%E7%AE%A1%E9%81%93%E5%86%B2%E7%AA%81%E9%97%AE%E9%A2%98/

这里暂时先暴力关闭,后面再看一下。

升级到Python3版本。该版本不会有管道冲突。

import asyncio
import asyncpool
import logging
import functools
from multiprocessing import Pool, Manager


def func(d):
    """Executor worker for the asyncio child-process pool: print and return *d* + 1."""
    next_value = d + 1
    print(next_value)
    return next_value


def asyncmul(l,results):
    """Child-process worker: map ``func`` over *l* with an asyncpool coroutine pool.

    Appends each result to *results* (a multiprocessing.Manager list shared
    with the parent process). NOTE(review): ``asyncpool`` is third-party.
    """
    async def worker_coro(data, result_queue):
        # NOTE(review): this local `results` shadows the outer parameter --
        # it holds a single value here, not the shared list.
        
        results = await loop.run_in_executor(None, functools.partial(func, data))
        await result_queue.put(results)

    async def result_reader(queue):
        # Collect values into the shared `results` until the None sentinel.
        while True:
            value = await queue.get()
            if value is None:
                break
            results.append(value)
            # print("Got value! -> {}".format(value))

    async def run():
        result_queue = asyncio.Queue()
        reader_future = asyncio.ensure_future(result_reader(result_queue), loop=loop)
        # Pool of `num` worker coroutines; each task may run at most 5 minutes.
        
        async with asyncpool.AsyncPool(loop, num_workers=num, name="WorkerPool",
                                       logger=logging.getLogger("WorkerPool"),
                                       worker_co=worker_coro, max_task_time=5 * 60,
                                       log_every_n=10) as pool:
            for d in l:
                await pool.push(d, result_queue)

        await result_queue.put(None)  # sentinel: tell the reader to stop
        await reader_future
    # number of concurrent worker coroutines in this process
    
    num = 8
    print(l)
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run())

if __name__ == "__main__":
    # Hybrid demo: 3 processes, each driving an asyncio coroutine pool.
    process_count = 3
    data = [[1, 2, 3], [3, 4, 5], [5, 6, 7], [7, 8, 9], [9, 10, 11]]
    print(data)
    worker_pool = Pool(processes=process_count)
    manager = Manager()
    results = manager.list()
    jobs = [worker_pool.apply_async(asyncmul, (chunk, results)) for chunk in data]
    worker_pool.close()
    worker_pool.join()
    print(results)

运行截图

$ python3 exp3.py
[[1, 2, 3], [3, 4, 5], [5, 6, 7], [7, 8, 9], [9, 10, 11]]
[1, 2, 3]
[5, 6, 7]
[3, 4, 5]
2
4
6
7
5
3
8
6
4
[7, 8, 9]
[9, 10, 11]
8
10
9
11
10
12
[2, 6, 4, 3, 7, 5, 4, 6, 8, 8, 10, 9, 10, 11, 12]