Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Fix Python Bigtable dataloss bug] Stop unsetting timestamps of -1 #28624

Merged
merged 9 commits into from
Sep 23, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -179,12 +179,13 @@ public KV<ByteString, Iterable<Mutation>> apply(Row row) {
.setColumnQualifier(
ByteString.copyFrom(ofNullable(mutation.get("column_qualifier")).get()))
.setFamilyNameBytes(
ByteString.copyFrom(ofNullable(mutation.get("family_name")).get()));
if (mutation.containsKey("timestamp_micros")) {
setMutation =
setMutation.setTimestampMicros(
Longs.fromByteArray(ofNullable(mutation.get("timestamp_micros")).get()));
}
ByteString.copyFrom(ofNullable(mutation.get("family_name")).get()))
// Use timestamp if provided, else default to -1 (current Bigtable server time)
.setTimestampMicros(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: would the most appropriate logic be more like this?

if (mutation.containsKey("timestamp_micros")) { 
  builder.setTimestampMicros(mutation.get("timestamp_micros"));
}

and so on probably for all the fields, since proto handling of optional fields is not usable

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could definitely add those checks, but want to point out that the input mutation map object is constructed here:

for mutation in direct_row._get_mutations():
if mutation.__contains__("set_cell"):
mutation_dict = {
"type": b'SetCell',
"family_name": mutation.set_cell.family_name.encode('utf-8'),
"column_qualifier": mutation.set_cell.column_qualifier,
"value": mutation.set_cell.value,
"timestamp_micros": struct.pack(
'>q', mutation.set_cell.timestamp_micros)
}

This process is internal, so we always expect these fields to exist (at least if we're talking xlang. I'm not aware of other ways SchemaTransforms are used). Users wouldn't be constructing their own Beam Row mutations

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just mean to be a true passthrough, we pass "not present" to "not present" instead of putting the logic for choosing a default value into our code, where it is duplicated and could get out of sync.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure I can agree with that. I can address this nit in another PR but for now the current changes should be okay to cherry-pick?

@Abacn this means we'd have to make sure the Go wrapper defaults timestamps to time at ingestion. Also templates don't use the schematransform AFAIK so it shouldn't matter there anyways right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@igorbernstein2 cc'ing you to this thread as well cuz I heard the Go wrapper is implemented by the Bigtable team and maybe you can weigh in on this

Copy link
Contributor

@Abacn Abacn Sep 26, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is due to a different/inconsistent behavior in Java an Python API. We cannot "pass "not present" to "not present"". For Python, If timestamp not set it defaults to -1: https://github.com/googleapis/python-bigtable/blob/e5af3597f45fc4c094c59abca876374f5a866c1b/google/cloud/bigtable/row.py#L164

For Java, if timestamp not set it defaults to 0 and causing problem

Arguably the Documentation for Java client asks user to set Timestamp and warns that it will defaults to 0 if unspecified: https://github.com/googleapis/java-bigtable/blob/15cd4868ff807513914095a3758134eaa14f0ea3/proto-google-cloud-bigtable-v2/src/main/java/com/google/bigtable/v2/Mutation.java#L902

Consequently the possible misuse (did not set Timestamp and then data loss) could still happen in Java BigtableIO with user constructed Mutation: #27022

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be a useful action item to add validation somehow to prevent unset timestamp

mutation.containsKey("timestamp_micros")
? Longs.fromByteArray(
ofNullable(mutation.get("timestamp_micros")).get())
: -1);
bigtableMutation = Mutation.newBuilder().setSetCell(setMutation.build()).build();
break;
case "DeleteFromColumn":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,8 @@ public void tearDown() {
public void testSetMutationsExistingColumn() {
RowMutation rowMutation =
RowMutation.create(tableId, "key-1")
.setCell(COLUMN_FAMILY_NAME_1, "col_a", "val-1-a")
.setCell(COLUMN_FAMILY_NAME_2, "col_c", "val-1-c");
.setCell(COLUMN_FAMILY_NAME_1, "col_a", 1000, "val-1-a")
.setCell(COLUMN_FAMILY_NAME_2, "col_c", 1000, "val-1-c");
dataClient.mutateRow(rowMutation);

List<Map<String, byte[]>> mutations = new ArrayList<>();
Expand All @@ -165,13 +165,15 @@ public void testSetMutationsExistingColumn() {
"type", "SetCell".getBytes(StandardCharsets.UTF_8),
"value", "new-val-1-a".getBytes(StandardCharsets.UTF_8),
"column_qualifier", "col_a".getBytes(StandardCharsets.UTF_8),
"family_name", COLUMN_FAMILY_NAME_1.getBytes(StandardCharsets.UTF_8)));
"family_name", COLUMN_FAMILY_NAME_1.getBytes(StandardCharsets.UTF_8),
"timestamp_micros", Longs.toByteArray(2000)));
mutations.add(
ImmutableMap.of(
"type", "SetCell".getBytes(StandardCharsets.UTF_8),
"value", "new-val-1-c".getBytes(StandardCharsets.UTF_8),
"column_qualifier", "col_c".getBytes(StandardCharsets.UTF_8),
"family_name", COLUMN_FAMILY_NAME_2.getBytes(StandardCharsets.UTF_8)));
"family_name", COLUMN_FAMILY_NAME_2.getBytes(StandardCharsets.UTF_8),
"timestamp_micros", Longs.toByteArray(2000)));
Row mutationRow =
Row.withSchema(SCHEMA)
.withFieldValue("key", "key-1".getBytes(StandardCharsets.UTF_8))
Expand Down Expand Up @@ -202,10 +204,11 @@ public void testSetMutationsExistingColumn() {
.collect(Collectors.toList());
assertEquals(2, cellsColA.size());
assertEquals(2, cellsColC.size());
System.out.println(cellsColA);
System.out.println(cellsColC);
assertEquals("new-val-1-a", cellsColA.get(1).getValue().toStringUtf8());
assertEquals("new-val-1-c", cellsColC.get(1).getValue().toStringUtf8());
// Bigtable keeps cell history ordered by descending timestamp
assertEquals("new-val-1-a", cellsColA.get(0).getValue().toStringUtf8());
assertEquals("new-val-1-c", cellsColC.get(0).getValue().toStringUtf8());
assertEquals("val-1-a", cellsColA.get(1).getValue().toStringUtf8());
assertEquals("val-1-c", cellsColC.get(1).getValue().toStringUtf8());
}

@Test
Expand Down
7 changes: 3 additions & 4 deletions sdks/python/apache_beam/io/gcp/bigtableio.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,11 +252,10 @@ def process(self, direct_row):
"type": b'SetCell',
"family_name": mutation.set_cell.family_name.encode('utf-8'),
"column_qualifier": mutation.set_cell.column_qualifier,
"value": mutation.set_cell.value
"value": mutation.set_cell.value,
"timestamp_micros": struct.pack(
'>q', mutation.set_cell.timestamp_micros)
}
micros = mutation.set_cell.timestamp_micros
if micros > -1:
mutation_dict['timestamp_micros'] = struct.pack('>q', micros)
elif mutation.__contains__("delete_from_column"):
mutation_dict = {
"type": b'DeleteFromColumn',
Expand Down
18 changes: 18 additions & 0 deletions sdks/python/apache_beam/io/gcp/bigtableio_it_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,9 @@ def test_set_mutation(self):
row1_col2_cell = Cell(b'val1-2', 200_000_000)
row2_col1_cell = Cell(b'val2-1', 100_000_000)
row2_col2_cell = Cell(b'val2-2', 200_000_000)
# When setting this cell, we won't set a timestamp. We expect the timestamp
# to default to -1, and Bigtable will set it to system time at insertion.
row2_col1_no_timestamp = Cell(b'val2-2-notimestamp', time.time())
# rows sent to write transform
row1.set_cell(
'col_fam', b'col-1', row1_col1_cell.value, row1_col1_cell.timestamp)
Expand All @@ -232,6 +235,8 @@ def test_set_mutation(self):
'col_fam', b'col-1', row2_col1_cell.value, row2_col1_cell.timestamp)
row2.set_cell(
'col_fam', b'col-2', row2_col2_cell.value, row2_col2_cell.timestamp)
# don't set a timestamp here. it should default to -1
row2.set_cell('col_fam', b'col-no-timestamp', row2_col1_no_timestamp.value)
ahmedabu98 marked this conversation as resolved.
Show resolved Hide resolved

self.run_pipeline([row1, row2])

Expand All @@ -249,6 +254,19 @@ def test_set_mutation(self):
self.assertEqual(
row2_col2_cell, actual_row2.find_cells('col_fam', b'col-2')[0])

# check mutation that doesn't have a timestamp set is handled properly:
self.assertEqual(
row2_col1_no_timestamp.value,
actual_row2.find_cells('col_fam', b'col-no-timestamp')[0].value)
# Bigtable sets timestamp as insertion time, which is later than the
# time.time() we set when creating this test case
cell_timestamp = actual_row2.find_cells('col_fam',
b'col-no-timestamp')[0].timestamp
self.assertTrue(
row2_col1_no_timestamp.timestamp < cell_timestamp,
msg="Expected cell with unset timestamp to have ingestion time "
f"attached, but was {cell_timestamp}")

def test_delete_cells_mutation(self):
col_fam = self.table.column_family('col_fam')
col_fam.create()
Expand Down