任务 Docker 装饰器¶
使用 @task.docker
装饰器包裹的 Python 可调用对象及其参数将在 Docker 容器内执行。
参数¶
Docker 任务装饰器支持以下参数。
- multiple_outputs
如果设置,函数返回值将被展开为多个 XCom 值。字典将展开为以键作为 XCom 键的 XCom 值。默认为 False。
- use_dill
是否使用 dill 或 pickle 进行序列化
- python_command
执行函数的 Python 命令,默认为 python3
- image
从中创建容器的 Docker 镜像。如果省略镜像标签,将使用 “latest”。
- api_version
远程 API 版本。设置为
auto
以自动检测服务器的版本。- container_name
容器的名称。可选(可使用模板)
- cpus
分配给容器的 CPU 数量。此值将乘以 1024。
- docker_url
运行 docker 守护程序的宿主的 URL。默认值为
DOCKER_HOST
环境变量的值,如果未设置则为 unix://var/run/docker.sock。- environment
要在容器中设置的环境变量。(可使用模板)
- private_environment
要在容器中设置的私有环境变量。这些变量不可使用模板,并且在网站上隐藏。
- env_file
包含要在容器中设置的环境变量的
.env
文件的相对路径。将被 environment 参数中的变量覆盖。- force_pull
每次运行时拉取 Docker 镜像。默认为 False。
- mem_limit
容器可以使用的最大内存量。可以是浮点值,表示以字节为单位的限制,也可以是像
128m
或1g
这样的字符串。- host_tmp_dir
指定主机上临时目录的位置,该目录将被映射到 tmp_dir。如果未提供,则默认为使用标准系统临时目录。
- network_mode
容器的网络模式。它可以是以下之一:
"bridge"
:为容器创建新的网络堆栈,使用默认的 Docker 桥接网络"none"
:此容器没有网络"container:<name>"
或"container:<id>"
:使用通过 <name> 或 <id> 指定的另一个容器的网络堆栈"host"
:使用主机网络堆栈。与 **port_bindings** 不兼容"<network-name>"
或"<network-id>"
:将容器连接到用户创建的网络(使用docker network create
命令)
- tls_ca_cert
用于保护 Docker 连接的 PEM 编码的证书颁发机构的路径。
- tls_client_cert
用于验证 Docker 客户端的 PEM 编码的证书路径。
- tls_client_key
用于验证 Docker 客户端的 PEM 编码的密钥路径。
- tls_verify
设置为
True
以验证提供的证书的有效性。- tls_hostname
与 Docker 服务器证书匹配的主机名,或 False 以禁用检查。
- tls_ssl_version
与 Docker 守护程序通信时使用的 SSL 版本。
- mount_tmp_dir
指定是否应将临时目录从主机绑定挂载到容器。
- tmp_dir
容器内由操作员在主机上创建的临时目录的挂载点。该路径也可通过容器内的环境变量
AIRFLOW_TMP_DIR
获得。- user
Docker 容器内的默认用户。
- mounts
要挂载到容器中的挂载列表,例如
['/host/path:/container/path', '/host/path2:/container/path2:ro']
。- working_dir
要在容器上设置的工作目录(等效于 Docker 客户端的 -w 开关)
- entrypoint
覆盖镜像的默认 ENTRYPOINT
- xcom_all
推送所有标准输出,还是仅推送最后一行。默认为 False(最后一行)。
- docker_conn_id
要使用的 Airflow 连接的 ID
- dns
Docker 自定义 DNS 服务器
- dns_search
Docker 自定义 DNS 搜索域
- auto_remove
启用在容器的进程退出时删除容器。可能的值:
never
:(默认)不删除容器success
:成功时删除force
:始终删除容器
- shm_size
/dev/shm
的大小,以字节为单位。大小必须大于 0。如果省略,则使用系统默认值。- tty
为容器分配伪 TTY。需要设置此项才能查看 Docker 容器的日志。
- hostname
容器的可选主机名。
- privileged
为此容器提供扩展权限。
- cap_add
包含容器功能
- extra_hosts
要在容器内部解析的其他主机名,作为主机名到 IP 地址的映射。
- timeout
API 调用的默认超时时间,以秒为单位。
- device_requests
将主机资源(例如 GPU)暴露给容器。
- log_opts_max_size
日志滚动之前的最大大小。正整数加上表示计量单位的修饰符(k、m 或 g)。例如:10m 或 1g。默认为 -1(无限制)。
- log_opts_max_file
可以存在的最大日志文件数。如果滚动日志创建了多余的文件,则会删除最旧的文件。仅在还设置了 max-size 时有效。正整数。默认为 1。
- ipc_mode
设置容器的 IPC 模式。
- skip_on_exit_code
如果任务以此退出代码退出,则将任务保留在
skipped
状态(默认:None)。如果设置为None
,任何非零退出代码都将被视为失败。- port_bindings
将容器的端口发布到主机。它是一个值的字典,其中键表示要在容器内打开的端口,值表示绑定到容器端口的主机端口。与
network_mode
中的"host"
不兼容。- ulimits
要为容器设置的 ulimit 选项列表。每个项都应是一个
docker.types.Ulimit
实例。
使用示例¶
@task.docker(image="python:3.9-slim-bookworm", multiple_outputs=True)
def transform(order_data_dict: dict):
"""
#### Transform task
A simple Transform task which takes in the collection of order data and
computes the total order value.
"""
total_order_value = 0
for value in order_data_dict.values():
total_order_value += value
return {"total_order_value": total_order_value}