十三、协程和异步io
一、并发、并行、同步、异步、阻塞和非阻塞
在计算机科学中,特别是在编程和操作系统的上下文中,以下几个概念非常重要:并发、并行、同步、异步、阻塞和非阻塞。理解这些概念有助于更好地设计和优化程序。以下是这些概念的详细介绍:
并发(Concurrency)
并发是指在同一时间段内处理多个任务。并发并不意味着同时执行,而是任务在时间片上交替进行。例如,在单核 CPU 上,通过快速切换任务来实现并发。Python 中的多线程和异步编程都属于并发编程。
并行(Parallelism)
并行是指在同一时刻同时执行多个任务。并行需要多核 CPU 或多处理器系统支持。多进程编程通常用于实现并行,因为每个进程可以在不同的 CPU 核心上运行。
同步(Synchronous)
同步是指任务按顺序执行,一个任务必须等待前一个任务完成后才能开始。同步编程通常会导致阻塞,因为一个任务在等待资源或数据时会阻塞其他任务的执行。
异步(Asynchronous)
异步是指任务可以独立于其他任务执行,不必等待其他任务完成。异步编程允许任务在等待资源或数据时继续执行其他任务,从而提高效率和响应速度。Python 中的 asyncio 模块支持异步编程。
阻塞(Blocking)
阻塞是指一个任务在等待某个条件满足(如 I/O 操作完成)时,会停止执行,直到条件满足。阻塞会导致资源浪费,因为 CPU 在等待期间无法执行其他任务。
非阻塞(Non-blocking)
非阻塞是指一个任务在等待某个条件满足时,不会停止执行,而是立即返回并执行其他任务。非阻塞通常与异步编程结合使用,以提高程序的响应性和效率。
示例
以下是一些示例代码,帮助理解这些概念。
并发和并行
import threading
import multiprocessing
def task():
print("Task executed")
# 并发(多线程)
threads = [threading.Thread(target=task) for _ in range(5)]
for thread in threads:
thread.start()
for thread in threads:
thread.join()
# 并行(多进程)
processes = [multiprocessing.Process(target=task) for _ in range(5)]
for process in processes:
process.start()
for process in processes:
process.join()
同步和异步
import time
import asyncio
# 同步
def sync_task():
time.sleep(1)
print("Synchronous task completed")
for _ in range(5):
sync_task()
# 异步
async def async_task():
await asyncio.sleep(1)
print("Asynchronous task completed")
async def main():
tasks = [async_task() for _ in range(5)]
await asyncio.gather(*tasks)
asyncio.run(main())
阻塞和非阻塞
import socket
# 阻塞 I/O
def blocking_io():
sock = socket.socket()
sock.connect(('example.com', 80))
data = sock.recv(1024)
print("Blocking I/O completed")
# 非阻塞 I/O
def non_blocking_io():
sock = socket.socket()
sock.setblocking(False)
try:
sock.connect(('example.com', 80))
except BlockingIOError:
pass # 连接正在进行中
while True:
try:
data = sock.recv(1024)
break
except BlockingIOError:
continue # 继续等待数据
print("Non-blocking I/O completed")
blocking_io()
non_blocking_io()
这些示例展示了如何在 Python 中实现并发、并行、同步、异步、阻塞和非阻塞操作。理解这些概念有助于编写高效的程序,特别是在处理 I/O 密集型任务时。
二、IO多路复用
IO多路复用(I/O Multiplexing)是指在一个线程内同时监视多个文件描述符(file descriptors),一旦某个文件描述符就绪(有数据可读、可写或发生错误),便通知程序进行相应的读写操作。它主要用于网络编程中处理大量并发连接。常见的IO多路复用机制包括 select、poll 和 epoll。下面是对这三种机制的详细解释:
1. select
特点
-
select 是最早的 IO 多路复用机制,几乎在所有的操作系统上都可以使用。 - 它通过一个固定大小的数组来存储文件描述符。
使用方法
-
select 需要传入三个文件描述符集合(读、写、异常)以及一个超时时间。 - 当调用
select 时,内核会检查这些文件描述符的状态,如果有文件描述符就绪,则返回。
缺点
- 文件描述符数量有限制(通常是 1024)。
- 每次调用
select 都需要将文件描述符集合从用户态拷贝到内核态,开销较大。 - 当文件描述符数量很多时,效率会下降,因为每次都需要遍历整个集合。
2. poll
特点
-
poll 是select 的改进版本,去掉了文件描述符数量的限制。 - 它使用一个链表来存储文件描述符,因此可以处理更多的文件描述符。
使用方法
-
poll 需要传入一个文件描述符数组及其事件类型。 - 内核会检查这些文件描述符的状态,并返回一个就绪的文件描述符列表。
缺点
- 与
select 类似,每次调用poll 都需要将文件描述符集合从用户态拷贝到内核态。 - 当文件描述符数量很多时,效率也会下降,因为每次都需要遍历整个数组。
3. epoll
特点
-
epoll 是 Linux 特有的 IO 多路复用机制,是对select 和poll 的进一步优化。 - 它使用一个事件驱动的机制,效率更高,适用于大量并发连接的场景。
使用方法
-
epoll 使用三个系统调用:epoll_create、epoll_ctl 和epoll_wait。-
epoll_create 创建一个 epoll 实例。 -
epoll_ctl 添加、修改或删除文件描述符。 -
epoll_wait 等待事件发生。
-
-
epoll 使用一个红黑树来管理文件描述符,只有文件描述符状态变化时才会通知程序。
优点
- 没有文件描述符数量的限制。
- 不需要每次都遍历整个文件描述符集合,只处理就绪的文件描述符,效率更高。
- 内核和用户态之间的拷贝开销较小。
缺点
- 仅在 Linux 系统上可用。
总结
-
select 和poll 适用于少量文件描述符的场景,但当文件描述符数量较多时,性能会显著下降。 -
epoll 更适用于大量并发连接的场景,具有更高的效率和更好的扩展性。
三、协程
在 Python 中,协程(Coroutine)是一种比线程更加轻量级的并发编程方式。协程允许你在一个函数执行过程中暂停,并在稍后继续执行,而不需要像线程那样进行上下文切换。协程通过 async 和 await 关键字来实现。
协程的特点
- 轻量级:协程比线程更轻量,因为它们在单一线程中运行,不需要操作系统级别的线程管理。
- 非阻塞:协程可以在等待 I/O 操作时暂停执行,并在 I/O 操作完成后恢复执行,从而提高程序的并发性能。
- 可暂停和恢复:协程可以在特定位置暂停执行,并在稍后从暂停的位置继续执行。
基本用法
以下是一个简单的 Python 协程示例:
import asyncio
# 定义一个协程函数
async def say_hello():
print("Hello")
await asyncio.sleep(1) # 模拟异步操作
print("World")
# 主函数
async def main():
await say_hello()
# 运行主函数
asyncio.run(main())
解释
-
async def say_hello():定义一个协程函数,使用async 关键字。 -
await asyncio.sleep(1):暂停协程的运行,模拟一个耗时的异步操作(比如 I/O 操作)。await 关键字用于等待一个异步操作完成。 -
asyncio.run(main()):运行主协程函数main,这是启动协程的标准方式。
更复杂的例子:并发执行多个协程
下面是一个更复杂的例子,展示了如何并发执行多个协程:
import asyncio
async def fetch_data(id):
print(f"Fetching data for {id}")
await asyncio.sleep(2) # 模拟一个耗时操作
print(f"Data fetched for {id}")
async def main():
tasks = []
for i in range(5):
tasks.append(fetch_data(i))
# 并发运行所有任务
await asyncio.gather(*tasks)
# 运行主函数
asyncio.run(main())
解释
-
fetch_data:一个模拟数据获取的协程函数。 -
asyncio.gather(*tasks):并发运行多个协程。tasks 是一个包含所有协程任务的列表,asyncio.gather 会并发地运行这些任务。 -
asyncio.run(main()):启动主协程main。
协程的优势
- 高效的 I/O 操作:协程在等待 I/O 操作时不会阻塞线程,从而可以有效地处理大量并发 I/O 操作。
- 更好的资源利用:协程比线程更轻量,不需要操作系统级别的线程管理,因此可以创建更多的并发任务而不会消耗大量系统资源。
四、生成器进阶(send、close、throws)
生成器(Generator)是Python中一种特殊的迭代器,通过使用 yield 关键字来生成一个序列的值。生成器不仅可以用于迭代,还提供了一些高级方法,如 send()、close() 和 throw(),使其更加灵活和强大。下面是对这些方法的详细介绍:
1. yield
yield 是生成器的核心,用于在生成器函数中返回值,并在函数的执行点暂停。每次调用生成器的 __next__() 方法时,生成器会从上次暂停的地方继续执行,直到遇到下一个 yield。
def simple_generator():
yield 1
yield 2
yield 3
gen = simple_generator()
print(next(gen)) # 输出 1
print(next(gen)) # 输出 2
print(next(gen)) # 输出 3
2. send(value)
send() 方法用于向生成器发送一个值,并在生成器暂停的地方恢复执行。发送的值会成为生成器函数中 yield 表达式的返回值。第一次调用生成器时,必须使用 __next__() 或 send(None) 来启动生成器。
def generator_with_send():
print("Start")
value = yield
print(f"Received: {value}")
yield value
gen = generator_with_send()
next(gen) # 启动生成器(或使用 gen.send(None))
gen.send("Hello") # 输出 "Received: Hello"
3. close()
close() 方法用于终止生成器的执行,并引发 GeneratorExit 异常。如果生成器在捕获到 GeneratorExit 异常时没有其他处理,会正常退出。
def generator_with_close():
try:
yield 1
yield 2
except GeneratorExit:
print("Generator closed")
gen = generator_with_close()
print(next(gen)) # 输出 1
gen.close() # 输出 "Generator closed"
4. throw(type, value=None, traceback=None)
throw() 方法用于在生成器中引发一个指定的异常,并在生成器暂停的地方恢复执行。生成器可以选择捕获该异常并处理,或让异常传播。
def generator_with_throw():
try:
yield 1
except ValueError:
print("ValueError caught")
yield 2
gen = generator_with_throw()
print(next(gen)) # 输出 1
gen.throw(ValueError) # 输出 "ValueError caught"
print(next(gen)) # 输出 2
5. 综合示例
下面是一个综合示例,展示了 yield、send()、close() 和 throw() 的使用:
def advanced_generator():
print("Generator started")
try:
while True:
try:
value = yield
print(f"Received: {value}")
except ValueError:
print("ValueError caught inside generator")
except GeneratorExit:
print("Generator closed")
gen = advanced_generator()
next(gen) # 启动生成器
gen.send("Hello") # 输出 "Received: Hello"
gen.throw(ValueError) # 输出 "ValueError caught inside generator"
gen.close() # 输出 "Generator closed"
通过这些方法,生成器在Python中不仅可以用于简单的迭代,还可以实现复杂的协程和异步编程模式。
五、async和await
在Python中,async 和 await 是用于定义和处理异步操作的关键字。它们是Python异步编程的核心,可以帮助你编写高效的I/O操作和并发程序。下面是对这两个关键字的详细介绍:
1. async 关键字
async 关键字用于定义一个异步函数。异步函数在被调用时返回一个协程对象,而不是立即执行函数体。协程对象可以使用 await 关键字来挂起其执行,直到等待的操作完成。
import asyncio
async def async_function():
print("Hello")
await asyncio.sleep(1) # 模拟异步I/O操作
print("World")
# 调用异步函数
coroutine = async_function()
2. await 关键字
await 关键字用于暂停异步函数的执行,等待一个异步操作完成。它只能在 async 定义的函数中使用。await 后面通常跟随一个协程对象、任务或Future对象。
async def another_async_function():
print("Start")
await asyncio.sleep(2) # 等待2秒
print("End")
# 运行异步函数
asyncio.run(another_async_function())
3. 异步编程示例
下面是一个简单的示例,展示了如何使用 async 和 await 来执行异步操作:
import asyncio
async def fetch_data():
print("Fetching data...")
await asyncio.sleep(2) # 模拟网络请求
return "Data received"
async def main():
print("Start main")
data = await fetch_data() # 等待 fetch_data 完成
print(data)
print("End main")
# 运行主异步函数
asyncio.run(main())
4. 并发执行
你可以使用 asyncio.gather() 来并发执行多个异步操作:
async def task1():
await asyncio.sleep(1)
print("Task 1 completed")
async def task2():
await asyncio.sleep(2)
print("Task 2 completed")
async def main():
await asyncio.gather(task1(), task2()) # 并发执行 task1 和 task2
asyncio.run(main())
5. 异步上下文管理器和迭代器
你还可以使用 async 和 await 来定义异步上下文管理器和异步迭代器:
异步上下文管理器
class AsyncContextManager:
async def __aenter__(self):
print("Enter context")
return self
async def __aexit__(self, exc_type, exc, tb):
print("Exit context")
async def main():
async with AsyncContextManager():
print("Inside context")
asyncio.run(main())
异步迭代器
class AsyncIterator:
def __init__(self):
self.counter = 0
async def __anext__(self):
if self.counter < 3:
self.counter += 1
await asyncio.sleep(1) # 模拟异步操作
return self.counter
else:
raise StopAsyncIteration
def __aiter__(self):
return self
async def main():
async for value in AsyncIterator():
print(value)
asyncio.run(main())
总结
-
async 用于定义异步函数,返回一个协程对象。 -
await 用于暂停异步函数的执行,等待一个异步操作完成。 - 使用
asyncio.run() 来运行异步函数。 - 使用
asyncio.gather() 并发执行多个异步操作。 - 可以定义异步上下文管理器和异步迭代器来处理更复杂的异步操作。
通过掌握 async 和 await,你可以编写高效的异步代码,以便更好地处理I/O密集型任务和并发操作。
六、生成器实现协程
在Python中,生成器可以用来实现协程。协程是一种更高级的控制流结构,它允许你在函数执行过程中暂停和恢复。生成器通过 yield 关键字实现了这种行为,但要实现协程的更多功能,还需要使用生成器的 send()、throw() 和 close() 方法。
基本概念
- 生成器函数:使用
yield 关键字的函数,调用时返回一个生成器对象。 - 生成器对象:支持迭代的对象,可以暂停和恢复其执行。
实现协程的步骤
- 定义生成器函数:
使用yield 关键字定义生成器函数,使其能够暂停和恢复执行。 - 使用
send() 方法:
通过send() 方法向生成器发送值,并在生成器内部处理这些值。 - 使用
throw() 方法:
通过throw() 方法向生成器抛出异常,在生成器内部捕获并处理这些异常。 - 使用
close() 方法:
通过close() 方法关闭生成器,生成器会捕获GeneratorExit 异常并执行清理操作。
示例代码
下面是一个示例,展示了如何使用生成器实现协程:
def coroutine():
print("Coroutine started")
try:
while True:
try:
value = yield
print(f"Received: {value}")
except ValueError:
print("ValueError caught inside coroutine")
except GeneratorExit:
print("Coroutine closed")
# 创建生成器对象
coro = coroutine()
# 启动生成器(或使用 coro.send(None))
next(coro)
# 发送值到生成器
coro.send(10) # 输出 "Received: 10"
coro.send(20) # 输出 "Received: 20"
# 向生成器抛出异常
coro.throw(ValueError) # 输出 "ValueError caught inside coroutine"
# 关闭生成器
coro.close() # 输出 "Coroutine closed"
详细解释
-
定义生成器函数:
def coroutine(): print("Coroutine started") try: while True: try: value = yield print(f"Received: {value}") except ValueError: print("ValueError caught inside coroutine") except GeneratorExit: print("Coroutine closed") -
创建生成器对象:
coro = coroutine() -
启动生成器:
next(coro)或
coro.send(None) -
发送值到生成器:
coro.send(10) # 输出 "Received: 10" coro.send(20) # 输出 "Received: 20" -
向生成器抛出异常:
coro.throw(ValueError) # 输出 "ValueError caught inside coroutine" -
关闭生成器:
coro.close() # 输出 "Coroutine closed"
实际应用
生成器实现的协程在处理异步I/O操作、并发任务调度等方面非常有用。下面是一个实际应用示例,展示了如何使用生成器实现简单的任务调度器:
import time
def task1():
while True:
print("Task 1 running")
yield
time.sleep(1)
def task2():
while True:
print("Task 2 running")
yield
time.sleep(1)
def scheduler(tasks):
while tasks:
task = tasks.pop(0)
try:
next(task)
tasks.append(task)
except StopIteration:
pass
# 创建任务生成器
tasks = [task1(), task2()]
# 启动任务调度器
scheduler(tasks)
在这个示例中,task1 和 task2 是两个简单的任务生成器,scheduler 函数负责循环调度这些任务,实现了简单的协程调度器。
通过这些示例,你可以看到生成器在实现协程方面的强大功能,它们可以帮助你编写高效的并发代码,从而更好地处理I/O密集型任务和其他需要并发的场景。
评论区