airflow.providers.google.cloud.hooks.dataflow
¶
此模块包含 Google Dataflow Hook。
模块内容¶
类¶
带有 Dataflow 作业状态的助手类。 |
|
带有 Dataflow 作业类型的助手类。 |
|
Google Dataflow 的 Hook。 |
|
用于 Dataflow 服务的异步 Hook 类。 |
函数¶
构建触发指定函数的回调。 |
属性¶
- airflow.providers.google.cloud.hooks.dataflow.process_line_and_extract_dataflow_job_id_callback(on_new_job_id_callback)[源代码]¶
构建触发指定函数的回调。
返回的回调旨在用作
BeamCommandRunner
中的process_line_callback
。- 参数
on_new_job_id_callback (Callable[[str], None] | None) – 当已知作业 ID 时调用的回调
- class airflow.providers.google.cloud.hooks.dataflow.DataflowJobStatus[源代码]¶
带有 Dataflow 作业状态的助手类。
参考:https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs#Job.JobState
- class airflow.providers.google.cloud.hooks.dataflow.DataflowHook(gcp_conn_id='google_cloud_default', poll_sleep=10, impersonation_chain=None, drain_pipeline=False, cancel_timeout=5 * 60, wait_until_finished=None, expected_terminal_state=None, **kwargs)[源代码]¶
基类:
airflow.providers.google.common.hooks.base_google.GoogleBaseHook
Google Dataflow 的 Hook。
Hook 中所有使用 project_id 的方法都必须使用关键字参数而不是位置参数调用。
- start_java_dataflow(job_name, variables, jar, project_id, job_class=None, append_job_name=True, multiple_jobs=False, on_new_job_id_callback=None, location=DEFAULT_DATAFLOW_LOCATION)[源代码]¶
启动 Dataflow java 作业。
- 参数
job_name (str) – 作业的名称。
variables (dict) – 传递给作业的变量。
project_id (str) – 可选,用于启动作业的 Google Cloud 项目 ID。如果设置为 None 或缺少,则使用 Google Cloud 连接中的默认 project_id。
jar (str) – 作业的 jar 文件名
job_class (str | None) – 作业的 java 类名。
append_job_name (bool) – 如果必须将唯一的后缀附加到作业名称,则为 True。
multiple_jobs (bool) – 如果要检查 Dataflow 中是否存在多个作业,则为 True。
on_new_job_id_callback (Callable[[str], None] | None) – 当知道作业 ID 时调用的回调。
location (str) – 作业位置。
- start_template_dataflow(job_name, variables, parameters, dataflow_template, project_id, append_job_name=True, on_new_job_id_callback=None, on_new_job_callback=None, location=DEFAULT_DATAFLOW_LOCATION, environment=None)[源代码]¶
使用经典模板启动 Dataflow 作业并等待其完成。
- 参数
job_name (str) – 作业的名称。
variables (dict) –
作业运行时环境选项的映射。如果传递了 environment 参数,它将更新 environment 参数。
参见
有关可能配置的更多信息,请查看 API 文档 https://cloud.google.com/dataflow/pipelines/specifying-exec-params
parameters (dict) – 模板的参数
dataflow_template (str) – 模板的 GCS 路径。
project_id (str) – 可选,用于启动作业的 Google Cloud 项目 ID。如果设置为 None 或缺少,则使用 Google Cloud 连接中的默认 project_id。
append_job_name (bool) – 如果必须将唯一的后缀附加到作业名称,则为 True。
on_new_job_id_callback (Callable[[str], None] | None) – (已弃用) 当知道作业时调用的回调。
on_new_job_callback (Callable[[dict], None] | None) – 当知道作业时调用的回调。
location (str) –
作业位置。
参见
有关可能配置的更多信息,请查看 API 文档 https://cloud.google.com/dataflow/pipelines/specifying-exec-params
- launch_job_with_template(*, job_name, variables, parameters, dataflow_template, project_id, append_job_name=True, location=DEFAULT_DATAFLOW_LOCATION, environment=None)[源代码]¶
使用经典模板启动 Dataflow 作业,并在不等待其完成的情况下退出。
- 参数
job_name (str) – 作业的名称。
variables (dict) –
作业运行时环境选项的映射。如果传递了 environment 参数,它将更新 environment 参数。
参见
有关可能配置的更多信息,请查看 API 文档 https://cloud.google.com/dataflow/pipelines/specifying-exec-params
parameters (dict) – 模板的参数
dataflow_template (str) – 模板的 GCS 路径。
project_id (str) – 可选,用于启动作业的 Google Cloud 项目 ID。如果设置为 None 或缺少,则使用 Google Cloud 连接中的默认 project_id。
append_job_name (bool) – 如果必须将唯一的后缀附加到作业名称,则为 True。
location (str) –
作业位置。
参见
有关可能配置的更多信息,请查看 API 文档 https://cloud.google.com/dataflow/pipelines/specifying-exec-params
- 返回
Dataflow 作业响应
- 返回类型
dict[str, str]
- send_launch_template_request(*, project_id, location, gcs_path, job_name, parameters, environment)[源代码]¶
- start_flex_template(body, location, project_id, on_new_job_id_callback=None, on_new_job_callback=None)[source]¶
使用 Flex 模板启动 Dataflow 作业并等待其完成。
- 参数
body (dict) – 请求正文。 请参阅:https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.locations.flexTemplates/launch#request-body
location (str) – Dataflow 作业的位置(例如 europe-west1)
project_id (str) – 拥有该作业的 GCP 项目的 ID。 如果设置为
None
或缺失,则使用 GCP 连接中的默认 project_id。on_new_job_id_callback (Callable[[str], None] | None) – (已弃用) 在检测到作业 ID 时调用的回调。
on_new_job_callback (Callable[[dict], None] | None) – 在检测到作业时调用的回调。
- 返回
作业
- 返回类型
dict[str, str]
- launch_job_with_flex_template(body, location, project_id)[source]¶
使用 Flex 模板启动 Dataflow 作业,并在不等待作业完成的情况下退出。
- 参数
body (dict) – 请求正文。 请参阅:https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.locations.flexTemplates/launch#request-body
location (str) – Dataflow 作业的位置(例如 europe-west1)
project_id (str) – 拥有该作业的 GCP 项目的 ID。 如果设置为
None
或缺失,则使用 GCP 连接中的默认 project_id。
- 返回
Dataflow 作业响应
- 返回类型
dict[str, str]
- launch_beam_yaml_job(*, job_name, yaml_pipeline_file, append_job_name, jinja_variables, options, project_id, location=DEFAULT_DATAFLOW_LOCATION)[source]¶
启动 Dataflow YAML 作业并运行至完成。
- 参数
job_name (str) – 要分配给 Cloud Dataflow 作业的唯一名称。
yaml_pipeline_file (str) – 定义要运行的 YAML 管道的文件的路径。 必须是本地文件或以“gs://”开头的 URL。
append_job_name (bool) – 如果必须将唯一后缀附加到 job_name,则设置为 True。
jinja_variables (dict[str, str] | None) – 用于具体化 yaml 管道文件的 Jinja2 变量的字典。
options (dict[str, Any] | None) – 其他 gcloud 或 Beam 作业参数。它必须是一个字典,其中键与 gcloud 中的可选标志名称匹配。 支持的标志列表可以在以下位置找到:https://cloud.google.com/sdk/gcloud/reference/dataflow/yaml/run。 请注意,如果某个标志不需要值,则其字典值必须为 True 或 None。 例如,--log-http 标志可以作为 {‘log-http’: True} 传递。
project_id (str) – 拥有该作业的 GCP 项目的 ID。
location (str) – 作业的区域端点的区域 ID。 默认为“us-central1”。
on_new_job_callback – 将作业传递给操作员的回调函数,一旦已知。
- 返回
作业 ID。
- 返回类型
- start_python_dataflow(job_name, variables, dataflow, py_options, project_id, py_interpreter='python3', py_requirements=None, py_system_site_packages=False, append_job_name=True, on_new_job_id_callback=None, location=DEFAULT_DATAFLOW_LOCATION)[source]¶
启动 Dataflow 作业。
- 参数
job_name (str) – 作业的名称。
variables (dict) – 传递给作业的变量。
dataflow (str) – Dataflow 进程的名称。
py_options (list[str]) – 其他选项。
project_id (str) – 拥有该作业的 GCP 项目的 ID。 如果设置为
None
或缺失,则使用 GCP 连接中的默认 project_id。py_interpreter (str) – beam 管道的 Python 版本。 如果为 None,则默认为 python3。 要跟踪 beam 支持的 Python 版本和相关问题,请检查:https://issues.apache.org/jira/browse/BEAM-1251
py_requirements (list[str] | None) –
要安装的其他 Python 包。 如果将值传递给此参数,则会创建一个新的虚拟环境,其中安装了其他包。
如果您的系统上未安装 apache-beam 包,或者您想使用其他版本,您也可以安装该包。
py_system_site_packages (bool) –
是否在您的虚拟环境中包含 system_site_packages。 有关更多信息,请参阅 virtualenv 文档。
仅当
py_requirements
参数不为 None 时,此选项才相关。append_job_name (bool) – 如果必须将唯一的后缀附加到作业名称,则为 True。
project_id – 可选,要在其中启动作业的 Google Cloud 项目 ID。 如果设置为 None 或缺失,则使用 Google Cloud 连接中的默认 project_id。
on_new_job_id_callback (Callable[[str], None] | None) – 当知道作业 ID 时调用的回调。
location (str) – 作业位置。
- cancel_job(project_id, job_name=None, job_id=None, location=DEFAULT_DATAFLOW_LOCATION)[源]¶
取消具有指定名称前缀或作业 ID 的作业。
参数
name
和job_id
是互斥的。
- start_sql_job(job_name, query, options, project_id, location=DEFAULT_DATAFLOW_LOCATION, on_new_job_id_callback=None, on_new_job_callback=None)[源]¶
启动 Dataflow SQL 查询。
- 参数
job_name (str) – 要分配给 Cloud Dataflow 作业的唯一名称。
query (str) – 要执行的 SQL 查询。
options (dict[str, Any]) – 要执行的作业参数。有关更多信息,请查看:https://cloud.google.com/sdk/gcloud/reference/beta/dataflow/sql/query 命令参考
location (str) – Dataflow 作业的位置(例如 europe-west1)
project_id (str) – 拥有该作业的 GCP 项目的 ID。 如果设置为
None
或缺失,则使用 GCP 连接中的默认 project_id。on_new_job_id_callback (Callable[[str], None] | None) – (已弃用)当知道作业 ID 时调用的回调。
on_new_job_callback (Callable[[dict], None] | None) – 当知道作业时调用的回调。
- 返回
新的作业对象
- get_job(job_id, project_id=PROVIDE_PROJECT_ID, location=DEFAULT_DATAFLOW_LOCATION)[源]¶
获取具有指定作业 ID 的作业。
- 参数
job_id (str) – 要获取的作业 ID。
project_id (str) – 可选,用于启动作业的 Google Cloud 项目 ID。如果设置为 None 或缺少,则使用 Google Cloud 连接中的默认 project_id。
location (str) – Dataflow 作业的位置(例如 europe-west1)。请参阅:https://cloud.google.com/dataflow/docs/concepts/regional-endpoints
- 返回
作业
- 返回类型
- fetch_job_metrics_by_id(job_id, project_id, location=DEFAULT_DATAFLOW_LOCATION)[源]¶
获取具有指定作业 ID 的作业指标。
- 参数
job_id (str) – 要获取的作业 ID。
project_id (str) – 可选,用于启动作业的 Google Cloud 项目 ID。如果设置为 None 或缺少,则使用 Google Cloud 连接中的默认 project_id。
location (str) – Dataflow 作业的位置(例如 europe-west1)。请参阅:https://cloud.google.com/dataflow/docs/concepts/regional-endpoints
- 返回
JobMetrics。请参阅:https://cloud.google.com/dataflow/docs/reference/rest/v1b3/JobMetrics
- 返回类型
- fetch_job_messages_by_id(job_id, project_id, location=DEFAULT_DATAFLOW_LOCATION)[源]¶
获取具有指定作业 ID 的作业消息。
- 参数
job_id (str) – 要获取的作业 ID。
project_id (str) – 可选,用于启动作业的 Google Cloud 项目 ID。如果设置为 None 或缺少,则使用 Google Cloud 连接中的默认 project_id。
location (str) – 作业位置。
- 返回
JobMessages 的列表。请参阅:https://cloud.google.com/dataflow/docs/reference/rest/v1b3/ListJobMessagesResponse#JobMessage
- 返回类型
- fetch_job_autoscaling_events_by_id(job_id, project_id, location=DEFAULT_DATAFLOW_LOCATION)[源]¶
获取具有指定作业 ID 的作业自动缩放事件。
- 参数
job_id (str) – 要获取的作业 ID。
project_id (str) – 可选,用于启动作业的 Google Cloud 项目 ID。如果设置为 None 或缺少,则使用 Google Cloud 连接中的默认 project_id。
location (str) – 作业位置。
- 返回
AutoscalingEvents 的列表。请参阅:https://cloud.google.com/dataflow/docs/reference/rest/v1b3/ListJobMessagesResponse#autoscalingevent
- 返回类型
- wait_for_done(job_name, location, project_id, job_id=None, multiple_jobs=False)[源]¶
等待 Dataflow 作业。
- 参数
job_name (str) – 执行 DataFlow 作业时使用的“jobName”(已模板化)。它最终会设置在管道选项中,因此
options
中任何键为'jobName'
的条目都将被覆盖。location (str) – 作业运行的位置
project_id (str) – 可选,用于启动作业的 Google Cloud 项目 ID。如果设置为 None 或缺少,则使用 Google Cloud 连接中的默认 project_id。
job_id (str | None) – Dataflow 作业 ID
multiple_jobs (bool) – 如果管道创建了多个作业,则监视所有作业
- create_data_pipeline(body, project_id, location=DEFAULT_DATAFLOW_LOCATION)[源]¶
创建一个新的 Dataflow Data Pipelines 实例。
- 参数
body (dict) – 请求体(包含 Pipeline 实例)。请参阅:https://cloud.google.com/dataflow/docs/reference/data-pipelines/rest/v1/projects.locations.pipelines/create#request-body
project_id (str) – 拥有该作业的 GCP 项目的 ID。
location (str) – Data Pipelines 实例定向到的位置(例如 us-central1)。
以 JSON 格式返回创建的 Data Pipelines 实例。
- get_data_pipeline(pipeline_name, project_id, location=DEFAULT_DATAFLOW_LOCATION)[source]¶
检索新的 Dataflow Data Pipelines 实例。
- 参数
pipeline_name (str) – Pipeline 的显示名称。例如,在 projects/PROJECT_ID/locations/LOCATION_ID/pipelines/PIPELINE_ID 中,它将是 PIPELINE_ID。
project_id (str) – 拥有该作业的 GCP 项目的 ID。
location (str) – Data Pipelines 实例定向到的位置(例如 us-central1)。
以 JSON 格式返回创建的 Data Pipelines 实例。
- run_data_pipeline(pipeline_name, project_id, location=DEFAULT_DATAFLOW_LOCATION)[source]¶
运行 Dataflow Data Pipeline 实例。
- 参数
pipeline_name (str) – Pipeline 的显示名称。例如,在 projects/PROJECT_ID/locations/LOCATION_ID/pipelines/PIPELINE_ID 中,它将是 PIPELINE_ID。
project_id (str) – 拥有该作业的 GCP 项目的 ID。
location (str) – Data Pipelines 实例定向到的位置(例如 us-central1)。
以 JSON 格式返回创建的 Job。
- delete_data_pipeline(pipeline_name, project_id, location=DEFAULT_DATAFLOW_LOCATION)[source]¶
删除 Dataflow Data Pipelines 实例。
- 参数
pipeline_name (str) – Pipeline 的显示名称。例如,在 projects/PROJECT_ID/locations/LOCATION_ID/pipelines/PIPELINE_ID 中,它将是 PIPELINE_ID。
project_id (str) – 拥有该作业的 GCP 项目的 ID。
location (str) – Data Pipelines 实例定向到的位置(例如 us-central1)。
以 JSON 格式返回创建的 Job。
- class airflow.providers.google.cloud.hooks.dataflow.AsyncDataflowHook(**kwargs)[source]¶
基类:
airflow.providers.google.common.hooks.base_google.GoogleBaseAsyncHook
用于 Dataflow 服务的异步 Hook 类。
- async initialize_client(client_class)[source]¶
初始化给定类的对象。
此方法用于初始化异步客户端。由于用于 Dataflow 服务的类数量众多,因此决定使用从 GoogleBaseHook 类的方法接收的凭据以相同的方式初始化它们。:param client_class: Google Cloud SDK 的类
- async get_job(job_id, project_id=PROVIDE_PROJECT_ID, job_view=JobView.JOB_VIEW_SUMMARY, location=DEFAULT_DATAFLOW_LOCATION)[source]¶
获取具有指定作业 ID 的作业。
- 参数
job_id (str) – 要获取的作业 ID。
project_id (str) – 启动作业的 Google Cloud 项目 ID。如果设置为 None 或缺失,则使用 Google Cloud 连接中的默认 project_id。
job_view (int) – 可选。JobView 对象,用于确定返回数据的表示形式
location (str) – 可选。Dataflow 作业的位置(例如 europe-west1)。请参阅:https://cloud.google.com/dataflow/docs/concepts/regional-endpoints
- async get_job_status(job_id, project_id=PROVIDE_PROJECT_ID, job_view=JobView.JOB_VIEW_SUMMARY, location=DEFAULT_DATAFLOW_LOCATION)[source]¶
获取具有指定 Job ID 的作业状态。
- 参数
job_id (str) – 要获取的作业 ID。
project_id (str) – 启动作业的 Google Cloud 项目 ID。如果设置为 None 或缺失,则使用 Google Cloud 连接中的默认 project_id。
job_view (int) – 可选。JobView 对象,用于确定返回数据的表示形式
location (str) – 可选。Dataflow 作业的位置(例如 europe-west1)。请参阅:https://cloud.google.com/dataflow/docs/concepts/regional-endpoints
- async list_jobs(jobs_filter=None, project_id=PROVIDE_PROJECT_ID, location=DEFAULT_DATAFLOW_LOCATION, page_size=None, page_token=None)[source]¶
列出作业。
- 参数
jobs_filter (int | None) – 可选。此字段会过滤掉并返回指定作业状态的作业。
project_id (str | None) – 可选。启动作业的 Google Cloud 项目 ID。如果设置为 None 或缺失,则使用 Google Cloud 连接中的默认 project_id。
location (str | None) – 可选。Dataflow 作业的位置(例如 europe-west1)。
page_size (int | None) – 可选。如果有许多作业,则将响应限制为此值。
page_token ( str | None) – 可选。将其设置为先前响应的 ‘next_page_token’ 字段,以便在长列表中请求其他结果。
- async list_job_messages(job_id, project_id=PROVIDE_PROJECT_ID, minimum_importance=JobMessageImportance.JOB_MESSAGE_BASIC, page_size=None, page_token=None, start_time=None, end_time=None, location=DEFAULT_DATAFLOW_LOCATION)[源代码]¶
从 MessagesV1Beta3AsyncClient 返回 ListJobMessagesAsyncPager 对象。
此方法包装了 MessagesV1Beta3AsyncClient 的类似方法。可以迭代 ListJobMessagesAsyncPager 以提取与特定 Job ID 关联的消息。
有关更多详细信息,请参阅 MessagesV1Beta3AsyncClient 方法说明,网址为:https://cloud.google.com/python/docs/reference/dataflow/latest/google.cloud.dataflow_v1beta3.services.messages_v1_beta3.MessagesV1Beta3AsyncClient
- 参数
job_id ( str) – 要获取消息的 Dataflow 作业的 ID。
project_id (str | None) – 可选。启动作业的 Google Cloud 项目 ID。如果设置为 None 或缺失,则使用 Google Cloud 连接中的默认 project_id。
minimum_importance ( int) – 可选。筛选以仅获取重要性 >= level 的消息。有关更多详细信息,请参阅以下说明:https://cloud.google.com/python/docs/reference/dataflow/latest/google.cloud.dataflow_v1beta3.types.JobMessageImportance
page_size ( int | None) – 可选。如果指定,则确定要返回的最大消息数。如果未指定,服务可能会选择合适的默认值,或者可能会返回任意大量的结果。
page_token ( str | None) – 可选。如果提供,则应为先前调用返回的 next_page_token 值。这将导致返回下一页结果。
start_time (google.protobuf.timestamp_pb2.Timestamp | None) – 可选。如果指定,则仅返回时间戳 >= start_time 的消息。默认值为作业创建时间(即消息的开始时间)。
end_time (google.protobuf.timestamp_pb2.Timestamp | None) – 可选。如果指定,则仅返回时间戳 < end_time 的消息。默认值为当前时间。
location ( str | None) – 可选。包含由 job_id 指定的作业的 [区域端点] (https://cloud.google.com/dataflow/docs/concepts/regional-endpoints)。
- async get_job_metrics(job_id, project_id=PROVIDE_PROJECT_ID, start_time=None, location=DEFAULT_DATAFLOW_LOCATION)[源代码]¶
从 MetricsV1Beta3AsyncClient 返回 JobMetrics 对象。
此方法包装了 MetricsV1Beta3AsyncClient 的类似方法。
有关更多详细信息,请参阅 MetricsV1Beta3AsyncClient 方法说明,网址为:https://cloud.google.com/python/docs/reference/dataflow/latest/google.cloud.dataflow_v1beta3.services.metrics_v1_beta3.MetricsV1Beta3AsyncClient
- 参数
job_id ( str) – 要获取指标的 Dataflow 作业的 ID。
project_id (str | None) – 可选。启动作业的 Google Cloud 项目 ID。如果设置为 None 或缺失,则使用 Google Cloud 连接中的默认 project_id。
start_time (google.protobuf.timestamp_pb2.Timestamp | None) – 可选。仅返回自此时间以来已更改的指标数据。默认值是返回有关作业的所有指标的所有信息。
location ( str | None) – 可选。包含由 job_id 指定的作业的 [区域端点] (https://cloud.google.com/dataflow/docs/concepts/regional-endpoints)。