diff --git a/Makefile b/Makefile index b7ffa452cb4..b643d7bbd19 100644 --- a/Makefile +++ b/Makefile @@ -211,7 +211,7 @@ unset-override: pre-format: unset-override @rustup component add rustfmt - @cargo install --force -q cargo-sort + @cargo install --locked --force -q cargo-sort ci_fmt_check: M="fmt" ./proxy_scripts/ci_check.sh diff --git a/components/raftstore/src/store/config.rs b/components/raftstore/src/store/config.rs index b3e4a71463b..a34dd3c9c53 100644 --- a/components/raftstore/src/store/config.rs +++ b/components/raftstore/src/store/config.rs @@ -396,7 +396,7 @@ impl Default for Config { apply_yield_duration: ReadableDuration::millis(500), apply_yield_write_size: ReadableSize::kb(32), perf_level: PerfLevel::Uninitialized, - evict_cache_on_memory_ratio: 0.0, + evict_cache_on_memory_ratio: 0.1, cmd_batch: true, cmd_batch_concurrent_ready_max_count: 1, raft_write_size_limit: ReadableSize::mb(1), diff --git a/proxy_scripts/ci_check.sh b/proxy_scripts/ci_check.sh index bb713e9f045..aaf47663d21 100755 --- a/proxy_scripts/ci_check.sh +++ b/proxy_scripts/ci_check.sh @@ -23,9 +23,9 @@ elif [[ $M == "testold" ]]; then cargo test --features "$ENABLE_FEATURES" --package tests --test failpoints cases::test_transaction cargo test --features "$ENABLE_FEATURES" --package tests --test failpoints cases::test_cmd_epoch_checker # cargo test --package tests --test failpoints cases::test_disk_full - cargo test --package tests --test failpoints cases::test_merge + # cargo test --package tests --test failpoints cases::test_merge # cargo test --package tests --test failpoints cases::test_snap - cargo test --package tests --test failpoints cases::test_import_service + # cargo test --package tests --test failpoints cases::test_import_service elif [[ $M == "testnew" ]]; then export ENGINE_LABEL_VALUE=tiflash export RUST_BACKTRACE=full diff --git a/proxy_server/src/config.rs b/proxy_server/src/config.rs index 63d4e57f4b1..c09ede420df 100644 --- a/proxy_server/src/config.rs +++ 
b/proxy_server/src/config.rs @@ -273,6 +273,14 @@ impl Default for ProxyConfig { raftdb: RaftDbConfig::default(), storage: StorageConfig::default(), enable_io_snoop: false, + // Previously, we set `memory_usage_high_water` to 0.1, in order to make TiFlash + // always be in a high-water situation. Thus, by setting + // `evict_cache_on_memory_ratio`, we can evict the entry cache if there is a memory usage + // peak after restart. However, there are some cases where the raftstore could + // take more than 5% of the total used memory, so TiFlash will reject + // msgAppend to every region. So it is actually not a good idea to make + // TiFlash Proxy always run in a high-water state just to reduce the + // memory usage peak after restart. readpool: ReadPoolConfig::default(), import: ImportConfig::default(), } diff --git a/proxy_server/src/proxy.rs b/proxy_server/src/proxy.rs index c34c9ef54f9..5d0b2a7de19 100644 --- a/proxy_server/src/proxy.rs +++ b/proxy_server/src/proxy.rs @@ -267,12 +267,23 @@ pub unsafe fn run_proxy( .long("only-decryption") .help("Only do decryption in Proxy"), ) + .arg( + Arg::with_name("memory-limit-size") + .long("memory-limit-size") + .help("Used as the maximum memory we can consume, in bytes") + .takes_value(true), + ) + .arg( + Arg::with_name("memory-limit-ratio") + .long("memory-limit-ratio") + .help("Used as the maximum memory we can consume, in percentage") + .takes_value(true), + ) .get_matches_from(args); if matches.is_present("print-sample-config") { let config = TikvConfig::default(); println!("{}", toml::to_string_pretty(&config).unwrap()); - process::exit(0); } let mut unrecognized_keys = Vec::new(); @@ -308,6 +319,7 @@ pub unsafe fn run_proxy( if matches.is_present("only-decryption") { crate::run::run_tikv_only_decryption(config, proxy_config, engine_store_server_helper); } else { + // Log is enabled here. 
crate::run::run_tikv_proxy(config, proxy_config, engine_store_server_helper); } } diff --git a/proxy_server/src/setup.rs b/proxy_server/src/setup.rs index f04b968eff9..d0e2d7f175f 100644 --- a/proxy_server/src/setup.rs +++ b/proxy_server/src/setup.rs @@ -5,8 +5,8 @@ use std::borrow::ToOwned; use clap::ArgMatches; use collections::HashMap; pub use server::setup::initial_logger; -use tikv::config::{MetricConfig, TikvConfig}; -use tikv_util::{self, logger}; +use tikv::config::{MetricConfig, TikvConfig, MEMORY_USAGE_LIMIT_RATE}; +use tikv_util::{self, config::ReadableSize, logger, sys::SysQuota}; use crate::config::ProxyConfig; pub use crate::fatal; @@ -142,4 +142,49 @@ pub fn overwrite_config_with_cmd_args( .unwrap(), ), ); + + let mut memory_limit_set = config.memory_usage_limit.is_some(); + if !memory_limit_set { + if let Some(s) = matches.value_of("memory-limit-size") { + let result: Result = s.parse(); + if let Ok(memory_limit_size) = result { + info!( + "overwrite memory_usage_limit by `memory-limit-size` to {}", + memory_limit_size + ); + config.memory_usage_limit = Some(ReadableSize(memory_limit_size)); + memory_limit_set = true; + } else { + info!("overwrite memory_usage_limit by `memory-limit-size` failed"; "memory_limit_size" => s); + } + } + } + + let total = SysQuota::memory_limit_in_bytes(); + if !memory_limit_set { + if let Some(s) = matches.value_of("memory-limit-ratio") { + let result: Result = s.parse(); + if let Ok(memory_limit_ratio) = result { + if memory_limit_ratio <= 0.0 || memory_limit_ratio > 1.0 { + info!("overwrite memory_usage_limit meets error ratio"; "ratio" => memory_limit_ratio); + } else { + let limit = (total as f64 * memory_limit_ratio) as u64; + info!( + "overwrite memory_usage_limit by `memory-limit-ratio`={} to {}", + memory_limit_ratio, limit + ); + config.memory_usage_limit = Some(ReadableSize(limit)); + memory_limit_set = true; + } + } else { + info!("overwrite memory_usage_limit meets error ratio"; "ratio" => s); + } + } + } + 
+ if !memory_limit_set && config.memory_usage_limit.is_none() { + let limit = (total as f64 * MEMORY_USAGE_LIMIT_RATE) as u64; + info!("overwrite memory_usage_limit failed, use TiKV's default"; "limit" => limit); + config.memory_usage_limit = Some(ReadableSize(limit)); + } } diff --git a/proxy_tests/proxy/config.rs b/proxy_tests/proxy/config.rs index a50e91d96be..ddf76eca1c6 100644 --- a/proxy_tests/proxy/config.rs +++ b/proxy_tests/proxy/config.rs @@ -8,6 +8,7 @@ use proxy_server::{ proxy::{gen_proxy_config, gen_tikv_config}, setup::overwrite_config_with_cmd_args, }; +use tikv::config::MEMORY_USAGE_LIMIT_RATE; use tikv_util::sys::SysQuota; use crate::proxy::*; @@ -122,6 +123,11 @@ fn test_config_proxy_default_no_config_item() { std::cmp::min(256, (cpu_num * 8.0) as usize) ); assert_eq!(config.server.status_thread_pool_size, 2); + + assert_eq!(config.raft_store.evict_cache_on_memory_ratio, 0.1); + assert_eq!(config.memory_usage_high_water, 0.9); + // Seems #244 doesn't go into this branch. + assert_eq!(config.server.reject_messages_on_memory_ratio, 0.2); } /// We test if the engine-label is set properly. 
@@ -200,3 +206,149 @@ apply-low-priority-pool-size = 41 config.raft_store.apply_batch_system.low_priority_pool_size ); } + +#[test] +fn test_memory_limit_overwrite() { + let app = App::new("RaftStore Proxy") + .arg( + Arg::with_name("memory-limit-size") + .long("memory-limit-size") + .help("Used as the maximum memory we can consume, in bytes") + .takes_value(true), + ) + .arg( + Arg::with_name("memory-limit-ratio") + .long("memory-limit-ratio") + .help("Used as the maximum memory we can consume, in percentage") + .takes_value(true), + ); + + let bootstrap = |args: Vec<&str>| { + let mut v: Vec = vec![]; + let matches = app.clone().get_matches_from(args); + let mut config = gen_tikv_config(&None, false, &mut v); + let mut proxy_config = gen_proxy_config(&None, false, &mut v); + proxy_config.raftdb.defaultcf.block_cache_size = ReadableSize(0); + proxy_config.rocksdb.defaultcf.block_cache_size = ReadableSize(0); + proxy_config.rocksdb.lockcf.block_cache_size = ReadableSize(0); + proxy_config.rocksdb.writecf.block_cache_size = ReadableSize(0); + overwrite_config_with_cmd_args(&mut config, &mut proxy_config, &matches); + address_proxy_config(&mut config, &proxy_config); + config.compatible_adjust(); + config + }; + + { + let args = vec![ + "test_memory_limit_overwrite1", + "--memory-limit-size", + "12345", + ]; + let mut config = bootstrap(args); + assert!(config.validate().is_ok()); + assert_eq!(config.memory_usage_limit, Some(ReadableSize(12345))); + } + + { + let args = vec![ + "test_memory_limit_overwrite2", + "--memory-limit-size", + "12345", + "--memory-limit-ratio", + "0.9", + ]; + let mut config = bootstrap(args); + assert!(config.validate().is_ok()); + assert_eq!(config.memory_usage_limit, Some(ReadableSize(12345))); + } + + let total = SysQuota::memory_limit_in_bytes(); + { + let args = vec![ + "test_memory_limit_overwrite3", + "--memory-limit-ratio", + "0.800000", + ]; + let mut config = bootstrap(args); + assert!(config.validate().is_ok()); + let limit = 
(total as f64 * 0.8) as u64; + assert_eq!(config.memory_usage_limit, Some(ReadableSize(limit))); + } + + let default_limit = (total as f64 * MEMORY_USAGE_LIMIT_RATE) as u64; + { + let args = vec![ + "test_memory_limit_overwrite4", + "--memory-limit-ratio", + "7.9", + ]; + let mut config = bootstrap(args); + assert!(config.validate().is_ok()); + assert_eq!(config.memory_usage_limit, Some(ReadableSize(default_limit))); + } + + { + let args = vec![ + "test_memory_limit_overwrite5", + "--memory-limit-ratio", + "'-0.9'", + ]; + let mut config = bootstrap(args); + assert!(config.validate().is_ok()); + assert_eq!(config.memory_usage_limit, Some(ReadableSize(default_limit))); + } + + { + let args = vec!["test_memory_limit_overwrite6"]; + let mut config = bootstrap(args); + assert!(config.validate().is_ok()); + assert_eq!(config.memory_usage_limit, Some(ReadableSize(default_limit))); + } + + let bootstrap2 = |args: Vec<&str>| { + let mut v: Vec = vec![]; + let matches = app.clone().get_matches_from(args); + let mut file = tempfile::NamedTempFile::new().unwrap(); + write!( + file, + " +memory-usage-limit = 42 + " + ) + .unwrap(); + let path = file.path(); + let cpath = Some(path.as_os_str()); + let mut config = gen_tikv_config(&cpath, false, &mut v); + let mut proxy_config = gen_proxy_config(&cpath, false, &mut v); + proxy_config.raftdb.defaultcf.block_cache_size = ReadableSize(0); + proxy_config.rocksdb.defaultcf.block_cache_size = ReadableSize(0); + proxy_config.rocksdb.lockcf.block_cache_size = ReadableSize(0); + proxy_config.rocksdb.writecf.block_cache_size = ReadableSize(0); + overwrite_config_with_cmd_args(&mut config, &mut proxy_config, &matches); + address_proxy_config(&mut config, &proxy_config); + config.compatible_adjust(); + config + }; + + { + let args = vec![ + "test_memory_limit_nooverwrite3", + "--memory-limit-ratio", + "0.800000", + ]; + let mut config = bootstrap2(args); + assert!(config.validate().is_ok()); + assert_eq!(config.memory_usage_limit, 
Some(ReadableSize(42))); + } + + { + let args = vec![ + "test_memory_limit_nooverwrite1", + "--memory-limit-size", + "12345", + ]; + let mut config = bootstrap2(args); + assert!(config.validate().is_ok()); + assert_eq!(config.memory_usage_limit, Some(ReadableSize(42))); + } +} diff --git a/proxy_tests/proxy/shared/store.rs b/proxy_tests/proxy/shared/store.rs new file mode 100644 index 00000000000..f99b0d1e77f --- /dev/null +++ b/proxy_tests/proxy/shared/store.rs @@ -0,0 +1,146 @@ +// Copyright 2023 TiKV Project Authors. Licensed under Apache-2.0. + +use pd_client::PdClient; +use proxy_server::config::{ + address_proxy_config, ensure_no_common_unrecognized_keys, get_last_config, + setup_default_tikv_config, validate_and_persist_config, TIFLASH_DEFAULT_LISTENING_ADDR, +}; +use tikv::config::{TikvConfig, LAST_CONFIG_FILE}; + +use crate::utils::v1::*; + +mod store { + use super::*; + #[test] + fn test_store_stats() { + let (mut cluster, pd_client) = new_mock_cluster(0, 1); + + let _ = cluster.run(); + + for id in cluster.engines.keys() { + let engine = cluster.get_tiflash_engine(*id); + assert_eq!(engine.get_store_stats().fs_stats.capacity_size, 444444); + } + + for id in cluster.engines.keys() { + cluster.must_send_store_heartbeat(*id); + } + std::thread::sleep(std::time::Duration::from_millis(1000)); + // let resp = block_on(pd_client.store_heartbeat(Default::default(), None, + // None)).unwrap(); + for id in cluster.engines.keys() { + let store_stat = pd_client.get_store_stats(*id).unwrap(); + assert_eq!(store_stat.get_capacity(), 444444); + assert_eq!(store_stat.get_available(), 333333); + } + // The same to mock-engine-store + cluster.shutdown(); + } +} + +mod config { + use super::*; + + /// Test for double read into both ProxyConfig and TikvConfig. + #[test] + fn test_config() { + // Test double read. 
+ let mut file = tempfile::NamedTempFile::new().unwrap(); + let text = "memory-usage-high-water=0.65\n[server]\nengine-addr=\"1.2.3.4:5\"\n[raftstore]\nsnap-handle-pool-size=4\n[nosense]\nfoo=2\n[rocksdb]\nmax-open-files = 111\nz=1"; + write!(file, "{}", text).unwrap(); + let path = file.path(); + + let mut unrecognized_keys = Vec::new(); + let mut config = TikvConfig::from_file(path, Some(&mut unrecognized_keys)).unwrap(); + // Otherwise we have no default addr for TiKv. + setup_default_tikv_config(&mut config); + assert_eq!(config.memory_usage_high_water, 0.65); + assert_eq!(config.rocksdb.max_open_files, 111); + assert_eq!(config.server.addr, TIFLASH_DEFAULT_LISTENING_ADDR); + assert_eq!(unrecognized_keys.len(), 3); + + let mut proxy_unrecognized_keys = Vec::new(); + let proxy_config = + ProxyConfig::from_file(path, Some(&mut proxy_unrecognized_keys)).unwrap(); + assert_eq!(proxy_config.raft_store.snap_handle_pool_size, 4); + assert_eq!(proxy_config.server.engine_addr, "1.2.3.4:5"); + assert!(proxy_unrecognized_keys.contains(&"nosense".to_string())); + let v1 = vec!["a.b", "b"] + .iter() + .map(|e| String::from(*e)) + .collect::>(); + let v2 = vec!["a.b", "b.b", "c"] + .iter() + .map(|e| String::from(*e)) + .collect::>(); + let unknown = ensure_no_common_unrecognized_keys(&v1, &v2); + assert_eq!(unknown.is_err(), true); + assert_eq!(unknown.unwrap_err(), "a.b, b.b"); + let unknown = + ensure_no_common_unrecognized_keys(&proxy_unrecognized_keys, &unrecognized_keys); + assert_eq!(unknown.is_err(), true); + assert_eq!(unknown.unwrap_err(), "nosense, rocksdb.z"); + + // Common config can be persisted. + // Need run this test with ENGINE_LABEL_VALUE=tiflash, otherwise will fatal + // exit. 
+ let _ = std::fs::remove_file( + PathBuf::from_str(&config.storage.data_dir) + .unwrap() + .join(LAST_CONFIG_FILE), + ); + validate_and_persist_config(&mut config, true); + + // Will not override ProxyConfig + let proxy_config_new = ProxyConfig::from_file(path, None).unwrap(); + assert_eq!(proxy_config_new.raft_store.snap_handle_pool_size, 4); + } + + /// Test for basic address_proxy_config. + #[test] + fn test_validate_config() { + let mut file = tempfile::NamedTempFile::new().unwrap(); + let text = "[raftstore.aaa]\nbbb=2\n[server]\nengine-addr=\"1.2.3.4:5\"\n[raftstore]\nsnap-handle-pool-size=4\nclean-stale-ranges-tick=9999\n[nosense]\nfoo=2\n[rocksdb]\nmax-open-files = 111\nz=1"; + write!(file, "{}", text).unwrap(); + let path = file.path(); + let tmp_store_folder = tempfile::TempDir::new().unwrap(); + let tmp_last_config_path = tmp_store_folder.path().join(LAST_CONFIG_FILE); + std::fs::copy(path, tmp_last_config_path.as_path()).unwrap(); + get_last_config(tmp_store_folder.path().to_str().unwrap()); + + let mut unrecognized_keys: Vec = vec![]; + let mut config = TikvConfig::from_file(path, Some(&mut unrecognized_keys)).unwrap(); + assert_eq!(config.raft_store.clean_stale_ranges_tick, 9999); + address_proxy_config(&mut config, &ProxyConfig::default()); + let clean_stale_ranges_tick = + (10_000 / config.raft_store.region_worker_tick_interval.as_millis()) as usize; + assert_eq!( + config.raft_store.clean_stale_ranges_tick, + clean_stale_ranges_tick + ); + } + + #[test] + fn test_store_setup() { + let (mut cluster, pd_client) = new_mock_cluster(0, 3); + + // Add label to cluster + address_proxy_config(&mut cluster.cfg.tikv, &ProxyConfig::default()); + + // Try to start this node, return after persisted some keys. + let _ = cluster.run(); + // Not use start to avoid "start new node" branch. 
+ // let _ = cluster.start(); + let store_id = cluster.engines.keys().last().unwrap(); + let store = pd_client.get_store(*store_id).unwrap(); + println!("store {:?}", store); + assert!( + store + .get_labels() + .iter() + .find(|&x| x.key == "engine" && x.value == "tiflash") + .is_some() + ); + cluster.shutdown(); + } +}