Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add Celery healthcheck files #660

Draft
wants to merge 2 commits into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 51 additions & 1 deletion visyn_core/celery/app.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import logging
import logging.config
from pathlib import Path

from celery import Celery

Expand All @@ -18,9 +19,58 @@ def init_celery_manager(*, plugins: list[EntryPointPlugin]):
_log.warning("No Celery settings found in configuration, skipping Celery initialization")
return None

manager.celery = Celery("visyn", result_extended=True, **manager.settings.visyn_core.celery)
# Celery readiness checks are quite tricky, so we just create a file when the worker is ready and remove it when it shuts down.
# https://github.com/celery/celery/issues/4079#issuecomment-1270085680
if manager.settings.visyn_core.celery_readiness_file:
from celery.signals import (
worker_ready,
worker_shutdown,
)

_log.info("Setting up Celery readiness file")
readiness_file = Path(manager.settings.visyn_core.celery_readiness_file)

@worker_ready.connect
def readiness_on_worker_ready(**_):
_log.info("Worker is ready")
readiness_file.touch()

@worker_shutdown.connect
def readiness_on_worker_shutdown(**_):
_log.info("Worker is shutting down")
readiness_file.unlink(missing_ok=True)

_log.info("Initializing celery app")
manager.celery = Celery("visyn", result_extended=True, **manager.settings.visyn_core.celery)

if manager.settings.visyn_core.celery_liveness_file:
from celery import bootsteps

_log.info("Setting up Celery liveness file")
liveness_file = Path(manager.settings.visyn_core.celery_liveness_file)

class LivenessProbe(bootsteps.StartStopStep):
requires = ("celery.worker.components:Timer",)

def __init__(self, worker, **kwargs):
self.requests = []
self.tref = None

def start(self, worker):
self.tref = worker.timer.call_repeatedly(
1.0,
self.update_heartbeat_file,
(worker,),
priority=10,
)

def stop(self, worker):
liveness_file.unlink(missing_ok=True)

def update_heartbeat_file(self, worker):
liveness_file.touch()

manager.celery.steps["worker"].add(LivenessProbe)

# Discover tasks from all plugins, i.e. visyn_core.tasks, visyn_plugin.tasks
manager.celery.autodiscover_tasks([p.id for p in plugins])
Expand Down
12 changes: 10 additions & 2 deletions visyn_core/settings/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,11 +180,19 @@ class VisynCoreSettings(BaseModel):
"""
sentry: SentrySettings = SentrySettings()
"""
Settings for celery. If not set, celery will not be initialized.
Settings for Sentry. DSN will be shared via the client config.
"""
celery: dict[str, Any] | None = None
"""
Settings for Sentry. DSN will be shared via the client config.
Settings for celery. If not set, celery will not be initialized.
"""
celery_readiness_file: str | None = None
"""
Readiness file for Celery. If set, a file will be created when the worker is ready and removed when it shuts down.
"""
celery_liveness_file: str | None = None
"""
Heartbeat file for Celery. If set, a file will be created at an interval when the worker is ready.
"""
cypress: bool = False
"""
Expand Down
2 changes: 2 additions & 0 deletions visyn_core/tests/fixtures/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ def workspace_config() -> dict:
return {
"visyn_core": {
"enabled_plugins": ["visyn_core"],
"celery_readiness_file": "./celery_ready",
"celery_liveness_file": "./celery_liveness",
"celery": {
"broker": "memory://localhost/",
"task_always_eager": True,
Expand Down
Loading