b1babo
timeline
keywords
articles
targets
projects
about
b1babo

2026 All Rights Reserved.

  • 关于本站
  • 所有文章
  • 站点地图
  • RSS Feed

Powered by Next.js & Trilium

  • Claude Code

SSE 流式输出

b1babo
2026年4月18日
2026年4月18日

SSE 流式输出

目录

  • 流式输出原理
  • SSE 详解
  • FastAPI 实现流式输出
  • 数据格式讨论
  • 协议支持
  • 格式自定义

流式输出原理

基本原理

  1. 逐token生成 - 大模型不是一次性生成全部文本,而是逐个token(词元)生成
  2. 即时传输 - 每生成一个或少量token,就立即发送给客户端
  3. 服务端推 (SSE) - 通常使用Server-Sent Events或WebSocket保持长连接

流程示意

用户请求
    ↓
模型开始推理 → 生成 "今"
    ↓ (立即发送)
继续推理 → 生成 "天"
    ↓ (立即发送)
继续推理 → 生成 "天气"
    ↓ (立即发送)
...

关键技术点

  • SSE格式: data: {"token": "今"}\n\n
  • 保持连接: HTTP持久连接,流式传输
  • 客户端处理: 接收一段渲染一段,实现打字机效果

优势

  • 降低首字延迟(TTFT)
  • 提升用户体验,不用等待全部生成完成
  • 适合长文本生成场景

SSE 详解

什么是SSE

SSE是一种基于HTTP的单向服务器推送技术,允许服务器主动向客户端推送事件流。

核心特点

特性说明
单向通信仅服务器→客户端
基于HTTP无需额外协议,复用HTTP
自动重连断线自动重连机制
文本格式纯文本,UTF-8编码
EventSource API浏览器原生支持

SSE消息格式

data: {"content": "Hello", "index": 0}
data: {"content": " World", "index": 1}
data: {"content": "!", "index": 2}
data: [DONE]

格式规则:

  • 每个字段以 字段名: 开头
  • data: 字段可多次出现表示多行数据
  • 两个换行符 \n\n 分隔不同事件
  • event: 可指定事件名(默认 message)
  • id: 用于断线重连恢复

完整流程图

客户端                                    服务器
    │                                         │
    │ ────────── 1. HTTP GET 请求 ──────────> │
    │    GET /v1/chat/completions              │
    │    Headers:                              │
    │      Accept: text/event-stream           │
    │                                         │
    │                                         │
    │ <────────── 2. 建立持久连接 ───────────── │
    │    Status: 200 OK                        │
    │    Headers:                              │
    │      Content-Type: text/event-stream     │
    │      Cache-Control: no-cache             │
    │      Connection: keep-alive              │
    │                                         │
    │                                         │
    │ <────────── 3. 推送数据块 ────────────── │
    │    data: {"token":"今"}\n\n              │
    │                                         │
    │    ┌─────────────────────────┐          │
    │    │ onmessage 触发          │          │
    │    │ 渲染: "今"              │          │
    │    └─────────────────────────┘          │
    │                                         │
    │ <────────── 4. 继续推送 ─────────────── │
    │    data: {"token":"天"}\n\n              │
    │                                         │
    │    ┌─────────────────────────┐          │
    │    │ onmessage 触发          │          │
    │    │ 渲染: "今天"            │          │
    │    └─────────────────────────┘          │
    │                                         │
    │ <────────── 5. 持续推送 ─────────────── │
    │    data: {"token":"气","index":2}\n\n   │
    │    data: {"token":"不","index":3}\n\n   │
    │    data: {"token":"错","index":4}\n\n   │
    │                                         │
    │    ┌─────────────────────────┐          │
    │    │ 累积渲染                │          │
    │    │ "今天天气不错"          │          │
    │    └─────────────────────────┘          │
    │                                         │
    │ <────────── 6. 结束标记 ─────────────── │
    │    data: [DONE]\n\n                     │
    │                                         │
    │    ┌─────────────────────────┐          │
    │    │ 检测到 [DONE]           │          │
    │    │ 关闭连接                │          │
    │    └─────────────────────────┘          │

客户端代码示例

const eventSource = new EventSource('/api/stream');

eventSource.onmessage = (event) => {
  if (event.data === '[DONE]') {
    eventSource.close();
    return;
  }

  const data = JSON.parse(event.data);
  appendToUI(data.content);
};

eventSource.onerror = (err) => {
  console.error('Connection error', err);
  eventSource.close();
};

Node.js 服务端示例

app.get('/api/stream', (req, res) => {
  res.setHeader('Content-Type', 'text/event-stream');
  res.setHeader('Cache-Control', 'no-cache');
  res.setHeader('Connection', 'keep-alive');

  const stream = async function* () {
    yield '今'; yield '天'; yield '天'; yield '气'; yield '不'; yield '错';
  };

  (async () => {
    for await (const token of stream()) {
      res.write(`data: ${JSON.stringify({token})}\n\n`);
    }
    res.write('data: [DONE]\n\n');
    res.end();
  })();
});

SSE vs WebSocket

 SSEWebSocket
方向单向(服务器→客户端)双向
协议HTTPHTTP + WebSocket协议
重连自动重连需手动实现
二进制仅文本支持二进制
适用场景推送通知、流式输出聊天、游戏、实时协作

对于大模型流式输出,SSE是最佳选择,因为只需要单向传输且实现简单。


FastAPI 实现流式输出

基础示例:纯文本流

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import asyncio

app = FastAPI()


async def generate_text():
    """生成器函数:逐个产生token"""
    tokens = ["今", "天", "天", "气", "真", "好", "啊"]
    for token in tokens:
        yield token
        await asyncio.sleep(0.1)  # 模拟生成延迟


@app.get("/stream")
async def stream_text():
    return StreamingResponse(
        generate_text(),
        media_type="text/plain",
    )

SSE 格式流式输出

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import asyncio
import json

app = FastAPI()


async def generate_sse():
    """生成SSE格式的数据流"""
    tokens = ["今", "天", "天", "气", "真", "好"]

    for index, token in enumerate(tokens):
        data = {
            "id": index,
            "content": token,
            "finished": False
        }
        # SSE格式: data: {json}\n\n
        yield f"data: {json.dumps(data)}\n\n"
        await asyncio.sleep(0.1)

    # 发送结束标记
    yield "data: [DONE]\n\n"


@app.get("/chat/stream")
async def chat_stream():
    return StreamingResponse(
        generate_sse(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
        }
    )

集成大模型API(如OpenAI)

from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
import httpx
import json

app = FastAPI()


class ChatRequest(BaseModel):
    message: str


async def stream_from_llm(message: str):
    """转发大模型的流式响应"""
    async with httpx.AsyncClient() as client:
        async with client.stream(
            "POST",
            "https://api.openai.com/v1/chat/completions",
            headers={
                "Authorization": "Bearer YOUR_API_KEY",
                "Content-Type": "application/json",
            },
            json={
                "model": "gpt-4",
                "messages": [{"role": "user", "content": message}],
                "stream": True,
            },
            timeout=60.0,
        ) as response:
            async for chunk in response.aiter_text():
                if chunk.strip():
                    yield chunk


@app.post("/chat")
async def chat(request: ChatRequest):
    return StreamingResponse(
        stream_from_llm(request.message),
        media_type="text/event-stream",
    )

完整的生产级示例

from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
import asyncio
from typing import AsyncGenerator

app = FastAPI(title="流式API服务")


class StreamRequest(BaseModel):
    prompt: str
    max_tokens: int = 100


async def mock_llm_generate(prompt: str, max_tokens: int) -> AsyncGenerator[str, None]:
    """模拟LLM生成过程"""
    # 这里可以替换为真实的大模型调用
    response = f"关于'{prompt}'的回答,这是一个模拟的流式输出。"

    for i, char in enumerate(response):
        if i >= max_tokens:
            break
        yield char
        await asyncio.sleep(0.05)  # 模拟token生成延迟


async def format_sse(chunk: str, index: int, is_done: bool = False) -> str:
    """格式化为SSE格式"""
    if is_done:
        return "data: [DONE]\n\n"

    payload = {
        "id": index,
        "content": chunk,
        "model": "mock-model",
        "created": asyncio.get_event_loop().time(),
    }
    return f"data: {json.dumps(payload)}\n\n"


@app.post("/v1/chat/completions")
async def stream_completion(request: StreamRequest):
    """OpenAI兼容的流式接口"""

    async def generate():
        index = 0
        async for chunk in mock_llm_generate(request.prompt, request.max_tokens):
            sse_data = await format_sse(chunk, index)
            yield sse_data
            index += 1

        # 发送结束信号
        yield await format_sse("", 0, is_done=True)

    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",  # 禁用Nginx缓冲
        }
    )


@app.get("/")
async def root():
    return {"message": "流式API服务运行中", "docs": "/docs"}


@app.get("/health")
async def health():
    return {"status": "healthy"}


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(
        app,
        host="0.0.0.0",
        port=8000,
        log_level="info"
    )

客户端测试代码

import requests
import json

def test_stream():
    url = "http://localhost:8000/v1/chat/completions"
    payload = {"prompt": "你好", "max_tokens": 50}

    with requests.post(url, json=payload, stream=True) as response:
        print(f"状态码: {response.status_code}")

        for line in response.iter_lines():
            if line:
                line = line.decode('utf-8')
                if line.startswith('data: '):
                    data = line[6:]  # 去掉 "data: " 前缀
                    if data == '[DONE]':
                        print("\n[流结束]")
                        break
                    try:
                        parsed = json.loads(data)
                        print(parsed.get('content', ''), end='', flush=True)
                    except json.JSONDecodeError:
                        pass

if __name__ == "__main__":
    test_stream()

运行与测试

# 安装依赖
pip install fastapi uvicorn httpx

# 启动服务
python main.py

# 访问文档
# http://localhost:8000/docs

# 使用curl测试
curl -N -X POST http://localhost:8000/v1/chat/completions \
  -H "Content-Type: application/json" \
  -d '{"prompt": "你好,世界"}'

关键点总结

要点说明
生成器函数使用 async def + yield 产生数据
StreamingResponseFastAPI的流式响应类
media_typeSSE使用 text/event-stream
headers设置 Cache-Control: no-cache 禁用缓存
异步操作确保生成器内部使用 await

数据格式讨论

流式输出的数据格式有多种方式,取决于具体实现。

1. SSE + JSON(最常见)

data: {"token": "今", "index": 0}
data: {"token": "天", "index": 1}
data: {"token": "气", "index": 2}
data: [DONE]

这是OpenAI、Anthropic等主流API使用的格式。

2. 纯文本流

今天天气很好
@app.get("/stream")
async def stream_text():
    async def generate():
        yield "今"
        yield "天"
        yield "天"
        yield "气"

    return StreamingResponse(generate(), media_type="text/plain")

3. 行分隔JSON(NDJSON)

{"token":"今","index":0}
{"token":"天","index":1}
{"token":"气","index":2}
[DONE]
@app.get("/stream")
async def stream_ndjson():
    async def generate():
        yield json.dumps({"token": "今", "index": 0}) + "\n"
        yield json.dumps({"token": "天", "index": 1}) + "\n"

    return StreamingResponse(generate(), media_type="application/x-ndjson")

4. 原始事件流(带类型)

event: message
data: 今天

event: message
data: 天气

event: done
data: finished
@app.get("/stream")
async def stream_events():
    async def generate():
        yield "event: message\ndata: 今天\n\n"
        yield "event: message\ndata: 天气\n\n"
        yield "event: done\ndata: finished\n\n"

    return StreamingResponse(generate(), media_type="text/event-stream")

格式对比

格式示例适用场景
SSE+JSONdata: {...}\n\n需要结构化数据,主流API
纯文本直接输出字符简单文本推送
NDJSON{...}\n日志流、大数据
原始SSEevent: xxx\ndata: xxx\n\n需要事件类型区分

实际上大模型API的格式

OpenAI 格式

data: {"id":"chatcmpl-123","object":"chat.completion.chunk","created":1699000000,"model":"gpt-4","choices":[{"index":0,"delta":{"content":"今"},"finish_reason":null}]}

data: {"id":"chatcmpl-123","object":"chat.completion.chunk","created":1699000000,"model":"gpt-4","choices":[{"index":0,"delta":{"content":"天"},"finish_reason":null}]}

data: [DONE]

简化的内部格式

有时内部服务会用更简单的格式:

{"t":"今"}
{"t":"天"}
{"t":"气"}
[DONE]

选择建议

# 简单场景:纯文本即可
yield "今天天气"

# 需要元数据:用JSON
yield json.dumps({"content": "今天", "confidence": 0.9})

# 需要事件类型:用完整SSE
yield "event: token\ndata: 今天\n\n"

# 完全兼容OpenAI:复制其格式
yield f"data: {json.dumps(openai_chunk)}\n\n"

总结:JSON格式在SSE中最常见,因为它能携带更多元数据(索引、结束标志、错误信息等),但纯文本流在某些简单场景下也完全够用。


协议支持

SSE 不需要特殊协议,完全基于 标准 HTTP/1.1 即可实现。

SSE 的本质

SSE 只是 HTTP 响应的一种特殊格式约定,没有引入新协议。

普通HTTP请求                    SSE请求
      │                              │
      ├─ GET /api/data              ├─ GET /api/stream
      │                              │
      ├─ Accept: application/json    ├─ Accept: text/event-stream  ← 关键
      │                              │
      ▼                              ▼

一次性返回全部内容                保持连接,持续推送

唯一的"特殊"要求

1. 响应头

HTTP/1.1 200 OK
Content-Type: text/event-stream    ← 告诉客户端这是SSE
Cache-Control: no-cache            ← 禁止缓存
Connection: keep-alive             ← 保持长连接

2. 响应体格式

data: 第一条消息\n\n
data: 第二条消息\n\n
data: {"content":"结构化数据"}\n\n

就这么简单! 没有握手、没有协议升级。

与 WebSocket 对比

┌────────────────────────────────────────────────────────────┐
│                    WebSocket 握手流程                        │
├────────────────────────────────────────────────────────────┤
│  客户端                                                     │
│    GET /ws HTTP/1.1                                        │
│    Upgrade: websocket        ← 请求协议升级                │
│    Connection: Upgrade                                        │
│    Sec-WebSocket-Key: xxx...                                 │
│                      ↓                                       │
│  服务器                                                     │
│    HTTP/1.1 101 Switching Protocols  ← 协议切换            │
│    Upgrade: websocket                                        │
│    Sec-WebSocket-Accept: yyy...                              │
│                      ↓                                       │
│  ──────────── 进入 WebSocket 二进制协议 ─────────────       │
│           (完全不同于 HTTP 的帧格式)                         │
└────────────────────────────────────────────────────────────┘

┌────────────────────────────────────────────────────────────┐
│                    SSE 流程                                 │
├────────────────────────────────────────────────────────────┤
│  客户端                                                     │
│    GET /stream HTTP/1.1                                     │
│    Accept: text/event-stream                                │
│                      ↓                                       │
│  服务器                                                     │
│    HTTP/1.1 200 OK          ← 普通HTTP响应                 │
│    Content-Type: text/event-stream                          │
│                      ↓                                       │
│  ──────────── 继续 HTTP,发送文本数据 ──────────────        │
│           (仍使用 HTTP,数据格式有约定)                      │
└────────────────────────────────────────────────────────────┘

用 telnet 验证

# 用最原始的 telnet 手动发送 HTTP 请求
$ telnet localhost 8000
Trying 127.0.0.1...
Connected to localhost.

# 手动输入 HTTP 请求
GET /stream HTTP/1.1
Host: localhost
Accept: text/event-stream

# 按回车后,服务器立即开始推送
HTTP/1.1 200 OK
content-type: text/event-stream
cache-control: no-cache

data: hello
data: world
data: [DONE]

完全就是普通 HTTP!

中间件兼容性

中间件对SSE的支持
Nginx✅ 默认支持(需配置 proxy_buffering off;)
Apache✅ 默认支持
Caddy✅ 默认支持
Cloudflare✅ 支持
AWS ALB✅ 支持

Nginx 配置示例

location /stream {
    proxy_pass http://backend;
    proxy_set_header Connection '';
    proxy_http_version 1.1;
    proxy_buffering off;          # 关键:禁用缓冲
    proxy_cache off;              # 关键:禁用缓存
    chunked_transfer_encoding on;
}

浏览器原生支持

// 不需要任何库,浏览器原生支持
const eventSource = new EventSource('/stream');

eventSource.onmessage = (event) => {
    console.log(event.data);
};

// 自动重连机制内置
// 断线后会自动尝试重连

总结

特性SSEWebSocket
协议标准 HTTPHTTP + WebSocket协议
协议升级❌ 不需要✅ 需要(101状态码)
连接方式持久HTTP连接独立的TCP连接
数据格式文本(约定格式)二进制帧
浏览器APIEventSourceWebSocket
实现复杂度极低中等

结论:SSE 就是 HTTP,只是约定了响应的格式和保持连接的方式。任何能处理 HTTP 的服务器、代理、客户端都能支持 SSE。


格式自定义

SSE 只是一个约定,不是强制的协议标准。只要客户端和服务端协商好,格式可以任意自定义。

SSE 标准格式(建议遵循)

event: message      ← 事件名(可选)
id: 123            ← 事件ID(可选)
retry: 3000        ← 重连延迟(可选)
data: hello        ← 数据内容
data: world        ← 多行data表示一个消息

                    ← 空行分隔不同事件

实际上可以这样自定义

示例1:极简格式

hello

world

[DONE]

示例2:自定义前缀

msg:今天

msg:天气

msg:很好

END

示例3:带时间戳

[2025-01-19 10:30:00] 今天
[2025-01-19 10:30:01] 天气
[2025-01-19 10:30:02] 很好
EOF

示例4:二进制标记

SIZE:5|DATA:今天
SIZE:5|DATA:天气
SIZE:4|DATA:很好
DONE

关键:解析逻辑配套

# 服务端:按你的约定生成
async def custom_stream():
    yield "MSG:今天\n"
    yield "MSG:天气\n"
    yield "MSG:很好\n"
    yield "END\n"

# 客户端:按同样的约定解析
async for line in response:
    if line.startswith("MSG:"):
        content = line[4:]
        print(content)
    elif line == "END":
        break

为什么大多用 data: 前缀

原因说明
浏览器原生EventSource API 自动解析 data: 格式
生态兼容各种库、工具默认支持标准格式
可扩展性event:、id:、retry: 提供额外能力

浏览器 EventSource 的解析

// 标准格式会被自动解析
const es = new EventSource('/stream');

// 自动处理 data:
es.onmessage = (e) => {
    console.log(e.data);  // 自动提取 data: 后的内容
};

// 自动处理 event:
es.addEventListener('custom', (e) => {
    // event: custom 触发
});

// 自动处理 id: 和断线重连
es.lastEventId;  // 最后的 id: 值

自定义格式的权衡

┌─────────────────────────────────────────────────────────────┐
│                    格式选择决策                              │
├────────────────────────────────────────────────────────────┤
│                                                             │
│  浏览器客户端?                                             │
│       │                                                     │
│       ├─ 是 → 用标准 data: 格式(EventSource自动解析)     │
│       │                                                     │
│       └─ 否(自写客户端) → 可以自定义                      │
│              但建议:简单 = 好                              │
│                                                             │
└─────────────────────────────────────────────────────────────┘

实践建议

# 推荐:简单清晰的自定义格式
async def simple_stream():
    """每行一个JSON,最后[DONE]"""
    for token in ["今", "天", "天", "气"]:
        yield json.dumps({"token": token}) + "\n"
    yield "[DONE]\n"

# 客户端解析
async for line in response:
    line = line.strip()
    if line == "[DONE]":
        break
    data = json.loads(line)
    print(data["token"])

总结

SSE 本质是长连接 + 流式文本,格式完全由你定义。标准格式的好处是生态兼容,但你可以根据需要自由设计。


结语

SSE 是一种简单、高效的流式输出方案,具有以下特点:

  1. 无需特殊协议 - 基于标准 HTTP
  2. 实现简单 - 服务端生成器 + 客户端逐行解析
  3. 格式灵活 - 可以自定义数据格式
  4. 浏览器原生支持 - EventSource API
  5. 自动重连 - 内置断线重连机制

对于大模型流式输出、服务器推送通知等单向数据流场景,SSE 是理想的选择。

本页目录

  • SSE 流式输出
  • 目录
  • 流式输出原理
    • 基本原理
    • 流程示意
    • 关键技术点
    • 优势
  • SSE 详解
    • 什么是SSE
    • 核心特点
    • SSE消息格式
    • 完整流程图
    • 客户端代码示例
    • Node.js 服务端示例
    • SSE vs WebSocket
  • FastAPI 实现流式输出
    • 基础示例:纯文本流
    • SSE 格式流式输出
    • 集成大模型API(如OpenAI)
    • 完整的生产级示例
    • 客户端测试代码
    • 运行与测试
    • 关键点总结
  • 数据格式讨论
    • 1. SSE + JSON(最常见)
    • 2. 纯文本流
    • 3. 行分隔JSON(NDJSON)
    • 4. 原始事件流(带类型)
    • 格式对比
    • 实际上大模型API的格式
      • OpenAI 格式
      • 简化的内部格式
    • 选择建议
  • 协议支持
    • SSE 的本质
    • 唯一的"特殊"要求
      • 1. 响应头
      • 2. 响应体格式
    • 与 WebSocket 对比
    • 用 telnet 验证
    • 中间件兼容性
      • Nginx 配置示例
    • 浏览器原生支持
    • 总结
  • 格式自定义
    • SSE 标准格式(建议遵循)
    • 实际上可以这样自定义
      • 示例1:极简格式
      • 示例2:自定义前缀
      • 示例3:带时间戳
      • 示例4:二进制标记
    • 关键:解析逻辑配套
    • 为什么大多用 data: 前缀
      • 浏览器 EventSource 的解析
    • 自定义格式的权衡
    • 实践建议
    • 总结
  • 结语

评论