點擊上方“程序員蝸牛g”,選擇“設為星標”
跟蝸牛哥一起,每天進步一點點
程序員蝸牛g
大廠程序員一枚 跟蝸牛一起 每天進步一點點
33篇原創內容
公眾號
在分佈式系統裏,Spring Boot事務管理邊界處理是架構設計的一大痛點。
關鍵業務涉及數據庫事務與第三方服務調用(如郵件發送、遠程接口調用)混合場景時,開發者常陷入兩難:
在 @Transactional 中直接調用,網絡問題會使整個事務回滾致訂單丟失;
移至事務外,又會出現數據不一致風險。
本文將以4層漸進式方案,深度剖析Spring Boot事務與外部服務的協同策略。
從基礎的事務內阻塞調用,逐步進階至本地消息表,共給出4個方案。
每個方案均附完整代碼,且會揭示前代方案的不足,帶你領略技術演進之路。
2.實戰案例
環境準備
// 訂單對象
@Entity
@Table(name = "x_order")
public class Order {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id ;
private String orderNo;
private BigDecimal amount;
private Integer status ;
private LocalDateTime orderTime;
}
訂單&郵件基本操作類
public interface OrderRepository extends JpaRepository<Order, Long> {
}
@Service
public class EmailService {
public void sendEmail(Order order) {
System.err.printf("給【%s】發送郵件成功, 本次訂單總額: %s%n",
UserContext.getEmail(), order.getAmount()) ;
}
}
@Service
public class OrderService {
private final OrderRepository orderRepository ;
private final EmailService emailService ;
@Transactional
public void createOrder(Order order) {
// 各種方案實現
}
}
2.1 方案1:事務方法內直接調用
在事務方法中直接調用郵件發送(或其它操作),代碼簡單但隱患巨大。適用於快速原型驗證,但生產環境嚴禁使用。
@Transactional
public void createOrder(Order order) {
// 1.保存訂單
orderRepository.save(order) ;
// 2.發送
emailService.sendEmail(order) ;
}
問題分析:
- 事務膨脹:郵件調用耗時過長會佔用數據庫連接,降低併發性能
- 事務回滾污染:若郵件發送失敗拋異常,會導致整個事務回滾
- 可靠性問題:網絡波動可能使郵件發送失敗,無法重試
- 耦合性高:業務邏輯與通知邏輯緊耦合
2.2 方案2:事務鈎子回調
通過Spring事務同步器在事務提交後觸發外部調用,避免事務回滾污染。適合對實時性要求低、調用量小的場景,如內部系統通知。
private final ApplicationEventPublisher eventPublisher;
@Transactional
public void createOrder(Order order) {
orderRepository.save(order);
// 發佈事件註冊事務鈎子回調
this.eventPublisher.publishEvent(new OrderCreatedEvent(order)) ;
}
事件對象&事件監聽
public class OrderCreatedEvent extends ApplicationEvent{
public OrderCreatedEvent(Object source) {
super(source);
}
}
@Component
public class OrderEventListener {
private final EmailService emailService;
public OrderEventListener(EmailService emailService) {
this.emailService = emailService;
}
// 事務提交以後執行
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void handleOrderCreatedEvent(OrderCreatedEvent event) {
this.emailService.sendEmail((Order) event.getSource()) ;
}
}
問題分析:
- 同步執行瓶頸:監聽器與主線程同步執行,響應延遲
- 無重試機制:臨時故障導致永久失敗
- 事件丟失可能:應用重啓時未處理事件會丟失
2.3 方案3:異步+事務鈎子回調
結合 @Async 異步執行和 @Retryable 自動重試,解決同步阻塞和臨時故障問題。但仍依賴應用內存,崩潰時事件丟失,適合要求不是很嚴格的業務場景。
首先,引入依賴
<dependency>
<groupId>org.springframework.retry</groupId>
<artifactId>spring-retry</artifactId>
</dependency>
其次,開啓異步&重試機制
@Configuration
@EnableAsync
@EnableRetry
public class AsyncConfig implements AsyncConfigurer {
@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor() ;
executor.setThreadNamePrefix("Pack-Async-");
executor.setCorePoolSize(5) ;
executor.setMaxPoolSize(10) ;
executor.setQueueCapacity(100) ;
executor.initialize() ;
return executor ;
}
}
最後,修改事務提交後監聽
@Async
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
@Retryable(retryFor = Exception.class, maxAttempts = 3, backoff = @Backoff(delay = 1000))
public void handleOrderCreatedEvent(OrderCreatedEvent event) {
this.emailService.sendEmail((Order) event.getSource()) ;
}
修改郵件發送模擬錯誤
public void sendEmail(Order order) {
if (new Random().nextInt(2) == 1) {
throw new RuntimeException("State Error") ;
}
System.err.printf("%s - 給【%s】發送郵件成功, 本次訂單總額: %s%n",
Thread.currentThread().getName(), UserContext.getEmail(), order.getAmount()) ;
}
測試結果
重試第三次後成功。
問題分析:
- 消息丟失風險:應用崩潰時內存中的事件會丟失
- 重試侷限性:超過最大重試次數後仍失敗問題
- 未持久化:最終失敗的操作無法人工干預
2.4 方案4:本地消息表
通過數據庫事務原子性保存任務記錄,定時任務異步處理,確保消息不丟失。支持無限重試和人工干預,實現最終一致性。
創建本地消息表對象&Repository
@Entity
@Table(name = "x_local_message")
public class LocalMessage {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
// JSON格式的任務數據
@Column(length = 500)
private String payload;
/**1:處理中,2:失敗,3:成功*/
@Column(columnDefinition = "int default 0")
private Integer state ;
private LocalDateTime createdAt = LocalDateTime.now() ;
}
public interface LocalMessageRepository extends JpaRepository<LocalMessage, Long>{
@Query("select e from LocalMessage e where e.state = 1 and e.retryCount < 3 order by e.createdAt desc limit 10")
List<LocalMessage> queryMessages() ;
}
修改創建訂單業務
@Transactional
public void createOrder(Order order) {
this.orderRepository.save(order);
LocalMessage message = new LocalMessage();
message.setState(1);
try {
message.setPayload(this.objectMapper.writeValueAsString(order));
} catch (Exception e) {}
this.messageRepository.save(message);
}
定義定時任務
首先,開啓定時任務
@Configuration
@EnableScheduling
public class TaskConfig {
}
最後,定義定時任務
@Component
public class TaskService {
private final ExecutorService executor = Executors.newFixedThreadPool(5);
private final LocalMessageRepository messageRepository ;
private final EmailService emailService ;
private final ObjectMapper objectMapper ;
public TaskService(LocalMessageRepository messageRepository,
EmailService emailService, ObjectMapper objectMapper) {
this.messageRepository = messageRepository;
this.emailService = emailService ;
this.objectMapper = objectMapper ;
}
@Scheduled(cron = "0 */2 * * * ?")
public void sendMailTask() {
List<LocalMessage> messages = this.messageRepository.queryMessages() ;
List<CompletableFuture<Void>> futures = new ArrayList<>(messages.size());
messages.forEach(message -> {
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
try {
Order order = this.objectMapper.readValue(message.getPayload(), Order.class);
this.emailService.sendEmail(order);
message.setState(3);
message.setRetryCount(message.getRetryCount() + 1);
} catch (Exception e) {
int retryCount = message.getRetryCount() + 1;
if (retryCount >= 3) {
message.setState(2);
}
message.setRetryCount(retryCount);
} finally {
messageRepository.save(message);
}
}, executor);
futures.add(future);
});
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join() ;
}
}
問題分析:
- 實時性:任務執行需等到下一輪才能執行
本方案優勢:
- 可靠性:數據庫事務保證任務持久化
- 故障恢復:定時任務自動重試失敗操作
- 系統解耦:業務服務與郵件發送完全隔離
如果這篇文章對您有所幫助,或者有所啓發的話,求一鍵三連:點贊、轉發、在看。
關注公眾號:woniuxgg,在公眾號中回覆:筆記 就可以獲得蝸牛為你精心準備的java實戰語雀筆記,回覆面試、開發手冊、有超讚的粉絲福利