From 06501e735ee5d811f94b28f8b6f960e622d1dcc8 Mon Sep 17 00:00:00 2001 From: Diego Alonso Marquez Palacios Date: Thu, 12 Dec 2024 10:40:52 -0500 Subject: [PATCH] Revert "convert to internal changes in change stream classes" This reverts commit 51657d8b5380ab0246d26a4c66dae4700173fdc0. --- .../data/v2/models/ChangeStreamMutation.java | 41 ++++++++++++++++--- .../DefaultChangeStreamRecordAdapter.java | 2 +- .../bigtable/data/v2/models/Heartbeat.java | 21 +++++++--- .../v2/models/ChangeStreamMutationTest.java | 35 ++++++++++------ .../v2/models/ChangeStreamRecordTest.java | 2 +- .../DefaultChangeStreamRecordAdapterTest.java | 33 ++++++++------- ...ChangeStreamRecordMergingCallableTest.java | 6 +-- 7 files changed, 96 insertions(+), 44 deletions(-) diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamMutation.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamMutation.java index 339cb68a8..1defc67c5 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamMutation.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamMutation.java @@ -15,7 +15,11 @@ */ package com.google.cloud.bigtable.data.v2.models; +import static com.google.api.gax.util.TimeConversionUtils.toJavaTimeInstant; +import static com.google.api.gax.util.TimeConversionUtils.toThreetenInstant; + import com.google.api.core.InternalApi; +import com.google.api.core.ObsoleteApi; import com.google.auto.value.AutoValue; import com.google.cloud.bigtable.data.v2.models.Range.TimestampRange; import com.google.cloud.bigtable.data.v2.stub.changestream.ChangeStreamRecordMerger; @@ -78,7 +82,7 @@ static Builder createUserMutation( .setRowKey(rowKey) .setType(MutationType.USER) .setSourceClusterId(sourceClusterId) - .setCommitTimestamp(commitTimestamp) + .setCommitTime(commitTimestamp) .setTieBreaker(tieBreaker); } @@ -93,7 +97,7 @@ static Builder createGcMutation( .setRowKey(rowKey) .setType(MutationType.GARBAGE_COLLECTION) .setSourceClusterId("") - .setCommitTimestamp(commitTimestamp) + .setCommitTime(commitTimestamp) .setTieBreaker(tieBreaker); } @@ -109,7 +113,14 @@ static Builder createGcMutation( @Nonnull public abstract String getSourceClusterId(); - public abstract java.time.Instant getCommitTimestamp(); + /** This method is obsolete. Use {@link #getCommitTime()} instead. */ + @ObsoleteApi("Use getCommitTime() instead") + public abstract org.threeten.bp.Instant getCommitTimestamp(); + + /** Get the commit timestamp of the current mutation. */ + public java.time.Instant getCommitTime() { + return toJavaTimeInstant(getCommitTimestamp()); + } /** * Get the tie breaker of the current mutation. This is used to resolve conflicts when multiple @@ -121,8 +132,14 @@ static Builder createGcMutation( @Nonnull public abstract String getToken(); + /** This method is obsolete. Use {@link #getEstimatedLowWatermarkTime()} instead. */ + @ObsoleteApi("Use getEstimatedLowWatermarkTime() instead") + public abstract org.threeten.bp.Instant getEstimatedLowWatermark(); + /** Get the low watermark of the current mutation. */ - public abstract java.time.Instant getEstimatedLowWatermark(); + public java.time.Instant getEstimatedLowWatermarkTime() { + return toJavaTimeInstant(getEstimatedLowWatermark()); + } /** Get the list of mods of the current mutation. */ @Nonnull @@ -143,7 +160,13 @@ abstract static class Builder { abstract Builder setSourceClusterId(@Nonnull String sourceClusterId); - abstract Builder setCommitTimestamp(java.time.Instant commitTimestamp); + Builder setCommitTime(java.time.Instant commitTimestamp) { + return setCommitTimestamp(toThreetenInstant(commitTimestamp)); + } + + /** This method is obsolete. Use {@link #setCommitTime(java.time.Instant)} instead. */ + @ObsoleteApi("Use setCommitTime(java.time.Instant) instead") + abstract Builder setCommitTimestamp(org.threeten.bp.Instant commitTimestamp); abstract Builder setTieBreaker(int tieBreaker); @@ -151,7 +174,13 @@ abstract static class Builder { abstract Builder setToken(@Nonnull String token); - abstract Builder setEstimatedLowWatermark(java.time.Instant estimatedLowWatermark); + Builder setLowWatermarkTime(java.time.Instant estimatedLowWatermark) { + return setEstimatedLowWatermark(toThreetenInstant(estimatedLowWatermark)); + } + + /** This method is obsolete. Use {@link #setLowWatermarkTime(java.time.Instant)} instead. */ + @ObsoleteApi("Use setEstimatedLowWatermarkInstant(java.time.Instant) instead") + abstract Builder setEstimatedLowWatermark(org.threeten.bp.Instant estimatedLowWatermark); Builder setCell( @Nonnull String familyName, diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/DefaultChangeStreamRecordAdapter.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/DefaultChangeStreamRecordAdapter.java index 56394ad31..217caa93f 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/DefaultChangeStreamRecordAdapter.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/DefaultChangeStreamRecordAdapter.java @@ -177,7 +177,7 @@ public void finishCell() { public ChangeStreamRecord finishChangeStreamMutation( String token, Instant estimatedLowWatermark) { this.changeStreamMutationBuilder.setToken(token); - this.changeStreamMutationBuilder.setEstimatedLowWatermark(estimatedLowWatermark); + this.changeStreamMutationBuilder.setLowWatermarkTime(estimatedLowWatermark); return this.changeStreamMutationBuilder.build(); } diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/Heartbeat.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/Heartbeat.java index 9f38b604f..0cf792ee1 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/Heartbeat.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/Heartbeat.java @@ -15,11 +15,14 @@ */ package com.google.cloud.bigtable.data.v2.models; +import static com.google.api.gax.util.TimeConversionUtils.toJavaTimeInstant; +import static com.google.api.gax.util.TimeConversionUtils.toThreetenInstant; + import com.google.api.core.InternalApi; +import com.google.api.core.ObsoleteApi; import com.google.auto.value.AutoValue; import com.google.bigtable.v2.ReadChangeStreamResponse; import java.io.Serializable; -import java.time.Instant; import javax.annotation.Nonnull; /** A simple wrapper for {@link ReadChangeStreamResponse.Heartbeat}. */ @@ -29,15 +32,17 @@ public abstract class Heartbeat implements ChangeStreamRecord, Serializable { private static final long serialVersionUID = 7316215828353608504L; private static Heartbeat create( - ChangeStreamContinuationToken changeStreamContinuationToken, Instant estimatedLowWatermark) { - return new AutoValue_Heartbeat(changeStreamContinuationToken, estimatedLowWatermark); + ChangeStreamContinuationToken changeStreamContinuationToken, + java.time.Instant estimatedLowWatermark) { + return new AutoValue_Heartbeat( + changeStreamContinuationToken, toThreetenInstant(estimatedLowWatermark)); } /** Wraps the protobuf {@link ReadChangeStreamResponse.Heartbeat}. */ static Heartbeat fromProto(@Nonnull ReadChangeStreamResponse.Heartbeat heartbeat) { return create( ChangeStreamContinuationToken.fromProto(heartbeat.getContinuationToken()), - Instant.ofEpochSecond( + java.time.Instant.ofEpochSecond( heartbeat.getEstimatedLowWatermark().getSeconds(), heartbeat.getEstimatedLowWatermark().getNanos())); } @@ -45,6 +50,12 @@ static Heartbeat fromProto(@Nonnull ReadChangeStreamResponse.Heartbeat heartbeat @InternalApi("Intended for use by the BigtableIO in apache/beam only.") public abstract ChangeStreamContinuationToken getChangeStreamContinuationToken(); + /** This method is obsolete. Use {@link #getEstimatedLowWatermarkInstant()} instead. */ + @ObsoleteApi("Use getEstimatedLowWatermarkInstant() instead") + public abstract org.threeten.bp.Instant getEstimatedLowWatermark(); + @InternalApi("Intended for use by the BigtableIO in apache/beam only.") - public abstract Instant getEstimatedLowWatermark(); + public java.time.Instant getEstimatedLowWatermarkInstant() { + return toJavaTimeInstant(getEstimatedLowWatermark()); + } } diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamMutationTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamMutationTest.java index 05514da9f..966da60c8 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamMutationTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamMutationTest.java @@ -15,6 +15,7 @@ */ package com.google.cloud.bigtable.data.v2.models; +import static com.google.api.gax.util.TimeConversionUtils.toThreetenInstant; import static com.google.common.truth.Truth.assertThat; import com.google.bigtable.v2.MutateRowRequest; @@ -45,6 +46,10 @@ public class ChangeStreamMutationTest { RequestContext.create(PROJECT_ID, INSTANCE_ID, APP_PROFILE_ID); private static final Instant FAKE_COMMIT_TIMESTAMP = Instant.ofEpochSecond(0, 1000L); private static final Instant FAKE_LOW_WATERMARK = Instant.ofEpochSecond(0, 2000L); + private static final org.threeten.bp.Instant FAKE_COMMIT_TIMESTAMP_THREETEN = + toThreetenInstant(FAKE_COMMIT_TIMESTAMP); + private static final org.threeten.bp.Instant FAKE_LOW_WATERMARK_THREETEN = + toThreetenInstant(FAKE_LOW_WATERMARK); @Test public void userInitiatedMutationTest() throws IOException, ClassNotFoundException { @@ -73,18 +78,20 @@ public void userInitiatedMutationTest() throws IOException, ClassNotFoundExcepti Value.rawTimestamp(1000), Value.rawValue(ByteString.copyFrom(Longs.toByteArray(1234L)))) .setToken("fake-token") - .setEstimatedLowWatermark(FAKE_LOW_WATERMARK) + .setLowWatermarkTime(FAKE_LOW_WATERMARK) .build(); // Test the getters. assertThat(changeStreamMutation.getRowKey()).isEqualTo(ByteString.copyFromUtf8("key")); assertThat(changeStreamMutation.getType()).isEqualTo(ChangeStreamMutation.MutationType.USER); assertThat(changeStreamMutation.getSourceClusterId()).isEqualTo("fake-source-cluster-id"); - assertThat(changeStreamMutation.getCommitTimestamp()).isEqualTo(FAKE_COMMIT_TIMESTAMP); + assertThat(changeStreamMutation.getCommitTime()).isEqualTo(FAKE_COMMIT_TIMESTAMP); + assertThat(changeStreamMutation.getCommitTimestamp()).isEqualTo(FAKE_COMMIT_TIMESTAMP_THREETEN); assertThat(changeStreamMutation.getTieBreaker()).isEqualTo(0); assertThat(changeStreamMutation.getToken()).isEqualTo("fake-token"); - assertThat(changeStreamMutation.getEstimatedLowWatermark()).isEqualTo(FAKE_LOW_WATERMARK); - assertThat(changeStreamMutation.getEstimatedLowWatermark()).isEqualTo(FAKE_LOW_WATERMARK); + assertThat(changeStreamMutation.getEstimatedLowWatermarkTime()).isEqualTo(FAKE_LOW_WATERMARK); + assertThat(changeStreamMutation.getEstimatedLowWatermark()) + .isEqualTo(FAKE_LOW_WATERMARK_THREETEN); // Test serialization. ByteArrayOutputStream bos = new ByteArrayOutputStream(); @@ -113,7 +120,7 @@ public void gcMutationTest() throws IOException, ClassNotFoundException { ByteString.copyFromUtf8("fake-qualifier"), Range.TimestampRange.create(1000L, 2000L)) .setToken("fake-token") - .setEstimatedLowWatermark(FAKE_LOW_WATERMARK) + .setEstimatedLowWatermark(FAKE_LOW_WATERMARK_THREETEN) .build(); // Test the getters. @@ -121,11 +128,13 @@ public void gcMutationTest() throws IOException, ClassNotFoundException { assertThat(changeStreamMutation.getType()) .isEqualTo(ChangeStreamMutation.MutationType.GARBAGE_COLLECTION); Assert.assertTrue(changeStreamMutation.getSourceClusterId().isEmpty()); - assertThat(changeStreamMutation.getCommitTimestamp()).isEqualTo(FAKE_COMMIT_TIMESTAMP); + assertThat(changeStreamMutation.getCommitTime()).isEqualTo(FAKE_COMMIT_TIMESTAMP); + assertThat(changeStreamMutation.getCommitTimestamp()).isEqualTo(FAKE_COMMIT_TIMESTAMP_THREETEN); assertThat(changeStreamMutation.getTieBreaker()).isEqualTo(0); assertThat(changeStreamMutation.getToken()).isEqualTo("fake-token"); - assertThat(changeStreamMutation.getEstimatedLowWatermark()).isEqualTo(FAKE_LOW_WATERMARK); - assertThat(changeStreamMutation.getEstimatedLowWatermark()).isEqualTo(FAKE_LOW_WATERMARK); + assertThat(changeStreamMutation.getEstimatedLowWatermarkTime()).isEqualTo(FAKE_LOW_WATERMARK); + assertThat(changeStreamMutation.getEstimatedLowWatermark()) + .isEqualTo(FAKE_LOW_WATERMARK_THREETEN); // Test serialization. ByteArrayOutputStream bos = new ByteArrayOutputStream(); @@ -163,7 +172,7 @@ public void toRowMutationTest() { Value.rawTimestamp(1000), Value.rawValue(ByteString.copyFrom(Longs.toByteArray(1234L)))) .setToken("fake-token") - .setEstimatedLowWatermark(FAKE_LOW_WATERMARK) + .setEstimatedLowWatermark(FAKE_LOW_WATERMARK_THREETEN) .build(); // Convert it to a rowMutation and construct a MutateRowRequest. @@ -206,7 +215,7 @@ public void toRowMutationWithoutTokenShouldFailTest() { ChangeStreamMutation.createUserMutation( ByteString.copyFromUtf8("key"), "fake-source-cluster-id", FAKE_COMMIT_TIMESTAMP, 0) .deleteFamily("fake-family") - .setEstimatedLowWatermark(FAKE_LOW_WATERMARK); + .setEstimatedLowWatermark(FAKE_LOW_WATERMARK_THREETEN); Assert.assertThrows(IllegalStateException.class, builder::build); } @@ -246,7 +255,7 @@ public void toRowMutationEntryTest() { Value.rawTimestamp(1000), Value.rawValue(ByteString.copyFrom(Longs.toByteArray(1234L)))) .setToken("fake-token") - .setEstimatedLowWatermark(FAKE_LOW_WATERMARK) + .setEstimatedLowWatermark(FAKE_LOW_WATERMARK_THREETEN) .build(); // Convert it to a rowMutationEntry and construct a MutateRowRequest. @@ -286,7 +295,7 @@ public void toRowMutationEntryWithoutTokenShouldFailTest() { ChangeStreamMutation.createUserMutation( ByteString.copyFromUtf8("key"), "fake-source-cluster-id", FAKE_COMMIT_TIMESTAMP, 0) .deleteFamily("fake-family") - .setEstimatedLowWatermark(FAKE_LOW_WATERMARK); + .setEstimatedLowWatermark(FAKE_LOW_WATERMARK_THREETEN); Assert.assertThrows(IllegalStateException.class, builder::build); } @@ -311,7 +320,7 @@ public void testWithLongValue() { 1000L, ByteString.copyFrom(Longs.toByteArray(1L))) .setToken("fake-token") - .setEstimatedLowWatermark(FAKE_LOW_WATERMARK) + .setEstimatedLowWatermark(FAKE_LOW_WATERMARK_THREETEN) .build(); RowMutation rowMutation = changeStreamMutation.toRowMutation(TABLE_ID); diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamRecordTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamRecordTest.java index 888a9ee19..14697f430 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamRecordTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamRecordTest.java @@ -129,7 +129,7 @@ public void heartbeatTest() { .build(); Heartbeat actualHeartbeat = Heartbeat.fromProto(heartbeatProto); - assertThat(actualHeartbeat.getEstimatedLowWatermark()) + assertThat(actualHeartbeat.getEstimatedLowWatermarkInstant()) .isEqualTo(Instant.ofEpochSecond(lowWatermark.getSeconds(), lowWatermark.getNanos())); assertThat(actualHeartbeat.getChangeStreamContinuationToken().getPartition()) .isEqualTo(ByteStringRange.create(rowRange.getStartKeyClosed(), rowRange.getEndKeyOpen())); diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/DefaultChangeStreamRecordAdapterTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/DefaultChangeStreamRecordAdapterTest.java index f6146d794..80b4b15b2 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/DefaultChangeStreamRecordAdapterTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/DefaultChangeStreamRecordAdapterTest.java @@ -15,6 +15,7 @@ */ package com.google.cloud.bigtable.data.v2.models; +import static com.google.api.gax.util.TimeConversionUtils.toThreetenInstant; import static com.google.common.truth.Truth.assertThat; import com.google.bigtable.v2.Mutation; @@ -41,6 +42,8 @@ public class DefaultChangeStreamRecordAdapterTest { private ChangeStreamRecordBuilder changeStreamRecordBuilder; private static final Instant FAKE_COMMIT_TIMESTAMP = Instant.ofEpochSecond(0L, 1000L); private static final Instant FAKE_LOW_WATERMARK = Instant.ofEpochSecond(0L, 2000L); + private static final org.threeten.bp.Instant FAKE_LOW_WATERMARK_THREETEN = + toThreetenInstant(FAKE_LOW_WATERMARK); @Rule public ExpectedException expect = ExpectedException.none(); @@ -59,7 +62,7 @@ public void isHeartbeatTest() { ChangeStreamMutation.createGcMutation( ByteString.copyFromUtf8("key"), FAKE_COMMIT_TIMESTAMP, 0) .setToken("token") - .setEstimatedLowWatermark(FAKE_LOW_WATERMARK) + .setEstimatedLowWatermark(FAKE_LOW_WATERMARK_THREETEN) .build(); Assert.assertTrue(adapter.isHeartbeat(heartbeatRecord)); Assert.assertFalse(adapter.isHeartbeat(closeStreamRecord)); @@ -73,8 +76,8 @@ public void getTokenFromHeartbeatTest() { ReadChangeStreamResponse.Heartbeat.newBuilder() .setEstimatedLowWatermark( Timestamp.newBuilder() - .setSeconds(FAKE_LOW_WATERMARK.getEpochSecond()) - .setNanos(FAKE_LOW_WATERMARK.getNano())) + .setSeconds(FAKE_LOW_WATERMARK_THREETEN.getEpochSecond()) + .setNanos(FAKE_LOW_WATERMARK_THREETEN.getNano())) .setContinuationToken( StreamContinuationToken.newBuilder().setToken("heartbeat-token").build()) .build()); @@ -99,7 +102,7 @@ public void isChangeStreamMutationTest() { ChangeStreamMutation.createGcMutation( ByteString.copyFromUtf8("key"), FAKE_COMMIT_TIMESTAMP, 0) .setToken("token") - .setEstimatedLowWatermark(FAKE_LOW_WATERMARK) + .setEstimatedLowWatermark(FAKE_LOW_WATERMARK_THREETEN) .build(); Assert.assertFalse(adapter.isChangeStreamMutation(heartbeatRecord)); Assert.assertFalse(adapter.isChangeStreamMutation(closeStreamRecord)); @@ -112,7 +115,7 @@ public void getTokenFromChangeStreamMutationTest() { ChangeStreamMutation.createGcMutation( ByteString.copyFromUtf8("key"), FAKE_COMMIT_TIMESTAMP, 0) .setToken("change-stream-mutation-token") - .setEstimatedLowWatermark(FAKE_LOW_WATERMARK) + .setEstimatedLowWatermark(FAKE_LOW_WATERMARK_THREETEN) .build(); Assert.assertEquals( adapter.getTokenFromChangeStreamMutation(changeStreamMutationRecord), @@ -133,8 +136,8 @@ public void heartbeatTest() { ReadChangeStreamResponse.Heartbeat.newBuilder() .setEstimatedLowWatermark( Timestamp.newBuilder() - .setSeconds(FAKE_LOW_WATERMARK.getEpochSecond()) - .setNanos(FAKE_LOW_WATERMARK.getNano()) + .setSeconds(FAKE_LOW_WATERMARK_THREETEN.getEpochSecond()) + .setNanos(FAKE_LOW_WATERMARK_THREETEN.getNano()) .build()) .setContinuationToken( StreamContinuationToken.newBuilder().setToken("random-token").build()) @@ -186,7 +189,7 @@ public void singleDeleteFamilyTest() { ByteString.copyFromUtf8("key"), "fake-source-cluster-id", FAKE_COMMIT_TIMESTAMP, 0) .deleteFamily("fake-family") .setToken("fake-token") - .setEstimatedLowWatermark(FAKE_LOW_WATERMARK) + .setEstimatedLowWatermark(FAKE_LOW_WATERMARK_THREETEN) .build(); // Create the ChangeStreamMutation through the ChangeStreamRecordBuilder. @@ -225,7 +228,7 @@ public void singleDeleteCellTest() { ByteString.copyFromUtf8("fake-qualifier"), Range.TimestampRange.create(1000L, 2000L)) .setToken("fake-token") - .setEstimatedLowWatermark(FAKE_LOW_WATERMARK) + .setEstimatedLowWatermark(FAKE_LOW_WATERMARK_THREETEN) .build(); // Create the ChangeStreamMutation through the ChangeStreamRecordBuilder. @@ -258,7 +261,7 @@ public void singleNonChunkedCellTest() { 100L, ByteString.copyFromUtf8("fake-value")) .setToken("fake-token") - .setEstimatedLowWatermark(FAKE_LOW_WATERMARK) + .setEstimatedLowWatermark(FAKE_LOW_WATERMARK_THREETEN) .build(); // Create the ChangeStreamMutation through the ChangeStreamRecordBuilder. @@ -290,7 +293,7 @@ public void singleChunkedCellTest() { 100L, ByteString.copyFromUtf8("fake-value1-value2")) .setToken("fake-token") - .setEstimatedLowWatermark(FAKE_LOW_WATERMARK) + .setEstimatedLowWatermark(FAKE_LOW_WATERMARK_THREETEN) .build(); // Create the ChangeStreamMutation through the ChangeStreamRecordBuilder. @@ -327,7 +330,7 @@ public void multipleChunkedCellsTest() { } expectedChangeStreamMutationBuilder .setToken("fake-token") - .setEstimatedLowWatermark(FAKE_LOW_WATERMARK); + .setEstimatedLowWatermark(FAKE_LOW_WATERMARK_THREETEN); // Create the ChangeStreamMutation through the ChangeStreamRecordBuilder. changeStreamRecordBuilder.startUserMutation( @@ -369,7 +372,7 @@ public void multipleDifferentModsTest() { 100L, ByteString.copyFromUtf8("chunked-value")) .setToken("fake-token") - .setEstimatedLowWatermark(FAKE_LOW_WATERMARK); + .setEstimatedLowWatermark(FAKE_LOW_WATERMARK_THREETEN); // Create the ChangeStreamMutation through the ChangeStreamRecordBuilder. changeStreamRecordBuilder.startUserMutation( @@ -418,7 +421,7 @@ public void resetTest() { ByteString.copyFromUtf8("key"), "fake-source-cluster-id", FAKE_COMMIT_TIMESTAMP, 0) .deleteFamily("fake-family") .setToken("fake-token") - .setEstimatedLowWatermark(FAKE_LOW_WATERMARK) + .setEstimatedLowWatermark(FAKE_LOW_WATERMARK_THREETEN) .build(); changeStreamRecordBuilder.startUserMutation( ByteString.copyFromUtf8("key"), "fake-source-cluster-id", FAKE_COMMIT_TIMESTAMP, 0); @@ -438,7 +441,7 @@ public void resetTest() { 100L, ByteString.copyFromUtf8("fake-value1-value2")) .setToken("fake-token") - .setEstimatedLowWatermark(FAKE_LOW_WATERMARK) + .setEstimatedLowWatermark(FAKE_LOW_WATERMARK_THREETEN) .build(); changeStreamRecordBuilder.startUserMutation( diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/changestream/ChangeStreamRecordMergingCallableTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/changestream/ChangeStreamRecordMergingCallableTest.java index a966e1883..58cc09a61 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/changestream/ChangeStreamRecordMergingCallableTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/changestream/ChangeStreamRecordMergingCallableTest.java @@ -33,13 +33,13 @@ import com.google.protobuf.ByteString; import com.google.protobuf.Timestamp; import com.google.rpc.Status; -import java.time.Instant; import java.util.Collections; import java.util.List; import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import org.threeten.bp.Instant; /** * Additional tests in addition to {@link ReadChangeStreamMergingAcceptanceTest}. @@ -85,9 +85,9 @@ public void heartbeatTest() { Instant.ofEpochSecond( heartbeatProto.getEstimatedLowWatermark().getSeconds(), heartbeatProto.getEstimatedLowWatermark().getNanos())); - assertThat(heartbeat.getEstimatedLowWatermark()) + assertThat(heartbeat.getEstimatedLowWatermarkInstant()) .isEqualTo( - Instant.ofEpochSecond( + java.time.Instant.ofEpochSecond( heartbeatProto.getEstimatedLowWatermark().getSeconds(), heartbeatProto.getEstimatedLowWatermark().getNanos())); }