Jean's Blog

一个专注软件测试开发技术的个人博客

0%

异步编程

异步函数(协程函数)

async和协程函数

  • 在 Python 中,协程是可暂停的函数,定义方式为 async def
  • 如果直接调用, 它不会立即执行,返回的是一个 coroutine (协程)对象。
1
2
3
async def say_hello():
print("开始执行say hello这个任务")
return "Hello"

不能直接调用协程函数获得结果,必须使用 await 或事件循环调度。

await 和可等待对象

在 Python 异步编程中,await 后面必须是可等待对象,即:

实现了 __await__() 方法的对象,称为 awaitable(可等待对象)

Python 中的三种原生 awaitable 类型

类型 示例 说明
协程对象 async def调用返回 最常见的 awaitable
Task 对象 asyncio.create_task()的返回值 包装协程为并发任务
Future 对象 asyncio.Future() 底层机制,手动控制结果

协程的调用方式

错误调用方式

1
result = say_hello()  # 返回 coroutine 对象,不会执行

正确调用方式:

  • 使用 await(仅限在 async def 中使用)
  • 或用 asyncio.run() 在顶层运行,asyncio异步编程的核心库
1
2
3
4
5
6
7
import asyncio

async def main():
result = await say_hello()
print(result)

asyncio.run(main())

异步编程核心架构及流程

graph TB
    subgraph "Python异步编程架构"
        A[应用层] --> B[协程 Coroutines]
        A --> C[任务 Tasks]
        A --> D[Future对象]

        B --> E[asyncio 运行时]
        C --> E
        D --> E

        E --> F[事件循环 Event Loop]

        F --> G[选择器 Selector]
        F --> H[任务调度器]
        F --> I[回调管理器]

        G --> J[操作系统I/O多路复用]

        H --> K[就绪队列 Ready Queue]
        H --> L[定时器队列 Timer Queue]
        H --> M[完成队列 Done Queue]

        J --> N[网络I/O事件]
        J --> O[文件I/O事件]
        J --> P[子进程事件]
    end

    subgraph "协程执行状态机"
        Q[协程状态] --> R[未启动 CREATED]
        R --> S[运行中 RUNNING]
        S --> T[挂起 SUSPENDED]
        T --> S
        S --> U[完成 FINISHED]
        T --> V[异常 EXCEPTION]
    end
flowchart TD
    A[异步程序启动] --> B[创建事件循环]
    B --> C[运行主协程
asyncio.run] C --> D[协程调用栈初始化] D --> E[创建Task包装主协程] E --> F[事件循环开始运行
loop.run_until_complete] F --> G{就绪队列检查} G -->|有任务| H[取出Task执行] G -->|空| I[等待I/O事件] H --> J[执行Task._step] J --> K{协程状态} K -->|第一次执行| L[coro.sendNone启动] K -->|恢复执行| M[coro.send结果
或 coro.throw异常] L --> N{执行结果} M --> N N -->|返回新awaitable| O[处理嵌套await] N -->|StopIteration| P[Task完成设置结果] N -->|异常| Q[Task完成设置异常] O --> R{awaitable类型分析} R -->|协程| S[递归创建子Task] R -->|Future/Task| T[添加回调等待完成] R -->|其他| U[转换为Future] S --> V[新Task加入就绪队列] T --> W[当前Task暂停等待] U --> W W --> X[事件循环继续
执行其他任务] P --> Y[Task标记为完成] Q --> Y Y --> Z[触发回调链] Z --> AA[清理资源] X --> G I --> AB[调用select/poll/epoll
等待I/O事件] AB --> AC[I/O事件就绪] AC --> AD[对应Future设置结果] AD --> AE[相关Task加入就绪队列] AE --> G subgraph "并发执行模式" AF[多个Task并发] --> AG[时间片轮转] AG --> AH[遇到await让出控制权] AH --> AI[事件循环调度下一个Task] AI --> AJ[I/O完成时恢复执行] end

协程执行并发

asyncio.create_task()

提前调度多个协程并发执行

1
2
3
4
5
async def main():
task1 = asyncio.create_task(say_hello())
task2 = asyncio.create_task(say_hello())
await task1
await task2

asyncio.create_task() 是 Python 异步编程的核心函数之一,用于将协程包装为 Task 对象并调度执行。

执行流程详解

flowchart TD
    A[调用 asyncio.create_task] --> B{参数有效性检查}
    B -->|无效| C[抛出TypeError异常]
    B -->|有效| D[获取当前事件循环]

    D --> E[创建Task对象]

    E --> F[初始化Task]
    F --> G[设置协程对象]
    F --> H[设置状态为PENDING]
    F --> I[创建Future对象]
    F --> J[注册回调函数]

    J --> K[调度Task执行]

    K --> L[将Task加入就绪队列]

    L --> M[事件循环开始调度]
    M --> N{事件循环是否运行}
    N -->|是| O[执行Task._step方法]
    N -->|否| P[等待事件循环启动]

    O --> Q{协程执行状态}
    Q -->|第一次执行| R[发送None启动协程]
    Q -->|后续恢复| S[发送结果或异常]

    R --> T{协程执行结果}
    S --> T

    T -->|返回awaitable对象| U[处理awaitable对象]
    T -->|抛出StopIteration| V[协程执行完成]
    T -->|抛出其他异常| W[设置Task异常]

    U --> X{awaitable对象类型}
    X -->|Future或Task| Y[添加完成回调]
    X -->|协程| Z[递归创建子Task]
    X -->|其他类型| AA[转换为Future对象]

    Y --> AB[暂停当前Task执行]
    Z --> AB
    AA --> AB

    AB --> AC[事件循环继续其他任务]

    V --> AD[Task状态变为FINISHED]
    W --> AD

    AD --> AE[执行完成回调函数]

    AE --> AF[清理Task资源]
    AF --> AG[Task执行完成]

    AC --> M

asyncio.gather()

批量创建协程任务,并发执行

1
2
async def main():
await asyncio.gather(say_hello(),say_hello())

asyncio.gather() 是 Python 异步编程中用于并发运行多个可等待对象的核心函数,它收集所有结果并以列表形式返回,保持原始顺序。

执行流程详解

flowchart TD
    A[调用 asyncio.gather
*aws, return_exceptions=False] --> B[创建 _GatheringFuture 对象] B --> C{检查参数} C -->|无参数| D[返回空的已完成 Future] C -->|有参数| E[初始化结果容器和计数器] E --> F[遍历所有可等待对象 aws] F --> G{是否为协程或 Future?} G -->|协程| H[包装为 Task
ensure_future] G -->|Future| I[直接使用] G -->|其他| J[转换为 Future] H --> K[设置回调函数] I --> K J --> K K --> L[为每个 Future 添加
_done_callback 回调] L --> M[将所有内部 Future
保存到 _children 集合] M --> N{所有 Future 已创建?} N -->|否| F N -->|是| O[返回 _GatheringFuture
给调用者] O --> P{_GatheringFuture 是否被取消?} P -->|是| Q[取消所有子 Future] P -->|否| R[等待所有子 Future 完成] subgraph "回调处理流程 (并行执行)" direction LR S[子 Future 完成] --> T[执行 _done_callback] T --> U{子 Future 是否有异常?} U -->|是| V{return_exceptions 参数?} V -->|True| W[将异常作为结果保存] V -->|False| X[取消所有其他子 Future] U -->|否| Y[将结果保存到对应位置] X --> Z[设置 _GatheringFuture 异常] W --> AA[更新完成计数器] Y --> AA AA --> AB{所有子 Future 已完成?} AB -->|是| AC[设置 _GatheringFuture 结果] AB -->|否| AD[继续等待其他] end Q --> AE[_GatheringFuture 被取消] Z --> AE AC --> AF[_GatheringFuture 完成
返回结果列表] R --> S AD --> R

迭代器和异步迭代器

什么是迭代器?

  • 迭代器对象需要实现:

    • __iter__() → 返回迭代器自身

    • __next__() → 每次迭代返回一个值,结束时抛出 StopIteration

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class SyncCounter:
def __init__(self, max):
self.count = 0
self.max = max

def __iter__(self):
return self

def __next__(self):
if self.count >= self.max:
raise StopIteration
self.count += 1
return self.count

for num in SyncCounter(3):
print(num)

什么是异步迭代器?

  • 需要实现两个异步方法:

    • __aiter__() → 返回异步迭代器对象(一般为 self)

    • __anext__() → 使用 await 返回下一个值;结束时抛出 StopAsyncIteration

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
class AsyncCounter:
def __init__(self, max):
self.count = 0
self.max = max

def __aiter__(self):
return self

async def __anext__(self):
if self.count >= self.max:
raise StopAsyncIteration
await asyncio.sleep(1)
self.count += 1
return self.count

async def main():
async for i in AsyncCounter(3):
print(i)

asyncio.run(main())

LangChain中异步迭代器应用

.astream()方法执行返回的就是异步迭代器

  • 当你运行一个多轮对话、决策链、或 Agent 图谱时,可以通过 .astream() 以“事件流”的形式异步获取每个节点的中间状态或输出。
1
2
3
4
5
6
async def run_stream(app, input):
async for event in app.astream(input):
print("📦 当前状态片段:", event)

# 调用方式
asyncio.run(run_stream(app, {"input": "开始执行"}))

生成器和异步生成器

同步生成器

  • 使用 yield 定义生成器函数
  • 每次 next() 会从上次中断的地方继续执行
1
2
3
4
5
6
7
def countdown(n):
while n > 0:
yield n
n -= 1

for num in countdown(3):
print(num)

异步生成器的语法与机制

  • 使用 async def + yield
  • async for 来消费
  • yield 之间可使用 await 执行异步操作
1
2
3
4
5
6
7
8
9
10
11
12
13
import asyncio

async def async_countdown(n):
while n > 0:
await asyncio.sleep(1)
yield n
n -= 1

async def main():
async for value in async_countdown(3):
print(value)

asyncio.run(main())

异步生成器 vs 异步迭代器

特性 异步迭代器(手写类) 异步生成器(函数式)
编写方式 类 + __anext__() async def + yield
是否简洁 ❌ 复杂 ✅ 简洁
推荐使用场景 控制状态较复杂的异步迭代 异步数据流、分页等

上下文管理器和异步上下文

上下文管理器

作用

  • 管理资源的申请与释放
  • 通过 with 语句保证资源自动关闭(文件、数据库连接等)

语法

1
2
with open('file.txt', 'r') as f:
data = f.read()

自定义上下文管理器

  • 实现 __enter__()__exit__() 方法
1
2
3
4
5
6
7
8
9
10
class MyContext:
def __enter__(self):
print("进入上下文")
return self

def __exit__(self, exc_type, exc_val, exc_tb):
print("退出上下文,释放资源")

with MyContext() as ctx:
print("处理中...")

异步上下文管理器基础

作用

  • 处理异步资源的申请和释放
  • 用于异步 I/O 操作、数据库连接池、HTTP 客户端等

语法

1
2
3
async with aiohttp.ClientSession() as session:
async with session.get('https://example.com') as resp:
data = await resp.text()

自定义异步上下文管理器

  • 实现 __aenter__()__aexit__() 异步方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
class AsyncContext:
async def __aenter__(self):
print("异步进入上下文")
return self

async def __aexit__(self, exc_type, exc_val, exc_tb):
print("异步退出上下文,释放资源")

async def main():
async with AsyncContext() as ctx:
print("异步处理中...")

import asyncio
asyncio.run(main())