Skip to content

Commit

Permalink
NOD-690: wip
Browse files Browse the repository at this point in the history
  • Loading branch information
fparisitas committed Feb 8, 2024
1 parent 377f02d commit a3f386c
Show file tree
Hide file tree
Showing 8 changed files with 266 additions and 123 deletions.
28 changes: 16 additions & 12 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,17 @@
<oracle.version>21.4.0.0</oracle.version>
</properties>

<!-- <dependencyManagement>-->
<!-- <dependencies>-->
<!-- <dependency>-->
<!-- <groupId>com.azure.spring</groupId>-->
<!-- <artifactId>spring-cloud-azure-dependencies</artifactId>-->
<!-- <version>4.5.0</version>-->
<!-- <type>pom</type>-->
<!-- <scope>import</scope>-->
<!-- </dependency>-->
<!-- </dependencies>-->
<!-- </dependencyManagement>-->
<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-dependencies</artifactId>
<version>4.5.0</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

<dependencies>
<dependency>
Expand Down Expand Up @@ -98,11 +98,15 @@
<artifactId>caffeine</artifactId>
</dependency>

<!-- &lt;!&ndash; Azure &ndash;&gt;-->
<!-- Azure -->
<!-- <dependency>-->
<!-- <groupId>com.azure.spring</groupId>-->
<!-- <artifactId>spring-cloud-azure-stream-binder-eventhubs</artifactId>-->
<!-- </dependency>-->
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-integration-eventhubs</artifactId>
</dependency>

<!-- Feign Client -->
<dependency>
Expand Down
72 changes: 42 additions & 30 deletions src/main/java/it/gov/pagopa/node/cfg_sync/Application.java
Original file line number Diff line number Diff line change
@@ -1,40 +1,52 @@
package it.gov.pagopa.node.cfg_sync;

import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.boot.autoconfigure.jdbc.DataSourceTransactionManagerAutoConfiguration;
import org.springframework.boot.autoconfigure.orm.jpa.HibernateJpaAutoConfiguration;

@SpringBootApplication
@EnableAutoConfiguration(exclude = {
DataSourceAutoConfiguration.class,
DataSourceTransactionManagerAutoConfiguration.class,
HibernateJpaAutoConfiguration.class})
@Slf4j
public class Application implements CommandLineRunner {

//package it.gov.pagopa.node.cfg_sync;
//
//import com.azure.messaging.eventhubs.EventData;
//import com.azure.messaging.eventhubs.EventHubProducerClient;
//import com.azure.messaging.eventhubs.EventProcessorClient;
//import lombok.extern.slf4j.Slf4j;
//import org.springframework.boot.CommandLineRunner;
//import org.springframework.boot.SpringApplication;
//import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
//import org.springframework.boot.autoconfigure.SpringBootApplication;
//import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
//import org.springframework.boot.autoconfigure.jdbc.DataSourceTransactionManagerAutoConfiguration;
//import org.springframework.boot.autoconfigure.orm.jpa.HibernateJpaAutoConfiguration;
//
//import java.util.Collections;
//import java.util.concurrent.TimeUnit;
//
//@SpringBootApplication
//@EnableAutoConfiguration(exclude = {
// DataSourceAutoConfiguration.class,
// DataSourceTransactionManagerAutoConfiguration.class,
// HibernateJpaAutoConfiguration.class})
//@Slf4j
//public class Application implements CommandLineRunner {
//
//// private final EventHubProducerClient eventHubProducerClient;
// private final EventProcessorClient eventProcessorClient;
//
// public Application(EventProcessorClient eventProcessorClient) {
// public Application(//EventHubProducerClient eventHubProducerClient,
// EventProcessorClient eventProcessorClient) {
//// this.eventHubProducerClient = eventHubProducerClient;
// this.eventProcessorClient = eventProcessorClient;
// }

public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}

@Override
public void run(String... args) throws Exception {
//
// public static void main(String[] args) {
// SpringApplication.run(Application.class, args);
// }
//
// @Override
// public void run(String... args) throws Exception {
// eventProcessorClient.start();
// // Wait for the processor client to be ready
// TimeUnit.SECONDS.sleep(10);
//
//// eventHubProducerClient.send(Collections.singletonList(new EventData("Hello World")));
//// log.info("Successfully sent a message to Event Hubs.");
//// eventHubProducerClient.close();
// log.info("Stopping and closing the processor");
// eventProcessorClient.stop();
}

}
// }
//
//}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package it.gov.pagopa.node.cfg_sync;

import com.azure.messaging.eventhubs.EventProcessorClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import java.util.concurrent.TimeUnit;

@SpringBootApplication
public class EventHubClientApplication implements CommandLineRunner {

private static final Logger LOGGER = LoggerFactory.getLogger(EventHubClientApplication.class);
// private final EventHubProducerClient eventHubProducerClient;
private final EventProcessorClient eventProcessorClient;

public EventHubClientApplication(//EventHubProducerClient eventHubProducerClient,
EventProcessorClient eventProcessorClient) {
// this.eventHubProducerClient = eventHubProducerClient;
this.eventProcessorClient = eventProcessorClient;
}

public static void main(String[] args) {
SpringApplication.run(EventHubClientApplication.class, args);
}

@Override
public void run(String... args) throws Exception {
eventProcessorClient.start();
// Wait for the processor client to be ready
TimeUnit.SECONDS.sleep(10);

// eventHubProducerClient.send(Collections.singletonList(new EventData("Hello World")));
// LOGGER.info("Successfully sent a message to Event Hubs.");
// eventHubProducerClient.close();
LOGGER.info("Stopping and closing the processor");
eventProcessorClient.stop();
}

}
Original file line number Diff line number Diff line change
@@ -1,82 +1,86 @@
//package it.gov.pagopa.node.cfg_sync.config;
//
//import com.azure.identity.DefaultAzureCredentialBuilder;
//import com.azure.messaging.eventhubs.EventHubClientBuilder;
//import com.azure.messaging.eventhubs.EventHubProducerClient;
//import com.azure.messaging.eventhubs.EventProcessorClient;
//import com.azure.messaging.eventhubs.EventProcessorClientBuilder;
//import com.azure.messaging.eventhubs.checkpointstore.blob.BlobCheckpointStore;
//import com.azure.messaging.eventhubs.models.ErrorContext;
//import com.azure.messaging.eventhubs.models.EventContext;
//import com.azure.storage.blob.BlobContainerAsyncClient;
//import com.azure.storage.blob.BlobContainerClientBuilder;
//import org.slf4j.Logger;
//import org.slf4j.LoggerFactory;
//import org.springframework.beans.factory.annotation.Value;
//import org.springframework.context.annotation.Bean;
//import org.springframework.context.annotation.Configuration;
//
//@Configuration
//public class EventHubClientConfiguration {
//
// private static final Logger LOGGER = LoggerFactory.getLogger(EventHubClientConfiguration.class);
// private static final String CONSUMER_GROUP = "$Default";
// private static final String STORAGE_ACCOUNT_ENDPOINT = "https://pagopadapiconfigfesa.blob.core.windows.net/";
// private static final String STORAGE_CONTAINER_NAME = "cfg-sync";
//
// @Value("${nodo-dei-pagamenti-cache-rx-connection-string}")
// private String ndpConnectionString;
//
// @Value("${nodo-dei-pagamenti-cache-rx-name}")
// private String ndpEventHubName;
//
// @Bean
// EventHubClientBuilder eventHubClientBuilder() {
// return new EventHubClientBuilder().credential(ndpConnectionString, ndpEventHubName,
// new DefaultAzureCredentialBuilder()
// .build());
// }
//
// @Bean
// BlobContainerClientBuilder blobContainerClientBuilder() {
// return new BlobContainerClientBuilder().credential(new DefaultAzureCredentialBuilder()
// .build())
// .endpoint(STORAGE_ACCOUNT_ENDPOINT)
// .containerName(STORAGE_CONTAINER_NAME);
// }
//
// @Bean
// BlobContainerAsyncClient blobContainerAsyncClient(BlobContainerClientBuilder blobContainerClientBuilder) {
// return blobContainerClientBuilder.buildAsyncClient();
// }
//
package it.gov.pagopa.node.cfg_sync.config;

import com.azure.identity.DefaultAzureCredentialBuilder;
import com.azure.messaging.eventhubs.EventHubClientBuilder;
import com.azure.messaging.eventhubs.EventHubProducerClient;
import com.azure.messaging.eventhubs.EventProcessorClient;
import com.azure.messaging.eventhubs.EventProcessorClientBuilder;
import com.azure.messaging.eventhubs.checkpointstore.blob.BlobCheckpointStore;
import com.azure.messaging.eventhubs.models.ErrorContext;
import com.azure.messaging.eventhubs.models.EventContext;
import com.azure.storage.blob.BlobContainerAsyncClient;
import com.azure.storage.blob.BlobContainerClientBuilder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@Slf4j
public class EventHubClientConfiguration {

private static final String CONSUMER_GROUP = "$Default";
private static final String STORAGE_ACCOUNT_ENDPOINT = "";
private static final String STORAGE_CONTAINER_NAME = "prova";

@Value("${nodo-dei-pagamenti-cache-rx-connection-string}")
private String nodoCacheRxConnectionString;
@Value("${nodo-dei-pagamenti-cache-rx-name}")
private String nodoCacheRxName;

@Bean
EventHubClientBuilder eventHubClientBuilder() {
return new EventHubClientBuilder().credential(nodoCacheRxConnectionString, nodoCacheRxName,
new DefaultAzureCredentialBuilder()
.build());
}

@Bean
BlobContainerClientBuilder blobContainerClientBuilder() {
return new BlobContainerClientBuilder().credential(new DefaultAzureCredentialBuilder()
.build())
.endpoint(STORAGE_ACCOUNT_ENDPOINT)
.containerName(STORAGE_CONTAINER_NAME);
}

@Bean
BlobContainerAsyncClient blobContainerAsyncClient(BlobContainerClientBuilder blobContainerClientBuilder) {
return blobContainerClientBuilder.buildAsyncClient();
}

@Bean
EventProcessorClientBuilder eventProcessorClientBuilder(BlobContainerAsyncClient blobContainerAsyncClient) {
return new EventProcessorClientBuilder().credential(nodoCacheRxConnectionString, nodoCacheRxName,
new DefaultAzureCredentialBuilder()
.build())
.consumerGroup(CONSUMER_GROUP)
.checkpointStore(new BlobCheckpointStore(blobContainerAsyncClient))
.processEvent(EventHubClientConfiguration::processEvent)
.processError(EventHubClientConfiguration::processError);
}

// @Bean
// EventProcessorClientBuilder eventProcessorClientBuilder(BlobContainerAsyncClient blobContainerAsyncClient) {
// return new EventProcessorClientBuilder().credential(ndpConnectionString, ndpEventHubName,
// new DefaultAzureCredentialBuilder()
// .build())
// .consumerGroup(CONSUMER_GROUP)
// .checkpointStore(new BlobCheckpointStore(blobContainerAsyncClient))
// .processEvent(EventHubClientConfiguration::processEvent)
// .processError(EventHubClientConfiguration::processError);
// }
// EventHubProducerClient eventHubProducerClient(EventHubClientBuilder eventHubClientBuilder) {
// return eventHubClientBuilder.buildProducerClient();
//
// @Bean
// EventProcessorClient eventProcessorClient(EventProcessorClientBuilder eventProcessorClientBuilder) {
// return eventProcessorClientBuilder.buildEventProcessorClient();
// }
//
// public static void processEvent(EventContext eventContext) {
// LOGGER.info("Processing event from partition {} with sequence number {} with body: {}",
// eventContext.getPartitionContext().getPartitionId(), eventContext.getEventData().getSequenceNumber(),
// eventContext.getEventData().getBodyAsString());
// }
//
// public static void processError(ErrorContext errorContext) {
// LOGGER.info("Error occurred in partition processor for partition {}, {}",
// errorContext.getPartitionContext().getPartitionId(),
// errorContext.getThrowable().getMessage(),
// errorContext.getThrowable());
// }
//
//}

@Bean
EventProcessorClient eventProcessorClient(EventProcessorClientBuilder eventProcessorClientBuilder) {
return eventProcessorClientBuilder.buildEventProcessorClient();
}

public static void processEvent(EventContext eventContext) {
log.info("Processing event from partition {} with sequence number {} with body: {}",
eventContext.getPartitionContext().getPartitionId(), eventContext.getEventData().getSequenceNumber(),
eventContext.getEventData().getBodyAsString());
}

public static void processError(ErrorContext errorContext) {
log.info("Error occurred in partition processor for partition {}, {}",
errorContext.getPartitionContext().getPartitionId(),
errorContext.getThrowable().getMessage(),
errorContext.getThrowable());
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package it.gov.pagopa.node.cfg_sync.config;

import com.azure.spring.cloud.service.eventhubs.consumer.EventHubsErrorHandler;
import com.azure.spring.cloud.service.eventhubs.consumer.EventHubsRecordMessageListener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@Slf4j
public class EventHubProcessorClientConfiguration {

@Bean
EventHubsRecordMessageListener processEvent() {
return eventContext->log.info("Processing event from partition {} with sequence number {} with body: {}",
eventContext.getPartitionContext().getPartitionId(), eventContext.getEventData().getSequenceNumber(),
eventContext.getEventData().getBodyAsString());
}

@Bean
EventHubsErrorHandler processError() {
return errorContext->log.info("Error occurred in partition processor for partition {}, {}",
errorContext.getPartitionContext().getPartitionId(),
errorContext.getThrowable().getMessage(),
errorContext.getThrowable());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public void handleContextRefresh(ContextRefreshedEvent event) {
|| prop.toLowerCase().contains("key")
|| prop.toLowerCase().contains("secret")
))
.forEach(prop -> log.debug("{}: {}", prop, env.getProperty(prop)));
.forEach(prop -> log.debug("{}: {}", prop, ""));//env.getProperty(prop)));
}

@Around(value = "restController()")
Expand Down
Loading

0 comments on commit a3f386c

Please sign in to comment.