fix: Improve IO Abstractions and cleanup #755
base: main
Conversation
- add abstract Reader/Writer contracts
- add IOStats and base write_batches implementation
- add unit tests for contracts and batching

Refs: #718
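For reviewers skimming the diff, here is a minimal sketch of what these contracts could look like. Names, fields, and signatures are illustrative only, not a copy of the actual `application_sdk/io` implementation.

```python
# Hedged sketch of the new IO contracts; everything here is illustrative.
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import AsyncIterator, Iterable

import pandas as pd


@dataclass
class IOStats:
    """Counters accumulated while reading/writing (illustrative fields)."""
    records_processed: int = 0
    batches_processed: int = 0


class Reader(ABC):
    """Abstract contract for reading dataframes from a source."""

    @abstractmethod
    async def read(self) -> pd.DataFrame: ...

    @abstractmethod
    def read_batches(self) -> AsyncIterator[pd.DataFrame]: ...


class Writer(ABC):
    """Abstract contract for writing dataframes, with a shared batching helper."""

    def __init__(self) -> None:
        self.stats = IOStats()

    @abstractmethod
    async def write(self, dataframe: pd.DataFrame) -> None: ...

    async def write_batches(self, batches: Iterable[pd.DataFrame]) -> IOStats:
        # Base write_batches: delegate each batch to write() and track stats,
        # so concrete writers only need to implement single-batch writes.
        for batch in batches:
            await self.write(batch)
            self.stats.batches_processed += 1
            self.stats.records_processed += len(batch)
        return self.stats
```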
📜 Docstring Coverage Report
RESULT: PASSED (minimum: 30.0%, actual: 76.6%)
📦 Trivy Vulnerability Scan Results
✅ No vulnerabilities found during the scan.

📦 Trivy Secret Scan Results
✅ No secrets found during the scan.
🛠 Docs available at: https://k.atlan.dev/application-sdk/refactor/io-too
-    parquet_output: Optional[ParquetOutput],
+    parquet_output: Optional[ParquetWriter],
 ) -> Optional[Union[ActivityStatistics, "pd.DataFrame"]]:
     """Execute multi-database flow with proper error handling and result finalization."""
@abhishekagrawal-atlan to review
What does effective_sql_client mean here? Removing it.
☂️ Python Coverage
Overall Coverage
New Files: no new covered files. Modified Files: no covered modified files.

🛠 Full Test Coverage Report: https://k.atlan.dev/coverage/application-sdk/pr/755
Force-pushed from e9f2113 to 8ba41cc (compare)
📦 Example workflows test results
Force-pushed from 8ba41cc to 7bb655a (compare)

📦 Example workflows test results
📦 Example workflows test results
Force-pushed from 7bb655a to 448c5e1 (compare)
Left a few comments
 )
-return ParquetOutput(
+return ParquetWriter(
     output_path=output_path,
Can we simplify these, or rename output_path and prefix to something better?
Just from the names, it isn't self-explanatory what is expected to be passed into these values.
Removed output_suffix; better to let developers pass output_path directly. Will remove other such patterns across the codebase. Illustrative usage below.
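A small sketch of the usage after this change (the path value and import path are made up): callers construct the writer with the full output_path instead of composing a prefix and output_suffix.

```python
# Hypothetical import path and destination; the point is that the destination
# is passed explicitly rather than assembled from prefix + output_suffix.
from application_sdk.io import ParquetWriter  # import path assumed

writer = ParquetWriter(
    output_path="./local/tmp/artifacts/raw/tables",
)
```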
 if hasattr(dataframe, "empty") and getattr(dataframe, "empty"):
     continue
 valid_dataframes.append(dataframe)
 # Handle both async and sync iterators
All these multi-DB related functions and logic should live in their own separate utility module. One possible shape for that is sketched below.
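A sketch of that refactor (the module path and function name are hypothetical): keep the multi-DB validation/iteration helpers in a utility module so the activity code only orchestrates.

```python
# Hypothetical module, e.g. application_sdk/activities/_multidb_utils.py.
# The helper mirrors the logic in the hunk above: normalize sync/async
# iterators and skip empty dataframes.
from typing import AsyncIterator, Iterator, List, Union

import pandas as pd


async def collect_valid_dataframes(
    dataframes: Union[Iterator[pd.DataFrame], AsyncIterator[pd.DataFrame]],
) -> List[pd.DataFrame]:
    valid_dataframes: List[pd.DataFrame] = []
    if hasattr(dataframes, "__aiter__"):
        # Async iterator (e.g. batched SQL results streamed per database).
        async for dataframe in dataframes:
            if not dataframe.empty:
                valid_dataframes.append(dataframe)
    else:
        # Plain sync iterator.
        for dataframe in dataframes:
            if not dataframe.empty:
                valid_dataframes.append(dataframe)
    return valid_dataframes
```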
 with self.engine.connect() as conn:
     return self._execute_pandas_query(conn, query, chunksize)

 async def get_batched_results(
We can share a lot of code between get_batched_results and get_results. A rough sketch is below.
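One way to share that code, assuming a private helper (the names below are not the merged API): both public methods funnel through a single pandas execution path.

```python
# Sketch only: _run_pandas_query is a hypothetical shared helper; engine and
# _execute_pandas_query are the existing members shown in the hunk above.
from typing import AsyncIterator, Optional

import pandas as pd


class BaseSQLClient:
    async def _run_pandas_query(self, query: str, chunksize: Optional[int] = None):
        # One place that opens the connection; chunksize=None returns a single
        # DataFrame, otherwise an iterator of DataFrame chunks.
        with self.engine.connect() as conn:
            return self._execute_pandas_query(conn, query, chunksize)

    async def get_results(self, query: str) -> pd.DataFrame:
        return await self._run_pandas_query(query)

    async def get_batched_results(
        self, query: str, chunksize: int = 100_000
    ) -> AsyncIterator[pd.DataFrame]:
        for chunk in await self._run_pandas_query(query, chunksize):
            yield chunk
```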
 from sqlalchemy.orm import Session


 class BaseSQLClient(ClientInterface):
Thinking out loud here:
Should we keep BaseSQLClient as a plain query executor that uses SQLAlchemy?
We then create a PandasSQLClient that inherits from BaseSQLClient and uses pandas to execute queries, and similarly a DaftSQLClient.
This would make the code very modular, since each executor gets its own module and devs can choose which query executor they want to use.
Also, if a source comes along that isn't supported by pandas or Daft, the plain query executor will still have logic that works.
Dependency management for pandas and daft should also get simpler, I feel. A rough sketch of this split follows.
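A sketch of that split under the reviewer's assumptions (only BaseSQLClient exists today; the subclass names and the Daft call are illustrative):

```python
# Illustrative class split: the base class runs raw queries via SQLAlchemy,
# and engine-specific subclasses layer on pandas / Daft.
from typing import Any, List

import pandas as pd
from sqlalchemy import create_engine, text


class BaseSQLClient:
    """Plain query executor using SQLAlchemy; works for any supported source."""

    def __init__(self, connection_string: str) -> None:
        self.engine = create_engine(connection_string)

    def run_query(self, query: str) -> List[Any]:
        with self.engine.connect() as conn:
            return list(conn.execute(text(query)))


class PandasSQLClient(BaseSQLClient):
    """Executes queries through pandas on top of the base engine."""

    def get_results(self, query: str) -> pd.DataFrame:
        with self.engine.connect() as conn:
            return pd.read_sql_query(query, conn)


class DaftSQLClient(BaseSQLClient):
    """Executes queries through Daft; imported lazily so the dependency stays optional."""

    def get_results(self, query: str):
        import daft

        # daft.read_sql accepts a connection factory (verify against the Daft
        # version pinned in the project).
        return daft.read_sql(query, lambda: self.engine.connect())
```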
Force-pushed from 5ec5f53 to 2f94c84 (compare)
 # Use _write_daft_dataframe with the DataFrame we have
 daft_df = daft.from_pandas(df)
-await parquet_output.write_daft_dataframe(
+await parquet_writer._write_daft_dataframe(
We must use the write() method with the daft DataframeType instead of the private helper. A hedged sketch is below.
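A sketch of the suggested call (the parameter name, enum member, and import path are assumptions; adjust to the actual write() signature):

```python
import daft
import pandas as pd

from application_sdk.io import DataframeType  # import path assumed from the changelog


async def write_as_daft(parquet_writer, df: pd.DataFrame) -> None:
    # Go through the public write() entry point rather than the private
    # _write_daft_dataframe helper.
    daft_df = daft.from_pandas(df)
    await parquet_writer.write(daft_df, dataframe_type=DataframeType.DAFT)
```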
Changelog
Note
Replaces legacy inputs/outputs and SQLQueryInput with a new io Reader/Writer layer and SQL client get_results APIs, updates activities/handlers/tests accordingly, and removes old docgen/docs and related deps.
- Introduces application_sdk/io with generic Reader/Writer and concrete JsonFileReader/Writer, ParquetFileReader/Writer, IcebergTableReader/Writer.
- Adds application_sdk/io/_utils and a DataframeType enum.
- Removes application_sdk/inputs/* and application_sdk/outputs/* in favor of the new IO layer.
- Adds BaseSQLClient.get_results() and get_batched_results(); deprecates SQLQueryInput usage.
- Moves events.models to interceptors.models in server/worker/interceptors.

Written by Cursor Bugbot for commit 07c97ad. This will update automatically on new commits.
Addresses: #718
Additional context (e.g. screenshots, logs, links)
Checklist
Copyleft License Compliance