From edfc419043cba3fea7edcb2e7cb49fa87817925c Mon Sep 17 00:00:00 2001 From: flaminiaScarciofolo Date: Wed, 13 Nov 2024 18:58:23 +0100 Subject: [PATCH] [SELC-5981] changed user-cdc business logic to eveluate if mail is changed --- .../user/event/UserInstitutionCdcService.java | 90 +++++++++++-- .../user/event/entity/UserInstitution.java | 2 + .../UserInstitutionCdcServiceTest.java | 127 ++++++++++++++++-- .../src/test/resources/application.properties | 3 +- .../it/pagopa/selfcare/user/UserUtils.java | 19 ++- .../pagopa/selfcare/user/UserUtilsTest.java | 22 +-- 6 files changed, 227 insertions(+), 36 deletions(-) diff --git a/apps/user-cdc/src/main/java/it/pagopa/selfcare/user/event/UserInstitutionCdcService.java b/apps/user-cdc/src/main/java/it/pagopa/selfcare/user/event/UserInstitutionCdcService.java index 6b39762a..2fafb4dd 100644 --- a/apps/user-cdc/src/main/java/it/pagopa/selfcare/user/event/UserInstitutionCdcService.java +++ b/apps/user-cdc/src/main/java/it/pagopa/selfcare/user/event/UserInstitutionCdcService.java @@ -15,16 +15,17 @@ import io.quarkus.runtime.Startup; import io.quarkus.runtime.configuration.ConfigUtils; import io.smallrye.mutiny.Multi; -import io.smallrye.mutiny.Uni; import it.pagopa.selfcare.user.UserUtils; import it.pagopa.selfcare.user.client.EventHubFdRestClient; import it.pagopa.selfcare.user.client.EventHubRestClient; import it.pagopa.selfcare.user.event.entity.UserInstitution; import it.pagopa.selfcare.user.event.mapper.NotificationMapper; import it.pagopa.selfcare.user.event.repository.UserInstitutionRepository; +import it.pagopa.selfcare.user.model.FdUserNotificationToSend; import it.pagopa.selfcare.user.model.NotificationUserType; import it.pagopa.selfcare.user.model.OnboardedProduct; import it.pagopa.selfcare.user.model.TrackEventInput; +import it.pagopa.selfcare.user.model.constants.OnboardedProductState; import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; import lombok.extern.slf4j.Slf4j; @@ -36,6 +37,7 @@ import org.openapi.quarkus.user_registry_json.api.UserApi; import java.time.Duration; +import java.time.OffsetDateTime; import java.util.*; import java.util.concurrent.TimeoutException; @@ -50,6 +52,8 @@ import static it.pagopa.selfcare.user.model.constants.EventsName.EVENT_USER_CDC_NAME; import static it.pagopa.selfcare.user.model.constants.EventsName.FD_EVENT_USER_CDC_NAME; import static java.util.Arrays.asList; +import static java.util.Comparator.naturalOrder; +import static java.util.Comparator.nullsLast; @Startup @Slf4j @@ -72,6 +76,8 @@ public class UserInstitutionCdcService { private final Integer retryMinBackOff; private final Integer retryMaxBackOff; private final Integer maxRetry; + private final boolean sendEventsEnabled; + private final boolean sendFdEventsEnabled; @RestClient @@ -107,11 +113,13 @@ public UserInstitutionCdcService(ReactiveMongoClient mongoClient, this.telemetryClient = telemetryClient; this.tableClient = tableClient; this.notificationMapper = notificationMapper; + this.sendEventsEnabled = sendEventsEnabled; + this.sendFdEventsEnabled = sendFdEventsEnabled; telemetryClient.getContext().getOperation().setName(OPERATION_NAME); - initOrderStream(sendEventsEnabled, sendFdEventsEnabled); + initOrderStream(); } - private void initOrderStream(Boolean sendEventsEnabled, Boolean sendFdEventsEnabled) { + private void initOrderStream() { log.info("Starting initOrderStream ... "); //Retrieve last resumeToken for watching collection at specific operation @@ -140,12 +148,42 @@ private void initOrderStream(Boolean sendEventsEnabled, Boolean sendFdEventsEnab Multi> publisher = dataCollection.watch(pipeline, UserInstitution.class, options); publisher.subscribe().with( - this::consumerUserInstitutionRepositoryEvent, + document -> propagateDocumentToConsumers(document, publisher), failure -> { log.error(ERROR_DURING_SUBSCRIBE_COLLECTION_EXCEPTION_MESSAGE, failure.toString(), failure.getMessage()); telemetryClient.trackEvent(EVENT_USER_CDC_NAME, mapPropsForTrackEvent(TrackEventInput.builder().exception(failure.getClass().toString()).build()), Map.of(USER_INFO_UPDATE_FAILURE, 1D)); Quarkus.asyncExit(); }); + log.info("Completed initOrderStream ... "); + } + + /** + * + This method acts as a gateway to direct the modified entity to the correct consumers based on the following logic: + if the modification concerns the user's email, it will invoke the consumer to send events to both the sc-user queue and the selfcare-fd queue, + but the latter only if the entity contains at least one active product from prod-fd or prod-fd-garantito. If, however, the modification concerns one + of the products within the entity, all three consumers will be invoked (two consumers for the events and the consumer for updating the userinfo collection). + */ + public void propagateDocumentToConsumers(ChangeStreamDocument document, Multi> publisher) { + assert document.getFullDocument() != null; + assert document.getDocumentKey() != null; + UserInstitution userInstitutionChanged = document.getFullDocument(); + + boolean hasActiveFdProduct = userInstitutionChanged.getProducts().stream() + .anyMatch(product -> (PROD_FD.getValue().equals(product.getProductId()) || PROD_FD_GARANTITO.getValue().equals(product.getProductId())) + && OnboardedProductState.ACTIVE.equals(product.getStatus())); + + boolean userMailIsChanged = isUserMailChanged(userInstitutionChanged); + + if (Boolean.FALSE.equals(userMailIsChanged)) { + publisher.subscribe().with( + this::consumerUserInstitutionRepositoryEvent, + failure -> { + log.error(ERROR_DURING_SUBSCRIBE_COLLECTION_EXCEPTION_MESSAGE, failure.toString(), failure.getMessage()); + telemetryClient.trackEvent(EVENT_USER_CDC_NAME, mapPropsForTrackEvent(TrackEventInput.builder().exception(failure.getClass().toString()).build()), Map.of(USER_INFO_UPDATE_FAILURE, 1D)); + Quarkus.asyncExit(); + }); + } if (Boolean.TRUE.equals(sendEventsEnabled)) { publisher.subscribe().with( @@ -157,18 +195,27 @@ private void initOrderStream(Boolean sendEventsEnabled, Boolean sendFdEventsEnab }); } - if (Boolean.TRUE.equals(sendFdEventsEnabled)) { + if (Boolean.TRUE.equals(sendFdEventsEnabled) && hasActiveFdProduct) { publisher.subscribe().with( - this::consumerToSendUserEventForFD, + subscription -> consumerToSendUserEventForFD(document, userMailIsChanged), failure -> { log.error(ERROR_DURING_SUBSCRIBE_COLLECTION_EXCEPTION_MESSAGE, failure.toString(), failure.getMessage()); telemetryClient.trackEvent(EVENT_USER_CDC_NAME, mapPropsForTrackEvent(TrackEventInput.builder().exception(failure.getClass().toString()).build()), Map.of(EVENTS_USER_INSTITUTION_FAILURE, 1D)); Quarkus.asyncExit(); }); } + } - - log.info("Completed initOrderStream ... "); + private boolean isUserMailChanged(UserInstitution userInstitutionChanged) { + OffsetDateTime maxProductUpdateAt = null; + if (Objects.nonNull(userInstitutionChanged.getProducts()) && !userInstitutionChanged.getProducts().isEmpty()) { + maxProductUpdateAt = userInstitutionChanged.getProducts().stream() + .max(Comparator.comparing(OnboardedProduct::getUpdatedAt, nullsLast(naturalOrder()))) + .map(OnboardedProduct::getUpdatedAt) + .orElse(null); + } + OffsetDateTime maxUserMailUpdateAt = userInstitutionChanged.getUserMailUpdatedAt(); + return Objects.nonNull(maxProductUpdateAt) && Objects.nonNull(maxUserMailUpdateAt) && maxUserMailUpdateAt.isAfter(maxProductUpdateAt); } private ReactiveMongoCollection getCollection() { @@ -241,7 +288,15 @@ public void consumerToSendScUserEvent(ChangeStreamDocument docu }); } - public void consumerToSendUserEventForFD(ChangeStreamDocument document) { + + /** + * + This method handles the sending of events for the selfcare fd topic. + In case the modification concerns the user's email, it sends two events: the first of type DELETE and the second of type ACTIVE. + On the other hand, if the modification relates to the activation, suspension, reactivation, or cancellation of the prod-fd or prod-fd-garantito product, + it will send a single event indicating the type of action performed. + */ + public void consumerToSendUserEventForFD(ChangeStreamDocument document, boolean isUserMailChanged) { if (Objects.nonNull(document.getFullDocument()) && Objects.nonNull(document.getDocumentKey())) { UserInstitution userInstitutionChanged = document.getFullDocument(); @@ -251,10 +306,21 @@ public void consumerToSendUserEventForFD(ChangeStreamDocument d userRegistryApi.findByIdUsingGET(USERS_FIELD_LIST_WITHOUT_FISCAL_CODE, userInstitutionChanged.getUserId()) .onFailure(this::checkIfIsRetryableException) .retry().withBackOff(Duration.ofSeconds(retryMinBackOff), Duration.ofSeconds(retryMaxBackOff)).atMost(maxRetry) - .onItem().transformToUni(userResource -> Uni.createFrom().item(UserUtils.retrieveFdProductIfItChanged(userInstitutionChanged.getProducts(), List.of(PROD_FD.getValue(), PROD_FD_GARANTITO.getValue()))) - .onItem().ifNotNull().transform(onboardedProduct -> notificationMapper.toFdUserNotificationToSend(userInstitutionChanged, onboardedProduct, userResource, evaluateType(onboardedProduct))) - .onItem().ifNotNull().transformToUni(fdUserNotificationToSend -> { + .onItem().transformToMulti(userResource -> Multi.createFrom().iterable(UserUtils.retrieveFdProduct(userInstitutionChanged.getProducts(), List.of(PROD_FD.getValue(), PROD_FD_GARANTITO.getValue()), isUserMailChanged)) + .onItem().transformToUniAndMerge(onboardedProduct -> { + FdUserNotificationToSend fdUserNotificationToSend = notificationMapper.toFdUserNotificationToSend(userInstitutionChanged, onboardedProduct, userResource, evaluateType(onboardedProduct)); log.info("Sending message to EventHubFdRestClient ... "); + if (isUserMailChanged && NotificationUserType.ACTIVE_USER.equals(fdUserNotificationToSend.getType())) { + log.info("User mail is changed, sending DELETE_USER event first ... "); + FdUserNotificationToSend fdUserNotificationToSendDelete = notificationMapper.toFdUserNotificationToSend(userInstitutionChanged, onboardedProduct, userResource, NotificationUserType.DELETE_USER); + return eventHubFdRestClient.sendMessage(fdUserNotificationToSendDelete) + .onFailure().retry().withBackOff(Duration.ofSeconds(retryMinBackOff), Duration.ofSeconds(retryMaxBackOff)).atMost(maxRetry) + .onItem().invoke(() -> telemetryClient.trackEvent(EVENT_USER_CDC_NAME, mapPropsForTrackEvent(toTrackEventInput(fdUserNotificationToSendDelete)), Map.of(FD_EVENTS_USER_INSTITUTION_PRODUCT_SUCCESS, 1D))) + .onFailure().invoke(() -> telemetryClient.trackEvent(EVENT_USER_CDC_NAME, mapPropsForTrackEvent(toTrackEventInput(fdUserNotificationToSendDelete)), Map.of(FD_EVENTS_USER_INSTITUTION_PRODUCT_FAILURE, 1D))) + .onItem().transformToUni(unused -> eventHubFdRestClient.sendMessage(fdUserNotificationToSend)) + .onItem().invoke(() -> telemetryClient.trackEvent(EVENT_USER_CDC_NAME, mapPropsForTrackEvent(toTrackEventInput(fdUserNotificationToSend)), Map.of(FD_EVENTS_USER_INSTITUTION_PRODUCT_SUCCESS, 1D))) + .onFailure().invoke(() -> telemetryClient.trackEvent(EVENT_USER_CDC_NAME, mapPropsForTrackEvent(toTrackEventInput(fdUserNotificationToSend)), Map.of(FD_EVENTS_USER_INSTITUTION_PRODUCT_FAILURE, 1D))); + } return eventHubFdRestClient.sendMessage(fdUserNotificationToSend) .onFailure().retry().withBackOff(Duration.ofSeconds(retryMinBackOff), Duration.ofSeconds(retryMaxBackOff)).atMost(maxRetry) .onItem().invoke(() -> telemetryClient.trackEvent(EVENT_USER_CDC_NAME, mapPropsForTrackEvent(toTrackEventInput(fdUserNotificationToSend)), Map.of(FD_EVENTS_USER_INSTITUTION_PRODUCT_SUCCESS, 1D))) diff --git a/apps/user-cdc/src/main/java/it/pagopa/selfcare/user/event/entity/UserInstitution.java b/apps/user-cdc/src/main/java/it/pagopa/selfcare/user/event/entity/UserInstitution.java index 87e2b757..882c7906 100644 --- a/apps/user-cdc/src/main/java/it/pagopa/selfcare/user/event/entity/UserInstitution.java +++ b/apps/user-cdc/src/main/java/it/pagopa/selfcare/user/event/entity/UserInstitution.java @@ -8,6 +8,7 @@ import lombok.experimental.FieldNameConstants; import org.bson.types.ObjectId; +import java.time.OffsetDateTime; import java.util.ArrayList; import java.util.List; @@ -24,5 +25,6 @@ public class UserInstitution extends ReactivePanacheMongoEntity { private String institutionRootName; private List products = new ArrayList<>(); private String userMailUuid; + private OffsetDateTime userMailUpdatedAt; } diff --git a/apps/user-cdc/src/test/java/it/pagopa/selfcare/user/event/service/UserInstitutionCdcServiceTest.java b/apps/user-cdc/src/test/java/it/pagopa/selfcare/user/event/service/UserInstitutionCdcServiceTest.java index 1a97a627..75ecea58 100644 --- a/apps/user-cdc/src/test/java/it/pagopa/selfcare/user/event/service/UserInstitutionCdcServiceTest.java +++ b/apps/user-cdc/src/test/java/it/pagopa/selfcare/user/event/service/UserInstitutionCdcServiceTest.java @@ -5,13 +5,13 @@ import io.quarkus.test.common.QuarkusTestResource; import io.quarkus.test.junit.QuarkusTest; import io.quarkus.test.mongodb.MongoTestResource; +import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.Uni; -import io.smallrye.mutiny.helpers.test.UniAssertSubscriber; import it.pagopa.selfcare.user.client.EventHubFdRestClient; import it.pagopa.selfcare.user.client.EventHubRestClient; import it.pagopa.selfcare.user.event.UserInstitutionCdcService; -import it.pagopa.selfcare.user.event.entity.UserInfo; import it.pagopa.selfcare.user.event.entity.UserInstitution; +import it.pagopa.selfcare.user.event.repository.UserInstitutionRepository; import it.pagopa.selfcare.user.model.FdUserNotificationToSend; import it.pagopa.selfcare.user.model.OnboardedProduct; import it.pagopa.selfcare.user.model.UserNotificationToSend; @@ -32,6 +32,7 @@ import java.time.OffsetDateTime; import java.time.ZoneOffset; import java.util.List; +import java.util.UUID; import static it.pagopa.selfcare.user.event.UserInstitutionCdcService.USERS_FIELD_LIST_WITHOUT_FISCAL_CODE; import static it.pagopa.selfcare.user.model.NotificationUserType.*; @@ -56,6 +57,9 @@ public class UserInstitutionCdcServiceTest { @InjectMock EventHubFdRestClient eventHubFdRestClient; + @InjectMock + UserInstitutionRepository userInstitutionRepository; + @Test void consumerToSendScUserEvent() { @@ -89,7 +93,7 @@ void consumerToSendUserEventForFDSendACTIVE_USER() { when(eventHubFdRestClient.sendMessage(argumentCaptor.capture())) .thenReturn(Uni.createFrom().nullItem()); - userInstitutionCdcService.consumerToSendUserEventForFD(document); + userInstitutionCdcService.consumerToSendUserEventForFD(document, false); verify(userRegistryApi, times(1)). findByIdUsingGET(USERS_FIELD_LIST_WITHOUT_FISCAL_CODE, userInstitution.getUserId()); verify(eventHubFdRestClient, times(1)). @@ -97,6 +101,28 @@ void consumerToSendUserEventForFDSendACTIVE_USER() { Assertions.assertEquals(ACTIVE_USER, argumentCaptor.getValue().getType()); } + @Test + void consumerToSendUserEventForFDSendACTIVE_USER_mailChanged() { + UserInstitution userInstitution = dummyUserInstitution(false, OnboardedProductState.ACTIVE); + ChangeStreamDocument document = Mockito.mock(ChangeStreamDocument.class); + when(document.getFullDocument()).thenReturn(userInstitution); + when(document.getDocumentKey()).thenReturn(new BsonDocument()); + + UserResource userResource = dummyUserResource(); + when(userRegistryApi.findByIdUsingGET(USERS_FIELD_LIST_WITHOUT_FISCAL_CODE, userInstitution.getUserId())) + .thenReturn(Uni.createFrom().item(userResource)); + ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(FdUserNotificationToSend.class); + when(eventHubFdRestClient.sendMessage(argumentCaptor.capture())) + .thenReturn(Uni.createFrom().nullItem()); + + userInstitutionCdcService.consumerToSendUserEventForFD(document, true); + verify(userRegistryApi, times(1)). + findByIdUsingGET(USERS_FIELD_LIST_WITHOUT_FISCAL_CODE, userInstitution.getUserId()); + verify(eventHubFdRestClient, times(2)). + sendMessage(any(FdUserNotificationToSend.class)); + Assertions.assertEquals(ACTIVE_USER, argumentCaptor.getValue().getType()); + } + @Test void consumerToSendUserEventForFDSendSUSPEND_USER() { UserInstitution userInstitution = dummyUserInstitution(true, OnboardedProductState.SUSPENDED); @@ -112,7 +138,7 @@ void consumerToSendUserEventForFDSendSUSPEND_USER() { when(eventHubFdRestClient.sendMessage(argumentCaptor.capture())) .thenReturn(Uni.createFrom().nullItem()); - userInstitutionCdcService.consumerToSendUserEventForFD(document); + userInstitutionCdcService.consumerToSendUserEventForFD(document, false); verify(userRegistryApi, times(1)). findByIdUsingGET(USERS_FIELD_LIST_WITHOUT_FISCAL_CODE, userInstitution.getUserId()); verify(eventHubFdRestClient, times(1)). @@ -134,7 +160,7 @@ void consumerToSendUserEventForFDSendDELETE_USER() { when(eventHubFdRestClient.sendMessage(argumentCaptor.capture())) .thenReturn(Uni.createFrom().nullItem()); - userInstitutionCdcService.consumerToSendUserEventForFD(document); + userInstitutionCdcService.consumerToSendUserEventForFD(document, false); verify(userRegistryApi, times(1)). findByIdUsingGET(USERS_FIELD_LIST_WITHOUT_FISCAL_CODE, userInstitution.getUserId()); verify(eventHubFdRestClient, times(1)). @@ -153,7 +179,7 @@ void consumerToSendUserEventForFDNotSend() { when(userRegistryApi.findByIdUsingGET(USERS_FIELD_LIST_WITHOUT_FISCAL_CODE, userInstitution.getUserId())) .thenReturn(Uni.createFrom().item(userResource)); - userInstitutionCdcService.consumerToSendUserEventForFD(document); + userInstitutionCdcService.consumerToSendUserEventForFD(document, false); verify(userRegistryApi, times(1)). findByIdUsingGET(USERS_FIELD_LIST_WITHOUT_FISCAL_CODE, userInstitution.getUserId()); verify(eventHubFdRestClient, times(0)). @@ -167,13 +193,14 @@ UserResource dummyUserResource() { return userResource; } - UserInstitution dummyUserInstitution(boolean sendForFd, OnboardedProductState state){ + UserInstitution dummyUserInstitution(boolean sendForFd, OnboardedProductState state) { UserInstitution userInstitution = new UserInstitution(); userInstitution.setId(ObjectId.get()); - if(sendForFd) { + userInstitution.setUserMailUpdatedAt(OffsetDateTime.of(2023, 1, 3, 0, 0, 0, 0, ZoneOffset.UTC)); + if (sendForFd) { userInstitution.setProducts(List.of(dummyOnboardedProduct("example-1", state, 2, "prod-fd"), dummyOnboardedProduct("example-2", OnboardedProductState.ACTIVE, 1, "prod-io"))); - }else { + } else { userInstitution.setProducts(List.of(dummyOnboardedProduct("example-1", OnboardedProductState.ACTIVE, 2, "prod-io"), dummyOnboardedProduct("example-2", OnboardedProductState.ACTIVE, 1, "prod-fd"))); } @@ -191,4 +218,86 @@ OnboardedProduct dummyOnboardedProduct(String productRole, OnboardedProductState return onboardedProduct; } + + @Test + void propagateDocumentToConsumers_withChangeUserMailFalse() { + ChangeStreamDocument document = mock(ChangeStreamDocument.class); + + Multi> publisher = Multi.createFrom().item(document); + + UserInstitution userInstitution = new UserInstitution(); + userInstitution.setId(new ObjectId()); + OnboardedProduct product = new OnboardedProduct(); + product.setProductId("prod-io"); + product.setStatus(OnboardedProductState.ACTIVE); + userInstitution.setProducts(List.of(product)); + + UserResource userResource = dummyUserResource(); + when(userRegistryApi.findByIdUsingGET(USERS_FIELD_LIST_WITHOUT_FISCAL_CODE, userInstitution.getUserId())) + .thenReturn(Uni.createFrom().item(userResource)); + + when(document.getFullDocument()).thenReturn(userInstitution); + when(document.getDocumentKey()).thenReturn(new BsonDocument()); + + userInstitutionCdcService.propagateDocumentToConsumers(document, publisher); + verify(eventHubFdRestClient, times(0)).sendMessage(any(FdUserNotificationToSend.class)); + verify(eventHubRestClient, times(1)).sendMessage(any(UserNotificationToSend.class)); + verify(userInstitutionRepository, times(1)).updateUser(any()); + } + + @Test + void propagateDocumentToConsumers_withChangeUserMailWithFd() { + ChangeStreamDocument document = mock(ChangeStreamDocument.class); + + Multi> publisher = Multi.createFrom().item(document); + + UserInstitution userInstitution = new UserInstitution(); + userInstitution.setId(new ObjectId()); + OnboardedProduct product = new OnboardedProduct(); + product.setProductId("prod-fd"); + product.setStatus(OnboardedProductState.ACTIVE); + userInstitution.setProducts(List.of(product)); + userInstitution.setUserMailUpdatedAt(OffsetDateTime.of(LocalDate.now(), LocalTime.now(), ZoneOffset.UTC)); + + UserResource userResource = dummyUserResource(); + when(userRegistryApi.findByIdUsingGET(USERS_FIELD_LIST_WITHOUT_FISCAL_CODE, userInstitution.getUserId())) + .thenReturn(Uni.createFrom().item(userResource)); + + when(document.getFullDocument()).thenReturn(userInstitution); + when(document.getDocumentKey()).thenReturn(new BsonDocument()); + + userInstitutionCdcService.propagateDocumentToConsumers(document, publisher); + verify(eventHubFdRestClient, times(1)).sendMessage(any(FdUserNotificationToSend.class)); + verify(eventHubRestClient, times(1)).sendMessage(any(UserNotificationToSend.class)); + verify(userInstitutionRepository, times(1)).updateUser(any()); + } + + + @Test + void propagateDocumentToConsumers_withChangeUserMailTrueWithoutFd() { + + ChangeStreamDocument document = mock(ChangeStreamDocument.class); + + Multi> publisher = Multi.createFrom().item(document); + + UserInstitution userInstitution = new UserInstitution(); + userInstitution.setId(new ObjectId()); + OnboardedProduct product = new OnboardedProduct(); + product.setProductId("prod-io"); + product.setStatus(OnboardedProductState.ACTIVE); + userInstitution.setProducts(List.of(product)); + userInstitution.setUserMailUpdatedAt(OffsetDateTime.of(LocalDate.now(), LocalTime.now(), ZoneOffset.UTC)); + + UserResource userResource = dummyUserResource(); + when(userRegistryApi.findByIdUsingGET(USERS_FIELD_LIST_WITHOUT_FISCAL_CODE, userInstitution.getUserId())) + .thenReturn(Uni.createFrom().item(userResource)); + + when(document.getFullDocument()).thenReturn(userInstitution); + when(document.getDocumentKey()).thenReturn(new BsonDocument()); + + userInstitutionCdcService.propagateDocumentToConsumers(document, publisher); + verify(eventHubFdRestClient, times(0)).sendMessage(any(FdUserNotificationToSend.class)); + verify(eventHubRestClient, times(1)).sendMessage(any(UserNotificationToSend.class)); + verify(userInstitutionRepository, times(1)).updateUser(any()); + } } diff --git a/apps/user-cdc/src/test/resources/application.properties b/apps/user-cdc/src/test/resources/application.properties index 8b137891..12620dbb 100644 --- a/apps/user-cdc/src/test/resources/application.properties +++ b/apps/user-cdc/src/test/resources/application.properties @@ -1 +1,2 @@ - +user-cdc.send-events.watch.enabled=${USER_CDC_SEND_EVENTS_WATCH_ENABLED:true} +user-cdc.send-events-fd.watch.enabled=${USER_CDC_SEND_EVENTS_FD_WATCH_ENABLED:true} diff --git a/libs/user-sdk-event/src/main/java/it/pagopa/selfcare/user/UserUtils.java b/libs/user-sdk-event/src/main/java/it/pagopa/selfcare/user/UserUtils.java index 628d78df..43d1ec6e 100644 --- a/libs/user-sdk-event/src/main/java/it/pagopa/selfcare/user/UserUtils.java +++ b/libs/user-sdk-event/src/main/java/it/pagopa/selfcare/user/UserUtils.java @@ -80,13 +80,18 @@ public static Map mapPropsForTrackEvent(TrackEventInput trackEve * The retrieveFdProductIfItChanged method is designed to retrieve the most recently updated OnboardedProduct * from a list of products, provided that the product's ID is included in a specified list of product IDs to check. */ - public static OnboardedProduct retrieveFdProductIfItChanged(List products, List productIdToCheck) { + public static List retrieveFdProduct(List products, List productIdToCheck, boolean isUserMailChanged) { if (Objects.nonNull(products) && !products.isEmpty()) { - return products.stream() + if(isUserMailChanged){ + return products.stream() + .filter(onboardedProduct -> productIdToCheck.contains(onboardedProduct.getProductId())) + .toList(); + } + return List.of(Objects.requireNonNull(products.stream() .max(Comparator.comparing(OnboardedProduct::getUpdatedAt, nullsLast(naturalOrder())) .thenComparing(OnboardedProduct::getCreatedAt, nullsLast(naturalOrder()))) .filter(onboardedProduct -> productIdToCheck.contains(onboardedProduct.getProductId())) - .orElse(null); + .orElse(null))); } return null; } @@ -123,4 +128,12 @@ public static String getHMAC256(String key, String input) { return hash; } + public static List retrieveFdProduct(List products, List productIdToCheck) { + if (Objects.nonNull(products) && !products.isEmpty()) { + return products.stream() + .filter(onboardedProduct -> productIdToCheck.contains(onboardedProduct.getProductId())) + .toList(); + } + return null; + } } diff --git a/libs/user-sdk-event/src/test/java/it/pagopa/selfcare/user/UserUtilsTest.java b/libs/user-sdk-event/src/test/java/it/pagopa/selfcare/user/UserUtilsTest.java index f8cfe5f0..f40b0051 100644 --- a/libs/user-sdk-event/src/test/java/it/pagopa/selfcare/user/UserUtilsTest.java +++ b/libs/user-sdk-event/src/test/java/it/pagopa/selfcare/user/UserUtilsTest.java @@ -123,7 +123,7 @@ void getHMAC256_withValidInputs_shouldReturnValidHash() { } @Test - void retrieveFdProductIfItChanged_withValidProducts_shouldReturnMostRecentlyUpdatedProduct() { + void retrieveFdProduct_withValidProducts_shouldReturnMostRecentlyUpdatedProduct() { List products = new ArrayList<>(); OnboardedProduct product1 = new OnboardedProduct(); product1.setProductId("1"); @@ -139,24 +139,24 @@ void retrieveFdProductIfItChanged_withValidProducts_shouldReturnMostRecentlyUpda List productIdToCheck = List.of("1", "2"); - OnboardedProduct result = UserUtils.retrieveFdProductIfItChanged(products, productIdToCheck); + List result = UserUtils.retrieveFdProduct(products, productIdToCheck); assertNotNull(result); - assertEquals("2", result.getProductId()); + assertEquals("1", result.get(0).getProductId()); } @Test - void retrieveFdProductIfItChanged_withEmptyProductList() { + void retrieveFdProduct_withEmptyProductList() { List products = Collections.emptyList(); List productIdToCheck = List.of("1", "2"); - OnboardedProduct result = UserUtils.retrieveFdProductIfItChanged(products, productIdToCheck); + List result = UserUtils.retrieveFdProduct(products, productIdToCheck); assertNull(result); } @Test - void retrieveFdProductIfItChanged_withNoFdProductIds() { + void retrieveFdProduct_withNoFdProductIds() { List products = new ArrayList<>(); OnboardedProduct product1 = new OnboardedProduct(); product1.setProductId("1"); @@ -172,13 +172,13 @@ void retrieveFdProductIfItChanged_withNoFdProductIds() { List productIdToCheck = List.of("3", "4"); - OnboardedProduct result = UserUtils.retrieveFdProductIfItChanged(products, productIdToCheck); + List result = UserUtils.retrieveFdProduct(products, productIdToCheck); - assertNull(result); + assertEquals(0, result.size()); } @Test - void retrieveFdProductIfItChanged_withProductsWithSameUpdatedAt() { + void retrieveFdProduct_withProductsWithSameUpdatedAt() { List products = new ArrayList<>(); OnboardedProduct product1 = new OnboardedProduct(); product1.setProductId("1"); @@ -194,9 +194,9 @@ void retrieveFdProductIfItChanged_withProductsWithSameUpdatedAt() { List productIdToCheck = List.of("1", "2"); - OnboardedProduct result = UserUtils.retrieveFdProductIfItChanged(products, productIdToCheck); + List result = UserUtils.retrieveFdProduct(products, productIdToCheck); assertNotNull(result); - assertEquals("2", result.getProductId()); + assertEquals("1", result.get(0).getProductId()); } }