RAG垂直知识库+AI智能客服项目

_

Q1:从0到1的搭建AIGC 智能客服系统

业务场景

趣玩搭平台日活1000+、日订单500+,用户咨询主要集中在三类问题:活动规则(时间地点人数变更)、退票退款流程、会员积分权益。之前靠2个客服人工回复,高峰期(周五晚到周日)响应经常超过10分钟,用户投诉率比较高。老板要求把重复性问题交给AI,人工只兜底复杂纠纷,所以我来主导这个智能客服模块的落地。

技术选型

Spring AI + bge-m3(本地) + Qdrant(向量库) + DeepSeek API + Qwen2-7B + Redis

为什么用Spring AI?

直接调用大模型API,全靠prompt,token消耗大容易幻觉。LangChain4j学习成本偏高,额外写胶水代码。Spring AI,它天然和Spring生态无缝集成,通过spring-ai-starter可以直接注入ChatClientEmbeddingClient,和我们现有的Nacos配置中心、Gateway网关无缝衔接。更关键的是它对向量数据库的抽象做得比较好,VectorStore接口直接支持Qdrant,RAG的QuestionAnswerAdvisor开箱即用,我们不用自己拼检索-生成的管道。对于一个20人的初创团队来说,选一个和现有技术栈摩擦最小的方案,落地速度是第一优先级。

Embedding模型选型

选bge-m3而不是调用OpenAI的text-embedding-ada是两个原因:一是数据安全,活动规则、俱乐部财务相关的知识库内容不希望出境;二是成本,我们日均咨询量在几百到上千条,如果每条都调外部Embedding API,长期成本不可控。bge-m3支持中文效果好,而且支持dense+sparse混合检索,我们用一台带T4 GPU的服务器本地部署,通过FastAPI包了一层REST接口,Spring AI的EmbeddingClient自定义实现对接这个接口。单条Embedding延迟在20ms左右,完全够用。

向量库选择

原业务在mysql上,Qdrant可docker一键部署,有过滤需求,Milvus太重部署复杂

知识库构建与Qdrant索引

知识库来源主要三块:产品文档里的活动规则、退款政策、会员权益说明;历史客服QA对(从融云聊天记录里清洗出来的高频问答约500条);以及主理人常见操作指南。

文档预处理用的是分块策略:按语义段落切分,每个chunk控制在300-500 token,chunk之间保留50 token的重叠(sliding window),避免语义被截断。每个chunk带上元数据标签,比如category: refundcategory: membershipclub_id: xxx,这个元数据后面做检索过滤非常关键。

切好的chunk通过bge-m3生成向量,写入Qdrant。Qdrant里建了一个collection叫cs_knowledge,用的HNSW索引,距离度量选cosine。同时对元数据字段建了payload index,支持后续按类别做过滤检索。

检索策略

检索分两步。第一步是意图识别:用户问题先过一个轻量的Qwen2-7B(本地部署),做意图分类,判断是退款类、活动规则类、会员权益类还是闲聊。这一步用的是few-shot prompt,不走RAG,纯分类任务,响应很快,大概200ms。

第二步是向量检索+元数据过滤:根据意图分类结果,在Qdrant里做带filter的相似度搜索。比如意图是"退款",就加category: refund的过滤条件,TopK取3-5条。这样做的好处是缩小检索范围,减少噪声文档干扰,也降低了后续大模型的token消耗。

检索完还会做一个相关性阈值过滤,cosine similarity低于0.72的chunk直接丢弃。如果过滤后一条都不剩,就走兜底逻辑转人工。

Prompt模板设计

[System] 你是趣玩搭平台的官方客服助手。请严格基于以下参考资料回答用户问题。
如果参考资料中没有相关信息,请直接说"这个问题我需要转接人工客服为您处理",不要编造答案。
​
[参考资料]
{retrieved_chunks}
​
[对话历史]
{recent_3_turns}
​
[用户问题]
{user_query}

几个关键设计点:system prompt里强调"严格基于参考资料"和"不要编造",这是控制幻觉的第一道防线。对话历史只保留最近3轮,多了反而引入噪声。生成模型用的是DeepSeek API(性价比高,中文能力强),temperature设成0.3,尽量减少创造性发挥。

Redis缓存命中逻辑

这是我们做性能优化时加的一层。思路是:很多用户问的问题高度相似,比如"怎么退票"、"我要退款"、"退票流程是什么"本质上是同一个问题。所以我们做了语义缓存

用户query先通过bge-m3算出向量,拿这个向量去Redis里做近似匹配(我们用RedisSearch的向量检索能力)。如果命中了一个cosine similarity > 0.95的缓存项,直接返回缓存的答案,不走Qdrant检索和大模型生成,响应时间从2-3秒降到200ms以内。

缓存的key是query向量,value是最终回答+过期时间。过期策略是TTL 24小时,加上知识库更新时主动清缓存。缓存命中率稳定在35%-40%左右,对降低大模型调用成本帮助很大。

Q2:上线后遇到的核心问题及解决

问题1:幻觉——退款金额计算错误

这是上线第一周最严重的问题。用户问"我买了158元的票,活动开始前2小时退票能退多少",模型有时会自己算出一个金额来,但实际退款比例是业务规则决定的(24小时外全退,24小时内扣20%,开始后不退),模型并不总是准确引用规则。

排查发现原因有两个:一是知识库里退款规则的chunk粒度太粗,把所有退款场景写在一个大段落里,检索回来的chunk信息密度太大,模型容易只关注部分内容。二是Prompt里没有明确禁止模型做数学计算。

解决方案:把退款规则拆成更细粒度的chunk,每种退款场景独立一个chunk(比如"24小时外退票规则"、"24小时内退票规则"各一条);Prompt里加了一句"涉及金额计算的问题请引导用户联系人工客服或在App内查看退款预估金额";同时对涉及金额的回答加了一个后置校验——用正则检测回答里是否包含具体金额数字,如果有就触发人工复核流程。

问题2:多租户数据隔离

趣玩搭是多俱乐部模式,不同俱乐部的活动规则可能不同(比如A俱乐部支持活动前12小时免费退,B俱乐部是24小时)。上线初期知识库是全局共享的,导致A俱乐部用户问退票政策,可能检索到B俱乐部的规则。

解决方案:在chunk的元数据里增加club_id字段。用户发起咨询时,从会话上下文里拿到其所属俱乐部ID(通过订单关联),检索时在Qdrant的filter条件里加上club_id匹配。通用规则(平台级别的)标记为club_id: global,检索时用OR条件同时匹配。这个改动之后,回答准确率从最初的82%提升到了93%左右。

问题3:首次响应延迟偏高

完整链路跑一遍——意图识别(Qwen2-7B)+ Embedding + Qdrant检索 + DeepSeek生成——端到端要3-4秒,用户体感不好。

优化手段分几步:第一是前面提到的Redis语义缓存,命中时直接200ms返回;第二是意图识别从Qwen2-7B改成了用规则引擎+关键词匹配做第一层粗筛(覆盖80%的高频意图),只有匹配不到的才走模型分类;第三是DeepSeek API调用开启了streaming模式,前端逐字显示,用户感知到的"首字时间"从3秒降到了800ms左右。综合下来,整体平均响应时间从3.5秒降到1.4秒。

Q3:指标验证方法

关于"响应时效提升60%、人工客服降低50%、准确率95%"这几个数据的验证:

响应时效:在Gateway层埋点,记录每次客服会话的首次响应时间(从用户发送消息到收到回复的时间差)。上线前人工平均响应时间是5-8分钟(从融云IM记录里统计),上线后AI自动回复的平均响应是1.4秒。这个60%是保守算法——把AI处理+转人工的整体平均响应时间和纯人工时期做对比。数据看板用的是Grafana,数据源是Prometheus采集的接口响应时间指标。

人工客服工作量降低50%:统计维度是每日转人工的会话数。上线前日均400+会话全部人工处理,上线后AI直接解决的占55%-60%,转人工的降到180左右。这个数据通过XXL-JOB每天跑一个统计任务,写入运营报表,产品那边每周Review。

准确率95%:这个是通过人工抽检来验证的。每天随机抽取50条AI回复的会话,由客服同事按"完全正确/部分正确/错误"三档评分。上线优化稳定后,完全正确率在93%-95%之间,部分正确约4%,错误在1%-2%。没有做严格的A/B实验(团队规模和用户量不支持),但有一个简单的灰度方案:上线初期只对30%的用户开启AI客服,其余走人工,对比两组的用户满意度评分(会话结束后的1-5星评价),AI组平均4.2星,人工组4.0星,差异不大后才全量放开。

Q4:知识库更新完整链路

1. 整体架构:事件驱动 + 异步管道

我们没有用定时轮询,而是走的事件驱动模式。核心原因是退款规则这种东西一旦改了,必须尽快生效,靠定时任务有窗口期,用户在窗口期内拿到旧答案会引发客诉。

完整链路是这样的:

文档上传入口:运营在管理后台(yudao-cloud的admin模块)编辑知识库文档,支持两种方式——在线富文本编辑(适合退款规则这种结构化内容)和上传Markdown/Word文件(适合批量导入)。保存时写入MySQL的knowledge_doc表,字段包括doc_idclub_idcategorycontentversion(乐观锁版本号)、updated_at

事件发布:文档保存成功后,Service层通过Spring ApplicationEvent发布一个KnowledgeDocUpdateEvent,携带doc_idclub_id。这里没有直接用RocketMQ的原因是知识库更新频率不高(日均也就几次到十几次),ApplicationEvent走的是进程内事件,链路更短,出问题好排查。但事件监听器内部做的第一件事就是把消息投递到RocketMQ的knowledge-update-topic,目的是解耦和保证可靠性——如果下游处理失败,消息还在MQ里可以重试。

异步消费与处理管道:RocketMQ的Consumer收到消息后,执行以下步骤:

Step1: 从MySQL读取最新文档内容(根据doc_id查,带version校验)
Step2: 文档分块(chunking)
Step3: 调用bge-m3生成Embedding向量
Step4: Qdrant upsert(带元数据)
Step5: Redis缓存失效(按club_id + category维度清除)
Step6: 写更新日志到knowledge_update_log表

2. 分块策略的具体实现

分块逻辑封装在一个DocumentChunker组件里,不是用Spring AI自带的TokenTextSplitter,因为它对中文的分割效果不理想(会在汉字中间截断)。我们自己实现了一个基于段落和语义边界的Splitter:

核心逻辑是先按双换行符\n\n切段落,然后对超过500 token的段落再按句号、分号等中文标点二次切分。每个chunk控制在300-500 token,相邻chunk之间保留50 token的overlap。切完后每个chunk生成一个确定性的chunk_id,算法是MD5(doc_id + chunk_sequence_number),这个ID后面在Qdrant里作为point ID使用,保证同一份文档重新分块后,相同位置的chunk会覆盖旧的而不是新增。

每个chunk附带的元数据结构大致是:

{
  "doc_id": "doc_20240601_refund_rule",
  "chunk_seq": 2,
  "club_id": "club_1024",
  "category": "refund",
  "doc_version": 7,
  "updated_at": 1717200000
}

这里doc_version非常关键,后面讲缓存一致性会用到。

3. Embedding生成与Qdrant Upsert

Embedding调用的是我们本地bge-m3服务的REST接口,Spring AI里自定义了一个EmbeddingClient实现:

@Component
public class BgeM3EmbeddingClient implements EmbeddingClient {
    
    @Value("${embedding.bge-m3.url}")
    private String bgeServiceUrl;  // http://gpu-server:8080/embedding
    
    private final RestTemplate restTemplate;
​
    @Override
    public List<Embedding> embed(List<String> texts) {
        // 批量调用bge-m3,一次最多传32条
        BgeRequest request = new BgeRequest(texts);
        BgeResponse response = restTemplate.postForObject(
            bgeServiceUrl, request, BgeResponse.class);
        // 转换为Spring AI的Embedding对象
        return response.getVectors().stream()
            .map(vec -> new Embedding(vec, texts.indexOf(vec)))
            .collect(Collectors.toList());
    }
}

Qdrant的upsert用的是官方Java客户端qdrant-client,封装在自定义的QdrantVectorStore里:

public void upsert(List<ChunkWithEmbedding> chunks) {
    List<PointStruct> points = chunks.stream().map(chunk -> {
        // chunk_id作为point ID(转成UUID格式)
        String pointId = toUUID(chunk.getChunkId());
        return PointStruct.newBuilder()
            .setId(PointId.newBuilder().setUuid(pointId))
            .addAllVectors(chunk.getVector())  // 1024维 bge-m3 dense向量
            .putAllPayload(Map.of(
                "doc_id", value(chunk.getDocId()),
                "club_id", value(chunk.getClubId()),
                "category", value(chunk.getCategory()),
                "doc_version", value(chunk.getDocVersion()),
                "text", value(chunk.getText()),
                "updated_at", value(chunk.getUpdatedAt())
            ))
            .build();
    }).collect(Collectors.toList());
​
    // 使用upsert,相同point ID会覆盖
    qdrantClient.upsertAsync("cs_knowledge", points).get(10, TimeUnit.SECONDS);
}

关键点:用upsert而不是insert,保证同一个chunk重新处理后是覆盖而不是重复写入。point ID由doc_id + chunk_seq确定性生成,所以文档更新后重新分块,只要段落位置没变,就是原地更新。

旧chunk清理:如果文档更新后段落变少了(比如从10个chunk变成8个),那原来的chunk 9和10就成了孤儿数据。所以upsert之前会先按doc_id做一次条件删除:

qdrantClient.deleteAsync("cs_knowledge", 
    Filter.newBuilder().addMust(
        FieldCondition.newBuilder()
            .setKey("doc_id")
            .setMatch(Match.newBuilder().setKeyword(docId))
    ).build()
).get();
// 删完后再upsert新的chunks

先删后写,保证不留脏数据。这两步不是原子的,但实际影响很小——中间有几百毫秒的空窗期,即使这期间有用户查询,也只是检索不到这篇文档的内容,会走兜底转人工,不会返回错误答案。

4. 如何保证5分钟内同步

实际上整个链路跑下来远不到5分钟。实测数据:文档保存到MQ投递约200ms,Consumer消费到读取文档约500ms,分块约100ms,bge-m3 Embedding(假设10个chunk批量调用)约300ms,Qdrant删除+upsert约500ms,Redis清缓存约50ms。端到端通常在2-3秒内完成。

"5分钟"的保证更多是容错设计:如果Consumer处理失败(比如bge-m3服务临时不可用),RocketMQ会重试,重试策略是1s、5s、30s、1min、5min,5分钟内最多重试5次。如果5次都失败,进入死信队列,同时触发AlertManager告警通知运维。

Q5:Redis语义缓存的具体实现

1. 整体方案:RedisSearch向量索引

我们没有用Redis原生的String/Hash去存向量然后自己算余弦相似度(那性能太差),而是用了Redis Stack里的RedisSearch模块,它原生支持向量索引和KNN搜索。

Redis里建了一个索引,结构如下:

FT.CREATE idx:semantic_cache ON HASH PREFIX 1 sc: 
SCHEMA
  query_vector VECTOR HNSW 6 
    TYPE FLOAT32 
    DIM 1024 
    DISTANCE_METRIC COSINE
  answer TEXT
  club_id TAG
  category TAG
  doc_version NUMERIC
  created_at NUMERIC

2. 缓存写入逻辑

当一次完整的RAG请求完成后(经过Qdrant检索 + DeepSeek生成),在返回答案给用户的同时,异步写入缓存:

public void cacheSemanticResult(String query, float[] queryVector,
                                 String answer, String clubId, 
                                 String category, long docVersion) {
    String key = "sc:" + generateCacheKey(query, clubId);
    
    Map<String, Object> fields = new HashMap<>();
    fields.put("query_vector", toByteArray(queryVector));
    fields.put("answer", answer);
    fields.put("club_id", clubId);
    fields.put("category", category);
    fields.put("doc_version", docVersion);
    fields.put("created_at", System.currentTimeMillis());
    
    redisTemplate.opsForHash().putAll(key, fields);
    // TTL 24小时
    redisTemplate.expire(key, Duration.ofHours(24));
}

generateCacheKey的实现是SHA256(query_text + club_id),目的不是做精确匹配,而是给每条缓存一个唯一key。实际的"相似查询命中"走的是向量KNN搜索,不是key匹配。

3. 缓存查询逻辑

用户新query进来后,先算Embedding向量,然后在Redis里做KNN搜索:

public Optional<String> searchCache(float[] queryVector, String clubId) {
    // RedisSearch KNN查询,带club_id过滤
    String queryStr = String.format(
        "(@club_id:{%s})=>[KNN 1 @query_vector $vec AS score]",
        clubId
    );
    
    Query query = new Query(queryStr)
        .addParam("vec", toByteArray(queryVector))
        .returnFields("answer", "score", "doc_version")
        .limit(0, 1)
        .dialect(2);
    
    SearchResult result = jedisClient.ftSearch("idx:semantic_cache", query);
    
    if (result.getTotalResults() > 0) {
        Document doc = result.getDocuments().get(0);
        double score = Double.parseDouble(doc.getString("score"));
        
        // cosine distance,越小越相似,转成similarity = 1 - distance
        double similarity = 1.0 - score;
        
        if (similarity >= 0.95) {
            long cachedVersion = Long.parseLong(doc.getString("doc_version"));
            // 版本校验:缓存的doc_version必须>=当前知识库版本
            if (cachedVersion >= getCurrentDocVersion(clubId)) {
                return Optional.of(doc.getString("answer"));
            }
            // 版本过期,主动删除这条缓存
            redisTemplate.delete(doc.getId());
        }
    }
    return Optional.empty();
}

关键设计点有三个:

第一是阈值0.95,这个比较严格,是故意的。宁可缓存少命中一点,也不能返回"差不多但其实不对"的答案。比如"怎么退票"和"退票后多久到账"similarity大概在0.88左右,不应该命中同一个缓存。

第二是club_id过滤,KNN搜索加了club_id的TAG过滤,保证A俱乐部的用户不会命中B俱乐部的缓存答案。

第三是doc_version校验,这是保证缓存一致性的核心机制。每条缓存记录带了生成时对应的doc_version,查询时会比对当前知识库的最新version。如果缓存的version落后了,说明知识库已经更新过了,这条缓存不可信,主动删除并走正常RAG流程。

4. 知识库更新时的主动缓存失效

前面讲的doc_version校验是被动失效(查的时候才发现过期),但我们还有一层主动失效。在知识库更新链路的Step5里,Consumer处理完Qdrant upsert之后,会按club_id + category维度批量清除Redis缓存:

public void invalidateCache(String clubId, String category) {
    // 用RedisSearch按club_id + category条件查出所有相关缓存key
    String queryStr = String.format("@club_id:{%s} @category:{%s}", 
                                     clubId, category);
    Query query = new Query(queryStr).returnFields("__key").limit(0, 1000);
    SearchResult result = jedisClient.ftSearch("idx:semantic_cache", query);
    
    // 批量删除
    List<String> keys = result.getDocuments().stream()
        .map(Document::getId)
        .collect(Collectors.toList());
    
    if (!keys.isEmpty()) {
        redisTemplate.delete(keys);
        log.info("Invalidated {} cache entries for club={}, category={}", 
                 keys.size(), clubId, category);
    }
}

所以缓存一致性是三层保障:主动失效(更新时批量清)+ 被动版本校验(查时比对version)+ TTL兜底(24小时自动过期)。三层叠加后,实际出现用户拿到旧答案的概率非常低。

Q6:实际踩过的坑

坑1:Qdrant先删后写的空窗期引发短暂"知识消失"

前面提到文档更新时先按doc_id删除旧chunk,再upsert新chunk。有一次运营同时更新了3篇核心退款文档,触发了3个并发Consumer,Qdrant短时间内执行了大量delete+upsert操作。恰好这几百毫秒内有用户咨询退款问题,Qdrant检索回来的结果为空(相关chunk刚被删还没写回来),AI直接回了"请联系人工客服",但实际这是有明确答案的常见问题。

排查过程:用户反馈后我们查了knowledge_update_log,发现该时间点确实有3条文档同时更新。又查了Qdrant的操作日志(通过Prometheus抓的qdrant_grpc_request_duration指标),确认delete和upsert之间有300-500ms的间隔。

解决方案:改成了先写后删策略——先upsert新chunk(因为point ID相同会覆盖),然后根据新的chunk ID集合和旧的chunk ID集合做diff,只删除差集(即被移除的chunk)。这样在更新过程中,Qdrant里始终有可用的chunk,最坏情况是短暂返回旧版本内容,但不会出现"什么都查不到"的情况。

// 改进后的逻辑
public void updateDocument(String docId, List<ChunkWithEmbedding> newChunks) {
    Set<String> newChunkIds = newChunks.stream()
        .map(c -> toUUID(c.getChunkId()))
        .collect(Collectors.toSet());
    
    // 1. 先查出旧的chunk IDs
    Set<String> oldChunkIds = getExistingChunkIds(docId);
    
    // 2. 先upsert新的(覆盖已有的 + 新增的)
    upsert(newChunks);
    
    // 3. 删除差集(旧有但新版本里不存在的)
    Set<String> toDelete = new HashSet<>(oldChunkIds);
    toDelete.removeAll(newChunkIds);
    if (!toDelete.isEmpty()) {
        deleteByPointIds(toDelete);
    }
}

坑2:Redis缓存与Qdrant不一致导致答案矛盾

有一次出现了一个很诡异的现象:同一个用户在5分钟内问了两次"活动前多久可以退票",第一次回答"24小时前可全额退",第二次回答"12小时前可全额退"。原因是运营在两次提问之间更新了退款规则(从24小时改成12小时),主动缓存失效确实执行了,但执行顺序出了问题——Consumer先清了Redis缓存,然后才开始Qdrant的upsert。在Redis已清、Qdrant还没更新完的窗口期,第二次查询没命中缓存,走了RAG流程,但Qdrant里还是旧数据,生成了基于旧规则的回答,同时这个回答又被写入了Redis缓存(带的是新的doc_version,因为MySQL里version已经更新了,但Qdrant里的内容还是旧的)。

排查过程:对比了两次回答的时间戳和knowledge_update_log里各步骤的执行时间,发现Redis失效在Qdrant upsert之前约1.2秒完成。

解决方案:调整了Consumer内部的执行顺序,必须先完成Qdrant upsert,再清Redis缓存。逻辑上的核心原则是——在新数据完全就绪之前,宁可让用户命中旧缓存拿到旧答案(至少是一致的旧答案),也不能出现"缓存没了但新数据也没就位"的中间状态。

// 修正后的Consumer处理顺序
@RocketMQMessageListener(topic = "knowledge-update-topic")
public class KnowledgeUpdateConsumer {
    
    public void onMessage(KnowledgeUpdateMessage msg) {
        // Step1: 读文档
        KnowledgeDoc doc = docService.getById(msg.getDocId());
        // Step2: 分块
        List<Chunk> chunks = chunker.split(doc);
        // Step3: Embedding
        List<ChunkWithEmbedding> embeddedChunks = embeddingClient.embed(chunks);
        // Step4: Qdrant upsert(先完成数据就位)
        qdrantVectorStore.updateDocument(doc.getDocId(), embeddedChunks);
        // Step5: 最后才清Redis缓存
        cacheService.invalidateCache(doc.getClubId(), doc.getCategory());
        // Step6: 记日志
        updateLogService.log(doc.getDocId(), "SUCCESS", System.currentTimeMillis());
    }
}

坑3:bge-m3服务重启导致向量漂移

有一次GPU服务器做了系统升级重启,bge-m3的FastAPI服务用了不同版本的PyTorch重新加载了模型。结果发现重启后生成的Embedding和之前的有微小差异(同一段文本前后两次Embedding的cosine similarity约0.97-0.98,不是完全一致)。这导致Redis语义缓存的命中率突然从35%掉到了15%左右——因为新query的向量和缓存里旧向量的相似度刚好被0.95的阈值卡掉了。

排查过程:先看Grafana上的缓存命中率监控(我们有一个专门的semantic_cache_hit_rate指标),发现是GPU服务器重启那个时间点开始骤降。然后写了个测试脚本,用同一批query在重启前后分别生成Embedding,对比发现确实有漂移。

解决方案:短期措施是写了一个脚本,用当前bge-m3服务重新计算所有缓存条目的向量,刷新Redis。长期措施有两个——一是固定了bge-m3服务的Docker镜像版本和所有依赖版本(pip freeze锁定),避免环境变化导致结果不一致;二是在服务健康检查里加了一个"向量一致性探针":启动时用5条固定的benchmark文本生成Embedding,和预存的baseline向量做比对,如果cosine similarity低于0.999就告警并阻止服务注册到Nacos。

Q7:运维监控

监控体系用的是项目整体的Prometheus + Grafana + AlertManager,针对知识库更新链路加了几个专项指标:

knowledge_update_duration_seconds:从文档保存到全链路完成的端到端耗时,Histogram类型,p50/p95/p99分位。正常情况p95在3秒以内。

knowledge_update_success_total / knowledge_update_failure_total:更新成功/失败计数,failure连续超过3次触发PagerDuty告警。

semantic_cache_hit_rate:每5分钟统计一次缓存命中率,低于20%告警(正常在35%-40%)。

qdrant_search_latency_ms:Qdrant检索延迟,p99超过500ms告警。

embedding_service_latency_ms:bge-m3服务响应时间,p99超过200ms告警。

Grafana上有一个专门的"AI客服"Dashboard,分三个区域:实时对话量和AI/人工分流比、RAG链路各环节延迟分布、知识库更新状态和缓存命中率趋势。运维和我每天早上看一眼,异常指标会通过AlertManager推到企业微信群。

Q8:LLM 调用成本监控与优化

1. 混合路由架构:DeepSeek API + 本地 Qwen2-7B 分层调度

成本优化的核心思路是能不调大模型就不调,必须调的时候选最便宜够用的。所以我们把请求分成了三层:

第一层:Redis语义缓存直接命中,命中率35%-40%,零LLM成本,响应200ms以内。这一层前面已经详细讲过了。

第二层:本地Qwen2-7B处理简单意图。意图识别本身就跑在Qwen2-7B上,但我们发现有一类问题其实不需要走RAG——比如"你好"、"在吗"、"谢谢"这种闲聊,以及"我的订单号是xxx,帮我查一下"这种需要调业务接口而不是查知识库的问题。对于闲聊,Qwen2-7B直接用固定的few-shot prompt生成回复,不走Qdrant也不走DeepSeek;对于订单查询类,Qwen2-7B做意图+实体抽取(提取订单号),然后调内部的订单查询API,把结果用模板拼成回答。这一层大概覆盖了20%-25%的请求,也是零外部API成本。

第三层:DeepSeek API处理需要知识库RAG的复杂问题,大概占总请求的35%-40%。只有这一层会产生外部API费用。

所以实际调用DeepSeek API的请求比例大约是35%-40%,而不是100%。这个分层路由的决策点就在意图识别环节,Qwen2-7B分类完之后根据意图类型走不同的处理分支。

2. Token消耗与成本的具体监控

我们在调用DeepSeek API的Gateway封装层里做了精细埋点,每次调用记录以下指标:

@Component
public class DeepSeekCostTracker {

    private final MeterRegistry meterRegistry;

    public void trackApiCall(String clubId, String intentCategory,
                             int promptTokens, int completionTokens,
                             long latencyMs, boolean success) {
        // 1. token消耗计数器,按category打标
        meterRegistry.counter("deepseek_prompt_tokens_total",
            "category", intentCategory,
            "club_id", clubId
        ).increment(promptTokens);
        
        meterRegistry.counter("deepseek_completion_tokens_total",
            "category", intentCategory
        ).increment(completionTokens);

        // 2. 单次调用成本(DeepSeek V3的定价:
        //    input ¥1/百万token, output ¥2/百万token,缓存命中再打折)
        double cost = promptTokens * 1.0 / 1_000_000 
                    + completionTokens * 2.0 / 1_000_000;
        meterRegistry.summary("deepseek_call_cost_yuan",
            "category", intentCategory
        ).record(cost);

        // 3. 延迟直方图
        meterRegistry.timer("deepseek_api_latency",
            "category", intentCategory,
            "success", String.valueOf(success)
        ).record(Duration.ofMillis(latencyMs));
    }
}

具体数据:单次RAG会话的平均token消耗大概是prompt 800-1200 token(包含system prompt约300 token + 检索到的3-5个chunk约500-700 token + 对话历史约100-200 token),completion 150-300 token。按DeepSeek V3的定价算,单次会话成本大约在¥0.001-0.002之间,非常便宜。

月度总费用:日均约200-250次DeepSeek API调用(总咨询量500-600次,缓存命中35%约去掉200次,Qwen2-7B本地处理再去掉120次左右),每次均价¥0.0015,日均API费用约¥0.3-0.4。月度DeepSeek API费用大约¥10-12。本地Qwen2-7B和bge-m3跑在一台T4 GPU服务器上,这台服务器的月租大概¥1500-2000(这是公司内部的GPU服务器,不是专门为客服买的,AI客服只是其中一个服务),分摊到AI客服上的算力成本大约¥500-800/月。所以AI客服的总运行成本大约¥600-800/月,而之前2个人工客服的人力成本一个月大概¥12000-15000(按无锡的薪资水平),即使现在还保留1个人工客服兜底,整体成本降幅也很明显。

缓存命中率对成本的具体贡献:35%-40%的缓存命中意味着每天少调约200次DeepSeek API,按单次¥0.0015算,每天省¥0.3。这个绝对值不大,是因为DeepSeek本身就很便宜。但缓存的更大价值在于降低响应延迟(200ms vs 1.5-2s)和减轻下游各环节的并发压力,这个在高峰期尤其重要。

3. 成本告警机制

Grafana上有一个"AI客服成本"面板,核心指标:

deepseek_daily_cost_yuan:每日累计API费用,XXL-JOB每晚23:59跑统计任务写入。超过¥1触发预警(正常日均¥0.3-0.4,突然翻倍说明缓存可能失效或有异常流量)。

deepseek_call_count_hourly:每小时API调用次数,超过50次/小时告警(正常峰值约30次/小时)。

token_per_session_avg:平均每会话token消耗,超过2000告警(说明可能有异常长上下文或检索chunk过多)。

有一次这个告警真的触发了——某天下午API调用量突然飙到平时的3倍,排查发现是运营批量更新了15篇知识库文档,触发了大量Redis缓存失效,导致短时间内大量请求穿透缓存直接打到DeepSeek API。这之后我们加了一个优化:批量更新时设置一个"更新中"的标记,在这个窗口期内对缓存未命中的请求做一层短暂的排队等待(最多等3秒),等Qdrant数据就位后再处理,避免缓存穿透风暴。

Q9:高峰期稳定性保障

1. 瓶颈分析与压测

我们在上线前用JMeter做过压测,模拟周五晚高峰场景——200并发用户,每用户每分钟发送2条消息,持续30分钟。压测结果发现各环节的瓶颈排序是:

DeepSeek API最先到瓶颈。DeepSeek的API有并发限制(当时我们用的套餐是50 QPS),200并发如果缓存全不命中,理论上需要200 QPS的API调用能力,远超限额。实际上缓存+本地Qwen2-7B过滤后打到DeepSeek的并发大概40-60 QPS,刚好在临界线上,偶尔会触发429 Too Many Requests。

bge-m3是第二个瓶颈。单台T4 GPU跑bge-m3,批量推理的吞吐约100条/秒(batch size=32时),但每个请求都要先过一次Embedding,高峰期并发Embedding请求可能达到80-100 QPS,GPU利用率飙到95%以上,延迟从正常的20ms涨到150-200ms。

Qdrant和Redis相对扛得住。Qdrant单节点在我们的数据量级下(总chunk数约5000-8000条),即使200并发检索,p99延迟也只有50-80ms。Redis更不用说,KNN搜索在这个数据量下毫秒级。

2. 限流方案

在Gateway层用Sentinel做了多级限流:

全局限流:AI客服接口整体QPS上限200,超过排队等待(最多等5秒),队列满了返回"当前咨询人数较多,请稍候"的提示。

DeepSeek API调用限流:封装了一个DeepSeekRateLimiter,用Guava的RateLimiter限制到40 QPS(留10 QPS的buffer给API限额)。超出的请求不是直接拒绝,而是进入一个有界队列(容量100),异步等待令牌。队列满了才走降级。

@Component
public class DeepSeekClientWrapper {

    private final RateLimiter rateLimiter = RateLimiter.create(40.0);
    private final ExecutorService asyncPool = 
        new ThreadPoolExecutor(10, 30, 60L, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(100),
            new ThreadPoolExecutor.CallerRunsPolicy());  // 队列满了调用者线程执行

    public CompletableFuture<ChatResponse> chatAsync(ChatRequest request) {
        return CompletableFuture.supplyAsync(() -> {
            // 最多等2秒获取令牌
            if (!rateLimiter.tryAcquire(2, TimeUnit.SECONDS)) {
                throw new RateLimitExceededException("DeepSeek rate limit");
            }
            return deepSeekClient.chat(request);
        }, asyncPool);
    }
}

3. 熔断与降级

用Sentinel的熔断规则,针对DeepSeek API配置了慢调用比例熔断:如果10秒内DeepSeek API调用中超时(>5秒)的比例超过50%,触发熔断,熔断时长30秒。

熔断后的降级策略分两级

第一级降级——切换到本地Qwen2-7B做生成。Qwen2-7B虽然效果比DeepSeek差一些,但对于退款规则、活动查询这种有明确RAG上下文的问题,准确率也能到85%左右。降级时的Prompt会更严格:

你是趣玩搭客服助手。请仅基于以下参考资料回答,不确定时直接说"建议联系人工客服"。
{retrieved_chunks}
用户问题:{query}

temperature直接设成0.1,最大程度减少Qwen2-7B的"自由发挥"。

第二级降级——如果Qwen2-7B本地服务也不可用(GPU服务器挂了),则直接返回预设的兜底话术"非常抱歉,当前系统繁忙,正在为您转接人工客服",同时自动创建融云IM的人工客服会话。

@SentinelResource(value = "deepseek-chat", 
    fallback = "localModelFallback",
    blockHandler = "rateLimitHandler")
public String generateAnswer(String query, List<String> chunks, String clubId) {
    // 正常走DeepSeek API
    return deepSeekClient.chat(buildPrompt(query, chunks));
}

public String localModelFallback(String query, List<String> chunks, 
                                  String clubId, Throwable ex) {
    log.warn("DeepSeek fallback triggered: {}", ex.getMessage());
    try {
        // 降级到本地Qwen2-7B
        return qwen2Client.chat(buildStrictPrompt(query, chunks));
    } catch (Exception e) {
        // 二级降级:直接转人工
        transferToHuman(clubId, query);
        return "非常抱歉,正在为您转接人工客服,请稍候...";
    }
}

4. 实际线上故障案例

案例:DeepSeek API大面积超时导致的连锁反应

上线后第三周的一个周六晚上(高峰期),DeepSeek API突然出现大面积超时,p99延迟从正常的2秒飙到15秒以上,持续了约20分钟。

故障过程:前5分钟Sentinel的慢调用比例熔断被触发,自动切到了Qwen2-7B。但问题是大量在途的DeepSeek请求还在线程池里等待超时,线程池队列迅速被占满,CallerRunsPolicy导致Tomcat的工作线程也被阻塞,整个AI客服服务的其他接口(包括查询会话列表、发送消息等)也开始变慢。

排查过程:AlertManager在2分钟内推了告警到企业微信:deepseek_api_latency p99 > 5s。上Grafana看,DeepSeek调用的超时率从0%飙到70%。同时发现Tomcat活跃线程数从正常的30涨到了200(最大值),等待队列开始堆积。

解决措施

短期:手动开启了一个"全量降级"开关(Nacos配置中心里的一个配置项ai.deepseek.force-fallback=true),所有请求直接走Qwen2-7B,不再尝试DeepSeek API。等DeepSeek API恢复后再关闭。

长期改进了三个地方。第一,给DeepSeek API调用加了严格的超时控制——连接超时2秒、读超时5秒,超时立即断开释放线程,不让在途请求拖累线程池。第二,把DeepSeek API调用的线程池和业务接口的线程池做了隔离(Bulkhead模式),DeepSeek的线程池最大20个线程,即使全部阻塞也不影响其他接口。第三,在Nacos里加了三个动态配置开关,运维可以实时调整而不用重启服务:

ai:
  deepseek:
    force-fallback: false          # 强制降级开关
    timeout-ms: 5000               # API超时时间
    rate-limit-qps: 40             # 限流QPS

5. bge-m3的峰值优化

针对Embedding服务在高峰期GPU打满的问题,做了两个优化:

一是请求合并(batch coalescing):不是每个请求来了立刻调一次bge-m3,而是用一个微批次窗口(50ms),把窗口期内到达的多个请求的文本合并成一个batch一起送到GPU推理。这样GPU的利用效率更高,50ms的额外延迟用户几乎感知不到。

二是Embedding结果缓存:用户query的Embedding在Redis里也缓存了一份(key是emb:SHA256(query_text),value是向量bytes,TTL 1小时)。同一个用户在短时间内重复问类似的问题,或者不同用户问同样的问题,不用重复算Embedding。这一层缓存命中率约15%-20%,有效分担了GPU压力。


Q10:7×24 上下文连续对话与人机无缝转接

1. 会话上下文存储

对话上下文存在Redis里,数据结构是这样的:

Key:   chat:session:{session_id}
Type:  List(按时间顺序的消息列表)
TTL:   2小时(无活动自动过期)

每条消息的结构:

{
  "msg_id": "msg_20240615_001",
  "role": "user|ai|human_agent",
  "content": "怎么退票?",
  "timestamp": 1718438400000,
  "metadata": {
    "intent": "refund",
    "club_id": "club_1024",
    "confidence": 0.92,
    "source": "rag|cache|qwen2_local|human"
  }
}

AI生成回答时,从Redis里取最近3轮对话(user+ai各算一轮,所以实际取最近6条消息),拼入Prompt的对话历史部分。为什么只取3轮?测试过5轮和3轮的效果差异不大,但5轮会多消耗约300-400 token,对DeepSeek API的成本和延迟都有可感知的影响。而且超过3轮的上下文经常引入无关话题,反而干扰当前问题的回答准确性。

会话的生命周期管理:用户打开客服窗口时创建session(生成session_id,写入Redis),每次交互刷新TTL。2小时无活动自动过期。用户主动关闭窗口时通过前端WebSocket断连事件触发session关闭。

2. 人机转接的完整流程

转人工的触发条件有四种:

一是AI主动转接——Qdrant检索结果为空或相似度全低于阈值,AI回复"这个问题需要转接人工";或者连续2轮AI回复后用户表示"不对"、"没解决"等负面反馈(通过关键词+情感分析检测)。

二是用户主动转接——用户点击前端的"转人工"按钮或输入"转人工"关键词。

三是敏感场景自动转接——涉及金额争议、投诉、法律相关关键词时,意图识别阶段就直接路由到人工。

四是置信度过低——Qwen2-7B意图识别的confidence低于0.6时,不冒险走AI,直接转人工。

转接时的上下文传递

public void transferToHumanAgent(String sessionId, String clubId, 
                                   String triggerReason) {
    // 1. 从Redis取完整对话历史
    List<ChatMessage> history = getSessionHistory(sessionId);
    
    // 2. 用Qwen2-7B生成一段摘要(给人工客服看的简报)
    String summary = qwen2Client.chat(
        "请用一句话总结以下客服对话的核心问题:\n" + formatHistory(history)
    );
    
    // 3. 构建转接数据包
    TransferContext context = TransferContext.builder()
        .sessionId(sessionId)
        .clubId(clubId)
        .userId(getCurrentUserId())
        .summary(summary)              // AI生成的问题摘要
        .fullHistory(history)           // 完整对话记录
        .triggerReason(triggerReason)    // 转接原因
        .userOrder(getRecentOrder())    // 用户最近的订单信息
        .timestamp(System.currentTimeMillis())
        .build();
    
    // 4. 通过融云IM创建人工客服会话
    rongCloudService.createServiceSession(context);
    
    // 5. 更新session状态
    updateSessionMode(sessionId, "HUMAN_AGENT");
    
    // 6. 给用户发消息
    sendToUser(sessionId, "正在为您转接人工客服,已将之前的对话记录同步给客服人员...");
}

人工客服那边在融云IM工作台上看到的是:一个新会话弹入,顶部有一段AI生成的问题摘要(比如"用户询问6月15日飞盘活动的退票规则,AI已告知24小时前可全退,用户对退款到账时间不满意"),下方可以展开看完整的AI对话历史。这样人工客服不需要用户重新描述一遍问题。

3. 人工处理完后接回AI

支持,但有条件。人工客服在融云工作台上点击"结束服务"时,会话状态从HUMAN_AGENT改回AI。同时把人工客服的对话内容也追加到Redis的session history里,这样如果用户后续继续提问,AI可以看到人工客服之前的回答,保证上下文连续。

但有一个限制:如果人工客服处理的是投诉或纠纷类问题,转回AI后会打上一个sensitive标记,后续AI的回答会更保守(Prompt里加一句"此用户之前有投诉记录,请谨慎回答,如有任何不确定请建议联系人工")。

4. 转人工率与满意度统计

转人工率:在每次会话结束时记录resolution_type字段——AI_RESOLVEDTRANSFERRED_TO_HUMANUSER_LEFT(用户直接关闭未解决)。XXL-JOB每天凌晨跑统计任务,计算各类型占比写入cs_daily_stats表。上线稳定后的数据:AI直接解决55%-60%,转人工30%-35%,用户主动离开10%左右。

用户满意度:每次会话结束(无论AI还是人工)都会弹一个1-5星评分和可选的文字反馈。评分数据存入cs_feedback表,关联session_id。统计方式也是每日跑批:

-- 每日满意度统计
SELECT 
  resolution_type,
  AVG(rating) as avg_rating,
  COUNT(*) as total_sessions,
  SUM(CASE WHEN rating >= 4 THEN 1 ELSE 0 END) / COUNT(*) as satisfaction_rate
FROM cs_feedback 
WHERE create_date = CURDATE() - INTERVAL 1 DAY
GROUP BY resolution_type;

实际数据:AI直接解决的会话平均评分4.1-4.3星,满意率(4星及以上)约82%-85%;转人工后的会话平均评分3.8-4.0星(因为转人工的往往是更复杂或用户已经不耐烦的case)。整体加权平均约4.0-4.1星。

这些数据也在Grafana上有可视化,产品经理每周的Review会上会拉出来看趋势。如果某一天AI解决的满意度突然下降(比如从4.2掉到3.5),通常意味着知识库某个规则有误或者模型回答出了问题,会触发人工排查最近的AI对话记录。

5. 一个转接相关的线上故障

有一次用户反馈"转人工后客服看不到之前的聊天记录"。排查发现是Redis session的TTL问题——用户和AI聊了几句后中间去做别的事,过了2小时回来点了转人工,这时Redis里的session已经过期了,getSessionHistory返回空,人工客服看到的是一个没有上下文的新会话。

解决方案:session过期的对话历史不能只依赖Redis。我们加了一层持久化兜底——每条消息除了写Redis,还会异步写入MySQL的cs_chat_log表(这个表主要用于数据分析和审计)。转人工时的取历史逻辑改成了:先查Redis,如果Redis里没有(过期了),再从MySQL按session_id查。同时把Redis的TTL从2小时延长到了4小时,减少过期概率。

public List<ChatMessage> getSessionHistory(String sessionId) {
    // 优先从Redis取
    List<ChatMessage> history = redisSessionStore.getHistory(sessionId);
    if (history != null && !history.isEmpty()) {
        return history;
    }
    // Redis过期,从MySQL兜底
    log.info("Session {} expired in Redis, fallback to MySQL", sessionId);
    return chatLogMapper.selectBySessionId(sessionId);
}

大模型应用开发学习:智能客服 & RAG 知识库为案例 2026-03-11

评论区