投稿日:2020年10月30日
Celeryを使用すると柔軟なタスクのスケジューリングをすることができます。この記事ではCeleryのための環境構築と簡単な使い方について解説していきます。
このチュートリアルのソースコードはgithubの以下のリポジトリのpart2というタグにあります。
https://github.com/ogipochi/python_celery_example
次のコマンドでローカルにクローンできます。
git clone https://github.com/ogipochi/python_celery_example.git -b part2
今回はパッケージを作成し、そこにモジュールを作成してみます。
中身のコードの内容的には前回のtasks.pyで定義する場合と同じですが、モジュールにする時はCeleryオブジェクトはcelery.pyというファイルに定義する必要があります。
まずは前回作成したpython_app/srcの中のtasks.pyファイルを削除し、projというモジュールを作成します。
python_app/src/
└── proj
├── __init__.py
├── celery.py
└── tasks.py
celery.pyの中身は以下の様になります。
コードの大部分は前回のtasks.pyと同じになります。
from celery import Celery
# Celeryオブジェクトを作成
app = Celery('proj',
broker='redis://redis_app:6379/',
backend='redis://redis_app:6379/',
include=['proj.tasks',])
# タスクの結果の保持時間を変更
app.conf.update(
result_expires=3600
)
if __name__ == '__main__':
app.start()
ここではCeleryオブジェクトの生成時の引数として新しくinclude引数を渡しています。この引数はworkerが起動した際にインポートするタスクをリストで渡すことができます。今回はproj/tasks.pyのモジュールを渡しています。
またapp.conf.update()で引数にresult_expiresを渡しています。この引数はタスクの結果を保持しておく時間の設定になります。デフォルトは1日に設定してあります。
ではproj/tasks.pyに実際にタスクの定義を記述していきましょう。
from .celery import app
@app.task
def add(x, y):
return x + y
@app.task
def mul(x, y):
return x * y
@app.task
def xsum(numbers):
return sum(numbers)
今回はprojというモジュールでCeleryの定義をしたので、Dockerfileの実行コマンドを変えてみます。
FROM python:3.7
RUN pip install -U celery[redis]
COPY ./src /src
WORKDIR /src
ENTRYPOINT [ "celery","-A","proj","worker","--loglevel=INFO" ]
実際に起動してみます。
まだ前回のDockerコンテナが起動している場合はdocker-compose downで終了させてから起動しましょう。
$ sudo docker-compose up --build
今回はpython_appコンテナの出力、つまりceleryコマンドの出力をもう少し詳しくみてみましょう。
[...]
python_app_1 | /usr/local/lib/python3.7/site-packages/celery/platforms.py:798: RuntimeWarning: You're running the worker with superuser privileges: this is
python_app_1 | absolutely not recommended!
python_app_1 |
python_app_1 | Please specify a different user using the --uid option.
python_app_1 |
python_app_1 | User information: uid=0 euid=0 gid=0 egid=0
python_app_1 |
python_app_1 | uid=uid, euid=euid, gid=gid, egid=egid,
python_app_1 |
python_app_1 | -------------- celery@0b5c0ff894a3 v5.0.1 (singularity)
python_app_1 | --- ***** -----
python_app_1 | -- ******* ---- Linux-5.4.0-51-generic-x86_64-with-debian-9.8 2020-10-29 12:16:53
python_app_1 | - *** --- * ---
python_app_1 | - ** ---------- [config]
python_app_1 | - ** ---------- .> app: proj:0x7fbf282c6390
python_app_1 | - ** ---------- .> transport: redis://redis_app:6379//
python_app_1 | - ** ---------- .> results: redis://redis_app:6379/
python_app_1 | - *** --- * --- .> concurrency: 4 (prefork)
python_app_1 | -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
python_app_1 | --- ***** -----
python_app_1 | -------------- [queues]
python_app_1 | .> celery exchange=celery(direct) key=celery
python_app_1 |
python_app_1 |
python_app_1 | [tasks]
python_app_1 | . proj.tasks.add
python_app_1 | . proj.tasks.mul
python_app_1 | . proj.tasks.xsum
python_app_1 |
python_app_1 | [2020-10-29 12:16:53,592: INFO/MainProcess] Connected to redis://redis_app:6379//
python_app_1 | [2020-10-29 12:16:53,600: INFO/MainProcess] mingle: searching for neighbors
python_app_1 | [2020-10-29 12:16:54,618: INFO/MainProcess] mingle: all alone
python_app_1 | [2020-10-29 12:16:54,630: INFO/MainProcess] celery@0b5c0ff894a3 ready.
出力に出ている[config]にCeleryの実行設定が出力されています。
[queues]はworkerの実行予定のタスク一覧です。Celeryのworkerは一度に複数のタスクを実行できます。
早速、タスクの実行をしてみます。
ここでは練習としてDockerコンテナの中に入りPythonのインタプリタから実行していきます。
docker-compose upをしたのとは別のターミナルを開いて進めてください。
まずはコンテナの中に入りPythonインタプリタを実行します。
$ sudo docker exec -it celery_sample_python_app_1 bash
root@0b5c0ff894a3:/src# python
Python 3.7.3 (default, Mar 27 2019, 23:40:30)
[GCC 6.3.0 20170516] on linux
Type "help", "copyright", "credits" or "license" for more information.
まずはdelay()メソッドを使ってみましょう。
このメソッドはworkerにタスクの実行を即座に要求するメソッドです。
この後説明するapppy_async()というメソッドを簡易的に使える様にしたメソッドです。
>>> from proj.tasks import add
>>> add.delay(5,5)
<AsyncResult: e52a5691-9249-4ea4-a0ae-444fd3b742e6>
これはPart1で使用しましたね。
次にapply_async()メソッドを使ってみます。
特にオプションをつけずに実行した場合、delay()と同じ様にworkerに即座に実行要求を送ります。
>>> add.apply_async((5,5))
<AsyncResult: 6bfa49e7-f89b-40d2-9ec7-2840b2dea8a7>
apply_async()にはいろいろなオプションをつけることができます。
細かくどんなオプションがあるのかは公式ドキュメントをみてもらうとして、
その中でよく使うであろうcountdownとetaについて解説します。
countdown引数はタスクまでの秒数を指定できます。
>>> result = add.apply_async((4,3), countdown=10)
>>> result.ready()
False
>>> result.ready()
True
>>> result.get(timeout=1)
7
eta引数は実行日時を指定できます。
上と同じ様に10秒後に実行してみます。
こちらで10秒後を指定する場合、現在日時+10秒を指定してあげれば良いですね。
>>> from datetime import datetime, timedelta
>>> result = add.apply_async((3,3), eta=datetime.now() + timedelta(seconds = 10))
>>> result.ready()
False
>>> result.ready()
True
>>> result.get(timeout=1)
6
あまり意味はないですが、タスクとして定義した関数をそのまま現在のプロセスで実行することもできます。
>>> add(4,5)
9
deploy()やapply_async()によって返されるAsyncResultには実行結果を取得する関数や現在の実行状態を格納する変数があります。
>>> result = add.apply_async(({"a": 1, "b" : 2},3), eta=datetime.now() + timedelta(seconds = 10))
>>> result.failed()
True
>>> result.successful()
False
>>> result.state
'FAILURE'
stateは実行待ち(PENDING)、実行中(STARTED)、成功(SUCCESS)、失敗(FAILURE)、再実行(RETRY)のパターンがあります。ただしSTARTEDはceleryのworkerの起動設定でtask_track_startedがTrueになっているか、関数のアノテーションで@task(track_started=True)と指定する必要があります。
タスクの実行を他の関数などに渡したい場合にはsignatureを使いましょう。
signatureを利用するとタスクの実行の引数、オプションなどをラップすることができます。
>>> my_task = add.signature((2,2), countdown=10)
>>> my_task.delay()
<AsyncResult: 6c317308-9290-4f5e-acfd-b68bf962e570>
You'll Never Guess This Pornstar UK's Secrets Pornstar uk