Arbeiten mit Milliardenzeilen-Datensätzen in Python (mit Vaex)
Bild vom Autor

# Einführung

Der Umgang mit riesigen Datensätzen mit Milliarden von Zeilen ist eine große Herausforderung in der Datenwissenschaft und -analyse. Traditionelle Werkzeuge wie Pandas funktionieren intestine für kleine bis mittlere Datensätze, die in den Systemspeicher passen. Mit zunehmender Größe der Datensätze werden sie jedoch langsam, verbrauchen viel Arbeitsspeicher (RAM) für ihre Funktion und stürzen häufig mit Fehlern wegen unzureichendem Arbeitsspeicher (OOM) ab.

Hier ist Vaexeine leistungsstarke Python-Bibliothek für die Out-of-Core-Datenverarbeitung, kommt ins Spiel. Mit Vaex können Sie große tabellarische Datensätze effizient und speicherschonend überprüfen, ändern, visualisieren und analysieren, sogar auf einem Customary-Laptop computer.

# Was ist Vaex?

Vaex ist eine Python-Bibliothek für faule, out-of-core DataFrames (ähnlich wie Pandas) für Daten, die größer als Ihr RAM sind.

Hauptmerkmale:

Vaex ist darauf ausgelegt, große Datenmengen effizient zu verarbeiten, indem es direkt mit Daten auf der Festplatte arbeitet und nur die benötigten Teile liest, wodurch das Laden ganzer Dateien in den Speicher vermieden wird.

Vaex verwendet Lazy Analysis, was bedeutet, dass Operationen nur dann berechnet werden, wenn tatsächlich Ergebnisse angefordert werden, und es kann spaltenbasierte Datenbanken – die Daten spaltenweise statt zeilenweise speichern – wie HDF5, Apache Arrow und Parquet sofort über Speicherzuordnung öffnen.

Vaex basiert auf optimierten C/C++-Backends und kann Statistiken berechnen und Operationen für Milliarden von Zeilen professional Sekunde durchführen, wodurch umfangreiche Analysen selbst auf bescheidener {Hardware} schnell möglich sind.

Es verfügt über eine Pandas-ähnliche Anwendungsprogrammierschnittstelle (API), die den Übergang für Benutzer, die bereits mit Pandas vertraut sind, reibungsloser macht und ihnen hilft, Large-Knowledge-Funktionen ohne steile Lernkurve zu nutzen.

# Vergleich von Vaex und Dask

Vaex ist nicht ähnlich Dask als Ganzes, ähnelt aber Dask DataFramesdie auf Pandas aufgebaut sind DataFrames. Dies bedeutet, dass Dask bestimmte Pandas-Probleme erbt, beispielsweise die Anforderung, dass Daten in einigen Kontexten vollständig in den RAM geladen werden müssen, um verarbeitet zu werden. Dies ist bei Vaex nicht der Fall. Vaex macht kein DataFrame kopieren, damit größere Datenmengen verarbeitet werden können DataFrames auf Maschinen mit weniger Hauptspeicher. Sowohl Vaex als auch Dask verwenden Lazy Processing. Der Hauptunterschied besteht darin, dass Vaex das Feld nur bei Bedarf berechnet, während wir bei Dask das explizit aufrufen müssen compute() Funktion. Um Vaex optimum nutzen zu können, müssen die Daten im HDF5- oder Apache Arrow-Format vorliegen.

# Warum traditionelle Werkzeuge Probleme haben

Instruments wie Pandas laden den gesamten Datensatz vor der Verarbeitung in den RAM. Bei Datensätzen, die größer als der Speicher sind, führt dies zu Folgendem:

  • Langsame Leistung
  • Systemabstürze (OOM-Fehler)
  • Eingeschränkte Interaktivität

Vaex lädt niemals den gesamten Datensatz in den Speicher; stattdessen:

  • Streamt Daten von der Festplatte
  • Verwendet virtuelle Spalten und verzögerte Auswertung, um die Berechnung zu verzögern
  • Ergebnisse werden nur dann materialisiert, wenn sie ausdrücklich benötigt werden

Dies ermöglicht die Analyse großer Datensätze auch auf bescheidener {Hardware}.

# Wie Vaex unter der Haube funktioniert

// Out-of-Core-Ausführung

Vaex liest Daten nach Bedarf mithilfe der Speicherzuordnung von der Festplatte. Dadurch kann es mit Datendateien arbeiten, die viel größer sind, als der Arbeitsspeicher aufnehmen kann.

// Faule Bewertung

Anstatt jede Operation sofort auszuführen, erstellt Vaex ein Berechnungsdiagramm. Berechnungen werden nur ausgeführt, wenn Sie ein Ergebnis anfordern (z. B. beim Drucken oder Plotten).

// Virtuelle Spalten

Virtuelle Spalten sind im Datensatz definierte Ausdrücke, die bis zur Berechnung keinen Speicher belegen. Das spart RAM und beschleunigt Arbeitsabläufe.

# Erste Schritte mit Vaex

// Vaex installieren

Erstellen Sie eine saubere virtuelle Umgebung:

conda create -n vaex_demo python=3.9
conda activate vaex_demo

Installieren Sie Vaex mit pip:

pip set up vaex-core vaex-hdf5 vaex-viz

Vaex upgraden:

pip set up --upgrade vaex

Unterstützende Bibliotheken installieren:

pip set up pandas numpy matplotlib

// Große Datensätze öffnen

Vaex unterstützt verschiedene gängige Speicherformate für die Verarbeitung großer Datenmengen. Es kann direkt mit HDF5-, Apache Arrow- und Parquet-Dateien arbeiten, die alle für effizienten Festplattenzugriff und schnelle Analysen optimiert sind. Während Vaex auch CSV-Dateien lesen kann, müssen diese zunächst in ein effizienteres Format konvertiert werden, um die Leistung bei der Arbeit mit großen Datensätzen zu verbessern.

So öffnen Sie eine Parquet-Datei:

import vaex

df = vaex.open("your_huge_dataset.parquet")
print(df)

Jetzt können Sie die Datensatzstruktur überprüfen, ohne sie in den Speicher zu laden.

// Kernoperationen in Vaex

Daten filtern:

filtered = df(df.gross sales > 1000)

Dadurch wird das Ergebnis nicht sofort berechnet; Stattdessen wird der Filter nur bei Bedarf registriert und angewendet.

Gruppieren nach und Aggregationen:

consequence = df.groupby("class", agg=vaex.agg.imply("gross sales"))
print(consequence)

Vaex berechnet Aggregationen effizient mithilfe paralleler Algorithmen und minimalem Speicher.

Statistik berechnen:

mean_price = df("value").imply()
print(mean_price)

Vaex berechnet dies im laufenden Betrieb, indem es den Datensatz in Blöcken scannt.

// Demonstration mit einem Taxi-Datensatz

Wir werden einen realistischen 50-Millionen-Reihentaxi-Datensatz erstellen, um die Fähigkeiten von Vaex zu demonstrieren:

import vaex
import numpy as np
import pandas as pd
import time

Legen Sie zur Reproduzierbarkeit einen zufälligen Startwert fest:

np.random.seed(42)
print("Creating 50 million row dataset...")
n = 50_000_000

Generieren Sie realistische Taxifahrtdaten:

information = {
    'passenger_count': np.random.randint(1, 7, n),
    'trip_distance': np.random.exponential(3, n),
    'fare_amount': np.random.gamma(10, 1.5, n),
    'tip_amount': np.random.gamma(2, 1, n),
    'total_amount': np.random.gamma(12, 1.8, n),
    'payment_type': np.random.selection(('credit score', 'money', 'cell'), n),
    'pickup_hour': np.random.randint(0, 24, n),
    'pickup_day': np.random.randint(1, 8, n),
}

Erstellen Sie Vaex DataFrame:

df_vaex = vaex.from_dict(information)

Export in das HDF5-Format (effizient für Vaex):

df_vaex.export_hdf5('taxi_50M.hdf5')
print(f"Created dataset with {n:,} rows")

Ausgabe:

Form: (50000000, 8)
Created dataset with 50,000,000 rows

Wir haben jetzt einen Datensatz mit 50 Millionen Zeilen und 8 Spalten.

// Leistung von Vaex vs. Pandas

Öffnen großer Dateien mit Vaex-Speicherzuordnung:

begin = time.time()
df_vaex = vaex.open('taxi_50M.hdf5')
vaex_time = time.time() - begin

print(f"Vaex opened {df_vaex.form(0):,} rows in {vaex_time:.4f} seconds")
print(f"Reminiscence utilization: ~0 MB (memory-mapped)")

Ausgabe:

Vaex opened 50,000,000 rows in 0.0199 seconds
Reminiscence utilization: ~0 MB (memory-mapped)

Pandas: In den Speicher laden (versuchen Sie dies nicht mit 50 Millionen Zeilen!):

# This is able to fail on most machines
df_pandas = pd.read_hdf('taxi_50M.hdf5')

Dies führt zu einem Speicherfehler! Vaex öffnet Dateien unabhängig von ihrer Größe quick sofort, da keine Daten in den Speicher geladen werden.

Grundlegende Aggregationen: Berechnen Sie Statistiken für 50 Millionen Zeilen:

begin = time.time()
stats = {
    'mean_fare': df_vaex.fare_amount.imply(),
    'mean_distance': df_vaex.trip_distance.imply(),
    'total_revenue': df_vaex.total_amount.sum(),
    'max_fare': df_vaex.fare_amount.max(),
    'min_fare': df_vaex.fare_amount.min(),
}
agg_time = time.time() - begin

print(f"nComputed 5 aggregations in {agg_time:.4f} seconds:")
print(f"  Imply fare: ${stats('mean_fare'):.2f}")
print(f"  Imply distance: {stats('mean_distance'):.2f} miles")
print(f"  Whole income: ${stats('total_revenue'):,.2f}")
print(f"  Fare vary: ${stats('min_fare'):.2f} - ${stats('max_fare'):.2f}")

Ausgabe:

Computed 5 aggregations in 0.8771 seconds:
  Imply fare: $15.00
  Imply distance: 3.00 miles
  Whole income: $1,080,035,827.27
  Fare vary: $1.25 - $55.30

Filtervorgänge: Lange Fahrten filtern:

begin = time.time()
long_trips = df_vaex(df_vaex.trip_distance > 10)
filter_time = time.time() - begin

print(f"nFiltered for journeys > 10 miles in {filter_time:.4f} seconds")
print(f"  Discovered: {len(long_trips):,} lengthy journeys")
print(f"  Proportion: {(len(long_trips)/len(df_vaex)*100):.2f}%")

Ausgabe:

Filtered for journeys > 10 miles in 0.0486 seconds
Discovered: 1,784,122 lengthy journeys
Proportion: 3.57%

Mehrere Bedingungen:

begin = time.time()
premium_trips = df_vaex((df_vaex.trip_distance > 5) & 
                        (df_vaex.fare_amount > 20) & 
                        (df_vaex.payment_type == 'credit score'))
multi_filter_time = time.time() - begin

print(f"nMultiple situation filter in {multi_filter_time:.4f} seconds")
print(f"  Premium journeys (>5mi, >$20, credit score): {len(premium_trips):,}")

Ausgabe:

A number of situation filter in 0.0582 seconds
Premium journeys (>5mi, >$20, credit score): 457,191

Gruppieren nach Operationen:

begin = time.time()
by_payment = df_vaex.groupby('payment_type', agg={
    'mean_fare': vaex.agg.imply('fare_amount'),
    'mean_tip': vaex.agg.imply('tip_amount'),
    'total_trips': vaex.agg.rely(),
    'total_revenue': vaex.agg.sum('total_amount')
})
groupby_time = time.time() - begin

print(f"nGroupBy operation in {groupby_time:.4f} seconds")
print(by_payment.to_pandas_df())

Ausgabe:

GroupBy operation in 5.6362 seconds
  payment_type  mean_fare  mean_tip  total_trips  total_revenue
0       credit score  15.001817  2.000065     16663623   3.599456e+08
1       cell  15.001200  1.999679     16667691   3.600165e+08
2         money  14.999397  2.000115     16668686   3.600737e+08

Komplexere Gruppierung nach:

begin = time.time()
by_hour = df_vaex.groupby('pickup_hour', agg={
    'avg_distance': vaex.agg.imply('trip_distance'),
    'avg_fare': vaex.agg.imply('fare_amount'),
    'trip_count': vaex.agg.rely()
})
complex_groupby_time = time.time() - begin

print(f"nGroupBy by hour in {complex_groupby_time:.4f} seconds")
print(by_hour.to_pandas_df().head(10))

Ausgabe:

GroupBy by hour in 1.6910 seconds
   pickup_hour  avg_distance   avg_fare  trip_count
0            0      2.998120  14.997462     2083481
1            1      3.000969  14.998814     2084650
2            2      3.003834  15.001777     2081962
3            3      3.001263  14.998196     2081715
4            4      2.998343  14.999593     2083882
5            5      2.997586  15.003988     2083421
6            6      2.999887  15.011615     2083213
7            7      3.000240  14.996892     2085156
8            8      3.002640  15.000326     2082704
9            9      2.999857  14.997857     2082284

// Erweiterte Vaex-Funktionen

Virtuelle Spalten (berechnete Spalten) ermöglichen das Hinzufügen von Spalten ohne Datenkopieren:

df_vaex('tip_percentage') = (df_vaex.tip_amount / df_vaex.fare_amount) * 100
df_vaex('is_generous_tipper') = df_vaex.tip_percentage > 20
df_vaex('rush_hour') = (df_vaex.pickup_hour >= 7) & (df_vaex.pickup_hour <= 9) | 
                        (df_vaex.pickup_hour >= 17) & (df_vaex.pickup_hour <= 19)

Diese werden im laufenden Betrieb ohne Speicheraufwand berechnet:

print("Added 3 digital columns with zero reminiscence overhead")
generous_tippers = df_vaex(df_vaex.is_generous_tipper)
print(f"Beneficiant tippers (>20% tip): {len(generous_tippers):,}")

rush_hour_trips = df_vaex(df_vaex.rush_hour)
print(f"Rush hour journeys: {len(rush_hour_trips):,}")

Ausgabe:

VIRTUAL COLUMNS
Added 3 digital columns with zero reminiscence overhead
Beneficiant tippers (>20% tip): 11,997,433
Rush hour journeys: 12,498,848

Korrelationsanalyse:

corr = df_vaex.correlation(df_vaex.trip_distance, df_vaex.fare_amount)
print(f"Correlation (distance vs fare): {corr:.4f}")

Perzentile:

attempt:
    percentiles = df_vaex.percentile_approx('fare_amount', (25, 50, 75, 90, 95, 99))
besides AttributeError:
    percentiles = (
        df_vaex.fare_amount.quantile(0.25),
        df_vaex.fare_amount.quantile(0.50),
        df_vaex.fare_amount.quantile(0.75),
        df_vaex.fare_amount.quantile(0.90),
        df_vaex.fare_amount.quantile(0.95),
        df_vaex.fare_amount.quantile(0.99),
    )

print(f"nFare percentiles:")
print(f"twenty fifth: ${percentiles(0):.2f}")
print(f"fiftieth (median): ${percentiles(1):.2f}")
print(f"seventy fifth: ${percentiles(2):.2f}")
print(f"ninetieth: ${percentiles(3):.2f}")
print(f"ninety fifth: ${percentiles(4):.2f}")
print(f"99th: ${percentiles(5):.2f}")

Standardabweichung:

std_fare = df_vaex.fare_amount.std()
print(f"nStandard deviation of fares: ${std_fare:.2f}")

Zusätzliche nützliche Statistiken:

print(f"nAdditional statistics:")
print(f"Imply: ${df_vaex.fare_amount.imply():.2f}")
print(f"Min: ${df_vaex.fare_amount.min():.2f}")
print(f"Max: ${df_vaex.fare_amount.max():.2f}")

Ausgabe:

Correlation (distance vs fare): -0.0001

Fare percentiles:
  twenty fifth: $11.57
  fiftieth (median): $nan
  seventy fifth: $nan
  ninetieth: $nan
  ninety fifth: $nan
  99th: $nan

Customary deviation of fares: $4.74

Further statistics:
  Imply: $15.00
  Min: $1.25
  Max: $55.30

// Datenexport

# Export filtered information
high_value_trips = df_vaex(df_vaex.total_amount > 50)

Exportieren in verschiedene Formate:

begin = time.time()
high_value_trips.export_hdf5('high_value_trips.hdf5')
export_time = time.time() - begin
print(f"Exported {len(high_value_trips):,} rows to HDF5 in {export_time:.4f}s")

Sie können auch in CSV, Parquet usw. exportieren:

high_value_trips.export_csv('high_value_trips.csv')
high_value_trips.export_parquet('high_value_trips.parquet')

Ausgabe:

Exported 13,054 rows to HDF5 in 5.4508s

Dashboard „Leistungszusammenfassung“.

print("VAEX PERFORMANCE SUMMARY")
print(f"Dataset measurement:           {n:,} rows")
print(f"File measurement on disk:      ~2.4 GB")
print(f"RAM utilization:              ~0 MB (memory-mapped)")
print()
print(f"Open time:              {vaex_time:.4f} seconds")
print(f"Single aggregation:     {agg_time:.4f} seconds")
print(f"Easy filter:          {filter_time:.4f} seconds")
print(f"Advanced filter:         {multi_filter_time:.4f} seconds")
print(f"GroupBy operation:      {groupby_time:.4f} seconds")
print()
print(f"Throughput:             ~{n/groupby_time:,.0f} rows/second")

Ausgabe:

VAEX PERFORMANCE SUMMARY
Dataset measurement:           50,000,000 rows
File measurement on disk:      ~2.4 GB
RAM utilization:              ~0 MB (memory-mapped)

Open time:              0.0199 seconds
Single aggregation:     0.8771 seconds
Easy filter:          0.0486 seconds
Advanced filter:         0.0582 seconds
GroupBy operation:      5.6362 seconds

Throughput:             ~8,871,262 rows/second

# Abschließende Gedanken

Vaex ist very best, wenn Sie mit großen Datensätzen arbeiten, die größer als 1 GB sind und nicht in den Arbeitsspeicher passen, Large Knowledge erkunden, Function Engineering mit Millionen von Zeilen durchführen oder Datenvorverarbeitungspipelines erstellen.

Sie sollten Vaex nicht für Datensätze verwenden, die kleiner als 100 MB sind. Für diese ist die Verwendung von Pandas einfacher. Wenn Sie mit komplexen Verknüpfungen über mehrere Tabellen hinweg arbeiten, ist die Verwendung von SQL-Datenbanken (Structured Question Language) möglicherweise besser. Wenn Sie die vollständige Pandas-API benötigen, beachten Sie, dass die Kompatibilität von Vaex eingeschränkt ist. Für Echtzeit-Streaming-Daten sind andere Instruments besser geeignet.

Vaex füllt eine Lücke im Python-Knowledge-Science-Ökosystem: die Möglichkeit, effizient und interaktiv an Milliarden-Zeilen-Datensätzen zu arbeiten, ohne alles in den Speicher zu laden. Seine Out-of-Core-Architektur, das Lazy-Execution-Modell und die optimierten Algorithmen machen es zu einem leistungsstarken Instrument für die Erkundung großer Datenmengen, selbst auf einem Laptop computer. Unabhängig davon, ob Sie umfangreiche Protokolle, wissenschaftliche Umfragen oder hochfrequente Zeitreihen untersuchen, hilft Vaex dabei, die Lücke zwischen Benutzerfreundlichkeit und Skalierbarkeit großer Datenmengen zu schließen.

Shittu Olumide ist ein Software program-Ingenieur und technischer Autor, der sich leidenschaftlich dafür einsetzt, modernste Technologien zu nutzen, um fesselnde Erzählungen zu erschaffen, mit einem scharfen Blick fürs Element und einem Gespür für die Vereinfachung komplexer Konzepte. Sie können Shittu auch auf finden Twitter.



Von admin

Schreibe einen Kommentar

Deine E-Mail-Adresse wird nicht veröffentlicht. Erforderliche Felder sind mit * markiert