From 5b5f68d9dbc4086e3ed1326fb75733c7b9329bda Mon Sep 17 00:00:00 2001 From: alamashir Date: Fri, 7 Nov 2025 23:16:55 -0500 Subject: [PATCH] Fix AwaitMessageSensor to accept timeout and soft_fail parameters (#57863) - Changed AwaitMessageSensor and AwaitMessageTriggerFunctionSensor to inherit from BaseSensorOperator instead of BaseOperator - This enables both sensors to accept standard sensor parameters like timeout and soft_fail - Added comprehensive test coverage for timeout and soft_fail parameters - Updated documentation to reflect the new parameters Fixes #57863 --- .../providers/apache/kafka/sensors/kafka.py | 14 ++++-- .../unit/apache/kafka/sensors/test_kafka.py | 50 +++++++++++++++++++ 2 files changed, 61 insertions(+), 3 deletions(-) diff --git a/providers/apache/kafka/src/airflow/providers/apache/kafka/sensors/kafka.py b/providers/apache/kafka/src/airflow/providers/apache/kafka/sensors/kafka.py index 5a55530ebd50b..42c249b6f588c 100644 --- a/providers/apache/kafka/src/airflow/providers/apache/kafka/sensors/kafka.py +++ b/providers/apache/kafka/src/airflow/providers/apache/kafka/sensors/kafka.py @@ -20,12 +20,12 @@ from typing import Any from airflow.providers.apache.kafka.triggers.await_message import AwaitMessageTrigger -from airflow.providers.common.compat.sdk import BaseOperator +from airflow.providers.common.compat.sdk import BaseSensorOperator VALID_COMMIT_CADENCE = {"never", "end_of_batch", "end_of_operator"} -class AwaitMessageSensor(BaseOperator): +class AwaitMessageSensor(BaseSensorOperator): """ An Airflow sensor that defers until a specific message is published to Kafka. @@ -53,6 +53,10 @@ class AwaitMessageSensor(BaseOperator): :param poll_interval: How long the kafka consumer should sleep after reaching the end of the Kafka log, defaults to 5 :param xcom_push_key: the name of a key to push the returned message to, defaults to None + :param soft_fail: Set to true to mark the task as SKIPPED on failure + :param timeout: Time elapsed before the task times out and fails (in seconds) + :param poke_interval: This parameter is inherited but not used in this deferrable implementation + :param mode: This parameter is inherited but not used in this deferrable implementation """ @@ -111,7 +115,7 @@ def execute_complete(self, context, event=None): return event -class AwaitMessageTriggerFunctionSensor(BaseOperator): +class AwaitMessageTriggerFunctionSensor(BaseSensorOperator): """ Defer until a specific message is published to Kafka, trigger a registered function, then resume waiting. @@ -137,6 +141,10 @@ class AwaitMessageTriggerFunctionSensor(BaseOperator): cluster, defaults to 1 :param poll_interval: How long the kafka consumer should sleep after reaching the end of the Kafka log, defaults to 5 + :param soft_fail: Set to true to mark the task as SKIPPED on failure + :param timeout: Time elapsed before the task times out and fails (in seconds) + :param poke_interval: This parameter is inherited but not used in this deferrable implementation + :param mode: This parameter is inherited but not used in this deferrable implementation """ diff --git a/providers/apache/kafka/tests/unit/apache/kafka/sensors/test_kafka.py b/providers/apache/kafka/tests/unit/apache/kafka/sensors/test_kafka.py index 817b8bdb2fe4d..eab4b968d87d3 100644 --- a/providers/apache/kafka/tests/unit/apache/kafka/sensors/test_kafka.py +++ b/providers/apache/kafka/tests/unit/apache/kafka/sensors/test_kafka.py @@ -91,3 +91,53 @@ def test_await_message_trigger_event_execute_complete(self): # task should immediately come out of deferred with pytest.raises(TaskDeferred): sensor.execute_complete(context={}) + + def test_await_message_with_timeout_parameter(self): + """Test that AwaitMessageSensor accepts timeout parameter.""" + sensor = AwaitMessageSensor( + kafka_config_id="kafka_d", + topics=["test"], + task_id="test", + apply_function=_return_true, + timeout=600, # This should now work without errors + ) + + assert sensor.timeout == 600 + + def test_await_message_with_soft_fail_parameter(self): + """Test that AwaitMessageSensor accepts soft_fail parameter.""" + sensor = AwaitMessageSensor( + kafka_config_id="kafka_d", + topics=["test"], + task_id="test", + apply_function=_return_true, + soft_fail=True, # This should now work without errors + ) + + assert sensor.soft_fail is True + + def test_await_message_trigger_function_with_timeout_parameter(self): + """Test that AwaitMessageTriggerFunctionSensor accepts timeout parameter.""" + sensor = AwaitMessageTriggerFunctionSensor( + kafka_config_id="kafka_d", + topics=["test"], + task_id="test", + apply_function=_return_true, + event_triggered_function=_return_true, + timeout=600, + ) + + assert sensor.timeout == 600 + + def test_await_message_trigger_function_with_soft_fail_parameter(self): + """Test that AwaitMessageTriggerFunctionSensor accepts soft_fail parameter.""" + sensor = AwaitMessageTriggerFunctionSensor( + kafka_config_id="kafka_d", + topics=["test"], + task_id="test", + apply_function=_return_true, + event_triggered_function=_return_true, + soft_fail=True, + ) + + assert sensor.soft_fail is True