Erstellen einer Datenpipeline mit Prefect
Bild vom Autor | Canva

In diesem Tutorial lernen wir Prefect kennen, ein modernes Device zur Workflow-Orchestrierung. Wir beginnen mit dem Aufbau einer Datenpipeline mit Pandas und vergleichen sie dann mit einem Prefect-Workflow, um ein besseres Verständnis zu erlangen. Am Ende stellen wir unseren Workflow bereit und zeigen Ausführungsprotokolle auf dem Dashboard an.

Was ist Präfekt?

Präfekt ist ein Workflow-Administration-System, das für die Orchestrierung und Verwaltung komplexer Daten-Workflows, einschließlich Machine-Studying-Pipelines (ML), entwickelt wurde. Es bietet einen Rahmen für die Erstellung, Planung und Überwachung von Workflows und ist damit ein unverzichtbares Device für die Verwaltung von ML-Operationen (MLOps).

Prefect bietet Aufgaben- und Flussverwaltung, sodass Benutzer Abhängigkeiten definieren und Workflows effizient ausführen können. Mit Funktionen wie Statusverwaltung und Beobachtbarkeit bietet Prefect Einblicke in den Aufgabenstatus und -verlauf und unterstützt so das Debuggen und Optimieren. Es verfügt über ein hochgradig interaktives Dashboard, mit dem Sie verschiedene andere Funktionen planen, überwachen und integrieren können, die Ihren Workflow für die MLOps-Pipeline verbessern. Sie können sogar Benachrichtigungen einrichten und andere ML-Frameworks mit wenigen Klicks integrieren.

Prefect ist als Open-Supply-Framework und verwalteter Cloud-Dienst verfügbar und vereinfacht Ihren Arbeitsablauf noch weiter.

Erstellen einer Datenpipeline mit Pandas

Wir werden die Datenpipeline replizieren, die ich in den vorherigen Tutorials verwendet habe (Erstellen von Information Science-Pipelines mit Pandas – KDnuggets), um Ihnen eine Vorstellung davon zu geben, wie jede Aufgabe in der Pipeline funktioniert und wie sie kombiniert werden. Ich erwähne es hier, damit Sie klar vergleichen können, wie sich perfekte Datenpipelines von normalen Pipelines unterscheiden.

import pandas as pd

def load_data(path):
    return pd.read_csv(path)

def data_cleaning(information):
    information = information.drop_duplicates()
    information = information.dropna()
    information = information.reset_index(drop=True)
    return information

def convert_dtypes(information, types_dict=None):
    information = information.astype(dtype=types_dict)
    ## convert the date column to datetime
    information("Date") = pd.to_datetime(information("Date"))
    return information

def data_analysis(information):
    information("month") = information("Date").dt.month
    new_df = information.groupby("month")("Items Bought").imply()
    return new_df

def data_visualization(new_df, vis_type="bar"):
    new_df.plot(form=vis_type, figsize=(10, 5), title="Common Items Bought by Month")
    return new_df

path = "On-line Gross sales Information.csv"
df = (
    pd.DataFrame()
    .pipe(lambda x: load_data(path))
    .pipe(data_cleaning)
    .pipe(convert_dtypes, {"Product Class": "str", "Product Title": "str"})
    .pipe(data_analysis)
    .pipe(data_visualization, "line")
)

Wenn wir den obigen Code ausführen, wird jede Aufgabe nacheinander ausgeführt und generiert die Datenvisualisierung. Abgesehen davon geschieht nichts. Wir können es planen, die Ausführungsprotokolle anzeigen oder sogar Instruments von Drittanbietern zur Benachrichtigung oder Überwachung integrieren.

Erstellen einer Datenpipeline mit PrefectErstellen einer Datenpipeline mit Prefect

Erstellen einer Datenpipeline mit Prefect

Jetzt werden wir die gleiche Pipeline mit dem gleichen Datensatz erstellen On-line-Verkaufsdatensatz – Daten zu beliebten Marktplätzen aber mit Prefect. Wir werden zuerst die PRefect-Bibliothek mit dem PIP-Befehl installieren.

Wenn Sie den folgenden Code überprüfen, werden Sie feststellen, dass sich nicht wirklich etwas geändert hat. Die Funktionen sind dieselben, aber mit der Hinzufügung der Python-Dekoratoren. Jeder Schritt in der Pipeline hat den Dekorator `@activity` und die Pipeline, die diese Schritte kombiniert, hat den Dekorator `@circulate`. Darüber hinaus speichern wir auch die generierte Abbildung.

import pandas as pd
import matplotlib.pyplot as plt
from prefect import activity, circulate

@activity
def load_data(path):
    return pd.read_csv(path)

@activity
def data_cleaning(information):
    information = information.drop_duplicates()
    information = information.dropna()
    information = information.reset_index(drop=True)
    return information

@activity
def convert_dtypes(information, types_dict=None):
    information = information.astype(dtype=types_dict)
    information("Date") = pd.to_datetime(information("Date"))
    return information

@activity
def data_analysis(information):
    information("month") = information("Date").dt.month
    new_df = information.groupby("month")("Items Bought").imply()
    return new_df

@activity
def data_visualization(new_df, vis_type="bar"):

    new_df.plot(form=vis_type, figsize=(10, 5), title="Common Items Bought by Month")
    plt.savefig("average_units_sold_by_month.png")
    return new_df

@circulate(title="Information Pipeline")
def data_pipeline(path: str):
    df = load_data(path)
    df_cleaned = data_cleaning(df)
    df_converted = convert_dtypes(
        df_cleaned, {"Product Class": "str", "Product Title": "str"}
    )
    analysis_result = data_analysis(df_converted)
    new_df = data_visualization(analysis_result, "line")
    return new_df

# Run the circulate!
if __name__ == "__main__":
    new_df = data_pipeline("On-line Gross sales Information.csv")
    print(new_df)

Wir führen unsere Datenpipeline aus, indem wir den Speicherort der CSV-Datei angeben. Sie führt alle Schritte der Reihe nach aus und generiert Protokolle mit den Ausführungszuständen.

14:18:48.649 | INFO    | prefect.engine - Created circulate run 'enlightened-dingo' for circulate 'Information Pipeline'
14:18:48.816 | INFO    | Movement run 'enlightened-dingo' - Created activity run 'load_data-0' for activity 'load_data'
14:18:48.822 | INFO    | Movement run 'enlightened-dingo' - Executing 'load_data-0' instantly...
14:18:48.990 | INFO    | Process run 'load_data-0' - Completed in state Accomplished()
14:18:49.052 | INFO    | Movement run 'enlightened-dingo' - Created activity run 'data_cleaning-0' for activity 'data_cleaning'
14:18:49.053 | INFO    | Movement run 'enlightened-dingo' - Executing 'data_cleaning-0' instantly...
14:18:49.226 | INFO    | Process run 'data_cleaning-0' - Completed in state Accomplished()
14:18:49.283 | INFO    | Movement run 'enlightened-dingo' - Created activity run 'convert_dtypes-0' for activity 'convert_dtypes'
14:18:49.288 | INFO    | Movement run 'enlightened-dingo' - Executing 'convert_dtypes-0' instantly...
14:18:49.441 | INFO    | Process run 'convert_dtypes-0' - Completed in state Accomplished()
14:18:49.506 | INFO    | Movement run 'enlightened-dingo' - Created activity run 'data_analysis-0' for activity 'data_analysis'
14:18:49.510 | INFO    | Movement run 'enlightened-dingo' - Executing 'data_analysis-0' instantly...
14:18:49.684 | INFO    | Process run 'data_analysis-0' - Completed in state Accomplished()
14:18:49.753 | INFO    | Movement run 'enlightened-dingo' - Created activity run 'data_visualization-0' for activity 'data_visualization'
14:18:49.760 | INFO    | Movement run 'enlightened-dingo' - Executing 'data_visualization-0' instantly...
14:18:50.087 | INFO    | Process run 'data_visualization-0' - Completed in state Accomplished()
14:18:50.144 | INFO    | Movement run 'enlightened-dingo' - Completed in state Accomplished()

Am Ende erhalten Sie den transformierten Datenrahmen und die Visualisierungen.

Erstellen einer Datenpipeline mit PrefectErstellen einer Datenpipeline mit Prefect

Bereitstellen der Prefect Pipeline

Um die Prefect-Pipeline bereitzustellen, müssen wir zunächst unsere Codebasis in die Python-Datei „data_pipe.py“ verschieben. Danach ändern wir, wie wir unsere Pipeline ausführen. Wir verwenden die Funktion „.server“, um die Pipeline bereitzustellen, und übergeben die CSV-Datei als Argument an die Funktion.

data_pipe.py:

import pandas as pd
import matplotlib.pyplot as plt
from prefect import activity, circulate

@activity
def load_data(path: str) -> pd.DataFrame:
    return pd.read_csv(path)

@activity
def data_cleaning(information: pd.DataFrame) -> pd.DataFrame:
    information = information.drop_duplicates()
    information = information.dropna()
    information = information.reset_index(drop=True)
    return information

@activity
def convert_dtypes(information: pd.DataFrame, types_dict: dict = None) -> pd.DataFrame:
    information = information.astype(dtype=types_dict)
    information("Date") = pd.to_datetime(information("Date"))
    return information

@activity
def data_analysis(information: pd.DataFrame) -> pd.DataFrame:
    information("month") = information("Date").dt.month
    new_df = information.groupby("month")("Items Bought").imply()
    return new_df

@activity
def data_visualization(new_df: pd.DataFrame, vis_type: str = "bar") -> pd.DataFrame:
    new_df.plot(form=vis_type, figsize=(10, 5), title="Common Items Bought by Month")
    plt.savefig("average_units_sold_by_month.png")
    return new_df

@activity
def save_to_csv(df: pd.DataFrame, filename: str):
    df.to_csv(filename, index=False)
    return filename

@circulate(title="Information Pipeline")
def run_pipeline(path: str):
    df = load_data(path)
    df_cleaned = data_cleaning(df)
    df_converted = convert_dtypes(
        df_cleaned, {"Product Class": "str", "Product Title": "str"}
    )
    analysis_result = data_analysis(df_converted)
    data_visualization(analysis_result, "line")
    save_to_csv(analysis_result, "average_units_sold_by_month.csv")

# Run the circulate
if __name__ == "__main__":
    run_pipeline.serve(
        title="pass-params-deployment",
        parameters=dict(path="On-line Gross sales Information.csv"),
    )

Wenn wir die Python-Datei ausführen, erhalten wir die Meldung, dass wir zum Ausführen der bereitgestellten Pipeline den folgenden Befehl verwenden müssen:

Erstellen einer Datenpipeline mit PrefectErstellen einer Datenpipeline mit Prefect

Starten Sie ein neues Terminalfenster und geben Sie den Befehl ein, um die Ausführung dieses Flows auszulösen.

$ prefect deployment run 'Information Pipeline/pass-params-deployment'

Wie wir sehen, wurden Movement-Läufe eingeleitet, d. h. die Pipeline läuft im Hintergrund. Wir können jederzeit zum ersten Terminalfenster zurückkehren, um die Protokolle anzuzeigen.

Erstellen einer Datenpipeline mit PrefectErstellen einer Datenpipeline mit Prefect

Um die Protokolle im Dashboard anzuzeigen, müssen wir das Prefect-Dashboard starten, indem wir den folgenden Befehl eingeben:

Klicken Sie auf den Dashboard-Hyperlink, um das Dashboard in Ihrem Webbrowser zu starten.

Erstellen einer Datenpipeline mit PrefectErstellen einer Datenpipeline mit Prefect

Das Dashboard besteht aus verschiedenen Registerkarten und Informationen zu Ihrer Pipeline, Ihrem Workflow und Ihren Läufen. Um den aktuellen Lauf anzuzeigen, navigieren Sie zur Registerkarte „Movement Runs“ und wählen Sie den aktuellsten Movement-Lauf aus.

Erstellen einer Datenpipeline mit PrefectErstellen einer Datenpipeline mit Prefect

Der gesamte Quellcode, alle Daten und Informationen sind verfügbar unter Kingabzpro/Datenpipeline mit Präfekt GitHub-Repository. Bitte vergessen Sie nicht, es mit einem Stern ⭐ zu markieren.

Abschluss

Um Ihren Datenworkflow zu skalieren und unnötige Probleme zu vermeiden, müssen Sie eine Pipeline mit den richtigen Instruments erstellen. Mit Prefect können Sie Ihre Läufe planen, die Pipeline debuggen und sie in mehrere Drittanbietertools integrieren, die Sie bereits verwenden. Es ist einfach zu verwenden und bietet zahlreiche Funktionen, die Sie lieben werden. Wenn Sie Prefect noch nicht kennen, empfehle ich Ihnen dringend, sich Prefect Cloud anzusehen. Sie bieten kostenlose Stunden an, damit Benutzer die Cloud-Plattform kennenlernen und sich mit dem Workflow-Administration-System vertraut machen können.

Abid Ali Awan (@1abidaliawan) ist ein zertifizierter Datenwissenschaftler, der gerne Modelle für maschinelles Lernen erstellt. Derzeit konzentriert er sich auf die Erstellung von Inhalten und das Schreiben technischer Blogs zu Technologien für maschinelles Lernen und Datenwissenschaft. Abid hat einen Grasp-Abschluss in Technologiemanagement und einen Bachelor-Abschluss in Telekommunikationstechnik. Seine Imaginative and prescient ist es, mithilfe eines Graph-Neural-Networks ein KI-Produkt für Studenten zu entwickeln, die mit psychischen Erkrankungen zu kämpfen haben.

Von admin

Schreibe einen Kommentar

Deine E-Mail-Adresse wird nicht veröffentlicht. Erforderliche Felder sind mit * markiert