Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Implement streaming support and EventManager integration in flow execution #5460

Open
wants to merge 5 commits into
base: main
Choose a base branch
from

Conversation

ogabrielluiz
Copy link
Contributor

@ogabrielluiz ogabrielluiz commented Dec 26, 2024

This pull request introduces several enhancements to the langflow API, focusing on adding support for event streaming and improving flow execution telemetry. The key changes include the integration of an EventManager for handling streaming events, updates to flow execution functions to support streaming, and the addition of new utility functions for event management.

Enhancements to event streaming and telemetry:

Updates to flow execution functions:

Test it using the Basic Prompting template with stream enabled in the OpenAI component and running the following script: https://gist.github.com/ogabrielluiz/150c256f2030379d94913a36817e3784

…nt handling

- Added EventManager import and parameter to run_graph_internal function.
- Updated function call to include event_manager for improved event management during graph execution.
- Added event_manager parameter to multiple methods in the Graph class to facilitate better event management during graph execution.
- Updated process and run methods to include event_manager, ensuring it is passed through to relevant function calls.
- Improved documentation for methods to reflect the new event_manager parameter.
… integration

- Added support for streaming responses in the simplified_run_flow endpoint, allowing real-time event handling during flow execution.
- Introduced consume_and_yield and run_flow_generator functions to manage event consumption and client communication.
- Integrated EventManager for enhanced event tracking, including success and error notifications.
- Updated endpoint documentation to reflect new streaming capabilities and parameters.
- Improved error handling and logging for better debugging and client disconnection management.
@dosubot dosubot bot added size:L This PR changes 100-499 lines, ignoring generated files. enhancement New feature or request labels Dec 26, 2024
@github-actions github-actions bot added enhancement New feature or request and removed enhancement New feature or request labels Dec 26, 2024
@github-actions github-actions bot added enhancement New feature or request and removed enhancement New feature or request labels Dec 26, 2024
Copy link

codspeed-hq bot commented Dec 26, 2024

CodSpeed Performance Report

Merging #5460 will improve performances by ×2.5

Comparing add-streaming-to-run (66979a0) with main (bd7f913)

Summary

⚡ 2 improvements
✅ 13 untouched benchmarks

Benchmarks breakdown

Benchmark main add-streaming-to-run Change
test_setup_llm_caching 2.9 ms 1.2 ms ×2.5
test_successful_run_with_input_type_any 266.2 ms 215.8 ms +23.35%

Copy link
Contributor

@Copilot Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copilot reviewed 4 out of 4 changed files in this pull request and generated 1 comment.

Comments suppressed due to low confidence (1)

src/backend/base/langflow/api/v1/endpoints.py:209

  • The event_manager parameter should be checked for None before usage to avoid potential issues.
event_manager: EventManager,

logger.error(f"Error running flow: {e}")
event_manager.on_error(data={"error": str(e)})
finally:
await event_manager.queue.put((None, None, time.time))
Copy link
Preview

Copilot AI Dec 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The time.time function should be called as time.time().

Suggested change
await event_manager.queue.put((None, None, time.time))
await event_manager.queue.put((None, None, time.time()))

Copilot is powered by AI, so mistakes are possible. Review output carefully before use.

Positive Feedback
Negative Feedback

Provide additional feedback

Please help us improve GitHub Copilot by sharing more details about this comment.

Please select one or more of the options
- Removed the logging of the request object in the simplified_run_flow function to streamline logging and reduce verbosity.
- This change enhances the clarity of logs by focusing on essential information during flow execution.
@github-actions github-actions bot added enhancement New feature or request and removed enhancement New feature or request labels Dec 26, 2024
@edwinjosechittilappilly
Copy link
Collaborator

Good Work!

I believe this may not cause any reverse compatibility issues with existing APIs ?
I also plan to look into this PR.

@github-actions github-actions bot added enhancement New feature or request and removed enhancement New feature or request labels Dec 27, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request size:L This PR changes 100-499 lines, ignoring generated files.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants