跳转至

Deploy Progress Streaming V2 技术方案

状态:待评审 日期:2026-03-08 范围:tfrsmanager 仓库(operator 侧输出对接要求文档)


1. 问题陈述

当前部署进度流式推送存在以下设计缺陷:

缺陷 描述 严重程度
百分比反推步骤 progressToStep() 用 0-100% 百分比反向推导步骤名称,进度跳跃时步骤丢失 P0
DB 轮询 Handler 每 2s 轮询 Task 表读进度,快速变化时丢步骤 P0
去重粒度粗 只按 percent 去重,不按步骤状态去重 P1
前端不补全 前端只渲染收到的步骤,缺失步骤不显示 P1
Init 单步骤 Init 阶段只映射为 1 个步骤,无子步骤 P0

影响范围progressToStep() 在生产环境的 CRDCreateExecutor 和 Mock 的 MockDeployExecutor 中均在使用,不仅仅是 Mock 问题。


2. 设计决策汇总

决策项 选择 理由
推送机制 Go channel(进程内 pub/sub) 零延迟、零外部依赖、天然背压、单进程部署场景完全匹配
Channel 粒度 按 Instance ID 与前端连接模型一致——一个 instance 一个 stream
Operator 组件进度 透传 TFRServer.Status.Components WaitReady 期间读取组件状态推送为子步骤
WaitReady 改造 改造为支持回调的版本 完全事件驱动,每次 Status 变更时回调
步骤模型 4 个主步骤 + 动态子步骤 步骤是一等公民,有独立状态
Mock 模式 模拟子步骤 前端开发/测试体验一致
Init 子步骤 动态,来自 Postman Collection item.Name 前端动态渲染
向后兼容 一次性切换 前后端同步升级,不做双版本并行
Executor 接口 改造 progressCallback 签名 int 改为 *ProgressEvent
ProgressBus 位置 internal/shared/task_manager/ 与任务执行密切相关
重连策略 从当前最新状态开始推送 简单直接,前端处理步骤跳变
测试 单元 + 集成测试 ProgressBus 单元测试 + HTTP stream 端到端测试
改造范围 仅 tfrsmanager Operator 侧输出对接要求文档

3. 架构设计

3.1 整体架构

┌──────────────────────────────────────────────────────────────────┐
│                         tfrsmanager 进程                         │
│                                                                  │
│  ┌─────────────┐     Publish()     ┌──────────────┐             │
│  │  Executor    │ ──────────────→  │ ProgressBus  │             │
│  │ (goroutine)  │                  │  (per-inst   │             │
│  │              │                  │   channel)   │             │
│  └─────────────┘                  └──────┬───────┘             │
│                                          │ Subscribe()          │
│                                          ▼                      │
│                                   ┌──────────────┐              │
│                                   │   Handler    │──→ NDJSON    │
│                                   │ (goroutine)  │   Stream     │
│                                   └──────────────┘   to Client  │
└──────────────────────────────────────────────────────────────────┘

对比 V1:

V1: Executor → DB write → [2s ticker] → Handler polls DB → Client
V2: Executor → channel  → [instant]  → Handler receives  → Client

3.2 事件模型

// ProgressEvent 部署进度事件(V2)
type ProgressEvent struct {
    Type       string        `json:"type"`                // "step_update" | "complete" | "error"
    Step       StepInfo      `json:"step"`                // 当前主步骤
    SubSteps   []SubStepInfo `json:"subSteps,omitempty"`  // 子步骤(步骤3、4有)
    TotalSteps int           `json:"totalSteps"`          // 主步骤总数 = 4
    Message    string        `json:"message,omitempty"`   // 附加消息
    ErrorCode  string        `json:"errorCode,omitempty"` // 错误码
    Timestamp  string        `json:"timestamp"`           // ISO8601
}

type StepInfo struct {
    Index  int    `json:"index"`  // 1-4
    Name   string `json:"name"`   // 步骤中文名
    Status string `json:"status"` // "pending" | "running" | "completed" | "failed"
}

type SubStepInfo struct {
    Name   string `json:"name"`   // 子步骤名称(如 "MinIO Init"、"健康检查")
    Status string `json:"status"` // "pending" | "running" | "completed" | "failed"
}

3.3 主步骤定义

Index Name 触发条件 子步骤来源
1 准备部署 参数解析、配置应用
2 等待租户就绪 TFRTenant WaitReady(可能瞬间完成)
3 创建资源 Apply TFRServer CR + Watch Ready TFRServer.Status.Components
4 初始化资源 InitExecutorV2 执行 Postman Collection Collection request items

事件序列示例(完整部署):

{"type":"step_update","step":{"index":1,"name":"准备部署","status":"running"},"totalSteps":4,"timestamp":"..."}
{"type":"step_update","step":{"index":1,"name":"准备部署","status":"completed"},"totalSteps":4,"timestamp":"..."}
{"type":"step_update","step":{"index":2,"name":"等待租户就绪","status":"running"},"totalSteps":4,"timestamp":"..."}
{"type":"step_update","step":{"index":2,"name":"等待租户就绪","status":"completed"},"totalSteps":4,"timestamp":"..."}
{"type":"step_update","step":{"index":3,"name":"创建资源","status":"running"},"subSteps":[{"name":"MinIO Init","status":"pending"},{"name":"DB Init","status":"pending"},{"name":"API Server","status":"pending"}],"totalSteps":4,"timestamp":"..."}
{"type":"step_update","step":{"index":3,"name":"创建资源","status":"running"},"subSteps":[{"name":"MinIO Init","status":"completed"},{"name":"DB Init","status":"running"},{"name":"API Server","status":"pending"}],"totalSteps":4,"timestamp":"..."}
{"type":"step_update","step":{"index":3,"name":"创建资源","status":"running"},"subSteps":[{"name":"MinIO Init","status":"completed"},{"name":"DB Init","status":"completed"},{"name":"API Server","status":"running"}],"totalSteps":4,"timestamp":"..."}
{"type":"step_update","step":{"index":3,"name":"创建资源","status":"completed"},"subSteps":[{"name":"MinIO Init","status":"completed"},{"name":"DB Init","status":"completed"},{"name":"API Server","status":"completed"}],"totalSteps":4,"timestamp":"..."}
{"type":"step_update","step":{"index":4,"name":"初始化资源","status":"running"},"subSteps":[{"name":"健康检查","status":"running"},{"name":"创建Token","status":"pending"},{"name":"配置工作流","status":"pending"}],"totalSteps":4,"timestamp":"..."}
{"type":"step_update","step":{"index":4,"name":"初始化资源","status":"running"},"subSteps":[{"name":"健康检查","status":"completed"},{"name":"创建Token","status":"running"},{"name":"配置工作流","status":"pending"}],"totalSteps":4,"timestamp":"..."}
{"type":"complete","step":{"index":4,"name":"初始化资源","status":"completed"},"totalSteps":4,"timestamp":"..."}

4. 核心组件设计

4.1 ProgressBus(事件总线)

位置internal/shared/task_manager/progress_bus.go

// ProgressBus 管理 per-instance 的事件通道
type ProgressBus interface {
    // Publish 发布事件到指定 instance 的通道
    // 如果没有订阅者,事件会被丢弃(不阻塞 executor)
    Publish(instanceID uint, event *ProgressEvent)

    // Subscribe 订阅指定 instance 的事件通道
    // 返回只读 channel 和取消订阅函数
    // 支持多个订阅者(同一 instance 多个浏览器窗口)
    Subscribe(instanceID uint) (<-chan *ProgressEvent, func())

    // Close 关闭并清理指定 instance 的通道
    // 所有订阅者的 channel 会被关闭
    Close(instanceID uint)
}

实现要点: - 使用 sync.RWMutex 保护 map[uint][]*subscriber - 每个 subscriber 有独立的 buffered channel(容量 32) - Publish 时非阻塞写入所有 subscriber channel(满了跳过,避免慢消费者阻塞 executor) - Subscribe 返回的 cancel func 负责从订阅列表中移除并关闭该 subscriber 的 channel - Close 时关闭所有 subscriber channel,清理 map entry

4.2 Executor 接口变更

位置internal/shared/task_manager/executor.go

// 旧接口
Execute(ctx context.Context, task *models.Task,
    progressCallback func(taskID uint, progress int) error) (interface{}, error)

// 新接口
Execute(ctx context.Context, task *models.Task,
    progressCallback func(taskID uint, event *ProgressEvent) error) (interface{}, error)

Worker 层适配worker.go 中构造 progressCallback 时: 1. 将事件 Publish 到 ProgressBus 2. 同时仍更新 DB 中的 Task.Progress(保留 DB 持久化,用于重连场景和审计)

progressCallback := func(taskID uint, event *ProgressEvent) error {
    // 1. 事件驱动推送
    if bus != nil {
        bus.Publish(instanceID, event)
    }
    // 2. DB 持久化(兼容旧逻辑 + 重连场景)
    percent := calculatePercent(event) // 从步骤状态推算百分比
    return tm.taskRepo.UpdateProgress(tm.ctx, taskID, percent)
}

4.3 CRDCreateExecutor 改造

位置internal/user/service/k8s_service/crd_service/executor.go

改造前(5 个硬编码百分比点):

_ = progressCallback(task.ID, 5)   // 准备
_ = progressCallback(task.ID, 10)  // 租户就绪
_ = progressCallback(task.ID, 20)  // 构建 CR
_ = progressCallback(task.ID, 50)  // CR 已创建
_ = progressCallback(task.ID, 100) // Ready

改造后(结构化步骤事件):

// 步骤 1:准备部署
emitStep(1, "准备部署", "running")
// ... 参数解析、配置应用 ...
emitStep(1, "准备部署", "completed")

// 步骤 2:等待租户就绪
emitStep(2, "等待租户就绪", "running")
tenantClient.WaitReady(...)
emitStep(2, "等待租户就绪", "completed")

// 步骤 3:创建资源(带子步骤)
emitStep(3, "创建资源", "running")
serverClient.Create(ctx, server, namespace)
// WaitReadyWithProgress 回调透传组件进度
serverClient.WaitReadyWithProgress(ctx, name, namespace, timeout, func(components map[string]ComponentStatus) {
    subSteps := mapComponentsToSubSteps(components)
    emitStepWithSubSteps(3, "创建资源", "running", subSteps)
})
emitStep(3, "创建资源", "completed")

4.4 WaitReady 改造

位置internal/shared/k8s/crd/client.go(或对应文件)

新增方法,保留原有 WaitReady 不变:

// WaitReadyWithProgress 等待 TFRServer Ready,并在每次 Status 变更时回调组件进度
func (c *tfrServerClient) WaitReadyWithProgress(
    ctx context.Context,
    name, namespace string,
    timeout time.Duration,
    onStatusChange func(components map[string]ServerComponentStatus),
) (*TFRServer, error) {
    // 使用 K8s Watch 监听 TFRServer
    // 每次 Watch 事件触发时:
    //   1. 检查 Ready condition
    //   2. 调用 onStatusChange(server.Status.Components) 推送组件进度
    //   3. Ready 时返回
}

4.5 InitExecutorV2 改造

位置internal/user/service/digital_employee_service/init_executor_v2.go

当前只报百分比:

pct := (completedBefore + index + 1) * 100 / totalAllItems
_ = progressCallback(task.ID, pct)

改造后报结构化事件:

// 在执行每个 Postman request item 时
subSteps := buildSubStepsFromCollection(collection, completedItems)
event := &ProgressEvent{
    Type: "step_update",
    Step: StepInfo{Index: 4, Name: "初始化资源", Status: "running"},
    SubSteps: subSteps, // 动态,来自 collection items
    TotalSteps: 4,
}
_ = progressCallback(task.ID, event)

4.6 MockDeployExecutor 改造

Mock 模拟完整的步骤+子步骤序列,包括模拟 Operator 组件进度:

// 步骤 3 模拟子步骤
mockComponents := []SubStepInfo{
    {Name: "MinIO Init", Status: "pending"},
    {Name: "DB Init", Status: "pending"},
    {Name: "Config TOML", Status: "pending"},
    {Name: "API Server", Status: "pending"},
}
for i := range mockComponents {
    time.Sleep(stepDelay)
    mockComponents[i].Status = "completed"
    emitStepWithSubSteps(3, "创建资源", "running", mockComponents)
}

4.7 Handler 改造

位置internal/user/api/handlers/digital_employee_handler.go

从 DB 轮询改为 ProgressBus 订阅:

func (h *DigitalEmployeeHandler) DeployProgress(c *gin.Context) {
    // ... 解析 ID、校验权限 ...

    // 终态处理不变
    if isTerminal(instance.Status) {
        writeTerminalEvent(c, instance)
        return
    }

    // 设置流式响应头
    c.Header("Content-Type", "application/x-ndjson")
    c.Header("Cache-Control", "no-cache")
    c.Header("X-Accel-Buffering", "no")

    // 订阅事件通道
    ch, cancel := h.progressBus.Subscribe(instance.ID)
    defer cancel()

    timeout := time.After(10 * time.Minute)

    for {
        select {
        case <-c.Request.Context().Done():
            return
        case <-timeout:
            writeTimeoutEvent(c)
            return
        case event, ok := <-ch:
            if !ok {
                // channel 被关闭,查询最终状态
                writeTerminalFromDB(c, instance.ID)
                return
            }
            writeEvent(c, event)
            if event.Type == "complete" || event.Type == "error" {
                return
            }
        }
    }
}

4.8 重连处理

当前端刷新页面重连时: 1. Handler 重新查询 instance 状态 2. 如果已是终态,直接返回终态事件 3. 如果正在进行中,Subscribe 到 ProgressBus,从当前最新事件开始接收 4. 重连期间错过的事件不补发(前端从当前状态渲染)

4.9 CompletionHandler 适配

Deploy → Init 的过渡:

  • TaskCompletionHandler(生产)和 MockDeployCompletionHandler(Mock)在 deploy task 完成后创建 init task
  • 创建 init task 时,ProgressBus 的 instance channel 不关闭,init executor 继续在同一 channel 发布步骤 4 的事件
  • 只在实例到达终态(running/failed/stopped)时才 Close channel

5. 数据流对比

V1(当前)

CRDCreateExecutor
  └→ progressCallback(taskID, 5)
       └→ taskRepo.UpdateProgress(taskID, 5)  [写DB]
            └→ Handler ticker (2s)
                 └→ taskRepo.GetByID(taskID)  [读DB]
                      └→ progressToStep(5) → "preparing"  [反推]
                           └→ if percent != lastPercent → writeNDJSON  [去重]

V2(新方案)

CRDCreateExecutor
  └→ progressCallback(taskID, &ProgressEvent{Step: {1, "准备部署", "running"}})
       ├→ progressBus.Publish(instanceID, event)  [channel推送,纳秒级]
       │    └→ Handler goroutine receives from channel
       │         └→ writeNDJSON(event)  [即时]
       └→ taskRepo.UpdateProgress(taskID, percent)  [DB持久化,审计用]

6. 文件变更清单

文件 变更类型 说明
internal/shared/task_manager/progress_event.go 新增 ProgressEvent、StepInfo、SubStepInfo 类型定义
internal/shared/task_manager/progress_bus.go 新增 ProgressBus 接口和实现
internal/shared/task_manager/progress_bus_test.go 新增 ProgressBus 单元测试
internal/shared/task_manager/executor.go 修改 progressCallback 签名变更
internal/shared/task_manager/worker.go 修改 构造 progressCallback 时集成 ProgressBus
internal/shared/task_manager/manager.go 修改 注入 ProgressBus
internal/shared/k8s/crd/client.go 修改 新增 WaitReadyWithProgress 方法
internal/user/service/k8s_service/crd_service/executor.go 修改 步骤化事件替代百分比
internal/user/service/digital_employee_service/mock_deploy_executor.go 修改 步骤化事件 + 模拟子步骤
internal/user/service/digital_employee_service/init_executor_v2.go 修改 步骤化事件 + Collection item 子步骤
internal/user/service/digital_employee_service/deploy_progress.go 删除/重写 移除 progressToStep,移除 MapTaskToProgressEvent
internal/user/api/handlers/digital_employee_handler.go 修改 DeployProgress 从轮询改为订阅
internal/user/api/handlers/deploy_progress_handler_test.go 修改 适配新事件模型
internal/user/service/digital_employee_service/service_manager.go 修改 注入 ProgressBus
internal/user/service/digital_employee_service/task_completion_handler.go 修改 deploy→init 不关闭 channel
internal/user/service/digital_employee_service/mock_deploy_completion_handler.go 修改 同上
cmd/user-service/main.go 修改 创建 ProgressBus 并注入

7. 测试计划

7.1 单元测试

测试目标 测试内容
ProgressBus Publish/Subscribe 基本功能、多订阅者、Close 后 channel 关闭、无订阅者时 Publish 不阻塞
CRDCreateExecutor 验证步骤事件序列(使用 mock ProgressBus)
MockDeployExecutor 验证步骤+子步骤事件序列、失败场景在正确步骤失败
InitExecutorV2 验证 Collection items 映射为子步骤、动态 subSteps 数量
Worker 验证 progressCallback 同时写 ProgressBus 和 DB

7.2 集成测试

测试目标 测试内容
端到端 NDJSON Stream 启动 HTTP 服务 → 触发 Mock deploy → 订阅 NDJSON stream → 验证完整事件序列(4步+子步骤)
重连场景 断开 stream → 重连 → 验证从当前状态恢复
终态场景 实例已 running → 请求 deploy-progress → 验证返回单条 complete 事件
错误场景 Mock 失败模板 → 验证 error 事件包含正确步骤信息
Deploy→Init 过渡 验证 deploy 完成后 stream 无中断地继续推送 init 步骤

7.3 验收标准

  1. 无步骤丢失:无论进度多快,4 个主步骤的 running→completed 事件必须全部推送
  2. 子步骤完整:步骤 3 展示 Operator 组件进度,步骤 4 展示 Postman request items
  3. 零轮询:Handler 不再有任何 ticker/轮询逻辑
  4. 事件即时性:executor 发布事件到 Handler 推送的延迟 < 10ms
  5. Mock 一致性:Mock 模式和生产模式的事件格式完全一致
  6. 重连不崩溃:前端刷新页面后重连,stream 正常恢复

8. Operator 对接要求文档

以下要求发送给 tfrs-operator 开发者,用于自检和对接。

8.1 TFRServer Status.Components 要求

tfrsmanager 将在 WaitReady 期间读取 TFRServer.Status.Components,用于向用户展示部署子步骤进度。

要求:

  1. 组件粒度Status.Components 中的每个条目代表一个用户可感知的部署子步骤。当前已有的组件(MinIO Init、DB Init、API Server 等)粒度合适。

  2. 状态值标准化:每个 ComponentStatus.State 必须使用以下值之一:

  3. Pending — 组件尚未开始部署
  4. Deploying — 组件正在部署中
  5. Ready — 组件部署完成且就绪
  6. Failed — 组件部署失败

  7. 及时更新:Reconciler 在每个组件状态变更时必须及时更新 Status.Components 并调用 r.Status().Update()。tfrsmanager 通过 K8s Watch 监听这些变更,延迟越低用户体验越好。

  8. 组件名称稳定Components map 的 key(组件名称)应保持稳定,不随版本变化。这些名称会直接展示给最终用户。

  9. 组件显示名称(可选):如果希望前端展示更友好的中文名称,可在 ComponentStatus 中增加 DisplayName 字段。否则 tfrsmanager 侧维护映射表。

8.2 TFRTenant Status 要求

同 TFRServer,TFRTenant.Status.Components 也应遵循上述标准。但当前方案中 TFRTenant 的等待不透传子步骤,仅在步骤 2 展示整体状态。

8.3 对接时间线

  • tfrsmanager 侧先行开发,使用 Mock WaitReadyWithProgress 进行测试
  • Operator 侧自检 Status.Components 是否满足上述要求
  • 联调时对接真实 Watch 事件

9. 实施计划

所有变更为一次原子提交,不分阶段交付。以下为建议的编码顺序(自底向上,先基础设施后上层消费者):

  1. ProgressEvent 类型定义progress_event.go
  2. ProgressBus 实现 + 单元测试progress_bus.goprogress_bus_test.go
  3. Executor 接口签名变更executor.go 中 progressCallback 从 int 改为 *ProgressEvent
  4. Worker 层适配worker.go 构造 progressCallback,集成 ProgressBus + DB 持久化
  5. WaitReadyWithProgresscrd/client.go 新增方法
  6. 三个 Executor 改造 — CRDCreateExecutor、MockDeployExecutor、InitExecutorV2
  7. Handler 改造 — 从 DB 轮询改为 ProgressBus 订阅
  8. CompletionHandler 适配 — deploy→init 过渡不关闭 channel
  9. 注入层 — ServiceManager、main.go 创建并注入 ProgressBus
  10. 删除旧代码 — progressToStep、MapTaskToProgressEvent 等
  11. 集成测试 — HTTP stream 端到端验证

10. 风险与缓解

风险 缓解措施
前端未及时订阅,错过早期事件 Handler 在 Subscribe 后立即查询当前状态发送初始事件
Executor 崩溃,channel 未关闭 Worker 层 defer Close,CompletionHandler 兜底
多浏览器窗口同时订阅 ProgressBus 支持多订阅者,每个有独立 channel
WaitReadyWithProgress Watch 断开 K8s Watch 自带重连机制,已有的 WaitReady 逻辑可复用
DB 持久化与 channel 推送不一致 progressCallback 先 Publish channel 再写 DB,最坏情况 DB 落后于 stream