diff --git a/pom.xml b/pom.xml index a38d3745..b864b7bb 100644 --- a/pom.xml +++ b/pom.xml @@ -69,7 +69,7 @@ it.pagopa.pn pn-ec-aruba-pec - 1.1.0-SNAPSHOT + 1.1.0 org.eclipse.angus diff --git a/src/main/java/it/pagopa/pn/ec/scaricamentoesitipec/scheduler/ScaricamentoEsitiPecScheduler.java b/src/main/java/it/pagopa/pn/ec/scaricamentoesitipec/scheduler/ScaricamentoEsitiPecScheduler.java index 6a0179dd..4a7bf154 100644 --- a/src/main/java/it/pagopa/pn/ec/scaricamentoesitipec/scheduler/ScaricamentoEsitiPecScheduler.java +++ b/src/main/java/it/pagopa/pn/ec/scaricamentoesitipec/scheduler/ScaricamentoEsitiPecScheduler.java @@ -3,6 +3,7 @@ import it.pagopa.pn.commons.utils.MDCUtils; import it.pagopa.pn.library.pec.model.IPostacert; import it.pagopa.pn.library.pec.model.pojo.PnEcPecListOfMessages; +import it.pagopa.pn.library.pec.model.pojo.PnEcPecMessage; import it.pagopa.pn.library.pec.model.pojo.PnPostacert; import it.pagopa.pn.library.pec.service.DaticertService; import it.pagopa.pn.ec.commons.service.SqsService; @@ -18,6 +19,7 @@ import reactor.core.publisher.Mono; import reactor.util.function.Tuples; +import java.util.List; import java.util.Objects; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Predicate; @@ -57,7 +59,7 @@ public void scaricamentoEsitiPecScheduler() { AtomicBoolean hasMessages = new AtomicBoolean(); hasMessages.set(true); - MDCUtils.addMDCToContextAndExecute(pnPecService.getMessageCount() + pnPecService.getMessageCount() .then(Mono.defer(() -> pnPecService.getUnreadMessages(Integer.parseInt(scaricamentoEsitiPecProperties.getMessagesLimit())))) .flatMap(pnGetMessagesResponse -> { var listOfMessages = pnGetMessagesResponse.getPnEcPecListOfMessages(); @@ -66,73 +68,77 @@ public void scaricamentoEsitiPecScheduler() { return Mono.justOrEmpty(listOfMessages); }) .flatMapIterable(PnEcPecListOfMessages::getMessages) - .flatMap(pnEcMessage -> { - byte[] message = pnEcMessage.getMessage(); - String providerName = pnEcMessage.getProviderName(); + .flatMap(this::lavorazioneEsito, limitRate) + .doOnError(throwable -> log.fatal(SCARICAMENTO_ESITI_PEC, throwable)) + .onErrorResume(throwable -> Mono.empty()) + .repeat(hasMessages::get) + .doOnComplete(() -> log.logEndingProcess(SCARICAMENTO_ESITI_PEC)) + .subscribe(); + + } + + public Mono lavorazioneEsito(PnEcPecMessage pecMessage) { + + byte[] message = pecMessage.getMessage(); + String providerName = pecMessage.getProviderName(); + var mimeMessage = getMimeMessage(message); + var messageID = getMessageIdFromMimeMessage(mimeMessage); + //Rimozione delle parentesi angolari dal messageID + if (messageID.startsWith("<") && messageID.endsWith(">")) + messageID = messageID.substring(1, messageID.length() - 1); + MDC.put(MDC_CORR_ID_KEY, messageID); + var finalMessageID = messageID; - var mimeMessage = getMimeMessage(message); - var messageID = getMessageIdFromMimeMessage(mimeMessage); - //Rimozione delle parentesi angolari dal messageID - if (messageID.startsWith("<") && messageID.endsWith(">")) - messageID = messageID.substring(1, messageID.length() - 1); - MDC.put(MDC_CORR_ID_KEY, messageID); - var finalMessageID = messageID; - var attachBytes = getAttachmentFromMimeMessage(mimeMessage, "daticert.xml"); + return MDCUtils.addMDCToContextAndExecute(Mono.defer(() -> { - log.debug(SCARICAMENTO_ESITI_PEC + " - Try to download PEC '{}' daticert.xml", finalMessageID); + var attachBytes = getAttachmentFromMimeMessage(mimeMessage, "daticert.xml"); + + log.debug(SCARICAMENTO_ESITI_PEC + " - Try to download PEC '{}' daticert.xml", finalMessageID); // Check se daticert.xml รจ presente controllando la lunghezza del byte[] - if (!Objects.isNull(attachBytes) && attachBytes.length > 0) { + if (!Objects.isNull(attachBytes) && attachBytes.length > 0) { - log.debug(SCARICAMENTO_ESITI_PEC + " - PEC '{}' has daticert.xml", finalMessageID); + log.debug(SCARICAMENTO_ESITI_PEC + " - PEC '{}' has daticert.xml", finalMessageID); // Deserialize daticert.xml. Start a new Mono inside the flatMap - return Mono.fromCallable(() -> daticertService.getPostacertFromByteArray(attachBytes)) + return Mono.fromCallable(() -> daticertService.getPostacertFromByteArray(attachBytes)) // Escludere questi daticert. Non sono delle 'comunicazione esiti' - .filter(isPostaCertificataPredicate.negate()) + .filter(isPostaCertificataPredicate.negate()) // msgid arriva all'interno di due angolari . Eliminare il primo e l'ultimo carattere - .map(postacert -> { - var dati = postacert.getDati(); - var msgId = dati.getMsgid(); - dati.setMsgid(removeBracketsFromMessageId(msgId)); - log.debug(SCARICAMENTO_ESITI_PEC + "- PEC '{}' has '{}' msgId", finalMessageID, msgId); - return postacert; - }) + .map(postacert -> { + var dati = postacert.getDati(); + var msgId = dati.getMsgid(); + dati.setMsgid(removeBracketsFromMessageId(msgId)); + log.debug(SCARICAMENTO_ESITI_PEC + "- PEC '{}' has '{}' msgId", finalMessageID, msgId); + return postacert; + }) // Escludere questi daticert. Non avendo il msgid terminante con il dominio pago non sono state inviate // da noi - .filter(endsWithDomainPredicate) + .filter(endsWithDomainPredicate) // Daticert filtrati - .doOnDiscard(PnPostacert.class, postacert -> { - if (isPostaCertificataPredicate.test(postacert)) { - log.debug(PEC_DISCARDED, finalMessageID, SCARICAMENTO_ESITI_PEC, POSTA_CERTIFICATA); - } else if (!endsWithDomainPredicate.test(postacert)) { - log.debug(PEC_DISCARDED,finalMessageID, SCARICAMENTO_ESITI_PEC, NOT_SENT_BY_US); - } - }) - .flatMap(unused -> sqsService.sendWithLargePayload(scaricamentoEsitiPecProperties.sqsQueueName(), - finalMessageID, - storageSqsMessagesStagingBucket, - RicezioneEsitiPecDto.builder() - .messageID(finalMessageID) - .message(message) - .receiversDomain(getDomainFromAddress(getFromFromMimeMessage(mimeMessage)[0])) - .retry(0) - .build())) - .thenReturn(Tuples.of(finalMessageID, providerName)); - } - else return Mono.just(Tuples.of(finalMessageID, providerName)); - }) - //Marca il messaggio come letto. - .flatMap(tuple -> pnPecService.markMessageAsRead(tuple.getT1(), tuple.getT2()), limitRate) - .doOnError(throwable -> log.fatal(SCARICAMENTO_ESITI_PEC, throwable)) - .onErrorResume(throwable -> Mono.empty()) - .repeat(hasMessages::get) - .doOnComplete(() -> log.logEndingProcess(SCARICAMENTO_ESITI_PEC))) - .subscribe(); - + .doOnDiscard(PnPostacert.class, postacert -> { + if (isPostaCertificataPredicate.test(postacert)) { + log.debug(PEC_DISCARDED, finalMessageID, SCARICAMENTO_ESITI_PEC, POSTA_CERTIFICATA); + } else if (!endsWithDomainPredicate.test(postacert)) { + log.debug(PEC_DISCARDED, finalMessageID, SCARICAMENTO_ESITI_PEC, NOT_SENT_BY_US); + } + }) + .flatMap(unused -> sqsService.sendWithLargePayload(scaricamentoEsitiPecProperties.sqsQueueName(), + finalMessageID, + storageSqsMessagesStagingBucket, + RicezioneEsitiPecDto.builder() + .messageID(finalMessageID) + .message(message) + .receiversDomain(getDomainFromAddress(getFromFromMimeMessage(mimeMessage)[0])) + .retry(0) + .build())) + .thenReturn(Tuples.of(finalMessageID, providerName)); + } else return Mono.just(Tuples.of(finalMessageID, providerName)); + }) + //Marca il messaggio come letto. + .flatMap(tuple -> pnPecService.markMessageAsRead(tuple.getT1(), tuple.getT2()))); } - }