Skip to content

Commit

Permalink
Ensure that short-lived timers can still submit
Browse files Browse the repository at this point in the history
  • Loading branch information
malessi committed Dec 19, 2024
1 parent 0dd7f02 commit f783e14
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public void dataAvailable(RifFilesEvent rifFilesEvent) throws Exception {
Slf4jReporter.forRegistry(rifFileEvent.getEventMetrics()).outputTo(LOGGER).build();
dataSetFileMetricsReporter.start(2, TimeUnit.MINUTES);

final LongTaskTimer micrometerTimer = metrics.startTimerForRif(rifFile);
final LongTaskTimer.Sample micrometerTimer = metrics.createTimerForRif(rifFile).start();

try {
LOGGER.info("Processing file {}", rifFile.getDisplayName());
Expand All @@ -103,7 +103,7 @@ public void dataAvailable(RifFilesEvent rifFilesEvent) throws Exception {
failure = e;
}

metrics.stopTimer(micrometerTimer);
micrometerTimer.stop();

dataSetFileMetricsReporter.stop();
dataSetFileMetricsReporter.report();
Expand Down Expand Up @@ -163,31 +163,17 @@ public static final class Metrics {
private final MeterRegistry micrometerMetrics;

/**
* Start a {@link LongTaskTimer}(s) for a given {@link RifFile} so that the time it takes to
* process the RIF is measured. Should be called prior to processing a {@link RifFile}.
* Creates a {@link LongTaskTimer} for a given {@link RifFile} so that the time it takes to
* process the RIF can be measured. Should be called prior to processing a {@link RifFile}.
*
* @param rifFile the {@link RifFile} to time
* @return the started {@link LongTaskTimer} measuring the time taken to load the {@link
* RifFile}
* @return the {@link LongTaskTimer} that will be used to measure the time taken to load the
* {@link RifFile}
*/
LongTaskTimer startTimerForRif(RifFile rifFile) {
final var timer =
LongTaskTimer.builder(RIF_FILE_PROCESSING_TIMER_NAME)
.tags(getTags(rifFile))
.register(micrometerMetrics);
timer.start();
return timer;
}

/**
* Stops the provided {@link LongTaskTimer} and removes it from the Pipeline application's
* {@link MeterRegistry}.
*
* @param timer the {@link LongTaskTimer} to stop.
*/
void stopTimer(LongTaskTimer timer) {
timer.close();
micrometerMetrics.remove(timer);
LongTaskTimer createTimerForRif(RifFile rifFile) {
return LongTaskTimer.builder(RIF_FILE_PROCESSING_TIMER_NAME)
.tags(getTags(rifFile))
.register(micrometerMetrics);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -356,13 +356,13 @@ public PipelineJobOutcome call() throws Exception {
* processing multiple data sets in parallel (which would lead to data
* consistency problems).
*/
final var processingTimer = loadJobMetrics.createTimerForManifest(manifestToProcess).start();
statusReporter.reportProcessingManifestData(manifestToProcess.getIncomingS3Key());
dataSetQueue.markAsStarted(manifestRecord);
final var timer = loadJobMetrics.startTimerForManifest(manifestToProcess);
listener.dataAvailable(rifFilesEvent);
statusReporter.reportCompletedManifest(manifestToProcess.getIncomingS3Key());
dataSetQueue.markAsProcessed(manifestRecord);
loadJobMetrics.stopTimer(timer);
processingTimer.stop();
LOGGER.info(LOG_MESSAGE_DATA_SET_COMPLETE);

/*
Expand Down Expand Up @@ -564,32 +564,18 @@ public static final class Metrics {
private final MeterRegistry appMetrics;

/**
* Start a {@link LongTaskTimer}(s) for a given {@link DataSetManifest} so that the time it
* takes to process the manifest is measured. Should be called prior to processing a {@link
* Creates a {@link LongTaskTimer} for a given {@link DataSetManifest} so that the time it takes
* to process the manifest is measured. Should be called prior to processing a {@link
* DataSetManifest}.
*
* @param manifest the {@link DataSetManifest} to time
* @return the started {@link LongTaskTimer} measuring the time taken to load the {@link
* DataSetManifest}
* @return the {@link LongTaskTimer} that will be used to measure the time taken to load the
* {@link DataSetManifest}
*/
LongTaskTimer startTimerForManifest(DataSetManifest manifest) {
final var timer =
LongTaskTimer.builder(MANIFEST_PROCESSING_TIMER_NAME)
.tags(getTags(manifest))
.register(appMetrics);
timer.start();
return timer;
}

/**
* Stops the provided {@link LongTaskTimer} and removes it from the Pipeline application's
* {@link MeterRegistry}.
*
* @param timer the {@link LongTaskTimer} to stop.
*/
void stopTimer(LongTaskTimer timer) {
timer.close();
appMetrics.remove(timer);
LongTaskTimer createTimerForManifest(DataSetManifest manifest) {
return LongTaskTimer.builder(MANIFEST_PROCESSING_TIMER_NAME)
.tags(getTags(manifest))
.register(appMetrics);
}

/**
Expand Down

0 comments on commit f783e14

Please sign in to comment.