SSE 流式输出
b1babo
2026年4月18日
2026年4月18日
SSE 流式输出
目录
流式输出原理
基本原理
- 逐token生成 - 大模型不是一次性生成全部文本,而是逐个token(词元)生成
- 即时传输 - 每生成一个或少量token,就立即发送给客户端
- 服务端推 (SSE) - 通常使用Server-Sent Events或WebSocket保持长连接
流程示意
用户请求
↓
模型开始推理 → 生成 "今"
↓ (立即发送)
继续推理 → 生成 "天"
↓ (立即发送)
继续推理 → 生成 "天气"
↓ (立即发送)
...关键技术点
- SSE格式:
data: {"token": "今"}\n\n - 保持连接: HTTP持久连接,流式传输
- 客户端处理: 接收一段渲染一段,实现打字机效果
优势
- 降低首字延迟(TTFT)
- 提升用户体验,不用等待全部生成完成
- 适合长文本生成场景
SSE 详解
什么是SSE
SSE是一种基于HTTP的单向服务器推送技术,允许服务器主动向客户端推送事件流。
核心特点
| 特性 | 说明 |
|---|---|
| 单向通信 | 仅服务器→客户端 |
| 基于HTTP | 无需额外协议,复用HTTP |
| 自动重连 | 断线自动重连机制 |
| 文本格式 | 纯文本,UTF-8编码 |
| EventSource API | 浏览器原生支持 |
SSE消息格式
data: {"content": "Hello", "index": 0}
data: {"content": " World", "index": 1}
data: {"content": "!", "index": 2}
data: [DONE]
格式规则:
- 每个字段以
字段名:开头 data:字段可多次出现表示多行数据- 两个换行符
\n\n分隔不同事件 event:可指定事件名(默认message)id:用于断线重连恢复
完整流程图
客户端 服务器
│ │
│ ────────── 1. HTTP GET 请求 ──────────> │
│ GET /v1/chat/completions │
│ Headers: │
│ Accept: text/event-stream │
│ │
│ │
│ <────────── 2. 建立持久连接 ───────────── │
│ Status: 200 OK │
│ Headers: │
│ Content-Type: text/event-stream │
│ Cache-Control: no-cache │
│ Connection: keep-alive │
│ │
│ │
│ <────────── 3. 推送数据块 ────────────── │
│ data: {"token":"今"}\n\n │
│ │
│ ┌─────────────────────────┐ │
│ │ onmessage 触发 │ │
│ │ 渲染: "今" │ │
│ └─────────────────────────┘ │
│ │
│ <────────── 4. 继续推送 ─────────────── │
│ data: {"token":"天"}\n\n │
│ │
│ ┌─────────────────────────┐ │
│ │ onmessage 触发 │ │
│ │ 渲染: "今天" │ │
│ └─────────────────────────┘ │
│ │
│ <────────── 5. 持续推送 ─────────────── │
│ data: {"token":"气","index":2}\n\n │
│ data: {"token":"不","index":3}\n\n │
│ data: {"token":"错","index":4}\n\n │
│ │
│ ┌─────────────────────────┐ │
│ │ 累积渲染 │ │
│ │ "今天天气不错" │ │
│ └─────────────────────────┘ │
│ │
│ <────────── 6. 结束标记 ─────────────── │
│ data: [DONE]\n\n │
│ │
│ ┌─────────────────────────┐ │
│ │ 检测到 [DONE] │ │
│ │ 关闭连接 │ │
│ └─────────────────────────┘ │客户端代码示例
const eventSource = new EventSource('/api/stream');
eventSource.onmessage = (event) => {
if (event.data === '[DONE]') {
eventSource.close();
return;
}
const data = JSON.parse(event.data);
appendToUI(data.content);
};
eventSource.onerror = (err) => {
console.error('Connection error', err);
eventSource.close();
};Node.js 服务端示例
app.get('/api/stream', (req, res) => {
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
const stream = async function* () {
yield '今'; yield '天'; yield '天'; yield '气'; yield '不'; yield '错';
};
(async () => {
for await (const token of stream()) {
res.write(`data: ${JSON.stringify({token})}\n\n`);
}
res.write('data: [DONE]\n\n');
res.end();
})();
});SSE vs WebSocket
| SSE | WebSocket | |
|---|---|---|
| 方向 | 单向(服务器→客户端) | 双向 |
| 协议 | HTTP | HTTP + WebSocket协议 |
| 重连 | 自动重连 | 需手动实现 |
| 二进制 | 仅文本 | 支持二进制 |
| 适用场景 | 推送通知、流式输出 | 聊天、游戏、实时协作 |
对于大模型流式输出,SSE是最佳选择,因为只需要单向传输且实现简单。
FastAPI 实现流式输出
基础示例:纯文本流
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import asyncio
app = FastAPI()
async def generate_text():
"""生成器函数:逐个产生token"""
tokens = ["今", "天", "天", "气", "真", "好", "啊"]
for token in tokens:
yield token
await asyncio.sleep(0.1) # 模拟生成延迟
@app.get("/stream")
async def stream_text():
return StreamingResponse(
generate_text(),
media_type="text/plain",
)SSE 格式流式输出
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import asyncio
import json
app = FastAPI()
async def generate_sse():
"""生成SSE格式的数据流"""
tokens = ["今", "天", "天", "气", "真", "好"]
for index, token in enumerate(tokens):
data = {
"id": index,
"content": token,
"finished": False
}
# SSE格式: data: {json}\n\n
yield f"data: {json.dumps(data)}\n\n"
await asyncio.sleep(0.1)
# 发送结束标记
yield "data: [DONE]\n\n"
@app.get("/chat/stream")
async def chat_stream():
return StreamingResponse(
generate_sse(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
}
)集成大模型API(如OpenAI)
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
import httpx
import json
app = FastAPI()
class ChatRequest(BaseModel):
message: str
async def stream_from_llm(message: str):
"""转发大模型的流式响应"""
async with httpx.AsyncClient() as client:
async with client.stream(
"POST",
"https://api.openai.com/v1/chat/completions",
headers={
"Authorization": "Bearer YOUR_API_KEY",
"Content-Type": "application/json",
},
json={
"model": "gpt-4",
"messages": [{"role": "user", "content": message}],
"stream": True,
},
timeout=60.0,
) as response:
async for chunk in response.aiter_text():
if chunk.strip():
yield chunk
@app.post("/chat")
async def chat(request: ChatRequest):
return StreamingResponse(
stream_from_llm(request.message),
media_type="text/event-stream",
)完整的生产级示例
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
import asyncio
from typing import AsyncGenerator
app = FastAPI(title="流式API服务")
class StreamRequest(BaseModel):
prompt: str
max_tokens: int = 100
async def mock_llm_generate(prompt: str, max_tokens: int) -> AsyncGenerator[str, None]:
"""模拟LLM生成过程"""
# 这里可以替换为真实的大模型调用
response = f"关于'{prompt}'的回答,这是一个模拟的流式输出。"
for i, char in enumerate(response):
if i >= max_tokens:
break
yield char
await asyncio.sleep(0.05) # 模拟token生成延迟
async def format_sse(chunk: str, index: int, is_done: bool = False) -> str:
"""格式化为SSE格式"""
if is_done:
return "data: [DONE]\n\n"
payload = {
"id": index,
"content": chunk,
"model": "mock-model",
"created": asyncio.get_event_loop().time(),
}
return f"data: {json.dumps(payload)}\n\n"
@app.post("/v1/chat/completions")
async def stream_completion(request: StreamRequest):
"""OpenAI兼容的流式接口"""
async def generate():
index = 0
async for chunk in mock_llm_generate(request.prompt, request.max_tokens):
sse_data = await format_sse(chunk, index)
yield sse_data
index += 1
# 发送结束信号
yield await format_sse("", 0, is_done=True)
return StreamingResponse(
generate(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no", # 禁用Nginx缓冲
}
)
@app.get("/")
async def root():
return {"message": "流式API服务运行中", "docs": "/docs"}
@app.get("/health")
async def health():
return {"status": "healthy"}
if __name__ == "__main__":
import uvicorn
uvicorn.run(
app,
host="0.0.0.0",
port=8000,
log_level="info"
)客户端测试代码
import requests
import json
def test_stream():
url = "http://localhost:8000/v1/chat/completions"
payload = {"prompt": "你好", "max_tokens": 50}
with requests.post(url, json=payload, stream=True) as response:
print(f"状态码: {response.status_code}")
for line in response.iter_lines():
if line:
line = line.decode('utf-8')
if line.startswith('data: '):
data = line[6:] # 去掉 "data: " 前缀
if data == '[DONE]':
print("\n[流结束]")
break
try:
parsed = json.loads(data)
print(parsed.get('content', ''), end='', flush=True)
except json.JSONDecodeError:
pass
if __name__ == "__main__":
test_stream()运行与测试
# 安装依赖
pip install fastapi uvicorn httpx
# 启动服务
python main.py
# 访问文档
# http://localhost:8000/docs
# 使用curl测试
curl -N -X POST http://localhost:8000/v1/chat/completions \
-H "Content-Type: application/json" \
-d '{"prompt": "你好,世界"}'关键点总结
| 要点 | 说明 |
|---|---|
| 生成器函数 | 使用 async def + yield 产生数据 |
| StreamingResponse | FastAPI的流式响应类 |
| media_type | SSE使用 text/event-stream |
| headers | 设置 Cache-Control: no-cache 禁用缓存 |
| 异步操作 | 确保生成器内部使用 await |
数据格式讨论
流式输出的数据格式有多种方式,取决于具体实现。
1. SSE + JSON(最常见)
data: {"token": "今", "index": 0}
data: {"token": "天", "index": 1}
data: {"token": "气", "index": 2}
data: [DONE]这是OpenAI、Anthropic等主流API使用的格式。
2. 纯文本流
今天天气很好@app.get("/stream")
async def stream_text():
async def generate():
yield "今"
yield "天"
yield "天"
yield "气"
return StreamingResponse(generate(), media_type="text/plain")3. 行分隔JSON(NDJSON)
{"token":"今","index":0}
{"token":"天","index":1}
{"token":"气","index":2}
[DONE]@app.get("/stream")
async def stream_ndjson():
async def generate():
yield json.dumps({"token": "今", "index": 0}) + "\n"
yield json.dumps({"token": "天", "index": 1}) + "\n"
return StreamingResponse(generate(), media_type="application/x-ndjson")4. 原始事件流(带类型)
event: message
data: 今天
event: message
data: 天气
event: done
data: finished@app.get("/stream")
async def stream_events():
async def generate():
yield "event: message\ndata: 今天\n\n"
yield "event: message\ndata: 天气\n\n"
yield "event: done\ndata: finished\n\n"
return StreamingResponse(generate(), media_type="text/event-stream")格式对比
| 格式 | 示例 | 适用场景 |
|---|---|---|
| SSE+JSON | data: {...}\n\n | 需要结构化数据,主流API |
| 纯文本 | 直接输出字符 | 简单文本推送 |
| NDJSON | {...}\n | 日志流、大数据 |
| 原始SSE | event: xxx\ndata: xxx\n\n | 需要事件类型区分 |
实际上大模型API的格式
OpenAI 格式
data: {"id":"chatcmpl-123","object":"chat.completion.chunk","created":1699000000,"model":"gpt-4","choices":[{"index":0,"delta":{"content":"今"},"finish_reason":null}]}
data: {"id":"chatcmpl-123","object":"chat.completion.chunk","created":1699000000,"model":"gpt-4","choices":[{"index":0,"delta":{"content":"天"},"finish_reason":null}]}
data: [DONE]简化的内部格式
有时内部服务会用更简单的格式:
{"t":"今"}
{"t":"天"}
{"t":"气"}
[DONE]选择建议
# 简单场景:纯文本即可
yield "今天天气"
# 需要元数据:用JSON
yield json.dumps({"content": "今天", "confidence": 0.9})
# 需要事件类型:用完整SSE
yield "event: token\ndata: 今天\n\n"
# 完全兼容OpenAI:复制其格式
yield f"data: {json.dumps(openai_chunk)}\n\n"总结:JSON格式在SSE中最常见,因为它能携带更多元数据(索引、结束标志、错误信息等),但纯文本流在某些简单场景下也完全够用。
协议支持
SSE 不需要特殊协议,完全基于 标准 HTTP/1.1 即可实现。
SSE 的本质
SSE 只是 HTTP 响应的一种特殊格式约定,没有引入新协议。
普通HTTP请求 SSE请求
│ │
├─ GET /api/data ├─ GET /api/stream
│ │
├─ Accept: application/json ├─ Accept: text/event-stream ← 关键
│ │
▼ ▼
一次性返回全部内容 保持连接,持续推送唯一的"特殊"要求
1. 响应头
2. 响应体格式
data: 第一条消息\n\n
data: 第二条消息\n\n
data: {"content":"结构化数据"}\n\n就这么简单! 没有握手、没有协议升级。
与 WebSocket 对比
┌────────────────────────────────────────────────────────────┐
│ WebSocket 握手流程 │
├────────────────────────────────────────────────────────────┤
│ 客户端 │
│ GET /ws HTTP/1.1 │
│ Upgrade: websocket ← 请求协议升级 │
│ Connection: Upgrade │
│ Sec-WebSocket-Key: xxx... │
│ ↓ │
│ 服务器 │
│ HTTP/1.1 101 Switching Protocols ← 协议切换 │
│ Upgrade: websocket │
│ Sec-WebSocket-Accept: yyy... │
│ ↓ │
│ ──────────── 进入 WebSocket 二进制协议 ───────────── │
│ (完全不同于 HTTP 的帧格式) │
└────────────────────────────────────────────────────────────┘
┌────────────────────────────────────────────────────────────┐
│ SSE 流程 │
├────────────────────────────────────────────────────────────┤
│ 客户端 │
│ GET /stream HTTP/1.1 │
│ Accept: text/event-stream │
│ ↓ │
│ 服务器 │
│ HTTP/1.1 200 OK ← 普通HTTP响应 │
│ Content-Type: text/event-stream │
│ ↓ │
│ ──────────── 继续 HTTP,发送文本数据 ────────────── │
│ (仍使用 HTTP,数据格式有约定) │
└────────────────────────────────────────────────────────────┘用 telnet 验证
# 用最原始的 telnet 手动发送 HTTP 请求
$ telnet localhost 8000
Trying 127.0.0.1...
Connected to localhost.
# 手动输入 HTTP 请求
GET /stream HTTP/1.1
Host: localhost
Accept: text/event-stream
# 按回车后,服务器立即开始推送
HTTP/1.1 200 OK
content-type: text/event-stream
cache-control: no-cache
data: hello
data: world
data: [DONE]完全就是普通 HTTP!
中间件兼容性
| 中间件 | 对SSE的支持 |
|---|---|
| Nginx | ✅ 默认支持(需配置 proxy_buffering off;) |
| Apache | ✅ 默认支持 |
| Caddy | ✅ 默认支持 |
| Cloudflare | ✅ 支持 |
| AWS ALB | ✅ 支持 |
Nginx 配置示例
location /stream {
proxy_pass http://backend;
proxy_set_header Connection '';
proxy_http_version 1.1;
proxy_buffering off; # 关键:禁用缓冲
proxy_cache off; # 关键:禁用缓存
chunked_transfer_encoding on;
}浏览器原生支持
// 不需要任何库,浏览器原生支持
const eventSource = new EventSource('/stream');
eventSource.onmessage = (event) => {
console.log(event.data);
};
// 自动重连机制内置
// 断线后会自动尝试重连总结
| 特性 | SSE | WebSocket |
|---|---|---|
| 协议 | 标准 HTTP | HTTP + WebSocket协议 |
| 协议升级 | ❌ 不需要 | ✅ 需要(101状态码) |
| 连接方式 | 持久HTTP连接 | 独立的TCP连接 |
| 数据格式 | 文本(约定格式) | 二进制帧 |
| 浏览器API | EventSource | WebSocket |
| 实现复杂度 | 极低 | 中等 |
结论:SSE 就是 HTTP,只是约定了响应的格式和保持连接的方式。任何能处理 HTTP 的服务器、代理、客户端都能支持 SSE。
格式自定义
SSE 只是一个约定,不是强制的协议标准。只要客户端和服务端协商好,格式可以任意自定义。
SSE 标准格式(建议遵循)
event: message ← 事件名(可选)
id: 123 ← 事件ID(可选)
retry: 3000 ← 重连延迟(可选)
data: hello ← 数据内容
data: world ← 多行data表示一个消息
← 空行分隔不同事件实际上可以这样自定义
示例1:极简格式
hello
world
[DONE]示例2:自定义前缀
msg:今天
msg:天气
msg:很好
END示例3:带时间戳
[2025-01-19 10:30:00] 今天
[2025-01-19 10:30:01] 天气
[2025-01-19 10:30:02] 很好
EOF示例4:二进制标记
SIZE:5|DATA:今天
SIZE:5|DATA:天气
SIZE:4|DATA:很好
DONE关键:解析逻辑配套
# 服务端:按你的约定生成
async def custom_stream():
yield "MSG:今天\n"
yield "MSG:天气\n"
yield "MSG:很好\n"
yield "END\n"
# 客户端:按同样的约定解析
async for line in response:
if line.startswith("MSG:"):
content = line[4:]
print(content)
elif line == "END":
break为什么大多用 data: 前缀
| 原因 | 说明 |
|---|---|
| 浏览器原生 | EventSource API 自动解析 data: 格式 |
| 生态兼容 | 各种库、工具默认支持标准格式 |
| 可扩展性 | event:、id:、retry: 提供额外能力 |
浏览器 EventSource 的解析
// 标准格式会被自动解析
const es = new EventSource('/stream');
// 自动处理 data:
es.onmessage = (e) => {
console.log(e.data); // 自动提取 data: 后的内容
};
// 自动处理 event:
es.addEventListener('custom', (e) => {
// event: custom 触发
});
// 自动处理 id: 和断线重连
es.lastEventId; // 最后的 id: 值自定义格式的权衡
┌─────────────────────────────────────────────────────────────┐
│ 格式选择决策 │
├────────────────────────────────────────────────────────────┤
│ │
│ 浏览器客户端? │
│ │ │
│ ├─ 是 → 用标准 data: 格式(EventSource自动解析) │
│ │ │
│ └─ 否(自写客户端) → 可以自定义 │
│ 但建议:简单 = 好 │
│ │
└─────────────────────────────────────────────────────────────┘实践建议
# 推荐:简单清晰的自定义格式
async def simple_stream():
"""每行一个JSON,最后[DONE]"""
for token in ["今", "天", "天", "气"]:
yield json.dumps({"token": token}) + "\n"
yield "[DONE]\n"
# 客户端解析
async for line in response:
line = line.strip()
if line == "[DONE]":
break
data = json.loads(line)
print(data["token"])总结
SSE 本质是长连接 + 流式文本,格式完全由你定义。标准格式的好处是生态兼容,但你可以根据需要自由设计。
结语
SSE 是一种简单、高效的流式输出方案,具有以下特点:
- 无需特殊协议 - 基于标准 HTTP
- 实现简单 - 服务端生成器 + 客户端逐行解析
- 格式灵活 - 可以自定义数据格式
- 浏览器原生支持 - EventSource API
- 自动重连 - 内置断线重连机制
对于大模型流式输出、服务器推送通知等单向数据流场景,SSE 是理想的选择。
评论