ist Teil einer Serie über verteilte KI über mehrere GPUs:

  • Teil 1: Das Host- und Geräteparadigma verstehen
  • Teil 2: Punkt-zu-Punkt- und Sammeloperationen (dieser Artikel)
  • Teil 3: Wie GPUs kommunizieren (kommt bald)
  • Teil 4: Gradientenakkumulation und verteilte Datenparallelität (DDP) (kommt bald)
  • Teil 5: ZeRO (kommt bald)
  • Teil 6: Tensorparallelität (kommt bald)

Einführung

Im vorherigen Beitrag haben wir das Host-Gerät-Paradigma etabliert und das Konzept der Ränge für Multi-GPU-Workloads eingeführt. Jetzt untersuchen wir die spezifischen Kommunikationsmuster, die PyTorch bietet torch.distributed Modul zur Koordinierung der Arbeit und zum Datenaustausch zwischen diesen Ebenen. Diese Operationen, bekannt als Kollektivesind die Bausteine ​​verteilter Workloads.

Obwohl PyTorch diese Vorgänge offenlegt, ruft es letztendlich ein Backend-Framework auf, das die Kommunikation tatsächlich implementiert. Für NVIDIA-GPUs ist es so NCCL (NVIDIA Collective Communications Library), während es sich bei AMD um RCCL (ROCm Communication Collectives Library) handelt.

NCCL implementiert Multi-GPU- und Multi-Node-Kommunikationsprimitive, die für NVIDIA-GPUs und Netzwerke optimiert sind. Es erkennt automatisch die aktuelle Topologie (Kommunikationskanäle wie PCIe, NVLink, InfiniBand) und wählt die effizienteste aus.

Haftungsausschluss 1: Da NVIDIA-GPUs am häufigsten vorkommen, konzentrieren wir uns auf die NCCL Backend für diesen Beitrag.

Haftungsausschluss 2: Der Kürze halber stellt der unten dargestellte Code nur die Hauptargumente jeder Methode anstelle aller verfügbaren Argumente bereit.

Haftungsausschluss 3: Der Einfachheit halber zeigen wir nicht die Speicherfreigabe von Tensoren, sondern Operationen wie scatter wird den Speicher des Quellrangs nicht automatisch freigeben (wenn Sie nicht verstehen, was ich meine, ist das in Ordnung, es wird sehr bald klar werden).

Kommunikation: Blockieren vs. Nicht-Blockieren

Um zusammenzuarbeiten, müssen GPUs Daten austauschen. Die CPU initiiert die Kommunikation, indem sie NCCL-Kernel in CUDA-Streams einreiht (wenn Sie nicht wissen, was CUDA-Streams sind, schauen Sie sich die an erster Blogbeitrag dieser Serie), aber die eigentliche Datenübertragung erfolgt direkt zwischen GPUs über die Verbindung, wobei der Hauptspeicher der CPU vollständig umgangen wird. Idealerweise sind die GPUs mit einer Hochgeschwindigkeitsverbindung wie NVLink oder InfiniBand verbunden (diese Verbindungen werden im behandelt). Dritter Beitrag dieser Serie).

Diese Kommunikation kann synchron (blockierend) oder asynchron (nicht blockierend) sein, was wir weiter unten untersuchen.

Synchrone (blockierende) Kommunikation

  • Verhalten: Wenn Sie eine synchrone Kommunikationsmethode aufrufen, wird der Hostprozess aufgerufen bleibt stehen und wartet bis der NCCL-Kernel erfolgreich in den aktuell aktiven CUDA-Stream eingereiht wurde. Sobald die Funktion in die Warteschlange gestellt wird, kehrt sie zurück. Dies ist in der Regel einfach und zuverlässig. Beachten Sie, dass der Host nicht auf den Abschluss der Übertragung wartet, sondern nur darauf, dass der Vorgang in die Warteschlange gestellt wird. Allerdings blockiert es verhindert, dass dieser spezifische Stream zum nächsten Vorgang übergeht bis der NCCL-Kernel vollständig ausgeführt ist.

Asynchrone (nicht blockierende) Kommunikation

  • Verhalten: Wenn Sie eine asynchrone Kommunikationsmethode aufrufen, wird der Aufruf zurückgegeben sofortund der Einreihungsvorgang erfolgt im Hintergrund. Es wird nicht in den aktuell aktiven Stream eingereiht, sondern in einen dedizierten internen NCCL-Stream professional Gerät. Dadurch kann Ihre CPU mit anderen Aufgaben fortfahren, eine Technik, die als bekannt ist Überschneidung von Berechnung und Kommunikation. Die asynchrone API ist komplexer, da sie bei unsachgemäßer Verwendung zu undefiniertem Verhalten führen kann .wait() (unten erklärt) und Daten während der Übertragung ändern. Die Beherrschung ist jedoch der Schlüssel zur Erzielung maximaler Leistung bei groß angelegten verteilten Schulungen.

Punkt-zu-Punkt (Eins-zu-Eins)

Diese Operationen werden nicht berücksichtigt Kollektiveaber sie sind grundlegende Kommunikationsprimitive. Sie erleichtern die direkte Datenübertragung zwischen zwei spezifischen Rängen und sind von grundlegender Bedeutung für Aufgaben, bei denen eine GPU bestimmte Informationen an eine andere senden muss.

  • Synchron (Blockierend): Der Hostprozess wartet darauf, dass der Vorgang in den CUDA-Stream eingereiht wird, bevor er fortfährt. Der Kernel wird in den aktuell aktiven Stream eingereiht.
    • torch.distributed.ship(tensor, dst): Sendet einen Tensor an einen angegebenen Zielrang.
    • torch.distributed.recv(tensor, src): Empfängt einen Tensor von einem Quellrang. Dem empfangenden Tensor muss vorab die richtige Type zugewiesen werden dtype.
  • Asynchron (nicht blockierend): Der Hostprozess initiiert den Enqueue-Vorgang und fährt sofort mit anderen Aufgaben fort. Der Kernel wird professional Gerät in einen dedizierten internen NCCL-Stream eingereiht, was eine überlappende Kommunikation mit der Berechnung ermöglicht. Diese Operationen geben a zurück request(technisch gesehen a Work Objekt), mit dem der Warteschlangenstatus verfolgt werden kann.
    • request = torch.distributed.isend(tensor, dst): Initiiert einen asynchronen Sendevorgang.
    • request = torch.distributed.irecv(tensor, src): Initiiert einen asynchronen Empfangsvorgang.
    • request.wait(): Blockiert den Host nur, bis der Vorgang erfolgreich conflict eingereiht im CUDA-Stream. Es blockiert jedoch die Ausführung späterer Kernel durch den aktuell aktiven CUDA-Stream, bis dieser bestimmte asynchrone Vorgang abgeschlossen ist.
    • request.wait(timeout): Wenn Sie ein Timeout-Argument angeben, ändert sich das Hostverhalten. Das wird es Blockieren Sie die CPU Thread, bis die NCCL-Arbeit abgeschlossen ist oder eine Zeitüberschreitung auftritt (was eine Ausnahme auslöst). Im Normalfall müssen Benutzer das Timeout nicht festlegen.
    • request.is_completed(): Retouren True ob der Vorgang erfolgreich conflict eingereiht auf einen CUDA-Stream. Es kann für Umfragen verwendet werden. Es kann nicht garantiert werden, dass die tatsächlichen Daten übertragen wurden.

Wenn PyTorch einen NCCL-Kernel startet, fügt es automatisch einen ein Abhängigkeit (dh erzwingt eine Synchronisierung) zwischen Ihrem aktuell aktiven Stream und dem NCCL-Stream. Das bedeutet, dass der NCCL-Stream erst startet, wenn alle zuvor in die Warteschlange gestellten Arbeiten am aktiven Stream abgeschlossen sind – was garantiert, dass der gesendete Tensor bereits die endgültigen Werte enthält.

Ebenso anrufen req.wait() fügt eine Abhängigkeit in die andere Richtung ein. Alle Arbeiten, die Sie anschließend in den aktuellen Stream einreihen req.wait() wird erst ausgeführt, wenn die Der NCCL-Vorgang ist abgeschlossensodass Sie die empfangenen Tensoren sicher verwenden können.

Große „Fallstricke“ in NCCL

Während ship Und recv Da sie als „synchron“ gekennzeichnet sind, kann ihr Verhalten in NCCL verwirrend sein. Ein synchroner Aufruf eines CUDA-Tensors blockiert den Host-CPU-Thread nur, bis der Datenübertragungskernel in die Warteschlange des Streams eingereiht wird. erst wenn die Datenübertragung abgeschlossen ist. Die CPU kann dann andere Aufgaben in die Warteschlange stellen.

Es gibt eine Ausnahme: die allererstes Ruf an torch.distributed.recv() in einem Prozess ist wirklich blockierend und wartet auf den Abschluss der Übertragung, wahrscheinlich aufgrund des internen NCCL Aufwärmvorgänge. Nachfolgende Aufrufe werden nur blockiert, bis der Vorgang in die Warteschlange gestellt wird.

Betrachten Sie dieses Beispiel, wo rank 1 hängt, weil die CPU versucht, auf einen Tensor zuzugreifen, den die GPU noch nicht erhalten hat:

rank = torch.distributed.get_rank()
if rank == 0:
   t = torch.tensor((1,2,3), dtype=torch.float32, system=system)
   # torch.distributed.ship(t, dst=1) # No ship operation is carried out
else: # rank == 1 (assuming solely 2 ranks)
   t = torch.empty(3, dtype=torch.float32, system=system)
   torch.distributed.recv(t, src=0) # Blocks solely till enqueued (after first run)
   print("This WILL print if NCCL is warmed-up")
   print
   print("This can NOT print")

Der CPU-Prozess bei rank 1 bleibt hängen print

Beachten Sie dies, wenn Sie diesen Code mehrmals ausführen This WILL print if NCCL is warmed-up wird in den späteren Ausführungen nicht gedruckt, da die CPU immer noch feststeckt print

Kollektive

Jede Sammeloperationsfunktion unterstützt sowohl synchrone als auch asynchrone Operationen durch async_op Argument. Der Standardwert ist False, was synchrone Vorgänge bedeutet.

One-to-All-Kollektive

Bei diesen Vorgängen sendet ein Rang Daten an alle anderen Ränge in der Gruppe.

Übertragen

  • torch.distributed.broadcast(tensor, src): Kopiert einen Tensor aus einem einzelnen Quellrang (src) zu allen anderen Rängen. Jeder Prozess führt zu einer identischen Kopie des Tensors. Der tensor Der Parameter dient zwei Zwecken: (1) wenn der Rang des Prozesses mit dem übereinstimmt srcDie tensor werden die Daten gesendet? (2) andernfalls, tensor dient der Speicherung der empfangenen Daten.
rank = torch.distributed.get_rank()
if rank == 0: # supply rank
  tensor = torch.tensor((1,2,3), dtype=torch.int64, system=system)
else: # vacation spot ranks
  tensor = torch.empty(3, dtype=torch.int64, system=system)
torch.distributed.broadcast(tensor, src=0)
Bild des Autors: Visuelle Animation übertragen

Streuen

  • torch.distributed.scatter(tensor, scatter_list, src): Verteilt Datenblöcke von einem Quellrang über alle Ränge. Der scatter_list auf dem Quellrang enthält mehrere Tensoren, und jeder Rang (einschließlich der Quelle) erhält einen Tensor aus dieser Liste in seinen tensor Variable. Die Zielränge gehen einfach vorbei None für die scatter_list.
# The scatter_list should be None for all non-source ranks.
scatter_list = None if rank != 0 else (torch.tensor((i, i+1)).to(system) for i in vary(0,4,2))
tensor = torch.empty(2, dtype=torch.int64).to(system)
torch.distributed.scatter(tensor, scatter_list, src=0)
print(f'Rank {rank} obtained: {tensor}')
Bild des Autors: Visuelle Streuanimation

All-to-One-Kollektive

Diese Vorgänge sammeln Daten von allen Rängen und konsolidieren sie auf einem einzigen Zielrang.

Reduzieren

  • torch.distributed.cut back(tensor, dst, op): Nimmt einen Tensor von jedem Rang und wendet eine Reduktionsoperation an (wie SUM, MAX, MIN) und speichert das Endergebnis auf dem Zielrang (dst) nur.
rank = torch.distributed.get_rank()
tensor = torch.tensor((rank+1, rank+2, rank+3), system=system)
torch.distributed.cut back(tensor, dst=0, op=torch.distributed.ReduceOp.SUM)
print(tensor)
Bild des Autors: Reduzieren Sie visuelle Animationen

Versammeln

  • torch.distributed.collect(tensor, gather_list, dst): Sammelt einen Tensor von jedem Rang in einer Liste von Tensoren auf dem Zielrang. Der gather_list muss eine Liste von Tensoren (richtige Größe und Typ) am Ziel und sein None überall sonst.
# The gather_list should be None for all non-destination ranks.
rank = torch.distributed.get_rank()
world_size = torch.distributed.get_world_size()
gather_list = None if rank != 0 else (torch.zeros(3, dtype=torch.int64).to(system) for _ in vary(world_size))
t = torch.tensor((0+rank, 1+rank, 2+rank), dtype=torch.int64).to(system)
torch.distributed.collect(t, gather_list, dst=0)
print(f'After op, Rank {rank} has: {gather_list}')

Die Variable world_size ist die Gesamtzahl der Ränge. Es kann mit erhalten werden torch.distributed.get_world_size(). Aber machen Sie sich zunächst keine Gedanken über die Implementierungsdetails. Das Wichtigste ist, die Konzepte zu verstehen.

Bild des Autors: Sammeln Sie visuelle Animationen

All-to-All-Kollektive

Bei diesen Operationen sendet und empfängt jeder Rang Daten von allen anderen Rängen.

Alles reduzieren

  • torch.distributed.all_reduce(tensor, op): Das Gleiche wie cut backaber das Ergebnis wird gespeichert jederRang statt nur ein Ziel.
# Instance for torch.distributed.all_reduce
rank = torch.distributed.get_rank()
tensor = torch.tensor((rank+1, rank+2, rank+3), dtype=torch.float32, system=system)
torch.distributed.all_reduce(tensor, op=torch.distributed.ReduceOp.SUM)
print(f"Rank {rank} after all_reduce: {tensor}")
Bild des Autors: Alle visuellen Animationen reduzieren

Alle versammeln sich

  • torch.distributed.all_gather(tensor_list, tensor): Das Gleiche wie collectaber die gesammelte Liste der Tensoren ist auf verfügbar jeder Rang.
# Instance for torch.distributed.all_gather
rank = torch.distributed.get_rank()
world_size = torch.distributed.get_world_size()
input_tensor = torch.tensor((rank), dtype=torch.float32, system=system)
tensor_list = (torch.empty(1, dtype=torch.float32, system=system) for _ in vary(world_size))
torch.distributed.all_gather(tensor_list, input_tensor)
print(f"Rank {rank} gathered: {(t.merchandise() for t in tensor_list)}")
Bild des Autors: All Collect visuelle Animation

Streuung reduzieren

  • torch.distributed.reduce_scatter(output, input_list): Äquivalent zur Durchführung einer Reduzierungsoperation für eine Liste von Tensoren und der anschließenden Streuung der Ergebnisse. Jeder Rang erhält einen anderen Teil der reduzierten Leistung.
# Instance for torch.distributed.reduce_scatter
rank = torch.distributed.get_rank()
world_size = torch.distributed.get_world_size()
input_list = (torch.tensor((rank + i), dtype=torch.float32, system=system) for i in vary(world_size))
output = torch.empty(1, dtype=torch.float32, system=system)
torch.distributed.reduce_scatter(output, input_list, op=torch.distributed.ReduceOp.SUM)
print(f"Rank {rank} obtained lowered worth: {output.merchandise()}")
Bild des Autors: Visuelle Animation „Scatter reduzieren“.

Synchronisation

Die beiden am häufigsten verwendeten Operationen sind request.wait() Und torch.cuda.synchronize(). Es ist wichtig, den Unterschied zwischen diesen beiden zu verstehen:

  • request.wait(): Dies wird für asynchrone Vorgänge verwendet. Es synchronisiert den aktuell aktiven CUDA-Stream für diesen Vorgang und stellt sicher, dass der Stream auf den Abschluss der Kommunikation wartet, bevor er fortfährt. Mit anderen Worten: Es blockiert den aktuell aktiven CUDA-Stream, bis die Datenübertragung abgeschlossen ist. Auf der Host-Seite bewirkt es nur, dass der Host wartet, bis der Kernel in die Warteschlange gestellt wird; der Gastgeber tut es nicht Warten Sie, bis die Datenübertragung abgeschlossen ist.
  • torch.cuda.synchronize(): Dies ist ein energischerer Befehl, der den Host-CPU-Thread bis pausiert alle Zuvor auf der GPU eingereihte Aufgaben sind abgeschlossen. Es stellt sicher, dass die GPU vollständig im Leerlauf ist, bevor die CPU weitermacht, kann jedoch bei unsachgemäßer Verwendung zu Leistungsengpässen führen. Wann immer Sie Benchmark-Messungen durchführen müssen, sollten Sie diese verwenden, um sicherzustellen, dass Sie genau den Second erfassen, in dem die GPUs fertig sind.

Abschluss

Herzlichen Glückwunsch, dass Sie es bis zum Ende geschafft haben! In diesem Beitrag haben Sie Folgendes erfahren:

  • Punkt-zu-Punkt-Operationen
  • Synchronisierung und Asynchronisierung in NCCL
  • Kollektive Operationen
  • Synchronisationsmethoden

Im nächsten Blogbeitrag befassen wir uns mit PCIe, NVLink und anderen Mechanismen, die die Kommunikation in einer verteilten Umgebung ermöglichen!

Referenzen

Von admin

Schreibe einen Kommentar

Deine E-Mail-Adresse wird nicht veröffentlicht. Erforderliche Felder sind mit * markiert