In diesem Tutorial zeigen wir, wie man ein einheitliches System erstellt Apache Beam Pipeline, die mit DirectRunner sowohl im Batch- als auch im Stream-ähnlichen Modus nahtlos funktioniert. Wir generieren synthetische, ereigniszeitbewusste Daten und wenden festes Fenstering mit Triggern und zulässiger Verspätung an, um zu demonstrieren, wie Apache Beam sowohl pünktliche als auch verspätete Ereignisse konsistent verarbeitet. Indem wir nur die Eingabequelle wechseln, halten wir die Kernaggregationslogik identisch, was uns hilft, klar zu verstehen, wie sich das Ereigniszeitmodell, die Fenster und die Bereiche von Beam verhalten, ohne auf eine externe Streaming-Infrastruktur angewiesen zu sein. Schauen Sie sich das an VOLLSTÄNDIGE CODES hier.
!pip -q set up -U "grpcio>=1.71.2" "grpcio-status>=1.71.2"
!pip -q set up -U apache-beam crcmod
import apache_beam as beam
from apache_beam.choices.pipeline_options import PipelineOptions, StandardOptions
from apache_beam.transforms.window import FixedWindows
from apache_beam.transforms.set off import AfterWatermark, AfterProcessingTime, AccumulationMode
from apache_beam.testing.test_stream import TestStream
import json
from datetime import datetime, timezone
Wir installieren die erforderlichen Abhängigkeiten und stellen die Versionskompatibilität sicher, damit Apache Beam. Wir importieren die wichtigsten Beam-APIs zusammen mit Fenstern, Triggern und TestStream-Dienstprogrammen, die später in der Pipeline benötigt werden. Wir integrieren auch Normal-Python-Module für die Zeitverwaltung und JSON-Formatierung. Schauen Sie sich das an VOLLSTÄNDIGE CODES hier.
MODE = "stream"
WINDOW_SIZE_SECS = 60
ALLOWED_LATENESS_SECS = 120
def make_event(user_id, event_type, quantity, event_time_epoch_s):
return {"user_id": user_id, "event_type": event_type, "quantity": float(quantity), "event_time": int(event_time_epoch_s)}
base = datetime.now(timezone.utc).exchange(microsecond=0)
t0 = int(base.timestamp())
BATCH_EVENTS = (
make_event("u1", "buy", 20, t0 + 5),
make_event("u1", "buy", 15, t0 + 20),
make_event("u2", "buy", 8, t0 + 35),
make_event("u1", "refund", -5, t0 + 62),
make_event("u2", "buy", 12, t0 + 70),
make_event("u3", "buy", 9, t0 + 75),
make_event("u2", "buy", 3, t0 + 50),
)
Wir definieren die globale Konfiguration, die Fenstergröße, Verspätung und Ausführungsmodus steuert. Wir erstellen synthetische Ereignisse mit expliziten Ereigniszeit-Zeitstempeln, sodass das Fensterverhalten deterministisch und leicht nachvollziehbar ist. Wir bereiten einen kleinen Datensatz vor, der absichtlich Ereignisse außerhalb der Reihenfolge und späte Ereignisse enthält, um die Ereigniszeitsemantik von Beam zu beobachten. Schauen Sie sich das an VOLLSTÄNDIGE CODES hier.
def format_joined_record(kv):
user_id, d = kv
return {
"user_id": user_id,
"depend": int(d("depend")(0)) if d("depend") else 0,
"sum_amount": float(d("sum_amount")(0)) if d("sum_amount") else 0.0,
}
class WindowedUserAgg(beam.PTransform):
def develop(self, pcoll):
stamped = pcoll | beam.Map(lambda e: beam.window.TimestampedValue(e, e("event_time")))
windowed = stamped | beam.WindowInto(
FixedWindows(WINDOW_SIZE_SECS),
allowed_lateness=ALLOWED_LATENESS_SECS,
set off=AfterWatermark(
early=AfterProcessingTime(10),
late=AfterProcessingTime(10),
),
accumulation_mode=AccumulationMode.ACCUMULATING,
)
keyed = windowed | beam.Map(lambda e: (e("user_id"), e("quantity")))
counts = keyed | beam.combiners.Depend.PerKey()
sums = keyed | beam.CombinePerKey(sum)
return (
{"depend": counts, "sum_amount": sums}
| beam.CoGroupByKey()
| beam.Map(format_joined_record)
)
Wir erstellen eine wiederverwendbare Beam-PTransform, die die gesamte Fensteraggregationslogik kapselt. Wir wenden feste Fenster, Auslöser und Akkumulationsregeln an, gruppieren dann Ereignisse nach Benutzern und berechnen Anzahlen und Summen. Wir halten diese Transformation unabhängig von der Datenquelle, sodass die gleiche Logik sowohl für Batch- als auch für Streaming-Eingaben gilt. Schauen Sie sich das an VOLLSTÄNDIGE CODES hier.
class AddWindowInfo(beam.DoFn):
def course of(self, aspect, window=beam.DoFn.WindowParam, pane_info=beam.DoFn.PaneInfoParam):
ws = float(window.begin)
we = float(window.finish)
yield {
**aspect,
"window_start_utc": datetime.fromtimestamp(ws, tz=timezone.utc).strftime("%H:%M:%S"),
"window_end_utc": datetime.fromtimestamp(we, tz=timezone.utc).strftime("%H:%M:%S"),
"pane_timing": str(pane_info.timing),
"pane_is_first": pane_info.is_first,
"pane_is_last": pane_info.is_last,
}
def build_test_stream():
return (
TestStream()
.advance_watermark_to(t0)
.add_elements((
beam.window.TimestampedValue(make_event("u1", "buy", 20, t0 + 5), t0 + 5),
beam.window.TimestampedValue(make_event("u1", "buy", 15, t0 + 20), t0 + 20),
beam.window.TimestampedValue(make_event("u2", "buy", 8, t0 + 35), t0 + 35),
))
.advance_processing_time(5)
.advance_watermark_to(t0 + 61)
.add_elements((
beam.window.TimestampedValue(make_event("u1", "refund", -5, t0 + 62), t0 + 62),
beam.window.TimestampedValue(make_event("u2", "buy", 12, t0 + 70), t0 + 70),
beam.window.TimestampedValue(make_event("u3", "buy", 9, t0 + 75), t0 + 75),
))
.advance_processing_time(5)
.add_elements((
beam.window.TimestampedValue(make_event("u2", "buy", 3, t0 + 50), t0 + 50),
))
.advance_watermark_to(t0 + 121)
.advance_watermark_to_infinity()
)
Wir reichern jeden aggregierten Datensatz mit Fenster- und Bereichsmetadaten an, damit wir klar erkennen können, wann und warum Ergebnisse ausgegeben werden. Aus Gründen der Übersichtlichkeit konvertieren wir die internen Zeitstempel von Beam in für Menschen lesbare UTC-Zeiten. Wir definieren außerdem einen TestStream, der das echte Streaming-Verhalten mithilfe von Wasserzeichen, Fortschritten bei der Verarbeitungszeit und späten Daten simuliert. Schauen Sie sich das an VOLLSTÄNDIGE CODES hier.
def run_batch():
with beam.Pipeline(choices=PipelineOptions(())) as p:
(
p
| beam.Create(BATCH_EVENTS)
| WindowedUserAgg()
| beam.ParDo(AddWindowInfo())
| beam.Map(json.dumps)
| beam.Map(print)
)
def run_stream():
opts = PipelineOptions(())
opts.view_as(StandardOptions).streaming = True
with beam.Pipeline(choices=opts) as p:
(
p
| build_test_stream()
| WindowedUserAgg()
| beam.ParDo(AddWindowInfo())
| beam.Map(json.dumps)
| beam.Map(print)
)
run_stream() if MODE == "stream" else run_batch()
Wir verbinden alles zu ausführbaren Batch- und Stream-ähnlichen Pipelines. Wir wechseln zwischen den Modi, indem wir ein einzelnes Flag ändern und gleichzeitig dieselbe Aggregationstransformation wiederverwenden. Wir führen die Pipeline aus und drucken die Fensterergebnisse direkt aus, sodass der Ausführungsfluss und die Ausgaben leicht zu überprüfen sind.
Zusammenfassend haben wir gezeigt, dass dieselbe Beam-Pipeline sowohl begrenzte Batch-Daten als auch unbegrenzte, streamähnliche Daten verarbeiten kann und dabei die identische Fenster- und Aggregationssemantik beibehält. Wir haben beobachtet, wie Wasserzeichen, Auslöser und Akkumulationsmodi Einfluss darauf haben, wann Ergebnisse ausgegeben werden und wie späte Daten zuvor berechnete Fenster aktualisieren. Außerdem haben wir uns auf die konzeptionellen Grundlagen des einheitlichen Modells von Beam konzentriert, das eine solide Grundlage für die spätere Skalierung desselben Designs auf echte Streaming-Läufer und Produktionsumgebungen bietet.
Schauen Sie sich das an VOLLSTÄNDIGE CODES hier. Sie können uns auch gerne weiter folgen Twitter und vergessen Sie nicht, bei uns mitzumachen 100.000+ ML SubReddit und Abonnieren Unser Publication. Warten! Bist du im Telegram? Jetzt können Sie uns auch per Telegram kontaktieren.
Schauen Sie sich unsere neueste Model von an ai2025.deveine auf das Jahr 2025 ausgerichtete Analyseplattform, die Modelleinführungen, Benchmarks und Ökosystemaktivitäten in einen strukturierten Datensatz umwandelt, den Sie filtern, vergleichen und exportieren können
Michal Sutter ist ein Knowledge-Science-Experte mit einem Grasp of Science in Knowledge Science von der Universität Padua. Mit einer soliden Grundlage in statistischer Analyse, maschinellem Lernen und Datentechnik ist Michal hervorragend darin, komplexe Datensätze in umsetzbare Erkenntnisse umzuwandeln.

