feat: integration test for incremental sync and partitioning #521
base: staging
@@ -10,6 +10,7 @@ import (
 	"time"

 	"github.com/apache/spark-connect-go/v35/spark/sql"
+	"github.com/apache/spark-connect-go/v35/spark/sql/types"
 	"github.com/datazip-inc/olake/constants"
 	"github.com/datazip-inc/olake/utils"
 	"github.com/datazip-inc/olake/utils/typeutils"
@@ -37,6 +38,8 @@ type IntegrationTest struct {
 	Namespace      string
 	ExecuteQuery   func(ctx context.Context, t *testing.T, streams []string, operation string, fileConfig bool)
 	IcebergDB      string
+	CursorField    string
+	PartitionRegex string
 }

 type PerformanceTest struct {
@@ -114,8 +117,8 @@ func discoverCommand(config TestConfig) string {
 	return fmt.Sprintf("/test-olake/build.sh driver-%s discover --config %s", config.Driver, config.SourcePath)
 }

-// TODO: check if we can remove namespace from being passed as a parameter and use a common namespace for all drivers
-func updateStreamsCommand(config TestConfig, namespace string, stream []string, isBackfill bool) string {
+// update normalization=true for selected streams under selected_streams.<namespace> by name
+func updateSelectedStreamsCommand(config TestConfig, namespace, partitionRegex string, stream []string, isBackfill bool) string {
 	if len(stream) == 0 {
 		return ""
 	}
@@ -126,10 +129,11 @@ func updateStreamsCommand(config TestConfig, namespace string, stream []string,
 	condition := strings.Join(streamConditions, " or ")
 	tmpCatalog := fmt.Sprintf("/tmp/%s_%s_streams.json", config.Driver, utils.Ternary(isBackfill, "backfill", "cdc").(string))
 	jqExpr := fmt.Sprintf(
-		`jq '.selected_streams = { "%s": (.selected_streams["%s"] | map(select(%s) | .normalization = true)) }' %s > %s && mv %s %s`,
+		`jq '.selected_streams = { "%s": (.selected_streams["%s"] | map(select(%s) | .normalization = true | .partition_regex = "%s")) }' %s > %s && mv %s %s`,
 		namespace,
 		namespace,
 		condition,
+		partitionRegex,
 		config.CatalogPath,
 		tmpCatalog,
 		tmpCatalog,
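As a concrete illustration of the updated template, here is a minimal standalone sketch of the command it renders. The namespace, select condition, and paths are hypothetical stand-ins (the real condition string is assembled from the selected stream names earlier in the function):

```go
package main

import "fmt"

func main() {
	namespace := "public"                  // hypothetical namespace
	condition := `.stream_name == "demo"`  // hypothetical jq select condition
	partitionRegex := "/{id,identity}"     // hypothetical partition regex
	catalog := "/test-olake/streams.json"  // hypothetical catalog path
	tmp := "/tmp/postgres_backfill_streams.json"
	cmd := fmt.Sprintf(
		`jq '.selected_streams = { "%s": (.selected_streams["%s"] | map(select(%s) | .normalization = true | .partition_regex = "%s")) }' %s > %s && mv %s %s`,
		namespace, namespace, condition, partitionRegex, catalog, tmp, tmp, catalog,
	)
	// Prints the shell command that enables normalization and sets
	// partition_regex on every selected stream matching the condition.
	fmt.Println(cmd)
}
```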
@@ -138,6 +142,23 @@ func updateStreamsCommand(config TestConfig, namespace string, stream []string,
 	return jqExpr
 }

+// set sync_mode and cursor_field for a specific stream object in streams[] by namespace+name
+func updateStreamConfigCommand(config TestConfig, namespace, streamName, syncMode, cursorField string) string {
+	tmpCatalog := fmt.Sprintf("/tmp/%s_set_mode_streams.json", config.Driver)
+	// map/select pattern updates nested array members
+	return fmt.Sprintf(
+		`jq --arg ns "%s" --arg name "%s" --arg mode "%s" --arg cursor "%s" '.streams = (.streams | map(if .stream.namespace == $ns and .stream.name == $name then (.stream.sync_mode = $mode | .stream.cursor_field = $cursor) else . end))' %s > %s && mv %s %s`,
+		namespace, streamName, syncMode, cursorField,
+		config.CatalogPath, tmpCatalog, tmpCatalog, config.CatalogPath,
+	)
+}
+
+// reset state file so incremental can perform initial load (equivalent to full load on first run)
+func resetStateFileCommand(config TestConfig) string {
+	// Ensure the state is clean irrespective of previous CDC run
+	return fmt.Sprintf(`rm -f %s; echo '{}' > %s`, config.StatePath, config.StatePath)
+}
+
 // to get backfill streams from cdc streams e.g. "demo_cdc" -> "demo"
 func GetBackfillStreamsFromCDC(cdcStreams []string) []string {
 	backfillStreams := []string{}
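The jq program in updateStreamConfigCommand rewrites only the entry whose stream.namespace and stream.name match the --arg values; every other element passes through the `else . end` branch untouched. A runnable sketch of that behavior replicated in Go (the stream shape is a guess at the catalog format, not necessarily olake's actual schema):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// streamEntry mimics one element of the catalog's streams[] array.
type streamEntry struct {
	Stream map[string]any `json:"stream"`
}

func main() {
	streams := []streamEntry{
		{Stream: map[string]any{"namespace": "public", "name": "demo", "sync_mode": "cdc"}},
		{Stream: map[string]any{"namespace": "public", "name": "other", "sync_mode": "cdc"}},
	}
	for _, s := range streams {
		if s.Stream["namespace"] == "public" && s.Stream["name"] == "demo" {
			s.Stream["sync_mode"] = "incremental" // mirrors .stream.sync_mode = $mode
			s.Stream["cursor_field"] = "id"       // mirrors .stream.cursor_field = $cursor
		}
	}
	out, _ := json.Marshal(streams)
	fmt.Println(string(out)) // only the "demo" entry is rewritten
}
```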
@@ -261,15 +282,30 @@ func (cfg *IntegrationTest) TestIntegration(t *testing.T) {
 		// 	`jq '(.selected_streams[][] | .normalization) = true' %s > /tmp/streams.json && mv /tmp/streams.json %s`,
 		// 	cfg.TestConfig.CatalogPath, cfg.TestConfig.CatalogPath,
 		// )
-		streamUpdateCmd := updateStreamsCommand(*cfg.TestConfig, cfg.Namespace, []string{currentTestTable}, true)
+		streamUpdateCmd := updateSelectedStreamsCommand(*cfg.TestConfig, cfg.Namespace, cfg.PartitionRegex, []string{currentTestTable}, true)
 		if code, out, err := utils.ExecCommand(ctx, c, streamUpdateCmd); err != nil || code != 0 {
 			return fmt.Errorf("failed to enable normalization in streams.json (%d): %s\n%s",
 				code, err, out,
 			)
 		}
-		t.Logf("Enabled normalization in %s", cfg.TestConfig.CatalogPath)
+		t.Logf("Enabled normalization and added partition regex in %s", cfg.TestConfig.CatalogPath)
+
+		// Helper to run sync and verify
+		runSync := func(c testcontainers.Container, useState bool, operation, opSymbol string, schema map[string]interface{}) error {
+			cmd := syncCommand(*cfg.TestConfig, useState)
+			if useState && operation != "" {
+				cfg.ExecuteQuery(ctx, t, []string{currentTestTable}, operation, false)
+			}
+			if code, out, err := utils.ExecCommand(ctx, c, cmd); err != nil || code != 0 {
+				return fmt.Errorf("sync failed (%d): %s\n%s", code, err, out)
+			}
+			t.Logf("Sync successful for %s driver", cfg.TestConfig.Driver)
+			VerifyIcebergSync(t, currentTestTable, cfg.IcebergDB, cfg.DataTypeSchema, schema, opSymbol, cfg.PartitionRegex, cfg.TestConfig.Driver)
+			return nil
+		}

+		// 3. Phase A: Full load + CDC
 		testCases := []struct {
 			syncMode  string
 			operation string
@@ -306,30 +342,54 @@ func (cfg *IntegrationTest) TestIntegration(t *testing.T) {
 				dummySchema: nil,
 			},
 		}

-		runSync := func(c testcontainers.Container, useState bool, operation, opSymbol string, schema map[string]interface{}) error {
-			cmd := syncCommand(*cfg.TestConfig, useState)
-			if useState && operation != "" {
-				cfg.ExecuteQuery(ctx, t, []string{currentTestTable}, operation, false)
-			}
-
-			if code, out, err := utils.ExecCommand(ctx, c, cmd); err != nil || code != 0 {
-				return fmt.Errorf("sync failed (%d): %s\n%s", code, err, out)
-			}
-			t.Logf("Sync successful for %s driver", cfg.TestConfig.Driver)
-			VerifyIcebergSync(t, currentTestTable, cfg.IcebergDB, cfg.DataTypeSchema, schema, opSymbol, cfg.TestConfig.Driver)
-			return nil
-		}
-
 		// 3. Run Sync command and verify records in Iceberg
 		for _, test := range testCases {
 			t.Logf("Running test for: %s", test.syncMode)
 			if err := runSync(c, test.useState, test.operation, test.opSymbol, test.dummySchema); err != nil {
 				return err
 			}
 		}

-		// 4. Clean up
+		// 4. Phase B: Full load + Incremental (switch streams.json cdc -> incremental, set cursor_field = "id")
+		// Reset table to isolate incremental scenario
+		cfg.ExecuteQuery(ctx, t, []string{currentTestTable}, "drop", false)
+		cfg.ExecuteQuery(ctx, t, []string{currentTestTable}, "create", false)
+		cfg.ExecuteQuery(ctx, t, []string{currentTestTable}, "clean", false)
+		cfg.ExecuteQuery(ctx, t, []string{currentTestTable}, "add", false)
+
+		// Ensure normalization remains on for selected stream
+		streamUpdateCmd = updateSelectedStreamsCommand(*cfg.TestConfig, cfg.Namespace, cfg.PartitionRegex, []string{currentTestTable}, true)
+		if code, out, err := utils.ExecCommand(ctx, c, streamUpdateCmd); err != nil || code != 0 {
+			return fmt.Errorf("failed to enable normalization in streams.json for incremental (%d): %s\n%s",
+				code, err, out,
+			)
+		}
Review comment on lines +360 to +365: This is getting repeated here.
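A minimal sketch of one way to address the duplication the reviewer points out, hoisting the repeated enable-normalization step into a closure (illustrative only, not the PR's code; `phase` merely labels the error message):

```go
// Hypothetical helper both phases could call instead of repeating the
// updateSelectedStreamsCommand + ExecCommand + error-wrap block.
enableNormalization := func(phase string) error {
	cmd := updateSelectedStreamsCommand(*cfg.TestConfig, cfg.Namespace, cfg.PartitionRegex, []string{currentTestTable}, true)
	if code, out, err := utils.ExecCommand(ctx, c, cmd); err != nil || code != 0 {
		return fmt.Errorf("failed to enable normalization in streams.json for %s (%d): %s\n%s", phase, code, err, out)
	}
	return nil
}
// Phase A: if err := enableNormalization("backfill"); err != nil { return err }
// Phase B: if err := enableNormalization("incremental"); err != nil { return err }
```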
+		// Patch: sync_mode = incremental, cursor_field = "id"
+		incPatch := updateStreamConfigCommand(*cfg.TestConfig, cfg.Namespace, currentTestTable, "incremental", cfg.CursorField)
+		if code, out, err := utils.ExecCommand(ctx, c, incPatch); err != nil || code != 0 {
+			return fmt.Errorf("failed to patch streams.json for incremental (%d): %s\n%s", code, err, out)
+		}
+
+		// Reset state so initial incremental behaves like a first full incremental load
+		resetState := resetStateFileCommand(*cfg.TestConfig)
+		if code, out, err := utils.ExecCommand(ctx, c, resetState); err != nil || code != 0 {
+			return fmt.Errorf("failed to reset state for incremental (%d): %s\n%s", code, err, out)
+		}
+
+		// Initial incremental run (equivalent to full on first run)
+		t.Log("Running Incremental - full load")
+		if err := runSync(c, true, "", "r", cfg.ExpectedData); err != nil {
+			return err
+		}
+
+		// Delta incremental: add new rows and sync again
+		t.Log("Running Incremental - inserts")
+		cfg.ExecuteQuery(ctx, t, []string{currentTestTable}, "insert", false)
+		if err := runSync(c, true, "", "u", cfg.ExpectedData); err != nil {
+			return err
+		}
Review comment on lines +379 to +390: Here as well, a test cases array can be created and that same for loop called.
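A sketch of the reviewer's suggestion, expressing the two incremental runs as a table driven by the same loop shape as Phase A (illustrative; struct and field names are made up):

```go
incrementalCases := []struct {
	name      string
	operation string // source-side mutation to apply before the sync, "" for none
	opSymbol  string // expected Iceberg op symbol: "r" initial read, "u" update
}{
	{name: "Incremental - full load", operation: "", opSymbol: "r"},
	{name: "Incremental - inserts", operation: "insert", opSymbol: "u"},
}
for _, tc := range incrementalCases {
	t.Log("Running " + tc.name)
	if tc.operation != "" {
		cfg.ExecuteQuery(ctx, t, []string{currentTestTable}, tc.operation, false)
	}
	if err := runSync(c, true, "", tc.opSymbol, cfg.ExpectedData); err != nil {
		return err
	}
}
```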
+
+		// 5. Clean up
 		cfg.ExecuteQuery(ctx, t, []string{currentTestTable}, "drop", false)
 		t.Logf("%s sync test-container clean up", cfg.TestConfig.Driver)
 		return nil
@@ -353,8 +413,9 @@ func (cfg *IntegrationTest) TestIntegration(t *testing.T) {
 	})
 }

+// TODO: Refactor parsing logic into reusable utility functions
 // verifyIcebergSync verifies that data was correctly synchronized to Iceberg
-func VerifyIcebergSync(t *testing.T, tableName, icebergDB string, datatypeSchema map[string]string, schema map[string]interface{}, opSymbol, driver string) {
+func VerifyIcebergSync(t *testing.T, tableName, icebergDB string, datatypeSchema map[string]string, schema map[string]interface{}, opSymbol, partitionRegex, driver string) {
 	t.Helper()
 	ctx := context.Background()
 	spark, err := sql.NewSessionBuilder().Remote(sparkConnectAddress).Build(ctx)
@@ -393,7 +454,7 @@ func VerifyIcebergSync(t *testing.T, tableName, icebergDB string, datatypeSchema
 		for key, expected := range schema {
 			icebergValue, ok := icebergMap[key]
 			require.Truef(t, ok, "Row %d: missing column %q in Iceberg result", rowIdx, key)
-			require.Equal(t, icebergValue, expected, "Row %d: mismatch on %q: Iceberg has %#v, expected %#v", rowIdx, key, icebergValue, expected)
+			require.Equal(t, expected, icebergValue, "Row %d: mismatch on %q: Iceberg has %#v, expected %#v", rowIdx, key, icebergValue, expected)
 		}
 	}
 	t.Logf("Verified Iceberg synced data with respect to data synced from source[%s] found equal", driver)
@@ -426,6 +487,24 @@ func VerifyIcebergSync(t *testing.T, tableName, icebergDB string, datatypeSchema
 			"Data type mismatch for column %s: expected %s, got %s", col, expectedIceType, iceType)
 	}
 	t.Logf("Verified datatypes in Iceberg after sync")
+
+	// Partition verification using only metadata tables
+	if partitionRegex == "" {
+		t.Log("No partitionRegex provided, skipping partition verification")
+		return
+	}
+	// Extract partition columns from describe rows
+	partitionCols := extractFirstPartitionColFromRows(describeRows)
+	require.NotEmpty(t, partitionCols, "Partition columns not found in Iceberg metadata")
+
+	// Parse expected partition columns from pattern like "/{col,identity}"
+	// Supports multiple entries like "/{col1,identity}" by taking the first token as the source column
+	clean := strings.TrimPrefix(partitionRegex, "/{")
+	clean = strings.TrimSuffix(clean, "}")
+	toks := strings.Split(clean, ",")
+	expectedCol := strings.TrimSpace(toks[0])
+	require.Equal(t, expectedCol, partitionCols, "Partition column does not match expected '%s'", expectedCol)
+	t.Logf("Verified partition column: %s", expectedCol)
 }

 func (cfg *PerformanceTest) TestPerformance(t *testing.T) {
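A standalone trace of that parsing: for a partition regex of "/{id,identity}" (a hypothetical value), the first token before the comma becomes the expected partition column:

```go
package main

import (
	"fmt"
	"strings"
)

func main() {
	partitionRegex := "/{id,identity}"                // hypothetical: column "id", identity transform
	clean := strings.TrimPrefix(partitionRegex, "/{") // "id,identity}"
	clean = strings.TrimSuffix(clean, "}")            // "id,identity"
	expectedCol := strings.TrimSpace(strings.Split(clean, ",")[0])
	fmt.Println(expectedCol) // prints "id"
}
```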
@@ -501,7 +580,7 @@ func (cfg *PerformanceTest) TestPerformance(t *testing.T) {
 	}
 	t.Log("(backfill) discover completed")

-	updateStreamsCmd := updateStreamsCommand(*cfg.TestConfig, cfg.Namespace, cfg.BackfillStreams, true)
+	updateStreamsCmd := updateSelectedStreamsCommand(*cfg.TestConfig, cfg.Namespace, "", cfg.BackfillStreams, true)
 	if code, _, err := utils.ExecCommand(ctx, c, updateStreamsCmd); err != nil || code != 0 {
 		return fmt.Errorf("failed to update streams: %s", err)
 	}
@@ -535,7 +614,7 @@ func (cfg *PerformanceTest) TestPerformance(t *testing.T) {
 	}
 	t.Log("(cdc) discover completed")

-	updateStreamsCmd := updateStreamsCommand(*cfg.TestConfig, cfg.Namespace, cfg.CDCStreams, false)
+	updateStreamsCmd := updateSelectedStreamsCommand(*cfg.TestConfig, cfg.Namespace, "", cfg.CDCStreams, false)
 	if code, _, err := utils.ExecCommand(ctx, c, updateStreamsCmd); err != nil || code != 0 {
 		return fmt.Errorf("failed to update streams: %s", err)
 	}
@@ -585,3 +664,47 @@ func (cfg *PerformanceTest) TestPerformance(t *testing.T) {
 		}()
 	})
 }
+
+// extractFirstPartitionColFromRows extracts the first partition column from DESCRIBE EXTENDED rows
+func extractFirstPartitionColFromRows(rows []types.Row) string {
+	inPartitionSection := false
+
+	for _, row := range rows {
+		// Convert []any -> []string
+		vals := row.Values()
+		parts := make([]string, len(vals))
+		for i, v := range vals {
+			if v == nil {
+				parts[i] = ""
+			} else {
+				parts[i] = fmt.Sprint(v) // safe string conversion
+			}
+		}
+		line := strings.TrimSpace(strings.Join(parts, " "))
+		if line == "" {
+			continue
+		}
+
+		if strings.HasPrefix(line, "# Partition Information") {
+			inPartitionSection = true
+			continue
+		}
+
+		if inPartitionSection {
+			if strings.HasPrefix(line, "# col_name") {
+				continue
+			}
+			if strings.HasPrefix(line, "#") {
+				break
+			}
+			fields := strings.Fields(line)
+			if len(fields) > 0 {
+				return fields[0] // return the first partition col
+			}
+		}
+	}
+
+	return ""
+}
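For reference, the section this scanner looks for in Spark's DESCRIBE TABLE EXTENDED output has roughly the shape below (rows and values illustrative). The first data row after the `# col_name` header yields the partition column, so for this sample the function returns "id":

```
col_name        data_type    comment
id              int
name            string

# Partition Information
# col_name      data_type    comment
id              int
```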
Review comment: Update this error message as well.