Module 6 · 30 min read · Building with AI APIs
Streaming Responses
Without streaming, a user stares at a blank screen for several seconds until the entire response is ready. With streaming, the first words appear in under a second and text flows naturally. This module covers how streaming works under the hood and how to implement it correctly in Python backends and browser frontends.
Why streaming matters: latency perception
There are two distinct latency numbers in AI APIs: time-to-first-token (TTFT) and total generation time. Without streaming, users experience the full total latency: a 5-second wait at a blank screen, then the entire response at once. With streaming, content starts appearing at TTFT (typically 200ms to 800ms), and users perceive the application as dramatically more responsive even when the total time is identical.
Human perception of wait time is nonlinear. A 5-second blank wait feels painful; the same 5 seconds watching text appear word by word feels natural and engaging. Streaming is not just a performance optimization — it is a UX transformation that changes how users judge your application's quality.
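To make the two latency numbers concrete, here is a minimal sketch that measures both while consuming a stream of text chunks. The names measure_stream and fake_stream are hypothetical; fake_stream is only a stand-in for a provider stream, and the delay value is arbitrary.

```python
import time
from typing import Callable, Iterable, Iterator


def measure_stream(chunks: Iterable[str],
                   clock: Callable[[], float] = time.perf_counter) -> dict:
    """Consume a stream of text chunks and record TTFT and total time."""
    start = clock()
    ttft = None
    parts = []
    for chunk in chunks:
        if ttft is None and chunk:
            ttft = clock() - start  # time until the first non-empty chunk
        parts.append(chunk)
    total = clock() - start
    return {"text": "".join(parts), "ttft": ttft, "total": total}


def fake_stream(words, delay: float = 0.01) -> Iterator[str]:
    """Hypothetical stand-in for a provider stream: one chunk per delay."""
    for word in words:
        time.sleep(delay)
        yield word


stats = measure_stream(
    fake_stream(["The", " capital", " of", " France", " is", " Paris"])
)
print(f"TTFT {stats['ttft']:.3f}s, total {stats['total']:.3f}s")
```

In a real application the same wrapper could sit around the SDK's chunk iterator, so TTFT and total time are logged separately rather than as a single latency figure.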
Server-Sent Events: the underlying protocol
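Streaming AI responses use Server-Sent Events (SSE), a simple text format for pushing data from server to client over a single long-lived HTTP response. Each event consists of one or more field lines, here a single line beginning with data: followed by a JSON payload, and is terminated by a blank line. OpenAI-compatible APIs end the stream with a special data: [DONE] sentinel; that sentinel is an API convention rather than part of the SSE format itself.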
# Raw SSE stream (what flows over the HTTP connection; the blank line after each event is omitted here):
data: {"choices":[{"delta":{"role":"assistant","content":""},"index":0}]}
data: {"choices":[{"delta":{"content":"The"},"index":0}]}
data: {"choices":[{"delta":{"content":" capital"},"index":0}]}
data: {"choices":[{"delta":{"content":" of"},"index":0}]}
data: {"choices":[{"delta":{"content":" France"},"index":0}]}
data: {"choices":[{"delta":{"content":" is"},"index":0}]}
data: {"choices":[{"delta":{"content":" Paris"},"index":0}]}
data: {"choices":[{"delta":{},"finish_reason":"stop","index":0}]}
data: [DONE]
Each delta.content field contains a small chunk of text — sometimes a full word, sometimes a few characters, sometimes just punctuation. Your code must accumulate these chunks to build the final response.
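The accumulation step can be sketched without any SDK. The parser below is a minimal illustration, not a complete SSE implementation: it handles only data: fields, ignores event:, id:, and retry: fields, and the function names iter_sse_data and accumulate_content are hypothetical.

```python
import json
from typing import Iterable, Iterator


def iter_sse_data(lines: Iterable[str]) -> Iterator[str]:
    """Yield the data payload of each SSE event from an iterable of lines."""
    data_lines = []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line == "":
            # A blank line terminates the current event
            if data_lines:
                yield "\n".join(data_lines)
                data_lines = []
        elif line.startswith("data:"):
            value = line[5:]
            # One leading space after the colon is not part of the value
            data_lines.append(value[1:] if value.startswith(" ") else value)
        # Other fields and comment lines are ignored in this sketch
    if data_lines:
        yield "\n".join(data_lines)  # tolerate a missing final blank line


def accumulate_content(lines: Iterable[str]) -> str:
    """Concatenate delta.content across events until the [DONE] sentinel."""
    text = ""
    for payload in iter_sse_data(lines):
        if payload == "[DONE]":
            break
        chunk = json.loads(payload)
        for choice in chunk.get("choices", []):
            text += choice.get("delta", {}).get("content") or ""
    return text


raw = [
    'data: {"choices":[{"delta":{"role":"assistant","content":""},"index":0}]}', "",
    'data: {"choices":[{"delta":{"content":"The"},"index":0}]}', "",
    'data: {"choices":[{"delta":{"content":" capital"},"index":0}]}', "",
    'data: {"choices":[{"delta":{},"finish_reason":"stop","index":0}]}', "",
    "data: [DONE]", "",
]
print(accumulate_content(raw))  # The capital
```

Note that the first and last events contribute no text: the first carries an empty string and the last carries an empty delta with only a finish_reason.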
Streaming with the OpenAI Python SDK
The SDK abstracts the raw SSE protocol. You iterate over a stream object with a simple for loop. The SDK handles parsing the SSE events and extracting the delta content.
from openai import OpenAI

client = OpenAI()

def stream_response(prompt: str) -> str:
    """
    Stream a response to stdout and return the complete text.
    Simulates a terminal-based chat interface.
    """
    full_response = ""
    # .stream() is the SDK's streaming helper; older SDK releases expose it
    # as client.beta.chat.completions.stream()
    with client.chat.completions.stream(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": "You are a concise technical assistant."},
            {"role": "user", "content": prompt}
        ],
        max_tokens=1000,
        stream_options={"include_usage": True}  # ask for token usage at the end of the stream
    ) as stream:
        for event in stream:
            # The helper yields typed events; content.delta carries each new text chunk
            if event.type == "content.delta":
                print(event.delta, end="", flush=True)  # flush=True is critical for real-time display
                full_response += event.delta
        print()  # newline after streaming completes
        # Access the final snapshot for metadata
        final = stream.get_final_completion()
    print(f"\n[finish_reason: {final.choices[0].finish_reason}]")
    if final.usage is not None:  # usage is absent unless the API reports it
        print(f"[tokens: {final.usage.total_tokens}]")
    return full_response

result = stream_response("Explain how garbage collection works in Python.")
Lower-level streaming with chunk iteration
For more control, particularly when handling tool calls or tracking metadata per chunk, pass stream=True to create() and iterate over the raw chunks instead of using the SDK's higher-level streaming helper.
from openai import OpenAI

client = OpenAI()

def stream_with_chunks(messages: list) -> str:
    """Stream with full chunk access for monitoring and metadata."""
    accumulated = ""
    chunk_count = 0
    stream = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=messages,
        stream=True
    )
    for chunk in stream:
        chunk_count += 1
        choice = chunk.choices[0] if chunk.choices else None
        if choice is None:
            continue
        # Extract delta content (may be None for the first chunk)
        delta_content = choice.delta.content
        if delta_content:
            accumulated += delta_content
            print(delta_content, end="", flush=True)
        # Check for completion
        if choice.finish_reason is not None:
            print(f"\n[done: {choice.finish_reason} after {chunk_count} chunks]")
    return accumulated
Streaming with the Anthropic SDK
Anthropic's SDK uses a context manager pattern for streaming. The event model is more structured — you receive typed events for different phases of the response rather than a single uniform chunk type.
import anthropic

client = anthropic.Anthropic()

def stream_anthropic(prompt: str) -> str:
    """Stream a Claude response with Anthropic's SDK."""
    full_text = ""
    with client.messages.stream(
        model="claude-opus-4-5",
        max_tokens=1024,
        messages=[{"role": "user", "content": prompt}]
    ) as stream:
        # text_stream yields string chunks, the cleanest interface
        for text_chunk in stream.text_stream:
            print(text_chunk, end="", flush=True)
            full_text += text_chunk
        print()
        # get_final_message() returns the complete message with usage stats
        final_message = stream.get_final_message()
    usage = final_message.usage
    print(f"\nInput tokens: {usage.input_tokens} | Output tokens: {usage.output_tokens}")
    print(f"Stop reason: {final_message.stop_reason}")
    return full_text

response = stream_anthropic("What are the main differences between async and sync Python?")
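To illustrate what "typed events for different phases" means, the sketch below dispatches on event type. It uses plain dicts as stand-ins for the SDK's event objects, and the event and field names (content_block_delta, text_delta, message_delta, message_stop) follow Anthropic's streaming format as I understand it; treat them as assumptions and check the current API reference. The function name consume_events is hypothetical.

```python
def consume_events(events) -> dict:
    """Accumulate text and metadata from a sequence of typed stream events."""
    text_parts = []
    stop_reason = None
    output_tokens = None
    for event in events:
        kind = event["type"]
        if kind == "content_block_delta" and event["delta"]["type"] == "text_delta":
            # Incremental text for the current content block
            text_parts.append(event["delta"]["text"])
        elif kind == "message_delta":
            # Message-level updates arrive near the end of the stream
            stop_reason = event["delta"].get("stop_reason")
            output_tokens = event.get("usage", {}).get("output_tokens")
        elif kind == "message_stop":
            break
        # message_start, content_block_start/stop and ping carry no text here
    return {
        "text": "".join(text_parts),
        "stop_reason": stop_reason,
        "output_tokens": output_tokens,
    }


# Simulated event sequence (dicts standing in for SDK objects)
simulated = [
    {"type": "message_start", "message": {"role": "assistant"}},
    {"type": "content_block_start", "index": 0,
     "content_block": {"type": "text", "text": ""}},
    {"type": "content_block_delta", "index": 0,
     "delta": {"type": "text_delta", "text": "Hello"}},
    {"type": "content_block_delta", "index": 0,
     "delta": {"type": "text_delta", "text": " world"}},
    {"type": "content_block_stop", "index": 0},
    {"type": "message_delta", "delta": {"stop_reason": "end_turn"},
     "usage": {"output_tokens": 2}},
    {"type": "message_stop"},
]
summary = consume_events(simulated)
print(summary["text"], summary["stop_reason"])  # Hello world end_turn
```

The text_stream helper shown earlier hides this dispatch; iterating the events directly is mainly useful when you need the non-text phases.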
Streaming tool calls: accumulating JSON arguments
When the model wants to call a function during streaming, the tool call arguments arrive as JSON fragments across multiple chunks. You must accumulate the argument deltas and parse them only after the stream completes. This is one of the trickiest parts of streaming.
import json

from openai import OpenAI

client = OpenAI()

def stream_with_tool_calls(messages: list, tools: list) -> dict:
    """
    Stream a response that may include tool calls.
    Accumulates JSON argument fragments correctly.
    """
    accumulated_content = ""
    tool_calls_accumulator = {}  # index -> {id, name, arguments_str}
    stream = client.chat.completions.create(
        model="gpt-4o",
        messages=messages,
        tools=tools,
        stream=True
    )
    finish_reason = None
    for chunk in stream:
        if not chunk.choices:
            continue
        choice = chunk.choices[0]
        delta = choice.delta
        finish_reason = choice.finish_reason or finish_reason
        # Accumulate regular text content
        if delta.content:
            accumulated_content += delta.content
            print(delta.content, end="", flush=True)
        # Accumulate tool call fragments
        if delta.tool_calls:
            for tc_delta in delta.tool_calls:
                idx = tc_delta.index
                if idx not in tool_calls_accumulator:
                    tool_calls_accumulator[idx] = {
                        "id": "",
                        "name": "",
                        "arguments_str": ""
                    }
                tc = tool_calls_accumulator[idx]
                # These fields arrive on the first chunk for this tool call
                if tc_delta.id:
                    tc["id"] += tc_delta.id
                if tc_delta.function and tc_delta.function.name:
                    tc["name"] += tc_delta.function.name
                # Arguments arrive as JSON fragments: accumulate as string
                if tc_delta.function and tc_delta.function.arguments:
                    tc["arguments_str"] += tc_delta.function.arguments
    # Parse accumulated tool calls
    parsed_tool_calls = []
    for idx, tc in sorted(tool_calls_accumulator.items()):
        try:
            arguments = json.loads(tc["arguments_str"])
        except json.JSONDecodeError:
            arguments = {}  # malformed: handle gracefully
        parsed_tool_calls.append({
            "id": tc["id"],
            "name": tc["name"],
            "arguments": arguments
        })
    return {
        "content": accumulated_content,
        "tool_calls": parsed_tool_calls,
        "finish_reason": finish_reason
    }
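Why parsing must wait until the stream completes can be shown in a few lines. The fragments below are made up for illustration (they are not real API output), and try_parse is a hypothetical helper.

```python
import json

# Hypothetical argument fragments, split at arbitrary points as a stream might
fragments = ['{"loc', 'ation": "Par', 'is", "unit": "c', 'elsius"}']


def try_parse(candidate: str):
    """Return the parsed object, or None if the JSON is still incomplete."""
    try:
        return json.loads(candidate)
    except json.JSONDecodeError:
        return None


buffer = ""
parse_results = []
for fragment in fragments:
    buffer += fragment
    parse_results.append(try_parse(buffer) is not None)

print(parse_results)       # [False, False, False, True]
print(json.loads(buffer))  # {'location': 'Paris', 'unit': 'celsius'}
```

Only the final, fully concatenated string is valid JSON, which is why the accumulator stores arguments_str as a plain string until the loop ends.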
Error handling in streams
Streams can fail mid-generation — network interruptions, timeouts, or provider errors. Always wrap streaming loops in try/except and decide whether to retry or surface the partial response.
import time

from openai import OpenAI, APIConnectionError, APIStatusError

client = OpenAI()

def stream_with_error_handling(messages: list, max_retries: int = 2) -> str:
    """Stream with robust error handling and partial response recovery."""
    accumulated = ""
    for attempt in range(max_retries + 1):
        # A retry restarts generation, so discard the previous partial text
        # (a real UI should also clear what it has already displayed)
        accumulated = ""
        try:
            with client.chat.completions.stream(
                model="gpt-4o-mini",
                messages=messages,
                max_tokens=500,
                timeout=30.0  # forwarded to the SDK's HTTP client for this request
            ) as stream:
                for event in stream:
                    # The helper yields typed events; content.delta carries new text
                    if event.type == "content.delta":
                        accumulated += event.delta
                        print(event.delta, end="", flush=True)
            print()
            return accumulated
        except APIConnectionError:
            print(f"\n[Connection lost after {len(accumulated)} chars]")
            if attempt == max_retries:
                # Return whatever we got: a partial response may still be useful
                return accumulated
        except APIStatusError as e:
            if e.status_code != 429 or attempt == max_retries:
                raise  # retry only rate limits, and only while attempts remain
        print(f"Retrying in {2**attempt}s...")
        time.sleep(2**attempt)
    return accumulated
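The retry-or-surface-partial decision can be exercised without a network by separating it from the SDK. The following is a sketch under stated assumptions: TransientStreamError, stream_with_retry, and flaky are hypothetical names, and the sleep function is injectable only so the example runs instantly.

```python
import time
from typing import Callable, Iterator, List, Tuple


class TransientStreamError(Exception):
    """Hypothetical stand-in for a retryable failure such as a dropped connection."""


def stream_with_retry(
    open_stream: Callable[[], Iterator[str]],
    max_retries: int = 2,
    sleep: Callable[[float], None] = time.sleep,
) -> Tuple[str, bool]:
    """Return (text, complete). Every retry restarts the stream from scratch."""
    accumulated = ""
    for attempt in range(max_retries + 1):
        accumulated = ""  # partial text from a failed attempt is discarded
        try:
            for text in open_stream():
                accumulated += text
            return accumulated, True
        except TransientStreamError:
            if attempt == max_retries:
                break  # out of retries: surface the partial text
            sleep(2 ** attempt)  # exponential backoff: 1, 2, 4, ...
    return accumulated, False


calls = {"n": 0}


def flaky() -> Iterator[str]:
    """Fails partway through on the first call, succeeds on the second."""
    calls["n"] += 1
    yield "Hello"
    if calls["n"] == 1:
        raise TransientStreamError("connection dropped")
    yield ", world"


delays: List[float] = []
text, complete = stream_with_retry(flaky, sleep=delays.append)
print(text, complete, delays)  # Hello, world True [1]
```

Returning a completeness flag alongside the text lets the caller decide whether to show a partial answer with a warning or hide it entirely.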
Building a real-time streaming UI in JavaScript
In browser applications, you stream from your backend to the browser: your backend streams from the AI provider, and the browser reads your backend's response as it arrives. The browser's native EventSource API only issues GET requests, so when the request needs a POST body, as in the example here, read a ReadableStream from a fetch response instead.
// Frontend JavaScript: streaming from a backend endpoint
async function streamChat(userMessage) {
  const outputEl = document.getElementById('output');
  outputEl.textContent = '';
  // POST to your backend, which streams from the AI provider
  const response = await fetch('/api/chat/stream', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ message: userMessage })
  });
  if (!response.ok) {
    outputEl.textContent = 'Error: ' + response.statusText;
    return;
  }
  // Read the streaming response body
  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  let buffer = ''; // holds an incomplete trailing line between reads
  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    // A network chunk can end mid-line, so append to the buffer and
    // parse only the complete lines; keep the unfinished tail for later
    buffer += decoder.decode(value, { stream: true });
    const lines = buffer.split('\n');
    buffer = lines.pop();
    for (const line of lines) {
      if (!line.startsWith('data: ')) continue;
      const data = line.slice(6).trim();
      if (data === '[DONE]') return; // end of stream: stop reading entirely
      try {
        const parsed = JSON.parse(data);
        const content = parsed.choices?.[0]?.delta?.content;
        if (content) {
          outputEl.textContent += content;
          // Auto-scroll as content arrives
          outputEl.scrollTop = outputEl.scrollHeight;
        }
      } catch (e) {
        // Skip malformed events
      }
    }
  }
}
// Python FastAPI backend that proxies the stream:
/*
import json

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from openai import OpenAI

app = FastAPI()
client = OpenAI()

@app.post("/api/chat/stream")
def chat_stream(request: dict):
    # A plain (sync) generator: StreamingResponse iterates it in a worker
    # thread, so the blocking OpenAI client does not stall the event loop
    def generate():
        stream = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": request["message"]}],
            stream=True
        )
        for chunk in stream:
            if not chunk.choices:
                continue  # some chunks carry no choices
            content = chunk.choices[0].delta.content or ""
            if content:
                data = json.dumps({"choices": [{"delta": {"content": content}}]})
                yield f"data: {data}\n\n"
        yield "data: [DONE]\n\n"
    return StreamingResponse(generate(), media_type="text/event-stream")
*/
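The SSE framing the backend performs can be isolated into a small generator that is easy to test on its own. This is a sketch; to_sse is a hypothetical name, and the payload shape simply mirrors the one the frontend above expects.

```python
import json
from typing import Iterable, Iterator


def to_sse(chunks: Iterable[str]) -> Iterator[str]:
    """Frame text chunks as SSE events, ending with the [DONE] sentinel."""
    for content in chunks:
        if not content:
            continue  # skip empty deltas rather than sending empty events
        # json.dumps escapes newlines, so each payload stays on one data: line
        payload = json.dumps({"choices": [{"delta": {"content": content}}]})
        yield f"data: {payload}\n\n"
    yield "data: [DONE]\n\n"


events = list(to_sse(["Hello", "", " wor\nld"]))
for event in events:
    print(repr(event))
```

Encoding the payload as JSON matters here: a raw newline inside the text would otherwise split the event across lines and break a line-based client parser.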
When NOT to stream
Streaming is not always the right choice. There are cases where the non-streaming API is strictly better:
Batch processing pipelines
When you are processing 1000 documents overnight and no human is watching, streaming adds complexity with no benefit. Use the standard API, parallelize requests, and focus on throughput. For bulk workloads, use the Batch API at 50% cost.
Structured output parsing
If you are using JSON mode or structured outputs, you cannot parse the result until it is complete. Streaming has no benefit here — you receive fragments of JSON that you cannot use until the full object arrives. Use the non-streaming API.
Short responses
For responses under 50 tokens — one-word answers, classifications, simple extractions — the overhead of setting up and managing a stream outweighs any UX benefit. The total latency is often lower without streaming.
Downstream processing required before display
If your application needs to post-process, validate, or transform the complete response before showing it to the user, streaming provides no benefit and adds complexity. Render-then-display is simpler than stream-and-transform.
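For the batch-processing case above, parallelizing non-streaming calls might look like the sketch below. The summarize function is a hypothetical placeholder for one blocking API call, and max_workers=8 is an arbitrary value you would tune against your provider's rate limits.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import List


def summarize(document: str) -> str:
    """Hypothetical placeholder for one non-streaming API call."""
    return document[:20].upper()


def process_batch(documents: List[str], max_workers: int = 8) -> List[str]:
    """Run one call per document concurrently and keep results in input order."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # map() returns results in input order even though calls overlap
        return list(pool.map(summarize, documents))


results = process_batch([f"document {i} body text" for i in range(5)])
print(results[0])  # DOCUMENT 0 BODY TEXT
```

Threads suit this workload because each call spends most of its time waiting on the network, and no per-chunk handling is needed when nobody is watching the output.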
Streaming is table stakes for chat UIs
Any chat interface that waits for a full response before displaying it will feel broken to modern users. Streaming is the expected behavior. Implement it from the start rather than retrofitting — the streaming code path is different enough from synchronous that a rewrite is often faster than adapting existing code.