diff --git a/metadata-io/src/main/java/com/linkedin/metadata/client/JavaEntityClient.java b/metadata-io/src/main/java/com/linkedin/metadata/client/JavaEntityClient.java
index fa9109689caad4..29faa3955ea662 100644
--- a/metadata-io/src/main/java/com/linkedin/metadata/client/JavaEntityClient.java
+++ b/metadata-io/src/main/java/com/linkedin/metadata/client/JavaEntityClient.java
@@ -18,6 +18,7 @@
 import com.linkedin.entity.Entity;
 import com.linkedin.entity.EntityResponse;
 import com.linkedin.entity.client.EntityClient;
+import com.linkedin.entity.client.EntityClientConfig;
 import com.linkedin.metadata.Constants;
 import com.linkedin.metadata.aspect.EnvelopedAspect;
 import com.linkedin.metadata.aspect.EnvelopedAspectArray;
@@ -97,7 +98,7 @@ public class JavaEntityClient implements EntityClient {
   private final TimeseriesAspectService timeseriesAspectService;
   private final RollbackService rollbackService;
   private final EventProducer eventProducer;
-  private final int batchGetV2Size;
+  private final EntityClientConfig entityClientConfig;

   @Override
   @Nullable
@@ -132,7 +133,7 @@ public Map<Urn, EntityResponse> batchGetV2(

     Map<Urn, EntityResponse> responseMap = new HashMap<>();

-    Iterators.partition(urns.iterator(), Math.max(1, batchGetV2Size))
+    Iterators.partition(urns.iterator(), Math.max(1, entityClientConfig.getBatchGetV2Size()))
         .forEachRemaining(
             batch -> {
               try {
@@ -159,7 +160,8 @@ public Map<Urn, EntityResponse> batchGetVersionedV2(

     Map<Urn, EntityResponse> responseMap = new HashMap<>();

-    Iterators.partition(versionedUrns.iterator(), Math.max(1, batchGetV2Size))
+    Iterators.partition(
+        versionedUrns.iterator(), Math.max(1, entityClientConfig.getBatchGetV2Size()))
         .forEachRemaining(
             batch -> {
               try {
@@ -760,48 +762,62 @@ public List<String> batchIngestProposals(
             : Constants.UNKNOWN_ACTOR;
     final AuditStamp auditStamp = AuditStampUtils.createAuditStamp(actorUrnStr);

-    AspectsBatch batch =
-        AspectsBatchImpl.builder()
-            .mcps(
-                metadataChangeProposals,
-                auditStamp,
-                opContext.getRetrieverContext().get(),
-                opContext.getValidationContext().isAlternateValidation())
-            .build();
-
-    List<IngestResult> results = entityService.ingestProposal(opContext, batch, async);
-    entitySearchService.appendRunId(opContext, results);
-
-    Map<Pair<Urn, String>, List<IngestResult>> resultMap =
-        results.stream()
-            .collect(
-                Collectors.groupingBy(
-                    result ->
-                        Pair.of(
-                            result.getRequest().getUrn(), result.getRequest().getAspectName())));
-
-    // Preserve ordering
-    return batch.getItems().stream()
-        .map(
-            requestItem -> {
-              // Urns generated
-              List<Urn> urnsForRequest =
-                  resultMap
-                      .getOrDefault(
-                          Pair.of(requestItem.getUrn(), requestItem.getAspectName()), List.of())
-                      .stream()
-                      .map(IngestResult::getUrn)
-                      .filter(Objects::nonNull)
-                      .distinct()
-                      .collect(Collectors.toList());
-
-              // Update runIds
-              urnsForRequest.forEach(
-                  urn -> tryIndexRunId(opContext, urn, requestItem.getSystemMetadata()));
-
-              return urnsForRequest.isEmpty() ? null : urnsForRequest.get(0).toString();
-            })
-        .collect(Collectors.toList());
+    List<String> updatedUrns = new ArrayList<>();
+    Iterators.partition(
+            metadataChangeProposals.iterator(), Math.max(1, entityClientConfig.getBatchGetV2Size()))
+        .forEachRemaining(
+            batch -> {
+              AspectsBatch aspectsBatch =
+                  AspectsBatchImpl.builder()
+                      .mcps(
+                          batch,
+                          auditStamp,
+                          opContext.getRetrieverContext().get(),
+                          opContext.getValidationContext().isAlternateValidation())
+                      .build();
+
+              List<IngestResult> results =
+                  entityService.ingestProposal(opContext, aspectsBatch, async);
+              entitySearchService.appendRunId(opContext, results);
+
+              Map<Pair<Urn, String>, List<IngestResult>> resultMap =
+                  results.stream()
+                      .collect(
+                          Collectors.groupingBy(
+                              result ->
+                                  Pair.of(
+                                      result.getRequest().getUrn(),
+                                      result.getRequest().getAspectName())));
+
+              // Preserve ordering
+              updatedUrns.addAll(
+                  aspectsBatch.getItems().stream()
+                      .map(
+                          requestItem -> {
+                            // Urns generated
+                            List<Urn> urnsForRequest =
+                                resultMap
+                                    .getOrDefault(
+                                        Pair.of(requestItem.getUrn(), requestItem.getAspectName()),
+                                        List.of())
+                                    .stream()
+                                    .map(IngestResult::getUrn)
+                                    .filter(Objects::nonNull)
+                                    .distinct()
+                                    .collect(Collectors.toList());
+
+                            // Update runIds
+                            urnsForRequest.forEach(
+                                urn ->
+                                    tryIndexRunId(opContext, urn, requestItem.getSystemMetadata()));
+
+                            return urnsForRequest.isEmpty()
+                                ? null
+                                : urnsForRequest.get(0).toString();
+                          })
+                      .collect(Collectors.toList()));
+            });
+    return updatedUrns;
   }

   @SneakyThrows
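Note on the batchIngestProposals rewrite above: the single AspectsBatch is replaced by Guava's Iterators.partition, which lazily chunks the proposal list so each chunk becomes its own ingest call, and Math.max(1, ...) floors a zero or negative configured size. A minimal sketch of the partitioning behavior (the urns below are illustrative, not from this change):

import com.google.common.collect.Iterators;
import java.util.List;

public class PartitionSketch {
  public static void main(String[] args) {
    List<String> urns = List.of("urn:li:corpuser:a", "urn:li:corpuser:b", "urn:li:corpuser:c");
    // With a chunk size of 2 this prints [urn:li:corpuser:a, urn:li:corpuser:b]
    // and then [urn:li:corpuser:c]; in the client each chunk is one ingest call.
    Iterators.partition(urns.iterator(), Math.max(1, 2))
        .forEachRemaining(System.out::println);
  }
}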
diff --git a/metadata-io/src/main/java/com/linkedin/metadata/client/SystemJavaEntityClient.java b/metadata-io/src/main/java/com/linkedin/metadata/client/SystemJavaEntityClient.java
index ab68abc69bce7c..eda9b3a880228f 100644
--- a/metadata-io/src/main/java/com/linkedin/metadata/client/SystemJavaEntityClient.java
+++ b/metadata-io/src/main/java/com/linkedin/metadata/client/SystemJavaEntityClient.java
@@ -5,6 +5,7 @@
 import com.linkedin.common.urn.Urn;
 import com.linkedin.entity.EntityResponse;
 import com.linkedin.entity.client.EntityClientCache;
+import com.linkedin.entity.client.EntityClientConfig;
 import com.linkedin.entity.client.SystemEntityClient;
 import com.linkedin.metadata.config.cache.client.EntityClientCacheConfig;
 import com.linkedin.metadata.entity.DeleteEntityService;
@@ -43,7 +44,7 @@ public SystemJavaEntityClient(
       RollbackService rollbackService,
       EventProducer eventProducer,
       EntityClientCacheConfig cacheConfig,
-      int batchGetV2Size) {
+      EntityClientConfig entityClientConfig) {
     super(
         entityService,
         deleteEntityService,
@@ -54,7 +55,7 @@ public SystemJavaEntityClient(
         timeseriesAspectService,
         rollbackService,
         eventProducer,
-        batchGetV2Size);
+        entityClientConfig);
     this.operationContextMap = CacheBuilder.newBuilder().maximumSize(500).build();
     this.entityClientCache = buildEntityClientCache(SystemJavaEntityClient.class, cacheConfig);
   }
diff --git a/metadata-io/src/test/java/com/linkedin/metadata/client/JavaEntityClientTest.java b/metadata-io/src/test/java/com/linkedin/metadata/client/JavaEntityClientTest.java
index 7b1fccafbb9e63..4d977d179f91e4 100644
--- a/metadata-io/src/test/java/com/linkedin/metadata/client/JavaEntityClientTest.java
+++ b/metadata-io/src/test/java/com/linkedin/metadata/client/JavaEntityClientTest.java
@@ -12,6 +12,7 @@
 import com.linkedin.common.urn.UrnUtils;
 import com.linkedin.data.template.RequiredFieldNotPresentException;
 import com.linkedin.domain.Domains;
+import com.linkedin.entity.client.EntityClientConfig;
 import com.linkedin.events.metadata.ChangeType;
 import com.linkedin.metadata.Constants;
 import com.linkedin.metadata.aspect.batch.AspectsBatch;
@@ -90,7 +91,7 @@ private JavaEntityClient getJavaEntityClient() {
         _timeseriesAspectService,
         rollbackService,
         _eventProducer,
-        1);
+        EntityClientConfig.builder().batchGetV2Size(1).build());
   }

   @Test
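The test above pins batchGetV2Size to 1 so every item lands in its own chunk and the partitioning path is exercised. EntityClientConfig itself is defined outside this diff; assuming it is the usual Lombok @Builder/@Getter config object, the call-site pattern is:

import com.linkedin.entity.client.EntityClientConfig;

class EntityClientConfigSketch {
  // Sketch under that assumption; fields other than batchGetV2Size are not shown in this diff.
  static int chunkSize() {
    EntityClientConfig config = EntityClientConfig.builder().batchGetV2Size(100).build();
    return Math.max(1, config.getBatchGetV2Size()); // same floor the client applies
  }
}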
diff --git a/metadata-io/src/test/java/io/datahubproject/test/fixtures/search/SampleDataFixtureConfiguration.java b/metadata-io/src/test/java/io/datahubproject/test/fixtures/search/SampleDataFixtureConfiguration.java
index e47cdf80281c9a..d5aa7e9c51983a 100644
--- a/metadata-io/src/test/java/io/datahubproject/test/fixtures/search/SampleDataFixtureConfiguration.java
+++ b/metadata-io/src/test/java/io/datahubproject/test/fixtures/search/SampleDataFixtureConfiguration.java
@@ -7,6 +7,7 @@
 import static org.mockito.Mockito.when;

 import com.linkedin.entity.client.EntityClient;
+import com.linkedin.entity.client.EntityClientConfig;
 import com.linkedin.metadata.client.JavaEntityClient;
 import com.linkedin.metadata.config.PreProcessHooks;
 import com.linkedin.metadata.config.cache.EntityDocCountCacheConfiguration;
@@ -330,6 +331,6 @@ private EntityClient entityClientHelper(
         null,
         null,
         null,
-        1);
+        EntityClientConfig.builder().batchGetV2Size(1).build());
   }
 }
diff --git a/metadata-io/src/test/java/io/datahubproject/test/fixtures/search/SearchLineageFixtureConfiguration.java b/metadata-io/src/test/java/io/datahubproject/test/fixtures/search/SearchLineageFixtureConfiguration.java
index 889473d32d1a35..b7b698c73ddac3 100644
--- a/metadata-io/src/test/java/io/datahubproject/test/fixtures/search/SearchLineageFixtureConfiguration.java
+++ b/metadata-io/src/test/java/io/datahubproject/test/fixtures/search/SearchLineageFixtureConfiguration.java
@@ -4,6 +4,7 @@
 import static io.datahubproject.test.search.SearchTestUtils.getGraphQueryConfiguration;

 import com.linkedin.entity.client.EntityClient;
+import com.linkedin.entity.client.EntityClientConfig;
 import com.linkedin.metadata.client.JavaEntityClient;
 import com.linkedin.metadata.config.DataHubAppConfiguration;
 import com.linkedin.metadata.config.MetadataChangeProposalConfig;
@@ -276,6 +277,6 @@ protected EntityClient entityClient(
         null,
         null,
         null,
-        1);
+        EntityClientConfig.builder().batchGetV2Size(1).build());
   }
 }
diff --git a/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataChangeProposalsProcessor.java b/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataChangeProposalsProcessor.java
index ef87afdef46cb7..4e356f5fb3670a 100644
--- a/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataChangeProposalsProcessor.java
+++ b/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataChangeProposalsProcessor.java
@@ -15,26 +15,21 @@
 import com.linkedin.gms.factory.config.ConfigurationProvider;
 import com.linkedin.gms.factory.entityclient.RestliEntityClientFactory;
 import com.linkedin.metadata.EventUtils;
-import com.linkedin.metadata.dao.throttle.ThrottleControl;
 import com.linkedin.metadata.dao.throttle.ThrottleSensor;
 import com.linkedin.metadata.kafka.config.MetadataChangeProposalProcessorCondition;
+import com.linkedin.metadata.kafka.util.KafkaListenerUtil;
 import com.linkedin.metadata.utils.metrics.MetricUtils;
-import com.linkedin.mxe.FailedMetadataChangeProposal;
 import com.linkedin.mxe.MetadataChangeProposal;
 import com.linkedin.mxe.Topics;
 import io.datahubproject.metadata.context.OperationContext;
-import java.io.IOException;
 import java.util.Optional;
-import javax.annotation.Nonnull;
 import javax.annotation.PostConstruct;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
-import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerRecord;
 import org.slf4j.MDC;
 import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.beans.factory.annotation.Value;
@@ -43,7 +38,6 @@
 import org.springframework.kafka.annotation.EnableKafka;
 import org.springframework.kafka.annotation.KafkaListener;
 import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
-import org.springframework.kafka.listener.MessageListenerContainer;
 import org.springframework.stereotype.Component;

 @Slf4j
@@ -80,38 +74,7 @@ public class MetadataChangeProposalsProcessor {

   @PostConstruct
   public void registerConsumerThrottle() {
-    if (kafkaThrottle != null
-        && provider
-            .getMetadataChangeProposal()
-            .getThrottle()
-            .getComponents()
-            .getMceConsumer()
-            .isEnabled()) {
-      log.info("MCE Consumer Throttle Enabled");
-      kafkaThrottle.addCallback(
-          (throttleEvent) -> {
-            Optional<MessageListenerContainer> container =
-                Optional.ofNullable(registry.getListenerContainer(mceConsumerGroupId));
-            if (container.isEmpty()) {
-              log.warn(
-                  "Expected container was missing: {} throttle is not possible.",
-                  mceConsumerGroupId);
-            } else {
-              if (throttleEvent.isThrottled()) {
-                container.ifPresent(MessageListenerContainer::pause);
-                return ThrottleControl.builder()
-                    // resume consumer after sleep
-                    .callback(
-                        (resumeEvent) -> container.ifPresent(MessageListenerContainer::resume))
-                    .build();
-              }
-            }
-
-            return ThrottleControl.NONE;
-          });
-    } else {
-      log.info("MCE Consumer Throttle Disabled");
-    }
+    KafkaListenerUtil.registerThrottle(kafkaThrottle, provider, registry, mceConsumerGroupId);
   }

   @KafkaListener(
@@ -132,7 +95,9 @@ public void consume(final ConsumerRecord<String, GenericRecord> consumerRecord)
           consumerRecord.serializedValueSize(),
           consumerRecord.timestamp());

-      log.debug("Record {}", record);
+      if (log.isDebugEnabled()) {
+        log.debug("Record {}", record);
+      }

       MetadataChangeProposal event = new MetadataChangeProposal();
       try {
@@ -148,45 +113,18 @@ public void consume(final ConsumerRecord<String, GenericRecord> consumerRecord)
         MDC.put(
             MDC_CHANGE_TYPE, Optional.ofNullable(changeType).map(ChangeType::toString).orElse(""));

-        log.debug("MetadataChangeProposal {}", event);
-        // TODO: Get this from the event itself.
+        if (log.isDebugEnabled()) {
+          log.debug("MetadataChangeProposal {}", event);
+        }
         String urn = entityClient.ingestProposal(systemOperationContext, event, false);
         log.info("Successfully processed MCP event urn: {}", urn);
       } catch (Throwable throwable) {
         log.error("MCP Processor Error", throwable);
         log.error("Message: {}", record);
-        sendFailedMCP(event, throwable);
+        KafkaListenerUtil.sendFailedMCP(event, throwable, fmcpTopicName, kafkaProducer);
       }
     } finally {
       MDC.clear();
     }
   }
-
-  private void sendFailedMCP(@Nonnull MetadataChangeProposal event, @Nonnull Throwable throwable) {
-    final FailedMetadataChangeProposal failedMetadataChangeProposal =
-        createFailedMCPEvent(event, throwable);
-    try {
-      final GenericRecord genericFailedMCERecord =
-          EventUtils.pegasusToAvroFailedMCP(failedMetadataChangeProposal);
-      log.debug("Sending FailedMessages to topic - {}", fmcpTopicName);
-      log.info(
-          "Error while processing FMCP: FailedMetadataChangeProposal - {}",
-          failedMetadataChangeProposal);
-      kafkaProducer.send(new ProducerRecord<>(fmcpTopicName, genericFailedMCERecord));
-    } catch (IOException e) {
-      log.error(
-          "Error while sending FailedMetadataChangeProposal: Exception - {}, FailedMetadataChangeProposal - {}",
-          e.getStackTrace(),
-          failedMetadataChangeProposal);
-    }
-  }
-
-  @Nonnull
-  private FailedMetadataChangeProposal createFailedMCPEvent(
-      @Nonnull MetadataChangeProposal event, @Nonnull Throwable throwable) {
-    final FailedMetadataChangeProposal fmcp = new FailedMetadataChangeProposal();
-    fmcp.setError(ExceptionUtils.getStackTrace(throwable));
-    fmcp.setMetadataChangeProposal(event);
-    return fmcp;
-  }
 }
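A small note on the log.isDebugEnabled() guards introduced above: with SLF4J's {} placeholders, event.toString() is already deferred until the level check passes, so the explicit guard mainly documents intent and future-proofs against arguments that are expensive to build before the call. A hedged illustration (renderPayload is hypothetical, not part of this change):

import lombok.extern.slf4j.Slf4j;

@Slf4j
class DebugGuardSketch {
  void logEvent(Object event) {
    if (log.isDebugEnabled()) {
      // Without the guard, renderPayload(event) would run even at INFO level.
      log.debug("MetadataChangeProposal {}", renderPayload(event));
    }
  }

  private String renderPayload(Object event) {
    return String.valueOf(event); // stand-in for a costly serialization
  }
}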
diff --git a/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/batch/BatchMetadataChangeProposalsProcessor.java b/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/batch/BatchMetadataChangeProposalsProcessor.java
new file mode 100644
index 00000000000000..fed93628fe4d79
--- /dev/null
+++ b/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/batch/BatchMetadataChangeProposalsProcessor.java
@@ -0,0 +1,116 @@
+package com.linkedin.metadata.kafka.batch;
+
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Timer;
+import com.linkedin.entity.client.SystemEntityClient;
+import com.linkedin.gms.factory.config.ConfigurationProvider;
+import com.linkedin.gms.factory.entityclient.RestliEntityClientFactory;
+import com.linkedin.metadata.EventUtils;
+import com.linkedin.metadata.dao.throttle.ThrottleSensor;
+import com.linkedin.metadata.kafka.config.batch.BatchMetadataChangeProposalProcessorCondition;
+import com.linkedin.metadata.kafka.util.KafkaListenerUtil;
+import com.linkedin.metadata.utils.metrics.MetricUtils;
+import com.linkedin.mxe.MetadataChangeProposal;
+import com.linkedin.mxe.Topics;
+import io.datahubproject.metadata.context.OperationContext;
+import java.util.ArrayList;
+import java.util.List;
+import javax.annotation.PostConstruct;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.Producer;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Conditional;
+import org.springframework.context.annotation.Import;
+import org.springframework.kafka.annotation.EnableKafka;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
+import org.springframework.stereotype.Component;
+
+@Slf4j
+@Component
+@Import({RestliEntityClientFactory.class})
+@Conditional(BatchMetadataChangeProposalProcessorCondition.class)
+@EnableKafka
+@RequiredArgsConstructor
+public class BatchMetadataChangeProposalsProcessor {
+  private static final String CONSUMER_GROUP_ID_VALUE =
+      "${METADATA_CHANGE_PROPOSAL_KAFKA_CONSUMER_GROUP_ID:generic-mce-consumer-job-client}";
+
+  private final OperationContext systemOperationContext;
+  private final SystemEntityClient entityClient;
+  private final Producer<String, IndexedRecord> kafkaProducer;
+
+  @Qualifier("kafkaThrottle")
+  private final ThrottleSensor kafkaThrottle;
+
+  private final KafkaListenerEndpointRegistry registry;
+  private final ConfigurationProvider provider;
+
+  private final Histogram kafkaLagStats =
+      MetricUtils.get().histogram(MetricRegistry.name(this.getClass(), "kafkaLag"));
+
+  @Value(
+      "${FAILED_METADATA_CHANGE_PROPOSAL_TOPIC_NAME:"
+          + Topics.FAILED_METADATA_CHANGE_PROPOSAL
+          + "}")
+  private String fmcpTopicName;
+
+  @Value(CONSUMER_GROUP_ID_VALUE)
+  private String mceConsumerGroupId;
+
+  @PostConstruct
+  public void registerConsumerThrottle() {
+    KafkaListenerUtil.registerThrottle(kafkaThrottle, provider, registry, mceConsumerGroupId);
+  }
+
+  @KafkaListener(
+      id = CONSUMER_GROUP_ID_VALUE,
+      topics = "${METADATA_CHANGE_PROPOSAL_TOPIC_NAME:" + Topics.METADATA_CHANGE_PROPOSAL + "}",
+      containerFactory = "kafkaEventConsumer",
+      batch = "true")
+  public void consume(final List<ConsumerRecord<String, GenericRecord>> consumerRecords) {
+    try (Timer.Context ignored = MetricUtils.timer(this.getClass(), "consume").time()) {
+      List<MetadataChangeProposal> metadataChangeProposals =
+          new ArrayList<>(consumerRecords.size());
+      for (ConsumerRecord<String, GenericRecord> consumerRecord : consumerRecords) {
+        kafkaLagStats.update(System.currentTimeMillis() - consumerRecord.timestamp());
+        final GenericRecord record = consumerRecord.value();
+
+        log.info(
+            "Got MCP event key: {}, topic: {}, partition: {}, offset: {}, value size: {}, timestamp: {}",
+            consumerRecord.key(),
+            consumerRecord.topic(),
+            consumerRecord.partition(),
+            consumerRecord.offset(),
+            consumerRecord.serializedValueSize(),
+            consumerRecord.timestamp());
+
+        MetadataChangeProposal event = new MetadataChangeProposal();
+        try {
+          event = EventUtils.avroToPegasusMCP(record);
+        } catch (Throwable throwable) {
+          log.error("MCP Processor Error", throwable);
+          log.error("Message: {}", record);
+          KafkaListenerUtil.sendFailedMCP(event, throwable, fmcpTopicName, kafkaProducer);
+        }
+        metadataChangeProposals.add(event);
+      }
+
+      try {
+        List<String> urns =
+            entityClient.batchIngestProposals(
+                systemOperationContext, metadataChangeProposals, false);
+        log.info("Successfully processed MCP event urns: {}", urns);
+      } catch (Exception e) {
+        // Java client should never throw this
+        log.error("Exception in batch ingest", e);
+      }
+    }
+  }
+}
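The new listener above consumes with batch = "true", so Spring Kafka hands it everything returned by one poll; note that a malformed record is routed to the FMCP topic but still contributes its empty placeholder MetadataChangeProposal to the batch. The batch size ceiling is not set in this class — it comes from the consumer properties behind the kafkaEventConsumer container factory, presumably via Kafka's max.poll.records. A sketch of that knob, assuming plain consumer Properties:

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

class BatchSizeSketch {
  static Properties consumerProps() {
    Properties props = new Properties();
    // Caps how many records one poll, and therefore one batch listener invocation, can deliver.
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500); // Kafka's default
    return props;
  }
}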
diff --git a/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/config/MetadataChangeProposalProcessorCondition.java b/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/config/MetadataChangeProposalProcessorCondition.java
index 1cdb05b04e0ac9..554684d5e8fe77 100644
--- a/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/config/MetadataChangeProposalProcessorCondition.java
+++ b/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/config/MetadataChangeProposalProcessorCondition.java
@@ -9,7 +9,8 @@ public class MetadataChangeProposalProcessorCondition implements Condition {
   @Override
   public boolean matches(ConditionContext context, AnnotatedTypeMetadata metadata) {
     Environment env = context.getEnvironment();
-    return "true".equals(env.getProperty("MCE_CONSUMER_ENABLED"))
-        || "true".equals(env.getProperty("MCP_CONSUMER_ENABLED"));
+    return ("true".equals(env.getProperty("MCE_CONSUMER_ENABLED"))
+            || "true".equals(env.getProperty("MCP_CONSUMER_ENABLED")))
+        && !Boolean.parseBoolean(env.getProperty("MCP_CONSUMER_BATCH_ENABLED"));
   }
 }
diff --git a/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/config/batch/BatchMetadataChangeProposalProcessorCondition.java b/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/config/batch/BatchMetadataChangeProposalProcessorCondition.java
new file mode 100644
index 00000000000000..296e37c7a90695
--- /dev/null
+++ b/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/config/batch/BatchMetadataChangeProposalProcessorCondition.java
@@ -0,0 +1,16 @@
+package com.linkedin.metadata.kafka.config.batch;
+
+import org.springframework.context.annotation.Condition;
+import org.springframework.context.annotation.ConditionContext;
+import org.springframework.core.env.Environment;
+import org.springframework.core.type.AnnotatedTypeMetadata;
+
+public class BatchMetadataChangeProposalProcessorCondition implements Condition {
+  @Override
+  public boolean matches(ConditionContext context, AnnotatedTypeMetadata metadata) {
+    Environment env = context.getEnvironment();
+    return ("true".equals(env.getProperty("MCE_CONSUMER_ENABLED"))
+            || "true".equals(env.getProperty("MCP_CONSUMER_ENABLED")))
+        && Boolean.parseBoolean(env.getProperty("MCP_CONSUMER_BATCH_ENABLED"));
+  }
+}
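The two conditions above are complements over the same enablement flags: MCP_CONSUMER_BATCH_ENABLED routes a deployment to exactly one of the processors. A runnable truth-table sketch, with a Map standing in for Spring's Environment:

import java.util.Map;

class ConditionSketch {
  public static void main(String[] args) {
    Map<String, String> env =
        Map.of("MCP_CONSUMER_ENABLED", "true", "MCP_CONSUMER_BATCH_ENABLED", "true");
    boolean consumerOn =
        "true".equals(env.get("MCE_CONSUMER_ENABLED"))
            || "true".equals(env.get("MCP_CONSUMER_ENABLED"));
    // Boolean.parseBoolean(null) is false, so batching defaults to off when the flag is unset.
    boolean batch = Boolean.parseBoolean(env.get("MCP_CONSUMER_BATCH_ENABLED"));
    System.out.println("single-record processor active: " + (consumerOn && !batch)); // false
    System.out.println("batch processor active: " + (consumerOn && batch)); // true
  }
}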
diff --git a/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/util/KafkaListenerUtil.java b/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/util/KafkaListenerUtil.java
new file mode 100644
index 00000000000000..874a45c995e911
--- /dev/null
+++ b/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/util/KafkaListenerUtil.java
@@ -0,0 +1,96 @@
+package com.linkedin.metadata.kafka.util;
+
+import com.linkedin.gms.factory.config.ConfigurationProvider;
+import com.linkedin.metadata.EventUtils;
+import com.linkedin.metadata.dao.throttle.ThrottleControl;
+import com.linkedin.metadata.dao.throttle.ThrottleSensor;
+import com.linkedin.mxe.FailedMetadataChangeProposal;
+import com.linkedin.mxe.MetadataChangeProposal;
+import java.io.IOException;
+import java.util.Optional;
+import javax.annotation.Nonnull;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
+import org.springframework.kafka.listener.MessageListenerContainer;
+
+@Slf4j
+public class KafkaListenerUtil {
+
+  private KafkaListenerUtil() {}
+
+  public static void registerThrottle(
+      ThrottleSensor kafkaThrottle,
+      ConfigurationProvider provider,
+      KafkaListenerEndpointRegistry registry,
+      String mceConsumerGroupId) {
+    if (kafkaThrottle != null
+        && provider
+            .getMetadataChangeProposal()
+            .getThrottle()
+            .getComponents()
+            .getMceConsumer()
+            .isEnabled()) {
+      log.info("MCE Consumer Throttle Enabled");
+      kafkaThrottle.addCallback(
+          (throttleEvent) -> {
+            Optional<MessageListenerContainer> container =
+                Optional.ofNullable(registry.getListenerContainer(mceConsumerGroupId));
+            if (container.isEmpty()) {
+              log.warn(
+                  "Expected container was missing: {} throttle is not possible.",
+                  mceConsumerGroupId);
+            } else {
+              if (throttleEvent.isThrottled()) {
+                container.ifPresent(MessageListenerContainer::pause);
+                return ThrottleControl.builder()
+                    // resume consumer after sleep
+                    .callback(
+                        (resumeEvent) -> container.ifPresent(MessageListenerContainer::resume))
+                    .build();
+              }
+            }
+
+            return ThrottleControl.NONE;
+          });
+    } else {
+      log.info("MCE Consumer Throttle Disabled");
+    }
+  }
+
+  public static void sendFailedMCP(
+      @Nonnull MetadataChangeProposal event,
+      @Nonnull Throwable throwable,
+      String fmcpTopicName,
+      Producer<String, IndexedRecord> kafkaProducer) {
+    final FailedMetadataChangeProposal failedMetadataChangeProposal =
+        createFailedMCPEvent(event, throwable);
+    try {
+      final GenericRecord genericFailedMCERecord =
+          EventUtils.pegasusToAvroFailedMCP(failedMetadataChangeProposal);
+      log.debug("Sending FailedMessages to topic - {}", fmcpTopicName);
+      log.info(
+          "Error while processing FMCP: FailedMetadataChangeProposal - {}",
+          failedMetadataChangeProposal);
+      kafkaProducer.send(new ProducerRecord<>(fmcpTopicName, genericFailedMCERecord));
+    } catch (IOException e) {
+      log.error(
+          "Error while sending FailedMetadataChangeProposal: Exception - {}, FailedMetadataChangeProposal - {}",
+          e.getStackTrace(),
+          failedMetadataChangeProposal);
+    }
+  }
+
+  @Nonnull
+  public static FailedMetadataChangeProposal createFailedMCPEvent(
+      @Nonnull MetadataChangeProposal event, @Nonnull Throwable throwable) {
+    final FailedMetadataChangeProposal fmcp = new FailedMetadataChangeProposal();
+    fmcp.setError(ExceptionUtils.getStackTrace(throwable));
+    fmcp.setMetadataChangeProposal(event);
+    return fmcp;
+  }
+}
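KafkaListenerUtil.registerThrottle, extracted above, pauses and resumes the consumer by looking up its listener container by group id; pause() takes effect once in-flight records finish, and the ThrottleControl callback resumes the container after the throttle sleep. The container mechanics in isolation (the "my-consumer" id is illustrative):

import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;

class PauseResumeSketch {
  static void applyThrottle(KafkaListenerEndpointRegistry registry, boolean throttled) {
    MessageListenerContainer container = registry.getListenerContainer("my-consumer");
    if (container == null) {
      return; // mirrors the "container was missing" warning in registerThrottle
    }
    if (throttled) {
      container.pause(); // stops fetching after currently polled records are processed
    } else {
      container.resume();
    }
  }
}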
diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/entityclient/JavaEntityClientFactory.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/entityclient/JavaEntityClientFactory.java
index e99978a26d6cf5..e783b4e1963d0a 100644
--- a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/entityclient/JavaEntityClientFactory.java
+++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/entityclient/JavaEntityClientFactory.java
@@ -50,7 +50,7 @@ public EntityClient entityClient(
         _timeseriesAspectService,
         rollbackService,
         _eventProducer,
-        entityClientConfig.getBatchGetV2Size());
+        entityClientConfig);
   }

   @Bean("systemEntityClient")
@@ -79,6 +79,6 @@ public SystemEntityClient systemEntityClient(
         rollbackService,
         _eventProducer,
         entityClientCacheConfig,
-        entityClientConfig.getBatchGetV2Size());
+        entityClientConfig);
   }
 }