airflow.providers.google.cloud.hooks.dataflow

此模块包含 Google Dataflow Hook。

模块内容

DataflowJobStatus

带有 Dataflow 作业状态的助手类。

DataflowJobType

带有 Dataflow 作业类型的助手类。

DataflowHook

Google Dataflow 的 Hook。

AsyncDataflowHook

用于 Dataflow 服务的异步 Hook 类。

函数

process_line_and_extract_dataflow_job_id_callback(...)

构建触发指定函数的回调。

属性

DEFAULT_DATAFLOW_LOCATION

JOB_ID_PATTERN

T

airflow.providers.google.cloud.hooks.dataflow.DEFAULT_DATAFLOW_LOCATION = 'us-central1'[源代码]
airflow.providers.google.cloud.hooks.dataflow.JOB_ID_PATTERN[源代码]
airflow.providers.google.cloud.hooks.dataflow.T[源代码]
airflow.providers.google.cloud.hooks.dataflow.process_line_and_extract_dataflow_job_id_callback(on_new_job_id_callback)[源代码]

构建触发指定函数的回调。

返回的回调旨在用作 BeamCommandRunner 中的 process_line_callback

参数

on_new_job_id_callback (Callable[[str], None] | None) – 当已知作业 ID 时调用的回调

class airflow.providers.google.cloud.hooks.dataflow.DataflowJobStatus[源代码]

带有 Dataflow 作业状态的助手类。

参考:https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs#Job.JobState

JOB_STATE_DONE = 'JOB_STATE_DONE'[源代码]
JOB_STATE_UNKNOWN = 'JOB_STATE_UNKNOWN'[源代码]
JOB_STATE_STOPPED = 'JOB_STATE_STOPPED'[源代码]
JOB_STATE_RUNNING = 'JOB_STATE_RUNNING'[源代码]
JOB_STATE_FAILED = 'JOB_STATE_FAILED'[源代码]
JOB_STATE_CANCELLED = 'JOB_STATE_CANCELLED'[源代码]
JOB_STATE_UPDATED = 'JOB_STATE_UPDATED'[源代码]
JOB_STATE_DRAINING = 'JOB_STATE_DRAINING'[源代码]
JOB_STATE_DRAINED = 'JOB_STATE_DRAINED'[源代码]
JOB_STATE_PENDING = 'JOB_STATE_PENDING'[源代码]
JOB_STATE_CANCELLING = 'JOB_STATE_CANCELLING'[源代码]
JOB_STATE_QUEUED = 'JOB_STATE_QUEUED'[源代码]
FAILED_END_STATES[源代码]
SUCCEEDED_END_STATES[源代码]
TERMINAL_STATES[源代码]
AWAITING_STATES[源代码]
class airflow.providers.google.cloud.hooks.dataflow.DataflowJobType[源代码]

带有 Dataflow 作业类型的助手类。

JOB_TYPE_UNKNOWN = 'JOB_TYPE_UNKNOWN'[源代码]
JOB_TYPE_BATCH = 'JOB_TYPE_BATCH'[源代码]
JOB_TYPE_STREAMING = 'JOB_TYPE_STREAMING'[源代码]
class airflow.providers.google.cloud.hooks.dataflow.DataflowHook(gcp_conn_id='google_cloud_default', poll_sleep=10, impersonation_chain=None, drain_pipeline=False, cancel_timeout=5 * 60, wait_until_finished=None, expected_terminal_state=None, **kwargs)[源代码]

基类: airflow.providers.google.common.hooks.base_google.GoogleBaseHook

Google Dataflow 的 Hook。

Hook 中所有使用 project_id 的方法都必须使用关键字参数而不是位置参数调用。

get_conn()[源代码]

返回 Google Cloud Dataflow 服务对象。

get_pipelines_conn()[源代码]

返回 Google Cloud Data Pipelines 服务对象。

start_java_dataflow(job_name, variables, jar, project_id, job_class=None, append_job_name=True, multiple_jobs=False, on_new_job_id_callback=None, location=DEFAULT_DATAFLOW_LOCATION)[源代码]

启动 Dataflow java 作业。

参数
  • job_name (str) – 作业的名称。

  • variables (dict) – 传递给作业的变量。

  • project_id (str) – 可选,用于启动作业的 Google Cloud 项目 ID。如果设置为 None 或缺少,则使用 Google Cloud 连接中的默认 project_id。

  • jar (str) – 作业的 jar 文件名

  • job_class (str | None) – 作业的 java 类名。

  • append_job_name (bool) – 如果必须将唯一的后缀附加到作业名称,则为 True。

  • multiple_jobs (bool) – 如果要检查 Dataflow 中是否存在多个作业,则为 True。

  • on_new_job_id_callback (Callable[[str], None] | None) – 当知道作业 ID 时调用的回调。

  • location (str) – 作业位置。

start_template_dataflow(job_name, variables, parameters, dataflow_template, project_id, append_job_name=True, on_new_job_id_callback=None, on_new_job_callback=None, location=DEFAULT_DATAFLOW_LOCATION, environment=None)[源代码]

使用经典模板启动 Dataflow 作业并等待其完成。

参数
  • job_name (str) – 作业的名称。

  • variables (dict) –

    作业运行时环境选项的映射。如果传递了 environment 参数,它将更新 environment 参数。

    参见

    有关可能配置的更多信息,请查看 API 文档 https://cloud.google.com/dataflow/pipelines/specifying-exec-params

  • parameters (dict) – 模板的参数

  • dataflow_template (str) – 模板的 GCS 路径。

  • project_id (str) – 可选,用于启动作业的 Google Cloud 项目 ID。如果设置为 None 或缺少,则使用 Google Cloud 连接中的默认 project_id。

  • append_job_name (bool) – 如果必须将唯一的后缀附加到作业名称,则为 True。

  • on_new_job_id_callback (Callable[[str], None] | None) – (已弃用) 当知道作业时调用的回调。

  • on_new_job_callback (Callable[[dict], None] | None) – 当知道作业时调用的回调。

  • location (str) –

    作业位置。

    参见

    有关可能配置的更多信息,请查看 API 文档 https://cloud.google.com/dataflow/pipelines/specifying-exec-params

launch_job_with_template(*, job_name, variables, parameters, dataflow_template, project_id, append_job_name=True, location=DEFAULT_DATAFLOW_LOCATION, environment=None)[源代码]

使用经典模板启动 Dataflow 作业,并在不等待其完成的情况下退出。

参数
  • job_name (str) – 作业的名称。

  • variables (dict) –

    作业运行时环境选项的映射。如果传递了 environment 参数,它将更新 environment 参数。

    参见

    有关可能配置的更多信息,请查看 API 文档 https://cloud.google.com/dataflow/pipelines/specifying-exec-params

  • parameters (dict) – 模板的参数

  • dataflow_template (str) – 模板的 GCS 路径。

  • project_id (str) – 可选,用于启动作业的 Google Cloud 项目 ID。如果设置为 None 或缺少,则使用 Google Cloud 连接中的默认 project_id。

  • append_job_name (bool) – 如果必须将唯一的后缀附加到作业名称,则为 True。

  • location (str) –

    作业位置。

    参见

    有关可能配置的更多信息,请查看 API 文档 https://cloud.google.com/dataflow/pipelines/specifying-exec-params

返回

Dataflow 作业响应

返回类型

dict[str, str]

send_launch_template_request(*, project_id, location, gcs_path, job_name, parameters, environment)[源代码]
start_flex_template(body, location, project_id, on_new_job_id_callback=None, on_new_job_callback=None)[source]

使用 Flex 模板启动 Dataflow 作业并等待其完成。

参数
  • body (dict) – 请求正文。 请参阅:https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.locations.flexTemplates/launch#request-body

  • location (str) – Dataflow 作业的位置(例如 europe-west1)

  • project_id (str) – 拥有该作业的 GCP 项目的 ID。 如果设置为 None 或缺失,则使用 GCP 连接中的默认 project_id。

  • on_new_job_id_callback (Callable[[str], None] | None) – (已弃用) 在检测到作业 ID 时调用的回调。

  • on_new_job_callback (Callable[[dict], None] | None) – 在检测到作业时调用的回调。

返回

作业

返回类型

dict[str, str]

launch_job_with_flex_template(body, location, project_id)[source]

使用 Flex 模板启动 Dataflow 作业,并在不等待作业完成的情况下退出。

参数
返回

Dataflow 作业响应

返回类型

dict[str, str]

launch_beam_yaml_job(*, job_name, yaml_pipeline_file, append_job_name, jinja_variables, options, project_id, location=DEFAULT_DATAFLOW_LOCATION)[source]

启动 Dataflow YAML 作业并运行至完成。

参数
  • job_name (str) – 要分配给 Cloud Dataflow 作业的唯一名称。

  • yaml_pipeline_file (str) – 定义要运行的 YAML 管道的文件的路径。 必须是本地文件或以“gs://”开头的 URL。

  • append_job_name (bool) – 如果必须将唯一后缀附加到 job_name,则设置为 True。

  • jinja_variables (dict[str, str] | None) – 用于具体化 yaml 管道文件的 Jinja2 变量的字典。

  • options (dict[str, Any] | None) – 其他 gcloud 或 Beam 作业参数。它必须是一个字典,其中键与 gcloud 中的可选标志名称匹配。 支持的标志列表可以在以下位置找到:https://cloud.google.com/sdk/gcloud/reference/dataflow/yaml/run。 请注意,如果某个标志不需要值,则其字典值必须为 True 或 None。 例如,--log-http 标志可以作为 {‘log-http’: True} 传递。

  • project_id (str) – 拥有该作业的 GCP 项目的 ID。

  • location (str) – 作业的区域端点的区域 ID。 默认为“us-central1”。

  • on_new_job_callback – 将作业传递给操作员的回调函数,一旦已知。

返回

作业 ID。

返回类型

字符串

static extract_job_id(job)[source]
start_python_dataflow(job_name, variables, dataflow, py_options, project_id, py_interpreter='python3', py_requirements=None, py_system_site_packages=False, append_job_name=True, on_new_job_id_callback=None, location=DEFAULT_DATAFLOW_LOCATION)[source]

启动 Dataflow 作业。

参数
  • job_name (str) – 作业的名称。

  • variables (dict) – 传递给作业的变量。

  • dataflow (str) – Dataflow 进程的名称。

  • py_options (list[str]) – 其他选项。

  • project_id (str) – 拥有该作业的 GCP 项目的 ID。 如果设置为 None 或缺失,则使用 GCP 连接中的默认 project_id。

  • py_interpreter (str) – beam 管道的 Python 版本。 如果为 None,则默认为 python3。 要跟踪 beam 支持的 Python 版本和相关问题,请检查:https://issues.apache.org/jira/browse/BEAM-1251

  • py_requirements (list[str] | None) –

    要安装的其他 Python 包。 如果将值传递给此参数,则会创建一个新的虚拟环境,其中安装了其他包。

    如果您的系统上未安装 apache-beam 包,或者您想使用其他版本,您也可以安装该包。

  • py_system_site_packages (bool) –

    是否在您的虚拟环境中包含 system_site_packages。 有关更多信息,请参阅 virtualenv 文档。

    仅当 py_requirements 参数不为 None 时,此选项才相关。

  • append_job_name (bool) – 如果必须将唯一的后缀附加到作业名称,则为 True。

  • project_id – 可选,要在其中启动作业的 Google Cloud 项目 ID。 如果设置为 None 或缺失,则使用 Google Cloud 连接中的默认 project_id。

  • on_new_job_id_callback (Callable[[str], None] | None) – 当知道作业 ID 时调用的回调。

  • location (str) – 作业位置。

static build_dataflow_job_name(job_name, append_job_name=True)[source]

构建 Dataflow 作业名称。

is_job_dataflow_running(name, project_id, location=None, variables=None)[源]

检查 Dataflow 中作业是否仍在运行。

参数
  • name (str) – 作业的名称。

  • project_id (str) – 可选,用于启动作业的 Google Cloud 项目 ID。如果设置为 None 或缺少,则使用 Google Cloud 连接中的默认 project_id。

  • location (str | None) – 作业位置。

返回

如果作业正在运行,则为 True。

返回类型

bool

cancel_job(project_id, job_name=None, job_id=None, location=DEFAULT_DATAFLOW_LOCATION)[源]

取消具有指定名称前缀或作业 ID 的作业。

参数 namejob_id 是互斥的。

参数
  • job_name (str | None) – 指定要取消的作业的名称前缀。

  • job_id (str | None) – 指定要取消的作业的作业 ID。

  • location (str) – 作业位置。

  • project_id (str) – 可选,用于启动作业的 Google Cloud 项目 ID。如果设置为 None 或缺少,则使用 Google Cloud 连接中的默认 project_id。

start_sql_job(job_name, query, options, project_id, location=DEFAULT_DATAFLOW_LOCATION, on_new_job_id_callback=None, on_new_job_callback=None)[源]

启动 Dataflow SQL 查询。

参数
  • job_name (str) – 要分配给 Cloud Dataflow 作业的唯一名称。

  • query (str) – 要执行的 SQL 查询。

  • options (dict[str, Any]) – 要执行的作业参数。有关更多信息,请查看:https://cloud.google.com/sdk/gcloud/reference/beta/dataflow/sql/query 命令参考

  • location (str) – Dataflow 作业的位置(例如 europe-west1)

  • project_id (str) – 拥有该作业的 GCP 项目的 ID。 如果设置为 None 或缺失,则使用 GCP 连接中的默认 project_id。

  • on_new_job_id_callback (Callable[[str], None] | None) – (已弃用)当知道作业 ID 时调用的回调。

  • on_new_job_callback (Callable[[dict], None] | None) – 当知道作业时调用的回调。

返回

新的作业对象

get_job(job_id, project_id=PROVIDE_PROJECT_ID, location=DEFAULT_DATAFLOW_LOCATION)[源]

获取具有指定作业 ID 的作业。

参数
返回

作业

返回类型

dict

fetch_job_metrics_by_id(job_id, project_id, location=DEFAULT_DATAFLOW_LOCATION)[源]

获取具有指定作业 ID 的作业指标。

参数
返回

JobMetrics。请参阅:https://cloud.google.com/dataflow/docs/reference/rest/v1b3/JobMetrics

返回类型

dict

fetch_job_messages_by_id(job_id, project_id, location=DEFAULT_DATAFLOW_LOCATION)[源]

获取具有指定作业 ID 的作业消息。

参数
  • job_id (str) – 要获取的作业 ID。

  • project_id (str) – 可选,用于启动作业的 Google Cloud 项目 ID。如果设置为 None 或缺少,则使用 Google Cloud 连接中的默认 project_id。

  • location (str) – 作业位置。

返回

JobMessages 的列表。请参阅:https://cloud.google.com/dataflow/docs/reference/rest/v1b3/ListJobMessagesResponse#JobMessage

返回类型

list[dict]

fetch_job_autoscaling_events_by_id(job_id, project_id, location=DEFAULT_DATAFLOW_LOCATION)[源]

获取具有指定作业 ID 的作业自动缩放事件。

参数
  • job_id (str) – 要获取的作业 ID。

  • project_id (str) – 可选,用于启动作业的 Google Cloud 项目 ID。如果设置为 None 或缺少,则使用 Google Cloud 连接中的默认 project_id。

  • location (str) – 作业位置。

返回

AutoscalingEvents 的列表。请参阅:https://cloud.google.com/dataflow/docs/reference/rest/v1b3/ListJobMessagesResponse#autoscalingevent

返回类型

list[dict]

wait_for_done(job_name, location, project_id, job_id=None, multiple_jobs=False)[源]

等待 Dataflow 作业。

参数
  • job_name (str) – 执行 DataFlow 作业时使用的“jobName”(已模板化)。它最终会设置在管道选项中,因此 options 中任何键为 'jobName' 的条目都将被覆盖。

  • location (str) – 作业运行的位置

  • project_id (str) – 可选,用于启动作业的 Google Cloud 项目 ID。如果设置为 None 或缺少,则使用 Google Cloud 连接中的默认 project_id。

  • job_id (str | None) – Dataflow 作业 ID

  • multiple_jobs (bool) – 如果管道创建了多个作业,则监视所有作业

is_job_done(location, project_id, job_id)[源]

检查 Dataflow 作业是否已启动(对于流式作业)或已完成(对于批处理作业)。

参数
  • location (str) – 作业运行的位置

  • project_id (str) – 启动作业的 Google Cloud 项目 ID

  • job_id (str) – Dataflow 作业 ID

create_data_pipeline(body, project_id, location=DEFAULT_DATAFLOW_LOCATION)[源]

创建一个新的 Dataflow Data Pipelines 实例。

参数

以 JSON 格式返回创建的 Data Pipelines 实例。

get_data_pipeline(pipeline_name, project_id, location=DEFAULT_DATAFLOW_LOCATION)[source]

检索新的 Dataflow Data Pipelines 实例。

参数
  • pipeline_name (str) – Pipeline 的显示名称。例如,在 projects/PROJECT_ID/locations/LOCATION_ID/pipelines/PIPELINE_ID 中,它将是 PIPELINE_ID。

  • project_id (str) – 拥有该作业的 GCP 项目的 ID。

  • location (str) – Data Pipelines 实例定向到的位置(例如 us-central1)。

以 JSON 格式返回创建的 Data Pipelines 实例。

run_data_pipeline(pipeline_name, project_id, location=DEFAULT_DATAFLOW_LOCATION)[source]

运行 Dataflow Data Pipeline 实例。

参数
  • pipeline_name (str) – Pipeline 的显示名称。例如,在 projects/PROJECT_ID/locations/LOCATION_ID/pipelines/PIPELINE_ID 中,它将是 PIPELINE_ID。

  • project_id (str) – 拥有该作业的 GCP 项目的 ID。

  • location (str) – Data Pipelines 实例定向到的位置(例如 us-central1)。

以 JSON 格式返回创建的 Job。

delete_data_pipeline(pipeline_name, project_id, location=DEFAULT_DATAFLOW_LOCATION)[source]

删除 Dataflow Data Pipelines 实例。

参数
  • pipeline_name (str) – Pipeline 的显示名称。例如,在 projects/PROJECT_ID/locations/LOCATION_ID/pipelines/PIPELINE_ID 中,它将是 PIPELINE_ID。

  • project_id (str) – 拥有该作业的 GCP 项目的 ID。

  • location (str) – Data Pipelines 实例定向到的位置(例如 us-central1)。

以 JSON 格式返回创建的 Job。

static build_parent_name(project_id, location)[source]
class airflow.providers.google.cloud.hooks.dataflow.AsyncDataflowHook(**kwargs)[source]

基类: airflow.providers.google.common.hooks.base_google.GoogleBaseAsyncHook

用于 Dataflow 服务的异步 Hook 类。

sync_hook_class[source]
async initialize_client(client_class)[source]

初始化给定类的对象。

此方法用于初始化异步客户端。由于用于 Dataflow 服务的类数量众多,因此决定使用从 GoogleBaseHook 类的方法接收的凭据以相同的方式初始化它们。:param client_class: Google Cloud SDK 的类

async get_project_id()[source]
async get_job(job_id, project_id=PROVIDE_PROJECT_ID, job_view=JobView.JOB_VIEW_SUMMARY, location=DEFAULT_DATAFLOW_LOCATION)[source]

获取具有指定作业 ID 的作业。

参数
  • job_id (str) – 要获取的作业 ID。

  • project_id (str) – 启动作业的 Google Cloud 项目 ID。如果设置为 None 或缺失,则使用 Google Cloud 连接中的默认 project_id。

  • job_view (int) – 可选。JobView 对象,用于确定返回数据的表示形式

  • location (str) – 可选。Dataflow 作业的位置(例如 europe-west1)。请参阅:https://cloud.google.com/dataflow/docs/concepts/regional-endpoints

async get_job_status(job_id, project_id=PROVIDE_PROJECT_ID, job_view=JobView.JOB_VIEW_SUMMARY, location=DEFAULT_DATAFLOW_LOCATION)[source]

获取具有指定 Job ID 的作业状态。

参数
  • job_id (str) – 要获取的作业 ID。

  • project_id (str) – 启动作业的 Google Cloud 项目 ID。如果设置为 None 或缺失,则使用 Google Cloud 连接中的默认 project_id。

  • job_view (int) – 可选。JobView 对象,用于确定返回数据的表示形式

  • location (str) – 可选。Dataflow 作业的位置(例如 europe-west1)。请参阅:https://cloud.google.com/dataflow/docs/concepts/regional-endpoints

async list_jobs(jobs_filter=None, project_id=PROVIDE_PROJECT_ID, location=DEFAULT_DATAFLOW_LOCATION, page_size=None, page_token=None)[source]

列出作业。

详情请参阅:https://cloud.google.com/python/docs/reference/dataflow/latest/google.cloud.dataflow_v1beta3.types.ListJobsRequest

参数
  • jobs_filter (int | None) – 可选。此字段会过滤掉并返回指定作业状态的作业。

  • project_id (str | None) – 可选。启动作业的 Google Cloud 项目 ID。如果设置为 None 或缺失,则使用 Google Cloud 连接中的默认 project_id。

  • location (str | None) – 可选。Dataflow 作业的位置(例如 europe-west1)。

  • page_size (int | None) – 可选。如果有许多作业,则将响应限制为此值。

  • page_token ( str | None) – 可选。将其设置为先前响应的 ‘next_page_token’ 字段,以便在长列表中请求其他结果。

async list_job_messages(job_id, project_id=PROVIDE_PROJECT_ID, minimum_importance=JobMessageImportance.JOB_MESSAGE_BASIC, page_size=None, page_token=None, start_time=None, end_time=None, location=DEFAULT_DATAFLOW_LOCATION)[源代码]

从 MessagesV1Beta3AsyncClient 返回 ListJobMessagesAsyncPager 对象。

此方法包装了 MessagesV1Beta3AsyncClient 的类似方法。可以迭代 ListJobMessagesAsyncPager 以提取与特定 Job ID 关联的消息。

有关更多详细信息,请参阅 MessagesV1Beta3AsyncClient 方法说明,网址为:https://cloud.google.com/python/docs/reference/dataflow/latest/google.cloud.dataflow_v1beta3.services.messages_v1_beta3.MessagesV1Beta3AsyncClient

参数
  • job_id ( str) – 要获取消息的 Dataflow 作业的 ID。

  • project_id (str | None) – 可选。启动作业的 Google Cloud 项目 ID。如果设置为 None 或缺失,则使用 Google Cloud 连接中的默认 project_id。

  • minimum_importance ( int) – 可选。筛选以仅获取重要性 >= level 的消息。有关更多详细信息,请参阅以下说明:https://cloud.google.com/python/docs/reference/dataflow/latest/google.cloud.dataflow_v1beta3.types.JobMessageImportance

  • page_size ( int | None) – 可选。如果指定,则确定要返回的最大消息数。如果未指定,服务可能会选择合适的默认值,或者可能会返回任意大量的结果。

  • page_token ( str | None) – 可选。如果提供,则应为先前调用返回的 next_page_token 值。这将导致返回下一页结果。

  • start_time (google.protobuf.timestamp_pb2.Timestamp | None) – 可选。如果指定,则仅返回时间戳 >= start_time 的消息。默认值为作业创建时间(即消息的开始时间)。

  • end_time (google.protobuf.timestamp_pb2.Timestamp | None) – 可选。如果指定,则仅返回时间戳 < end_time 的消息。默认值为当前时间。

  • location ( str | None) – 可选。包含由 job_id 指定的作业的 [区域端点] (https://cloud.google.com/dataflow/docs/concepts/regional-endpoints)。

async get_job_metrics(job_id, project_id=PROVIDE_PROJECT_ID, start_time=None, location=DEFAULT_DATAFLOW_LOCATION)[源代码]

从 MetricsV1Beta3AsyncClient 返回 JobMetrics 对象。

此方法包装了 MetricsV1Beta3AsyncClient 的类似方法。

有关更多详细信息,请参阅 MetricsV1Beta3AsyncClient 方法说明,网址为:https://cloud.google.com/python/docs/reference/dataflow/latest/google.cloud.dataflow_v1beta3.services.metrics_v1_beta3.MetricsV1Beta3AsyncClient

参数
  • job_id ( str) – 要获取指标的 Dataflow 作业的 ID。

  • project_id (str | None) – 可选。启动作业的 Google Cloud 项目 ID。如果设置为 None 或缺失,则使用 Google Cloud 连接中的默认 project_id。

  • start_time (google.protobuf.timestamp_pb2.Timestamp | None) – 可选。仅返回自此时间以来已更改的指标数据。默认值是返回有关作业的所有指标的所有信息。

  • location ( str | None) – 可选。包含由 job_id 指定的作业的 [区域端点] (https://cloud.google.com/dataflow/docs/concepts/regional-endpoints)。

此条目是否有帮助?