Ir al contenido

Canalización de streaming (Go)

La canalización de streaming es el núcleo de ai-lib-go. Procesa las respuestas del proveedor a través de operadores composables, cada uno impulsado por la configuración del protocolo.

Raw Bytes → Decoder → Selector → Accumulator → FanOut → EventMapper → StreamingEvent

Cada operador es una etapa en la canalización:

Convierte flujos de bytes sin procesar en frames JSON.

FormatDescription
sseServer-Sent Events (OpenAI, Groq, etc.)
ndjsonJSON delimitado por líneas nuevas
anthropic_sseFormato SSE personalizado de Anthropic

El formato del decodificador se especifica en el manifiesto del proveedor:

streaming:
decoder:
format: 'sse'
done_signal: '[DONE]'

Filtra frames JSON usando expresiones JSONPath definidas en event_map del manifiesto:

event_map:
- match: '$.choices[0].delta.content'
emit: 'PartialContentDelta'

Ensambla statefulmente las llamadas a herramientas parciales. Cuando un proveedor transmite argumentos de llamadas a herramientas en fragmentos, el accumulator los recopila en llamadas completas:

PartialToolCall("get_we") → PartialToolCall("ather") → PartialToolCall("(\"Tokyo\")")

Maneja respuestas multicandidato (cuando n > 1). Expande candidatos en flujos de eventos separados.

La etapa final — convierte frames procesados en tipos StreamingEvent unificados.

La canalización se construye automáticamente a partir del manifiesto del proveedor. No se necesita configuración manual:

// La canalización se construye internamente basada en el manifiesto del protocolo
stream, err := aiClient.Chat().
User("Hola").
ExecuteStream(ctx)
if err != nil {
panic(err)
}
defer stream.Close()
for stream.Next() {
event := stream.Event()
// Procesar evento
}

El tiempo de ejecución lee la sección streaming del manifiesto y conecta el decodificador apropiado, las reglas del selector y el mapeador de eventos.

La canalización también incluye operadores de resiliencia:

  • Retry — Reintenta flujos fallidos según la política de reintento del manifiesto
  • Fallback — Cambia a proveedores/modelos alternativos ante fallos