This document describes the current stable version of Celery (5.4). For development docs, go here.

Source code for

"""Actual App instance implementation."""
import inspect
import os
import sys
import threading
import warnings
from collections import UserDict, defaultdict, deque
from datetime import datetime
from datetime import timezone as datetime_timezone
from operator import attrgetter

from click.exceptions import Exit
from dateutil.parser import isoparse
from kombu import pools
from kombu.clocks import LamportClock
from kombu.common import oid_from
from kombu.utils.compat import register_after_fork
from kombu.utils.objects import cached_property
from kombu.utils.uuid import uuid
from vine import starpromise

from celery import platforms, signals
from celery._state import (_announce_app_finalized, _deregister_app, _register_app, _set_current_app, _task_stack,
                           connect_on_app_finalize, get_current_app, get_current_worker_task, set_default_app)
from celery.exceptions import AlwaysEagerIgnored, ImproperlyConfigured
from celery.loaders import get_loader_cls
from celery.local import PromiseProxy, maybe_evaluate
from celery.utils import abstract
from celery.utils.collections import AttributeDictMixin
from celery.utils.dispatch import Signal
from celery.utils.functional import first, head_from_fun, maybe_list
from celery.utils.imports import gen_task_name, instantiate, symbol_by_name
from celery.utils.log import get_logger
from celery.utils.objects import FallbackContext, mro_lookup
from celery.utils.time import maybe_make_aware, timezone, to_utc

# Load all builtin tasks
from . import backends, builtins  # noqa
from .annotations import prepare as prepare_annotations
from .autoretry import add_autoretry_behaviour
from .defaults import DEFAULT_SECURITY_DIGEST, find_deprecated_settings
from .registry import TaskRegistry
from .utils import (AppPickler, Settings, _new_key_to_old, _old_key_to_new, _unpickle_app, _unpickle_app_v2, appstr,
                    bugreport, detect_settings)

__all__ = ('Celery',)

logger = get_logger(__name__)


The environment variable {0!r} is not set,
and as such the configuration could not be loaded.

Please set this variable and make sure it points to
a valid configuration module.


def app_has_custom(app, attr):
    """Return true if app has customized method `attr`.

        This is used for optimizations in cases where we know
        how the default behavior works, but need to account
        for someone using inheritance to override a method/property.
    return mro_lookup(app.__class__, attr, stop={Celery, object},

def _unpickle_appattr(reverse_name, args):
    """Unpickle app."""
    # Given an attribute name and a list of args, gets
    # the attribute from the current app and calls it.
    return get_current_app()._rgetattr(reverse_name)(*args)

def _after_fork_cleanup_app(app):
    # This is used with multiprocessing.register_after_fork,
    # so need to be at module level.
    except Exception as exc:  # pylint: disable=broad-except'after forker raised exception: %r', exc, exc_info=1)

class PendingConfiguration(UserDict, AttributeDictMixin):
    # `app.conf` will be of this type before being explicitly configured,
    # meaning the app can keep any configuration set directly
    # on `app.conf` before the `app.config_from_object` call.
    # accessing any key will finalize the configuration,
    # replacing `app.conf` with a concrete settings object.

    callback = None
    _data = None

    def __init__(self, conf, callback):
        object.__setattr__(self, '_data', conf)
        object.__setattr__(self, 'callback', callback)

    def __setitem__(self, key, value):
        self._data[key] = value

    def clear(self):

    def update(self, *args, **kwargs):
        self._data.update(*args, **kwargs)

    def setdefault(self, *args, **kwargs):
        return self._data.setdefault(*args, **kwargs)

    def __contains__(self, key):
        # XXX will not show finalized configuration
        # setdefault will cause `key in d` to happen,
        # so for setdefault to be lazy, so does contains.
        return key in self._data

    def __len__(self):
        return len(

    def __repr__(self):
        return repr(

    def data(self):
        return self.callback()

[docs] class Celery: """Celery application. Arguments: main (str): Name of the main module if running as `__main__`. This is used as the prefix for auto-generated task names. Keyword Arguments: broker (str): URL of the default broker used. backend (Union[str, Type[celery.backends.base.Backend]]): The result store backend class, or the name of the backend class to use. Default is the value of the :setting:`result_backend` setting. autofinalize (bool): If set to False a :exc:`RuntimeError` will be raised if the task registry or tasks are used before the app is finalized. set_as_current (bool): Make this the global current app. include (List[str]): List of modules every worker should import. amqp (Union[str, Type[AMQP]]): AMQP object or class name. events (Union[str, Type[]]): Events object or class name. log (Union[str, Type[Logging]]): Log object or class name. control (Union[str, Type[]]): Control object or class name. tasks (Union[str, Type[TaskRegistry]]): A task registry, or the name of a registry class. fixups (List[str]): List of fix-up plug-ins (e.g., see :mod:`celery.fixups.django`). config_source (Union[str, class]): Take configuration from a class, or object. Attributes may include any settings described in the documentation. task_cls (Union[str, Type[]]): base task class to use. See :ref:`this section <custom-task-cls-app-wide>` for usage. """ #: This is deprecated, use :meth:`reduce_keys` instead Pickler = AppPickler SYSTEM = platforms.SYSTEM IS_macOS, IS_WINDOWS = platforms.IS_macOS, platforms.IS_WINDOWS #: Name of the `__main__` module. Required for standalone scripts. #: #: If set this will be used instead of `__main__` when automatically #: generating task names. main = None #: Custom options for command-line programs. #: See :ref:`extending-commandoptions` user_options = None #: Custom bootsteps to extend and modify the worker. #: See :ref:`extending-bootsteps`. steps = None builtin_fixups = BUILTIN_FIXUPS amqp_cls = '' backend_cls = None events_cls = '' loader_cls = None log_cls = '' control_cls = '' task_cls = '' registry_cls = '' #: Thread local storage. _local = None _fixups = None _pool = None _conf = None _after_fork_registered = False #: Signal sent when app is loading configuration. on_configure = None #: Signal sent after app has prepared the configuration. on_after_configure = None #: Signal sent after app has been finalized. on_after_finalize = None #: Signal sent by every new process after fork. on_after_fork = None def __init__(self, main=None, loader=None, backend=None, amqp=None, events=None, log=None, control=None, set_as_current=True, tasks=None, broker=None, include=None, changes=None, config_source=None, fixups=None, task_cls=None, autofinalize=True, namespace=None, strict_typing=True, **kwargs): self._local = threading.local() self._backend_cache = None self.clock = LamportClock() self.main = main self.amqp_cls = amqp or self.amqp_cls self.events_cls = events or self.events_cls self.loader_cls = loader or self._get_default_loader() self.log_cls = log or self.log_cls self.control_cls = control or self.control_cls self._custom_task_cls_used = bool(task_cls) self.task_cls = task_cls or self.task_cls self.set_as_current = set_as_current self.registry_cls = symbol_by_name(self.registry_cls) self.user_options = defaultdict(set) self.steps = defaultdict(set) self.autofinalize = autofinalize self.namespace = namespace self.strict_typing = strict_typing self.configured = False self._config_source = config_source self._pending_defaults = deque() self._pending_periodic_tasks = deque() self.finalized = False self._finalize_mutex = threading.RLock() self._pending = deque() self._tasks = tasks if not isinstance(self._tasks, TaskRegistry): self._tasks = self.registry_cls(self._tasks or {}) # If the class defines a custom __reduce_args__ we need to use # the old way of pickling apps: pickling a list of # args instead of the new way that pickles a dict of keywords. self._using_v1_reduce = app_has_custom(self, '__reduce_args__') # these options are moved to the config to # simplify pickling of the app object. self._preconf = changes or {} self._preconf_set_by_auto = set() self.__autoset('broker_url', broker) self.__autoset('result_backend', backend) self.__autoset('include', include) for key, value in kwargs.items(): self.__autoset(key, value) self._conf = Settings( PendingConfiguration( self._preconf, self._finalize_pending_conf), prefix=self.namespace, keys=(_old_key_to_new, _new_key_to_old), ) # - Apply fix-ups. self.fixups = set(self.builtin_fixups) if fixups is None else fixups # fixup instances in _fixups to keep weakrefs alive. self._fixups = [symbol_by_name(fixup)(self) for fixup in self.fixups] if self.set_as_current: self.set_current() # Signals if self.on_configure is None: # used to be a method pre 4.0 self.on_configure = Signal(name='app.on_configure') self.on_after_configure = Signal( name='app.on_after_configure', providing_args={'source'}, ) self.on_after_finalize = Signal(name='app.on_after_finalize') self.on_after_fork = Signal(name='app.on_after_fork') # Boolean signalling, whether fast_trace_task are enabled. # this attribute is set in celery.worker.trace and checked by celery.worker.request self.use_fast_trace_task = False self.on_init() _register_app(self) def _get_default_loader(self): # the --loader command-line argument sets the environment variable. return ( os.environ.get('CELERY_LOADER') or self.loader_cls or '' )
[docs] def on_init(self): """Optional callback called at init."""
def __autoset(self, key, value): if value is not None: self._preconf[key] = value self._preconf_set_by_auto.add(key)
[docs] def set_current(self): """Make this the current app for this thread.""" _set_current_app(self)
[docs] def set_default(self): """Make this the default app for all threads.""" set_default_app(self)
def _ensure_after_fork(self): if not self._after_fork_registered: self._after_fork_registered = True if register_after_fork is not None: register_after_fork(self, _after_fork_cleanup_app)
[docs] def close(self): """Clean up after the application. Only necessary for dynamically created apps, and you should probably use the :keyword:`with` statement instead. Example: >>> with Celery(set_as_current=False) as app: ... with app.connection_for_write() as conn: ... pass """ self._pool = None _deregister_app(self)
def start(self, argv=None): """Run :program:`celery` using `argv`. Uses :data:`sys.argv` if `argv` is not specified. """ from celery.bin.celery import celery celery.params[0].default = self if argv is None: argv = sys.argv try: celery.main(args=argv, standalone_mode=False) except Exit as e: return e.exit_code finally: celery.params[0].default = None def worker_main(self, argv=None): """Run :program:`celery worker` using `argv`. Uses :data:`sys.argv` if `argv` is not specified. """ if argv is None: argv = sys.argv if 'worker' not in argv: raise ValueError( "The worker sub-command must be specified in argv.\n" "Use app.start() to programmatically start other commands." ) self.start(argv=argv)
[docs] def task(self, *args, **opts): """Decorator to create a task class out of any callable. See :ref:`Task options<task-options>` for a list of the arguments that can be passed to this decorator. Examples: .. code-block:: python @app.task def refresh_feed(url): store_feed(feedparser.parse(url)) with setting extra options: .. code-block:: python @app.task(exchange='feeds') def refresh_feed(url): return store_feed(feedparser.parse(url)) Note: App Binding: For custom apps the task decorator will return a proxy object, so that the act of creating the task is not performed until the task is used or the task registry is accessed. If you're depending on binding to be deferred, then you must not access any attributes on the returned object until the application is fully set up (finalized). """ if USING_EXECV and opts.get('lazy', True): # When using execv the task in the original module will point to a # different app, so doing things like 'add.request' will point to # a different task instance. This makes sure it will always use # the task instance from the current app. # Really need a better solution for this :( from . import shared_task return shared_task(*args, lazy=False, **opts) def inner_create_task_cls(shared=True, filter=None, lazy=True, **opts): _filt = filter def _create_task_cls(fun): if shared: def cons(app): return app._task_from_fun(fun, **opts) cons.__name__ = fun.__name__ connect_on_app_finalize(cons) if not lazy or self.finalized: ret = self._task_from_fun(fun, **opts) else: # return a proxy object that evaluates on first use ret = PromiseProxy(self._task_from_fun, (fun,), opts, __doc__=fun.__doc__) self._pending.append(ret) if _filt: return _filt(ret) return ret return _create_task_cls if len(args) == 1: if callable(args[0]): return inner_create_task_cls(**opts)(*args) raise TypeError('argument 1 to @task() must be a callable') if args: raise TypeError( '@task() takes exactly 1 argument ({} given)'.format( sum([len(args), len(opts)]))) return inner_create_task_cls(**opts)
def type_checker(self, fun, bound=False): return staticmethod(head_from_fun(fun, bound=bound)) def _task_from_fun(self, fun, name=None, base=None, bind=False, **options): if not self.finalized and not self.autofinalize: raise RuntimeError('Contract breach: app not finalized') name = name or self.gen_task_name(fun.__name__, fun.__module__) base = base or self.Task if name not in self._tasks: run = fun if bind else staticmethod(fun) task = type(fun.__name__, (base,), dict({ 'app': self, 'name': name, 'run': run, '_decorated': True, '__doc__': fun.__doc__, '__module__': fun.__module__, '__annotations__': fun.__annotations__, '__header__': self.type_checker(fun, bound=bind), '__wrapped__': run}, **options))() # for some reason __qualname__ cannot be set in type() # so we have to set it here. try: task.__qualname__ = fun.__qualname__ except AttributeError: pass self._tasks[] = task task.bind(self) # connects task to this app add_autoretry_behaviour(task, **options) else: task = self._tasks[name] return task def register_task(self, task, **options): """Utility for registering a task-based class. Note: This is here for compatibility with old Celery 1.0 style task classes, you should not need to use this for new projects. """ task = inspect.isclass(task) and task() or task if not task_cls = type(task) = self.gen_task_name( task_cls.__name__, task_cls.__module__) add_autoretry_behaviour(task, **options) self.tasks[] = task task._app = self task.bind(self) return task
[docs] def gen_task_name(self, name, module): return gen_task_name(self, name, module)
[docs] def finalize(self, auto=False): """Finalize the app. This loads built-in tasks, evaluates pending task decorators, reads configuration, etc. """ with self._finalize_mutex: if not self.finalized: if auto and not self.autofinalize: raise RuntimeError('Contract breach: app not finalized') self.finalized = True _announce_app_finalized(self) pending = self._pending while pending: maybe_evaluate(pending.popleft()) for task in self._tasks.values(): task.bind(self) self.on_after_finalize.send(sender=self)
[docs] def add_defaults(self, fun): """Add default configuration from dict ``d``. If the argument is a callable function then it will be regarded as a promise, and it won't be loaded until the configuration is actually needed. This method can be compared to: .. code-block:: pycon >>> celery.conf.update(d) with a difference that 1) no copy will be made and 2) the dict will not be transferred when the worker spawns child processes, so it's important that the same configuration happens at import time when pickle restores the object on the other side. """ if not callable(fun): d, fun = fun, lambda: d if self.configured: return self._conf.add_defaults(fun()) self._pending_defaults.append(fun)
[docs] def config_from_object(self, obj, silent=False, force=False, namespace=None): """Read configuration from object. Object is either an actual object or the name of a module to import. Example: >>> celery.config_from_object('myapp.celeryconfig') >>> from myapp import celeryconfig >>> celery.config_from_object(celeryconfig) Arguments: silent (bool): If true then import errors will be ignored. force (bool): Force reading configuration immediately. By default the configuration will be read only when required. """ self._config_source = obj self.namespace = namespace or self.namespace if force or self.configured: self._conf = None if self.loader.config_from_object(obj, silent=silent): return self.conf
[docs] def config_from_envvar(self, variable_name, silent=False, force=False): """Read configuration from environment variable. The value of the environment variable must be the name of a module to import. Example: >>> os.environ['CELERY_CONFIG_MODULE'] = 'myapp.celeryconfig' >>> celery.config_from_envvar('CELERY_CONFIG_MODULE') """ module_name = os.environ.get(variable_name) if not module_name: if silent: return False raise ImproperlyConfigured( ERR_ENVVAR_NOT_SET.strip().format(variable_name)) return self.config_from_object(module_name, silent=silent, force=force)
def config_from_cmdline(self, argv, namespace='celery'): self._conf.update( self.loader.cmdline_config_parser(argv, namespace) )
[docs] def setup_security(self, allowed_serializers=None, key=None, key_password=None, cert=None, store=None, digest=DEFAULT_SECURITY_DIGEST, serializer='json'): """Setup the message-signing serializer. This will affect all application instances (a global operation). Disables untrusted serializers and if configured to use the ``auth`` serializer will register the ``auth`` serializer with the provided settings into the Kombu serializer registry. Arguments: allowed_serializers (Set[str]): List of serializer names, or content_types that should be exempt from being disabled. key (str): Name of private key file to use. Defaults to the :setting:`security_key` setting. key_password (bytes): Password to decrypt the private key. Defaults to the :setting:`security_key_password` setting. cert (str): Name of certificate file to use. Defaults to the :setting:`security_certificate` setting. store (str): Directory containing certificates. Defaults to the :setting:`security_cert_store` setting. digest (str): Digest algorithm used when signing messages. Default is ``sha256``. serializer (str): Serializer used to encode messages after they've been signed. See :setting:`task_serializer` for the serializers supported. Default is ``json``. """ from import setup_security return setup_security(allowed_serializers, key, key_password, cert, store, digest, serializer, app=self)
[docs] def autodiscover_tasks(self, packages=None, related_name='tasks', force=False): """Auto-discover task modules. Searches a list of packages for a "" module (or use related_name argument). If the name is empty, this will be delegated to fix-ups (e.g., Django). For example if you have a directory layout like this: .. code-block:: text foo/ bar/ baz/ Then calling ``app.autodiscover_tasks(['foo', 'bar', 'baz'])`` will result in the modules ``foo.tasks`` and ``bar.tasks`` being imported. Arguments: packages (List[str]): List of packages to search. This argument may also be a callable, in which case the value returned is used (for lazy evaluation). related_name (Optional[str]): The name of the module to find. Defaults to "tasks": meaning "look for 'module.tasks' for every module in ``packages``.". If ``None`` will only try to import the package, i.e. "look for 'module'". force (bool): By default this call is lazy so that the actual auto-discovery won't happen until an application imports the default modules. Forcing will cause the auto-discovery to happen immediately. """ if force: return self._autodiscover_tasks(packages, related_name) signals.import_modules.connect(starpromise( self._autodiscover_tasks, packages, related_name, ), weak=False, sender=self)
def _autodiscover_tasks(self, packages, related_name, **kwargs): if packages: return self._autodiscover_tasks_from_names(packages, related_name) return self._autodiscover_tasks_from_fixups(related_name) def _autodiscover_tasks_from_names(self, packages, related_name): # packages argument can be lazy return self.loader.autodiscover_tasks( packages() if callable(packages) else packages, related_name, ) def _autodiscover_tasks_from_fixups(self, related_name): return self._autodiscover_tasks_from_names([ pkg for fixup in self._fixups if hasattr(fixup, 'autodiscover_tasks') for pkg in fixup.autodiscover_tasks() ], related_name=related_name)
[docs] def send_task(self, name, args=None, kwargs=None, countdown=None, eta=None, task_id=None, producer=None, connection=None, router=None, result_cls=None, expires=None, publisher=None, link=None, link_error=None, add_to_parent=True, group_id=None, group_index=None, retries=0, chord=None, reply_to=None, time_limit=None, soft_time_limit=None, root_id=None, parent_id=None, route_name=None, shadow=None, chain=None, task_type=None, replaced_task_nesting=0, **options): """Send task by name. Supports the same arguments as :meth:`@-Task.apply_async`. Arguments: name (str): Name of task to call (e.g., `"tasks.add"`). result_cls (AsyncResult): Specify custom result class. """ parent = have_parent = None amqp = self.amqp task_id = task_id or uuid() producer = producer or publisher # XXX compat router = router or amqp.router conf = self.conf if conf.task_always_eager: # pragma: no cover warnings.warn(AlwaysEagerIgnored( 'task_always_eager has no effect on send_task', ), stacklevel=2) ignore_result = options.pop('ignore_result', False) options = router.route( options, route_name or name, args, kwargs, task_type) if expires is not None: if isinstance(expires, datetime): expires_s = (maybe_make_aware( expires) - elif isinstance(expires, str): expires_s = (maybe_make_aware( isoparse(expires)) - else: expires_s = expires if expires_s < 0: logger.warning( f"{task_id} has an expiration date in the past ({-expires_s}s ago).\n" "We assume this is intended and so we have set the " "expiration date to 0 instead.\n" "According to RabbitMQ's documentation:\n" "\"Setting the TTL to 0 causes messages to be expired upon " "reaching a queue unless they can be delivered to a " "consumer immediately.\"\n" "If this was unintended, please check the code which " "published this task." ) expires_s = 0 options["expiration"] = expires_s if not root_id or not parent_id: parent = self.current_worker_task if parent: if not root_id: root_id = parent.request.root_id or if not parent_id: parent_id = if conf.task_inherit_parent_priority: options.setdefault('priority', parent.request.delivery_info.get('priority')) # alias for 'task_as_v2' message = amqp.create_task_message( task_id, name, args, kwargs, countdown, eta, group_id, group_index, expires, retries, chord, maybe_list(link), maybe_list(link_error), reply_to or self.thread_oid, time_limit, soft_time_limit, self.conf.task_send_sent_event, root_id, parent_id, shadow, chain, ignore_result=ignore_result, replaced_task_nesting=replaced_task_nesting, **options ) stamped_headers = options.pop('stamped_headers', []) for stamp in stamped_headers: options.pop(stamp) if connection: producer = amqp.Producer(connection, auto_declare=False) with self.producer_or_acquire(producer) as P: with P.connection._reraise_as_library_errors(): if not ignore_result: self.backend.on_task_call(P, task_id) amqp.send_task_message(P, name, message, **options) result = (result_cls or self.AsyncResult)(task_id) # We avoid using the constructor since a custom result class # can be used, in which case the constructor may still use # the old signature. result.ignored = ignore_result if add_to_parent: if not have_parent: parent, have_parent = self.current_worker_task, True if parent: parent.add_trail(result) return result
[docs] def connection_for_read(self, url=None, **kwargs): """Establish connection used for consuming. See Also: :meth:`connection` for supported arguments. """ return self._connection(url or self.conf.broker_read_url, **kwargs)
[docs] def connection_for_write(self, url=None, **kwargs): """Establish connection used for producing. See Also: :meth:`connection` for supported arguments. """ return self._connection(url or self.conf.broker_write_url, **kwargs)
[docs] def connection(self, hostname=None, userid=None, password=None, virtual_host=None, port=None, ssl=None, connect_timeout=None, transport=None, transport_options=None, heartbeat=None, login_method=None, failover_strategy=None, **kwargs): """Establish a connection to the message broker. Please use :meth:`connection_for_read` and :meth:`connection_for_write` instead, to convey the intent of use for this connection. Arguments: url: Either the URL or the hostname of the broker to use. hostname (str): URL, Hostname/IP-address of the broker. If a URL is used, then the other argument below will be taken from the URL instead. userid (str): Username to authenticate as. password (str): Password to authenticate with virtual_host (str): Virtual host to use (domain). port (int): Port to connect to. ssl (bool, Dict): Defaults to the :setting:`broker_use_ssl` setting. transport (str): defaults to the :setting:`broker_transport` setting. transport_options (Dict): Dictionary of transport specific options. heartbeat (int): AMQP Heartbeat in seconds (``pyamqp`` only). login_method (str): Custom login method to use (AMQP only). failover_strategy (str, Callable): Custom failover strategy. **kwargs: Additional arguments to :class:`kombu.Connection`. Returns: kombu.Connection: the lazy connection instance. """ return self.connection_for_write( hostname or self.conf.broker_write_url, userid=userid, password=password, virtual_host=virtual_host, port=port, ssl=ssl, connect_timeout=connect_timeout, transport=transport, transport_options=transport_options, heartbeat=heartbeat, login_method=login_method, failover_strategy=failover_strategy, **kwargs )
def _connection(self, url, userid=None, password=None, virtual_host=None, port=None, ssl=None, connect_timeout=None, transport=None, transport_options=None, heartbeat=None, login_method=None, failover_strategy=None, **kwargs): conf = self.conf return self.amqp.Connection( url, userid or conf.broker_user, password or conf.broker_password, virtual_host or conf.broker_vhost, port or conf.broker_port, transport=transport or conf.broker_transport, ssl=self.either('broker_use_ssl', ssl), heartbeat=heartbeat, login_method=login_method or conf.broker_login_method, failover_strategy=( failover_strategy or conf.broker_failover_strategy ), transport_options=dict( conf.broker_transport_options, **transport_options or {} ), connect_timeout=self.either( 'broker_connection_timeout', connect_timeout ), ) broker_connection = connection def _acquire_connection(self, pool=True): """Helper for :meth:`connection_or_acquire`.""" if pool: return self.pool.acquire(block=True) return self.connection_for_write()
[docs] def connection_or_acquire(self, connection=None, pool=True, *_, **__): """Context used to acquire a connection from the pool. For use within a :keyword:`with` statement to get a connection from the pool if one is not already provided. Arguments: connection (kombu.Connection): If not provided, a connection will be acquired from the connection pool. """ return FallbackContext(connection, self._acquire_connection, pool=pool)
default_connection = connection_or_acquire # XXX compat
[docs] def producer_or_acquire(self, producer=None): """Context used to acquire a producer from the pool. For use within a :keyword:`with` statement to get a producer from the pool if one is not already provided Arguments: producer (kombu.Producer): If not provided, a producer will be acquired from the producer pool. """ return FallbackContext( producer, self.producer_pool.acquire, block=True, )
default_producer = producer_or_acquire # XXX compat
[docs] def prepare_config(self, c): """Prepare configuration before it is merged with the defaults.""" return find_deprecated_settings(c)
[docs] def now(self): """Return the current time and date as a datetime.""" now_in_utc = to_utc( return now_in_utc.astimezone(self.timezone)
[docs] def select_queues(self, queues=None): """Select subset of queues. Arguments: queues (Sequence[str]): a list of queue names to keep. """ return
def either(self, default_key, *defaults): """Get key from configuration or use default values. Fallback to the value of a configuration key if none of the `*values` are true. """ return first(None, [ first(None, defaults), starpromise(self.conf.get, default_key), ])
[docs] def bugreport(self): """Return information useful in bug reports.""" return bugreport(self)
def _get_backend(self): backend, url = backends.by_url( self.backend_cls or self.conf.result_backend, self.loader) return backend(app=self, url=url) def _finalize_pending_conf(self): """Get config value by key and finalize loading the configuration. Note: This is used by PendingConfiguration: as soon as you access a key the configuration is read. """ try: conf = self._conf = self._load_config() except AttributeError as err: # AttributeError is not propagated, it is "handled" by # PendingConfiguration parent class. This causes # confusing RecursionError. raise ModuleNotFoundError(*err.args) from err return conf def _load_config(self): if isinstance(self.on_configure, Signal): self.on_configure.send(sender=self) else: # used to be a method pre 4.0 self.on_configure() if self._config_source: self.loader.config_from_object(self._config_source) self.configured = True settings = detect_settings( self.prepare_config(self.loader.conf), self._preconf, ignore_keys=self._preconf_set_by_auto, prefix=self.namespace, ) if self._conf is not None: # replace in place, as someone may have referenced app.conf, # done some changes, accessed a key, and then try to make more # changes to the reference and not the finalized value. self._conf.swap_with(settings) else: self._conf = settings # load lazy config dict initializers. pending_def = self._pending_defaults while pending_def: self._conf.add_defaults(maybe_evaluate(pending_def.popleft()())) # load lazy periodic tasks pending_beat = self._pending_periodic_tasks while pending_beat: periodic_task_args, periodic_task_kwargs = pending_beat.popleft() self._add_periodic_task(*periodic_task_args, **periodic_task_kwargs) self.on_after_configure.send(sender=self, source=self._conf) return self._conf def _after_fork(self): self._pool = None try: self.__dict__['amqp']._producer_pool = None except (AttributeError, KeyError): pass self.on_after_fork.send(sender=self)
[docs] def signature(self, *args, **kwargs): """Return a new :class:`~celery.Signature` bound to this app.""" kwargs['app'] = self return self._canvas.signature(*args, **kwargs)
[docs] def add_periodic_task(self, schedule, sig, args=(), kwargs=(), name=None, **opts): """ Add a periodic task to beat schedule. Celery beat store tasks based on `sig` or `name` if provided. Adding the same signature twice make the second task override the first one. To avoid the override, use distinct `name` for them. """ key, entry = self._sig_to_periodic_task_entry( schedule, sig, args, kwargs, name, **opts) if self.configured: self._add_periodic_task(key, entry, name=name) else: self._pending_periodic_tasks.append([(key, entry), {"name": name}]) return key
def _sig_to_periodic_task_entry(self, schedule, sig, args=(), kwargs=None, name=None, **opts): kwargs = {} if not kwargs else kwargs sig = (sig.clone(args, kwargs) if isinstance(sig, abstract.CallableSignature) else self.signature(, args, kwargs)) return name or repr(sig), { 'schedule': schedule, 'task':, 'args': sig.args, 'kwargs': sig.kwargs, 'options': dict(sig.options, **opts), } def _add_periodic_task(self, key, entry, name=None): if name is None and key in self._conf.beat_schedule: logger.warning( f"Periodic task key='{key}' shadowed a previous unnamed periodic task." " Pass a name kwarg to add_periodic_task to silence this warning." ) self._conf.beat_schedule[key] = entry def create_task_cls(self): """Create a base task class bound to this app.""" return self.subclass_with_self( self.task_cls, name='Task', attribute='_app', keep_reduce=True, abstract=True, ) def subclass_with_self(self, Class, name=None, attribute='app', reverse=None, keep_reduce=False, **kw): """Subclass an app-compatible class. App-compatible means that the class has a class attribute that provides the default app it should use, for example: ``class Foo: app = None``. Arguments: Class (type): The app-compatible class to subclass. name (str): Custom name for the target class. attribute (str): Name of the attribute holding the app, Default is 'app'. reverse (str): Reverse path to this object used for pickling purposes. For example, to get ``app.AsyncResult``, use ``"AsyncResult"``. keep_reduce (bool): If enabled a custom ``__reduce__`` implementation won't be provided. """ Class = symbol_by_name(Class) reverse = reverse if reverse else Class.__name__ def __reduce__(self): return _unpickle_appattr, (reverse, self.__reduce_args__()) attrs = dict( {attribute: self}, __module__=Class.__module__, __doc__=Class.__doc__, **kw) if not keep_reduce: attrs['__reduce__'] = __reduce__ return type(name or Class.__name__, (Class,), attrs) def _rgetattr(self, path): return attrgetter(path)(self) def __enter__(self): return self def __exit__(self, *exc_info): self.close() def __repr__(self): return f'<{type(self).__name__} {appstr(self)}>' def __reduce__(self): if self._using_v1_reduce: return self.__reduce_v1__() return (_unpickle_app_v2, (self.__class__, self.__reduce_keys__())) def __reduce_v1__(self): # Reduce only pickles the configuration changes, # so the default configuration doesn't have to be passed # between processes. return ( _unpickle_app, (self.__class__, self.Pickler) + self.__reduce_args__(), ) def __reduce_keys__(self): """Keyword arguments used to reconstruct the object when unpickling.""" return { 'main': self.main, 'changes': self._conf.changes if self.configured else self._preconf, 'loader': self.loader_cls, 'backend': self.backend_cls, 'amqp': self.amqp_cls, 'events': self.events_cls, 'log': self.log_cls, 'control': self.control_cls, 'fixups': self.fixups, 'config_source': self._config_source, 'task_cls': self.task_cls, 'namespace': self.namespace, } def __reduce_args__(self): """Deprecated method, please use :meth:`__reduce_keys__` instead.""" return (self.main, self._conf.changes if self.configured else {}, self.loader_cls, self.backend_cls, self.amqp_cls, self.events_cls, self.log_cls, self.control_cls, False, self._config_source) @cached_property def Worker(self): """Worker application. See Also: :class:`~@Worker`. """ return self.subclass_with_self('celery.apps.worker:Worker') @cached_property def WorkController(self, **kwargs): """Embeddable worker. See Also: :class:`~@WorkController`. """ return self.subclass_with_self('celery.worker:WorkController') @cached_property def Beat(self, **kwargs): """:program:`celery beat` scheduler application. See Also: :class:`~@Beat`. """ return self.subclass_with_self('celery.apps.beat:Beat') @cached_property def Task(self): """Base task class for this app.""" return self.create_task_cls() @cached_property def annotations(self): return prepare_annotations(self.conf.task_annotations) @cached_property def AsyncResult(self): """Create new result instance. See Also: :class:`celery.result.AsyncResult`. """ return self.subclass_with_self('celery.result:AsyncResult') @cached_property def ResultSet(self): return self.subclass_with_self('celery.result:ResultSet') @cached_property def GroupResult(self): """Create new group result instance. See Also: :class:`celery.result.GroupResult`. """ return self.subclass_with_self('celery.result:GroupResult') @property def pool(self): """Broker connection pool: :class:`~@pool`. Note: This attribute is not related to the workers concurrency pool. """ if self._pool is None: self._ensure_after_fork() limit = self.conf.broker_pool_limit pools.set_limit(limit) self._pool = pools.connections[self.connection_for_write()] return self._pool @property def current_task(self): """Instance of task being executed, or :const:`None`.""" return @property def current_worker_task(self): """The task currently being executed by a worker or :const:`None`. Differs from :data:`current_task` in that it's not affected by tasks calling other tasks directly, or eagerly. """ return get_current_worker_task() @cached_property def oid(self): """Universally unique identifier for this app.""" # since 4.0: thread.get_ident() is not included when # generating the process id. This is due to how the RPC # backend now dedicates a single thread to receive results, # which would not work if each thread has a separate id. return oid_from(self, threads=False) @property def thread_oid(self): """Per-thread unique identifier for this app.""" try: return self._local.oid except AttributeError: self._local.oid = new_oid = oid_from(self, threads=True) return new_oid @cached_property def amqp(self): """AMQP related functionality: :class:`~@amqp`.""" return instantiate(self.amqp_cls, app=self) @property def _backend(self): """A reference to the backend object Uses self._backend_cache if it is thread safe. Otherwise, use self._local """ if self._backend_cache is not None: return self._backend_cache return getattr(self._local, "backend", None) @_backend.setter def _backend(self, backend): """Set the backend object on the app""" if backend.thread_safe: self._backend_cache = backend else: self._local.backend = backend @property def backend(self): """Current backend instance.""" if self._backend is None: self._backend = self._get_backend() return self._backend @property def conf(self): """Current configuration.""" if self._conf is None: self._conf = self._load_config() return self._conf @conf.setter def conf(self, d): self._conf = d @cached_property def control(self): """Remote control: :class:`~@control`.""" return instantiate(self.control_cls, app=self) @cached_property def events(self): """Consuming and sending events: :class:`~@events`.""" return instantiate(self.events_cls, app=self) @cached_property def loader(self): """Current loader instance.""" return get_loader_cls(self.loader_cls)(app=self) @cached_property def log(self): """Logging: :class:`~@log`.""" return instantiate(self.log_cls, app=self) @cached_property def _canvas(self): from celery import canvas return canvas @cached_property def tasks(self): """Task registry. Warning: Accessing this attribute will also auto-finalize the app. """ self.finalize(auto=True) return self._tasks @property def producer_pool(self): return self.amqp.producer_pool def uses_utc_timezone(self): """Check if the application uses the UTC timezone.""" return self.timezone == timezone.utc @cached_property def timezone(self): """Current timezone for this app. This is a cached property taking the time zone from the :setting:`timezone` setting. """ conf = self.conf if not conf.timezone: if conf.enable_utc: return timezone.utc else: return timezone.local return timezone.get_timezone(conf.timezone)
App = Celery # XXX compat