Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
94ba786
fix: changes
ImDoubD-datazip Sep 5, 2025
5b998f4
Merge branch 'staging' into chore/clear-dest-ice
ImDoubD-datazip Sep 8, 2025
c3f10c6
fix: staging pulled
ImDoubD-datazip Sep 10, 2025
d37702a
fix: changes
ImDoubD-datazip Sep 10, 2025
391ad62
fix: nothing
ImDoubD-datazip Sep 10, 2025
5822e25
Merge branch 'staging' into chore/clear-dest-ice
ImDoubD-datazip Sep 11, 2025
88d09dd
fix: comments
ImDoubD-datazip Sep 11, 2025
3048a93
Merge branch 'staging' into chore/clear-dest-ice
ImDoubD-datazip Sep 16, 2025
6e434e4
Merge branch 'staging' into chore/clear-dest-ice
hash-data Sep 25, 2025
58c07b3
fix: changes
ImDoubD-datazip Sep 25, 2025
a090f46
fix: staging pull
ImDoubD-datazip Oct 6, 2025
4653be1
fix: changes
ImDoubD-datazip Oct 7, 2025
b4e860c
fix: changes pulled
ImDoubD-datazip Oct 7, 2025
aa3c006
fix: changes
ImDoubD-datazip Oct 7, 2025
4ddf2d4
fix: changes
ImDoubD-datazip Oct 10, 2025
d76d5aa
fix: pull changes
ImDoubD-datazip Oct 10, 2025
678e8b9
fix: changes
ImDoubD-datazip Oct 10, 2025
552e78b
Merge branch 'staging' into chore/clear-dest-ice
ImDoubD-datazip Oct 12, 2025
e42145c
fix: pull changes
ImDoubD-datazip Oct 13, 2025
49a9a83
Merge branch 'staging' into chore/clear-dest-ice
ImDoubD-datazip Oct 14, 2025
eece7d8
Merge branch 'staging' into chore/clear-dest-ice
ImDoubD-datazip Oct 14, 2025
c0d77ba
fix: changes
ImDoubD-datazip Oct 15, 2025
f991826
fix: review changes
ImDoubD-datazip Oct 17, 2025
15b89dc
Merge branch 'staging' into chore/clear-dest-ice
hash-data Oct 17, 2025
c9db930
fix: review changes
ImDoubD-datazip Oct 17, 2025
756b8b5
fix: review changes
ImDoubD-datazip Oct 17, 2025
9090c06
fix: review changes
ImDoubD-datazip Oct 17, 2025
d1bd844
fix: lint changes
ImDoubD-datazip Oct 17, 2025
c2132d4
fix: purge removed, only data to be dropped
ImDoubD-datazip Oct 18, 2025
1701166
fix: changes
ImDoubD-datazip Oct 18, 2025
29dc095
fix: review changes
ImDoubD-datazip Oct 20, 2025
2600385
Merge branch 'staging' into chore/clear-dest-ice
ImDoubD-datazip Oct 20, 2025
85ad4ea
Merge branch 'staging' into chore/clear-dest-ice
ImDoubD-datazip Oct 20, 2025
e733ab7
fix: changes
ImDoubD-datazip Oct 22, 2025
14c6721
fix: s3 client check added in drop streams
ImDoubD-datazip Oct 22, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 37 additions & 11 deletions destination/iceberg/iceberg.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,9 +267,6 @@ func (i *Iceberg) Check(ctx context.Context) error {

// to close client properly
i.server = server
defer func() {
i.Close(ctx)
}()

ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
Expand Down Expand Up @@ -570,17 +567,46 @@ func (i *Iceberg) parsePartitionRegex(pattern string) error {
return nil
}

func (i *Iceberg) DropStreams(_ context.Context, _ []string) error {
logger.Info("iceberg destination not support clear destination, skipping clear operation")
// DropStreams drops the Iceberg table backing each of the given streams so the
// destination is cleared before those streams are re-written.
//
// For every stream a DROP_TABLE request is sent to the Iceberg server with the
// table name "<database>.<table>", built from stream.GetDestinationDatabase
// (called with the configured IcebergDatabase) and stream.GetDestinationTable.
//
// An error whose text contains "not found" is treated as "table already
// absent" and the stream is skipped. Any other error stops processing and is
// returned wrapped, so callers can inspect the cause with errors.Is/errors.As.
// When at least one stream is supplied, the writer is closed before returning.
func (i *Iceberg) DropStreams(ctx context.Context, dropStreams []types.StreamInterface) error {
	if len(dropStreams) == 0 {
		logger.Info("No streams selected for clearing Iceberg destination, skipping operation")
		return nil
	}

	// to close client properly; a close failure must not mask the drop result,
	// so it is reported in the log instead of being silently discarded.
	defer func() {
		if err := i.Close(ctx); err != nil {
			logger.Warnf("failed to close Iceberg writer after clearing destination: %s", err)
		}
	}()

	logger.Infof("Starting Clear Iceberg destination for %d selected streams", len(dropStreams))

	// process each stream
	for _, stream := range dropStreams {
		destDB := stream.GetDestinationDatabase(&i.config.IcebergDatabase)
		destTable := stream.GetDestinationTable()
		dropTable := fmt.Sprintf("%s.%s", destDB, destTable)

		logger.Infof("Dropping Iceberg table: %s", dropTable)

		request := proto.IcebergPayload{
			Type: proto.IcebergPayload_DROP_TABLE,
			Metadata: &proto.IcebergPayload_Metadata{
				DestTableName: dropTable,
				ThreadId:      i.server.serverID,
			},
		}
		if _, err := i.server.sendClientRequest(ctx, &request); err != nil {
			// NOTE(review): matching on the error text is fragile; it relies on
			// the server reporting a missing table with the phrase "not found".
			if strings.Contains(err.Error(), "not found") {
				logger.Infof("Table %s does not exist, skipping drop", dropTable)
				continue
			}
			return fmt.Errorf("failed to drop table %s: %w", dropTable, err)
		}
	}

	logger.Info("Successfully cleared Iceberg destination for selected streams")
	return nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,4 +265,20 @@ public static int partitionId() {
return Integer.parseInt(dtFormater.format(Instant.now()));
}

/**
 * Drops the Iceberg table {@code namespace.tableName} from the given catalog.
 *
 * <p>The table is dropped with {@code purge = true}. The value returned is the
 * catalog's own answer from {@code dropTable}, so a table that disappears
 * between the existence check and the drop is reported as "not dropped"
 * rather than as a success.
 *
 * @param namespace      namespace (database) that contains the table
 * @param tableName      name of the table inside {@code namespace}
 * @param icebergCatalog catalog used to look up and drop the table
 * @return {@code true} if the catalog dropped the table, {@code false} if the
 *         table did not exist
 * @throws RuntimeException wrapping any exception raised while checking for
 *                          or dropping the table
 */
public static boolean dropIcebergTable(String namespace, String tableName, Catalog icebergCatalog) {
    try {
        TableIdentifier tableID = TableIdentifier.of(namespace, tableName);
        // Check first so a missing table is reported without attempting a purge drop.
        if (!icebergCatalog.tableExists(tableID)) {
            LOGGER.warn("Table not found: {}", tableID);
            return false;
        }
        // Honor the catalog's result instead of assuming the drop succeeded.
        boolean dropped = icebergCatalog.dropTable(tableID, true);
        if (!dropped) {
            LOGGER.warn("Table not found: {}", tableID);
        }
        return dropped;
    } catch (Exception e) {
        LOGGER.error("Failed to drop table {}.{}: {}", namespace, tableName, e.getMessage());
        throw new RuntimeException("Failed to drop table: " + namespace + "." + tableName, e);
    }
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,24 @@ public void sendRecords(IcebergPayload request, StreamObserver<RecordIngest.Reco
break;

case DROP_TABLE:
LOGGER.warn("{} Table {} not dropped, drop table not implemented", requestId, destTableName);
sendResponse(responseObserver, "Drop table not implemented");
String dropTable = metadata.getDestTableName();
String[] parts = dropTable.split("\\.", 2);
if (parts.length != 2) {
throw new IllegalArgumentException("Invalid destination table name: " + dropTable);
}
String namespace = parts[0], tableName = parts[1];

LOGGER.warn("{} Dropping table {}.{}", requestId, namespace, tableName);

boolean dropped = IcebergUtil.dropIcebergTable(namespace, tableName, icebergCatalog);
if (dropped) {
sendResponse(responseObserver, "Successfully dropped table " + tableName);
LOGGER.info("{} Table {} dropped", requestId, tableName);
} else {
sendResponse(responseObserver, "Table " + tableName + " does not exist");
LOGGER.warn("{} Table {} not dropped, table does not exist", requestId, tableName);
}
break;

default:
throw new IllegalArgumentException("Unknown payload type: " + request.getType());
}
Expand Down
2 changes: 1 addition & 1 deletion destination/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,6 @@ type Writer interface {
// Need to pass olakeTimestamp as end argument to get the correct partition path based on record ingestion time.
EvolveSchema(ctx context.Context, globalSchema, recordsSchema any) (any, error)
// DropStreams is used to clear the destination before re-writing the stream
DropStreams(ctx context.Context, selectedStream []string) error
DropStreams(ctx context.Context, dropStreams []types.StreamInterface) error
Close(ctx context.Context) error
}
31 changes: 11 additions & 20 deletions destination/parquet/parquet.go
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,7 @@ func (p *Parquet) getPartitionedFilePath(values map[string]any, olakeTimestamp t
return filepath.Join(p.basePath, strings.TrimSuffix(result, "/"))
}

func (p *Parquet) DropStreams(ctx context.Context, selectedStreams []string) error {
func (p *Parquet) DropStreams(ctx context.Context, selectedStreams []types.StreamInterface) error {
if len(selectedStreams) == 0 {
logger.Infof("Thread[%s]: no streams selected for clearing, skipping clear operation", p.options.ThreadID)
return nil
Expand All @@ -443,16 +443,11 @@ func (p *Parquet) DropStreams(ctx context.Context, selectedStreams []string) err
return nil
}

func (p *Parquet) clearLocalFiles(selectedStreams []string) error {
for _, streamID := range selectedStreams {
parts := strings.SplitN(streamID, ".", 2)
if len(parts) != 2 {
logger.Warnf("Thread[%s]: invalid stream ID format: %s, skipping", p.options.ThreadID, streamID)
continue
}

namespace, streamName := parts[0], parts[1]
streamPath := filepath.Join(p.config.Path, namespace, streamName)
func (p *Parquet) clearLocalFiles(selectedStreams []types.StreamInterface) error {
for _, stream := range selectedStreams {
namespace := stream.GetDestinationDatabase(nil)
tableName := stream.GetDestinationTable()
streamPath := filepath.Join(p.config.Path, namespace, tableName)

logger.Infof("Thread[%s]: clearing local path: %s", p.options.ThreadID, streamPath)

Expand All @@ -471,7 +466,7 @@ func (p *Parquet) clearLocalFiles(selectedStreams []string) error {
return nil
}

func (p *Parquet) clearS3Files(ctx context.Context, selectedStreams []string) error {
func (p *Parquet) clearS3Files(ctx context.Context, selectedStreams []types.StreamInterface) error {
deleteS3PrefixStandard := func(filtPath string) error {
iter := s3manager.NewDeleteListIterator(p.s3Client, &s3.ListObjectsInput{
Bucket: aws.String(p.config.Bucket),
Expand All @@ -484,15 +479,11 @@ func (p *Parquet) clearS3Files(ctx context.Context, selectedStreams []string) er
return nil
}

for _, streamID := range selectedStreams {
parts := strings.SplitN(streamID, ".", 2)
if len(parts) != 2 {
logger.Warnf("Thread[%s]: invalid stream ID format: %s, skipping", p.options.ThreadID, streamID)
continue
}
for _, stream := range selectedStreams {
namespace := stream.GetDestinationDatabase(nil)
tableName := stream.GetDestinationTable()
s3TablePath := filepath.Join(p.config.Prefix, namespace, tableName, "/")

namespace, streamName := parts[0], parts[1]
s3TablePath := filepath.Join(p.config.Prefix, namespace, streamName, "/")
logger.Debugf("Thread[%s]: clearing S3 prefix: s3://%s/%s", p.options.ThreadID, p.config.Bucket, s3TablePath)
if err := deleteS3PrefixStandard(s3TablePath); err != nil {
return fmt.Errorf("failed to clear S3 prefix %s: %s", s3TablePath, err)
Expand Down
2 changes: 1 addition & 1 deletion destination/writers.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func WithThreadID(threadID string) ThreadOptions {
}
}

func NewWriterPool(ctx context.Context, config *types.WriterConfig, syncStreams, dropStreams []string) (*WriterPool, error) {
func NewWriterPool(ctx context.Context, config *types.WriterConfig, syncStreams []string, dropStreams []types.StreamInterface) (*WriterPool, error) {
newfunc, found := RegisteredWriters[config.Type]
if !found {
return nil, fmt.Errorf("invalid destination type has been passed [%s]", config.Type)
Expand Down
11 changes: 7 additions & 4 deletions protocol/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ var syncCmd = &cobra.Command{
incrementalStreams := []types.StreamInterface{}
standardModeStreams := []types.StreamInterface{}
newStreamsState := []*types.StreamState{}
fullLoadStreams := []string{}
dropStreams := []types.StreamInterface{}

var stateStreamMap = make(map[string]*types.StreamState)
for _, stream := range state.Streams {
Expand Down Expand Up @@ -131,7 +131,6 @@ var syncCmd = &cobra.Command{
newStreamsState = append(newStreamsState, streamState)
}
default:
fullLoadStreams = append(fullLoadStreams, elem.ID())
standardModeStreams = append(standardModeStreams, elem)
}

Expand All @@ -144,8 +143,12 @@ var syncCmd = &cobra.Command{

logger.Infof("Valid selected streams are %s", strings.Join(selectedStreams, ", "))

fullLoadStreams = utils.Ternary(clearDestinationFlag, selectedStreams, fullLoadStreams).([]string)
pool, err := destination.NewWriterPool(cmd.Context(), destinationConfig, selectedStreams, fullLoadStreams)
dropStreams = append(dropStreams, standardModeStreams...)
if clearDestinationFlag {
dropStreams = append(dropStreams, cdcStreams...)
dropStreams = append(dropStreams, incrementalStreams...)
}
pool, err := destination.NewWriterPool(cmd.Context(), destinationConfig, selectedStreams, dropStreams)
if err != nil {
return err
}
Expand Down
Loading