A Simple Way to Get Celery Task Results in Real Time

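In day-to-day web development there are often background tasks that take a long time to run, such as sending email, uploading files, or compiling software. When such a task finishes, the frontend page should update promptly so that users can see that the task has completed, or that something has changed.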

This article walks through the simplest implementation I know of:

socket.io (frontend) + Flask-SocketIO + Celery

1. Frontend page

Create a file named test.html with the following content.

<!DOCTYPE html>
<html lang="en">
<head>
    <title>Socket.IO Test</title>
    <script src="https://cdn.socket.io/4.0.1/socket.io.min.js"></script>
    <script>
        var socket = io.connect('http://127.0.0.1:5000');

        socket.on('connect', function() {
            console.log('Connected:', socket.id);
        });

        socket.on('disconnect', function() {
            console.log('Disconnected');
        });

        socket.on('test event', function(data) {
            console.log('Received message:', data);
            // alert(data.message);
        });

        function sendMessage() {
            var message = document.getElementById('message').value;
            socket.emit('my_event', message);
        }
    </script>
</head>
<body>
    <input type="text" id="message">
    <button onclick="sendMessage()">Send</button>
</body>
</html>

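The page is very simple: it loads socket.io.min.js from a CDN, opens a WebSocket connection to the backend, and provides a basic button for sending a message. Whenever a message arrives, it prints the details to the browser console. It is very beginner-friendly.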
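To make this traffic easier to recognize in Firefox's network tab (WebSocket frames) or in the engineio_logger output, here is a minimal sketch of how a Socket.IO event is framed on the wire, assuming the Socket.IO v5 / Engine.IO v4 text protocol used by the 4.x client: an Engine.IO "message" packet (`4`) wrapping a Socket.IO EVENT packet (`2`) whose payload is a JSON array of event name and data. `encode_event` and `decode_event` are hypothetical helpers for illustration only; they ignore binary attachments and acknowledgement ids, and the real encoding is done by the socket.io libraries.

```python
import json

# Engine.IO v4 packet type "message" and Socket.IO v5 packet type "EVENT"
ENGINEIO_MESSAGE = "4"
SOCKETIO_EVENT = "2"


def encode_event(event, data, namespace="/"):
    """Hypothetical helper: build the text frame for a Socket.IO EVENT.

    Simplified: no binary attachments and no acknowledgement id.
    """
    payload = json.dumps([event, data], separators=(",", ":"))
    # The default namespace "/" is omitted; any other one is written before a comma
    nsp = "" if namespace == "/" else namespace + ","
    return ENGINEIO_MESSAGE + SOCKETIO_EVENT + nsp + payload


def decode_event(frame):
    """Inverse of encode_event for the same simplified frames."""
    if not frame.startswith(ENGINEIO_MESSAGE + SOCKETIO_EVENT):
        raise ValueError("not a Socket.IO EVENT frame")
    body = frame[2:]
    namespace = "/"
    if body.startswith("/"):
        namespace, body = body.split(",", 1)
    event, data = json.loads(body)
    return namespace, event, data


print(encode_event("test event", {"message": "Job done!"}))
# 42["test event",{"message":"Job done!"}]
```

So a WebSocket frame starting with `42["test event",` is the notification sent by the Celery task, and a frame starting with `42["my_event",` is what the Send button produces.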

If anything is still unclear, follow my WeChat official account and leave a message; I'll do my best to answer.

2. Flask backend

Create an app.py file and install the dependencies (Flask, Flask-SocketIO, Celery, and the redis client library).

import time

from flask import Flask
from flask_socketio import SocketIO
from celery import Celery

app = Flask(__name__)
app.config['SECRET_KEY'] = 'my_secret_key'
# message_queue lets other processes (e.g. the Celery worker) emit to clients through Redis
socketio = SocketIO(app, cors_allowed_origins="*", message_queue='redis://', logger=True, engineio_logger=True)

# Celery configuration
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'

celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'], backend=app.config['CELERY_RESULT_BACKEND'])
celery.conf.beat_schedule = {
    'task_interval': {
        'task': 'app.long_running_task',
        'schedule': 10.0,  # interval in seconds
    }
}

@celery.task
def long_running_task():
    # Simulate a long-running job
    time.sleep(8)
    # When the job is done, notify the frontend through SocketIO.
    # This runs in the worker process, so the event is relayed via the Redis message queue.
    socketio.emit('test event', {'message': 'Job done! time is: {}'.format(time.ctime())})

@socketio.on('connect')
def handle_connect():
    print('Client connected')

@socketio.on('disconnect')
def handle_disconnect():
    print('Client disconnected')

@socketio.on('my_event')
def handle_my_event(message):
    # Receives the text sent by the Send button in test.html
    print('Received message:', message)

if __name__ == '__main__':
    socketio.run(app, host='0.0.0.0', port=5000)

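With Celery beat running, the code above executes the simulated task long_running_task every 10 seconds. When the task finishes, it sends a message to the frontend saying the job is done, along with the current time.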
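As a rough timeline check (arithmetic only, not Celery code): if beat dispatches every 10 s, a worker slot is free immediately (the worker started below reports a concurrency of 16), and each run sleeps 8 s before emitting, then the notifications should reach the browser about 8 s after each dispatch and about 10 s apart, ignoring queueing and network latency. `expected_event_times` is a hypothetical helper written for this illustration.

```python
def expected_event_times(interval_s, duration_s, runs):
    """Hypothetical helper: seconds after beat's first dispatch at which
    each 'test event' is emitted.

    Assumptions: beat dispatches exactly every interval_s seconds, a worker
    picks each task up immediately, and the emit happens right after the
    task's sleep of duration_s seconds. Latency is ignored.
    """
    return [i * interval_s + duration_s for i in range(runs)]


times = expected_event_times(10.0, 8.0, 4)
gaps = [later - earlier for earlier, later in zip(times, times[1:])]
print(times)  # [8.0, 18.0, 28.0, 38.0]
print(gaps)   # [10.0, 10.0, 10.0]
```

Because each run (8 s) is shorter than the interval (10 s), runs do not overlap; if a task took longer than the interval, beat would still dispatch on schedule and the prefork pool would simply run several of them at once.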

The key line is:

socketio = SocketIO(app, cors_allowed_origins="*", message_queue='redis://', logger=True, engineio_logger=True)

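It configures cross-origin access (CORS) and turns on logging (logger and engineio_logger), which makes it easy to inspect sent and received messages on the backend. The message_queue option matters just as much: the Celery worker runs in a separate process that has no clients connected to it, so its socketio.emit() can only reach the browser by publishing through Redis to the Flask-SocketIO server process. People who haven't read the documentation carefully, and who aren't strong at debugging, can spend a whole day on a production deployment without figuring out why the frontend never receives anything.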
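The relay that message_queue sets up can be illustrated with an in-memory stand-in. In the sketch below, a `queue.Queue` plays the role of the Redis pub/sub channel, a thread plays the Flask-SocketIO server that owns the browser connections, and `worker_emit` plays `socketio.emit()` inside the Celery worker, which has no clients of its own and can only publish. `Broker`, `ServerRelay` and `worker_emit` are hypothetical names for illustration, not Flask-SocketIO APIs.

```python
import queue
import threading


class Broker:
    """In-memory stand-in for a Redis pub/sub channel (assumption: one subscriber)."""

    def __init__(self):
        self._q = queue.Queue()

    def publish(self, message):
        self._q.put(message)

    def listen(self):
        # Blocks until a message arrives; None is used as a shutdown signal
        while True:
            message = self._q.get()
            if message is None:
                return
            yield message


class ServerRelay:
    """Plays the Flask-SocketIO server: forwards published events to its own clients."""

    def __init__(self, broker):
        self.broker = broker
        self.delivered = {}  # client sid -> list of (event, data) it received

    def connect(self, sid):
        self.delivered[sid] = []

    def run(self):
        for event, data in self.broker.listen():
            for inbox in self.delivered.values():
                inbox.append((event, data))


def worker_emit(broker, event, data):
    """Plays socketio.emit() inside the Celery worker: it can only publish."""
    broker.publish((event, data))


broker = Broker()
server = ServerRelay(broker)
server.connect("browser-1")
listener = threading.Thread(target=server.run)
listener.start()

worker_emit(broker, "test event", {"message": "Job done!"})
broker.publish(None)  # stop the listener thread
listener.join()
print(server.delivered)
```

Without the shared channel, the worker's emit would only reach the (empty) client list of its own SocketIO instance, which is exactly the "frontend never receives anything" symptom.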

3. Running in a test environment

Frontend: run firefox test.html, i.e. open test.html in the browser, then open the developer console.

Backend:

  • Run the server
(.venv) ➜  flask-socketio-celery python app.py
Server initialized for threading.
 * Serving Flask app 'app'
 * Debug mode: off
WARNING: This is a development server. Do not use it in a production deployment. Use a production WSGI server instead.
 * Running on all addresses (0.0.0.0)
 * Running on http://127.0.0.1:5000
 * Running on http://192.168.124.5:5000
Press CTRL+C to quit
OSxZbltBcO8kRT9NAAAA: Sending packet OPEN data {'sid': 'OSxZbltBcO8kRT9NAAAA', 'upgrades': ['websocket'], 'pingTimeout': 20000, 'pingInterval': 25000}
  • Start the Redis server
➜  ~ redis-server
21407:C 04 Aug 2024 15:39:58.061 # WARNING Memory overcommit must be enabled! Without it, a background save or replication may fail under low memory condition. Being disabled, it can also cause failures without low memory condition, see https://github.com/jemalloc/jemalloc/issues/1328. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.
21407:C 04 Aug 2024 15:39:58.061 * oO0OoO0OoO0Oo Redis is starting oO0OoO0OoO0Oo
21407:C 04 Aug 2024 15:39:58.061 * Redis version=7.2.5, bits=64, commit=00000000, modified=0, pid=21407, just started
21407:C 04 Aug 2024 15:39:58.061 # Warning: no config file specified, using the default config. In order to specify a config file use redis-server /path/to/redis.conf
21407:M 04 Aug 2024 15:39:58.061 * Increased maximum number of open files to 10032 (it was originally set to 1024).
21407:M 04 Aug 2024 15:39:58.061 * monotonic clock: POSIX clock_gettime
                _._
           _.-``__ ''-._
      _.-``    `.  `_.  ''-._           Redis 7.2.5 (00000000/0) 64 bit
  .-`` .-```.  ```\/    _.,_ ''-._
 (    '      ,       .-`  | `,    )     Running in standalone mode
 |`-._`-...-` __...-.``-._|'` _.-'|     Port: 6379
 |    `-._   `._    /     _.-'    |     PID: 21407
  `-._    `-._  `-./  _.-'    _.-'
 |`-._`-._    `-.__.-'    _.-'_.-'|
 |    `-._`-._        _.-'_.-'    |           https://redis.io
  `-._    `-._`-.__.-'_.-'    _.-'
 |`-._`-._    `-.__.-'    _.-'_.-'|
 |    `-._`-._        _.-'_.-'    |
  `-._    `-._`-.__.-'_.-'    _.-'
      `-._    `-.__.-'    _.-'
          `-._        _.-'
              `-.__.-'

21407:M 04 Aug 2024 15:39:58.061 * Server initialized
21407:M 04 Aug 2024 15:39:58.061 * Loading RDB produced by version 7.2.5
  • Run celery beat
(.venv) ➜  flask-socketio-celery celery -A app.celery beat -l info
Server initialized for threading.
celery beat v5.4.0 (opalescent) is starting.
__    -    ... __   -        _
LocalTime -> 2024-08-04 15:41:43
Configuration ->
    . broker -> redis://localhost:6379/0
    . loader -> celery.loaders.app.AppLoader
    . scheduler -> celery.beat.PersistentScheduler
    . db -> celerybeat-schedule
    . logfile -> [stderr]@%INFO
    . maxinterval -> 5.00 minutes (300s)
[2024-08-04 15:41:43,989: INFO/MainProcess] beat: Starting...
[2024-08-04 15:41:50,632: INFO/MainProcess] Scheduler: Sending due task task_interval (app.long_running_task)
  • Run the celery worker
(.venv) ➜  flask-socketio-celery celery -A app.celery worker -l info
Server initialized for threading.

 -------------- celery@minipc v5.4.0 (opalescent)
--- ***** -----
-- ******* ---- Linux-6.10.2-arch1-1-x86_64-with-glibc2.40 2024-08-04 15:43:05
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app:         app:0x78e41973b140
- ** ---------- .> transport:   redis://localhost:6379/0
- ** ---------- .> results:     redis://localhost:6379/0
- *** --- * --- .> concurrency: 16 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery


[tasks]
  . app.long_running_task

[2024-08-04 15:43:05,949: WARNING/MainProcess] /home/mephisto/github/flask-socketio-celery/.venv/lib/python3.12/site-packages/celery/worker/consumer/consumer.py:508: CPendingDeprecationWarning: The broker_connection_retry configuration setting will no longer determine
whether broker connection retries are made during startup in Celery 6.0 and above.
If you wish to retain the existing behavior for retrying connections on startup,
you should set broker_connection_retry_on_startup to True.
  warnings.warn(

[2024-08-04 15:43:05,954: INFO/MainProcess] Connected to redis://localhost:6379/0
[2024-08-04 15:43:05,954: WARNING/MainProcess] /home/mephisto/github/flask-socketio-celery/.venv/lib/python3.12/site-packages/celery/worker/consumer/consumer.py:508: CPendingDeprecationWarning: The broker_connection_retry configuration setting will no longer determine
whether broker connection retries are made during startup in Celery 6.0 and above.
If you wish to retain the existing behavior for retrying connections on startup,
you should set broker_connection_retry_on_startup to True.
  warnings.warn(

[2024-08-04 15:43:05,957: INFO/MainProcess] mingle: searching for neighbors
[2024-08-04 15:43:06,962: INFO/MainProcess] mingle: all alone
[2024-08-04 15:43:06,970: INFO/MainProcess] celery@minipc ready.
[2024-08-04 15:43:06,971: INFO/MainProcess] Task app.long_running_task[324cef1b-a8e4-43b0-925d-9e28c841e3b5] received

4. Checking the results

I'm too lazy to type it all out, so here's a screenshot.

[Screenshot: socketio-flask]

On the left is the browser's developer console with test.html open; on the right, going clockwise, are the flask, redis, beat, and worker terminals.

In the console on the left, the expected message is printed every ten seconds.

Finally, every article takes me at least two hours to write, earns me next to nothing, and my motivation is fading.

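My new job is busier than the old one, so I can only update less often. Today is a weekend (if you don't believe me, check the time in the screenshot above); normally I'd be playing games. Working folks don't get much free time, and we've got to earn a living!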

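So I had to give up that free time to write this modest article. If it helps you or you have questions, follow the official account; that counts as support too.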

Also, a production deployment is a lot more complicated than this.

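For example, with Vue 3, if the frontend must refresh automatically the moment a backend task finishes, you'd use watch to react to the change (polling also works, it's just uglier).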
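For comparison with the push approach, polling means the client keeps asking the backend for the task's state until it reaches a final one. Below is a sketch of that loop; `get_state` is an assumed stand-in for something like an HTTP endpoint returning `celery.result.AsyncResult(task_id).state` (no such endpoint exists in this article's app.py), and the final states checked are Celery's ready states SUCCESS, FAILURE and REVOKED.

```python
import time

# Celery's "ready" (final) task states
READY_STATES = ("SUCCESS", "FAILURE", "REVOKED")


def poll_until_done(get_state, interval=1.0, timeout=30.0):
    """Call get_state() every `interval` seconds until a final state or timeout.

    get_state is an assumed stand-in for a backend call returning
    AsyncResult(task_id).state. Returns (final_state, number_of_polls).
    """
    deadline = time.monotonic() + timeout
    polls = 0
    while True:
        state = get_state()
        polls += 1
        if state in READY_STATES:
            return state, polls
        if time.monotonic() >= deadline:
            raise TimeoutError(f"task still {state} after {polls} polls")
        time.sleep(interval)


# Fake backend: pending twice, then running, then done
states = iter(["PENDING", "PENDING", "STARTED", "SUCCESS"])
print(poll_until_done(lambda: next(states), interval=0.01))  # ('SUCCESS', 4)
```

Every iteration is a request even when nothing has changed, and the result can arrive up to one `interval` late; the Socket.IO push sends exactly one message at the moment the task finishes.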

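Backend deployment involves configuring the Nginx proxy, and if you use gunicorn without carefully reading the official documentation, a misconfiguration can cost you a long time during which the frontend receives no messages at all.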
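For the gunicorn part, the Flask-SocketIO deployment documentation notes that gunicorn's load balancer has no sticky sessions, so only one worker process should be used; for the threading mode this article runs in (the server log prints "Server initialized for threading."), it suggests a command along the lines of `gunicorn -w 1 --threads 100 module:app`. Below is a sketch of the same settings as a gunicorn.conf.py; the values are assumptions to adapt, and WebSocket support in threading mode also needs the simple-websocket package installed.

```python
# gunicorn.conf.py: a sketch, start with `gunicorn -c gunicorn.conf.py app:app`
# gunicorn reads these module-level names as its settings.

bind = "0.0.0.0:5000"  # same port that test.html connects to
workers = 1            # no sticky sessions in gunicorn, so keep a single worker
threads = 100          # threads > 1 makes gunicorn use its gthread worker;
                       # each long-lived WebSocket connection occupies a thread
```

Scaling past one process then means running several single-worker gunicorn instances behind Nginx with sticky sessions, all sharing the same Redis message_queue.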

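The most baffling one: if you get the URL wrong, socket.io sends part of that wrong URL to the backend as protocol data (the client treats the URL's path as a namespace name), so the connection can't even be established.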
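A quick way to see what the client will do with a URL: the Socket.IO JavaScript client connects to the URL's origin and uses the URL's path as the namespace, while the Engine.IO endpoint itself comes from the separate `path` option (`/socket.io` by default). The hypothetical helper below mimics that split with `urllib.parse` so a wrong URL can be spotted before it reaches the browser; it is a simplification, not the client's actual parsing code, and ignores query strings and other options.

```python
from urllib.parse import urlsplit


def split_socketio_url(url, path="/socket.io"):
    """Hypothetical helper: mimic how a Socket.IO client interprets a URL.

    Returns (origin, namespace, engineio_endpoint).
    """
    parts = urlsplit(url)
    origin = f"{parts.scheme}://{parts.netloc}"
    namespace = parts.path or "/"  # the path becomes the namespace
    return origin, namespace, origin + path + "/"


print(split_socketio_url("http://127.0.0.1:5000"))
# ('http://127.0.0.1:5000', '/', 'http://127.0.0.1:5000/socket.io/')
print(split_socketio_url("http://127.0.0.1:5000/socket.io"))
# ('http://127.0.0.1:5000', '/socket.io', 'http://127.0.0.1:5000/socket.io/')
```

The second call shows the classic mistake: appending `/socket.io` to the URL does not change the endpoint, it asks the server for a namespace called `/socket.io`, which the Flask app above never defines.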

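Writing up and explaining these details would take dozens more minutes of typing and screenshots. Using my weekend free time to write this is already my limit; after all, it doesn't pay. I'd gladly do paid Q&A, though.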

Last modified: Sunday, August 4, 2024
Feel free to follow the WeChat official account and leave a message.
