Fix artifact unload and load
oeway committed Dec 6, 2024
1 parent 3b8f732 commit 867be63
Showing 6 changed files with 209 additions and 164 deletions.
hypha/apps.py: 7 changes (6 additions & 1 deletion)

@@ -79,6 +79,8 @@ async def get_runners(self):
         # start the browser runner
         server = await self.store.get_public_api()
         svcs = await server.list_services("public/server-app-worker")
+        if not svcs:
+            return []
         runners = [await server.get_service(svc["id"]) for svc in svcs]
         if runners:
             return runners
@@ -633,7 +635,10 @@ async def close_workspace(self, workspace_info: WorkspaceInfo):
             if app["workspace"] == workspace_info.id:
                 await self._stop(app["id"], raise_exception=False)
         # Send to all runners
-        for runner in await self.get_runners():
+        runners = await self.get_runners()
+        if not runners:
+            return
+        for runner in runners:
             try:
                 await runner.close_workspace(workspace_info.id)
             except Exception as exp:
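With these guards, get_runners() reports "no runners" as an empty list instead of failing when no public/server-app-worker service is registered, and close_workspace exits early rather than iterating over nothing. A minimal sketch of the new behaviour (the FakeServer stub is hypothetical, and the fallback that starts a browser runner is omitted):

    import asyncio

    async def get_runners(server):
        # Mirrors the guard above: no registered workers means an empty list.
        svcs = await server.list_services("public/server-app-worker")
        if not svcs:
            return []
        return [await server.get_service(svc["id"]) for svc in svcs]

    class FakeServer:
        async def list_services(self, query):
            return []  # simulate a deployment with no app workers

    asyncio.run(get_runners(FakeServer()))  # -> [], instead of an error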
hypha/artifact.py: 10 changes (7 additions & 3 deletions)

@@ -2776,7 +2776,9 @@ async def prepare_workspace(self, workspace_info: WorkspaceInfo):
                     bucket=s3_config["bucket"],
                     prefix=prefix,
                 )
-            logger.info(f"Artifacts in workspace {workspace_info.id} prepared.")
+            logger.info(
+                f"Artifacts (#{len(artifacts)}) in workspace {workspace_info.id} prepared."
+            )
         except Exception as e:
             logger.error(f"Error preparing workspace: {traceback.format_exc()}")
             raise e
@@ -2818,10 +2820,12 @@ async def close_workspace(self, workspace_info: WorkspaceInfo):
                         f"{artifact.workspace}/{artifact.alias}"
                     )
             logger.info(
-                f"Artifacts in workspace {workspace_info.id} prepared for closure."
+                f"Artifacts (#{len(artifacts)}) in workspace {workspace_info.id} prepared for closure."
             )
         except Exception as e:
-            logger.error(f"Error closing workspace: {traceback.format_exc()}")
+            logger.error(
+                f"Error closing workspace {workspace_info.id}: {traceback.format_exc()}"
+            )
             raise e
         finally:
             await session.close()
hypha/core/workspace.py: 30 changes (18 additions & 12 deletions)

@@ -1754,6 +1754,8 @@ async def unload(self, context=None):
         """Unload the workspace."""
         self.validate_context(context, permission=UserPermission.admin)
         ws = context["ws"]
+        if not await self._redis.hexists("workspaces", ws):
+            raise KeyError(f"Workspace {ws} has already been unloaded.")
         winfo = await self.load_workspace_info(ws)
         # list all the clients in the workspace and send a message to delete them
         client_keys = await self._list_client_keys(winfo.id)
@@ -1770,16 +1772,13 @@

         # Mark the workspace as not ready
         winfo.status = None
-        if winfo.persistent and self._s3_controller:
-            # since the workspace will be persisted, we can remove the workspace info from the redis store
-            await self._redis.hdel("workspaces", ws)
-
-        elif not winfo.persistent:
+        if not winfo.persistent:
             # delete all the items in redis starting with `workspaces_name:`
             # Including the queue and other associated resources
             keys = await self._redis.keys(f"{ws}:*")
             for key in keys:
                 await self._redis.delete(key)
-            await self._redis.hdel("workspaces", ws)
             if self._s3_controller:
                 await self._s3_controller.cleanup_workspace(winfo)
@@ -1789,11 +1788,8 @@
                 self._active_svc.remove(ws)
             except KeyError:
                 pass
-
         await self._close_workspace(winfo)
-
-        await self._event_bus.emit("workspace_unloaded", winfo.model_dump())
-        logger.info("Workspace %s unloaded.", ws)
+        await self._redis.hdel("workspaces", ws)

     async def _prepare_workspace(self, workspace_info: WorkspaceInfo):
         """Prepare the workspace."""
@@ -1824,10 +1820,20 @@ async def _close_workspace(self, workspace_info: WorkspaceInfo):
         ), "Workspace must be unloaded before archiving."
         if workspace_info.persistent:
             if self._artifact_manager:
-                await self._artifact_manager.close_workspace(workspace_info)
+                try:
+                    await self._artifact_manager.close_workspace(workspace_info)
+                except Exception as e:
+                    logger.error(f"Artifact manager failed to close workspace: {e}")
             if self._server_app_controller:
-                await self._server_app_controller.close_workspace(workspace_info)
-            logger.info("Workspace %s archived.", workspace_info.id)
+                try:
+                    await self._server_app_controller.close_workspace(workspace_info)
+                except Exception as e:
+                    logger.error(
+                        f"Server app controller failed to close workspace: {e}"
+                    )
+
+        await self._event_bus.emit("workspace_unloaded", workspace_info.model_dump())
+        logger.info("Workspace %s unloaded.", workspace_info.id)

     @schema_method
     async def wait_until_ready(self, timeout: Optional[int] = 10, context=None):
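The net effect: the workspace's entry in the Redis "workspaces" hash now survives until _close_workspace has finished, so the artifact manager and the server app controller can still resolve the workspace while they persist and shut it down, and a repeated unload fails fast with a KeyError instead of silently racing. A condensed, hedged sketch of the reordered flow (simplified from the hunks above; the controller wiring is assumed to match the diff):

    import logging

    logger = logging.getLogger(__name__)

    async def unload(self, ws):
        # Refuse a double unload up front.
        if not await self._redis.hexists("workspaces", ws):
            raise KeyError(f"Workspace {ws} has already been unloaded.")
        winfo = await self.load_workspace_info(ws)
        if not winfo.persistent:
            # Non-persistent workspaces: drop every key under the "<ws>:" prefix.
            for key in await self._redis.keys(f"{ws}:*"):
                await self._redis.delete(key)
            if self._s3_controller:
                await self._s3_controller.cleanup_workspace(winfo)
        await self._close_workspace(winfo)        # may talk to S3; needs the record
        await self._redis.hdel("workspaces", ws)  # only now drop the record

    async def _close_workspace(self, winfo):
        if winfo.persistent:
            for name, controller in [
                ("Artifact manager", self._artifact_manager),
                ("Server app controller", self._server_app_controller),
            ]:
                if controller is None:
                    continue
                try:
                    await controller.close_workspace(winfo)
                except Exception as e:
                    # Best effort: one failing controller must not block the other.
                    logger.error("%s failed to close workspace: %s", name, e)
        await self._event_bus.emit("workspace_unloaded", winfo.model_dump())
        logger.info("Workspace %s unloaded.", winfo.id)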
hypha/queue.py: 15 changes (0 additions & 15 deletions)

@@ -15,21 +15,6 @@ def create_queue_service(store: RedisStore):
     """Create a queue service for Hypha."""
     redis: aioredis.FakeRedis = store.get_redis()

-    event_bus = store.get_event_bus()
-
-    async def on_workspace_unloaded(workspace):
-        # delete all the keys that start with workspace["name"] + ":q:"
-        keys_pattern = workspace["name"] + ":q:*"
-        cursor = "0"
-        while cursor != 0:
-            cursor, keys = await redis.scan(cursor=cursor, match=keys_pattern)
-            if keys:
-                await redis.delete(*keys)
-            if cursor != "0":
-                logger.info("Removed queue keys for workspace: %s", workspace["name"])
-
-    event_bus.on_local("workspace_unloaded", on_workspace_unloaded)
-
     async def push_task(queue_name, task: dict, context: dict = None):
         workspace = context["ws"]
         await redis.lpush(workspace + ":q:" + queue_name, json.dumps(task))
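The listener can go because queue keys are namespaced as "<workspace>:q:<queue_name>" (see push_task above), so the workspace-wide wildcard cleanup that unload() now performs in hypha/core/workspace.py already removes them. A quick sanity check of the pattern overlap (workspace and queue names are hypothetical; Redis glob matching behaves like fnmatch for these simple patterns):

    import fnmatch

    key = "ws-user-1:q:tasks"
    assert fnmatch.fnmatch(key, "ws-user-1:q:*")  # old, queue-specific pattern
    assert fnmatch.fnmatch(key, "ws-user-1:*")    # new, workspace-wide pattern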
hypha/vectors.py: 20 changes (15 additions & 5 deletions)

@@ -81,7 +81,10 @@ def _get_index_name(self, collection_name: str) -> str:

     async def _get_fields(self, collection_name: str):
         index_name = self._get_index_name(collection_name)
-        info = await self._redis.ft(index_name).info()
+        try:
+            info = await self._redis.ft(index_name).info()
+        except aioredis.ResponseError:
+            raise KeyError(f"Vector collection {collection_name} does not exist.")
         fields = parse_attributes(info["attributes"])
         return fields

@@ -318,10 +321,17 @@ async def load_collection(
                 if isinstance(value, list):
                     vector_data[key] = np.array(value, dtype=np.float32)

-            # Add vector to Redis
-            await self.add_vectors(
-                collection_name, [{"id": obj["name"].split(".")[0], **vector_data}]
-            )
+            try:
+                # Add vector to Redis
+                await self.add_vectors(
+                    collection_name,
+                    [{"id": obj["name"].split(".")[0], **vector_data}],
+                )
+            except Exception as e:
+                logger.error(
+                    f"Failed to load vector {obj['name']} from S3 bucket {bucket} under prefix {prefix}: {e}"
+                )
+                raise e

         logger.info(
             f"Collection {collection_name} loaded from S3 bucket {bucket} under prefix {prefix}."
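With the first hunk, looking up a collection that was never created now surfaces as a KeyError rather than a raw RediSearch ResponseError, so callers can branch on a single exception type; the second hunk makes a failed vector load name the offending S3 object before re-raising. A hedged usage sketch, run inside a coroutine (the vector_engine handle and collection name are hypothetical):

    try:
        fields = await vector_engine._get_fields("my-collection")
    except KeyError:
        fields = None  # collection does not exist yet: create it or skip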