-
Notifications
You must be signed in to change notification settings - Fork 138
refactor(postgres): pass context to database queries in chunk processing functions #361
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
1400a43
a13baac
5cf86e4
9411123
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -45,23 +45,22 @@ func (p *Postgres) ChunkIterator(ctx context.Context, stream types.StreamInterfa | |
| }) | ||
| } | ||
|
|
||
| func (p *Postgres) GetOrSplitChunks(_ context.Context, pool *destination.WriterPool, stream types.StreamInterface) (*types.Set[types.Chunk], error) { | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. in doesReplicationSlotExists as well use |
||
| func (p *Postgres) GetOrSplitChunks(ctx context.Context, pool *destination.WriterPool, stream types.StreamInterface) (*types.Set[types.Chunk], error) { | ||
| var approxRowCount int64 | ||
| approxRowCountQuery := jdbc.PostgresRowCountQuery(stream) | ||
| // TODO: use ctx while querying | ||
| err := p.client.QueryRow(approxRowCountQuery).Scan(&approxRowCount) | ||
| err := p.client.QueryRowContext(ctx, approxRowCountQuery).Scan(&approxRowCount) | ||
| if err != nil { | ||
| return nil, fmt.Errorf("failed to get approx row count: %s", err) | ||
| } | ||
| pool.AddRecordsToSync(approxRowCount) | ||
| return p.splitTableIntoChunks(stream) | ||
| return p.splitTableIntoChunks(ctx, stream) | ||
| } | ||
|
|
||
| func (p *Postgres) splitTableIntoChunks(stream types.StreamInterface) (*types.Set[types.Chunk], error) { | ||
| func (p *Postgres) splitTableIntoChunks(ctx context.Context, stream types.StreamInterface) (*types.Set[types.Chunk], error) { | ||
| generateCTIDRanges := func(stream types.StreamInterface) (*types.Set[types.Chunk], error) { | ||
| var relPages, blockSize uint32 | ||
| relPagesQuery := jdbc.PostgresRelPageCount(stream) | ||
| err := p.client.QueryRow(relPagesQuery).Scan(&relPages) | ||
| err := p.client.QueryRowContext(ctx, relPagesQuery).Scan(&relPages) | ||
| if err != nil { | ||
| return nil, fmt.Errorf("failed to get relPages: %s", err) | ||
| } | ||
|
|
@@ -109,7 +108,7 @@ func (p *Postgres) splitTableIntoChunks(stream types.StreamInterface) (*types.Se | |
| chunkStart := min | ||
| splits := types.NewSet[types.Chunk]() | ||
| for { | ||
| chunkEnd, err := p.nextChunkEnd(stream, chunkStart, chunkColumn) | ||
| chunkEnd, err := p.nextChunkEnd(ctx, stream, chunkStart, chunkColumn) | ||
| if err != nil { | ||
| return nil, fmt.Errorf("failed to split chunks based on next query size: %s", err) | ||
| } | ||
|
|
@@ -129,7 +128,7 @@ func (p *Postgres) splitTableIntoChunks(stream types.StreamInterface) (*types.Se | |
| var minValue, maxValue interface{} | ||
| minMaxRowCountQuery := jdbc.MinMaxQuery(stream, chunkColumn) | ||
| // TODO: Fails on UUID type (Good First Issue) | ||
| err := p.client.QueryRow(minMaxRowCountQuery).Scan(&minValue, &maxValue) | ||
| err := p.client.QueryRowContext(ctx, minMaxRowCountQuery).Scan(&minValue, &maxValue) | ||
| if err != nil { | ||
| return nil, fmt.Errorf("failed to fetch table min max: %s", err) | ||
| } | ||
|
|
@@ -155,10 +154,10 @@ func (p *Postgres) splitTableIntoChunks(stream types.StreamInterface) (*types.Se | |
| } | ||
| } | ||
|
|
||
| func (p *Postgres) nextChunkEnd(stream types.StreamInterface, previousChunkEnd interface{}, chunkColumn string) (interface{}, error) { | ||
| func (p *Postgres) nextChunkEnd(ctx context.Context, stream types.StreamInterface, previousChunkEnd interface{}, chunkColumn string) (interface{}, error) { | ||
| var chunkEnd interface{} | ||
| nextChunkEnd := jdbc.PostgresNextChunkEndQuery(stream, chunkColumn, previousChunkEnd, p.config.BatchSize) | ||
| err := p.client.QueryRow(nextChunkEnd).Scan(&chunkEnd) | ||
| err := p.client.QueryRowContext(ctx, nextChunkEnd).Scan(&chunkEnd) | ||
| if err != nil { | ||
| return nil, fmt.Errorf("failed to query[%s] next chunk end: %s", nextChunkEnd, err) | ||
| } | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Check for context in MySQL and MongoDB drivers as well, wherever there is a direct call for driver instance or server. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can this be addressed in another PR? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Make the changes in the CDC for Postgres. For MongoDB you can raise another PR if there are any context changes; for MySQL, please make the changes here itself if possible. Please check the CDC for other drivers as well. |
||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use QueryContext function in ChunkIterator function in postgres backfill.go.
@abhinav-1305