Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Solace Read connector: watermark-calculation related classes #31595

Merged
merged 48 commits into from
Jun 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
f404eec
wip solace connector
bzablocki Apr 16, 2024
00274a2
wip solace connector
bzablocki Apr 16, 2024
399ffcc
some checker errors resolved
bzablocki Apr 17, 2024
62dfa4a
all checker errors resolved
bzablocki Apr 17, 2024
ea202fb
improving unit tests
bzablocki Apr 18, 2024
260f645
respond to pr commments
bzablocki Apr 19, 2024
9057860
Documentation
bzablocki Apr 19, 2024
a74b6a6
Small refactor - move data classes out of the client
bzablocki Apr 22, 2024
552cab4
refactor
bzablocki Apr 23, 2024
5d847b3
Add github action for integration test of Solace
bzablocki Apr 23, 2024
23292a1
testing github workflow
bzablocki Apr 24, 2024
a69e10a
bump testcontainers to 1.19.7 - soalce semp was updated with an admin…
bzablocki Apr 24, 2024
f4b0d6c
Use FlowHandle to acknowledge messages to make SolaceCheckpointMark's…
bzablocki Apr 29, 2024
a1ca8b3
Handle StaleSessionException error
bzablocki May 8, 2024
8007a33
Add @Internal annotation to mark the SolaceIO API beta and subject to…
bzablocki May 8, 2024
cff3105
Improve documentation
bzablocki May 13, 2024
f3eaabe
Back to ack based on bytesxmlmessages. Deduplicate default to false.
bzablocki May 29, 2024
34c4170
Merge remote-tracking branch 'origin/master' into solace-connector
bzablocki May 29, 2024
7b24fdb
update changes.md with Solace read connector
bzablocki May 29, 2024
be74c86
remove ack by id code
bzablocki Jun 3, 2024
a85c68c
remove todo comment
bzablocki Jun 3, 2024
533f122
Add licenses to package-info.java files
bzablocki Jun 3, 2024
2f41380
Restructure documentation
bzablocki Jun 3, 2024
9a317a8
update aws test after upgrading testcontainers version.
bzablocki Jun 5, 2024
38b6f0c
Merge branch 'master' of github.com:bzablocki/beam into solace-connector
bzablocki Jun 5, 2024
cf5d24d
Merge branch 'solace-connector' of github.com:bzablocki/beam into sol…
bzablocki Jun 5, 2024
aa29100
Disable publishing docs until the first pass on the master branch
bzablocki Jun 6, 2024
94101a8
Merge branch 'master' of github.com:apache/beam into solace-connector
bzablocki Jun 6, 2024
c625069
Remove files from this branch to split PR into smaller chunks
bzablocki Jun 6, 2024
1521f40
refactor tests for readability
bzablocki Jun 6, 2024
b815286
revert upgrade of testcontainers - not needed in this PR chunk
bzablocki Jun 6, 2024
08b777c
revert upgrade of testcontainers - not needed in this PR chunk
bzablocki Jun 6, 2024
0f63749
spotless
bzablocki Jun 6, 2024
e90e69a
remove IT tests from this pr
bzablocki Jun 6, 2024
f6833d2
Tech Writer review
bzablocki Jun 7, 2024
df1eb6c
Add a field to Solace.Record mapped from BytesXMLMessage.getAttachmen…
bzablocki Jun 7, 2024
5015b32
Add and fix some documentation
bzablocki Jun 10, 2024
2e1c10e
Remove CheckpointMark's reference to the UnboundedSolaceReader - unne…
bzablocki Jun 10, 2024
4d02b99
Revert "Remove CheckpointMark's reference to the UnboundedSolaceReade…
bzablocki Jun 11, 2024
bee8faf
Solace project init - github workflow file, gradle module
bzablocki Jun 11, 2024
a5b29ed
Merge branch 'solace-connector-0-project-init' into solace-connector
bzablocki Jun 11, 2024
09368d6
Merge branch 'master' of github.com:apache/beam into solace-connector
bzablocki Jun 13, 2024
f2d284a
Splitting the #31476 - Leaving only PTransform AutoValue configurations
bzablocki Jun 13, 2024
9af3185
remove unnecessary dependencies
bzablocki Jun 13, 2024
60d854d
remove info from CHANGES.md
bzablocki Jun 13, 2024
3847876
Add watermark-related code
bzablocki Jun 13, 2024
a00b997
Merge branch 'master' of github.com:apache/beam into solace-connector…
bzablocki Jun 14, 2024
d1ada91
Add documentation
bzablocki Jun 17, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.solace.read;

import com.google.auto.value.AutoValue;
import java.io.Serializable;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.joda.time.Duration;
import org.joda.time.Instant;

/** {@code WatermarkParameters} contains the parameters used for watermark computation. */
@AutoValue
abstract class WatermarkParameters<T> implements Serializable {

private static final Duration STANDARD_WATERMARK_IDLE_DURATION_THRESHOLD =
Duration.standardSeconds(30);

abstract Instant getCurrentWatermark();

abstract Instant getLastSavedWatermark();

abstract Instant getLastUpdateTime();

abstract SerializableFunction<T, Instant> getTimestampFn();

abstract Duration getWatermarkIdleDurationThreshold();

abstract Builder<T> toBuilder();

static <T> Builder<T> builder() {
return new AutoValue_WatermarkParameters.Builder<T>()
.setCurrentWatermark(BoundedWindow.TIMESTAMP_MIN_VALUE)
.setLastSavedWatermark(BoundedWindow.TIMESTAMP_MIN_VALUE)
.setLastUpdateTime(Instant.now())
.setWatermarkIdleDurationThreshold(STANDARD_WATERMARK_IDLE_DURATION_THRESHOLD);
}

@AutoValue.Builder
abstract static class Builder<T> {
abstract Builder<T> setCurrentWatermark(Instant currentWatermark);

abstract Builder<T> setLastSavedWatermark(Instant eventTime);

abstract Builder<T> setLastUpdateTime(Instant now);

abstract Builder<T> setWatermarkIdleDurationThreshold(Duration watermarkIdleDurationThreshold);

abstract Builder<T> setTimestampFn(SerializableFunction<T, Instant> timestampFn);

abstract WatermarkParameters<T> build();
}

/**
* Create an instance of {@link WatermarkParameters} with a {@code SerializableFunction} to
* extract the event time.
*/
static <T> WatermarkParameters<T> create(SerializableFunction<T, Instant> timestampFn) {
Preconditions.checkArgument(timestampFn != null, "timestampFn function is null");
return WatermarkParameters.<T>builder().setTimestampFn(timestampFn).build();
}

/**
* Specify the watermark idle duration to consider before advancing the watermark. The default
* watermark idle duration threshold is {@link #STANDARD_WATERMARK_IDLE_DURATION_THRESHOLD}.
*/
WatermarkParameters<T> withWatermarkIdleDurationThreshold(Duration idleDurationThreshold) {
Preconditions.checkArgument(
idleDurationThreshold != null, "watermark idle duration threshold is null");
return toBuilder().setWatermarkIdleDurationThreshold(idleDurationThreshold).build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.solace.read;

import java.io.Serializable;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Ordering;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Instant;

/**
* A class that manages the watermark for a Solace source.
*
* <p>The watermark is calculated based on the last saved watermark, the last update time, and the
* watermark idle duration threshold. If the last update time is before the watermark idle duration
* threshold, the watermark is set to the watermark idle duration threshold. Otherwise, the
* watermark is set to the last saved watermark.
*
* <p>The watermark is updated when a new record is received. The last saved watermark is set to the
* maximum of the current last saved watermark and the value resulting from the function calculating
* the timestamp from the record. The last update time is set to the current time when the watermark
* is updated.
*/
class WatermarkPolicy<T> implements Serializable {
private WatermarkParameters<T> watermarkParameters;

static <T> WatermarkPolicy<T> create(SerializableFunction<T, Instant> timestampFunction) {
return new WatermarkPolicy<T>(WatermarkParameters.<T>create(timestampFunction));
}

private WatermarkPolicy(WatermarkParameters<T> watermarkParameters) {
this.watermarkParameters = watermarkParameters;
}
/**
* Returns the current watermark.
*
* <p>The watermark is calculated based on the last saved watermark, the last update time, and the
* watermark idle duration threshold. If the last update time is before the watermark idle
* duration threshold, the watermark is set to the watermark idle duration threshold. Otherwise,
* the watermark is set to the last saved watermark.
*
* @return the current watermark
*/
Instant getWatermark() {
bzablocki marked this conversation as resolved.
Show resolved Hide resolved
Instant now = Instant.now();
Instant watermarkIdleThreshold =
now.minus(watermarkParameters.getWatermarkIdleDurationThreshold());

Instant newWatermark =
watermarkParameters.getLastUpdateTime().isBefore(watermarkIdleThreshold)
? watermarkIdleThreshold
: watermarkParameters.getLastSavedWatermark();

if (newWatermark.isAfter(watermarkParameters.getCurrentWatermark())) {
watermarkParameters =
watermarkParameters.toBuilder().setCurrentWatermark(newWatermark).build();
}
return watermarkParameters.getCurrentWatermark();
}

/**
* Updates the watermark based on the provided record.
*
* <p>This method updates the last saved watermark and the last update time based on the timestamp
* function for the provided record. The last saved watermark is set to the maximum of the current
* last saved watermark and the timestamp of the record. The last update time is set to the
* current time.
*
* @param record The record to update the watermark with.
*/
void update(@Nullable T record) {
if (record == null) {
return;
}
watermarkParameters =
watermarkParameters
.toBuilder()
.setLastSavedWatermark(
Ordering.natural()
.max(
watermarkParameters.getLastSavedWatermark(),
watermarkParameters.getTimestampFn().apply(record)))
.setLastUpdateTime(Instant.now())
.build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/** Solace IO connector - read connector classes. */
package org.apache.beam.sdk.io.solace.read;
Loading