色哟哟视频在线观看-色哟哟视频在线-色哟哟欧美15最新在线-色哟哟免费在线观看-国产l精品国产亚洲区在线观看-国产l精品国产亚洲区久久

0
  • 聊天消息
  • 系統消息
  • 評論與回復
登錄后你可以
  • 下載海量資料
  • 學習在線課程
  • 觀看技術視頻
  • 寫文章/發帖/加入社區
會員中心
創作中心

完善資料讓更多小伙伴認識你,還能領取20積分哦,立即完善>

3天內不再提示

圖解大模型訓練之:Megatron源碼解讀2,模型并行

jf_pmFSk4VX ? 來源:GiantPandaCV ? 2023-06-07 15:08 ? 次閱讀

源碼解讀系列將和大家一起來讀Megatron的pretrain部分代碼。

在源碼解讀第一篇中,我們講解了如何做「分布式環境初始化」,即按照DP/TP/PP對進程進行分組,并為每個進程指定GPU。在這一章中,我們將一起讀「模型并行部分」:如何切分模型,并搬入分布式環境定義好的DP/TP/PP組中。

78c337c8-04d9-11ee-90ce-dac502259ad0.png

「本文將提供:」

  • 詳細的圖解。畫圖說明代碼的設計架構,講清代碼想做一件什么事。
  • 詳細的代碼注釋。在圖解的基礎上,提取核心代碼部分,并附上注釋。

「如何利用本文提高源碼閱讀效率:」

  • 先看一~三部分。了解模型并行的設計思想、整體框架及入口函數。
  • 打開Megatron源碼,找到入口函數,開始閱讀。
  • 閱讀中的每一塊細節,可參考四~八部分。

「閱讀本文前置知識:」

  • 圖解大模型訓練之:張量模型并行,Megatron-LM
  • 圖解大模型訓練之:Megatron源碼解讀1,分布式環境初始化

「本文目錄:」

一、模型概述
二、模型切割在做一件什么事

  • 2.1 模型切割設計思想
  • 2.2 隨機種子

三、模型并行框架

  • 3.1 模型并行入口函數
  • 3.2 定義并搬運模型
  • 3.3 分布式模型:CodeGeeX

四、MegatronModule
五、Emebdding
六、VocabParallelEmebdding
七、ParallelSelfAttention:分布式block的一般套路

  • 7.1 列切割:ColumnParallelLinear
  • 7.2 行切割:RowParallelLinear
  • 7.3 ParallelSelfAttention

八、CrossEntropy

  • 8.1 計算logit
  • 8.2 計算交叉熵

九、筋疲力盡的總結

十、參考(本文相關源碼與論文)

一、模型概述

前文說過,用Megatron做分布式訓練的開源大模型有很多,我們選用的是THUDM開源的CodeGeeX(代碼生成式大模型,類比于openAI Codex)。選用它的原因是“完全開源”與“清晰的模型架構和預訓練配置圖”,能幫助我們高效閱讀源碼。我們再來回顧下這兩張圖。

「模型架構」

78d7ea56-04d9-11ee-90ce-dac502259ad0.png

「預訓練配置」

78e27bb0-04d9-11ee-90ce-dac502259ad0.png

由圖可知,CodeGeeX在預訓練中采用的是8頭TP(同一個node內的8張卡做TP,8張卡組成一個完整的模型),192頭DP(192個node間做DP),一共1536塊GPU進行訓練。

「【閱讀提醒】:如果你對GPT模型比較熟悉,則不需要花時間細看CodeGeeX架構圖也能無障礙閱讀本文。架構圖只是在涉及模型細節時,可以對照著看?!?/strong>

二、模型切割在做一件什么事

2.1 模型切割設計思想

回顧一下,在初始化分布式環境中,我們根據DP/TP/PP組設置并劃分了進程,確定了模型的切割方法,如下圖:78ed14b2-04d9-11ee-90ce-dac502259ad0.png(注意:這并不是CodeGeeX的劃分框架,而是一個更廣義的例子,細節可閱讀上篇講解)

接下來,我們就可以根據這個框架來切割模型了。pytorch默認將模型(nn.Module)定義在CPU上,因此,我們在CPU上定義并初始化模型,然后將其搬運到當前進程所對應的GPU上,整個過程如下圖:78fafa14-04d9-11ee-90ce-dac502259ad0.png首先,我們是面向進程編程的,也就是整份腳本處理的是發生在1個進程上的事情。這樣做的好處是,我們只需要維護1份腳本,然后將其發去不同機器的各張卡上執行,就能實現全局的并行。

但是,1個進程處理的是模型的不同部分,比如GPT模型,它的pre層涉及到Embedding計算,post層涉及到softmax和loss的計算,這樣每個進程上處理的模型是不一樣的,這時怎么辦呢?別忘了,我們能夠取到進程id(全局或DP/TP/PP組內的),這樣我們就能通過進程id,寫if...else...來解決模型差異化問題了。

明確了這個思想,現在我們可以開始寫代碼了,我們有兩種方式對模型進行切割:

  • 「方案一:」先定義出完整的模型,并對模型參數做初始化,然后根據進程id取出相應子模型,搬運到GPU上
  • 「方案二:」直接根據進程id,設計好當前子模型,做參數初始化,搬運到GPU上

這兩者的核心差別,在于“隨機種子”的設定。

2.2 隨機種子

在分布式訓練中,「隨機種子是非常重要的,它關系到模型是否能夠復現」。例如我們采取activation checkpoint的技術來節省顯存時,在backward過程中我們需要重算forward得到activation,這時候就需要我們完整復現之前forward的過程,各類參數的初始化結果也要和之前完全一致。

我們來看幾個例子:

例1: Word Embedding

790d0876-04d9-11ee-90ce-dac502259ad0.pngWE1和WE2間需要采用不同的隨機種子。因為若采用相同的隨機種子,則WE1和WE2的結果完全一樣,這不等價于先隨機初始化WE,再將它進行切割。

例2: dropout

79224f60-04d9-11ee-90ce-dac502259ad0.png左側方框中的2個dropout,在初始化時需要用不同的隨機種子。因為這樣才等價于對完整的dropout做初始化,然后再切割。右側方框中的dropout,需要用相同的隨機種子(雖然右邊只畫了1個dropout,但其實是2個dropout,每塊GPU上各一個,因為此時兩塊GPU上的輸出已經AllReduce,是完全一致的。做完AllReduce后,兩塊GPU繼續獨立計算,因此實際上有兩個dropout)。

關于隨機種子設定的一般結論

從例子中,我們可以得出一個結論:「一般在TP/PP組內,設定不同的隨機種子。而在DP組內,設定相同的隨機種子?!?/strong> 這只是一個一般結論,我們可以根據實際情況去調整。

最后,回到模型切割上,方案1(先做整體初始化再切割)在代碼里被稱為“CPU上的初始化”(_initialize_affine_weight_cpu),方案2(直接對局部初始化)被稱為“在GPU上的初始化”(_initialize_affine_weight_gpu)。我們會在切割部分的代碼里經??匆娝鼈?。

三、模型并行框架

現在,我們可以來看具體的代碼了

3.1 模型并行入口函數

模型并行部分的代碼入口依然在megatron/training.pypretrain函數下,代碼如下:

defpretrain(
train_valid_test_dataset_provider,
model_provider,
forward_step_func,
valid_forward_step_func=None,
extra_args_provider=None,
args_defaults={},
):
#1.初始化分布式環境(源碼解讀1內容)
initialize_megatron(
extra_args_provider=extra_args_provider,args_defaults=args_defaults
)
...
# 2、模型并行:定義模型架構,并切割模型(本文重點)
model,optimizer,lr_scheduler=setup_model_and_optimizer(model_provider)
...

#3、構造train/val/test數據集(下一篇將講述)
...(
train_data_iterator,
valid_data_iterator,
test_data_iterator,
)=build_train_valid_test_data_iterators(train_valid_test_dataset_provider)

...
#4、訓練(下下一篇將講述)
iteration=train(
forward_step_func,
valid_forward_step_func,
model,
optimizer,
lr_scheduler,
train_data_iterator,
valid_data_iterator,
)

...

由代碼可知,setup_model_and_optimizer是整個模型并行的入口函數,如下圖,它主要由”「定義模型架構并切割模型」“,“「設置optimizer」”和“「設置學習率」”三部分組成。我們關注的重點在第一部分上(get_model)。

792fc5be-04d9-11ee-90ce-dac502259ad0.png

3.2 定義并搬運模型

get_model的內容可簡化成下圖:

7944a132-04d9-11ee-90ce-dac502259ad0.png

get_model函數主要做了兩件事:

  • 在CPU上定義模型。pytorch默認在CPU上定義模型(nn.Module)。model_provider是一個函數,調用它即可返回CPU版的模型,也就是一個CodeGeeX類,這個將是下文要介紹的重點。

  • 把模型從CPU搬運至GPU上。這里有兩種方法可供選擇:

    • 「顯式搬運?!?/strong>即手動將模型搬運到當前進程所對應的GPU上
    • 「權重精度設定。」由ZeRO的思想可知,在模型訓練中,把權重精度從fp32降至fp16,是一種節省顯存的好辦法。如果確定使用這種優化辦法,將模型搬運到GPU上后,我們需要修改精度。
    • 「初始化DP組」。這里指的是定義DP組間forward、backward和梯度計算與通訊等方法。在Megatron中,TP和PP組的這些方法是人為定義的(在定義CPU模型時已設置好,我們將在下文講CodeGeeX細節時看到),而DP組則是可以用現成的(torch的DistributedDataParallel)。在具體使用時,我們可以:(1)直接調用DistributedDataParallel?;颍?)在DistributedDataParallel這個類的基礎上做一些改進,例如增加對碎片化內存的管理,對計算梯度時的精度控制等。
    • 「方案一:借助deepspeed進行管理」。在源碼解讀1中我們提過,秉持著萬物皆可wrap的原則,按照deepspeed官網教程,只需要在Megatron的某些文件中插入相應代碼,就可以讓deepspeed來管理模型的分布式、DP組間的顯存優化等,這里同理。
    • 「方案二:手動搬運管理?!?/strong>這里需要我們以下事情:

get_model函數的核心代碼如下(一切盡在注釋中):

defget_model(model_provider_func):
"""Buildthemodel."""
args=get_args()

#1、定義并構建CPU版模型
if(#1.1、當分布式進行框架采用virtualpipeline(是NVDIA后續提出的對Megatron的優化方法,可先忽略不看)
mpu.get_pipeline_model_parallel_world_size()>1
andargs.virtual_pipeline_model_parallel_sizeisnotNone
):
model=[]
foriinrange(args.virtual_pipeline_model_parallel_size):
mpu.set_virtual_pipeline_model_parallel_rank(i)
#Setpre_processandpost_processonlyaftervirtualrankisset.
pre_process=mpu.is_pipeline_first_stage()
post_process=mpu.is_pipeline_last_stage()
this_model=model_provider_func(
pre_process=pre_process,post_process=post_process
)
model.append(this_model)
else:#1.2其余情況
#判斷當前進程是否是PP組的第一個進程(例如第一部分圖例中PP組的g0)
pre_process=mpu.is_pipeline_first_stage()
#判斷當前進程是否是PP組的最后一個進程(例如第一部分圖例中PP組的g12)
post_process=mpu.is_pipeline_last_stage()
#構建CPU版CodeGeeX模型
model=model_provider_func(pre_process=pre_process,post_process=post_process)

...

#2、將模型從CPU搬運到GPU上
#2.1如果采用Megatron-DeepSpeed的方式,則直接返回模型,后面的搬運,數據并行等工作將由deepspeed來完成
# ref:https://www.deepspeed.ai/tutorials/megatron/
ifargs.deepspeed:
returnmodel

#將當前進程所維護的模型,從CPU搬運到GPU上(GPU即為在初始化時為當前進程分配的那塊GPU)
print(f">movingmodeltoGPU...",flush=True)
formodel_moduleinmodel:
model_module.cuda(torch.cuda.current_device())
print(f">movingtoGPUdone",flush=True)

#fp16轉換(pytorch默認模型參數精度為fp32,依需決定計算過程中是否要轉成fp16,節省顯存)
ifargs.fp16orargs.bf16:
print(f">convertingmodeltofp16...",flush=True)
model=[Float16Module(model_module,args)formodel_moduleinmodel]
print(f">convertingtofp16done",flush=True)

#采用pytorch定義的DistributedDataParallel管理數據并行
ifargs.DDP_impl=="torch":
i=torch.cuda.current_device()
model=[
torchDDP(
model_module,
device_ids=[i],
output_device=i,
process_group=mpu.get_data_parallel_group(),#數據并行的組
)
formodel_moduleinmodel
]
returnmodel

#采用自定義的DistributedDataParallel管理數據并行
#即在pytorch的DistributedDataParallel的基礎上,自己再定義內存管理、梯度精度等計算方式,更有效利用顯存
ifargs.DDP_impl=="local":#自定義的數據并行類在megatron/model/distributed.py下
print(f">creatingDDPmodel...",flush=True)
model=[
LocalDDP(
model_module,
args.accumulate_allreduce_grads_in_fp32,
args.use_contiguous_buffers_in_ddp,
)
formodel_moduleinmodel
]
print(f">creatingDDPmodeldone",flush=True)
returnmodel

raiseNotImplementedError(
"UnknownDDPimplementationspecified:{}.""Exiting.".format(args.DDP_impl)
)

特別說明的是,前文提過模型的首尾兩層和中間層的架構可能不一樣,因此我們通過pre_process 和post_process來做區分。(當然你也能選擇用進程序id,只是首尾兩層經常被Q到,所以這里單獨明確了下)。對CodeGeeX來說,由它預訓練配置可知,它的PP并行度為1,也就是1塊GPU上涵蓋了模型的第一層至最后一層,所以pre_process和post_process實際上沒有用到。感興趣的朋友可以閱讀NVIDIA Megatron源碼下關于bert、gpt2的預訓練代碼,具體了解pre_process和post_process在定義模型時起的作用。

3.3 分布式模型:CodeGeeX

現在,我們來看最核心的分布式模型:CodeGeeX類。

前文說過,1個腳本處理的是1個進程上發生的事情,而1個進程對應的是模型的一部分。單進程的架構如下:

795e1270-04d9-11ee-90ce-dac502259ad0.png

圖中每個方框都表示源碼里定義的一個nn.Module 類(除了最上的方框外)具體定義為:

  • CodeGeeX: 定義一塊GPU上的模型。它由TransformerLanguageModel 和_VocabParallelCrossEntropy這兩個核心類組成。
  • TransformerLanguageModel:定義每塊GPU上輸入層embedding和中間block層的結構
  • Embedding: 定義每塊GPU上輸入層embedding結構及相關計算,輸出結果已AllReduce(TP組間)
  • ParallelTransformer:定義每塊GPU上所有中間blocks的結構及相關計算,輸出結果已AllReduce(TP組間)
  • ParallelTransformerLayer: 定義每塊GPU上單個block的結構及相關計算,輸出結果已AllReduce(TP組間)
  • ParallelSelfAttention: 定義每塊GPU上單個block中,attention的結構及相關計算,輸出結果已AllReduce(TP組間)
  • ParallelMLP: 定義每塊GPU上單個block中,mlp層的結構及相關計算,輸出結果已AllReduce(TP組間)。
  • _VocabParallelCrossEntropy: torch.autograd.Function,定義每塊GPU上,輸出層embedding、softmax和loss等結構及相關計算。

「為什么需要對輸出做AllReduce?」回顧Megtron理論部分的講解,在縱向切割模型時,Megatron是在輸入X完整的情況下,設計模型切割的方式的。因此,對于模型的每一層輸出,我們都要在TP組間做AllReduce,來保證下一層拿到的輸入也是完整的。類名字中的"Parallel",也是指在TP組中做并行,如下圖所示:

7973d074-04d9-11ee-90ce-dac502259ad0.png

到這一步,我們終于把模型切割部分的整體流程講完了。「雖然我們是以CodeGeeX為例,但這個流程圖可以看作是通用的?!?/strong>不同模型間只有模型具體結構、DP/TP/PP組設置這些方面的差別,整個并行框架是通用的。下面,我們來探究圖中所繪的各個類的細節。

四、MegatronModule

上面所繪制的幾類,并不是直接繼承自nn.Module ,而是皆繼承于自定義的class MegatronModule(torch.nn.Module)。我們說過,gpt類模型,輸入和輸出層共用一個word embedding。因此,這個類的主要作用,就是令PP組的第一個進程和最后一個進程滿足這個條件(不過我不懂為什么要把這個限制放在一個大母類中去做,設計上感覺有點奇怪)。MegatronModule類的整體架構如下:

797c7fda-04d9-11ee-90ce-dac502259ad0.png

特別說明,「initialize_word_embedding 并不是某一具體的初始化WE方法,它只是起到如圖所說的強制作用?!?/strong>

MegatronModule的代碼如下(一切盡在注釋中):

classMegatronModule(torch.nn.Module):
"""MegatronspecificextensionsoftorchModulewithsupport
forpipelining."""

def__init__(self,share_word_embeddings=True):
super(MegatronModule,self).__init__()
#input和output是否要共享一套WE
self.share_word_embeddings=share_word_embeddings

defstate_dict_for_save_checkpoint(
self,destination=None,prefix="",keep_vars=False
):
"""Usethisfunctiontooverridethestatedictfor
savingcheckpoints."""
#模型訓練中,及時將參數保存到指定位置(設置checkpoint),
#這樣在訓練出問題時,可以從checkpoint點重新load參數,繼續訓練
returnself.state_dict(destination,prefix,keep_vars)

defword_embeddings_weight(self):
"""獲取word_embedding"""
ifmpu.is_pipeline_first_stage(ignore_virtual=True):
returnself.language_model.embedding.word_embeddings.weight
ifmpu.is_pipeline_last_stage(ignore_virtual=True):
ifnotself.share_word_embeddings:
raiseException(#強制要求共享一套embedding
"word_embeddings_weight()calledforlast"
"stage,butshare_word_embeddingsisfalse"
)
returnself.word_embeddings.weight#參見initialize_word_embeddings中WE的定義
raiseException(#如果當前進程是PP組的中間進程,則其上未維護WE,因此當然獲取不到
"word_embeddings_weight()shouldbe""calledforfirstandlaststageonly"
)

definitialize_word_embeddings(self,init_method_normal):
"""強制PP組最后一個進程初始化WE時,直接使用PP組第一個進程的WE"""
args=get_args()
ifnotself.share_word_embeddings:#強制shareembeddingg
raiseException(
"initialize_word_embeddings()wascalledbut"
"share_word_embeddingsisfalse"
)

#PP組并行度為1時,第一層和最后一層都在一塊GPU上,天然共享WE,無需做強制
ifargs.pipeline_model_parallel_size==1:
return

#---------------------------------------------------
#如果流水線并行的度不為1時,依次做三件事:
#【初始化時】:
#1、在PP組最后一個進程上初始化一個WE,令其取值全為0
#2、在PP組第一個進程與最后一個進程間做一次AllReduce,保證兩者的WE完全一致
#【訓練時】:
#3、每次想在PP組第一個/最后一個進程上使用WE時,要做一次通信,保證兩者用的WE完全一致

ifmpu.is_pipeline_last_stage():#若當前進程是PP組最后一個進程
assertnotmpu.is_pipeline_first_stage()
self._word_embeddings_for_head_key="word_embeddings_for_head"
#初始化一個WE(已按vocab_size維度切割,可參見Megatron原理篇對WE的講解)
#VocabParallelEmbedding將在下文詳細講解
self.word_embeddings=mpu.VocabParallelEmbedding(
args.padded_vocab_size,#vocab_size
args.hidden_size,#embed_dim
init_method=init_method_normal(args.init_method_std),#初始化方法(在model/utils.py下)
)
#用0填充WE(等待下面做AllReduce后取得第一個進程上的WE)
self.word_embeddings.weight.data.fill_(0)
self.word_embeddings.weight.shared=True

iftorch.distributed.is_initialized():
ifmpu.is_pipeline_first_stage()ormpu.is_pipeline_last_stage():#若當前進程是PP組第一個或最后一個進程
#在兩進程間做AllReduce,保證它們使用的WE完全一致
# mpu.get_embedding_group:在源碼解讀1中講過,是除DP/TP/PP之外設置的又一進程組,
#主要就是用來做關于WE的通訊
torch.distributed.all_reduce(
self.word_embeddings_weight().data,group=mpu.get_embedding_group()
)
else:
print(
"WARNING!Distributedprocessesaren'tinitialized,so"
"wordembeddingsinthelastlayerarenotinitialized."
"Ifyouarejustmanipulatingamodelthisisfine,but"
"thisneedstobehandledmanually.Ifyouaretraining"
"somethingisdefinitelywrong."
)

五、Embedding

Emebdding類定義了word/position/segment embedding,并定義輸入X過embedding層的計算方法。關鍵屬性和方法如下圖:

798951ec-04d9-11ee-90ce-dac502259ad0.png
  • self.word_embeddings:來自自定義的VocabParallelEmbedding (下面會詳述) 。「含“Parallel”則意味著參數在TP組間做了切割」。因此self.word_embeddings 是切割好的WE。每個進程上維護根據自己進程序號所取下的那塊WE(例如下圖中的WE1,WE2,圖片來自Megatron原理篇):
790d0876-04d9-11ee-90ce-dac502259ad0.png
  • self.position_embeddingsself.tokentype_embeddings 這兩者都和輸入X相關,而輸入X是不做切割的,因此這兩者也無需切割。

  • state_dict_for_save_checkpointload_state_dict。在源碼注解里,這兩個函數分別給出了"easy load" 和"customize load"的注釋,這個注釋不是很貼切。實際上,前者用于在模型訓練過程中及時讀取當前參數,及時保存(做checkpoint);后者則一般用于模型的重載,例如訓到一半掛掉了,我們就重新初始化一個新模型,重載上個checkpoint保存下的權重。

Embedding層代碼如下(一切盡在注釋中):

classEmbedding(MegatronModule):
"""Languagemodelembeddings.

Arguments:
hidden_size:hiddensize
vocab_size:vocabularysize
max_sequence_length:maximumsizeofsequence.This
isusedforpositionalembedding
embedding_dropout_prob:dropoutprobabilityforembeddings
init_method:weightinitializationmethod
num_tokentypes:sizeofthetoken-typeembeddings.0value
willignorethisembedding
"""

def__init__(
self,
hidden_size,#每個token的向量維度
vocab_size,#詞表大小
max_sequence_length,#最長序列長度
embedding_dropout_prob,#dropoutprobabilityforembeddings
init_method,#初始化權重的方法
num_tokentypes=0,#類似于Bert中的segmenttype
):
super(Embedding,self).__init__()

args=get_args()

self.hidden_size=hidden_size
self.init_method=init_method
self.num_tokentypes=num_tokentypes
self.max_sequence_length=max_sequence_length

#WEsize:(vocab_size//TP_N,hidden_size)
#TP_N表示TP組模型并行度
self.word_embeddings=mpu.VocabParallelEmbedding(
vocab_size,self.hidden_size,init_method=self.init_method)
self._word_embeddings_key='word_embeddings'

self.vocab_size=vocab_size

#PEsize:(max_seq_len,hidden_size)
self.position_embeddings=torch.nn.Embedding(
max_sequence_length,self.hidden_size)
self.position_embeddings=self.position_embeddings.half()
self._position_embeddings_key='position_embeddings'
#Initializethepositionembeddings.
self.init_method(self.position_embeddings.weight)

#TE_size:(num_tokentypes,hidden_size)
#TE類似于Bert中的segmentembedding
self._tokentype_embeddings_key='tokentype_embeddings'
ifself.num_tokentypes>0:
self.tokentype_embeddings=torch.nn.Embedding(self.num_tokentypes,
self.hidden_size)
#Initializethetoken-typeembeddings.
self.init_method(self.tokentype_embeddings.weight)
else:
self.tokentype_embeddings=None

#Embeddingsdropout
self.embedding_dropout=torch.nn.Dropout(embedding_dropout_prob)

defadd_tokentype_embeddings(self,num_tokentypes):
"""如果在pretrain階段未定義TE,而在fine-tune階段TE,則可通過此函數添加
"""
ifself.tokentype_embeddingsisnotNone:
raiseException('tokentypeembeddingsisalreadyinitialized')
iftorch.distributed.get_rank()==0:
print('addingembeddingfor{}tokentypes'.format(num_tokentypes),
flush=True)
self.num_tokentypes=num_tokentypes
self.tokentype_embeddings=torch.nn.Embedding(num_tokentypes,
self.hidden_size)
#Initializethetoken-typeembeddings.
self.init_method(self.tokentype_embeddings.weight)

defforward(self,input_ids,position_ids,tokentype_ids=None):
"""定義輸入X過embedding層的計算方法
"""

#words_embeddingssize=(b,seq_len,hidden_size)
#再次注意:self.word_embeddings做forward時,最終的輸出結果時AllReduce的(見上圖)
words_embeddings=self.word_embeddings(input_ids)
#position_embeddingssize=(b,seq_len,hidden_size)
position_embeddings=self.position_embeddings(position_ids)
#embedding=WE+PE
#embeddingsize=(b,seq_len,hidden_size)
embeddings=words_embeddings+position_embeddings
#依需要決定是否增加TE
iftokentype_idsisnotNone:
assertself.tokentype_embeddingsisnotNone
embeddings=embeddings+self.tokentype_embeddings(tokentype_ids)
else:
assertself.tokentype_embeddingsisNone

#Dropout.
embeddings=self.embedding_dropout(embeddings)

returnembeddings

defstate_dict_for_save_checkpoint(
self,destination=None,prefix='',keep_vars=False,
):
"""Foreasyload.
在模型訓練過程中及時讀取當前參數,方便及時保存(做checkpoint)
篇幅限制,這里不展示細節
"""
...

defload_state_dict(self,state_dict,strict=True):
"""Customizedload.
用于模型的重載。例如訓到一半掛掉了,我們就重新初始化一個新模型,
重載上個checkpoint保存下的權重。
篇幅限制,這里不展示細節
"""
...

六、VocabParallelEmbedding

該類用于定義分布式的word embedding,整體架構如下,同樣只列舉了核心屬性和方法:

79ad1a8c-04d9-11ee-90ce-dac502259ad0.png

具體代碼如下,可以特別關注「初始化和forward」部分,同時建議大家閱讀理論篇中關于這一過程的詳細講解(一切盡在注釋中)

classVocabParallelEmbedding(torch.nn.Module):
"""Embeddingparallelizedinthevocabularydimension.

Thisismainlyadaptedfromtorch.nn.Embeddingandallthedefault
valuesarekept.
Arguments:
num_embeddings:vocabularysize.
embedding_dim:sizeofhiddenstate.
init_method:methodtoinitializeweights.
"""

def__init__(self,num_embeddings,embedding_dim,init_method=init.xavier_normal_):
super(VocabParallelEmbedding,self).__init__()
#Keeptheinputdimensions.
self.num_embeddings=num_embeddings#vocab_size
self.embedding_dim=embedding_dim#hidden_state.
#Setthedetaulsforcompatibility.
self.padding_idx=None
self.max_norm=None
self.norm_type=2.0
self.scale_grad_by_freq=False
self.sparse=False
self._weight=None
#當前進程所在TP組進程總數
self.tensor_model_parallel_size=get_tensor_model_parallel_world_size()
#根據當前進程在TP組中的序號,確定其所需維護的WE部分,沿著vocab維度對WE進行切割
#例如,進程id=0, 維護詞表序號[0,5)范圍內的數據;進程id=1,維護[5,10)
(
self.vocab_start_index,
self.vocab_end_index,
)=VocabUtility.vocab_range_from_global_vocab_size(
self.num_embeddings,
get_tensor_model_parallel_rank(),
self.tensor_model_parallel_size,
)
#計算當前進程維護的詞表大小
self.num_embeddings_per_partition=(
self.vocab_end_index-self.vocab_start_index
)

#對WE做初始化
args=get_args()#讀取預訓練參數配置
ifargs.use_cpu_initialization:#CPU上做初始化
self.weight=Parameter(#在CPU上先生成一個完整的WE
torch.empty(
self.num_embeddings_per_partition,
self.embedding_dim,
dtype=args.params_dtype,
#dtype=torch.float32,
)
)
#對CPU上的WE做切割(隨機種子在初始化分布式中已設定好,不用變)
_initialize_affine_weight_cpu(
self.weight,
self.num_embeddings,
self.embedding_dim,
self.num_embeddings_per_partition,
0,
init_method,#初始化權重的方法,例如xavier之類
)
else:#在GPU上做初始化
self.weight=Parameter(#生成一個切割好的WE
torch.empty(
self.num_embeddings_per_partition,
self.embedding_dim,
device=torch.cuda.current_device(),
dtype=args.params_dtype,
#dtype=torch.float32,
)
)
#在GPU上做初始化,注意TP組內不同進程采用不同的隨機種子
_initialize_affine_weight_gpu(
self.weight,init_method,partition_dim=0,stride=1
)

defforward(self,input_):
"""定義輸入X過WE的計算方法,輸出結果已經過AllReduce"""
ifself.tensor_model_parallel_size>1:#如果使用TP
#如果在當前進程維護的WE上,找不到對應的單詞,那么對應位置就賦0
#例如當前的數據的tokenid是:[2,7,1,5],當前維護的詞表是[0,1,2](start_index=0, end_index = 3),
#則mask之后的數據為[2,0,1,0]
#Buildthemask.
input_mask=(input_=self.vocab_end_index
)
#Masktheinput.
masked_input=input_.clone()-self.vocab_start_index
masked_input[input_mask]=0
else:
masked_input=input_

#輸入X,過當前進程維護的部分WE的結果
output_parallel=F.embedding(
masked_input,#tensorcontainingindicesintotheembeddingmatrix
self.weight,#切割好的wordembedding的權重
self.padding_idx,
self.max_norm,
self.norm_type,
self.scale_grad_by_freq,
self.sparse,
)
#當前詞表不維護的部分,都設為0
ifself.tensor_model_parallel_size>1:
output_parallel[input_mask,:]=0.0#

#將TP組各GPU上的結果做AllReduce
output=reduce_from_tensor_model_parallel_region(output_parallel)
returnoutput

def_initialize_affine_weight_cpu(...):
"""CPU版權重初始化。這個不難,大家可以自己閱讀"""
...

def_initialize_affine_weight_gpu(...):
"""GPU版權重初始化。特別關注設置隨機種子部分"""
...
#借助deepspeed或自定義的get_cuda_rng_tracker方法,對隨機種子進行操作
#get_cuda_rng_tracker細節,大家可自行閱讀源碼
ifds_checkpointing.is_configured():
globalget_cuda_rng_tracker
get_cuda_rng_tracker=ds_checkpointing.get_cuda_rng_tracker

withget_cuda_rng_tracker().fork():
init_method(weight)

七、ParallelSelfAttention:分布式block的一般套路

【閱讀提示】:閱讀本節時可:

  • 對照第一部分CodeGeeX框架圖
  • 對照Megatron理論篇對矩陣切分的講解

首先來看切割Attention的示意圖,由圖可知,「對QKV矩陣,采用“列切割”,對線性矩陣B,采用“行切割”」。這樣設計的好處是,在經過QKV的計算后,各進程在不用通訊的前提下,繼續做線性計算,直到最后一步才AllReduce,起到降低通訊成本的作用:

79224f60-04d9-11ee-90ce-dac502259ad0.png

我們先單獨來看“列切割”與“行切割”的實現代碼。Megatron將它們定義成了兩個nn.Module類。

7.1 列切割:ColumnParallelLinear

列切割示意圖如下:

79c65f06-04d9-11ee-90ce-dac502259ad0.png
  • f和g是兩個共軛算子,可理解為兩個torch.autograd.Function類。在這個類下,我們可以「根據需要重寫forward和backward方法」。

  • f: 「forward中,直接copy輸入;backward中,對梯度做AllReduce」。在代碼里定義為class _CopyToModelParallelRegion(torch.autograd.Function)

  • g: 「forward中,all-gather輸出;backward中,對梯度做split」(每張卡經過all-gather已有完整的Y了,因此以Y為起點計算梯度后,沿著列做split就可得到Y1和Y2的梯度)。在代碼里定義為class _GatherFromModelParallelRegion(torch.autograd.Function)

classColumnParallelLinear(torch.nn.Module):
"""Linearlayerwithcolumnparallelism.

ThelinearlayerisdefinedasY=XA+b.Aisparallelizedalong
itsseconddimensionasA=[A_1,...,A_p].

Arguments:
input_size:firstdimensionofmatrixA.
output_size:seconddimensionofmatrixA.
bias:Iftrue,addbias
gather_output:Iftrue,callall-getheronoutputandmakeYavaiable
toallGPUs,otherwise,everyGPUwillhaveitsoutput
whichisY_i=XA_i
init_method:methodtoinitializeweights.Notethatbiasisalwaysset
tozero.
stride:Forthestridedlinearlayers.
keep_master_weight_for_test:Thiswasaddedfortestingandshouldbe
settoFalse.Itreturnsthemasterweights
usedforinitialization.
skip_bias_add:Thiswasaddedtoenableperformanceoptimationswherebias
canbefusedwithotherelementwiseoperations.weskip
addingbiasbutinsteadreturnit.
"""
#該類定義了切割后的權重W,例如對上圖來說,W1和W2都可分別視為該類的一個實例

def__init__(
self,
input_size,#W的第一個維度
output_size,#W的第二個維度
bias=True,#是否需要引入bias
gather_output=True,#決定是否要將Y1和Y2做all-gather
init_method=init.xavier_normal_,
stride=1,
keep_master_weight_for_test=False,
skip_bias_add=False,
params_dtype=None,
skip_init=False,
device=None,
):
super(ColumnParallelLinear,self).__init__()

#Keepinputparameters
self.input_size=input_size
self.output_size=output_size
self.gather_output=gather_output
#Dividetheweightmatrixalongthelastdimension.
#當前進程所在TP組的總進程數
world_size=get_tensor_model_parallel_world_size()
#每塊GPU上維護的hidden_size的大小,等于原hidden_zize//TP組總進程數
self.output_size_per_partition=divide(output_size,world_size)
self.skip_bias_add=skip_bias_add
self.params_dtype=params_dtype
self.device=device
#Parameters.
#Note:torch.nn.functional.linearperformsXA^T+bandasaresult
#Initializeweight.
args=get_args()#取得命令行所有的參數
ifnotskip_init:
ifargs.use_cpu_initialization:#CPU上初始化
self.weight=Parameter(
torch.empty(
self.output_size_per_partition,
self.input_size,
dtype=self.params_dtypeifself.params_dtypeisnotNoneelseargs.params_dtype,
)
)
self.master_weight=_initialize_affine_weight_cpu(#
self.weight,
self.output_size,
self.input_size,
self.output_size_per_partition,
0,
init_method,
stride=stride,
return_master_weight=keep_master_weight_for_test,
)
else:#GPU上初始化
self.weight=Parameter(
torch.empty(
self.output_size_per_partition,
self.input_size,
device=self.deviceifself.deviceisnotNoneelsetorch.cuda.current_device(),
dtype=self.params_dtypeifself.params_dtypeisnotNoneelseargs.params_dtype,
)
)
_initialize_affine_weight_gpu(
self.weight,init_method,partition_dim=0,stride=stride
)
else:
self.register_parameter("weight",None)

#對bias做處理,道理同weight
ifbiasandnotskip_init:
ifargs.use_cpu_initialization:#CPU上初始化
self.bias=Parameter(
torch.empty(self.output_size_per_partition,
dtype=self.params_dtypeifself.params_dtypeisnotNoneelseargs.params_dtype)
)
else:
self.bias=Parameter(#GPU上初始化
torch.empty(
self.output_size_per_partition,
device=self.deviceifself.deviceisnotNoneelsetorch.cuda.current_device(),
dtype=self.params_dtypeifself.params_dtypeisnotNoneelseargs.params_dtype,
)
)

set_tensor_model_parallel_attributes(self.bias,True,0,stride)
#Alwaysinitializebiastozero.
withtorch.no_grad():
self.bias.zero_()
else:
self.register_parameter("bias",None)

defforward(self,input_):
#定義列切割中的f算子
#調用copy_to_tensor_model_parallel_region則新建一個_CopyToModelParallelRegion實例(見下)
input_parallel=copy_to_tensor_model_parallel_region(input_)

bias=self.biasifnotself.skip_bias_addelseNone#定義bias
output_parallel=F.linear(input_parallel,self.weight,bias)#X*切割好的權重
#決定是否要對每個進程上的輸出結果做All-Reduce
ifself.gather_output:
#定義列切割中的g算子
#調用gather_from_tensor_model_parallel_region則新建一個_GatherFromModelParallelRegion實例(見下)
output=gather_from_tensor_model_parallel_region(output_parallel)#把各GPU上的輸出按照列gather起來后,作為最終輸出
else:
output=output_parallel#否則最終輸出還是自己算的那塊GPU
output_bias=self.biasifself.skip_bias_addelseNone
returnoutput,output_bias

#列切割中的f與g
class_CopyToModelParallelRegion(torch.autograd.Function):
"""Passtheinputtothemodelparallelregion."""
#列切割下的f算子
# forward:copy輸入
# backward:對梯度做AllReduce

@staticmethod
defsymbolic(graph,input_):
returninput_

@staticmethod
defforward(ctx,input_):
returninput_

@staticmethod
defbackward(ctx,grad_output):
return_reduce(grad_output)

class_GatherFromModelParallelRegion(torch.autograd.Function):
"""Gathertheinputfrommodelparallelregionandconcatinate."""
#列切割中的g算子
# forward:All-Gather輸出
# backward:對梯度,沿著列方向做split

@staticmethod
defsymbolic(graph,input_):
return_gather(input_)

@staticmethod
defforward(ctx,input_):
return_gather(input_)

@staticmethod
defbackward(ctx,grad_output):
return_split(grad_output)

7.2 行切割:RowParallelLinear

79e77786-04d9-11ee-90ce-dac502259ad0.png
  • f: forward中,按列split輸入;backward中,all-gather梯度
  • g: forward中,AllReduce輸出;backward中,直接輸出梯度,無需做任何通訊(因為經過g的foward,每塊GPU上已擁有了Yi和Y,則根據圖中g的backward公式可知,每塊GPU可獨立計算梯度)

代碼如下:

classRowParallelLinear(torch.nn.Module):
"""Linearlayerwithrowparallelism.

ThelinearlayerisdefinedasY=XA+b.Aisparallelizedalong
itsfirstdimensionandXalongitsseconddimensionas:
--
|A_1|
|.|
A=|.|X=[X_1,...,X_p]
|.|
|A_p|
--
Arguments:
input_size:firstdimensionofmatrixA.
output_size:seconddimensionofmatrixA.
bias:Iftrue,addbias.Notethatbiasisnotparallelized.
input_is_parallel:Iftrue,weassumethattheinputisalready
splitacrosstheGPUsandwedonotsplit
again.
init_method:methodtoinitializeweights.Notethatbiasisalwaysset
tozero.
stride:Forthestridedlinearlayers.
keep_master_weight_for_test:Thiswasaddedfortestingandshouldbe
settoFalse.Itreturnsthemasterweights
usedforinitialization.
skip_bias_add:Thiswasaddedtoenableperformanceoptimationswherebias
canbefusedwithotherelementwiseoperations.weskip
addingbiasbutinsteadreturnit.
"""

def__init__(
self,
input_size,
output_size,
bias=True,
input_is_parallel=False,
init_method=init.xavier_normal_,
stride=1,
keep_master_weight_for_test=False,
skip_bias_add=False,
params_dtype=None,
skip_init=False,
device=None,
):
super(RowParallelLinear,self).__init__()

#Keepinputparameters
self.input_size=input_size
self.output_size=output_size
self.input_is_parallel=input_is_parallel
#Dividetheweightmatrixalongthelastdimension.
world_size=get_tensor_model_parallel_world_size()
self.input_size_per_partition=divide(input_size,world_size)
self.skip_bias_add=skip_bias_add
self.params_dtype=params_dtype
self.device=device

#Parameters.
#Note:torch.nn.functional.linearperformsXA^T+bandasaresult
#weallocatethetranspose.
#Initializeweight.
args=get_args()
ifnotskip_init:
ifargs.use_cpu_initialization:
self.weight=Parameter(
torch.empty(
self.output_size,
self.input_size_per_partition,
dtype=self.params_dtypeifself.params_dtypeisnotNoneelseargs.params_dtype,
)
)
self.master_weight=_initialize_affine_weight_cpu(
self.weight,
self.output_size,
self.input_size,
self.input_size_per_partition,
1,
init_method,
stride=stride,
return_master_weight=keep_master_weight_for_test,
)
else:
self.weight=Parameter(
torch.empty(
self.output_size,
self.input_size_per_partition,
device=self.deviceifself.deviceisnotNoneelsetorch.cuda.current_device(),
dtype=self.params_dtypeifself.params_dtypeisnotNoneelseargs.params_dtype,
)
)
_initialize_affine_weight_gpu(
self.weight,init_method,partition_dim=1,stride=stride
)
else:
self.register_parameter("weight",None)

ifbiasandnotskip_init:
ifargs.use_cpu_initialization:
self.bias=Parameter(
torch.empty(self.output_size,
dtype=self.params_dtypeifself.params_dtypeisnotNoneelseargs.params_dtype)
)
else:
self.bias=Parameter(
torch.empty(
self.output_size,
device=self.deviceifself.deviceisnotNoneelsetorch.cuda.current_device(),
dtype=self.params_dtypeifself.params_dtypeisnotNoneelseargs.params_dtype,
)
)
#Alwaysinitializebiastozero.
withtorch.no_grad():
self.bias.zero_()
else:
self.register_parameter("bias",None)

defforward(self,input_):
#Setupbackpropall-reduce.
ifself.input_is_parallel:
input_parallel=input_
else:
input_parallel=scatter_to_tensor_model_parallel_region(input_)
#Matrixmultiply.
output_parallel=F.linear(input_parallel,self.weight)
#All-reduceacrossallthepartitions.
output_=reduce_from_tensor_model_parallel_region(output_parallel)
ifnotself.skip_bias_add:
output=output_+self.biasifself.biasisnotNoneelseoutput_
output_bias=None
else:
output=output_
output_bias=self.bias
returnoutput,output_bias

#行切割中的f和g算子
class_ScatterToModelParallelRegion(torch.autograd.Function):
"""Splittheinputandkeeponlythecorrespondingchucktotherank."""
#行切割中的f算子
# forward:沿列split輸入
# backward:all-gather梯度
@staticmethod
defsymbolic(graph,input_):
return_split(input_)

@staticmethod
defforward(ctx,input_):
return_split(input_)

@staticmethod
defbackward(ctx,grad_output):
return_gather(grad_output)

class_ReduceFromModelParallelRegion(torch.autograd.Function):
"""All-reducetheinputfromthemodelparallelregion."""
#行切割中的g算子
# forward:AllReduce輸出
# backward:正常計算梯度,GPU間無需做任何通訊
@staticmethod
defsymbolic(graph,input_):
return_reduce(input_)

@staticmethod
defforward(ctx,input_):
return_reduce(input_)

@staticmethod
defbackward(ctx,grad_output):
returngrad_output

7.3 ParallelSelfAttention

該類的構造如下圖:

79fefadc-04d9-11ee-90ce-dac502259ad0.png

這張圖中透露的核心含義是,「每個進程上維護的都是按列切割完的QKV矩陣」,進程間獨立計算,QKV矩陣的輸出結果一般不做AllReduce。同時,「每個進程上維護的是按行切割完的dense(線型層)矩陣」,Attention輸出過線性層后的結果,做AllReduce。另外,在設置attention_dropout時,同樣調用了get_cuda_rng_tracker 方法,令TP組內的進程擁有不同的隨機種子。「最后,你可能想問,dense后的dropout去哪里了」?代碼里把它定義到了ParallelTransformerLayer 下(等于attention + mlp)。

相信有了上面的說明,看這塊代碼就不難了。篇幅限制,這里不展示代碼了。大家可以對照著CodeGeeX架構圖,來看這里multi-head attention的計算方式。

ParallelMLP,ParallelTransformerLayer和ParallelTransformer都采用的是一樣的套路,也略過不言。

八、CrossEntropy

現在,終于可以來看模型的最后一層:交叉熵的平行計算。核心類為_VocabParallelCrossEntropy

我們在原理篇中講過交叉熵的并行計算,其優化核心是將通訊量從b*s*v降至b*s。但是Megatron代碼中定義的交叉熵計算方式,稍微復雜一些,也和我們一般理解的交叉熵有些許差異。所以我們先用圖解,來看下代碼定義的交叉熵計算流程:

【注】:

  • 對X和Y_i來說,(b, s, h)維度下應該畫成一個立方體,為了表達簡練,這里將b拍平了。
  • 對其余維度中含b的矩陣,b正常表示,即row=b
7a173aa2-04d9-11ee-90ce-dac502259ad0.png

8.1 計算logit

首先,在使用_VocabParallelCrossEntropy 計算交叉熵前,我們需要計算logit。這時我們調用parallel_lm_logits 函數,將模型最后一層的輸出X(復習一下,這個X已經在TP組內AllReduce了),乘上當前進程上維護的輸入層WE的轉置(復習一下,輸入層和輸出層共用一套embedding),得到當前進程的logit Y_i,「同時我們選擇不對輸出logit做AllReduce」。

你可能會有一個疑惑:「在Transformer中,輸出層會額外訓練一個線性矩陣,來計算logit;為什么在gpt中,可以用輸入層WE的轉置來代替這個線性矩陣?」

這個問題的答案,對理解Megatron交叉熵計算也至關重要。我們可「將X*WE^T結果理解成“X與WE間的相似度”」,例如對Y1來說,它的第一行中的每個logit,表示第一個token與詞表里每個詞的相似度。

注意到每個進程上只維護部分WE。例如,假設詞表共有5個單詞,WE1維護前5個單詞,WE2維護后5個單詞。因此再嚴格來說:「對Y1,它的第一行中的每個logit,表示第一個token與詞表中前5個詞的相似度;對Y2,它的第一行中的每個logit,表示第一個token與詞表中后5個詞的相似度。我們要記住這個含義。」

8.2 計算交叉熵

知道了logit的含義,我們來看交叉熵計算。

首先做了一系列求max的計算,得到基于全局的max(logit),再將orig_logit - max(logit),得到處理后的結果。這步理解起來不難,主要目的是為了防止計算溢出。

「接下來,就是基于logit算loss了?!?/strong>

  • 每個進程上都有一份(b, s)維度的真值,它表示每個token的真值是哪個詞(詞用id表示)。我們基于這份真值,在Y_i上找出真值位置的logit。例如:seq_length = 3,即我們需要對3個token去做預測,假設前兩個token的真值在第1個進程所維護的WE1中,最后一個token的真值在第2個進程所維護的WE2中。那么我們去Y1的前兩行里,取出真值位置的logit,這個logit表示“token與真值的相似度”,去Y2的最后一行里做同樣操作。

  • 這樣,我們就能得到L1和L2,和真值位置不對應的地方,統一填充0。隨后對L1和L2做AllReduce,得到L。「L中的每行表示“token與真值間的相似度"」

  • 現在,我們回來對Y1和Y2的每一行求sum(e^logit),得到e1和e2。將e1和e2做AllReduce,得到e。「e中的每行表示“token和詞表中所有詞相似度的總和”」

  • 我們希望「(token和詞表中所有詞相似度的總和-token與真值間的相似度) /token和詞表中所有詞相似度的總和」這個值最小,這個差值就是最終的loss。

8.3 代碼

理清了這點,現在可以來看代碼了(一切盡在注釋中),建議對這塊還有疑問的朋友,可以寫個test腳本把中間結果打印出來,方便理解:

class_VocabParallelCrossEntropy(torch.autograd.Function):
"""
分布式計算Loss
"""
@staticmethod
defforward(ctx,vocab_parallel_logits,target):
#1.logit-globalmax(logit)操作,主要目的是防溢出
logits_max=torch.max(vocab_parallel_logits,dim=-1)[0]#(b,s,1)
torch.distributed.all_reduce(#(b,s,1)
logits_max,
op=torch.distributed.ReduceOp.MAX,#找全局最大值
group=get_tensor_model_parallel_group(),
)
#Subtractthemaximumvalue.
vocab_parallel_logits.sub_(logits_max.unsqueeze(dim=-1))#原始GPU上維護的logits減去每行最大值(防止溢出)

#2、根據當前進程id,取出當前進程所維護詞表序號等信息
#函數,能夠獲取當前進程所維護詞表的start_index和end_index
get_vocab_range=VocabUtility.vocab_range_from_per_partition_vocab_size
#這塊GPU上logits最后一維的大小,等于所維護的詞表的大?。╲/N)
partition_vocab_size=vocab_parallel_logits.size()[-1]
#取得當前進程所在TP組中的序號
rank=get_tensor_model_parallel_rank()
#取得當前進程所在TP組的總進程數
world_size=get_tensor_model_parallel_world_size()
#取得當前進程所維護的詞表的start_index和end_index
vocab_start_index,vocab_end_index=get_vocab_range(
partition_vocab_size,rank,world_size
)

#3.基于真值,取出每個token在真值位置上的logit(即和真值的相似度)
#Createamaskofvalidvocabids(1meansitneedstobemasked)
target_mask=(target=vocab_end_index)#target=(b,s)
masked_target=target.clone()-vocab_start_index
masked_target[target_mask]=0

#Getpredicted-logits=logits[target].
#ForSimplicity,weconvertlogitstoa2-Dtensorwithsize
#[*,partition-vocab-size]andtargettoa1-Dtensorofsize[*].
logits_2d=vocab_parallel_logits.view(-1,partition_vocab_size)#(b*s,v/N)
masked_target_1d=masked_target.view(-1)#(b*s)
arange_1d=torch.arange(#[b*s]
start=0,end=logits_2d.size()[0],device=logits_2d.device
)
#logits_2d[arange_1d,masked_target_1d]:
# tensor的切片操作。arange_1d:取出所有的行。masked_target_1d:取出logit
predicted_logits_1d=logits_2d[arange_1d,masked_target_1d]#(b*s)
predicted_logits_1d=predicted_logits_1d.clone().contiguous()
predicted_logits=predicted_logits_1d.view_as(target)#(b,s)
predicted_logits[target_mask]=0.0
#AllreduceisneededtogetthechunksfromotherGPUs.
torch.distributed.all_reduce(#allreduce之后得到的logit矩陣為(b,s),每一個位置表示對應真值位置的預測logit
predicted_logits,
op=torch.distributed.ReduceOp.SUM,
group=get_tensor_model_parallel_group(),
)

#SumofexponentialoflogitsalongvocabdimensionacrossallGPUs.
exp_logits=vocab_parallel_logits#(b,s,v/N)
torch.exp(vocab_parallel_logits,out=exp_logits)
sum_exp_logits=exp_logits.sum(dim=-1)#(b,s)
torch.distributed.all_reduce(
sum_exp_logits,
op=torch.distributed.ReduceOp.SUM,
group=get_tensor_model_parallel_group(),
)

#4.計算Loss=log(sum(exp(logits)))-predicted-logit.
loss=torch.log(sum_exp_logits)-predicted_logits#(b,s)

#Storesoftmax,target-maskandmasked-targetforbackwardpass.
exp_logits.div_(sum_exp_logits.unsqueeze(dim=-1))
ctx.save_for_backward(exp_logits,target_mask,masked_target_1d)

returnloss

@staticmethod
defbackward(ctx,grad_output):

#Retreivetensorsfromtheforwardpath.
softmax,target_mask,masked_target_1d=ctx.saved_tensors

#Alltheinputshavesoftmaxastheirgradient.
grad_input=softmax
#Forsimplicity,workwiththe2Dgradient.
partition_vocab_size=softmax.size()[-1]
grad_2d=grad_input.view(-1,partition_vocab_size)

#Addthegradientfrommatchingclasses.
arange_1d=torch.arange(start=0,end=grad_2d.size()[0],device=grad_2d.device)
grad_2d[arange_1d,masked_target_1d]-=1.0-target_mask.view(-1).float()

#Finallyelementwisemultiplicationwiththeoutputgradients.
grad_input.mul_(grad_output.unsqueeze(dim=-1))

returngrad_input,None

九、總結

啊這總結怎么寫呢,嘔心瀝血終于寫完了。希望能給到大家幫助!


聲明:本文內容及配圖由入駐作者撰寫或者入駐合作網站授權轉載。文章觀點僅代表作者本人,不代表電子發燒友網立場。文章及其配圖僅供工程師學習之用,如有內容侵權或者其他違規問題,請聯系本站處理。 舉報投訴
  • gpu
    gpu
    +關注

    關注

    28

    文章

    4729

    瀏覽量

    128890
  • 源碼
    +關注

    關注

    8

    文章

    639

    瀏覽量

    29185
  • 大模型
    +關注

    關注

    2

    文章

    2423

    瀏覽量

    2643

原文標題:圖解大模型訓練之:Megatron源碼解讀2,模型并行

文章出處:【微信號:GiantPandaCV,微信公眾號:GiantPandaCV】歡迎添加關注!文章轉載請注明出處。

收藏 人收藏

    評論

    相關推薦

    基于Transformer做大模型訓練基本的并行范式

    并行(TP)。 它的基本思想就是把模型的參數縱向切開,放到不同的GPU上進行獨立計算,然后再做聚合。 在寫這篇文章的過程中,我發現要理解Megatron的大框架不難,但是涉及到細節,特別是混合
    的頭像 發表于 05-31 14:38 ?2672次閱讀
    基于Transformer做大<b class='flag-5'>模型</b>預<b class='flag-5'>訓練</b>基本的<b class='flag-5'>并行</b>范式

    【大語言模型:原理與工程實踐】大語言模型的預訓練

    大語言模型的核心特點在于其龐大的參數量,這賦予了模型強大的學習容量,使其無需依賴微調即可適應各種下游任務,而更傾向于培養通用的處理能力。然而,隨著學習容量的增加,對預訓練數據的需求也相應
    發表于 05-07 17:10

    用PVC管自制遙控火車模型的教程圖解

    槽的上面,檢查是否吻合并調整。 自制遙控火車模型的教程圖解 擰緊螺絲,滴加502固定 自制遙控火車模型的教程圖解 自制遙控火車模型的教
    發表于 12-29 15:03

    Pytorch模型訓練實用PDF教程【中文】

    本教程以實際應用、工程開發為目的,著重介紹模型訓練過程中遇到的實際問題和方法。在機器學習模型開發中,主要涉及三大部分,分別是數據、模型和損失函數及優化器。本文也按順序的依次介紹數據、
    發表于 12-21 09:18

    基于EAIDK的人臉算法應用-源碼解讀(2)

    上一期介紹了基于EAIDK的人臉算法應用,本期從應用角度,解讀一下該案例源碼。本期案例源碼解讀,主要從源碼目錄結構、配置文件、
    的頭像 發表于 12-10 21:14 ?959次閱讀

    超大Transformer語言模型的分布式訓練框架

    NVIDIA Megatron 是一個基于 PyTorch 的框架,用于訓練基于 Transformer 架構的巨型語言模型。本系列文章將詳細介紹Megatron的設計和實踐,探索這一
    的頭像 發表于 10-11 16:46 ?2682次閱讀
    超大Transformer語言<b class='flag-5'>模型</b>的分布式<b class='flag-5'>訓練</b>框架

    探究超大Transformer語言模型的分布式訓練框架

    模型的預訓練計算。 上篇主要介紹了大模型訓練的發展趨勢、NVIDIA Megatron模型
    的頭像 發表于 10-20 09:25 ?2436次閱讀

    KT利用NVIDIA AI平臺訓練大型語言模型

    韓國先進的移動運營商構建包含數百億個參數的大型語言模型,并使用 NVIDIA DGX SuperPOD 平臺和 NeMo Megatron 框架訓練模型。
    的頭像 發表于 09-27 09:24 ?1188次閱讀

    圖解模型系列Megatron源碼解讀1,分布式環境初始化

    使用Megatron訓練gpt類大模型的項目有很多。在這個系列里,我選擇了由THUDM開發的CodeGeeX項目,它是gpt在代碼生成方向上的應用,對標于openAI的CodeX。github地址在此。
    的頭像 發表于 06-06 15:22 ?5922次閱讀
    <b class='flag-5'>圖解</b>大<b class='flag-5'>模型</b>系列<b class='flag-5'>之</b>:<b class='flag-5'>Megatron</b><b class='flag-5'>源碼</b><b class='flag-5'>解讀</b>1,分布式環境初始化

    圖解模型訓練:數據并行上篇(DP, DDP與ZeRO)

    數據并行的核心思想是:在各個GPU上都拷貝一份完整模型,各自吃一份數據,算一份梯度,最后對梯度進行累加來更新整體模型。理念不復雜,但到了大模型場景,巨大的存儲和GPU間的通訊量,就是系
    發表于 06-16 09:54 ?3483次閱讀
    <b class='flag-5'>圖解</b>大<b class='flag-5'>模型</b><b class='flag-5'>訓練</b><b class='flag-5'>之</b>:數據<b class='flag-5'>并行</b>上篇(DP, DDP與ZeRO)

    DeepSpeed結合Megatron-LM訓練GPT2模型筆記

    本文基于DeepSpeedExamples倉庫中給出的Megatron相關例子探索一下訓練GPT2模型的流程。主要包含3個部分,第一個部分是基于原始的
    的頭像 發表于 06-19 14:45 ?3474次閱讀
    DeepSpeed結合<b class='flag-5'>Megatron</b>-LM<b class='flag-5'>訓練</b>GPT<b class='flag-5'>2</b><b class='flag-5'>模型</b>筆記

    模型分布式訓練并行技術(一)-概述

    數據并行是最常見的并行形式,因為它很簡單。在數據并行訓練中,數據集被分割成幾個碎片,每個碎片被分配到一個設備上。這相當于沿批次(Batch)維度對
    的頭像 發表于 08-24 15:17 ?1455次閱讀
    大<b class='flag-5'>模型</b>分布式<b class='flag-5'>訓練</b><b class='flag-5'>并行</b>技術(一)-概述

    基于PyTorch的模型并行分布式訓練Megatron解析

    NVIDIA Megatron 是一個基于 PyTorch 的分布式訓練框架,用來訓練超大Transformer語言模型,其通過綜合應用了數據并行
    的頭像 發表于 10-23 11:01 ?2966次閱讀
    基于PyTorch的<b class='flag-5'>模型</b><b class='flag-5'>并行</b>分布式<b class='flag-5'>訓練</b><b class='flag-5'>Megatron</b>解析

    基于NVIDIA Megatron Core的MOE LLM實現和訓練優化

    本文將分享阿里云人工智能平臺 PAI 團隊與 NVIDIA Megatron-Core 團隊在 MoE (Mixture of Experts) 大語言模型(LLM)實現與訓練優化上的創新工作。
    的頭像 發表于 03-22 09:50 ?759次閱讀
    基于NVIDIA <b class='flag-5'>Megatron</b> Core的MOE LLM實現和<b class='flag-5'>訓練</b>優化

    解讀PyTorch模型訓練過程

    PyTorch作為一個開源的機器學習庫,以其動態計算圖、易于使用的API和強大的靈活性,在深度學習領域得到了廣泛的應用。本文將深入解讀PyTorch模型訓練的全過程,包括數據準備、模型
    的頭像 發表于 07-03 16:07 ?1046次閱讀
    主站蜘蛛池模板: 国产成人在线播放视频| 9988电影网| 在线看片亚洲| 国产人人为我我为人人澡| 青柠电影在线看| 97国产精品久久精品国产| 久久最新地址获取| 夜色资源站国产www在线视频| 国产在线高清视频无码不卡| 我要搞av| 国产午夜电影院| 亚洲第一色网站| 国产在线高清亚洲精品一区 | xxxx69美国| 免费成人高清在线视频| 4399亚洲AV无码V无码网站| 美女张开大腿| 91黄色大片| 妻子的秘密HD观看| 草比比过程图| 四虎永久在线精品免费A| 国产成人啪精视频精东传媒网站 | G国产精品无马| 全是肉的高h短篇列车| 糙汉顶弄抽插HHHH| 四虎永久在线精品免费A| 国产精品自产拍在线观看中文 | 国产学生无码中文视频一区| 亚洲国产成人精品无码区APP | 久久久久青草大香线综合精品| 在线播放毛片| 你的欲梦裸身在线播放| 啊…嗯啊好深男男小黄文| 色AV色婷婷96人妻久久久| 国产女人毛片| 伊人久久大线蕉香港三级| 麻豆AV福利AV久久AV| YY8090福利午夜理论片| 校花被扒衣吸乳羞羞漫画| 久久久久夜| 出租屋交换人妻 全文|