forked from codecov/worker
-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.py
133 lines (106 loc) · 3.88 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
# -*- coding: utf-8 -*-
import logging
import os
import sys
import typing
import click
from celery.signals import worker_process_shutdown
from prometheus_client import REGISTRY, CollectorRegistry, multiprocess
from shared.celery_config import BaseCeleryConfig
from shared.config import get_config
from shared.metrics import start_prometheus
from shared.storage.exceptions import BucketAlreadyExistsError
import app
from helpers.environment import get_external_dependencies_folder
from helpers.version import get_current_version
from services.storage import get_storage_client
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
# Start-up banner template; setup_worker() prints it after filling in the
# ``{version}`` placeholder through str.format().
# NOTE(review): the spacing inside the ASCII art looks collapsed — confirm
# against the upstream file before relying on how it renders.
initialization_text = """
_____ _
/ ____| | |
| | ___ __| | ___ ___ _____ __
| | / _ \\ / _` |/ _ \\/ __/ _ \\ \\ / /
| |___| (_) | (_| | __/ (_| (_) \\ V /
\\_____\\___/ \\__,_|\\___|\\___\\___/ \\_/
{version}
"""
# Root command group of this entry point; the subcommands in this module are
# attached to it through ``@cli.command()``.
# Documented with comments rather than a docstring because click would show a
# docstring as the group's ``--help`` text.
@click.group()
@click.pass_context
def cli(ctx: click.Context):
    # Nothing happens at group level; the context is accepted but not used.
    return None
# ``test`` subcommand: always raises a ClickException reporting that TEST mode
# cannot be run.
@cli.command()
def test():
    message = "System not suitable to run TEST mode"
    raise click.ClickException(message)
# ``web`` subcommand: always raises a ClickException reporting that WEB mode
# cannot be run.
@cli.command()
def web():
    message = "System not suitable to run WEB mode"
    raise click.ClickException(message)
@worker_process_shutdown.connect
def mark_process_dead(pid, exitcode, **kwargs):
    """Tell prometheus_client that a worker process has exited.

    Connected to Celery's ``worker_process_shutdown`` signal.

    Args:
        pid: Process id of the worker process that is shutting down.
        exitcode: Exit code supplied with the signal; unused.
        **kwargs: Any further signal arguments; unused.
    """
    # setup_worker() only builds a multiprocess collector when
    # PROMETHEUS_MULTIPROC_DIR is set, so without a configured directory there
    # are no per-process metric files to clean up. The lower-case spelling is
    # checked as well because prometheus_client accepts it as a legacy name.
    # NOTE(review): prometheus_client appears to resolve the directory from
    # these variables and to fail when neither is set — confirm against the
    # installed version.
    multiproc_dir = os.environ.get(
        "PROMETHEUS_MULTIPROC_DIR", os.environ.get("prometheus_multiproc_dir")
    )
    if multiproc_dir is None:
        return
    multiprocess.mark_process_dead(pid)
def setup_worker():
    """Prepare the process before the Celery worker is started.

    Prints the start-up banner, appends the external dependencies folder to
    ``sys.path`` for frozen builds, starts the Prometheus exporter and asks
    the storage client to create the root bucket.
    """
    print(initialization_text.format(version=get_current_version()))

    if getattr(sys, "frozen", False):
        # Only for enterprise builds
        external_deps_folder = get_external_dependencies_folder()
        log.info(
            "External dependencies folder configured to %s", external_deps_folder
        )
        sys.path.append(external_deps_folder)

    # Default registry unless multiprocess mode is configured, in which case
    # a dedicated registry backed by a multiprocess collector is exported.
    registry = REGISTRY
    if "PROMETHEUS_MULTIPROC_DIR" in os.environ:
        registry = CollectorRegistry()
        multiprocess.MultiProcessCollector(registry)
    start_prometheus(9996, registry=registry)  # 9996 is an arbitrary port number

    storage_client = get_storage_client()
    bucket_name = get_config("services", "minio", "bucket", default="archive")
    # Same nested lookup as the bucket name, so a missing "minio" section
    # falls back to the default instead of calling .get() on a missing value.
    region = get_config("services", "minio", "region", default="us-east-1")
    try:
        storage_client.create_root_storage(bucket_name, region)
    except BucketAlreadyExistsError:
        # The bucket is already there; nothing to do.
        pass
    else:
        log.info("Initializing bucket %s", bucket_name)
# ``worker`` subcommand: runs setup_worker(), builds the argument list for the
# Celery worker from the command-line options and the "setup" configuration,
# and hands it to ``app.celery_app.worker_main``.
# Documented with comments rather than a docstring because click would show a
# docstring as the command's ``--help`` text.
@cli.command()
@click.option("--name", envvar="HOSTNAME", default="worker", help="Node name")
@click.option(
    "--concurrency", type=int, default=2, help="Number for celery concurrency"
)
@click.option("--debug", is_flag=True, default=False, help="Enable celery debug mode")
@click.option(
    "--queue",
    multiple=True,
    default=["celery"],
    help="Queues to listen to for this worker",
)
def worker(name, concurrency, debug, queue):
    setup_worker()

    log_level = "debug" if debug else "info"
    # argv entries are command-line tokens, so the integer option is passed
    # as text like every other argument.
    args = ["worker", "-n", name, "-c", str(concurrency), "-l", log_level]

    if get_config("setup", "celery_queues_enabled", default=True):
        args += ["-Q", _get_queues_param_from_queue_input(queue)]

    if get_config("setup", "celery_beat_enabled", default=True):
        args += ["-B", "-s", "/home/codecov/celerybeat-schedule"]

    return app.celery_app.worker_main(argv=args)
def _get_queues_param_from_queue_input(queues: typing.List[str]) -> str:
    """Build the comma-separated queue list passed to the worker's ``-Q`` flag.

    Each entry of ``queues`` may itself hold several comma-separated names.
    The result lists the requested queues, then an ``enterprise_``-prefixed
    counterpart of each, then the health check queue.

    Args:
        queues: Queue names as received from the ``--queue`` option.

    Returns:
        A single comma-separated string of queue names.
    """
    # Joining and splitting again supports both repeated options and
    # comma-separated values within one option.
    raw_names = ",".join(queues).split(",")
    # Strip whitespace and drop blanks so input such as "a, b", a trailing
    # comma or an empty list cannot yield malformed names like
    # "enterprise_ b" or a bare "enterprise_".
    stripped_names = (raw.strip() for raw in raw_names)
    names = [name for name in stripped_names if name]
    enterprise_names = [f"enterprise_{name}" for name in names]
    # We always run the health_check queue to make sure the healthcheck is
    # performed, and also to avoid that queue filling up with no workers to
    # consume from it.
    return ",".join(
        [*names, *enterprise_names, BaseCeleryConfig.health_check_default_queue]
    )
def main():
    """Invoke the ``cli`` command group with a fresh, empty context object."""
    context_object = {}
    cli(obj=context_object)


# Run the command-line interface only when this file is executed as a script.
if __name__ == "__main__":
    main()