当前位置: 主页 > 体验 >   正文

Airflow从入门到实战(万字长文)

导读:文章目录 Airflow 基本概念 概述 名词 Airflow 安装 Airflow 官网 安装 Python 环境 安装 Miniconda 创建 Python3.8 环境 安装 Airflow 启动停止脚本 安装后的一些细节问题 修改数据库为 MyS...

文章目录

Airflow 基本概念

概述

Airflow 是一个以编程方式编写,安排和监视工作流的平台。

使用 Airflow 将工作流编写任务的有向无环图(DAG)。Airflow 计划程序在遵循指定的依赖项,同时在一组工作线程上执行任务。丰富的命令实用程序使在 DAG 上执行复杂的调度变的轻而易举。丰富的用户界面使查看生产中正在运行的管道,监视进度以及需要时对问题进行故障排除变的容易。

名词

(1)Dynamic:Airflow 配置需要实用 Python,允许动态生产管道。这允许编写可动态。这允许编写可动态实例化管道的代码。
(2)Extensible:轻松定义自己的运算符,执行程序并扩展库,使其适合于您的环境。
(3)Elegant:Airlfow 是精简的,使用功能强大的 Jinja 模板引擎,将脚本参数化内置于 Airflow 的核心中。
(4)Scalable:Airflow 具有模板块架构,并使用消息队列来安排任意数量的工作任务。

Airflow 安装

Airflow 官网

https://airflow.apache.org

安装 Python 环境

Airflow 是由 Python 语言编写的 Web 应用,要求 Python3.8 的环境。

安装 Miniconda

conda 是一个开源的包、环境管理器,可以用于在同一个机器上安装不同 Python 版本的软件包及其依赖,并能够在不同的 Python 环境之间切换,Anaconda 包括 Conda、Python 以及一大堆安装好的工具包,比如:numpy、pandas 等,Miniconda 包括 Conda、Python。

此处,我们不需要如此多的工具包,故选择 MiniConda。

1)下载 Miniconda(Python3 版本)

下载地址:https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh

2)安装 Miniconda
(1)执行以下命令进行安装,并按照提示操作,直到安装完成。

[root@hadoop102 software]# mkdir airflow
[root@hadoop102 software]# cd airflow/
--将安装包放入此目录中
[root@hadoop102 airflow]$ bash Miniconda3-latest-Linux-x86_64.sh

(2)在安装过程中,出现以下提示时,可以指定安装路径

(3)出现以下字样,即为安装完成

3)加载环境变量配置文件,使之生效

[root@hadoop102 airflow]# source ~/.bashrc

4)取消激活 base 环境

Miniconda 安装完成后,每次打开终端都会激活其默认的 base 环境,我们可通过以下命令,禁止激活默认 base 环境。

[root@hadoop102 airflow]# conda config --set auto_activate_base false

创建 Python3.8 环境

1)配置 conda 国内镜像

(base) [root@hadoop102 ~]$ conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/free
(base) [root@hadoop102 ~]$ conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/main
(base) [root@hadoop102 ~]$ conda config --set show_channel_urls yes

2)创建 Python3.8 环境

(base) [root@hadoop102 ~]$ conda create --name airflow python=3.8

说明:conda 环境管理常用命令
创建环境:conda create -n env_name
查看所有环境:conda info --envs
删除一个环境:conda remove -n env_name --all

3)激活 airflow 环境

(base) [root@hadoop102 ~]$ conda activate airflow

激活后效果如下图所示

[root@hadoop102 software]$ conda activate airflow
(airflow) [atguigu@hadoop102 software]$ 

说明:退出当前环境。

(airflow) [atguigu@hadoop102 ~]$ conda deactivate

4)执行 python -V 命令查看 python 版本

(airflow) [root@hadoop102 software]$ python -V
Python 3.8.13

安装 Airflow

1)更改 pip 的源

[root@hadoop102 software]$ conda activate airflow
(airflow) [root@hadoop102 software]$ pip install numpy -i https://pypi.tuna.tsinghua.edu.cn/simple
(airflow) [root@hadoop102 software]$ sudo mkdir ~/.pip
(airflow) [root@hadoop102 software]$ sudo vim ~/.pip/pip.conf

添加以下内容

[global]
index-url = https://pypi.tuna.tsinghua.edu.cn/simple
[install]
trusted-host = https://pypi.tuna.tsinghua.edu.cn

2)安装 airflow

(airflow) [root@hadoop102 software]$ pip install "apache-airflow==2.4.3"

3)初始化 airflow

(airflow) [root@hadoop102 software]$ airflow db init

4)查看版本

(airflow) [root@hadoop102 software]$ airflow version 
2.4.3

5)airflow 安装好存放路径

(airflow) [root@hadoop102 airflow]$ pwd
/root/airflow

6)创建账号

(airflow) [root@hadoop102 airflow]$ airflow users create \
--username admin \
--firstname bigdata \
--lastname bigdata \
--role Admin \
--email 1127914080@qq.com

此时会让你输入密码,这里笔者的密码设置为123456

7)启动 airflow 调度

(airflow) [root@hadoop102 airflow]$ airflow scheduler -D

8)启动 airflow web 服务,启动后浏览器访问 http://hadoop102:8080

(airflow) [root@hadoop102 airflow]$ airflow webserver -p 8080 -D

则已经进入了airflow的页面。

启动停止脚本

https://blog.csdn.net/weixin_45417821/article/details/

安装后的一些细节问题

页面中显示了两个问题,第一个问题是希望将元数据存放在MySQL或者PostgresSQL中,第二个问题是不建议用这个执行器,接下来我们进行改进。

修改数据库为 MySQL

1)在 MySQL 中建库

mysql> CREATE DATABASE airflow_db CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;

2)如果报错 Linux error:1425F102:SSL routines:ssl_choose_client_version:unsupported protocol,可以关闭 MySQL 的 SSL 证书

查看 SSL 是否开启 YES 为开启

mysql> SHOW VARIABLES LIKE '%ssl%';
+---------------+-----------------+
| Variable_name | Value           |
+---------------+-----------------+
| have_openssl  | YES             |
| have_ssl      | YES             |
| ssl_ca        | ca.pem          |
| ssl_capath    |                 |
| ssl_cert      | server-cert.pem |
| ssl_cipher    |                 |
| ssl_crl       |                 |
| ssl_crlpath   |                 |
| ssl_key       | server-key.pem  |
+---------------+-----------------+
9 rows in set (0.02 sec)

3)修改配置文件 my.cnf,加入以下内容:

vim /etc/my.cnf
# disable_ssl
skip_ssl

并重启mysql

sudo systemctl restart mysqld

4)添加 python 连接的依赖:官网介绍的方法有两种,这里我们选择下面的连接器。

官网连接器地址:https://airflow.apache.org/docs/apache-airflow/2.4.3/howto/set-up-database.html

(airflow) [root@hadoop102 airflow]$ pip install mysql-connector-python

5)修改 airflow 的配置文件:

(airflow)[root@hadoop102 ~]$ cd /root/airflow
[database]
# The SqlAlchemy connection string to the metadata database.
# SqlAlchemy supports many different database engines.
# More information here:
# http://airflow.apache.org/docs/apache-airflow/stable/howto/set-up-database.html#database-uri
#sql_alchemy_conn = sqlite:home/atguigu/airflow/airflow.db
sql_alchemy_conn = mysql+mysqlconnector://root:000000@hadoop102:3306/airflow_db

6)关闭 airflow,初始化后重启:

(airflow) [root@hadoop102 ~]$ af.sh stop
(airflow) [root@hadoop102 airflow]$ airflow db init
(airflow) [root@hadoop102 ~]$ af.sh start

7)初始化报错 1067 - Invalid default value for ‘update_at’:
原因:字段 ‘update_at’ 为 timestamp 类型,取值范围是:1970-01-01 00:00:00 到2037-12-31 23:59:59(UTC +8 北京时间从 1970-01-01 08:00:00 开始),而这里默认给了空值,所以导致失败。

推荐修改 mysql 存储时间戳格式:

mysql> set GLOBAL sql_mode ='STRICT_TRANS_TABLES,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION'

重启 MySQL 会造成参数失效,推荐将参数写入到配置文件 my.cnf 中。

vim /etc/my.cnf

sql_mode = STRICT_TRANS_TABLES,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION

并重启mysql

sudo systemctl restart mysqld

再次初始化查看效果:命令 :airflow db init

8)重新创建账号登录:

(airflow) [root@hadoop102 airflow]$ airflow users create \
--username admin \
--firstname bigdata \
--lastname bigdata \
--role Admin \
--email 1127914080@qq.com

密码依然是123456

启动airflow ,并打开查看

af.sh start

此时发现已经成功进入了 ,并且数据库方面的提示已经消失了

修改执行器

官网不推荐在开发中使用顺序执行器,会造成任务调度阻塞。

关闭airflow,修改配置文件

[root@hadoop102 bin]# af.sh stop
(airflow) [root@hadoop102 airflow]# vim airflow.cfg 

添加如下内容

[core]
# The executor class that airflow should use. Choices include
# ``SequentialExecutor``, ``LocalExecutor``, ``CeleryExecutor``, ``DaskExecutor``,
# ``KubernetesExecutor``, ``CeleryKubernetesExecutor`` or the
# full import path to the class when using a custom executor.
executor = LocalExecutor

可以使用官方推荐的几种执行器,也可以自定义。这里我们选择本地执行器即可。之后再次启动

[root@hadoop102 bin]# af.sh start

hadoop102:8080

可以发现已经没有警告提示了。

部署使用

1)测试环境启动
本次测试使用的是 spark 的官方案例,所有需要启动 hadoop 和 spark 的历史服务器。(这里笔者已经配置好脚本了)

[root@hadoop102 bin]$ hdp.sh start

2)查看 Airflow 配置文件

(alrflow) [root@airflow work-py]# vim ~/airflow/airflow.cfg


代码仓库的目录,别忘了airflow是用代码进行调度的

3)编写.py 脚本,创建 work-py 目录用于存放 python 调度脚本

(airflow) [root@hadoop102 airflow]$ mkdir ~/airflow/dags
(airflow) [root@hadoop102 airflow]$ cd dags/
(airflow) [root@hadoop102 dags]$ vim wordcount.py

添加如下内容

#!/usr/bin/python
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
default_args = {
 # 用户 test_owner DAG下的所有者
 'owner': 'luanhao',
 # 是否开启任务依赖
 'depends_on_past': True, 
 # 邮箱
 'email': ['1127914080@qq.com'],
 # 启动时间
 'start_date':datetime(2023,1,19),
 # 出错是否发邮件报警
 'email_on_failure': False,
 # 重试是否发邮件报警
 'email_on_retry': False,
 # 重试次数
 'retries': 1,
 # 重试时间间隔
 'retry_delay': timedelta(minutes=5),
}
# 声明任务图
# test代表任务名称(可以修改其他名称,这里我们用wordcount)
dag = DAG('wordcount', default_args=default_args, schedule_interval=timedelta(days=1))
# 创建单个任务
t1 = BashOperator(
 # 任务 id
 task_id='dwd',
 # 任务命令 使用Spark的wordcount
 bash_command='ssh hadoop102 "/opt/module/spark-3.2-yarn/bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn /opt/module/spark-3.2-yarn/examples/jars/spark-examples*.jar 10 "',
 # 重试次数
 retries=1,
 # 把任务添加进图中
 dag=dag)

t2 = BashOperator(
 task_id='dws',
 bash_command='ssh hadoop102 "/opt/module/spark-3.2-yarn/bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn /opt/module/spark-3.2-yarn/examples/jars/spark-examples*.jar 10 "',
 retries=1,
 dag=dag)

t3 = BashOperator(
 task_id='ads',
 bash_command='ssh hadoop102 "/opt/module/spark-3.2-yarn/bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn /opt/module/spark-3.2-yarn/examples/jars/spark-examples*.jar 10 "',
 retries=1,
 dag=dag)
 
# 设置任务依赖
t2.set_upstream(t1)
t3.set_upstream(t2)

一些重要参数

必须导包

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

default_args 设置默认参数。
depends_on_past 是否开启任务依赖。
schedule_interval 调度频率。
retries 重试次数 。
start_date 开始时间。
BashOperator 具体执行任务,如果为 true 前置任务必须成功完成才会走下一个依赖任务,如果为 false 则忽略是否成功完成。
task_id 任务唯一标识(必填)。
bash_command 具体任务执行命令。
set_upstream 设置依赖 如上图所示 ads 任务依赖 dws 任务依赖 dwd 任务。

出现wordcount,

运行

点击成功任务,查看日志,步骤如下

日志内容如下:

查看 dag 图、甘特图

查看脚本代码

Dag 任务操作

删除 Dag 任务

主要删除 DAG 任务不会删除底层文件,过一会还会自动加载回来。

查看当前所有 dag 任务

# 查看所有任务
(airflow) [root@hadoop102 airflow]$ airflow list_dags
# 查看单个任务
(airflow) [root@hadoop102 airflow]$ airflow tasks list wordcount --tree

配置邮件服务器

1)保证邮箱已开 SMTP 服务(这里我们使用QQ邮箱,当然其他邮箱也可以)

2)修改 airflow 配置文件,用 stmps 服务对应 587 端口

(airflow) [root@hadoop102 airflow]$ vim ~/airflow/airflow.cfg
smtp_host = smtp.qq.com
smtp_starttls = True
smtp_ssl = False
smtp_user = 1127914080@qq.com
# smtp_user =
smtp_password = yyyfjkoqvsnzhhgb
# smtp_password =
smtp_port = 587
smtp_mail_from = 1127914080@qq.com

3)重启 airflow

[root@hadoop102 bin]$ af.sh stop
[root@hadoop102 bin]$ af.sh start

4)新增 workflows.py 脚本,并加入邮箱功能

(airflow) [root@hadoop102 dags]# vim workflows.py

#!/usr/bin/python
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.email_operator import EmailOperator
from datetime import datetime, timedelta
default_args = {
 # 用户
 'owner': 'luanhao',
 # 是否开启任务依赖
 'depends_on_past': True, 
 # 邮箱
 'email': ['1127914080@qq.com'],
 # 启动时间
 'start_date':datetime(2023,1,19),
 # 出错是否发邮件报警
 'email_on_failure': True,
 # 重试是否发邮件报警
 'email_on_retry': True,
 # 重试次数
 'retries': 1,
 # 重试时间间隔
 'retry_delay': timedelta(minutes=5),
}

# 声明任务图
dag = DAG('workflows', default_args=default_args, schedule_interval=timedelta(days=1))
# 创建单个任务
t1 = BashOperator(
 # 任务 id
 task_id='dwd',
 # 任务命令
 bash_command='ssh hadoop102 "/opt/module/spark-3.2-yarn/bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn /opt/module/spark-3.2-yarn/examples/jars/spark-examples*.jar 10 "',
 # 重试次数
 retries=1,
 # 把任务添加进图中
 dag=dag)

t2 = BashOperator(
 task_id='dws',
 bash_command='ssh hadoop102 "/opt/module/spark-3.2-yarn/bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn /opt/module/spark-3.2-yarn/examples/jars/spark-examples*.jar 10 "',
 retries=3,
 dag=dag)

t3 = BashOperator(
 task_id='ads',
 bash_command='ssh hadoop102 "/opt/module/spark-3.2-yarn/bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn /opt/module/spark-3.2-yarn/examples/jars/spark-examples*.jar 10 "',
 retries=3,
 dag=dag)

email=EmailOperator(
 # 邮箱id
 task_id="email",
 # 发送给xxx
 to="1063182043@qq.com ",
 # 邮箱主题
 subject="hi,你好啊,我是你来自远方的最亲密的伙伴",
 # 发送内容
 html_content="

后天就要过年了,明天该剪头了撒

"
, # 抄送给xxx cc="1127914080@qq.com ", dag=dag) # 任务之间相互依赖 t2.set_upstream(t1) t3.set_upstream(t2) email.set_upstream(t3)

参数讲解

task_id=“email” : 邮箱的id
to="1063182043@qq.com " :发送给对方
subject=“hi,你好啊,我是你来自远方的最亲密的伙伴”:邮箱的主题描述
html_content= “< h1 >后天就要过年了,明天该剪头了撒”:邮箱的内容
cc="1127914080@qq.com " : 邮箱的抄送内容,抄送给对方

启动

运行测试

内容