Skip to content
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
94ba786
fix: changes
ImDoubD-datazip Sep 5, 2025
5b998f4
Merge branch 'staging' into chore/clear-dest-ice
ImDoubD-datazip Sep 8, 2025
c3f10c6
fix: staging pulled
ImDoubD-datazip Sep 10, 2025
d37702a
fix: changes
ImDoubD-datazip Sep 10, 2025
391ad62
fix: nothing
ImDoubD-datazip Sep 10, 2025
5822e25
Merge branch 'staging' into chore/clear-dest-ice
ImDoubD-datazip Sep 11, 2025
88d09dd
fix: comments
ImDoubD-datazip Sep 11, 2025
3048a93
Merge branch 'staging' into chore/clear-dest-ice
ImDoubD-datazip Sep 16, 2025
6e434e4
Merge branch 'staging' into chore/clear-dest-ice
hash-data Sep 25, 2025
58c07b3
fix: changes
ImDoubD-datazip Sep 25, 2025
a090f46
fix: staging pull
ImDoubD-datazip Oct 6, 2025
4653be1
fix: changes
ImDoubD-datazip Oct 7, 2025
b4e860c
fix: changes pulled
ImDoubD-datazip Oct 7, 2025
aa3c006
fix: changes
ImDoubD-datazip Oct 7, 2025
4ddf2d4
fix: changes
ImDoubD-datazip Oct 10, 2025
d76d5aa
fix: pull changes
ImDoubD-datazip Oct 10, 2025
678e8b9
fix: changes
ImDoubD-datazip Oct 10, 2025
552e78b
Merge branch 'staging' into chore/clear-dest-ice
ImDoubD-datazip Oct 12, 2025
e42145c
fix: pull changes
ImDoubD-datazip Oct 13, 2025
49a9a83
Merge branch 'staging' into chore/clear-dest-ice
ImDoubD-datazip Oct 14, 2025
eece7d8
Merge branch 'staging' into chore/clear-dest-ice
ImDoubD-datazip Oct 14, 2025
c0d77ba
fix: changes
ImDoubD-datazip Oct 15, 2025
f991826
fix: review changes
ImDoubD-datazip Oct 17, 2025
15b89dc
Merge branch 'staging' into chore/clear-dest-ice
hash-data Oct 17, 2025
c9db930
fix: review changes
ImDoubD-datazip Oct 17, 2025
756b8b5
fix: review changes
ImDoubD-datazip Oct 17, 2025
9090c06
fix: review changes
ImDoubD-datazip Oct 17, 2025
d1bd844
fix: lint changes
ImDoubD-datazip Oct 17, 2025
c2132d4
fix: purge removed, only data to be dropped
ImDoubD-datazip Oct 18, 2025
1701166
fix: changes
ImDoubD-datazip Oct 18, 2025
29dc095
fix: review changes
ImDoubD-datazip Oct 20, 2025
2600385
Merge branch 'staging' into chore/clear-dest-ice
ImDoubD-datazip Oct 20, 2025
85ad4ea
Merge branch 'staging' into chore/clear-dest-ice
ImDoubD-datazip Oct 20, 2025
e733ab7
fix: changes
ImDoubD-datazip Oct 22, 2025
14c6721
fix: s3 client check added in drop streams
ImDoubD-datazip Oct 22, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ const (
ConfigFolder = "CONFIG_FOLDER"
StatePath = "STATE_PATH"
StreamsPath = "STREAMS_PATH"
DifferencePath = "DIFFERENCE_STREAMS_PATH"
// DestinationDatabasePrefix is used as prefix for destination database name
DestinationDatabasePrefix = "DESTINATION_DATABASE_PREFIX"
// EffectiveParquetSize is the effective size in bytes considering 256mb targeted parquet size, compression ratio as 8
Expand Down
51 changes: 43 additions & 8 deletions destination/iceberg/iceberg.go
Original file line number Diff line number Diff line change
Expand Up @@ -613,17 +613,52 @@ func (i *Iceberg) parsePartitionRegex(pattern string) error {
return nil
}

func (i *Iceberg) DropStreams(_ context.Context, _ []string) error {
logger.Info("iceberg destination not support clear destination, skipping clear operation")
// DropStreams drops the destination Iceberg table of every stream in
// dropStreams. It is the Iceberg side of the clear-destination operation.
//
// For each stream the fully qualified name "<database>.<table>" is built from
// the stream's destination database (falling back to the configured
// IcebergDatabase) and destination table, and a DROP_TABLE payload is sent
// through a dedicated Iceberg client.
//
// It returns nil when dropStreams is empty or when every drop request
// succeeds. The first failing request aborts the loop and its error is
// returned wrapped, so callers can still inspect the cause with
// errors.Is / errors.As.
func (i *Iceberg) DropStreams(ctx context.Context, dropStreams []types.StreamInterface) error {
	i.options = &destination.Options{
		ThreadID: "iceberg_destination_drop",
	}
	if len(dropStreams) == 0 {
		logger.Info("No streams selected for clearing Iceberg destination, skipping operation")
		return nil
	}

	// A dedicated client is started only for dropping tables; no partition
	// information is passed to it.
	server, err := newIcebergClient(i.config, []PartitionInfo{}, i.options.ThreadID, false, false, "")
	if err != nil {
		return fmt.Errorf("failed to setup iceberg server for dropping streams: %w", err)
	}

	// Attach the client to the writer so that Close can release it.
	i.server = server
	defer func() {
		// The drop result is what matters to the caller, so a close failure is
		// only reported, never returned.
		if closeErr := i.Close(ctx); closeErr != nil {
			logger.Warnf("failed to close iceberg client after dropping streams: %s", closeErr)
		}
	}()

	logger.Infof("Starting Clear Iceberg destination for %d selected streams", len(dropStreams))

	for _, stream := range dropStreams {
		destDB := stream.GetDestinationDatabase(&i.config.IcebergDatabase)
		destTable := stream.GetDestinationTable()
		dropTable := fmt.Sprintf("%s.%s", destDB, destTable)

		logger.Infof("Dropping Iceberg table: %s", dropTable)

		request := &proto.IcebergPayload{
			Type: proto.IcebergPayload_DROP_TABLE,
			Metadata: &proto.IcebergPayload_Metadata{
				DestTableName: dropTable,
				ThreadId:      i.server.serverID,
			},
		}
		if _, err := i.server.sendClientRequest(ctx, request); err != nil {
			return fmt.Errorf("failed to drop table %s: %w", dropTable, err)
		}
	}

	logger.Info("Successfully cleared Iceberg destination for selected streams")
	return nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,4 +276,19 @@ public static int partitionId() {
return Integer.parseInt(dtFormater.format(Instant.now()));
}

/**
 * Drops the table {@code namespace.tableName} from the given catalog.
 *
 * <p>The table is looked up first; a missing table is logged as a warning and
 * reported as {@code false} instead of being treated as an error. For an
 * existing table the result of {@code Catalog.dropTable(identifier, true)} is
 * returned as-is.
 *
 * <p>NOTE(review): the second argument {@code true} is the purge flag of the
 * Iceberg Catalog API - confirm that purging is intended here.
 *
 * @param namespace      namespace (database) that holds the table
 * @param tableName      name of the table to drop
 * @param icebergCatalog catalog used for the existence check and the drop
 * @return the value returned by the catalog's drop call, or {@code false} when
 *         the table does not exist
 * @throws RuntimeException wrapping any exception raised while checking or dropping
 */
public static boolean dropIcebergTable(String namespace, String tableName, Catalog icebergCatalog) {
    try {
        TableIdentifier identifier = TableIdentifier.of(namespace, tableName);
        boolean exists = icebergCatalog.tableExists(identifier);
        if (exists) {
            return icebergCatalog.dropTable(identifier, true);
        }
        // Nothing to drop: report it and let the caller decide how to respond.
        LOGGER.warn("Table not found: {}", identifier);
        return false;
    } catch (Exception e) {
        LOGGER.error("Failed to drop table {}.{}: {}", namespace, tableName, e.getMessage());
        throw new RuntimeException("Failed to drop table: " + namespace + "." + tableName, e);
    }
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public void sendRecords(IcebergPayload request, StreamObserver<RecordIngest.Reco
throw new Exception("Destination table name not present in metadata");
}

if (this.icebergTable == null) {
if (this.icebergTable == null && request.getType() != IcebergPayload.PayloadType.DROP_TABLE) {
SchemaConvertor schemaConvertor = new SchemaConvertor(identifierField, schemaMetadata);
this.icebergTable = loadIcebergTable(TableIdentifier.of(icebergNamespace, destTableName),
schemaConvertor.convertToIcebergSchema());
Expand Down Expand Up @@ -111,10 +111,24 @@ public void sendRecords(IcebergPayload request, StreamObserver<RecordIngest.Reco
break;

case DROP_TABLE:
LOGGER.warn("{} Table {} not dropped, drop table not implemented", requestId, destTableName);
sendResponse(responseObserver, "Drop table not implemented");
String dropTable = metadata.getDestTableName();
String[] parts = dropTable.split("\\.", 2);
if (parts.length != 2) {
throw new IllegalArgumentException("Invalid destination table name: " + dropTable);
}
String namespace = parts[0], tableName = parts[1];

LOGGER.warn("{} Dropping table {}.{}", requestId, namespace, tableName);

boolean dropped = IcebergUtil.dropIcebergTable(namespace, tableName, icebergCatalog);
if (dropped) {
sendResponse(responseObserver, "Successfully dropped table " + tableName);
LOGGER.info("{} Table {} dropped", requestId, tableName);
} else {
sendResponse(responseObserver, "Table " + tableName + " does not exist");
LOGGER.warn("{} Table {} not dropped, table does not exist", requestId, tableName);
}
break;

default:
throw new IllegalArgumentException("Unknown payload type: " + request.getType());
}
Expand Down
2 changes: 1 addition & 1 deletion destination/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,6 @@ type Writer interface {
// Need to pass olakeTimestamp as end argument to get the correct partition path based on record ingestion time.
EvolveSchema(ctx context.Context, globalSchema, recordsSchema any) (any, error)
// DropStreams is used to clear the destination before re-writing the stream
DropStreams(ctx context.Context, selectedStream []string) error
DropStreams(ctx context.Context, dropStreams []types.StreamInterface) error
Close(ctx context.Context) error
}
28 changes: 16 additions & 12 deletions destination/parquet/parquet.go
Original file line number Diff line number Diff line change
Expand Up @@ -456,20 +456,25 @@ func (p *Parquet) getPartitionedFilePath(values map[string]any, olakeTimestamp t
return filepath.Join(p.basePath, strings.TrimSuffix(result, "/"))
}

func (p *Parquet) DropStreams(ctx context.Context, selectedStreams []string) error {
func (p *Parquet) DropStreams(ctx context.Context, selectedStreams []types.StreamInterface) error {
if len(selectedStreams) == 0 {
logger.Infof("Thread[%s]: no streams selected for clearing, skipping clear operation", p.options.ThreadID)
return nil
}

logger.Infof("Thread[%s]: clearing destination for %d selected streams: %v", p.options.ThreadID, len(selectedStreams), selectedStreams)

paths := make([]string, 0, len(selectedStreams))
for _, stream := range selectedStreams {
paths = append(paths, stream.GetDestinationDatabase(nil)+"."+stream.GetDestinationTable())
}

if p.s3Client == nil {
if err := p.clearLocalFiles(selectedStreams); err != nil {
if err := p.clearLocalFiles(paths); err != nil {
return fmt.Errorf("failed to clear local files: %s", err)
}
} else {
if err := p.clearS3Files(ctx, selectedStreams); err != nil {
if err := p.clearS3Files(ctx, paths); err != nil {
return fmt.Errorf("failed to clear S3 files: %s", err)
}
}
Expand All @@ -478,16 +483,15 @@ func (p *Parquet) DropStreams(ctx context.Context, selectedStreams []string) err
return nil
}

func (p *Parquet) clearLocalFiles(selectedStreams []string) error {
for _, streamID := range selectedStreams {
func (p *Parquet) clearLocalFiles(paths []string) error {
for _, streamID := range paths {
parts := strings.SplitN(streamID, ".", 2)
if len(parts) != 2 {
logger.Warnf("Thread[%s]: invalid stream ID format: %s, skipping", p.options.ThreadID, streamID)
continue
}

namespace, streamName := parts[0], parts[1]
streamPath := filepath.Join(p.config.Path, namespace, streamName)
namespace, tableName := parts[0], parts[1]
streamPath := filepath.Join(p.config.Path, namespace, tableName)

logger.Infof("Thread[%s]: clearing local path: %s", p.options.ThreadID, streamPath)

Expand All @@ -506,7 +510,7 @@ func (p *Parquet) clearLocalFiles(selectedStreams []string) error {
return nil
}

func (p *Parquet) clearS3Files(ctx context.Context, selectedStreams []string) error {
func (p *Parquet) clearS3Files(ctx context.Context, paths []string) error {
deleteS3PrefixStandard := func(filtPath string) error {
iter := s3manager.NewDeleteListIterator(p.s3Client, &s3.ListObjectsInput{
Bucket: aws.String(p.config.Bucket),
Expand All @@ -519,15 +523,15 @@ func (p *Parquet) clearS3Files(ctx context.Context, selectedStreams []string) er
return nil
}

for _, streamID := range selectedStreams {
for _, streamID := range paths {
parts := strings.SplitN(streamID, ".", 2)
if len(parts) != 2 {
logger.Warnf("Thread[%s]: invalid stream ID format: %s, skipping", p.options.ThreadID, streamID)
continue
}
namespace, tableName := parts[0], parts[1]
s3TablePath := filepath.Join(p.config.Prefix, namespace, tableName, "/")

namespace, streamName := parts[0], parts[1]
s3TablePath := filepath.Join(p.config.Prefix, namespace, streamName, "/")
logger.Debugf("Thread[%s]: clearing S3 prefix: s3://%s/%s", p.options.ThreadID, p.config.Bucket, s3TablePath)
if err := deleteS3PrefixStandard(s3TablePath); err != nil {
return fmt.Errorf("failed to clear S3 prefix %s: %s", s3TablePath, err)
Expand Down
27 changes: 20 additions & 7 deletions destination/writers.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func WithThreadID(threadID string) ThreadOptions {
}
}

func NewWriterPool(ctx context.Context, config *types.WriterConfig, syncStreams, dropStreams []string, batchSize int64) (*WriterPool, error) {
func NewWriterPool(ctx context.Context, config *types.WriterConfig, syncStreams []string, batchSize int64) (*WriterPool, error) {
newfunc, found := RegisteredWriters[config.Type]
if !found {
return nil, fmt.Errorf("invalid destination type has been passed [%s]", config.Type)
Expand All @@ -99,12 +99,6 @@ func NewWriterPool(ctx context.Context, config *types.WriterConfig, syncStreams,
return nil, fmt.Errorf("failed to test destination: %s", err)
}

if dropStreams != nil {
if err := adapter.DropStreams(ctx, dropStreams); err != nil {
return nil, fmt.Errorf("failed to clear destination: %s", err)
}
}

pool := &WriterPool{
stats: &Stats{
TotalRecordsToSync: atomic.Int64{},
Expand Down Expand Up @@ -279,3 +273,22 @@ func (wt *WriterThread) Close(ctx context.Context) error {
return wt.writer.Close(ctx)
}
}

// ClearDestination drops the given streams from the destination described by
// config.
//
// The writer implementation is looked up in RegisteredWriters by config.Type
// and its configuration is populated from config.WriterConfig. A nil
// dropStreams means there is nothing to drop and the writer is not called;
// a non-nil (even empty) slice is handed to the writer's DropStreams.
//
// It returns an error when the destination type is not registered, when the
// writer configuration cannot be unmarshalled, or when dropping fails.
func ClearDestination(ctx context.Context, config *types.WriterConfig, dropStreams []types.StreamInterface) error {
	factory, registered := RegisteredWriters[config.Type]
	if !registered {
		return fmt.Errorf("invalid destination type has been passed [%s]", config.Type)
	}

	writer := factory()
	err := utils.Unmarshal(config.WriterConfig, writer.GetConfigRef())
	if err != nil {
		return err
	}

	// No drop list was supplied at all: nothing to do.
	if dropStreams == nil {
		return nil
	}

	err = writer.DropStreams(ctx, dropStreams)
	if err != nil {
		return fmt.Errorf("failed to drop the streams: %s", err)
	}
	return nil
}
29 changes: 28 additions & 1 deletion drivers/abstract/abstract.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,34 @@ func (a *AbstractDriver) Setup(ctx context.Context) error {
return a.driver.Setup(ctx)
}

// Read handles different sync modes for data retrieval
// ClearState resets the saved state of the given streams and returns the
// resulting state.
//
// When the driver holds no state, a new empty state is returned. Otherwise the
// driver's own state is modified in place and returned:
//   - every given stream ID is removed from the global stream set, if a global
//     state with a stream set exists;
//   - every per-stream entry whose "<namespace>.<stream>" key equals a given
//     stream ID is marked as holding no value and its state map is replaced
//     with an empty one.
//
// The returned error is always nil.
func (a *AbstractDriver) ClearState(streams []types.StreamInterface) (*types.State, error) {
	if a.state == nil {
		return &types.State{}, nil
	}

	// Set of stream IDs whose state has to be wiped.
	selected := make(map[string]struct{}, len(streams))
	for _, s := range streams {
		selected[s.ID()] = struct{}{}
	}

	// if global state exists (in case of relational sources)
	if global := a.state.Global; global != nil && global.Streams != nil {
		for id := range selected {
			global.Streams.Remove(id)
		}
	}

	for _, entry := range a.state.Streams {
		key := fmt.Sprintf("%s.%s", entry.Namespace, entry.Stream)
		if _, drop := selected[key]; !drop {
			continue
		}
		entry.HoldsValue.Store(false)
		entry.State = sync.Map{}
	}
	return a.state, nil
}

func (a *AbstractDriver) Read(ctx context.Context, pool *destination.WriterPool, backfillStreams, cdcStreams, incrementalStreams []types.StreamInterface) error {
// set max read connections
if a.driver.MaxConnections() > 0 {
Expand Down
2 changes: 1 addition & 1 deletion protocol/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ var checkCmd = &cobra.Command{
err := func() error {
// If connector is not set, we are checking the destination
if destinationConfigPath != "not-set" {
_, err := destination.NewWriterPool(cmd.Context(), destinationConfig, nil, nil, batchSize)
_, err := destination.NewWriterPool(cmd.Context(), destinationConfig, nil, batchSize)
return err
}

Expand Down
77 changes: 77 additions & 0 deletions protocol/clear.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package protocol

import (
"fmt"

"github.com/datazip-inc/olake/destination"
"github.com/datazip-inc/olake/types"
"github.com/datazip-inc/olake/utils"
"github.com/datazip-inc/olake/utils/logger"
"github.com/spf13/cobra"
)

// clearCmd implements the "clear-destination" command. It clears the saved
// state of every stream selected in the streams file and then drops the
// corresponding data from the destination.
//
// PersistentPreRunE loads the destination config, the streams catalog and,
// when a state path is given, the existing state. RunE performs the clearing
// and logs the new state only after the destination has been cleared.
var clearCmd = &cobra.Command{
	Use:   "clear-destination",
	Short: "Olake clear command to clear destination data and state for selected streams",
	PersistentPreRunE: func(_ *cobra.Command, _ []string) error {
		// NOTE(review): the check command treats "not-set" as the "flag not
		// passed" sentinel for the destination path, so it is rejected here as
		// well - confirm against the flag's declared default.
		if destinationConfigPath == "" || destinationConfigPath == "not-set" {
			return fmt.Errorf("--destination not passed")
		}
		if streamsPath == "" {
			return fmt.Errorf("--streams not passed")
		}

		destinationConfig = &types.WriterConfig{}
		if err := utils.UnmarshalFile(destinationConfigPath, destinationConfig, true); err != nil {
			return err
		}

		catalog = &types.Catalog{}
		if err := utils.UnmarshalFile(streamsPath, catalog, false); err != nil {
			return err
		}

		// The state file is optional; without it an empty stream-type state is used.
		state = &types.State{
			Type: types.StreamType,
		}
		if statePath != "" {
			if err := utils.UnmarshalFile(statePath, state, false); err != nil {
				return err
			}
		}
		return nil
	},
	RunE: func(cmd *cobra.Command, _ []string) error {
		selectedStreamsMetadata, err := GetStreamsClassification(catalog, nil, state)
		if err != nil {
			return fmt.Errorf("failed to get selected streams for clearing: %w", err)
		}

		// Collect every selected stream regardless of sync mode. A fresh slice
		// is used so the classification's own slices are never appended into.
		incremental := selectedStreamsMetadata.IncrementalStreams
		fullLoad := selectedStreamsMetadata.FullLoadStreams
		cdc := selectedStreamsMetadata.CDCStreams
		dropStreams := make([]types.StreamInterface, 0, len(incremental)+len(fullLoad)+len(cdc))
		dropStreams = append(dropStreams, incremental...)
		dropStreams = append(dropStreams, fullLoad...)
		dropStreams = append(dropStreams, cdc...)
		if len(dropStreams) == 0 {
			logger.Infof("No streams selected for clearing")
			return nil
		}

		connector.SetupState(state)
		// clear state for selected streams
		newState, err := connector.ClearState(dropStreams)
		if err != nil {
			return fmt.Errorf("error clearing state: %w", err)
		}
		logger.Infof("State for selected streams cleared successfully.")
		// Setup new state after clear for connector
		connector.SetupState(newState)

		// drop/clear streams from destination
		if err := destination.ClearDestination(cmd.Context(), destinationConfig, dropStreams); err != nil {
			return fmt.Errorf("failed to clear destination: %w", err)
		}
		logger.Infof("Successfully cleared destination data for selected streams.")

		// Printing the state is informational only, so a marshalling failure is
		// reported but does not fail the command.
		if stateBytes, marshalErr := newState.MarshalJSON(); marshalErr != nil {
			logger.Warnf("failed to marshal new state for logging: %s", marshalErr)
		} else {
			logger.Infof("New saved state: %s", stateBytes)
		}
		// save new state in state file
		newState.LogState()
		return nil
	},
}
Loading
Loading