RAG-05在线治理-Streaming-Online-RAG

本文属于 RAG 工程框架中的「5 在线运营与成本治理」环节,聚焦「Streaming Online RAG」方法。可先阅读 RAG-00.方法概述 再进入本篇。

原理

把“文档更。-> 索引更新”从批处理改为流式。

1
2
3
4
5
6
flowchart LR
Src[数据源变更] --> Bus[消息队列]
Bus --> ETL[清洗/切分]
ETL --> Emb[增量向量化]
Emb --> IDX[增量索引写入]
IDX --> Serve[在线检索服务]

优缺。

  • 优点:知识新鲜度高,适合实时业务。
  • 缺点:工程复杂,回滚与一致性挑战大。

性能/资源

  • 长期运行成本高于批处理。
  • 需监控延迟、丢消息、索引漂移。

应用场景

  • 新闻、行情、监控告警、工单系统。

统一合成数据示例

输入数据片段

1
2
3
4
{
"event": {"doc_id": "D01", "version": "v3", "change": "机票上限由2000改为2200"},
"ingest_time": "2026-03-26T10:00:00Z"
}

中间结果(流式更新)

1
2
3
4
5
6
7
8
{
"pipeline": [
{"stage": "clean_chunk", "status": "ok", "latency_ms": 120},
{"stage": "embed", "status": "ok", "latency_ms": 85},
{"stage": "index_upsert", "status": "ok", "latency_ms": 60}
],
"index_visible_time": "2026-03-26T10:00:01Z"
}

最终生成示例(含引用)

1
2
3
4
5
{
"query": "当前机票报销上限是多少?",
"answer": "当前上限为 2200 元。",
"citations": [{"doc_id": "D01@v3", "evidence_span": "机票上限 2200 元"}]
}

原始发表与工程实现

  • 代表性原始发表:流式索引工程实践。
  • 核心解决问题:解决知识新鲜度。
  • 成熟实现工具:Kafka, Flink, Debezium。

详细原理拆解

  • 增量管道保障新鲜度 freshness=now-index_time,同时保证幂等与回滚。
  • 典型实现可拆为:输入预处理 -> 方法核心计算 -> 候选/证据构建 -> 生成与引用。
  • 工程调优重点:质量(准确率/引用率)与成本(时延/token)的联合优化。
1
2
3
4
5
flowchart LR
In[输入 Query 与知识] --> Core[方法核心计算]
Core --> Rank[匹配/路由/排序]
Rank --> Build[证据组装]
Build --> Out[答案与引用]

工程落地扩展示例

伪代码

1
2
3
4
def ingest_stream(event, cleaner, embedder, index):
chunks = cleaner.chunk(event.doc)
vectors = embedder.encode(chunks)
index.upsert(event.doc_id, event.version, vectors) # 幂等 key = (doc_id, version)

参数示例

1
2
3
pipeline_stages: [clean_chunk, embed, index_upsert]
at_least_once: true
compaction_interval_s: 300

常见失败案例

  • 失败模式 1:乱序事件导致 旧版覆盖新版,读者看到回退数据。
  • 失败模式 2:嵌入与索引 不同步,检索到 chunk 但向量缺失。
  • 失败模式 3:高峰积压,freshness SLA 持续违约。

Demo 数据带入计算示例

1
2
变更事件 10:00:00,索引可见 10:00:01 → freshness=1s < 3s SLA。
若仅更新向量未更新 BM25 倒排,**同一问句两路结果不一致**——流式治理需保证各索引 **同版本提交**。
-------------本文结束感谢您的阅读-------------