Skip to content

Commit

Permalink
Merge branch 'main' into catskan/add-ingress-template-fix
Browse files Browse the repository at this point in the history
  • Loading branch information
Catskan authored Aug 24, 2024
2 parents d3955d9 + e49b924 commit d403d73
Show file tree
Hide file tree
Showing 7 changed files with 33 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,8 @@ object FileConstants {
const val CONNECTION_CONFIGURATION_FILE = "connectionConfiguration.json"
const val INIT_INPUT_FILE = "input.json"
const val SIDECAR_INPUT_FILE = "sidecarInput.json"
const val SOURCE_CONFIG_FILE = "sourceConfig.json"
const val DESTINATION_CONFIG_FILE = "destinationConfig.json"
const val SOURCE_CATALOG_FILE = "sourceCatalog.json"
const val DESTINATION_CATALOG_FILE = "destinationCatalog.json"
const val CONNECTOR_CONFIG_FILE = "connectorConfig.json"
const val CATALOG_FILE = "catalog.json"
const val INPUT_STATE_FILE = "inputState.json"

// marker files
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import io.airbyte.workers.ReplicationInputHydrator
import io.airbyte.workers.internal.NamespacingMapper
import io.airbyte.workers.models.ReplicationActivityInput
import io.airbyte.workers.pod.FileConstants
import io.airbyte.workers.pod.FileConstants.DEST_DIR
import io.airbyte.workers.pod.FileConstants.SOURCE_DIR
import io.airbyte.workers.serde.ObjectSerializer
import io.airbyte.workers.serde.PayloadDeserializer
import io.airbyte.workload.api.client.model.generated.Workload
Expand Down Expand Up @@ -47,13 +49,21 @@ class ReplicationHydrationProcessor(
// source inputs
logger.info { "Writing source inputs..." }
fileClient.writeInputFile(
FileConstants.SOURCE_CATALOG_FILE,
FileConstants.CATALOG_FILE,
protocolSerializer.serialize(hydrated.catalog, false),
SOURCE_DIR,
)

fileClient.writeInputFile(
FileConstants.SOURCE_CONFIG_FILE,
FileConstants.CONNECTOR_CONFIG_FILE,
serializer.serialize(hydrated.sourceConfiguration),
SOURCE_DIR,
)

fileClient.writeInputFile(
FileConstants.INPUT_STATE_FILE,
serializer.serialize(hydrated.state),
SOURCE_DIR,
)

// dest inputs
Expand All @@ -68,19 +78,15 @@ class ReplicationHydrationProcessor(
val destinationCatalog = mapper.mapCatalog(hydrated.catalog)

fileClient.writeInputFile(
FileConstants.DESTINATION_CATALOG_FILE,
FileConstants.CATALOG_FILE,
protocolSerializer.serialize(destinationCatalog, hydrated.destinationSupportsRefreshes),
DEST_DIR,
)

fileClient.writeInputFile(
FileConstants.DESTINATION_CONFIG_FILE,
FileConstants.CONNECTOR_CONFIG_FILE,
serializer.serialize(hydrated.destinationConfiguration),
)

// shared state input
fileClient.writeInputFile(
FileConstants.INPUT_STATE_FILE,
serializer.serialize(hydrated.state),
DEST_DIR,
)

// pipes for passing messages between all three
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,10 @@ class FileClient {
fun writeInputFile(
fileName: String,
fileContents: String,
baseDir: String = FileConstants.CONFIG_DIR,
) {
Files.writeString(
Path.of(FileConstants.CONFIG_DIR).resolve(fileName),
Path.of(baseDir).resolve(fileName),
fileContents,
StandardCharsets.UTF_8,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,11 +112,11 @@ class ReplicationHydrationProcessorTest {
verify { serializer.serialize(hydrated.state) }
verify { protocolSerializer.serialize(hydrated.catalog, false) }
verify { protocolSerializer.serialize(mapper.mapCatalog(hydrated.catalog), hydrated.destinationSupportsRefreshes) }
verify { fileClient.writeInputFile(FileConstants.SOURCE_CATALOG_FILE, serializedSrcCatalog) }
verify { fileClient.writeInputFile(FileConstants.SOURCE_CONFIG_FILE, serializedSrcConfig) }
verify { fileClient.writeInputFile(FileConstants.DESTINATION_CATALOG_FILE, serializedDestCatalog) }
verify { fileClient.writeInputFile(FileConstants.DESTINATION_CONFIG_FILE, serializedDestConfig) }
verify { fileClient.writeInputFile(FileConstants.INPUT_STATE_FILE, serializedState) }
verify { fileClient.writeInputFile(FileConstants.CATALOG_FILE, serializedSrcCatalog, FileConstants.SOURCE_DIR) }
verify { fileClient.writeInputFile(FileConstants.CONNECTOR_CONFIG_FILE, serializedSrcConfig, FileConstants.SOURCE_DIR) }
verify { fileClient.writeInputFile(FileConstants.INPUT_STATE_FILE, serializedState, FileConstants.SOURCE_DIR) }
verify { fileClient.writeInputFile(FileConstants.CATALOG_FILE, serializedDestCatalog, FileConstants.DEST_DIR) }
verify { fileClient.writeInputFile(FileConstants.CONNECTOR_CONFIG_FILE, serializedDestConfig, FileConstants.DEST_DIR) }
verify { fileClient.makeNamedPipes() }
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,10 @@ package io.airbyte.workload.launcher.pods.factories
import io.airbyte.workers.pod.ContainerConstants.DESTINATION_CONTAINER_NAME
import io.airbyte.workers.pod.ContainerConstants.ORCHESTRATOR_CONTAINER_NAME
import io.airbyte.workers.pod.ContainerConstants.SOURCE_CONTAINER_NAME
import io.airbyte.workers.pod.FileConstants.CONFIG_DIR
import io.airbyte.workers.pod.FileConstants.DESTINATION_CATALOG_FILE
import io.airbyte.workers.pod.FileConstants.DESTINATION_CONFIG_FILE
import io.airbyte.workers.pod.FileConstants.CATALOG_FILE
import io.airbyte.workers.pod.FileConstants.CONNECTOR_CONFIG_FILE
import io.airbyte.workers.pod.FileConstants.DEST_DIR
import io.airbyte.workers.pod.FileConstants.INPUT_STATE_FILE
import io.airbyte.workers.pod.FileConstants.SOURCE_CATALOG_FILE
import io.airbyte.workers.pod.FileConstants.SOURCE_CONFIG_FILE
import io.airbyte.workers.pod.FileConstants.SOURCE_DIR
import io.airbyte.workload.launcher.config.OrchestratorEnvSingleton
import io.fabric8.kubernetes.api.model.CapabilitiesBuilder
Expand Down Expand Up @@ -60,9 +57,9 @@ class ReplicationContainerFactory(
val mainCommand =
ContainerCommandFactory.replConnector(
"read",
"--config $CONFIG_DIR/${SOURCE_CONFIG_FILE} " +
"--catalog $CONFIG_DIR/${SOURCE_CATALOG_FILE} " +
"--state $CONFIG_DIR/${INPUT_STATE_FILE}",
"--config $SOURCE_DIR/${CONNECTOR_CONFIG_FILE} " +
"--catalog $SOURCE_DIR/${CATALOG_FILE} " +
"--state $SOURCE_DIR/${INPUT_STATE_FILE}",
"/dev/null",
)

Expand All @@ -88,9 +85,8 @@ class ReplicationContainerFactory(
val mainCommand =
ContainerCommandFactory.replConnector(
"write",
"--config $CONFIG_DIR/${DESTINATION_CONFIG_FILE} " +
"--catalog $CONFIG_DIR/${DESTINATION_CATALOG_FILE} " +
"--state $CONFIG_DIR/${INPUT_STATE_FILE}",
"--config $DEST_DIR/${CONNECTOR_CONFIG_FILE} " +
"--catalog $DEST_DIR/${CATALOG_FILE} ",
)

return ContainerBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ class ReplicationPodFactory(
.withImagePullSecrets(imagePullSecrets)
.withVolumes(replicationVolumes.allVolumes)
.withNodeSelector<Any, Any>(nodeSelectors)
.withAutomountServiceAccountToken(false)
.withSecurityContext(podSecurityContext())
.endSpec()
.build()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,6 @@ class VolumeFactory(
val config = config()
volumes.add(config.volume)
orchVolumeMounts.add(config.mount)
sourceVolumeMounts.add(config.mount)
destVolumeMounts.add(config.mount)

val source = source()
volumes.add(source.volume)
Expand Down

0 comments on commit d403d73

Please sign in to comment.