使用 SpringBoot + SSE 实现实时异步流式推送

时间:2025-03-18 15:50:06

在现代 Web 应用中,实时推送 是一个常见的需求,如消息通知、数据更新、实时日志等。传统的 Web 通信方式,如轮询(Polling)和 WebSocket,各有优缺点。SSE(Server-Sent Events) 提供了一种轻量级的解决方案,特别适用于单向数据流的场景。

本文将介绍如何在 Spring Boot 中使用 SSE(Server-Sent Events),实现实时异步流式推送


1. 什么是 SSE?

SSE(Server-Sent Events) 是 HTML5 标准的一部分,它允许服务器单向向客户端推送数据,客户端使用 EventSource 监听数据流。

1.1 SSE VS WebSocket

特性

SSE(Server-Sent Events)

WebSocket

连接方式

仅支持 服务器 → 客户端 推送

双向通信,客户端和服务器可互发消息

复杂度

简单,基于 HTTP 协议

复杂,需要自定义协议

兼容性

只支持 HTTP / HTTPS

支持 TCP 连接

断线重连

浏览器自动重连

需手动实现

适用场景

单向推送(如消息通知、股票更新)

双向交互(如在线聊天、多人协作)


2. 在 Spring Boot 中实现 SSE

2.1 创建 SSE 控制器

import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import java.io.IOException;
import java.util.concurrent.CopyOnWriteArrayList;

@RestController
@RequestMapping("/sse")
public class SseController {
    private final CopyOnWriteArrayList<SseEmitter> clients = new CopyOnWriteArrayList<>();

    @GetMapping(value = "/subscribe", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public SseEmitter subscribe() {
        SseEmitter emitter = new SseEmitter(Long.MAX_VALUE);
        clients.add(emitter);
        emitter.onCompletion(() -> clients.remove(emitter));
        emitter.onTimeout(() -> clients.remove(emitter));
        return emitter;
    }

    @PostMapping("/send")
    public void send(@RequestParam String message) {
        for (SseEmitter emitter : clients) {
            try {
                emitter.send(SseEmitter.event().data(message));
            } catch (IOException e) {
                clients.remove(emitter);
            }
        }
    }
}

2.2 客户端代码(HTML + JavaScript)

<!DOCTYPE html>
<html lang="zh">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>SSE 测试</title>
</head>
<body>
    <h2>服务器推送消息:</h2>
    <div id="messages"></div>

    <script>
        const eventSource = new EventSource("http://localhost:8080/sse/subscribe");

        eventSource.onmessage = function(event) {
            const messagesDiv = document.getElementById("messages");
            messagesDiv.innerHTML += "<p>" + event.data + "</p>";
        };

        eventSource.onerror = function() {
            console.error("连接中断,尝试重新连接...");
        };
    </script>
</body>
</html>

2.3 运行 & 测试

  1. 启动 Spring Boot 应用。
  2. 访问 http://localhost:8080/sse/subscribe,浏览器会持续监听服务器推送的数据。
  3. 通过 http://localhost:8080/sse/send?message=Hello SSE,浏览器会立即收到消息

3. SSE 进阶优化

3.1 自动重连

SSE 默认支持自动重连,但可以调整 重试间隔

@GetMapping(value = "/subscribe", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter subscribe() {
    SseEmitter emitter = new SseEmitter(Long.MAX_VALUE);
    try {
        emitter.send("retry: 5000\n\n"); // 5秒自动重连
    } catch (IOException e) {
        e.printStackTrace();
    }
    return emitter;
}

3.2 心跳检测

可以定期发送心跳包,防止连接断开:

private void sendHeartbeat(SseEmitter emitter) {
    Executors.newScheduledThreadPool(1).scheduleAtFixedRate(() -> {
        try {
            emitter.send("heartbeat");
        } catch (IOException e) {
            emitter.complete();
        }
    }, 10, 10, TimeUnit.SECONDS);
}

3.3 多用户支持

可以在 CopyOnWriteArrayList存储用户 ID,实现针对性推送:

private final Map<String, SseEmitter> userEmitters = new ConcurrentHashMap<>();

@GetMapping("/subscribe/{userId}")
public SseEmitter subscribe(@PathVariable String userId) {
    SseEmitter emitter = new SseEmitter(Long.MAX_VALUE);
    userEmitters.put(userId, emitter);
    return emitter;
}

@PostMapping("/send/{userId}")
public void send(@PathVariable String userId, @RequestParam String message) {
    SseEmitter emitter = userEmitters.get(userId);
    if (emitter != null) {
        try {
            emitter.send(message);
        } catch (IOException e) {
            userEmitters.remove(userId);
        }
    }
}

4. 适用场景

  • 实时日志(Spring Boot 监控日志推送)
  • 消息通知(Web 端的实时消息提醒)
  • 数据流(股票行情、物联网数据更新)
  • 后台任务进度(文件上传进度、AI 任务处理进度)

5. 总结

SSE 特性

说明

单向推送

服务器 → 客户端(不支持客户端 → 服务器)

自动重连

浏览器原生支持,断线后自动重连

轻量级

基于 HTTP,兼容性好,适用于数据流更新

局限性

只支持文本(不适合二进制数据),不适合双向通信

???? **SSE 是一个轻量级、简单易用的方案,适用于服务器单向推送场景!**希望本文对你有所帮助!????