Commit b1f500d

Archive Events in OpenSearch
This commit implements archiving of events in an OpenSearch backend. It introduces a new subscriber group that reads from the events stream and stores events in an OpenSearch instance. Additionally, the worker pool now supports execution on specific task queues, enabling different workers to process tasks based on their queue.

Signed-off-by: Jose Javier Merchante <[email protected]>
Parent: 2e6fd19
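As a companion to the commit message, here is a minimal, illustrative sketch of the flow it describes: a consumer group reading events from the Redis stream and bulk-indexing them into OpenSearch. This is not the commit's code; the stream name, group name, consumer name, and payload field are assumptions, and only the redis-py and opensearch-py calls are real APIs.

import json

import redis
from opensearchpy import OpenSearch, helpers

def archive_events(redis_url: str, stream: str, group: str,
                   os_url: str, index: str, limit: int = 10000) -> int:
    """Read events from a Redis stream consumer group and index them."""
    conn = redis.Redis.from_url(redis_url)
    client = OpenSearch(hosts=[os_url], verify_certs=False, ssl_show_warn=False)

    # Create the consumer group if it does not exist yet.
    try:
        conn.xgroup_create(stream, group, id='0', mkstream=True)
    except redis.exceptions.ResponseError:
        pass  # the group already exists

    total = 0
    while total < limit:
        # Fetch a batch of new events for this consumer.
        batch = conn.xreadgroup(group, 'archivist-1', {stream: '>'},
                                count=100, block=5000)
        if not batch:
            break
        _, messages = batch[0]
        # Bulk-index the payloads, then acknowledge the messages.
        helpers.bulk(client, ({'_index': index,
                               '_source': json.loads(payload[b'data'])}
                              for _, payload in messages))
        conn.xack(stream, group, *(msg_id for msg_id, _ in messages))
        total += len(messages)
    return total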

File tree

13 files changed: +983 −7 lines

config/settings/testing.py

Lines changed: 1 addition & 0 deletions
@@ -67,6 +67,7 @@ def __call__(self, _, strict):
         return self.conn


+RQ_QUEUES['default'] = _RQ_DATABASE  # noqa: F405
 RQ_QUEUES['testing'] = _RQ_DATABASE  # noqa: F405
 RQ['WORKER_CLASS'] = rq.worker.SimpleWorker

poetry.lock

Lines changed: 37 additions & 1 deletion
(Generated file; diff not rendered.)

pyproject.toml

Lines changed: 1 addition & 0 deletions
@@ -54,6 +54,7 @@ perceval = {version = "^1.0.2", allow-prereleases = true}
 grimoirelab-chronicler = {git = "https://github.com/chaoss/grimoirelab-chronicler.git", allow-prereleases = true}
 django-cors-headers = "^4.6.0"
 djangorestframework = "^3.15.2"
+opensearch-py = "^2.8.0"

 [tool.poetry.group.dev.dependencies]
 fakeredis = "^2.0.0"

src/grimoirelab/core/config/settings.py

Lines changed: 15 additions & 1 deletion
@@ -273,7 +273,8 @@
 # https://github.com/rq/django-rq
 #

-GRIMOIRELAB_Q_EVENTIZER_JOBS = os.environ.get('GRIMOIRELAB_Q_EVENTIZER_JOBS', 'default')
+GRIMOIRELAB_Q_EVENTIZER_JOBS = os.environ.get('GRIMOIRELAB_Q_EVENTIZER_JOBS', 'eventizer_jobs')
+GRIMOIRELAB_Q_ARCHIVIST_JOBS = os.environ.get('GRIMOIRELAB_Q_ARCHIVIST_JOBS', 'storage_jobs')

 _RQ_DATABASE = {
     'HOST': os.environ.get('GRIMOIRELAB_REDIS_HOST', '127.0.0.1'),
@@ -284,6 +285,7 @@

 RQ_QUEUES = {
     GRIMOIRELAB_Q_EVENTIZER_JOBS: _RQ_DATABASE,
+    GRIMOIRELAB_Q_ARCHIVIST_JOBS: _RQ_DATABASE,
 }

 GRIMOIRELAB_EVENTS_STREAM_NAME = os.environ.get('GRIMOIRELAB_EVENTS_STREAM_NAME',
@@ -309,3 +311,15 @@
 GRIMOIRELAB_JOB_TIMEOUT = int(os.environ.get('GRIMOIRELAB_JOB_TIMEOUT', -1))

 GRIMOIRELAB_GIT_STORAGE_PATH = os.environ.get('GRIMOIRELAB_GIT_PATH', '~/.perceval')
+
+#
+# Archivist configuration
+#
+GRIMOIRELAB_ARCHIVIST = {
+    'WORKERS': int(os.environ.get('GRIMOIRELAB_ARCHIVIST_WORKERS', 10)),
+    'STORAGE_TYPE': os.environ.get('GRIMOIRELAB_ARCHIVIST_STORAGE_TYPE', 'opensearch'),
+    'STORAGE_URL': os.environ.get('GRIMOIRELAB_ARCHIVIST_STORAGE_URL', 'https://admin:admin@localhost:9200'),
+    'STORAGE_INDEX': os.environ.get('GRIMOIRELAB_ARCHIVIST_STORAGE_INDEX', 'events'),
+    'STORAGE_VERIFY_CERT': os.environ.get('GRIMOIRELAB_ARCHIVIST_STORAGE_VERIFY_CERT', 'False').lower() in ('true', '1'),
+    'EVENTS_PER_JOB': int(os.environ.get('GRIMOIRELAB_ARCHIVIST_EVENTS_PER_JOB', 10000)),
+}
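A hedged sketch of how these settings could be consumed to open a storage connection. The index-creation call is illustrative, not taken from this diff; only the opensearch-py API is real.

from django.conf import settings
from opensearchpy import OpenSearch

config = settings.GRIMOIRELAB_ARCHIVIST
client = OpenSearch(
    hosts=[config['STORAGE_URL']],
    verify_certs=config['STORAGE_VERIFY_CERT'],
    ssl_show_warn=False,
)
# Create the events index if it is missing (HTTP 400 means it already exists).
client.indices.create(index=config['STORAGE_INDEX'], ignore=400)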

src/grimoirelab/core/runner/commands/run.py

Lines changed: 77 additions & 1 deletion
@@ -43,9 +43,13 @@ def run(ctx: Context):
               is_flag=True,
               default=False,
               help="Run the service in developer mode.")
+@click.option("--clear-tasks",
+              is_flag=True,
+              default=False,
+              help="Clear background tasks.")
 @run.command()
 @click.pass_context
-def server(ctx: Context, devel: bool):
+def server(ctx: Context, devel: bool, clear_tasks: bool):
     """Start the GrimoireLab core server.

     GrimoireLab server allows to schedule tasks and fetch data from
@@ -56,6 +60,8 @@ def server(ctx: Context, devel: bool):
     should be run with a reverse proxy. If you activate the '--dev' flag,
     a HTTP server will be run instead.
     """
+    create_background_tasks(clear_tasks)
+
     env = os.environ

     env["UWSGI_ENV"] = f"DJANGO_SETTINGS_MODULE={ctx.obj['cfg']}"
@@ -112,3 +118,73 @@ def eventizers(workers: int):
         'rqworker-pool', settings.GRIMOIRELAB_Q_EVENTIZER_JOBS,
         num_workers=workers
     )
+
+
+@run.command()
+@click.option('--workers',
+              default=30,
+              show_default=True,
+              help="Number of workers to run in the pool.")
+def archivists(workers: int):
+    """Start a pool of archivist workers.
+
+    The workers in the pool will run tasks that fetch events from Redis.
+    Data will be stored in the configured data source.
+
+    The number of workers running in the pool can be set with the
+    '--workers' parameter.
+
+    Workers take jobs from the GRIMOIRELAB_Q_ARCHIVIST_JOBS queue defined
+    in the configuration file.
+    """
+    django.core.management.call_command(
+        'rqworker-pool', settings.GRIMOIRELAB_Q_ARCHIVIST_JOBS,
+        num_workers=workers
+    )
+
+
+def create_background_tasks(clear_tasks: bool):
+    """Create background tasks before starting the server.
+
+    :param clear_tasks: clear tasks before creating new ones.
+    """
+    from grimoirelab.core.scheduler.scheduler import schedule_task
+    from grimoirelab.core.scheduler.tasks.models import StorageTask
+
+    workers = settings.GRIMOIRELAB_ARCHIVIST['WORKERS']
+    storage_url = settings.GRIMOIRELAB_ARCHIVIST['STORAGE_URL']
+    storage_db_name = settings.GRIMOIRELAB_ARCHIVIST['STORAGE_INDEX']
+    storage_type = settings.GRIMOIRELAB_ARCHIVIST['STORAGE_TYPE']
+    verify_certs = settings.GRIMOIRELAB_ARCHIVIST['STORAGE_VERIFY_CERT']
+    events_per_job = settings.GRIMOIRELAB_ARCHIVIST['EVENTS_PER_JOB']
+
+    if clear_tasks:
+        StorageTask.objects.all().delete()
+        click.echo("Removing old background tasks.")
+
+    current = StorageTask.objects.filter(burst=False).count()
+    if workers == current:
+        click.echo("Background tasks already created. Skipping.")
+        return
+
+    task_args = {
+        'storage_url': storage_url,
+        'storage_db_name': storage_db_name,
+        'storage_verify_certs': verify_certs,
+        'redis_group': 'archivist',
+        'limit': events_per_job
+    }
+    if workers > current:
+        for _ in range(workers - current):
+            schedule_task(
+                task_type=StorageTask.TASK_TYPE,
+                storage_type=storage_type,
+                task_args=task_args,
+                job_interval=1,
+                job_max_retries=10
+            )
+        click.echo(f"Created {workers} background tasks.")
+    elif workers < current:
+        # Update by primary key: Django cannot call update() on a
+        # sliced queryset.
+        surplus = list(StorageTask.objects.values_list('pk', flat=True)[workers:])
+        StorageTask.objects.filter(pk__in=surplus).update(burst=True)
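The scaling logic in create_background_tasks reduces to a small reconciliation rule: create tasks up to the desired worker count, or flag the surplus as 'burst' so each one stops after finishing its current job. A standalone, illustrative version of that rule (the names are mine, not the commit's):

def reconcile(desired: int, active_ids: list[int]) -> tuple[int, list[int]]:
    """Return (number of tasks to create, task ids to mark as burst)."""
    current = len(active_ids)
    if desired >= current:
        return desired - current, []
    return 0, active_ids[desired:]

print(reconcile(10, [1, 2, 3]))    # (7, []): create seven new tasks
print(reconcile(2, [1, 2, 3, 4]))  # (0, [3, 4]): wind down two tasks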

src/grimoirelab/core/scheduler/jobs.py

Lines changed: 1 addition & 1 deletion
@@ -46,7 +46,7 @@ class GrimoireLabJob(rq.job.Job):
     """

     # Default packages to log
-    PACKAGES_TO_LOG = [__name__, 'chronicler', 'perceval', 'rq']
+    PACKAGES_TO_LOG = [__name__, 'chronicler', 'archivist', 'perceval', 'rq']

     def __init__(self, *args, **kwargs) -> None:
         super().__init__(*args, **kwargs)

src/grimoirelab/core/scheduler/models.py

Lines changed: 8 additions & 0 deletions
@@ -357,3 +357,11 @@ def get_all_registered_task_models() -> Iterator[type[Task], type[Job]]:
     job classes.
     """
     return iter(GRIMOIRELAB_TASK_MODELS.values())
+
+
+def get_all_registered_task_names() -> list[str]:
+    """Return all registered task names.
+
+    :returns: a list with all registered task names.
+    """
+    return list(GRIMOIRELAB_TASK_MODELS.keys())
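A small, hypothetical use of the new helper, e.g. validating a task type supplied through the API; the validation function below is an assumption, not part of this commit.

from grimoirelab.core.scheduler.models import get_all_registered_task_names

def validate_task_type(name: str) -> None:
    """Raise ValueError if 'name' is not a registered task type."""
    if name not in get_all_registered_task_names():
        raise ValueError(f"unknown task type: {name}")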

src/grimoirelab/core/scheduler/scheduler.py

Lines changed: 2 additions & 2 deletions
@@ -100,7 +100,7 @@ def cancel_task(task_uuid: str) -> None:

     jobs = job_class.objects.filter(task=task).all()
     for job in jobs:
-        job_rq = rq.job.Job.fetch(job.uuid, connection=django_rq.get_connection())
+        job_rq = rq.job.Job.fetch(job.uuid, connection=django_rq.get_connection(task.default_job_queue))
         job_rq.delete()

     task.delete()
@@ -125,7 +125,7 @@ def maintain_tasks() -> None:
     job_db = task.jobs.order_by('scheduled_at').first()

     try:
-        rq.job.Job.fetch(job_db.uuid, connection=django_rq.get_connection())
+        rq.job.Job.fetch(job_db.uuid, connection=django_rq.get_connection(task.default_job_queue))
         continue
     except rq.exceptions.NoSuchJobError:
         logger.debug(
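Both call sites now resolve the Redis connection through the task's own queue instead of the default one. The point, sketched below, is that each entry in RQ_QUEUES may target a different Redis host or database, so a job must be fetched over the connection of the queue it was enqueued on. The helper is illustrative; django_rq.get_connection and rq.job.Job.fetch are the real APIs.

import django_rq
import rq.job

def fetch_job(job_uuid: str, queue_name: str) -> rq.job.Job:
    # Use the connection configured for this queue in RQ_QUEUES;
    # fetching over the wrong connection raises NoSuchJobError.
    connection = django_rq.get_connection(queue_name)
    return rq.job.Job.fetch(job_uuid, connection=connection)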
