Skip to content

Commit c4ef818

Browse files
authored
Merge branch 'redis-down-wait' of 'https://github.com/jjmerchante/grimoirelab-core'
Merges #110 Closes #110 Related chaoss/grimoirelab#740
2 parents cfee9d6 + 78224cc commit c4ef818

File tree

2 files changed

+75
-5
lines changed

2 files changed

+75
-5
lines changed

src/grimoirelab/core/consumers/consumer.py

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import json
2222
import logging
23+
import time
2324
import typing
2425

2526
from collections import namedtuple
@@ -37,6 +38,9 @@
3738
RECOVER_IDLE_TIME = 300000 # 5 minutes (in ms)
3839
STREAM_BLOCK_TIMEOUT = 60000 # 1 minute (in ms)
3940

41+
EXPONENTIAL_BACKOFF_FACTOR = 2
42+
MAX_CONNECTION_WAIT_TIME = 60
43+
4044

4145
Entry = namedtuple("Entry", ["message_id", "event"])
4246

@@ -87,12 +91,23 @@ def start(self, burst: bool = False):
8791

8892
self._create_consumer_group()
8993

94+
connection_wait_time = 1
9095
while True:
91-
recovered_entries = self.recover_stream_entries()
92-
self.process_entries(recovered_entries, recovery=True)
93-
94-
new_entries = self.fetch_new_entries()
95-
self.process_entries(new_entries)
96+
try:
97+
recovered_entries = self.recover_stream_entries()
98+
self.process_entries(recovered_entries, recovery=True)
99+
100+
new_entries = self.fetch_new_entries()
101+
self.process_entries(new_entries)
102+
except redis.exceptions.ConnectionError as conn_err:
103+
self.logger.error(
104+
f"Could not connect to Redis instance: {conn_err} Retrying in {connection_wait_time} seconds..."
105+
)
106+
time.sleep(connection_wait_time)
107+
connection_wait_time *= EXPONENTIAL_BACKOFF_FACTOR
108+
connection_wait_time = min(connection_wait_time, MAX_CONNECTION_WAIT_TIME)
109+
else:
110+
connection_wait_time = 1
96111

97112
if burst or self._stop_event.is_set():
98113
break

tests/unit/consumers/test_consumer.py

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,10 @@
2020
import multiprocessing
2121
import time
2222

23+
from unittest.mock import MagicMock, patch
24+
25+
import redis
26+
2327
from grimoirelab.core.consumers.consumer import Consumer, Entry
2428

2529
from ..base import GrimoireLabTestCase
@@ -231,3 +235,54 @@ def test_different_consumer_groups(self):
231235
for entry, expected_entry in zip(consumer_2.entries, expected_entries):
232236
self.assertEqual(entry.message_id.decode(), expected_entry.message_id)
233237
self.assertDictEqual(entry.event, expected_entry.event)
238+
239+
def test_consumer_exponential_backoff(self):
240+
"""Test whether the consumer implements exponential backoff on Redis connection errors"""
241+
242+
# Create a mock consumer instance
243+
consumer = SampleConsumer(
244+
connection=self.conn,
245+
stream_name="test_stream",
246+
consumer_group="test_group",
247+
consumer_name="test_consumer",
248+
stream_block_timeout=1000,
249+
logging_level="DEBUG",
250+
)
251+
252+
# Setup the mock for redis client to raise ConnectionError
253+
consumer.recover_stream_entries = MagicMock(
254+
side_effect=[
255+
redis.exceptions.ConnectionError("fail 1"),
256+
redis.exceptions.ConnectionError("fail 2"),
257+
redis.exceptions.ConnectionError("fail 3"),
258+
[],
259+
]
260+
) # Succeeds on 4th call
261+
262+
consumer.fetch_new_entries = MagicMock(return_value=[])
263+
consumer.process_entries = MagicMock()
264+
consumer.logger = MagicMock()
265+
266+
# Mock time.sleep to avoid real waiting
267+
with patch("time.sleep") as sleep_mock:
268+
try:
269+
consumer.start(burst=False)
270+
except StopIteration:
271+
# the recover_stream_entries mock will eventually fail
272+
pass
273+
274+
# Check that sleep was called with exponential backoff times
275+
sleep_mock.assert_any_call(1)
276+
sleep_mock.assert_any_call(2)
277+
sleep_mock.assert_any_call(4)
278+
279+
# Check that the logger recorded the connection errors
280+
consumer.logger.error.assert_any_call(
281+
"Could not connect to Redis instance: fail 1 Retrying in 1 seconds..."
282+
)
283+
consumer.logger.error.assert_any_call(
284+
"Could not connect to Redis instance: fail 2 Retrying in 2 seconds..."
285+
)
286+
consumer.logger.error.assert_any_call(
287+
"Could not connect to Redis instance: fail 3 Retrying in 4 seconds..."
288+
)

0 commit comments

Comments
 (0)