eine produktionsbereite Pipeline für Multi-Stream-Videoanalysen: hardwarebeschleunigte Dekodierung, Verfolgung, Bildschirmanzeige und Nachrichtenvermittlung, alles über GStreamer verkabelt. Für nach TensorRT exportierte Standarderkennungsmodelle: nvinfer kümmert sich um alles.

Der allgemeine Fall hat jedoch Grenzen. Imaginative and prescient-Language-Modelle, benutzerdefinierte Nachbearbeitung, gedrehte Begrenzungsrahmen oder die Notwendigkeit, Modelle zur Laufzeit im laufenden Betrieb auszutauschen – das sind Orte, an denen … nvinfer’s Annahmen scheitern. Manchmal haben Sie einen ausgereiften PyTorch-Inferenzstapel, den Ihr Workforce sorgfältig abgestimmt hat, und Sie möchten, dass DeepStream aufruft Das anstatt es in einer Konfigurationsdatei erneut zu implementieren.

Es ist erwähnenswert, dass speziell für Modelle der YOLO-Familie DeepStream-Yolo von Marcos Luciano hat bereits hervorragende Arbeit bei der Implementierung der benutzerdefinierten Nachbearbeitung in C++ geleistet. Wenn C++ auf dem Tisch ist, beginnen Sie dort. Dieser Artikel nimmt einen anderen Standpunkt ein: das gleiche Ergebnis vollständig in Python zu erzielen, indem ein benutzerdefiniertes GStreamer-Plugin mit verwendet wird pyservicemaker ohne Einbußen beim Durchsatz.

Die wichtigste Erkenntnis, die dies ermöglicht: nachgelagerte Elemente wie nvtracker, nvdsosdUnd nvmsgconv Es ist egal, welches Component Erkennungsmetadaten erzeugt hat. Schreiben Sie korrekt in die Metadatenstruktur von DeepStream und der Relaxation des Ökosystems funktioniert so nvinfer warfare nie auf dem Bild.

DeepStream-Metadaten

Jeder Puffer, der durch eine DeepStream-Pipeline fließt, enthält mehr als Pixeldaten. Von dem Second an, in dem Frames durchlaufen werden nvstreammuxjede GstBuffer hat eine NvDsBatchMeta daran befestigte Struktur. Die Hierarchie ist unkompliziert und kann im gefunden werden offizielle Dokumentation.

NvDsBatchMeta
├── NvDsUserMeta                        (batch-level customized metadata)
└── NvDsFrameMeta                       (one per supply stream)
    ├── NvDsUserMeta                    (frame-level customized metadata)
    └── NvDsObjectMeta                  (one per detected object)
        ├── NvDsClassifierMeta
        └── NvDsUserMeta                (object-level customized metadata)

NvDsBatchMeta beschreibt die gesamte Cost. Jede NvDsFrameMeta entspricht einem Quellstream und enthält Informationen auf Body-Ebene wie die Quell-ID und die Body-Nummer. Jede NvDsObjectMeta stellt eine einzelne Erkennung dar. Das heißt, wenn unser Plugin Erkennungen schreibt, schreiben wir eine NvDsObjectMeta für jeden.

Das Wichtigste, was man verstehen muss, ist das Nichts davon ist Eigentum von nvinfer. Es handelt sich um einen Shared-Information-Vertrag. Jedes GStreamer-Component in der Pipeline kann daraus lesen, darauf schreiben oder beides:

  • nvtracker liest Objektbegrenzungsrahmen und schreibt Monitoring-IDs.
  • nvdsosd liest Kästchen und Beschriftungen, um Überlagerungen zu zeichnen.
  • nvmsgconv liest die gesamte Struktur, um Nachrichtennutzlasten zu erzeugen.

Unser benutzerdefiniertes Plugin schreibt Erkennungen einfach auf die gleiche Weise in diese Struktur nvinfer würde, und alles nachgelagerte nimmt sie ohne Änderung auf. Eine wichtige Einschränkung, die es zu verstehen gilt, bevor wir Code schreiben: NvDsObjectMeta Instanzen kann nicht direkt aus Python erstellt werden. Der Versuch, die Klasse zu instanziieren, löst ein aus No constructor outlined! Fehler zur Laufzeit.

Der Grund ist architektonischer Natur. DeepStream verwaltet seine Metadatenobjekte über Speicherpools, vorab zugewiesene Blöcke, die über Frames hinweg recycelt werden, um den Overhead durch wiederholte Heap-Zuweisung und -Freigabe in einer Pipeline mit hohem Durchsatz zu vermeiden. Diese Swimming pools sind Eigentum von NvDsBatchMeta und leben auf der C-Seite der Grenze. Die Python-Bindungen machen den Zugriff auf diese Swimming pools verfügbar, machen aber bewusst keinen Python-seitigen Konstruktor verfügbar, da beim Erstellen eines NvDsObjectMeta Außerhalb des Swimming pools würde das Lebenszyklusmanagement umgangen werden, das die Speichernutzung von DeepStream vorhersehbar hält. Der richtige Weg, einen zu bekommen, besteht darin, die Cost danach zu fragen: batch_meta.acquire_object_meta()wodurch Sie eine vorab zugewiesene Instanz aus dem Pool erhalten. Wenn der Body fertig ist, gibt DeepStream ihn automatisch an den Pool zurück.

Die Python-Brücke: pyservicemaker

Um mit den Metadaten von DeepStream aus Python zu interagieren, verwenden wir pyservicemakerNVIDIAs aktuelles, unterstütztes Python SDK für DeepStream. Der offizielle Dokumentation behandelt die Grundlagen von Pipelines und Flows, zeigt jedoch nicht, wie man Metadaten aus einem benutzerdefinierten Inferenzelement schreibt und anfügt. Das ist die Lücke, die dieser Artikel füllt.

Die Schlüsselabstraktion ist BatchMetadataOperator. Unterklassifizierung und Implementierung handle_metadata(batch_meta) gibt Ihnen Zugriff auf alles NvDsBatchMeta für jeden Puffer, der durch die Pipeline fließt. Von da an ist das Iterieren von Frames genauso einfach wie das Verwenden batch_meta.frame_items und Anbringen eines Erkennungsobjekts.

pyservicemaker bietet auch eine Buffer Wrapper herum Gst.Buffer das entlarvt batch_meta direkt und, was wichtig ist, ein extract(batch_id) Methode, die ein DLPack-Deal with an den GPU-Speicher jedes Frames zurückgibt. Dadurch ist eine Zero-Copy-Inferenz möglich, da wir den Body direkt an TensorRT übergeben können, ohne jemals die GPU zu verlassen.

Anstatt zu benutzen BatchMetadataOperator Standalone über eine Sonde, falten wir das gleiche Muster direkt in unsere benutzerdefinierten Plugins do_transform_ip Methode, die uns neben dem Metadatenzugriff auch die Kontrolle über den Lebenszyklus, die Eigenschaften und die Obergrenzenverhandlung des Parts gibt. Aber zuerst müssen wir dieses Plugin erstellen.

Ein erkennbares Python-GStreamer-Plugin

GStreamer erkennt Plugins zur Laufzeit, indem es die in aufgelisteten Verzeichnisse scannt GST_PLUGIN_PATH. Speziell für Python-Plugins sieht es in a aus python/ Unterverzeichnis innerhalb jedes dieser Pfade. Das bedeutet, dass Ihr Plugin nur ein ist .py Datei an der richtigen Stelle abgelegt, keine Kompilierung, kein CMake, keine gemeinsam genutzte Bibliothek. Der Nachteil besteht darin, dass das Registrierungsmuster streng ist und ein Fehler zu stillen Fehlern führt, die wirklich schwer zu debuggen sind.

$GST_PLUGIN_PATH/
└── python/
    └── gstexampleplugin.py   # your plugin

Satz GST_PLUGIN_PATH auf das übergeordnete Verzeichnis zeigen und GStreamer findet es python/gstexampleplugin.py automatisch beim nächsten Pipeline-Lauf.

Das Plugin-Skelett

Hier ist das minimale Grundgerüst für ein Passthrough-Inferenzelement: Es empfängt gestapelte Videopuffer, führt Inferenz aus, hängt Metadaten an und übergibt den Puffer unverändert an den Downstream.

import gi
gi.require_version('Gst', '1.0')
gi.require_version('GstBase', '1.0')
from gi.repository import Gst, GstBase, GObject

import torch
from pyservicemaker import Buffer

GST_PLUGIN_NAME = "gstexampleplugin"

Gst.init(None)

class GstExamplePlugin(GstBase.BaseTransform):

    __gstmetadata__ = (
        'GstExamplePlugin',                     # title
        'Filter/Impact/Video',                  # classification
        'Customized inference ingredient',             # description
        'Your Identify'                             # creator
    )

    src_format = Gst.Caps.from_string(
        "video/x-raw(reminiscence:NVMM), format=RGB, "
        "width=(int)( 1, 2147483647 ), peak=(int)( 1, 2147483647 ), "
        "framerate=(fraction)( 0/1, 2147483647/1 )"
    )
    sink_format = Gst.Caps.from_string(
        "video/x-raw(reminiscence:NVMM), format=RGB, "
        "width=(int)( 1, 2147483647 ), peak=(int)( 1, 2147483647 ), "
        "framerate=(fraction)( 0/1, 2147483647/1 )"
    )

    src_pad_template = Gst.PadTemplate.new(
        "src", Gst.PadDirection.SRC, Gst.PadPresence.ALWAYS, src_format
    )
    sink_pad_template = Gst.PadTemplate.new(
        "sink", Gst.PadDirection.SINK, Gst.PadPresence.ALWAYS, sink_format
    )
    __gsttemplates__ = (src_pad_template, sink_pad_template)

    __gproperties__ = {
        'model-engine': (
            str,
            'TensorRT engine path',
            'Path to the .engine file',
            '',
            GObject.ParamFlags.READWRITE
        ),
        'confidence-threshold': (
            float,
            'Confidence threshold',
            'Minimal confidence to connect a detection',
            0.0, 1.0, 0.5,
            GObject.ParamFlags.READWRITE
        ),
    }

    def __init__(self):
        tremendous().__init__()
        self.model_engine = ''
        self.confidence_threshold = 0.5
        self.engine = None

    def do_get_property(self, prop):
        if prop.title == 'model-engine':
            return self.model_engine
        elif prop.title == 'confidence-threshold':
            return self.confidence_threshold

    def do_set_property(self, prop, worth):
        if prop.title == 'model-engine':
            self.model_engine = worth
        elif prop.title == 'confidence-threshold':
            self.confidence_threshold = worth

    def do_start(self):
        # Load your TensorRT engine right here
        self.engine = load_engine(self.model_engine) # This operate ought to be carried out
        return True

    def do_transform_ip(self, gst_buffer: Gst.Buffer) -> Gst.FlowReturn:
        """In-place remodel: connect metadata, cross buffer unchanged."""
        buffer = Buffer(gst_buffer)
        batch_meta = buffer.batch_meta

        frames = ()
        for frame_meta in batch_meta.frame_items:
            t = torch.utils.dlpack.from_dlpack(buffer.extract(frame_meta.batch_id))
            frames.append
        batch = torch.stack(frames, dim=0)

        # Run your mannequin inference
        outcomes = self.engine(batch)
        
        # Now we might want to iterate over the outcomes for every body
        # and connect it to the object_meta in case it's detection/segmentation
        # in any other case we are able to do it as user_meta
        # The next is pseudocode, which is determined by your inference
        for frame_meta in batch_meta.frame_items:
            for det in outcomes:
                obj = batch_meta.acquire_object_meta()
                # Fill the obj with every detection
                ...
                frame_meta.append(obj)
 
        return Gst.FlowReturn.OK


# --- Registration ---
GObject.type_register(GstExamplePlugin)
__gstelementfactory__ = (GST_PLUGIN_NAME, Gst.Rank.NONE, GstExamplePlugin)

Ein paar Dinge, die es zu diesem Skelett zu beachten gilt:

GstBase.BaseTransform ist die richtige Basisklasse für einen In-Place-Filter, der einen Puffer empfängt, ihn ändert (durch Anhängen von Metadaten) und ihn an den Downstream weitergibt. Wir überschreiben do_transform_ip statt do_transform weil wir keinen neuen Ausgabepuffer zuweisen.

__gstmetadata__ Und __gsttemplates__ sind nicht optionally available. Ohne sie registriert GStreamer das Component nicht. Die Großbuchstabenzeichenfolge video/x-raw(reminiscence:NVMM) teilt GStreamer mit, dass dieses Component mit NVIDIA-Speicher arbeitet, was wichtig ist, um in einer DeepStream-Pipeline auf der GPU zu bleiben.

__gproperties__ entlarvt model-engine Und confidence-threshold als erstklassige GStreamer-Eigenschaften, was bedeutet, dass Sie sie über a festlegen können gst-launch Befehlszeile oder aus dem Python-Pipeline-Code, ohne die Quelle zu berühren.

Für die Registrierung sind die letzten beiden Zeilen erforderlich: GObject.type_register teilt dem GObject-Typsystem die Klasse mit und __gstelementfactory__ teilt GStreamer mit, welcher Elementname verfügbar gemacht und welche Klasse instanziiert werden soll.

Überprüfung des Plugins. Sobald die Datei vorhanden und der Cache leer ist, überprüfen Sie die Registrierung mit:

GST_PLUGIN_PATH=/path/to/your/plugins gst-inspect-1.0 gstexampleplugin

Sie sollten die Elementmetadaten, Pad-Vorlagen und beide Eigenschaften aufgelistet sehen. Wenn Sie sie sehen, kennt GStreamer Ihr Plugin und Sie können es in eine Pipeline einfügen.

Beispiel einer Finish-to-Finish-Inferenz mit Ultralytics

Nachdem das Plugin-Gerüst vorhanden ist, ist es an der Zeit, die Inferenzlogik auszufüllen. Der vollständige Arbeitscode ist als verfügbar GitHub Gist. Sobald es auffindbar ist, können Sie es wie zuvor überprüfen oder die Pipeline starten. Hier ist ein einfaches Beispiel, das lediglich eine Inferenz durchführt und die fps anzeigt:

gst-launch-1.0 -v 
  nvstreammux title=m width=1280 peak=720 batch-size=1 
    batched-push-timeout=33000 ! 
  nvvideoconvert nvbuf-memory-type=0 ! 
  'video/x-raw(reminiscence:NVMM), format=RGB' ! 
  gstyoloplugin model-path=/path/to/yolo26s.engine ! 
  fpsdisplaysink text-overlay=false silent=false sync=false 
    video-sink=fakesink 
  uridecodebin uri=file:///path/to/video.mp4 ! m.sink_0

Den Code prüfen

Kompatibilitätsproblem

Wenn Sie den Code lesen, ist Ihnen möglicherweise aufgefallen, dass wir den überschreiben tuple Objekt, aber nur drinnen ultralytics.nn.backends.tensorrt Modul, da dort das Downside liegt. Es gibt einen bekannten Kompatibilitätsgrenzfall zwischen TensorRT-Python-Bindungen und die GStreamer Python-Wrapper-Framework (PyGObject) Dadurch wird Ihre Pipeline mit der berüchtigten Meldung „Segmentierungsfehler (Kern-Dump)“ zum Absturz gebracht. Aus diesem Grund warfare es notwendig, diesen Codeausschnitt zu erstellen, der uns hilft, das beabsichtigte Verhalten beizubehalten:

import ultralytics.nn.backends.tensorrt as trt_backend

_original_tuple = tuple

def safe_tuple(obj):
    if "tensorrt" in sort(obj).__module__ and kind(obj).__name__ == "Dims":
        return _original_tuple(obj(i) for i in vary(len(obj)))
    return _original_tuple(obj)

trt_backend.tuple = safe_tuple

Dies ersetzt die tuple Referenz innerhalb des Namensraums des Ultralytics-Backends zur Laufzeit mit einer Model, die auf den indexbasierten Zugriff zurückgreift Dims Gegenstände, alles andere bleibt unberührt. Das ist zwar nicht elegant, aber chirurgisch und muss beim Import erfolgen, bevor ein Modell instanziiert wird.

Die Inferenzschleife

Die Inferenzschleife selbst ist recht einfach:

  1. Extrahieren Sie die Frames aus den Puffern
  2. Vorverarbeitung + Inferenz
  3. Hängen Sie die Ergebnisse an die Objektmetadaten jedes Frames an, wenn nachgelagerte Elemente der Pipeline Deepstream-Plugins sind.

Unten finden Sie den Snippet-Code für Zero-Copy mit DLPack:

frames = ()
for frame_meta in batch_meta.frame_items:
    t = torch.utils.dlpack.from_dlpack(buffer.extract(frame_meta.batch_id))
    frames.append
batch = torch.stack(frames, dim=0)

Vorverarbeitung der Eingabe

YOLO-Modelle, beim Durchlaufen von a torch.TensorErwarten Sie eine feste Eingabeform (N, 3, 640, 640) laut Dokumentation. Es lösen sich jedoch Rahmen nvstreammux wird die Auflösung haben, die Ihre Quelle hat. Der verwendete Ansatz ist Letterboxing: Skalieren Sie den Rahmen so, dass er in die Zielabmessungen passt und dabei das Seitenverhältnis beibehält, und füllen Sie dann den verbleibenden Raum auf. Die wichtigste Erkenntnis hierbei ist, dass wir dies vollständig auf der GPU und über den gesamten Stapel hinweg auf einmal tun können, ohne jemals den CPU-Speicher zu berühren.

Mit Body-Extraktion, Letterboxing, Inferenz und Koordinateninvertierung erfolgt alles in einem einzigen Schritt auf der GPU do_transform_ip Aufruf verhält sich das Plugin genauso nvinfer aus der Perspektive jedes Downstream-Parts, aber mit der vollen Flexibilität eines Python-Inferenzstapels darunter.

Von hier aus übernimmt der Relaxation der DeepStream-Pipeline: nvtracker vergibt IDs, nvdsosd zeichnet Überlagerungen und nvmsgconv Serialisiert Nutzdaten.

Praktische Erkenntnisse und nächste Schritte

Wenn Sie bis hierher gefolgt sind, haben Sie ein funktionierendes Muster zum Ersetzen nvinfer mit Ihrem eigenen Python-Inferenzelement, und was noch wichtiger ist, Sie verstehen Warum jedes Stück ist so wie es ist.

Das Muster verallgemeinert sich. Alles, was hier beschrieben wird: das Plugin-Gerüst, die Batch-Vorverarbeitung und der Metadatenanhang, ist modellunabhängig. Austausch von Ultralytics YOLO gegen RFDETR von Roboflow ist unkompliziert und der GStreamer und pyservicemaker Gerüst bleibt identisch. Dasselbe gilt auch für exotischere Architekturen: NVIDIAs eigene deepstream_reference_apps Das Repository enthält ein funktionierendes Beispiel für die Integration eines Imaginative and prescient-Language-Modells über vLLM unter Verwendung genau dieses Plugin-Ansatzes, den es sich zu studieren lohnt, wenn Sie über die Erkennung hinaus in das Videoverständnis vordringen.

Der vollständige Plugin-Code ist als verfügbar GitHub Gist. Wenn Sie etwas darauf aufbauen: ein anderes Modell, ein Multi-Stream-Setup oder eine VLM-Integration, wäre ich gespannt, wie es weitergeht. Viel Spaß beim Codieren!

Von admin

Schreibe einen Kommentar

Deine E-Mail-Adresse wird nicht veröffentlicht. Erforderliche Felder sind mit * markiert