This document describes the current stable version of Celery (5.4). For development docs, go here.

Source code for kombu.utils.imports

"""Import related utilities."""

from __future__ import annotations

import importlib
import sys

from kombu.exceptions import reraise

[docs] def symbol_by_name(name, aliases=None, imp=None, package=None, sep='.', default=None, **kwargs): """Get symbol by qualified name. The name should be the full dot-separated path to the class:: modulename.ClassName Example:: celery.concurrency.processes.TaskPool ^- class name or using ':' to separate module and symbol:: celery.concurrency.processes:TaskPool If `aliases` is provided, a dict containing short name/long name mappings, the name is looked up in the aliases first. Examples -------- >>> symbol_by_name('celery.concurrency.processes.TaskPool') <class 'celery.concurrency.processes.TaskPool'> >>> symbol_by_name('default', { ... 'default': 'celery.concurrency.processes.TaskPool'}) <class 'celery.concurrency.processes.TaskPool'> # Does not try to look up non-string names. >>> from celery.concurrency.processes import TaskPool >>> symbol_by_name(TaskPool) is TaskPool True """ aliases = {} if not aliases else aliases if imp is None: imp = importlib.import_module if not isinstance(name, str): return name # already a class name = aliases.get(name) or name sep = ':' if ':' in name else sep module_name, _, cls_name = name.rpartition(sep) if not module_name: cls_name, module_name = None, package if package else cls_name try: try: module = imp(module_name, package=package, **kwargs) except ValueError as exc: reraise(ValueError, ValueError(f"Couldn't import {name!r}: {exc}"), sys.exc_info()[2]) return getattr(module, cls_name) if cls_name else module except (ImportError, AttributeError): if default is None: raise return default