Airflow 杂技

Posted on May 1, 2021

背景

近期,从零开始搭建 SOAR 平台,其中工作流引擎或任务编排组件是核心组件之一。

Airflow

Airflow 是用于描述、执行、监控工作流的平台。目前为止,启动 Airflow 最快的方式是——在 Docker 中运行 Airflow这种安装方式也有利于可扩展性

airflow-architecture

有一些组件需要说明一下(此处省略 Flower):

  • Webserver:提供访问 DAG、任务、变量、连接等等的状态信息的 Airflow REST API
  • Scheduler:负责 DAG 解析与任务调度。
  • Worker:执行由 Scheduler 分配的任务。
  • Redis:Scheduler 与 Worker 之间的消息代理。
  • Postgres:存储有关 DAG、任务、变量、连接等等的状态信息。

上述组件是进程级组件,需要注意的是 DAGs 并不是进程,而是指多个 Python 源文件。DAG有向无环图(Directed Acyclic Graph)的缩写,从 Airflow 的角度来看,DAG 用于描述工作流,有向无环图中的结点被称为任务(Task),而 Task 是通过 Operator 来实现。DAGs 的位置在 Airflow 配置文件中指定,重要的事情是 Webserver 和 Scheduler 以及 Worker 都需要读取 DAGs。

生成 DAG

编写 DAG 需要一定的 Python 知识,甚至 Airflow 并不提供创建 DAG 的 UI 或 REST API。情理之中,Airflow 创建工作流并不包含“无代码”或“低代码”特性,从官方首页可以看到其定位:

Airflow is a platform created by the community to programmatically author, schedule and monitor workflows.

后端满足可视化任务编排需求解决方案之一是通过用户输入生成 DAG 源文件,即生成特定的 Python 源文件。

模板方法

体验过编写 DAG 或浏览过 DAG 示例之后可以归纳出 DAG 的一般结构:

  1. Import Statements(导入语句)

  2. Default Arguments(默认参数)

  3. DAG Constructor(DAG 构造器)

  4. Operators(Operator 构造器)

  5. Dependencies(Operator 之间的依赖关系)

如果需要一些复杂函数才能满足需求,完全可以将复杂度转移到 Airflow 之外的服务提供者(service provider),例如在 DAG 中使用 HTTP Operators 向服务提供者发送请求,由服务提供者处理请求(具体业务逻辑在服务提供者实现)。

程序可以根据 DAG 的一般结构,依次追加代码片段。根据关注点分离原则,类似于模型(Model)与视图(View)的分离降低了复杂度,代码片段可分为静态内容和动态内容,前者通常是是样板代码,后者通常是参数值;将静态内容与动态内容合成源代码最便利的工具是模板引擎,而不是重新发明轮子。

gen-dag

各种编程语言早已有面向 Web 领域的模板引擎,但此处并不需要基于文档树(DOM)生成,因为期望输出不是 HTML 之类的文件,而是 Python 源文件,即纯文本,因而考虑基于字符串的模板引擎,例如 Antlr Project 下的 StringTemplate 4,此介绍非常有助于理解 StringTemplate 4。

public class User {
    public int id; // template can directly access via u.id
    private String name; // template can't access this
    public User(int id, String name) { this.id = id; this.name = name; }
    public boolean isManager() { return true; } // u.manager
    public boolean hasParkingSpot() { return true; } // u.parkingSpot
    public String getName() { return name; } // u.name
    public String toString() { return id+":"+name; } // u
}
ST st = new ST("<b>$u.id$</b>: $u.name$", '$', '$');
st.add("u", new User(999, "parrt"));
String result = st.render(); // "<b>999</b>: parrt"

如何使用 StringTemplate 4 的模板表达式编写 DAG 模板?不妨参考 airflow-up/templates。此处列举一例:

<t.taskId> = SimpleHttpOperator(
    task_id='<t.taskId>',
    http_conn_id='<t.httpConnId>',
    endpoint='<t.endpoint>',
    method='<t.method>',
    <if(t.data)>data="{{ dag_run.conf['<t.taskId>']['data'] <if(t.dataFilter)><t.dataFilter><endif> }}",<endif>
    <if(t.headers)>headers=<t.headers>,<endif>
    <if(t.responseCheck)>response_check=<t.responseCheck>,<endif>
    <if(t.responseFilter)>response_filter=<t.responseFilter>,<endif>
    <if(t.extraOptions)>extra_options=<t.extraOptions>,<endif>
    <if(logResponse)>log_response=<logResponse>,<endif>
    dag=dag,
)

DAG 源文件甚至可以包含 Jinja 表达式(dataFilter 的值选自 Jinja2 内置过滤器,在触发 DAG 运行时,Airflow 渲染 DAG 源文件,并且传递额外配置参数(dag_run.conf)的值。之所以使用 Jinja2,除了 Airflow 天然支持以外,原因之一是业务系统将事件作为请求参数的一部分(额外配置参数)传递给 Webserver 用于触发一个新的 DAG 运行的接口(POST /dags/{dag_id}/dagRuns),因此有些参数没法在生成 DAG 源文件时确定,而只能在运行时确定。

Note: The parameters from dag_run.conf can only be used in a template field of an operator.

原因之二是 SimpleHttpOperatorHttpSensor 的构造器对于不同的请求方法(method)要求不同结构的请求参数(data),例如 POST 请求参数要求 JSON,而 GET 请求参数要求 Query String

Airflow 官方推荐使用移位运算符定义 Operator 之间的依赖关系。用户提交到后端的流程图(通常是树或有向无环图)定义了结点之间的依赖关系,生成 Operator 之间的依赖关系最直截了当的方式是使用深度搜索列出所有路径。

demo_tree-path

每个栈帧都维护一个有序列表(子路径),方法执行过程将结点或 Operator 添加到子路径,递归调用时将子路径元素存入新子路径,循环结束后将子路径添加到主路径。

void addPathToPaths(List<Node> nodes, List<Node> path) {
    if (CollectionUtils.isEmpty(nodes)) {
        return;
    }
    for (Node node : nodes) {
        if (Objects.isNull(node)) {
            continue;
        }
        addNodeToPath(node);
        List<Node> children = node.getChildren();
        if (CollectionUtils.isNotEmpty(children)) {
            addPathToPaths(children, new ArrayList<>(path));
        }
    }
    if (CollectionUtils.isNotEmpty(path)) {
        paths.add(path);
    }
}

DAG 代理

Airflow 的 Scheduler 定期扫描 DAG 目录,发现新 DAG 文件,同时定期解析该目录下的每一个 DAG 文件,两者的频率可通过 min_file_process_intervaldag_dir_list_interval 设置。应用程序是否也必须知道该目录的绝对路径?应用程序必须与 Airflow 运行在同一台服务器上?显然不是,只需要在应用程序与 DAGs 之间插入一个中间层。

dag-agent

共享卷

假如在 Docker 中运行 Airflow,方便开发运维起见,DagAgent 理应在 Docker 中运行,但是 DagAgent 接收到 DAG 文件后应该写到哪里去?以 airflow/docker-compose.yaml 为例,创建了三个卷(Volume)

version: '3'
x-airflow-common:
  ...
  volumes:
    - ./dags:/opt/airflow/dags
    - ./logs:/opt/airflow/logs
    - ./plugins:/opt/airflow/plugins
  ...
...

其中 DAG 目录的源地址是 ./dags(宿主机视角),而目的地是 /opt/airflow/dags(Docker 容器视角);可喜可贺的是 Docker CLI 已经支持 Compose 命令,捣鼓 Docker 版 Airflow 也就不需要 docker-compose

% docker compose up -d
[+] Running 8/8
 ⠿ Network "airflow_default"              Created                                                                                                              0.6s
 ⠿ Container airflow_redis_1              Started                                                                                                              1.6s
 ⠿ Container airflow_postgres_1           Started                                                                                                              1.4s
 ⠿ Container airflow_airflow-scheduler_1  Started                                                                                                             10.0s
 ⠿ Container airflow_airflow-init_1       Started                                                                                                             10.3s
 ⠿ Container airflow_airflow-worker_1     Started                                                                                                             10.5s
 ⠿ Container airflow_airflow-webserver_1  Started                                                                                                              7.8s
 ⠿ Container airflow_dagagent_1           Started                                                                                                             10.6s
% docker ps
CONTAINER ID   IMAGE                  COMMAND                  CREATED          STATUS                    PORTS                    NAMES
122a8455e781   dagagent:latest        "./dagagent"             16 minutes ago   Up 16 minutes (healthy)   0.0.0.0:1323->1323/tcp   airflow_dagagent_1
8ab76a88a05a   apache/airflow:2.0.2   "/usr/bin/dumb-init …"   16 minutes ago   Up 16 minutes (healthy)   0.0.0.0:8889->8080/tcp   airflow_airflow-webserver_1
1d0e1b191ea0   apache/airflow:2.0.2   "/usr/bin/dumb-init …"   16 minutes ago   Up 16 minutes             8080/tcp                 airflow_airflow-scheduler_1
a389b993a2c3   apache/airflow:2.0.2   "/usr/bin/dumb-init …"   16 minutes ago   Up 16 minutes             8080/tcp                 airflow_airflow-worker_1
12015f9bc778   postgres:13            "docker-entrypoint.s…"   16 minutes ago   Up 16 minutes (healthy)   5432/tcp                 airflow_postgres_1
e32501a14963   redis:latest           "docker-entrypoint.s…"   16 minutes ago   Up 16 minutes (healthy)   0.0.0.0:6379->6379/tcp   airflow_redis_1

当前主目录如下:

.
├── dags
├── logs
├── plugins
└── docker-compose.yaml

在宿主机上将 DAG 文件输入到 dags,运行在 Docker 中的 Airflow 可感知;Airflow 的 Docker 容器输出的文件在宿主机可观测。由于 DagAgent 容器化,它的文件 I/O 自然作用于容器文件系统,因此从 DagAgent 的角度来看,DAG 目录也是 /opt/airflow/dags;既然 Webserver、Scheduler、Worker 都引用了 airflow-common,DagAgent 服务配置中可以使用 volumes_from 共享 DAG 目录:

...
  dagagent:
    image: dagagent:latest
    ports:
      - 1323:1323
    environment:
      AIRFLOW__CORE__DAGS_FOLDER: /opt/airflow/dags
      ...
    volumes_from:
      - airflow-webserver:rw
    ...
...

最小化镜像

DagAgent 仅仅是一个微小的服务,容器化 Java 程序的话通常过于重量级,不妨使用 Go 编写,详情可参考 dagagent

基于 Go 官方镜像构建应用程序镜像,直接了当的 Dockerfile 文件:

FROM golang:1.16

WORKDIR /go/src/github.com/h2cone/dagagent
COPY . .

RUN go get -d -v ./...
RUN go install -v ./...

CMD ["dagagent"]

构建名为 dagagent:straightforward 的镜像:

% docker build -t dagagent:straightforward -f docker/Dockerfile-straightforward .

但是 dagagent:straightforward 实在是太大了!

% docker image list
REPOSITORY       TAG               IMAGE ID       CREATED         SIZE
dagagent         straightforward   149604a01e75   6 seconds ago   1.05GB
redis            latest            739b59b96069   2 weeks ago     105MB
apache/airflow   2.0.2             d7a0ff8c98a9   2 weeks ago     871MB
postgres         13                26c8bcd8b719   3 weeks ago     314MB

基础镜像是罪魁祸首?使用 golang:1.16-alpine 代替 golang:1.16

FROM golang:1.16-alpine

WORKDIR /go/src/github.com/h2cone/dagagent
COPY . .

RUN go get -d -v
RUN go install -v

CMD ["dagagent"]

构建名为 dagagent:small 的镜像:

% docker build -t dagagent:small -f docker/Dockerfile-small .

该镜像大小已经小于 1G,是否还能再精简?

% docker image list                                          
REPOSITORY       TAG               IMAGE ID       CREATED              SIZE
dagagent         small             145b612b635e   5 seconds ago        485MB
dagagent         straightforward   149604a01e75   About a minute ago   1.05GB
redis            latest            739b59b96069   2 weeks ago          105MB
apache/airflow   2.0.2             d7a0ff8c98a9   2 weeks ago          871MB
postgres         13                26c8bcd8b719   3 weeks ago          314MB

分析一下以上两个镜像的组成,至少包含下载的依赖文件、应用程序的源文件、静态编译输出的可执行文件等。DagAgent 运行时只需要可执行文件与操作系统,故使用多阶段构建(multi-stage builds),将编译时与运行时分为两个阶段,运行时所需的可执行文件拷贝自编译时,最终只产出包含可执行文件与操作系统的镜像:

FROM golang:1.16-alpine3.12 AS builder

WORKDIR /go/src/github.com/h2cone/dagagent
COPY . .

RUN go build -v

FROM alpine:3.12

WORKDIR /root
COPY --from=builder /go/src/github.com/h2cone/dagagent/dagagent .
CMD ["./dagagent"]

构建名为 dagagent:min 的镜像:

% docker build -t dagagent:min -f docker/Dockerfile-min .

该镜像大小已降低 94%:

% docker image list                                      
REPOSITORY       TAG               IMAGE ID       CREATED          SIZE
dagagent         min               75dbfbb53d78   10 seconds ago   27.3MB
dagagent         small             145b612b635e   4 minutes ago    485MB
dagagent         straightforward   149604a01e75   5 minutes ago    1.05GB
redis            latest            739b59b96069   2 weeks ago      105MB
apache/airflow   2.0.2             d7a0ff8c98a9   2 weeks ago      871MB
postgres         13                26c8bcd8b719   3 weeks ago      314MB

加速构建

回头看 dagagent:straightforward,除了镜像过大,还有一个缺点,即下载依赖时(通常最耗时)无法命中缓存。只更新了 DagAgent 的源代码但不更新依赖,构建镜像时却需重新下载依赖。

加速镜像构建首要方式是充分利用构建缓存(Leverage build cache)。Docker 镜像建立在一系列之上,每一层表示 Dockerfile 中的一条指令的执行结果。假如不禁用缓存,Docker 执行每一条 Dockerfile 指令时先查找之前层的缓存,若命中(例如校验和相等),则不重新执行当前指令而是引用已存在的层。一般情况下,结果不易变的指令写在结果易变的指令之前的 Dockerfile 缓存利用率更高。

DagAgent 源代码比依赖更易变,根据上文,先下载依赖后拷贝源代码:

FROM golang:1.16

WORKDIR /go/src/github.com/h2cone/dagagent
COPY go.mod .
COPY go.sum .
RUN go mod download -x

COPY . .

RUN go install -v

CMD ["dagagent"]

结合上一节的“最小化镜像”,最终版本的 Dockerfile 如下所示:

# syntax=docker/dockerfile:1
FROM golang:1.16-alpine3.12 AS builder

ENV GOPROXY="https://goproxy.io,direct"

WORKDIR /go/src/github.com/h2cone/dagagent
COPY go.mod .
COPY go.sum .
RUN go mod download -x

COPY . .

RUN go build -v

FROM alpine:3.12
RUN apk add curl

WORKDIR /root
COPY --from=builder /go/src/github.com/h2cone/dagagent/dagagent .
CMD ["./dagagent"]

构建名为 dagagent:latest 的镜像:

% docker build -t dagagent -f docker/Dockerfile .
[+] Building 36.5s (19/19) FINISHED                                                                                                                                          
 => [internal] load build definition from Dockerfile                                                                                                                    0.0s
 => => transferring dockerfile: 37B                                                                                                                                     0.0s
 => [internal] load .dockerignore                                                                                                                                       0.0s
 => => transferring context: 2B                                                                                                                                         0.0s
 => resolve image config for docker.io/docker/dockerfile:1                                                                                                             19.2s
 => CACHED docker-image://docker.io/docker/dockerfile:1@sha256:e2a8561e419ab1ba6b2fe6cbdf49fd92b95912df1cf7d313c3e2230a333fdbcc                                         0.0s
 => [internal] load metadata for docker.io/library/alpine:3.12                                                                                                          7.6s
 => [internal] load metadata for docker.io/library/golang:1.16-alpine3.12                                                                                              17.1s
 => [builder 1/7] FROM docker.io/library/golang:1.16-alpine3.12@sha256:1636899c10870ab66c48d960a9df620f4f9e86a0c72fbacf36032d27404e7e6c                                 0.0s
 => => resolve docker.io/library/golang:1.16-alpine3.12@sha256:1636899c10870ab66c48d960a9df620f4f9e86a0c72fbacf36032d27404e7e6c                                         0.0s
 => [stage-1 1/4] FROM docker.io/library/alpine:3.12@sha256:36553b10a4947067b9fbb7d532951066293a68eae893beba1d9235f7d11a20ad                                            0.0s
 => [internal] load build context                                                                                                                                       0.0s
 => => transferring context: 6.42kB                                                                                                                                     0.0s
 => CACHED [stage-1 2/4] RUN apk add curl                                                                                                                               0.0s
 => CACHED [stage-1 3/4] WORKDIR /root                                                                                                                                  0.0s
 => CACHED [builder 2/7] WORKDIR /go/src/github.com/h2cone/dagagent                                                                                                     0.0s
 => CACHED [builder 3/7] COPY go.mod .                                                                                                                                  0.0s
 => CACHED [builder 4/7] COPY go.sum .                                                                                                                                  0.0s
 => CACHED [builder 5/7] RUN go mod download -x                                                                                                                         0.0s
 => CACHED [builder 6/7] COPY . .                                                                                                                                       0.0s
 => CACHED [builder 7/7] RUN go build -v                                                                                                                                0.0s
 => CACHED [stage-1 4/4] COPY --from=builder /go/src/github.com/h2cone/dagagent/dagagent .                                                                              0.0s
 => exporting to image                                                                                                                                                  0.0s
 => => exporting layers                                                                                                                                                 0.0s
 => => writing image sha256:ad807e3e1bda89824e55db14088156f9d4e8dabce6d4e79af117eb533371e4dc                                                                            0.0s
 => => naming to docker.io/library/dagagent                                                                                                                             0.0s

判断指令是否命中缓存可通过一个关键词——CACHED来判断。

插件发现

当 Airflow 生态的现有的 Operators 不满足需求时,可以考虑自定义 Operator,例如要实现一个哨兵语句式的 HTTP Operator,又如要实现一个可以将额外配置参数或上游 Tasks 的返回值(传递机制是 XComs)组装成复杂请求参数的拓展 HTTP Operator

.
├── dags
├── logs
├── plugins
│   └── operators
│       ├── __init__.py
│       ├── extended_http_operator.py
│       ├── extended_http_sentinel.py
│       └── simple_http_sentinel.py
├── docker-compose.yaml

实现了新的 Operator 之后,当前版本的 Airflow 能够发现类似上面 plugins/operators 目录下的自定义 Operator,技巧在于内容空白的 __init__.py,自定义 Operator 的导入语句如下:

from operators.simple_http_sentinel import SimpleHttpSentinel
from operators.extended_http_sentinel import ExtendedHttpSentinel
from operators.extended_http_operator import ExtendedHttpOperator

状态同步

业务系统很可能需要知道一个 DAG 运行后的状态(成功或失败),DAG 构造器有成功/失败的回调函数类型的参数(on_success_callback/on_failure_callback),美中不足的是没有表示失败重试次数的参数,然而受到 Python Requests 的高级使用 - 超时,重试,钩子的启发,于是编写了 dag_callback 来提高回调更新状态的成功率。

数据库代理

按照 Fine-tuning your Scheduler performance 调优 Airflow Scheduler 时,注意到下面这段话:

Database connections and Database usage might become a problem as you want to increase performance and process more things in parallel. Airflow is known from being “database-connection hungry” - the more DAGs you have and the more you want to process in parallel, the more database connections will be opened. This is generally not a problem for MySQL as its model of handling connections is thread-based, but this might be a problem for Postgres, where connection handling is process-based. It is a general consensus that if you have even medium size Postgres-based Airflow installation, the best solution is to use PGBouncer as a proxy to your database. The Helm Chart for Apache Airflow supports PGBouncer out-of-the-box. For MsSQL we have not yet worked out the best practices as support for MsSQL is still experimental.

在我的测试中,部署了edoburu/docker-pgbouncer 后,Airflow 任务实例之间的延迟下降了 20% ~ 30%。

Deferrable Operators

如果长时间运行的 Sensors 资源开销不容小觑(基于多进程模型),疑似拖慢其它 Operator,那么不妨考虑 Smart Sensors 或者 Deferrable Operators,后者提供了一种更灵活的方式来实现高效的长时间运行的 Sensor。

尾声

更多编排引擎(orchestration engine):

本文首发于 https://h2cone.github.io/

参考资料