Skip to content

Commit

Permalink
Add test code to overwrite SQL in Beam Python JDBC (#30417)
Browse files Browse the repository at this point in the history
* Add python jdbc test with override query

* fix jdbcio it test lint
  • Loading branch information
case-k-git authored Mar 4, 2024
1 parent 15d16f7 commit 2b8a737
Showing 1 changed file with 22 additions and 2 deletions.
24 changes: 22 additions & 2 deletions sdks/python/apache_beam/io/external/xlang_jdbcio_it_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
from parameterized import parameterized

import apache_beam as beam
from apache_beam import DoFn
from apache_beam import ParDo
from apache_beam import coders
from apache_beam.io.jdbc import ReadFromJdbc
from apache_beam.io.jdbc import WriteToJdbc
Expand Down Expand Up @@ -197,8 +199,6 @@ def test_xlang_jdbc_write_read(self, database):
p.not_use_test_runner_api = True
result = (
p
# TODO(https://github.com/apache/beam/issues/20446) Add test with
# overridden read_query
| 'Read from jdbc' >> ReadFromJdbc(
table_name=table_name,
driver_class_name=self.driver_class_name,
Expand All @@ -209,6 +209,26 @@ def test_xlang_jdbc_write_read(self, database):

assert_that(result, equal_to(expected_row))

with TestPipeline() as p:
p.not_use_test_runner_api = True

class ExtractCount(DoFn):
def process(self, element):
yield element[0]

result = (
p
| 'Read from jdbc override query' >> ReadFromJdbc(
table_name=table_name,
query=f'select count(*) from {table_name}',
driver_class_name=self.driver_class_name,
jdbc_url=self.jdbc_url,
username=self.username,
password=self.password,
classpath=classpath)
| 'ExtractCount' >> ParDo(ExtractCount()))
assert_that(result, equal_to([ROW_COUNT]))

# Try the same read using the partitioned reader code path.
# Outputs should be the same.
with TestPipeline() as p:
Expand Down

0 comments on commit 2b8a737

Please sign in to comment.