Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
fajpunk committed Dec 12, 2024
1 parent 1e726ab commit cc8eb92
Show file tree
Hide file tree
Showing 8 changed files with 596 additions and 571 deletions.
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ repos:
- id: check-toml

- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.8.0
rev: v0.8.3
hooks:
- id: ruff
args: [--fix, --exit-non-zero-on-fix]
Expand Down
409 changes: 211 additions & 198 deletions requirements/dev.txt

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions requirements/main.in
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ pyvo<1.6
pyyaml
rubin-nublado-client>=8.0.3
safir>=6.5.1
sentry-sdk
shortuuid
structlog
websockets
Expand Down
589 changes: 305 additions & 284 deletions requirements/main.txt

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions src/mobu/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from datetime import timedelta
from importlib.metadata import metadata, version

import sentry_sdk
import structlog
from fastapi import FastAPI
from fastapi.openapi.utils import get_openapi
Expand All @@ -35,6 +36,8 @@
from .handlers.internal import internal_router
from .status import post_status

sentry_sdk.init(traces_sample_rate=0)

__all__ = ["create_app", "lifespan"]


Expand Down
42 changes: 28 additions & 14 deletions src/mobu/services/business/notebookrunner.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@
import shutil
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from dataclasses import asdict
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import Any

import sentry_sdk
import yaml
from httpx import AsyncClient
from rubin.nublado.client import JupyterLabSession
Expand All @@ -23,7 +25,7 @@

from ...constants import GITHUB_REPO_CONFIG_PATH
from ...dependencies.config import config_dependency
from ...exceptions import NotebookRepositoryError, RepositoryConfigError
from ...exceptions import NotebookRepositoryError
from ...models.business.notebookrunner import (
ListNotebookRunnerOptions,
NotebookFilterResults,
Expand Down Expand Up @@ -108,18 +110,22 @@ async def initialize(self) -> None:

repo_config_path = self._repo_dir / GITHUB_REPO_CONFIG_PATH
if repo_config_path.exists():
try:
repo_config = RepoConfig.model_validate(
yaml.safe_load(repo_config_path.read_text())
)
except Exception as err:
raise RepositoryConfigError(
err=err,
user=self.user.username,
config_file=GITHUB_REPO_CONFIG_PATH,
repo_url=self.options.repo_url,
repo_ref=self.options.repo_ref,
) from err
sentry_sdk.set_context(
"repo_info",
{
"repo_url": self.options.repo_url,
"repo_ref": self.options.repo_ref,
},
)
sentry_sdk.set_tag("repo_url", self.options.repo_url)
sentry_sdk.set_tag("repo_ref", self.options.repo_ref)
sentry_sdk.set_context(
"repo_info", {"config_file": GITHUB_REPO_CONFIG_PATH}
)

repo_config = RepoConfig.model_validate(
yaml.safe_load(repo_config_path.read_text())
)
else:
repo_config = RepoConfig()

Expand Down Expand Up @@ -290,6 +296,7 @@ async def execute_code(self, session: JupyterLabSession) -> None:
return

self._notebook = self.next_notebook()
sentry_sdk.set_tag("notebook", self._notebook.name)

iteration = f"{count + 1}/{num_executions}"
msg = f"Notebook {self._notebook.name} iteration {iteration}"
Expand All @@ -305,7 +312,14 @@ async def execute_code(self, session: JupyterLabSession) -> None:
cell_number=f"#{cell['_index']}",
cell_source=code,
)
await self.execute_cell(session, code, cell_id, ctx)
sentry_sdk.set_tag("cell", cell_id)
sentry_sdk.set_context("cell_info", asdict(ctx))
try:
await self.execute_cell(session, code, cell_id, ctx)
except Exception as e:
if error := getattr(e, "error", None):
sentry_sdk.set_context("error_info", {"error": error})
raise
if not await self.execution_idle():
break

Expand Down
28 changes: 12 additions & 16 deletions src/mobu/services/business/tap.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,12 @@

import pyvo
import requests
import sentry_sdk
from httpx import AsyncClient
from structlog.stdlib import BoundLogger

from ...dependencies.config import config_dependency
from ...exceptions import CodeExecutionError, TAPClientError
from ...exceptions import TAPClientError
from ...models.business.tap import TAPBusinessData, TAPBusinessOptions
from ...models.user import AuthenticatedUser
from .base import Business
Expand Down Expand Up @@ -73,21 +74,16 @@ async def execute(self) -> None:
query = self.get_next_query()
with self.timings.start("execute_query", {"query": query}) as sw:
self._running_query = query

try:
if self.options.sync:
await self.run_sync_query(query)
else:
await self.run_async_query(query)
except Exception as e:
raise CodeExecutionError(
user=self.user.username,
code=query,
code_type="TAP query",
event="execute_query",
started_at=sw.start_time,
error=f"{type(e).__name__}: {e!s}",
) from e
sentry_sdk.set_tag("event", "execute_query")
sentry_sdk.set_tag("sync", self.options.sync)
sentry_sdk.set_context(
"query_info", {"query": query, "started_at": sw.start_time}
)

if self.options.sync:
await self.run_sync_query(query)
else:
await self.run_async_query(query)

self._running_query = None
elapsed = sw.elapsed.total_seconds()
Expand Down
93 changes: 35 additions & 58 deletions src/mobu/services/monkey.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,11 @@
import sys
from tempfile import NamedTemporaryFile, _TemporaryFileWrapper

import sentry_sdk
import structlog
from aiojobs import Job, Scheduler
from httpx import AsyncClient
from safir.datetime import current_datetime, format_datetime_for_logging
from safir.logging import Profile
from safir.slack.blockkit import SlackException, SlackMessage, SlackTextField
from safir.slack.webhook import SlackWebhookClient
from structlog.stdlib import BoundLogger

Expand Down Expand Up @@ -135,36 +134,7 @@ async def alert(self, exc: Exception) -> None:
state = self._state.name
self._logger.info(f"Not sending alert because state is {state}")
return
if not self._slack:
self._logger.info("Alert hook isn't set, so not sending to Slack")
return
monkey = f"{self._flock}/{self._name}" if self._flock else self._name
if isinstance(exc, MobuMixin):
# Add the monkey info if it is not already set.
if not exc.monkey:
exc.monkey = monkey
if isinstance(exc, SlackException):
# Avoid post_exception here since it adds the application name,
# but mobu (unusually) uses a dedicated web hook and therefore
# doesn't need to label its alerts.
await self._slack.post(exc.to_slack())
else:
now = current_datetime(microseconds=True)
date = format_datetime_for_logging(now)
name = type(exc).__name__
error = f"{name}: {exc!s}"
message = SlackMessage(
message=f"Unexpected exception {error}",
fields=[
SlackTextField(heading="Exception type", text=name),
SlackTextField(heading="Failed at", text=date),
SlackTextField(heading="Monkey", text=monkey),
SlackTextField(heading="User", text=self._user.username),
],
)
await self._slack.post(message)

self._global_logger.info("Sent alert to Slack")
sentry_sdk.capture_exception(exc)

def logfile(self) -> str:
"""Get the log file for a monkey's log."""
Expand Down Expand Up @@ -205,32 +175,39 @@ async def _runner(self) -> None:
run = True

while run:
try:
self._state = MonkeyState.RUNNING
await self.business.run()
run = False
except Exception as e:
msg = "Exception thrown while doing monkey business"
if self._flock:
monkey = f"{self._flock}/{self._name}"
else:
monkey = self._name
if isinstance(e, MobuMixin):
e.monkey = monkey

await self.alert(e)
self._logger.exception(msg)

run = self._restart and self._state == MonkeyState.RUNNING
if run:
self._state = MonkeyState.ERROR
await self.business.error_idle()
if self._state == MonkeyState.STOPPING:
run = False
else:
self._state = MonkeyState.STOPPING
msg = "Shutting down monkey due to error"
self._global_logger.warning(msg)
with sentry_sdk.isolation_scope() as scope:
scope.clear_breadcrumbs()
scope.set_tag("flock", self._flock)
scope.set_user({"username": self._user.username})
scope.set_tag("monkey", self._name)
scope.set_tag("business", self.business.__class__.__name__)

try:
self._state = MonkeyState.RUNNING
await self.business.run()
run = False
except Exception as e:
msg = "Exception thrown while doing monkey business"
if self._flock:
monkey = f"{self._flock}/{self._name}"
else:
monkey = self._name
if isinstance(e, MobuMixin):
e.monkey = monkey

await self.alert(e)
self._logger.exception(msg)

run = self._restart and self._state == MonkeyState.RUNNING
if run:
self._state = MonkeyState.ERROR
await self.business.error_idle()
if self._state == MonkeyState.STOPPING:
run = False
else:
self._state = MonkeyState.STOPPING
msg = "Shutting down monkey due to error"
self._global_logger.warning(msg)

await self.business.close()
self.state = MonkeyState.FINISHED
Expand Down

0 comments on commit cc8eb92

Please sign in to comment.