From ef3246af0cb0f10c8e30c042c5081c86b91ef9cf Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Santiago=20Due=C3=B1as?=
Date: Thu, 24 Apr 2025 18:00:19 +0200
Subject: [PATCH 1/2] [config] Add structured logs
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Organizing log data in a structured format makes it easier to read
and analyze. With this commit, log messages will always have a
structured format. The default mode prints the messages to the
console in a plain format, but it can also be configured to print
them in JSON format.

We have added structured logs to GrimoireLab using the 'structlog'
package.

By default, tests will run silently. If developers want log messages
printed to the console, they can set the environment variable
'GRIMOIRELAB_TESTING_VERBOSE'.

Signed-off-by: Santiago Dueñas
---
 README.md                                  |   7 +-
 config/settings/testing.py                 |  39 +++--
 poetry.lock                                |  66 ++++++++-
 pyproject.toml                             |   2 +
 src/grimoirelab/core/config/logging.py     | 157 +++++++++++++++++++++
 src/grimoirelab/core/config/settings.py    |  47 ++----
 tests/unit/consumers/test_consumer_pool.py |   9 +-
 7 files changed, 280 insertions(+), 47 deletions(-)
 create mode 100644 src/grimoirelab/core/config/logging.py

diff --git a/README.md b/README.md
index c596293..fc20829 100644
--- a/README.md
+++ b/README.md
@@ -80,7 +80,7 @@ $ pip install -e .
 ```
 
 #### Poetry
-We use [poetry](https://python-poetry.org/) for dependency management and 
+We use [poetry](https://python-poetry.org/) for dependency management and
 packaging. You can install it following its [documentation](https://python-poetry.org/docs/#installation).
 Once you have installed it, you can install grimoirelab-core and the dependencies
 in a project isolated environment using:
@@ -134,7 +134,7 @@ Commands:
 
 ## Configuration
 
-The first step is to run a Redis server, a MySQL database and an OpenSearch 
+The first step is to run a Redis server, a MySQL database and an OpenSearch
 container that will be used for communicating components and storing results.
 Please refer to their documentation to know how to install and run them.
 
@@ -202,3 +202,6 @@ Valkey, and OpenSearch).
 ```
 (.venv)$ pytest
 ```
+
+Set the environment variable `GRIMOIRELAB_TESTING_VERBOSE` to activate the
+verbose mode.
diff --git a/config/settings/testing.py b/config/settings/testing.py
index 3f8eb81..ed44d39 100644
--- a/config/settings/testing.py
+++ b/config/settings/testing.py
@@ -1,20 +1,41 @@
-from grimoirelab.core.config.settings import *  # noqa: F403,F401
-from grimoirelab.core.config.settings import INSTALLED_APPS, _RQ_DATABASE, RQ
-
+import os
 import warnings
 
+from grimoirelab.core.config.settings import *  # noqa: F403,F401
+from grimoirelab.core.config.settings import (
+    INSTALLED_APPS,
+    _RQ_DATABASE,
+    RQ,
+    LOGGING,
+)
+
 import django_rq.queues
 from fakeredis import FakeRedis, FakeStrictRedis
 
+
 INSTALLED_APPS.append("tests")
 
-LOGGING = {
-    "version": 1,
-    "disable_existing_loggers": True,
-    "loggers": {
-        "grimoirelab.core": {"level": "CRITICAL"},
-    },
+GRIMOIRELAB_TESTING_VERBOSE = os.environ.get(
+    "GRIMOIRELAB_TESTING_VERBOSE",
+    "False",
+).lower() in ("true", "1")
+
+# Logging configuration for testing
+#
+# By default, logging is silent and doesn't print messages to the console.
+# Set 'GRIMOIRELAB_TESTING_VERBOSE' to 'True' to print messages to the console.
+# +LOGGING["handlers"]["testing"] = { + "class": "logging.NullHandler", +} + +LOGGING["loggers"] = { + "": { + "handlers": ["default"] if GRIMOIRELAB_TESTING_VERBOSE else ["testing"], + "level": "DEBUG", + "propagate": True, + } } SQL_MODE = [ diff --git a/poetry.lock b/poetry.lock index 4ef31a7..2abb94c 100644 --- a/poetry.lock +++ b/poetry.lock @@ -650,6 +650,21 @@ pyjwt = ">=2,<3" doc = ["sphinx"] test = ["black", "codecov", "cryptography", "flake8", "isort", "pytest", "pytest-cov", "pytest-django"] +[[package]] +name = "django-ipware" +version = "7.0.1" +description = "A Django application to retrieve user's IP address" +optional = false +python-versions = ">=3.8" +groups = ["main"] +files = [ + {file = "django-ipware-7.0.1.tar.gz", hash = "sha256:d9ec43d2bf7cdf216fed8d494a084deb5761a54860a53b2e74346a4f384cff47"}, + {file = "django_ipware-7.0.1-py2.py3-none-any.whl", hash = "sha256:db16bbee920f661ae7f678e4270460c85850f03c6761a4eaeb489bdc91f64709"}, +] + +[package.dependencies] +python-ipware = ">=2.0.3" + [[package]] name = "django-rq" version = "3.1" @@ -696,6 +711,28 @@ libcloud = ["apache-libcloud"] s3 = ["boto3 (>=1.4.4)"] sftp = ["paramiko (>=1.15)"] +[[package]] +name = "django-structlog" +version = "9.1.1" +description = "Structured Logging for Django" +optional = false +python-versions = ">=3.9" +groups = ["main"] +files = [ + {file = "django_structlog-9.1.1-py3-none-any.whl", hash = "sha256:5b6ac3abdf6549e94ccb35160b1f10266f1627c3ac77844571235a08a1ddae66"}, + {file = "django_structlog-9.1.1.tar.gz", hash = "sha256:14342c6c824581f1e063c88a8bc52314cd67995a3bd4a4fc8c27ea37ccd78947"}, +] + +[package.dependencies] +asgiref = ">=3.6.0" +django = ">=4.2" +django-ipware = ">=6.0.2" +structlog = ">=21.4.0" + +[package.extras] +celery = ["celery (>=5.1)"] +commands = ["django-extensions (>=1.4.9)"] + [[package]] name = "django-treebeard" version = "4.7.1" @@ -1964,6 +2001,21 @@ files = [ [package.extras] cli = ["click (>=5.0)"] +[[package]] +name = "python-ipware" +version = "3.0.0" +description = "A Python package to retrieve user's IP address" +optional = false +python-versions = ">=3.7" +groups = ["main"] +files = [ + {file = "python_ipware-3.0.0-py3-none-any.whl", hash = "sha256:fc936e6e7ec9fcc107f9315df40658f468ac72f739482a707181742882e36b60"}, + {file = "python_ipware-3.0.0.tar.gz", hash = "sha256:9117b1c4dddcb5d5ca49e6a9617de2fc66aec2ef35394563ac4eecabdf58c062"}, +] + +[package.extras] +dev = ["coverage[toml]", "coveralls (>=3.3,<4.0)", "ruff", "twine"] + [[package]] name = "pytz" version = "2025.2" @@ -2490,6 +2542,18 @@ files = [ dev = ["build", "hatch"] doc = ["sphinx"] +[[package]] +name = "structlog" +version = "25.4.0" +description = "Structured Logging for Python" +optional = false +python-versions = ">=3.8" +groups = ["main"] +files = [ + {file = "structlog-25.4.0-py3-none-any.whl", hash = "sha256:fe809ff5c27e557d14e613f45ca441aabda051d119ee5a0102aaba6ce40eed2c"}, + {file = "structlog-25.4.0.tar.gz", hash = "sha256:186cd1b0a8ae762e29417095664adf1d6a31702160a46dacb7796ea82f7409e4"}, +] + [[package]] name = "testcontainers" version = "4.13.2" @@ -2737,4 +2801,4 @@ files = [ [metadata] lock-version = "2.1" python-versions = "^3.11" -content-hash = "facf8e2303129b9eb077590e3098c77685e4fbb6a37e7596559dd3f21ba92b79" +content-hash = "c9f497741d2be093082ec76aee5e4bb3ec6d9b7fcbfe755459887944d8995aeb" diff --git a/pyproject.toml b/pyproject.toml index b38e0d1..f4565f8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -61,6 +61,8 @@ opensearch-py = "^3.0.0" 
 djangorestframework-simplejwt = "^5.4.0"
 django-storages = {version = "^1.14.6", extras = ["google"]}
 drf-spectacular = "^0.28.0"
+django-structlog = "^9.0.1"
+structlog = "^25.2.0"
 grimoirelab-toolkit = {version = ">=1.2.0", allow-prereleases = true}
 perceval = {version = ">=1.3.4", allow-prereleases = true}
 sortinghat = {version = ">=1.11.0", allow-prereleases = true}
diff --git a/src/grimoirelab/core/config/logging.py b/src/grimoirelab/core/config/logging.py
new file mode 100644
index 0000000..7d7b78c
--- /dev/null
+++ b/src/grimoirelab/core/config/logging.py
@@ -0,0 +1,157 @@
+# -*- coding: utf-8 -*-
+#
+# GrimoireLab logging setup.
+#
+# Logging is configured using 'structlog', 'django-structlog',
+# and the standard library 'logging' module.
+#
+# More info:
+#
+# https://django-structlog.readthedocs.io/en/latest/
+# https://docs.djangoproject.com/en/4.2/topics/logging/#configuring-logging
+#
+
+import structlog
+
+
+# Default column styles for the console renderer.
+
+styles = structlog.dev._ColorfulStyles
+
+logger_name_formatter = structlog.dev.KeyValueColumnFormatter(
+    key_style=None,
+    value_style=styles.bright + styles.logger_name,
+    reset_style=styles.reset,
+    value_repr=str,
+    prefix="[",
+    postfix="]",
+)
+
+console_columns = [
+    structlog.dev.Column(
+        "timestamp",
+        structlog.dev.KeyValueColumnFormatter(
+            key_style=None,
+            value_style=styles.timestamp,
+            reset_style=styles.reset,
+            value_repr=str,
+        ),
+    ),
+    structlog.dev.Column(
+        "level",
+        structlog.dev.LogLevelColumnFormatter(
+            structlog.dev.ConsoleRenderer.get_default_level_styles(),
+            reset_style=styles.reset,
+        ),
+    ),
+    structlog.dev.Column("logger", logger_name_formatter),
+    structlog.dev.Column("logger_name", logger_name_formatter),
+    structlog.dev.Column(
+        "event",
+        structlog.dev.KeyValueColumnFormatter(
+            key_style=None,
+            value_style=styles.bright,
+            reset_style=styles.reset,
+            value_repr=str,
+        ),
+    ),
+    # Default formatter for all keys not explicitly mentioned.
+    structlog.dev.Column(
+        "",
+        structlog.dev.KeyValueColumnFormatter(
+            key_style=styles.kv_key,
+            value_style=styles.kv_value,
+            reset_style=styles.reset,
+            value_repr=str,
+        ),
+    ),
+]
+
+
+# Configuration of chain processors for logs not generated by structlog.
+# This will add default fields such as the log level, timestamp, etc.
+
+pre_chain_processors = [
+    structlog.processors.TimeStamper(fmt="iso", utc=True),
+    structlog.stdlib.add_log_level,
+    structlog.stdlib.add_logger_name,
+    structlog.stdlib.ExtraAdder(),
+]
+
+
+# Default logging configuration for GrimoireLab
+
+_GRIMOIRELAB_LOGGING_CONFIG = {
+    "version": 1,
+    "disable_existing_loggers": False,
+    "formatters": {
+        "plain": {
+            "()": structlog.stdlib.ProcessorFormatter,
+            "processors": [
+                structlog.stdlib.ProcessorFormatter.remove_processors_meta,
+                structlog.dev.ConsoleRenderer(columns=console_columns),
+            ],
+            "foreign_pre_chain": pre_chain_processors,
+        },
+        "json": {
+            "()": structlog.stdlib.ProcessorFormatter,
+            "processors": [
+                structlog.stdlib.ProcessorFormatter.remove_processors_meta,
+                structlog.processors.JSONRenderer(),
+            ],
+            "foreign_pre_chain": pre_chain_processors,
+        },
+        "not_structured": {
+            "format": "[%(asctime)s - %(name)s - %(levelname)s] - %(message)s",
+        },
+    },
+    "handlers": {
+        "default": {
+            "class": "logging.StreamHandler",
+            "formatter": "plain",
+        },
+        "json": {
+            "class": "logging.StreamHandler",
+            "formatter": "json",
+        },
+    },
+}
+
+
+def configure_grimoirelab_logging(
+    json_mode: bool = False,
+    debug: bool = False,
+) -> dict:
+    """
+    Set up structlog and return the GrimoireLab logging settings.
+
+    :param json_mode: If True, use JSON format for logging.
+    :param debug: If True, set logging level to DEBUG.
+    """
+    structlog.configure(
+        processors=[
+            structlog.contextvars.merge_contextvars,
+            structlog.stdlib.filter_by_level,
+            structlog.stdlib.PositionalArgumentsFormatter(),
+            structlog.processors.TimeStamper(fmt="iso", utc=True),
+            structlog.stdlib.add_log_level,
+            structlog.stdlib.add_logger_name,
+            structlog.processors.StackInfoRenderer(),
+            structlog.processors.format_exc_info,
+            structlog.processors.UnicodeDecoder(),
+            structlog.stdlib.ProcessorFormatter.wrap_for_formatter,
+        ],
+        logger_factory=structlog.stdlib.LoggerFactory(),
+        wrapper_class=structlog.stdlib.BoundLogger,
+        cache_logger_on_first_use=True,
+    )
+
+    logging_settings = dict(_GRIMOIRELAB_LOGGING_CONFIG)
+    logging_settings["loggers"] = {
+        "": {
+            "handlers": ["json"] if json_mode else ["default"],
+            "level": "DEBUG" if debug else "INFO",
+        },
+    }
+
+    return logging_settings
diff --git a/src/grimoirelab/core/config/settings.py b/src/grimoirelab/core/config/settings.py
index 30d3bc6..4f12a56 100644
--- a/src/grimoirelab/core/config/settings.py
+++ b/src/grimoirelab/core/config/settings.py
@@ -16,6 +16,9 @@
 import os
 from pathlib import Path
 
+from .logging import configure_grimoirelab_logging
+
+
 BASE_DIR = Path(__file__).resolve().parent.parent
 
 SILENCED_SYSTEM_CHECKS = ["django_mysql.E016"]
@@ -102,6 +105,7 @@
     "django.contrib.messages",
     "django.contrib.staticfiles",
     "django_rq",
+    "django_structlog",
     "corsheaders",
     "rest_framework",
     "grimoirelab.core.scheduler",
@@ -121,6 +125,7 @@
     "django.contrib.auth.middleware.AuthenticationMiddleware",
     "django.contrib.messages.middleware.MessageMiddleware",
     "django.middleware.clickjacking.XFrameOptionsMiddleware",
+    "django_structlog.middlewares.RequestMiddleware",
 ]
 
 ROOT_URLCONF = "grimoirelab.core.app.urls"
@@ -200,38 +205,6 @@
 USE_TZ = True
 TIME_ZONE = "UTC"
 
-#
-# GrimoireLab Logging
-#
-# https://docs.djangoproject.com/en/4.2/topics/logging/#configuring-logging
-#
-
-LOGGING = {
-    "version": 1,
-    "disable_existing_loggers": False,
-    "formatters": {
-        "simple": {
-            "format": "[{asctime}] {message}",
-            "style": "{",
-        },
-        "verbose": {
-            "format": "[{asctime} - {levelname} - {name}] {message}",
-            "style": "{",
-        },
-    },
-    "handlers": {
-        "console": {
-            "class": "logging.StreamHandler",
-            "formatter": "verbose",
-        },
-    },
-    "root": {
-        "handlers": ["console"],
-        "level": "INFO",
-    },
-}
-
-
 #
 # Static files (CSS, JavaScript, Images)
 #
@@ -295,6 +268,16 @@
     "DEFAULT_SCHEMA_CLASS": "drf_spectacular.openapi.AutoSchema",
 }
 
+#
+# GrimoireLab Logging
+#
+# Set GRIMOIRELAB_LOGS_JSON to 'true' or '1' to enable JSON logging.
+#
+
+GRIMOIRELAB_LOGS_JSON = os.environ.get("GRIMOIRELAB_LOGS_JSON", "False").lower() in ("true", "1")
+
+LOGGING = configure_grimoirelab_logging(json_mode=GRIMOIRELAB_LOGS_JSON, debug=DEBUG)
+
 #
 # GrimoireLab uses RQ to run background and async jobs.
 # You'll HAVE TO set the next parameters in order to run
diff --git a/tests/unit/consumers/test_consumer_pool.py b/tests/unit/consumers/test_consumer_pool.py
index e6d2bcc..ea8a214 100644
--- a/tests/unit/consumers/test_consumer_pool.py
+++ b/tests/unit/consumers/test_consumer_pool.py
@@ -16,22 +16,26 @@
 # along with this program. If not, see <http://www.gnu.org/licenses/>.
 #
 
-import logging
 import os
 import signal
 import threading
 import time
 
+import structlog
+
 from grimoirelab.core.consumers.consumer_pool import ConsumerPool
 from grimoirelab.core.consumers.consumer import Consumer
 
 from ..base import GrimoireLabTestCase
 
 
+logger = structlog.get_logger(__name__)
+
+
 class SampleConsumer(Consumer):
     def process_entries(self, entries, recovery=False):
         for entry in entries:
-            logging.info(f"Processing entry: {entry.message_id}")
+            logger.info(f"Processing entry: {entry.message_id}")
             self.ack_entries([entry.message_id])
 
 
@@ -59,7 +63,6 @@ def test_pool_initialization(self):
         self.assertEqual(pool.num_consumers, 10)
         self.assertEqual(pool.stream_block_timeout, 1000)
         self.assertTrue(pool.verbose)
-        self.assertEqual(pool.log_level, logging.DEBUG)
         self.assertEqual(pool.status, ConsumerPool.Status.IDLE)
 
     def test_clean_up_consumers(self):

From 3fb5072912c159d89e60c66a36c578d0864f0894 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Santiago=20Due=C3=B1as?=
Date: Fri, 25 Apr 2025 17:53:55 +0200
Subject: [PATCH 2/2] [core] Modify log messages to be structured
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Some messages were updated to follow the structured format.

Signed-off-by: Santiago Dueñas
---
 ...og-added-for-logging-platform-messages.yml | 14 +++
 src/grimoirelab/core/consumers/consumer.py    | 14 ++-
 .../core/consumers/consumer_pool.py           | 10 ++-
 src/grimoirelab/core/runner/commands/run.py   | 62 +++++++------
 src/grimoirelab/core/scheduler/jobs.py        |  9 +-
 src/grimoirelab/core/scheduler/scheduler.py   | 88 +++++++++++++------
 .../core/scheduler/tasks/chronicler.py        |  6 +-
 src/grimoirelab/core/scheduler/worker.py      |  4 +-
 8 files changed, 141 insertions(+), 66 deletions(-)
 create mode 100644 releases/unreleased/structlog-added-for-logging-platform-messages.yml

diff --git a/releases/unreleased/structlog-added-for-logging-platform-messages.yml b/releases/unreleased/structlog-added-for-logging-platform-messages.yml
new file mode 100644
index 0000000..ffcc835
--- /dev/null
+++ b/releases/unreleased/structlog-added-for-logging-platform-messages.yml
@@ -0,0 +1,14 @@
+---
+title: structlog added for logging platform messages
+category: other
+author: Santiago Dueñas
+issue: null
+notes: >
+  Organizing log data in a structured format makes it easier
+  to read and analyze. After adding 'structlog',
+  log messages will always have a structured format.
+ + The default mode prints the messages to the console + in plain format, but it can also be configured to print + them in JSON format by setting the environment variable + 'GRIMOIRELAB_LOGS_JSON' to 1 or 'true'. diff --git a/src/grimoirelab/core/consumers/consumer.py b/src/grimoirelab/core/consumers/consumer.py index 4519b04..57386f0 100644 --- a/src/grimoirelab/core/consumers/consumer.py +++ b/src/grimoirelab/core/consumers/consumer.py @@ -27,6 +27,7 @@ from multiprocessing import Event as ProcessEvent import redis +import structlog if typing.TYPE_CHECKING: from typing import Iterable @@ -170,7 +171,9 @@ def fetch_new_entries(self) -> Iterable[Entry]: def recover_stream_entries(self, recover_idle_time: int = RECOVER_IDLE_TIME) -> Iterable[Entry]: """Transfers ownership of pending entries idle for 'recover_idle_time'.""" - self.logger.debug(f"Recovering events from '{self.stream_name}:{self.consumer_group}'") + self.logger.debug( + "recovering events", stream=self.stream_name, consumer_group=self.consumer_group + ) while True: response = self.connection.xautoclaim( @@ -199,6 +202,10 @@ def recover_stream_entries(self, recover_idle_time: int = RECOVER_IDLE_TIME) -> if self._stop_event.is_set(): break + self.logger.debug( + "events recovered", stream=self.stream_name, consumer_group=self.consumer_group + ) + def process_entries(self, entries: Iterable[Entry], recovery: bool = False): """Process entries (implement this method in subclasses). @@ -222,12 +229,11 @@ def ack_entries(self, message_ids: list): def stop(self): """Stop the consumer gracefully.""" - self.logger.info(f"Stopping consumer '{self.consumer_name}'.") - self._stop_event.set() + self.logger.info("consumer stopped", consumer=self.consumer_name) def _create_logger(self): - logger = logging.getLogger(name=f"{self.__class__.__name__}") + logger = structlog.get_logger(self.__class__.__name__) logger.setLevel(self.logging_level) return logger diff --git a/src/grimoirelab/core/consumers/consumer_pool.py b/src/grimoirelab/core/consumers/consumer_pool.py index 396936b..b8ace16 100644 --- a/src/grimoirelab/core/consumers/consumer_pool.py +++ b/src/grimoirelab/core/consumers/consumer_pool.py @@ -27,6 +27,7 @@ import redis from rq.connections import parse_connection +import structlog from .consumer import Consumer @@ -34,6 +35,9 @@ ConsumerData = namedtuple("ConsumerData", ["name", "pid", "process"]) +logger = structlog.get_logger(__name__) + + class ConsumerPool: """Base class to create a pool of consumers. 
@@ -198,7 +202,7 @@ def cleanup_consumers(self):
             self._consumers.pop(name)
 
     def _create_logger(self):
-        logger = logging.getLogger(self.__class__.__name__)
+        logger = structlog.get_logger(self.__class__.__name__)
         logger.setLevel(self.log_level)
 
         return logger
@@ -265,5 +269,5 @@ def _run_consumer(
     try:
         consumer = consumer_class(connection=connection, *args, **kwargs)
         consumer.start(burst=burst)
-    except Exception as e:
-        logging.error(f"Consumer {consumer_class.__name__} failed: {e}")
+    except Exception as exc:
+        logger.error(f"Consumer {consumer_class.__name__} failed", err=exc)
diff --git a/src/grimoirelab/core/runner/commands/run.py b/src/grimoirelab/core/runner/commands/run.py
index 6929dbd..31db20e 100644
--- a/src/grimoirelab/core/runner/commands/run.py
+++ b/src/grimoirelab/core/runner/commands/run.py
@@ -31,6 +31,7 @@
 import django_rq
 import opensearchpy
 import redis
+import structlog
 
 from django.conf import settings
 from django.db import connections, OperationalError
@@ -44,6 +45,9 @@
 DEFAULT_MAX_RETRIES = 10
 
 
+logger = structlog.get_logger(__name__)
+
+
 @click.group()
 @click.pass_context
 def run(ctx: Context):
@@ -122,17 +126,17 @@ def periodic_maintain_tasks(interval):
     while True:
         try:
             maintain_tasks()
-            logging.info("Maintenance tasks executed successfully.")
-        except redis.exceptions.ConnectionError as e:
-            logging.error(f"Redis connection error during maintenance tasks: {e}")
-        except django.db.utils.OperationalError as e:
-            logging.error(f"Database connection error during maintenance tasks: {e}")
+            logger.info("Maintenance tasks executed successfully")
+        except redis.exceptions.ConnectionError as exc:
+            logger.error("Redis connection error during maintenance tasks", err=exc)
+        except django.db.utils.OperationalError as exc:
+            logger.error("Database connection error during maintenance tasks", err=exc)
             connections.close_all()
-        except Exception as e:
-            logging.error("Error during maintenance tasks: %s", e)
+        except Exception as exc:
+            logger.error("Unexpected error during maintenance tasks", err=exc)
             raise
         except KeyboardInterrupt:
-            logging.info("Maintenance task interrupted. Exiting...")
+            logger.info("Maintenance task interrupted. Exiting...")
             return
 
         time.sleep(interval)
@@ -146,7 +150,7 @@ def _maintenance_process(maintenance_interval):
     )
     process.start()
 
-    logging.info("Started maintenance process with PID %s", process.pid)
+    logger.info("Started maintenance process", pid=process.pid)
 
     return process
 
@@ -217,6 +221,10 @@ def _wait_opensearch_ready(
 ) -> None:
     """Wait for OpenSearch to be available before starting"""
 
+    # The 'opensearch' library writes logs with the exceptions while
+    # connecting to the service. Disable them temporarily until
+    # the service is up. We have to use the standard 'logging' library
+    # because structlog doesn't allow disabling a logger dynamically.
     os_logger = logging.getLogger("opensearch")
     os_logger.disabled = True
 
@@ -245,24 +253,26 @@ def _wait_opensearch_ready(
             # Index still not created, but OpenSearch is up
             break
         except opensearchpy.exceptions.AuthorizationException:
-            logging.error("OpenSearch Authorization failed. Check your credentials.")
+            logger.error("OpenSearch Authorization failed. Check your credentials.")
             exit(1)
         except (
             opensearchpy.exceptions.ConnectionError,
             opensearchpy.exceptions.TransportError,
-        ) as e:
-            logging.warning(
-                f"[{attempt + 1}/{DEFAULT_MAX_RETRIES}] OpenSearch connection not ready: {e}"
+        ) as exc:
+            logger.warning(
+                f"[{attempt + 1}/{DEFAULT_MAX_RETRIES}] OpenSearch connection not ready",
+                err=exc,
             )
             _sleep_backoff(attempt)
     else:
-        logging.error("Failed to connect to OpenSearch.")
+        logger.error("Failed to connect to OpenSearch.")
         exit(1)
 
+    # Enable the 'opensearch' library logs again
     os_logger.disabled = False
 
-    logging.info("OpenSearch is ready.")
+    logger.info("OpenSearch is ready.")
 
 
 def _wait_redis_ready():
@@ -273,16 +283,17 @@ def _wait_redis_ready():
             redis_conn = django_rq.get_connection()
             redis_conn.ping()
             break
-        except redis.exceptions.ConnectionError as e:
-            logging.warning(
-                f"[{attempt + 1}/{DEFAULT_MAX_RETRIES}] Redis connection not ready: {e}"
+        except redis.exceptions.ConnectionError as exc:
+            logger.warning(
+                f"[{attempt + 1}/{DEFAULT_MAX_RETRIES}] Redis connection not ready",
+                err=exc,
             )
             _sleep_backoff(attempt)
     else:
-        logging.error("Failed to connect to Redis server")
+        logger.error("Failed to connect to Redis server")
         exit(1)
 
-    logging.info("Redis is ready")
+    logger.info("Redis is ready")
 
 
 def _wait_database_ready():
@@ -296,17 +307,18 @@ def _wait_database_ready():
                 pass  # Just test the connection
             break
-        except OperationalError as e:
-            logging.warning(
-                f"[{attempt + 1}/{DEFAULT_MAX_RETRIES}] Database connection not ready: {e.__cause__}"
+        except OperationalError as exc:
+            logger.warning(
+                f"[{attempt + 1}/{DEFAULT_MAX_RETRIES}] Database connection not ready",
+                err=exc.__cause__,
             )
             _sleep_backoff(attempt)
     else:
         error_msg = "Failed to connect to the database after all retries"
-        logging.error(error_msg)
+        logger.error(error_msg)
         raise ConnectionError(error_msg)
 
-    logging.info("Database is ready.")
+    logger.info("Database is ready.")
 
     # Close all database connections to avoid timed out connections
     connections.close_all()
diff --git a/src/grimoirelab/core/scheduler/jobs.py b/src/grimoirelab/core/scheduler/jobs.py
index 0d662a0..22ddc6d 100644
--- a/src/grimoirelab/core/scheduler/jobs.py
+++ b/src/grimoirelab/core/scheduler/jobs.py
@@ -22,6 +22,7 @@
 import typing
 
 import rq.job
+import structlog
 
 if typing.TYPE_CHECKING:
     from typing import Any
@@ -29,7 +30,7 @@
     from rq.types import FunctionReferenceType
 
 
-logger = logging.getLogger(__name__)
+logger = structlog.get_logger(__name__)
 
 
 class GrimoireLabJob(rq.job.Job):
@@ -129,9 +130,9 @@ def _execute(self) -> Any:
         try:
             self._add_log_handler()
             return super()._execute()
-        except Exception:
-            logger.exception(f"Error running job {self.id}.")
-            raise
+        except Exception as exc:
+            logger.error("job failed", job_id=self.id, exc_info=exc)
+            raise
         finally:
             self._remove_log_handler()
diff --git a/src/grimoirelab/core/scheduler/scheduler.py b/src/grimoirelab/core/scheduler/scheduler.py
index c01dc63..e94e491 100644
--- a/src/grimoirelab/core/scheduler/scheduler.py
+++ b/src/grimoirelab/core/scheduler/scheduler.py
@@ -19,7 +19,6 @@
 from __future__ import annotations
 
 import datetime
-import logging
 import typing
 import uuid
 
@@ -27,6 +26,7 @@
 import django_rq
 import rq.exceptions
 import rq.job
+import structlog
 
 from django.conf import settings
 from rq.registry import StartedJobRegistry
@@ -59,7 +59,8 @@
     rq.job.JobStatus.CANCELED,
 ]
 
-logger = logging.getLogger(__name__)
+
+logger = structlog.get_logger(__name__)
 
 
 def schedule_task(
@@ -88,6 +89,15 @@ def schedule_task(
     )
     _enqueue_task(task, scheduled_at=datetime_utcnow())
 
+    logger.info(
+        "task scheduled",
+        task_uuid=task.uuid,
+        task_args=task_args,
+        job_interval=job_interval,
+        job_max_retries=job_max_retries,
+        burst=burst,
+    )
+
     return task
 
 
@@ -122,6 +132,8 @@ def cancel_task(task_uuid: str) -> None:
     task.status = SchedulerStatus.CANCELED
     task.save()
 
+    logger.info("task canceled", task_uuid=task.uuid)
+
 
 def reschedule_task(task_uuid: str) -> None:
     """Reschedule a task
@@ -147,7 +159,6 @@ def reschedule_task(task_uuid: str) -> None:
         except (rq.exceptions.NoSuchJobError, rq.exceptions.InvalidJobOperation):
             pass
         _schedule_job(task, job, datetime_utcnow(), job.job_args)
-
     elif task.status == SchedulerStatus.RUNNING:
         # Make sure it is running
         job = task.jobs.order_by("-scheduled_at").first()
@@ -157,6 +168,8 @@ def reschedule_task(task_uuid: str) -> None:
     else:
         _enqueue_task(task)
 
+    logger.info("task rescheduled", task_uuid=task.uuid)
+
 
 def maintain_tasks() -> None:
     """Maintain the tasks that are scheduled to be executed.
@@ -178,7 +191,12 @@ def maintain_tasks() -> None:
         if not _is_job_removed_or_stopped(job_db, task.default_job_queue):
             continue
 
-        logger.info(f"Job #{job_db.job_id} in queue (task: {task.task_id}) stopped. Rescheduling.")
+        logger.error(
+            "Job stopped but task wasn't updated; rescheduling",
+            task_uuid=task.uuid,
+            job_uuid=job_db.uuid,
+            queue=job_db.queue,
+        )
 
         current_time = datetime_utcnow()
         scheduled_at = max(task.scheduled_at, current_time)
@@ -186,6 +204,8 @@ def maintain_tasks() -> None:
         job_db.save_run(SchedulerStatus.CANCELED)
         _enqueue_task(task, scheduled_at=scheduled_at)
 
+    logger.debug("Maintenance of tasks completed")
+
 
 def _is_job_removed_or_stopped(job: Job, queue: str) -> bool:
     """
@@ -246,10 +266,6 @@ def _enqueue_task(task: Task, scheduled_at: datetime.datetime | None = None) ->
 
     _schedule_job(task, job, scheduled_at, job_args)
 
-    logger.info(
-        f"Job #{job.job_id} (task: {task.task_id}) enqueued in '{job.queue}' at {scheduled_at}"
-    )
-
     return job
 
 
@@ -277,15 +293,29 @@ def _schedule_job(
         task.status = SchedulerStatus.ENQUEUED
         job.scheduled_at = scheduled_at
         task.scheduled_at = scheduled_at
-    except Exception as e:
-        logger.error(f"Error enqueuing job of task {task.task_id}. Not scheduled. Error: {e}")
+    except Exception as exc:
         job.status = SchedulerStatus.FAILED
         task.status = SchedulerStatus.FAILED
-        raise e
+        logger.error(
+            "job scheduling failed",
+            job_uuid=job.uuid,
+            task_uuid=task.uuid,
+            exc_info=exc,
+        )
+        raise
     finally:
         job.save()
         task.save()
 
+    logger.info(
+        "job scheduled",
+        job_uuid=job.uuid,
+        job_args=job_args,
+        task_uuid=task.uuid,
+        queue=job.queue,
+        scheduled_at=scheduled_at,
+    )
+
     return rq_job
 
 
@@ -304,17 +334,21 @@ def _on_success_callback(
     try:
         job_db = find_job(job.id)
     except NotFoundError:
-        logger.error("Job not found. Not rescheduling.")
+        logger.error("job not found", job_uuid=job.id)
         return
 
-    job_db.save_run(SchedulerStatus.COMPLETED, progress=result, logs=job.meta.get("log", None))
+    job_db.save_run(
+        SchedulerStatus.COMPLETED,
+        progress=result,
+        logs=job.meta.get("log", None),
+    )
 
     task = job_db.task
-    logger.info(f"Job #{job_db.job_id} (task: {task.task_id}) completed.")
+    logger.info("job completed", job_uuid=job_db.uuid, task_uuid=task.uuid)
 
     # Reschedule task
     if task.burst:
-        logger.info(f"Task: {task.task_id} finished. It was a burst task. It won't be rescheduled.")
+        logger.info("task completed", task_uuid=task.uuid, burst=True)
         return
     else:
         scheduled_at = datetime_utcnow() + datetime.timedelta(seconds=task.job_interval)
@@ -342,27 +376,31 @@ def _on_failure_callback(
     try:
         job_db = find_job(job.id)
     except NotFoundError:
-        logger.error("Job not found. Not rescheduling.")
+        logger.error("job not found", job_uuid=job.id)
         return
 
     job_db.save_run(
-        SchedulerStatus.FAILED, progress=job.meta["progress"], logs=job.meta.get("log", None)
+        SchedulerStatus.FAILED,
+        progress=job.meta["progress"],
+        logs=job.meta.get("log", None),
     )
 
     task = job_db.task
-    logger.error(f"Job #{job_db.job_id} (task: {task.task_id}) failed; error: {value}")
+    logger.error("job failed", job_uuid=job_db.uuid, task_uuid=task.uuid, error=value)
+
+    # Bind a new logger to reuse the common parameters
+    log = logger.bind(task_uuid=task.uuid, job_uuid=job_db.uuid)
 
-    # Try to retry the task
+    # Retry the task, if possible
     if task.failures >= task.job_max_retries:
-        logger.error(f"Task: {task.task_id} max retries reached; cancelled")
+        log.error("task failed; max retries reached", failures=task.failures)
         return
     elif not task.can_be_retried():
-        logger.error(f"Task: {task.task_id} can't be retried")
+        log.error("task failed; can't be retried")
        return
     else:
-        logger.error(f"Task: {task.task_id} failed but task will be retried")
         task.status = SchedulerStatus.RECOVERY
         task.save()
-
-        scheduled_at = datetime_utcnow() + datetime.timedelta(seconds=task.job_interval)
-        _enqueue_task(task, scheduled_at=scheduled_at)
+        scheduled_at = datetime_utcnow() + datetime.timedelta(seconds=task.job_interval)
+        _enqueue_task(task, scheduled_at=scheduled_at)
+        log.error("task failed; it will be retried")
diff --git a/src/grimoirelab/core/scheduler/tasks/chronicler.py b/src/grimoirelab/core/scheduler/tasks/chronicler.py
index 611497b..4fd775d 100644
--- a/src/grimoirelab/core/scheduler/tasks/chronicler.py
+++ b/src/grimoirelab/core/scheduler/tasks/chronicler.py
@@ -18,11 +18,11 @@
 
 from __future__ import annotations
 
-import logging
 import typing
 
 import cloudevents.conversion
-import rq.job
+import rq
+import structlog
 
 import perceval.backend
 import perceval.backends
@@ -37,7 +37,7 @@
     from datetime import datetime
 
 
-logger = logging.getLogger("chronicler")
+logger = structlog.get_logger("chronicler")
 
 
 def chronicler_job(
diff --git a/src/grimoirelab/core/scheduler/worker.py b/src/grimoirelab/core/scheduler/worker.py
index 40371da..d3ebaff 100644
--- a/src/grimoirelab/core/scheduler/worker.py
+++ b/src/grimoirelab/core/scheduler/worker.py
@@ -18,11 +18,11 @@
 
 from __future__ import annotations
 
-import logging
 import typing
 
 import django.db.transaction
 import rq.worker
+import structlog
 
 from grimoirelab_toolkit.datetime import datetime_utcnow
 
@@ -33,7 +33,7 @@
 from .jobs import GrimoireLabJob
 
 
-logger = logging.getLogger(__name__)
+logger = structlog.get_logger(__name__)
 
 
 class GrimoireLabWorker(rq.worker.Worker):