Skip to content

Commit d4b08fa

Browse files
committed
Archive Events in OpenSearch
This commit implements archiving of events in an OpenSearch backend. It introduces a new subscriber group that reads from the events stream and stores events in an OpenSearch instance. Additionally, the worker pool now supports execution on specific task queues, enabling different workers to process tasks based on their queue.

Signed-off-by: Jose Javier Merchante <[email protected]>
1 parent 2e6fd19 commit d4b08fa

File tree

11 files changed

+936
-14
lines changed

11 files changed

+936
-14
lines changed

config/settings/testing.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ def __call__(self, _, strict):
6767
return self.conn
6868

6969

70+
RQ_QUEUES['default'] = _RQ_DATABASE # noqa: F405
7071
RQ_QUEUES['testing'] = _RQ_DATABASE # noqa: F405
7172
RQ['WORKER_CLASS'] = rq.worker.SimpleWorker
7273

poetry.lock

Lines changed: 37 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ perceval = {version = "^1.0.2", allow-prereleases = true}
5454
grimoirelab-chronicler = {git = "https://github.com/chaoss/grimoirelab-chronicler.git", allow-prereleases = true}
5555
django-cors-headers = "^4.6.0"
5656
djangorestframework = "^3.15.2"
57+
opensearch-py = "^2.8.0"
5758

5859
[tool.poetry.group.dev.dependencies]
5960
fakeredis = "^2.0.0"

src/grimoirelab/core/config/settings.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -273,7 +273,8 @@
273273
# https://github.com/rq/django-rq
274274
#
275275

276-
GRIMOIRELAB_Q_EVENTIZER_JOBS = os.environ.get('GRIMOIRELAB_Q_EVENTIZER_JOBS', 'default')
276+
GRIMOIRELAB_Q_EVENTIZER_JOBS = os.environ.get('GRIMOIRELAB_Q_EVENTIZER_JOBS', 'eventizer_jobs')
277+
GRIMOIRELAB_Q_ARCHIVIST_JOBS = os.environ.get('GRIMOIRELAB_Q_ARCHIVIST_JOBS', 'storage_jobs')
277278

278279
_RQ_DATABASE = {
279280
'HOST': os.environ.get('GRIMOIRELAB_REDIS_HOST', '127.0.0.1'),
@@ -284,6 +285,7 @@
284285

285286
RQ_QUEUES = {
286287
GRIMOIRELAB_Q_EVENTIZER_JOBS: _RQ_DATABASE,
288+
GRIMOIRELAB_Q_ARCHIVIST_JOBS: _RQ_DATABASE,
287289
}
288290

289291
GRIMOIRELAB_EVENTS_STREAM_NAME = os.environ.get('GRIMOIRELAB_EVENTS_STREAM_NAME',
@@ -309,3 +311,15 @@
309311
GRIMOIRELAB_JOB_TIMEOUT = int(os.environ.get('GRIMOIRELAB_JOB_TIMEOUT', -1))
310312

311313
GRIMOIRELAB_GIT_STORAGE_PATH = os.environ.get('GRIMOIRELAB_GIT_PATH', '~/.perceval')
314+
315+
#
316+
# Archivist configuration
317+
#
318+
GRIMOIRELAB_ARCHIVIST = {
319+
'WORKERS': int(os.environ.get('GRIMOIRELAB_ARCHIVIST_WORKERS', 10)),
320+
'STORAGE_TYPE': os.environ.get('GRIMOIRELAB_ARCHIVIST_STORAGE_TYPE', 'opensearch'),
321+
'STORAGE_URL': os.environ.get('GRIMOIRELAB_ARCHIVIST_STORAGE_URL', 'https://admin:admin@localhost:9200'),
322+
'STORAGE_INDEX': os.environ.get('GRIMOIRELAB_ARCHIVIST_STORAGE_INDEX', 'events'),
323+
'STORAGE_VERIFY_CERT': os.environ.get('GRIMOIRELAB_ARCHIVIST_STORAGE_VERIFY_CERT', 'False').lower() in ('true', '1'),
324+
'EVENTS_PER_JOB': int(os.environ.get('GRIMOIRELAB_ARCHIVIST_EVENTS_PER_JOB', 10000)),
325+
}

src/grimoirelab/core/runner/commands/run.py

Lines changed: 82 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
from __future__ import annotations
2020

2121
import os
22+
import sys
2223
import typing
2324

2425
import click
@@ -43,9 +44,13 @@ def run(ctx: Context):
4344
is_flag=True,
4445
default=False,
4546
help="Run the service in developer mode.")
47+
@click.option("--clear-tasks",
48+
is_flag=True,
49+
default=False,
50+
help="Clear background tasks.")
4651
@run.command()
4752
@click.pass_context
48-
def server(ctx: Context, devel: bool):
53+
def server(ctx: Context, devel: bool, clear_tasks: bool):
4954
"""Start the GrimoireLab core server.
5055
5156
GrimoireLab server allows to schedule tasks and fetch data from
@@ -56,6 +61,8 @@ def server(ctx: Context, devel: bool):
5661
should be run with a reverse proxy. If you activate the '--dev' flag,
5762
a HTTP server will be run instead.
5863
"""
64+
create_background_tasks(clear_tasks)
65+
5966
env = os.environ
6067

6168
env["UWSGI_ENV"] = f"DJANGO_SETTINGS_MODULE={ctx.obj['cfg']}"
@@ -91,24 +98,89 @@ def server(ctx: Context, devel: bool):
9198

9299

93100
@run.command()
101+
@click.argument('task-types', nargs=-1)
94102
@click.option('--workers',
95103
default=10,
96104
show_default=True,
97105
help="Number of workers to run in the pool.")
98-
def eventizers(workers: int):
99-
"""Start a pool of eventizer workers.
106+
def worker_pool(task_types: str, workers: int):
107+
"""Start a pool of workers that run specific tasks.
100108
101-
The workers on the pool will run tasks to fetch data from software
102-
development repositories. Data will be processed in form of events,
103-
and published in the events queue.
109+
If multiple tasks share the same queue, they will run in the same
110+
pool of workers. The tasks to run are defined as arguments to the
111+
command.
104112
105113
The number of workers running in the pool can be defined with the
106114
parameter '--workers'.
107-
108-
Workers get jobs from the GRIMOIRELAB_Q_EVENTIZER_JOBS queue defined
109-
in the configuration file.
110115
"""
116+
from grimoirelab.core.scheduler.models import (get_registered_task_model,
117+
get_all_registered_task_names)
118+
119+
available_tasks = get_all_registered_task_names()
120+
121+
queues = []
122+
for task in task_types:
123+
try:
124+
Task = get_registered_task_model(task)[0]
125+
except KeyError:
126+
click.echo(f"Task '{task}' is not a valid task. "
127+
f"Options: {available_tasks}", err=True)
128+
sys.exit(1)
129+
queues.append(Task().default_job_queue)
130+
131+
if not queues:
132+
click.echo(f"You must define at least one valid task. "
133+
f"Options: {available_tasks}", err=True)
134+
sys.exit(1)
135+
111136
django.core.management.call_command(
112-
'rqworker-pool', settings.GRIMOIRELAB_Q_EVENTIZER_JOBS,
137+
'rqworker-pool', queues,
113138
num_workers=workers
114139
)
140+
141+
142+
def create_background_tasks(clear_tasks: bool):
143+
"""
144+
Create background tasks before starting the server.
145+
:param clear_tasks: clear tasks before creating new ones.
146+
:return:
147+
"""
148+
from grimoirelab.core.scheduler.scheduler import schedule_task
149+
from grimoirelab.core.scheduler.tasks.models import StorageTask
150+
151+
workers = settings.GRIMOIRELAB_ARCHIVIST['WORKERS']
152+
storage_url = settings.GRIMOIRELAB_ARCHIVIST['STORAGE_URL']
153+
storage_db_name = settings.GRIMOIRELAB_ARCHIVIST['STORAGE_INDEX']
154+
storage_type = settings.GRIMOIRELAB_ARCHIVIST['STORAGE_TYPE']
155+
verify_certs = settings.GRIMOIRELAB_ARCHIVIST['STORAGE_VERIFY_CERT']
156+
events_per_job = settings.GRIMOIRELAB_ARCHIVIST['EVENTS_PER_JOB']
157+
158+
if clear_tasks:
159+
StorageTask.objects.all().delete()
160+
click.echo("Removing old background tasks.")
161+
162+
current = StorageTask.objects.filter(burst=False).count()
163+
if workers == current:
164+
click.echo("Background tasks already created. Skipping.")
165+
return
166+
167+
task_args = {
168+
'storage_url': storage_url,
169+
'storage_db_name': storage_db_name,
170+
'storage_verify_certs': verify_certs,
171+
'redis_group': 'archivist',
172+
'limit': events_per_job
173+
}
174+
if workers > current:
175+
for _ in range(workers - current):
176+
schedule_task(
177+
task_type=StorageTask.TASK_TYPE,
178+
storage_type=storage_type,
179+
task_args=task_args,
180+
job_interval=1,
181+
job_max_retries=10
182+
)
183+
click.echo(f"Created {workers} background tasks.")
184+
elif workers < current:
185+
tasks = StorageTask.objects.all()[workers:]
186+
tasks.update(burst=True)

src/grimoirelab/core/scheduler/models.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -357,3 +357,11 @@ def get_all_registered_task_models() -> Iterator[type[Task], type[Job]]:
357357
job classes.
358358
"""
359359
return iter(GRIMOIRELAB_TASK_MODELS.values())
360+
361+
362+
def get_all_registered_task_names() -> list[str]:
363+
"""Return all registered task names.
364+
365+
:returns: a list with all registered task names.
366+
"""
367+
return list(GRIMOIRELAB_TASK_MODELS.keys())

src/grimoirelab/core/scheduler/scheduler.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ def cancel_task(task_uuid: str) -> None:
100100

101101
jobs = job_class.objects.filter(task=task).all()
102102
for job in jobs:
103-
job_rq = rq.job.Job.fetch(job.uuid, connection=django_rq.get_connection())
103+
job_rq = rq.job.Job.fetch(job.uuid, connection=django_rq.get_connection(task.default_job_queue))
104104
job_rq.delete()
105105

106106
task.delete()
@@ -125,7 +125,7 @@ def maintain_tasks() -> None:
125125
job_db = task.jobs.order_by('scheduled_at').first()
126126

127127
try:
128-
rq.job.Job.fetch(job_db.uuid, connection=django_rq.get_connection())
128+
rq.job.Job.fetch(job_db.uuid, connection=django_rq.get_connection(task.default_job_queue))
129129
continue
130130
except rq.exceptions.NoSuchJobError:
131131
logger.debug(

0 commit comments

Comments
 (0)