Apache AirFlow ist eines der beliebtesten Orchestrierungswerkzeuge im Datenfeld und führt Workflows für Unternehmen weltweit an. Jeder, der bereits mit Luftstrom in einer Produktionsumgebung gearbeitet hat, insbesondere in einem komplexen, weiß, dass er gelegentlich einige Probleme und seltsame Fehler aufweisen kann.

Unter den vielen Aspekten, die Sie in einer Luftstromumgebung verwalten müssen, fliegt häufig eine kritische Metrik unter das Radar: Dag analysieren. Die Überwachung und Optimierung der Parsezeit ist unerlässlich, um Leistungs Engpässe zu vermeiden und die korrekte Funktion Ihrer Orchestrierungen zu gewährleisten, wie wir in diesem Artikel untersuchen werden.

Das heißt, dieses Tutorial zielt darauf ab, vorzustellen airflow-parse-benchein Open-Supply-Device, das ich entwickelt habe, um Dateningenieuren dabei zu helfen, ihre Luftstromumgebungen zu überwachen und zu optimieren, um Erkenntnisse zur Reduzierung der Codekomplexität zu erhalten und die Zeit zu analysieren.

In Bezug auf den Luftstrom ist Dag Parse Time oft eine übersehene Metrik. Parsing tritt jedes Mal auf, wenn der Luftstrom Ihre Python -Dateien verarbeitet, um die DAGs dynamisch zu erstellen.

Standardmäßig werden alle Ihre DAGs alle 30 Sekunden analysiert – eine Frequenz, die von der Konfigurationsvariablen gesteuert wird min_file_process_interval. Dies bedeutet, dass alle 30 Sekunden den gesamten Python -Code, der in Ihrem vorhanden ist dags Der Ordner wird gelesen, importiert und verarbeitet, um DAG -Objekte zu generieren, die die zu geplanten Aufgaben enthalten. Erfolgreich verarbeitete Dateien werden dann zu dem hinzugefügt Dag -Tasche.

Zwei wichtige Luftstromkomponenten verarbeiten diesen Vorgang:

Zusammen beide Komponenten (allgemein als die bezeichnet als die DAG -Prozessor) werden vom Luftstrom ausgeführt Schedulerund sicherstellen, dass Ihre DAG -Objekte aktualisiert werden, bevor sie ausgelöst werden. Aus Skalierungs- und Sicherheitsgründen ist es jedoch auch möglich, Ihren DAG -Prozessor als separate Komponente in Ihrem Cluster auszuführen.

Wenn Ihre Umgebung nur ein paar Dutzend DAGs hat, ist es unwahrscheinlich, dass der Analyseprozess zu Problemen führt. Es ist jedoch üblich, Produktionsumgebungen mit Hunderten oder sogar Tausenden von DAGs zu finden. In diesem Fall kann dies zu hoch sind, wenn Ihre Parse -Zeit zu hoch ist, dies kann zu:

  • Verzögerung der DAG -Planung.
  • Erhöhen Sie die Nutzung der Ressourcen.
  • Umwelt Herzschlag Probleme.
  • Scheduler -Fehler.
  • Übermäßige CPU und Speicherverbrauch, Verschwendung von Ressourcen.

Stellen Sie sich nun vor, eine Umgebung mit Hunderten von DAGs zu haben, die unnötig komplexe Parsinglogik enthalten. Kleine Ineffizienzen können sich schnell zu erheblichen Problemen verwandeln und die Stabilität und Leistung Ihres gesamten Luftstromaufbaus beeinflussen.

Beim Schreiben von Luftstrom -DAGs gibt es einige wichtige Finest Practices, die Sie beachten können, um einen optimierten Code zu erstellen. Obwohl Sie viele Tutorials zur Verbesserung Ihrer DAGs finden können, werde ich einige der wichtigsten Prinzipien zusammenfassen, die Ihre DAG -Leistung erheblich verbessern können.

Begrenzen Sie den Code der obersten Ebene

Eine der häufigsten Ursachen für hohe DAG-Parsingzeiten ist ineffizienter oder komplexer Code auf höchstem Niveau. Prime-Stage-Code in einer Airflow-DAG-Datei wird jedes Mal ausgeführt, wenn der Scheduler die Datei analysiert. Wenn dieser Code ressourcenintensive Vorgänge wie Datenbankabfragen, API-Aufrufe oder dynamische Aufgabengenerierung enthält, kann er die Parsingleistung erheblich beeinträchtigen.

Der folgende Code zeigt ein Beispiel für a Nicht optimierte DAG:

In diesem Fall wird jedes Mal, wenn die Datei vom Scheduler analysiert wird, der Prime-Stage-Code ausgeführt, wodurch eine API-Anforderung erstellt und der Datenrahmen verarbeitet wird, was die Analysezeit erheblich beeinflussen kann.

Ein weiterer wichtiger Faktor, der zu einer langsamen Parsen beiträgt, sind Prime-Stage-Importe. Jede auf der obere Ebene importierte Bibliothek wird während der Parsen in den Speicher geladen, was zeitaufwändig sein kann. Um dies zu vermeiden, können Sie Importe in Funktionen oder Aufgabendefinitionen verschieben.

Der folgende Code zeigt eine bessere Model derselben DAG:

Vermeiden Sie Xcoms und Variablen im Code auf oberster Ebene

Es ist besonders interessant, XCOMS und Variablen in Ihrem Prime-Stage-Code zu vermeiden. Wie angegeben von Google -Dokumentation:

Wenn Sie variable.get () im Code auf der oberen Ebene verwenden, führt der AirFlow jedes Mal, wenn die .py -Datei analysiert wird, eine Variable.get () aus, die eine Sitzung für die DB öffnet. Dies kann die Analysezeiten dramatisch verlangsamen.

Um dies anzugehen, sollten Sie a verwenden JSON -Wörterbuch Um mehrere Variablen in einer einzelnen Datenbankabfrage abzurufen, anstatt mehrere zu machen Variable.get() Anrufe. Alternativ verwenden Sie Jinja -Vorlagenwie auf diese Weise abgerufene Variablen nur während der Aufgabenausführung verarbeitet werden, nicht während der DAG -Parsen.

Entfernen Sie unnötige DAGs

Obwohl es offensichtlich erscheint, ist es immer wichtig, sich daran zu erinnern, unnötige DAGs und Dateien aus Ihrer Umgebung regelmäßig aufzuräumen:

  • Entfernen Sie unbenutzte DAGs: Überprüfen Sie Ihre dags Ordner und löschen Sie alle Dateien, die nicht mehr benötigt werden.
  • Verwenden .airflowignore: Geben Sie den Dateien an, den der Luftstrom absichtlich ignorieren sollte, und überspringen Sie die Parsen.
  • Die Bewertung hielt bei Dags an: Pause DAGs werden immer noch vom Scheduler analysiert und Ressourcen konsumieren. Wenn sie nicht mehr benötigt werden, sollten Sie sie entfernen oder archivieren.

Ändern Sie die Luftstromkonfigurationen

Zuletzt können Sie einige Luftstromkonfigurationen ändern, um die Verwendung von Scheduler -Ressourcen zu verringern:

  • min_file_process_interval: Diese Einstellung steuert, wie oft (in Sekunden) der Luftstrom Ihre DAG -Dateien analysiert. Durch Erhöhen von 30 Sekunden können Sie die Final des Schedulers auf Kosten langsamerer DAG -Updates verringern.
  • dag_dir_list_interval: Dies bestimmt, wie oft (in Sekunden) der Luftstrom die scannt dags Verzeichnis für neue DAGs. Wenn Sie selten neue DAGs einsetzen, sollten Sie dieses Intervall erhöhen, um die CPU -Nutzung zu verringern.

Wir haben viel darüber diskutiert, wie wichtig es ist, optimierte DAGs für die Aufrechterhaltung einer gesunden Luftstromumgebung zu schaffen. Aber wie messen Sie tatsächlich die Parse -Zeit Ihrer DAGs? Glücklicherweise gibt es je nach Airflow -Bereitstellung oder Betriebssystem verschiedene Möglichkeiten, dies zu tun.

Zum Beispiel, wenn Sie eine haben Wolkenkomponist Die Bereitstellung können einen DAG -Parse -Bericht problemlos abrufen, indem Sie den folgenden Befehl auf Google CLI ausführen:

gcloud composer environments run $ENVIRONMENT_NAME 
— location $LOCATION
dags report

Während das Abrufen von Parse -Metriken unkompliziert ist, kann die Messung der Wirksamkeit Ihrer Code -Optimierungen weniger sein. Jedes Mal, wenn Sie Ihren Code ändern, müssen Sie die aktualisierte Python-Datei an Ihren Cloud-Anbieter neu einsetzen, warten, bis die DAG analysiert wird, und dann einen neuen Bericht extrahieren-einen langsamen und zeitaufwändigen Prozess.

Ein weiterer möglicher Ansatz, wenn Sie sich unter Linux oder Mac befinden, besteht darin, diesen Befehl auszuführen, um die Analysezeit lokal auf Ihrer Maschine zu messen:

time python airflow/example_dags/instance.py

Dieser Ansatz ist zwar einfach, ist jedoch nicht praktikabel, um die Parse -Zeiten mehrerer DAGs systematisch zu messen und zu vergleichen.

Um diese Herausforderungen anzugehen, habe ich die erstellt airflow-parse-benchEine Python -Bibliothek, die die Messung und Vergleich der Parse -Zeiten Ihrer DAGs mithilfe der nativen Parse -Methode von Airflow vereinfacht.

Der airflow-parse-bench Das Device erleichtert das Speichern von Analysezeiten, den Vergleichen von Ergebnissen und die Standardisierung von Vergleiche in Ihren DAGs.

Set up der Bibliothek

Vor der Set up wird empfohlen, a zu verwenden virtualenv Bibliothekskonflikte vermeiden. Nach dem Einrichten können Sie das Paket installieren, indem Sie den folgenden Befehl ausführen:

pip set up airflow-parse-bench

Notiz: Dieser Befehl installiert nur die wesentlichen Abhängigkeiten (im Zusammenhang mit Luftströmen und Luftstromanbietern). Sie müssen alle zusätzlichen Bibliotheken manuell installieren, von denen Ihre DAGs abhängen.

Zum Beispiel, wenn eine DAG verwendet boto3 Um mit AWS zu interagieren, sorgen Sie dafür boto3 ist in Ihrer Umgebung installiert. Andernfalls begegnen Sie analysende Fehler.

Danach ist es notwendig, Ihre Luftstromdatenbank zu initialisieren. Dies kann durch Ausführen des folgenden Befehls erfolgen:

airflow db init

Außerdem, wenn Ihre DAGs verwenden LuftstromvariablenSie müssen sie auch lokal definieren. Es ist jedoch nicht erforderlich, echte Werte auf Ihre Variablen zu setzen, da die tatsächlichen Werte für Parsen nicht erforderlich sind:

airflow variables set MY_VARIABLE 'ANY TEST VALUE'

Ohne dies werden Sie auf einen Fehler stoßen wie:

error: 'Variable MY_VARIABLE doesn't exist'

Verwenden des Werkzeugs

Nach der Set up der Bibliothek können Sie analysieren. Angenommen, Sie haben eine DAG -Datei mit dem Namen dag_test.py Enthält der nicht optimierte DAG-Code, der im obigen Beispiel verwendet wird.

Um seine Analysezeit zu messen, rennen Sie einfach:

airflow-parse-bench --path dag_test.py

Diese Ausführung erzeugt die folgende Ausgabe:

Ausführungsergebnis. Bild des Autors.

Wie beobachtet, präsentierte unsere DAG eine Parse -Zeit von 0,61 Sekunden. Wenn ich den Befehl erneut ausführe, werde ich einige kleine Unterschiede sehen, da die Analysezeiten aufgrund von System- und Umgebungsfaktoren bei den Läufen geringfügig variieren können:

Ergebnis einer weiteren Ausführung derselben DAG. Bild des Autors.

Um eine prägnantere Zahl zu präsentieren, ist es möglich, mehrere Ausführungen zu aggregieren, indem die Anzahl der Iterationen angegeben wird:

airflow-parse-bench --path dag_test.py --num-iterations 5

Obwohl es etwas länger dauert, um fertig zu werden, berechnet dies die durchschnittliche Analysezeit über fünf Hinrichtungen.

Um die Auswirkungen der oben genannten Optimierungen zu bewerten, habe ich den Code in meinem ersetztdag_test.py mit der optimierten Model früher geteilt. Nachdem ich denselben Befehl ausgeführt hatte, bekam ich das folgende Ergebnis:

Ergebnis des optimierten Codes analysieren. Bild des Autors.

Wie bemerkt, konnte nur ein paar gute Praktiken angewendet werden, um sich quick zu verringern 0,5 Sekunden In der DAG -Analysezeit und die Bedeutung der von uns vorgenommenen Änderungen hervorhebt!

Es gibt andere interessante Funktionen, die ich für den Teilen related halte.

Als Erinnerung können Sie, wenn Sie Zweifel oder Probleme mit dem Device verwenden, auf die vollständige Dokumentation zugreifen Github.

Um alle von der Bibliothek unterstützten Parameter anzuzeigen, leiten Sie außerdem aus:

airflow-parse-bench --help

Testen mehrerer DAGs

In den meisten Fällen haben Sie wahrscheinlich Dutzende von DAGs, um die Analysezeiten zu testen. Um diesen Anwendungsfall anzugehen, habe ich einen Ordner namens erstellt dags und geben Sie vier Python -Dateien hinein.

Um die Analysezeiten für alle DAGs in einem Ordner zu messen, muss der Ordnerpfad in der angegeben werden --path Parameter:

airflow-parse-bench --path my_path/dags

Durch das Ausführen dieses Befehls wird eine Tabelle erstellt, in der die Parse -Zeiten für alle DAGs im Ordner zusammengefasst sind:

Testen der Analysezeit mehrerer DAGs. Bild des Autors.

Standardmäßig ist die Tabelle vom schnellsten bis zum langsamsten DAG sortiert. Sie können die Reihenfolge jedoch mit dem verwenden --order Parameter:

airflow-parse-bench --path my_path/dags --order desc
Umgekehrte Sortierreihenfolge. Bild des Autors.

Überspringen unveränderter DAGs

Der --skip-unchanged Der Parameter kann besonders nützlich während der Entwicklung sein. Wie der Title schon sagt, überspringt diese Possibility die Parse -Ausführung für DAGs, die seit der letzten Ausführung nicht geändert wurden:

airflow-parse-bench --path my_path/dags --skip-unchanged

Wie nachstehend gezeigt, spiegelt der Ausgang, wenn die DAGs unverändert bleiben, keinen Unterschied in den Analysezeiten wider:

Ausgabe ohne Unterschied für unveränderte Dateien. Bild des Autors.

Zurücksetzen der Datenbank

Alle DAG -Informationen, einschließlich Metriken und Geschichte, werden in einer lokalen SQLite -Datenbank gespeichert. Wenn Sie alle gespeicherten Daten löschen und frisch anfangen möchten, verwenden Sie die --reset-db Flagge:

airflow-parse-bench --path my_path/dags --reset-db

Dieser Befehl setzt die Datenbank zurück und verarbeitet die DAGs, als wäre sie die erste Ausführung.

Parse Time ist eine wichtige Metrik für die Aufrechterhaltung skalierbarer und effizienter Luftstromumgebungen, insbesondere wenn Ihre Orchestrierungsanforderungen immer komplexer werden.

Aus diesem Grund die airflow-parse-bench Die Bibliothek kann ein wichtiges Instrument sein, um Dateningenieuren zu helfen, bessere DAGs zu erstellen. Durch das Testen der Parse -Zeit Ihrer DAGs lokal, können Sie Ihren Code -Engpass einfach und schnell finden, wodurch Ihre DAGs schneller und leistungsfähiger werden.

Da der Code lokal ausgeführt wird, ist die produzierte Parse -Zeit nicht mit dem in Ihrem Luftstrom -Cluster vorhandenen. Wenn Sie jedoch in der Lage sind, die Analysezeit in Ihrer lokalen Maschine zu verkürzen, kann dies in Ihrer Cloud -Umgebung reproduziert werden.

Schließlich ist dieses Projekt für die Zusammenarbeit geöffnet! Wenn Sie Vorschläge, Ideen oder Verbesserungen haben, können Sie gerne dazu beitragen Github.

Von admin

Schreibe einen Kommentar

Deine E-Mail-Adresse wird nicht veröffentlicht. Erforderliche Felder sind mit * markiert