1. 引言
在本教程中,我們將學習如何使用 PostgreSQL 的 LISTEN/NOTIFY 命令來實現一個簡單的消息代理機制。
2. PostgreSQL 的 LISTEN/NOTIFY 機制簡介
簡單來説,這些命令允許連接的客户端通過常規 PostgreSQL 連接交換消息。客户端使用 NOTIFY 命令 向一個 頻道 發送通知,並可包含可選的字符串負載。
一個 頻道 可以是任何有效的 SQL 標識符,它就像傳統消息傳遞系統中的主題一樣。這意味着負載將被髮送到該特定 頻道 的所有活動監聽器。當沒有負載時,監聽器只會接收一個空通知。
要開始接收通知,客户端使用 LISTEN 命令,該命令的唯一參數是頻道名稱。此命令會立即返回,從而允許客户端繼續使用同一連接執行其他任務。
通知機制具有一些重要的特性:
- 頻道 名稱在數據庫中是唯一的
- 客户端無需特殊權限即可使用 LISTEN/NOTIFY
- 當使用 NOTIFY 命令在事務中時,客户端僅在事務成功完成時才會接收到通知
此外,如果使用相同的負載向同一 頻道 發送多個 NOTIFY 命令(在同一事務中),客户端將接收到一個單個通知。
3. 使用 PostgreSQL 作為消息代理的理由
考慮到 PostgreSQL 的通知特性,我們可以思考在何時使用它代替像 RabbitMQ 這樣完整的消息代理。 就像往常一樣,這裏有一些權衡。 通常情況下,選擇後者意味着:
- 更高的複雜性——消息代理是一個需要監控、升級等其他組件
- 處理分佈式事務帶來的故障模式
通知機制不會受到這些問題的困擾:
- 功能已經就緒,假設我們使用 PostgreSQL 作為主要的數據庫
- 沒有分佈式事務
當然,這裏有一些限制:
- 這是一個專有機制,需要永遠擁抱 PostgreSQL(或者至少直到進行重大重構)
- 沒有直接支持持久訂閲者。 在客户端開始監聽消息之前發送的消息將被丟失
即使存在這些限制,這個機制也有一些潛在的應用:
- 在“模塊化單體”應用程序中,用於通知的公交系統
- 分佈式緩存失效
- 輕量級消息代理,使用簡單的數據庫表作為隊列
- 事件溯源架構
4. 在 Spring Boot 應用中使用 LISTEN/NOTIFY
現在我們對 LISTEN/NOTIFY 機制有了基本的瞭解,接下來我們將使用它構建一個簡單的 Spring Boot 測試應用程序。 我們將創建一個簡單的 API,允許我們提交買單/賣單訂單。 負載數據包含交易工具的符號、價格和我們願意買入或賣出的數量。 我們還將添加一個 API,允許我們根據其標識符查詢訂單。
目前一切正常。 但這裏有一個關鍵點: 我們希望在將訂單插入數據庫後立即從緩存中提供查詢結果。 儘管我們可以使用緩存寫-通過,但在分佈式環境中,我們需要擴展服務時,我們也需要分佈式緩存。
這就是通知機制發揮作用的地方: 我們將向每次插入時發送 NOTIFY,客户端將使用 LISTEN 來將訂單預加載到各自的本地緩存中。
4.1. 項目依賴
我們的示例應用程序需要與 PostgreSQL 驅動程序一起,WebMVC SpringBoot 應用程序的標準依賴項。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>2.7.12</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jdbc</artifactId>
<version>2.7.12</version>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>42.6.0</version>
</dependency>
最新版本的 spring-boot-starter-web、spring-boot-starter-data-jdbc 和 postgresql 已在 Maven Central 上提供。
4.2. 通知服務
由於通知機制是特定於 PostgreSQL 的,我們將其通用的行為封裝在一個單一的類中:NotifierService。通過這樣做,我們可以避免這些細節泄露到應用程序的其他部分。 這也有助於簡化單元測試,因為我們可以用一個模擬版本替換此服務以實現不同的場景。
NotifierService 有兩個主要職責。首先,它提供了一個外觀(facade),用於發送與訂單相關的通知:
public class NotifierService {
private static final String ORDERS_CHANNEL = "orders";
private final JdbcTemplate tpl;
@Transactional
public void notifyOrderCreated(Order order) {
tpl.execute("NOTIFY " + ORDERS_CHANNEL + ", '" + order.getId() + "'");
}
// ... other methods omitted
}其次,它提供了一個用於創建 Runnable 實例的工廠方法,應用程序使用該工廠接收通知。該工廠接收一個 Consumer,該 Consumer 消費 PGNotification 對象,並提供方法來檢索與通知相關的頻道和負載:
public Runnable createNotificationHandler(Consumer<PGNotification> consumer) {
return () -> {
tpl.execute((Connection c) -> {
c.createStatement().execute("LISTEN " + ORDERS_CHANNEL);
PGConnection pgconn = c.unwrap(PGConnection.class);
while(!Thread.currentThread().isInterrupted()) {
PGNotification[] nts = pgconn.getNotifications(10000);
if ( nts == null || nts.length == 0 ) {
continue;
}
for( PGNotification nt : nts) {
consumer.accept(nt);
}
}
return 0;
});
};
}
這裏我們選擇直接使用原始的 PGNotification 類以簡化操作。 在實際應用中,通常會處理多個領域實體,我們可以使用泛型或類似的技巧來擴展該類,以避免代碼重複。
以下是創建的 Runnable 類的一些注意事項:
- 數據庫相關的邏輯使用提供的 JdbcTemplate 的 execute() 方法。 這確保了正確的連接處理/清理,並簡化了錯誤處理
- 回調會在當前線程被中斷或運行時錯誤導致返回時執行,直到完成
請注意使用了 PGConnection 而不是標準的 JDBC Connection。 我們需要這樣才能直接訪問 getNotifications() 方法,該方法返回一個或多個排隊的通知。
getNotifications() 方法有兩個變體。 如果不帶參數調用,它會輪詢並返回任何未決的通知。 如果沒有未決的通知,它將返回 null。 第二個變體接受一個整數,該整數對應於等待通知的時間(以毫秒為單位),直到返回 null。 此外,如果我們將 0(零)作為超時值傳遞,getNotifications() 將會阻塞,直到收到新的通知。
在應用程序初始化期間,我們使用一個 CommandLineRunner bean 在 @Configuration class 中,該 bean 會啓動一個新的 Thread 實際上用於開始接收通知。
@Configuration
public class ListenerConfiguration {
@Bean
CommandLineRunner startListener(NotifierService notifier, NotificationHandler handler) {
return (args) -> {
Runnable listener = notifier.createNotificationHandler(handler);
Thread t = new Thread(listener, "order-listener");
t.start();
};
}
}
4.3. 連接處理
雖然從技術上講,使用相同的連接處理通知和常規查詢是可行的,但這種方式並不方便。需要將對 getNotification() 的調用分散到控制流程中,這會導致代碼難以閲讀和維護。
相反,標準做法是運行一個或多個專用線程來處理通知。每個線程都有自己的連接,該連接會一直保持打開狀態。 這可能會在連接由池(如 Hikari 或 DBCP)創建時產生問題。
為了避免這些問題,我們的示例創建了一個專用 DriverDataSource,然後我們使用它來創建 JdbcTemplate,該 JdbcTemplate 用於 NotifierService:
@Configuration
public class NotifierConfiguration {
@Bean
NotifierService notifier(DataSourceProperties props) {
DriverDataSource ds = new DriverDataSource(
props.determineUrl(),
props.determineDriverClassName(),
new Properties(),
props.determineUsername(),
props.determinePassword());
JdbcTemplate tpl = new JdbcTemplate(ds);
return new NotifierService(tpl);
}
}
請注意,我們使用了與創建主 Spring 管理 DataSource相同的連接屬性。但是,我們沒有將此專用 DataSource暴露為 Bean,這會禁用 Spring Boot 的自動配置功能。
4.4. 通知處理程序
NotificationHandler 類是緩存邏輯的最後一部分,它實現了 <em Consumer<Notification></em > 接口。該類的作用是處理單個通知並使用配置的 <em Cache</em >> 填充<em Order> 實例:
@Component
public class NotificationHandler implements Consumer<PGNotification> {
private final OrdersService orders;
@Override
public void accept(PGNotification t) {
Optional<Order> order = orders.findById(Long.valueOf(t.getParameter()));
// ... log messages omitted
}
}
該實現使用 getName() 和 getParameter() 方法從通知中檢索渠道名稱和訂單標識符。在這裏,我們可以假設通知將始終是預期的通知。這並非出於懶惰,而是源於 NotifierService 構建的 Runnable 對象,該對象將用於調用該處理程序。
實際邏輯非常簡單:我們使用 OrderRepository 從數據庫中檢索 Order 對象並將其添加到緩存中。
@Service
public class OrdersService {
private final OrdersRepository repo;
// ... other private fields omitted
@Transactional(readOnly = true)
public Optional<Order> findById(Long id) {
Optional<Order> o = Optional.ofNullable(ordersCache.get(id, Order.class));
if (!o.isEmpty()) {
log.info("findById: cache hit, id={}",id);
return o;
}
log.info("findById: cache miss, id={}",id);
o = repo.findById(id);
if ( o.isEmpty()) {
return o;
}
ordersCache.put(id, o.get());
return o;
}
}
5. 測試
要查看通知機制的運行情況,最好的方法是啓動兩個或多個測試應用程序實例,每個實例配置為監聽不同的端口。 我們還需要一個運行的 PostgreSQL 實例,這兩個實例將連接到它。 請參閲 <em>application.properties</em> 文件並使用您的 PostgreSQL 實例連接詳細信息進行修改。
接下來,為了啓動我們的測試環境,我們將打開兩個 shell 並使用 Maven 運行應用程序。 項目的 <em>pom.xml</em> 文件包含一個額外的 profile,<em>instance1</em>,它將啓動應用程序在一個不同的端口上:
# On first shell:
$ mvn spring-boot:run
... many messages (omitted)
[ restartedMain] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8080 (http) with context path ''
[ restartedMain] c.b.messaging.postgresql.Application : Started Application in 2.615 seconds (JVM running for 2.944)
[ restartedMain] c.b.m.p.config.ListenerConfiguration : Starting order listener thread...
[ order-listener] c.b.m.p.service.NotifierService : notificationHandler: sending LISTEN command...
## On second shell
... many messages (omitted)
[ restartedMain] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8081 (http) with context path ''
[ restartedMain] c.b.messaging.postgresql.Application : Started Application in 1.984 seconds (JVM running for 2.274)
[ restartedMain] c.b.m.p.config.ListenerConfiguration : Starting order listener thread...
[ order-listener] c.b.m.p.service.NotifierService : notificationHandler: sending LISTEN command...
經過一段時間後,我們應該在每個應用程序上看到一條日誌消息,告知應用程序已準備好接收請求。現在,讓我們使用 curl 在另一個 shell 上創建我們的第一個 Order:
$ curl --location 'http://localhost:8080/orders/buy' \
--form 'symbol="BAEL"' \
--form 'price="13.34"' \
--form 'quantity="500"'
{"id":30,"symbol":"BAEL","orderType":"BUY","price":13.34,"quantity":500}
該應用程序實例在 8080 端口運行並將打印一些消息。 我們還將看到 8081 實例日誌顯示它已收到通知:
[ order-listener] c.b.m.p.service.NotificationHandler : Notification received: pid=5141, name=orders, param=30
[ order-listener] c.b.m.postgresql.service.OrdersService : findById: cache miss, id=30
[ order-listener] c.b.m.p.service.NotificationHandler : order details: Order(id=30, symbol=BAEL, orderType=BUY, price=13.34, quantity=500.00)這是證明該機制按預期工作的事實。
最後,我們可以再次使用curl來查詢在instance1上創建的訂單:
curl http://localhost:8081/orders/30
{"id":30,"symbol":"BAEL","orderType":"BUY","price":13.34,"quantity":500.00}正如預期的那樣,我們獲取了 訂單 的詳細信息。 此外,應用程序日誌也顯示該信息來自緩存:
[nio-8081-exec-1] c.b.m.postgresql.service.OrdersService : findById: cache hit, id=306. 結論
在本文中,我們介紹了 PostgreSQL 的 NOTIFY/LISTEN 機制,以及如何利用它來實現一個輕量級的消息代理,無需額外的組件。