Celeryタスクのリアルタイム結果を簡単に取得する

日々のWeb開発では、メールの送信、ファイルのアップロード、ソフトウェアのコンパイルなど、長時間実行されるバックグラウンドタスクが頻繁に発生します。これらのタスクが完了すると、フロントエンドページをリアルタイムで更新できます。

タスクが完了したときや変更が発生したときに、ユーザーに通知します。

この記事では、シンプルな実装例を紹介します。

Socket.io フロントエンド + Flask-socketio + Celery

1. フロントエンドページ

以下の内容の test.html ファイルを作成します。

 1<!DOCTYPE html>
 2<html lang="en">
 3<head>
 4<title>Socket.IO Test</title>
 5<script src="https://cdn.socket.io/4.0.1/socket.io.min.js"></script>
 6<script>
 7var socket = io.connect('http://127.0.0.1:5000');
 8
 9socket.on('connect', function() {
10console.log('Connected:', socket.id);
11});
12
13socket.on('disconnect', function() {
14console.log('Disconnected');
15});
16
17socket.on('test event', function(data) {
18console.log('Received message:', data);
19// alert(data.message);
20});
21
22function sendMessage() {
23var message = document.getElementById('message').value; socket.emit('my_event', message);
24}
25</script>
26</head>
27<body>
28<input type="text" id="message">
29<button onclick="sendMessage()">Send</button>
30</body>
31</html>

このページは非常にシンプルです。CDNからsocket.io.min.jsファイルをインポートし、バックエンドとのWS接続を確立し、シンプルなメッセージ送信ボタンを備えています。メッセージを受信すると、関連情報を表示します。初心者にとって非常に使いやすいです。

それでもわからない場合は、公式アカウントをフォローしてメッセージを残してください。できる限りお答えします。

2. Flask バックエンド

app.py ファイルを作成し、すべての依存関係をインストールします。

 1
 2flask から Flask をインポート
 3flask_socketio から SocketIO をインポート
 4celery から celery をインポート
 5
 6app = Flask(__name__)
 7app.config['SECRET_KEY'] ='my_secret_key'
 8socketio = SocketIO(app, cors_allowed_origins="*",message_queue='redis://',logger=True, engineio_logger=True)
 9
10# Celery の設定
11app.config['CELERY_BROKER_URL'] ='redis://localhost:6379/0'
12app.config['CELERY_RESULT_BACKEND'] ='redis://localhost:6379/0'
13
14celery = Celery(app.name, Broker=app.config['CELERY_BROKER_URL'],backend=app.config['CELERY_RESULT_BACKEND'])
15celery.conf.beat_schedule = {
16'task_interval': {
17'task': 'app.long_running_task',
18'schedule': 10.0, # 時間間隔
19}
20}
21
22@celery.task
23def long_running_task():
24# タスク実行をシミュレートする
25import time
26time.sleep(8)
27# タスク完了後にSocketIO経由で通知を送信する
28socketio.emit('test event', { 'message': 'Job done! time is: {}'.format( time.ctime() ) })
29
30@socketio.on('connect')
31def handle_connect():
32print('接続成功')
33
34@socketio.on('disconnect')
35def handle_disconnect():
36print('クライアントが切断されました')
37
38if __name__ == '__main__':
39socketio.run(app, host='0.0.0.0', port=5000)

上記のコードは、シミュレートされたタスク long_running_task を10秒ごとに実行します。タスクが完了すると、フロントエンドに作業の完了を通知するメッセージを送信し、現在時刻を表示します。

重要なポイントは以下のとおりです。

1socketio = SocketIO(app, cors_allowed_origins="*",message_queue='redis://',logger=True, engineio_logger=True)

クロスオリジン設定とデバッグが有効になっているため、バックエンドで送受信メッセージを確認しやすくなります。message_queue 設定項目も非常に重要です。ドキュメントをよく読んでいないと、本番環境にデプロイする際に、フロントエンドがメッセージを受信できない理由を不思議に思い、丸一日デバッグに費やすことになるかもしれません。

3. テスト環境の実行

フロントエンドアプリケーションの実行:Firefox で test.html を実行します。ブラウザで test.html ファイルを開き、デバッグウィンドウを有効にします。

バックエンドの実行:

  • サーバーの実行
 1(.venv) ➜ flask-socketio-celery python app.py
 2サーバーはスレッド化のために初期化されています。
 3* Flask アプリ 'app' を提供しています
 4* デバッグモード:オフ
 5警告:これは開発サーバーです。本番環境では使用しないでください。代わりに本番環境の WSGI サーバーを使用してください。
 6* すべてのアドレス (0.0.0.0) で実行中
 7* http://127.0.0.1:5000 で実行中
 8* http://192.168.124.5:5000 で実行中
 9終了するには Ctrl+C を押してください
10OSxZbltBcO8kRT9NAAAA: パケット OPEN データを送信しています {'sid': 'OSxZbltBcO8kRT9NAAAA', 'upgrades': ['websocket'], 'pingTimeout': 20000, 'pingInterval': 25000}
  • Redis サーバーを起動します
 1➜ ~ redis-server
 221407:C 2024年8月4日 15:39:58.061 # 警告 メモリオーバーコミットを有効にする必要があります。有効にしないと、メモリ不足の状態でバックグラウンド保存またはレプリケーションが失敗する可能性があります。無効にすると、メモリ不足が発生しない状態でも障害が発生する可能性があります。https://github.com/jemalloc/jemalloc/issues/1328 を参照してください。この問題を修正するには、/etc/sysctl.conf に「vm.overcommit_memory = 1」を追加し、再起動するか、コマンド「sysctl vm.overcommit_memory=1」を実行して有効にしてください。
 321407:C 2024年8月4日 15:39:58.061 * oO0OoO0OoO0Oo Redis を起動しています oO0OoO0OoO0Oo
 421407:C 2024年8月4日 15:39:58.061 * Redis バージョン=7.2.5、ビット数=64、コミット=00000000、変更内容=0、pid=21407、起動済み
 521407:C 2024年8月4日 15:39:58.061 # 警告: 設定ファイルが指定されていません。デフォルトの設定を使用します。設定ファイルを指定するには、redis-server /path/to/redis.conf を使用してください。
 621407:M 2024年8月4日 15:39:58.061 * オープンファイルの最大数を10032に増加しました(元々は1024に設定されていました)。
 721407:M 2024年8月4日 15:39:58.061 * モノトニッククロック: POSIX clock_gettime
 8_._
 9_.-``__ ''-._
10_.-`` `. `_. ''-._ Redis 7.2.5 (00000000/0) 64ビット
11.-`` .-```. ```\/ _.,_ ''-._
12( ' , .-` | `, ) スタンドアロンモードで実行中
13|`-._`-...-` __...-.``-._|'` _.-'| ポート: 6379
14| `-._ `._ / _.-' | PID: 21407
15`-._ `
16-._ `-./ _.-' _.-'
17|`-._`-._ `-.__.-' _.-'_.-'|
18| `-._`-._ _.-'_.-' | https://redis.io
19`-._ `-._`-.__.-'_.-' _.-'
20|`-._`-._ `-.__.-' _.-'_.-'|
21| `-._`-._ _.-'_.-' |
22`-._ `-._`-.__.-'_.-' _.-'
23`-._ `-.__.-' _.-'
24`-._ _.-'
25`-.__.-'
26
2721407:M 2024年8月4日 15:39:58.061 * サーバーを初期化しました
2821407:M 2024年8月4日 15:39:58.061 * バージョン7.2.5で生成されたRDBをロードしています
  • celery beat を実行
 1(.venv) ➜ flask-socketio-celery celery -A app.celery beat -l info
 2サーバーをスレッド用に初期化しました。
 3celery beat v5.4.0 (opalescent) を起動しています。
 4__ - ... __ - _
 5LocalTime -> 2024-08-04 15:41:43
 6Configuration ->
 7.broker -> redis://localhost:6379/0
 8.loader -> celery.loaders.app.AppLoader
 9.scheduler -> celery.beat.PersistentScheduler
10.db->celerybeat-schedule
11.logfile -> [stderr]@%INFO
12. maxinterval -> 5.00分 (300秒)
13[2024-08-04 15:41:43,989: INFO/MainProcess] beat: 開始中...
14[2024-08-04 15:41:50,632: INFO/MainProcess] Scheduler: 期限タスク task_interval (app.long_running_task) を送信中
  • Celeryワーカーを実行
 1(.venv) ➜ flask-socketio-celery celery -A app.celery worker -l info
 2サーバーをスレッド用に初期化しました。
 3
 4-------------- celery@minipc v5.4.0 (opalescent)
 5---****-----
 6-- ******* ---- Linux-6.10.2-arch1-1-x86_64-with-glibc2.40 2024-08-04 15:43:05
 7- *** --- * ---
 8- ** ---------- [config]
 9- ** ---------- .> app: app:0x78e41973b140
10- ** ---------- .> transport: redis://localhost:6379/0
11- ** ---------- .> results: redis://localhost:6379/0
12- *** --- * --- .> concurrency: 16 (prefork)
13-- ******* ---- .> task events: OFF (-E を有効にすると監視されますこのワーカーのタスク)
14---****-----
15--------------- [キュー]
16.> celery exchange=celery(direct) key=celery
17
18[タスク]
19.app.long_running_task
20
21[2024-08-04 15:43:05,949: WARNING/MainProcess] /home/mephisto/github/flask-socketio-celery/.venv/lib/python3.12/site-packages/celery/worker/consumer/consumer.py:508: CPendingDeprecationWarning: Celery 6.0 以降では、broker_connection_retry 構成設定は、起動時にブローカー接続の再試行を行うかどうかを決定しなくなります。
22起動時に接続を再試行する既存の動作を維持する場合は、broker_connection_retry_on_startup を True に設定する必要があります。
23warnings.warn(
24
25[2024-08-04 15:43:05,954: INFO/MainProcess] redis://localhost:6379/0 に接続しました
26[2024-08-04 15:43:05,954: WARNING/MainProcess] /home/mephisto/github/flask-socketio-celery/.venv/lib/python3.12/site-packages/celery/worker/consumer/consumer.py:508: CPendingDeprecationWarning: Celery 6.0 以降では、broker_connection_retry 構成設定は、起動時にブローカー接続の再試行を行うかどうかを決定しなくなりました。
27起動時に接続を再試行する既存の動作を維持する場合は、broker_connection_retry_on_startup を True に設定してください。
28warnings.warn(
29
30[2024-08-04 15:43:05,957: INFO/MainProcess] mingle: 近隣のユーザーを検索中
31[2024-08-04 15:43:06,962: INFO/MainProcess] mingle: 一人ぼっち
32[2024-08-04 15:43:06,970: INFO/MainProcess] celery@minipc が準備完了です。
33[2024-08-04 15:43:06,971: INFO/MainProcess] タスク app.long_running_task[324cef1b-a8e4-43b0-925d-9e28c841e3b5] を受信しました

4. 結果を表示する

入力するのが面倒なので、スクリーンショット

socketio-flask

左側は、ブラウザで開いた test.tml ファイルのデバッグ画面です。右側の時計回りの矢印は、それぞれ Flask、Redis、Beat、Worker を示しています。

左側のコンソールには、期待される情報が 10 秒ごとに表示されます。

結局、各記事の執筆に少なくとも 2 時間かかり、ほとんど成果が出ず、モチベーションが下がってしまいました。

新しい仕事は以前よりも忙しくなりました。更新頻度を落とさざるを得ません。今日は週末です(信じられないなら、上のスクリーンショットの時間を見てください)。普段はゲームをしています。労働者は余暇に割く時間はあまりなく、生活費を稼がなければなりません!

このあまり面白くない記事を書くために、余暇を犠牲にしなければなりませんでした。この記事がお役に立った場合やご質問がある場合は、サポートの一環として公式アカウントをフォローしてください。

本番環境へのデプロイは、これよりもはるかに複雑です。

例えば、 Vue3 では、バックエンドのタスクが完了すると、フロントエンドは即座にページを自動的に更新する必要があります。変更を動的に監視するには watch を使用する必要があります(ポーリングも可能ですが、あまりエレガントではありません)。

バックエンドのデプロイメントには、Nginx プロキシの設定が含まれます。また、Gunicorn を使用していて公式ドキュメントをよく読んでいない場合、設定を間違えると、フロントエンドが何も情報を受け取れないまま、多くの操作が必要になる可能性があります。

最も馬鹿げたことは、間違った URL を記述すると、SocketIO が URL の誤った部分をプロトコルメッセージとしてバックエンドに送信し、接続を確立できなくなることです。

これらすべての詳細を書いて説明するには、おそらく数十分かけて入力し、スクリーンショットを撮る必要があるでしょう。個人的には、週末の空き時間に書くのが限界だと思います。結局のところ、利益は出ませんが、有料の Q&A で満足しています。

最終更新日: Wednesday, August 13, 2025

翻訳: