跳转到内容

流式管道(Python)

Python SDK 实现与 Rust 运行时相同的基于算子的管道架构,并适配 Python 的异步生态。

Raw Bytes → Decoder → Selector → Accumulator → FanOut → EventMapper → StreamingEvent

将 HTTP 响应字节转换为 JSON 帧:

Decoder ClassProvider Format
SseDecoder标准 SSE(OpenAI、Groq 等)
JsonLinesDecoderNewline-delimited JSON
AnthropicSseDecoderAnthropic 自定义 SSE

解码器根据清单的 streaming.decoder.format 选择。

使用清单中的 JSONPath 表达式过滤 JSON 帧:

# Internally, the pipeline creates selectors from manifest rules:
# match: "$.choices[0].delta.content" → emit: "PartialContentDelta"

使用 jsonpath-ng 进行 JSONPath 表达式求值。

将部分工具调用组装为完整调用:

# Provider streams:
# {"tool_calls": [{"index": 0, "function": {"arguments": '{"ci'}}]}
# {"tool_calls": [{"index": 0, "function": {"arguments": 'ty":"T'}}]}
# {"tool_calls": [{"index": 0, "function": {"arguments": 'okyo"}'}}]}
# Accumulator produces complete: {"city": "Tokyo"}

对于多候选响应(n > 1),展开为按候选的流。

三种 mapper 实现:

MapperDescription
ProtocolEventMapper使用清单的 event_map 规则(JSONPath → 事件类型)
DefaultEventMapper兼容 OpenAI 的提供商的回退
AnthropicEventMapper处理 Anthropic 的独特事件结构

管道以异步迭代器形式暴露事件:

async for event in client.chat().user("Hello").stream():
if event.is_content_delta:
text = event.as_content_delta.text
print(text, end="")
elif event.is_tool_call_started:
call = event.as_tool_call_started
print(f"\nTool: {call.name}")
elif event.is_stream_end:
end = event.as_stream_end
print(f"\nFinish: {end.finish_reason}")

流支持优雅取消:

from ai_lib_python import CancelToken
token = CancelToken()
async for event in client.chat().user("...").stream(cancel_token=token):
# Cancel after receiving enough content
if total_chars > 1000:
token.cancel()
break