celeryでsingletonなタスク
djangoでバッチ処理や非同期処理を組もうとしてceleryを導入。使い方も簡単でとても便利。
が、多重起動されたら困る処理を制限しようとしても、celery自体にはそういう機能がない模様。celeryのcookbookをみると、celery.task.Taskを継承したクラスを自分で作ればいいらしい。でもいちいちclassを定義するのは面倒。
というわけでdecoratorでsingletonなタスクを実現できるようにしてみた。
やり方は簡単。というかcookbookの実装そのまんま。
def singletonTask(lockkey):
def _make_runner(func):
def _work_or_die(*args, **kwargs):
expire_time = kwargs.get("expire", SINGLETON_TASK_EXPIRETIME)
acquire_lock = lambda: cache.add(lockkey, "true", expire_time)
release_lock = lambda: cache.delete(lockkey)
#logger = _make_runner.get_logger()
#logger.info("check working flg...")
if acquire_lock() :
try:
#logger.info("started....")
func(*args, **kwargs)
finally:
release_lock()
else :
#logger.info("job cancelled")
raise TaskAlreadyWorking(lockkey)
_work_or_die.__name__ = func.__name__
_work_or_die.__module__ = func.__module__
return _work_or_die
return _make_runner
弱点は_work_or_dieの中ではloggerが使えないこと。func.get_loggerも、_work_or_die.get_loggerもうまくいかなかった。困った。
あとキモは_work_or_die.__name__ を func.__name__ に書き換えていること。celeryのtask生成部分を見てると、@taskにnameを与えない限り、func.__name__ を元にtaskの名前を決めている。なので _work_or_die.__name__ を書き換えないでそのまま返すと、全て同じ名前になり、正しいタスクが起動できなくなる。
最初気が付かなくてハマったので、FAQの decorating tasks のあたりにでも書いておいてほしいなぁ。
2011/10/15 追記
__module__も書き換えないとダメなことを忘れていたので追記
-
takemasa5の投稿です