Deploy Progress Streaming V2 技术方案¶
状态:待评审 日期:2026-03-08 范围:tfrsmanager 仓库(operator 侧输出对接要求文档)
1. 问题陈述¶
当前部署进度流式推送存在以下设计缺陷:
| 缺陷 | 描述 | 严重程度 |
|---|---|---|
| 百分比反推步骤 | progressToStep() 用 0-100% 百分比反向推导步骤名称,进度跳跃时步骤丢失 |
P0 |
| DB 轮询 | Handler 每 2s 轮询 Task 表读进度,快速变化时丢步骤 | P0 |
| 去重粒度粗 | 只按 percent 去重,不按步骤状态去重 | P1 |
| 前端不补全 | 前端只渲染收到的步骤,缺失步骤不显示 | P1 |
| Init 单步骤 | Init 阶段只映射为 1 个步骤,无子步骤 | P0 |
影响范围:progressToStep() 在生产环境的 CRDCreateExecutor 和 Mock 的 MockDeployExecutor 中均在使用,不仅仅是 Mock 问题。
2. 设计决策汇总¶
| 决策项 | 选择 | 理由 |
|---|---|---|
| 推送机制 | Go channel(进程内 pub/sub) | 零延迟、零外部依赖、天然背压、单进程部署场景完全匹配 |
| Channel 粒度 | 按 Instance ID | 与前端连接模型一致——一个 instance 一个 stream |
| Operator 组件进度 | 透传 TFRServer.Status.Components | WaitReady 期间读取组件状态推送为子步骤 |
| WaitReady 改造 | 改造为支持回调的版本 | 完全事件驱动,每次 Status 变更时回调 |
| 步骤模型 | 4 个主步骤 + 动态子步骤 | 步骤是一等公民,有独立状态 |
| Mock 模式 | 模拟子步骤 | 前端开发/测试体验一致 |
| Init 子步骤 | 动态,来自 Postman Collection item.Name | 前端动态渲染 |
| 向后兼容 | 一次性切换 | 前后端同步升级,不做双版本并行 |
| Executor 接口 | 改造 progressCallback 签名 | 从 int 改为 *ProgressEvent |
| ProgressBus 位置 | internal/shared/task_manager/ |
与任务执行密切相关 |
| 重连策略 | 从当前最新状态开始推送 | 简单直接,前端处理步骤跳变 |
| 测试 | 单元 + 集成测试 | ProgressBus 单元测试 + HTTP stream 端到端测试 |
| 改造范围 | 仅 tfrsmanager | Operator 侧输出对接要求文档 |
3. 架构设计¶
3.1 整体架构¶
┌──────────────────────────────────────────────────────────────────┐
│ tfrsmanager 进程 │
│ │
│ ┌─────────────┐ Publish() ┌──────────────┐ │
│ │ Executor │ ──────────────→ │ ProgressBus │ │
│ │ (goroutine) │ │ (per-inst │ │
│ │ │ │ channel) │ │
│ └─────────────┘ └──────┬───────┘ │
│ │ Subscribe() │
│ ▼ │
│ ┌──────────────┐ │
│ │ Handler │──→ NDJSON │
│ │ (goroutine) │ Stream │
│ └──────────────┘ to Client │
└──────────────────────────────────────────────────────────────────┘
对比 V1:
V1: Executor → DB write → [2s ticker] → Handler polls DB → Client
V2: Executor → channel → [instant] → Handler receives → Client
3.2 事件模型¶
// ProgressEvent 部署进度事件(V2)
type ProgressEvent struct {
Type string `json:"type"` // "step_update" | "complete" | "error"
Step StepInfo `json:"step"` // 当前主步骤
SubSteps []SubStepInfo `json:"subSteps,omitempty"` // 子步骤(步骤3、4有)
TotalSteps int `json:"totalSteps"` // 主步骤总数 = 4
Message string `json:"message,omitempty"` // 附加消息
ErrorCode string `json:"errorCode,omitempty"` // 错误码
Timestamp string `json:"timestamp"` // ISO8601
}
type StepInfo struct {
Index int `json:"index"` // 1-4
Name string `json:"name"` // 步骤中文名
Status string `json:"status"` // "pending" | "running" | "completed" | "failed"
}
type SubStepInfo struct {
Name string `json:"name"` // 子步骤名称(如 "MinIO Init"、"健康检查")
Status string `json:"status"` // "pending" | "running" | "completed" | "failed"
}
3.3 主步骤定义¶
| Index | Name | 触发条件 | 子步骤来源 |
|---|---|---|---|
| 1 | 准备部署 | 参数解析、配置应用 | 无 |
| 2 | 等待租户就绪 | TFRTenant WaitReady(可能瞬间完成) | 无 |
| 3 | 创建资源 | Apply TFRServer CR + Watch Ready | TFRServer.Status.Components |
| 4 | 初始化资源 | InitExecutorV2 执行 Postman Collection | Collection request items |
事件序列示例(完整部署):
{"type":"step_update","step":{"index":1,"name":"准备部署","status":"running"},"totalSteps":4,"timestamp":"..."}
{"type":"step_update","step":{"index":1,"name":"准备部署","status":"completed"},"totalSteps":4,"timestamp":"..."}
{"type":"step_update","step":{"index":2,"name":"等待租户就绪","status":"running"},"totalSteps":4,"timestamp":"..."}
{"type":"step_update","step":{"index":2,"name":"等待租户就绪","status":"completed"},"totalSteps":4,"timestamp":"..."}
{"type":"step_update","step":{"index":3,"name":"创建资源","status":"running"},"subSteps":[{"name":"MinIO Init","status":"pending"},{"name":"DB Init","status":"pending"},{"name":"API Server","status":"pending"}],"totalSteps":4,"timestamp":"..."}
{"type":"step_update","step":{"index":3,"name":"创建资源","status":"running"},"subSteps":[{"name":"MinIO Init","status":"completed"},{"name":"DB Init","status":"running"},{"name":"API Server","status":"pending"}],"totalSteps":4,"timestamp":"..."}
{"type":"step_update","step":{"index":3,"name":"创建资源","status":"running"},"subSteps":[{"name":"MinIO Init","status":"completed"},{"name":"DB Init","status":"completed"},{"name":"API Server","status":"running"}],"totalSteps":4,"timestamp":"..."}
{"type":"step_update","step":{"index":3,"name":"创建资源","status":"completed"},"subSteps":[{"name":"MinIO Init","status":"completed"},{"name":"DB Init","status":"completed"},{"name":"API Server","status":"completed"}],"totalSteps":4,"timestamp":"..."}
{"type":"step_update","step":{"index":4,"name":"初始化资源","status":"running"},"subSteps":[{"name":"健康检查","status":"running"},{"name":"创建Token","status":"pending"},{"name":"配置工作流","status":"pending"}],"totalSteps":4,"timestamp":"..."}
{"type":"step_update","step":{"index":4,"name":"初始化资源","status":"running"},"subSteps":[{"name":"健康检查","status":"completed"},{"name":"创建Token","status":"running"},{"name":"配置工作流","status":"pending"}],"totalSteps":4,"timestamp":"..."}
{"type":"complete","step":{"index":4,"name":"初始化资源","status":"completed"},"totalSteps":4,"timestamp":"..."}
4. 核心组件设计¶
4.1 ProgressBus(事件总线)¶
位置:internal/shared/task_manager/progress_bus.go
// ProgressBus 管理 per-instance 的事件通道
type ProgressBus interface {
// Publish 发布事件到指定 instance 的通道
// 如果没有订阅者,事件会被丢弃(不阻塞 executor)
Publish(instanceID uint, event *ProgressEvent)
// Subscribe 订阅指定 instance 的事件通道
// 返回只读 channel 和取消订阅函数
// 支持多个订阅者(同一 instance 多个浏览器窗口)
Subscribe(instanceID uint) (<-chan *ProgressEvent, func())
// Close 关闭并清理指定 instance 的通道
// 所有订阅者的 channel 会被关闭
Close(instanceID uint)
}
实现要点:
- 使用 sync.RWMutex 保护 map[uint][]*subscriber
- 每个 subscriber 有独立的 buffered channel(容量 32)
- Publish 时非阻塞写入所有 subscriber channel(满了跳过,避免慢消费者阻塞 executor)
- Subscribe 返回的 cancel func 负责从订阅列表中移除并关闭该 subscriber 的 channel
- Close 时关闭所有 subscriber channel,清理 map entry
4.2 Executor 接口变更¶
位置:internal/shared/task_manager/executor.go
// 旧接口
Execute(ctx context.Context, task *models.Task,
progressCallback func(taskID uint, progress int) error) (interface{}, error)
// 新接口
Execute(ctx context.Context, task *models.Task,
progressCallback func(taskID uint, event *ProgressEvent) error) (interface{}, error)
Worker 层适配:worker.go 中构造 progressCallback 时:
1. 将事件 Publish 到 ProgressBus
2. 同时仍更新 DB 中的 Task.Progress(保留 DB 持久化,用于重连场景和审计)
progressCallback := func(taskID uint, event *ProgressEvent) error {
// 1. 事件驱动推送
if bus != nil {
bus.Publish(instanceID, event)
}
// 2. DB 持久化(兼容旧逻辑 + 重连场景)
percent := calculatePercent(event) // 从步骤状态推算百分比
return tm.taskRepo.UpdateProgress(tm.ctx, taskID, percent)
}
4.3 CRDCreateExecutor 改造¶
位置:internal/user/service/k8s_service/crd_service/executor.go
改造前(5 个硬编码百分比点):
_ = progressCallback(task.ID, 5) // 准备
_ = progressCallback(task.ID, 10) // 租户就绪
_ = progressCallback(task.ID, 20) // 构建 CR
_ = progressCallback(task.ID, 50) // CR 已创建
_ = progressCallback(task.ID, 100) // Ready
改造后(结构化步骤事件):
// 步骤 1:准备部署
emitStep(1, "准备部署", "running")
// ... 参数解析、配置应用 ...
emitStep(1, "准备部署", "completed")
// 步骤 2:等待租户就绪
emitStep(2, "等待租户就绪", "running")
tenantClient.WaitReady(...)
emitStep(2, "等待租户就绪", "completed")
// 步骤 3:创建资源(带子步骤)
emitStep(3, "创建资源", "running")
serverClient.Create(ctx, server, namespace)
// WaitReadyWithProgress 回调透传组件进度
serverClient.WaitReadyWithProgress(ctx, name, namespace, timeout, func(components map[string]ComponentStatus) {
subSteps := mapComponentsToSubSteps(components)
emitStepWithSubSteps(3, "创建资源", "running", subSteps)
})
emitStep(3, "创建资源", "completed")
4.4 WaitReady 改造¶
位置:internal/shared/k8s/crd/client.go(或对应文件)
新增方法,保留原有 WaitReady 不变:
// WaitReadyWithProgress 等待 TFRServer Ready,并在每次 Status 变更时回调组件进度
func (c *tfrServerClient) WaitReadyWithProgress(
ctx context.Context,
name, namespace string,
timeout time.Duration,
onStatusChange func(components map[string]ServerComponentStatus),
) (*TFRServer, error) {
// 使用 K8s Watch 监听 TFRServer
// 每次 Watch 事件触发时:
// 1. 检查 Ready condition
// 2. 调用 onStatusChange(server.Status.Components) 推送组件进度
// 3. Ready 时返回
}
4.5 InitExecutorV2 改造¶
位置:internal/user/service/digital_employee_service/init_executor_v2.go
当前只报百分比:
改造后报结构化事件:
// 在执行每个 Postman request item 时
subSteps := buildSubStepsFromCollection(collection, completedItems)
event := &ProgressEvent{
Type: "step_update",
Step: StepInfo{Index: 4, Name: "初始化资源", Status: "running"},
SubSteps: subSteps, // 动态,来自 collection items
TotalSteps: 4,
}
_ = progressCallback(task.ID, event)
4.6 MockDeployExecutor 改造¶
Mock 模拟完整的步骤+子步骤序列,包括模拟 Operator 组件进度:
// 步骤 3 模拟子步骤
mockComponents := []SubStepInfo{
{Name: "MinIO Init", Status: "pending"},
{Name: "DB Init", Status: "pending"},
{Name: "Config TOML", Status: "pending"},
{Name: "API Server", Status: "pending"},
}
for i := range mockComponents {
time.Sleep(stepDelay)
mockComponents[i].Status = "completed"
emitStepWithSubSteps(3, "创建资源", "running", mockComponents)
}
4.7 Handler 改造¶
位置:internal/user/api/handlers/digital_employee_handler.go
从 DB 轮询改为 ProgressBus 订阅:
func (h *DigitalEmployeeHandler) DeployProgress(c *gin.Context) {
// ... 解析 ID、校验权限 ...
// 终态处理不变
if isTerminal(instance.Status) {
writeTerminalEvent(c, instance)
return
}
// 设置流式响应头
c.Header("Content-Type", "application/x-ndjson")
c.Header("Cache-Control", "no-cache")
c.Header("X-Accel-Buffering", "no")
// 订阅事件通道
ch, cancel := h.progressBus.Subscribe(instance.ID)
defer cancel()
timeout := time.After(10 * time.Minute)
for {
select {
case <-c.Request.Context().Done():
return
case <-timeout:
writeTimeoutEvent(c)
return
case event, ok := <-ch:
if !ok {
// channel 被关闭,查询最终状态
writeTerminalFromDB(c, instance.ID)
return
}
writeEvent(c, event)
if event.Type == "complete" || event.Type == "error" {
return
}
}
}
}
4.8 重连处理¶
当前端刷新页面重连时: 1. Handler 重新查询 instance 状态 2. 如果已是终态,直接返回终态事件 3. 如果正在进行中,Subscribe 到 ProgressBus,从当前最新事件开始接收 4. 重连期间错过的事件不补发(前端从当前状态渲染)
4.9 CompletionHandler 适配¶
Deploy → Init 的过渡:
TaskCompletionHandler(生产)和MockDeployCompletionHandler(Mock)在 deploy task 完成后创建 init task- 创建 init task 时,ProgressBus 的 instance channel 不关闭,init executor 继续在同一 channel 发布步骤 4 的事件
- 只在实例到达终态(running/failed/stopped)时才 Close channel
5. 数据流对比¶
V1(当前)¶
CRDCreateExecutor
└→ progressCallback(taskID, 5)
└→ taskRepo.UpdateProgress(taskID, 5) [写DB]
└→ Handler ticker (2s)
└→ taskRepo.GetByID(taskID) [读DB]
└→ progressToStep(5) → "preparing" [反推]
└→ if percent != lastPercent → writeNDJSON [去重]
V2(新方案)¶
CRDCreateExecutor
└→ progressCallback(taskID, &ProgressEvent{Step: {1, "准备部署", "running"}})
├→ progressBus.Publish(instanceID, event) [channel推送,纳秒级]
│ └→ Handler goroutine receives from channel
│ └→ writeNDJSON(event) [即时]
└→ taskRepo.UpdateProgress(taskID, percent) [DB持久化,审计用]
6. 文件变更清单¶
| 文件 | 变更类型 | 说明 |
|---|---|---|
internal/shared/task_manager/progress_event.go |
新增 | ProgressEvent、StepInfo、SubStepInfo 类型定义 |
internal/shared/task_manager/progress_bus.go |
新增 | ProgressBus 接口和实现 |
internal/shared/task_manager/progress_bus_test.go |
新增 | ProgressBus 单元测试 |
internal/shared/task_manager/executor.go |
修改 | progressCallback 签名变更 |
internal/shared/task_manager/worker.go |
修改 | 构造 progressCallback 时集成 ProgressBus |
internal/shared/task_manager/manager.go |
修改 | 注入 ProgressBus |
internal/shared/k8s/crd/client.go |
修改 | 新增 WaitReadyWithProgress 方法 |
internal/user/service/k8s_service/crd_service/executor.go |
修改 | 步骤化事件替代百分比 |
internal/user/service/digital_employee_service/mock_deploy_executor.go |
修改 | 步骤化事件 + 模拟子步骤 |
internal/user/service/digital_employee_service/init_executor_v2.go |
修改 | 步骤化事件 + Collection item 子步骤 |
internal/user/service/digital_employee_service/deploy_progress.go |
删除/重写 | 移除 progressToStep,移除 MapTaskToProgressEvent |
internal/user/api/handlers/digital_employee_handler.go |
修改 | DeployProgress 从轮询改为订阅 |
internal/user/api/handlers/deploy_progress_handler_test.go |
修改 | 适配新事件模型 |
internal/user/service/digital_employee_service/service_manager.go |
修改 | 注入 ProgressBus |
internal/user/service/digital_employee_service/task_completion_handler.go |
修改 | deploy→init 不关闭 channel |
internal/user/service/digital_employee_service/mock_deploy_completion_handler.go |
修改 | 同上 |
cmd/user-service/main.go |
修改 | 创建 ProgressBus 并注入 |
7. 测试计划¶
7.1 单元测试¶
| 测试目标 | 测试内容 |
|---|---|
| ProgressBus | Publish/Subscribe 基本功能、多订阅者、Close 后 channel 关闭、无订阅者时 Publish 不阻塞 |
| CRDCreateExecutor | 验证步骤事件序列(使用 mock ProgressBus) |
| MockDeployExecutor | 验证步骤+子步骤事件序列、失败场景在正确步骤失败 |
| InitExecutorV2 | 验证 Collection items 映射为子步骤、动态 subSteps 数量 |
| Worker | 验证 progressCallback 同时写 ProgressBus 和 DB |
7.2 集成测试¶
| 测试目标 | 测试内容 |
|---|---|
| 端到端 NDJSON Stream | 启动 HTTP 服务 → 触发 Mock deploy → 订阅 NDJSON stream → 验证完整事件序列(4步+子步骤) |
| 重连场景 | 断开 stream → 重连 → 验证从当前状态恢复 |
| 终态场景 | 实例已 running → 请求 deploy-progress → 验证返回单条 complete 事件 |
| 错误场景 | Mock 失败模板 → 验证 error 事件包含正确步骤信息 |
| Deploy→Init 过渡 | 验证 deploy 完成后 stream 无中断地继续推送 init 步骤 |
7.3 验收标准¶
- 无步骤丢失:无论进度多快,4 个主步骤的 running→completed 事件必须全部推送
- 子步骤完整:步骤 3 展示 Operator 组件进度,步骤 4 展示 Postman request items
- 零轮询:Handler 不再有任何 ticker/轮询逻辑
- 事件即时性:executor 发布事件到 Handler 推送的延迟 < 10ms
- Mock 一致性:Mock 模式和生产模式的事件格式完全一致
- 重连不崩溃:前端刷新页面后重连,stream 正常恢复
8. Operator 对接要求文档¶
以下要求发送给 tfrs-operator 开发者,用于自检和对接。
8.1 TFRServer Status.Components 要求¶
tfrsmanager 将在 WaitReady 期间读取 TFRServer.Status.Components,用于向用户展示部署子步骤进度。
要求:
-
组件粒度:
Status.Components中的每个条目代表一个用户可感知的部署子步骤。当前已有的组件(MinIO Init、DB Init、API Server 等)粒度合适。 -
状态值标准化:每个
ComponentStatus.State必须使用以下值之一: Pending— 组件尚未开始部署Deploying— 组件正在部署中Ready— 组件部署完成且就绪-
Failed— 组件部署失败 -
及时更新:Reconciler 在每个组件状态变更时必须及时更新
Status.Components并调用r.Status().Update()。tfrsmanager 通过 K8s Watch 监听这些变更,延迟越低用户体验越好。 -
组件名称稳定:
Componentsmap 的 key(组件名称)应保持稳定,不随版本变化。这些名称会直接展示给最终用户。 -
组件显示名称(可选):如果希望前端展示更友好的中文名称,可在
ComponentStatus中增加DisplayName字段。否则 tfrsmanager 侧维护映射表。
8.2 TFRTenant Status 要求¶
同 TFRServer,TFRTenant.Status.Components 也应遵循上述标准。但当前方案中 TFRTenant 的等待不透传子步骤,仅在步骤 2 展示整体状态。
8.3 对接时间线¶
- tfrsmanager 侧先行开发,使用 Mock WaitReadyWithProgress 进行测试
- Operator 侧自检 Status.Components 是否满足上述要求
- 联调时对接真实 Watch 事件
9. 实施计划¶
所有变更为一次原子提交,不分阶段交付。以下为建议的编码顺序(自底向上,先基础设施后上层消费者):
- ProgressEvent 类型定义 —
progress_event.go - ProgressBus 实现 + 单元测试 —
progress_bus.go、progress_bus_test.go - Executor 接口签名变更 —
executor.go中 progressCallback 从int改为*ProgressEvent - Worker 层适配 —
worker.go构造 progressCallback,集成 ProgressBus + DB 持久化 - WaitReadyWithProgress —
crd/client.go新增方法 - 三个 Executor 改造 — CRDCreateExecutor、MockDeployExecutor、InitExecutorV2
- Handler 改造 — 从 DB 轮询改为 ProgressBus 订阅
- CompletionHandler 适配 — deploy→init 过渡不关闭 channel
- 注入层 — ServiceManager、main.go 创建并注入 ProgressBus
- 删除旧代码 — progressToStep、MapTaskToProgressEvent 等
- 集成测试 — HTTP stream 端到端验证
10. 风险与缓解¶
| 风险 | 缓解措施 |
|---|---|
| 前端未及时订阅,错过早期事件 | Handler 在 Subscribe 后立即查询当前状态发送初始事件 |
| Executor 崩溃,channel 未关闭 | Worker 层 defer Close,CompletionHandler 兜底 |
| 多浏览器窗口同时订阅 | ProgressBus 支持多订阅者,每个有独立 channel |
| WaitReadyWithProgress Watch 断开 | K8s Watch 自带重连机制,已有的 WaitReady 逻辑可复用 |
| DB 持久化与 channel 推送不一致 | progressCallback 先 Publish channel 再写 DB,最坏情况 DB 落后于 stream |