知識庫 / Spring / Spring Boot RSS 訂閱

使用 PostgreSQL 作為消息代理

Spring Boot
HongKong
4
11:34 AM · Dec 06 ,2025

1. 引言

在本教程中,我們將學習如何使用 PostgreSQL 的 LISTEN/NOTIFY 命令來實現一個簡單的消息代理機制。

2. PostgreSQL 的 LISTEN/NOTIFY 機制簡介

簡單來説,這些命令允許連接的客户端通過常規 PostgreSQL 連接交換消息。客户端使用 NOTIFY 命令 向一個 頻道 發送通知,並可包含可選的字符串負載。

一個 頻道 可以是任何有效的 SQL 標識符,它就像傳統消息傳遞系統中的主題一樣。這意味着負載將被髮送到該特定 頻道 的所有活動監聽器。當沒有負載時,監聽器只會接收一個空通知。

要開始接收通知,客户端使用 LISTEN 命令,該命令的唯一參數是頻道名稱。此命令會立即返回,從而允許客户端繼續使用同一連接執行其他任務。

通知機制具有一些重要的特性:

  • 頻道 名稱在數據庫中是唯一的
  • 客户端無需特殊權限即可使用 LISTEN/NOTIFY
  • 當使用 NOTIFY 命令在事務中時,客户端僅在事務成功完成時才會接收到通知

此外,如果使用相同的負載向同一 頻道 發送多個 NOTIFY 命令(在同一事務中),客户端將接收到一個單個通知。

3. 使用 PostgreSQL 作為消息代理的理由

考慮到 PostgreSQL 的通知特性,我們可以思考在何時使用它代替像 RabbitMQ 這樣完整的消息代理。 就像往常一樣,這裏有一些權衡。 通常情況下,選擇後者意味着:

  • 更高的複雜性——消息代理是一個需要監控、升級等其他組件
  • 處理分佈式事務帶來的故障模式

通知機制不會受到這些問題的困擾:

  • 功能已經就緒,假設我們使用 PostgreSQL 作為主要的數據庫
  • 沒有分佈式事務

當然,這裏有一些限制:

  • 這是一個專有機制,需要永遠擁抱 PostgreSQL(或者至少直到進行重大重構)
  • 沒有直接支持持久訂閲者。 在客户端開始監聽消息之前發送的消息將被丟失

即使存在這些限制,這個機制也有一些潛在的應用:

  • 在“模塊化單體”應用程序中,用於通知的公交系統
  • 分佈式緩存失效
  • 輕量級消息代理,使用簡單的數據庫表作為隊列
  • 事件溯源架構

4. 在 Spring Boot 應用中使用 LISTEN/NOTIFY

現在我們對 LISTEN/NOTIFY 機制有了基本的瞭解,接下來我們將使用它構建一個簡單的 Spring Boot 測試應用程序。 我們將創建一個簡單的 API,允許我們提交買單/賣單訂單。 負載數據包含交易工具的符號、價格和我們願意買入或賣出的數量。 我們還將添加一個 API,允許我們根據其標識符查詢訂單。

目前一切正常。 但這裏有一個關鍵點: 我們希望在將訂單插入數據庫後立即從緩存中提供查詢結果。 儘管我們可以使用緩存寫-通過,但在分佈式環境中,我們需要擴展服務時,我們也需要分佈式緩存。

這就是通知機制發揮作用的地方: 我們將向每次插入時發送 NOTIFY,客户端將使用 LISTEN 來將訂單預加載到各自的本地緩存中

4.1. 項目依賴

我們的示例應用程序需要與 PostgreSQL 驅動程序一起,WebMVC SpringBoot 應用程序的標準依賴項。

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
    <version>2.7.12</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-jdbc</artifactId>
    <version>2.7.12</version>
</dependency>
<dependency>
    <groupId>org.postgresql</groupId>
    <artifactId>postgresql</artifactId>
    <version>42.6.0</version>
</dependency>

最新版本的 spring-boot-starter-webspring-boot-starter-data-jdbcpostgresql 已在 Maven Central 上提供。

4.2. 通知服務

由於通知機制是特定於 PostgreSQL 的,我們將其通用的行為封裝在一個單一的類中:NotifierService通過這樣做,我們可以避免這些細節泄露到應用程序的其他部分。 這也有助於簡化單元測試,因為我們可以用一個模擬版本替換此服務以實現不同的場景。

NotifierService 有兩個主要職責。首先,它提供了一個外觀(facade),用於發送與訂單相關的通知:

public class NotifierService {
    private static final String ORDERS_CHANNEL = "orders";
    private final JdbcTemplate tpl;
   
    @Transactional
    public void notifyOrderCreated(Order order) {
        tpl.execute("NOTIFY " + ORDERS_CHANNEL + ", '" + order.getId() + "'");
    }
   // ... other methods omitted
}

其次,它提供了一個用於創建 Runnable 實例的工廠方法,應用程序使用該工廠接收通知。該工廠接收一個 Consumer,該 Consumer 消費 PGNotification 對象,並提供方法來檢索與通知相關的頻道和負載:

public Runnable createNotificationHandler(Consumer<PGNotification> consumer) {        
    return () -> {
        tpl.execute((Connection c) -> {
            c.createStatement().execute("LISTEN " + ORDERS_CHANNEL);                
            PGConnection pgconn = c.unwrap(PGConnection.class);                 
            while(!Thread.currentThread().isInterrupted()) {
                PGNotification[] nts = pgconn.getNotifications(10000);
                if ( nts == null || nts.length == 0 ) {
                    continue;
                }                    
                for( PGNotification nt : nts) {
                    consumer.accept(nt);
                }
            }                
            return 0;
        });                
    };
}

這裏我們選擇直接使用原始的 PGNotification 類以簡化操作。 在實際應用中,通常會處理多個領域實體,我們可以使用泛型或類似的技巧來擴展該類,以避免代碼重複。

以下是創建的 Runnable 類的一些注意事項:

  • 數據庫相關的邏輯使用提供的 JdbcTemplateexecute() 方法。 這確保了正確的連接處理/清理,並簡化了錯誤處理
  • 回調會在當前線程被中斷或運行時錯誤導致返回時執行,直到完成

請注意使用了 PGConnection 而不是標準的 JDBC Connection。 我們需要這樣才能直接訪問 getNotifications() 方法,該方法返回一個或多個排隊的通知。

getNotifications() 方法有兩個變體。 如果不帶參數調用,它會輪詢並返回任何未決的通知。 如果沒有未決的通知,它將返回 null。 第二個變體接受一個整數,該整數對應於等待通知的時間(以毫秒為單位),直到返回 null。 此外,如果我們將 0(零)作為超時值傳遞,getNotifications() 將會阻塞,直到收到新的通知。

在應用程序初始化期間,我們使用一個 CommandLineRunner bean 在 @Configuration class 中,該 bean 會啓動一個新的 Thread 實際上用於開始接收通知

@Configuration
public class ListenerConfiguration {
    
    @Bean
    CommandLineRunner startListener(NotifierService notifier, NotificationHandler handler) {
        return (args) -> {
            Runnable listener = notifier.createNotificationHandler(handler);            
            Thread t = new Thread(listener, "order-listener");
            t.start();
        };
    }
}

4.3. 連接處理

雖然從技術上講,使用相同的連接處理通知和常規查詢是可行的,但這種方式並不方便。需要將對 getNotification() 的調用分散到控制流程中,這會導致代碼難以閲讀和維護。

相反,標準做法是運行一個或多個專用線程來處理通知。每個線程都有自己的連接,該連接會一直保持打開狀態。 這可能會在連接由池(如 Hikari 或 DBCP)創建時產生問題

為了避免這些問題,我們的示例創建了一個專用 DriverDataSource,然後我們使用它來創建 JdbcTemplate,該 JdbcTemplate 用於 NotifierService

@Configuration
public class NotifierConfiguration {

    @Bean
    NotifierService notifier(DataSourceProperties props) {
        
        DriverDataSource ds = new DriverDataSource(
          props.determineUrl(), 
          props.determineDriverClassName(),
          new Properties(), 
          props.determineUsername(),
          props.determinePassword());
        
        JdbcTemplate tpl = new JdbcTemplate(ds);
        return new NotifierService(tpl);
    }
}

請注意,我們使用了與創建主 Spring 管理 DataSource相同的連接屬性。但是,我們沒有將此專用 DataSource暴露為 Bean,這會禁用 Spring Boot 的自動配置功能。

4.4. 通知處理程序

NotificationHandler 類是緩存邏輯的最後一部分,它實現了 <em Consumer&lt;Notification&gt;</em > 接口。該類的作用是處理單個通知並使用配置的 <em Cache</em >> 填充<em Order> 實例:

@Component
public class NotificationHandler implements Consumer<PGNotification> {
    private final OrdersService orders;

    @Override
    public void accept(PGNotification t) {
        Optional<Order> order = orders.findById(Long.valueOf(t.getParameter()));
        // ... log messages omitted
    }
}

該實現使用 getName()getParameter() 方法從通知中檢索渠道名稱和訂單標識符。在這裏,我們可以假設通知將始終是預期的通知。這並非出於懶惰,而是源於 NotifierService 構建的 Runnable 對象,該對象將用於調用該處理程序。

實際邏輯非常簡單:我們使用 OrderRepository 從數據庫中檢索 Order 對象並將其添加到緩存中。

@Service
public class OrdersService {
    private final OrdersRepository repo;
    // ... other private fields omitted
   
    @Transactional(readOnly = true)
    public Optional<Order> findById(Long id) {
        Optional<Order> o = Optional.ofNullable(ordersCache.get(id, Order.class));
        if (!o.isEmpty()) {
            log.info("findById: cache hit, id={}",id);
            return o;
        }        
        log.info("findById: cache miss, id={}",id);
        o = repo.findById(id);
        if ( o.isEmpty()) {
            return o;
        }        
        ordersCache.put(id, o.get());
        return o;
    }
}

5. 測試

要查看通知機制的運行情況,最好的方法是啓動兩個或多個測試應用程序實例,每個實例配置為監聽不同的端口。 我們還需要一個運行的 PostgreSQL 實例,這兩個實例將連接到它。 請參閲 <em>application.properties</em> 文件並使用您的 PostgreSQL 實例連接詳細信息進行修改。

接下來,為了啓動我們的測試環境,我們將打開兩個 shell 並使用 Maven 運行應用程序。 項目的 <em>pom.xml</em> 文件包含一個額外的 profile,<em>instance1</em>,它將啓動應用程序在一個不同的端口上:

# On first shell:
$ mvn spring-boot:run
... many messages (omitted)
[  restartedMain] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port(s): 8080 (http) with context path ''
[  restartedMain] c.b.messaging.postgresql.Application     : Started Application in 2.615 seconds (JVM running for 2.944)
[  restartedMain] c.b.m.p.config.ListenerConfiguration     : Starting order listener thread...
[ order-listener] c.b.m.p.service.NotifierService          : notificationHandler: sending LISTEN command...

## On second shell
... many messages (omitted)
[  restartedMain] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port(s): 8081 (http) with context path ''
[  restartedMain] c.b.messaging.postgresql.Application     : Started Application in 1.984 seconds (JVM running for 2.274)
[  restartedMain] c.b.m.p.config.ListenerConfiguration     : Starting order listener thread...
[ order-listener] c.b.m.p.service.NotifierService          : notificationHandler: sending LISTEN command...

經過一段時間後,我們應該在每個應用程序上看到一條日誌消息,告知應用程序已準備好接收請求。現在,讓我們使用 curl 在另一個 shell 上創建我們的第一個 Order

$ curl --location 'http://localhost:8080/orders/buy' \
--form 'symbol="BAEL"' \
--form 'price="13.34"' \
--form 'quantity="500"'
{"id":30,"symbol":"BAEL","orderType":"BUY","price":13.34,"quantity":500}

該應用程序實例在 8080 端口運行並將打印一些消息。 我們還將看到 8081 實例日誌顯示它已收到通知

[ order-listener] c.b.m.p.service.NotificationHandler    : Notification received: pid=5141, name=orders, param=30
[ order-listener] c.b.m.postgresql.service.OrdersService : findById: cache miss, id=30
[ order-listener] c.b.m.p.service.NotificationHandler    : order details: Order(id=30, symbol=BAEL, orderType=BUY, price=13.34, quantity=500.00)

這是證明該機制按預期工作的事實。
最後,我們可以再次使用curl來查詢在instance1上創建的訂單:

curl http://localhost:8081/orders/30
{"id":30,"symbol":"BAEL","orderType":"BUY","price":13.34,"quantity":500.00}

正如預期的那樣,我們獲取了 訂單 的詳細信息。 此外,應用程序日誌也顯示該信息來自緩存:

[nio-8081-exec-1] c.b.m.postgresql.service.OrdersService   : findById: cache hit, id=30

6. 結論

在本文中,我們介紹了 PostgreSQL 的 NOTIFY/LISTEN 機制,以及如何利用它來實現一個輕量級的消息代理,無需額外的組件。

user avatar
0 位用戶收藏了這個故事!
收藏

發佈 評論

Some HTML is okay.