Skip to content

Commit

Permalink
Updates the test suite to use the transform service (#30605)
Browse files Browse the repository at this point in the history
* Updates the test suite to use the transform service

* Trigger the test suite

* Fix formatting

* Addressing reviewer comments
  • Loading branch information
chamikaramj authored Mar 13, 2024
1 parent 5a9a7d2 commit 06c76b4
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 7 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run"
}
16 changes: 14 additions & 2 deletions sdks/python/apache_beam/io/gcp/bigtableio_it_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,11 @@ def setUp(self):
self.table = self.instance.table(self.TABLE_ID)
self.table.create()
_LOGGER.info("Created table [%s]", self.table.table_id)
if (os.environ.get('TRANSFORM_SERVICE_PORT')):
self._transform_service_address = (
'localhost:' + os.environ.get('TRANSFORM_SERVICE_PORT'))
else:
self._transform_service_address = None

def tearDown(self):
try:
Expand Down Expand Up @@ -142,7 +147,8 @@ def test_read_xlang(self):
| bigtableio.ReadFromBigtable(
project_id=self.project,
instance_id=self.instance.instance_id,
table_id=self.table.table_id)
table_id=self.table.table_id,
expansion_service=self._transform_service_address)
| "Extract cells" >> beam.Map(lambda row: row._cells))

assert_that(cells, equal_to(expected_cells))
Expand Down Expand Up @@ -190,6 +196,11 @@ def setUp(self):
(self.TABLE_ID, str(int(time.time())), secrets.token_hex(3)))
self.table.create()
_LOGGER.info("Created table [%s]", self.table.table_id)
if (os.environ.get('TRANSFORM_SERVICE_PORT')):
self._transform_service_address = (
'localhost:' + os.environ.get('TRANSFORM_SERVICE_PORT'))
else:
self._transform_service_address = None

def tearDown(self):
try:
Expand All @@ -216,7 +227,8 @@ def run_pipeline(self, rows):
project_id=self.project,
instance_id=self.instance.instance_id,
table_id=self.table.table_id,
use_cross_language=True))
use_cross_language=True,
expansion_service=self._transform_service_address))

def test_set_mutation(self):
row1: DirectRow = DirectRow('key-1')
Expand Down
24 changes: 19 additions & 5 deletions sdks/python/expansion-service-container/boot.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,8 @@ func installExtraPackages(requirementsFile string) error {
return nil
}

func getUpdatedRequirementsFile(oldRequirementsFileName string, dependenciesDir string) (string, error) {
oldExtraPackages, err := getLines(filepath.Join(dependenciesDir, oldRequirementsFileName))
func getUpdatedRequirementsFile(oldDependenciesRequirementsFile string, dependenciesDir string) (string, error) {
oldExtraPackages, err := getLines(oldDependenciesRequirementsFile)
if err != nil {
return "", err
}
Expand Down Expand Up @@ -145,9 +145,20 @@ func launchExpansionServiceProcess() error {

args := []string{"-m", expansionServiceEntrypoint, "-p", strconv.Itoa(*port), "--fully_qualified_name_glob", "*"}

if *requirements_file != "" {
log.Printf("Received the requirements file %v", *requirements_file)
updatedRequirementsFileName, err := getUpdatedRequirementsFile(*requirements_file, *dependencies_dir)
// Requirements file with dependencies to install.
// Note that we have to look for the requirements file in the dependencies
// volume here not the requirements file at the top level. Latter provides
// Beam dependencies.
dependencies_requirements_file := filepath.Join(*dependencies_dir, *requirements_file)
dependencies_requirements_file_exists := false
if _, err := os.Stat(dependencies_requirements_file); err == nil {
dependencies_requirements_file_exists = true
}

// We only try to install dependencies, if the requirements file exists.
if dependencies_requirements_file_exists {
log.Printf("Received the requirements file %s with extra packages.", dependencies_requirements_file)
updatedRequirementsFileName, err := getUpdatedRequirementsFile(dependencies_requirements_file, *dependencies_dir)
if err != nil {
return err
}
Expand All @@ -161,7 +172,10 @@ func launchExpansionServiceProcess() error {
if err != nil {
return err
}
} else {
log.Printf("Requirements file %s was provided but not available.", dependencies_requirements_file)
}

if err := execx.Execute(pythonVersion, args...); err != nil {
return fmt.Errorf("could not start the expansion service: %s", err)
}
Expand Down

0 comments on commit 06c76b4

Please sign in to comment.