KiCloud 凯泵智联——云平台监控体系、振动分析与销售瞭望台

_

一、项目背景与我的角色

1. 业务背景

KiCloud 是凯泵集团的工业设备智联云平台,核心客户是使用凯泵泵类设备(离心泵、往复泵、螺杆泵等)的企业。这些泵广泛用于石油化工、水处理、矿业等场景,设备分散在全国各地的客户工厂里。凯泵需要一个云平台来远程监测这些设备的运行状态,提供预防性维护服务,同时支撑销售团队管理 120+ 企业客户的合同和商机。

我加入时平台处于从 0 到 1 的阶段,之前凯泵的设备监测完全靠客户自己巡检+电话报修,从设备出问题到凯泵售后工程师到达现场平均需要 48 小时——信息传递慢(客户发现异常→打电话→售后接单→安排工程师→出差到场),而且很多时候客户发现异常已经是设备停机了,错过了最佳维修窗口。

我负责整个云平台的后端架构搭建,包括 K8s 部署体系、监控告警全链路、振动分析模块集成、以及销售瞭望台的 Elasticsearch 查询架构。

2. 与 KiPlant 的关系

KiCloud 复用了 KiPlant 的部分微服务底座(租户上下文透传、Nacos 配置体系、RocketMQ 消息规范),但业务场景完全不同——KiPlant 是面向工厂内部的生产管理,KiCloud 是面向设备制造商的远程运维+销售管理。两者的技术架构也有差异:KiPlant 部署在客户私有环境或阿里云 ECS 上,KiCloud 从一开始就设计为 K8s 原生的 SaaS 平台。


二、Docker + Kubernetes + 监控体系的搭建

1. 为什么选 K8s 而不是传统 ECS 部署

KiPlant 早期用的是阿里云 ECS + Docker Compose,每个租户环境手动维护,扩缩容靠运维手工操作,痛点明显——新客户上线要手动配环境,高峰期扛不住要手动加机器,半夜设备数据突增没人处理。KiCloud 面向 120+ 客户,设备数量预计会持续增长,必须有自动化的弹性能力。

选阿里云 ACK(Managed Kubernetes)而不是自建 K8s 集群,原因是团队没有专职运维,ACK 的 Master 节点由阿里云托管,我们只需要管 Worker 节点和应用层。

2. K8s 集群架构

# 集群规模
Master: 3节点(阿里云托管,免运维)
Worker: 
  - 业务节点池: 6台 ECS(4C8G),运行微服务
  - 监控节点池: 2台 ECS(4C8G),运行Prometheus/Grafana/AlertManager
  - ES节点池: 3台 ECS(8C32G + 500G SSD),运行Elasticsearch
​
# Namespace划分
- kicloud-prod      # 生产环境业务服务
- kicloud-staging   # 预发布环境
- monitoring        # 监控组件
- elastic-system    # Elasticsearch集群
- ingress-nginx     # 入口控制器

3. 微服务 K8s 部署配置

以核心的设备监测服务为例:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: kicloud-device-monitor
  namespace: kicloud-prod
spec:
  replicas: 3
  strategy:
    rollingUpdate:
      maxSurge: 1
      maxUnavailable: 0    # 滚动更新零停服
  selector:
    matchLabels:
      app: device-monitor
  template:
    metadata:
      labels:
        app: device-monitor
      annotations:
        prometheus.io/scrape: "true"
        prometheus.io/port: "8080"
        prometheus.io/path: "/actuator/prometheus"
    spec:
      containers:
      - name: device-monitor
        image: registry.cn-shanghai.aliyuncs.com/kicloud/device-monitor:${TAG}
        ports:
        - containerPort: 8080
        resources:
          requests:
            cpu: "500m"
            memory: "1Gi"
          limits:
            cpu: "2000m"
            memory: "2Gi"
        readinessProbe:
          httpGet:
            path: /actuator/health
            port: 8080
          initialDelaySeconds: 30
          periodSeconds: 10
        livenessProbe:
          httpGet:
            path: /actuator/health/liveness
            port: 8080
          initialDelaySeconds: 60
          periodSeconds: 30
        env:
        - name: SPRING_PROFILES_ACTIVE
          value: "prod"
        - name: NACOS_SERVER_ADDR
          valueFrom:
            configMapKeyRef:
              name: kicloud-config
              key: nacos.addr
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: device-monitor-hpa
  namespace: kicloud-prod
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: kicloud-device-monitor
  minReplicas: 3
  maxReplicas: 8
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70
  - type: Pods
    pods:
      metric:
        name: device_data_processing_rate
      target:
        type: AverageValue
        averageValue: "1000"   # 每个Pod处理1000条/秒时触发扩容

HPA 配了两个扩容维度:CPU 利用率超 70% 或自定义指标(单 Pod 设备数据处理速率超 1000 条/秒)。自定义指标通过 Prometheus Adapter 暴露给 K8s metrics API。

4. Prometheus + AlertManager + Grafana 监控体系

用 kube-prometheus-stack(Helm Chart)一键部署,在此基础上做了大量自定义配置。

Prometheus 自定义采集配置:

# prometheus-additional-scrape.yaml
- job_name: 'kicloud-services'
  kubernetes_sd_configs:
  - role: pod
    namespaces:
      names: ['kicloud-prod']
  relabel_configs:
  - source_labels: [__meta_kubernetes_pod_annotation_prometheus_io_scrape]
    action: keep
    regex: true
  - source_labels: [__meta_kubernetes_pod_annotation_prometheus_io_path]
    target_label: __metrics_path__
  - source_labels: [__meta_kubernetes_pod_annotation_prometheus_io_port, __meta_kubernetes_pod_ip]
    action: replace
    target_label: __address__
    regex: (.+);(.+)
    replacement: $2:$1
​
- job_name: 'emqx-mqtt'
  static_configs:
  - targets: ['emqx-svc.kicloud-prod:18083']
  metrics_path: '/api/v5/prometheus/stats'

核心业务指标——设备异常响应 <30 秒的实现:

"设备异常响应 <30 秒"指的是从设备上报异常数据到平台触发告警通知的端到端延迟。这个指标拆解为几段:

设备→MQTT Broker: ~200ms(网络传输)
MQTT→device-monitor服务: ~100ms(EMQX消息转发)
device-monitor处理+异常检测: ~50-200ms(振动分析/阈值判断)
异常→告警通知发出: ~500ms(创建告警记录+推送企业微信/短信)

在 device-monitor 服务里埋了端到端延迟指标:

@Component
public class DeviceAlertMetrics {
​
    private final MeterRegistry registry;
    private final Timer alertE2eLatency;
    private final Counter alertTriggeredTotal;
​
    public DeviceAlertMetrics(MeterRegistry registry) {
        this.registry = registry;
        this.alertE2eLatency = Timer.builder("device_alert_e2e_latency_seconds")
            .description("End-to-end latency from device data arrival to alert dispatch")
            .publishPercentiles(0.5, 0.95, 0.99)
            .register(registry);
        this.alertTriggeredTotal = Counter.builder("device_alert_triggered_total")
            .tag("level", "")
            .tag("type", "")
            .register(registry);
    }
​
    public void recordAlertLatency(long deviceTimestampMs, String level, String type) {
        long latencyMs = System.currentTimeMillis() - deviceTimestampMs;
        alertE2eLatency.record(Duration.ofMillis(latencyMs));
        
        registry.counter("device_alert_triggered_total",
            "level", level, "type", type).increment();
        
        // 超过30秒的记录告警
        if (latencyMs > 30_000) {
            log.warn("Alert e2e latency exceeded 30s: {}ms, level={}, type={}", 
                latencyMs, level, type);
        }
    }
}

AlertManager 规则配置:

groups:
- name: kicloud-device-alerts
  rules:
  # 告警响应延迟超标
  - alert: DeviceAlertLatencyHigh
    expr: histogram_quantile(0.95, 
      rate(device_alert_e2e_latency_seconds_bucket[5m])) > 30
    for: 2m
    labels:
      severity: critical
    annotations:
      summary: "设备告警端到端延迟p95超过30秒"
      
  # 设备离线检测
  - alert: DeviceOffline
    expr: time() - device_last_heartbeat_timestamp > 300
    for: 5m
    labels:
      severity: warning
    annotations:
      summary: "设备 {{ $labels.device_id }} 超过5分钟无心跳"
​
  # MQTT消息积压
  - alert: MqttMessageBacklog
    expr: emqx_messages_qos1_received_total 
        - emqx_messages_qos1_delivered_total > 5000
    for: 1m
    labels:
      severity: warning
​
  # 设备数据写入TPS骤降(可能是采集通道故障)
  - alert: DeviceDataWriteDropped
    expr: rate(device_data_write_total[5m]) 
        < rate(device_data_write_total[1h] offset 1h) * 0.5
    for: 5m
    labels:
      severity: warning
    annotations:
      summary: "设备数据写入速率较一小时前下降50%以上"

AlertManager 通知路由——分级分渠道:

route:
  receiver: 'default-wechat'
  group_by: ['alertname', 'tenant_id']
  group_wait: 30s
  group_interval: 5m
  repeat_interval: 4h
  routes:
  # 设备DANGER级别 → 短信+企业微信+电话(值班工程师)
  - match:
      severity: critical
      alert_source: device
    receiver: 'critical-oncall'
    repeat_interval: 30m
    
  # 设备WARNING级别 → 企业微信
  - match:
      severity: warning
      alert_source: device
    receiver: 'warning-wechat'
    
  # 平台基础设施告警 → 运维企业微信群
  - match:
      severity: critical
      alert_source: infrastructure
    receiver: 'ops-wechat'
​
receivers:
- name: 'critical-oncall'
  webhook_configs:
  - url: 'http://kicloud-notification-svc:8080/alert/oncall'
    # 内部通知服务:短信+企业微信+电话轮询值班表
- name: 'warning-wechat'
  webhook_configs:
  - url: 'http://kicloud-notification-svc:8080/alert/wechat'
- name: 'ops-wechat'
  wechat_configs:
  - corp_id: 'xxx'
    to_party: 'ops-team'
    agent_id: 'xxx'
    api_secret: 'xxx'

Grafana Dashboard 布局:

做了 4 个核心 Dashboard:

  • 设备监测总览:120+ 客户设备在线率、数据上报 TPS 实时曲线、按客户维度的设备健康度热力图

  • 告警运营:告警触发数(按级别/类型/客户)、告警响应延迟分布、告警→工单闭环率

  • 平台基础设施:K8s 节点资源使用率、Pod 状态、MQTT Broker 连接数和消息吞吐、Elasticsearch 集群健康

  • SaaS 可用性:各服务接口成功率、p99 延迟、错误率趋势


三、第三方振动分析算法集成

1. 为什么用第三方而不是自研

KiPlant 里我们自己实现了基础的振动 RMS 和 FFT 分析,但 KiCloud 的客户是泵类设备专业领域,振动诊断需要故障模式识别能力——不仅要知道"振动值偏高",还要判断是轴承内圈故障、不对中、还是气蚀。这需要大量的泵类设备故障样本数据来训练模型,凯泵选择了和一家专门做旋转机械振动诊断的第三方公司合作(国内做这个的有容知日新、安维尔等),他们提供 RESTful API 形式的诊断服务。

2. 集成架构与调用流程

设备振动传感器 → MQTT → device-monitor服务
    ↓
提取原始波形 + 基础特征(RMS/峰值/峭度)
    ↓
本地预筛(RMS > 基线阈值的才送诊断,降低API调用量)
    ↓
调用第三方振动诊断API(RESTful,同步/异步两种模式)
    ↓
解析诊断结果 → 告警/维保工单

调用第三方 API 的关键实现:

@Service
public class VibrationDiagnosisService {

    @Value("${vibration.diagnosis.api.url}")
    private String apiUrl;  // https://api.vendor.com/v2/diagnosis
    
    @Value("${vibration.diagnosis.api.key}")
    private String apiKey;
    
    private final RestTemplate restTemplate;
    private final MeterRegistry meterRegistry;

    /**
     * 同步诊断(用于WARNING级别,需要快速出结果)
     */
    public DiagnosisResult diagnoseSync(String deviceId, String tenantId,
                                         VibrationFeature feature,
                                         double[] rawWaveform) {
        Timer.Sample sample = Timer.start(meterRegistry);
        
        try {
            DiagnosisRequest request = DiagnosisRequest.builder()
                .deviceType(getDeviceType(deviceId))    // pump_centrifugal/pump_reciprocating
                .rpmSpeed(feature.getCurrentRpm())       // 转速(故障频率计算需要)
                .samplingRate(feature.getSamplingRate())
                .waveformData(Base64.getEncoder().encodeToString(
                    doubleArrayToBytes(rawWaveform)))
                .features(Map.of(
                    "rms_velocity", feature.getRmsVelocity(),
                    "peak_value", feature.getPeakValue(),
                    "kurtosis", feature.getKurtosis(),
                    "crest_factor", feature.getCrestFactor()
                ))
                .build();

            HttpHeaders headers = new HttpHeaders();
            headers.set("Authorization", "Bearer " + apiKey);
            headers.setContentType(MediaType.APPLICATION_JSON);

            ResponseEntity<DiagnosisResponse> response = restTemplate.exchange(
                apiUrl, HttpMethod.POST,
                new HttpEntity<>(request, headers),
                DiagnosisResponse.class);

            DiagnosisResponse body = response.getBody();
            
            return DiagnosisResult.builder()
                .faultType(body.getFaultType())
                    // BEARING_INNER_RACE / MISALIGNMENT / CAVITATION / IMBALANCE / NORMAL
                .confidence(body.getConfidence())         // 0-1
                .severity(body.getSeverity())             // LOW/MEDIUM/HIGH/CRITICAL
                .recommendation(body.getRecommendation()) // "建议7天内更换轴承"
                .spectralAnalysis(body.getSpectralData()) // 频谱分析详情
                .build();
                
        } catch (Exception e) {
            meterRegistry.counter("vibration_diagnosis_error_total",
                "reason", e.getClass().getSimpleName()).increment();
            // 第三方API不可用时降级到本地基础诊断
            return fallbackLocalDiagnosis(feature);
            
        } finally {
            sample.stop(meterRegistry.timer("vibration_diagnosis_latency",
                "mode", "sync"));
        }
    }

    /**
     * 降级:本地基础诊断(只能判断严重程度,无法识别故障模式)
     */
    private DiagnosisResult fallbackLocalDiagnosis(VibrationFeature feature) {
        VibrationLevel level = VibrationLevel.fromRms(feature.getRmsVelocity());
        return DiagnosisResult.builder()
            .faultType("UNKNOWN")
            .confidence(0.5)
            .severity(level.toSeverity())
            .recommendation("振动异常,建议安排现场检查(详细诊断服务暂不可用)")
            .build();
    }
}

本地预筛逻辑——降低 API 调用成本:

第三方 API 按调用次数计费,如果 120 家客户的几千台设备每条振动数据都送过去诊断,成本不可接受。所以做了两层过滤:

第一层:只有 RMS 振动值超过设备基线阈值 120% 的数据才送诊断。大部分正常运行的数据在这一层被过滤掉,实际送诊断的比例约 3%-5%。

第二层:同一台设备 10 分钟内只送一次诊断请求(防抖)。设备振动异常通常是持续性的,不需要每秒都诊断。

@Component
public class DiagnosisGatekeeper {

    // 设备最后一次送诊断的时间戳
    private final Map<String, Long> lastDiagnosisTime = new ConcurrentHashMap<>();
    private static final long DIAGNOSIS_COOLDOWN_MS = 10 * 60 * 1000; // 10分钟

    public boolean shouldDiagnose(String deviceId, VibrationFeature feature) {
        // 1. 基线阈值过滤
        double baseline = baselineService.getRmsBaseline(deviceId);
        if (feature.getRmsVelocity() < baseline * 1.2) {
            return false;
        }
        
        // 2. 冷却时间过滤
        Long lastTime = lastDiagnosisTime.get(deviceId);
        if (lastTime != null && 
            System.currentTimeMillis() - lastTime < DIAGNOSIS_COOLDOWN_MS) {
            return false;
        }
        
        lastDiagnosisTime.put(deviceId, System.currentTimeMillis());
        return true;
    }
}

告警触发逻辑:

诊断结果回来后,根据 severity 和 confidence 综合判断是否触发告警:

public void handleDiagnosisResult(String deviceId, String tenantId,
                                    DiagnosisResult result) {
    // 只有置信度>0.7且severity>=MEDIUM才触发告警
    if (result.getConfidence() < 0.7 || 
        result.getSeverity().ordinal() < Severity.MEDIUM.ordinal()) {
        // 低置信度或低严重度,只记录不告警
        diagnosisLogMapper.insert(toDiagnosisLog(deviceId, tenantId, result, "LOGGED"));
        return;
    }
    
    // 创建告警记录
    DeviceAlert alert = DeviceAlert.builder()
        .tenantId(tenantId)
        .deviceId(deviceId)
        .alertType(result.getFaultType())
        .severity(result.getSeverity())
        .message(result.getRecommendation())
        .diagnosisDetail(JSON.toJSONString(result.getSpectralAnalysis()))
        .status(AlertStatus.OPEN)
        .build();
    alertMapper.insert(alert);
    
    // 推送通知
    notificationService.dispatch(alert);
    
    // 如果是HIGH/CRITICAL,自动创建维保工单
    if (result.getSeverity().ordinal() >= Severity.HIGH.ordinal()) {
        maintenanceOrderService.createFromAlert(alert);
    }
    
    // 埋点:记录告警端到端延迟
    deviceAlertMetrics.recordAlertLatency(
        result.getOriginalDataTimestamp(), 
        result.getSeverity().name(), 
        result.getFaultType());
}

四、销售瞭望台与 Elasticsearch

1. 业务痛点

凯泵的销售团队管理 120+ 企业客户,历史合同数据积累了 10 年以上。痛点是:销售员想查"这个客户过去 5 年买了哪些型号的泵、合同金额趋势、哪些快到保修期了",要翻 ERP 系统的多个模块,甚至去翻纸质合同档案。新销售员接手客户时完全没有历史背景,客户觉得"你们公司换个人就要重新介绍一遍"。

销售瞭望台要解决的是:一个搜索框搞定客户 360 度视图——历史合同、设备档案、维保记录、告警历史、联系人、商机进展,全部可搜可聚合。

2. Elasticsearch 数据架构

索引设计:

// 合同索引
PUT /kicloud-contracts
{
  "settings": {
    "number_of_shards": 3,
    "number_of_replicas": 1,
    "analysis": {
      "analyzer": {
        "ik_smart_pinyin": {
          "type": "custom",
          "tokenizer": "ik_max_word",
          "filter": ["pinyin_filter", "lowercase"]
        }
      },
      "filter": {
        "pinyin_filter": {
          "type": "pinyin",
          "keep_full_pinyin": true,
          "keep_joined_full_pinyin": true,
          "keep_original": true
        }
      }
    }
  },
  "mappings": {
    "properties": {
      "contract_id": { "type": "keyword" },
      "tenant_id": { "type": "keyword" },
      "customer_name": { 
        "type": "text", 
        "analyzer": "ik_smart_pinyin",
        "fields": {
          "keyword": { "type": "keyword" }
        }
      },
      "product_models": { "type": "keyword" },  // 泵型号,数组
      "contract_amount": { "type": "double" },
      "sign_date": { "type": "date" },
      "warranty_end_date": { "type": "date" },
      "sales_person": { "type": "keyword" },
      "contract_status": { "type": "keyword" },  // ACTIVE/COMPLETED/EXPIRED
      "contract_text": { "type": "text", "analyzer": "ik_max_word" },
      "tags": { "type": "keyword" }
    }
  }
}

用了 IK 中文分词 + 拼音过滤器,支持"中石化"、"zhongshihua"、"zsh"三种方式搜索同一个客户。

500GB 数据的处理:10 年合同数据约 80 万条合同记录 + 关联的设备档案约 200 万条 + 维保记录约 150 万条。原始数据在 MySQL 里,通过 Canal 监听 binlog 增量同步到 ES。历史全量数据用 Logstash 一次性导入,大约跑了 6 小时。

3. 核心查询优化

客户 360 视图——多索引联合查询 + 聚合:

销售员搜"中石化"时,后端一次请求要返回:合同列表+金额汇总、在用设备列表、近期告警、维保历史统计。用 ES 的 _msearch(多搜索)一次请求批量查多个索引:

@Service
public class CustomerViewService {

    public CustomerView getCustomerView(String tenantId, String keyword) {
        // 用_msearch一次请求查4个索引
        MultiSearchRequest multiSearch = new MultiSearchRequest();
        
        // 1. 合同搜索+聚合
        SearchRequest contractReq = new SearchRequest("kicloud-contracts");
        contractReq.source(new SearchSourceBuilder()
            .query(QueryBuilders.boolQuery()
                .must(QueryBuilders.matchQuery("customer_name", keyword))
                .filter(QueryBuilders.termQuery("tenant_id", tenantId)))
            .aggregation(AggregationBuilders.sum("total_amount")
                .field("contract_amount"))
            .aggregation(AggregationBuilders.dateHistogram("yearly_amount")
                .field("sign_date")
                .calendarInterval(DateHistogramInterval.YEAR)
                .subAggregation(AggregationBuilders.sum("amount")
                    .field("contract_amount")))
            .size(20)
            .sort("sign_date", SortOrder.DESC));
        multiSearch.add(contractReq);
        
        // 2. 设备搜索
        SearchRequest deviceReq = new SearchRequest("kicloud-devices");
        deviceReq.source(new SearchSourceBuilder()
            .query(QueryBuilders.boolQuery()
                .must(QueryBuilders.matchQuery("customer_name", keyword))
                .filter(QueryBuilders.termQuery("tenant_id", tenantId)))
            .size(50));
        multiSearch.add(deviceReq);
        
        // 3. 近期告警
        SearchRequest alertReq = new SearchRequest("kicloud-alerts");
        alertReq.source(new SearchSourceBuilder()
            .query(QueryBuilders.boolQuery()
                .must(QueryBuilders.matchQuery("customer_name", keyword))
                .filter(QueryBuilders.termQuery("tenant_id", tenantId))
                .filter(QueryBuilders.rangeQuery("created_at")
                    .gte("now-30d")))
            .size(10)
            .sort("created_at", SortOrder.DESC));
        multiSearch.add(alertReq);
        
        // 4. 保修到期预警
        SearchRequest warrantyReq = new SearchRequest("kicloud-contracts");
        warrantyReq.source(new SearchSourceBuilder()
            .query(QueryBuilders.boolQuery()
                .must(QueryBuilders.matchQuery("customer_name", keyword))
                .filter(QueryBuilders.termQuery("tenant_id", tenantId))
                .filter(QueryBuilders.rangeQuery("warranty_end_date")
                    .gte("now").lte("now+90d")))
            .size(10));
        multiSearch.add(warrantyReq);
        
        MultiSearchResponse responses = esClient.msearch(multiSearch, 
            RequestOptions.DEFAULT);
        
        return assembleCustomerView(responses);
    }
}

_msearch 的好处是 4 个查询在 ES 内部并行执行,总延迟取决于最慢的那个查询,而不是 4 个串行累加。实测客户 360 视图的查询延迟 p95 在 350ms 左右。

缓存策略:

热门客户的 360 视图查询结果在 Redis 里缓存 5 分钟(TTL),用 tenant_id + customer_keyword 做 key。合同数据变更时通过 Canal 事件触发缓存失效。缓存命中率约 45%(销售团队一天内多次查同一客户很常见)。

索引生命周期管理(ILM):

500GB 数据不可能全放热节点。用 ES 的 ILM 策略做冷热分层:

PUT _ilm/policy/kicloud-contract-policy
{
  "policy": {
    "phases": {
      "hot": {
        "actions": { 
          "rollover": { "max_size": "30gb" }
        }
      },
      "warm": {
        "min_age": "90d",
        "actions": {
          "shrink": { "number_of_shards": 1 },
          "forcemerge": { "max_num_segments": 1 }
        }
      },
      "cold": {
        "min_age": "365d",
        "actions": {
          "allocate": { "require": { "data": "cold" } }
        }
      }
    }
  }
}

近 3 个月的合同在热节点(SSD),3-12 个月在温节点,1 年以上在冷节点(HDD)。冷数据查询慢一些(1-2 秒),但 90% 的查询都命中热数据。


五、MTTR 从 48 小时缩短至 6 小时的验证

1. MTTR 数据采集

每条设备告警从触发到闭环,系统记录了完整的时间线:

CREATE TABLE device_alert (
    id BIGINT PRIMARY KEY,
    tenant_id VARCHAR(64),
    device_id VARCHAR(64),
    alert_triggered_at DATETIME NOT NULL,     -- 告警触发时间
    notification_sent_at DATETIME,             -- 通知发出时间
    engineer_accepted_at DATETIME,             -- 工程师接单时间
    onsite_arrived_at DATETIME,                -- 到达现场时间(可选,远程解决则无)
    repair_completed_at DATETIME,              -- 维修完成时间
    alert_closed_at DATETIME,                  -- 告警关闭时间
    resolution_type ENUM('REMOTE','ONSITE','FALSE_ALARM'),
    -- ...
);

MTTR = AVG(repair_completed_at - alert_triggered_at)

2. 对比数据

上线前 48 小时的 MTTR 是从凯泵售后部门的历史工单记录里统计的(他们有一个简单的售后工单系统,记录了故障报告时间和处理完成时间),样本是过去一年的约 600 条维修工单。

上线后统计了 6 个月的数据,约 1200 条告警工单:

阶段

样本量

平均 MTTR

中位数 MTTR

上线前(历史数据)

600 条

48 小时

42 小时

上线后全量

1200 条

5.8 小时

4.2 小时

其中远程解决

720 条(60%)

2.1 小时

1.5 小时

其中现场解决

480 条(40%)

11.4 小时

9 小时

核心改善来自三个方面:告警自动推送替代了"客户打电话报修"的滞后环节(节省了 12-24 小时的信息传递时间);振动诊断给出了故障模式和建议,60% 的问题售后工程师可以远程指导客户处理(之前不知道什么问题就只能派人去现场);剩余 40% 需要现场的,工程师出发前已经知道故障类型,可以提前备好配件,减少了"到了现场才发现没带对零件"的无效往返。


六、高并发场景下的踩坑

坑一:告警风暴

上线第二周就遇到了。某客户工厂停电后恢复供电,几十台设备同时重启,重启瞬间的振动值都偏高(启动冲击),触发了 40+ 条 WARNING/DANGER 级别告警。售后工程师的手机在 2 分钟内收到了 40 多条企业微信通知和 20 多条短信,直接被"轰炸"了。而且这些全是误报——设备启动冲击是正常的,运行稳定后振动就恢复正常了。

解决方案分三层:

第一层——启动态免疫窗口。设备从离线变为在线后,有一个 5 分钟的"免疫期",这期间的振动数据只记录不告警:

public boolean isInImmunityWindow(String deviceId) {
    String onlineTimeStr = redisTemplate.opsForHash()
        .get(deviceStatusKey(deviceId), "lastOnlineAt").toString();
    long onlineTime = Long.parseLong(onlineTimeStr);
    return System.currentTimeMillis() - onlineTime < 5 * 60 * 1000;
}

第二层——同客户告警聚合。AlertManager 的 group_by 配了 tenant_id,同一客户的告警在 30 秒内聚合成一条通知,而不是每条单独推送。

第三层——告警升级策略。初始只推企业微信,如果 15 分钟内告警未被确认(工程师点"已查看"),才升级到短信;30 分钟未确认才打电话。避免一上来就全渠道轰炸。

坑二:K8s HPA 扩容延迟导致设备数据积压

某个工作日早上 8 点,120 家客户的设备集中开机,设备数据上报 TPS 从夜间的 200 突然飙到 2500。HPA 检测到 CPU 超标后触发扩容,但新 Pod 从调度→拉镜像→启动→就绪 整个过程大约需要 60-90 秒。这 60 秒内现有 3 个 Pod 扛不住 2500 TPS,MQTT 消息开始积压,部分设备数据延迟超过 30 秒,触发了 DeviceAlertLatencyHigh 告警。

解决方案:

短期——把 device-monitor 的 minReplicas 从 3 调到 5,保证基线容量能扛住早高峰的初始冲击。

长期——做了预热扩容。用 CronHPA(K8s 的定时 HPA 扩展)在每天早上 7:50 提前扩容到 6 个 Pod,8 点高峰来的时候已经准备好了:

apiVersion: autoscaling.alibabacloud.com/v1beta1
kind: CronHorizontalPodAutoscaler
metadata:
  name: device-monitor-cron-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: kicloud-device-monitor
  jobs:
  - name: "morning-peak-scaleup"
    schedule: "50 7 * * 1-6"    # 周一到周六7:50
    targetSize: 6
  - name: "night-scaledown"
    schedule: "0 22 * * *"      # 每晚22:00
    targetSize: 3

同时把镜像拉取策略改为 imagePullPolicy: IfNotPresent + 预拉取(DaemonSet 在所有 Worker 节点上提前拉好最新镜像),新 Pod 启动时间从 90 秒降到 25 秒。

坑三:Elasticsearch 查询雪崩

销售团队季度末做业绩汇总时,多个销售员同时在瞭望台上跑"全客户合同金额年度统计"这种重聚合查询。ES 集群 3 个数据节点的 CPU 全部打到 95%,JVM GC 频繁,连简单的客户搜索也变慢,查询超时率飙到 30%。

排查: Grafana 上 ES 集群的 thread_pool.search.rejected 指标从 0 飙到几十/秒,jvm.gc.collection.time 急剧上升。查 ES 的 _tasks API 发现有 8 个重聚合查询在并行跑,每个都要扫描 500GB 数据的年度聚合。

解决方案三管齐下:

第一——查询熔断。配置 ES 的 indices.breaker.request.limit 为 40%(默认 60%),单个查询如果预估内存超过 JVM 堆的 40% 直接拒绝,防止单个查询拖垮集群。

第二——预计算聚合表。年度/季度/月度的合同金额统计不再实时聚合,而是用 XXL-JOB 定时任务每天凌晨跑一次聚合,结果写入 kicloud-contract-stats 索引。前端查统计数据直接查这个预计算索引,毫秒级返回:

@XxlJob("contract-stats-aggregation")
public void aggregateContractStats() {
    // 按客户+年度聚合合同金额
    SearchRequest request = new SearchRequest("kicloud-contracts");
    request.source(new SearchSourceBuilder()
        .size(0)
        .aggregation(AggregationBuilders.terms("by_customer")
            .field("customer_name.keyword").size(10000)
            .subAggregation(AggregationBuilders.dateHistogram("by_year")
                .field("sign_date")
                .calendarInterval(DateHistogramInterval.YEAR)
                .subAggregation(AggregationBuilders.sum("total")
                    .field("contract_amount")))));
    
    SearchResponse response = esClient.search(request, RequestOptions.DEFAULT);
    // 解析聚合结果写入预计算索引
    List<ContractStat> stats = parseAggregation(response);
    bulkIndexStats(stats);
}

第三——按租户做查询队列限制。在应用层加了一个 Semaphore,同一租户同时最多 3 个 ES 重查询在跑,超过排队等待:

@Component
public class EsQueryThrottle {
    private final Map<String, Semaphore> tenantSemaphores = new ConcurrentHashMap<>();
    
    public <T> T executeWithThrottle(String tenantId, Callable<T> query) 
            throws Exception {
        Semaphore semaphore = tenantSemaphores.computeIfAbsent(
            tenantId, k -> new Semaphore(3));
        
        if (!semaphore.tryAcquire(5, TimeUnit.SECONDS)) {
            throw new BizException("查询繁忙,请稍后重试");
        }
        try {
            return query.call();
        } finally {
            semaphore.release();
        }
    }
}

三个措施上线后,ES 集群在季度末高峰期 CPU 稳定在 60% 以下,查询超时率降到 1% 以内。

一台故障设备拖垮整个平台:SaaS 系统中"线程池雪崩"的排查与根治 2025-01-11
KiCloud SaaS 可用性保障——线程池隔离、限流降级与 MTTR 闭环机制 2025-01-15

评论区