This document describes the current stable version of Celery (4.2). For development docs, go here.

Source code for celery.loaders.base

# -*- coding: utf-8 -*-
"""Loader base class."""
from __future__ import absolute_import, unicode_literals

import importlib
import os
import re
import sys
from datetime import datetime

from kombu.utils import json
from kombu.utils.objects import cached_property

from celery import signals
from celery.five import reraise, string_t
from celery.utils.collections import DictAttribute, force_mapping
from celery.utils.functional import maybe_list
from celery.utils.imports import (NotAPackage, find_module, import_from_cwd,

__all__ = ('BaseLoader',)


Error: Module '{module}' doesn't exist, or it's not a valid \
Python module name.

Did you mean '{suggest}'?

unconfigured = object()

[docs]class BaseLoader(object): """Base class for loaders. Loaders handles, * Reading celery client/worker configurations. * What happens when a task starts? See :meth:`on_task_init`. * What happens when the worker starts? See :meth:`on_worker_init`. * What happens when the worker shuts down? See :meth:`on_worker_shutdown`. * What modules are imported to find tasks? """ builtin_modules = frozenset() configured = False override_backends = {} worker_initialized = False _conf = unconfigured def __init__(self, app, **kwargs): = app self.task_modules = set()
[docs] def now(self, utc=True): if utc: return datetime.utcnow()
[docs] def on_task_init(self, task_id, task):
"""Called before a task is executed."""
[docs] def on_process_cleanup(self):
"""Called after a task is executed."""
[docs] def on_worker_init(self):
"""Called when the worker (:program:`celery worker`) starts."""
[docs] def on_worker_shutdown(self):
"""Called when the worker (:program:`celery worker`) shuts down."""
[docs] def on_worker_process_init(self):
"""Called when a child process starts."""
[docs] def import_task_module(self, module): self.task_modules.add(module)
return self.import_from_cwd(module)
[docs] def import_module(self, module, package=None):
return importlib.import_module(module, package=package)
[docs] def import_from_cwd(self, module, imp=None, package=None): return import_from_cwd( module, self.import_module if imp is None else imp, package=package,
[docs] def import_default_modules(self): responses = signals.import_modules.send( # Prior to this point loggers are not yet set up properly, need to # check responses manually and reraised exceptions if any, otherwise # they'll be silenced, making it incredibly difficult to debug. for _, response in responses: if isinstance(response, Exception): raise response
return [self.import_task_module(m) for m in self.default_modules]
[docs] def init_worker(self): if not self.worker_initialized: self.worker_initialized = True self.import_default_modules()
[docs] def shutdown_worker(self):
[docs] def init_worker_process(self):
[docs] def config_from_object(self, obj, silent=False): if isinstance(obj, string_t): try: obj = self._smart_import(obj, imp=self.import_from_cwd) except (ImportError, AttributeError): if silent: return False raise self._conf = force_mapping(obj)
return True def _smart_import(self, path, imp=None): imp = self.import_module if imp is None else imp if ':' in path: # Path includes attribute so can just jump # here (e.g., ``os.path:abspath``). return symbol_by_name(path, imp=imp) # Not sure if path is just a module name or if it includes an # attribute name (e.g., ``os.path``, vs, ``os.path.abspath``). try: return imp(path) except ImportError: # Not a module name, so try module + attribute. return symbol_by_name(path, imp=imp) def _import_config_module(self, name): try: self.find_module(name) except NotAPackage: if name.endswith('.py'): reraise(NotAPackage, NotAPackage(CONFIG_WITH_SUFFIX.format( module=name, suggest=name[:-3])), sys.exc_info()[2]) reraise(NotAPackage, NotAPackage(CONFIG_INVALID_NAME.format( module=name)), sys.exc_info()[2]) else: return self.import_from_cwd(name)
[docs] def find_module(self, module):
return find_module(module)
[docs] def cmdline_config_parser( self, args, namespace='celery', re_type=re.compile(r'\((\w+)\)'), extra_types={'json': json.loads}, override_types={'tuple': 'json', 'list': 'json', 'dict': 'json'}): from import Option, NAMESPACES namespace = namespace and namespace.lower() typemap = dict(Option.typemap, **extra_types) def getarg(arg): """Parse single configuration from command-line.""" # ## find key/value # ns.key=value|ns_key=value (case insensitive) key, value = arg.split('=', 1) key = key.lower().replace('.', '_') # ## find name-space. # .key=value|_key=value expands to default name-space. if key[0] == '_': ns, key = namespace, key[1:] else: # find name-space part of key ns, key = key.split('_', 1) ns_key = (ns and ns + '_' or '') + key # (type)value makes cast to custom type. cast = re_type.match(value) if cast: type_ = cast.groups()[0] type_ = override_types.get(type_, type_) value = value[len(] value = typemap[type_](value) else: try: value = NAMESPACES[ns.lower()][key].to_python(value) except ValueError as exc: # display key name in error message. raise ValueError('{0!r}: {1}'.format(ns_key, exc)) return ns_key, value
return dict(getarg(arg) for arg in args)
[docs] def read_configuration(self, env='CELERY_CONFIG_MODULE'): try: custom_config = os.environ[env] except KeyError: pass else: if custom_config: usercfg = self._import_config_module(custom_config)
return DictAttribute(usercfg)
[docs] def autodiscover_tasks(self, packages, related_name='tasks'): self.task_modules.update( mod.__name__ for mod in autodiscover_tasks(packages or (),
related_name) if mod)
[docs] @cached_property def default_modules(self): return ( tuple(self.builtin_modules) + tuple(maybe_list( + tuple(maybe_list(
) @property def conf(self): """Loader configuration.""" if self._conf is unconfigured: self._conf = self.read_configuration()
return self._conf def autodiscover_tasks(packages, related_name='tasks'): global _RACE_PROTECTION if _RACE_PROTECTION: return () _RACE_PROTECTION = True try: return [find_related_module(pkg, related_name) for pkg in packages] finally: _RACE_PROTECTION = False def find_related_module(package, related_name): """Find module in package.""" # Django 1.7 allows for speciying a class name in INSTALLED_APPS. # (Issue #2248). try: importlib.import_module(package) except ImportError: package, _, _ = package.rpartition('.') if not package: raise try: return importlib.import_module('{0}.{1}'.format(package, related_name)) except ImportError: return