Airflow接管galler-dl下載任務

昨天用 gallery-dl 批量下載圖片的時候,還是覺得不夠方便。想起以前在公司,給自己寫的系統做數據備份的時候,用到了 Airflow,能用飛書發送備份結果,每天高枕無憂。

其實,批量圖片下載等這種機械任務,也可以放在 Airflow 裏面跑,靈活可定製,過程記錄如下。

1. 安裝 Airflow

現在 Airflow 都是 ASF( Apache Software Foundation)的項目了,世界變化真是快。

下面操作在 Arch linux 下完成:

創建虛擬環境

~/airflow 目錄創建虛擬環境並激活

1➜  airflow pwd
2/home/mephisto/airflow
3➜  airflow uv venv
4➜  airflow source .venv/bin/activate

按照官方文檔指引安裝 Airflow

此處安裝最新 3.1.5 版本。三個星期前發佈的,新鮮出爐、熱辣滾燙。

1AIRFLOW_VERSION=3.1.5
2PYTHON_VERSION="$(python -c 'import sys; print(f"{sys.version_info.major}.{sys.version_info.minor}")')"
3CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
4uv pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"

standalone 模式運行

1airflow standalone

瀏覽器訪問localhost:8080,留意命令輸出中的用戶名/密碼(登錄用到)。

也可自己更改用戶密碼,直接修改 simple_auth_manager_passwords.json.generated 這個文件

1➜  airflow cat simple_auth_manager_passwords.json.generated
2{"admin": "admin"}

上面的意思就是,用戶 admin,密碼 admin, 測試環境使用。

2. 編寫 Airflow dag

直接看示例:

 1  airflow cat dags/download_twitter_media.py
 2from datetime import datetime, timedelta
 3from airflow import DAG
 4from airflow.providers.standard.operators.bash import BashOperator
 5from airflow.models import Variable
 6import json
 7
 8# 從 Airflow Variable 獲取用戶列表(JSON 格式字符串)
 9USERS_JSON = Variable.get("twitter_users", default_var='["nasa"]')
10try:
11    USERS = json.loads(USERS_JSON)
12except json.JSONDecodeError:
13    raise ValueError("Variable 'twitter_users' must be a valid JSON array of strings.")
14
15DOWNLOAD_BASE_PATH = Variable.get("twitter_download_path", default_var="~/Pictures/twitter")
16
17default_args = {
18    "owner": "data_team",
19    "depends_on_past": False,
20    "email_on_failure": False,
21    "retries": 1,
22    "retry_delay": timedelta(minutes=5),
23}
24
25with DAG(
26    dag_id="download_twitter_multiple_users",
27    default_args=default_args,
28    description="Download media from multiple Twitter/X users using gallery-dl",
29    schedule="@daily",
30    start_date=datetime(2025, 1, 1),
31    catchup=False,
32    tags=["social_media", "twitter", "multi_user"],
33) as dag:
34
35    prev_task = None # 設定串行,防止被封ip
36
37    # 動態爲每個用戶生成任務
38    for user in USERS:
39        user = user.strip()
40        if not user:
41            print("no twitter user , set at 'Airflow/admin/variables'")
42
43        #mkdir_cmd = f"mkdir -p {DOWNLOAD_BASE_PATH}/{user}"
44
45        # gallery-dl 命令(增量下載)
46        gallery_dl_cmd = (
47            f"gallery-dl "
48            f"--cookies-from-browser firefox "
49            f"--download-archive {DOWNLOAD_BASE_PATH}/{user}/archive.txt "
50            f"https://x.com/{user}/media"
51        )
52
53        #create_dir = BashOperator(
54        #    task_id=f"create_dir_{user}",
55        #    bash_command=mkdir_cmd,
56        #)
57
58        download_task = BashOperator(
59            task_id=f"download_{user}",
60            bash_command=gallery_dl_cmd,
61        )
62
63        # 串行:上一個任務完成後才執行當前任務
64
65        if prev_task:
66            prev_task >> download_task
67        prev_task = download_task
68
69        # 設置依賴:先建目錄,再下載
70        #create_dir >> download_task

今天加了 download archive,增量下載用的,實際上是個 SQLite 數據庫文件, 文件名可以自己指定的。

1➜  file archive.txt
2archive.txt: SQLite 3.x database, last written using SQLite version 3051001, file counter 1, database pages 2, cookie 0x1, schema 4, UTF-8, version-valid-for 1

上面的 twitter_download_pathtwitter_download_path 從 Airflow 的 variables 讀取,類型分別爲字符串和數組。

下次更改用戶和目標路徑就直接在 Airflow web 界面中修改就好了,很方便實用。

倘若遇到 dag 導入錯誤,在 Airflow 首頁界面上會有提示的,點擊查看報錯修復即可。

3. 截圖示例

有圖有真相:

dag

airflow_dag

變量設置

在 Web 端完成。

airflow_variables_1
airflow_variables_2

task 詳情

截圖很清晰,沒有必要解釋了。看右側的日誌,和自己在終端手動執行是一樣的效果。

airflow_task

這樣,每天都能自動執行,如果你有 NAS, 在上面部署一套,自動下載蒐羅,還能加發送結果通知的功能,很不錯的,邏輯想怎麼改就怎麼改(比如一個用戶一個 dag、改成並行等,我怕被封 ip,特意改成串行的)。

此外,galler-dl 支持很多平臺,各種常見內容網站幾乎都能下,類似的工具也有很多,不喜歡的話,找個替換的,選擇社區強大、更新頻繁的就好。

最後修改於: Thursday, January 1, 2026

相關文章:

翻譯: