diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteSchemaTransformProvider.java index d38bdae2f092..b99b69621a84 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteSchemaTransformProvider.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteSchemaTransformProvider.java @@ -179,12 +179,13 @@ public KV> apply(Row row) { .setColumnQualifier( ByteString.copyFrom(ofNullable(mutation.get("column_qualifier")).get())) .setFamilyNameBytes( - ByteString.copyFrom(ofNullable(mutation.get("family_name")).get())); - if (mutation.containsKey("timestamp_micros")) { - setMutation = - setMutation.setTimestampMicros( - Longs.fromByteArray(ofNullable(mutation.get("timestamp_micros")).get())); - } + ByteString.copyFrom(ofNullable(mutation.get("family_name")).get())) + // Use timestamp if provided, else default to -1 (current Bigtable server time) + .setTimestampMicros( + mutation.containsKey("timestamp_micros") + ? Longs.fromByteArray( + ofNullable(mutation.get("timestamp_micros")).get()) + : -1); bigtableMutation = Mutation.newBuilder().setSetCell(setMutation.build()).build(); break; case "DeleteFromColumn": diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteSchemaTransformProviderIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteSchemaTransformProviderIT.java index 14bb04b0315d..1a60fe661b52 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteSchemaTransformProviderIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteSchemaTransformProviderIT.java @@ -154,8 +154,8 @@ public void tearDown() { public void testSetMutationsExistingColumn() { RowMutation rowMutation = RowMutation.create(tableId, "key-1") - .setCell(COLUMN_FAMILY_NAME_1, "col_a", "val-1-a") - .setCell(COLUMN_FAMILY_NAME_2, "col_c", "val-1-c"); + .setCell(COLUMN_FAMILY_NAME_1, "col_a", 1000, "val-1-a") + .setCell(COLUMN_FAMILY_NAME_2, "col_c", 1000, "val-1-c"); dataClient.mutateRow(rowMutation); List> mutations = new ArrayList<>(); @@ -165,13 +165,15 @@ public void testSetMutationsExistingColumn() { "type", "SetCell".getBytes(StandardCharsets.UTF_8), "value", "new-val-1-a".getBytes(StandardCharsets.UTF_8), "column_qualifier", "col_a".getBytes(StandardCharsets.UTF_8), - "family_name", COLUMN_FAMILY_NAME_1.getBytes(StandardCharsets.UTF_8))); + "family_name", COLUMN_FAMILY_NAME_1.getBytes(StandardCharsets.UTF_8), + "timestamp_micros", Longs.toByteArray(2000))); mutations.add( ImmutableMap.of( "type", "SetCell".getBytes(StandardCharsets.UTF_8), "value", "new-val-1-c".getBytes(StandardCharsets.UTF_8), "column_qualifier", "col_c".getBytes(StandardCharsets.UTF_8), - "family_name", COLUMN_FAMILY_NAME_2.getBytes(StandardCharsets.UTF_8))); + "family_name", COLUMN_FAMILY_NAME_2.getBytes(StandardCharsets.UTF_8), + "timestamp_micros", Longs.toByteArray(2000))); Row mutationRow = Row.withSchema(SCHEMA) .withFieldValue("key", "key-1".getBytes(StandardCharsets.UTF_8)) @@ -202,10 +204,11 @@ public void testSetMutationsExistingColumn() { .collect(Collectors.toList()); assertEquals(2, cellsColA.size()); assertEquals(2, cellsColC.size()); - System.out.println(cellsColA); - System.out.println(cellsColC); - assertEquals("new-val-1-a", cellsColA.get(1).getValue().toStringUtf8()); - assertEquals("new-val-1-c", cellsColC.get(1).getValue().toStringUtf8()); + // Bigtable keeps cell history ordered by descending timestamp + assertEquals("new-val-1-a", cellsColA.get(0).getValue().toStringUtf8()); + assertEquals("new-val-1-c", cellsColC.get(0).getValue().toStringUtf8()); + assertEquals("val-1-a", cellsColA.get(1).getValue().toStringUtf8()); + assertEquals("val-1-c", cellsColC.get(1).getValue().toStringUtf8()); } @Test diff --git a/sdks/python/apache_beam/io/gcp/bigtableio.py b/sdks/python/apache_beam/io/gcp/bigtableio.py index b2b52bd675c5..f8534f38ddfc 100644 --- a/sdks/python/apache_beam/io/gcp/bigtableio.py +++ b/sdks/python/apache_beam/io/gcp/bigtableio.py @@ -252,11 +252,10 @@ def process(self, direct_row): "type": b'SetCell', "family_name": mutation.set_cell.family_name.encode('utf-8'), "column_qualifier": mutation.set_cell.column_qualifier, - "value": mutation.set_cell.value + "value": mutation.set_cell.value, + "timestamp_micros": struct.pack( + '>q', mutation.set_cell.timestamp_micros) } - micros = mutation.set_cell.timestamp_micros - if micros > -1: - mutation_dict['timestamp_micros'] = struct.pack('>q', micros) elif mutation.__contains__("delete_from_column"): mutation_dict = { "type": b'DeleteFromColumn', diff --git a/sdks/python/apache_beam/io/gcp/bigtableio_it_test.py b/sdks/python/apache_beam/io/gcp/bigtableio_it_test.py index 341f2983c8bc..f61e346cff9f 100644 --- a/sdks/python/apache_beam/io/gcp/bigtableio_it_test.py +++ b/sdks/python/apache_beam/io/gcp/bigtableio_it_test.py @@ -223,6 +223,9 @@ def test_set_mutation(self): row1_col2_cell = Cell(b'val1-2', 200_000_000) row2_col1_cell = Cell(b'val2-1', 100_000_000) row2_col2_cell = Cell(b'val2-2', 200_000_000) + # When setting this cell, we won't set a timestamp. We expect the timestamp + # to default to -1, and Bigtable will set it to system time at insertion. + row2_col1_no_timestamp = Cell(b'val2-2-notimestamp', time.time()) # rows sent to write transform row1.set_cell( 'col_fam', b'col-1', row1_col1_cell.value, row1_col1_cell.timestamp) @@ -232,6 +235,8 @@ def test_set_mutation(self): 'col_fam', b'col-1', row2_col1_cell.value, row2_col1_cell.timestamp) row2.set_cell( 'col_fam', b'col-2', row2_col2_cell.value, row2_col2_cell.timestamp) + # don't set a timestamp here. it should default to -1 + row2.set_cell('col_fam', b'col-no-timestamp', row2_col1_no_timestamp.value) self.run_pipeline([row1, row2]) @@ -249,6 +254,19 @@ def test_set_mutation(self): self.assertEqual( row2_col2_cell, actual_row2.find_cells('col_fam', b'col-2')[0]) + # check mutation that doesn't have a timestamp set is handled properly: + self.assertEqual( + row2_col1_no_timestamp.value, + actual_row2.find_cells('col_fam', b'col-no-timestamp')[0].value) + # Bigtable sets timestamp as insertion time, which is later than the + # time.time() we set when creating this test case + cell_timestamp = actual_row2.find_cells('col_fam', + b'col-no-timestamp')[0].timestamp + self.assertTrue( + row2_col1_no_timestamp.timestamp < cell_timestamp, + msg="Expected cell with unset timestamp to have ingestion time " + f"attached, but was {cell_timestamp}") + def test_delete_cells_mutation(self): col_fam = self.table.column_family('col_fam') col_fam.create()