Skip to content

Commit 2e3918d

Browse files
fix: kill process manually after timeout (#493)
Co-authored-by: Ankit Sharma <[email protected]>
1 parent bea74c4 commit 2e3918d

File tree

4 files changed

+54
-16
lines changed

4 files changed

+54
-16
lines changed

.github/workflows/performance-test.yml

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -95,8 +95,20 @@ jobs:
9595
- name: Cleanup
9696
if: always()
9797
run: |
98-
aws glue delete-database --name performance_${{ matrix.driver }} || { echo "failed to delete glue database: performance_${{ matrix.driver }}"; aws glue get-database --name performance_${{ matrix.driver }} || true; }
99-
aws s3 rm s3://dz-stag-github-actions/performance_${{ matrix.driver }} --recursive || { echo "failed to delete s3 bucket: performance_${{ matrix.driver }}"; aws s3 ls s3://dz-stag-github-actions/performance_${{ matrix.driver }} || true; }
98+
# Delete all Glue databases matching prefix
99+
for db in $(aws glue get-databases \
100+
--query "DatabaseList[?starts_with(Name, 'performance_${{ matrix.driver }}')].Name" \
101+
--output text); do
102+
echo "Deleting Glue database: $db"
103+
aws glue delete-database --name "$db" \
104+
|| echo "Failed to delete Glue database: $db"
105+
done
106+
107+
# Delete corresponding S3 path
108+
aws s3 rm s3://dz-stag-github-actions/performance_${{ matrix.driver }}/ --recursive \
109+
|| { echo "Failed to delete S3 bucket: performance_${{ matrix.driver }}"; \
110+
aws s3 ls s3://dz-stag-github-actions/performance_${{ matrix.driver }} || true; }
111+
100112
echo "Catalog cleanup completed"
101113
102114
if [[ "${{ matrix.driver }}" != "oracle" ]]; then

destination/iceberg/iceberg.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"github.com/datazip-inc/olake/utils"
1818
"github.com/datazip-inc/olake/utils/logger"
1919
"github.com/datazip-inc/olake/utils/typeutils"
20+
"github.com/spf13/viper"
2021
)
2122

2223
type Iceberg struct {
@@ -261,8 +262,13 @@ func (i *Iceberg) Check(ctx context.Context) error {
261262
i.options = &destination.Options{
262263
ThreadID: "test_iceberg_destination",
263264
}
265+
266+
destinationDB := "test_olake"
267+
if prefix := viper.GetString(constants.DestinationDatabasePrefix); prefix != "" {
268+
destinationDB = fmt.Sprintf("%s_%s", utils.Reformat(prefix), destinationDB)
269+
}
264270
// Create a temporary setup for checking
265-
server, err := newIcebergClient(i.config, []PartitionInfo{}, i.options.ThreadID, true, false, "test_olake")
271+
server, err := newIcebergClient(i.config, []PartitionInfo{}, i.options.ThreadID, true, false, destinationDB)
266272
if err != nil {
267273
return fmt.Errorf("failed to setup iceberg server: %s", err)
268274
}
@@ -281,7 +287,7 @@ func (i *Iceberg) Check(ctx context.Context) error {
281287
Type: proto.IcebergPayload_GET_OR_CREATE_TABLE,
282288
Metadata: &proto.IcebergPayload_Metadata{
283289
ThreadId: server.serverID,
284-
DestTableName: "test_olake",
290+
DestTableName: destinationDB,
285291
Schema: icebergRawSchema(),
286292
},
287293
}
@@ -296,7 +302,7 @@ func (i *Iceberg) Check(ctx context.Context) error {
296302
// try writing record in dest table
297303
currentTime := time.Now().UTC()
298304
protoSchema := icebergRawSchema()
299-
record := types.CreateRawRecord("olake_test", map[string]any{"name": "olake"}, "r", &currentTime)
305+
record := types.CreateRawRecord(destinationDB, map[string]any{"name": "olake"}, "r", &currentTime)
300306
protoColumns, err := rawDataColumnBuffer(record, protoSchema)
301307
if err != nil {
302308
return fmt.Errorf("failed to create raw data column buffer: %s", err)
@@ -305,7 +311,7 @@ func (i *Iceberg) Check(ctx context.Context) error {
305311
Type: proto.IcebergPayload_RECORDS,
306312
Metadata: &proto.IcebergPayload_Metadata{
307313
ThreadId: server.serverID,
308-
DestTableName: "test_olake",
314+
DestTableName: destinationDB,
309315
Schema: protoSchema,
310316
},
311317
Records: []*proto.IcebergPayload_IceRecord{{

protocol/sync.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,14 @@ import (
66
"sync"
77
"time"
88

9+
"github.com/datazip-inc/olake/constants"
910
"github.com/datazip-inc/olake/destination"
1011
"github.com/datazip-inc/olake/types"
1112
"github.com/datazip-inc/olake/utils"
1213
"github.com/datazip-inc/olake/utils/logger"
1314
"github.com/datazip-inc/olake/utils/telemetry"
1415
"github.com/spf13/cobra"
16+
"github.com/spf13/viper"
1517
)
1618

1719
// syncCmd represents the read command
@@ -38,6 +40,11 @@ var syncCmd = &cobra.Command{
3840
return err
3941
}
4042

43+
// to set prefix for "test_olake" db created by OLake
44+
if destinationDatabasePrefix != "" {
45+
viper.Set(constants.DestinationDatabasePrefix, destinationDatabasePrefix)
46+
}
47+
4148
catalog = &types.Catalog{}
4249
if err := utils.UnmarshalFile(streamsPath, catalog, false); err != nil {
4350
return err

utils/testutils/test_utils.go

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -102,16 +102,25 @@ func GetTestConfig(driver string) *TestConfig {
102102
}
103103
}
104104

105-
func syncCommand(config TestConfig, useState bool) string {
105+
func syncCommand(config TestConfig, useState bool, flags ...string) string {
106106
baseCmd := fmt.Sprintf("/test-olake/build.sh driver-%s sync --config %s --catalog %s --destination %s", config.Driver, config.SourcePath, config.CatalogPath, config.DestinationPath)
107107
if useState {
108108
baseCmd = fmt.Sprintf("%s --state %s", baseCmd, config.StatePath)
109109
}
110+
111+
if len(flags) > 0 {
112+
baseCmd = fmt.Sprintf("%s %s", baseCmd, strings.Join(flags, " "))
113+
}
110114
return baseCmd
111115
}
112116

113-
func discoverCommand(config TestConfig) string {
114-
return fmt.Sprintf("/test-olake/build.sh driver-%s discover --config %s", config.Driver, config.SourcePath)
117+
// pass flags as `--flag1, flag1 value, --flag2, flag2 value...`
118+
func discoverCommand(config TestConfig, flags ...string) string {
119+
baseCmd := fmt.Sprintf("/test-olake/build.sh driver-%s discover --config %s", config.Driver, config.SourcePath)
120+
if len(flags) > 0 {
121+
baseCmd = fmt.Sprintf("%s %s", baseCmd, strings.Join(flags, " "))
122+
}
123+
return baseCmd
115124
}
116125

117126
// TODO: check if we can remove namespace from being passed as a parameter and use a common namespace for all drivers
@@ -307,8 +316,9 @@ func (cfg *IntegrationTest) TestIntegration(t *testing.T) {
307316
},
308317
}
309318

319+
destDBPrefix := fmt.Sprintf("integration_%s", cfg.TestConfig.Driver)
310320
runSync := func(c testcontainers.Container, useState bool, operation, opSymbol string, schema map[string]interface{}) error {
311-
cmd := syncCommand(*cfg.TestConfig, useState)
321+
cmd := syncCommand(*cfg.TestConfig, useState, "--destination-database-prefix", destDBPrefix)
312322
if useState && operation != "" {
313323
cfg.ExecuteQuery(ctx, t, []string{currentTestTable}, operation, false)
314324
}
@@ -460,6 +470,8 @@ func (cfg *PerformanceTest) TestPerformance(t *testing.T) {
460470
code, output, err := utils.ExecCommand(timedCtx, c, cmd)
461471
// check if sync was canceled due to timeout (expected)
462472
if timedCtx.Err() == context.DeadlineExceeded {
473+
killCmd := "pkill -9 -f 'olake.*sync' || true"
474+
_, _, _ = utils.ExecCommand(ctx, c, killCmd)
463475
return output, nil
464476
}
465477
if err != nil || code != 0 {
@@ -491,11 +503,12 @@ func (cfg *PerformanceTest) TestPerformance(t *testing.T) {
491503
if code, output, err := utils.ExecCommand(ctx, c, installCmd); err != nil || code != 0 {
492504
return fmt.Errorf("failed to install dependencies:\n%s", string(output))
493505
}
494-
495506
t.Logf("(backfill) running performance test for %s", cfg.TestConfig.Driver)
496507

508+
destDBPrefix := fmt.Sprintf("performance_%s", cfg.TestConfig.Driver)
509+
497510
t.Log("(backfill) discover started")
498-
discoverCmd := discoverCommand(*cfg.TestConfig)
511+
discoverCmd := discoverCommand(*cfg.TestConfig, "--destination-database-prefix", destDBPrefix)
499512
if code, output, err := utils.ExecCommand(ctx, c, discoverCmd); err != nil || code != 0 {
500513
return fmt.Errorf("failed to perform discover:\n%s", string(output))
501514
}
@@ -508,7 +521,7 @@ func (cfg *PerformanceTest) TestPerformance(t *testing.T) {
508521

509522
t.Log("(backfill) sync started")
510523
usePreChunkedState := cfg.TestConfig.Driver == string(constants.MySQL)
511-
syncCmd := syncCommand(*cfg.TestConfig, usePreChunkedState)
524+
syncCmd := syncCommand(*cfg.TestConfig, usePreChunkedState, "--destination-database-prefix", destDBPrefix)
512525
if output, err := syncWithTimeout(ctx, c, syncCmd); err != nil {
513526
return fmt.Errorf("failed to perform sync:\n%s", string(output))
514527
}
@@ -529,7 +542,7 @@ func (cfg *PerformanceTest) TestPerformance(t *testing.T) {
529542
t.Log("(cdc) setup cdc completed")
530543

531544
t.Log("(cdc) discover started")
532-
discoverCmd := discoverCommand(*cfg.TestConfig)
545+
discoverCmd := discoverCommand(*cfg.TestConfig, "--destination-database-prefix", destDBPrefix)
533546
if code, output, err := utils.ExecCommand(ctx, c, discoverCmd); err != nil || code != 0 {
534547
return fmt.Errorf("failed to perform discover:\n%s", string(output))
535548
}
@@ -541,7 +554,7 @@ func (cfg *PerformanceTest) TestPerformance(t *testing.T) {
541554
}
542555

543556
t.Log("(cdc) state creation started")
544-
syncCmd := syncCommand(*cfg.TestConfig, false)
557+
syncCmd := syncCommand(*cfg.TestConfig, false, "--destination-database-prefix", destDBPrefix)
545558
if code, output, err := utils.ExecCommand(ctx, c, syncCmd); err != nil || code != 0 {
546559
return fmt.Errorf("failed to perform initial sync:\n%s", string(output))
547560
}
@@ -552,7 +565,7 @@ func (cfg *PerformanceTest) TestPerformance(t *testing.T) {
552565
t.Log("(cdc) trigger cdc completed")
553566

554567
t.Log("(cdc) sync started")
555-
syncCmd = syncCommand(*cfg.TestConfig, true)
568+
syncCmd = syncCommand(*cfg.TestConfig, true, "--destination-database-prefix", destDBPrefix)
556569
if output, err := syncWithTimeout(ctx, c, syncCmd); err != nil {
557570
return fmt.Errorf("failed to perform CDC sync:\n%s", string(output))
558571
}

0 commit comments

Comments
 (0)