跳至主要內容

多线程

blacklad大约 9 分钟PythonPython

多线程

一、介绍

进程与线程

进程:是一个独立运行的程序,有自己的内存空间、数据栈和其他系统资源。

线程:是进程中的一个执行单元,共享进程的内存空间和资源。一个进程可以包含多个线程,它们之间共享数据和状态。也是操作系统调度的最小单位。

并发与并行

  1. 并发是指在同一时间段内,多个任务在多个线程之间交替执行。Python的多线程主要实现并发。

  2. 并行是指多个任务在多个处理器上同时执行,真正的并行执行。

Python 多线程是指在同一个进程中,通过多个线程并发执行,以提高程序的并发性和响应性。

二、多线程实现

Python提供了threading 模块,封装了底层的 **_thread **模块,提供了更方便的API使用。

线程创建

使用threading模块可以方便地创建和管理线程:

import threading

def print_numbers():
    for i in range(5):
        print(i)

# 创建线程
thread = threading.Thread(target=print_numbers)

# 启动线程
thread.start()

# 等待线程完成
thread.join()
0
1
2
3
4

代码中通过threading.Thread方法,创建了一个线程,指定了线程需要执行的方法。

通过继承的方式创建线程

创建一个自定义线程类,先继承 threading.Thread 类,并重写其 __init__ 方法和 run 方法。

  1. __init__方法:在初始化时可以接收并存储参数。
  2. run方法:定义线程的执行逻辑。在线程启动时,系统会调用run方法。
import threading
import time

# 自定义线程类
class MyThread(threading.Thread):
    def __init__(self, name, delay):
        super().__init__()
        self.name = name
        self.delay = delay

    def run(self):
        print(f"Starting {self.name}")
        for i in range(5):
            time.sleep(self.delay)
            print(f"{self.name}: {i}")
        print(f"Exiting {self.name}")

# 创建线程实例
thread1 = MyThread("Thread-1", 1)
thread2 = MyThread("Thread-2", 2)

# 启动线程
thread1.start()
thread2.start()

# 等待线程完成
thread1.join()
thread2.join()

print("Exiting Main Thread")
Starting Thread-1
Starting Thread-2
Thread-1: 0
Thread-2: 0
Thread-1: 1
Thread-1: 2
Thread-2: 1
Thread-1: 3
Thread-1: 4
Exiting Thread-1
Thread-2: 2
Thread-2: 3
Thread-2: 4
Exiting Thread-2
Exiting Main Thread

代码中创建了两个线程,Thread-1线程每隔1s打印一次, Thread-2线程每隔2s打印一次。

守护线程

守护线程是指在主线程结束时自动退出的线程。可以通过设置daemon=True来创建守护线程。

主线程是只要执行程序就会启动一个主线程执行代码。

import threading
import time

def background_task():
    while True:
        print("Running in the background...")
        time.sleep(1)

# 创建守护线程
daemon_thread = threading.Thread(target=background_task, daemon=True)

# 启动守护线程
daemon_thread.start()

# 主线程等待5秒后结束
time.sleep(5)
print("Main thread ending")
Running in the background...
Running in the background...
Running in the background...
Running in the background...
Running in the background...
Main thread ending

当主线程 sleep 5s退出后,守护线程也会一块退出。

主线程的结束会等待所有的非守护线程结束

三、锁

有了多线程,就会引出一个问题,当多个线程同时读取修改同一个变量,会出现各种意想不到的结果。

task 函数对num加 1000000 次 1,启动两个线程分别对 num 加1000000次 1。按照正常的想法结果肯定是2000000,但结果却完全不同。

import threading

num = 0
def task():
    global num
    for i in range(1000000):
        num += 1

thread1 = threading.Thread(target=task)
thread2 = threading.Thread(target=task)

# 分别开始两个线程
thread1.start()
thread2.start()

# 等待两个线程执行结束
thread1.join()
thread2.join()

print(num)
1358536

多次执行会发现最后的 num 每次结果都不同。

原因

对于 num+=1 实际执行了两个步骤。1.首先读取num的值,2.将 num+1 的值赋值给num

两个线程在并行执行的时候,会出现同时执行第1步,获取到相同的数,再加1赋值。看似各执行了一次,实际结果只加了1次。

互斥锁

互斥锁用于确保多个线程在同一时间内访问共享资源时,不会发生数据竞争和数据不一致问题。用于对共享变量进行写操作的场景。

使用 lock = threading.Lock() 可以创建一个锁。

  1. lock.acquire() 获取锁,只能被一个线程获取到。
  2. lock.release() 释放锁,释放后可以被其他线程获取。
import threading

lock = threading.Lock()

num = 0
def task():
    global num
    for i in range(1000000):
        lock.acquire()
        try:
        	num += 1
        finally:
        	lock.release()

thread1 = threading.Thread(target=task)
thread2 = threading.Thread(target=task)

# 分别开始两个线程
thread1.start()
thread2.start()

# 等待两个线程执行结束
thread1.join()
thread2.join()

print(num)
2000000

加上 lock 不管执行多次,最后的结果都是 2000000。

一般更建议使用 with 关键字,Python会自动获取释放锁。

def task():
    global num
    for i in range(1000000):
        with lock:
            num += 1

递归锁

递归锁允许同一线程多次获取同一个锁,而不会导致死锁。适用于需要在一个锁被持有时再次获取该锁的场景,比如递归函数。通过 threading.RLock()可以创建一个递归锁。

import threading

rlock = threading.RLock()
shared_data = 0

def recursive_function(n):
    global shared_data
    if n <= 0:
        return
    with rlock:
        shared_data += 1
        recursive_function(n-1)

thread = threading.Thread(target=recursive_function, args=(5,))
thread.start()
thread.join()

print(shared_data)

5

信号量

信号量用于控制对资源的访问数量,适用于限制同时访问某资源的线程数量。例如,限制同时访问某数据库连接的线程数。

import threading
import time

semaphore = threading.Semaphore(3)  # 最多允许3个线程同时访问

def access_resource(name):
    with semaphore:
        print(f"{name} is accessing the resource")
        time.sleep(2)
        print(f"{name} is leaving the resource")

threads = [threading.Thread(target=access_resource, args=(f"Thread {i}",)) for i in range(5)]

for thread in threads:
    thread.start()

for thread in threads:
    thread.join()

事件

事件用于实现线程间的通知机制,一个线程等待事件的发生,另一个线程触发该事件。适用于需要协调多个线程的场景。

方法含义
set()将Flag设置为True
wait()阻塞线程
clear()将Flag设置为False
is_set()返回bool值,判断Flag是否为True
import threading
import time

event = threading.Event()

def wait_for_event():
    print("Waiting for event...")
    event.wait()  # 等待事件被触发
    print("Event received!")

def set_event():
    time.sleep(3)
    event.set()  # 触发事件

thread1 = threading.Thread(target=wait_for_event)
thread2 = threading.Thread(target=set_event)

thread1.start()
thread2.start()

thread1.join()
thread2.join()
Waiting for event...
Event received!

线程局部存储

线程局部存储用于在多个线程中存储独立的数据,适用于需要在线程间保持数据独立性的场景。

import threading

thread_local = threading.local()

def process_data():
    thread_local.data = f"Hello {threading.current_thread().name}"
    print(f"{threading.current_thread().name}: {thread_local.data}")

threads = [threading.Thread(target=process_data) for _ in range(3)]

for thread in threads:
    thread.start()

for thread in threads:
    thread.join()
Thread-1: Hello Thread-1
Thread-2: Hello Thread-2
Thread-3: Hello Thread-3

对于同一个变量thread_local,不同的线程中可以存储和读取独有的内容。

四、GIL锁

GIL(Global Interpreter Lock,全局解释器锁)是CPython(Python最广泛使用的实现)中一个互斥锁,用于保护访问Python对象的全局状态。这意味着在任何时候,只有一个线程能够执行Python字节码。

GIL 的主要作用是简化CPython解释器的实现,使其能以线程安全的方式管理内存并执行Python代码。这一设计选择简化了CPython的内存管理,但也带来了一些限制和性能问题。

GIL 的优点

  1. GIL 简化了 CPython 解释器的实现,使得对 CPython 的内存管理和垃圾回收机制的实现更加简单和高效。
  2. GIL 确保了 Python 内置数据结构(如列表和字典)的线程安全,避免了开发者在大多数情况下手动管理线程锁。

GIL 的限制

多线程性能瓶颈

由于GIL的存在,CPython在多线程情况下无法充分利用多核CPU的优势。即使在多核系统上,Python程序也只能在一个CPU上执行Python字节码,导致CPU密集型任务在多线程环境中无法获得显著的性能提升。

线程竞争

GIL会导致线程竞争问题,即多个线程需要频繁获取和释放GIL,增加了上下文切换的开销,进一步影响性能。

GIL主要影响CPU密集型任务。对于I/O密集型任务,如文件读写、网络操作,多线程可以在等待I/O操作完成时释放GIL,从而提高程序的并发性和响应性。

解决方法

  1. 多进程

    使用multiprocessing模块创建多个进程,每个进程都有自己的Python解释器实例和GIL,可以充分利用多核CPU的优势。

  2. C 扩展

    对于计算密集型任务,可以使用C扩展模块来编写核心计算部分,并在C扩展中释放GIL,从而实现并行执行。

五、线程池

线程池是一种设计模式,用于管理和复用一组预先创建的线程,以便执行并发任务。这种机制可以避免频繁创建和销毁线程的开销,提高程序的性能和资源利用率。

线程池作用

  1. 资源复用
    • 线程池中线程的复用减少了线程创建和销毁的开销,提高了系统资源利用率。
  2. 管理并发
    • 通过限制线程池中线程的数量,可以有效控制程序的并发度,避免系统过载。
  3. 简化编程模型
    • 线程池提供了更简单的并发任务管理接口,使得并发编程更加方便和易于维护。

使用

在Python中,concurrent.futures模块提供了一个高层次的接口来实现线程池,通过ThreadPoolExecutor类来管理线程池。以下是使用线程池的一些基本操作和示例。

创建线程池

使用ThreadPoolExecutor类创建线程池。指定最大工作线程数。

提交任务

使用submit方法向线程池提交任务。submit方法返回一个Future对象,表示任务的执行结果。

获取结果

使用Future对象的result方法获取任务的执行结果,可以使用as_completed方法来遍历已完成的任务。

关闭线程池

使用shutdown方法关闭线程池,等待所有任务完成后释放资源。

from concurrent.futures import ThreadPoolExecutor
import time

def task(n):
    time.sleep(1)
    return n * 2

# 创建线程池,最多允许5个线程同时运行
with ThreadPoolExecutor(max_workers=5) as executor:
    # 提交任务
    futures = [executor.submit(task, i) for i in range(10)]

    # 获取结果
    for future in futures:
        print(future.result())

print("All tasks completed")
0
2
4
6
8
10
12
14
16
18
All tasks completed
上次编辑于:
贡献者: blacklad