From 32ef3894400ac3aaac6f9d5b780aa58a9f6fb48c Mon Sep 17 00:00:00 2001 From: RyanHolstien Date: Mon, 25 Nov 2024 21:20:02 -0600 Subject: [PATCH] fix(dataProduct): reduce write fan-out for unset side effect (#11951) --- .../DataProductUnsetSideEffect.java | 61 +++++----- .../DataProductUnsetSideEffectTest.java | 107 ++++++++++++++++++ 2 files changed, 141 insertions(+), 27 deletions(-) diff --git a/metadata-io/src/main/java/com/linkedin/metadata/dataproducts/sideeffects/DataProductUnsetSideEffect.java b/metadata-io/src/main/java/com/linkedin/metadata/dataproducts/sideeffects/DataProductUnsetSideEffect.java index 544040d14f8b7..9c4bb52f014fc 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/dataproducts/sideeffects/DataProductUnsetSideEffect.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/dataproducts/sideeffects/DataProductUnsetSideEffect.java @@ -27,6 +27,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -70,6 +71,7 @@ private static Stream generatePatchRemove( log.error("Unable to process data product properties for urn: {}", mclItem.getUrn()); return Stream.empty(); } + Map> patchOpMap = new HashMap<>(); for (DataProductAssociation dataProductAssociation : Optional.ofNullable(dataProductProperties.getAssets()) .orElse(new DataProductAssociationArray())) { @@ -93,40 +95,45 @@ private static Stream generatePatchRemove( if (!result.getEntities().isEmpty()) { for (RelatedEntities entity : result.getEntities()) { if (!mclItem.getUrn().equals(UrnUtils.getUrn(entity.getSourceUrn()))) { - EntitySpec entitySpec = - retrieverContext - .getAspectRetriever() - .getEntityRegistry() - .getEntitySpec(DATA_PRODUCT_ENTITY_NAME); GenericJsonPatch.PatchOp patchOp = new GenericJsonPatch.PatchOp(); patchOp.setOp(PatchOperationType.REMOVE.getValue()); patchOp.setPath(String.format("/assets/%s", entity.getDestinationUrn())); - mcpItems.add( - PatchItemImpl.builder() - .urn(UrnUtils.getUrn(entity.getSourceUrn())) - .entitySpec( - retrieverContext - .getAspectRetriever() - .getEntityRegistry() - .getEntitySpec(DATA_PRODUCT_ENTITY_NAME)) - .aspectName(DATA_PRODUCT_PROPERTIES_ASPECT_NAME) - .aspectSpec(entitySpec.getAspectSpec(DATA_PRODUCT_PROPERTIES_ASPECT_NAME)) - .patch( - GenericJsonPatch.builder() - .arrayPrimaryKeys( - Map.of( - DataProductPropertiesTemplate.ASSETS_FIELD_NAME, - List.of(DataProductPropertiesTemplate.KEY_FIELD_NAME))) - .patch(List.of(patchOp)) - .build() - .getJsonPatch()) - .auditStamp(mclItem.getAuditStamp()) - .systemMetadata(mclItem.getSystemMetadata()) - .build(retrieverContext.getAspectRetriever().getEntityRegistry())); + patchOpMap + .computeIfAbsent(entity.getSourceUrn(), urn -> new ArrayList<>()) + .add(patchOp); } } } } + for (String urn : patchOpMap.keySet()) { + EntitySpec entitySpec = + retrieverContext + .getAspectRetriever() + .getEntityRegistry() + .getEntitySpec(DATA_PRODUCT_ENTITY_NAME); + mcpItems.add( + PatchItemImpl.builder() + .urn(UrnUtils.getUrn(urn)) + .entitySpec( + retrieverContext + .getAspectRetriever() + .getEntityRegistry() + .getEntitySpec(DATA_PRODUCT_ENTITY_NAME)) + .aspectName(DATA_PRODUCT_PROPERTIES_ASPECT_NAME) + .aspectSpec(entitySpec.getAspectSpec(DATA_PRODUCT_PROPERTIES_ASPECT_NAME)) + .patch( + GenericJsonPatch.builder() + .arrayPrimaryKeys( + Map.of( + DataProductPropertiesTemplate.ASSETS_FIELD_NAME, + List.of(DataProductPropertiesTemplate.KEY_FIELD_NAME))) + .patch(patchOpMap.get(urn)) + .build() + .getJsonPatch()) + .auditStamp(mclItem.getAuditStamp()) + .systemMetadata(mclItem.getSystemMetadata()) + .build(retrieverContext.getAspectRetriever().getEntityRegistry())); + } return mcpItems.stream(); } return Stream.empty(); diff --git a/metadata-io/src/test/java/com/linkedin/metadata/dataproducts/sideeffects/DataProductUnsetSideEffectTest.java b/metadata-io/src/test/java/com/linkedin/metadata/dataproducts/sideeffects/DataProductUnsetSideEffectTest.java index 1151014bf1162..12dd57f94da23 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/dataproducts/sideeffects/DataProductUnsetSideEffectTest.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/dataproducts/sideeffects/DataProductUnsetSideEffectTest.java @@ -34,6 +34,8 @@ import com.linkedin.metadata.utils.AuditStampUtils; import com.linkedin.test.metadata.aspect.TestEntityRegistry; import io.datahubproject.metadata.context.RetrieverContext; +import jakarta.json.JsonArray; +import jakarta.json.JsonObject; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -251,6 +253,111 @@ public void testDPRemoveOld() { .build(mockAspectRetriever.getEntityRegistry()))); } + @Test + public void testBulkAssetMove() { + DataProductUnsetSideEffect test = new DataProductUnsetSideEffect(); + test.setConfig(TEST_PLUGIN_CONFIG); + + // Create 100 dataset URNs and set up their existing relationships + List datasetUrns = new ArrayList<>(); + for (int i = 0; i < 100; i++) { + Urn datasetUrn = + UrnUtils.getUrn( + String.format("urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_%d,PROD)", i)); + datasetUrns.add(datasetUrn); + + // Mock the existing relationship for each dataset with the old data product + RelatedEntities relatedEntities = + new RelatedEntities( + "DataProductContains", + TEST_PRODUCT_URN_2.toString(), // Old data product + datasetUrn.toString(), + RelationshipDirection.INCOMING, + null); + + List relatedEntitiesList = new ArrayList<>(); + relatedEntitiesList.add(relatedEntities); + RelatedEntitiesScrollResult relatedEntitiesScrollResult = + new RelatedEntitiesScrollResult(1, 10, null, relatedEntitiesList); + + when(retrieverContext + .getGraphRetriever() + .scrollRelatedEntities( + eq(null), + eq(QueryUtils.newFilter("urn", datasetUrn.toString())), + eq(null), + eq(EMPTY_FILTER), + eq(ImmutableList.of("DataProductContains")), + eq( + QueryUtils.newRelationshipFilter( + EMPTY_FILTER, RelationshipDirection.INCOMING)), + eq(Collections.emptyList()), + eq(null), + eq(10), + eq(null), + eq(null))) + .thenReturn(relatedEntitiesScrollResult); + } + + // Create data product properties with all 100 assets + DataProductProperties dataProductProperties = new DataProductProperties(); + DataProductAssociationArray dataProductAssociations = new DataProductAssociationArray(); + for (Urn datasetUrn : datasetUrns) { + DataProductAssociation association = new DataProductAssociation(); + association.setDestinationUrn(datasetUrn); + dataProductAssociations.add(association); + } + dataProductProperties.setAssets(dataProductAssociations); + + // Run test + ChangeItemImpl dataProductPropertiesChangeItem = + ChangeItemImpl.builder() + .urn(TEST_PRODUCT_URN) // New data product + .aspectName(DATA_PRODUCT_PROPERTIES_ASPECT_NAME) + .changeType(ChangeType.UPSERT) + .entitySpec(TEST_REGISTRY.getEntitySpec(DATA_PRODUCT_ENTITY_NAME)) + .aspectSpec( + TEST_REGISTRY + .getEntitySpec(DATA_PRODUCT_ENTITY_NAME) + .getAspectSpec(DATA_PRODUCT_PROPERTIES_ASPECT_NAME)) + .recordTemplate(dataProductProperties) + .auditStamp(AuditStampUtils.createDefaultAuditStamp()) + .build(mockAspectRetriever); + + List testOutput = + test.postMCPSideEffect( + List.of( + MCLItemImpl.builder() + .build( + dataProductPropertiesChangeItem, + null, + null, + retrieverContext.getAspectRetriever())), + retrieverContext) + .toList(); + + // Verify test + assertEquals(testOutput.size(), 1, "Expected one patch to remove assets from old data product"); + + MCPItem patchItem = testOutput.get(0); + assertEquals( + patchItem.getUrn(), TEST_PRODUCT_URN_2, "Patch should target the old data product"); + assertEquals(patchItem.getAspectName(), DATA_PRODUCT_PROPERTIES_ASPECT_NAME); + + // Verify the patch contains remove operations for all 100 assets + JsonArray patchArray = ((PatchItemImpl) patchItem).getPatch().toJsonArray(); + assertEquals(patchArray.size(), 100, "Should have 100 remove operations"); + + // Verify each remove operation + for (int i = 0; i < 100; i++) { + JsonObject op = patchArray.getJsonObject(i); + assertEquals(op.getString("op"), PatchOperationType.REMOVE.getValue()); + assertEquals( + op.getString("path"), + String.format("/assets/urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_%d,PROD)", i)); + } + } + private static DataProductProperties getTestDataProductProperties(Urn destinationUrn) { DataProductProperties dataProductProperties = new DataProductProperties(); DataProductAssociationArray dataProductAssociations = new DataProductAssociationArray();