Policy Recipes

This section presents example policies. The comments inside each policy explain what it checks and what it does.

Email When Task is Not Completed

policy:
    schedule:
        # Runs every day at 3:00 AM.
        crontab(minute=0, hour=3)

    condition:
        # Checks that no tasks of type "tasks.my_task" succeeded between
        # 2:00 and 3:00 AM today.  If this is true, then the apply section
        # is executed.
        0 == stats.tasks_succeeded( interval=(today(timestr='2:00'),
                                              today(timestr='3:00')),
                                    tasknames='tasks.my_task' )

    apply:
        # Assemble components of the email.
        subject =       'Celery Info: tasks.my_task did not succeed.'
        msg =          ('The tasks.my_task Task did not succeed between '
                        '2am and 3am this morning.')
        from_email =    'celery_notice@example.com'
        recipient_list = ['admin@example.com']

        # Send the email.
        send_email(subject, msg, from_email, recipient_list)
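
To make the condition above concrete, the following plain-Python sketch mimics what it evaluates: it builds a time window from today's date and counts the successful runs of one task inside that window. The helper names `today_at` and `count_succeeded`, and the layout of the task records, are hypothetical stand-ins chosen for illustration. They are not part of the policy API and do not show how `today()` or `stats.tasks_succeeded()` are implemented.

```python
import datetime


def today_at(timestr, now=None):
    """Combine today's date with an 'H:MM' string (hypothetical helper)."""
    now = now or datetime.datetime.now()
    hour, minute = (int(part) for part in timestr.split(':'))
    return now.replace(hour=hour, minute=minute, second=0, microsecond=0)


def count_succeeded(records, interval, taskname):
    """Count records of `taskname` that succeeded inside `interval`.

    `records` is an assumed list of dicts with the keys 'name', 'state'
    and 'finished'; `interval` is a (start, end) pair of datetimes.
    """
    start, end = interval
    return sum(
        1
        for record in records
        if record['name'] == taskname
        and record['state'] == 'SUCCESS'
        and start <= record['finished'] < end
    )


# Example data: fix "now" so the result does not depend on the wall clock.
now = datetime.datetime(2024, 1, 15, 3, 0)
records = [
    # The task of interest ran in the window but failed.
    {'name': 'tasks.my_task', 'state': 'FAILURE',
     'finished': datetime.datetime(2024, 1, 15, 2, 15)},
    # A different task succeeded in the window; it must not be counted.
    {'name': 'tasks.other_task', 'state': 'SUCCESS',
     'finished': datetime.datetime(2024, 1, 15, 2, 30)},
]

window = (today_at('2:00', now=now), today_at('3:00', now=now))
# Mirrors the policy condition: true when nothing succeeded in the window.
should_send_email = 0 == count_succeeded(records, window, 'tasks.my_task')
```

With this sample data `should_send_email` is true, because the only run of `tasks.my_task` in the window failed. In the policy, that is the case in which the apply section sends the email.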

Add/Remove Worker Subprocesses Depending on Average Wait Time

# Policy to handle an excessively low wait time.
policy:
    schedule:
        # Run every 10 minutes, on Monday through Friday.
        crontab(minute='*/10', day_of_week='mon,tue,wed,thu,fri')

    condition:
        # A condition combines multiple lines using AND.  Therefore, both
        # of the following expressions must evaluate to True.

        # Check that there has been at least one task of type
        # "tasks.my_task" to have succeeded on worker "worker1" over the
        # past hour.
        0 != stats.tasks_succeeded( interval=datetime.timedelta(hours=1),
                                    tasknames='tasks.my_task',
                                    workers='worker1' )
        # Check if the mean wait time of those same tasks has dipped below
        # 15 seconds.
        15.0 > stats.mean_waittime( interval=datetime.timedelta(hours=1),
                                    tasknames='tasks.my_task',
                                    workers='worker1' )

    apply:
        # Decrease the number of subprocesses that worker "worker1" is
        # running.
        workers['worker1'].subprocesses.decrement()

# Policy to handle an excessively high wait time.
policy:
    schedule:
        # Run every 10 minutes, on Monday through Friday.
        crontab(minute='*/10', day_of_week='mon,tue,wed,thu,fri')

    condition:
        # Check that there has been at least one task of type
        # "tasks.my_task" to have succeeded on worker "worker1" over the
        # past hour.
        0 != stats.tasks_succeeded( interval=datetime.timedelta(hours=1),
                                    tasknames='tasks.my_task',
                                    workers='worker1' )
        # Check if the mean wait time of those same tasks has risen above
        # 60 seconds.
        60.0 < stats.mean_waittime( interval=datetime.timedelta(hours=1),
                                    tasknames='tasks.my_task',
                                    workers='worker1' )

    apply:
        # Increase the number of subprocesses that worker "worker1" is
        # running.
        workers['worker1'].subprocesses.increment()
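
Taken together, the two policies above leave a band between 15 and 60 seconds in which nothing changes, which keeps the worker from oscillating between adding and removing subprocesses. The sketch below restates that combined decision in plain Python. The function name `scaling_decision` and its list-of-wait-times input are assumptions made for illustration, not part of the policy API.

```python
from statistics import mean


def scaling_decision(wait_times, low=15.0, high=60.0):
    """Return -1 to remove a subprocess, +1 to add one, 0 to do nothing.

    `wait_times` is an assumed list of wait times, in seconds, for the
    tasks that succeeded over the past hour.
    """
    # Both policies require at least one succeeded task, so an empty
    # sample never triggers a change.
    if not wait_times:
        return 0

    average = mean(wait_times)
    if average < low:
        # Corresponds to the policy for an excessively low wait time.
        return -1
    if average > high:
        # Corresponds to the policy for an excessively high wait time.
        return 1
    # Between the two thresholds neither policy's condition holds.
    return 0
```

For example, `scaling_decision([5, 10])` returns -1 and `scaling_decision([70, 80])` returns 1, while an average of exactly 15 or 60 seconds triggers neither change because both comparisons in the policies are strict.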
