
Streaming Pipeline (Rust)

The streaming pipeline is the core of ai-lib-rust. It processes provider responses through composable operators, each driven by protocol configuration.

Raw Bytes → Decoder → Selector → Accumulator → FanOut → EventMapper → StreamingEvent

Each operator is a stage in the pipeline:

Decoder

Converts raw byte streams into JSON frames. Supported formats:

  • sse: Server-Sent Events (OpenAI, Groq, etc.)
  • ndjson: Newline-delimited JSON
  • anthropic_sse: Anthropic’s custom SSE format

The decoder format is specified in the provider manifest:

streaming:
  decoder:
    format: "sse"
    done_signal: "[DONE]"
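As a rough illustration of what the decoder stage does (a sketch, not the library's actual implementation), an SSE decoder splits the byte stream on blank lines, strips the `data: ` prefix, and stops at the configured done signal:

```rust
/// Minimal sketch of an SSE decoder stage. Splits a raw SSE buffer into
/// JSON payload strings, stopping when the configured done signal
/// (e.g. "[DONE]") is seen. Function name and shape are illustrative.
fn decode_sse(raw: &str, done_signal: &str) -> Vec<String> {
    let mut frames = Vec::new();
    // SSE events are separated by blank lines; each payload line
    // carries a `data: ` prefix.
    for event in raw.split("\n\n") {
        for line in event.lines() {
            if let Some(payload) = line.strip_prefix("data: ") {
                if payload == done_signal {
                    return frames; // end-of-stream marker, not a frame
                }
                frames.push(payload.to_string());
            }
        }
    }
    frames
}
```

The real decoder works incrementally on chunked bytes rather than a complete buffer, but the framing logic is the same idea.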

Selector

Filters JSON frames using JSONPath expressions defined in the manifest’s event_map:

event_map:
  - match: "$.choices[0].delta.content"
    emit: "PartialContentDelta"
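Conceptually, the selector walks the frame along the path and emits the mapped event only if every segment matches. A toy version, using a hand-rolled value type in place of parsed JSON (the real selector evaluates full JSONPath over decoded frames):

```rust
use std::collections::HashMap;

/// Toy stand-in for a parsed JSON frame (illustrative only).
#[derive(Clone, Debug, PartialEq)]
enum Value {
    Str(String),
    Arr(Vec<Value>),
    Obj(HashMap<String, Value>),
}

/// One segment of a simplified path like `$.choices[0].delta.content`:
/// either an object key or an array index.
enum Seg<'a> {
    Key(&'a str),
    Idx(usize),
}

/// Walks the frame along the path; returns Some only if every segment
/// matches, which is the condition for emitting the mapped event.
fn select<'a>(frame: &'a Value, path: &[Seg<'_>]) -> Option<&'a Value> {
    let mut cur = frame;
    for seg in path {
        cur = match (seg, cur) {
            (Seg::Key(k), Value::Obj(m)) => m.get(*k)?,
            (Seg::Idx(i), Value::Arr(a)) => a.get(*i)?,
            _ => return None,
        };
    }
    Some(cur)
}

/// Builds the equivalent of {"choices":[{"delta":{"content":"Hi"}}]}.
fn sample_frame() -> Value {
    let mut delta = HashMap::new();
    delta.insert("content".to_string(), Value::Str("Hi".to_string()));
    let mut choice = HashMap::new();
    choice.insert("delta".to_string(), Value::Obj(delta));
    let mut root = HashMap::new();
    root.insert("choices".to_string(), Value::Arr(vec![Value::Obj(choice)]));
    Value::Obj(root)
}
```

Frames that match no rule (keep-alives, usage-only chunks) are simply dropped by the stage.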

Accumulator

Statefully assembles partial tool calls. When a provider streams tool call arguments in chunks, the accumulator collects them into complete tool calls:

PartialToolCall("get_we") → PartialToolCall("ather") → PartialToolCall("(\"Tokyo\")")
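A minimal sketch of that stateful concatenation (the type and method names here are hypothetical, not the crate's real operator):

```rust
/// Sketch of the accumulator stage: buffers streamed chunks until the
/// provider signals the tool call is complete.
#[derive(Default)]
struct ToolCallAccumulator {
    buffer: String,
}

impl ToolCallAccumulator {
    /// Feed one partial chunk; returns the assembled call once `done`.
    fn push(&mut self, chunk: &str, done: bool) -> Option<String> {
        self.buffer.push_str(chunk);
        if done {
            // Hand off the completed call and reset for the next one.
            Some(std::mem::take(&mut self.buffer))
        } else {
            None
        }
    }
}
```

The real accumulator additionally tracks tool call IDs and indices, since a provider may interleave several tool calls in one response.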

FanOut

Handles multi-candidate responses (when n > 1). Expands candidates into separate event streams.
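The routing idea can be sketched as follows, with frames tagged by candidate index (types are illustrative, not the library's real operator):

```rust
/// Sketch of the fan-out stage: frames tagged with a candidate index
/// (the per-choice `index` field when n > 1) are routed into
/// per-candidate streams.
fn fan_out(frames: &[(usize, &str)], n: usize) -> Vec<Vec<String>> {
    let mut streams = vec![Vec::new(); n];
    for (idx, delta) in frames {
        // Ignore out-of-range indices rather than panicking.
        if let Some(stream) = streams.get_mut(*idx) {
            stream.push((*delta).to_string());
        }
    }
    streams
}
```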

EventMapper

The final stage: converts processed frames into unified StreamingEvent types:

  • StreamingEvent::ContentDelta — Text content
  • StreamingEvent::ToolCallStarted — Tool invocation begins
  • StreamingEvent::PartialToolCall — Tool argument chunk
  • StreamingEvent::StreamEnd — Response complete
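The mapping step can be sketched like this; the variants mirror the list above, while the frame struct is a simplified stand-in for a decoded provider frame, not the library's internal type:

```rust
/// Unified event variants, as listed above.
#[derive(Debug, PartialEq)]
enum StreamingEvent {
    ContentDelta(String),
    ToolCallStarted(String),
    PartialToolCall(String),
    StreamEnd,
}

/// A decoded frame reduced to the fields this sketch cares about.
struct Frame<'a> {
    content: Option<&'a str>,
    tool_chunk: Option<&'a str>,
    finished: bool,
}

/// Maps one frame to at most one unified event.
fn map_event(frame: &Frame) -> Option<StreamingEvent> {
    if frame.finished {
        Some(StreamingEvent::StreamEnd)
    } else if let Some(text) = frame.content {
        Some(StreamingEvent::ContentDelta(text.to_string()))
    } else if let Some(chunk) = frame.tool_chunk {
        Some(StreamingEvent::PartialToolCall(chunk.to_string()))
    } else {
        None // nothing to emit for this frame (e.g. a keep-alive)
    }
}
```

Because every provider format is normalized into the same enum here, code consuming the stream never needs to know which provider produced it.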

The pipeline is built automatically from the provider manifest; no manual configuration is needed:

// The pipeline is constructed internally based on the protocol manifest
let mut stream = client.chat()
    .user("Hello")
    .stream()
    .execute_stream()
    .await?;

The runtime reads the streaming section of the manifest and wires up the appropriate decoder, selector rules, and event mapper.
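A sketch of that wiring step, assuming the decoder is selected from the manifest's `streaming.decoder.format` field (the enum and function names are illustrative, not the crate's real API):

```rust
/// Decoder implementations the runtime can wire up (illustrative).
#[derive(Debug, PartialEq)]
enum DecoderKind {
    Sse,
    Ndjson,
    AnthropicSse,
}

/// Resolves the manifest's `format` string to a decoder, rejecting
/// unknown formats at construction time rather than mid-stream.
fn decoder_for(format: &str) -> Result<DecoderKind, String> {
    match format {
        "sse" => Ok(DecoderKind::Sse),
        "ndjson" => Ok(DecoderKind::Ndjson),
        "anthropic_sse" => Ok(DecoderKind::AnthropicSse),
        other => Err(format!("unknown decoder format: {other}")),
    }
}
```

Failing fast on an unknown format keeps a bad manifest from producing a silently broken stream.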

The pipeline also includes resilience operators:

  • Retry — Retries failed streams based on the manifest’s retry policy
  • Fallback — Falls back to alternative providers/models on failure
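The control flow of the retry operator can be sketched as a plain retry loop; the real policy (backoff, which error classes are retryable) comes from the manifest, and the function here is a hypothetical simplification:

```rust
/// Sketch of a retry operator: re-runs a stream-producing operation up
/// to `max_attempts` times, returning the first success or the last error.
fn with_retry<T, E>(
    max_attempts: usize,
    mut attempt: impl FnMut() -> Result<T, E>,
) -> Result<T, E> {
    let mut last_err = None;
    for _ in 0..max_attempts {
        match attempt() {
            Ok(v) => return Ok(v),
            Err(e) => last_err = Some(e),
        }
    }
    // Assumes max_attempts >= 1, so last_err is populated by now.
    Err(last_err.expect("max_attempts must be at least 1"))
}
```

Fallback is the same shape one level up: when retries for one provider are exhausted, the next provider/model in the fallback chain is attempted.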