Skip to content

Commit 236fd93

Browse files
authored
Merge pull request #28428 from Lazin/fix/timequery-test
[CORE-14434] rptest: Fix timequery test
2 parents d1b9c44 + 033ca19 commit 236fd93

File tree

1 file changed

+18
-22
lines changed

1 file changed

+18
-22
lines changed

tests/rptest/tests/timequery_test.py

Lines changed: 18 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,10 @@ class BaseTimeQuery:
4848
test_context: dict
4949
logger: Logger
5050

51+
base_ts = int(time.time() - 600) * 1000
52+
5153
def _create_and_produce(
52-
self, cluster, cloud_storage, local_retention, base_ts, record_size, msg_count
54+
self, cluster, cloud_storage, local_retention, record_size, msg_count
5355
):
5456
topic = TopicSpec(name="tqtopic", partition_count=1, replication_factor=3)
5557
self.client().create_topic(topic)
@@ -95,20 +97,19 @@ def _create_and_produce(
9597
# but small enough that we are getting plenty of distinct batches.
9698
batch_max_bytes=record_size * 10,
9799
# A totally arbitrary artificial timestamp base in milliseconds
98-
fake_timestamp_ms=base_ts,
100+
fake_timestamp_ms=self.base_ts,
99101
)
100102
producer.start()
101103
producer.wait()
102104

103105
# We know timestamps, they are generated linearly from the
104106
# base we provided to kgo-verifier
105-
timestamps = dict((i, base_ts + i) for i in range(0, msg_count))
107+
timestamps = dict((i, self.base_ts + i) for i in range(0, msg_count))
106108
return topic, timestamps
107109

108110
def _test_timequery(self, cluster, cloud_storage: bool, batch_cache: bool):
109111
total_segments = 12
110112
record_size = 1024
111-
base_ts = 1664453149000
112113
msg_count = (self.log_segment_size * total_segments) // record_size
113114
local_retention = self.log_segment_size * 4
114115
kcat = KafkaCat(cluster)
@@ -118,13 +119,13 @@ def _test_timequery(self, cluster, cloud_storage: bool, batch_cache: bool):
118119
name="tq_empty_topic", partition_count=1, replication_factor=3
119120
)
120121
self.client().create_topic(empty_topic)
121-
offset = kcat.query_offset(empty_topic.name, 0, base_ts)
122+
offset = kcat.query_offset(empty_topic.name, 0, self.base_ts)
122123
self.logger.info(f"Time query returned offset {offset}")
123124
assert offset == -1, f"Expected -1, got {offset}"
124125

125126
# Create a topic and produce a run of messages we will query.
126127
topic, timestamps = self._create_and_produce(
127-
cluster, cloud_storage, local_retention, base_ts, record_size, msg_count
128+
cluster, cloud_storage, local_retention, record_size, msg_count
128129
)
129130
for k, v in timestamps.items():
130131
self.logger.debug(f" Offset {k} -> Timestamp {v}")
@@ -270,11 +271,10 @@ def _test_timequery_below_start_offset(self, cluster):
270271
total_segments = 3
271272
local_retain_segments = 1
272273
record_size = 1024
273-
base_ts = 1664453149000
274274
msg_count = (self.log_segment_size * total_segments) // record_size
275275

276276
topic, timestamps = self._create_and_produce(
277-
cluster, False, local_retain_segments, base_ts, record_size, msg_count
277+
cluster, False, local_retain_segments, record_size, msg_count
278278
)
279279

280280
self.client().alter_topic_config(
@@ -295,7 +295,7 @@ def start_offset():
295295

296296
kcat = KafkaCat(cluster)
297297
lwm_before = start_offset()
298-
offset = kcat.query_offset(topic.name, 0, base_ts)
298+
offset = kcat.query_offset(topic.name, 0, self.base_ts)
299299
lwm_after = start_offset()
300300

301301
# When querying before the start of the log, we should get
@@ -416,11 +416,10 @@ def test_timequery_with_local_gc(self):
416416
self.set_up_cluster(cloud_storage=True, batch_cache=False, spillover=False)
417417
local_retention = self.log_segment_size * 4
418418
record_size = 1024
419-
base_ts = 1664453149000
420419
msg_count = (self.log_segment_size * total_segments) // record_size
421420

422421
topic, timestamps = self._create_and_produce(
423-
self.redpanda, True, local_retention, base_ts, record_size, msg_count
422+
self.redpanda, True, local_retention, record_size, msg_count
424423
)
425424

426425
# While waiting for local GC to occur, run several concurrent
@@ -494,11 +493,10 @@ def test_timequery_with_trim_prefix(self, cloud_storage: bool, spillover: bool):
494493
)
495494
total_segments = 12
496495
record_size = 1024
497-
base_ts = 1664453149000
498496
msg_count = (self.log_segment_size * total_segments) // record_size
499497
local_retention = self.log_segment_size * 4
500498
topic, timestamps = self._create_and_produce(
501-
self.redpanda, True, local_retention, base_ts, record_size, msg_count
499+
self.redpanda, True, local_retention, record_size, msg_count
502500
)
503501

504502
# Confirm messages written
@@ -554,12 +552,11 @@ def test_timequery_with_spillover_gc_delayed(self):
554552
self.set_up_cluster(cloud_storage=True, batch_cache=False, spillover=True)
555553
total_segments = 16
556554
record_size = 1024
557-
base_ts = 1664453149000
558555
msg_count = (self.log_segment_size * total_segments) // record_size
559556
local_retention = self.log_segment_size * 4
560557
topic_retention = self.log_segment_size * 8
561558
topic, timestamps = self._create_and_produce(
562-
self.redpanda, True, local_retention, base_ts, record_size, msg_count
559+
self.redpanda, True, local_retention, record_size, msg_count
563560
)
564561

565562
# Confirm messages written
@@ -655,11 +652,10 @@ def test_timequery_empty_local_log(self):
655652

656653
total_segments = 3
657654
record_size = 1024
658-
base_ts = 1664453149000
659655
msg_count = (self.log_segment_size * total_segments) // record_size
660656
local_retention = 1 # Any value works for this test.
661657
topic, timestamps = self._create_and_produce(
662-
self.redpanda, True, local_retention, base_ts, record_size, msg_count
658+
self.redpanda, True, local_retention, record_size, msg_count
663659
)
664660

665661
# Confirm messages written
@@ -770,6 +766,7 @@ class TestReadReplicaTimeQuery(RedpandaTest):
770766

771767
log_segment_size = 1024 * 1024
772768
topic_name = "panda-topic"
769+
base_ts = int(time.time() - 600) * 1000
773770

774771
def __init__(self, test_context):
775772
super(TestReadReplicaTimeQuery, self).__init__(
@@ -812,7 +809,7 @@ def create_read_replica_topic(self) -> None:
812809
return False
813810
return True
814811

815-
def setup_clusters(self, base_ts, num_messages=0, partition_count=1) -> None:
812+
def setup_clusters(self, num_messages=0, partition_count=1) -> None:
816813
spec = TopicSpec(
817814
name=self.topic_name,
818815
partition_count=partition_count,
@@ -829,7 +826,7 @@ def setup_clusters(self, base_ts, num_messages=0, partition_count=1) -> None:
829826
msg_size=1024,
830827
msg_count=num_messages,
831828
batch_max_bytes=10240,
832-
fake_timestamp_ms=base_ts,
829+
fake_timestamp_ms=self.base_ts,
833830
)
834831

835832
producer.start()
@@ -868,9 +865,8 @@ def query_timestamp(self, ts, kcat_src, kcat_rr):
868865

869866
@ducktape_cluster(num_nodes=7)
870867
def test_timequery(self):
871-
base_ts = 1664453149000
872868
num_messages = 1000
873-
self.setup_clusters(base_ts, num_messages, 3)
869+
self.setup_clusters(num_messages, 3)
874870

875871
def read_replica_ready():
876872
orig = RpkTool(self.redpanda).describe_topic(self.topic_name)
@@ -889,4 +885,4 @@ def read_replica_ready():
889885
kcat1 = KafkaCat(self.redpanda)
890886
kcat2 = KafkaCat(self.rr_cluster)
891887
for ix in range(0, num_messages, 20):
892-
self.query_timestamp(base_ts + ix, kcat1, kcat2)
888+
self.query_timestamp(self.base_ts + ix, kcat1, kcat2)

0 commit comments

Comments
 (0)