Apache Kafka 操作符

ConsumeFromTopicOperator

一个从一个或多个 Kafka 主题消费并处理消息的操作符。该操作符创建一个 Kafka 消费者,从集群读取一批消息,并使用用户提供的可调用函数 apply_function 处理它们。消费者将继续批量读取,直到到达日志末尾或达到读取的最大消息数(max_messages)。

有关参数定义,请查看 ConsumeFromTopicOperator

使用操作符

tests/system/apache/kafka/example_dag_hello_kafka.py[源代码]

t2 = ConsumeFromTopicOperator(
    kafka_config_id="t2",
    task_id="consume_from_topic",
    topics=["test_1"],
    apply_function="example_dag_hello_kafka.consumer_function",
    apply_function_kwargs={"prefix": "consumed:::"},
    commit_cadence="end_of_batch",
    max_messages=10,
    max_batch_size=2,
)

参考

有关更多信息,请参阅 Apache Kafka 消费者文档

ProduceToTopicOperator

一个将消息生成到 Kafka 主题的操作符。该操作符将生成由用户提供的 producer_function 创建为键/值对的消息。

有关参数定义,请查看 ProduceToTopicOperator

使用操作符

tests/system/apache/kafka/example_dag_hello_kafka.py[源代码]

t1 = ProduceToTopicOperator(
    kafka_config_id="t1-3",
    task_id="produce_to_topic",
    topic="test_1",
    producer_function="example_dag_hello_kafka.producer_function",
)

参考

有关更多信息,请参阅 Apache Kafka 生产者文档

此条目是否有帮助?