From b29db6755c657e5a0ec2c824cf5f8c5d1f3c5155 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Thu, 26 Dec 2024 18:04:26 -0300 Subject: [PATCH 1/5] refactor: add create_stream_tokens_event_manager for handling streaming events --- src/backend/base/langflow/events/event_manager.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/src/backend/base/langflow/events/event_manager.py b/src/backend/base/langflow/events/event_manager.py index d293e338e70d..9ae4e9172843 100644 --- a/src/backend/base/langflow/events/event_manager.py +++ b/src/backend/base/langflow/events/event_manager.py @@ -93,3 +93,13 @@ def create_default_event_manager(queue): manager.register_event("on_build_start", "build_start") manager.register_event("on_build_end", "build_end") return manager + + +def create_stream_tokens_event_manager(queue): + """Create an EventManager on ``queue`` with the message, token, end and error events used for token streaming.""" + manager = EventManager(queue) + manager.register_event("on_message", "add_message") + manager.register_event("on_token", "token") + manager.register_event("on_end", "end") + manager.register_event("on_error", "error") + return manager From 6c0f5c6148df24c34c531f77da82b754b01b4ba2 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Thu, 26 Dec 2024 18:04:58 -0300 Subject: [PATCH 2/5] feat: integrate EventManager into run_graph_internal for enhanced event handling - Added EventManager import and parameter to run_graph_internal function. - Updated function call to include event_manager for improved event management during graph execution. 
--- src/backend/base/langflow/processing/process.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/backend/base/langflow/processing/process.py b/src/backend/base/langflow/processing/process.py index 4b570966f08f..3cc4bc72d929 100644 --- a/src/backend/base/langflow/processing/process.py +++ b/src/backend/base/langflow/processing/process.py @@ -15,6 +15,7 @@ from langflow.api.v1.schemas import InputValueRequest from langflow.graph.graph.base import Graph from langflow.graph.schema import RunOutputs + from langflow.services.event_manager import EventManager class Result(BaseModel): @@ -30,6 +31,7 @@ async def run_graph_internal( session_id: str | None = None, inputs: list[InputValueRequest] | None = None, outputs: list[str] | None = None, + event_manager: EventManager | None = None, ) -> tuple[list[RunOutputs], str]: """Run the graph and generate the result.""" inputs = inputs or [] @@ -55,6 +57,7 @@ async def run_graph_internal( stream=stream, session_id=effective_session_id or "", fallback_to_env_vars=fallback_to_env_vars, + event_manager=event_manager, ) return run_outputs, effective_session_id From b2134bfac0314e8efd25ed2565c087660c89515c Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Thu, 26 Dec 2024 18:05:11 -0300 Subject: [PATCH 3/5] feat: enhance Graph class with event_manager parameter - Added event_manager parameter to multiple methods in the Graph class to facilitate better event management during graph execution. - Updated process and run methods to include event_manager, ensuring it is passed through to relevant function calls. - Improved documentation for methods to reflect the new event_manager parameter. 
--- src/backend/base/langflow/graph/graph/base.py | 20 +++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/src/backend/base/langflow/graph/graph/base.py b/src/backend/base/langflow/graph/graph/base.py index ced2c2af4ffb..15215dd624b4 100644 --- a/src/backend/base/langflow/graph/graph/base.py +++ b/src/backend/base/langflow/graph/graph/base.py @@ -726,6 +726,7 @@ async def _run( stream: bool, session_id: str, fallback_to_env_vars: bool, + event_manager: EventManager | None = None, ) -> list[ResultData | None]: """Runs the graph with the given inputs. @@ -737,6 +738,7 @@ async def _run( stream (bool): Whether to stream the results or not. session_id (str): The session ID for the graph. fallback_to_env_vars (bool): Whether to fallback to environment variables. + event_manager (EventManager | None): The event manager for the graph. Returns: List[Optional["ResultData"]]: The outputs of the graph. @@ -770,7 +772,11 @@ async def _run( try: # Prioritize the webhook component if it exists start_component_id = find_start_component_id(self._is_input_vertices) - await self.process(start_component_id=start_component_id, fallback_to_env_vars=fallback_to_env_vars) + await self.process( + start_component_id=start_component_id, + fallback_to_env_vars=fallback_to_env_vars, + event_manager=event_manager, + ) self.increment_run_count() except Exception as exc: self._end_all_traces_async(error=exc) @@ -804,6 +810,7 @@ async def arun( session_id: str | None = None, stream: bool = False, fallback_to_env_vars: bool = False, + event_manager: EventManager | None = None, ) -> list[RunOutputs]: """Runs the graph with the given inputs. @@ -815,6 +822,7 @@ async def arun( session_id (Optional[str], optional): The session ID for the graph. Defaults to None. stream (bool, optional): Whether to stream the results or not. Defaults to False. fallback_to_env_vars (bool, optional): Whether to fallback to environment variables. Defaults to False. 
+ event_manager (EventManager | None): The event manager for the graph. Returns: List[RunOutputs]: The outputs of the graph. @@ -847,6 +855,7 @@ async def arun( stream=stream, session_id=session_id or "", fallback_to_env_vars=fallback_to_env_vars, + event_manager=event_manager, ) run_output_object = RunOutputs(inputs=run_inputs, outputs=run_outputs) logger.debug(f"Run outputs: {run_output_object}") @@ -1494,7 +1503,13 @@ def get_vertices_with_target(self, vertex_id: str) -> list[Vertex]: vertices.append(vertex) return vertices - async def process(self, *, fallback_to_env_vars: bool, start_component_id: str | None = None) -> Graph: + async def process( + self, + *, + fallback_to_env_vars: bool, + start_component_id: str | None = None, + event_manager: EventManager | None = None, + ) -> Graph: """Processes the graph with vertices in each layer run in parallel.""" first_layer = self.sort_vertices(start_component_id=start_component_id) vertex_task_run_count: dict[str, int] = {} @@ -1520,6 +1535,7 @@ async def process(self, *, fallback_to_env_vars: bool, start_component_id: str | fallback_to_env_vars=fallback_to_env_vars, get_cache=chat_service.get_cache, set_cache=chat_service.set_cache, + event_manager=event_manager, ), name=f"{vertex.display_name} Run {vertex_task_run_count.get(vertex_id, 0)}", ) From 3602bbc4990c3ecd56184ad2b51ec22833a5d9cd Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Thu, 26 Dec 2024 18:05:23 -0300 Subject: [PATCH 4/5] feat: implement streaming support in flow execution with EventManager integration - Added support for streaming responses in the simplified_run_flow endpoint, allowing real-time event handling during flow execution. - Introduced consume_and_yield and run_flow_generator functions to manage event consumption and client communication. - Integrated EventManager for enhanced event tracking, including success and error notifications. - Updated endpoint documentation to reflect new streaming capabilities and parameters. 
- Improved error handling and logging for better debugging and client disconnection management. --- src/backend/base/langflow/api/v1/endpoints.py | 205 +++++++++++++----- 1 file changed, 145 insertions(+), 60 deletions(-) diff --git a/src/backend/base/langflow/api/v1/endpoints.py b/src/backend/base/langflow/api/v1/endpoints.py index d150bdb1576d..c30efda58843 100644 --- a/src/backend/base/langflow/api/v1/endpoints.py +++ b/src/backend/base/langflow/api/v1/endpoints.py @@ -2,6 +2,7 @@ import asyncio import time +from collections.abc import AsyncGenerator from http import HTTPStatus from typing import TYPE_CHECKING, Annotated from uuid import UUID @@ -9,6 +10,7 @@ import sqlalchemy as sa from fastapi import APIRouter, BackgroundTasks, Body, Depends, HTTPException, Request, UploadFile, status from fastapi.encoders import jsonable_encoder +from fastapi.responses import StreamingResponse from loguru import logger from sqlmodel import select @@ -26,6 +28,7 @@ ) from langflow.custom.custom_component.component import Component from langflow.custom.utils import build_custom_component_template, get_instance_name, update_component_build_config +from langflow.events.event_manager import create_stream_tokens_event_manager from langflow.exceptions.api import APIException, InvalidChatInputError from langflow.exceptions.serialization import SerializationError from langflow.graph.graph.base import Graph @@ -47,6 +50,7 @@ from langflow.utils.version import get_version_info if TYPE_CHECKING: + from langflow.services.event_manager import EventManager from langflow.services.settings.service import SettingsService router = APIRouter(tags=["Base"]) @@ -92,6 +96,7 @@ async def simple_run_flow( *, stream: bool = False, api_key_user: User | None = None, + event_manager: EventManager | None = None, ): if input_request.input_value is not None and input_request.tweaks is not None: validate_input_and_tweaks(input_request) @@ -131,6 +136,7 @@ async def simple_run_flow( inputs=inputs, 
outputs=outputs, stream=stream, + event_manager=event_manager, ) return RunResponse(outputs=task_result, session_id=session_id) @@ -145,6 +151,7 @@ async def simple_run_flow_task( *, stream: bool = False, api_key_user: User | None = None, + event_manager: EventManager | None = None, ): """Run a flow task as a BackgroundTask, therefore it should not throw exceptions.""" try: @@ -153,12 +160,96 @@ async def simple_run_flow_task( input_request=input_request, stream=stream, api_key_user=api_key_user, + event_manager=event_manager, ) except Exception: # noqa: BLE001 logger.exception(f"Error running flow {flow.id} task") +async def consume_and_yield(queue: asyncio.Queue, client_consumed_queue: asyncio.Queue) -> AsyncGenerator: + """Consumes events from a queue and yields them to the client while tracking timing metrics. + + This coroutine continuously pulls events from the input queue and yields them to the client. + It tracks timing metrics for how long events spend in the queue and how long the client takes + to process them. 
+ + Args: + queue (asyncio.Queue): The queue containing events to be consumed and yielded + client_consumed_queue (asyncio.Queue): A queue for tracking when the client has consumed events + + Yields: + The value from each event in the queue + + Notes: + - Events are tuples of (event_id, value, put_time) + - Breaks the loop when receiving a None value, signaling completion + - Tracks and logs timing metrics for queue time and client processing time + - Notifies client consumption via client_consumed_queue + """ + while True: + event_id, value, put_time = await queue.get() + if value is None: + break + get_time = time.time() + yield value + get_time_yield = time.time() + client_consumed_queue.put_nowait(event_id) + logger.debug( + f"consumed event {event_id} " + f"(time in queue, {get_time - put_time:.4f}, " + f"client {get_time_yield - get_time:.4f})" + ) + + +async def run_flow_generator( + flow: Flow, + input_request: SimplifiedAPIRequest, + api_key_user: User | None, + event_manager: EventManager, + client_consumed_queue: asyncio.Queue, +) -> None: + """Executes a flow asynchronously and manages event streaming to the client. + + This coroutine runs a flow with streaming enabled and handles the event lifecycle, + including success completion and error scenarios. 
+ + Args: + flow (Flow): The flow to execute + input_request (SimplifiedAPIRequest): The input parameters for the flow + api_key_user (User | None): Optional authenticated user running the flow + event_manager (EventManager): Manages the streaming of events to the client + client_consumed_queue (asyncio.Queue): Tracks client consumption of events + + Events Generated: + - "add_message": Sent when new messages are added during flow execution + - "token": Sent for each token generated during streaming + - "end": Sent when flow execution completes, includes final result + - "error": Sent if an error occurs during execution + + Notes: + - Runs the flow with streaming enabled via simple_run_flow() + - On success, sends the final result via event_manager.on_end() + - On error, logs the error and sends it via event_manager.on_error() + - Always enqueues a final (None, None, timestamp) sentinel to signal completion + """ + try: + result = await simple_run_flow( + flow=flow, + input_request=input_request, + stream=True, + api_key_user=api_key_user, + event_manager=event_manager, + ) + event_manager.on_end(data={"result": result.model_dump()}) + await client_consumed_queue.get() + except (ValueError, InvalidChatInputError, SerializationError) as e: + logger.error(f"Error running flow: {e}") + event_manager.on_error(data={"error": str(e)}) + finally: + await event_manager.queue.put((None, None, time.time())) + + @router.post("/run/{flow_id_or_name}", response_model_exclude_none=True) # noqa: RUF100, FAST003 async def simplified_run_flow( *, @@ -167,76 +258,70 @@ async def simplified_run_flow( input_request: SimplifiedAPIRequest | None = None, stream: bool = False, api_key_user: Annotated[UserRead, Depends(api_key_security)], -) -> RunResponse: - """Executes a specified flow by ID. + request: Request, +): + """Executes a specified flow by ID with support for streaming and telemetry. 
- Executes a specified flow by ID with input customization, performance enhancements through caching, - and optional data streaming. + This endpoint executes a flow identified by ID or name, with options for streaming the response + and tracking execution metrics. It handles both streaming and non-streaming execution modes. - ### Parameters: - - `db` (Session): Database session for executing queries. - - `flow_id_or_name` (str): ID or endpoint name of the flow to run. - - `input_request` (SimplifiedAPIRequest): Request object containing input values, types, output selection, tweaks, - and session ID. - - `api_key_user` (User): User object derived from the provided API key, used for authentication. - - `session_service` (SessionService): Service for managing flow sessions, essential for session reuse and caching. - - ### SimplifiedAPIRequest: - - `input_value` (Optional[str], default=""): Input value to pass to the flow. - - `input_type` (Optional[Literal["chat", "text", "any"]], default="chat"): Type of the input value, - determining how the input is interpreted. - - `output_type` (Optional[Literal["chat", "text", "any", "debug"]], default="chat"): Desired type of output, - affecting which components' outputs are included in the response. If set to "debug", all outputs are returned. - - `output_component` (Optional[str], default=None): Specific component output to retrieve. If provided, - only the output of the specified component is returned. This overrides the `output_type` parameter. - - `tweaks` (Optional[Tweaks], default=None): Adjustments to the flow's behavior, allowing for custom execution - parameters. - - `session_id` (Optional[str], default=None): An identifier for reusing session data, aiding in performance for - subsequent requests. - - - ### Tweaks - A dictionary of tweaks to customize the flow execution. - The tweaks can be used to modify the flow's parameters and components. - Tweaks can be overridden by the input values. 
- You can use Component's `id` or Display Name as key to tweak a specific component - (e.g., `{"Component Name": {"parameter_name": "value"}}`). - You can also use the parameter name as key to tweak all components with that parameter - (e.g., `{"parameter_name": "value"}`). - - ### Returns: - - A `RunResponse` object containing the execution results, including selected (or all, based on `output_type`) - outputs of the flow and the session ID, facilitating result retrieval and further interactions in a session - context. + Args: + background_tasks (BackgroundTasks): FastAPI background task manager + flow (FlowRead | None): The flow to execute, loaded via dependency + input_request (SimplifiedAPIRequest | None): Input parameters for the flow + stream (bool): Whether to stream the response + api_key_user (UserRead): Authenticated user from API key + request (Request): The incoming HTTP request - ### Raises: - - HTTPException: 404 if the specified flow ID curl -X 'POST' \ - - ### Example: - ```bash - curl -X 'POST' \ - 'http:///run/{flow_id}' \ - -H 'accept: application/json' \ - -H 'Content-Type: application/json' \ - -H 'x-api-key: YOU_API_KEY' \ - -H ' - -d '{ - "input_value": "Sample input", - "input_type": "chat", - "output_type": "chat", - "tweaks": {}, - }' - ``` + Returns: + Union[StreamingResponse, RunResponse]: Either a streaming response for real-time results + or a RunResponse with the complete execution results - This endpoint provides a powerful interface for executing flows with enhanced flexibility and efficiency, - supporting a wide range of applications by allowing for dynamic input and output configuration along with - performance optimizations through session management and caching. 
+ Raises: + HTTPException: For flow not found (404) or invalid input (400) + APIException: For internal execution errors (500) + + Notes: + - Supports both streaming and non-streaming execution modes + - Tracks execution time and success/failure via telemetry + - Handles graceful client disconnection in streaming mode + - Provides detailed error handling with appropriate HTTP status codes + - In streaming mode, uses EventManager to handle events: + - "add_message": New messages during execution + - "token": Individual tokens during streaming + - "end": Final execution result """ telemetry_service = get_telemetry_service() input_request = input_request if input_request is not None else SimplifiedAPIRequest() if flow is None: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Flow not found") start_time = time.perf_counter() + logger.info(f"Request: {request}") + + if stream: + asyncio_queue: asyncio.Queue = asyncio.Queue() + asyncio_queue_client_consumed: asyncio.Queue = asyncio.Queue() + event_manager = create_stream_tokens_event_manager(queue=asyncio_queue) + main_task = asyncio.create_task( + run_flow_generator( + flow=flow, + input_request=input_request, + api_key_user=api_key_user, + event_manager=event_manager, + client_consumed_queue=asyncio_queue_client_consumed, + ) + ) + + async def on_disconnect() -> None: + logger.debug("Client disconnected, closing tasks") + main_task.cancel() + + return StreamingResponse( + consume_and_yield(asyncio_queue, asyncio_queue_client_consumed), + background=on_disconnect, + media_type="text/event-stream", + ) + try: result = await simple_run_flow( flow=flow, From 66979a09318f687338a982213263f22b25d9d3e2 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Thu, 26 Dec 2024 18:21:26 -0300 Subject: [PATCH 5/5] refactor: remove request logging from simplified_run_flow endpoint - Removed the logging of the request object in the simplified_run_flow function to streamline logging and reduce verbosity. 
- This change enhances the clarity of logs by focusing on essential information during flow execution. --- src/backend/base/langflow/api/v1/endpoints.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/backend/base/langflow/api/v1/endpoints.py b/src/backend/base/langflow/api/v1/endpoints.py index c30efda58843..f118cdaea461 100644 --- a/src/backend/base/langflow/api/v1/endpoints.py +++ b/src/backend/base/langflow/api/v1/endpoints.py @@ -258,7 +258,6 @@ async def simplified_run_flow( input_request: SimplifiedAPIRequest | None = None, stream: bool = False, api_key_user: Annotated[UserRead, Depends(api_key_security)], - request: Request, ): """Executes a specified flow by ID with support for streaming and telemetry. @@ -296,7 +295,6 @@ async def simplified_run_flow( if flow is None: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Flow not found") start_time = time.perf_counter() - logger.info(f"Request: {request}") if stream: asyncio_queue: asyncio.Queue = asyncio.Queue()