@@ -69,19 +69,16 @@ def create(self):
6969 "Creating topic with {self._current_partitions} partitions "
7070 "and {replication_factor=}"
7171 )
72- self .client ().create_topic (
73- TopicSpec (
74- name = self .topic_name ,
75- partition_count = self ._current_partitions ,
76- replication_factor = replication_factor ,
77- )
72+ self .rpk .create_topic (
73+ topic = self .topic_name ,
74+ partitions = self ._current_partitions ,
75+ replicas = replication_factor ,
7876 )
79- self .client ().create_topic (
80- TopicSpec (
81- name = self .cloud_topic_name ,
82- partition_count = self ._current_partitions ,
83- replication_factor = replication_factor ,
84- )
77+ self .rpk .create_topic (
78+ topic = self .cloud_topic_name ,
79+ partitions = self ._current_partitions ,
80+ replicas = replication_factor ,
81+ config = {TopicSpec .PROPERTY_CLOUD_TOPIC_ENABLE : "true" },
8582 )
8683
8784 def delete (self ):
@@ -207,18 +204,19 @@ def test_cloud_topic_recreation_while_producing(self):
207204 Test that we are able to recreate topic multiple times
208205 """
209206 self ._client = DefaultClient (self .redpanda )
207+ rpk = RpkTool (self .redpanda )
210208
211209 # scaling parameters
212210 partition_count = 30
213211 producer_count = 10
214212
215- spec = TopicSpec (
216- partition_count = partition_count ,
217- replication_factor = 3 ,
218- cloud_topics_enabled = True ,
219- )
213+ topic = topic_name ()
220214
221- self .client ().create_topic (spec )
215+ rpk .create_topic (
216+ topic = topic ,
217+ partitions = partition_count ,
218+ config = {TopicSpec .PROPERTY_CLOUD_TOPIC_ENABLE : "true" },
219+ )
222220
223221 producer_properties = {
224222 "acks" : - 1 ,
@@ -228,32 +226,34 @@ def test_cloud_topic_recreation_while_producing(self):
228226 swarm = ProducerSwarm (
229227 self .test_context ,
230228 self .redpanda ,
231- spec . name ,
229+ topic ,
232230 producer_count ,
233231 10000000000 ,
234232 log_level = "ERROR" ,
235233 properties = producer_properties ,
236234 )
237235 swarm .start ()
238236
239- rpk = RpkTool (self .redpanda )
240-
241237 def topic_is_healthy ():
242238 if not swarm .is_alive ():
243239 swarm .stop ()
244240 swarm .start ()
245- partitions = rpk .describe_topic (spec . name )
241+ partitions = rpk .describe_topic (topic )
246242 hw_offsets = [p .high_watermark for p in partitions ]
247243 offsets_present = [hw > 0 for hw in hw_offsets ]
248244 self .logger .debug (f"High watermark offsets: { hw_offsets } " )
249245 return len (offsets_present ) == partition_count and all (offsets_present )
250246
251247 for i in range (1 , 20 ):
252248 rf = 3 if i % 2 == 0 else 1
253- self .client ().delete_topic (spec .name )
254- spec .replication_factor = rf
255- self .client ().create_topic (spec )
256- wait_until (topic_is_healthy , 30 , 2 , err_msg = f"Topic { spec .name } health" )
249+ self .client ().delete_topic (topic )
250+ rpk .create_topic (
251+ topic = topic ,
252+ partitions = partition_count ,
253+ replicas = rf ,
254+ config = {TopicSpec .PROPERTY_CLOUD_TOPIC_ENABLE : "true" },
255+ )
256+ wait_until (topic_is_healthy , 30 , 2 , err_msg = f"Topic { topic } health" )
257257 sleep (5 )
258258
259259 swarm .stop ()
0 commit comments