ストリーミングパイプライン(Go)
ストリーミングパイプライン
Section titled “ストリーミングパイプライン”ストリーミングパイプラインは ai-lib-go の中核です。プロトコル設定によって駆動される構成可能なオペレーターを通じて、プロバイダーのレスポンスを処理します。
パイプラインアーキテクチャ
Section titled “パイプラインアーキテクチャ”Raw Bytes → Decoder → Selector → Accumulator → FanOut → EventMapper → StreamingEvent各オペレーターはパイプラインのステージです:
1. Decoder
Section titled “1. Decoder”生のバイトストリームを JSON フレームに変換します。
| 形式 | 説明 |
|---|---|
sse | Server-Sent Events(OpenAI、Groq など) |
ndjson | 改行区切り JSON |
anthropic_sse | Anthropic のカスタム SSE 形式 |
デコーダー形式はプロバイダーマニフェストで指定されます:
streaming: decoder: format: 'sse' done_signal: '[DONE]'2. Selector
Section titled “2. Selector”マニフェストの event_map で定義された JSONPath 式を使用して JSON フレームをフィルタリングします:
event_map: - match: '$.choices[0].delta.content' emit: 'PartialContentDelta'3. Accumulator
Section titled “3. Accumulator”部分的なツール呼び出しを状態を持って組み立てます。プロバイダーがツール呼び出し引数をチャンクでストリーミングする場合、アキュムレーターはそれらを完全なツール呼び出しに収集します:
PartialToolCall("get_we") → PartialToolCall("ather") → PartialToolCall("(\"Tokyo\")")4. FanOut
Section titled “4. FanOut”マルチ候補レスポンス(n > 1 の場合)を処理します。候補を別々のイベントストリームに展開します。
5. EventMapper
Section titled “5. EventMapper”最終ステージ — 処理済みフレームを統一 StreamingEvent 型に変換します。
プロトコル駆動の構築
Section titled “プロトコル駆動の構築”パイプラインはプロバイダーマニフェストから自動的に構築されます。手動設定は不要です:
// パイプラインはプロトコルマニフェストに基づいて内部的に構築されますstream, err := aiClient.Chat(). User("こんにちは"). ExecuteStream(ctx)if err != nil { panic(err)}defer stream.Close()
for stream.Next() { event := stream.Event() // イベントの処理}ランタイムはマニフェストの streaming セクションを読み、適切なデコーダー、セレクタールール、イベントマッパーを配線します。
リトライおよびフォールバックオペレーター
Section titled “リトライおよびフォールバックオペレーター”パイプラインには耐障害性オペレーターも含まれます:
- Retry — マニフェストのリトライポリシーに基づいて失敗したストリームをリトライ
- Fallback — 障害時に代替プロバイダー/モデルにフォールバック