Skip to content

Commit 114e752

Browse files
authored
feat: port from enterprise to core (#26991)
1 parent cc637e3 commit 114e752

File tree

23 files changed

+952
-499
lines changed

23 files changed

+952
-499
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -231,6 +231,7 @@ inherits = "release"
231231
codegen-units = 16
232232
lto = false
233233
incremental = true
234+
debug = "line-tables-only"
234235

235236
# This profile extends the `quick-release` profile with debuginfo turned on in order to
236237
# produce more human friendly symbols for profiling tools

influxdb3/src/commands/serve.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,13 @@ pub enum Error {
147147

148148
#[error("Must set INFLUXDB3_NODE_IDENTIFIER_PREFIX to a valid string value")]
149149
NodeIdEnvVarMissing,
150+
151+
#[error(
152+
"Python environment initialization failed: {0}\nPlease ensure Python and either pip or uv package manager is installed"
153+
)]
154+
PythonEnvironmentInitialization(
155+
#[source] influxdb3_processing_engine::environment::PluginEnvironmentError,
156+
),
150157
}
151158

152159
pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -1104,7 +1111,8 @@ pub async fn command(config: Config, user_params: HashMap<String, String>) -> Re
11041111
Arc::clone(&time_provider) as _,
11051112
sys_events_store,
11061113
)
1107-
.await;
1114+
.await
1115+
.map_err(Error::PythonEnvironmentInitialization)?;
11081116

11091117
// Update query executor with processing engine reference
11101118
query_executor.set_processing_engine(Arc::clone(&processing_engine));

influxdb3_authz/src/lib.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,13 @@ pub enum AccessRequest {
3636
Admin,
3737
}
3838

39-
#[derive(Debug, Clone, Copy, thiserror::Error)]
39+
#[derive(Debug, Clone, thiserror::Error)]
4040
pub enum ResourceAuthorizationError {
4141
#[error("unauthorized to perform requested action with the token")]
4242
Unauthorized,
43+
44+
#[error("resource type not supported, {0}")]
45+
ResourceNotSupported(String),
4346
}
4447

4548
#[derive(Debug, thiserror::Error)]

influxdb3_cache/src/distinct_cache/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
mod cache;
44
pub use cache::{CacheError, CreateDistinctCacheArgs};
55
mod provider;
6-
pub use provider::{DistinctCacheProvider, ProviderError};
6+
pub use provider::{DistinctCacheProvider, ProviderError, background_catalog_update};
77
mod table_function;
88
pub use table_function::DISTINCT_CACHE_UDTF_NAME;
99
pub use table_function::DistinctCacheFunction;

influxdb3_cache/src/distinct_cache/provider.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -271,7 +271,9 @@ impl DistinctCacheProvider {
271271
}
272272
}
273273

274-
fn background_catalog_update(
274+
/// Background task that listens for catalog updates and maintains distinct caches.
275+
/// This should be started explicitly after creating the provider.
276+
pub fn background_catalog_update(
275277
provider: Arc<DistinctCacheProvider>,
276278
mut subscription: CatalogUpdateReceiver,
277279
) -> tokio::task::JoinHandle<()> {

influxdb3_cache/src/last_cache/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ mod cache;
66
pub use cache::CreateLastCacheArgs;
77
mod metrics;
88
mod provider;
9-
pub use provider::LastCacheProvider;
9+
pub use provider::{LastCacheProvider, background_catalog_update};
1010
mod table_function;
1111
use schema::InfluxColumnType;
1212
pub use table_function::{LAST_CACHE_UDTF_NAME, LastCacheFunction};

influxdb3_cache/src/last_cache/provider.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -354,7 +354,9 @@ impl LastCacheProvider {
354354
}
355355
}
356356

357-
fn background_catalog_update(
357+
/// Background task that listens for catalog updates and maintains last value caches.
358+
/// This should be started explicitly after creating the provider.
359+
pub fn background_catalog_update(
358360
provider: Arc<LastCacheProvider>,
359361
mut subscription: CatalogUpdateReceiver,
360362
) -> tokio::task::JoinHandle<()> {

influxdb3_catalog/src/catalog.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -219,7 +219,7 @@ impl<I: CatalogId, R: CatalogResource> Repository<I, R> {
219219
pub(crate) fn update(&mut self, id: I, resource: impl Into<Arc<R>>) -> Result<()> {
220220
let resource = resource.into();
221221
if !self.id_exists(&id) {
222-
return Err(CatalogError::NotFound);
222+
return Err(CatalogError::NotFound(format!("catalog id: {}", id)));
223223
}
224224
self.id_name_map.insert(id, resource.name());
225225
self.repo.insert(id, resource);
@@ -383,7 +383,7 @@ impl TokenRepository {
383383
let token_id = self
384384
.repo
385385
.name_to_id(&token_name)
386-
.ok_or_else(|| CatalogError::NotFound)?;
386+
.ok_or_else(|| CatalogError::NotFound(token_name))?;
387387
self.repo.remove(&token_id);
388388
self.hash_lookup_map.remove_by_left(&token_id);
389389
Ok(())

influxdb3_catalog/src/catalog/versions/v1/update.rs

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -211,7 +211,7 @@ impl Catalog {
211211
self.catalog_update_with_retry(|| {
212212
let time_ns = self.time_provider.now().timestamp_nanos();
213213
let Some(node) = self.node(node_id) else {
214-
return Err(crate::CatalogError::NotFound);
214+
return Err(crate::CatalogError::NotFound(node_id.to_string()));
215215
};
216216
if !node.is_running() {
217217
return Err(crate::CatalogError::NodeAlreadyStopped {
@@ -259,16 +259,16 @@ impl Catalog {
259259

260260
pub async fn soft_delete_database(
261261
&self,
262-
name: &str,
262+
db_name: &str,
263263
hard_delete_time: HardDeletionTime,
264264
) -> Result<OrderedCatalogBatch> {
265265
self.catalog_update_with_retry(|| {
266-
if name == INTERNAL_DB_NAME {
266+
if db_name == INTERNAL_DB_NAME {
267267
return Err(CatalogError::CannotDeleteInternalDatabase);
268268
};
269269

270-
let Some(db) = self.db_schema(name) else {
271-
return Err(CatalogError::NotFound);
270+
let Some(db) = self.db_schema(db_name) else {
271+
return Err(CatalogError::NotFound(db_name.to_string()));
272272
};
273273

274274
// If the request specifies the default hard-delete time, and the schema has an existing hard_delete_time,
@@ -338,10 +338,10 @@ impl Catalog {
338338
) -> Result<OrderedCatalogBatch> {
339339
self.catalog_update_with_retry(|| {
340340
let Some(db) = self.db_schema(db_name) else {
341-
return Err(CatalogError::NotFound);
341+
return Err(CatalogError::NotFound(db_name.to_string()));
342342
};
343343
let Some(tbl_def) = db.table_definition(table_name) else {
344-
return Err(CatalogError::NotFound);
344+
return Err(CatalogError::NotFound(table_name.to_string()));
345345
};
346346

347347
// If the request specifies the default hard-delete time, and the schema has an existing hard_delete_time,
@@ -402,10 +402,10 @@ impl Catalog {
402402
info!(?db_id, ?table_id, "Hard delete table.");
403403
self.catalog_update_with_retry(|| {
404404
let Some(db) = self.db_schema_by_id(db_id) else {
405-
return Err(CatalogError::NotFound);
405+
return Err(CatalogError::NotFound(format!("database id: {}", db_id)));
406406
};
407407
let Some(_table_def) = db.table_definition_by_id(table_id) else {
408-
return Err(CatalogError::NotFound);
408+
return Err(CatalogError::NotFound(format!("table id: {}", table_id)));
409409
};
410410

411411
let deletion_time = self.time_provider.now().timestamp_nanos();
@@ -431,7 +431,7 @@ impl Catalog {
431431
info!(?db_id, "Hard delete database.");
432432
self.catalog_update_with_retry(|| {
433433
let Some(db) = self.db_schema_by_id(db_id) else {
434-
return Err(CatalogError::NotFound);
434+
return Err(CatalogError::NotFound(format!("database id: {}", db_id)));
435435
};
436436

437437
// Prevent deletion of internal database
@@ -461,10 +461,10 @@ impl Catalog {
461461
info!(db_name, table_name, cache_name = ?cache_name, "create distinct cache");
462462
self.catalog_update_with_retry(|| {
463463
let Some(db) = self.db_schema(db_name) else {
464-
return Err(CatalogError::NotFound);
464+
return Err(CatalogError::NotFound(db_name.to_string()));
465465
};
466466
let Some(mut tbl) = db.table_definition(table_name) else {
467-
return Err(CatalogError::NotFound);
467+
return Err(CatalogError::NotFound(table_name.to_string()));
468468
};
469469
if columns.is_empty() {
470470
return Err(CatalogError::invalid_configuration(
@@ -546,10 +546,10 @@ impl Catalog {
546546
info!(db_name, table_name, cache_name = ?cache_name, "create last cache");
547547
self.catalog_update_with_retry(|| {
548548
let Some(db) = self.db_schema(db_name) else {
549-
return Err(CatalogError::NotFound);
549+
return Err(CatalogError::NotFound(db_name.to_string()));
550550
};
551551
let Some(mut tbl) = db.table_definition(table_name) else {
552-
return Err(CatalogError::NotFound);
552+
return Err(CatalogError::NotFound(table_name.to_string()));
553553
};
554554

555555
fn is_valid_last_cache_key_col(def: &ColumnDefinition) -> bool {
@@ -667,7 +667,7 @@ impl Catalog {
667667
info!(db_name, trigger_name, "create processing engine trigger");
668668
self.catalog_update_with_retry(|| {
669669
let Some(mut db) = self.db_schema(db_name) else {
670-
return Err(CatalogError::NotFound);
670+
return Err(CatalogError::NotFound(db_name.to_string()));
671671
};
672672
let trigger = TriggerSpecificationDefinition::from_string_rep(trigger_specification)?;
673673
if db.processing_engine_triggers.contains_name(trigger_name) {
@@ -709,7 +709,7 @@ impl Catalog {
709709
"create new retention policy"
710710
);
711711
let Some(db) = self.db_schema(db_name) else {
712-
return Err(CatalogError::NotFound);
712+
return Err(CatalogError::NotFound(db_name.to_string()));
713713
};
714714
self.catalog_update_with_retry(|| {
715715
Ok(CatalogBatch::database(
@@ -814,7 +814,7 @@ impl DatabaseCatalogTransaction {
814814
column_type: FieldDataType,
815815
) -> Result<ColumnId> {
816816
let Some(table_def) = self.database_schema.table_definition(table_name) else {
817-
return Err(CatalogError::NotFound);
817+
return Err(CatalogError::NotFound(table_name.to_string()));
818818
};
819819
match table_def.column_definition(column_name) {
820820
Some(def) if def.data_type == column_type.into() => Ok(def.id),

influxdb3_catalog/src/catalog/versions/v2.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3959,13 +3959,13 @@ mod tests {
39593959
let fake_db_id = DbId::from(999);
39603960
let fake_table_id = TableId::from(123);
39613961
let result = catalog.hard_delete_table(&fake_db_id, &fake_table_id).await;
3962-
assert!(matches!(result, Err(CatalogError::NotFound)));
3962+
assert!(matches!(result, Err(CatalogError::NotFound(_))));
39633963

39643964
// Create database but try to delete non-existent table
39653965
catalog.create_database("test").await.unwrap();
39663966
let db_id = catalog.db_name_to_id("test").unwrap();
39673967
let result = catalog.hard_delete_table(&db_id, &fake_table_id).await;
3968-
assert!(matches!(result, Err(CatalogError::NotFound)));
3968+
assert!(matches!(result, Err(CatalogError::NotFound(_))));
39693969
}
39703970

39713971
#[test_log::test(tokio::test)]
@@ -4149,7 +4149,7 @@ mod tests {
41494149
// Try to delete non-existent database
41504150
let fake_db_id = DbId::from(999);
41514151
let result = catalog.hard_delete_database(&fake_db_id).await;
4152-
assert!(matches!(result, Err(CatalogError::NotFound)));
4152+
assert!(matches!(result, Err(CatalogError::NotFound(_))));
41534153
}
41544154

41554155
#[test_log::test(tokio::test)]

0 commit comments

Comments (0)