正文
2 创建进程
Jupyter 只能跟踪主进程，没法跟踪子进程。
import multiprocessing as mp
import threading as td


def job(a, d):
    # Minimal worker used to demonstrate process creation; the two
    # arguments are accepted but deliberately unused.
    print('aaaa')


if __name__ == '__main__':
    # The guard is mandatory for multiprocessing: child processes
    # re-import this module and must not spawn further children.
    p1 = mp.Process(target=job, args=(1, 2))
    p1.start()
    p1.join()
|
3 queue 进程输出
import multiprocessing as mp


def job(q):
    # A worker function cannot `return` a value to the parent process,
    # so the result is reported through the shared queue instead.
    total = 0
    for i in range(1000):
        total += i + i ** 2 + i ** 3
    q.put(total)


if __name__ == '__main__':
    q = mp.Queue()
    workers = [mp.Process(target=job, args=(q,)) for _ in range(2)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    res1 = q.get()
    res2 = q.get()
    print(res1 + res2)
|
4 效率对比 multiprocessing, multithreading
使用常规方法, 多核运算, 多线程运算:
import multiprocessing as mp
import threading as td
import time


def job(q, limit=10_000_000):
    """Sum ``i + i**2 + i**3`` for ``i`` in ``range(limit)`` and put the
    result on queue ``q``.

    ``limit`` defaults to the original hard-coded 10_000_000, so existing
    callers (``multicore``/``multithread`` pass only ``q``) are unchanged;
    a smaller ``limit`` makes the function cheap to exercise in tests.
    """
    res = 0
    for i in range(limit):
        res += i + i ** 2 + i ** 3
    q.put(res)
def multicore():
    # Run the summation in two separate OS processes (true parallelism,
    # one per core) and combine the two queued results.
    queue = mp.Queue()
    workers = [mp.Process(target=job, args=(queue,)) for _ in range(2)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    return queue.get() + queue.get()
def multithread():
    # Same workload in two threads of one process.  Because of the GIL,
    # CPU-bound threads do not run in parallel — this is the slow case
    # the section's timing output demonstrates.
    queue = mp.Queue()
    threads = [td.Thread(target=job, args=(queue,)) for _ in range(2)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return queue.get() + queue.get()
def normal(limit=10_000_000):
    """Serial baseline: perform the summation twice in a row (matching
    the two workers used by ``multicore``/``multithread``) and return it.

    ``limit`` defaults to the original hard-coded 10_000_000 so existing
    zero-argument callers behave identically; pass a small value for
    quick testing.
    """
    res = 0
    for _ in range(2):
        for i in range(limit):
            res += i + i ** 2 + i ** 3
    return res
if __name__ == '__main__':
    # Time each strategy on the identical workload; expected ordering
    # (per the sample output below): multithread > normal > multicore.
    st = time.time()
    print(normal())
    print('normal time: ', time.time() - st)

    st = time.time()
    print(multicore())
    print('multicore time: ', time.time() - st)

    st = time.time()
    print(multithread())
    print('multithread time: ', time.time() - st)
|
4999999666666716666660000000
normal time: 12.397605657577515
4999999666666716666660000000
multicore time: 6.265762090682983
4999999666666716666660000000
multithread time: 18.33289623260498
用时: 多线程 > 常规 > 多核
5 进程池 pool
把要运行的东西放到一个pool中, python会帮你解决怎么分配
在pool中, job函数可以拥有返回值
import multiprocessing as mp


def job(x):
    # Unlike a bare Process target, a Pool worker's return value is
    # delivered back to the caller.
    return x ** 2
def multicore():
    """Demonstrate the three ways of submitting work to a Pool.

    Fix over the original: the Pool is managed with a ``with`` block so
    its worker processes are always terminated and cleaned up — the
    original created the pool and never called close()/join() on it.
    """
    with mp.Pool(processes=3) as pool:
        # map: apply job to every element; results come back in order.
        res = pool.map(job, range(10))
        print(res)

        # apply_async: submit a single call; .get() blocks for the result.
        res = pool.apply_async(job, (2,))
        print(res.get())

        # Many async submissions, results gathered afterwards.
        multi_res = [pool.apply_async(job, (i, )) for i in range(10)]
        print([res.get() for res in multi_res])
if __name__ == '__main__':
    # Guard is required: on spawn-based platforms child processes
    # re-import this module and must not create pools recursively.
    multicore()
|
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
4
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
6 共享内存(Cache) shared memory
在多线程中, 可以使用global(全局变量)共享内存, 但这在多进程中行不通
我们只能用共享内存让不同核的CPU进行数据间的交流
shared memory支持的数据类型
import multiprocessing as mp

# Shared-memory primitives that can be handed to child processes so
# different cores can exchange data.  'i' is the C signed-int typecode;
# Value holds a single scalar, Array a flat sequence.
value = mp.Value('i', 1)
array = mp.Array('i', [1, 3, 4])
|
7 lock 锁
未加锁:
import multiprocessing as mp
import time


def job(v, num, name, repeats=10, delay=0.1):
    """Repeatedly add ``num`` to the shared value ``v`` WITHOUT a lock,
    printing the running total tagged with ``name``.

    Two processes running this concurrently interleave their updates —
    exactly the race the "未加锁" sample output shows.

    ``repeats``/``delay`` default to the original hard-coded 10
    iterations of 0.1 s, so existing callers are unchanged; pass
    ``delay=0`` to exercise the function quickly in tests.
    """
    for _ in range(repeats):
        time.sleep(delay)
        v.value += num
        print(name + ': ' + str(v.value))
def multicore():
    # Launch two unsynchronised writers against one shared value; their
    # printed totals interleave because there is no lock.
    shared = mp.Value('i', 0)
    procs = [
        mp.Process(target=job, args=(shared, 1, 'p1')),
        mp.Process(target=job, args=(shared, 3, 'p2')),
    ]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
if __name__ == '__main__':
    # Entry-point guard keeps spawned children from re-running the demo.
    multicore()
|
p1: 1 p2: 4 p1: 5 p2: 8 p1: 9 p2: 12 p1: 13 p2: 16 p1: 17 p2: 20 p1: 21 p2: 24 p1: 25 p2: 28 p1: 29 p2: 32 p1: 33 p2: 36 p1: 37 p2: 40
|
加锁:
import multiprocessing as mp
import time


def job(v, num, name, l, repeats=10, delay=0.1):
    """Repeatedly add ``num`` to shared value ``v`` while holding lock
    ``l`` for the whole loop, so the two processes' updates no longer
    interleave (compare the "加锁" sample output).

    Improvements over the original:
    - the lock is held via ``with l:`` so it is released even if the
      loop body raises (the original acquire/release pair would leak
      the lock on error);
    - ``repeats``/``delay`` default to the original hard-coded 10
      iterations of 0.1 s, keeping existing callers unchanged while
      allowing fast tests with ``delay=0``.
    """
    with l:
        for _ in range(repeats):
            time.sleep(delay)
            v.value += num
            print(name + ': ' + str(v.value))
def multicore():
    # Same two-writer demo, but both workers share one Lock so each
    # finishes its full run of updates before the other starts.
    l = mp.Lock()
    shared = mp.Value('i', 0)
    procs = [
        mp.Process(target=job, args=(shared, 1, 'p1', l)),
        mp.Process(target=job, args=(shared, 3, 'p2', l)),
    ]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
if __name__ == '__main__':
    # Run the locked demo only when executed directly, never on import.
    multicore()
|
p1: 1 p1: 2 p1: 3 p1: 4 p1: 5 p1: 6 p1: 7 p1: 8 p1: 9 p1: 10 p2: 13 p2: 16 p2: 19 p2: 22 p2: 25 p2: 28 p2: 31 p2: 34 p2: 37 p2: 40
|