[GOBBLIN-2186] Emit GoT GTEs to time WorkUnit prep and to record volume of Work Discovery (#4089)
phet authored Jan 2, 2025
1 parent 7dbeebf commit 3269764
Showing 13 changed files with 241 additions and 89 deletions.
@@ -149,10 +149,10 @@ public Iterator<IcebergDataset> getDatasetsIterator() throws IOException {
    */
   protected IcebergDataset createIcebergDataset(IcebergCatalog sourceIcebergCatalog, String srcDbName, String srcTableName, IcebergCatalog destinationIcebergCatalog, String destDbName, String destTableName, Properties properties, FileSystem fs) throws IOException {
     IcebergTable srcIcebergTable = sourceIcebergCatalog.openTable(srcDbName, srcTableName);
-    Preconditions.checkArgument(sourceIcebergCatalog.tableAlreadyExists(srcIcebergTable), String.format("Missing Source Iceberg Table: {%s}.{%s}", srcDbName, srcTableName));
+    Preconditions.checkArgument(sourceIcebergCatalog.tableAlreadyExists(srcIcebergTable), String.format("Source Iceberg Table not found: {%s}.{%s}", srcDbName, srcTableName));
     IcebergTable destIcebergTable = destinationIcebergCatalog.openTable(destDbName, destTableName);
     // TODO: Rethink strategy to enforce dest iceberg table
-    Preconditions.checkArgument(destinationIcebergCatalog.tableAlreadyExists(destIcebergTable), String.format("Missing Destination Iceberg Table: {%s}.{%s}", destDbName, destTableName));
+    Preconditions.checkArgument(destinationIcebergCatalog.tableAlreadyExists(destIcebergTable), String.format("Destination Iceberg Table not found: {%s}.{%s}", destDbName, destTableName));
     return createSpecificDataset(srcIcebergTable, destIcebergTable, properties, fs, getConfigShouldCopyMetadataPath(properties));
   }

@@ -110,6 +110,7 @@ public static class FlowEventConstants {
     public static final String JOB_SKIPPED_TIME = "jobSkippedTime";
     public static final String WORKUNIT_PLAN_START_TIME = "workunitPlanStartTime";
     public static final String WORKUNIT_PLAN_END_TIME = "workunitPlanEndTime";
+    public static final String WORKUNITS_GENERATED_SUMMARY = "workUnitsGeneratedSummary";
     public static final String JOB_END_TIME = "jobEndTime";
     public static final String JOB_LAST_PROGRESS_EVENT_TIME = "jobLastProgressEventTime";
     public static final String JOB_COMPLETION_PERCENTAGE = "jobCompletionPercentage";
@@ -87,7 +87,7 @@ public Future<? extends List<Pair<SpecExecutor.Verb, Spec>>> changedSpecs() {
       fileStatuses = this.fs.listStatus(this.specDirPath,
           new AndPathFilter(new HiddenFilter(), new AvroUtils.AvroPathFilter()));
     } catch (IOException e) {
-      log.error("Error when listing files at path: {}", this.specDirPath.toString(), e);
+      log.error("Error when listing files at path: " + this.specDirPath.toString(), e);
       return null;
     }
     log.info("Found {} files at path {}", fileStatuses.length, this.specDirPath.toString());
@@ -102,42 +102,53 @@
       try {
         dataFileReader = new DataFileReader<>(new FsInput(fileStatus.getPath(), this.fs.getConf()), new SpecificDatumReader<>());
       } catch (IOException e) {
-        log.error("Error creating DataFileReader for: {}", fileStatus.getPath().toString(), e);
+        log.error("Error creating DataFileReader for: " + fileStatus.getPath().toString(), e);
         continue;
       }

-      AvroJobSpec avroJobSpec = null;
-      while (dataFileReader.hasNext()) {
-        avroJobSpec = dataFileReader.next();
-        break;
-      }
-
-      if (avroJobSpec != null) {
-        JobSpec.Builder jobSpecBuilder = new JobSpec.Builder(avroJobSpec.getUri());
-        Properties props = new Properties();
-        props.putAll(avroJobSpec.getProperties());
-        jobSpecBuilder.withJobCatalogURI(avroJobSpec.getUri())
-            .withVersion(avroJobSpec.getVersion())
-            .withDescription(avroJobSpec.getDescription())
-            .withConfigAsProperties(props)
-            .withConfig(ConfigUtils.propertiesToConfig(props));
-
-        try {
-          if (!avroJobSpec.getTemplateUri().isEmpty()) {
-            jobSpecBuilder.withTemplate(new URI(avroJobSpec.getTemplateUri()));
-          }
-        } catch (URISyntaxException u) {
-          log.error("Error building a job spec: ", u);
-          continue;
-        }
-
-        String verbName = avroJobSpec.getMetadata().get(SpecExecutor.VERB_KEY);
-        SpecExecutor.Verb verb = SpecExecutor.Verb.valueOf(verbName);
-
-        JobSpec jobSpec = jobSpecBuilder.build();
-        log.debug("Successfully built jobspec: {}", jobSpec.getUri().toString());
-        specList.add(new ImmutablePair<SpecExecutor.Verb, Spec>(verb, jobSpec));
-        this.specToPathMap.put(jobSpec.getUri(), fileStatus.getPath());
-      }
+      try { // ensure `dataFileReader` is always closed!
+        AvroJobSpec avroJobSpec = null;
+        while (dataFileReader.hasNext()) {
+          avroJobSpec = dataFileReader.next();
+          break;
+        }
+
+        if (avroJobSpec != null) {
+          JobSpec.Builder jobSpecBuilder = new JobSpec.Builder(avroJobSpec.getUri());
+          Properties props = new Properties();
+          props.putAll(avroJobSpec.getProperties());
+          jobSpecBuilder.withJobCatalogURI(avroJobSpec.getUri())
+              .withVersion(avroJobSpec.getVersion())
+              .withDescription(avroJobSpec.getDescription())
+              .withConfigAsProperties(props)
+              .withConfig(ConfigUtils.propertiesToConfig(props));
+
+          try {
+            if (!avroJobSpec.getTemplateUri().isEmpty()) {
+              jobSpecBuilder.withTemplate(new URI(avroJobSpec.getTemplateUri()));
+            }
+          } catch (URISyntaxException u) {
+            log.error("Error building a job spec: ", u);
+            continue;
+          }
+
+          String verbName = avroJobSpec.getMetadata().get(SpecExecutor.VERB_KEY);
+          SpecExecutor.Verb verb = SpecExecutor.Verb.valueOf(verbName);
+
+          JobSpec jobSpec = jobSpecBuilder.build();
+          log.debug("Successfully built jobspec: {}", jobSpec.getUri().toString());
+          specList.add(new ImmutablePair<SpecExecutor.Verb, Spec>(verb, jobSpec));
+          this.specToPathMap.put(jobSpec.getUri(), fileStatus.getPath());
+        }
+      } finally {
+        try {
+          if (dataFileReader != null) {
+            dataFileReader.close();
+            dataFileReader = null;
+          }
+        } catch (IOException e) {
+          log.warn("Unable to close DataFileReader for: {} - {}", fileStatus.getPath().toString(), e.getMessage());
+        }
+      }
     }
     return new CompletedFuture<>(specList, null);
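Since Avro's DataFileReader implements Closeable, a try-with-resources form would be an equivalent alternative to the try/finally introduced above. A sketch only, assuming the enclosing loop over fileStatuses (the commit's explicit finally has the advantage of downgrading a failed close() to a mere warning):

    try (DataFileReader<AvroJobSpec> reader = new DataFileReader<>(
        new FsInput(fileStatus.getPath(), this.fs.getConf()), new SpecificDatumReader<>())) {
      AvroJobSpec avroJobSpec = reader.hasNext() ? reader.next() : null;
      if (avroJobSpec != null) {
        // ... build and collect the JobSpec, exactly as in the hunk above ...
      }
    } catch (IOException e) {
      // here a failure to open, read, or close would all funnel into one handler
      log.error("Error processing: " + fileStatus.getPath(), e);
      continue;
    }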
@@ -72,7 +72,6 @@ public abstract class BaseFlowToJobSpecCompiler implements SpecCompiler {

   // Since {@link SpecCompiler} is an {@link SpecCatalogListener}, it is expected that any Spec change should be reflected
   // to these data structures.
-  @Getter
   protected final Map<URI, TopologySpec> topologySpecMap;

   protected final Config config;
@@ -68,7 +68,6 @@ public class StaticFlowTemplate implements FlowTemplate {
   private String description;
   @Getter
   private transient FlowCatalogWithTemplates catalog;
-  @Getter
   private List<JobTemplate> jobTemplates;

   private transient Config rawConfig;
@@ -43,6 +43,7 @@
 import org.apache.gobblin.converter.initializer.ConverterInitializerFactory;
 import org.apache.gobblin.destination.DestinationDatasetHandlerService;
 import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.TimingEvent;
 import org.apache.gobblin.runtime.AbstractJobLauncher;
 import org.apache.gobblin.runtime.JobState;
 import org.apache.gobblin.runtime.troubleshooter.AutomaticTroubleshooter;
@@ -60,6 +61,8 @@
 import org.apache.gobblin.temporal.ddm.work.WorkUnitsSizeSummary;
 import org.apache.gobblin.temporal.ddm.work.assistance.Help;
 import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
+import org.apache.gobblin.temporal.workflows.metrics.EventTimer;
+import org.apache.gobblin.temporal.workflows.metrics.TemporalEventTimer;
 import org.apache.gobblin.writer.initializer.WriterInitializerFactory;
@@ -127,6 +130,9 @@ public GenerateWorkUnitsResult generateWorkUnits(Properties jobProps, EventSubmitterContext eventSubmitterContext)
     int numSizeSummaryQuantiles = getConfiguredNumSizeSummaryQuantiles(jobState);
     WorkUnitsSizeSummary wuSizeSummary = digestWorkUnitsSize(workUnits).asSizeSummary(numSizeSummaryQuantiles);
     log.info("Discovered WorkUnits: {}", wuSizeSummary);
+    // IMPORTANT: send prior to `writeWorkUnits`, so the volume of work discovered (and bin packed) gets durably measured; even if serialization were to
+    // exceed available memory and this activity execution were to fail, a subsequent re-attempt would know the amount of work, to guide re-config/attempt
+    createWorkPreparedSizeDistillationTimer(wuSizeSummary, eventSubmitterContext).stop();

     JobStateUtils.writeWorkUnits(workUnits, workDirRoot, jobState, fs);
     JobStateUtils.writeJobState(jobState, workDirRoot, fs); // ATTENTION: the writing of `JobState` after all WUs signifies WU gen+serialization now complete
@@ -150,26 +156,28 @@
   protected List<WorkUnit> generateWorkUnitsForJobStateAndCollectCleanupPaths(JobState jobState, EventSubmitterContext eventSubmitterContext, Closer closer,
       Set<String> pathsToCleanUp)
       throws ReflectiveOperationException {
+    // report (timer) metrics for "Work Discovery", *planning only* - NOT including WU prep, like serialization, `DestinationDatasetHandlerService`ing, etc.
+    // IMPORTANT: for accurate timing, SEPARATELY emit `.createWorkPreparationTimer`, to record time prior to measuring the WU size required for that one
+    TemporalEventTimer.Factory timerFactory = new TemporalEventTimer.WithinActivityFactory(eventSubmitterContext);
+    EventTimer workDiscoveryTimer = timerFactory.createWorkDiscoveryTimer();
     Source<?, ?> source = JobStateUtils.createSource(jobState);
     WorkUnitStream workUnitStream = source instanceof WorkUnitStreamSource
         ? ((WorkUnitStreamSource) source).getWorkunitStream(jobState)
         : new BasicWorkUnitStream.Builder(source.getWorkunits(jobState)).build();

-    // TODO: report (timer) metrics for workunits creation
     if (workUnitStream == null || workUnitStream.getWorkUnits() == null) { // indicates a problem getting the WUs
       String errMsg = "Failure in getting work units for job " + jobState.getJobId();
       log.error(errMsg);
-      // TODO: decide whether a non-retryable failure is too severe... (in most circumstances, it's likely what we want)
+      // TODO: decide whether a non-retryable failure is too severe... (some sources may merit retry)
       throw ApplicationFailure.newNonRetryableFailure(errMsg, "Failure: Source.getWorkUnits()");
     }
+    workDiscoveryTimer.stop();

     if (!workUnitStream.getWorkUnits().hasNext()) { // no work unit to run: entirely normal result (not a failure)
       log.warn("No work units created for job " + jobState.getJobId());
       return Lists.newArrayList();
     }

-    // TODO: count total bytes for progress tracking!
-
     boolean canCleanUpTempDirs = false; // unlike `AbstractJobLauncher` running the job end-to-end, this is Work Discovery only, so WAY TOO SOON for cleanup
     DestinationDatasetHandlerService datasetHandlerService = closer.register(
         new DestinationDatasetHandlerService(jobState, canCleanUpTempDirs, eventSubmitterContext.create()));
@@ -264,6 +272,19 @@ protected static WorkUnitsSizeDigest digestWorkUnitsSize(List<WorkUnit> workUnits)
     return new WorkUnitsSizeDigest(totalSize.get(), topLevelWorkUnitsDigest, constituentWorkUnitsDigest);
   }

+  protected static EventTimer createWorkPreparedSizeDistillationTimer(
+      WorkUnitsSizeSummary wuSizeSummary, EventSubmitterContext eventSubmitterContext) {
+    // Inspired by a pair of log messages produced within `CopySource::getWorkUnits`:
+    //   1. Statistics for ConcurrentBoundedPriorityIterable: {ResourcePool: {softBound: [ ... ], hardBound: [ ... ]}, totalResourcesUsed: [ ... ], \
+    //      maxRequirementPerDimension: [entities: 231943.0, bytesCopied: 1.22419622769628E14], ... }
+    //   2. org.apache.gobblin.data.management.copy.CopySource - Bin packed work units. Initial work units: 27252, packed work units: 13175, \
+    //      max weight per bin: 500000000, max work units per bin: 100.
+    // rather than merely logging, durably emit this info, to inform re-config for any potential re-attempt (should WU serialization OOM)
+    TemporalEventTimer.Factory timerFactory = new TemporalEventTimer.WithinActivityFactory(eventSubmitterContext);
+    return timerFactory.createWorkPreparationTimer()
+        .withMetadataAsJson(TimingEvent.WORKUNITS_GENERATED_SUMMARY, wuSizeSummary.distill());
+  }
+
   public static int getConfiguredNumSizeSummaryQuantiles(State state) {
     return state.getPropAsInt(GenerateWorkUnits.NUM_WORK_UNITS_SIZE_SUMMARY_QUANTILES, GenerateWorkUnits.DEFAULT_NUM_WORK_UNITS_SIZE_SUMMARY_QUANTILES);
   }
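To make the distilled figures concrete, here is the arithmetic behind the summary's mean sizes, worked through on the very numbers quoted in the comment above (mapping "packed" to top-level and "initial" to constituent work units; the mapping is illustrative, not asserted by the diff):

    long totalSize = 122_419_622_769_628L;    // ~1.22419622769628E14 bytesCopied
    long topLevelWorkUnitsCount = 13_175L;    // "packed work units"
    long constituentWorkUnitsCount = 27_252L; // "initial work units"

    double topLevelMean = totalSize * 1.0 / topLevelWorkUnitsCount;       // ~9.29E9 bytes, i.e. ~9.3 GB per packed WU
    double constituentMean = totalSize * 1.0 / constituentWorkUnitsCount; // ~4.49E9 bytes, i.e. ~4.5 GB per initial WU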
@@ -53,6 +53,29 @@ public class WorkUnitsSizeSummary {
   @NonNull private List<Double> topLevelQuantilesMinSizes;
   @NonNull private List<Double> constituentQuantilesMinSizes;

+  /** Total size, counts, means, and medians: the most telling measurements packaged for ready consumption / observability */
+  @Data
+  @Setter(AccessLevel.NONE) // NOTE: non-`final` members solely to enable deserialization
+  @NoArgsConstructor // IMPORTANT: for jackson (de)serialization
+  @RequiredArgsConstructor
+  public static class Distillation {
+    @NonNull private long totalSize;
+    @NonNull private long topLevelWorkUnitsCount;
+    @NonNull private long constituentWorkUnitsCount;
+    @NonNull private double topLevelWorkUnitsMeanSize;
+    @NonNull private double constituentWorkUnitsMeanSize;
+    @NonNull private double topLevelWorkUnitsMedianSize;
+    @NonNull private double constituentWorkUnitsMedianSize;
+  }
+
+  @JsonIgnore // (because no-arg method resembles 'java bean property')
+  public Distillation distill() {
+    return new Distillation(this.totalSize, this.topLevelWorkUnitsCount, this.constituentWorkUnitsCount,
+        this.getTopLevelWorkUnitsMeanSize(), this.getConstituentWorkUnitsMeanSize(),
+        this.getTopLevelWorkUnitsMedianSize(), this.getConstituentWorkUnitsMedianSize()
+    );
+  }
+
   @JsonIgnore // (because no-arg method resembles 'java bean property')
   public double getTopLevelWorkUnitsMeanSize() {
     return this.totalSize * 1.0 / this.topLevelWorkUnitsCount;
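The Lombok annotations on Distillation are aimed at Jackson round-tripping: @NoArgsConstructor gives Jackson an instantiation path, while @Setter(AccessLevel.NONE) keeps the type effectively read-only. A round-trip sketch, under the assumption that the mapper is granted private-field access (Gobblin's actual mapper configuration is not shown in this diff, and the median values below are invented):

    import com.fasterxml.jackson.annotation.JsonAutoDetect;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.fasterxml.jackson.databind.PropertyAccessor;

    import org.apache.gobblin.temporal.ddm.work.WorkUnitsSizeSummary;

    public class DistillationRoundTripDemo {
      public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        // setters are suppressed, so allow direct field access for deserialization
        // (an assumption; Jackson's default visibility would not reach private fields)
        mapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);

        String json = "{\"totalSize\":122419622769628,\"topLevelWorkUnitsCount\":13175,"
            + "\"constituentWorkUnitsCount\":27252,\"topLevelWorkUnitsMeanSize\":9.29E9,"
            + "\"constituentWorkUnitsMeanSize\":4.49E9,\"topLevelWorkUnitsMedianSize\":8.1E9,"
            + "\"constituentWorkUnitsMedianSize\":3.9E9}";
        WorkUnitsSizeSummary.Distillation distillation =
            mapper.readValue(json, WorkUnitsSizeSummary.Distillation.class);
        System.out.println(mapper.writeValueAsString(distillation)); // serializes via the @Data getters
      }
    }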
@@ -17,21 +17,20 @@

 package org.apache.gobblin.temporal.ddm.workflow.impl;

-import io.temporal.activity.ActivityOptions;
-import io.temporal.common.RetryOptions;
-import io.temporal.failure.ApplicationFailure;
-import io.temporal.workflow.Workflow;
-
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;

+import io.temporal.activity.ActivityOptions;
+import io.temporal.common.RetryOptions;
+import io.temporal.failure.ApplicationFailure;
+import io.temporal.workflow.Workflow;
+
 import lombok.extern.slf4j.Slf4j;

 import org.apache.gobblin.metrics.event.TimingEvent;
 import org.apache.gobblin.runtime.DatasetTaskSummary;
-import org.apache.gobblin.runtime.util.GsonUtils;
 import org.apache.gobblin.temporal.ddm.activity.CommitActivity;
 import org.apache.gobblin.temporal.ddm.work.CommitStats;
 import org.apache.gobblin.temporal.ddm.work.DatasetStats;
@@ -60,12 +59,10 @@ public class CommitStepWorkflowImpl implements CommitStepWorkflow {
   @Override
   public CommitStats commit(WUProcessingSpec workSpec) {
     CommitStats commitGobblinStats = activityStub.commit(workSpec);
-
     if (!commitGobblinStats.getOptFailure().isPresent() || commitGobblinStats.getNumCommittedWorkUnits() > 0) {
-      TemporalEventTimer.Factory timerFactory = new TemporalEventTimer.Factory(workSpec.getEventSubmitterContext());
+      TemporalEventTimer.Factory timerFactory = new TemporalEventTimer.WithinWorkflowFactory(workSpec.getEventSubmitterContext());
       timerFactory.create(TimingEvent.LauncherTimings.JOB_SUMMARY)
-          .withMetadata(TimingEvent.DATASET_TASK_SUMMARIES, GsonUtils.GSON_WITH_DATE_HANDLING.toJson(
-              convertDatasetStatsToTaskSummaries(commitGobblinStats.getDatasetStats())))
+          .withMetadataAsJson(TimingEvent.DATASET_TASK_SUMMARIES, convertDatasetStatsToTaskSummaries(commitGobblinStats.getDatasetStats()))
           .submit();// emit job summary info on both full and partial commit (ultimately for `GaaSJobObservabilityEvent.datasetsMetrics`)
     }
     if (commitGobblinStats.getOptFailure().isPresent()) {
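The withMetadataAsJson call above replaces the inline GsonUtils.GSON_WITH_DATE_HANDLING.toJson(...) of the old code; judging solely from that substitution, the convenience method presumably amounts to something like this sketch (its actual implementation is not part of this diff):

    // hypothetical, inferred from the call it replaces in this hunk
    public TemporalEventTimer withMetadataAsJson(String key, Object value) {
      return this.withMetadata(key, GsonUtils.GSON_WITH_DATE_HANDLING.toJson(value));
    }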
@@ -119,14 +119,14 @@ public class ExecuteGobblinWorkflowImpl implements ExecuteGobblinWorkflow {
       .build();

   private static final ActivityOptions DELETE_WORK_DIRS_ACTIVITY_OPTS = ActivityOptions.newBuilder()
-      .setStartToCloseTimeout(Duration.ofHours(1))
+      .setStartToCloseTimeout(Duration.ofMinutes(10))
       .setRetryOptions(DELETE_WORK_DIRS_RETRY_OPTS)
       .build();
   private final DeleteWorkDirsActivity deleteWorkDirsActivityStub = Workflow.newActivityStub(DeleteWorkDirsActivity.class, DELETE_WORK_DIRS_ACTIVITY_OPTS);

   @Override
   public ExecGobblinStats execute(Properties jobProps, EventSubmitterContext eventSubmitterContext) {
-    TemporalEventTimer.Factory timerFactory = new TemporalEventTimer.Factory(eventSubmitterContext);
+    TemporalEventTimer.Factory timerFactory = new TemporalEventTimer.WithinWorkflowFactory(eventSubmitterContext);
     timerFactory.create(TimingEvent.LauncherTimings.JOB_PREPARE).submit(); // update GaaS: `TimingEvent.JOB_START_TIME`
     EventTimer jobSuccessTimer = timerFactory.createJobTimer();
     Optional<GenerateWorkUnitsResult> optGenerateWorkUnitResult = Optional.empty();
@@ -207,7 +207,7 @@ protected TimeBudget calcWUProcTimeBudget(Instant jobStartTime, WorkUnitsSizeSummary
         ConfigurationKeys.JOB_TARGET_COMPLETION_DURATION_IN_MINUTES_KEY,
         ConfigurationKeys.DEFAULT_JOB_TARGET_COMPLETION_DURATION_IN_MINUTES));
     double permittedOveragePercentage = .2;
-    Duration genWUsDuration = Duration.between(jobStartTime, TemporalEventTimer.getCurrentTime());
+    Duration genWUsDuration = Duration.between(jobStartTime, TemporalEventTimer.WithinWorkflowFactory.getCurrentInstant());
     long remainingMins = totalTargetTimeMins - Math.min(genWUsDuration.toMinutes(), maxGenWUsMins) - commitStepMins;
     return TimeBudget.withOveragePercentage(remainingMins, permittedOveragePercentage);
   }
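A walk-through of the budget arithmetic in calcWUProcTimeBudget, with invented values (maxGenWUsMins and commitStepMins are defined above the visible hunk; none of these numbers come from the diff):

    long totalTargetTimeMins = 480; // JOB_TARGET_COMPLETION_DURATION_IN_MINUTES (hypothetical)
    long maxGenWUsMins = 90;        // most that Work Discovery may charge against the budget (hypothetical)
    long commitStepMins = 30;       // reserved for the commit step (hypothetical)
    long actualGenWUsMins = 120;    // suppose Work Discovery ran long

    // overage beyond the cap is not charged against WU processing:
    long remainingMins = totalTargetTimeMins - Math.min(actualGenWUsMins, maxGenWUsMins) - commitStepMins;
    // = 480 - 90 - 30 = 360; TimeBudget.withOveragePercentage(360, .2) then permits up to ~432 minutes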
@@ -121,7 +121,7 @@ private CommitStats performCommitIfAnyWorkUnitsProcessed(WUProcessingSpec workSpec
   private Optional<EventTimer> createOptJobEventTimer(WUProcessingSpec workSpec) {
     if (workSpec.isToDoJobLevelTiming()) {
       EventSubmitterContext eventSubmitterContext = workSpec.getEventSubmitterContext();
-      TemporalEventTimer.Factory timerFactory = new TemporalEventTimer.Factory(eventSubmitterContext);
+      TemporalEventTimer.Factory timerFactory = new TemporalEventTimer.WithinWorkflowFactory(eventSubmitterContext);
       return Optional.of(timerFactory.createJobTimer());
     } else {
       return Optional.empty();
@@ -57,7 +57,7 @@ public String getGreeting(String name, EventSubmitterContext eventSubmitterContext)
     /**
      * Example of the {@link TemporalEventTimer.Factory} invoking child activity for instrumentation.
      */
-    TemporalEventTimer.Factory timerFactory = new TemporalEventTimer.Factory(eventSubmitterContext);
+    TemporalEventTimer.Factory timerFactory = new TemporalEventTimer.WithinWorkflowFactory(eventSubmitterContext);
     try (TemporalEventTimer timer = timerFactory.create("getGreetingTime")) {
       LOG.info("Executing getGreeting");
       timer.withMetadata("name", name);