《通义千问AI落地—上》:后端接口

时间:2025-04-02 18:31:10
/** * 历史对话记录; sessionId---> 历史记录 */ private static final ConcurrentHashMap<String, List<Message>> history = new ConcurrentHashMap<>(); @Override public void chat(ChatMessageRequest msg, Principal principal) throws NoApiKeyException, InputRequiredException { String sessionId = msg.getSessionId(); //用户发送的消息入库 CompletableFuture.runAsync(() -> { saveMsg(msg.getContent(), sessionId, Role.USER, getLocalDate()); }); Message message = Message.builder().role(Role.USER.getValue()).content(msg.getContent()).build(); // 创建QwenParam对象,设置参数 GenerationParam param = GenerationParam.builder() .model(module) // 模型版本 qwen-max .messages(getHistory(sessionId)) // 消息内容,如果需要启用多伦连续对话的话,就把用户历史消息以及GPT回复的消息一起放进去 .resultFormat(GenerationParam.ResultFormat.MESSAGE) .topP(0.8) .enableSearch(true) .apiKey(apiKey) // 你的apiKey,需要到阿里云百炼官网申请 .incrementalOutput(true) .build(); // 调用生成接口,获取Flowable对象 Flux<GenerationResult> result = Flux.from(gen.streamCall(param)); StringBuffer builder = new StringBuffer(); DateTime finalLocalTime = getLocalDate(); Flux.from(result) // 控制发送频率 .delayElements(Duration.ofMillis(200)).doOnNext(res -> { String output = res.getOutput().getChoices().get(0).getMessage().getContent(); if (output == null || "".equals(output)) { return; } // 将生成的消息通过websocket发送给前端,websocket内容将在下篇文章介绍 sendMsg(output, sessionId, principal); builder.append(output); }).doFinally(signalType -> { //消息发送结束,告诉前端 sendMsg("!$$---END---$$!", sessionId, principal); //消息入库 CompletableFuture.runAsync(() -> { saveMsg(builder.toString(), sessionId, Role.ASSISTANT, finalLocalTime); buildHistory(sessionId, Message.builder().role(Role.ASSISTANT.getValue()).content(builder.toString())); }); }).onErrorResume(str -> { if (str instanceof ApiException) { ApiException exception = (ApiException) str; log.error("接口调用出现错误:{}", exception.getMessage()); } sendMsg("GPT接口调用出现错误,该功能暂时无法使用,敬请期待.", sessionId, principal); return Mono.empty(); }).subscribeOn(Schedulers.boundedElastic()) // 在弹性线程池中执行 .subscribe(); } /** * 每日凌晨自动清理历史对话缓存,防止缓存过大 */ @Scheduled(cron = "0 59 23 * * ?") private void autoCleanHistory() { history.clear(); } /** * 构建历史消息 */ private void buildHistory(String sessionId, MessageBuilder<?, ?> message) { List<Message> historyMessages = history.computeIfAbsent(sessionId, k -> { List<ChatMessageVO> list = sessionService.getById(sessionId).getMessages(); List<Message> getMsgList = new ArrayList<>(); if (list.isEmpty()) return getMsgList; MessageBuilder<?, ?> msg = Message.builder(); //只取后面60条,历史消息太多,一是过快消耗token,二是压力太大 list.subList(Math.max(0, list.size() - 60), list.size()).forEach(item -> { if (!"".equals(item.getContent())) { msg.content(item.getContent()).role(item.getRole()).build(); getMsgList.add(msg.build()); } }); return getMsgList; }); // 添加消息到列表 historyMessages.add(message.build()); history.remove(sessionId); history.put(sessionId, historyMessages); } private List<Message> getHistory(String sessionId) { List<Message> list = history.get(sessionId); if (list == null || list.isEmpty()) { return new ArrayList<>(); } list.removeIf(item -> ("".equals(item.getContent()))); List<Message> hist = list.subList(Math.max(0, list.size() - 80), list.size()); history.remove(sessionId); history.put(sessionId, hist); return hist; }