※このブログではサーバー運用、技術の検証等の費用のため広告をいれています。
記事が見づらいなどの問題がありましたらContactからお知らせください。

<前のページ
【Python3】Celeryによるタスクのスケジューリング - Part1

【Python3】Celeryによるタスクのスケジューリング - Part2

python3 python スケジューリング 定期実行 Celery

投稿日:2020年10月30日

このエントリーをはてなブックマークに追加
Celeryを使用すると柔軟なタスクのスケジューリングをすることができます。この記事ではCeleryのための環境構築と簡単な使い方について解説していきます。

はじめに

Celeryを使用すると柔軟なタスクのスケジューリングをすることができます。この記事ではCeleryのための環境構築と簡単な使い方について解説していきます。

今回は主にタスクの実行タイミングの調整について解説しています。

この記事はPart1の続きになります。

ソースコード

このチュートリアルのソースコードはgithubの以下のリポジトリのpart2というタグにあります。

https://github.com/ogipochi/python_celery_example

次のコマンドでローカルにクローンできます。

git clone https://github.com/ogipochi/python_celery_example.git -b part2

環境構築

今回はパッケージを作成し、そこにモジュールを作成してみます。

中身のコードの内容的には前回tasks.pyで定義する場合と同じですが、モジュールにする時はCeleryオブジェクトcelery.pyというファイルに定義する必要があります。

まずは前回作成したpython_app/srcの中のtasks.pyファイルを削除し、projというモジュールを作成します。

設定の定義

ディレクトリ構造
python_app/src/
└── proj
    ├── __init__.py
    ├── celery.py
    └── tasks.py

celery.pyの中身は以下の様になります。

コードの大部分は前回のtasks.pyと同じになります。

python_app/src/proj/celery.py
from celery import Celery

# Celeryオブジェクトを作成
app = Celery('proj',
             broker='redis://redis_app:6379/',
             backend='redis://redis_app:6379/',
             include=['proj.tasks',])

# タスクの結果の保持時間を変更
app.conf.update(
    result_expires=3600
)

if __name__ == '__main__':
    app.start()

ここではCeleryオブジェクトの生成時の引数として新しくinclude引数を渡しています。この引数はworkerが起動した際にインポートするタスクをリストで渡すことができます。今回はproj/tasks.pyのモジュールを渡しています。

またapp.conf.update()で引数にresult_expiresを渡しています。この引数はタスクの結果を保持しておく時間の設定になります。デフォルトは1日に設定してあります。

ではproj/tasks.pyに実際にタスクの定義を記述していきましょう。

python_app/src/proj/celery.py
from .celery import app

@app.task
def add(x, y):
    return x + y

@app.task
def mul(x, y):
    return x * y

@app.task
def xsum(numbers):
    return sum(numbers)

今回はprojというモジュールでCeleryの定義をしたので、Dockerfileの実行コマンドを変えてみます。

python_app/Dockerfile
FROM python:3.7

RUN pip install -U celery[redis]

COPY ./src /src

WORKDIR /src

ENTRYPOINT [ "celery","-A","proj","worker","--loglevel=INFO" ]

起動

実際に起動してみます。

まだ前回のDockerコンテナが起動している場合はdocker-compose downで終了させてから起動しましょう。

ターミナル
$ sudo docker-compose up --build

今回はpython_appコンテナの出力、つまりceleryコマンドの出力をもう少し詳しくみてみましょう。

python_appコンテナの出力
[...]

python_app_1  | /usr/local/lib/python3.7/site-packages/celery/platforms.py:798: RuntimeWarning: You're running the worker with superuser privileges: this is
python_app_1  | absolutely not recommended!
python_app_1  | 
python_app_1  | Please specify a different user using the --uid option.
python_app_1  | 
python_app_1  | User information: uid=0 euid=0 gid=0 egid=0
python_app_1  | 
python_app_1  |   uid=uid, euid=euid, gid=gid, egid=egid,
python_app_1  |  
python_app_1  |  -------------- celery@0b5c0ff894a3 v5.0.1 (singularity)
python_app_1  | --- ***** ----- 
python_app_1  | -- ******* ---- Linux-5.4.0-51-generic-x86_64-with-debian-9.8 2020-10-29 12:16:53
python_app_1  | - *** --- * --- 
python_app_1  | - ** ---------- [config]
python_app_1  | - ** ---------- .> app:         proj:0x7fbf282c6390
python_app_1  | - ** ---------- .> transport:   redis://redis_app:6379//
python_app_1  | - ** ---------- .> results:     redis://redis_app:6379/
python_app_1  | - *** --- * --- .> concurrency: 4 (prefork)
python_app_1  | -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
python_app_1  | --- ***** ----- 
python_app_1  |  -------------- [queues]
python_app_1  |                 .> celery           exchange=celery(direct) key=celery
python_app_1  |                 
python_app_1  | 
python_app_1  | [tasks]
python_app_1  |   . proj.tasks.add
python_app_1  |   . proj.tasks.mul
python_app_1  |   . proj.tasks.xsum
python_app_1  | 
python_app_1  | [2020-10-29 12:16:53,592: INFO/MainProcess] Connected to redis://redis_app:6379//
python_app_1  | [2020-10-29 12:16:53,600: INFO/MainProcess] mingle: searching for neighbors
python_app_1  | [2020-10-29 12:16:54,618: INFO/MainProcess] mingle: all alone
python_app_1  | [2020-10-29 12:16:54,630: INFO/MainProcess] celery@0b5c0ff894a3 ready.

出力に出ている[config]にCeleryの実行設定が出力されています。

  • transport:brokerのURLです。この値はceleryの実行コマンドの-bオプションで上書きできます。
  • concurrency:タスクを同時に実行するために待機しているworkerプロセスの数です。デフォルトでは実行しているPCの論理プロセッサーの数になります。例えば、僕のPCは2コア4スレッドなので4になっていますね。この値は--concurrency,または-cオプションで上書きできます。
  • task events:workerでアクションが実行された際のモニタリングメッセージを送る設定のON/OFFの設定です。Flowerなどのモニタリングツールを使用する場合にはこれをtrueにする必要があります。

[queues]はworkerの実行予定のタスク一覧です。Celeryのworkerは一度に複数のタスクを実行できます。


実践

早速、タスクの実行をしてみます。

ここでは練習としてDockerコンテナの中に入りPythonのインタプリタから実行していきます。

docker-compose upをしたのとは別のターミナルを開いて進めてください。

タスクの実行

まずはコンテナの中に入りPythonインタプリタを実行します。

ターミナル
$ sudo docker exec -it celery_sample_python_app_1 bash
root@0b5c0ff894a3:/src# python
Python 3.7.3 (default, Mar 27 2019, 23:40:30) 
[GCC 6.3.0 20170516] on linux
Type "help", "copyright", "credits" or "license" for more information.

まずはdelay()メソッドを使ってみましょう。

このメソッドはworkerにタスクの実行を即座に要求するメソッドです。

この後説明するapppy_async()というメソッドを簡易的に使える様にしたメソッドです。

ターミナル
>>> from proj.tasks import add
>>> add.delay(5,5)
<AsyncResult: e52a5691-9249-4ea4-a0ae-444fd3b742e6>

これはPart1で使用しましたね。

次にapply_async()メソッドを使ってみます。

特にオプションをつけずに実行した場合、delay()と同じ様にworkerに即座に実行要求を送ります。

ターミナル
>>> add.apply_async((5,5))
<AsyncResult: 6bfa49e7-f89b-40d2-9ec7-2840b2dea8a7>

apply_async()にはいろいろなオプションをつけることができます。

細かくどんなオプションがあるのかは公式ドキュメントをみてもらうとして、

その中でよく使うであろうcountdownetaについて解説します。

countdown引数はタスクまでの秒数を指定できます。

ターミナル
>>> result = add.apply_async((4,3), countdown=10)
>>> result.ready()
False
>>> result.ready()
True
>>> result.get(timeout=1)
7

eta引数は実行日時を指定できます。

上と同じ様に10秒後に実行してみます。

こちらで10秒後を指定する場合、現在日時+10秒を指定してあげれば良いですね。

ターミナル
>>> from datetime import datetime, timedelta
>>> result = add.apply_async((3,3), eta=datetime.now() + timedelta(seconds = 10))
>>> result.ready()
False
>>> result.ready()
True
>>> result.get(timeout=1)
6

あまり意味はないですが、タスクとして定義した関数をそのまま現在のプロセスで実行することもできます。

ターミナル
>>> add(4,5)
9

deploy()apply_async()によって返されるAsyncResultには実行結果を取得する関数や現在の実行状態を格納する変数があります。

ターミナル
>>> result = add.apply_async(({"a": 1, "b" : 2},3), eta=datetime.now() + timedelta(seconds = 10))
>>> result.failed()
True
>>> result.successful()
False
>>> result.state
'FAILURE'

stateは実行待ち(PENDING)実行中(STARTED)成功(SUCCESS)失敗(FAILURE)、再実行(RETRY)のパターンがあります。ただしSTARTEDはceleryのworkerの起動設定でtask_track_startedTrueになっているか、関数のアノテーションで@task(track_started=True)と指定する必要があります。

タスクの実行を他の関数などに渡したい場合にはsignatureを使いましょう。

signatureを利用するとタスクの実行の引数、オプションなどをラップすることができます。

ターミナル
>>> my_task = add.signature((2,2), countdown=10)
>>> my_task.delay()
<AsyncResult: 6c317308-9290-4f5e-acfd-b68bf962e570>

参考書籍

このエントリーをはてなブックマークに追加

<前のページ
【Python3】Celeryによるタスクのスケジューリング - Part1

関連記事

記事へのコメント