最近在毕业设计项目中需要设计实现一个任务调度系统,于是借此机会想看看一些开源的任务调度是怎么实现的。Airflow是一个有名的基于Python任务调度系统,看了一下之后发现虽然调度方面没有什么用的上的,但是Airflow本身还是挺有意思的,于是简单学习了一下。这里做一下记录,如果未来还需要使用到类似的任务调度系统,可以做一下类比。

Apache Airflow 介绍

Airflow是什么

Apache Airflow™ is an open-source platform for developing, scheduling, and monitoring batch-oriented workflows.

官方文档的介绍中,对Airflow的定义是一个开发、调度、监控工作流的开源平台。调度,指的是平台可以在时间或其他条件(如依赖关系、数据事件)满足时自动执行任务,监控则强调Airflow提供了一个cli和web ui的方式方便开发者实时查看平台上任务的运行状态。在开发上,我的理解是Airflow提供了一系列基于Python的库,帮助开发者以Python的形式定义工作流(在Airflow中叫做DAG有向无环图)。基于批处理这一特点决定了Airflow最适合的领域是定时处理一批数据(例如每天处理当天的销售数据)。虽然Airflow也支持通过API或CLI调用触发工作流,但这并不是Airflow的主要用途。

总结起来,Airflow具有以下功能特点:

  1. 基于Python。Airflow本身可以通过pip安装,通过python启动。同时,Airflow中的工作流(DAG)也需要开发者通过Python开发和定义。
  2. 提供了丰富多样的Operator(算子),在DAG中,可以很方便地运行各种外部任务,如Python任务、Bash操作、K8s操作等。
  3. 提供了一个web ui,可以很方便地对任务运行的情况进行监控。
  4. 提供了backfill功能,可以补全对历史数据的处理。
  5. 可以根据时间、数据等条件自动触发工作流和任务的执行。
  6. 伸缩性强。为了应对不同的使用规模,提供了多种搭建架构,从单机到大规模分布式部署都支持。

Airflow 的架构

graph LR
    developer[Developer]
    dags[DAG folder]
    scheduler[Scheduler]
    web[Web ui]
    db[(Database)]
    
    developer -- edit --> dags
    dags -- is read by -->  scheduler
    scheduler --> db
    web --> db
    developer -- operate and monidor --> web
    

如上图,是最简单的Airflow架构模式。其中Scheduler负责DAG(python文件)的处理、调度和执行,开发者可以通过Web ui来监控和触发DAG的执行。

在这种架构中,scheduler承担了大部分的职责,而实际的大规模部署时,通常更倾向于把各个功能的专用组件单独部署,这些组件都是可选的:

  1. Trigger,负责触发deferred的任务的执行。
  2. Executor,负责实际执行任务,可以分担Scheduler中执行任务的职责。
  3. DAG parser,负责处理DAG,可以分担Scheduler中处理DAG的职责。

具体的更复杂的架构可以参考官方文档架构相关的说明

Airflow DAG

DAG是airflow运行任务的核心。DAG是任务(task)的集合,还定义了任务之间的依赖关系,定义了任务应该以何种顺序执行。

DAG run 是一个DAG运行的实例。DAG run中的任务运行实例称为Task instance。

DAG运行有两种方式:

  1. 被手动触发或被API调用触发
  2. 被schedule定义调度执行。schedule是定义在DAG中的。

DAG中有一个非常重要的概念:逻辑时间和实际运行时间。

  1. 逻辑时间:DAG中的task感知到的时间,或者说是处理的数据的时间。logical date,又因为历史原因被称为execution date。
  2. 实际运行时间:DAG或任务实际运行时的时间。start date表示开始时间,end date表示结束时间。

在手动触发执行时,通常start date=logical date。而对于调度的批处理任务,往往start date = logical date + data interval。例如处理销售数据时,往往是4月19日0点开始处理4月18日这一天的数据,因此start date是4.19,但是logical date是4.18,date interval每次数据处理时间间隔刚好是这两个时间的差值。

在backfill的场景下,可能希望在4月19号这天不仅要处理4月19号一天的数据,还要按天处理3月1号以来的所有数据。对于之前的数据,logical date就是对应数据的日期,而start date实际的执行日期总是4.19。

流控制

流控制:默认DAG中的一个任务所有依赖的任务都完成时,就会被调度执行。DAG提供了一些流控制语句来手动对任务执行的方式进行额外控制:

  1. 分支Branching:可以根据条件,从多个下游任务中执行一个。未被执行的任务会跳过。
  2. Depends On Past:一个任务是否执行依赖于其对应之前logical date的task instance是否成功。设置这个条件是因为有些任务需要业务上具有连续性,前面的数据处理之后,后面的才可以处理,否则后面数据的处理没有意义。
  3. Trigger rules:可以自定义一系列触发条件,在满足条件时才触发任务执行。
  4. Setup and Teardown:可以处理一些初始化和清理的任务。
  5. Latest only:只执行最新一次的task。在backfill的场景中,如果对于一个任务并不需要在所有历史DAG run中都执行,只需要最后执行一次,就可以使用这个配置。

XCOMS

默认所有task都是完全隔离的,不存在数据交换。如果需要在两个task之间传输一些小数据,就可以使用XCOMS。在任务中,通过调用task_instance.xcom_push(key="key", value=any_serializable)就可以发布数据到XCOMS,而通过task_instance.xcom_pull(key="key", task_ids="task_1")的形式就可以从XCOMS中获取一个值。每一次的DAG run的XCOM是相互隔离的。

注意,因为XCOMS中的内容是直接存储在元数据数据库中的,所以不适合存储大量的数据。对于大量数据,最好可以借助外部的ObjectStorage等进行存储。

ObjectStorage

在Airflow2.8版本之后,引入了ObjectStorage。以往的task中,如果需要读取外部的对象存储资源,往往需要手动处理凭证并手动连接对象存储服务,会出现许多模板代码。Airflow提供了开箱即用的ObjectStorage,只需要事先配置一下和对象存储服务之间的connection配置信息,就可以直接在DAG的任务中使用对象存储操作云上的文件,提高了开发DAG时的效率。

详细操作方式可以参考官方文档中对ObjectStorage具体使用方式的说明

DataSet

数据集,是Airflow2.4之后提出的一个逻辑概念,可以用来驱动数据触发的调度。对于DataSet的操作并不会直接映射到某一个数据存储服务上对文件的操作,他更像是一个对数据相关事件的发布和消费。一个上游的DAG可以在DAG的定义中声明outlets=[Dataset("s3://xxx/xxx.csv")],表示这个DAG成功执行后会发布在此url对应的文件上的事件,Airflow的调度器会触发所有声明了调度方式为schedule=[Dataset("s3://xxx/xxx.csv")]的DAG。

这样的调度方式不同于以往以时间为主要触发条件的调度,而是以数据集上的事件为主要的触发条件,也成为Data-aware scheduling.详细说明可以参考官方文档