Fühlen Sie, wenn alles scheint Um intestine zu arbeiten, bis Sie unter die Motorhaube schauen und feststellen, dass Ihr System 10 × mehr Kraftstoff verbrennt, als es nötig ist?

Wir hatten ein Consumer -Skript, das Anfragen abfeuerte, um unsere Eingabeaufforderungen zu validieren, die mit asynchronen Python -Code erstellt und in einem Jupyter -Notizbuch reibungslos ausgeführt wurden. Sauber, einfach und schnell. Wir haben es regelmäßig durchgeführt, um unsere Modelle zu testen und Bewertungsdaten zu sammeln. Keine roten Fahnen. Keine Warnungen.

Aber unter dieser polierten Oberfläche lief etwas leise schief.

Wir sahen keine Misserfolge. Wir bekamen keine Ausnahmen. Wir haben nicht einmal Langsamkeit bemerkt. Aber unser System leistete viel mehr Arbeit als nötig, und wir haben es nicht bemerkt.

In diesem Beitrag gehen wir durch, wie wir das Downside entdeckt haben, was es verursacht hat und wie a Einfache strukturelle Veränderung in unserem asynchronisierten Code reduzierte den LLM -Verkehr und die Kosten um 90%, um 90%. mit praktisch ohne Geschwindigkeits- oder Funktionsverlust.

Nun, faire Warnung, Das Lesen dieses Beitrags wird Ihre LLM -Kosten nicht auf magische Weise um 90% senken. Aber das Mitnehmen hier ist breiter: Kleine, übersehene Entwurfsentscheidungen, manchmal nur wenige Codezeilen, können zu massiven Ineffizienzen führen. Wenn Sie beabsichtigt, wie Ihr Code ausgeführt wird, können Sie auf lange Sicht Zeit, Geld und Frustration sparen.

Die Repair selbst könnte zuerst Nische fühlen. Es beinhaltet die Feinheiten von Pythons asynchronem Verhalten, wie Aufgaben geplant und versandt werden. Wenn Sie mit Python vertraut sind und async/awaitSie werden mehr aus den Code -Beispielen herausholen, aber selbst wenn dies nicht der Fall ist, gibt es immer noch viel zu genießen. Denn in der wahren Geschichte hier geht es nicht nur um LLMs oder Python, sondern um Verantwortungsbewusst, effizientes Engineering.

Lass uns eingraben.

Das Setup

Um die Validierung zu automatisieren, verwenden wir einen vordefinierten Datensatz und lösen unser System über ein Consumer -Skript aus. Die Validierung konzentriert sich auf a Kleine Untergruppe des Datensatzes, so wird der Clientcode erst nach Erhalt einer bestimmten Anzahl von Antworten aufhört.

Hier ist eine vereinfachte Model unseres Kunden in Python:

import asyncio
from aiohttp import ClientSession
from tqdm.asyncio import tqdm_asyncio

URL = "http://localhost:8000/instance"
NUMBER_OF_REQUESTS = 100
STOP_AFTER = 10

async def fetch(session: ClientSession, url: str) -> bool:
    async with session.get(url) as response:
        physique = await response.json()
        return physique("worth")

async def most important():
    outcomes = ()

    async with ClientSession() as session:
        duties = (fetch(session, URL) for _ in vary(NUMBER_OF_REQUESTS))

        for future in tqdm_asyncio.as_completed(duties, whole=NUMBER_OF_REQUESTS, desc="Fetching"):
            response = await future
            if response is True:
                outcomes.append(response)
                if len(outcomes) >= STOP_AFTER:
                    print(f"n✅ Stopped after receiving {STOP_AFTER} true responses.")
                    break

asyncio.run(most important())

Dieses Skript liest Anfragen von einem Datensatz, feuert sie gleichzeitig aus und stoppt, sobald wir genug gesammelt haben true Antworten für unsere Bewertung. In der Produktion ist die Logik komplexer und basiert auf der Vielfalt der Antworten, die wir benötigen. Aber die Struktur ist die gleiche.

Verwenden wir einen Dummy -Fastapi -Server, um das echte Verhalten zu simulieren:

import asyncio
import fastapi
import uvicorn
import random

app = fastapi.FastAPI()

@app.get("/instance")
async def instance():
    sleeping_time = random.uniform(1, 2)
    await asyncio.sleep(sleeping_time)
    return {"worth": random.alternative((True, False))}

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)

Lassen Sie uns nun diesen Dummy -Server anfeuern und den Consumer ausführen. Sie werden so etwas aus dem Consumer -Terminal sehen:

Die Fortschrittsleiste hörte auf, nachdem sie 10 Antworten erhalten hatte

Können Sie das Downside erkennen?

Foto von Keiteu Ko An Unplash

Hübsch! Schnell, sauber und… warte alles wie erwartet?

An der Oberfläche, es scheint Wie der Kunde das Richtige tut: Anfragen zu senden, 10 zu erhalten true Antworten, dann anhalten.

Aber ist es?

Fügen wir unserem Server ein paar Druckanweisungen hinzu, um zu sehen, was er tatsächlich unter der Haube tut:

import asyncio
import fastapi
import uvicorn
import random

app = fastapi.FastAPI()

@app.get("/instance")
async def instance():
    print("Bought a request")
    sleeping_time = random.uniform(1, 2)
    print(f"Sleeping for {sleeping_time:.2f} seconds")
    await asyncio.sleep(sleeping_time)
    worth = random.alternative((True, False))
    print(f"Returning worth: {worth}")
    return {"worth": worth}

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0", port=8000)

Jetzt alles erneut ausführen.

Sie werden anfangen, solche Protokolle zu sehen:

Bought a request
Sleeping for 1.11 seconds
Bought a request
Sleeping for 1.29 seconds
Bought a request
Sleeping for 1.98 seconds
...
Returning worth: True
Returning worth: False
Returning worth: False
...

Schauen Sie sich die Serverprotokolle genauer an. Sie werden etwas Unerwartetes bemerken: Anstatt nur 14 Anfragen zu verarbeiten, wie wir in der Fortschrittsleiste sehen, übernimmt der Server alle 100. Obwohl der Kunde nach Erhalt 10 anhält true Antworten, es sendet immer noch jede Anfrage im Voraus. Infolgedessen muss der Server alle verarbeiten.

Es ist ein einfacher Fehler zu verpassenVor allem, weil alles aus der Sicht des Kunden korrekt zu funktionieren scheint: Antworten kommen schnell ein, die Fortschrittsbalken sind Fortschritte und das Skript verlässt früh. Aber hinter den Kulissen werden alle 100 Anfragen sofort gesendet, unabhängig davon, wann wir uns entscheiden, nicht mehr zuzuhören. Dies führt zu 10 × mehr Verkehr als nötig, steigern Sie die Kosten, die Erhöhung der Final und die Risikobeschränkungen.

Die Hauptfrage lautet additionally: Warum passiert das und wie können wir sicherstellen, dass wir nur die Anfragen senden, die wir tatsächlich benötigen? Die Antwort stellte sich als eine kleine, aber mächtige Veränderung heraus.

Die Wurzel des Issues liegt darin, wie die Aufgaben geplant sind. In unserem ursprünglichen Code erstellen wir eine Liste von 100 Aufgaben auf einmal:

duties = (fetch(session, URL) for _ in vary(NUMBER_OF_REQUESTS))

for future in tqdm_asyncio.as_completed(duties, whole=NUMBER_OF_REQUESTS, desc="Fetching"):
    response = await future

Wenn Sie eine Liste von Coroutinen an übergeben as_completedPython wickelt sofort jede Coroutine in a Job und plant es auf der Occasion -Schleife. Dies geschieht, bevor Sie über den Schleifenkörper iteriert werden. Sobald eine Coroutine zu einer wird JobDie Ereignisschleife wird sofort im Hintergrund ausgeführt.

as_completed selbst kontrolliert die Parallelität nicht, es einfach wartet darauf, dass die Aufgaben beendet werden und sie nacheinander in der Reihenfolge, die sie erledigen. Betrachten Sie es als Iterator über abgeschlossene Futures, nicht als Verkehrscontroller. Dies bedeutet, dass zu dem Zeitpunkt, an dem Sie beginnen, zu schleifen, Alle 100 Anfragen sind bereits im Gange. Nach 10 ausbrechen true Die Ergebnisse hinderen Sie daran, den Relaxation zu verarbeiten, aber es hindert sie nicht daran, gesendet zu werden.

Um dies zu beheben, haben wir vorgestellt A Semaphor Genauigkeit einschränken. Das Semaphor fügt ein leichtes Schloss im Inneren hinzu fetch so dass nur eine feste Anzahl von Anforderungen gleichzeitig beginnen kann. Der Relaxation bleibt im Pause und wartet auf einen Slot. Sobald wir unseren Stoppzustand erreicht haben, erwerben die Pautenaufgaben nie das Schloss, so dass sie ihre Anfragen nie senden.

Hier ist die angepasste Model:

import asyncio
from aiohttp import ClientSession
from tqdm.asyncio import tqdm_asyncio

URL = "http://localhost:8000/instance"
NUMBER_OF_REQUESTS = 100
STOP_AFTER = 10

async def fetch(session: ClientSession, url: str, semaphore: asyncio.Semaphore) -> str:
    async with semaphore:
        async with session.get(url) as response:
            physique = await response.json()
            return physique("worth")

async def most important():
    outcomes = ()
    semaphore = asyncio.Semaphore(int(STOP_AFTER * 1.5))

    async with ClientSession() as session:
        duties = (fetch(session, URL, semaphore) for _ in vary(NUMBER_OF_REQUESTS))

        for future in tqdm_asyncio.as_completed(duties, whole=NUMBER_OF_REQUESTS, desc="Fetching"):
            response = await future
            if response:
                outcomes.append(response)
                if len(outcomes) >= STOP_AFTER:
                    print(f"n✅ Stopped after receiving {STOP_AFTER} true responses.")
                    break

asyncio.run(most important())

Mit dieser Änderung definieren wir immer noch 100 Anfragen im Voraus, aber aber Nur eine kleine Gruppe darf gleichzeitig laufen15 in diesem Beispiel. Wenn wir unseren Stoppzustand frühzeitig erreichen, stoppt die Occasion -Schleife, bevor wir weitere Anfragen gestartet haben. Dadurch reagiert das Verhalten und reduziert gleichzeitig unnötige Anrufe.

Jetzt werden die Serverprotokolle nur etwa 20 angezeigt "Bought a request/Returning response" Einträge. Auf der Kundenseite erscheint die Fortschrittsleiste mit dem Unique identisch.

Die Fortschrittsleiste hörte auf, nachdem sie 10 Antworten erhalten hatte

Mit dieser Änderung sahen wir sofortige Auswirkungen: 90% Reduzierung des Anforderungsvolumens und LLM -Kostenohne merkwürdige Verschlechterung der Kundenerfahrung. Es verbesserte auch den Durchsatz im gesamten Staff, reduzierte die Warteschlange und beseitigte die Charge-Restrict-Probleme unserer LLM-Anbieter.

Diese kleine strukturelle Anpassung machte unsere Validierungspipeline dramatisch effizienter, ohne dem Code viel Komplexität zu verleihen. Es ist eine gute Erinnerung daran, dass sich der Steuerfluss in asynchronen Systemen nicht immer so verhalten, wie Sie es annehmen, es sei denn, Sie sind explizit darüber, wie Aufgaben geplant sind und wann sie ausgeführt werden sollten.

Bonus Perception: Schließen der Ereignisschleife

Wenn wir den ursprünglichen Clientcode ohne ausgeführt hätten asyncio.runWir hätten das Downside vielleicht früher bemerkt.
Wenn wir beispielsweise so ein manuelles Occasion -Loop -Administration wie folgt verwendet hatten:

loop = asyncio.get_event_loop()
loop.run_until_complete(most important())
loop.shut()

Python hätte Warnungen gedruckt wie:

Die Aufgabe wurde zerstört, aber sie steht noch aus!

Diese Warnungen erscheinen, wenn das Programm beendet wird, während noch unvollendete asynchronen Aufgaben in der Schleife geplant sind. Wenn wir einen Bildschirm voller Warnungen gesehen hätten, hätte er wahrscheinlich viel früher eine rote Fahne ausgelöst.

Warum haben wir diese Warnung bei der Verwendung nicht gesehen? asyncio.run()?

Weil asyncio.run() Kümmert sich um die Reinigung hinter den Kulissen. Es wird nicht nur Ihre Coroutine und Ihren Ausgang ausgeführt, sondern auch alle verbleibenden Aufgaben abgebaut, bis sie fertig ist, und erst dann die Ereignisschleife abschließt. Dieses integrierte Sicherheitsnetz verhindert, dass diese „anhängigen Aufgaben“ Warnungen angezeigt werden, auch wenn Ihr Code leise mehr Aufgaben eröffnet hat, als er nötig struggle.

Infolgedessen es unterdrückt diese Warnungen für „ausstehende Aufgaben“ Wenn Sie die Schleife manuell schließen mit loop.shut() nach run_until_complete()Alle übrig gebliebenen Aufgaben, die nicht erwartet wurden, werden immer noch herumhängen. Python erkennt, dass Sie die Schleife gewaltsam abschließen, während die Arbeiten noch geplant sind, und warnt Sie davor.

Dies kann nicht heißen, dass jedes asynchrische Python -Programm vermeiden sollte asyncio.run() oder immer benutzen loop.run_until_complete() mit einem Handbuch loop.shut(). Aber es zeigt etwas Wichtiges: Sie sollten sich bewusst sein, welche Aufgaben noch ausgeführt werden, wenn Ihr Programm austritt. Zumindest ist es eine gute Idee, vor dem Abschalten ausstehende Aufgaben zu überwachen oder zu protokollieren.

Letzte Gedanken

Indem wir den Kontrollfluss zurücktreten und überdenken konnten, konnten wir unseren Validierungsprozess dramatisch effizienter gestalten – Nicht durch Hinzufügen von mehr Infrastruktur, sondern indem wir das verwendeten, was wir bereits sorgfältiger hatten. Ein paar Zeilen der Codeänderung führten zu a 90% Kostenreduzierung ohne zusätzliche Komplexität. Es löste Charge-Restrict-Fehler, reduzierte Systemlast auf und ermöglichte es dem Staff, Bewertungen häufiger durchzuführen, ohne Engpässe zu verursachen.

Es ist eine wichtige Erinnerung daran„Clear“ asynchronisierter Code bedeutet nicht immer effizienter Code, es ist entscheidend, die Verwendung von Systemressourcen beabsichtigt zu sein, wie wir Systemressourcen verwenden. Verantwortungsbewusstes, effizientes Engineering ist mehr als nur das Schreiben von Code, der funktioniert. Es geht darum, Systeme zu entwerfen, die Zeit, Geld und gemeinsame Ressourcen respektieren, insbesondere in kollaborativen Umgebungen. Wenn Sie Compute als gemeinsames Vermögenswert anstelle eines unendlichen Swimming pools behandeln, profitieren alle davon: Systemsskala besser, Groups bewegen sich schneller und die Kosten bleiben vorhersehbar.

Egal, ob Sie LLM -Anrufe tätigen, Kubernetes -Jobs starten oder Daten in Stapeln verarbeiten, innehalten und sich fragen: Verwende ich nur das, was ich Wirklich brauchen?

Oft sind die Antwort und die Verbesserung nur eine Codezeile entfernt.

Von admin

Schreibe einen Kommentar

Deine E-Mail-Adresse wird nicht veröffentlicht. Erforderliche Felder sind mit * markiert