阿里云 AnalyticDB Spark 操作符

概述

Airflow 与阿里云 AnalyticDB Spark 的集成提供了多个操作符来开发 Spark 批处理和 SQL 应用程序。

开发 Spark 批处理应用程序

目的

此示例 DAG 使用 AnalyticDBSparkBatchOperator 来提交 Spark Pi 和 Spark 逻辑回归应用程序。

定义任务

在以下代码中,我们提交 Spark Pi 和 Spark 逻辑回归应用程序。

tests/system/alibaba/example_adb_spark_batch.py[源代码]

with DAG(
    dag_id=DAG_ID,
    start_date=datetime(2021, 1, 1),
    schedule=None,
    default_args={"cluster_id": "your cluster", "rg_name": "your resource group", "region": "your region"},
    max_active_runs=1,
    catchup=False,
) as dag:
    spark_pi = AnalyticDBSparkBatchOperator(
        task_id="task1",
        file="local:///tmp/spark-examples.jar",
        class_name="org.apache.spark.examples.SparkPi",
    )

    spark_lr = AnalyticDBSparkBatchOperator(
        task_id="task2",
        file="local:///tmp/spark-examples.jar",
        class_name="org.apache.spark.examples.SparkLR",
    )

    spark_pi >> spark_lr

    from tests_common.test_utils.watcher import watcher

    # This test needs watcher in order to properly mark success/failure
    # when "tearDown" task with trigger rule is part of the DAG
    list(dag.tasks) >> watcher()

此条目是否有帮助?