From 491c1ac5c092d086250f15216dfdb23682534f8f Mon Sep 17 00:00:00 2001 From: ahmedabu98 Date: Fri, 22 Sep 2023 16:07:37 -0400 Subject: [PATCH] add fix and test error message --- sdks/python/apache_beam/io/gcp/bigtableio.py | 2 +- .../python/apache_beam/io/gcp/bigtableio_it_test.py | 13 ++++++++----- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/bigtableio.py b/sdks/python/apache_beam/io/gcp/bigtableio.py index b2b52bd675c5..f402e6d7b393 100644 --- a/sdks/python/apache_beam/io/gcp/bigtableio.py +++ b/sdks/python/apache_beam/io/gcp/bigtableio.py @@ -255,7 +255,7 @@ def process(self, direct_row): "value": mutation.set_cell.value } micros = mutation.set_cell.timestamp_micros - if micros > -1: + if micros >= -1: mutation_dict['timestamp_micros'] = struct.pack('>q', micros) elif mutation.__contains__("delete_from_column"): mutation_dict = { diff --git a/sdks/python/apache_beam/io/gcp/bigtableio_it_test.py b/sdks/python/apache_beam/io/gcp/bigtableio_it_test.py index 0f0613a1f2d8..c9506e2a8a66 100644 --- a/sdks/python/apache_beam/io/gcp/bigtableio_it_test.py +++ b/sdks/python/apache_beam/io/gcp/bigtableio_it_test.py @@ -144,9 +144,9 @@ def test_read_xlang(self): @pytest.mark.uses_gcp_java_expansion_service @pytest.mark.uses_transform_service -@unittest.skipUnless( - os.environ.get('EXPANSION_PORT'), - "EXPANSION_PORT environment var is not provided.") +# @unittest.skipUnless( +# os.environ.get('EXPANSION_PORT'), +# "EXPANSION_PORT environment var is not provided.") @unittest.skipIf(client is None, 'Bigtable dependencies are not installed') class TestWriteToBigtableXlangIT(unittest.TestCase): # These are integration tests for the cross-language write transform. @@ -158,7 +158,7 @@ def setUpClass(cls): cls.test_pipeline = TestPipeline(is_integration_test=True) cls.project = cls.test_pipeline.get_option('project') cls.args = cls.test_pipeline.get_full_options_as_args() - cls.expansion_service = ('localhost:%s' % os.environ.get('EXPANSION_PORT')) + cls.expansion_service = None #('localhost:%s' % os.environ.get('EXPANSION_PORT')) instance_id = '%s-%s-%s' % ( cls.INSTANCE, str(int(time.time())), secrets.token_hex(3)) @@ -262,7 +262,10 @@ def test_set_mutation(self): # time.time() we set when creating this test case self.assertTrue( row2_col1_no_timestamp.timestamp < actual_row2.find_cells( - 'col_fam', b'col-no-timestamp')[0].timestamp) + 'col_fam', b'col-no-timestamp')[0].timestamp, + msg="Expected cell with unset timestamp to have ingestion time attached, " + f"but was {actual_row2.find_cells('col_fam', b'col-no-timestamp')[0].timestamp}" + ) def test_delete_cells_mutation(self): col_fam = self.table.column_family('col_fam')