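Let's walk through three ways to implement asynchronous streaming endpoints in Spring. They help avoid request timeouts when an endpoint runs a large query or a long-running job.

1. SSE (Server-Sent Events) - server-to-client event push

Implementation: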

// 1. Controller layer
@Slf4j // Lombok: provides the `log` field used below
@RestController
@RequestMapping("/api/stream")
public class SseController {
    
    @GetMapping(value = "/sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public SseEmitter streamSse() {
        SseEmitter emitter = new SseEmitter(60_000L); // 60-second timeout
        
        // Register the callbacks before the background task starts sending
        emitter.onCompletion(() -> 
            log.info("SSE stream completed"));
        emitter.onTimeout(() -> 
            log.warn("SSE stream timed out"));
        emitter.onError((e) -> 
            log.error("SSE stream error", e));
        
        // Demo only: runAsync without an executor uses the common ForkJoinPool
        CompletableFuture.runAsync(() -> {
            try {
                for (int i = 1; i <= 10; i++) {
                    Thread.sleep(1000); // simulate a slow operation
                    
                    // Build the event payload
                    SseEmitter.SseEventBuilder event = SseEmitter.event()
                            .data(new ProgressEvent(i * 10, "Processing..."))
                            .id(String.valueOf(i))
                            .name("progress");
                    
                    emitter.send(event);
                }
                emitter.complete();
            } catch (Exception e) {
                emitter.completeWithError(e);
            }
        });
        
        return emitter;
    }
    
    @Data
    @AllArgsConstructor
    public static class ProgressEvent {
        private int progress;
        private String message;
    }
}

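Frontend usage:

// Consume the stream with EventSource
const eventSource = new EventSource('/api/stream/sse');

// The server names its events "progress". onmessage only receives unnamed
// events, so a listener for the event name is required here.
eventSource.addEventListener('progress', (event) => {
    const data = JSON.parse(event.data);
    console.log(`Progress: ${data.progress}%, ${data.message}`);
});

eventSource.onerror = (error) => {
    console.error('SSE connection error:', error);
    // Closing stops the automatic reconnect. The server ending the stream
    // after its last event also lands here.
    eventSource.close();
};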
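
To make the link between the server's `.name("progress")` and the browser listener concrete, here is a minimal sketch of what one event looks like on the wire. It is plain Java and is not Spring's implementation; the class `SseFrame` and its `format` method are hypothetical names used only for illustration.

```java
/** Illustrative formatter for a single Server-Sent Events frame (hypothetical helper). */
final class SseFrame {

    private SseFrame() {
    }

    /**
     * Builds the text of one event. A null id or name omits that field.
     * Multi-line data is split into one "data:" line per line of input.
     */
    static String format(String id, String name, String data) {
        StringBuilder sb = new StringBuilder();
        if (id != null) {
            sb.append("id:").append(id).append('\n');
        }
        if (name != null) {
            // The "event:" field selects which listener the browser dispatches to
            sb.append("event:").append(name).append('\n');
        }
        for (String line : data.split("\n", -1)) {
            sb.append("data:").append(line).append('\n');
        }
        // A blank line terminates the event
        return sb.append('\n').toString();
    }
}
```

A frame that carries an `event:` field is dispatched only to listeners registered for that name, and a frame without one goes to `onmessage`. The `id:` field is what the browser echoes back in the `Last-Event-ID` header when it reconnects.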

2. ResponseBodyEmitter - streaming the response body

Implementation:

@RestController
@RequestMapping("/api/stream")
public class ResponseBodyEmitterController {
    
    @GetMapping("/emitter")
    public ResponseBodyEmitter streamData() {
        ResponseBodyEmitter emitter = new ResponseBodyEmitter();
        
        // Demo only: production code should reuse a shared, bounded executor
        // rather than creating one per request
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.execute(() -> {
            try {
                // Send a leading message
                emitter.send("Data stream started\n\n");
                
                // Stream the data
                for (int i = 1; i <= 20; i++) {
                    Thread.sleep(500);
                    
                    // Build a payload with mixed value types
                    Map<String, Object> data = new HashMap<>();
                    data.put("id", i);
                    data.put("name", "Item-" + i);
                    data.put("timestamp", LocalDateTime.now());
                    data.put("processed", i % 2 == 0);
                    
                    emitter.send(data);
                    emitter.send("\n");
                }
                
                emitter.complete();
            } catch (Exception e) {
                emitter.completeWithError(e);
            } finally {
                executor.shutdown();
            }
        });
        
        return emitter;
    }
    
    // Large file download example
    @GetMapping("/download/{filename}")
    public ResponseEntity<ResponseBodyEmitter> downloadLargeFile(
            @PathVariable String filename) {
        
        ResponseBodyEmitter emitter = new ResponseBodyEmitter();
        
        CompletableFuture.runAsync(() -> {
            try {
                // Simulate reading a large text file line by line.
                // NOTE: filename comes from the request; validate it before use,
                // otherwise values such as "../x" can escape the base directory
                Path filePath = Paths.get("/path/to/large/" + filename);
                try (BufferedReader reader = Files.newBufferedReader(
                         filePath, StandardCharsets.UTF_8)) {
                    
                    String line;
                    while ((line = reader.readLine()) != null) {
                        emitter.send(line + "\n");
                        Thread.sleep(10); // throttle the send rate
                    }
                }
                emitter.complete();
            } catch (Exception e) {
                emitter.completeWithError(e);
            }
        });
        
        return ResponseEntity.ok()
                .header(HttpHeaders.CONTENT_DISPOSITION, 
                        "attachment; filename=\"" + filename + "\"")
                .contentType(MediaType.APPLICATION_OCTET_STREAM)
                .body(emitter);
    }
}
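
The download endpoint builds its path from a request parameter, so the file name should be checked before the file is opened. Below is a minimal sketch of one way to do that with `java.nio.file`. The class `DownloadPaths` and the method `resolveInside` are hypothetical, and the check does not cover symbolic links that point outside the base directory.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

/** Illustrative path check for user-supplied file names (hypothetical helper). */
final class DownloadPaths {

    private DownloadPaths() {
    }

    /**
     * Resolves filename inside baseDir and rejects any result that escapes it,
     * for example "../secret.txt", or that points at the base directory itself.
     */
    static Path resolveInside(Path baseDir, String filename) {
        Path base = baseDir.toAbsolutePath().normalize();
        Path resolved = base.resolve(filename).normalize();
        if (!resolved.startsWith(base) || resolved.equals(base)) {
            throw new IllegalArgumentException("Invalid file name: " + filename);
        }
        return resolved;
    }
}
```

In the controller, the result of `resolveInside(baseDir, filename)` would replace the string concatenation, with the `IllegalArgumentException` mapped to a 400 response.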

3. WebFlux Reactive Streaming - reactive programming

Dependency:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

Implementation:

@RestController
@RequestMapping("/api/reactive")
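public class ReactiveStreamController {
    
    // javax.sql.DataSource used by streamUsers(), injected through the constructor
    private final DataSource dataSource;
    
    public ReactiveStreamController(DataSource dataSource) {
        this.dataSource = dataSource;
    }
    
    // Infinite stream example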
    @GetMapping(value = "/infinite", 
                produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent<Object>> infiniteStream() {
        return Flux.interval(Duration.ofSeconds(1))
                .map(sequence -> ServerSentEvent.builder()
                        .id(String.valueOf(sequence))
                        .event("heartbeat")
                        .data(Map.of(
                            "timestamp", Instant.now(),
                            "sequence", sequence,
                            "status", "OK"
                        ))
                        .build());
    }
    
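    // Streaming database query example.
    // JDBC is blocking, so the pipeline is subscribed on boundedElastic
    // to keep the blocking calls off the event-loop threads.
    @GetMapping(value = "/users/stream", 
                produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<User> streamUsers() {
        return Flux.usingWhen(
                Mono.fromCallable(() -> 
                    dataSource.getConnection()),
                connection -> Flux.<User, ResultSet>generate(
                        () -> {
                            // Forward-only, read-only cursor fetched 100 rows at a time
                            Statement stmt = connection.createStatement(
                                ResultSet.TYPE_FORWARD_ONLY,
                                ResultSet.CONCUR_READ_ONLY);
                            stmt.setFetchSize(100);
                            return stmt.executeQuery(
                                "SELECT * FROM users");
                        },
                        (rs, sink) -> {
                            // Emit one row per downstream request
                            try {
                                if (rs.next()) {
                                    User user = new User();
                                    user.setId(rs.getLong("id"));
                                    user.setName(rs.getString("name"));
                                    user.setEmail(rs.getString("email"));
                                    sink.next(user);
                                } else {
                                    sink.complete();
                                }
                            } catch (SQLException e) {
                                sink.error(e);
                            }
                            return rs;
                        },
                        rs -> {
                            // Closing the statement also closes its ResultSet
                            try {
                                rs.getStatement().close();
                            } catch (SQLException e) {
                                // Best effort: the connection is closed next
                            }
                        }),
                connection -> Mono.fromRunnable(() -> {
                    try {
                        if (!connection.isClosed()) {
                            connection.close();
                        }
                    } catch (SQLException e) {
                        // Nothing useful to do here; log it in real code
                    }
                })
        )
        .subscribeOn(Schedulers.boundedElastic())
        .delayElements(Duration.ofMillis(100)); // throttle the emission rate
    }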
    
    // Batch processing stream. Each item runs on boundedElastic (see processItem),
    // so items are processed concurrently; flatMapSequential emits the results
    // in the order of the incoming ids
    @GetMapping("/process/batch")
    public Flux<ProcessResult> processBatch(@RequestParam List<Long> ids) {
        return Flux.fromIterable(ids)
                .flatMapSequential(this::processItem);
    }
    
    private Mono<ProcessResult> processItem(Long id) {
        return Mono.fromCallable(() -> {
            // Simulate processing work
            Thread.sleep(500);
            return new ProcessResult(id, "SUCCESS", 
                    LocalDateTime.now());
        }).subscribeOn(Schedulers.boundedElastic());
    }
    
    @Data
    @AllArgsConstructor
    public static class ProcessResult {
        private Long id;
        private String status;
        private LocalDateTime processedAt;
    }
}

4. Common configuration and utility classes

Configuration class:

@Configuration
public class AsyncStreamConfig implements WebMvcConfigurer {
    
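    @Override
    public void configureAsyncSupport(AsyncSupportConfigurer configurer) {
        // Async request handling settings
        configurer.setDefaultTimeout(300_000L); // 5 minutes
        configurer.setTaskExecutor(asyncTaskExecutor());
    }
    
    // Return the concrete type: setTaskExecutor expects an AsyncTaskExecutor,
    // which a plain Executor does not satisfy
    @Bean("asyncTaskExecutor")
    public ThreadPoolTaskExecutor asyncTaskExecutor() {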
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(50);
        executor.setQueueCapacity(100);
        executor.setThreadNamePrefix("Async-Stream-");
        executor.initialize();
        return executor;
    }
}

Streaming response utility class:

@Component
public class StreamResponseUtil {
    
    public static <T> void sendStreamData(
            SseEmitter emitter, 
            List<T> data,
            Function<T, Map<String, Object>> mapper) {
        
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.submit(() -> {
            try {
                for (T item : data) {
                    Map<String, Object> result = mapper.apply(item);
                    emitter.send(SseEmitter.event()
                            .data(result)
                            .id(UUID.randomUUID().toString()));
                    
                    // Throttle the send rate so the client is not overwhelmed
                    Thread.sleep(100);
                }
                emitter.complete();
            } catch (Exception e) {
                emitter.completeWithError(e);
            } finally {
                executor.shutdown();
            }
        });
    }
    
    // Paginated streaming query. pageFetcher (java.util.function.IntFunction)
    // receives the zero-based page index and returns that page.
    // This method blocks, so call it from a worker thread.
    public <T> void paginatedStream(
            SseEmitter emitter,
            IntFunction<List<T>> pageFetcher,
            int pageSize) {
        
        int page = 0;
        boolean hasMore = true;
        
        while (hasMore) {
            List<T> data = pageFetcher.apply(page);
            
            // A short page means this was the last one
            hasMore = data.size() >= pageSize;
            
            if (!data.isEmpty()) {
                try {
                    emitter.send(SseEmitter.event()
                            .data(data)
                            .name("page-" + page));
                } catch (IOException | IllegalStateException e) {
                    // Client disconnected, or the emitter has already completed
                    return;
                }
            }
            page++;
            
            if (hasMore) {
                try {
                    Thread.sleep(1000); // pause between batches
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
        
        emitter.complete();
    }
}

5. Exception handling and monitoring

@Slf4j // Lombok: provides the `log` field used below
@ControllerAdvice
public class StreamExceptionHandler {
    
    @ExceptionHandler(AsyncRequestTimeoutException.class)
    public ResponseEntity<Object> handleTimeout(AsyncRequestTimeoutException e) {
        return ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE)
                .body(Map.of(
                    "error", "Stream timeout",
                    "message", "Please retry or contact the administrator",
                    "timestamp", Instant.now()
                ));
    }
    
    @ExceptionHandler(IOException.class)
    public void handleClientDisconnect(IOException e, 
                                     HttpServletResponse response) {
        // The client disconnected: log it and do not rethrow
        log.debug("Client disconnected from stream: {}", e.getMessage());
    }
}

6. Unified frontend handling

class StreamClient {
    constructor(options = {}) {
        this.options = {
            retryCount: 3,
            retryDelay: 2000,
            heartbeatInterval: 30000,
            ...options
        };
        this.eventSource = null;
        this.retryAttempts = 0;
    }
    
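    connect(url, onMessage, onError, onComplete) {
        this.disconnect();
        
        this.eventSource = new EventSource(url);
        
        // onmessage only receives unnamed events. For named events such as
        // "progress", pass the name as options.eventName.
        const eventName = this.options.eventName || 'message';
        this.eventSource.addEventListener(eventName, (event) => {
            this.retryAttempts = 0;
            try {
                const data = JSON.parse(event.data);
                onMessage(data, event);
            } catch (e) {
                console.error('Parse error:', e);
            }
        });
        
        this.eventSource.onerror = (error) => {
            // Stop the browser's built-in reconnect so only this retry logic runs
            this.eventSource.close();
            if (this.retryAttempts < this.options.retryCount) {
                this.retryAttempts++;
                setTimeout(() => {
                    this.connect(url, onMessage, onError, onComplete);
                }, this.options.retryDelay);
            } else {
                // Retries exhausted: stop the heartbeat timer as well
                this.disconnect();
                onError(error);
            }
        };
        
        // Heartbeat check: reconnect if the connection has been closed
        this.heartbeatTimer = setInterval(() => {
            if (this.eventSource.readyState === EventSource.CLOSED) {
                this.connect(url, onMessage, onError, onComplete);
            }
        }, this.options.heartbeatInterval);
    }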
    
    disconnect() {
        if (this.eventSource) {
            this.eventSource.close();
            this.eventSource = null;
        }
        if (this.heartbeatTimer) {
            clearInterval(this.heartbeatTimer);
        }
    }
}

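Comparison summary

| Approach | Use cases | Pros | Cons |
| --- | --- | --- | --- |
| SSE | Real-time progress updates, push notifications | Simple to use, automatic reconnection, text-based protocol | One-way communication, not supported in IE |
| ResponseBodyEmitter | Large file downloads, streaming JSON | Flexible, supports many data types | Connections must be managed manually |
| WebFlux Reactive | High concurrency, reactive systems | Non-blocking, efficient resource usage | Steep learning curve, requires reactive library support |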

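Best practice recommendations

  1. Timeouts: set timeout values that match the business requirement
  2. Flow control: add delays so the client is not overwhelmed
  3. Error recovery: implement retries and resuming from the last received event
  4. Monitoring and alerting: track connection counts and errors on streaming endpoints
  5. Resource cleanup: make sure connections are closed to avoid resource leaks
  6. Documentation: tell clients explicitly that the endpoint is a streaming one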
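
Item 3 mentions resuming after a dropped connection. With SSE, a browser that reconnects automatically sends the id of the last event it received in the `Last-Event-ID` request header, so the server can skip what was already delivered. The sketch below shows only the selection step in plain Java. It assumes numeric, increasing event ids like those in the SSE example and an in-memory history; `EventReplay`, `StoredEvent`, and `eventsAfter` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative replay helper for reconnecting SSE clients (hypothetical). */
final class EventReplay {

    /** Minimal stored event: a numeric id plus an already-serialized payload. */
    static final class StoredEvent {
        final long id;
        final String payload;

        StoredEvent(long id, String payload) {
            this.id = id;
            this.payload = payload;
        }
    }

    private EventReplay() {
    }

    /**
     * Returns the events the client has not seen yet. lastEventId is the raw
     * Last-Event-ID header value; null or unparsable means "send everything".
     */
    static List<StoredEvent> eventsAfter(List<StoredEvent> history, String lastEventId) {
        long lastSeen = parseOrDefault(lastEventId, -1L);
        List<StoredEvent> pending = new ArrayList<>();
        for (StoredEvent event : history) {
            if (event.id > lastSeen) {
                pending.add(event);
            }
        }
        return pending;
    }

    private static long parseOrDefault(String value, long fallback) {
        if (value == null) {
            return fallback;
        }
        try {
            return Long.parseLong(value.trim());
        } catch (NumberFormatException e) {
            return fallback;
        }
    }
}
```

In a controller, the header could be read with `@RequestHeader(value = "Last-Event-ID", required = false)` and the pending events sent before any new ones. This only helps if the client leaves the automatic reconnect enabled, or sends the last id itself when it reconnects manually.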