This section provides some example policies and explains how they work.
Contents
# Policy to send a notification email when "tasks.my_task" had no
# successful runs during the hour before the daily check.
policy:
    schedule:
        # Fires once per day, at 3:00 AM.
        crontab(minute=0, hour=3)
    condition:
        # Count the "tasks.my_task" tasks that succeeded between 2:00 and
        # 3:00 AM today. The apply section runs only when that count is
        # zero.
        stats.tasks_succeeded(
            interval=(today(timestr='2:00'), today(timestr='3:00')),
            tasknames='tasks.my_task'
        ) == 0
    apply:
        # Build the pieces of the notification email.
        sender = 'celery_notice@example.com'
        recipients = ['admin@example.com']
        email_subject = 'Celery Info: tasks.my_task did not succeed.'
        email_body = ('The tasks.my_task Task did not succeed between '
                      '2am and 3am this morning.')
        # Deliver it: subject, body, sender, recipients (positional).
        send_email(email_subject, email_body, sender, recipients)
# Policy to handle an excessively low wait time: when tasks on "worker1"
# are being picked up very quickly, scale that worker down.
policy:
    schedule:
        # Fires every 10 minutes, Monday through Friday.
        crontab(minute='*/10', day_of_week='mon,tue,wed,thu,fri')
    condition:
        # The lines of a condition are combined with AND, so both of the
        # expressions below must evaluate to True for apply to run.

        # At least one "tasks.my_task" task must have succeeded on worker
        # "worker1" during the past hour.
        stats.tasks_succeeded(
            interval=datetime.timedelta(hours=1),
            tasknames='tasks.my_task',
            workers='worker1'
        ) != 0

        # The mean wait time of those same tasks must have dropped below
        # 15 seconds.
        stats.mean_waittime(
            interval=datetime.timedelta(hours=1),
            tasknames='tasks.my_task',
            workers='worker1'
        ) < 15.0
    apply:
        # Reduce the number of subprocesses that worker "worker1" runs.
        workers['worker1'].subprocesses.decrement()
# Policy to handle an excessively high wait time: when tasks on "worker1"
# are waiting too long, scale that worker up.
policy:
    schedule:
        # Fires every 10 minutes, Monday through Friday.
        crontab(minute='*/10', day_of_week='mon,tue,wed,thu,fri')
    condition:
        # Both expressions below must evaluate to True for apply to run.

        # At least one "tasks.my_task" task must have succeeded on worker
        # "worker1" during the past hour.
        stats.tasks_succeeded(
            interval=datetime.timedelta(hours=1),
            tasknames='tasks.my_task',
            workers='worker1'
        ) != 0

        # The mean wait time of those same tasks must have climbed above
        # 60 seconds.
        stats.mean_waittime(
            interval=datetime.timedelta(hours=1),
            tasknames='tasks.my_task',
            workers='worker1'
        ) > 60.0
    apply:
        # Raise the number of subprocesses that worker "worker1" runs.
        workers['worker1'].subprocesses.increment()