Als Entwickler suche ich immer nach Möglichkeiten, meine Anwendungen dynamischer und interaktiver zu gestalten. Benutzer erwarten heute Echtzeitfunktionen wie Stay-Benachrichtigungen, Streaming-Updates und Dashboards, die automatisch aktualisiert werden. Das Software, das für Webentwickler oft in den Sinn kommt, wenn man diese Artwork von Anwendungen berücksichtigt, ist WebSockets und es ist unglaublich leistungsfähig.

Es gibt jedoch Zeiten, in denen Websockets übertrieben werden kann und ihre volle Funktionalität oft nicht erforderlich ist. Sie bieten einen komplexen, bidirektionalen Kommunikationskanal, aber oft brauche ich nur noch, dass der Server Updates überschreitet Zu der Kunde. Für diese gemeinsamen Szenarien ist eine einfachere und elegantere Lösung, die direkt in moderne Webplattformen eingebaut ist, als Server-Despatched-Ereignisse (SSE).

In diesem Artikel werde ich Ihnen Server-Ereignisse vorstellen. Wir werden diskutieren, was sie sind, wie sie sich mit Websockets vergleichen und warum sie oft das perfekte Werkzeug für den Job sind. Dann werden wir in eine Reihe praktischer Beispiele eintauchen, wobei Python und das Fastapi-Framework verwendet werden, um Echtzeitanwendungen zu erstellen, die überraschend einfach und doch leistungsfähig sind.

Was sind Server-Despatched-Ereignisse (SSE)?

Server-Despatched-Ereignisse sind ein Internet-Technologie-Commonplace, mit dem ein Server Daten asynchron an einen Consumer drücken kann, sobald eine anfängliche Consumer-Verbindung hergestellt wurde. Es bietet einen Einweg-Server-zu-Consumer-Datenstrom über eine einzelne, langlebige HTTP-Verbindung. Der Consumer, in der Regel ein Webbrowser, zeichnet diesen Stream ab und kann auf die von ihm empfangenen Nachrichten reagieren.

Einige wichtige Aspekte von Server-Despatched-Ereignissen umfassen:

  • Einfaches Protokoll. SSE ist ein unkompliziertes textbasiertes Protokoll. Ereignisse sind nur Textbrocken, die über HTTP gesendet wurden, sodass sie mit Commonplace -Instruments wie Curl einfach zu debuggen können.
  • Commonplace HTTP. SSE arbeitet über reguläre HTTP/HTTPS. Dies bedeutet, dass es im Allgemeinen besser mit vorhandenen Firewalls und Proxy -Servern kompatibel ist.
  • Automatische Wiederverbindung. Dies ist ein Killer -Characteristic. Wenn die Verbindung zum Server verloren geht, versucht die EventSource -API des Browsers automatisch wieder zu verbinden. Sie erhalten diese Resilienz kostenlos, ohne einen zusätzlichen JavaScript -Code zu schreiben.
  • Einweg-Kommunikation. SSE gilt ausschließlich für Server-zu-Consumer-Daten. Wenn Sie eine Full-Duplex-Kommunikation von Consumer zu Server benötigen, sind Websockets die geeignete Wahl.
  • Native Browser -Unterstützung. Alle modernen Webbrowser unterstützen integrierte Unterstützung für Server-Despatched-Ereignisse (SSE) über die EventSource-Schnittstelle, wodurch die Notwendigkeit von Consumer-Seite-Bibliotheken beseitigt wird.

Warum SSE wichtig ist/gemeinsame Anwendungsfälle

Der Hauptvorteil von SSE ist seine Einfachheit. Für eine große Klasse von Echtzeitproblemen bietet es alle notwendigen Funktionen mit einem Bruchteil der Komplexität von Websockets sowohl auf dem Server als auch auf dem Consumer. Dies bedeutet eine schnellere Entwicklung, einfachere Wartung und weniger Dinge, die schief gehen können.

SSE passt perfekt zu jedem Szenario, in dem der Server Kommunikation initiieren und Aktualisierungen an den Consumer senden muss. Zum Beispiel …

  • Stay -Benachrichtigungssysteme. Wenn Sie Benachrichtigungen an einen Benutzer weitergeben, wenn eine neue Nachricht eintrifft oder ein wichtiges Ereignis erfolgt.
  • Echtzeit-Aktivitätsfuttermittel. Streaming -Updates auf den Aktivitätsfeed eines Benutzers, ähnlich einer Twitter- oder Fb -Zeitleiste.
  • Stay -Daten Dashboards. Senden Sie kontinuierliche Updates für Aktienkicker, Sportwerte oder Überwachungsmetriken an ein Stay -Dashboard.
  • Streaming -Protokollausgänge. Anzeigen der Stay-Protokollausgabe von einem langlebigen Hintergrundvorgang direkt im Browser des Benutzers.
  • Fortschritts -Updates. Zeigen Sie den Echtzeit-Fortschritt eines Datei-Uploads, eines Datenverarbeitungsauftrags oder einer anderen langjährigen Aufgabe, die vom Benutzer initiiert wurde.

Das ist genug Theorie; Lassen Sie uns sehen, wie einfach es ist, diese Ideen mit Python umzusetzen.

Einrichtung der Entwicklungsumgebung

Wir werden Fastapi verwenden, ein modernes und leistungsstarkes Python-Internet-Framework. Seine native Unterstützung für Asyncio und Streaming-Antworten macht es perfekt für die Implementierung von Serverereignissen. Sie benötigen auch den Uvicorn -ASGI -Server, um die Anwendung auszuführen.

Wie üblich werden wir eine Entwicklungsumgebung einrichten, um unsere Projekte getrennt zu halten. Ich schlage vor, Miniconda dafür zu verwenden, aber ich kann gerne jedes Werkzeug verwenden, an das Sie gewöhnt sind.

# Create and activate a brand new digital surroundings
(base) $ conda create -n sse-env python=3.13 -y
(base) $ activate sse-env

Installieren Sie nun die externen Bibliotheken, die wir benötigen.

# Set up FastAPI and Uvicorn
(sse-env) $ pip set up fastapi uvicorn

Das ist alles, was wir brauchen. Jetzt können wir mit dem Codieren beginnen.

Code Beispiel 1 – Das Python -Backend. Ein einfacher SSE -Endpunkt

Lassen Sie uns unseren ersten SSE -Endpunkt erstellen. Jede Sekunde wird eine Nachricht mit der aktuellen Zeit an den Consumer gesendet.

Erstellen Sie eine Datei namens App.py und geben Sie Folgendes ein.

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from fastapi.middleware.cors import CORSMiddleware
import time

app = FastAPI()

# Enable requests from http://localhost:8080 (the place index.html is served)
app.add_middleware(
    CORSMiddleware,
    allow_origins=("http://localhost:8080"),
    allow_methods=("GET"),
    allow_headers=("*"),
)

def event_stream():
    whereas True:
        yield f"knowledge: The time is {time.strftime('%X')}nn"
        time.sleep(1)

@app.get("/stream-time")
def stream():
    return StreamingResponse(event_stream(), media_type="textual content/event-stream")

Ich hoffe, Sie sind sich einig, dass dieser Code unkompliziert ist.

  1. Wir definieren eine event_stream () -Funktion. Diese Schleife wiederholt sich endlos und erzeugt jede Sekunde eine Schnur.
  2. Die erhaltene Zeichenfolge ist gemäß der SSE -Spezifikation formatiert: Sie muss mit beginnen Daten: und enden mit zwei Newlines ( n n).
  3. Unser Endpoint /Stream-Time gibt eine StreamingResponse zurück, die unseren Generator an ihn weitergibt und den Media_Type in Textual content /Occasion-Stream einstellt. Fastapi behandelt den Relaxation, hält die Verbindung offen und sendet jeweils ein Chunk an den Kunden.

Verwenden Sie den Commonplace, um den Code auszuführen, den Commonplace nicht Python App.py Befehl wie normalerweise. Tun Sie dies stattdessen.

(sse-env)$ uvicorn app:app --reload

INFO:     Will look ahead to modifications in these directories: ('/dwelling/tom')
INFO:     Uvicorn operating on http://127.0.0.1:8000 (Press CTRL+C to stop)
INFO:     Began reloader course of (4109269) utilizing WatchFiles
INFO:     Began server course of (4109271)
INFO:     Ready for software startup.
INFO:     Software startup full.

Geben Sie diese Adresse nun in Ihren Browser ein…

http://127.0.0.1:8000/stream-time

… Und du solltest so etwas sehen.

Bild des Autors

Auf dem Bildschirm sollte jede Sekunde einen aktualisierten Zeitsatz angezeigt werden.

Code-Beispiel 2. Echtzeit-Systemüberwachungs-Dashboard

In diesem Beispiel werden wir die CPU und den Speicherverbrauch unseres PC oder Laptops in Echtzeit überwachen.

Hier ist der App.py -Code, den Sie benötigen.

import asyncio
import json
import psutil
from fastapi import FastAPI, Request
from fastapi.responses import HTMLResponse, StreamingResponse
from fastapi.middleware.cors import CORSMiddleware
import datetime

# Outline app FIRST
app = FastAPI()

# Then add middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=("http://localhost:8080"),
    allow_methods=("GET"),
    allow_headers=("*"),
)

async def system_stats_generator(request: Request):
    whereas True:
        if await request.is_disconnected():
            print("Consumer disconnected.")
            break

        cpu_usage = psutil.cpu_percent()
        memory_info = psutil.virtual_memory()

        stats = {
            "cpu_percent": cpu_usage,
            "memory_percent": memory_info.p.c,
            "memory_used_mb": spherical(memory_info.used / (1024 * 1024), 2),
            "memory_total_mb": spherical(memory_info.complete / (1024 * 1024), 2)
        }

        yield f"knowledge: {json.dumps(stats)}nn"
        await asyncio.sleep(1)

@app.get("/system-stats")
async def stream_system_stats(request: Request):
    return StreamingResponse(system_stats_generator(request), media_type="textual content/event-stream")

@app.get("/", response_class=HTMLResponse)
async def read_root():
    with open("index.html") as f:
        return HTMLResponse(content material=f.learn())

Dieser Code erstellt einen Echtzeit-Systemüberwachungsdienst mithilfe des FastAPI-Internet-Frameworks. Es erstellt einen Webserver, der die CPU des Host -Geräts kontinuierlich verfolgt und überträgt, und Speicherverbrauch an einen angeschlossenen Internet -Consumer.

Zunächst wird eine Fastapi-Anwendung preliminary und konfiguriert CORSware (Cross-Origin-Ressourcenfreigabe). Diese Middleware ist eine Sicherheitsfunktion, die hier ausdrücklich konfiguriert ist, damit eine Webseite von http: // localhost: 8080 an diesem Server Anforderungen an diesen Server gestellt wird.

Der Kern der Anwendung ist die asynchrone Funktion von System_Stats_generator. Diese Funktion wird in einer unendlichen Schleife ausgeführt und verwendet in jeder Iteration die PSUTIL -Bibliothek, um den aktuellen Prozentsatz der CPU -Nutzung und den detaillierten Speicherstatistik zu holen, einschließlich des verwendeten Prozentsatzes, der verwendeten Megabyte und der gesamten Megabyte. Es verpackt diese Informationen in ein Wörterbuch, konvertiert es in eine JSON-Zeichenfolge und ergibt es dann im spezifischen Format „Textual content/Ereignisstrom“ (Daten:… n n).

Die Verwendung von asyncio.sleep (1) führt eine Ein Sekunden-Pause zwischen Aktualisierungen ein und verhindert, dass die Schleife übermäßige Ressourcen verbraucht. Die Funktion ist auch so konzipiert, dass ein Consumer getrennt hat und anmutig keine Daten für diesen Consumer nicht mehr gesendet hat.

Das Skript definiert zwei Webendpunkte. Der Endpunkt @App.get („/system-stats“) erstellt eine StreamingResponse, die das System_Stats_Generator initiiert. Wenn ein Consumer eine Get -Anfrage an diese URL stellt, stellt er eine anhaltende Verbindung her, und der Server beginnt jede Sekunde mit dem Streamieren der Systemstatistiken. Der zweite Endpunkt @App.get („/“) dient einer statischen HTML -Datei mit dem Namen index.html als Hauptseite. Diese HTML-Datei würde normalerweise den JavaScript-Code enthalten, der zur Verbindung zum /System-Stats-Stream erforderlich ist, und die eingehenden Leistungsdaten auf der Webseite dynamisch anzuzeigen.

Hier finden Sie den aktualisierten Entrance-Finish-Code (index.html).

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>System Monitor</title>
    <fashion>
        physique { font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto, "Helvetica Neue", Arial, sans-serif; background-color: #f0f2f5; shade: #333; show: flex; justify-content: heart; align-items: heart; top: 100vh; margin: 0; }
        .dashboard { background-color: white; padding: 2rem; border-radius: 8px; box-shadow: 0 4px 12px rgba(0,0,0,0.1); width: 400px; text-align: heart; }
        h1 { margin-top: 0; }
        .metric { margin-bottom: 1.5rem; }
        .metric-label { font-weight: daring; font-size: 1.2rem; margin-bottom: 0.5rem; }
        .progress-bar { width: 100%; background-color: #e9ecef; border-radius: 4px; overflow: hidden; }
        .progress-bar-fill { top: 20px; background-color: #007bff; width: 0%; transition: width 0.5s ease-in-out; }
        .metric-value { margin-top: 0.5rem; font-size: 1rem; shade: #555; }
    </fashion>
</head>
<physique>
    <div class="dashboard">
        <h1>Actual-Time Server Monitor</h1>
        <div class="metric">
            <div class="metric-label">CPU Utilization</div>
            <div class="progress-bar">
                <div id="cpu-progress" class="progress-bar-fill"></div>
            </div>
            <div id="cpu-value" class="metric-value">0%</div>
        </div>
        <div class="metric">
            <div class="metric-label">Reminiscence Utilization</div>
            <div class="progress-bar">
                <div id="mem-progress" class="progress-bar-fill" fashion="background-color: #28a745;"></div>
            </div>
            <div id="mem-value" class="metric-value">0% (0 / 0 MB)</div>
        </div>
    </div>
    <script>
        const cpuProgress = doc.getElementById('cpu-progress');
        const cpuValue = doc.getElementById('cpu-value');
        const memProgress = doc.getElementById('mem-progress');
        const memValue = doc.getElementById('mem-value');

        const eventSource = new EventSource('http://localhost:8000/system-stats');

        eventSource.onmessage = perform(occasion) {
            // Parse the JSON knowledge from the server
            const stats = JSON.parse(occasion.knowledge);

            // Replace CPU parts
            cpuProgress.fashion.width = stats.cpu_percent + '%';
            cpuValue.textContent = stats.cpu_percent.toFixed(2) + '%';

            // Replace Reminiscence parts
            memProgress.fashion.width = stats.memory_percent + '%';
            memValue.textContent = `${stats.memory_percent.toFixed(2)}% (${stats.memory_used_mb} / ${stats.memory_total_mb} MB)`;
        };

        eventSource.onerror = perform(err) {
            console.error("EventSource failed:", err);
            cpuValue.textContent = "Connection Error";
            memValue.textContent = "Connection Error";
        };
    </script>
</physique>
</html>

Führen Sie die App mit Uvicorn aus, wie wir es in Beispiel 1 getan haben. Geben Sie in einem separaten Befehlsfenster Folgendes ein, um einen Python -Server zu starten.

python3 -m http.server 8080

Öffnen Sie nun die URL http: // localhost: 8080/index.html in Ihrem Browser, und Sie werden die Ausgabe sehen, die kontinuierlich aktualisieren sollte.

Bild des Autors

Code Beispiel 3 – Hintergrundaufgabe Fortschrittsleiste

In diesem Beispiel initiieren wir eine Aufgabe und zeigen einen Balken an, der den Fortschritt der Aufgabe angibt.

Aktualisiert app.py

import asyncio
import json
import psutil
from fastapi import FastAPI, Request
from fastapi.responses import HTMLResponse, StreamingResponse
from fastapi.middleware.cors import CORSMiddleware
import datetime

# Outline app FIRST
app = FastAPI()

# Then add middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=("http://localhost:8080"),
    allow_methods=("GET"),
    allow_headers=("*"),
)

async def training_progress_generator(request: Request):
    """
    Simulates a long-running AI coaching process and streams progress.
    """
    total_epochs = 10
    steps_per_epoch = 100

    for epoch in vary(1, total_epochs + 1):
        # Simulate some preliminary processing for the epoch
        await asyncio.sleep(0.5)

        for step in vary(1, steps_per_epoch + 1):
            # Test if consumer has disconnected
            if await request.is_disconnected():
                print("Consumer disconnected, stopping coaching process.")
                return

            # Simulate work
            await asyncio.sleep(0.02)

            progress = (step / steps_per_epoch) * 100
            simulated_loss = (1 / epoch) * (1 - (step / steps_per_epoch)) + 0.1

            progress_data = {
                "epoch": epoch,
                "total_epochs": total_epochs,
                "progress_percent": spherical(progress, 2),
                "loss": spherical(simulated_loss, 4)
            }

            # Ship a named occasion "progress"
            yield f"occasion: progressndata: {json.dumps(progress_data)}nn"

    # Ship a last "full" occasion
    yield f"occasion: completendata: Coaching full!nn"

@app.get("/stream-training")
async def stream_training(request: Request):
    """SSE endpoint to stream coaching progress."""
    return StreamingResponse(training_progress_generator(request), media_type="textual content/event-stream")

@app.get("/", response_class=HTMLResponse)
async def read_root():
    """Serves the primary HTML web page."""
    with open("index.html") as f:
        return HTMLResponse(content material=f.learn())

Der aktualisierte Index.html -Code ist dies.

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Stay Activity Progress</title>
    <fashion>
        physique { font-family: sans-serif; text-align: heart; padding-top: 50px; }
        .progress-container { width: 80%; max-width: 700px; margin: auto; }
        #start-btn { font-size: 1.2rem; padding: 10px 20px; cursor: pointer; }
        .progress-bar-outer { border: 1px strong #ccc; padding: 3px; border-radius: 5px; margin-top: 20px; }
        .progress-bar-inner { background-color: #4CAF50; width: 0%; top: 30px; text-align: heart; line-height: 30px; shade: white; border-radius: 3px; transition: width 0.1s linear; }
        #status-text { margin-top: 10px; font-size: 1rem; shade: #555; top: 2em; }
    </fashion>
</head>
<physique>
    <h1>AI Mannequin Coaching Simulation</h1>
    <div class="progress-container">
        <button id="start-btn">Begin Coaching</button>
        <div id="progress-bar-outer" class="progress-bar-outer" fashion="show: none;">
            <div id="progress-bar-inner" class="progress-bar-inner">0%</div>
        </div>
        <div id="status-text"></div>
    </div>
    <script>
        const startBtn = doc.getElementById('start-btn');
        const progressBarOuter = doc.getElementById('progress-bar-outer');
        const progressBarInner = doc.getElementById('progress-bar-inner');
        const statusText = doc.getElementById('status-text');

        let eventSource;

        startBtn.addEventListener('click on', () => {
            startBtn.disabled = true;
            progressBarOuter.fashion.show = 'block';
            statusText.textContent = 'Initializing...';

            // Shut any current connection
            if (eventSource) {
                eventSource.shut();
            }

            // Begin a brand new SSE connection
            eventSource = new EventSource('http://localhost:8000/stream-training');

            eventSource.addEventListener('progress', (e) => {
                const knowledge = JSON.parse(e.knowledge);
                const p.c = knowledge.progress_percent;
                progressBarInner.fashion.width = p.c + '%';
                progressBarInner.textContent = p.c.toFixed(0) + '%';
                statusText.textContent = `Epoch: ${knowledge.epoch}/${knowledge.total_epochs} | Loss: ${knowledge.loss}`;
            });

            eventSource.addEventListener('full', (e) => {
                statusText.textContent = e.knowledge;
                progressBarInner.fashion.backgroundColor = '#007bff';
                eventSource.shut(); // Shut the connection
                startBtn.disabled = false;
            });

            eventSource.onerror = () => {
                statusText.textContent = 'Connection error. Please attempt once more.';
                eventSource.shut();
                startBtn.disabled = false;
            };
        });
    </script>
</physique>
</html>

Stoppen Sie Ihre vorhandenen Uvicorn- und Python -Serverprozesse, wenn sie noch ausgeführt werden, und starten Sie dann beide neu.

Wenn Sie nun die Seite index.html öffnen, sollten Sie einen Bildschirm mit einer Schaltfläche sehen. Wenn Sie die Style drücken, wird eine Dummy -Aufgabe gestartet, und eine sich bewegende Leiste zeigt den Aufgabenfortschritt an.

Bild des Autors

Code Beispiel 4-Ein Echtzeit-Finanz-Aktien-Ticker

Für unser letztes Beispiel erstellen wir einen simulierten Aktien -Ticker. Der Server generiert zufällige Preisaktualisierungen für mehrere Aktiensymbole und sendet sie mit benannten Ereignissen, wobei der Ereignisname dem Aktiensymbol entspricht (z. B. Ereignis: AAPL, Ereignis: Googl). Dies ist ein leistungsstarkes Muster zum Multiplexen verschiedener Datenarten über eine einzelne SSE -Verbindung, sodass der Consumer jeden Stream unabhängig verarbeiten kann.

Hier ist der aktualisierte App.py -Code, den Sie benötigen.

import asyncio
import json
import random
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
from fastapi.middleware.cors import CORSMiddleware

# Step 1: Create app first
app = FastAPI()

# Step 2: Add CORS to permit requests from http://localhost:8080
app.add_middleware(
    CORSMiddleware,
    allow_origins=("http://localhost:8080"),
    allow_methods=("GET"),
    allow_headers=("*"),
)

# Step 3: Simulated inventory costs
STOCKS = {
    "AAPL": 150.00,
    "GOOGL": 2800.00,
    "MSFT": 300.00,
}

# Step 4: Generator to simulate updates
async def stock_ticker_generator(request: Request):
    whereas True:
        if await request.is_disconnected():
            break

        image = random.selection(checklist(STOCKS.keys()))
        change = random.uniform(-0.5, 0.5)
        STOCKS(image) = max(0, STOCKS(image) + change)

        replace = {
            "image": image,
            "worth": spherical(STOCKS(image), 2),
            "change": spherical(change, 2)
        }

        # Ship named occasions so the browser can hear by image
        yield f"occasion: {image}ndata: {json.dumps(replace)}nn"
        await asyncio.sleep(random.uniform(0.5, 1.5))

# Step 5: SSE endpoint
@app.get("/stream-stocks")
async def stream_stocks(request: Request):
    return StreamingResponse(stock_ticker_generator(request), media_type="textual content/event-stream")

Und der aktualisierte index.html

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Stay Inventory Ticker</title>
    <fashion>
        physique { font-family: sans-serif; show: flex; justify-content: heart; padding-top: 50px; }
        .ticker { show: flex; hole: 20px; }
        .inventory { border: 1px strong #ccc; padding: 15px; border-radius: 5px; width: 150px; text-align: heart; }
        .image { font-weight: daring; font-size: 1.5rem; }
        .worth { font-size: 2rem; margin: 10px 0; }
        .change { font-size: 1rem; }
        .up { shade: inexperienced; }
        .down { shade: pink; }
    </fashion>
</head>
<physique>
    <div class="ticker">
        <div id="AAPL" class="inventory">
            <div class="image">AAPL</div>
            <div class="worth">--.--</div>
            <div class="change">-.--</div>
        </div>
        <div id="GOOGL" class="inventory">
            <div class="image">GOOGL</div>
            <div class="worth">--.--</div>
            <div class="change">-.--</div>
        </div>
        <div id="MSFT" class="inventory">
            <div class="image">MSFT</div>
            <div class="worth">--.--</div>
            <div class="change">-.--</div>
        </div>
    </div>

    <script>
        const eventSource = new EventSource('http://localhost:8000/stream-stocks');

        perform updateStock(knowledge) {
            const inventory = doc.getElementById(knowledge.image);
            if (!inventory) return;

            const priceEl = inventory.querySelector('.worth');
            const changeEl = inventory.querySelector('.change');

            priceEl.textContent = knowledge.worth.toFixed(2);
            changeEl.textContent = knowledge.change.toFixed(2);

            const className = knowledge.change >= 0 ? 'up' : 'down';
            priceEl.className = 'worth ' + className;
            changeEl.className = 'change ' + className;
        }

        ('AAPL', 'GOOGL', 'MSFT').forEach(image => {
            eventSource.addEventListener(image, e => {
                const stockData = JSON.parse(e.knowledge);
                updateStock(stockData);
            });
        });

        eventSource.onerror = perform(err) {
            console.error("EventSource failed:", err);
        };
    </script>
</physique>
</html>

Stoppen Sie dann die Uvicorn- und Python -Prozesse wie zuvor neu. Wenn Sie diesmal http: // localhost: 8080/index.html in Ihrem Browser öffnen, sollten Sie einen solchen Bildschirm sehen, der die Dummy -Preise der drei Aktien kontinuierlich aktualisiert.

Bild des Autors

Zusammenfassung

In diesem Artikel habe ich gezeigt, dass für viele Anwendungsfälle in Echtzeit Server-Despatched-Ereignisse eine einfachere Different zu WebSockets bieten. Wir haben die Kernprinzipien von SSE, einschließlich des Einweg-Kommunikationsmodells und der automatischen Wiederverbindung Funktionen, erörtert. In einer Reihe praktischer Beispiele mit Python und Fastapi haben wir gesehen, wie einfach es ist, leistungsstarke Echtzeitfunktionen aufzubauen. Wir haben abgedeckt:

  • Ein einfacher Python-Again-Finish- und SSE-Endpunkt
  • Ein Stay -Systemüberwachungs -Dashboard -Streaming Streaming JSON -Daten.
  • Eine Echtzeit-Fortschrittsleiste für eine simulierte langlebige Hintergrundaufgabe.
  • Ein Multiplex -Aktien -Ticker, der benannte Ereignisse verwendet, um verschiedene Datenströme zu verwalten.

Wenn Sie das nächste Mal Daten von Ihrem Server an einen Consumer weitergeben müssen, empfehle ich Ihnen, eine Pause einzulegen, bevor Sie nach WebSockets greifen. Fragen Sie sich, ob Sie wirklich eine bidirektionale Kommunikation benötigen. Wenn die Antwort Nein lautet, sind Server-Despatched-Ereignisse wahrscheinlich die einfachere, schnellere und robustere Lösung, nach der Sie gesucht haben.

Von admin

Schreibe einen Kommentar

Deine E-Mail-Adresse wird nicht veröffentlicht. Erforderliche Felder sind mit * markiert