上週在Review學員代碼的時候,我們發現了一個很基礎但很重要的問題:支付回調流程中缺少了庫存扣減環節。這類問題雖然基礎,但如果直接進入生產環境,可能導致庫存的數據和實際銷售的情況不一致,出現超賣的情況。能夠及時發現這種問題,這就是Review代碼的重要性。
先看這段有問題的代碼:
// 原來的支付回調邏輯(問題代碼)
func PaymentCallback(ctx context.Context, orderID uint32) error {
// 只更新訂單狀態為已支付
_, err := dao.OrderInfo.Ctx(ctx).Where("id=?", orderID).
Data(g.Map{"status": consts.OrderStatusPaid}).Update()
if err != nil {
return err
}
// 缺少庫存扣減邏輯!商品庫存還是原樣
return nil
}
這個問題的核心在於流程設計的不完整,用户支付成功後只是更新了訂單狀態,卻沒有同步調整商品庫存,可能導致其他用户購買時看到的庫存數據不正確。
想要解決這個問題,需要補充缺失的邏輯,更要考慮分佈式系統下的流程合理性,這裏我們選擇引入RabbitMQ實現事件驅動架構,既能解決當前問題,也能方便後續的業務擴展。
問題分析
業務邏輯理解不正確
原邏輯對訂單流程的理解是"創建訂單→支付成功→完成交易",但正確的流程應該要包含庫存相關的環節:
創建訂單→預扣庫存→支付成功→確認交易→後續處理
不同服務之間的協作
在微服務架構中:
- 訂單服務負責訂單狀態流轉
- 商品服務負責庫存數據維護
兩個服務需要通過規範的協作機制保證數據一致性,而不是簡單的同步調用。
解決方案
我們重新設計了包含庫存管理的訂單流程,通過RabbitMQ實現服務間的解耦通信:
創建訂單時預扣庫存
將庫存扣減提前到訂單創建的階段,通過數據庫事務保證操作的原子性:
// app/goods/internal/logic/goods_info/goods_info.go
func ReduceStock(ctx context.Context, req *rabbitmq.OrderCreatedEvent) error {
// 使用數據庫事務確保原子性
err := g.DB().Transaction(ctx, func(ctx context.Context, tx gdb.TX) error {
for _, goods := range req.GoodsInfo {
// 1. 查詢當前庫存
var goodsInfo entity.GoodsInfo
if err := dao.GoodsInfo.Ctx(ctx).TX(tx).
Where("id = ?", goods.GoodsId).
Fields("stock").
Scan(&goodsInfo); err != nil {
return gerror.Wrapf(err, "查詢商品{%d}庫存失敗", goods.GoodsId)
}
// 2. 判斷庫存是否足夠
if goodsInfo.Stock < goods.Count {
return gerror.Newf("商品{%d}庫存不足(當前:%d, 需要:%d)",
goods.GoodsId, goodsInfo.Stock, goods.Count)
}
// 3. 扣減庫存
newStock := goodsInfo.Stock - goods.Count
g.Log().Infof(ctx, "商品{%d}新庫存:%d", goods.GoodsId, newStock)
if _, err := dao.GoodsInfo.Ctx(ctx).TX(tx).
Where("id = ?", goods.GoodsId).
Data(g.Map{"stock": newStock}).
Update(); err != nil {
return gerror.Wrapf(err, "更新商品{%d}庫存失敗", goods.GoodsId)
}
}
return nil
})
return err
}
設計思路:
- 提前鎖定庫存,避免支付過程中商品被重複購買
- 事務保證庫存檢查與扣減的原子性,防止併發問題
- 庫存不足時直接阻斷訂單創建,提升用户體驗
支付成功後的確認處理
支付完成後,通過事件通知觸發後續清理工作:
// 支付回調邏輯
func PaymentCallback(ctx context.Context, orderID uint32) error {
// 1. 更新訂單狀態
_, err := dao.OrderInfo.Ctx(ctx).Where("id=?", orderID).
Data(g.Map{"status": consts.OrderStatusPaid}).Update()
if err != nil {
return err
}
// 2. 獲取訂單詳情(包含商品信息)
orderDetail, err := GetOrderDetail(ctx, orderID)
if err != nil {
return err
}
// 3. 發佈庫存確認事件(這裏庫存已在創建訂單時預扣)
// 主要是清理緩存等後續操作
go func() {
// 異步清理商品緩存
if err := goodsRedis.DeleteKeys(context.Background(), orderDetail.GoodsIDs); err != nil {
g.Log().Errorf(ctx, "清理商品緩存失敗: %v", err)
}
}()
return nil
}
訂單超時的庫存返還機制
為避免用户下單後未支付導致庫存長時間鎖定,設計超時返還邏輯:
// app/order/utility/consumer/order_timeout_consumer.go
func (c *OrderTimeoutConsumer) HandleMessage(ctx context.Context, msg amqp.Delivery) error {
// 解析訂單超時事件
var event rabbitmq.OrderTimeoutEvent
err := rabbitmq.UnmarshalEvent(msg.Body, &event)
if err != nil {
return err
}
// 判斷是否真正超時(30分鐘未支付)
eventTime, _ := time.Parse(time.RFC3339, event.TimeStamp)
if time.Now().After(eventTime.Add(30 * time.Minute)) {
// 處理訂單超時
err = order_info.HandleOrderTimeoutResult(ctx, event.OrderId)
if err != nil {
return err
}
// 發佈庫存返還事件
eventReq, err := order_info.GetOrderDetail(ctx, event.OrderId)
if err == nil {
go rabbitmq.PublishReturnStockEvent(event.OrderId, eventReq)
}
}
return nil
}
庫存返還的具體實現
通過併發處理提升庫存返還效率:
// app/goods/internal/logic/goods_info/goods_info.go
func ReturnStock(ctx context.Context, req *rabbitmq.OrderStockReturnEvent) ([]*rabbitmq.OrderGoodsInfo, error) {
// 使用goroutine併發處理每個商品
resultChan := make(chan *rabbitmq.OrderGoodsInfo, len(req.GoodsInfo))
var wg sync.WaitGroup
wg.Add(len(req.GoodsInfo))
for _, stockInfo := range req.GoodsInfo {
go func(stockInfo *rabbitmq.OrderGoodsInfo) {
defer wg.Done()
defer func() {
if r := recover(); r != nil {
g.Log().Errorf(ctx, "庫存返還panic: %v", r)
}
}()
// 查詢當前庫存
var goodsInfo entity.GoodsInfo
err := dao.GoodsInfo.Ctx(ctx).Where("id=?", stockInfo.GoodsId).
Fields("stock").Scan(&goodsInfo)
if err != nil {
resultChan <- &rabbitmq.OrderGoodsInfo{
GoodsId: stockInfo.GoodsId,
Count: stockInfo.Count,
}
return
}
// 返還庫存
newStock := goodsInfo.Stock + stockInfo.Count
_, err = dao.GoodsInfo.Ctx(ctx).Where("id=?", stockInfo.GoodsId).
Data(g.Map{"stock": newStock}).Update()
if err != nil {
resultChan <- &rabbitmq.OrderGoodsInfo{
GoodsId: stockInfo.GoodsId,
Count: stockInfo.Count,
}
return
}
g.Log().Infof(ctx, "商品{%d}庫存返還成功,新庫存:%d", stockInfo.GoodsId, newStock)
}(stockInfo)
}
wg.Wait()
close(resultChan)
// 收集處理結果
var resultArr []*rabbitmq.OrderGoodsInfo
for res := range resultChan {
resultArr = append(resultArr, res)
}
return resultArr, nil
}
消息隊列的事件驅動架構
定義核心事件實現服務解耦:
// 事件定義
type OrderCreatedEvent struct {
OrderId uint32 `json:"order_id"`
GoodsInfo []*OrderGoodsInfo `json:"goods_info"`
}
type OrderStockReturnEvent struct {
OrderId uint32 `json:"order_id"`
GoodsInfo []*OrderGoodsInfo `json:"goods_info"`
}
事件流設計:
用户下單→OrderCreated事件→商品服務扣減庫存
↓
支付超時→OrderTimeout事件→商品服務返還庫存
↓
支付成功→訂單狀態更新+緩存清理
技術難點與解決方案
難點1:分佈式系統的數據一致性
問題:訂單與庫存數據分屬不同服務,如何保證操作協同?
解決方案:
- 採用最終一致性模型,通過事件重試確保數據對齊
- 每個事件處理都設計冪等性,避免重複執行導致錯誤
難點2:高併發下的庫存準確性
問題:多用户同時購買時如何防止庫存數據混亂?
解決方案:
// 數據庫事務+行級鎖保證併發安全
err := g.DB().Transaction(ctx, func(ctx context.Context, tx gdb.TX) error {
// 事務內查詢自動加行鎖,阻止併發修改
var goodsInfo entity.GoodsInfo
dao.GoodsInfo.Ctx(ctx).TX(tx).Where("id=?", goodsId).Scan(&goodsInfo)
// 檢查並更新庫存
if goodsInfo.Stock >= count {
dao.GoodsInfo.Ctx(ctx).TX(tx).Data(g.Map{"stock": goodsInfo.Stock - count}).Update()
}
return nil
})
難點3:系統性能與用户體驗平衡
問題:庫存操作頻繁,如何避免影響響應速度?
解決方案:
- 核心流程同步處理,確保用户體驗
- 非核心操作(如緩存清理)異步化,不阻塞主流程
- 批量操作使用併發處理提升效率
結語
很多時候一些嚴重的錯誤往往出現在一些小細節上面。通過這次庫存相關的優化案例可以發現:看似簡單的業務流程,在分佈式架構下需要考慮服務協作、併發控制、異常處理等等多個方面的因素。
通過引入RabbitMQ,不僅解決了已經存在的庫存同步問題,更讓整個系統具備了更好的擴展性,比如未來要新增物流通知、積分等功能的時候,只需新增事件的消費者就ok了,不需要再去修改現有的核心代碼。
本文基於真實的GoFrame微服務電商項目,所有代碼都經過生產環境驗證,這裏是項目的介紹:(https://mp.weixin.qq.com/s/ACzEHtvGh2YsU_4fxo83fQ)。如果你也遇到類似問題,歡迎交流討論!