Die meisten Tutorials zum maschinellen Lernen konzentrieren sich auf synchrones Serving in Echtzeit, das sofortige Antworten auf Vorhersageanfragen ermöglicht. Dieser Ansatz kann jedoch bei Datenverkehrsspitzen Probleme bereiten und ist für Aufgaben mit langer Laufzeit nicht preferrred. Außerdem sind leistungsstärkere Maschinen erforderlich, um schnell reagieren zu können, und wenn der Shopper oder Server ausfällt, geht das Vorhersageergebnis normalerweise verloren.
In diesem Blogbeitrag zeigen wir, wie man ein Machine-Studying-Modell als asynchronen Employee mit Celery und Redis ausführt. Wir verwenden das Florence-2-Basismodell, ein Imaginative and prescient-Sprachmodell, das für seine beeindruckende Leistung bekannt ist. Dieses Tutorial bietet ein minimales, aber funktionales Beispiel, das Sie für Ihre eigenen Anwendungsfälle anpassen und erweitern können.
Sie können hier eine Demo der App ansehen: https://caption-app-dfmj3maizq-ew.a.run.app/
Der Kern unserer Lösung basiert auf Celery, einer Python-Bibliothek, die diese Shopper/Employee-Logik für uns implementiert. Sie ermöglicht es uns, die Rechenarbeit auf viele Employee zu verteilen und so die Skalierbarkeit Ihres ML-Inferenz-Anwendungsfalls auf hohe und unvorhersehbare Lasten zu verbessern.
Der Ablauf funktioniert folgendermaßen:
- Der Shopper übermittelt eine Aufgabe mit einigen Parametern an eine vom Dealer verwaltete Warteschlange (in unserem Beispiel Redis).
- Ein (oder mehrere) Employee überwacht kontinuierlich die Warteschlange und nimmt eingehende Aufgaben auf. Anschließend führt er sie aus und speichert das Ergebnis im Backend-Speicher.
- Der Shopper kann das Ergebnis der Aufgabe anhand seiner ID abrufen, indem er entweder das Backend abfragt oder den Kanal der Aufgabe abonniert.
Beginnen wir mit einem vereinfachten Beispiel:
Führen Sie zuerst Redis aus:
docker run -p 6379:6379 redis
Hier ist der Employee-Code:
from celery import Celery
# Configure Celery to make use of Redis because the dealer and backend
app = Celery(
"duties", dealer="redis://localhost:6379/0", backend="redis://localhost:6379/0"
)
# Outline a easy activity
@app.activity
def add(x, y):
return x + y
if __name__ == "__main__":
app.worker_main(("employee", "--loglevel=information"))
Und der Clientcode:
from celery import Celery
app = Celery("duties", dealer="redis://localhost:6379/0", backend="redis://localhost:6379/0")
print(f"{app.management.examine().lively()=}")
task_name = "duties.add"
add = app.signature(task_name)
print("Gotten Process")
# Ship a activity to the employee
outcome = add.delay(4, 6)
print("Ready for Process")
outcome.wait()
# Get the outcome
print(f"Consequence: {outcome.outcome}")
Dies ergibt das erwartete Ergebnis: „Ergebnis: 10“
Kommen wir nun zum eigentlichen Anwendungsfall: Bereitstellung von Florence 2.
Wir werden eine Multi-Container-Bildunterschriftsanwendung erstellen, die Redis für die Aufgabenwarteschlange, Celery für die Aufgabenverteilung und ein lokales Quantity oder Google Cloud Storage für die potenzielle Bildspeicherung verwendet. Die Anwendung besteht aus wenigen Kernkomponenten: Modellinferenz, Aufgabenverteilung, Shopper-Interaktion und Dateispeicherung.
Architekturübersicht:
- Klient: Initiiert Bildbeschriftungsanfragen, indem diese (über den Dealer) an den Employee gesendet werden.
- Arbeiter: Empfängt Anfragen, lädt Bilder herunter, führt Inferenzen mithilfe des vortrainierten Modells durch und gibt Ergebnisse zurück.
- Redis: Fungiert als Nachrichtenbroker und erleichtert die Kommunikation zwischen Shopper und Employee.
- Dateispeicher: Temporärer Speicher für Bilddateien
Komponentenaufschlüsselung:
1. Modellinferenz (mannequin.py):
- Abhängigkeiten und Initialisierung:
import os
from io import BytesIO
import requests
from google.cloud import storage
from loguru import logger
from modeling_florence2 import Florence2ForConditionalGeneration
from PIL import Picture
from processing_florence2 import Florence2Processor
mannequin = Florence2ForConditionalGeneration.from_pretrained(
"microsoft/Florence-2-base-ft"
)
processor = Florence2Processor.from_pretrained("microsoft/Florence-2-base-ft")
- Importiert die erforderlichen Bibliotheken für die Bildverarbeitung, Webanforderungen, Google Cloud Storage-Interaktion und Protokollierung.
- Initialisiert das vortrainierte Florence-2-Modell und den Prozessor zur Bildunterschriftengenerierung.
- Bild-Obtain (download_image):
def download_image(url):
if url.startswith("http://") or url.startswith("https://"):
# Deal with HTTP/HTTPS URLs
# ... (code to obtain picture from URL) ...
elif url.startswith("gs://"):
# Deal with Google Cloud Storage paths
# ... (code to obtain picture from GCS) ...
else:
# Deal with native file paths
# ... (code to open picture from native path) ...
- Lädt das Bild von der angegebenen URL herunter.
- Unterstützt HTTP/HTTPS-URLs, Google Cloud Storage-Pfade (
gs://
) und lokale Dateipfade. - Inferenzausführung (run_inference):
def run_inference(url, task_prompt):
# ... (code to obtain picture utilizing download_image perform) ...
attempt:
# ... (code to open and course of the picture) ...
inputs = processor(textual content=task_prompt, pictures=picture, return_tensors="pt")
besides ValueError:
# ... (error dealing with) ...
# ... (code to generate captions utilizing the mannequin) ...
generated_ids = mannequin.generate(
input_ids=inputs("input_ids"),
pixel_values=inputs("pixel_values"),
# ... (mannequin technology parameters) ...
)
# ... (code to decode generated captions) ...
generated_text = processor.batch_decode(generated_ids, skip_special_tokens=False)(0)
# ... (code to post-process generated captions) ...
parsed_answer = processor.post_process_generation(
generated_text, activity=task_prompt, image_size=(picture.width, picture.peak)
)
return parsed_answer
Orchestriert den Bildbeschriftungsprozess:
- Lädt das Bild herunter mit
download_image
. - Bereitet das Bild und die Aufgabenaufforderung für das Modell vor.
- Generiert Untertitel mithilfe des geladenen Florence-2-Modells.
- Dekodiert und bearbeitet die generierten Untertitel nach.
- Gibt die endgültige Beschriftung zurück.
2. Aufgabenverteilung (employee.py):
import os
from celery import Celery
# ... different imports ...
# Get Redis URL from surroundings variable or use default
REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379/0")
# Configure Celery to make use of Redis because the dealer and backend
app = Celery("duties", dealer=REDIS_URL, backend=REDIS_URL)
# ... (Celery configurations) ...
- Richtet Celery so ein, dass Redis als Nachrichtenbroker für die Aufgabenverteilung verwendet wird.
- Aufgabendefinition (inference_task):
@app.activity(bind=True, max_retries=3)
def inference_task(self, url, task_prompt):
# ... (logging and error dealing with) ...
return run_inference(url, task_prompt)
- Definiert die
inference_task
die von Celery-Mitarbeitern ausgeführt werden. - Diese Aufgabe ruft den
run_inference
Funktion vonmannequin.py
. - Ausführung durch den Arbeiter:
if __name__ == "__main__":
app.worker_main(("employee", "--loglevel=information", "--pool=solo"))
- Startet einen Celery-Employee, der auf Aufgaben wartet und diese ausführt.
3. Shopper-Interaktion (consumer.py):
import os
from celery import Celery
# Get Redis URL from surroundings variable or use default
REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379/0")
# Configure Celery to make use of Redis because the dealer and backend
app = Celery("duties", dealer=REDIS_URL, backend=REDIS_URL)
- Stellt eine Verbindung zu Celery her und verwendet dabei Redis als Nachrichtenbroker.
- Aufgabenübermittlung (send_inference_task):
def send_inference_task(url, task_prompt):
activity = inference_task.delay(url, task_prompt)
print(f"Process despatched with ID: {activity.id}")
# Anticipate the outcome
outcome = activity.get(timeout=120)
return outcome
- Sendet eine Bildbeschriftungsaufgabe (
inference_task
) an den Celery-Employee. - Wartet, bis der Employee die Aufgabe abgeschlossen hat, und ruft das Ergebnis ab.
Docker-Integration (docker-compose.yml):
- Definiert ein Multi-Container-Setup mit Docker Compose:
- Redis: Führt den Redis-Server für die Nachrichtenvermittlung aus.
- Modell: Erstellt und implementiert den Modellinferenz-Employee.
- App: Erstellt und stellt die Clientanwendung bereit.
- Blume: Führt ein webbasiertes Celery-Tasküberwachungstool aus.
Sie können den vollständigen Stapel mit Folgendem ausführen:
docker-compose up
Und da haben Sie es! Wir haben gerade eine umfassende Anleitung zum Erstellen eines asynchronen Machine-Studying-Inferenzsystems mit Celery, Redis und Florence 2 erkundet. Dieses Tutorial hat gezeigt, wie Sie Celery effektiv für die Aufgabenverteilung, Redis für die Nachrichtenvermittlung und Florence 2 für die Bildbeschriftung verwenden. Durch die Nutzung asynchroner Workflows können Sie große Anfragemengen verarbeiten, die Leistung verbessern und die allgemeine Belastbarkeit Ihrer ML-Inferenzanwendungen erhöhen. Mit dem bereitgestellten Docker Compose-Setup können Sie das gesamte System mit einem einzigen Befehl selbst ausführen.
Bereit für den nächsten Schritt? Die Bereitstellung dieser Architektur in der Cloud kann ihre eigenen Herausforderungen mit sich bringen. Lassen Sie mich in den Kommentaren wissen, ob Sie einen Folgebeitrag zur Cloud-Bereitstellung sehen möchten!
Code: https://github.com/CVxTz/celery_ml_deploy
Demo: https://caption-app-dfmj3maizq-ew.a.run.app/