Eintritt in ein neues Unternehmen als Dateningenieur. Sie erben eine ganze Reihe von ETL-Pipelines und sind für deren Wartung verantwortlich. Was sind Ihrer Meinung nach die Herausforderungen Ihrer Arbeit?

Typischerweise können Sie mit den folgenden Problemen konfrontiert werden:

  • Upstream-Schemaänderungen: Entwicklerteams können Felder hinzufügen oder löschen, Datentypen ändern oder Spalten umbenennen. Wenn sich ein Quellschema unerwartet ändert, können ETL-Aufträge abrupt fehlschlagen. Erschwerend kommt hinzu, dass die Pipeline unbemerkt beschädigte oder Nullwerte in nachgelagerte Tabellen lädt.
  • Probleme mit der Datenqualität: Manchmal schlagen ETL-Jobs nicht sofort fehl, im Gegenteil, sie werden ausgeführt und mit einem Erfolgsstatus abgeschlossen. Die geladenen Daten sind jedoch völlig falsch und enthalten doppelte oder fehlende Datensätze.
  • Mangelnde Dokumentation: Ältere Pipelines verfügen möglicherweise über wenige Dokumente oder die vorhandenen Dokumente sind möglicherweise veraltet. Sie sind sich additionally nicht sicher, ob sie der aktuellen Geschäftslogik entsprechen.
  • Volumenwachstum und Leistungsspitzen: Das Datenvolumen steigt mit dem Wachstum des Unternehmens. Eine für einen kleineren historischen Datensatz optimierte ETL-Pipeline kann bei der Verarbeitung großer Mengen leicht langsam werden, ins Stocken geraten oder ausfallen.

Ein automatisierter Testworkflow kann Ihnen dabei helfen, die oben genannten Probleme zu lösen. Warum? Denn der strukturierte Workflow kann Ihnen dabei helfen, alle wichtigen Aspekte einer ETL-Pipeline schnell zu verstehen: die Geschäftslogik, die Algorithmen für die Datentransformation, die Datentypen und alle Datenprobleme, die die ETL-Pipelines lösen müssen. Die Testmuster sind wiederverwendbar – Sie müssen nicht jedes Mal einen neuen Workflow entwerfen, wenn Sie eine andere ETL-Pipeline erben.

Im heutigen Artikel konzentriere ich mich auf automatisierte Exams im Knowledge Engineering, einschließlich der Umgebungskonfiguration und eines praktischen Workflows. Am Ende werde ich auch diskutieren, wie KI-gestützter Code den Arbeitsablauf beschleunigen und die Produktivität verbessern kann.

Sorgen Sie dafür, dass die Umwelt funktioniert

Wenn Sie den automatisierten Testworkflow zum ersten Mal erstellen, kann die Einrichtung der Umgebung einige Zeit dauern. Es gibt verschiedene Instruments und Abläufe für Dateningenieure zum Einrichten der Testumgebung. Wenn Sie jedoch die folgenden Schritte befolgen, verläuft der Vorgang einfach und reibungslos.

Erstens müssen Sie nur drei Dinge installieren: Docker Desktop, VS Code und Dev Containers Extension.

In Ihrem Testworkflow erstellt Docker schlanke, isolierte und wiederholbare Testumgebungen. Es ermöglicht Ihnen, eine Scheindateninfrastruktur (z. B. Datenbanken, Datenpipelines und Orchestrierungs-Engines) direkt auf einem lokalen Pc oder innerhalb einer Steady Integration (CI)-Pipeline aufzubauen. Mit Docker können Sie Ihre Integrationstests und Datenvalidierungen auf allen Plattformen identisch ausführen, ohne lokale Betriebssysteme zu belasten.

Visible Studio Code (VS Code) ist eine zentralisierte Entwicklungsumgebung für die Skripterstellung, das Debuggen, die Implementierung und die Automatisierung von Datenpipeline-Exams. Als Dateningenieur haben Sie es möglicherweise für Ihre anderen Projekte verwendet. Möglicherweise sind Sie mit PyCharm oder IntelliJ IDEA besser vertraut. Aus Sicht der Benutzererfahrung wähle ich VS Code aufgrund seines schlanken Aufbaus, seines Erweiterungsökosystems und seines hybriden Pocket book-/Skript-Workflows. KI-native Editoren wie Cursor und Windsurf erfreuen sich bei Entwicklern immer größerer Beliebtheit, worauf ich im späteren Teil dieses Artikels näher eingehen werde.

Ich gehe davon aus, dass Sie Python, Poetry und Java bereits installiert haben. Sie können Ihr VS Code-Terminal öffnen, die folgenden Skripte eingeben, um deren Versionen zu überprüfen und sicherzustellen, dass sie aktualisiert sind. Sie können sie auch unter Ihrem Terminal installieren, falls Sie dies noch nicht getan haben.

python --version
java -version
poetry --version

Mit der Dev Container-Erweiterung können Sie einen Docker-Container als voll funktionsfähige, reproduzierbare Entwicklungsumgebung verwenden. Es standardisiert Umgebungen im gesamten Group und ermöglicht das lokale Testen der Datenerfassungslogik ohne den Verbrauch von Cloud-Ressourcen. Die Set up von Dev Container ist recht einfach. Sie müssen lediglich Erweiterungen in VS Code öffnen – Sie können Strg+Umschalt+X (Home windows/Linux) oder Befehl+Umschalt+X (Mac) drücken, dann in der Suchleiste nach „Dev Containers“ suchen und auf „Installieren“ klicken.

Die Dev Containers-Erweiterung weiß jedoch nicht, wie Sie Ihre spezifische Umgebung erstellen. Es braucht einen „Leitfaden“. Der Leitfaden ist der .devcontainer Ordner und die devcontainer.json Die Datei unter dem Ordner teilt der Dev-Container-Erweiterung Folgendes mit:

  • Welches Docker-Picture heruntergeladen werden soll.
  • Welche Ports weitergeleitet werden sollen.
  • Welche VS-Code-Erweiterungen im Container installiert werden sollen.

Es gibt zwei Methoden, die Sie erhalten können .devcontainer Ordner. Wenn Sie mit diesen Instruments noch nicht vertraut sind, können Sie das automatisierte Software von VS Code verwenden. Wenn Sie eine Python- oder Knowledge Engineering-Vorlage auswählen, kann VS Code den Ordner automatisch generieren. Wenn Sie mehr Erfahrung mit solchen Projekten haben, können Sie es auch von Grund auf manuell schreiben, um den Testanforderungen Ihres Groups gerecht zu werden. Der .devcontainer Der Ordner kann zusammen mit Ihrem Quellcode und Ihren Quelldaten, die Sie zum Testen vorbereiten, festgeschrieben und an Git übertragen werden.

Um Ihnen das Leben zu erleichtern, können Sie das Git-Repository klonen und diesen Ordner mit VS Code öffnen.

git clone https://github.com/firm/data-ingestion-transformation.git

Der letzte Schritt der Konfiguration besteht darin, den Container erneut zu öffnen. Warum ist es wichtig? Denn wenn Sie auf „Im Container erneut öffnen“ klicken, startet VS Code seine Backend-Engine neu. Es startet den Docker-Container und hängt Ihren lokalen Projektordner direkt in diesem Container an. Ihr Quellcode und Ihre Quelldaten in dieser ETL-Pipeline sind für die Docker-Umgebung zugänglich. Sie können Ihre Exams sicher in einer isolierten Sandbox ausführen. Klingt cool? Ja, jetzt haben Sie Ihre Umgebung bereits eingerichtet und können mit dem Testen Ihrer ETL-Pipelines beginnen.

Lassen Sie sich durch die Exams sagen, was das System tut

Wenn ich eine unbekannte ETL-Pipeline erbe, lautet meine erste Frage nicht: „Wie funktioniert der Code?“ Stattdessen frage ich: „Welches Verhalten soll das System hervorrufen?“ Exams beantworten diese Frage normalerweise schneller als Quellcode.

Stellen Sie sich vor, das Unternehmen, dem Sie beitreten, verwendet LLMs wie GPT-5.5, Claude 4.6 und Gemini 3 Professional und das Finanzteam möchte die KI-Ausgaben teamübergreifend verfolgen.

Vom Autor erstellte Scheinbeispieldaten

Die obige Tabelle zeigt einen Teil der zu speichernden Daten im CSV-Format. Die Spaltennamen müssen standardisiert werden, indem Leerzeichen durch Unterstriche ersetzt werden, damit nachgelagerte Systeme konsistent auf Felder verweisen können. Zum Beispiel. „Modellname“ sollte zu „Modellname“ werden. Du hast es gefunden ingest.py die Funktionen zur Spaltenstandardisierung und Datenaufnahme zu definieren und ai_cost_ingest.py um diese Funktionen im Ordner aufzurufen.

import logging
from typing import Record

from pyspark.sql import SparkSession


def sanitize_columns(columns: Record(str)) -> Record(str):
    return (column.exchange(" ", "_") for column in columns)


def run(spark: SparkSession, ingest_path: str, transformation_path: str) -> None:
    logging.information("Studying textual content file from: %s", ingest_path)

    input_df = (
        spark.learn.format("org.apache.spark.csv")
        .possibility("header", True)
        .csv(ingest_path)
    )

    renamed_columns = sanitize_columns(input_df.columns)

    ref_df = input_df.toDF(*renamed_columns)

    ref_df.write.parquet(transformation_path)
import logging
import sys

from pyspark.sql import SparkSession

from data_ingestions.ai_cost import ingest

LOG_FILENAME = "mission.log"
APP_NAME = "AI_Cost Pipeline: Ingest"

if __name__ == "__main__":
    logging.basicConfig(filename=LOG_FILENAME, stage=logging.INFO)
    logging.information(sys.argv)

    if len(sys.argv) != 3:
        logging.warning("Enter supply and output path are required")
        sys.exit(1)

    spark = SparkSession.builder.appName(APP_NAME).getOrCreate()
    sc = spark.sparkContext
    app_name = sc.appName
    logging.information("Utility Initialized: " + app_name)
    input_path = sys.argv(1)
    output_path = sys.argv(2)
    ingest.run(spark, input_path, output_path)
    logging.information("Utility Finished: " + spark.sparkContext.appName)
    spark.cease()

Sie sollten zunächst die definierten Funktionen verstehen. Sie fragen sich vielleicht: „Was genau soll?“ sanitize_columns() Tun? Behandelt es führende Leerzeichen, nachfolgende Leerzeichen und interne Leerzeichen?“ Mit diesen Fragen im Kopf schreiben Sie folgenden Code:

from data_ingestions.ai_cost import ingest

def test_should_sanitize_nothing() -> None:
    no_whitespace_columns = ("Mannequin")

    precise = ingest.sanitize_columns(no_whitespace_columns)
    anticipated = no_whitespace_columns
    assert anticipated == precise

def test_should_sanitize_whitespace_outside() -> None:
    no_whitespace_columns = (" Immediate Tokens ")

    precise = ingest.sanitize_columns(no_whitespace_columns)
    anticipated = ("_Prompt_Tokens_")
    assert anticipated == precise

def test_should_sanitize_whitespace_in_between() -> None:
    no_whitespace_columns = ("Immediate Tokens")

    precise = ingest.sanitize_columns(no_whitespace_columns)
    anticipated = ("Prompt_Tokens")
    assert anticipated == precise

Mit dem Code können Sie die Funktion testen sanitize_columns() direkt, ohne Spark zu starten und Dateien zu verarbeiten. Es ist ein Beispiel für a Unit-Check.

Unit-Exams

Unit-Exams dienen dazu, einen kleinen Teil der Logik isoliert zu validieren. Sie sind in der Regel schnell, deterministisch und unabhängig von externen Systemen.

Integrationstests

Unit-Exams zeigen an, ob sich ein kleiner Teil der Logik korrekt verhält. Aber sie können die Frage nicht beantworten: „Funktioniert die gesamte Pipeline, wenn alle Komponenten miteinander verbunden sind?“

Für einen Dateningenieur bedeutet dies oft:

  • Dateien lesen
  • Spark starten
  • Transformationen ausführen
  • Ausgaben schreiben
  • Ergebnisse validieren

Um die gesamte Pipeline zu testen, benötigen wir Integrationstestsdie das Systemverhalten offenbaren. Integrationstests sind beim Onboarding sehr nützlich, da sie beschreiben, was das System tun muss, unabhängig davon, wie sich die Implementierung im Laufe der Zeit weiterentwickelt.

Für die AI_cost Bei einem Datenerfassungsprojekt können Sie einen Integrationstest verwenden, um zu überprüfen, ob:

  • Die Eingabe erfolgt als CSV-Dateien.
  • Zur Verarbeitung der Daten wird Spark eingesetzt.
  • Spaltennamen werden bereinigt.
  • Datenwerte bleiben unverändert.
  • Die Ausgabe erfolgt im Parquet-Format.
  • Der gesamte Aufnahmeworkflow muss erfolgreich sein.
import csv
import os
import tempfile
from pathlib import Path
from typing import Record, Tuple

from pyspark.sql import SparkSession

from data_ingestions.ai_cost import ingest

def test_should_sanitize_column_names(
    spark_session: SparkSession,
) -> None:

    given_ingest_folder, given_transform_folder = (
        __create_ingest_and_transform_folders()
    )

    input_csv_path = given_ingest_folder + "enter.csv"

    csv_content = (
        (
            "Mannequin Identify",
            "Immediate Tokens",
            " Completion Tokens "
        ),
        (
            "GPT-5.5",
            "1200",
            "300"
        ),
        (
            "Gemini 3 Professional",
            "900",
            "250"
        ),
    )

    __write_csv_file(input_csv_path, csv_content)

    ingest.run(
        spark_session,
        input_csv_path,
        given_transform_folder
    )

    precise = spark_session.learn.parquet(
        given_transform_folder
    )

    anticipated = spark_session.createDataFrame(
        (
            ("GPT-5.5", "1200", "300"),
            ("Gemini 3 Professional", "900", "250")
        ),
        (
            "Model_Name",
            "Prompt_Tokens",
            "_Completion_Tokens_"
        )
    )

    assert anticipated.gather() == precise.gather()

Lassen Sie die KI die ETL-Pipeline lesen, bevor Sie es tun

Stellen Sie sich vor, Sie überprüfen eine unbekannte ETL-Pipeline, die Hunderte oder sogar Tausende Zeilen PySpark-Code enthält. Den Code zu verstehen und Exams zu schreiben, kann Stunden oder sogar Tage dauern. Heutzutage können Instruments wie Cursor, Windsurf und GitHub Copilot dabei helfen, diesen Prozess zu beschleunigen.

Nehmen Sie als Beispiel Cursor. Als KI-Assistent kann er ein gesamtes Repository analysieren und Erklärungen zu einzelnen Modulen, Funktionen und Datenflüssen generieren. Es kann auch erste Versionen von Unit-Exams und Integrationstests generieren. Um die Produktivität zu maximieren, müssen Sie als Dateningenieur die richtigen Fragen stellen. Hier einige Beispielfragen, die Sie stellen könnten:

  • Was ist der Zweck dieses ETL-Jobs?
  • Welche Eingabe- und Ausgabeformate erwartet diese Pipeline?
  • Welche Funktionen sind für die Datenvalidierung verantwortlich?
  • Welche Randfälle sind derzeit ungetestet?

KI kann Testfälle vorschlagen, aber nicht feststellen, ob diese Exams den Geschäftsanforderungen und der Unternehmensstrategie entsprechen. Es liegt weiterhin in Ihrer Verantwortung, die Pipeline zu verstehen, Annahmen zu validieren und den Code zu überprüfen. KI ist eher ein Produktivitätsbeschleuniger als ein Ersatz für technisches Urteilsvermögen. Es spart Ihnen Zeit, die ETL-Pipeline zu verstehen und zu testen, sodass Sie sich auf höherwertige Knowledge-Engineering-Arbeiten wie das Entwerfen von Datenarchitekturen, den Aufbau skalierbarer Datenplattformen und die Unterstützung datengesteuerter Entscheidungsfindung konzentrieren können.

Von admin

Schreibe einen Kommentar

Deine E-Mail-Adresse wird nicht veröffentlicht. Erforderliche Felder sind mit * markiert