diff --git a/proxy_components/engine_store_ffi/src/observer.rs b/proxy_components/engine_store_ffi/src/observer.rs index 21d701eb792..d972814538f 100644 --- a/proxy_components/engine_store_ffi/src/observer.rs +++ b/proxy_components/engine_store_ffi/src/observer.rs @@ -1,5 +1,5 @@ // Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0. -use std::sync::Arc; +use std::sync::{Arc, RwLock}; use encryption::DataKeyManager; use engine_traits::RaftEngine; @@ -32,12 +32,21 @@ const TIFLASH_OBSERVER_PRIORITY: u32 = 0; #[derive(Clone)] pub struct TiFlashObserver { - pub forwarder: ProxyForwarder, + pub forwarder: Arc>>>, +} + +impl Default for TiFlashObserver { + fn default() -> Self { + Self { + forwarder: Arc::new(RwLock::new(None)), + } + } } impl TiFlashObserver { #[allow(clippy::too_many_arguments)] - pub fn new( + pub fn init_forwarder( + &mut self, store_id: u64, engine: engine_tiflash::MixedModeEngine, raft_engine: ER, @@ -47,20 +56,19 @@ impl TiFlashObserver { packed_envs: PackedEnvs, debug_struct: DebugStruct, key_manager: Option>, - ) -> Self { - TiFlashObserver { - forwarder: ProxyForwarder::new( - store_id, - engine, - raft_engine, - sst_importer, - trans, - snap_mgr, - packed_envs, - debug_struct, - key_manager, - ), - } + ) { + let f = ProxyForwarder::new( + store_id, + engine, + raft_engine, + sst_importer, + trans, + snap_mgr, + packed_envs, + debug_struct, + key_manager, + ); + self.forwarder.write().expect("poisoned").replace(f); } pub fn register_to( @@ -105,7 +113,9 @@ impl TiFlashObserver { impl Coprocessor for TiFlashObserver { fn stop(&self) { - self.forwarder.stop(); + if let Some(ref forwarder) = *self.forwarder.read().expect("poisoned") { + forwarder.stop(); + } } } @@ -117,8 +127,11 @@ impl AdminObserver for TiFlashObserver bool { - self.forwarder - .pre_exec_admin(ob_ctx.region(), req, index, term) + if let Some(ref forwarder) = *self.forwarder.read().expect("poisoned") { + forwarder.pre_exec_admin(ob_ctx.region(), req, index, term) + } else { + false + } } fn post_exec_admin( @@ -129,19 +142,25 @@ impl AdminObserver for TiFlashObserver, ) -> bool { - self.forwarder.post_exec_admin( - ob_ctx.region(), - cmd, - apply_state, - region_state, - apply_ctx_info, - ) + if let Some(ref forwarder) = *self.forwarder.read().expect("poisoned") { + forwarder.post_exec_admin( + ob_ctx.region(), + cmd, + apply_state, + region_state, + apply_ctx_info, + ) + } else { + false + } } } impl QueryObserver for TiFlashObserver { fn on_empty_cmd(&self, ob_ctx: &mut ObserverContext<'_>, index: u64, term: u64) { - self.forwarder.on_empty_cmd(ob_ctx.region(), index, term) + if let Some(ref forwarder) = *self.forwarder.read().expect("poisoned") { + forwarder.on_empty_cmd(ob_ctx.region(), index, term) + } } fn post_exec_query( @@ -152,20 +171,25 @@ impl QueryObserver for TiFlashObserver, ) -> bool { - self.forwarder.post_exec_query( - ob_ctx.region(), - cmd, - apply_state, - region_state, - apply_ctx_info, - ) + if let Some(ref forwarder) = *self.forwarder.read().expect("poisoned") { + forwarder.post_exec_query( + ob_ctx.region(), + cmd, + apply_state, + region_state, + apply_ctx_info, + ) + } else { + false + } } } impl UpdateSafeTsObserver for TiFlashObserver { fn on_update_safe_ts(&self, region_id: u64, self_safe_ts: u64, leader_safe_ts: u64) { - self.forwarder - .on_update_safe_ts(region_id, self_safe_ts, leader_safe_ts) + if let Some(ref forwarder) = *self.forwarder.read().expect("poisoned") { + forwarder.on_update_safe_ts(region_id, self_safe_ts, leader_safe_ts) + } } } @@ -176,7 +200,9 @@ impl RegionChangeObserver for TiFlashObs e: RegionChangeEvent, r: StateRole, ) { - self.forwarder.on_region_changed(ob_ctx.region(), e, r) + if let Some(ref forwarder) = *self.forwarder.read().expect("poisoned") { + forwarder.on_region_changed(ob_ctx.region(), e, r) + } } #[allow(clippy::match_like_matches_macro)] @@ -186,24 +212,37 @@ impl RegionChangeObserver for TiFlashObs is_finished: bool, cmd: Option<&RaftCmdRequest>, ) -> bool { - self.forwarder - .pre_persist(ob_ctx.region(), is_finished, cmd) + if let Some(ref forwarder) = *self.forwarder.read().expect("poisoned") { + forwarder.pre_persist(ob_ctx.region(), is_finished, cmd) + } else { + true + } } fn pre_write_apply_state(&self, ob_ctx: &mut ObserverContext<'_>) -> bool { - self.forwarder.pre_write_apply_state(ob_ctx.region()) + if let Some(ref forwarder) = *self.forwarder.read().expect("poisoned") { + forwarder.pre_write_apply_state(ob_ctx.region()) + } else { + true + } } } impl RaftMessageObserver for TiFlashObserver { fn on_raft_message(&self, msg: &RaftMessage) -> bool { - self.forwarder.on_raft_message(msg) + if let Some(ref forwarder) = *self.forwarder.read().expect("poisoned") { + forwarder.on_raft_message(msg) + } else { + true + } } } impl PdTaskObserver for TiFlashObserver { fn on_compute_engine_size(&self, store_size: &mut Option) { - self.forwarder.on_compute_engine_size(store_size) + if let Some(ref forwarder) = *self.forwarder.read().expect("poisoned") { + forwarder.on_compute_engine_size(store_size) + } } } @@ -216,8 +255,9 @@ impl ApplySnapshotObserver for TiFlashOb snap_key: &store::SnapKey, snap: Option<&store::Snapshot>, ) { - self.forwarder - .pre_apply_snapshot(ob_ctx.region(), peer_id, snap_key, snap) + if let Some(ref forwarder) = *self.forwarder.read().expect("poisoned") { + forwarder.pre_apply_snapshot(ob_ctx.region(), peer_id, snap_key, snap) + } } fn post_apply_snapshot( @@ -227,21 +267,30 @@ impl ApplySnapshotObserver for TiFlashOb snap_key: &store::SnapKey, snap: Option<&store::Snapshot>, ) { - self.forwarder - .post_apply_snapshot(ob_ctx.region(), peer_id, snap_key, snap) + if let Some(ref forwarder) = *self.forwarder.read().expect("poisoned") { + forwarder.post_apply_snapshot(ob_ctx.region(), peer_id, snap_key, snap) + } } fn should_pre_apply_snapshot(&self) -> bool { - self.forwarder.should_pre_apply_snapshot() + if let Some(ref forwarder) = *self.forwarder.read().expect("poisoned") { + forwarder.should_pre_apply_snapshot() + } else { + false + } } fn cancel_apply_snapshot(&self, region_id: u64, peer_id: u64) { - self.forwarder.cancel_apply_snapshot(region_id, peer_id) + if let Some(ref forwarder) = *self.forwarder.read().expect("poisoned") { + forwarder.cancel_apply_snapshot(region_id, peer_id) + } } } impl RoleObserver for TiFlashObserver { fn on_role_change(&self, ob_ctx: &mut ObserverContext<'_>, r: &RoleChange) { - self.forwarder.on_role_change(ob_ctx.region(), r) + if let Some(ref forwarder) = *self.forwarder.read().expect("poisoned") { + forwarder.on_role_change(ob_ctx.region(), r) + } } } diff --git a/proxy_components/mock-engine-store/src/mock_cluster/cluster_ext.rs b/proxy_components/mock-engine-store/src/mock_cluster/cluster_ext.rs index 34c3d0590a2..3e32cefc283 100644 --- a/proxy_components/mock-engine-store/src/mock_cluster/cluster_ext.rs +++ b/proxy_components/mock-engine-store/src/mock_cluster/cluster_ext.rs @@ -39,6 +39,19 @@ pub struct FFIHelperSet { pub struct TestData { pub expected_leader_safe_ts: u64, pub expected_self_safe_ts: u64, + pub updated_leader_safe_ts: u64, + pub updated_self_safe_ts: u64, + pub checked_time: u64, +} + +impl TestData { + pub fn reset(&mut self) { + self.expected_leader_safe_ts = 0; + self.expected_self_safe_ts = 0; + self.updated_leader_safe_ts = 0; + self.updated_self_safe_ts = 0; + self.checked_time = 0; + } } #[allow(clippy::type_complexity)] diff --git a/proxy_components/mock-engine-store/src/mock_cluster/v1/cluster.rs b/proxy_components/mock-engine-store/src/mock_cluster/v1/cluster.rs index fe8a5a58036..3d3fc90a247 100644 --- a/proxy_components/mock-engine-store/src/mock_cluster/v1/cluster.rs +++ b/proxy_components/mock-engine-store/src/mock_cluster/v1/cluster.rs @@ -13,7 +13,7 @@ use encryption::DataKeyManager; // mock cluster use engine_traits::{Engines, KvEngine, CF_DEFAULT}; use file_system::IoRateLimiter; -use futures::executor::block_on; +use futures::{executor::block_on, future::BoxFuture, StreamExt}; use kvproto::{ errorpb::Error as PbError, metapb::{self, PeerRole, RegionEpoch, StoreLabel}, @@ -43,7 +43,7 @@ use resource_control::ResourceGroupManager; use tempfile::TempDir; use test_pd_client::TestPdClient; use test_raftstore::{ - is_error_response, new_admin_request, new_delete_cmd, new_peer, new_put_cf_cmd, + is_error_response, new_admin_request, new_delete_cmd, new_peer, new_put_cf_cmd, new_put_cmd, new_region_leader_cmd, new_request, new_status_request, new_store, new_tikv_config, new_transfer_leader_cmd, sleep_ms, }; @@ -1121,4 +1121,40 @@ impl> Cluster { panic!("Flashback call msg failed"); } } + + pub fn async_request( + &mut self, + req: RaftCmdRequest, + ) -> Result> { + self.async_request_with_opts(req, Default::default()) + } + + pub fn async_request_with_opts( + &mut self, + mut req: RaftCmdRequest, + opts: RaftCmdExtraOpts, + ) -> Result> { + let region_id = req.get_header().get_region_id(); + let leader = self.leader_of_region(region_id).unwrap(); + req.mut_header().set_peer(leader.clone()); + let (cb, mut rx) = make_cb::(&req); + self.sim + .rl() + .async_command_on_node_with_opts(leader.get_store_id(), req, cb, opts)?; + Ok(Box::pin(async move { + let fut = rx.next(); + fut.await.unwrap() + })) + } + + pub fn async_put( + &mut self, + key: &[u8], + value: &[u8], + ) -> Result> { + let mut region = self.get_region(key); + let reqs = vec![new_put_cmd(key, value)]; + let put = new_request(region.get_id(), region.take_region_epoch(), reqs, false); + self.async_request(put) + } } diff --git a/proxy_components/mock-engine-store/src/mock_cluster/v1/node.rs b/proxy_components/mock-engine-store/src/mock_cluster/v1/node.rs index b4a04958aad..9df019fbe4c 100644 --- a/proxy_components/mock-engine-store/src/mock_cluster/v1/node.rs +++ b/proxy_components/mock-engine-store/src/mock_cluster/v1/node.rs @@ -348,7 +348,8 @@ impl Simulator for NodeCluster { pd_endpoints: cfg.pd.endpoints.clone(), snap_handle_pool_size: cfg.proxy_cfg.raft_store.snap_handle_pool_size, }; - let tiflash_ob = engine_store_ffi::observer::TiFlashObserver::new( + let mut tiflash_ob = engine_store_ffi::observer::TiFlashObserver::default(); + tiflash_ob.init_forwarder( node_id, engines.kv.clone(), engines.raft.clone(), diff --git a/proxy_components/mock-engine-store/src/mock_cluster/v1/server.rs b/proxy_components/mock-engine-store/src/mock_cluster/v1/server.rs index b517db3f4f3..162ede38d36 100644 --- a/proxy_components/mock-engine-store/src/mock_cluster/v1/server.rs +++ b/proxy_components/mock-engine-store/src/mock_cluster/v1/server.rs @@ -299,6 +299,9 @@ impl ServerCluster { // Create coprocessor. let mut coprocessor_host = CoprocessorHost::new(router.clone(), cfg.coprocessor.clone()); + let mut tiflash_ob = engine_store_ffi::observer::TiFlashObserver::default(); + tiflash_ob.register_to(&mut coprocessor_host); + let local_reader = LocalReader::new( engines.kv.clone(), StoreMetaDelegate::new(store_meta.clone(), engines.kv.clone()), @@ -572,8 +575,7 @@ impl ServerCluster { let mut server = server.unwrap(); let addr = server.listening_addr(); cfg.server.addr = format!("{}", addr); - let trans = server.transport(); - let simulate_trans = SimulateTransport::new(trans); + let simulate_trans = SimulateTransport::new(server.transport()); let max_grpc_thread_count = cfg.server.grpc_concurrency; let server_cfg = Arc::new(VersionTrack::new(cfg.server.clone())); @@ -582,7 +584,7 @@ impl ServerCluster { pd_endpoints: cfg.pd.endpoints.clone(), snap_handle_pool_size: cfg.proxy_cfg.raft_store.snap_handle_pool_size, }; - let tiflash_ob = engine_store_ffi::observer::TiFlashObserver::new( + tiflash_ob.init_forwarder( node_id, engines.kv.clone(), engines.raft.clone(), @@ -593,7 +595,6 @@ impl ServerCluster { DebugStruct::default(), key_mgr_cloned, ); - tiflash_ob.register_to(&mut coprocessor_host); engines .kv .proxy_ext diff --git a/proxy_components/mock-engine-store/src/mock_store/mock_engine_store_server.rs b/proxy_components/mock-engine-store/src/mock_store/mock_engine_store_server.rs index ce722ab23f5..56b0e205dfd 100644 --- a/proxy_components/mock-engine-store/src/mock_store/mock_engine_store_server.rs +++ b/proxy_components/mock-engine-store/src/mock_store/mock_engine_store_server.rs @@ -598,17 +598,28 @@ unsafe extern "C" fn ffi_handle_destroy( unsafe extern "C" fn ffi_handle_safe_ts_update( arg1: *mut interfaces_ffi::EngineStoreServerWrap, - _region_id: u64, + region_id: u64, self_safe_ts: u64, leader_safe_ts: u64, ) { - let store = into_engine_store_server_wrap(arg1); - let cluster_ext = store.cluster_ext_ptr as *const mock_cluster::ClusterExt; - assert_eq!(self_safe_ts, (*cluster_ext).test_data.expected_self_safe_ts); - assert_eq!( - leader_safe_ts, - (*cluster_ext).test_data.expected_leader_safe_ts + info!( + "ffi_handle_safe_ts_update region_id {}, self_safe_ts {} leader_safe_ts {}", + region_id, self_safe_ts, leader_safe_ts ); + let store = into_engine_store_server_wrap(arg1); + let cluster_ext = store.cluster_ext_ptr as *mut mock_cluster::ClusterExt; + if (*cluster_ext).test_data.expected_self_safe_ts != 0 { + assert_eq!(self_safe_ts, (*cluster_ext).test_data.expected_self_safe_ts); + } + if (*cluster_ext).test_data.expected_leader_safe_ts != 0 { + assert_eq!( + leader_safe_ts, + (*cluster_ext).test_data.expected_leader_safe_ts + ); + } + (*cluster_ext).test_data.updated_leader_safe_ts = leader_safe_ts; + (*cluster_ext).test_data.updated_self_safe_ts = self_safe_ts; + (*cluster_ext).test_data.checked_time += 1; } unsafe extern "C" fn ffi_handle_compute_store_stats( diff --git a/proxy_components/proxy_server/src/run.rs b/proxy_components/proxy_server/src/run.rs index c8f127e937b..1f3bb1fdfed 100644 --- a/proxy_components/proxy_server/src/run.rs +++ b/proxy_components/proxy_server/src/run.rs @@ -1236,6 +1236,10 @@ impl TiKvServer { } let importer = Arc::new(importer); + // Must be registered before `CheckLeaderRunner`, to get safe_ts updates. + let mut tiflash_ob = engine_store_ffi::observer::TiFlashObserver::default(); + tiflash_ob.register_to(self.coprocessor_host.as_mut().unwrap()); + let check_leader_runner = CheckLeaderRunner::new( engines.store_meta.clone(), self.coprocessor_host.clone().unwrap(), @@ -1283,7 +1287,7 @@ impl TiKvServer { pd_endpoints: self.core.config.pd.endpoints.clone(), snap_handle_pool_size: self.proxy_config.raft_store.snap_handle_pool_size, }; - let tiflash_ob = engine_store_ffi::observer::TiFlashObserver::new( + tiflash_ob.init_forwarder( node.id(), self.engines.as_ref().unwrap().engines.kv.clone(), self.engines.as_ref().unwrap().engines.raft.clone(), @@ -1294,7 +1298,6 @@ impl TiKvServer { DebugStruct::default(), self.core.encryption_key_manager.clone(), ); - tiflash_ob.register_to(self.coprocessor_host.as_mut().unwrap()); cfg_controller.register( tikv::config::Module::Server, diff --git a/proxy_scripts/ci_check.sh b/proxy_scripts/ci_check.sh index 7cf283227fd..231d5ad3c6a 100755 --- a/proxy_scripts/ci_check.sh +++ b/proxy_scripts/ci_check.sh @@ -24,7 +24,6 @@ elif [[ $M == "testold" ]]; then echo "Finish tikv code consistency" exit # If we depend TiKV as a Cargo component, the following is not necessary, and can fail. # TODO we have to let tests support openssl-vendored. - yum install openssl openssl-devel -y cargo test --features "$ENABLE_FEATURES" --package tests --test failpoints cases::test_normal cargo test --features "$ENABLE_FEATURES" --package tests --test failpoints cases::test_bootstrap cargo test --features "$ENABLE_FEATURES" --package tests --test failpoints cases::test_compact_log diff --git a/proxy_tests/proxy/shared/server_cluster_test.rs b/proxy_tests/proxy/shared/server_cluster_test.rs index 8f954925bac..9b5687df134 100644 --- a/proxy_tests/proxy/shared/server_cluster_test.rs +++ b/proxy_tests/proxy/shared/server_cluster_test.rs @@ -101,6 +101,159 @@ fn test_safe_ts_basic() { .cluster_ext .set_expected_safe_ts(physical_time, physical_time); suite.must_check_leader(1, TimeStamp::new(physical_time), 1, 1); + assert_eq!(suite.cluster.cluster_ext.test_data.checked_time, 1); suite.stop(); } + +const INVALID_TIMESTAMP: u64 = u64::MAX; + +#[test] +fn test_safe_ts_updates() { + let mut suite = TestSuite::new(1); + + suite.cluster.cluster_ext.test_data.reset(); + + let states = collect_all_states(&suite.cluster.cluster_ext, 1); + let applied_index = states + .get(&1) + .unwrap() + .in_memory_apply_state + .get_applied_index(); + + let physical_time = 646454654654; + suite.must_check_leader(1, TimeStamp::new(physical_time), applied_index + 1, 1); + + assert_ne!( + suite.cluster.cluster_ext.test_data.updated_self_safe_ts, + physical_time + ); + + suite.cluster.must_put(b"k1", b"v1"); + + let eng_ids = suite + .cluster + .engines + .iter() + .map(|e| e.0.to_owned()) + .collect::>(); + + check_key( + &*(suite.cluster), + b"k1", + b"v1", + Some(true), + None, + Some(vec![eng_ids[0]]), + ); + + assert_eq!( + suite.cluster.cluster_ext.test_data.updated_self_safe_ts, + physical_time + ); + assert_eq!( + suite.cluster.cluster_ext.test_data.updated_leader_safe_ts, + INVALID_TIMESTAMP + ); + + let physical_time2 = 666454654654; + suite.must_check_leader(1, TimeStamp::new(physical_time2), applied_index + 2, 1); + + assert_eq!( + suite.cluster.cluster_ext.test_data.updated_self_safe_ts, + physical_time + ); + assert_eq!( + suite.cluster.cluster_ext.test_data.updated_leader_safe_ts, + physical_time2 + ); + + suite.cluster.must_put(b"k2", b"v1"); + + check_key( + &*(suite.cluster), + b"k2", + b"v1", + Some(true), + None, + Some(vec![eng_ids[0]]), + ); + + assert_eq!( + suite.cluster.cluster_ext.test_data.updated_self_safe_ts, + physical_time2 + ); + assert_eq!( + suite.cluster.cluster_ext.test_data.updated_leader_safe_ts, + INVALID_TIMESTAMP + ); + suite.stop(); +} + +#[test] +fn test_raft_message_observer() { + let mut cluster = new_server_cluster(0, 3); + cluster.pd_client.disable_default_operator(); + let r1 = cluster.run_conf_change(); + + cluster.must_put(b"k1", b"v1"); + + fail::cfg("tiflash_force_reject_raft_append_message", "return").unwrap(); + fail::cfg("tiflash_force_reject_raft_snapshot_message", "return").unwrap(); + + cluster.pd_client.add_peer(r1, new_peer(2, 2)); + + std::thread::sleep(std::time::Duration::from_millis(500)); + + check_key( + &cluster, + b"k1", + b"v", + Some(false), + Some(false), + Some(vec![2]), + ); + + fail::remove("tiflash_force_reject_raft_append_message"); + fail::remove("tiflash_force_reject_raft_snapshot_message"); + + cluster.pd_client.must_have_peer(r1, new_peer(2, 2)); + cluster.pd_client.must_add_peer(r1, new_peer(3, 3)); + + check_key( + &cluster, + b"k1", + b"v1", + Some(true), + Some(true), + Some(vec![2, 3]), + ); + + fail::cfg("tiflash_force_reject_raft_append_message", "return").unwrap(); + + let _ = cluster.async_put(b"k2", b"v2").unwrap(); + + std::thread::sleep(std::time::Duration::from_millis(500)); + + check_key( + &cluster, + b"k3", + b"v3", + Some(false), + Some(false), + Some(vec![2, 3]), + ); + + fail::remove("tiflash_force_reject_raft_append_message"); + + cluster.must_put(b"k3", b"v3"); + check_key( + &cluster, + b"k3", + b"v3", + Some(true), + Some(true), + Some(vec![1, 2, 3]), + ); + cluster.shutdown(); +} diff --git a/proxy_tests/proxy/utils/v1_server.rs b/proxy_tests/proxy/utils/v1_server.rs index fbb0b175da7..88bae96c30f 100644 --- a/proxy_tests/proxy/utils/v1_server.rs +++ b/proxy_tests/proxy/utils/v1_server.rs @@ -72,9 +72,10 @@ impl TestSuite { req.set_regions(regions.into()); req.set_ts(resolved_ts.into_inner()); - let _check_leader_resp = self + let check_leader_resp = self .get_client_from_store_id(store_id) .check_leader(&req) .unwrap(); + info!("check_leader_resp: {:?}", check_leader_resp); } }