[pull] master from datahub-project:master #846

Merged: 1 commit, Dec 5, 2024
Changes from all commits
Constants.java

@@ -51,6 +51,7 @@ public class Constants {
// App sources
public static final String UI_SOURCE = "ui";
public static final String SYSTEM_UPDATE_SOURCE = "systemUpdate";
public static final String METADATA_TESTS_SOURCE = "metadataTests";

/** Entities */
public static final String CORP_USER_ENTITY_NAME = "corpuser";
metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java

@@ -855,6 +855,7 @@

if (inputBatch.containsDuplicateAspects()) {
log.warn(String.format("Batch contains duplicates: %s", inputBatch));
MetricUtils.counter(EntityServiceImpl.class, "batch_with_duplicate").inc();
}

return aspectDao
@@ -928,13 +929,15 @@

// No changes, return
if (changeMCPs.isEmpty()) {
MetricUtils.counter(EntityServiceImpl.class, "batch_empty").inc();
return Collections.<UpdateAspectResult>emptyList();
}

// do final pre-commit checks with previous aspect value
ValidationExceptionCollection exceptions =
AspectsBatch.validatePreCommit(changeMCPs, opContext.getRetrieverContext().get());
if (!exceptions.isEmpty()) {
MetricUtils.counter(EntityServiceImpl.class, "batch_validation_exception").inc();
throw new ValidationException(collectMetrics(exceptions).toString());
}

@@ -972,10 +975,13 @@
*/
if (overwrite || databaseAspect == null) {
result =
ingestAspectToLocalDB(txContext, writeItem, databaseSystemAspect)
.toBuilder()
.request(writeItem)
.build();
Optional.ofNullable(
ingestAspectToLocalDB(
txContext, writeItem, databaseSystemAspect))
.map(
optResult ->
optResult.toBuilder().request(writeItem).build())
.orElse(null);

} else {
RecordTemplate oldValue = databaseSystemAspect.getRecordTemplate();
@@ -996,49 +1002,56 @@

return result;
})
.filter(Objects::nonNull)
.collect(Collectors.toList());

// commit upserts prior to retention or kafka send, if supported by impl
if (txContext != null) {
txContext.commitAndContinue();
}
long took = TimeUnit.NANOSECONDS.toMillis(ingestToLocalDBTimer.stop());
if (took > DB_TIMER_LOG_THRESHOLD_MS) {
log.info("Ingestion of aspects batch to database took {} ms", took);
}
if (!upsertResults.isEmpty()) {
// commit upserts prior to retention or kafka send, if supported by impl
if (txContext != null) {

Qodana warning on line 1010 (Constant values): Condition `txContext != null` is always `true`. (See the note after this hunk.)
txContext.commitAndContinue();
}
long took = TimeUnit.NANOSECONDS.toMillis(ingestToLocalDBTimer.stop());
if (took > DB_TIMER_LOG_THRESHOLD_MS) {
log.info("Ingestion of aspects batch to database took {} ms", took);
}

// Retention optimization and tx
if (retentionService != null) {
List<RetentionService.RetentionContext> retentionBatch =
upsertResults.stream()
// Only consider retention when there was a previous version
.filter(
result ->
batchAspects.containsKey(result.getUrn().toString())
&& batchAspects
.get(result.getUrn().toString())
.containsKey(result.getRequest().getAspectName()))
.filter(
result -> {
RecordTemplate oldAspect = result.getOldValue();
RecordTemplate newAspect = result.getNewValue();
// Apply retention policies if there was an update to existing aspect
// value
return oldAspect != newAspect
&& oldAspect != null
&& retentionService != null;
})
.map(
result ->
RetentionService.RetentionContext.builder()
.urn(result.getUrn())
.aspectName(result.getRequest().getAspectName())
.maxVersion(Optional.of(result.getMaxVersion()))
.build())
.collect(Collectors.toList());
retentionService.applyRetentionWithPolicyDefaults(opContext, retentionBatch);
// Retention optimization and tx
if (retentionService != null) {
List<RetentionService.RetentionContext> retentionBatch =
upsertResults.stream()
// Only consider retention when there was a previous version
.filter(
result ->
batchAspects.containsKey(result.getUrn().toString())
&& batchAspects
.get(result.getUrn().toString())
.containsKey(result.getRequest().getAspectName()))
.filter(
result -> {
RecordTemplate oldAspect = result.getOldValue();
RecordTemplate newAspect = result.getNewValue();
// Apply retention policies if there was an update to existing
// aspect
// value
return oldAspect != newAspect
&& oldAspect != null
&& retentionService != null;
})
.map(
result ->
RetentionService.RetentionContext.builder()
.urn(result.getUrn())
.aspectName(result.getRequest().getAspectName())
.maxVersion(Optional.of(result.getMaxVersion()))
.build())
.collect(Collectors.toList());
retentionService.applyRetentionWithPolicyDefaults(opContext, retentionBatch);

Qodana warning on line 1048 (Nullability and data flow problems): Method invocation `applyRetentionWithPolicyDefaults` may produce `NullPointerException`.
} else {
log.warn("Retention service is missing!");
}
} else {
log.warn("Retention service is missing!");
MetricUtils.counter(EntityServiceImpl.class, "batch_empty_transaction").inc();
log.warn("Empty transaction detected. {}", inputBatch);
}

return upsertResults;
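The hunk above reshapes the write loop: ingestAspectToLocalDB may now return null for a no-op write, those nulls are filtered out of upsertResults, and the commit, timing log, and retention steps only run when something was actually written; an empty batch instead increments the new batch_empty_transaction counter (the Qodana note about `txContext != null` concerns the check retained inside that new guard). Below is a minimal, self-contained sketch of that control flow; the names (IngestBatchSketch, ingestOne, Result) are placeholders rather than DataHub APIs, and a plain long stands in for MetricUtils.

import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;

public class IngestBatchSketch {
  // Placeholder for UpdateAspectResult.
  record Result(String urn) {}

  static long emptyTransactionCount = 0; // stands in for the "batch_empty_transaction" counter

  // Stands in for ingestAspectToLocalDB: null means the write was a no-op.
  static Result ingestOne(String urn) {
    return urn.isEmpty() ? null : new Result(urn);
  }

  static List<Result> ingestBatch(List<String> urns) {
    List<Result> upsertResults =
        urns.stream()
            .map(
                urn ->
                    Optional.ofNullable(ingestOne(urn))
                        .map(r -> new Result(r.urn())) // decorate, like .toBuilder().request(...).build()
                        .orElse(null))
            .filter(Objects::nonNull) // drop no-op writes, as the diff does
            .collect(Collectors.toList());

    if (!upsertResults.isEmpty()) {
      // commit, timing log, and retention would run here, only for real writes
    } else {
      emptyTransactionCount++; // "batch_empty_transaction" metric in the diff
    }
    return upsertResults;
  }

  public static void main(String[] args) {
    // Two real writes and one no-op: the filtered result list has two entries.
    System.out.println(ingestBatch(List.of("urn:li:corpuser:a", "", "urn:li:corpuser:b")).size());
  }
}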
@@ -2246,7 +2259,7 @@
final RollbackResult result =
aspectDao.runInTransactionWithRetry(
(txContext) -> {
Integer additionalRowsDeleted = 0;

Qodana warning on line 2262 (Wrapper type may be primitive): Type may be primitive. (See the primitive-vs-wrapper note below.)
// 1. Fetch the latest existing version of the aspect.
final EntityAspect.EntitySystemAspect latest =
@@ -2268,7 +2281,7 @@
}

// 3. Check if this is a key aspect
Boolean isKeyAspect = opContext.getKeyAspectName(entityUrn).equals(aspectName);

Qodana warning on line 2284 (Wrapper type may be primitive): Type may be primitive.
// 4. Fetch all preceding aspects, that match
List<EntityAspect> aspectsToDelete = new ArrayList<>();
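The two Qodana notes above ("Wrapper type may be primitive") point at locals declared with boxed types, `Integer additionalRowsDeleted = 0;` and `Boolean isKeyAspect = ...`. A small illustrative example of what the inspection suggests, not code from this PR:

public class BoxedVsPrimitive {
  public static void main(String[] args) {
    // Boxed locals allocate wrapper objects and can be null, which risks a
    // NullPointerException when they are auto-unboxed.
    Integer boxedCount = 0;
    boxedCount += 1; // unbox, add, re-box

    Boolean boxedFlag = "keyAspect".equals("keyAspect");

    // The primitive forms the inspection suggests: no allocation, no null
    // state, identical behavior for a simple counter or flag.
    int count = 0;
    count += 1;
    boolean flag = "keyAspect".equals("keyAspect");

    System.out.println(boxedCount + " " + boxedFlag + " " + count + " " + flag);
  }
}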
@@ -2506,7 +2519,7 @@
* @param databaseAspect The aspect as it exists in the database.
* @return result object
*/
@Nonnull
@Nullable
private UpdateAspectResult ingestAspectToLocalDB(
@Nullable TransactionContext txContext,
@Nonnull final ChangeMCP writeItem,
@@ -2520,6 +2533,9 @@
.setLastRunId(writeItem.getSystemMetadata().getRunId(GetMode.NULL), SetMode.IGNORE_NULL);

// 2. Compare the latest existing and new.
final RecordTemplate databaseValue =
databaseAspect == null ? null : databaseAspect.getRecordTemplate();

final EntityAspect.EntitySystemAspect previousBatchAspect =
(EntityAspect.EntitySystemAspect) writeItem.getPreviousSystemAspect();
final RecordTemplate previousValue =
@@ -2528,7 +2544,7 @@
// 3. If there is no difference between existing and new, we just update
// the lastObserved in system metadata. RunId should stay as the original runId
if (previousValue != null
&& DataTemplateUtil.areEqual(previousValue, writeItem.getRecordTemplate())) {
&& DataTemplateUtil.areEqual(databaseValue, writeItem.getRecordTemplate())) {

SystemMetadata latestSystemMetadata = previousBatchAspect.getSystemMetadata();
latestSystemMetadata.setLastObserved(writeItem.getSystemMetadata().getLastObserved());
@@ -2564,45 +2580,49 @@
}

// 4. Save the newValue as the latest version
log.debug(
"Ingesting aspect with name {}, urn {}", writeItem.getAspectName(), writeItem.getUrn());
String newValueStr = EntityApiUtils.toJsonAspect(writeItem.getRecordTemplate());
long versionOfOld =
aspectDao.saveLatestAspect(
txContext,
writeItem.getUrn().toString(),
writeItem.getAspectName(),
previousBatchAspect == null ? null : EntityApiUtils.toJsonAspect(previousValue),
previousBatchAspect == null ? null : previousBatchAspect.getCreatedBy(),
previousBatchAspect == null
? null
: previousBatchAspect.getEntityAspect().getCreatedFor(),
previousBatchAspect == null ? null : previousBatchAspect.getCreatedOn(),
previousBatchAspect == null ? null : previousBatchAspect.getSystemMetadataRaw(),
newValueStr,
writeItem.getAuditStamp().getActor().toString(),
writeItem.getAuditStamp().hasImpersonator()
? writeItem.getAuditStamp().getImpersonator().toString()
: null,
new Timestamp(writeItem.getAuditStamp().getTime()),
EntityApiUtils.toJsonAspect(writeItem.getSystemMetadata()),
writeItem.getNextAspectVersion());

// metrics
aspectDao.incrementWriteMetrics(
writeItem.getAspectName(), 1, newValueStr.getBytes(StandardCharsets.UTF_8).length);

return UpdateAspectResult.builder()
.urn(writeItem.getUrn())
.oldValue(previousValue)
.newValue(writeItem.getRecordTemplate())
.oldSystemMetadata(
previousBatchAspect == null ? null : previousBatchAspect.getSystemMetadata())
.newSystemMetadata(writeItem.getSystemMetadata())
.operation(MetadataAuditOperation.UPDATE)
.auditStamp(writeItem.getAuditStamp())
.maxVersion(versionOfOld)
.build();
if (!DataTemplateUtil.areEqual(databaseValue, writeItem.getRecordTemplate())) {
log.debug(
"Ingesting aspect with name {}, urn {}", writeItem.getAspectName(), writeItem.getUrn());
String newValueStr = EntityApiUtils.toJsonAspect(writeItem.getRecordTemplate());
long versionOfOld =
aspectDao.saveLatestAspect(
txContext,
writeItem.getUrn().toString(),
writeItem.getAspectName(),
previousBatchAspect == null ? null : EntityApiUtils.toJsonAspect(previousValue),
previousBatchAspect == null ? null : previousBatchAspect.getCreatedBy(),
previousBatchAspect == null
? null
: previousBatchAspect.getEntityAspect().getCreatedFor(),
previousBatchAspect == null ? null : previousBatchAspect.getCreatedOn(),
previousBatchAspect == null ? null : previousBatchAspect.getSystemMetadataRaw(),
newValueStr,
writeItem.getAuditStamp().getActor().toString(),
writeItem.getAuditStamp().hasImpersonator()
? writeItem.getAuditStamp().getImpersonator().toString()
: null,
new Timestamp(writeItem.getAuditStamp().getTime()),
EntityApiUtils.toJsonAspect(writeItem.getSystemMetadata()),
writeItem.getNextAspectVersion());

// metrics
aspectDao.incrementWriteMetrics(
writeItem.getAspectName(), 1, newValueStr.getBytes(StandardCharsets.UTF_8).length);

return UpdateAspectResult.builder()
.urn(writeItem.getUrn())
.oldValue(previousValue)
.newValue(writeItem.getRecordTemplate())
.oldSystemMetadata(
previousBatchAspect == null ? null : previousBatchAspect.getSystemMetadata())
.newSystemMetadata(writeItem.getSystemMetadata())
.operation(MetadataAuditOperation.UPDATE)
.auditStamp(writeItem.getAuditStamp())
.maxVersion(versionOfOld)
.build();
}

return null;
}

private static boolean shouldAspectEmitChangeLog(@Nonnull final AspectSpec aspectSpec) {
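Together, the last hunks are the core behavioral change: ingestAspectToLocalDB is now annotated @Nullable, compares the incoming aspect against databaseValue (what is actually stored) rather than previousValue, and only calls aspectDao.saveLatestAspect when the values differ; when there is nothing to write it returns null and the caller drops the item from the batch. A minimal sketch of that no-op guard, with plain equals standing in for DataTemplateUtil.areEqual and placeholder record types:

import java.util.Objects;

public class NoOpWriteSketch {
  // Placeholders for the stored row and the write result; the real types are
  // EntityAspect / RecordTemplate and UpdateAspectResult.
  record StoredAspect(String value, long version) {}
  record WriteResult(String newValue, long maxVersion) {}

  // Mirrors the new contract: return null when the database already holds an
  // equal value, so the caller can filter the item out of the batch.
  static WriteResult ingestToLocalDb(StoredAspect databaseAspect, String newValue) {
    String databaseValue = databaseAspect == null ? null : databaseAspect.value();

    if (Objects.equals(databaseValue, newValue)) {
      return null; // nothing to write
    }

    long maxVersion = databaseAspect == null ? 0 : databaseAspect.version();
    return new WriteResult(newValue, maxVersion); // the save would happen here
  }

  public static void main(String[] args) {
    StoredAspect existing = new StoredAspect("{\"title\":\"old\"}", 3);
    System.out.println(ingestToLocalDb(existing, "{\"title\":\"old\"}")); // null (no-op)
    System.out.println(ingestToLocalDb(existing, "{\"title\":\"new\"}")); // WriteResult[...]
    System.out.println(ingestToLocalDb(null, "{\"title\":\"new\"}"));     // first write
  }
}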