侧边栏壁纸
博主头像
顾小诺 博主等级

行动起来,活在当下

  • 累计撰写 30 篇文章
  • 累计创建 14 个标签
  • 累计收到 2 条评论

目 录CONTENT

文章目录

十三、协程和异步io

顾小诺
2024-07-03 / 0 评论 / 0 点赞 / 49 阅读 / 0 字

十三、协程和异步io

一、并发、并行、同步、异步、阻塞和非阻塞

在计算机科学中,特别是在编程和操作系统的上下文中,以下几个概念非常重要:并发、并行、同步、异步、阻塞和非阻塞。理解这些概念有助于更好地设计和优化程序。以下是这些概念的详细介绍:

并发(Concurrency)

并发是指在同一时间段内处理多个任务。并发并不意味着同时执行,而是任务在时间片上交替进行。例如,在单核 CPU 上,通过快速切换任务来实现并发。Python 中的多线程和异步编程都属于并发编程。

并行(Parallelism)

并行是指在同一时刻同时执行多个任务。并行需要多核 CPU 或多处理器系统支持。多进程编程通常用于实现并行,因为每个进程可以在不同的 CPU 核心上运行。

同步(Synchronous)

同步是指任务按顺序执行,一个任务必须等待前一个任务完成后才能开始。同步编程通常会导致阻塞,因为一个任务在等待资源或数据时会阻塞其他任务的执行。

异步(Asynchronous)

异步是指任务可以独立于其他任务执行,不必等待其他任务完成。异步编程允许任务在等待资源或数据时继续执行其他任务,从而提高效率和响应速度。Python 中的 asyncio​ 模块支持异步编程。

阻塞(Blocking)

阻塞是指一个任务在等待某个条件满足(如 I/O 操作完成)时,会停止执行,直到条件满足。阻塞会导致资源浪费,因为 CPU 在等待期间无法执行其他任务。

非阻塞(Non-blocking)

非阻塞是指一个任务在等待某个条件满足时,不会停止执行,而是立即返回并执行其他任务。非阻塞通常与异步编程结合使用,以提高程序的响应性和效率。

示例

以下是一些示例代码,帮助理解这些概念。

并发和并行

import threading
import multiprocessing

def task():
    print("Task executed")

# 并发(多线程)
threads = [threading.Thread(target=task) for _ in range(5)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

# 并行(多进程)
processes = [multiprocessing.Process(target=task) for _ in range(5)]
for process in processes:
    process.start()
for process in processes:
    process.join()

同步和异步

import time
import asyncio

# 同步
def sync_task():
    time.sleep(1)
    print("Synchronous task completed")

for _ in range(5):
    sync_task()

# 异步
async def async_task():
    await asyncio.sleep(1)
    print("Asynchronous task completed")

async def main():
    tasks = [async_task() for _ in range(5)]
    await asyncio.gather(*tasks)

asyncio.run(main())

阻塞和非阻塞

import socket

# 阻塞 I/O
def blocking_io():
    sock = socket.socket()
    sock.connect(('example.com', 80))
    data = sock.recv(1024)
    print("Blocking I/O completed")

# 非阻塞 I/O
def non_blocking_io():
    sock = socket.socket()
    sock.setblocking(False)
    try:
        sock.connect(('example.com', 80))
    except BlockingIOError:
        pass  # 连接正在进行中
    while True:
        try:
            data = sock.recv(1024)
            break
        except BlockingIOError:
            continue  # 继续等待数据
    print("Non-blocking I/O completed")

blocking_io()
non_blocking_io()

这些示例展示了如何在 Python 中实现并发、并行、同步、异步、阻塞和非阻塞操作。理解这些概念有助于编写高效的程序,特别是在处理 I/O 密集型任务时。

二、IO多路复用

IO多路复用(I/O Multiplexing)是指在一个线程内同时监视多个文件描述符(file descriptors),一旦某个文件描述符就绪(有数据可读、可写或发生错误),便通知程序进行相应的读写操作。它主要用于网络编程中处理大量并发连接。常见的IO多路复用机制包括 select​、poll​ 和 epoll​。下面是对这三种机制的详细解释:

1. select

特点

  • select​ 是最早的 IO 多路复用机制,几乎在所有的操作系统上都可以使用。
  • 它通过一个固定大小的数组来存储文件描述符。

使用方法

  • select​ 需要传入三个文件描述符集合(读、写、异常)以及一个超时时间。
  • 当调用 select​ 时,内核会检查这些文件描述符的状态,如果有文件描述符就绪,则返回。

缺点

  • 文件描述符数量有限制(通常是 1024)。
  • 每次调用 select​ 都需要将文件描述符集合从用户态拷贝到内核态,开销较大。
  • 当文件描述符数量很多时,效率会下降,因为每次都需要遍历整个集合。

2. poll

特点

  • poll​ 是 select​ 的改进版本,去掉了文件描述符数量的限制。
  • 它使用一个链表来存储文件描述符,因此可以处理更多的文件描述符。

使用方法

  • poll​ 需要传入一个文件描述符数组及其事件类型。
  • 内核会检查这些文件描述符的状态,并返回一个就绪的文件描述符列表。

缺点

  • select​ 类似,每次调用 poll​ 都需要将文件描述符集合从用户态拷贝到内核态。
  • 当文件描述符数量很多时,效率也会下降,因为每次都需要遍历整个数组。

3. epoll

特点

  • epoll​ 是 Linux 特有的 IO 多路复用机制,是对 select​ 和 poll​ 的进一步优化。
  • 它使用一个事件驱动的机制,效率更高,适用于大量并发连接的场景。

使用方法

  • epoll​ 使用三个系统调用:epoll_create​、epoll_ctl​ 和 epoll_wait​。

    • epoll_create​ 创建一个 epoll 实例。
    • epoll_ctl​ 添加、修改或删除文件描述符。
    • epoll_wait​ 等待事件发生。
  • epoll​ 使用一个红黑树来管理文件描述符,只有文件描述符状态变化时才会通知程序。

优点

  • 没有文件描述符数量的限制。
  • 不需要每次都遍历整个文件描述符集合,只处理就绪的文件描述符,效率更高。
  • 内核和用户态之间的拷贝开销较小。

缺点

  • 仅在 Linux 系统上可用。

总结

  • select​ 和 poll​ 适用于少量文件描述符的场景,但当文件描述符数量较多时,性能会显著下降。
  • epoll​ 更适用于大量并发连接的场景,具有更高的效率和更好的扩展性。

三、协程

在 Python 中,协程(Coroutine)是一种比线程更加轻量级的并发编程方式。协程允许你在一个函数执行过程中暂停,并在稍后继续执行,而不需要像线程那样进行上下文切换。协程通过 async​ 和 await​ 关键字来实现。

协程的特点

  1. 轻量级:协程比线程更轻量,因为它们在单一线程中运行,不需要操作系统级别的线程管理。
  2. 非阻塞:协程可以在等待 I/O 操作时暂停执行,并在 I/O 操作完成后恢复执行,从而提高程序的并发性能。
  3. 可暂停和恢复:协程可以在特定位置暂停执行,并在稍后从暂停的位置继续执行。

基本用法

以下是一个简单的 Python 协程示例:

import asyncio

# 定义一个协程函数
async def say_hello():
    print("Hello")
    await asyncio.sleep(1)  # 模拟异步操作
    print("World")

# 主函数
async def main():
    await say_hello()

# 运行主函数
asyncio.run(main())

解释

  1. async def say_hello()​:定义一个协程函数,使用 async​ 关键字。
  2. await asyncio.sleep(1)​:暂停协程的运行,模拟一个耗时的异步操作(比如 I/O 操作)。await​ 关键字用于等待一个异步操作完成。
  3. asyncio.run(main())​:运行主协程函数 main​,这是启动协程的标准方式。

更复杂的例子:并发执行多个协程

下面是一个更复杂的例子,展示了如何并发执行多个协程:

import asyncio

async def fetch_data(id):
    print(f"Fetching data for {id}")
    await asyncio.sleep(2)  # 模拟一个耗时操作
    print(f"Data fetched for {id}")

async def main():
    tasks = []
    for i in range(5):
        tasks.append(fetch_data(i))
  
    # 并发运行所有任务
    await asyncio.gather(*tasks)

# 运行主函数
asyncio.run(main())

解释

  1. fetch_data​:一个模拟数据获取的协程函数。
  2. asyncio.gather(*tasks)​:并发运行多个协程。tasks​ 是一个包含所有协程任务的列表,asyncio.gather​ 会并发地运行这些任务。
  3. asyncio.run(main())​:启动主协程 main​。

协程的优势

  • 高效的 I/O 操作:协程在等待 I/O 操作时不会阻塞线程,从而可以有效地处理大量并发 I/O 操作。
  • 更好的资源利用:协程比线程更轻量,不需要操作系统级别的线程管理,因此可以创建更多的并发任务而不会消耗大量系统资源。

四、生成器进阶(send、close、throws)

生成器(Generator)是Python中一种特殊的迭代器,通过使用 yield​ 关键字来生成一个序列的值。生成器不仅可以用于迭代,还提供了一些高级方法,如 send()​、close()​ 和 throw()​,使其更加灵活和强大。下面是对这些方法的详细介绍:

1. yield

yield​ 是生成器的核心,用于在生成器函数中返回值,并在函数的执行点暂停。每次调用生成器的 __next__()​ 方法时,生成器会从上次暂停的地方继续执行,直到遇到下一个 yield​。

def simple_generator():
    yield 1
    yield 2
    yield 3

gen = simple_generator()
print(next(gen))  # 输出 1
print(next(gen))  # 输出 2
print(next(gen))  # 输出 3

2. send(value)

send()​ 方法用于向生成器发送一个值,并在生成器暂停的地方恢复执行。发送的值会成为生成器函数中 yield​ 表达式的返回值。第一次调用生成器时,必须使用 __next__()​ 或 send(None)​ 来启动生成器。

def generator_with_send():
    print("Start")
    value = yield
    print(f"Received: {value}")
    yield value

gen = generator_with_send()
next(gen)            # 启动生成器(或使用 gen.send(None))
gen.send("Hello")    # 输出 "Received: Hello"

3. close()

close()​ 方法用于终止生成器的执行,并引发 GeneratorExit​ 异常。如果生成器在捕获到 GeneratorExit​ 异常时没有其他处理,会正常退出。

def generator_with_close():
    try:
        yield 1
        yield 2
    except GeneratorExit:
        print("Generator closed")

gen = generator_with_close()
print(next(gen))  # 输出 1
gen.close()       # 输出 "Generator closed"

4. throw(type, value=None, traceback=None)

throw()​ 方法用于在生成器中引发一个指定的异常,并在生成器暂停的地方恢复执行。生成器可以选择捕获该异常并处理,或让异常传播。

def generator_with_throw():
    try:
        yield 1
    except ValueError:
        print("ValueError caught")
    yield 2

gen = generator_with_throw()
print(next(gen))  # 输出 1
gen.throw(ValueError)  # 输出 "ValueError caught"
print(next(gen))  # 输出 2

5. 综合示例

下面是一个综合示例,展示了 yield​、send()​、close()​ 和 throw()​ 的使用:

def advanced_generator():
    print("Generator started")
    try:
        while True:
            try:
                value = yield
                print(f"Received: {value}")
            except ValueError:
                print("ValueError caught inside generator")
    except GeneratorExit:
        print("Generator closed")

gen = advanced_generator()
next(gen)                # 启动生成器
gen.send("Hello")        # 输出 "Received: Hello"
gen.throw(ValueError)    # 输出 "ValueError caught inside generator"
gen.close()              # 输出 "Generator closed"

通过这些方法,生成器在Python中不仅可以用于简单的迭代,还可以实现复杂的协程和异步编程模式。

五、async和await

在Python中,async​ 和 await​ 是用于定义和处理异步操作的关键字。它们是Python异步编程的核心,可以帮助你编写高效的I/O操作和并发程序。下面是对这两个关键字的详细介绍:

1. async​ 关键字

async​ 关键字用于定义一个异步函数。异步函数在被调用时返回一个协程对象,而不是立即执行函数体。协程对象可以使用 await​ 关键字来挂起其执行,直到等待的操作完成。

import asyncio

async def async_function():
    print("Hello")
    await asyncio.sleep(1)  # 模拟异步I/O操作
    print("World")

# 调用异步函数
coroutine = async_function()

2. await​ 关键字

await​ 关键字用于暂停异步函数的执行,等待一个异步操作完成。它只能在 async​ 定义的函数中使用。await​ 后面通常跟随一个协程对象、任务或Future对象。

async def another_async_function():
    print("Start")
    await asyncio.sleep(2)  # 等待2秒
    print("End")

# 运行异步函数
asyncio.run(another_async_function())

3. 异步编程示例

下面是一个简单的示例,展示了如何使用 async​ 和 await​ 来执行异步操作:

import asyncio

async def fetch_data():
    print("Fetching data...")
    await asyncio.sleep(2)  # 模拟网络请求
    return "Data received"

async def main():
    print("Start main")
    data = await fetch_data()  # 等待 fetch_data 完成
    print(data)
    print("End main")

# 运行主异步函数
asyncio.run(main())

4. 并发执行

你可以使用 asyncio.gather()​ 来并发执行多个异步操作:

async def task1():
    await asyncio.sleep(1)
    print("Task 1 completed")

async def task2():
    await asyncio.sleep(2)
    print("Task 2 completed")

async def main():
    await asyncio.gather(task1(), task2())  # 并发执行 task1 和 task2

asyncio.run(main())

5. 异步上下文管理器和迭代器

你还可以使用 async​ 和 await​ 来定义异步上下文管理器和异步迭代器:

异步上下文管理器

class AsyncContextManager:
    async def __aenter__(self):
        print("Enter context")
        return self

    async def __aexit__(self, exc_type, exc, tb):
        print("Exit context")

async def main():
    async with AsyncContextManager():
        print("Inside context")

asyncio.run(main())

异步迭代器

class AsyncIterator:
    def __init__(self):
        self.counter = 0

    async def __anext__(self):
        if self.counter < 3:
            self.counter += 1
            await asyncio.sleep(1)  # 模拟异步操作
            return self.counter
        else:
            raise StopAsyncIteration

    def __aiter__(self):
        return self

async def main():
    async for value in AsyncIterator():
        print(value)

asyncio.run(main())

总结

  • async​ 用于定义异步函数,返回一个协程对象。
  • await​ 用于暂停异步函数的执行,等待一个异步操作完成。
  • 使用 asyncio.run()​ 来运行异步函数。
  • 使用 asyncio.gather()​ 并发执行多个异步操作。
  • 可以定义异步上下文管理器和异步迭代器来处理更复杂的异步操作。

通过掌握 async​ 和 await​,你可以编写高效的异步代码,以便更好地处理I/O密集型任务和并发操作。

六、生成器实现协程

在Python中,生成器可以用来实现协程。协程是一种更高级的控制流结构,它允许你在函数执行过程中暂停和恢复。生成器通过 yield​ 关键字实现了这种行为,但要实现协程的更多功能,还需要使用生成器的 send()​、throw()​ 和 close()​ 方法。

基本概念

  • 生成器函数:使用 yield​ 关键字的函数,调用时返回一个生成器对象。
  • 生成器对象:支持迭代的对象,可以暂停和恢复其执行。

实现协程的步骤

  1. 定义生成器函数
    使用 yield​ 关键字定义生成器函数,使其能够暂停和恢复执行。
  2. 使用 send()方法
    通过 send()​ 方法向生成器发送值,并在生成器内部处理这些值。
  3. 使用 throw()方法
    通过 throw()​ 方法向生成器抛出异常,在生成器内部捕获并处理这些异常。
  4. 使用 close()方法
    通过 close()​ 方法关闭生成器,生成器会捕获 GeneratorExit​ 异常并执行清理操作。

示例代码

下面是一个示例,展示了如何使用生成器实现协程:

def coroutine():
    print("Coroutine started")
    try:
        while True:
            try:
                value = yield
                print(f"Received: {value}")
            except ValueError:
                print("ValueError caught inside coroutine")
    except GeneratorExit:
        print("Coroutine closed")

# 创建生成器对象
coro = coroutine()

# 启动生成器(或使用 coro.send(None))
next(coro)

# 发送值到生成器
coro.send(10)  # 输出 "Received: 10"
coro.send(20)  # 输出 "Received: 20"

# 向生成器抛出异常
coro.throw(ValueError)  # 输出 "ValueError caught inside coroutine"

# 关闭生成器
coro.close()  # 输出 "Coroutine closed"

详细解释

  1. 定义生成器函数

    def coroutine():
        print("Coroutine started")
        try:
            while True:
                try:
                    value = yield
                    print(f"Received: {value}")
                except ValueError:
                    print("ValueError caught inside coroutine")
        except GeneratorExit:
            print("Coroutine closed")
    
  2. 创建生成器对象

    coro = coroutine()
    
  3. 启动生成器

    next(coro)
    

    coro.send(None)
    
  4. 发送值到生成器

    coro.send(10)  # 输出 "Received: 10"
    coro.send(20)  # 输出 "Received: 20"
    
  5. 向生成器抛出异常

    coro.throw(ValueError)  # 输出 "ValueError caught inside coroutine"
    
  6. 关闭生成器

    coro.close()  # 输出 "Coroutine closed"
    

实际应用

生成器实现的协程在处理异步I/O操作、并发任务调度等方面非常有用。下面是一个实际应用示例,展示了如何使用生成器实现简单的任务调度器:

import time

def task1():
    while True:
        print("Task 1 running")
        yield
        time.sleep(1)

def task2():
    while True:
        print("Task 2 running")
        yield
        time.sleep(1)

def scheduler(tasks):
    while tasks:
        task = tasks.pop(0)
        try:
            next(task)
            tasks.append(task)
        except StopIteration:
            pass

# 创建任务生成器
tasks = [task1(), task2()]

# 启动任务调度器
scheduler(tasks)

在这个示例中,task1​ 和 task2​ 是两个简单的任务生成器,scheduler​ 函数负责循环调度这些任务,实现了简单的协程调度器。

通过这些示例,你可以看到生成器在实现协程方面的强大功能,它们可以帮助你编写高效的并发代码,从而更好地处理I/O密集型任务和其他需要并发的场景。

0

评论区