diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/SyncWorkflowImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/SyncWorkflowImpl.java index 22b5d0282e02..73fdc5849dfb 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/SyncWorkflowImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/SyncWorkflowImpl.java @@ -84,18 +84,24 @@ public StandardSyncOutput run(final JobRunConfig jobRunConfig, if (syncInput.getOperationSequence() != null && !syncInput.getOperationSequence().isEmpty()) { for (final StandardSyncOperation standardSyncOperation : syncInput.getOperationSequence()) { if (standardSyncOperation.getOperatorType() == OperatorType.NORMALIZATION) { + LOGGER.info(">>>NORMALIZATION STARTED: " + connectionId + " " + System.currentTimeMillis() + "\njob: " + jobRunConfig + "\nsource: " + sourceLauncherConfig + "\n dest: " + + destinationLauncherConfig); final Configs configs = new EnvConfigs(); final NormalizationInput normalizationInput = generateNormalizationInput(syncInput, syncOutput, configs); final NormalizationSummary normalizationSummary = normalizationActivity.normalize(jobRunConfig, destinationLauncherConfig, normalizationInput); + LOGGER.info(">>>NORMALIZATION FINISHED: " + connectionId + " " + System.currentTimeMillis()); syncOutput = syncOutput.withNormalizationSummary(normalizationSummary); } else if (standardSyncOperation.getOperatorType() == OperatorType.DBT) { + LOGGER.info(">>>DBT STARTED: " + connectionId + " " + System.currentTimeMillis() + "\njob: " + jobRunConfig + "\nsource: " + sourceLauncherConfig + "\n dest: " + + destinationLauncherConfig); final OperatorDbtInput operatorDbtInput = new OperatorDbtInput() .withDestinationConfiguration(syncInput.getDestinationConfiguration()) .withOperatorDbt(standardSyncOperation.getOperatorDbt()); dbtTransformationActivity.run(jobRunConfig, destinationLauncherConfig, syncInput.getResourceRequirements(), operatorDbtInput); + LOGGER.info(">>>DBT FINISHED: " + connectionId + " " + System.currentTimeMillis()); } else { final String message = String.format("Unsupported operation type: %s", standardSyncOperation.getOperatorType()); LOGGER.error(message);