Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add batch run beta component #5489

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/backend/base/langflow/components/helpers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from .structured_output import StructuredOutputComponent

__all__ = [
"BatchRunComponent",
"CreateListComponent",
"CurrentDateComponent",
"IDGeneratorComponent",
Expand Down
93 changes: 93 additions & 0 deletions src/backend/base/langflow/components/helpers/batch_run.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
from langflow.custom import Component
from langflow.field_typing import LanguageModel

Check failure on line 2 in src/backend/base/langflow/components/helpers/batch_run.py

View workflow job for this annotation

GitHub Actions / Ruff Style Check (3.12)

Ruff (TC001)

src/backend/base/langflow/components/helpers/batch_run.py:2:35: TC001 Move application import `langflow.field_typing.LanguageModel` into a type-checking block
from langflow.io import (
DataFrameInput,
HandleInput,
MultilineInput,
Output,
StrInput,
)
from langflow.schema import DataFrame
from langflow.schema.message import Message


class BatchRunComponent(Component):
display_name = "Batch Run"
description = "Runs a language model over each row of a DataFrame’s text column and returns a DataFrame containing one model response per row."

Check failure on line 16 in src/backend/base/langflow/components/helpers/batch_run.py

View workflow job for this annotation

GitHub Actions / Ruff Style Check (3.12)

Ruff (RUF001)

src/backend/base/langflow/components/helpers/batch_run.py:16:70: RUF001 String contains ambiguous `’` (RIGHT SINGLE QUOTATION MARK). Did you mean ``` (GRAVE ACCENT)?

Check failure on line 16 in src/backend/base/langflow/components/helpers/batch_run.py

View workflow job for this annotation

GitHub Actions / Ruff Style Check (3.12)

Ruff (E501)

src/backend/base/langflow/components/helpers/batch_run.py:16:121: E501 Line too long (147 > 120)
icon = "List"
beta = True

inputs = [
HandleInput(
name="model",
display_name="Language Model",
info="Connect the 'Language Model' output from your LLM component here.",
input_types=["LanguageModel"],
),
MultilineInput(
name="system_message",
display_name="System Message",
info="Multi-line system instruction for all items in the DataFrame.",
required=False,
),
DataFrameInput(
name="df",
display_name="DataFrame",
info="The DataFrame whose column (specified by 'column_name') we'll treat as text messages.",
),
StrInput(
name="column_name",
display_name="Column Name",
info="The name of the DataFrame column to treat as text messages. Default='text'.",
value="text",
),
]

outputs = [
Output(
display_name="Batch Results",
name="batch_results",
method="run_batch",
info="A list of processed messages returned by the model, one per row in the chosen DataFrame column.",
),
]

def run_batch(self) -> DataFrame:
"""For each row in df[column_name], combine that text with system_message
and invoke the model, returning a list of responses as Langflow Messages.
"""

Check failure on line 58 in src/backend/base/langflow/components/helpers/batch_run.py

View workflow job for this annotation

GitHub Actions / Ruff Style Check (3.12)

Ruff (D205)

src/backend/base/langflow/components/helpers/batch_run.py:56:9: D205 1 blank line required between summary line and description
# Retrieve inputs
model: LanguageModel = self.model
system_msg: str = self.system_message or ""
df: DataFrame = self.df
col_name: str = self.column_name or "text"

if col_name not in df.columns:
raise ValueError(f"Column '{col_name}' not found in the DataFrame.")

Check failure on line 66 in src/backend/base/langflow/components/helpers/batch_run.py

View workflow job for this annotation

GitHub Actions / Ruff Style Check (3.12)

Ruff (TRY003)

src/backend/base/langflow/components/helpers/batch_run.py:66:19: TRY003 Avoid specifying long messages outside the exception class

Check failure on line 66 in src/backend/base/langflow/components/helpers/batch_run.py

View workflow job for this annotation

GitHub Actions / Ruff Style Check (3.12)

Ruff (EM102)

src/backend/base/langflow/components/helpers/batch_run.py:66:30: EM102 Exception must not use an f-string literal, assign to variable first

# We'll treat each row's text as a user message
user_texts = df[col_name].astype(str).tolist()

results: list[Message] = []

for text in user_texts:
# Build conversation array: system + user
conversation = []
if system_msg:
conversation.append({"role": "system", "content": system_msg})
conversation.append({"role": "user", "content": text})

# Invoke the model
response = model.invoke(conversation)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to do this. Langchain has a batch method. Also, it is better to using async methods as it can help with performance.

https://python.langchain.com/docs/how_to/lcel_cheatsheet/#batch-a-runnable

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So what's the proposal? Change this component or create a "batch mode" for models?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated


# Convert response to a Langflow Message
if hasattr(response, "content"):
# If the model returns an object with .content (e.g. AIMessage)
new_message = Message(text=response.content)
else:
# Otherwise assume it's raw text or a dict
new_message = Message(text=str(response))

results.append(new_message)

return DataFrame(results)
Loading