beginnt oft mit Werkzeugen wie Pandas. Sie sind intuitiv, leistungsstark und perfekt für kleine bis mittelgroße Datensätze. Sobald Ihre Daten jedoch über das hinausgehen, was bequem in den Speicher passt, tauchen Leistungsprobleme auf. Hier ist PySpark kommt rein.
Beachten Sie, dass ich in diesem Artikel die Begriffe Spark und PySpark häufig synonym verwende. Für unsere Zwecke spielt es keine Rolle, aber Sie sollten bedenken, dass sie unterschiedlich sind. Spark ist das übergreifende Framework für verteiltes Computing (geschrieben in Scala) und PySpark ist eine dedizierte Python-API für Spark.
Was ist PySpark?
PySpark ist die Python-API für Apache Spark, ein verteiltes Pc-Framework zur effizienten Verarbeitung großer Datenmengen. Anstatt alle Berechnungen auf einer einzigen Maschine auszuführen, verteilt Spark die Arbeit auf mehrere Maschinen (einen Cluster), sodass Sie Daten in großem Maßstab verarbeiten und gleichzeitig Code schreiben können, der Python-Benutzern dennoch vertraut vorkommt.
Einer der Hauptvorteile von PySpark besteht darin, dass es einen Großteil der Komplexität verteilter Systeme abstrahiert. Sie müssen Threads, Speicher oder Netzwerkkommunikation nicht manuell verwalten. Spark kümmert sich für Sie um diese Anliegen, während Sie sich auf die Beschreibung konzentrieren Was Sie möchten eher mit den Daten umgehen als Wie es sollte ausgeführt werden.
Wenn Sie noch kein Spark-Neuling sind, sollten Sie vor der Verwendung von Spark drei wichtige Kernideen kennen lernen. Diese sind:
1. Cluster
Wenn Leute hören, dass Spark auf einem „Cluster“ läuft, kann das einschüchternd klingen. In der Praxis sind für den Einstieg keine tiefgreifenden Kenntnisse über verteilte Systeme erforderlich. Ein Cluster ist einfach eine Gruppe miteinander vernetzter Server, die zusammenarbeiten können. In einer Spark-Anwendung, die auf einem Cluster ausgeführt wird, fungiert eine Maschine als TreiberKoordinierung der Arbeit, während die anderen als fungieren TestamentsvollstreckerDurchführung von Berechnungen für Datenblöcke. Wenn die Executor-Knoten ihre Arbeit beendet haben, senden sie ein Sign an den Treiberknoten zurück, und der Treiber kann dann mit dem endgültigen Ergebnissatz alles tun, was erforderlich ist.
┌───────────────────┐
│ Driver │
│(your PySpark app) │
└─────────┬─────────┘
│
| The Driver farms out work
| to a number of executors
┌────────────────────┼───────────────────────────┐
│ │ │
┌───────▼────────┐ ┌───────▼────────┐ ┌───────▼────────┐
│ Executor 1 │ │ Executor 2 │ │ Executor N │
│ processes half│ │ processes half│ ...... │ processes half│
│ of the info │ | of the info │ │ of the info │
└────────────────┘ └────────────────┘ └────────────────┘
Denken Sie daran, dass Sie Spark nicht auf einem physischen Rechencluster ausführen müssen. Wenn Sie PySpark lokal ausführen, wird Spark simuliert ein Cluster auf Ihrem Laptop computer oder PC mit mehreren Kernen. Eine der Stärken von PySpark besteht darin, dass derselbe Code später mit nur sehr geringfügigen Änderungen in einem echten Cluster bereitgestellt werden kann, sei es in der Cloud oder vor Ort.
Diese Trennung von Koordination und Ausführung ermöglicht Spark die Skalierung. Wenn Datensätze wachsen, können mehr Executoren hinzugefügt werden, um Daten parallel zu verarbeiten, wodurch die Laufzeit verkürzt wird, ohne dass Änderungen an Ihrem Code erforderlich sind.
2. Der Spark-Datenrahmen
Das Herzstück von PySpark ist das DataFrame-APIDies ist die Hauptmethode, mit der Sie in Spark mit Daten arbeiten. Ein DataFrame ist einfach eine Datentabelle, die aus Zeilen und Spalten besteht – sehr ähnlich einer Tabelle in einer Datenbank oder einem DataFrame in Pandas. Wenn Sie bereits SQL oder Pandas verwendet haben, werden Ihnen die Grundideen bekannt vorkommen.
Mit Spark DataFrames können Sie allgemeine Datenaufgaben wie das Filtern von Zeilen, das Auswählen von Spalten, das Gruppieren von Daten, das Zusammenführen von Tabellen und das Berechnen von Zusammenfassungen wie Zählungen oder Durchschnittswerten ausführen. Diese Vorgänge sind leicht zu lesen und zu schreiben, sodass Sie sich auf sie konzentrieren können Was Sie möchten eher mit den Daten als mit den technischen Particulars ihrer Funktionsweise arbeiten.
Das Besondere an Spark ist, was hinter den Kulissen passiert. Spark ermittelt automatisch die effizienteste Methode zur Ausführung Ihrer DataFrame-Vorgänge und führt diese dann parallel auf mehreren Computern in einem Cluster aus. Sie müssen dies nicht selbst verwalten – Spark kümmert sich um Dinge wie die Aufteilung der Daten, die Koordinierung der Arbeit und die Wiederherstellung nach Fehlern, wenn etwas schief geht.
Aus diesem Grund können Spark DataFrames damit umgehen sehr große Datensätzeauch solche, die zu groß sind, um in den Speicher eines einzelnen Computer systems zu passen. Gleichzeitig bieten sie eine einfache und vertraute Benutzeroberfläche, was PySpark zu einem leistungsstarken und dennoch zugänglichen Device für die Arbeit mit Huge Knowledge macht.
3. Faule vs. eifrige Bewertung
Eine weitere wissenswerte Stärke von PySpark ist der Ansatz zwischen fauler und eifriger Ausführung.
Die meisten Python-Datenbibliotheken wie Pandas verwenden eifrige Ausführung. Das bedeutet, dass, wenn Sie eine Operation ausführen, diese sofort ausgeführt wird, gefolgt von der nächsten Operation und so weiter.
PySpark geht damit anders um, indem es eine Technik namens Lazy Execution verwendet. Wenn Sie Datentransformationen schreiben, z. B. das Auswählen von Spalten oder das Filtern von Zeilen, führt Spark diese nicht sofort aus. Stattdessen erstellt es einen optimierten Ausführungsplan und führt die Berechnung nur aus, wenn eine Aktion (z. B. das Anzeigen von Ergebnissen oder das Schreiben von Daten auf die Festplatte) ausgelöst wird. Dadurch kann Spark den Workflow vor der Ausführung optimieren und Ihren Code ohne zusätzlichen Aufwand Ihrerseits effizienter machen.
Keen execution (e.g. pandas)
information ──filter──► end result (computed instantly)
In pandas, every operation runs as quickly as it's referred to as. That is
intuitive however may be inefficient for big datasets.
PySpark makes use of lazy execution.
Lazy execution (PySpark)
information ──filter──►
│
└─groupby──► (plan builds right here)
│
└─agg──► (nonetheless no execution)
│
motion ──► executes right here
Um diesen Punkt deutlich zu machen, stellen Sie sich das folgende Szenario vor. Nehmen wir an, wir haben einen Datenrahmen mit 10 Millionen Datensätzen, den wir …
a) Fügen Sie eine neue leere Spalte namens X hinzu
b) Filtern Sie die Daten auf eine Weise, die dazu führt, dass wir 50 % der Datensätze entfernen
c) Führen Sie eine Aggregation der verbleibenden Datensätze durch, sodass die neue Spalte X den MAX-Wert eines anderen Werts in dieser Zeile enthält
d) Drucken Sie die Zeile mit dem höchsten Wert von X aus
Auf einem System wie Pandas, das eine eifrige Ausführung durchführt, wird jeder Schritt genau so ausgeführt, wie wir es oben beschrieben haben. Für 10 Millionen Datensätze würde es so aussehen:
- Spalte hinzufügen: Das System erstellt eine neue Model des Datensatzes mit 10 Millionen Zeilen im Speicher und fügt Spalte X hinzu.
- Filter: Das System filtert alle 10 Millionen Zeilen, was zu 5 Millionen Löschungen führt, und schreibt a neu 5-Millionen-Zeilen-Datensatz im Speicher.
- Aggregation: Es berechnet den MAX-Wert für jede Zeile und aktualisiert die Spalte.
- Drucken: Es findet die oberste Zeile und zeigt sie Ihnen an.
Das Downside ist, dass wir eine Menge „schwere Arbeit“ geleistet haben (eine Spalte zu 10 Millionen Zeilen hinzugefügt), nur um im nächsten Schritt sofort die Hälfte dieser Arbeit wegzuwerfen.
Spark hingegen führt aufgrund seines Lazy-Execution-Modells keine Arbeit aus, wenn Sie die Schritte (a), (b) oder (c) definieren. Stattdessen wird ein erstellt Logischer Plan (auch DAG genannt – Directed Uneven Graph), um die Arbeit zu erledigen.
Wenn Sie schließlich Schritt (d) auslösen – Die Aktion – Der Optimierer von Spark betrachtet den gesamten Plan und stellt fest, dass er viel intelligenter funktionieren kann:
- Prädikat-Pushdown: Spark sieht den Filter (50 % der Datensätze entfernen). Anstatt Spalte X zu 10 Millionen Zeilen hinzuzufügen, ist es Verschiebt die Filterung ganz an den Anfang.
- Optimierung: Es fügt nur Spalte X hinzu und aggregiert die übrig 5 Millionen Zeilen.
- Ergebnis: Es vermeidet die Verarbeitung von 5 Millionen Datensätzen und spart 50 % Speicher- und CPU-Zeit.
Einrichten der Entwicklungsumgebung
Okay, das ist genug der Theorie. Schauen wir uns an, wie Sie PySpark auf Ihrem System installieren und einige Beispielcode-Snippets ausführen können. Als Einführungstext für Anfänger würde die tatsächliche Erstellung eines realen Clusters mit mehreren Knoten den Rahmen dieses Artikels sprengen. Aber wie ich bereits erwähnt habe, kann Spark einen synthetischen Cluster auf Ihrem PC oder Laptop computer erstellen, wenn es sich um einen Multicore-PC oder Laptop computer handelt, was der Fall sein wird, wenn Ihr System weniger als etwa 10 Jahre alt ist.
Als Erstes richten wir für diese Arbeit eine separate Entwicklungsumgebung ein, um sicherzustellen, dass unsere Projekte isoliert sind und sich nicht gegenseitig beeinträchtigen. Für diesen Teil verwende ich WSL2 Ubuntu für Home windows und Conda, Sie können aber auch die Umgebung und Methode verwenden, die Sie gewohnt sind.
Installieren Sie PySpark usw.
# 1. Create a brand new setting with Python 3.11 (very secure for Spark)
conda create -n spark_env python=3.11 -y
# 2. Activate it
conda activate spark_env
# 3. Set up PySpark and PyArrow (wanted for Parquet recordsdata)
pip set up pyspark pyarrow jupyter
Um zu überprüfen, ob PySpark korrekt installiert wurde, geben Sie den Befehl pyspark in ein Terminalfenster ein.
$ pyspark
Python 3.11.14 | packaged by conda-forge | (most important, Oct 22 2025, 22:46:25) (GCC 14.3.0) on linux
Kind "assist", "copyright", "credit" or "license" for extra data.
WARNING: Utilizing incubator modules: jdk.incubator.vector
WARNING: bundle solar.safety.motion not in java.base
Utilizing Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
26/01/15 16:15:21 WARN Utils: Your hostname, tpr-desktop, resolves to a loopback tackle: 127.0.1.1; utilizing 10.255.255.254 as a substitute (on interface lo)
26/01/15 16:15:21 WARN Utils: Set SPARK_LOCAL_IP if you'll want to bind to a different tackle
Utilizing Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log degree to "WARN".
To regulate logging degree use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
26/01/15 16:15:22 WARN NativeCodeLoader: Unable to load native-hadoop library in your platform... utilizing builtin-java lessons the place relevant
WARNING: A terminally deprecated technique in solar.misc.Unsafe has been referred to as
WARNING: solar.misc.Unsafe::arrayBaseOffset has been referred to as by org.apache.spark.unsafe.Platform (file:/house/tom/miniconda3/envs/pandas_to_pyspark/lib/python3.11/site-packages/pyspark/jars/spark-unsafe_2.13-4.1.1.jar)
WARNING: Please take into account reporting this to the maintainers of sophistication org.apache.spark.unsafe.Platform
WARNING: solar.misc.Unsafe::arrayBaseOffset can be eliminated in a future launch
Welcome to
____ __
/ __/__ ___ _____/ /__
_ / _ / _ `/ __/ '_/
/__ / .__/_,_/_/ /_/_ model 4.1.1
/_/
Utilizing Python model 3.11.14 (most important, Oct 22 2025 22:46:25)
Spark context Internet UI obtainable at http://10.255.255.254:4040
Spark context obtainable as 'sc' (grasp = native(*), app id = local-1768493723158).
SparkSession obtainable as 'spark'.
>>>
Wenn Sie das Spark-Willkommensbanner nicht sehen, ist ein Fehler aufgetreten und Sie sollten Ihre Set up noch einmal überprüfen.
Beispiel 1 – Erstellen eines lokalen Clusters
Das ist eigentlich ziemlich einfach. Geben Sie einfach Folgendes in Ihr Notizbuch ein.
from pyspark.sql import SparkSession
# Initialize the Spark Session
spark = SparkSession.builder
.grasp("native(*)")
.appName("MyLocalCluster")
.config("spark.driver.reminiscence", "2g")
.getOrCreate()
# Confirm the cluster is operating
print(f"Spark is operating model: {spark.model}")
print(f"Grasp URL: {spark.sparkContext.grasp}")
#
# The output
#
Spark is operating model: 4.1.1
Grasp URL: native(*)
Der SparkSession Konzept ist wichtig. In den frühen Tagen von Spark mussten Benutzer mehrere „Einstiegspunkte“ jonglieren (wie SparkContext für Kernfunktionen, SQLContext für Datenrahmen und HiveContext für Datenbanken). Für Anfänger battle es verwirrend.
Die SparkSession wurde in Spark 2.0 als „One-Cease-Store“ für alles eingeführt. Es ist der zentrale Einstiegspunkt für die Interaktion mit der Spark-Funktionalität.
Beispiel 2 – Erstellen eines Datenrahmens
Das Erstellen von Datenrahmen und das Bearbeiten der darin enthaltenen Daten in PySpark ist das, was Sie die meiste Zeit tun. Und es ist ziemlich einfach zu machen. Hier definieren wir, dass unser Datenrahmen drei Datensätze und drei benannte Spalten enthält.
# 1. Outline your information as a listing of tuples
information = (
("Alice", 34, "New York"),
("Bob", 45, "London"),
("Catherine", 29, "Paris")
)
# 2. Outline your column names
columns = ("Title", "Age", "Metropolis")
# 3. Create the DataFrame
df = spark.createDataFrame(information, columns)
# 4. Present the end result
df.present()
#
# The output
#
+---------+---+--------+
| Title|Age| Metropolis|
+---------+---+--------+
| Alice| 34|New York|
| Bob| 45| London|
|Catherine| 29| Paris|
+---------+---+--------+
Wahrscheinlicher ist, dass alle von Ihnen verwendeten Datenrahmen zunächst durch das Einlesen von Daten aus einer Datei oder Datenbank erstellt werden. Erstellen Sie auf Ihrem System eine CSV-Datei mit dem Namen sales_data.csv und folgendem Inhalt.
transaction_id,customer_name,net_amount,tax_amount, is_member
101,Alice,250.50,25.05,true
102,Bob,120.00,6.00, false
103,Charlie,450.75,25.07,true
104,David,89.99,5.73,false
Das Erstellen eines Datenrahmens aus einer Datei wie dieser ist einfach.
# Load the CSV file
df = spark.learn.format("csv")
.possibility("header", "true")
.possibility("inferSchema", "true")
.load("sales_data.csv")
# Present the info
print("Dataframe Contents:")
df.present()
# Present the info sorts (Schema)
print("Knowledge Schema:")
df.printSchema()
#
# The output
#
Dataframe Contents:
+--------------+-------------+----------+----------+----------+
|transaction_id|customer_name|net_amount|tax_amount| is_member|
+--------------+-------------+----------+----------+----------+
| 101| Alice| 250.5| 25.05| true|
| 102| Bob| 120.0| 6.0| false|
| 103| Charlie| 450.75| 25.07| true|
| 104| David| 89.99| 5.73| false|
+--------------+-------------+----------+----------+----------+
Knowledge Schema:
root
|-- transaction_id: integer (nullable = true)
|-- customer_name: string (nullable = true)
|-- net_amount: double (nullable = true)
|-- tax_amount: double (nullable = true)
|-- is_member: string (nullable = true)
Beispiel 3 – Datenverarbeitung
Sobald Sie Ihre Eingabedaten in einem Datenrahmen haben, möchten Sie sie natürlich als Nächstes verarbeiten oder auf irgendeine Weise manipulieren. Das ist auch einfach. Nehmen wir an, wir möchten unter Bezugnahme auf die soeben geladenen Umsatzdaten den Bruttobetrag (Netto + Steuern) und den Steuersatz als Prozentsatz des Bruttobetrags für jeden Datensatz berechnen und diese zu unserem anfänglichen Datenrahmen hinzufügen.
from pyspark.sql import capabilities as F
# 1. Add 'gross_amount' by including internet and tax
# 2. Add 'tax_percentage' by dividing tax by the brand new gross quantity
df_extended = df.withColumn("gross_amount", F.col("net_amount") + F.col("tax_amount"))
.withColumn("tax_percentage",
(F.col("tax_amount") / (F.col("net_amount") + F.col("tax_amount"))) * 100)
# 3. Optionally available: Spherical the proportion to 2 decimal locations for readability
df_extended = df_extended.withColumn("tax_percentage", F.spherical(F.col("tax_percentage"), 2))
# Present the brand new columns together with the previous ones
df_extended.present()
#
# The output
#
+--------------+-------------+----------+----------+----------+------------+--------------+
|transaction_id|customer_name|net_amount|tax_amount| is_member|gross_amount|tax_percentage|
+--------------+-------------+----------+----------+----------+------------+--------------+
| 101| Alice| 250.5| 25.05| true| 275.55| 9.09|
| 102| Bob| 120.0| 6.0| false| 126.0| 4.76|
| 103| Charlie| 450.75| 25.07| true| 475.82| 5.27|
| 104| David| 89.99| 5.73| false| 95.72| 5.99|
+--------------+-------------+----------+----------+----------+------------+--------------+
Zusammenfassung
Damit ist unser kurzer Ausflug in die Welt des verteilten Rechnens mit PySpark abgeschlossen. Ich habe erklärt, was PySpark ist und warum Sie die Verwendung in Betracht ziehen sollten, wenn die von Ihnen verarbeiteten Daten Ihre Speichergrenzen überschreiten. Kurz gesagt: Die Fähigkeit von PySpark, auf große Multi-Node-Cluster zu skalieren, sein Lazy-Execution-Modell und die Datenrahmen-Datenstruktur machen es zu einem idealen Kraftpaket für die Datenverarbeitung.
PySpark wird häufig in Pipelines für Datentechnik, Analyse und maschinelles Lernen eingesetzt. Es lässt sich intestine in Cloud-Plattformen integrieren, unterstützt eine Vielzahl von Datenquellen (wie CSV, Parquet und Datenbanken) und lässt sich von einem Laptop computer bis hin zu großen Produktionsclustern skalieren.
Wenn Sie mit Python vertraut sind und mit großen Datenmengen arbeiten möchten, ohne auf die vertraute Syntax zu verzichten, ist PySpark ein hervorragender nächster Schritt. Es schließt die Lücke zwischen einfacher Datenanalyse und umfangreicher Datenverarbeitung und ist damit ein wertvolles Werkzeug für jeden, der in die Welt der Huge Knowledge einsteigt.
Hoffentlich können Sie meine einfachen Codierungsbeispiele und Erklärungen verwenden, um den nächsten Schritt zur Verwendung von PySpark in der realen Welt, auf einem echten Cluster, zu machen und eine ordnungsgemäße Huge-Knowledge-Verarbeitung durchzuführen.
