簡單獲取celery任務即時結果

在日常web開發工作中,往往有一些需要長時間運行的後臺任務,比如說發送郵件、上傳文件、編譯軟件等。當這些任務完成時,前端頁面能及時變更。

讓有用戶能感知到任務已經完成,或者說有變化。

本文列舉一個最簡單的實現方案:

socket.io前端 + flask-socketio + celery

1. 前端頁面

創建一個test.html文件,內容如下。

 1<!DOCTYPE html>
 2<html lang="en">
 3<head>
 4    <title>Socket.IO Test</title>
 5    <script src="https://cdn.socket.io/4.0.1/socket.io.min.js"></script>
 6    <script>
 7        var socket = io.connect('http://127.0.0.1:5000');
 8
 9        socket.on('connect', function() {
10            console.log('Connected:', socket.id);
11        });
12
13        socket.on('disconnect', function() {
14            console.log('Disconnected');
15        });
16
17        socket.on('test event', function(data) {
18            console.log('Received message:', data);
19            // alert(data.message);
20        });
21
22        function sendMessage() {
23            var message = document.getElementById('message').value;
24            socket.emit('my_event', message);
25        }
26    </script>
27</head>
28    <body>
29        <input type="text" id="message">
30        <button onclick="sendMessage()">Send</button>
31    </body>
32</html>

頁面很簡單,從cdn上導入socket.io.min.js文件,和後端建立ws連接,還有簡單的發送消息按鈕,收到信息後,會在打印相關信息,對新手十分友好。

如果你還是不理解,可以關注公衆號留言,我盡力解答。

2. Flask 後端

創建一愕app.py文件,裝好各種依賴包。

 1
 2from flask import Flask
 3from flask_socketio import SocketIO
 4from celery import Celery
 5
 6app = Flask(__name__)
 7app.config['SECRET_KEY'] ='my_secret_key'
 8socketio = SocketIO(app, cors_allowed_origins="*",message_queue='redis://',logger=True, engineio_logger=True)
 9
10# Celery 配置
11app.config['CELERY_BROKER_URL'] ='redis://localhost:6379/0'
12app.config['CELERY_RESULT_BACKEND'] ='redis://localhost:6379/0'
13
14celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'],backend=app.config['CELERY_RESULT_BACKEND'])
15celery.conf.beat_schedule = {
16    'task_interval': {
17        'task': 'app.long_running_task',
18        'schedule': 10.0, # 時間間隔
19    }
20}
21
22@celery.task
23def long_running_task():
24    # 模擬任務執行
25    import time
26    time.sleep(8)
27    # 任務完成後通過 SocketIO 發送通知
28    socketio.emit('test event', { 'message': 'Job done! time is: {}'.format( time.ctime() ) })
29
30@socketio.on('connect')
31def handle_connect():
32    print('連接成功')
33
34@socketio.on('disconnect')
35def handle_disconnect():
36    print('Client disconnected')
37
38if __name__ == '__main__':
39    socketio.run(app, host='0.0.0.0', port=5000)

上述代碼每隔10秒鐘,執行long_running_task這個模擬任務,任務完成後,會給前端發消息,告訴工作已完成,並打印下當前時間。

關鍵點爲:

1socketio = SocketIO(app, cors_allowed_origins="*",message_queue='redis://',logger=True, engineio_logger=True)

跨域設置,開啓了dbug,方便在後端查看收發信息,message_queue 配置項也很重要,沒仔細看文檔的人,在生產環境部署時,debug能力差的可能倒騰一天 也不知道爲何前端沒有收到信息。

3. 測試環境運行

前端運行: firefox test.html, 用瀏覽器打開test.html文件,再把調試窗口打開。

後端運行:

  • 運行服務端
 1(.venv) ➜  flask-socketio-celery python app.py
 2Server initialized for threading.
 3 * Serving Flask app 'app'
 4 * Debug mode: off
 5WARNING: This is a development server. Do not use it in a production deployment. Use a production WSGI server instead.
 6 * Running on all addresses (0.0.0.0)
 7 * Running on http://127.0.0.1:5000
 8 * Running on http://192.168.124.5:5000
 9Press CTRL+C to quit
10OSxZbltBcO8kRT9NAAAA: Sending packet OPEN data {'sid': 'OSxZbltBcO8kRT9NAAAA', 'upgrades': ['websocket'], 'pingTimeout': 20000, 'pingInterval': 25000}
  • 開啓redis server
 1➜  ~ redis-server
 221407:C 04 Aug 2024 15:39:58.061 # WARNING Memory overcommit must be enabled! Without it, a background save or replication may fail under low memory condition. Being disabled, it can also cause failures without low memory condition, see https://github.com/jemalloc/jemalloc/issues/1328. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.
 321407:C 04 Aug 2024 15:39:58.061 * oO0OoO0OoO0Oo Redis is starting oO0OoO0OoO0Oo
 421407:C 04 Aug 2024 15:39:58.061 * Redis version=7.2.5, bits=64, commit=00000000, modified=0, pid=21407, just started
 521407:C 04 Aug 2024 15:39:58.061 # Warning: no config file specified, using the default config. In order to specify a config file use redis-server /path/to/redis.conf
 621407:M 04 Aug 2024 15:39:58.061 * Increased maximum number of open files to 10032 (it was originally set to 1024).
 721407:M 04 Aug 2024 15:39:58.061 * monotonic clock: POSIX clock_gettime
 8                _._
 9           _.-``__ ''-._
10      _.-``    `.  `_.  ''-._           Redis 7.2.5 (00000000/0) 64 bit
11  .-`` .-```.  ```\/    _.,_ ''-._
12 (    '      ,       .-`  | `,    )     Running in standalone mode
13 |`-._`-...-` __...-.``-._|'` _.-'|     Port: 6379
14 |    `-._   `._    /     _.-'    |     PID: 21407
15  `-._    `-._  `-./  _.-'    _.-'
16 |`-._`-._    `-.__.-'    _.-'_.-'|
17 |    `-._`-._        _.-'_.-'    |           https://redis.io
18  `-._    `-._`-.__.-'_.-'    _.-'
19 |`-._`-._    `-.__.-'    _.-'_.-'|
20 |    `-._`-._        _.-'_.-'    |
21  `-._    `-._`-.__.-'_.-'    _.-'
22      `-._    `-.__.-'    _.-'
23          `-._        _.-'
24              `-.__.-'
25
2621407:M 04 Aug 2024 15:39:58.061 * Server initialized
2721407:M 04 Aug 2024 15:39:58.061 * Loading RDB produced by version 7.2.5
  • 運行celery beat
 1(.venv) ➜  flask-socketio-celery celery -A app.celery beat -l info
 2Server initialized for threading.
 3celery beat v5.4.0 (opalescent) is starting.
 4__    -    ... __   -        _
 5LocalTime -> 2024-08-04 15:41:43
 6Configuration ->
 7    . broker -> redis://localhost:6379/0
 8    . loader -> celery.loaders.app.AppLoader
 9    . scheduler -> celery.beat.PersistentScheduler
10    . db -> celerybeat-schedule
11    . logfile -> [stderr]@%INFO
12    . maxinterval -> 5.00 minutes (300s)
13[2024-08-04 15:41:43,989: INFO/MainProcess] beat: Starting...
14[2024-08-04 15:41:50,632: INFO/MainProcess] Scheduler: Sending due task task_interval (app.long_running_task)
  • 運行celery worker
 1(.venv) ➜  flask-socketio-celery celery -A app.celery worker -l info
 2Server initialized for threading.
 3
 4 -------------- celery@minipc v5.4.0 (opalescent)
 5--- ***** -----
 6-- ******* ---- Linux-6.10.2-arch1-1-x86_64-with-glibc2.40 2024-08-04 15:43:05
 7- *** --- * ---
 8- ** ---------- [config]
 9- ** ---------- .> app:         app:0x78e41973b140
10- ** ---------- .> transport:   redis://localhost:6379/0
11- ** ---------- .> results:     redis://localhost:6379/0
12- *** --- * --- .> concurrency: 16 (prefork)
13-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
14--- ***** -----
15 -------------- [queues]
16                .> celery           exchange=celery(direct) key=celery
17
18
19[tasks]
20  . app.long_running_task
21
22[2024-08-04 15:43:05,949: WARNING/MainProcess] /home/mephisto/github/flask-socketio-celery/.venv/lib/python3.12/site-packages/celery/worker/consumer/consumer.py:508: CPendingDeprecationWarning: The broker_connection_retry configuration setting will no longer determine
23whether broker connection retries are made during startup in Celery 6.0 and above.
24If you wish to retain the existing behavior for retrying connections on startup,
25you should set broker_connection_retry_on_startup to True.
26  warnings.warn(
27
28[2024-08-04 15:43:05,954: INFO/MainProcess] Connected to redis://localhost:6379/0
29[2024-08-04 15:43:05,954: WARNING/MainProcess] /home/mephisto/github/flask-socketio-celery/.venv/lib/python3.12/site-packages/celery/worker/consumer/consumer.py:508: CPendingDeprecationWarning: The broker_connection_retry configuration setting will no longer determine
30whether broker connection retries are made during startup in Celery 6.0 and above.
31If you wish to retain the existing behavior for retrying connections on startup,
32you should set broker_connection_retry_on_startup to True.
33  warnings.warn(
34
35[2024-08-04 15:43:05,957: INFO/MainProcess] mingle: searching for neighbors
36[2024-08-04 15:43:06,962: INFO/MainProcess] mingle: all alone
37[2024-08-04 15:43:06,970: INFO/MainProcess] celery@minipc ready.
38[2024-08-04 15:43:06,971: INFO/MainProcess] Task app.long_running_task[324cef1b-a8e4-43b0-925d-9e28c841e3b5] received

4. 查看結果

懶得打字了,看截圖。

socketio-flask

左側爲瀏覽器打開test.tml文件的調試情況,右側順時針轉圈,依次爲flask、redis、beat、worker。

左側console裏面每隔十秒鐘,打印出了預期信息。

最後,每次碼文章至少都要耗費2個小時,收益幾乎爲零,動力日漸消退。

新工作比以前忙, 更新頻率只能調低,今天是週末(不信,看上面截圖的時間),通常我都在打遊戲,民工娛樂時間短,要恰飯的嘛!

還只能放棄娛樂時間來寫這個不痛不癢的文章。如果對讀者的有幫助或者有疑問,可關注公衆號,也算是支持。

另外在生產環境部署的時候,比這個要複雜很多。

比如用vue3,當後端任務完成後,前端要立刻自動刷新頁面,要用watch動態監測變化(輪詢也是可以,只是比較醜);

後端部署,會設涉及到Nginx代理的配置,還有如果你用的gunicorn,沒仔細讀官方文檔,配置不當可能倒騰很久,前端都收不到信息;

最離譜的是,當地你url寫錯的時候,socketio會把錯誤的url的一部分當作協議消息發送給後端,連接都沒法建立。

這些細節寫起來、解釋起來可能又要幾十分鐘碼字截圖,個人覺得能用週末閒暇時間來寫,已經是極限了,畢竟不賺錢的,付費問答倒是樂意。

最後修改於: Sunday, August 4, 2024

相關文章:

翻譯: