Description
Bug report
- I confirm this is a bug with Supabase, not with my own application.
- I confirm I have searched the Docs, GitHub Discussions, and Discord.
Describe the bug
Moving on from the Getting Started example, I tried using a Postgres backend for state persistence and got the following error:
```
Error: EtlError { repr: Single(ErrorPayload { kind: SourceQueryFailed, description: "Table mappings loading failed", detail: Some("Failed to load table mappings from PostgreSQL: error returned from database: relation "etl.table_mappings" does not exist") ...
```
A quick grep shows that there are some migrations defined in `etl-replicator/migrations/20250827000000_base.sql`:
```sql
-- Replication state
create table etl.replication_state (
    ...
);
```
However, it is unclear to me how to actually trigger these migrations to get this up and running.
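For what it's worth, the timestamped filename looks like the sqlx migrations layout, so my best guess at a workaround is to apply the files out-of-band before starting the pipeline, something like the sketch below. This is an untested assumption on my part; it presumes sqlx with the `postgres`, `migrate`, and a runtime feature enabled, and the files from `etl-replicator/migrations` copied locally into `./migrations`:

```rust
// Untested sketch: apply the repo's migrations to the state database by
// hand, assuming they follow the sqlx layout. Requires sqlx with the
// "postgres", "migrate", and a runtime feature enabled, and the files
// from etl-replicator/migrations copied into ./migrations.
use sqlx::postgres::PgPoolOptions;

async fn apply_state_migrations() -> Result<(), Box<dyn std::error::Error>> {
    let pool = PgPoolOptions::new()
        .max_connections(1)
        .connect("postgres://postgres@localhost:5432/etl")
        .await?;
    // migrate! embeds ./migrations at compile time; run() applies any
    // migrations not yet recorded in the _sqlx_migrations table.
    sqlx::migrate!("./migrations").run(&pool).await?;
    Ok(())
}
```

Is something like this the intended setup, or is there a supported way to have the store create its own schema?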
To Reproduce
```toml
# Cargo.toml
[package]
name = "extracterator"
version = "0.1.0"
edition = "2024"

[dependencies]
etl = { git = "https://github.com/supabase/etl" }
etl-destinations = { git = "https://github.com/supabase/etl", features = ["iceberg"] }
# macros + rt are needed for #[tokio::main]
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
tracing = { version = "0.1.41", features = ["log"] }
tracing-subscriber = { version = "0.3.20", features = ["env-filter"] }
```

```rust
// main.rs
use etl::store::both::postgres::PostgresStore;
use etl::types::PipelineId;
use etl::{
    config::{BatchConfig, PgConnectionConfig, PipelineConfig, TlsConfig},
    pipeline::Pipeline,
};
use etl_destinations::iceberg::{DestinationNamespace, IcebergClient, IcebergDestination};
use std::collections::HashMap;
use tracing_subscriber::EnvFilter;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    tracing_subscriber::fmt()
        .with_env_filter(EnvFilter::from_default_env())
        .init();

    const ICEBERG_CATALOG_URL: &str = "http://localhost:8181/catalog";
    const WAREHOUSE_NAME: &str = "demo";
    const NAMESPACE_NAME: &str = "whatever";
    const PUBLICATION_NAME: &str = "my_publication";
    const PIPELINE_ID: PipelineId = 1;
    const POSTGRES_SOURCE_DB: &str = "extract";
    const POSTGRES_STORE_DB: &str = "etl";

    let client = IcebergClient::new_with_rest_catalog(
        ICEBERG_CATALOG_URL.to_string(),
        WAREHOUSE_NAME.to_string(),
        HashMap::new(),
    )
    .await
    .unwrap();

    let pg_store = PostgresStore::new(PIPELINE_ID, postgres_config(POSTGRES_STORE_DB));
    let namespace = DestinationNamespace::Single(NAMESPACE_NAME.to_string());
    let destination = IcebergDestination::new(client, namespace, pg_store.clone());

    let config = PipelineConfig {
        id: PIPELINE_ID,
        publication_name: PUBLICATION_NAME.into(),
        pg_connection: postgres_config(POSTGRES_SOURCE_DB),
        batch: BatchConfig {
            max_size: 1000,
            max_fill_ms: 5000,
        },
        table_error_retry_delay_ms: 10_000,
        table_error_retry_max_attempts: 5,
        max_table_sync_workers: 4,
    };

    let mut pipeline = Pipeline::new(config, pg_store, destination);
    pipeline.start().await?;
    pipeline.wait().await?;

    Ok(())
}

fn postgres_config(db_name: impl ToString) -> PgConnectionConfig {
    PgConnectionConfig {
        host: "localhost".into(),
        port: 5432,
        name: db_name.to_string(),
        username: "postgres".into(),
        password: None,
        tls: TlsConfig {
            enabled: false,
            trusted_root_certs: String::new(),
        },
    }
}
```

Expected behavior
The `etl` schema containing the needed tables should be created, and the program should sync my table data.
System information
- OS: macOS 14.8.2
- Postgres 18 as source database
- Rust 1.91
Additional context
The memory store and the Iceberg catalog integration seem to work fine, as far as I can tell. I'm pretty new to Iceberg, so I'm just messing around for now.
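For comparison, below is roughly the in-memory wiring from the Getting Started example that does work for me. Note that the `MemoryStore` module path is my best guess from the `PostgresStore` path, so treat this as a sketch rather than exact code:

```rust
// Rough sketch of the working variant: same pipeline wiring as above,
// but with the in-memory state store instead of PostgresStore.
// (Module path for MemoryStore is my best guess; adjust if it differs.)
use etl::config::PipelineConfig;
use etl::pipeline::Pipeline;
use etl::store::both::memory::MemoryStore;
use etl_destinations::iceberg::{DestinationNamespace, IcebergClient, IcebergDestination};

async fn run_with_memory_store(
    config: PipelineConfig,
    client: IcebergClient,
    namespace: DestinationNamespace,
) -> Result<(), Box<dyn std::error::Error>> {
    let store = MemoryStore::new();
    let destination = IcebergDestination::new(client, namespace, store.clone());
    let mut pipeline = Pipeline::new(config, store, destination);
    pipeline.start().await?;
    pipeline.wait().await?;
    Ok(())
}
```

With that variant the table data lands in the Iceberg catalog, which is why I suspect the only missing piece here is the state-schema migration step.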