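Google Cloud Composer Operators
Cloud Composer is a fully managed workflow orchestration service that lets you create, schedule, monitor, and manage workflows that span clouds and on-premises data centers.
Cloud Composer is built on the popular Apache Airflow open source project and operates using the Python programming language.
By using Cloud Composer instead of a local instance of Apache Airflow, you get the best of Airflow with no installation or management overhead. Cloud Composer helps you create Airflow environments quickly and use Airflow-native tools, such as the powerful Airflow web interface and command-line tools, so you can focus on your workflows rather than your infrastructure.
For more information about the service, visit the Cloud Composer product documentation.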
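Create an environment
Before you create a Cloud Composer environment, you need to define it. For more information about the fields you can pass when creating an environment, see the Cloud Composer create environment API.
A simple environment configuration looks like this: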
# DAG_ID and ENV_ID are defined elsewhere in the example DAG.
ENVIRONMENT_ID = f"test-{DAG_ID}-{ENV_ID}".replace("_", "-")
# A separate environment ID for the deferrable variants.
ENVIRONMENT_ID_ASYNC = f"test-deferrable-{DAG_ID}-{ENV_ID}".replace("_", "-")
ENVIRONMENT = {
    "config": {
        "software_config": {"image_version": "composer-2.5.0-airflow-2.5.3"},
    }
}
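The example builds ENVIRONMENT_ID with string formatting and replaces underscores with hyphens, presumably because Composer environment IDs may not contain underscores. A minimal sketch of the same idea with an explicit validity check follows. The naming rule encoded in `_ENV_ID_PATTERN`, the `env_variables` field, and the helper names are assumptions for illustration, not part of the provider; unlike the example, the sketch also lowercases the ID.

```python
import re
from typing import Dict, Optional

# Assumed naming rule for Composer environment IDs (verify against the Composer docs):
# start with a lowercase letter, contain only lowercase letters, digits and hyphens,
# be at most 64 characters long, and not end with a hyphen.
_ENV_ID_PATTERN = re.compile(r"^[a-z](?:[a-z0-9-]{0,62}[a-z0-9])?$")


def make_environment_id(*parts: str) -> str:
    """Join parts with hyphens, lowercase them and turn underscores into hyphens."""
    env_id = "-".join(parts).lower().replace("_", "-")
    if not _ENV_ID_PATTERN.match(env_id):
        raise ValueError(f"invalid Composer environment ID: {env_id!r}")
    return env_id


def make_environment(image_version: str, env_variables: Optional[Dict[str, str]] = None) -> dict:
    """Build an Environment body with the same nesting as ENVIRONMENT above."""
    software_config: dict = {"image_version": image_version}
    if env_variables:
        # Assumption: environment variables are accepted under software_config.
        software_config["env_variables"] = dict(env_variables)
    return {"config": {"software_config": software_config}}


# Hypothetical parts; in the example these come from DAG_ID and ENV_ID.
env_id = make_environment_id("test", "example_dag", "env1")
environment = make_environment("composer-2.5.0-airflow-2.5.3")
```

Raising early on an invalid ID surfaces naming mistakes at DAG parse time instead of as an API error when the task runs.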
With this configuration, you can create the environment using CloudComposerCreateEnvironmentOperator:
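from airflow.providers.google.cloud.operators.cloud_composer import CloudComposerCreateEnvironmentOperator
create_env = CloudComposerCreateEnvironmentOperator(
    task_id="create_env",
    project_id=PROJECT_ID,
    region=REGION,
    environment_id=ENVIRONMENT_ID,
    environment=ENVIRONMENT,
)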
Alternatively, you can run the same operator, CloudComposerCreateEnvironmentOperator, in deferrable mode:
defer_create_env = CloudComposerCreateEnvironmentOperator(
    task_id="defer_create_env",
    project_id=PROJECT_ID,
    region=REGION,
    environment_id=ENVIRONMENT_ID_ASYNC,
    environment=ENVIRONMENT,
    deferrable=True,
)
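With deferrable=True, the operator submits its request and then defers: the wait is handed to a trigger running in Airflow's triggerer process, which frees the worker slot in the meantime (deferrable mode therefore needs a running triggerer). That waiting amounts to asynchronous polling. The plain-asyncio sketch below illustrates the idea only; it is not the provider's trigger code, and `wait_until_done` and `fake_operation` are hypothetical names.

```python
import asyncio
from typing import Awaitable, Callable


async def wait_until_done(
    is_done: Callable[[], Awaitable[bool]],
    poll_interval: float = 30.0,
    timeout: float = 3600.0,
) -> int:
    """Poll is_done() until it returns True; return how many polls were needed."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    polls = 0
    while True:
        polls += 1
        if await is_done():
            return polls
        if loop.time() >= deadline:
            raise TimeoutError("operation did not finish before the timeout")
        # Awaiting the sleep yields the event loop, so one process can wait on
        # many operations at once instead of blocking one worker per wait.
        await asyncio.sleep(poll_interval)


def fake_operation(polls_before_done: int) -> Callable[[], Awaitable[bool]]:
    """Hypothetical stand-in for a long-running operation status check."""
    calls = 0

    async def is_done() -> bool:
        nonlocal calls
        calls += 1
        return calls > polls_before_done

    return is_done


if __name__ == "__main__":
    print(asyncio.run(wait_until_done(fake_operation(2), poll_interval=0.01)))  # prints 3
```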
Get an environment
To get an environment, you can use CloudComposerGetEnvironmentOperator:
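from airflow.providers.google.cloud.operators.cloud_composer import CloudComposerGetEnvironmentOperator
get_env = CloudComposerGetEnvironmentOperator(
    task_id="get_env",
    project_id=PROJECT_ID,
    region=REGION,
    environment_id=ENVIRONMENT_ID,
)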
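As far as we can tell, CloudComposerGetEnvironmentOperator returns the environment as a dict and pushes it to XCom, so a downstream task can read fields from it. The hypothetical helper below extracts two EnvironmentConfig fields, `airflow_uri` and `dag_gcs_prefix`; the snake_case key names are an assumption to check against what your provider version actually returns, and the sample input is made up.

```python
from typing import Any, Dict, Optional


def airflow_endpoints(environment: Dict[str, Any]) -> Dict[str, Optional[str]]:
    """Return the Airflow web UI URL and the DAG folder prefix, or None if absent."""
    config = environment.get("config") or {}
    return {
        "airflow_uri": config.get("airflow_uri"),
        "dag_gcs_prefix": config.get("dag_gcs_prefix"),
    }


# Hypothetical sample shaped like an Environment dict.
sample_env = {
    "name": "projects/my-project/locations/us-central1/environments/my-env",
    "config": {
        "airflow_uri": "https://example.invalid",
        "dag_gcs_prefix": "gs://example-bucket/dags",
    },
}
endpoints = airflow_endpoints(sample_env)
```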
List environments
To list environments, you can use CloudComposerListEnvironmentsOperator:
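from airflow.providers.google.cloud.operators.cloud_composer import CloudComposerListEnvironmentsOperator
list_envs = CloudComposerListEnvironmentsOperator(
    task_id="list_envs", project_id=PROJECT_ID, region=REGION
)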
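CloudComposerListEnvironmentsOperator returns a list of environments. Assuming each entry is a dict whose `name` holds the full resource name (`projects/{project}/locations/{region}/environments/{environment_id}`), a hypothetical helper can recover the short environment IDs, for example to check whether an ID is already taken before creating it:

```python
from typing import Any, Dict, Iterable, List


def environment_ids(environments: Iterable[Dict[str, Any]]) -> List[str]:
    """Extract the trailing environment ID from each entry's resource name."""
    ids: List[str] = []
    for env in environments:
        name = env.get("name", "")
        # rpartition splits on the last "/environments/" segment of the name.
        _, sep, env_id = name.rpartition("/environments/")
        if sep and env_id:
            ids.append(env_id)
    return ids


# Hypothetical listing; entries without a usable name are skipped.
listing = [
    {"name": "projects/my-project/locations/us-central1/environments/env-a"},
    {"name": "projects/my-project/locations/us-central1/environments/env-b"},
    {},
]
```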
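Update an environment
You can update an environment by providing an environment configuration and an updateMask. In the updateMask parameter, you specify the path, relative to Environment, of each field to update. For more information about updateMask and the other parameters, see the Cloud Composer update environment API.
An example of a new environment configuration and updateMask: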
UPDATED_ENVIRONMENT = {
    "labels": {
        "label": "testing",
    }
}
UPDATE_MASK = {"paths": ["labels.label"]}
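UPDATE_MASK lists one dotted path, relative to Environment, per changed field. When the change set grows, a hypothetical helper like the one below can derive those paths from the same nested dict used for the new configuration. Treat it as a sketch: the update API accepts only a specific set of paths (labels.label from the example is one), so check derived paths against the update environment API reference.

```python
from typing import Any, Dict, List


def update_mask_paths(changes: Dict[str, Any], prefix: str = "") -> List[str]:
    """Flatten a nested dict of changes into dotted field paths."""
    paths: List[str] = []
    for key, value in changes.items():
        path = f"{prefix}.{key}" if prefix else key
        if isinstance(value, dict) and value:
            # Descend into non-empty sub-dicts; every leaf becomes one path.
            paths.extend(update_mask_paths(value, path))
        else:
            paths.append(path)
    return paths


new_config = {"labels": {"label": "testing"}}
mask = {"paths": update_mask_paths(new_config)}
```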
To update the environment, you can use CloudComposerUpdateEnvironmentOperator:
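from airflow.providers.google.cloud.operators.cloud_composer import CloudComposerUpdateEnvironmentOperator
update_env = CloudComposerUpdateEnvironmentOperator(
    task_id="update_env",
    project_id=PROJECT_ID,
    region=REGION,
    environment_id=ENVIRONMENT_ID,
    update_mask=UPDATE_MASK,
    environment=UPDATED_ENVIRONMENT,
)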
Alternatively, you can run the same operator, CloudComposerUpdateEnvironmentOperator, in deferrable mode:
defer_update_env = CloudComposerUpdateEnvironmentOperator(
    task_id="defer_update_env",
    project_id=PROJECT_ID,
    region=REGION,
    environment_id=ENVIRONMENT_ID_ASYNC,
    update_mask=UPDATE_MASK,
    environment=UPDATED_ENVIRONMENT,
    deferrable=True,
)
Delete an environment
To delete an environment, you can use CloudComposerDeleteEnvironmentOperator:
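from airflow.providers.google.cloud.operators.cloud_composer import CloudComposerDeleteEnvironmentOperator
delete_env = CloudComposerDeleteEnvironmentOperator(
    task_id="delete_env",
    project_id=PROJECT_ID,
    region=REGION,
    environment_id=ENVIRONMENT_ID,
)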
Alternatively, you can run the same operator, CloudComposerDeleteEnvironmentOperator, in deferrable mode:
defer_delete_env = CloudComposerDeleteEnvironmentOperator(
    task_id="defer_delete_env",
    project_id=PROJECT_ID,
    region=REGION,
    environment_id=ENVIRONMENT_ID_ASYNC,
    deferrable=True,
)
List Composer image versions
You can also list all supported Cloud Composer images with CloudComposerListImageVersionsOperator:
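from airflow.providers.google.cloud.operators.cloud_composer import CloudComposerListImageVersionsOperator
image_versions = CloudComposerListImageVersionsOperator(
    task_id="image_versions",
    project_id=PROJECT_ID,
    region=REGION,
)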
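The image version IDs follow the pattern seen in ENVIRONMENT, for example composer-2.5.0-airflow-2.5.3. Assuming you have pulled the IDs out of the operator's result (each entry is presumably a dict with an image_version_id key), a hypothetical helper can parse them and pick the newest image for a given Composer major version. IDs that do not match the full three-part pattern, such as short aliases, are skipped; the sample IDs are illustrative, not a real listing.

```python
import re
from typing import Iterable, Optional, Tuple

Version = Tuple[int, ...]

# Only full IDs like "composer-2.5.0-airflow-2.5.3" match; other formats are skipped.
_IMAGE_RE = re.compile(r"^composer-(\d+\.\d+\.\d+)-airflow-(\d+\.\d+\.\d+)$")


def parse_image_version(image_version_id: str) -> Optional[Tuple[Version, Version]]:
    """Return (composer_version, airflow_version) as int tuples, or None."""
    m = _IMAGE_RE.match(image_version_id)
    if m is None:
        return None
    composer, airflow = (tuple(int(p) for p in g.split(".")) for g in m.groups())
    return composer, airflow


def newest_image(image_version_ids: Iterable[str], composer_major: int) -> Optional[str]:
    """Pick the highest (Composer, Airflow) version pair for one Composer major version."""
    candidates = []
    for image_id in image_version_ids:
        parsed = parse_image_version(image_id)
        if parsed is not None and parsed[0][0] == composer_major:
            candidates.append((parsed, image_id))
    # Tuples compare element by element, so max() orders by Composer, then Airflow version.
    return max(candidates)[1] if candidates else None


sample_ids = [
    "composer-2.5.0-airflow-2.5.3",
    "composer-2.6.0-airflow-2.6.3",
    "composer-2.6.0-airflow-2.5.3",
    "composer-1.20.12-airflow-1.10.15",
    "composer-2-airflow-2",
]
```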
Run Airflow CLI commands
You can run Airflow CLI commands in your environment with CloudComposerRunAirflowCLICommandOperator:
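from airflow.providers.google.cloud.operators.cloud_composer import CloudComposerRunAirflowCLICommandOperator
run_airflow_cli_cmd = CloudComposerRunAirflowCLICommandOperator(
    task_id="run_airflow_cli_cmd",
    project_id=PROJECT_ID,
    region=REGION,
    environment_id=ENVIRONMENT_ID,
    command=COMMAND,
)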
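COMMAND is not defined in the snippet above; it holds the Airflow CLI arguments as one string. As we understand it, the Composer API's executeAirflowCommand request takes separate command, subcommand, and parameters fields. The sketch below shows one plausible mapping, assuming the string omits the leading airflow executable and is split shell-style; `split_airflow_command` and the sample value are hypothetical.

```python
import shlex
from typing import List, Tuple

COMMAND = "dags list -o json --verbose"  # hypothetical example value


def split_airflow_command(command: str) -> Tuple[str, str, List[str]]:
    """Split a CLI string into (command, subcommand, parameters)."""
    # shlex.split honours shell quoting, so quoted JSON stays one argument.
    tokens = shlex.split(command)
    if not tokens:
        raise ValueError("command must not be empty")
    cmd = tokens[0]
    subcommand = tokens[1] if len(tokens) > 1 else ""
    parameters = tokens[2:]
    return cmd, subcommand, parameters


parts = split_airflow_command(COMMAND)
```

Splitting with shlex rather than str.split keeps quoted arguments such as a --conf JSON payload intact.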
Alternatively, you can run the same operator in deferrable mode:
defer_run_airflow_cli_cmd = CloudComposerRunAirflowCLICommandOperator(
    task_id="defer_run_airflow_cli_cmd",
    project_id=PROJECT_ID,
    region=REGION,
    environment_id=ENVIRONMENT_ID_ASYNC,
    command=COMMAND,
    deferrable=True,
)
Check whether a DAG run has completed
You can use a sensor to check whether a DAG run in your environment has completed, with CloudComposerDAGRunSensor:
from airflow.providers.google.cloud.sensors.cloud_composer import CloudComposerDAGRunSensor
dag_run_sensor = CloudComposerDAGRunSensor(
    task_id="dag_run_sensor",
    project_id=PROJECT_ID,
    region=REGION,
    environment_id=ENVIRONMENT_ID,
    composer_dag_id="airflow_monitoring",
    allowed_states=["success"],
)
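Judging by its parameters, CloudComposerDAGRunSensor waits until runs of composer_dag_id in the environment reach one of allowed_states. The function below is a simplified, hypothetical sketch of such a check, not the sensor's actual code. It assumes each run is a dict with execution_date (an ISO 8601 timestamp with offset) and state keys, similar to what `airflow dags list-runs` prints with `-o json`, and it treats a window with no runs as not yet complete.

```python
from datetime import datetime, timezone
from typing import Dict, Iterable, Sequence


def dag_runs_in_allowed_states(
    dag_runs: Iterable[Dict[str, str]],
    allowed_states: Sequence[str],
    start: datetime,
    end: datetime,
) -> bool:
    """True if at least one run falls in [start, end) and all such runs are in allowed_states."""
    found = False
    for run in dag_runs:
        logical_date = datetime.fromisoformat(run["execution_date"])
        if not (start <= logical_date < end):
            continue  # outside the window being checked
        found = True
        if run["state"] not in allowed_states:
            return False
    return found


if __name__ == "__main__":
    window = (datetime(2024, 1, 1, tzinfo=timezone.utc), datetime(2024, 1, 2, tzinfo=timezone.utc))
    runs = [{"execution_date": "2024-01-01T06:00:00+00:00", "state": "success"}]
    print(dag_runs_in_allowed_states(runs, ["success"], *window))  # prints True
```

A poke-based sensor would call a check like this on every poke and succeed once it returns True.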
Alternatively, you can run the same sensor in deferrable mode:
defer_dag_run_sensor = CloudComposerDAGRunSensor(
    task_id="defer_dag_run_sensor",
    project_id=PROJECT_ID,
    region=REGION,
    environment_id=ENVIRONMENT_ID_ASYNC,
    composer_dag_id="airflow_monitoring",
    allowed_states=["success"],
    deferrable=True,
)