Skip to content

Commit

Permalink
PN-12780: fixed failure in SQS send on cartaceo DLQ queue. (#537)
Browse files Browse the repository at this point in the history
  • Loading branch information
michelescara authored Sep 25, 2024
1 parent c8146a6 commit e256b49
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -609,7 +609,7 @@ public Mono<SendMessageResponse> sendNotificationOnStatusQueue(PresaInCaricoInfo

@Override
public Mono<SendMessageResponse> sendNotificationOnDlqErrorQueue(PresaInCaricoInfo presaInCaricoInfo) {
return sqsService.send(cartaceoSqsQueueName.dlqErrorName(), presaInCaricoInfo);
return sqsService.sendWithDeduplicationId(cartaceoSqsQueueName.dlqErrorName(), presaInCaricoInfo);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@ public interface SqsService {

<T> Mono<SendMessageResponse> send(final String queueName, final T queuePayload) throws SqsClientException;

<T> Mono<SendMessageResponse> sendWithDeduplicationId(final String queueName, final T queuePayload) throws SqsClientException;

<T> Mono<SendMessageResponse> send(final String queueName, Integer delaySeconds, final T queuePayload) throws SqsClientException;

<T> Mono<SendMessageResponse> send(final String queueName, String messageGroupId, final T queuePayload) throws SqsClientException;

<T> Mono<SendMessageResponse> send(final String queueName, final String messageGroupId, Integer delaySeconds, final T queuePayload) throws SqsClientException;
<T> Mono<SendMessageResponse> send(final String queueName, final String messageGroupId, String messageDeduplicationId, Integer delaySeconds, final T queuePayload) throws SqsClientException;

<T> Mono<SendMessageResponse> sendWithLargePayload(final String queueName, String messageGroupId, String bucketName, final T queuePayload) throws SqsClientException;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public class SqsServiceImpl implements SqsService {
private final S3Service s3Service;
private final RetryBackoffSpec sqsRetryStrategy;
private static final int MESSAGE_GROUP_ID_LENGTH = 64;
private static final int MESSAGE_DEDUPLICATION_ID_LENGTH = 64;
@Value("${sqs.queue.max-message-size}")
private Integer sqsQueueMaxMessageSize;
@Value("${SqsQueueMaxMessages:#{1000}}")
Expand All @@ -68,25 +69,31 @@ public <T> Mono<SendMessageResponse> send(final String queueName, final T queueP
return send(queueName, (Integer) null, queuePayload);
}

@Override
public <T> Mono<SendMessageResponse> sendWithDeduplicationId(String queueName, T queuePayload) throws SqsClientException {
return send(queueName, RandomStringUtils.randomAlphanumeric(MESSAGE_GROUP_ID_LENGTH), RandomStringUtils.randomAlphanumeric(MESSAGE_DEDUPLICATION_ID_LENGTH), null, queuePayload);
}

@Override
public <T> Mono<SendMessageResponse> send(String queueName, Integer delaySeconds, T queuePayload) throws SqsClientException {
return send(queueName, RandomStringUtils.randomAlphanumeric(MESSAGE_GROUP_ID_LENGTH), delaySeconds, queuePayload);
return send(queueName, RandomStringUtils.randomAlphanumeric(MESSAGE_GROUP_ID_LENGTH), null, delaySeconds, queuePayload);
}

@Override
public <T> Mono<SendMessageResponse> send(String queueName, String messageGroupId, T queuePayload) throws SqsClientException {
return send(queueName, messageGroupId, null, queuePayload);
return send(queueName, messageGroupId, null, null, queuePayload);
}

@Override
public <T> Mono<SendMessageResponse> send(String queueName, String messageGroupId, Integer delaySeconds, T queuePayload) throws SqsClientException {
public <T> Mono<SendMessageResponse> send(String queueName, String messageGroupId, String messageDeduplicationId, Integer delaySeconds, T queuePayload) throws SqsClientException {
log.debug(INSERTING_DATA_IN_SQS, queuePayload, queueName);
return Mono.fromCallable(() -> objectMapper.writeValueAsString(queuePayload))
.doOnSuccess(sendMessageResponse -> log.info("Try to publish on {} with payload {}", queueName, queuePayload))
.zipWith(getQueueUrlFromName(queueName))
.flatMap(objects -> Mono.fromCompletionStage(sqsAsyncClient.sendMessage(builder -> builder.queueUrl(objects.getT2())
.messageBody(objects.getT1())
.messageGroupId(messageGroupId)
.messageDeduplicationId(messageDeduplicationId)
.delaySeconds(delaySeconds))))
.retryWhen(getSqsRetryStrategy())
.onErrorResume(throwable -> {
Expand Down

0 comments on commit e256b49

Please sign in to comment.