构建一个运行的管道

让我们看看另一个例子:我们需要从在线托管的文件中获取一些数据,并将其插入到我们的本地数据库中。我们还需要在插入时查看删除重复的行。

请注意:本教程中使用的操作符是已弃用的。其推荐的替代者,SQLExecuteQueryOperator 的工作方式类似。您可能会发现本指南很有帮助。

初始设置

我们需要安装 Docker,因为我们将在此示例中使用在 Docker 中运行 Airflow的步骤。下面的步骤应该足够了,但请参阅快速入门文档以获取完整说明。

# Download the docker-compose.yaml file
curl -LfO 'https://airflow.apache.org/docs/apache-airflow/stable/docker-compose.yaml'

# Make expected directories and set an expected environment variable
mkdir -p ./dags ./logs ./plugins
echo -e "AIRFLOW_UID=$(id -u)" > .env

# Initialize the database
docker compose up airflow-init

# Start up all services
docker compose up

所有服务启动后,Web UI 将在:https://127.0.0.1:8080 上可用。默认帐户的用户名是 airflow,密码是 airflow

我们还需要创建一个到 postgres 数据库的连接。要通过 Web UI 创建一个连接,请从“Admin”菜单中选择“Connections”,然后单击加号以“Add a new record”到连接列表中。

填写如下所示的字段。请注意 Connection Id 的值,我们将把它作为 postgres_conn_id kwarg 的参数传递。

  • Connection Id: tutorial_pg_conn

  • Connection Type: postgres

  • Host: postgres

  • Schema: airflow

  • Login: airflow

  • Password: airflow

  • Port: 5432

测试您的连接,如果测试成功,请保存您的连接。

表创建任务

我们可以使用PostgresOperator来定义在我们的 postgres 数据库中创建表的任务。

我们将创建一个表来方便数据清理步骤(employees_temp),另一个表来存储我们清理后的数据(employees)。

from airflow.providers.postgres.operators.postgres import PostgresOperator

create_employees_table = PostgresOperator(
    task_id="create_employees_table",
    postgres_conn_id="tutorial_pg_conn",
    sql="""
        CREATE TABLE IF NOT EXISTS employees (
            "Serial Number" NUMERIC PRIMARY KEY,
            "Company Name" TEXT,
            "Employee Markme" TEXT,
            "Description" TEXT,
            "Leave" INTEGER
        );""",
)

create_employees_temp_table = PostgresOperator(
    task_id="create_employees_temp_table",
    postgres_conn_id="tutorial_pg_conn",
    sql="""
        DROP TABLE IF EXISTS employees_temp;
        CREATE TABLE employees_temp (
            "Serial Number" NUMERIC PRIMARY KEY,
            "Company Name" TEXT,
            "Employee Markme" TEXT,
            "Description" TEXT,
            "Leave" INTEGER
        );""",
)

可选:从文件使用 SQL

如果想将这些 sql 语句从 DAG 中抽象出来,可以将这些语句 sql 文件移动到 dags/ 目录中的某个位置,并将 sql 文件路径(相对于 dags/)传递给 sql kwarg。例如,对于 employees,在 dags/ 中创建一个 sql 目录,将 employees DDL 放入 dags/sql/employees_schema.sql,并将 PostgresOperator() 修改为

create_employees_table = PostgresOperator(
    task_id="create_employees_table",
    postgres_conn_id="tutorial_pg_conn",
    sql="sql/employees_schema.sql",
)

并对 employees_temp 表重复上述操作。

数据检索任务

在这里,我们检索数据,将其保存到 Airflow 实例上的文件中,并将数据从该文件加载到中间表中,以便我们可以执行数据清理步骤。

import os
import requests
from airflow.decorators import task
from airflow.providers.postgres.hooks.postgres import PostgresHook


@task
def get_data():
    # NOTE: configure this as appropriate for your airflow environment
    data_path = "/opt/airflow/dags/files/employees.csv"
    os.makedirs(os.path.dirname(data_path), exist_ok=True)

    url = "https://raw.githubusercontent.com/apache/airflow/main/docs/apache-airflow/tutorial/pipeline_example.csv"

    response = requests.request("GET", url)

    with open(data_path, "w") as file:
        file.write(response.text)

    postgres_hook = PostgresHook(postgres_conn_id="tutorial_pg_conn")
    conn = postgres_hook.get_conn()
    cur = conn.cursor()
    with open(data_path, "r") as file:
        cur.copy_expert(
            "COPY employees_temp FROM STDIN WITH CSV HEADER DELIMITER AS ',' QUOTE '\"'",
            file,
        )
    conn.commit()

数据合并任务

在这里,我们从检索到的数据中选择完全唯一的记录,然后检查数据库中是否已经存在任何员工 序列号(如果存在,我们将使用新数据更新这些记录)。

from airflow.decorators import task
from airflow.providers.postgres.hooks.postgres import PostgresHook


@task
def merge_data():
    query = """
        INSERT INTO employees
        SELECT *
        FROM (
            SELECT DISTINCT *
            FROM employees_temp
        ) t
        ON CONFLICT ("Serial Number") DO UPDATE
        SET
              "Employee Markme" = excluded."Employee Markme",
              "Description" = excluded."Description",
              "Leave" = excluded."Leave";
    """
    try:
        postgres_hook = PostgresHook(postgres_conn_id="tutorial_pg_conn")
        conn = postgres_hook.get_conn()
        cur = conn.cursor()
        cur.execute(query)
        conn.commit()
        return 0
    except Exception as e:
        return 1

完成我们的 DAG

我们已经开发了我们的任务,现在我们需要将它们包装在 DAG 中,这使我们能够定义任务应该何时以及如何运行,并声明任务对其他任务的任何依赖关系。下面的 DAG 配置为

  • 从 2021 年 1 月 1 日开始,每天午夜运行,

  • 如果错过任何一天,则仅运行一次,并且

  • 60 分钟后超时

process_employees DAG 的最后一行定义中,我们看到

[create_employees_table, create_employees_temp_table] >> get_data() >> merge_data()
  • merge_data() 任务依赖于 get_data() 任务,

  • get_data() 依赖于 create_employees_tablecreate_employees_temp_table 任务,并且

  • create_employees_tablecreate_employees_temp_table 任务可以独立运行。

将所有部分放在一起,我们就有了完整的 DAG。

import datetime
import pendulum
import os

import requests
from airflow.decorators import dag, task
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.postgres.operators.postgres import PostgresOperator


@dag(
    dag_id="process_employees",
    schedule_interval="0 0 * * *",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    dagrun_timeout=datetime.timedelta(minutes=60),
)
def ProcessEmployees():
    create_employees_table = PostgresOperator(
        task_id="create_employees_table",
        postgres_conn_id="tutorial_pg_conn",
        sql="""
            CREATE TABLE IF NOT EXISTS employees (
                "Serial Number" NUMERIC PRIMARY KEY,
                "Company Name" TEXT,
                "Employee Markme" TEXT,
                "Description" TEXT,
                "Leave" INTEGER
            );""",
    )

    create_employees_temp_table = PostgresOperator(
        task_id="create_employees_temp_table",
        postgres_conn_id="tutorial_pg_conn",
        sql="""
            DROP TABLE IF EXISTS employees_temp;
            CREATE TABLE employees_temp (
                "Serial Number" NUMERIC PRIMARY KEY,
                "Company Name" TEXT,
                "Employee Markme" TEXT,
                "Description" TEXT,
                "Leave" INTEGER
            );""",
    )

    @task
    def get_data():
        # NOTE: configure this as appropriate for your airflow environment
        data_path = "/opt/airflow/dags/files/employees.csv"
        os.makedirs(os.path.dirname(data_path), exist_ok=True)

        url = "https://raw.githubusercontent.com/apache/airflow/main/docs/apache-airflow/tutorial/pipeline_example.csv"

        response = requests.request("GET", url)

        with open(data_path, "w") as file:
            file.write(response.text)

        postgres_hook = PostgresHook(postgres_conn_id="tutorial_pg_conn")
        conn = postgres_hook.get_conn()
        cur = conn.cursor()
        with open(data_path, "r") as file:
            cur.copy_expert(
                "COPY employees_temp FROM STDIN WITH CSV HEADER DELIMITER AS ',' QUOTE '\"'",
                file,
            )
        conn.commit()

    @task
    def merge_data():
        query = """
            INSERT INTO employees
            SELECT *
            FROM (
                SELECT DISTINCT *
                FROM employees_temp
            ) t
            ON CONFLICT ("Serial Number") DO UPDATE
            SET
              "Employee Markme" = excluded."Employee Markme",
              "Description" = excluded."Description",
              "Leave" = excluded."Leave";
        """
        try:
            postgres_hook = PostgresHook(postgres_conn_id="tutorial_pg_conn")
            conn = postgres_hook.get_conn()
            cur = conn.cursor()
            cur.execute(query)
            conn.commit()
            return 0
        except Exception as e:
            return 1

    [create_employees_table, create_employees_temp_table] >> get_data() >> merge_data()


dag = ProcessEmployees()

将此代码保存到 /dags 文件夹中的 python 文件中(例如 dags/process_employees.py),并且(在 短暂延迟后),process_employees DAG 将包含在 Web UI 上可用 DAG 的列表中。

../_images/tutorial-pipeline-1.png

您可以通过取消暂停(通过左端的滑块)并运行它(通过 Actions 下的 Run 按钮)来触发 process_employees DAG。

../_images/tutorial-pipeline-2.png

process_employees DAG 的 Grid 视图中,我们看到所有任务在所有执行的运行中都成功运行。成功!

下一步是什么?

您现在有一个在 Airflow 中使用 Docker Compose 运行的管道。以下是您接下来可能想做的一些事情

另请参阅

此条目是否有帮助?