diff --git a/src/main/java/org/zalando/nakadiproducer/AccessTokenProvider.java b/nakadi-producer-spring-boot-starter/src/main/java/org/zalando/nakadiproducer/AccessTokenProvider.java similarity index 100% rename from src/main/java/org/zalando/nakadiproducer/AccessTokenProvider.java rename to nakadi-producer-spring-boot-starter/src/main/java/org/zalando/nakadiproducer/AccessTokenProvider.java diff --git a/src/main/java/org/zalando/nakadiproducer/EnableNakadiProducer.java b/nakadi-producer-spring-boot-starter/src/main/java/org/zalando/nakadiproducer/EnableNakadiProducer.java similarity index 100% rename from src/main/java/org/zalando/nakadiproducer/EnableNakadiProducer.java rename to nakadi-producer-spring-boot-starter/src/main/java/org/zalando/nakadiproducer/EnableNakadiProducer.java diff --git a/src/main/java/org/zalando/nakadiproducer/NakadiProducerAutoConfiguration.java b/nakadi-producer-spring-boot-starter/src/main/java/org/zalando/nakadiproducer/NakadiProducerAutoConfiguration.java similarity index 78% rename from src/main/java/org/zalando/nakadiproducer/NakadiProducerAutoConfiguration.java rename to nakadi-producer-spring-boot-starter/src/main/java/org/zalando/nakadiproducer/NakadiProducerAutoConfiguration.java index 251a9a39..43617ccc 100644 --- a/src/main/java/org/zalando/nakadiproducer/NakadiProducerAutoConfiguration.java +++ b/nakadi-producer-spring-boot-starter/src/main/java/org/zalando/nakadiproducer/NakadiProducerAutoConfiguration.java @@ -28,7 +28,12 @@ import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Import; +import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate; import org.zalando.fahrschein.NakadiClient; +import org.zalando.nakadiproducer.eventlog.EventLogWriter; +import org.zalando.nakadiproducer.eventlog.impl.EventLogRepository; +import org.zalando.nakadiproducer.eventlog.impl.EventLogRepositoryImpl; +import org.zalando.nakadiproducer.eventlog.impl.EventLogWriterImpl; import org.zalando.nakadiproducer.flowid.FlowIdComponent; import org.zalando.nakadiproducer.flowid.NoopFlowIdComponent; import org.zalando.nakadiproducer.flowid.TracerFlowIdComponent; @@ -38,9 +43,13 @@ import org.zalando.nakadiproducer.snapshots.impl.SnapshotEventCreationMvcEndpoint; import org.zalando.nakadiproducer.snapshots.impl.SnapshotEventProviderNotImplementedException; import org.zalando.nakadiproducer.transmission.NakadiPublishingClient; +import org.zalando.nakadiproducer.transmission.impl.EventTransmissionService; +import org.zalando.nakadiproducer.transmission.impl.EventTransmitter; import org.zalando.nakadiproducer.transmission.impl.FahrscheinNakadiPublishingClient; import org.zalando.tracer.Tracer; +import com.fasterxml.jackson.databind.ObjectMapper; + @Configuration @Slf4j @ComponentScan @@ -133,6 +142,31 @@ public SnapshotEventCreationMvcEndpoint snapshotEventCreationMvcEndpoint(Snapsho return new SnapshotEventCreationMvcEndpoint(snapshotEventCreationEndpoint); } + @Bean + public SnapshotCreationService snapshotCreationService(SnapshotEventProvider snapshotEventProvider, EventLogWriter eventLogWriter) { + return new SnapshotCreationService(snapshotEventProvider, eventLogWriter); + } + + @Bean + public EventLogWriter eventLogWriter(EventLogRepository eventLogRepository, ObjectMapper objectMapper, FlowIdComponent flowIdComponent) { + return new EventLogWriterImpl(eventLogRepository, objectMapper, flowIdComponent); + } + + @Bean + public EventLogRepository eventLogRepository(NamedParameterJdbcTemplate namedParameterJdbcTemplate) { + return new EventLogRepositoryImpl(namedParameterJdbcTemplate); + } + + @Bean + public EventTransmitter eventTransmitter(EventTransmissionService eventTransmissionService) { + return new EventTransmitter(eventTransmissionService); + } + + @Bean + public EventTransmissionService eventTransmissionService(EventLogRepository eventLogRepository, NakadiPublishingClient nakadiPublishingClient, ObjectMapper objectMapper) { + return new EventTransmissionService(eventLogRepository, nakadiPublishingClient, objectMapper); + } + @PostConstruct public void migrateFlyway() { Flyway flyway = new Flyway(); diff --git a/src/main/java/org/zalando/nakadiproducer/StupsTokenComponent.java b/nakadi-producer-spring-boot-starter/src/main/java/org/zalando/nakadiproducer/StupsTokenComponent.java similarity index 100% rename from src/main/java/org/zalando/nakadiproducer/StupsTokenComponent.java rename to nakadi-producer-spring-boot-starter/src/main/java/org/zalando/nakadiproducer/StupsTokenComponent.java diff --git a/src/main/java/org/zalando/nakadiproducer/eventlog/impl/EventLogRepository.java b/nakadi-producer-spring-boot-starter/src/main/java/org/zalando/nakadiproducer/eventlog/impl/EventLogRepositoryImpl.java similarity index 94% rename from src/main/java/org/zalando/nakadiproducer/eventlog/impl/EventLogRepository.java rename to nakadi-producer-spring-boot-starter/src/main/java/org/zalando/nakadiproducer/eventlog/impl/EventLogRepositoryImpl.java index 87c8f65b..c4e1d789 100644 --- a/src/main/java/org/zalando/nakadiproducer/eventlog/impl/EventLogRepository.java +++ b/nakadi-producer-spring-boot-starter/src/main/java/org/zalando/nakadiproducer/eventlog/impl/EventLogRepositoryImpl.java @@ -15,14 +15,15 @@ import org.springframework.stereotype.Component; @Component -public class EventLogRepository { +public class EventLogRepositoryImpl implements EventLogRepository { private NamedParameterJdbcTemplate jdbcTemplate; @Autowired - public EventLogRepository(NamedParameterJdbcTemplate jdbcTemplate) { + public EventLogRepositoryImpl(NamedParameterJdbcTemplate jdbcTemplate) { this.jdbcTemplate = jdbcTemplate; } + @Override public Collection findByLockedByAndLockedUntilGreaterThan(String lockedBy, Instant lockedUntil) { Map namedParameterMap = new HashMap<>(); namedParameterMap.put("lockedBy", lockedBy); @@ -34,6 +35,7 @@ public Collection findByLockedByAndLockedUntilGreaterThan(String locke ); } + @Override public void lockSomeMessages(String lockId, Instant now, Instant lockExpires) { Map namedParameterMap = new HashMap<>(); namedParameterMap.put("lockId", lockId); @@ -45,6 +47,7 @@ public void lockSomeMessages(String lockId, Instant now, Instant lockExpires) { ); } + @Override public void delete(EventLog eventLog) { Map namedParameterMap = new HashMap<>(); namedParameterMap.put("id", eventLog.getId()); @@ -54,6 +57,7 @@ public void delete(EventLog eventLog) { ); } + @Override public void persist(EventLog eventLog) { Timestamp now = toSqlTimestamp(Instant.now()); MapSqlParameterSource namedParameterMap = new MapSqlParameterSource(); @@ -85,10 +89,12 @@ private Timestamp toSqlTimestamp(Instant now) { return Timestamp.from(now); } + @Override public void deleteAll() { jdbcTemplate.update("DELETE from nakadi_events.event_log", new HashMap<>()); } + @Override public EventLog findOne(Integer id) { Map namedParameterMap = new HashMap<>(); namedParameterMap.put("id", id); diff --git a/src/main/java/org/zalando/nakadiproducer/flowid/NoopFlowIdComponent.java b/nakadi-producer-spring-boot-starter/src/main/java/org/zalando/nakadiproducer/flowid/NoopFlowIdComponent.java similarity index 100% rename from src/main/java/org/zalando/nakadiproducer/flowid/NoopFlowIdComponent.java rename to nakadi-producer-spring-boot-starter/src/main/java/org/zalando/nakadiproducer/flowid/NoopFlowIdComponent.java diff --git a/src/main/java/org/zalando/nakadiproducer/flowid/TracerFlowIdComponent.java b/nakadi-producer-spring-boot-starter/src/main/java/org/zalando/nakadiproducer/flowid/TracerFlowIdComponent.java similarity index 100% rename from src/main/java/org/zalando/nakadiproducer/flowid/TracerFlowIdComponent.java rename to nakadi-producer-spring-boot-starter/src/main/java/org/zalando/nakadiproducer/flowid/TracerFlowIdComponent.java diff --git a/src/main/java/org/zalando/nakadiproducer/snapshots/impl/SnapshotEventCreationEndpoint.java b/nakadi-producer-spring-boot-starter/src/main/java/org/zalando/nakadiproducer/snapshots/impl/SnapshotEventCreationEndpoint.java similarity index 100% rename from src/main/java/org/zalando/nakadiproducer/snapshots/impl/SnapshotEventCreationEndpoint.java rename to nakadi-producer-spring-boot-starter/src/main/java/org/zalando/nakadiproducer/snapshots/impl/SnapshotEventCreationEndpoint.java diff --git a/src/main/java/org/zalando/nakadiproducer/snapshots/impl/SnapshotEventCreationMvcEndpoint.java b/nakadi-producer-spring-boot-starter/src/main/java/org/zalando/nakadiproducer/snapshots/impl/SnapshotEventCreationMvcEndpoint.java similarity index 100% rename from src/main/java/org/zalando/nakadiproducer/snapshots/impl/SnapshotEventCreationMvcEndpoint.java rename to nakadi-producer-spring-boot-starter/src/main/java/org/zalando/nakadiproducer/snapshots/impl/SnapshotEventCreationMvcEndpoint.java diff --git a/src/main/resources/META-INF/spring.factories b/nakadi-producer-spring-boot-starter/src/main/resources/META-INF/spring.factories similarity index 100% rename from src/main/resources/META-INF/spring.factories rename to nakadi-producer-spring-boot-starter/src/main/resources/META-INF/spring.factories diff --git a/src/main/resources/db_nakadiproducer/migrations/V1/V1029384757.1.1__event_log_table.sql b/nakadi-producer-spring-boot-starter/src/main/resources/db_nakadiproducer/migrations/V1/V1029384757.1.1__event_log_table.sql similarity index 100% rename from src/main/resources/db_nakadiproducer/migrations/V1/V1029384757.1.1__event_log_table.sql rename to nakadi-producer-spring-boot-starter/src/main/resources/db_nakadiproducer/migrations/V1/V1029384757.1.1__event_log_table.sql diff --git a/src/main/resources/db_nakadiproducer/migrations/V1/V1029384757.1.2__schema_permissions.sql b/nakadi-producer-spring-boot-starter/src/main/resources/db_nakadiproducer/migrations/V1/V1029384757.1.2__schema_permissions.sql similarity index 100% rename from src/main/resources/db_nakadiproducer/migrations/V1/V1029384757.1.2__schema_permissions.sql rename to nakadi-producer-spring-boot-starter/src/main/resources/db_nakadiproducer/migrations/V1/V1029384757.1.2__schema_permissions.sql diff --git a/src/main/resources/db_nakadiproducer/migrations/V2/V2133546886.1.0__drop_data_event__specific_columns.sql b/nakadi-producer-spring-boot-starter/src/main/resources/db_nakadiproducer/migrations/V2/V2133546886.1.0__drop_data_event__specific_columns.sql similarity index 100% rename from src/main/resources/db_nakadiproducer/migrations/V2/V2133546886.1.0__drop_data_event__specific_columns.sql rename to nakadi-producer-spring-boot-starter/src/main/resources/db_nakadiproducer/migrations/V2/V2133546886.1.0__drop_data_event__specific_columns.sql diff --git a/src/test/java/org/zalando/nakadiproducer/BaseMockedExternalCommunicationIT.java b/nakadi-producer-spring-boot-starter/src/test/java/org/zalando/nakadiproducer/BaseMockedExternalCommunicationIT.java similarity index 100% rename from src/test/java/org/zalando/nakadiproducer/BaseMockedExternalCommunicationIT.java rename to nakadi-producer-spring-boot-starter/src/test/java/org/zalando/nakadiproducer/BaseMockedExternalCommunicationIT.java diff --git a/src/test/java/org/zalando/nakadiproducer/EndToEndTestIT.java b/nakadi-producer-spring-boot-starter/src/test/java/org/zalando/nakadiproducer/EndToEndTestIT.java similarity index 100% rename from src/test/java/org/zalando/nakadiproducer/EndToEndTestIT.java rename to nakadi-producer-spring-boot-starter/src/test/java/org/zalando/nakadiproducer/EndToEndTestIT.java diff --git a/src/test/java/org/zalando/nakadiproducer/TestApplication.java b/nakadi-producer-spring-boot-starter/src/test/java/org/zalando/nakadiproducer/TestApplication.java similarity index 100% rename from src/test/java/org/zalando/nakadiproducer/TestApplication.java rename to nakadi-producer-spring-boot-starter/src/test/java/org/zalando/nakadiproducer/TestApplication.java diff --git a/src/test/java/org/zalando/nakadiproducer/config/EmbeddedDataSourceConfig.java b/nakadi-producer-spring-boot-starter/src/test/java/org/zalando/nakadiproducer/config/EmbeddedDataSourceConfig.java similarity index 100% rename from src/test/java/org/zalando/nakadiproducer/config/EmbeddedDataSourceConfig.java rename to nakadi-producer-spring-boot-starter/src/test/java/org/zalando/nakadiproducer/config/EmbeddedDataSourceConfig.java diff --git a/src/test/java/org/zalando/nakadiproducer/config/MockNakadiClientConfig.java b/nakadi-producer-spring-boot-starter/src/test/java/org/zalando/nakadiproducer/config/MockNakadiClientConfig.java similarity index 100% rename from src/test/java/org/zalando/nakadiproducer/config/MockNakadiClientConfig.java rename to nakadi-producer-spring-boot-starter/src/test/java/org/zalando/nakadiproducer/config/MockNakadiClientConfig.java diff --git a/src/test/java/org/zalando/nakadiproducer/eventlog/impl/EventLogRepositoryIT.java b/nakadi-producer-spring-boot-starter/src/test/java/org/zalando/nakadiproducer/eventlog/impl/EventLogRepositoryIT.java similarity index 97% rename from src/test/java/org/zalando/nakadiproducer/eventlog/impl/EventLogRepositoryIT.java rename to nakadi-producer-spring-boot-starter/src/test/java/org/zalando/nakadiproducer/eventlog/impl/EventLogRepositoryIT.java index 93c2e52b..845785e2 100644 --- a/src/test/java/org/zalando/nakadiproducer/eventlog/impl/EventLogRepositoryIT.java +++ b/nakadi-producer-spring-boot-starter/src/test/java/org/zalando/nakadiproducer/eventlog/impl/EventLogRepositoryIT.java @@ -14,7 +14,7 @@ public class EventLogRepositoryIT extends BaseMockedExternalCommunicationIT { @Autowired - private EventLogRepository eventLogRepository; + private EventLogRepositoryImpl eventLogRepository; private static final String WAREHOUSE_EVENT_BODY_DATA = ("{'self':'http://WAREHOUSE_DOMAIN'," diff --git a/src/test/java/org/zalando/nakadiproducer/TracerFlowIdComponentTest.java b/nakadi-producer-spring-boot-starter/src/test/java/org/zalando/nakadiproducer/flowid/TracerFlowIdComponentTest.java similarity index 95% rename from src/test/java/org/zalando/nakadiproducer/TracerFlowIdComponentTest.java rename to nakadi-producer-spring-boot-starter/src/test/java/org/zalando/nakadiproducer/flowid/TracerFlowIdComponentTest.java index 605964af..28374823 100644 --- a/src/test/java/org/zalando/nakadiproducer/TracerFlowIdComponentTest.java +++ b/nakadi-producer-spring-boot-starter/src/test/java/org/zalando/nakadiproducer/flowid/TracerFlowIdComponentTest.java @@ -1,4 +1,4 @@ -package org.zalando.nakadiproducer; +package org.zalando.nakadiproducer.flowid; import static org.junit.Assert.assertThat; diff --git a/src/test/java/org/zalando/nakadiproducer/util/Fixture.java b/nakadi-producer-spring-boot-starter/src/test/java/org/zalando/nakadiproducer/util/Fixture.java similarity index 100% rename from src/test/java/org/zalando/nakadiproducer/util/Fixture.java rename to nakadi-producer-spring-boot-starter/src/test/java/org/zalando/nakadiproducer/util/Fixture.java diff --git a/nakadi-producer-spring-boot-starter/src/test/java/org/zalando/nakadiproducer/util/MockPayload.java b/nakadi-producer-spring-boot-starter/src/test/java/org/zalando/nakadiproducer/util/MockPayload.java new file mode 100644 index 00000000..c986088c --- /dev/null +++ b/nakadi-producer-spring-boot-starter/src/test/java/org/zalando/nakadiproducer/util/MockPayload.java @@ -0,0 +1,49 @@ +package org.zalando.nakadiproducer.util; + +import java.util.List; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; +import lombok.ToString; + +@ToString +@Getter +@Setter +@NoArgsConstructor +@AllArgsConstructor +@Builder(toBuilder = true) +public class MockPayload { + private Integer id; + + private String code; + + @Builder.Default + private boolean isActive = true; + + private SubClass more; + + private List items; + + @Builder(toBuilder = true) + @Getter + @Setter + @ToString + @AllArgsConstructor + @NoArgsConstructor + public static class SubClass { + private String info; + } + + @Builder(toBuilder = true) + @Getter + @Setter + @ToString + @AllArgsConstructor + @NoArgsConstructor + public static class SubListItem { + private String detail; + } +} diff --git a/src/test/resources/application-test.yml b/nakadi-producer-spring-boot-starter/src/test/resources/application-test.yml similarity index 100% rename from src/test/resources/application-test.yml rename to nakadi-producer-spring-boot-starter/src/test/resources/application-test.yml diff --git a/src/test/resources/database/it_database_setup.sql b/nakadi-producer-spring-boot-starter/src/test/resources/database/it_database_setup.sql similarity index 100% rename from src/test/resources/database/it_database_setup.sql rename to nakadi-producer-spring-boot-starter/src/test/resources/database/it_database_setup.sql diff --git a/src/test/resources/db/migration/1__application_db_stub.sql b/nakadi-producer-spring-boot-starter/src/test/resources/db/migration/1__application_db_stub.sql similarity index 100% rename from src/test/resources/db/migration/1__application_db_stub.sql rename to nakadi-producer-spring-boot-starter/src/test/resources/db/migration/1__application_db_stub.sql diff --git a/nakadi-producer/pom.xml b/nakadi-producer/pom.xml new file mode 100644 index 00000000..d8b151e0 --- /dev/null +++ b/nakadi-producer/pom.xml @@ -0,0 +1,177 @@ + + + 4.0.0 + + + 3.2 + + + + org.zalando + nakadi-producer-reactor + 3.1.0-SNAPSHOT + + + nakadi-producer + org.zalando + 3.1.0-SNAPSHOT + Nakadi Event Producer + Reliable transactional Nakadi event producer + + + 1.8 + 1.8 + 1.8 + 0.4.24 + + + + + javax.transaction + javax.transaction-api + + + com.fasterxml.jackson.core + jackson-databind + 2.8.8 + + + org.slf4j + slf4j-api + 1.7.25 + + + org.zalando + fahrschein + 0.9.1 + + + org.projectlombok + lombok + provided + + + + javax.interceptor + javax.interceptor-api + 1.2 + provided + true + + + org.hamcrest + hamcrest-all + 1.3 + test + + + org.mockito + mockito-core + 2.8.9 + test + + + junit + junit + 4.12 + test + + + + + ${project.artifactId} + + + + org.apache.maven.plugins + maven-failsafe-plugin + + + + integration-test + verify + + + + + + org.apache.maven.plugins + maven-source-plugin + 2.2.1 + + + attach-sources + + jar-no-fork + + + + + + org.apache.maven.plugins + maven-javadoc-plugin + 2.9.1 + + -Xdoclint:none + + + + attach-javadocs + + jar + + + + + + org.apache.maven.plugins + maven-gpg-plugin + 1.5 + + + sign-artifacts + verify + + sign + + + + + + org.sonatype.plugins + nexus-staging-maven-plugin + 1.6.7 + true + + ossrh + https://oss.sonatype.org/ + true + + + + + + + + MIT + https://opensource.org/licenses/MIT + repo + + + + + + ossrh + https://oss.sonatype.org/content/repositories/snapshots + + + ossrh + https://oss.sonatype.org/service/local/staging/deploy/maven2/ + + + + \ No newline at end of file diff --git a/src/main/java/org/zalando/nakadiproducer/eventlog/EventLogWriter.java b/nakadi-producer/src/main/java/org/zalando/nakadiproducer/eventlog/EventLogWriter.java similarity index 100% rename from src/main/java/org/zalando/nakadiproducer/eventlog/EventLogWriter.java rename to nakadi-producer/src/main/java/org/zalando/nakadiproducer/eventlog/EventLogWriter.java diff --git a/src/main/java/org/zalando/nakadiproducer/eventlog/impl/DataChangeEventEnvelope.java b/nakadi-producer/src/main/java/org/zalando/nakadiproducer/eventlog/impl/DataChangeEventEnvelope.java similarity index 100% rename from src/main/java/org/zalando/nakadiproducer/eventlog/impl/DataChangeEventEnvelope.java rename to nakadi-producer/src/main/java/org/zalando/nakadiproducer/eventlog/impl/DataChangeEventEnvelope.java diff --git a/src/main/java/org/zalando/nakadiproducer/eventlog/impl/EventDataOperation.java b/nakadi-producer/src/main/java/org/zalando/nakadiproducer/eventlog/impl/EventDataOperation.java similarity index 100% rename from src/main/java/org/zalando/nakadiproducer/eventlog/impl/EventDataOperation.java rename to nakadi-producer/src/main/java/org/zalando/nakadiproducer/eventlog/impl/EventDataOperation.java diff --git a/src/main/java/org/zalando/nakadiproducer/eventlog/impl/EventLog.java b/nakadi-producer/src/main/java/org/zalando/nakadiproducer/eventlog/impl/EventLog.java similarity index 100% rename from src/main/java/org/zalando/nakadiproducer/eventlog/impl/EventLog.java rename to nakadi-producer/src/main/java/org/zalando/nakadiproducer/eventlog/impl/EventLog.java diff --git a/nakadi-producer/src/main/java/org/zalando/nakadiproducer/eventlog/impl/EventLogRepository.java b/nakadi-producer/src/main/java/org/zalando/nakadiproducer/eventlog/impl/EventLogRepository.java new file mode 100644 index 00000000..937ffdc3 --- /dev/null +++ b/nakadi-producer/src/main/java/org/zalando/nakadiproducer/eventlog/impl/EventLogRepository.java @@ -0,0 +1,18 @@ +package org.zalando.nakadiproducer.eventlog.impl; + +import java.time.Instant; +import java.util.Collection; + +public interface EventLogRepository { + Collection findByLockedByAndLockedUntilGreaterThan(String lockedBy, Instant lockedUntil); + + void lockSomeMessages(String lockId, Instant now, Instant lockExpires); + + void delete(EventLog eventLog); + + void persist(EventLog eventLog); + + void deleteAll(); + + EventLog findOne(Integer id); +} diff --git a/src/main/java/org/zalando/nakadiproducer/eventlog/impl/EventLogWriterImpl.java b/nakadi-producer/src/main/java/org/zalando/nakadiproducer/eventlog/impl/EventLogWriterImpl.java similarity index 95% rename from src/main/java/org/zalando/nakadiproducer/eventlog/impl/EventLogWriterImpl.java rename to nakadi-producer/src/main/java/org/zalando/nakadiproducer/eventlog/impl/EventLogWriterImpl.java index 8f28f41d..38dbf80d 100644 --- a/src/main/java/org/zalando/nakadiproducer/eventlog/impl/EventLogWriterImpl.java +++ b/nakadi-producer/src/main/java/org/zalando/nakadiproducer/eventlog/impl/EventLogWriterImpl.java @@ -8,15 +8,11 @@ import org.zalando.nakadiproducer.flowid.FlowIdComponent; import org.zalando.nakadiproducer.eventlog.EventLogWriter; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Service; - import javax.transaction.Transactional; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; -@Service public class EventLogWriterImpl implements EventLogWriter { private final EventLogRepository eventLogRepository; @@ -24,7 +20,6 @@ public class EventLogWriterImpl implements EventLogWriter { private final ObjectMapper objectMapper; private final FlowIdComponent flowIdComponent; - @Autowired public EventLogWriterImpl(EventLogRepository eventLogRepository, ObjectMapper objectMapper, FlowIdComponent flowIdComponent) { this.eventLogRepository = eventLogRepository; this.objectMapper = objectMapper; diff --git a/src/main/java/org/zalando/nakadiproducer/flowid/FlowIdComponent.java b/nakadi-producer/src/main/java/org/zalando/nakadiproducer/flowid/FlowIdComponent.java similarity index 100% rename from src/main/java/org/zalando/nakadiproducer/flowid/FlowIdComponent.java rename to nakadi-producer/src/main/java/org/zalando/nakadiproducer/flowid/FlowIdComponent.java diff --git a/src/main/java/org/zalando/nakadiproducer/snapshots/SnapshotEventProvider.java b/nakadi-producer/src/main/java/org/zalando/nakadiproducer/snapshots/SnapshotEventProvider.java similarity index 91% rename from src/main/java/org/zalando/nakadiproducer/snapshots/SnapshotEventProvider.java rename to nakadi-producer/src/main/java/org/zalando/nakadiproducer/snapshots/SnapshotEventProvider.java index 507d874e..3b453fd3 100644 --- a/src/main/java/org/zalando/nakadiproducer/snapshots/SnapshotEventProvider.java +++ b/nakadi-producer/src/main/java/org/zalando/nakadiproducer/snapshots/SnapshotEventProvider.java @@ -6,9 +6,6 @@ import java.util.List; import java.util.Set; -import javax.annotation.Nonnull; -import javax.annotation.Nullable; - /** * The {@code SnapshotEventProvider} interface should be implemented by any * Event Producer that wants to support snapshot events feature. The @@ -35,7 +32,7 @@ public interface SnapshotEventProvider { * @return list of elements ordered by their id * @throws UnknownEventTypeException if {@code eventType} is unknown */ - List getSnapshot(@Nonnull String eventType, @Nullable Object withIdGreaterThan); + List getSnapshot(String eventType, Object withIdGreaterThan); Set getSupportedEventTypes(); diff --git a/src/main/java/org/zalando/nakadiproducer/snapshots/UnknownEventTypeException.java b/nakadi-producer/src/main/java/org/zalando/nakadiproducer/snapshots/UnknownEventTypeException.java similarity index 100% rename from src/main/java/org/zalando/nakadiproducer/snapshots/UnknownEventTypeException.java rename to nakadi-producer/src/main/java/org/zalando/nakadiproducer/snapshots/UnknownEventTypeException.java diff --git a/src/main/java/org/zalando/nakadiproducer/snapshots/impl/SnapshotCreationService.java b/nakadi-producer/src/main/java/org/zalando/nakadiproducer/snapshots/impl/SnapshotCreationService.java similarity index 90% rename from src/main/java/org/zalando/nakadiproducer/snapshots/impl/SnapshotCreationService.java rename to nakadi-producer/src/main/java/org/zalando/nakadiproducer/snapshots/impl/SnapshotCreationService.java index 6d46508e..9f9ba5e8 100644 --- a/src/main/java/org/zalando/nakadiproducer/snapshots/impl/SnapshotCreationService.java +++ b/nakadi-producer/src/main/java/org/zalando/nakadiproducer/snapshots/impl/SnapshotCreationService.java @@ -3,20 +3,16 @@ import java.util.List; import java.util.Set; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Service; import org.zalando.nakadiproducer.eventlog.EventLogWriter; import org.zalando.nakadiproducer.snapshots.SnapshotEventProvider; import org.zalando.nakadiproducer.snapshots.SnapshotEventProvider.Snapshot; -@Service public class SnapshotCreationService { private final SnapshotEventProvider snapshotEventProvider; private final EventLogWriter eventLogWriter; - @Autowired public SnapshotCreationService(SnapshotEventProvider snapshotEventProvider, EventLogWriter eventLogWriter) { this.snapshotEventProvider = snapshotEventProvider; this.eventLogWriter = eventLogWriter; diff --git a/src/main/java/org/zalando/nakadiproducer/snapshots/impl/SnapshotEventProviderNotImplementedException.java b/nakadi-producer/src/main/java/org/zalando/nakadiproducer/snapshots/impl/SnapshotEventProviderNotImplementedException.java similarity index 100% rename from src/main/java/org/zalando/nakadiproducer/snapshots/impl/SnapshotEventProviderNotImplementedException.java rename to nakadi-producer/src/main/java/org/zalando/nakadiproducer/snapshots/impl/SnapshotEventProviderNotImplementedException.java diff --git a/src/main/java/org/zalando/nakadiproducer/transmission/MockNakadiPublishingClient.java b/nakadi-producer/src/main/java/org/zalando/nakadiproducer/transmission/MockNakadiPublishingClient.java similarity index 100% rename from src/main/java/org/zalando/nakadiproducer/transmission/MockNakadiPublishingClient.java rename to nakadi-producer/src/main/java/org/zalando/nakadiproducer/transmission/MockNakadiPublishingClient.java diff --git a/src/main/java/org/zalando/nakadiproducer/transmission/NakadiPublishingClient.java b/nakadi-producer/src/main/java/org/zalando/nakadiproducer/transmission/NakadiPublishingClient.java similarity index 100% rename from src/main/java/org/zalando/nakadiproducer/transmission/NakadiPublishingClient.java rename to nakadi-producer/src/main/java/org/zalando/nakadiproducer/transmission/NakadiPublishingClient.java diff --git a/src/main/java/org/zalando/nakadiproducer/transmission/impl/EventTransmissionService.java b/nakadi-producer/src/main/java/org/zalando/nakadiproducer/transmission/impl/EventTransmissionService.java similarity index 90% rename from src/main/java/org/zalando/nakadiproducer/transmission/impl/EventTransmissionService.java rename to nakadi-producer/src/main/java/org/zalando/nakadiproducer/transmission/impl/EventTransmissionService.java index 77638f23..0900b68b 100644 --- a/src/main/java/org/zalando/nakadiproducer/transmission/impl/EventTransmissionService.java +++ b/nakadi-producer/src/main/java/org/zalando/nakadiproducer/transmission/impl/EventTransmissionService.java @@ -14,8 +14,6 @@ import javax.transaction.Transactional; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Service; import org.zalando.nakadiproducer.eventlog.impl.EventLog; import org.zalando.nakadiproducer.eventlog.impl.EventLogRepository; import org.zalando.nakadiproducer.transmission.NakadiPublishingClient; @@ -23,19 +21,21 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; -@Service @Slf4j public class EventTransmissionService { - @Autowired private EventLogRepository eventLogRepository; - @Autowired private NakadiPublishingClient nakadiPublishingClient; - @Autowired private ObjectMapper objectMapper; + public EventTransmissionService(EventLogRepository eventLogRepository, NakadiPublishingClient nakadiPublishingClient, ObjectMapper objectMapper) { + this.eventLogRepository = eventLogRepository; + this.nakadiPublishingClient = nakadiPublishingClient; + this.objectMapper = objectMapper; + } + @Transactional public Collection lockSomeEvents() { String lockId = UUID.randomUUID().toString(); diff --git a/nakadi-producer/src/main/java/org/zalando/nakadiproducer/transmission/impl/EventTransmitter.java b/nakadi-producer/src/main/java/org/zalando/nakadiproducer/transmission/impl/EventTransmitter.java new file mode 100644 index 00000000..bc34bda3 --- /dev/null +++ b/nakadi-producer/src/main/java/org/zalando/nakadiproducer/transmission/impl/EventTransmitter.java @@ -0,0 +1,13 @@ +package org.zalando.nakadiproducer.transmission.impl; + +public class EventTransmitter { + private final EventTransmissionService eventTransmissionService; + + public EventTransmitter(EventTransmissionService eventTransmissionService) { + this.eventTransmissionService = eventTransmissionService; + } + + public void sendEvents() { + eventTransmissionService.lockSomeEvents().forEach(eventTransmissionService::sendEvent); + } +} diff --git a/src/main/java/org/zalando/nakadiproducer/transmission/impl/FahrscheinNakadiPublishingClient.java b/nakadi-producer/src/main/java/org/zalando/nakadiproducer/transmission/impl/FahrscheinNakadiPublishingClient.java similarity index 88% rename from src/main/java/org/zalando/nakadiproducer/transmission/impl/FahrscheinNakadiPublishingClient.java rename to nakadi-producer/src/main/java/org/zalando/nakadiproducer/transmission/impl/FahrscheinNakadiPublishingClient.java index bc6ad4d1..933e01dc 100644 --- a/src/main/java/org/zalando/nakadiproducer/transmission/impl/FahrscheinNakadiPublishingClient.java +++ b/nakadi-producer/src/main/java/org/zalando/nakadiproducer/transmission/impl/FahrscheinNakadiPublishingClient.java @@ -2,13 +2,11 @@ import java.util.List; -import org.springframework.beans.factory.annotation.Autowired; import org.zalando.nakadiproducer.transmission.NakadiPublishingClient; public class FahrscheinNakadiPublishingClient implements NakadiPublishingClient { private final org.zalando.fahrschein.NakadiClient delegate; - @Autowired public FahrscheinNakadiPublishingClient(org.zalando.fahrschein.NakadiClient delegate) { this.delegate = delegate; } diff --git a/src/main/java/org/zalando/nakadiproducer/transmission/impl/NakadiEvent.java b/nakadi-producer/src/main/java/org/zalando/nakadiproducer/transmission/impl/NakadiEvent.java similarity index 100% rename from src/main/java/org/zalando/nakadiproducer/transmission/impl/NakadiEvent.java rename to nakadi-producer/src/main/java/org/zalando/nakadiproducer/transmission/impl/NakadiEvent.java diff --git a/src/main/java/org/zalando/nakadiproducer/transmission/impl/NakadiMetadata.java b/nakadi-producer/src/main/java/org/zalando/nakadiproducer/transmission/impl/NakadiMetadata.java similarity index 100% rename from src/main/java/org/zalando/nakadiproducer/transmission/impl/NakadiMetadata.java rename to nakadi-producer/src/main/java/org/zalando/nakadiproducer/transmission/impl/NakadiMetadata.java diff --git a/src/test/java/org/zalando/nakadiproducer/eventlog/impl/EventLogWriterTest.java b/nakadi-producer/src/test/java/org/zalando/nakadiproducer/eventlog/impl/EventLogWriterTest.java similarity index 100% rename from src/test/java/org/zalando/nakadiproducer/eventlog/impl/EventLogWriterTest.java rename to nakadi-producer/src/test/java/org/zalando/nakadiproducer/eventlog/impl/EventLogWriterTest.java diff --git a/src/test/java/org/zalando/nakadiproducer/snapshots/impl/SnapshotCreationServiceTest.java b/nakadi-producer/src/test/java/org/zalando/nakadiproducer/snapshots/impl/SnapshotCreationServiceTest.java similarity index 95% rename from src/test/java/org/zalando/nakadiproducer/snapshots/impl/SnapshotCreationServiceTest.java rename to nakadi-producer/src/test/java/org/zalando/nakadiproducer/snapshots/impl/SnapshotCreationServiceTest.java index a8aa82e1..a91f0484 100644 --- a/src/test/java/org/zalando/nakadiproducer/snapshots/impl/SnapshotCreationServiceTest.java +++ b/nakadi-producer/src/test/java/org/zalando/nakadiproducer/snapshots/impl/SnapshotCreationServiceTest.java @@ -22,7 +22,7 @@ import org.mockito.Captor; import org.mockito.InjectMocks; import org.mockito.Mock; -import org.mockito.runners.MockitoJUnitRunner; +import org.mockito.junit.MockitoJUnitRunner; import org.zalando.nakadiproducer.eventlog.EventLogWriter; import org.zalando.nakadiproducer.snapshots.SnapshotEventProvider; import org.zalando.nakadiproducer.snapshots.SnapshotEventProvider.Snapshot; @@ -73,7 +73,7 @@ public void testSnapshotSavedInBatches() { // when snapshot returns 5 item stream when(snapshotEventProvider.getSnapshot(PUBLISHER_EVENT_TYPE, null)).thenReturn(eventPayloads.subList(0, 3)); when(snapshotEventProvider.getSnapshot(PUBLISHER_EVENT_TYPE, 2)).thenReturn(eventPayloads.subList(3, 5)); - when(snapshotEventProvider.getSnapshot(PUBLISHER_EVENT_TYPE, 5)).thenReturn(Collections.emptyList()); + //when(snapshotEventProvider.getSnapshot(PUBLISHER_EVENT_TYPE, 5)).thenReturn(Collections.emptyList()); // create a snapshot eventTransmissionService.createSnapshotEvents(PUBLISHER_EVENT_TYPE); diff --git a/nakadi-producer/src/test/java/org/zalando/nakadiproducer/util/Fixture.java b/nakadi-producer/src/test/java/org/zalando/nakadiproducer/util/Fixture.java new file mode 100644 index 00000000..9ab4d652 --- /dev/null +++ b/nakadi-producer/src/test/java/org/zalando/nakadiproducer/util/Fixture.java @@ -0,0 +1,60 @@ +package org.zalando.nakadiproducer.util; + +import java.util.ArrayList; +import java.util.List; + +import org.zalando.nakadiproducer.snapshots.SnapshotEventProvider.Snapshot; + +public class Fixture { + + public static final String PUBLISHER_EVENT_TYPE = "wholesale.some-publisher-change-event"; + public static final String PUBLISHER_DATA_TYPE = "nakadi:some-publisher"; + + public static MockPayload mockPayload(Integer id, String code, Boolean isActive, + MockPayload.SubClass more, List items) { + return MockPayload.builder() + .id(id) + .code(code) + .isActive(isActive) + .more(more) + .items(items) + .build(); + } + + public static MockPayload mockPayload(Integer id, String code) { + return mockPayload(id, code, true, mockSubClass(), mockSubList(3)); + } + + public static List mockSnapshotList(Integer size) { + List list = new ArrayList<>(); + for (int i = 0; i < size; i++) { + list.add(new Snapshot(i, PUBLISHER_EVENT_TYPE, PUBLISHER_DATA_TYPE, mockPayload(i + 1, "code" + i, true, mockSubClass("some info " + i), mockSubList(3, "some detail for code" + i)))); + } + return list; + } + + public static MockPayload.SubClass mockSubClass(String info) { + return MockPayload.SubClass.builder().info(info).build(); + } + + private static MockPayload.SubClass mockSubClass() { + return mockSubClass("Info something"); + } + + private static MockPayload.SubListItem mockSubListItem(String detail) { + return MockPayload.SubListItem.builder().detail(detail).build(); + } + + public static List mockSubList(Integer size, String detail) { + List items = new ArrayList<>(); + for (int i = 0; i < size; i++) { + items.add(mockSubListItem(detail + i)); + } + return items; + } + + private static List mockSubList(Integer size) { + return mockSubList(size, "Detail something "); + } + +} diff --git a/src/test/java/org/zalando/nakadiproducer/util/MockPayload.java b/nakadi-producer/src/test/java/org/zalando/nakadiproducer/util/MockPayload.java similarity index 100% rename from src/test/java/org/zalando/nakadiproducer/util/MockPayload.java rename to nakadi-producer/src/test/java/org/zalando/nakadiproducer/util/MockPayload.java index 317d1d2c..e2611b91 100644 --- a/src/test/java/org/zalando/nakadiproducer/util/MockPayload.java +++ b/nakadi-producer/src/test/java/org/zalando/nakadiproducer/util/MockPayload.java @@ -1,7 +1,5 @@ package org.zalando.nakadiproducer.util; -import java.util.List; - import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Getter; @@ -9,6 +7,8 @@ import lombok.Setter; import lombok.ToString; +import java.util.List; + @ToString @Getter @Setter diff --git a/pom.xml b/pom.xml index 77b025f5..a8775c16 100644 --- a/pom.xml +++ b/pom.xml @@ -13,191 +13,16 @@ 1.5.3.RELEASE - nakadi-producer-spring-boot-starter + nakadi-producer-reactor org.zalando - 3.0.1 - Nakadi Event Producer: Spring Boot Starter - Spring Boot Auto Configuration for Nakadi event producer + 3.1.0-SNAPSHOT + pom + Nakadi Event Producer Reactor - - 1.8 - 1.8 - 1.8 - 0.4.24 - - - - - org.springframework - spring-context - ${spring.version} - - - org.springframework - spring-jdbc - ${spring.version} - - - javax.transaction - javax.transaction-api - - - org.flywaydb - flyway-core - - - org.zalando - fahrschein - 0.9.1 - - - org.zalando - tracer-core - 0.11.2 - true - - - org.zalando.stups - tokens - 0.11.0-beta-2 - true - - - org.springframework.boot - spring-boot-actuator - true - - - org.projectlombok - lombok - provided - - - org.springframework.boot - spring-boot-starter-test - test - - - com.jayway.restassured - rest-assured - 2.9.0 - test - - - org.postgresql - postgresql - 9.4.1211 - test - - - com.opentable.components - otj-pg-embedded - 0.7.1 - test - - - postgresql - postgresql - - - - - commons-logging - commons-logging - 1.2 - test - - - org.springframework.boot - spring-boot-configuration-processor - true - - - - javax.interceptor - javax.interceptor-api - 1.2 - provided - true - - - - - - ${project.artifactId} - - - - org.apache.maven.plugins - maven-failsafe-plugin - - - - integration-test - verify - - - - - - org.apache.maven.plugins - maven-source-plugin - 2.2.1 - - - attach-sources - - jar-no-fork - - - - - - org.apache.maven.plugins - maven-javadoc-plugin - 2.9.1 - - -Xdoclint:none - - - - attach-javadocs - - jar - - - - - - org.apache.maven.plugins - maven-gpg-plugin - 1.5 - - - sign-artifacts - verify - - sign - - - - - - org.sonatype.plugins - nexus-staging-maven-plugin - 1.6.7 - true - - ossrh - https://oss.sonatype.org/ - true - - - - + + nakadi-producer + nakadi-producer-spring-boot-starter + @@ -207,15 +32,4 @@ - - - ossrh - https://oss.sonatype.org/content/repositories/snapshots - - - ossrh - https://oss.sonatype.org/service/local/staging/deploy/maven2/ - - - \ No newline at end of file diff --git a/src/main/java/org/zalando/nakadiproducer/transmission/impl/EventTransmitter.java b/src/main/java/org/zalando/nakadiproducer/transmission/impl/EventTransmitter.java deleted file mode 100644 index 1e823c82..00000000 --- a/src/main/java/org/zalando/nakadiproducer/transmission/impl/EventTransmitter.java +++ /dev/null @@ -1,26 +0,0 @@ -package org.zalando.nakadiproducer.transmission.impl; - -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.scheduling.annotation.Scheduled; -import org.springframework.stereotype.Component; - -@Component -public class EventTransmitter { - @Autowired - private EventTransmissionService eventLogService; - - @Value("${nakadi-producer.scheduled-transmission-enabled:true}") - private boolean scheduledTransmissionEnabled; - - @Scheduled(fixedDelayString = "${nakadi-producer.transmission-polling-delay:1000}") - protected void sendEventsIfSchedulingEnabled() { - if (scheduledTransmissionEnabled) { - sendEvents(); - } - } - - public void sendEvents() { - eventLogService.lockSomeEvents().forEach(eventLogService::sendEvent); - } -} diff --git a/src/main/resources/META-INF/spring-configuration-metadata.json b/src/main/resources/META-INF/spring-configuration-metadata.json deleted file mode 100644 index b4872a64..00000000 --- a/src/main/resources/META-INF/spring-configuration-metadata.json +++ /dev/null @@ -1,33 +0,0 @@ -{"groups": [ - { - "name": "nakadi-producer" - } -],"properties": [ - { - "name": "nakadi-producer.scheduled-transmission-enabled", - "type": "java.lang.Boolean", - "defaultValue": true, - "description": "determines, if persisted events should be automatically and periodically transmitted to nakadi" - }, - { - "name": "nakadi-producer.nakadi-base-uri", - "type": "java.net.URI", - "description": "the base URI of your nakadi instance" - }, - { - "name": "nakadi-producer.access-token-uri", - "type": "java.net.URI", - "description": "if you use the stups tokens library, this configures the URI of your OAuth2 server" - }, - { - "name": "nakadi-producer.access-token-scopes", - "type": "java.util.Collection", - "description": "if you use the stups tokens library, provide a list of oAuth scopes that are required to send nakadi events" - }, - { - "name": "nakadi-producer.transmission-polling-delay", - "type": "java.lang.Long", - "defaultValue": 1000, - "description": "When all existing events have been transmitted to nakadi, the sender will sleep for the specified amount of milliseconds before trying to fetch a new batch of events for transmission" - } -]}