本文属于 RAG 工程框架中的「5 在线运营与成本治理」环节,聚焦「Streaming Online RAG」方法。可先阅读 RAG-00.方法概述 再进入本篇。
原理
把“文档更。-> 索引更新”从批处理改为流式。
1 | flowchart LR |
优缺。
- 优点:知识新鲜度高,适合实时业务。
- 缺点:工程复杂,回滚与一致性挑战大。
性能/资源
- 长期运行成本高于批处理。
- 需监控延迟、丢消息、索引漂移。
应用场景
- 新闻、行情、监控告警、工单系统。
统一合成数据示例
输入数据片段
1 | { |
中间结果(流式更新)
1 | { |
最终生成示例(含引用)
1 | { |
原始发表与工程实现
- 代表性原始发表:流式索引工程实践。
- 核心解决问题:解决知识新鲜度。
- 成熟实现工具:Kafka, Flink, Debezium。
详细原理拆解
- 增量管道保障新鲜度 freshness=now-index_time,同时保证幂等与回滚。
- 典型实现可拆为:输入预处理 -> 方法核心计算 -> 候选/证据构建 -> 生成与引用。
- 工程调优重点:质量(准确率/引用率)与成本(时延/token)的联合优化。
1 | flowchart LR |
工程落地扩展示例
伪代码
1 | def ingest_stream(event, cleaner, embedder, index): |
参数示例
1 | pipeline_stages: [clean_chunk, embed, index_upsert] |
常见失败案例
- 失败模式 1:乱序事件导致 旧版覆盖新版,读者看到回退数据。
- 失败模式 2:嵌入与索引 不同步,检索到 chunk 但向量缺失。
- 失败模式 3:高峰积压,freshness SLA 持续违约。
Demo 数据带入计算示例
1 | 变更事件 10:00:00,索引可见 10:00:01 → freshness=1s < 3s SLA。 |