Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 20 additions & 5 deletions src/grimoirelab/core/consumers/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import json
import logging
import time
import typing

from collections import namedtuple
Expand All @@ -37,6 +38,9 @@
RECOVER_IDLE_TIME = 300000 # 5 minutes (in ms)
STREAM_BLOCK_TIMEOUT = 60000 # 1 minute (in ms)

# Reconnection backoff used by the consumer loop when Redis is unreachable:
# the wait before the next attempt is multiplied by this factor after
# every failed attempt.
EXPONENTIAL_BACKOFF_FACTOR = 2
# Upper bound for the wait between reconnection attempts (in seconds)
MAX_CONNECTION_WAIT_TIME = 60


Entry = namedtuple("Entry", ["message_id", "event"])

Expand Down Expand Up @@ -87,12 +91,23 @@ def start(self, burst: bool = False):

self._create_consumer_group()

connection_wait_time = 1
while True:
recovered_entries = self.recover_stream_entries()
self.process_entries(recovered_entries, recovery=True)

new_entries = self.fetch_new_entries()
self.process_entries(new_entries)
try:
recovered_entries = self.recover_stream_entries()
self.process_entries(recovered_entries, recovery=True)

new_entries = self.fetch_new_entries()
self.process_entries(new_entries)
except redis.exceptions.ConnectionError as conn_err:
self.logger.error(
f"Could not connect to Redis instance: {conn_err} Retrying in {connection_wait_time} seconds..."
)
time.sleep(connection_wait_time)
connection_wait_time *= EXPONENTIAL_BACKOFF_FACTOR
connection_wait_time = min(connection_wait_time, MAX_CONNECTION_WAIT_TIME)
else:
connection_wait_time = 1

if burst or self._stop_event.is_set():
break
Expand Down
55 changes: 55 additions & 0 deletions tests/unit/consumers/test_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@
import multiprocessing
import time

from unittest.mock import MagicMock, patch

import redis

from grimoirelab.core.consumers.consumer import Consumer, Entry

from ..base import GrimoireLabTestCase
Expand Down Expand Up @@ -231,3 +235,54 @@ def test_different_consumer_groups(self):
for entry, expected_entry in zip(consumer_2.entries, expected_entries):
self.assertEqual(entry.message_id.decode(), expected_entry.message_id)
self.assertDictEqual(entry.event, expected_entry.event)

def test_consumer_exponential_backoff(self):
    """Test whether the consumer implements exponential backoff on Redis connection errors"""

    # Real consumer instance; only the methods that would talk to the
    # stream are replaced with mocks below.
    consumer = SampleConsumer(
        connection=self.conn,
        stream_name="test_stream",
        consumer_group="test_group",
        consumer_name="test_consumer",
        stream_block_timeout=1000,
        logging_level="DEBUG",
    )

    # The recovery step fails three times with a connection error and
    # succeeds on the 4th call. Once the side effects are exhausted the
    # mock raises StopIteration, which is what ends the otherwise
    # endless non-burst loop.
    consumer.recover_stream_entries = MagicMock(
        side_effect=[
            redis.exceptions.ConnectionError("fail 1"),
            redis.exceptions.ConnectionError("fail 2"),
            redis.exceptions.ConnectionError("fail 3"),
            [],
        ]
    )

    consumer.fetch_new_entries = MagicMock(return_value=[])
    consumer.process_entries = MagicMock()
    consumer.logger = MagicMock()

    # Mock time.sleep to avoid real waiting
    with patch("time.sleep") as sleep_mock:
        try:
            consumer.start(burst=False)
        except StopIteration:
            # The recover_stream_entries mock ran out of side effects
            pass

    # Check the exact sequence of waits: order and count matter for
    # exponential backoff, which independent assert_any_call checks
    # would not verify.
    waits = [args for args, _ in sleep_mock.call_args_list]
    self.assertListEqual(waits, [(1,), (2,), (4,)])

    # Check that the consumer resumed normal processing once the
    # connection was back
    consumer.process_entries.assert_any_call([], recovery=True)

    # Check that the logger recorded the connection errors
    consumer.logger.error.assert_any_call(
        "Could not connect to Redis instance: fail 1 Retrying in 1 seconds..."
    )
    consumer.logger.error.assert_any_call(
        "Could not connect to Redis instance: fail 2 Retrying in 2 seconds..."
    )
    consumer.logger.error.assert_any_call(
        "Could not connect to Redis instance: fail 3 Retrying in 4 seconds..."
    )