diff --git a/constants/constants.go b/constants/constants.go index dba843303..0eb338ab8 100644 --- a/constants/constants.go +++ b/constants/constants.go @@ -25,6 +25,7 @@ const ( ConfigFolder = "CONFIG_FOLDER" StatePath = "STATE_PATH" StreamsPath = "STREAMS_PATH" + DifferencePath = "DIFFERENCE_STREAMS_PATH" // DestinationDatabasePrefix is used as prefix for destination database name DestinationDatabasePrefix = "DESTINATION_DATABASE_PREFIX" // EffectiveParquetSize is the effective size in bytes considering 256mb targeted parquet size, compression ratio as 8 diff --git a/destination/iceberg/iceberg.go b/destination/iceberg/iceberg.go index 59a121af6..8038bebab 100644 --- a/destination/iceberg/iceberg.go +++ b/destination/iceberg/iceberg.go @@ -613,17 +613,52 @@ func (i *Iceberg) parsePartitionRegex(pattern string) error { return nil } -func (i *Iceberg) DropStreams(_ context.Context, _ []string) error { - logger.Info("iceberg destination not support clear destination, skipping clear operation") +// drop streams required for clear destination +func (i *Iceberg) DropStreams(ctx context.Context, dropStreams []types.StreamInterface) error { + i.options = &destination.Options{ + ThreadID: "iceberg_destination_drop", + } + if len(dropStreams) == 0 { + logger.Info("No streams selected for clearing Iceberg destination, skipping operation") + return nil + } - // logger.Infof("Clearing Iceberg destination for %d selected streams: %v", len(selectedStreams), selectedStreams) + // server setup for dropping tables + server, err := newIcebergClient(i.config, []PartitionInfo{}, i.options.ThreadID, false, false, "") + if err != nil { + return fmt.Errorf("failed to setup iceberg server for dropping streams: %s", err) + } - // TODO: Implement Iceberg table clearing logic - // 1. Connect to the Iceberg catalog - // 2. Use Iceberg's delete API or drop/recreate the table - // 3. Handle any Iceberg-specific cleanup + // to close client properly + i.server = server + defer func() { + i.Close(ctx) + }() + + logger.Infof("Starting Clear Iceberg destination for %d selected streams", len(dropStreams)) + + // process each stream + for _, stream := range dropStreams { + destDB := stream.GetDestinationDatabase(&i.config.IcebergDatabase) + destTable := stream.GetDestinationTable() + dropTable := fmt.Sprintf("%s.%s", destDB, destTable) + + logger.Infof("Dropping Iceberg table: %s", dropTable) + + request := proto.IcebergPayload{ + Type: proto.IcebergPayload_DROP_TABLE, + Metadata: &proto.IcebergPayload_Metadata{ + DestTableName: dropTable, + ThreadId: i.server.serverID, + }, + } + _, err := i.server.sendClientRequest(ctx, &request) + if err != nil { + return fmt.Errorf("failed to drop table %s: %s", dropTable, err) + } + } - // logger.Info("Successfully cleared Iceberg destination for selected streams") + logger.Info("Successfully cleared Iceberg destination for selected streams") return nil } diff --git a/destination/iceberg/olake-iceberg-java-writer/src/main/java/io/debezium/server/iceberg/IcebergUtil.java b/destination/iceberg/olake-iceberg-java-writer/src/main/java/io/debezium/server/iceberg/IcebergUtil.java index 017d237f6..1509d2f22 100644 --- a/destination/iceberg/olake-iceberg-java-writer/src/main/java/io/debezium/server/iceberg/IcebergUtil.java +++ b/destination/iceberg/olake-iceberg-java-writer/src/main/java/io/debezium/server/iceberg/IcebergUtil.java @@ -276,4 +276,19 @@ public static int partitionId() { return Integer.parseInt(dtFormater.format(Instant.now())); } + public static boolean dropIcebergTable(String namespace, String tableName, Catalog icebergCatalog) { + try{ + TableIdentifier tableID = TableIdentifier.of(namespace, tableName); + // Check if table exists + if (!icebergCatalog.tableExists(tableID)) { + LOGGER.warn("Table not found: {}", tableID.toString()); + return false; + } + return icebergCatalog.dropTable(tableID, false); + } catch(Exception e){ + LOGGER.error("Failed to drop table {}.{}: {}", namespace, tableName, e.getMessage()); + throw new RuntimeException("Failed to drop table: " + namespace + "." + tableName, e); + } + } + } diff --git a/destination/iceberg/olake-iceberg-java-writer/src/main/java/io/debezium/server/iceberg/rpc/OlakeRowsIngester.java b/destination/iceberg/olake-iceberg-java-writer/src/main/java/io/debezium/server/iceberg/rpc/OlakeRowsIngester.java index 844037d0e..cf50411bc 100644 --- a/destination/iceberg/olake-iceberg-java-writer/src/main/java/io/debezium/server/iceberg/rpc/OlakeRowsIngester.java +++ b/destination/iceberg/olake-iceberg-java-writer/src/main/java/io/debezium/server/iceberg/rpc/OlakeRowsIngester.java @@ -62,7 +62,7 @@ public void sendRecords(IcebergPayload request, StreamObserver 0 { + for _, streamState := range a.state.Streams { + if dropStreams[fmt.Sprintf("%s.%s", streamState.Namespace, streamState.Stream)] { + streamState.HoldsValue.Store(false) + streamState.State = sync.Map{} + } + } + } + return a.state, nil +} + func (a *AbstractDriver) Read(ctx context.Context, pool *destination.WriterPool, backfillStreams, cdcStreams, incrementalStreams []types.StreamInterface) error { // set max read connections if a.driver.MaxConnections() > 0 { diff --git a/protocol/check.go b/protocol/check.go index 09a3c6e0b..955b419fc 100644 --- a/protocol/check.go +++ b/protocol/check.go @@ -37,7 +37,7 @@ var checkCmd = &cobra.Command{ err := func() error { // If connector is not set, we are checking the destination if destinationConfigPath != "not-set" { - _, err := destination.NewWriterPool(cmd.Context(), destinationConfig, nil, nil, batchSize) + _, err := destination.NewWriterPool(cmd.Context(), destinationConfig, nil, batchSize) return err } diff --git a/protocol/clear.go b/protocol/clear.go new file mode 100644 index 000000000..9f4113cd3 --- /dev/null +++ b/protocol/clear.go @@ -0,0 +1,77 @@ +package protocol + +import ( + "fmt" + + "github.com/datazip-inc/olake/destination" + "github.com/datazip-inc/olake/types" + "github.com/datazip-inc/olake/utils" + "github.com/datazip-inc/olake/utils/logger" + "github.com/spf13/cobra" +) + +var clearCmd = &cobra.Command{ + Use: "clear-destination", + Short: "Olake clear command to clear destination data and state for selected streams", + PersistentPreRunE: func(_ *cobra.Command, _ []string) error { + if destinationConfigPath == "" { + return fmt.Errorf("--destination not passed") + } else if streamsPath == "" { + return fmt.Errorf("--streams not passed") + } + + destinationConfig = &types.WriterConfig{} + if err := utils.UnmarshalFile(destinationConfigPath, destinationConfig, true); err != nil { + return err + } + + catalog = &types.Catalog{} + if err := utils.UnmarshalFile(streamsPath, catalog, false); err != nil { + return err + } + + state = &types.State{ + Type: types.StreamType, + } + if statePath != "" { + if err := utils.UnmarshalFile(statePath, state, false); err != nil { + return err + } + } + return nil + }, + RunE: func(cmd *cobra.Command, _ []string) error { + selectedStreamsMetadata, err := classifyStreams(catalog, nil, state) + if err != nil { + return fmt.Errorf("failed to get selected streams for clearing: %w", err) + } + dropStreams := []types.StreamInterface{} + dropStreams = append(dropStreams, append(append(selectedStreamsMetadata.IncrementalStreams, selectedStreamsMetadata.FullLoadStreams...), selectedStreamsMetadata.CDCStreams...)...) + if len(dropStreams) == 0 { + logger.Infof("No streams selected for clearing") + return nil + } + + connector.SetupState(state) + // clear state for selected streams + newState, err := connector.ClearState(dropStreams) + if err != nil { + return fmt.Errorf("error clearing state: %w", err) + } + logger.Infof("State for selected streams cleared successfully.") + // Setup new state after clear for connector + connector.SetupState(newState) + + // drop/clear streams from destination + cerr := destination.ClearDestination(cmd.Context(), destinationConfig, dropStreams) + if cerr != nil { + return fmt.Errorf("failed to clear destination: %w", err) + } + logger.Infof("Successfully cleared destination data for selected streams.") + // save new state in state file + newState.LogState() + stateBytes, _ := newState.MarshalJSON() + logger.Infof("New saved state: %s", stateBytes) + return nil + }, +} diff --git a/protocol/discover.go b/protocol/discover.go index aa41b00c8..ed8cb08e4 100644 --- a/protocol/discover.go +++ b/protocol/discover.go @@ -19,6 +19,9 @@ var discoverCmd = &cobra.Command{ Use: "discover", Short: "discover command", PreRunE: func(_ *cobra.Command, _ []string) error { + if streamsPath != "" && differencePath != "" { + return nil + } if configPath == "" { return fmt.Errorf("--config not passed") } @@ -36,6 +39,10 @@ var discoverCmd = &cobra.Command{ return nil }, RunE: func(cmd *cobra.Command, _ []string) error { + if streamsPath != "" && differencePath != "" { + return compareStreams() + } + err := connector.Setup(cmd.Context()) if err != nil { return err @@ -65,3 +72,22 @@ var discoverCmd = &cobra.Command{ return nil }, } + +// compareStreams reads two streams.json files, computes the difference, and writes the result to difference_streams.json +func compareStreams() error { + var oldStreams, newStreams types.Catalog + if serr := utils.UnmarshalFile(streamsPath, &oldStreams, false); serr != nil { + return fmt.Errorf("failed to read old catalog: %s", serr) + } + + if derr := utils.UnmarshalFile(differencePath, &newStreams, false); derr != nil { + return fmt.Errorf("failed to read new catalog: %s", derr) + } + + diffCatalog := types.GetStreamsDelta(&oldStreams, &newStreams, connector.Type()) + if err := logger.FileLoggerWithPath(diffCatalog, viper.GetString(constants.DifferencePath)); err != nil { + return fmt.Errorf("failed to write difference streams: %s", err) + } + logger.Infof("Successfully wrote stream differences") + return nil +} diff --git a/protocol/root.go b/protocol/root.go index 6d3a86da6..af4b8e4bf 100644 --- a/protocol/root.go +++ b/protocol/root.go @@ -24,13 +24,13 @@ var ( syncID string batchSize int64 noSave bool - clearDestinationFlag bool encryptionKey string destinationType string catalog *types.Catalog state *types.State timeout int64 // timeout in seconds destinationConfig *types.WriterConfig + differencePath string commands = []*cobra.Command{} connector *abstract.AbstractDriver @@ -47,13 +47,16 @@ var RootCmd = &cobra.Command{ viper.SetDefault(constants.ConfigFolder, os.TempDir()) viper.SetDefault(constants.StatePath, filepath.Join(os.TempDir(), "state.json")) viper.SetDefault(constants.StreamsPath, filepath.Join(os.TempDir(), "streams.json")) + viper.SetDefault(constants.DifferencePath, filepath.Join(os.TempDir(), "difference_streams.json")) if !noSave { configFolder := utils.Ternary(configPath == "not-set", filepath.Dir(destinationConfigPath), filepath.Dir(configPath)).(string) streamsPathEnv := utils.Ternary(streamsPath == "", filepath.Join(configFolder, "streams.json"), streamsPath).(string) + differencePathEnv := utils.Ternary(streamsPath != "", filepath.Join(filepath.Dir(streamsPath), "difference_streams.json"), filepath.Join(configFolder, "difference_streams.json")).(string) statePathEnv := utils.Ternary(statePath == "", filepath.Join(configFolder, "state.json"), statePath).(string) viper.Set(constants.ConfigFolder, configFolder) viper.Set(constants.StatePath, statePathEnv) viper.Set(constants.StreamsPath, streamsPathEnv) + viper.Set(constants.DifferencePath, differencePathEnv) } if encryptionKey != "" { @@ -85,7 +88,7 @@ func CreateRootCommand(_ bool, driver any) *cobra.Command { func init() { // TODO: replace --catalog flag with --streams - commands = append(commands, specCmd, checkCmd, discoverCmd, syncCmd) + commands = append(commands, specCmd, checkCmd, discoverCmd, syncCmd, clearCmd) RootCmd.PersistentFlags().StringVarP(&configPath, "config", "", "not-set", "(Required) Config for connector") RootCmd.PersistentFlags().StringVarP(&destinationConfigPath, "destination", "", "not-set", "(Required) Destination config for connector") RootCmd.PersistentFlags().StringVarP(&destinationType, "destination-type", "", "not-set", "Destination type for spec") @@ -94,10 +97,10 @@ func init() { RootCmd.PersistentFlags().StringVarP(&statePath, "state", "", "", "(Required) State for connector") RootCmd.PersistentFlags().Int64VarP(&batchSize, "destination-buffer-size", "", 10000, "(Optional) Batch size for destination") RootCmd.PersistentFlags().BoolVarP(&noSave, "no-save", "", false, "(Optional) Flag to skip logging artifacts in file") - RootCmd.PersistentFlags().BoolVarP(&clearDestinationFlag, "clear-destination", "", false, "(Optional) Flag to clear destination and reset sync state for selected streams to force full refresh. Note: Destination is automatically cleared for full refresh streams regardless of this flag.") RootCmd.PersistentFlags().StringVarP(&encryptionKey, "encryption-key", "", "", "(Optional) Decryption key. Provide the ARN of a KMS key, a UUID, or a custom string based on your encryption configuration.") RootCmd.PersistentFlags().StringVarP(&destinationDatabasePrefix, "destination-database-prefix", "", "", "(Optional) Destination database prefix is used as prefix for destination database name") RootCmd.PersistentFlags().Int64VarP(&timeout, "timeout", "", -1, "(Optional) Timeout to override default timeouts (in seconds)") + RootCmd.PersistentFlags().StringVarP(&differencePath, "difference", "", "", "new streams.json file path to be compared. Generates a difference_streams.json file.") // Disable Cobra CLI's built-in usage and error handling RootCmd.SilenceUsage = true RootCmd.SilenceErrors = true diff --git a/protocol/sync.go b/protocol/sync.go index 4ec7976b0..fa1798d3c 100644 --- a/protocol/sync.go +++ b/protocol/sync.go @@ -16,6 +16,15 @@ import ( "github.com/spf13/viper" ) +// various stream formats +type StreamClassification struct { + SelectedStreams []string + CDCStreams []types.StreamInterface + IncrementalStreams []types.StreamInterface + FullLoadStreams []types.StreamInterface + NewStreamsState []*types.StreamState +} + // syncCmd represents the read command var syncCmd = &cobra.Command{ Use: "sync", @@ -56,7 +65,7 @@ var syncCmd = &cobra.Command{ state = &types.State{ Type: types.StreamType, } - if statePath != "" && !clearDestinationFlag { + if statePath != "" { if err := utils.UnmarshalFile(statePath, state, false); err != nil { return err } @@ -79,80 +88,29 @@ var syncCmd = &cobra.Command{ return err } - streamsMap := types.StreamsToMap(streams...) - - // create a map for namespace and streamMetadata - selectedStreamsMap := make(map[string]types.StreamMetadata) - for namespace, streamsMetadata := range catalog.SelectedStreams { - for _, streamMetadata := range streamsMetadata { - selectedStreamsMap[fmt.Sprintf("%s.%s", namespace, streamMetadata.StreamName)] = streamMetadata - } - } - - // Validating Streams and attaching State - selectedStreams := []string{} - cdcStreams := []types.StreamInterface{} - incrementalStreams := []types.StreamInterface{} - standardModeStreams := []types.StreamInterface{} - newStreamsState := []*types.StreamState{} - fullLoadStreams := []string{} - - var stateStreamMap = make(map[string]*types.StreamState) - for _, stream := range state.Streams { - stateStreamMap[fmt.Sprintf("%s.%s", stream.Namespace, stream.Stream)] = stream + // get all types of selected streams + selectedStreamsMetadata, err := classifyStreams(catalog, streams, state) + if err != nil { + return fmt.Errorf("failed to get selected streams for clearing: %s", err) } - _, _ = utils.ArrayContains(catalog.Streams, func(elem *types.ConfiguredStream) bool { - sMetadata, selected := selectedStreamsMap[fmt.Sprintf("%s.%s", elem.Namespace(), elem.Name())] - // Check if the stream is in the selectedStreamMap - if !(catalog.SelectedStreams == nil || selected) { - logger.Debugf("Skipping stream %s.%s; not in selected streams.", elem.Namespace(), elem.Name()) - return false - } - - source, found := streamsMap[elem.ID()] - if !found { - logger.Warnf("Skipping; Configured Stream %s not found in source", elem.ID()) - return false - } - elem.StreamMetadata = sMetadata - - err := elem.Validate(source) - if err != nil { - logger.Warnf("Skipping; Configured Stream %s found invalid due to reason: %s", elem.ID(), err) - return false + // for clearing streams + dropStreams := []types.StreamInterface{} + dropStreams = append(dropStreams, selectedStreamsMetadata.FullLoadStreams...) + if len(dropStreams) > 0 { + logger.Infof("Clearing state for full refresh streams") + // get the state for modification in clearstate + connector.SetupState(state) + if state, err = connector.ClearState(dropStreams); err != nil { + return fmt.Errorf("error clearing state for full refresh streams: %s", err) } - - selectedStreams = append(selectedStreams, elem.ID()) - switch elem.Stream.SyncMode { - case types.CDC, types.STRICTCDC: - cdcStreams = append(cdcStreams, elem) - streamState, exists := stateStreamMap[fmt.Sprintf("%s.%s", elem.Namespace(), elem.Name())] - if exists { - newStreamsState = append(newStreamsState, streamState) - } - case types.INCREMENTAL: - incrementalStreams = append(incrementalStreams, elem) - streamState, exists := stateStreamMap[fmt.Sprintf("%s.%s", elem.Namespace(), elem.Name())] - if exists { - newStreamsState = append(newStreamsState, streamState) - } - default: - fullLoadStreams = append(fullLoadStreams, elem.ID()) - standardModeStreams = append(standardModeStreams, elem) + cerr := destination.ClearDestination(cmd.Context(), destinationConfig, dropStreams) + if cerr != nil { + return fmt.Errorf("failed to clear destination: %w", err) } - - return false - }) - state.Streams = newStreamsState - if len(selectedStreams) == 0 { - return fmt.Errorf("no valid streams found in catalog") } - logger.Infof("Valid selected streams are %s", strings.Join(selectedStreams, ", ")) - - fullLoadStreams = utils.Ternary(clearDestinationFlag, selectedStreams, fullLoadStreams).([]string) - pool, err := destination.NewWriterPool(cmd.Context(), destinationConfig, selectedStreams, fullLoadStreams, batchSize) + pool, err := destination.NewWriterPool(cmd.Context(), destinationConfig, selectedStreamsMetadata.SelectedStreams, batchSize) if err != nil { return err } @@ -166,7 +124,7 @@ var syncCmd = &cobra.Command{ // Setup State for Connector connector.SetupState(state) // Sync Telemetry tracking - telemetry.TrackSyncStarted(syncID, streams, selectedStreams, cdcStreams, connector.Type(), destinationConfig, catalog) + telemetry.TrackSyncStarted(syncID, streams, selectedStreamsMetadata.SelectedStreams, selectedStreamsMetadata.CDCStreams, connector.Type(), destinationConfig, catalog) defer func() { telemetry.TrackSyncCompleted(err == nil, pool.GetStats().ReadCount.Load()) logger.Infof("Sync completed, wait 5 seconds cleanup in progress...") @@ -174,7 +132,7 @@ var syncCmd = &cobra.Command{ }() // init group - err = connector.Read(cmd.Context(), pool, standardModeStreams, cdcStreams, incrementalStreams) + err = connector.Read(cmd.Context(), pool, selectedStreamsMetadata.FullLoadStreams, selectedStreamsMetadata.CDCStreams, selectedStreamsMetadata.IncrementalStreams) if err != nil { return fmt.Errorf("error occurred while reading records: %s", err) } @@ -183,3 +141,81 @@ var syncCmd = &cobra.Command{ return nil }, } + +func classifyStreams(catalog *types.Catalog, streams []*types.Stream, state *types.State) (*StreamClassification, error) { + // stream-specific classifications + classifications := &StreamClassification{ + SelectedStreams: []string{}, + CDCStreams: []types.StreamInterface{}, + IncrementalStreams: []types.StreamInterface{}, + FullLoadStreams: []types.StreamInterface{}, + NewStreamsState: []*types.StreamState{}, + } + // create a map for namespace and streamMetadata + selectedStreamsMap := make(map[string]types.StreamMetadata) + for namespace, streamsMetadata := range catalog.SelectedStreams { + for _, streamMetadata := range streamsMetadata { + selectedStreamsMap[fmt.Sprintf("%s.%s", namespace, streamMetadata.StreamName)] = streamMetadata + } + } + + // Create a map for quick state lookup by stream ID + stateStreamMap := make(map[string]*types.StreamState) + for _, stream := range state.Streams { + stateStreamMap[fmt.Sprintf("%s.%s", stream.Namespace, stream.Stream)] = stream + } + + _, _ = utils.ArrayContains(catalog.Streams, func(elem *types.ConfiguredStream) bool { + sMetadata, selected := selectedStreamsMap[elem.ID()] + // Check if the stream is in the selectedStreamMap + if !(catalog.SelectedStreams == nil || selected) { + logger.Debugf("Skipping stream %s.%s; not in selected streams.", elem.Namespace(), elem.Name()) + return false + } + + if streams != nil { + source, found := types.StreamsToMap(streams...)[elem.ID()] + if !found { + logger.Warnf("Skipping; Configured Stream %s not found in source", elem.ID()) + return false + } + elem.StreamMetadata = sMetadata + err := elem.Validate(source) + if err != nil { + logger.Warnf("Skipping; Configured Stream %s found invalid due to reason: %s", elem.ID(), err) + return false + } + } + + classifications.SelectedStreams = append(classifications.SelectedStreams, elem.ID()) + switch elem.Stream.SyncMode { + case types.CDC, types.STRICTCDC: + classifications.CDCStreams = append(classifications.CDCStreams, elem) + streamState, exists := stateStreamMap[elem.ID()] + if exists { + classifications.NewStreamsState = append(classifications.NewStreamsState, streamState) + } + case types.INCREMENTAL: + classifications.IncrementalStreams = append(classifications.IncrementalStreams, elem) + streamState, exists := stateStreamMap[elem.ID()] + if exists { + classifications.NewStreamsState = append(classifications.NewStreamsState, streamState) + } + default: + classifications.FullLoadStreams = append(classifications.FullLoadStreams, elem) + } + + return false + }) + // Clear previous state streams for non-selected streams. + // Must not be called during clear destination to retain the global and stream state. (clear dest. -> when streams == nil) + if streams != nil { + state.Streams = classifications.NewStreamsState + } + if len(classifications.SelectedStreams) == 0 { + return nil, fmt.Errorf("no valid streams found in catalog") + } + + logger.Infof("Valid selected streams are %s", strings.Join(classifications.SelectedStreams, ", ")) + return classifications, nil +} diff --git a/types/catalog.go b/types/catalog.go index cb2cc52af..9e4c53c9e 100644 --- a/types/catalog.go +++ b/types/catalog.go @@ -170,3 +170,103 @@ func getDestDBPrefix(streams []*ConfiguredStream) (constantValue bool, prefix st return len(prefixOrConstValue) == 1, prefixOrConstValue[0] } + +// GetStreamsDelta compares two catalogs and returns a new catalog with streams that have differences. +// Only selected streams are compared. +// 1. Compares properties from selected_streams: normalization, partition_regex, filter, append_mode +// 2. Compares properties from streams: destination_database, cursor_field, sync_mode +// 3. For new streams: Only adds them if connector is Postgres/MySQL AND sync_mode is CDC +// +// Parameters: +// - oldStreams: The previous catalog to compare against +// - newStreams: The current catalog with potential changes +// +// Returns: +// - A catalog containing only the streams that have differences +func GetStreamsDelta(oldStreams, newStreams *Catalog, connectorType string) *Catalog { + diffStreams := &Catalog{ + Streams: []*ConfiguredStream{}, + SelectedStreams: make(map[string][]StreamMetadata), + } + + oldStreamsMap := make(map[string]*ConfiguredStream) + for _, stream := range oldStreams.Streams { + oldStreamsMap[stream.ID()] = stream + } + + newStreamsMap := make(map[string]*ConfiguredStream) + for _, stream := range newStreams.Streams { + newStreamsMap[stream.ID()] = stream + } + + oldSelectedMap := make(map[string]StreamMetadata) + for namespace, metadatas := range oldStreams.SelectedStreams { + for _, metadata := range metadatas { + oldSelectedMap[fmt.Sprintf("%s.%s", namespace, metadata.StreamName)] = metadata + } + } + + // flag for connector which have global state support + // TODO: create an array of global state supported connectors in constants + globalStateSupportedConnector := connectorType == string(constants.Postgres) || connectorType == string(constants.MySQL) + + for namespace, newMetadatas := range newStreams.SelectedStreams { + for _, newMetadata := range newMetadatas { + streamID := fmt.Sprintf("%s.%s", namespace, newMetadata.StreamName) + + // new stream definition from streams array + newStream, newStreamExists := newStreamsMap[streamID] + if !newStreamExists { + continue + } + + // Check if this stream existed in old catalog + oldMetadata, oldMetadataExists := oldSelectedMap[streamID] + oldStream, oldStreamExists := oldStreamsMap[streamID] + + // if new stream in selected_streams + if !oldMetadataExists || !oldStreamExists { + // addition of new streams + if globalStateSupportedConnector && newStream.GetStream().SyncMode == CDC { + diffStreams.Streams = append(diffStreams.Streams, newStream) + diffStreams.SelectedStreams[namespace] = append( + diffStreams.SelectedStreams[namespace], + newMetadata, + ) + } + // skip new selected streams for mongo and sync mode != cdc + continue + } + + // Stream exists in both catalogs - check for differences + // normalization difference + // partition regex difference + // filter difference + // append mode change + // destination database change + // cursor field change , Format: "primary_cursor:secondary_cursor" + // sync mode change + // TODO: log the differences for user reference + isDifferent := func() bool { + return (oldMetadata.Normalization != newMetadata.Normalization) || + (oldMetadata.PartitionRegex != newMetadata.PartitionRegex) || + (oldMetadata.Filter != newMetadata.Filter) || + (oldMetadata.AppendMode != newMetadata.AppendMode) || + (oldStream.Stream.DestinationDatabase != newStream.Stream.DestinationDatabase) || + (oldStream.Stream.CursorField != newStream.Stream.CursorField) || + (oldStream.Stream.SyncMode != newStream.Stream.SyncMode) + }() + + // if any difference, add stream to diff streams + if isDifferent { + diffStreams.Streams = append(diffStreams.Streams, newStream) + diffStreams.SelectedStreams[namespace] = append( + diffStreams.SelectedStreams[namespace], + newMetadata, + ) + } + } + } + + return diffStreams +}