Skip to content

Commit 19ebc85

Browse files
authored
Create CloudComposerExternalTaskSensor for Cloud Composer service (#57971)
1 parent bdf6c91 commit 19ebc85

File tree

10 files changed

+1026
-17
lines changed

10 files changed

+1026
-17
lines changed

dev/breeze/tests/test_selective_checks.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1852,7 +1852,7 @@ def test_expected_output_push(
18521852
"selected-providers-list-as-string": "amazon apache.beam apache.cassandra apache.kafka "
18531853
"cncf.kubernetes common.compat common.sql "
18541854
"facebook google hashicorp http microsoft.azure microsoft.mssql mysql "
1855-
"openlineage oracle postgres presto salesforce samba sftp ssh trino",
1855+
"openlineage oracle postgres presto salesforce samba sftp ssh standard trino",
18561856
"all-python-versions": f"['{DEFAULT_PYTHON_MAJOR_MINOR_VERSION}']",
18571857
"all-python-versions-list-as-string": DEFAULT_PYTHON_MAJOR_MINOR_VERSION,
18581858
"ci-image-build": "true",
@@ -1864,7 +1864,7 @@ def test_expected_output_push(
18641864
"docs-list-as-string": "apache-airflow helm-chart amazon apache.beam apache.cassandra "
18651865
"apache.kafka cncf.kubernetes common.compat common.sql facebook google hashicorp http microsoft.azure "
18661866
"microsoft.mssql mysql openlineage oracle postgres "
1867-
"presto salesforce samba sftp ssh trino",
1867+
"presto salesforce samba sftp ssh standard trino",
18681868
"skip-prek-hooks": ALL_SKIPPED_COMMITS_IF_NO_UI,
18691869
"run-kubernetes-tests": "true",
18701870
"upgrade-to-newer-dependencies": "false",
@@ -1874,12 +1874,13 @@ def test_expected_output_push(
18741874
"providers-test-types-list-as-strings-in-json": json.dumps(
18751875
[
18761876
{
1877-
"description": "amazon...google",
1877+
"description": "amazon...standard",
18781878
"test_types": "Providers[amazon] Providers[apache.beam,apache.cassandra,"
18791879
"apache.kafka,cncf.kubernetes,common.compat,common.sql,facebook,"
18801880
"hashicorp,http,microsoft.azure,microsoft.mssql,mysql,"
18811881
"openlineage,oracle,postgres,presto,salesforce,samba,sftp,ssh,trino] "
1882-
"Providers[google]",
1882+
"Providers[google] "
1883+
"Providers[standard]",
18831884
}
18841885
]
18851886
),
@@ -2122,7 +2123,7 @@ def test_upgrade_to_newer_dependencies(
21222123
"docs-list-as-string": "amazon apache.beam apache.cassandra apache.kafka "
21232124
"cncf.kubernetes common.compat common.sql facebook google hashicorp http "
21242125
"microsoft.azure microsoft.mssql mysql openlineage oracle "
2125-
"postgres presto salesforce samba sftp ssh trino",
2126+
"postgres presto salesforce samba sftp ssh standard trino",
21262127
},
21272128
id="Google provider docs changed",
21282129
),

providers/google/docs/operators/cloud/cloud_composer.rst

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -209,3 +209,23 @@ You can trigger a DAG in another Composer environment, use:
209209
:dedent: 4
210210
:start-after: [START howto_operator_trigger_dag_run]
211211
:end-before: [END howto_operator_trigger_dag_run]
212+
213+
Waits for a different DAG, task group, or task to complete
214+
----------------------------------------------------------
215+
216+
To use a sensor that waits for a different DAG, task group, or task to complete in a specific Composer environment, use:
217+
:class:`~airflow.providers.google.cloud.sensors.cloud_composer.CloudComposerExternalTaskSensor`
218+
219+
.. exampleinclude:: /../../google/tests/system/google/cloud/composer/example_cloud_composer.py
220+
:language: python
221+
:dedent: 4
222+
:start-after: [START howto_sensor_external_task]
223+
:end-before: [END howto_sensor_external_task]
224+
225+
or you can define the same sensor in deferrable mode:
226+
227+
.. exampleinclude:: /../../google/tests/system/google/cloud/composer/example_cloud_composer.py
228+
:language: python
229+
:dedent: 4
230+
:start-after: [START howto_sensor_external_task_deferrable_mode]
231+
:end-before: [END howto_sensor_external_task_deferrable_mode]

providers/google/pyproject.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,9 @@ dependencies = [
204204
"http" = [
205205
"apache-airflow-providers-http"
206206
]
207+
"standard" = [
208+
"apache-airflow-providers-standard"
209+
]
207210

208211
[dependency-groups]
209212
dev = [
@@ -228,6 +231,7 @@ dev = [
228231
"apache-airflow-providers-salesforce",
229232
"apache-airflow-providers-sftp",
230233
"apache-airflow-providers-ssh",
234+
"apache-airflow-providers-standard",
231235
"apache-airflow-providers-trino",
232236
# Additional devel dependencies (do not remove this line and add extra development dependencies)
233237
"apache-airflow-providers-apache-kafka",

providers/google/src/airflow/providers/google/cloud/hooks/cloud_composer.py

Lines changed: 73 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
import time
2323
from collections.abc import MutableSequence, Sequence
2424
from typing import TYPE_CHECKING, Any
25-
from urllib.parse import urljoin
25+
from urllib.parse import urlencode, urljoin
2626

2727
from aiohttp import ClientSession
2828
from google.api_core.client_options import ClientOptions
@@ -505,6 +505,42 @@ def get_dag_runs(
505505

506506
return response.json()
507507

508+
def get_task_instances(
    self,
    composer_airflow_uri: str,
    composer_dag_id: str,
    query_parameters: dict | None = None,
    timeout: float | None = None,
) -> dict:
    """
    Get the list of task instances for provided DAG.

    :param composer_airflow_uri: The URI of the Apache Airflow Web UI hosted within Composer environment.
    :param composer_dag_id: The ID of DAG.
    :param query_parameters: Query parameters for this request.
    :param timeout: The timeout for this request.
    """
    # Only append a query string when parameters were actually supplied;
    # urlencode({}) / urlencode(None) would otherwise produce a bare or broken "?".
    query_string = f"?{urlencode(query_parameters)}" if query_parameters else ""

    # "~" in the dagRuns segment is the Airflow REST API wildcard: task instances
    # across all DAG runs of this DAG.
    response = self.make_composer_airflow_api_request(
        method="GET",
        airflow_uri=composer_airflow_uri,
        path=f"/api/v1/dags/{composer_dag_id}/dagRuns/~/taskInstances{query_string}",
        timeout=timeout,
    )

    if response.status_code != 200:
        # Log full context before raising so the failure is diagnosable from logs alone.
        self.log.error(
            "Failed to get task instances for dag_id=%s from %s (status=%s): %s",
            composer_dag_id,
            composer_airflow_uri,
            response.status_code,
            response.text,
        )
        response.raise_for_status()

    return response.json()
543+
508544

509545
class CloudComposerAsyncHook(GoogleBaseAsyncHook):
510546
"""Hook for Google Cloud Composer async APIs."""
@@ -849,3 +885,39 @@ async def get_dag_runs(
849885
raise AirflowException(response_body["title"])
850886

851887
return response_body
888+
889+
async def get_task_instances(
    self,
    composer_airflow_uri: str,
    composer_dag_id: str,
    query_parameters: dict | None = None,
    timeout: float | None = None,
) -> dict:
    """
    Get the list of task instances for provided DAG.

    :param composer_airflow_uri: The URI of the Apache Airflow Web UI hosted within Composer environment.
    :param composer_dag_id: The ID of DAG.
    :param query_parameters: Query parameters for this request.
    :param timeout: The timeout for this request.
    """
    # Only append a query string when parameters were actually supplied;
    # urlencode({}) / urlencode(None) would otherwise produce a bare or broken "?".
    query_string = f"?{urlencode(query_parameters)}" if query_parameters else ""

    # "~" in the dagRuns segment is the Airflow REST API wildcard: task instances
    # across all DAG runs of this DAG.
    response_body, response_status_code = await self.make_composer_airflow_api_request(
        method="GET",
        airflow_uri=composer_airflow_uri,
        path=f"/api/v1/dags/{composer_dag_id}/dagRuns/~/taskInstances{query_string}",
        timeout=timeout,
    )

    if response_status_code != 200:
        # Error bodies usually carry a "title" field (problem-details style), but a
        # proxy-level or malformed error may omit it — fall back to the whole body
        # instead of masking the real failure with a KeyError.
        error_title = response_body.get("title", str(response_body))
        self.log.error(
            "Failed to get task instances for dag_id=%s from %s (status=%s): %s",
            composer_dag_id,
            composer_airflow_uri,
            response_status_code,
            error_title,
        )
        raise AirflowException(error_title)

    return response_body

0 commit comments

Comments
 (0)