正文
2 创建进程
Jupyter 只能跟踪主进程，没法跟踪子进程。
import multiprocessing as mp
import threading as td


def job(a, d):
    """Demo worker: the two arguments are unused placeholders that only
    illustrate how args are forwarded to the child process."""
    print('aaaa')


if __name__ == '__main__':
    # The __main__ guard is required so a spawned child does not
    # re-execute this section when it imports the module.
    p1 = mp.Process(target=job, args=(1, 2))
    p1.start()
    p1.join()
3 queue 进程输出
import multiprocessing as mp


def job(q):
    """Sum i + i**2 + i**3 over i in [0, 1000) and put the result on q.

    Worker functions cannot return a value to the parent process, so the
    result is passed back through the queue instead.
    """
    total = 0
    for i in range(1000):
        total += i + i ** 2 + i ** 3
    q.put(total)


if __name__ == '__main__':
    q = mp.Queue()
    workers = [mp.Process(target=job, args=(q,)) for _ in range(2)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    # One .get() per worker; each retrieves one result from the queue.
    print(q.get() + q.get())
4 效率对比 multiprocessing, multithreading
使用常规方法, 多核运算, 多线程运算:
import multiprocessing as mp
import threading as td
import time

# Workload size per worker. Kept at the original hard-coded value, but
# exposed as a default parameter so the demo is tunable and testable.
N = 10_000_000


def job(q, n=N):
    """Sum i + i**2 + i**3 for i in range(n) and put the result on q.

    q only needs a .put() method, so this works with both mp.Queue and
    queue.Queue (threads).
    """
    res = 0
    for i in range(n):
        res += i + i ** 2 + i ** 3
    q.put(res)


def multicore(n=N):
    """Run job in two separate processes and return the combined result."""
    q = mp.Queue()
    p1 = mp.Process(target=job, args=(q, n))
    p2 = mp.Process(target=job, args=(q, n))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    return q.get() + q.get()


def multithread(n=N):
    """Run job in two threads.

    CPU-bound Python bytecode holds the GIL, so this is expected to be
    slower than both the sequential and the multi-process versions.
    """
    q = mp.Queue()
    t1 = td.Thread(target=job, args=(q, n))
    t2 = td.Thread(target=job, args=(q, n))
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    return q.get() + q.get()


def normal(n=N):
    """Sequential baseline: do the same total amount of work (twice) in
    the main thread."""
    res = 0
    for _ in range(2):
        for i in range(n):
            res += i + i ** 2 + i ** 3
    return res


if __name__ == '__main__':
    st = time.time()
    print(normal())
    print('normal time: ', time.time() - st)

    st = time.time()
    print(multicore())
    print('multicore time: ', time.time() - st)

    st = time.time()
    print(multithread())
    print('multithread time: ', time.time() - st)
4999999666666716666660000000
normal time: 12.397605657577515
4999999666666716666660000000
multicore time: 6.265762090682983
4999999666666716666660000000
multithread time: 18.33289623260498
用时: 多线程 > 常规 > 多核
5 进程池 pool
把要运行的东西放到一个 pool 中, python 会帮你解决怎么分配
在 pool 中, job 函数可以拥有返回值
import multiprocessing as mp


def job(x):
    """Return the square of x.

    Must be a module-level function so the pool can pickle it and send it
    to the worker processes.
    """
    return x * x


def multicore():
    """Demonstrate the three ways of submitting work to a process pool."""
    # `with` closes and joins the pool on exit; the original version never
    # released the pool's worker processes.
    with mp.Pool(processes=3) as pool:
        # map: apply job to every element and block until all results arrive.
        res = pool.map(job, range(10))
        print(res)

        # apply_async: submit a single call; .get() blocks for its result.
        res = pool.apply_async(job, (2,))
        print(res.get())

        # A list of async results gives map-like behaviour, one call at a time.
        multi_res = [pool.apply_async(job, (i,)) for i in range(10)]
        print([r.get() for r in multi_res])


if __name__ == '__main__':
    multicore()
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
4
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
6 共享内存(Cache) shared memory
在多线程中, 可以使用 global(全局变量)共享内存, 但这在多进程中行不通
我们只能用共享内存让不同核的 CPU 进行数据间的交流
shared memory 支持的数据类型
import multiprocessing as mp

# Shared-memory scalar: typecode 'i' = C signed int, initial value 1.
# Child processes see the same underlying memory, unlike plain globals.
value = mp.Value('i', 1)
# Shared-memory array of C signed ints; only flat (1-D) sequences are
# supported by mp.Array.
array = mp.Array('i', [1, 3, 4])
7 lock 锁
未加锁:
import multiprocessing as mp
import time


def job(v, num, name, loops=10, delay=0.1):
    """Repeatedly add `num` to the shared value `v`, printing after each add.

    No lock is taken, so two processes running this concurrently interleave
    their read-modify-write cycles on v.value — the race is the point of
    this demo.

    loops/delay default to the original hard-coded 10 iterations of 0.1 s;
    they are parameters so the worker can be exercised quickly in isolation.
    """
    for _ in range(loops):
        time.sleep(delay)
        v.value += num
        print(name + ': ' + str(v.value))


def multicore():
    """Launch two unsynchronised processes mutating the same shared int."""
    v = mp.Value('i', 0)
    p1 = mp.Process(target=job, args=(v, 1, 'p1'))
    p2 = mp.Process(target=job, args=(v, 3, 'p2'))
    p1.start()
    p2.start()
    p1.join()
    p2.join()


if __name__ == '__main__':
    multicore()
p1: 1
p2: 4
p1: 5
p2: 8
p1: 9
p2: 12
p1: 13
p2: 16
p1: 17
p2: 20
p1: 21
p2: 24
p1: 25
p2: 28
p1: 29
p2: 32
p1: 33
p2: 36
p1: 37
p2: 40
加锁:
import multiprocessing as mp
import time


def job(v, num, name, l, loops=10, delay=0.1):
    """Add `num` to the shared value `v` `loops` times while holding lock `l`.

    The lock is used as a context manager so it is released even if the
    body raises; the original bare acquire()/release() pair would leave
    the sibling process deadlocked on an exception.

    loops/delay default to the original hard-coded 10 iterations of 0.1 s.
    """
    with l:
        for _ in range(loops):
            time.sleep(delay)
            v.value += num
            print(name + ': ' + str(v.value))


def multicore():
    """Two processes take turns on the shared value, serialized by a lock."""
    l = mp.Lock()
    v = mp.Value('i', 0)
    p1 = mp.Process(target=job, args=(v, 1, 'p1', l))
    p2 = mp.Process(target=job, args=(v, 3, 'p2', l))
    p1.start()
    p2.start()
    p1.join()
    p2.join()


if __name__ == '__main__':
    multicore()
p1: 1
p1: 2
p1: 3
p1: 4
p1: 5
p1: 6
p1: 7
p1: 8
p1: 9
p1: 10
p2: 13
p2: 16
p2: 19
p2: 22
p2: 25
p2: 28
p2: 31
p2: 34
p2: 37
p2: 40