Python-莫烦python学习笔记(multiprocessing)

Python 多进程的初尝试。学习自莫烦python。

正文

2 创建进程

jupyter 只能跟踪主进程,没法跟踪子进程。

python
import multiprocessing as mp
import threading as td
 
def job(a, d):
    """Demo worker: ignores both arguments and prints a marker string."""
    print('aaaa')
    
    
if __name__ == '__main__':
    # Spawn one child process running job(1, 2) and wait for it to finish.
    worker = mp.Process(target=job, args=(1, 2))
    worker.start()
    worker.join()

3 queue 进程输出

python
import multiprocessing as mp
 
def job(q):
    """Compute sum of (i + i**2 + i**3) for i in 0..999 and put it onto q.

    The result goes through the queue because a child process cannot
    return a value to its parent directly.
    """
    total = sum(i + i ** 2 + i ** 3 for i in range(1000))
    q.put(total)
    
 
if __name__ == '__main__':
    # Two worker processes each push one result into a shared queue;
    # the parent collects both and prints the sum.
    result_q = mp.Queue()
    workers = [mp.Process(target=job, args=(result_q,)) for _ in range(2)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    print(result_q.get() + result_q.get())
python
499667166000

4 效率对比 multiprocessing, multithreading

使用常规方法, 多核运算, 多线程运算:

python
import multiprocessing as mp
import threading as td
import time
 
 
def job(q, n=10_000_000):
    """Compute sum of (i + i**2 + i**3) for i in 0..n-1 and put it onto q.

    Args:
        q: queue-like object with a ``put`` method; used because a child
           process/thread cannot return a value to the caller directly.
        n: exclusive upper bound of the loop. Defaults to the original
           hard-coded 10_000_000 so existing callers (``args=(q,)``) are
           unchanged; smaller values make the workload tunable/testable.
    """
    res = 0
    for i in range(n):
        res += i + i ** 2 + i ** 3
    q.put(res)
 
 
def multicore():
    """Run job() in two separate processes and return the combined result."""
    shared_q = mp.Queue()
    procs = [mp.Process(target=job, args=(shared_q,)) for _ in range(2)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    # Each worker put one result; order does not matter for the sum.
    return shared_q.get() + shared_q.get()
 
 
def multithread():
    """Run job() in two threads and return the combined result.

    Uses the same mp.Queue as the process version so the two strategies
    are directly comparable.
    """
    shared_q = mp.Queue()
    threads = [td.Thread(target=job, args=(shared_q,)) for _ in range(2)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return shared_q.get() + shared_q.get()
 
 
def normal(n=10_000_000):
    """Serially compute the same workload twice and return the total.

    Runs two passes (matching the two workers in multicore/multithread)
    of sum(i + i**2 + i**3 for i in range(n)).

    Args:
        n: exclusive upper bound of each inner loop. Defaults to the
           original hard-coded 10_000_000 so argument-less calls behave
           exactly as before.
    """
    res = 0
    for _ in range(2):
        for i in range(n):
            res += i + i ** 2 + i ** 3
    return res
 
 
if __name__ == '__main__':
    # Time each strategy on the identical workload and print result + elapsed time.
    for label, fn in (('normal', normal),
                      ('multicore', multicore),
                      ('multithread', multithread)):
        st = time.time()
        print(fn())
        print(label + ' time: ', time.time() - st)
 

4999999666666716666660000000
normal time:  12.397605657577515
4999999666666716666660000000
multicore time:  6.265762090682983
4999999666666716666660000000
multithread time:  18.33289623260498

用时: 多线程 > 常规 > 多核

5 进程池 pool

把要运行的任务放到一个 pool 中, python 会帮你解决怎么分配。在 pool 中, job 函数可以拥有返回值。

python
import multiprocessing as mp
 
 
def job(x):
    """Return the square of x."""
    return x ** 2
 
 
def multicore():
    """Demonstrate three Pool APIs: map, apply_async, and a list of apply_async.

    Uses at most 3 worker processes (Pool defaults to all cores when
    ``processes`` is omitted). Prints the squares of 0..9 computed three ways.
    """
    # Context manager guarantees the pool is terminated on exit; the original
    # leaked the worker processes by never calling close()/join().
    with mp.Pool(processes=3) as pool:
        # map() splits the iterable across workers and gathers results in order.
        res = pool.map(job, range(10))
        print(res)

        # apply_async() schedules a single call; .get() blocks for its result.
        res = pool.apply_async(job, (2,))
        print(res.get())

        # Many apply_async calls emulate map() via an explicit result list.
        multi_res = [pool.apply_async(job, (i,)) for i in range(10)]
        print([r.get() for r in multi_res])
 
 
# Run the pool demo only when executed as a script (required on platforms
# that spawn children by re-importing this module).
if __name__ == '__main__':
    multicore()
 

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81] 4 [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

6 共享内存 shared memory

在多线程中, 可以使用 global(全局变量)共享内存, 但这在多进程中行不通 我们只能用共享内存让不同核的 CPU 进行数据间的交流

shared memory 支持的数据类型

python
import multiprocessing as mp
 
value = mp.Value('i', 1)  # type code first ('i' = signed int), then the initial value
array = mp.Array('i', [1, 3, 4])  # shared array must be one-dimensional; nested lists raise an error

7 lock 锁

未加锁:

python
import multiprocessing as mp
import time
 
 
def job(v, num, name):
    """Add ``num`` to the shared value ``v`` ten times, printing after each step.

    Without a lock, the two processes interleave their read-modify-write
    cycles, so the printed totals mix both workers' updates (race demo).
    """
    for _ in range(10):
        time.sleep(0.1)  # widen the window so the interleaving is visible
        v.value += num   # read-add-write on shared memory; not protected here
        print(name + ': ' + str(v.value))
 
 
def multicore():
    """Launch two unsynchronized workers that both increment a shared counter."""
    shared = mp.Value('i', 0)  # shared-memory int, starts at 0
    procs = [mp.Process(target=job, args=(shared, 1, 'p1')),
             mp.Process(target=job, args=(shared, 3, 'p2'))]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
 
 
# Run the unlocked shared-counter demo only when executed as a script.
if __name__ == '__main__':
    multicore()
 
python
p1: 1
p2: 4
p1: 5
p2: 8
p1: 9
p2: 12
p1: 13
p2: 16
p1: 17
p2: 20
p1: 21
p2: 24
p1: 25
p2: 28
p1: 29
p2: 32
p1: 33
p2: 36
p1: 37
p2: 40

加锁:

python
import multiprocessing as mp
import time
 
 
def job(v, num, name, l):
    """Add ``num`` to shared value ``v`` ten times while holding lock ``l``.

    Acquiring the lock around the whole loop makes each process's ten
    updates one uninterrupted burst, so the output shows all of one
    worker's lines before the other's.
    """
    l.acquire()  # lock: the other process blocks here until release()
    for _ in range(10):
        time.sleep(0.1)
        v.value += num
        print(name + ': ' + str(v.value))
    l.release()  # unlock
 
 
def multicore():
    """Run the locked counter demo: two processes share one lock and one value."""
    lock = mp.Lock()
    counter = mp.Value('i', 0)  # shared-memory int, starts at 0
    workers = [mp.Process(target=job, args=(counter, 1, 'p1', lock)),
               mp.Process(target=job, args=(counter, 3, 'p2', lock))]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
 
 
# Run the locked shared-counter demo only when executed as a script.
if __name__ == '__main__':
    multicore()
 
python
p1: 1
p1: 2
p1: 3
p1: 4
p1: 5
p1: 6
p1: 7
p1: 8
p1: 9
p1: 10
p2: 13
p2: 16
p2: 19
p2: 22
p2: 25
p2: 28
p2: 31
p2: 34
p2: 37
p2: 40