This document describes the current stable version of Celery (5.0). For development docs, go here.

Source code for celery.app.defaults

"""Configuration introspection and defaults."""
from collections import deque, namedtuple
from datetime import timedelta

from celery.utils.functional import memoize
from celery.utils.serialization import strtobool

__all__ = ('Option', 'NAMESPACES', 'flatten', 'find')


DEFAULT_POOL = 'prefork'

DEFAULT_ACCEPT_CONTENT = ['json']
DEFAULT_PROCESS_LOG_FMT = """
    [%(asctime)s: %(levelname)s/%(processName)s] %(message)s
""".strip()
DEFAULT_TASK_LOG_FMT = """[%(asctime)s: %(levelname)s/%(processName)s] \
%(task_name)s[%(task_id)s]: %(message)s"""

DEFAULT_SECURITY_DIGEST = 'sha256'


OLD_NS = {'celery_{0}'}
OLD_NS_BEAT = {'celerybeat_{0}'}
OLD_NS_WORKER = {'celeryd_{0}'}

searchresult = namedtuple('searchresult', ('namespace', 'key', 'type'))


def Namespace(__old__=None, **options):
    if __old__ is not None:
        for key, opt in options.items():
            if not opt.old:
                opt.old = {o.format(key) for o in __old__}
    return options


def old_ns(ns):
    return {f'{ns}_{{0}}'}


[docs]class Option: """Describes a Celery configuration option.""" alt = None deprecate_by = None remove_by = None old = set() typemap = {'string': str, 'int': int, 'float': float, 'any': lambda v: v, 'bool': strtobool, 'dict': dict, 'tuple': tuple} def __init__(self, default=None, *args, **kwargs): self.default = default self.type = kwargs.get('type') or 'string' for attr, value in kwargs.items(): setattr(self, attr, value)
[docs] def to_python(self, value): return self.typemap[self.type](value)
def __repr__(self): return '<Option: type->{} default->{!r}>'.format(self.type, self.default)
NAMESPACES = Namespace( accept_content=Option(DEFAULT_ACCEPT_CONTENT, type='list', old=OLD_NS), result_accept_content=Option(None, type='list'), enable_utc=Option(True, type='bool'), imports=Option((), type='tuple', old=OLD_NS), include=Option((), type='tuple', old=OLD_NS), timezone=Option(type='string', old=OLD_NS), beat=Namespace( __old__=OLD_NS_BEAT, max_loop_interval=Option(0, type='float'), schedule=Option({}, type='dict'), scheduler=Option('celery.beat:PersistentScheduler'), schedule_filename=Option('celerybeat-schedule'), sync_every=Option(0, type='int'), ), broker=Namespace( url=Option(None, type='string'), read_url=Option(None, type='string'), write_url=Option(None, type='string'), transport=Option(type='string'), transport_options=Option({}, type='dict'), connection_timeout=Option(4, type='float'), connection_retry=Option(True, type='bool'), connection_max_retries=Option(100, type='int'), failover_strategy=Option(None, type='string'), heartbeat=Option(120, type='int'), heartbeat_checkrate=Option(3.0, type='int'), login_method=Option(None, type='string'), pool_limit=Option(10, type='int'), use_ssl=Option(False, type='bool'), host=Option(type='string'), port=Option(type='int'), user=Option(type='string'), password=Option(type='string'), vhost=Option(type='string'), ), cache=Namespace( __old__=old_ns('celery_cache'), backend=Option(), backend_options=Option({}, type='dict'), ), cassandra=Namespace( entry_ttl=Option(type='float'), keyspace=Option(type='string'), port=Option(type='string'), read_consistency=Option(type='string'), servers=Option(type='list'), table=Option(type='string'), write_consistency=Option(type='string'), auth_provider=Option(type='string'), auth_kwargs=Option(type='string'), options=Option({}, type='dict'), ), s3=Namespace( access_key_id=Option(type='string'), secret_access_key=Option(type='string'), bucket=Option(type='string'), base_path=Option(type='string'), endpoint_url=Option(type='string'), region=Option(type='string'), ), azureblockblob=Namespace( container_name=Option('celery', type='string'), retry_initial_backoff_sec=Option(2, type='int'), retry_increment_base=Option(2, type='int'), retry_max_attempts=Option(3, type='int'), ), control=Namespace( queue_ttl=Option(300.0, type='float'), queue_expires=Option(10.0, type='float'), exchange=Option('celery', type='string'), ), couchbase=Namespace( __old__=old_ns('celery_couchbase'), backend_settings=Option(None, type='dict'), ), arangodb=Namespace( __old__=old_ns('celery_arangodb'), backend_settings=Option(None, type='dict') ), mongodb=Namespace( __old__=old_ns('celery_mongodb'), backend_settings=Option(type='dict'), ), cosmosdbsql=Namespace( database_name=Option('celerydb', type='string'), collection_name=Option('celerycol', type='string'), consistency_level=Option('Session', type='string'), max_retry_attempts=Option(9, type='int'), max_retry_wait_time=Option(30, type='int'), ), event=Namespace( __old__=old_ns('celery_event'), queue_expires=Option(60.0, type='float'), queue_ttl=Option(5.0, type='float'), queue_prefix=Option('celeryev'), serializer=Option('json'), exchange=Option('celeryev', type='string'), ), redis=Namespace( __old__=old_ns('celery_redis'), backend_use_ssl=Option(type='dict'), db=Option(type='int'), host=Option(type='string'), max_connections=Option(type='int'), password=Option(type='string'), port=Option(type='int'), socket_timeout=Option(120.0, type='float'), socket_connect_timeout=Option(None, type='float'), retry_on_timeout=Option(False, type='bool'), socket_keepalive=Option(False, type='bool'), ), result=Namespace( __old__=old_ns('celery_result'), backend=Option(type='string'), cache_max=Option( -1, type='int', old={'celery_max_cached_results'}, ), compression=Option(type='str'), exchange=Option('celeryresults'), exchange_type=Option('direct'), expires=Option( timedelta(days=1), type='float', old={'celery_task_result_expires'}, ), persistent=Option(None, type='bool'), extended=Option(False, type='bool'), serializer=Option('json'), backend_transport_options=Option({}, type='dict'), chord_retry_interval=Option(1.0, type='float'), chord_join_timeout=Option(3.0, type='float'), backend_max_sleep_between_retries_ms=Option(10000, type='int'), backend_max_retries=Option(float("inf"), type='float'), backend_base_sleep_between_retries_ms=Option(10, type='int'), backend_always_retry=Option(False, type='bool'), ), elasticsearch=Namespace( __old__=old_ns('celery_elasticsearch'), retry_on_timeout=Option(type='bool'), max_retries=Option(type='int'), timeout=Option(type='float'), save_meta_as_text=Option(True, type='bool'), ), security=Namespace( __old__=old_ns('celery_security'), certificate=Option(type='string'), cert_store=Option(type='string'), key=Option(type='string'), digest=Option(DEFAULT_SECURITY_DIGEST, type='string'), ), database=Namespace( url=Option(old={'celery_result_dburi'}), engine_options=Option( type='dict', old={'celery_result_engine_options'}, ), short_lived_sessions=Option( False, type='bool', old={'celery_result_db_short_lived_sessions'}, ), table_schemas=Option(type='dict'), table_names=Option(type='dict', old={'celery_result_db_tablenames'}), ), task=Namespace( __old__=OLD_NS, acks_late=Option(False, type='bool'), acks_on_failure_or_timeout=Option(True, type='bool'), always_eager=Option(False, type='bool'), annotations=Option(type='any'), compression=Option(type='string', old={'celery_message_compression'}), create_missing_queues=Option(True, type='bool'), inherit_parent_priority=Option(False, type='bool'), default_delivery_mode=Option(2, type='string'), default_queue=Option('celery'), default_exchange=Option(None, type='string'), # taken from queue default_exchange_type=Option('direct'), default_routing_key=Option(None, type='string'), # taken from queue default_rate_limit=Option(type='string'), default_priority=Option(None, type='string'), eager_propagates=Option( False, type='bool', old={'celery_eager_propagates_exceptions'}, ), ignore_result=Option(False, type='bool'), protocol=Option(2, type='int', old={'celery_task_protocol'}), publish_retry=Option( True, type='bool', old={'celery_task_publish_retry'}, ), publish_retry_policy=Option( {'max_retries': 3, 'interval_start': 0, 'interval_max': 1, 'interval_step': 0.2}, type='dict', old={'celery_task_publish_retry_policy'}, ), queues=Option(type='dict'), queue_max_priority=Option(None, type='int'), reject_on_worker_lost=Option(type='bool'), remote_tracebacks=Option(False, type='bool'), routes=Option(type='any'), send_sent_event=Option( False, type='bool', old={'celery_send_task_sent_event'}, ), serializer=Option('json', old={'celery_task_serializer'}), soft_time_limit=Option( type='float', old={'celeryd_task_soft_time_limit'}, ), time_limit=Option( type='float', old={'celeryd_task_time_limit'}, ), store_errors_even_if_ignored=Option(False, type='bool'), track_started=Option(False, type='bool'), ), worker=Namespace( __old__=OLD_NS_WORKER, agent=Option(None, type='string'), autoscaler=Option('celery.worker.autoscale:Autoscaler'), concurrency=Option(0, type='int'), consumer=Option('celery.worker.consumer:Consumer', type='string'), direct=Option(False, type='bool', old={'celery_worker_direct'}), disable_rate_limits=Option( False, type='bool', old={'celery_disable_rate_limits'}, ), enable_remote_control=Option( True, type='bool', old={'celery_enable_remote_control'}, ), hijack_root_logger=Option(True, type='bool'), log_color=Option(type='bool'), log_format=Option(DEFAULT_PROCESS_LOG_FMT), lost_wait=Option(10.0, type='float', old={'celeryd_worker_lost_wait'}), max_memory_per_child=Option(type='int'), max_tasks_per_child=Option(type='int'), pool=Option(DEFAULT_POOL), pool_putlocks=Option(True, type='bool'), pool_restarts=Option(False, type='bool'), proc_alive_timeout=Option(4.0, type='float'), prefetch_multiplier=Option(4, type='int'), redirect_stdouts=Option( True, type='bool', old={'celery_redirect_stdouts'}, ), redirect_stdouts_level=Option( 'WARNING', old={'celery_redirect_stdouts_level'}, ), send_task_events=Option( False, type='bool', old={'celery_send_events'}, ), state_db=Option(), task_log_format=Option(DEFAULT_TASK_LOG_FMT), timer=Option(type='string'), timer_precision=Option(1.0, type='float'), ), ) def _flatten_keys(ns, key, opt): return [(ns + key, opt)] def _to_compat(ns, key, opt): if opt.old: return [ (oldkey.format(key).upper(), ns + key, opt) for oldkey in opt.old ] return [((ns + key).upper(), ns + key, opt)]
[docs]def flatten(d, root='', keyfilter=_flatten_keys): """Flatten settings.""" stack = deque([(root, d)]) while stack: ns, options = stack.popleft() for key, opt in options.items(): if isinstance(opt, dict): stack.append((ns + key + '_', opt)) else: yield from keyfilter(ns, key, opt)
DEFAULTS = { key: opt.default for key, opt in flatten(NAMESPACES) } __compat = list(flatten(NAMESPACES, keyfilter=_to_compat)) _OLD_DEFAULTS = {old_key: opt.default for old_key, _, opt in __compat} _TO_OLD_KEY = {new_key: old_key for old_key, new_key, _ in __compat} _TO_NEW_KEY = {old_key: new_key for old_key, new_key, _ in __compat} __compat = None SETTING_KEYS = set(DEFAULTS.keys()) _OLD_SETTING_KEYS = set(_TO_NEW_KEY.keys()) def find_deprecated_settings(source): # pragma: no cover from celery.utils import deprecated for name, opt in flatten(NAMESPACES): if (opt.deprecate_by or opt.remove_by) and getattr(source, name, None): deprecated.warn(description=f'The {name!r} setting', deprecation=opt.deprecate_by, removal=opt.remove_by, alternative=f'Use the {opt.alt} instead') return source
[docs]@memoize(maxsize=None) def find(name, namespace='celery'): """Find setting by name.""" # - Try specified name-space first. namespace = namespace.lower() try: return searchresult( namespace, name.lower(), NAMESPACES[namespace][name.lower()], ) except KeyError: # - Try all the other namespaces. for ns, opts in NAMESPACES.items(): if ns.lower() == name.lower(): return searchresult(None, ns, opts) elif isinstance(opts, dict): try: return searchresult(ns, name.lower(), opts[name.lower()]) except KeyError: pass # - See if name is a qualname last. return searchresult(None, name.lower(), DEFAULTS[name.lower()])