知識庫 / Spring RSS 訂閱

Spring Modulith 中的事件外部化

Spring
HongKong
3
11:19 AM · Dec 06 ,2025

1. 概述

本文將討論在 <em >@Transactional</em> 塊中發佈消息的需求,以及相關的性能挑戰,例如數據庫連接時間過長。為了解決這個問題,我們將利用 Spring Modulith 的功能,監聽 Spring 應用事件並自動將其發佈到 Kafka 主題。

2. 事務性操作和消息代理

為了本文的代碼示例,我們假設我們正在編寫負責將 文章 保存到 Baeldung 上的功能:

@Service
class Baeldung {
    private final ArticleRepository articleRepository;

    // constructor

    @Transactional
    public void createArticle(Article article) {
        validateArticle(article);
        article = addArticleTags(article);
        // ... other business logic
        
        articleRepository.save(article);
    }
}

此外,我們需要通知系統中的其他部分關於這篇新 Article 的信息。 藉助這些信息,其他模塊或服務將相應地做出反應,生成報告或向網站的讀者發送新聞通訊。

最簡單的方法是注入一個知道如何發佈此事件的依賴項。 在我們的示例中,我們使用 KafkaOperations 將消息發送到“baeldung.articles.published” 主題,並將 Articleslug() 用作鍵。

@Service
class Baeldung {
    private final ArticleRepository articleRepository;
    private final KafkaOperations<String, ArticlePublishedEvent> messageProducer;

    // constructor

    @Transactional
    public void createArticle(Article article) {
        // ... business logic
        validateArticle(article);
        article = addArticleTags(article);
        article = articleRepository.save(article);

        messageProducer.send(
          "baeldung.articles.published",
          article.slug(),
          new ArticlePublishedEvent(article.slug(), article.title())
        ).join();
    }
}

然而,這種方法存在一些問題。從設計角度來看,我們已經將領域服務與消息生產者耦合。 此外,領域服務直接依賴於低層組件,違反了Clean Architecture的基本原則之一。

此外,這種方法還會產生性能影響,因為所有操作都在一個 @Transactional 方法中進行。因此,用於保存 Article 的數據庫連接將一直保持打開狀態,直到消息成功發佈。

最終,這個解決方案還會在持久化數據和發佈消息之間創建一種容易出錯的關係:

  • 如果生產者未能成功發佈消息,事務將被回滾;
  • 事務最終可能在消息已成功發佈後仍然被回滾;

3. 使用 Spring Events 實現依賴反轉

我們可以利用 Spring Events 來改進解決方案的設計。 我們的目標是避免直接將消息發佈到 Kafka,而是應該發佈內部應用程序事件:移除 KafkaOperations 依賴項,併發布內部應用程序事件。

@Service
public class Baeldung {
    private final ApplicationEventPublisher applicationEvents;
    private final ArticleRepository articleRepository;

    // constructor

    @Transactional
    public void createArticle(Article article) {
        // ... business logic
        validateArticle(article);
        article = addArticleTags(article);
        article = articleRepository.save(article);

        applicationEvents.publishEvent(
          new ArticlePublishedEvent(article.slug(), article.title()));
    }
}

此外,我們還將擁有一個專門的 Kafka 生產者,作為我們基礎設施的一部分。該組件將監聽 ArticlePublishedEvent,並將發佈任務委託給底層的 KafkaOperations Bean:

@Component
class ArticlePublishedKafkaProducer {
    private final KafkaOperations<String, ArticlePublishedEvent> messageProducer;

    // constructor 

    @EventListener
    public void publish(ArticlePublishedEvent article) {
        Assert.notNull(article.slug(), "Article Slug must not be null!");
        messageProducer.send("baeldung.articles.published", article.splug(), event);
    }
}

通過這種抽象,基礎設施組件現在依賴於領域服務產生的事件。換句話説,我們成功地降低了耦合並反轉了源代碼的依賴關係。 此外,如果其他模塊對 文章 的創建感興趣,它們現在可以無縫地監聽這些應用程序事件並相應地做出反應。

另一方面,publish() 方法將在與我們的業務邏輯相同的事務中調用。間接來説,這兩個操作在失敗的可能原因上仍然相關,因為任何一方的失敗都可能導致另一方的失敗或回滾。

4. 原子操作與非原子操作

現在,讓我們探討一下性能考量。首先,我們需要確定在與消息代理通信失敗時是否應該回滾操作,這取決於具體情境。

如果不需要這種原子性,則必須釋放數據庫連接並異步發佈事件。為了模擬這種情況,我們可以嘗試創建一個不帶 slug 的文章,從而導致 ArticlePublishedKafkaProducer::publish 失敗:

@Test
void whenPublishingMessageFails_thenArticleIsStillSavedToDB() {
    var article = new Article(null, "Introduction to Spring Boot", "John Doe", "<p> Spring Boot is [...] </p>");

    baeldung.createArticle(article);

    assertThat(repository.findAll())
      .hasSize(1).first()
      .extracting(Article::title, Article::author)
      .containsExactly("Introduction to Spring Boot", "John Doe");
}

如果現在運行測試,將會失敗。這是因為 <em >ArticlePublishedKafkaProducer </em > 拋出異常,導致領域服務回滾事務。但是,我們可以通過將事件監聽器改為使用 <em >@TransactionalEventListener </em ><em >@Async</em >> 註解,使其變為異步操作。</strong

@Async
@TransactionalEventListener
public void publish(ArticlePublishedEvent event) {
    Assert.notNull(event.slug(), "Article Slug must not be null!");
    messageProducer.send("baeldung.articles.published", event);
}

如果現在重新運行測試,我們會發現異常已記錄,事件未發佈,並且實體已保存到數據庫中。 此外,數據庫連接也提前釋放,從而允許其他線程使用它。

5. 使用 Spring Modulith 進行事件外部化

我們通過兩步方法成功解決了原始代碼示例中的設計和性能問題:

  • 使用 Spring 應用程序事件進行依賴倒置
  • 利用 @TransactionalEventListener@Async 進行異步發佈

Spring Modulith 允許我們進一步簡化代碼,並提供內置支持用於該模式。讓我們先添加 spring-modulith-events-api 到我們的 pom.xml

<dependency>
    <groupId>org.springframework.modulith</groupId>
    <artifactId>spring-modulith-events-api</artifactId>
    <version>1.1.3</version>
</dependency>

本模塊可配置為監聽應用程序事件並自動將其外部化到各種消息系統。我們將遵循我們原始示例並重點關注 Kafka。對於此集成,我們需要添加 spring-modulith-events-kafka 依賴項:

<dependency> 
    <groupId>org.springframework.modulith</groupId> 
    <artifactId>spring-modulith-events-kafka</artifactId> 
    <version>1.1.3</version>
    <scope>runtime</scope> 
</dependency>

現在,我們需要更新 ArticlePublishedEvent 併為其添加 @Externalized 註解。該註解需要指定目標路由的名稱和鍵,換句話説,就是 Kafka 主題和消息鍵。對於鍵,我們將使用一個 SpEL 表達式,它將調用 Articleslug() 方法:

@Externalized("baeldung.article.published::#{slug()}")
public record ArticlePublishedEvent(String slug, String title) {
}

6. 事件發佈註冊表

正如之前討論的,持久化數據與發佈消息之間存在着錯誤率較高的關係——未能發佈消息會導致事務回滾。另一方面,即使消息成功發佈,事務仍然可能在稍後回滾。

Spring Modulith 的事件發佈註冊表通過實現“事務性出箱”模式來解決這個問題,從而確保系統範圍內的最終一致性。 當發生事務性操作時,而不是立即將消息發送到外部系統,事件將被存儲在同一業務事務中的事件發佈日誌中。

6.1. 事件發佈日誌

首先,我們需要引入與我們的持久化技術相對應的 <em >spring-modulith-starter </em > 依賴項。 您可以在 官方文檔 中找到受支持的啓動器的完整列表。 鑑於我們使用 Spring Data JPA 和 PostgreSQL 數據庫,我們將添加 <a href="https://mvnrepository.com/artifact/org.springframework.modulith/spring-modulith-starter-jpa"><em >spring-modulith-starter-jpa </em></a > 依賴項:

<dependency>
    <groupId>org.springframework.modulith</groupId>
    <artifactId>spring-modulith-starter-jpa</artifactId>
    <version>1.1.2</version>
</dependency>

此外,我們將啓用 Spring Modulith 創建“event_publication”表。此表包含有關外部化應用程序事件的相關數據。讓我們將以下屬性添加到我們的 <em >application.yml</em >> 中:

spring.modulith:
  events.jdbc-schema-initialization.enabled: true

我們的設置使用 Testcontainer 啓動一個帶有 PostgreSQL 數據庫的 Docker 容器。因此,我們可以利用 Testcontainers Desktop 應用程序來“凍結容器關閉”和“打開一個終端”,該終端與容器本身關聯。然後,我們可以使用以下命令檢查數據庫:

  • “psql -U test_user -d test_db” – 用於打開 PostgreSQL 交互式終端
  • “\d” – 用於列出數據庫對象

正如我們所見,“even_publication” 表已成功創建。讓我們執行一個查詢以查看測試中持久的事件:

在第一行,我們可以看到由我們第一個測試創建的事件,該事件覆蓋了快樂流程。但是,在第二個測試中,我們故意創建了無效事件,通過省略“slug”,以模擬事件發佈期間的失敗。由於此 Article 已保存到數據庫但未成功發佈,因此它出現在 events_publication 表中,缺少 completion_date

6.2. 重新提交事件

我們可以通過啓用 Spring Modulith 在應用程序重啓時自動重新提交事件,利用 republish-outstanding-events-on-restart 屬性:

spring.modulith:
  republish-outstanding-events-on-restart: true

此外,我們還可以使用 IncompleteEventPublications Bean 以編程方式重新提交超過指定時間的失敗事件

@Component
class EventPublications {
    private final IncompleteEventPublications incompleteEvents;
    private final CompletedEventPublications completeEvents;

    // constructor

    void resubmitUnpublishedEvents() {
        incompleteEvents.resubmitIncompletePublicationsOlderThan(Duration.ofSeconds(60));
    }
}

同樣,我們也可以使用 CompletedEventPublications 豆來輕鬆地查詢或清除 event_publications 表:

void clearPublishedEvents() {
    completeEvents.deletePublicationsOlderThan(Duration.ofSeconds(60));
}

7. 事件外部化配置

儘管 <em @Externalized</em> 註解的值對於簡潔的 SpEL 表達式很有用,但在某些情況下,我們可能希望避免使用它:

  • 當表達式變得過於複雜時
  • 當我們希望將主題信息與應用程序事件分離時
  • 如果我們希望為應用程序事件和外部化事件使用不同的模型時

對於這些用例,我們可以使用 <em EventExternalizationConfiguration’</em> 的構建器配置必要的路由和事件映射。 然後,我們只需將此配置作為 Spring Bean 暴露即可:

@Bean
EventExternalizationConfiguration eventExternalizationConfiguration() {
    return EventExternalizationConfiguration.externalizing()
      .select(EventExternalizationConfiguration.annotatedAsExternalized())
      .route(
        ArticlePublishedEvent.class,
        it -> RoutingTarget.forTarget("baeldung.articles.published").andKey(it.slug())
      )
      .mapping(
        ArticlePublishedEvent.class,
        it -> new PostPublishedKafkaEvent(it.slug(), it.title())
      )
      .build();
}

外部化配置 允許我們以聲明式的方式定義應用程序事件的 路由映射。 此外,它還允許我們處理各種類型的應用程序事件。 例如,如果我們需要處理額外的事件,如“WeeklySummaryPublishedEvent”,我們可以通過添加更多特定類型的 路由映射 來輕鬆實現:

@Bean
EventExternalizationConfiguration eventExternalizationConfiguration() {
    return EventExternalizationConfiguration.externalizing()
      .select(EventExternalizationConfiguration.annotatedAsExternalized())
      .route(
        ArticlePublishedEvent.class,
        it -> RoutingTarget.forTarget("baeldung.articles.published").andKey(it.slug())
      )
      .mapping(
        ArticlePublishedEvent.class,
        it -> new PostPublishedKafkaEvent(it.slug(), it.title())
      )
      .route(
        WeeklySummaryPublishedEvent.class,
        it -> RoutingTarget.forTarget("baeldung.articles.published").andKey(it.handle())
      )
      .mapping(
        WeeklySummaryPublishedEvent.class,
        it -> new PostPublishedKafkaEvent(it.handle(), it.heading())
      )
      .build();
}

如我們所觀察的,映射路由需要兩方面內容:類型本身以及用於解析 Kafka 主題和負載的函數。在我們的示例中,兩個應用程序事件都將被映射到相同的類型,併發送到相同的主題。

此外,由於我們現在在配置中聲明瞭路由,因此可以從事件本身中刪除這些信息。因此,事件將僅包含 @Externalized 註解,且沒有值:

@Externalized
public record ArticlePublishedEvent(String slug, String title) {
}

@Externalized
public record WeeklySummaryPublishedEvent(String handle, String heading) {
}

8. 結論

在本文中,我們探討了需要在事務性塊中發佈消息的場景。我們發現這種模式可能對性能產生重大影響,因為它會阻塞數據庫連接更長時間。

隨後,我們利用 Spring Modulith 的特性來監聽 Spring 應用事件並自動將其發佈到 Kafka 主題。 這種方法允許我們異步地外部化事件,從而更早地釋放數據庫連接。

user avatar
0 位用戶收藏了這個故事!
收藏

發佈 評論

Some HTML is okay.