一、项目背景与我的角色
1. 业务背景
KiCloud 是凯泵集团的工业设备智联云平台,核心客户是使用凯泵泵类设备(离心泵、往复泵、螺杆泵等)的企业。这些泵广泛用于石油化工、水处理、矿业等场景,设备分散在全国各地的客户工厂里。凯泵需要一个云平台来远程监测这些设备的运行状态,提供预防性维护服务,同时支撑销售团队管理 120+ 企业客户的合同和商机。
我加入时平台处于从 0 到 1 的阶段,之前凯泵的设备监测完全靠客户自己巡检+电话报修,从设备出问题到凯泵售后工程师到达现场平均需要 48 小时——信息传递慢(客户发现异常→打电话→售后接单→安排工程师→出差到场),而且很多时候客户发现异常已经是设备停机了,错过了最佳维修窗口。
我负责整个云平台的后端架构搭建,包括 K8s 部署体系、监控告警全链路、振动分析模块集成、以及销售瞭望台的 Elasticsearch 查询架构。
2. 与 KiPlant 的关系
KiCloud 复用了 KiPlant 的部分微服务底座(租户上下文透传、Nacos 配置体系、RocketMQ 消息规范),但业务场景完全不同——KiPlant 是面向工厂内部的生产管理,KiCloud 是面向设备制造商的远程运维+销售管理。两者的技术架构也有差异:KiPlant 部署在客户私有环境或阿里云 ECS 上,KiCloud 从一开始就设计为 K8s 原生的 SaaS 平台。
二、Docker + Kubernetes + 监控体系的搭建
1. 为什么选 K8s 而不是传统 ECS 部署
KiPlant 早期用的是阿里云 ECS + Docker Compose,每个租户环境手动维护,扩缩容靠运维手工操作,痛点明显——新客户上线要手动配环境,高峰期扛不住要手动加机器,半夜设备数据突增没人处理。KiCloud 面向 120+ 客户,设备数量预计会持续增长,必须有自动化的弹性能力。
选阿里云 ACK(Managed Kubernetes)而不是自建 K8s 集群,原因是团队没有专职运维,ACK 的 Master 节点由阿里云托管,我们只需要管 Worker 节点和应用层。
2. K8s 集群架构
# 集群规模
Master: 3节点(阿里云托管,免运维)
Worker:
- 业务节点池: 6台 ECS(4C8G),运行微服务
- 监控节点池: 2台 ECS(4C8G),运行Prometheus/Grafana/AlertManager
- ES节点池: 3台 ECS(8C32G + 500G SSD),运行Elasticsearch
# Namespace划分
- kicloud-prod # 生产环境业务服务
- kicloud-staging # 预发布环境
- monitoring # 监控组件
- elastic-system # Elasticsearch集群
- ingress-nginx # 入口控制器3. 微服务 K8s 部署配置
以核心的设备监测服务为例:
apiVersion: apps/v1
kind: Deployment
metadata:
name: kicloud-device-monitor
namespace: kicloud-prod
spec:
replicas: 3
strategy:
rollingUpdate:
maxSurge: 1
maxUnavailable: 0 # 滚动更新零停服
selector:
matchLabels:
app: device-monitor
template:
metadata:
labels:
app: device-monitor
annotations:
prometheus.io/scrape: "true"
prometheus.io/port: "8080"
prometheus.io/path: "/actuator/prometheus"
spec:
containers:
- name: device-monitor
image: registry.cn-shanghai.aliyuncs.com/kicloud/device-monitor:${TAG}
ports:
- containerPort: 8080
resources:
requests:
cpu: "500m"
memory: "1Gi"
limits:
cpu: "2000m"
memory: "2Gi"
readinessProbe:
httpGet:
path: /actuator/health
port: 8080
initialDelaySeconds: 30
periodSeconds: 10
livenessProbe:
httpGet:
path: /actuator/health/liveness
port: 8080
initialDelaySeconds: 60
periodSeconds: 30
env:
- name: SPRING_PROFILES_ACTIVE
value: "prod"
- name: NACOS_SERVER_ADDR
valueFrom:
configMapKeyRef:
name: kicloud-config
key: nacos.addr
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: device-monitor-hpa
namespace: kicloud-prod
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: kicloud-device-monitor
minReplicas: 3
maxReplicas: 8
metrics:
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 70
- type: Pods
pods:
metric:
name: device_data_processing_rate
target:
type: AverageValue
averageValue: "1000" # 每个Pod处理1000条/秒时触发扩容HPA 配了两个扩容维度:CPU 利用率超 70% 或自定义指标(单 Pod 设备数据处理速率超 1000 条/秒)。自定义指标通过 Prometheus Adapter 暴露给 K8s metrics API。
4. Prometheus + AlertManager + Grafana 监控体系
用 kube-prometheus-stack(Helm Chart)一键部署,在此基础上做了大量自定义配置。
Prometheus 自定义采集配置:
# prometheus-additional-scrape.yaml
- job_name: 'kicloud-services'
kubernetes_sd_configs:
- role: pod
namespaces:
names: ['kicloud-prod']
relabel_configs:
- source_labels: [__meta_kubernetes_pod_annotation_prometheus_io_scrape]
action: keep
regex: true
- source_labels: [__meta_kubernetes_pod_annotation_prometheus_io_path]
target_label: __metrics_path__
- source_labels: [__meta_kubernetes_pod_annotation_prometheus_io_port, __meta_kubernetes_pod_ip]
action: replace
target_label: __address__
regex: (.+);(.+)
replacement: $2:$1
- job_name: 'emqx-mqtt'
static_configs:
- targets: ['emqx-svc.kicloud-prod:18083']
metrics_path: '/api/v5/prometheus/stats'核心业务指标——设备异常响应 <30 秒的实现:
"设备异常响应 <30 秒"指的是从设备上报异常数据到平台触发告警通知的端到端延迟。这个指标拆解为几段:
设备→MQTT Broker: ~200ms(网络传输)
MQTT→device-monitor服务: ~100ms(EMQX消息转发)
device-monitor处理+异常检测: ~50-200ms(振动分析/阈值判断)
异常→告警通知发出: ~500ms(创建告警记录+推送企业微信/短信)在 device-monitor 服务里埋了端到端延迟指标:
@Component
public class DeviceAlertMetrics {
private final MeterRegistry registry;
private final Timer alertE2eLatency;
private final Counter alertTriggeredTotal;
public DeviceAlertMetrics(MeterRegistry registry) {
this.registry = registry;
this.alertE2eLatency = Timer.builder("device_alert_e2e_latency_seconds")
.description("End-to-end latency from device data arrival to alert dispatch")
.publishPercentiles(0.5, 0.95, 0.99)
.register(registry);
this.alertTriggeredTotal = Counter.builder("device_alert_triggered_total")
.tag("level", "")
.tag("type", "")
.register(registry);
}
public void recordAlertLatency(long deviceTimestampMs, String level, String type) {
long latencyMs = System.currentTimeMillis() - deviceTimestampMs;
alertE2eLatency.record(Duration.ofMillis(latencyMs));
registry.counter("device_alert_triggered_total",
"level", level, "type", type).increment();
// 超过30秒的记录告警
if (latencyMs > 30_000) {
log.warn("Alert e2e latency exceeded 30s: {}ms, level={}, type={}",
latencyMs, level, type);
}
}
}AlertManager 规则配置:
groups:
- name: kicloud-device-alerts
rules:
# 告警响应延迟超标
- alert: DeviceAlertLatencyHigh
expr: histogram_quantile(0.95,
rate(device_alert_e2e_latency_seconds_bucket[5m])) > 30
for: 2m
labels:
severity: critical
annotations:
summary: "设备告警端到端延迟p95超过30秒"
# 设备离线检测
- alert: DeviceOffline
expr: time() - device_last_heartbeat_timestamp > 300
for: 5m
labels:
severity: warning
annotations:
summary: "设备 {{ $labels.device_id }} 超过5分钟无心跳"
# MQTT消息积压
- alert: MqttMessageBacklog
expr: emqx_messages_qos1_received_total
- emqx_messages_qos1_delivered_total > 5000
for: 1m
labels:
severity: warning
# 设备数据写入TPS骤降(可能是采集通道故障)
- alert: DeviceDataWriteDropped
expr: rate(device_data_write_total[5m])
< rate(device_data_write_total[1h] offset 1h) * 0.5
for: 5m
labels:
severity: warning
annotations:
summary: "设备数据写入速率较一小时前下降50%以上"AlertManager 通知路由——分级分渠道:
route:
receiver: 'default-wechat'
group_by: ['alertname', 'tenant_id']
group_wait: 30s
group_interval: 5m
repeat_interval: 4h
routes:
# 设备DANGER级别 → 短信+企业微信+电话(值班工程师)
- match:
severity: critical
alert_source: device
receiver: 'critical-oncall'
repeat_interval: 30m
# 设备WARNING级别 → 企业微信
- match:
severity: warning
alert_source: device
receiver: 'warning-wechat'
# 平台基础设施告警 → 运维企业微信群
- match:
severity: critical
alert_source: infrastructure
receiver: 'ops-wechat'
receivers:
- name: 'critical-oncall'
webhook_configs:
- url: 'http://kicloud-notification-svc:8080/alert/oncall'
# 内部通知服务:短信+企业微信+电话轮询值班表
- name: 'warning-wechat'
webhook_configs:
- url: 'http://kicloud-notification-svc:8080/alert/wechat'
- name: 'ops-wechat'
wechat_configs:
- corp_id: 'xxx'
to_party: 'ops-team'
agent_id: 'xxx'
api_secret: 'xxx'Grafana Dashboard 布局:
做了 4 个核心 Dashboard:
设备监测总览:120+ 客户设备在线率、数据上报 TPS 实时曲线、按客户维度的设备健康度热力图
告警运营:告警触发数(按级别/类型/客户)、告警响应延迟分布、告警→工单闭环率
平台基础设施:K8s 节点资源使用率、Pod 状态、MQTT Broker 连接数和消息吞吐、Elasticsearch 集群健康
SaaS 可用性:各服务接口成功率、p99 延迟、错误率趋势
三、第三方振动分析算法集成
1. 为什么用第三方而不是自研
KiPlant 里我们自己实现了基础的振动 RMS 和 FFT 分析,但 KiCloud 的客户是泵类设备专业领域,振动诊断需要故障模式识别能力——不仅要知道"振动值偏高",还要判断是轴承内圈故障、不对中、还是气蚀。这需要大量的泵类设备故障样本数据来训练模型,凯泵选择了和一家专门做旋转机械振动诊断的第三方公司合作(国内做这个的有容知日新、安维尔等),他们提供 RESTful API 形式的诊断服务。
2. 集成架构与调用流程
设备振动传感器 → MQTT → device-monitor服务
↓
提取原始波形 + 基础特征(RMS/峰值/峭度)
↓
本地预筛(RMS > 基线阈值的才送诊断,降低API调用量)
↓
调用第三方振动诊断API(RESTful,同步/异步两种模式)
↓
解析诊断结果 → 告警/维保工单调用第三方 API 的关键实现:
@Service
public class VibrationDiagnosisService {
@Value("${vibration.diagnosis.api.url}")
private String apiUrl; // https://api.vendor.com/v2/diagnosis
@Value("${vibration.diagnosis.api.key}")
private String apiKey;
private final RestTemplate restTemplate;
private final MeterRegistry meterRegistry;
/**
* 同步诊断(用于WARNING级别,需要快速出结果)
*/
public DiagnosisResult diagnoseSync(String deviceId, String tenantId,
VibrationFeature feature,
double[] rawWaveform) {
Timer.Sample sample = Timer.start(meterRegistry);
try {
DiagnosisRequest request = DiagnosisRequest.builder()
.deviceType(getDeviceType(deviceId)) // pump_centrifugal/pump_reciprocating
.rpmSpeed(feature.getCurrentRpm()) // 转速(故障频率计算需要)
.samplingRate(feature.getSamplingRate())
.waveformData(Base64.getEncoder().encodeToString(
doubleArrayToBytes(rawWaveform)))
.features(Map.of(
"rms_velocity", feature.getRmsVelocity(),
"peak_value", feature.getPeakValue(),
"kurtosis", feature.getKurtosis(),
"crest_factor", feature.getCrestFactor()
))
.build();
HttpHeaders headers = new HttpHeaders();
headers.set("Authorization", "Bearer " + apiKey);
headers.setContentType(MediaType.APPLICATION_JSON);
ResponseEntity<DiagnosisResponse> response = restTemplate.exchange(
apiUrl, HttpMethod.POST,
new HttpEntity<>(request, headers),
DiagnosisResponse.class);
DiagnosisResponse body = response.getBody();
return DiagnosisResult.builder()
.faultType(body.getFaultType())
// BEARING_INNER_RACE / MISALIGNMENT / CAVITATION / IMBALANCE / NORMAL
.confidence(body.getConfidence()) // 0-1
.severity(body.getSeverity()) // LOW/MEDIUM/HIGH/CRITICAL
.recommendation(body.getRecommendation()) // "建议7天内更换轴承"
.spectralAnalysis(body.getSpectralData()) // 频谱分析详情
.build();
} catch (Exception e) {
meterRegistry.counter("vibration_diagnosis_error_total",
"reason", e.getClass().getSimpleName()).increment();
// 第三方API不可用时降级到本地基础诊断
return fallbackLocalDiagnosis(feature);
} finally {
sample.stop(meterRegistry.timer("vibration_diagnosis_latency",
"mode", "sync"));
}
}
/**
* 降级:本地基础诊断(只能判断严重程度,无法识别故障模式)
*/
private DiagnosisResult fallbackLocalDiagnosis(VibrationFeature feature) {
VibrationLevel level = VibrationLevel.fromRms(feature.getRmsVelocity());
return DiagnosisResult.builder()
.faultType("UNKNOWN")
.confidence(0.5)
.severity(level.toSeverity())
.recommendation("振动异常,建议安排现场检查(详细诊断服务暂不可用)")
.build();
}
}本地预筛逻辑——降低 API 调用成本:
第三方 API 按调用次数计费,如果 120 家客户的几千台设备每条振动数据都送过去诊断,成本不可接受。所以做了两层过滤:
第一层:只有 RMS 振动值超过设备基线阈值 120% 的数据才送诊断。大部分正常运行的数据在这一层被过滤掉,实际送诊断的比例约 3%-5%。
第二层:同一台设备 10 分钟内只送一次诊断请求(防抖)。设备振动异常通常是持续性的,不需要每秒都诊断。
@Component
public class DiagnosisGatekeeper {
// 设备最后一次送诊断的时间戳
private final Map<String, Long> lastDiagnosisTime = new ConcurrentHashMap<>();
private static final long DIAGNOSIS_COOLDOWN_MS = 10 * 60 * 1000; // 10分钟
public boolean shouldDiagnose(String deviceId, VibrationFeature feature) {
// 1. 基线阈值过滤
double baseline = baselineService.getRmsBaseline(deviceId);
if (feature.getRmsVelocity() < baseline * 1.2) {
return false;
}
// 2. 冷却时间过滤
Long lastTime = lastDiagnosisTime.get(deviceId);
if (lastTime != null &&
System.currentTimeMillis() - lastTime < DIAGNOSIS_COOLDOWN_MS) {
return false;
}
lastDiagnosisTime.put(deviceId, System.currentTimeMillis());
return true;
}
}告警触发逻辑:
诊断结果回来后,根据 severity 和 confidence 综合判断是否触发告警:
public void handleDiagnosisResult(String deviceId, String tenantId,
DiagnosisResult result) {
// 只有置信度>0.7且severity>=MEDIUM才触发告警
if (result.getConfidence() < 0.7 ||
result.getSeverity().ordinal() < Severity.MEDIUM.ordinal()) {
// 低置信度或低严重度,只记录不告警
diagnosisLogMapper.insert(toDiagnosisLog(deviceId, tenantId, result, "LOGGED"));
return;
}
// 创建告警记录
DeviceAlert alert = DeviceAlert.builder()
.tenantId(tenantId)
.deviceId(deviceId)
.alertType(result.getFaultType())
.severity(result.getSeverity())
.message(result.getRecommendation())
.diagnosisDetail(JSON.toJSONString(result.getSpectralAnalysis()))
.status(AlertStatus.OPEN)
.build();
alertMapper.insert(alert);
// 推送通知
notificationService.dispatch(alert);
// 如果是HIGH/CRITICAL,自动创建维保工单
if (result.getSeverity().ordinal() >= Severity.HIGH.ordinal()) {
maintenanceOrderService.createFromAlert(alert);
}
// 埋点:记录告警端到端延迟
deviceAlertMetrics.recordAlertLatency(
result.getOriginalDataTimestamp(),
result.getSeverity().name(),
result.getFaultType());
}四、销售瞭望台与 Elasticsearch
1. 业务痛点
凯泵的销售团队管理 120+ 企业客户,历史合同数据积累了 10 年以上。痛点是:销售员想查"这个客户过去 5 年买了哪些型号的泵、合同金额趋势、哪些快到保修期了",要翻 ERP 系统的多个模块,甚至去翻纸质合同档案。新销售员接手客户时完全没有历史背景,客户觉得"你们公司换个人就要重新介绍一遍"。
销售瞭望台要解决的是:一个搜索框搞定客户 360 度视图——历史合同、设备档案、维保记录、告警历史、联系人、商机进展,全部可搜可聚合。
2. Elasticsearch 数据架构
索引设计:
// 合同索引
PUT /kicloud-contracts
{
"settings": {
"number_of_shards": 3,
"number_of_replicas": 1,
"analysis": {
"analyzer": {
"ik_smart_pinyin": {
"type": "custom",
"tokenizer": "ik_max_word",
"filter": ["pinyin_filter", "lowercase"]
}
},
"filter": {
"pinyin_filter": {
"type": "pinyin",
"keep_full_pinyin": true,
"keep_joined_full_pinyin": true,
"keep_original": true
}
}
}
},
"mappings": {
"properties": {
"contract_id": { "type": "keyword" },
"tenant_id": { "type": "keyword" },
"customer_name": {
"type": "text",
"analyzer": "ik_smart_pinyin",
"fields": {
"keyword": { "type": "keyword" }
}
},
"product_models": { "type": "keyword" }, // 泵型号,数组
"contract_amount": { "type": "double" },
"sign_date": { "type": "date" },
"warranty_end_date": { "type": "date" },
"sales_person": { "type": "keyword" },
"contract_status": { "type": "keyword" }, // ACTIVE/COMPLETED/EXPIRED
"contract_text": { "type": "text", "analyzer": "ik_max_word" },
"tags": { "type": "keyword" }
}
}
}用了 IK 中文分词 + 拼音过滤器,支持"中石化"、"zhongshihua"、"zsh"三种方式搜索同一个客户。
500GB 数据的处理:10 年合同数据约 80 万条合同记录 + 关联的设备档案约 200 万条 + 维保记录约 150 万条。原始数据在 MySQL 里,通过 Canal 监听 binlog 增量同步到 ES。历史全量数据用 Logstash 一次性导入,大约跑了 6 小时。
3. 核心查询优化
客户 360 视图——多索引联合查询 + 聚合:
销售员搜"中石化"时,后端一次请求要返回:合同列表+金额汇总、在用设备列表、近期告警、维保历史统计。用 ES 的 _msearch(多搜索)一次请求批量查多个索引:
@Service
public class CustomerViewService {
public CustomerView getCustomerView(String tenantId, String keyword) {
// 用_msearch一次请求查4个索引
MultiSearchRequest multiSearch = new MultiSearchRequest();
// 1. 合同搜索+聚合
SearchRequest contractReq = new SearchRequest("kicloud-contracts");
contractReq.source(new SearchSourceBuilder()
.query(QueryBuilders.boolQuery()
.must(QueryBuilders.matchQuery("customer_name", keyword))
.filter(QueryBuilders.termQuery("tenant_id", tenantId)))
.aggregation(AggregationBuilders.sum("total_amount")
.field("contract_amount"))
.aggregation(AggregationBuilders.dateHistogram("yearly_amount")
.field("sign_date")
.calendarInterval(DateHistogramInterval.YEAR)
.subAggregation(AggregationBuilders.sum("amount")
.field("contract_amount")))
.size(20)
.sort("sign_date", SortOrder.DESC));
multiSearch.add(contractReq);
// 2. 设备搜索
SearchRequest deviceReq = new SearchRequest("kicloud-devices");
deviceReq.source(new SearchSourceBuilder()
.query(QueryBuilders.boolQuery()
.must(QueryBuilders.matchQuery("customer_name", keyword))
.filter(QueryBuilders.termQuery("tenant_id", tenantId)))
.size(50));
multiSearch.add(deviceReq);
// 3. 近期告警
SearchRequest alertReq = new SearchRequest("kicloud-alerts");
alertReq.source(new SearchSourceBuilder()
.query(QueryBuilders.boolQuery()
.must(QueryBuilders.matchQuery("customer_name", keyword))
.filter(QueryBuilders.termQuery("tenant_id", tenantId))
.filter(QueryBuilders.rangeQuery("created_at")
.gte("now-30d")))
.size(10)
.sort("created_at", SortOrder.DESC));
multiSearch.add(alertReq);
// 4. 保修到期预警
SearchRequest warrantyReq = new SearchRequest("kicloud-contracts");
warrantyReq.source(new SearchSourceBuilder()
.query(QueryBuilders.boolQuery()
.must(QueryBuilders.matchQuery("customer_name", keyword))
.filter(QueryBuilders.termQuery("tenant_id", tenantId))
.filter(QueryBuilders.rangeQuery("warranty_end_date")
.gte("now").lte("now+90d")))
.size(10));
multiSearch.add(warrantyReq);
MultiSearchResponse responses = esClient.msearch(multiSearch,
RequestOptions.DEFAULT);
return assembleCustomerView(responses);
}
}_msearch 的好处是 4 个查询在 ES 内部并行执行,总延迟取决于最慢的那个查询,而不是 4 个串行累加。实测客户 360 视图的查询延迟 p95 在 350ms 左右。
缓存策略:
热门客户的 360 视图查询结果在 Redis 里缓存 5 分钟(TTL),用 tenant_id + customer_keyword 做 key。合同数据变更时通过 Canal 事件触发缓存失效。缓存命中率约 45%(销售团队一天内多次查同一客户很常见)。
索引生命周期管理(ILM):
500GB 数据不可能全放热节点。用 ES 的 ILM 策略做冷热分层:
PUT _ilm/policy/kicloud-contract-policy
{
"policy": {
"phases": {
"hot": {
"actions": {
"rollover": { "max_size": "30gb" }
}
},
"warm": {
"min_age": "90d",
"actions": {
"shrink": { "number_of_shards": 1 },
"forcemerge": { "max_num_segments": 1 }
}
},
"cold": {
"min_age": "365d",
"actions": {
"allocate": { "require": { "data": "cold" } }
}
}
}
}
}近 3 个月的合同在热节点(SSD),3-12 个月在温节点,1 年以上在冷节点(HDD)。冷数据查询慢一些(1-2 秒),但 90% 的查询都命中热数据。
五、MTTR 从 48 小时缩短至 6 小时的验证
1. MTTR 数据采集
每条设备告警从触发到闭环,系统记录了完整的时间线:
CREATE TABLE device_alert (
id BIGINT PRIMARY KEY,
tenant_id VARCHAR(64),
device_id VARCHAR(64),
alert_triggered_at DATETIME NOT NULL, -- 告警触发时间
notification_sent_at DATETIME, -- 通知发出时间
engineer_accepted_at DATETIME, -- 工程师接单时间
onsite_arrived_at DATETIME, -- 到达现场时间(可选,远程解决则无)
repair_completed_at DATETIME, -- 维修完成时间
alert_closed_at DATETIME, -- 告警关闭时间
resolution_type ENUM('REMOTE','ONSITE','FALSE_ALARM'),
-- ...
);MTTR = AVG(repair_completed_at - alert_triggered_at)
2. 对比数据
上线前 48 小时的 MTTR 是从凯泵售后部门的历史工单记录里统计的(他们有一个简单的售后工单系统,记录了故障报告时间和处理完成时间),样本是过去一年的约 600 条维修工单。
上线后统计了 6 个月的数据,约 1200 条告警工单:
核心改善来自三个方面:告警自动推送替代了"客户打电话报修"的滞后环节(节省了 12-24 小时的信息传递时间);振动诊断给出了故障模式和建议,60% 的问题售后工程师可以远程指导客户处理(之前不知道什么问题就只能派人去现场);剩余 40% 需要现场的,工程师出发前已经知道故障类型,可以提前备好配件,减少了"到了现场才发现没带对零件"的无效往返。
六、高并发场景下的踩坑
坑一:告警风暴
上线第二周就遇到了。某客户工厂停电后恢复供电,几十台设备同时重启,重启瞬间的振动值都偏高(启动冲击),触发了 40+ 条 WARNING/DANGER 级别告警。售后工程师的手机在 2 分钟内收到了 40 多条企业微信通知和 20 多条短信,直接被"轰炸"了。而且这些全是误报——设备启动冲击是正常的,运行稳定后振动就恢复正常了。
解决方案分三层:
第一层——启动态免疫窗口。设备从离线变为在线后,有一个 5 分钟的"免疫期",这期间的振动数据只记录不告警:
public boolean isInImmunityWindow(String deviceId) {
String onlineTimeStr = redisTemplate.opsForHash()
.get(deviceStatusKey(deviceId), "lastOnlineAt").toString();
long onlineTime = Long.parseLong(onlineTimeStr);
return System.currentTimeMillis() - onlineTime < 5 * 60 * 1000;
}第二层——同客户告警聚合。AlertManager 的 group_by 配了 tenant_id,同一客户的告警在 30 秒内聚合成一条通知,而不是每条单独推送。
第三层——告警升级策略。初始只推企业微信,如果 15 分钟内告警未被确认(工程师点"已查看"),才升级到短信;30 分钟未确认才打电话。避免一上来就全渠道轰炸。
坑二:K8s HPA 扩容延迟导致设备数据积压
某个工作日早上 8 点,120 家客户的设备集中开机,设备数据上报 TPS 从夜间的 200 突然飙到 2500。HPA 检测到 CPU 超标后触发扩容,但新 Pod 从调度→拉镜像→启动→就绪 整个过程大约需要 60-90 秒。这 60 秒内现有 3 个 Pod 扛不住 2500 TPS,MQTT 消息开始积压,部分设备数据延迟超过 30 秒,触发了 DeviceAlertLatencyHigh 告警。
解决方案:
短期——把 device-monitor 的 minReplicas 从 3 调到 5,保证基线容量能扛住早高峰的初始冲击。
长期——做了预热扩容。用 CronHPA(K8s 的定时 HPA 扩展)在每天早上 7:50 提前扩容到 6 个 Pod,8 点高峰来的时候已经准备好了:
apiVersion: autoscaling.alibabacloud.com/v1beta1
kind: CronHorizontalPodAutoscaler
metadata:
name: device-monitor-cron-hpa
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: kicloud-device-monitor
jobs:
- name: "morning-peak-scaleup"
schedule: "50 7 * * 1-6" # 周一到周六7:50
targetSize: 6
- name: "night-scaledown"
schedule: "0 22 * * *" # 每晚22:00
targetSize: 3同时把镜像拉取策略改为 imagePullPolicy: IfNotPresent + 预拉取(DaemonSet 在所有 Worker 节点上提前拉好最新镜像),新 Pod 启动时间从 90 秒降到 25 秒。
坑三:Elasticsearch 查询雪崩
销售团队季度末做业绩汇总时,多个销售员同时在瞭望台上跑"全客户合同金额年度统计"这种重聚合查询。ES 集群 3 个数据节点的 CPU 全部打到 95%,JVM GC 频繁,连简单的客户搜索也变慢,查询超时率飙到 30%。
排查: Grafana 上 ES 集群的 thread_pool.search.rejected 指标从 0 飙到几十/秒,jvm.gc.collection.time 急剧上升。查 ES 的 _tasks API 发现有 8 个重聚合查询在并行跑,每个都要扫描 500GB 数据的年度聚合。
解决方案三管齐下:
第一——查询熔断。配置 ES 的 indices.breaker.request.limit 为 40%(默认 60%),单个查询如果预估内存超过 JVM 堆的 40% 直接拒绝,防止单个查询拖垮集群。
第二——预计算聚合表。年度/季度/月度的合同金额统计不再实时聚合,而是用 XXL-JOB 定时任务每天凌晨跑一次聚合,结果写入 kicloud-contract-stats 索引。前端查统计数据直接查这个预计算索引,毫秒级返回:
@XxlJob("contract-stats-aggregation")
public void aggregateContractStats() {
// 按客户+年度聚合合同金额
SearchRequest request = new SearchRequest("kicloud-contracts");
request.source(new SearchSourceBuilder()
.size(0)
.aggregation(AggregationBuilders.terms("by_customer")
.field("customer_name.keyword").size(10000)
.subAggregation(AggregationBuilders.dateHistogram("by_year")
.field("sign_date")
.calendarInterval(DateHistogramInterval.YEAR)
.subAggregation(AggregationBuilders.sum("total")
.field("contract_amount")))));
SearchResponse response = esClient.search(request, RequestOptions.DEFAULT);
// 解析聚合结果写入预计算索引
List<ContractStat> stats = parseAggregation(response);
bulkIndexStats(stats);
}第三——按租户做查询队列限制。在应用层加了一个 Semaphore,同一租户同时最多 3 个 ES 重查询在跑,超过排队等待:
@Component
public class EsQueryThrottle {
private final Map<String, Semaphore> tenantSemaphores = new ConcurrentHashMap<>();
public <T> T executeWithThrottle(String tenantId, Callable<T> query)
throws Exception {
Semaphore semaphore = tenantSemaphores.computeIfAbsent(
tenantId, k -> new Semaphore(3));
if (!semaphore.tryAcquire(5, TimeUnit.SECONDS)) {
throw new BizException("查询繁忙,请稍后重试");
}
try {
return query.call();
} finally {
semaphore.release();
}
}
}三个措施上线后,ES 集群在季度末高峰期 CPU 稳定在 60% 以下,查询超时率降到 1% 以内。