Skip to content

Commit 35c09b9

Browse files
committed
[core] Modify logs messages to be structured
Some messages were updated to follow the structured format. Signed-off-by: Santiago Dueñas <[email protected]>
1 parent 2ed0411 commit 35c09b9

File tree

6 files changed

+105
-40
lines changed

6 files changed

+105
-40
lines changed

src/grimoirelab/core/consumers/consumer.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
from multiprocessing import Event as ProcessEvent
2828

2929
import redis
30+
import structlog
3031

3132
if typing.TYPE_CHECKING:
3233
from typing import Iterable
@@ -170,7 +171,7 @@ def fetch_new_entries(self) -> Iterable[Entry]:
170171
def recover_stream_entries(self, recover_idle_time: int = RECOVER_IDLE_TIME) -> Iterable[Entry]:
171172
"""Transfers ownership of pending entries idle for 'recover_idle_time'."""
172173

173-
self.logger.debug(f"Recovering events from '{self.stream_name}:{self.consumer_group}'")
174+
self.logger.debug("Recovering events", stream=self.stream_name, consumer_group=self.consumer_group)
174175

175176
while True:
176177
response = self.connection.xautoclaim(
@@ -199,6 +200,8 @@ def recover_stream_entries(self, recover_idle_time: int = RECOVER_IDLE_TIME) ->
199200
if self._stop_event.is_set():
200201
break
201202

203+
self.logger.info("events recovered", stream=self.stream_name, consumer_group=self.consumer_group)
204+
202205
def process_entries(self, entries: Iterable[Entry], recovery: bool = False):
203206
"""Process entries (implement this method in subclasses).
204207
@@ -222,12 +225,11 @@ def ack_entries(self, message_ids: list):
222225
def stop(self):
223226
"""Stop the consumer gracefully."""
224227

225-
self.logger.info(f"Stopping consumer '{self.consumer_name}'.")
226-
227228
self._stop_event.set()
229+
self.logger.info("consumer stopped", consumer=self.consumer_name)
228230

229231
def _create_logger(self):
230-
logger = logging.getLogger(name=f"{self.__class__.__name__}")
232+
logger = structlog.get_logger(self.__class__.__name__)
231233
logger.setLevel(self.logging_level)
232234
return logger
233235

src/grimoirelab/core/consumers/consumer_pool.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,17 @@
2727

2828
import redis
2929
from rq.connections import parse_connection
30+
import structlog
3031

3132
from .consumer import Consumer
3233

3334

3435
ConsumerData = namedtuple("ConsumerData", ["name", "pid", "process"])
3536

3637

38+
logger = structlog.get_logger(__name__)
39+
40+
3741
class ConsumerPool:
3842
"""Base class to create a pool of consumers.
3943
@@ -198,7 +202,7 @@ def cleanup_consumers(self):
198202
self._consumers.pop(name)
199203

200204
def _create_logger(self):
201-
logger = logging.getLogger(self.__class__.__name__)
205+
logger = structlog.get_logger(self.__class__.__name__)
202206
logger.setLevel(self.log_level)
203207
return logger
204208

@@ -266,4 +270,4 @@ def _run_consumer(
266270
consumer = consumer_class(connection=connection, *args, **kwargs)
267271
consumer.start(burst=burst)
268272
except Exception as e:
269-
logging.error(f"Consumer {consumer_class.__name__} failed: {e}")
273+
logger.error(f"Consumer {consumer_class.__name__} failed: {e}")

src/grimoirelab/core/scheduler/jobs.py

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,15 @@
2222
import typing
2323

2424
import rq.job
25+
import structlog
2526

2627
if typing.TYPE_CHECKING:
2728
from typing import Any
2829
from logging import LogRecord
2930
from rq.types import FunctionReferenceType
3031

3132

32-
logger = logging.getLogger(__name__)
33+
logger = structlog.get_logger(__name__)
3334

3435

3536
class GrimoireLabJob(rq.job.Job):
@@ -129,9 +130,13 @@ def _execute(self) -> Any:
129130
try:
130131
self._add_log_handler()
131132
return super()._execute()
132-
except Exception:
133-
logger.exception(f"Error running job {self.id}.")
134-
raise
133+
except Exception as ex:
134+
logger.error(
135+
"job exception",
136+
job_id=self.id,
137+
exc_info=ex
138+
)
139+
raise ex
135140
finally:
136141
self._remove_log_handler()
137142

src/grimoirelab/core/scheduler/scheduler.py

Lines changed: 79 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,14 @@
1919
from __future__ import annotations
2020

2121
import datetime
22-
import logging
2322
import typing
2423
import uuid
2524

2625
import django.db
2726
import django_rq
2827
import rq.exceptions
2928
import rq.job
29+
import structlog
3030

3131
from django.conf import settings
3232
from rq.registry import StartedJobRegistry
@@ -59,7 +59,8 @@
5959
rq.job.JobStatus.CANCELED,
6060
]
6161

62-
logger = logging.getLogger(__name__)
62+
63+
logger = structlog.get_logger(__name__)
6364

6465

6566
def schedule_task(
@@ -88,6 +89,15 @@ def schedule_task(
8889
)
8990
_enqueue_task(task, scheduled_at=datetime_utcnow())
9091

92+
logger.info(
93+
"task scheduled",
94+
task_uuid=task.uuid,
95+
task_args=task_args,
96+
job_interval=job_interval,
97+
job_max_retries=job_max_retries,
98+
burst=burst
99+
)
100+
91101
return task
92102

93103

@@ -122,6 +132,10 @@ def cancel_task(task_uuid: str) -> None:
122132
task.status = SchedulerStatus.CANCELED
123133
task.save()
124134

135+
logger.info(
136+
"task canceled", task_uuid=task.uuid
137+
)
138+
125139

126140
def reschedule_task(task_uuid: str) -> None:
127141
"""Reschedule a task
@@ -147,7 +161,6 @@ def reschedule_task(task_uuid: str) -> None:
147161
except (rq.exceptions.NoSuchJobError, rq.exceptions.InvalidJobOperation):
148162
pass
149163
_schedule_job(task, job, datetime_utcnow(), job.job_args)
150-
151164
elif task.status == SchedulerStatus.RUNNING:
152165
# Make sure it is running
153166
job = task.jobs.order_by("-scheduled_at").first()
@@ -157,6 +170,10 @@ def reschedule_task(task_uuid: str) -> None:
157170
else:
158171
_enqueue_task(task)
159172

173+
logger.info(
174+
"task rescheduled", task_uuid=task.uuid
175+
)
176+
160177

161178
def maintain_tasks() -> None:
162179
"""Maintain the tasks that are scheduled to be executed.
@@ -178,14 +195,21 @@ def maintain_tasks() -> None:
178195
if not _is_job_removed_or_stopped(job_db, task.default_job_queue):
179196
continue
180197

181-
logger.info(f"Job #{job_db.job_id} in queue (task: {task.task_id}) stopped. Rescheduling.")
198+
logger.info(
199+
"Job stopped but task wasn't updated; rescheduling",
200+
task_uuid=task.uuid,
201+
job_uuid=job_db.uuid,
202+
queue=job_db.queue
203+
)
182204

183205
current_time = datetime_utcnow()
184206
scheduled_at = max(task.scheduled_at, current_time)
185207

186208
job_db.save_run(SchedulerStatus.CANCELED)
187209
_enqueue_task(task, scheduled_at=scheduled_at)
188210

211+
logger.debug("Maintenance of tasks completed")
212+
189213

190214
def _is_job_removed_or_stopped(job: Job, queue: str) -> bool:
191215
"""
@@ -246,10 +270,6 @@ def _enqueue_task(task: Task, scheduled_at: datetime.datetime | None = None) ->
246270

247271
_schedule_job(task, job, scheduled_at, job_args)
248272

249-
logger.info(
250-
f"Job #{job.job_id} (task: {task.task_id}) enqueued in '{job.queue}' at {scheduled_at}"
251-
)
252-
253273
return job
254274

255275

@@ -277,15 +297,29 @@ def _schedule_job(
277297
task.status = SchedulerStatus.ENQUEUED
278298
job.scheduled_at = scheduled_at
279299
task.scheduled_at = scheduled_at
280-
except Exception as e:
281-
logger.error(f"Error enqueuing job of task {task.task_id}. Not scheduled. Error: {e}")
300+
except Exception as ex:
282301
job.status = SchedulerStatus.FAILED
283302
task.status = SchedulerStatus.FAILED
284-
raise e
303+
logger.error(
304+
"job scheduling",
305+
job_uuid=job.uuid,
306+
task_uuid=task.uuid,
307+
exc_info=ex
308+
)
309+
raise ex
285310
finally:
286311
job.save()
287312
task.save()
288313

314+
logger.info(
315+
"job scheduled",
316+
job_uuid=job.uuid,
317+
job_args=job_args,
318+
task_uuid=task.uuid,
319+
queue=job.queue,
320+
scheduled_at=scheduled_at
321+
)
322+
289323
return rq_job
290324

291325

@@ -304,17 +338,25 @@ def _on_success_callback(
304338
try:
305339
job_db = find_job(job.id)
306340
except NotFoundError:
307-
logger.error("Job not found. Not rescheduling.")
341+
logger.error("job not found", job_uuid=job.uuid)
308342
return
309343

310-
job_db.save_run(SchedulerStatus.COMPLETED, progress=result, logs=job.meta.get("log", None))
344+
job_db.save_run(
345+
SchedulerStatus.COMPLETED,
346+
progress=result,
347+
logs=job.meta.get('log', None)
348+
)
311349
task = job_db.task
312350

313-
logger.info(f"Job #{job_db.job_id} (task: {task.task_id}) completed.")
351+
logger.info(
352+
"job completed",
353+
job_uuid=job_db.uuid,
354+
task_uuid=task.uuid
355+
)
314356

315357
# Reschedule task
316358
if task.burst:
317-
logger.info(f"Task: {task.task_id} finished. It was a burst task. It won't be rescheduled.")
359+
logger.info("task completed", task_uuid=task.uuid, burst=True)
318360
return
319361
else:
320362
scheduled_at = datetime_utcnow() + datetime.timedelta(seconds=task.job_interval)
@@ -342,27 +384,39 @@ def _on_failure_callback(
342384
try:
343385
job_db = find_job(job.id)
344386
except NotFoundError:
345-
logger.error("Job not found. Not rescheduling.")
387+
logger.error("job not found", job_uuid=job.uuid)
346388
return
347389

348390
job_db.save_run(
349-
SchedulerStatus.FAILED, progress=job.meta["progress"], logs=job.meta.get("log", None)
391+
SchedulerStatus.FAILED,
392+
progress=job.meta['progress'],
393+
logs=job.meta.get('log', None)
350394
)
351395
task = job_db.task
352396

353-
logger.error(f"Job #{job_db.job_id} (task: {task.task_id}) failed; error: {value}")
397+
logger.info(
398+
"job failed",
399+
job_uuid=job_db.uuid,
400+
task_uuid=task.uuid,
401+
error=value
402+
)
403+
404+
# Define new log to reuse parameters
405+
log = logger.bind(task_uuid=task.uuid, job_uuid=job_db.uuid)
354406

355-
# Try to retry the task
407+
# Retry the task, if possible
356408
if task.failures >= task.job_max_retries:
357-
logger.error(f"Task: {task.task_id} max retries reached; cancelled")
409+
log.info(
410+
"task failed; max retries reached",
411+
max_failures=task.failures
412+
)
358413
return
359414
elif not task.can_be_retried():
360-
logger.error(f"Task: {task.task_id} can't be retried")
415+
log.info("task failed; can't be retried")
361416
return
362417
else:
363-
logger.error(f"Task: {task.task_id} failed but task will be retried")
364418
task.status = SchedulerStatus.RECOVERY
365419
task.save()
366-
367-
scheduled_at = datetime_utcnow() + datetime.timedelta(seconds=task.job_interval)
368-
_enqueue_task(task, scheduled_at=scheduled_at)
420+
scheduled_at = datetime_utcnow() + datetime.timedelta(seconds=task.job_interval)
421+
_enqueue_task(task, scheduled_at=scheduled_at)
422+
log.info("task failed; recovered")

src/grimoirelab/core/scheduler/tasks/chronicler.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,11 @@
1818

1919
from __future__ import annotations
2020

21-
import logging
2221
import typing
2322

2423
import cloudevents.conversion
25-
import rq.job
24+
import rq
25+
import structlog
2626

2727
import perceval.backend
2828
import perceval.backends
@@ -37,7 +37,7 @@
3737
from datetime import datetime
3838

3939

40-
logger = logging.getLogger("chronicler")
40+
logger = structlog.get_logger(__name__)
4141

4242

4343
def chronicler_job(

src/grimoirelab/core/scheduler/worker.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,11 @@
1818

1919
from __future__ import annotations
2020

21-
import logging
2221
import typing
2322

2423
import django.db.transaction
2524
import rq.worker
25+
import structlog
2626

2727
from grimoirelab_toolkit.datetime import datetime_utcnow
2828

@@ -33,7 +33,7 @@
3333
from .jobs import GrimoireLabJob
3434

3535

36-
logger = logging.getLogger(__name__)
36+
logger = structlog.get_logger(__name__)
3737

3838

3939
class GrimoireLabWorker(rq.worker.Worker):

0 commit comments

Comments
 (0)