Skip to content

Commit 85ea29a

Browse files
committed
feat(port): port from ent to core
This ports a number of changes from enterprise to core. Primarily this adds a new telemetry field for how influxdb3 has been started (aka invocation method). Most of the changes are to sync core code with the shape of ent code to make future ports easier. There are some error reporting improvements and logging improvements mixed in there too. * follows #26991
1 parent 114e752 commit 85ea29a

File tree

12 files changed

+459
-248
lines changed

12 files changed

+459
-248
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,14 +75,15 @@ byteorder = "1.3.4"
7575
bytes = "1.9"
7676
cargo_metadata = "0.19.2"
7777
chrono = "0.4"
78-
cron = "0.15"
7978
clap = { version = "4", features = ["derive", "env", "string"] }
79+
clap_builder = "4.5.47"
8080
clru = "0.6.2"
81-
owo-colors = "4.2.0"
8281
crc32fast = "1.2.0"
8382
criterion = { version = "0.5", features = ["html_reports"] }
83+
cron = "0.15"
8484
crossbeam-channel = "0.5.11"
8585
csv = "1.3.0"
86+
owo-colors = "4.2.0"
8687
# Use DataFusion fork (see below)
8788
datafusion = { version = "49" }
8889
dashmap = "6.1.0"

influxdb3/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ workspace = true
1919
[dependencies]
2020
# Core Crates
2121
authz.workspace = true
22+
clap_builder.workspace = true
2223
datafusion_util.workspace = true
2324
iox_query.workspace = true
2425
iox_time.workspace = true
@@ -30,8 +31,8 @@ panic_logging.workspace = true
3031
parquet_file.workspace = true
3132
tokio_metrics_bridge.workspace = true
3233
trace.workspace = true
33-
trace_http.workspace = true
3434
trace_exporters.workspace = true
35+
trace_http.workspace = true
3536
trogging.workspace = true
3637

3738
# Local Crates

influxdb3/src/commands/serve.rs

Lines changed: 64 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ use influxdb3_server::{
3333
use influxdb3_shutdown::{ShutdownManager, ShutdownToken, wait_for_signal};
3434
use influxdb3_sys_events::SysEventStore;
3535
use influxdb3_telemetry::{
36-
ProcessingEngineMetrics,
36+
ProcessingEngineMetrics, ServeInvocationMethod,
3737
store::{CreateTelemetryStoreArgs, TelemetryStore},
3838
};
3939
use influxdb3_wal::{Gen1Duration, WalConfig};
@@ -57,8 +57,8 @@ use observability_deps::tracing::*;
5757
use panic_logging::SendPanicsToTracing;
5858
use parquet_file::storage::{ParquetStorage, StorageId};
5959
use std::collections::HashMap;
60+
use std::str::FromStr;
6061
use std::{env, num::NonZeroUsize, sync::Arc, time::Duration};
61-
use std::{path::Path, str::FromStr};
6262
use std::{path::PathBuf, process::Command};
6363
use thiserror::Error;
6464
use tokio::net::TcpListener;
@@ -77,7 +77,6 @@ use super::helpers::DisableAuthzList;
7777
mod jemalloc;
7878

7979
/// The default name of the influxdb data directory
80-
#[allow(dead_code)]
8180
pub const DEFAULT_DATA_DIRECTORY_NAME: &str = ".influxdb3";
8281

8382
/// The default bind address for the HTTP API.
@@ -116,11 +115,11 @@ pub enum Error {
116115
#[error("invalid token: {0}")]
117116
InvalidToken(#[from] hex::FromHexError),
118117

119-
#[error("failed to initialized write buffer: {0}")]
118+
#[error("failed to initialized write buffer: {0:?}")]
120119
WriteBufferInit(#[source] anyhow::Error),
121120

122121
#[error("failed to initialize catalog: {0}")]
123-
InitializeCatalog(CatalogError),
122+
InitializeCatalog(#[source] CatalogError),
124123

125124
#[error("failed to initialize last cache: {0}")]
126125
InitializeLastCache(#[source] last_cache::Error),
@@ -145,8 +144,10 @@ pub enum Error {
145144
#[source] influxdb3_write::table_index_cache::TableIndexCacheError,
146145
),
147146

148-
#[error("Must set INFLUXDB3_NODE_IDENTIFIER_PREFIX to a valid string value")]
149-
NodeIdEnvVarMissing,
147+
#[error(
148+
"Must set INFLUXDB3_NODE_IDENTIFIER_PREFIX={0} to a valid env var value for the node id"
149+
)]
150+
NodeIdEnvVarMissing(String),
150151

151152
#[error(
152153
"Python environment initialization failed: {0}\nPlease ensure Python and either pip or uv package manager is installed"
@@ -171,7 +172,7 @@ fn wal_replay_concurrency_limit_default() -> String {
171172

172173
/// Try to keep all the memory size in MB instead of raw bytes, also allow
173174
/// them to be configured as a percentage of total memory using MemorySizeMb
174-
#[derive(Debug, clap::Parser)]
175+
#[derive(Clone, Debug, clap::Parser)]
175176
pub struct Config {
176177
/// object store options
177178
#[clap(flatten)]
@@ -195,32 +196,32 @@ pub struct Config {
195196

196197
/// Maximum size of HTTP requests.
197198
#[clap(
198-
long = "max-http-request-size",
199-
env = "INFLUXDB3_MAX_HTTP_REQUEST_SIZE",
200-
default_value = "10485760", // 10 MiB
201-
action,
199+
long = "max-http-request-size",
200+
env = "INFLUXDB3_MAX_HTTP_REQUEST_SIZE",
201+
default_value = "10485760", // 10 MiB
202+
action,
202203
)]
203204
pub max_http_request_size: usize,
204205

205206
/// The address on which InfluxDB will serve HTTP API requests
206207
#[clap(
207-
long = "http-bind",
208-
env = "INFLUXDB3_HTTP_BIND_ADDR",
209-
default_value = DEFAULT_HTTP_BIND_ADDR,
210-
action,
208+
long = "http-bind",
209+
env = "INFLUXDB3_HTTP_BIND_ADDR",
210+
default_value = DEFAULT_HTTP_BIND_ADDR,
211+
action,
211212
)]
212213
pub http_bind_address: SocketAddr,
213214

214215
/// Enable admin token recovery endpoint on the specified address.
215216
/// Use flag alone for default address (127.0.0.1:8182) or provide a custom address.
216217
/// WARNING: This endpoint allows unauthenticated admin token regeneration - use with caution!
217218
#[clap(
218-
long = "admin-token-recovery-http-bind",
219-
env = "INFLUXDB3_ADMIN_TOKEN_RECOVERY_HTTP_BIND_ADDR",
220-
num_args = 0..=1,
221-
default_missing_value = DEFAULT_ADMIN_TOKEN_RECOVERY_BIND_ADDR,
222-
help = "Enable admin token recovery endpoint. Use flag alone for default address (127.0.0.1:8182) or with value for custom address",
223-
action,
219+
long = "admin-token-recovery-http-bind",
220+
env = "INFLUXDB3_ADMIN_TOKEN_RECOVERY_HTTP_BIND_ADDR",
221+
num_args = 0..=1,
222+
default_missing_value = DEFAULT_ADMIN_TOKEN_RECOVERY_BIND_ADDR,
223+
help = "Enable admin token recovery endpoint. Use flag alone for default address (127.0.0.1:8182) or with value for custom address",
224+
action,
224225
)]
225226
pub admin_token_recovery_bind_address: Option<SocketAddr>,
226227

@@ -488,6 +489,17 @@ pub struct Config {
488489
)]
489490
pub telemetry_endpoint: String,
490491

492+
/// Information on how the serve command was used
493+
#[clap(
494+
long = "serve-invocation-method",
495+
env = "INFLUXDB3_SERVE_INVOCATION_METHOD",
496+
hide = true,
497+
value_parser = ServeInvocationMethod::parse,
498+
action
499+
)]
500+
#[arg(default_value_t = ServeInvocationMethod::Explicit)]
501+
pub serve_invocation_method: ServeInvocationMethod,
502+
491503
/// Set the limit for number of parquet files allowed in a query. Defaults
492504
/// to 432 which is about 3 days worth of files using default settings.
493505
/// This number can be increased to allow more files to be queried, but
@@ -520,7 +532,7 @@ pub struct Config {
520532
)]
521533
pub tcp_listener_file_path: Option<PathBuf>,
522534

523-
/// Provide a file path to write the address that the admin recovery endpoint mounted server is listening on to
535+
/// Provide a file path to write the address that the admin token recovery server is listening on to.
524536
///
525537
/// This is mainly intended for testing purposes and is not considered stable.
526538
#[clap(
@@ -576,16 +588,17 @@ pub fn fail_cluster_id(_: &str) -> Result<String, anyhow::Error> {
576588
}
577589

578590
#[derive(Clone, Debug, clap::Args)]
579-
#[group(required = true, multiple = false)]
591+
#[group(required = true)]
580592
pub struct NodeId {
581-
/// The node idendifier used as a prefix in all object store file paths. This should be unique
582-
/// for any InfluxDB 3 Enterprise servers that share the same object store configuration, i.e., the
593+
/// The node identifier used as a prefix in all object store file paths. This should be unique
594+
/// for any InfluxDB 3 Core servers that share the same object store configuration, i.e., the
583595
/// same bucket.
584596
#[clap(
585597
long = "node-id",
586598
// TODO: deprecate this alias in future version
587599
alias = "host-id",
588600
env = "INFLUXDB3_NODE_IDENTIFIER_PREFIX",
601+
group = "node_id",
589602
action
590603
)]
591604
pub prefix: Option<String>,
@@ -596,6 +609,7 @@ pub struct NodeId {
596609
#[clap(
597610
long = "node-id-from-env",
598611
env = "INFLUXDB3_NODE_IDENTIFIER_FROM_ENV",
612+
group = "node_id",
599613
action
600614
)]
601615
pub from_env_var: Option<String>,
@@ -610,7 +624,11 @@ impl NodeId {
610624
.clone()
611625
.expect(".from_env_var must be Some if .prefix is None"),
612626
)
613-
.map_err(|_| Error::NodeIdEnvVarMissing)
627+
.map_err(|_| {
628+
Error::NodeIdEnvVarMissing(
629+
self.from_env_var.clone().unwrap_or("missing".to_string()),
630+
)
631+
})
614632
},
615633
Ok,
616634
)
@@ -707,20 +725,6 @@ impl FromStr for ParquetCachePrunePercent {
707725
}
708726
}
709727

710-
/// If `p` does not exist, try to create it as a directory.
711-
///
712-
/// panic's if the directory does not exist and can not be created
713-
#[allow(dead_code)]
714-
fn ensure_directory_exists(p: &Path) {
715-
if !p.exists() {
716-
info!(
717-
p=%p.display(),
718-
"Creating directory",
719-
);
720-
std::fs::create_dir_all(p).expect("Could not create default directory");
721-
}
722-
}
723-
724728
pub async fn command(config: Config, user_params: HashMap<String, String>) -> Result<()> {
725729
let node_id = config.get_node_id()?;
726730

@@ -796,6 +800,7 @@ pub async fn command(config: Config, user_params: HashMap<String, String>) -> Re
796800

797801
// setup cached object store:
798802
let (object_store, parquet_cache) = if !config.disable_parquet_mem_cache {
803+
info!("initialising parquet cache");
799804
let (object_store, parquet_cache) = create_cached_obj_store_and_oracle(
800805
object_store,
801806
Arc::clone(&time_provider) as _,
@@ -842,6 +847,10 @@ pub async fn command(config: Config, user_params: HashMap<String, String>) -> Re
842847
"datafusion",
843848
tokio_datafusion_config
844849
.builder()
850+
.map(|mut builder| {
851+
builder.enable_all();
852+
builder
853+
})
845854
.map_err(Error::TokioRuntime)?,
846855
Arc::clone(&metrics),
847856
),
@@ -881,9 +890,10 @@ pub async fn command(config: Config, user_params: HashMap<String, String>) -> Re
881890
.with_jaeger_trace_context_header_name(
882891
config
883892
.tracing_config
884-
.traces_jaeger_trace_context_header_name,
893+
.traces_jaeger_trace_context_header_name
894+
.as_str(),
885895
)
886-
.with_jaeger_debug_name(config.tracing_config.traces_jaeger_debug_name);
896+
.with_jaeger_debug_name(config.tracing_config.traces_jaeger_debug_name.as_str());
887897

888898
// Create table index cache configuration from CLI arguments
889899
let table_index_cache_config = TableIndexCacheConfig {
@@ -925,6 +935,10 @@ pub async fn command(config: Config, user_params: HashMap<String, String>) -> Re
925935
retention_handler_token,
926936
)
927937
.await
938+
.inspect_err(|_e| {
939+
warn!("TableIndexCache initialization failed, continuing in degraded state.");
940+
warn!("Without TableIndexCache, object store cleanup for retention policies and hard deletes will temporarily be unable to proceed; compacted data and queries should not be affected.");
941+
})
928942
.unwrap_or(None);
929943

930944
// Initialize tokens from files if provided and auth is enabled
@@ -957,7 +971,7 @@ pub async fn command(config: Config, user_params: HashMap<String, String>) -> Re
957971
info!(instance_id = ?node_def.instance_id(), "catalog initialized");
958972

959973
let last_cache = LastCacheProvider::new_from_catalog_with_background_eviction(
960-
Arc::clone(&catalog) as _,
974+
Arc::clone(&catalog),
961975
config.last_cache_eviction_interval.into(),
962976
)
963977
.await
@@ -1053,6 +1067,7 @@ pub async fn command(config: Config, user_params: HashMap<String, String>) -> Re
10531067
persisted_files: Some(persisted_files),
10541068
telemetry_endpoint: &config.telemetry_endpoint,
10551069
disable_upload: config.disable_telemetry_upload,
1070+
serve_invocation_method: config.serve_invocation_method,
10561071
catalog_uuid: catalog.catalog_uuid().to_string(),
10571072
processing_engine_metrics: Arc::clone(&catalog) as Arc<dyn ProcessingEngineMetrics>,
10581073
})
@@ -1279,7 +1294,8 @@ pub async fn command(config: Config, user_params: HashMap<String, String>) -> Re
12791294
res = res.and(Err(Error::Server(error)));
12801295
}
12811296
}
1282-
}
1297+
},
1298+
// Admin token recovery endpoint has stopped
12831299
recovery_result = recovery_frontend => {
12841300
// Only process recovery endpoint results if it was actually enabled and active
12851301
if recovery_endpoint_enabled && recovery_endpoint_active {
@@ -1430,6 +1446,7 @@ struct TelemetryStoreSetupArgs<'a> {
14301446
telemetry_endpoint: &'a str,
14311447
disable_upload: bool,
14321448
catalog_uuid: String,
1449+
serve_invocation_method: ServeInvocationMethod,
14331450
processing_engine_metrics: Arc<dyn ProcessingEngineMetrics>,
14341451
}
14351452

@@ -1442,6 +1459,7 @@ async fn setup_telemetry_store(
14421459
telemetry_endpoint,
14431460
disable_upload,
14441461
catalog_uuid,
1462+
serve_invocation_method,
14451463
processing_engine_metrics,
14461464
}: TelemetryStoreSetupArgs<'_>,
14471465
) -> Arc<TelemetryStore> {
@@ -1470,6 +1488,7 @@ async fn setup_telemetry_store(
14701488
persisted_files: persisted_files.map(|p| p as _),
14711489
telemetry_endpoint: telemetry_endpoint.to_string(),
14721490
catalog_uuid,
1491+
serve_invocation_method,
14731492
processing_engine_metrics,
14741493
})
14751494
.await

influxdb3/src/commands/serve/cli_params.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@ const NON_SENSITIVE_PARAMS: &[&str] = &[
107107
// Telemetry
108108
"telemetry-endpoint",
109109
"disable-telemetry-upload",
110+
"serve-invocation-method",
110111
// TLS parameters
111112
"tls-minimum-version",
112113
// Python integration

0 commit comments

Comments
 (0)