Skip to content

Commit

Permalink
Merge branch 'feature/signals-persister' of https://github.com/pagopa…
Browse files Browse the repository at this point in the history
…/interop-be-signalhub-signals-persister into feature/signals-persister
  • Loading branch information
ariannazafarana committed Oct 13, 2023
2 parents 644734b + 35c60e1 commit 22f53a2
Show file tree
Hide file tree
Showing 7 changed files with 72 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ public class DeadSignal {
@Id
@Column("id")
private Long id;
@Column("correlation_id")
private String correlationId;
@Column("signal_id")
private Long signalId;
@Column("object_id")
Expand All @@ -30,6 +32,6 @@ public class DeadSignal {
@CreatedDate
@Column("tmst_insert")
private Instant tmstInsert;
@Column("error")
private String error;
}
@Column("error_reason")
private String errorReason;
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ public class Signal {
@Id
@Column("id")
private Long id;
@Column("correlation_id")
private String correlationId;
@Column("signal_id")
private Long signalId;
@Column("object_id")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,11 @@

@Mapper(componentModel = "spring")
public interface SignalMapper {
String CORRELATION_ID_HEADER_KEY = "correlationId";

@Mapping(target = "indexSignal", source = "signal.signalId")
SignalEvent signalToSignalEvent(Signal signal);
@Mapping(target = "signalId", source = "signalEvent.indexSignal")
Signal signalEventToSignal(SignalEvent signalEvent);
@Mapping(target = "correlationId", source = "correlationId")
Signal signalEventToSignal(SignalEvent signalEvent, String correlationId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,11 @@ public class SqsInternalListener {
@SqsListener(value = "${poc.signal-hub.internal-queue-name}")
public CompletableFuture<Void> pullFromAwsInternalQueue(@Payload String node, @Headers Map<String, Object> headers) {
log.info("payloadBody: {}, headers: {}, PullFromInternalQueue received input", node, headers);
String correlationId = (String) headers.get(SignalMapper.CORRELATION_ID_HEADER_KEY);

return Mono.just(node)
.map(json -> Utility.jsonToObject(node, SignalEvent.class))
.map(signalMapper::signalEventToSignal)
.map((signalEvent) -> signalMapper.signalEventToSignal(signalEvent, correlationId))
.flatMap(signalService::signalServiceFlow)
.then()
.toFuture();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public Mono<Signal> signalServiceFlow(Signal signal) {

private Mono<DeadSignal> getDeadSignal(Signal signal) {
DeadSignal deadSignal = deadSignalMapper.signalToDeadSignal(signal);
deadSignal.setError(DUPLICATE_SIGNAL_ERROR.toString());
deadSignal.setErrorReason(DUPLICATE_SIGNAL_ERROR.toString());
return Mono.just(deadSignal);
}

Expand Down
56 changes: 26 additions & 30 deletions src/main/resources/db/changelog/changes/init_table/init_table.sql
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ CREATE TABLE IF NOT EXISTS ESERVICE (
tmst_last_edit TIMESTAMP,
UNIQUE (eservice_id, producer_id)
);
CREATE INDEX ESERVICE_INDEX_ID ON ESERVICE USING hash (eservice_id);
CREATE INDEX ESERVICE_INDEX_PRODUCER_ID ON ESERVICE USING hash (producer_id);


CREATE TABLE IF NOT EXISTS CONSUMER_ESERVICE (
Expand All @@ -16,48 +18,42 @@ CREATE TABLE IF NOT EXISTS CONSUMER_ESERVICE (
tmst_last_edit TIMESTAMP,
UNIQUE (eservice_id, consumer_id)
);
CREATE INDEX CONSUMER_ESERVICE_INDEX_ID ON CONSUMER_ESERVICE USING hash (eservice_id);
CREATE INDEX ESERVICE_INDEX_CONSUMER_ID ON CONSUMER_ESERVICE USING hash (consumer_id);


CREATE TABLE IF NOT EXISTS SIGNAL (
id SERIAL PRIMARY KEY,
correlation_id VARCHAR(50) NOT NULL,
signal_id BIGINT NOT NULL,
object_id VARCHAR (50) NOT NULL,
eservice_id VARCHAR (50) NOT NULL,
object_type VARCHAR (50) NOT NULL,
signal_type VARCHAR (50) NOT NULL,
tmst_insert TIMESTAMP NOT NULL,
signal_id BIGINT NOT NULL,
object_id VARCHAR (50) NOT NULL,
eservice_id VARCHAR (50) NOT NULL,
object_type VARCHAR (50) NOT NULL,
signal_type VARCHAR (50) NOT NULL,
tmst_insert TIMESTAMP NOT NULL,
UNIQUE (signal_id, eservice_id)
);


CREATE INDEX SIGNAL_INDEX_SIGNAL_ID ON SIGNAL USING hash (signal_id);


CREATE INDEX SIGNAL_INDEX_ESERVICE_ID ON SIGNAL USING hash (eservice_id);

CREATE INDEX ESERVICE_INDEX_ID ON ESERVICE USING hash (eservice_id);
CREATE INDEX ESERVICE_INDEX_PRODUCER_ID ON ESERVICE USING hash (producer_id);

CREATE INDEX CONSUMER_ESERVICE_INDEX_ID ON CONSUMER_ESERVICE USING hash (eservice_id);
CREATE INDEX ESERVICE_INDEX_CONSUMER_ID ON CONSUMER_ESERVICE USING hash (consumer_id);

CREATE TABLE IF NOT EXISTS TRACING_BATCH (
batch_id SERIAL PRIMARY KEY,
state VARCHAR (50) NOT NULL,
last_event_id BIGINT,
tmst_started TIMESTAMP NOT NULL,
tmst_ended TIMESTAMP
CREATE TABLE IF NOT EXISTS DEAD_SIGNAL (
id SERIAL PRIMARY KEY,
correlation_id VARCHAR(50) NOT NULL,
signal_id BIGINT NOT NULL,
object_id VARCHAR (50) NOT NULL,
eservice_id VARCHAR (50) NOT NULL,
object_type VARCHAR (50) NOT NULL,
signal_type VARCHAR (50) NOT NULL,
tmst_insert TIMESTAMP NOT NULL,
error_reason VARCHAR(255) NOT NULL
);


CREATE TABLE IF NOT EXISTS DEAD_SIGNAL (
id SERIAL PRIMARY KEY,
signal_id BIGINT NOT NULL,
object_id VARCHAR (50) NOT NULL,
eservice_id VARCHAR (50) NOT NULL,
object_type VARCHAR (50) NOT NULL,
signal_type VARCHAR (50) NOT NULL,
tmst_insert TIMESTAMP NOT NULL,
error VARCHAR(255) NOT NULL
CREATE TABLE IF NOT EXISTS TRACING_BATCH (
batch_id SERIAL PRIMARY KEY,
state VARCHAR (50) NOT NULL,
last_event_id BIGINT,
tmst_started TIMESTAMP NOT NULL,
tmst_ended TIMESTAMP
);
61 changes: 32 additions & 29 deletions src/test/resources/schema-h2.sql
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@

CREATE TABLE IF NOT EXISTS ORGANIZATION_ESERVICE (
CREATE TABLE IF NOT EXISTS ESERVICE (
eservice_id VARCHAR (50) UNIQUE NOT NULL,
organization_id VARCHAR (50) NOT NULL,
producer_id VARCHAR (50) NOT NULL,
state VARCHAR (50) NOT NULL,
tmst_insert TIMESTAMP NOT NULL,
tmst_last_edit TIMESTAMP,
UNIQUE (eservice_id, organization_id)
UNIQUE (eservice_id, producer_id)
);
CREATE INDEX ESERVICE_INDEX_ID ON ESERVICE (eservice_id);
CREATE INDEX ESERVICE_INDEX_PRODUCER_ID ON ESERVICE (producer_id);


CREATE TABLE IF NOT EXISTS CONSUMER_ESERVICE (
Expand All @@ -17,15 +18,19 @@ CREATE TABLE IF NOT EXISTS CONSUMER_ESERVICE (
tmst_last_edit TIMESTAMP,
UNIQUE (eservice_id, consumer_id)
);
CREATE INDEX CONSUMER_ESERVICE_INDEX_ID ON CONSUMER_ESERVICE (eservice_id);
CREATE INDEX ESERVICE_INDEX_CONSUMER_ID ON CONSUMER_ESERVICE (consumer_id);


CREATE TABLE IF NOT EXISTS SIGNAL (
id SERIAL PRIMARY KEY,
signal_id BIGINT NOT NULL,
object_id VARCHAR (50) NOT NULL,
eservice_id VARCHAR (50) NOT NULL,
object_type VARCHAR (50) NOT NULL,
signal_type VARCHAR (50) NOT NULL,
tmst_insert TIMESTAMP NOT NULL,
correlation_id VARCHAR(50) NOT NULL,
signal_id BIGINT NOT NULL,
object_id VARCHAR (50) NOT NULL,
eservice_id VARCHAR (50) NOT NULL,
object_type VARCHAR (50) NOT NULL,
signal_type VARCHAR (50) NOT NULL,
tmst_insert TIMESTAMP NOT NULL,
UNIQUE (signal_id, eservice_id)
);

Expand All @@ -37,26 +42,24 @@ CREATE INDEX SIGNAL_INDEX_ESERVICE_ID ON SIGNAL USING hash (eservice_id);
CREATE INDEX SIGNAL_INDEX_SIGNAL_ID ON SIGNAL (signal_id);
CREATE INDEX SIGNAL_INDEX_ESERVICE_ID ON SIGNAL (eservice_id);

CREATE TABLE IF NOT EXISTS EVENT_TEMP (
event_temp_id SERIAL PRIMARY KEY,
event_id BIGINT UNIQUE NOT NULL,
event_type VARCHAR (50) NOT NULL,
object_type VARCHAR (50) NOT NULL,
object_id VARCHAR (50) NOT NULL,
state VARCHAR (50) NOT NULL,
state_processing VARCHAR (50) NOT NULL,
tmst_insert TIMESTAMP NOT NULL,
tmst_processing TIMESTAMP
);


CREATE TABLE IF NOT EXISTS DEAD_SIGNAL (
id SERIAL PRIMARY KEY,
signal_id BIGINT NOT NULL,
object_id VARCHAR (50) NOT NULL,
eservice_id VARCHAR (50) NOT NULL,
object_type VARCHAR (50) NOT NULL,
signal_type VARCHAR (50) NOT NULL,
tmst_insert TIMESTAMP NOT NULL,
error VARCHAR(255) NOT NULL
correlation_id VARCHAR(50) NOT NULL,
signal_id BIGINT NOT NULL,
object_id VARCHAR (50) NOT NULL,
eservice_id VARCHAR (50) NOT NULL,
object_type VARCHAR (50) NOT NULL,
signal_type VARCHAR (50) NOT NULL,
tmst_insert TIMESTAMP NOT NULL,
error_reason VARCHAR(255) NOT NULL
);


CREATE TABLE IF NOT EXISTS TRACING_BATCH (
batch_id SERIAL PRIMARY KEY,
state VARCHAR (50) NOT NULL,
last_event_id BIGINT,
tmst_started TIMESTAMP NOT NULL,
tmst_ended TIMESTAMP
);

0 comments on commit 22f53a2

Please sign in to comment.