Die GraphRAG-Implementierung von Microsoft warfare eines der ersten Systeme und führte viele revolutionary Funktionen ein. Es kombiniert sowohl die Indizierungsphase, in der Entitäten, Beziehungen und hierarchische Gemeinschaften extrahiert und zusammengefasst werden, mit erweiterten Abfragezeitfunktionen. Dieser Ansatz ermöglicht es dem System, umfassende, thematische Fragen zu beantworten, indem es vorberechnete Entitäts-, Beziehungs- und Group-Zusammenfassungen nutzt und so über die herkömmlichen Einschränkungen beim Abrufen von Dokumenten von Commonplace-RAG-Systemen hinausgeht.

Die GraphRAG-Pipeline von Microsoft. Bild von (Edge et al., 2024), lizenziert unter CC BY 4.0.

Ich habe die Indizierungsphase sowie globale und lokale Suchmechanismen in früheren Blogbeiträgen behandelt (Hier Und Hier), daher überspringen wir diese Particulars in dieser Diskussion. Allerdings haben wir es noch nicht erkundet DRIFT-Suchedas im Mittelpunkt dieses Blogbeitrags stehen wird. DRIFT ist ein neuerer Ansatz, der Merkmale sowohl globaler als auch lokaler Suchmethoden kombiniert. Die Technik beginnt mit der Nutzung von Group-Informationen durch Vektorsuche, um einen breiten Ausgangspunkt für Abfragen zu schaffen, und nutzt diese Group-Einblicke dann, um die ursprüngliche Frage in detaillierte Folgeabfragen zu verfeinern. Dadurch kann DRIFT den Wissensgraphen dynamisch durchlaufen, um spezifische Informationen über Entitäten, Beziehungen und andere lokalisierte Particulars abzurufen und so Recheneffizienz mit umfassender Antwortqualität in Einklang zu bringen.

Drift-Suchimplementierung mit LlamaIndex-Workflows und Neo4j. Bild vom Autor.

Die Implementierung verwendet LlamaIndex-Workflows um den DRIFT-Suchprozess durch mehrere Schlüsselschritte zu orchestrieren. Es beginnt mit HyDE-GenerierungErstellen einer hypothetischen Antwort basierend auf einem Beispiel-Group-Bericht, um die Abfragedarstellung zu verbessern.

Der Group-Suchschritt Anschließend wird die Vektorähnlichkeit verwendet, um die relevantesten Group-Berichte zu identifizieren und so einen breiten Kontext für die Abfrage bereitzustellen. Das System analysiert diese Ergebnisse, um eine erste Zwischenantwort und eine Reihe von Folgeabfragen für eine tiefergehende Untersuchung zu generieren.

Diese Folgeabfragen werden während des Prozesses parallel ausgeführt Lokale Suchphaseum gezielte Informationen, einschließlich Textblöcke, Entitäten, Beziehungen und zusätzliche Group-Berichte, aus dem Wissensgraphen abzurufen. Dieser Prozess kann bis zu einer maximalen Tiefe iterieren, wobei jede Runde möglicherweise neue Folgeabfragen hervorbringt.

Schließlich ist die Antwortgenerierungsschritt fasst alle während des Prozesses gesammelten Zwischenantworten zusammen und kombiniert umfassende Erkenntnisse auf Group-Ebene mit detaillierten lokalen Erkenntnissen, um eine umfassende Antwort zu erstellen. Dieser Ansatz gleicht Breite und Tiefe aus, indem er weitestgehend mit dem Group-Kontext beginnt und nach und nach auf Einzelheiten eingeht.

Dies ist meine Implementierung der DRIFT-Suche, angepasst für LlamaIndex-Workflows und Neo4j. Ich habe den Ansatz rückentwickelt, indem ich den GraphRAG-Code von Microsoft untersucht habe, daher kann es einige Unterschiede zur ursprünglichen Implementierung geben.

Der Code ist verfügbar unter GitHub.

Datensatz

Für diesen Blogbeitrag verwenden wir Alices Abenteuer im Wunderland von Lewis Carroll, ein klassischer Textual content, der frei erhältlich ist bei Projekt Gutenberg. Dieser reichhaltige erzählerische Datensatz mit seinen miteinander verbundenen Charakteren, Orten und Ereignissen macht ihn zu einer hervorragenden Wahl für die Demonstration der Fähigkeiten von GraphRAG.

Einnahme

Für den Aufnahmeprozess verwenden wir die wieder Implementierung der Microsoft GraphRAG-Indizierung Ich habe für a entwickelt vorheriger Blogbeitragangepasst an einen LlamaIndex-Workflow.

Indexierungsworkflow. Bild vom Autor.

Die Aufnahmepipeline folgt dem Commonplace-GraphRAG-Ansatz mit drei Hauptphasen:

class MSGraphRAGIngestion(Workflow):
    @step
    async def entity_extraction(self, ev: StartEvent) -> EntitySummarization:
        chunks = splitter.split_text(ev.textual content)
        await ms_graph.extract_nodes_and_rels(chunks, ev.allowed_entities)
        return EntitySummarization()

    @step
    async def entity_summarization(
        self, ev: EntitySummarization
    ) -> CommunitySummarization:
        await ms_graph.summarize_nodes_and_rels()
        return CommunitySummarization()

    @step
    async def community_summarization(
        self, ev: CommunitySummarization
    ) -> CommunityEmbeddings:
        await ms_graph.summarize_communities()
        return CommunityEmbeddings()

Der Workflow extrahiert Entitäten und Beziehungen aus Textblöcken, generiert Zusammenfassungen sowohl für Knoten als auch für Beziehungen und erstellt dann hierarchische Group-Zusammenfassungen.

Nach der Zusammenfassung generieren wir Vektoreinbettungen sowohl für Communities als auch für Entitäten, um eine Ähnlichkeitssuche zu ermöglichen. Hier ist der Schritt zur Group-Einbettung:

@step
    async def community_embeddings(self, ev: CommunityEmbeddings) -> EntityEmbeddings:
        # Fetch all communities from the graph database
        communities = ms_graph.question(
            """
    MATCH (c:__Community__)
    WHERE c.abstract IS NOT NULL AND c.score > $min_community_rating
    RETURN coalesce(c.title, "") + " " + c.abstract AS community_description, c.id AS community_id
    """,
            params={"min_community_rating": MIN_COMMUNITY_RATING},
        )
        if communities:
            # Generate vector embeddings from group descriptions
            response = await shopper.embeddings.create(
                enter=(c("community_description") for c in communities),
                mannequin=TEXT_EMBEDDING_MODEL,
            )
            # Retailer embeddings within the graph and create vector index
            embeds = (
                {
                    "community_id": group("community_id"),
                    "embedding": embedding.embedding,
                }
                for group, embedding in zip(communities, response.knowledge)
            )
            ms_graph.question(
                """UNWIND $knowledge as row
            MATCH (c:__Community__ {id: row.community_id})
            CALL db.create.setNodeVectorProperty(c, 'embedding', row.embedding)""",
                params={"knowledge": embeds},
            )
            ms_graph.question(
                "CREATE VECTOR INDEX group IF NOT EXISTS FOR (c:__Community__) ON c.embedding"
            )
        return EntityEmbeddings()

Der gleiche Prozess wird auf Entitätseinbettungen angewendet, um die Vektorindizes zu erstellen, die für den ähnlichkeitsbasierten Abruf der DRIFT-Suche erforderlich sind.

DRIFT-Suche

Die DRIFT-Suche ist ein intuitiver Ansatz zum Abrufen von Informationen: Beginnen Sie damit, das Gesamtbild zu verstehen, und gehen Sie dann bei Bedarf in die Einzelheiten ein. Anstatt sofort nach genauen Übereinstimmungen auf Dokument- oder Entitätsebene zu suchen, konsultiert DRIFT zunächst Group-Zusammenfassungen, bei denen es sich um Übersichten auf hoher Ebene handelt, die die Hauptthemen und Themen innerhalb des Wissensgraphen erfassen.

Sobald DRIFT relevante Informationen auf höherer Ebene identifiziert, generiert es auf intelligente Weise Folgeabfragen, um genaue Informationen zu bestimmten Entitäten, Beziehungen und Quelldokumenten abzurufen. Dieser zweiphasige Ansatz spiegelt die natürliche Informationssuche des Menschen wider: Wir orientieren uns zunächst mit einem allgemeinen Überblick und stellen dann gezielte Fragen, um die Particulars zu ergänzen. Durch die Kombination der umfassenden Abdeckung der globalen Suche mit der Präzision der lokalen Suche erreicht DRIFT sowohl Breite als auch Tiefe, ohne dass der Rechenaufwand für die Verarbeitung jedes Group-Berichts oder Dokuments anfällt.

Lassen Sie uns jede Section der Implementierung durchgehen.

Der Code ist verfügbar unter GitHub.

Group-Suche

DRIFT verwendet HyDE (Hypothetical Doc Embeddings), um die Genauigkeit der Vektorsuche zu verbessern. Anstatt die Anfrage des Benutzers direkt einzubetten, generiert HyDE zunächst eine hypothetische Antwort und verwendet diese dann für die Ähnlichkeitssuche. Dies funktioniert, weil hypothetische Antworten semantisch näher an tatsächlichen Group-Zusammenfassungen liegen als reine Abfragen.

@step
async def hyde_generation(self, ev: StartEvent) -> CommunitySearch:
    # Fetch a random group report to make use of as a template for HyDE era
    random_community_report = driver.execute_query(
        """
    MATCH (c:__Community__)
    WHERE c.abstract IS NOT NULL
    RETURN coalesce(c.title, "") + " " + c.abstract AS community_description""",
        result_transformer_=lambda r: r.knowledge(),
    )
    # Generate a hypothetical reply to enhance question illustration
    hyde = HYDE_PROMPT.format(
        question=ev.question, template=random_community_report(0)("community_description")
    )
    hyde_response = await shopper.responses.create(
        mannequin="gpt-5-mini",
        enter=({"function": "person", "content material": hyde}),
        reasoning={"effort": "low"},
    )
    return CommunitySearch(question=ev.question, hyde_query=hyde_response.output_text)

Als nächstes betten wir die HyDE-Abfrage ein und rufen die fünf relevantesten Group-Berichte über Vektorähnlichkeit ab. Anschließend wird das LLM aufgefordert, aus diesen Berichten eine Zwischenantwort zu generieren und Folgefragen für eine tiefergehende Untersuchung zu identifizieren. Die Zwischenantwort wird gespeichert und alle Folgeanfragen werden parallel für die lokale Suchphase versendet.

@step
async def community_search(self, ctx: Context, ev: CommunitySearch) -> LocalSearch:
    # Create embedding from the HyDE-enhanced question
    embedding_response = await shopper.embeddings.create(
        enter=ev.hyde_query, mannequin=TEXT_EMBEDDING_MODEL
    )
    embedding = embedding_response.knowledge(0).embedding
    
    # Discover high 5 most related group studies by way of vector similarity
    community_reports = driver.execute_query(
        """
    CALL db.index.vector.queryNodes('group', 5, $embedding) YIELD node, rating
    RETURN 'community-' + node.id AS source_id, node.abstract AS community_summary
    """,
        result_transformer_=lambda r: r.knowledge(),
        embedding=embedding,
    )
    
    # Generate preliminary reply and establish what additional information is required
    initial_prompt = DRIFT_PRIMER_PROMPT.format(
        question=ev.question, community_reports=community_reports
    )
    initial_response = await shopper.responses.create(
        mannequin="gpt-5-mini",
        enter=({"function": "person", "content material": initial_prompt}),
        reasoning={"effort": "low"},
    )
    response_json = json_repair.hundreds(initial_response.output_text)
    print(f"Preliminary intermediate response: {response_json('intermediate_answer')}")
    
    # Retailer the preliminary reply and put together for parallel native searches
    async with ctx.retailer.edit_state() as ctx_state:
        ctx_state("intermediate_answers") = (
            {
                "intermediate_answer": response_json("intermediate_answer"),
                "rating": response_json("rating"),
            }
        )
        ctx_state("local_search_num") = len(response_json("follow_up_queries"))
    
    # Dispatch follow-up queries to run in parallel
    for local_query in response_json("follow_up_queries"):
        ctx.send_event(LocalSearch(question=ev.question, local_query=local_query))
    return None

Dies legt den Kernansatz von DRIFT fest: Beginnen Sie breit mit der durch HyDE erweiterten Group-Suche und gehen Sie dann mit gezielten Folgeabfragen weiter in die Tiefe.

Lokale Suche

In der lokalen Suchphase werden parallel Folgeabfragen ausgeführt, um einen Drilldown zu bestimmten Particulars durchzuführen. Jede Abfrage ruft den Zielkontext über eine entitätsbasierte Vektorsuche ab und generiert dann eine Zwischenantwort und möglicherweise weitere Folgeabfragen.

@step(num_workers=5)
async def local_search(self, ev: LocalSearch) -> LocalSearchResults:
    print(f"Operating native question: {ev.local_query}")
    
    # Create embedding for the native question
    response = await shopper.embeddings.create(
        enter=ev.local_query, mannequin=TEXT_EMBEDDING_MODEL
    )
    embedding = response.knowledge(0).embedding
    
    # Retrieve related entities and collect their related context:
    # - Textual content chunks the place entities are talked about
    # - Group studies the entities belong to
    # - Relationships between the retrieved entities
    # - Entity descriptions
    local_reports = driver.execute_query(
        """
CALL db.index.vector.queryNodes('entity', 5, $embedding) YIELD node, rating
WITH acquire(node) AS nodes
WITH
acquire {
  UNWIND nodes as n
  MATCH (n)<-(:MENTIONS)->(c:__Chunk__)
  WITH c, rely(distinct n) as freq
  RETURN {chunkText: c.textual content, source_id: 'chunk-' + c.id}
  ORDER BY freq DESC
  LIMIT 3
} AS text_mapping,
acquire {
  UNWIND nodes as n
  MATCH (n)-(:IN_COMMUNITY*)->(c:__Community__)
  WHERE c.abstract IS NOT NULL
  WITH c, c.score as rank
  RETURN {abstract: c.abstract, source_id: 'community-' + c.id}
  ORDER BY rank DESC
  LIMIT 3
} AS report_mapping,
acquire {
  UNWIND nodes as n
  MATCH (n)-(r:SUMMARIZED_RELATIONSHIP)-(m)
  WHERE m IN nodes
  RETURN {descriptionText: r.abstract, source_id: 'relationship-' + n.identify + '-' + m.identify}
  LIMIT 3
} as insideRels,
acquire {
  UNWIND nodes as n
  RETURN {descriptionText: n.abstract, source_id: 'node-' + n.identify}
} as entities
RETURN {Chunks: text_mapping, Experiences: report_mapping,
   Relationships: insideRels,
   Entities: entities} AS output
""",
        result_transformer_=lambda r: r.knowledge(),
        embedding=embedding,
    )
    
    # Generate reply based mostly on the retrieved context
    local_prompt = DRIFT_LOCAL_SYSTEM_PROMPT.format(
        response_type=DEFAULT_RESPONSE_TYPE,
        context_data=local_reports,
        global_query=ev.question,
    )
    local_response = await shopper.responses.create(
        mannequin="gpt-5-mini",
        enter=({"function": "person", "content material": local_prompt}),
        reasoning={"effort": "low"},
    )
    response_json = json_repair.hundreds(local_response.output_text)
    
    # Restrict follow-up queries to forestall exponential progress
    response_json("follow_up_queries") = response_json("follow_up_queries")(:LOCAL_TOP_K)
    
    return LocalSearchResults(outcomes=response_json, question=ev.question)

Der nächste Schritt orchestriert den iterativen Vertiefungsprozess. Es wartet darauf, dass alle parallelen Suchvorgänge mit abgeschlossen werden collect_eventsund entscheidet dann, ob der Drilldown fortgesetzt werden soll. Wenn die aktuelle Tiefe nicht das Most erreicht hat (wir verwenden maximale Tiefe = 2), werden Folgeabfragen aus allen Ergebnissen extrahiert, die Zwischenantworten gespeichert und die nächste Runde paralleler Suchvorgänge gestartet.

@step
async def local_search_results(
    self, ctx: Context, ev: LocalSearchResults
) -> LocalSearch | FinalAnswer:
    local_search_num = await ctx.retailer.get("local_search_num")
    
    # Await all parallel searches to finish
    outcomes = ctx.collect_events(ev, (LocalSearchResults) * local_search_num)
    if outcomes is None:
        return None
        
    intermediate_results = (
        {
            "intermediate_answer": occasion.outcomes("response"),
            "rating": occasion.outcomes("rating"),
        }
        for occasion in outcomes
    )
    current_depth = await ctx.retailer.get("local_search_depth", default=1)
    question = (ev.question for ev in outcomes)(0)

    # Proceed drilling down if we have not reached max depth
    if current_depth < MAX_LOCAL_SEARCH_DEPTH:
        await ctx.retailer.set("local_search_depth", current_depth + 1)
        follow_up_queries = (
            question
            for occasion in outcomes
            for question in occasion.outcomes("follow_up_queries")
        )
        
        # Retailer intermediate solutions and dispatch subsequent spherical of searches
        async with ctx.retailer.edit_state() as ctx_state:
            ctx_state("intermediate_answers").lengthen(intermediate_results)
            ctx_state("local_search_num") = len(follow_up_queries)

        for local_query in follow_up_queries:
            ctx.send_event(LocalSearch(question=question, local_query=local_query))
        return None
    else:
        return FinalAnswer(question=question)

Dadurch entsteht eine iterative Verfeinerungsschleife, in der jede Tiefenebene auf früheren Erkenntnissen aufbaut. Sobald die maximale Tiefe erreicht ist, wird die endgültige Antwortgenerierung ausgelöst.

Endgültige Antwort

Im letzten Schritt werden alle während des DRIFT-Suchprozesses gesammelten Zwischenantworten zu einer umfassenden Antwort zusammengefasst. Dazu gehören die erste Antwort aus der Group-Suche und alle Antworten, die während der lokalen Suchiterationen generiert wurden.

@step
async def final_answer_generation(self, ctx: Context, ev: FinalAnswer) -> StopEvent:
    # Retrieve all intermediate solutions collected all through the search course of
    intermediate_answers = await ctx.retailer.get("intermediate_answers")
    
    # Synthesize all findings right into a complete ultimate response
    answer_prompt = DRIFT_REDUCE_PROMPT.format(
        response_type=DEFAULT_RESPONSE_TYPE,
        context_data=intermediate_answers,
        global_query=ev.question,
    )
    answer_response = await shopper.responses.create(
        mannequin="gpt-5-mini",
        enter=(
            {"function": "developer", "content material": answer_prompt},
            {"function": "person", "content material": ev.question},
        ),
        reasoning={"effort": "low"},
    )

    return StopEvent(outcome=answer_response.output_text)

Zusammenfassung

Die DRIFT-Suche stellt eine interessante Strategie dar, um die Breite der globalen Suche mit der Präzision der lokalen Suche in Einklang zu bringen. Indem mit dem Kontext auf Group-Ebene begonnen und schrittweise iterative Folgeabfragen durchgeführt werden, wird der Rechenaufwand für die Verarbeitung aller Group-Berichte vermieden und gleichzeitig eine umfassende Abdeckung gewährleistet.

Es gibt jedoch Raum für mehrere Verbesserungen. Die aktuelle Implementierung behandelt alle Zwischenantworten gleich, aber eine Filterung basierend auf ihren Konfidenzwerten könnte die Qualität der endgültigen Antworten verbessern und Rauschen reduzieren. Ebenso könnten Folgeanfragen vor der Ausführung nach Relevanz oder potenziellem Informationsgewinn geordnet werden, um sicherzustellen, dass die vielversprechendsten Leads zuerst verfolgt werden.

Eine weitere vielversprechende Verbesserung wäre die Einführung eines Abfrageverfeinerungsschritts, der mithilfe eines LLM alle generierten Folgeabfragen analysiert, ähnliche gruppiert, um redundante Suchvorgänge zu vermeiden, und Abfragen herausfiltert, die wahrscheinlich keine nützlichen Informationen liefern. Dies könnte die Anzahl lokaler Suchen erheblich reduzieren und gleichzeitig die Antwortqualität beibehalten.

Die vollständige Implementierung ist verfügbar unter GitHub für diejenigen, die daran interessiert sind, mit diesen Verbesserungen zu experimentieren oder DRIFT für ihre eigenen Anwendungsfälle anzupassen.

Von admin

Schreibe einen Kommentar

Deine E-Mail-Adresse wird nicht veröffentlicht. Erforderliche Felder sind mit * markiert