180 changes: 0 additions & 180 deletions devel/grimoirelab-dev.py

This file was deleted.

18 changes: 11 additions & 7 deletions src/grimoirelab/core/config/settings.py
@@ -273,9 +273,7 @@
# https://github.com/rq/django-rq
#

Q_PERCEVAL_JOBS = os.environ.get('GRIMOIRELAB_Q_PERCEVAL_JOBS', 'default')
Q_STORAGE_ITEMS = os.environ.get('GRIMOIRELAB_Q_STORAGE_ITEMS', 'items')
Q_EVENTS = os.environ.get('GRIMOIRELAB_Q_EVENTS', 'events')
GRIMOIRELAB_Q_EVENTIZER_JOBS = os.environ.get('GRIMOIRELAB_Q_EVENTIZER_JOBS', 'default')

_RQ_DATABASE = {
'HOST': os.environ.get('GRIMOIRELAB_REDIS_HOST', '127.0.0.1'),
@@ -285,11 +283,17 @@
}

RQ_QUEUES = {
Q_PERCEVAL_JOBS: _RQ_DATABASE,
Q_STORAGE_ITEMS: _RQ_DATABASE,
Q_EVENTS: _RQ_DATABASE,
GRIMOIRELAB_Q_EVENTIZER_JOBS: _RQ_DATABASE,
}

GRIMOIRELAB_EVENTS_STREAM_NAME = os.environ.get('GRIMOIRELAB_EVENTS_STREAM_NAME',
'events')
# Maximum number of events kept in the Redis stream before the oldest ones are
# dropped. Consumers must process events faster than they are produced to avoid
# losing any. The default cap is 1M events (roughly 2.5 GB of Git events).
# Adjust it to your memory constraints.
GRIMOIRELAB_EVENTS_STREAM_MAX_LENGTH = int(os.environ.get('GRIMOIRELAB_EVENTS_STREAM_MAX_LENGTH',
1 * 10 ** 6))
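The cap relies on Redis stream trimming: each XADD with MAXLEN discards the oldest entries once the limit is reached. Below is a minimal sketch of how the two settings behave, assuming a plain redis-py client against a local Redis; the `stream_name` and `max_length` variables simply mirror the defaults above and are illustrative, not part of this change.

```python
import json

import redis

# Illustrative values mirroring the defaults above.
stream_name = 'events'
max_length = 1 * 10 ** 6

r = redis.Redis(host='127.0.0.1', port=6379, db=0)

# Each XADD trims the stream so it never grows (approximately) beyond
# max_length; once the cap is reached, the oldest entries are discarded,
# which is why consumers must keep up with producers.
payload = {'data': json.dumps({'type': 'org.grimoirelab.events.git.commit'})}
r.xadd(stream_name, payload, maxlen=max_length, approximate=True)

print(r.xlen(stream_name))  # number of events currently retained
```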

RQ = {
'JOB_CLASS': 'grimoirelab.core.scheduler.jobs.GrimoireLabJob',
'WORKER_CLASS': 'grimoirelab.core.scheduler.worker.GrimoireLabWorker',
@@ -304,4 +308,4 @@
GRIMOIRELAB_JOB_RESULT_TTL = int(os.environ.get('GRIMOIRELAB_JOB_RESULT_TTL', 300))
GRIMOIRELAB_JOB_TIMEOUT = int(os.environ.get('GRIMOIRELAB_JOB_TIMEOUT', -1))

GIT_STORAGE_PATH = os.environ.get('GRIMOIRELAB_GIT_PATH', '~/.perceval')
GRIMOIRELAB_GIT_STORAGE_PATH = os.environ.get('GRIMOIRELAB_GIT_PATH', '~/.perceval')
10 changes: 5 additions & 5 deletions src/grimoirelab/core/runner/commands/run.py
@@ -27,8 +27,6 @@

from django.conf import settings

from grimoirelab.core.scheduler.scheduler import maintain_tasks

if typing.TYPE_CHECKING:
from click import Context

@@ -83,6 +81,8 @@ def server(ctx: Context, devel: bool):
env["UWSGI_SINGLE_INTERPRETER"] = "true"

# Run maintenance tasks
from grimoirelab.core.scheduler.scheduler import maintain_tasks

_ = django.core.wsgi.get_wsgi_application()
maintain_tasks()

@@ -105,10 +105,10 @@ def eventizers(workers: int):
The number of workers running in the pool can be defined with the
parameter '--workers'.

Workers get jobs from the Q_PERCEVAL_JOBS queue defined in the
configuration file.
Workers get jobs from the GRIMOIRELAB_Q_EVENTIZER_JOBS queue defined
in the configuration file.
"""
django.core.management.call_command(
'rqworker-pool', settings.Q_PERCEVAL_JOBS,
'rqworker-pool', settings.GRIMOIRELAB_Q_EVENTIZER_JOBS,
num_workers=workers
)
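For reviewers tracing the rename end to end, here is a hedged sketch of how a job could land on the GRIMOIRELAB_Q_EVENTIZER_JOBS queue so the pool above picks it up. It assumes django-rq (already referenced in the settings); the scheduler has its own enqueueing path, so the concrete arguments here are purely illustrative.

```python
import django_rq
from django.conf import settings

from grimoirelab.core.scheduler.tasks.chronicler import chronicler_job

# Illustrative only: the scheduler enqueues jobs itself; this just shows the
# queue wiring after the rename from Q_PERCEVAL_JOBS.
queue = django_rq.get_queue(settings.GRIMOIRELAB_Q_EVENTIZER_JOBS)
queue.enqueue(
    chronicler_job,
    datasource_type='git',
    datasource_category='commit',
    events_stream=settings.GRIMOIRELAB_EVENTS_STREAM_NAME,
    stream_max_length=settings.GRIMOIRELAB_EVENTS_STREAM_MAX_LENGTH,
    job_args={'uri': 'http://example.com/', 'gitpath': '/tmp/example-git'},
)
```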
13 changes: 10 additions & 3 deletions src/grimoirelab/core/scheduler/tasks/chronicler.py
@@ -43,7 +43,8 @@
def chronicler_job(
datasource_type: str,
datasource_category: str,
events_queue: str,
events_stream: str,
stream_max_length: int,
job_args: dict[str, Any] = None
) -> ChroniclerProgress:
"""Fetch and eventize data.
@@ -61,6 +62,8 @@ def chronicler_job(
(e.g., 'git', 'github')
:param datasource_category: category of the datasource
(e.g., 'pull_request', 'issue')
:param events_stream: Redis stream where the events will be published
:param stream_max_length: maximum length of the stream
:param job_args: extra arguments to pass to the job
(e.g., 'url', 'owner', 'repository')
"""
@@ -95,7 +98,11 @@
perceval_gen.items)
for event in events:
data = cloudevents.conversion.to_json(event)
rq_job.connection.rpush(events_queue, data)
message = {
'data': data
}
rq_job.connection.xadd(events_stream, message,
maxlen=stream_max_length)
finally:
progress.summary = perceval_gen.summary
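Since each event is now published as a single 'data' field on the stream, a consumer-side sketch may help when reviewing. It mirrors the XREAD usage in the updated test below; the connection details and the 'events' stream name (the default) are assumptions for illustration.

```python
import json

import redis

r = redis.Redis(host='127.0.0.1', port=6379, db=0)

# Read what has been published so far, starting at the beginning of the
# stream; each entry carries the CloudEvent JSON under the 'data' field
# added by chronicler_job.
entries = r.xread({'events': '0-0'}, count=100)
for _stream, messages in entries:
    for entry_id, fields in messages:
        event = json.loads(fields[b'data'])
        print(entry_id, event['type'])
```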

@@ -254,7 +261,7 @@ def initial_args(task_args: dict[str, Any]) -> dict[str, Any]:
from django.conf import settings

# For the first run, make some arguments mandatory
base_path = os.path.expanduser(settings.GIT_STORAGE_PATH)
base_path = os.path.expanduser(settings.GRIMOIRELAB_GIT_STORAGE_PATH)
uri = task_args['uri']
processed_uri = uri.lstrip('/')
git_path = os.path.join(base_path, processed_uri) + '-git'
6 changes: 4 additions & 2 deletions src/grimoirelab/core/scheduler/tasks/models.py
@@ -20,6 +20,7 @@

import typing

from django.conf import settings
from django.db.models import CharField

from ...scheduler.models import (
@@ -117,7 +118,8 @@ def prepare_job_parameters(self):
task_args = {
'datasource_type': self.datasource_type,
'datasource_category': self.datasource_category,
'events_queue': 'events'
'events_stream': settings.GRIMOIRELAB_EVENTS_STREAM_NAME,
'stream_max_length': settings.GRIMOIRELAB_EVENTS_STREAM_MAX_LENGTH,
}

args_gen = get_chronicler_argument_generator(self.datasource_type)
@@ -146,7 +148,7 @@ def can_be_retried(self):

@property
def default_job_queue(self):
return 'default'
return settings.GRIMOIRELAB_Q_EVENTIZER_JOBS

@staticmethod
def job_function(*args, **kwargs):
13 changes: 8 additions & 5 deletions tests/scheduler/test_task_eventizer.py
@@ -53,7 +53,8 @@ def test_job(self):
job_args = {
'datasource_type': 'git',
'datasource_category': 'commit',
'events_queue': 'events',
'events_stream': 'events',
'stream_max_length': 500,
'job_args': {
'uri': 'http://example.com/',
'gitpath': os.path.join(self.dir, 'data/git_log.txt')
@@ -91,8 +92,8 @@ def test_job(self):
'ce8e0b86a1e9877f42fe9453ede418519115f367')

# Check generated events
events = self.conn.lrange('events', 0, -1)
events = [json.loads(e) for e in events]
events = self.conn.xread({'events': b'0-0'}, count=None, block=0)
events = [json.loads(e[1][b'data']) for e in events[0][1]]

expected = [
('2d85a883e0ef63ebf7fa40e372aed44834092592', 'org.grimoirelab.events.git.merge'),
@@ -130,7 +131,8 @@ def test_job_no_result(self):
job_args = {
'datasource_type': 'git',
'datasource_category': 'commit',
'events_queue': 'events',
'events_stream': 'events',
'stream_max_length': 500,
'job_args': {
'uri': 'http://example.com/',
'gitpath': os.path.join(self.dir, 'data/git_log_empty.txt')
@@ -172,7 +174,8 @@ def test_backend_not_found(self):
job_args = {
'datasource_type': 'nobackend',
'datasource_category': 'unknown',
'events_queue': 'events',
'events_stream': 'events',
'stream_max_length': 500,
'job_args': {
'uri': 'http://example.com/',
'gitpath': os.path.join(self.dir, 'data/git_log_empty.txt')