武之新
celeryでsingletonなタスク

djangoでバッチ処理や非同期処理を組もうとしてceleryを導入。使い方も簡単でとても便利。

が、多重起動されたら困る処理を制限しようとしても、celery自体にはそういう機能がない模様。celeryのcookbookをみると、celery.task.Taskを継承したクラスを自分で作ればいいらしい。でもいちいちclassを定義するのは面倒。

というわけでdecoratorでsingletonなタスクを実現できるようにしてみた。

やり方は簡単。というかcookbookの実装そのまんま。

def singletonTask(lockkey):
	def _make_runner(func):
		def _work_or_die(*args, **kwargs):
			expire_time = kwargs.get("expire", SINGLETON_TASK_EXPIRETIME)
			acquire_lock = lambda: cache.add(lockkey, "true", expire_time)
			release_lock = lambda: cache.delete(lockkey)

			
			#logger = _make_runner.get_logger()
			#logger.info("check working flg...")
			
			if acquire_lock() :
				try:
					#logger.info("started....")
					func(*args, **kwargs)
					
				finally:
					release_lock()
	
			else :
				#logger.info("job cancelled")
				raise TaskAlreadyWorking(lockkey)
				
		_work_or_die.__name__ = func.__name__
		_work_or_die.__module__ = func.__module__
		return _work_or_die
			
	return _make_runner			

弱点は_work_or_dieの中ではloggerが使えないこと。func.get_loggerも、_work_or_die.get_loggerもうまくいかなかった。困った。

あとキモは_work_or_die.__name__ を func.__name__ に書き換えていること。celeryのtask生成部分を見てると、@taskにnameを与えない限り、func.__name__ を元にtaskの名前を決めている。なので _work_or_die.__name__ を書き換えないでそのまま返すと、全て同じ名前になり、正しいタスクが起動できなくなる。

最初気が付かなくてハマったので、FAQの decorating tasks のあたりにでも書いておいてほしいなぁ。

2011/10/15 追記

__module__も書き換えないとダメなことを忘れていたので追記

  1. takemasa5の投稿です