Airflow接管galler-dl下载任务

昨天用 gallery-dl 批量下载图片的时候,还是觉得不够方便。想起以前在公司,给自己写的系统做数据备份的时候,用到了 Airflow,能用飞书发送备份结果,每天高枕无忧。

其实,批量图片下载等这种机械任务,也可以放在 Airflow 里面跑,灵活可定制,过程记录如下。

1. 安装 Airflow

现在 Airflow 都是 ASF( Apache Software Foundation)的项目了,世界变化真是快。

下面操作在 Arch linux 下完成:

创建虚拟环境

~/airflow 目录创建虚拟环境并激活

1➜  airflow pwd
2/home/mephisto/airflow
3➜  airflow uv venv
4➜  airflow source .venv/bin/activate

按照官方文档指引安装 Airflow

此处安装最新 3.1.5 版本。三个星期前发布的,新鲜出炉、热辣滚烫。

1AIRFLOW_VERSION=3.1.5
2PYTHON_VERSION="$(python -c 'import sys; print(f"{sys.version_info.major}.{sys.version_info.minor}")')"
3CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
4uv pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"

standalone 模式运行

1airflow standalone

浏览器访问localhost:8080,留意命令输出中的用户名/密码(登录用到)。

也可自己更改用户密码,直接修改 simple_auth_manager_passwords.json.generated 这个文件

1➜  airflow cat simple_auth_manager_passwords.json.generated
2{"admin": "admin"}

上面的意思就是,用户 admin,密码 admin, 测试环境使用。

2. 编写 Airflow dag

直接看示例:

 1  airflow cat dags/download_twitter_media.py
 2from datetime import datetime, timedelta
 3from airflow import DAG
 4from airflow.providers.standard.operators.bash import BashOperator
 5from airflow.models import Variable
 6import json
 7
 8# 从 Airflow Variable 获取用户列表(JSON 格式字符串)
 9USERS_JSON = Variable.get("twitter_users", default_var='["nasa"]')
10try:
11    USERS = json.loads(USERS_JSON)
12except json.JSONDecodeError:
13    raise ValueError("Variable 'twitter_users' must be a valid JSON array of strings.")
14
15DOWNLOAD_BASE_PATH = Variable.get("twitter_download_path", default_var="~/Pictures/twitter")
16
17default_args = {
18    "owner": "data_team",
19    "depends_on_past": False,
20    "email_on_failure": False,
21    "retries": 1,
22    "retry_delay": timedelta(minutes=5),
23}
24
25with DAG(
26    dag_id="download_twitter_multiple_users",
27    default_args=default_args,
28    description="Download media from multiple Twitter/X users using gallery-dl",
29    schedule="@daily",
30    start_date=datetime(2025, 1, 1),
31    catchup=False,
32    tags=["social_media", "twitter", "multi_user"],
33) as dag:
34
35    prev_task = None # 设定串行,防止被封ip
36
37    # 动态为每个用户生成任务
38    for user in USERS:
39        user = user.strip()
40        if not user:
41            print("no twitter user , set at 'Airflow/admin/variables'")
42
43        #mkdir_cmd = f"mkdir -p {DOWNLOAD_BASE_PATH}/{user}"
44
45        # gallery-dl 命令(增量下载)
46        gallery_dl_cmd = (
47            f"gallery-dl "
48            f"--cookies-from-browser firefox "
49            f"--download-archive {DOWNLOAD_BASE_PATH}/{user}/archive.txt "
50            f"https://x.com/{user}/media"
51        )
52
53        #create_dir = BashOperator(
54        #    task_id=f"create_dir_{user}",
55        #    bash_command=mkdir_cmd,
56        #)
57
58        download_task = BashOperator(
59            task_id=f"download_{user}",
60            bash_command=gallery_dl_cmd,
61        )
62
63        # 串行:上一个任务完成后才执行当前任务
64
65        if prev_task:
66            prev_task >> download_task
67        prev_task = download_task
68
69        # 设置依赖:先建目录,再下载
70        #create_dir >> download_task

今天加了 download archive,增量下载用的,实际上是个 SQLite 数据库文件, 文件名可以自己指定的。

1➜  file archive.txt
2archive.txt: SQLite 3.x database, last written using SQLite version 3051001, file counter 1, database pages 2, cookie 0x1, schema 4, UTF-8, version-valid-for 1

上面的 twitter_download_pathtwitter_download_path 从 Airflow 的 variables 读取,类型分别为字符串和数组。

下次更改用户和目标路径就直接在 Airflow web 界面中修改就好了,很方便实用。

倘若遇到 dag 导入错误,在 Airflow 首页界面上会有提示的,点击查看报错修复即可。

3. 截图示例

有图有真相:

dag

airflow_dag

变量设置

在 Web 端完成。

airflow_variables_1
airflow_variables_2

task 详情

截图很清晰,没有必要解释了。看右侧的日志,和自己在终端手动执行是一样的效果。

airflow_task

这样,每天都能自动执行,如果你有 NAS, 在上面部署一套,自动下载搜罗,还能加发送结果通知的功能,很不错的,逻辑想怎么改就怎么改(比如一个用户一个 dag、改成并行等,我怕被封 ip,特意改成串行的)。

此外,galler-dl 支持很多平台,各种常见内容网站几乎都能下,类似的工具也有很多,不喜欢的话,找个替换的,选择社区强大、更新频繁的就好。

最后修改于: Friday, January 16, 2026
欢迎关注微信公众号,留言交流;也欢迎使用我开发的微信小程序。

相关文章:

翻译: