AwaitMessageSensor can't set timeout parameter #57863

@Dukastlik

Apache Airflow Provider(s)

apache-kafka

Versions of Apache Airflow Providers

apache-airflow-providers-apache-kafka == 1.10.0

Apache Airflow version

2.10.5

Operating System

Debian GNU/Linux 12 (bookworm)

Deployment

Docker-Compose

Deployment details

No response

What happened

Since AwaitMessageSensor doesn't inherit from airflow.sensors.base.BaseSensorOperator, sensor parameters such as timeout and soft_fail don't work for it.
To be clear, when I set the 'timeout' parameter on AwaitMessageSensor, I get a DAG import error:

airflow.exceptions.AirflowException: Invalid arguments were passed to AwaitMessageSensor (task_id: wait_for_kafka_message). Invalid arguments were:
**kwargs: {'timeout': 600}
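
To illustrate the mechanism behind this error, here is a minimal stdlib-only sketch. It does not use Airflow's real classes: every `Toy*` class and the `try_build` helper are hypothetical stand-ins, the default timeout value is a placeholder, and `TypeError` stands in for `AirflowException`. It only assumes what the traceback above suggests, namely that the base operator rejects keyword arguments it does not recognize, while a sensor base class would consume `timeout` and `soft_fail` before passing the rest up.

```python
class ToyBaseOperator:
    """Hypothetical stand-in for BaseOperator: rejects unknown keyword arguments."""

    def __init__(self, task_id, **kwargs):
        if kwargs:
            # Message shaped like the one in the traceback above.
            raise TypeError(
                f"Invalid arguments were passed to {type(self).__name__} "
                f"(task_id: {task_id}). Invalid arguments were:\n**kwargs: {kwargs}"
            )
        self.task_id = task_id


class ToyBaseSensorOperator(ToyBaseOperator):
    """Hypothetical stand-in for BaseSensorOperator: consumes sensor arguments."""

    def __init__(self, *, timeout=604800.0, soft_fail=False, **kwargs):
        # timeout and soft_fail are taken out of kwargs here, so the
        # base operator never sees them. The default is a placeholder.
        super().__init__(**kwargs)
        self.timeout = timeout
        self.soft_fail = soft_fail


class ToyOperatorBasedSensor(ToyBaseOperator):
    """Mirrors the situation in this issue: a sensor built on the plain operator."""

    def __init__(self, topics, **kwargs):
        super().__init__(**kwargs)
        self.topics = topics


class ToySensorBasedSensor(ToyBaseSensorOperator):
    """The same sensor, built on the sensor base class instead."""

    def __init__(self, topics, **kwargs):
        super().__init__(**kwargs)
        self.topics = topics


def try_build(cls):
    """Instantiate cls with timeout=600 and report whether it was accepted."""
    try:
        cls(task_id="wait_for_kafka_message", topics=["example_topic"], timeout=600)
    except TypeError as exc:
        return f"rejected: {exc}"
    return "accepted"


if __name__ == "__main__":
    print(try_build(ToyOperatorBasedSensor))
    print(try_build(ToySensorBasedSensor))
```

In this toy model the operator-based class forwards `timeout` untouched to the base operator, which rejects it, while the sensor-based class accepts it. That matches the behavior I'm seeing with AwaitMessageSensor.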

Is there a proper way to set a timeout for AwaitMessageSensor without editing Airflow configuration?

What you think should happen instead

I think it's strange that AwaitMessageSensor inherits from BaseOperator instead of BaseSensorOperator. Is there a good reason for that?

How to reproduce

On Apache Airflow 2.10+, create a DAG:

from datetime import datetime  # needed for start_date below

from airflow import DAG
from airflow.providers.apache.kafka.sensors.kafka import AwaitMessageSensor


with DAG(
    dag_id="sensor_bug_1",
    schedule_interval=None,
    start_date=datetime(2025, 1, 1),
    catchup=False,
    tags=["kafka", "sensor", "example"],
) as dag:

    wait_for_kafka_message = AwaitMessageSensor(
        task_id="wait_for_kafka_message",
        kafka_config_id="kafka_default",
        topics=["example_topic"],
        apply_function=lambda messages: messages,
        timeout=600,
    )

You'll get a DAG import error in the UI.

Anything else

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
