diff --git a/leap_data_management_utils/data_management_transforms.py b/leap_data_management_utils/data_management_transforms.py
index 6362139..bc6079c 100644
--- a/leap_data_management_utils/data_management_transforms.py
+++ b/leap_data_management_utils/data_management_transforms.py
@@ -1,10 +1,12 @@
 # Note: All of this code was written by Julius Busecke and copied from this feedstock:
 # https://github.com/leap-stc/cmip6-leap-feedstock/blob/main/feedstock/recipe.py#L262
+import logging
 import os
 import subprocess
 from dataclasses import dataclass
 from datetime import datetime, timezone
+from typing import Optional, Union
 
 import apache_beam as beam
 import zarr
@@ -12,6 +14,8 @@
 
 from leap_data_management_utils.bq_interfaces import BQInterface
 
+logger = logging.getLogger(__name__)
+
 yaml = YAML(typ='safe')
 
@@ -164,3 +168,73 @@ def expand(
         self, pcoll: beam.PCollection[zarr.storage.FSStore]
     ) -> beam.PCollection[zarr.storage.FSStore]:
         return pcoll | 'Injecting Attributes' >> beam.Map(self._update_zarr_attrs)
+
+
+@dataclass
+class CopyRclone(beam.PTransform):
+    """Copy a store to a new location using rclone. If ``target`` is False, do nothing.
+    Currently assumes that the source is a GCS bucket (with auth in the environment)
+    and the target is an OSN bucket.
+    The OSN credentials are fetched from GCP Secret Manager. This could be implemented
+    more generally for arbitrary rclone remotes as sources and targets.
+    """
+
+    target: Union[str, bool]
+    remove_endpoint_url: Optional[str] = None
+
+    def _copy(self, store: zarr.storage.FSStore) -> zarr.storage.FSStore:
+        import os
+
+        import zarr
+
+        # Do we need the gs:// prefix?
+        # TODO: Determine this dynamically from zarr.storage.FSStore
+        source = os.path.normpath(store.path)
+        if self.target is False:
+            # don't do anything
+            return store
+        else:
+            from google.cloud import secretmanager
+
+            secret_client = secretmanager.SecretManagerServiceClient()
+
+            # define rclone remotes ("source" and "target") in the environment
+
+            os.environ['RCLONE_CONFIG_SOURCE_TYPE'] = 'gcs'
+            os.environ['RCLONE_CONFIG_SOURCE_ENV_AUTH'] = 'true'
+
+            os.environ['RCLONE_CONFIG_TARGET_TYPE'] = 's3'
+            os.environ['RCLONE_CONFIG_TARGET_PROVIDER'] = 'Ceph'
+            os.environ['RCLONE_CONFIG_TARGET_ENDPOINT'] = 'https://nyu1.osn.mghpcc.org'
+            os.environ['RCLONE_CONFIG_TARGET_ACCESS_KEY_ID'] = secret_client.access_secret_version(
+                name='projects/leap-pangeo/secrets/OSN_LEAP_PANGEO_PIPELINE_KEY/versions/latest'
+            ).payload.data.decode('UTF-8')
+            os.environ['RCLONE_CONFIG_TARGET_SECRET_ACCESS_KEY'] = (
+                secret_client.access_secret_version(
+                    name='projects/leap-pangeo/secrets/OSN_LEAP_PANGEO_PIPELINE_SECRET/versions/latest'
+                ).payload.data.decode('UTF-8')
+            )
+
+            # Beam does not like clients that stick around
+            del secret_client
+
+            logger.warning(f'Copying from {source} to {self.target}')
+            if self.remove_endpoint_url is not None:
+                target = self.target.replace(self.remove_endpoint_url, '')
+            else:
+                target = self.target
+
+            copy_proc = subprocess.run(
+                f'rclone -vv copy --fast-list --max-backlog 500000 --s3-chunk-size 200M --s3-upload-concurrency 128 --transfers 128 --checkers 128 -P source:"{source}/" target:"{target}/"',
+                shell=True,
+                capture_output=True,
+                text=True,
+            )
+            logger.warning(copy_proc.stdout)
+            logger.warning(copy_proc.stderr)
+            copy_proc.check_returncode()
+            del copy_proc
+            return zarr.storage.FSStore(self.target)
+
+    def expand(self, pcoll: beam.PCollection) -> beam.PCollection:
+        return pcoll | 'Copying Store' >> beam.Map(self._copy)
diff --git a/pyproject.toml b/pyproject.toml
index 66a600f..f62e9ef 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -37,6 +37,7 @@ pangeo-forge = [
     "pangeo-forge-recipes",
     "apache-beam==2.60.0",
     "dynamic-chunks",
+    "google-cloud-secret-manager",
     "leap-data-management-utils[bigquery]",
 ]
 catalog = [
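
Usage sketch (not part of the diff): the snippet below shows how `CopyRclone` might be wired into a Beam pipeline. It exercises only the pass-through path, since `target=False` short-circuits the copy entirely; the local store path is a hypothetical placeholder, and a real run would pass the OSN destination as `target` instead.

```python
# Minimal sketch of CopyRclone in a pipeline (pass-through path only).
# With target=False the transform returns the store unchanged, so no rclone
# binary, GCP credentials, or OSN bucket are needed to run this.
import apache_beam as beam
import zarr

from leap_data_management_utils.data_management_transforms import CopyRclone

store = zarr.storage.FSStore('/tmp/example.zarr')  # hypothetical local store

with beam.Pipeline() as p:
    (
        p
        | beam.Create([store])
        # In production, target would be the OSN destination path, optionally
        # with remove_endpoint_url set to strip a baked-in endpoint such as
        # 'https://nyu1.osn.mghpcc.org/' from the target string.
        | 'Copy Store' >> CopyRclone(target=False)
    )
```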
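The transform configures its remotes through rclone's standard `RCLONE_CONFIG_<REMOTE>_<OPTION>` environment-variable convention. Below is a standalone sanity-check sketch of that mechanism, with placeholder credentials standing in for the Secret Manager values that the pipeline fetches at runtime:

```python
# Standalone sanity check for the env-var-configured 'target' remote.
# Credentials below are placeholders; the pipeline pulls the real values
# from GCP Secret Manager at runtime.
import os
import subprocess

os.environ['RCLONE_CONFIG_TARGET_TYPE'] = 's3'
os.environ['RCLONE_CONFIG_TARGET_PROVIDER'] = 'Ceph'
os.environ['RCLONE_CONFIG_TARGET_ENDPOINT'] = 'https://nyu1.osn.mghpcc.org'
os.environ['RCLONE_CONFIG_TARGET_ACCESS_KEY_ID'] = '<access-key-id>'
os.environ['RCLONE_CONFIG_TARGET_SECRET_ACCESS_KEY'] = '<secret-access-key>'

# 'rclone lsd target:' lists top-level directories (buckets, for S3-style
# remotes); a cheap way to confirm the endpoint and credentials resolve
# before launching a large copy.
check = subprocess.run('rclone lsd target:', shell=True, capture_output=True, text=True)
print(check.stdout or check.stderr)
```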