Skip to content

Commit

Permalink
[SELC-5981] changed user-cdc business logic to eveluate if mail is ch…
Browse files Browse the repository at this point in the history
…anged
  • Loading branch information
flaminiaScarciofolo committed Nov 13, 2024
1 parent e84cfae commit edfc419
Show file tree
Hide file tree
Showing 6 changed files with 227 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,17 @@
import io.quarkus.runtime.Startup;
import io.quarkus.runtime.configuration.ConfigUtils;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import it.pagopa.selfcare.user.UserUtils;
import it.pagopa.selfcare.user.client.EventHubFdRestClient;
import it.pagopa.selfcare.user.client.EventHubRestClient;
import it.pagopa.selfcare.user.event.entity.UserInstitution;
import it.pagopa.selfcare.user.event.mapper.NotificationMapper;
import it.pagopa.selfcare.user.event.repository.UserInstitutionRepository;
import it.pagopa.selfcare.user.model.FdUserNotificationToSend;
import it.pagopa.selfcare.user.model.NotificationUserType;
import it.pagopa.selfcare.user.model.OnboardedProduct;
import it.pagopa.selfcare.user.model.TrackEventInput;
import it.pagopa.selfcare.user.model.constants.OnboardedProductState;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -36,6 +37,7 @@
import org.openapi.quarkus.user_registry_json.api.UserApi;

import java.time.Duration;
import java.time.OffsetDateTime;
import java.util.*;
import java.util.concurrent.TimeoutException;

Expand All @@ -50,6 +52,8 @@
import static it.pagopa.selfcare.user.model.constants.EventsName.EVENT_USER_CDC_NAME;
import static it.pagopa.selfcare.user.model.constants.EventsName.FD_EVENT_USER_CDC_NAME;
import static java.util.Arrays.asList;
import static java.util.Comparator.naturalOrder;
import static java.util.Comparator.nullsLast;

@Startup
@Slf4j
Expand All @@ -72,6 +76,8 @@ public class UserInstitutionCdcService {
private final Integer retryMinBackOff;
private final Integer retryMaxBackOff;
private final Integer maxRetry;
private final boolean sendEventsEnabled;
private final boolean sendFdEventsEnabled;


@RestClient
Expand Down Expand Up @@ -107,11 +113,13 @@ public UserInstitutionCdcService(ReactiveMongoClient mongoClient,
this.telemetryClient = telemetryClient;
this.tableClient = tableClient;
this.notificationMapper = notificationMapper;
this.sendEventsEnabled = sendEventsEnabled;
this.sendFdEventsEnabled = sendFdEventsEnabled;
telemetryClient.getContext().getOperation().setName(OPERATION_NAME);
initOrderStream(sendEventsEnabled, sendFdEventsEnabled);
initOrderStream();
}

private void initOrderStream(Boolean sendEventsEnabled, Boolean sendFdEventsEnabled) {
private void initOrderStream() {
log.info("Starting initOrderStream ... ");

//Retrieve last resumeToken for watching collection at specific operation
Expand Down Expand Up @@ -140,12 +148,42 @@ private void initOrderStream(Boolean sendEventsEnabled, Boolean sendFdEventsEnab

Multi<ChangeStreamDocument<UserInstitution>> publisher = dataCollection.watch(pipeline, UserInstitution.class, options);
publisher.subscribe().with(
this::consumerUserInstitutionRepositoryEvent,
document -> propagateDocumentToConsumers(document, publisher),
failure -> {
log.error(ERROR_DURING_SUBSCRIBE_COLLECTION_EXCEPTION_MESSAGE, failure.toString(), failure.getMessage());
telemetryClient.trackEvent(EVENT_USER_CDC_NAME, mapPropsForTrackEvent(TrackEventInput.builder().exception(failure.getClass().toString()).build()), Map.of(USER_INFO_UPDATE_FAILURE, 1D));
Quarkus.asyncExit();
});
log.info("Completed initOrderStream ... ");
}

/**
*
This method acts as a gateway to direct the modified entity to the correct consumers based on the following logic:
if the modification concerns the user's email, it will invoke the consumer to send events to both the sc-user queue and the selfcare-fd queue,
but the latter only if the entity contains at least one active product from prod-fd or prod-fd-garantito. If, however, the modification concerns one
of the products within the entity, all three consumers will be invoked (two consumers for the events and the consumer for updating the userinfo collection).
*/
public void propagateDocumentToConsumers(ChangeStreamDocument<UserInstitution> document, Multi<ChangeStreamDocument<UserInstitution>> publisher) {
assert document.getFullDocument() != null;
assert document.getDocumentKey() != null;
UserInstitution userInstitutionChanged = document.getFullDocument();

boolean hasActiveFdProduct = userInstitutionChanged.getProducts().stream()
.anyMatch(product -> (PROD_FD.getValue().equals(product.getProductId()) || PROD_FD_GARANTITO.getValue().equals(product.getProductId()))
&& OnboardedProductState.ACTIVE.equals(product.getStatus()));

boolean userMailIsChanged = isUserMailChanged(userInstitutionChanged);

if (Boolean.FALSE.equals(userMailIsChanged)) {
publisher.subscribe().with(
this::consumerUserInstitutionRepositoryEvent,
failure -> {
log.error(ERROR_DURING_SUBSCRIBE_COLLECTION_EXCEPTION_MESSAGE, failure.toString(), failure.getMessage());
telemetryClient.trackEvent(EVENT_USER_CDC_NAME, mapPropsForTrackEvent(TrackEventInput.builder().exception(failure.getClass().toString()).build()), Map.of(USER_INFO_UPDATE_FAILURE, 1D));
Quarkus.asyncExit();
});
}

if (Boolean.TRUE.equals(sendEventsEnabled)) {
publisher.subscribe().with(
Expand All @@ -157,18 +195,27 @@ private void initOrderStream(Boolean sendEventsEnabled, Boolean sendFdEventsEnab
});
}

if (Boolean.TRUE.equals(sendFdEventsEnabled)) {
if (Boolean.TRUE.equals(sendFdEventsEnabled) && hasActiveFdProduct) {
publisher.subscribe().with(
this::consumerToSendUserEventForFD,
subscription -> consumerToSendUserEventForFD(document, userMailIsChanged),
failure -> {
log.error(ERROR_DURING_SUBSCRIBE_COLLECTION_EXCEPTION_MESSAGE, failure.toString(), failure.getMessage());
telemetryClient.trackEvent(EVENT_USER_CDC_NAME, mapPropsForTrackEvent(TrackEventInput.builder().exception(failure.getClass().toString()).build()), Map.of(EVENTS_USER_INSTITUTION_FAILURE, 1D));
Quarkus.asyncExit();
});
}
}


log.info("Completed initOrderStream ... ");
private boolean isUserMailChanged(UserInstitution userInstitutionChanged) {
OffsetDateTime maxProductUpdateAt = null;
if (Objects.nonNull(userInstitutionChanged.getProducts()) && !userInstitutionChanged.getProducts().isEmpty()) {
maxProductUpdateAt = userInstitutionChanged.getProducts().stream()
.max(Comparator.comparing(OnboardedProduct::getUpdatedAt, nullsLast(naturalOrder())))
.map(OnboardedProduct::getUpdatedAt)
.orElse(null);
}
OffsetDateTime maxUserMailUpdateAt = userInstitutionChanged.getUserMailUpdatedAt();
return Objects.nonNull(maxProductUpdateAt) && Objects.nonNull(maxUserMailUpdateAt) && maxUserMailUpdateAt.isAfter(maxProductUpdateAt);
}

private ReactiveMongoCollection<UserInstitution> getCollection() {
Expand Down Expand Up @@ -241,7 +288,15 @@ public void consumerToSendScUserEvent(ChangeStreamDocument<UserInstitution> docu
});
}

public void consumerToSendUserEventForFD(ChangeStreamDocument<UserInstitution> document) {

/**
*
This method handles the sending of events for the selfcare fd topic.
In case the modification concerns the user's email, it sends two events: the first of type DELETE and the second of type ACTIVE.
On the other hand, if the modification relates to the activation, suspension, reactivation, or cancellation of the prod-fd or prod-fd-garantito product,
it will send a single event indicating the type of action performed.
*/
public void consumerToSendUserEventForFD(ChangeStreamDocument<UserInstitution> document, boolean isUserMailChanged) {

if (Objects.nonNull(document.getFullDocument()) && Objects.nonNull(document.getDocumentKey())) {
UserInstitution userInstitutionChanged = document.getFullDocument();
Expand All @@ -251,10 +306,21 @@ public void consumerToSendUserEventForFD(ChangeStreamDocument<UserInstitution> d
userRegistryApi.findByIdUsingGET(USERS_FIELD_LIST_WITHOUT_FISCAL_CODE, userInstitutionChanged.getUserId())
.onFailure(this::checkIfIsRetryableException)
.retry().withBackOff(Duration.ofSeconds(retryMinBackOff), Duration.ofSeconds(retryMaxBackOff)).atMost(maxRetry)
.onItem().transformToUni(userResource -> Uni.createFrom().item(UserUtils.retrieveFdProductIfItChanged(userInstitutionChanged.getProducts(), List.of(PROD_FD.getValue(), PROD_FD_GARANTITO.getValue())))
.onItem().ifNotNull().transform(onboardedProduct -> notificationMapper.toFdUserNotificationToSend(userInstitutionChanged, onboardedProduct, userResource, evaluateType(onboardedProduct)))
.onItem().ifNotNull().transformToUni(fdUserNotificationToSend -> {
.onItem().transformToMulti(userResource -> Multi.createFrom().iterable(UserUtils.retrieveFdProduct(userInstitutionChanged.getProducts(), List.of(PROD_FD.getValue(), PROD_FD_GARANTITO.getValue()), isUserMailChanged))
.onItem().transformToUniAndMerge(onboardedProduct -> {
FdUserNotificationToSend fdUserNotificationToSend = notificationMapper.toFdUserNotificationToSend(userInstitutionChanged, onboardedProduct, userResource, evaluateType(onboardedProduct));
log.info("Sending message to EventHubFdRestClient ... ");
if (isUserMailChanged && NotificationUserType.ACTIVE_USER.equals(fdUserNotificationToSend.getType())) {
log.info("User mail is changed, sending DELETE_USER event first ... ");
FdUserNotificationToSend fdUserNotificationToSendDelete = notificationMapper.toFdUserNotificationToSend(userInstitutionChanged, onboardedProduct, userResource, NotificationUserType.DELETE_USER);
return eventHubFdRestClient.sendMessage(fdUserNotificationToSendDelete)
.onFailure().retry().withBackOff(Duration.ofSeconds(retryMinBackOff), Duration.ofSeconds(retryMaxBackOff)).atMost(maxRetry)
.onItem().invoke(() -> telemetryClient.trackEvent(EVENT_USER_CDC_NAME, mapPropsForTrackEvent(toTrackEventInput(fdUserNotificationToSendDelete)), Map.of(FD_EVENTS_USER_INSTITUTION_PRODUCT_SUCCESS, 1D)))
.onFailure().invoke(() -> telemetryClient.trackEvent(EVENT_USER_CDC_NAME, mapPropsForTrackEvent(toTrackEventInput(fdUserNotificationToSendDelete)), Map.of(FD_EVENTS_USER_INSTITUTION_PRODUCT_FAILURE, 1D)))
.onItem().transformToUni(unused -> eventHubFdRestClient.sendMessage(fdUserNotificationToSend))
.onItem().invoke(() -> telemetryClient.trackEvent(EVENT_USER_CDC_NAME, mapPropsForTrackEvent(toTrackEventInput(fdUserNotificationToSend)), Map.of(FD_EVENTS_USER_INSTITUTION_PRODUCT_SUCCESS, 1D)))
.onFailure().invoke(() -> telemetryClient.trackEvent(EVENT_USER_CDC_NAME, mapPropsForTrackEvent(toTrackEventInput(fdUserNotificationToSend)), Map.of(FD_EVENTS_USER_INSTITUTION_PRODUCT_FAILURE, 1D)));
}
return eventHubFdRestClient.sendMessage(fdUserNotificationToSend)
.onFailure().retry().withBackOff(Duration.ofSeconds(retryMinBackOff), Duration.ofSeconds(retryMaxBackOff)).atMost(maxRetry)
.onItem().invoke(() -> telemetryClient.trackEvent(EVENT_USER_CDC_NAME, mapPropsForTrackEvent(toTrackEventInput(fdUserNotificationToSend)), Map.of(FD_EVENTS_USER_INSTITUTION_PRODUCT_SUCCESS, 1D)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import lombok.experimental.FieldNameConstants;
import org.bson.types.ObjectId;

import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.List;

Expand All @@ -24,5 +25,6 @@ public class UserInstitution extends ReactivePanacheMongoEntity {
private String institutionRootName;
private List<OnboardedProduct> products = new ArrayList<>();
private String userMailUuid;
private OffsetDateTime userMailUpdatedAt;

}
Loading

0 comments on commit edfc419

Please sign in to comment.