Skip to content

Commit

Permalink
[SELC-5887] Feat: Added send event for FD in user CDC (#206)
Browse files Browse the repository at this point in the history
  • Loading branch information
flaminiaScarciofolo authored Oct 31, 2024
1 parent deb015e commit 612d21f
Show file tree
Hide file tree
Showing 22 changed files with 845 additions and 112 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,15 @@
import io.quarkus.runtime.Startup;
import io.quarkus.runtime.configuration.ConfigUtils;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import it.pagopa.selfcare.user.UserUtils;
import it.pagopa.selfcare.user.client.EventHubFdRestClient;
import it.pagopa.selfcare.user.client.EventHubRestClient;
import it.pagopa.selfcare.user.event.entity.UserInstitution;
import it.pagopa.selfcare.user.event.mapper.NotificationMapper;
import it.pagopa.selfcare.user.event.repository.UserInstitutionRepository;
import it.pagopa.selfcare.user.model.NotificationUserType;
import it.pagopa.selfcare.user.model.OnboardedProduct;
import it.pagopa.selfcare.user.model.TrackEventInput;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
Expand All @@ -42,6 +46,7 @@
import static it.pagopa.selfcare.user.model.TrackEventInput.toTrackEventInput;
import static it.pagopa.selfcare.user.model.constants.EventsMetric.*;
import static it.pagopa.selfcare.user.model.constants.EventsName.EVENT_USER_CDC_NAME;
import static it.pagopa.selfcare.user.model.constants.EventsName.FD_EVENT_USER_CDC_NAME;
import static java.util.Arrays.asList;

@Startup
Expand All @@ -52,6 +57,10 @@ public class UserInstitutionCdcService {
private static final String COLLECTION_NAME = "userInstitutions";
private static final String OPERATION_NAME = "USER-CDC-UserInfoUpdate";
public static final String USERS_FIELD_LIST_WITHOUT_FISCAL_CODE = "name,familyName,email,workContacts";
private static final String PROD_FD = "prod-fd";
private static final String PROD_FD_GARANTITO = "prod-fd-garantito";
public static final String ERROR_DURING_SUBSCRIBE_COLLECTION_EXCEPTION_MESSAGE = "Error during subscribe collection, exception: {} , message: {}";


private final TelemetryClient telemetryClient;

Expand All @@ -72,6 +81,10 @@ public class UserInstitutionCdcService {
@Inject
EventHubRestClient eventHubRestClient;

@RestClient
@Inject
EventHubFdRestClient eventHubFdRestClient;

private final NotificationMapper notificationMapper;


Expand All @@ -81,6 +94,7 @@ public UserInstitutionCdcService(ReactiveMongoClient mongoClient,
@ConfigProperty(name = "user-cdc.retry.max-backoff") Integer retryMaxBackOff,
@ConfigProperty(name = "user-cdc.retry") Integer maxRetry,
@ConfigProperty(name = "user-cdc.send-events.watch.enabled") Boolean sendEventsEnabled,
@ConfigProperty(name = "user-cdc.send-events-fd.watch.enabled") Boolean sendFdEventsEnabled,
UserInstitutionRepository userInstitutionRepository,
TelemetryClient telemetryClient,
TableClient tableClient, NotificationMapper notificationMapper) {
Expand All @@ -94,16 +108,16 @@ public UserInstitutionCdcService(ReactiveMongoClient mongoClient,
this.tableClient = tableClient;
this.notificationMapper = notificationMapper;
telemetryClient.getContext().getOperation().setName(OPERATION_NAME);
initOrderStream(sendEventsEnabled);
initOrderStream(sendEventsEnabled, sendFdEventsEnabled);
}

private void initOrderStream(Boolean sendEventsEnabled) {
private void initOrderStream(Boolean sendEventsEnabled, Boolean sendFdEventsEnabled) {
log.info("Starting initOrderStream ... ");

//Retrieve last resumeToken for watching collection at specific operation
String resumeToken = null;

if(!ConfigUtils.getProfiles().contains("test")) {
if (!ConfigUtils.getProfiles().contains("test")) {
try {
TableEntity cdcStartAtEntity = tableClient.getEntity(CDC_START_AT_PARTITION_KEY, CDC_START_AT_ROW_KEY);
if (Objects.nonNull(cdcStartAtEntity))
Expand All @@ -117,7 +131,7 @@ private void initOrderStream(Boolean sendEventsEnabled) {
ReactiveMongoCollection<UserInstitution> dataCollection = getCollection();
ChangeStreamOptions options = new ChangeStreamOptions()
.fullDocument(FullDocument.UPDATE_LOOKUP);
if(Objects.nonNull(resumeToken))
if (Objects.nonNull(resumeToken))
options = options.resumeAfter(BsonDocument.parse(resumeToken));

Bson match = Aggregates.match(Filters.in("operationType", asList("update", "replace", "insert")));
Expand All @@ -128,21 +142,32 @@ private void initOrderStream(Boolean sendEventsEnabled) {
publisher.subscribe().with(
this::consumerUserInstitutionRepositoryEvent,
failure -> {
log.error("Error during subscribe collection, exception: {} , message: {}", failure.toString(), failure.getMessage());
telemetryClient.trackEvent(EVENT_USER_CDC_NAME, mapPropsForTrackEvent(TrackEventInput.builder().exception(failure.getClass().toString()).build()), Map.of(USER_INFO_UPDATE_FAILURE, 1D));
log.error(ERROR_DURING_SUBSCRIBE_COLLECTION_EXCEPTION_MESSAGE, failure.toString(), failure.getMessage());
telemetryClient.trackEvent(EVENT_USER_CDC_NAME, mapPropsForTrackEvent(TrackEventInput.builder().exception(failure.getClass().toString()).build()), Map.of(USER_INFO_UPDATE_FAILURE, 1D));
Quarkus.asyncExit();
});

if(sendEventsEnabled) {
if (Boolean.TRUE.equals(sendEventsEnabled)) {
publisher.subscribe().with(
this::consumerToSendScUserEvent,
failure -> {
log.error("Error during subscribe collection, exception: {} , message: {}", failure.toString(), failure.getMessage());
log.error(ERROR_DURING_SUBSCRIBE_COLLECTION_EXCEPTION_MESSAGE, failure.toString(), failure.getMessage());
telemetryClient.trackEvent(EVENT_USER_CDC_NAME, mapPropsForTrackEvent(TrackEventInput.builder().exception(failure.getClass().toString()).build()), Map.of(EVENTS_USER_INSTITUTION_FAILURE, 1D));
Quarkus.asyncExit();
});
}

if (Boolean.TRUE.equals(sendFdEventsEnabled)) {
publisher.subscribe().with(
this::consumerToSendUserEventForFD,
failure -> {
log.error(ERROR_DURING_SUBSCRIBE_COLLECTION_EXCEPTION_MESSAGE, failure.toString(), failure.getMessage());
telemetryClient.trackEvent(EVENT_USER_CDC_NAME, mapPropsForTrackEvent(TrackEventInput.builder().exception(failure.getClass().toString()).build()), Map.of(EVENTS_USER_INSTITUTION_FAILURE, 1D));
Quarkus.asyncExit();
});
}


log.info("Completed initOrderStream ... ");
}

Expand Down Expand Up @@ -196,13 +221,13 @@ public void consumerToSendScUserEvent(ChangeStreamDocument<UserInstitution> docu

userRegistryApi.findByIdUsingGET(USERS_FIELD_LIST_WITHOUT_FISCAL_CODE, userInstitutionChanged.getUserId())
.onFailure(this::checkIfIsRetryableException)
.retry().withBackOff(Duration.ofSeconds(retryMinBackOff), Duration.ofSeconds(retryMaxBackOff)).atMost(maxRetry)
.retry().withBackOff(Duration.ofSeconds(retryMinBackOff), Duration.ofSeconds(retryMaxBackOff)).atMost(maxRetry)
.onItem().transformToUni(userResource -> Multi.createFrom().iterable(UserUtils.groupingProductAndReturnMinStateProduct(userInstitutionChanged.getProducts()))
.map(onboardedProduct -> notificationMapper.toUserNotificationToSend(userInstitutionChanged, onboardedProduct, userResource))
.onItem().transformToUniAndMerge(userNotificationToSend -> eventHubRestClient.sendMessage(userNotificationToSend)
.onFailure().retry().withBackOff(Duration.ofSeconds(retryMinBackOff), Duration.ofSeconds(retryMaxBackOff)).atMost(maxRetry)
.onItem().invoke(() -> telemetryClient.trackEvent(EVENT_USER_CDC_NAME, mapPropsForTrackEvent(toTrackEventInput(userNotificationToSend)), Map.of(EVENTS_USER_INSTITUTION_PRODUCT_SUCCESS, 1D)))
.onFailure().invoke(() -> telemetryClient.trackEvent(EVENT_USER_CDC_NAME, mapPropsForTrackEvent(toTrackEventInput(userNotificationToSend)), Map.of(EVENTS_USER_INSTITUTION_PRODUCT_FAILURE, 1D))))
.onFailure().retry().withBackOff(Duration.ofSeconds(retryMinBackOff), Duration.ofSeconds(retryMaxBackOff)).atMost(maxRetry)
.onItem().invoke(() -> telemetryClient.trackEvent(EVENT_USER_CDC_NAME, mapPropsForTrackEvent(toTrackEventInput(userNotificationToSend)), Map.of(EVENTS_USER_INSTITUTION_PRODUCT_SUCCESS, 1D)))
.onFailure().invoke(() -> telemetryClient.trackEvent(EVENT_USER_CDC_NAME, mapPropsForTrackEvent(toTrackEventInput(userNotificationToSend)), Map.of(EVENTS_USER_INSTITUTION_PRODUCT_FAILURE, 1D))))
.toUni()
)
.subscribe().with(
Expand All @@ -216,6 +241,47 @@ public void consumerToSendScUserEvent(ChangeStreamDocument<UserInstitution> docu
});
}

public void consumerToSendUserEventForFD(ChangeStreamDocument<UserInstitution> document) {

if (Objects.nonNull(document.getFullDocument()) && Objects.nonNull(document.getDocumentKey())) {
UserInstitution userInstitutionChanged = document.getFullDocument();

log.info("Starting consumerToSendUserEventForFd ... ");

userRegistryApi.findByIdUsingGET(USERS_FIELD_LIST_WITHOUT_FISCAL_CODE, userInstitutionChanged.getUserId())
.onFailure(this::checkIfIsRetryableException)
.retry().withBackOff(Duration.ofSeconds(retryMinBackOff), Duration.ofSeconds(retryMaxBackOff)).atMost(maxRetry)
.onItem().transformToUni(userResource -> Uni.createFrom().item(UserUtils.retrieveFdProductIfItChanged(userInstitutionChanged.getProducts(), List.of(PROD_FD, PROD_FD_GARANTITO)))
.onItem().ifNotNull().transform(onboardedProduct -> notificationMapper.toFdUserNotificationToSend(userInstitutionChanged, onboardedProduct, userResource, evaluateType(onboardedProduct)))
.onItem().ifNotNull().transformToUni(fdUserNotificationToSend -> {
log.info("Sending message to EventHubFdRestClient ... ");
return eventHubFdRestClient.sendMessage(fdUserNotificationToSend)
.onFailure().retry().withBackOff(Duration.ofSeconds(retryMinBackOff), Duration.ofSeconds(retryMaxBackOff)).atMost(maxRetry)
.onItem().invoke(() -> telemetryClient.trackEvent(EVENT_USER_CDC_NAME, mapPropsForTrackEvent(toTrackEventInput(fdUserNotificationToSend)), Map.of(FD_EVENTS_USER_INSTITUTION_PRODUCT_SUCCESS, 1D)))
.onFailure().invoke(() -> telemetryClient.trackEvent(EVENT_USER_CDC_NAME, mapPropsForTrackEvent(toTrackEventInput(fdUserNotificationToSend)), Map.of(FD_EVENTS_USER_INSTITUTION_PRODUCT_FAILURE, 1D)));
}
))
.subscribe().with(
result -> {
log.info("SendFdEvents successfully performed from UserInstitution document having id: {}", document.getDocumentKey().toJson());
telemetryClient.trackEvent(FD_EVENT_USER_CDC_NAME, mapPropsForTrackEvent(toTrackEventInputByUserInstitution(userInstitutionChanged)), Map.of(FD_EVENTS_USER_INSTITUTION_SUCCESS, 1D));
},
failure -> {
log.error("Error during SendFdEvents from UserInstitution document having id: {} , message: {}", document.getDocumentKey().toJson(), failure.getMessage());
telemetryClient.trackEvent(FD_EVENT_USER_CDC_NAME, mapPropsForTrackEvent(toTrackEventInputByUserInstitution(userInstitutionChanged)), Map.of(FD_EVENTS_USER_INSTITUTION_FAILURE, 1D));
});
}
}

private NotificationUserType evaluateType(OnboardedProduct onboardedProduct) {
return switch (onboardedProduct.getStatus()) {
case ACTIVE -> NotificationUserType.ACTIVE_USER;
case SUSPENDED -> NotificationUserType.SUSPEND_USER;
case DELETED -> NotificationUserType.DELETE_USER;
default -> null;
};
}

private boolean checkIfIsRetryableException(Throwable throwable) {
return throwable instanceof TimeoutException ||
(throwable instanceof ClientWebApplicationException webApplicationException && webApplicationException.getResponse().getStatus() == 429);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
package it.pagopa.selfcare.user.event.mapper;

import com.microsoft.applicationinsights.web.dependencies.apachecommons.lang3.StringUtils;
import it.pagopa.selfcare.user.UserUtils;
import it.pagopa.selfcare.user.event.entity.UserInstitution;
import it.pagopa.selfcare.user.model.OnboardedProduct;
import it.pagopa.selfcare.user.model.UserNotificationToSend;
import it.pagopa.selfcare.user.model.*;
import org.mapstruct.Mapper;
import org.mapstruct.Mapping;
import org.mapstruct.Named;
import org.openapi.quarkus.user_registry_json.model.CertifiableFieldResourceOfstring;
import org.openapi.quarkus.user_registry_json.model.UserResource;

import java.util.UUID;
import javax.swing.text.html.Option;
import java.util.*;

@Mapper(componentModel = "cdi", imports = UUID.class)
public interface NotificationMapper {
Expand All @@ -18,13 +20,7 @@ public interface NotificationMapper {
@Mapping(target = "productId", source = "product.productId")
@Mapping(target = "createdAt", source = "product.createdAt")
@Mapping(target = "updatedAt", expression = "java((null == product.getUpdatedAt()) ? product.getCreatedAt() : product.getUpdatedAt())")
@Mapping(target = "user.role", source = "product.role")
@Mapping(target = "user.productRole", source = "product.productRole")
@Mapping(target = "user.relationshipStatus", source = "product.status")
@Mapping(target = "user.userId", source = "userResource.id", ignore = true)
@Mapping(target = "user.name", source = "userResource.name.value")
@Mapping(target = "user.familyName", source = "userResource.familyName.value")
@Mapping(target = "user.email", source = "userResource.email.value")
@Mapping(target = "user", expression = "java(mapUser(userResource, userInstitution.getUserMailUuid(), product))")
@Mapping(target = "id", expression = "java(toUniqueIdNotification(userInstitution, product))")
@Mapping(target = "eventType", expression = "java(it.pagopa.selfcare.user.model.constants.QueueEvent.UPDATE)")
UserNotificationToSend toUserNotificationToSend(UserInstitution userInstitution, OnboardedProduct product, UserResource userResource);
Expand All @@ -33,4 +29,43 @@ public interface NotificationMapper {
default String toUniqueIdNotification(UserInstitution userInstitution, OnboardedProduct product) {
return UserUtils.uniqueIdNotification(userInstitution.getId().toHexString(), product.getProductId(), product.getProductRole());
}

@Mapping(target = "id", expression = "java(toUniqueIdNotification(userInstitutionChanged, product))")
@Mapping(target = "onboardingTokenId", source = "product.tokenId")
@Mapping(target = "product", source = "product.productId")
@Mapping(target = "createdAt", source = "product.createdAt")
@Mapping(target = "updatedAt", expression = "java((null == product.getUpdatedAt()) ? product.getCreatedAt() : product.getUpdatedAt())")
@Mapping(target = "user", expression = "java(mapUserForFD(userResource, product))")
@Mapping(target = "type", source = "type")
FdUserNotificationToSend toFdUserNotificationToSend(UserInstitution userInstitutionChanged, OnboardedProduct product, UserResource userResource, NotificationUserType type);

@Named("mapUserForFD")
default UserToNotify mapUserForFD(UserResource userResource,OnboardedProduct onboardedProduct) {
UserToNotify userToNotify = new UserToNotify();
userToNotify.setUserId(Optional.ofNullable(userResource.getId()).map(UUID::toString).orElse(null));
userToNotify.setRoles(StringUtils.isNotBlank(onboardedProduct.getProductRole()) ? List.of(onboardedProduct.getProductRole()) : Collections.emptyList());
userToNotify.setRole(Optional.ofNullable(onboardedProduct.getRole()).map(Enum::name).orElse(null));
return userToNotify;
}

@Named("mapUser")
default UserToNotify mapUser(UserResource userResource, String userMailUuid, OnboardedProduct onboardedProduct) {
UserToNotify userToNotify = new UserToNotify();
userToNotify.setUserId(Optional.ofNullable(userResource.getId()).map(UUID::toString).orElse(null));
userToNotify.setName(Optional.ofNullable(userResource.getName()).map(CertifiableFieldResourceOfstring::getValue).orElse(null));
userToNotify.setFamilyName(Optional.ofNullable(userResource.getFamilyName()).map(CertifiableFieldResourceOfstring::getValue).orElse(null));
userToNotify.setEmail(Optional.ofNullable(userMailUuid).map(mailUuid -> retrieveMailFromWorkContacts(userResource, mailUuid)).orElse(null));
userToNotify.setProductRole(onboardedProduct.getProductRole());
userToNotify.setRole(Optional.ofNullable(onboardedProduct.getRole()).map(Enum::name).orElse(null));
userToNotify.setRelationshipStatus(onboardedProduct.getStatus());
return userToNotify;
}

default String retrieveMailFromWorkContacts(UserResource userResource, String userMailUuid) {
return Optional.ofNullable(userResource.getWorkContacts())
.flatMap(stringWorkContactResourceMap -> Optional.ofNullable(stringWorkContactResourceMap.get(userMailUuid))
.flatMap(workContactResource -> Optional.ofNullable(workContactResource.getEmail())
.map(CertifiableFieldResourceOfstring::getValue)))
.orElse(null);
}
}
11 changes: 8 additions & 3 deletions apps/user-cdc/src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ quarkus.mongodb.connection-string = ${MONGODB-CONNECTION-STRING}
quarkus.mongodb.database = selcUser

#False for pnpg use case because we must not send events
user-cdc.send-events.watch.enabled=${USER_CDC_SEND_EVENTS_WATCH_ENABLED:false}
user-cdc.send-events.watch.enabled=${USER_CDC_SEND_EVENTS_WATCH_ENABLED:true}
user-cdc.send-events-fd.watch.enabled=${USER_CDC_SEND_EVENTS_FD_WATCH_ENABLED:true}
user-cdc.appinsights.connection-string=${APPLICATIONINSIGHTS_CONNECTION_STRING:InstrumentationKey=00000000-0000-0000-0000-000000000000}
user-cdc.table.name=${START_AT_TABLE_NAME:CdCStartAt}
user-cdc.storage.connection-string=${STORAGE_CONNECTION_STRING:UseDevelopmentStorage=true;}
Expand All @@ -23,6 +24,10 @@ quarkus.openapi-generator.codegen.spec.user_registry_json.additional-model-type-
quarkus.openapi-generator.user_registry_json.auth.api_key.api-key = ${USER-REGISTRY-API-KEY:example-api-key}
quarkus.rest-client."org.openapi.quarkus.user_registry_json.api.UserApi".url=${USER_REGISTRY_URL:http://localhost:8080}

quarkus.rest-client.event-hub.url=${EVENT_HUB_BASE_PATH:test}
quarkus.rest-client.event-hub.url=${EVENT_HUB_BASE_PATH:test}${EVENT_HUB_SC_USERS_TOPIC:sc-users}
eventhub.rest-client.keyName=${SHARED_ACCESS_KEY_NAME:test}
eventhub.rest-client.key=${EVENTHUB-SC-USERS-SELFCARE-WO-KEY-LC:test}
eventhub.rest-client.key=${EVENTHUB-SC-USERS-SELFCARE-WO-KEY-LC:test}

quarkus.rest-client.event-hub-fd.url=${EVENT_HUB_BASE_PATH:test}${EVENT_HUB_SELFCARE_FD_TOPIC:selfcare-fd}
eventhubfd.rest-client.keyName=${FD_SHARED_ACCESS_KEY_NAME:test}
eventhubfd.rest-client.key=${EVENTHUB_SELFCARE_FD_EXTERNAL_KEY_LC:test}
Loading

0 comments on commit 612d21f

Please sign in to comment.