Streaming Pipeline (Go)

The streaming pipeline is the core of ai-lib-go. It processes provider responses through composable operators, each driven by protocol configuration.

Raw Bytes → Decoder → Selector → Accumulator → FanOut → EventMapper → StreamingEvent

Each operator is a stage in the pipeline:

The Decoder converts raw byte streams into JSON frames. It supports the following formats:

Format         Description
sse            Server-Sent Events (OpenAI, Groq, etc.)
ndjson         Newline-delimited JSON
anthropic_sse  Anthropic’s custom SSE format

The decoder format is specified in the provider manifest:

streaming:
  decoder:
    format: 'sse'
    done_signal: '[DONE]'
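To make the decoder stage concrete, here is a minimal sketch of what an `sse` decoder with a `[DONE]` signal might do. The function `decodeSSE` is hypothetical and is not the ai-lib-go implementation. It works on an already buffered body and assumes each event carries a single `data:` line, whereas a real decoder would read incrementally from the response and handle multi-line events.

```go
package main

import (
	"fmt"
	"strings"
)

// decodeSSE is a hypothetical helper, not the ai-lib-go decoder. It splits a
// buffered SSE body into lines, keeps the payload of each "data:" line, and
// stops at the first payload equal to doneSignal.
func decodeSSE(raw []byte, doneSignal string) []string {
	var frames []string
	for _, line := range strings.Split(string(raw), "\n") {
		line = strings.TrimRight(line, "\r") // tolerate CRLF line endings
		if !strings.HasPrefix(line, "data:") {
			continue // skip blank separators, comments, and other SSE fields
		}
		payload := strings.TrimSpace(strings.TrimPrefix(line, "data:"))
		if payload == doneSignal {
			break // the done signal ends the stream; later lines are ignored
		}
		frames = append(frames, payload)
	}
	return frames
}

func main() {
	body := []byte("data: {\"id\":1}\n\ndata: {\"id\":2}\n\ndata: [DONE]\n\n")
	for _, frame := range decodeSSE(body, "[DONE]") {
		fmt.Println(frame)
	}
}
```

Each returned string is one JSON frame, ready for the next stage to parse.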

The Selector filters JSON frames using JSONPath expressions defined in the manifest’s event_map:

event_map:
  - match: '$.choices[0].delta.content'
    emit: 'PartialContentDelta'
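The matching step can be sketched as follows. This is an illustration under stated assumptions, not the library's selector: the `rule` type, `lookup`, and `selectEvent` are hypothetical names, and `lookup` handles only a small JSONPath subset (dotted keys and numeric indexes), which is enough for the rule shown above.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strconv"
	"strings"
)

// rule mirrors one event_map entry: a path to match and an event name to emit.
type rule struct {
	Match string
	Emit  string
}

// lookup resolves a tiny JSONPath subset ("$.a.b[0].c") against decoded JSON.
// It supports only dotted keys and numeric indexes; full JSONPath is richer.
func lookup(doc any, path string) (any, bool) {
	path = strings.TrimPrefix(path, "$")
	// Turn "[0]" into ".0" so that every step is separated by a dot.
	path = strings.NewReplacer("[", ".", "]", "").Replace(path)
	cur := doc
	for _, step := range strings.Split(path, ".") {
		if step == "" {
			continue
		}
		switch node := cur.(type) {
		case map[string]any:
			next, ok := node[step]
			if !ok {
				return nil, false
			}
			cur = next
		case []any:
			i, err := strconv.Atoi(step)
			if err != nil || i < 0 || i >= len(node) {
				return nil, false
			}
			cur = node[i]
		default:
			return nil, false
		}
	}
	return cur, true
}

// selectEvent returns the Emit name of the first rule whose path exists in the frame.
func selectEvent(frame string, rules []rule) (string, bool) {
	var doc any
	if err := json.Unmarshal([]byte(frame), &doc); err != nil {
		return "", false
	}
	for _, r := range rules {
		if _, ok := lookup(doc, r.Match); ok {
			return r.Emit, true
		}
	}
	return "", false
}

func main() {
	rules := []rule{{Match: "$.choices[0].delta.content", Emit: "PartialContentDelta"}}
	name, ok := selectEvent(`{"choices":[{"delta":{"content":"Hi"}}]}`, rules)
	fmt.Println(name, ok)
}
```

Frames that match no rule are dropped here; whether the real pipeline drops or forwards them is not stated in this document.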

The Accumulator statefully assembles partial tool calls. When a provider streams tool call arguments in chunks, it buffers the fragments and joins them into complete tool calls:

PartialToolCall("get_we") → PartialToolCall("ather") → PartialToolCall("(\"Tokyo\")")
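The fragment sequence above can be assembled with a small stateful buffer. The sketch below assumes fragments are keyed by a tool call index; `toolCallAccumulator` and its methods are hypothetical names chosen for illustration, not the ai-lib-go types.

```go
package main

import (
	"fmt"
	"strings"
)

// toolCallAccumulator is a hypothetical type, not the ai-lib-go implementation.
// It buffers text fragments per tool call index until the call is finished.
type toolCallAccumulator struct {
	parts map[int]*strings.Builder
}

func newToolCallAccumulator() *toolCallAccumulator {
	return &toolCallAccumulator{parts: make(map[int]*strings.Builder)}
}

// Add appends one streamed fragment to the tool call at the given index.
func (a *toolCallAccumulator) Add(index int, fragment string) {
	b, ok := a.parts[index]
	if !ok {
		b = &strings.Builder{}
		a.parts[index] = b
	}
	b.WriteString(fragment)
}

// Finish returns the assembled text for a tool call and discards its buffer.
// The boolean is false when no fragments were seen for that index.
func (a *toolCallAccumulator) Finish(index int) (string, bool) {
	b, ok := a.parts[index]
	if !ok {
		return "", false
	}
	delete(a.parts, index)
	return b.String(), true
}

func main() {
	acc := newToolCallAccumulator()
	for _, fragment := range []string{"get_we", "ather", `("Tokyo")`} {
		acc.Add(0, fragment)
	}
	call, _ := acc.Finish(0)
	fmt.Println(call) // prints get_weather("Tokyo")
}
```

Keying by index lets fragments of several tool calls interleave in one stream without mixing.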

FanOut handles multi-candidate responses (when n > 1) by expanding the candidates into separate event streams.
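As a rough illustration of splitting by candidate, the sketch below groups interleaved deltas by candidate index. The `candidateDelta` type and `fanOut` function are hypothetical, and slices stand in for what a streaming implementation would more likely expose as channels or iterators.

```go
package main

import "fmt"

// candidateDelta is a hypothetical frame shape: one content delta tagged with
// the index of the candidate it belongs to.
type candidateDelta struct {
	Index   int
	Content string
}

// fanOut splits an interleaved sequence of deltas into one ordered slice per
// candidate index, preserving arrival order within each candidate.
func fanOut(deltas []candidateDelta) map[int][]string {
	streams := make(map[int][]string)
	for _, d := range deltas {
		streams[d.Index] = append(streams[d.Index], d.Content)
	}
	return streams
}

func main() {
	interleaved := []candidateDelta{
		{Index: 0, Content: "Hel"},
		{Index: 1, Content: "Bon"},
		{Index: 0, Content: "lo"},
		{Index: 1, Content: "jour"},
	}
	streams := fanOut(interleaved)
	fmt.Println(streams[0], streams[1])
}
```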

The EventMapper is the final stage. It converts processed frames into unified StreamingEvent types.

The pipeline is built automatically from the provider manifest, so no manual configuration is needed:

// The pipeline is constructed internally based on the protocol manifest
stream, err := aiClient.Chat().
	User("Hello").
	ExecuteStream(ctx)
if err != nil {
	panic(err)
}
defer stream.Close()
for stream.Next() {
	event := stream.Event()
	// Process event
}

The runtime reads the streaming section of the manifest and wires up the appropriate decoder, selector rules, and event mapper.

The pipeline also includes resilience operators:

  • Retry — Retries failed streams based on the manifest’s retry policy
  • Fallback — Falls back to alternative providers/models on failure
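
How retry and fallback compose can be sketched as two small wrappers. Everything here is an assumption made for illustration: `withRetry`, `withFallback`, and `errUnavailable` are hypothetical names, the attempt count stands in for the manifest's retry policy, and a real policy would normally add backoff between attempts.

```go
package main

import (
	"errors"
	"fmt"
)

// errUnavailable stands in for any transient provider failure.
var errUnavailable = errors.New("provider unavailable")

// withRetry runs op up to attempts times (at least once) and returns nil on
// the first success, or the last error if every attempt fails.
func withRetry(attempts int, op func() error) error {
	if attempts < 1 {
		attempts = 1
	}
	var err error
	for i := 0; i < attempts; i++ {
		if err = op(); err == nil {
			return nil
		}
	}
	return err
}

// withFallback tries each provider in order, retrying each one, and returns
// the name of the first provider that succeeds.
func withFallback(providers []string, attempts int, open func(provider string) error) (string, error) {
	lastErr := errors.New("no providers configured")
	for _, p := range providers {
		lastErr = withRetry(attempts, func() error { return open(p) })
		if lastErr == nil {
			return p, nil
		}
	}
	return "", lastErr
}

func main() {
	// Simulated opener: the primary provider always fails, the secondary works.
	open := func(provider string) error {
		if provider == "primary" {
			return errUnavailable
		}
		return nil
	}
	used, err := withFallback([]string{"primary", "secondary"}, 2, open)
	fmt.Println(used, err)
}
```

Retrying inside each fallback step means a provider is abandoned only after its own retry budget is exhausted.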