Skip to content

Commit

Permalink
PN-10456: restored hotfix/PN-10525
Browse files Browse the repository at this point in the history
  • Loading branch information
mottone-dgs committed Apr 4, 2024
1 parent 82d6d66 commit d8e0762
Show file tree
Hide file tree
Showing 15 changed files with 262 additions and 136 deletions.
13 changes: 6 additions & 7 deletions src/main/java/it/pagopa/pn/ec/commons/utils/LogUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -97,12 +97,8 @@ private LogUtils() {
public static final String PEC_GET_ATTACHMENTS = "PecService.getAttachments()";
public static final String PEC_DOWNLOAD_ATTACHMENT = "PecService.downloadAttachment()";
public static final String PEC_SEND_MAIL = "PecService.sendMail()";
public static final String PN_PEC_SEND_MAIL = "PnPecService.sendMail()";
public static final String PEC_SEND_MESSAGE = "PnPecService.sendMessage()";
public static final String PEC_GET_UNREAD_MESSAGES = "PnPecService.getUnreadMessages()";
public static final String PEC_SEND_MESSAGE = "PecService.sendMessage()";
public static final String PEC_SET_MESSAGE_ID_IN_REQUEST_METADATA = "PecService.setMessageIdInRequestMetadata()";
public static final String PEC_MARK_MESSAGE_AS_READ = "PnPecService.markMessageAsRead()";


//PAPER
public static final String SEND_PAPER_ENGAGE_REQUEST = "sendPaperEngageRequest";
Expand Down Expand Up @@ -222,8 +218,11 @@ private LogUtils() {

//PN-PEC
public static final String PN_PEC = "pn-pec";
public static final String PEC_GET_MESSAGE_COUNT = "PnPecService.getMessagesCount()";
public static final String PEC_DELETE_MESSAGE = "PnPecService.deleteMessage()";
public static final String PN_EC_PEC_SEND_MAIL = "PnEcPecService.sendMail()";
public static final String PN_EC_PEC_GET_UNREAD_MESSAGES = "PnEcPecService.getUnreadMessages()";
public static final String PN_EC_PEC_GET_MESSAGE_COUNT = "PnEcPecService.getMessagesCount()";
public static final String PN_EC_PEC_MARK_MESSAGE_AS_READ = "PnEcPecService.markMessageAsRead()";
public static final String PN_EC_PEC_DELETE_MESSAGE = "PnEcPecService.deleteMessage()";
public static final String NOT_VALID_FOR_DELETE = "Event with requestId '{}' is not valid for delete.";

}
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,10 @@
import it.pagopa.pn.ec.pec.exception.MaxSizeExceededException;
import it.pagopa.pn.ec.pec.model.pojo.PecPresaInCaricoInfo;
import it.pagopa.pn.ec.rest.v1.dto.*;
import it.pagopa.pn.library.pec.service.PnPecService;
import it.pagopa.pn.library.pec.service.PnEcPecService;
import lombok.CustomLog;
import org.apache.commons.io.output.CountingOutputStream;
import org.slf4j.MDC;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.DependsOn;
import org.springframework.scheduling.annotation.Scheduled;
Expand Down Expand Up @@ -75,7 +74,7 @@
public class PecService extends PresaInCaricoService implements QueueOperationsService {

private final SqsService sqsService;
private final PnPecService pnPecService;
private final PnEcPecService pnPecService;
private final GestoreRepositoryCall gestoreRepositoryCall;
private final AttachmentServiceImpl attachmentService;
private final DownloadCall downloadCall;
Expand All @@ -85,7 +84,7 @@ public class PecService extends PresaInCaricoService implements QueueOperationsS
private final PnPecConfigurationProperties pnPecProps;
private String idSaved;

protected PecService(AuthService authService,@Qualifier("pnPecServiceImpl") PnPecService pnPecService, GestoreRepositoryCall gestoreRepositoryCall, SqsService sqsService
protected PecService(AuthService authService, PnEcPecService pnPecService, GestoreRepositoryCall gestoreRepositoryCall, SqsService sqsService
, AttachmentServiceImpl attachmentService, DownloadCall downloadCall, NotificationTrackerSqsName notificationTrackerSqsName, PecSqsQueueName pecSqsQueueName, @Value("${lavorazione-pec.max-thread-pool-size}") Integer maxThreadPoolSize, PnPecConfigurationProperties pnPecProps) {
super(authService);
this.pnPecService = pnPecService;
Expand Down
18 changes: 13 additions & 5 deletions src/main/java/it/pagopa/pn/ec/pec/utils/MessageIdUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
import it.pagopa.pn.ec.pec.exception.MessageIdException;
import org.springframework.util.Base64Utils;

import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;

public class MessageIdUtils {

private static final String SEPARATORE = "~";
Expand Down Expand Up @@ -47,17 +50,22 @@ public static String encodeMessageId(String idClient, String idRequest) {
*/
public static PresaInCaricoInfo decodeMessageId(String messageId) {
try {
var splitAtPipe = messageId.split(SEPARATORE);
var splitAtPipe = removeBracketsFromMessageId(messageId).split(SEPARATORE);
var base64ClientId = splitAtPipe[0];
var base64RequestId = splitAtPipe[1].split(String.valueOf(DOMAIN.charAt(0)))[0];
var decodedClientId = new String(Base64Utils.decodeFromString(base64ClientId));
var decodedRequestId = new String(Base64Utils.decodeFromString(base64RequestId));
//Rimuove le parentesi angolari da inizio clientID e fine requestID, se presenti.
return new PresaInCaricoInfo(decodedRequestId.endsWith(">") ? decodedRequestId.substring(0, decodedRequestId.length() - 1) : decodedRequestId,
decodedClientId.startsWith("<") ? decodedClientId.substring(1) : decodedClientId,
new StepError());
return new PresaInCaricoInfo(decodedRequestId, decodedClientId, new StepError());
} catch (Exception e) {
throw new MessageIdException.DecodeMessageIdException();
}
}

public static String removeBracketsFromMessageId(String messageId) {
//Rimuove le parentesi angolari dal messageID.
messageId = URLDecoder.decode(messageId, StandardCharsets.UTF_8);
if (messageId.startsWith("<") && messageId.endsWith(">"))
messageId = messageId.substring(1, messageId.length() - 1);
return messageId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,28 @@

import it.pagopa.pn.commons.utils.MDCUtils;
import it.pagopa.pn.library.pec.model.IPostacert;
import it.pagopa.pn.library.pec.model.pojo.PnEcPecListOfMessages;
import it.pagopa.pn.library.pec.model.pojo.PnPostacert;
import it.pagopa.pn.library.pec.service.DaticertService;
import it.pagopa.pn.ec.commons.service.SqsService;
import it.pagopa.pn.ec.scaricamentoesitipec.configurationproperties.ScaricamentoEsitiPecProperties;
import it.pagopa.pn.ec.scaricamentoesitipec.model.pojo.RicezioneEsitiPecDto;
import it.pagopa.pn.ec.scaricamentoesitipec.utils.ScaricamentoEsitiPecUtils;
import it.pagopa.pn.library.pec.pojo.PnListOfMessages;
import it.pagopa.pn.library.pec.service.PnPecService;
import it.pagopa.pn.library.pec.service.PnEcPecService;
import lombok.CustomLog;
import org.slf4j.MDC;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuples;

import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;
import static it.pagopa.pn.ec.commons.utils.EmailUtils.*;
import static it.pagopa.pn.ec.commons.utils.LogUtils.*;
import static it.pagopa.pn.ec.pec.utils.MessageIdUtils.DOMAIN;
import static it.pagopa.pn.ec.pec.utils.MessageIdUtils.*;
import static it.pagopa.pn.ec.scaricamentoesitipec.constant.PostacertTypes.POSTA_CERTIFICATA;

@Component
Expand All @@ -32,13 +32,13 @@ public class ScaricamentoEsitiPecScheduler {
private final DaticertService daticertService;
private final SqsService sqsService;
private final ScaricamentoEsitiPecProperties scaricamentoEsitiPecProperties;
private final PnPecService pnPecService;
private final PnEcPecService pnPecService;
@Value("${scaricamento-esiti-pec.limit-rate}")
private Integer limitRate;
@Value("${pn.ec.storage.sqs.messages.staging.bucket}")
private String storageSqsMessagesStagingBucket;

public ScaricamentoEsitiPecScheduler(DaticertService daticertService, SqsService sqsService, ScaricamentoEsitiPecProperties scaricamentoEsitiPecProperties, @Qualifier("pnPecServiceImpl") PnPecService pnPecService) {
public ScaricamentoEsitiPecScheduler(DaticertService daticertService, SqsService sqsService, ScaricamentoEsitiPecProperties scaricamentoEsitiPecProperties, PnEcPecService pnPecService) {
this.daticertService = daticertService;
this.sqsService = sqsService;
this.scaricamentoEsitiPecProperties = scaricamentoEsitiPecProperties;
Expand All @@ -47,7 +47,7 @@ public ScaricamentoEsitiPecScheduler(DaticertService daticertService, SqsService

private final Predicate<IPostacert> isPostaCertificataPredicate = postacert -> postacert.getTipo().equals(POSTA_CERTIFICATA);
private final Predicate<IPostacert> endsWithDomainPredicate = postacert -> postacert.getDati().getMsgid().endsWith(DOMAIN);
private final Predicate<PnListOfMessages> hasNoMessages = pnListOfMessages -> Objects.isNull(pnListOfMessages) || Objects.isNull(pnListOfMessages.getMessages()) || pnListOfMessages.getMessages().isEmpty();
private final Predicate<PnEcPecListOfMessages> hasNoMessages = pnEcPecListOfMessages -> Objects.isNull(pnEcPecListOfMessages) || Objects.isNull(pnEcPecListOfMessages.getMessages()) || pnEcPecListOfMessages.getMessages().isEmpty();

@Scheduled(cron = "${PnEcCronScaricamentoEsitiPec ?:0 */5 * * * *}")
public void scaricamentoEsitiPecScheduler() {
Expand All @@ -60,13 +60,15 @@ public void scaricamentoEsitiPecScheduler() {
MDCUtils.addMDCToContextAndExecute(pnPecService.getMessageCount()
.then(Mono.defer(() -> pnPecService.getUnreadMessages(Integer.parseInt(scaricamentoEsitiPecProperties.getMessagesLimit()))))
.flatMap(pnGetMessagesResponse -> {
var listOfMessages = pnGetMessagesResponse.getPnListOfMessages();
var listOfMessages = pnGetMessagesResponse.getPnEcPecListOfMessages();
if (hasNoMessages.test(listOfMessages))
hasMessages.set(false);
return Mono.justOrEmpty(listOfMessages);
})
.flatMapIterable(PnListOfMessages::getMessages)
.flatMap(message -> {
.flatMapIterable(PnEcPecListOfMessages::getMessages)
.flatMap(pnEcMessage -> {
byte[] message = pnEcMessage.getMessage();
String providerName = pnEcMessage.getProviderName();

var mimeMessage = getMimeMessage(message);
var messageID = getMessageIdFromMimeMessage(mimeMessage);
Expand All @@ -93,8 +95,7 @@ public void scaricamentoEsitiPecScheduler() {
.map(postacert -> {
var dati = postacert.getDati();
var msgId = dati.getMsgid();
if (msgId.startsWith("<") && msgId.endsWith(">"))
dati.setMsgid(msgId.substring(1, msgId.length() - 1));
dati.setMsgid(removeBracketsFromMessageId(msgId));
log.debug(SCARICAMENTO_ESITI_PEC + "- PEC '{}' has '{}' msgId", finalMessageID, msgId);
return postacert;
})
Expand All @@ -120,12 +121,12 @@ public void scaricamentoEsitiPecScheduler() {
.receiversDomain(getDomainFromAddress(getFromFromMimeMessage(mimeMessage)[0]))
.retry(0)
.build()))
.thenReturn(finalMessageID);
.thenReturn(Tuples.of(finalMessageID, providerName));
}
else return Mono.just(finalMessageID);
else return Mono.just(Tuples.of(finalMessageID, providerName));
})
//Marca il messaggio come letto.
.flatMap(pnPecService::markMessageAsRead, limitRate)
.flatMap(tuple -> pnPecService.markMessageAsRead(tuple.getT1(), tuple.getT2()), limitRate)
.doOnError(throwable -> log.fatal(SCARICAMENTO_ESITI_PEC, throwable))
.onErrorResume(throwable -> Mono.empty())
.repeat(hasMessages::get)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,29 +7,30 @@
import it.pagopa.pn.ec.commons.rest.call.ec.gestorerepository.GestoreRepositoryCall;
import it.pagopa.pn.ec.rest.v1.dto.EventsDto;
import it.pagopa.pn.ec.scaricamentoesitipec.model.pojo.CancellazioneRicevutePecDto;
import it.pagopa.pn.library.pec.service.PnPecService;
import it.pagopa.pn.library.pec.service.PnEcPecService;
import lombok.CustomLog;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.MDC;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.concurrent.Semaphore;

import static it.pagopa.pn.ec.commons.constant.Status.ACCEPTED;
import static it.pagopa.pn.ec.commons.utils.LogUtils.*;
import static it.pagopa.pn.ec.commons.utils.RequestUtils.concatRequestId;

@Service
@CustomLog
public class CancellazioneRicevutePecService {

private final PnPecService pnPecService;
private final PnEcPecService pnPecService;
private final GestoreRepositoryCall gestoreRepositoryCall;
private final Semaphore semaphore;

public CancellazioneRicevutePecService(@Qualifier("pnPecServiceImpl") PnPecService pnPecService, GestoreRepositoryCall gestoreRepositoryCall, @Value("${cancellazione-ricevute-pec.max-thread-pool-size}") Integer maxThreadPoolSize) {
public CancellazioneRicevutePecService(PnEcPecService pnPecService, GestoreRepositoryCall gestoreRepositoryCall, @Value("${cancellazione-ricevute-pec.max-thread-pool-size}") Integer maxThreadPoolSize) {
this.pnPecService = pnPecService;
this.gestoreRepositoryCall = gestoreRepositoryCall;
log.debug("{} max thread pool size : {} ", CANCELLAZIONE_RICEVUTE_PEC, maxThreadPoolSize);
Expand Down Expand Up @@ -66,15 +67,17 @@ public Mono<Void> cancellazioneRicevutePec(final CancellazioneRicevutePecDto can
return Mono.justOrEmpty(requestDto.getRequestMetadata())
.flatMapMany(requestMetadataDto -> Flux.fromIterable(requestMetadataDto.getEventsList()))
.map(EventsDto::getDigProgrStatus)
.filter(digitalProgressStatusDto -> digitalLegal.getEventCode().getValue().equals(digitalProgressStatusDto.getStatusCode()))
.next()
.doOnSuccess(digitalProgressStatusDto -> {
if (digitalProgressStatusDto == null)
log.warn(NOT_VALID_FOR_DELETE, requestId);
.reduce(new MutablePair<String, String>(), (pair, digitalProgressStatusDto) -> {
if (digitalProgressStatusDto.getStatus().equals(ACCEPTED.getStatusTransactionTableCompliant()))
pair.setRight(digitalProgressStatusDto.getGeneratedMessage().getId());
else if (digitalLegal.getEventCode().getValue().equals(digitalProgressStatusDto.getStatusCode()))
pair.setLeft(digitalProgressStatusDto.getGeneratedMessage().getId());
return pair;
});
})
.map(digitalProgressStatusDto -> digitalProgressStatusDto.getGeneratedMessage().getId())
.flatMap(pnPecService::deleteMessage)
.filter(pair -> pair.getLeft() != null && pair.getRight() != null)
.doOnDiscard(Pair.class, pair -> log.warn(NOT_VALID_FOR_DELETE, requestId))
.flatMap(pair -> pnPecService.deleteMessage(pair.getLeft(), pair.getRight()))
.doOnError(throwable -> log.fatal(CANCELLAZIONE_RICEVUTE_PEC, throwable, throwable.getMessage()))
.doOnSuccess(result -> acknowledgment.acknowledge())
.doFinally(signalType -> semaphore.release());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
import reactor.util.function.Tuples;

import java.net.URI;
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package it.pagopa.pn.library.pec.model.pojo;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
public class PnEcPecGetMessagesResponse {

private PnEcPecListOfMessages pnEcPecListOfMessages;
private int numOfMessages;

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package it.pagopa.pn.library.pec.model.pojo;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

import java.util.List;

@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
public class PnEcPecListOfMessages {

private List<PnEcPecMessage> messages;

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package it.pagopa.pn.library.pec.model.pojo;

import lombok.*;
import lombok.Data;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class PnEcPecMessage {

@ToString.Exclude
private byte[] message;
private String providerName;

public PnEcPecMessage message(byte[] message) {
this.message = message;
return this;
}

public PnEcPecMessage providerName(String providerName) {
this.providerName = providerName;
return this;
}

}
16 changes: 16 additions & 0 deletions src/main/java/it/pagopa/pn/library/pec/service/PnEcPecService.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package it.pagopa.pn.library.pec.service;

import it.pagopa.pn.library.pec.model.pojo.PnEcPecGetMessagesResponse;
import reactor.core.publisher.Mono;

public interface PnEcPecService {
Mono<String> sendMail(byte[] message);

Mono<Void> markMessageAsRead(String messageID, String providerName);

Mono<Void> deleteMessage(String messageID, String senderMessageID);

Mono<PnEcPecGetMessagesResponse> getUnreadMessages(int limit);

Mono<Integer> getMessageCount();
}
Loading

0 comments on commit d8e0762

Please sign in to comment.