Skip to content

Commit 0c25953

Browse files
authored
Merge pull request #28155 from rockwotj/CORE-13745
[CORE-13745] rptest: add cloud topics to topic_creation_test.py
2 parents d8002c9 + c1ff5a7 commit 0c25953

File tree

2 files changed

+82
-7
lines changed

2 files changed

+82
-7
lines changed

src/v/cloud_topics/level_one/metastore/topic_purger.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,8 @@ topic_purger::purge_tombstoned_topics(ss::abort_source* as) {
9494
ss::max_concurrent_for_each(
9595
ntrs_to_purge,
9696
max_concurrent_purges,
97-
[this, &first_error](const cluster::nt_revision& ntr) {
97+
[this, as, &first_error](const cluster::nt_revision& ntr) {
98+
as->check();
9899
return remove_tombstone_(ntr).then(
99100
[&first_error](
100101
const topic_purger::remove_tombstone_ret_t& ret) {

tests/rptest/tests/topic_creation_test.py

Lines changed: 80 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
from rptest.services.redpanda import (
3131
ResourceSettings,
3232
SISettings,
33+
CLOUD_TOPICS_CONFIG_STR,
3334
)
3435
from rptest.services.rpk_producer import RpkProducer
3536
from rptest.tests.cluster_config_test import wait_for_version_sync
@@ -54,10 +55,12 @@ def __init__(self, test_context):
5455
),
5556
extra_rp_conf={
5657
"iceberg_enabled": True, # to create relevant STMs
58+
CLOUD_TOPICS_CONFIG_STR: True,
5759
},
5860
)
5961
self.rpk = RpkTool(self.redpanda)
6062
self.topic_name = topic_name()
63+
self.cloud_topic_name = topic_name()
6164

6265
def create(self):
6366
self._current_partitions = random.randint(1, 4)
@@ -66,17 +69,22 @@ def create(self):
6669
"Creating topic with {self._current_partitions} partitions "
6770
"and {replication_factor=}"
6871
)
69-
self.client().create_topic(
70-
TopicSpec(
71-
name=self.topic_name,
72-
partition_count=self._current_partitions,
73-
replication_factor=replication_factor,
74-
)
72+
self.rpk.create_topic(
73+
topic=self.topic_name,
74+
partitions=self._current_partitions,
75+
replicas=replication_factor,
76+
)
77+
self.rpk.create_topic(
78+
topic=self.cloud_topic_name,
79+
partitions=self._current_partitions,
80+
replicas=replication_factor,
81+
config={TopicSpec.PROPERTY_CLOUD_TOPIC_ENABLE: "true"},
7582
)
7683

7784
def delete(self):
7885
self.logger.info("Deleting topic")
7986
self.client().delete_topic(self.topic_name)
87+
self.client().delete_topic(self.cloud_topic_name)
8088

8189
def add_partitions(self):
8290
partitions_to_add = random.randint(1, 4)
@@ -85,6 +93,7 @@ def add_partitions(self):
8593
f"to {self._current_partitions} existing"
8694
)
8795
self.rpk.add_partitions(self.topic_name, partitions_to_add)
96+
self.rpk.add_partitions(self.cloud_topic_name, partitions_to_add)
8897
self._current_partitions += partitions_to_add
8998

9099
@cluster(num_nodes=3)
@@ -114,9 +123,13 @@ def __init__(self, test_context):
114123
test_context=test_context,
115124
num_brokers=5,
116125
resource_settings=ResourceSettings(num_cpus=1),
126+
si_settings=SISettings(
127+
test_context=test_context, skip_end_of_test_scrubbing=True
128+
),
117129
extra_rp_conf={
118130
"auto_create_topics_enabled": False,
119131
"max_compacted_log_segment_size": 5 * (2 << 20),
132+
CLOUD_TOPICS_CONFIG_STR: True,
120133
},
121134
)
122135

@@ -185,6 +198,67 @@ def topic_is_healthy():
185198
swarm.stop()
186199
swarm.wait()
187200

201+
@cluster(num_nodes=6)
202+
def test_cloud_topic_recreation_while_producing(self):
203+
"""
204+
Test that we are able to recreate topic multiple times
205+
"""
206+
self._client = DefaultClient(self.redpanda)
207+
rpk = RpkTool(self.redpanda)
208+
209+
# scaling parameters
210+
partition_count = 30
211+
producer_count = 10
212+
213+
topic = topic_name()
214+
215+
rpk.create_topic(
216+
topic=topic,
217+
partitions=partition_count,
218+
config={TopicSpec.PROPERTY_CLOUD_TOPIC_ENABLE: "true"},
219+
)
220+
221+
producer_properties = {
222+
"acks": -1,
223+
"enable.idempotence": True,
224+
}
225+
226+
swarm = ProducerSwarm(
227+
self.test_context,
228+
self.redpanda,
229+
topic,
230+
producer_count,
231+
10000000000,
232+
log_level="ERROR",
233+
properties=producer_properties,
234+
)
235+
swarm.start()
236+
237+
def topic_is_healthy():
238+
if not swarm.is_alive():
239+
swarm.stop()
240+
swarm.start()
241+
partitions = rpk.describe_topic(topic)
242+
hw_offsets = [p.high_watermark for p in partitions]
243+
offsets_present = [hw > 0 for hw in hw_offsets]
244+
self.logger.debug(f"High watermark offsets: {hw_offsets}")
245+
return len(offsets_present) == partition_count and all(offsets_present)
246+
247+
for i in range(1, 20):
248+
rf = 3 if i % 2 == 0 else 1
249+
self.client().delete_topic(topic)
250+
rpk.create_topic(
251+
topic=topic,
252+
partitions=partition_count,
253+
replicas=rf,
254+
config={TopicSpec.PROPERTY_CLOUD_TOPIC_ENABLE: "true"},
255+
)
256+
wait_until(topic_is_healthy, 30, 2, err_msg=f"Topic {topic} health")
257+
sleep(5)
258+
259+
swarm.stop()
260+
swarm.wait()
261+
188262

189263
class TopicAutocreateTest(RedpandaTest):
190264
"""

0 commit comments

Comments
 (0)