2 changes: 2 additions & 0 deletions drivers/mongodb/internal/mon_test.go
@@ -17,6 +17,8 @@ func TestMongodbIntegration(t *testing.T) {
DataTypeSchema: MongoToIcebergSchema,
ExecuteQuery: ExecuteQuery,
IcebergDB: "mongodb_olake_mongodb_test",
+CursorField: "_id",
+PartitionRegex: "/{_id,identity}",
}
testConfig.TestIntegration(t)
}
2 changes: 2 additions & 0 deletions drivers/mysql/internal/mysql_test.go
@@ -17,6 +17,8 @@ func TestMySQLIntegration(t *testing.T) {
DataTypeSchema: MySQLToIcebergSchema,
ExecuteQuery: ExecuteQuery,
IcebergDB: "mysql_olake_mysql_test",
+CursorField: "id",
+PartitionRegex: "/{id,identity}",
}
testConfig.TestIntegration(t)
}
2 changes: 2 additions & 0 deletions drivers/postgres/internal/postgres_test.go
@@ -18,6 +18,8 @@ func TestPostgresIntegration(t *testing.T) {
DataTypeSchema: PostgresToIcebergSchema,
ExecuteQuery: ExecuteQuery,
IcebergDB: "postgres_postgres_public",
+CursorField: "col_bigserial",
+PartitionRegex: "/{col_bigserial,identity}",
}
testConfig.TestIntegration(t)
}
175 changes: 149 additions & 26 deletions utils/testutils/test_utils.go
@@ -10,6 +10,7 @@ import (
"time"

"github.com/apache/spark-connect-go/v35/spark/sql"
"github.com/apache/spark-connect-go/v35/spark/sql/types"
"github.com/datazip-inc/olake/constants"
"github.com/datazip-inc/olake/utils"
"github.com/datazip-inc/olake/utils/typeutils"
@@ -37,6 +38,8 @@ type IntegrationTest struct {
Namespace string
ExecuteQuery func(ctx context.Context, t *testing.T, streams []string, operation string, fileConfig bool)
IcebergDB string
+CursorField string
+PartitionRegex string
}

type PerformanceTest struct {
@@ -114,8 +117,8 @@ func discoverCommand(config TestConfig) string {
return fmt.Sprintf("/test-olake/build.sh driver-%s discover --config %s", config.Driver, config.SourcePath)
}

-// TODO: check if we can remove namespace from being passed as a parameter and use a common namespace for all drivers
-func updateStreamsCommand(config TestConfig, namespace string, stream []string, isBackfill bool) string {
+// update normalization=true for selected streams under selected_streams.<namespace> by name
+func updateSelectedStreamsCommand(config TestConfig, namespace, partitionRegex string, stream []string, isBackfill bool) string {
if len(stream) == 0 {
return ""
}
@@ -126,10 +129,11 @@ func updateStreamsCommand(config TestConfig, namespace string, stream []string,
condition := strings.Join(streamConditions, " or ")
tmpCatalog := fmt.Sprintf("/tmp/%s_%s_streams.json", config.Driver, utils.Ternary(isBackfill, "backfill", "cdc").(string))
jqExpr := fmt.Sprintf(
-`jq '.selected_streams = { "%s": (.selected_streams["%s"] | map(select(%s) | .normalization = true)) }' %s > %s && mv %s %s`,
+`jq '.selected_streams = { "%s": (.selected_streams["%s"] | map(select(%s) | .normalization = true | .partition_regex = "%s")) }' %s > %s && mv %s %s`,
namespace,
namespace,
condition,
+partitionRegex,
config.CatalogPath,
tmpCatalog,
tmpCatalog,
@@ -138,6 +142,23 @@
return jqExpr
}
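For orientation, the filter above implies a catalog shaped roughly like this hypothetical postgres example (the matching keys built by streamConditions sit in collapsed lines above; stream_name is assumed here):

{
  "selected_streams": {
    "public": [
      {
        "stream_name": "test_table",
        "normalization": true,
        "partition_regex": "/{col_bigserial,identity}"
      }
    ]
  }
}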

+// set sync_mode and cursor_field for a specific stream object in streams[] by namespace+name
+func updateStreamConfigCommand(config TestConfig, namespace, streamName, syncMode, cursorField string) string {
+tmpCatalog := fmt.Sprintf("/tmp/%s_set_mode_streams.json", config.Driver)
+// map/select pattern updates nested array members
+return fmt.Sprintf(
+`jq --arg ns "%s" --arg name "%s" --arg mode "%s" --arg cursor "%s" '.streams = (.streams | map(if .stream.namespace == $ns and .stream.name == $name then (.stream.sync_mode = $mode | .stream.cursor_field = $cursor) else . end))' %s > %s && mv %s %s`,
+namespace, streamName, syncMode, cursorField,
+config.CatalogPath, tmpCatalog, tmpCatalog, config.CatalogPath,
+)
+}
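Substituting hypothetical values into the template above (namespace public, stream test_table, cursor col_bigserial, catalog at streams.json) yields:

jq --arg ns "public" --arg name "test_table" --arg mode "incremental" --arg cursor "col_bigserial" '.streams = (.streams | map(if .stream.namespace == $ns and .stream.name == $name then (.stream.sync_mode = $mode | .stream.cursor_field = $cursor) else . end))' streams.json > /tmp/postgres_set_mode_streams.json && mv /tmp/postgres_set_mode_streams.json streams.json

Passing values through jq --arg keeps them quoted outside the filter program, so names containing special characters cannot break the jq expression.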

+// reset state file so incremental can perform initial load (equivalent to full load on first run)
+func resetStateFileCommand(config TestConfig) string {
+// Ensure the state is clean irrespective of previous CDC run
+return fmt.Sprintf(`rm -f %s; echo '{}' > %s`, config.StatePath, config.StatePath)
+}
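With a hypothetical StatePath of /test-olake/state.json, this renders as:

rm -f /test-olake/state.json; echo '{}' > /test-olake/state.json

rm -f tolerates a missing file, and writing {} leaves behind a valid empty JSON state rather than no file at all.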

// to get backfill streams from cdc streams e.g. "demo_cdc" -> "demo"
func GetBackfillStreamsFromCDC(cdcStreams []string) []string {
backfillStreams := []string{}
@@ -261,15 +282,30 @@ func (cfg *IntegrationTest) TestIntegration(t *testing.T) {
// `jq '(.selected_streams[][] | .normalization) = true' %s > /tmp/streams.json && mv /tmp/streams.json %s`,
// cfg.TestConfig.CatalogPath, cfg.TestConfig.CatalogPath,
// )
-streamUpdateCmd := updateStreamsCommand(*cfg.TestConfig, cfg.Namespace, []string{currentTestTable}, true)
+streamUpdateCmd := updateSelectedStreamsCommand(*cfg.TestConfig, cfg.Namespace, cfg.PartitionRegex, []string{currentTestTable}, true)
if code, out, err := utils.ExecCommand(ctx, c, streamUpdateCmd); err != nil || code != 0 {
return fmt.Errorf("failed to enable normalization in streams.json (%d): %s\n%s",
code, err, out,
)
}

[Collaborator] Update this error message as well

t.Logf("Enabled normalization in %s", cfg.TestConfig.CatalogPath)
t.Logf("Enabled normalization and added partition regex in %s", cfg.TestConfig.CatalogPath)

+// Helper to run sync and verify
+runSync := func(c testcontainers.Container, useState bool, operation, opSymbol string, schema map[string]interface{}) error {
+cmd := syncCommand(*cfg.TestConfig, useState)
+if useState && operation != "" {
+cfg.ExecuteQuery(ctx, t, []string{currentTestTable}, operation, false)
+}
+if code, out, err := utils.ExecCommand(ctx, c, cmd); err != nil || code != 0 {
+return fmt.Errorf("sync failed (%d): %s\n%s", code, err, out)
+}
+t.Logf("Sync successful for %s driver", cfg.TestConfig.Driver)
+VerifyIcebergSync(t, currentTestTable, cfg.IcebergDB, cfg.DataTypeSchema, schema, opSymbol, cfg.PartitionRegex, cfg.TestConfig.Driver)
+return nil
+}

+// 3. Phase A: Full load + CDC
testCases := []struct {
syncMode string
operation string
@@ -306,30 +342,54 @@ func (cfg *IntegrationTest) TestIntegration(t *testing.T) {
dummySchema: nil,
},
}

-runSync := func(c testcontainers.Container, useState bool, operation, opSymbol string, schema map[string]interface{}) error {
-cmd := syncCommand(*cfg.TestConfig, useState)
-if useState && operation != "" {
-cfg.ExecuteQuery(ctx, t, []string{currentTestTable}, operation, false)
-}
-
-if code, out, err := utils.ExecCommand(ctx, c, cmd); err != nil || code != 0 {
-return fmt.Errorf("sync failed (%d): %s\n%s", code, err, out)
-}
-t.Logf("Sync successful for %s driver", cfg.TestConfig.Driver)
-VerifyIcebergSync(t, currentTestTable, cfg.IcebergDB, cfg.DataTypeSchema, schema, opSymbol, cfg.TestConfig.Driver)
-return nil
-}

// 3. Run Sync command and verify records in Iceberg
for _, test := range testCases {
t.Logf("Running test for: %s", test.syncMode)
if err := runSync(c, test.useState, test.operation, test.opSymbol, test.dummySchema); err != nil {
return err
}
}

-// 4. Clean up
+// 4. Phase B: Full load + Incremental (switch streams.json cdc -> incremental, set cursor_field = cfg.CursorField)
+// Reset table to isolate incremental scenario
+cfg.ExecuteQuery(ctx, t, []string{currentTestTable}, "drop", false)
+cfg.ExecuteQuery(ctx, t, []string{currentTestTable}, "create", false)
+cfg.ExecuteQuery(ctx, t, []string{currentTestTable}, "clean", false)
+cfg.ExecuteQuery(ctx, t, []string{currentTestTable}, "add", false)
+
+// Ensure normalization remains on for selected stream
+streamUpdateCmd = updateSelectedStreamsCommand(*cfg.TestConfig, cfg.Namespace, cfg.PartitionRegex, []string{currentTestTable}, true)
+if code, out, err := utils.ExecCommand(ctx, c, streamUpdateCmd); err != nil || code != 0 {
+return fmt.Errorf("failed to enable normalization in streams.json for incremental (%d): %s\n%s",
+code, err, out,
+)
+}
[Collaborator, on lines +360 to +365] This is getting repeated here.
+// Patch: sync_mode = incremental, cursor_field = cfg.CursorField
+incPatch := updateStreamConfigCommand(*cfg.TestConfig, cfg.Namespace, currentTestTable, "incremental", cfg.CursorField)
+if code, out, err := utils.ExecCommand(ctx, c, incPatch); err != nil || code != 0 {
+return fmt.Errorf("failed to patch streams.json for incremental (%d): %s\n%s", code, err, out)
+}
+
+// Reset state so initial incremental behaves like a first full incremental load
+resetState := resetStateFileCommand(*cfg.TestConfig)
+if code, out, err := utils.ExecCommand(ctx, c, resetState); err != nil || code != 0 {
+return fmt.Errorf("failed to reset state for incremental (%d): %s\n%s", code, err, out)
+}
+
+// Initial incremental run (equivalent to full on first run)
+t.Log("Running Incremental - full load")
+if err := runSync(c, true, "", "r", cfg.ExpectedData); err != nil {
+return err
+}
+
+// Delta incremental: add new rows and sync again
+t.Log("Running Incremental - inserts")
+cfg.ExecuteQuery(ctx, t, []string{currentTestTable}, "insert", false)
+if err := runSync(c, true, "", "u", cfg.ExpectedData); err != nil {
+return err
+}
[Collaborator, on lines +379 to +390] Here as well, a test cases array could be created and driven by the same for loop.
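A minimal sketch of that suggestion (hypothetical, not part of this diff): because runSync already executes operation before syncing whenever useState is true and operation is non-empty, the two incremental runs collapse into a table like the one used for Phase A:

incrementalCases := []struct {
	name      string
	operation string // source query executed before the sync; "" skips it
	opSymbol  string
}{
	{name: "Incremental - full load", operation: "", opSymbol: "r"},
	{name: "Incremental - inserts", operation: "insert", opSymbol: "u"},
}
for _, tc := range incrementalCases {
	t.Logf("Running %s", tc.name)
	if err := runSync(c, true, tc.operation, tc.opSymbol, cfg.ExpectedData); err != nil {
		return err
	}
}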
+// 5. Clean up
cfg.ExecuteQuery(ctx, t, []string{currentTestTable}, "drop", false)
t.Logf("%s sync test-container clean up", cfg.TestConfig.Driver)
return nil
@@ -353,8 +413,9 @@ func (cfg *IntegrationTest) TestIntegration(t *testing.T) {
})
}

+// TODO: Refactor parsing logic into reusable utility functions
// VerifyIcebergSync verifies that data was correctly synchronized to Iceberg
-func VerifyIcebergSync(t *testing.T, tableName, icebergDB string, datatypeSchema map[string]string, schema map[string]interface{}, opSymbol, driver string) {
+func VerifyIcebergSync(t *testing.T, tableName, icebergDB string, datatypeSchema map[string]string, schema map[string]interface{}, opSymbol, partitionRegex, driver string) {
t.Helper()
ctx := context.Background()
spark, err := sql.NewSessionBuilder().Remote(sparkConnectAddress).Build(ctx)
@@ -393,7 +454,7 @@ func VerifyIcebergSync(t *testing.T, tableName, icebergDB string, datatypeSchema
for key, expected := range schema {
icebergValue, ok := icebergMap[key]
require.Truef(t, ok, "Row %d: missing column %q in Iceberg result", rowIdx, key)
-require.Equal(t, icebergValue, expected, "Row %d: mismatch on %q: Iceberg has %#v, expected %#v", rowIdx, key, icebergValue, expected)
+require.Equal(t, expected, icebergValue, "Row %d: mismatch on %q: Iceberg has %#v, expected %#v", rowIdx, key, icebergValue, expected)
}
}
t.Logf("Verified Iceberg synced data with respect to data synced from source[%s] found equal", driver)
@@ -426,6 +487,24 @@ func VerifyIcebergSync(t *testing.T, tableName, icebergDB string, datatypeSchema
"Data type mismatch for column %s: expected %s, got %s", col, expectedIceType, iceType)
}
t.Logf("Verified datatypes in Iceberg after sync")

+// Partition verification using only metadata tables
+if partitionRegex == "" {
+t.Log("No partitionRegex provided, skipping partition verification")
+return
+}
+// Extract partition columns from describe rows
+partitionCols := extractFirstPartitionColFromRows(describeRows)
+require.NotEmpty(t, partitionCols, "Partition columns not found in Iceberg metadata")
+
+// Parse the expected partition column from a pattern like "/{col,identity}"
+// For patterns with multiple tokens, the first token is taken as the source column
+clean := strings.TrimPrefix(partitionRegex, "/{")
+clean = strings.TrimSuffix(clean, "}")
+toks := strings.Split(clean, ",")
+expectedCol := strings.TrimSpace(toks[0])
+require.Equal(t, expectedCol, partitionCols, "Partition column does not match expected '%s'", expectedCol)
+t.Logf("Verified partition column: %s", expectedCol)
}

func (cfg *PerformanceTest) TestPerformance(t *testing.T) {
@@ -501,7 +580,7 @@ func (cfg *PerformanceTest) TestPerformance(t *testing.T) {
}
t.Log("(backfill) discover completed")

-updateStreamsCmd := updateStreamsCommand(*cfg.TestConfig, cfg.Namespace, cfg.BackfillStreams, true)
+updateStreamsCmd := updateSelectedStreamsCommand(*cfg.TestConfig, cfg.Namespace, "", cfg.BackfillStreams, true)
if code, _, err := utils.ExecCommand(ctx, c, updateStreamsCmd); err != nil || code != 0 {
return fmt.Errorf("failed to update streams: %s", err)
}
@@ -535,7 +614,7 @@ func (cfg *PerformanceTest) TestPerformance(t *testing.T) {
}
t.Log("(cdc) discover completed")

-updateStreamsCmd := updateStreamsCommand(*cfg.TestConfig, cfg.Namespace, cfg.CDCStreams, false)
+updateStreamsCmd := updateSelectedStreamsCommand(*cfg.TestConfig, cfg.Namespace, "", cfg.CDCStreams, false)
if code, _, err := utils.ExecCommand(ctx, c, updateStreamsCmd); err != nil || code != 0 {
return fmt.Errorf("failed to update streams: %s", err)
}
@@ -585,3 +664,47 @@ func (cfg *PerformanceTest) TestPerformance(t *testing.T) {
}()
})
}

+// extractFirstPartitionColFromRows extracts the first partition column from DESCRIBE EXTENDED rows
+func extractFirstPartitionColFromRows(rows []types.Row) string {
+inPartitionSection := false
+
+for _, row := range rows {
+// Convert []any -> []string
+vals := row.Values()
+parts := make([]string, len(vals))
+for i, v := range vals {
+if v == nil {
+parts[i] = ""
+} else {
+parts[i] = fmt.Sprint(v) // safe string conversion
+}
+}
+line := strings.TrimSpace(strings.Join(parts, " "))
+if line == "" {
+continue
+}
+
+if strings.HasPrefix(line, "# Partition Information") {
+inPartitionSection = true
+continue
+}
+
+if inPartitionSection {
+if strings.HasPrefix(line, "# col_name") {
+continue
+}
+
+if strings.HasPrefix(line, "#") {
+break
+}
+
+fields := strings.Fields(line)
+if len(fields) > 0 {
+return fields[0] // return the first partition col
+}
+}
+}
+
+return ""
+}
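As an illustration of the traversal (sample output only; exact DESCRIBE EXTENDED formatting varies across Spark versions), rows that flatten to the lines below make the helper return "col_bigserial":

// Hypothetical DESCRIBE EXTENDED output, flattened one row per line as the loop sees it.
// The helper enters the section at "# Partition Information", skips the "# col_name"
// header row, and returns the first field of the first data row: "col_bigserial".
const describeExtendedExample = `
col_bigserial        bigint
col_text             string
# Partition Information
# col_name           data_type
col_bigserial        bigint
# Detailed Table Information
`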