This document describes Celery 2.4. For development docs, go here.
celery.task.base
The task implementation has been moved to celery.app.task.
license: BSD, see LICENSE for more details.

class celery.task.base.BaseTask
Task base class.
When called, tasks apply the run() method. This method must be defined by all tasks (that is, unless the __call__() method is overridden).

classmethod AsyncResult(task_id)
Get AsyncResult instance for this kind of task.
Parameters: task_id – Task id to get result for.

class ErrorMail(task, **kwargs)
Defines how and when task error e-mails should be sent.
Parameters: task – The task instance that raised the error.
subject and body are format strings which are passed a context containing the following keys:
- name – Name of the task.
- id – UUID of the task.
- exc – String representation of the exception.
- args – Positional arguments.
- kwargs – Keyword arguments.
- traceback – String representation of the traceback.
- hostname – Worker hostname.
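As a rough illustration of how such format strings consume the context, the sketch below fills a subject and a body from one mapping. The template text, the sample values and the render_error_mail helper are hypothetical, and the use of %-style placeholders is an assumption that this page does not confirm.

```python
# Hypothetical templates for illustration; the real defaults may differ.
SUBJECT = "[%(hostname)s] Error: Task %(name)s (%(id)s): %(exc)s"
BODY = (
    "Task %(name)s with id %(id)s raised exception:\n"
    "%(exc)s\n\n"
    "Task was called with args: %(args)s kwargs: %(kwargs)s.\n\n"
    "%(traceback)s\n"
)


def render_error_mail(context, subject=SUBJECT, body=BODY):
    """Fill both templates from the same context mapping."""
    return subject % context, body % context


# Sample context using exactly the keys listed above (values are made up).
context = {
    "name": "tasks.add",
    "id": "d3f1c2a0-0000-4000-8000-000000000000",
    "exc": "TypeError('unsupported operand')",
    "args": (2, "x"),
    "kwargs": {},
    "traceback": "Traceback (most recent call last): ...",
    "hostname": "worker1.example.com",
}

subject, body = render_error_mail(context)
print(subject)
```

A template that references a key outside this list would fail with KeyError under this approach, which is one reason to keep custom templates to the documented keys.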

should_send(context, exc)
Returns true or false depending on whether a task error e-mail should be sent for this type of error.

exception BaseTask.MaxRetriesExceededError
The task's maximum retry limit has been exceeded.

BaseTask.after_return(status, retval, task_id, args, kwargs, einfo)
Handler called after the task returns.
Parameters:
- status – Current task state.
- retval – Task return value/exception.
- task_id – Unique id of the task.
- args – Original arguments for the task.
- kwargs – Original keyword arguments for the task.
- einfo – ExceptionInfo instance, containing the traceback (if any).
The return value of this handler is ignored.

classmethod BaseTask.apply(args=None, kwargs=None, **options)
Execute this task locally, by blocking until the task returns.
Parameters:
- args – positional arguments passed on to the task.
- kwargs – keyword arguments passed on to the task.
- throw – Re-raise task exceptions. Defaults to the CELERY_EAGER_PROPAGATES_EXCEPTIONS setting.
Return type: celery.result.EagerResult
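The blocking behaviour and the throw option can be pictured with a small stand-in that does not use Celery at all. EagerResultStandIn, apply_locally and divide are hypothetical names invented for this sketch; the real celery.result.EagerResult has a different interface.

```python
class EagerResultStandIn:
    """Hypothetical stand-in for an eager result object."""

    def __init__(self, value=None, error=None):
        self.value = value
        self.error = error

    def successful(self):
        return self.error is None


def apply_locally(func, args=None, kwargs=None, throw=False):
    """Run func in the calling process and block until it returns."""
    try:
        value = func(*(args or ()), **(kwargs or {}))
    except Exception as exc:
        if throw:
            raise  # propagate to the caller, as the throw option describes
        return EagerResultStandIn(error=exc)
    return EagerResultStandIn(value=value)


def divide(a, b):
    return a / b


result = apply_locally(divide, (6, 3))
failed = apply_locally(divide, (1, 0))
print(result.value, failed.successful())
```

With throw=False the failure is stored on the result; with throw=True the same call raises ZeroDivisionError in the caller.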

classmethod BaseTask.apply_async(args=None, kwargs=None, countdown=None, eta=None, task_id=None, publisher=None, connection=None, connect_timeout=None, router=None, expires=None, queues=None, **options)
Apply tasks asynchronously by sending a message.
Parameters:
- args – The positional arguments to pass on to the task (a list or tuple).
- kwargs – The keyword arguments to pass on to the task (a dict).
- countdown – Number of seconds into the future that the task should execute. Defaults to immediate execution (do not confuse with the immediate flag, as they are unrelated).
- eta – A datetime object describing the absolute time and date of when the task should be executed. May not be specified if countdown is also supplied. (Do not confuse this with the immediate flag, as they are unrelated.)
- expires – Either an int, describing the number of seconds, or a datetime object that describes the absolute time and date of when the task should expire. The task will not be executed after the expiration time.
- connection – Re-use existing broker connection instead of establishing a new one. The connect_timeout argument is not respected if this is set.
- connect_timeout – The timeout in seconds, before we give up on establishing a connection to the AMQP server.
- retry – If enabled, sending of the task message will be retried in the event of connection loss or failure. Default is taken from the CELERY_TASK_PUBLISH_RETRY setting. Note you need to handle the publisher/connection manually for this to work.
- retry_policy – Override the retry policy used. See the CELERY_TASK_PUBLISH_RETRY setting.
- routing_key – The routing key used to route the task to a worker server. Defaults to the routing_key attribute.
- exchange – The named exchange to send the task to. Defaults to the exchange attribute.
- exchange_type – The exchange type to initialize the exchange if not already declared. Defaults to the exchange_type attribute.
- immediate – Request immediate delivery. Will raise an exception if the task cannot be routed to a worker immediately. (Do not confuse this parameter with the countdown and eta settings, as they are unrelated.) Defaults to the immediate attribute.
- mandatory – Mandatory routing. Raises an exception if there are no running workers able to take on this task. Defaults to the mandatory attribute.
- priority – The task priority, a number between 0 and 9. Defaults to the priority attribute.
- serializer – A string identifying the default serialization method to use. Can be pickle, json, yaml, msgpack or any custom serialization method that has been registered with kombu.serialization.registry. Defaults to the serializer attribute.
- compression – A string identifying the compression method to use. Can be one of zlib, bzip2, or any custom compression methods registered with kombu.compression.register(). Defaults to the CELERY_MESSAGE_COMPRESSION setting.
Note
If the CELERY_ALWAYS_EAGER setting is set, the call will be replaced by a local apply() call instead.
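The relationship between countdown, eta and expires can be sketched with plain datetime arithmetic. The helper names below are hypothetical, and raising ValueError when both countdown and eta are given is an assumption made for this sketch; the page only says the two may not be combined.

```python
from datetime import datetime, timedelta


def resolve_eta(now, countdown=None, eta=None):
    """Return the absolute time to run at, or None for immediate execution."""
    if countdown is not None and eta is not None:
        # Assumption: reject the combination rather than pick one silently.
        raise ValueError("specify either countdown or eta, not both")
    if countdown is not None:
        return now + timedelta(seconds=countdown)
    return eta


def resolve_expires(now, expires=None):
    """Turn a number of seconds into an absolute expiry; pass datetimes through."""
    if isinstance(expires, (int, float)):
        return now + timedelta(seconds=expires)
    return expires


def is_expired(now, expires_at):
    """A task is skipped once the current time is past its expiry."""
    return expires_at is not None and now > expires_at


now = datetime(2011, 11, 1, 12, 0, 0)
run_at = resolve_eta(now, countdown=90)
expires_at = resolve_expires(now, expires=60)
print(run_at, expires_at, is_expired(run_at, expires_at))
```

In this sample the task would be due 90 seconds from now but expires after 60, so it would never be executed.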

classmethod BaseTask.delay(*args, **kwargs)
Star argument version of apply_async().
Does not support the extra options enabled by apply_async().
Parameters:
- *args – positional arguments passed on to the task.
- **kwargs – keyword arguments passed on to the task.
Returns: celery.result.AsyncResult
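To see why delay() cannot carry execution options, consider this stand-in, which only records what would be sent. TaskStandIn is a hypothetical class written for this sketch and is not part of Celery.

```python
class TaskStandIn:
    """Hypothetical stand-in that records the call instead of sending it."""

    @classmethod
    def apply_async(cls, args=None, kwargs=None, **options):
        return {
            "args": tuple(args or ()),
            "kwargs": dict(kwargs or {}),
            "options": options,
        }

    @classmethod
    def delay(cls, *args, **kwargs):
        # Every argument is forwarded to the task, so none is left for options.
        return cls.apply_async(args, kwargs)


same = TaskStandIn.delay(2, 3, debug=True) == TaskStandIn.apply_async((2, 3), {"debug": True})
swallowed = TaskStandIn.delay(2, countdown=10)
print(same, swallowed)
```

In the second call, countdown ends up among the task's keyword arguments rather than acting as a scheduling option, which is the limitation described above.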

classmethod BaseTask.establish_connection(connect_timeout=None)
Establish a connection to the message broker.

BaseTask.execute(request, pool, loglevel, logfile, **kwargs)
The method the worker calls to execute the task.
Parameters:
- request – A TaskRequest.
- pool – A task pool.
- loglevel – Current loglevel.
- logfile – Name of the currently used logfile.
- consumer – The Consumer.

classmethod BaseTask.get_consumer(connection=None, connect_timeout=None)
Get message consumer.
Return type: kombu.messaging.Consumer
Warning
If you don't specify a connection, one will automatically be established for you. In that case you need to close this connection after use:
>>> consumer = self.get_consumer()
>>> # do something with consumer
>>> consumer.close()
>>> consumer.connection.close()

classmethod BaseTask.get_logger(loglevel=None, logfile=None, propagate=False, **kwargs)
Get task-aware logger object.

classmethod BaseTask.get_publisher(connection=None, exchange=None, connect_timeout=None, exchange_type=None, **options)
Get a celery task message publisher.
Return type: TaskPublisher
Warning
If you don't specify a connection, one will automatically be established for you. In that case you need to close this connection after use:
>>> publisher = self.get_publisher()
>>> # ... do something with publisher
>>> publisher.connection.close()
or used as a context:
>>> with self.get_publisher() as publisher:
...     # ... do something with publisher

BaseTask.on_failure(exc, task_id, args, kwargs, einfo)
Error handler.
This is run by the worker when the task fails.
Parameters:
- exc – The exception raised by the task.
- task_id – Unique id of the failed task.
- args – Original arguments for the task that failed.
- kwargs – Original keyword arguments for the task that failed.
- einfo – ExceptionInfo instance, containing the traceback.
The return value of this handler is ignored.

BaseTask.on_retry(exc, task_id, args, kwargs, einfo)
Retry handler.
This is run by the worker when the task is to be retried.
Parameters:
- exc – The exception sent to retry().
- task_id – Unique id of the retried task.
- args – Original arguments for the retried task.
- kwargs – Original keyword arguments for the retried task.
- einfo – ExceptionInfo instance, containing the traceback.
The return value of this handler is ignored.

BaseTask.on_success(retval, task_id, args, kwargs)
Success handler.
Run by the worker if the task executes successfully.
Parameters:
- retval – The return value of the task.
- task_id – Unique id of the executed task.
- args – Original arguments for the executed task.
- kwargs – Original keyword arguments for the executed task.
The return value of this handler is ignored.
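The handlers documented above (after_return, on_failure, on_success) can be pictured as callbacks dispatched around run(). The sketch below is a simplified stand-in, not the worker's actual tracing code: RecordingTask and trace are hypothetical, the retry path is left out, a formatted traceback string stands in for ExceptionInfo, and calling after_return last is an assumption.

```python
import traceback


class RecordingTask:
    """Hypothetical stand-in task that records which handlers were called."""

    def __init__(self, body):
        self.body = body
        self.calls = []

    def run(self, *args, **kwargs):
        return self.body(*args, **kwargs)

    def on_success(self, retval, task_id, args, kwargs):
        self.calls.append("on_success")

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        self.calls.append("on_failure")

    def after_return(self, status, retval, task_id, args, kwargs, einfo):
        self.calls.append("after_return:" + status)


def trace(task, task_id, args, kwargs):
    """Run the task body, then dispatch to the matching handlers."""
    try:
        retval = task.run(*args, **kwargs)
    except Exception as exc:
        # A formatted traceback string stands in for ExceptionInfo here.
        status, retval, einfo = "FAILURE", exc, traceback.format_exc()
        task.on_failure(exc, task_id, args, kwargs, einfo)
    else:
        status, einfo = "SUCCESS", None
        task.on_success(retval, task_id, args, kwargs)
    # Handler return values are ignored.
    task.after_return(status, retval, task_id, args, kwargs, einfo)
    return status, retval


ok = RecordingTask(lambda x, y: x + y)
bad = RecordingTask(lambda x, y: x / y)
ok_outcome = trace(ok, "id-1", (2, 3), {})
bad_outcome = trace(bad, "id-2", (1, 0), {})
print(ok.calls, bad.calls)
```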

classmethod BaseTask.retry(args=None, kwargs=None, exc=None, throw=True, eta=None, countdown=None, max_retries=None, **options)
Retry the task.
Parameters:
- args – Positional arguments to retry with.
- kwargs – Keyword arguments to retry with.
- exc – Optional exception to raise instead of MaxRetriesExceededError when the maximum retry limit has been exceeded.
- countdown – Time in seconds to delay the retry for.
- eta – Explicit time and date to run the retry at (must be a datetime instance).
- max_retries – If set, overrides the default retry limit.
- **options – Any extra options to pass on to apply_async().
- throw – If this is False, do not raise the RetryTaskError exception that tells the worker to mark the task as being retried. Note that this means the task will be marked as failed if the task raises an exception, or successful if it returns.
Raises celery.exceptions.RetryTaskError: To tell the worker that the task has been re-sent for retry. This always happens, unless the throw keyword argument has been explicitly set to False, and is considered normal operation.
Example
>>> @task
... def tweet(auth, message):
...     twitter = Twitter(oauth=auth)
...     try:
...         twitter.post_status_update(message)
...     except twitter.FailWhale, exc:
...         # Retry in 5 minutes.
...         return tweet.retry(countdown=60 * 5, exc=exc)
Although the task will never return above, as retry raises an exception to notify the worker, we use return in front of the retry to convey that the rest of the block will not be executed.
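The interplay of max_retries, exc and throw can be condensed into a small decision function. This is a sketch under assumptions, not Celery's implementation: both exception classes are local stand-ins, the retry function and its retries counter are hypothetical, the default of 3 is made up, and the step that re-sends the message is reduced to a comment.

```python
class RetryTaskError(Exception):
    """Stand-in for celery.exceptions.RetryTaskError."""


class MaxRetriesExceededError(Exception):
    """Stand-in for BaseTask.MaxRetriesExceededError."""


def retry(retries, max_retries=3, exc=None, throw=True):
    """Decide what a retry request does after `retries` earlier retries."""
    if max_retries is not None and retries >= max_retries:
        # Limit reached: raise the caller's exception if one was given.
        if exc is not None:
            raise exc
        raise MaxRetriesExceededError("retry limit of %d exceeded" % max_retries)
    # A real implementation would re-send the task message at this point.
    if throw:
        raise RetryTaskError("task re-sent for retry")
    return "re-sent"


def raised_type(func):
    """Helper for the demo: return the exception type func raises, or None."""
    try:
        func()
    except Exception as error:
        return type(error)
    return None


print(raised_type(lambda: retry(0)))
print(raised_type(lambda: retry(3)))
print(raised_type(lambda: retry(3, exc=KeyError("x"))))
print(retry(0, throw=False))
```

Raising RetryTaskError on the normal path is what makes the return in front of tweet.retry(...) purely cosmetic.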

BaseTask.run(*args, **kwargs)
The body of the task executed by workers.
class
celery.task.base.
PeriodicTask
¶ A periodic task is a task that behaves like a cron job.
Results of periodic tasks are not stored by default.

run_every
REQUIRED. Defines how often the task is run (its interval). It can be a timedelta object, a crontab object or an integer specifying the time in seconds.

relative
If set to True, run times are relative to the time when the server was started. This was the previous behaviour; periodic tasks are now scheduled by the clock.
Raises NotImplementedError: if the run_every attribute is not defined.
Example
>>> from celery.task import tasks, PeriodicTask
>>> from datetime import timedelta
>>> class EveryThirtySecondsTask(PeriodicTask):
...     run_every = timedelta(seconds=30)
...
...     def run(self, **kwargs):
...         logger = self.get_logger(**kwargs)
...         logger.info("Execute every 30 seconds")
>>> from celery.task import PeriodicTask
>>> from celery.schedules import crontab
>>> class EveryMondayMorningTask(PeriodicTask):
...     run_every = crontab(hour=7, minute=30, day_of_week=1)
...
...     def run(self, **kwargs):
...         logger = self.get_logger(**kwargs)
...         logger.info("Execute every Monday at 7:30AM.")
>>> class EveryMorningTask(PeriodicTask):
...     run_every = crontab(hour=7, minute=30)
...
...     def run(self, **kwargs):
...         logger = self.get_logger(**kwargs)
...         logger.info("Execute every day at 7:30AM.")
>>> class EveryQuarterPastTheHourTask(PeriodicTask):
...     run_every = crontab(minute=15)
...
...     def run(self, **kwargs):
...         logger = self.get_logger(**kwargs)
...         logger.info("Execute every 0:15 past the hour every day.")

is_due(last_run_at)
Returns tuple of two items (is_due, next_time_to_run), where next time to run is in seconds.
See celery.schedules.schedule.is_due() for more information.

remaining_estimate(last_run_at)
Returns when the periodic task should run next as a timedelta.
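For a plain interval (a timedelta or a number of seconds), the two return shapes described for is_due() and remaining_estimate() can be sketched with the standard library alone. The functions below are hypothetical module-level stand-ins that take the current time as an explicit argument; crontab schedules are not covered, and the choice to report the full interval as the next delay once a run is due is an assumption.

```python
from datetime import datetime, timedelta


def as_timedelta(run_every):
    """Accept either a timedelta or a number of seconds."""
    if isinstance(run_every, timedelta):
        return run_every
    return timedelta(seconds=run_every)


def remaining_estimate(last_run_at, run_every, now):
    """Time left until the next run, as a timedelta (negative if overdue)."""
    return (last_run_at + as_timedelta(run_every)) - now


def is_due(last_run_at, run_every, now):
    """Return (is_due, next_time_to_run), the latter in seconds."""
    remaining = remaining_estimate(last_run_at, run_every, now).total_seconds()
    if remaining <= 0:
        # Due now: check again after one full interval (assumption).
        return True, as_timedelta(run_every).total_seconds()
    return False, remaining


now = datetime(2011, 11, 1, 12, 0, 0)
not_yet = is_due(now - timedelta(seconds=10), 30, now)
overdue = is_due(now - timedelta(seconds=45), timedelta(seconds=30), now)
print(not_yet, overdue)
```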

class celery.task.base.TaskType
Meta class for tasks.
Automatically registers the task in the task registry, except if the abstract attribute is set.
If no name attribute is provided, then the name is automatically set to the name of the module it was defined in, and the class name.
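The registration behaviour described here can be sketched with an ordinary metaclass. This is written in Python 3 syntax and is not TaskType itself: the registry dictionary, RegisteringType and the example classes are all hypothetical, and treating abstract as non-inherited and joining module and class name with a dot are assumptions of the sketch.

```python
registry = {}  # hypothetical task registry: name -> class


class RegisteringType(type):
    """Register every non-abstract class under its name."""

    def __new__(mcls, clsname, bases, attrs):
        cls = super().__new__(mcls, clsname, bases, attrs)
        # Only an abstract flag set in this class body counts (assumption).
        if attrs.get("abstract"):
            return cls
        if not attrs.get("name"):
            # Derive the name from the defining module and the class name.
            cls.name = "%s.%s" % (cls.__module__, clsname)
        registry[cls.name] = cls
        return cls


class BaseStandIn(metaclass=RegisteringType):
    abstract = True  # base classes are not registered


class AddTask(BaseStandIn):
    pass  # gets an automatic name


class NamedTask(BaseStandIn):
    name = "custom.add"  # an explicit name is kept as-is


print(sorted(registry))
```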