Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Fix Python Bigtable dataloss bug] Stop unsetting timestamps of -1 #28624

Merged
merged 9 commits into from
Sep 23, 2023
2 changes: 1 addition & 1 deletion sdks/python/apache_beam/io/gcp/bigtableio.py
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ def process(self, direct_row):
"value": mutation.set_cell.value
}
micros = mutation.set_cell.timestamp_micros
if micros > -1:
if micros >= -1:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this conditional? why isnt the timestamp just passed through?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I forgot why it was originally set up like this, but I don't see a reason for it

mutation_dict['timestamp_micros'] = struct.pack('>q', micros)
elif mutation.__contains__("delete_from_column"):
mutation_dict = {
Expand Down
26 changes: 22 additions & 4 deletions sdks/python/apache_beam/io/gcp/bigtableio_it_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,9 +144,9 @@ def test_read_xlang(self):

@pytest.mark.uses_gcp_java_expansion_service
@pytest.mark.uses_transform_service
@unittest.skipUnless(
os.environ.get('EXPANSION_PORT'),
"EXPANSION_PORT environment var is not provided.")
# @unittest.skipUnless(
# os.environ.get('EXPANSION_PORT'),
# "EXPANSION_PORT environment var is not provided.")
@unittest.skipIf(client is None, 'Bigtable dependencies are not installed')
class TestWriteToBigtableXlangIT(unittest.TestCase):
# These are integration tests for the cross-language write transform.
Expand All @@ -158,7 +158,7 @@ def setUpClass(cls):
cls.test_pipeline = TestPipeline(is_integration_test=True)
cls.project = cls.test_pipeline.get_option('project')
cls.args = cls.test_pipeline.get_full_options_as_args()
cls.expansion_service = ('localhost:%s' % os.environ.get('EXPANSION_PORT'))
cls.expansion_service = None #('localhost:%s' % os.environ.get('EXPANSION_PORT'))

instance_id = '%s-%s-%s' % (
cls.INSTANCE, str(int(time.time())), secrets.token_hex(3))
Expand Down Expand Up @@ -223,6 +223,9 @@ def test_set_mutation(self):
row1_col2_cell = Cell(b'val1-2', 200_000_000)
row2_col1_cell = Cell(b'val2-1', 100_000_000)
row2_col2_cell = Cell(b'val2-2', 200_000_000)
# When setting this cell, we won't set a timestamp. We expect the timestamp
# to default to -1, and Bigtable will set it to system time at insertion.
row2_col1_no_timestamp = Cell(b'val2-2-notimestamp', time.time())
# rows sent to write transform
row1.set_cell(
'col_fam', b'col-1', row1_col1_cell.value, row1_col1_cell.timestamp)
Expand All @@ -232,6 +235,8 @@ def test_set_mutation(self):
'col_fam', b'col-1', row2_col1_cell.value, row2_col1_cell.timestamp)
row2.set_cell(
'col_fam', b'col-2', row2_col2_cell.value, row2_col2_cell.timestamp)
# don't set a timestamp here. it should default to -1
row2.set_cell('col_fam', b'col-no-timestamp', row2_col1_no_timestamp.value)
ahmedabu98 marked this conversation as resolved.
Show resolved Hide resolved

self.run_pipeline([row1, row2])

Expand All @@ -249,6 +254,19 @@ def test_set_mutation(self):
self.assertEqual(
row2_col2_cell, actual_row2.find_cells('col_fam', b'col-2')[0])

# check cells that don't have a timestamp set are handled properly:
self.assertEqual(
row2_col1_no_timestamp.value,
actual_row2.find_cells('col_fam', b'col-no-timestamp')[0].value)
# Bigtable sets timestamp as insertion time, which is later than the
# time.time() we set when creating this test case
self.assertTrue(
row2_col1_no_timestamp.timestamp < actual_row2.find_cells(
'col_fam', b'col-no-timestamp')[0].timestamp,
msg="Expected cell with unset timestamp to have ingestion time attached, "
f"but was {actual_row2.find_cells('col_fam', b'col-no-timestamp')[0].timestamp}"
)

def test_delete_cells_mutation(self):
col_fam = self.table.column_family('col_fam')
col_fam.create()
Expand Down