侧边栏壁纸
博主头像
顾小诺 博主等级

行动起来,活在当下

  • 累计撰写 30 篇文章
  • 累计创建 14 个标签
  • 累计收到 2 条评论

目 录CONTENT

文章目录

十二、多线程、多进程

顾小诺
2024-07-01 / 0 评论 / 0 点赞 / 14 阅读 / 0 字

十二、多线程、多进程

一、GIL

Python中的GIL(Global Interpreter Lock,全局解释器锁)是一个用于保护解释器内部共享资源的互斥锁。它确保在任何时候只有一个线程可以执行Python字节码,从而防止多线程环境下的竞争条件和数据不一致问题。

GIL的工作原理

  1. 单线程执行:GIL保证同时只有一个线程在执行Python代码。即使在多核处理器上运行,GIL也会限制Python程序的并行执行。
  2. 线程切换:Python解释器会定期释放GIL,以允许其他线程运行。这通常是在I/O操作(如文件读写、网络通信)或其他阻塞操作期间发生的。
  3. C扩展模块:一些C扩展模块可以释放GIL,使得其他线程在执行这些模块的同时可以继续运行。

GIL的影响

  1. 多线程性能:由于GIL的存在,CPU密集型的Python程序在多线程环境下无法充分利用多核处理器的并行计算能力。这意味着多线程并不能显著提高这些程序的执行速度。
  2. I/O密集型任务:对于I/O密集型任务(如网络请求、文件操作等),多线程仍然有效,因为这些任务在等待I/O操作完成时,GIL会被释放,其他线程可以继续执行。

解决方案和替代方案

  1. 多进程:使用多进程而非多线程可以绕过GIL限制。每个进程有自己的Python解释器实例和GIL。Python的multiprocessing​模块提供了多进程支持,适用于CPU密集型任务。
  2. 异步编程:使用异步编程模型(如asyncio​)可以有效管理I/O密集型任务,而不依赖于多线程。
  3. C扩展:编写C扩展模块,并在需要时释放GIL,可以提高特定任务的性能。
  4. 其他解释器:一些Python解释器如Jython(基于Java)和IronPython(基于.NET)没有GIL,可以在多线程环境下更好地利用多核处理器。

二、threading

threading​ 模块是 Python 标准库中的一个模块,用于创建和管理线程。线程是一种轻量级的进程,可以并发执行多个任务。以下是 threading​ 模块的一些关键概念和功能:

1. 创建线程

创建线程的基本方法是使用 threading.Thread​ 类。你可以通过传递一个函数给 target​ 参数来创建一个线程,该线程将执行该函数。

import threading

def my_function():
    print("Hello from a thread!")

# 创建线程
thread = threading.Thread(target=my_function)

# 启动线程
thread.start()

2. 等待线程结束

可以使用 join​ 方法等待线程完成执行。

thread.join()

3. 守护线程

守护线程在主线程结束时会自动退出。可以通过设置 daemon​ 属性来将线程设置为守护线程。

thread = threading.Thread(target=my_function)
thread.daemon = True
thread.start()

4. 线程同步

在多线程环境中,可能需要同步对共享资源的访问。threading​ 模块提供了多种同步原语,如锁(Lock)、条件变量(Condition)和事件(Event)。

锁(Lock)

锁用于确保某一时刻只有一个线程可以访问共享资源。

lock = threading.Lock()

def thread_safe_function():
    with lock:
        # 访问共享资源
        pass

条件变量(Condition)

条件变量允许线程挂起,直到某个条件被满足。

condition = threading.Condition()

def wait_for_condition():
    with condition:
        condition.wait()  # 等待条件满足
        # 执行任务

def notify_condition():
    with condition:
        condition.notify()  # 通知等待的线程

事件(Event)

事件允许线程等待某个事件的发生。

event = threading.Event()

def wait_for_event():
    event.wait()  # 等待事件被设置
    # 执行任务

def set_event():
    event.set()  # 设置事件

5. 线程局部数据

threading.local​ 类用于创建线程局部数据,即每个线程都有独立的数据副本。

thread_data = threading.local()

def thread_function():
    thread_data.value = 123  # 每个线程都有独立的 `value` 属性

6. 示例

以下是一个综合示例,展示了如何创建线程、使用锁和等待线程结束:

import threading
import time

lock = threading.Lock()

def worker(num):
    with lock:
        print(f"Thread {num} is starting")
        time.sleep(2)
        print(f"Thread {num} is finished")

threads = []

for i in range(5):
    thread = threading.Thread(target=worker, args=(i,))
    threads.append(thread)
    thread.start()

for thread in threads:
    thread.join()

print("All threads are finished")

在这个示例中,我们创建了5个线程,每个线程都会执行 worker​ 函数。我们使用锁来确保线程安全,并使用 join​ 方法等待所有线程完成。

三、线程间通信-共享变量和Queue

在多线程编程中,线程间通信是一个重要的课题。Python 提供了几种方式来实现线程间通信,其中共享变量和 Queue​ 是两种常见的方式。

1. 共享变量

共享变量是指多个线程可以访问和修改同一个变量。这种方法简单直接,但需要注意同步问题,以避免竞争条件(race condition)。通常使用锁(Lock)来保护共享变量。

示例:

import threading

# 共享变量
counter = 0
lock = threading.Lock()

def increment():
    global counter
    for _ in range(100000):
        with lock:
            counter += 1

threads = []
for _ in range(5):
    thread = threading.Thread(target=increment)
    threads.append(thread)
    thread.start()

for thread in threads:
    thread.join()

print(f"Final counter value: {counter}")

在这个示例中,多个线程共享变量 counter​,并使用锁来确保每次只有一个线程能够修改 counter​,从而避免竞争条件。

2. Queue

Queue​ 是一个线程安全的队列,适用于生产者-消费者模型。Python 的 queue​ 模块提供了三种类型的队列:Queue​(FIFO队列)、LifoQueue​(LIFO队列)和 PriorityQueue​(优先级队列)。

示例:

import threading
import queue
import time

# 创建队列
q = queue.Queue()

def producer():
    for i in range(5):
        item = f"item {i}"
        q.put(item)
        print(f"Produced {item}")
        time.sleep(1)

def consumer():
    while True:
        item = q.get()
        if item is None:
            break
        print(f"Consumed {item}")
        q.task_done()

# 创建生产者线程
producer_thread = threading.Thread(target=producer)
producer_thread.start()

# 创建消费者线程
consumer_thread = threading.Thread(target=consumer)
consumer_thread.start()

# 等待生产者线程完成
producer_thread.join()

# 向队列发送停止信号
q.put(None)
#q.task_done()

# 等待消费者线程完成
consumer_thread.join()

print("All items have been consumed")

在这个示例中,生产者线程将项目放入队列,消费者线程从队列中取出项目并处理它们。队列确保了线程安全,因此不需要显式的锁来保护队列操作。

总结

  • 共享变量:简单直接,但需要使用锁来确保线程安全,适用于简单的共享数据场景。
  • Queue:适用于生产者-消费者模型,提供线程安全的队列操作,简化了线程间的数据传递和同步问题。

选择哪种方法取决于具体的应用场景和需求。对于简单的共享数据,使用共享变量和锁可能更合适。而对于复杂的生产者-消费者模型,Queue​ 提供了更方便和安全的解决方案。

四、线程同步

在Python中,Lock​ 和 RLock​ 都是用于线程同步的锁机制,它们都在 threading​ 模块中定义。虽然它们的用途相似,但它们有一些关键的区别:

Lock(互斥锁)

  • 基本功能Lock​ 是一个简单的互斥锁。它有两个状态:锁定和未锁定。
  • 使用方法:一个线程可以调用 acquire()​ 方法来获取锁,如果锁已经被其他线程持有,则该线程会被阻塞,直到锁被释放。锁释放是通过 release()​ 方法完成的。
  • 不可重入:如果一个线程已经持有了锁,再次尝试获取锁会导致死锁,因为同一个线程不能再次获取已经被它持有的锁。

示例代码:

import threading

lock = threading.Lock()

def thread_function():
    lock.acquire()
    try:
        # 进行线程安全的操作
        print("Lock acquired")
    finally:
        lock.release()
        print("Lock released")

thread = threading.Thread(target=thread_function)
thread.start()
thread.join()

RLock(可重入锁)

  • 基本功能RLock​ 是一个可重入锁,允许同一个线程多次获取它而不会导致死锁。
  • 使用方法:一个线程可以多次调用 acquire()​ 方法来获取锁,每次获取锁计数器会增加1。相应地,每次调用 release()​ 方法计数器会减少1,当计数器为0时,锁才真正释放。
  • 可重入:同一个线程可以多次获取锁,这在需要递归调用或在同一线程内多次调用锁定代码块时非常有用。

示例代码:

import threading

rlock = threading.RLock()

def thread_function():
    rlock.acquire()
    try:
        # 进行线程安全的操作
        print("RLock acquired first time")
        rlock.acquire()
        try:
            # 进行更多的线程安全操作
            print("RLock acquired second time")
        finally:
            rlock.release()
            print("RLock released second time")
    finally:
        rlock.release()
        print("RLock released first time")

thread = threading.Thread(target=thread_function)
thread.start()
thread.join()

总结

  • Lock​ 是一个简单的互斥锁,不允许同一个线程多次获取。
  • RLock​ 是一个可重入锁,允许同一个线程多次获取,非常适合需要递归调用或多次进入锁定代码块的场景。

选择使用哪种锁取决于你的具体需求。如果你不需要递归调用或多次进入锁定代码块,Lock​ 就足够了;否则,RLock​ 会是更好的选择。

五、线程池

什么是线程池

线程池(Thread Pool)是一种线程管理模式,它维护着多个线程等待执行任务,而不是为每个任务创建和销毁线程。线程池可以提高程序的性能,特别是在需要处理大量短时间任务的情况下,因为创建和销毁线程是有开销的。

Python中的线程池

在Python中,线程池的实现主要依赖于concurrent.futures​模块,该模块提供了一个高级接口来管理异步执行。concurrent.futures.ThreadPoolExecutor​类是用于实现线程池的主要类。

主要功能和使用方法

以下是一些主要功能和使用方法:

  1. 创建线程池

    from concurrent.futures import ThreadPoolExecutor
    
    # 创建一个包含5个线程的线程池
    with ThreadPoolExecutor(max_workers=5) as executor:
        # 线程池的使用代码
    
  2. 提交任务: 可以使用submit​方法提交单个任务,或者使用map​方法提交多个任务。

    def task(n):
        return n * n
    
    # 提交单个任务
    future = executor.submit(task, 5)
    
    # 提交多个任务
    results = executor.map(task, [1, 2, 3, 4, 5])
    
  3. 获取结果: 可以通过Future​对象获取任务的结果。

    result = future.result()
    print(result)  # 输出25
    
  4. 等待任务完成: 可以使用as_completed​方法等待任务完成。

    from concurrent.futures import as_completed
    
    futures = [executor.submit(task, i) for i in range(5)]
    
    for future in as_completed(futures):
        print(future.result())
    
  5. 异常处理: 线程池中的任务可能会抛出异常,使用Future​对象的exception​方法可以捕获这些异常。

    future = executor.submit(task, -1)  # 假设task函数会对负数抛出异常
    
    try:
        result = future.result()
    except Exception as e:
        print(f"任务抛出异常: {e}")
    

线程池的优点

  • 减少线程创建销毁的开销:线程池可以重用线程,避免频繁的创建和销毁线程。
  • 控制并发数量:通过设置最大线程数,可以控制并发任务的数量,避免过多线程导致资源竞争。
  • 简化代码:使用线程池可以简化多线程编程的代码,使其更加易读和易维护。

线程池的缺点

  • 全局解释器锁(GIL) :在CPython解释器中,GIL限制了多线程的真正并行执行,因此对于CPU密集型任务,线程池的效果可能不如进程池。
  • 资源消耗:尽管线程池减少了线程创建的开销,但线程本身仍然消耗系统资源。

示例代码

下面是一个完整的示例,展示了如何使用ThreadPoolExecutor​进行并发任务处理:

from concurrent.futures import ThreadPoolExecutor, as_completed

def task(n):
    return n * n

with ThreadPoolExecutor(max_workers=5) as executor:
    futures = [executor.submit(task, i) for i in range(10)]
  
    for future in as_completed(futures):
        print(future.result())

六、多进程

在 Python 中,多进程处理(multiprocessing)用于并行执行任务。多进程可以在多核 CPU 上并行运行,从而提高计算密集型任务的执行效率。Python 的 multiprocessing​ 模块提供了创建和管理进程的接口,类似于 threading​ 模块。

多进程的优点

  1. 并行执行:利用多核 CPU 提高性能。
  2. 避免 GIL 限制:Python 的全局解释器锁(GIL)限制了多线程的并行执行,但多进程不受此限制。

多进程的基本概念

  • Process:一个独立的执行单元,拥有自己的内存空间。
  • Queue:用于进程间通信的队列。
  • Pipe:另一种进程间通信的方式。
  • Pool:用于管理进程池,可以并行地执行多个任务。

示例代码

以下是一个简单的示例,展示了如何使用 multiprocessing​ 模块来并行执行任务。

import multiprocessing
import os
import time

def worker(num):
    """进程执行的任务函数"""
    print(f'Worker {num} (PID: {os.getpid()}) is starting...')
    time.sleep(2)
    print(f'Worker {num} (PID: {os.getpid()}) is done.')

if __name__ == '__main__':
    print(f'Main process PID: {os.getpid()}')

    # 创建多个进程
    processes = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i,))
        processes.append(p)
        p.start()

    # 等待所有进程完成
    for p in processes:
        p.join()

    print('All workers are done.')

解释

  1. 导入模块

    import multiprocessing
    import os
    import time
    
  2. 定义任务函数

    def worker(num):
        """进程执行的任务函数"""
        print(f'Worker {num} (PID: {os.getpid()}) is starting...')
        time.sleep(2)
        print(f'Worker {num} (PID: {os.getpid()}) is done.')
    
  3. 主程序

    if __name__ == '__main__':
        print(f'Main process PID: {os.getpid()}')
    
        # 创建多个进程
        processes = []
        for i in range(5):
            p = multiprocessing.Process(target=worker, args=(i,))
            processes.append(p)
            p.start()
    
        # 等待所有进程完成
        for p in processes:
            p.join()
    
        print('All workers are done.')
    

进程池示例

使用 Pool​ 可以更方便地管理多个进程。

import multiprocessing
import os
import time

def worker(num):
    print(f'Worker {num} (PID: {os.getpid()}) is starting...')
    time.sleep(2)
    print(f'Worker {num} (PID: {os.getpid()}) is done.')
    return f'Result of worker {num}'

if __name__ == '__main__':
    print(f'Main process PID: {os.getpid()}')

    # 创建进程池
    with multiprocessing.Pool(processes=3) as pool:
        results = pool.map(worker, range(5))

    print('All workers are done.')
    print('Results:', results)

解释

  1. 导入模块

    import multiprocessing
    import os
    import time
    
  2. 定义任务函数

    def worker(num):
        print(f'Worker {num} (PID: {os.getpid()}) is starting...')
        time.sleep(2)
        print(f'Worker {num} (PID: {os.getpid()}) is done.')
        return f'Result of worker {num}'
    
  3. 主程序

    if __name__ == '__main__':
        print(f'Main process PID: {os.getpid()}')
    
        # 创建进程池
        with multiprocessing.Pool(processes=3) as pool:
            results = pool.map(worker, range(5))
    
        print('All workers are done.')
        print('Results:', results)
    

在这个示例中,Pool​ 对象管理了一个包含 3 个进程的池,map​ 方法将任务分配给进程池中的进程并收集结果。

通过这些示例,你可以了解如何在 Python 中使用多进程来提高程序的并行处理能力。

在 Python 的 multiprocessing​ 模块中,进程间通信(IPC,Inter-Process Communication)是一个重要的功能。主要有三种方式:Queue​、Pipe​ 和 Manager​。

进程间通信

1. Queue

Queue​ 是一个线程和进程安全的 FIFO 队列,可以用于进程间通信。它使用简单,类似于线程模块中的 queue.Queue​。

import multiprocessing
import time

def producer(queue):
    for i in range(5):
        item = f'item {i}'
        print(f'Producing {item}')
        queue.put(item)
        time.sleep(1)

def consumer(queue):
    while True:
        item = queue.get()
        if item is None:
            break
        print(f'Consuming {item}')
        time.sleep(2)

if __name__ == '__main__':
    queue = multiprocessing.Queue()
    p1 = multiprocessing.Process(target=producer, args=(queue,))
    p2 = multiprocessing.Process(target=consumer, args=(queue,))

    p1.start()
    p2.start()

    p1.join()
    queue.put(None)  # 终止消费者进程
    p2.join()

2. Pipe

Pipe​ 提供了一个双向通信的通道,返回两个连接对象,分别表示管道的两端。数据可以从一端发送,从另一端接收。

import multiprocessing

def sender(conn):
    for i in range(5):
        msg = f'Message {i}'
        print(f'Sending: {msg}')
        conn.send(msg)
    conn.close()

def receiver(conn):
    while True:
        msg = conn.recv()
        if msg is None:
            break
        print(f'Received: {msg}')

if __name__ == '__main__':
    parent_conn, child_conn = multiprocessing.Pipe()
    p1 = multiprocessing.Process(target=sender, args=(parent_conn,))
    p2 = multiprocessing.Process(target=receiver, args=(child_conn,))

    p1.start()
    p2.start()

    p1.join()
    parent_conn.send(None)  # 终止接收者进程
    p2.join()

3. Manager

Manager​ 提供了一个共享的对象,可以在多个进程之间共享数据。它支持的对象包括 list​、dict​、Namespace​、Lock​、RLock​、Semaphore​、BoundedSemaphore​、Condition​、Event​、Barrier​、Queue​、Value​ 和 Array​。

import multiprocessing

def worker(shared_list, lock):
    with lock:
        shared_list.append(multiprocessing.current_process().name)

if __name__ == '__main__':
    manager = multiprocessing.Manager()
    shared_list = manager.list()
    lock = manager.Lock()
    processes = [multiprocessing.Process(target=worker, args=(shared_list, lock)) for _ in range(5)]

    for p in processes:
        p.start()

    for p in processes:
        p.join()

    print(shared_list)

总结

  • Queue:适合需要先进先出(FIFO)顺序处理的任务。
  • Pipe:适合需要双向通信的任务,通常用于简单的通信。
  • Manager:适合需要共享复杂数据结构(如列表、字典等)的任务。

通过这些工具,可以有效地在多进程环境中进行数据共享和通信,从而实现复杂的并行计算任务。

0

评论区