

Bild von Editor | Chatgpt
# Einführung
Daten Engineering umfasst die Verarbeitung großer Datensätze, das Erstellen von ETL -Pipelines und die Aufrechterhaltung der Datenqualität. Dateningenieure arbeiten mit Streaming -Daten, überwachen die Systemleistung, verarbeiten Schemaänderungen und gewährleisten die Datenkonsistenz über verteilte Systeme.
Python-Einzeiler können dazu beitragen, diese Aufgaben zu vereinfachen, indem komplexe Operationen in einzelnen, lesbaren Aussagen kondensiert werden. Dieser Artikel konzentriert sich auf praktische Einzeiler, die allgemeine Daten technische Probleme lösen.
Die hier vorgestellten Einzeiler behandeln reale Aufgaben wie Verarbeitung von Ereignisdaten mit unterschiedlichen Strukturen, die Analyse von Systemprotokollen auf Leistungsprobleme, die Bearbeitung von API-Antworten mit unterschiedlichen Schemas und die Implementierung von Datenqualitätsprüfungen. Fangen wir an.
🔗 Hyperlink zum Code auf GitHub
# Beispieldaten
Lassen Sie uns einige Beispieldaten aufspannen, um unsere Einzeiler auszuführen:
import pandas as pd
import numpy as np
import json
from datetime import datetime, timedelta
# Create streaming occasion information
np.random.seed(42)
occasions = ()
for i in vary(1000):
properties = {
'device_type': np.random.selection(('cell', 'desktop', 'pill')),
'page_path': np.random.selection(('/residence', '/merchandise', '/checkout')),
'session_length': np.random.randint(60, 3600)
}
if np.random.random() > 0.7:
properties('purchase_value') = spherical(np.random.uniform(20, 300), 2)
occasion = {
'event_id': f'evt_{i}',
'timestamp': (datetime.now() - timedelta(hours=np.random.randint(0, 72))).isoformat(),
'user_id': f'user_{np.random.randint(100, 999)}',
'event_type': np.random.selection(('view', 'click on', 'buy')),
'metadata': json.dumps(properties)
}
occasions.append(occasion)
# Create database efficiency logs
db_logs = pd.DataFrame({
'timestamp': pd.date_range('2024-01-01', durations=5000, freq='1min'),
'operation': np.random.selection(('SELECT', 'INSERT', 'UPDATE'), 5000, p=(0.7, 0.2, 0.1)),
'duration_ms': np.random.lognormal(imply=4, sigma=1, measurement=5000),
'table_name': np.random.selection(('customers', 'orders', 'merchandise'), 5000),
'rows_processed': np.random.poisson(lam=25, measurement=5000),
'connection_id': np.random.randint(1, 20, 5000)
})
# Create API log information
api_logs = ()
for i in vary(800):
log_entry = {
'timestamp': datetime.now() - timedelta(minutes=np.random.randint(0, 1440)),
'endpoint': np.random.selection(('/api/customers', '/api/orders', '/api/metrics')),
'status_code': np.random.selection((200, 400, 500), p=(0.8, 0.15, 0.05)),
'response_time': np.random.exponential(150)
}
if log_entry('status_code') == 200:
log_entry('payload_size') = np.random.randint(100, 5000)
api_logs.append(log_entry)
# 1. Extrahieren von JSON -Feldern in DataFrame -Spalten
Konvertieren Sie die JSON -Metadatenfelder aus Ereignisprotokollen zur Analyse in separate Datenfream -Spalten.
events_df = pd.DataFrame(({**occasion, **json.hundreds(occasion('metadata'))} for occasion in occasions)).drop('metadata', axis=1)
Dieser One-Liner verwendet das Listenverständnis mit dem Auspacken von Wörterbuch, um die Basisfelder jedes Ereignisses mit seinen analysierten JSON-Metadaten zusammenzuführen. Der drop() Entfernt das Unique metadata Spalte Da der Inhalt jetzt in separaten Spalten befindet.
Ausgabe:

Dies erstellt einen Datenrahmen mit 1000 Zeilen und 8 Spalten, in denen JSON -Felder mögen device_type Und purchase_value Werden Sie einzelne Spalten, die direkt ausfragt und aggregiert werden können.
# 2. Identifizieren der Leistungsausreißer nach Betriebstyp
Finden Sie Datenbankvorgänge, die im Vergleich zu ähnlichen Vorgängen ungewöhnlich lange dauern.
outliers = db_logs.groupby('operation').apply(lambda x: x(x('duration_ms') > x('duration_ms').quantile(0.95))).reset_index(drop=True)
Diese Gruppendatenbankprotokolle nach Betriebstyp und filtert dann jede Gruppe für Datensätze, die die 95. Perzentildauer überschreiten.
Verkürzte Ausgabe:

Dies kehrt ungefähr 250 Ausreißerbetriebe zurück (5% von 5000 insgesamt), wobei jeder Vorgang signifikant langsamer als 95% der ähnlichen Vorgänge erzielte.
# 3. Berechnen der durchschnittlichen Reaktionszeiten für API -Endpunkte
Überwachen Sie die Leistungstrends im Laufe der Zeit für verschiedene API -Endpunkte mithilfe von Schiebern.
api_response_trends = pd.DataFrame(api_logs).set_index('timestamp').sort_index().groupby('endpoint')('response_time').rolling('1H').imply().reset_index()
Dadurch werden die API -Protokolle in einen Datenrahmen umgewandelt, der setzt timestamp als Index für zeitbasierte Operationen und sortiert chronologisch, um eine monotonische Reihenfolge sicherzustellen. Es gruppiert dann zu endpoint und wendet ein rollendes 1-stündiges Fenster auf die Antwortzeiten an.
Innerhalb jedes Gleitfensters die imply() Funktion berechnet die durchschnittliche Antwortzeit. Das rollende Fenster bewegt sich über die Zeit und bietet eher Leistungstrendanalysen als isolierte Messungen.
Verkürzte Ausgabe:

Wir erhalten die Reaktionszeittrends, die zeigen, wie sich die Leistung jedes API -Endpunkts im Laufe der Zeit mit Werten in Millisekunden ändert. Höhere Werte zeigen eine langsamere Leistung an.
# 4. Erkennen von Schemaänderungen in Ereignisdaten
Identifizieren Sie, wann neue Felder in Ereignismetadaten erscheinen, die in früheren Ereignissen nicht vorhanden waren.
schema_evolution = pd.DataFrame(({ok: kind(v).__name__ for ok, v in json.hundreds(occasion('metadata')).gadgets()} for occasion in occasions)).fillna('lacking').nunique()
Dies analysiert die JSON -Metadaten aus jedem Ereignis und erstellt einen Wörterbuch -Mapping -Feldnamen für ihre Python -Typamen kind(v).__name__.
Der resultierende DataFrame verfügt über eine Zeile professional Ereignis und eine Spalte professional eindeutiges Feld für alle Ereignisse. Der fillna('lacking') erledigt Ereignisse, die keine bestimmten Felder haben, und nunique() zählt wie viele verschiedene Werte (einschließlich lacking) erscheinen in jeder Spalte.
Ausgabe:
device_type 1
page_path 1
session_length 1
purchase_value 2
dtype: int64
# 5. Aggregation von mehrstufigen Datenbankverbindungsleistung
Erstellen Sie zusammenfassende Statistiken, die nach Betriebstyp und Verbindung für die Ressourcenüberwachung gruppiert sind.
connection_perf = db_logs.groupby(('operation', 'connection_id')).agg({'duration_ms': ('imply', 'rely'), 'rows_processed': ('sum', 'imply')}).spherical(2)
Diese Gruppendatenbankprotokolle nach Betriebstyp und Verbindungs -ID gleichzeitig, wodurch eine hierarchische Analyse der Umgang mit verschiedenen Verbindungen erstellt wird.
Der agg() Die Funktion wendet mehrere Aggregationsfunktionen an: imply Und rely Damit die Dauer sowohl durchschnittliche Leistung als auch Abfragefrequenz zeigen kann, während sum Und imply für rows_processed Durchschnittsmuster zeigen. Der spherical(2) sorgt für eine lesbare Dezimalpräzision.
Ausgabe:

Dadurch wird ein multi-indexed-Datenrahmen erstellt, das zeigt, wie jede Verbindung unterschiedliche Vorgänge ausführt.
# 6. Erstellen stündlicher Ereignistypverteilungsmuster
Berechnen Sie die Verteilungsmuster des Ereignisarts über verschiedene Stunden hinweg, um die Benutzerverhaltenszyklen zu verstehen.
hourly_patterns = pd.DataFrame(occasions).assign(hour=lambda x: pd.to_datetime(x('timestamp')).dt.hour).groupby(('hour', 'event_type')).measurement().unstack(fill_value=0).div(pd.DataFrame(occasions).assign(hour=lambda x: pd.to_datetime(x('timestamp')).dt.hour).groupby('hour').measurement(), axis=0).spherical(3)
Dies extrahiert Stunde aus Zeitstempeln mit assign() und ein Lambda, dann erstellt dann eine Kreuzung von Stunden und Ereignisarten mithilfe von Tätigkeiten groupby Und unstack.
Der div() Der Betrieb normalisiert sich durch Gesamtereignisse professional Stunde, um eine proportionale Verteilung und nicht um Rohzahlen zu zeigen.
Verkürzte Ausgabe:

Gibt eine Matrix zurück, die den Anteil jedes Ereignisarts zeigt (viewAnwesend click onAnwesend buy) für jede Stunde des Tages, wobei Benutzerverhaltensmuster und Spitzenaktivitätsperioden für unterschiedliche Aktionen aufgedeckt werden.
# 7. Berechnung der API -Fehlerrate Zusammenfassung nach Statuscode
Überwachen Sie die API -Gesundheit, indem Sie Fehlerverteilungsmuster über alle Endpunkte analysieren.
error_breakdown = pd.DataFrame(api_logs).groupby(('endpoint', 'status_code')).measurement().unstack(fill_value=0).div(pd.DataFrame(api_logs).groupby('endpoint').measurement(), axis=0).spherical(3)
Diese gruppiert API -Protokolle von beiden endpoint Und status_codeverwendet dann measurement() Ereignisse zählen und unstack() Statuscodes in Spalten zu drehen. Der div() Der Betrieb normalisiert sich durch Gesamtanforderungen professional Endpunkt, um Anteile anstelle von Rohzahlen anzuzeigen, wodurch die Endpunkte die höchsten Fehlerraten und welche Arten von Fehlern sie erzeugen.
Ausgabe:
status_code 200 400 500
endpoint
/api/metrics 0.789 0.151 0.060
/api/orders 0.827 0.140 0.033
/api/customers 0.772 0.167 0.061
Erstellt eine Matrix, die den Anteil jedes Statuscode (200, 400, 500) für jeden Endpunkt zeigt, sodass problematische Endpunkte erkennen und ob sie bei Clientfehlern (4xx) oder Serverfehler (5xx) fehlschlagen.
# 8. Implementierung der Erkennung von Schiebfensteranomalie
Erkennen Sie ungewöhnliche Muster, indem Sie die aktuelle Leistung mit der jüngsten historischen Leistung vergleichen.
anomaly_flags = db_logs.sort_values('timestamp').assign(rolling_mean=lambda x: x('duration_ms').rolling(window=100, min_periods=10).imply()).assign(is_anomaly=lambda x: x('duration_ms') > 2 * x('rolling_mean'))
Diese Sorten loget sich chronologisch an, berechnet einen rollenden Mittelwert der letzten 100 Operationen mithilfe rolling()Anschließend markiert Operationen, bei denen die aktuelle Dauer doppelt so hoch ist wie der Rolling -Durchschnitt. Der min_periods=10 Stellt sicher, dass die Berechnungen erst beginnen, nachdem ausreichende Daten verfügbar sind.
Verkürzte Ausgabe:

Fügt jedem Datenbankvorgang Anomalie -Flags hinzu und identifiziert Operationen, die im Vergleich zur jüngsten Leistung ungewöhnlich langsam sind, anstatt statische Schwellenwerte zu verwenden.
# 9. Optimieren speichereffizienter Datentypen
Optimieren Sie automatisch die Verwendungsverbrauch von DataFrame -Speicher, indem Sie numerische Typen auf die kleinstmöglichen Darstellungen heruntergefahren werden.
optimized_df = db_logs.assign(**{c: (pd.to_numeric(db_logs(c), downcast="integer") if pd.api.varieties.is_integer_dtype(db_logs(c)) else pd.to_numeric(db_logs(c), downcast="float")) for c in db_logs.select_dtypes(embody=('int', 'float')).columns})
Dies wählt nur numerische Spalten aus und ersetzt sie im Unique db_logs mit heruntergekommenen Versionen verwenden pd.to_numeric(). Für Ganzzahlspalten versucht es int8Anwesend int16Und int32 Vor dem Aufenthalt bei int64. Für Float -Spalten versucht es float32 vor float64.
Dies reduziert den Speicherverbrauch für große Datensätze.
# 10. Berechnung stündlicher Ereignisverarbeitungsmetriken
Überwachen Sie Streaming Pipeline Well being, indem Sie das Ereignisvolumen und die Benutzer -Engagement -Muster verfolgen.
pipeline_metrics = pd.DataFrame(occasions).assign(hour=lambda x: pd.to_datetime(x('timestamp')).dt.hour).groupby('hour').agg({'event_id': 'rely', 'user_id': 'nunique', 'event_type': lambda x: (x == 'buy').imply()}).rename(columns={'event_id': 'total_events', 'user_id': 'unique_users', 'event_type': 'purchase_rate'}).spherical(3)
Dies extrahiert die Stunde aus Zeitstempeln und Gruppen Ereignisse zu Stunde und berechnet dann drei wichtige Metriken: Gesamtveranstaltung der Ereignisanzeige mithilfe rely()einzigartige Benutzer verwenden nunique()und Kaufumrechnungsrate unter Verwendung einer Lambda, die den Anteil der Kaufveranstaltungen berechnet. Der rename() Die Methode bietet beschreibende Spaltennamen für die endgültige Ausgabe.
Ausgabe:

Dies zeigt stündliche Kennzahlen, die den gesamten Tag des Ereignisvolumens, des Nutzungsbetriebs und der Conversion -Raten im Laufe des Tages angeben.
# Einpacken
Diese Einzeiler sind nützlich für Daten technische Aufgaben. Sie kombinieren PANDAS-Operationen, statistische Analysen und Datentransformationstechniken, um reale Szenarien effizient zu bewältigen.
Jedes Muster kann anhand spezifischer Anforderungen angepasst und erweitert werden und gleichzeitig die Kernlogik beibehalten, die sie für die Produktionsanwendung wirksam macht.
Completely happy Coding!
Bala Priya c ist ein Entwickler und technischer Schriftsteller aus Indien. Sie arbeitet gern an der Schnittstelle zwischen Mathematik, Programmierung, Datenwissenschaft und Inhaltserstellung. Ihre Interessensgebiete und Fachgebiete umfassen DevOps, Knowledge Science und natürliche Sprachverarbeitung. Sie liest gerne, schreibt, codieren und Kaffee! Derzeit arbeitet sie daran, ihr Wissen mit der Entwicklergemeinschaft zu lernen und zu teilen, indem sie Tutorials, Anleitungen, Meinungsstücke und vieles mehr autorisiert. Bala erstellt auch ansprechende Ressourcenübersichten und Codierungs -Tutorials.
