# SPDX-License-Identifier: MIT
# Copyright (c) 2025 LlamaIndex Inc.

from __future__ import annotations

import asyncio
from typing import Any, AsyncGenerator

from .context import Context
from .errors import WorkflowRuntimeError
from .events import Event, StopEvent, InternalDispatchEvent
from .types import RunResultT


class WorkflowHandler(asyncio.Future[RunResultT]):
    """
    Handle a running workflow: await results, stream events, access context, or cancel.

    Instances are returned by [Workflow.run][workflows.workflow.Workflow.run].
    They can be awaited for the final result and support streaming intermediate
    events via [stream_events][workflows.handler.WorkflowHandler.stream_events].

    See Also:
        - [Context][workflows.context.context.Context]
        - [StopEvent][workflows.events.StopEvent]
    """

    def __init__(
        self,
        *args: Any,
        ctx: Context | None = None,
        run_id: str | None = None,
        **kwargs: Any,
    ) -> None:
        """
        Create a handler.

        Args:
            *args: Positional arguments forwarded to `asyncio.Future`.
            ctx (Context | None): Context of the run this handler tracks.
            run_id (str | None): Identifier of the run, if any.
            **kwargs: Keyword arguments forwarded to `asyncio.Future`.
        """
        super().__init__(*args, **kwargs)
        self.run_id = run_id
        self._ctx = ctx
        # Set once the terminal StopEvent has been taken off the streaming
        # queue; guards against streaming a second time.
        self._all_events_consumed = False

    @property
    def ctx(self) -> Context | None:
        """The workflow [Context][workflows.context.context.Context] for this run."""
        return self._ctx

    def __str__(self) -> str:
        """
        Return the string form of the workflow result.

        This delegates to `Future.result()`, so it raises if the future is not
        done yet, was cancelled, or completed with an exception.
        """
        return str(self.result())

    def is_done(self) -> bool:
        """Return True when the workflow has completed."""
        return self.done()

    async def stream_events(
        self, expose_internal: bool = False
    ) -> AsyncGenerator[Event, None]:
        """
        Stream events from the workflow execution as they occur.

        This method provides real-time access to events generated during workflow
        execution, allowing for monitoring and processing of intermediate results.
        Events are yielded in the order they are generated by the workflow.

        The stream includes all events written to the context's streaming queue,
        and terminates when a [StopEvent][workflows.events.StopEvent] is
        encountered, indicating the workflow has completed.

        Args:
            expose_internal (bool): Whether to expose internal events.

        Returns:
            AsyncGenerator[Event, None]: An async generator that yields Event
            objects as they are produced by the workflow.

        Raises:
            ValueError: If the context is not set on the handler.
            WorkflowRuntimeError: If all events have already been consumed by a
                previous call to `stream_events()` on the same handler instance.

        Examples:
            ```python
            handler = workflow.run()

            # Stream and process events in real-time
            async for event in handler.stream_events():
                if isinstance(event, StopEvent):
                    print(f"Workflow completed with result: {event.result}")
                else:
                    print(f"Received event: {event}")

            # Get final result
            result = await handler
            ```

        Note:
            Events can only be streamed once per handler instance. Subsequent
            calls to `stream_events()` will raise a WorkflowRuntimeError.
            Because this is an async generator, the exceptions above are raised
            when iteration starts, not when the method is called.
        """
        ctx = self.ctx
        if ctx is None:
            raise ValueError("Context is not set!")

        # Check if we already consumed all the streamed events
        if self._all_events_consumed:
            msg = "All the streamed events have already been consumed."
            raise WorkflowRuntimeError(msg)

        while True:
            ev = await ctx.streaming_queue.get()
            if isinstance(ev, InternalDispatchEvent) and not expose_internal:
                continue

            if isinstance(ev, StopEvent):
                # Mark the stream exhausted *before* handing out the terminal
                # event. A consumer that stops iterating on the StopEvent never
                # resumes this generator, so code placed after the yield would
                # not run and a later call would wait on the queue forever
                # instead of raising WorkflowRuntimeError.
                self._all_events_consumed = True
                yield ev
                break

            yield ev

    async def cancel_run(self) -> None:
        """Cancel the running workflow.

        Signals the underlying context to raise
        [WorkflowCancelledByUser][workflows.errors.WorkflowCancelledByUser],
        which will be caught by the workflow and gracefully end the run.
        Does nothing when the handler has no context.

        Examples:
            ```python
            handler = workflow.run()
            await handler.cancel_run()
            ```
        """
        if self.ctx:
            self.ctx._cancel_flag.set()
            # Yield control to the event loop once so other tasks get a
            # chance to run after the flag has been set.
            await asyncio.sleep(0)