From bb2838adc06101b32e5d3007cf7370eb89830a69 Mon Sep 17 00:00:00 2001 From: Julius Busecke Date: Thu, 24 Oct 2024 12:30:39 -0400 Subject: [PATCH 1/8] Add rclone copy stage --- .../data_management_transforms.py | 61 +++++++++++++++++++ 1 file changed, 61 insertions(+) diff --git a/leap_data_management_utils/data_management_transforms.py b/leap_data_management_utils/data_management_transforms.py index 6362139..0b70175 100644 --- a/leap_data_management_utils/data_management_transforms.py +++ b/leap_data_management_utils/data_management_transforms.py @@ -1,6 +1,7 @@ # Note: All of this code was written by Julius Busecke and copied from this feedstock: # https://github.com/leap-stc/cmip6-leap-feedstock/blob/main/feedstock/recipe.py#L262 +import logging import os import subprocess from dataclasses import dataclass @@ -12,6 +13,8 @@ from leap_data_management_utils.bq_interfaces import BQInterface +logger = logging.getLogger(__name__) + yaml = YAML(typ='safe') @@ -164,3 +167,61 @@ def expand( self, pcoll: beam.PCollection[zarr.storage.FSStore] ) -> beam.PCollection[zarr.storage.FSStore]: return pcoll | 'Injecting Attributes' >> beam.Map(self._update_zarr_attrs) + + +@dataclass +class CopyRclone(beam.PTransform): + """Copy a store to a new location using rclone. If the target input is False, do nothing. + Currently assumes that the source is a GCS bucket (with auth in the environment) + and the target is an OSN bucket. + The OSN credentials are fetched from GCP Secret Manager. This could be implemented more generally + for arbirary rclone remotes as sources and targets. + """ + + target: str + + def _copy(self, store: zarr.storage.FSStore) -> zarr.storage.FSStore: + import os + + import zarr + + # We do need the gs:// prefix? + # TODO: Determine this dynamically from zarr.storage.FSStore + source = f'gs://{os.path.normpath(store.path)}/' # FIXME more elegant. `.copytree` needs trailing slash + if self.target is False: + # dont do anything + return store + else: + from google.cloud import secretmanager + + secret_client = secretmanager.SecretManagerServiceClient() + osn_id = secret_client.access_secret_version( + name='projects/leap-pangeo/secrets/OSN_CATALOG_BUCKET_KEY/versions/latest' + ).payload.data.decode('UTF-8') + osn_secret = secret_client.access_secret_version( + name='projects/leap-pangeo/secrets/OSN_CATALOG_BUCKET_KEY_SECRET/versions/latest' + ).payload.data.decode('UTF-8') + + # beam does not like clients to stick around + del secret_client + + # Define remotes with the credentials (do not print these EVER!) + # TODO: It might be safer to use env variables here? see https://github.com/leap-stc/data-management/blob/main/.github/workflows/transfer.yaml for a template + + gcs_remote = ':gcs,env_auth=true:' + osn_remote = f":s3,provider=Ceph,endpoint='https://nyu1.osn.mghpcc.org',access_key_id={osn_id},secret_access_key={osn_secret}:" + + logger.warning(f'Copying from {source} to {self.target}') + + copy_proc = subprocess.run( + f'rclone copy --fast-list --max-backlog 500000 --s3-chunk-size 200M --s3-upload-concurrency 128 --transfers 128 --checkers 128 -vv -P "{gcs_remote}{source}/" "{osn_remote}{self.target}/"', + shell=True, + capture_output=False, # will expose secrets if true + text=True, + ) + copy_proc.check_returncode() + del copy_proc + return zarr.storage.FSStore(self.target) + + def expand(self, pcoll: beam.PCollection) -> beam.PCollection: + return pcoll | 'Copying Store' >> beam.Map(self._copy) From 3c22d6ff6315613117ef205f0041e5b4535e9178 Mon Sep 17 00:00:00 2001 From: Julius Busecke Date: Thu, 24 Oct 2024 12:49:12 -0400 Subject: [PATCH 2/8] Update pyproject.toml --- pyproject.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/pyproject.toml b/pyproject.toml index 06ecebd..df27f0a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -37,6 +37,7 @@ pangeo-forge = [ "pangeo-forge-recipes", "apache-beam==2.58.0", "dynamic-chunks", + "google-cloud-secret-manager", "leap-data-management-utils[bigquery]", ] catalog = [ From 86be0966c4dce83a193999ed622cfc1d97f48994 Mon Sep 17 00:00:00 2001 From: Julius Busecke Date: Thu, 24 Oct 2024 15:57:21 -0400 Subject: [PATCH 3/8] Change entire config to env variables --- .../data_management_transforms.py | 29 +++++++++++-------- 1 file changed, 17 insertions(+), 12 deletions(-) diff --git a/leap_data_management_utils/data_management_transforms.py b/leap_data_management_utils/data_management_transforms.py index 0b70175..94fb312 100644 --- a/leap_data_management_utils/data_management_transforms.py +++ b/leap_data_management_utils/data_management_transforms.py @@ -187,7 +187,7 @@ def _copy(self, store: zarr.storage.FSStore) -> zarr.storage.FSStore: # We do need the gs:// prefix? # TODO: Determine this dynamically from zarr.storage.FSStore - source = f'gs://{os.path.normpath(store.path)}/' # FIXME more elegant. `.copytree` needs trailing slash + source = os.path.normpath(store.path) if self.target is False: # dont do anything return store @@ -195,26 +195,31 @@ def _copy(self, store: zarr.storage.FSStore) -> zarr.storage.FSStore: from google.cloud import secretmanager secret_client = secretmanager.SecretManagerServiceClient() - osn_id = secret_client.access_secret_version( + + # define rclone remotes ("source" and "target") in the environment + + os.environ['RCLONE_CONFIG_SOURCE_TYPE'] = 'gcs' + os.environ['RCLONE_CONFIG_SOURCE_ENV_AUTH'] = 'true' + + os.environ['RCLONE_CONFIG_TARGET_TYPE'] = 's3' + os.environ['RCLONE_CONFIG_TARGET_PROVIDER'] = 'Ceph' + os.environ['RCLONE_CONFIG_TARGET_ENDPOINT'] = 'https://nyu1.osn.mghpcc.org' + os.environ['RCLONE_CONFIG_TARGET_ACCESS_KEY_ID'] = secret_client.access_secret_version( name='projects/leap-pangeo/secrets/OSN_CATALOG_BUCKET_KEY/versions/latest' ).payload.data.decode('UTF-8') - osn_secret = secret_client.access_secret_version( - name='projects/leap-pangeo/secrets/OSN_CATALOG_BUCKET_KEY_SECRET/versions/latest' - ).payload.data.decode('UTF-8') + os.environ['RCLONE_CONFIG_TARGET_SECRET_ACCESS_KEY'] = ( + secret_client.access_secret_version( + name='projects/leap-pangeo/secrets/OSN_CATALOG_BUCKET_KEY_SECRET/versions/latest' + ).payload.data.decode('UTF-8') + ) # beam does not like clients to stick around del secret_client - # Define remotes with the credentials (do not print these EVER!) - # TODO: It might be safer to use env variables here? see https://github.com/leap-stc/data-management/blob/main/.github/workflows/transfer.yaml for a template - - gcs_remote = ':gcs,env_auth=true:' - osn_remote = f":s3,provider=Ceph,endpoint='https://nyu1.osn.mghpcc.org',access_key_id={osn_id},secret_access_key={osn_secret}:" - logger.warning(f'Copying from {source} to {self.target}') copy_proc = subprocess.run( - f'rclone copy --fast-list --max-backlog 500000 --s3-chunk-size 200M --s3-upload-concurrency 128 --transfers 128 --checkers 128 -vv -P "{gcs_remote}{source}/" "{osn_remote}{self.target}/"', + f'rclone copy --fast-list --max-backlog 500000 --s3-chunk-size 200M --s3-upload-concurrency 128 --transfers 128 --checkers 128 -vv -P source:"{source}/" target:"{self.target}/"', shell=True, capture_output=False, # will expose secrets if true text=True, From 67e632224e842cde52f0129748e4b633a2103e51 Mon Sep 17 00:00:00 2001 From: Julius Busecke Date: Thu, 24 Oct 2024 17:18:45 -0400 Subject: [PATCH 4/8] add logging and increase verbosity --- leap_data_management_utils/data_management_transforms.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/leap_data_management_utils/data_management_transforms.py b/leap_data_management_utils/data_management_transforms.py index 94fb312..76f47f0 100644 --- a/leap_data_management_utils/data_management_transforms.py +++ b/leap_data_management_utils/data_management_transforms.py @@ -219,11 +219,13 @@ def _copy(self, store: zarr.storage.FSStore) -> zarr.storage.FSStore: logger.warning(f'Copying from {source} to {self.target}') copy_proc = subprocess.run( - f'rclone copy --fast-list --max-backlog 500000 --s3-chunk-size 200M --s3-upload-concurrency 128 --transfers 128 --checkers 128 -vv -P source:"{source}/" target:"{self.target}/"', + f'rclone -vv copy --fast-list --max-backlog 500000 --s3-chunk-size 200M --s3-upload-concurrency 128 --transfers 128 --checkers 128 -vv -P source:"{source}/" target:"{self.target}/"', shell=True, - capture_output=False, # will expose secrets if true + capture_output=True, text=True, ) + logger.warning(copy_proc.stdout) + logger.warning(copy_proc.stderr) copy_proc.check_returncode() del copy_proc return zarr.storage.FSStore(self.target) From d6fb80272c52fb4c30b63897bae4368ca9df388c Mon Sep 17 00:00:00 2001 From: Julius Busecke Date: Fri, 25 Oct 2024 11:05:49 -0400 Subject: [PATCH 5/8] Update data_management_transforms.py --- leap_data_management_utils/data_management_transforms.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/leap_data_management_utils/data_management_transforms.py b/leap_data_management_utils/data_management_transforms.py index 76f47f0..8a0a721 100644 --- a/leap_data_management_utils/data_management_transforms.py +++ b/leap_data_management_utils/data_management_transforms.py @@ -205,11 +205,11 @@ def _copy(self, store: zarr.storage.FSStore) -> zarr.storage.FSStore: os.environ['RCLONE_CONFIG_TARGET_PROVIDER'] = 'Ceph' os.environ['RCLONE_CONFIG_TARGET_ENDPOINT'] = 'https://nyu1.osn.mghpcc.org' os.environ['RCLONE_CONFIG_TARGET_ACCESS_KEY_ID'] = secret_client.access_secret_version( - name='projects/leap-pangeo/secrets/OSN_CATALOG_BUCKET_KEY/versions/latest' + name='projects/leap-pangeo/secrets/OSN_LEAP_PANGEO_PIPELINE_KEY /versions/latest' ).payload.data.decode('UTF-8') os.environ['RCLONE_CONFIG_TARGET_SECRET_ACCESS_KEY'] = ( secret_client.access_secret_version( - name='projects/leap-pangeo/secrets/OSN_CATALOG_BUCKET_KEY_SECRET/versions/latest' + name='projects/leap-pangeo/secrets/OSN_LEAP_PANGEO_PIPELINE_SECRET/versions/latest' ).payload.data.decode('UTF-8') ) From b83e24b6ed5af5a965076f8539e41eb872b5ef3a Mon Sep 17 00:00:00 2001 From: Julius Busecke Date: Mon, 28 Oct 2024 11:40:01 -0400 Subject: [PATCH 6/8] Fix whitspace in secred id --- leap_data_management_utils/data_management_transforms.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/leap_data_management_utils/data_management_transforms.py b/leap_data_management_utils/data_management_transforms.py index 8a0a721..e824375 100644 --- a/leap_data_management_utils/data_management_transforms.py +++ b/leap_data_management_utils/data_management_transforms.py @@ -205,7 +205,7 @@ def _copy(self, store: zarr.storage.FSStore) -> zarr.storage.FSStore: os.environ['RCLONE_CONFIG_TARGET_PROVIDER'] = 'Ceph' os.environ['RCLONE_CONFIG_TARGET_ENDPOINT'] = 'https://nyu1.osn.mghpcc.org' os.environ['RCLONE_CONFIG_TARGET_ACCESS_KEY_ID'] = secret_client.access_secret_version( - name='projects/leap-pangeo/secrets/OSN_LEAP_PANGEO_PIPELINE_KEY /versions/latest' + name='projects/leap-pangeo/secrets/OSN_LEAP_PANGEO_PIPELINE_KEY/versions/latest' ).payload.data.decode('UTF-8') os.environ['RCLONE_CONFIG_TARGET_SECRET_ACCESS_KEY'] = ( secret_client.access_secret_version( From 115484a91c54e8c20092c82c80c4fe2e5c3c1777 Mon Sep 17 00:00:00 2001 From: Julius Busecke Date: Thu, 21 Nov 2024 14:54:52 -0500 Subject: [PATCH 7/8] add remove endpoint logic --- leap_data_management_utils/data_management_transforms.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/leap_data_management_utils/data_management_transforms.py b/leap_data_management_utils/data_management_transforms.py index e824375..816b722 100644 --- a/leap_data_management_utils/data_management_transforms.py +++ b/leap_data_management_utils/data_management_transforms.py @@ -6,6 +6,7 @@ import subprocess from dataclasses import dataclass from datetime import datetime, timezone +from typing import Optional import apache_beam as beam import zarr @@ -179,6 +180,7 @@ class CopyRclone(beam.PTransform): """ target: str + remove_endpoint_url: Optional[str] = None def _copy(self, store: zarr.storage.FSStore) -> zarr.storage.FSStore: import os @@ -217,9 +219,13 @@ def _copy(self, store: zarr.storage.FSStore) -> zarr.storage.FSStore: del secret_client logger.warning(f'Copying from {source} to {self.target}') + if self.remove_endpoint_url is not None: + target = self.target.replace(self.remove_endpoint_url, "") + else: + target = self.target copy_proc = subprocess.run( - f'rclone -vv copy --fast-list --max-backlog 500000 --s3-chunk-size 200M --s3-upload-concurrency 128 --transfers 128 --checkers 128 -vv -P source:"{source}/" target:"{self.target}/"', + f'rclone -vv copy --fast-list --max-backlog 500000 --s3-chunk-size 200M --s3-upload-concurrency 128 --transfers 128 --checkers 128 -vv -P source:"{source}/" target:"{target}/"', shell=True, capture_output=True, text=True, From 62164898b7ff58ad76c6d98b2bd8797975632838 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 21 Nov 2024 19:55:54 +0000 Subject: [PATCH 8/8] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- leap_data_management_utils/data_management_transforms.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/leap_data_management_utils/data_management_transforms.py b/leap_data_management_utils/data_management_transforms.py index 816b722..bc6079c 100644 --- a/leap_data_management_utils/data_management_transforms.py +++ b/leap_data_management_utils/data_management_transforms.py @@ -180,7 +180,7 @@ class CopyRclone(beam.PTransform): """ target: str - remove_endpoint_url: Optional[str] = None + remove_endpoint_url: Optional[str] = None def _copy(self, store: zarr.storage.FSStore) -> zarr.storage.FSStore: import os @@ -220,7 +220,7 @@ def _copy(self, store: zarr.storage.FSStore) -> zarr.storage.FSStore: logger.warning(f'Copying from {source} to {self.target}') if self.remove_endpoint_url is not None: - target = self.target.replace(self.remove_endpoint_url, "") + target = self.target.replace(self.remove_endpoint_url, '') else: target = self.target