Update Azure infrastructure.
jmchilton committed May 2, 2024
1 parent fa2afe8 commit 2a0055b
Showing 10 changed files with 346 additions and 68 deletions.
3 changes: 2 additions & 1 deletion lib/galaxy/dependencies/conditional-requirements.txt
@@ -6,7 +6,7 @@ sentry-sdk[fastapi]
pbs_python
drmaa
statsd
azure-storage==0.32.0
azure-storage-blob==12.19.1
python-irodsclient==2.0.0
python-ldap==3.4.0
ldap3==2.9.1
@@ -26,6 +26,7 @@ fs-gcsfs # type: googlecloudstorage
google-cloud-storage>=2.8.0 # type: googlecloudstorage
fs.onedatarestfs # type: onedata
fs-basespace # type: basespace
fs-azureblob # type: azure

# Vault backend
hvac
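The dependency swap above replaces the legacy azure-storage 0.32.0 package with azure-storage-blob 12.19.1, whose client-object API the rest of this commit adopts. Below is a minimal, illustrative sketch of that v12 pattern; the account, key, container, and blob names are placeholders, not values from this commit.

```python
# Minimal sketch of the azure-storage-blob 12.x client pattern this commit
# migrates to; the account, key, container, and blob names are placeholders.
from azure.storage.blob import BlobServiceClient

service = BlobServiceClient(
    account_url="https://myaccount.blob.core.windows.net",
    credential="my-account-key",
)
blob = service.get_blob_client("my-container", "datasets/dataset_1.dat")

blob.upload_blob(b"hello", overwrite=True)  # create or replace the blob
print(blob.get_blob_properties().size)      # -> 5
data = blob.download_blob().readall()       # read the content back
print(blob.exists())                        # -> True
blob.delete_blob()
```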
43 changes: 43 additions & 0 deletions lib/galaxy/files/sources/azure.py
@@ -0,0 +1,43 @@
from typing import Union

try:
from fs.azblob import (
BlobFS,
BlobFSV2,
)
except ImportError:
BlobFS = None

from typing import Optional

from . import (
FilesSourceOptions,
FilesSourceProperties,
)
from ._pyfilesystem2 import PyFilesystem2FilesSource


class AzureFileSource(PyFilesystem2FilesSource):
plugin_type = "azure"
required_module = BlobFS
required_package = "fs-azureblob"

def _open_fs(self, user_context=None, opts: Optional[FilesSourceOptions] = None):
props = self._serialization_props(user_context)
extra_props: Union[FilesSourceProperties, dict] = opts.extra_props or {} if opts else {}
all_props = {**props, **extra_props}
namespace_type = all_props.get("namespace_type", "hierarchical")
if namespace_type not in ["hierarchical", "flat"]:
raise Exception("Misconfigured azure file source")
account_name = all_props["account_name"]
account_key = all_props["account_key"]
container = all_props["container_name"]
if namespace_type == "flat":
handle = BlobFS(account_name, container, account_key)
else:
handle = BlobFSV2(account_name, container, account_key)

return handle


__all__ = ("AzureFileSource",)
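For context, the new AzureFileSource plugin simply instantiates one of the two PyFilesystem2 backends provided by fs-azureblob, picking BlobFSV2 when the configured namespace_type is hierarchical. A rough, hypothetical sketch of using those backends directly, with placeholder credentials and names:

```python
# Hypothetical direct use of the fs-azureblob backends wrapped by AzureFileSource;
# the account, key, and container names below are placeholders.
from fs.azblob import BlobFS, BlobFSV2

# Flat (classic) blob namespace -> BlobFS
flat_fs = BlobFS("myaccount", "mycontainer", "my-account-key")
print(flat_fs.listdir("/"))  # standard PyFilesystem2 API from here on

# Hierarchical (Data Lake Gen2-style) namespace -> BlobFSV2
hier_fs = BlobFSV2("myaccount", "mycontainer", "my-account-key")
with hier_fs.open("nested/path/file.txt", "rb") as handle:
    data = handle.read()
```

In a Galaxy file sources configuration this plugin would presumably be selected via its plugin_type of "azure" and supplied the account_name, account_key, container_name, and optional namespace_type properties that _open_fs reads above.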
141 changes: 78 additions & 63 deletions lib/galaxy/objectstore/azure_blob.py
@@ -5,16 +5,21 @@
import logging
import os
import shutil
from datetime import datetime
from datetime import (
datetime,
timedelta,
)
from typing import Optional

try:
from azure.common import AzureHttpError
from azure.storage import CloudStorageAccount
from azure.storage.blob import BlockBlobService
from azure.storage.blob.models import Blob
from azure.storage.blob import (
BlobSasPermissions,
BlobServiceClient,
generate_blob_sas,
)
except ImportError:
BlockBlobService = None
BlobServiceClient = None

from galaxy.exceptions import (
ObjectInvalid,
@@ -48,6 +53,7 @@ def parse_config_xml(config_xml):

account_name = auth_xml.get("account_name")
account_key = auth_xml.get("account_key")
account_url = auth_xml.get("account_url")

container_xml = config_xml.find("container")
container_name = container_xml.get("name")
@@ -62,11 +68,15 @@ def parse_config_xml(config_xml):
log.error(msg)
raise Exception(msg)
extra_dirs = [{k: e.get(k) for k in attrs} for e in extra_dirs]
auth = {
"account_name": account_name,
"account_key": account_key,
}
if account_url:
auth["account_url"] = account_url

return {
"auth": {
"account_name": account_name,
"account_key": account_key,
},
"auth": auth,
"container": {
"name": container_name,
"max_chunk_size": max_chunk_size,
@@ -102,6 +112,7 @@ def __init__(self, config, config_dict):
self.enable_cache_monitor, self.cache_monitor_interval = enable_cache_monitor(config, config_dict)

self.account_name = auth_dict.get("account_name")
self.account_url = auth_dict.get("account_url")
self.account_key = auth_dict.get("account_key")

self.container_name = container_dict.get("name")
@@ -114,7 +125,7 @@ def __init__(self, config, config_dict):
self._initialize()

def _initialize(self):
if BlockBlobService is None:
if BlobServiceClient is None:
raise Exception(NO_BLOBSERVICE_ERROR_MESSAGE)

self._configure_connection()
@@ -124,12 +135,15 @@ def _initialize(self):

def to_dict(self):
as_dict = super().to_dict()
auth = {
"account_name": self.account_name,
"account_key": self.account_key,
}
if self.account_url:
auth["account_url"] = self.account_url
as_dict.update(
{
"auth": {
"account_name": self.account_name,
"account_key": self.account_key,
},
"auth": auth,
"container": {
"name": self.container_name,
"max_chunk_size": self.max_chunk_size,
@@ -154,8 +168,18 @@ def parse_xml(clazz, config_xml):

def _configure_connection(self):
log.debug("Configuring Connection")
self.account = CloudStorageAccount(self.account_name, self.account_key)
self.service = self.account.create_block_blob_service()
if self.account_url:
# https://pypi.org/project/azure-storage-blob/
service = BlobServiceClient(
account_url=self.account_url,
credential={"account_name": self.account_name, "account_key": self.account_key},
)
else:
service = BlobServiceClient(
account_url=f"https://{self.account_name}.blob.core.windows.net",
credential=self.account_key,
)
self.service = service

def _construct_path(
self,
@@ -224,32 +248,26 @@ def _fix_permissions(self, rel_path):
def _get_cache_path(self, rel_path):
return os.path.abspath(os.path.join(self.staging_path, rel_path))

def _get_transfer_progress(self):
return self.transfer_progress

def _get_size_in_azure(self, rel_path):
try:
properties = self.service.get_blob_properties(self.container_name, rel_path)
# Currently this returns a blob and not a BlobProperties object
# Similar issue for the ruby https://github.com/Azure/azure-storage-ruby/issues/13
# The typecheck is an attempt at future-proofing this when/if the bug is fixed.
if type(properties) is Blob:
properties = properties.properties
if properties:
size_in_bytes = properties.content_length
return size_in_bytes
properties = self._blob_client(rel_path).get_blob_properties()
size_in_bytes = properties.size
return size_in_bytes
except AzureHttpError:
log.exception("Could not get size of blob '%s' from Azure", rel_path)
return -1

def _in_azure(self, rel_path):
try:
exists = self.service.exists(self.container_name, rel_path)
exists = self._blob_client(rel_path).exists()
except AzureHttpError:
log.exception("Trouble checking existence of Azure blob '%s'", rel_path)
return False
return exists

def _blob_client(self, rel_path: str):
return self.service.get_blob_client(self.container_name, rel_path)

def _in_cache(self, rel_path):
"""Check if the given dataset is in the local cache."""
cache_path = self._get_cache_path(rel_path)
@@ -265,9 +283,6 @@ def _pull_into_cache(self, rel_path):
self._fix_permissions(self._get_cache_path(rel_path_dir))
return file_ok

def _transfer_cb(self, complete, total):
self.transfer_progress = float(complete) / float(total) * 100 # in percent

def _download(self, rel_path):
local_destination = self._get_cache_path(rel_path)
try:
@@ -281,10 +296,8 @@ def _download(self, rel_path):
)
return False
else:
self.transfer_progress = 0 # Reset transfer progress counter
self.service.get_blob_to_path(
self.container_name, rel_path, local_destination, progress_callback=self._transfer_cb
)
with open(local_destination, "wb") as f:
self._blob_client(rel_path).download_blob().download_to_stream(f)
return True
except AzureHttpError:
log.exception("Problem downloading '%s' from Azure", rel_path)
@@ -301,24 +314,22 @@ def _push_to_os(self, rel_path, source_file=None, from_string=None):
try:
source_file = source_file or self._get_cache_path(rel_path)

if not os.path.exists(source_file):
if from_string is None and not os.path.exists(source_file):
log.error(
"Tried updating blob '%s' from source file '%s', but source file does not exist.",
rel_path,
source_file,
)
return False

if os.path.getsize(source_file) == 0:
if from_string is None and os.path.getsize(source_file) == 0:
log.debug(
"Wanted to push file '%s' to azure blob '%s' but its size is 0; skipping.", source_file, rel_path
)
return True

if from_string:
self.service.create_blob_from_text(
self.container_name, rel_path, from_string, progress_callback=self._transfer_cb
)
if from_string is not None:
self._blob_client(rel_path).upload_blob(from_string, overwrite=True)
log.debug("Pushed data from string '%s' to blob '%s'", from_string, rel_path)
else:
start_time = datetime.now()
@@ -328,13 +339,11 @@ def _push_to_os(self, rel_path, source_file=None, from_string=None):
os.path.getsize(source_file),
rel_path,
)
self.transfer_progress = 0 # Reset transfer progress counter
self.service.create_blob_from_path(
self.container_name, rel_path, source_file, progress_callback=self._transfer_cb
)
with open(source_file, "rb") as f:
self._blob_client(rel_path).upload_blob(f, overwrite=True)
end_time = datetime.now()
log.debug(
"Pushed cache file '%s' to blob '%s' (%s bytes transfered in %s sec)",
"Pushed cache file '%s' to blob '%s' (%s bytes transferred in %s sec)",
source_file,
rel_path,
os.path.getsize(source_file),
@@ -433,7 +442,9 @@ def _create(self, obj, **kwargs):

def _empty(self, obj, **kwargs):
if self._exists(obj, **kwargs):
return bool(self._size(obj, **kwargs) > 0)
size = self._size(obj, **kwargs)
is_empty = bool(size == 0)
return is_empty
else:
raise ObjectNotFound(f"objectstore.empty, object does not exist: {str(obj)}, kwargs: {str(kwargs)}")

@@ -467,18 +478,18 @@ def _delete(self, obj, entire_dir=False, **kwargs):
# but requires iterating through each individual blob in Azure and deleting it.
if entire_dir and extra_dir:
shutil.rmtree(self._get_cache_path(rel_path), ignore_errors=True)
blobs = self.service.list_blobs(self.container_name, prefix=rel_path)
blobs = self.service.get_container_client(self.container_name).list_blobs(name_starts_with=rel_path)
for blob in blobs:
log.debug("Deleting from Azure: %s", blob)
self.service.delete_blob(self.container_name, blob.name)
self._blob_client(blob.name).delete_blob()
return True
else:
# Delete from cache first
unlink(self._get_cache_path(rel_path), ignore_errors=True)
# Delete from Azure as well
if self._in_azure(rel_path):
log.debug("Deleting from Azure: %s", rel_path)
self.service.delete_blob(self.container_name, rel_path)
self._blob_client(rel_path).delete_blob()
return True
except AzureHttpError:
log.exception("Could not delete blob '%s' from Azure", rel_path)
@@ -512,14 +523,6 @@ def _get_filename(self, obj, **kwargs):
cache_path = self._get_cache_path(rel_path)
if not sync_cache:
return cache_path
# S3 does not recognize directories as files so cannot check if those exist.
# So, if checking dir only, ensure given dir exists in cache and return
# the expected cache path.
# dir_only = kwargs.get('dir_only', False)
# if dir_only:
# if not os.path.exists(cache_path):
# os.makedirs(cache_path)
# return cache_path
# Check if the file exists in the cache first, always pull if file size in cache is zero
if self._in_cache(rel_path) and (dir_only or os.path.getsize(self._get_cache_path(rel_path)) > 0):
return cache_path
@@ -539,7 +542,8 @@ def _get_filename(self, obj, **kwargs):
def _update_from_file(self, obj, file_name=None, create=False, **kwargs):
if create is True:
self._create(obj, **kwargs)
elif self._exists(obj, **kwargs):

if self._exists(obj, **kwargs):
rel_path = self._construct_path(obj, **kwargs)
# Chose whether to use the dataset file itself or an alternate file
if file_name:
@@ -567,13 +571,24 @@ def _get_object_url(self, obj, **kwargs):
if self._exists(obj, **kwargs):
rel_path = self._construct_path(obj, **kwargs)
try:
url = self.service.make_blob_url(container_name=self.container_name, blob_name=rel_path)
return url
url = self._blob_client(rel_path).url
# https://learn.microsoft.com/en-us/azure/storage/blobs/sas-service-create-python
token = generate_blob_sas(
account_name=self.account_name,
account_key=self.account_key,
container_name=self.container_name,
blob_name=rel_path,
permission=BlobSasPermissions(read=True),
expiry=datetime.utcnow() + timedelta(hours=1),
)
return f"{url}?{token}"
except AzureHttpError:
log.exception("Trouble generating URL for dataset '%s'", rel_path)
return None

def _get_store_usage_percent(self):
def _get_store_usage_percent(self, obj):
# Percent used for Azure blob containers is effectively zero in practice.
# https://learn.microsoft.com/en-us/azure/storage/blobs/scalability-targets
return 0.0

@property
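The most visible behavioral change in azure_blob.py is that _get_object_url now returns a time-limited, read-only SAS URL instead of a bare blob URL. A standalone sketch of that pattern follows; the names and the one-hour expiry are illustrative placeholders mirroring the code above.

```python
# Standalone sketch of the SAS-based URL generation _get_object_url now performs;
# names and the one-hour expiry are illustrative placeholders.
from datetime import datetime, timedelta

from azure.storage.blob import (
    BlobSasPermissions,
    BlobServiceClient,
    generate_blob_sas,
)

service = BlobServiceClient(
    account_url="https://myaccount.blob.core.windows.net",
    credential="my-account-key",
)
blob = service.get_blob_client("my-container", "datasets/dataset_1.dat")
token = generate_blob_sas(
    account_name="myaccount",
    account_key="my-account-key",
    container_name="my-container",
    blob_name="datasets/dataset_1.dat",
    permission=BlobSasPermissions(read=True),
    expiry=datetime.utcnow() + timedelta(hours=1),
)
signed_url = f"{blob.url}?{token}"  # time-limited, read-only download link
```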
6 changes: 6 additions & 0 deletions lib/galaxy/objectstore/caching.py
@@ -5,6 +5,7 @@
import os
import threading
import time
from math import inf
from typing import (
List,
Optional,
@@ -76,6 +77,11 @@ def check_cache(cache_target: CacheTarget):
_clean_cache(file_list, delete_this_much)


def reset_cache(cache_target: CacheTarget):
_, file_list = _get_cache_size_files(cache_target.path)
_clean_cache(file_list, inf)


def _clean_cache(file_list: FileListT, delete_this_much: float) -> None:
"""Keep deleting files from the file_list until the size of the deleted
files is greater than the value in delete_this_much parameter.
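The new reset_cache helper reuses the same eviction loop as check_cache but passes math.inf as the amount to delete, so every cached file is removed rather than only enough to get back under the configured limit. A hypothetical call site is sketched below; the CacheTarget instance comes from the object store's existing configuration and is not constructed here.

```python
# Hypothetical usage; `cache_target` is whatever CacheTarget the object store
# already holds for its staging directory (not constructed in this sketch).
from galaxy.objectstore.caching import check_cache, reset_cache

def trim_cache(cache_target):
    # Evict files only until the cache is back under its configured limit.
    check_cache(cache_target)

def purge_cache(cache_target):
    # Evict everything: reset_cache feeds an infinite "delete this much"
    # threshold into the same eviction loop used by check_cache.
    reset_cache(cache_target)
```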
2 changes: 1 addition & 1 deletion lib/galaxy/objectstore/s3.py
@@ -721,7 +721,7 @@ def _get_object_url(self, obj, **kwargs):
log.exception("Trouble generating URL for dataset '%s'", rel_path)
return None

def _get_store_usage_percent(self):
def _get_store_usage_percent(self, obj):
return 0.0

def shutdown(self):