任务 Docker 装饰器

使用 @task.docker 装饰器包裹的 Python 可调用对象及其参数将在 Docker 容器内执行。

参数

Docker 任务装饰器支持以下参数。

multiple_outputs

如果设置,函数返回值将被展开为多个 XCom 值。字典将展开为以键作为 XCom 键的 XCom 值。默认为 False。

use_dill

是否使用 dill 或 pickle 进行序列化

python_command

执行函数的 Python 命令,默认为 python3

image

从中创建容器的 Docker 镜像。如果省略镜像标签,将使用 “latest”。

api_version

远程 API 版本。设置为 auto 以自动检测服务器的版本。

container_name

容器的名称。可选(可使用模板)

cpus

分配给容器的 CPU 数量。此值将乘以 1024。

docker_url

运行 docker 守护程序的宿主的 URL。默认值为 DOCKER_HOST 环境变量的值,如果未设置则为 unix://var/run/docker.sock。

environment

要在容器中设置的环境变量。(可使用模板)

private_environment

要在容器中设置的私有环境变量。这些变量不可使用模板,并且在网站上隐藏。

env_file

包含要在容器中设置的环境变量的 .env 文件的相对路径。将被 environment 参数中的变量覆盖。

force_pull

每次运行时拉取 Docker 镜像。默认为 False。

mem_limit

容器可以使用的最大内存量。可以是浮点值,表示以字节为单位的限制,也可以是像 128m1g 这样的字符串。

host_tmp_dir

指定主机上临时目录的位置,该目录将被映射到 tmp_dir。如果未提供,则默认为使用标准系统临时目录。

network_mode

容器的网络模式。它可以是以下之一:

  • "bridge":为容器创建新的网络堆栈,使用默认的 Docker 桥接网络

  • "none":此容器没有网络

  • "container:<name>""container:<id>":使用通过 <name> 或 <id> 指定的另一个容器的网络堆栈

  • "host":使用主机网络堆栈。与 **port_bindings** 不兼容

  • "<network-name>""<network-id>":将容器连接到用户创建的网络(使用 docker network create 命令)

tls_ca_cert

用于保护 Docker 连接的 PEM 编码的证书颁发机构的路径。

tls_client_cert

用于验证 Docker 客户端的 PEM 编码的证书路径。

tls_client_key

用于验证 Docker 客户端的 PEM 编码的密钥路径。

tls_verify

设置为 True 以验证提供的证书的有效性。

tls_hostname

与 Docker 服务器证书匹配的主机名,或 False 以禁用检查。

tls_ssl_version

与 Docker 守护程序通信时使用的 SSL 版本。

mount_tmp_dir

指定是否应将临时目录从主机绑定挂载到容器。

tmp_dir

容器内由操作员在主机上创建的临时目录的挂载点。该路径也可通过容器内的环境变量 AIRFLOW_TMP_DIR 获得。

user

Docker 容器内的默认用户。

mounts

要挂载到容器中的挂载列表,例如 ['/host/path:/container/path', '/host/path2:/container/path2:ro']

working_dir

要在容器上设置的工作目录(等效于 Docker 客户端的 -w 开关)

entrypoint

覆盖镜像的默认 ENTRYPOINT

xcom_all

推送所有标准输出,还是仅推送最后一行。默认为 False(最后一行)。

docker_conn_id

要使用的 Airflow 连接的 ID

dns

Docker 自定义 DNS 服务器

dns_search

Docker 自定义 DNS 搜索域

auto_remove

启用在容器的进程退出时删除容器。可能的值:

  • never:(默认)不删除容器

  • success:成功时删除

  • force:始终删除容器

shm_size

/dev/shm 的大小,以字节为单位。大小必须大于 0。如果省略,则使用系统默认值。

tty

为容器分配伪 TTY。需要设置此项才能查看 Docker 容器的日志。

hostname

容器的可选主机名。

privileged

为此容器提供扩展权限。

cap_add

包含容器功能

extra_hosts

要在容器内部解析的其他主机名,作为主机名到 IP 地址的映射。

timeout

API 调用的默认超时时间,以秒为单位。

device_requests

将主机资源(例如 GPU)暴露给容器。

log_opts_max_size

日志滚动之前的最大大小。正整数加上表示计量单位的修饰符(k、m 或 g)。例如:10m 或 1g。默认为 -1(无限制)。

log_opts_max_file

可以存在的最大日志文件数。如果滚动日志创建了多余的文件,则会删除最旧的文件。仅在还设置了 max-size 时有效。正整数。默认为 1。

ipc_mode

设置容器的 IPC 模式。

skip_on_exit_code

如果任务以此退出代码退出,则将任务保留在 skipped 状态(默认:None)。如果设置为 None,任何非零退出代码都将被视为失败。

port_bindings

将容器的端口发布到主机。它是一个值的字典,其中键表示要在容器内打开的端口,值表示绑定到容器端口的主机端口。与 network_mode 中的 "host" 不兼容。

ulimits

要为容器设置的 ulimit 选项列表。每个项都应是一个 docker.types.Ulimit 实例。

使用示例

tests/system/docker/example_taskflow_api_docker_virtualenv.py[源代码]

    @task.docker(image="python:3.9-slim-bookworm", multiple_outputs=True)
    def transform(order_data_dict: dict):
        """
        #### Transform task
        A simple Transform task which takes in the collection of order data and
        computes the total order value.
        """
        total_order_value = 0

        for value in order_data_dict.values():
            total_order_value += value

        return {"total_order_value": total_order_value}

此条目是否有帮助?