Google Cloud SQL Operators

Prerequisite Tasks

To use these operators, you must do a few things:

CloudSQLCreateInstanceDatabaseOperator

Creates a new database inside a Cloud SQL instance.

For parameter definitions, take a look at CloudSQLCreateInstanceDatabaseOperator.

Using the operator

You can create the operator with or without a project ID. If the project ID is missing, it will be retrieved from the Google Cloud connection used. Both variants are shown:

tests/system/providers/google/cloud/cloud_sql/example_cloud_sql.py[source]

sql_db_create_task = CloudSQLCreateInstanceDatabaseOperator(
    body=db_create_body, instance=INSTANCE_NAME, task_id="sql_db_create_task"
)

Example request body:

tests/system/providers/google/cloud/cloud_sql/example_cloud_sql.py[source]

db_create_body = {"instance": INSTANCE_NAME, "name": DB_NAME, "project": PROJECT_ID}

Templating

template_fields: Sequence[str] = (
    "project_id",
    "instance",
    "body",
    "gcp_conn_id",
    "api_version",
    "impersonation_chain",
)

More information

See the Google Cloud SQL API documentation on how to create a new database inside an instance.

CloudSQLDeleteInstanceDatabaseOperator

Deletes a database from a Cloud SQL instance.

For parameter definitions, take a look at CloudSQLDeleteInstanceDatabaseOperator.

Using the operator

You can create the operator with or without a project ID. If the project ID is missing, it will be retrieved from the Google Cloud connection used. Both variants are shown:

tests/system/providers/google/cloud/cloud_sql/example_cloud_sql.py[source]

sql_db_delete_task = CloudSQLDeleteInstanceDatabaseOperator(
    instance=INSTANCE_NAME,
    database=DB_NAME,
    task_id="sql_db_delete_task",
    trigger_rule=TriggerRule.ALL_DONE,
)

Templating

template_fields: Sequence[str] = (
    "project_id",
    "instance",
    "database",
    "gcp_conn_id",
    "api_version",
    "impersonation_chain",
)

More information

See the Google Cloud SQL API documentation on how to delete a database.

CloudSQLPatchInstanceDatabaseOperator

Updates a resource containing information about a database inside a Cloud SQL instance using patch semantics. See: https://cloud.google.com/sql/docs/mysql/admin-api/how-tos/performance#patch

For parameter definitions, take a look at CloudSQLPatchInstanceDatabaseOperator.

Using the operator

You can create the operator with or without a project ID. If the project ID is missing, it will be retrieved from the Google Cloud connection used. Both variants are shown:

tests/system/providers/google/cloud/cloud_sql/example_cloud_sql.py[source]

sql_db_patch_task = CloudSQLPatchInstanceDatabaseOperator(
    body=db_patch_body,
    instance=INSTANCE_NAME,
    database=DB_NAME,
    task_id="sql_db_patch_task",
)

Example request body:

tests/system/providers/google/cloud/cloud_sql/example_cloud_sql.py[source]

db_patch_body = {"charset": "utf16", "collation": "utf16_general_ci"}

Templating

template_fields: Sequence[str] = (
    "project_id",
    "instance",
    "body",
    "database",
    "gcp_conn_id",
    "api_version",
    "impersonation_chain",
)

More information

See the Google Cloud SQL API documentation to update a database.

CloudSQLDeleteInstanceOperator

Deletes a Cloud SQL instance in Google Cloud.

It is also used for deleting read and failover replicas.

For parameter definitions, take a look at CloudSQLDeleteInstanceOperator.

Using the operator

You can create the operator with or without a project ID. If the project ID is missing, it will be retrieved from the Google Cloud connection used. Both variants are shown:

tests/system/providers/google/cloud/cloud_sql/example_cloud_sql.py[source]

sql_instance_delete_task = CloudSQLDeleteInstanceOperator(
    instance=INSTANCE_NAME, task_id="sql_instance_delete_task", trigger_rule=TriggerRule.ALL_DONE
)

Note: If the instance has read or failover replicas, you need to delete them before you delete the primary instance. Replicas are deleted in the same way as the primary instance:

tests/system/providers/google/cloud/cloud_sql/example_cloud_sql.py[source]

sql_instance_failover_replica_delete_task = CloudSQLDeleteInstanceOperator(
    instance=FAILOVER_REPLICA_NAME,
    task_id="sql_instance_failover_replica_delete_task",
    trigger_rule=TriggerRule.ALL_DONE,
)

sql_instance_read_replica_delete_task = CloudSQLDeleteInstanceOperator(
    instance=READ_REPLICA_NAME,
    task_id="sql_instance_read_replica_delete_task",
    trigger_rule=TriggerRule.ALL_DONE,
)

Templating

template_fields: Sequence[str] = (
    "project_id",
    "instance",
    "gcp_conn_id",
    "api_version",
    "impersonation_chain",
)

More information

See the Google Cloud SQL API documentation to delete a SQL instance.

CloudSQLExportInstanceOperator

Exports data from a Cloud SQL instance to a Cloud Storage bucket as a SQL dump or CSV file.

Note

This operator is idempotent. If executed multiple times with the same export file URI, the export file in GCS will simply be overwritten.

For parameter definitions, take a look at CloudSQLExportInstanceOperator.

Arguments

Example body defining the export operation:

tests/system/providers/google/cloud/cloud_sql/example_cloud_sql.py[source]

export_body = {
    "exportContext": {
        "fileType": "sql",
        "uri": FILE_URI,
        "sqlExportOptions": {"schemaOnly": False},
        "offload": True,
    }
}
export_body_deferrable = {
    "exportContext": {
        "fileType": "sql",
        "uri": FILE_URI_DEFERRABLE,
        "sqlExportOptions": {"schemaOnly": False},
        "offload": True,
    }
}
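The bodies above request a SQL dump. The Admin API's exportContext also supports CSV export via csvExportOptions; a minimal sketch might look like the following, where the destination URI and the query are illustrative placeholders, not values from the example DAG:

```python
# Hypothetical CSV export body for CloudSQLExportInstanceOperator.
# The bucket URI and SELECT query are placeholders.
csv_export_body = {
    "exportContext": {
        "fileType": "csv",
        "uri": "gs://example-bucket/exports/my_table.csv",
        "csvExportOptions": {"selectQuery": "SELECT * FROM my_table"},
    }
}
```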

Using the operator

You can create the operator with or without a project ID. If the project ID is missing, it will be retrieved from the Google Cloud connection used. Both variants are shown:

tests/system/providers/google/cloud/cloud_sql/example_cloud_sql.py[source]

sql_export_task = CloudSQLExportInstanceOperator(
    body=export_body, instance=INSTANCE_NAME, task_id="sql_export_task"
)

Also, for all of these actions, you can use the operator in deferrable mode:

tests/system/providers/google/cloud/cloud_sql/example_cloud_sql.py[source]

sql_export_def_task = CloudSQLExportInstanceOperator(
    body=export_body_deferrable,
    instance=INSTANCE_NAME,
    task_id="sql_export_def_task",
    deferrable=True,
)

Templating

template_fields: Sequence[str] = (
    "project_id",
    "instance",
    "body",
    "gcp_conn_id",
    "api_version",
    "impersonation_chain",
)

More information

See the Google Cloud SQL API documentation to export data.

Troubleshooting

If you receive an "Unauthorized" error in Google Cloud, make sure that the service account of the Cloud SQL instance is authorized to write to the selected GCS bucket.

It is not the service account configured in Airflow that communicates with GCS, but the service account of the particular Cloud SQL instance.

To grant the service account appropriate WRITE permissions for the GCS bucket, you can use the GCSBucketCreateAclEntryOperator, as shown in the example:

tests/system/providers/google/cloud/cloud_sql/example_cloud_sql.py[source]

sql_gcp_add_bucket_permission_task = GCSBucketCreateAclEntryOperator(
    entity=f"user-{service_account_email}",
    role="WRITER",
    bucket=file_url_split[1],  # netloc (bucket)
    task_id="sql_gcp_add_bucket_permission_task",
)
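The `file_url_split[1]` indexing above relies on splitting the export URI into its components. Assuming FILE_URI is a gs:// URL (the value below is a placeholder), the split can be done with the standard library:

```python
from urllib.parse import urlsplit

# Hypothetical export URI; in the example DAG, FILE_URI points at the export bucket.
FILE_URI = "gs://example-bucket/exports/dump.sql"

file_url_split = urlsplit(FILE_URI)
print(file_url_split[1])      # netloc (bucket): example-bucket
print(file_url_split[2][1:])  # path without the leading "/": exports/dump.sql
```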

CloudSQLImportInstanceOperator

Imports data into a Cloud SQL instance from a SQL dump or CSV file in Cloud Storage.

CSV import:

This operator is NOT idempotent for a CSV import. If the same file is imported multiple times, the imported data will be duplicated in the database. Moreover, if there are any unique constraints, the duplicate import may result in an error.
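For comparison with the SQL import body shown further below, a CSV import body (per the Admin API's csvImportOptions) might look like the following sketch; the URI, database, and table names are illustrative placeholders:

```python
# Hypothetical CSV import body for CloudSQLImportInstanceOperator.
csv_import_body = {
    "importContext": {
        "fileType": "csv",
        "uri": "gs://example-bucket/imports/data.csv",
        "database": "example_db",
        "csvImportOptions": {"table": "example_table"},
    }
}
```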

SQL import:

This operator is idempotent for a SQL import if it was also exported by Cloud SQL. The exported SQL contains "DROP TABLE IF EXISTS" statements for all tables to be imported.

If the import file was generated in a different way, idempotence is not guaranteed. It has to be ensured on the SQL file level.

For parameter definitions, take a look at CloudSQLImportInstanceOperator.

Arguments

Example body defining the import operation:

tests/system/providers/google/cloud/cloud_sql/example_cloud_sql.py[source]

import_body = {"importContext": {"fileType": "sql", "uri": FILE_URI}}

Using the operator

You can create the operator with or without a project ID. If the project ID is missing, it will be retrieved from the Google Cloud connection used. Both variants are shown:

tests/system/providers/google/cloud/cloud_sql/example_cloud_sql.py[source]

sql_import_task = CloudSQLImportInstanceOperator(
    body=import_body, instance=INSTANCE_NAME, task_id="sql_import_task"
)

Templating

template_fields: Sequence[str] = (
    "project_id",
    "instance",
    "body",
    "gcp_conn_id",
    "api_version",
    "impersonation_chain",
)

More information

See the Google Cloud SQL API documentation to import data.

Troubleshooting

If you receive an "Unauthorized" error in Google Cloud, make sure that the service account of the Cloud SQL instance is authorized to read from the selected GCS object.

It is not the service account configured in Airflow that communicates with GCS, but the service account of the particular Cloud SQL instance.

To grant the service account appropriate READ permissions for the GCS object, you can use the GCSObjectCreateAclEntryOperator, as shown in the example:

tests/system/providers/google/cloud/cloud_sql/example_cloud_sql.py[source]

sql_gcp_add_object_permission_task = GCSObjectCreateAclEntryOperator(
    entity=f"user-{service_account_email}",
    role="READER",
    bucket=file_url_split[1],  # netloc (bucket)
    object_name=file_url_split[2][1:],  # path (strip first '/')
    task_id="sql_gcp_add_object_permission_task",
)

CloudSQLCreateInstanceOperator

Creates a new Cloud SQL instance in Google Cloud.

It is also used for creating read replicas.

For parameter definitions, take a look at CloudSQLCreateInstanceOperator.

If an instance with the same name exists, no action will be taken and the operator will succeed.

Arguments

Example body defining an instance with a failover replica:

tests/system/providers/google/cloud/cloud_sql/example_cloud_sql.py[source]

body = {
    "name": INSTANCE_NAME,
    "settings": {
        "tier": "db-n1-standard-1",
        "backupConfiguration": {"binaryLogEnabled": True, "enabled": True, "startTime": "05:00"},
        "activationPolicy": "ALWAYS",
        "dataDiskSizeGb": 30,
        "dataDiskType": "PD_SSD",
        "databaseFlags": [],
        "ipConfiguration": {
            "ipv4Enabled": True,
            "requireSsl": True,
        },
        "locationPreference": {"zone": "europe-west4-a"},
        "maintenanceWindow": {"hour": 5, "day": 7, "updateTrack": "canary"},
        "pricingPlan": "PER_USE",
        "replicationType": "ASYNCHRONOUS",
        "storageAutoResize": True,
        "storageAutoResizeLimit": 0,
        "userLabels": {"my-key": "my-value"},
    },
    "failoverReplica": {"name": FAILOVER_REPLICA_NAME},
    "databaseVersion": "MYSQL_5_7",
    "region": "europe-west4",
}

Example body defining a read replica for the instance above:

tests/system/providers/google/cloud/cloud_sql/example_cloud_sql.py[source]

read_replica_body = {
    "name": READ_REPLICA_NAME,
    "settings": {
        "tier": "db-n1-standard-1",
    },
    "databaseVersion": "MYSQL_5_7",
    "region": "europe-west4",
    "masterInstanceName": INSTANCE_NAME,
}

Note: A failover replica is created together with the instance in a single task. Read replicas need to be created in separate tasks.

Using the operator

You can create the operator with or without a project ID. If the project ID is missing, it will be retrieved from the Google Cloud connection used. Both variants are shown:

tests/system/providers/google/cloud/cloud_sql/example_cloud_sql.py[source]

sql_instance_create_task = CloudSQLCreateInstanceOperator(
    body=body, instance=INSTANCE_NAME, task_id="sql_instance_create_task"
)

Templating

template_fields: Sequence[str] = (
    "project_id",
    "instance",
    "body",
    "gcp_conn_id",
    "api_version",
    "impersonation_chain",
)

More information

See the Google Cloud SQL API documentation to create an instance.

CloudSQLInstancePatchOperator

Updates the settings of a Cloud SQL instance in Google Cloud (partial update).

For parameter definitions, take a look at CloudSQLInstancePatchOperator.

It is a partial update, so only values for the settings specified in the body will be set / updated. The rest of the existing instance's configuration will remain unchanged.

Arguments

Example body defining the instance:

tests/system/providers/google/cloud/cloud_sql/example_cloud_sql.py[source]

patch_body = {
    "name": INSTANCE_NAME,
    "settings": {
        "dataDiskSizeGb": 35,
        "maintenanceWindow": {"hour": 3, "day": 6, "updateTrack": "canary"},
        "userLabels": {"my-key-patch": "my-value-patch"},
    },
}

Using the operator

You can create the operator with or without a project ID. If the project ID is missing, it will be retrieved from the Google Cloud connection used. Both variants are shown:

tests/system/providers/google/cloud/cloud_sql/example_cloud_sql.py[source]

sql_instance_patch_task = CloudSQLInstancePatchOperator(
    body=patch_body, instance=INSTANCE_NAME, task_id="sql_instance_patch_task"
)

Templating

template_fields: Sequence[str] = (
    "project_id",
    "instance",
    "body",
    "gcp_conn_id",
    "api_version",
    "impersonation_chain",
)

更多信息

查看 Google Cloud SQL API 文档以修补实例

CloudSQLCloneInstanceOperator

Clones a Cloud SQL instance.

For parameter definitions, take a look at CloudSQLCloneInstanceOperator.

Arguments

For the clone_context object attributes, take a look at CloneContext.

Using the operator

You can create the operator with or without a project ID. If the project ID is missing, it will be retrieved from the Google Cloud connection used. Both variants are shown:

tests/system/providers/google/cloud/cloud_sql/example_cloud_sql.py[source]

sql_instance_clone = CloudSQLCloneInstanceOperator(
    instance=INSTANCE_NAME, destination_instance_name=CLONED_INSTANCE_NAME, task_id="sql_instance_clone"
)

Templating

template_fields: Sequence[str] = (
    "project_id",
    "instance",
    "destination_instance_name",
    "gcp_conn_id",
    "api_version",
)

More information

See the Google Cloud SQL API documentation to clone an instance.

CloudSQLExecuteQueryOperator

Performs DDL or DML SQL queries in a Google Cloud SQL instance. DQL (retrieving data from Google Cloud SQL) is not supported. You might run SELECT queries, but the results of those queries are discarded.

You can specify various connectivity methods to connect to the running instance, starting from a plain connection via public IP, through public IP with SSL, to TCP and socket connections via the Cloud SQL Proxy. The proxy is downloaded and started/stopped dynamically by the operator as needed.

There is a gcpcloudsql://* connection type that you should use to define which connectivity method the operator should use. The connection is a "meta" type of connection: it is not used to make an actual connection on its own, but it determines whether the Cloud SQL Proxy should be started by CloudSQLDatabaseHook and which kind of database connection (Postgres or MySQL) should be created dynamically, connecting either via the public IP address or via the proxy. The CloudSQLDatabaseHook uses CloudSqlProxyRunner to manage the Cloud SQL Proxy lifecycle (each task has its own Cloud SQL Proxy).

When building the connection, you should use the connection parameters as described in CloudSQLDatabaseHook. You can see connection examples below for all possible types of connectivity. Such a connection can be reused between different tasks (instances of CloudSQLExecuteQueryOperator). Each task will get its own proxy started if needed, with its own TCP or UNIX socket.

For parameter definitions, take a look at CloudSQLExecuteQueryOperator.

Since the query operator can run arbitrary queries, it cannot be guaranteed to be idempotent. SQL query designers should design their queries to be idempotent. For example, both Postgres and MySQL support CREATE TABLE IF NOT EXISTS statements, which can be used to create tables in an idempotent way.
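As an illustration of that advice, the operator's sql parameter accepts a list of statements; a sketch of an idempotent sequence (the table name is arbitrary) could be:

```python
# Each statement is safe to re-run: CREATE ... IF NOT EXISTS and DROP ... IF EXISTS
# leave the database in the same state no matter how many times they execute.
SQL = [
    "CREATE TABLE IF NOT EXISTS demo_table (i INTEGER)",
    "DROP TABLE IF EXISTS demo_table",
]
```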

Arguments

If you define the connection via an AIRFLOW_CONN_{CONN_ID} URL in an environment variable, make sure the components of the URL are URL-encoded. See the examples below for details.
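For example, a password containing reserved characters must be encoded before being placed in the URL; the standard library's urllib.parse.quote_plus can do this (the password below is made up):

```python
from urllib.parse import quote_plus

# Hypothetical credentials; special characters must not appear raw in the URL.
password = "p@ss/word&1"
print(quote_plus(password))  # p%40ss%2Fword%261
```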

Note that in the case of an SSL connection, you need a mechanism to make the certificate/key files available in predefined locations for all the workers on which the operator can run. This can be provided, for example, by mounting NFS-like volumes in the same path for all the workers.

Example connection definitions for all non-SSL connectivity cases. Note that all the components of the connection URI should be URL-encoded:

tests/system/providers/google/cloud/cloud_sql/example_cloud_sql_query.py[source]

# Connect via proxy over TCP
CONNECTION_PROXY_TCP_KWARGS = {
    "conn_type": "gcpcloudsql",
    "login": CLOUD_SQL_USER,
    "password": CLOUD_SQL_PASSWORD,
    "host": CLOUD_SQL_IP_ADDRESS,
    "port": CLOUD_SQL_PUBLIC_PORT,
    "schema": CLOUD_SQL_DATABASE_NAME,
    "extra": {
        "database_type": DATABASE_TYPE,
        "project_id": PROJECT_ID,
        "location": REGION,
        "instance": CLOUD_SQL_INSTANCE_NAME,
        "use_proxy": "True",
        "sql_proxy_use_tcp": "True",
    },
}

# Connect via proxy over UNIX socket (specific proxy version)
CONNECTION_PROXY_SOCKET_KWARGS = {
    "conn_type": "gcpcloudsql",
    "login": CLOUD_SQL_USER,
    "password": CLOUD_SQL_PASSWORD,
    "host": CLOUD_SQL_IP_ADDRESS,
    "port": CLOUD_SQL_PUBLIC_PORT,
    "schema": CLOUD_SQL_DATABASE_NAME,
    "extra": {
        "database_type": DATABASE_TYPE,
        "project_id": PROJECT_ID,
        "location": REGION,
        "instance": CLOUD_SQL_INSTANCE_NAME,
        "use_proxy": "True",
        "sql_proxy_version": "v1.33.9",
        "sql_proxy_use_tcp": "False",
    },
}

# Connect directly via TCP (non-SSL)
CONNECTION_PUBLIC_TCP_KWARGS = {
    "conn_type": "gcpcloudsql",
    "login": CLOUD_SQL_USER,
    "password": CLOUD_SQL_PASSWORD,
    "host": CLOUD_SQL_IP_ADDRESS,
    "port": CLOUD_SQL_PUBLIC_PORT,
    "schema": CLOUD_SQL_DATABASE_NAME,
    "extra": {
        "database_type": DATABASE_TYPE,
        "project_id": PROJECT_ID,
        "location": REGION,
        "instance": CLOUD_SQL_INSTANCE_NAME,
        "use_proxy": "False",
        "use_ssl": "False",
    },
}

Similar connection definitions for all SSL-enabled connectivity cases:

tests/system/providers/google/cloud/cloud_sql/example_cloud_sql_query_ssl.py[source]

# Connect directly via TCP (SSL)
CONNECTION_PUBLIC_TCP_SSL_KWARGS = {
    "conn_type": "gcpcloudsql",
    "login": CLOUD_SQL_USER,
    "password": CLOUD_SQL_PASSWORD,
    "host": CLOUD_SQL_IP_ADDRESS,
    "port": CLOUD_SQL_PUBLIC_PORT,
    "schema": CLOUD_SQL_DATABASE_NAME,
    "extra": {
        "database_type": DATABASE_TYPE,
        "project_id": PROJECT_ID,
        "location": REGION,
        "instance": CLOUD_SQL_INSTANCE_NAME,
        "use_proxy": "False",
        "use_ssl": "True",
    },
}

It is also possible to configure a connection via environment variables (note that the connection ID from the operator matches the AIRFLOW_CONN_{CONN_ID} postfix, uppercase, if you are using the standard AIRFLOW notation for defining connections via environment variables):

tests/system/providers/google/cloud/cloud_sql/example_cloud_sql_query.py[source]


# The connections below are created using one of the standard approaches - via environment
# variables named AIRFLOW_CONN_* . The connections can also be created in the database
# of AIRFLOW (using command line or UI).

postgres_kwargs = {
    "user": "user",
    "password": "password",
    "public_ip": "public_ip",
    "public_port": "public_port",
    "database": "database",
    "project_id": "project_id",
    "location": "location",
    "instance": "instance",
    "client_cert_file": "client_cert_file",
    "client_key_file": "client_key_file",
    "server_ca_file": "server_ca_file",
}

# Postgres: connect via proxy over TCP
os.environ["AIRFLOW_CONN_PROXY_POSTGRES_TCP"] = (
    "gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?"
    "database_type=postgres&"
    "project_id={project_id}&"
    "location={location}&"
    "instance={instance}&"
    "use_proxy=True&"
    "sql_proxy_use_tcp=True".format(**postgres_kwargs)
)

# Postgres: connect via proxy over UNIX socket (specific proxy version)
os.environ["AIRFLOW_CONN_PROXY_POSTGRES_SOCKET"] = (
    "gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?"
    "database_type=postgres&"
    "project_id={project_id}&"
    "location={location}&"
    "instance={instance}&"
    "use_proxy=True&"
    "sql_proxy_version=v1.13&"
    "sql_proxy_use_tcp=False".format(**postgres_kwargs)
)

# Postgres: connect directly via TCP (non-SSL)
os.environ["AIRFLOW_CONN_PUBLIC_POSTGRES_TCP"] = (
    "gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?"
    "database_type=postgres&"
    "project_id={project_id}&"
    "location={location}&"
    "instance={instance}&"
    "use_proxy=False&"
    "use_ssl=False".format(**postgres_kwargs)
)

# Postgres: connect directly via TCP (SSL)
os.environ["AIRFLOW_CONN_PUBLIC_POSTGRES_TCP_SSL"] = (
    "gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?"
    "database_type=postgres&"
    "project_id={project_id}&"
    "location={location}&"
    "instance={instance}&"
    "use_proxy=False&"
    "use_ssl=True&"
    "sslcert={client_cert_file}&"
    "sslkey={client_key_file}&"
    "sslrootcert={server_ca_file}".format(**postgres_kwargs)
)

mysql_kwargs = {
    "user": "user",
    "password": "password",
    "public_ip": "public_ip",
    "public_port": "public_port",
    "database": "database",
    "project_id": "project_id",
    "location": "location",
    "instance": "instance",
    "client_cert_file": "client_cert_file",
    "client_key_file": "client_key_file",
    "server_ca_file": "server_ca_file",
}

# MySQL: connect via proxy over TCP (specific proxy version)
os.environ["AIRFLOW_CONN_PROXY_MYSQL_TCP"] = (
    "gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?"
    "database_type=mysql&"
    "project_id={project_id}&"
    "location={location}&"
    "instance={instance}&"
    "use_proxy=True&"
    "sql_proxy_version=v1.13&"
    "sql_proxy_use_tcp=True".format(**mysql_kwargs)
)

# MySQL: connect via proxy over UNIX socket using pre-downloaded Cloud Sql Proxy binary
os.environ["AIRFLOW_CONN_PROXY_MYSQL_SOCKET"] = (
    "gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?"
    "database_type=mysql&"
    "project_id={project_id}&"
    "location={location}&"
    "instance={instance}&"
    "use_proxy=True&"
    "sql_proxy_use_tcp=False".format(**mysql_kwargs)
)

# MySQL: connect directly via TCP (non-SSL)
os.environ["AIRFLOW_CONN_PUBLIC_MYSQL_TCP"] = (
    "gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?"
    "database_type=mysql&"
    "project_id={project_id}&"
    "location={location}&"
    "instance={instance}&"
    "use_proxy=False&"
    "use_ssl=False".format(**mysql_kwargs)
)

# MySQL: connect directly via TCP (SSL) and with fixed Cloud Sql Proxy binary path
os.environ["AIRFLOW_CONN_PUBLIC_MYSQL_TCP_SSL"] = (
    "gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?"
    "database_type=mysql&"
    "project_id={project_id}&"
    "location={location}&"
    "instance={instance}&"
    "use_proxy=False&"
    "use_ssl=True&"
    "sslcert={client_cert_file}&"
    "sslkey={client_key_file}&"
    "sslrootcert={server_ca_file}".format(**mysql_kwargs)
)

# Special case: MySQL: connect directly via TCP (SSL) and with fixed Cloud Sql
# Proxy binary path AND with missing project_id
os.environ["AIRFLOW_CONN_PUBLIC_MYSQL_TCP_SSL_NO_PROJECT_ID"] = (
    "gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?"
    "database_type=mysql&"
    "location={location}&"
    "instance={instance}&"
    "use_proxy=False&"
    "use_ssl=True&"
    "sslcert={client_cert_file}&"
    "sslkey={client_key_file}&"
    "sslrootcert={server_ca_file}".format(**mysql_kwargs)
)
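The naming convention used by the definitions above can be sketched as: the environment variable name is AIRFLOW_CONN_ followed by the upper-cased connection ID that the operator refers to.

```python
# Mapping between the conn_id used in a task and the environment variable name.
def airflow_conn_env_var(conn_id: str) -> str:
    return f"AIRFLOW_CONN_{conn_id.upper()}"


print(airflow_conn_env_var("proxy_postgres_tcp"))  # AIRFLOW_CONN_PROXY_POSTGRES_TCP
```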

tests/system/providers/google/cloud/cloud_sql/example_cloud_sql_query_ssl.py[source]


# The connections below are created using one of the standard approaches - via environment
# variables named AIRFLOW_CONN_* . The connections can also be created in the database
# of AIRFLOW (using command line or UI).

postgres_kwargs = {
    "user": "user",
    "password": "password",
    "public_ip": "public_ip",
    "public_port": "public_port",
    "database": "database",
    "project_id": "project_id",
    "location": "location",
    "instance": "instance",
    "client_cert_file": "client_cert_file",
    "client_key_file": "client_key_file",
    "server_ca_file": "server_ca_file",
}

# Postgres: connect directly via TCP (SSL)
os.environ["AIRFLOW_CONN_PUBLIC_POSTGRES_TCP_SSL"] = (
    "gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?"
    "database_type=postgres&"
    "project_id={project_id}&"
    "location={location}&"
    "instance={instance}&"
    "use_proxy=False&"
    "use_ssl=True&"
    "sslcert={client_cert_file}&"
    "sslkey={client_key_file}&"
    "sslrootcert={server_ca_file}".format(**postgres_kwargs)
)

mysql_kwargs = {
    "user": "user",
    "password": "password",
    "public_ip": "public_ip",
    "public_port": "public_port",
    "database": "database",
    "project_id": "project_id",
    "location": "location",
    "instance": "instance",
    "client_cert_file": "client_cert_file",
    "client_key_file": "client_key_file",
    "server_ca_file": "server_ca_file",
}

# MySQL: connect directly via TCP (SSL)
os.environ["AIRFLOW_CONN_PUBLIC_MYSQL_TCP_SSL"] = (
    "gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?"
    "database_type=mysql&"
    "project_id={project_id}&"
    "location={location}&"
    "instance={instance}&"
    "use_proxy=False&"
    "use_ssl=True&"
    "sslcert={client_cert_file}&"
    "sslkey={client_key_file}&"
    "sslrootcert={server_ca_file}".format(**mysql_kwargs)
)

Using the operator

The example operator below uses the previously prepared connections. It might be a connection_id from the Airflow database or a connection configured via an environment variable (note that the connection ID from the operator matches the AIRFLOW_CONN_{CONN_ID} postfix, uppercase, if you are using the standard AIRFLOW notation for defining connections via environment variables):

tests/system/providers/google/cloud/cloud_sql/example_cloud_sql_query.py[source]

query_task = CloudSQLExecuteQueryOperator(
    gcp_cloudsql_conn_id=connection_id,
    task_id=task_id,
    sql=SQL,
)

SSL settings can also be specified on the operator level. In this case, the SSL settings configured in the connection are overridden. One way to do so is to specify the path to each certificate file, as shown below. Note that, for security reasons, these files will be copied to a temporary location with the minimal required permissions.

tests/system/providers/google/cloud/cloud_sql/example_cloud_sql_query_ssl.py[source]

query_task = CloudSQLExecuteQueryOperator(
    gcp_cloudsql_conn_id=conn_id,
    task_id=task_id,
    sql=SQL,
    ssl_client_cert=ssl_cert_path,
    ssl_server_cert=ssl_server_cert_path,
    ssl_client_key=ssl_key_path,
)
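A sketch of what "copied to a temporary location with the minimal required permissions" can mean in practice; this is an illustration of the pattern, not the provider's actual implementation:

```python
import os
import shutil
import tempfile


def stage_cert(path: str) -> str:
    """Copy a certificate/key file into a fresh temp dir, readable only by the owner."""
    tmp_dir = tempfile.mkdtemp()
    dest = os.path.join(tmp_dir, os.path.basename(path))
    shutil.copy(path, dest)
    os.chmod(dest, 0o600)  # owner read/write only
    return dest
```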

You can also save your SSL certificates in a Google Cloud Secret Manager secret and provide the secret ID. The secret must be in the following format:

{"sslcert": "", "sslkey": "", "sslrootcert": ""}

tests/system/providers/google/cloud/cloud_sql/example_cloud_sql_query_ssl.py[source]

query_task_secret = CloudSQLExecuteQueryOperator(
    gcp_cloudsql_conn_id=conn_id,
    task_id=task_id,
    sql=SQL,
    ssl_secret_id=secret_id,
)

Templating

template_fields: Sequence[str] = (
    "sql",
    "gcp_cloudsql_conn_id",
    "gcp_conn_id",
    "ssl_server_cert",
    "ssl_client_cert",
    "ssl_client_key",
    "ssl_secret_id",
)
template_ext: Sequence[str] = (".sql",)
template_fields_renderers = {"sql": "sql"}

More information

See the Google Cloud SQL documentation for the Cloud SQL Proxy, for MySQL and for PostgreSQL.
