From e902d7abcc85a44587c067a12771137f53543c6f Mon Sep 17 00:00:00 2001 From: Francesco Parisi Date: Thu, 8 Feb 2024 11:24:11 +0100 Subject: [PATCH] NOD-690: wip --- .../cfgsync/EventHubClientApplication.java | 14 +---- .../config/EventHubClientConfiguration.java | 25 ++------- .../EventHubProcessorClientConfiguration.java | 28 ---------- .../config/MessageReceiveConfiguration.java | 53 ------------------- .../config/NodoONexiConfiguration.java | 4 +- .../config/NodoPagoPAPConfiguration.java | 4 +- 6 files changed, 9 insertions(+), 119 deletions(-) delete mode 100644 src/main/java/it/gov/pagopa/node/cfgsync/config/EventHubProcessorClientConfiguration.java delete mode 100644 src/main/java/it/gov/pagopa/node/cfgsync/config/MessageReceiveConfiguration.java diff --git a/src/main/java/it/gov/pagopa/node/cfgsync/EventHubClientApplication.java b/src/main/java/it/gov/pagopa/node/cfgsync/EventHubClientApplication.java index aa7423c..937bf9f 100644 --- a/src/main/java/it/gov/pagopa/node/cfgsync/EventHubClientApplication.java +++ b/src/main/java/it/gov/pagopa/node/cfgsync/EventHubClientApplication.java @@ -12,13 +12,9 @@ @SpringBootApplication public class EventHubClientApplication implements CommandLineRunner { - private static final Logger LOGGER = LoggerFactory.getLogger(EventHubClientApplication.class); -// private final EventHubProducerClient eventHubProducerClient; private final EventProcessorClient eventProcessorClient; - public EventHubClientApplication(//EventHubProducerClient eventHubProducerClient, - EventProcessorClient eventProcessorClient) { -// this.eventHubProducerClient = eventHubProducerClient; + public EventHubClientApplication(EventProcessorClient eventProcessorClient) { this.eventProcessorClient = eventProcessorClient; } @@ -29,14 +25,6 @@ public static void main(String[] args) { @Override public void run(String... args) throws Exception { eventProcessorClient.start(); - // Wait for the processor client to be ready - TimeUnit.SECONDS.sleep(10); - -// eventHubProducerClient.send(Collections.singletonList(new EventData("Hello World"))); -// LOGGER.info("Successfully sent a message to Event Hubs."); -// eventHubProducerClient.close(); - LOGGER.info("Stopping and closing the processor"); - eventProcessorClient.stop(); } } \ No newline at end of file diff --git a/src/main/java/it/gov/pagopa/node/cfgsync/config/EventHubClientConfiguration.java b/src/main/java/it/gov/pagopa/node/cfgsync/config/EventHubClientConfiguration.java index bbf648b..8c65f82 100644 --- a/src/main/java/it/gov/pagopa/node/cfgsync/config/EventHubClientConfiguration.java +++ b/src/main/java/it/gov/pagopa/node/cfgsync/config/EventHubClientConfiguration.java @@ -19,7 +19,7 @@ public class EventHubClientConfiguration { private static final String CONSUMER_GROUP = "$Default"; - private static final String STORAGE_ACCOUNT_ENDPOINT = ""; + private static final String STORAGE_ACCOUNT_CONNECTION_STRING = ""; private static final String STORAGE_CONTAINER_NAME = "prova"; @Value("${nodo-dei-pagamenti-cache-rx-connection-string}") @@ -29,16 +29,12 @@ public class EventHubClientConfiguration { @Bean EventHubClientBuilder eventHubClientBuilder() { - return new EventHubClientBuilder().credential(nodoCacheRxConnectionString, nodoCacheRxName, - new DefaultAzureCredentialBuilder() - .build()); + return new EventHubClientBuilder().connectionString(nodoCacheRxConnectionString); } @Bean BlobContainerClientBuilder blobContainerClientBuilder() { - return new BlobContainerClientBuilder().credential(new DefaultAzureCredentialBuilder() - .build()) - .endpoint(STORAGE_ACCOUNT_ENDPOINT) + return new BlobContainerClientBuilder().connectionString(STORAGE_ACCOUNT_CONNECTION_STRING) .containerName(STORAGE_CONTAINER_NAME); } @@ -49,26 +45,13 @@ BlobContainerAsyncClient blobContainerAsyncClient(BlobContainerClientBuilder blo @Bean EventProcessorClientBuilder eventProcessorClientBuilder(BlobContainerAsyncClient blobContainerAsyncClient) { - return new EventProcessorClientBuilder().credential(nodoCacheRxConnectionString, nodoCacheRxName, - new DefaultAzureCredentialBuilder() - .build()) + return new EventProcessorClientBuilder().connectionString(nodoCacheRxConnectionString) .consumerGroup(CONSUMER_GROUP) .checkpointStore(new BlobCheckpointStore(blobContainerAsyncClient)) .processEvent(EventHubClientConfiguration::processEvent) .processError(EventHubClientConfiguration::processError); } -// @Bean -// EventHubProducerClient eventHubProducerClient(EventHubClientBuilder eventHubClientBuilder) { -// return eventHubClientBuilder.buildProducerClient(); -// -// } - - @Bean - EventProcessorClient eventProcessorClient(EventProcessorClientBuilder eventProcessorClientBuilder) { - return eventProcessorClientBuilder.buildEventProcessorClient(); - } - public static void processEvent(EventContext eventContext) { log.info("Processing event from partition {} with sequence number {} with body: {}", eventContext.getPartitionContext().getPartitionId(), eventContext.getEventData().getSequenceNumber(), diff --git a/src/main/java/it/gov/pagopa/node/cfgsync/config/EventHubProcessorClientConfiguration.java b/src/main/java/it/gov/pagopa/node/cfgsync/config/EventHubProcessorClientConfiguration.java deleted file mode 100644 index e8d00e5..0000000 --- a/src/main/java/it/gov/pagopa/node/cfgsync/config/EventHubProcessorClientConfiguration.java +++ /dev/null @@ -1,28 +0,0 @@ -package it.gov.pagopa.node.cfgsync.config; - -import com.azure.spring.cloud.service.eventhubs.consumer.EventHubsErrorHandler; -import com.azure.spring.cloud.service.eventhubs.consumer.EventHubsRecordMessageListener; -import lombok.extern.slf4j.Slf4j; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; - -@Configuration -@Slf4j -public class EventHubProcessorClientConfiguration { - - @Bean - EventHubsRecordMessageListener processEvent() { - return eventContext->log.info("Processing event from partition {} with sequence number {} with body: {}", - eventContext.getPartitionContext().getPartitionId(), eventContext.getEventData().getSequenceNumber(), - eventContext.getEventData().getBodyAsString()); - } - - @Bean - EventHubsErrorHandler processError() { - return errorContext->log.info("Error occurred in partition processor for partition {}, {}", - errorContext.getPartitionContext().getPartitionId(), - errorContext.getThrowable().getMessage(), - errorContext.getThrowable()); - } - -} \ No newline at end of file diff --git a/src/main/java/it/gov/pagopa/node/cfgsync/config/MessageReceiveConfiguration.java b/src/main/java/it/gov/pagopa/node/cfgsync/config/MessageReceiveConfiguration.java deleted file mode 100644 index d4fde39..0000000 --- a/src/main/java/it/gov/pagopa/node/cfgsync/config/MessageReceiveConfiguration.java +++ /dev/null @@ -1,53 +0,0 @@ -//package it.gov.pagopa.node.cfg_sync.config; -// -//import com.azure.spring.integration.eventhubs.inbound.EventHubsInboundChannelAdapter; -//import com.azure.spring.messaging.eventhubs.core.EventHubsProcessorFactory; -//import com.azure.spring.messaging.eventhubs.core.checkpoint.CheckpointConfig; -//import com.azure.spring.messaging.eventhubs.core.checkpoint.CheckpointMode; -//import com.azure.spring.messaging.eventhubs.core.listener.EventHubsMessageListenerContainer; -//import com.azure.spring.messaging.eventhubs.core.properties.EventHubsContainerProperties; -//import lombok.extern.slf4j.Slf4j; -//import org.springframework.beans.factory.annotation.Qualifier; -//import org.springframework.context.annotation.Bean; -//import org.springframework.context.annotation.Configuration; -//import org.springframework.integration.annotation.ServiceActivator; -//import org.springframework.integration.channel.DirectChannel; -//import org.springframework.messaging.MessageChannel; -// -//@Configuration -//@Slf4j -//public class MessageReceiveConfiguration { -// -// private static final String INPUT_CHANNEL = "input"; -// private static final String EVENT_HUB_NAME = ""; -// private static final String CONSUMER_GROUP = "$Default"; -// -// @ServiceActivator(inputChannel = INPUT_CHANNEL) -// public void messageReceiver(byte[] payload) { -// String message = new String(payload); -// log.info("New message received: {}", message); -// } -// -// @Bean -// public EventHubsMessageListenerContainer messageListenerContainer(EventHubsProcessorFactory processorFactory) { -// EventHubsContainerProperties containerProperties = new EventHubsContainerProperties(); -// containerProperties.setEventHubName(EVENT_HUB_NAME); -// containerProperties.setConsumerGroup(CONSUMER_GROUP); -// containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.MANUAL)); -// return new EventHubsMessageListenerContainer(processorFactory, containerProperties); -// } -// -// @Bean -// public EventHubsInboundChannelAdapter messageChannelAdapter(@Qualifier(INPUT_CHANNEL) MessageChannel inputChannel, -// EventHubsMessageListenerContainer listenerContainer) { -// EventHubsInboundChannelAdapter adapter = new EventHubsInboundChannelAdapter(listenerContainer); -// adapter.setOutputChannel(inputChannel); -// return adapter; -// } -// -// @Bean -// public MessageChannel input() { -// return new DirectChannel(); -// } -// -//} \ No newline at end of file diff --git a/src/main/java/it/gov/pagopa/node/cfgsync/repository/config/NodoONexiConfiguration.java b/src/main/java/it/gov/pagopa/node/cfgsync/repository/config/NodoONexiConfiguration.java index 27e9672..f5929ea 100644 --- a/src/main/java/it/gov/pagopa/node/cfgsync/repository/config/NodoONexiConfiguration.java +++ b/src/main/java/it/gov/pagopa/node/cfgsync/repository/config/NodoONexiConfiguration.java @@ -19,7 +19,7 @@ @PropertySource(value = "classpath:/application-${spring.profiles.active}.properties", ignoreResourceNotFound = true) }) @EnableJpaRepositories( - basePackages = "it.gov.pagopa.node.cfg_sync.repository.nexi", + basePackages = "it.gov.pagopa.node.cfgsync.repository.nexi", entityManagerFactoryRef = "nodoNexiOEntityManager", transactionManagerRef = "nodoNexiOTransactionManager" ) @@ -32,7 +32,7 @@ public LocalContainerEntityManagerFactoryBean nodoNexiOEntityManager() { LocalContainerEntityManagerFactoryBean em = new LocalContainerEntityManagerFactoryBean(); em.setDataSource(nodoNexiODataSource()); - em.setPackagesToScan("it.gov.pagopa.node.cfg_sync.repository.model.nexi"); + em.setPackagesToScan("it.gov.pagopa.node.cfgsync.repository.model.nexi"); HibernateJpaVendorAdapter vendorAdapter = new HibernateJpaVendorAdapter(); diff --git a/src/main/java/it/gov/pagopa/node/cfgsync/repository/config/NodoPagoPAPConfiguration.java b/src/main/java/it/gov/pagopa/node/cfgsync/repository/config/NodoPagoPAPConfiguration.java index fa6bf07..4423cca 100644 --- a/src/main/java/it/gov/pagopa/node/cfgsync/repository/config/NodoPagoPAPConfiguration.java +++ b/src/main/java/it/gov/pagopa/node/cfgsync/repository/config/NodoPagoPAPConfiguration.java @@ -19,7 +19,7 @@ @PropertySource(value = "classpath:/application-${spring.profiles.active}.properties", ignoreResourceNotFound = true) }) @EnableJpaRepositories( - basePackages = "it.gov.pagopa.node.cfg_sync.repository.pagopa", + basePackages = "it.gov.pagopa.node.cfgsync.repository.pagopa", entityManagerFactoryRef = "nodoPagoPAPEntityManager", transactionManagerRef = "nodoPagoPAPTransactionManager" ) @@ -34,7 +34,7 @@ public LocalContainerEntityManagerFactoryBean nodoPagoPAPEntityManager() { LocalContainerEntityManagerFactoryBean em = new LocalContainerEntityManagerFactoryBean(); em.setDataSource(nodoPagoPAPDataSource()); - em.setPackagesToScan("it.gov.pagopa.node.cfg_sync.repository.model.pagopa"); + em.setPackagesToScan("it.gov.pagopa.node.cfgsync.repository.model.pagopa"); HibernateJpaVendorAdapter vendorAdapter = new HibernateJpaVendorAdapter();