我們一起聊聊,Spring 中實現異步流式接口的三種方案,解決大查詢、長處理等場景下的接口超時問題。
1. SSE(Server-Sent Events) - 服務器推送事件
實現代碼:
// 1. 控制器層
@RestController
@RequestMapping("/api/stream")
public class SseController {
@GetMapping(value = "/sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter streamSse() {
SseEmitter emitter = new SseEmitter(60_000L); // 60秒超時
CompletableFuture.runAsync(() -> {
try {
for (int i = 1; i <= 10; i++) {
Thread.sleep(1000); // 模擬耗時操作
// 構建事件數據
SseEmitter.SseEventBuilder event = SseEmitter.event()
.data(new ProgressEvent(i * 10, "處理中..."))
.id(String.valueOf(i))
.name("progress");
emitter.send(event);
}
emitter.complete();
} catch (Exception e) {
emitter.completeWithError(e);
}
});
emitter.onCompletion(() ->
log.info("SSE流完成"));
emitter.onTimeout(() ->
log.warn("SSE流超時"));
emitter.onError((e) ->
log.error("SSE流錯誤", e));
return emitter;
}
@Data
@AllArgsConstructor
public static class ProgressEvent {
private int progress;
private String message;
}
}
前端調用:
// 前端使用 EventSource
const eventSource = new EventSource('/api/stream/sse');
eventSource.onmessage = (event) => {
const data = JSON.parse(event.data);
console.log(`進度: ${data.progress}%, ${data.message}`);
};
eventSource.onerror = (error) => {
console.error('SSE連接錯誤:', error);
eventSource.close();
};
2. ResponseBodyEmitter - 響應式流
實現代碼:
@RestController
@RequestMapping("/api/stream")
public class ResponseBodyEmitterController {
@GetMapping("/emitter")
public ResponseBodyEmitter streamData() {
ResponseBodyEmitter emitter = new ResponseBodyEmitter();
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.execute(() -> {
try {
// 發送頭信息
emitter.send("開始數據流傳輸\n\n");
// 流式發送數據
for (int i = 1; i <= 20; i++) {
Thread.sleep(500);
// 發送不同類型的數據
Map<String, Object> data = new HashMap<>();
data.put("id", i);
data.put("name", "Item-" + i);
data.put("timestamp", LocalDateTime.now());
data.put("processed", i % 2 == 0);
emitter.send(data);
emitter.send("\n");
}
emitter.complete();
} catch (Exception e) {
emitter.completeWithError(e);
} finally {
executor.shutdown();
}
});
return emitter;
}
// 大文件下載示例
@GetMapping("/download/{filename}")
public ResponseEntity<ResponseBodyEmitter> downloadLargeFile(
@PathVariable String filename) {
ResponseBodyEmitter emitter = new ResponseBodyEmitter();
CompletableFuture.runAsync(() -> {
try {
// 模擬大文件讀取
Path filePath = Paths.get("/path/to/large/" + filename);
try (InputStream is = Files.newInputStream(filePath);
BufferedReader reader = new BufferedReader(
new InputStreamReader(is))) {
String line;
while ((line = reader.readLine()) != null) {
emitter.send(line + "\n");
Thread.sleep(10); // 控制發送速度
}
}
emitter.complete();
} catch (Exception e) {
emitter.completeWithError(e);
}
});
return ResponseEntity.ok()
.header(HttpHeaders.CONTENT_DISPOSITION,
"attachment; filename=\"" + filename + "\"")
.contentType(MediaType.APPLICATION_OCTET_STREAM)
.body(emitter);
}
}
3. WebFlux Reactive Streaming - 響應式編程
依賴配置:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
實現代碼:
@RestController
@RequestMapping("/api/reactive")
public class ReactiveStreamController {
// 無限流示例
@GetMapping(value = "/infinite",
produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<Object>> infiniteStream() {
return Flux.interval(Duration.ofSeconds(1))
.map(sequence -> ServerSentEvent.builder()
.id(String.valueOf(sequence))
.event("heartbeat")
.data(Map.of(
"timestamp", Instant.now(),
"sequence", sequence,
"status", "OK"
))
.build());
}
// 數據庫流式查詢示例
@GetMapping(value = "/users/stream",
produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<User> streamUsers() {
return Flux.usingWhen(
Mono.fromCallable(() ->
dataSource.getConnection()),
connection -> Flux.fromIterable(() -> {
// 使用流式ResultSet
Statement stmt = connection.createStatement(
ResultSet.TYPE_FORWARD_ONLY,
ResultSet.CONCUR_READ_ONLY);
stmt.setFetchSize(100);
ResultSet rs = stmt.executeQuery(
"SELECT * FROM users");
return new ResultSetSpliterator<>(rs, resultSet -> {
User user = new User();
user.setId(resultSet.getLong("id"));
user.setName(resultSet.getString("name"));
user.setEmail(resultSet.getString("email"));
return user;
});
}),
connection -> Mono.fromRunnable(() -> {
try {
if (!connection.isClosed()) {
connection.close();
}
} catch (SQLException e) {
// 處理異常
}
})
).delayElements(Duration.ofMillis(100)); // 控制發送頻率
}
// 批量處理流
@GetMapping("/process/batch")
public Flux<ProcessResult> processBatch(@RequestParam List<Long> ids) {
return Flux.fromIterable(ids)
.parallel()
.runOn(Schedulers.parallel())
.flatMap(this::processItem)
.ordered(Comparator.comparing(ProcessResult::getId));
}
private Mono<ProcessResult> processItem(Long id) {
return Mono.fromCallable(() -> {
// 模擬處理邏輯
Thread.sleep(500);
return new ProcessResult(id, "SUCCESS",
LocalDateTime.now());
}).subscribeOn(Schedulers.boundedElastic());
}
@Data
@AllArgsConstructor
public static class ProcessResult {
private Long id;
private String status;
private LocalDateTime processedAt;
}
}
4. 通用配置和工具類
配置類:
@Configuration
public class AsyncStreamConfig implements WebMvcConfigurer {
@Override
public void configureAsyncSupport(AsyncSupportConfigurer configurer) {
// 配置異步處理
configurer.setDefaultTimeout(300_000L); // 5分鐘
configurer.setTaskExecutor(asyncTaskExecutor());
}
@Bean("asyncTaskExecutor")
public Executor asyncTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(50);
executor.setQueueCapacity(100);
executor.setThreadNamePrefix("Async-Stream-");
executor.initialize();
return executor;
}
}
流式響應工具類:
@Component
public class StreamResponseUtil {
public static <T> void sendStreamData(
SseEmitter emitter,
List<T> data,
Function<T, Map<String, Object>> mapper) {
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.submit(() -> {
try {
for (T item : data) {
Map<String, Object> result = mapper.apply(item);
emitter.send(SseEmitter.event()
.data(result)
.id(UUID.randomUUID().toString()));
// 控制發送速度,避免客户端壓力過大
Thread.sleep(100);
}
emitter.complete();
} catch (Exception e) {
emitter.completeWithError(e);
} finally {
executor.shutdown();
}
});
}
// 分頁流式查詢
public <T> void paginatedStream(
SseEmitter emitter,
Supplier<List<T>> dataSupplier,
int pageSize) {
int page = 0;
boolean hasMore = true;
while (hasMore && !emitter.isCompleted()) {
List<T> data = dataSupplier.get();
if (data.size() < pageSize) {
hasMore = false;
}
if (!data.isEmpty()) {
try {
emitter.send(SseEmitter.event()
.data(data)
.name("page-" + page));
page++;
} catch (IOException e) {
break;
}
}
if (hasMore) {
try {
Thread.sleep(1000); // 批次間隔
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
}
if (!emitter.isCompleted()) {
emitter.complete();
}
}
}
5. 異常處理和監控
@ControllerAdvice
public class StreamExceptionHandler {
@ExceptionHandler(AsyncRequestTimeoutException.class)
@ResponseStatus(HttpStatus.SERVICE_UNAVAILABLE)
public ResponseEntity<Object> handleTimeout(AsyncRequestTimeoutException e) {
return ResponseEntity.status(503)
.body(Map.of(
"error", "Stream timeout",
"message", "請重試或聯繫管理員",
"timestamp", Instant.now()
));
}
@ExceptionHandler(IOException.class)
public void handleClientDisconnect(IOException e,
HttpServletResponse response) {
// 客户端斷開連接,記錄日誌但不拋出異常
log.debug("Client disconnected from stream: {}", e.getMessage());
}
}
6. 前端統一處理
class StreamClient {
constructor(options = {}) {
this.options = {
retryCount: 3,
retryDelay: 2000,
heartbeatInterval: 30000,
...options
};
this.eventSource = null;
this.retryAttempts = 0;
}
connect(url, onMessage, onError, onComplete) {
this.disconnect();
this.eventSource = new EventSource(url);
this.eventSource.onmessage = (event) => {
this.retryAttempts = 0;
try {
const data = JSON.parse(event.data);
onMessage(data, event);
} catch (e) {
console.error('Parse error:', e);
}
};
this.eventSource.onerror = (error) => {
if (this.retryAttempts < this.options.retryCount) {
this.retryAttempts++;
setTimeout(() => {
this.connect(url, onMessage, onError, onComplete);
}, this.options.retryDelay);
} else {
onError(error);
}
};
// 心跳檢測
this.heartbeatTimer = setInterval(() => {
if (this.eventSource.readyState === EventSource.CLOSED) {
this.connect(url, onMessage, onError, onComplete);
}
}, this.options.heartbeatInterval);
}
disconnect() {
if (this.eventSource) {
this.eventSource.close();
this.eventSource = null;
}
if (this.heartbeatTimer) {
clearInterval(this.heartbeatTimer);
}
}
}
總結對比
|
方案
|
適用場景
|
優點
|
缺點
|
|
SSE |
實時進度更新、通知推送
|
簡單易用、自動重連、文本協議
|
單向通信、IE不支持
|
|
ResponseBodyEmitter |
大文件下載、流式JSON
|
靈活、支持各種數據類型
|
需要手動管理連接
|
|
WebFlux Reactive |
高併發、響應式系統
|
非阻塞、資源利用率高
|
學習曲線陡峭、需要響應式庫支持
|
最佳實踐建議
- 超時設置:根據業務需求合理設置超時時間
- 流量控制:添加延遲避免客户端壓力過大
- 錯誤恢復:實現重試機制和斷點續傳
- 監控告警:監控流式接口的連接數和異常
- 資源清理:確保連接關閉,避免資源泄漏
- 文檔説明:明確告知客户端這是流式接口