點擊上方“程序員蝸牛g”,選擇“設為星標”

跟蝸牛哥一起,每天進步一點點

這幾種方案為 Spring Boot 事務與外部服務協同_定時任務

程序員蝸牛g

大廠程序員一枚 跟蝸牛一起 每天進步一點點

33篇原創內容


公眾號

 


在分佈式系統裏,Spring Boot事務管理邊界處理是架構設計的一大痛點。

關鍵業務涉及數據庫事務與第三方服務調用(如郵件發送、遠程接口調用)混合場景時,開發者常陷入兩難:

在 @Transactional 中直接調用,網絡問題會使整個事務回滾致訂單丟失;

移至事務外,又會出現數據不一致風險。

本文將以4層漸進式方案,深度剖析Spring Boot事務與外部服務的協同策略。

從基礎的事務內阻塞調用,逐步進階至本地消息表,共給出4個方案。

每個方案均附完整代碼,且會揭示前代方案的不足,帶你領略技術演進之路。

2.實戰案例

環境準備

// 訂單對象
@Entity
@Table(name = "x_order")
public class Order {
  @Id
  @GeneratedValue(strategy = GenerationType.IDENTITY)
  private Long id ;
  private String orderNo;
  private BigDecimal amount;
  private Integer status ;
  private LocalDateTime orderTime;
}

訂單&郵件基本操作類

public interface OrderRepository extends JpaRepository<Order, Long> {
}
@Service
public class EmailService {
  public void sendEmail(Order order) {
    System.err.printf("給【%s】發送郵件成功, 本次訂單總額: %s%n", 
        UserContext.getEmail(), order.getAmount()) ;
  }
}
@Service
public class OrderService {
  private final OrderRepository orderRepository ;
  private final EmailService emailService ;
  @Transactional
  public void createOrder(Order order) {
    // 各種方案實現
  }
}

2.1 方案1:事務方法內直接調用

在事務方法中直接調用郵件發送(或其它操作),代碼簡單但隱患巨大。適用於快速原型驗證,但生產環境嚴禁使用。

@Transactional
public void createOrder(Order order) {
  // 1.保存訂單
  orderRepository.save(order) ;
  // 2.發送
  emailService.sendEmail(order) ;
}

問題分析:

  • 事務膨脹:郵件調用耗時過長會佔用數據庫連接,降低併發性能
  • 事務回滾污染:若郵件發送失敗拋異常,會導致整個事務回滾
  • 可靠性問題:網絡波動可能使郵件發送失敗,無法重試
  • 耦合性高:業務邏輯與通知邏輯緊耦合

2.2 方案2:事務鈎子回調

通過Spring事務同步器在事務提交後觸發外部調用,避免事務回滾污染。適合對實時性要求低、調用量小的場景,如內部系統通知。

private final ApplicationEventPublisher eventPublisher;
@Transactional
public void createOrder(Order order) {
  orderRepository.save(order);
  // 發佈事件註冊事務鈎子回調
  this.eventPublisher.publishEvent(new OrderCreatedEvent(order)) ;
}

事件對象&事件監聽

public class OrderCreatedEvent extends ApplicationEvent{


  public OrderCreatedEvent(Object source) {
    super(source);
  }
}


@Component
public class OrderEventListener {
  private final EmailService emailService;
  public OrderEventListener(EmailService emailService) {
    this.emailService = emailService;
  }
  // 事務提交以後執行
  @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
  public void handleOrderCreatedEvent(OrderCreatedEvent event) {
    this.emailService.sendEmail((Order) event.getSource()) ;
  }
}

問題分析:

  • 同步執行瓶頸:監聽器與主線程同步執行,響應延遲
  • 無重試機制:臨時故障導致永久失敗
  • 事件丟失可能:應用重啓時未處理事件會丟失

2.3 方案3:異步+事務鈎子回調

結合 @Async 異步執行和 @Retryable 自動重試,解決同步阻塞和臨時故障問題。但仍依賴應用內存,崩潰時事件丟失,適合要求不是很嚴格的業務場景。

首先,引入依賴

<dependency>
  <groupId>org.springframework.retry</groupId>
  <artifactId>spring-retry</artifactId>
</dependency>

其次,開啓異步&重試機制

@Configuration
@EnableAsync
@EnableRetry
public class AsyncConfig implements AsyncConfigurer {


  @Override
  public Executor getAsyncExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor() ;
    executor.setThreadNamePrefix("Pack-Async-");
    executor.setCorePoolSize(5) ;
    executor.setMaxPoolSize(10) ;
    executor.setQueueCapacity(100) ;
    executor.initialize() ;
    return executor ;
  }
}

最後,修改事務提交後監聽

@Async
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
@Retryable(retryFor = Exception.class, maxAttempts = 3, backoff = @Backoff(delay = 1000))
public void handleOrderCreatedEvent(OrderCreatedEvent event) {
  this.emailService.sendEmail((Order) event.getSource()) ;
}

修改郵件發送模擬錯誤

public void sendEmail(Order order) {
  if (new Random().nextInt(2) == 1) {
    throw new RuntimeException("State Error") ;
  }
  System.err.printf("%s - 給【%s】發送郵件成功, 本次訂單總額: %s%n", 
      Thread.currentThread().getName(), UserContext.getEmail(), order.getAmount()) ;
}

測試結果

這幾種方案為 Spring Boot 事務與外部服務協同_問題分析_02

重試第三次後成功。

問題分析:

  • 消息丟失風險:應用崩潰時內存中的事件會丟失
  • 重試侷限性:超過最大重試次數後仍失敗問題
  • 未持久化:最終失敗的操作無法人工干預

2.4 方案4:本地消息表

通過數據庫事務原子性保存任務記錄,定時任務異步處理,確保消息不丟失。支持無限重試和人工干預,實現最終一致性。

創建本地消息表對象&Repository

@Entity
@Table(name = "x_local_message")
public class LocalMessage {
  @Id
  @GeneratedValue(strategy = GenerationType.IDENTITY)
  private Long id;
  // JSON格式的任務數據
  @Column(length = 500)
  private String payload;
  /**1:處理中,2:失敗,3:成功*/
  @Column(columnDefinition = "int default 0")
  private Integer state ;
  private LocalDateTime createdAt = LocalDateTime.now() ;
}
public interface LocalMessageRepository extends JpaRepository<LocalMessage, Long>{


  @Query("select e from LocalMessage e where e.state = 1 and e.retryCount < 3 order by e.createdAt desc limit 10")
  List<LocalMessage> queryMessages() ;
}

修改創建訂單業務

@Transactional
public void createOrder(Order order) {
  this.orderRepository.save(order);
  LocalMessage message = new LocalMessage();
  message.setState(1);
  try {
    message.setPayload(this.objectMapper.writeValueAsString(order));
  } catch (Exception e) {}
  this.messageRepository.save(message);
}

定義定時任務

首先,開啓定時任務

@Configuration
@EnableScheduling
public class TaskConfig {
}

最後,定義定時任務

@Component
public class TaskService {
  private final ExecutorService executor = Executors.newFixedThreadPool(5);


  private final LocalMessageRepository messageRepository ;
  private final EmailService emailService ;
  private final ObjectMapper objectMapper ;
  public TaskService(LocalMessageRepository messageRepository, 
      EmailService emailService, ObjectMapper objectMapper) {
    this.messageRepository = messageRepository;
    this.emailService = emailService ;
    this.objectMapper = objectMapper ;
  }
  @Scheduled(cron = "0 */2 * * * ?")
  public void sendMailTask() {
    List<LocalMessage> messages = this.messageRepository.queryMessages() ;
    List<CompletableFuture<Void>> futures = new ArrayList<>(messages.size());
    messages.forEach(message -> {
      CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
        try {
          Order order = this.objectMapper.readValue(message.getPayload(), Order.class);
          this.emailService.sendEmail(order);
          message.setState(3);
          message.setRetryCount(message.getRetryCount() + 1);
        } catch (Exception e) {
          int retryCount = message.getRetryCount() + 1;
          if (retryCount >= 3) {
            message.setState(2);
          }
          message.setRetryCount(retryCount);
        } finally {
          messageRepository.save(message);
        }
      }, executor);
      futures.add(future);
    });
    CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join() ;
  }
}

問題分析:

  • 實時性:任務執行需等到下一輪才能執行

本方案優勢:

  • 可靠性:數據庫事務保證任務持久化
  • 故障恢復:定時任務自動重試失敗操作
  • 系統解耦:業務服務與郵件發送完全隔離

如果這篇文章對您有所幫助,或者有所啓發的話,求一鍵三連:點贊、轉發、在看。

關注公眾號:woniuxgg,在公眾號中回覆:筆記  就可以獲得蝸牛為你精心準備的java實戰語雀筆記,回覆面試、開發手冊、有超讚的粉絲福利