Skip to content

Commit 1d8b969

Browse files
authored
Fix AwaitMessageSensor to accept timeout and soft_fail parameters (#57863) (#58070)
- Changed AwaitMessageSensor and AwaitMessageTriggerFunctionSensor to inherit from BaseSensorOperator instead of BaseOperator - This enables both sensors to accept standard sensor parameters like timeout and soft_fail - Added comprehensive test coverage for timeout and soft_fail parameters - Updated documentation to reflect the new parameters Fixes #57863
1 parent c2144fd commit 1d8b969

File tree

2 files changed

+61
-3
lines changed
  • providers/apache/kafka

2 files changed

+61
-3
lines changed

providers/apache/kafka/src/airflow/providers/apache/kafka/sensors/kafka.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,12 @@
2020
from typing import Any
2121

2222
from airflow.providers.apache.kafka.triggers.await_message import AwaitMessageTrigger
23-
from airflow.providers.common.compat.sdk import BaseOperator
23+
from airflow.providers.common.compat.sdk import BaseSensorOperator
2424

2525
VALID_COMMIT_CADENCE = {"never", "end_of_batch", "end_of_operator"}
2626

2727

28-
class AwaitMessageSensor(BaseOperator):
28+
class AwaitMessageSensor(BaseSensorOperator):
2929
"""
3030
An Airflow sensor that defers until a specific message is published to Kafka.
3131
@@ -53,6 +53,10 @@ class AwaitMessageSensor(BaseOperator):
5353
:param poll_interval: How long the kafka consumer should sleep after reaching the end of the Kafka log,
5454
defaults to 5
5555
:param xcom_push_key: the name of a key to push the returned message to, defaults to None
56+
:param soft_fail: Set to true to mark the task as SKIPPED on failure
57+
:param timeout: Time elapsed before the task times out and fails (in seconds)
58+
:param poke_interval: This parameter is inherited but not used in this deferrable implementation
59+
:param mode: This parameter is inherited but not used in this deferrable implementation
5660
5761
5862
"""
@@ -111,7 +115,7 @@ def execute_complete(self, context, event=None):
111115
return event
112116

113117

114-
class AwaitMessageTriggerFunctionSensor(BaseOperator):
118+
class AwaitMessageTriggerFunctionSensor(BaseSensorOperator):
115119
"""
116120
Defer until a specific message is published to Kafka, trigger a registered function, then resume waiting.
117121
@@ -137,6 +141,10 @@ class AwaitMessageTriggerFunctionSensor(BaseOperator):
137141
cluster, defaults to 1
138142
:param poll_interval: How long the kafka consumer should sleep after reaching the end of the Kafka log,
139143
defaults to 5
144+
:param soft_fail: Set to true to mark the task as SKIPPED on failure
145+
:param timeout: Time elapsed before the task times out and fails (in seconds)
146+
:param poke_interval: This parameter is inherited but not used in this deferrable implementation
147+
:param mode: This parameter is inherited but not used in this deferrable implementation
140148
141149
142150
"""

providers/apache/kafka/tests/unit/apache/kafka/sensors/test_kafka.py

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,3 +91,53 @@ def test_await_message_trigger_event_execute_complete(self):
9191
# task should immediately come out of deferred
9292
with pytest.raises(TaskDeferred):
9393
sensor.execute_complete(context={})
94+
95+
def test_await_message_with_timeout_parameter(self):
96+
"""Test that AwaitMessageSensor accepts timeout parameter."""
97+
sensor = AwaitMessageSensor(
98+
kafka_config_id="kafka_d",
99+
topics=["test"],
100+
task_id="test",
101+
apply_function=_return_true,
102+
timeout=600, # This should now work without errors
103+
)
104+
105+
assert sensor.timeout == 600
106+
107+
def test_await_message_with_soft_fail_parameter(self):
108+
"""Test that AwaitMessageSensor accepts soft_fail parameter."""
109+
sensor = AwaitMessageSensor(
110+
kafka_config_id="kafka_d",
111+
topics=["test"],
112+
task_id="test",
113+
apply_function=_return_true,
114+
soft_fail=True, # This should now work without errors
115+
)
116+
117+
assert sensor.soft_fail is True
118+
119+
def test_await_message_trigger_function_with_timeout_parameter(self):
120+
"""Test that AwaitMessageTriggerFunctionSensor accepts timeout parameter."""
121+
sensor = AwaitMessageTriggerFunctionSensor(
122+
kafka_config_id="kafka_d",
123+
topics=["test"],
124+
task_id="test",
125+
apply_function=_return_true,
126+
event_triggered_function=_return_true,
127+
timeout=600,
128+
)
129+
130+
assert sensor.timeout == 600
131+
132+
def test_await_message_trigger_function_with_soft_fail_parameter(self):
133+
"""Test that AwaitMessageTriggerFunctionSensor accepts soft_fail parameter."""
134+
sensor = AwaitMessageTriggerFunctionSensor(
135+
kafka_config_id="kafka_d",
136+
topics=["test"],
137+
task_id="test",
138+
apply_function=_return_true,
139+
event_triggered_function=_return_true,
140+
soft_fail=True,
141+
)
142+
143+
assert sensor.soft_fail is True

0 commit comments

Comments
 (0)