Preface
This post is a companion piece to "Reading the MoeDistributeDispatch Operator Code".
In an EP (Expert Parallelism) deployment, each card hosts a different set of experts. npu_moe_distribute_dispatch uses expert_ids to send each token's hidden states to the rank that hosts the corresponding expert.
A single expert's computation, expressed in code:
```python
import torch
import torch.nn.functional as F
from torch import nn
from torch.nn import Linear

class Expert(nn.Module):
    def __init__(self, dim: int, inter_dim: int):
        super().__init__()
        self.w1 = Linear(dim, inter_dim)   # gate projection
        self.w2 = Linear(inter_dim, dim)   # down projection
        self.w3 = Linear(dim, inter_dim)   # up projection

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        # SwiGLU: silu(w1(x)) gates w3(x), then w2 projects back to dim
        return self.w2(F.silu(self.w1(x)) * self.w3(x))
```
source: "DeepSeek-V3 的 MoE 架構解析:細粒度專家與高效模型擴展" (an analysis of DeepSeek-V3's MoE architecture: fine-grained experts and efficient model scaling)
On the inference side, a FusedMoE module produces the outputs of many experts at once. For example, the implementation logic in vllm-ascend:
```python
from typing import Optional

import torch
import torch_npu
from vllm_ascend.utils import is_310p  # helper from vllm-ascend


def unquant_apply_mlp(hidden_states: torch.Tensor,
                      w1: torch.Tensor,
                      w2: torch.Tensor,
                      group_list: torch.Tensor,
                      group_list_type: int = 1,
                      topk_scales: Optional[torch.Tensor] = None,
                      need_trans: bool = True) -> torch.Tensor:
    if need_trans:
        w1 = w1.transpose(1, 2)
        w2 = w2.transpose(1, 2)
    # gate/up projection for every expert's tokens in one grouped matmul
    gate_up_out = torch_npu.npu_grouped_matmul(
        x=[hidden_states],
        weight=[w1],
        split_item=2,
        group_list_type=group_list_type,
        group_type=0,
        group_list=group_list,
    )[0]
    if is_310p():
        gate_up_out = torch_npu.npu_swiglu(gate_up_out.to(torch.float32)).to(
            torch.float16)
    else:
        gate_up_out = torch_npu.npu_swiglu(gate_up_out)
    if topk_scales is not None:
        gate_up_out *= topk_scales
    # down projection, again grouped by expert
    hidden_states = torch_npu.npu_grouped_matmul(
        x=[gate_up_out],
        weight=[w2],
        split_item=2,
        group_list_type=group_list_type,
        group_type=0,
        group_list=group_list,
    )[0]
    return hidden_states
```
source: moe_mlp.py
Parameter correspondence between the two operators:
| Operator | Parameter 1 | Parameter 2 |
| --- | --- | --- |
| npu_moe_distribute_dispatch | expert_token_nums | expert_token_nums_type |
| npu_grouped_matmul | group_list | group_list_type |
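To make the group_list semantics concrete, here is a minimal pure-PyTorch sketch of what a grouped matmul with group_type=0 computes. This is my own illustration, not vllm-ascend or NPU kernel code: tokens are assumed to be sorted by expert, and group_list tells the kernel either where each expert's slice ends (type 0, cumsum) or how long it is (type 1, count).

```python
import torch

def grouped_matmul_ref(x: torch.Tensor, w: torch.Tensor,
                       group_list: torch.Tensor,
                       group_list_type: int = 1) -> torch.Tensor:
    """Reference semantics for a grouped matmul along dim 0.

    x: [total_tokens, dim], sorted so expert e's tokens are contiguous.
    w: [num_experts, dim, out_dim].
    group_list: per-expert token counts (type 1) or their cumsum (type 0).
    """
    if group_list_type == 0:  # cumsum -> counts
        counts = torch.diff(group_list,
                            prepend=torch.zeros(1, dtype=group_list.dtype))
    else:
        counts = group_list
    outs, start = [], 0
    for e, n in enumerate(counts.tolist()):
        outs.append(x[start:start + n] @ w[e])  # expert e sees only its slice
        start += n
    return torch.cat(outs, dim=0)

# tiny smoke test: 2 experts handling 3 and 2 tokens respectively
x = torch.randn(5, 4)
w = torch.randn(2, 4, 8)
counts = torch.tensor([3, 2])
assert torch.allclose(grouped_matmul_ref(x, w, counts, 1),
                      grouped_matmul_ref(x, w, counts.cumsum(0), 0))
```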
When expert_token_nums_type (group_list_type) is 0, npu_moe_distribute_dispatch outputs cumsum-style token counts. The data example from the previous post:

```
--------expertTokenNumsOutGMTensor------------
3 6 11 16 17 22 27 30 32 34 36 41 44 46 47 50
```
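As a sanity check on that dump, diffing the prefix sums recovers the per-expert token counts (plain Python):

```python
cumsum = [3, 6, 11, 16, 17, 22, 27, 30, 32, 34, 36, 41, 44, 46, 47, 50]
counts = [b - a for a, b in zip([0] + cumsum[:-1], cumsum)]
print(counts)  # [3, 3, 5, 5, 1, 5, 5, 3, 2, 2, 2, 5, 3, 2, 1, 3], summing to 50
```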
Once an expert finishes computing, each output has to be returned to the rank its input came from; that return trip is the job of the MoeDistributeCombine operator.
torch_npu.npu_moe_distribute_combine API reference: https://www.hiascend.com/document/detail/zh/Pytorch/700/apiref/apilist/ptaoplist_002363.html
Implementation: https://gitcode.com/cann/ops-transformer/tree/master/mc2/moe_distribute_combine
ep_send_counts (Tensor): required. The number of tokens each expert on this card sends to every card in the EP (Expert Parallelism) domain, expressed as prefix sums; a 1-D int32 tensor. It corresponds to the ep_recv_counts output of torch_npu.npu_moe_distribute_dispatch.
expand_idx (Tensor): required. The count of tokens sent to the same expert (each token's running index among the tokens routed to that expert); a 1-D int32 tensor. It corresponds to the expand_idx output of torch_npu.npu_moe_distribute_dispatch.
From the previous post, the data printed on rank 0: sendCountsGlobal (ep_send_counts) and expandIdxGMTensor (expand_idx):
--------sendCountsGlobal------------
2 3 5 6 9 11 13 16
16 17 18 22 24 27 28 30
31 32 33 34 35 36 39 41
43 44 45 46 47 47 47 50
--------expandIdxGMTensor------------
0 0 0 0 0 0 0 0 0 0 0 0
0 0 1 0 1 0 1 1 0 0 0 0
0 0 2 1 1 1 1 2 1 0 1 1
1 0 0 1 1 2 0 1 2 1 1 2
What the numbers in sendCountsGlobal mean:
rank0 itself has 2 tokens that need expert 0: the first entry is 2.
rank1 sends 1 token that needs expert 0: 1 = 3 - 2.
rank0 has 2 tokens that need expert 1: 2 = 5 - 3.
rank1 sends 1 token that needs expert 1: 1 = 6 - 5.
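The whole dump decodes the same way. Assuming the layout the dumps imply (EP world size 2, 16 experts per rank, so the flat array is indexed as expert * epWorldSize + rank and every entry is a running prefix sum), a small sketch of my own:

```python
flat = [2, 3, 5, 6, 9, 11, 13, 16,
        16, 17, 18, 22, 24, 27, 28, 30,
        31, 32, 33, 34, 35, 36, 39, 41,
        43, 44, 45, 46, 47, 47, 47, 50]
ep_world_size = 2  # assumption inferred from the dumps, not stated by the kernel

prev = 0
for expert in range(len(flat) // ep_world_size):
    for rank in range(ep_world_size):
        cur = flat[expert * ep_world_size + rank]
        print(f"expert {expert} got {cur - prev} token(s) from rank {rank}")
        prev = cur

# First lines printed:
# expert 0 got 2 token(s) from rank 0
# expert 0 got 1 token(s) from rank 1
# expert 1 got 2 token(s) from rank 0
# expert 1 got 1 token(s) from rank 1
```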
Main flow
```cpp
template <...>
__aicore__ inline void MoeDistributeCombine<...>::Process()
{
    if constexpr (IsNeedReduceScatter) {
        ReduceScatterTrans();
    }
    BuffInit();
    SetWaitTpStatusAndDisPatch();
    AlltoAllBuffInit();
    SetStatus();
    WaitDispatch();
    LocalWindowCopy();
}
```
SetWaitTpStatusAndDisPatch
The key part is ExpertAlltoAllDispatchCopyAdd. epSendCountLocal_ holds the sendCountsGlobal values printed above.
```cpp
template <...>
__aicore__ inline void MoeDistributeCombine<...>::ExpertAlltoAllDispatchCopyAdd()
{
    // Work is split across cores by card count: loop over each expert on this
    // card first, then over the card ids this core handles, because one
    // expert's tokens sit contiguously in the window buffer.
    uint32_t preCount = 0;
    uint32_t curTokenNum = 0;
    uint32_t startTokenIdx = 0;
    for (uint32_t expertIdx = 0U; expertIdx < curRankExpertNum; expertIdx++) {
        for (uint32_t ep = startRankId_; ep < endRankId_; ep++) {
            if ((ep > 0) || (expertIdx > 0)) {
                // prefix sum of the previous (expert, rank) slot
                preCount = epSendCountLocal_.GetValue(expertIdx * epWorldSize_ + ep - 1);
            }
            curTokenNum = epSendCountLocal_.GetValue(expertIdx * epWorldSize_ + ep) - preCount;
            if (curTokenNum == 0) {
                continue;
            }
            startTokenIdx = preCount * axisH_;  // element offset of the first token to send
            ExpertAlltoAllDispatchInnerCopyAdd(curTokenNum, startTokenIdx, ep, expertIdx);
        }
    }
}
```
```cpp
template <...>
__aicore__ inline void MoeDistributeCombine<...>::ExpertAlltoAllDispatchInnerCopyAdd(
    uint32_t tokenNumLoop, uint32_t srcStartTokenIdx, uint32_t ep, uint32_t expertIdx)
{
    // Get the base address of the window on the target card
    GM_ADDR rankGM = GetWinAddrByRankId(ep, EP_DOMAIN, expertIdx) + epDataOffsetOnWin_;
    // ... (the copy itself is elided here)
}
```
Depending on the value of ep, the transfer is either a local copy or an RDMA write.
Reading epSendCountLocal_ against the dump: expert 0 has 3 output tokens; the first 2 must be copied back to rank0 and the last 1 to rank1, as the sketch below replays.
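A minimal Python rendering of the (preCount, curTokenNum, startTokenIdx) bookkeeping above, run against the rank-0 dump; this is my own illustration, and the hidden size is an assumed placeholder:

```python
flat = [2, 3, 5, 6]   # first four entries of epSendCountLocal_ (experts 0-1, ranks 0-1)
ep_world_size = 2
axis_h = 7168         # assumption: hidden size; the real value comes from the tiling

pre_count = 0
for expert_idx in range(len(flat) // ep_world_size):
    for ep in range(ep_world_size):
        if ep > 0 or expert_idx > 0:
            pre_count = flat[expert_idx * ep_world_size + ep - 1]
        cur_token_num = flat[expert_idx * ep_world_size + ep] - pre_count
        if cur_token_num == 0:
            continue
        start_token_idx = pre_count * axis_h  # element offset into the expert output
        print(f"expert {expert_idx}: send {cur_token_num} token(s) "
              f"from element {start_token_idx} to rank {ep}")

# expert 0: send 2 token(s) from element 0 to rank 0
# expert 0: send 1 token(s) from element 14336 to rank 1
# expert 1: send 2 token(s) from element 21504 to rank 0
# expert 1: send 1 token(s) from element 35840 to rank 1
```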
SetStatus
Not analyzed here.
WaitDispatch
Not analyzed here.
LocalWindowCopy
After the data synchronization completes, the data is copied out of the window and reduced.
```cpp
template <...>
__aicore__ inline void MoeDistributeCombine<...>::LocalWindowCopy()
{
    for (uint32_t tokenIndex = beginIndex; tokenIndex < endIndex; tokenIndex++) {
        uint32_t index = tokenIndex * axisK_;
        int32_t moeExpert = 0;
        float scaleVal = 0.0;
        GM_ADDR wAddr;
        SyncFunc<AscendC::HardEvent::MTE3_V>(); // shares a tensor with the DataCopy that moves results out
        Duplicate(sumFloatBufLocal, (float)0, axisH_);
        LocalTensor<ExpandXType> tmpUb;
        for (uint32_t i = 0; i < axisK_; i++) {
            moeExpert = expertIdsLocal.GetValue(index);
            scaleVal = expandScalesLocal.GetValue(index);
            wAddr = (__gm__ uint8_t *)(epWindowGM_) + expertPerSizeOnWin_ * moeExpertPerRankNum_ *
                    sharedExpertRankNum_ + expertPerSizeOnWin_ * moeExpert +
                    indexCountsLocal.GetValue(index) * axisHExpandXTypeSize_ + tokenOffset * sizeof(ExpandXType);
            rowTmpGlobal_.SetGlobalBuffer((__gm__ ExpandXType *)wAddr);
            tmpUb = moeSumQueue_.AllocTensor<ExpandXType>();
            if constexpr (IsQuant) {
                DataCopy(tmpUb, rowTmpGlobal_, quantCopyLen);
            } else {
                DataCopy(tmpUb, rowTmpGlobal_, processLen);
                SyncFunc<AscendC::HardEvent::MTE2_V>();
            }
            moeSumQueue_.EnQue(tmpUb);
            tmpUb = moeSumQueue_.DeQue<ExpandXType>();
            if constexpr (IsQuant) {
                DequantProcess(tmpUb);
            }
            Cast(rowTmpFloatLocal, tmpUb, AscendC::RoundMode::CAST_NONE, processLen);
            PipeBarrier<PIPE_V>();
            AscendC::Muls(mulBufLocal, rowTmpFloatLocal, scaleVal, processLen);
            PipeBarrier<PIPE_V>();
            AscendC::Add(sumFloatBufLocal, sumFloatBufLocal, mulBufLocal, processLen);
            index++;
            moeSumQueue_.FreeTensor(tmpUb);
        }
        // ... (casting sumFloatBufLocal back and copying it to the output is elided)
    }
}
```
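Stripped of queues and synchronization, the inner loop is just a top-k weighted sum per token. A minimal PyTorch sketch of my own (k and h are assumed placeholders):

```python
import torch

def combine_one_token(expert_rows: torch.Tensor, scales: torch.Tensor) -> torch.Tensor:
    """expert_rows: [k, h] rows fetched from the window for one token's k experts;
    scales: [k] routing weights (expandScalesLocal in the kernel)."""
    acc = torch.zeros(expert_rows.shape[1], dtype=torch.float32)  # Duplicate(..., 0, axisH_)
    for row, scale in zip(expert_rows, scales):
        acc += row.to(torch.float32) * scale   # Cast + Muls + Add
    return acc

k, h = 8, 7168  # assumptions, not values read from the kernel
rows = torch.randn(k, h, dtype=torch.bfloat16)
scales = torch.rand(k)
out = combine_one_token(rows, scales)
```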
The address offset computation:

```cpp
expertPerSizeOnWin_ * moeExpertPerRankNum_ * sharedExpertRankNum_   // region ahead of the MoE experts
    + expertPerSizeOnWin_ * moeExpert                               // this expert's slot (moeExpert comes from expertIds)
    + indexCountsLocal.GetValue(index) * axisHExpandXTypeSize_      // this token's row (running count per expert)
    + tokenOffset * sizeof(ExpandXType);
```

moeExpert, read from expertIdsLocal, is simply the expertIds produced for the tokens inferred on this rank; a worked sketch of the full offset arithmetic follows at the end of this section.
The expertIds on rank=0:
--------expertIds------------
11 17 29 12 24 23 1 0 16 30 18 8
2 14 11 3 30 21 12 0 10 9 6 31
22 27 30 21 1 24 17 11 3 19 2 29
16 13 7 27 6 29 5 22 24 19 23 2
indexCountsLocal is the data copied in from expandIdxGM.
On rank=0, expandIdxGM is reproduced below; each entry is the running count of earlier occurrences of the same expert id. See the CalTokenSendExpertCnt function in the MoeDistributeDispatch operator.
--------expandIdxGMTensor------------
0 0 0 0 0 0 0 0 0 0 0 0
0 0 1 0 1 0 1 1 0 0 0 0
0 0 2 1 1 1 1 2 1 0 1 1
1 0 0 1 1 2 0 1 2 1 1 2
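Putting the pieces together, here is a small Python sketch of my own that replays the offset arithmetic for the first few tokens of rank 0. The window-geometry constants are made-up placeholders, not values read from the real tiling:

```python
expert_ids = [11, 17, 29, 12, 24, 23, 1, 0]  # first 8 entries of expertIds on rank 0
expand_idx = [0, 0, 0, 0, 0, 0, 0, 0]        # matching entries of expandIdxGMTensor

# hypothetical window geometry (placeholders)
expert_per_size_on_win  = 512 * 1024  # bytes reserved per expert slot in the window
moe_expert_per_rank_num = 16
shared_expert_rank_num  = 0           # assume no shared experts in this example
axis_h_expand_x_size    = 7168 * 2    # one bf16 token row, in bytes
token_offset            = 0           # assume this core starts at the row's beginning

for eid, cnt in zip(expert_ids, expand_idx):
    offset = (expert_per_size_on_win * moe_expert_per_rank_num * shared_expert_rank_num
              + expert_per_size_on_win * eid          # this expert's slot
              + cnt * axis_h_expand_x_size            # this token's row inside the slot
              + token_offset)                         # per-core offset within the row
    print(f"token for expert {eid}, occurrence {cnt}: window offset {offset} bytes")
```

A token whose expand_idx is 1 lands one row further into its expert's slot, which is exactly how combine locates each token's row without any extra metadata.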