-
Notifications
You must be signed in to change notification settings - Fork 5
Expand file tree
/
Copy pathrunner.py
More file actions
147 lines (125 loc) · 5.17 KB
/
runner.py
File metadata and controls
147 lines (125 loc) · 5.17 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
import sys
import time
import signal
import subprocess
from typing import Dict, Tuple, Callable, Optional
from .types import Logger, QueueName, WorkerNumber
from .utils import get_backend, set_process_title
from .exposition import metrics_http_server
from .app_settings import app_settings
from .machine_types import Machine
from .cron_scheduler import (
CronScheduler,
get_cron_config,
ensure_queue_workers_for_config,
)
def runner(
    touch_filename_fn: Callable[[QueueName], Optional[str]],
    machine: Machine,
    logger: Logger,
) -> None:
    """
    Run the master process.

    Starts one worker subprocess for each `(queue, worker number)` pair in
    `machine.worker_names`, restarts any worker which has exited, and, once
    SIGTERM has been received, sends SIGUSR2 to every worker and waits for
    them all to exit before returning.

    Args:
        touch_filename_fn: called with a queue name each time a worker for
            that queue is (re)started; a non-``None`` result is passed to the
            worker via ``--touch-file``.
        machine: provides `worker_names` plus the `configure_cron` and
            `run_cron` flags which control the cron setup done here.
        logger: receives debug/info messages about worker lifecycle.

    Note: this installs a SIGTERM handler for the current process and only
    returns after that signal has been received.
    """
    set_process_title("Master process")

    if machine.configure_cron:
        # Load the cron scheduling configuration and setup the worker numbers for it,
        # even if we're not running cronjobs, as it changes the queue count.
        cron_config = get_cron_config()
        ensure_queue_workers_for_config(cron_config)

    running = True

    # Some backends may require on-startup logic per-queue, initialise a dummy
    # backend per queue to do so. Note: we need to do this after any potential
    # calls to `ensure_queue_workers_for_config` so that all the workers
    # (including the implicit cron ones) have been configured.
    queues_to_startup = {queue for queue, _ in machine.worker_names}
    for queue in queues_to_startup:
        logger.debug("Running startup for queue {}".format(queue))
        backend = get_backend(queue)
        backend.startup(queue)

    # Note: we deliberately configure our handling of SIGTERM _after_ the
    # startup processes have happened; this ensures that the startup processes
    # (which could take a long time) are naturally interrupted by the signal.
    def handle_term(signum: int, stack: object) -> None:
        nonlocal running
        logger.debug("Caught TERM signal")
        set_process_title("Master process exiting")
        running = False

    signal.signal(signal.SIGTERM, handle_term)

    if machine.run_cron:
        # Load the cron scheduling configuration explicitly, to account for the
        # case where we want to run the cron but not configure it. This can
        # happen if our caller has already done the configuration.
        cron_config = get_cron_config()
        cron_scheduler = CronScheduler(cron_config)
        cron_scheduler.start()

    # Maps each (queue, worker number) to its process (None until first
    # started) and its display name.
    workers = {
        x: (None, "{}/{}".format(*x))
        for x in machine.worker_names
    }  # type: Dict[Tuple[QueueName, WorkerNumber], Tuple[Optional[subprocess.Popen[bytes]], str]]

    if app_settings.ENABLE_PROMETHEUS:
        metrics_server = metrics_http_server(machine.worker_names)
        metrics_server.start()

    while running:
        for index, (queue, worker_num) in enumerate(machine.worker_names, start=1):
            # SIGTERM can arrive part way through a pass over the workers.
            # Once we've been asked to exit, don't start (or restart) any
            # more of them: they would only be told to shut down again
            # straight away.
            if not running:
                break

            worker, worker_name = workers[(queue, worker_num)]

            # Ensure that all workers are now running (idempotent)
            if worker is None or worker.poll() is not None:
                if worker is None:
                    logger.info(
                        "Starting worker #{} for {} ({})".format(
                            worker_num,
                            queue,
                            worker_name,
                        ),
                        extra={
                            'worker': worker_num,
                            'queue': queue,
                        },
                    )
                else:
                    logger.info(
                        "Starting missing worker {} (exit code was: {})".format(
                            worker_name,
                            worker.returncode,
                        ),
                        extra={
                            'worker': worker_num,
                            'queue': queue,
                            'exit_code': worker.returncode,
                        },
                    )

                args = [
                    sys.executable,
                    # manage.py
                    sys.argv[0],
                    'queue_worker',
                    queue,
                    str(worker_num),
                    '--prometheus-port',
                    # `index` starts at 1, so every worker gets a distinct
                    # port strictly above PROMETHEUS_START_PORT.
                    str(app_settings.PROMETHEUS_START_PORT + index),
                ]

                touch_filename = touch_filename_fn(queue)
                if touch_filename is not None:
                    args.extend([
                        '--touch-file',
                        touch_filename,
                    ])

                worker = subprocess.Popen(args)
                workers[(queue, worker_num)] = (worker, worker_name)

        time.sleep(1)

    def signal_workers(signum: int) -> None:
        for worker, _ in workers.values():
            if worker is None:
                continue

            try:
                worker.send_signal(signum)
            except OSError:
                # Best effort: failing to signal one worker must not stop us
                # from signalling the rest.
                pass

    # SIGUSR2 all the workers. This sets a flag asking them to shut down
    # gracefully, or kills them immediately if they are receptive to that
    # sort of abuse.
    signal_workers(signal.SIGUSR2)

    for worker, worker_name in workers.values():
        if worker is None:
            continue

        logger.info("Waiting for {} to terminate".format(worker_name))
        worker.wait()

    logger.info("All processes finished")