Wenn Sie den Workflow vom Terminal aus starten, können Sie problemlos erkennen, welcher Schritt ausgeführt wird und welche Protokollierung wir in diese Schritte eingefügt haben.
Wir können die Human-in-the-Loop-Interaktion auch ermöglichen, indem wir einfach user_feedback = enter()
im Workflow. Dadurch wird der Workflow angehalten und auf die Benutzereingabe gewartet (siehe das Human-in-the-Loop-Beispiel in diesem offiziellen Llamaindex Notizbuch). Um jedoch die gleiche Funktionalität in einer benutzerfreundlichen Oberfläche erreichen zu können, sind zusätzliche Änderungen am ursprünglichen Workflow erforderlich.
Die Ausführung eines Workflows kann lange dauern. Um die Benutzerfreundlichkeit zu verbessern, hat Llamaindex eine Möglichkeit bereitgestellt, Streaming-Ereignisse zu senden, um den Fortschritt des Workflows anzuzeigen, wie im Pocket book angezeigt. HierIn meinem Workflow definiere ich einen WorkflowStreamingEvent
Klasse, um nützliche Informationen zur Ereignisnachricht einzuschließen, z. B. den Typ des Ereignisses und von welchem Schritt es gesendet wird:
class WorkflowStreamingEvent(BaseModel):
event_type: Literal("server_message", "request_user_input") = Discipline(
..., description="Kind of the occasion"
)
event_sender: str = Discipline(
..., description="Sender (workflow step identify) of the occasion"
)
event_content: Dict(str, Any) = Discipline(..., description="Content material of the occasion")
Um das Senden von Streaming-Ereignissen zu ermöglichen, muss der Workflow-Schritt Zugriff auf den freigegebenen Kontext haben. Dies geschieht durch Hinzufügen @step(pass_context=True)
Dekorator zur Schrittdefinition. Dann können wir in der Schrittdefinition Ereignismeldungen über den Fortschritt durch den Kontext senden. Zum Beispiel in der tavily_query()
Schritt:
@step(pass_context=True)
async def tavily_query(self, ctx: Context, ev: StartEvent) -> TavilyResultsEvent:
ctx.information("research_topic") = ev.user_query
question = f"arxiv papers in regards to the state-of-the-art of {ev.user_query}"
ctx.write_event_to_stream(
Occasion(
msg=WorkflowStreamingEvent(
event_type="server_message",
event_sender=examine.currentframe().f_code.co_name,
event_content={"message": f"Querying Tavily with: '{question}'"},
).model_dump()
)
)
In diesem Beispiel setzen wir die event_type
zu sein “server_message”
. Das bedeutet, dass es sich um eine Replace-Nachricht handelt und keine Benutzeraktion erforderlich ist. Wir haben einen anderen Ereignistyp "request_user_input"
das darauf hinweist, dass eine Benutzereingabe erforderlich ist. Beispielsweise in der gather_feedback_outline()
Schritt im Workflow: Nach dem Generieren der Folientextgliederung aus der ursprünglichen Papierzusammenfassung wird eine Nachricht gesendet, in der der Benutzer aufgefordert wird, dem Gliederungstext seine Zustimmung zu erteilen und Suggestions dazu abzugeben:
@step(pass_context=True)
async def gather_feedback_outline(
self, ctx: Context, ev: OutlineEvent
) -> OutlineFeedbackEvent | OutlineOkEvent:
"""Current person the unique paper abstract and the outlines generated, collect suggestions from person"""
...# Ship a particular occasion indicating that person enter is required
ctx.write_event_to_stream(
Occasion(
msg=json.dumps(
{
"event_type": "request_user_input",
"event_sender": examine.currentframe().f_code.co_name,
"event_content": {
"abstract": ev.abstract,
"define": ev.define.dict(),
"message": "Do you approve this define? If not, please present suggestions.",
},
}
)
)
)
...
Diese Ereignisse werden in der Backend-API und der Frontend-Logik unterschiedlich behandelt, was ich in den späteren Abschnitten dieses Artikels ausführlich beschreiben werde.
Beim Senden einer "request_user_input"
Ereignis an den Benutzer, wir wollen nur mit dem nächsten Schritt fortfahren nach wir haben die Benutzereingabe erhalten. Wie im Workflow-Diagramm oben gezeigt, geht es entweder zum outlines_with_layout()
Schritt, wenn der Benutzer die Gliederung genehmigt, oder an den summary2outline()
wiederholen Sie den Schritt, wenn der Benutzer nicht zustimmt.
Dies wird erreicht durch die Future()
Objekt aus Pythons asyncio
Bibliothek. In der SlideGenerationWorkflow
Klasse setzen wir ein Attribut self.user_input_future = asyncio.Future()
das kann man erwarten in der gather_feedback_outline()
Schritt. Die anschließende Ausführung des Workflows hängt vom Inhalt des Benutzerfeedbacks ab:
@step(pass_context=True)
async def gather_feedback_outline(
self, ctx: Context, ev: OutlineEvent
) -> OutlineFeedbackEvent | OutlineOkEvent:
...# Anticipate person enter
if not self.user_input_future.accomplished():
user_response = await self.user_input_future
logger.information(f"gather_feedback_outline: Acquired person response: {user_response}")
# Course of user_response, which must be a JSON string
attempt:
response_data = json.masses(user_response)
approval = response_data.get("approval", "").decrease().strip()
suggestions = response_data.get("suggestions", "").strip()
besides json.JSONDecodeError:
# Deal with invalid JSON
logger.error("Invalid person response format")
increase Exception("Invalid person response format")
if approval == ":materials/thumb_up:":
return OutlineOkEvent(abstract=ev.abstract, define=ev.define)
else:
return OutlineFeedbackEvent(
abstract=ev.abstract, define=ev.define, suggestions=suggestions
)
Wir richten das Backend mit fastAPI ein, stellen einen POST-Endpunkt zur Bearbeitung von Anfragen bereit und initiieren den Workflow-Lauf. Die asynchrone Funktion run_workflow_endpoint()
dauert ResearchTopic
als Eingabe. In der Funktion wird ein asynchroner Generator event_generator()
definiert, wodurch eine Aufgabe zum Ausführen des Workflows erstellt wird und die Ereignisse im Verlauf des Workflows an den Consumer gestreamt werden. Wenn der Workflow abgeschlossen ist, werden auch die endgültigen Dateiergebnisse an den Consumer gestreamt.
class ResearchTopic(BaseModel):
question: str = Discipline(..., instance="instance question")@app.submit("/run-slide-gen")
async def run_workflow_endpoint(matter: ResearchTopic):
workflow_id = str(uuid.uuid4())
wf = SummaryAndSlideGenerationWorkflow(wid=workflow_id, timeout=2000, verbose=True)
wf.add_workflows(
summary_gen_wf=SummaryGenerationWorkflow(
wid=workflow_id, timeout=800, verbose=True
)
)
wf.add_workflows(
slide_gen_wf=SlideGenerationWorkflow(
wid=workflow_id, timeout=1200, verbose=True
)
)
async def event_generator():
loop = asyncio.get_running_loop()
logger.debug(f"event_generator: loop id {id(loop)}")
yield f"{json.dumps({'workflow_id': workflow_id})}nn"
job = asyncio.create_task(wf.run(user_query=matter.question))
logger.debug(f"event_generator: Created job {job}")
attempt:
async for ev in wf.stream_events():
logger.information(f"Sending message to frontend: {ev.msg}")
yield f"{ev.msg}nn"
await asyncio.sleep(0.1) # Small sleep to make sure correct chunking
final_result = await job
# Assemble the obtain URL
download_pptx_url = f"http://backend:80/download_pptx/{workflow_id}"
download_pdf_url = f"http://backend:80/download_pdf/{workflow_id}"
final_result_with_url = {
"consequence": final_result,
"download_pptx_url": download_pptx_url,
"download_pdf_url": download_pdf_url,
}
yield f"{json.dumps({'final_result': final_result_with_url})}nn"
besides Exception as e:
error_message = f"Error in workflow: {str(e)}"
logger.error(error_message)
yield f"{json.dumps({'occasion': 'error', 'message': error_message})}nn"
lastly:
# Clear up
workflows.pop(workflow_id, None)
return StreamingResponse(event_generator(), media_type="textual content/event-stream")
Zusätzlich zu diesem Endpunkt gibt es Endpunkte zum Empfangen von Benutzereingaben vom Consumer und zum Verarbeiten von Dateidownloadanforderungen. Da jedem Workflow eine eindeutige Workflow-ID zugewiesen ist, können wir die vom Consumer empfangenen Benutzereingaben dem richtigen Workflow zuordnen. Durch Aufrufen des set_result()
auf die wartenden Future
kann die Ausführung des ausstehenden Workflows fortgesetzt werden.
@app.submit("/submit_user_input")
async def submit_user_input(information: dict = Physique(...)):
workflow_id = information.get("workflow_id")
user_input = information.get("user_input")
wf = workflows.get(workflow_id)
if wf and wf.user_input_future:
loop = wf.user_input_future.get_loop() # Get the loop from the long run
logger.information(f"submit_user_input: wf.user_input_future loop id {id(loop)}")
if not wf.user_input_future.accomplished():
loop.call_soon_threadsafe(wf.user_input_future.set_result, user_input)
logger.information("submit_user_input: set_result referred to as")
else:
logger.information("submit_user_input: future already accomplished")
return {"standing": "enter acquired"}
else:
increase HTTPException(
status_code=404, element="Workflow not discovered or future not initialized"
)
Der Obtain-Endpunkt identifiziert anhand der Workflow-ID auch, wo sich die endgültige Datei befindet.
@app.get("/download_pptx/{workflow_id}")
async def download_pptx(workflow_id: str):
file_path = (
Path(settings.WORKFLOW_ARTIFACTS_PATH)
/ "SlideGenerationWorkflow"
/ workflow_id
/ "remaining.pptx"
)
if file_path.exists():
return FileResponse(
path=file_path,
media_type="software/vnd.openxmlformats-officedocument.presentationml.presentation",
filename=f"remaining.pptx",
)
else:
increase HTTPException(status_code=404, element="File not discovered")
Auf der Frontend-Seite, nachdem der Benutzer das Forschungsthema über st.text_input()
wird in einem Hintergrund-Thread in einer neuen Ereignisschleife ein lang laufender Prozess gestartet, um die gestreamten Ereignisse vom Backend zu empfangen, ohne den Relaxation der Seite zu beeinträchtigen:
def start_long_running_task(url, payload, message_queue, user_input_event):
attempt:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(
get_stream_data(url, payload, message_queue, user_input_event)
)
loop.shut()
besides Exception as e:
message_queue.put(("error", f"Exception in background thread: {str(e)}"))...
def principal():
...
with st.sidebar:
with st.kind(key="slide_gen_form"):
question = st.text_input(
"Enter the subject of your analysis:",
)
submit_button = st.form_submit_button(label="Submit")
if submit_button:
# Reset the workflow_complete flag for a brand new workflow
st.session_state.workflow_complete = False
# Begin the long-running job in a separate thread
if (
st.session_state.workflow_thread is None
or not st.session_state.workflow_thread.is_alive()
):
st.write("Beginning the background thread...")
st.session_state.workflow_thread = threading.Thread(
goal=start_long_running_task,
args=(
"http://backend:80/run-slide-gen",
{"question": question},
st.session_state.message_queue,
st.session_state.user_input_event,
),
)
st.session_state.workflow_thread.begin()
st.session_state.received_lines = ()
else:
st.write("Background thread is already working.")
Die vom Backend gestreamten Ereignisdaten werden abgerufen von httpx.AsyncClient
und zur weiteren Verarbeitung in eine Nachrichtenwarteschlange gestellt. Abhängig vom Ereignistyp werden unterschiedliche Informationen extrahiert. Für den Ereignistyp “request_user_input”
wird der Thread ebenfalls angehalten, bis die Benutzereingabe erfolgt.
async def fetch_streaming_data(url: str, payload: dict = None):
async with httpx.AsyncClient(timeout=1200.0) as shopper:
async with shopper.stream("POST", url=url, json=payload) as response:
async for line in response.aiter_lines():
if line:
yield lineasync def get_stream_data(url, payload, message_queue, user_input_event):
# message_queue.put(("message", "Beginning to fetch streaming information..."))
data_json = None
async for information in fetch_streaming_data(url, payload):
if information:
attempt:
data_json = json.masses(information)
if "workflow_id" in data_json:
# Ship workflow_id to principal thread
message_queue.put(("workflow_id", data_json("workflow_id")))
proceed
elif "final_result" in data_json:
# Ship final_result to principal thread
message_queue.put(("final_result", data_json("final_result")))
proceed
event_type = data_json.get("event_type")
event_sender = data_json.get("event_sender")
event_content = data_json.get("event_content")
if event_type in ("request_user_input"):
# Ship the message to the primary thread
message_queue.put(("user_input_required", data_json))
# Wait till person enter is offered
user_input_event.wait()
user_input_event.clear()
proceed
else:
# Ship the road to the primary thread
message_queue.put(("message", format_workflow_info(data_json)))
besides json.JSONDecodeError: # todo: is that this essential?
message_queue.put(("message", information))
if data_json and "final_result" in data_json or "final_result" in str(information):
break # Cease processing after receiving the ultimate consequence
Wir speichern die Nachrichten im st.session_state
und verwenden Sie eine st.expander()
um diese gestreamten Daten anzuzeigen und zu aktualisieren.
if st.session_state.received_lines:
with expander_placeholder.container():
# Create or replace the expander with the most recent truncated line
expander = st.expander(st.session_state.expander_label)
for line in st.session_state.received_lines:
expander.write(line)
expander.divider()
Um sicherzustellen, dass die Benutzeroberfläche reaktionsfähig bleibt und die Ereignismeldungen anzeigt, wenn sie in einem Hintergrundthread verarbeitet werden, verwenden wir eine benutzerdefinierte automatische Aktualisierung Komponente zum Aktualisieren der Seite in festgelegten Intervallen:
if not st.session_state.workflow_complete:
st_autorefresh(interval=2000, restrict=None, key="data_refresh")
Wenn das gestreamte Ereignis vom Typ ist “request_user_input”
zeigen wir die zugehörigen Informationen in einem separaten Container an und sammeln Benutzerfeedback. Da es bei einem Workflow-Lauf mehrere Ereignisse geben kann, die Benutzereingaben erfordern, platzieren wir sie in einer Nachrichtenwarteschlange und stellen sicher, dass dem st.suggestions()
, st.text_area()
Und st.button()
die mit jedem Ereignis verknüpft sind, um sicherzustellen, dass die Widgets sich nicht gegenseitig stören:
def gather_outline_feedback(placeholder):
container = placeholder.container()
with container:
if st.session_state.user_input_required:
information = st.session_state.user_input_prompt
event_type = information.get("event_type")
if event_type == "request_user_input":
abstract = information.get("event_content").get("abstract")
define = information.get("event_content").get("define")
prompt_message = information.get("event_content").get(
"message", "Please overview the define."
)# show the content material for person enter
st.markdown("## Unique Abstract:")
st.text_area("Abstract", abstract, disabled=True, peak=400)
st.divider()
st.markdown("## Generated Slide Define:")
st.json(define)
st.write(prompt_message)
# Outline distinctive keys for widgets
current_prompt = st.session_state.prompt_counter
approval_key = f"approval_state_{current_prompt}"
feedback_key = f"user_feedback_{current_prompt}"
# Show the approval suggestions widget
approval = st.suggestions("thumbs", key=approval_key)
st.write(f"Present Approval state is: {approval}")
logging.information(f"Present Approval state is: {approval}")
# Show the suggestions textual content space
suggestions = st.text_area(
"Please present suggestions you probably have any:", key=feedback_key
)
# Deal with the submission of person response
if st.button(
"Submit Suggestions", key=f"submit_response_{current_prompt}"
):
if not st.session_state.user_response_submitted:
# Retrieve approval and suggestions utilizing distinctive keys
approval_state = st.session_state.get(approval_key)
user_feedback = st.session_state.get(feedback_key, "")
# Guarantee approval_state is legitimate
if approval_state not in (0, 1):
st.error("Please choose an approval possibility.")
return
user_response = {
"approval": (
":materials/thumb_down:"
if approval_state == 0
else ":materials/thumb_up:"
),
"suggestions": user_feedback,
}
# Ship the person's response to the backend
attempt:
response = requests.submit(
"http://backend:80/submit_user_input",
json={
"workflow_id": st.session_state.workflow_id,
"user_input": json.dumps(user_response),
},
)
response.raise_for_status()
logging.information(
f"Backend response for submitting approval: {response.status_code}"
)
besides requests.RequestException as e:
st.error(f"Didn't submit person enter: {str(e)}")
return
...
Am Ende, wenn der Workflow-Lauf abgeschlossen ist, erhält der Frontend-Consumer eine Antwort, die den Pfad zu den endgültig generierten Dateien enthält (dasselbe Foliendeck im PDF-Format zum Rendern in der Benutzeroberfläche und im PPTX-Format zum Herunterladen wie das Endergebnis). Wir zeigen die PDF-Datei an und erstellen eine Schaltfläche zum Herunterladen der PPTX-Datei:
if "download_url_pdf" in st.session_state and st.session_state.download_url_pdf:
download_url_pdf = st.session_state.download_url_pdf
attempt:
# Fetch the PDF content material
pdf_response = requests.get(download_url_pdf)
pdf_response.raise_for_status()
st.session_state.pdf_data = pdf_response.content materialst.markdown("### Generated Slide Deck:")
# Show the PDF utilizing an iframe
st.markdown(
f'<iframe src="information:software/pdf;base64,{base64.b64encode(st.session_state.pdf_data).decode()}" width="100%" peak="600px" kind="software/pdf"></iframe>',
unsafe_allow_html=True,
)
besides Exception as e:
st.error(f"Didn't load the PDF file: {str(e)}")
# Present the obtain button for PPTX if obtainable
if (
"download_url_pptx" in st.session_state
and st.session_state.download_url_pptx
):
download_url_pptx = st.session_state.download_url_pptx
attempt:
# Fetch the PPTX content material
pptx_response = requests.get(download_url_pptx)
pptx_response.raise_for_status()
pptx_data = pptx_response.content material
st.download_button(
label="Obtain Generated PPTX",
information=pptx_data,
file_name="generated_slides.pptx",
mime="software/vnd.openxmlformats-officedocument.presentationml.presentation",
)
besides Exception as e:
st.error(f"Didn't load the PPTX file: {str(e)}")
Wir erstellen eine Multi-Service-Docker-Anwendung mit docker-compose
um die Frontend- und Backend-Apps auszuführen.
model: '3.8'providers:
backend:
construct:
context: ./backend
args:
- --no-cache
ports:
- "8000:80"
networks:
- app-network
volumes:
- .env:/app/.env
- ./information:/app/information
- ./workflow_artifacts:/app/workflow_artifacts
- ~/.azure:/root/.azure
frontend:
construct:
context: ./frontend
args:
- --no-cache
ports:
- "8501:8501"
networks:
- app-network
networks:
app-network:
Das ist es! Lauf einfach docker-compose up
und wir verfügen jetzt über eine App, die einen Forschungsworkflow basierend auf der Eingabeabfrage des Benutzers ausführen, den Benutzer während der Ausführung um Suggestions bitten und dem Benutzer das Endergebnis anzeigen kann.