Skip to content

Commit 439efdd

Browse files
committed
Archive Events in OpenSearch
This commit implements archiving of events in an OpenSearch backend. It introduces a new subscriber group that reads from the events stream and stores events in an OpenSearch instance. Additionally, the worker pool now supports execution on specific task queues, enabling different workers to process tasks based on their queue.

Signed-off-by: Jose Javier Merchante <[email protected]>
1 parent 2e6fd19 commit 439efdd

File tree

10 files changed

+936
-19
lines changed

10 files changed

+936
-19
lines changed

config/settings/testing.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ def __call__(self, _, strict):
6767
return self.conn
6868

6969

70+
RQ_QUEUES['default'] = _RQ_DATABASE # noqa: F405
7071
RQ_QUEUES['testing'] = _RQ_DATABASE # noqa: F405
7172
RQ['WORKER_CLASS'] = rq.worker.SimpleWorker
7273

poetry.lock

Lines changed: 37 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ perceval = {version = "^1.0.2", allow-prereleases = true}
5454
grimoirelab-chronicler = {git = "https://github.com/chaoss/grimoirelab-chronicler.git", allow-prereleases = true}
5555
django-cors-headers = "^4.6.0"
5656
djangorestframework = "^3.15.2"
57+
opensearch-py = "^2.8.0"
5758

5859
[tool.poetry.group.dev.dependencies]
5960
fakeredis = "^2.0.0"

src/grimoirelab/core/config/settings.py

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -273,7 +273,8 @@
273273
# https://github.com/rq/django-rq
274274
#
275275

276-
GRIMOIRELAB_Q_EVENTIZER_JOBS = os.environ.get('GRIMOIRELAB_Q_EVENTIZER_JOBS', 'default')
276+
GRIMOIRELAB_Q_EVENTIZER_JOBS = os.environ.get('GRIMOIRELAB_Q_EVENTIZER_JOBS', 'eventizer_jobs')
277+
GRIMOIRELAB_Q_ARCHIVIST_JOBS = os.environ.get('GRIMOIRELAB_Q_ARCHIVIST_JOBS', 'storage_jobs')
277278

278279
_RQ_DATABASE = {
279280
'HOST': os.environ.get('GRIMOIRELAB_REDIS_HOST', '127.0.0.1'),
@@ -284,15 +285,11 @@
284285

285286
RQ_QUEUES = {
286287
GRIMOIRELAB_Q_EVENTIZER_JOBS: _RQ_DATABASE,
288+
GRIMOIRELAB_Q_ARCHIVIST_JOBS: _RQ_DATABASE,
287289
}
288290

289-
GRIMOIRELAB_EVENTS_STREAM_NAME = os.environ.get('GRIMOIRELAB_EVENTS_STREAM_NAME',
290-
'events')
291-
# Maximum events in Redis stream before dropping. Consumers must process events
292-
# faster than production to avoid loss. Default max size is 1M events (~2.5GB Git events).
293-
# Adjust for memory constraints.
294-
GRIMOIRELAB_EVENTS_STREAM_MAX_LENGTH = int(os.environ.get('GRIMOIRELAB_EVENTS_STREAM_MAX_LENGTH',
295-
1 * 10 ** 6))
291+
GRIMOIRELAB_EVENTS_STREAM_NAME = os.environ.get('GRIMOIRELAB_EVENTS_STREAM_NAME', 'events')
292+
GRIMOIRELAB_STREAM_MAX_LENGTH = int(os.environ.get('GRIMOIRELAB_STREAM_MAX_LENGTH', 2 * 10 ** 6))
296293

297294
RQ = {
298295
'JOB_CLASS': 'grimoirelab.core.scheduler.jobs.GrimoireLabJob',
@@ -309,3 +306,15 @@
309306
GRIMOIRELAB_JOB_TIMEOUT = int(os.environ.get('GRIMOIRELAB_JOB_TIMEOUT', -1))
310307

311308
GRIMOIRELAB_GIT_STORAGE_PATH = os.environ.get('GRIMOIRELAB_GIT_PATH', '~/.perceval')
309+
310+
#
311+
# Archivist configuration
312+
#
313+
GRIMOIRELAB_ARCHIVIST = {
314+
'WORKERS': int(os.environ.get('GRIMOIRELAB_ARCHIVIST_WORKERS', 10)),
315+
'STORAGE_TYPE': os.environ.get('GRIMOIRELAB_ARCHIVIST_STORAGE_TYPE', 'opensearch'),
316+
'STORAGE_URL': os.environ.get('GRIMOIRELAB_ARCHIVIST_STORAGE_URL', 'https://admin:admin@localhost:9200'),
317+
'STORAGE_INDEX': os.environ.get('GRIMOIRELAB_ARCHIVIST_STORAGE_INDEX', 'events'),
318+
'STORAGE_VERIFY_CERT': os.environ.get('GRIMOIRELAB_ARCHIVIST_STORAGE_VERIFY_CERT', 'False').lower() in ('true', '1'),
319+
'EVENTS_PER_JOB': int(os.environ.get('GRIMOIRELAB_ARCHIVIST_EVENTS_PER_JOB', 10000)),
320+
}

src/grimoirelab/core/runner/commands/run.py

Lines changed: 82 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
from __future__ import annotations
2020

2121
import os
22+
import sys
2223
import typing
2324

2425
import click
@@ -43,9 +44,13 @@ def run(ctx: Context):
4344
is_flag=True,
4445
default=False,
4546
help="Run the service in developer mode.")
47+
@click.option("--clear-tasks",
48+
is_flag=True,
49+
default=False,
50+
help="Clear background tasks.")
4651
@run.command()
4752
@click.pass_context
48-
def server(ctx: Context, devel: bool):
53+
def server(ctx: Context, devel: bool, clear_tasks: bool):
4954
"""Start the GrimoireLab core server.
5055
5156
GrimoireLab server allows to schedule tasks and fetch data from
@@ -56,6 +61,8 @@ def server(ctx: Context, devel: bool):
5661
should be run with a reverse proxy. If you activate the '--dev' flag,
5762
a HTTP server will be run instead.
5863
"""
64+
create_background_tasks(clear_tasks)
65+
5966
env = os.environ
6067

6168
env["UWSGI_ENV"] = f"DJANGO_SETTINGS_MODULE={ctx.obj['cfg']}"
@@ -91,24 +98,89 @@ def server(ctx: Context, devel: bool):
9198

9299

93100
@run.command()
101+
@click.argument('task-types', nargs=-1)
94102
@click.option('--workers',
95103
default=10,
96104
show_default=True,
97105
help="Number of workers to run in the pool.")
98-
def eventizers(workers: int):
99-
"""Start a pool of eventizer workers.
106+
def worker_pool(task_types: str, workers: int):
107+
"""Start a pool of workers that run specific tasks.
100108
101-
The workers on the pool will run tasks to fetch data from software
102-
development repositories. Data will be processed in form of events,
103-
and published in the events queue.
109+
If multiple tasks share the same queue, they will run in the same
110+
pool of workers. The tasks to run are defined as arguments to the
111+
command.
104112
105113
The number of workers running in the pool can be defined with the
106114
parameter '--workers'.
107-
108-
Workers get jobs from the GRIMOIRELAB_Q_EVENTIZER_JOBS queue defined
109-
in the configuration file.
110115
"""
116+
from grimoirelab.core.scheduler.models import (get_registered_task_model,
117+
get_all_registered_task_names)
118+
119+
available_tasks = get_all_registered_task_names()
120+
121+
queues = []
122+
for task in task_types:
123+
try:
124+
Task = get_registered_task_model(task)[0]
125+
except KeyError:
126+
click.echo(f"Task '{task}' is not a valid task. "
127+
f"Options: {available_tasks}", err=True)
128+
sys.exit(1)
129+
queues.append(Task().default_job_queue)
130+
131+
if not queues:
132+
click.echo(f"You must define at least one valid task. "
133+
f"Options: {available_tasks}", err=True)
134+
sys.exit(1)
135+
111136
django.core.management.call_command(
112-
'rqworker-pool', settings.GRIMOIRELAB_Q_EVENTIZER_JOBS,
137+
'rqworker-pool', queues,
113138
num_workers=workers
114139
)
140+
141+
142+
def create_background_tasks(clear_tasks: bool):
143+
"""
144+
Create background tasks before starting the server.
145+
:param clear_tasks: clear tasks before creating new ones.
146+
:return:
147+
"""
148+
from grimoirelab.core.scheduler.scheduler import schedule_task
149+
from grimoirelab.core.scheduler.tasks.models import StorageTask
150+
151+
workers = settings.GRIMOIRELAB_ARCHIVIST['WORKERS']
152+
storage_url = settings.GRIMOIRELAB_ARCHIVIST['STORAGE_URL']
153+
storage_db_name = settings.GRIMOIRELAB_ARCHIVIST['STORAGE_INDEX']
154+
storage_type = settings.GRIMOIRELAB_ARCHIVIST['STORAGE_TYPE']
155+
verify_certs = settings.GRIMOIRELAB_ARCHIVIST['STORAGE_VERIFY_CERT']
156+
events_per_job = settings.GRIMOIRELAB_ARCHIVIST['EVENTS_PER_JOB']
157+
158+
if clear_tasks:
159+
StorageTask.objects.all().delete()
160+
click.echo("Removing old background tasks.")
161+
162+
current = StorageTask.objects.filter(burst=False).count()
163+
if workers == current:
164+
click.echo("Background tasks already created. Skipping.")
165+
return
166+
167+
task_args = {
168+
'storage_url': storage_url,
169+
'storage_db_name': storage_db_name,
170+
'storage_verify_certs': verify_certs,
171+
'redis_group': 'archivist',
172+
'limit': events_per_job
173+
}
174+
if workers > current:
175+
for _ in range(workers - current):
176+
schedule_task(
177+
task_type=StorageTask.TASK_TYPE,
178+
storage_type=storage_type,
179+
task_args=task_args,
180+
job_interval=1,
181+
job_max_retries=10
182+
)
183+
click.echo(f"Created {workers} background tasks.")
184+
elif workers < current:
185+
tasks = StorageTask.objects.all()[workers:]
186+
tasks.update(burst=True)

src/grimoirelab/core/scheduler/models.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -357,3 +357,11 @@ def get_all_registered_task_models() -> Iterator[type[Task], type[Job]]:
357357
job classes.
358358
"""
359359
return iter(GRIMOIRELAB_TASK_MODELS.values())
360+
361+
362+
def get_all_registered_task_names() -> list[str]:
363+
"""Return all registered task names.
364+
365+
:returns: a list with all registered task names.
366+
"""
367+
return list(GRIMOIRELAB_TASK_MODELS.keys())

0 commit comments

Comments (0)