Skip to content

Commit

Permalink
PN-10443: changed pn-ec-aruba-pec version to 1.1.0, added CX id propa…
Browse files Browse the repository at this point in the history
…gation to markMessageAsRead
  • Loading branch information
ElenaPagnacco committed Apr 9, 2024
1 parent c6188fa commit 7752caf
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 55 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@
<dependency>
<groupId>it.pagopa.pn</groupId>
<artifactId>pn-ec-aruba-pec</artifactId>
<version>1.1.0-SNAPSHOT</version>
<version>1.1.0</version>
<exclusions>
<exclusion>
<groupId>org.eclipse.angus</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import it.pagopa.pn.commons.utils.MDCUtils;
import it.pagopa.pn.library.pec.model.IPostacert;
import it.pagopa.pn.library.pec.model.pojo.PnEcPecListOfMessages;
import it.pagopa.pn.library.pec.model.pojo.PnEcPecMessage;
import it.pagopa.pn.library.pec.model.pojo.PnPostacert;
import it.pagopa.pn.library.pec.service.DaticertService;
import it.pagopa.pn.ec.commons.service.SqsService;
Expand All @@ -18,6 +19,7 @@
import reactor.core.publisher.Mono;
import reactor.util.function.Tuples;

import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;
Expand Down Expand Up @@ -57,7 +59,7 @@ public void scaricamentoEsitiPecScheduler() {
AtomicBoolean hasMessages = new AtomicBoolean();
hasMessages.set(true);

MDCUtils.addMDCToContextAndExecute(pnPecService.getMessageCount()
pnPecService.getMessageCount()
.then(Mono.defer(() -> pnPecService.getUnreadMessages(Integer.parseInt(scaricamentoEsitiPecProperties.getMessagesLimit()))))
.flatMap(pnGetMessagesResponse -> {
var listOfMessages = pnGetMessagesResponse.getPnEcPecListOfMessages();
Expand All @@ -66,73 +68,77 @@ public void scaricamentoEsitiPecScheduler() {
return Mono.justOrEmpty(listOfMessages);
})
.flatMapIterable(PnEcPecListOfMessages::getMessages)
.flatMap(pnEcMessage -> {
byte[] message = pnEcMessage.getMessage();
String providerName = pnEcMessage.getProviderName();
.flatMap(this::lavorazioneEsito, limitRate)
.doOnError(throwable -> log.fatal(SCARICAMENTO_ESITI_PEC, throwable))
.onErrorResume(throwable -> Mono.empty())
.repeat(hasMessages::get)
.doOnComplete(() -> log.logEndingProcess(SCARICAMENTO_ESITI_PEC))
.subscribe();

}

public Mono<Void> lavorazioneEsito(PnEcPecMessage pecMessage) {

byte[] message = pecMessage.getMessage();
String providerName = pecMessage.getProviderName();
var mimeMessage = getMimeMessage(message);
var messageID = getMessageIdFromMimeMessage(mimeMessage);
//Rimozione delle parentesi angolari dal messageID
if (messageID.startsWith("<") && messageID.endsWith(">"))
messageID = messageID.substring(1, messageID.length() - 1);
MDC.put(MDC_CORR_ID_KEY, messageID);
var finalMessageID = messageID;

var mimeMessage = getMimeMessage(message);
var messageID = getMessageIdFromMimeMessage(mimeMessage);
//Rimozione delle parentesi angolari dal messageID
if (messageID.startsWith("<") && messageID.endsWith(">"))
messageID = messageID.substring(1, messageID.length() - 1);
MDC.put(MDC_CORR_ID_KEY, messageID);
var finalMessageID = messageID;
var attachBytes = getAttachmentFromMimeMessage(mimeMessage, "daticert.xml");
return MDCUtils.addMDCToContextAndExecute(Mono.defer(() -> {

log.debug(SCARICAMENTO_ESITI_PEC + " - Try to download PEC '{}' daticert.xml", finalMessageID);
var attachBytes = getAttachmentFromMimeMessage(mimeMessage, "daticert.xml");

log.debug(SCARICAMENTO_ESITI_PEC + " - Try to download PEC '{}' daticert.xml", finalMessageID);

// Check se daticert.xml è presente controllando la lunghezza del byte[]
if (!Objects.isNull(attachBytes) && attachBytes.length > 0) {
if (!Objects.isNull(attachBytes) && attachBytes.length > 0) {

log.debug(SCARICAMENTO_ESITI_PEC + " - PEC '{}' has daticert.xml", finalMessageID);
log.debug(SCARICAMENTO_ESITI_PEC + " - PEC '{}' has daticert.xml", finalMessageID);

// Deserialize daticert.xml. Start a new Mono inside the flatMap
return Mono.fromCallable(() -> daticertService.getPostacertFromByteArray(attachBytes))
return Mono.fromCallable(() -> daticertService.getPostacertFromByteArray(attachBytes))
// Escludere questi daticert. Non sono delle 'comunicazione esiti'
.filter(isPostaCertificataPredicate.negate())
.filter(isPostaCertificataPredicate.negate())

// msgid arriva all'interno di due angolari <msgid>. Eliminare il primo e l'ultimo carattere
.map(postacert -> {
var dati = postacert.getDati();
var msgId = dati.getMsgid();
dati.setMsgid(removeBracketsFromMessageId(msgId));
log.debug(SCARICAMENTO_ESITI_PEC + "- PEC '{}' has '{}' msgId", finalMessageID, msgId);
return postacert;
})
.map(postacert -> {
var dati = postacert.getDati();
var msgId = dati.getMsgid();
dati.setMsgid(removeBracketsFromMessageId(msgId));
log.debug(SCARICAMENTO_ESITI_PEC + "- PEC '{}' has '{}' msgId", finalMessageID, msgId);
return postacert;
})

// Escludere questi daticert. Non avendo il msgid terminante con il dominio pago non sono state inviate
// da noi
.filter(endsWithDomainPredicate)
.filter(endsWithDomainPredicate)

// Daticert filtrati
.doOnDiscard(PnPostacert.class, postacert -> {
if (isPostaCertificataPredicate.test(postacert)) {
log.debug(PEC_DISCARDED, finalMessageID, SCARICAMENTO_ESITI_PEC, POSTA_CERTIFICATA);
} else if (!endsWithDomainPredicate.test(postacert)) {
log.debug(PEC_DISCARDED,finalMessageID, SCARICAMENTO_ESITI_PEC, NOT_SENT_BY_US);
}
})
.flatMap(unused -> sqsService.sendWithLargePayload(scaricamentoEsitiPecProperties.sqsQueueName(),
finalMessageID,
storageSqsMessagesStagingBucket,
RicezioneEsitiPecDto.builder()
.messageID(finalMessageID)
.message(message)
.receiversDomain(getDomainFromAddress(getFromFromMimeMessage(mimeMessage)[0]))
.retry(0)
.build()))
.thenReturn(Tuples.of(finalMessageID, providerName));
}
else return Mono.just(Tuples.of(finalMessageID, providerName));
})
//Marca il messaggio come letto.
.flatMap(tuple -> pnPecService.markMessageAsRead(tuple.getT1(), tuple.getT2()), limitRate)
.doOnError(throwable -> log.fatal(SCARICAMENTO_ESITI_PEC, throwable))
.onErrorResume(throwable -> Mono.empty())
.repeat(hasMessages::get)
.doOnComplete(() -> log.logEndingProcess(SCARICAMENTO_ESITI_PEC)))
.subscribe();

.doOnDiscard(PnPostacert.class, postacert -> {
if (isPostaCertificataPredicate.test(postacert)) {
log.debug(PEC_DISCARDED, finalMessageID, SCARICAMENTO_ESITI_PEC, POSTA_CERTIFICATA);
} else if (!endsWithDomainPredicate.test(postacert)) {
log.debug(PEC_DISCARDED, finalMessageID, SCARICAMENTO_ESITI_PEC, NOT_SENT_BY_US);
}
})
.flatMap(unused -> sqsService.sendWithLargePayload(scaricamentoEsitiPecProperties.sqsQueueName(),
finalMessageID,
storageSqsMessagesStagingBucket,
RicezioneEsitiPecDto.builder()
.messageID(finalMessageID)
.message(message)
.receiversDomain(getDomainFromAddress(getFromFromMimeMessage(mimeMessage)[0]))
.retry(0)
.build()))
.thenReturn(Tuples.of(finalMessageID, providerName));
} else return Mono.just(Tuples.of(finalMessageID, providerName));
})
//Marca il messaggio come letto.
.flatMap(tuple -> pnPecService.markMessageAsRead(tuple.getT1(), tuple.getT2())));
}

}

0 comments on commit 7752caf

Please sign in to comment.