※このブログではサーバー運用、技術の検証等の費用のため広告をいれています。
記事が見づらいなどの問題がありましたらContactからお知らせください。

次のページ>
【Python3】Celeryによるタスクのスケジューリング - Part2

【Python3】Celeryによるタスクのスケジューリング - Part1

python3 python 定期実行 Schedule Celery タスク

投稿日:2020年10月29日

このエントリーをはてなブックマークに追加
Celeryを使用すると柔軟なタスクのスケジューリングをすることができます。この記事ではCeleryのための環境構築と簡単な使い方について解説していきます。

はじめに

Celeryとは

Celeryは柔軟にタスクのスケジューリングをすることができる非同期のjob/taskモジュールです。

これを使うことで例えば以下のような実装を簡単に行うことができます。

  • 毎日0時にアプリケーションの実行結果をメールで送信
  • 待ち時間の長い処理をバックグラウンドで起動

この記事では純粋なPythonでCeleryのための環境構築と簡単な使い方について解説していきます。(Djangoに統合する場合は少し手順が違うため注意してください)

今回は主に環境構築、そしてCeleryが問題なく動くことまでの確認になります。

環境

この記事ではDocker、Docker Composeがインストール済であることを前提としています。

  • Docker==19.03.13
  • Docker Compose==1.26.2

ソースコード

このチュートリアルのソースコードはgithubの以下のリポジトリのpart2というタグにあります。

https://github.com/ogipochi/python_celery_example

次のコマンドでローカルにクローンできます。

ターミナル
git clone https://github.com/ogipochi/python_celery_example.git -b part1

環境構築

作成する内容

まずはCeleryを試すための環境を構築します。

今回はどのプラットフォームでも同じ環境を作る事ができるDockerで環境構築をしていきます。

Celeryにはメッセージを受け取るBrokerと呼ばれる機構が必要になります。

Brokerを作成する場合RedisRabbitMQなどの選択肢があるのですが、今回キャッシュメモリとして有名なRedisを選択します。

作成するディレクトリ構成は以下のようになります。

ディレクトリ構成
.
├── docker-compose.yaml
└── python_app/
    ├── Dockerfile
    └── src/

各ファイルの内容

Docker環境を定義するファイルを作成しましょう。

docker-compose.yaml
version: "3"

services:
    python_app:
        build: ./python_app
        volumes:
            - ./python_app/src:/src
        tty: true
        networks: 
    redis_app:
        image: redis:6.0
        networks:
            - redis_network

networks:
    redis_network:
python_app
FROM python:3.7

RUN pip install -U celery[erdis]

COPY ./src /src

WORKDIR /src

python_appコンテナのImageはCeleryライブラリをインストールしています。

pip install celery[redis]というコマンドはceleryパッケージに加えて、redisの関連パッケージもインストールするコマンドです。redisの場合は、pip install celery redisと書くのと同じです。

またpython_appのコンテナとredis_appのコンテナはnetworkで接続しそれぞれがappのnameで参照できる様になっています。
Dockerのネットワークについて詳しくない方はローカルのPCにPythonRedisをインストールした様な状態だと思ってもらってOKです。


使い方(基本)

Celeryオブジェクトの作成

環境構築でpython_app/srcディレクトリがpython_appのコンテナにマウントしてあるので、そこにスクリプトファイルを作成しましょう。

まずは以下のようにtasks.pyというファイルを作成します。

python_app/src/tasks.py
from celery import Celery

app = Celery('tasks', broker='redis://redis_app:6379/')

@app.task
def add(x, y):
    return x + y

このファイルについて簡単な解説をしておきます。

はじめにCelery()Celeryオブジェクトを作成しています。

第1引数はモジュールの名前を指定します。デフォルト値は`__main__`です。

brokerとして指定しているのはdockerで作成したredisのurlです。

redis_appという値は環境構築の際にdocker-compose.yamlで指定したRedisコンテナのapp_nameです。

Celery Worker Serverの起動

Celery Worker Serverの起動にはcelery -A タスク名 worker --loglevel=INFOというコマンドを使用します。

python_appビルド対象のDockerfileENTRYPOINTにこのコマンドを指定しましょう。

python_app/Dockerfile
FROM python:3.7

RUN pip install -U celery[redis]

COPY ./src /src

WORKDIR /src

ENTRYPOINT [ "celery","-A","tasks","worker","--loglevel=INFO" ]  # コマンドを追加

これでDocker Composeを起動させてみましょう

ターミナル
$ docker-compose build
$ docker-compose up

上記コマンドで次の様な出力が確認できたらCelery Service Workerが起動しているという事です。

docker-compose upの出力
python_app_1  | /usr/local/lib/python3.7/site-packages/celery/platforms.py:798: RuntimeWarning: You're running the worker with superuser privileges: this is
python_app_1  | absolutely not recommended!
python_app_1  | 
python_app_1  | Please specify a different user using the --uid option.
python_app_1  | 
python_app_1  | User information: uid=0 euid=0 gid=0 egid=0
python_app_1  | 
python_app_1  |   uid=uid, euid=euid, gid=gid, egid=egid,
python_app_1  |  
python_app_1  |  -------------- celery@3ccdfaf18919 v5.0.1 (singularity)
python_app_1  | --- ***** ----- 
python_app_1  | -- ******* ---- Linux-5.4.0-51-generic-x86_64-with-debian-9.8 2020-10-28 15:18:08
python_app_1  | - *** --- * --- 
python_app_1  | - ** ---------- [config]
python_app_1  | - ** ---------- .> app:         tasks:0x7f42c5f02358
python_app_1  | - ** ---------- .> transport:   redis://redis_app:6379//
python_app_1  | - ** ---------- .> results:     disabled://
python_app_1  | - *** --- * --- .> concurrency: 4 (prefork)
python_app_1  | -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
python_app_1  | --- ***** ----- 
python_app_1  |  -------------- [queues]
python_app_1  |                 .> celery           exchange=celery(direct) key=celery
python_app_1  |                 
python_app_1  | 
python_app_1  | [tasks]
python_app_1  |   . tasks.celery.add
python_app_1  | 
python_app_1  | [2020-10-28 15:18:08,681: INFO/MainProcess] Connected to redis://redis_app:6379//
python_app_1  | [2020-10-28 15:18:08,688: INFO/MainProcess] mingle: searching for neighbors
python_app_1  | [2020-10-28 15:18:09,705: INFO/MainProcess] mingle: all alone
python_app_1  | [2020-10-28 15:18:09,715: INFO/MainProcess] celery@3ccdfaf18919 ready.

タスクの実行

docker-compose upを実行したのとは別のターミナルを開き、起動したDockerコンテナに入ってPythonのインタプリタからタスクの実行をしてみます。

ターミナル
sudo docker ps
CONTAINER ID        IMAGE                      COMMAND                  CREATED             STATUS              PORTS               NAMES
3ccdfaf18919        celery_sample_python_app   "celery -A tasks wor…"   10 minutes ago      Up 10 minutes                           celery_sample_python_app_1
b317ae9bbca3        redis:6.0                  "docker-entrypoint.s…"   12 minutes ago      Up 10 minutes       6379/tcp            celery_sample_redis_app_1
$ sudo docker exec -it celery_sample_python_app_1 bash
root@3ccdfaf18919:/src# python
Python 3.7.3 (default, Mar 27 2019, 23:40:30) 
[GCC 6.3.0 20170516] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import tasks
>>> tasks.add.delay(4,4)
<AsyncResult: a4caa10a-0d21-4a79-9429-f09583da9fc3>

この様にタスクを実行した側のスレッドの方にはAsyncResultが返されます。

このAsyncResultはタスクが終了したかの確認、タスクの実行結果の取得に利用できます。

しかし、デフォルトではタスクの実行結果は取得できません。

では定義したタスクの実行結果はどこに出力されているかというと、celeryコマンドを実行したターミナル、つまり今回はdocker-compose upコマンドを実行しているターミナルに出力されています。

ターミナル
[...]

python_app_1  | [2020-10-28 15:18:09,715: INFO/MainProcess] celery@3ccdfaf18919 ready.
python_app_1  | [2020-10-28 15:20:01,951: INFO/MainProcess] Received task: tasks.celery.add[a8470ddd-5481-4a78-a22a-9eecaacd0369]  
python_app_1  | [2020-10-28 15:20:01,953: INFO/ForkPoolWorker-4] Task tasks.celery.add[a8470ddd-5481-4a78-a22a-9eecaacd0369] succeeded in 0.0001821209443733096s: 8

タスクの実行結果の参照方法

タスクの実行結果を取得するには実行結果を保持、そして保持した実行結果をアプリケーション側に送信する機構が必要になります。

この機構としてはSQLAlchemy/Django ORM, MongoDB, Memcached, Redis, RPC (RabbitMQ/AMQP),などいろいろ選択肢がありますが、

今回は環境構築ですでにRedisサーバーを用意しているのでこれを利用してみましょう。

どのサーバーを利用するのかはCeleryオブジェクトを作成する際のbackend引数で指定します。

python_app/src/tasks.py
from celery import Celery

app = Celery(
    'tasks', 
    backend='redis://redis_app:6379/',
    broker='redis://redis_app:6379/')

@app.task
def add(x, y):
    return x + y

値を取得する際には、AsyncResultget()メソッドを使います。

ready()メソッドは値が取得可能な状態かどうかを取得するメソッドです。

ターミナル
$ sudo docker exec -it celery_sample_python_app_1 bash
root@e7a8033d9616:/src# python
Python 3.7.3 (default, Mar 27 2019, 23:40:30) 
[GCC 6.3.0 20170516] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import tasks
>>> tasks.add.delay(4,4)
<AsyncResult: a4caa10a-0d21-4a79-9429-f09583da9fc3>
>>> exit()
root@e7a8033d9616:/src# python
Python 3.7.3 (default, Mar 27 2019, 23:40:30) 
[GCC 6.3.0 20170516] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import tasks
>>> result = tasks.add.delay(4,4)            
>>> result.ready()
True
>>> result.get(timeout=1)
8

get()timeout引数はデータの取得に時間がかかった時、諦めるまでの時間の指定です。今回は1秒に指定しました。

エラー発生時の挙動

タスクでエラーが発生した場合、AsyncResultget()をそのまま呼び出した場合、そこでも同じエラーが発生します。

get()でエラーを発生させたくない場合propagate引数にFalseをセットすることで発生したエラーを取得することもできます。

トレースバックはAsyncResulttracebackという変数に文字列で格納されています。

ターミナル
$ sudo docker exec -it celery_sample_python_app_1 bash
root@e7a8033d9616:/src# python
Python 3.7.3 (default, Mar 27 2019, 23:40:30) 
[GCC 6.3.0 20170516] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> from tasks import add
>>> result = add.delay({"a":1,"b":3},4)
>>> result.get(timeout=1)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/lib/python3.7/site-packages/celery/result.py", line 230, in get
    on_message=on_message,
  File "/usr/local/lib/python3.7/site-packages/celery/backends/asynchronous.py", line 201, in wait_for_pending
    return result.maybe_throw(callback=callback, propagate=propagate)
  File "/usr/local/lib/python3.7/site-packages/celery/result.py", line 335, in maybe_throw
    self.throw(value, self._to_remote_traceback(tb))
  File "/usr/local/lib/python3.7/site-packages/celery/result.py", line 328, in throw
    self.on_ready.throw(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/vine/promises.py", line 234, in throw
    reraise(type(exc), exc, tb)
  File "/usr/local/lib/python3.7/site-packages/vine/utils.py", line 30, in reraise
    raise value
TypeError: unsupported operand type(s) for +: 'dict' and 'int'
>>> result.get(timeout=1,propagate=False)
TypeError("unsupported operand type(s) for +: 'dict' and 'int'")
>>> result.traceback
'Traceback (most recent call last):\n  File "/usr/local/lib/python3.7/site-packages/celery/app/trace.py", line 409, in trace_task\n    R = retval = fun(*args, **kwargs)\n  File "/usr/local/lib/python3.7/site-packages/celery/app/trace.py", line 701, in __protected_call__\n    return self.run(*args, **kwargs)\n  File "/src/tasks.py", line 7, in add\nTypeError: unsupported operand type(s) for +: \'dict\' and \'int\'\n'

参考書籍

このエントリーをはてなブックマークに追加

次のページ>
【Python3】Celeryによるタスクのスケジューリング - Part2

関連記事

記事へのコメント