正文
2 创建进程
jupyter 只能跟踪主进程,没法跟踪子进程。
import multiprocessing as mp
import threading as td


def job(a, d):
    """Demo worker for mp.Process; both arguments are accepted but unused."""
    print('aaaa')


if __name__ == '__main__':
    p1 = mp.Process(target=job, args=(1, 2))
    p1.start()
    p1.join()
3 queue 进程输出
import multiprocessing as mp


def job(q):
    """Compute sum(i + i**2 + i**3 for i in [0, 1000)) and put it on q."""
    total = sum(i + i ** 2 + i ** 3 for i in range(1000))
    q.put(total)
if __name__ == '__main__':
    q = mp.Queue()
    p1 = mp.Process(target=job, args=(q,))
    p2 = mp.Process(target=job, args=(q,))
    p1.start()
    p2.start()
    # Drain the queue BEFORE joining: joining a process that still has
    # buffered Queue data can deadlock (multiprocessing programming
    # guidelines, "Joining processes that use queues").
    res1 = q.get()
    res2 = q.get()
    p1.join()
    p2.join()
    print(res1 + res2)
499667166000
4 效率对比 multiprocessing, multithreading
使用常规方法, 多核运算, 多线程运算:
import multiprocessing as mp
import threading as td
import time
def job(q, n=10_000_000):
    """CPU-bound worker: sum i + i**2 + i**3 for i in [0, n) and put it on q.

    n defaults to the original hard-coded workload; smaller values make the
    function reusable for quick experiments and tests.
    """
    res = 0
    for i in range(n):
        res += i + i ** 2 + i ** 3
    q.put(res)
def multicore():  # 多核运算(2)
    """Run job() in two separate processes and return the summed results."""
    q = mp.Queue()
    p1 = mp.Process(target=job, args=(q,))
    p2 = mp.Process(target=job, args=(q,))
    p1.start()
    p2.start()
    # Fetch results BEFORE join(): joining a process whose Queue still holds
    # unread data can deadlock (multiprocessing programming guidelines,
    # "Joining processes that use queues").
    res1 = q.get()
    res2 = q.get()
    p1.join()
    p2.join()
    return res1 + res2
def multithread():  # 多线程(2)
    """Run job() on two threads and return the summed results."""
    q = mp.Queue()
    workers = [td.Thread(target=job, args=(q,)) for _ in range(2)]
    for t in workers:
        t.start()
    for t in workers:
        t.join()
    return q.get() + q.get()
def normal(n=10_000_000, reps=2):  # 常规方式
    """Same workload as job(), done reps times sequentially in one thread.

    Defaults reproduce the original benchmark (two passes over 10_000_000);
    smaller arguments allow fast verification.
    """
    res = 0
    for _ in range(reps):
        for i in range(n):
            res += i + i ** 2 + i ** 3
    return res
if __name__ == '__main__':
    # Time each strategy on the identical workload and print result + elapsed.
    for label, fn in (('normal', normal),
                      ('multicore', multicore),
                      ('multithread', multithread)):
        st = time.time()
        print(fn())
        print(label + ' time: ', time.time() - st)
4999999666666716666660000000
normal time: 12.397605657577515
4999999666666716666660000000
multicore time: 6.265762090682983
4999999666666716666660000000
multithread time: 18.33289623260498
用时: 多线程 > 常规 > 多核
5 进程池 pool
把要运行的东西放到一个 pool 中, python 会帮你解决怎么分配。在 pool 中, job 函数可以拥有返回值。
import multiprocessing as mp
def job(x):
    """Return the square of x."""
    return x ** 2
def multicore():
    """Demonstrate Pool.map, apply_async, and a list of async results."""
    # Use the pool as a context manager so the worker processes are always
    # cleaned up; the original never closed the pool (resource leak).
    with mp.Pool(processes=3) as pool:  # at most 3 workers; default is every core
        res = pool.map(job, range(10))  # squares of 0-9; map() splits the work automatically
        print(res)
        res = pool.apply_async(job, (2,))  # single call; result object returned
        print(res.get())  # fetch the result from the async handle
        multi_res = [pool.apply_async(job, (i, )) for i in range(10)]  # map()-like effect by hand
        print([res.get() for res in multi_res])


if __name__ == '__main__':
    multicore()
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
4
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
6 共享内存(Cache) shared memory
在多线程中, 可以使用 global(全局变量)共享内存, 但这在多进程中行不通。我们只能用共享内存让不同核的 CPU 进行数据间的交流。
import multiprocessing as mp
value = mp.Value('i', 1)  # shared-memory scalar: type code first ('i' = C int), then the initial value
array = mp.Array('i', [1, 3, 4])  # 这个数组只能是一维的, 否则会报错
7 lock 锁
未加锁:
import multiprocessing as mp
import time
def job(v, num, name, repeats=10):
    """Repeatedly add num to shared value v WITHOUT a lock (races expected).

    v       -- multiprocessing.Value shared between processes
    num     -- increment applied each iteration
    name    -- label used in the printed progress lines
    repeats -- iteration count (default 10, matching the original demo)
    """
    for _ in range(repeats):
        time.sleep(0.1)
        # Read-modify-write on shared memory is not atomic; concurrent
        # processes interleave here — which is the point of this demo.
        v.value += num
        print(name + ': ' + str(v.value))
def multicore():
    """Launch two unlocked workers on one shared int and wait for both."""
    shared = mp.Value('i', 0)  # shared-memory counter
    procs = [mp.Process(target=job, args=(shared, step, label))
             for step, label in ((1, 'p1'), (3, 'p2'))]
    for p in procs:
        p.start()
    for p in procs:
        p.join()


if __name__ == '__main__':
    multicore()
p1: 1
p2: 4
p1: 5
p2: 8
p1: 9
p2: 12
p1: 13
p2: 16
p1: 17
p2: 20
p1: 21
p2: 24
p1: 25
p2: 28
p1: 29
p2: 32
p1: 33
p2: 36
p1: 37
p2: 40
加锁:
import multiprocessing as mp
import time
def job(v, num, name, l, repeats=10):
    """Add num to shared value v `repeats` times while holding lock l.

    v       -- multiprocessing.Value shared between processes
    num     -- increment applied each iteration
    name    -- label used in the printed progress lines
    l       -- multiprocessing.Lock serialising the whole loop
    repeats -- iteration count (default 10, matching the original demo)
    """
    # `with` guarantees the lock is released even if the body raises,
    # unlike the original bare acquire()/release() pair.
    with l:
        for _ in range(repeats):
            time.sleep(0.1)
            v.value += num
            print(name + ': ' + str(v.value))
def multicore():
    """Run two workers that serialise their updates through one lock."""
    lock = mp.Lock()
    shared = mp.Value('i', 0)  # shared-memory counter
    workers = [
        mp.Process(target=job, args=(shared, 1, 'p1', lock)),
        mp.Process(target=job, args=(shared, 3, 'p2', lock)),
    ]
    for w in workers:
        w.start()
    for w in workers:
        w.join()


if __name__ == '__main__':
    multicore()
p1: 1
p1: 2
p1: 3
p1: 4
p1: 5
p1: 6
p1: 7
p1: 8
p1: 9
p1: 10
p2: 13
p2: 16
p2: 19
p2: 22
p2: 25
p2: 28
p2: 31
p2: 34
p2: 37
p2: 40