Skip to content

Commit bd03ee1

Browse files
authored
Merge branch 'master' into fix/mongodb_type
2 parents 1fbc48b + 45ef2d1 commit bd03ee1

File tree

14 files changed

+303
-216
lines changed

14 files changed

+303
-216
lines changed

.github/workflows/performance-test.yml

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -95,8 +95,20 @@ jobs:
9595
- name: Cleanup
9696
if: always()
9797
run: |
98-
aws glue delete-database --name performance_${{ matrix.driver }} || { echo "failed to delete glue database: performance_${{ matrix.driver }}"; aws glue get-database --name performance_${{ matrix.driver }} || true; }
99-
aws s3 rm s3://dz-stag-github-actions/performance_${{ matrix.driver }} --recursive || { echo "failed to delete s3 bucket: performance_${{ matrix.driver }}"; aws s3 ls s3://dz-stag-github-actions/performance_${{ matrix.driver }} || true; }
98+
# Delete all Glue databases matching prefix
99+
for db in $(aws glue get-databases \
100+
--query "DatabaseList[?starts_with(Name, 'performance_${{ matrix.driver }}')].Name" \
101+
--output text); do
102+
echo "Deleting Glue database: $db"
103+
aws glue delete-database --name "$db" \
104+
|| echo "Failed to delete Glue database: $db"
105+
done
106+
107+
# Delete corresponding S3 path
108+
aws s3 rm s3://dz-stag-github-actions/performance_${{ matrix.driver }}/ --recursive \
109+
|| { echo "Failed to delete S3 bucket: performance_${{ matrix.driver }}"; \
110+
aws s3 ls s3://dz-stag-github-actions/performance_${{ matrix.driver }} || true; }
111+
100112
echo "Catalog cleanup completed"
101113
102114
if [[ "${{ matrix.driver }}" != "oracle" ]]; then

Dockerfile

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,8 @@ RUN go build -o /olake main.go
1212
# Final Runtime Stage
1313
FROM alpine:3.18
1414

15-
# Install Java 17 instead of Java 11
16-
RUN apk add --no-cache openjdk17
15+
# Install Java 17 and iproute2 for ss command
16+
RUN apk add --no-cache openjdk17 iproute2
1717

1818
# Copy the binary from the build stage
1919
COPY --from=base /olake /home/olake

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@
7272

7373
| Source | Full Load | CDC | Incremental | Notes | Documentation |
7474
|---------------|--------------|---------------|-------------------|-----------------------------|-----------------------------|
75-
| PostgreSQL ||`wal2json` ||`pgoutput` support WIP |[Postgres Docs](https://olake.io/docs/connectors/postgres/overview) |
75+
| PostgreSQL ||`pgoutput` ||`wal2json` deprecated |[Postgres Docs](https://olake.io/docs/connectors/postgres/overview) |
7676
| MySQL |||| Binlog-based CDC | [MySQL Docs](https://olake.io/docs/connectors/mysql/overview) |
7777
| MongoDB |||| Oplog-based CDC |[MongoDB Docs](https://olake.io/docs/connectors/mongodb/overview) |
7878
| Oracle || WIP || JDBC based Full Load & Incremental | [Oracle Docs](https://olake.io/docs/connectors/oracle/overview) |

destination/iceberg/iceberg.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"github.com/datazip-inc/olake/utils"
1818
"github.com/datazip-inc/olake/utils/logger"
1919
"github.com/datazip-inc/olake/utils/typeutils"
20+
"github.com/spf13/viper"
2021
)
2122

2223
type Iceberg struct {
@@ -261,8 +262,13 @@ func (i *Iceberg) Check(ctx context.Context) error {
261262
i.options = &destination.Options{
262263
ThreadID: "test_iceberg_destination",
263264
}
265+
266+
destinationDB := "test_olake"
267+
if prefix := viper.GetString(constants.DestinationDatabasePrefix); prefix != "" {
268+
destinationDB = fmt.Sprintf("%s_%s", utils.Reformat(prefix), destinationDB)
269+
}
264270
// Create a temporary setup for checking
265-
server, err := newIcebergClient(i.config, []PartitionInfo{}, i.options.ThreadID, true, false, "test_olake")
271+
server, err := newIcebergClient(i.config, []PartitionInfo{}, i.options.ThreadID, true, false, destinationDB)
266272
if err != nil {
267273
return fmt.Errorf("failed to setup iceberg server: %s", err)
268274
}
@@ -281,7 +287,7 @@ func (i *Iceberg) Check(ctx context.Context) error {
281287
Type: proto.IcebergPayload_GET_OR_CREATE_TABLE,
282288
Metadata: &proto.IcebergPayload_Metadata{
283289
ThreadId: server.serverID,
284-
DestTableName: "test_olake",
290+
DestTableName: destinationDB,
285291
Schema: icebergRawSchema(),
286292
},
287293
}
@@ -296,7 +302,7 @@ func (i *Iceberg) Check(ctx context.Context) error {
296302
// try writing record in dest table
297303
currentTime := time.Now().UTC()
298304
protoSchema := icebergRawSchema()
299-
record := types.CreateRawRecord("olake_test", map[string]any{"name": "olake"}, "r", &currentTime)
305+
record := types.CreateRawRecord(destinationDB, map[string]any{"name": "olake"}, "r", &currentTime)
300306
protoColumns, err := rawDataColumnBuffer(record, protoSchema)
301307
if err != nil {
302308
return fmt.Errorf("failed to create raw data column buffer: %s", err)
@@ -305,7 +311,7 @@ func (i *Iceberg) Check(ctx context.Context) error {
305311
Type: proto.IcebergPayload_RECORDS,
306312
Metadata: &proto.IcebergPayload_Metadata{
307313
ThreadId: server.serverID,
308-
DestTableName: "test_olake",
314+
DestTableName: destinationDB,
309315
Schema: protoSchema,
310316
},
311317
Records: []*proto.IcebergPayload_IceRecord{{

0 commit comments

Comments
 (0)