一、单设备影响线程池的雪崩场景
1. 故障还原
上线后第三周遇到的一次真实事故。某化工客户的一台离心泵的振动传感器固件出了 bug,正常情况下每秒上报 2 条数据,故障后变成了每秒上报 200 条——相当于单台设备的数据量暴增了 100 倍。
这台设备的数据进入 device-monitor 服务后,每条都要走完整的处理链路:MQTT 消息解析 → 协议适配 → 振动特征提取 → 阈值判断 → 写 DB → 更新 Redis 状态。其中振动特征提取涉及 FFT 计算,CPU 密集,单条处理约 5-8ms。200 条/秒意味着这一台设备就占用了 1-1.6 秒的 CPU 时间/秒,几乎吃满了一个线程。
更致命的是当时的架构问题:device-monitor 服务用的是 Spring 默认的 Tomcat 线程池处理 MQTT 回调(通过 EMQX 的 WebHook 转发为 HTTP 请求),线程池大小 200。这台故障设备的 200 条/秒请求迅速占满了大量线程,而且每个请求里还有一步是同步调用第三方振动诊断 API(前面讲的,虽然有预筛过滤,但 RMS 值全都超标所以全部送诊断了),第三方 API 响应时间 500ms-2s,线程被阻塞在 HTTP 调用上。
结果: 200 个 Tomcat 线程中约 150 个被这一台设备的请求占据(大部分阻塞在第三方 API 调用上),其余 119 家客户的正常设备数据处理只剩 50 个线程可用,大面积超时。MQTT 消息积压从 0 飙到 5 万条,其他客户的设备状态更新延迟超过 2 分钟,告警推送全面滞后。Grafana 上 device_data_processing_rate 从正常的 2000 TPS 掉到了 300 TPS,http_server_requests_seconds_max 飙到 30 秒。
从用户发现问题到我们手动封禁了这台设备的数据上报,一共花了 18 分钟。这 18 分钟里整个平台的监测能力基本瘫痪——如果恰好有另一家客户的设备真出了故障,告警会严重延迟,直接影响 MTTR。
2. 根因分析
复盘后总结了三个架构缺陷:
第一是没有设备级别的流量控制,单台设备可以无限制占用平台资源。第二是同步调用第三方 API 阻塞了主处理线程,一个慢依赖拖垮了全局。第三是所有租户所有设备共享一个线程池,没有任何隔离,一个租户的问题直接影响全平台。
二、限流 + 降级 + 消息队列的完整解决方案
1. 架构改造总览
改造后的数据处理链路从同步改成了异步分层:
设备 → MQTT Broker (EMQX)
↓
RocketMQ(设备数据Topic,按tenant_id分区) ← 第一层解耦
↓
device-monitor Consumer(多线程消费)
├── 快速通道:基础处理(解析+阈值判断+写DB+更新Redis)← 线程池A
└── 慢速通道:振动诊断API调用 ← 线程池B(隔离)核心思路:MQTT 数据不再直接打到 HTTP 接口,而是先落 RocketMQ 做缓冲;消费端内部用两个独立线程池隔离快慢操作;在多个层面做设备级和租户级限流。
2. 第一层改造:MQTT → RocketMQ 解耦
原来 EMQX 通过 WebHook 同步推到 device-monitor 的 HTTP 接口,改成 EMQX 的 RocketMQ Bridge 直接把 MQTT 消息投递到 RocketMQ Topic。
# EMQX Bridge配置
bridges:
rocketmq:
connector: kicloud-rocketmq
parameters:
topic: device-data-raw
# 按tenantId做消息分区,同一租户的消息落同一个Queue
# 保证同一设备的数据顺序性
message_key: "${clientid}"RocketMQ Topic device-data-raw 配了 16 个 Queue,按 tenant_id hash 分区。好处有三个:EMQX 不再阻塞(发完 MQ 立刻返回);MQ 天然做了削峰填谷,突发流量先堆在 MQ 里慢慢消费;按租户分区保证单个租户的问题不会影响其他分区的消费。
3. 第二层改造:设备级限流(Sentinel 热点参数限流)
在 RocketMQ Consumer 拿到消息后、进入业务处理之前,加了一层 Sentinel 热点参数限流,限流维度是 device_id:
@Component
public class DeviceDataConsumer {
private static final String RESOURCE_NAME = "device-data-process";
@PostConstruct
public void initSentinelRules() {
// 单设备限流:每台设备最多处理10条/秒
ParamFlowRule deviceRule = new ParamFlowRule(RESOURCE_NAME)
.setParamIdx(0) // 第0个参数是deviceId
.setGrade(RuleConstant.FLOW_GRADE_QPS)
.setCount(10) // 默认10 QPS/设备
.setDurationInSec(1);
// VIP设备可以单独配更高的限额
// 通过Nacos动态推送,不用重启
ParamFlowRuleManager.loadRules(List.of(deviceRule));
}
@RocketMQMessageListener(topic = "device-data-raw",
consumerGroup = "device-monitor-group",
consumeThreadNumber = 32)
public void onMessage(DeviceRawMessage raw) {
String deviceId = raw.getDeviceId();
Entry entry = null;
try {
entry = SphU.entry(RESOURCE_NAME, EntryType.IN, 1, deviceId);
// 通过限流,正常处理
processDeviceData(raw);
} catch (BlockException e) {
// 被限流的消息不是直接丢弃,而是记录计数+采样存储
handleRateLimited(raw);
} finally {
if (entry != null) entry.exit(1, deviceId);
}
}
private void handleRateLimited(DeviceRawMessage raw) {
// 1. 计数器递增(用于监控哪些设备被限流了)
meterRegistry.counter("device_data_rate_limited_total",
"device_id", raw.getDeviceId(),
"tenant_id", raw.getTenantId()
).increment();
// 2. 每100条被限流的消息采样保存1条(不完全丢弃,保留诊断数据)
if (ThreadLocalRandom.current().nextInt(100) == 0) {
sampledDataMapper.insert(toSampledData(raw));
}
// 3. 如果某设备连续被限流超过1分钟,推送告警给运维
// 说明这台设备可能传感器固件有问题
deviceRateLimitTracker.record(raw.getDeviceId());
}
}关键设计:单设备限流 10 QPS。正常泵类设备上报频率是 1-2 条/秒,10 QPS 留了 5-10 倍的余量。如果传感器固件 bug 导致每秒 200 条,超出的 190 条被 Sentinel 拦住,只有 10 条进入处理流程,不会占满线程。
被限流的消息不是直接丢弃而是做了采样保存(每 100 条存 1 条),保留诊断依据。同时设备持续被限流会触发告警通知客户检查传感器。
租户级限流叠加在设备级之上:
@PostConstruct
public void initTenantRules() {
// 单租户总流量限流:默认200 QPS/租户
ParamFlowRule tenantRule = new ParamFlowRule("tenant-data-process")
.setParamIdx(0) // tenantId
.setGrade(RuleConstant.FLOW_GRADE_QPS)
.setCount(200);
// 大客户例外配置(通过Nacos动态推送)
tenantRule.setParamFlowItemList(List.of(
new ParamFlowItem("tenant_vip_chem", 500, String.class.getName()),
new ParamFlowItem("tenant_vip_petro", 800, String.class.getName())
));
ParamFlowRuleManager.loadRules(List.of(tenantRule));
}两层限流叠加:即使一个租户有 100 台设备各上报 10 QPS(共 1000 QPS),租户级限流也会把总量压到 200 QPS,防止大客户挤占整个平台资源。
4. 第三层改造:线程池隔离(Bulkhead 模式)
消费端内部把处理逻辑拆成"快速通道"和"慢速通道",用两个独立线程池:
@Configuration
public class ThreadPoolConfig {
/**
* 快速通道线程池:基础数据处理(解析+阈值+写DB+Redis)
* 单条处理 5-10ms,不涉及外部HTTP调用
*/
@Bean("fastChannelPool")
public ThreadPoolExecutor fastChannelPool() {
return new ThreadPoolExecutor(
16, 32, // core/max
60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(5000), // 队列容量5000
new NamedThreadFactory("fast-ch"),
new ThreadPoolExecutor.CallerRunsPolicy()
);
}
/**
* 慢速通道线程池:第三方振动诊断API调用
* 单次 500ms-2s,有外部依赖
*/
@Bean("slowChannelPool")
public ThreadPoolExecutor slowChannelPool() {
return new ThreadPoolExecutor(
8, 16, // 故意给少一点,限制慢操作的并发
60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(200), // 队列容量200
new NamedThreadFactory("slow-ch"),
new ThreadPoolExecutor.AbortPolicy() // 队列满直接拒绝,走降级
);
}
}处理流程:
@Service
public class DeviceDataProcessor {
@Autowired @Qualifier("fastChannelPool")
private ThreadPoolExecutor fastPool;
@Autowired @Qualifier("slowChannelPool")
private ThreadPoolExecutor slowPool;
@Autowired
private DiagnosisGatekeeper gatekeeper;
public void processDeviceData(DeviceRawMessage raw) {
// ===== 快速通道(同步执行,在MQ Consumer线程里直接跑)=====
DeviceStatus status = protocolAdapter.parse(raw);
// 写DB(异步,不阻塞主流程)
fastPool.execute(() -> deviceDataMapper.insert(toDeviceData(raw, status)));
// 更新Redis设备状态(同步,因为下游孪生体要实时读)
updateRedisDeviceStatus(raw.getTenantId(), raw.getDeviceId(), status);
// 基础阈值告警(本地计算,毫秒级)
checkBasicThresholds(raw.getTenantId(), raw.getDeviceId(), status);
// ===== 慢速通道(异步,独立线程池)=====
if (gatekeeper.shouldDiagnose(raw.getDeviceId(), status.getVibration())) {
try {
slowPool.execute(() -> {
DiagnosisResult result = vibrationDiagnosisService
.diagnoseSync(raw.getDeviceId(), raw.getTenantId(),
status.getVibration(), raw.getRawWaveform());
handleDiagnosisResult(raw.getDeviceId(), raw.getTenantId(), result);
});
} catch (RejectedExecutionException e) {
// 慢速通道队列满了→降级到本地基础诊断
meterRegistry.counter("slow_channel_rejected_total").increment();
DiagnosisResult fallback = vibrationDiagnosisService
.fallbackLocalDiagnosis(status.getVibration());
handleDiagnosisResult(raw.getDeviceId(), raw.getTenantId(), fallback);
}
}
}
}核心设计点: 快速通道(解析+Redis+基础告警)在 MQ Consumer 线程里直接完成,不受第三方 API 影响。慢速通道(振动诊断 API)用独立线程池,最多 16 个并发,队列满了直接走本地降级诊断。这样即使第三方 API 完全挂了或者极慢,也只是慢速通道的 16 个线程被占满,快速通道完全不受影响——设备状态更新、基础告警、数据写入全部正常运行。
5. 第三方 API 的熔断降级
在 slowPool 之外,对第三方 API 本身也加了 Sentinel 熔断:
@SentinelResource(value = "vibration-diagnosis-api",
fallback = "fallbackLocalDiagnosis",
blockHandler = "diagnosisBlockHandler")
public DiagnosisResult diagnoseSync(String deviceId, String tenantId,
VibrationFeature feature,
double[] rawWaveform) {
// 正常调用第三方API...
}
// 熔断规则:10秒内慢调用(>2s)比例超过50%,触发熔断30秒
@PostConstruct
public void initDiagnosisCircuitBreaker() {
DegradeRule rule = new DegradeRule("vibration-diagnosis-api")
.setGrade(CircuitBreakerStrategy.SLOW_REQUEST_RATIO.getType())
.setCount(0.5) // 50%慢调用
.setSlowRatioThreshold(2.0) // 2秒算慢调用
.setTimeWindow(30) // 熔断30秒
.setMinRequestAmount(10)
.setStatIntervalMs(10000);
DegradeRuleManager.loadRules(List.of(rule));
}
public DiagnosisResult diagnosisBlockHandler(String deviceId, String tenantId,
VibrationFeature feature,
double[] rawWaveform,
BlockException ex) {
meterRegistry.counter("diagnosis_api_circuit_open_total").increment();
return fallbackLocalDiagnosis(feature);
}降级分两级: 第一级是慢速线程池队列满了,submit 被 reject,走本地诊断。第二级是即使进入了慢速线程池,第三方 API 如果持续超时,Sentinel 熔断后所有请求直接走本地诊断,不再尝试 API 调用。两层降级保证不管第三方 API 什么情况,慢速通道的线程都不会被长时间阻塞。
6. 改造后的效果对比
用 JMeter 模拟那次故障场景——单台设备 200 QPS + 其余设备正常 2000 QPS:
三、MTTR 从 48 小时缩短至 6 小时的完整闭环
1. 闭环的五个环节
1. 告警触发(<30秒)
设备数据异常 → 基础阈值判断/振动诊断 → 创建告警记录
2. 智能通知分发(<1分钟)
根据告警级别+客户+值班表 → 企业微信/短信/电话
告警自动关联:设备档案、历史维保记录、推荐处置方案
3. 工程师接单(目标<30分钟)
App推送 → 工程师点击"接单" → 系统记录engineer_accepted_at
超时未接单 → 自动升级到组长 → 再超时升级到经理
4. 远程/现场处理
远程指导(60%的案例):工程师通过平台查看实时振动数据+诊断结果,
电话指导客户现场人员操作
现场出发(40%的案例):系统根据诊断结果推荐备件清单,
工程师确认后自动创建出差申请
5. 闭环确认
处理完成 → 工程师在App提交完工报告 → 系统检测设备数据恢复正常 →
自动关闭告警 → 更新MTTR统计2. 通知分发的具体实现
@Service
public class AlertDispatchService {
public void dispatch(DeviceAlert alert) {
// 1. 查值班表,确定当前值班工程师
OnCallEngineer engineer = onCallService
.getCurrentEngineer(alert.getTenantId(), alert.getDeviceType());
// 2. 组装通知内容(包含诊断摘要+历史上下文)
AlertNotification notification = buildNotification(alert, engineer);
// 3. 按告警级别分级发送
switch (alert.getSeverity()) {
case CRITICAL:
// 同时发企业微信+短信,5分钟未接单打电话
wechatSender.send(engineer, notification);
smsSender.send(engineer.getPhone(), notification.getSmsText());
scheduleEscalation(alert, engineer, Duration.ofMinutes(5));
break;
case HIGH:
// 企业微信+短信
wechatSender.send(engineer, notification);
smsSender.send(engineer.getPhone(), notification.getSmsText());
scheduleEscalation(alert, engineer, Duration.ofMinutes(15));
break;
case MEDIUM:
// 仅企业微信
wechatSender.send(engineer, notification);
scheduleEscalation(alert, engineer, Duration.ofMinutes(30));
break;
default:
// LOW级别只记录,不主动通知
break;
}
// 4. 更新告警时间线
alert.setNotificationSentAt(LocalDateTime.now());
alert.setAssignedEngineerId(engineer.getId());
alertMapper.updateById(alert);
}
private void scheduleEscalation(DeviceAlert alert, OnCallEngineer engineer,
Duration timeout) {
// 用RocketMQ延迟消息实现超时升级
Message msg = new Message("alert-escalation-topic",
JSON.toJSONString(new EscalationDTO(alert.getId(),
engineer.getId(), 1))); // escalationLevel=1
msg.setDelayTimeLevel(mapToDelayLevel(timeout));
rocketMQTemplate.syncSend("alert-escalation-topic", msg);
}
}超时升级的消费者:
@RocketMQMessageListener(topic = "alert-escalation-topic")
public class AlertEscalationConsumer {
public void onMessage(EscalationDTO dto) {
DeviceAlert alert = alertMapper.selectById(dto.getAlertId());
// 幂等:已接单或已关闭的告警不升级
if (alert.getEngineerAcceptedAt() != null ||
alert.getStatus() == AlertStatus.CLOSED) {
return;
}
int level = dto.getEscalationLevel();
if (level == 1) {
// 升级到组长
Engineer teamLead = engineerService.getTeamLead(alert.getAssignedEngineerId());
smsSender.send(teamLead.getPhone(), "告警升级:" + alert.getSummary());
phoneCaller.call(teamLead.getPhone(), alert.getVoiceMessage());
// 再设一个超时
scheduleEscalation(alert, level + 1, Duration.ofMinutes(15));
} else if (level == 2) {
// 升级到经理
Engineer manager = engineerService.getManager(alert.getTenantId());
phoneCaller.call(manager.getPhone(), "紧急告警未处理:" + alert.getSummary());
}
}
}3. 自动闭环检测
工程师提交完工报告后,系统不是直接关闭告警,而是持续监测设备数据 30 分钟,确认异常确实消除了才自动关闭:
@XxlJob("alert-auto-close-check")
public void checkAlertAutoClose() {
// 查找"工程师已提交完工但还没自动关闭"的告警
List<DeviceAlert> pendingClose = alertMapper.selectByStatus(
AlertStatus.REPAIR_SUBMITTED);
for (DeviceAlert alert : pendingClose) {
Duration sinceRepair = Duration.between(
alert.getRepairCompletedAt(), LocalDateTime.now());
if (sinceRepair.toMinutes() < 30) {
continue; // 还没到30分钟观察期
}
// 检查设备最近30分钟的振动数据是否恢复正常
VibrationSummary recent = vibrationService.getRecentSummary(
alert.getDeviceId(), Duration.ofMinutes(30));
if (recent.getMaxRms() < recent.getBaselineRms() * 1.2) {
// 正常了,自动关闭
alert.setStatus(AlertStatus.CLOSED);
alert.setAlertClosedAt(LocalDateTime.now());
alert.setResolutionVerified(true);
alertMapper.updateById(alert);
// 通知客户:设备已恢复正常运行
notificationService.sendRecoveryNotice(alert);
} else {
// 30分钟后还异常,重新打开告警
alert.setStatus(AlertStatus.REOPENED);
alertMapper.updateById(alert);
dispatchService.dispatch(alert); // 重新派单
meterRegistry.counter("alert_reopen_total").increment();
}
}
}4. 人力成本降低 60% 的统计方式
这个数据是凯泵售后部门统计的,维度是售后工程师人均每月处理的设备故障数。
上线前:售后团队 12 个工程师,每月处理约 180 次设备故障(包含现场出差和电话支持)。人均 15 次/月,每次平均耗时 48 小时(含出差往返)。大量时间花在信息收集(不知道什么问题,到现场才能判断)和无效出差(到了发现是小问题或者没带对备件)。
上线后:同样 180 次故障/月,但 60% 的问题远程解决(平均 2 小时),40% 需要现场但提前知道故障类型和备件需求(平均 11 小时)。人均每月可处理的工单从 15 次提升到 35-40 次。原来 12 个工程师的工作量,现在 5 个人就能覆盖。凯泵实际是把省下来的 7 个人调去了新业务拓展,没有裁员,但等效的售后人力成本下降了约 58%(取整为 60%)。
这个数据通过两个渠道验证:一是平台的工单系统统计(每个工程师的接单数、处理时长、远程/现场比例),产品经理每月出报表;二是凯泵售后部门经理的季度汇报数据(和公司管理层汇报时引用了这个数字)。
四、120+ 客户高并发场景的踩坑
坑一:RocketMQ 分区消费不均导致大客户挤占
改造后设备数据走 RocketMQ,按 tenant_id hash 分 16 个 Queue。问题是 120 家客户的设备数量差异极大——最大的石化客户有 800 台设备,最小的只有 5 台。hash 分区后那个大客户的 Queue 数据量是其他 Queue 的 10 倍以上,对应的 Consumer 线程消费跟不上,该 Queue 积压严重,而其他 Queue 的 Consumer 线程很空闲。
排查: Grafana 上 rocketmq_consumer_lag{queue} 指标,16 个 Queue 中有 2 个 lag 持续增长(恰好是大客户 hash 到的 Queue),其余 14 个 lag 接近 0。
解决方案: 不再用简单的 tenant_id hash,改成了加权分区策略——大客户的数据分散到多个 Queue,小客户合并到少数 Queue。具体实现是维护一个租户→Queue 的映射配置表(存 Nacos),Consumer Group 的 16 个消费者按映射表分配 Queue:
@Component
public class TenantAwareQueueSelector implements MessageQueueSelector {
// 从Nacos动态加载租户→Queue映射
@NacosValue(value = "${tenant.queue.mapping}", autoRefreshed = true)
private String mappingConfig;
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
String tenantId = msg.getProperty("tenantId");
Map<String, List<Integer>> mapping = parseMapping(mappingConfig);
List<Integer> assignedQueues = mapping.get(tenantId);
if (assignedQueues != null && !assignedQueues.isEmpty()) {
// 大客户分配了多个Queue,在其中轮询
int idx = Math.abs(msg.getKeys().hashCode()) % assignedQueues.size();
return mqs.get(assignedQueues.get(idx));
}
// 未配置的小客户走默认hash
return mqs.get(Math.abs(tenantId.hashCode()) % mqs.size());
}
}配置示例:大客户 tenant_vip_petro 分配了 Queue 0/1/2/3 四个,其余小客户共享 Queue 4-15。调整后 16 个 Queue 的消费延迟趋于均匀。
坑二:单租户诊断 API 调用风暴导致第三方限流
某客户工厂做了一次设备全面检修后恢复运行,多台设备振动基线发生了变化(检修后重新安装,振动特征和以前不同了),系统判定为异常,短时间内送了 50+ 条诊断请求到第三方 API。第三方 API 的调用配额是 100 次/分钟(全平台共享),这一个客户就用了一半,导致其他客户的诊断请求被第三方返回 429。
解决方案: 在调第三方 API 之前加了一层租户级诊断限流,用 Guava RateLimiter 按 tenant_id 限制:
@Component
public class TenantDiagnosisRateLimiter {
private final Map<String, RateLimiter> tenantLimiters = new ConcurrentHashMap<>();
// 每个租户默认每分钟最多10次诊断调用
private static final double DEFAULT_RATE = 10.0 / 60; // 约0.167 QPS
public boolean tryAcquire(String tenantId) {
RateLimiter limiter = tenantLimiters.computeIfAbsent(
tenantId, k -> RateLimiter.create(DEFAULT_RATE));
return limiter.tryAcquire(0, TimeUnit.SECONDS); // 非阻塞
}
}同时在第三方 API 返回 429 时,触发全局诊断熔断 60 秒,所有租户的诊断请求都走本地降级。
坑三:告警闭环率低——工程师"接单不处理"
上线后发现一个非技术问题:有些工程师在 App 上点了"接单"(记录了 engineer_accepted_at),但实际没有处理,告警长时间停留在"处理中"状态。统计发现"接单到完工"的平均时间从预期的 4-6 小时变成了 12 小时,工单闭环率只有 65%——35% 的告警"接了但没关"。
排查: 从工单系统拉数据分析,发现几个工程师的"接单到完工"时间中位数超过 24 小时,远高于平均水平。和售后经理沟通后确认是管理问题——有些工程师习惯性点接单(避免升级告警打扰组长),但实际排在后面处理。
解决方案是技术+管理结合: 技术上加了处理超时升级——接单后 4 小时没有任何进展更新(中间状态记录),自动推送提醒给工程师;8 小时没进展自动通知组长;同时在工程师绩效看板上展示每个人的平均处理时长和闭环率排名。管理上售后经理把闭环率纳入了 KPI 考核。
@XxlJob("alert-processing-timeout-check")
public void checkProcessingTimeout() {
List<DeviceAlert> processing = alertMapper.selectByStatus(AlertStatus.ACCEPTED);
for (DeviceAlert alert : processing) {
Duration sincAccepted = Duration.between(
alert.getEngineerAcceptedAt(), LocalDateTime.now());
if (sincAccepted.toHours() >= 8) {
// 升级到组长
notifyTeamLead(alert, "告警接单超8小时未处理");
meterRegistry.counter("alert_processing_timeout_total",
"level", "team_lead").increment();
} else if (sincAccepted.toHours() >= 4) {
// 提醒工程师
notifyEngineer(alert, "您有一条告警接单已超4小时,请尽快处理");
meterRegistry.counter("alert_processing_timeout_total",
"level", "reminder").increment();
}
}
}两个月后闭环率从 65% 提升到了 92%。
五、监控指标汇总与复用
核心 Prometheus 指标
复用到其他项目
这套"设备级限流 + 线程池隔离 + 分级降级"的模式后来抽象成了一个内部组件 kicloud-resilience-starter,KiPlant 在做设备数据处理模块升级时直接引入了这个 starter。核心类(DeviceRateLimiter、BulkheadThreadPoolConfig、GracefulDegradation)可以通过 Nacos 配置动态调整参数,不同项目只需要改配置不用改代码。