131 lines
4.3 KiB
Python
Executable file
131 lines
4.3 KiB
Python
Executable file
# SPDX-License-Identifier: MIT
|
|
# Copyright (c) 2025 LlamaIndex Inc.
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
from typing import Any, AsyncGenerator
|
|
|
|
from .context import Context
|
|
from .errors import WorkflowRuntimeError
|
|
from .events import Event, StopEvent, InternalDispatchEvent
|
|
from .types import RunResultT
|
|
|
|
|
|
class WorkflowHandler(asyncio.Future[RunResultT]):
    """
    Handle a running workflow: await results, stream events, access context, or cancel.

    Instances are returned by [Workflow.run][workflows.workflow.Workflow.run].
    They can be awaited for the final result and support streaming intermediate
    events via [stream_events][workflows.handler.WorkflowHandler.stream_events].

    Attributes:
        run_id (str | None): Identifier passed in at construction time, if any.

    See Also:
        - [Context][workflows.context.context.Context]
        - [StopEvent][workflows.events.StopEvent]
    """

    def __init__(
        self,
        *args: Any,
        ctx: Context | None = None,
        run_id: str | None = None,
        **kwargs: Any,
    ) -> None:
        """
        Create a handler.

        Args:
            *args: Positional arguments forwarded to `asyncio.Future`.
            ctx (Context | None): The context of the run this handler tracks.
            run_id (str | None): Identifier of the run.
            **kwargs: Keyword arguments forwarded to `asyncio.Future`.
        """
        super().__init__(*args, **kwargs)
        self.run_id = run_id
        self._ctx = ctx
        # Set once the terminal StopEvent has been handed to a stream consumer.
        self._all_events_consumed = False

    @property
    def ctx(self) -> Context | None:
        """The workflow [Context][workflows.context.context.Context] for this run."""
        return self._ctx

    def __str__(self) -> str:
        """
        Return the workflow result as a string.

        While no result is available (the run is still pending, was cancelled,
        or failed with an exception) the `asyncio.Future` representation is
        returned instead, so that printing or logging a handler never raises.
        """
        # `cancelled()` must be checked before `exception()`: the latter raises
        # CancelledError on a cancelled future.
        if self.done() and not self.cancelled() and self.exception() is None:
            return str(self.result())
        return super().__repr__()

    def is_done(self) -> bool:
        """Return True when the workflow has completed."""
        return self.done()

    async def stream_events(
        self, expose_internal: bool = False
    ) -> AsyncGenerator[Event, None]:
        """
        Stream events from the workflow execution as they occur.

        This method provides real-time access to events generated during workflow
        execution, allowing for monitoring and processing of intermediate results.
        Events are yielded in the order they are generated by the workflow.

        The stream includes all events written to the context's streaming queue,
        and terminates when a [StopEvent][workflows.events.StopEvent] is
        encountered, indicating the workflow has completed.

        Args:
            expose_internal (bool): Whether to expose internal events.

        Yields:
            Event: Events in the order they are read from the streaming queue,
                ending with the StopEvent.

        Raises:
            ValueError: If the context is not set on the handler.
            WorkflowRuntimeError: If all events have already been consumed by a
                previous call to `stream_events()` on the same handler instance.

        Examples:
            ```python
            handler = workflow.run()

            # Stream and process events in real-time
            async for event in handler.stream_events():
                if isinstance(event, StopEvent):
                    print(f"Workflow completed with result: {event.result}")
                else:
                    print(f"Received event: {event}")

            # Get final result
            result = await handler
            ```

        Note:
            Events can only be streamed once per handler instance. Subsequent
            calls to `stream_events()` will raise a WorkflowRuntimeError.
            Because this is an async generator, the errors above are raised
            when iteration starts, not when `stream_events()` is called.
        """
        ctx = self.ctx
        if ctx is None:
            raise ValueError("Context is not set!")

        # Check if we already consumed all the streamed events
        if self._all_events_consumed:
            msg = "All the streamed events have already been consumed."
            raise WorkflowRuntimeError(msg)

        while True:
            ev = await ctx.streaming_queue.get()

            if isinstance(ev, InternalDispatchEvent) and not expose_internal:
                continue

            is_stop = isinstance(ev, StopEvent)
            if is_stop:
                # Record exhaustion BEFORE yielding: a consumer that stops
                # iterating on the StopEvent never resumes this generator, and
                # a later call must raise instead of waiting on an empty queue.
                self._all_events_consumed = True

            yield ev

            if is_stop:
                break

    async def cancel_run(self) -> None:
        """Cancel the running workflow.

        Signals the underlying context to raise
        [WorkflowCancelledByUser][workflows.errors.WorkflowCancelledByUser],
        which will be caught by the workflow and gracefully end the run.

        Does nothing when the handler has no context.

        Examples:
            ```python
            handler = workflow.run()
            await handler.cancel_run()
            ```
        """
        ctx = self.ctx
        if ctx is not None:
            ctx._cancel_flag.set()
            # Yield control to the event loop once after setting the flag.
            await asyncio.sleep(0)
|