Skip to content

Commit 30d39cf

Browse files
Merge pull request #28471 from michael-redpanda/dr/allow-sr-writes-post-failover
cl: Permit writes to failed-over _schemas topic
2 parents 47e4d59 + 36b6473 commit 30d39cf

File tree

2 files changed

+56
-17
lines changed

2 files changed

+56
-17
lines changed

src/v/cluster/cluster_link/frontend.cc

Lines changed: 26 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,20 @@ errc map_errc(std::error_code ec) {
7575
}
7676
return errc::rpc_error;
7777
}
78+
79+
bool is_topic_mutable(::cluster_link::model::mirror_topic_status status) {
80+
switch (status) {
81+
case ::cluster_link::model::mirror_topic_status::active:
82+
case ::cluster_link::model::mirror_topic_status::failed:
83+
case ::cluster_link::model::mirror_topic_status::paused:
84+
case ::cluster_link::model::mirror_topic_status::failing_over:
85+
case ::cluster_link::model::mirror_topic_status::promoting:
86+
return false;
87+
case ::cluster_link::model::mirror_topic_status::failed_over:
88+
case ::cluster_link::model::mirror_topic_status::promoted:
89+
return true;
90+
}
91+
}
7892
} // namespace
7993

8094
frontend::frontend(
@@ -211,17 +225,7 @@ bool frontend::is_topic_mutable_for_kafka_api(const model::topic& topic) const {
211225
// topic does not belong to any cluster link
212226
return true;
213227
}
214-
switch (*status) {
215-
case ::cluster_link::model::mirror_topic_status::active:
216-
case ::cluster_link::model::mirror_topic_status::failed:
217-
case ::cluster_link::model::mirror_topic_status::paused:
218-
case ::cluster_link::model::mirror_topic_status::failing_over:
219-
case ::cluster_link::model::mirror_topic_status::promoting:
220-
return false;
221-
case ::cluster_link::model::mirror_topic_status::failed_over:
222-
case ::cluster_link::model::mirror_topic_status::promoted:
223-
return true;
224-
}
228+
return is_topic_mutable(*status);
225229
}
226230

227231
std::optional<chunked_hash_map<
@@ -310,8 +314,16 @@ bool frontend::schema_registry_shadowing_active() const {
310314
if (!md.has_value()) {
311315
return false;
312316
}
313-
// If mirror_schema_registry_topic option is set, then shadowing for
314-
// SR is active
317+
// Check to see if the schema registry topic is in the mirror topic list
318+
const auto& mirror_topics = md->get().state.mirror_topics;
319+
auto topic_it = mirror_topics.find(
320+
::model::schema_registry_internal_tp.topic);
321+
if (topic_it != mirror_topics.end()) {
322+
// If it is, return whether or not it is mutable based on its status
323+
return !is_topic_mutable(topic_it->second.status);
324+
}
325+
// If mirror_schema_registry_topic option is set and the topic is not
326+
// yet in the mirror topic list, then shadowing for SR is active
315327
const auto& sr_cfg = md->get().configuration.schema_registry_sync_cfg;
316328
if (
317329
sr_cfg.sync_schema_registry_topic_mode.has_value()
@@ -322,10 +334,7 @@ bool frontend::schema_registry_shadowing_active() const {
322334
return true;
323335
}
324336

325-
// If the schema registry topic is in the mirror_topics list, then
326-
// disable writes
327-
return md->get().state.mirror_topics.contains(
328-
::model::schema_registry_internal_tp.topic);
337+
return false;
329338
});
330339
}
331340

tests/rptest/tests/cluster_linking_topic_syncing_test.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -682,6 +682,13 @@ class ClusterLinkingSchemaRegistry(ShadowLinkTestBase):
682682
double d = 1;
683683
}"""
684684

685+
simple_c_proto_def = """
686+
syntax = "proto3";
687+
688+
message CType {
689+
string id = 1;
690+
}"""
691+
685692
def __init__(self, test_context, *args, **kwargs):
686693
super().__init__(
687694
test_context=test_context,
@@ -856,6 +863,29 @@ def subjects_match():
856863
err_msg="Subjects do not match",
857864
)
858865

866+
# Now fail over the Schemas topic and verify that we can now write to it
867+
self.logger.info("Failing over the _schemas topic")
868+
self.failover_link_topic(link_name="test-link", topic="_schemas")
869+
870+
def topic_has_failed_over(link_name: str, topic: str) -> bool:
871+
shadow_topic = self.get_shadow_topic(
872+
shadow_link_name=link_name, shadow_topic_name=topic
873+
)
874+
self.logger.debug(f"shadow_topic: {shadow_topic}")
875+
return (
876+
shadow_topic.status.state
877+
== shadow_link_pb2.SHADOW_TOPIC_STATE_FAILED_OVER
878+
)
879+
880+
wait_until(
881+
lambda: topic_has_failed_over("test-link", "_schemas"),
882+
timeout_sec=30,
883+
backoff_sec=1,
884+
err_msg="_schemas topic did not failover",
885+
)
886+
887+
self.post_schema_to_subject(target_sr_client, "fourth", self.simple_c_proto_def)
888+
859889
@cluster(
860890
num_nodes=6,
861891
log_allow_list=[

0 commit comments

Comments
 (0)