Preface

This post is a companion piece to 《MoeDistributeDispatch算子代碼閲讀》 (a code walkthrough of the MoeDistributeDispatch operator).
In EP (Expert Parallelism) deployments, each card hosts a different set of experts. npu_moe_distribute_dispatch uses expert_ids to send each token's hidden state to the rank that hosts the selected expert.
The computation of a single expert, expressed in code:

import torch
from torch import nn
from torch.nn import functional as F
from torch.nn import Linear  # DeepSeek-V3 defines its own Linear layer; torch.nn.Linear behaves the same for this illustration

class Expert(nn.Module):
    def __init__(self, dim: int, inter_dim: int):
        super().__init__()
        self.w1 = Linear(dim, inter_dim)
        self.w2 = Linear(inter_dim, dim)
        self.w3 = Linear(dim, inter_dim)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        return self.w2(F.silu(self.w1(x)) * self.w3(x))

source: DeepSeek-V3 的 MoE 架構解析:細粒度專家與高效模型擴展 (an analysis of DeepSeek-V3's MoE architecture)
On inference platforms, a FusedMoE layer computes the outputs of many experts in one pass. For example, the implementation in vllm-ascend:

def unquant_apply_mlp(hidden_states: torch.Tensor,
                      w1: torch.Tensor,
                      w2: torch.Tensor,
                      group_list: torch.Tensor,
                      group_list_type: int = 1,
                      topk_scales: Optional[torch.Tensor] = None,
                      need_trans: bool = True) -> torch.Tensor:
    if need_trans:
        w1 = w1.transpose(1, 2)
        w2 = w2.transpose(1, 2)

    gate_up_out = torch_npu.npu_grouped_matmul(
        x=[hidden_states],
        weight=[w1],
        split_item=2,
        group_list_type=group_list_type,
        group_type=0,
        group_list=group_list,
    )[0]

    if is_310p():
        gate_up_out = torch_npu.npu_swiglu(gate_up_out.to(torch.float32)).to(
            torch.float16)
    else:
        gate_up_out = torch_npu.npu_swiglu(gate_up_out)

    if topk_scales is not None:
        gate_up_out *= topk_scales

    hidden_states = torch_npu.npu_grouped_matmul(
        x=[gate_up_out],
        weight=[w2],
        split_item=2,
        group_list_type=group_list_type,
        group_type=0,
        group_list=group_list,
    )[0]
    return hidden_states

source: moe_mlp.py
Parameter correspondence between the two operators:

Operator                       Parameter 1          Parameter 2
npu_moe_distribute_dispatch    expert_token_nums    expert_token_nums_type
npu_grouped_matmul             group_list           group_list_type
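
For example, a hedged sketch of how the two sides are wired together (variable names such as expand_x and w13_weight are illustrative, not vllm-ascend's actual call site): the expanded hidden states and per-expert token counts produced by npu_moe_distribute_dispatch feed the grouped matmul through group_list:

# expand_x and expert_token_nums are outputs of torch_npu.npu_moe_distribute_dispatch;
# w13_weight / w2_weight are the per-expert gate-up and down projection weights (names illustrative).
expert_out = unquant_apply_mlp(
    hidden_states=expand_x,
    w1=w13_weight,
    w2=w2_weight,
    group_list=expert_token_nums,
    group_list_type=expert_token_nums_type,  # 0: cumulative sums, 1: per-expert counts
)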

When expert_token_nums_type (group_list_type) is 0, npu_moe_distribute_dispatch outputs cumulative-sum token counts. Example data from the previous post:

--------expertTokenNumsOutGMTensor------------
3 6 11 16 17 22 27 30 32 34 36 41 44 46 47 50

After the experts finish computing, each result has to be returned to the rank its input came from; that is the job of the MoeDistributeCombine operator.
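
A minimal host-side sketch, assuming plain PyTorch, of how the cumulative-sum form (type 0) relates to the per-expert count form (type 1), using the numbers above:

import torch

# expertTokenNumsOut from the dispatch example (cumulative sums, expert_token_nums_type = 0)
cumsum_counts = torch.tensor([3, 6, 11, 16, 17, 22, 27, 30,
                              32, 34, 36, 41, 44, 46, 47, 50])

# Per-expert token counts (what type 1 would report)
per_expert = torch.diff(cumsum_counts, prepend=torch.tensor([0]))
print(per_expert)  # tensor([3, 3, 5, 5, 1, 5, 5, 3, 2, 2, 2, 5, 3, 2, 1, 3])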

MoeDistributeCombine documentation

torch_npu.npu_moe_distribute_combine API reference: https://www.hiascend.com/document/detail/zh/Pytorch/700/apiref/apilist/ptaoplist_002363.html
Implementation: https://gitcode.com/cann/ops-transformer/tree/master/mc2/moe_distribute_combine
ep_send_counts (Tensor): required. The number of tokens each expert on this card sends to each card in the EP (Expert Parallelism) domain, expressed as prefix sums; a 1-D int32 tensor. It corresponds to the ep_recv_counts output of torch_npu.npu_moe_distribute_dispatch.
expand_idx (Tensor): required. The count of tokens sent to the same expert; a 1-D int32 tensor. It corresponds to the expand_idx output of torch_npu.npu_moe_distribute_dispatch.
Data from the previous post, as printed on rank 0: sendCountsGlobal (ep_send_counts) and expandIdxGMTensor (expand_idx):
--------sendCountsGlobal------------
2 3 5 6 9 11 13 16
16 17 18 22 24 27 28 30
31 32 33 34 35 36 39 41
43 44 45 46 47 47 47 50
--------expandIdxGMTensor------------
0 0 0 0 0 0 0 0 0 0 0 0
0 0 1 0 1 0 1 1 0 0 0 0
0 0 2 1 1 1 1 2 1 0 1 1
1 0 0 1 1 2 0 1 2 1 1 2
What the sendCountsGlobal numbers mean (here the EP world size is 2, so each expert has two consecutive prefix-sum entries, one per source rank):
rank 0 has 2 tokens that need expert 0.
rank 1 sends 1 token that needs expert 0: 1 = 3 - 2.
rank 0 has 2 tokens that need expert 1: 2 = 5 - 3.
rank 1 sends 1 token that needs expert 1: 1 = 6 - 5.
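
A small sketch (illustrative host-side Python, assuming EP world size 2 as in the example) that decodes the prefix sums into per-(expert, source rank) token counts:

import torch

# sendCountsGlobal printed on rank 0: prefix sums laid out [local_expert][src_rank]
send_counts = torch.tensor([2, 3, 5, 6, 9, 11, 13, 16,
                            16, 17, 18, 22, 24, 27, 28, 30,
                            31, 32, 33, 34, 35, 36, 39, 41,
                            43, 44, 45, 46, 47, 47, 47, 50])
ep_world_size = 2

per_pair = torch.diff(send_counts, prepend=torch.tensor([0]))
per_pair = per_pair.view(-1, ep_world_size)   # [local_expert, src_rank]
print(per_pair[0])  # tensor([2, 1]) -> expert 0: 2 tokens from rank 0, 1 token from rank 1
print(per_pair[1])  # tensor([2, 1]) -> expert 1: 2 tokens from rank 0, 1 token from rank 1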

Main flow

template 
__aicore__ inline void MoeDistributeCombine::Process()
{
    if constexpr (IsNeedReduceScatter) {
        ReduceScatterTrans();
    }
    BuffInit();
    SetWaitTpStatusAndDisPatch();
    AlltoAllBuffInit();
    SetStatus();
    WaitDispatch();
    LocalWindowCopy();
}

SetWaitTpStatusAndDisPatch

The part worth looking at is ExpertAlltoAllDispatchCopyAdd. epSendCountLocal_ corresponds to the sendCountsGlobal printed above.

template 
__aicore__ inline void MoeDistributeCombine::ExpertAlltoAllDispatchCopyAdd()
{
    // Work is split across cores by rank count: loop over each expert on this card first, then over the ranks handled by the current core, because all data belonging to one expert is stored together and handled together
    for (uint32_t expertIdx = 0U; expertIdx < curRankExpertNum; expertIdx++) {
        for (uint32_t ep = startRankId_ ; ep < endRankId_; ep++) {
            if ((ep > 0) || (expertIdx > 0)) {
                preCount = epSendCountLocal_.GetValue(expertIdx * epWorldSize_ + ep - 1);
            }
            curTokenNum = epSendCountLocal_.GetValue(expertIdx * epWorldSize_ + ep) - preCount;
            if (curTokenNum == 0) {
                continue;
            }
            startTokenIdx = preCount * axisH_;
            ExpertAlltoAllDispatchInnerCopyAdd(curTokenNum, startTokenIdx, ep, expertIdx);
        }
    }
}
template 
__aicore__ inline void MoeDistributeCombine::ExpertAlltoAllDispatchInnerCopyAdd(
    uint32_t tokenNumLoop, uint32_t srcStartTokenIdx, uint32_t ep, uint32_t expertIdx)
{
    // Get the base address of the window on the corresponding rank
    GM_ADDR rankGM = GetWinAddrByRankId(ep, EP_DOMAIN, expertIdx) + epDataOffsetOnWin_;
}

The value of ep decides whether this is a local copy or an RDMA transfer.
According to epSendCountLocal_, expert 0 produced 3 outputs: the first 2 must be copied back to rank 0 and the last 1 back to rank 1 (see the sketch below).
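
A host-side sketch (illustrative Python, ignoring the per-core rank partitioning) of the index math in ExpertAlltoAllDispatchCopyAdd: each (expert, destination rank) pair gets a contiguous token range derived from the prefix sums.

ep_world_size = 2
send_counts = [2, 3, 5, 6]   # prefix sums for the first two experts of the rank-0 example

def plan_sends(send_counts, ep_world_size):
    plans = []
    pre_count = 0
    for expert_idx in range(len(send_counts) // ep_world_size):
        for ep in range(ep_world_size):
            flat = expert_idx * ep_world_size + ep
            if flat > 0:
                pre_count = send_counts[flat - 1]
            cur_token_num = send_counts[flat] - pre_count
            if cur_token_num == 0:
                continue
            # tokens [pre_count, pre_count + cur_token_num) of this expert's output go back to rank `ep`
            plans.append((expert_idx, ep, pre_count, cur_token_num))
    return plans

print(plan_sends(send_counts, ep_world_size))
# [(0, 0, 0, 2), (0, 1, 2, 1), (1, 0, 3, 2), (1, 1, 5, 1)]
# expert 0: 2 tokens back to rank 0, 1 token back to rank 1 -- matching the note above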

SetStatus

Not analyzed further.

WaitDispatch

Not analyzed further.

LocalWindowCopy

After the data synchronization completes, the data is copied back and reduced: each token accumulates its K expert outputs weighted by the routing scales.

template 
__aicore__ inline void MoeDistributeCombine::LocalWindowCopy()
{
    for (uint32_t tokenIndex = beginIndex; tokenIndex < endIndex; tokenIndex++) {
        uint32_t index = tokenIndex * axisK_;
        int32_t moeExpert = 0;
        float scaleVal = 0.0;
        GM_ADDR wAddr;
        SyncFunc<AscendC::HardEvent::MTE3_V>(); // same tensor as the DataCopy that moves results out
        Duplicate(sumFloatBufLocal, (float)0, axisH_);
        LocalTensor<ExpandXType> tmpUb;
        for (uint32_t i = 0; i < axisK_; i++) {
            moeExpert = expertIdsLocal.GetValue(index);
            scaleVal = expandScalesLocal.GetValue(index);
            wAddr = (__gm__ uint8_t *)(epWindowGM_) + expertPerSizeOnWin_ * moeExpertPerRankNum_ *
                sharedExpertRankNum_ + expertPerSizeOnWin_ * moeExpert +
                indexCountsLocal.GetValue(index) * axisHExpandXTypeSize_ + tokenOffset * sizeof(ExpandXType);
            rowTmpGlobal_.SetGlobalBuffer((__gm__ ExpandXType *)wAddr);
            tmpUb = moeSumQueue_.AllocTensor<ExpandXType>();
            if constexpr (IsQuant) {
                DataCopy(tmpUb, rowTmpGlobal_, quantCopyLen);
            } else {
                DataCopy(tmpUb, rowTmpGlobal_, processLen);
                SyncFunc<AscendC::HardEvent::MTE2_V>();
            }
            moeSumQueue_.EnQue(tmpUb);
            tmpUb = moeSumQueue_.DeQue<ExpandXType>();
            if constexpr (IsQuant) {
                DequantProcess(tmpUb);
            }
            Cast(rowTmpFloatLocal, tmpUb, AscendC::RoundMode::CAST_NONE, processLen);
            PipeBarrier<PIPE_V>();
            AscendC::Muls(mulBufLocal, rowTmpFloatLocal, scaleVal, processLen);
            PipeBarrier<PIPE_V>();
            AscendC::Add(sumFloatBufLocal, sumFloatBufLocal, mulBufLocal, processLen);
            index++;
            moeSumQueue_.FreeTensor(tmpUb);
        }
}

How the address offset is computed:

expertPerSizeOnWin_ * moeExpertPerRankNum_ * sharedExpertRankNum_    // offset for the shared-expert area (0 when sharedExpertRankNum_ == 0)
    + expertPerSizeOnWin_ * moeExpert                                // start of the region for expert moeExpert in the window
    + indexCountsLocal.GetValue(index) * axisHExpandXTypeSize_       // this token's slot within that expert's region
    + tokenOffset * sizeof(ExpandXType);                             // part of the token handled by the current core

 moeExpert, read from expertIdsLocal, is the expert id selected for this rank's inference tokens.
 The expertIds on rank 0:

--------expertIds------------
 11 17 29 12 24 23 1 0 16 30 18 8
 2 14 11 3 30 21 12 0 10 9 6 31
 22 27 30 21 1 24 17 11 3 19 2 29
 16 13 7 27 6 29 5 22 24 19 23 2
  indexCountsLocal is copied from expandIdxGM.
  On rank 0, expandIdxGM holds, for each selection, the running count of that expert id (how many earlier selections chose the same expert); see the function CalTokenSendExpertCnt in the MoeDistributeDispatch operator.
 --------expandIdxGMTensor------------
 0 0 0 0 0 0 0 0 0 0 0 0
 0 0 1 0 1 0 1 1 0 0 0 0
 0 0 2 1 1 1 1 2 1 0 1 1
 1 0 0 1 1 2 0 1 2 1 1 2
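
A host-side sketch (illustrative Python; the window layout and helper names are simplified stand-ins, not the kernel's actual data structures) tying the pieces together: expandIdx is the running occurrence count of each expert id, and the inner loop of LocalWindowCopy uses it to locate each token's row inside its expert's region of the window before scaling and accumulating over the K selected experts.

import torch
from collections import defaultdict

# 1) expandIdx: for every (token, k) selection, how many earlier selections picked the same expert.
def cal_token_send_expert_cnt(expert_ids_flat):
    seen = defaultdict(int)
    counts = []
    for e in expert_ids_flat:
        counts.append(seen[e])
        seen[e] += 1
    return counts

expert_ids = [11, 17, 29, 12, 24, 23, 1, 0, 16, 30, 18, 8,
              2, 14, 11, 3, 30, 21, 12, 0, 10, 9, 6, 31]
print(cal_token_send_expert_cnt(expert_ids))
# [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 1, 0, 1, 1, 0, 0, 0, 0] -> matches the first two rows above

# 2) LocalWindowCopy inner loop for one token: gather the K expert outputs from the window
#    (indexed by expert id and occurrence count), scale by the routing weight, accumulate.
def combine_one_token(window, expert_ids, index_counts, scales, token_index, k, hidden_dim):
    out = torch.zeros(hidden_dim, dtype=torch.float32)     # sumFloatBufLocal
    base = token_index * k                                  # index = tokenIndex * axisK_
    for i in range(k):
        e = expert_ids[base + i]                            # moeExpert
        slot = index_counts[base + i]                       # indexCountsLocal.GetValue(index)
        row = window[e][slot].to(torch.float32)             # DataCopy + Cast
        out += scales[base + i] * row                       # Muls + Add
    return out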