Stories

Detail Return Return

【Spring開發】SpringCloud服務端高級框架第5篇:2.死信交換機【附代碼文檔】 - Stories Detail

🏆🏆🏆教程全知識點簡介:微服務保護、服務異步通信、消息中間件部署、分佈式事務、搜索引擎、緩存、數據同步以及相關組件的安裝配置等技術要點。在微服務保護方面,介紹了 Sentinel 的基礎知識,包括雪崩問題、超時處理、艙壁模式、斷路器機制,以及不同服務保護技術的對比;講解了流量控制(簇點鏈路、流控模式、熱點參數限流)、隔離與降級(FeignClient 整合 Sentinel、線程隔離)、授權規則(自定義異常結果)及規則持久化(規則管理模式與 pull 模式),並演示了基於 Nacos 的規則持久化改造。服務異步通信部分探討了消息可靠性(生產者消息確認、Return 回調、ConfirmCallback)、死信交換機、TTL 隊列等高級應用。RabbitMQ 部署指南涵蓋了單機部署、DelayExchange 插件安裝、集羣部署、鏡像模式等內容。分佈式事務部分介紹了 CAP 定理、BASE 理論、常見解決方案,Seata 的基礎與部署(TC 服務部署、Nacos 配置、數據庫表創建)、多種事務模式(XA 模式及優缺點、四種模式對比)和高可用架構。分佈式搜索引擎章節講解了 Elasticsearch 的原理(ELK 技術棧、倒排索引)、索引庫與文檔操作、RestAPI 與 RestClient 的使用、排序與高亮、酒店搜索案例(分頁、競價排名、ad標記、算分函數)、自動補全、數據同步(同步調用、監聽 binlog)、集羣搭建與腦裂問題、分片存儲測試,以及單點 ES、Kibana、IK 分詞器安裝。緩存部分介紹了 Redis 持久化(RDB 與 AOF 對比)、單機安裝 Redis、Redis 集羣、多級緩存(JVM 進程緩存、Caffeine)、請求參數處理、Tomcat 查詢、HTTP 工具與 CJSON 工具類、Redis 緩存查詢。數據同步與網關部分包括 Canal 安裝(開啓 MySQL 主從、設置權限)、OpenResty 安裝(開發庫、目錄結構、環境變量配置)及運行流程。

<!-- start:bj1 -->

📚📚👉👉👉git倉庫code.zip 直接get:   https://gitlab.com/yiqing112/backend/-/blob/main/Spring/Sprin...    🍅🍅

<!-- end:bj1 -->

✨ 本教程項目亮點

🧠 知識體系完整:覆蓋從基礎原理、核心方法到高階應用的全流程內容
💻 全技術鏈覆蓋:完整前後端技術棧,涵蓋開發必備技能
🚀 從零到實戰:適合 0 基礎入門到提升,循序漸進掌握核心能力
📚 豐富文檔與代碼示例:涵蓋多種場景,可運行、可複用
🛠 工作與學習雙參考:不僅適合系統化學習,更可作為日常開發中的查閲手冊
🧩 模塊化知識結構:按知識點分章節,便於快速定位和複習
📈 長期可用的技術積累:不止一次學習,而是能伴隨工作與項目長期參考

🎯🎯🎯全教程總章節


🚀🚀🚀本篇主要內容

2.死信交換機

2.1.初識死信交換機

2.1.1.什麼是死信交換機

什麼是死信?

當一個隊列中的消息滿足下列情況之一時,可以成為死信(dead letter):

  • 消費者使用basic.reject或 basic.nack聲明消費失敗,並且消息的requeue參數設置為false
  • 消息是一個過期消息,超時無人消費
  • 要投遞的隊列消息滿了,無法投遞

如果這個包含死信的隊列配置了dead-letter-exchange屬性,指定了一個交換機,那麼隊列中的死信就會投遞到這個交換機中,而這個交換機稱為死信交換機(Dead Letter Exchange,檢查DLX)。

如圖,一個消息被消費者拒絕了,變成了死信:

因為simple.queue綁定了死信交換機 dl.direct,因此死信會投遞給這個交換機:

如果這個死信交換機也綁定了一個隊列,則消息最終會進入這個存放死信的隊列:

另外,隊列將死信投遞給死信交換機時,必須知道兩個信息:

  • 死信交換機名稱
  • 死信交換機與死信隊列綁定的RoutingKey

這樣才能確保投遞的消息能到達死信交換機,並且正確的路由到死信隊列。

JDK 8 API 文檔

2.1.2.利用死信交換機接收死信(拓展)

在失敗重試策略中,默認的RejectAndDontRequeueRecoverer會在本地重試次數耗盡後,發送reject給RabbitMQ,消息變成死信,被丟棄。

可以給simple.queue添加一個死信交換機,給死信交換機綁定一個隊列。這樣消息變成死信後也不會丟棄,而是最終投遞到死信交換機,路由到與死信交換機綁定的隊列。

在consumer服務中,定義一組死信交換機、死信隊列:

Asciidoctor 文檔

// 聲明普通的 simple.queue隊列,並且為其指定死信交換機:dl.direct
@Bean
public Queue simpleQueue2(){
    return QueueBuilder.durable("simple.queue") // 指定隊列名稱,並持久化
        .deadLetterExchange("dl.direct") // 指定死信交換機
        .build();
}
// 聲明死信交換機 dl.direct
@Bean
public DirectExchange dlExchange(){
    return new DirectExchange("dl.direct", true, false);
}
// 聲明存儲死信的隊列 dl.queue
@Bean
public Queue dlQueue(){
    return new Queue("dl.queue", true);
}
// 將死信隊列 與 死信交換機綁定
@Bean
public Binding dlBinding(){
    return BindingBuilder.bind(dlQueue()).to(dlExchange()).with("simple");
}

2.1.3.總結

什麼樣的消息會成為死信?

  • 消息被消費者reject或者返回nack
  • 消息超時未消費
  • 隊列滿了

死信交換機的使用場景是什麼?

  • 如果隊列綁定了死信交換機,死信會投遞到死信交換機;
  • 可以利用死信交換機收集所有消費者處理失敗的消息(死信),交由人工處理,進一步提高消息隊列的可靠性。

2.2.TTL

一個隊列中的消息如果超時未消費,則會變為死信,超時分為兩種情況:

  • 消息所在的隊列設置了超時時間
  • 消息本身設置了超時時間

2.2.1.接收超時死信的死信交換機

在consumer服務的SpringRabbitListener中,定義一個新的消費者,並且聲明 死信交換機、死信隊列:

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "dl.ttl.queue", durable = "true"),
    exchange = @Exchange(name = "dl.ttl.direct"),
    key = "ttl"
))
public void listenDlQueue(String msg){
    log.info("接收到 dl.ttl.queue的延遲消息:{}", msg);
}

2.2.2.聲明一個隊列,並且指定TTL

要給隊列設置超時時間,需要在聲明隊列時配置x-message-ttl屬性:

@Bean
public Queue ttlQueue(){
    return QueueBuilder.durable("ttl.queue") // 指定隊列名稱,並持久化
        .ttl(10000) // 設置隊列的超時時間,10秒
        .deadLetterExchange("dl.ttl.direct") // 指定死信交換機
        .build();
}

注意,這個隊列設定了死信交換機為dl.ttl.direct

聲明交換機,將ttl與交換機綁定:

@Bean
public DirectExchange ttlExchange(){
    return new DirectExchange("ttl.direct");
}
@Bean
public Binding ttlBinding(){
    return BindingBuilder.bind(ttlQueue()).to(ttlExchange()).with("ttl");
}

發送消息,但是不要指定TTL:

@Test
public void testTTLQueue() {
    // 創建消息
    String message = "hello, ttl queue";
    // 消息ID,需要封裝到CorrelationData中
    CorrelationDa

# 3.惰性隊列

## 3.1.消息堆積問題

當生產者發送消息的速度超過了消費者處理消息的速度,就會導致隊列中的消息堆積,直到隊列存儲消息達到上限。之後發送的消息就會成為死信,可能會被丟棄,這就是消息堆積問題。



![](/img/bVdmmkG)





解決消息堆積有兩種思路:

- 增加更多消費者,提高消費速度。也就是 之前説的work queue模式
- 擴大隊列容積,提高堆積上限



要提升隊列容積,把消息保存在內存中顯然是不行的。



## 3.2.惰性隊列

從RabbitMQ的3.6.0版本開始,就增加了Lazy Queues的概念,也就是惰性隊列。惰性隊列的特徵如下:

- 接收到消息後直接存入磁盤而非內存
- 消費者要消費消息時才會從磁盤中讀取並加載到內存
- 支持數百萬條的消息存儲



### 3.2.1.基於命令行設置lazy-queue

而要設置一個隊列為惰性隊列,只需要在聲明隊列時,指定x-queue-mode屬性為lazy即可。可以通過命令行將一個運行中的隊列修改為惰性隊列:

OkHttp 文檔

MyBatis-Spring 集成

Gradle 用户手冊

Lettuce Redis 客户端

rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues


命令解讀:

- `rabbitmqctl` :RabbitMQ的命令行工具
- `set_policy` :添加一個策略
- `Lazy` :策略名稱,可以自定義
- `"^lazy-queue$"` :用正則表達式匹配隊列的名字
- `'{"queue-mode":"lazy"}'` :設置隊列模式為lazy模式
- `--apply-to queues  `:策略的作用對象,是所有的隊列



### 3.2.2.基於@Bean聲明lazy-queue

![](/img/bVdmmkH)

### 3.2.3.基於@RabbitListener聲明LazyQueue

![](/img/bVdmmkI)





### 3.3.總結

消息堆積問題的解決方案?

- 隊列上綁定多個消費者,提高消費速度
- 使用惰性隊列,可以再mq中保存更多消息

惰性隊列的優點有哪些?

- 基於磁盤存儲,消息上限高
- 沒有間歇性的page-out,性能比較穩定

惰性隊列的缺點有哪些?

- 基於磁盤存儲,消息時效性會降低
- 性能受限於磁盤的IO





# 4.MQ集羣



## 4.1.集羣分類

RabbitMQ的是基於Erlang語言編寫,而Erlang又是一個面向併發的語言,天然支持集羣模式。RabbitMQ的集羣有兩種模式:

•**普通集羣**:是一種分佈式集羣,將隊列分散到集羣的各個節點,從而提高整個集羣的併發能力。

•**鏡像集羣**:是一種主從集羣,普通集羣的基礎上,添加了主從備份功能,提高集羣的數據可用性。



鏡像集羣雖然支持主從,但主從同步並不是強一致的,某些情況下可能有數據丟失的風險。因此在RabbitMQ的3.8版本以後,推出了新的功能:**仲裁隊列**來代替鏡像集羣,底層採用Raft協議確保主從的數據一致性。



## 4.2.普通集羣



### 4.2.1.集羣結構和特徵

普通集羣,或者叫標準集羣(classic cluster),具備下列特徵:

- 會在集羣的各個節點間共享部分數據,包括:交換機、隊列元信息。不包含隊列中的消息。
- 當訪問集羣某節點時,如果隊列不在該節點,會從數據所在節點傳遞到當前節點並返回
- 隊列所在節點宕機,隊列中的消息就會丟失

結構如圖:

![](/img/bVdmmkJ)



### 4.2.2.部署

參考課前資料:《RabbitMQ部署指南.md》





## 4.3.鏡像集羣



### 4.3.1.集羣結構和特徵

鏡像集羣:本質是主從模式,具備下面的特徵:

- 交換機、隊列、隊列中的消息會在各個mq的鏡像節點之間同步備份。
- 創建隊列的節點被稱為該隊列的**主節點,**備份到的其它節點叫做該隊列的**鏡像**節點。
- 一個隊列的主節點可能是另一個隊列的鏡像節點
- 所有操作都是主節點完成,然後同步給鏡像節點
- 主宕機後,鏡像節點會替代成新的主

結構如圖:

![](/img/bVdmmkK)





### 4.3.2.部署

參考課前資料:《RabbitMQ部署指南.md》



## 4.4.仲裁隊列



### 4.4.1.集羣特徵

仲裁隊列:仲裁隊列是3.8版本以後才有的新功能,用來替代鏡像隊列,具備下列特徵:

- 與鏡像隊列一樣,都是主從模式,支持主從數據同步
- 使用非常簡單,沒有複雜的配置
- 主從同步基於Raft協議,強一致



### 4.4.2.部署

參考課前資料:《RabbitMQ部署指南.md》





### 4.4.3.Java代碼創建仲裁隊列

AssertJ 文檔

@Bean
public Queue quorumQueue() {

return QueueBuilder
    .durable("quorum.queue") // 持久化
    .quorum() // 仲裁隊列
    .build();

}




### 4.4.4.SpringAMQP連接MQ集羣

注意,這裏用address來代替host、port方式

MyBatis 官方文檔

Spring Cloud 文檔

spring:
rabbitmq:

addresses: 192.168.150.105:8071, 192.168.150.105:8072, 192.168.150.105:8073
username: itcast
password: 123321
virtual-host: /

🚀✨ (未完待續)項目系列下一章

📚下一篇 將進入更精彩的環節!
🔔 記得收藏 & 關注,第一時間獲取更新!
🍅 一起見證整個系列逐步成型的全過程。

Add a new Comments

Some HTML is okay.