Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add try/except logic to safely parse JSON objects #65

Closed
wants to merge 14 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 22 additions & 8 deletions agent_gateway/gateway/gateway.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,18 +94,32 @@ def _parse_snowflake_response(self, data_str):
# Remove the 'data: ' prefix if it exists
if obj.startswith("data: "):
obj = obj[6:]
# Load the JSON object into a Python dictionary
json_dict = json.loads(obj, strict=False)
# Append the JSON dictionary to the list
json_list.append(json_dict)
try:
# Load the JSON object into a Python dictionary
json_dict = json.loads(obj, strict=False)
# Append the JSON dictionary to the list
json_list.append(json_dict)
except json.JSONDecodeError as e:
gateway_logger.log(
logging.ERROR,
f"Failed to parse JSON object: {obj}. Error: {e}",
)
continue

completion = ""
choices = {}
for chunk in json_list:
choices = chunk["choices"][0]

if "content" in choices["delta"].keys():
completion += choices["delta"]["content"]
# Ensure 'choices' key exists in the chunk
if "choices" in chunk and chunk["choices"]:
choices = chunk["choices"][0]

# Ensure 'delta' key exists in choices and it contains 'content'
if "delta" in choices and "content" in choices["delta"]:
completion += choices["delta"]["content"]
else:
gateway_logger.log(
logging.WARNING, f"Missing or empty 'choices' in chunk: {chunk}"
)

return completion
except KeyError as e:
Expand Down
121 changes: 80 additions & 41 deletions agent_gateway/gateway/planner.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@

from langchain.callbacks.base import AsyncCallbackHandler

from agent_gateway.executors.schema import Plan
from agent_gateway.gateway.constants import END_OF_PLAN
from agent_gateway.gateway.output_parser import (
ACTION_PATTERN,
Expand All @@ -32,7 +31,6 @@
)
from agent_gateway.gateway.task_processor import Task
from agent_gateway.tools.base import StructuredTool, Tool
from agent_gateway.tools.logger import gateway_logger
from agent_gateway.tools.utils import CortexEndpointBuilder, post_cortex_request


Expand Down Expand Up @@ -272,49 +270,90 @@ def _prepare_llm_request(self, prompt):

return headers, url, data

def _parse_snowflake_response(self, data_str):
json_objects = data_str.split("\ndata: ")
json_list = []

# Iterate over each JSON object
for obj in json_objects:
obj = obj.strip()
if obj:
# Remove the 'data: ' prefix if it exists
if obj.startswith("data: "):
obj = obj[6:]
# Load the JSON object into a Python dictionary
json_dict = json.loads(str(obj))
# Append the JSON dictionary to the list
json_list.append(json_dict)

completion = ""
choices = {}
for chunk in json_list:
choices = chunk["choices"][0]

if "content" in choices["delta"].keys():
completion += choices["delta"]["content"]

gateway_logger.log(logging.DEBUG, f"Planner response:{completion}")
return completion

async def plan(self, inputs: dict, is_replan: bool, **kwargs: Any):
llm_response = await self.run_llm(
inputs=inputs,
is_replan=is_replan,
)
llm_response = llm_response + "\n"
plan_response = self.output_parser.parse(llm_response)
return plan_response
def _parse_snowflake_response(self, data_str: str) -> str:
"""
Parse the snowflake response string and return the completion text.
"""
try:
json_objects = data_str.split("\ndata: ")
json_list = []

# Iterate over each JSON object and parse it safely
for obj in json_objects:
obj = obj.strip()
if obj:
# Remove the 'data: ' prefix if it exists
if obj.startswith("data: "):
obj = obj[6:]

# Safely parse the JSON object
try:
json_dict = json.loads(obj) # JSON parsing
json_list.append(json_dict)
except json.JSONDecodeError as e:
self.gateway_logger.log(
logging.ERROR,
f"Failed to decode JSON object: {obj}. Error: {e}",
)
continue # Skip malformed objects

completion = ""
choices = {}

for chunk in json_list:
try:
choices = chunk["choices"][0] # Get the first choice
if "content" in choices["delta"]:
completion += choices["delta"]["content"]
except KeyError as e:
self.gateway_logger.log(
logging.WARNING, f"Missing key in chunk: {e}. Chunk: {chunk}"
)
continue # Skip chunks with missing keys

self.gateway_logger.log(logging.DEBUG, f"Planner response: {completion}")
return completion

except Exception as e:
self.gateway_logger.log(
logging.ERROR, f"Unexpected error in _parse_snowflake_response: {e}"
)
return "" # Return empty string on failure

async def plan(self, inputs: dict, is_replan: bool, **kwargs: Any) -> Optional[str]:
"""
Asynchronously generate a plan based on inputs.
"""
try:
llm_response = await self.run_llm(inputs=inputs, is_replan=is_replan)
llm_response += "\n"

# Parse the LLM response
plan_response = self.output_parser.parse(llm_response)
return plan_response

except Exception as e:
self.gateway_logger.log(logging.ERROR, f"Error in plan method: {e}")
return None # Return None to indicate failure

async def aplan(
self,
inputs: dict,
task_queue: asyncio.Queue[Optional[str]],
is_replan: bool,
**kwargs: Any,
) -> Plan:
"""Given input, asynchronously decide what to do."""
aplan_response = self.run_llm(inputs=inputs, is_replan=is_replan)
await aplan_response
) -> Optional[Any]:
"""
Asynchronously decide what to do given input.
"""
try:
# Run the LLM asynchronously
aplan_response = await self.run_llm(inputs=inputs, is_replan=is_replan)

# Process the response (if necessary, parse here or in the caller)
await aplan_response
return aplan_response

except Exception as e:
self.gateway_logger.log(logging.ERROR, f"Error in aplan method: {e}")
return None # Return None to indicate failure