-
Notifications
You must be signed in to change notification settings - Fork 3.7k
feat(port): port from ent to core #26996
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -33,7 +33,7 @@ use influxdb3_server::{ | |
| use influxdb3_shutdown::{ShutdownManager, ShutdownToken, wait_for_signal}; | ||
| use influxdb3_sys_events::SysEventStore; | ||
| use influxdb3_telemetry::{ | ||
| ProcessingEngineMetrics, | ||
| ProcessingEngineMetrics, ServeInvocationMethod, | ||
| store::{CreateTelemetryStoreArgs, TelemetryStore}, | ||
| }; | ||
| use influxdb3_wal::{Gen1Duration, WalConfig}; | ||
|
|
@@ -57,8 +57,8 @@ use observability_deps::tracing::*; | |
| use panic_logging::SendPanicsToTracing; | ||
| use parquet_file::storage::{ParquetStorage, StorageId}; | ||
| use std::collections::HashMap; | ||
| use std::str::FromStr; | ||
| use std::{env, num::NonZeroUsize, sync::Arc, time::Duration}; | ||
| use std::{path::Path, str::FromStr}; | ||
| use std::{path::PathBuf, process::Command}; | ||
| use thiserror::Error; | ||
| use tokio::net::TcpListener; | ||
|
|
@@ -77,7 +77,6 @@ use super::helpers::DisableAuthzList; | |
| mod jemalloc; | ||
|
|
||
| /// The default name of the influxdb data directory | ||
| #[allow(dead_code)] | ||
| pub const DEFAULT_DATA_DIRECTORY_NAME: &str = ".influxdb3"; | ||
|
|
||
| /// The default bind address for the HTTP API. | ||
|
|
@@ -116,11 +115,11 @@ pub enum Error { | |
| #[error("invalid token: {0}")] | ||
| InvalidToken(#[from] hex::FromHexError), | ||
|
|
||
| #[error("failed to initialized write buffer: {0}")] | ||
| #[error("failed to initialized write buffer: {0:?}")] | ||
| WriteBufferInit(#[source] anyhow::Error), | ||
|
|
||
| #[error("failed to initialize catalog: {0}")] | ||
| InitializeCatalog(CatalogError), | ||
| InitializeCatalog(#[source] CatalogError), | ||
|
|
||
| #[error("failed to initialize last cache: {0}")] | ||
| InitializeLastCache(#[source] last_cache::Error), | ||
|
|
@@ -145,8 +144,10 @@ pub enum Error { | |
| #[source] influxdb3_write::table_index_cache::TableIndexCacheError, | ||
| ), | ||
|
|
||
| #[error("Must set INFLUXDB3_NODE_IDENTIFIER_PREFIX to a valid string value")] | ||
| NodeIdEnvVarMissing, | ||
| #[error( | ||
| "Must set INFLUXDB3_NODE_IDENTIFIER_PREFIX={0} to a valid env var value for the node id" | ||
| )] | ||
| NodeIdEnvVarMissing(String), | ||
|
|
||
| #[error( | ||
| "Python environment initialization failed: {0}\nPlease ensure Python and either pip or uv package manager is installed" | ||
|
|
@@ -171,7 +172,7 @@ fn wal_replay_concurrency_limit_default() -> String { | |
|
|
||
| /// Try to keep all the memory size in MB instead of raw bytes, also allow | ||
| /// them to be configured as a percentage of total memory using MemorySizeMb | ||
| #[derive(Debug, clap::Parser)] | ||
| #[derive(Clone, Debug, clap::Parser)] | ||
| pub struct Config { | ||
| /// object store options | ||
| #[clap(flatten)] | ||
|
|
@@ -195,32 +196,32 @@ pub struct Config { | |
|
|
||
| /// Maximum size of HTTP requests. | ||
| #[clap( | ||
| long = "max-http-request-size", | ||
| env = "INFLUXDB3_MAX_HTTP_REQUEST_SIZE", | ||
| default_value = "10485760", // 10 MiB | ||
| action, | ||
| long = "max-http-request-size", | ||
| env = "INFLUXDB3_MAX_HTTP_REQUEST_SIZE", | ||
| default_value = "10485760", // 10 MiB | ||
| action, | ||
| )] | ||
| pub max_http_request_size: usize, | ||
|
|
||
| /// The address on which InfluxDB will serve HTTP API requests | ||
| #[clap( | ||
| long = "http-bind", | ||
| env = "INFLUXDB3_HTTP_BIND_ADDR", | ||
| default_value = DEFAULT_HTTP_BIND_ADDR, | ||
| action, | ||
| long = "http-bind", | ||
| env = "INFLUXDB3_HTTP_BIND_ADDR", | ||
| default_value = DEFAULT_HTTP_BIND_ADDR, | ||
| action, | ||
| )] | ||
| pub http_bind_address: SocketAddr, | ||
|
|
||
| /// Enable admin token recovery endpoint on the specified address. | ||
| /// Use flag alone for default address (127.0.0.1:8182) or provide a custom address. | ||
| /// WARNING: This endpoint allows unauthenticated admin token regeneration - use with caution! | ||
| #[clap( | ||
| long = "admin-token-recovery-http-bind", | ||
| env = "INFLUXDB3_ADMIN_TOKEN_RECOVERY_HTTP_BIND_ADDR", | ||
| num_args = 0..=1, | ||
| default_missing_value = DEFAULT_ADMIN_TOKEN_RECOVERY_BIND_ADDR, | ||
| help = "Enable admin token recovery endpoint. Use flag alone for default address (127.0.0.1:8182) or with value for custom address", | ||
| action, | ||
| long = "admin-token-recovery-http-bind", | ||
| env = "INFLUXDB3_ADMIN_TOKEN_RECOVERY_HTTP_BIND_ADDR", | ||
| num_args = 0..=1, | ||
| default_missing_value = DEFAULT_ADMIN_TOKEN_RECOVERY_BIND_ADDR, | ||
| help = "Enable admin token recovery endpoint. Use flag alone for default address (127.0.0.1:8182) or with value for custom address", | ||
| action, | ||
| )] | ||
| pub admin_token_recovery_bind_address: Option<SocketAddr>, | ||
|
|
||
|
|
@@ -488,6 +489,17 @@ pub struct Config { | |
| )] | ||
| pub telemetry_endpoint: String, | ||
|
|
||
| /// Information on how the serve command was used | ||
| #[clap( | ||
| long = "serve-invocation-method", | ||
| env = "INFLUXDB3_SERVE_INVOCATION_METHOD", | ||
| hide = true, | ||
| value_parser = ServeInvocationMethod::parse, | ||
| action | ||
| )] | ||
| #[arg(default_value_t = ServeInvocationMethod::Explicit)] | ||
| pub serve_invocation_method: ServeInvocationMethod, | ||
|
|
||
| /// Set the limit for number of parquet files allowed in a query. Defaults | ||
| /// to 432 which is about 3 days worth of files using default settings. | ||
| /// This number can be increased to allow more files to be queried, but | ||
|
|
@@ -520,7 +532,7 @@ pub struct Config { | |
| )] | ||
| pub tcp_listener_file_path: Option<PathBuf>, | ||
|
|
||
| /// Provide a file path to write the address that the admin recovery endpoint mounted server is listening on to | ||
| /// Provide a file path to write the address that the admin token recovery server is listening on to. | ||
| /// | ||
| /// This is mainly intended for testing purposes and is not considered stable. | ||
| #[clap( | ||
|
|
@@ -576,16 +588,17 @@ pub fn fail_cluster_id(_: &str) -> Result<String, anyhow::Error> { | |
| } | ||
|
|
||
| #[derive(Clone, Debug, clap::Args)] | ||
| #[group(required = true, multiple = false)] | ||
| #[group(required = true)] | ||
| pub struct NodeId { | ||
| /// The node idendifier used as a prefix in all object store file paths. This should be unique | ||
| /// for any InfluxDB 3 Enterprise servers that share the same object store configuration, i.e., the | ||
| /// The node identifier used as a prefix in all object store file paths. This should be unique | ||
| /// for any InfluxDB 3 Core servers that share the same object store configuration, i.e., the | ||
| /// same bucket. | ||
| #[clap( | ||
| long = "node-id", | ||
| // TODO: deprecate this alias in future version | ||
| alias = "host-id", | ||
| env = "INFLUXDB3_NODE_IDENTIFIER_PREFIX", | ||
| group = "node_id", | ||
| action | ||
| )] | ||
| pub prefix: Option<String>, | ||
|
|
@@ -596,6 +609,7 @@ pub struct NodeId { | |
| #[clap( | ||
| long = "node-id-from-env", | ||
| env = "INFLUXDB3_NODE_IDENTIFIER_FROM_ENV", | ||
| group = "node_id", | ||
| action | ||
| )] | ||
| pub from_env_var: Option<String>, | ||
|
|
@@ -610,7 +624,11 @@ impl NodeId { | |
| .clone() | ||
| .expect(".from_env_var must be Some if .prefix is None"), | ||
| ) | ||
| .map_err(|_| Error::NodeIdEnvVarMissing) | ||
| .map_err(|_| { | ||
| Error::NodeIdEnvVarMissing( | ||
| self.from_env_var.clone().unwrap_or("missing".to_string()), | ||
| ) | ||
| }) | ||
| }, | ||
| Ok, | ||
| ) | ||
|
|
@@ -707,20 +725,6 @@ impl FromStr for ParquetCachePrunePercent { | |
| } | ||
| } | ||
|
|
||
| /// If `p` does not exist, try to create it as a directory. | ||
| /// | ||
| /// panic's if the directory does not exist and can not be created | ||
| #[allow(dead_code)] | ||
| fn ensure_directory_exists(p: &Path) { | ||
| if !p.exists() { | ||
| info!( | ||
| p=%p.display(), | ||
| "Creating directory", | ||
| ); | ||
| std::fs::create_dir_all(p).expect("Could not create default directory"); | ||
| } | ||
| } | ||
|
|
||
| pub async fn command(config: Config, user_params: HashMap<String, String>) -> Result<()> { | ||
| let node_id = config.get_node_id()?; | ||
|
|
||
|
|
@@ -796,6 +800,7 @@ pub async fn command(config: Config, user_params: HashMap<String, String>) -> Re | |
|
|
||
| // setup cached object store: | ||
| let (object_store, parquet_cache) = if !config.disable_parquet_mem_cache { | ||
| info!("initialising parquet cache"); | ||
| let (object_store, parquet_cache) = create_cached_obj_store_and_oracle( | ||
| object_store, | ||
| Arc::clone(&time_provider) as _, | ||
|
|
@@ -842,6 +847,10 @@ pub async fn command(config: Config, user_params: HashMap<String, String>) -> Re | |
| "datafusion", | ||
| tokio_datafusion_config | ||
| .builder() | ||
| .map(|mut builder| { | ||
| builder.enable_all(); | ||
| builder | ||
|
Comment on lines +850 to +852
|
||
| }) | ||
| .map_err(Error::TokioRuntime)?, | ||
| Arc::clone(&metrics), | ||
| ), | ||
|
|
@@ -925,6 +934,10 @@ pub async fn command(config: Config, user_params: HashMap<String, String>) -> Re | |
| retention_handler_token, | ||
| ) | ||
| .await | ||
| .inspect_err(|_e| { | ||
| warn!("TableIndexCache initialization failed, continuing in degraded state."); | ||
| warn!("Without TableIndexCache, object store cleanup for retention policies and hard deletes will temporarily be unable to proceed; compacted data and queries should not be affected."); | ||
| }) | ||
| .unwrap_or(None); | ||
|
|
||
| // Initialize tokens from files if provided and auth is enabled | ||
|
|
@@ -957,7 +970,7 @@ pub async fn command(config: Config, user_params: HashMap<String, String>) -> Re | |
| info!(instance_id = ?node_def.instance_id(), "catalog initialized"); | ||
|
|
||
| let last_cache = LastCacheProvider::new_from_catalog_with_background_eviction( | ||
| Arc::clone(&catalog) as _, | ||
| Arc::clone(&catalog), | ||
| config.last_cache_eviction_interval.into(), | ||
| ) | ||
| .await | ||
|
|
@@ -1053,6 +1066,7 @@ pub async fn command(config: Config, user_params: HashMap<String, String>) -> Re | |
| persisted_files: Some(persisted_files), | ||
| telemetry_endpoint: &config.telemetry_endpoint, | ||
| disable_upload: config.disable_telemetry_upload, | ||
| serve_invocation_method: config.serve_invocation_method, | ||
| catalog_uuid: catalog.catalog_uuid().to_string(), | ||
| processing_engine_metrics: Arc::clone(&catalog) as Arc<dyn ProcessingEngineMetrics>, | ||
| }) | ||
|
|
@@ -1277,9 +1291,10 @@ pub async fn command(config: Config, user_params: HashMap<String, String>) -> Re | |
| Err(error) => { | ||
| error!("HTTP/gRPC error"); | ||
| res = res.and(Err(Error::Server(error))); | ||
| } | ||
| }, | ||
| } | ||
| } | ||
| }, | ||
| // Admin token recovery endpoint has stopped | ||
| recovery_result = recovery_frontend => { | ||
| // Only process recovery endpoint results if it was actually enabled and active | ||
| if recovery_endpoint_enabled && recovery_endpoint_active { | ||
|
|
@@ -1430,6 +1445,7 @@ struct TelemetryStoreSetupArgs<'a> { | |
| telemetry_endpoint: &'a str, | ||
| disable_upload: bool, | ||
| catalog_uuid: String, | ||
| serve_invocation_method: ServeInvocationMethod, | ||
| processing_engine_metrics: Arc<dyn ProcessingEngineMetrics>, | ||
| } | ||
|
|
||
|
|
@@ -1442,6 +1458,7 @@ async fn setup_telemetry_store( | |
| telemetry_endpoint, | ||
| disable_upload, | ||
| catalog_uuid, | ||
| serve_invocation_method, | ||
| processing_engine_metrics, | ||
| }: TelemetryStoreSetupArgs<'_>, | ||
| ) -> Arc<TelemetryStore> { | ||
|
|
@@ -1470,6 +1487,7 @@ async fn setup_telemetry_store( | |
| persisted_files: persisted_files.map(|p| p as _), | ||
| telemetry_endpoint: telemetry_endpoint.to_string(), | ||
| catalog_uuid, | ||
| serve_invocation_method, | ||
| processing_engine_metrics, | ||
| }) | ||
| .await | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Typo in error message: "failed to initialized write buffer" should read "failed to initialize write buffer", matching the tense of the similar error messages (lines 121, 124, 127).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Addressed in 4a38489