mount the dedicated storage for each function #1408

Merged (6 commits) on Jul 26, 2024
@@ -119,6 +119,9 @@ data:
- mountPath: /data
name: user-storage
subPath: {{`{{ user_id }}`}}
- mountPath: /function_data
name: user-storage
subPath: {{`{{ function_data }}`}}
env:
# Environment variables for Ray TLS authentication.
# See https://docs.ray.io/en/latest/ray-core/configure.html#tls-authentication for more details.
@@ -352,6 +355,9 @@ data:
- mountPath: /data
name: user-storage
subPath: {{`{{ user_id }}`}}
- mountPath: /function_data
name: user-storage
subPath: {{`{{ function_data }}`}}
Comment on lines +358 to +360

Collaborator:
Do we need to mount this with any kind of restricted permissions? In theory, and correct me if I'm wrong here, whoever is running the function (user or provider) would be able to access this location via the container...

Collaborator (Author):
My understanding is that, with the current custom-image function design, only the code in the custom image is executed on the Ray node. No user code is sent to or executed on the Ray node, so only the function code reads from and writes to this directory.

Collaborator:
Users could in theory find a way around that restriction (if we misimplement something, CVEs/exploits, etc.), so it's a risk; we'd just need to determine whether it's an acceptable one.

Member:
Could we just add the path when a function comes from a provider? That could solve part of the problem. And correct me if I'm wrong here, @akihikokuroda, but all the providers will write here, so with this approach they could potentially see files from other providers. What is the problem with using the provider name as the sub-path instead of function_data?

Member:
My idea was that, that way, we could attach the specific provider path only when the function comes from that specific provider (but I don't know if there are limitations to this approach).

Collaborator:
I thought function_data was the provider space and data was for users? Users would already go to data for their own functions.

Collaborator (Author):
For the provider, it may be useful to have both while they develop the function, because it is then the same environment as the one in which the function is executed by a user.

Member (@Tansito, Jul 26, 2024):
"I thought function_data was the provider space and data was for users?"

Yep.

"Users would already go to data for their own functions."

Basically, that's my comment: for functions that come from the user, we are adding /function_data too. It's true that /function_data and /data will both point to the user.username path, so it's not a big deal, but we could probably just avoid adding /function_data for user functions.

BTW, another answer that I would understand is: "well, this way we don't overcomplicate the template any further", and I would agree with that. Yep.

Collaborator (Author):
Django templating inside the Helm template is very tricky :-)

Member:
Hahaha, I agree, I agree. I'm just reviewing the rest of the code, Aki, and I will update my review 👍

{{- if .Values.useCertManager }}
- mountPath: /tmp/tls
name: cert-tls
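To make the effect of the two mounts concrete: /data always tracks the requesting user, while /function_data tracks whoever owns the function (the provider for provider functions, the user otherwise, per the gateway change in gateway/api/ray.py further down). A minimal sketch follows, rendering an illustrative fragment with jinja2 rather than the gateway's actual Django template loader; the mount_context helper and the "mockprovider" name are assumptions for the example.

```python
from typing import Optional

from jinja2 import Template

# Illustrative fragment echoing the volumeMounts added in this PR (the real
# chart wraps the variables in {{` ... `}} so Helm leaves them for the gateway).
VOLUME_MOUNTS = Template(
    "- mountPath: /data\n"
    "  name: user-storage\n"
    "  subPath: {{ user_id }}\n"
    "- mountPath: /function_data\n"
    "  name: user-storage\n"
    "  subPath: {{ function_data }}\n"
)


def mount_context(username: str, provider_name: Optional[str]) -> dict:
    """Hypothetical helper: choose the sub-paths the way the gateway does."""
    return {
        "user_id": username,
        # Provider functions share the provider's directory; user functions
        # fall back to the user's own directory.
        "function_data": provider_name or username,
    }


print(VOLUME_MOUNTS.render(mount_context("alice", None)))            # both sub-paths: alice
print(VOLUME_MOUNTS.render(mount_context("alice", "mockprovider")))  # /function_data -> mockprovider
```

For a plain user function both mounts resolve to the same directory, which is the simplification discussed in the review thread above.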
17 changes: 10 additions & 7 deletions client/qiskit_serverless/core/client.py
@@ -475,22 +475,25 @@ def upload(self, program: QiskitFunction):
def get_jobs(self, **kwargs) -> List[Job]:
return self._job_client.list(**kwargs)

def files(self) -> List[str]:
return self._files_client.list()
def files(self, provider: Optional[str] = None) -> List[str]:
return self._files_client.list(provider)

def file_download(
self,
file: str,
target_name: Optional[str] = None,
download_location: str = "./",
provider: Optional[str] = None,
):
return self._files_client.download(file, download_location, target_name)
return self._files_client.download(
file, download_location, target_name, provider
)

def file_delete(self, file: str):
return self._files_client.delete(file)
def file_delete(self, file: str, provider: Optional[str] = None):
return self._files_client.delete(file, provider)

def file_upload(self, file: str):
return self._files_client.upload(file)
def file_upload(self, file: str, provider: Optional[str] = None):
return self._files_client.upload(file, provider)

def list(self, **kwargs) -> List[QiskitFunction]:
"""Returns list of available programs."""
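A short usage sketch of the extended client API above. The import path and constructor arguments are assumed, and "mockprovider" is a placeholder; the provider-gating behavior noted in the comments is what the gateway changes later in this PR implement.

```python
from qiskit_serverless import ServerlessClient  # import path assumed

client = ServerlessClient(host="http://localhost:8000", token="<token>")  # placeholders

# Without a provider, the files API stays in the caller's personal space,
# which is what the /data mount exposes inside the job container.
print(client.files())

# With a provider, the same calls target that provider's shared space (the
# /function_data mount). The gateway honors the argument only if the caller
# belongs to the provider's admin group; otherwise it responds with a 404.
print(client.files(provider="mockprovider"))
client.file_upload("training_data.tar", provider="mockprovider")
client.file_download("results.tar", download_location="./", provider="mockprovider")
client.file_delete("results.tar", provider="mockprovider")
```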
18 changes: 12 additions & 6 deletions client/qiskit_serverless/core/files.py
@@ -53,14 +53,18 @@ def __init__(self, host: str, token: str, version: str):
self._token = token

def download(
self, file: str, download_location: str, target_name: Optional[str] = None
self,
file: str,
download_location: str,
target_name: Optional[str] = None,
provider: Optional[str] = None,
) -> Optional[str]:
"""Downloads file."""
tracer = trace.get_tracer("client.tracer")
with tracer.start_as_current_span("files.download"):
with requests.get(
f"{self.host}/api/{self.version}/files/download/",
params={"file": file},
params={"file": file, "provider": provider},
stream=True,
headers={"Authorization": f"Bearer {self._token}"},
timeout=REQUESTS_TIMEOUT,
@@ -80,14 +84,15 @@ def download(
progress_bar.close()
return file_name

def upload(self, file: str) -> Optional[str]:
def upload(self, file: str, provider: Optional[str] = None) -> Optional[str]:
"""Uploads file."""
tracer = trace.get_tracer("client.tracer")
with tracer.start_as_current_span("files.upload"):
with open(file, "rb") as f:
with requests.post(
f"{self.host}/api/{self.version}/files/upload/",
files={"file": f},
data={"provider": provider},
stream=True,
headers={"Authorization": f"Bearer {self._token}"},
timeout=REQUESTS_TIMEOUT,
@@ -97,27 +102,28 @@ def upload(self, file: str) -> Optional[str]:
return "Upload failed"
return "Can not open file"

def list(self) -> List[str]:
def list(self, provider: Optional[str] = None) -> List[str]:
"""Returns list of available files to download produced by programs,"""
tracer = trace.get_tracer("client.tracer")
with tracer.start_as_current_span("files.list"):
response_data = safe_json_request(
request=lambda: requests.get(
f"{self.host}/api/{self.version}/files/",
params={"provider": provider},
headers={"Authorization": f"Bearer {self._token}"},
timeout=REQUESTS_TIMEOUT,
)
)
return response_data.get("results", [])

def delete(self, file: str) -> Optional[str]:
def delete(self, file: str, provider: Optional[str] = None) -> Optional[str]:
"""Deletes file uploaded or produced by the programs,"""
tracer = trace.get_tracer("client.tracer")
with tracer.start_as_current_span("files.delete"):
response_data = safe_json_request(
request=lambda: requests.delete(
f"{self.host}/api/{self.version}/files/delete/",
data={"file": file},
data={"file": file, "provider": provider},
headers={
"Authorization": f"Bearer {self._token}",
"format": "json",
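For reference, the provider argument travels as a query parameter on the read endpoints and as form data on upload/delete. A condensed sketch of the resulting requests, with placeholder host, API version, token, and provider name:

```python
import requests

HOST, VERSION, TOKEN = "http://localhost:8000", "v1", "<token>"  # placeholders
HEADERS = {"Authorization": f"Bearer {TOKEN}"}

# list and download: provider as a query parameter
requests.get(f"{HOST}/api/{VERSION}/files/",
             params={"provider": "mockprovider"},
             headers=HEADERS, timeout=30)
requests.get(f"{HOST}/api/{VERSION}/files/download/",
             params={"file": "results.tar", "provider": "mockprovider"},
             headers=HEADERS, stream=True, timeout=30)

# upload and delete: provider as form data next to the file field
with open("results.tar", "rb") as f:
    requests.post(f"{HOST}/api/{VERSION}/files/upload/",
                  files={"file": f}, data={"provider": "mockprovider"},
                  headers=HEADERS, timeout=30)
requests.delete(f"{HOST}/api/{VERSION}/files/delete/",
                data={"file": "results.tar", "provider": "mockprovider"},
                headers=HEADERS, timeout=30)
```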
6 changes: 5 additions & 1 deletion gateway/api/ray.py
@@ -201,7 +201,7 @@ def submit_job(job: Job) -> Job:
return job


def create_ray_cluster(
def create_ray_cluster( # pylint: disable=too-many-branches
job: Job,
cluster_name: Optional[str] = None,
cluster_data: Optional[str] = None,
@@ -250,14 +250,18 @@ def create_ray_cluster(
node_image = settings.RAY_NODE_IMAGE

# if user specified image use specified image
function_data = user.username
if job.program.image is not None:
node_image = job.program.image
if job.program.provider.name:
function_data = job.program.provider.name

cluster = get_template("rayclustertemplate.yaml")
manifest = cluster.render(
{
"cluster_name": cluster_name,
"user_id": user.username,
"function_data": function_data,
"node_image": node_image,
"workers": job_config.workers,
"min_workers": job_config.min_workers,
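The sub-path selection added above boils down to: default to the requesting user's space, and switch to the provider's space only when the job runs a provider-owned custom image. A standalone sketch of that decision (function and variable names here are illustrative, not the gateway's actual helpers):

```python
from typing import Optional


def resolve_function_data(username: str,
                          custom_image: Optional[str],
                          provider_name: Optional[str]) -> str:
    """Pick the subPath for the /function_data mount (sketch of the logic above)."""
    function_data = username
    if custom_image is not None and provider_name:
        # Provider-supplied custom image: use the provider's shared directory.
        function_data = provider_name
    return function_data


assert resolve_function_data("alice", None, None) == "alice"
assert resolve_function_data(
    "alice", "icr.io/provider/function:1.0", "mockprovider"
) == "mockprovider"
```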
63 changes: 56 additions & 7 deletions gateway/api/views.py
@@ -39,6 +39,7 @@
Program,
Job,
RuntimeJob,
Provider,
)
from .ray import get_job_handler
from .serializers import (
@@ -542,15 +543,39 @@ class FilesViewSet(viewsets.ViewSet):

BASE_NAME = "files"

def list_user_providers(self, user):
"""list provider names that the user in"""
provider_list = []
providers = Provider.objects.all()
for instance in providers:
if instance.admin_group in user.groups.all():
provider_list.append(instance.name)
return provider_list

def check_user_has_provider(self, user, provider_name):
"""check if user has the provider"""
return provider_name in self.list_user_providers(user)

def list(self, request):
"""List of available for user files."""
response = Response(
{"message": "Requested file was not found."},
status=status.HTTP_404_NOT_FOUND,
)
files = []
tracer = trace.get_tracer("gateway.tracer")
ctx = TraceContextTextMapPropagator().extract(carrier=request.headers)
with tracer.start_as_current_span("gateway.files.list", context=ctx):
user_dir = request.user.username
provider_name = request.query_params.get("provider")
if provider_name is not None:
if self.check_user_has_provider(request.user, provider_name):
user_dir = provider_name
else:
return response
user_dir = os.path.join(
sanitize_file_path(settings.MEDIA_ROOT),
sanitize_file_path(request.user.username),
sanitize_file_path(user_dir),
)
if os.path.exists(user_dir):
files = [
@@ -577,17 +602,23 @@ def download(self, request):  # pylint: disable=invalid-name
ctx = TraceContextTextMapPropagator().extract(carrier=request.headers)
with tracer.start_as_current_span("gateway.files.download", context=ctx):
requested_file_name = request.query_params.get("file")
provider_name = request.query_params.get("provider")
if requested_file_name is not None:
user_dir = request.user.username
if provider_name is not None:
if self.check_user_has_provider(request.user, provider_name):
user_dir = provider_name
else:
return response
# look for file in user's folder
filename = os.path.basename(requested_file_name)
user_dir = os.path.join(
sanitize_file_path(settings.MEDIA_ROOT),
sanitize_file_path(request.user.username),
sanitize_file_path(user_dir),
)
file_path = os.path.join(
sanitize_file_path(user_dir), sanitize_file_path(filename)
)

if os.path.exists(user_dir) and os.path.exists(file_path) and filename:
chunk_size = 8192
# note: we do not use with statements as Streaming response closing file itself.
@@ -618,14 +649,20 @@ def delete(self, request):  # pylint: disable=invalid-name
if request.data and "file" in request.data:
# look for file in user's folder
filename = os.path.basename(request.data["file"])
provider_name = request.data.get("provider")
user_dir = request.user.username
if provider_name is not None:
if self.check_user_has_provider(request.user, provider_name):
user_dir = provider_name
else:
return response
user_dir = os.path.join(
sanitize_file_path(settings.MEDIA_ROOT),
sanitize_file_path(request.user.username),
sanitize_file_path(user_dir),
)
file_path = os.path.join(
sanitize_file_path(user_dir), sanitize_file_path(filename)
)

if os.path.exists(user_dir) and os.path.exists(file_path) and filename:
os.remove(file_path)
response = Response(
@@ -637,20 +674,32 @@
@action(methods=["POST"], detail=False)
def upload(self, request): # pylint: disable=invalid-name
"""Upload selected file."""
response = Response(
{"message": "Requested file was not found."},
status=status.HTTP_404_NOT_FOUND,
)
tracer = trace.get_tracer("gateway.tracer")
ctx = TraceContextTextMapPropagator().extract(carrier=request.headers)
with tracer.start_as_current_span("gateway.files.download", context=ctx):
upload_file = request.FILES["file"]
filename = os.path.basename(upload_file.name)
user_dir = request.user.username
if request.data and "provider" in request.data:
provider_name = request.data["provider"]
if provider_name is not None:
if self.check_user_has_provider(request.user, provider_name):
user_dir = provider_name
else:
return response
user_dir = os.path.join(
sanitize_file_path(settings.MEDIA_ROOT),
sanitize_file_path(request.user.username),
sanitize_file_path(user_dir),
)
file_path = os.path.join(
sanitize_file_path(user_dir), sanitize_file_path(filename)
)
with open(file_path, "wb+") as destination:
for chunk in upload_file.chunks():
destination.write(chunk)
return Response({"message": file_path})
return Response({"message": file_path})
return Response("server error", status=status.HTTP_500_INTERNAL_SERVER_ERROR)