投稿日:2020年10月28日
Pythonにはschedというスケジューリング用のビルトインのライブラリが存在します。このライブラリを使用すると「10分後に〇〇という関数を実行」や、さらに工夫すれば「10分ごとに毎回関数を実行」などのスケジュールを組むことができます。この記事ではこのschedについて解説しています。
Pythonにはschedというスケジューリング用のビルトインのライブラリが存在します。このライブラリを使用すると「10分後に〇〇という関数を実行」や、さらに工夫すれば「10分ごとに毎回関数を実行」などのスケジュールを組むことができます。
自分はこのような定期実行をDjangoアプリケーションで無駄なデータを定期的に削除するときなどに使用します。
この記事ではこのschedについて解説しています。
基本的な使い方はScuedulerオブジェクトを作成し、遅延時間、優先度、対象関数を設定し実行する流れになります。
import sched, time
def sample_func(greeting = "Hello"):
print(greeting)
s = sched.schedule()
s.enter(10,1,sample_func) # 10秒後に実行のスケジュール
s.run()
sched.schedule()でScheduleオブジェクトを作成しています。
Scheduleオブジェクトはenter(delay, priority, action, argument=(), kwargs={})メソッドでタスクを追加します。
argument引数にタプルを渡すことで関数にたいして引数を渡すことができます。
import sched, time
def sample_func(greeting = "Hello"):
print(greeting)
s = sched.schedule()
s.enter(10,1,sample_func, argument=('Good Night!!',)) # 引数を与える
s.run()
kwargs引数に辞書型の値を渡すことで特定の引数に値を渡すことができます。
import sched, time
def sample_func(greeting = "Hello"):
print(greeting)
s = sched.scheduler()
s.enter(10,1,sample_func, kwargs={'greeting':'Good bye!!'}) # キーワード引数を与える
s.run()
再帰関数を上手く使うことで定期実行を実装することができます。
実行中のスレッドでそのまま実行してしまうとあまりメリットがないので、マルチスレッドで実行するといいでしょう。
下に3秒ごとにあいさつをし続ける定期実行のサンプルコードを載せておきます。
import sched,time,threading
def periodic_execution(schedule, interval):
"""
定期的に'Hello'と出力する関数
Args:
schedule {obj} : Scheduleオブジェクト
interval {int} : 実行の間隔
"""
print("Hello")
s = sched.scheduler()
s.enter(interval, 1, periodic_execution, kwargs={'schedule':schedule, 'interval':interval})
s.run()
def periodic_task():
"""
定期実行の関数を起動させる関数
"""
s = sched.scheduler()
s.enter(3, 1, periodic_execution, kwargs={'schedule':s,'interval':3})
s.run()
# マルチスレッドでタスクを実行
t = threading.Thread(target=periodic_task)
t.start()