Define an operator extra link
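If you want to add further links to operators, you can define them via a plugin or a provider package. Extra links are displayed on the task details page in the Grid view.
The following code shows how to add extra links to an operator via a plugin: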
from airflow.models.baseoperator import BaseOperator
from airflow.models.baseoperatorlink import BaseOperatorLink
from airflow.models.taskinstancekey import TaskInstanceKey
from airflow.plugins_manager import AirflowPlugin


class GoogleLink(BaseOperatorLink):
    name = "Google"

    def get_link(self, operator: BaseOperator, *, ti_key: TaskInstanceKey):
        return "https://www.google.com"


class MyFirstOperator(BaseOperator):
    operator_extra_links = (GoogleLink(),)

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def execute(self, context):
        self.log.info("Hello World!")


# Defining the plugin class
class AirflowExtraLinkPlugin(AirflowPlugin):
    name = "extra_link_plugin"
    operator_extra_links = [
        GoogleLink(),
    ]
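Note
Operator extra links must be registered via an Airflow plugin or a custom Airflow provider in order to work.
You can also add a global operator extra link, which is made available to all operators through an Airflow plugin or an Airflow provider. You can learn more in the plugin interface and in Provider packages.
You can see all the extra links available through community-managed providers in Extra Links.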
Add or override links on existing operators
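You can also add (or override) an extra link on an existing operator through an Airflow plugin or a custom provider.
For example, the following Airflow plugin adds an operator link to every task that uses the GCSToS3Operator operator.
Adding operator links to existing operators: plugins/extra_link.py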
from airflow.models.baseoperator import BaseOperator
from airflow.models.baseoperatorlink import BaseOperatorLink
from airflow.models.taskinstancekey import TaskInstanceKey
from airflow.plugins_manager import AirflowPlugin
from airflow.providers.amazon.aws.transfers.gcs_to_s3 import GCSToS3Operator


class S3LogLink(BaseOperatorLink):
    name = "S3"

    # Add list of all the operators to which you want to add this OperatorLinks
    # Example: operators = [GCSToS3Operator, GCSToBigQueryOperator]
    operators = [GCSToS3Operator]

    def get_link(self, operator: BaseOperator, *, ti_key: TaskInstanceKey):
        # Invalid bucket name because upper case letters and underscores are used
        # This will not be a valid bucket in any region
        bucket_name = "Invalid_Bucket_Name"
        return "https://s3.amazonaws.com/airflow-logs/{bucket_name}/{dag_id}/{task_id}/{run_id}".format(
            bucket_name=bucket_name,
            dag_id=operator.dag_id,
            task_id=operator.task_id,
            run_id=ti_key.run_id,
        )


# Defining the plugin class
class AirflowExtraLinkPlugin(AirflowPlugin):
    name = "extra_link_plugin"
    operator_extra_links = [
        S3LogLink(),
    ]
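The S3 example above interpolates `dag_id`, `task_id`, and `run_id` directly into the URL. Run IDs such as `manual__2024-01-01T00:00:00+00:00` contain `:` and `+`, so a link builder may want to percent-encode each path segment. The following is a minimal sketch rather than anything Airflow provides: `build_log_url`, its parameters, and the sample IDs are hypothetical, and it relies only on `urllib.parse.quote` from the standard library.

```python
from urllib.parse import quote

# Base URL copied from the example above, for illustration only.
BASE_URL = "https://s3.amazonaws.com/airflow-logs"


def build_log_url(bucket_name: str, dag_id: str, task_id: str, run_id: str) -> str:
    """Join the parts into a URL path, percent-encoding each segment."""
    segments = (bucket_name, dag_id, task_id, run_id)
    # safe="" also encodes "/", so a segment can never add extra path levels.
    encoded = "/".join(quote(segment, safe="") for segment in segments)
    return f"{BASE_URL}/{encoded}"


# Hypothetical sample values; inside get_link these would come from
# operator.dag_id, operator.task_id, and ti_key.run_id.
url = build_log_url(
    bucket_name="Invalid_Bucket_Name",
    dag_id="example_dag",
    task_id="copy_files",
    run_id="manual__2024-01-01T00:00:00+00:00",
)
print(url)
```

Keeping the URL construction in a plain function like this also lets it be unit-tested without loading Airflow.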
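Overriding operator links of an existing operator:
You can also use a plugin to replace a built-in link on an operator. For example, BigQueryExecuteQueryOperator includes a link to the Google Cloud Console, but if we wanted to change that link we could do so as follows: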
from airflow.models.baseoperator import BaseOperator
from airflow.models.baseoperatorlink import BaseOperatorLink
from airflow.models.taskinstancekey import TaskInstanceKey
from airflow.models.xcom import XCom
from airflow.plugins_manager import AirflowPlugin
from airflow.providers.google.cloud.operators.bigquery import BigQueryExecuteQueryOperator

# Change from https to http just to display the override
BIGQUERY_JOB_DETAILS_LINK_FMT = "http://console.cloud.google.com/bigquery?j={job_id}"


class BigQueryConsoleLink(BaseOperatorLink):
    """
    Helper class for constructing BigQuery link.
    """

    name = "BigQuery Console"
    operators = [BigQueryExecuteQueryOperator]

    def get_link(self, operator: BaseOperator, *, ti_key: TaskInstanceKey):
        # Look up the job ID that the task pushed to XCom
        job_id = XCom.get_one(ti_key=ti_key, key="job_id")
        # Return an empty string when no job ID is available
        return BIGQUERY_JOB_DETAILS_LINK_FMT.format(job_id=job_id) if job_id else ""


# Defining the plugin class
class AirflowExtraLinkPlugin(AirflowPlugin):
    name = "extra_link_plugin"
    operator_extra_links = [
        BigQueryConsoleLink(),
    ]
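In the override example, `get_link` returns an empty string when no `job_id` is found in XCom. One way to check that fallback without a running Airflow instance is to move the formatting into a plain function. This is a sketch under that assumption: `format_job_link` is a hypothetical helper, not an Airflow API, and the format string is copied from the example.

```python
from typing import Optional

# Same format string as in the plugin example above.
BIGQUERY_JOB_DETAILS_LINK_FMT = "http://console.cloud.google.com/bigquery?j={job_id}"


def format_job_link(job_id: Optional[str]) -> str:
    """Return the console link for job_id, or "" when job_id is missing or empty."""
    return BIGQUERY_JOB_DETAILS_LINK_FMT.format(job_id=job_id) if job_id else ""


# "job_123" is a made-up job ID used only to exercise the function.
print(format_job_link("job_123"))
print(repr(format_job_link(None)))
```

A `get_link` implementation could then fetch the value from XCom and pass it to such a helper, keeping the lookup and the formatting separate.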
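Adding operator links via providers
As explained in Provider packages, when you create your own Airflow provider you can specify the list of operators that provide extra-link capability. You do this by including the operator class name in the provider-info information stored in your provider's package metadata.
Example metadata required in your provider-info dictionary (this is part of the metadata currently returned by the apache-airflow-providers-google provider):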
extra-links:
  - airflow.providers.google.cloud.operators.bigquery.BigQueryConsoleLink
  - airflow.providers.google.cloud.operators.bigquery.BigQueryConsoleIndexableLink
  - airflow.providers.google.cloud.operators.mlengine.AIPlatformConsoleLink
You can include as many operators with extra links as you want.
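Provider packages commonly expose this metadata through a function that returns the provider-info dictionary and is registered under the `apache_airflow_provider` entry point. The sketch below assumes a hypothetical package: `my-airflow-provider`, `my_provider.links.MyServiceLink`, and the `is_dotted_class_path` helper are illustrative names only, and the exact set of keys Airflow expects may differ between versions.

```python
def get_provider_info() -> dict:
    """Return provider metadata, including the extra-link class paths."""
    return {
        # Hypothetical package details, for illustration only.
        "package-name": "my-airflow-provider",
        "name": "My Provider",
        "description": "Example provider that registers one extra link.",
        # Fully qualified class names, in the same form as the YAML above.
        "extra-links": [
            "my_provider.links.MyServiceLink",
        ],
    }


def is_dotted_class_path(path: str) -> bool:
    """Check that path looks like 'package.module.ClassName'."""
    module, _, class_name = path.rpartition(".")
    return bool(module) and class_name.isidentifier()


info = get_provider_info()
print(all(is_dotted_class_path(p) for p in info["extra-links"]))
```

A check like `is_dotted_class_path` is only a sanity test on the strings; it does not confirm that the class can be imported.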