動態

詳情 返回 返回

冪等設計論之動機及其實現 - 動態 詳情

每個清晨應該都從思考開始,從思考開始會讓我的腦袋更清晰一些。這是一個系列,先是冪等然後是限流。

從一個下單場景出發

讓我們想象這麼一個場景,現在我有一個表單,也就是我們要創建某條數據,這個表單可以是創建一條訂單, 像下面這樣:

img

一般來説我們下單結束之後可以選擇讓頁面跳轉到訂單詳情頁面上,但是呢,不巧的是由於客户的網絡比較慢,第一個請求發出去的時候,大致上需要等待兩秒的時間。這個時候用户顯然有點不耐心了,於是他重複點擊了立即下單這個按鈕,然後發出去了兩個下單請求,於是就有兩條相同的數據。如果是在秒殺的場景下面,這就佔據了兩個庫存,顯然這是需要避免的情況。

於是我們祭出來第一個補丁,用户點擊立即下單之後,立即下單這個按鈕進入loading這個狀態,也就是暫時禁用這個按鈕, 像下面這樣:

img

這看起來問題解決了一部分,但是不幸的是還是出現了相同的數據,佔用了一段時間的庫存,給我們造成了一點困擾。那這是為什麼呢? 首先在網頁上,我們知道JavaScript是單線程的,但同樣實現了併發, 也就是説代碼的邏輯進入到了一個隊列裏面被交錯執行,這意味着你的loading可能會稍微延遲一點生效。這就為用户重複點擊產生了第一種條件。

那在移動端上呢? 其實面臨的是一個問題,也是一樣的,繪製UI負責頁面渲染的其實也是一個線程,這意味着也是交錯處理UI事件,也會面臨手快的客户。UI線程也會卡頓,想想這個場景也是蠻自然的,移動端的場景更加極端一點,有時候手機在發熱的時候會手機會降頻,你就會觀察到UI線程的卡頓。

第二個問題來自於不可測的網絡,移動端設備通常設置了超時時間,也就是説限制響應在指定的時間回覆,超出這個時間就不選擇等待,無限制的等待下去這個頁面處於不可用,用户體驗也不良好。然後在彈出網絡異常重試之後,用户接着點擊這個立即下單這個按鈕。其實服務端已經在處理數據了只是返回的數據在網絡中被運輸的比較慢,那麼這個時候用户接着點擊了提交的按鈕,接着相同的數據第二次到達服務端,但他其實只想下一單,這次下單成功了,於是到詳情頁面驚訝的發現自己下了兩個單。

如果在網絡持續不好的情況下,用户可能頻繁點擊這個下單按鈕,這就導致大量庫存被佔用,顯然這也是我們需要避免的一個點。那麼怎麼辦呢? 我們注意到用户當前一直停留在這個頁面,那麼我們能否在用户進入這個下單頁面的時候,前端就向後端請求一個下單令牌,提交的時候攜帶這個令牌進行提交: img

也就是説我們在訂單創建的時候先讓前端請求令牌,創建訂單的時候攜帶這個令牌創建訂單,如果這個令牌是未使用狀態,則原子的更新token的狀態,注意這裏可能是併發的更新token的狀態,更新失敗的可以給出提示當前訂單正在創建請稍後,也或者我們可以對賦予token多個屬性,一個是訂單已創建,一個是訂單是創建中,如果處於創建中,則給出當前訂單正在創建中,如果是完成就返回創建成功讓前端跳轉到對應的訂單詳情頁面。代碼如下:

import org.redisson.api.RMap;
import org.redisson.api.RedissonClient;

import java.util.UUID;
import java.util.concurrent.TimeUnit;

public class OrderToken {

    public enum TokenState {
        UNUSED,      // 還未使用
        CREATING,    // 正在創建訂單
        COMPLETED    // 訂單已創建
    }

    private static final String FIELD_STATE = "state";

    private static final String FIELD_ORDERID = "orderId";

    private final RMap<String, String> dataMap;

    private final long expirationSeconds;

    private final RedissonClient redissonClient = SpringContextUtils.getBean(RedissonClient.class);


    /**
     * 初始化一個token
     * @param token
     * @param expireSeconds
     */
    public OrderToken(String token, long expireSeconds) {
        this.dataMap = redissonClient.getMap("order:token:" + token);
        this.expirationSeconds = expireSeconds;
    }

    /**
     * 創建訂單的token,設置失效時間
     * @param expireSeconds
     * @return
     */
    public static String createOrderToken(long expireSeconds) {
        String tokenVal = UUID.randomUUID().toString();
        new OrderToken(tokenVal, expireSeconds)
                .initializeAsUnused();
        return tokenVal;
    }

    public void initializeAsUnused() {
        dataMap.put(FIELD_STATE, TokenState.UNUSED.name());
        dataMap.expire(expirationSeconds, TimeUnit.SECONDS);
    }

    //這裏刷新訂單的過期時間是為了防止在業務處理過程中,令牌因為超時而意外失效。
    //比如我們假定token的失效時間是三分鐘, 然後用户在當前頁面停留了兩份五十五秒
    //到我們這裏開始使用的時候,發現token不存在了。其實這裏也沒什麼,完全可以讓用户刷新頁面。
    //但這裏其實有個問題在於如果用UUID,沒法驗證這個格式是否是偽造的,
    //就會識別不清楚這個token是否是第三方偽造,因此我們頒發token的時候不能隨便生成
    //我們應當的token應當是時間+用户ID+其他有意義的業務字符串來頒發
    //獲取token的時候應當首先檢驗是否是合法的,如果是合法的,我們可以讓這個token復活。
    //來避免token在傳輸過程中過期的這種現象
    // 所以這裏只是簡單演示,不能做生產用途
    public boolean tryClaimForCreation() {
        boolean ok = dataMap.replace(FIELD_STATE,
                TokenState.UNUSED.name(),
                TokenState.CREATING.name());
        if (ok) dataMap.expire(expirationSeconds, TimeUnit.SECONDS);
        return ok;
    }

    // 標記訂單的狀態
    // 注意可能有同學會考慮這裏的過期問題,
    // 但是我們需要為我們的訂單創建服務,設置一個最長的時間
    public void markAsCompleted(String orderId) {
        dataMap.put(FIELD_STATE, TokenState.COMPLETED.name());
        dataMap.put(FIELD_ORDERID, orderId);
    }

    /**
     * 獲取當前token的狀態
     * @return
     */
    public TokenState getState() {
        String raw = dataMap.get(FIELD_STATE);
        if (raw == null) return null;
        try {
            return TokenState.valueOf(raw);
        } catch (IllegalArgumentException e) {
            System.err.println("無效狀態字符串: " + raw);
            return null;
        }
    }
}

如果創建數據的接口給第三方調用

有時候你的創建訂單的接口可能給第三方調用,或者第三方通過接口來調用你的系統創建數據,一般來説我們會提供兩個接口,一個是查詢接口,一個是創建數據的接口。這個時候token的方案就可能不那麼好用,考慮下面這種場景,調用創建數據的接口超時而拋出異常,我們假定遇到了這樣一種狀況,發起調用的某個時刻網絡處於擁擠的狀態,客户端出現了超時異常。

但這個時候我們已經在處理這個創建數據的請求了,但是還沒創建完畢。對於客户端來説,這個時候需要查詢這條數據是否創建完成,但是由於我們的接口還在處理當中,第三方的一個操作是重新生成token來創建數據,這也造成了重複的數據。對於這種模式,我們同樣要求第三方系統給一個唯一鍵,在創建數據的時候同樣先查詢對應的數據是否創建完畢來避免由於網絡抖動帶來的重複生成數據。

注意這個業務唯一鍵應當要求第三方持久化,用於後面的對照數據。

訂單創建之後

現在我們的訂單已經成功創建成功了,一個場景是給用户發消息或者是發積分,或者是通知下游。我們通常藉助消息隊列來完成這個動作,而一般的消息隊列也有重試,比如RocketMQ、RabbitMQ、Kafka都有重試。在訂單重複發出兩條消息的情況下,下游會做重複的操作,比如兩條短信,雖然短信費很便宜,但是量大了一樣是不必要的開銷。

顯然我們也需要避免這一點,那我們該怎麼辦呢? 我們可以從消息中選取唯一鍵,當作分佈式鎖的key,第二次重試的時候直接返回處理成功, 代碼如下面所示:

@Component
public class RocketMQTemplateListenerDemo implements RocketMQListener<RocketMQTemplateListenerDemo.MQDomainMsg> {
    @Autowired
    private RedissonClient redissonClient;

    private static final String ORDER_MSG_LOCK_PREFIX = "ORDER_MSG_LOCK_PREFIX";
        
    @Override
    public void onMessage(MQDomainMsg mqDomainMsg) {
        String orderCode = mqDomainMsg.orderCode();
        RLock lock = redissonClient.getLock(ORDER_MSG_LOCK_PREFIX + orderCode);
        if (lock.tryLock()) {
            try {
                sendMsgService();
            }finally {
                lock.unlock();   
            }
        }else{
            System.out.println("重複消費");
        }
    }
    private void sendMsgService() {}
    public   record  MQDomainMsg(Long orderId, String orderCode){};
}

上面的代碼解決的場景是當我們的業務邏輯處理時間超過了MQ的確認消費時間,被MQ認為超時,因此MQ會發起重試,我們為了避免這種狀況,於是從消息裏面選取了唯一鍵做為分佈式鎖的key,鎖定這段業務邏輯。 看起來解決了一種場景的問題,但是很快我們還是發現了重複消費,這是為什麼呢? 原因在於我們在寫代碼的時候,先天認為網絡是可靠的。

現在讓我們考慮下面這樣一個場景,假設我們正常消費,確認消費成功的消息也發出去了,但是網絡具備不確定性,在某段時間網絡擁塞,導致這個回傳消息超過了MQ的消費超時時間,於是MQ接着發起重試。於是我們只能引入消息表,在處理業務邏輯之前,先查消息ID是否存在,如果存在就認為是重複消費。我們同樣也可以為消費記錄引入一個消費狀態的字段,來標識是否消費成功。原因在於代碼邏輯出現問題是不可知的,有時候因為網絡的問題,MQ的重試會重試成功,有時候我們的MQ還處理了其他邏輯無法重試成功。這個都根據真實的需要進行設計,根據情況做出分析,值得注意的是,你的系統不能只考慮成功。

然後我們的邏輯就變成了,先嚐試用分佈式鎖加鎖,如果加鎖成功在消費記錄表裏面,根據消息記錄ID查詢該記錄是否存在,如果存在的情況,就直接返回。為了實現這段邏輯我們需要引入一個消費記錄表:

CREATE TABLE `consumption_record` (
  `id` BIGINT NOT NULL AUTO_INCREMENT COMMENT '自增主鍵',
  `message_id` VARCHAR(255) NOT NULL COMMENT '消息的唯一標識符,核心冪等鍵',
  `status` VARCHAR(20) NOT NULL COMMENT '消息處理狀態: PROCESSING, SUCCESS, FAILURE',
  `retry_count` INT NOT NULL DEFAULT 0 COMMENT '當前重試次數', 
  `error_details` TEXT NULL COMMENT '當狀態為FAILURE時,記錄錯誤詳情或堆棧',
  `created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '記錄創建時間',
  `updated_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '記錄最後更新時間',
   PRIMARY KEY (`id`),
  -- 核心約束:為 message_id 創建唯一索引,這是防止重複處理的最終防線
  UNIQUE KEY `uk_message_id` (`message_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='消息消費記錄表';

請注意重試次數不是必要的,請根據自己的業務需求判斷失敗之後是否要重試,這是個冗餘的字段,我們的核心訴求在於藉助消息中的唯一鍵防止重複消費。

然後我們的代碼邏輯變型為:

@Component
public class RocketMQTemplateListenerDemo implements RocketMQListener<RocketMQTemplateListenerDemo.MQDomainMsg> {

    @Autowired
    private RedissonClient redissonClient;

    private static final String ORDER_MSG_LOCK_PREFIX = "ORDER_MSG_LOCK_PREFIX";

    @Autowired
    private MQRecordService mqRecordService;
    
    @Override
    public void onMessage(MQDomainMsg mqDomainMsg) {
        String orderCode = mqDomainMsg.orderCode();
        RLock lock = redissonClient.getLock(ORDER_MSG_LOCK_PREFIX + orderCode);
        if (lock.tryLock()) {
            try {
                // 這裏只是演示代碼邏輯
                // MQConsumptionRecord 就是建表的字段
                MQRecordService.MQConsumptionRecord mqConsumptionRecord = 
                        new MQRecordService.MQConsumptionRecord();
                // 根據唯一鍵查詢
                 mqConsumptionRecord = mqRecordService.findByMessageId(mqConsumptionRecord);
                 if (Objects.isNull(mqConsumptionRecord)){
                     // 創建消息成功
                     mqConsumptionRecord = mqRecordService.createMQMsgRecord(mqConsumptionRecord);
                     sendMsgService();
                 }else{
                     // 重複消費
                 }
                sendMsgService();
            }finally {
                lock.unlock();
            }
        }else{
            System.out.println("重複消費");
        }
    }
    private void sendMsgService() {}
    public   record  MQDomainMsg(Long orderId, String orderCode){};
}

同樣的我們在對某個數據進行進行修改的時候,也需要避免客户的重複點擊,比如關閉訂單請求點擊重複,這個時候也是一樣的思路,用分佈式鎖保證同時只有一個線程在更新訂單狀態的相關操作。現在分佈式鎖用於防止的是我們的業務邏輯超過MQ的消費超時時間的重試,這是由於我們代碼的問題,導致我們的確認消費成功消息在指定時間裏面沒有回傳給MQ。我們的消費記錄防止的是我們的確認消費由於網絡問題導致確認消費消息沒在指定時間給到消息隊列的場景。

現在創建訂單相關的問題解決了一部分,我們來看付款。

付款之後

現在用户發起了付款,但是我們注意付款的流程是我們將錢付給了我們在第三方開設的賬户,其實拉起的頁面和付款界面已經不在我們程序裏面了,然後用户付款成功之後。支付寶和微信會回調通知我們,我們收到這個通知之後修改訂單的狀態:

img

注意支付服務和訂單服務之間的通信方式可以是通過消息隊列,也可以是通過HTTP/RPC方式來進行通知, 用消息隊列的好處是消息隊列封裝了重試,會有更高的可靠性。當然我們也可以用HTTP/RPC做到這一點,但是設計微服務的時候,這個支付服務可以儘可能的通用,不跟下游綁定在一起,如果是消息隊列的話,支付服務接收到回調之後發出去消息即可。如果是HTTP/RPC, 如果其他業務線要用到微服務可能就要改動支付微服務。這都取決於你的系統設計要求,沒有放之四海而皆準的方案。

但是為了保證通知到位,微信和支付寶同樣會進行重試, 在參考文檔裏面可以看到這一點:

若商户應答回調接收失敗,或超時(5s)未應答時,微信支付會按照(0s/15s/15s/30s/180s/1800s/1800s/1800s/1800s/3600s)的頻次重複發送回調通知,直至微信支付接收到商户應答成功,或達到最大發送次數(10次)

在這種情況下我們可以靠分佈式鎖嘛,比如選中支付流水ID,作為分佈式鎖的key,想想這顯然是不夠的,原因在於我們的回調訂單服務的時候其實已經回調成功了,分佈式鎖解鎖了,只是應答的報文傳輸的比較慢。然後微信這個時候再次進行回調通知,我們這個時候再改訂單的狀態,其實是有嚴重的副作用的,我們應當避免這一點,以淘寶為例,淘寶的訂單狀態有待付款、待發貨、待收貨、退款或者售後。

我們可以認為在付款之後,支付平台回調我們的服務,讓訂單進入待發貨狀態,但是這個時候由於客户想退款了,於是進入了退款狀態,然後第二次重試通知被支付中心接收,接着訂單被進入到退款狀態。這就造成了強烈的副作用,又或者用户訂單退款完成,支付通知接着重試,讓用户訂單進入到待發貨狀態。這就意味着損失。

因此我們這裏為了避免產生強烈的副作用,在回調下游的時候,先引入分佈式鎖,避免兩個回調同時處理一筆流水,然後在處理的時候可以用查數據庫,如下代碼所示:

   private static final String LOCK_PREFIX = "lock:payment:callback:";

    @Autowired
    private RedissonClient redissonClient;

    @Autowired
    private ConsumptionRecordService consumptionRecordService;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostMapping("/wechat")
    public String handleWechatCallback(@RequestBody WechatPayCallbackData callbackData) {
        String transactionId = callbackData.getTransactionId();
        RLock lock = redissonClient.getLock(LOCK_PREFIX + transactionId);
        if (lock.tryLock()) {      
            try {
                // 核心邏輯1: 雙重檢查冪等性
                if (consumptionRecordService.isProcessed(transactionId)) {
                    log.info("回調已處理 (鎖內雙重檢查), transactionId: {}", transactionId);
                    return buildWechatResponse("SUCCESS", "OK");
                }
                // 核心邏輯2: 構造併發布領域事件
                PaymentSucceededEvent event = new PaymentSucceededEvent(
                    transactionId,
                    callbackData.getOutTradeNo(),
                    callbackData.getTotalFee(),
                    callbackData.getPayTime()
                );
                rabbitTemplate.convertAndSend("ex.payment", "rk.payment.succeeded", event);
                log.info("已成功發佈支付成功事件到MQ, transactionId: {}", transactionId);

                // 核心邏輯3: 標記為已處理
                consumptionRecordService.markAsProcessed(transactionId, "PUBLISHED");        
                // 正常處理完成,返回成功
                return buildWechatResponse("SUCCESS", "OK");

            } catch (Exception e) {
                // 如果在處理過程中發生任何異常,記錄日誌並通知微信重試
                log.error("在持有鎖期間,處理支付回調時發生異常, transactionId: {}", transactionId, e);
                // 這裏根據業務邏輯來做,發郵件還是發提醒
                return buildWechatResponse("SUCCESS", "OK");
            } finally {                
                // 關鍵步驟: 無論成功還是異常,都必須釋放鎖
                lock.unlock();
            }
        } else {                    
             return buildWechatResponse("SUCCESS", "OK");
        }
    }

作為訂單微服務,我們同樣也要防止上游的重複通知,因為上游可能為了可靠性可能重複發送消息,所以我們在更新訂單狀態的時候也可以加上狀態限制:

update order_et  set  order_status = '待發貨' order_code = '' and order_status = '待付款';

這裏引申出冪等的另一種設計,當涉及變更訂單的狀態類似操作的時候,我們加入限制,狀態的流轉只能由某些狀態流轉到某些狀態。

上面是一個非常粗糙的狀態流轉圖,描述了狀態流轉方向,只能由指定的狀態進入到若干狀態,我們可以在更新狀態的時候加上狀態限制。

那如果沒有業務狀態字段該怎麼辦

有時候我們的數據沒有業務狀態該怎麼辦,我只是對這個數據進行了修改,注意,我們分析這個問題,問題的核心要義在於比較並交換這個思想。於是我們可以引入一個版本號的概念,也就是version, 在發起更新操作的時候,可以請求前端傳遞這個版本號像下面這樣:

update s  set  xx = '' , version = version + 1 where id = '11' and version = #{version}

這個SQL當且僅當數據庫的version字段和傳遞的字段相等的時候才會發揮作用,這同樣也能保證重複處理相同的字段。如果是以API的方式暴露給第三方調用,那麼我們可以先根據id查出來版本號,傳遞給後面的更新操作,這樣也能保證處理業務的時候,只有一條處理成功。

總結一下

為了避免第三方重複操作產生副作用,我們採取的手段被稱為冪等性設計。我們注意到避免第三方重複操作的核心在於識別出來是重複操作,那麼如何判斷是重複呢? 核心就要找到一個業務鍵,我們在業務系統裏面暫時緩存,如果再次出現我們就能感知到,來避免重複操作。對於新增操作來説,可以在進入對應頁面的時候,讓前端請求生成一個token,注意到這個token為了防止第三方偽造,我們可以採取加密算法來生成,這樣我們同樣也獲得了唯一鍵。

除此之外,如果我們這個系統暴露給外部,也就是通過API的方式給第三方調用,這個token的方式就有些不通用,原因在於如果調用超時,第三方第三方會重新請求獲取token的接口,但是由於我們還在處理這條創建數據的請求,第三方調用查詢不到這條數據,因此第三方大概率會進行重試。處理這種對接第三方的創建數據請求,可靠的做法是強制要求第三方給出唯一鍵,系統會根據唯一鍵來判斷這條數據是否處理過。

對於更新操作,我們選中的更新單位的ID就是唯一鍵,同樣也可以採取分佈式鎖來避免用户快速點擊,這是處理短期內重複請求的一個手段。但如果用户懸停在某個頁面,以訂單為例,假定支付服務處於某種原因重複發送了支付回調通知,但是訂單已進入了退款完成狀態,這個時候如果不加上訂單狀態限制,那麼同樣會造成問題。也就是涉及狀態流轉的時候,要名學狀態流轉的方向,舉例,只能由待付款進入待發貨, 代碼如下所示:

update et_order set order_status = '待發貨' where order_code = '' and order_status = '';

而不是下面這樣:

update et_order set order_status = '待發貨' where order_code = '';

所以到這裏我們已經明白了在哪些場景需要冪等,如果作為被調用方或者下游不確定上游或者調用方是否會發起重複操作,且重複操作會帶來額外的副作用。我們就需要引入額外的代碼設計來做冪等。

在上面的場景中我們可以觀察到,對於創建操作冪等的核心要義在於尋找唯一鍵和將唯一鍵進行持久化,唯一鍵的存在可以讓我們在超過處理時間,第三方發起重試的時候,用做分佈式鎖的key,來避免這類重試。而持久化的動機則在於避免我們的確認消費消息在網絡堵塞的情況下,超過調用方限制的超時時間,調用方發起重試。

對於更新操作,我們同樣要選取唯一鍵來避免業務處理時間超時,用户重複點擊產生的重複操作,那麼對於有生命週期的數據,大多數數據都是有狀態,我們在做狀態流轉的時候就要加上狀態限制。但是如果你沒有顯示的狀態業務字段,那我們不妨將狀態變型為比較操作,也就是版本號,在更新的時候判斷是否等於目標的版本號。注意這個版本號可以是自己查,也可以是前端傳遞。注意體會思想,在不同的場景選取對應的工具去實現。

到現在為止我們沒有引入任何數學冪等的概念,因為我認為這個引入數學的冪等性無助於我們對這個詞的理解,雖然冪等性這個術語是從數學領域借過來的一個詞,但這個詞在計算機領域有不同的意義。我選取的都是實際場景,講實際場景會發生什麼,然後給出對應的方案,我認為在真實的具體中,能夠體會抽象出來的概念。

參考資料

[1] https://pay.weixin.qq.com/doc/v3/merchant/4012587960#2.3%E3%80%81%E5%BE%AE%E4%BF%A1%E6%94%AF%E4%BB%98%E5%9B%9E%E8%B0%83%E5%A4%84%E7%90%86%E6%9C%BA%E5%88%B6%E8%AF%B4%E6%98%8E

Add a new 評論

Some HTML is okay.