
Commit c9e415a

rptest: Fix timequery test
The test uses a constant timestamp to generate records and query them, but that timestamp is too old, so the retention policy starts removing data while the test is running. This commit makes the test use a proper base timestamp.

Signed-off-by: Evgeny Lazin <[email protected]>
1 parent 1032ce4 commit c9e415a
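The gist of the change: the old hardcoded base timestamp (1664453149000, i.e. late September 2022) only drifts further into the past, so records stamped with it eventually fall outside the topic's retention window and get garbage-collected while the test is still running. Deriving the base from the current clock keeps the stamped records inside retention. A minimal standalone sketch of the two approaches (illustrative Python, not part of the commit):

    import time
    from datetime import datetime, timezone

    # Old approach: a constant epoch timestamp in milliseconds. It was recent
    # when written, but it ages: once (now - base_ts) exceeds the retention
    # window, the retention policy may delete the records the test queries.
    OLD_BASE_TS_MS = 1664453149000
    print(datetime.fromtimestamp(OLD_BASE_TS_MS / 1000, tz=timezone.utc))
    # -> 2022-09-29 12:05:49+00:00, a fixed point that only gets older

    # New approach (what the commit does): anchor the base 600 seconds before
    # "now", so the fake record timestamps always sit ~10 minutes in the past.
    base_ts_ms = int(time.time() - 600) * 1000
    assert time.time() * 1000 - base_ts_ms < 601_000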


tests/rptest/tests/timequery_test.py

Lines changed: 18 additions & 22 deletions
@@ -48,8 +48,10 @@ class BaseTimeQuery:
     test_context: dict
     logger: Logger
 
+    base_ts = int(time.time() - 600) * 1000
+
     def _create_and_produce(
-        self, cluster, cloud_storage, local_retention, base_ts, record_size, msg_count
+        self, cluster, cloud_storage, local_retention, record_size, msg_count
     ):
         topic = TopicSpec(name="tqtopic", partition_count=1, replication_factor=3)
         self.client().create_topic(topic)
@@ -95,20 +97,19 @@ def _create_and_produce(
             # but small enough that we are getting plenty of distinct batches.
             batch_max_bytes=record_size * 10,
             # A totally arbitrary artificial timestamp base in milliseconds
-            fake_timestamp_ms=base_ts,
+            fake_timestamp_ms=self.base_ts,
         )
         producer.start()
         producer.wait()
 
         # We know timestamps, they are generated linearly from the
         # base we provided to kgo-verifier
-        timestamps = dict((i, base_ts + i) for i in range(0, msg_count))
+        timestamps = dict((i, self.base_ts + i) for i in range(0, msg_count))
         return topic, timestamps
 
     def _test_timequery(self, cluster, cloud_storage: bool, batch_cache: bool):
         total_segments = 12
         record_size = 1024
-        base_ts = 1664453149000
         msg_count = (self.log_segment_size * total_segments) // record_size
         local_retention = self.log_segment_size * 4
         kcat = KafkaCat(cluster)
@@ -118,13 +119,13 @@ def _test_timequery(self, cluster, cloud_storage: bool, batch_cache: bool):
             name="tq_empty_topic", partition_count=1, replication_factor=3
         )
         self.client().create_topic(empty_topic)
-        offset = kcat.query_offset(empty_topic.name, 0, base_ts)
+        offset = kcat.query_offset(empty_topic.name, 0, self.base_ts)
         self.logger.info(f"Time query returned offset {offset}")
         assert offset == -1, f"Expected -1, got {offset}"
 
         # Create a topic and produce a run of messages we will query.
         topic, timestamps = self._create_and_produce(
-            cluster, cloud_storage, local_retention, base_ts, record_size, msg_count
+            cluster, cloud_storage, local_retention, record_size, msg_count
         )
         for k, v in timestamps.items():
             self.logger.debug(f" Offset {k} -> Timestamp {v}")
@@ -270,11 +271,10 @@ def _test_timequery_below_start_offset(self, cluster):
         total_segments = 3
         local_retain_segments = 1
         record_size = 1024
-        base_ts = 1664453149000
         msg_count = (self.log_segment_size * total_segments) // record_size
 
         topic, timestamps = self._create_and_produce(
-            cluster, False, local_retain_segments, base_ts, record_size, msg_count
+            cluster, False, local_retain_segments, record_size, msg_count
         )
 
         self.client().alter_topic_config(
@@ -295,7 +295,7 @@ def start_offset():
 
         kcat = KafkaCat(cluster)
         lwm_before = start_offset()
-        offset = kcat.query_offset(topic.name, 0, base_ts)
+        offset = kcat.query_offset(topic.name, 0, self.base_ts)
         lwm_after = start_offset()
 
         # When querying before the start of the log, we should get
@@ -416,11 +416,10 @@ def test_timequery_with_local_gc(self):
         self.set_up_cluster(cloud_storage=True, batch_cache=False, spillover=False)
         local_retention = self.log_segment_size * 4
         record_size = 1024
-        base_ts = 1664453149000
         msg_count = (self.log_segment_size * total_segments) // record_size
 
         topic, timestamps = self._create_and_produce(
-            self.redpanda, True, local_retention, base_ts, record_size, msg_count
+            self.redpanda, True, local_retention, record_size, msg_count
         )
 
         # While waiting for local GC to occur, run several concurrent
@@ -494,11 +493,10 @@ def test_timequery_with_trim_prefix(self, cloud_storage: bool, spillover: bool):
         )
         total_segments = 12
         record_size = 1024
-        base_ts = 1664453149000
         msg_count = (self.log_segment_size * total_segments) // record_size
         local_retention = self.log_segment_size * 4
         topic, timestamps = self._create_and_produce(
-            self.redpanda, True, local_retention, base_ts, record_size, msg_count
+            self.redpanda, True, local_retention, record_size, msg_count
         )
 
         # Confirm messages written
@@ -554,12 +552,11 @@ def test_timequery_with_spillover_gc_delayed(self):
         self.set_up_cluster(cloud_storage=True, batch_cache=False, spillover=True)
         total_segments = 16
         record_size = 1024
-        base_ts = 1664453149000
         msg_count = (self.log_segment_size * total_segments) // record_size
         local_retention = self.log_segment_size * 4
         topic_retention = self.log_segment_size * 8
         topic, timestamps = self._create_and_produce(
-            self.redpanda, True, local_retention, base_ts, record_size, msg_count
+            self.redpanda, True, local_retention, record_size, msg_count
         )
 
         # Confirm messages written
@@ -655,11 +652,10 @@ def test_timequery_empty_local_log(self):
 
         total_segments = 3
         record_size = 1024
-        base_ts = 1664453149000
         msg_count = (self.log_segment_size * total_segments) // record_size
         local_retention = 1  # Any value works for this test.
         topic, timestamps = self._create_and_produce(
-            self.redpanda, True, local_retention, base_ts, record_size, msg_count
+            self.redpanda, True, local_retention, record_size, msg_count
         )
 
         # Confirm messages written
@@ -770,6 +766,7 @@ class TestReadReplicaTimeQuery(RedpandaTest):
 
     log_segment_size = 1024 * 1024
     topic_name = "panda-topic"
+    base_ts = int(time.time() - 600) * 1000
 
     def __init__(self, test_context):
         super(TestReadReplicaTimeQuery, self).__init__(
@@ -812,7 +809,7 @@ def create_read_replica_topic(self) -> None:
                 return False
         return True
 
-    def setup_clusters(self, base_ts, num_messages=0, partition_count=1) -> None:
+    def setup_clusters(self, num_messages=0, partition_count=1) -> None:
         spec = TopicSpec(
             name=self.topic_name,
             partition_count=partition_count,
@@ -829,7 +826,7 @@ def setup_clusters(self, num_messages=0, partition_count=1) -> None:
             msg_size=1024,
             msg_count=num_messages,
             batch_max_bytes=10240,
-            fake_timestamp_ms=base_ts,
+            fake_timestamp_ms=self.base_ts,
         )
 
         producer.start()
@@ -868,9 +865,8 @@ def query_timestamp(self, ts, kcat_src, kcat_rr):
 
     @ducktape_cluster(num_nodes=7)
     def test_timequery(self):
-        base_ts = 1664453149000
         num_messages = 1000
-        self.setup_clusters(base_ts, num_messages, 3)
+        self.setup_clusters(num_messages, 3)
 
         def read_replica_ready():
             orig = RpkTool(self.redpanda).describe_topic(self.topic_name)
@@ -889,4 +885,4 @@ def read_replica_ready():
         kcat1 = KafkaCat(self.redpanda)
         kcat2 = KafkaCat(self.rr_cluster)
         for ix in range(0, num_messages, 20):
-            self.query_timestamp(base_ts + ix, kcat1, kcat2)
+            self.query_timestamp(self.base_ts + ix, kcat1, kcat2)
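For context on what these queries exercise: a Kafka time query (a ListOffsets request with a timestamp) returns the earliest offset whose record timestamp is greater than or equal to the queried timestamp, and -1 when no such record exists; that is why the empty-topic case above expects -1. A rough stand-in for kcat.query_offset using the kafka-python client (bootstrap address and topic are placeholders, not from the commit):

    from kafka import KafkaConsumer, TopicPartition

    def query_offset(bootstrap: str, topic: str, partition: int, ts_ms: int) -> int:
        # ListOffsets by timestamp: first offset with timestamp >= ts_ms,
        # or None when every record in the partition is older than ts_ms.
        consumer = KafkaConsumer(bootstrap_servers=bootstrap)
        tp = TopicPartition(topic, partition)
        result = consumer.offsets_for_times({tp: ts_ms})[tp]
        consumer.close()
        return result.offset if result is not None else -1

    # e.g. query_offset("localhost:9092", "tqtopic", 0, int(time.time() - 600) * 1000)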
