Skip to content

Commit 56f34dd

Browse files
committed
Consumer Redis connection backoff
Implement exponential backoff in consumers for Redis connection failures. This allows consumers to retry connecting to Redis with increasing delays when the connection fails.

Signed-off-by: Jose Javier Merchante <[email protected]>
1 parent 5ad697f commit 56f34dd

File tree

2 files changed

+75
-5
lines changed

2 files changed

+75
-5
lines changed

src/grimoirelab/core/consumers/consumer.py

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import json
2222
import logging
23+
import time
2324
import typing
2425

2526
from collections import namedtuple
@@ -37,6 +38,9 @@
3738
RECOVER_IDLE_TIME = 300000 # 5 minutes (in ms)
3839
STREAM_BLOCK_TIMEOUT = 60000 # 1 minute (in ms)
3940

41+
EXPONENTIAL_BACKOFF_FACTOR = 2
42+
MAX_CONNECTION_WAIT_TIME = 60
43+
4044

4145
Entry = namedtuple("Entry", ["message_id", "event"])
4246

@@ -87,12 +91,23 @@ def start(self, burst: bool = False):
8791

8892
self._create_consumer_group()
8993

94+
connection_wait_time = 1
9095
while True:
91-
recovered_entries = self.recover_stream_entries()
92-
self.process_entries(recovered_entries, recovery=True)
93-
94-
new_entries = self.fetch_new_entries()
95-
self.process_entries(new_entries)
96+
try:
97+
recovered_entries = self.recover_stream_entries()
98+
self.process_entries(recovered_entries, recovery=True)
99+
100+
new_entries = self.fetch_new_entries()
101+
self.process_entries(new_entries)
102+
except redis.exceptions.ConnectionError as conn_err:
103+
self.logger.error(
104+
f"Could not connect to Redis instance: {conn_err} Retrying in {connection_wait_time} seconds..."
105+
)
106+
time.sleep(connection_wait_time)
107+
connection_wait_time *= EXPONENTIAL_BACKOFF_FACTOR
108+
connection_wait_time = min(connection_wait_time, MAX_CONNECTION_WAIT_TIME)
109+
else:
110+
connection_wait_time = 1
96111

97112
if burst or self._stop_event.is_set():
98113
break

tests/unit/consumers/test_consumer.py

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,10 @@
2020
import multiprocessing
2121
import time
2222

23+
from unittest.mock import MagicMock, patch
24+
25+
import redis
26+
2327
from grimoirelab.core.consumers.consumer import Consumer, Entry
2428

2529
from ..base import GrimoireLabTestCase
@@ -231,3 +235,54 @@ def test_different_consumer_groups(self):
231235
for entry, expected_entry in zip(consumer_2.entries, expected_entries):
232236
self.assertEqual(entry.message_id.decode(), expected_entry.message_id)
233237
self.assertDictEqual(entry.event, expected_entry.event)
238+
239+
def test_consumer_exponential_backoff(self):
240+
"""Test whether the consumer implements exponential backoff on Redis connection errors"""
241+
242+
# Create a real consumer instance; its Redis-facing methods are replaced with mocks below
243+
consumer = SampleConsumer(
244+
connection=self.conn,
245+
stream_name="test_stream",
246+
consumer_group="test_group",
247+
consumer_name="test_consumer",
248+
stream_block_timeout=1000,
249+
logging_level="DEBUG",
250+
)
251+
252+
# Make recover_stream_entries raise ConnectionError three times before returning an empty list
253+
consumer.recover_stream_entries = MagicMock(
254+
side_effect=[
255+
redis.exceptions.ConnectionError("fail 1"),
256+
redis.exceptions.ConnectionError("fail 2"),
257+
redis.exceptions.ConnectionError("fail 3"),
258+
[],
259+
]
260+
) # Succeeds on 4th call
261+
262+
consumer.fetch_new_entries = MagicMock(return_value=[])
263+
consumer.process_entries = MagicMock()
264+
consumer.logger = MagicMock()
265+
266+
# Mock time.sleep to avoid real waiting
267+
with patch("time.sleep") as sleep_mock:
268+
try:
269+
consumer.start(burst=False)
270+
except StopIteration:
271+
# Expected: once its side_effect list is exhausted, the recover_stream_entries mock raises StopIteration, which ends the burst=False loop
272+
pass
273+
274+
# Check that sleep was called with exponential backoff times
275+
sleep_mock.assert_any_call(1)
276+
sleep_mock.assert_any_call(2)
277+
sleep_mock.assert_any_call(4)
278+
279+
# Check that the logger recorded the connection errors
280+
consumer.logger.error.assert_any_call(
281+
"Could not connect to Redis instance: fail 1 Retrying in 1 seconds..."
282+
)
283+
consumer.logger.error.assert_any_call(
284+
"Could not connect to Redis instance: fail 2 Retrying in 2 seconds..."
285+
)
286+
consumer.logger.error.assert_any_call(
287+
"Could not connect to Redis instance: fail 3 Retrying in 4 seconds..."
288+
)

0 commit comments

Comments
 (0)