Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -75,14 +75,15 @@ byteorder = "1.3.4"
bytes = "1.9"
cargo_metadata = "0.19.2"
chrono = "0.4"
cron = "0.15"
clap = { version = "4", features = ["derive", "env", "string"] }
clap_builder = "4.5.47"
clru = "0.6.2"
owo-colors = "4.2.0"
crc32fast = "1.2.0"
criterion = { version = "0.5", features = ["html_reports"] }
cron = "0.15"
crossbeam-channel = "0.5.11"
csv = "1.3.0"
owo-colors = "4.2.0"
# Use DataFusion fork (see below)
datafusion = { version = "49" }
dashmap = "6.1.0"
Expand Down
3 changes: 2 additions & 1 deletion influxdb3/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ workspace = true
[dependencies]
# Core Crates
authz.workspace = true
clap_builder.workspace = true
datafusion_util.workspace = true
iox_query.workspace = true
iox_time.workspace = true
Expand All @@ -30,8 +31,8 @@ panic_logging.workspace = true
parquet_file.workspace = true
tokio_metrics_bridge.workspace = true
trace.workspace = true
trace_http.workspace = true
trace_exporters.workspace = true
trace_http.workspace = true
trogging.workspace = true

# Local Crates
Expand Down
106 changes: 62 additions & 44 deletions influxdb3/src/commands/serve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use influxdb3_server::{
use influxdb3_shutdown::{ShutdownManager, ShutdownToken, wait_for_signal};
use influxdb3_sys_events::SysEventStore;
use influxdb3_telemetry::{
ProcessingEngineMetrics,
ProcessingEngineMetrics, ServeInvocationMethod,
store::{CreateTelemetryStoreArgs, TelemetryStore},
};
use influxdb3_wal::{Gen1Duration, WalConfig};
Expand All @@ -57,8 +57,8 @@ use observability_deps::tracing::*;
use panic_logging::SendPanicsToTracing;
use parquet_file::storage::{ParquetStorage, StorageId};
use std::collections::HashMap;
use std::str::FromStr;
use std::{env, num::NonZeroUsize, sync::Arc, time::Duration};
use std::{path::Path, str::FromStr};
use std::{path::PathBuf, process::Command};
use thiserror::Error;
use tokio::net::TcpListener;
Expand All @@ -77,7 +77,6 @@ use super::helpers::DisableAuthzList;
mod jemalloc;

/// The default name of the influxdb data directory
#[allow(dead_code)]
pub const DEFAULT_DATA_DIRECTORY_NAME: &str = ".influxdb3";

/// The default bind address for the HTTP API.
Expand Down Expand Up @@ -116,11 +115,11 @@ pub enum Error {
#[error("invalid token: {0}")]
InvalidToken(#[from] hex::FromHexError),

#[error("failed to initialized write buffer: {0}")]
#[error("failed to initialized write buffer: {0:?}")]
Copy link

Copilot AI Nov 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo in error message: "initialized" should be "initialize" to match tense with similar error messages (lines 121, 124, 127).

Suggested change
#[error("failed to initialized write buffer: {0:?}")]
#[error("failed to initialize write buffer: {0:?}")]

Copilot uses AI. Check for mistakes.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in 4a38489

WriteBufferInit(#[source] anyhow::Error),

#[error("failed to initialize catalog: {0}")]
InitializeCatalog(CatalogError),
InitializeCatalog(#[source] CatalogError),

#[error("failed to initialize last cache: {0}")]
InitializeLastCache(#[source] last_cache::Error),
Expand All @@ -145,8 +144,10 @@ pub enum Error {
#[source] influxdb3_write::table_index_cache::TableIndexCacheError,
),

#[error("Must set INFLUXDB3_NODE_IDENTIFIER_PREFIX to a valid string value")]
NodeIdEnvVarMissing,
#[error(
"Must set INFLUXDB3_NODE_IDENTIFIER_PREFIX={0} to a valid env var value for the node id"
)]
NodeIdEnvVarMissing(String),

#[error(
"Python environment initialization failed: {0}\nPlease ensure Python and either pip or uv package manager is installed"
Expand All @@ -171,7 +172,7 @@ fn wal_replay_concurrency_limit_default() -> String {

/// Try to keep all the memory size in MB instead of raw bytes, also allow
/// them to be configured as a percentage of total memory using MemorySizeMb
#[derive(Debug, clap::Parser)]
#[derive(Clone, Debug, clap::Parser)]
pub struct Config {
/// object store options
#[clap(flatten)]
Expand All @@ -195,32 +196,32 @@ pub struct Config {

/// Maximum size of HTTP requests.
#[clap(
long = "max-http-request-size",
env = "INFLUXDB3_MAX_HTTP_REQUEST_SIZE",
default_value = "10485760", // 10 MiB
action,
long = "max-http-request-size",
env = "INFLUXDB3_MAX_HTTP_REQUEST_SIZE",
default_value = "10485760", // 10 MiB
action,
)]
pub max_http_request_size: usize,

/// The address on which InfluxDB will serve HTTP API requests
#[clap(
long = "http-bind",
env = "INFLUXDB3_HTTP_BIND_ADDR",
default_value = DEFAULT_HTTP_BIND_ADDR,
action,
long = "http-bind",
env = "INFLUXDB3_HTTP_BIND_ADDR",
default_value = DEFAULT_HTTP_BIND_ADDR,
action,
)]
pub http_bind_address: SocketAddr,

/// Enable admin token recovery endpoint on the specified address.
/// Use flag alone for default address (127.0.0.1:8182) or provide a custom address.
/// WARNING: This endpoint allows unauthenticated admin token regeneration - use with caution!
#[clap(
long = "admin-token-recovery-http-bind",
env = "INFLUXDB3_ADMIN_TOKEN_RECOVERY_HTTP_BIND_ADDR",
num_args = 0..=1,
default_missing_value = DEFAULT_ADMIN_TOKEN_RECOVERY_BIND_ADDR,
help = "Enable admin token recovery endpoint. Use flag alone for default address (127.0.0.1:8182) or with value for custom address",
action,
long = "admin-token-recovery-http-bind",
env = "INFLUXDB3_ADMIN_TOKEN_RECOVERY_HTTP_BIND_ADDR",
num_args = 0..=1,
default_missing_value = DEFAULT_ADMIN_TOKEN_RECOVERY_BIND_ADDR,
help = "Enable admin token recovery endpoint. Use flag alone for default address (127.0.0.1:8182) or with value for custom address",
action,
)]
pub admin_token_recovery_bind_address: Option<SocketAddr>,

Expand Down Expand Up @@ -488,6 +489,17 @@ pub struct Config {
)]
pub telemetry_endpoint: String,

/// Information on how the serve command was used
#[clap(
long = "serve-invocation-method",
env = "INFLUXDB3_SERVE_INVOCATION_METHOD",
hide = true,
value_parser = ServeInvocationMethod::parse,
action
)]
#[arg(default_value_t = ServeInvocationMethod::Explicit)]
pub serve_invocation_method: ServeInvocationMethod,

/// Set the limit for number of parquet files allowed in a query. Defaults
/// to 432 which is about 3 days worth of files using default settings.
/// This number can be increased to allow more files to be queried, but
Expand Down Expand Up @@ -520,7 +532,7 @@ pub struct Config {
)]
pub tcp_listener_file_path: Option<PathBuf>,

/// Provide a file path to write the address that the admin recovery endpoint mounted server is listening on to
/// Provide a file path to write the address that the admin token recovery server is listening on to.
///
/// This is mainly intended for testing purposes and is not considered stable.
#[clap(
Expand Down Expand Up @@ -576,16 +588,17 @@ pub fn fail_cluster_id(_: &str) -> Result<String, anyhow::Error> {
}

#[derive(Clone, Debug, clap::Args)]
#[group(required = true, multiple = false)]
#[group(required = true)]
pub struct NodeId {
/// The node idendifier used as a prefix in all object store file paths. This should be unique
/// for any InfluxDB 3 Enterprise servers that share the same object store configuration, i.e., the
/// The node identifier used as a prefix in all object store file paths. This should be unique
/// for any InfluxDB 3 Core servers that share the same object store configuration, i.e., the
/// same bucket.
#[clap(
long = "node-id",
// TODO: deprecate this alias in future version
alias = "host-id",
env = "INFLUXDB3_NODE_IDENTIFIER_PREFIX",
group = "node_id",
action
)]
pub prefix: Option<String>,
Expand All @@ -596,6 +609,7 @@ pub struct NodeId {
#[clap(
long = "node-id-from-env",
env = "INFLUXDB3_NODE_IDENTIFIER_FROM_ENV",
group = "node_id",
action
)]
pub from_env_var: Option<String>,
Expand All @@ -610,7 +624,11 @@ impl NodeId {
.clone()
.expect(".from_env_var must be Some if .prefix is None"),
)
.map_err(|_| Error::NodeIdEnvVarMissing)
.map_err(|_| {
Error::NodeIdEnvVarMissing(
self.from_env_var.clone().unwrap_or("missing".to_string()),
)
})
},
Ok,
)
Expand Down Expand Up @@ -707,20 +725,6 @@ impl FromStr for ParquetCachePrunePercent {
}
}

/// If `p` does not exist, try to create it as a directory.
///
/// panic's if the directory does not exist and can not be created
#[allow(dead_code)]
fn ensure_directory_exists(p: &Path) {
if !p.exists() {
info!(
p=%p.display(),
"Creating directory",
);
std::fs::create_dir_all(p).expect("Could not create default directory");
}
}

pub async fn command(config: Config, user_params: HashMap<String, String>) -> Result<()> {
let node_id = config.get_node_id()?;

Expand Down Expand Up @@ -796,6 +800,7 @@ pub async fn command(config: Config, user_params: HashMap<String, String>) -> Re

// setup cached object store:
let (object_store, parquet_cache) = if !config.disable_parquet_mem_cache {
info!("initialising parquet cache");
let (object_store, parquet_cache) = create_cached_obj_store_and_oracle(
object_store,
Arc::clone(&time_provider) as _,
Expand Down Expand Up @@ -842,6 +847,10 @@ pub async fn command(config: Config, user_params: HashMap<String, String>) -> Re
"datafusion",
tokio_datafusion_config
.builder()
.map(|mut builder| {
builder.enable_all();
builder
Comment on lines +850 to +852
Copy link

Copilot AI Nov 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The builder.enable_all() call is used for the first executor but not for the write_path_executor (line 882). This inconsistency could lead to different runtime behavior between the two executors. Consider applying enable_all() to both executors if that's the intended behavior, or document why they should differ.

Copilot uses AI. Check for mistakes.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FWIW, the docs on enable_all say:

    /// Enables both I/O and time drivers.
    ///
    /// Doing this is a shorthand for calling `enable_io` and `enable_time`
    /// individually. If additional components are added to Tokio in the future,
    /// `enable_all` will include these future components.
    ///
    /// # Examples
    ///
    /// ```
    /// # #[cfg(not(target_family = "wasm"))]
    /// # {
    /// use tokio::runtime;
    ///
    /// let rt = runtime::Builder::new_multi_thread()
    ///     .enable_all()
    ///     .build()
    ///     .unwrap();
    /// # }
    /// ```
    pub fn enable_all(&mut self) -> &mut Self {
  • Here (where is using enable_all) is for initializing the executor created for the query path.
  • Where it does not use enable_all is for initializing the executor created for the write path.

I don't know why we would not enable IO on one but not the other.

The code is currently the same between Core and Enterprise, so I am hesitant to follow this suggestion on this PR, but it may warrant further investigation.

})
.map_err(Error::TokioRuntime)?,
Arc::clone(&metrics),
),
Expand Down Expand Up @@ -925,6 +934,10 @@ pub async fn command(config: Config, user_params: HashMap<String, String>) -> Re
retention_handler_token,
)
.await
.inspect_err(|_e| {
warn!("TableIndexCache initialization failed, continuing in degraded state.");
warn!("Without TableIndexCache, object store cleanup for retention policies and hard deletes will temporarily be unable to proceed; compacted data and queries should not be affected.");
})
.unwrap_or(None);

// Initialize tokens from files if provided and auth is enabled
Expand Down Expand Up @@ -957,7 +970,7 @@ pub async fn command(config: Config, user_params: HashMap<String, String>) -> Re
info!(instance_id = ?node_def.instance_id(), "catalog initialized");

let last_cache = LastCacheProvider::new_from_catalog_with_background_eviction(
Arc::clone(&catalog) as _,
Arc::clone(&catalog),
config.last_cache_eviction_interval.into(),
)
.await
Expand Down Expand Up @@ -1053,6 +1066,7 @@ pub async fn command(config: Config, user_params: HashMap<String, String>) -> Re
persisted_files: Some(persisted_files),
telemetry_endpoint: &config.telemetry_endpoint,
disable_upload: config.disable_telemetry_upload,
serve_invocation_method: config.serve_invocation_method,
catalog_uuid: catalog.catalog_uuid().to_string(),
processing_engine_metrics: Arc::clone(&catalog) as Arc<dyn ProcessingEngineMetrics>,
})
Expand Down Expand Up @@ -1277,9 +1291,10 @@ pub async fn command(config: Config, user_params: HashMap<String, String>) -> Re
Err(error) => {
error!("HTTP/gRPC error");
res = res.and(Err(Error::Server(error)));
}
},
}
}
},
// Admin token recovery endpoint has stopped
recovery_result = recovery_frontend => {
// Only process recovery endpoint results if it was actually enabled and active
if recovery_endpoint_enabled && recovery_endpoint_active {
Expand Down Expand Up @@ -1430,6 +1445,7 @@ struct TelemetryStoreSetupArgs<'a> {
telemetry_endpoint: &'a str,
disable_upload: bool,
catalog_uuid: String,
serve_invocation_method: ServeInvocationMethod,
processing_engine_metrics: Arc<dyn ProcessingEngineMetrics>,
}

Expand All @@ -1442,6 +1458,7 @@ async fn setup_telemetry_store(
telemetry_endpoint,
disable_upload,
catalog_uuid,
serve_invocation_method,
processing_engine_metrics,
}: TelemetryStoreSetupArgs<'_>,
) -> Arc<TelemetryStore> {
Expand Down Expand Up @@ -1470,6 +1487,7 @@ async fn setup_telemetry_store(
persisted_files: persisted_files.map(|p| p as _),
telemetry_endpoint: telemetry_endpoint.to_string(),
catalog_uuid,
serve_invocation_method,
processing_engine_metrics,
})
.await
Expand Down
1 change: 1 addition & 0 deletions influxdb3/src/commands/serve/cli_params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ const NON_SENSITIVE_PARAMS: &[&str] = &[
// Telemetry
"telemetry-endpoint",
"disable-telemetry-upload",
"serve-invocation-method",
// TLS parameters
"tls-minimum-version",
// Python integration
Expand Down
Loading