4.复杂用法
4.1.流式输出
大模型并不是一次性生成最终结果,而是逐步地生成中间结果,最终结果由中间结果拼接而成。使用非流式输出方式需要等待模型生成结束后再将生成的中间结果拼接后返回,而流式输出可以实时地将中间结果返回,您可以在模型进行输出的同时进行阅读,减少等待模型回复的时间。
4.1.代码
import com.alibaba.dashscope.aigc.generation.Generation;
import com.alibaba.dashscope.aigc.generation.GenerationParam;
import com.alibaba.dashscope.aigc.generation.GenerationResult;
import com.alibaba.dashscope.common.Message;
import com.alibaba.dashscope.common.Role;
import com.alibaba.dashscope.exception.ApiException;
import com.alibaba.dashscope.exception.InputRequiredException;
import com.alibaba.dashscope.exception.NoApiKeyException;
import com.alibaba.dashscope.utils.Constants;
import io.reactivex.Flowable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.util.Arrays;
@RestController
@RequestMapping("/tongyi")
public class CallStream2Controller {
private static final Logger logger = LoggerFactory.getLogger(CallStream2Controller.class);
@Value("${-key}")
private String apiKey;
@RequestMapping(value = "/call/Stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE )
public SseEmitter callMany(@RequestParam(value = "message", required = false, defaultValue = "如何做西红柿炖牛腩?") String message) throws NoApiKeyException, InputRequiredException {
try {
// 设置API密钥
Constants.apiKey = apiKey;
Generation gen = new Generation();
Message userMsg = Message.builder().role(Role.USER.getValue()).content(message).build();
Flowable<GenerationResult> serverSentEventFlux = streamCallWithMessage(gen, userMsg);
// 创建 SseEmitter
SseEmitter emitter = new SseEmitter(Long.MAX_VALUE);
// 订阅 Flowable 并发送数据到 SseEmitter
serverSentEventFlux.subscribe(result -> sendSseData(emitter, result.getOutput().getChoices().get(0).getMessage().getContent()));
// 当完成时关闭 SseEmitter
emitter.onCompletion(() -> {
System.out.println("SSE 连接已完成,正在关闭.");
});
return emitter;
} catch (ApiException | NoApiKeyException | InputRequiredException e) {
logger.error("发生异常: {}", e.getMessage());
}
return null;
}
// 发送数据到 SseEmitter
private void sendSseData(SseEmitter emitter, String data) {
try {
// 发送数据到 SseEmitter
emitter.send(data.toString());
} catch (Exception e) {
// 处理错误情况
System.err.println("发送数据失败: " + e.getMessage());
emitter.completeWithError(e);
}
}
// 流式调用
public Flowable<GenerationResult> streamCallWithMessage(Generation gen, Message userMsg)
throws NoApiKeyException, ApiException, InputRequiredException {
// 根据用户消息构建生成参数
GenerationParam param = buildGenerationParam(userMsg);
// 使用构建的参数启动生成调用,并返回生成结果的Flowable对象
Flowable<GenerationResult> result = gen.streamCall(param);
// 从结果创建一个Flux流
return result;
}
// 构建 GenerationParam
private GenerationParam buildGenerationParam(Message userMsg) {
return GenerationParam.builder()
.model("qwen-turbo")
.messages(Arrays.asList(userMsg))
.resultFormat(GenerationParam.ResultFormat.MESSAGE)
.topP(0.8)
.incrementalOutput(true)
.build();
}
}
-
创建
SseEmitter
:创建了一个SseEmitter
实例,它将用于发送 SSE 数据给客户端。 -
订阅
Flowable
:订阅Flowable
,并在每次接收到数据时调用sendSseData
方法来发送数据到SseEmitter
。 -
错误处理:设置了
SseEmitter
的onCompletion
方法来处理正常完成情况。 -
发送数据:
sendSseData
方法负责实际发送数据到SseEmitter
。如果发送过程中出现错误,它会调用completeWithError
方法来关闭SseEmitter
。
4.1.2.前台代码
Server-Sent Events (SSE) 是一种让服务器向客户端推送实时更新的技术。它允许服务器端主动向客户端发送数据,而无需客户端持续发起请求。
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Title</title>
</head>
<body>
<a href="">返回</a> <br>
<div id="chat"></div>
<hr>
<textarea id="msg">背一下 春晓</textarea>
<button id="btn" onclick="send()">发送</button>
</body>
<script>
function send() {
let msg = document.getElementById('msg').value;
var source = new EventSource("http://localhost:8080/tongyi/call/Stream?message=" + msg);
source.onmessage = function(event) {
console.log(event)
let innerText = document.getElementById('chat').innerText;
document.getElementById('chat').innerText = innerText + event.data ; // ().;
};
source.addEventListener('error', event => {
console.log('收到错误,关闭连接.');
source.close(); // 关闭EventSource连接
});
}
</script>
</html>
4.1.3.使用Flux
Flowable
(来自 RxJava)和 Flux
(来自 Project Reactor)都可以用来整合 Server-Sent Events (SSE)。
Flux
相对的优点有:
- 与 Spring 生态系统紧密集成。
- 更现代的响应式编程模型。
- 更适合 WebFlux 和响应式 Spring Boot 应用。
因为 Spring WebFlux 提供了专门的工具来支持 Flux
和 SSE 的整合。
导入 maven 依赖
<dependency>
<groupId></groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId></groupId>
<artifactId>spring-boot-starter-reactor-netty</artifactId>
</dependency>
将 Controller 代码调整为
import com.alibaba.dashscope.aigc.generation.Generation;
import com.alibaba.dashscope.aigc.generation.GenerationParam;
import com.alibaba.dashscope.aigc.generation.GenerationResult;
import com.alibaba.dashscope.common.Message;
import com.alibaba.dashscope.common.Role;
import com.alibaba.dashscope.exception.ApiException;
import com.alibaba.dashscope.exception.InputRequiredException;
import com.alibaba.dashscope.exception.NoApiKeyException;
import com.alibaba.dashscope.utils.Constants;
import io.reactivex.Flowable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.MediaType;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import java.util.Arrays;
@RestController
@RequestMapping("/tongyi")
public class CallStreamController {
private static final Logger logger = LoggerFactory.getLogger(CallStreamController.class);
@Value("${-key}")
private String apiKey;
@RequestMapping(value = "/call/Stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<String>> callMany(@RequestParam(value = "message", required = false, defaultValue = "如何做西红柿炖牛腩?") String message) throws NoApiKeyException, InputRequiredException {
try {
// 设置API密钥
Constants.apiKey = apiKey;
Generation gen = new Generation();
Message userMsg = Message.builder().role(Role.USER.getValue()).content(message).build();
Flux<ServerSentEvent<String>> serverSentEventFlux = streamCallWithMessage(gen, userMsg);
return serverSentEventFlux;
} catch (ApiException | NoApiKeyException | InputRequiredException e) {
logger.error("发生异常: {}", e.getMessage());
}
return Flux.empty();
}
/**
* 使用流式调用并 带回调的方式进行 消息生成
*
* @param gen
* @param userMsg
* @return
* @throws NoApiKeyException
* @throws ApiException
* @throws InputRequiredException
*/
public Flux<ServerSentEvent<String>> streamCallWithMessage(Generation gen, Message userMsg)
throws NoApiKeyException, ApiException, InputRequiredException {
// 根据用户消息构建生成参数
GenerationParam param = buildGenerationParam(userMsg);
// 使用构建的参数启动生成调用,并返回生成结果的Flowable对象
Flowable<GenerationResult> result = gen.streamCall(param);
// 从结果创建一个Flux流
Flux<ServerSentEvent<String>> serverSentEventFlux = Flux.from(result)
// 在每个事件之间添加延迟
// .delayElements((1000))
// 处理每个消息事件
.map(message -> {
// 提取消息内容
String output = message.getOutput().getChoices().get(0).getMessage().getContent();
// 打印输出内容
// ("output = " + output);
// 构建并返回ServerSentEvent对象
return ServerSentEvent.<String>builder()
.data(output)
.build();
})
// 添加事件("error") 用来触发结束处理的事件
.concatWith(Flux.just(ServerSentEvent.<String>builder().event("error").build()))
// 错误处理
.doOnError(e -> {
// 根据不同类型的异常进行相应处理
if (e instanceof NoApiKeyException) {
// 处理NoApiKeyException
} else if (e instanceof InputRequiredException) {
// 处理InputRequiredException
} else if (e instanceof ApiException) {
// 处理其他ApiException
} else {
// 处理其他异常
}
});
// 从结果创建一个Flux流
return serverSentEventFlux;
}
private GenerationParam buildGenerationParam(Message userMsg) {
return GenerationParam.builder()
.model("qwen-turbo")
.messages(Arrays.asList(userMsg))
.resultFormat(GenerationParam.ResultFormat.MESSAGE)
.topP(0.8)
.incrementalOutput(true)
.build();
}
}