pipeline-frameworks-argo-04.Argo工作流封装MCP接口

Argo 系列01 入门简介 · 02 常用命令 · 03 操作 Pod · 05 WorkflowTemplate · 04 MCP 封装(本文)

相关K8s 01 概述 · MCP 04 重任务 · MCP 07 阿里云案例


1. 场景:为什么要把 Argo 文章「再包一层 MCP」

你在 Argo 入门 或业务文档里已经写清:WorkflowTemplate、参数、镜像、DAG、制品(artifact)路径。团队用 argo submit 或 CI 能稳定跑通。下一步常见需求是:

诉求 仅靠 Argo CLI / UI 加 MCP 后
LLM Agent 用自然语言触发分析 需人工转参、粘贴 YAML Tool 参数 schema 约束输入
多步 pipeline 串联(A→B→C) 人工查上一步 OSS 路径 Agent 读 output_uri 自动传下游
统一鉴权与审计 分散在 kubeconfig / 脚本 MCP 网关 + RBAC 单点
结果进对话上下文 易把大文件塞进 prompt 摘要 + URI,大对象走 Resource

模型上下文协议(Model Context Protocol,MCP)是 Host 与外部能力之间的标准 RPC 层;Argo 是 Kubernetes 上的工作流引擎。二者职责不同:MCP 管「怎么被 Agent 调用」Argo 管「怎么在集群里跑容器」。本文回答:从一篇 Argo 流程文档到可上线 MCP 接口,难点在哪、怎么解、调用链长什么样

段末注释MCP 定义 Tools / Resources / Prompts 等原语及 stdio、Streamable HTTP 传输;CRD(Custom Resource Definition,自定义资源定义)是 K8s 扩展 API 的机制,Argo 的 Workflow 即 CRD。


2. 总体架构:MCP 薄、Argo 厚

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
flowchart TB
subgraph host [MCP Host]
Agent["LLM Agent"]
end

subgraph gateway [MCP Server 无状态]
T["Tools: submit / status / summary"]
R["Resources: run 元数据"]
A["Argo API Client"]
S["对象存储 Client"]
end

subgraph cluster [K8s / Argo]
AW["Argo Workflows Controller"]
WT["WorkflowTemplate"]
Pod["Worker Pod"]
end

Store["对象存储 / PVC\n输入输出大文件"]

Agent --> T
Agent --> R
T --> A
T --> S
A --> AW
AW --> WT
AW --> Pod
Pod --> Store
S --> Store
T --> R
组件 职责 不应做的事
MCP Server 参数校验、生成 run_id、写输入到存储、提交 Workflow、查 phase、组装返回摘要 在 MCP 进程内加载 GPU 模型、长时间 sleep 等 Pod
WorkflowTemplate 容器镜像、命令行、资源 limits、重试、DAG 暴露给 LLM 的 JSON schema(由 MCP Tool 描述承担)
Worker Pod 读 URI → 计算 → 写 URI / metrics 直接对接 MCP 协议
对象存储 / PVC 大文件与跨步骤传递 把整文件内容返回给 Tool 文本

这与 MCP 04 重任务「MCP 薄、Worker 厚」 一致;生产级阿里云落地见 MCP 07


3. 从 Argo 文章到 MCP:改造路径

假设你已有一篇 Argo 文档(或 WorkflowTemplate YAML),可按下面顺序映射,不必重写业务容器

3.1 对照表:YAML 概念 → MCP 概念

Argo 文档中的概念 MCP 侧落点
WorkflowTemplate.metadata.name 内部常量,如 template_name = "esmfold-predict"
spec.arguments.parameters Tool 函数参数 + JSON Schema(类型、默认值、枚举)
artifacts(S3/OSS 输入输出) Tool 接收 *_oss_uri 或 MCP 代为上传后注入 parameter
entrypoint + dag 单 Tool 对应单 Template;多步 DAG 可另设 pipeline Tool
retryStrategy / activeDeadlineSeconds 留在 YAML;MCP 只读最终 phase
Pod resources.limits 留在 YAML;MCP 不重复声明

3.2 推荐落地顺序

  1. 冻结 Template:把文章里的示例改成集群内可 argo submit --from workflowtemplate/xxx 的版本(见 02 常用命令)。
  2. 定义 run 目录约定:如 s3://bucket/runs/{run_id}/input/outputs/meta.json(与 MCP 07 §3 同构)。
  3. 实现 MCP 三件套submit_*get_run_statusfetch_run_summary(或 Tasks 模式,见 §6)。
  4. 补 Resourceresource://runs/{run_id}/summary 只读 meta,避免重复查库逻辑。
  5. 写 Prompt(可选):教 Agent「先 submit 再 poll,禁止粘贴 PDB/FASTA 全文」。
  6. Host 注册:Streamable HTTP + 多副本(MCP 03 部署)。

4. 主要技术难点

4.1 同步 Tool 与长耗时 Argo 的矛盾

Argo 任务常为 分钟~小时(GPU、大批量样本),而 MCP tools/call 在 HTTP/stdio 上通常有 30s~数分钟 超时。若在 Tool 内阻塞 watch Workflow 直到 Succeeded,Host 易断连。

本质:协议层偏交互,编排层偏批处理。

4.2 大输入 / 大输出与 LLM 上下文

Argo 的 artifact 设计适合 GB 级文件;LLM context 不适合。若 Tool 返回完整 VCF、BAM 路径列表或文件正文,会 挤爆 token 且拖慢对话。

本质:数据平面(存储 URI)与控制平面(对话摘要)必须分离。

4.3 参数映射与类型安全

文章里常见 Shell 风格参数(路径字符串、可选 flag);MCP Tool 需要 JSON Schema。易出现:

  • 必填/可选与 YAML parameters 默认值不一致;
  • 枚举值(如 model=esm2_t33)未写进 description,模型乱填;
  • 文件类输入:用户给本地路径 vs 集群可访问 URI。

4.4 身份、鉴权与多租户

Argo 提交需要 K8s RBACcreate workflows)或 Argo Server Token;Worker 读写的 OSS/S3 需要 RAM/IRSA。MCP 若把 kubeconfig 或 AccessKey 打进镜像,风险极高。

本质:MCP 是新的攻击面,需 网关鉴权 + 工作负载身份 分离。

4.5 状态一致性与幂等

同一用户重复点击「提交」可能产生 duplicate Workflow;Agent 重试 tools/call 可能 double submit。需要 run_id / idempotency key 与 Workflow 命名策略(如 {template}-{run_id[:8]})。

4.6 可观测与排障

失败时用户问「为什么失败了」——答案在 Pod 日志03 操作 Pod),不在 MCP 返回的一句 failed。MCP 需把 workflow_namefailed_steplog_snippet 结构化返回,而不是只抛异常字符串。

4.7 Host 能力不齐(Tasks 支持度)

MCP Tasks(异步 call-now-fetch-later)并非所有 Host 已实现。仅实现 @mcp.tool(task=True) 会导致部分 Client 无法使用。

4.8 多 Template 与 DAG 的 Tool 粒度

一篇 Argo 文章若描述 整条 DAG(比对 → 变异检测 → 注释),Tool 设计有两种:

  • 一个 pipeline Tool:内部 submit 整条 Workflow(简单,难单独重跑某步);
  • 多个单步 Tool:Agent 串联 URI(灵活,Tool 数量多)。

需在文档与 Prompt 中明确选型,否则 Agent 会重复提交或漏步骤。


5. 可参考的解决方案

难点 方案 A(推荐) 方案 B(兼容) 参考
长耗时 MCP Tasks + report_progress 映射 Argo phase 三 Tool:submit / status / summary MCP 04 §2–3
大结果 Tool 返回 摘要 JSON + output_uris;完整列表挂 Resource 或 signed URL 阈值以上只返回 URI 字符串 MCP 04 §2.4
大输入 MCP 接收小字符串(序列)→ 写入 OSS → 传 input_oss_uri 给 Argo Tool 参数直接要求 oss://...(上游已上传) MCP 07 §3
鉴权 MCP Deployment ServiceAccount + RBAC;Worker RRSA/IRSA 访问存储 集群内 Argo Server,MCP 走 in-cluster API K8s 05 RBAC
幂等 Client 传 client_request_id → 映射唯一 run_id;重复 submit 返回已有 run Redis/DB 存 run_id → workflow_name MCP 04 §4
排障 fetch_run_summaryphasemessagefailed_template、日志 tail Resource 链到 Argo UI deep link 03 操作 Pod §7
多步 DAG 独立 pipeline-mcp + 单 WorkflowTemplate DAG 多 MCP 按 URI 串联 MCP 07 §5
部署 生产 Streamable HTTP 无状态多副本 开发 stdio 仅做参数 mock,不跑真 Argo MCP 03

开源 / 生态参考(实现时可对照,非唯一选型):

  • Argo Server API / argo-workflows Python SDK:程序化 submit、watch、terminate。
  • Hera(Python 生成 Argo YAML):适合从代码维护 Template,与 MCP 同仓库。
  • Kubeflow Pipelines SDKFlyte:若已用其他编排,思路仍是「API 网关 + 异步 status」,与 MCP 正交。

6. 调用与返回结果的逻辑

6.1 模式一:三 Tool(兼容性最好)

适用于 Host 尚未支持 Tasks,或希望 Agent 显式轮询。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
sequenceDiagram
participant U as User / Agent
participant M as MCP Server
participant DB as Redis或DB
participant A as Argo Server
participant W as Worker Pod
participant O as 对象存储

U->>M: tools/call submit_xxx(sequence, ...)
M->>M: 校验参数,生成 run_id
M->>O: 写 input + meta.json
M->>A: submit WorkflowTemplate(run_id, uris...)
A-->>M: workflow_name
M->>DB: 保存 run_id, workflow_name, pending
M-->>U: run_id

loop 轮询
U->>M: get_run_status(run_id)
M->>A: get workflow phase
M-->>U: running / succeeded / failed
end

U->>M: fetch_run_summary(run_id)
M->>O: 读 outputs/metrics.json
M->>A: 可选:失败时取 log snippet
M-->>U: 结构化摘要 + output_uris

各步返回约定

阶段 Tool 返回内容 不返回
提交 submit_* run_id(字符串或 {run_id, workflow_name} Workflow YAML 全文
查询 get_run_status pending | running | succeeded | failed | cancelled 完整 Pod describe
完成 fetch_run_summary 指标 JSON、output_uris、可选 preview(如 FASTA 前 80 字符) 整份 PDB/VCF/BAM
取消 cancel_run cancelled 或错误原因

6.2 模式二:MCP Tasks(Host 支持时)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
sequenceDiagram
participant C as MCP Client
participant S as MCP Server
participant A as Argo

C->>S: tools/call submit_xxx (task capability)
S->>A: 创建 Workflow
S-->>C: taskId, status=working

loop tasks/get
S->>A: watch phase
S-->>C: working + progress(Argo: Running 2/5)
end

S-->>C: completed + structuredContent(summary)

Task 内部可将 Argo status.progress 或自定义步数映射到 report_progress(见 MCP 04 §2.2)。取消:Client 发 tasks/cancel → MCP 调 argo terminate

6.3 fetch_run_summary 响应形状(建议)

统一 JSON 字段,便于 Agent 解析并传下游 Tool:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
{
"run_id": "run-20260611-a1b2c3",
"workflow_name": "esmfold-predict-a1b2c3",
"status": "succeeded",
"started_at": "2026-06-11T02:00:00Z",
"finished_at": "2026-06-11T02:04:12Z",
"metrics": {
"mean_plddt": 78.5,
"n_residues": 256
},
"output_uris": {
"structure_pdb": "oss://bucket/runs/run-.../outputs/structure.pdb",
"metrics_json": "oss://bucket/runs/run-.../outputs/metrics.json"
},
"preview": {
"pdb_header_lines": ["HEADER ...", "REMARK ..."]
},
"resource_uri": "resource://runs/run-20260611-a1b2c3/summary"
}

跨 MCP 串联规则:下游 Tool 只接收 output_uris 中的 URI 字符串,例如 submit_mpnn_design(structure_oss_uri=...)MCP 07 §4.4)。

6.4 Argo phase → MCP 状态映射

Argo status.phase get_run_status 说明
Pending pending 排队或未调度
Running running 至少一 Pod 在跑
Succeeded succeeded 可 fetch summary
Failed / Error failed summary 应含 failed_step、message
(已 terminate) cancelled 用户或 MCP 取消

7. 最小实现骨架(Python 概念代码)

以下省略错误处理与鉴权,展示 submit → Argo API → 元数据存储 主路径。Argo 调用方式可与 02 §二 的 API / CLI 对照。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
import uuid
from mcp.server.fastmcp import FastMCP

mcp = FastMCP("pipeline-argo-gateway")

TEMPLATE = "my-bio-template" # 对应 WorkflowTemplate.metadata.name


def _new_run_id() -> str:
return f"run-{uuid.uuid4().hex[:12]}"


@mcp.tool()
def submit_my_pipeline(
sample_id: str,
input_oss_uri: str | None = None,
threads: int = 8,
) -> str:
"""提交 bio 流程到 Argo。返回 run_id;完成后用 fetch_run_summary 取结果 URI。"""
run_id = _new_run_id()
# 1. 若无 input_oss_uri,可将 sample_id 映射到约定路径或拒绝
# 2. oss_client.put(meta.json); 必要时上传小输入
params = {
"run-id": run_id,
"sample-id": sample_id,
"input-oss-uri": input_oss_uri or "",
"threads": str(threads),
}
workflow_name = argo_client.submit_from_template(TEMPLATE, params)
run_store.save(run_id, workflow_name=workflow_name, status="pending")
return run_id


@mcp.tool()
def get_run_status(run_id: str) -> str:
"""查询 Argo 工作流阶段。"""
wf = run_store.get_workflow_name(run_id)
phase = argo_client.get_phase(wf)
return map_phase(phase)


@mcp.tool()
def fetch_run_summary(run_id: str) -> dict:
"""成功或失败后返回摘要;大文件仅 URI。"""
meta = run_store.get(run_id)
if map_phase(argo_client.get_phase(meta["workflow_name"])) != "succeeded":
return {
"run_id": run_id,
"status": "failed",
"message": argo_client.last_error(meta["workflow_name"]),
}
return build_summary_from_oss(f"oss://bucket/runs/{run_id}/")


@mcp.resource("resource://runs/{run_id}/summary")
def run_summary_resource(run_id: str) -> str:
import json
return json.dumps(fetch_run_summary(run_id), ensure_ascii=False)

Worker 容器内仍按 Argo 文章约定执行,例如:

1
2
3
python -m worker --run-id={{workflow.parameters.run-id}} \
--input-uri={{workflow.parameters.input-oss-uri}} \
--output-uri=oss://bucket/runs/{{workflow.parameters.run-id}}/outputs/

8. Tool 设计清单(写进 MCP 描述里)

@mcp.tool() 的 docstring / description 中建议写明:

  1. 预期耗时量级(秒 / 分钟 / 小时)。
  2. 调用顺序:「须先 submit_*,再轮询 get_run_status,最后 fetch_run_summary」。
  3. 输入限制:序列最大长度、文件必须已为 oss:// / s3://
  4. 输出限制:「勿要求返回完整大文件;用 output_uris」。
  5. destructiveHint:若 cancel_run 会 terminate Workflow,标注为可能有副作用。

可选 @mcp.prompt() 固定多步流程,减少 Agent 漏步(见 MCP 01 §4)。


9. 反模式(避免)

反模式 后果
在 MCP 进程内 kubectl exec 跑分析 无法扩展、与 Argo 重试/审计脱节
Tool 同步 watch 至 Workflow 结束 Host 超时、无法取消
返回 10MB+ JSON 或文件 Base64 Context 爆炸、费用上升
每个 Argo 参数一个 Tool Tool 爆炸,模型选型困难
run_id 记录 重复提交、无法关联日志

10. 落地自检表

步骤 检查项
Template argo submit --from workflowtemplate/... 在无 MCP 时成功
存储 输入/输出 URI 在同 VPC/同 region;Worker 与 MCP 均可读写
RBAC MCP SA 可 create workflows;Worker SA 可读写在 Bucket/PVC
MCP 三 Tool 或 Tasks 跑通;summary 仅 URI + 小 preview
Host Cursor / 内部 Agent 注册 HTTP 端点;长任务轮询策略写在 system prompt
排障 失败 summary 含 workflow_name,可对照 03 日志

11. 小结

层次 要点
架构 MCP = 协议与调度网关;Argo = 容器编排;大文件 = 对象存储 URI
难点 超时、大 payload、鉴权、幂等、Host Tasks 能力、DAG Tool 粒度
解法 Tasks 或三 Tool;Resource 卸载;RRSA + 网关 OAuth
调用链 submit → 返回 run_id → poll status → fetch 摘要与 output_uris
延伸 蛋白设计全链路示例 → MCP 07

把 Argo 文章开发成 MCP,不是把 YAML 贴进 Tool 返回,而是:用 Tool schema 表达参数,用 run 元数据表达状态,用 URI 表达数据平面——Agent 才能稳定、可审计地驱动已有 pipeline。


概念索引

术语 含义
WorkflowTemplate 集群内可复用的 Argo 工作流蓝图;选型见 05
run_id MCP 侧业务运行 ID,映射 Workflow 与 OSS 目录
output_uris 结果对象存储路径集合,供下游 Tool 引用
Tasks MCP 异步工具能力,立即返回 taskId 后再 fetch 结果
Resource MCP 只读 URI,适合挂 summary 与元数据
-------------本文结束感谢您的阅读-------------