

Bild von Autor | Ideogramm
Sie kennen dieses Gefühl, wenn Sie Daten in verschiedenen Formaten und Quellen verstreut haben, und Sie müssen alles verstehen? Genau das lösen wir heute. Lassen Sie uns eine ETL -Pipeline erstellen, die unordentliche Daten aufnimmt und sie in etwas tatsächlich Nützliches verwandelt.
In diesem Artikel werde ich Sie durch das Erstellen einer Pipeline führen, die E-Commerce-Transaktionen verarbeitet. Nichts Besonderes, nur praktischer Code, der den Job erledigt.
Wir holen Daten aus einer CSV-Datei (wie Sie von einer E-Commerce-Plattform herunterladen würden), reinigen Sie sie und speichern sie für die Analyse in einer geeigneten Datenbank.
🔗 Hyperlink zum Code auf GitHub
Was ist ein Extrakt, eine Transformation, eine Lastpipeline (ETL)?
Jede ETL -Pipeline folgt dem gleichen Muster. Sie schnappen sich Daten von irgendwo (Extrahieren), reinigen Sie sie und machen sie besser (Transformation), dann geben Sie sie an einen nützlichen Ort (Laden).


ETL Pipeline | Bild von Autor | Diagrams.internet (Draw.io)
Der Prozess beginnt mit dem Extrakt Section, in der Daten aus verschiedenen Quellsystemen wie Datenbanken, APIs, Dateien oder Streaming -Plattformen abgerufen werden. Während dieser Section identifiziert und zieht die Pipeline relevante Daten und führt gleichzeitig Verbindungen zu unterschiedlichen Systemen auf, die möglicherweise mit verschiedenen Zeitplänen und Formaten arbeiten.
Als nächstes die verwandeln Die Section repräsentiert die Kernverarbeitungsstufe, in der extrahierte Daten einer Reinigung, Validierung und Umstrukturierung unterzogen werden. Dieser Schritt befasst sich mit Datenqualitätsproblemen, wendet Geschäftsregeln an, führt Berechnungen durch und wandelt Daten in das erforderliche Format und die erforderliche Struktur um. Gemeinsame Transformationen umfassen Datentypkonvertierungen, Feldzuordnung, Aggregationen und die Entfernung von Duplikaten oder ungültige Datensätze.
Schließlich die laden Section überträgt die jetzt transformierten Daten in das Zielsystem. Dieser Schritt kann durch Volllast auftreten, wobei ganze Datensätze ersetzt oder inkrementelle Lasten ersetzt werden, wobei nur neue oder geänderte Daten hinzugefügt werden. Die Ladestrategie hängt von Faktoren wie Datenvolumen, Systemleistungsanforderungen und geschäftlichen Anforderungen ab.
Schritt 1: Auszug
In dem Schritt „Extrakt“ werden wir Daten in die Hände bekommen. In der realen Welt laden Sie diesen CSV möglicherweise von der Berichts-Dashboard Ihrer E-Commerce-Plattform aus, ziehen Sie es von einem FTP-Server oder erhalten Sie über API. Hier lesen wir in einer verfügbaren CSV -Datei.
def extract_data_from_csv(csv_file_path):
strive:
print(f"Extracting knowledge from {csv_file_path}...")
df = pd.read_csv(csv_file_path)
print(f"Efficiently extracted {len(df)} information")
return df
besides FileNotFoundError:
print(f"Error: {csv_file_path} not discovered. Creating pattern knowledge...")
csv_file = create_sample_csv_data()
return pd.read_csv(csv_file)
Jetzt, da wir die Rohdaten aus seiner Quelle haben (raw_transactions.csv), wir müssen es in etwas Nutzbares verwandeln.
Schritt 2: Transformation
Hier machen wir die Daten tatsächlich nützlich.
def transform_data(df):
print("Reworking knowledge...")
df_clean = df.copy()
# Take away information with lacking emails
initial_count = len(df_clean)
df_clean = df_clean.dropna(subset=('customer_email'))
removed_count = initial_count - len(df_clean)
print(f"Eliminated {removed_count} information with lacking emails")
# Calculate derived fields
df_clean('total_amount') = df_clean('value') * df_clean('amount')
# Extract date elements
df_clean('transaction_date') = pd.to_datetime(df_clean('transaction_date'))
df_clean('yr') = df_clean('transaction_date').dt.yr
df_clean('month') = df_clean('transaction_date').dt.month
df_clean('day_of_week') = df_clean('transaction_date').dt.day_name()
# Create buyer segments
df_clean('customer_segment') = pd.lower(df_clean('total_amount'),
bins=(0, 50, 200, float('inf')),
labels=('Low', 'Medium', 'Excessive'))
return df_clean
Erstens lassen wir Zeilen mit fehlenden E -Mails fallen, da unvollständige Kundendaten für die meisten Analysen nicht hilfreich sind.
Dann berechnen wir total_amount Durch Multiplizieren von Preis und Menge. Dies scheint offensichtlich zu sein, aber Sie wären überrascht, wie oft abgeleitete Felder wie diese in Rohdaten fehlen.
Die Date -Extraktion ist sehr praktisch. Anstatt nur einen Zeitstempel zu haben, haben wir jetzt ein separates Jahr, Monat und Tag der Woche. Dies erleichtert die Analyse von Mustern wie „Verkaufen wir an Wochenenden mehr?“
Die Kundensegmentierung verwendet pd.lower() kann besonders nützlich sein. Es wird automatisch Kunden in Ausgabenkategorien ein. Anstatt nur Transaktionsmengen zu haben, haben wir sinnvolle Geschäftssegmente.
Schritt 3: Laden
In einem echten Projekt laden Sie möglicherweise in eine Datenbank, senden an eine API oder drängen auf Cloud -Speicher.
Hier laden wir unsere sauberen Daten in eine ordnungsgemäße SQLite -Datenbank.
def load_data_to_sqlite(df, db_name="ecommerce_data.db", table_name="transactions"):
print(f"Loading knowledge to SQLite database '{db_name}'...")
conn = sqlite3.join(db_name)
strive:
df.to_sql(table_name, conn, if_exists="exchange", index=False)
cursor = conn.cursor()
cursor.execute(f"SELECT COUNT(*) FROM {table_name}")
record_count = cursor.fetchone()(0)
print(f"Efficiently loaded {record_count} information to '{table_name}' desk")
return f"Information efficiently loaded to {db_name}"
lastly:
conn.shut()
Jetzt können Analysten SQL-Abfragen ausführen, BI-Instruments verbinden und diese Daten tatsächlich zur Entscheidungsfindung verwenden.
SQLite funktioniert intestine dafür, da es leicht ist, kein Einrichten erfordert und eine einzige Datei erstellt, die Sie problemlos teilen oder sichern können. Der if_exists="exchange" Parameter bedeutet, dass Sie diese Pipeline mehrmals ausführen können, ohne sich um doppelte Daten zu kümmern.
Wir haben Überprüfungsschritte hinzugefügt, damit Sie wissen, dass die Final erfolgreich struggle. Es gibt nichts Schlimmeres, als zu glauben, dass Ihre Daten sicher gespeichert sind, um später einen leeren Tisch zu finden.
Ausführen der ETL -Pipeline
Dies orchestriert den gesamten Extrakt, transformieren und laden Workflow.
def run_etl_pipeline():
print("Beginning ETL Pipeline...")
# Extract
raw_data = extract_data_from_csv('raw_transactions.csv')
# Rework
transformed_data = transform_data(raw_data)
# Load
load_result = load_data_to_sqlite(transformed_data)
print("ETL Pipeline accomplished efficiently!")
return transformed_data
Beachten Sie, wie dies alles zusammenhält. Extrahieren, transformieren, laden, fertig. Sie können dies ausführen und sofort Ihre verarbeiteten Daten sehen.
Sie können den vollständigen Code finden auf Github.
Einpacken
Diese Pipeline übernimmt Roh -Transaktionsdaten und verwandelt sie in etwas, mit dem ein Analyst oder Datenwissenschaftler tatsächlich zusammenarbeiten kann. Sie haben saubere Aufzeichnungen, berechnete Felder und aussagekräftige Segmente.
Jede Funktion macht eine Sache intestine, und Sie können jeden Teil leicht ändern oder erweitern, ohne den Relaxation zu brechen.
Versuchen Sie nun, selbst zu laufen. Versuchen Sie auch, es für einen anderen Anwendungsfall zu ändern. Blissful Coding!
Bala Priya c ist ein Entwickler und technischer Schriftsteller aus Indien. Sie arbeitet gern an der Schnittstelle zwischen Mathematik, Programmierung, Datenwissenschaft und Inhaltserstellung. Ihre Interessensgebiete und Fachgebiete umfassen DevOps, Information Science und natürliche Sprachverarbeitung. Sie liest gerne, schreibt, codieren und Kaffee! Derzeit arbeitet sie daran, ihr Wissen mit der Entwicklergemeinschaft zu lernen und zu teilen, indem sie Tutorials, Anleitungen, Meinungsstücke und vieles mehr autorisiert. Bala erstellt auch ansprechende Ressourcenübersichten und Codierungs -Tutorials.
