From e10a19801fd53ffd0f403de0f4a1c6a86e46a4ba Mon Sep 17 00:00:00 2001 From: Christophe Duong Date: Wed, 29 Sep 2021 20:03:21 +0200 Subject: [PATCH] Inject oauth params when using "sync now" button too (#6545) * Inject oauth params when using "sync now" button too * Format secrets code --- .../job_factory/OAuthConfigSupplier.java | 6 +++ .../secretsmigration/SecretsMigration.java | 22 +---------- .../SecretsMigrationTest.java | 22 +---------- .../airbyte/server/apis/ConfigurationApi.java | 4 +- .../server/handlers/SchedulerHandler.java | 38 ++++++++++++++----- .../server/handlers/SchedulerHandlerTest.java | 4 +- 6 files changed, 42 insertions(+), 54 deletions(-) diff --git a/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/job_factory/OAuthConfigSupplier.java b/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/job_factory/OAuthConfigSupplier.java index 4ce7a1339caf..070ee6f8c4ce 100644 --- a/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/job_factory/OAuthConfigSupplier.java +++ b/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/job_factory/OAuthConfigSupplier.java @@ -16,9 +16,13 @@ import io.airbyte.validation.json.JsonValidationException; import java.io.IOException; import java.util.UUID; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class OAuthConfigSupplier { + private static final Logger LOGGER = LoggerFactory.getLogger(OAuthConfigSupplier.class); + public static final String SECRET_MASK = "******"; final private ConfigRepository configRepository; private final boolean maskSecrets; @@ -72,9 +76,11 @@ void injectJsonNode(ObjectNode mainConfig, ObjectNode fromConfig) { // TODO secrets should be masked with the correct type // https://github.com/airbytehq/airbyte/issues/5990 // In the short-term this is not world-ending as all secret fields are currently strings + LOGGER.debug(String.format("Masking instance wide parameter %s in config", key)); mainConfig.set(key, Jsons.jsonNode(SECRET_MASK)); } else { if (!mainConfig.has(key) || isSecretMask(mainConfig.get(key).asText())) { + LOGGER.debug(String.format("injecting instance wide parameter %s into config", key)); mainConfig.set(key, fromConfig.get(key)); } } diff --git a/airbyte-secrets-migration/src/main/java/io/airbyte/secretsmigration/SecretsMigration.java b/airbyte-secrets-migration/src/main/java/io/airbyte/secretsmigration/SecretsMigration.java index a4e25245be36..1c29e61caf2a 100644 --- a/airbyte-secrets-migration/src/main/java/io/airbyte/secretsmigration/SecretsMigration.java +++ b/airbyte-secrets-migration/src/main/java/io/airbyte/secretsmigration/SecretsMigration.java @@ -1,25 +1,5 @@ /* - * MIT License - * - * Copyright (c) 2020 Airbyte - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. */ package io.airbyte.secretsmigration; diff --git a/airbyte-secrets-migration/src/test/java/io/airbyte/secretsmigration/SecretsMigrationTest.java b/airbyte-secrets-migration/src/test/java/io/airbyte/secretsmigration/SecretsMigrationTest.java index f20acaaf52a9..5c57fa8e281d 100644 --- a/airbyte-secrets-migration/src/test/java/io/airbyte/secretsmigration/SecretsMigrationTest.java +++ b/airbyte-secrets-migration/src/test/java/io/airbyte/secretsmigration/SecretsMigrationTest.java @@ -1,25 +1,5 @@ /* - * MIT License - * - * Copyright (c) 2020 Airbyte - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. */ package io.airbyte.secretsmigration; diff --git a/airbyte-server/src/main/java/io/airbyte/server/apis/ConfigurationApi.java b/airbyte-server/src/main/java/io/airbyte/server/apis/ConfigurationApi.java index 60f40ff0a967..f2c5a0da81ae 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/apis/ConfigurationApi.java +++ b/airbyte-server/src/main/java/io/airbyte/server/apis/ConfigurationApi.java @@ -87,6 +87,7 @@ import io.airbyte.scheduler.persistence.JobNotifier; import io.airbyte.scheduler.persistence.JobPersistence; import io.airbyte.scheduler.persistence.WorkspaceHelper; +import io.airbyte.scheduler.persistence.job_factory.OAuthConfigSupplier; import io.airbyte.server.converters.SpecFetcher; import io.airbyte.server.errors.BadObjectSchemaKnownException; import io.airbyte.server.errors.IdNotFoundKnownException; @@ -158,7 +159,8 @@ public ConfigurationApi(final ConfigRepository configRepository, synchronousSchedulerClient, jobPersistence, jobNotifier, - temporalService); + temporalService, + new OAuthConfigSupplier(configRepository, false)); final DockerImageValidator dockerImageValidator = new DockerImageValidator(synchronousSchedulerClient); final WorkspaceHelper workspaceHelper = new WorkspaceHelper(configRepository, jobPersistence); sourceDefinitionsHandler = new SourceDefinitionsHandler(configRepository, dockerImageValidator, synchronousSchedulerClient); diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java index 2efc524208a2..17dfb836bf2f 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java @@ -4,6 +4,7 @@ package io.airbyte.server.handlers; +import com.fasterxml.jackson.databind.JsonNode; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; import io.airbyte.api.model.AuthSpecification; @@ -45,6 +46,7 @@ import io.airbyte.scheduler.models.Job; import io.airbyte.scheduler.persistence.JobNotifier; import io.airbyte.scheduler.persistence.JobPersistence; +import io.airbyte.scheduler.persistence.job_factory.OAuthConfigSupplier; import io.airbyte.server.converters.CatalogConverter; import io.airbyte.server.converters.ConfigurationUpdate; import io.airbyte.server.converters.JobConverter; @@ -76,13 +78,15 @@ public class SchedulerHandler { private final JobPersistence jobPersistence; private final JobNotifier jobNotifier; private final WorkflowServiceStubs temporalService; + private final OAuthConfigSupplier oAuthConfigSupplier; public SchedulerHandler(ConfigRepository configRepository, SchedulerJobClient schedulerJobClient, SynchronousSchedulerClient synchronousSchedulerClient, JobPersistence jobPersistence, JobNotifier jobNotifier, - WorkflowServiceStubs temporalService) { + WorkflowServiceStubs temporalService, + OAuthConfigSupplier oAuthConfigSupplier) { this( configRepository, schedulerJobClient, @@ -92,7 +96,8 @@ public SchedulerHandler(ConfigRepository configRepository, new SpecFetcher(synchronousSchedulerClient), jobPersistence, jobNotifier, - temporalService); + temporalService, + oAuthConfigSupplier); } @VisibleForTesting @@ -104,7 +109,8 @@ public SchedulerHandler(ConfigRepository configRepository, SpecFetcher specFetcher, JobPersistence jobPersistence, JobNotifier jobNotifier, - WorkflowServiceStubs temporalService) { + WorkflowServiceStubs temporalService, + OAuthConfigSupplier oAuthConfigSupplier) { this.configRepository = configRepository; this.schedulerJobClient = schedulerJobClient; this.synchronousSchedulerClient = synchronousSchedulerClient; @@ -114,6 +120,7 @@ public SchedulerHandler(ConfigRepository configRepository, this.jobPersistence = jobPersistence; this.jobNotifier = jobNotifier; this.temporalService = temporalService; + this.oAuthConfigSupplier = oAuthConfigSupplier; } public CheckConnectionRead checkSourceConnectionFromSourceId(SourceIdRequestBody sourceIdRequestBody) @@ -276,13 +283,24 @@ public JobInfoRead syncConnection(final ConnectionIdRequestBody connectionIdRequ final UUID connectionId = connectionIdRequestBody.getConnectionId(); final StandardSync standardSync = configRepository.getStandardSync(connectionId); - final SourceConnection source = configRepository.getSourceConnection(standardSync.getSourceId()); - final DestinationConnection destination = configRepository.getDestinationConnection(standardSync.getDestinationId()); - - final StandardSourceDefinition sourceDef = configRepository.getStandardSourceDefinition(source.getSourceDefinitionId()); + final SourceConnection sourceConnection = configRepository.getSourceConnection(standardSync.getSourceId()); + final DestinationConnection destinationConnection = configRepository.getDestinationConnection(standardSync.getDestinationId()); + final JsonNode sourceConfiguration = oAuthConfigSupplier.injectSourceOAuthParameters( + sourceConnection.getSourceDefinitionId(), + sourceConnection.getWorkspaceId(), + sourceConnection.getConfiguration()); + sourceConnection.withConfiguration(sourceConfiguration); + final JsonNode destinationConfiguration = oAuthConfigSupplier.injectDestinationOAuthParameters( + destinationConnection.getDestinationId(), + destinationConnection.getWorkspaceId(), + destinationConnection.getConfiguration()); + destinationConnection.withConfiguration(destinationConfiguration); + + final StandardSourceDefinition sourceDef = configRepository.getStandardSourceDefinition(sourceConnection.getSourceDefinitionId()); final String sourceImageName = DockerUtils.getTaggedImageName(sourceDef.getDockerRepository(), sourceDef.getDockerImageTag()); - final StandardDestinationDefinition destinationDef = configRepository.getStandardDestinationDefinition(destination.getDestinationDefinitionId()); + final StandardDestinationDefinition destinationDef = + configRepository.getStandardDestinationDefinition(destinationConnection.getDestinationDefinitionId()); final String destinationImageName = DockerUtils.getTaggedImageName(destinationDef.getDockerRepository(), destinationDef.getDockerImageTag()); final List standardSyncOperations = Lists.newArrayList(); @@ -292,8 +310,8 @@ public JobInfoRead syncConnection(final ConnectionIdRequestBody connectionIdRequ } final Job job = schedulerJobClient.createOrGetActiveSyncJob( - source, - destination, + sourceConnection, + destinationConnection, standardSync, sourceImageName, destinationImageName, diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java index 79206602030f..dd6901929703 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java @@ -61,6 +61,7 @@ import io.airbyte.scheduler.models.JobStatus; import io.airbyte.scheduler.persistence.JobNotifier; import io.airbyte.scheduler.persistence.JobPersistence; +import io.airbyte.scheduler.persistence.job_factory.OAuthConfigSupplier; import io.airbyte.server.converters.ConfigurationUpdate; import io.airbyte.server.converters.SpecFetcher; import io.airbyte.server.helpers.ConnectionHelpers; @@ -149,7 +150,8 @@ void setup() { specFetcher, jobPersistence, jobNotifier, - mock(WorkflowServiceStubs.class)); + mock(WorkflowServiceStubs.class), + mock(OAuthConfigSupplier.class)); } @Test