From ed0213dd7d1e22bed65dfdc1b251e9c0b4897e27 Mon Sep 17 00:00:00 2001 From: Diego Alonso Marquez Palacios Date: Thu, 12 Dec 2024 11:01:04 -0500 Subject: [PATCH] convert to internal changes in change stream classes ii --- .../data/v2/models/ChangeStreamMutation.java | 33 ++++++------------- .../DefaultChangeStreamRecordAdapter.java | 2 +- .../bigtable/data/v2/models/Heartbeat.java | 16 ++++----- .../v2/models/ChangeStreamMutationTest.java | 14 ++++---- .../v2/models/ChangeStreamRecordTest.java | 2 +- .../DefaultChangeStreamRecordAdapterTest.java | 22 ++++++------- ...ChangeStreamRecordMergingCallableTest.java | 2 +- 7 files changed, 38 insertions(+), 53 deletions(-) diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamMutation.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamMutation.java index 1defc67c5..838a7ec62 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamMutation.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamMutation.java @@ -15,7 +15,6 @@ */ package com.google.cloud.bigtable.data.v2.models; -import static com.google.api.gax.util.TimeConversionUtils.toJavaTimeInstant; import static com.google.api.gax.util.TimeConversionUtils.toThreetenInstant; import com.google.api.core.InternalApi; @@ -115,12 +114,12 @@ static Builder createGcMutation( /** This method is obsolete. Use {@link #getCommitTime()} instead. */ @ObsoleteApi("Use getCommitTime() instead") - public abstract org.threeten.bp.Instant getCommitTimestamp(); + public org.threeten.bp.Instant getCommitTimestamp() { + return toThreetenInstant(getCommitTime()); + } /** Get the commit timestamp of the current mutation. */ - public java.time.Instant getCommitTime() { - return toJavaTimeInstant(getCommitTimestamp()); - } + public abstract java.time.Instant getCommitTime(); /** * Get the tie breaker of the current mutation. This is used to resolve conflicts when multiple @@ -134,12 +133,12 @@ public java.time.Instant getCommitTime() { /** This method is obsolete. Use {@link #getEstimatedLowWatermarkTime()} instead. */ @ObsoleteApi("Use getEstimatedLowWatermarkTime() instead") - public abstract org.threeten.bp.Instant getEstimatedLowWatermark(); + public org.threeten.bp.Instant getEstimatedLowWatermark() { + return toThreetenInstant(getEstimatedLowWatermarkTime()); + } /** Get the low watermark of the current mutation. */ - public java.time.Instant getEstimatedLowWatermarkTime() { - return toJavaTimeInstant(getEstimatedLowWatermark()); - } + public abstract java.time.Instant getEstimatedLowWatermarkTime(); /** Get the list of mods of the current mutation. */ @Nonnull @@ -160,13 +159,7 @@ abstract static class Builder { abstract Builder setSourceClusterId(@Nonnull String sourceClusterId); - Builder setCommitTime(java.time.Instant commitTimestamp) { - return setCommitTimestamp(toThreetenInstant(commitTimestamp)); - } - - /** This method is obsolete. Use {@link #setCommitTime(java.time.Instant)} instead. */ - @ObsoleteApi("Use setCommitTime(java.time.Instant) instead") - abstract Builder setCommitTimestamp(org.threeten.bp.Instant commitTimestamp); + abstract Builder setCommitTime(java.time.Instant commitTimestamp); abstract Builder setTieBreaker(int tieBreaker); @@ -174,13 +167,7 @@ Builder setCommitTime(java.time.Instant commitTimestamp) { abstract Builder setToken(@Nonnull String token); - Builder setLowWatermarkTime(java.time.Instant estimatedLowWatermark) { - return setEstimatedLowWatermark(toThreetenInstant(estimatedLowWatermark)); - } - - /** This method is obsolete. Use {@link #setLowWatermarkTime(java.time.Instant)} instead. */ - @ObsoleteApi("Use setEstimatedLowWatermarkInstant(java.time.Instant) instead") - abstract Builder setEstimatedLowWatermark(org.threeten.bp.Instant estimatedLowWatermark); + abstract Builder setEstimatedLowWatermarkTime(java.time.Instant estimatedLowWatermark); Builder setCell( @Nonnull String familyName, diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/DefaultChangeStreamRecordAdapter.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/DefaultChangeStreamRecordAdapter.java index 217caa93f..54bf05cd7 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/DefaultChangeStreamRecordAdapter.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/DefaultChangeStreamRecordAdapter.java @@ -177,7 +177,7 @@ public void finishCell() { public ChangeStreamRecord finishChangeStreamMutation( String token, Instant estimatedLowWatermark) { this.changeStreamMutationBuilder.setToken(token); - this.changeStreamMutationBuilder.setLowWatermarkTime(estimatedLowWatermark); + this.changeStreamMutationBuilder.setEstimatedLowWatermarkTime(estimatedLowWatermark); return this.changeStreamMutationBuilder.build(); } diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/Heartbeat.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/Heartbeat.java index 0cf792ee1..ae5507ae7 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/Heartbeat.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/Heartbeat.java @@ -15,7 +15,6 @@ */ package com.google.cloud.bigtable.data.v2.models; -import static com.google.api.gax.util.TimeConversionUtils.toJavaTimeInstant; import static com.google.api.gax.util.TimeConversionUtils.toThreetenInstant; import com.google.api.core.InternalApi; @@ -34,8 +33,7 @@ public abstract class Heartbeat implements ChangeStreamRecord, Serializable { private static Heartbeat create( ChangeStreamContinuationToken changeStreamContinuationToken, java.time.Instant estimatedLowWatermark) { - return new AutoValue_Heartbeat( - changeStreamContinuationToken, toThreetenInstant(estimatedLowWatermark)); + return new AutoValue_Heartbeat(changeStreamContinuationToken, estimatedLowWatermark); } /** Wraps the protobuf {@link ReadChangeStreamResponse.Heartbeat}. */ @@ -50,12 +48,12 @@ static Heartbeat fromProto(@Nonnull ReadChangeStreamResponse.Heartbeat heartbeat @InternalApi("Intended for use by the BigtableIO in apache/beam only.") public abstract ChangeStreamContinuationToken getChangeStreamContinuationToken(); - /** This method is obsolete. Use {@link #getEstimatedLowWatermarkInstant()} instead. */ - @ObsoleteApi("Use getEstimatedLowWatermarkInstant() instead") - public abstract org.threeten.bp.Instant getEstimatedLowWatermark(); + /** This method is obsolete. Use {@link #getEstimatedLowWatermarkTime()} instead. */ + @ObsoleteApi("Use getEstimatedLowWatermarkTime() instead") + public org.threeten.bp.Instant getEstimatedLowWatermark() { + return toThreetenInstant(getEstimatedLowWatermarkTime()); + } @InternalApi("Intended for use by the BigtableIO in apache/beam only.") - public java.time.Instant getEstimatedLowWatermarkInstant() { - return toJavaTimeInstant(getEstimatedLowWatermark()); - } + public abstract java.time.Instant getEstimatedLowWatermarkTime(); } diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamMutationTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamMutationTest.java index 966da60c8..761bec376 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamMutationTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamMutationTest.java @@ -78,7 +78,7 @@ public void userInitiatedMutationTest() throws IOException, ClassNotFoundExcepti Value.rawTimestamp(1000), Value.rawValue(ByteString.copyFrom(Longs.toByteArray(1234L)))) .setToken("fake-token") - .setLowWatermarkTime(FAKE_LOW_WATERMARK) + .setEstimatedLowWatermarkTime(FAKE_LOW_WATERMARK) .build(); // Test the getters. @@ -120,7 +120,7 @@ public void gcMutationTest() throws IOException, ClassNotFoundException { ByteString.copyFromUtf8("fake-qualifier"), Range.TimestampRange.create(1000L, 2000L)) .setToken("fake-token") - .setEstimatedLowWatermark(FAKE_LOW_WATERMARK_THREETEN) + .setEstimatedLowWatermarkTime(FAKE_LOW_WATERMARK) .build(); // Test the getters. @@ -172,7 +172,7 @@ public void toRowMutationTest() { Value.rawTimestamp(1000), Value.rawValue(ByteString.copyFrom(Longs.toByteArray(1234L)))) .setToken("fake-token") - .setEstimatedLowWatermark(FAKE_LOW_WATERMARK_THREETEN) + .setEstimatedLowWatermarkTime(FAKE_LOW_WATERMARK) .build(); // Convert it to a rowMutation and construct a MutateRowRequest. @@ -215,7 +215,7 @@ public void toRowMutationWithoutTokenShouldFailTest() { ChangeStreamMutation.createUserMutation( ByteString.copyFromUtf8("key"), "fake-source-cluster-id", FAKE_COMMIT_TIMESTAMP, 0) .deleteFamily("fake-family") - .setEstimatedLowWatermark(FAKE_LOW_WATERMARK_THREETEN); + .setEstimatedLowWatermarkTime(FAKE_LOW_WATERMARK); Assert.assertThrows(IllegalStateException.class, builder::build); } @@ -255,7 +255,7 @@ public void toRowMutationEntryTest() { Value.rawTimestamp(1000), Value.rawValue(ByteString.copyFrom(Longs.toByteArray(1234L)))) .setToken("fake-token") - .setEstimatedLowWatermark(FAKE_LOW_WATERMARK_THREETEN) + .setEstimatedLowWatermarkTime(FAKE_LOW_WATERMARK) .build(); // Convert it to a rowMutationEntry and construct a MutateRowRequest. @@ -295,7 +295,7 @@ public void toRowMutationEntryWithoutTokenShouldFailTest() { ChangeStreamMutation.createUserMutation( ByteString.copyFromUtf8("key"), "fake-source-cluster-id", FAKE_COMMIT_TIMESTAMP, 0) .deleteFamily("fake-family") - .setEstimatedLowWatermark(FAKE_LOW_WATERMARK_THREETEN); + .setEstimatedLowWatermarkTime(FAKE_LOW_WATERMARK); Assert.assertThrows(IllegalStateException.class, builder::build); } @@ -320,7 +320,7 @@ public void testWithLongValue() { 1000L, ByteString.copyFrom(Longs.toByteArray(1L))) .setToken("fake-token") - .setEstimatedLowWatermark(FAKE_LOW_WATERMARK_THREETEN) + .setEstimatedLowWatermarkTime(FAKE_LOW_WATERMARK) .build(); RowMutation rowMutation = changeStreamMutation.toRowMutation(TABLE_ID); diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamRecordTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamRecordTest.java index 14697f430..9dd66acc7 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamRecordTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamRecordTest.java @@ -129,7 +129,7 @@ public void heartbeatTest() { .build(); Heartbeat actualHeartbeat = Heartbeat.fromProto(heartbeatProto); - assertThat(actualHeartbeat.getEstimatedLowWatermarkInstant()) + assertThat(actualHeartbeat.getEstimatedLowWatermarkTime()) .isEqualTo(Instant.ofEpochSecond(lowWatermark.getSeconds(), lowWatermark.getNanos())); assertThat(actualHeartbeat.getChangeStreamContinuationToken().getPartition()) .isEqualTo(ByteStringRange.create(rowRange.getStartKeyClosed(), rowRange.getEndKeyOpen())); diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/DefaultChangeStreamRecordAdapterTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/DefaultChangeStreamRecordAdapterTest.java index 80b4b15b2..b6997ae9d 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/DefaultChangeStreamRecordAdapterTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/DefaultChangeStreamRecordAdapterTest.java @@ -62,7 +62,7 @@ public void isHeartbeatTest() { ChangeStreamMutation.createGcMutation( ByteString.copyFromUtf8("key"), FAKE_COMMIT_TIMESTAMP, 0) .setToken("token") - .setEstimatedLowWatermark(FAKE_LOW_WATERMARK_THREETEN) + .setEstimatedLowWatermarkTime(FAKE_LOW_WATERMARK) .build(); Assert.assertTrue(adapter.isHeartbeat(heartbeatRecord)); Assert.assertFalse(adapter.isHeartbeat(closeStreamRecord)); @@ -102,7 +102,7 @@ public void isChangeStreamMutationTest() { ChangeStreamMutation.createGcMutation( ByteString.copyFromUtf8("key"), FAKE_COMMIT_TIMESTAMP, 0) .setToken("token") - .setEstimatedLowWatermark(FAKE_LOW_WATERMARK_THREETEN) + .setEstimatedLowWatermarkTime(FAKE_LOW_WATERMARK) .build(); Assert.assertFalse(adapter.isChangeStreamMutation(heartbeatRecord)); Assert.assertFalse(adapter.isChangeStreamMutation(closeStreamRecord)); @@ -115,7 +115,7 @@ public void getTokenFromChangeStreamMutationTest() { ChangeStreamMutation.createGcMutation( ByteString.copyFromUtf8("key"), FAKE_COMMIT_TIMESTAMP, 0) .setToken("change-stream-mutation-token") - .setEstimatedLowWatermark(FAKE_LOW_WATERMARK_THREETEN) + .setEstimatedLowWatermarkTime(FAKE_LOW_WATERMARK) .build(); Assert.assertEquals( adapter.getTokenFromChangeStreamMutation(changeStreamMutationRecord), @@ -189,7 +189,7 @@ public void singleDeleteFamilyTest() { ByteString.copyFromUtf8("key"), "fake-source-cluster-id", FAKE_COMMIT_TIMESTAMP, 0) .deleteFamily("fake-family") .setToken("fake-token") - .setEstimatedLowWatermark(FAKE_LOW_WATERMARK_THREETEN) + .setEstimatedLowWatermarkTime(FAKE_LOW_WATERMARK) .build(); // Create the ChangeStreamMutation through the ChangeStreamRecordBuilder. @@ -228,7 +228,7 @@ public void singleDeleteCellTest() { ByteString.copyFromUtf8("fake-qualifier"), Range.TimestampRange.create(1000L, 2000L)) .setToken("fake-token") - .setEstimatedLowWatermark(FAKE_LOW_WATERMARK_THREETEN) + .setEstimatedLowWatermarkTime(FAKE_LOW_WATERMARK) .build(); // Create the ChangeStreamMutation through the ChangeStreamRecordBuilder. @@ -261,7 +261,7 @@ public void singleNonChunkedCellTest() { 100L, ByteString.copyFromUtf8("fake-value")) .setToken("fake-token") - .setEstimatedLowWatermark(FAKE_LOW_WATERMARK_THREETEN) + .setEstimatedLowWatermarkTime(FAKE_LOW_WATERMARK) .build(); // Create the ChangeStreamMutation through the ChangeStreamRecordBuilder. @@ -293,7 +293,7 @@ public void singleChunkedCellTest() { 100L, ByteString.copyFromUtf8("fake-value1-value2")) .setToken("fake-token") - .setEstimatedLowWatermark(FAKE_LOW_WATERMARK_THREETEN) + .setEstimatedLowWatermarkTime(FAKE_LOW_WATERMARK) .build(); // Create the ChangeStreamMutation through the ChangeStreamRecordBuilder. @@ -330,7 +330,7 @@ public void multipleChunkedCellsTest() { } expectedChangeStreamMutationBuilder .setToken("fake-token") - .setEstimatedLowWatermark(FAKE_LOW_WATERMARK_THREETEN); + .setEstimatedLowWatermarkTime(FAKE_LOW_WATERMARK); // Create the ChangeStreamMutation through the ChangeStreamRecordBuilder. changeStreamRecordBuilder.startUserMutation( @@ -372,7 +372,7 @@ public void multipleDifferentModsTest() { 100L, ByteString.copyFromUtf8("chunked-value")) .setToken("fake-token") - .setEstimatedLowWatermark(FAKE_LOW_WATERMARK_THREETEN); + .setEstimatedLowWatermarkTime(FAKE_LOW_WATERMARK); // Create the ChangeStreamMutation through the ChangeStreamRecordBuilder. changeStreamRecordBuilder.startUserMutation( @@ -421,7 +421,7 @@ public void resetTest() { ByteString.copyFromUtf8("key"), "fake-source-cluster-id", FAKE_COMMIT_TIMESTAMP, 0) .deleteFamily("fake-family") .setToken("fake-token") - .setEstimatedLowWatermark(FAKE_LOW_WATERMARK_THREETEN) + .setEstimatedLowWatermarkTime(FAKE_LOW_WATERMARK) .build(); changeStreamRecordBuilder.startUserMutation( ByteString.copyFromUtf8("key"), "fake-source-cluster-id", FAKE_COMMIT_TIMESTAMP, 0); @@ -441,7 +441,7 @@ public void resetTest() { 100L, ByteString.copyFromUtf8("fake-value1-value2")) .setToken("fake-token") - .setEstimatedLowWatermark(FAKE_LOW_WATERMARK_THREETEN) + .setEstimatedLowWatermarkTime(FAKE_LOW_WATERMARK) .build(); changeStreamRecordBuilder.startUserMutation( diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/changestream/ChangeStreamRecordMergingCallableTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/changestream/ChangeStreamRecordMergingCallableTest.java index 58cc09a61..a5201770e 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/changestream/ChangeStreamRecordMergingCallableTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/changestream/ChangeStreamRecordMergingCallableTest.java @@ -85,7 +85,7 @@ public void heartbeatTest() { Instant.ofEpochSecond( heartbeatProto.getEstimatedLowWatermark().getSeconds(), heartbeatProto.getEstimatedLowWatermark().getNanos())); - assertThat(heartbeat.getEstimatedLowWatermarkInstant()) + assertThat(heartbeat.getEstimatedLowWatermarkTime()) .isEqualTo( java.time.Instant.ofEpochSecond( heartbeatProto.getEstimatedLowWatermark().getSeconds(),