diff --git a/apps/user-cdc/src/main/java/it/pagopa/selfcare/user/event/UserInstitutionCdcService.java b/apps/user-cdc/src/main/java/it/pagopa/selfcare/user/event/UserInstitutionCdcService.java index 26abbc15..0159b635 100644 --- a/apps/user-cdc/src/main/java/it/pagopa/selfcare/user/event/UserInstitutionCdcService.java +++ b/apps/user-cdc/src/main/java/it/pagopa/selfcare/user/event/UserInstitutionCdcService.java @@ -13,19 +13,17 @@ import io.quarkus.mongodb.reactive.ReactiveMongoCollection; import io.quarkus.runtime.Quarkus; import io.quarkus.runtime.Startup; +import io.quarkus.runtime.configuration.ConfigUtils; import io.smallrye.mutiny.Multi; import it.pagopa.selfcare.user.event.entity.UserInstitution; import it.pagopa.selfcare.user.event.repository.UserInstitutionRepository; import jakarta.enterprise.context.ApplicationScoped; import lombok.extern.slf4j.Slf4j; import org.bson.BsonDocument; -import org.bson.BsonTimestamp; import org.bson.conversions.Bson; import org.eclipse.microprofile.config.inject.ConfigProperty; import java.time.Duration; -import java.time.LocalDateTime; -import java.time.ZoneId; import java.util.*; import static com.mongodb.client.model.Projections.fields; @@ -81,12 +79,15 @@ private void initOrderStream() { //Retrieve last resumeToken for watching collection at specific operation String resumeToken = null; - try { - TableEntity cdcStartAtEntity = tableClient.getEntity(CDC_START_AT_PARTITION_KEY, CDC_START_AT_ROW_KEY); - if(Objects.nonNull(cdcStartAtEntity)) - resumeToken = (String) cdcStartAtEntity.getProperty(CDC_START_AT_PROPERTY); - } catch (TableServiceException e) { - log.error("Table StarAt not found, it is starting from now ..."); + + if(!ConfigUtils.getProfiles().contains("test")) { + try { + TableEntity cdcStartAtEntity = tableClient.getEntity(CDC_START_AT_PARTITION_KEY, CDC_START_AT_ROW_KEY); + if (Objects.nonNull(cdcStartAtEntity)) + resumeToken = (String) cdcStartAtEntity.getProperty(CDC_START_AT_PROPERTY); + } catch (TableServiceException e) { + log.warn("Table StarAt not found, it is starting from now ..."); + } } // Initialize watching collection diff --git a/apps/user-cdc/src/main/java/it/pagopa/selfcare/user/event/repository/UserInstitutionRepository.java b/apps/user-cdc/src/main/java/it/pagopa/selfcare/user/event/repository/UserInstitutionRepository.java index 92e43265..0a174032 100644 --- a/apps/user-cdc/src/main/java/it/pagopa/selfcare/user/event/repository/UserInstitutionRepository.java +++ b/apps/user-cdc/src/main/java/it/pagopa/selfcare/user/event/repository/UserInstitutionRepository.java @@ -40,18 +40,28 @@ public Uni updateUser(UserInstitution userInstitution) { } private Uni createNewUserInfo(UserInstitution userInstitution) { + if(CollectionUtils.isEmpty(userInstitution.getProducts())){ + return Uni.createFrom().voidItem(); + } + + List institutionRoles = userInstitution.getProducts().stream() + .filter(product -> VALID_PRODUCT_STATE.contains(product.getStatus())) + .map(product -> userMapper.toUserInstitutionRole(userInstitution, product.getRole(), product.getStatus())) + .toList(); + + if(CollectionUtils.isEmpty(institutionRoles)){ + return Uni.createFrom().voidItem(); + } + UserInfo userInfo = new UserInfo(); userInfo.setUserId(userInstitution.getUserId()); userInfo.setInstitutions(new ArrayList<>()); - if(!CollectionUtils.isEmpty(userInstitution.getProducts())){ - userInstitution.getProducts().forEach(product -> { - if (VALID_PRODUCT_STATE.contains(product.getStatus())) { - PartyRole role = product.getRole(); - userInfo.getInstitutions().add(userMapper.toUserInstitutionRole(userInstitution, role, product.getStatus())); - } - }); - } - return UserInfo.persistOrUpdate(userInfo).replaceWith(Uni.createFrom().voidItem()); + userInfo.setInstitutions(institutionRoles); + + return UserInfo.persistOrUpdate(userInfo) + .invoke(() -> log.info(String.format("createNewUserInfo for userId %s and institution %s", + userInstitution.getUserId(),userInstitution.getInstitutionId()))) + .replaceWith(Uni.createFrom().voidItem()); } private Uni deleteInstitutionOrAllUserInfo(ReactivePanacheMongoEntityBase entityBase, UserInstitution userInstitution) { @@ -63,8 +73,11 @@ private Uni deleteInstitutionOrAllUserInfo(ReactivePanacheMongoEntityBase userInfo.getInstitutions().removeIf(userInstitutionRole -> userInstitutionRole.getInstitutionId().equalsIgnoreCase(userInstitution.getInstitutionId())); if (CollectionUtils.isEmpty(userInfo.getInstitutions())) { + log.info(String.format("deleteInstitutionOrAllUserInfo removing userInfo for userId: %s", userInstitution.getUserId())); return UserInfo.deleteById(userInstitution.getUserId()).replaceWith(Uni.createFrom().voidItem()); } else { + log.info(String.format("deleteInstitutionOrAllUserInfo removing institution %s for userId %s", + userInstitution.getInstitutionId(), userInstitution.getUserId())); return UserInfo.persistOrUpdate(userInfo); } } @@ -79,20 +92,27 @@ private Uni updateOrCreateNewUserInfo(ReactivePanacheMongoEntityBase entit } private UserInfo replaceOrAddInstitution(UserInfo userInfo, UserInstitution userInstitution, PartyRole role, OnboardedProductState state) { - if (!CollectionUtils.isEmpty(userInfo.getInstitutions())) { - userInfo.getInstitutions().stream() - .filter(userInstitutionRole -> userInstitution.getInstitutionId().equalsIgnoreCase(userInstitutionRole.getInstitutionId())) - .findFirst() - .ifPresentOrElse(userInstitutionRole -> { - userInstitutionRole.setRole(role); - userInstitutionRole.setStatus(state); - }, - () -> { - List roleList = new ArrayList<>(userInfo.getInstitutions()); - roleList.add(userMapper.toUserInstitutionRole(userInstitution, role, state)); - userInfo.setInstitutions(roleList); - }); + if (CollectionUtils.isEmpty(userInfo.getInstitutions())) { + userInfo.setInstitutions(new ArrayList<>()); } + + userInfo.getInstitutions().stream() + .filter(userInstitutionRole -> userInstitution.getInstitutionId().equalsIgnoreCase(userInstitutionRole.getInstitutionId())) + .findFirst() + .ifPresentOrElse(userInstitutionRole -> { + userInstitutionRole.setRole(role); + userInstitutionRole.setStatus(state); + log.info(String.format("replaceOrAddInstitution execution setting role for userId: %s, institutionId: %s, role: %s", + userInstitution.getUserId(), userInstitution.getInstitutionId(), role.name())); + }, + () -> { + List roleList = new ArrayList<>(userInfo.getInstitutions()); + roleList.add(userMapper.toUserInstitutionRole(userInstitution, role, state)); + userInfo.setInstitutions(roleList); + log.info(String.format("replaceOrAddInstitution execution adding role for userId: %s, institutionId: %s, role: %s", + userInstitution.getUserId(), userInstitution.getInstitutionId(), role.name())); + }); + return userInfo; } diff --git a/apps/user-cdc/src/test/java/it/pagopa/selfcare/user/event/repository/UserInstitutionRepositoryTest.java b/apps/user-cdc/src/test/java/it/pagopa/selfcare/user/event/repository/UserInstitutionRepositoryTest.java index 39b470e6..8828da82 100644 --- a/apps/user-cdc/src/test/java/it/pagopa/selfcare/user/event/repository/UserInstitutionRepositoryTest.java +++ b/apps/user-cdc/src/test/java/it/pagopa/selfcare/user/event/repository/UserInstitutionRepositoryTest.java @@ -1,6 +1,9 @@ package it.pagopa.selfcare.user.event.repository; +import com.azure.data.tables.TableClient; import io.quarkus.panache.mock.PanacheMock; +import io.quarkus.test.InjectMock; +import io.quarkus.test.Mock; import io.quarkus.test.common.QuarkusTestResource; import io.quarkus.test.junit.QuarkusTest; import io.quarkus.test.mongodb.MongoTestResource; @@ -9,6 +12,7 @@ import io.smallrye.mutiny.Uni; import io.smallrye.mutiny.helpers.test.UniAssertSubscriber; import it.pagopa.selfcare.onboarding.common.PartyRole; +import it.pagopa.selfcare.user.event.UserInstitutionCdcService; import it.pagopa.selfcare.user.event.constant.OnboardedProductState; import it.pagopa.selfcare.user.event.entity.OnboardedProduct; import it.pagopa.selfcare.user.event.entity.UserInfo; @@ -18,6 +22,7 @@ import jakarta.inject.Inject; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import org.mockito.Mockito; import java.util.ArrayList; import java.util.Collections; @@ -31,6 +36,7 @@ @QuarkusTestResource(MongoTestResource.class) class UserInstitutionRepositoryTest { + public static final String INSTITUTION_ID = "institutionId"; @Inject UserInstitutionRepository userInstitutionRepository; @@ -73,9 +79,12 @@ void initOrderStreamWithFoundedUserIdAndInstitutionId(UniAsserter asserter){ PanacheMock.mock(UserInfo.class); when(UserInfo.findByIdOptional(any())) .thenReturn(Uni.createFrom().item(Optional.of(retrieveUserInfo()))); - UniAssertSubscriber subscriber = userInstitutionRepository.updateUser(userInstitution) - .subscribe().withSubscriber(UniAssertSubscriber.create()); - subscriber.assertCompleted(); + userInstitutionRepository.updateUser(userInstitution) + .subscribe().withSubscriber(UniAssertSubscriber.create()).assertCompleted(); + + asserter.execute(() -> { + PanacheMock.verify(UserInfo.class, Mockito.atLeastOnce()).persistOrUpdate(Mockito. any(), any()); + }); } @Test @@ -87,9 +96,32 @@ void initOrderStreamWithFoundedUserId(UniAsserter asserter){ when(UserInfo.findByIdOptional(any())) .thenReturn(Uni.createFrom().item(Optional.of(retrieveUserInfo()))); mockPersistUserInfo(asserter); - UniAssertSubscriber subscriber = userInstitutionRepository.updateUser(userInstitution) - .subscribe().withSubscriber(UniAssertSubscriber.create()); - subscriber.assertCompleted(); + userInstitutionRepository.updateUser(userInstitution) + .subscribe().withSubscriber(UniAssertSubscriber.create()).assertCompleted(); + + asserter.execute(() -> { + PanacheMock.verify(UserInfo.class, Mockito.atLeastOnce()).persistOrUpdate(Mockito. any(), any()); + }); + } + + @Test + @RunOnVertxContext + void initOrderStreamWithFoundedUserIdAndInstitutionEmpty(UniAsserter asserter){ + UserInstitution userInstitution = constructUserInstitution(); + userInstitution.setInstitutionId("institutionId2"); + PanacheMock.mock(UserInfo.class); + UserInfo userInfo = new UserInfo(); + userInfo.setInstitutions(List.of()); + when(UserInfo.findByIdOptional(any())) + .thenReturn(Uni.createFrom().item(Optional.of(retrieveUserInfo()))); + mockPersistUserInfo(asserter); + userInstitutionRepository.updateUser(userInstitution) + .subscribe().withSubscriber(UniAssertSubscriber.create()).assertCompleted(); + + asserter.execute(() -> { + PanacheMock.verify(UserInfo.class, Mockito.atLeastOnce()).persistOrUpdate(Mockito. any(), any()); + }); + } @Test @@ -99,9 +131,12 @@ void initOrderStreamWithNotFoundUserId(UniAsserter asserter){ PanacheMock.mock(UserInfo.class); when(UserInfo.findByIdOptional(anyString())) .thenReturn(Uni.createFrom().item(Optional.empty())); - UniAssertSubscriber subscriber = userInstitutionRepository.updateUser(userInstitution) - .subscribe().withSubscriber(UniAssertSubscriber.create()); - subscriber.assertCompleted(); + userInstitutionRepository.updateUser(userInstitution) + .subscribe().withSubscriber(UniAssertSubscriber.create()).assertCompleted(); + + asserter.execute(() -> { + PanacheMock.verify(UserInfo.class, Mockito.atLeastOnce()).persistOrUpdate(Mockito. any(), any()); + }); } @Test @@ -109,6 +144,7 @@ void initOrderStreamWithNotFoundUserId(UniAsserter asserter){ void initOrderStreamWithNotFoundValidState(UniAsserter asserter){ UserInstitution userInstitution = constructUserInstitution(); userInstitution.getProducts().get(0).setStatus(OnboardedProductState.SUSPENDED); + PanacheMock.mock(UserInfo.class); UserInfo userInfo = retrieveUserInfo(); List userInstitutionRoles = new ArrayList<>(userInfo.getInstitutions()); @@ -117,9 +153,14 @@ void initOrderStreamWithNotFoundValidState(UniAsserter asserter){ when(UserInfo.findByIdOptional(any())) .thenReturn(Uni.createFrom().item(Optional.of(userInfo))); mockPersistUserInfo(asserter); - UniAssertSubscriber subscriber = userInstitutionRepository.updateUser(userInstitution) - .subscribe().withSubscriber(UniAssertSubscriber.create()); - subscriber.assertCompleted(); } + + userInstitutionRepository.updateUser(userInstitution) + .subscribe().withSubscriber(UniAssertSubscriber.create()).assertCompleted(); + + asserter.execute(() -> { + PanacheMock.verify(UserInfo.class, Mockito.atLeastOnce()).deleteById(userInfo.getUserId()); + }); + } @Test @RunOnVertxContext @@ -131,9 +172,14 @@ void initOrderStreamWithNotFoundValidStateAndInstitutionId(UniAsserter asserter) when(UserInfo.findByIdOptional(any())) .thenReturn(Uni.createFrom().item(Optional.of(retrieveUserInfo()))); mockPersistUserInfo(asserter); - UniAssertSubscriber subscriber = userInstitutionRepository.updateUser(userInstitution) - .subscribe().withSubscriber(UniAssertSubscriber.create()); - subscriber.assertCompleted(); } + userInstitutionRepository.updateUser(userInstitution) + .subscribe().withSubscriber(UniAssertSubscriber.create()).assertCompleted(); + + asserter.execute(() -> { + PanacheMock.verify(UserInfo.class, Mockito.times(1)).findByIdOptional(userInstitution.getUserId()); + PanacheMock.verifyNoMoreInteractions(UserInfo.class); + }); + } @Test @@ -146,9 +192,15 @@ void initOrderStreamWithNotFoundValidStateButMultipleInstitutionInUserInfo(UniAs when(UserInfo.findByIdOptional(any())) .thenReturn(Uni.createFrom().item(Optional.of(retrieveUserInfo()))); mockPersistUserInfo(asserter); - UniAssertSubscriber subscriber = userInstitutionRepository.updateUser(userInstitution) - .subscribe().withSubscriber(UniAssertSubscriber.create()); - subscriber.assertCompleted(); } + + userInstitutionRepository.updateUser(userInstitution) + .subscribe().withSubscriber(UniAssertSubscriber.create()).assertCompleted(); + + asserter.execute(() -> { + PanacheMock.verify(UserInfo.class, Mockito.times(1)).findByIdOptional(userInstitution.getUserId()); + PanacheMock.verifyNoMoreInteractions(UserInfo.class); + }); + } @Test @@ -161,15 +213,22 @@ void initOrderStreamWithNotFoundValidStateButMoreInstitutions(UniAsserter assert when(UserInfo.findByIdOptional(any())) .thenReturn(Uni.createFrom().item(Optional.of(userInfo))); mockPersistUserInfo(asserter); - UniAssertSubscriber subscriber = userInstitutionRepository.updateUser(userInstitution) - .subscribe().withSubscriber(UniAssertSubscriber.create()); - subscriber.assertCompleted(); } + + userInstitutionRepository.updateUser(userInstitution) + .subscribe().withSubscriber(UniAssertSubscriber.create()).assertCompleted(); + + asserter.execute(() -> { + PanacheMock.verify(UserInfo.class, Mockito.times(1)).findByIdOptional(userInstitution.getUserId()); + PanacheMock.verify(UserInfo.class, Mockito.atLeastOnce()).persistOrUpdate(Mockito. any(), any()); + PanacheMock.verifyNoMoreInteractions(UserInfo.class); + }); + } private UserInfo retrieveUserInfo() { UserInfo userInfo = new UserInfo(); userInfo.setUserId("userId"); UserInstitutionRole role = new UserInstitutionRole(); - role.setInstitutionId("institutionId"); + role.setInstitutionId(INSTITUTION_ID); role.setInstitutionName("institutionName"); role.setRole(PartyRole.DELEGATE); UserInstitutionRole role1 = new UserInstitutionRole(); @@ -187,7 +246,7 @@ private UserInfo retrieveUserInfo() { private UserInstitution constructUserInstitution() { UserInstitution userInstitution = new UserInstitution(); userInstitution.setUserId("userId"); - userInstitution.setInstitutionId("institutionId"); + userInstitution.setInstitutionId(INSTITUTION_ID); OnboardedProduct onboardedProduct = new OnboardedProduct(); onboardedProduct.setRole(PartyRole.MANAGER); onboardedProduct.setProductId("productId");