19 changes: 9 additions & 10 deletions drivers/postgres/internal/backfill.go
@ImDoubD-datazip (Collaborator) commented on Jun 23, 2025:
Use the QueryContext function in the ChunkIterator function in postgres backfill.go.
@abhinav-1305
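A minimal sketch of that suggestion, assuming the chunk query runs through database/sql; the function name, arguments, and record handling below are hypothetical placeholders for illustration, not the driver's actual ChunkIterator code:

```go
package example

import (
	"context"
	"database/sql"
	"fmt"
)

// readChunk is a hypothetical stand-in for the chunk read inside ChunkIterator.
// The key change is QueryContext instead of Query, so the caller's ctx can
// cancel an in-flight chunk read.
func readChunk(ctx context.Context, db *sql.DB, chunkQuery string, onRow func(values []any) error) error {
	rows, err := db.QueryContext(ctx, chunkQuery) // was: db.Query(chunkQuery)
	if err != nil {
		return fmt.Errorf("failed to run chunk query: %s", err)
	}
	defer rows.Close()

	cols, err := rows.Columns()
	if err != nil {
		return fmt.Errorf("failed to read columns: %s", err)
	}
	for rows.Next() {
		values := make([]any, len(cols))
		ptrs := make([]any, len(cols))
		for i := range values {
			ptrs[i] = &values[i]
		}
		if err := rows.Scan(ptrs...); err != nil {
			return fmt.Errorf("failed to scan row: %s", err)
		}
		if err := onRow(values); err != nil {
			return err
		}
	}
	return rows.Err()
}
```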

@@ -45,23 +45,22 @@ func (p *Postgres) ChunkIterator(ctx context.Context, stream types.StreamInterface
})
}

- func (p *Postgres) GetOrSplitChunks(_ context.Context, pool *destination.WriterPool, stream types.StreamInterface) (*types.Set[types.Chunk], error) {
Collaborator:
In doesReplicationSlotExists as well, use QueryRowContext.
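A rough sketch of that suggestion; the exact signature and query used by doesReplicationSlotExists in the driver may differ, so treat both as assumptions:

```go
package example

import (
	"context"
	"database/sql"
	"fmt"
)

// doesReplicationSlotExists, sketched with QueryRowContext so the caller's ctx
// also governs this catalog lookup. The pg_replication_slots query is an
// illustrative assumption, not necessarily the driver's actual query.
func doesReplicationSlotExists(ctx context.Context, db *sql.DB, slotName string) (bool, error) {
	var exists bool
	err := db.QueryRowContext(ctx,
		"SELECT EXISTS(SELECT 1 FROM pg_replication_slots WHERE slot_name = $1)",
		slotName,
	).Scan(&exists)
	if err != nil {
		return false, fmt.Errorf("failed to check replication slot %q: %s", slotName, err)
	}
	return exists, nil
}
```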

+ func (p *Postgres) GetOrSplitChunks(ctx context.Context, pool *destination.WriterPool, stream types.StreamInterface) (*types.Set[types.Chunk], error) {
var approxRowCount int64
approxRowCountQuery := jdbc.PostgresRowCountQuery(stream)
- // TODO: use ctx while querying
- err := p.client.QueryRow(approxRowCountQuery).Scan(&approxRowCount)
+ err := p.client.QueryRowContext(ctx, approxRowCountQuery).Scan(&approxRowCount)
if err != nil {
return nil, fmt.Errorf("failed to get approx row count: %s", err)
}
pool.AddRecordsToSync(approxRowCount)
- return p.splitTableIntoChunks(stream)
+ return p.splitTableIntoChunks(ctx, stream)
}

- func (p *Postgres) splitTableIntoChunks(stream types.StreamInterface) (*types.Set[types.Chunk], error) {
+ func (p *Postgres) splitTableIntoChunks(ctx context.Context, stream types.StreamInterface) (*types.Set[types.Chunk], error) {
generateCTIDRanges := func(stream types.StreamInterface) (*types.Set[types.Chunk], error) {
var relPages, blockSize uint32
relPagesQuery := jdbc.PostgresRelPageCount(stream)
- err := p.client.QueryRow(relPagesQuery).Scan(&relPages)
+ err := p.client.QueryRowContext(ctx, relPagesQuery).Scan(&relPages)
if err != nil {
return nil, fmt.Errorf("failed to get relPages: %s", err)
}
@@ -109,7 +108,7 @@ func (p *Postgres) splitTableIntoChunks(stream types.StreamInterface) (*types.Set[types.Chunk], error) {
chunkStart := min
splits := types.NewSet[types.Chunk]()
for {
- chunkEnd, err := p.nextChunkEnd(stream, chunkStart, chunkColumn)
+ chunkEnd, err := p.nextChunkEnd(ctx, stream, chunkStart, chunkColumn)
if err != nil {
return nil, fmt.Errorf("failed to split chunks based on next query size: %s", err)
}
@@ -129,7 +128,7 @@ func (p *Postgres) splitTableIntoChunks(stream types.StreamInterface) (*types.Set[types.Chunk], error) {
var minValue, maxValue interface{}
minMaxRowCountQuery := jdbc.MinMaxQuery(stream, chunkColumn)
// TODO: Fails on UUID type (Good First Issue)
- err := p.client.QueryRow(minMaxRowCountQuery).Scan(&minValue, &maxValue)
+ err := p.client.QueryRowContext(ctx, minMaxRowCountQuery).Scan(&minValue, &maxValue)
if err != nil {
return nil, fmt.Errorf("failed to fetch table min max: %s", err)
}
@@ -155,10 +154,10 @@ func (p *Postgres) splitTableIntoChunks(stream types.StreamInterface) (*types.Set[types.Chunk], error) {
}
}

- func (p *Postgres) nextChunkEnd(stream types.StreamInterface, previousChunkEnd interface{}, chunkColumn string) (interface{}, error) {
+ func (p *Postgres) nextChunkEnd(ctx context.Context, stream types.StreamInterface, previousChunkEnd interface{}, chunkColumn string) (interface{}, error) {
var chunkEnd interface{}
nextChunkEnd := jdbc.PostgresNextChunkEndQuery(stream, chunkColumn, previousChunkEnd, p.config.BatchSize)
- err := p.client.QueryRow(nextChunkEnd).Scan(&chunkEnd)
+ err := p.client.QueryRowContext(ctx, nextChunkEnd).Scan(&chunkEnd)
if err != nil {
return nil, fmt.Errorf("failed to query[%s] next chunk end: %s", nextChunkEnd, err)
}
@ImDoubD-datazip (Collaborator) commented on Jun 23, 2025:
Check for context in the MySQL and MongoDB drivers as well, wherever there is a direct call to the driver instance or server.
@abhinav-1305
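For reference, a hedged sketch of the same pattern for the MySQL driver; the function, struct-free signature, and information_schema query below are illustrative assumptions, not the driver's actual code:

```go
package example

import (
	"context"
	"database/sql"
	"fmt"
)

// approxRowCountMySQL shows the same change in a MySQL context: any direct
// call on the *sql.DB uses the *Context variant so the sync's ctx can cancel
// it. The information_schema query is only an example.
func approxRowCountMySQL(ctx context.Context, db *sql.DB, schema, table string) (int64, error) {
	var count sql.NullInt64
	err := db.QueryRowContext(ctx,
		"SELECT TABLE_ROWS FROM information_schema.TABLES WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ?",
		schema, table,
	).Scan(&count)
	if err != nil {
		return 0, fmt.Errorf("failed to get approx row count: %s", err)
	}
	return count.Int64, nil
}
```

For MongoDB, the official Go driver already takes a context as the first argument of every call (Find, Aggregate, Watch, and so on), so the change there is mostly a matter of threading the caller's ctx through instead of context.TODO() or context.Background().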

@abhinav-1305 (Author) commented on Jun 23, 2025:
Can this be addressed in another PR?

Collaborator:

> Can this be addressed in another PR?

Make the changes in the CDC for Postgres. For MongoDB you can raise another PR if there are any context changes; for MySQL, please make the changes here itself if possible. Please check the CDC for the other drivers as well.

@abhinav-1305
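A rough illustration of what "check for context" means in a CDC read loop; the function and its callbacks are hypothetical and do not correspond to any driver's actual CDC code:

```go
package example

import (
	"context"
	"fmt"
)

// streamChanges shows the shape of a context-aware CDC loop: every blocking
// read takes the caller's ctx, and the loop also checks ctx.Done() so
// cancellation stops the sync promptly.
func streamChanges(ctx context.Context, next func(context.Context) ([]byte, error), handle func([]byte) error) error {
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}
		event, err := next(ctx) // e.g. a replication-connection read that accepts ctx
		if err != nil {
			return fmt.Errorf("failed to read change event: %s", err)
		}
		if err := handle(event); err != nil {
			return err
		}
	}
}
```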
