Skip to content

Commit

Permalink
track destination calls elapsed time (#9171)
Browse files Browse the repository at this point in the history
  • Loading branch information
jpefaur committed Oct 6, 2023
1 parent 63fae24 commit ca3b8e1
Show file tree
Hide file tree
Showing 7 changed files with 253 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ public ReplicationWorker create(final ReplicationInput replicationInput,

log.info("Setting up destination...");
final var airbyteDestination = airbyteIntegrationLauncherFactory.createAirbyteDestination(destinationLauncherConfig,
replicationInput.getSyncResourceRequirements(), replicationInput.getCatalog());
replicationInput.getSyncResourceRequirements(), replicationInput.getCatalog(), metricClient, replicationInput, featureFlagClient);

final WorkerMetricReporter metricReporter = new WorkerMetricReporter(metricClient, sourceLauncherConfig.getDockerImage());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@

package io.airbyte.workers.internal;

import static io.airbyte.metrics.lib.OssMetricsRegistry.WORKER_DESTINATION_ACCEPT_ELAPSED_MILLISECS;
import static io.airbyte.metrics.lib.OssMetricsRegistry.WORKER_DESTINATION_NOTIFY_END_OF_INPUT_ELAPSED_MILLISECS;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
Expand All @@ -17,6 +20,9 @@
import io.airbyte.commons.protocol.DefaultProtocolSerializer;
import io.airbyte.commons.protocol.ProtocolSerializer;
import io.airbyte.config.WorkerDestinationConfig;
import io.airbyte.metrics.lib.MetricAttribute;
import io.airbyte.metrics.lib.MetricClient;
import io.airbyte.metrics.lib.MetricsRegistry;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteMessage.Type;
import io.airbyte.workers.WorkerUtils;
Expand All @@ -30,6 +36,9 @@
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
Expand All @@ -49,10 +58,18 @@ public class DefaultAirbyteDestination implements AirbyteDestination {
143 // SIGTERM
);

private static final int EXECUTOR_SHUTDOWN_GRACE_PERIOD_SECONDS = 10;
public static final int EMIT_ELAPSED_TIME_METRIC_INTERVAL_SECONDS = 10 * 60;

private final IntegrationLauncher integrationLauncher;
private final AirbyteStreamFactory streamFactory;
private final AirbyteMessageBufferedWriterFactory messageWriterFactory;
private final ProtocolSerializer protocolSerializer;
private final boolean elapsedTimeTrackingEnabled;
private final ScheduledExecutorService scheduledExecutor;
private final MetricClient metricClient;
private final MetricAttribute[] metricAttrs;
private final int emitElapsedTimeMetricIntervalSeconds;

private final AtomicBoolean inputHasEnded = new AtomicBoolean(false);

Expand All @@ -67,17 +84,45 @@ public DefaultAirbyteDestination(final IntegrationLauncher integrationLauncher)
VersionedAirbyteStreamFactory.noMigrationVersionedAirbyteStreamFactory(LOGGER, CONTAINER_LOG_MDC_BUILDER, Optional.empty(),
Runtime.getRuntime().maxMemory(), false),
new DefaultAirbyteMessageBufferedWriterFactory(),
new DefaultProtocolSerializer());
new DefaultProtocolSerializer(), false, null, null);
}

public DefaultAirbyteDestination(final IntegrationLauncher integrationLauncher,
final AirbyteStreamFactory streamFactory,
final AirbyteMessageBufferedWriterFactory messageWriterFactory,
final ProtocolSerializer protocolSerializer) {
@VisibleForTesting
DefaultAirbyteDestination(final IntegrationLauncher integrationLauncher,
final AirbyteStreamFactory streamFactory,
final AirbyteMessageBufferedWriterFactory messageWriterFactory,
final ProtocolSerializer protocolSerializer,
final boolean elapsedTimeTrackingEnabled,
final MetricClient metricClient,
final MetricAttribute[] metricAttrs,
final int emitElapsedTimeMetricIntervalSeconds) {
this.integrationLauncher = integrationLauncher;
this.streamFactory = streamFactory;
this.messageWriterFactory = messageWriterFactory;
this.protocolSerializer = protocolSerializer;
this.elapsedTimeTrackingEnabled = elapsedTimeTrackingEnabled;
this.metricClient = metricClient;
this.metricAttrs = metricAttrs;
this.emitElapsedTimeMetricIntervalSeconds = emitElapsedTimeMetricIntervalSeconds;
this.scheduledExecutor = elapsedTimeTrackingEnabled ? Executors.newSingleThreadScheduledExecutor() : null;
}

public DefaultAirbyteDestination(final IntegrationLauncher integrationLauncher,
final AirbyteStreamFactory streamFactory,
final AirbyteMessageBufferedWriterFactory messageWriterFactory,
final ProtocolSerializer protocolSerializer,
final boolean elapsedTimeTrackingEnabled,
final MetricClient metricClient,
final MetricAttribute[] metricAttrs) {
this(
integrationLauncher,
streamFactory,
messageWriterFactory,
protocolSerializer,
elapsedTimeTrackingEnabled,
metricClient,
metricAttrs,
EMIT_ELAPSED_TIME_METRIC_INTERVAL_SECONDS);
}

@Override
Expand All @@ -104,13 +149,35 @@ public void start(final WorkerDestinationConfig destinationConfig, final Path jo

@Override
public void accept(final AirbyteMessage message) throws IOException {
if (elapsedTimeTrackingEnabled) {
final ScheduledFuture<?> scheduledFuture = emitAcceptElapsedTimeMetricOnInterval();
acceptWithoutTrackingElapsedTime(message);
scheduledFuture.cancel(true);
} else {
acceptWithoutTrackingElapsedTime(message);
}
}

@VisibleForTesting
void acceptWithoutTrackingElapsedTime(final AirbyteMessage message) throws IOException {
Preconditions.checkState(destinationProcess != null && !inputHasEnded.get());

writer.write(message);
}

@Override
public void notifyEndOfInput() throws IOException {
if (elapsedTimeTrackingEnabled) {
final ScheduledFuture<?> scheduledFuture = emitNotifyEndOfInputElapsedTimeMetricOnInterval();
notifyEndOfInputWithoutTrackingElapsedTime();
scheduledFuture.cancel(true);
} else {
notifyEndOfInputWithoutTrackingElapsedTime();
}
}

@VisibleForTesting
void notifyEndOfInputWithoutTrackingElapsedTime() throws IOException {
Preconditions.checkState(destinationProcess != null && !inputHasEnded.get());

writer.flush();
Expand All @@ -129,6 +196,20 @@ public void close() throws Exception {
notifyEndOfInput();
}

if (elapsedTimeTrackingEnabled) {
scheduledExecutor.shutdownNow();

try {
scheduledExecutor.awaitTermination(EXECUTOR_SHUTDOWN_GRACE_PERIOD_SECONDS, TimeUnit.SECONDS);
if (!scheduledExecutor.isTerminated()) {
LOGGER.error("Failed to shutdown scheduled executor");
}
} catch (final InterruptedException e) {
// Preserve the interrupt status
Thread.currentThread().interrupt();
}
}

LOGGER.debug("Closing destination process");
WorkerUtils.gentleClose(destinationProcess, 1, TimeUnit.MINUTES);
if (destinationProcess.isAlive() || !IGNORED_EXIT_CODES.contains(getExitValue())) {
Expand Down Expand Up @@ -180,4 +261,21 @@ public Optional<AirbyteMessage> attemptRead() {
return Optional.ofNullable(messageIterator.hasNext() ? messageIterator.next() : null);
}

public ScheduledFuture<?> emitNotifyEndOfInputElapsedTimeMetricOnInterval() {
return emitMetricOnInterval(WORKER_DESTINATION_NOTIFY_END_OF_INPUT_ELAPSED_MILLISECS);
}

public ScheduledFuture<?> emitAcceptElapsedTimeMetricOnInterval() {
return emitMetricOnInterval(WORKER_DESTINATION_ACCEPT_ELAPSED_MILLISECS);
}

private ScheduledFuture<?> emitMetricOnInterval(final MetricsRegistry metric) {
final long startTime = System.currentTimeMillis();

return scheduledExecutor.scheduleAtFixedRate(() -> metricClient.gauge(
metric,
System.currentTimeMillis() - startTime,
metricAttrs), emitElapsedTimeMetricIntervalSeconds, emitElapsedTimeMetricIntervalSeconds, TimeUnit.SECONDS);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,16 @@
import io.airbyte.commons.protocol.VersionedProtocolSerializer;
import io.airbyte.config.SyncResourceRequirements;
import io.airbyte.featureflag.Connection;
import io.airbyte.featureflag.DestinationCallsElapsedTimeTrackingEnabled;
import io.airbyte.featureflag.FailSyncIfTooBig;
import io.airbyte.featureflag.FeatureFlagClient;
import io.airbyte.featureflag.Multi;
import io.airbyte.featureflag.Workspace;
import io.airbyte.metrics.lib.MetricAttribute;
import io.airbyte.metrics.lib.MetricClient;
import io.airbyte.metrics.lib.MetricTags;
import io.airbyte.persistence.job.models.IntegrationLauncherConfig;
import io.airbyte.persistence.job.models.ReplicationInput;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.workers.internal.AirbyteDestination;
import io.airbyte.workers.internal.AirbyteSource;
Expand All @@ -28,6 +33,7 @@
import io.airbyte.workers.internal.exception.DestinationException;
import io.airbyte.workers.internal.exception.SourceException;
import jakarta.inject.Singleton;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
Expand Down Expand Up @@ -119,14 +125,41 @@ public AirbyteSource createAirbyteSource(final IntegrationLauncherConfig sourceL
*/
public AirbyteDestination createAirbyteDestination(final IntegrationLauncherConfig destinationLauncherConfig,
final SyncResourceRequirements syncResourceRequirements,
final ConfiguredAirbyteCatalog configuredAirbyteCatalog) {
final ConfiguredAirbyteCatalog configuredAirbyteCatalog,
final MetricClient metricClient,
final ReplicationInput replicationInput,
final FeatureFlagClient featureFlagClient) {
final boolean destinationElapsedTimeTrackingEnabled = featureFlagClient.boolVariation(DestinationCallsElapsedTimeTrackingEnabled.INSTANCE,
new Workspace(replicationInput.getWorkspaceId()));

final IntegrationLauncher destinationLauncher = createIntegrationLauncher(destinationLauncherConfig, syncResourceRequirements);
return new DefaultAirbyteDestination(destinationLauncher,
getStreamFactory(destinationLauncherConfig, configuredAirbyteCatalog, DestinationException.class,
DefaultAirbyteDestination.CONTAINER_LOG_MDC_BUILDER, false),
new VersionedAirbyteMessageBufferedWriterFactory(serDeProvider, migratorFactory, destinationLauncherConfig.getProtocolVersion(),
Optional.of(configuredAirbyteCatalog)),
getProtocolSerializer(destinationLauncherConfig));
getProtocolSerializer(destinationLauncherConfig), destinationElapsedTimeTrackingEnabled, metricClient, toConnectionAttrs(replicationInput));
}

private MetricAttribute[] toConnectionAttrs(final ReplicationInput replicationInput) {
final var attrs = new ArrayList<MetricAttribute>();
if (replicationInput.getConnectionId() != null) {
attrs.add(new MetricAttribute(MetricTags.CONNECTION_ID, replicationInput.getConnectionId().toString()));
}

if (replicationInput.getDestinationId() != null) {
attrs.add(new MetricAttribute(MetricTags.DESTINATION_ID, replicationInput.getDestinationId().toString()));
}

if (replicationInput.getSourceId() != null) {
attrs.add(new MetricAttribute(MetricTags.SOURCE_ID, replicationInput.getSourceId().toString()));
}

if (replicationInput.getWorkspaceId() != null) {
attrs.add(new MetricAttribute(MetricTags.WORKSPACE_ID, replicationInput.getWorkspaceId().toString()));
}

return attrs.toArray(new MetricAttribute[0]);
}

private VersionedProtocolSerializer getProtocolSerializer(final IntegrationLauncherConfig launcherConfig) {
Expand Down
Loading

0 comments on commit ca3b8e1

Please sign in to comment.