Google Cloud Dataproc Operators

Dataproc is a managed Apache Spark and Apache Hadoop service that lets you take advantage of open source data tools for batch processing, querying, streaming and machine learning. Dataproc automation helps you create clusters quickly, manage them easily, and save money by turning clusters off when you don't need them.

For more information about the service, visit the Dataproc product documentation.

Prerequisite Tasks

To use these operators, you must do a few things:

- Select or create a Cloud Platform project using the Cloud Console.
- Enable billing for your project, as described in the Google Cloud documentation.
- Enable the API, as described in the Cloud Console documentation.
- Install API libraries via pip: pip install 'apache-airflow[google]'.

Create a Cluster

When you create a Dataproc cluster, you have the option to choose Compute Engine as the deployment platform. In this configuration, Dataproc automatically provisions the required Compute Engine VM instances to run the cluster. The VM instances are used for the master node, primary worker nodes, and secondary worker nodes (if specified). These VM instances are created and managed by Compute Engine, while Dataproc takes care of configuring the software and orchestration required for the big data processing tasks. By providing a node configuration, you describe the setup of the primary and secondary nodes, as well as the status of the cluster of Compute Engine instances. When configuring secondary worker nodes, you can specify the number of workers and their types. By enabling the Preemptibility option to use preemptible VMs (the equivalent of Spot instances) for those nodes, you can take advantage of the cost savings these instances provide for your Dataproc workloads. The master node, which typically hosts the cluster master and various control services, does not have a Preemptibility option, because it is crucial that the master node maintain stability and availability. Once a cluster is created, its configuration settings, including the preemptibility of secondary worker nodes, cannot be modified directly.

For more information about the available fields to pass when creating a cluster, visit the Dataproc create cluster API.

A cluster configuration can look as follows:

tests/system/providers/google/cloud/dataproc/example_dataproc_hive.py[source]


CLUSTER_CONFIG = {
    "master_config": {
        "num_instances": 1,
        "machine_type_uri": "n1-standard-4",
        "disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 32},
    },
    "worker_config": {
        "num_instances": 2,
        "machine_type_uri": "n1-standard-4",
        "disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 32},
    },
    "secondary_worker_config": {
        "num_instances": 1,
        "machine_type_uri": "n1-standard-4",
        "disk_config": {
            "boot_disk_type": "pd-standard",
            "boot_disk_size_gb": 32,
        },
        "is_preemptible": True,
        "preemptibility": "PREEMPTIBLE",
    },
}

With this configuration we can create the cluster: DataprocCreateClusterOperator

tests/system/providers/google/cloud/dataproc/example_dataproc_hive.py[source]

create_cluster = DataprocCreateClusterOperator(
    task_id="create_cluster",
    project_id=PROJECT_ID,
    cluster_config=CLUSTER_CONFIG,
    region=REGION,
    cluster_name=CLUSTER_NAME,
)

Dataproc on GKE deploys Dataproc virtual clusters on a GKE cluster. Unlike Dataproc on Compute Engine clusters, Dataproc on GKE virtual clusters do not include separate master and worker VMs. Instead, when you create a Dataproc on GKE virtual cluster, Dataproc on GKE creates node pools within the GKE cluster, and Dataproc on GKE jobs run as pods on those node pools. The node pools, and the scheduling of pods on them, are managed by GKE.

When creating a GKE Dataproc cluster, you can specify whether preemptible VMs should be used for the underlying compute resources. GKE supports preemptible VMs as a cost-saving measure; with the option enabled, GKE provisions the cluster's nodes on preemptible VMs. Alternatively, nodes can be created as Spot VM instances, the newer replacement for legacy preemptible VMs. Either way, this can be useful for running Dataproc workloads on GKE while optimizing for cost.

To create a Dataproc cluster in Google Kubernetes Engine, you can pass a virtual cluster configuration:

tests/system/providers/google/cloud/dataproc/example_dataproc_gke.py[source]


VIRTUAL_CLUSTER_CONFIG = {
    "kubernetes_cluster_config": {
        "gke_cluster_config": {
            "gke_cluster_target": f"projects/{PROJECT_ID}/locations/{REGION}/clusters/{GKE_CLUSTER_NAME}",
            "node_pool_target": [
                {
                    "node_pool": f"projects/{PROJECT_ID}/locations/{REGION}/clusters/{GKE_CLUSTER_NAME}/nodePools/dp",
                    "roles": ["DEFAULT"],
                    "node_pool_config": {
                        "config": {
                            "preemptible": True,
                        }
                    },
                }
            ],
        },
        "kubernetes_software_config": {"component_version": {"SPARK": b"3"}},
    },
    "staging_bucket": "test-staging-bucket",
}
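
Note: the config above opts in to legacy preemptible VMs via "preemptible": True. For the Spot VM alternative mentioned above, the Dataproc GkeNodeConfig message also exposes a boolean spot field; a hypothetical variant of the node config (verify against your API version) would be:

# Hypothetical variant of the node config above: Spot VMs instead of
# legacy preemptible VMs (GkeNodeConfig.spot in the Dataproc API).
"node_pool_config": {
    "config": {
        "spot": True,
    }
},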

With this configuration we can create the cluster: DataprocCreateClusterOperator

tests/system/providers/google/cloud/dataproc/example_dataproc_gke.py[source]

create_cluster_in_gke = DataprocCreateClusterOperator(
    task_id="create_cluster_in_gke",
    project_id=PROJECT_ID,
    region=REGION,
    cluster_name=CLUSTER_NAME,
    virtual_cluster_config=VIRTUAL_CLUSTER_CONFIG,
)

You can also create a Dataproc cluster with the optional component Presto. To do so, please use the following configuration. Note that the default image might not support the chosen optional component. If that is the case, please specify the correct image_version, which you can find in the documentation.

tests/system/providers/google/cloud/dataproc/example_dataproc_presto.py[source]

CLUSTER_CONFIG = {
    "master_config": {
        "num_instances": 1,
        "machine_type_uri": "n1-standard-4",
        "disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 32},
    },
    "worker_config": {
        "num_instances": 2,
        "machine_type_uri": "n1-standard-4",
        "disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 32},
    },
    "software_config": {
        "optional_components": [
            "PRESTO",
        ],
        "image_version": "2.0",
    },
}

You can also create a Dataproc cluster with the optional component Trino. To do so, please use the following configuration. Note that the default image might not support the chosen optional component. If that is the case, please specify the correct image_version, which you can find in the documentation.

tests/system/providers/google/cloud/dataproc/example_dataproc_trino.py[source]


CLUSTER_CONFIG = {
    "master_config": {
        "num_instances": 1,
        "machine_type_uri": "n1-standard-4",
        "disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 32},
    },
    "worker_config": {
        "num_instances": 2,
        "machine_type_uri": "n1-standard-4",
        "disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 32},
    },
    "software_config": {
        "optional_components": [
            "TRINO",
        ],
        "image_version": "2.1",
    },
}

You can use deferrable mode for this action in order to run the operator asynchronously:

tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_deferrable.py[source]

create_cluster = DataprocCreateClusterOperator(
    task_id="create_cluster",
    project_id=PROJECT_ID,
    cluster_config=CLUSTER_CONFIG,
    region=REGION,
    cluster_name=CLUSTER_NAME,
    deferrable=True,
)

Generating a Cluster Configuration

You can also generate CLUSTER_CONFIG using the functional API. This can easily be done with the make() method of ClusterGenerator. The configuration can be generated and used as follows:

tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_generator.py[source]


INIT_FILE = "pip-install.sh"

CLUSTER_GENERATOR_CONFIG = ClusterGenerator(
    project_id=PROJECT_ID,
    zone=ZONE,
    master_machine_type="n1-standard-4",
    master_disk_size=32,
    worker_machine_type="n1-standard-4",
    worker_disk_size=32,
    num_workers=2,
    storage_bucket=BUCKET_NAME,
    init_actions_uris=[f"gs://{BUCKET_NAME}/{INIT_FILE}"],
    metadata={"PIP_PACKAGES": "pyyaml requests pandas openpyxl"},
    num_preemptible_workers=1,
    preemptibility="PREEMPTIBLE",
).make()
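
The generated dict can be passed straight to the operator's cluster_config argument. A minimal sketch, reusing the names from the generator example above (the task_id is illustrative):

create_cluster_generated = DataprocCreateClusterOperator(
    task_id="create_cluster_generated",
    project_id=PROJECT_ID,
    region=REGION,
    cluster_name=CLUSTER_NAME,
    # CLUSTER_GENERATOR_CONFIG already holds the dict produced by make()
    cluster_config=CLUSTER_GENERATOR_CONFIG,
)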

Diagnose a Cluster

Dataproc supports collecting cluster diagnostic information, such as system, Spark, Hadoop and Dataproc logs, and cluster configuration files, which can be used to troubleshoot a Dataproc cluster or job. It is important to note that this information can only be collected before the cluster is deleted. For more information about the available fields to pass when diagnosing a cluster, visit the Dataproc diagnose cluster API.

To diagnose a Dataproc cluster, use: DataprocDiagnoseClusterOperator

tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_diagnose.py[source]

diagnose_cluster = DataprocDiagnoseClusterOperator(
    task_id="diagnose_cluster",
    region=REGION,
    project_id=PROJECT_ID,
    cluster_name=CLUSTER_NAME,
)

You can also use deferrable mode in order to run the operator asynchronously:

tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_diagnose.py[source]

diagnose_cluster_deferrable = DataprocDiagnoseClusterOperator(
    task_id="diagnose_cluster_deferrable",
    region=REGION,
    project_id=PROJECT_ID,
    cluster_name=CLUSTER_NAME,
    deferrable=True,
)

Update a Cluster

You can scale the cluster up or down by providing a cluster config and an updateMask. In the updateMask argument you specify the path, relative to Cluster, of the field to update. For more information on updateMask and the other parameters, take a look at the Dataproc update cluster API.

An example of a new cluster config and the updateMask:

tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_update.py[source]

CLUSTER_UPDATE = {
    "config": {"worker_config": {"num_instances": 3}, "secondary_worker_config": {"num_instances": 3}}
}
UPDATE_MASK = {
    "paths": ["config.worker_config.num_instances", "config.secondary_worker_config.num_instances"]
}
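
The TIMEOUT passed below as graceful_decommission_timeout is not shown in the excerpt; it is a duration, for example one day (a sketch, assuming the dict form of the parameter):

# Graceful decommission timeout: one day, expressed in seconds.
TIMEOUT = {"seconds": 1 * 24 * 60 * 60}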

To update a cluster you can use: DataprocUpdateClusterOperator

tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_update.py[source]

scale_cluster = DataprocUpdateClusterOperator(
    task_id="scale_cluster",
    cluster_name=CLUSTER_NAME,
    cluster=CLUSTER_UPDATE,
    update_mask=UPDATE_MASK,
    graceful_decommission_timeout=TIMEOUT,
    project_id=PROJECT_ID,
    region=REGION,
)

You can use deferrable mode for this action in order to run the operator asynchronously:

tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_deferrable.py[source]

update_cluster = DataprocUpdateClusterOperator(
    task_id="update_cluster",
    cluster_name=CLUSTER_NAME,
    cluster=CLUSTER_UPDATE,
    update_mask=UPDATE_MASK,
    graceful_decommission_timeout=TIMEOUT,
    project_id=PROJECT_ID,
    region=REGION,
    deferrable=True,
)

Start a Cluster

To start a cluster you can use the DataprocStartClusterOperator:

tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_start_stop.py[source]

start_cluster = DataprocStartClusterOperator(
    task_id="start_cluster",
    project_id=PROJECT_ID,
    region=REGION,
    cluster_name=CLUSTER_NAME,
)

Stop a Cluster

To stop a cluster you can use the DataprocStopClusterOperator:

tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_start_stop.py[source]

stop_cluster = DataprocStopClusterOperator(
    task_id="stop_cluster",
    project_id=PROJECT_ID,
    region=REGION,
    cluster_name=CLUSTER_NAME,
)

Delete a Cluster

To delete a cluster you can use: DataprocDeleteClusterOperator

tests/system/providers/google/cloud/dataproc/example_dataproc_hive.py[source]

delete_cluster = DataprocDeleteClusterOperator(
    task_id="delete_cluster",
    project_id=PROJECT_ID,
    cluster_name=CLUSTER_NAME,
    region=REGION,
)

You can use deferrable mode for this action in order to run the operator asynchronously:

tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_deferrable.py[source]

delete_cluster = DataprocDeleteClusterOperator(
    task_id="delete_cluster",
    project_id=PROJECT_ID,
    cluster_name=CLUSTER_NAME,
    region=REGION,
    trigger_rule=TriggerRule.ALL_DONE,
    deferrable=True,
)

Submit a Job to a Cluster

Dataproc supports submitting jobs of different big data components. The current list includes Spark, Hadoop, Pig and Hive. For more information on versions and images, take a look at the Cloud Dataproc Image version list.

To submit a job to the cluster you need to provide a job source file. The job source file can be on GCS, on the cluster, or on your local file system. You can specify a file:/// path to refer to a local file on the cluster's master node.

The job configuration can be submitted by using: DataprocSubmitJobOperator

tests/system/providers/google/cloud/dataproc/example_dataproc_pyspark.py[source]

pyspark_task = DataprocSubmitJobOperator(
    task_id="pyspark_task", job=PYSPARK_JOB, region=REGION, project_id=PROJECT_ID
)

Examples of job configurations to submit

We have provided an example for every framework below. There are more arguments to provide in the jobs than shown in the examples. For the complete list of arguments take a look at DataProc Job arguments.

Example of the configuration for a PySpark Job:

tests/system/providers/google/cloud/dataproc/example_dataproc_pyspark.py[source]

PYSPARK_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "pyspark_job": {"main_python_file_uri": JOB_FILE_URI},
}

Example of the configuration for a SparkSQL Job:

tests/system/providers/google/cloud/dataproc/example_dataproc_spark_sql.py[source]

SPARK_SQL_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "spark_sql_job": {"query_list": {"queries": ["SHOW DATABASES;"]}},
}

Example of the configuration for a Spark Job:

tests/system/providers/google/cloud/dataproc/example_dataproc_spark.py[source]

SPARK_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "spark_job": {
        "jar_file_uris": ["file:///usr/lib/spark/examples/jars/spark-examples.jar"],
        "main_class": "org.apache.spark.examples.SparkPi",
    },
}

Example of the configuration for a Spark Job running in deferrable mode:

tests/system/providers/google/cloud/dataproc/example_dataproc_spark_deferrable.py[source]

SPARK_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "spark_job": {
        "jar_file_uris": ["file:///usr/lib/spark/examples/jars/spark-examples.jar"],
        "main_class": "org.apache.spark.examples.SparkPi",
    },
}

Example of the configuration for a Hive Job:

tests/system/providers/google/cloud/dataproc/example_dataproc_hive.py[source]

HIVE_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "hive_job": {"query_list": {"queries": ["SHOW DATABASES;"]}},
}

Example of the configuration for a Hadoop Job:

tests/system/providers/google/cloud/dataproc/example_dataproc_hadoop.py[source]

HADOOP_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "hadoop_job": {
        "main_jar_file_uri": "file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar",
        "args": ["wordcount", "gs://pub/shakespeare/rose.txt", OUTPUT_PATH],
    },
}

Example of the configuration for a Pig Job:

tests/system/providers/google/cloud/dataproc/example_dataproc_pig.py[source]

PIG_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "pig_job": {"query_list": {"queries": ["define sin HiveUDF('sin');"]}},
}

Example of the configuration for a SparkR Job:

tests/system/providers/google/cloud/dataproc/example_dataproc_sparkr.py[source]

SPARKR_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "spark_r_job": {"main_r_file_uri": JOB_FILE_URI},
}

Example of the configuration for a Presto Job:

tests/system/providers/google/cloud/dataproc/example_dataproc_presto.py[source]

PRESTO_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "presto_job": {"query_list": {"queries": ["SHOW CATALOGS"]}},
}

Example of the configuration for a Trino Job:

tests/system/providers/google/cloud/dataproc/example_dataproc_trino.py[source]

TRINO_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "trino_job": {"query_list": {"queries": ["SHOW CATALOGS"]}},
}

Working with workflow templates

Dataproc supports creating workflow templates that can be triggered later on.
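
The WORKFLOW_TEMPLATE referenced in the examples below is not shown in the excerpts. A minimal sketch, reusing CLUSTER_CONFIG from the earlier examples and the Pig query from the Pig job example, with WORKFLOW_NAME being a template id of your choosing (see the Dataproc WorkflowTemplate API for the full schema):

WORKFLOW_TEMPLATE = {
    "id": WORKFLOW_NAME,
    "placement": {
        "managed_cluster": {
            # An ephemeral cluster created for the workflow run
            "cluster_name": CLUSTER_NAME,
            "config": CLUSTER_CONFIG,
        }
    },
    "jobs": [
        {
            "step_id": "pig_job_1",
            "pig_job": {"query_list": {"queries": ["define sin HiveUDF('sin');"]}},
        }
    ],
}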

A workflow template can be created using: DataprocCreateWorkflowTemplateOperator

tests/system/providers/google/cloud/dataproc/example_dataproc_workflow.py[source]

create_workflow_template = DataprocCreateWorkflowTemplateOperator(
    task_id="create_workflow_template",
    template=WORKFLOW_TEMPLATE,
    project_id=PROJECT_ID,
    region=REGION,
)

Once a workflow is created, a user can trigger it using DataprocInstantiateWorkflowTemplateOperator:

tests/system/providers/google/cloud/dataproc/example_dataproc_workflow.py[source]

trigger_workflow = DataprocInstantiateWorkflowTemplateOperator(
    task_id="trigger_workflow", region=REGION, project_id=PROJECT_ID, template_id=WORKFLOW_NAME
)

Also, for all of these actions, you can use the operator in deferrable mode:

tests/system/providers/google/cloud/dataproc/example_dataproc_workflow_deferrable.py[source]

trigger_workflow_async = DataprocInstantiateWorkflowTemplateOperator(
    task_id="trigger_workflow_async",
    region=REGION,
    project_id=PROJECT_ID,
    template_id=WORKFLOW_NAME,
    deferrable=True,
)

The inline operator is an alternative. It creates a workflow, runs it, and then deletes it afterwards: DataprocInstantiateInlineWorkflowTemplateOperator

tests/system/providers/google/cloud/dataproc/example_dataproc_workflow.py[source]

instantiate_inline_workflow_template = DataprocInstantiateInlineWorkflowTemplateOperator(
    task_id="instantiate_inline_workflow_template", template=WORKFLOW_TEMPLATE, region=REGION
)

Also, for all of these actions, you can use the operator in deferrable mode:

tests/system/providers/google/cloud/dataproc/example_dataproc_workflow_deferrable.py[source]

instantiate_inline_workflow_template_async = DataprocInstantiateInlineWorkflowTemplateOperator(
    task_id="instantiate_inline_workflow_template_async",
    template=WORKFLOW_TEMPLATE,
    region=REGION,
    deferrable=True,
)

Create a Batch

Dataproc supports creating batch workloads.
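
The BATCH_CONFIG used below is not shown in the excerpts. A minimal sketch for a Spark batch, mirroring the Spark job example above (see the Dataproc Batch API for the full schema; the BATCH_ID values are user-chosen ids):

BATCH_CONFIG = {
    "spark_batch": {
        "jar_file_uris": ["file:///usr/lib/spark/examples/jars/spark-examples.jar"],
        "main_class": "org.apache.spark.examples.SparkPi",
    },
}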

A batch can be created using: DataprocCreateBatchOperator

tests/system/providers/google/cloud/dataproc/example_dataproc_batch.py[source]

create_batch = DataprocCreateBatchOperator(
    task_id="create_batch",
    project_id=PROJECT_ID,
    region=REGION,
    batch=BATCH_CONFIG,
    batch_id=BATCH_ID,
)

create_batch_2 = DataprocCreateBatchOperator(
    task_id="create_batch_2",
    project_id=PROJECT_ID,
    region=REGION,
    batch=BATCH_CONFIG,
    batch_id=BATCH_ID_2,
    result_retry=AsyncRetry(maximum=10.0, initial=10.0, multiplier=1.0),
)

create_batch_3 = DataprocCreateBatchOperator(
    task_id="create_batch_3",
    project_id=PROJECT_ID,
    region=REGION,
    batch=BATCH_CONFIG,
    batch_id=BATCH_ID_3,
    asynchronous=True,
)
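
Note: result_retry in create_batch_2 above takes a google.api_core retry object; the AsyncRetry class used there comes from google.api_core.retry_async (from google.api_core.retry_async import AsyncRetry), an import the excerpt omits.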

To create a batch with a Persistent History Server, you should first create a Dataproc cluster with specific parameters. Documentation on how to create a cluster can be found here.
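
The CLUSTER_GENERATOR_CONFIG_FOR_PHS used below is not shown in the excerpts. A sketch of such a config, assuming a ClusterGenerator call like the one shown earlier, the Spark history log property, and a BUCKET_NAME of yours (verify enable_component_gateway against your provider version):

CLUSTER_GENERATOR_CONFIG_FOR_PHS = ClusterGenerator(
    project_id=PROJECT_ID,
    zone=ZONE,
    master_machine_type="n1-standard-4",
    worker_machine_type="n1-standard-4",
    num_workers=0,  # a single-node cluster is enough for a history server
    properties={
        # Where the history server reads Spark event logs from
        "spark:spark.history.fs.logDirectory": f"gs://{BUCKET_NAME}/logging",
    },
    enable_component_gateway=True,
).make()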

tests/system/providers/google/cloud/dataproc/example_dataproc_batch_persistent.py[source]

create_cluster = DataprocCreateClusterOperator(
    task_id="create_cluster_for_phs",
    project_id=PROJECT_ID,
    cluster_config=CLUSTER_GENERATOR_CONFIG_FOR_PHS,
    region=REGION,
    cluster_name=CLUSTER_NAME,
)

Once the cluster is created, it should be added to the batch configuration:
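
The linkage happens through the batch's environment config, which points the Spark history server at the cluster created above. A sketch of BATCH_CONFIG_WITH_PHS per the Dataproc Batch API:

BATCH_CONFIG_WITH_PHS = {
    "spark_batch": {
        "jar_file_uris": ["file:///usr/lib/spark/examples/jars/spark-examples.jar"],
        "main_class": "org.apache.spark.examples.SparkPi",
    },
    "environment_config": {
        "peripherals_config": {
            "spark_history_server_config": {
                # The PHS cluster created in the previous step
                "dataproc_cluster": f"projects/{PROJECT_ID}/regions/{REGION}/clusters/{CLUSTER_NAME}",
            },
        },
    },
}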

tests/system/providers/google/cloud/dataproc/example_dataproc_batch_persistent.py[source]

create_batch = DataprocCreateBatchOperator(
    task_id="create_batch_with_phs",
    project_id=PROJECT_ID,
    region=REGION,
    batch=BATCH_CONFIG_WITH_PHS,
    batch_id=BATCH_ID,
)

To check if an operation succeeded, you can use DataprocBatchSensor:

tests/system/providers/google/cloud/dataproc/example_dataproc_batch.py[source]

batch_async_sensor = DataprocBatchSensor(
    task_id="batch_async_sensor",
    region=REGION,
    project_id=PROJECT_ID,
    batch_id=BATCH_ID_3,
    poke_interval=10,
)

Also, for all of these actions, you can use the operator in deferrable mode:

tests/system/providers/google/cloud/dataproc/example_dataproc_batch_deferrable.py[source]

create_batch = DataprocCreateBatchOperator(
    task_id="create_batch",
    project_id=PROJECT_ID,
    region=REGION,
    batch=BATCH_CONFIG,
    batch_id=BATCH_ID,
    deferrable=True,
)

Get a Batch

To get a batch you can use: DataprocGetBatchOperator

tests/system/providers/google/cloud/dataproc/example_dataproc_batch.py[source]

get_batch = DataprocGetBatchOperator(
    task_id="get_batch", project_id=PROJECT_ID, region=REGION, batch_id=BATCH_ID
)

get_batch_2 = DataprocGetBatchOperator(
    task_id="get_batch_2", project_id=PROJECT_ID, region=REGION, batch_id=BATCH_ID_2
)

List Batches

To get a list of existing batches you can use: DataprocListBatchesOperator

tests/system/providers/google/cloud/dataproc/example_dataproc_batch.py[source]

list_batches = DataprocListBatchesOperator(
    task_id="list_batches",
    project_id=PROJECT_ID,
    region=REGION,
)

Delete a Batch

To delete a batch you can use: DataprocDeleteBatchOperator

tests/system/providers/google/cloud/dataproc/example_dataproc_batch.py[source]

delete_batch = DataprocDeleteBatchOperator(
    task_id="delete_batch", project_id=PROJECT_ID, region=REGION, batch_id=BATCH_ID
)
delete_batch_2 = DataprocDeleteBatchOperator(
    task_id="delete_batch_2", project_id=PROJECT_ID, region=REGION, batch_id=BATCH_ID_2
)
delete_batch_3 = DataprocDeleteBatchOperator(
    task_id="delete_batch_3", project_id=PROJECT_ID, region=REGION, batch_id=BATCH_ID_3
)
delete_batch_4 = DataprocDeleteBatchOperator(
    task_id="delete_batch_4", project_id=PROJECT_ID, region=REGION, batch_id=BATCH_ID_4
)

Cancel a Batch Operation

To cancel an operation you can use: DataprocCancelOperationOperator. The operation name in the example below is returned by a batch-creation task run with asynchronous=True (such as create_batch_3 above) and is pulled in via XCom.

tests/system/providers/google/cloud/dataproc/example_dataproc_batch.py[source]

cancel_operation = DataprocCancelOperationOperator(
    task_id="cancel_operation",
    project_id=PROJECT_ID,
    region=REGION,
    operation_name="{{ task_instance.xcom_pull('create_batch_4') }}",
)

References

For further information, take a look at the Google Cloud API documentation and the Dataproc product documentation.
