十二、多线程、多进程
一、GIL
Python中的GIL(Global Interpreter Lock,全局解释器锁)是一个用于保护解释器内部共享资源的互斥锁。它确保在任何时候只有一个线程可以执行Python字节码,从而防止多线程环境下的竞争条件和数据不一致问题。
GIL的工作原理
- 单线程执行:GIL保证同时只有一个线程在执行Python代码。即使在多核处理器上运行,GIL也会限制Python程序的并行执行。
- 线程切换:Python解释器会定期释放GIL,以允许其他线程运行。这通常是在I/O操作(如文件读写、网络通信)或其他阻塞操作期间发生的。
- C扩展模块:一些C扩展模块可以释放GIL,使得其他线程在执行这些模块的同时可以继续运行。
GIL的影响
- 多线程性能:由于GIL的存在,CPU密集型的Python程序在多线程环境下无法充分利用多核处理器的并行计算能力。这意味着多线程并不能显著提高这些程序的执行速度。
- I/O密集型任务:对于I/O密集型任务(如网络请求、文件操作等),多线程仍然有效,因为这些任务在等待I/O操作完成时,GIL会被释放,其他线程可以继续执行。
解决方案和替代方案
- 多进程:使用多进程而非多线程可以绕过GIL限制。每个进程有自己的Python解释器实例和GIL。Python的
multiprocessing模块提供了多进程支持,适用于CPU密集型任务。 - 异步编程:使用异步编程模型(如
asyncio)可以有效管理I/O密集型任务,而不依赖于多线程。 - C扩展:编写C扩展模块,并在需要时释放GIL,可以提高特定任务的性能。
- 其他解释器:一些Python解释器如Jython(基于Java)和IronPython(基于.NET)没有GIL,可以在多线程环境下更好地利用多核处理器。
二、threading
threading 模块是 Python 标准库中的一个模块,用于创建和管理线程。线程是一种轻量级的进程,可以并发执行多个任务。以下是 threading 模块的一些关键概念和功能:
1. 创建线程
创建线程的基本方法是使用 threading.Thread 类。你可以通过传递一个函数给 target 参数来创建一个线程,该线程将执行该函数。
import threading
def my_function():
print("Hello from a thread!")
# 创建线程
thread = threading.Thread(target=my_function)
# 启动线程
thread.start()
2. 等待线程结束
可以使用 join 方法等待线程完成执行。
thread.join()
3. 守护线程
守护线程在主线程结束时会自动退出。可以通过设置 daemon 属性来将线程设置为守护线程。
thread = threading.Thread(target=my_function)
thread.daemon = True
thread.start()
4. 线程同步
在多线程环境中,可能需要同步对共享资源的访问。threading 模块提供了多种同步原语,如锁(Lock)、条件变量(Condition)和事件(Event)。
锁(Lock)
锁用于确保某一时刻只有一个线程可以访问共享资源。
lock = threading.Lock()
def thread_safe_function():
with lock:
# 访问共享资源
pass
条件变量(Condition)
条件变量允许线程挂起,直到某个条件被满足。
condition = threading.Condition()
def wait_for_condition():
with condition:
condition.wait() # 等待条件满足
# 执行任务
def notify_condition():
with condition:
condition.notify() # 通知等待的线程
事件(Event)
事件允许线程等待某个事件的发生。
event = threading.Event()
def wait_for_event():
event.wait() # 等待事件被设置
# 执行任务
def set_event():
event.set() # 设置事件
5. 线程局部数据
threading.local 类用于创建线程局部数据,即每个线程都有独立的数据副本。
thread_data = threading.local()
def thread_function():
thread_data.value = 123 # 每个线程都有独立的 `value` 属性
6. 示例
以下是一个综合示例,展示了如何创建线程、使用锁和等待线程结束:
import threading
import time
lock = threading.Lock()
def worker(num):
with lock:
print(f"Thread {num} is starting")
time.sleep(2)
print(f"Thread {num} is finished")
threads = []
for i in range(5):
thread = threading.Thread(target=worker, args=(i,))
threads.append(thread)
thread.start()
for thread in threads:
thread.join()
print("All threads are finished")
在这个示例中,我们创建了5个线程,每个线程都会执行 worker 函数。我们使用锁来确保线程安全,并使用 join 方法等待所有线程完成。
三、线程间通信-共享变量和Queue
在多线程编程中,线程间通信是一个重要的课题。Python 提供了几种方式来实现线程间通信,其中共享变量和 Queue 是两种常见的方式。
1. 共享变量
共享变量是指多个线程可以访问和修改同一个变量。这种方法简单直接,但需要注意同步问题,以避免竞争条件(race condition)。通常使用锁(Lock)来保护共享变量。
示例:
import threading
# 共享变量
counter = 0
lock = threading.Lock()
def increment():
global counter
for _ in range(100000):
with lock:
counter += 1
threads = []
for _ in range(5):
thread = threading.Thread(target=increment)
threads.append(thread)
thread.start()
for thread in threads:
thread.join()
print(f"Final counter value: {counter}")
在这个示例中,多个线程共享变量 counter,并使用锁来确保每次只有一个线程能够修改 counter,从而避免竞争条件。
2. Queue
Queue 是一个线程安全的队列,适用于生产者-消费者模型。Python 的 queue 模块提供了三种类型的队列:Queue(FIFO队列)、LifoQueue(LIFO队列)和 PriorityQueue(优先级队列)。
示例:
import threading
import queue
import time
# 创建队列
q = queue.Queue()
def producer():
for i in range(5):
item = f"item {i}"
q.put(item)
print(f"Produced {item}")
time.sleep(1)
def consumer():
while True:
item = q.get()
if item is None:
break
print(f"Consumed {item}")
q.task_done()
# 创建生产者线程
producer_thread = threading.Thread(target=producer)
producer_thread.start()
# 创建消费者线程
consumer_thread = threading.Thread(target=consumer)
consumer_thread.start()
# 等待生产者线程完成
producer_thread.join()
# 向队列发送停止信号
q.put(None)
#q.task_done()
# 等待消费者线程完成
consumer_thread.join()
print("All items have been consumed")
在这个示例中,生产者线程将项目放入队列,消费者线程从队列中取出项目并处理它们。队列确保了线程安全,因此不需要显式的锁来保护队列操作。
总结
- 共享变量:简单直接,但需要使用锁来确保线程安全,适用于简单的共享数据场景。
- Queue:适用于生产者-消费者模型,提供线程安全的队列操作,简化了线程间的数据传递和同步问题。
选择哪种方法取决于具体的应用场景和需求。对于简单的共享数据,使用共享变量和锁可能更合适。而对于复杂的生产者-消费者模型,Queue 提供了更方便和安全的解决方案。
四、线程同步
在Python中,Lock 和 RLock 都是用于线程同步的锁机制,它们都在 threading 模块中定义。虽然它们的用途相似,但它们有一些关键的区别:
Lock(互斥锁)
- 基本功能:
Lock 是一个简单的互斥锁。它有两个状态:锁定和未锁定。 - 使用方法:一个线程可以调用
acquire() 方法来获取锁,如果锁已经被其他线程持有,则该线程会被阻塞,直到锁被释放。锁释放是通过release() 方法完成的。 - 不可重入:如果一个线程已经持有了锁,再次尝试获取锁会导致死锁,因为同一个线程不能再次获取已经被它持有的锁。
示例代码:
import threading
lock = threading.Lock()
def thread_function():
lock.acquire()
try:
# 进行线程安全的操作
print("Lock acquired")
finally:
lock.release()
print("Lock released")
thread = threading.Thread(target=thread_function)
thread.start()
thread.join()
RLock(可重入锁)
- 基本功能:
RLock 是一个可重入锁,允许同一个线程多次获取它而不会导致死锁。 - 使用方法:一个线程可以多次调用
acquire() 方法来获取锁,每次获取锁计数器会增加1。相应地,每次调用release() 方法计数器会减少1,当计数器为0时,锁才真正释放。 - 可重入:同一个线程可以多次获取锁,这在需要递归调用或在同一线程内多次调用锁定代码块时非常有用。
示例代码:
import threading
rlock = threading.RLock()
def thread_function():
rlock.acquire()
try:
# 进行线程安全的操作
print("RLock acquired first time")
rlock.acquire()
try:
# 进行更多的线程安全操作
print("RLock acquired second time")
finally:
rlock.release()
print("RLock released second time")
finally:
rlock.release()
print("RLock released first time")
thread = threading.Thread(target=thread_function)
thread.start()
thread.join()
总结
-
Lock 是一个简单的互斥锁,不允许同一个线程多次获取。 -
RLock 是一个可重入锁,允许同一个线程多次获取,非常适合需要递归调用或多次进入锁定代码块的场景。
选择使用哪种锁取决于你的具体需求。如果你不需要递归调用或多次进入锁定代码块,Lock 就足够了;否则,RLock 会是更好的选择。
五、线程池
什么是线程池
线程池(Thread Pool)是一种线程管理模式,它维护着多个线程等待执行任务,而不是为每个任务创建和销毁线程。线程池可以提高程序的性能,特别是在需要处理大量短时间任务的情况下,因为创建和销毁线程是有开销的。
Python中的线程池
在Python中,线程池的实现主要依赖于concurrent.futures模块,该模块提供了一个高级接口来管理异步执行。concurrent.futures.ThreadPoolExecutor类是用于实现线程池的主要类。
主要功能和使用方法
以下是一些主要功能和使用方法:
-
创建线程池:
from concurrent.futures import ThreadPoolExecutor # 创建一个包含5个线程的线程池 with ThreadPoolExecutor(max_workers=5) as executor: # 线程池的使用代码 -
提交任务: 可以使用
submit方法提交单个任务,或者使用map方法提交多个任务。def task(n): return n * n # 提交单个任务 future = executor.submit(task, 5) # 提交多个任务 results = executor.map(task, [1, 2, 3, 4, 5]) -
获取结果: 可以通过
Future对象获取任务的结果。result = future.result() print(result) # 输出25 -
等待任务完成: 可以使用
as_completed方法等待任务完成。from concurrent.futures import as_completed futures = [executor.submit(task, i) for i in range(5)] for future in as_completed(futures): print(future.result()) -
异常处理: 线程池中的任务可能会抛出异常,使用
Future对象的exception方法可以捕获这些异常。future = executor.submit(task, -1) # 假设task函数会对负数抛出异常 try: result = future.result() except Exception as e: print(f"任务抛出异常: {e}")
线程池的优点
- 减少线程创建销毁的开销:线程池可以重用线程,避免频繁的创建和销毁线程。
- 控制并发数量:通过设置最大线程数,可以控制并发任务的数量,避免过多线程导致资源竞争。
- 简化代码:使用线程池可以简化多线程编程的代码,使其更加易读和易维护。
线程池的缺点
- 全局解释器锁(GIL) :在CPython解释器中,GIL限制了多线程的真正并行执行,因此对于CPU密集型任务,线程池的效果可能不如进程池。
- 资源消耗:尽管线程池减少了线程创建的开销,但线程本身仍然消耗系统资源。
示例代码
下面是一个完整的示例,展示了如何使用ThreadPoolExecutor进行并发任务处理:
from concurrent.futures import ThreadPoolExecutor, as_completed
def task(n):
return n * n
with ThreadPoolExecutor(max_workers=5) as executor:
futures = [executor.submit(task, i) for i in range(10)]
for future in as_completed(futures):
print(future.result())
六、多进程
在 Python 中,多进程处理(multiprocessing)用于并行执行任务。多进程可以在多核 CPU 上并行运行,从而提高计算密集型任务的执行效率。Python 的 multiprocessing 模块提供了创建和管理进程的接口,类似于 threading 模块。
多进程的优点
- 并行执行:利用多核 CPU 提高性能。
- 避免 GIL 限制:Python 的全局解释器锁(GIL)限制了多线程的并行执行,但多进程不受此限制。
多进程的基本概念
- Process:一个独立的执行单元,拥有自己的内存空间。
- Queue:用于进程间通信的队列。
- Pipe:另一种进程间通信的方式。
- Pool:用于管理进程池,可以并行地执行多个任务。
示例代码
以下是一个简单的示例,展示了如何使用 multiprocessing 模块来并行执行任务。
import multiprocessing
import os
import time
def worker(num):
"""进程执行的任务函数"""
print(f'Worker {num} (PID: {os.getpid()}) is starting...')
time.sleep(2)
print(f'Worker {num} (PID: {os.getpid()}) is done.')
if __name__ == '__main__':
print(f'Main process PID: {os.getpid()}')
# 创建多个进程
processes = []
for i in range(5):
p = multiprocessing.Process(target=worker, args=(i,))
processes.append(p)
p.start()
# 等待所有进程完成
for p in processes:
p.join()
print('All workers are done.')
解释
-
导入模块:
import multiprocessing import os import time -
定义任务函数:
def worker(num): """进程执行的任务函数""" print(f'Worker {num} (PID: {os.getpid()}) is starting...') time.sleep(2) print(f'Worker {num} (PID: {os.getpid()}) is done.') -
主程序:
if __name__ == '__main__': print(f'Main process PID: {os.getpid()}') # 创建多个进程 processes = [] for i in range(5): p = multiprocessing.Process(target=worker, args=(i,)) processes.append(p) p.start() # 等待所有进程完成 for p in processes: p.join() print('All workers are done.')
进程池示例
使用 Pool 可以更方便地管理多个进程。
import multiprocessing
import os
import time
def worker(num):
print(f'Worker {num} (PID: {os.getpid()}) is starting...')
time.sleep(2)
print(f'Worker {num} (PID: {os.getpid()}) is done.')
return f'Result of worker {num}'
if __name__ == '__main__':
print(f'Main process PID: {os.getpid()}')
# 创建进程池
with multiprocessing.Pool(processes=3) as pool:
results = pool.map(worker, range(5))
print('All workers are done.')
print('Results:', results)
解释
-
导入模块:
import multiprocessing import os import time -
定义任务函数:
def worker(num): print(f'Worker {num} (PID: {os.getpid()}) is starting...') time.sleep(2) print(f'Worker {num} (PID: {os.getpid()}) is done.') return f'Result of worker {num}' -
主程序:
if __name__ == '__main__': print(f'Main process PID: {os.getpid()}') # 创建进程池 with multiprocessing.Pool(processes=3) as pool: results = pool.map(worker, range(5)) print('All workers are done.') print('Results:', results)
在这个示例中,Pool 对象管理了一个包含 3 个进程的池,map 方法将任务分配给进程池中的进程并收集结果。
通过这些示例,你可以了解如何在 Python 中使用多进程来提高程序的并行处理能力。
在 Python 的 multiprocessing 模块中,进程间通信(IPC,Inter-Process Communication)是一个重要的功能。主要有三种方式:Queue、Pipe 和 Manager。
进程间通信
1. Queue
Queue 是一个线程和进程安全的 FIFO 队列,可以用于进程间通信。它使用简单,类似于线程模块中的 queue.Queue。
import multiprocessing
import time
def producer(queue):
for i in range(5):
item = f'item {i}'
print(f'Producing {item}')
queue.put(item)
time.sleep(1)
def consumer(queue):
while True:
item = queue.get()
if item is None:
break
print(f'Consuming {item}')
time.sleep(2)
if __name__ == '__main__':
queue = multiprocessing.Queue()
p1 = multiprocessing.Process(target=producer, args=(queue,))
p2 = multiprocessing.Process(target=consumer, args=(queue,))
p1.start()
p2.start()
p1.join()
queue.put(None) # 终止消费者进程
p2.join()
2. Pipe
Pipe 提供了一个双向通信的通道,返回两个连接对象,分别表示管道的两端。数据可以从一端发送,从另一端接收。
import multiprocessing
def sender(conn):
for i in range(5):
msg = f'Message {i}'
print(f'Sending: {msg}')
conn.send(msg)
conn.close()
def receiver(conn):
while True:
msg = conn.recv()
if msg is None:
break
print(f'Received: {msg}')
if __name__ == '__main__':
parent_conn, child_conn = multiprocessing.Pipe()
p1 = multiprocessing.Process(target=sender, args=(parent_conn,))
p2 = multiprocessing.Process(target=receiver, args=(child_conn,))
p1.start()
p2.start()
p1.join()
parent_conn.send(None) # 终止接收者进程
p2.join()
3. Manager
Manager 提供了一个共享的对象,可以在多个进程之间共享数据。它支持的对象包括 list、dict、Namespace、Lock、RLock、Semaphore、BoundedSemaphore、Condition、Event、Barrier、Queue、Value 和 Array。
import multiprocessing
def worker(shared_list, lock):
with lock:
shared_list.append(multiprocessing.current_process().name)
if __name__ == '__main__':
manager = multiprocessing.Manager()
shared_list = manager.list()
lock = manager.Lock()
processes = [multiprocessing.Process(target=worker, args=(shared_list, lock)) for _ in range(5)]
for p in processes:
p.start()
for p in processes:
p.join()
print(shared_list)
总结
- Queue:适合需要先进先出(FIFO)顺序处理的任务。
- Pipe:适合需要双向通信的任务,通常用于简单的通信。
- Manager:适合需要共享复杂数据结构(如列表、字典等)的任务。
通过这些工具,可以有效地在多进程环境中进行数据共享和通信,从而实现复杂的并行计算任务。
评论区