這篇文章的內容都是基於我們GoFrame微服務電商項目的實踐,感興趣的朋友可以點擊查看
最近發的每篇教程都講清楚了概念,也講清楚了在咱們項目中是如何實現和落地的。
1. 延遲隊列和死信隊列的基本概念
1.1 什麼是延遲隊列?
延遲隊列是一種特殊的消息隊列,它允許消息在發送後的一定時間延遲後才被消費。在電商系統中,延遲隊列常用於處理訂單超時自動取消、優惠券到期提醒、定時任務調度等場景。
1.2 什麼是死信隊列?
死信隊列(Dead Letter Queue,DLQ)是用於存儲無法被正常消費的消息的隊列。當消息滿足以下任一條件時,會被髮送到死信隊列:
- 消息被拒絕(basic.reject 或 basic.nack)並且 requeue=false
- 消息的 TTL(Time-To-Live)過期
- 隊列達到最大長度,無法再添加新消息
1.3 延遲隊列的實現方式
在RabbitMQ中,實現延遲隊列主要有兩種方式:
- TTL+ 死信隊列:設置消息的TTL,當消息過期後會被轉發到死信隊列
- 插件方式:使用 RabbitMQ Delayed Message Exchange 插件
本項目採用的是第二種方式,通過安裝和配置 RabbitMQ Delayed Message Exchange 插件來實現延遲隊列功能。
2. 為什麼需要使用延遲隊列處理訂單超時?
在電商系統中,訂單創建後通常需要用户在一定時間內完成支付,否則訂單應該被自動取消。處理這種場景有幾種常見方案:
2.1 常見方案對比
| 方案 | 優點 | 缺點 |
|---|---|---|
| 定時任務輪詢 | 實現簡單 | 1. 時間精度低2. 對數據庫壓力大3. 資源浪費 |
| Redis過期監聽 | 性能好 | 1. 需要額外的Redis集羣2. 實現複雜度高3. 存在消息丟失風險 |
| 延遲隊列 | 1. 時間精度高2. 解耦系統3. 高可靠 | 1. 需要引入消息隊列2. 額外維護成本 |
2.2 延遲隊列的優勢
- 解耦系統:訂單創建和超時處理邏輯解耦
- 高可靠:消息持久化,防止消息丟失
- 時間精確:可以精確控制消息的延遲時間
- 削峯填谷:有效處理流量峯值
- 擴展性好:可以輕鬆擴展其他延遲業務需求
3. RabbitMQ延遲隊列插件安裝
3.1 插件介紹
RabbitMQ Delayed Message Exchange 插件是一個官方維護的插件,它提供了一個延遲交換機類型 x-delayed-message,允許消息根據指定的延遲時間進行投遞。
3.2 插件安裝
從項目結構可以看到,插件已經放置在 rabbitmq/plugins 目錄下:
rabbitmq/
└── plugins/
└── rabbitmq_delayed_message_exchange-4.1.0.ez
在Docker環境中,通常需要在 docker-compose.yml 中配置啓用該插件。
4. 項目中的延遲隊列實現
4.1 核心組件設計
項目中實現延遲隊列處理訂單超時主要包含以下幾個核心組件:
- RabbitMQ客户端:封裝了與RabbitMQ交互的核心功能
- 訂單超時事件發佈:在訂單創建時發佈延遲消息
- 訂單超時事件消費:處理超時消息,執行訂單取消操作
- 訂單狀態更新:更新訂單狀態為已取消
- 庫存返還:取消訂單後返還商品庫存
4.2 RabbitMQ客户端封裝
項目在 utility/rabbitmq/rabbitmq.go 中封裝了RabbitMQ客户端,提供了連接管理、消息發佈、消費等功能。
// 關鍵方法:PublishWithDelay 發佈延遲消息
func (r *RabbitMQ) PublishWithDelay(exchange, routingKey string, message interface{}, delayMs int) error {
body, err := json.Marshal(message)
if err != nil {
return err
}
return r.channel.Publish(
exchange,
routingKey,
false,
false,
amqp.Publishing{
ContentType: "application/json",
Body: body,
Headers: amqp.Table{
"x-delay": delayMs, // 延遲時間,單位毫秒
},
DeliveryMode: amqp.Persistent, // 持久化消息
},
)
}
特別注意:
- 使用
Headers: amqp.Table{"x-delay": delayMs}設置延遲時間 - 設置
DeliveryMode: amqp.Persistent確保消息持久化,防止服務重啓導致消息丟失
4.3 延遲交換機聲明
// DeclareExchange 聲明交換機
func (r *RabbitMQ) DeclareExchange(name, kind string) error {
args := amqp.Table{}
// 如果是延遲交換機,需要設置特殊參數
if kind == "x-delayed-message" {
args["x-delayed-type"] = "direct" // 指定延遲交換機的底層類型
}
return r.channel.ExchangeDeclare(
name,
kind,
true, // durable
false, // autoDelete
false, // internal
false, // noWait
args, // arguments
)
}
延遲交換機需要指定 kind 為 x-delayed-message,並在 args 中設置 x-delayed-type 參數。
5. 訂單超時處理流程實現
5.1 訂單超時事件定義
// 訂單超時事件定義
type OrderTimeoutEvent struct {
OrderId int `json:"order_id"`
Type string `json:"type"`
TimeStamp string `json:"timestamp"`
}
// 事件類型常量
const (
OrderTimeout = "order_timeout"
)
5.2 發佈訂單超時事件
當用户創建訂單時,系統會發佈一個延遲消息,設置一定的延遲時間(如30分鐘):
// PublishOrderTimeoutEvent 發佈訂單超時事件
func PublishOrderTimeoutEvent(orderId int, delayMs int) {
ctx := context.Background()
// 初始化RabbitMQ連接
rb, err := NewRabbitMQ(ctx)
if err != nil {
g.Log().Errorf(ctx, "Failed to connect to RabbitMQ: %v", err)
return
}
defer rb.Close()
// 聲明延遲交換機
exchange := g.Cfg().MustGet(ctx, "rabbitmq.exchange.orderDelayExchange").String()
err = rb.DeclareExchange(exchange, "x-delayed-message")
if err != nil {
g.Log().Errorf(ctx, "Failed to declare delay exchange: %v", err)
return
}
// 創建事件
event := OrderTimeoutEvent{
OrderId: orderId,
Type: OrderTimeout,
TimeStamp: time.Now().Format(time.RFC3339),
}
// 發佈延遲事件
routingKey := g.Cfg().MustGet(ctx, "rabbitmq.routingKey.orderTimeout").String()
err = rb.PublishWithDelay(exchange, routingKey, event, delayMs)
if err != nil {
g.Log().Errorf(ctx, "Failed to publish orderTimeout event: %v", err)
} else {
g.Log().Infof(ctx, "Published orderTimeout event with %d ms delay: %+v", delayMs, event)
}
}
5.3 訂單超時消費者
訂單超時消費者負責接收和處理超時消息:
// OrderTimeoutConsumer 訂單超時未支付消費者
type OrderTimeoutConsumer struct {
*rabbitmq.BaseConsumer
}
// NewOrderTimeoutConsumer 創建訂單超時未支付消費者
func NewOrderTimeoutConsumer(ctx context.Context) *OrderTimeoutConsumer {
config := rabbitmq.ConsumerConfig{
Exchange: g.Cfg().MustGet(ctx, "rabbitmq.exchange.orderDelayExchange").String(),
ExchangeType: "x-delayed-message",
Queue: g.Cfg().MustGet(ctx, "rabbitmq.queue.orderTimeoutQueue").String(),
RoutingKey: g.Cfg().MustGet(ctx, "rabbitmq.routingKey.orderTimeout").String(),
ConsumerTag: "order_service_order_timeout",
AutoAck: false,
PrefetchCount: 1,
Durable: true,
}
return &OrderTimeoutConsumer{
BaseConsumer: rabbitmq.NewBaseConsumer("OrderTimeoutConsumer", config),
}
}
// HandleMessage 處理訂單超時未支付消息
func (c *OrderTimeoutConsumer) HandleMessage(ctx context.Context, msg amqp.Delivery) error {
var event rabbitmq.OrderTimeoutEvent
err := rabbitmq.UnmarshalEvent(msg.Body, &event)
if err != nil {
g.Log().Errorf(ctx, "解析訂單超時未支付結果事件失敗: %v", err)
return err
}
g.Log().Infof(ctx, "收到訂單超時未支付事件: %+v", event)
if event.Type != rabbitmq.OrderTimeout {
g.Log().Errorf(ctx, "不是訂單超時未支付的事件,event.Type:%s", event.Type)
return gerror.WrapCode(gcode.CodeInvalidParameter, fmt.Errorf("不是訂單超時未支付的事件,event.Type:%s", event.Type))
}
eventTime, err := time.Parse(time.RFC3339, event.TimeStamp)
if err != nil {
return fmt.Errorf("解析事件時間戳失敗: %v", err)
}
// 判斷是否過期:事件時間 + 30s < 當前時間
expireTime := g.Cfg().MustGet(ctx, "business.orderTimeout").String()
expireMs, err := strconv.Atoi(expireTime)
if err != nil {
return fmt.Errorf("訂單超時時間配置無效: %v", err)
}
expireDuration := time.Duration(expireMs) * time.Millisecond
if time.Now().Before(eventTime.Add(expireDuration)) {
g.Log().Infof(ctx, "訂單未到取消時間,跳過處理: order_id=%d, event_time=%s", event.OrderId, event.TimeStamp)
return nil
}
// 調用訂單超時未支付處理邏輯
err = order_info.HandleOrderTimeoutResult(ctx, event.OrderId)
if err != nil {
g.Log().Errorf(ctx, "處理訂單 %d 的超時未支付失敗: %v", event.OrderId, err)
return err
}
g.Log().Infof(ctx, "成功處理訂單 %d 的超時未支付事件", event.OrderId)
// 取消庫存
eventReq, err := order_info.GetOrderDetail(ctx, event.OrderId)
if err != nil {
g.Log().Errorf(ctx, "獲取訂單 %v 對應的商品信息失敗,err: %v", event.OrderId, err)
return err
}
go rabbitmq.PublishReturnStockEvent(event.OrderId, eventReq)
return nil
}
消費者的主要職責:
- 解析訂單超時事件消息
- 驗證事件類型和時間
- 調用訂單超時處理邏輯
- 觸發庫存返還操作
5.4 訂單超時處理邏輯
// HandleOrderTimeoutResult 處理訂單超時結果
func HandleOrderTimeoutResult(ctx context.Context, orderId int) error {
// 更新字段
updateData := g.Map{
"status": consts.OrderStatusCancelled,
"updated_at": gtime.Now(), // 可選:更新時間戳
}
// 更新訂單狀態
result, err := dao.OrderInfo.Ctx(ctx).Where("id=? AND status=?", orderId, consts.OrderStatusPendingPayment).Update(updateData)
if err != nil {
return gerror.WrapCode(gcode.CodeDbOperationError, err)
}
row, _ := result.RowsAffected()
if row == 0 {
g.Log().Infof(ctx, "訂單已取消,無需再取消, orderId=%d", orderId)
return nil
}
g.Log().Infof(ctx, "訂單狀態更新成功, 訂單編號:{%s}, 新狀態: %d", orderId, consts.OrderStatusPendingPayment)
return nil
}
這個函數的主要邏輯:
- 準備更新數據,設置訂單狀態為已取消
- 使用
WHERE id=? AND status=?條件進行樂觀鎖更新,確保只更新待支付狀態的訂單 - 檢查更新結果,記錄日誌
6. 完整業務流程
6.1 流程圖
┌───────────────┐ ┌────────────────────┐ ┌──────────────────────┐
│ 創建訂單 │ ──> │ 發佈延遲消息 │ ──> │ 延遲交換機存儲 │
└───────────────┘ └────────────────────┘ └──────────┬─────────┘
│ 延遲時間到
▼
┌───────────────────────┐ ┌───────────────────────┐ ┌─────────────────┐
│ 返還商品庫存 │ <─── │ 更新訂單狀態為已取消 │ <─── │ 消費超時消息 │
└───────────────────────┘ └───────────────────────┘ └─────────────────┘
6.2 流程步驟詳解
- 訂單創建:用户提交訂單,系統創建訂單記錄,狀態為"待支付"
- 發佈延遲消息:調用
PublishOrderTimeoutEvent方法,發佈一個延遲消息,延遲時間通常設置為訂單超時時間(如30分鐘) - 消息存儲:延遲消息被髮送到延遲交換機並存儲
- 消息延遲:消息在延遲交換機中等待,直到延遲時間到期
- 消息路由:延遲時間到期後,消息被路由到訂單超時隊列
- 消息消費:訂單超時消費者
OrderTimeoutConsumer從隊列中獲取消息 - 訂單狀態檢查:驗證訂單是否仍然是"待支付"狀態
- 更新訂單狀態:調用
HandleOrderTimeoutResult更新訂單狀態為"已取消" - 返還庫存:調用
PublishReturnStockEvent發佈庫存返還事件
7. 總結
7.1 核心優勢
- 高可靠性:消息持久化、指數退避重試等機制確保消息不丟失
- 精確控制:可以精確控制訂單超時時間
- 系統解耦:訂單創建和超時處理邏輯完全解耦
- 可擴展性:相同的模式可以應用於其他需要延遲處理的場景
7.2 學習要點
- 延遲隊列概念:理解延遲隊列的基本原理和應用場景
- RabbitMQ插件使用:掌握 RabbitMQ Delayed Message Exchange 插件的配置和使用
- 消息持久化:理解消息持久化的重要性和配置方式
- 消費者實現:學習如何實現高可靠的消息消費者
- 冪等性處理:理解並實現冪等性處理,避免重複操作
7.3 應用場景擴展
除了訂單超時處理,延遲隊列還可以用於以下場景:
- 預約提醒:用户預約某服務前的提醒通知
- 會員到期提醒:會員到期前的自動提醒
- 定時任務:不需要高精度的定時任務調度
- 異步任務補償:失敗任務的延遲重試
- 優惠券過期通知:優惠券即將過期的提醒
通過本實戰案例,相信大家已經掌握瞭如何使用RabbitMQ延遲隊列來處理訂單超時問題,以及相關的最佳實踐和優化方向。
如果你對這種技術問題有疑問,或者對這個微服務項目感興趣,都可以直接關注或者私信我:wangzhongyang1993。