投稿日:2020年10月29日
Celeryを使用すると柔軟なタスクのスケジューリングをすることができます。この記事ではCeleryのための環境構築と簡単な使い方について解説していきます。
Celeryは柔軟にタスクのスケジューリングをすることができる非同期のjob/taskモジュールです。
これを使うことで例えば以下のような実装を簡単に行うことができます。
この記事では純粋なPythonでCeleryのための環境構築と簡単な使い方について解説していきます。(Djangoに統合する場合は少し手順が違うため注意してください)
今回は主に環境構築、そしてCeleryが問題なく動くことまでの確認になります。
この記事ではDocker、Docker Composeがインストール済であることを前提としています。
このチュートリアルのソースコードはgithubの以下のリポジトリのpart2というタグにあります。
https://github.com/ogipochi/python_celery_example
次のコマンドでローカルにクローンできます。
git clone https://github.com/ogipochi/python_celery_example.git -b part1
.
├── docker-compose.yaml
└── python_app/
├── Dockerfile
└── src/
Docker環境を定義するファイルを作成しましょう。
version: "3"
services:
python_app:
build: ./python_app
volumes:
- ./python_app/src:/src
tty: true
networks:
redis_app:
image: redis:6.0
networks:
- redis_network
networks:
redis_network:
FROM python:3.7
RUN pip install -U celery[erdis]
COPY ./src /src
WORKDIR /src
python_appコンテナのImageはCeleryライブラリをインストールしています。
pip install celery[redis]というコマンドはceleryパッケージに加えて、redisの関連パッケージもインストールするコマンドです。redisの場合は、pip install celery redisと書くのと同じです。
またpython_appのコンテナとredis_appのコンテナはnetworkで接続しそれぞれがappのnameで参照できる様になっています。
Dockerのネットワークについて詳しくない方はローカルのPCにPythonとRedisをインストールした様な状態だと思ってもらってOKです。
環境構築でpython_app/srcディレクトリがpython_appのコンテナにマウントしてあるので、そこにスクリプトファイルを作成しましょう。
まずは以下のようにtasks.pyというファイルを作成します。
from celery import Celery
app = Celery('tasks', broker='redis://redis_app:6379/')
@app.task
def add(x, y):
return x + y
このファイルについて簡単な解説をしておきます。
はじめにCelery()でCeleryオブジェクトを作成しています。
第1引数はモジュールの名前を指定します。デフォルト値は`__main__`です。
brokerとして指定しているのはdockerで作成したredisのurlです。
redis_appという値は環境構築の際にdocker-compose.yamlで指定したRedisコンテナのapp_nameです。
Celery Worker Serverの起動にはcelery -A タスク名 worker --loglevel=INFOというコマンドを使用します。
python_appビルド対象のDockerfileのENTRYPOINTにこのコマンドを指定しましょう。
FROM python:3.7
RUN pip install -U celery[redis]
COPY ./src /src
WORKDIR /src
ENTRYPOINT [ "celery","-A","tasks","worker","--loglevel=INFO" ] # コマンドを追加
これでDocker Composeを起動させてみましょう
$ docker-compose build
$ docker-compose up
上記コマンドで次の様な出力が確認できたらCelery Service Workerが起動しているという事です。
python_app_1 | /usr/local/lib/python3.7/site-packages/celery/platforms.py:798: RuntimeWarning: You're running the worker with superuser privileges: this is
python_app_1 | absolutely not recommended!
python_app_1 |
python_app_1 | Please specify a different user using the --uid option.
python_app_1 |
python_app_1 | User information: uid=0 euid=0 gid=0 egid=0
python_app_1 |
python_app_1 | uid=uid, euid=euid, gid=gid, egid=egid,
python_app_1 |
python_app_1 | -------------- celery@3ccdfaf18919 v5.0.1 (singularity)
python_app_1 | --- ***** -----
python_app_1 | -- ******* ---- Linux-5.4.0-51-generic-x86_64-with-debian-9.8 2020-10-28 15:18:08
python_app_1 | - *** --- * ---
python_app_1 | - ** ---------- [config]
python_app_1 | - ** ---------- .> app: tasks:0x7f42c5f02358
python_app_1 | - ** ---------- .> transport: redis://redis_app:6379//
python_app_1 | - ** ---------- .> results: disabled://
python_app_1 | - *** --- * --- .> concurrency: 4 (prefork)
python_app_1 | -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
python_app_1 | --- ***** -----
python_app_1 | -------------- [queues]
python_app_1 | .> celery exchange=celery(direct) key=celery
python_app_1 |
python_app_1 |
python_app_1 | [tasks]
python_app_1 | . tasks.celery.add
python_app_1 |
python_app_1 | [2020-10-28 15:18:08,681: INFO/MainProcess] Connected to redis://redis_app:6379//
python_app_1 | [2020-10-28 15:18:08,688: INFO/MainProcess] mingle: searching for neighbors
python_app_1 | [2020-10-28 15:18:09,705: INFO/MainProcess] mingle: all alone
python_app_1 | [2020-10-28 15:18:09,715: INFO/MainProcess] celery@3ccdfaf18919 ready.
docker-compose upを実行したのとは別のターミナルを開き、起動したDockerコンテナに入ってPythonのインタプリタからタスクの実行をしてみます。
sudo docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
3ccdfaf18919 celery_sample_python_app "celery -A tasks wor…" 10 minutes ago Up 10 minutes celery_sample_python_app_1
b317ae9bbca3 redis:6.0 "docker-entrypoint.s…" 12 minutes ago Up 10 minutes 6379/tcp celery_sample_redis_app_1
$ sudo docker exec -it celery_sample_python_app_1 bash
root@3ccdfaf18919:/src# python
Python 3.7.3 (default, Mar 27 2019, 23:40:30)
[GCC 6.3.0 20170516] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import tasks
>>> tasks.add.delay(4,4)
<AsyncResult: a4caa10a-0d21-4a79-9429-f09583da9fc3>
この様にタスクを実行した側のスレッドの方にはAsyncResultが返されます。
このAsyncResultはタスクが終了したかの確認、タスクの実行結果の取得に利用できます。
しかし、デフォルトではタスクの実行結果は取得できません。
では定義したタスクの実行結果はどこに出力されているかというと、celeryコマンドを実行したターミナル、つまり今回はdocker-compose upコマンドを実行しているターミナルに出力されています。
[...]
python_app_1 | [2020-10-28 15:18:09,715: INFO/MainProcess] celery@3ccdfaf18919 ready.
python_app_1 | [2020-10-28 15:20:01,951: INFO/MainProcess] Received task: tasks.celery.add[a8470ddd-5481-4a78-a22a-9eecaacd0369]
python_app_1 | [2020-10-28 15:20:01,953: INFO/ForkPoolWorker-4] Task tasks.celery.add[a8470ddd-5481-4a78-a22a-9eecaacd0369] succeeded in 0.0001821209443733096s: 8
from celery import Celery
app = Celery(
'tasks',
backend='redis://redis_app:6379/',
broker='redis://redis_app:6379/')
@app.task
def add(x, y):
return x + y
$ sudo docker exec -it celery_sample_python_app_1 bash
root@e7a8033d9616:/src# python
Python 3.7.3 (default, Mar 27 2019, 23:40:30)
[GCC 6.3.0 20170516] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import tasks
>>> tasks.add.delay(4,4)
<AsyncResult: a4caa10a-0d21-4a79-9429-f09583da9fc3>
>>> exit()
root@e7a8033d9616:/src# python
Python 3.7.3 (default, Mar 27 2019, 23:40:30)
[GCC 6.3.0 20170516] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import tasks
>>> result = tasks.add.delay(4,4)
>>> result.ready()
True
>>> result.get(timeout=1)
8
get()のtimeout引数はデータの取得に時間がかかった時、諦めるまでの時間の指定です。今回は1秒に指定しました。
タスクでエラーが発生した場合、AsyncResultのget()をそのまま呼び出した場合、そこでも同じエラーが発生します。
get()でエラーを発生させたくない場合propagate引数にFalseをセットすることで発生したエラーを取得することもできます。
トレースバックはAsyncResultのtracebackという変数に文字列で格納されています。
$ sudo docker exec -it celery_sample_python_app_1 bash
root@e7a8033d9616:/src# python
Python 3.7.3 (default, Mar 27 2019, 23:40:30)
[GCC 6.3.0 20170516] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> from tasks import add
>>> result = add.delay({"a":1,"b":3},4)
>>> result.get(timeout=1)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/usr/local/lib/python3.7/site-packages/celery/result.py", line 230, in get
on_message=on_message,
File "/usr/local/lib/python3.7/site-packages/celery/backends/asynchronous.py", line 201, in wait_for_pending
return result.maybe_throw(callback=callback, propagate=propagate)
File "/usr/local/lib/python3.7/site-packages/celery/result.py", line 335, in maybe_throw
self.throw(value, self._to_remote_traceback(tb))
File "/usr/local/lib/python3.7/site-packages/celery/result.py", line 328, in throw
self.on_ready.throw(*args, **kwargs)
File "/usr/local/lib/python3.7/site-packages/vine/promises.py", line 234, in throw
reraise(type(exc), exc, tb)
File "/usr/local/lib/python3.7/site-packages/vine/utils.py", line 30, in reraise
raise value
TypeError: unsupported operand type(s) for +: 'dict' and 'int'
>>> result.get(timeout=1,propagate=False)
TypeError("unsupported operand type(s) for +: 'dict' and 'int'")
>>> result.traceback
'Traceback (most recent call last):\n File "/usr/local/lib/python3.7/site-packages/celery/app/trace.py", line 409, in trace_task\n R = retval = fun(*args, **kwargs)\n File "/usr/local/lib/python3.7/site-packages/celery/app/trace.py", line 701, in __protected_call__\n return self.run(*args, **kwargs)\n File "/src/tasks.py", line 7, in add\nTypeError: unsupported operand type(s) for +: \'dict\' and \'int\'\n'