From 372ebd88632e0033633f8bf624a9bd9e3e1dafa4 Mon Sep 17 00:00:00 2001 From: bzablocki Date: Mon, 17 Jun 2024 22:05:02 +0200 Subject: [PATCH] Solace Read connector: watermark-calculation related classes (#31595) Solace Read connector: watermark-calculation related classes --- .../io/solace/read/WatermarkParameters.java | 88 +++++++++++++++ .../sdk/io/solace/read/WatermarkPolicy.java | 101 ++++++++++++++++++ .../beam/sdk/io/solace/read/package-info.java | 20 ++++ 3 files changed, 209 insertions(+) create mode 100644 sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/WatermarkParameters.java create mode 100644 sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/WatermarkPolicy.java create mode 100644 sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/package-info.java diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/WatermarkParameters.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/WatermarkParameters.java new file mode 100644 index 000000000000..f58cb1cc202d --- /dev/null +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/WatermarkParameters.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.solace.read; + +import com.google.auto.value.AutoValue; +import java.io.Serializable; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; +import org.joda.time.Duration; +import org.joda.time.Instant; + +/** {@code WatermarkParameters} contains the parameters used for watermark computation. */ +@AutoValue +abstract class WatermarkParameters implements Serializable { + + private static final Duration STANDARD_WATERMARK_IDLE_DURATION_THRESHOLD = + Duration.standardSeconds(30); + + abstract Instant getCurrentWatermark(); + + abstract Instant getLastSavedWatermark(); + + abstract Instant getLastUpdateTime(); + + abstract SerializableFunction getTimestampFn(); + + abstract Duration getWatermarkIdleDurationThreshold(); + + abstract Builder toBuilder(); + + static Builder builder() { + return new AutoValue_WatermarkParameters.Builder() + .setCurrentWatermark(BoundedWindow.TIMESTAMP_MIN_VALUE) + .setLastSavedWatermark(BoundedWindow.TIMESTAMP_MIN_VALUE) + .setLastUpdateTime(Instant.now()) + .setWatermarkIdleDurationThreshold(STANDARD_WATERMARK_IDLE_DURATION_THRESHOLD); + } + + @AutoValue.Builder + abstract static class Builder { + abstract Builder setCurrentWatermark(Instant currentWatermark); + + abstract Builder setLastSavedWatermark(Instant eventTime); + + abstract Builder setLastUpdateTime(Instant now); + + abstract Builder setWatermarkIdleDurationThreshold(Duration watermarkIdleDurationThreshold); + + abstract Builder setTimestampFn(SerializableFunction timestampFn); + + abstract WatermarkParameters build(); + } + + /** + * Create an instance of {@link WatermarkParameters} with a {@code SerializableFunction} to + * extract the event time. + */ + static WatermarkParameters create(SerializableFunction timestampFn) { + Preconditions.checkArgument(timestampFn != null, "timestampFn function is null"); + return WatermarkParameters.builder().setTimestampFn(timestampFn).build(); + } + + /** + * Specify the watermark idle duration to consider before advancing the watermark. The default + * watermark idle duration threshold is {@link #STANDARD_WATERMARK_IDLE_DURATION_THRESHOLD}. + */ + WatermarkParameters withWatermarkIdleDurationThreshold(Duration idleDurationThreshold) { + Preconditions.checkArgument( + idleDurationThreshold != null, "watermark idle duration threshold is null"); + return toBuilder().setWatermarkIdleDurationThreshold(idleDurationThreshold).build(); + } +} diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/WatermarkPolicy.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/WatermarkPolicy.java new file mode 100644 index 000000000000..13d65639e335 --- /dev/null +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/WatermarkPolicy.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.solace.read; + +import java.io.Serializable; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Ordering; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.joda.time.Instant; + +/** + * A class that manages the watermark for a Solace source. + * + *

The watermark is calculated based on the last saved watermark, the last update time, and the + * watermark idle duration threshold. If the last update time is before the watermark idle duration + * threshold, the watermark is set to the watermark idle duration threshold. Otherwise, the + * watermark is set to the last saved watermark. + * + *

The watermark is updated when a new record is received. The last saved watermark is set to the + * maximum of the current last saved watermark and the value resulting from the function calculating + * the timestamp from the record. The last update time is set to the current time when the watermark + * is updated. + */ +class WatermarkPolicy implements Serializable { + private WatermarkParameters watermarkParameters; + + static WatermarkPolicy create(SerializableFunction timestampFunction) { + return new WatermarkPolicy(WatermarkParameters.create(timestampFunction)); + } + + private WatermarkPolicy(WatermarkParameters watermarkParameters) { + this.watermarkParameters = watermarkParameters; + } + /** + * Returns the current watermark. + * + *

The watermark is calculated based on the last saved watermark, the last update time, and the + * watermark idle duration threshold. If the last update time is before the watermark idle + * duration threshold, the watermark is set to the watermark idle duration threshold. Otherwise, + * the watermark is set to the last saved watermark. + * + * @return the current watermark + */ + Instant getWatermark() { + Instant now = Instant.now(); + Instant watermarkIdleThreshold = + now.minus(watermarkParameters.getWatermarkIdleDurationThreshold()); + + Instant newWatermark = + watermarkParameters.getLastUpdateTime().isBefore(watermarkIdleThreshold) + ? watermarkIdleThreshold + : watermarkParameters.getLastSavedWatermark(); + + if (newWatermark.isAfter(watermarkParameters.getCurrentWatermark())) { + watermarkParameters = + watermarkParameters.toBuilder().setCurrentWatermark(newWatermark).build(); + } + return watermarkParameters.getCurrentWatermark(); + } + + /** + * Updates the watermark based on the provided record. + * + *

This method updates the last saved watermark and the last update time based on the timestamp + * function for the provided record. The last saved watermark is set to the maximum of the current + * last saved watermark and the timestamp of the record. The last update time is set to the + * current time. + * + * @param record The record to update the watermark with. + */ + void update(@Nullable T record) { + if (record == null) { + return; + } + watermarkParameters = + watermarkParameters + .toBuilder() + .setLastSavedWatermark( + Ordering.natural() + .max( + watermarkParameters.getLastSavedWatermark(), + watermarkParameters.getTimestampFn().apply(record))) + .setLastUpdateTime(Instant.now()) + .build(); + } +} diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/package-info.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/package-info.java new file mode 100644 index 000000000000..ce24cc1a3088 --- /dev/null +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/package-info.java @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** Solace IO connector - read connector classes. */ +package org.apache.beam.sdk.io.solace.read;