Apache Kafka 传感器¶
AwaitMessageSensor¶
一种传感器,它会一直延迟,直到特定消息发布到 Kafka 主题。该传感器将创建一个消费者,从 Kafka 主题读取消息,直到找到满足 apply_function
参数中定义的条件的消息。如果 apply_function
返回任何数据,则会引发 TriggerEvent
,并且 AwaitMessageSensor
成功完成。
有关参数定义,请查看 AwaitMessageSensor
。
使用传感器¶
t5 = AwaitMessageSensor(
kafka_config_id="t5",
task_id="awaiting_message",
topics=["test_1"],
apply_function="example_dag_hello_kafka.await_function",
xcom_push_key="retrieved_message",
)
参考¶
有关更多信息,请参阅 Apache Kafka 消费者文档。
AwaitMessageTriggerFunctionSensor¶
与上面的 AwaitMessageSensor
类似,此传感器将一直延迟,直到它从 Kafka 主题中消耗一条满足其 apply_function
条件的消息。一旦遇到正事件,AwaitMessageTriggerFunctionSensor
将触发提供给 event_triggered_function
的可调用对象。之后,传感器将再次延迟,继续消耗消息。
有关参数定义,请查看 AwaitMessageTriggerFunctionSensor
。
使用传感器¶
listen_for_message = AwaitMessageTriggerFunctionSensor(
kafka_config_id="fizz_buzz_2",
task_id="listen_for_message",
topics=["fizz_buzz"],
apply_function="example_dag_event_listener.await_function",
event_triggered_function=wait_for_event,
)
参考¶
有关更多信息,请参阅 Apache Kafka 消费者文档。