5454from rptest .services .serde_client import SerdeClient
5555from rptest .tests .pandaproxy_test import PandaProxyTLSProvider , User
5656from rptest .tests .redpanda_test import RedpandaTest
57- from rptest .util import expect_exception , inject_remote_script , search_logs_with_timeout
57+ from rptest .util import (
58+ expect_exception ,
59+ inject_remote_script ,
60+ search_logs_with_timeout ,
61+ wait_until_result ,
62+ )
5863from rptest .utils .log_utils import wait_until_nag_is_set
5964from rptest .utils .mode_checks import skip_fips_mode
6065
@@ -2978,6 +2983,7 @@ def bool_alpha(b: bool) -> str:
29782983 @parametrize (move_controller_leader = True )
29792984 def test_restarts (self , move_controller_leader : bool ):
29802985 admin = Admin (self .redpanda )
2986+ rpk = RpkTool (self .redpanda )
29812987
29822988 def check_connection (hostname : str ):
29832989 result_raw = self .sr_client .get_subjects (hostname = hostname )
@@ -2986,6 +2992,22 @@ def check_connection(hostname: str):
29862992 assert result_raw .status_code == requests .codes .ok
29872993 assert result_raw .json () == []
29882994
2995+ def get_leader_epoch ():
2996+ def rpk_get_leader_epoch ():
2997+ partitions = rpk .describe_topic ("_schemas" )
2998+ par_0 = next ((p for p in partitions if p .id == 0 ), None )
2999+ if not par_0 :
3000+ return (False , (- 1 ))
3001+ return (True , par_0 .leader_epoch )
3002+
3003+ leader_epoch = wait_until_result (
3004+ rpk_get_leader_epoch ,
3005+ timeout_sec = 10 ,
3006+ backoff_sec = 1 ,
3007+ err_msg = "Could not get leader info" ,
3008+ )
3009+ return leader_epoch
3010+
29893011 def restart_leader ():
29903012 leader = admin .get_partition_leader (
29913013 namespace = "kafka" , topic = "_schemas" , partition = 0
@@ -2995,11 +3017,25 @@ def restart_leader():
29953017 admin .partition_transfer_leadership (
29963018 namespace = "redpanda" , topic = "controller" , partition = 0
29973019 )
3020+ last_epoch = get_leader_epoch ()
29983021 self .logger .info (f"Restarting node: { leader } " )
29993022 self .redpanda .restart_nodes (self .redpanda .get_node (leader ))
3000- admin .await_stable_leader (topic = "_schemas" , partition = 0 , namespace = "kafka" )
30013023
3002- for _ in range (20 ):
3024+ def get_new_leader ():
3025+ new_epoch = get_leader_epoch ()
3026+ had_election = last_epoch < new_epoch
3027+ return (had_election , new_epoch )
3028+
3029+ new_epoch = wait_until_result (
3030+ get_new_leader ,
3031+ timeout_sec = 20 ,
3032+ backoff_sec = 1 ,
3033+ err_msg = "Leadership did not stabilize" ,
3034+ )
3035+ self .logger .info (f"Epoch: { new_epoch } " )
3036+
3037+ for i in range (20 ):
3038+ self .logger .info (f"Iteration { i } " )
30033039 for n in self .redpanda .nodes :
30043040 check_connection (n .account .hostname )
30053041 restart_leader ()
0 commit comments