Skip to content

Commit 7cb5cee

Browse files
committed
Store events in a Redis stream instead of a list
This commit updates the method of storing events in Redis for later consumption. Events are now added to a Redis Stream, allowing different types of consumers to process the items for various purposes. The trade-off is that the stream has a fixed length, causing older items to be deleted when new items are added. Signed-off-by: Jose Javier Merchante <[email protected]>
1 parent 34f4c36 commit 7cb5cee

File tree

10 files changed

+1020
-336
lines changed

10 files changed

+1020
-336
lines changed

poetry.lock

Lines changed: 359 additions & 316 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ perceval = {version = "^1.0.2", allow-prereleases = true}
5252
grimoirelab-chronicler = {git = "https://github.com/grimoirelab/grimoirelab-chronicler.git", allow-prereleases = true}
5353
django-cors-headers = "^4.6.0"
5454
djangorestframework = "^3.15.2"
55+
opensearch-py = "^2.8.0"
5556

5657
[tool.poetry.group.dev.dependencies]
5758
fakeredis = "^2.0.0"

src/grimoirelab/core/config/settings.py

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -274,8 +274,7 @@
274274
#
275275

276276
Q_PERCEVAL_JOBS = os.environ.get('GRIMOIRELAB_Q_PERCEVAL_JOBS', 'default')
277-
Q_STORAGE_ITEMS = os.environ.get('GRIMOIRELAB_Q_STORAGE_ITEMS', 'items')
278-
Q_EVENTS = os.environ.get('GRIMOIRELAB_Q_EVENTS', 'events')
277+
Q_STORAGE_JOBS = os.environ.get('GRIMOIRELAB_Q_STORAGE_JOBS', 'storage_jobs')
279278

280279
_RQ_DATABASE = {
281280
'HOST': os.environ.get('GRIMOIRELAB_REDIS_HOST', '127.0.0.1'),
@@ -286,10 +285,12 @@
286285

287286
RQ_QUEUES = {
288287
Q_PERCEVAL_JOBS: _RQ_DATABASE,
289-
Q_STORAGE_ITEMS: _RQ_DATABASE,
290-
Q_EVENTS: _RQ_DATABASE,
288+
Q_STORAGE_JOBS: _RQ_DATABASE,
291289
}
292290

291+
EVENTS_STREAM = os.environ.get('GRIMOIRELAB_EVENTS_STREAM', 'events')
292+
STREAM_MAX_LENGTH = int(os.environ.get('GRIMOIRELAB_STREAM_MAX_LENGTH', 2 * 10 ** 6))
293+
293294
RQ = {
294295
'JOB_CLASS': 'grimoirelab.core.scheduler.jobs.GrimoireLabJob',
295296
'WORKER_CLASS': 'grimoirelab.core.scheduler.worker.GrimoireLabWorker',
@@ -305,3 +306,15 @@
305306
GRIMOIRELAB_JOB_TIMEOUT = int(os.environ.get('GRIMOIRELAB_JOB_TIMEOUT', -1))
306307

307308
GIT_STORAGE_PATH = os.environ.get('GRIMOIRELAB_GIT_PATH', '~/.perceval')
309+
310+
#
311+
# Archivist configuration
312+
#
313+
GRIMOIRELAB_ARCHIVIST = {
314+
'WORKERS': int(os.environ.get('GRIMOIRELAB_ARCHIVIST_WORKERS', 10)),
315+
'STORAGE_TYPE': os.environ.get('GRIMOIRELAB_ARCHIVIST_STORAGE_TYPE', 'opensearch'),
316+
'STORAGE_URL': os.environ.get('GRIMOIRELAB_ARCHIVIST_STORAGE_URL', 'https://admin:admin@localhost:9200'),
317+
'STORAGE_INDEX': os.environ.get('GRIMOIRELAB_ARCHIVIST_STORAGE_INDEX', 'events'),
318+
'STORAGE_VERIFY_CERT': os.environ.get('GRIMOIRELAB_ARCHIVIST_STORAGE_VERIFY_CERT', 'False').lower() in ('true', '1'),
319+
'EVENTS_PER_JOB': int(os.environ.get('GRIMOIRELAB_ARCHIVIST_EVENTS_PER_JOB', 10000)),
320+
}

src/grimoirelab/core/runner/commands/run.py

Lines changed: 84 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
from __future__ import annotations
2020

2121
import os
22+
import sys
2223
import typing
2324

2425
import click
@@ -40,9 +41,13 @@ def run(ctx: Context):
4041
is_flag=True,
4142
default=False,
4243
help="Run the service in developer mode.")
44+
@click.option("--clear-tasks",
45+
is_flag=True,
46+
default=False,
47+
help="Clear background tasks.")
4348
@run.command()
4449
@click.pass_context
45-
def server(ctx: Context, devel: bool):
50+
def server(ctx: Context, devel: bool, clear_tasks: bool):
4651
"""Start the GrimoireLab core server.
4752
4853
GrimoireLab server allows to schedule tasks and fetch data from
@@ -53,6 +58,9 @@ def server(ctx: Context, devel: bool):
5358
should be run with a reverse proxy. If you activate the '--dev' flag,
5459
a HTTP server will be run instead.
5560
"""
61+
62+
create_background_tasks(clear_tasks)
63+
5664
env = os.environ
5765

5866
env["UWSGI_ENV"] = f"DJANGO_SETTINGS_MODULE={ctx.obj['cfg']}"
@@ -84,26 +92,91 @@ def server(ctx: Context, devel: bool):
8492

8593

8694
@run.command()
@click.argument('task-types', nargs=-1)
@click.option('--workers',
              default=10,
              show_default=True,
              help="Number of workers to run in the pool.")
def worker_pool(task_types: tuple[str, ...], workers: int):
    """Start a pool of workers that run specific tasks.

    If multiple tasks share the same queue, they will run in the same
    pool of workers. The tasks to run are defined as arguments to the
    command.

    The number of workers running in the pool can be defined with the
    parameter '--workers'.
    """
    # NOTE: the docstring above is kept free of parameter markup on
    # purpose; click shows it verbatim as the command help text.
    #
    # 'task_types' is a variadic click argument (nargs=-1), so it is
    # received as a tuple of task names, possibly empty.
    from grimoirelab.core.scheduler.models import (get_registered_task_model,
                                                   get_all_registered_task_names)

    available_tasks = get_all_registered_task_names()

    queues = []
    for task in task_types:
        try:
            # The first element of the registered entry is used as the
            # task model class (presumably a (task, job) pair; verify
            # against 'get_registered_task_model').
            Task = get_registered_task_model(task)[0]
        except KeyError:
            click.echo(f"Task '{task}' is not a valid task. "
                       f"Options: {available_tasks}", err=True)
            sys.exit(1)

        # Several tasks may share the same queue; list each queue only
        # once (keeping the order given on the command line) so the pool
        # does not listen to the same queue multiple times.
        queue = Task().default_job_queue
        if queue not in queues:
            queues.append(queue)

    if not queues:
        click.echo(f"You must define at least one valid task. "
                   f"Options: {available_tasks}", err=True)
        sys.exit(1)

    django.core.management.call_command(
        'rqworker-pool', queues,
        num_workers=workers
    )
134+
135+
136+
def create_background_tasks(clear_tasks: bool):
    """Create background tasks before starting the server.

    The number of storage tasks that should be running is read from
    the 'WORKERS' entry of the 'GRIMOIRELAB_ARCHIVIST' setting. If
    fewer non-burst storage tasks exist, the missing ones are
    scheduled. If more exist, the surplus ones are flagged as burst.

    :param clear_tasks: delete all the existing storage tasks before
        creating new ones.
    """
    # Imported here because Django settings and models can only be
    # loaded once the command is actually running.
    from django.conf import settings

    from grimoirelab.core.scheduler.scheduler import schedule_task
    from grimoirelab.core.scheduler.tasks.models import StorageTask

    archivist = settings.GRIMOIRELAB_ARCHIVIST
    workers = archivist['WORKERS']
    storage_url = archivist['STORAGE_URL']
    storage_db_name = archivist['STORAGE_INDEX']
    storage_type = archivist['STORAGE_TYPE']
    verify_certs = archivist['STORAGE_VERIFY_CERT']
    events_per_job = archivist['EVENTS_PER_JOB']

    if clear_tasks:
        StorageTask.objects.all().delete()
        click.echo("Removing old background tasks.")

    # Only tasks not flagged as burst count as active ones.
    active_tasks = StorageTask.objects.filter(burst=False)
    current = active_tasks.count()

    if workers == current:
        click.echo("Tasks already created. Skipping.")
        return

    if workers > current:
        task_args = {
            'storage_url': storage_url,
            'storage_db_name': storage_db_name,
            'storage_verify_certs': verify_certs,
            'redis_group': 'archivist',
            'limit': events_per_job
        }
        missing = workers - current
        for _ in range(missing):
            schedule_task(
                task_type=StorageTask.TASK_TYPE,
                storage_type=storage_type,
                task_args=task_args,
                job_interval=1,
                job_max_retries=10
            )
        # Report the tasks created now, not the configured total.
        click.echo(f"Created {missing} background tasks.")
    else:
        # Django refuses to call update() on a sliced QuerySet, so the
        # primary keys of the surplus tasks are fetched first and the
        # update runs on an unsliced query. The keys are materialised
        # in a list because some backends (e.g. MySQL) do not support
        # LIMIT inside an IN subquery. The explicit ordering makes the
        # selection of surplus tasks deterministic.
        surplus = list(
            active_tasks.order_by('pk').values_list('pk', flat=True)[workers:]
        )
        StorageTask.objects.filter(pk__in=surplus).update(burst=True)

src/grimoirelab/core/scheduler/models.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -357,3 +357,11 @@ def get_all_registered_task_models() -> Iterator[type[Task], type[Job]]:
357357
job classes.
358358
"""
359359
return iter(GRIMOIRELAB_TASK_MODELS.values())
360+
361+
362+
def get_all_registered_task_names() -> list[str]:
    """Return the names of all the registered tasks.

    :returns: a list with the keys of the task models registry.
    """
    # Iterating a mapping yields its keys, so this builds the same
    # list as calling 'keys()' explicitly.
    return list(GRIMOIRELAB_TASK_MODELS)

0 commit comments

Comments
 (0)