diff --git a/airbyte-data/src/main/kotlin/io/airbyte/data/services/ConnectionTimelineEventService.kt b/airbyte-data/src/main/kotlin/io/airbyte/data/services/ConnectionTimelineEventService.kt index 43e35a4c7c4..8d6f8f0c112 100644 --- a/airbyte-data/src/main/kotlin/io/airbyte/data/services/ConnectionTimelineEventService.kt +++ b/airbyte-data/src/main/kotlin/io/airbyte/data/services/ConnectionTimelineEventService.kt @@ -1,7 +1,12 @@ package io.airbyte.data.services import io.airbyte.data.repositories.entities.ConnectionTimelineEvent +import io.airbyte.data.services.shared.ConnectionEvent +import java.util.UUID interface ConnectionTimelineEventService { - fun writeEvent(event: ConnectionTimelineEvent): ConnectionTimelineEvent + fun writeEvent( + connectionId: UUID, + event: ConnectionEvent, + ): ConnectionTimelineEvent } diff --git a/airbyte-data/src/main/kotlin/io/airbyte/data/services/impls/data/ConnectionTimelineEventServiceImpl.kt b/airbyte-data/src/main/kotlin/io/airbyte/data/services/impls/data/ConnectionTimelineEventServiceImpl.kt index 9c73e088b38..732d317de76 100644 --- a/airbyte-data/src/main/kotlin/io/airbyte/data/services/impls/data/ConnectionTimelineEventServiceImpl.kt +++ b/airbyte-data/src/main/kotlin/io/airbyte/data/services/impls/data/ConnectionTimelineEventServiceImpl.kt @@ -1,13 +1,26 @@ package io.airbyte.data.services.impls.data +import com.fasterxml.jackson.databind.ObjectMapper import io.airbyte.data.repositories.ConnectionTimelineEventRepository import io.airbyte.data.repositories.entities.ConnectionTimelineEvent import io.airbyte.data.services.ConnectionTimelineEventService +import io.airbyte.data.services.shared.ConnectionEvent import jakarta.inject.Singleton +import java.util.UUID @Singleton class ConnectionTimelineEventServiceImpl(private val repository: ConnectionTimelineEventRepository) : ConnectionTimelineEventService { - override fun writeEvent(event: ConnectionTimelineEvent): ConnectionTimelineEvent { + override fun writeEvent( + connectionId: UUID, + connectionEvent: ConnectionEvent, + ): ConnectionTimelineEvent { + val serializedEvent = MAPPER.writeValueAsString(connectionEvent) + val event = + ConnectionTimelineEvent(null, connectionId, connectionEvent.getUserId(), connectionEvent.getEventType().toString(), serializedEvent, null) return repository.save(event) } + + companion object { + private val MAPPER = ObjectMapper() + } } diff --git a/airbyte-data/src/main/kotlin/io/airbyte/data/services/shared/ConnectionEvent.kt b/airbyte-data/src/main/kotlin/io/airbyte/data/services/shared/ConnectionEvent.kt new file mode 100644 index 00000000000..eb7dbf9c1b0 --- /dev/null +++ b/airbyte-data/src/main/kotlin/io/airbyte/data/services/shared/ConnectionEvent.kt @@ -0,0 +1,15 @@ +package io.airbyte.data.services.shared + +import java.util.UUID + +interface ConnectionEvent { + enum class Type { + SYNC_SUCCEEDED, + } + + fun getUserId(): UUID? { + return null + } + + fun getEventType(): Type +} diff --git a/airbyte-data/src/main/kotlin/io/airbyte/data/services/shared/SyncSucceededEvent.kt b/airbyte-data/src/main/kotlin/io/airbyte/data/services/shared/SyncSucceededEvent.kt new file mode 100644 index 00000000000..a65d1674ba4 --- /dev/null +++ b/airbyte-data/src/main/kotlin/io/airbyte/data/services/shared/SyncSucceededEvent.kt @@ -0,0 +1,38 @@ +package io.airbyte.data.services.shared + +class SyncSucceededEvent( + private val jobId: Long, + private val startTimeEpochSeconds: Long, + private val endTimeEpochSeconds: Long, + private val bytesLoaded: Long, + private val recordsLoaded: Long, + private val attemptsCount: Int, +) : ConnectionEvent { + fun startTimeEpochSeconds(): Long { + return this.startTimeEpochSeconds + } + + fun endTimeEpochSeconds(): Long { + return endTimeEpochSeconds + } + + fun getJobId(): Long { + return jobId + } + + fun getBytesLoaded(): Long { + return bytesLoaded + } + + fun getRecordsLoaded(): Long { + return recordsLoaded + } + + fun getAttemptsCount(): Int { + return attemptsCount + } + + override fun getEventType(): ConnectionEvent.Type { + return ConnectionEvent.Type.SYNC_SUCCEEDED + } +}