This document describes the current stable version of Celery (5.5). For development docs, go here.
celery
— Distributed processing¶
This module is the main entry-point for the Celery API. It includes commonly needed things for calling tasks, and creating Celery applications.
Celery application instance |
|
group tasks together |
|
chain tasks together |
|
chords enable callbacks for groups |
|
create a new task signature |
|
object describing a task invocation |
|
proxy to the current application instance |
|
proxy to the currently executing task |
Celery
application objects¶
Added in version 2.5.
- class celery.Celery(main=None, loader=None, backend=None, amqp=None, events=None, log=None, control=None, set_as_current=True, tasks=None, broker=None, include=None, changes=None, config_source=None, fixups=None, task_cls=None, autofinalize=True, namespace=None, strict_typing=True, **kwargs)[source]¶
Celery application.
- Parameters:
main (str) – Name of the main module if running as __main__. This is used as the prefix for auto-generated task names.
- Keyword Arguments:
broker (str) – URL of the default broker used.
backend (Union[str, Type[celery.backends.base.Backend]]) –
The result store backend class, or the name of the backend class to use.
Default is the value of the
result_backend
setting.autofinalize (bool) – If set to False a
RuntimeError
will be raised if the task registry or tasks are used before the app is finalized.set_as_current (bool) – Make this the global current app.
include (List[str]) – List of modules every worker should import.
events (Union[str, Type[celery.app.events.Events]]) – Events object or class name.
control (Union[str, Type[celery.app.control.Control]]) – Control object or class name.
tasks (Union[str, Type[TaskRegistry]]) – A task registry, or the name of a registry class.
fixups (List[str]) – List of fix-up plug-ins (e.g., see
celery.fixups.django
).config_source (Union[str, class]) – Take configuration from a class, or object. Attributes may include any settings described in the documentation.
task_cls (Union[str, Type[celery.app.task.Task]]) – base task class to use. See this section for usage.
- user_options = None¶
Custom options for command-line programs. See Adding new command-line options
- steps = None¶
Custom bootsteps to extend and modify the worker. See Installing Bootsteps.
- current_task¶
Instance of task being executed, or
None
.
- current_worker_task¶
The task currently being executed by a worker or
None
.Differs from
current_task
in that it’s not affected by tasks calling other tasks directly, or eagerly.
- backend¶
Current backend instance.
- loader¶
Current loader instance.
- events¶
events
.- Type:
Consuming and sending events
- tasks¶
Task registry.
Warning
Accessing this attribute will also auto-finalize the app.
- pool¶
pool
.Note
This attribute is not related to the workers concurrency pool.
- Type:
Broker connection pool
- producer_pool¶
- Task¶
Base task class for this app.
- timezone¶
Current timezone for this app.
This is a cached property taking the time zone from the
timezone
setting.
- builtin_fixups = {'celery.fixups.django:fixup'}¶
- oid¶
Universally unique identifier for this app.
- close()[source]¶
Clean up after the application.
Only necessary for dynamically created apps, and you should probably use the
with
statement instead.Example
>>> with Celery(set_as_current=False) as app: ... with app.connection_for_write() as conn: ... pass
- config_from_object(obj, silent=False, force=False, namespace=None)[source]¶
Read configuration from object.
Object is either an actual object or the name of a module to import.
Example
>>> celery.config_from_object('myapp.celeryconfig')
>>> from myapp import celeryconfig >>> celery.config_from_object(celeryconfig)
- config_from_envvar(variable_name, silent=False, force=False)[source]¶
Read configuration from environment variable.
The value of the environment variable must be the name of a module to import.
Example
>>> os.environ['CELERY_CONFIG_MODULE'] = 'myapp.celeryconfig' >>> celery.config_from_envvar('CELERY_CONFIG_MODULE')
- autodiscover_tasks(packages=None, related_name='tasks', force=False)[source]¶
Auto-discover task modules.
Searches a list of packages for a “tasks.py” module (or use related_name argument).
If the name is empty, this will be delegated to fix-ups (e.g., Django).
For example if you have a directory layout like this:
foo/__init__.py tasks.py models.py bar/__init__.py tasks.py models.py baz/__init__.py models.py
Then calling
app.autodiscover_tasks(['foo', 'bar', 'baz'])
will result in the modulesfoo.tasks
andbar.tasks
being imported.- Parameters:
packages (List[str]) – List of packages to search. This argument may also be a callable, in which case the value returned is used (for lazy evaluation).
related_name (Optional[str]) – The name of the module to find. Defaults to “tasks”: meaning “look for ‘module.tasks’ for every module in
packages
.”. IfNone
will only try to import the package, i.e. “look for ‘module’”.force (bool) – By default this call is lazy so that the actual auto-discovery won’t happen until an application imports the default modules. Forcing will cause the auto-discovery to happen immediately.
- add_defaults(fun)[source]¶
Add default configuration from dict
d
.If the argument is a callable function then it will be regarded as a promise, and it won’t be loaded until the configuration is actually needed.
This method can be compared to:
>>> celery.conf.update(d)
with a difference that 1) no copy will be made and 2) the dict will not be transferred when the worker spawns child processes, so it’s important that the same configuration happens at import time when pickle restores the object on the other side.
- add_periodic_task(schedule, sig, args=(), kwargs=(), name=None, **opts)[source]¶
Add a periodic task to beat schedule.
Celery beat store tasks based on sig or name if provided. Adding the same signature twice make the second task override the first one. To avoid the override, use distinct name for them.
- setup_security(allowed_serializers=None, key=None, key_password=None, cert=None, store=None, digest='sha256', serializer='json')[source]¶
Setup the message-signing serializer.
This will affect all application instances (a global operation).
Disables untrusted serializers and if configured to use the
auth
serializer will register theauth
serializer with the provided settings into the Kombu serializer registry.- Parameters:
allowed_serializers (Set[str]) – List of serializer names, or content_types that should be exempt from being disabled.
key (str) – Name of private key file to use. Defaults to the
security_key
setting.key_password (bytes) – Password to decrypt the private key. Defaults to the
security_key_password
setting.cert (str) – Name of certificate file to use. Defaults to the
security_certificate
setting.store (str) – Directory containing certificates. Defaults to the
security_cert_store
setting.digest (str) – Digest algorithm used when signing messages. Default is
sha256
.serializer (str) – Serializer used to encode messages after they’ve been signed. See
task_serializer
for the serializers supported. Default isjson
.
- task(*args, **opts)[source]¶
Decorator to create a task class out of any callable.
See Task options for a list of the arguments that can be passed to this decorator.
Examples
@app.task def refresh_feed(url): store_feed(feedparser.parse(url))
with setting extra options:
@app.task(exchange='feeds') def refresh_feed(url): return store_feed(feedparser.parse(url))
Note
App Binding: For custom apps the task decorator will return a proxy object, so that the act of creating the task is not performed until the task is used or the task registry is accessed.
If you’re depending on binding to be deferred, then you must not access any attributes on the returned object until the application is fully set up (finalized).
- send_task(name, args=None, kwargs=None, countdown=None, eta=None, task_id=None, producer=None, connection=None, router=None, result_cls=None, expires=None, publisher=None, link=None, link_error=None, add_to_parent=True, group_id=None, group_index=None, retries=0, chord=None, reply_to=None, time_limit=None, soft_time_limit=None, root_id=None, parent_id=None, route_name=None, shadow=None, chain=None, task_type=None, replaced_task_nesting=0, **options)[source]¶
Send task by name.
Supports the same arguments as
Task.apply_async()
.- Parameters:
name (str) – Name of task to call (e.g., “tasks.add”).
result_cls (AsyncResult) – Specify custom result class.
- AsyncResult¶
Create new result instance.
See also
- GroupResult¶
Create new group result instance.
See also
- WorkController¶
Embeddable worker.
See also
- connection_for_read(url=None, **kwargs)[source]¶
Establish connection used for consuming.
See also
connection()
for supported arguments.
- connection_for_write(url=None, **kwargs)[source]¶
Establish connection used for producing.
See also
connection()
for supported arguments.
- connection(hostname=None, userid=None, password=None, virtual_host=None, port=None, ssl=None, connect_timeout=None, transport=None, transport_options=None, heartbeat=None, login_method=None, failover_strategy=None, **kwargs)[source]¶
Establish a connection to the message broker.
Please use
connection_for_read()
andconnection_for_write()
instead, to convey the intent of use for this connection.- Parameters:
url – Either the URL or the hostname of the broker to use.
hostname (str) – URL, Hostname/IP-address of the broker. If a URL is used, then the other argument below will be taken from the URL instead.
userid (str) – Username to authenticate as.
password (str) – Password to authenticate with
virtual_host (str) – Virtual host to use (domain).
port (int) – Port to connect to.
ssl (bool, Dict) – Defaults to the
broker_use_ssl
setting.transport (str) – defaults to the
broker_transport
setting.transport_options (Dict) – Dictionary of transport specific options.
heartbeat (int) – AMQP Heartbeat in seconds (
pyamqp
only).login_method (str) – Custom login method to use (AMQP only).
failover_strategy (str, Callable) – Custom failover strategy.
**kwargs – Additional arguments to
kombu.Connection
.
- Returns:
the lazy connection instance.
- Return type:
- connection_or_acquire(connection=None, pool=True, *_, **__)[source]¶
Context used to acquire a connection from the pool.
For use within a
with
statement to get a connection from the pool if one is not already provided.- Parameters:
connection (kombu.Connection) – If not provided, a connection will be acquired from the connection pool.
- producer_or_acquire(producer=None)[source]¶
Context used to acquire a producer from the pool.
For use within a
with
statement to get a producer from the pool if one is not already provided- Parameters:
producer (kombu.Producer) – If not provided, a producer will be acquired from the producer pool.
- select_queues(queues=None)[source]¶
Select subset of queues.
- Parameters:
queues (Sequence[str]) – a list of queue names to keep.
- finalize(auto=False)[source]¶
Finalize the app.
This loads built-in tasks, evaluates pending task decorators, reads configuration, etc.
- on_configure¶
Signal sent when app is loading configuration.
- on_after_configure¶
Signal sent after app has prepared the configuration.
- on_after_finalize¶
Signal sent after app has been finalized.
- on_after_fork¶
Signal sent in child process after fork.
Canvas primitives¶
See Canvas: Designing Work-flows for more about creating task work-flows.
- class celery.group(*tasks, **options)[source]¶
Creates a group of tasks to be executed in parallel.
A group is lazy so you must call it to take action and evaluate the group.
Note
If only one argument is passed, and that argument is an iterable then that’ll be used as the list of tasks instead: this allows us to use
group
with generator expressions.Example
>>> lazy_group = group([add.s(2, 2), add.s(4, 4)]) >>> promise = lazy_group() # <-- evaluate: returns lazy result. >>> promise.get() # <-- will wait for the task to return [4, 8]
- Parameters:
*tasks (List[Signature]) – A list of signatures that this group will call. If there’s only one argument, and that argument is an iterable, then that’ll define the list of signatures instead.
**options (Any) – Execution options applied to all tasks in the group.
- Returns:
- signature that when called will then call all of the
tasks in the group (and return a
GroupResult
instance that can be used to inspect the state of the group).
- Return type:
- class celery.chain(*tasks, **kwargs)[source]¶
Chain tasks together.
Each tasks follows one another, by being applied as a callback of the previous task.
Note
If called with only one argument, then that argument must be an iterable of tasks to chain: this allows us to use generator expressions.
Example
This is effectively :
>>> res = chain(add.s(2, 2), add.s(4))() >>> res.get() 8
Calling a chain will return the result of the last task in the chain. You can get to the other tasks by following the
result.parent
’s:>>> res.parent.get() 4
Using a generator expression:
>>> lazy_chain = chain(add.s(i) for i in range(10)) >>> res = lazy_chain(3)
- Parameters:
*tasks (Signature) – List of task signatures to chain. If only one argument is passed and that argument is an iterable, then that’ll be used as the list of signatures to chain instead. This means that you can use a generator expression.
- Returns:
- A lazy signature that can be called to apply the first
task in the chain. When that task succeeds the next task in the chain is applied, and so on.
- Return type:
- celery.chord¶
alias of
_chord
- celery.signature(varies, *args, **kwargs)[source]¶
Create new signature.
if the first argument is a signature already then it’s cloned.
if the first argument is a dict, then a Signature version is returned.
- Returns:
The resulting signature.
- Return type:
- class celery.Signature(task=None, args=None, kwargs=None, options=None, type=None, subtask_type=None, immutable=False, app=None, **ex)[source]¶
Task Signature.
Class that wraps the arguments and execution options for a single task invocation.
Used as the parts in a
group
and other constructs, or to pass tasks around as callbacks while being compatible with serializers with a strict type subset.Signatures can also be created from tasks:
Using the
.signature()
method that has the same signature asTask.apply_async
:>>> add.signature(args=(1,), kwargs={'kw': 2}, options={})
or the
.s()
shortcut that works for star arguments:>>> add.s(1, kw=2)
the
.s()
shortcut does not allow you to specify execution options but there’s a chaining .set method that returns the signature:>>> add.s(2, 2).set(countdown=10).set(expires=30).delay()
Note
You should use
signature()
to create new signatures. TheSignature
class is the type returned by that function and should be used forisinstance
checks for signatures.See also
Canvas: Designing Work-flows for the complete guide.
- Parameters:
task (Union[Type[celery.app.task.Task], str]) – Either a task class/instance, or the name of a task.
args (Tuple) – Positional arguments to apply.
kwargs (Dict) – Keyword arguments to apply.
options (Dict) – Additional options to
Task.apply_async()
.
Note
If the first argument is a
dict
, the other arguments will be ignored and the values in the dict will be used instead:>>> s = signature('tasks.add', args=(2, 2)) >>> signature(s) {'task': 'tasks.add', args=(2, 2), kwargs={}, options={}}
Proxies¶
- celery.current_app¶
The currently set app for this thread.
- celery.current_task¶
The task currently being executed (only set in the worker, or when eager/apply is used).