
Bild vom Herausgeber
# Einführung
Datenpipelines in Information-Science- und Machine-Studying-Projekten sind eine sehr praktische und vielseitige Möglichkeit, Datenverarbeitungsabläufe zu automatisieren. Aber manchmal kann unser Code der Kernlogik zusätzliche Komplexität verleihen. Python-Dekoratoren können diese gemeinsame Herausforderung meistern. In diesem Artikel werden fünf nützliche und effektive Python-Dekoratoren zum Erstellen und Optimieren leistungsstarker Datenpipelines vorgestellt.
Dieser Präambelcode geht den Codebeispielen voraus, die den fünf Dekoratoren beiliegen, um eine Model des California Housing-Datensatzes zu laden, den ich für Sie in einem öffentlichen GitHub-Repository zur Verfügung gestellt habe:
import pandas as pd
import numpy as np
# Loading the dataset
DATA_URL = "https://uncooked.githubusercontent.com/gakudo-ai/open-datasets/important/housing.csv"
print("Downloading information pipeline supply...")
df_pipeline = pd.read_csv(DATA_URL)
print(f"Loaded {df_pipeline.form(0)} rows and {df_pipeline.form(1)} columns.")
# 1. JIT-Kompilierung
Während Python-Schleifen den zweifelhaften Ruf haben, bemerkenswert langsam zu sein und bei der Durchführung komplexer Operationen wie mathematischen Transformationen in einem Datensatz Engpässe zu verursachen, gibt es eine schnelle Lösung. Es heißt @njitund es ist ein Dekorateur in der Numba Bibliothek, die Python-Funktionen zur Laufzeit in C-ähnlichen, optimierten Maschinencode übersetzt. Bei großen Datensätzen und komplexen Datenpipelines kann dies drastische Beschleunigungen bedeuten.
from numba import njit
import time
# Extracting a numeric column as a NumPy array for quick processing
incomes = df_pipeline('median_income').fillna(0).values
@njit
def compute_complex_metric(income_array):
outcome = np.zeros_like(income_array)
# In pure Python, a loop like this could usually drag
for i in vary(len(income_array)):
outcome(i) = np.log1p(income_array(i) * 2.5) ** 1.5
return outcome
begin = time.time()
df_pipeline('income_metric') = compute_complex_metric(incomes)
print(f"Processed array in {time.time() - begin:.5f} seconds!")
# 2. Zwischen-Caching
Wenn Datenpipelines rechenintensive Aggregationen oder Datenverknüpfungen enthalten, deren Ausführung Minuten bis Stunden dauern kann, reminiscence.cache kann zur Serialisierung von Funktionsausgaben verwendet werden. Im Falle eines Neustarts des Skripts oder einer Wiederherstellung nach einem Absturz kann dieser Dekorator serialisierte Array-Daten von der Festplatte neu laden, wodurch aufwändige Berechnungen übersprungen werden und nicht nur Ressourcen, sondern auch Zeit gespart werden.
from joblib import Reminiscence
import time
# Creating an area cache listing for pipeline artifacts
reminiscence = Reminiscence(".pipeline_cache", verbose=0)
@reminiscence.cache
def expensive_aggregation(df):
print("Working heavy grouping operation...")
time.sleep(1.5) # Lengthy-running pipeline step simulation
# Grouping information factors by ocean_proximity and calculating attribute-level means
return df.groupby('ocean_proximity', as_index=False).imply(numeric_only=True)
# The primary run executes the code; the second resorts to disk for immediate loading
agg_df = expensive_aggregation(df_pipeline)
agg_df_cached = expensive_aggregation(df_pipeline)
# 3. Schemavalidierung
Pandera ist eine statistische Typisierungsbibliothek (Schemaüberprüfung), die dazu konzipiert ist, die allmähliche, subtile Beschädigung von Analysemodellen wie maschinellen Lernprädiktoren oder Dashboards aufgrund schlechter Datenqualität zu verhindern. Im folgenden Beispiel reicht es aus, es in Kombination mit der Parallelverarbeitung zu verwenden Dask Bibliothek, um zu überprüfen, ob die anfängliche Pipeline dem angegebenen Schema entspricht. Wenn nicht, wird ein Fehler ausgegeben, um potenzielle Probleme frühzeitig zu erkennen.
import pandera as pa
import pandas as pd
import numpy as np
from dask import delayed, compute
# Outline a schema to implement information varieties and legitimate ranges
housing_schema = pa.DataFrameSchema({
"median_income": pa.Column(float, pa.Test.greater_than(0)),
"total_rooms": pa.Column(float, pa.Test.gt(0)),
"ocean_proximity": pa.Column(str, pa.Test.isin(('NEAR BAY', '<1H OCEAN', 'INLAND', 'NEAR OCEAN', 'ISLAND')))
})
@delayed
@pa.check_types
def validate_and_process(df: pa.typing.DataFrame) -> pa.typing.DataFrame:
"""
Validates the dataframe chunk in opposition to the outlined schema.
If the information is corrupt, Pandera raises a SchemaError.
"""
return housing_schema.validate(df)
# Splitting the pipeline information into 4 chunks for parallel validation
chunks = np.array_split(df_pipeline, 4)
lazy_validations = (validate_and_process(chunk) for chunk in chunks)
print("Beginning parallel schema validation...")
attempt:
# Triggering the Dask graph to validate chunks in parallel
validated_chunks = compute(*lazy_validations)
df_parallel = pd.concat(validated_chunks)
print(f"Validation profitable. Processed {len(df_parallel)} rows.")
besides pa.errors.SchemaError as e:
print(f"Information Integrity Error: {e}")
# 4. Faule Parallelisierung
Durch die sequentielle Ausführung unabhängiger Pipeline-Schritte werden Verarbeitungseinheiten wie CPUs möglicherweise nicht optimum genutzt. Der @delayed Der Decorator erstellt zusätzlich zu solchen Transformationsfunktionen einen Abhängigkeitsgraphen, um die Aufgaben später auf optimierte Weise parallel auszuführen, was zur Verkürzung der Gesamtlaufzeit beiträgt.
from dask import delayed, compute
@delayed
def process_chunk(df_chunk):
# Simulating an remoted transformation job
df_chunk_copy = df_chunk.copy()
df_chunk_copy('value_per_room') = df_chunk_copy('median_house_value') / df_chunk_copy('total_rooms')
return df_chunk_copy
# Splitting the dataset into 4 chunks processed in parallel
chunks = np.array_split(df_pipeline, 4)
# Lazy computation graph (the way in which Dask works!)
lazy_results = (process_chunk(chunk) for chunk in chunks)
# Set off execution throughout a number of CPUs concurrently
processed_chunks = compute(*lazy_results)
df_parallel = pd.concat(processed_chunks)
print(f"Parallelized output form: {df_parallel.form}")
# 5. Speicherprofilierung
Der @profile Decorator soll dabei helfen, stille Speicherlecks zu erkennen – die manchmal zum Absturz von Servern führen können, wenn die zu verarbeitenden Dateien sehr groß sind. Das Muster besteht darin, die verpackte Funktion Schritt für Schritt zu überwachen und bei jedem einzelnen Schritt die Höhe des RAM-Verbrauchs oder des freigegebenen Speichers zu beobachten. Letztendlich ist dies eine großartige Möglichkeit, Ineffizienzen im Code leicht zu erkennen und die Speichernutzung mit einer klaren Richtung vor Augen zu optimieren.
from memory_profiler import profile
# A adorned operate that prints a line-by-line reminiscence breakdown to the console
@profile(precision=2)
def memory_intensive_step(df):
print("Working reminiscence diagnostics...")
# Creation of a large short-term copy to trigger an intentional reminiscence spike
df_temp = df.copy()
df_temp('new_col') = df_temp('total_bedrooms') * 100
# Dropping the short-term dataframe frees up the RAM
del df_temp
return df.dropna(subset=('total_bedrooms'))
# Working the pipeline step: it's possible you'll observe the reminiscence report in your terminal
final_df = memory_intensive_step(df_pipeline)
# Zusammenfassung
In diesem Artikel werden fünf nützliche und leistungsstarke Python-Dekoratoren zur Optimierung rechenintensiver Datenpipelines vorgestellt. Unterstützt durch paralleles Rechnen und effiziente Verarbeitungsbibliotheken wie Dask und Numba können diese Dekoratoren nicht nur umfangreiche Datentransformationsprozesse beschleunigen, sondern sie auch widerstandsfähiger gegen Fehler und Ausfälle machen.
Iván Palomares Carrascosa ist ein führender Autor, Redner und Berater in den Bereichen KI, maschinelles Lernen, Deep Studying und LLMs. Er schult und leitet andere darin, KI in der realen Welt zu nutzen.
