airflow.providers.google.cloud.hooks.gcs
¶
此模块包含一个 Google Cloud Storage hook。
模块内容¶
类¶
使用 Google Cloud 连接与 Google Cloud Storage 交互。 |
|
GCSAsyncHook 在触发器 worker 上运行,继承自 GoogleBaseAsyncHook。 |
函数¶
|
如果给定的 Google Cloud Storage URL (gs://<bucket>/<blob>) 是目录或空存储桶,则返回 True。 |
|
从 Google Cloud Storage 下载并解析 json 文件。 |
属性¶
- class airflow.providers.google.cloud.hooks.gcs.GCSHook(gcp_conn_id='google_cloud_default', impersonation_chain=None, **kwargs)[源代码]¶
基类:
airflow.providers.google.common.hooks.base_google.GoogleBaseHook
使用 Google Cloud 连接与 Google Cloud Storage 交互。
- copy(source_bucket, source_object, destination_bucket=None, destination_object=None)[源代码]¶
将对象从一个存储桶复制到另一个存储桶,如果需要,可以重命名。
可以省略 destination_bucket 或 destination_object,在这种情况下,将使用源存储桶/对象,但不能同时省略两者。
- rewrite(source_bucket, source_object, destination_bucket, destination_object=None)[源代码]¶
类似于 copy;支持超过 5 TB 的文件以及在位置和/或存储类之间复制。
可以省略 destination_object,在这种情况下使用 source_object。
- download(bucket_name: str, object_name: str, filename: None = None, chunk_size: int | None = None, timeout: int | None = DEFAULT_TIMEOUT, num_max_attempts: int | None = 1, user_project: str | None = None) bytes [源]¶
- download(bucket_name: str, object_name: str, filename: str, chunk_size: int | None = None, timeout: int | None = DEFAULT_TIMEOUT, num_max_attempts: int | None = 1, user_project: str | None = None) str
从 Google Cloud Storage 下载文件。
当未提供文件名时,操作符会将文件加载到内存中并返回其内容。当提供文件名时,它会将文件写入到指定位置并返回该位置。对于超出可用内存的文件大小,建议写入文件。
- 参数
bucket_name – 要从中获取的存储桶。
object_name – 要获取的对象。
filename – 如果设置,则为应将文件写入的本地文件路径。
chunk_size – Blob 块大小。
timeout – 请求超时时间(秒)。
num_max_attempts – 下载文件的最大尝试次数。
user_project – 要为请求计费的 Google Cloud 项目的标识符。对于“请求者付费”存储桶是必需的。
- download_as_byte_array(bucket_name, object_name, chunk_size=None, timeout=DEFAULT_TIMEOUT, num_max_attempts=1)[源]¶
从 Google Cloud Storage 下载文件。
当未提供文件名时,操作符会将文件加载到内存中并返回其内容。当提供文件名时,它会将文件写入到指定位置并返回该位置。对于超出可用内存的文件大小,建议写入文件。
- provide_file(bucket_name=PROVIDE_BUCKET, object_name=None, object_url=None, dir=None, user_project=None)[源]¶
将文件下载到临时目录并返回文件句柄。
您可以通过传递 bucket_name 和 object_name 参数或仅传递 object_url 参数来使用此方法。
- 参数
- 返回
文件句柄
- 返回类型
collections.abc.Generator[IO[bytes], None, None]
- provide_file_and_upload(bucket_name=PROVIDE_BUCKET, object_name=None, object_url=None, user_project=None)[源]¶
创建临时文件,返回文件句柄,并在关闭时上传文件内容。
您可以通过传递 bucket_name 和 object_name 参数或仅传递 object_url 参数来使用此方法。
- upload(bucket_name, object_name, filename=None, data=None, mime_type=None, gzip=False, encoding='utf-8', chunk_size=None, timeout=DEFAULT_TIMEOUT, num_max_attempts=1, metadata=None, cache_control=None, user_project=None)[source]¶
将本地文件或文件数据(字符串或字节)上传到 Google Cloud Storage。
- 参数
bucket_name (str) – 要上传到的存储桶。
object_name (str) – 上传文件时设置的对象名称。
filename (str | None) – 要上传的本地文件路径。
mime_type (str | None) – 上传文件时设置的文件 MIME 类型。
gzip (bool) – 是否压缩本地文件或文件数据以上传的选项
encoding (str) – 如果文件数据以字符串形式提供,则使用此字节编码。
chunk_size (int | None) – Blob 块大小。
timeout (int | None) – 请求超时时间(秒)。
num_max_attempts (int) – 尝试上传文件的最大次数。
metadata (dict | None) – 要随文件上传的元数据。
cache_control (str | None) – Cache-Control 元数据字段。
user_project (str | None) – 要为请求计费的 Google Cloud 项目的标识符。对于“请求者付费”存储桶是必需的。
- exists(bucket_name, object_name, retry=DEFAULT_RETRY)[source]¶
检查 Google Cloud Storage 中是否存在文件。
- 参数
bucket_name (str) – 对象所在的 Google Cloud Storage 存储桶。
object_name (str) – 要在 Google Cloud Storage 存储桶中检查的 blob_name 的名称。
retry (google.api_core.retry.Retry) – (可选)如何重试 RPC
- is_updated_after(bucket_name, object_name, ts)[source]¶
检查 Google Cloud Storage 中的 blob_name 是否已更新。
- 参数
bucket_name (str) – 对象所在的 Google Cloud Storage 存储桶。
object_name (str) – 要在 Google Cloud Storage 存储桶中检查的对象名称。
ts (datetime.datetime) – 要对照检查的时间戳。
- is_updated_between(bucket_name, object_name, min_ts, max_ts)[source]¶
检查 Google Cloud Storage 中的 blob_name 是否已更新。
- 参数
bucket_name (str) – 对象所在的 Google Cloud Storage 存储桶。
object_name (str) – 要在 Google Cloud Storage 存储桶中检查的对象名称。
min_ts (datetime.datetime) – 要对照检查的最小时间戳。
max_ts (datetime.datetime) – 要对照检查的最大时间戳。
- is_updated_before(bucket_name, object_name, ts)[source]¶
检查 Google Cloud Storage 中的 blob_name 是否在给定时间之前更新。
- 参数
bucket_name (str) – 对象所在的 Google Cloud Storage 存储桶。
object_name (str) – 要在 Google Cloud Storage 存储桶中检查的对象名称。
ts (datetime.datetime) – 要对照检查的时间戳。
- delete_bucket(bucket_name, force=False, user_project=None)[source]¶
从 Google Cloud Storage 中删除一个存储桶对象。
- list(bucket_name, versions=None, max_results=None, prefix=None, delimiter=None, match_glob=None, user_project=None)[source]¶
列出具有给定单个前缀或多个前缀的存储桶中的所有对象。
- 参数
bucket_name (str) – 存储桶名称
versions (bool | None) – 如果为 true,则列出对象的所有版本
max_results (int | None) – 在单个响应页面中返回的最大项目数
prefix (str | List[str] | None) – 字符串或字符串列表,用于过滤名称以其开头/它们开头的对象
delimiter (str | None) – (已弃用) 基于分隔符(例如 '.csv')过滤对象
match_glob (str | None) – (可选)基于字符串给定的 glob 模式过滤对象(例如,
'**/*/.json'
)。user_project (str | None) – 要为请求计费的 Google Cloud 项目的标识符。对于“请求者付费”存储桶是必需的。
- 返回
与过滤条件匹配的对象名称流
- list_by_timespan(bucket_name, timespan_start, timespan_end, versions=None, max_results=None, prefix=None, delimiter=None, match_glob=None)[source]¶
列出存储桶中给定字符串前缀且在时间范围内更新的所有对象。
- 参数
bucket_name (str) – 存储桶名称
timespan_start (datetime.datetime) – 将返回在此日期时间(UTC)或之后更新的对象
timespan_end (datetime.datetime) – 将返回在此日期时间(UTC)之前更新的对象
versions (bool | None) – 如果为 true,则列出对象的所有版本
max_results (int | None) – 在单个响应页面中返回的最大项目数
prefix (str | None) – 前缀字符串,用于过滤名称以此前缀开头的对象
delimiter (str | None) – (已弃用) 基于分隔符(例如 '.csv')过滤对象
match_glob (str | None) – (可选)基于字符串给定的 glob 模式过滤对象(例如,
'**/*/.json'
)。
- 返回
与过滤条件匹配的对象名称流
- 返回类型
List[str]
- create_bucket(bucket_name, resource=None, storage_class='MULTI_REGIONAL', location='US', project_id=PROVIDE_PROJECT_ID, labels=None)[source]¶
创建一个新的存储桶。
Google Cloud Storage 使用扁平命名空间,因此您无法创建名称已被使用的存储桶。
另请参阅
有关详细信息,请参阅存储桶命名准则:https://cloud.google.com/storage/docs/bucketnaming.html#requirements
- 参数
bucket_name (str) – 存储桶的名称。
resource (dict | None) – 一个可选的 dict,其中包含用于创建存储桶的参数。有关可用参数的信息,请参阅 Cloud Storage API 文档:https://cloud.google.com/storage/docs/json_api/v1/buckets/insert
storage_class (str) –
这定义了存储桶中对象的存储方式,并确定了 SLA 和存储成本。值包括
MULTI_REGIONAL
REGIONAL
STANDARD
NEARLINE
COLDLINE
.
如果在创建存储桶时未指定此值,则默认值为 STANDARD。
location (str) –
存储桶的位置。存储桶中对象的数据驻留在此区域内的物理存储中。默认为美国。
project_id (str) – Google Cloud 项目的 ID。
labels (dict | None) – 用户提供的标签,以键/值对形式。
- 返回
如果成功,它将返回存储桶的
id
。- 返回类型
- insert_bucket_acl(bucket_name, entity, role, user_project=None)[source]¶
在指定的 bucket_name 上创建一个新的 ACL 条目。
请参阅:https://cloud.google.com/storage/docs/json_api/v1/bucketAccessControls/insert
- 参数
bucket_name (str) – 存储桶的名称。
entity (str) – 以以下形式之一持有权限的实体:user-userId、user-email、group-groupId、group-email、domain-domain、project-team-projectId、allUsers、allAuthenticatedUsers。请参阅:https://cloud.google.com/storage/docs/access-control/lists#scopes
role (str) – 实体的访问权限。可接受的值为:“OWNER”、“READER”、“WRITER”。
user_project (str | None) – (可选)为此请求付费的项目。对于请求者付费存储桶是必需的。
- insert_object_acl(bucket_name, object_name, entity, role, generation=None, user_project=None)[source]¶
在指定的对象上创建新的 ACL 条目。
参见:https://cloud.google.com/storage/docs/json_api/v1/objectAccessControls/insert
- 参数
bucket_name (str) – 存储桶的名称。
object_name (str) – 对象的名称。有关如何对对象名称进行 URL 编码以使其路径安全的信息,请参见:https://cloud.google.com/storage/docs/json_api/#encoding
entity (str) – 持有权限的实体,格式为以下形式之一:user-userId、user-email、group-groupId、group-email、domain-domain、project-team-projectId、allUsers、allAuthenticatedUsers。参见:https://cloud.google.com/storage/docs/access-control/lists#scopes
role (str) – 实体的访问权限。可接受的值为:“OWNER”、“READER”。
generation (int | None) – 可选。如果存在,则选择此对象的特定修订版本。
user_project (str | None) – (可选)为此请求付费的项目。对于请求者付费存储桶是必需的。
- compose(bucket_name, source_objects, destination_object)[source]¶
将现有对象列表组合为同一存储桶中的新对象。
目前,它仅支持在单个操作中连接最多 32 个对象
https://cloud.google.com/storage/docs/json_api/v1/objects/compose
- 参数
bucket_name (str) – 包含源对象的存储桶的名称。这也是存储组合的目标对象的同一存储桶。
source_objects (List[str]) – 将组合成单个对象的源对象列表。
destination_object (str) – 如果给定,则为对象的路径。
- sync(source_bucket, destination_bucket, source_object=None, destination_object=None, recursive=True, allow_overwrite=False, delete_extra_files=False)[source]¶
同步存储桶的内容。
参数
source_object
和destination_object
描述根同步目录。如果未传递,则将同步整个存储桶。如果传递,它们应指向目录。注意
不支持单个文件的同步。只能同步整个目录。
- 参数
source_bucket (str) – 包含源对象的存储桶的名称。
destination_bucket (str) – 包含目标对象的存储桶的名称。
source_object (str | None) – 源存储桶中的根同步目录。
destination_object (str | None) – 目标存储桶中的根同步目录。
recursive (bool) – 如果为 True,将考虑子目录
recursive – 如果为 True,将考虑子目录
allow_overwrite (bool) – 如果为 True,如果找到不匹配的文件,则会覆盖文件。默认情况下,不允许覆盖文件
delete_extra_files (bool) –
如果为 True,则删除源中未在目标中找到的额外文件。默认情况下,不会删除额外文件。
注意
如果您指定了错误的源/目标组合,此选项可以快速删除数据。
- 返回
none
- 返回类型
None
- airflow.providers.google.cloud.hooks.gcs.gcs_object_is_directory(bucket)[source]¶
如果给定的 Google Cloud Storage URL (gs://<bucket>/<blob>) 是目录或空存储桶,则返回 True。
- airflow.providers.google.cloud.hooks.gcs.parse_json_from_gcs(gcp_conn_id, file_uri, impersonation_chain=None)[source]¶
从 Google Cloud Storage 下载并解析 json 文件。
- 参数
gcp_conn_id (str) – Airflow Google Cloud 连接 ID。
file_uri (str) – json 文件的完整路径,例如:
gs://test-bucket/dir1/dir2/file
- class airflow.providers.google.cloud.hooks.gcs.GCSAsyncHook(**kwargs)[source]¶
基类:
airflow.providers.google.common.hooks.base_google.GoogleBaseAsyncHook
GCSAsyncHook 在触发器 worker 上运行,继承自 GoogleBaseAsyncHook。