diff --git a/src/main/java/it/pagopa/interop/signalhub/persister/entity/DeadSignal.java b/src/main/java/it/pagopa/interop/signalhub/persister/entity/DeadSignal.java index a6d8215..ba61682 100644 --- a/src/main/java/it/pagopa/interop/signalhub/persister/entity/DeadSignal.java +++ b/src/main/java/it/pagopa/interop/signalhub/persister/entity/DeadSignal.java @@ -17,6 +17,8 @@ public class DeadSignal { @Id @Column("id") private Long id; + @Column("correlation_id") + private String correlationId; @Column("signal_id") private Long signalId; @Column("object_id") @@ -30,6 +32,6 @@ public class DeadSignal { @CreatedDate @Column("tmst_insert") private Instant tmstInsert; - @Column("error") - private String error; -} + @Column("error_reason") + private String errorReason; +} \ No newline at end of file diff --git a/src/main/java/it/pagopa/interop/signalhub/persister/entity/Signal.java b/src/main/java/it/pagopa/interop/signalhub/persister/entity/Signal.java index b39b5d3..253f9e9 100644 --- a/src/main/java/it/pagopa/interop/signalhub/persister/entity/Signal.java +++ b/src/main/java/it/pagopa/interop/signalhub/persister/entity/Signal.java @@ -18,6 +18,8 @@ public class Signal { @Id @Column("id") private Long id; + @Column("correlation_id") + private String correlationId; @Column("signal_id") private Long signalId; @Column("object_id") diff --git a/src/main/java/it/pagopa/interop/signalhub/persister/mapper/SignalMapper.java b/src/main/java/it/pagopa/interop/signalhub/persister/mapper/SignalMapper.java index e35b666..758c720 100644 --- a/src/main/java/it/pagopa/interop/signalhub/persister/mapper/SignalMapper.java +++ b/src/main/java/it/pagopa/interop/signalhub/persister/mapper/SignalMapper.java @@ -8,8 +8,11 @@ @Mapper(componentModel = "spring") public interface SignalMapper { + String CORRELATION_ID_HEADER_KEY = "correlationId"; + @Mapping(target = "indexSignal", source = "signal.signalId") SignalEvent signalToSignalEvent(Signal signal); @Mapping(target = "signalId", source = "signalEvent.indexSignal") - Signal signalEventToSignal(SignalEvent signalEvent); + @Mapping(target = "correlationId", source = "correlationId") + Signal signalEventToSignal(SignalEvent signalEvent, String correlationId); } diff --git a/src/main/java/it/pagopa/interop/signalhub/persister/queue/consumer/SqsInternalListener.java b/src/main/java/it/pagopa/interop/signalhub/persister/queue/consumer/SqsInternalListener.java index 65ec3ad..2244bf0 100644 --- a/src/main/java/it/pagopa/interop/signalhub/persister/queue/consumer/SqsInternalListener.java +++ b/src/main/java/it/pagopa/interop/signalhub/persister/queue/consumer/SqsInternalListener.java @@ -29,10 +29,11 @@ public class SqsInternalListener { @SqsListener(value = "${poc.signal-hub.internal-queue-name}") public CompletableFuture pullFromAwsInternalQueue(@Payload String node, @Headers Map headers) { log.info("payloadBody: {}, headers: {}, PullFromInternalQueue received input", node, headers); + String correlationId = (String) headers.get(SignalMapper.CORRELATION_ID_HEADER_KEY); return Mono.just(node) .map(json -> Utility.jsonToObject(node, SignalEvent.class)) - .map(signalMapper::signalEventToSignal) + .map((signalEvent) -> signalMapper.signalEventToSignal(signalEvent, correlationId)) .flatMap(signalService::signalServiceFlow) .then() .toFuture(); diff --git a/src/main/java/it/pagopa/interop/signalhub/persister/service/SignalService.java b/src/main/java/it/pagopa/interop/signalhub/persister/service/SignalService.java index 28f4c33..5fe0fca 100644 --- a/src/main/java/it/pagopa/interop/signalhub/persister/service/SignalService.java +++ b/src/main/java/it/pagopa/interop/signalhub/persister/service/SignalService.java @@ -40,7 +40,7 @@ public Mono signalServiceFlow(Signal signal) { private Mono getDeadSignal(Signal signal) { DeadSignal deadSignal = deadSignalMapper.signalToDeadSignal(signal); - deadSignal.setError(DUPLICATE_SIGNAL_ERROR.toString()); + deadSignal.setErrorReason(DUPLICATE_SIGNAL_ERROR.toString()); return Mono.just(deadSignal); } diff --git a/src/main/resources/db/changelog/changes/init_table/init_table.sql b/src/main/resources/db/changelog/changes/init_table/init_table.sql index 6ce9371..ed41b34 100644 --- a/src/main/resources/db/changelog/changes/init_table/init_table.sql +++ b/src/main/resources/db/changelog/changes/init_table/init_table.sql @@ -6,6 +6,8 @@ CREATE TABLE IF NOT EXISTS ESERVICE ( tmst_last_edit TIMESTAMP, UNIQUE (eservice_id, producer_id) ); +CREATE INDEX ESERVICE_INDEX_ID ON ESERVICE USING hash (eservice_id); +CREATE INDEX ESERVICE_INDEX_PRODUCER_ID ON ESERVICE USING hash (producer_id); CREATE TABLE IF NOT EXISTS CONSUMER_ESERVICE ( @@ -16,48 +18,42 @@ CREATE TABLE IF NOT EXISTS CONSUMER_ESERVICE ( tmst_last_edit TIMESTAMP, UNIQUE (eservice_id, consumer_id) ); +CREATE INDEX CONSUMER_ESERVICE_INDEX_ID ON CONSUMER_ESERVICE USING hash (eservice_id); +CREATE INDEX ESERVICE_INDEX_CONSUMER_ID ON CONSUMER_ESERVICE USING hash (consumer_id); CREATE TABLE IF NOT EXISTS SIGNAL ( id SERIAL PRIMARY KEY, correlation_id VARCHAR(50) NOT NULL, - signal_id BIGINT NOT NULL, - object_id VARCHAR (50) NOT NULL, - eservice_id VARCHAR (50) NOT NULL, - object_type VARCHAR (50) NOT NULL, - signal_type VARCHAR (50) NOT NULL, - tmst_insert TIMESTAMP NOT NULL, + signal_id BIGINT NOT NULL, + object_id VARCHAR (50) NOT NULL, + eservice_id VARCHAR (50) NOT NULL, + object_type VARCHAR (50) NOT NULL, + signal_type VARCHAR (50) NOT NULL, + tmst_insert TIMESTAMP NOT NULL, UNIQUE (signal_id, eservice_id) ); - - CREATE INDEX SIGNAL_INDEX_SIGNAL_ID ON SIGNAL USING hash (signal_id); - - CREATE INDEX SIGNAL_INDEX_ESERVICE_ID ON SIGNAL USING hash (eservice_id); -CREATE INDEX ESERVICE_INDEX_ID ON ESERVICE USING hash (eservice_id); -CREATE INDEX ESERVICE_INDEX_PRODUCER_ID ON ESERVICE USING hash (producer_id); -CREATE INDEX CONSUMER_ESERVICE_INDEX_ID ON CONSUMER_ESERVICE USING hash (eservice_id); -CREATE INDEX ESERVICE_INDEX_CONSUMER_ID ON CONSUMER_ESERVICE USING hash (consumer_id); - -CREATE TABLE IF NOT EXISTS TRACING_BATCH ( - batch_id SERIAL PRIMARY KEY, - state VARCHAR (50) NOT NULL, - last_event_id BIGINT, - tmst_started TIMESTAMP NOT NULL, - tmst_ended TIMESTAMP +CREATE TABLE IF NOT EXISTS DEAD_SIGNAL ( + id SERIAL PRIMARY KEY, + correlation_id VARCHAR(50) NOT NULL, + signal_id BIGINT NOT NULL, + object_id VARCHAR (50) NOT NULL, + eservice_id VARCHAR (50) NOT NULL, + object_type VARCHAR (50) NOT NULL, + signal_type VARCHAR (50) NOT NULL, + tmst_insert TIMESTAMP NOT NULL, + error_reason VARCHAR(255) NOT NULL ); -CREATE TABLE IF NOT EXISTS DEAD_SIGNAL ( - id SERIAL PRIMARY KEY, - signal_id BIGINT NOT NULL, - object_id VARCHAR (50) NOT NULL, - eservice_id VARCHAR (50) NOT NULL, - object_type VARCHAR (50) NOT NULL, - signal_type VARCHAR (50) NOT NULL, - tmst_insert TIMESTAMP NOT NULL, - error VARCHAR(255) NOT NULL +CREATE TABLE IF NOT EXISTS TRACING_BATCH ( + batch_id SERIAL PRIMARY KEY, + state VARCHAR (50) NOT NULL, + last_event_id BIGINT, + tmst_started TIMESTAMP NOT NULL, + tmst_ended TIMESTAMP ); \ No newline at end of file diff --git a/src/test/resources/schema-h2.sql b/src/test/resources/schema-h2.sql index 9b23a27..3077371 100644 --- a/src/test/resources/schema-h2.sql +++ b/src/test/resources/schema-h2.sql @@ -1,12 +1,13 @@ - -CREATE TABLE IF NOT EXISTS ORGANIZATION_ESERVICE ( +CREATE TABLE IF NOT EXISTS ESERVICE ( eservice_id VARCHAR (50) UNIQUE NOT NULL, - organization_id VARCHAR (50) NOT NULL, + producer_id VARCHAR (50) NOT NULL, state VARCHAR (50) NOT NULL, tmst_insert TIMESTAMP NOT NULL, tmst_last_edit TIMESTAMP, - UNIQUE (eservice_id, organization_id) + UNIQUE (eservice_id, producer_id) ); +CREATE INDEX ESERVICE_INDEX_ID ON ESERVICE (eservice_id); +CREATE INDEX ESERVICE_INDEX_PRODUCER_ID ON ESERVICE (producer_id); CREATE TABLE IF NOT EXISTS CONSUMER_ESERVICE ( @@ -17,15 +18,19 @@ CREATE TABLE IF NOT EXISTS CONSUMER_ESERVICE ( tmst_last_edit TIMESTAMP, UNIQUE (eservice_id, consumer_id) ); +CREATE INDEX CONSUMER_ESERVICE_INDEX_ID ON CONSUMER_ESERVICE (eservice_id); +CREATE INDEX ESERVICE_INDEX_CONSUMER_ID ON CONSUMER_ESERVICE (consumer_id); + CREATE TABLE IF NOT EXISTS SIGNAL ( id SERIAL PRIMARY KEY, - signal_id BIGINT NOT NULL, - object_id VARCHAR (50) NOT NULL, - eservice_id VARCHAR (50) NOT NULL, - object_type VARCHAR (50) NOT NULL, - signal_type VARCHAR (50) NOT NULL, - tmst_insert TIMESTAMP NOT NULL, + correlation_id VARCHAR(50) NOT NULL, + signal_id BIGINT NOT NULL, + object_id VARCHAR (50) NOT NULL, + eservice_id VARCHAR (50) NOT NULL, + object_type VARCHAR (50) NOT NULL, + signal_type VARCHAR (50) NOT NULL, + tmst_insert TIMESTAMP NOT NULL, UNIQUE (signal_id, eservice_id) ); @@ -37,26 +42,24 @@ CREATE INDEX SIGNAL_INDEX_ESERVICE_ID ON SIGNAL USING hash (eservice_id); CREATE INDEX SIGNAL_INDEX_SIGNAL_ID ON SIGNAL (signal_id); CREATE INDEX SIGNAL_INDEX_ESERVICE_ID ON SIGNAL (eservice_id); -CREATE TABLE IF NOT EXISTS EVENT_TEMP ( - event_temp_id SERIAL PRIMARY KEY, - event_id BIGINT UNIQUE NOT NULL, - event_type VARCHAR (50) NOT NULL, - object_type VARCHAR (50) NOT NULL, - object_id VARCHAR (50) NOT NULL, - state VARCHAR (50) NOT NULL, - state_processing VARCHAR (50) NOT NULL, - tmst_insert TIMESTAMP NOT NULL, - tmst_processing TIMESTAMP -); - CREATE TABLE IF NOT EXISTS DEAD_SIGNAL ( id SERIAL PRIMARY KEY, - signal_id BIGINT NOT NULL, - object_id VARCHAR (50) NOT NULL, - eservice_id VARCHAR (50) NOT NULL, - object_type VARCHAR (50) NOT NULL, - signal_type VARCHAR (50) NOT NULL, - tmst_insert TIMESTAMP NOT NULL, - error VARCHAR(255) NOT NULL + correlation_id VARCHAR(50) NOT NULL, + signal_id BIGINT NOT NULL, + object_id VARCHAR (50) NOT NULL, + eservice_id VARCHAR (50) NOT NULL, + object_type VARCHAR (50) NOT NULL, + signal_type VARCHAR (50) NOT NULL, + tmst_insert TIMESTAMP NOT NULL, + error_reason VARCHAR(255) NOT NULL +); + + +CREATE TABLE IF NOT EXISTS TRACING_BATCH ( + batch_id SERIAL PRIMARY KEY, + state VARCHAR (50) NOT NULL, + last_event_id BIGINT, + tmst_started TIMESTAMP NOT NULL, + tmst_ended TIMESTAMP ); \ No newline at end of file