Skip to content

Commit d9c3e88

Browse files
committed
Store events in a Redis stream instead of a list
This commit updates the method of storing events in Redis for later consumption. Events are now added to a Redis Stream, allowing different types of consumers to process the items for various purposes. The trade-off is that the stream has a fixed length, causing older items to be deleted when new items are added. Signed-off-by: Jose Javier Merchante <[email protected]>
1 parent d6d2117 commit d9c3e88

File tree

6 files changed

+39
-202
lines changed

6 files changed

+39
-202
lines changed

devel/grimoirelab-dev.py

Lines changed: 0 additions & 180 deletions
This file was deleted.

src/grimoirelab/core/config/settings.py

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -273,9 +273,7 @@
273273
# https://github.com/rq/django-rq
274274
#
275275

276-
Q_PERCEVAL_JOBS = os.environ.get('GRIMOIRELAB_Q_PERCEVAL_JOBS', 'default')
277-
Q_STORAGE_ITEMS = os.environ.get('GRIMOIRELAB_Q_STORAGE_ITEMS', 'items')
278-
Q_EVENTS = os.environ.get('GRIMOIRELAB_Q_EVENTS', 'events')
276+
GRIMOIRELAB_Q_EVENTIZER_JOBS = os.environ.get('GRIMOIRELAB_Q_EVENTIZER_JOBS', 'default')
279277

280278
_RQ_DATABASE = {
281279
'HOST': os.environ.get('GRIMOIRELAB_REDIS_HOST', '127.0.0.1'),
@@ -285,11 +283,18 @@
285283
}
286284

287285
RQ_QUEUES = {
288-
Q_PERCEVAL_JOBS: _RQ_DATABASE,
289-
Q_STORAGE_ITEMS: _RQ_DATABASE,
290-
Q_EVENTS: _RQ_DATABASE,
286+
GRIMOIRELAB_Q_EVENTIZER_JOBS: _RQ_DATABASE,
291287
}
292288

289+
GRIMOIRELAB_EVENTS_STREAM_NAME = os.environ.get('GRIMOIRELAB_EVENTS_STREAM_NAME',
290+
'events')
291+
292+
# Maximum events in Redis stream before dropping. Consumers must process events
293+
# faster than production to avoid loss. Default max size is 1M events (~2.5GB Git events).
294+
# Adjust for memory constraints.
295+
GRIMOIRELAB_EVENTS_STREAM_MAX_LENGTH = int(os.environ.get('GRIMOIRELAB_EVENTS_STREAM_MAX_LENGTH',
296+
1 * 10 ** 6))
297+
293298
RQ = {
294299
'JOB_CLASS': 'grimoirelab.core.scheduler.jobs.GrimoireLabJob',
295300
'WORKER_CLASS': 'grimoirelab.core.scheduler.worker.GrimoireLabWorker',
@@ -304,4 +309,4 @@
304309
GRIMOIRELAB_JOB_RESULT_TTL = int(os.environ.get('GRIMOIRELAB_JOB_RESULT_TTL', 300))
305310
GRIMOIRELAB_JOB_TIMEOUT = int(os.environ.get('GRIMOIRELAB_JOB_TIMEOUT', -1))
306311

307-
GIT_STORAGE_PATH = os.environ.get('GRIMOIRELAB_GIT_PATH', '~/.perceval')
312+
GRIMOIRELAB_GIT_STORAGE_PATH = os.environ.get('GRIMOIRELAB_GIT_PATH', '~/.perceval')

src/grimoirelab/core/runner/commands/run.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,6 @@
2727

2828
from django.conf import settings
2929

30-
from grimoirelab.core.scheduler.scheduler import maintain_tasks
31-
3230
if typing.TYPE_CHECKING:
3331
from click import Context
3432

@@ -83,6 +81,8 @@ def server(ctx: Context, devel: bool):
8381
env["UWSGI_SINGLE_INTERPRETER"] = "true"
8482

8583
# Run maintenance tasks
84+
from grimoirelab.core.scheduler.scheduler import maintain_tasks
85+
8686
_ = django.core.wsgi.get_wsgi_application()
8787
maintain_tasks()
8888

@@ -105,10 +105,10 @@ def eventizers(workers: int):
105105
The number of workers running in the pool can be defined with the
106106
parameter '--workers'.
107107
108-
Workers get jobs from the Q_PERCEVAL_JOBS queue defined in the
109-
configuration file.
108+
Workers get jobs from the GRIMOIRELAB_Q_EVENTIZER_JOBS queue defined
109+
in the configuration file.
110110
"""
111111
django.core.management.call_command(
112-
'rqworker-pool', settings.Q_PERCEVAL_JOBS,
112+
'rqworker-pool', settings.GRIMOIRELAB_Q_EVENTIZER_JOBS,
113113
num_workers=workers
114114
)

src/grimoirelab/core/scheduler/tasks/chronicler.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,8 @@
4343
def chronicler_job(
4444
datasource_type: str,
4545
datasource_category: str,
46-
events_queue: str,
46+
events_stream: str,
47+
stream_max_length: int,
4748
job_args: dict[str, Any] = None
4849
) -> ChroniclerProgress:
4950
"""Fetch and eventize data.
@@ -61,6 +62,8 @@ def chronicler_job(
6162
(e.g., 'git', 'github')
6263
:param datasource_category: category of the datasource
6364
(e.g., 'pull_request', 'issue')
65+
:param events_stream: Redis queue where the events will be published
66+
:param stream_max_length: maximum length of the stream
6467
:param job_args: extra arguments to pass to the job
6568
(e.g., 'url', 'owner', 'repository')
6669
"""
@@ -95,7 +98,11 @@ def chronicler_job(
9598
perceval_gen.items)
9699
for event in events:
97100
data = cloudevents.conversion.to_json(event)
98-
rq_job.connection.rpush(events_queue, data)
101+
message = {
102+
'data': data
103+
}
104+
rq_job.connection.xadd(events_stream, message,
105+
maxlen=stream_max_length)
99106
finally:
100107
progress.summary = perceval_gen.summary
101108

@@ -254,7 +261,7 @@ def initial_args(task_args: dict[str, Any]) -> dict[str, Any]:
254261
from django.conf import settings
255262

256263
# For the first run, make some arguments mandatory
257-
base_path = os.path.expanduser(settings.GIT_STORAGE_PATH)
264+
base_path = os.path.expanduser(settings.GRIMOIRELAB_GIT_STORAGE_PATH)
258265
uri = task_args['uri']
259266
processed_uri = uri.lstrip('/')
260267
git_path = os.path.join(base_path, processed_uri) + '-git'

src/grimoirelab/core/scheduler/tasks/models.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import typing
2222

23+
from django.conf import settings
2324
from django.db.models import CharField
2425

2526
from ...scheduler.models import (
@@ -117,7 +118,8 @@ def prepare_job_parameters(self):
117118
task_args = {
118119
'datasource_type': self.datasource_type,
119120
'datasource_category': self.datasource_category,
120-
'events_queue': 'events'
121+
'events_stream': settings.GRIMOIRELAB_EVENTS_STREAM_NAME,
122+
'stream_max_length': settings.GRIMOIRELAB_EVENTS_STREAM_MAX_LENGTH,
121123
}
122124

123125
args_gen = get_chronicler_argument_generator(self.datasource_type)
@@ -146,7 +148,7 @@ def can_be_retried(self):
146148

147149
@property
148150
def default_job_queue(self):
149-
return 'default'
151+
return settings.GRIMOIRELAB_Q_EVENTIZER_JOBS
150152

151153
@staticmethod
152154
def job_function(*args, **kwargs):

tests/scheduler/test_task_eventizer.py

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,8 @@ def test_job(self):
5353
job_args = {
5454
'datasource_type': 'git',
5555
'datasource_category': 'commit',
56-
'events_queue': 'events',
56+
'events_stream': 'events',
57+
'stream_max_length': 500,
5758
'job_args': {
5859
'uri': 'http://example.com/',
5960
'gitpath': os.path.join(self.dir, 'data/git_log.txt')
@@ -91,8 +92,8 @@ def test_job(self):
9192
'ce8e0b86a1e9877f42fe9453ede418519115f367')
9293

9394
# Check generated events
94-
events = self.conn.lrange('events', 0, -1)
95-
events = [json.loads(e) for e in events]
95+
events = self.conn.xread({'events': b'0-0'}, count=None, block=0)
96+
events = [json.loads(e[1][b'data']) for e in events[0][1]]
9697

9798
expected = [
9899
('2d85a883e0ef63ebf7fa40e372aed44834092592', 'org.grimoirelab.events.git.merge'),
@@ -130,7 +131,8 @@ def test_job_no_result(self):
130131
job_args = {
131132
'datasource_type': 'git',
132133
'datasource_category': 'commit',
133-
'events_queue': 'events',
134+
'events_stream': 'events',
135+
'stream_max_length': 500,
134136
'job_args': {
135137
'uri': 'http://example.com/',
136138
'gitpath': os.path.join(self.dir, 'data/git_log_empty.txt')
@@ -172,7 +174,8 @@ def test_backend_not_found(self):
172174
job_args = {
173175
'datasource_type': 'nobackend',
174176
'datasource_category': 'unknown',
175-
'events_queue': 'events',
177+
'events_stream': 'events',
178+
'stream_max_length': 500,
176179
'job_args': {
177180
'uri': 'http://example.com/',
178181
'gitpath': os.path.join(self.dir, 'data/git_log_empty.txt')

0 commit comments

Comments
 (0)