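Google Cloud Transfer Service Operators¶
Prerequisite Tasks¶
To use these operators, you must do a few things:
Select or create a Cloud Platform project using the Cloud Console.
Enable billing for your project, as described in the Google Cloud documentation.
Enable the API, as described in the Cloud Console documentation.
Install the API libraries via pip:
pip install 'apache-airflow[google]'
For detailed information, see Installation.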
CloudDataTransferServiceCreateJobOperator¶
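Creates a transfer job.
The function accepts dates in two formats:
consistent with the Google API, for example { "year": 2019, "month": 2, "day": 11 }
as a datetime object
The function accepts times in two formats:
consistent with the Google API, for example { "hours": 12, "minutes": 30, "seconds": 0 }
as a time object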
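The relationship between the two accepted formats can be sketched with the standard library alone. The helper names `date_to_api_dict` and `time_to_api_dict` are hypothetical and are not part of Airflow. They only illustrate how a Python date or time value corresponds to the dictionary form shown above.

```python
from datetime import date, time


def date_to_api_dict(value: date) -> dict:
    """Hypothetical helper: express a date in the Google API dictionary form."""
    return {"year": value.year, "month": value.month, "day": value.day}


def time_to_api_dict(value: time) -> dict:
    """Hypothetical helper: express a time of day in the Google API dictionary form."""
    return {"hours": value.hour, "minutes": value.minute, "seconds": value.second}


# The same values used in the format descriptions above.
start_date = date_to_api_dict(date(2019, 2, 11))
start_time = time_to_api_dict(time(12, 30, 0))
print(start_date)
print(start_time)
```

Either form can be placed in the schedule section of the request body. The object form is usually more convenient when the value is computed at DAG parse time.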
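If you want to create a transfer job that copies data from AWS S3, you must have a connection configured. For information about AWS configuration, see Amazon Web Services Connection. The AWS connection to use can be specified with the aws_conn_id parameter.
For parameter definitions, take a look at CloudDataTransferServiceCreateJobOperator.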
Using the operator¶
gcs_to_gcs_transfer_body = {
    DESCRIPTION: "description",
    STATUS: GcpTransferJobsStatus.ENABLED,
    PROJECT_ID: PROJECT_ID_TRANSFER,
    SCHEDULE: {
        SCHEDULE_START_DATE: datetime(2015, 1, 1).date(),
        SCHEDULE_END_DATE: datetime(2030, 1, 1).date(),
        START_TIME_OF_DAY: (datetime.now(tz=timezone.utc) + timedelta(seconds=120)).time(),
    },
    TRANSFER_SPEC: {
        GCS_DATA_SOURCE: {BUCKET_NAME: BUCKET_NAME_SRC},
        GCS_DATA_SINK: {BUCKET_NAME: BUCKET_NAME_DST},
        TRANSFER_OPTIONS: {ALREADY_EXISTING_IN_SINK: True},
    },
}

aws_to_gcs_transfer_body = {
    DESCRIPTION: GCP_DESCRIPTION,
    STATUS: GcpTransferJobsStatus.ENABLED,
    PROJECT_ID: GCP_PROJECT_ID,
    JOB_NAME: GCP_TRANSFER_JOB_NAME,
    SCHEDULE: {
        SCHEDULE_START_DATE: datetime(2015, 1, 1).date(),
        SCHEDULE_END_DATE: datetime(2030, 1, 1).date(),
        START_TIME_OF_DAY: (datetime.now(tz=timezone.utc) + timedelta(minutes=1)).time(),
    },
    TRANSFER_SPEC: {
        AWS_S3_DATA_SOURCE: {BUCKET_NAME: BUCKET_SOURCE_AWS},
        GCS_DATA_SINK: {BUCKET_NAME: BUCKET_TARGET_GCS},
        TRANSFER_OPTIONS: {ALREADY_EXISTING_IN_SINK: True},
    },
}

create_transfer_job_s3_to_gcs = CloudDataTransferServiceCreateJobOperator(
    task_id="create_transfer_job_s3_to_gcs", body=aws_to_gcs_transfer_body
)
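For orientation, a GCS-to-GCS body of this shape can be written out as plain JSON. The sketch below assumes that the imported constants resolve to the field names of the Storage Transfer Service REST `transferJobs` resource (for example `SCHEDULE_START_DATE` to `scheduleStartDate`). The project and bucket names are hypothetical, and the start time is fixed rather than computed from the current time.

```python
import json

# Hypothetical project and bucket names, used only for illustration.
transfer_job = {
    "description": "description",
    "status": "ENABLED",
    "projectId": "my-project",
    "schedule": {
        "scheduleStartDate": {"year": 2015, "month": 1, "day": 1},
        "scheduleEndDate": {"year": 2030, "month": 1, "day": 1},
        "startTimeOfDay": {"hours": 12, "minutes": 30, "seconds": 0},
    },
    "transferSpec": {
        "gcsDataSource": {"bucketName": "source-bucket"},
        "gcsDataSink": {"bucketName": "destination-bucket"},
        # Overwrite objects that already exist in the destination bucket.
        "transferOptions": {"overwriteObjectsAlreadyExistingInSink": True},
    },
}

payload = json.dumps(transfer_job, indent=2)
print(payload)
```

Using the constants rather than literal keys, as the example above does, guards against typos in field names.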
Templating¶
template_fields: Sequence[str] = (
    "body",
    "gcp_conn_id",
    "aws_conn_id",
    "google_impersonation_chain",
)
More information¶
CloudDataTransferServiceDeleteJobOperator¶
Deletes a transfer job.
For parameter definitions, take a look at CloudDataTransferServiceDeleteJobOperator.
Using the operator¶
delete_transfer_job_s3_to_gcs = CloudDataTransferServiceDeleteJobOperator(
    task_id="delete_transfer_job_s3_to_gcs",
    job_name="{{task_instance.xcom_pull('create_transfer_job_s3_to_gcs')['name']}}",
    project_id=GCP_PROJECT_ID,
    trigger_rule=TriggerRule.ALL_DONE,
)
Templating¶
template_fields: Sequence[str] = (
    "job_name",
    "project_id",
    "gcp_conn_id",
    "api_version",
    "google_impersonation_chain",
)
CloudDataTransferServiceRunJobOperator¶
Runs a transfer job.
For parameter definitions, take a look at CloudDataTransferServiceRunJobOperator.
Using the operator¶
run_transfer = CloudDataTransferServiceRunJobOperator(
    task_id="run_transfer",
    job_name="{{task_instance.xcom_pull('create_transfer')['name']}}",
    project_id=PROJECT_ID_TRANSFER,
)
Templating¶
template_fields: Sequence[str] = (
    "job_name",
    "project_id",
    "gcp_conn_id",
    "api_version",
    "google_impersonation_chain",
)
CloudDataTransferServiceUpdateJobOperator¶
Updates a transfer job.
For parameter definitions, take a look at CloudDataTransferServiceUpdateJobOperator.
Using the operator¶
update_body = {
    PROJECT_ID: PROJECT_ID_TRANSFER,
    TRANSFER_JOB: {DESCRIPTION: "description_updated"},
    TRANSFER_JOB_FIELD_MASK: "description",
}

update_transfer = CloudDataTransferServiceUpdateJobOperator(
    task_id="update_transfer",
    job_name="{{task_instance.xcom_pull('create_transfer')['name']}}",
    body=update_body,
)
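The field mask in the update body limits which fields of the job are changed. The following is a simplified model of that behavior, not Airflow or Google code. The function `apply_field_mask` is hypothetical and handles only top-level fields listed in a comma-separated mask.

```python
import copy


def apply_field_mask(job: dict, patch: dict, field_mask: str) -> dict:
    """Hypothetical model: copy only the masked top-level fields from patch into job."""
    updated = copy.deepcopy(job)
    for field in (name.strip() for name in field_mask.split(",")):
        if field and field in patch:
            updated[field] = copy.deepcopy(patch[field])
    return updated


current_job = {"description": "description", "status": "ENABLED"}
patch = {"description": "description_updated", "status": "DISABLED"}

# Only "description" is named in the mask, so "status" keeps its current value.
result = apply_field_mask(current_job, patch, "description")
print(result)
```

In this model, a field that appears in the patch but not in the mask is ignored, which is why the example update body names "description" in both places.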
Templating¶
template_fields: Sequence[str] = (
    "job_name",
    "body",
    "gcp_conn_id",
    "aws_conn_id",
    "google_impersonation_chain",
)
CloudDataTransferServiceCancelOperationOperator¶
Cancels a transfer operation.
For parameter definitions, take a look at CloudDataTransferServiceCancelOperationOperator.
Using the operator¶
cancel_operation = CloudDataTransferServiceCancelOperationOperator(
    task_id="cancel_operation",
    operation_name="{{task_instance.xcom_pull("
    "'wait_for_operation_to_start_2', key='sensed_operations')[0]['name']}}",
)
Templating¶
template_fields: Sequence[str] = (
    "operation_name",
    "gcp_conn_id",
    "api_version",
    "google_impersonation_chain",
)
CloudDataTransferServiceGetOperationOperator¶
Gets a transfer operation. The result is returned to XCom.
For parameter definitions, take a look at CloudDataTransferServiceGetOperationOperator.
Using the operator¶
get_operation = CloudDataTransferServiceGetOperationOperator(
    task_id="get_operation",
    operation_name="{{task_instance.xcom_pull('list_operations')[0]['name']}}",
)
Templating¶
template_fields: Sequence[str] = (
    "operation_name",
    "gcp_conn_id",
    "google_impersonation_chain",
)
CloudDataTransferServiceListOperationsOperator¶
Lists transfer operations. The result is returned to XCom.
For parameter definitions, take a look at CloudDataTransferServiceListOperationsOperator.
Using the operator¶
list_operations = CloudDataTransferServiceListOperationsOperator(
    task_id="list_operations",
    request_filter={
        FILTER_PROJECT_ID: GCP_PROJECT_ID,
        FILTER_JOB_NAMES: ["{{task_instance.xcom_pull('create_transfer_job_s3_to_gcs')['name']}}"],
    },
)
Templating¶
template_fields: Sequence[str] = (
    "request_filter",
    "gcp_conn_id",
    "google_impersonation_chain",
)
CloudDataTransferServicePauseOperationOperator¶
Pauses a transfer operation.
For parameter definitions, take a look at CloudDataTransferServicePauseOperationOperator.
Using the operator¶
pause_operation = CloudDataTransferServicePauseOperationOperator(
    task_id="pause_operation",
    operation_name="{{task_instance.xcom_pull('wait_for_operation_to_start', "
    "key='sensed_operations')[0]['name']}}",
)
Templating¶
template_fields: Sequence[str] = (
    "operation_name",
    "gcp_conn_id",
    "api_version",
    "google_impersonation_chain",
)
CloudDataTransferServiceResumeOperationOperator¶
Resumes a transfer operation.
For parameter definitions, take a look at CloudDataTransferServiceResumeOperationOperator.
Using the operator¶
resume_operation = CloudDataTransferServiceResumeOperationOperator(
    task_id="resume_operation",
    operation_name="{{task_instance.xcom_pull('get_operation')['name']}}",
)
Templating¶
template_fields: Sequence[str] = (
    "operation_name",
    "gcp_conn_id",
    "api_version",
    "google_impersonation_chain",
)
CloudDataTransferServiceJobStatusSensor¶
Waits for at least one operation belonging to the job to reach the expected status.
For parameter definitions, take a look at CloudDataTransferServiceJobStatusSensor.
Using the operator¶
wait_for_operation_to_end = CloudDataTransferServiceJobStatusSensor(
    task_id="wait_for_operation_to_end",
    job_name="{{task_instance.xcom_pull('create_transfer_job_s3_to_gcs')['name']}}",
    project_id=GCP_PROJECT_ID,
    expected_statuses={GcpTransferOperationStatus.SUCCESS},
    poke_interval=WAIT_FOR_OPERATION_POKE_INTERVAL,
)
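The condition the sensor waits for can be sketched as a plain predicate. This is a simplified illustration, not the sensor's actual implementation. The function name `any_operation_has_status` and the sample operations are hypothetical, and the sketch assumes each operation carries its state under `metadata.status`, as in the Storage Transfer Service operation resource.

```python
def any_operation_has_status(operations: list, expected_statuses: set) -> bool:
    """Hypothetical check: True if at least one operation is in an expected status."""
    return any(
        operation.get("metadata", {}).get("status") in expected_statuses
        for operation in operations
    )


# Hypothetical operations listed for one transfer job.
sample_operations = [
    {"name": "transferOperations/op-1", "metadata": {"status": "IN_PROGRESS"}},
    {"name": "transferOperations/op-2", "metadata": {"status": "SUCCESS"}},
]

done = any_operation_has_status(sample_operations, {"SUCCESS"})
paused = any_operation_has_status(sample_operations, {"PAUSED"})
print(done, paused)
```

A sensor built on such a check would evaluate it on every poke and succeed the first time it returns True.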
Templating¶
template_fields: Sequence[str] = (
    "job_name",
    "impersonation_chain",
)
CloudDataTransferServiceGCSToGCSOperator¶
Copies data from one GCS bucket to another GCS bucket.
For parameter definitions, take a look at CloudDataTransferServiceGCSToGCSOperator.
Using the operator¶
transfer_gcs_to_gcs = CloudDataTransferServiceGCSToGCSOperator(
    task_id="transfer_gcs_to_gcs",
    source_bucket=BUCKET_NAME_SRC,
    source_path=FILE_URI,
    destination_bucket=BUCKET_NAME_DST,
    destination_path=FILE_URI,
    wait=True,
)
Templating¶
template_fields: Sequence[str] = (
    "gcp_conn_id",
    "source_bucket",
    "destination_bucket",
    "source_path",
    "destination_path",
    "description",
    "object_conditions",
    "google_impersonation_chain",
)