异步函数(协程函数)
async和协程函数
- 在 Python 中,协程是可暂停的函数,定义方式为
async def。 - 如果直接调用, 它不会立即执行,返回的是一个
coroutine(协程)对象。
1 | async def say_hello(): |
不能直接调用协程函数获得结果,必须使用 await 或事件循环调度。
await 和可等待对象
在 Python 异步编程中,await 后面必须是可等待对象,即:
实现了 __await__() 方法的对象,称为 awaitable(可等待对象)
Python 中的三种原生 awaitable 类型
| 类型 | 示例 | 说明 |
|---|---|---|
| 协程对象 | 由 async def调用返回 |
最常见的 awaitable |
| Task 对象 | asyncio.create_task()的返回值 |
包装协程为并发任务 |
| Future 对象 | asyncio.Future() |
底层机制,手动控制结果 |
协程的调用方式
错误调用方式
1 | result = say_hello() # 返回 coroutine 对象,不会执行 |
正确调用方式:
- 使用
await(仅限在async def中使用) - 或用
asyncio.run()在顶层运行,asyncio异步编程的核心库
1 | import asyncio |
异步编程核心架构及流程
graph TB
subgraph "Python异步编程架构"
A[应用层] --> B[协程 Coroutines]
A --> C[任务 Tasks]
A --> D[Future对象]
B --> E[asyncio 运行时]
C --> E
D --> E
E --> F[事件循环 Event Loop]
F --> G[选择器 Selector]
F --> H[任务调度器]
F --> I[回调管理器]
G --> J[操作系统I/O多路复用]
H --> K[就绪队列 Ready Queue]
H --> L[定时器队列 Timer Queue]
H --> M[完成队列 Done Queue]
J --> N[网络I/O事件]
J --> O[文件I/O事件]
J --> P[子进程事件]
end
subgraph "协程执行状态机"
Q[协程状态] --> R[未启动 CREATED]
R --> S[运行中 RUNNING]
S --> T[挂起 SUSPENDED]
T --> S
S --> U[完成 FINISHED]
T --> V[异常 EXCEPTION]
end
flowchart TD
A[异步程序启动] --> B[创建事件循环]
B --> C[运行主协程
asyncio.run]
C --> D[协程调用栈初始化]
D --> E[创建Task包装主协程]
E --> F[事件循环开始运行
loop.run_until_complete]
F --> G{就绪队列检查}
G -->|有任务| H[取出Task执行]
G -->|空| I[等待I/O事件]
H --> J[执行Task._step]
J --> K{协程状态}
K -->|第一次执行| L[coro.sendNone启动]
K -->|恢复执行| M[coro.send结果
或 coro.throw异常]
L --> N{执行结果}
M --> N
N -->|返回新awaitable| O[处理嵌套await]
N -->|StopIteration| P[Task完成设置结果]
N -->|异常| Q[Task完成设置异常]
O --> R{awaitable类型分析}
R -->|协程| S[递归创建子Task]
R -->|Future/Task| T[添加回调等待完成]
R -->|其他| U[转换为Future]
S --> V[新Task加入就绪队列]
T --> W[当前Task暂停等待]
U --> W
W --> X[事件循环继续
执行其他任务]
P --> Y[Task标记为完成]
Q --> Y
Y --> Z[触发回调链]
Z --> AA[清理资源]
X --> G
I --> AB[调用select/poll/epoll
等待I/O事件]
AB --> AC[I/O事件就绪]
AC --> AD[对应Future设置结果]
AD --> AE[相关Task加入就绪队列]
AE --> G
subgraph "并发执行模式"
AF[多个Task并发] --> AG[时间片轮转]
AG --> AH[遇到await让出控制权]
AH --> AI[事件循环调度下一个Task]
AI --> AJ[I/O完成时恢复执行]
end
协程执行并发
asyncio.create_task()
提前调度多个协程并发执行
1 | async def main(): |
asyncio.create_task() 是 Python 异步编程的核心函数之一,用于将协程包装为 Task 对象并调度执行。
执行流程详解
flowchart TD
A[调用 asyncio.create_task] --> B{参数有效性检查}
B -->|无效| C[抛出TypeError异常]
B -->|有效| D[获取当前事件循环]
D --> E[创建Task对象]
E --> F[初始化Task]
F --> G[设置协程对象]
F --> H[设置状态为PENDING]
F --> I[创建Future对象]
F --> J[注册回调函数]
J --> K[调度Task执行]
K --> L[将Task加入就绪队列]
L --> M[事件循环开始调度]
M --> N{事件循环是否运行}
N -->|是| O[执行Task._step方法]
N -->|否| P[等待事件循环启动]
O --> Q{协程执行状态}
Q -->|第一次执行| R[发送None启动协程]
Q -->|后续恢复| S[发送结果或异常]
R --> T{协程执行结果}
S --> T
T -->|返回awaitable对象| U[处理awaitable对象]
T -->|抛出StopIteration| V[协程执行完成]
T -->|抛出其他异常| W[设置Task异常]
U --> X{awaitable对象类型}
X -->|Future或Task| Y[添加完成回调]
X -->|协程| Z[递归创建子Task]
X -->|其他类型| AA[转换为Future对象]
Y --> AB[暂停当前Task执行]
Z --> AB
AA --> AB
AB --> AC[事件循环继续其他任务]
V --> AD[Task状态变为FINISHED]
W --> AD
AD --> AE[执行完成回调函数]
AE --> AF[清理Task资源]
AF --> AG[Task执行完成]
AC --> M
asyncio.gather()
批量创建协程任务,并发执行
1 | async def main(): |
asyncio.gather() 是 Python 异步编程中用于并发运行多个可等待对象的核心函数,它收集所有结果并以列表形式返回,保持原始顺序。
执行流程详解
flowchart TD
A[调用 asyncio.gather
*aws, return_exceptions=False] --> B[创建 _GatheringFuture 对象]
B --> C{检查参数}
C -->|无参数| D[返回空的已完成 Future]
C -->|有参数| E[初始化结果容器和计数器]
E --> F[遍历所有可等待对象 aws]
F --> G{是否为协程或 Future?}
G -->|协程| H[包装为 Task
ensure_future]
G -->|Future| I[直接使用]
G -->|其他| J[转换为 Future]
H --> K[设置回调函数]
I --> K
J --> K
K --> L[为每个 Future 添加
_done_callback 回调]
L --> M[将所有内部 Future
保存到 _children 集合]
M --> N{所有 Future 已创建?}
N -->|否| F
N -->|是| O[返回 _GatheringFuture
给调用者]
O --> P{_GatheringFuture 是否被取消?}
P -->|是| Q[取消所有子 Future]
P -->|否| R[等待所有子 Future 完成]
subgraph "回调处理流程 (并行执行)"
direction LR
S[子 Future 完成] --> T[执行 _done_callback]
T --> U{子 Future 是否有异常?}
U -->|是| V{return_exceptions 参数?}
V -->|True| W[将异常作为结果保存]
V -->|False| X[取消所有其他子 Future]
U -->|否| Y[将结果保存到对应位置]
X --> Z[设置 _GatheringFuture 异常]
W --> AA[更新完成计数器]
Y --> AA
AA --> AB{所有子 Future 已完成?}
AB -->|是| AC[设置 _GatheringFuture 结果]
AB -->|否| AD[继续等待其他]
end
Q --> AE[_GatheringFuture 被取消]
Z --> AE
AC --> AF[_GatheringFuture 完成
返回结果列表]
R --> S
AD --> R
迭代器和异步迭代器
什么是迭代器?
迭代器对象需要实现:
__iter__()→ 返回迭代器自身__next__()→ 每次迭代返回一个值,结束时抛出StopIteration
1 | class SyncCounter: |
什么是异步迭代器?
需要实现两个异步方法:
__aiter__()→ 返回异步迭代器对象(一般为 self)__anext__()→ 使用await返回下一个值;结束时抛出StopAsyncIteration
1 | class AsyncCounter: |
LangChain中异步迭代器应用
.astream()方法执行返回的就是异步迭代器
- 当你运行一个多轮对话、决策链、或 Agent 图谱时,可以通过
.astream()以“事件流”的形式异步获取每个节点的中间状态或输出。
1 | async def run_stream(app, input): |
生成器和异步生成器
同步生成器
- 使用
yield定义生成器函数 - 每次
next()会从上次中断的地方继续执行
1 | def countdown(n): |
异步生成器的语法与机制
- 使用
async def+yield - 用
async for来消费 - 在
yield之间可使用await执行异步操作
1 | import asyncio |
异步生成器 vs 异步迭代器
| 特性 | 异步迭代器(手写类) | 异步生成器(函数式) |
|---|---|---|
| 编写方式 | 类 + __anext__() |
async def + yield |
| 是否简洁 | ❌ 复杂 | ✅ 简洁 |
| 推荐使用场景 | 控制状态较复杂的异步迭代 | 异步数据流、分页等 |
上下文管理器和异步上下文
上下文管理器
作用
- 管理资源的申请与释放
- 通过
with语句保证资源自动关闭(文件、数据库连接等)
语法
1 | with open('file.txt', 'r') as f: |
自定义上下文管理器
- 实现
__enter__()和__exit__()方法
1 | class MyContext: |
异步上下文管理器基础
作用
- 处理异步资源的申请和释放
- 用于异步 I/O 操作、数据库连接池、HTTP 客户端等
语法
1 | async with aiohttp.ClientSession() as session: |
自定义异步上下文管理器
- 实现
__aenter__()和__aexit__()异步方法
1 | class AsyncContext: |