Policies consist of three sections: schedule, condition, and apply. The API varies depending on the section it is in. The first section, schedule, provides the most restrictive API since its only purpose is to return a schedule object. The condition section is also somewhat restricted, and the apply section has the most feature-filled API.
See Policies Overview for information about how policies may be used.
Contents
This sections discusses how to check tasks’ and workers’ settings as well as how to modify those settings. A similar API, called a collection, is followed for both.
The pattern that is followed for accessing both tasks and workers settings is called a collection. Using this interface, one may access one, many, or all tasks or workers.
In the policy API, the collections are referred to by the names tasks for tasks, and workers for workers. Please see their specific topics below for the details of accessing tasks and workers.
Here is the general pattern for accessing specific collection members:
collection.all() # access all collection members
collection['NAME'] # access a specific collection member
collection[('NAME1','NAME2',...)] # access multiple collection members
The first example above accesses all members of the collection. The object returned can be used to set the same setting on all of those members. The second example accesses only the single named object. The returned value can be used to get or set the settings on that specific object. The third example accesses multiple specificly-named objects. The returned value can be used to set the same setting on those specifically-named objects.
Note
The Collections API is designed to be simple and intuitive. One manifestiation of this is that they always return single values. Because the same setting may have different values on different objects, it makes no sense to get a single setting from multiple objects. As a result, settings may be retrieved for only one named object at a time.
For example:
val = tasks.all().ignore_result # This will raise an exception.
val = tasks['tasks.MyTask'].ignore_result # This is OK.
The tasks collection available to policies. This allows one to inspect and modify selected Celery Task attributes. The methods on this object return instances of TaskItem.
An object which represents one or more Celery Task types. Such an object may be retrieved via the tasks object within a Policy.
To retrieve the routing_key on a Task named tasks.MyTask:
x = tasks['tasks.MyTask'].routing_key
To set the ignore_result attribute on the same Task:
tasks['tasks.MyTask'].ignore_result = True
The following Task attributes are available:
bool
str or None
str or None
int
str or None
bool
bool
int or None
The workers collection available to policies. This allows one to inspect and modify selected attributes of running Celery Workers. The methods on this object return instances of WorkerItem.
An object which represents one or more running Celery Workers. Such an object may be retrieved via the workers object within a Policy.
To get the prefetch value for a Worker named myworker.example.org:
x = workers["myworker.example.org"].prefetch.get()
To increment the number of subprocesses on the same worker:
workers["myworker.example.org"].subprocesses.increment()
The object returned has the following methods:
Return the worker’s current prefetch value.
The object returned has the following methods:
If not given, the n argument defaults to 1.
Return the number of subprocesses the worker has.
The stats object provides several methods to inspect the current status and past performance of tasks. The following are the available methods:
The number of tasks that meet the given conditions.
Parameters: |
|
---|
The number of tasks that meet the given conditions and that have failed. The parameters interval, workers, and tasknames have the same meaning as in tasks().
The number of tasks that meet the given conditions and that have succeeded. The parameters interval, workers, and tasknames have the same meaning as in tasks().
The number of tasks that meet the given conditions and that have been revoked. The parameters interval, workers, and tasknames have the same meaning as in tasks().
The number of tasks that meet the given conditions and that are in a ready state. The parameters interval, workers, and tasknames have the same meaning as in tasks().
The number of tasks that meet the given conditions and that are in an unready state. The parameters interval, workers, and tasknames have the same meaning as in tasks().
The average wait time of the tasks that meet the given conditions.
The average run time of the tasks that meet the given conditions.
[1] | (1, 2, 3, 4, 5) timedelta is not restricted to seconds, but using some concrete unit of time here is clearer. |
The schedule section provides functions that (strangely enough) can create schedules. The evaluation of the section must result in a schedule object.
This function creates a schedule that allows cron-like scheduling. The class itself is provided by Celery, so please see the documentation there for more infomation.
Examples (reproduced here from the Celery documentation):
Although using a subset of Python, policies still provide many of the language’s built-in functions, constants and a few selected modules.
Returns the current time as a datetime.datetime object. (The datetime module is also available to policies.)
Returns the current date as a datetime.datetime object. (The datetime module is also available to policies.) All arguments are optional, and only keyword arguments are allowed.
If no arguments are provided, the time defaults to 00:00:00 (midnight). However, some keyword arguments are offered which can change this behavior.
Parameters: |
|
---|
If both timestr and time are provided, only timestr will be used.
Sometimes, a policy does not need to (or is not able to) respond automatically to the condition it finds. This function allows a policy to send you an email in such situations. It is built on top of Django‘s email feature.
Parameters: |
|
---|
The following Celery states are available as constants:
PENDING, RECEIVED, STARTED, SUCCESS, FAILURE, REVOKED, RETRY
Many of the standard builtin functions are available. But some, like eval() and __import__() are not allowed. The following builtin functions are available:
The following builtin functions are not available (this list may be incomplete):
Prohibited Functions | ||
---|---|---|
compile() | globals() | raw_input() |
eval() | hasattr() | reload() |
execfile() | input() | setattr() |
file() | locals() | type() |
getattr() | open() | __import__() |
The following standard builtin constants are also available:
True, False, None
The following examples show how to use various portions of the policies API. See Policy Recipes for examples of entire policies.
x = stats.tasks() # 1
x = stats.tasks(interval=datetime.timedelta(hour=1)) # 2
x = stats.tasks(interval=(datetime.timedelta(hour=2),datetime.timedelta(hour=1))) # 3
x = stats.tasks(tasknames="my_task") # 4
x = stats.tasks(tasknames=["my_task","your_task"]) # 5
x = tasks["my_task"].ignore_result # 6
schedule | condition | apply | |
---|---|---|---|
Common API | ✓ | ✓ | ✓ |
send_email | ✓ | ||
crontab | ✓ | ||
stats | ✓ | ✓ | |
tasks | ✓ | ||
workers | ✓ |