From 2018fd55b8f8fd5e41a0ee9c4634044a57d96b76 Mon Sep 17 00:00:00 2001 From: vikash390 Date: Fri, 19 Sep 2025 17:09:21 +0530 Subject: [PATCH 01/13] feat: integration test for incremental sync --- utils/testutils/test_utils.go | 92 ++++++++++++++++++++++++++++------- 1 file changed, 74 insertions(+), 18 deletions(-) diff --git a/utils/testutils/test_utils.go b/utils/testutils/test_utils.go index 2fa839545..8622c63a2 100644 --- a/utils/testutils/test_utils.go +++ b/utils/testutils/test_utils.go @@ -114,7 +114,7 @@ func discoverCommand(config TestConfig) string { return fmt.Sprintf("/test-olake/build.sh driver-%s discover --config %s", config.Driver, config.SourcePath) } -// TODO: check if we can remove namespace from being passed as a parameter and use a common namespace for all drivers +// update normalization=true for selected streams under selected_streams. by name func updateStreamsCommand(config TestConfig, namespace string, stream []string, isBackfill bool) string { if len(stream) == 0 { return "" @@ -138,6 +138,23 @@ func updateStreamsCommand(config TestConfig, namespace string, stream []string, return jqExpr } +// set sync_mode and cursor_field for a specific stream object in streams[] by namespace+name +func setStreamSyncModeCommand(config TestConfig, namespace, streamName, syncMode, cursorField string, stateRequired bool) string { + tmpCatalog := fmt.Sprintf("/tmp/%s_set_mode_streams.json", config.Driver) + // --argjson is used for boolean state; map/select pattern updates nested array members + return fmt.Sprintf( + `jq --arg ns "%s" --arg name "%s" --arg mode "%s" --arg cursor "%s" --argjson state %t '.streams = (.streams | map(if .stream.namespace == $ns and .stream.name == $name then (.stream.sync_mode = $mode | .stream.cursor_field = $cursor | .stream.state_required = $state) | . else . end))' %s > %s && mv %s %s`, + namespace, streamName, syncMode, cursorField, stateRequired, + config.CatalogPath, tmpCatalog, tmpCatalog, config.CatalogPath, + ) +} + +// reset state file so incremental can perform initial load (equivalent to full load on first run) +func resetStateFileCommand(config TestConfig) string { + // Ensure the state is clean irrespective of previous CDC run + return fmt.Sprintf(`rm -f %s; echo '{}' > %s`, config.StatePath, config.StatePath) +} + // to get backfill streams from cdc streams e.g. "demo_cdc" -> "demo" func GetBackfillStreamsFromCDC(cdcStreams []string) []string { backfillStreams := []string{} @@ -270,6 +287,21 @@ func (cfg *IntegrationTest) TestIntegration(t *testing.T) { t.Logf("Enabled normalization in %s", cfg.TestConfig.CatalogPath) + // Helper to run sync and verify + runSync := func(c testcontainers.Container, useState bool, operation, opSymbol string, schema map[string]interface{}) error { + cmd := syncCommand(*cfg.TestConfig, useState) + if useState && operation != "" { + cfg.ExecuteQuery(ctx, t, []string{currentTestTable}, operation, false) + } + if code, out, err := utils.ExecCommand(ctx, c, cmd); err != nil || code != 0 { + return fmt.Errorf("sync failed (%d): %s\n%s", code, err, out) + } + t.Logf("Sync successful for %s driver", cfg.TestConfig.Driver) + VerifyIcebergSync(t, currentTestTable, cfg.IcebergDB, cfg.DataTypeSchema, schema, opSymbol, cfg.TestConfig.Driver) + return nil + } + + // 3. 
Phase A: Full load + CDC testCases := []struct { syncMode string operation string @@ -306,22 +338,6 @@ func (cfg *IntegrationTest) TestIntegration(t *testing.T) { dummySchema: nil, }, } - - runSync := func(c testcontainers.Container, useState bool, operation, opSymbol string, schema map[string]interface{}) error { - cmd := syncCommand(*cfg.TestConfig, useState) - if useState && operation != "" { - cfg.ExecuteQuery(ctx, t, []string{currentTestTable}, operation, false) - } - - if code, out, err := utils.ExecCommand(ctx, c, cmd); err != nil || code != 0 { - return fmt.Errorf("sync failed (%d): %s\n%s", code, err, out) - } - t.Logf("Sync successful for %s driver", cfg.TestConfig.Driver) - VerifyIcebergSync(t, currentTestTable, cfg.IcebergDB, cfg.DataTypeSchema, schema, opSymbol, cfg.TestConfig.Driver) - return nil - } - - // 3. Run Sync command and verify records in Iceberg for _, test := range testCases { t.Logf("Running test for: %s", test.syncMode) if err := runSync(c, test.useState, test.operation, test.opSymbol, test.dummySchema); err != nil { @@ -329,7 +345,47 @@ func (cfg *IntegrationTest) TestIntegration(t *testing.T) { } } - // 4. Clean up + // 4. Phase B: Full load + Incremental (switch streams.json cdc -> incremental, set cursor_field = "id") + // Reset table to isolate incremental scenario + cfg.ExecuteQuery(ctx, t, []string{currentTestTable}, "drop", false) + cfg.ExecuteQuery(ctx, t, []string{currentTestTable}, "create", false) + cfg.ExecuteQuery(ctx, t, []string{currentTestTable}, "clean", false) + cfg.ExecuteQuery(ctx, t, []string{currentTestTable}, "add", false) + + // Ensure normalization remains on for selected stream + streamUpdateCmd = updateStreamsCommand(*cfg.TestConfig, cfg.Namespace, []string{currentTestTable}, true) + if code, out, err := utils.ExecCommand(ctx, c, streamUpdateCmd); err != nil || code != 0 { + return fmt.Errorf("failed to enable normalization in streams.json for incremental (%d): %s\n%s", + code, err, out, + ) + } + + // Patch: sync_mode = incremental, cursor_field = "id", state_required = true + incPatch := setStreamSyncModeCommand(*cfg.TestConfig, cfg.Namespace, currentTestTable, "incremental", "id", true) + if code, out, err := utils.ExecCommand(ctx, c, incPatch); err != nil || code != 0 { + return fmt.Errorf("failed to patch streams.json for incremental (%d): %s\n%s", code, err, out) + } + + // Reset state so initial incremental behaves like a first full incremental load + resetState := resetStateFileCommand(*cfg.TestConfig) + if code, out, err := utils.ExecCommand(ctx, c, resetState); err != nil || code != 0 { + return fmt.Errorf("failed to reset state for incremental (%d): %s\n%s", code, err, out) + } + + // Initial incremental run (equivalent to full on first run) + t.Log("Running Incremental - initial load") + if err := runSync(c, true, "", "i", cfg.ExpectedData); err != nil { + return err + } + + // Delta incremental: add new rows and sync again + t.Log("Running Incremental - delta load (inserts)") + cfg.ExecuteQuery(ctx, t, []string{currentTestTable}, "insert", false) + if err := runSync(c, true, "", "i", cfg.ExpectedData); err != nil { + return err + } + + // 5. 
Clean up cfg.ExecuteQuery(ctx, t, []string{currentTestTable}, "drop", false) t.Logf("%s sync test-container clean up", cfg.TestConfig.Driver) return nil From 9d6304f38fa841fbb685e4720ad86229943de346 Mon Sep 17 00:00:00 2001 From: vikash390 Date: Wed, 24 Sep 2025 13:35:35 +0530 Subject: [PATCH 02/13] fix: cursor field for each driver --- drivers/mongodb/internal/mon_test.go | 1 + drivers/mysql/internal/mysql_test.go | 1 + drivers/postgres/internal/postgres_test.go | 1 + utils/testutils/test_utils.go | 7 ++++--- 4 files changed, 7 insertions(+), 3 deletions(-) diff --git a/drivers/mongodb/internal/mon_test.go b/drivers/mongodb/internal/mon_test.go index 84498fbcb..12ae12ffc 100644 --- a/drivers/mongodb/internal/mon_test.go +++ b/drivers/mongodb/internal/mon_test.go @@ -17,6 +17,7 @@ func TestMongodbIntegration(t *testing.T) { DataTypeSchema: MongoToIcebergSchema, ExecuteQuery: ExecuteQuery, IcebergDB: "mongodb_olake_mongodb_test", + CursorField: "_id", } testConfig.TestIntegration(t) } diff --git a/drivers/mysql/internal/mysql_test.go b/drivers/mysql/internal/mysql_test.go index ba421d8b9..452a60b89 100644 --- a/drivers/mysql/internal/mysql_test.go +++ b/drivers/mysql/internal/mysql_test.go @@ -17,6 +17,7 @@ func TestMySQLIntegration(t *testing.T) { DataTypeSchema: MySQLToIcebergSchema, ExecuteQuery: ExecuteQuery, IcebergDB: "mysql_olake_mysql_test", + CursorField: "id", } testConfig.TestIntegration(t) } diff --git a/drivers/postgres/internal/postgres_test.go b/drivers/postgres/internal/postgres_test.go index 1773c8ffb..52b729d11 100644 --- a/drivers/postgres/internal/postgres_test.go +++ b/drivers/postgres/internal/postgres_test.go @@ -18,6 +18,7 @@ func TestPostgresIntegration(t *testing.T) { DataTypeSchema: PostgresToIcebergSchema, ExecuteQuery: ExecuteQuery, IcebergDB: "postgres_postgres_public", + CursorField: "col_bigserial", } testConfig.TestIntegration(t) } diff --git a/utils/testutils/test_utils.go b/utils/testutils/test_utils.go index 8622c63a2..af765b1cc 100644 --- a/utils/testutils/test_utils.go +++ b/utils/testutils/test_utils.go @@ -37,6 +37,7 @@ type IntegrationTest struct { Namespace string ExecuteQuery func(ctx context.Context, t *testing.T, streams []string, operation string, fileConfig bool) IcebergDB string + CursorField string } type PerformanceTest struct { @@ -361,7 +362,7 @@ func (cfg *IntegrationTest) TestIntegration(t *testing.T) { } // Patch: sync_mode = incremental, cursor_field = "id", state_required = true - incPatch := setStreamSyncModeCommand(*cfg.TestConfig, cfg.Namespace, currentTestTable, "incremental", "id", true) + incPatch := setStreamSyncModeCommand(*cfg.TestConfig, cfg.Namespace, currentTestTable, "incremental", cfg.CursorField, true) if code, out, err := utils.ExecCommand(ctx, c, incPatch); err != nil || code != 0 { return fmt.Errorf("failed to patch streams.json for incremental (%d): %s\n%s", code, err, out) } @@ -374,14 +375,14 @@ func (cfg *IntegrationTest) TestIntegration(t *testing.T) { // Initial incremental run (equivalent to full on first run) t.Log("Running Incremental - initial load") - if err := runSync(c, true, "", "i", cfg.ExpectedData); err != nil { + if err := runSync(c, true, "", "r", cfg.ExpectedData); err != nil { return err } // Delta incremental: add new rows and sync again t.Log("Running Incremental - delta load (inserts)") cfg.ExecuteQuery(ctx, t, []string{currentTestTable}, "insert", false) - if err := runSync(c, true, "", "i", cfg.ExpectedData); err != nil { + if err := runSync(c, true, "", "u", cfg.ExpectedData); err != nil { 
return err } From 8ec5e2c905820326dd0bff937dede75468a0692c Mon Sep 17 00:00:00 2001 From: vikash390 Date: Wed, 24 Sep 2025 19:00:40 +0530 Subject: [PATCH 03/13] feat: add incremental test with filter --- drivers/mongodb/internal/mon_test.go | 1 + drivers/mysql/internal/mysql_test.go | 1 + drivers/postgres/internal/postgres_test.go | 1 + utils/testutils/test_utils.go | 52 +++++++++++++++++----- 4 files changed, 45 insertions(+), 10 deletions(-) diff --git a/drivers/mongodb/internal/mon_test.go b/drivers/mongodb/internal/mon_test.go index 12ae12ffc..6612686e6 100644 --- a/drivers/mongodb/internal/mon_test.go +++ b/drivers/mongodb/internal/mon_test.go @@ -18,6 +18,7 @@ func TestMongodbIntegration(t *testing.T) { ExecuteQuery: ExecuteQuery, IcebergDB: "mongodb_olake_mongodb_test", CursorField: "_id", + PartitionRegex: "/{_id,identity}", } testConfig.TestIntegration(t) } diff --git a/drivers/mysql/internal/mysql_test.go b/drivers/mysql/internal/mysql_test.go index 452a60b89..62a195bf5 100644 --- a/drivers/mysql/internal/mysql_test.go +++ b/drivers/mysql/internal/mysql_test.go @@ -18,6 +18,7 @@ func TestMySQLIntegration(t *testing.T) { ExecuteQuery: ExecuteQuery, IcebergDB: "mysql_olake_mysql_test", CursorField: "id", + PartitionRegex: "/{id,identity}", } testConfig.TestIntegration(t) } diff --git a/drivers/postgres/internal/postgres_test.go b/drivers/postgres/internal/postgres_test.go index 52b729d11..d43a95b5f 100644 --- a/drivers/postgres/internal/postgres_test.go +++ b/drivers/postgres/internal/postgres_test.go @@ -19,6 +19,7 @@ func TestPostgresIntegration(t *testing.T) { ExecuteQuery: ExecuteQuery, IcebergDB: "postgres_postgres_public", CursorField: "col_bigserial", + PartitionRegex: "/{col_bigserial,identity}", } testConfig.TestIntegration(t) } diff --git a/utils/testutils/test_utils.go b/utils/testutils/test_utils.go index af765b1cc..32f7bed12 100644 --- a/utils/testutils/test_utils.go +++ b/utils/testutils/test_utils.go @@ -38,6 +38,7 @@ type IntegrationTest struct { ExecuteQuery func(ctx context.Context, t *testing.T, streams []string, operation string, fileConfig bool) IcebergDB string CursorField string + PartitionRegex string } type PerformanceTest struct { @@ -46,6 +47,7 @@ type PerformanceTest struct { BackfillStreams []string CDCStreams []string ExecuteQuery func(ctx context.Context, t *testing.T, streams []string, operation string, fileConfig bool) + PartitionRegex string } type benchmarkStats struct { @@ -116,7 +118,7 @@ func discoverCommand(config TestConfig) string { } // update normalization=true for selected streams under selected_streams. 
by name -func updateStreamsCommand(config TestConfig, namespace string, stream []string, isBackfill bool) string { +func updateStreamsCommand(config TestConfig, namespace, partitionRegex string, stream []string, isBackfill bool) string { if len(stream) == 0 { return "" } @@ -127,10 +129,11 @@ func updateStreamsCommand(config TestConfig, namespace string, stream []string, condition := strings.Join(streamConditions, " or ") tmpCatalog := fmt.Sprintf("/tmp/%s_%s_streams.json", config.Driver, utils.Ternary(isBackfill, "backfill", "cdc").(string)) jqExpr := fmt.Sprintf( - `jq '.selected_streams = { "%s": (.selected_streams["%s"] | map(select(%s) | .normalization = true)) }' %s > %s && mv %s %s`, + `jq '.selected_streams = { "%s": (.selected_streams["%s"] | map(select(%s) | .normalization = true | .partition_regex = "%s")) }' %s > %s && mv %s %s`, namespace, namespace, condition, + partitionRegex, config.CatalogPath, tmpCatalog, tmpCatalog, @@ -279,14 +282,14 @@ func (cfg *IntegrationTest) TestIntegration(t *testing.T) { // `jq '(.selected_streams[][] | .normalization) = true' %s > /tmp/streams.json && mv /tmp/streams.json %s`, // cfg.TestConfig.CatalogPath, cfg.TestConfig.CatalogPath, // ) - streamUpdateCmd := updateStreamsCommand(*cfg.TestConfig, cfg.Namespace, []string{currentTestTable}, true) + streamUpdateCmd := updateStreamsCommand(*cfg.TestConfig, cfg.Namespace, cfg.PartitionRegex, []string{currentTestTable}, true) if code, out, err := utils.ExecCommand(ctx, c, streamUpdateCmd); err != nil || code != 0 { return fmt.Errorf("failed to enable normalization in streams.json (%d): %s\n%s", code, err, out, ) } - t.Logf("Enabled normalization in %s", cfg.TestConfig.CatalogPath) + t.Logf("Enabled normalization and added partition regex in %s", cfg.TestConfig.CatalogPath) // Helper to run sync and verify runSync := func(c testcontainers.Container, useState bool, operation, opSymbol string, schema map[string]interface{}) error { @@ -298,7 +301,7 @@ func (cfg *IntegrationTest) TestIntegration(t *testing.T) { return fmt.Errorf("sync failed (%d): %s\n%s", code, err, out) } t.Logf("Sync successful for %s driver", cfg.TestConfig.Driver) - VerifyIcebergSync(t, currentTestTable, cfg.IcebergDB, cfg.DataTypeSchema, schema, opSymbol, cfg.TestConfig.Driver) + VerifyIcebergSync(t, currentTestTable, cfg.IcebergDB, cfg.DataTypeSchema, schema, opSymbol, cfg.PartitionRegex, cfg.TestConfig.Driver) return nil } @@ -354,7 +357,7 @@ func (cfg *IntegrationTest) TestIntegration(t *testing.T) { cfg.ExecuteQuery(ctx, t, []string{currentTestTable}, "add", false) // Ensure normalization remains on for selected stream - streamUpdateCmd = updateStreamsCommand(*cfg.TestConfig, cfg.Namespace, []string{currentTestTable}, true) + streamUpdateCmd = updateStreamsCommand(*cfg.TestConfig, cfg.Namespace, cfg.PartitionRegex, []string{currentTestTable}, true) if code, out, err := utils.ExecCommand(ctx, c, streamUpdateCmd); err != nil || code != 0 { return fmt.Errorf("failed to enable normalization in streams.json for incremental (%d): %s\n%s", code, err, out, @@ -411,7 +414,7 @@ func (cfg *IntegrationTest) TestIntegration(t *testing.T) { } // verifyIcebergSync verifies that data was correctly synchronized to Iceberg -func VerifyIcebergSync(t *testing.T, tableName, icebergDB string, datatypeSchema map[string]string, schema map[string]interface{}, opSymbol, driver string) { +func VerifyIcebergSync(t *testing.T, tableName, icebergDB string, datatypeSchema map[string]string, schema map[string]interface{}, opSymbol, partitionRegex, driver 
string) { t.Helper() ctx := context.Background() spark, err := sql.NewSessionBuilder().Remote(sparkConnectAddress).Build(ctx) @@ -482,7 +485,36 @@ func VerifyIcebergSync(t *testing.T, tableName, icebergDB string, datatypeSchema require.Equal(t, expectedIceType, iceType, "Data type mismatch for column %s: expected %s, got %s", col, expectedIceType, iceType) } - t.Logf("Verified datatypes in Iceberg after sync") + // Skip if no partitionRegex provided + if partitionRegex == "" { + t.Log("No partitionRegex provided, skipping partition verification") + return + } + + // Execute SHOW CREATE TABLE + createQuery := fmt.Sprintf("SHOW CREATE TABLE %s.%s.%s", icebergCatalog, icebergDB, tableName) + createDf, err := spark.Sql(ctx, createQuery) + require.NoError(t, err, "Failed to SHOW CREATE TABLE on Iceberg table") + + // Collect and concatenate the CREATE TABLE statement + createRows, err := createDf.Collect(ctx) + require.NoError(t, err, "Failed to collect SHOW CREATE TABLE from Iceberg") + require.NotEmpty(t, createRows, "No CREATE TABLE statement returned") + var createStmt strings.Builder + for _, row := range createRows { + createStmt.WriteString(row.Value("createtab_stmt").(string) + "\n") + } + fullStmt := createStmt.String() + + // Parse partitionRegex (e.g., "/{id,identity}" -> ["id", "identity"]) + regex := strings.TrimPrefix(partitionRegex, "/{") + regex = strings.TrimSuffix(regex, "}") + partitionCols := strings.Split(regex, ",") + expectedPartition := fmt.Sprintf("PARTITIONED BY (%s)", strings.Join(partitionCols, ", ")) + + // Verify partition spec + require.Contains(t, fullStmt, expectedPartition, "Partition spec mismatch: expected '%s' in CREATE TABLE, got:\n%s", expectedPartition, fullStmt) + t.Logf("Verified Iceberg partition spec: %s", expectedPartition) } func (cfg *PerformanceTest) TestPerformance(t *testing.T) { @@ -558,7 +590,7 @@ func (cfg *PerformanceTest) TestPerformance(t *testing.T) { } t.Log("(backfill) discover completed") - updateStreamsCmd := updateStreamsCommand(*cfg.TestConfig, cfg.Namespace, cfg.BackfillStreams, true) + updateStreamsCmd := updateStreamsCommand(*cfg.TestConfig, cfg.Namespace, cfg.PartitionRegex, cfg.BackfillStreams, true) if code, _, err := utils.ExecCommand(ctx, c, updateStreamsCmd); err != nil || code != 0 { return fmt.Errorf("failed to update streams: %s", err) } @@ -592,7 +624,7 @@ func (cfg *PerformanceTest) TestPerformance(t *testing.T) { } t.Log("(cdc) discover completed") - updateStreamsCmd := updateStreamsCommand(*cfg.TestConfig, cfg.Namespace, cfg.CDCStreams, false) + updateStreamsCmd := updateStreamsCommand(*cfg.TestConfig, cfg.Namespace, cfg.PartitionRegex, cfg.CDCStreams, false) if code, _, err := utils.ExecCommand(ctx, c, updateStreamsCmd); err != nil || code != 0 { return fmt.Errorf("failed to update streams: %s", err) } From 35a35ffce4ab09b17c7f6f8f4ee24ce4bb46bc78 Mon Sep 17 00:00:00 2001 From: vikash390 Date: Wed, 24 Sep 2025 21:28:04 +0530 Subject: [PATCH 04/13] fix: partition verification --- utils/testutils/test_utils.go | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/utils/testutils/test_utils.go b/utils/testutils/test_utils.go index 32f7bed12..fe86bc965 100644 --- a/utils/testutils/test_utils.go +++ b/utils/testutils/test_utils.go @@ -491,12 +491,9 @@ func VerifyIcebergSync(t *testing.T, tableName, icebergDB string, datatypeSchema return } - // Execute SHOW CREATE TABLE createQuery := fmt.Sprintf("SHOW CREATE TABLE %s.%s.%s", icebergCatalog, icebergDB, tableName) createDf, err := spark.Sql(ctx, 
createQuery) require.NoError(t, err, "Failed to SHOW CREATE TABLE on Iceberg table") - - // Collect and concatenate the CREATE TABLE statement createRows, err := createDf.Collect(ctx) require.NoError(t, err, "Failed to collect SHOW CREATE TABLE from Iceberg") require.NotEmpty(t, createRows, "No CREATE TABLE statement returned") @@ -505,14 +502,10 @@ func VerifyIcebergSync(t *testing.T, tableName, icebergDB string, datatypeSchema createStmt.WriteString(row.Value("createtab_stmt").(string) + "\n") } fullStmt := createStmt.String() - - // Parse partitionRegex (e.g., "/{id,identity}" -> ["id", "identity"]) - regex := strings.TrimPrefix(partitionRegex, "/{") + regex := strings.TrimPrefix(string(partitionRegex[0]), "/{") regex = strings.TrimSuffix(regex, "}") partitionCols := strings.Split(regex, ",") expectedPartition := fmt.Sprintf("PARTITIONED BY (%s)", strings.Join(partitionCols, ", ")) - - // Verify partition spec require.Contains(t, fullStmt, expectedPartition, "Partition spec mismatch: expected '%s' in CREATE TABLE, got:\n%s", expectedPartition, fullStmt) t.Logf("Verified Iceberg partition spec: %s", expectedPartition) } From bac32fdc8f23a6f1f86f44ebd5230281a2a65776 Mon Sep 17 00:00:00 2001 From: vikash390 Date: Thu, 25 Sep 2025 12:26:13 +0530 Subject: [PATCH 05/13] fix: partition column verify --- utils/testutils/test_utils.go | 50 ++++++++++++++++++++++++----------- 1 file changed, 34 insertions(+), 16 deletions(-) diff --git a/utils/testutils/test_utils.go b/utils/testutils/test_utils.go index fe86bc965..4c552ada7 100644 --- a/utils/testutils/test_utils.go +++ b/utils/testutils/test_utils.go @@ -491,23 +491,41 @@ func VerifyIcebergSync(t *testing.T, tableName, icebergDB string, datatypeSchema return } - createQuery := fmt.Sprintf("SHOW CREATE TABLE %s.%s.%s", icebergCatalog, icebergDB, tableName) - createDf, err := spark.Sql(ctx, createQuery) - require.NoError(t, err, "Failed to SHOW CREATE TABLE on Iceberg table") - createRows, err := createDf.Collect(ctx) - require.NoError(t, err, "Failed to collect SHOW CREATE TABLE from Iceberg") - require.NotEmpty(t, createRows, "No CREATE TABLE statement returned") - var createStmt strings.Builder - for _, row := range createRows { - createStmt.WriteString(row.Value("createtab_stmt").(string) + "\n") + // Parse expected partitions from partitionRegex + // Example: "/{_id,identity}" -> Column: _id, Transform: identity + clean := strings.TrimPrefix(partitionRegex, "/{") + clean = strings.TrimSuffix(clean, "}") + parts := strings.Split(clean, ",") + expectedCol := strings.TrimSpace(parts[0]) + expectedTransform := "identity" // default + if len(parts) > 1 { + expectedTransform = strings.TrimSpace(parts[1]) } - fullStmt := createStmt.String() - regex := strings.TrimPrefix(string(partitionRegex[0]), "/{") - regex = strings.TrimSuffix(regex, "}") - partitionCols := strings.Split(regex, ",") - expectedPartition := fmt.Sprintf("PARTITIONED BY (%s)", strings.Join(partitionCols, ", ")) - require.Contains(t, fullStmt, expectedPartition, "Partition spec mismatch: expected '%s' in CREATE TABLE, got:\n%s", expectedPartition, fullStmt) - t.Logf("Verified Iceberg partition spec: %s", expectedPartition) + + // Query Iceberg partitions metadata table + partitionQuery := fmt.Sprintf("SELECT partition, transform FROM %s.%s.%s.partitions", icebergCatalog, icebergDB, tableName) + df, err := spark.Sql(ctx, partitionQuery) + require.NoError(t, err, "Failed to query Iceberg partitions metadata") + rows, err := df.Collect(ctx) + require.NoError(t, err, "Failed to collect 
partitions metadata") + require.NotEmpty(t, rows, "No partitions found in Iceberg table") + + // Build map of actual partitions + actual := make(map[string]string) + for _, row := range rows { + col := row.Value("partition").(string) + transform := row.Value("transform").(string) + actual[col] = transform + t.Logf("Found partition in Iceberg: %s (%s)", col, transform) + } + + // Compare expected partition with actual + got, found := actual[expectedCol] + require.Truef(t, found, "Expected partition column %s not found in Iceberg", expectedCol) + require.Equalf(t, expectedTransform, got, "Partition transform mismatch for column %s", expectedCol) + + t.Logf("Verified Iceberg partition spec: %s (%s)", expectedCol, expectedTransform) + } func (cfg *PerformanceTest) TestPerformance(t *testing.T) { From b670258b38e4e0eb7cb2366125b8863b4619374d Mon Sep 17 00:00:00 2001 From: vikash390 Date: Thu, 25 Sep 2025 12:40:40 +0530 Subject: [PATCH 06/13] fix: lint issue --- utils/testutils/test_utils.go | 32 ++++++++++++++------------------ 1 file changed, 14 insertions(+), 18 deletions(-) diff --git a/utils/testutils/test_utils.go b/utils/testutils/test_utils.go index 4c552ada7..ff27b2dcf 100644 --- a/utils/testutils/test_utils.go +++ b/utils/testutils/test_utils.go @@ -485,47 +485,43 @@ func VerifyIcebergSync(t *testing.T, tableName, icebergDB string, datatypeSchema require.Equal(t, expectedIceType, iceType, "Data type mismatch for column %s: expected %s, got %s", col, expectedIceType, iceType) } + t.Logf("Verified datatypes in Iceberg after sync") // Skip if no partitionRegex provided if partitionRegex == "" { t.Log("No partitionRegex provided, skipping partition verification") return } - // Parse expected partitions from partitionRegex - // Example: "/{_id,identity}" -> Column: _id, Transform: identity + // Parse expected partition column from partitionRegex + // Example: "/{_id,identity}" -> Column: _id clean := strings.TrimPrefix(partitionRegex, "/{") clean = strings.TrimSuffix(clean, "}") parts := strings.Split(clean, ",") expectedCol := strings.TrimSpace(parts[0]) - expectedTransform := "identity" // default - if len(parts) > 1 { - expectedTransform = strings.TrimSpace(parts[1]) - } - // Query Iceberg partitions metadata table - partitionQuery := fmt.Sprintf("SELECT partition, transform FROM %s.%s.%s.partitions", icebergCatalog, icebergDB, tableName) + // Query Iceberg partitions metadata table (only 'partition' column) + partitionQuery := fmt.Sprintf("SELECT partition FROM %s.%s.%s.partitions", + icebergCatalog, icebergDB, tableName) df, err := spark.Sql(ctx, partitionQuery) require.NoError(t, err, "Failed to query Iceberg partitions metadata") + rows, err := df.Collect(ctx) require.NoError(t, err, "Failed to collect partitions metadata") require.NotEmpty(t, rows, "No partitions found in Iceberg table") - // Build map of actual partitions - actual := make(map[string]string) + // Collect actual partition columns + actualCols := make(map[string]struct{}) for _, row := range rows { col := row.Value("partition").(string) - transform := row.Value("transform").(string) - actual[col] = transform - t.Logf("Found partition in Iceberg: %s (%s)", col, transform) + actualCols[col] = struct{}{} + t.Logf("Found partition in Iceberg: %s", col) } - // Compare expected partition with actual - got, found := actual[expectedCol] + // Compare expected partition column with actual + _, found := actualCols[expectedCol] require.Truef(t, found, "Expected partition column %s not found in Iceberg", expectedCol) - 
require.Equalf(t, expectedTransform, got, "Partition transform mismatch for column %s", expectedCol) - - t.Logf("Verified Iceberg partition spec: %s (%s)", expectedCol, expectedTransform) + t.Logf("Verified Iceberg partition spec: %s", expectedCol) } func (cfg *PerformanceTest) TestPerformance(t *testing.T) { From a00207d54649afdedc8f2152797be72849c354a7 Mon Sep 17 00:00:00 2001 From: vikash390 Date: Thu, 25 Sep 2025 12:56:30 +0530 Subject: [PATCH 07/13] fix: partition column verify --- utils/testutils/test_utils.go | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/utils/testutils/test_utils.go b/utils/testutils/test_utils.go index ff27b2dcf..b476b3a8f 100644 --- a/utils/testutils/test_utils.go +++ b/utils/testutils/test_utils.go @@ -512,9 +512,15 @@ func VerifyIcebergSync(t *testing.T, tableName, icebergDB string, datatypeSchema // Collect actual partition columns actualCols := make(map[string]struct{}) for _, row := range rows { - col := row.Value("partition").(string) - actualCols[col] = struct{}{} - t.Logf("Found partition in Iceberg: %s", col) + val := row.Value("partition") + colMap, ok := val.(map[string]interface{}) + require.True(t, ok, "Expected partition to be a map, got %#v", val) + + colName, ok := colMap["col"].(string) // key may vary by Spark version + require.True(t, ok, "Expected col to be a string, got %#v", colMap["col"]) + + actualCols[colName] = struct{}{} + t.Logf("Found partition in Iceberg: %s", colName) } // Compare expected partition column with actual @@ -522,6 +528,7 @@ func VerifyIcebergSync(t *testing.T, tableName, icebergDB string, datatypeSchema require.Truef(t, found, "Expected partition column %s not found in Iceberg", expectedCol) t.Logf("Verified Iceberg partition spec: %s", expectedCol) + } func (cfg *PerformanceTest) TestPerformance(t *testing.T) { From eb2e112da3121bc16db8da4ce75737f3b85e14ab Mon Sep 17 00:00:00 2001 From: vikash390 Date: Thu, 25 Sep 2025 13:35:22 +0530 Subject: [PATCH 08/13] fix: partition column verify --- utils/testutils/test_utils.go | 21 ++++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/utils/testutils/test_utils.go b/utils/testutils/test_utils.go index b476b3a8f..f57962519 100644 --- a/utils/testutils/test_utils.go +++ b/utils/testutils/test_utils.go @@ -513,11 +513,22 @@ func VerifyIcebergSync(t *testing.T, tableName, icebergDB string, datatypeSchema actualCols := make(map[string]struct{}) for _, row := range rows { val := row.Value("partition") - colMap, ok := val.(map[string]interface{}) - require.True(t, ok, "Expected partition to be a map, got %#v", val) - - colName, ok := colMap["col"].(string) // key may vary by Spark version - require.True(t, ok, "Expected col to be a string, got %#v", colMap["col"]) + var colName string + + switch v := val.(type) { + case string: + colName = v + case map[string]interface{}: + if c, ok := v["col"].(string); ok { + colName = c + } else if f, ok := v["field"].(string); ok { + colName = f + } else { + t.Fatalf("Cannot extract partition column from map: %#v", v) + } + default: + t.Fatalf("Unexpected type for partition column: %#v", v) + } actualCols[colName] = struct{}{} t.Logf("Found partition in Iceberg: %s", colName) From 6141f3b26be36885afd006965c377d8ea8ff239d Mon Sep 17 00:00:00 2001 From: vikash390 Date: Thu, 25 Sep 2025 14:38:57 +0530 Subject: [PATCH 09/13] fix: partition column verify --- utils/testutils/test_utils.go | 98 ++++++++++++++++++++--------------- 1 file changed, 56 insertions(+), 42 deletions(-) diff --git 
a/utils/testutils/test_utils.go b/utils/testutils/test_utils.go index f57962519..088ad0f0d 100644 --- a/utils/testutils/test_utils.go +++ b/utils/testutils/test_utils.go @@ -453,7 +453,8 @@ func VerifyIcebergSync(t *testing.T, tableName, icebergDB string, datatypeSchema for key, expected := range schema { icebergValue, ok := icebergMap[key] require.Truef(t, ok, "Row %d: missing column %q in Iceberg result", rowIdx, key) - require.Equal(t, icebergValue, expected, "Row %d: mismatch on %q: Iceberg has %#v, expected %#v", rowIdx, key, icebergValue, expected) + // expected first, actual second + require.Equal(t, expected, icebergValue, "Row %d: mismatch on %q: Iceberg has %#v, expected %#v", rowIdx, key, icebergValue, expected) } } t.Logf("Verified Iceberg synced data with respect to data synced from source[%s] found equal", driver) @@ -486,60 +487,73 @@ func VerifyIcebergSync(t *testing.T, tableName, icebergDB string, datatypeSchema "Data type mismatch for column %s: expected %s, got %s", col, expectedIceType, iceType) } t.Logf("Verified datatypes in Iceberg after sync") - // Skip if no partitionRegex provided + + // Partition verification using only metadata tables if partitionRegex == "" { t.Log("No partitionRegex provided, skipping partition verification") return } - // Parse expected partition column from partitionRegex - // Example: "/{_id,identity}" -> Column: _id + // Parse expected partition columns from pattern like "/{col,identity}" + // Supports multiple entries like "/{col1,identity}" by taking the first token as the source column clean := strings.TrimPrefix(partitionRegex, "/{") clean = strings.TrimSuffix(clean, "}") - parts := strings.Split(clean, ",") - expectedCol := strings.TrimSpace(parts[0]) - - // Query Iceberg partitions metadata table (only 'partition' column) - partitionQuery := fmt.Sprintf("SELECT partition FROM %s.%s.%s.partitions", - icebergCatalog, icebergDB, tableName) - df, err := spark.Sql(ctx, partitionQuery) - require.NoError(t, err, "Failed to query Iceberg partitions metadata") - - rows, err := df.Collect(ctx) - require.NoError(t, err, "Failed to collect partitions metadata") - require.NotEmpty(t, rows, "No partitions found in Iceberg table") - - // Collect actual partition columns - actualCols := make(map[string]struct{}) - for _, row := range rows { - val := row.Value("partition") - var colName string - - switch v := val.(type) { - case string: - colName = v - case map[string]interface{}: - if c, ok := v["col"].(string); ok { - colName = c - } else if f, ok := v["field"].(string); ok { - colName = f - } else { - t.Fatalf("Cannot extract partition column from map: %#v", v) + toks := strings.Split(clean, ",") + expectedCol := strings.TrimSpace(toks[0]) + + // 1) Verify the declared Partitioning from table metadata (no data scan) + // Checks identity(col) appears in the Partitioning spec string. 
+ describeExtQuery := fmt.Sprintf("DESCRIBE TABLE EXTENDED %s.%s.%s", icebergCatalog, icebergDB, tableName) + descExtDF, err := spark.Sql(ctx, describeExtQuery) + require.NoError(t, err, "Failed to DESCRIBE TABLE EXTENDED for partitioning") + + descExtRows, err := descExtDF.Collect(ctx) + require.NoError(t, err, "Failed to collect DESCRIBE EXTENDED rows") + var partitioningLine string + for _, r := range descExtRows { + cn, _ := r.Value("col_name").(string) + dt, _ := r.Value("data_type").(string) + if strings.EqualFold(strings.TrimSpace(cn), "Partitioning") { + partitioningLine = dt + break + } + } + require.NotEmpty(t, partitioningLine, "Partitioning line not found in DESCRIBE TABLE EXTENDED output") + // Expect identity transform for this test input + require.Containsf(t, strings.ToLower(partitioningLine), fmt.Sprintf("identity(%s)", strings.ToLower(expectedCol)), + "Declared partitioning does not include identity(%s): %s", expectedCol, partitioningLine) + + // 2) Extract actual partition field names from metadata table by converting struct -> JSON -> MAP -> keys + // This avoids schema-dependent struct access and reads only metadata. + partsQuery := fmt.Sprintf(` + SELECT DISTINCT k AS part_col + FROM ( + SELECT explode(map_keys(from_json(to_json(partition), 'MAP'))) AS k + FROM %s.%s.%s.partitions + ) s + `, icebergCatalog, icebergDB, tableName) + + pDF, err := spark.Sql(ctx, partsQuery) + require.NoError(t, err, "Failed to query Iceberg partitions metadata for keys") + + pRows, err := pDF.Collect(ctx) + require.NoError(t, err, "Failed to collect partition keys from metadata") + require.NotEmpty(t, pRows, "No partitions found in Iceberg table") + + actualCols := make(map[string]struct{}, len(pRows)) + for _, r := range pRows { + if v := r.Value("part_col"); v != nil { + if s, ok := v.(string); ok && strings.TrimSpace(s) != "" { + actualCols[strings.TrimSpace(s)] = struct{}{} + t.Logf("Found partition key: %s", s) } - default: - t.Fatalf("Unexpected type for partition column: %#v", v) } - - actualCols[colName] = struct{}{} - t.Logf("Found partition in Iceberg: %s", colName) } - // Compare expected partition column with actual _, found := actualCols[expectedCol] - require.Truef(t, found, "Expected partition column %s not found in Iceberg", expectedCol) - - t.Logf("Verified Iceberg partition spec: %s", expectedCol) + require.Truef(t, found, "Expected partition column %s not found in Iceberg metadata", expectedCol) + t.Logf("Verified Iceberg partitioning via metadata: %s", expectedCol) } func (cfg *PerformanceTest) TestPerformance(t *testing.T) { From ac7877d8eb595125e4c0d0c043570e3841906af4 Mon Sep 17 00:00:00 2001 From: vikash390 Date: Thu, 25 Sep 2025 17:02:09 +0530 Subject: [PATCH 10/13] fix: partition column verify --- utils/testutils/test_utils.go | 112 ++++++++++++++++++---------------- 1 file changed, 59 insertions(+), 53 deletions(-) diff --git a/utils/testutils/test_utils.go b/utils/testutils/test_utils.go index 088ad0f0d..578ea9764 100644 --- a/utils/testutils/test_utils.go +++ b/utils/testutils/test_utils.go @@ -10,6 +10,7 @@ import ( "time" "github.com/apache/spark-connect-go/v35/spark/sql" + "github.com/apache/spark-connect-go/v35/spark/sql/types" "github.com/datazip-inc/olake/constants" "github.com/datazip-inc/olake/utils" "github.com/datazip-inc/olake/utils/typeutils" @@ -494,66 +495,27 @@ func VerifyIcebergSync(t *testing.T, tableName, icebergDB string, datatypeSchema return } + // DESCRIBE TABLE EXTENDED to get partition info + describeExtQuery := fmt.Sprintf("DESCRIBE 
TABLE EXTENDED %s.%s.%s", icebergCatalog, icebergDB, tableName) + describeExtDf, err := spark.Sql(ctx, describeExtQuery) + require.NoError(t, err, "Failed to describe Iceberg table (extended)") + + describeExtRows, err := describeExtDf.Collect(ctx) + require.NoError(t, err, "Failed to collect extended describe data") + + partitionColsStr := extractFirstPartitionColFromRows(describeExtRows) + + require.NotEmpty(t, partitionColsStr, "Partition columns not found in Iceberg metadata") + // Parse expected partition columns from pattern like "/{col,identity}" // Supports multiple entries like "/{col1,identity}" by taking the first token as the source column clean := strings.TrimPrefix(partitionRegex, "/{") clean = strings.TrimSuffix(clean, "}") toks := strings.Split(clean, ",") expectedCol := strings.TrimSpace(toks[0]) + require.Equal(t, expectedCol, partitionColsStr, "Partition column does not match expected '%s'", expectedCol) + t.Logf("Verified partition column: %s", expectedCol) - // 1) Verify the declared Partitioning from table metadata (no data scan) - // Checks identity(col) appears in the Partitioning spec string. - describeExtQuery := fmt.Sprintf("DESCRIBE TABLE EXTENDED %s.%s.%s", icebergCatalog, icebergDB, tableName) - descExtDF, err := spark.Sql(ctx, describeExtQuery) - require.NoError(t, err, "Failed to DESCRIBE TABLE EXTENDED for partitioning") - - descExtRows, err := descExtDF.Collect(ctx) - require.NoError(t, err, "Failed to collect DESCRIBE EXTENDED rows") - var partitioningLine string - for _, r := range descExtRows { - cn, _ := r.Value("col_name").(string) - dt, _ := r.Value("data_type").(string) - if strings.EqualFold(strings.TrimSpace(cn), "Partitioning") { - partitioningLine = dt - break - } - } - require.NotEmpty(t, partitioningLine, "Partitioning line not found in DESCRIBE TABLE EXTENDED output") - // Expect identity transform for this test input - require.Containsf(t, strings.ToLower(partitioningLine), fmt.Sprintf("identity(%s)", strings.ToLower(expectedCol)), - "Declared partitioning does not include identity(%s): %s", expectedCol, partitioningLine) - - // 2) Extract actual partition field names from metadata table by converting struct -> JSON -> MAP -> keys - // This avoids schema-dependent struct access and reads only metadata. 
- partsQuery := fmt.Sprintf(` - SELECT DISTINCT k AS part_col - FROM ( - SELECT explode(map_keys(from_json(to_json(partition), 'MAP'))) AS k - FROM %s.%s.%s.partitions - ) s - `, icebergCatalog, icebergDB, tableName) - - pDF, err := spark.Sql(ctx, partsQuery) - require.NoError(t, err, "Failed to query Iceberg partitions metadata for keys") - - pRows, err := pDF.Collect(ctx) - require.NoError(t, err, "Failed to collect partition keys from metadata") - require.NotEmpty(t, pRows, "No partitions found in Iceberg table") - - actualCols := make(map[string]struct{}, len(pRows)) - for _, r := range pRows { - if v := r.Value("part_col"); v != nil { - if s, ok := v.(string); ok && strings.TrimSpace(s) != "" { - actualCols[strings.TrimSpace(s)] = struct{}{} - t.Logf("Found partition key: %s", s) - } - } - } - - _, found := actualCols[expectedCol] - require.Truef(t, found, "Expected partition column %s not found in Iceberg metadata", expectedCol) - - t.Logf("Verified Iceberg partitioning via metadata: %s", expectedCol) } func (cfg *PerformanceTest) TestPerformance(t *testing.T) { @@ -713,3 +675,47 @@ func (cfg *PerformanceTest) TestPerformance(t *testing.T) { }() }) } + +// extractFirstPartitionColFromRows extracts the first partition column from DESCRIBE EXTENDED rows +func extractFirstPartitionColFromRows(rows []types.Row) string { + inPartitionSection := false + + for _, row := range rows { + // Convert []any -> []string + vals := row.Values() + parts := make([]string, len(vals)) + for i, v := range vals { + if v == nil { + parts[i] = "" + } else { + parts[i] = fmt.Sprint(v) // safe string conversion + } + } + line := strings.TrimSpace(strings.Join(parts, " ")) + if line == "" { + continue + } + + if strings.HasPrefix(line, "# Partition Information") { + inPartitionSection = true + continue + } + + if inPartitionSection { + if strings.HasPrefix(line, "# col_name") { + continue + } + + if strings.HasPrefix(line, "#") { + break + } + + fields := strings.Fields(line) + if len(fields) > 0 { + return fields[0] // return the first partition col + } + } + } + + return "" +} From b437a78f709c68101b74d8cef05195b3a89d572d Mon Sep 17 00:00:00 2001 From: vikash390 Date: Mon, 29 Sep 2025 16:18:20 +0530 Subject: [PATCH 11/13] fix: lint issue --- utils/testutils/test_utils.go | 1 - 1 file changed, 1 deletion(-) diff --git a/utils/testutils/test_utils.go b/utils/testutils/test_utils.go index 578ea9764..d0f7dfca2 100644 --- a/utils/testutils/test_utils.go +++ b/utils/testutils/test_utils.go @@ -515,7 +515,6 @@ func VerifyIcebergSync(t *testing.T, tableName, icebergDB string, datatypeSchema expectedCol := strings.TrimSpace(toks[0]) require.Equal(t, expectedCol, partitionColsStr, "Partition column does not match expected '%s'", expectedCol) t.Logf("Verified partition column: %s", expectedCol) - } func (cfg *PerformanceTest) TestPerformance(t *testing.T) { From 42e459e763c40e94f27330d018161c93814ddce1 Mon Sep 17 00:00:00 2001 From: vikash390 Date: Mon, 29 Sep 2025 18:16:32 +0530 Subject: [PATCH 12/13] chore: updated partioning verify query --- utils/testutils/test_utils.go | 42 ++++++++++++++--------------------- 1 file changed, 17 insertions(+), 25 deletions(-) diff --git a/utils/testutils/test_utils.go b/utils/testutils/test_utils.go index d0f7dfca2..12be1c42a 100644 --- a/utils/testutils/test_utils.go +++ b/utils/testutils/test_utils.go @@ -48,7 +48,6 @@ type PerformanceTest struct { BackfillStreams []string CDCStreams []string ExecuteQuery func(ctx context.Context, t *testing.T, streams []string, operation string, 
fileConfig bool) - PartitionRegex string } type benchmarkStats struct { @@ -119,7 +118,7 @@ func discoverCommand(config TestConfig) string { } // update normalization=true for selected streams under selected_streams. by name -func updateStreamsCommand(config TestConfig, namespace, partitionRegex string, stream []string, isBackfill bool) string { +func updateSelectedStreamsCommand(config TestConfig, namespace, partitionRegex string, stream []string, isBackfill bool) string { if len(stream) == 0 { return "" } @@ -144,12 +143,12 @@ func updateStreamsCommand(config TestConfig, namespace, partitionRegex string, s } // set sync_mode and cursor_field for a specific stream object in streams[] by namespace+name -func setStreamSyncModeCommand(config TestConfig, namespace, streamName, syncMode, cursorField string, stateRequired bool) string { +func updateStreamConfigCommand(config TestConfig, namespace, streamName, syncMode, cursorField string) string { tmpCatalog := fmt.Sprintf("/tmp/%s_set_mode_streams.json", config.Driver) - // --argjson is used for boolean state; map/select pattern updates nested array members + // map/select pattern updates nested array members return fmt.Sprintf( - `jq --arg ns "%s" --arg name "%s" --arg mode "%s" --arg cursor "%s" --argjson state %t '.streams = (.streams | map(if .stream.namespace == $ns and .stream.name == $name then (.stream.sync_mode = $mode | .stream.cursor_field = $cursor | .stream.state_required = $state) | . else . end))' %s > %s && mv %s %s`, - namespace, streamName, syncMode, cursorField, stateRequired, + `jq --arg ns "%s" --arg name "%s" --arg mode "%s" --arg cursor "%s" '.streams = (.streams | map(if .stream.namespace == $ns and .stream.name == $name then (.stream.sync_mode = $mode | .stream.cursor_field = $cursor) else . 
end))' %s > %s && mv %s %s`, + namespace, streamName, syncMode, cursorField, config.CatalogPath, tmpCatalog, tmpCatalog, config.CatalogPath, ) } @@ -283,7 +282,7 @@ func (cfg *IntegrationTest) TestIntegration(t *testing.T) { // `jq '(.selected_streams[][] | .normalization) = true' %s > /tmp/streams.json && mv /tmp/streams.json %s`, // cfg.TestConfig.CatalogPath, cfg.TestConfig.CatalogPath, // ) - streamUpdateCmd := updateStreamsCommand(*cfg.TestConfig, cfg.Namespace, cfg.PartitionRegex, []string{currentTestTable}, true) + streamUpdateCmd := updateSelectedStreamsCommand(*cfg.TestConfig, cfg.Namespace, cfg.PartitionRegex, []string{currentTestTable}, true) if code, out, err := utils.ExecCommand(ctx, c, streamUpdateCmd); err != nil || code != 0 { return fmt.Errorf("failed to enable normalization in streams.json (%d): %s\n%s", code, err, out, @@ -358,15 +357,15 @@ func (cfg *IntegrationTest) TestIntegration(t *testing.T) { cfg.ExecuteQuery(ctx, t, []string{currentTestTable}, "add", false) // Ensure normalization remains on for selected stream - streamUpdateCmd = updateStreamsCommand(*cfg.TestConfig, cfg.Namespace, cfg.PartitionRegex, []string{currentTestTable}, true) + streamUpdateCmd = updateSelectedStreamsCommand(*cfg.TestConfig, cfg.Namespace, cfg.PartitionRegex, []string{currentTestTable}, true) if code, out, err := utils.ExecCommand(ctx, c, streamUpdateCmd); err != nil || code != 0 { return fmt.Errorf("failed to enable normalization in streams.json for incremental (%d): %s\n%s", code, err, out, ) } - // Patch: sync_mode = incremental, cursor_field = "id", state_required = true - incPatch := setStreamSyncModeCommand(*cfg.TestConfig, cfg.Namespace, currentTestTable, "incremental", cfg.CursorField, true) + // Patch: sync_mode = incremental, cursor_field = "id" + incPatch := updateStreamConfigCommand(*cfg.TestConfig, cfg.Namespace, currentTestTable, "incremental", cfg.CursorField) if code, out, err := utils.ExecCommand(ctx, c, incPatch); err != nil || code != 0 { return fmt.Errorf("failed to patch streams.json for incremental (%d): %s\n%s", code, err, out) } @@ -378,13 +377,13 @@ func (cfg *IntegrationTest) TestIntegration(t *testing.T) { } // Initial incremental run (equivalent to full on first run) - t.Log("Running Incremental - initial load") + t.Log("Running Incremental - full load") if err := runSync(c, true, "", "r", cfg.ExpectedData); err != nil { return err } // Delta incremental: add new rows and sync again - t.Log("Running Incremental - delta load (inserts)") + t.Log("Running Incremental - inserts") cfg.ExecuteQuery(ctx, t, []string{currentTestTable}, "insert", false) if err := runSync(c, true, "", "u", cfg.ExpectedData); err != nil { return err @@ -454,7 +453,6 @@ func VerifyIcebergSync(t *testing.T, tableName, icebergDB string, datatypeSchema for key, expected := range schema { icebergValue, ok := icebergMap[key] require.Truef(t, ok, "Row %d: missing column %q in Iceberg result", rowIdx, key) - // expected first, actual second require.Equal(t, expected, icebergValue, "Row %d: mismatch on %q: Iceberg has %#v, expected %#v", rowIdx, key, icebergValue, expected) } } @@ -495,17 +493,11 @@ func VerifyIcebergSync(t *testing.T, tableName, icebergDB string, datatypeSchema return } - // DESCRIBE TABLE EXTENDED to get partition info - describeExtQuery := fmt.Sprintf("DESCRIBE TABLE EXTENDED %s.%s.%s", icebergCatalog, icebergDB, tableName) - describeExtDf, err := spark.Sql(ctx, describeExtQuery) - require.NoError(t, err, "Failed to describe Iceberg table (extended)") - - describeExtRows, err 
:= describeExtDf.Collect(ctx) - require.NoError(t, err, "Failed to collect extended describe data") + require.NoError(t, err, "Failed to collect describe data from Iceberg") - partitionColsStr := extractFirstPartitionColFromRows(describeExtRows) + partitionCols := extractFirstPartitionColFromRows(describeRows) - require.NotEmpty(t, partitionColsStr, "Partition columns not found in Iceberg metadata") + require.NotEmpty(t, partitionCols, "Partition columns not found in Iceberg metadata") // Parse expected partition columns from pattern like "/{col,identity}" // Supports multiple entries like "/{col1,identity}" by taking the first token as the source column @@ -513,7 +505,7 @@ func VerifyIcebergSync(t *testing.T, tableName, icebergDB string, datatypeSchema clean = strings.TrimSuffix(clean, "}") toks := strings.Split(clean, ",") expectedCol := strings.TrimSpace(toks[0]) - require.Equal(t, expectedCol, partitionColsStr, "Partition column does not match expected '%s'", expectedCol) + require.Equal(t, expectedCol, partitionCols, "Partition column does not match expected '%s'", expectedCol) t.Logf("Verified partition column: %s", expectedCol) } @@ -590,7 +582,7 @@ func (cfg *PerformanceTest) TestPerformance(t *testing.T) { } t.Log("(backfill) discover completed") - updateStreamsCmd := updateStreamsCommand(*cfg.TestConfig, cfg.Namespace, cfg.PartitionRegex, cfg.BackfillStreams, true) + updateStreamsCmd := updateSelectedStreamsCommand(*cfg.TestConfig, cfg.Namespace, "", cfg.BackfillStreams, true) if code, _, err := utils.ExecCommand(ctx, c, updateStreamsCmd); err != nil || code != 0 { return fmt.Errorf("failed to update streams: %s", err) } @@ -624,7 +616,7 @@ func (cfg *PerformanceTest) TestPerformance(t *testing.T) { } t.Log("(cdc) discover completed") - updateStreamsCmd := updateStreamsCommand(*cfg.TestConfig, cfg.Namespace, cfg.PartitionRegex, cfg.CDCStreams, false) + updateStreamsCmd := updateSelectedStreamsCommand(*cfg.TestConfig, cfg.Namespace, "", cfg.CDCStreams, false) if code, _, err := utils.ExecCommand(ctx, c, updateStreamsCmd); err != nil || code != 0 { return fmt.Errorf("failed to update streams: %s", err) } From 06b3e9d4bd22a5897ed591eaea886372fe161883 Mon Sep 17 00:00:00 2001 From: vikash390 Date: Mon, 29 Sep 2025 18:30:51 +0530 Subject: [PATCH 13/13] chore: added a todo --- utils/testutils/test_utils.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/utils/testutils/test_utils.go b/utils/testutils/test_utils.go index 12be1c42a..1c644f6e2 100644 --- a/utils/testutils/test_utils.go +++ b/utils/testutils/test_utils.go @@ -413,6 +413,7 @@ func (cfg *IntegrationTest) TestIntegration(t *testing.T) { }) } +// TODO: Refactor parsing logic into a reusable utility functions // verifyIcebergSync verifies that data was correctly synchronized to Iceberg func VerifyIcebergSync(t *testing.T, tableName, icebergDB string, datatypeSchema map[string]string, schema map[string]interface{}, opSymbol, partitionRegex, driver string) { t.Helper() @@ -492,11 +493,8 @@ func VerifyIcebergSync(t *testing.T, tableName, icebergDB string, datatypeSchema t.Log("No partitionRegex provided, skipping partition verification") return } - - require.NoError(t, err, "Failed to collect describe data from Iceberg") - + // Extract partition columns from describe rows partitionCols := extractFirstPartitionColFromRows(describeRows) - require.NotEmpty(t, partitionCols, "Partition columns not found in Iceberg metadata") // Parse expected partition columns from pattern like "/{col,identity}"
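
The patches above repeatedly parse a partition_regex of the form "/{column,transform}" (for example "/{_id,identity}" or "/{col_bigserial,identity}") to recover the expected partition column before checking it against Iceberg metadata. A minimal standalone sketch of that parsing, assuming the "/{col,transform}" shape used in the driver tests; the helper name parsePartitionRegex is hypothetical and the transform is assumed to default to identity when omitted:

    package main

    import (
    	"fmt"
    	"strings"
    )

    // parsePartitionRegex splits a partition_regex value like "/{_id,identity}"
    // into its source column and transform. When only a column is given, the
    // transform defaults to "identity". (Illustrative helper, not from the patch.)
    func parsePartitionRegex(partitionRegex string) (column, transform string) {
    	clean := strings.TrimPrefix(partitionRegex, "/{")
    	clean = strings.TrimSuffix(clean, "}")
    	parts := strings.Split(clean, ",")
    	column = strings.TrimSpace(parts[0])
    	transform = "identity"
    	if len(parts) > 1 {
    		transform = strings.TrimSpace(parts[1])
    	}
    	return column, transform
    }

    func main() {
    	for _, re := range []string{"/{_id,identity}", "/{id,identity}", "/{col_bigserial,identity}"} {
    		col, tr := parsePartitionRegex(re)
    		fmt.Printf("%s -> column=%s transform=%s\n", re, col, tr)
    	}
    }

A value such as "/{id,identity}" therefore yields column "id" with the identity transform, which is what the partition verification in VerifyIcebergSync compares against the partition column reported by the Iceberg table metadata.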