Add rclone copy stage #60

Open
wants to merge 10 commits into main
74 changes: 74 additions & 0 deletions leap_data_management_utils/data_management_transforms.py
@@ -1,17 +1,21 @@
# Note: All of this code was written by Julius Busecke and copied from this feedstock:
# https://github.com/leap-stc/cmip6-leap-feedstock/blob/main/feedstock/recipe.py#L262

import logging
import os
import subprocess
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional, Union

import apache_beam as beam
import zarr
from ruamel.yaml import YAML

from leap_data_management_utils.bq_interfaces import BQInterface

logger = logging.getLogger(__name__)

yaml = YAML(typ='safe')


@@ -164,3 +168,73 @@ def expand(
        self, pcoll: beam.PCollection[zarr.storage.FSStore]
    ) -> beam.PCollection[zarr.storage.FSStore]:
        return pcoll | 'Injecting Attributes' >> beam.Map(self._update_zarr_attrs)


@dataclass
class CopyRclone(beam.PTransform):
    """Copy a store to a new location using rclone. If ``target`` is False, do nothing.

    Currently assumes that the source is a GCS bucket (with auth available in the
    environment) and the target is an OSN bucket. The OSN credentials are fetched from
    GCP Secret Manager. This could be implemented more generally for arbitrary rclone
    remotes as sources and targets.
    """

    target: Union[str, bool]
    remove_endpoint_url: Optional[str] = None

    def _copy(self, store: zarr.storage.FSStore) -> zarr.storage.FSStore:
        import os

        import zarr

        # Do we need the gs:// prefix here?
        # TODO: Determine this dynamically from zarr.storage.FSStore
        source = os.path.normpath(store.path)
        if self.target is False:
            # don't do anything
            return store
        else:
            from google.cloud import secretmanager

            secret_client = secretmanager.SecretManagerServiceClient()

            # define rclone remotes ("source" and "target") in the environment

            os.environ['RCLONE_CONFIG_SOURCE_TYPE'] = 'gcs'
            os.environ['RCLONE_CONFIG_SOURCE_ENV_AUTH'] = 'true'

            os.environ['RCLONE_CONFIG_TARGET_TYPE'] = 's3'
            os.environ['RCLONE_CONFIG_TARGET_PROVIDER'] = 'Ceph'
            os.environ['RCLONE_CONFIG_TARGET_ENDPOINT'] = 'https://nyu1.osn.mghpcc.org'
            os.environ['RCLONE_CONFIG_TARGET_ACCESS_KEY_ID'] = secret_client.access_secret_version(
                name='projects/leap-pangeo/secrets/OSN_LEAP_PANGEO_PIPELINE_KEY/versions/latest'
            ).payload.data.decode('UTF-8')
            os.environ['RCLONE_CONFIG_TARGET_SECRET_ACCESS_KEY'] = (
                secret_client.access_secret_version(
                    name='projects/leap-pangeo/secrets/OSN_LEAP_PANGEO_PIPELINE_SECRET/versions/latest'
                ).payload.data.decode('UTF-8')
            )

            # Beam does not like clients sticking around between elements
            del secret_client

            logger.warning(f'Copying from {source} to {self.target}')
            if self.remove_endpoint_url is not None:
                target = self.target.replace(self.remove_endpoint_url, '')
            else:
                target = self.target

            copy_proc = subprocess.run(
                f'rclone -vv copy --fast-list --max-backlog 500000 --s3-chunk-size 200M --s3-upload-concurrency 128 --transfers 128 --checkers 128 -P source:"{source}/" target:"{target}/"',
                shell=True,
                capture_output=True,
                text=True,
            )
            logger.warning(copy_proc.stdout)
            logger.warning(copy_proc.stderr)
            copy_proc.check_returncode()
            del copy_proc
            return zarr.storage.FSStore(self.target)

    def expand(self, pcoll: beam.PCollection) -> beam.PCollection:
        return pcoll | 'Copying Store' >> beam.Map(self._copy)
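
For context on how the new transform is meant to be used, here is a minimal, hypothetical usage sketch (not part of this diff). It wires CopyRclone after a stand-in step that yields the written zarr store; the bucket names and paths are placeholders, and actually running it requires rclone on PATH plus GCS and OSN credentials in the environment.

# Hypothetical usage sketch, not part of this PR. Bucket names and paths are placeholders.
import apache_beam as beam
import zarr

from leap_data_management_utils.data_management_transforms import CopyRclone

with beam.Pipeline() as p:
    (
        p
        # Stand-in for the upstream recipe steps that normally produce the written store.
        | 'Store path' >> beam.Create(['gs://hypothetical-gcs-bucket/dataset.zarr'])
        | 'To FSStore' >> beam.Map(zarr.storage.FSStore)
        # Copy the store to OSN; passing target=False would skip the copy entirely.
        | 'Copy to OSN' >> CopyRclone(target='hypothetical-osn-bucket/dataset.zarr')
    )

Passing target=False turns the stage into a no-op, which is convenient for test deployments that should not write to OSN.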
1 change: 1 addition & 0 deletions pyproject.toml
@@ -37,6 +37,7 @@ pangeo-forge = [
"pangeo-forge-recipes",
"apache-beam==2.60.0",
"dynamic-chunks",
"google-cloud-secret-manager",
"leap-data-management-utils[bigquery]",
]
catalog = [
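
The pyproject change above adds google-cloud-secret-manager, which CopyRclone uses to fetch the OSN keys at runtime. As a minimal sketch, not part of the diff, the relevant API call looks like the following; the project and secret names mirror the ones hard-coded in CopyRclone, and the caller is assumed to have Secret Manager access.

# Minimal sketch of the Secret Manager lookup used by CopyRclone; not part of the diff.
from google.cloud import secretmanager

client = secretmanager.SecretManagerServiceClient()
response = client.access_secret_version(
    name='projects/leap-pangeo/secrets/OSN_LEAP_PANGEO_PIPELINE_KEY/versions/latest'
)
access_key = response.payload.data.decode('UTF-8')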