Validierung Ihrer Datenrahmen für Produktions-ML-Pipelines

Bild erzeugt mit Nachtcafé.

Die Datenvalidierung ist ein entscheidender Schritt für Produktionsanwendungen. Sie müssen sicherstellen, dass die Daten, die Sie aufnehmen, mit Ihrer Pipeline kompatibel sind und keine unerwarteten Werte vorhanden sind. Darüber hinaus ist die Validierung der Daten eine Sicherheitsmaßnahme, die verhindert, dass beschädigte oder ungenaue Informationen weiterverarbeitet werden, was bei den ersten Schritten ein Warnsignal darstellt.

Python verfügt bereits über ein großartiges OS-Projekt für diese Aufgabe Pydantisch. Beim Umgang mit großen datenrahmenähnlichen Objekten, beispielsweise beim maschinellen Lernen, Pandera ist eine viel schnellere und skalierbare Methode zur Datenvalidierung (siehe dieser Artikel mit öffentlichen Notizbüchern).

Leistungsvergleich zwischen pandera und zeilenweiser Validierung mit Pydantic für pandas.DataFrame-Objekte unterschiedlicher Größe. Bild von Quelle.

Darüber hinaus bietet Pandera Unterstützung für eine Vielzahl von Datenrahmenbibliotheken wie pandas, polars, dask, modinUnd pyspark.pandas. Weitere Informationen hierzu finden Sie unter Panderas Dokumente📄.

Haftungsausschluss. Pandera ist ein Open-Supply-Projekt, das unter der MIT-Lizenz lizenziert ist. Ich habe keine Verbindung zum Pandera-Workforce oder Union.ai. Dieser Beitrag hat kein kommerzielles Interesse.

Pandera bietet zwei Möglichkeiten, Validatoren zu definieren: Schemata Und Modelle. Ich werde mich auf das zweite konzentrieren, da es Ähnlichkeiten mit Pydantic-Modellen aufweist und der Code sauber ist.

Um ein Pandera-Modell zu definieren, erstellen Sie eine untergeordnete Klasse, die von DataframeModel erbt, und beginnen Sie mit der Deklaration der Spalten und dtypes, die der Datenrahmen haben muss:

import pandera as pa

class UserModel(pa.DataFrameModel):
id: int
username: str
e-mail: str
is_active: bool
membership: str
creation_date: pd.DatetimeTZDtype

# Use
df = pd.DataFrame(...)
UserModel.validate(df) # <- If invalidad raises SchemaError

Beachten Sie, dass ich zum Definieren des Erstellungszeitstempels des Benutzers den nativen Datumstyp von Pandas anstelle anderer ähnlicher Datentypen verwendet habe datetime.datetime. Pandera unterstützt nur die integrierten Datentypen Python, NumPy und Pandas. Sie können auch erstellen benutzerdefinierte Datentypenaber das ist ein fortgeschrittenes Thema und in den meisten Fällen selten notwendig.

Spalteneigenschaften validieren

Mit Pandera können Sie neben dem Datentyp auch andere Spalteneigenschaften validieren:

class UserModel(pa.DataFrameModel):
id: int = pa.Area(distinctive=True, ge=0)
username: str = pa.Area(str_matches=r"^(a-zA-Z0-9_)+$")
e-mail: str = pa.Area(str_matches=r"^(a-zA-Z0-9_.+-)+@(a-zA-Z0-9-)+.(a-zA-Z0-9-.)+$")
is_active: bool
membership: str = pa.Area(isin=("premium", "free"))
creation_date: pd.DatetimeTZDtype = pa.Area(dtype_kwargs={"unit": "ns", "tz": "UTC"})

Hier verwende ich Panderas Feld genau wie Pydantics.

  • Erstens gebe ich an, dass die id Die Spalte darf keine doppelten Werte enthalten und diese müssen größer oder gleich 0 sein.
  • In username Und e-mail Ich überprüfe mithilfe von Regex-Ausdrücken, ob Zeichenfolgen gültig sind. Benutzernamen dürfen nur alphanumerische Zeichen und Unterstriche enthalten, während E-Mails auch Bindestriche und Punkte enthalten können, aber immer dem Muster „smth@smth.smth“ folgen.
  • membership kann nur einen Wert aus der Liste übernehmen. Ein besserer Ansatz besteht darin, eine StrEnum zu verwenden, um die gültigen Werte zu definieren, anstatt sie fest zu codieren.
  • Endlich, creation_date muss in Nanosekundeneinheiten und der UTC-Zeitzone angegeben werden. Diese Zeile kann mit Annotated aus der Typisierungsbibliothek sauberer gestaltet werden creation_date: Annotated(pd.DatetimeTZDtype, "ns", "UTC")

Kasse die Dokumente um alle Feldoptionen zu lesen😋

Benutzerdefinierte Validierungen

Manchmal ist es notwendig, eigene benutzerdefinierte Validierungen hinzuzufügen. Mit Pandera können Sie spritzen Spalten-/Indexprüfungen (benutzerdefinierte Prüfungen einzelner Spalten) und Datenrahmenprüfungen (Prüft zwischen mehreren Spalten).

import pandera as pa
from pandera.typing import Sequence

class UserModel(pa.DataFrameModel):
id: int = pa.Area(distinctive=True, ge=0)
username: str = pa.Area(str_matches=r"^(a-zA-Z0-9_)+$")
e-mail: str = pa.Area(
str_matches=r"^(a-zA-Z0-9_.+-)+@(a-zA-Z0-9-)+.(a-zA-Z0-9-.)+$"
)
is_active: bool
membership: str = pa.Area(isin=("premium", "free"))
creation_date: Annotated(pd.DatetimeTZDtype, "ns", "UTC")

# column/index checks
@pa.verify("username", title="username_length")
def username_length(cls, x: Sequence(str)) -> Sequence(bool):
"""
Test username size is between 1 and 20 characters
"""
return x.str.len().between(1, 20)

@pa.verify("creation_date", title="min_creation_date")
def min_creation_date(cls, x: Sequence(pd.DatetimeTZDtype)) -> Sequence(bool):
"""
Test creation date is after 2000-01-01
"""
return x >= dt.datetime(2000, 1, 1, tzinfo=dt.timezone.utc)

# dataframe verify
@pa.dataframe_check
def membership_is_valid(
cls, df: pd.DataFrame, title="membership_is_valid"
) -> Sequence(bool):
"""
Test account age without cost memebers is <= 30 days
"""
current_time = dt.datetime.now(dt.timezone.utc)
thirty_days = dt.timedelta(days=30)

return (df("membership") == "premium") | (
(df("membership") == "free")
& ((current_time - df("creation_date")) <= thirty_days)
)

Bedenken Sie, dass Sie mit ganzen Spaltenobjekten arbeiten (Sequence), sodass Vorgänge in Prüfungen für eine bessere Leistung vektorisiert werden sollten.

Andere Konfigurationen

Aliase
Wenn Spaltennamen aufgrund der Sprachsyntax nicht als Python-Variablen deklariert werden können, ermöglicht Pandera das Festlegen eines Alias ​​für den Spaltenvalidator, der mit dem Datenrahmen übereinstimmt.

class MyModel(pa.DataFrameModel):
alias_column: int = pa.Area(..., alias="Alias Column")
...

Streng und Zwang
Wenn die strict Wenn die Possibility auf „true“ gesetzt ist, wird der validierte Datenrahmen gezwungen, nur die im Pandera DataFrameModel definierten Spalten zu enthalten. Andererseits, wenn die coerce Wenn die Possibility aktiviert ist, versucht Pandera, die Spaltendaten so umzuwandeln, dass sie mit dem dtype des Modells übereinstimmen.

class MyModel(pa.DataFrameModel):
...

class Config:
strict = True # defaul: False
coerce = True # default: False

Die Zwangsoption kann auch auf Feldebene eingestellt werden pa.Area(..., coerce=True)

Faule Validierung
Standardmäßig gibt Pandera einen Fehler aus, wenn eine Validierungsprüfung nicht bestanden wird. Dies kann ärgerlich sein, da nur der erste aufgetretene Validierungsfehler angezeigt wird und die Überprüfung der restlichen Daten verhindert wird.

In manchen Fällen ist es besser, den gesamten Datenrahmen in einem Durchgang validieren und alle Fehler sammeln zu lassen, anstatt sie einzeln zu beheben und darauf zu warten, dass die Validierung erneut ausgeführt wird. Das erste ist, was die verzögerte Validierung bewirkt.

df = pd.DataFrame(...)
Mymodel.validate(df, lazy_validation=True)
Bild erzeugt mit Nachtcafé.

Da die meisten ML-Pipelines in Python mit tabellarischen Daten trainiert werden, die in Datenrahmenstrukturen codiert sind, Pandera ist ein großartiges und leistungsstarkes Software zur Validierung ihrer Eingaben und Ausgaben.

# pipeline.py

class MLPipeline:
"""Normal ML Pipeline"""
def __init__(self, model_id: str):
self.model_id = model_id

def load_model(self) -> None:
...

def transform_data(self, df: pd.DataFrame) -> pd.DataFrame:
... # <- Potential invalid information error
return df_transform

def predict(self, df: pd.DataFrame) -> pd.DataFrame:
self.load_model()
df_transform = self.rework(df)
df('rating') = self.mannequin.predict(df_transform) # <- Potential invalid information error
return df

Wir möchten vermeiden, dass das Modell aufgrund ungültiger Daten einen Fehler auslöst. Das würde bedeuten, dass wir die ganze Arbeit, das Modell in den Speicher zu laden und die Rohdaten zu verarbeiten, umsonst erledigt haben, Ressourcen verschwendet haben und verhindert haben, dass die restlichen Datenpunkte ausgewertet werden.

Wenn die Ausgabe des Modells eine falsche Struktur aufweist, schlägt unsere Nachverarbeitungspipeline (Ergebnisse in die Datenbank hochladen, Ergebnisse über die RESTful-API zurückgeben usw.) ebenfalls fehl.

Nachdem wir die Validierungsmodelle mit Pandera definiert haben, können wir sie nutzen Dekorateure für die Pipeline-Integration um eine I/O-Validierung durchzuführen.

# fashions.py
import pandera as pa

class InputModel(pa.DataFrameModel):
...

class PredictorModel(pa.DataFrameModel):
...

# OutputModel inherits all InputModel validation fields
# and likewise consists of the rating
class OutputModel(InputModel):
rating: float = pa.Area(ge=0, le=1) # assuming mannequin returns probab.

# pipeline.py
import pandera as pa
from .fashions import InputModel, PredictorModel, OutputModel

class MLPipeline:
"""Normal ML Pipeline"""
def __init__(self, model_id: str):
self.model_id = model_id

def load_model(self) -> None:
...

@pa.check_io(df=InputModel.to_schema(), out=PredictorModel.to_schema(), lazy=True)
def transform_data(self, df: pd.DataFrame) -> pd.DataFrame:
...
return df_transform

@pa.check_output(OutputModel.to_schema(), lazy=True)
def predict(self, df: pd.DataFrame) -> pd.DataFrame:
self.load_model()
df_transform = self.rework(df)
df('rating') = self.mannequin.predict(df_transform)
return df

Weil wir ein Zwischendatenrahmenobjekt generieren df_transform In der ML-Pipeline empfiehlt es sich, diese ebenfalls zu validieren, um Fehler zu vermeiden. Der vorhersagen Die Methodeneingabe wird nicht validiert, da sie bereits erfolgt ist transform_data.

Umgang mit ungültigen Zeilen

Wir möchten nicht, dass unsere Pipeline kaputt geht, nur weil einige Datenpunkte falsche Daten enthalten. Im Falle eines Validierungsfehlers sollte die Strategie darin bestehen, die problematischen Datenpunkte beiseite zu legen und die Pipeline mit den restlichen Daten weiter auszuführen. Die Pipeline kann nicht stoppen!🔥

Pandera-Modelle haben die Possibility, alle ungültigen Zeilen automatisch zu entfernen:

class MyModel(pa.DataFrameModel):
...

class Config:
drop_invalid_rows = True

Es kann jedoch gefährlich sein, alle ungültigen Zeilen zu löschen, ohne sie zu protokollieren. Sie müssen wissen, warum diese Datenpunkte ungültig waren, damit Sie später dem Kunden oder dem Dateningenieur mitteilen können, was die Fehlerursache warfare.

Deshalb erstelle ich statt Pandera-Dekoratoren lieber meine eigenen Validierungshilfsfunktionen:

from typing import Tuple
import logging

logging.basicConfig(degree=logging.INFO)
logger = logging.getLogger(__name__)

def log_pandera_errors(exc: pa.errors.SchemaErrors) -> None:
"""
Logs all errors from a SchemaErrors exception.
"""
for err_type, classes in exc.message.objects():
for _, errors in classes.objects():
for err in errors:
logger.error(f"{err_type} ERROR: {err('column')}. {err('error')}")

def handle_invalid(
df: pd.DataFrame, exc: pa.errors.SchemaErrors
) -> Tuple(pd.DataFrame, pd.DataFrame):
"""
Handles invalid information in a DataFrame primarily based on a SchemaErrors exception.
"""
log_pandera_errors(exc)

df_failure = exc.failure_cases

# Test for errors that can't be resolved
# i.e. they don't seem to be related to a particular row index
nan_indices = df_failure("index").isna()
if nan_indices.any():
error_msg = "n".be a part of(
f" - Column: {row('column')}, verify: {row('verify')}, "
f"failure_case: {row('failure_case')}"
for row in df_failure(nan_indices).to_dict("information")
)
increase ValueError(
f"Schema validation failed with no chance of constant:n{error_msg}n"
"The pipeline can not proceed 😢. Resolve earlier than rerunning"
)

invalid_idcs = df.index.isin(df_failure("index").distinctive())
df_invalid = format_invalid_df(df.loc(invalid_idcs, :), exc)
df_valid = df.iloc(~invalid_idcs)

return df_valid, df_invalid

def validate(
df: pd.DataFrame, mannequin: pa.DataFrameModel
) -> Tuple(pd.DataFrame, pd.DataFrame):
"""
Validates a DataFrame towards a DataFrameModel and handles errors.
"""
strive:
return mannequin.validate(df, lazy=True), pd.DataFrame()
besides pa.errors.SchemaErrors as ex:
return handle_invalid(df, ex)

Ausgabe, die einige Fehler erzwingt und die Spalte entfernt id:

# Error output
ERROR:__main__:SCHEMA ERROR: UserModel. column 'id' not in dataframe. Columns in dataframe: ('username', 'e-mail', 'membership', 'is_active', 'creation_date')
ERROR:__main__:DATA ERROR: username. Column 'username' failed element-wise validator quantity 0: str_matches('^(a-zA-Z0-9_)+$') failure circumstances: bpercent09
ERROR:__main__:DATA ERROR: e-mail. Column 'e-mail' failed element-wise validator quantity 0: str_matches('^(a-zA-Z0-9_.+-)+@(a-zA-Z0-9-)+.(a-zA-Z0-9-.)+$') failure circumstances: ef.com
ERROR:__main__:DATA ERROR: UserModel. DataFrameSchema 'UserModel' failed element-wise validator quantity 0: <Test membership_is_valid> failure circumstances: c, ef.com, free, True, 2000-12-31 00:00:00+00:00

ValueError: Schema validation failed with no chance of constant:
- Column: UserModel, verify: column_in_dataframe, failure_case: id
The pipeline can not proceed 😢. Resolve earlier than rerunning

Im Falle eines unlösbaren Fehlers, der eine ganze Spalte betrifft, kann die Pipeline nicht fortgesetzt werden.

Testen

Nicht zuletzt enthalten Pandera-Modelle und -Schemata auch eine Methode zur Generierung von Beispieldaten gemäß ihrer Definition. Sie müssen installieren speculation Bibliothek, um es zu verwenden.

Nachdem ich es jedoch anhand einiger Beispiele getestet habe, empfehle ich es nicht. Sobald Sie mit dem Hinzufügen einiger Einschränkungen beginnen, dauert die Generierung der synthetischen Daten zu lange, und meistens werden sie nicht variiert (die generierten Daten decken nicht den gesamten Restriktionsraum ab und wiederholen sich). Die beste Various, die ich gefunden habe, ist um Datengeneratoren für jedes Modell hinzuzufügen, das Sie testen möchten – schließlich gibt es in einer Pipeline auch nicht so viele Datenrahmen, die validiert werden müssen – .

class UserModel(pa.DataFrameModel):
...

def pattern(dimension: int = 10) -> pd.DataFrame:
"""Added methodology to generate legitimate take a look at information manually"""
current_time = dt.datetime.now(dt.timezone.utc)
return pd.DataFrame(
{
"id": vary(dimension),
"username": (f"user_{i}" for i in vary(dimension)),
"e-mail": (f"user_{i}@instance.com" for i in vary(dimension)),
"is_active": (True) * dimension,
"membership": ("premium") * dimension, # All premium to move checks
"creation_date": (current_time) * dimension,
}
)

Die Datenvalidierung ist für jede Datenverarbeitungspipeline und insbesondere beim maschinellen Lernen von entscheidender Bedeutung. Pandera vereinfacht einen Großteil dieser Arbeit, indem es einen flexiblen und effizienten modellbasierten Ansatz zur Validierung von Daten in Datenrahmen bereitstellt.

Mit Pandera können Sie Modellklassen definieren, die Spaltentypen, Bereiche und sogar komplexe bedingte Einschränkungen erzwingen. Dadurch können Datenqualitätsprobleme frühzeitig in der Pipeline erkannt und sichergestellt werden, dass die Daten den erwarteten Requirements entsprechen, bevor sie die nächsten Schritte erreichen.

Durch die Integration von Pandera in eine ML-Pipeline können Sie robuste Datenprüfungen erstellen, die dazu beitragen, Fehler zu vermeiden und die Zuverlässigkeit der Modellausgaben zu verbessern.

Endgültiges pandera.DataFrameModel, das in den Checks verwendet wurde:

import pandas as pd
import pandera as pa
from pandera.typing import Sequence
from typing import Annotated
import datetime as dt

class UserModel(pa.DataFrameModel):
id: int = pa.Area(distinctive=True, ge=0, coerce=False)
username: str = pa.Area(str_matches=r"^(a-zA-Z0-9_)+$")
e-mail: str = pa.Area(
str_matches=r"^(a-zA-Z0-9_.+-)+@(a-zA-Z0-9-)+.(a-zA-Z0-9-.)+$"
)
is_active: bool
membership: str = pa.Area(isin=("premium", "free"))
creation_date: Annotated(pd.DatetimeTZDtype, "ns", "UTC")

@pa.verify("username", title="username_length")
def username_length(cls, x: Sequence(str)) -> Sequence(bool):
"""
Test username size is between 1 and 20 characters
"""
return x.str.len().between(1, 20)

@pa.verify("creation_date", title="min_creation_date")
def min_creation_date(cls, x: Sequence(pd.DatetimeTZDtype)) -> Sequence(bool):
"""
Test creation date is after 2000-01-01
"""
return x >= dt.datetime(2000, 1, 1, tzinfo=dt.timezone.utc)

@pa.dataframe_check
def membership_is_valid(
cls, df: pd.DataFrame, title="membership_is_valid"
) -> Sequence(bool):
"""
Test account age without cost memebers is <= 30 days
"""
current_time = dt.datetime.now(dt.timezone.utc)
thirty_days = dt.timedelta(days=30)

return (df("membership") == "premium") | (
(df("membership") == "free")
& ((current_time - df("creation_date")) <= thirty_days)
)

class Config:
strict = True
coerce = True

def pattern(dimension: int = 10) -> pd.DataFrame:
"""Added methodology to generate legitimate take a look at information manually"""
current_time = dt.datetime.now(dt.timezone.utc)
return pd.DataFrame(
{
"id": vary(dimension),
"username": (f"user_{i}" for i in vary(dimension)),
"e-mail": (f"user_{i}@instance.com" for i in vary(dimension)),
"is_active": (True) * dimension,
"membership": ("premium")
* dimension, # All premium to keep away from date restrictions
"creation_date": (current_time) * dimension,
}
)

Von admin

Schreibe einen Kommentar

Deine E-Mail-Adresse wird nicht veröffentlicht. Erforderliche Felder sind mit * markiert