Warum die Daten von gestern scannen, wenn Sie die Daten von heute erhöhen können?

Bild vom Autor

SQL-Aggregationsfunktionen können rechenintensiv sein, wenn sie auf große Datensätze angewendet werden. Wenn Datensätze wachsen, wird die Neuberechnung von Metriken für den gesamten Datensatz immer wieder ineffizient. Um diese Herausforderung anzugehen, inkrementelle Aggregation wird häufig eingesetzt – eine Methode, bei der ein früherer Zustand beibehalten und mit neuen eingehenden Daten aktualisiert wird. Während dieser Ansatz für Aggregationen wie COUNT oder SUM unkompliziert ist, stellt sich die Frage: Wie kann er auf komplexere Metriken wie die Standardabweichung angewendet werden?

Standardabweichung ist eine statistische Metrik, die das Ausmaß der Variation oder Streuung der Werte einer Variablen im Verhältnis zu ihrem Mittelwert misst.
Es wird abgeleitet, indem die Quadratwurzel daraus gezogen wird Varianz.
Die Formel zur Berechnung der Varianz einer Stichprobe lautet wie folgt:

Stichprobenvarianzformel

Die Berechnung der Standardabweichung kann komplex sein, da dazu sowohl der Mittelwert als auch die Summe der quadrierten Differenzen aller Datenpunkte aktualisiert werden müssen. Mit algebraischer Manipulation können wir jedoch eine Formel für inkrementelle Berechnungen ableiten, die Aktualisierungen unter Verwendung eines vorhandenen Datensatzes und die nahtlose Integration neuer Daten ermöglicht. Dieser Ansatz vermeidet eine Neuberechnung jedes Mal, wenn neue Daten hinzugefügt werden, wodurch der Prozess wesentlich effizienter wird (eine detaillierte Ableitung ist verfügbar). auf meinem GitHub).

Abgeleitete Stichprobenvarianzformel

Die Formel gliederte sich grundsätzlich in drei Teile:
1. Die gewichtete Varianz der vorhandenen Menge
2. Die gewichtete Varianz des neuen Satzes
3. Die mittlere Differenzvarianz unter Berücksichtigung der Varianz zwischen Gruppen.

Diese Methode ermöglicht die inkrementelle Varianzberechnung durch Beibehaltung von COUNT (okay), AVG (µk) und VAR (Sk) des vorhandenen Satzes und deren Kombination mit COUNT (n), AVG (µn) und VAR (Sn) von das neue Set. Dadurch kann die aktualisierte Standardabweichung effizient berechnet werden, ohne den gesamten Datensatz erneut scannen zu müssen.

Nachdem wir uns nun mit der Mathematik hinter der inkrementellen Standardabweichung befasst haben (oder zumindest das Wesentliche verstanden haben), wollen wir uns mit der dbt-SQL-Implementierung befassen. Im folgenden Beispiel zeigen wir Ihnen, wie Sie ein inkrementelles Modell einrichten, um diese Statistiken für die Transaktionsdaten eines Benutzers zu berechnen und zu aktualisieren.

Betrachten Sie eine Transaktionstabelle mit dem Namen stg__transactionsdas Benutzertransaktionen (Ereignisse) verfolgt. Unser Ziel ist es, eine zeitstatische Tabelle zu erstellen, int__user_tx_statedas den „Standing“ von Benutzertransaktionen aggregiert. Die Spaltendetails für beide Tabellen finden Sie im Bild unten.

Bild vom Autor

Um den Prozess effizient zu gestalten, wollen wir die Statustabelle schrittweise aktualisieren, indem wir die neuen eingehenden Transaktionsdaten mit den vorhandenen aggregierten Daten (dh dem aktuellen Benutzerstatus) kombinieren. Dieser Ansatz ermöglicht es uns, den aktualisierten Benutzerstatus zu berechnen, ohne alle historischen Daten durchsuchen zu müssen.

Bild vom Autor

Der folgende Code setzt das Verständnis einiger dbt-Konzepte voraus. Wenn Sie damit nicht vertraut sind, können Sie den Code möglicherweise trotzdem verstehen, ich empfehle jedoch dringend, ihn durchzugehen dbts inkrementelle Anleitung oder lesen dieser tolle Beitrag.

Wir erstellen Schritt für Schritt ein vollständiges DBT-SQL mit dem Ziel, inkrementelle Aggregationen effizient zu berechnen, ohne die gesamte Tabelle wiederholt zu scannen. Der Prozess beginnt mit der Definition des Modells als inkrementell in dbt und der Verwendung unique_key um vorhandene Zeilen zu aktualisieren, anstatt neue einzufügen.

-- depends_on: {{ ref('stg__transactions') }}
{{ config(materialized='incremental', unique_key=('USER_ID'), incremental_strategy='merge') }}

Als nächstes rufen wir Datensätze aus dem ab stg__transactions Tisch.
Der is_incremental Block filtert Transaktionen mit Zeitstempeln, die nach der letzten Benutzeraktualisierung liegen, und schließt effektiv „nur neue Transaktionen“ ein.

WITH NEW_USER_TX_DATA AS (
SELECT
USER_ID,
TX_ID,
TX_TIMESTAMP,
TX_VALUE
FROM {{ ref('stg__transactions') }}
{% if is_incremental() %}
WHERE TX_TIMESTAMP > COALESCE((choose max(UPDATED_AT) from {{ this }}), 0::TIMESTAMP_NTZ)
{% endif %}
)

Nachdem wir die neuen Transaktionsdatensätze abgerufen haben, aggregieren wir sie nach Benutzern, sodass wir den Standing jedes Benutzers in den folgenden CTEs schrittweise aktualisieren können.

INCREMENTAL_USER_TX_DATA AS (
SELECT
USER_ID,
MAX(TX_TIMESTAMP) AS UPDATED_AT,
COUNT(TX_VALUE) AS INCREMENTAL_COUNT,
AVG(TX_VALUE) AS INCREMENTAL_AVG,
SUM(TX_VALUE) AS INCREMENTAL_SUM,
COALESCE(STDDEV(TX_VALUE), 0) AS INCREMENTAL_STDDEV,
FROM
NEW_USER_TX_DATA
GROUP BY
USER_ID
)

Jetzt kommen wir zum schweren Teil, bei dem wir die Aggregationen tatsächlich berechnen müssen. Wenn wir uns nicht im inkrementellen Modus befinden (dh wir haben noch keine „Standing“-Zeilen), wählen wir einfach die neuen Aggregationen aus

NEW_USER_CULMULATIVE_DATA AS (
SELECT
NEW_DATA.USER_ID,
{% if not is_incremental() %}
NEW_DATA.UPDATED_AT AS UPDATED_AT,
NEW_DATA.INCREMENTAL_COUNT AS COUNT_TX,
NEW_DATA.INCREMENTAL_AVG AS AVG_TX,
NEW_DATA.INCREMENTAL_SUM AS SUM_TX,
NEW_DATA.INCREMENTAL_STDDEV AS STDDEV_TX
{% else %}
...

Aber wenn wir uns im inkrementellen Modus befinden, müssen wir frühere Daten zusammenführen und sie mit den neuen Daten kombinieren, die wir im erstellt haben INCREMENTAL_USER_TX_DATA CTE basierend auf der oben beschriebenen Formel.
Wir beginnen mit der Berechnung der neuen SUM, COUNT und AVG:

  ...
{% else %}
COALESCE(EXISTING_USER_DATA.COUNT_TX, 0) AS _n, -- that is n
NEW_DATA.INCREMENTAL_COUNT AS _k, -- that is okay
COALESCE(EXISTING_USER_DATA.SUM_TX, 0) + NEW_DATA.INCREMENTAL_SUM AS NEW_SUM_TX, -- new sum
COALESCE(EXISTING_USER_DATA.COUNT_TX, 0) + NEW_DATA.INCREMENTAL_COUNT AS NEW_COUNT_TX, -- new depend
NEW_SUM_TX / NEW_COUNT_TX AS AVG_TX, -- new avg
...

Anschließend berechnen wir die drei Teile der Varianzformel

1. Die vorhandene gewichtete Varianz, die auf 0 gekürzt wird, wenn der vorherige Satz aus einem oder weniger Elementen besteht:

    ...
CASE
WHEN _n > 1 THEN (((_n - 1) / (NEW_COUNT_TX - 1)) * POWER(COALESCE(EXISTING_USER_DATA.STDDEV_TX, 0), 2))
ELSE 0
END AS EXISTING_WEIGHTED_VARIANCE, -- current weighted variance
...

2. Die inkrementell gewichtete Varianz auf die gleiche Weise:

    ...
CASE
WHEN _k > 1 THEN (((_k - 1) / (NEW_COUNT_TX - 1)) * POWER(NEW_DATA.INCREMENTAL_STDDEV, 2))
ELSE 0
END AS INCREMENTAL_WEIGHTED_VARIANCE, -- incremental weighted variance
...

3. Die mittlere Differenzvarianz, wie zuvor beschrieben, zusammen mit SQL-Be a part of-Begriffen, um vergangene Daten einzubeziehen.

    ...
POWER((COALESCE(EXISTING_USER_DATA.AVG_TX, 0) - NEW_DATA.INCREMENTAL_AVG), 2) AS MEAN_DIFF_SQUARED,
CASE
WHEN NEW_COUNT_TX = 1 THEN 0
ELSE (_n * _k) / (NEW_COUNT_TX * (NEW_COUNT_TX - 1))
END AS BETWEEN_GROUP_WEIGHT, -- between group weight
BETWEEN_GROUP_WEIGHT * MEAN_DIFF_SQUARED AS MEAN_DIFF_VARIANCE, -- imply diff variance
EXISTING_WEIGHTED_VARIANCE + INCREMENTAL_WEIGHTED_VARIANCE + MEAN_DIFF_VARIANCE AS VARIANCE_TX,
CASE
WHEN _n = 0 THEN NEW_DATA.INCREMENTAL_STDDEV -- no "previous" information
WHEN _k = 0 THEN EXISTING_USER_DATA.STDDEV_TX -- no "new" information
ELSE SQRT(VARIANCE_TX) -- stddev (which is the foundation of variance)
END AS STDDEV_TX,
NEW_DATA.UPDATED_AT AS UPDATED_AT,
NEW_SUM_TX AS SUM_TX,
NEW_COUNT_TX AS COUNT_TX
{% endif %}
FROM
INCREMENTAL_USER_TX_DATA new_data
{% if is_incremental() %}
LEFT JOIN
{{ this }} EXISTING_USER_DATA
ON
NEW_DATA.USER_ID = EXISTING_USER_DATA.USER_ID
{% endif %}
)

Schließlich wählen wir die Spalten der Tabelle aus und berücksichtigen dabei sowohl inkrementelle als auch nicht-inkrementelle Fälle:

SELECT
USER_ID,
UPDATED_AT,
COUNT_TX,
SUM_TX,
AVG_TX,
STDDEV_TX
FROM NEW_USER_CULMULATIVE_DATA

Durch die Kombination all dieser Schritte gelangen wir zum endgültigen SQL-Modell:

-- depends_on: {{ ref('stg__initial_table') }}
{{ config(materialized='incremental', unique_key=('USER_ID'), incremental_strategy='merge') }}
WITH NEW_USER_TX_DATA AS (
SELECT
USER_ID,
TX_ID,
TX_TIMESTAMP,
TX_VALUE
FROM {{ ref('stg__initial_table') }}
{% if is_incremental() %}
WHERE TX_TIMESTAMP > COALESCE((choose max(UPDATED_AT) from {{ this }}), 0::TIMESTAMP_NTZ)
{% endif %}
),
INCREMENTAL_USER_TX_DATA AS (
SELECT
USER_ID,
MAX(TX_TIMESTAMP) AS UPDATED_AT,
COUNT(TX_VALUE) AS INCREMENTAL_COUNT,
AVG(TX_VALUE) AS INCREMENTAL_AVG,
SUM(TX_VALUE) AS INCREMENTAL_SUM,
COALESCE(STDDEV(TX_VALUE), 0) AS INCREMENTAL_STDDEV,
FROM
NEW_USER_TX_DATA
GROUP BY
USER_ID
),

NEW_USER_CULMULATIVE_DATA AS (
SELECT
NEW_DATA.USER_ID,
{% if not is_incremental() %}
NEW_DATA.UPDATED_AT AS UPDATED_AT,
NEW_DATA.INCREMENTAL_COUNT AS COUNT_TX,
NEW_DATA.INCREMENTAL_AVG AS AVG_TX,
NEW_DATA.INCREMENTAL_SUM AS SUM_TX,
NEW_DATA.INCREMENTAL_STDDEV AS STDDEV_TX
{% else %}
COALESCE(EXISTING_USER_DATA.COUNT_TX, 0) AS _n, -- that is n
NEW_DATA.INCREMENTAL_COUNT AS _k, -- that is okay
COALESCE(EXISTING_USER_DATA.SUM_TX, 0) + NEW_DATA.INCREMENTAL_SUM AS NEW_SUM_TX, -- new sum
COALESCE(EXISTING_USER_DATA.COUNT_TX, 0) + NEW_DATA.INCREMENTAL_COUNT AS NEW_COUNT_TX, -- new depend
NEW_SUM_TX / NEW_COUNT_TX AS AVG_TX, -- new avg
CASE
WHEN _n > 1 THEN (((_n - 1) / (NEW_COUNT_TX - 1)) * POWER(COALESCE(EXISTING_USER_DATA.STDDEV_TX, 0), 2))
ELSE 0
END AS EXISTING_WEIGHTED_VARIANCE, -- current weighted variance
CASE
WHEN _k > 1 THEN (((_k - 1) / (NEW_COUNT_TX - 1)) * POWER(NEW_DATA.INCREMENTAL_STDDEV, 2))
ELSE 0
END AS INCREMENTAL_WEIGHTED_VARIANCE, -- incremental weighted variance
POWER((COALESCE(EXISTING_USER_DATA.AVG_TX, 0) - NEW_DATA.INCREMENTAL_AVG), 2) AS MEAN_DIFF_SQUARED,
CASE
WHEN NEW_COUNT_TX = 1 THEN 0
ELSE (_n * _k) / (NEW_COUNT_TX * (NEW_COUNT_TX - 1))
END AS BETWEEN_GROUP_WEIGHT, -- between group weight
BETWEEN_GROUP_WEIGHT * MEAN_DIFF_SQUARED AS MEAN_DIFF_VARIANCE,
EXISTING_WEIGHTED_VARIANCE + INCREMENTAL_WEIGHTED_VARIANCE + MEAN_DIFF_VARIANCE AS VARIANCE_TX,
CASE
WHEN _n = 0 THEN NEW_DATA.INCREMENTAL_STDDEV -- no "previous" information
WHEN _k = 0 THEN EXISTING_USER_DATA.STDDEV_TX -- no "new" information
ELSE SQRT(VARIANCE_TX) -- stddev (which is the foundation of variance)
END AS STDDEV_TX,
NEW_DATA.UPDATED_AT AS UPDATED_AT,
NEW_SUM_TX AS SUM_TX,
NEW_COUNT_TX AS COUNT_TX
{% endif %}
FROM
INCREMENTAL_USER_TX_DATA new_data
{% if is_incremental() %}
LEFT JOIN
{{ this }} EXISTING_USER_DATA
ON
NEW_DATA.USER_ID = EXISTING_USER_DATA.USER_ID
{% endif %}
)

SELECT
USER_ID,
UPDATED_AT,
COUNT_TX,
SUM_TX,
AVG_TX,
STDDEV_TX
FROM NEW_USER_CULMULATIVE_DATA

Während dieses Prozesses haben wir gezeigt, wie man sowohl nicht-inkrementelle als auch inkrementelle Modi effektiv handhabt und mathematische Techniken nutzt, um Metriken wie Varianz und Standardabweichung effizient zu aktualisieren. Durch die nahtlose Kombination historischer und neuer Daten haben wir einen optimierten, skalierbaren Ansatz für die Datenaggregation in Echtzeit erreicht.

In diesem Artikel haben wir die mathematische Technik zur inkrementellen Berechnung der Standardabweichung und deren Implementierung mithilfe der inkrementellen Modelle von dbt untersucht. Dieser Ansatz erweist sich als äußerst effizient und ermöglicht die Verarbeitung großer Datensätze, ohne dass der gesamte Datensatz erneut gescannt werden muss. In der Praxis führt dies zu schnelleren, skalierbareren Systemen, die Echtzeit-Updates effizient verarbeiten können. Wenn Sie darüber weiter diskutieren oder Ihre Gedanken mitteilen möchten, können Sie sich gerne an mich wenden – ich würde gerne Ihre Gedanken hören!

Von admin

Schreibe einen Kommentar

Deine E-Mail-Adresse wird nicht veröffentlicht. Erforderliche Felder sind mit * markiert