Policies API

Policies consist of three sections: schedule, condition, and apply. The API varies depending on the section it is in. The first section, schedule, provides the most restrictive API since its only purpose is to return a schedule object. The condition section is also somewhat restricted, and the apply section has the most feature-filled API.

See Policies Overview for information about how policies may be used.

Task and Worker Settings

This sections discusses how to check tasks’ and workers’ settings as well as how to modify those settings. A similar API, called a collection, is followed for both.

Collections

The pattern that is followed for accessing both tasks and workers settings is called a collection. Using this interface, one may access one, many, or all tasks or workers.

In the policy API, the collections are referred to by the names tasks for tasks, and workers for workers. Please see their specific topics below for the details of accessing tasks and workers.

Here is the general pattern for accessing specific collection members:

collection.all()                    # access all collection members
collection['NAME']                  # access a specific collection member
collection[('NAME1','NAME2',...)]   # access multiple collection members

The first example above accesses all members of the collection. The object returned can be used to set the same setting on all of those members. The second example accesses only the single named object. The returned value can be used to get or set the settings on that specific object. The third example accesses multiple specificly-named objects. The returned value can be used to set the same setting on those specifically-named objects.

Note

The Collections API is designed to be simple and intuitive. One manifestiation of this is that they always return single values. Because the same setting may have different values on different objects, it makes no sense to get a single setting from multiple objects. As a result, settings may be retrieved for only one named object at a time.

For example:

val = tasks.all().ignore_result             # This will raise an exception.
val = tasks['tasks.MyTask'].ignore_result   # This is OK.

Tasks Collection

tasks

The tasks collection available to policies. This allows one to inspect and modify selected Celery Task attributes. The methods on this object return instances of TaskItem.

class TaskItem

An object which represents one or more Celery Task types. Such an object may be retrieved via the tasks object within a Policy.

To retrieve the routing_key on a Task named tasks.MyTask:

x = tasks['tasks.MyTask'].routing_key

To set the ignore_result attribute on the same Task:

tasks['tasks.MyTask'].ignore_result = True

The following Task attributes are available:

ignore_result

bool

routing_key

str or None

exchange

str or None

default_retry_delay

int

rate_limit

str or None

store_errors_even_if_ignored

bool

acks_late

bool

expires

int or None

Workers Collection

workers

The workers collection available to policies. This allows one to inspect and modify selected attributes of running Celery Workers. The methods on this object return instances of WorkerItem.

class WorkerItem

An object which represents one or more running Celery Workers. Such an object may be retrieved via the workers object within a Policy.

To get the prefetch value for a Worker named myworker.example.org:

x = workers["myworker.example.org"].prefetch.get()

To increment the number of subprocesses on the same worker:

workers["myworker.example.org"].subprocesses.increment()
prefetch

The object returned has the following methods:

increment([n])
decrement([n])

If not given, the n argument defaults to 1.

get()

Return the worker’s current prefetch value.

subprocesses

The object returned has the following methods:

increment([n])
decrement([n])

If not given, the n argument defaults to 1.

get()

Return the number of subprocesses the worker has.

Task and Worker Stats

stats

The stats object provides several methods to inspect the current status and past performance of tasks. The following are the available methods:

tasks(states=None, interval=None, workers=None, tasknames=None)

The number of tasks that meet the given conditions.

Parameters:
  • states – A single Celery state constant or an iterable of such constants.
  • interval

    A single datetime.timedelta object, or a pair of datetime.timedelta and/or datetime.datetime objects (as a tuple). When it is a single timedelta object, the interval spans the time from timedelta seconds [1] before now up to now. When it is a pair, the interpretation depends on the element types:

    (timedelta i, datetime j):
    The time between time j and i seconds [1] before time j.
    (datetime i, timedelta j):
    The time between time i and j seconds [1] after time i.
    (timedelta i, timedelta j):
    The time between i seconds [1] before now and j seconds [1] before now.
    (datetime i, datetime j):
    The time between time i and time j.

    In all cases, the calculated date pairs are adjusted so the left datetime is less than the right.

  • workers – The worker or workers to which to limit the search. This can be a worker name as a string, an iterable of worker names, or None in which case all workers are queried.
  • tasknames – The task or tasks to which to limit the search. This can be a task name as a string, an iterable of task names, or None in which case all tasks are queried.
tasks_failed(interval=None, workers=None, tasknames=None)

The number of tasks that meet the given conditions and that have failed. The parameters interval, workers, and tasknames have the same meaning as in tasks().

tasks_succeeded(interval=None, workers=None, tasknames=None)

The number of tasks that meet the given conditions and that have succeeded. The parameters interval, workers, and tasknames have the same meaning as in tasks().

tasks_revoked(interval=None, workers=None, tasknames=None)

The number of tasks that meet the given conditions and that have been revoked. The parameters interval, workers, and tasknames have the same meaning as in tasks().

tasks_ready(interval=None, workers=None, tasknames=None)

The number of tasks that meet the given conditions and that are in a ready state. The parameters interval, workers, and tasknames have the same meaning as in tasks().

tasks_sent(interval=None, workers=None, tasknames=None)

The number of tasks that meet the given conditions and that are in an unready state. The parameters interval, workers, and tasknames have the same meaning as in tasks().

mean_waittime(states, interval, workers, tasknames)

The average wait time of the tasks that meet the given conditions.

mean_runtime(states, interval, workers, tasknames)

The average run time of the tasks that meet the given conditions.

[1](1, 2, 3, 4, 5) timedelta is not restricted to seconds, but using some concrete unit of time here is clearer.

Policy Schedule

The schedule section provides functions that (strangely enough) can create schedules. The evaluation of the section must result in a schedule object.

crontab(minute, hour, day_of_week)

This function creates a schedule that allows cron-like scheduling. The class itself is provided by Celery, so please see the documentation there for more infomation.

Examples (reproduced here from the Celery documentation):

crontab()
Execute every minute.
crontab(minute=0, hour=0)
Execute daily at midnight.
crontab(minute=0, hour="*/3")
Execute every three hours: 3am, 6am, 9am, noon, 3pm, 6pm, 9pm.
crontab(minute=0, hour=[0,3,6,9,12,15,18,21])
Same as previous.
crontab(minute="*/15")
Execute every 15 minutes.
crontab(day_of_week="sunday")
Execute every minute (!) at Sundays.
crontab(minute="*", hour="*", day_of_week="sun")
Same as previous.
crontab(minute="*/10", hour="3,17,22", day_of_week="thu,fri")
Execute every ten minutes, but only between 3-4 am, 5-6 pm and 10-11 pm on Thursdays or Fridays.
crontab(minute=0, hour="*/2,*/3")
Execute every even hour, and every hour divisible by three. This means: at every hour except: 1am, 5am, 7am, 11am, 1pm, 5pm, 7pm, 11pm
crontab(minute=0, hour="*/5")
Execute hour divisible by 5. This means that it is triggered at 3pm, not 5pm (since 3pm equals the 24-hour clock value of “15”, which is divisible by 5).
crontab(minute=0, hour="*/3,8-17")
Execute every hour divisible by 3, and every hour during office hours (8am-5pm).

Common API

Although using a subset of Python, policies still provide many of the language’s built-in functions, constants and a few selected modules.

Utilities

now()

Returns the current time as a datetime.datetime object. (The datetime module is also available to policies.)

today([offset_days, timestr, time])

Returns the current date as a datetime.datetime object. (The datetime module is also available to policies.) All arguments are optional, and only keyword arguments are allowed.

If no arguments are provided, the time defaults to 00:00:00 (midnight). However, some keyword arguments are offered which can change this behavior.

Parameters:
  • offset_days (int) – An integer which can set the date forward or backward from the current date.
  • timestr (str) –

    A string representing a time which will be used in the returned datetime object. The string’s format is as follows: HH:MM:SS.mmmmmm

    • HH, MM, SS are the minutes, hours, and seconds.
    • mmmmmm is the microseconds, which can be 0-6 digits.

    The seconds and microseconds are optional.

  • time (tuple) – A tuple representing a time which will be used in the returned datetime object. The elements much be integers which represent: hours, minutes, seconds, and microseconds. The seconds and microseconds are optional.

If both timestr and time are provided, only timestr will be used.

Email

send_email(subject, message, from_email, recipient_list[, auth_user[, auth_password]])

Sometimes, a policy does not need to (or is not able to) respond automatically to the condition it finds. This function allows a policy to send you an email in such situations. It is built on top of Django‘s email feature.

Parameters:
  • subject (str) – The subject of the email message, as a string.
  • message (str) – The content of the email message, as a string.
  • from_email (str) – The email address that will appear in the from field.
  • recipient_list (list) – A list of email addresses to which to send the email.
  • auth_user (str) – Username for the SMTP server. If not given, Django will use the EMAIL_HOST_USER setting.
  • auth_password (str) – Password for the SMTP server. If not given, Django will use the EMAIL_HOST_PASSWORD setting.

Celery States

The following Celery states are available as constants:

PENDING, RECEIVED, STARTED, SUCCESS, FAILURE, REVOKED, RETRY

Python Builtins

Many of the standard builtin functions are available. But some, like eval() and __import__() are not allowed. The following builtin functions are available:

Built-in Functions
abs() dict int() next() slice()
all() divmod() isinstance() oct() sorted()
any() enumerate() issubclass() ord() str()
basestring() filter() iter() pow() sum()
bin() float() len() print() tuple()
bool() format() list() range() unichr()
bytearray() frozenset long() reduce() unicode()
callable() hash() map() repr() xrange()
chr() help() max() reversed() zip()
cmp() hex() memoryview round()  
complex() id() min() set  

The following builtin functions are not available (this list may be incomplete):

Prohibited Functions
compile() globals() raw_input()
eval() hasattr() reload()
execfile() input() setattr()
file() locals() type()
getattr() open() __import__()

The following standard builtin constants are also available:

True, False, None

Examples

The following examples show how to use various portions of the policies API. See Policy Recipes for examples of entire policies.

x = stats.tasks()   # 1
  1. The number of tasks sent. (This will not be all tasks ever sent because old records in the database are cleared periodically.)
x = stats.tasks(interval=datetime.timedelta(hour=1))    # 2
  1. The number of tasks sent over the last hour.
x = stats.tasks(interval=(datetime.timedelta(hour=2),datetime.timedelta(hour=1)))    # 3
  1. The number of tasks sent between one hour and two hours ago.
x = stats.tasks(tasknames="my_task")    # 4
  1. The number of tasks of type my_task sent.
x = stats.tasks(tasknames=["my_task","your_task"])      # 5
  1. The number of tasks of type my_task or your_task sent.
x = tasks["my_task"].ignore_result      # 6
  1. Get the ignore_result setting for tasks of type my_task.

Availability by Policy Section

schedule condition apply
Common API
send_email    
crontab    
stats  
tasks    
workers