Einführung
eine kontinuierliche Variable für vier verschiedene Produkte. Die Pipeline für maschinelles Lernen wurde in Databricks erstellt und besteht aus zwei Hauptkomponenten.
- Characteristic-Vorbereitung in SQL mit serverloses Computing.
- Rückschluss auf ein Ensemble aus mehreren hundert Modellen mithilfe von Job-Clustern, um die Rechenleistung zu kontrollieren.
Bei unserem ersten Versuch benötigte ein 420-Kern-Cluster quick 10 Stunden für die Verarbeitung von nur 18 Partitionen.
Das Ziel ist es Optimieren Sie den Datenfluss, um die Clusternutzung zu maximieren und Skalierbarkeit sicherzustellen. Die Inferenz erfolgt anhand von vier Sätzen von ML-Modellen, einem Satz professional Produkt. Wir werden uns jedoch darauf konzentrieren wie die Daten gespeichert werden wie es aussehen wird Wie viel Parallelität können wir nutzen? zur Schlussfolgerung. Wir werden uns nicht auf das Innenleben der Schlussfolgerung selbst konzentrieren.
Wenn zu wenige Dateipartitionen vorhanden sind, benötigt der Cluster viel Zeit, um große Dateien zu scannen, und zu diesem Zeitpunkt können Sie, sofern keine Neupartitionierung vorgenommen wird (das bedeutet zusätzliche Netzwerklatenz und Datenverschiebung), möglicherweise auch auf eine große Anzahl von Zeilen in jeder Partition schließen. Dies führt auch zu langen Laufzeiten.

Allerdings haben Unternehmen nur begrenzt die Geduld, ML-Pipelines mit direkter Auswirkung auf die Organisation auszuliefern. Daher sind die Checks begrenzt.
In diesem Artikel werden wir unsere Characteristic-Datenlandschaft überprüfen, dann einen Überblick über die ML-Inferenz geben und die Ergebnisse und Diskussionen der Inferenzleistung basierend auf vier Datensatzbehandlungsszenarien präsentieren:
- Partitionierte Tabelle, kein Salt, keine Zeilenbeschränkung in Partitionen (ungesalzen und geteilt)
- Partitionierte Tabelle, gesalzen, mit 1 Mio. Zeilenbeschränkung (salzig und geteilt)
- Liquid-Cluster-Tabelle, kein Salt, keine Zeilenbeschränkung in Partitionen (ungesalzen und flüssig)
- Liquid-Cluster-Tabelle, gesalzen, mit 1 Mio. Zeilenbeschränkung (salzig und flüssig)
Datenlandschaft
Der Datensatz enthält Funktionen, die der Satz von ML-Modellen zur Inferenz verwendet. Es verfügt über ca. 550 Millionen Zeilen und enthält vier im Attribut identifizierte Produkte ProductLine:
- Produkt A: ~10,45 Mio. (1,9 %)
- Produkt B: ~4,4 Mio. (0,8 %)
- Produkt C: ~100 Mio. (17,6 %)
- Produkt D: ~354 Mio. (79,7 %)
Es verfügt dann über ein weiteres Attribut mit niedriger Kardinalität attrB, das nur zwei unterschiedliche Werte enthält und als Filter verwendet wird, um Teilmengen des Datensatzes für jeden Teil des ML-Programs zu extrahieren.
Darüber hinaus, RunDate Protokolliert das Datum, an dem die Options generiert wurden. Sie können nur angehängt werden. Abschließend wird der Datensatz mit der folgenden Abfrage gelesen:
SELECT
Id,
ProductLine,
AttrB,
AttrC,
RunDate,
{model_features}
FROM
catalog.schema.FeatureStore
WHERE
ProductLine = :product AND
AttrB = :attributeB AND
RunDate = :RunDate
Salt-Implementierung
Die Salzung wird hier dynamisch generiert. Sein Zweck besteht darin, die Daten entsprechend den Volumina zu verteilen. Das bedeutet, dass große Produkte mehr Eimer und kleinere Produkte weniger Eimer erhalten. Produkt D sollte beispielsweise angesichts der Anteile in der Datenlandschaft etwa 80 % der Buckets erhalten.
Wir tun dies, damit wir vorhersehbare Inferenzlaufzeiten haben und die Clusterauslastung maximieren können.
# Calculate proportion of every (ProductLine, AttrB) primarily based on row counts
brand_cat_counts = df_demand_price_grid_load.groupBy(
"ProductLine", "AttrB"
).depend()
total_count = df_demand_price_grid_load.depend()
brand_cat_percents = brand_cat_counts.withColumn(
"%", F.col("depend") / F.lit(total_count)
)
# Accumulate percentages as dicts with string keys (it will later decide
# the variety of salt buckets every product receives
brand_cat_percent_dict = {
f"{row('ProductLine')}|{row('AttrB')}": row('%')
for row in brand_cat_percents.acquire()
}
# Accumulate counts as dicts with string keys (it will assist
# so as to add an extra bucket if counts isn't divisible by the variety of
# buckets for the product
brand_cat_count_dict = {
f"{row('ProductLine')}|{row('AttrB')}": row('depend')
for row in brand_cat_percents.acquire()
}
# Helper to flatten key-value pairs for create_map
def dict_to_map_expr(d):
expr = ()
for okay, v in d.gadgets():
expr.append(F.lit(okay))
expr.append(F.lit(v))
return expr
percent_case = F.create_map(*dict_to_map_expr(brand_cat_percent_dict))
count_case = F.create_map(*dict_to_map_expr(brand_cat_count_dict))
# Add string key column in pyspark
df_demand_price_grid_load = df_demand_price_grid_load.withColumn(
"product_cat_key",
F.concat_ws("|", F.col("ProductLine"), F.col("AttrB"))
)
df_demand_price_grid_load = df_demand_price_grid_load.withColumn(
"%", percent_case.getItem(F.col("product_cat_key"))
).withColumn(
"product_count", count_case.getItem(F.col("product_cat_key"))
)
# Set min/max buckets
min_buckets = 10
max_buckets = 1160
# Calculate buckets per row primarily based on (BrandName, price_delta_cat) proportion
df_demand_price_grid_load = df_demand_price_grid_load.withColumn(
"buckets_base",
(F.lit(min_buckets) + (F.col("%") * (max_buckets - min_buckets))).forged("int")
)
# Add an additional bucket if brand_count isn't divisible by buckets_base
df_demand_price_grid_load = df_demand_price_grid_load.withColumn(
"buckets",
F.when(
(F.col("product_count") % F.col("buckets_base")) != 0,
F.col("buckets_base") + 1
).in any other case(F.col("buckets_base"))
)
# Generate salt per row primarily based on (ProductLine, AttrB) bucket depend
df_demand_price_grid_load = df_demand_price_grid_load.withColumn(
"salt",
(F.rand(seed=42) * F.col("buckets")).forged("int")
)
# Carry out the repartition utilizing the core attributes and the salt column
df_demand_price_grid_load = df_demand_price_grid_load.repartition(
1200, "AttrB", "ProductLine", "salt"
).drop("product_cat_key", "%", "brand_count", "buckets_base", "buckets", "salt")
Schließlich speichern wir unseren Datensatz in der Characteristic-Tabelle und fügen eine maximale Anzahl von Zeilen professional Partition hinzu. Dadurch soll verhindert werden, dass Spark Partitionen mit zu vielen Zeilen generiert, was auch dann möglich ist, wenn wir den Salt bereits berechnet haben.
Warum erzwingen wir 1 Mio. Zeilen? Das Hauptaugenmerk liegt auf der Modellinferenzzeit und nicht so sehr auf der Dateigröße. Nach einigen Checks mit 1M, 1,5M, 2M liefert der erste in unserem Fall die beste Leistung. Auch bei diesem Projekt ist das Funds und die Zeit sehr begrenzt, sodass wir das Beste aus unseren Ressourcen herausholen müssen.
df_demand_price_grid_load.write
.mode("overwrite")
.choice("replaceWhere", f"RunDate = '{params('RunDate')}'")
.choice("maxRecordsPerFile", 1_000_000)
.partitionBy("RunDate", "price_delta_cat", "BrandName")
.saveAsTable(f"{params('catalog_revauto')}.{params('schema_revenueautomation')}.demand_features_price_grid")
Warum verlassen Sie sich nicht einfach auf die Adaptive Question Execution (AQE) von Spark?
Denken Sie daran, dass der Hauptfokus auf Inferenzzeiten liegt und nicht auf Messungen, die auf reguläre Spark-SQL-Abfragen abgestimmt sind, wie z. B. die Dateigröße. Eigentlich struggle es unser erster Versuch, nur AQE zu verwenden. Wie Sie in den Ergebnissen sehen werden, waren die Laufzeiten sehr unerwünscht und führten angesichts unserer Datenanteile nicht zu einer Maximierung der Clusterauslastung.
Schlussfolgerung des maschinellen Lernens
Es gibt eine Pipeline mit 4 Aufgaben, eine professional Produkt. Jede Aufgabe führt die folgenden allgemeinen Schritte aus:
- Lädt die Funktionen aus dem entsprechenden Produkt
- Lädt die Teilmenge der ML-Modelle für das entsprechende Produkt
- Führt eine Inferenz in der Hälfte der Teilmenge durch, die durch geteilt wird
AttrB - Führt eine Inferenz in der anderen Hälfte durch
AttrB - Speichert Daten in der Ergebnistabelle
Wir werden uns auf eine der Inferenzphasen konzentrieren, um diesen Artikel nicht mit Zahlen zu überladen, obwohl die andere Part in Struktur und Ergebnissen sehr ähnlich ist. Darüber hinaus können Sie in Abb. 2 den DAG für die auszuwertende Inferenz sehen.

Es scheint sehr einfach zu sein, aber die Laufzeiten können variieren, je nachdem, wie Ihre Daten gespeichert werden und wie groß Ihr Cluster ist.
Clusterkonfiguration
Für die Inferenzphase, die wir analysieren, gibt es einen Cluster professional Produkt, der auf die Infrastrukturbeschränkungen des Projekts und auch auf die Datenverteilung abgestimmt ist:
- Produkt A: 35 Arbeiter (Standard_DS14v2, 420 Kerne)
- Produkt B: 5 Arbeiter (Standard_DS14v2, 70 Kerne)
- Produkt C: 1 Employee (Standard_DS14v2, 14 Kerne)
- Produkt D: 1 Employee (Standard_DS14v2, 14 Kerne)
Darüber hinaus ist AdaptiveQueryExecution standardmäßig aktiviert, sodass Spark entscheiden kann, wie die Daten angesichts des von Ihnen bereitgestellten Kontexts am besten gespeichert werden.
Ergebnisse und Diskussion
Sie sehen für jedes Szenario eine Darstellung der Anzahl der Dateipartitionen professional Produkt und der durchschnittlichen Anzahl der Zeilen professional Partition, um Ihnen einen Hinweis darauf zu geben, wie viele Zeilen das ML-System professional Spark-Aufgabe inferenzieren wird. Darüber hinaus stellen wir Spark-UI-Metriken vor, um die Laufzeitleistung zu beobachten und nach der Verteilung von Daten zum Zeitpunkt der Inferenz zu suchen. Wir werden den Spark-UI-Teil nur für Produkt D ausführen, das das größte ist, um nicht zu viele Informationen einzuschließen. Darüber hinaus wird die Inferenz auf Produkt D je nach Szenario zu einem Engpass in der Laufzeit. Ein weiterer Grund, warum dies der Hauptschwerpunkt der Ergebnisse struggle.
Ungesalzen und Partitioniert
In Abb. 3 können Sie sehen, dass die durchschnittliche Dateipartition mehrere zehn Millionen Zeilen umfasst, was eine beträchtliche Laufzeit für einen einzelnen Executor bedeutet. Das größte im Durchschnitt ist Produkt C mit mehr als 45 Millionen Zeilen in einer einzelnen Partition. Das kleinste ist Produkt B mit etwa 12 Millionen durchschnittlichen Zeilen.

Abb. 4 zeigt die Anzahl der Partitionen professional Produkt, insgesamt 26 für alle. Bei der Überprüfung von Produkt D fallen 18 Partitionen deutlich unter die 420 Kerne, die uns zur Verfügung stehen, und im Durchschnitt führt jede Partition eine Inferenz auf etwa 40 Millionen Zeilen durch.

Schauen Sie sich Abbildung 5 an. Insgesamt hat der Cluster 9,9 Stunden benötigt und struggle immer noch nicht fertig, da wir den Job abbrechen mussten, weil er teuer wurde und die Checks anderer Leute blockierte.

Aus der zusammenfassenden Statistik in Abb. 6 für die abgeschlossenen Aufgaben können wir erkennen, dass es in den Partitionen für Produkt D eine starke Abweichung gab. Die maximale Eingabegröße betrug ~56 MB und die Laufzeit betrug 7,8 Stunden.

Ungesalzen und flüssig
In diesem Szenario können wir sehr ähnliche Ergebnisse hinsichtlich der durchschnittlichen Anzahl von Zeilen professional Dateipartition und der Anzahl von Partitionen professional Produkt beobachten, wie in Abb. 7 bzw. Abb. 8 zu sehen ist.

Produkt D verfügt über 19 Dateipartitionen, additionally immer noch sehr wenig als 420 Kerne.

Wir können bereits jetzt davon ausgehen, dass dieses Experiment sehr teuer werden würde, daher habe ich beschlossen, den Inferenztest für dieses Szenario zu überspringen. Auch hier gilt: Im Idealfall führen wir es weiter, aber in meinem Board gibt es einen Rückstand an Tickets.
Salzig und Partitioniert
Nach Anwendung des Salting- und Neupartitionierungsprozesses erhalten wir durchschnittlich etwa 2,5 Millionen Datensätze professional Partition für die Produkte A und B und etwa 1 Million für die Produkte C und D, wie in Abbildung 9 dargestellt.

Darüber hinaus können wir in Abb. 10 sehen, dass die Anzahl der Dateipartitionen für Produkt D auf etwa 860 gestiegen ist, was 430 für jede Inferenzstufe ergibt.

Daraus ergibt sich eine Laufzeit von 3 Stunden für die Inferenz von Produkt D mit 360 Aufgaben, wie in Abb. 11 dargestellt.

Betrachtet man die zusammenfassenden Statistiken aus Abb. 12, sieht die Verteilung ausgeglichen aus, mit Laufzeiten um 1,7, aber einer maximalen Aufgabe, die 3 Stunden dauert, was eine weitere Untersuchung in der Zukunft wert ist.

Ein großer Vorteil besteht darin, dass das Salz die Daten entsprechend den Proportionen der Produkte verteilt. Wenn wir mehr Ressourcen zur Verfügung hätten, könnten wir die Anzahl der Shuffle-Partitionen erhöhen repartition() und fügen Sie Arbeiter entsprechend den Anteilen der Daten hinzu. Dies stellt sicher, dass unser Prozess vorhersehbar skaliert.
Salzig und flüssig
Dieses Szenario kombiniert die beiden stärksten Hebel, die wir bisher untersucht haben:
Salting zur Kontrolle der Dateigröße und Parallelität sowie Liquid Clustering, um zusammengehörige Daten ohne starre Partitionsgrenzen am gleichen Ort zu halten.
Nach Anwendung der gleichen Salting-Strategie und eines 1-M-Zeilenlimits professional Partition zeigt die Flüssigkeits-Cluster-Tabelle eine sehr ähnliche durchschnittliche Partitionsgröße wie der gesalzene und partitionierte Fall, wie in Abb. 13 dargestellt. Die Produkte C und D bleiben nahe am 1-M-Zeilen-Ziel, während sich die Produkte A und B leicht über diesem Schwellenwert einpendeln.

Der Hauptunterschied besteht jedoch darin, wie diese Partitionen von Spark verteilt und genutzt werden. Wie in Abb. 14 dargestellt, erreicht Produkt D erneut eine hohe Anzahl von Dateipartitionen und bietet so genügend Parallelität, um die verfügbaren Kerne während der Inferenz zu sättigen.

Im Gegensatz zum partitionierten Gegenstück ermöglicht das Liquid Clustering Spark, das Dateilayout im Laufe der Zeit anzupassen und gleichzeitig vom Salt zu profitieren. Dies führt zu einer gleichmäßigeren Arbeitsverteilung auf die Ausführenden mit weniger extremen Ausreißern sowohl bei der Eingabegröße als auch bei der Aufgabendauer.
Aus der zusammenfassenden Statistik in Abb. 15 können wir erkennen, dass die meisten Aufgaben innerhalb eines engen Laufzeitfensters abgeschlossen werden und die maximale Aufgabendauer kürzer ist als im Salty- und Partitionierungsszenario. Dies deutet auf einen geringeren Zeitversatz und eine bessere Lastverteilung im gesamten Cluster hin.


Ein wichtiger Nebeneffekt besteht darin, dass Liquid Clustering die Datenlokalität für die gefilterten Spalten beibehält, ohne strenge Partitionsgrenzen zu erzwingen. Dadurch kann Spark weiterhin vom Überspringen von Daten profitieren, während der Salt dafür sorgt, dass kein einzelner Executor mit zig Millionen Zeilen überlastet wird.
Gesamt, salzig und flüssig erweist sich als das robusteste Setup: Es maximiert die Parallelität, minimiert Abweichungen und reduziert das Betriebsrisiko, wenn die Inferenz-Workloads wachsen oder sich Clusterkonfigurationen ändern.
Wichtige Erkenntnisse
- Die Skalierbarkeit von Inferenzen wird häufig durch das Datenlayout und nicht durch die Komplexität des Modells begrenzt. Dateipartitionen mit geringer Größe können dazu führen, dass Hunderte von Kernen im Leerlauf bleiben, während einige wenige Ausführer zig Millionen Zeilen verarbeiten.
- Die Partitionierung allein reicht für groß angelegte Schlussfolgerungen nicht aus. Ohne Kontrolle der Dateigröße können partitionierte Tabellen immer noch riesige Partitionen erzeugen, die zu langwierigen, verzerrten Aufgaben führen.
- Salting ist ein wirksames Werkzeug, um Parallelität freizuschalten. Durch die Einführung eines Salt-Schlüssels und die Durchsetzung einer Zeilenbeschränkung professional Partition wird die Anzahl ausführbarer Aufgaben drastisch erhöht und die Laufzeiten stabilisiert.
- Liquid Clustering ergänzt das Salzen, indem es den Versatz ohne starre Grenzen reduziert. Dadurch kann Spark das Dateilayout im Laufe der Zeit anpassen und das System so widerstandsfähiger machen, wenn die Datenmenge wächst.
