Google Cloud Transfer Service Operators
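Prerequisite Tasks
To use these operators, you must do a few things:
Select or create a Cloud Platform project using the Cloud Console.
Enable billing for your project, as described in the Google Cloud documentation.
Enable the API, as described in the Cloud Console documentation.
Install the API libraries via pip.
pip install 'apache-airflow[google]'
Detailed information is available at Installation.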
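As a quick sanity check after installing, the sketch below tests whether the Google provider package can be located in the current Python environment. It uses only the standard library; the helper name `provider_installed` is hypothetical.

```python
import importlib.util


def provider_installed(module: str = "airflow.providers.google") -> bool:
    """Return True if ``module`` can be found on the import path.

    find_spec imports the parent packages of a dotted name (here ``airflow``
    and ``airflow.providers``) in order to resolve it.
    """
    try:
        return importlib.util.find_spec(module) is not None
    except ModuleNotFoundError:
        # A parent package is missing, e.g. Airflow itself is not installed.
        return False


if __name__ == "__main__":
    print("Google provider installed:", provider_installed())
```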
CloudDataTransferServiceCreateJobOperator
Creates a transfer job.
The function accepts dates in two formats:
consistent with the Google API: { "year": 2019, "month": 2, "day": 11 }
as a datetime object
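To illustrate how the two date formats correspond, here is a minimal sketch that turns a Python `date` (or `datetime`, which is a subclass of `date`) into the dictionary form used by the Google API. The helper `date_to_api_dict` is hypothetical; the operator performs its own conversion.

```python
from datetime import date, datetime


def date_to_api_dict(value: date) -> dict:
    """Convert a date or datetime into the Google API {"year", "month", "day"} form."""
    # datetime is a subclass of date, so both types expose year/month/day.
    return {"year": value.year, "month": value.month, "day": value.day}


print(date_to_api_dict(date(2019, 2, 11)))  # {'year': 2019, 'month': 2, 'day': 11}
print(date_to_api_dict(datetime(2019, 2, 11, 8, 30)))  # {'year': 2019, 'month': 2, 'day': 11}
```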
The function accepts times in two formats:
consistent with the Google API: { "hours": 12, "minutes": 30, "seconds": 0 }
as a time object
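The same correspondence for times can be sketched as follows. The helper `time_to_api_dict` is hypothetical, and it drops sub-second precision for simplicity.

```python
from datetime import time


def time_to_api_dict(value: time) -> dict:
    """Convert a time into the Google API {"hours", "minutes", "seconds"} form."""
    # Microseconds are ignored in this sketch.
    return {"hours": value.hour, "minutes": value.minute, "seconds": value.second}


print(time_to_api_dict(time(12, 30)))  # {'hours': 12, 'minutes': 30, 'seconds': 0}
```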
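If you want to create a transfer job that copies data from AWS S3, you must have a connection configured. Information about configuring AWS is available in Amazon Web Services Connection. The selected AWS connection can be indicated by the parameter aws_conn_id.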
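One way to provide that connection is an environment variable named `AIRFLOW_CONN_<CONN_ID>`. The sketch below builds such a variable in the JSON connection format accepted by recent Airflow versions. It assumes the access key ID goes in `login` and the secret access key in `password`; the helper name and the credential values are placeholders.

```python
import json


def aws_connection_env(conn_id: str, access_key_id: str, secret_access_key: str) -> tuple:
    """Return (variable_name, value) for an AWS connection defined via environment variable."""
    name = f"AIRFLOW_CONN_{conn_id.upper()}"
    value = json.dumps(
        {"conn_type": "aws", "login": access_key_id, "password": secret_access_key}
    )
    return name, value


# Placeholder credentials; never hard-code real keys.
name, value = aws_connection_env("aws_default", "EXAMPLE_KEY_ID", "EXAMPLE_SECRET")
print(f"{name}='{value}'")
```

Set the printed variable in the environment of the scheduler and workers, then pass `aws_conn_id="aws_default"` to the operator.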
For parameter definition, take a look at CloudDataTransferServiceCreateJobOperator.
Using the operator
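from datetime import datetime, timedelta, timezone

from airflow.providers.google.cloud.hooks.cloud_storage_transfer_service import (
    ALREADY_EXISTING_IN_SINK,
    AWS_S3_DATA_SOURCE,
    BUCKET_NAME,
    DESCRIPTION,
    GCS_DATA_SINK,
    GCS_DATA_SOURCE,
    JOB_NAME,
    PROJECT_ID,
    SCHEDULE,
    SCHEDULE_END_DATE,
    SCHEDULE_START_DATE,
    START_TIME_OF_DAY,
    STATUS,
    TRANSFER_OPTIONS,
    TRANSFER_SPEC,
    GcpTransferJobsStatus,
)
from airflow.providers.google.cloud.operators.cloud_storage_transfer_service import (
    CloudDataTransferServiceCreateJobOperator,
)

# PROJECT_ID_TRANSFER, GCP_PROJECT_ID, GCP_DESCRIPTION, GCP_TRANSFER_JOB_NAME and the
# BUCKET_* values are placeholders defined elsewhere in the example DAG.

# Copy between two GCS buckets; start time of day is two minutes from now (UTC).
gcs_to_gcs_transfer_body = {
    DESCRIPTION: "description",
    STATUS: GcpTransferJobsStatus.ENABLED,
    PROJECT_ID: PROJECT_ID_TRANSFER,
    SCHEDULE: {
        SCHEDULE_START_DATE: datetime(2015, 1, 1).date(),
        SCHEDULE_END_DATE: datetime(2030, 1, 1).date(),
        START_TIME_OF_DAY: (datetime.now(tz=timezone.utc) + timedelta(seconds=120)).time(),
    },
    TRANSFER_SPEC: {
        GCS_DATA_SOURCE: {BUCKET_NAME: BUCKET_NAME_SRC},
        GCS_DATA_SINK: {BUCKET_NAME: BUCKET_NAME_DST},
        TRANSFER_OPTIONS: {ALREADY_EXISTING_IN_SINK: True},
    },
}

# Copy from an S3 bucket to a GCS bucket; requires the AWS connection described above.
aws_to_gcs_transfer_body = {
    DESCRIPTION: GCP_DESCRIPTION,
    STATUS: GcpTransferJobsStatus.ENABLED,
    PROJECT_ID: GCP_PROJECT_ID,
    JOB_NAME: GCP_TRANSFER_JOB_NAME,
    SCHEDULE: {
        SCHEDULE_START_DATE: datetime(2015, 1, 1).date(),
        SCHEDULE_END_DATE: datetime(2030, 1, 1).date(),
        START_TIME_OF_DAY: (datetime.now(tz=timezone.utc) + timedelta(minutes=1)).time(),
    },
    TRANSFER_SPEC: {
        AWS_S3_DATA_SOURCE: {BUCKET_NAME: BUCKET_SOURCE_AWS},
        GCS_DATA_SINK: {BUCKET_NAME: BUCKET_TARGET_GCS},
        TRANSFER_OPTIONS: {ALREADY_EXISTING_IN_SINK: True},
    },
}

create_transfer_job_s3_to_gcs = CloudDataTransferServiceCreateJobOperator(
    task_id="create_transfer_job_s3_to_gcs", body=aws_to_gcs_transfer_body
)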
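The constants in the example (DESCRIPTION, TRANSFER_SPEC, and so on) are names exported by the provider's hook module. As a sketch, assuming they resolve to the Storage Transfer API's camelCase field names, the S3-to-GCS body corresponds roughly to the plain dictionary below. The project, job name, and bucket names are placeholders. No AWS credentials appear here because, as described above, they come from the connection named by aws_conn_id.

```python
from datetime import datetime, timedelta, timezone

# Placeholder values; replace with your own project, job name and buckets.
PROJECT = "my-project"
JOB = "transferJobs/example-s3-to-gcs"
start = (datetime.now(tz=timezone.utc) + timedelta(minutes=1)).time()

aws_to_gcs_body = {
    "description": "example S3 to GCS transfer",
    "status": "ENABLED",
    "projectId": PROJECT,
    "name": JOB,
    "schedule": {
        # Dates and times already converted to the Google API dictionary form.
        "scheduleStartDate": {"year": 2015, "month": 1, "day": 1},
        "scheduleEndDate": {"year": 2030, "month": 1, "day": 1},
        "startTimeOfDay": {"hours": start.hour, "minutes": start.minute, "seconds": start.second},
    },
    "transferSpec": {
        "awsS3DataSource": {"bucketName": "my-s3-bucket"},
        "gcsDataSink": {"bucketName": "my-gcs-bucket"},
        "transferOptions": {"overwriteObjectsAlreadyExistingInSink": True},
    },
}
```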
Templating
template_fields: Sequence[str] = (
    "body",
    "gcp_conn_id",
    "aws_conn_id",
    "google_impersonation_chain",
)
CloudDataTransferServiceDeleteJobOperator
Deletes a transfer job.
For parameter definition, take a look at CloudDataTransferServiceDeleteJobOperator.
Using the operator
delete_transfer_job_s3_to_gcs = CloudDataTransferServiceDeleteJobOperator(
    task_id="delete_transfer_job_s3_to_gcs",
    job_name="{{task_instance.xcom_pull('create_transfer_job_s3_to_gcs')['name']}}",
    project_id=GCP_PROJECT_ID,
    trigger_rule=TriggerRule.ALL_DONE,
)
Templating
template_fields: Sequence[str] = (
    "job_name",
    "project_id",
    "gcp_conn_id",
    "api_version",
    "google_impersonation_chain",
)
CloudDataTransferServiceRunJobOperator
Runs a transfer job.
For parameter definition, take a look at CloudDataTransferServiceRunJobOperator.
Using the operator
run_transfer = CloudDataTransferServiceRunJobOperator(
    task_id="run_transfer",
    job_name="{{task_instance.xcom_pull('create_transfer')['name']}}",
    project_id=PROJECT_ID_TRANSFER,
)
Templating
template_fields: Sequence[str] = (
    "job_name",
    "project_id",
    "gcp_conn_id",
    "api_version",
    "google_impersonation_chain",
)
CloudDataTransferServiceUpdateJobOperator
Updates a transfer job.
For parameter definition, take a look at CloudDataTransferServiceUpdateJobOperator.
Using the operator
update_body = {
    PROJECT_ID: PROJECT_ID_TRANSFER,
    TRANSFER_JOB: {DESCRIPTION: "description_updated"},
    TRANSFER_JOB_FIELD_MASK: "description",
}
update_transfer = CloudDataTransferServiceUpdateJobOperator(
    task_id="update_transfer",
    job_name="{{task_instance.xcom_pull('create_transfer')['name']}}",
    body=update_body,
)
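The TRANSFER_JOB_FIELD_MASK entry tells the API which fields of the transfer job to overwrite. Here is a sketch of a hypothetical helper that derives the mask from the changes being made. It assumes PROJECT_ID, TRANSFER_JOB and TRANSFER_JOB_FIELD_MASK resolve to the Storage Transfer API's `projectId`, `transferJob` and `updateTransferJobFieldMask` fields.

```python
def build_update_body(project_id: str, changes: dict) -> dict:
    """Build an update request body whose field mask lists exactly the changed top-level fields."""
    return {
        "projectId": project_id,
        "transferJob": dict(changes),
        # Multiple field-mask paths are joined with commas in the JSON request.
        "updateTransferJobFieldMask": ",".join(sorted(changes)),
    }


body = build_update_body("my-project", {"description": "description_updated", "status": "DISABLED"})
print(body["updateTransferJobFieldMask"])  # description,status
```

Deriving the mask from the same dictionary keeps the job fields and the mask from drifting apart when the update body is edited later.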
Templating
template_fields: Sequence[str] = (
    "job_name",
    "body",
    "gcp_conn_id",
    "aws_conn_id",
    "google_impersonation_chain",
)
CloudDataTransferServiceCancelOperationOperator
Cancels a transfer operation.
For parameter definition, take a look at CloudDataTransferServiceCancelOperationOperator.
Using the operator
cancel_operation = CloudDataTransferServiceCancelOperationOperator(
    task_id="cancel_operation",
    operation_name="{{task_instance.xcom_pull("
    "'wait_for_operation_to_start_2', key='sensed_operations')[0]['name']}}",
)
Templating
template_fields: Sequence[str] = (
    "operation_name",
    "gcp_conn_id",
    "api_version",
    "google_impersonation_chain",
)
CloudDataTransferServiceGetOperationOperator
Gets a transfer operation. The result is returned to XCom.
For parameter definition, take a look at CloudDataTransferServiceGetOperationOperator.
Using the operator
get_operation = CloudDataTransferServiceGetOperationOperator(
    task_id="get_operation",
    operation_name="{{task_instance.xcom_pull('list_operations')[0]['name']}}",
)
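The operation returned to XCom can be inspected in a downstream task. Here is a minimal sketch. It assumes the value is a long-running operation whose `metadata` holds a transfer operation with `status` and `counters` fields. The counters are 64-bit integers, which JSON encodes as strings. The helper name and the sample data are hypothetical.

```python
def summarize_operation(operation: dict) -> str:
    """Return a one-line summary of a transfer operation resource."""
    metadata = operation.get("metadata", {})
    counters = metadata.get("counters", {})
    # int64 counters arrive as strings in JSON; missing counters count as zero.
    objects = int(counters.get("objectsCopiedToSink", 0))
    size = int(counters.get("bytesCopiedToSink", 0))
    status = metadata.get("status", "UNKNOWN")
    return f"{operation.get('name')}: {status}, {objects} objects, {size} bytes"


sample = {
    "name": "transferOperations/example-op",
    "metadata": {
        "status": "SUCCESS",
        "counters": {"objectsCopiedToSink": "3", "bytesCopiedToSink": "2048"},
    },
}
print(summarize_operation(sample))  # transferOperations/example-op: SUCCESS, 3 objects, 2048 bytes
```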
Templating
template_fields: Sequence[str] = (
    "operation_name",
    "gcp_conn_id",
    "google_impersonation_chain",
)
CloudDataTransferServiceListOperationsOperator
Lists transfer operations. The result is returned to XCom.
For parameter definition, take a look at CloudDataTransferServiceListOperationsOperator.
Using the operator
list_operations = CloudDataTransferServiceListOperationsOperator(
    task_id="list_operations",
    request_filter={
        FILTER_PROJECT_ID: GCP_PROJECT_ID,
        FILTER_JOB_NAMES: ["{{task_instance.xcom_pull('create_transfer_job_s3_to_gcs')['name']}}"],
    },
)
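The `transferOperations.list` method of the Storage Transfer API takes its filter as JSON text. Here is a sketch of that serialization step. It assumes FILTER_PROJECT_ID and FILTER_JOB_NAMES hold the keys `project_id` and `job_names`; check the constants in the provider's hook module. The helper is hypothetical.

```python
import json


def build_operations_filter(project_id: str, job_names: list) -> str:
    """Serialize a transfer-operations filter to JSON text."""
    # Key names are assumed to match FILTER_PROJECT_ID and FILTER_JOB_NAMES.
    return json.dumps({"project_id": project_id, "job_names": list(job_names)})


print(build_operations_filter("my-project", ["transferJobs/example"]))
# {"project_id": "my-project", "job_names": ["transferJobs/example"]}
```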
Templating
template_fields: Sequence[str] = (
    "request_filter",
    "gcp_conn_id",
    "google_impersonation_chain",
)
CloudDataTransferServicePauseOperationOperator
Pauses a transfer operation.
For parameter definition, take a look at CloudDataTransferServicePauseOperationOperator.
Using the operator
pause_operation = CloudDataTransferServicePauseOperationOperator(
    task_id="pause_operation",
    operation_name="{{task_instance.xcom_pull('wait_for_operation_to_start', "
    "key='sensed_operations')[0]['name']}}",
)
Templating
template_fields: Sequence[str] = (
    "operation_name",
    "gcp_conn_id",
    "api_version",
    "google_impersonation_chain",
)
CloudDataTransferServiceResumeOperationOperator
Resumes a transfer operation.
For parameter definition, take a look at CloudDataTransferServiceResumeOperationOperator.
Using the operator
resume_operation = CloudDataTransferServiceResumeOperationOperator(
    task_id="resume_operation",
    operation_name="{{task_instance.xcom_pull('get_operation')['name']}}",
)
Templating
template_fields: Sequence[str] = (
    "operation_name",
    "gcp_conn_id",
    "api_version",
    "google_impersonation_chain",
)
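CloudDataTransferServiceJobStatusSensor
Waits until at least one operation belonging to the job has one of the expected statuses.
For parameter definition, take a look at CloudDataTransferServiceJobStatusSensor.
Using the sensor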
wait_for_operation_to_end = CloudDataTransferServiceJobStatusSensor(
    task_id="wait_for_operation_to_end",
    job_name="{{task_instance.xcom_pull('create_transfer_job_s3_to_gcs')['name']}}",
    project_id=GCP_PROJECT_ID,
    expected_statuses={GcpTransferOperationStatus.SUCCESS},
    poke_interval=WAIT_FOR_OPERATION_POKE_INTERVAL,
)
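The sensor's success condition can be sketched as a pure function over a job's operations. This assumes each operation dictionary carries its status under `metadata.status`, as in the Storage Transfer API's operation resources. The fail-fast check for FAILED and ABORTED is a design choice of this sketch, so that a broken transfer does not keep poking until the timeout. The helper name is hypothetical.

```python
NEGATIVE_STATUSES = {"FAILED", "ABORTED"}


def operations_reached(operations: list, expected_statuses: set) -> bool:
    """Return True if at least one operation has a status in expected_statuses.

    Raises RuntimeError if an operation ended in a failure status that was not expected.
    """
    statuses = {op["metadata"]["status"] for op in operations}
    if statuses & expected_statuses:
        return True
    unexpected_failures = (statuses & NEGATIVE_STATUSES) - expected_statuses
    if unexpected_failures:
        raise RuntimeError(f"Transfer operation ended with {sorted(unexpected_failures)}")
    # Nothing matched yet; a sensor would poke again after poke_interval.
    return False
```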
Templating
template_fields: Sequence[str] = (
    "job_name",
    "impersonation_chain",
)
CloudDataTransferServiceGCSToGCSOperator
Copies data from one GCS bucket to another.
For parameter definition, take a look at CloudDataTransferServiceGCSToGCSOperator.
Using the operator
transfer_gcs_to_gcs = CloudDataTransferServiceGCSToGCSOperator(
    task_id="transfer_gcs_to_gcs",
    source_bucket=BUCKET_NAME_SRC,
    source_path=FILE_URI,
    destination_bucket=BUCKET_NAME_DST,
    destination_path=FILE_URI,
    wait=True,
)
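For orientation, the `transferSpec` that corresponds to the bucket and path arguments above can be sketched as follows. It assumes the API's `gcsDataSource` and `gcsDataSink` objects, each with a `bucketName` and an optional `path` prefix. This hypothetical helper is not the operator's own code.

```python
from typing import Optional


def gcs_to_gcs_transfer_spec(
    source_bucket: str,
    destination_bucket: str,
    source_path: Optional[str] = None,
    destination_path: Optional[str] = None,
) -> dict:
    """Build a GCS-to-GCS transferSpec, adding path prefixes only when given."""
    source = {"bucketName": source_bucket}
    sink = {"bucketName": destination_bucket}
    if source_path:
        source["path"] = source_path
    if destination_path:
        sink["path"] = destination_path
    return {"gcsDataSource": source, "gcsDataSink": sink}


print(gcs_to_gcs_transfer_spec("src-bucket", "dst-bucket", "data/"))
# {'gcsDataSource': {'bucketName': 'src-bucket', 'path': 'data/'}, 'gcsDataSink': {'bucketName': 'dst-bucket'}}
```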
Templating
template_fields: Sequence[str] = (
    "gcp_conn_id",
    "source_bucket",
    "destination_bucket",
    "source_path",
    "destination_path",
    "description",
    "object_conditions",
    "google_impersonation_chain",
)