Dies ist der erste Teil einer zweiteiligen Reihe über verteiltes Rechnen mit Ray. Dieser Teil zeigt, wie Sie Ray auf Ihrem lokalen PC verwenden, und Teil 2 zeigt, wie Sie Ray auf Multi-Server-Cluster in der Cloud skalieren.
Sie haben sich einen neuen 16-Core-Laptop computer oder -Desktop zugelegt und möchten dessen Leistungsfähigkeit mit einigen umfangreichen Berechnungen testen.
Sie sind ein Python-Programmierer, aber noch kein Experte, additionally öffnen Sie Ihr Lieblings-LLM und fragen ihn etwa so.
„Ich möchte die Anzahl der Primzahlen innerhalb eines bestimmten Eingabebereichs zählen. Bitte geben Sie mir dafür etwas Python-Code.“
Nach einigen Sekunden erhalten Sie vom LLM Code. Sie können es durch ein kurzes Hin und Her ein wenig optimieren, und am Ende erhalten Sie so etwas:
import math, time, os
def is_prime(n: int) -> bool:
if n < 2: return False
if n == 2: return True
if n % 2 == 0: return False
r = int(math.isqrt(n)) + 1
for i in vary(3, r, 2):
if n % i == 0:
return False
return True
def count_primes(a: int, b: int) -> int:
c = 0
for n in vary(a, b):
if is_prime(n):
c += 1
return c
if __name__ == "__main__":
A, B = 10_000_000, 20_000_000
total_cpus = os.cpu_count() or 1
# Begin "chunky"; we are able to sweep this later
chunks = max(4, total_cpus * 2)
step = (B - A) // chunks
print(f"CPUs~{total_cpus}, chunks={chunks}")
t0 = time.time()
outcomes = ()
for i in vary(chunks):
s = A + i * step
e = s + step if i < chunks - 1 else B
outcomes.append(count_primes(s, e))
whole = sum(outcomes)
print(f"whole={whole}, time={time.time() - t0:.2f}s")
Sie führen das Programm aus und es funktioniert perfekt. Das einzige Drawback besteht darin, dass die Ausführung ziemlich lange dauert, vielleicht dreißig bis sechzig Sekunden, abhängig von der Größe Ihres Eingabebereichs. Das ist wahrscheinlich inakzeptabel.
Was tust du jetzt? Sie haben mehrere Möglichkeiten, wobei die drei häufigsten wahrscheinlich sind:
– Parallelisieren Sie den Code mithilfe von Threads oder Multi-Processing
– Schreiben Sie den Code in einer „schnellen“ Sprache wie C oder Rust neu
– Probieren Sie eine Bibliothek wie Cython, Numba oder NumPy aus
Dies sind alles praktikable Optionen, aber jede hat Nachteile. Die Optionen 1 und 3 erhöhen die Komplexität Ihres Codes erheblich, und die mittlere Choice erfordert möglicherweise das Erlernen einer neuen Programmiersprache.
Was wäre, wenn ich dir sagen würde, dass es auch anders geht? Eine Lösung, bei der die erforderlichen Änderungen an Ihrem vorhandenen Code auf ein absolutes Minimal beschränkt werden. Eines, bei dem Ihre Laufzeit automatisch auf alle verfügbaren Kerne verteilt wird.
Genau das ist es, was der Dritte tut Ray Die Bibliothek verspricht es.
Was ist Ray?
Die Ray Python-Bibliothek ist eine Open-Supply verteiltes Laptop-Framework entworfen, um es zu schaffen leicht zu skalieren Python-Programme von einem Laptop computer zu einem Cluster mit minimalen Codeänderungen.
Ray erleichtert die Skalierung und Verteilung rechenintensiver Anwendungs-Workloads – von Deep Studying bis hin zur Datenverarbeitung – über Cluster von Distant-Computern und sorgt gleichzeitig für praktische Verbesserungen der Anwendungslaufzeit auf Ihrem Laptop computer, Desktop oder sogar einem Distant-Cloud-basierten Computing-Cluster.
Ray bietet einen umfangreichen Satz an Bibliotheken und Integrationen, die auf einem flexiblen verteilten Ausführungsframework basieren und verteiltes Computing einfach und für alle zugänglich machen.
Kurz gesagt: Mit Ray können Sie Ihren Python-Code mit minimalem Aufwand parallelisieren und verteilen, unabhängig davon, ob er lokal auf einem Laptop computer oder in einem riesigen cloudbasierten Cluster ausgeführt wird.
Mit Ray
Im Relaxation dieses Artikels werde ich Sie durch die Grundlagen der Verwendung von Ray zur Beschleunigung von CPU-intensivem Python-Code führen und einige Beispielcode-Snippets einrichten, um Ihnen zu zeigen, wie einfach es ist, die Leistungsfähigkeit von Ray in Ihre eigenen Workloads zu integrieren.
Um Ray optimum nutzen zu können, müssen Sie als Datenwissenschaftler oder Ingenieur für maschinelles Lernen zunächst einige Schlüsselkonzepte verstehen. Ray besteht aus mehreren Komponenten.
Ray-Daten ist eine skalierbare Bibliothek, die für die Datenverarbeitung in ML- und KI-Aufgaben entwickelt wurde. Es bietet versatile, leistungsstarke APIs für KI-Aufgaben, einschließlich Batch-Inferenz, Datenvorverarbeitung und Datenaufnahme für ML-Coaching.
Ray Prepare ist eine versatile, skalierbare Bibliothek, die für verteiltes maschinelles Lernen und die Feinabstimmung entwickelt wurde.
Ray Tune wird für das Hyperparameter-Tuning verwendet.
Ray Serve ist eine skalierbare Bibliothek zum Bereitstellen von Modellen zur Erleichterung von On-line-Inferenz-APIs.
Ray RLlib wird für skalierbares Verstärkungslernen verwendet
Wie Sie sehen, konzentriert sich Ray stark auf große Sprachmodelle und KI-Anwendungen, aber es gibt noch eine letzte wichtige Komponente, die ich noch nicht erwähnt habe und die ich in diesem Artikel verwenden werde.
Ray Core ist für die Skalierung CPU-intensiver Allzweck-Python-Anwendungen konzipiert. Es wurde entwickelt, um Ihre Python-Arbeitslast auf alle verfügbaren Kerne auf dem System zu verteilen, auf dem Sie es ausführen.
In diesem Artikel geht es ausschließlich um Ray Core.
Zwei wesentliche Konzepte, die es in Ray Core zu verstehen gilt, sind: Aufgaben Und Schauspieler.
Aufgaben sind staatenlos Employee oder Dienste, die mit Ray implementiert werden, indem reguläre Python-Funktionen dekoriert werden.
Schauspieler (bzw zustandsbehaftet Staff) werden beispielsweise verwendet, wenn Sie den Standing abhängiger Variablen in Ihrem verteilten Cluster verfolgen und verwalten müssen. Akteure werden durch die Dekoration von regulärem Python implementiert Klassen.
Sowohl Akteure als auch Aufgaben werden mit demselben definiert @ray.Fernbedienung Dekorateur. Einmal definiert, werden diese Aufgaben mit dem Particular ausgeführt .Fernbedienung() Methode von Ray. Als nächstes schauen wir uns ein Beispiel dafür an.
Einrichten einer Entwicklungsumgebung
Bevor wir mit dem Codieren beginnen, sollten wir eine Entwicklungsumgebung einrichten, um unsere Projekte isoliert zu halten, damit sie sich nicht gegenseitig beeinträchtigen. Ich werde dafür Conda verwenden, Sie können aber auch das von Ihnen bevorzugte Software verwenden. Ich werde meinen Code mit einem Jupyter-Pocket book in einer WSL2-Ubuntu-Shell unter Home windows ausführen.
$ conda create -n ray-test python=3.13 -y
$ conda activate ray-test
(ray-test) $ conda set up ray(default)
Codebeispiel – Primzahlen zählen
Schauen wir uns noch einmal das Beispiel an, das ich am Anfang gegeben habe: Zählen der Anzahl der Primzahlen im Intervall 10.000.000 bis 20.000.000.
Wir führen unseren ursprünglichen Python-Code aus und messen, wie lange es dauert.
import math, time, os
def is_prime(n: int) -> bool:
if n < 2: return False
if n == 2: return True
if n % 2 == 0: return False
r = int(math.isqrt(n)) + 1
for i in vary(3, r, 2):
if n % i == 0:
return False
return True
def count_primes(a: int, b: int) -> int:
c = 0
for n in vary(a, b):
if is_prime(n):
c += 1
return c
if __name__ == "__main__":
A, B = 10_000_000, 20_000_000
total_cpus = os.cpu_count() or 1
# Begin "chunky"; we are able to sweep this later
chunks = max(4, total_cpus * 2)
step = (B - A) // chunks
print(f"CPUs~{total_cpus}, chunks={chunks}")
t0 = time.time()
outcomes = ()
for i in vary(chunks):
s = A + i * step
e = s + step if i < chunks - 1 else B
outcomes.append(count_primes(s, e))
whole = sum(outcomes)
print(f"whole={whole}, time={time.time() - t0:.2f}s")
Und die Ausgabe?
CPUs~32, chunks=64
whole=606028, time=31.17s
Können wir das nun mit Ray verbessern? Ja, indem Sie diesem einfachen 4-Schritte-Prozess folgen.
Schritt 1 – Ray initialisieren. Fügen Sie diese beiden Zeilen am Anfang Ihres Codes hinzu.
import ray
ray.init()
Schritt 2 – Erstellen Sie unsere Distant-Funktion. Das ist einfach. Dekorieren Sie einfach die Funktion, die wir optimieren möchten, mit dem @ray.remote-Dekorator. Die zu dekorierende Funktion ist diejenige, die die meiste Arbeit leistet. In unserem Beispiel ist das die Funktion count_primes.
@ray.distant(num_cpus=1)
def count_primes(begin: int, finish: int) -> int:
...
...
Schritt 3 – Starten Sie die parallelen Aufgaben. Rufen Sie Ihre Distant-Funktion mit auf .Fernbedienung Ray-Anweisung.
refs.append(count_primes.distant(s, e))
Schritt 4 – Warten Sie, bis alle unsere Aufgaben abgeschlossen sind. Jede Aufgabe in Ray gibt eine zurück ObjektRef wenn es aufgerufen wurde. Das ist ein Versprechen von Ray. Das bedeutet, dass Ray die Aufgabe aus der Ferne gestartet hat und Ray ihren Wert irgendwann in der Zukunft zurückgeben wird. Wir überwachen alle ObjectRefs, die von der Ausführung von Aufgaben zurückgegeben werden ray.get() Funktion. Dies blockiert, bis alle Aufgaben abgeschlossen sind.
outcomes = ray.get(duties)
Lassen Sie uns das alles zusammenfassen. Wie Sie sehen werden, sind die Änderungen an unserem ursprünglichen Code minimal – es wurden lediglich vier Codezeilen hinzugefügt und eine print-Anweisung zur Anzeige der Anzahl der Knoten und Kerne, auf denen wir ausgeführt werden.
import math
import time
# -----------------------------------------
# Change No. 1
# -----------------------------------------
import ray
ray.init(auto)
def is_prime(n: int) -> bool:
if n < 2: return False
if n == 2: return True
if n % 2 == 0: return False
r = int(math.isqrt(n)) + 1
for i in vary(3, r, 2):
if n % i == 0:
return False
return True
# -----------------------------------------
# Change No. 2
# -----------------------------------------
@ray.distant(num_cpus=1) # pure-Python loop → 1 CPU per job
def count_primes(a: int, b: int) -> int:
c = 0
for n in vary(a, b):
if is_prime(n):
c += 1
return c
if __name__ == "__main__":
A, B = 10_000_000, 60_000_000
total_cpus = int(ray.cluster_resources().get("CPU", 1))
# Begin "chunky"; we are able to sweep this later
chunks = max(4, total_cpus * 2)
step = (B - A) // chunks
print(f"nodes={len(ray.nodes())}, CPUs~{total_cpus}, chunks={chunks}")
t0 = time.time()
refs = ()
for i in vary(chunks):
s = A + i * step
e = s + step if i < chunks - 1 else B
# -----------------------------------------
# Change No. 3
# -----------------------------------------
refs.append(count_primes.distant(s, e))
# -----------------------------------------
# Change No. 4
# -----------------------------------------
whole = sum(ray.get(refs))
print(f"whole={whole}, time={time.time() - t0:.2f}s")
Hat sich das alles gelohnt? Lassen Sie uns den neuen Code ausführen und sehen, was wir bekommen.
2025-11-01 13:36:30,650 INFO employee.py:2004 -- Began a neighborhood Ray occasion. View the dashboard at 127.0.0.1:8265
/house/tom/.native/lib/python3.10/site-packages/ray/_private/employee.py:2052: FutureWarning: Tip: In future variations of Ray, Ray will now not override accelerator seen units env var if num_gpus=0 or num_gpus=None (default). To allow this habits and switch off this error message, set RAY_ACCEL_ENV_VAR_OVERRIDE_ON_ZERO=0
warnings.warn(
nodes=1, CPUs~32, chunks=64
whole=606028, time=3.04s
Nun, das Ergebnis spricht für sich. Der Ray Python-Code ist 10x schneller als der reguläre Python-Code. Nicht zu schäbig.
Woher kommt dieser Geschwindigkeitszuwachs? Nun, Ray kann Ihre Arbeitslast auf alle Kerne Ihres Methods verteilen. Ein Kern ist wie eine Mini-CPU. Als wir unseren ursprünglichen Python-Code ausführten, verwendete er nur einen Kern. Das ist in Ordnung, aber wenn Ihre CPU mehr als einen Kern hat, was bei den meisten modernen PCs der Fall ist, dann lassen Sie sozusagen Geld auf dem Tisch.
In meinem Fall verfügt die CPU über 24 Kerne, daher ist es nicht verwunderlich, dass mein Ray-Code viel schneller conflict als der Nicht-Ray-Code.
Überwachen von Ray-Jobs
Ein weiterer erwähnenswerter Punkt ist, dass Ray die Überwachung von Jobausführungen über ein Dashboard sehr einfach macht. Beachten Sie, dass wir in der Ausgabe, die wir beim Ausführen unseres Ray-Beispielcodes erhalten haben, Folgendes gesehen haben:
... -- Began a neighborhood Ray occasion. View the dashboard at 127.0.0.1:8265
Es wird ein lokaler URL-Hyperlink angezeigt, da ich diesen auf meinem Desktop ausführe. Wenn Sie dies auf einem Cluster ausführen würden, würde die URL auf einen Speicherort auf dem Cluster-Hauptknoten verweisen.
Wenn Sie auf den angegebenen URL-Hyperlink klicken, sollten Sie etwas Ähnliches sehen:

Von diesem Hauptbildschirm aus können Sie über die Menülinks oben auf der Seite einen Drilldown durchführen, um viele Aspekte Ihrer Ray-Programme zu überwachen.
Verwendung von Ray-Schauspielern
Ich habe bereits erwähnt, dass Schauspieler ein integraler Bestandteil der Ray-Kernverarbeitung sind. Akteure werden verwendet, um Daten zwischen Ray-Aufgaben zu koordinieren und auszutauschen. Angenommen, Sie möchten ein globales Restrict für ALLE laufenden Aufgaben festlegen, das eingehalten werden muss. Nehmen wir an, Sie verfügen über einen Pool von Employee-Aufgaben, möchten aber sicherstellen, dass nur maximal fünf dieser Aufgaben gleichzeitig ausgeführt werden können. Hier ist ein Code, von dem Sie denken könnten, dass er funktionieren würde.
import math, time, os
def is_prime(n: int) -> bool:
if n < 2: return False
if n == 2: return True
if n % 2 == 0: return False
r = int(math.isqrt(n)) + 1
for i in vary(3, r, 2):
if n % i == 0:
return False
return True
def count_primes(a: int, b: int) -> int:
c = 0
for n in vary(a, b):
if is_prime(n):
c += 1
return c
if __name__ == "__main__":
A, B = 10_000_000, 20_000_000
total_cpus = os.cpu_count() or 1
# Begin "chunky"; we are able to sweep this later
chunks = max(4, total_cpus * 2)
step = (B - A) // chunks
print(f"CPUs~{total_cpus}, chunks={chunks}")
t0 = time.time()
outcomes = ()
for i in vary(chunks):
s = A + i * step
e = s + step if i < chunks - 1 else B
outcomes.append(count_primes(s, e))
whole = sum(outcomes)
print(f"whole={whole}, time={time.time() - t0:.2f}s")
Wir haben eine globale Variable verwendet, um die Anzahl der ausgeführten Aufgaben zu begrenzen, und der Code ist syntaktisch korrekt und läuft fehlerfrei. Leider erhalten Sie nicht das erwartete Ergebnis. Das liegt daran, dass jede Ray-Aufgabe in ihrem eigenen Prozessraum ausgeführt wird und über eine eigene Kopie der globalen Variablen verfügt. Die globale Variable wird NICHT von Funktionen gemeinsam genutzt. Wenn wir additionally den obigen Code ausführen, sehen wir eine Ausgabe wie diese:
Whole calls: 200
Supposed GLOBAL_QPS: 5.0
Anticipated time if really global-limited: ~40.00s
Precise time with 'international var' (damaged): 3.80s
Noticed cluster QPS: ~52.6 (ought to have been ~5.0)
Um dies zu beheben, verwenden wir einen Schauspieler. Denken Sie daran, dass ein Schauspieler nur eine mit Ray dekorierte Python-Klasse ist. Hier ist der Code mit Schauspielern.
import time, ray
ray.init(ignore_reinit_error=True, log_to_driver=False)
# That is our actor
@ray.distant
class GlobalPacer:
"""Serialize calls so cluster-wide price <= qps."""
def __init__(self, qps: float):
self.interval = 1.0 / qps
self.next_time = time.time()
def purchase(self):
# Wait contained in the actor till we are able to proceed
now = time.time()
if now < self.next_time:
time.sleep(self.next_time - now)
# Reserve the following slot; guard in opposition to drift
self.next_time = max(self.next_time + self.interval, time.time())
return True
@ray.distant
def call_api_with_limit(n_calls: int, pacer):
performed = 0
for _ in vary(n_calls):
# Look forward to international permission
ray.get(pacer.purchase.distant())
# fake API name (no further sleep right here)
performed += 1
return performed
if __name__ == "__main__":
NUM_WORKERS = 10
CALLS_EACH = 20
GLOBAL_QPS = 5.0 # cluster-wide cap
total_calls = NUM_WORKERS * CALLS_EACH
expected_min_time = total_calls / GLOBAL_QPS
pacer = GlobalPacer.distant(GLOBAL_QPS)
t0 = time.time()
ray.get((call_api_with_limit.distant(CALLS_EACH, pacer) for _ in vary(NUM_WORKERS)))
dt = time.time() - t0
print(f"Whole calls: {total_calls}")
print(f"World QPS cap: {GLOBAL_QPS}")
print(f"Anticipated time (if capped at {GLOBAL_QPS} QPS): ~{expected_min_time:.2f}s")
print(f"Precise time with actor: {dt:.2f}s")
print(f"Noticed cluster QPS: ~{total_calls/dt:.1f}")
Unser Limiter-Code ist in einer Klasse (GlobalPacer) gekapselt und mit ray.distant versehen, was bedeutet, dass er für alle laufenden Aufgaben gilt. Wir können den Unterschied, den dies für die Ausgabe macht, erkennen, indem wir den aktualisierten Code ausführen.
Whole calls: 200
World QPS cap: 5.0
Anticipated time (if capped at 5.0 QPS): ~40.00s
Precise time with actor: 39.86s
Noticed cluster QPS: ~5.0
Zusammenfassung
Dieser Artikel wurde vorgestellt Rayein Open-Supply-Python-Framework, das es einfach macht rechenintensive Programme skalieren von einem einzelnen Kern zu mehreren Kernen oder sogar einem Cluster mit minimalen Codeänderungen.
Ich habe kurz die Schlüsselkomponenten von Ray erwähnt – Ray Information, Ray Prepare, Ray Tune, Ray Serve und Ray Core – und betont, dass Ray Core excellent für die allgemeine CPU-Skalierung ist.
Ich habe einige der wesentlichen Konzepte in Ray Core erklärt, wie zum Beispiel die Einführung von Aufgaben (zustandslose Parallelfunktionen), Akteuren (zustandsbehaftete Arbeiter für gemeinsamen Zustand und Koordination) und ObjectRefs (ein zukünftiges Versprechen des Rückgabewerts einer Aufgabe).
Um die Vorteile der Verwendung von Ray zu verdeutlichen, begann ich mit einem einfachen CPU-intensiven Beispiel – dem Zählen von Primzahlen über einen Bereich – und zeigte, wie langsam die Ausführung auf einem einzelnen Kern mit einer naiven Python-Implementierung sein kann.
Anstatt den Code in einer anderen Sprache neu zu schreiben oder komplexe Multiprocessing-Bibliotheken zu verwenden, können Sie dies mit Ray tun die Arbeitsbelastung parallelisieren in nur vier einfachen Schritten und nur ein paar zusätzlichen Codezeilen:
- ray.init(), um Ray zu starten
- Dekorieren Sie Ihre Funktionen mit @ray.distant, um sie in parallele Aufgaben umzuwandeln
- .distant() zum gleichzeitigen Starten von Aufgaben und
- ray.get() zum Sammeln von Aufgabenergebnissen.
Dieser Ansatz verkürzte die Laufzeit des Primzahlen-Beispiels von ~30 Sekunden auf ~3 Sekunden auf einem 24-Core-Laptop.
Ich erwähnte auch, wie einfach es ist, laufende Jobs in Ray mithilfe des integrierten Dashboards zu überwachen, und zeigte, wie man darauf zugreift.
Abschließend habe ich ein Beispiel für die Verwendung eines Ray Actors bereitgestellt zeigen, warum Globale Variablen sind nicht geeignet zur aufgabenübergreifenden Koordination, da Jeder Employee hat seinen eigenen Speicherplatz.
Im zweiten Teil dieser Serie werden wir sehen, wie wir die Dinge auf eine andere Ebene bringen können, indem wir es Ray-Jobs ermöglichen, noch mehr CPU-Leistung zu verbrauchen, während wir über Amazon Net Providers auf große Server mit mehreren Knoten in der Cloud skalieren.
