|
# Shared fixtures for the MSSQL-to-GCS transfer tests below.
SQL = "select 1"  # trivial query; the cursor results are mocked anyway
BUCKET = "gs://test"  # destination GCS bucket used in upload assertions
JSON_FILENAME = "test_{}.ndjson"  # templated object name for NDJSON exports ({} = chunk index)
PARQUET_FILENAME = "test_{}.parquet"  # templated object name for Parquet exports ({} = chunk index)
GZIP = False  # tests assert uploads are made with gzip disabled
|
40 | 41 | ROWS = [ |
|
# Expected schema-file contents as the operator writes them (a sequence of
# byte chunks forming one JSON array). NOTE(review): both BIT-backed columns
# appear here as BOOL — presumably the expectation when every bit column is
# listed in bit_fields; confirm against the test that uses this fixture.
SCHEMA_JSON = [
    b'[{"mode": "NULLABLE", "name": "some_str", "type": "STRING"}, ',
    b'{"mode": "NULLABLE", "name": "some_num", "type": "INTEGER"}, ',
    b'{"mode": "NULLABLE", "name": "some_binary", "type": "BOOL"}, ',
    b'{"mode": "NULLABLE", "name": "some_bit", "type": "BOOL"}]',
]
63 | 64 |
|
# Variant of the expected schema file where "some_bit" stays INTEGER while
# "some_binary" is BOOL — presumably the output when only "some_binary" is
# listed in bit_fields; verify against the test that uses this fixture.
SCHEMA_JSON_BIT_FIELDS = [
    b'[{"mode": "NULLABLE", "name": "some_str", "type": "STRING"}, ',
    b'{"mode": "NULLABLE", "name": "some_num", "type": "INTEGER"}, ',
    b'{"mode": "NULLABLE", "name": "some_binary", "type": "BOOL"}, ',
    b'{"mode": "NULLABLE", "name": "some_bit", "type": "INTEGER"}]',
]
70 | 71 |
|
@@ -254,3 +255,53 @@ def db_hook(self): |
254 | 255 | assert len(lineage.job_facets) == 1 |
255 | 256 | assert lineage.job_facets["sql"].query == sql |
256 | 257 | assert lineage.run_facets == {} |
| 258 | + |
| 259 | + @mock.patch("airflow.providers.google.cloud.transfers.mssql_to_gcs.MsSqlHook") |
| 260 | + @mock.patch("airflow.providers.google.cloud.transfers.sql_to_gcs.GCSHook") |
| 261 | + def test_bit_to_boolean_field_conversion(self, gcs_hook_mock_class, mssql_hook_mock_class): |
| 262 | + """Test successful run of execute function for Parquet format with boolean fields. |
| 263 | +
|
| 264 | + This test verifies that MSSQL tables with columns of type "BIT" can exported |
| 265 | + using the bit_fields parameter, resulting in boolean fields in the Parquet file. |
| 266 | + """ |
| 267 | + import pyarrow |
| 268 | + |
| 269 | + op = MSSQLToGCSOperator( |
| 270 | + task_id=TASK_ID, |
| 271 | + mssql_conn_id=MSSQL_CONN_ID, |
| 272 | + sql=SQL, |
| 273 | + bucket=BUCKET, |
| 274 | + filename=PARQUET_FILENAME, |
| 275 | + export_format="parquet", |
| 276 | + bit_fields=["some_binary", "some_bit"], |
| 277 | + ) |
| 278 | + |
| 279 | + mssql_hook_mock = mssql_hook_mock_class.return_value |
| 280 | + mssql_hook_mock.get_conn().cursor().__iter__.return_value = iter(ROWS) |
| 281 | + mssql_hook_mock.get_conn().cursor().description = CURSOR_DESCRIPTION |
| 282 | + |
| 283 | + gcs_hook_mock = gcs_hook_mock_class.return_value |
| 284 | + |
| 285 | + upload_called = False |
| 286 | + |
| 287 | + def _assert_upload(bucket, obj, tmp_filename, mime_type=None, gzip=False, metadata=None): |
| 288 | + nonlocal upload_called |
| 289 | + upload_called = True |
| 290 | + assert bucket == BUCKET |
| 291 | + assert obj == PARQUET_FILENAME.format(0) |
| 292 | + assert mime_type == "application/octet-stream" |
| 293 | + assert gzip == GZIP |
| 294 | + |
| 295 | + parquet_file = pyarrow.parquet.ParquetFile(tmp_filename) |
| 296 | + schema = parquet_file.schema_arrow |
| 297 | + # Verify that bit fields are mapped to boolean type in parquet schema |
| 298 | + assert schema.field("some_binary").type.equals(pyarrow.bool_()) |
| 299 | + assert schema.field("some_bit").type.equals(pyarrow.bool_()) |
| 300 | + |
| 301 | + gcs_hook_mock.upload.side_effect = _assert_upload |
| 302 | + |
| 303 | + op.execute(None) |
| 304 | + |
| 305 | + assert upload_called, "Expected upload to be called" |
| 306 | + mssql_hook_mock_class.assert_called_once_with(mssql_conn_id=MSSQL_CONN_ID) |
| 307 | + mssql_hook_mock.get_conn().cursor().execute.assert_called_once_with(SQL) |
0 commit comments