Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add_field block - batch processing exception causes all records to be rejected #387 #388

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 31 additions & 6 deletions core/src/datayoga_core/blocks/add_field/block.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from datayoga_core import expression, utils
from datayoga_core.block import Block as DyBlock
from datayoga_core.context import Context
from datayoga_core.result import BlockResult
from datayoga_core.result import BlockResult, Result, Status

logger = logging.getLogger("dy")

Expand All @@ -22,10 +22,35 @@ def init(self, context: Optional[Context] = None):

async def run(self, data: List[Dict[str, Any]]) -> BlockResult:
logger.debug(f"Running {self.get_block_name()}")
for field, expr in self.fields.items():
expression_results = expr.search_bulk(data)

for i, row in enumerate(data):
utils.set_field(row, field, expression_results[i])
result = BlockResult()

for field, expr in self.fields.items():
try:
# Try batch processing first
expression_results = expr.search_bulk(data)

# If successful, set fields for all records
for i, row in enumerate(data):
utils.set_field(row, field, expression_results[i])
except Exception as e:
logger.debug(
f"Batch processing failed for field {field} with {e}, falling back to individual processing")

# Process each record individually
for row in data:
try:
single_result = expr.search(row)
utils.set_field(row, field, single_result)

except Exception as record_error:
# Add to rejected list with error message
result.rejected.append(Result(status=Status.REJECTED, payload=row, message=f"{record_error}"))
continue

# Add to processed list if successful
result.processed.append(Result(status=Status.SUCCESS, payload=row))

return result

# If we get here, batch processing was successful for all fields
return utils.all_success(data)
46 changes: 46 additions & 0 deletions core/src/datayoga_core/blocks/add_field/tests/test_add_field.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import pytest
from datayoga_core import utils
from datayoga_core.blocks.add_field.block import Block
from datayoga_core.result import Status


@pytest.mark.asyncio
Expand Down Expand Up @@ -122,3 +123,48 @@ async def test_add_field_with_dot():
]) == utils.all_success([
{"fname": "john", "lname": "doe", "name.full_name": "john doe"}
])


@pytest.mark.asyncio
async def test_single_record_failure():
"""Test case showing that JSON parse failures on some records shouldn't fail the entire batch."""
block = Block({
"field": "parsed",
"language": "jmespath",
"expression": "json_parse(JSON_FORMAT)"
})
block.init()

test_data = [
{"JSON_FORMAT": '{"valid": "json1"}'},
{"JSON_FORMAT": "{invalid_json1"},
{"JSON_FORMAT": '{"valid": "json2"}'},
{"JSON_FORMAT": "{invalid_json2"},
{"JSON_FORMAT": '{"valid": "json3"}'},
{"JSON_FORMAT": '{"name": "test"}'}
]

result = await block.run(test_data)

# Check counts
assert len(result.processed) == 4 # Should have 4 successful records
assert len(result.rejected) == 2 # Should have 2 rejected records
assert len(result.filtered) == 0

# Check processed records
for i, record in enumerate(result.processed):
assert record.status == Status.SUCCESS
if i == 0:
assert record.payload["parsed"] == {"valid": "json1"}
elif i == 1:
assert record.payload["parsed"] == {"valid": "json2"}
elif i == 2:
assert record.payload["parsed"] == {"valid": "json3"}
elif i == 3:
assert record.payload["parsed"] == {"name": "test"}

# Check rejected records
for record in result.rejected:
assert record.status == Status.REJECTED
assert "invalid_json" in record.payload["JSON_FORMAT"]
assert record.message # Should contain JSON parse error
Loading