This document describes the current stable version of Celery (4.0). For development docs, go here.
Source code for celery.app.utils
# -*- coding: utf-8 -*-
"""App utilities: Compat settings, bug-report tool, pickling apps."""
from __future__ import absolute_import, unicode_literals
import os
import platform as _platform
import re
from collections import Mapping, namedtuple
from copy import deepcopy
from types import ModuleType
from kombu.utils.url import maybe_sanitize_url
from celery.exceptions import ImproperlyConfigured
from celery.five import items, keys, string_t, values
from celery.platforms import pyimplementation
from celery.utils.collections import ConfigurationView
from celery.utils.text import pretty
from celery.utils.imports import import_from_cwd, symbol_by_name, qualname
from .defaults import (
_TO_NEW_KEY, _TO_OLD_KEY, _OLD_DEFAULTS, _OLD_SETTING_KEYS,
DEFAULTS, SETTING_KEYS, find,
)
__all__ = [
'Settings', 'appstr', 'bugreport',
'filter_hidden_settings', 'find_app',
]
#: Format used to generate bug-report information.
BUGREPORT_INFO = """
software -> celery:{celery_v} kombu:{kombu_v} py:{py_v}
billiard:{billiard_v} {driver_v}
platform -> system:{system} arch:{arch} imp:{py_i}
loader -> {loader}
settings -> transport:{transport} results:{results}
{human_settings}
"""
HIDDEN_SETTINGS = re.compile(
'API|TOKEN|KEY|SECRET|PASS|PROFANITIES_LIST|SIGNATURE|DATABASE',
re.IGNORECASE,
)
E_MIX_OLD_INTO_NEW = """
Cannot mix new and old setting keys, please rename the
following settings to the new format:
{renames}
"""
E_MIX_NEW_INTO_OLD = """
Cannot mix new setting names with old setting names, please
rename the following settings to use the old format:
{renames}
Or change all of the settings to use the new format :)
"""
FMT_REPLACE_SETTING = '{replace:<36} -> {with_}'
[docs]def appstr(app):
"""String used in __repr__ etc, to id app instances."""
return '{0}:{1:#x}'.format(app.main or '__main__', id(app))
[docs]class Settings(ConfigurationView):
"""Celery settings object.
.. seealso:
:ref:`configuration` for a full list of configuration keys.
"""
@property
def broker_read_url(self):
return (
os.environ.get('CELERY_BROKER_READ_URL') or
self.get('broker_read_url') or
self.broker_url
)
@property
def broker_write_url(self):
return (
os.environ.get('CELERY_BROKER_WRITE_URL') or
self.get('broker_write_url') or
self.broker_url
)
@property
def broker_url(self):
return (
os.environ.get('CELERY_BROKER_URL') or
self.first('broker_url', 'broker_host')
)
@property
def task_default_exchange(self):
return self.first(
'task_default_exchange',
'task_default_queue',
)
@property
def task_default_routing_key(self):
return self.first(
'task_default_routing_key',
'task_default_queue',
)
@property
def timezone(self):
# this way we also support django's time zone.
return self.first('timezone', 'time_zone')
[docs] def without_defaults(self):
"""Return the current configuration, but without defaults."""
# the last stash is the default settings, so just skip that
return Settings({}, self.maps[:-1])
[docs] def find_option(self, name, namespace=''):
"""Search for option by name.
Example:
>>> from proj.celery import app
>>> app.conf.find_option('disable_rate_limits')
('worker', 'prefetch_multiplier',
<Option: type->bool default->False>))
Arguments:
name (str): Name of option, cannot be partial.
namespace (str): Preferred name-space (``None`` by default).
Returns:
Tuple: of ``(namespace, key, type)``.
"""
return find(name, namespace)
[docs] def find_value_for_key(self, name, namespace='celery'):
"""Shortcut to ``get_by_parts(*find_option(name)[:-1])``."""
return self.get_by_parts(*self.find_option(name, namespace)[:-1])
[docs] def get_by_parts(self, *parts):
"""Return the current value for setting specified as a path.
Example:
>>> from proj.celery import app
>>> app.conf.get_by_parts('worker', 'disable_rate_limits')
False
"""
return self['_'.join(part for part in parts if part)]
[docs] def finalize(self):
# See PendingConfiguration in celery/app/base.py
# first access will read actual configuration.
try:
self['__bogus__']
except KeyError:
pass
return self
[docs] def table(self, with_defaults=False, censored=True):
filt = filter_hidden_settings if censored else lambda v: v
dict_members = dir(dict)
self.finalize()
return filt({
k: v for k, v in items(
self if with_defaults else self.without_defaults())
if not k.startswith('_') and k not in dict_members
})
[docs] def humanize(self, with_defaults=False, censored=True):
"""Return a human readable text showing configuration changes."""
return '\n'.join(
'{0}: {1}'.format(key, pretty(value, width=50))
for key, value in items(self.table(with_defaults, censored)))
def _new_key_to_old(key, convert=_TO_OLD_KEY.get):
return convert(key, key)
def _old_key_to_new(key, convert=_TO_NEW_KEY.get):
return convert(key, key)
_settings_info_t = namedtuple('settings_info_t', (
'defaults', 'convert', 'key_t', 'mix_error',
))
_settings_info = _settings_info_t(
DEFAULTS, _TO_NEW_KEY, _old_key_to_new, E_MIX_OLD_INTO_NEW,
)
_old_settings_info = _settings_info_t(
_OLD_DEFAULTS, _TO_OLD_KEY, _new_key_to_old, E_MIX_NEW_INTO_OLD,
)
def detect_settings(conf, preconf={}, ignore_keys=set(), prefix=None,
all_keys=SETTING_KEYS, old_keys=_OLD_SETTING_KEYS):
source = conf
if conf is None:
source, conf = preconf, {}
have = set(keys(source)) - ignore_keys
is_in_new = have.intersection(all_keys)
is_in_old = have.intersection(old_keys)
info = None
if is_in_new:
# have new setting names
info, left = _settings_info, is_in_old
if is_in_old and len(is_in_old) > len(is_in_new):
# Majority of the settings are old.
info, left = _old_settings_info, is_in_new
if is_in_old:
# have old setting names, or a majority of the names are old.
if not info:
info, left = _old_settings_info, is_in_new
if is_in_new and len(is_in_new) > len(is_in_old):
# Majority of the settings are new
info, left = _settings_info, is_in_old
else:
# no settings, just use new format.
info, left = _settings_info, is_in_old
if prefix:
# always use new format if prefix is used.
info, left = _settings_info, set()
# only raise error for keys that the user didn't provide two keys
# for (e.g., both ``result_expires`` and ``CELERY_TASK_RESULT_EXPIRES``).
really_left = {key for key in left if info.convert[key] not in have}
if really_left:
# user is mixing old/new, or new/old settings, give renaming
# suggestions.
raise ImproperlyConfigured(info.mix_error.format(renames='\n'.join(
FMT_REPLACE_SETTING.format(replace=key, with_=info.convert[key])
for key in sorted(really_left)
)))
preconf = {info.convert.get(k, k): v for k, v in items(preconf)}
defaults = dict(deepcopy(info.defaults), **preconf)
return Settings(
preconf, [conf, defaults],
(_old_key_to_new, _new_key_to_old),
prefix=prefix,
)
class AppPickler(object):
"""Old application pickler/unpickler (< 3.1)."""
def __call__(self, cls, *args):
kwargs = self.build_kwargs(*args)
app = self.construct(cls, **kwargs)
self.prepare(app, **kwargs)
return app
def prepare(self, app, **kwargs):
app.conf.update(kwargs['changes'])
def build_kwargs(self, *args):
return self.build_standard_kwargs(*args)
def build_standard_kwargs(self, main, changes, loader, backend, amqp,
events, log, control, accept_magic_kwargs,
config_source=None):
return dict(main=main, loader=loader, backend=backend, amqp=amqp,
changes=changes, events=events, log=log, control=control,
set_as_current=False,
config_source=config_source)
def construct(self, cls, **kwargs):
return cls(**kwargs)
def _unpickle_app(cls, pickler, *args):
"""Rebuild app for versions 2.5+."""
return pickler()(cls, *args)
def _unpickle_app_v2(cls, kwargs):
"""Rebuild app for versions 3.1+."""
kwargs['set_as_current'] = False
return cls(**kwargs)
[docs]def bugreport(app):
"""Return a string containing information useful in bug-reports."""
import billiard
import celery
import kombu
try:
conn = app.connection()
driver_v = '{0}:{1}'.format(conn.transport.driver_name,
conn.transport.driver_version())
transport = conn.transport_cls
except Exception: # pylint: disable=broad-except
transport = driver_v = ''
return BUGREPORT_INFO.format(
system=_platform.system(),
arch=', '.join(x for x in _platform.architecture() if x),
py_i=pyimplementation(),
celery_v=celery.VERSION_BANNER,
kombu_v=kombu.__version__,
billiard_v=billiard.__version__,
py_v=_platform.python_version(),
driver_v=driver_v,
transport=transport,
results=maybe_sanitize_url(app.conf.result_backend or 'disabled'),
human_settings=app.conf.humanize(),
loader=qualname(app.loader.__class__),
)
[docs]def find_app(app, symbol_by_name=symbol_by_name, imp=import_from_cwd):
"""Find app by name."""
from .base import Celery
try:
sym = symbol_by_name(app, imp=imp)
except AttributeError:
# last part was not an attribute, but a module
sym = imp(app)
if isinstance(sym, ModuleType) and ':' not in app:
try:
found = sym.app
if isinstance(found, ModuleType):
raise AttributeError()
except AttributeError:
try:
found = sym.celery
if isinstance(found, ModuleType):
raise AttributeError()
except AttributeError:
if getattr(sym, '__path__', None):
try:
return find_app(
'{0}.celery'.format(app),
symbol_by_name=symbol_by_name, imp=imp,
)
except ImportError:
pass
for suspect in values(vars(sym)):
if isinstance(suspect, Celery):
return suspect
raise
else:
return found
else:
return found
return sym