ist Teil einer Serie über verteilte KI über mehrere GPUs:
- Teil 1: Das Host- und Geräteparadigma verstehen
- Teil 2: Punkt-zu-Punkt- und Sammeloperationen (dieser Artikel)
- Teil 3: Wie GPUs kommunizieren (kommt bald)
- Teil 4: Gradientenakkumulation und verteilte Datenparallelität (DDP) (kommt bald)
- Teil 5: ZeRO (kommt bald)
- Teil 6: Tensorparallelität (kommt bald)
Einführung
Im vorherigen Beitrag haben wir das Host-Gerät-Paradigma etabliert und das Konzept der Ränge für Multi-GPU-Workloads eingeführt. Jetzt untersuchen wir die spezifischen Kommunikationsmuster, die PyTorch bietet torch.distributed Modul zur Koordinierung der Arbeit und zum Datenaustausch zwischen diesen Ebenen. Diese Operationen, bekannt als Kollektivesind die Bausteine verteilter Workloads.
Obwohl PyTorch diese Vorgänge offenlegt, ruft es letztendlich ein Backend-Framework auf, das die Kommunikation tatsächlich implementiert. Für NVIDIA-GPUs ist es so NCCL (NVIDIA Collective Communications Library), während es sich bei AMD um RCCL (ROCm Communication Collectives Library) handelt.
NCCL implementiert Multi-GPU- und Multi-Node-Kommunikationsprimitive, die für NVIDIA-GPUs und Netzwerke optimiert sind. Es erkennt automatisch die aktuelle Topologie (Kommunikationskanäle wie PCIe, NVLink, InfiniBand) und wählt die effizienteste aus.
Haftungsausschluss 1: Da NVIDIA-GPUs am häufigsten vorkommen, konzentrieren wir uns auf die
NCCLBackend für diesen Beitrag.
Haftungsausschluss 2: Der Kürze halber stellt der unten dargestellte Code nur die Hauptargumente jeder Methode anstelle aller verfügbaren Argumente bereit.
Haftungsausschluss 3: Der Einfachheit halber zeigen wir nicht die Speicherfreigabe von Tensoren, sondern Operationen wie
scatterwird den Speicher des Quellrangs nicht automatisch freigeben (wenn Sie nicht verstehen, was ich meine, ist das in Ordnung, es wird sehr bald klar werden).
Kommunikation: Blockieren vs. Nicht-Blockieren
Um zusammenzuarbeiten, müssen GPUs Daten austauschen. Die CPU initiiert die Kommunikation, indem sie NCCL-Kernel in CUDA-Streams einreiht (wenn Sie nicht wissen, was CUDA-Streams sind, schauen Sie sich die an erster Blogbeitrag dieser Serie), aber die eigentliche Datenübertragung erfolgt direkt zwischen GPUs über die Verbindung, wobei der Hauptspeicher der CPU vollständig umgangen wird. Idealerweise sind die GPUs mit einer Hochgeschwindigkeitsverbindung wie NVLink oder InfiniBand verbunden (diese Verbindungen werden im behandelt). Dritter Beitrag dieser Serie).
Diese Kommunikation kann synchron (blockierend) oder asynchron (nicht blockierend) sein, was wir weiter unten untersuchen.
Synchrone (blockierende) Kommunikation
- Verhalten: Wenn Sie eine synchrone Kommunikationsmethode aufrufen, wird der Hostprozess aufgerufen bleibt stehen und wartet bis der NCCL-Kernel erfolgreich in den aktuell aktiven CUDA-Stream eingereiht wurde. Sobald die Funktion in die Warteschlange gestellt wird, kehrt sie zurück. Dies ist in der Regel einfach und zuverlässig. Beachten Sie, dass der Host nicht auf den Abschluss der Übertragung wartet, sondern nur darauf, dass der Vorgang in die Warteschlange gestellt wird. Allerdings blockiert es verhindert, dass dieser spezifische Stream zum nächsten Vorgang übergeht bis der NCCL-Kernel vollständig ausgeführt ist.
Asynchrone (nicht blockierende) Kommunikation
- Verhalten: Wenn Sie eine asynchrone Kommunikationsmethode aufrufen, wird der Aufruf zurückgegeben sofortund der Einreihungsvorgang erfolgt im Hintergrund. Es wird nicht in den aktuell aktiven Stream eingereiht, sondern in einen dedizierten internen NCCL-Stream professional Gerät. Dadurch kann Ihre CPU mit anderen Aufgaben fortfahren, eine Technik, die als bekannt ist Überschneidung von Berechnung und Kommunikation. Die asynchrone API ist komplexer, da sie bei unsachgemäßer Verwendung zu undefiniertem Verhalten führen kann
.wait()(unten erklärt) und Daten während der Übertragung ändern. Die Beherrschung ist jedoch der Schlüssel zur Erzielung maximaler Leistung bei groß angelegten verteilten Schulungen.
Punkt-zu-Punkt (Eins-zu-Eins)
Diese Operationen werden nicht berücksichtigt Kollektiveaber sie sind grundlegende Kommunikationsprimitive. Sie erleichtern die direkte Datenübertragung zwischen zwei spezifischen Rängen und sind von grundlegender Bedeutung für Aufgaben, bei denen eine GPU bestimmte Informationen an eine andere senden muss.
- Synchron (Blockierend): Der Hostprozess wartet darauf, dass der Vorgang in den CUDA-Stream eingereiht wird, bevor er fortfährt. Der Kernel wird in den aktuell aktiven Stream eingereiht.
torch.distributed.ship(tensor, dst): Sendet einen Tensor an einen angegebenen Zielrang.torch.distributed.recv(tensor, src): Empfängt einen Tensor von einem Quellrang. Dem empfangenden Tensor muss vorab die richtige Type zugewiesen werdendtype.
- Asynchron (nicht blockierend): Der Hostprozess initiiert den Enqueue-Vorgang und fährt sofort mit anderen Aufgaben fort. Der Kernel wird professional Gerät in einen dedizierten internen NCCL-Stream eingereiht, was eine überlappende Kommunikation mit der Berechnung ermöglicht. Diese Operationen geben a zurück
request(technisch gesehen aWorkObjekt), mit dem der Warteschlangenstatus verfolgt werden kann.request = torch.distributed.isend(tensor, dst): Initiiert einen asynchronen Sendevorgang.request = torch.distributed.irecv(tensor, src): Initiiert einen asynchronen Empfangsvorgang.request.wait(): Blockiert den Host nur, bis der Vorgang erfolgreich conflict eingereiht im CUDA-Stream. Es blockiert jedoch die Ausführung späterer Kernel durch den aktuell aktiven CUDA-Stream, bis dieser bestimmte asynchrone Vorgang abgeschlossen ist.request.wait(timeout): Wenn Sie ein Timeout-Argument angeben, ändert sich das Hostverhalten. Das wird es Blockieren Sie die CPU Thread, bis die NCCL-Arbeit abgeschlossen ist oder eine Zeitüberschreitung auftritt (was eine Ausnahme auslöst). Im Normalfall müssen Benutzer das Timeout nicht festlegen.request.is_completed(): RetourenTrueob der Vorgang erfolgreich conflict eingereiht auf einen CUDA-Stream. Es kann für Umfragen verwendet werden. Es kann nicht garantiert werden, dass die tatsächlichen Daten übertragen wurden.
Wenn PyTorch einen NCCL-Kernel startet, fügt es automatisch einen ein Abhängigkeit (dh erzwingt eine Synchronisierung) zwischen Ihrem aktuell aktiven Stream und dem NCCL-Stream. Das bedeutet, dass der NCCL-Stream erst startet, wenn alle zuvor in die Warteschlange gestellten Arbeiten am aktiven Stream abgeschlossen sind – was garantiert, dass der gesendete Tensor bereits die endgültigen Werte enthält.
Ebenso anrufen req.wait() fügt eine Abhängigkeit in die andere Richtung ein. Alle Arbeiten, die Sie anschließend in den aktuellen Stream einreihen req.wait() wird erst ausgeführt, wenn die Der NCCL-Vorgang ist abgeschlossensodass Sie die empfangenen Tensoren sicher verwenden können.
Große „Fallstricke“ in NCCL
Während ship Und recv Da sie als „synchron“ gekennzeichnet sind, kann ihr Verhalten in NCCL verwirrend sein. Ein synchroner Aufruf eines CUDA-Tensors blockiert den Host-CPU-Thread nur, bis der Datenübertragungskernel in die Warteschlange des Streams eingereiht wird. erst wenn die Datenübertragung abgeschlossen ist. Die CPU kann dann andere Aufgaben in die Warteschlange stellen.
Es gibt eine Ausnahme: die allererstes Ruf an torch.distributed.recv() in einem Prozess ist wirklich blockierend und wartet auf den Abschluss der Übertragung, wahrscheinlich aufgrund des internen NCCL Aufwärmvorgänge. Nachfolgende Aufrufe werden nur blockiert, bis der Vorgang in die Warteschlange gestellt wird.
Betrachten Sie dieses Beispiel, wo rank 1 hängt, weil die CPU versucht, auf einen Tensor zuzugreifen, den die GPU noch nicht erhalten hat:
rank = torch.distributed.get_rank()
if rank == 0:
t = torch.tensor((1,2,3), dtype=torch.float32, system=system)
# torch.distributed.ship(t, dst=1) # No ship operation is carried out
else: # rank == 1 (assuming solely 2 ranks)
t = torch.empty(3, dtype=torch.float32, system=system)
torch.distributed.recv(t, src=0) # Blocks solely till enqueued (after first run)
print("This WILL print if NCCL is warmed-up")
print
print("This can NOT print")
Der CPU-Prozess bei rank 1 bleibt hängen print
Beachten Sie dies, wenn Sie diesen Code mehrmals ausführen
This WILL print if NCCL is warmed-upwird in den späteren Ausführungen nicht gedruckt, da die CPU immer noch feststeckt
Kollektive
Jede Sammeloperationsfunktion unterstützt sowohl synchrone als auch asynchrone Operationen durch async_op Argument. Der Standardwert ist False, was synchrone Vorgänge bedeutet.
One-to-All-Kollektive
Bei diesen Vorgängen sendet ein Rang Daten an alle anderen Ränge in der Gruppe.
Übertragen
torch.distributed.broadcast(tensor, src): Kopiert einen Tensor aus einem einzelnen Quellrang (src) zu allen anderen Rängen. Jeder Prozess führt zu einer identischen Kopie des Tensors. DertensorDer Parameter dient zwei Zwecken: (1) wenn der Rang des Prozesses mit dem übereinstimmtsrcDietensorwerden die Daten gesendet? (2) andernfalls,tensordient der Speicherung der empfangenen Daten.
rank = torch.distributed.get_rank()
if rank == 0: # supply rank
tensor = torch.tensor((1,2,3), dtype=torch.int64, system=system)
else: # vacation spot ranks
tensor = torch.empty(3, dtype=torch.int64, system=system)
torch.distributed.broadcast(tensor, src=0)

Streuen
torch.distributed.scatter(tensor, scatter_list, src): Verteilt Datenblöcke von einem Quellrang über alle Ränge. Derscatter_listauf dem Quellrang enthält mehrere Tensoren, und jeder Rang (einschließlich der Quelle) erhält einen Tensor aus dieser Liste in seinentensorVariable. Die Zielränge gehen einfach vorbeiNonefür diescatter_list.
# The scatter_list should be None for all non-source ranks.
scatter_list = None if rank != 0 else (torch.tensor((i, i+1)).to(system) for i in vary(0,4,2))
tensor = torch.empty(2, dtype=torch.int64).to(system)
torch.distributed.scatter(tensor, scatter_list, src=0)
print(f'Rank {rank} obtained: {tensor}')

All-to-One-Kollektive
Diese Vorgänge sammeln Daten von allen Rängen und konsolidieren sie auf einem einzigen Zielrang.
Reduzieren
torch.distributed.cut back(tensor, dst, op): Nimmt einen Tensor von jedem Rang und wendet eine Reduktionsoperation an (wieSUM,MAX,MIN) und speichert das Endergebnis auf dem Zielrang (dst) nur.
rank = torch.distributed.get_rank()
tensor = torch.tensor((rank+1, rank+2, rank+3), system=system)
torch.distributed.cut back(tensor, dst=0, op=torch.distributed.ReduceOp.SUM)
print(tensor)

Versammeln
torch.distributed.collect(tensor, gather_list, dst): Sammelt einen Tensor von jedem Rang in einer Liste von Tensoren auf dem Zielrang. Dergather_listmuss eine Liste von Tensoren (richtige Größe und Typ) am Ziel und seinNoneüberall sonst.
# The gather_list should be None for all non-destination ranks.
rank = torch.distributed.get_rank()
world_size = torch.distributed.get_world_size()
gather_list = None if rank != 0 else (torch.zeros(3, dtype=torch.int64).to(system) for _ in vary(world_size))
t = torch.tensor((0+rank, 1+rank, 2+rank), dtype=torch.int64).to(system)
torch.distributed.collect(t, gather_list, dst=0)
print(f'After op, Rank {rank} has: {gather_list}')
Die Variable world_size ist die Gesamtzahl der Ränge. Es kann mit erhalten werden torch.distributed.get_world_size(). Aber machen Sie sich zunächst keine Gedanken über die Implementierungsdetails. Das Wichtigste ist, die Konzepte zu verstehen.

All-to-All-Kollektive
Bei diesen Operationen sendet und empfängt jeder Rang Daten von allen anderen Rängen.
Alles reduzieren
torch.distributed.all_reduce(tensor, op): Das Gleiche wiecut backaber das Ergebnis wird gespeichert jederRang statt nur ein Ziel.
# Instance for torch.distributed.all_reduce
rank = torch.distributed.get_rank()
tensor = torch.tensor((rank+1, rank+2, rank+3), dtype=torch.float32, system=system)
torch.distributed.all_reduce(tensor, op=torch.distributed.ReduceOp.SUM)
print(f"Rank {rank} after all_reduce: {tensor}")

Alle versammeln sich
torch.distributed.all_gather(tensor_list, tensor): Das Gleiche wiecollectaber die gesammelte Liste der Tensoren ist auf verfügbar jeder Rang.
# Instance for torch.distributed.all_gather
rank = torch.distributed.get_rank()
world_size = torch.distributed.get_world_size()
input_tensor = torch.tensor((rank), dtype=torch.float32, system=system)
tensor_list = (torch.empty(1, dtype=torch.float32, system=system) for _ in vary(world_size))
torch.distributed.all_gather(tensor_list, input_tensor)
print(f"Rank {rank} gathered: {(t.merchandise() for t in tensor_list)}")

Streuung reduzieren
torch.distributed.reduce_scatter(output, input_list): Äquivalent zur Durchführung einer Reduzierungsoperation für eine Liste von Tensoren und der anschließenden Streuung der Ergebnisse. Jeder Rang erhält einen anderen Teil der reduzierten Leistung.
# Instance for torch.distributed.reduce_scatter
rank = torch.distributed.get_rank()
world_size = torch.distributed.get_world_size()
input_list = (torch.tensor((rank + i), dtype=torch.float32, system=system) for i in vary(world_size))
output = torch.empty(1, dtype=torch.float32, system=system)
torch.distributed.reduce_scatter(output, input_list, op=torch.distributed.ReduceOp.SUM)
print(f"Rank {rank} obtained lowered worth: {output.merchandise()}")

Synchronisation
Die beiden am häufigsten verwendeten Operationen sind request.wait() Und torch.cuda.synchronize(). Es ist wichtig, den Unterschied zwischen diesen beiden zu verstehen:
request.wait(): Dies wird für asynchrone Vorgänge verwendet. Es synchronisiert den aktuell aktiven CUDA-Stream für diesen Vorgang und stellt sicher, dass der Stream auf den Abschluss der Kommunikation wartet, bevor er fortfährt. Mit anderen Worten: Es blockiert den aktuell aktiven CUDA-Stream, bis die Datenübertragung abgeschlossen ist. Auf der Host-Seite bewirkt es nur, dass der Host wartet, bis der Kernel in die Warteschlange gestellt wird; der Gastgeber tut es nicht Warten Sie, bis die Datenübertragung abgeschlossen ist.torch.cuda.synchronize(): Dies ist ein energischerer Befehl, der den Host-CPU-Thread bis pausiert alle Zuvor auf der GPU eingereihte Aufgaben sind abgeschlossen. Es stellt sicher, dass die GPU vollständig im Leerlauf ist, bevor die CPU weitermacht, kann jedoch bei unsachgemäßer Verwendung zu Leistungsengpässen führen. Wann immer Sie Benchmark-Messungen durchführen müssen, sollten Sie diese verwenden, um sicherzustellen, dass Sie genau den Second erfassen, in dem die GPUs fertig sind.
Abschluss
Herzlichen Glückwunsch, dass Sie es bis zum Ende geschafft haben! In diesem Beitrag haben Sie Folgendes erfahren:
- Punkt-zu-Punkt-Operationen
- Synchronisierung und Asynchronisierung in NCCL
- Kollektive Operationen
- Synchronisationsmethoden
Im nächsten Blogbeitrag befassen wir uns mit PCIe, NVLink und anderen Mechanismen, die die Kommunikation in einer verteilten Umgebung ermöglichen!
