跳至主要內容

多进程

blacklad大约 3 分钟PythonPython

多进程

一、介绍

多进程是指在同一程序中创建多个独立的进程来执行任务。每个进程都有自己独立的内存空间,相互之间不干扰。

因为 GIL 锁的存在,对于 CPU 密集型任务(例如计算密集型操作),使用多进程可以提高程序的效率。

优点

  1. 可以利用多核 CPU,提高计算效率。
  2. 每个进程独立运行,安全性高。

二、使用

Python 提供了 multiprocessing 模块来实现多进程。

  1. multiprocessing.Process 用于创建进程。target 参数指定了进程要执行的函数,args 参数传递给该函数的参数。
  2. 使用 start() 方法启动进程。
  3. join() 方法使主进程等待子进程完成。
import multiprocessing
import time

def worker(name):
    print(f"Worker {name} starting")
    time.sleep(2)
    print(f"Worker {name} finished")

if __name__ == '__main__':
    processes = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i,))
        processes.append(p)
        p.start()

    for p in processes:
        p.join()
    print("All workers finished")
Worker 0 starting
Worker 1 starting
Worker 2 starting
Worker 3 starting
Worker 4 starting
Worker 0 finished
Worker 1 finished
Worker 2 finished
Worker 3 finished
Worker 4 finished
All workers finished

三、进程池

进程池是一种预先创建的一组工作进程,这些进程可以重复使用来执行多个任务。通过进程池,可以避免频繁创建和销毁进程的开销,从而提高效率。

进程池的使用

Python 的 multiprocessing 模块提供了 Pool 类来实现进程池。

from multiprocessing import Pool
import time

def worker(num):
    print(f"Worker {num} starting")
    time.sleep(2)
    print(f"Worker {num} finished")
    return num * num

if __name__ == '__main__':
    with Pool(processes=4) as pool:
        results = pool.map(worker, range(10))
    print("Results:", results)
  1. 创建进程池:使用 Pool(processes=4) 创建一个包含 4 个进程的进程池。
  2. 提交任务pool.map(worker, range(10))worker 函数应用到 range(10) 中的每一个元素,并将任务分配给进程池中的进程去执行。
  3. 获取结果map 方法会阻塞主进程,直到所有任务完成,并返回结果列表。
Worker 0 starting
Worker 1 starting
Worker 0 finished
Worker 1 finished
Worker 2 starting
Worker 3 starting
Worker 2 finished
Worker 4 starting
Worker 3 finished
Worker 4 finished
Results: [0, 1, 4, 9, 16]

常用方法

applyapply_async

  • apply(func, args): 同步执行,类似于普通函数调用,阻塞主进程直到任务完成。
  • apply_async(func, args): 异步执行,不阻塞主进程,通过回调函数获取结果。
from multiprocessing import Pool

def worker(num):
    return num * num

def print_result(result):
    print("Result:", result)

if __name__ == '__main__':
    with Pool(processes=4) as pool:
        # 同步等待结果
        result = pool.apply(worker, (10,))
        print("Synchronous Result:", result)

        # 异步等待结果
        pool.apply_async(worker, (20,), callback=print_result)

        pool.close()
        pool.join()

mapmap_async

使用该方法传参数时,将iterable的每个元素作为参数,相当于一次提交多个任务。

  • map(func, iterable): 同步映射,阻塞主进程直到所有任务完成,返回结果列表。
  • map_async(func, iterable): 异步映射,不阻塞主进程,通过回调函数获取结果。
from multiprocessing import Pool

def worker(num):
    return num * num

def print_result(results):
    print("Results:", results)

if __name__ == '__main__':
    with Pool(processes=4) as pool:
        # 同步
        results = pool.map(worker, range(10))
        print("Synchronous Results:", results)

        # 异步
        pool.map_async(worker, range(10), callback=print_result)

        pool.close()
        pool.join()

starmapstarmap_async

  • starmap(func, iterable_of_tuples): 类似于 map,但可以传递多个参数
  • starmap_async(func, iterable_of_tuples): 异步版本的 starmap
from multiprocessing import Pool

def worker(x, y):
    return x * y

def print_result(results):
    print("Results:", results)

if __name__ == '__main__':
    with Pool(processes=4) as pool:
        # 同步
        results = pool.starmap(worker, [(1, 2), (3, 4), (5, 6)])
        print("Synchronous Results:", results)

        # 异步
        pool.starmap_async(worker, [(1, 2), (3, 4), (5, 6)], callback=print_result)

        pool.close()
        pool.join()

先使用pool.close() 关闭进程池,防止更多的任务提交到该池。才可以执行pool.join() 阻塞主进程,等待所有子进程完成。

上次编辑于:
贡献者: blacklad