Airflow Deployment with LDAP

Published 2023-10-12 17:52:45  Author: 赵财进宝

Airflow deployment

1. Build the virtual environment (virtualenv)

yum install python3.11
python -V
ln -s /usr/bin/python3.11 /usr/bin/python
yum install virtualenv
useradd airflow

Create the environment (this creates a folder in the current directory):
(airflow) [root@localhost opt]# virtualenv -p python3.11 airflow
On Linux or macOS, activate the new Python environment:
source airflow/bin/activate
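
A quick sanity check that the activated interpreter really is the venv's Python 3.11 (paths assume the /opt/airflow venv created above):

which python && python -V    # expect /opt/airflow/bin/python, Python 3.11.x
pip -V                       # should point into /opt/airflow/lib/python3.11/site-packages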





2. Install apache-airflow[celery]==2.7.1

Download the constraints file "https://raw.githubusercontent.com/apache/airflow/constraints-2.7.1/constraints-3.11.txt"
and install against it:
pip install "apache-airflow[celery]==2.7.1" --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.7.1/constraints-3.11.txt"

pip install "apache-airflow[celery]==2.7.1" -c /opt/airflow/constraints-3.11.txt -i https://pypi.tuna.tsinghua.edu.cn/simple

3. Install the database (PostgreSQL)

[root@node1 ~]# dnf install -y https://download.postgresql.org/pub/repos/yum/reporpms/EL-9-x86_64/pgdg-redhat-repo-latest.noarch.rpm
[root@node1 ~]# dnf -qy module disable postgresql
[root@node1 ~]# dnf install -y postgresql16-server
[root@node1 ~]# /usr/pgsql-16/bin/postgresql-16-setup initdb
[root@node1 ~]# systemctl enable postgresql-16
[root@node1 ~]# systemctl start postgresql-16

CREATE DATABASE airflow;
CREATE USER airflow WITH PASSWORD 'airflow128';
GRANT ALL PRIVILEGES ON DATABASE airflow TO airflow;
-- PostgreSQL 15+ also requires privileges on the public schema (USE is MySQL syntax; in psql switch databases with \c):
\c airflow
GRANT ALL ON SCHEMA public TO airflow;
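
One way to run the statements above from the shell, assuming the default peer authentication for the postgres OS user (a sketch; adjust to your environment):

sudo -u postgres psql -c "CREATE DATABASE airflow;"
sudo -u postgres psql -c "CREATE USER airflow WITH PASSWORD 'airflow128';"
sudo -u postgres psql -c "GRANT ALL PRIVILEGES ON DATABASE airflow TO airflow;"
sudo -u postgres psql -d airflow -c "GRANT ALL ON SCHEMA public TO airflow;"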
firewall-cmd --zone=public --add-port=8080/tcp --permanent
firewall-cmd --zone=public --add-port=5432/tcp --permanent
firewall-cmd --reload

pip install psycopg2-binary -i https://pypi.tuna.tsinghua.edu.cn/simple
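
Because airflow.cfg below points at 172.30.3.21:5432, PostgreSQL must also accept remote TCP connections. A hedged sketch of the usual two changes (paths are the PGDG 16 defaults; the 172.30.3.0/24 subnet is an assumption, use your own):

sed -i "s/^#listen_addresses.*/listen_addresses = '*'/" /var/lib/pgsql/16/data/postgresql.conf
echo "host  airflow  airflow  172.30.3.0/24  scram-sha-256" >> /var/lib/pgsql/16/data/pg_hba.conf
systemctl restart postgresql-16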

Then edit the airflow.cfg configuration file (details in Section 5).

4. Install and configure the message queue (RabbitMQ)

yum install centos-release-rabbitmq-38.noarch
yum install rabbitmq-server
systemctl enable rabbitmq-server.service
systemctl start rabbitmq-server.service

 rabbitmqctl add_user myuser mypassword
 rabbitmqctl add_vhost myvhost
 rabbitmqctl set_user_tags myuser mytag
 rabbitmqctl set_permissions -p myvhost myuser ".*" ".*" ".*"

 rabbitmqctl add_user airflow 123456
 rabbitmqctl add_vhost airflow
 rabbitmqctl set_permissions -p airflow airflow ".*" ".*" ".*"
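
Before wiring Airflow to RabbitMQ, verify the vhost and permissions; the matching Celery broker URL follows from the user/vhost created above (broker host node1 is an assumption, adjust to yours):

rabbitmqctl list_vhosts
rabbitmqctl list_permissions -p airflow
# corresponding broker URL for airflow.cfg: amqp://airflow:123456@node1:5672/airflow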

airflow db init

airflow webserver
airflow scheduler
airflow celery worker
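
During testing you can daemonize these instead of holding three terminals open (the -D flag is part of the airflow CLI):

airflow webserver -D --port 8080
airflow scheduler -D
airflow celery worker -D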

airflow users create \
    --username airflow \
    --firstname airflow \
    --lastname airflow \
    --role Admin \
    --email chenhu@youjivest.com

airflow users create \
    --username admin \
    --firstname admin \
    --lastname admin \
    --role Admin \
    --email chenjie@youjivest.com
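
As written, both commands prompt for a password interactively; for unattended setup you can pass it on the command line and then verify (a sketch, choose your own password):

airflow users create --username admin --firstname admin --lastname admin \
    --role Admin --email chenjie@youjivest.com --password 'change-me'
airflow users list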

5. Edit the airflow.cfg configuration file

broker_url = amqp://myuser:mypassword@localhost:5672/myvhost
# do not quote values in airflow.cfg; the host part must be a resolvable hostname, nothing else works

[core]
dags_folder = /home/airflow/airflow/dags

# Time zone (note the underscore in the IANA name)
default_timezone = Asia/Hong_Kong

# Executor type; for a cluster, CeleryExecutor is recommended
executor = CeleryExecutor

[database]
# Database connection (since Airflow 2.3 this option lives in [database], not [core])
sql_alchemy_conn = postgresql://postgres:123456@172.30.3.21:5432/airflow

[webserver]
# UI time zone
default_ui_timezone = Asia/Hong_Kong

[scheduler_failover]
# Airflow master nodes, here node1 and node2; the two nodes need passwordless SSH between them
scheduler_nodes_in_cluster = node1,node2
 
[celery]
# Message queue used as the Celery broker (this example uses Redis; with the RabbitMQ vhost from Section 4 it would be amqp://airflow:123456@node1:5672/airflow)
broker_url = redis://node1:6379/0
# Backend where Celery stores task state after completion
result_backend = db+postgresql://postgres:123456@node1:5432/airflow

6. Email sending configuration

[email]
email_backend = airflow.utils.email.send_email_smtp
subject_template = /path/to/my_subject_template_file
html_content_template = /path/to/my_html_content_template_file
You can configure the sender's email address by setting from_email in the [email] section:
[email]
from_email = "John Doe <johndoe@example.com>"

[smtp]
smtp_host = smtp.sendgrid.net  # your SMTP server hostname
smtp_starttls = False  # True enables STARTTLS, one way of encrypting the connection
smtp_ssl = False  # set to True if your SMTP server requires an SSL connection
smtp_user = apikey  # SMTP username; for SendGrid this is literally 'apikey'
smtp_password = your-api-key  # replace with your real API key
smtp_port = 587  # SMTP server port, usually 587
smtp_mail_from = your-email@example.com  # the sender's address

(Annotated example only; strip the inline comments in the real airflow.cfg, since the ini parser may treat them as part of the value.)

[smtp]
smtp_host = smtp.exmail.qq.com
smtp_starttls = False
smtp_ssl = False
smtp_user = noreply@mails.youjivest.com
smtp_password = kHThsAfxfsGHvcWF
smtp_port = 587
smtp_mail_from = <your-from-email>

7. send_email.py test

#!/usr/bin/python
# -*- coding: UTF-8 -*-
import smtplib
import pendulum
from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python import PythonOperator  # airflow.operators.python_operator is deprecated in Airflow 2

from email.mime.text import MIMEText
from email.header import Header


def send_email(mail_msg):
    """Send mail via a third-party SMTP service."""
    # ---------- sender
    mail_host = "smtp.exmail.qq.com"  # SMTP server (the port is passed separately below)
    mail_user = "noreply@mails.youjivest.com"  # username, i.e. the sender mailbox
    mail_pass = "kHThsAfxfsGHvcWF"  # password/token, found in your mailbox settings
    sender = 'noreply@mails.youjivest.com'  # sender address

    # ---------- recipients
    receivers = ['chenhu@youjivest.com']  # recipients; set to your QQ mailbox or any other address

    message = MIMEText(mail_msg, 'html', 'utf-8')
    message['From'] = Header("Airflow-Alert")
    message['To'] = Header("developer")

    subject = 'Airflow task alert'
    message['Subject'] = Header(subject, 'utf-8')

    try:
        smtp_obj = smtplib.SMTP()
        smtp_obj.connect(mail_host, 587)  # 587 is the SMTP submission port
        smtp_obj.starttls()  # port 587 normally requires STARTTLS before login
        smtp_obj.login(mail_user, mail_pass)
        smtp_obj.sendmail(sender, receivers, message.as_string())
    except smtplib.SMTPException:
        print("Error: unable to send mail")

def send_email_fun(msg):
    send_email(msg)


def failure_callback(context):
    """Triggered when the DAG fails; context is the task context, passed in automatically."""
    dag_id = context['dag'].dag_id
    task_id = context['task_instance'].task_id
    reason = context['exception']
    mail_msg = f"""
        <p>Date: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}</p>
        <p>DAG ID: {dag_id}...</p>
        <p>Task id: {task_id}...</p>
        <P>STATE: Error!</P>
        <p>Exception: {reason}...</p>
        """
    send_email_fun(mail_msg)


local_tz = pendulum.timezone("Asia/Shanghai")
start_date = datetime(2023, 10, 14, 11, 13, 50, tzinfo=local_tz)
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': start_date,
    'on_failure_callback': failure_callback,  # called when the DAG run fails; no arguments needed
    # 'on_success_callback': success_callback,  # define success_callback yourself if wanted;
    # an email on success is usually unnecessary, so it is left commented out here
}

dag = DAG(
    dag_id="test_send",
    default_args=default_args,
    schedule_interval=None,
)


def print_hello(**context):
    """Task logic defined to suit your needs; context is injected automatically when the PythonOperator calls it."""
    print("hello")
    with open("./context.txt", "a", encoding="utf-8") as f:
        f.write(str(context))


# Use a task variable name that does not shadow the failure_callback function above.
# provide_context was removed in Airflow 2; the context is always passed to python_callable.
send_email_task = PythonOperator(
    task_id='send_email',
    python_callable=print_hello,
    dag=dag,
)

send_email_task
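
With the file saved under dags_folder, a quick way to exercise it (test_send and send_email are the dag_id/task_id defined above):

airflow dags list | grep test_send
airflow tasks test test_send send_email 2023-10-14   # runs the task once, without the scheduler
airflow dags trigger test_send                       # schedules a real run through the executor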

8. Set up systemd startup

[root@pgnode1 systemd]# cat /etc/systemd/system/airflow-webserver.service
[Unit]
Description=Apache Airflow Web Server
After=network.target

[Service]
User=airflow
Group=airflow
ExecStart=/opt/airflow/bin/airflow webserver
Restart=on-failure
RestartSec=5s

[Install]
WantedBy=multi-user.target
[root@pgnode1 systemd]# cat /etc/systemd/system/airflow-scheduler.service 
[Unit]
Description=Apache Airflow Scheduler
After=network.target

[Service]
User=airflow
Group=airflow
ExecStart=/opt/airflow/bin/airflow scheduler
Restart=on-failure
RestartSec=5s

[Install]
WantedBy=multi-user.target
[root@node3 ~]# cat  /etc/systemd/system/airflow-worker.service
[Unit]
Description=Apache Airflow Worker
After=network.target

[Service]
User=airflow
Group=airflow
ExecStart=/opt/airflow/bin/airflow celery worker
Restart=on-failure
RestartSec=5s

[Install]
WantedBy=multi-user.target
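After creating the three unit files, reload systemd and enable them (run the matching commands on whichever node hosts each role):

systemctl daemon-reload
systemctl enable --now airflow-webserver airflow-scheduler airflow-worker
systemctl status airflow-webserver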

9. Configure Airflow authentication: LDAP

Install the system dependencies pip needs to build python-ldap:
yum install python3.11-devel openldap-devel gcc

This method validates the user's credentials against an LDAP server.

Warning: to use LDAP you must install python-ldap.

python-ldap installation docs: https://www.python-ldap.org/en/python-ldap-3.4.3/

Install it from PyPI (the FAB docs also cover a typical Microsoft AD setup, where all users can perform LDAP searches):

pip install python-ldap -i https://pypi.tuna.tsinghua.edu.cn/simple
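
A quick import check confirms the module built correctly against the OpenLDAP headers installed above:

python -c "import ldap; print(ldap.__version__)"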

Reference: https://flask-appbuilder.readthedocs.io/en/latest/security.html

For a typical OpenLDAP setup (where LDAP searches require a special account):

AUTH_TYPE = AUTH_LDAP
AUTH_LDAP_SERVER = "ldap://ldap.example.com"
AUTH_LDAP_USE_TLS = False

# registration configs
AUTH_USER_REGISTRATION = True  # allow users who are not already in the FAB DB
AUTH_USER_REGISTRATION_ROLE = "Public"  # this role will be given in addition to any AUTH_ROLES_MAPPING
AUTH_LDAP_FIRSTNAME_FIELD = "givenName"
AUTH_LDAP_LASTNAME_FIELD = "sn"
AUTH_LDAP_EMAIL_FIELD = "mail"  # if null in LDAP, email is set to: "{username}@email.notfound"

# search configs
AUTH_LDAP_SEARCH = "ou=users,dc=example,dc=com"  # the LDAP search base
AUTH_LDAP_UID_FIELD = "uid"  # the username field
AUTH_LDAP_BIND_USER = "uid=admin,ou=users,dc=example,dc=com"  # the special bind username for search
AUTH_LDAP_BIND_PASSWORD = "admin_password"  # the special bind password for search

You can limit the LDAP search scope via configuration:

# only allow users with memberOf="cn=myTeam,ou=teams,dc=example,dc=com"
AUTH_LDAP_SEARCH_FILTER = "(memberOf=cn=myTeam,ou=teams,dc=example,dc=com)"

You can grant FlaskAppBuilder roles based on LDAP roles (note: this requires AUTH_LDAP_SEARCH to be set):

# a mapping from LDAP DN to a list of FAB roles
AUTH_ROLES_MAPPING = {
    "cn=fab_users,ou=groups,dc=example,dc=com": ["User"],
    "cn=fab_admins,ou=groups,dc=example,dc=com": ["Admin"],
}

# the LDAP user attribute which has their role DNs
AUTH_LDAP_GROUP_FIELD = "memberOf"

# if we should replace ALL the user's roles each login, or only on registration
AUTH_ROLES_SYNC_AT_LOGIN = True

# force users to re-auth after 30min of inactivity (to keep roles in sync)
PERMANENT_SESSION_LIFETIME = 1800

Transport Layer Security (TLS)

For STARTTLS, configure an ldap:// server and set AUTH_LDAP_USE_TLS to True:

AUTH_LDAP_SERVER = "ldap://ldap.example.com"
AUTH_LDAP_USE_TLS = True

For LDAP over TLS (ldaps), configure the server with the ldaps:// scheme and set AUTH_LDAP_USE_TLS to False:

AUTH_LDAP_SERVER = "ldaps://ldap.example.com"
AUTH_LDAP_USE_TLS = False
"""Default configuration for the Airflow webserver"""
import os

#from airflow.www.fab_security.manager import AUTH_DB
from airflow.www.fab_security.manager import AUTH_LDAP
# from airflow.www.fab_security.manager import AUTH_OAUTH
# from airflow.www.fab_security.manager import AUTH_OID
# from airflow.www.fab_security.manager import AUTH_REMOTE_USER
basedir = os.path.abspath(os.path.dirname(__file__))

# Flask-WTF flag for CSRF
WTF_CSRF_ENABLED = True

#AUTH_TYPE = AUTH_DB
AUTH_TYPE = AUTH_LDAP

# Uncomment to setup Full admin role name
AUTH_ROLE_ADMIN = 'Admin'

# The default user self registration role
# AUTH_USER_REGISTRATION_ROLE = "Public"

# When using LDAP Auth, setup the ldap server
#AUTH_LDAP_SERVER = "ldap://leojiang.com"
AUTH_LDAP_SERVER = "ldap://ipa.youji*.com"
AUTH_LDAP_USE_TLS = False

# registration configs
# Allow Airflow to auto-register LDAP users when they first log in
AUTH_USER_REGISTRATION = True  # allow users who are not already in the FAB DB
# This must be configured, otherwise authentication errors are raised
AUTH_USER_REGISTRATION_ROLE = "Admin"  # this role will be given in addition to any AUTH_ROLES_MAPPING
# Refresh the user's roles on every login
AUTH_ROLES_SYNC_AT_LOGIN = True
# search configs (the values below come from your own LDAP directory; adjust to match)
#AUTH_LDAP_SEARCH = "ou=ID,ou=Data,dc=leo,dc=se"  # the LDAP search base
AUTH_LDAP_SEARCH = "cn=users,cn=accounts,dc=youji*,dc=com"  # the LDAP search base
AUTH_LDAP_UID_FIELD = "uid"  # the username field
#AUTH_LDAP_BIND_USER = "CN=leojiangdm,OU=CA,OU=SvcAccount,OU=P1,OU=ID,OU=Data,DC=leo,DC=se"  # the special bind username for search
AUTH_LDAP_BIND_USER = "uid=admin,cn=users,cn=accounts,dc=youjivest,dc=com" 
#AUTH_LDAP_BIND_PASSWORD = "Adfsadfasdfsd@123"  # the special bind password for search 
AUTH_LDAP_BIND_PASSWORD = "********"  
# You can restrict the LDAP search scope here (note: if you use AUTH_ROLES_MAPPING, this parameter cannot be used)
# only allow users with memberOf="cn=myTeam,ou=teams,dc=example,dc=com"
#AUTH_LDAP_SEARCH_FILTER = "(memberOf=cn=myTeam,ou=teams,dc=example,dc=com)"
AUTH_LDAP_SEARCH_FILTER = "(memberOf=cn=ipausers,cn=groups,cn=accounts,dc=youjivest,dc=com)"
# You can grant FlaskAppBuilder roles based on LDAP roles (note: this requires AUTH_LDAP_SEARCH):
# a mapping from LDAP DN to a list of FAB roles
#AUTH_ROLES_MAPPING = {
#    "cn=fab_users,ou=groups,dc=example,dc=com": ["User"],
#    "cn=fab_admins,ou=groups,dc=example,dc=com": ["Admin"],
#}
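
Before restarting the webserver, it is worth validating the bind DN and search base outside Airflow. A sketch using ldapsearch from openldap-clients (server and DN values copied from the config above, including its redacted hostname):

yum install openldap-clients
ldapsearch -x -H ldap://ipa.youji*.com \
    -D "uid=admin,cn=users,cn=accounts,dc=youjivest,dc=com" -W \
    -b "cn=users,cn=accounts,dc=youjivest,dc=com" "(uid=admin)"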


10. Monitoring Airflow with Prometheus

https://www.redhat.com/en/blog/monitoring-apache-airflow-using-prometheus

The diagram in the referenced post shows three Airflow components: the Webserver, the Scheduler, and the Worker. The solid lines starting at the Webserver, Scheduler, and Worker show metrics flowing from these three components to statsd_exporter. statsd_exporter aggregates the metrics, converts them to Prometheus format, and exposes them as a Prometheus endpoint. A Prometheus server scrapes this endpoint periodically and saves the metrics in its database. The Airflow metrics stored in Prometheus can then be viewed in Grafana dashboards.

The rest of this section builds the setup shown in that diagram. We are going to:

  • Configure Airflow to publish statsd metrics.
  • Convert the statsd metrics into Prometheus metrics with statsd_exporter.
  • Deploy a Prometheus server to collect the metrics and make them available to Grafana.
Create the Airflow database and the airflow.cfg configuration file (airflow initdb is the Airflow 1.x command; on 2.x use):
$ airflow db init

Open the Airflow configuration file airflow.cfg for editing:
$ vi ~/airflow/airflow.cfg

Turn on statsd metrics by setting statsd_on = True. In Airflow 2.x these options live in the [metrics] section of airflow.cfg; after saving the change, the statsd configuration should look like this:
[metrics]
statsd_on = True
statsd_host = localhost
statsd_port = 8125
statsd_prefix = airflow

With this configuration, Airflow will send statsd metrics to a statsd server that accepts metrics on localhost:8125. We will start that server in the next section.

The last step of this section is to start the Airflow webserver and scheduler processes. You may want to run these commands in two separate terminal windows. Make sure the Python virtual environment is activated before issuing them:
$ airflow webserver  
$ airflow scheduler

At this point Airflow is running and sending statsd metrics to localhost:8125. In the next section we start statsd_exporter, which will collect the statsd metrics and export them as Prometheus metrics.
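
Optionally, confirm that the UDP packets are actually flowing before the exporter exists (tcpdump on the loopback interface; exits after five packets):

tcpdump -i lo -A -c 5 udp port 8125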

------------
Converting statsd metrics into Prometheus metrics
Let's start this section by installing statsd_exporter. If you have a working Golang environment set up on your machine, you can install statsd_exporter by issuing:
 go get github.com/prometheus/statsd_exporter
------------

# Download statsd_exporter (v0.24.0, the release unpacked below)
wget https://github.com/prometheus/statsd_exporter/releases/download/v0.24.0/statsd_exporter-0.24.0.linux-amd64.tar.gz
 
[root@airflow opt]# rz -E
rz waiting to receive.
[root@airflow opt]# ll
total 7716
drwxr-xr-x. 7 airflow airflow     137 Oct 10 17:23 airflow
-rw-r--r--. 1 root    root    7898529 Oct 12 10:55 statsd_exporter-0.24.0.linux-amd64.tar.gz
[root@airflow opt]# tar xf statsd_exporter-0.24.0.linux-amd64.tar.gz 
[root@airflow opt]# ll
total 7716
drwxr-xr-x. 7 airflow airflow     137 Oct 10 17:23 airflow
drwxr-xr-x. 2    1001    1002      58 Jun  2 16:07 statsd_exporter-0.24.0.linux-amd64
-rw-r--r--. 1 root    root    7898529 Oct 12 10:55 statsd_exporter-0.24.0.linux-amd64.tar.gz
[root@airflow opt]# mv statsd_exporter-0.24.0.linux-amd64 statsd_exporter
[root@airflow opt]# cp statsd_exporter/statsd_exporter /usr/local/bin/
[root@airflow opt]# vim /usr/lib/systemd/system/statsd_exporter.service
[Unit]
Description=Prometheus statsd_exporter
Documentation=https://prometheus.io/
After=network.target
[Service]
Type=simple
ExecStart=/usr/local/bin/statsd_exporter --statsd.listen-udp localhost:8125 --log.level debug
Restart=on-failure
[Install]
WantedBy=multi-user.target
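
Then load and start the unit, and check both listeners (8125/udp for statsd in, 9102/tcp for Prometheus out):

systemctl daemon-reload
systemctl enable --now statsd_exporter
ss -ulnp | grep 8125
ss -tlnp | grep 9102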

Alternatively, you can deploy statsd_exporter using the prometheus/statsd-exporter container image. The image documentation describes how to pull and run it.

With Airflow running, start statsd_exporter on the same machine:


$ statsd_exporter --statsd.listen-udp localhost:8125 --log.level debug

level=info ts=2020-09-18T15:26:51.283Z caller=main.go:302 msg="Starting StatsD -> Prometheus Exporter" version="(version=, branch=, revision=)"
level=info ts=2020-09-18T15:26:51.283Z caller=main.go:303 msg="Build context" context="(go=go1.14.7, user=, date=)"
level=info ts=2020-09-18T15:26:51.283Z caller=main.go:304 msg="Accepting StatsD Traffic" udp=localhost:8125 tcp=:9125 unixgram=
level=info ts=2020-09-18T15:26:51.283Z caller=main.go:305 msg="Accepting Prometheus Requests" addr=:9102
level=debug ts=2020-09-18T15:26:52.534Z caller=listener.go:69 msg="Incoming line" proto=udp line=airflow.executor.open_slots:32|g

If all goes well, you should see Airflow metrics scrolling across the screen, as in the example above. You can also verify that statsd_exporter is doing its job and exposing the metrics in Prometheus format. The Prometheus metrics should be reachable at localhost:9102; you can fetch them with curl:

$ curl localhost:9102/metrics
# HELP airflow_collect_dags Metric autogenerated by statsd_exporter.
# TYPE airflow_collect_dags gauge
airflow_collect_dags 50.056391
# HELP airflow_dag_loading_duration_example_bash_operator Metric autogenerated by statsd_exporter.
# TYPE airflow_dag_loading_duration_example_bash_operator summary
airflow_dag_loading_duration_example_bash_operator{quantile="0.5"} 1.108e-06
airflow_dag_loading_duration_example_bash_operator{quantile="0.9"} 4.942e-06
airflow_dag_loading_duration_example_bash_operator{quantile="0.99"} 4.942e-06
airflow_dag_loading_duration_example_bash_operator_sum 1.8886000000000002e-05
airflow_dag_loading_duration_example_bash_operator_count 7
# HELP airflow_dag_loading_duration_example_branch_dop_operator_v3 Metric autogenerated by statsd_exporter.
# TYPE airflow_dag_loading_duration_example_branch_dop_operator_v3 summary
airflow_dag_loading_duration_example_branch_dop_operator_v3{quantile="0.5"} 1.61e-06
airflow_dag_loading_duration_example_branch_dop_operator_v3{quantile="0.9"} 5.776e-06
airflow_dag_loading_duration_example_branch_dop_operator_v3{quantile="0.99"} 5.776e-06
airflow_dag_loading_duration_example_branch_dop_operator_v3_sum 1.8076e-05
airflow_dag_loading_duration_example_branch_dop_operator_v3_count 6
...

Collecting the metrics with Prometheus

With the previous part done, the Airflow metrics are now available in Prometheus format. As the next step, we deploy a Prometheus server to collect them. You can download a precompiled Prometheus binary from the project's GitHub releases page, or use the existing Prometheus container image.

A minimal Prometheus configuration that collects the Airflow metrics looks like this:

scrape_configs:
  - job_name: airflow
    static_configs:
      - targets: ['localhost:9102']

It instructs the Prometheus server to periodically scrape metrics from the endpoint localhost:9102. Save the configuration above to a file named prometheus.yml and start the Prometheus server by issuing:

$ prometheus --config.file prometheus.yml

You can now use a browser to open Prometheus' built-in dashboard at http://localhost:9090/graph and look at the Airflow metrics.
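
The same data is available through the Prometheus HTTP API, which is handy for scripting (the metric name is taken from the statsd_exporter output above):

curl 'http://localhost:9090/api/v1/query?query=airflow_executor_open_slots'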