-
Notifications
You must be signed in to change notification settings - Fork 15.9k
Description
Apache Airflow Provider(s)
apache-kafka
Versions of Apache Airflow Providers
apache-airflow-providers-apache-kafka == 1.10.0
Apache Airflow version
2.10.5
Operating System
Debian GNU/Linux 12 (bookworm)
Deployment
Docker-Compose
Deployment details
No response
What happened
Since AwaitMessageSensor doesn't inherit from airflow.sensors.base.BaseSensorOperator parameters like timeout, soft_fail, etc don't work for it.
To be clear, when setting 'timeout' parameter for AwaitMessageSensor i get DAG import error:
airflow.exceptions.AirflowException: Invalid arguments were passed to AwaitMessageSensor (task_id: wait_for_kafka_message). Invalid arguments were:
**kwargs: {'timeout': 600}
Is there a proper way to set a timeout for AwaitMessageSensor without editing Airflow configuration?
What you think should happen instead
I think it's strange that AwaitMessageSensor inherits from BaseOperator instead of BaseSensorOperator. Is there a good reason for that?
How to reproduce
On Apache Airflow version 2.10+ create a DAG:
from airflow import DAG
from airflow.providers.apache.kafka.sensors.kafka import AwaitMessageSensor
with DAG(
dag_id="sensor_bug_1",
schedule_interval=None,
start_date=datetime(2025, 1, 1),
catchup=False,
tags=["kafka", "sensor", "example"],
) as dag:
wait_for_kafka_message = AwaitMessageSensor(
task_id="wait_for_kafka_message",
kafka_config_id="kafka_default",
topics=["example_topic"],
apply_function=lambda messages: messages,
timeout=600,
)
You'll get DAG import error in the UI.
Anything else
No response
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct