import os
import django
# Django settings must be configured before the Celery app object is created
# so that Celery can detect that Django is in use and apply its Django fixup,
# which avoids some database connection issues.
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "django_scaffold.settings")
django.setup()
# -*- coding: utf-8 -*-
import logging  # noqa: E402
import os  # noqa: E402
import sys  # noqa: E402
import typing  # noqa: E402

import click  # noqa: E402
import shared.storage  # noqa: E402
from celery.signals import worker_process_shutdown  # noqa: E402
from prometheus_client import REGISTRY, CollectorRegistry, multiprocess  # noqa: E402
from shared.celery_config import BaseCeleryConfig  # noqa: E402
from shared.config import get_config  # noqa: E402
from shared.license import startup_license_logging  # noqa: E402
from shared.metrics import start_prometheus  # noqa: E402
from shared.storage.exceptions import BucketAlreadyExistsError  # noqa: E402

import app  # noqa: E402
from helpers.environment import get_external_dependencies_folder  # noqa: E402
from helpers.version import get_current_version  # noqa: E402

log = logging.getLogger(__name__)

initialization_text = """
  _____          _
 / ____|        | |
| |     ___   __| | ___  ___ _____   __
| |    / _ \\ / _` |/ _ \\/ __/ _ \\ \\ / /
| |___| (_) | (_| |  __/ (_| (_) \\ V /
 \\_____\\___/ \\__,_|\\___|\\___\\___/ \\_/
                              {version}
"""


@click.group()
@click.pass_context
def cli(ctx: click.Context):
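    """Top-level Click command group; the actual run mode is chosen by the subcommand."""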
    pass


@cli.command()
def test():
    raise click.ClickException("System not suitable to run TEST mode")


@cli.command()
def web():
    raise click.ClickException("System not suitable to run WEB mode")


@worker_process_shutdown.connect
def mark_process_dead(pid, exitcode, **kwargs):
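    """When a Celery worker process shuts down, flag its pid as dead to
    prometheus_client's multiprocess machinery so the per-process metric
    files it left behind can be cleaned up."""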
    multiprocess.mark_process_dead(pid)


def setup_worker():
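    """One-time initialization for this worker process: print the startup banner,
    add the external dependencies folder for frozen (enterprise) builds, start the
    Prometheus metrics server, optionally create the archive bucket, and log
    license information."""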
    print(initialization_text.format(version=get_current_version()))

    if getattr(sys, "frozen", False):
        # Only for enterprise builds
        external_deps_folder = get_external_dependencies_folder()
        log.info(f"External dependencies folder configured to {external_deps_folder}")
        sys.path.append(external_deps_folder)

    registry = REGISTRY
    if "PROMETHEUS_MULTIPROC_DIR" in os.environ:
        registry = CollectorRegistry()
        multiprocess.MultiProcessCollector(registry)
    start_prometheus(9996, registry=registry)  # 9996 is an arbitrary port number

    # This storage client is only used to create the bucket, so it doesn't need
    # to be aware of the repoid.
    storage_client = shared.storage.get_appropriate_storage_service()
    minio_config = get_config("services", "minio")
    bucket_name = get_config("services", "minio", "bucket", default="archive")
    auto_create_bucket = get_config(
        "services", "minio", "auto_create_bucket", default=False
    )
    region = minio_config.get("region", "us-east-1")
    try:
        # Note that this is a departure from the old default behavior: the bucket
        # is only created when auto_create_bucket is set, since it will already
        # exist in most setups where IaC or manual provisioning is used.
        if auto_create_bucket:
            storage_client.create_root_storage(bucket_name, region)
            log.info("Initializing bucket %s", bucket_name)
    except BucketAlreadyExistsError:
        pass

    startup_license_logging()


@cli.command()
@click.option("--name", envvar="HOSTNAME", default="worker", help="Node name")
@click.option(
    "--concurrency", type=int, default=2, help="Number for celery concurrency"
)
@click.option("--debug", is_flag=True, default=False, help="Enable celery debug mode")
@click.option(
    "--queue",
    multiple=True,
    default=["celery"],
    help="Queues to listen to for this worker",
)
def worker(name, concurrency, debug, queue):
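    """Start a Celery worker: build the Celery CLI arguments from the command-line
    options and the config (queues, optional beat scheduler) and hand them to the
    Celery app's worker_main."""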
    setup_worker()
    args = [
        "worker",
        "-n",
        name,
        "-c",
        concurrency,
        "-l",
        ("debug" if debug else "info"),
    ]
    if get_config("setup", "celery_queues_enabled", default=True):
        actual_queues = _get_queues_param_from_queue_input(queue)
        args += [
            "-Q",
            actual_queues,
        ]
    if get_config("setup", "celery_beat_enabled", default=True):
        args += ["-B", "-s", "/home/codecov/celerybeat-schedule"]
    return app.celery_app.worker_main(argv=args)


def _get_queues_param_from_queue_input(queues: typing.List[str]) -> str:
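    """Return the comma-separated queue list passed to Celery's -Q option.

    Illustrative example (the health check queue name is whatever
    BaseCeleryConfig.health_check_default_queue resolves to):
        _get_queues_param_from_queue_input(["uploads", "notify"])
        -> "uploads,notify,enterprise_uploads,enterprise_notify,<health check queue>"
    """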
    # We always listen to the health_check queue to make sure the healthcheck is
    # performed, and also to avoid that queue filling up with no workers to consume
    # from it. Comma-separated values in a single --queue option are supported as
    # well, since everything is split and joined again in the end.
    joined_queues = ",".join(queues)
    enterprise_queues = ["enterprise_" + q for q in joined_queues.split(",")]
    return ",".join(
        [joined_queues, *enterprise_queues, BaseCeleryConfig.health_check_default_queue]
    )


def main():
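    """Console entrypoint: dispatch to the Click command group."""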
    cli(obj={})


if __name__ == "__main__":
    main()
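
# Example invocation (the queue names here are purely illustrative):
#   python main.py worker --concurrency 4 --queue uploads --queue notify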