Skip to content

Commit

Permalink
Merge pull request #388 from datayoga-io/387-add_field-block-batch-processing-exception-causes-all-records-to-be-rejected
Browse files Browse the repository at this point in the history

add_field block - batch processing exception causes all records to be rejected #387
  • Loading branch information
zalmane authored Dec 31, 2024
2 parents 55cb310 + 3cafb89 commit 25ef812
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 6 deletions.
37 changes: 31 additions & 6 deletions core/src/datayoga_core/blocks/add_field/block.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from datayoga_core import expression, utils
from datayoga_core.block import Block as DyBlock
from datayoga_core.context import Context
from datayoga_core.result import BlockResult
from datayoga_core.result import BlockResult, Result, Status

logger = logging.getLogger("dy")

Expand All @@ -22,10 +22,35 @@ def init(self, context: Optional[Context] = None):

async def run(self, data: List[Dict[str, Any]]) -> BlockResult:
    """Evaluate each configured field expression over the batch and set the
    resulting value on every record.

    Tries bulk (vectorized) evaluation per field first; if the bulk call
    raises, falls back to evaluating that field record-by-record so that a
    single bad record is rejected instead of failing the entire batch.

    Args:
        data: list of records (dicts) to enrich in place.

    Returns:
        BlockResult with per-record SUCCESS/REJECTED outcomes. A record
        rejected by any field expression is reported once, as rejected.
    """
    logger.debug(f"Running {self.get_block_name()}")

    # Rejections keyed by record index. Tracking indexes across fields fixes
    # two defects of returning early from inside the fallback branch:
    # (1) all configured fields are processed even if an earlier field's
    #     batch evaluation fails, and
    # (2) a record rejected by one field is never also counted as processed.
    rejected_by_index: Dict[int, Result] = {}

    for field, expr in self.fields.items():
        try:
            # Fast path: evaluate the expression for the whole batch at once.
            expression_results = expr.search_bulk(data)
            for i, row in enumerate(data):
                if i not in rejected_by_index:
                    utils.set_field(row, field, expression_results[i])
        except Exception as e:
            logger.debug(
                f"Batch processing failed for field {field} with {e}, falling back to individual processing")

            # Slow path: evaluate per record so only the offending records
            # are rejected; previously-rejected records are skipped.
            for i, row in enumerate(data):
                if i in rejected_by_index:
                    continue
                try:
                    utils.set_field(row, field, expr.search(row))
                except Exception as record_error:
                    rejected_by_index[i] = Result(
                        status=Status.REJECTED, payload=row, message=f"{record_error}")

    # All fields succeeded for every record: keep the original fast return.
    if not rejected_by_index:
        return utils.all_success(data)

    # Preserve input order while splitting records into processed/rejected.
    result = BlockResult()
    for i, row in enumerate(data):
        if i in rejected_by_index:
            result.rejected.append(rejected_by_index[i])
        else:
            result.processed.append(Result(status=Status.SUCCESS, payload=row))
    return result
46 changes: 46 additions & 0 deletions core/src/datayoga_core/blocks/add_field/tests/test_add_field.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import pytest
from datayoga_core import utils
from datayoga_core.blocks.add_field.block import Block
from datayoga_core.result import Status


@pytest.mark.asyncio
Expand Down Expand Up @@ -122,3 +123,48 @@ async def test_add_field_with_dot():
]) == utils.all_success([
{"fname": "john", "lname": "doe", "name.full_name": "john doe"}
])


@pytest.mark.asyncio
async def test_single_record_failure():
    """A JSON parse failure on some records must not reject the entire batch."""
    block = Block({
        "field": "parsed",
        "language": "jmespath",
        "expression": "json_parse(JSON_FORMAT)"
    })
    block.init()

    test_data = [
        {"JSON_FORMAT": '{"valid": "json1"}'},
        {"JSON_FORMAT": "{invalid_json1"},
        {"JSON_FORMAT": '{"valid": "json2"}'},
        {"JSON_FORMAT": "{invalid_json2"},
        {"JSON_FORMAT": '{"valid": "json3"}'},
        {"JSON_FORMAT": '{"name": "test"}'}
    ]

    result = await block.run(test_data)

    # Four records parse cleanly, two are rejected, none are filtered.
    assert len(result.processed) == 4
    assert len(result.rejected) == 2
    assert len(result.filtered) == 0

    # Processed records keep input order and carry the parsed payload.
    expected_parsed = [
        {"valid": "json1"},
        {"valid": "json2"},
        {"valid": "json3"},
        {"name": "test"},
    ]
    for record, expected in zip(result.processed, expected_parsed):
        assert record.status == Status.SUCCESS
        assert record.payload["parsed"] == expected

    # Rejected records are the malformed ones and carry an error message.
    for record in result.rejected:
        assert record.status == Status.REJECTED
        assert "invalid_json" in record.payload["JSON_FORMAT"]
        assert record.message  # should contain the JSON parse error

0 comments on commit 25ef812

Please sign in to comment.