博客 / 詳情

返回

延遲隊列處理訂單超時(RabbitMQ死信隊列實戰)

這篇文章的內容都是基於我們GoFrame微服務電商項目的實踐,感興趣的朋友可以點擊查看

最近發的每篇教程都講清楚了概念,也講清楚了在咱們項目中是如何實現和落地的。

1. 延遲隊列和死信隊列的基本概念

1.1 什麼是延遲隊列?

延遲隊列是一種特殊的消息隊列,它允許消息在發送後的一定時間延遲後才被消費。在電商系統中,延遲隊列常用於處理訂單超時自動取消、優惠券到期提醒、定時任務調度等場景。

1.2 什麼是死信隊列?

死信隊列(Dead Letter Queue,DLQ)是用於存儲無法被正常消費的消息的隊列。當消息滿足以下任一條件時,會被髮送到死信隊列:

  1. 消息被拒絕(basic.reject 或 basic.nack)並且 requeue=false
  2. 消息的 TTL(Time-To-Live)過期
  3. 隊列達到最大長度,無法再添加新消息

1.3 延遲隊列的實現方式

在RabbitMQ中,實現延遲隊列主要有兩種方式:

  1. TTL+ 死信隊列:設置消息的TTL,當消息過期後會被轉發到死信隊列
  2. 插件方式:使用 RabbitMQ Delayed Message Exchange 插件

本項目採用的是第二種方式,通過安裝和配置 RabbitMQ Delayed Message Exchange 插件來實現延遲隊列功能。

2. 為什麼需要使用延遲隊列處理訂單超時?

在電商系統中,訂單創建後通常需要用户在一定時間內完成支付,否則訂單應該被自動取消。處理這種場景有幾種常見方案:

2.1 常見方案對比

方案 優點 缺點
定時任務輪詢 實現簡單 1. 時間精度低2. 對數據庫壓力大3. 資源浪費
Redis過期監聽 性能好 1. 需要額外的Redis集羣2. 實現複雜度高3. 存在消息丟失風險
延遲隊列 1. 時間精度高2. 解耦系統3. 高可靠 1. 需要引入消息隊列2. 額外維護成本

2.2 延遲隊列的優勢

  1. 解耦系統:訂單創建和超時處理邏輯解耦
  2. 高可靠:消息持久化,防止消息丟失
  3. 時間精確:可以精確控制消息的延遲時間
  4. 削峯填谷:有效處理流量峯值
  5. 擴展性好:可以輕鬆擴展其他延遲業務需求

3. RabbitMQ延遲隊列插件安裝

3.1 插件介紹

RabbitMQ Delayed Message Exchange 插件是一個官方維護的插件,它提供了一個延遲交換機類型 x-delayed-message,允許消息根據指定的延遲時間進行投遞。

3.2 插件安裝

從項目結構可以看到,插件已經放置在 rabbitmq/plugins 目錄下:

rabbitmq/
└── plugins/
    └── rabbitmq_delayed_message_exchange-4.1.0.ez

在Docker環境中,通常需要在 docker-compose.yml 中配置啓用該插件。

4. 項目中的延遲隊列實現

4.1 核心組件設計

項目中實現延遲隊列處理訂單超時主要包含以下幾個核心組件:

  1. RabbitMQ客户端:封裝了與RabbitMQ交互的核心功能
  2. 訂單超時事件發佈:在訂單創建時發佈延遲消息
  3. 訂單超時事件消費:處理超時消息,執行訂單取消操作
  4. 訂單狀態更新:更新訂單狀態為已取消
  5. 庫存返還:取消訂單後返還商品庫存

4.2 RabbitMQ客户端封裝

項目在 utility/rabbitmq/rabbitmq.go 中封裝了RabbitMQ客户端,提供了連接管理、消息發佈、消費等功能。

// 關鍵方法:PublishWithDelay 發佈延遲消息
func (r *RabbitMQ) PublishWithDelay(exchange, routingKey string, message interface{}, delayMs int) error {
    body, err := json.Marshal(message)
    if err != nil {
        return err
    }

    return r.channel.Publish(
        exchange,
        routingKey,
        false,
        false,
        amqp.Publishing{
            ContentType: "application/json",
            Body:        body,
            Headers: amqp.Table{
                "x-delay": delayMs, // 延遲時間,單位毫秒
            },
            DeliveryMode: amqp.Persistent, // 持久化消息
        },
    )
}

特別注意:

  • 使用 Headers: amqp.Table{"x-delay": delayMs} 設置延遲時間
  • 設置 DeliveryMode: amqp.Persistent 確保消息持久化,防止服務重啓導致消息丟失

4.3 延遲交換機聲明

// DeclareExchange 聲明交換機
func (r *RabbitMQ) DeclareExchange(name, kind string) error {
    args := amqp.Table{}

    // 如果是延遲交換機,需要設置特殊參數
    if kind == "x-delayed-message" {
        args["x-delayed-type"] = "direct" // 指定延遲交換機的底層類型
    }

    return r.channel.ExchangeDeclare(
        name,
        kind,
        true,  // durable
        false, // autoDelete
        false, // internal
        false, // noWait
        args,  // arguments
    )
}

延遲交換機需要指定 kindx-delayed-message,並在 args 中設置 x-delayed-type 參數。

5. 訂單超時處理流程實現

5.1 訂單超時事件定義

// 訂單超時事件定義
type OrderTimeoutEvent struct {
    OrderId   int    `json:"order_id"`
    Type      string `json:"type"`
    TimeStamp string `json:"timestamp"`
}

// 事件類型常量
const (
    OrderTimeout = "order_timeout"
)

5.2 發佈訂單超時事件

當用户創建訂單時,系統會發佈一個延遲消息,設置一定的延遲時間(如30分鐘):

// PublishOrderTimeoutEvent 發佈訂單超時事件
func PublishOrderTimeoutEvent(orderId int, delayMs int) {
    ctx := context.Background()

    // 初始化RabbitMQ連接
    rb, err := NewRabbitMQ(ctx)
    if err != nil {
        g.Log().Errorf(ctx, "Failed to connect to RabbitMQ: %v", err)
        return
    }
    defer rb.Close()

    // 聲明延遲交換機
    exchange := g.Cfg().MustGet(ctx, "rabbitmq.exchange.orderDelayExchange").String()
    err = rb.DeclareExchange(exchange, "x-delayed-message")
    if err != nil {
        g.Log().Errorf(ctx, "Failed to declare delay exchange: %v", err)
        return
    }

    // 創建事件
    event := OrderTimeoutEvent{
        OrderId:   orderId,
        Type:      OrderTimeout,
        TimeStamp: time.Now().Format(time.RFC3339),
    }

    // 發佈延遲事件
    routingKey := g.Cfg().MustGet(ctx, "rabbitmq.routingKey.orderTimeout").String()
    err = rb.PublishWithDelay(exchange, routingKey, event, delayMs)
    if err != nil {
        g.Log().Errorf(ctx, "Failed to publish orderTimeout event: %v", err)
    } else {
        g.Log().Infof(ctx, "Published orderTimeout event with %d ms delay: %+v", delayMs, event)
    }
}

5.3 訂單超時消費者

訂單超時消費者負責接收和處理超時消息:

// OrderTimeoutConsumer 訂單超時未支付消費者
type OrderTimeoutConsumer struct {
    *rabbitmq.BaseConsumer
}

// NewOrderTimeoutConsumer 創建訂單超時未支付消費者
func NewOrderTimeoutConsumer(ctx context.Context) *OrderTimeoutConsumer {
    config := rabbitmq.ConsumerConfig{
        Exchange:      g.Cfg().MustGet(ctx, "rabbitmq.exchange.orderDelayExchange").String(),
        ExchangeType:  "x-delayed-message",
        Queue:         g.Cfg().MustGet(ctx, "rabbitmq.queue.orderTimeoutQueue").String(),
        RoutingKey:    g.Cfg().MustGet(ctx, "rabbitmq.routingKey.orderTimeout").String(),
        ConsumerTag:   "order_service_order_timeout",
        AutoAck:       false,
        PrefetchCount: 1,
        Durable:       true,
    }

    return &OrderTimeoutConsumer{
        BaseConsumer: rabbitmq.NewBaseConsumer("OrderTimeoutConsumer", config),
    }
}

// HandleMessage 處理訂單超時未支付消息
func (c *OrderTimeoutConsumer) HandleMessage(ctx context.Context, msg amqp.Delivery) error {
    var event rabbitmq.OrderTimeoutEvent
    err := rabbitmq.UnmarshalEvent(msg.Body, &event)
    if err != nil {
        g.Log().Errorf(ctx, "解析訂單超時未支付結果事件失敗: %v", err)
        return err
    }
    g.Log().Infof(ctx, "收到訂單超時未支付事件: %+v", event)
    if event.Type != rabbitmq.OrderTimeout {
        g.Log().Errorf(ctx, "不是訂單超時未支付的事件,event.Type:%s", event.Type)
        return gerror.WrapCode(gcode.CodeInvalidParameter, fmt.Errorf("不是訂單超時未支付的事件,event.Type:%s", event.Type))
    }
    eventTime, err := time.Parse(time.RFC3339, event.TimeStamp)
    if err != nil {
        return fmt.Errorf("解析事件時間戳失敗: %v", err)
    }

    // 判斷是否過期:事件時間 + 30s < 當前時間
    expireTime := g.Cfg().MustGet(ctx, "business.orderTimeout").String()
    expireMs, err := strconv.Atoi(expireTime)
    if err != nil {
        return fmt.Errorf("訂單超時時間配置無效: %v", err)
    }
    expireDuration := time.Duration(expireMs) * time.Millisecond
    if time.Now().Before(eventTime.Add(expireDuration)) {
        g.Log().Infof(ctx, "訂單未到取消時間,跳過處理: order_id=%d, event_time=%s", event.OrderId, event.TimeStamp)
        return nil
    }

    // 調用訂單超時未支付處理邏輯
    err = order_info.HandleOrderTimeoutResult(ctx, event.OrderId)
    if err != nil {
        g.Log().Errorf(ctx, "處理訂單 %d 的超時未支付失敗: %v", event.OrderId, err)
        return err
    }
    g.Log().Infof(ctx, "成功處理訂單 %d 的超時未支付事件", event.OrderId)

    // 取消庫存
    eventReq, err := order_info.GetOrderDetail(ctx, event.OrderId)
    if err != nil {
        g.Log().Errorf(ctx, "獲取訂單 %v 對應的商品信息失敗,err: %v", event.OrderId, err)
        return err
    }
    go rabbitmq.PublishReturnStockEvent(event.OrderId, eventReq)

    return nil
}

消費者的主要職責:

  1. 解析訂單超時事件消息
  2. 驗證事件類型和時間
  3. 調用訂單超時處理邏輯
  4. 觸發庫存返還操作

5.4 訂單超時處理邏輯

// HandleOrderTimeoutResult 處理訂單超時結果
func HandleOrderTimeoutResult(ctx context.Context, orderId int) error {
    // 更新字段
    updateData := g.Map{
        "status":     consts.OrderStatusCancelled,
        "updated_at": gtime.Now(), // 可選:更新時間戳
    }
    // 更新訂單狀態
    result, err := dao.OrderInfo.Ctx(ctx).Where("id=? AND status=?", orderId, consts.OrderStatusPendingPayment).Update(updateData)
    if err != nil {
        return gerror.WrapCode(gcode.CodeDbOperationError, err)
    }

    row, _ := result.RowsAffected()
    if row == 0 {
        g.Log().Infof(ctx, "訂單已取消,無需再取消, orderId=%d", orderId)
        return nil
    }

    g.Log().Infof(ctx, "訂單狀態更新成功, 訂單編號:{%s}, 新狀態: %d", orderId, consts.OrderStatusPendingPayment)
    return nil
}

這個函數的主要邏輯:

  1. 準備更新數據,設置訂單狀態為已取消
  2. 使用 WHERE id=? AND status=? 條件進行樂觀鎖更新,確保只更新待支付狀態的訂單
  3. 檢查更新結果,記錄日誌

6. 完整業務流程

6.1 流程圖

┌───────────────┐      ┌────────────────────┐      ┌──────────────────────┐
│  創建訂單     │ ──>  │  發佈延遲消息      │ ──>  │  延遲交換機存儲      │
└───────────────┘      └────────────────────┘      └──────────┬─────────┘
                                                             │ 延遲時間到
                                                             ▼
┌───────────────────────┐      ┌───────────────────────┐      ┌─────────────────┐
│  返還商品庫存         │ <─── │  更新訂單狀態為已取消 │ <─── │  消費超時消息   │
└───────────────────────┘      └───────────────────────┘      └─────────────────┘

6.2 流程步驟詳解

  1. 訂單創建:用户提交訂單,系統創建訂單記錄,狀態為"待支付"
  2. 發佈延遲消息:調用 PublishOrderTimeoutEvent 方法,發佈一個延遲消息,延遲時間通常設置為訂單超時時間(如30分鐘)
  3. 消息存儲:延遲消息被髮送到延遲交換機並存儲
  4. 消息延遲:消息在延遲交換機中等待,直到延遲時間到期
  5. 消息路由:延遲時間到期後,消息被路由到訂單超時隊列
  6. 消息消費:訂單超時消費者 OrderTimeoutConsumer 從隊列中獲取消息
  7. 訂單狀態檢查:驗證訂單是否仍然是"待支付"狀態
  8. 更新訂單狀態:調用 HandleOrderTimeoutResult 更新訂單狀態為"已取消"
  9. 返還庫存:調用 PublishReturnStockEvent 發佈庫存返還事件

7. 總結

7.1 核心優勢

  1. 高可靠性:消息持久化、指數退避重試等機制確保消息不丟失
  2. 精確控制:可以精確控制訂單超時時間
  3. 系統解耦:訂單創建和超時處理邏輯完全解耦
  4. 可擴展性:相同的模式可以應用於其他需要延遲處理的場景

7.2 學習要點

  1. 延遲隊列概念:理解延遲隊列的基本原理和應用場景
  2. RabbitMQ插件使用:掌握 RabbitMQ Delayed Message Exchange 插件的配置和使用
  3. 消息持久化:理解消息持久化的重要性和配置方式
  4. 消費者實現:學習如何實現高可靠的消息消費者
  5. 冪等性處理:理解並實現冪等性處理,避免重複操作

7.3 應用場景擴展

除了訂單超時處理,延遲隊列還可以用於以下場景:

  1. 預約提醒:用户預約某服務前的提醒通知
  2. 會員到期提醒:會員到期前的自動提醒
  3. 定時任務:不需要高精度的定時任務調度
  4. 異步任務補償:失敗任務的延遲重試
  5. 優惠券過期通知:優惠券即將過期的提醒

通過本實戰案例,相信大家已經掌握瞭如何使用RabbitMQ延遲隊列來處理訂單超時問題,以及相關的最佳實踐和優化方向。

如果你對這種技術問題有疑問,或者對這個微服務項目感興趣,都可以直接關注或者私信我:wangzhongyang1993。
user avatar
0 位用戶收藏了這個故事!

發佈 評論

Some HTML is okay.