Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
149 changes: 99 additions & 50 deletions proxy_components/engine_store_ffi/src/observer.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
// Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0.
use std::sync::Arc;
use std::sync::{Arc, RwLock};

use encryption::DataKeyManager;
use engine_traits::RaftEngine;
Expand Down Expand Up @@ -32,12 +32,21 @@ const TIFLASH_OBSERVER_PRIORITY: u32 = 0;

#[derive(Clone)]
pub struct TiFlashObserver<T: Transport + 'static, ER: RaftEngine> {
pub forwarder: ProxyForwarder<T, ER>,
pub forwarder: Arc<RwLock<Option<ProxyForwarder<T, ER>>>>,
}

impl<T: Transport + 'static, ER: RaftEngine> Default for TiFlashObserver<T, ER> {
fn default() -> Self {
Self {
forwarder: Arc::new(RwLock::new(None)),
}
}
}

impl<T: Transport + 'static, ER: RaftEngine> TiFlashObserver<T, ER> {
#[allow(clippy::too_many_arguments)]
pub fn new(
pub fn init_forwarder(
&mut self,
store_id: u64,
engine: engine_tiflash::MixedModeEngine,
raft_engine: ER,
Expand All @@ -47,20 +56,19 @@ impl<T: Transport + 'static, ER: RaftEngine> TiFlashObserver<T, ER> {
packed_envs: PackedEnvs,
debug_struct: DebugStruct,
key_manager: Option<Arc<DataKeyManager>>,
) -> Self {
TiFlashObserver {
forwarder: ProxyForwarder::new(
store_id,
engine,
raft_engine,
sst_importer,
trans,
snap_mgr,
packed_envs,
debug_struct,
key_manager,
),
}
) {
let f = ProxyForwarder::new(
store_id,
engine,
raft_engine,
sst_importer,
trans,
snap_mgr,
packed_envs,
debug_struct,
key_manager,
);
self.forwarder.write().expect("poisoned").replace(f);
}

pub fn register_to<E: engine_traits::KvEngine>(
Expand Down Expand Up @@ -105,7 +113,9 @@ impl<T: Transport + 'static, ER: RaftEngine> TiFlashObserver<T, ER> {

impl<T: Transport + 'static, ER: RaftEngine> Coprocessor for TiFlashObserver<T, ER> {
fn stop(&self) {
self.forwarder.stop();
if let Some(ref forwarder) = *self.forwarder.read().expect("poisoned") {
forwarder.stop();
}
}
}

Expand All @@ -117,8 +127,11 @@ impl<T: Transport + 'static, ER: RaftEngine> AdminObserver for TiFlashObserver<T
index: u64,
term: u64,
) -> bool {
self.forwarder
.pre_exec_admin(ob_ctx.region(), req, index, term)
if let Some(ref forwarder) = *self.forwarder.read().expect("poisoned") {
forwarder.pre_exec_admin(ob_ctx.region(), req, index, term)
} else {
false
}
}

fn post_exec_admin(
Expand All @@ -129,19 +142,25 @@ impl<T: Transport + 'static, ER: RaftEngine> AdminObserver for TiFlashObserver<T
region_state: &RegionState,
apply_ctx_info: &mut ApplyCtxInfo<'_>,
) -> bool {
self.forwarder.post_exec_admin(
ob_ctx.region(),
cmd,
apply_state,
region_state,
apply_ctx_info,
)
if let Some(ref forwarder) = *self.forwarder.read().expect("poisoned") {
forwarder.post_exec_admin(
ob_ctx.region(),
cmd,
apply_state,
region_state,
apply_ctx_info,
)
} else {
false
}
}
}

impl<T: Transport + 'static, ER: RaftEngine> QueryObserver for TiFlashObserver<T, ER> {
fn on_empty_cmd(&self, ob_ctx: &mut ObserverContext<'_>, index: u64, term: u64) {
self.forwarder.on_empty_cmd(ob_ctx.region(), index, term)
if let Some(ref forwarder) = *self.forwarder.read().expect("poisoned") {
forwarder.on_empty_cmd(ob_ctx.region(), index, term)
}
}

fn post_exec_query(
Expand All @@ -152,20 +171,25 @@ impl<T: Transport + 'static, ER: RaftEngine> QueryObserver for TiFlashObserver<T
region_state: &RegionState,
apply_ctx_info: &mut ApplyCtxInfo<'_>,
) -> bool {
self.forwarder.post_exec_query(
ob_ctx.region(),
cmd,
apply_state,
region_state,
apply_ctx_info,
)
if let Some(ref forwarder) = *self.forwarder.read().expect("poisoned") {
forwarder.post_exec_query(
ob_ctx.region(),
cmd,
apply_state,
region_state,
apply_ctx_info,
)
} else {
false
}
}
}

impl<T: Transport + 'static, ER: RaftEngine> UpdateSafeTsObserver for TiFlashObserver<T, ER> {
fn on_update_safe_ts(&self, region_id: u64, self_safe_ts: u64, leader_safe_ts: u64) {
self.forwarder
.on_update_safe_ts(region_id, self_safe_ts, leader_safe_ts)
if let Some(ref forwarder) = *self.forwarder.read().expect("poisoned") {
forwarder.on_update_safe_ts(region_id, self_safe_ts, leader_safe_ts)
}
}
}

Expand All @@ -176,7 +200,9 @@ impl<T: Transport + 'static, ER: RaftEngine> RegionChangeObserver for TiFlashObs
e: RegionChangeEvent,
r: StateRole,
) {
self.forwarder.on_region_changed(ob_ctx.region(), e, r)
if let Some(ref forwarder) = *self.forwarder.read().expect("poisoned") {
forwarder.on_region_changed(ob_ctx.region(), e, r)
}
}

#[allow(clippy::match_like_matches_macro)]
Expand All @@ -186,24 +212,37 @@ impl<T: Transport + 'static, ER: RaftEngine> RegionChangeObserver for TiFlashObs
is_finished: bool,
cmd: Option<&RaftCmdRequest>,
) -> bool {
self.forwarder
.pre_persist(ob_ctx.region(), is_finished, cmd)
if let Some(ref forwarder) = *self.forwarder.read().expect("poisoned") {
forwarder.pre_persist(ob_ctx.region(), is_finished, cmd)
} else {
true
}
}

fn pre_write_apply_state(&self, ob_ctx: &mut ObserverContext<'_>) -> bool {
self.forwarder.pre_write_apply_state(ob_ctx.region())
if let Some(ref forwarder) = *self.forwarder.read().expect("poisoned") {
forwarder.pre_write_apply_state(ob_ctx.region())
} else {
true
}
}
}

impl<T: Transport + 'static, ER: RaftEngine> RaftMessageObserver for TiFlashObserver<T, ER> {
fn on_raft_message(&self, msg: &RaftMessage) -> bool {
self.forwarder.on_raft_message(msg)
if let Some(ref forwarder) = *self.forwarder.read().expect("poisoned") {
forwarder.on_raft_message(msg)
} else {
true
}
}
}

impl<T: Transport + 'static, ER: RaftEngine> PdTaskObserver for TiFlashObserver<T, ER> {
fn on_compute_engine_size(&self, store_size: &mut Option<StoreSizeInfo>) {
self.forwarder.on_compute_engine_size(store_size)
if let Some(ref forwarder) = *self.forwarder.read().expect("poisoned") {
forwarder.on_compute_engine_size(store_size)
}
}
}

Expand All @@ -216,8 +255,9 @@ impl<T: Transport + 'static, ER: RaftEngine> ApplySnapshotObserver for TiFlashOb
snap_key: &store::SnapKey,
snap: Option<&store::Snapshot>,
) {
self.forwarder
.pre_apply_snapshot(ob_ctx.region(), peer_id, snap_key, snap)
if let Some(ref forwarder) = *self.forwarder.read().expect("poisoned") {
forwarder.pre_apply_snapshot(ob_ctx.region(), peer_id, snap_key, snap)
}
}

fn post_apply_snapshot(
Expand All @@ -227,21 +267,30 @@ impl<T: Transport + 'static, ER: RaftEngine> ApplySnapshotObserver for TiFlashOb
snap_key: &store::SnapKey,
snap: Option<&store::Snapshot>,
) {
self.forwarder
.post_apply_snapshot(ob_ctx.region(), peer_id, snap_key, snap)
if let Some(ref forwarder) = *self.forwarder.read().expect("poisoned") {
forwarder.post_apply_snapshot(ob_ctx.region(), peer_id, snap_key, snap)
}
}

fn should_pre_apply_snapshot(&self) -> bool {
self.forwarder.should_pre_apply_snapshot()
if let Some(ref forwarder) = *self.forwarder.read().expect("poisoned") {
forwarder.should_pre_apply_snapshot()
} else {
false
}
}

fn cancel_apply_snapshot(&self, region_id: u64, peer_id: u64) {
self.forwarder.cancel_apply_snapshot(region_id, peer_id)
if let Some(ref forwarder) = *self.forwarder.read().expect("poisoned") {
forwarder.cancel_apply_snapshot(region_id, peer_id)
}
}
}

impl<T: Transport + 'static, ER: RaftEngine> RoleObserver for TiFlashObserver<T, ER> {
fn on_role_change(&self, ob_ctx: &mut ObserverContext<'_>, r: &RoleChange) {
self.forwarder.on_role_change(ob_ctx.region(), r)
if let Some(ref forwarder) = *self.forwarder.read().expect("poisoned") {
forwarder.on_role_change(ob_ctx.region(), r)
}
}
}
13 changes: 13 additions & 0 deletions proxy_components/mock-engine-store/src/mock_cluster/cluster_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,19 @@ pub struct FFIHelperSet {
pub struct TestData {
pub expected_leader_safe_ts: u64,
pub expected_self_safe_ts: u64,
pub updated_leader_safe_ts: u64,
pub updated_self_safe_ts: u64,
pub checked_time: u64,
}

impl TestData {
pub fn reset(&mut self) {
self.expected_leader_safe_ts = 0;
self.expected_self_safe_ts = 0;
self.updated_leader_safe_ts = 0;
self.updated_self_safe_ts = 0;
self.checked_time = 0;
}
}

#[allow(clippy::type_complexity)]
Expand Down
40 changes: 38 additions & 2 deletions proxy_components/mock-engine-store/src/mock_cluster/v1/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use encryption::DataKeyManager;
// mock cluster
use engine_traits::{Engines, KvEngine, CF_DEFAULT};
use file_system::IoRateLimiter;
use futures::executor::block_on;
use futures::{executor::block_on, future::BoxFuture, StreamExt};
use kvproto::{
errorpb::Error as PbError,
metapb::{self, PeerRole, RegionEpoch, StoreLabel},
Expand Down Expand Up @@ -43,7 +43,7 @@ use resource_control::ResourceGroupManager;
use tempfile::TempDir;
use test_pd_client::TestPdClient;
use test_raftstore::{
is_error_response, new_admin_request, new_delete_cmd, new_peer, new_put_cf_cmd,
is_error_response, new_admin_request, new_delete_cmd, new_peer, new_put_cf_cmd, new_put_cmd,
new_region_leader_cmd, new_request, new_status_request, new_store, new_tikv_config,
new_transfer_leader_cmd, sleep_ms,
};
Expand Down Expand Up @@ -1121,4 +1121,40 @@ impl<T: Simulator<TiFlashEngine>> Cluster<T> {
panic!("Flashback call msg failed");
}
}

pub fn async_request(
&mut self,
req: RaftCmdRequest,
) -> Result<BoxFuture<'static, RaftCmdResponse>> {
self.async_request_with_opts(req, Default::default())
}

pub fn async_request_with_opts(
&mut self,
mut req: RaftCmdRequest,
opts: RaftCmdExtraOpts,
) -> Result<BoxFuture<'static, RaftCmdResponse>> {
let region_id = req.get_header().get_region_id();
let leader = self.leader_of_region(region_id).unwrap();
req.mut_header().set_peer(leader.clone());
let (cb, mut rx) = make_cb::<TiFlashEngine>(&req);
self.sim
.rl()
.async_command_on_node_with_opts(leader.get_store_id(), req, cb, opts)?;
Ok(Box::pin(async move {
let fut = rx.next();
fut.await.unwrap()
}))
}

pub fn async_put(
&mut self,
key: &[u8],
value: &[u8],
) -> Result<BoxFuture<'static, RaftCmdResponse>> {
let mut region = self.get_region(key);
let reqs = vec![new_put_cmd(key, value)];
let put = new_request(region.get_id(), region.take_region_epoch(), reqs, false);
self.async_request(put)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,8 @@ impl Simulator<TiFlashEngine> for NodeCluster {
pd_endpoints: cfg.pd.endpoints.clone(),
snap_handle_pool_size: cfg.proxy_cfg.raft_store.snap_handle_pool_size,
};
let tiflash_ob = engine_store_ffi::observer::TiFlashObserver::new(
let mut tiflash_ob = engine_store_ffi::observer::TiFlashObserver::default();
tiflash_ob.init_forwarder(
node_id,
engines.kv.clone(),
engines.raft.clone(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,9 @@ impl ServerCluster {
// Create coprocessor.
let mut coprocessor_host = CoprocessorHost::new(router.clone(), cfg.coprocessor.clone());

let mut tiflash_ob = engine_store_ffi::observer::TiFlashObserver::default();
tiflash_ob.register_to(&mut coprocessor_host);

let local_reader = LocalReader::new(
engines.kv.clone(),
StoreMetaDelegate::new(store_meta.clone(), engines.kv.clone()),
Expand Down Expand Up @@ -572,8 +575,7 @@ impl ServerCluster {
let mut server = server.unwrap();
let addr = server.listening_addr();
cfg.server.addr = format!("{}", addr);
let trans = server.transport();
let simulate_trans = SimulateTransport::new(trans);
let simulate_trans = SimulateTransport::new(server.transport());
let max_grpc_thread_count = cfg.server.grpc_concurrency;
let server_cfg = Arc::new(VersionTrack::new(cfg.server.clone()));

Expand All @@ -582,7 +584,7 @@ impl ServerCluster {
pd_endpoints: cfg.pd.endpoints.clone(),
snap_handle_pool_size: cfg.proxy_cfg.raft_store.snap_handle_pool_size,
};
let tiflash_ob = engine_store_ffi::observer::TiFlashObserver::new(
tiflash_ob.init_forwarder(
node_id,
engines.kv.clone(),
engines.raft.clone(),
Expand All @@ -593,7 +595,6 @@ impl ServerCluster {
DebugStruct::default(),
key_mgr_cloned,
);
tiflash_ob.register_to(&mut coprocessor_host);
engines
.kv
.proxy_ext
Expand Down
Loading
Loading