KiCloud SaaS 可用性保障——线程池隔离、限流降级与 MTTR 闭环机制

_

一、单设备影响线程池的雪崩场景

1. 故障还原

上线后第三周遇到的一次真实事故。某化工客户的一台离心泵的振动传感器固件出了 bug,正常情况下每秒上报 2 条数据,故障后变成了每秒上报 200 条——相当于单台设备的数据量暴增了 100 倍。

这台设备的数据进入 device-monitor 服务后,每条都要走完整的处理链路:MQTT 消息解析 → 协议适配 → 振动特征提取 → 阈值判断 → 写 DB → 更新 Redis 状态。其中振动特征提取涉及 FFT 计算,CPU 密集,单条处理约 5-8ms。200 条/秒意味着这一台设备就占用了 1-1.6 秒的 CPU 时间/秒,几乎吃满了一个线程。

更致命的是当时的架构问题:device-monitor 服务用的是 Spring 默认的 Tomcat 线程池处理 MQTT 回调(通过 EMQX 的 WebHook 转发为 HTTP 请求),线程池大小 200。这台故障设备的 200 条/秒请求迅速占满了大量线程,而且每个请求里还有一步是同步调用第三方振动诊断 API(前面讲的,虽然有预筛过滤,但 RMS 值全都超标所以全部送诊断了),第三方 API 响应时间 500ms-2s,线程被阻塞在 HTTP 调用上。

结果: 200 个 Tomcat 线程中约 150 个被这一台设备的请求占据(大部分阻塞在第三方 API 调用上),其余 119 家客户的正常设备数据处理只剩 50 个线程可用,大面积超时。MQTT 消息积压从 0 飙到 5 万条,其他客户的设备状态更新延迟超过 2 分钟,告警推送全面滞后。Grafana 上 device_data_processing_rate 从正常的 2000 TPS 掉到了 300 TPS,http_server_requests_seconds_max 飙到 30 秒。

从用户发现问题到我们手动封禁了这台设备的数据上报,一共花了 18 分钟。这 18 分钟里整个平台的监测能力基本瘫痪——如果恰好有另一家客户的设备真出了故障,告警会严重延迟,直接影响 MTTR。

2. 根因分析

复盘后总结了三个架构缺陷:

第一是没有设备级别的流量控制,单台设备可以无限制占用平台资源。第二是同步调用第三方 API 阻塞了主处理线程,一个慢依赖拖垮了全局。第三是所有租户所有设备共享一个线程池,没有任何隔离,一个租户的问题直接影响全平台。


二、限流 + 降级 + 消息队列的完整解决方案

1. 架构改造总览

改造后的数据处理链路从同步改成了异步分层

设备 → MQTT Broker (EMQX)
         ↓
    RocketMQ(设备数据Topic,按tenant_id分区)  ← 第一层解耦
         ↓
    device-monitor Consumer(多线程消费)
         ├── 快速通道:基础处理(解析+阈值判断+写DB+更新Redis)← 线程池A
         └── 慢速通道:振动诊断API调用                        ← 线程池B(隔离)

核心思路:MQTT 数据不再直接打到 HTTP 接口,而是先落 RocketMQ 做缓冲;消费端内部用两个独立线程池隔离快慢操作;在多个层面做设备级和租户级限流。

2. 第一层改造:MQTT → RocketMQ 解耦

原来 EMQX 通过 WebHook 同步推到 device-monitor 的 HTTP 接口,改成 EMQX 的 RocketMQ Bridge 直接把 MQTT 消息投递到 RocketMQ Topic。

# EMQX Bridge配置
bridges:
  rocketmq:
    connector: kicloud-rocketmq
    parameters:
      topic: device-data-raw
      # 按tenantId做消息分区,同一租户的消息落同一个Queue
      # 保证同一设备的数据顺序性
      message_key: "${clientid}"

RocketMQ Topic device-data-raw 配了 16 个 Queue,按 tenant_id hash 分区。好处有三个:EMQX 不再阻塞(发完 MQ 立刻返回);MQ 天然做了削峰填谷,突发流量先堆在 MQ 里慢慢消费;按租户分区保证单个租户的问题不会影响其他分区的消费。

3. 第二层改造:设备级限流(Sentinel 热点参数限流)

在 RocketMQ Consumer 拿到消息后、进入业务处理之前,加了一层 Sentinel 热点参数限流,限流维度是 device_id

@Component
public class DeviceDataConsumer {
​
    private static final String RESOURCE_NAME = "device-data-process";
​
    @PostConstruct
    public void initSentinelRules() {
        // 单设备限流:每台设备最多处理10条/秒
        ParamFlowRule deviceRule = new ParamFlowRule(RESOURCE_NAME)
            .setParamIdx(0)         // 第0个参数是deviceId
            .setGrade(RuleConstant.FLOW_GRADE_QPS)
            .setCount(10)           // 默认10 QPS/设备
            .setDurationInSec(1);
        
        // VIP设备可以单独配更高的限额
        // 通过Nacos动态推送,不用重启
        
        ParamFlowRuleManager.loadRules(List.of(deviceRule));
    }
​
    @RocketMQMessageListener(topic = "device-data-raw", 
                              consumerGroup = "device-monitor-group",
                              consumeThreadNumber = 32)
    public void onMessage(DeviceRawMessage raw) {
        String deviceId = raw.getDeviceId();
        
        Entry entry = null;
        try {
            entry = SphU.entry(RESOURCE_NAME, EntryType.IN, 1, deviceId);
            // 通过限流,正常处理
            processDeviceData(raw);
            
        } catch (BlockException e) {
            // 被限流的消息不是直接丢弃,而是记录计数+采样存储
            handleRateLimited(raw);
            
        } finally {
            if (entry != null) entry.exit(1, deviceId);
        }
    }
​
    private void handleRateLimited(DeviceRawMessage raw) {
        // 1. 计数器递增(用于监控哪些设备被限流了)
        meterRegistry.counter("device_data_rate_limited_total",
            "device_id", raw.getDeviceId(),
            "tenant_id", raw.getTenantId()
        ).increment();
        
        // 2. 每100条被限流的消息采样保存1条(不完全丢弃,保留诊断数据)
        if (ThreadLocalRandom.current().nextInt(100) == 0) {
            sampledDataMapper.insert(toSampledData(raw));
        }
        
        // 3. 如果某设备连续被限流超过1分钟,推送告警给运维
        //    说明这台设备可能传感器固件有问题
        deviceRateLimitTracker.record(raw.getDeviceId());
    }
}

关键设计:单设备限流 10 QPS。正常泵类设备上报频率是 1-2 条/秒,10 QPS 留了 5-10 倍的余量。如果传感器固件 bug 导致每秒 200 条,超出的 190 条被 Sentinel 拦住,只有 10 条进入处理流程,不会占满线程。

被限流的消息不是直接丢弃而是做了采样保存(每 100 条存 1 条),保留诊断依据。同时设备持续被限流会触发告警通知客户检查传感器。

租户级限流叠加在设备级之上:

@PostConstruct
public void initTenantRules() {
    // 单租户总流量限流:默认200 QPS/租户
    ParamFlowRule tenantRule = new ParamFlowRule("tenant-data-process")
        .setParamIdx(0)    // tenantId
        .setGrade(RuleConstant.FLOW_GRADE_QPS)
        .setCount(200);
    
    // 大客户例外配置(通过Nacos动态推送)
    tenantRule.setParamFlowItemList(List.of(
        new ParamFlowItem("tenant_vip_chem", 500, String.class.getName()),
        new ParamFlowItem("tenant_vip_petro", 800, String.class.getName())
    ));
    
    ParamFlowRuleManager.loadRules(List.of(tenantRule));
}

两层限流叠加:即使一个租户有 100 台设备各上报 10 QPS(共 1000 QPS),租户级限流也会把总量压到 200 QPS,防止大客户挤占整个平台资源。

4. 第三层改造:线程池隔离(Bulkhead 模式)

消费端内部把处理逻辑拆成"快速通道"和"慢速通道",用两个独立线程池:

@Configuration
public class ThreadPoolConfig {
​
    /**
     * 快速通道线程池:基础数据处理(解析+阈值+写DB+Redis)
     * 单条处理 5-10ms,不涉及外部HTTP调用
     */
    @Bean("fastChannelPool")
    public ThreadPoolExecutor fastChannelPool() {
        return new ThreadPoolExecutor(
            16, 32,                             // core/max
            60L, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(5000),     // 队列容量5000
            new NamedThreadFactory("fast-ch"),
            new ThreadPoolExecutor.CallerRunsPolicy()
        );
    }
​
    /**
     * 慢速通道线程池:第三方振动诊断API调用
     * 单次 500ms-2s,有外部依赖
     */
    @Bean("slowChannelPool")
    public ThreadPoolExecutor slowChannelPool() {
        return new ThreadPoolExecutor(
            8, 16,                              // 故意给少一点,限制慢操作的并发
            60L, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(200),      // 队列容量200
            new NamedThreadFactory("slow-ch"),
            new ThreadPoolExecutor.AbortPolicy() // 队列满直接拒绝,走降级
        );
    }
}

处理流程:

@Service
public class DeviceDataProcessor {
​
    @Autowired @Qualifier("fastChannelPool")
    private ThreadPoolExecutor fastPool;
    
    @Autowired @Qualifier("slowChannelPool")
    private ThreadPoolExecutor slowPool;
    
    @Autowired
    private DiagnosisGatekeeper gatekeeper;
​
    public void processDeviceData(DeviceRawMessage raw) {
        // ===== 快速通道(同步执行,在MQ Consumer线程里直接跑)=====
        DeviceStatus status = protocolAdapter.parse(raw);
        
        // 写DB(异步,不阻塞主流程)
        fastPool.execute(() -> deviceDataMapper.insert(toDeviceData(raw, status)));
        
        // 更新Redis设备状态(同步,因为下游孪生体要实时读)
        updateRedisDeviceStatus(raw.getTenantId(), raw.getDeviceId(), status);
        
        // 基础阈值告警(本地计算,毫秒级)
        checkBasicThresholds(raw.getTenantId(), raw.getDeviceId(), status);
        
        // ===== 慢速通道(异步,独立线程池)=====
        if (gatekeeper.shouldDiagnose(raw.getDeviceId(), status.getVibration())) {
            try {
                slowPool.execute(() -> {
                    DiagnosisResult result = vibrationDiagnosisService
                        .diagnoseSync(raw.getDeviceId(), raw.getTenantId(),
                                      status.getVibration(), raw.getRawWaveform());
                    handleDiagnosisResult(raw.getDeviceId(), raw.getTenantId(), result);
                });
            } catch (RejectedExecutionException e) {
                // 慢速通道队列满了→降级到本地基础诊断
                meterRegistry.counter("slow_channel_rejected_total").increment();
                DiagnosisResult fallback = vibrationDiagnosisService
                    .fallbackLocalDiagnosis(status.getVibration());
                handleDiagnosisResult(raw.getDeviceId(), raw.getTenantId(), fallback);
            }
        }
    }
}

核心设计点: 快速通道(解析+Redis+基础告警)在 MQ Consumer 线程里直接完成,不受第三方 API 影响。慢速通道(振动诊断 API)用独立线程池,最多 16 个并发,队列满了直接走本地降级诊断。这样即使第三方 API 完全挂了或者极慢,也只是慢速通道的 16 个线程被占满,快速通道完全不受影响——设备状态更新、基础告警、数据写入全部正常运行。

5. 第三方 API 的熔断降级

在 slowPool 之外,对第三方 API 本身也加了 Sentinel 熔断:

@SentinelResource(value = "vibration-diagnosis-api",
    fallback = "fallbackLocalDiagnosis",
    blockHandler = "diagnosisBlockHandler")
public DiagnosisResult diagnoseSync(String deviceId, String tenantId,
                                     VibrationFeature feature,
                                     double[] rawWaveform) {
    // 正常调用第三方API...
}
​
// 熔断规则:10秒内慢调用(>2s)比例超过50%,触发熔断30秒
@PostConstruct
public void initDiagnosisCircuitBreaker() {
    DegradeRule rule = new DegradeRule("vibration-diagnosis-api")
        .setGrade(CircuitBreakerStrategy.SLOW_REQUEST_RATIO.getType())
        .setCount(0.5)          // 50%慢调用
        .setSlowRatioThreshold(2.0)  // 2秒算慢调用
        .setTimeWindow(30)      // 熔断30秒
        .setMinRequestAmount(10)
        .setStatIntervalMs(10000);
    DegradeRuleManager.loadRules(List.of(rule));
}
​
public DiagnosisResult diagnosisBlockHandler(String deviceId, String tenantId,
                                               VibrationFeature feature,
                                               double[] rawWaveform,
                                               BlockException ex) {
    meterRegistry.counter("diagnosis_api_circuit_open_total").increment();
    return fallbackLocalDiagnosis(feature);
}

降级分两级: 第一级是慢速线程池队列满了,submit 被 reject,走本地诊断。第二级是即使进入了慢速线程池,第三方 API 如果持续超时,Sentinel 熔断后所有请求直接走本地诊断,不再尝试 API 调用。两层降级保证不管第三方 API 什么情况,慢速通道的线程都不会被长时间阻塞。

6. 改造后的效果对比

用 JMeter 模拟那次故障场景——单台设备 200 QPS + 其余设备正常 2000 QPS:

指标

改造前

改造后

其他客户设备数据延迟

2 分钟+

<500ms(无影响)

正常设备告警延迟

>30s

<5s

整体 TPS 下降幅度

85%(从 2000 降到 300)

0%(稳定 2000+)

故障设备数据处理

占满线程池→全平台瘫痪

被限流到 10 QPS,其余采样保存

恢复时间

18 分钟(人工介入)

0(自动限流,无需干预)


三、MTTR 从 48 小时缩短至 6 小时的完整闭环

1. 闭环的五个环节

1. 告警触发(<30秒)
   设备数据异常 → 基础阈值判断/振动诊断 → 创建告警记录
   
2. 智能通知分发(<1分钟)
   根据告警级别+客户+值班表 → 企业微信/短信/电话
   告警自动关联:设备档案、历史维保记录、推荐处置方案
   
3. 工程师接单(目标<30分钟)
   App推送 → 工程师点击"接单" → 系统记录engineer_accepted_at
   超时未接单 → 自动升级到组长 → 再超时升级到经理
   
4. 远程/现场处理
   远程指导(60%的案例):工程师通过平台查看实时振动数据+诊断结果,
   电话指导客户现场人员操作
   现场出发(40%的案例):系统根据诊断结果推荐备件清单,
   工程师确认后自动创建出差申请
   
5. 闭环确认
   处理完成 → 工程师在App提交完工报告 → 系统检测设备数据恢复正常 → 
   自动关闭告警 → 更新MTTR统计

2. 通知分发的具体实现

@Service
public class AlertDispatchService {
​
    public void dispatch(DeviceAlert alert) {
        // 1. 查值班表,确定当前值班工程师
        OnCallEngineer engineer = onCallService
            .getCurrentEngineer(alert.getTenantId(), alert.getDeviceType());
        
        // 2. 组装通知内容(包含诊断摘要+历史上下文)
        AlertNotification notification = buildNotification(alert, engineer);
        
        // 3. 按告警级别分级发送
        switch (alert.getSeverity()) {
            case CRITICAL:
                // 同时发企业微信+短信,5分钟未接单打电话
                wechatSender.send(engineer, notification);
                smsSender.send(engineer.getPhone(), notification.getSmsText());
                scheduleEscalation(alert, engineer, Duration.ofMinutes(5));
                break;
            case HIGH:
                // 企业微信+短信
                wechatSender.send(engineer, notification);
                smsSender.send(engineer.getPhone(), notification.getSmsText());
                scheduleEscalation(alert, engineer, Duration.ofMinutes(15));
                break;
            case MEDIUM:
                // 仅企业微信
                wechatSender.send(engineer, notification);
                scheduleEscalation(alert, engineer, Duration.ofMinutes(30));
                break;
            default:
                // LOW级别只记录,不主动通知
                break;
        }
        
        // 4. 更新告警时间线
        alert.setNotificationSentAt(LocalDateTime.now());
        alert.setAssignedEngineerId(engineer.getId());
        alertMapper.updateById(alert);
    }
​
    private void scheduleEscalation(DeviceAlert alert, OnCallEngineer engineer,
                                     Duration timeout) {
        // 用RocketMQ延迟消息实现超时升级
        Message msg = new Message("alert-escalation-topic",
            JSON.toJSONString(new EscalationDTO(alert.getId(), 
                engineer.getId(), 1)));  // escalationLevel=1
        msg.setDelayTimeLevel(mapToDelayLevel(timeout));
        rocketMQTemplate.syncSend("alert-escalation-topic", msg);
    }
}

超时升级的消费者:

@RocketMQMessageListener(topic = "alert-escalation-topic")
public class AlertEscalationConsumer {
​
    public void onMessage(EscalationDTO dto) {
        DeviceAlert alert = alertMapper.selectById(dto.getAlertId());
        
        // 幂等:已接单或已关闭的告警不升级
        if (alert.getEngineerAcceptedAt() != null || 
            alert.getStatus() == AlertStatus.CLOSED) {
            return;
        }
        
        int level = dto.getEscalationLevel();
        if (level == 1) {
            // 升级到组长
            Engineer teamLead = engineerService.getTeamLead(alert.getAssignedEngineerId());
            smsSender.send(teamLead.getPhone(), "告警升级:" + alert.getSummary());
            phoneCaller.call(teamLead.getPhone(), alert.getVoiceMessage());
            // 再设一个超时
            scheduleEscalation(alert, level + 1, Duration.ofMinutes(15));
        } else if (level == 2) {
            // 升级到经理
            Engineer manager = engineerService.getManager(alert.getTenantId());
            phoneCaller.call(manager.getPhone(), "紧急告警未处理:" + alert.getSummary());
        }
    }
}

3. 自动闭环检测

工程师提交完工报告后,系统不是直接关闭告警,而是持续监测设备数据 30 分钟,确认异常确实消除了才自动关闭:

@XxlJob("alert-auto-close-check")
public void checkAlertAutoClose() {
    // 查找"工程师已提交完工但还没自动关闭"的告警
    List<DeviceAlert> pendingClose = alertMapper.selectByStatus(
        AlertStatus.REPAIR_SUBMITTED);
    
    for (DeviceAlert alert : pendingClose) {
        Duration sinceRepair = Duration.between(
            alert.getRepairCompletedAt(), LocalDateTime.now());
        
        if (sinceRepair.toMinutes() < 30) {
            continue; // 还没到30分钟观察期
        }
        
        // 检查设备最近30分钟的振动数据是否恢复正常
        VibrationSummary recent = vibrationService.getRecentSummary(
            alert.getDeviceId(), Duration.ofMinutes(30));
        
        if (recent.getMaxRms() < recent.getBaselineRms() * 1.2) {
            // 正常了,自动关闭
            alert.setStatus(AlertStatus.CLOSED);
            alert.setAlertClosedAt(LocalDateTime.now());
            alert.setResolutionVerified(true);
            alertMapper.updateById(alert);
            
            // 通知客户:设备已恢复正常运行
            notificationService.sendRecoveryNotice(alert);
        } else {
            // 30分钟后还异常,重新打开告警
            alert.setStatus(AlertStatus.REOPENED);
            alertMapper.updateById(alert);
            dispatchService.dispatch(alert); // 重新派单
            
            meterRegistry.counter("alert_reopen_total").increment();
        }
    }
}

4. 人力成本降低 60% 的统计方式

这个数据是凯泵售后部门统计的,维度是售后工程师人均每月处理的设备故障数

上线前:售后团队 12 个工程师,每月处理约 180 次设备故障(包含现场出差和电话支持)。人均 15 次/月,每次平均耗时 48 小时(含出差往返)。大量时间花在信息收集(不知道什么问题,到现场才能判断)和无效出差(到了发现是小问题或者没带对备件)。

上线后:同样 180 次故障/月,但 60% 的问题远程解决(平均 2 小时),40% 需要现场但提前知道故障类型和备件需求(平均 11 小时)。人均每月可处理的工单从 15 次提升到 35-40 次。原来 12 个工程师的工作量,现在 5 个人就能覆盖。凯泵实际是把省下来的 7 个人调去了新业务拓展,没有裁员,但等效的售后人力成本下降了约 58%(取整为 60%)。

这个数据通过两个渠道验证:一是平台的工单系统统计(每个工程师的接单数、处理时长、远程/现场比例),产品经理每月出报表;二是凯泵售后部门经理的季度汇报数据(和公司管理层汇报时引用了这个数字)。


四、120+ 客户高并发场景的踩坑

坑一:RocketMQ 分区消费不均导致大客户挤占

改造后设备数据走 RocketMQ,按 tenant_id hash 分 16 个 Queue。问题是 120 家客户的设备数量差异极大——最大的石化客户有 800 台设备,最小的只有 5 台。hash 分区后那个大客户的 Queue 数据量是其他 Queue 的 10 倍以上,对应的 Consumer 线程消费跟不上,该 Queue 积压严重,而其他 Queue 的 Consumer 线程很空闲。

排查: Grafana 上 rocketmq_consumer_lag{queue} 指标,16 个 Queue 中有 2 个 lag 持续增长(恰好是大客户 hash 到的 Queue),其余 14 个 lag 接近 0。

解决方案: 不再用简单的 tenant_id hash,改成了加权分区策略——大客户的数据分散到多个 Queue,小客户合并到少数 Queue。具体实现是维护一个租户→Queue 的映射配置表(存 Nacos),Consumer Group 的 16 个消费者按映射表分配 Queue:

@Component
public class TenantAwareQueueSelector implements MessageQueueSelector {
​
    // 从Nacos动态加载租户→Queue映射
    @NacosValue(value = "${tenant.queue.mapping}", autoRefreshed = true)
    private String mappingConfig;
​
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        String tenantId = msg.getProperty("tenantId");
        Map<String, List<Integer>> mapping = parseMapping(mappingConfig);
        
        List<Integer> assignedQueues = mapping.get(tenantId);
        if (assignedQueues != null && !assignedQueues.isEmpty()) {
            // 大客户分配了多个Queue,在其中轮询
            int idx = Math.abs(msg.getKeys().hashCode()) % assignedQueues.size();
            return mqs.get(assignedQueues.get(idx));
        }
        
        // 未配置的小客户走默认hash
        return mqs.get(Math.abs(tenantId.hashCode()) % mqs.size());
    }
}

配置示例:大客户 tenant_vip_petro 分配了 Queue 0/1/2/3 四个,其余小客户共享 Queue 4-15。调整后 16 个 Queue 的消费延迟趋于均匀。

坑二:单租户诊断 API 调用风暴导致第三方限流

某客户工厂做了一次设备全面检修后恢复运行,多台设备振动基线发生了变化(检修后重新安装,振动特征和以前不同了),系统判定为异常,短时间内送了 50+ 条诊断请求到第三方 API。第三方 API 的调用配额是 100 次/分钟(全平台共享),这一个客户就用了一半,导致其他客户的诊断请求被第三方返回 429。

解决方案: 在调第三方 API 之前加了一层租户级诊断限流,用 Guava RateLimiter 按 tenant_id 限制:

@Component
public class TenantDiagnosisRateLimiter {
​
    private final Map<String, RateLimiter> tenantLimiters = new ConcurrentHashMap<>();
    
    // 每个租户默认每分钟最多10次诊断调用
    private static final double DEFAULT_RATE = 10.0 / 60;  // 约0.167 QPS
​
    public boolean tryAcquire(String tenantId) {
        RateLimiter limiter = tenantLimiters.computeIfAbsent(
            tenantId, k -> RateLimiter.create(DEFAULT_RATE));
        return limiter.tryAcquire(0, TimeUnit.SECONDS);  // 非阻塞
    }
}

同时在第三方 API 返回 429 时,触发全局诊断熔断 60 秒,所有租户的诊断请求都走本地降级。

坑三:告警闭环率低——工程师"接单不处理"

上线后发现一个非技术问题:有些工程师在 App 上点了"接单"(记录了 engineer_accepted_at),但实际没有处理,告警长时间停留在"处理中"状态。统计发现"接单到完工"的平均时间从预期的 4-6 小时变成了 12 小时,工单闭环率只有 65%——35% 的告警"接了但没关"。

排查: 从工单系统拉数据分析,发现几个工程师的"接单到完工"时间中位数超过 24 小时,远高于平均水平。和售后经理沟通后确认是管理问题——有些工程师习惯性点接单(避免升级告警打扰组长),但实际排在后面处理。

解决方案是技术+管理结合: 技术上加了处理超时升级——接单后 4 小时没有任何进展更新(中间状态记录),自动推送提醒给工程师;8 小时没进展自动通知组长;同时在工程师绩效看板上展示每个人的平均处理时长和闭环率排名。管理上售后经理把闭环率纳入了 KPI 考核。

@XxlJob("alert-processing-timeout-check")
public void checkProcessingTimeout() {
    List<DeviceAlert> processing = alertMapper.selectByStatus(AlertStatus.ACCEPTED);
    
    for (DeviceAlert alert : processing) {
        Duration sincAccepted = Duration.between(
            alert.getEngineerAcceptedAt(), LocalDateTime.now());
        
        if (sincAccepted.toHours() >= 8) {
            // 升级到组长
            notifyTeamLead(alert, "告警接单超8小时未处理");
            meterRegistry.counter("alert_processing_timeout_total",
                "level", "team_lead").increment();
        } else if (sincAccepted.toHours() >= 4) {
            // 提醒工程师
            notifyEngineer(alert, "您有一条告警接单已超4小时,请尽快处理");
            meterRegistry.counter("alert_processing_timeout_total",
                "level", "reminder").increment();
        }
    }
}

两个月后闭环率从 65% 提升到了 92%。


五、监控指标汇总与复用

核心 Prometheus 指标

指标

用途

告警阈值

device_data_rate_limited_total{device_id}

被设备级限流的数据量

单设备持续限流 > 5 分钟

slow_channel_rejected_total

慢速通道拒绝数

> 10/分钟

diagnosis_api_circuit_open_total

诊断 API 熔断触发数

> 0

alert_e2e_latency_seconds p95

告警端到端延迟

> 30s

alert_escalation_total{level}

告警升级次数

level=2 > 0

alert_reopen_total

告警重新打开次数

日增 > 5

alert_processing_timeout_total

处理超时次数

日增 > 10

rocketmq_consumer_lag{queue}

MQ 消费积压

> 5000

fast_channel_pool_active

快速线程池活跃数

> 28(接近 max 32)

slow_channel_pool_active

慢速线程池活跃数

> 14(接近 max 16)

复用到其他项目

这套"设备级限流 + 线程池隔离 + 分级降级"的模式后来抽象成了一个内部组件 kicloud-resilience-starter,KiPlant 在做设备数据处理模块升级时直接引入了这个 starter。核心类(DeviceRateLimiter、BulkheadThreadPoolConfig、GracefulDegradation)可以通过 Nacos 配置动态调整参数,不同项目只需要改配置不用改代码。

KiCloud 凯泵智联——云平台监控体系、振动分析与销售瞭望台 2025-01-12
KiPlant 智慧工厂数字孪生平台——微服务架构搭建与多租户慢SQL治理 2025-01-26

评论区