Nutzung von Sigma-Regeln zur Anomalieerkennung in Cybersicherheitsprotokollen: Eine Studie zur Leistungsoptimierung

Foto von Ed Vazquez auf Unsplash

Eine der Rollen des Kanadisches Zentrum für Cybersicherheit (CCCS) besteht darin, Anomalien zu erkennen und schnellstmöglich Gegenmaßnahmen zu ergreifen.

Während wir unsere Sigma-Regelerkennungen in die Produktion einführten, machten wir eine interessante Beobachtung in unserer Spark-Streaming-Anwendung. Das Ausführen einer einzelnen großen SQL-Anweisung mit 1000 Sigma-Erkennungsregeln struggle langsamer als das Ausführen von fünf separaten Abfragen, von denen jede 200 Sigma-Regeln anwendete. Das struggle überraschend, da das Ausführen von fünf Abfragen Spark dazu zwingt, die Quelldaten fünfmal statt einmal zu lesen. Weitere Einzelheiten finden Sie in unserer Artikelserie:

Angesichts der enormen Menge an Telemetriedaten und Erkennungsregeln, die wir ausführen müssen, führt jede Leistungssteigerung zu erheblichen Kosteneinsparungen. Daher haben wir uns entschlossen, diese eigenartige Beobachtung zu untersuchen, um sie zu erklären und möglicherweise zusätzliche Möglichkeiten zur Leistungssteigerung zu entdecken. Dabei haben wir einiges gelernt und wollten es mit der breiteren Neighborhood teilen.

Einführung

Wir hatten das Gefühl, dass wir bei der Codegenerierung von Spark an eine Grenze stoßen würden. Daher ist ein wenig Hintergrundwissen zu diesem Thema erforderlich. Im Jahr 2014 führte Spark die Codegenerierung ein, um Ausdrücke der Type auszuwerten (id > 1 and id > 2) and (id < 1000 or (id + id) = 12)Dieser Artikel von Databricks erklärt es sehr intestine: Spannende Leistungsverbesserungen für Spark SQL in Sicht

Zwei Jahre später führte Spark die Entire-Stage-Codegenerierung ein. Diese Optimierung führt mehrere Operatoren zu einer einzigen Java-Funktion zusammen. Wie die Expression-Codegenerierung eliminiert die Entire-Stage-Codegenerierung virtuelle Funktionsaufrufe und nutzt CPU-Register für Zwischendaten. Sie wird jedoch nicht auf der Expression-Ebene, sondern auf der Operatorebene angewendet. Operatoren sind die Knoten in einem Ausführungsplan. Weitere Informationen finden Sie unter Apache Spark als Compiler: Eine Milliarde Zeilen professional Sekunde auf einem Laptop computer zusammenführen

Um diese Artikel zusammenzufassen, erstellen wir den Plan für diese einfache Abfrage:

clarify codegen
choose
id,
(id > 1 and id > 2) and (id < 1000 or (id + id) = 12) as take a look at
from
vary(0, 10000, 1, 32)

In dieser einfachen Abfrage verwenden wir zwei Operatoren: Vary zum Generieren von Zeilen und Choose zum Ausführen einer Projektion. Wir sehen diese Operatoren im physischen Plan der Abfrage. Beachten Sie das Sternchen (codegen id : 1)neben den Knoten und den zugehörigen

|== Bodily Plan ==
* Mission (2)
+- * Vary (1)

(1) Vary (codegen id : 1)
Output (1): (id#36167L)
Arguments: Vary (0, 10000, step=1, splits=Some(32))

(2) Mission (codegen id : 1)
Output (2): (id#36167L, (((id#36167L > 1) AND (id#36167L > 2)) AND ((id#36167L < 1000) OR ((id#36167L + id#36167L) = 12))) AS take a look at#36161)
Enter (1): (id#36167L)

. Dies zeigt an, dass diese beiden Operatoren mithilfe der Entire-Stage-Codegenerierung zu einer einzigen Java-Funktion zusammengeführt wurden.

Generated code:
/* 001 */ public Object generate(Object() references) {
/* 002 */ return new GeneratedIteratorForCodegenStage1(references);
/* 003 */ }
/* 004 */
/* 005 */ // codegenStageId=1
/* 006 */ ultimate class GeneratedIteratorForCodegenStage1 extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 007 */ non-public Object() references;
/* 008 */ non-public scala.assortment.Iterator() inputs;
/* 009 */ non-public boolean range_initRange_0;
/* 010 */ non-public lengthy range_nextIndex_0;
/* 011 */ non-public TaskContext range_taskContext_0;
/* 012 */ non-public InputMetrics range_inputMetrics_0;
/* 013 */ non-public lengthy range_batchEnd_0;
/* 014 */ non-public lengthy range_numElementsTodo_0;
/* 015 */ non-public org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter() range_mutableStateArray_0 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(3);
/* 016 */
/* 017 */ public GeneratedIteratorForCodegenStage1(Object() references) {
/* 018 */ this.references = references;
/* 019 */ }
/* 020 */
/* 021 */ public void init(int index, scala.assortment.Iterator() inputs) {
/* 022 */ partitionIndex = index;
/* 023 */ this.inputs = inputs;
/* 024 */
/* 025 */ range_taskContext_0 = TaskContext.get();
/* 026 */ range_inputMetrics_0 = range_taskContext_0.taskMetrics().inputMetrics();
/* 027 */ range_mutableStateArray_0(0) = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0);
/* 028 */ range_mutableStateArray_0(1) = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0);
/* 029 */ range_mutableStateArray_0(2) = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(2, 0);
/* 030 */
/* 031 */ }
/* 032 */
/* 033 */ non-public void project_doConsume_0(lengthy project_expr_0_0) throws java.io.IOException {
/* 034 */ // frequent sub-expressions
/* 035 */
/* 036 */ boolean project_value_4 = false;
/* 037 */ project_value_4 = project_expr_0_0 > 1L;
/* 038 */ boolean project_value_3 = false;
/* 039 */
/* 040 */ if (project_value_4) {
/* 041 */ boolean project_value_7 = false;
/* 042 */ project_value_7 = project_expr_0_0 > 2L;
/* 043 */ project_value_3 = project_value_7;
/* 044 */ }
/* 045 */ boolean project_value_2 = false;
/* 046 */
/* 047 */ if (project_value_3) {
/* 048 */ boolean project_value_11 = false;
/* 049 */ project_value_11 = project_expr_0_0 < 1000L;
/* 050 */ boolean project_value_10 = true;
/* 051 */
/* 052 */ if (!project_value_11) {
/* 053 */ lengthy project_value_15 = -1L;
/* 054 */
/* 055 */ project_value_15 = project_expr_0_0 + project_expr_0_0;
/* 056 */
/* 057 */ boolean project_value_14 = false;
/* 058 */ project_value_14 = project_value_15 == 12L;
/* 059 */ project_value_10 = project_value_14;
/* 060 */ }
/* 061 */ project_value_2 = project_value_10;
/* 062 */ }
/* 063 */ range_mutableStateArray_0(2).reset();
/* 064 */
/* 065 */ range_mutableStateArray_0(2).write(0, project_expr_0_0);
/* 066 */
/* 067 */ range_mutableStateArray_0(2).write(1, project_value_2);
/* 068 */ append((range_mutableStateArray_0(2).getRow()));
/* 069 */
/* 070 */ }
/* 071 */
/* 072 */ non-public void initRange(int idx) {
/* 073 */ java.math.BigInteger index = java.math.BigInteger.valueOf(idx);
/* 074 */ java.math.BigInteger numSlice = java.math.BigInteger.valueOf(32L);
/* 075 */ java.math.BigInteger numElement = java.math.BigInteger.valueOf(10000L);
/* 076 */ java.math.BigInteger step = java.math.BigInteger.valueOf(1L);
/* 077 */ java.math.BigInteger begin = java.math.BigInteger.valueOf(0L);
/* 078 */ lengthy partitionEnd;
/* 079 */
/* 080 */ java.math.BigInteger st = index.multiply(numElement).divide(numSlice).multiply(step).add(begin);
/* 081 */ if (st.compareTo(java.math.BigInteger.valueOf(Lengthy.MAX_VALUE)) > 0) {
/* 082 */ range_nextIndex_0 = Lengthy.MAX_VALUE;
/* 083 */ } else if (st.compareTo(java.math.BigInteger.valueOf(Lengthy.MIN_VALUE)) < 0) {
/* 084 */ range_nextIndex_0 = Lengthy.MIN_VALUE;
/* 085 */ } else {
/* 086 */ range_nextIndex_0 = st.longValue();
/* 087 */ }
/* 088 */ range_batchEnd_0 = range_nextIndex_0;
/* 089 */
/* 090 */ java.math.BigInteger finish = index.add(java.math.BigInteger.ONE).multiply(numElement).divide(numSlice)
/* 091 */ .multiply(step).add(begin);
/* 092 */ if (finish.compareTo(java.math.BigInteger.valueOf(Lengthy.MAX_VALUE)) > 0) {
/* 093 */ partitionEnd = Lengthy.MAX_VALUE;
/* 094 */ } else if (finish.compareTo(java.math.BigInteger.valueOf(Lengthy.MIN_VALUE)) < 0) {
/* 095 */ partitionEnd = Lengthy.MIN_VALUE;
/* 096 */ } else {
/* 097 */ partitionEnd = finish.longValue();
/* 098 */ }
/* 099 */
/* 100 */ java.math.BigInteger startToEnd = java.math.BigInteger.valueOf(partitionEnd).subtract(
/* 101 */ java.math.BigInteger.valueOf(range_nextIndex_0));
/* 102 */ range_numElementsTodo_0 = startToEnd.divide(step).longValue();
/* 103 */ if (range_numElementsTodo_0 < 0) {
/* 104 */ range_numElementsTodo_0 = 0;
/* 105 */ } else if (startToEnd.the rest(step).compareTo(java.math.BigInteger.valueOf(0L)) != 0) {
/* 106 */ range_numElementsTodo_0++;
/* 107 */ }
/* 108 */ }
/* 109 */
/* 110 */ protected void processNext() throws java.io.IOException {
/* 111 */ // initialize Vary
/* 112 */ if (!range_initRange_0) {
/* 113 */ range_initRange_0 = true;
/* 114 */ initRange(partitionIndex);
/* 115 */ }
/* 116 */
/* 117 */ whereas (true) {
/* 118 */ if (range_nextIndex_0 == range_batchEnd_0) {
/* 119 */ lengthy range_nextBatchTodo_0;
/* 120 */ if (range_numElementsTodo_0 > 1000L) {
/* 121 */ range_nextBatchTodo_0 = 1000L;
/* 122 */ range_numElementsTodo_0 -= 1000L;
/* 123 */ } else {
/* 124 */ range_nextBatchTodo_0 = range_numElementsTodo_0;
/* 125 */ range_numElementsTodo_0 = 0;
/* 126 */ if (range_nextBatchTodo_0 == 0) break;
/* 127 */ }
/* 128 */ range_batchEnd_0 += range_nextBatchTodo_0 * 1L;
/* 129 */ }
/* 130 */
/* 131 */ int range_localEnd_0 = (int)((range_batchEnd_0 - range_nextIndex_0) / 1L);
/* 132 */ for (int range_localIdx_0 = 0; range_localIdx_0 < range_localEnd_0; range_localIdx_0++) {
/* 133 */ lengthy range_value_0 = ((lengthy)range_localIdx_0 * 1L) + range_nextIndex_0;
/* 134 */
/* 135 */ project_doConsume_0(range_value_0);
/* 136 */
/* 137 */ if (shouldStop()) {
/* 138 */ range_nextIndex_0 = range_value_0 + 1L;
/* 139 */ ((org.apache.spark.sql.execution.metric.SQLMetric) references(0) /* numOutputRows */).add(range_localIdx_0 + 1);
/* 140 */ range_inputMetrics_0.incRecordsRead(range_localIdx_0 + 1);
/* 141 */ return;
/* 142 */ }
/* 143 */
/* 144 */ }
/* 145 */ range_nextIndex_0 = range_batchEnd_0;
/* 146 */ ((org.apache.spark.sql.execution.metric.SQLMetric) references(0) /* numOutputRows */).add(range_localEnd_0);
/* 147 */ range_inputMetrics_0.incRecordsRead(range_localEnd_0);
/* 148 */ range_taskContext_0.killTaskIfInterrupted();
/* 149 */ }
/* 150 */ }
/* 151 */
/* 152 */ }

Der generierte Code zeigt deutlich, wie die beiden Operatoren zusammengeführt werden. project_doConsume_0 Der (id > 1 and id > 2) and (id < 1000 or (id + id) = 12)Funktion enthält den auszuwertenden Code

. Beachten Sie, wie dieser Code generiert wird, um diesen spezifischen Ausdruck auszuwerten. Dies ist eine Illustration der Generierung von Ausdruckscode. processNext Die ganze Klasse ist ein Operator mit einem project_doConsume_0Methode. Dieser generierte Operator führt sowohl die Projektions- als auch die Bereichsoperation aus. Innerhalb der while-Schleife in Zeile 117 sehen wir den Code zum Erzeugen von Zeilen und einen spezifischen Aufruf (keine virtuelle Funktion) zum

Dies veranschaulicht, was die Codegenerierung für ganze Phasen bewirkt.

Aufschlüsselung der Leistung Imagepath Nachdem wir nun ein besseres Verständnis für die Codegenerierung von Spark haben, versuchen wir zu erklären, warum die Aufteilung einer Abfrage mit 1000 Sigma-Regeln in kleinere eine bessere Leistung bringt. Betrachten wir eine SQL-Anweisung, die zwei Sigma-Regeln auswertet. Diese Regeln sind unkompliziert: Regel1 gleicht Ereignisse mit einem Imagepath endet mit ’schtask.exe‘, und Rule2 entspricht einem


choose /* #3 */
Imagepath,
CommandLine,
PID,
map_keys(map_filter(results_map, (okay,v) -> v = TRUE)) as matching_rules
from (
choose /* #2 */
*,
map('rule1', rule1, 'rule2', rule2) as results_map
from (
choose /* #1 */
*,
(lower_Imagepath like '%schtasks.exe') as rule1,
(lower_Imagepath like 'd:%') as rule2
from (
choose
decrease(PID) as lower_PID,
decrease(CommandLine) as lower_CommandLine,
decrease(Imagepath) as lower_Imagepath,
*
from (
choose
uuid() as PID,
uuid() as CommandLine,
uuid() as Imagepath,
id
from
vary(0, 10000, 1, 32)
)
)
)
)

beginnend mit „d:“. results_mapDie Auswahl mit der Bezeichnung #1 führt die Erkennung durch und speichert die Ergebnisse in neuen Spalten mit den Namen rule1 und rule2. Auswahl #2 gruppiert diese Spalten unter einer einzigen map_filter und schließlich wandelt choose #3 die Karte in ein Array von Matching-Regeln um. Es verwendet map_keys um nur die Einträge der Regeln zu behalten, die tatsächlich übereinstimmen, und dann

wird verwendet, um die Map-Einträge in eine Liste passender Regelnamen umzuwandeln.


== Bodily Plan ==
Mission (4)
+- * Mission (3)
+- * Mission (2)
+- * Vary (1)

...

(4) Mission
Output (4): (Imagepath#2, CommandLine#1, PID#0, map_keys(map_filter(map(rule1, EndsWith(lower_Imagepath#5, schtasks.exe), rule2, StartsWith(lower_Imagepath#5, d:)), lambdafunction(lambda v#12, lambda okay#11, lambda v#12, false))) AS matching_rules#9)
Enter (4): (lower_Imagepath#5, PID#0, CommandLine#1, Imagepath#2)

Drucken wir den Spark-Ausführungsplan für diese Abfrage aus:

Beachten Sie, dass Knoten Projekt (4) kein Code generiert. Knoten 4 hat eine Lambda-Funktion. Verhindert diese die Codegenerierung für die gesamte Section? Mehr dazu später.

+--------------------+--------------------+--------------------+--------------+
| Imagepath| CommandLine| PID| matched_rule|
+--------------------+--------------------+--------------------+--------------+
|09401675-dc09-4d0...|6b8759ee-b55a-486...|44dbd1ec-b4e0-488...| rule1|
|e2b4a0fd-7b88-417...|46dd084d-f5b0-4d7...|60111cf8-069e-4b8...| rule1|
|1843ee7a-a908-400...|d1105cec-05ef-4ee...|6046509a-191d-432...| rule2|
+--------------------+--------------------+--------------------+--------------+

Diese Abfrage ist nicht ganz das, was wir wollen. Wir möchten eine Ereignistabelle mit einer Spalte erstellen, die die Regel angibt, die erfüllt wurde. Etwa so: matching_rules Das ist ganz einfach. Wir müssen nur die


choose
Imagepath,
CommandLine,
PID,
matched_rule
from (
choose
*,
explode(matching_rules) as matched_rule
from (
/* unique assertion */
)
)

Spalte.

== Bodily Plan ==
* Mission (7)
+- * Generate (6)
+- Mission (5)
+- * Mission (4)
+- Filter (3)
+- * Mission (2)
+- * Vary (1)

...

(3) Filter
Enter (3): (PID#34, CommandLine#35, Imagepath#36)
Situation : (measurement(map_keys(map_filter(map(rule1, EndsWith(decrease(Imagepath#36),
schtasks.exe), rule2, StartsWith(decrease(Imagepath#36), d:)),
lambdafunction(lambda v#47, lambda okay#46, lambda v#47, false))), true) > 0)
...

(6) Generate (codegen id : 3)
Enter (4): (PID#34, CommandLine#35, Imagepath#36, matching_rules#43)
Arguments: explode(matching_rules#43), (PID#34, CommandLine#35, Imagepath#36), false, (matched_rule#48)

(7) Mission (codegen id : 3)
Output (4): (Imagepath#36, CommandLine#35, PID#34, matched_rule#48)
Enter (4): (PID#34, CommandLine#35, Imagepath#36, matched_rule#48)

Dadurch entstehen zwei zusätzliche Operatoren: Generieren (6) und Projizieren (7). Es gibt jedoch auch einen neuen Filter (3). explode Der explode Funktion generiert Zeilen für jedes Factor im Array. Wenn das Array leer ist,

erzeugt keine Zeilen und filtert effektiv Zeilen heraus, bei denen das Array leer ist. org.apache.spark.sql.catalyst.optimizer.InferFiltersFromGenerateSpark verfügt über eine Optimierungsregel, die die Explode-Funktion erkennt und diese zusätzliche Bedingung erzeugt. Der Filter ist ein Versuch von Spark, die Verarbeitung so weit wie möglich abzukürzen. Der Quellcode für diese Regel, genannt

erklärt es so:

Leitet Filter aus Generate ab, sodass Zeilen, die durch dieses Generate entfernt worden wären, früher entfernt werden können – vor Verknüpfungen und in Datenquellen. Weitere Einzelheiten zur Optimierung von Ausführungsplänen durch Spark finden Sie im Artikel von David Vrba.

Abfragepläne in Spark 3.0 meistern

Eine weitere Frage stellt sich: Profitieren wir von diesem zusätzlichen Filter? Beachten Sie, dass dieser zusätzliche Filter vermutlich aufgrund der Lambda-Funktion auch kein Code ist, der für die gesamte Section generiert wird. Versuchen wir, dieselbe Abfrage auszudrücken, jedoch ohne Verwendung einer Lambda-Funktion. map_filterStattdessen können wir die Regelergebnisse in eine Karte einfügen, diese Karte auflösen und die Zeilen herausfiltern. Dadurch wird die Notwendigkeit umgangen,


choose
Imagepath,
CommandLine,
PID,
matched_rule
from (
choose
*
from (
choose
*,
explode(results_map) as (matched_rule, matched_result)
from (
/* unique assertion */
)
)
the place
matched_result = TRUE
)

. matched_rule Die Choose #3-Operation zerlegt die Karte in zwei neue Spalten. matched_result enthält den Schlüssel, der den Regelnamen darstellt, während die matched_resultSpalte enthält das Ergebnis des Erkennungstests. Um die Zeilen zu filtern, behalten wir einfach nur diejenigen mit einem positiven

.


== Bodily Plan ==
* Mission (8)
+- * Filter (7)
+- * Generate (6)
+- * Mission (5)
+- * Mission (4)
+- * Filter (3)
+- * Mission (2)
+- * Vary (1)

Der physische Plan zeigt, dass es sich bei allen Knoten um Vollphasencode handelt, der in einer einzigen Java-Funktion generiert wird, was vielversprechend ist. map_filter Führen wir einige Assessments durch, um die Leistung der Abfrage zu vergleichen mit

und diejenige, die „explodieren“ und dann „filtern“ verwendet.

Wir haben diese Assessments auf einer Maschine mit 4 CPUs ausgeführt. Wir haben 1 Million Zeilen mit jeweils 100 Regeln generiert und jede Regel hat 5 Ausdrücke ausgewertet. Diese Assessments wurden 5 Mal ausgeführt.

  • Im Durchschnitt
  • map_filter dauerte 42,6 Sekunden

explode_then_filter dauerte 51,2 Sekunden

Daher ist map_filter etwas schneller, obwohl es keine Codegenerierung für die gesamte Section verwendet.

Brought on by: org.codehaus.commons.compiler.InternalCompilerException: Code grows past 64 KB

In unserer Produktionsabfrage führen wir jedoch viel mehr Sigma-Regeln aus – insgesamt 1000 Regeln. Dazu gehören 29 Regex-Ausdrücke, 529 „Gleich“-Ausdrücke, 115 „beginnt mit“-Ausdrücke, 2352 „endet mit“-Ausdrücke und 5838 „enthält“-Ausdrücke. Lassen Sie uns unsere Abfrage noch einmal testen, diesmal jedoch mit einer leichten Erhöhung der Anzahl der Ausdrücke, indem wir 7 statt 5 professional Regel verwenden. Dabei ist in unseren Protokollen ein Fehler aufgetreten: spark.sql.codegen.maxFields Wir haben versucht, spark.sql.codegen.hugeMethodLimitUnd

aber grundsätzlich haben Java-Klassen eine Funktionsgrößenbeschränkung von 64 KB. Darüber hinaus beschränkt sich der JVM-JIT-Compiler auf das Kompilieren von Funktionen, die kleiner als 8 KB sind.

Die Abfrage läuft jedoch weiterhin einwandfrei, da Spark für bestimmte Teile des Plans auf das Volcano-Ausführungsmodell zurückgreift. WholeStageCodeGen ist schließlich nur eine Optimierung.

  • Wenn Sie den gleichen Take a look at wie zuvor ausführen, jedoch mit 7 Ausdrücken professional Regel statt 5, ist explode_then_filter viel schneller als map_filter.
  • map_filter dauerte 68,3 Sekunden

explode_then_filter dauerte 15,8 Sekunden org.apache.spark.sql.catalyst.optimizer.InferFiltersFromGenerate Eine Erhöhung der Anzahl der Ausdrücke führt dazu, dass Teile des explode_then_filter nicht mehr als Code für die gesamte Section generiert werden. Insbesondere der Filteroperator, der durch die Regel eingeführt wurde

spark.sql("SET spark.sql.optimizer.excludedRules=org.apache.spark.sql.catalyst.optimizer.InferFiltersFromGenerate")

ist zu groß, um in die Codegenerierung der gesamten Section integriert zu werden. Sehen wir uns an, was passiert, wenn wir die Regel InferFiltersFromGenerate ausschließen:


== Bodily Plan ==
* Mission (6)
+- * Generate (5)
+- Mission (4)
+- * Mission (3)
+- * Mission (2)
+- * Vary (1)

== Bodily Plan ==
* Mission (7)
+- * Filter (6)
+- * Generate (5)
+- * Mission (4)
+- * Mission (3)
+- * Mission (2)
+- * Vary (1)

Wie erwartet verfügt der physische Plan beider Abfragen nicht mehr über einen zusätzlichen Filteroperator.

  • Das Entfernen der Regel hatte tatsächlich erhebliche Auswirkungen auf die Leistung:
  • map_filter dauerte 22,49 Sekunden

explode_then_filter dauerte 4,08 Sekunden

Beide Abfragen profitierten stark vom Entfernen der Regel. Angesichts der verbesserten Leistung beschlossen wir, die Anzahl der Sigma-Regeln auf 500 und die Komplexität auf 21 Ausdrücke zu erhöhen:

  • Ergebnisse:
  • map_filter dauerte 195,0 Sekunden

explode_then_filter dauerte 25,09 Sekunden

Trotz der erhöhten Komplexität liefern beide Abfragen noch immer eine ziemlich gute Leistung, wobei explode_then_filter map_filter deutlich übertrifft.

Es ist interessant, die verschiedenen Aspekte der Codegenerierung zu untersuchen, die Spark verwendet. Auch wenn wir derzeit möglicherweise nicht von der Codegenerierung für die gesamte Section profitieren, können wir dennoch von der Ausdrucksgenerierung profitieren. spark.sql.codegen.methodSplitThreshold Die Generierung von Ausdrücken unterliegt nicht den gleichen Einschränkungen wie die Generierung von Code in der gesamten Section. Sehr große Ausdrucksbäume können in kleinere aufgeteilt werden, und Sparks

steuert, wie diese aufgeteilt werden. Obwohl wir mit dieser Eigenschaft experimentiert haben, konnten wir keine signifikanten Verbesserungen feststellen. Die Standardeinstellung scheint zufriedenstellend. spark.sql.codegen.factoryModeSpark bietet eine Debug-Eigenschaft namens spark.sql.codegen.factoryMode=NO_CODEGENdas auf FALLBACK, CODEGEN_ONLY oder NO_CODEGEN gesetzt werden kann. Wir können die Generierung von Ausdruckscodes deaktivieren, indem wir

was zu einer drastischen Leistungsverschlechterung führt:

  • Mit 500 Regeln und 21 Ausdrücken:
  • map_filter dauerte 1581 Sekunden

explode_then_filter dauerte 122,31 Sekunden.

Auch wenn nicht alle Operatoren an der Codegenerierung für die gesamte Section teilnehmen, stellen wir dennoch erhebliche Vorteile bei der Codegenerierung für Ausdrücke fest.

Die Ergebnisse

Bild vom Autor

Mit unserem Bestfall von 25,1 Sekunden zum Auswerten von 10.500 Ausdrücken in 1 Million Zeilen erreichen wir eine sehr respektable Charge von 104 Millionen Ausdrücken professional Sekunde professional CPU. map_filter Das Fazit dieser Studie ist, dass wir bei der Auswertung einer großen Anzahl von Ausdrücken davon profitieren, unsere Abfragen, die org.apache.spark.sql.catalyst.optimizer.InferFiltersFromGenerate zu denen, die einen Explodieren-und-Filtern-Ansatz verwenden. Darüber hinaus

Regel scheint in unserem Anwendungsfall nicht vorteilhaft zu sein, daher sollten wir diese Regel aus unseren Abfragen ausschließen.

Erklärt es unsere ersten Beobachtungen?

Die Umsetzung dieser Erkenntnisse in unseren Produktionsjobs brachte erhebliche Vorteile. Aber auch nach diesen Optimierungen brachte die Aufteilung der großen Abfrage in mehrere kleinere weiterhin Vorteile. Bei weiterer Untersuchung stellten wir fest, dass dies nicht nur auf die Codegenerierung zurückzuführen struggle, sondern eine einfachere Erklärung hatte.

Beim Spark-Streaming wird ein Mikro-Batch vollständig ausgeführt und anschließend der Fortschritt überprüft, bevor ein neuer Mikro-Batch gestartet wird.

Während jedes Mikrobatches muss Spark alle seine Aufgaben erledigen, normalerweise 200. Allerdings sind nicht alle Aufgaben gleich. Spark verwendet eine Spherical-Robin-Strategie, um Zeilen auf diese Aufgaben zu verteilen. Daher können einige Aufgaben gelegentlich Ereignisse mit großen Attributen enthalten, z. B. eine sehr große Befehlszeile, wodurch bestimmte Aufgaben schnell abgeschlossen werden, während andere viel länger dauern. Hier beispielsweise die Verteilung der Ausführungszeit einer Mikrobatch-Aufgabe. Die mittlere Aufgabenzeit beträgt 14 Sekunden. Der schlimmste Nachzügler benötigt jedoch 1,6 Minuten!

Bild vom Autor

Dies wirft tatsächlich Licht auf ein anderes Phänomen. Die Tatsache, dass Spark während jedes Mikro-Batches auf ein paar Nachzügler-Aufgaben wartet, lässt viele CPUs ungenutzt, was erklärt, warum die Aufteilung der großen Abfrage in mehrere kleinere Abfragen zu einer schnelleren Gesamtleistung führte.

Dieses Bild zeigt 5 kleinere Abfragen, die parallel in derselben Spark-Anwendung ausgeführt werden. Batch3 wartet auf eine Nachzügleraufgabe, während die anderen Abfragen weiter ausgeführt werden.

Bild vom Autor

Während dieser Wartezeiten kann Spark die inaktiven CPUs für die Bearbeitung anderer Abfragen nutzen und so die Ressourcennutzung und den Gesamtdurchsatz maximieren.

Abschluss

In diesem Artikel haben wir einen Überblick über den Codegenerierungsprozess von Spark gegeben und erläutert, dass integrierte Optimierungen nicht immer zu den gewünschten Ergebnissen führen. Darüber hinaus haben wir gezeigt, dass die Umgestaltung einer Abfrage von Lambda-Funktionen zu einer Abfrage mit einer einfachen Explode-Operation zu Leistungsverbesserungen führte. Schließlich kamen wir zu dem Schluss, dass das Aufteilen einer großen Anweisung zwar zu Leistungssteigerungen führte, der Hauptfaktor für diese Verbesserungen jedoch die Ausführungstopologie und nicht die Abfragen selbst struggle.

Von admin

Schreibe einen Kommentar

Deine E-Mail-Adresse wird nicht veröffentlicht. Erforderliche Felder sind mit * markiert