Nützliche Python-Einzeiler für DatentechnikNützliche Python-Einzeiler für Datentechnik
Bild von Editor | Chatgpt

# Einführung

Daten Engineering umfasst die Verarbeitung großer Datensätze, das Erstellen von ETL -Pipelines und die Aufrechterhaltung der Datenqualität. Dateningenieure arbeiten mit Streaming -Daten, überwachen die Systemleistung, verarbeiten Schemaänderungen und gewährleisten die Datenkonsistenz über verteilte Systeme.

Python-Einzeiler können dazu beitragen, diese Aufgaben zu vereinfachen, indem komplexe Operationen in einzelnen, lesbaren Aussagen kondensiert werden. Dieser Artikel konzentriert sich auf praktische Einzeiler, die allgemeine Daten technische Probleme lösen.

Die hier vorgestellten Einzeiler behandeln reale Aufgaben wie Verarbeitung von Ereignisdaten mit unterschiedlichen Strukturen, die Analyse von Systemprotokollen auf Leistungsprobleme, die Bearbeitung von API-Antworten mit unterschiedlichen Schemas und die Implementierung von Datenqualitätsprüfungen. Fangen wir an.

🔗 Hyperlink zum Code auf GitHub

# Beispieldaten

Lassen Sie uns einige Beispieldaten aufspannen, um unsere Einzeiler auszuführen:

import pandas as pd
import numpy as np
import json
from datetime import datetime, timedelta

# Create streaming occasion information
np.random.seed(42)
occasions = ()
for i in vary(1000):
    properties = {
        'device_type': np.random.selection(('cell', 'desktop', 'pill')),
        'page_path': np.random.selection(('/residence', '/merchandise', '/checkout')),
        'session_length': np.random.randint(60, 3600)
    }
    if np.random.random() > 0.7:
        properties('purchase_value') = spherical(np.random.uniform(20, 300), 2)

    occasion = {
        'event_id': f'evt_{i}',
        'timestamp': (datetime.now() - timedelta(hours=np.random.randint(0, 72))).isoformat(),
        'user_id': f'user_{np.random.randint(100, 999)}',
        'event_type': np.random.selection(('view', 'click on', 'buy')),
        'metadata': json.dumps(properties)
    }
    occasions.append(occasion)

# Create database efficiency logs
db_logs = pd.DataFrame({
    'timestamp': pd.date_range('2024-01-01', durations=5000, freq='1min'),
    'operation': np.random.selection(('SELECT', 'INSERT', 'UPDATE'), 5000, p=(0.7, 0.2, 0.1)),
    'duration_ms': np.random.lognormal(imply=4, sigma=1, measurement=5000),
    'table_name': np.random.selection(('customers', 'orders', 'merchandise'), 5000),
    'rows_processed': np.random.poisson(lam=25, measurement=5000),
    'connection_id': np.random.randint(1, 20, 5000)
})

# Create API log information
api_logs = ()
for i in vary(800):
    log_entry = {
        'timestamp': datetime.now() - timedelta(minutes=np.random.randint(0, 1440)),
        'endpoint': np.random.selection(('/api/customers', '/api/orders', '/api/metrics')),
        'status_code': np.random.selection((200, 400, 500), p=(0.8, 0.15, 0.05)),
        'response_time': np.random.exponential(150)
    }
    if log_entry('status_code') == 200:
        log_entry('payload_size') = np.random.randint(100, 5000)
    api_logs.append(log_entry)

# 1. Extrahieren von JSON -Feldern in DataFrame -Spalten

Konvertieren Sie die JSON -Metadatenfelder aus Ereignisprotokollen zur Analyse in separate Datenfream -Spalten.

events_df = pd.DataFrame(({**occasion, **json.hundreds(occasion('metadata'))} for occasion in occasions)).drop('metadata', axis=1)

Dieser One-Liner verwendet das Listenverständnis mit dem Auspacken von Wörterbuch, um die Basisfelder jedes Ereignisses mit seinen analysierten JSON-Metadaten zusammenzuführen. Der drop() Entfernt das Unique metadata Spalte Da der Inhalt jetzt in separaten Spalten befindet.

Ausgabe:

Extract-Json-2-ColsExtract-Json-2-Cols

Dies erstellt einen Datenrahmen mit 1000 Zeilen und 8 Spalten, in denen JSON -Felder mögen device_type Und purchase_value Werden Sie einzelne Spalten, die direkt ausfragt und aggregiert werden können.

# 2. Identifizieren der Leistungsausreißer nach Betriebstyp

Finden Sie Datenbankvorgänge, die im Vergleich zu ähnlichen Vorgängen ungewöhnlich lange dauern.

outliers = db_logs.groupby('operation').apply(lambda x: x(x('duration_ms') > x('duration_ms').quantile(0.95))).reset_index(drop=True)

Diese Gruppendatenbankprotokolle nach Betriebstyp und filtert dann jede Gruppe für Datensätze, die die 95. Perzentildauer überschreiten.

Verkürzte Ausgabe:

AusreißerAusreißer

Dies kehrt ungefähr 250 Ausreißerbetriebe zurück (5% von 5000 insgesamt), wobei jeder Vorgang signifikant langsamer als 95% der ähnlichen Vorgänge erzielte.

# 3. Berechnen der durchschnittlichen Reaktionszeiten für API -Endpunkte

Überwachen Sie die Leistungstrends im Laufe der Zeit für verschiedene API -Endpunkte mithilfe von Schiebern.

api_response_trends = pd.DataFrame(api_logs).set_index('timestamp').sort_index().groupby('endpoint')('response_time').rolling('1H').imply().reset_index()

Dadurch werden die API -Protokolle in einen Datenrahmen umgewandelt, der setzt timestamp als Index für zeitbasierte Operationen und sortiert chronologisch, um eine monotonische Reihenfolge sicherzustellen. Es gruppiert dann zu endpoint und wendet ein rollendes 1-stündiges Fenster auf die Antwortzeiten an.

Innerhalb jedes Gleitfensters die imply() Funktion berechnet die durchschnittliche Antwortzeit. Das rollende Fenster bewegt sich über die Zeit und bietet eher Leistungstrendanalysen als isolierte Messungen.

Verkürzte Ausgabe:

Rolling-AvgRolling-Avg

Wir erhalten die Reaktionszeittrends, die zeigen, wie sich die Leistung jedes API -Endpunkts im Laufe der Zeit mit Werten in Millisekunden ändert. Höhere Werte zeigen eine langsamere Leistung an.

# 4. Erkennen von Schemaänderungen in Ereignisdaten

Identifizieren Sie, wann neue Felder in Ereignismetadaten erscheinen, die in früheren Ereignissen nicht vorhanden waren.

schema_evolution = pd.DataFrame(({ok: kind(v).__name__ for ok, v in json.hundreds(occasion('metadata')).gadgets()} for occasion in occasions)).fillna('lacking').nunique()

Dies analysiert die JSON -Metadaten aus jedem Ereignis und erstellt einen Wörterbuch -Mapping -Feldnamen für ihre Python -Typamen kind(v).__name__.

Der resultierende DataFrame verfügt über eine Zeile professional Ereignis und eine Spalte professional eindeutiges Feld für alle Ereignisse. Der fillna('lacking') erledigt Ereignisse, die keine bestimmten Felder haben, und nunique() zählt wie viele verschiedene Werte (einschließlich lacking) erscheinen in jeder Spalte.

Ausgabe:

device_type       1
page_path         1
session_length    1
purchase_value    2
dtype: int64

# 5. Aggregation von mehrstufigen Datenbankverbindungsleistung

Erstellen Sie zusammenfassende Statistiken, die nach Betriebstyp und Verbindung für die Ressourcenüberwachung gruppiert sind.

connection_perf = db_logs.groupby(('operation', 'connection_id')).agg({'duration_ms': ('imply', 'rely'), 'rows_processed': ('sum', 'imply')}).spherical(2)

Diese Gruppendatenbankprotokolle nach Betriebstyp und Verbindungs ​​-ID gleichzeitig, wodurch eine hierarchische Analyse der Umgang mit verschiedenen Verbindungen erstellt wird.

Der agg() Die Funktion wendet mehrere Aggregationsfunktionen an: imply Und rely Damit die Dauer sowohl durchschnittliche Leistung als auch Abfragefrequenz zeigen kann, während sum Und imply für rows_processed Durchschnittsmuster zeigen. Der spherical(2) sorgt für eine lesbare Dezimalpräzision.

Ausgabe:

AggregatAggregat

Dadurch wird ein multi-indexed-Datenrahmen erstellt, das zeigt, wie jede Verbindung unterschiedliche Vorgänge ausführt.

# 6. Erstellen stündlicher Ereignistypverteilungsmuster

Berechnen Sie die Verteilungsmuster des Ereignisarts über verschiedene Stunden hinweg, um die Benutzerverhaltenszyklen zu verstehen.

hourly_patterns = pd.DataFrame(occasions).assign(hour=lambda x: pd.to_datetime(x('timestamp')).dt.hour).groupby(('hour', 'event_type')).measurement().unstack(fill_value=0).div(pd.DataFrame(occasions).assign(hour=lambda x: pd.to_datetime(x('timestamp')).dt.hour).groupby('hour').measurement(), axis=0).spherical(3)

Dies extrahiert Stunde aus Zeitstempeln mit assign() und ein Lambda, dann erstellt dann eine Kreuzung von Stunden und Ereignisarten mithilfe von Tätigkeiten groupby Und unstack.

Der div() Der Betrieb normalisiert sich durch Gesamtereignisse professional Stunde, um eine proportionale Verteilung und nicht um Rohzahlen zu zeigen.

Verkürzte Ausgabe:

stündlichstündlich

Gibt eine Matrix zurück, die den Anteil jedes Ereignisarts zeigt (viewAnwesend click onAnwesend buy) für jede Stunde des Tages, wobei Benutzerverhaltensmuster und Spitzenaktivitätsperioden für unterschiedliche Aktionen aufgedeckt werden.

# 7. Berechnung der API -Fehlerrate Zusammenfassung nach Statuscode

Überwachen Sie die API -Gesundheit, indem Sie Fehlerverteilungsmuster über alle Endpunkte analysieren.

error_breakdown = pd.DataFrame(api_logs).groupby(('endpoint', 'status_code')).measurement().unstack(fill_value=0).div(pd.DataFrame(api_logs).groupby('endpoint').measurement(), axis=0).spherical(3)

Diese gruppiert API -Protokolle von beiden endpoint Und status_codeverwendet dann measurement() Ereignisse zählen und unstack() Statuscodes in Spalten zu drehen. Der div() Der Betrieb normalisiert sich durch Gesamtanforderungen professional Endpunkt, um Anteile anstelle von Rohzahlen anzuzeigen, wodurch die Endpunkte die höchsten Fehlerraten und welche Arten von Fehlern sie erzeugen.

Ausgabe:

status_code     200    400    500
endpoint                         
/api/metrics  0.789  0.151  0.060
/api/orders   0.827  0.140  0.033
/api/customers    0.772  0.167  0.061

Erstellt eine Matrix, die den Anteil jedes Statuscode (200, 400, 500) für jeden Endpunkt zeigt, sodass problematische Endpunkte erkennen und ob sie bei Clientfehlern (4xx) oder Serverfehler (5xx) fehlschlagen.

# 8. Implementierung der Erkennung von Schiebfensteranomalie

Erkennen Sie ungewöhnliche Muster, indem Sie die aktuelle Leistung mit der jüngsten historischen Leistung vergleichen.

anomaly_flags = db_logs.sort_values('timestamp').assign(rolling_mean=lambda x: x('duration_ms').rolling(window=100, min_periods=10).imply()).assign(is_anomaly=lambda x: x('duration_ms') > 2 * x('rolling_mean'))

Diese Sorten loget sich chronologisch an, berechnet einen rollenden Mittelwert der letzten 100 Operationen mithilfe rolling()Anschließend markiert Operationen, bei denen die aktuelle Dauer doppelt so hoch ist wie der Rolling -Durchschnitt. Der min_periods=10 Stellt sicher, dass die Berechnungen erst beginnen, nachdem ausreichende Daten verfügbar sind.

Verkürzte Ausgabe:

Sliding-Win-opSliding-Win-op

Fügt jedem Datenbankvorgang Anomalie -Flags hinzu und identifiziert Operationen, die im Vergleich zur jüngsten Leistung ungewöhnlich langsam sind, anstatt statische Schwellenwerte zu verwenden.

# 9. Optimieren speichereffizienter Datentypen

Optimieren Sie automatisch die Verwendungsverbrauch von DataFrame -Speicher, indem Sie numerische Typen auf die kleinstmöglichen Darstellungen heruntergefahren werden.

optimized_df = db_logs.assign(**{c: (pd.to_numeric(db_logs(c), downcast="integer") if pd.api.varieties.is_integer_dtype(db_logs(c)) else pd.to_numeric(db_logs(c), downcast="float")) for c in db_logs.select_dtypes(embody=('int', 'float')).columns})

Dies wählt nur numerische Spalten aus und ersetzt sie im Unique db_logs mit heruntergekommenen Versionen verwenden pd.to_numeric(). Für Ganzzahlspalten versucht es int8Anwesend int16Und int32 Vor dem Aufenthalt bei int64. Für Float -Spalten versucht es float32 vor float64.

Dies reduziert den Speicherverbrauch für große Datensätze.

# 10. Berechnung stündlicher Ereignisverarbeitungsmetriken

Überwachen Sie Streaming Pipeline Well being, indem Sie das Ereignisvolumen und die Benutzer -Engagement -Muster verfolgen.

pipeline_metrics = pd.DataFrame(occasions).assign(hour=lambda x: pd.to_datetime(x('timestamp')).dt.hour).groupby('hour').agg({'event_id': 'rely', 'user_id': 'nunique', 'event_type': lambda x: (x == 'buy').imply()}).rename(columns={'event_id': 'total_events', 'user_id': 'unique_users', 'event_type': 'purchase_rate'}).spherical(3)

Dies extrahiert die Stunde aus Zeitstempeln und Gruppen Ereignisse zu Stunde und berechnet dann drei wichtige Metriken: Gesamtveranstaltung der Ereignisanzeige mithilfe rely()einzigartige Benutzer verwenden nunique()und Kaufumrechnungsrate unter Verwendung einer Lambda, die den Anteil der Kaufveranstaltungen berechnet. Der rename() Die Methode bietet beschreibende Spaltennamen für die endgültige Ausgabe.

Ausgabe:

EreignisverfahrenEreignisverfahren

Dies zeigt stündliche Kennzahlen, die den gesamten Tag des Ereignisvolumens, des Nutzungsbetriebs und der Conversion -Raten im Laufe des Tages angeben.

# Einpacken

Diese Einzeiler sind nützlich für Daten technische Aufgaben. Sie kombinieren PANDAS-Operationen, statistische Analysen und Datentransformationstechniken, um reale Szenarien effizient zu bewältigen.

Jedes Muster kann anhand spezifischer Anforderungen angepasst und erweitert werden und gleichzeitig die Kernlogik beibehalten, die sie für die Produktionsanwendung wirksam macht.

Completely happy Coding!

Bala Priya c ist ein Entwickler und technischer Schriftsteller aus Indien. Sie arbeitet gern an der Schnittstelle zwischen Mathematik, Programmierung, Datenwissenschaft und Inhaltserstellung. Ihre Interessensgebiete und Fachgebiete umfassen DevOps, Knowledge Science und natürliche Sprachverarbeitung. Sie liest gerne, schreibt, codieren und Kaffee! Derzeit arbeitet sie daran, ihr Wissen mit der Entwicklergemeinschaft zu lernen und zu teilen, indem sie Tutorials, Anleitungen, Meinungsstücke und vieles mehr autorisiert. Bala erstellt auch ansprechende Ressourcenübersichten und Codierungs -Tutorials.



Von admin

Schreibe einen Kommentar

Deine E-Mail-Adresse wird nicht veröffentlicht. Erforderliche Felder sind mit * markiert