From 18255573b8d9e5d5300a5e09751f27d01debed41 Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Wed, 18 Dec 2024 16:05:08 -0500 Subject: [PATCH] cleanup --- .../apache_beam/io/external/xlang_kinesisio_it_test.py | 7 +------ sdks/python/apache_beam/io/kinesis.py | 6 ++++-- 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/sdks/python/apache_beam/io/external/xlang_kinesisio_it_test.py b/sdks/python/apache_beam/io/external/xlang_kinesisio_it_test.py index 151d63d84684..44e6b78f4e93 100644 --- a/sdks/python/apache_beam/io/external/xlang_kinesisio_it_test.py +++ b/sdks/python/apache_beam/io/external/xlang_kinesisio_it_test.py @@ -116,8 +116,7 @@ def run_kinesis_write(self): region=self.aws_region, service_endpoint=self.aws_service_endpoint, verify_certificate=(not self.use_localstack), - partition_key='1', - producer_properties=self.producer_properties, + partition_key='1' )) def run_kinesis_read(self): @@ -219,10 +218,6 @@ def setUp(self): self.aws_service_endpoint = known_args.aws_service_endpoint self.use_localstack = not known_args.use_real_aws self.expansion_service = known_args.expansion_service - self.producer_properties = { - 'CollectionMaxCount': str(NUM_RECORDS), - 'ConnectTimeout': str(MAX_READ_TIME), - } if self.use_localstack: self.set_localstack() diff --git a/sdks/python/apache_beam/io/kinesis.py b/sdks/python/apache_beam/io/kinesis.py index 0be98c122f6e..1640b25830b4 100644 --- a/sdks/python/apache_beam/io/kinesis.py +++ b/sdks/python/apache_beam/io/kinesis.py @@ -112,7 +112,6 @@ def default_io_expansion_service(): ('region', str), ('partition_key', str), ('service_endpoint', Optional[str]), - ('producer_properties', Optional[Mapping[str, str]]), ], ) @@ -165,6 +164,10 @@ def __init__( 'verify_certificate set to True. This option is no longer ' + 'supported and certificate verification will automatically happen. ' + 'This option may be removed in a future release') + if producer_properties is not None: + raise ValueError( + 'producer_properties is no longer supported and will be removed in ' + + 'a future release.') super().__init__( self.URN, NamedTupleBasedPayloadBuilder( @@ -175,7 +178,6 @@ def __init__( region=region, partition_key=partition_key, service_endpoint=service_endpoint, - producer_properties=producer_properties, )), expansion_service or default_io_expansion_service(), )