
Commit cfee9d6

Merge branch 'check-services' of 'https://github.com/jjmerchante/grimoirelab-core'
Merges #38 Closes #38 Fixes chaoss/grimoirelab#740
2 parents 5a838bc + fc680d8 · commit cfee9d6


2 files changed: +108 -26 lines changed

src/grimoirelab/core/runner/commands/run.py

Lines changed: 95 additions & 15 deletions
@@ -19,6 +19,7 @@
 from __future__ import annotations
 
 import logging
+import multiprocessing
 import os
 import time
 import typing
@@ -32,6 +33,7 @@
 import redis
 
 from django.conf import settings
+from django.db import connections, OperationalError
 from urllib3.util import create_urllib3_context
 
 if typing.TYPE_CHECKING:
@@ -57,9 +59,15 @@ def run(ctx: Context):
     default=False,
     help="Run the service in developer mode.",
 )
+@click.option(
+    "--maintenance-interval",
+    default=300,
+    show_default=True,
+    help="Interval in seconds to run maintenance tasks.",
+)
 @run.command()
 @click.pass_context
-def server(ctx: Context, devel: bool):
+def server(ctx: Context, devel: bool, maintenance_interval: int):
     """Start the GrimoireLab core server.
 
     GrimoireLab server allows to schedule tasks and fetch data from
@@ -69,7 +77,14 @@ def server(ctx: Context, devel: bool):
     By default, the server runs a WSGI app because in production it
     should be run with a reverse proxy. If you activate the '--dev' flag,
     a HTTP server will be run instead.
+
+    The server also runs maintenance tasks in the background every
+    defined interval (default is 60 seconds). These tasks include
+    rescheduling failed tasks and cleaning old jobs.
     """
+    _wait_database_ready()
+    _wait_redis_ready()
+
     env = os.environ
 
     env["UWSGI_ENV"] = f"DJANGO_SETTINGS_MODULE={ctx.obj['cfg']}"
@@ -94,17 +109,49 @@ def server(ctx: Context, devel: bool):
     env["UWSGI_LAZY_APPS"] = "true"
     env["UWSGI_SINGLE_INTERPRETER"] = "true"
 
-    # Run maintenance tasks
-    from grimoirelab.core.scheduler.scheduler import maintain_tasks
-
-    _ = django.core.wsgi.get_wsgi_application()
-    maintain_tasks()
+    # Run maintenance tasks in the background
+    _maintenance_process(maintenance_interval)
 
     # Run the server
     os.execvp("uwsgi", ("uwsgi",))
 
 
-def worker_options(workers=5, verbose=False, burst=False):
+def periodic_maintain_tasks(interval):
+    from grimoirelab.core.scheduler.scheduler import maintain_tasks
+
+    while True:
+        try:
+            maintain_tasks()
+            logging.info("Maintenance tasks executed successfully.")
+        except redis.exceptions.ConnectionError as e:
+            logging.error(f"Redis connection error during maintenance tasks: {e}")
+        except django.db.utils.OperationalError as e:
+            logging.error(f"Database connection error during maintenance tasks: {e}")
+            connections.close_all()
+        except Exception as e:
+            logging.error("Error during maintenance tasks: %s", e)
+            raise
+        except KeyboardInterrupt:
+            logging.info("Maintenance task interrupted. Exiting...")
+            return
+
+        time.sleep(interval)
+
+
+def _maintenance_process(maintenance_interval):
+    """Process to run maintenance tasks periodically."""
+
+    process = multiprocessing.Process(
+        target=periodic_maintain_tasks, args=(maintenance_interval,), daemon=True
+    )
+    process.start()
+
+    logging.info("Started maintenance process with PID %s", process.pid)
+
+    return process
+
+
+def worker_options(workers: int = 5, verbose: bool = False, burst: bool = False):
     """Decorator to add common worker options to commands."""
 
     def decorator(f):
@@ -146,6 +193,9 @@ def eventizers(workers: int, verbose: bool, burst: bool):
     Workers get jobs from the GRIMOIRELAB_Q_EVENTIZER_JOBS queue defined
     in the configuration file.
     """
+    _wait_redis_ready()
+    _wait_database_ready()
+
     django.core.management.call_command(
         "rqworker-pool",
         settings.GRIMOIRELAB_Q_EVENTIZER_JOBS,
@@ -162,7 +212,9 @@ def _sleep_backoff(attempt: int) -> None:
     time.sleep(backoff)
 
 
-def _wait_opensearch_ready(url, username, password, index, verify_certs) -> None:
+def _wait_opensearch_ready(
+    url: str, username: str | None, password: str | None, index: str, verify_certs: bool
+) -> None:
     """Wait for OpenSearch to be available before starting"""
 
     os_logger = logging.getLogger("opensearch")
@@ -175,10 +227,7 @@ def _wait_opensearch_ready(url, username, password, index, verify_certs) -> None
     context.load_default_certs()
     context.load_verify_locations(certifi.where())
 
-    if username and password:
-        auth = (username, password)
-    else:
-        auth = None
+    auth = (username, password) if username and password else None
 
     for attempt in range(DEFAULT_MAX_RETRIES):
        try:
@@ -196,11 +245,14 @@ def _wait_opensearch_ready(url, username, password, index, verify_certs) -> None
            # Index still not created, but OpenSearch is up
            break
        except opensearchpy.exceptions.AuthorizationException:
-            logging.error("Authorization failed. Check your credentials.")
+            logging.error("OpenSearch Authorization failed. Check your credentials.")
            exit(1)
-        except (opensearchpy.exceptions.ConnectionError, opensearchpy.exceptions.TransportError):
+        except (
+            opensearchpy.exceptions.ConnectionError,
+            opensearchpy.exceptions.TransportError,
+        ) as e:
            logging.warning(
-                f"[{attempt + 1}/{DEFAULT_MAX_RETRIES}] OpenSearch connection not ready."
+                f"[{attempt + 1}/{DEFAULT_MAX_RETRIES}] OpenSearch connection not ready: {e}"
            )
            _sleep_backoff(attempt)
 
@@ -233,6 +285,33 @@ def _wait_redis_ready():
     logging.info("Redis is ready")
 
 
+def _wait_database_ready():
+    """Wait for the database to be available before starting."""
+
+    for attempt in range(DEFAULT_MAX_RETRIES):
+        try:
+            db_conn = connections["default"]
+            if db_conn:
+                with db_conn.cursor():
+                    pass  # Just test the connection
+            break
+
+        except OperationalError as e:
+            logging.warning(
+                f"[{attempt + 1}/{DEFAULT_MAX_RETRIES}] Database connection not ready: {e.__cause__}"
+            )
+            _sleep_backoff(attempt)
+    else:
+        error_msg = "Failed to connect to the database after all retries"
+        logging.error(error_msg)
+        raise ConnectionError(error_msg)
+
+    logging.info("Database is ready.")
+
+    # Close all database connections to avoid timed out connections
+    connections.close_all()
+
+
 @run.command()
 @worker_options(workers=20)
 def archivists(workers: int, verbose: bool, burst: bool):
@@ -295,6 +374,7 @@ def ushers(workers: int, verbose: bool, burst: bool):
     """
     from grimoirelab.core.consumers.identities import SortingHatConsumerPool
 
+    _wait_database_ready()
     _wait_redis_ready()
 
     pool = SortingHatConsumerPool(
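
Editor's note: the `_wait_*_ready()` helpers added above all follow the same retry-with-backoff idea: probe the service, and if it is not reachable, sleep for a growing interval (via `_sleep_backoff`) before trying again, giving up after `DEFAULT_MAX_RETRIES` attempts. Below is a minimal, self-contained sketch of that pattern; the `wait_until_ready` and `ping` names, the retry count, and the backoff formula are illustrative assumptions, not the module's actual values.

import logging
import time

MAX_RETRIES = 10  # illustrative; the module defines its own DEFAULT_MAX_RETRIES


def wait_until_ready(ping, max_retries=MAX_RETRIES):
    """Retry ping() with backoff until it succeeds or retries run out.

    ping is any callable that raises an exception while the service is
    unavailable (e.g. opening a DB cursor, or sending PING to Redis).
    """
    for attempt in range(max_retries):
        try:
            ping()
            logging.info("Service is ready")
            return
        except Exception as exc:
            backoff = min(2 ** attempt, 30)  # assumed cap, not the module's formula
            logging.warning(
                "[%d/%d] service not ready: %s; retrying in %ds",
                attempt + 1, max_retries, exc, backoff,
            )
            time.sleep(backoff)
    raise ConnectionError("Service did not become available after all retries")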

src/grimoirelab/core/scheduler/scheduler.py

Lines changed: 13 additions & 11 deletions
@@ -164,22 +164,21 @@ def maintain_tasks() -> None:
     This function will check the status of the tasks and jobs
     that are scheduled, rescheduling them if necessary.
     """
-    tasks = find_tasks_by_status(
-        [
-            SchedulerStatus.RUNNING,
-            SchedulerStatus.RECOVERY,
-            SchedulerStatus.ENQUEUED,
-            SchedulerStatus.NEW,
-        ]
-    )
+    active_status = [
+        SchedulerStatus.RUNNING,
+        SchedulerStatus.RECOVERY,
+        SchedulerStatus.ENQUEUED,
+        SchedulerStatus.NEW,
+    ]
+    tasks = find_tasks_by_status(active_status)
 
     for task in tasks:
-        job_db = task.jobs.order_by("-scheduled_at").first()
+        job_db = task.jobs.filter(status__in=active_status).order_by("-scheduled_at").first()
 
         if not _is_job_removed_or_stopped(job_db, task.default_job_queue):
             continue
 
-        logger.debug(f"Job #{job_db.job_id} in queue (task: {task.task_id}) stopped. Rescheduling.")
+        logger.info(f"Job #{job_db.job_id} in queue (task: {task.task_id}) stopped. Rescheduling.")
 
         current_time = datetime_utcnow()
         scheduled_at = max(task.scheduled_at, current_time)
@@ -203,7 +202,10 @@ def _is_job_removed_or_stopped(job: Job, queue: str) -> bool:
         # Sometimes, the worker may be forcibly stopped, leaving the job
         # in the STARTED status. We need to check if the job has expired
         # due to a missing heartbeat.
-        expiration_date = StartedJobRegistry(queue, connection).get_expiration_time(job_rq)
+        try:
+            expiration_date = StartedJobRegistry(queue, connection).get_expiration_time(job_rq)
+        except TypeError:
+            return True
         return expiration_date < datetime.datetime.now()
     else:
         return status in RQ_JOB_STOPPED_STATUS
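
Editor's note on the new `except TypeError` guard in `_is_job_removed_or_stopped()`: in the RQ versions we have seen, `StartedJobRegistry.get_expiration_time()` derives the expiry from the job's score in a Redis sorted set, and that score is `None` once the job has dropped out of the started registry, so the datetime conversion raises `TypeError`. Catching it and returning `True` treats such a job as stopped, which lets `maintain_tasks()` reschedule it. The snippet below only illustrates that failure mode under the stated assumption about RQ's internals; it is not code from this commit.

from datetime import datetime

score = None  # what ZSCORE returns for a member that is no longer in the registry
try:
    expiration = datetime.utcfromtimestamp(score)
except TypeError:
    # No heartbeat entry left for the job: treat it as removed/stopped
    print("job no longer tracked by StartedJobRegistry; reschedule it")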
