feat: Implement streaming support and EventManager integration in flow execution #5460

Merged

Merged 5 commits on Jan 3, 2025
Changes from 4 commits
205 changes: 145 additions & 60 deletions src/backend/base/langflow/api/v1/endpoints.py
@@ -2,13 +2,15 @@

import asyncio
import time
from collections.abc import AsyncGenerator
from http import HTTPStatus
from typing import TYPE_CHECKING, Annotated
from uuid import UUID

import sqlalchemy as sa
from fastapi import APIRouter, BackgroundTasks, Body, Depends, HTTPException, Request, UploadFile, status
from fastapi.encoders import jsonable_encoder
from fastapi.responses import StreamingResponse
from loguru import logger
from sqlmodel import select

@@ -26,6 +28,7 @@
)
from langflow.custom.custom_component.component import Component
from langflow.custom.utils import build_custom_component_template, get_instance_name, update_component_build_config
from langflow.events.event_manager import create_stream_tokens_event_manager
from langflow.exceptions.api import APIException, InvalidChatInputError
from langflow.exceptions.serialization import SerializationError
from langflow.graph.graph.base import Graph
@@ -47,6 +50,7 @@
from langflow.utils.version import get_version_info

if TYPE_CHECKING:
from langflow.services.event_manager import EventManager
from langflow.services.settings.service import SettingsService

router = APIRouter(tags=["Base"])
@@ -92,6 +96,7 @@ async def simple_run_flow(
*,
stream: bool = False,
api_key_user: User | None = None,
event_manager: EventManager | None = None,
):
if input_request.input_value is not None and input_request.tweaks is not None:
validate_input_and_tweaks(input_request)
@@ -131,6 +136,7 @@
inputs=inputs,
outputs=outputs,
stream=stream,
event_manager=event_manager,
)

return RunResponse(outputs=task_result, session_id=session_id)
@@ -145,6 +151,7 @@ async def simple_run_flow_task(
*,
stream: bool = False,
api_key_user: User | None = None,
event_manager: EventManager | None = None,
):
"""Run a flow task as a BackgroundTask, therefore it should not throw exceptions."""
try:
@@ -153,12 +160,96 @@
input_request=input_request,
stream=stream,
api_key_user=api_key_user,
event_manager=event_manager,
)

except Exception: # noqa: BLE001
logger.exception(f"Error running flow {flow.id} task")


async def consume_and_yield(queue: asyncio.Queue, client_consumed_queue: asyncio.Queue) -> AsyncGenerator:
"""Consumes events from a queue and yields them to the client while tracking timing metrics.

This coroutine continuously pulls events from the input queue and yields them to the client.
It tracks timing metrics for how long events spend in the queue and how long the client takes
to process them.

Args:
queue (asyncio.Queue): The queue containing events to be consumed and yielded
client_consumed_queue (asyncio.Queue): A queue for tracking when the client has consumed events

Yields:
The value from each event in the queue

Notes:
- Events are tuples of (event_id, value, put_time)
- Breaks the loop when receiving a None value, signaling completion
- Tracks and logs timing metrics for queue time and client processing time
- Notifies client consumption via client_consumed_queue
"""
while True:
event_id, value, put_time = await queue.get()
if value is None:
break
get_time = time.time()
yield value
get_time_yield = time.time()
client_consumed_queue.put_nowait(event_id)
logger.debug(
f"consumed event {event_id} "
f"(time in queue, {get_time - put_time:.4f}, "
f"client {get_time_yield - get_time:.4f})"
)
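
As a reading aid, here is a minimal sketch of the producer side of the protocol this consumer expects, based on the tuple format and sentinel described in the docstring above. The demo producer is hypothetical, not part of the PR, and assumes `consume_and_yield` is importable from this module with langflow installed.

```python
import asyncio
import time

from langflow.api.v1.endpoints import consume_and_yield


async def demo_producer(queue: asyncio.Queue) -> None:
    # Events are (event_id, value, put_time) tuples; a None value is the
    # completion sentinel that breaks the consumer loop.
    for i, token in enumerate(["Hello", ", ", "world"]):
        await queue.put((f"event-{i}", token, time.time()))
    await queue.put((None, None, time.time()))


async def main() -> None:
    queue: asyncio.Queue = asyncio.Queue()
    client_consumed_queue: asyncio.Queue = asyncio.Queue()
    producer = asyncio.create_task(demo_producer(queue))
    async for value in consume_and_yield(queue, client_consumed_queue):
        print(value)  # "Hello", ", ", "world"
    await producer


asyncio.run(main())
```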


async def run_flow_generator(
flow: Flow,
input_request: SimplifiedAPIRequest,
api_key_user: User | None,
event_manager: EventManager,
client_consumed_queue: asyncio.Queue,
) -> None:
"""Executes a flow asynchronously and manages event streaming to the client.

This coroutine runs a flow with streaming enabled and handles the event lifecycle,
including success completion and error scenarios.

Args:
flow (Flow): The flow to execute
input_request (SimplifiedAPIRequest): The input parameters for the flow
api_key_user (User | None): Optional authenticated user running the flow
event_manager (EventManager): Manages the streaming of events to the client
client_consumed_queue (asyncio.Queue): Tracks client consumption of events

Events Generated:
- "add_message": Sent when new messages are added during flow execution
- "token": Sent for each token generated during streaming
- "end": Sent when flow execution completes, includes final result
- "error": Sent if an error occurs during execution

Notes:
- Runs the flow with streaming enabled via simple_run_flow()
- On success, sends the final result via event_manager.on_end()
- On error, logs the error and sends it via event_manager.on_error()
- Always sends a final None event to signal completion
"""
try:
result = await simple_run_flow(
flow=flow,
input_request=input_request,
stream=True,
api_key_user=api_key_user,
event_manager=event_manager,
)
event_manager.on_end(data={"result": result.model_dump()})
await client_consumed_queue.get()
except (ValueError, InvalidChatInputError, SerializationError) as e:
logger.error(f"Error running flow: {e}")
event_manager.on_error(data={"error": str(e)})
finally:
await event_manager.queue.put((None, None, time.time))
Copilot AI commented on Dec 26, 2024:

> The `time.time` function should be called as `time.time()`.
>
> Suggested change:
>
>     -    await event_manager.queue.put((None, None, time.time))
>     +    await event_manager.queue.put((None, None, time.time()))

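The suggestion is correct: `consume_and_yield` unpacks the third tuple element as `put_time` and subtracts it from a float. Here the mistake happens to be masked, because the consumer breaks on `value is None` before the timing arithmetic runs, but the latent failure mode is easy to demonstrate (illustrative snippet, not from the PR):

```python
import time

put_time = time.time     # the function object, not a timestamp
get_time = time.time()   # a float

# This is the subtraction consume_and_yield performs for real events; with
# the function object it raises:
# TypeError: unsupported operand type(s) for -: 'float' and 'builtin_function_or_method'
elapsed = get_time - put_time
```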


@router.post("/run/{flow_id_or_name}", response_model_exclude_none=True) # noqa: RUF100, FAST003
async def simplified_run_flow(
*,
@@ -167,76 +258,70 @@ async def simplified_run_flow(
input_request: SimplifiedAPIRequest | None = None,
stream: bool = False,
api_key_user: Annotated[UserRead, Depends(api_key_security)],
) -> RunResponse:
"""Executes a specified flow by ID.
request: Request,
):
"""Executes a specified flow by ID with support for streaming and telemetry.

Executes a specified flow by ID with input customization, performance enhancements through caching,
and optional data streaming.
This endpoint executes a flow identified by ID or name, with options for streaming the response
and tracking execution metrics. It handles both streaming and non-streaming execution modes.

### Parameters:
- `db` (Session): Database session for executing queries.
- `flow_id_or_name` (str): ID or endpoint name of the flow to run.
- `input_request` (SimplifiedAPIRequest): Request object containing input values, types, output selection, tweaks,
and session ID.
- `api_key_user` (User): User object derived from the provided API key, used for authentication.
- `session_service` (SessionService): Service for managing flow sessions, essential for session reuse and caching.

### SimplifiedAPIRequest:
- `input_value` (Optional[str], default=""): Input value to pass to the flow.
- `input_type` (Optional[Literal["chat", "text", "any"]], default="chat"): Type of the input value,
determining how the input is interpreted.
- `output_type` (Optional[Literal["chat", "text", "any", "debug"]], default="chat"): Desired type of output,
affecting which components' outputs are included in the response. If set to "debug", all outputs are returned.
- `output_component` (Optional[str], default=None): Specific component output to retrieve. If provided,
only the output of the specified component is returned. This overrides the `output_type` parameter.
- `tweaks` (Optional[Tweaks], default=None): Adjustments to the flow's behavior, allowing for custom execution
parameters.
- `session_id` (Optional[str], default=None): An identifier for reusing session data, aiding in performance for
subsequent requests.


### Tweaks
A dictionary of tweaks to customize the flow execution.
The tweaks can be used to modify the flow's parameters and components.
Tweaks can be overridden by the input values.
You can use Component's `id` or Display Name as key to tweak a specific component
(e.g., `{"Component Name": {"parameter_name": "value"}}`).
You can also use the parameter name as key to tweak all components with that parameter
(e.g., `{"parameter_name": "value"}`).

### Returns:
- A `RunResponse` object containing the execution results, including selected (or all, based on `output_type`)
outputs of the flow and the session ID, facilitating result retrieval and further interactions in a session
context.
Args:
background_tasks (BackgroundTasks): FastAPI background task manager
flow (FlowRead | None): The flow to execute, loaded via dependency
input_request (SimplifiedAPIRequest | None): Input parameters for the flow
stream (bool): Whether to stream the response
api_key_user (UserRead): Authenticated user from API key
request (Request): The incoming HTTP request

### Raises:
- HTTPException: 404 if the specified flow ID is not found.

### Example:
```bash
curl -X 'POST' \
'http://<your_server>/run/{flow_id}' \
-H 'accept: application/json' \
-H 'Content-Type: application/json' \
-H 'x-api-key: YOUR_API_KEY' \
-d '{
"input_value": "Sample input",
"input_type": "chat",
"output_type": "chat",
"tweaks": {},
}'
```
Returns:
Union[StreamingResponse, RunResponse]: Either a streaming response for real-time results
or a RunResponse with the complete execution results

This endpoint provides a powerful interface for executing flows with enhanced flexibility and efficiency,
supporting a wide range of applications by allowing for dynamic input and output configuration along with
performance optimizations through session management and caching.
Raises:
HTTPException: For flow not found (404) or invalid input (400)
APIException: For internal execution errors (500)

Notes:
- Supports both streaming and non-streaming execution modes
- Tracks execution time and success/failure via telemetry
- Handles graceful client disconnection in streaming mode
- Provides detailed error handling with appropriate HTTP status codes
- In streaming mode, uses EventManager to handle events:
- "add_message": New messages during execution
- "token": Individual tokens during streaming
- "end": Final execution result
"""
telemetry_service = get_telemetry_service()
input_request = input_request if input_request is not None else SimplifiedAPIRequest()
if flow is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Flow not found")
start_time = time.perf_counter()
logger.info(f"Request: {request}")

if stream:
asyncio_queue: asyncio.Queue = asyncio.Queue()
asyncio_queue_client_consumed: asyncio.Queue = asyncio.Queue()
event_manager = create_stream_tokens_event_manager(queue=asyncio_queue)
main_task = asyncio.create_task(
run_flow_generator(
flow=flow,
input_request=input_request,
api_key_user=api_key_user,
event_manager=event_manager,
client_consumed_queue=asyncio_queue_client_consumed,
)
)

async def on_disconnect() -> None:
logger.debug("Client disconnected, closing tasks")
main_task.cancel()

return StreamingResponse(
consume_and_yield(asyncio_queue, asyncio_queue_client_consumed),
background=on_disconnect,
media_type="text/event-stream",
)

try:
result = await simple_run_flow(
flow=flow,
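
With the streaming branch in place, a client can exercise it by POSTing with `stream=true` and reading the `text/event-stream` body incrementally. Below is a hypothetical client sketch: the base URL, port, route prefix, flow ID, and API key are placeholders, and the payload fields mirror the curl example in the old docstring.

```python
import httpx


def stream_flow(flow_id: str, api_key: str) -> None:
    # Placeholder base URL and prefix; adjust to your deployment.
    with httpx.stream(
        "POST",
        f"http://localhost:7860/api/v1/run/{flow_id}",
        params={"stream": "true"},
        headers={"x-api-key": api_key},
        json={"input_value": "Sample input", "input_type": "chat", "output_type": "chat"},
        timeout=None,
    ) as response:
        for line in response.iter_lines():
            if line:  # each non-empty line is one event from consume_and_yield
                print(line)


stream_flow("YOUR_FLOW_ID", "YOUR_API_KEY")
```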
8 changes: 8 additions & 0 deletions src/backend/base/langflow/events/event_manager.py
@@ -93,3 +93,11 @@ def create_default_event_manager(queue):
manager.register_event("on_build_start", "build_start")
manager.register_event("on_build_end", "build_end")
return manager


def create_stream_tokens_event_manager(queue):
manager = EventManager(queue)
manager.register_event("on_message", "add_message")
manager.register_event("on_token", "token")
manager.register_event("on_end", "end")
return manager
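
A usage sketch for the new factory: it binds a queue and registers the three event types the streaming endpoint consumes. The calls below assume that registered events are exposed as callables which enqueue payloads, as the `event_manager.on_end(...)` call in `run_flow_generator` suggests; that dispatch mechanism is not shown in this diff.

```python
import asyncio

from langflow.events.event_manager import create_stream_tokens_event_manager

queue: asyncio.Queue = asyncio.Queue()
manager = create_stream_tokens_event_manager(queue=queue)

# Assumed attribute-style dispatch, mirroring event_manager.on_end(...) in
# run_flow_generator; each call should enqueue an event for the SSE stream.
manager.on_token(data={"chunk": "Hel"})
manager.on_token(data={"chunk": "lo"})
manager.on_end(data={"result": {"outputs": []}})
```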
20 changes: 18 additions & 2 deletions src/backend/base/langflow/graph/graph/base.py
@@ -726,6 +726,7 @@ async def _run(
stream: bool,
session_id: str,
fallback_to_env_vars: bool,
event_manager: EventManager | None = None,
) -> list[ResultData | None]:
"""Runs the graph with the given inputs.

@@ -737,6 +738,7 @@
stream (bool): Whether to stream the results or not.
session_id (str): The session ID for the graph.
fallback_to_env_vars (bool): Whether to fallback to environment variables.
event_manager (EventManager | None): The event manager for the graph.

Returns:
List[Optional["ResultData"]]: The outputs of the graph.
@@ -770,7 +772,11 @@
try:
# Prioritize the webhook component if it exists
start_component_id = find_start_component_id(self._is_input_vertices)
await self.process(start_component_id=start_component_id, fallback_to_env_vars=fallback_to_env_vars)
await self.process(
start_component_id=start_component_id,
fallback_to_env_vars=fallback_to_env_vars,
event_manager=event_manager,
)
self.increment_run_count()
except Exception as exc:
self._end_all_traces_async(error=exc)
@@ -804,6 +810,7 @@ async def arun(
session_id: str | None = None,
stream: bool = False,
fallback_to_env_vars: bool = False,
event_manager: EventManager | None = None,
) -> list[RunOutputs]:
"""Runs the graph with the given inputs.

@@ -815,6 +822,7 @@
session_id (Optional[str], optional): The session ID for the graph. Defaults to None.
stream (bool, optional): Whether to stream the results or not. Defaults to False.
fallback_to_env_vars (bool, optional): Whether to fallback to environment variables. Defaults to False.
event_manager (EventManager | None): The event manager for the graph.

Returns:
List[RunOutputs]: The outputs of the graph.
@@ -847,6 +855,7 @@
stream=stream,
session_id=session_id or "",
fallback_to_env_vars=fallback_to_env_vars,
event_manager=event_manager,
)
run_output_object = RunOutputs(inputs=run_inputs, outputs=run_outputs)
logger.debug(f"Run outputs: {run_output_object}")
@@ -1494,7 +1503,13 @@ def get_vertices_with_target(self, vertex_id: str) -> list[Vertex]:
vertices.append(vertex)
return vertices

async def process(self, *, fallback_to_env_vars: bool, start_component_id: str | None = None) -> Graph:
async def process(
self,
*,
fallback_to_env_vars: bool,
start_component_id: str | None = None,
event_manager: EventManager | None = None,
) -> Graph:
"""Processes the graph with vertices in each layer run in parallel."""
first_layer = self.sort_vertices(start_component_id=start_component_id)
vertex_task_run_count: dict[str, int] = {}
@@ -1520,6 +1535,7 @@ async def process(self, *, fallback_to_env_vars: bool, start_component_id: str |
fallback_to_env_vars=fallback_to_env_vars,
get_cache=chat_service.get_cache,
set_cache=chat_service.set_cache,
event_manager=event_manager,
),
name=f"{vertex.display_name} Run {vertex_task_run_count.get(vertex_id, 0)}",
)
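
Taken together, these changes thread one optional `EventManager` from the API layer down to each vertex build: `arun` forwards it to `_run`, which forwards it to `process`, which hands it to every vertex task. A hypothetical call site (graph and input construction elided; only keyword arguments visible in this diff are used):

```python
async def run_streaming(graph, inputs, event_manager):
    # event_manager is the new optional parameter added in this PR; the
    # remaining keywords match the arun signature shown above.
    return await graph.arun(
        inputs=inputs,
        session_id="demo-session",
        stream=True,
        fallback_to_env_vars=False,
        event_manager=event_manager,
    )
```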
3 changes: 3 additions & 0 deletions src/backend/base/langflow/processing/process.py
@@ -15,6 +15,7 @@
from langflow.api.v1.schemas import InputValueRequest
from langflow.graph.graph.base import Graph
from langflow.graph.schema import RunOutputs
from langflow.services.event_manager import EventManager


class Result(BaseModel):
@@ -30,6 +31,7 @@ async def run_graph_internal(
session_id: str | None = None,
inputs: list[InputValueRequest] | None = None,
outputs: list[str] | None = None,
event_manager: EventManager | None = None,
) -> tuple[list[RunOutputs], str]:
"""Run the graph and generate the result."""
inputs = inputs or []
@@ -55,6 +57,7 @@
stream=stream,
session_id=effective_session_id or "",
fallback_to_env_vars=fallback_to_env_vars,
event_manager=event_manager,
)
return run_outputs, effective_session_id
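
At the processing layer the same pattern applies: `run_graph_internal` accepts the optional manager and forwards it to `Graph.arun`. A hypothetical standalone invocation follows; the `graph` and `flow_id` parameters and the `InputValueRequest` field names are assumptions based on the call in `simple_run_flow` earlier in this diff, not confirmed by it.

```python
run_outputs, effective_session_id = await run_graph_internal(
    graph=graph,            # a built Graph instance (construction elided)
    flow_id=str(flow_id),   # assumed parameter, mirroring simple_run_flow
    session_id=None,
    inputs=[InputValueRequest(input_value="Sample input", type="chat")],
    outputs=[],
    stream=True,
    event_manager=event_manager,  # new optional parameter from this PR
)
```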
