This document describes the current stable version of Celery (4.2). For development docs, go here.

Source code for celery.utils.collections

# -*- coding: utf-8 -*-
"""Custom maps, sets, sequences, and other data structures."""
from __future__ import absolute_import, unicode_literals

import sys
from collections import Callable, Mapping, MutableMapping, MutableSet
from collections import OrderedDict as _OrderedDict
from collections import Sequence, deque
from heapq import heapify, heappop, heappush
from itertools import chain, count

from celery.five import (PY3, Empty, items, keys, monotonic,
                         python_2_unicode_compatible, values)

from .functional import first, uniq
from .text import match_case

try:
    # pypy: dicts are ordered in recent versions
    from __pypy__ import reversed_dict as _dict_is_ordered
except ImportError:
    # ``__pypy__.reversed_dict`` is unavailable: use a falsy marker so the
    # ``if _dict_is_ordered:`` checks below pick the generic code paths.
    _dict_is_ordered = None

try:
    from django.utils.functional import LazyObject, LazySettings
except ImportError:
    # The import failed: define placeholder types so that isinstance()
    # checks against these names remain valid.
    class LazyObject(object):  # noqa
        pass
    LazySettings = LazyObject  # noqa

# Names exported by a star-import of this module.
__all__ = (
    'AttributeDictMixin', 'AttributeDict', 'BufferMap', 'ChainMap',
    'ConfigurationView', 'DictAttribute', 'Evictable',
    'LimitedSet', 'Messagebuffer', 'OrderedDict',
    'force_mapping', 'lpmerge',
)

# Template used by ``LimitedSet.__repr__``: positional argument 0 is the
# set itself, ``name`` and ``size`` are passed as keywords.
REPR_LIMITED_SET = """\
<{name}({size}): maxlen={0.maxlen}, expires={0.expires}, minlen={0.minlen}>\
"""


def force_mapping(m):
    # type: (Any) -> Mapping
    """Return ``m`` as a mapping, wrapping it when it is not one already.

    Lazy objects (:class:`LazyObject` / :class:`LazySettings`) are first
    replaced by the object stored in their ``_wrapped`` attribute.  A value
    that already is a :class:`Mapping` is returned unchanged; anything else
    is wrapped in :class:`DictAttribute`.
    """
    unwrapped = m._wrapped if isinstance(m, (LazyObject, LazySettings)) else m
    if isinstance(unwrapped, Mapping):
        return unwrapped
    return DictAttribute(unwrapped)
def lpmerge(L, R):
    # type: (Mapping, Mapping) -> Mapping
    """In place left precedent dictionary merge.

    Every item of ``R`` is copied into ``L``, except items whose value is
    :const:`None`: for those keys ``L`` keeps whatever it already holds.

    Arguments:
        L (Mapping): Mapping that is updated in place.
        R (Mapping): Mapping providing the values to merge in.

    Returns:
        Mapping: ``L`` itself (the same object, mutated).
    """
    setitem = L.__setitem__
    # A plain loop instead of a list comprehension: the comprehension was
    # only evaluated for its side effects and built a throwaway list.
    for key, value in items(R):
        if value is not None:
            setitem(key, value)
    return L
class OrderedDict(_OrderedDict):
    """Dict where insertion order matters.

    Adds ``_LRUkey()`` (return the first key in iteration order) and, when
    the base class lacks it, a ``move_to_end()`` implementation.  Which
    variant is defined is decided once, at class creation time.
    """

    if PY3:  # pragma: no cover
        def _LRUkey(self):
            # type: () -> Any
            # return value of od.keys does not support __next__,
            # but this version will also not create a copy of the list.
            return next(iter(keys(self)))
    else:
        if _dict_is_ordered:  # pragma: no cover
            def _LRUkey(self):
                # type: () -> Any
                # iterkeys is iterable.
                return next(self.iterkeys())
        else:
            def _LRUkey(self):
                # type: () -> Any
                # Reads the base class' private linked list: root[1] is
                # the first link, and index 2 of a link holds its key.
                return self._OrderedDict__root[1][2]

    if not hasattr(_OrderedDict, 'move_to_end'):
        if _dict_is_ordered:  # pragma: no cover

            def move_to_end(self, key, last=True):
                # type: (Any, bool) -> None
                """Move ``key`` to the end; ``last=False`` is unsupported.

                Raises:
                    NotImplementedError: if ``last`` is false.
                """
                if not last:
                    # we don't use this argument, and the only way to
                    # implement this on PyPy seems to be O(n): creating a
                    # copy with the order changed, so we just raise.
                    # (The message names the unsupported mode, which is
                    # last=False -- it previously said last=True.)
                    raise NotImplementedError('no last=False on PyPy')
                self[key] = self.pop(key)

        else:

            def move_to_end(self, key, last=True):
                # type: (Any, bool) -> None
                """Move ``key`` to the end (or start if ``last`` is false).

                Operates directly on the private doubly linked list of
                the base class, where a link is ``[prev, next, ...]``.
                """
                link = self._OrderedDict__map[key]
                # Unlink the node from its current position.
                link_prev = link[0]
                link_next = link[1]
                link_prev[1] = link_next
                link_next[0] = link_prev
                root = self._OrderedDict__root
                if last:
                    # Re-insert between the current last node and root.
                    last = root[0]
                    link[0] = last
                    link[1] = root
                    last[1] = root[0] = link
                else:
                    # Re-insert between root and the current first node.
                    first_node = root[1]
                    link[0] = root
                    link[1] = first_node
                    root[1] = first_node[0] = link
class AttributeDictMixin(object):
    """Mixin exposing the keys of a mapping as attributes.

    Reading ``d.key`` looks up ``d[key]``, and assigning ``d.key = value``
    stores ``d[key] = value``.
    """

    def __getattr__(self, k):
        # type: (str) -> Any
        """Return ``self[k]``.

        Raises:
            AttributeError: if ``k`` is not a key of the mapping.
        """
        try:
            found = self[k]
        except KeyError:
            owner = type(self).__name__
            raise AttributeError(
                '{0!r} object has no attribute {1!r}'.format(owner, k))
        return found

    def __setattr__(self, key, value):
        # type: (str, Any) -> None
        """Store ``value`` as item ``key`` of the mapping."""
        self[key] = value
class AttributeDict(dict, AttributeDictMixin):
    """Dict subclass with attribute access.

    Combines :class:`dict` with :class:`AttributeDictMixin`, so items can
    be read and written as attributes (``d.key`` for ``d['key']``).
    """
class DictAttribute(object):
    """Mapping-style facade over the attributes of an object.

    `obj[k] -> obj.k`
    `obj[k] = val -> obj.k = val`

    Keys are the names reported by ``dir()`` on the wrapped object.
    """

    obj = None

    def __init__(self, obj):
        # type: (Any) -> None
        # Go through ``object`` directly: our own __setattr__ would set
        # the attribute on the wrapped object instead of on this wrapper.
        object.__setattr__(self, 'obj', obj)

    def __getattr__(self, key):
        # type: (Any) -> Any
        """Forward attribute reads to the wrapped object."""
        return getattr(self.obj, key)

    def __setattr__(self, key, value):
        # type: (Any, Any) -> None
        """Forward attribute writes to the wrapped object."""
        return setattr(self.obj, key, value)

    def get(self, key, default=None):
        # type: (Any, Any) -> Any
        """Return ``self[key]``, or ``default`` when the key is missing."""
        try:
            value = self[key]
        except KeyError:
            return default
        return value

    def setdefault(self, key, default=None):
        # type: (Any, Any) -> None
        """Set ``key`` to ``default`` unless the key already exists."""
        if key in self:
            return
        self[key] = default

    def __getitem__(self, key):
        # type: (Any) -> Any
        try:
            value = getattr(self.obj, key)
        except AttributeError:
            raise KeyError(key)
        return value

    def __setitem__(self, key, value):
        # type: (Any, Any) -> Any
        setattr(self.obj, key, value)

    def __contains__(self, key):
        # type: (Any) -> bool
        return hasattr(self.obj, key)

    def _iterate_keys(self):
        # type: () -> Iterable
        """Iterate over the attribute names of the wrapped object."""
        names = dir(self.obj)
        return iter(names)
    iterkeys = _iterate_keys

    def __iter__(self):
        # type: () -> Iterable
        return self._iterate_keys()

    def _iterate_items(self):
        # type: () -> Iterable
        """Yield ``(name, value)`` for every attribute name."""
        for name in self._iterate_keys():
            yield name, getattr(self.obj, name)
    iteritems = _iterate_items

    def _iterate_values(self):
        # type: () -> Iterable
        """Yield the value of every attribute name."""
        for name in self._iterate_keys():
            yield getattr(self.obj, name)
    itervalues = _iterate_values

    if sys.version_info[0] == 3:  # pragma: no cover
        # Python 3: the mapping methods are the lazy iterators.
        items = _iterate_items
        keys = _iterate_keys
        values = _iterate_values
    else:
        # Python 2: the mapping methods return lists.

        def keys(self):
            # type: () -> List[Any]
            return list(self)

        def items(self):
            # type: () -> List[Tuple[Any, Any]]
            return list(self._iterate_items())

        def values(self):
            # type: () -> List[Any]
            return list(self._iterate_values())


MutableMapping.register(DictAttribute)  # noqa: E305
class ChainMap(MutableMapping):
    """Key lookup on a sequence of maps.

    Lookups search ``maps`` in order and return the first hit; writes and
    deletions only touch the first map (``changes``).  An optional
    ``key_t`` callable (keyword argument) transforms every key before it
    is used for item access and membership tests.
    """

    key_t = None
    changes = None
    defaults = None
    maps = None

    def __init__(self, *maps, **kwargs):
        # type: (*Mapping, **Any) -> None
        maps = list(maps or [{}])
        # Written through __dict__ so subclasses that redefine
        # __setattr__ do not intercept these assignments.
        self.__dict__.update(
            key_t=kwargs.get('key_t'),
            maps=maps,
            changes=maps[0],
            defaults=maps[1:],
        )

    def add_defaults(self, d):
        # type: (Mapping) -> None
        """Insert ``d`` as the first default (searched after changes)."""
        d = force_mapping(d)
        # ``defaults`` is a separate list from ``maps``: update both.
        self.defaults.insert(0, d)
        self.maps.insert(1, d)

    def pop(self, key, *default):
        # type: (Any, *Any) -> Any
        """Remove ``key`` from the first map and return its value.

        Raises:
            KeyError: if the key is missing and no default was given.
        """
        try:
            return self.maps[0].pop(key, *default)
        except KeyError:
            raise KeyError(
                'Key not found in the first mapping: {!r}'.format(key))

    def __missing__(self, key):
        # type: (Any) -> Any
        raise KeyError(key)

    def _key(self, key):
        # type: (Any) -> Any
        """Apply ``key_t`` to ``key`` when a transform is configured."""
        return self.key_t(key) if self.key_t is not None else key

    def __getitem__(self, key):
        # type: (Any) -> Any
        _key = self._key(key)
        for mapping in self.maps:
            try:
                return mapping[_key]
            except KeyError:
                pass
        return self.__missing__(key)

    def __setitem__(self, key, value):
        # type: (Any, Any) -> None
        self.changes[self._key(key)] = value

    def __delitem__(self, key):
        # type: (Any) -> None
        try:
            del self.changes[self._key(key)]
        except KeyError:
            raise KeyError('Key not found in first mapping: {0!r}'.format(key))

    def clear(self):
        # type: () -> None
        """Remove everything from the first map; other maps are kept."""
        self.changes.clear()

    def get(self, key, default=None):
        # type: (Any, Any) -> Any
        """Return ``self[key]``, or ``default`` when the key is missing."""
        try:
            # Pass the raw key: __getitem__ applies ``key_t`` itself.
            # Transforming here as well applied ``key_t`` twice, so
            # ``get(k)`` could disagree with ``self[k]``.
            return self[key]
        except KeyError:
            return default

    def __len__(self):
        # type: () -> int
        return len(set().union(*self.maps))

    def __iter__(self):
        return self._iterate_keys()

    def __contains__(self, key):
        # type: (Any) -> bool
        key = self._key(key)
        return any(key in m for m in self.maps)

    def __bool__(self):
        # type: () -> bool
        return any(self.maps)
    __nonzero__ = __bool__  # Py2

    def setdefault(self, key, default=None):
        # type: (Any, Any) -> None
        """Set ``key`` to ``default`` unless it exists in any map."""
        # Pass the raw key: __contains__ and __setitem__ each apply
        # ``key_t`` themselves, so it must not be applied here too.
        if key not in self:
            self[key] = default

    def update(self, *args, **kwargs):
        # type: (*Any, **Any) -> Any
        """Update the first map; arguments go to its ``update()`` as-is."""
        return self.changes.update(*args, **kwargs)

    def __repr__(self):
        # type: () -> str
        return '{0.__class__.__name__}({1})'.format(
            self, ', '.join(map(repr, self.maps)))

    @classmethod
    def fromkeys(cls, iterable, *args):
        # type: (type, Iterable, *Any) -> 'ChainMap'
        """Create a ChainMap with a single dict created from the iterable."""
        return cls(dict.fromkeys(iterable, *args))

    def copy(self):
        # type: () -> 'ChainMap'
        """Return a new instance with a copy of the first map.

        The remaining maps are passed on as the same objects.
        """
        return self.__class__(self.maps[0].copy(), *self.maps[1:])
    __copy__ = copy  # Py2

    def _iter(self, op):
        # type: (Callable) -> Iterable
        # defaults must be first in the stream, so values in
        # changes take precedence.
        # pylint: disable=bad-reversed-sequence
        #   Someone should teach pylint about properties.
        return chain(*[op(d) for d in reversed(self.maps)])

    def _iterate_keys(self):
        # type: () -> Iterable
        return uniq(self._iter(lambda d: d.keys()))
    iterkeys = _iterate_keys

    def _iterate_items(self):
        # type: () -> Iterable
        return ((key, self[key]) for key in self)
    iteritems = _iterate_items

    def _iterate_values(self):
        # type: () -> Iterable
        return (self[key] for key in self)
    itervalues = _iterate_values

    if sys.version_info[0] == 3:  # pragma: no cover
        # Python 3: the mapping methods are the lazy iterators.
        keys = _iterate_keys
        items = _iterate_items
        values = _iterate_values

    else:  # noqa
        # Python 2: the mapping methods return lists.

        def keys(self):
            # type: () -> List[Any]
            return list(self._iterate_keys())

        def items(self):
            # type: () -> List[Tuple[Any, Any]]
            return list(self._iterate_items())

        def values(self):
            # type: () -> List[Any]
            return list(self._iterate_values())
@python_2_unicode_compatible
class ConfigurationView(ChainMap, AttributeDictMixin):
    """A view over an applications configuration dictionaries.

    Custom (but older) version of :class:`collections.ChainMap`.

    If the key does not exist in ``changes``, the ``defaults``
    dictionaries are consulted.

    Arguments:
        changes (Mapping): Map of configuration changes.
        defaults (List[Mapping]): List of dictionaries containing
            the default configuration.
        keys (List[Callable]): Optional callables; each is applied to a
            looked-up key to produce one more key to try.
        prefix (str): Optional key prefix.  It is normalized to end in
            exactly one underscore, and lookups try the prefixed key
            before the key as given.
    """

    def __init__(self, changes, defaults=None, keys=None, prefix=None):
        # type: (Mapping, Mapping, List[str], str) -> None
        defaults = [] if defaults is None else defaults
        super(ConfigurationView, self).__init__(changes, *defaults)
        # Written through __dict__: __setattr__ (from AttributeDictMixin)
        # would store these as configuration items instead.
        self.__dict__.update(
            prefix=prefix.rstrip('_') + '_' if prefix else prefix,
            _keys=keys,
        )

    def _to_keys(self, key):
        # type: (str) -> Sequence[str]
        """Return the candidate keys for ``key`` as a tuple.

        With a prefix this is ``(prefixed_key, key)``, otherwise ``(key,)``.
        """
        prefix = self.prefix
        if prefix:
            pkey = prefix + key if not key.startswith(prefix) else key
            return match_case(pkey, prefix), key
        return key,

    def __getitem__(self, key):
        # type: (str) -> Any
        keys = self._to_keys(key)
        getitem = super(ConfigurationView, self).__getitem__
        for k in keys + (
                tuple(f(key) for f in self._keys) if self._keys else ()):
            try:
                return getitem(k)
            except KeyError:
                pass
        try:
            # support subclasses implementing __missing__
            return self.__missing__(key)
        except KeyError:
            if len(keys) > 1:
                # keys is (prefixed_key, key).  The message used index 0
                # for both placeholders, so it showed the prefixed key
                # twice and never the key that was asked for.
                prefixed, plain = keys
                raise KeyError(
                    'Key not found: {0!r} (with prefix: {1!r})'.format(
                        plain, prefixed))
            raise

    def __setitem__(self, key, value):
        # type: (str, Any) -> Any
        self.changes[self._key(key)] = value

    def first(self, *keys):
        # type: (*str) -> Any
        """Look up each of ``keys`` with :meth:`get` and pick one result.

        The selection is delegated to the ``first`` helper, called with
        :const:`None` as its first argument.
        """
        return first(None, (self.get(key) for key in keys))

    def get(self, key, default=None):
        # type: (str, Any) -> Any
        """Return ``self[key]``, or ``default`` when the key is missing."""
        try:
            return self[key]
        except KeyError:
            return default

    def clear(self):
        # type: () -> None
        """Remove all changes, but keep defaults."""
        self.changes.clear()

    def __contains__(self, key):
        # type: (str) -> bool
        keys = self._to_keys(key)
        return any(any(k in m for k in keys) for m in self.maps)

    def swap_with(self, other):
        # type: (ConfigurationView) -> None
        """Make this view use the maps, ``key_t`` and prefix of ``other``.

        The ``changes`` and ``defaults`` objects are shared with ``other``,
        not copied.
        """
        changes = other.__dict__['changes']
        defaults = other.__dict__['defaults']
        self.__dict__.update(
            changes=changes,
            defaults=defaults,
            key_t=other.__dict__['key_t'],
            prefix=other.__dict__['prefix'],
            maps=[changes] + defaults
        )
@python_2_unicode_compatible
class LimitedSet(object):
    """Kind-of Set (or priority queue) with limitations.

    Good for when you need to test for membership (`a in set`),
    but the set should not grow unbounded.

    ``maxlen`` is enforced at all times, so if the limit is reached
    we'll also remove non-expired items.

    You can also configure ``minlen``: this is the minimal residual size
    of the set.

    All arguments are optional, and no limits are enabled by default.

    Arguments:
        maxlen (int): Optional max number of items.
            Adding more items than ``maxlen`` will result in immediate
            removal of items sorted by oldest insertion time.

        expires (float): TTL for all items.
            Expired items are purged as keys are inserted.

        minlen (int): Minimal residual size of this set.
            .. versionadded:: 4.0

            Value must be less than ``maxlen`` if both are configured.

            Older expired items will be deleted, only after the set
            exceeds ``minlen`` number of items.

        data (Sequence): Initial data to initialize set with.
            Can be an iterable of ``(key, value)`` pairs,
            a dict (``{key: insertion_time}``), or another instance
            of :class:`LimitedSet`.

    Example:
        >>> s = LimitedSet(maxlen=50000, expires=3600, minlen=4000)
        >>> for i in range(60000):
        ...     s.add(i)
        ...     s.add(str(i))
        ...
        >>> 57000 in s  # last 50k inserted values are kept
        True
        >>> '10' in s  # '10' did expire and was purged from set.
        False
        >>> len(s)  # maxlen is reached
        50000
        >>> s.purge(now=monotonic() + 7200)  # clock + 2 hours
        >>> len(s)  # now only minlen items are cached
        4000
        >>> 57000 in s  # even this item is gone now
        False
    """

    # ``discard`` rebuilds the heap once it holds at least this many
    # percent more entries than ``_data`` (see ``_heap_overload``).
    max_heap_percent_overload = 15

    def __init__(self, maxlen=0, expires=0, data=None, minlen=0):
        # type: (int, float, Mapping, int) -> None
        # ``None`` is accepted for each limit and treated as 0 (disabled).
        self.maxlen = 0 if maxlen is None else maxlen
        self.minlen = 0 if minlen is None else minlen
        self.expires = 0 if expires is None else expires
        # item -> (insertion_time, item)
        self._data = {}
        # heap of the same (insertion_time, item) entries; it may also
        # hold entries for items no longer in ``_data``.
        self._heap = []

        if data:
            # import items from data
            self.update(data)

        # Note: the limits are validated after the initial data is loaded.
        if not self.maxlen >= self.minlen >= 0:
            raise ValueError(
                'minlen must be a positive number, less or equal to maxlen.')
        if self.expires < 0:
            raise ValueError('expires cannot be negative!')

    def _refresh_heap(self):
        # type: () -> None
        """Time consuming recreating of heap. Don't run this too often."""
        self._heap[:] = [entry for entry in values(self._data)]
        heapify(self._heap)

    def _maybe_refresh_heap(self):
        # type: () -> None
        # Rebuild only when the heap has grown too far beyond ``_data``.
        if self._heap_overload >= self.max_heap_percent_overload:
            self._refresh_heap()

    def clear(self):
        # type: () -> None
        """Clear all data, start from scratch again."""
        self._data.clear()
        self._heap[:] = []

    def add(self, item, now=None):
        # type: (Any, float) -> None
        """Add a new item, or reset the expiry time of an existing item."""
        # A falsy ``now`` (None or 0) is replaced by the current clock.
        now = now or monotonic()
        if item in self._data:
            self.discard(item)
        entry = (now, item)
        self._data[item] = entry
        heappush(self._heap, entry)
        if self.maxlen and len(self._data) >= self.maxlen:
            self.purge()

    def update(self, other):
        # type: (Iterable) -> None
        """Update this set from other LimitedSet, dict or iterable."""
        if not other:
            return
        if isinstance(other, LimitedSet):
            self._data.update(other._data)
            self._refresh_heap()
            self.purge()
        elif isinstance(other, dict):
            # revokes are sent as a dict
            for key, inserted in items(other):
                if isinstance(inserted, (tuple, list)):
                    # in case someone uses ._data directly for sending update
                    inserted = inserted[0]
                if not isinstance(inserted, float):
                    raise ValueError(
                        'Expecting float timestamp, got type '
                        '{0!r} with value: {1}'.format(
                            type(inserted), inserted))
                self.add(key, inserted)
        else:
            # XXX AVOID THIS, it could keep old data if more parties
            # exchange them all over and over again
            for obj in other:
                self.add(obj)

    def discard(self, item):
        # type: (Any) -> None
        # Remove the item from ``_data`` if present (a missing item is
        # ignored).  Its heap entry is left in place; the heap is only
        # rebuilt when it has become too large relative to ``_data``.
        self._data.pop(item, None)
        self._maybe_refresh_heap()
    pop_value = discard

    def purge(self, now=None):
        # type: (float) -> None
        """Check oldest items and remove them if needed.

        Arguments:
            now (float): Time of purging -- by default right now.
                This can be useful for unit testing.
        """
        now = now or monotonic()
        # ``now`` may also be given as a callable returning the time.
        now = now() if isinstance(now, Callable) else now
        if self.maxlen:
            while len(self._data) > self.maxlen:
                self.pop()
        # time based expiring:
        if self.expires:
            while len(self._data) > self.minlen >= 0:
                inserted_time, _ = self._heap[0]
                if inserted_time + self.expires > now:
                    break  # oldest item hasn't expired yet
                self.pop()

    def pop(self, default=None):
        # type: (Any) -> Any
        """Remove and return the oldest item, or :const:`None` when empty."""
        while self._heap:
            _, item = heappop(self._heap)
            try:
                self._data.pop(item)
            except KeyError:
                # heap entry for an item that was already removed: skip.
                pass
            else:
                # NOTE(review): entries are matched by item only, so a
                # leftover heap entry of a re-added item looks able to
                # remove the refreshed item here -- confirm if intended.
                return item
        return default

    def as_dict(self):
        # type: () -> Dict
        """Whole set as serializable dictionary.

        Example:
            >>> s = LimitedSet(maxlen=200)
            >>> r = LimitedSet(maxlen=200)
            >>> for i in range(500):
            ...     s.add(i)
            ...
            >>> r.update(s.as_dict())
            >>> r == s
            True
        """
        return {key: inserted for inserted, key in values(self._data)}

    def __eq__(self, other):
        # type: (Any) -> bool
        # Compares the ``_data`` dicts; ``other`` must have ``_data``.
        return self._data == other._data

    def __ne__(self, other):
        # type: (Any) -> bool
        return not self.__eq__(other)

    def __repr__(self):
        # type: () -> str
        return REPR_LIMITED_SET.format(
            self, name=type(self).__name__, size=len(self),
        )

    def __iter__(self):
        # type: () -> Iterable
        # Items ordered by their sorted (insertion_time, item) entries.
        return (i for _, i in sorted(values(self._data)))

    def __len__(self):
        # type: () -> int
        return len(self._data)

    def __contains__(self, key):
        # type: (Any) -> bool
        return key in self._data

    def __reduce__(self):
        # type: () -> Any
        # Positional arguments for __init__: maxlen, expires, data, minlen.
        return self.__class__, (
            self.maxlen, self.expires, self.as_dict(), self.minlen)

    def __bool__(self):
        # type: () -> bool
        return bool(self._data)
    __nonzero__ = __bool__  # Py2

    @property
    def _heap_overload(self):
        # type: () -> float
        """Compute how much is heap bigger than data [percents]."""
        # max(..., 1) avoids dividing by zero when the set is empty.
        return len(self._heap) * 100 / max(len(self._data), 1) - 100


MutableSet.register(LimitedSet)  # noqa: E305
class Evictable(object):
    """Mixin for classes supporting the ``evict`` method.

    The host class must provide ``maxsize``, ``_evictcount`` (current
    number of stored items) and ``_pop_to_evict()``, which removes one
    item and raises ``self.Empty`` when nothing is left to remove.
    """

    Empty = Empty

    def evict(self):
        # type: () -> None
        """Force evict until maxsize is enforced."""
        # ``count`` yields an endless sequence, so the loop in ``_evict``
        # only ends once ``_evict1`` signals that it is done.
        self._evict(range=count)

    def _evict(self, limit=100, range=range):
        # type: (int) -> None
        """Evict one item per step of ``range(limit)``.

        Stops early as soon as ``_evict1`` raises :exc:`IndexError`.
        """
        try:
            # A plain loop instead of a list comprehension: the list was
            # never used, and with an endless ``range`` it kept growing
            # for as long as items were being evicted.
            for _ in range(limit):
                self._evict1()
        except IndexError:
            pass

    def _evict1(self):
        # type: () -> None
        """Evict a single item.

        Raises:
            IndexError: if the size is within ``maxsize`` already, or
                there is nothing left to evict.
        """
        if self._evictcount <= self.maxsize:
            raise IndexError()
        try:
            self._pop_to_evict()
        except self.Empty:
            raise IndexError()
@python_2_unicode_compatible
class Messagebuffer(Evictable):
    """A buffer of pending messages.

    Items are appended on the right and taken from the left.  When
    ``maxsize`` is truthy, adding items triggers eviction.
    """

    Empty = Empty

    def __init__(self, maxsize, iterable=None, deque=deque):
        # type: (int, Iterable, Any) -> None
        self.maxsize = maxsize
        self.data = deque(iterable or [])
        # Bound methods of the underlying container, looked up once.
        self._append = self.data.append
        self._pop = self.data.popleft
        self._len = self.data.__len__
        self._extend = self.data.extend

    def put(self, item):
        # type: (Any) -> None
        """Append ``item``, then evict if a ``maxsize`` is configured."""
        self._append(item)
        if self.maxsize:
            self._evict()

    def extend(self, it):
        # type: (Iterable) -> None
        """Append all of ``it``, then evict if ``maxsize`` is configured."""
        self._extend(it)
        if self.maxsize:
            self._evict()

    def take(self, *default):
        # type: (*Any) -> Any
        """Remove and return the leftmost item.

        When the buffer is empty the first positional argument is
        returned, if one was given; otherwise ``self.Empty`` is raised.
        """
        try:
            return self._pop()
        except IndexError:
            if not default:
                raise self.Empty()
            return default[0]

    def _pop_to_evict(self):
        # type: () -> None
        return self.take()

    def __repr__(self):
        # type: () -> str
        cls_name = type(self).__name__
        return '<{0}: {1}/{2}>'.format(cls_name, len(self), self.maxsize)

    def __iter__(self):
        # type: () -> Iterable
        # Iterating consumes the buffer: every item is popped.
        while True:
            try:
                yield self._pop()
            except IndexError:
                return

    def __len__(self):
        # type: () -> int
        return self._len()

    def __contains__(self, item):
        # type: () -> bool
        return item in self.data

    def __reversed__(self):
        # type: () -> Iterable
        return reversed(self.data)

    def __getitem__(self, index):
        # type: (Any) -> Any
        return self.data[index]

    @property
    def _evictcount(self):
        # type: () -> int
        return len(self)


Sequence.register(Messagebuffer)  # noqa: E305
@python_2_unicode_compatible
class BufferMap(OrderedDict, Evictable):
    """Map of buffers.

    Maps keys to ``Buffer`` instances and keeps ``total``, a running
    count of the items put into and taken out of all buffers.  When
    ``maxsize`` is truthy, adding items triggers eviction, starting with
    the buffer of the first key in iteration order.

    Arguments:
        maxsize (int): Limit for ``total`` enforced by eviction.
        iterable (Iterable): Optional initial ``key -> buffer`` data,
            passed to ``update()``.
        bufmaxsize (int): ``maxsize`` given to every buffer created by
            this map.
    """

    Buffer = Messagebuffer
    Empty = Empty

    maxsize = None
    total = 0
    bufmaxsize = None

    def __init__(self, maxsize, iterable=None, bufmaxsize=1000):
        # type: (int, Iterable, int) -> None
        super(BufferMap, self).__init__()
        self.maxsize = maxsize
        # Honor the argument (it used to be ignored in favor of a
        # hard-coded 1000).
        self.bufmaxsize = bufmaxsize
        if iterable:
            self.update(iterable)
        # Sum the lengths of the buffers themselves.  Iterating items()
        # measured the (key, buffer) tuples, i.e. always 2 per entry.
        self.total = sum(len(buf) for buf in values(self))

    def put(self, key, item):
        # type: (Any, Any) -> None
        """Add ``item`` to the buffer of ``key``, creating it if needed."""
        self._get_or_create_buffer(key).put(item)
        self.total += 1
        self.move_to_end(key)   # least recently used.
        self.maxsize and self._evict()

    def extend(self, key, it):
        # type: (Any, Iterable) -> None
        """Add all items of ``it`` to the buffer of ``key``.

        ``it`` must support ``len()``, which is used to update ``total``.
        """
        self._get_or_create_buffer(key).extend(it)
        self.total += len(it)
        self.maxsize and self._evict()

    def take(self, key, *default):
        # type: (Any, *Any) -> Any
        """Take one item from the buffer of ``key``.

        When the key is unknown or its buffer is empty, the first
        positional default is returned if one was given; otherwise
        ``self.Empty`` is raised.
        """
        item, throw = None, False
        try:
            buf = self[key]
        except KeyError:
            throw = True
        else:
            try:
                item = buf.take()
                self.total -= 1
            except self.Empty:
                throw = True
            else:
                self.move_to_end(key)  # mark as LRU

        if throw:
            if default:
                return default[0]
            raise self.Empty()
        return item

    def _get_or_create_buffer(self, key):
        # type: (Any) -> Messagebuffer
        try:
            return self[key]
        except KeyError:
            buf = self[key] = self._new_buffer()
            return buf

    def _new_buffer(self):
        # type: () -> Messagebuffer
        return self.Buffer(maxsize=self.bufmaxsize)

    def _LRUpop(self, *default):
        # type: (*Any) -> Any
        return self[self._LRUkey()].take(*default)

    def _pop_to_evict(self):
        # type: () -> None
        # Try up to 100 buffers: empty ones are dropped from the mapping
        # until one item could actually be removed.
        for _ in range(100):
            key = self._LRUkey()
            buf = self[key]
            try:
                buf.take()
            except (IndexError, self.Empty):
                # buffer empty, remove it from mapping.
                self.pop(key)
            else:
                # we removed one item
                self.total -= 1
                # if buffer is empty now, remove it from mapping.
                if not len(buf):
                    self.pop(key)
                else:
                    # move to least recently used.
                    self.move_to_end(key)
                break

    def __repr__(self):
        # type: () -> str
        return '<{0}: {1}/{2}>'.format(
            type(self).__name__, self.total, self.maxsize,
        )

    @property
    def _evictcount(self):
        # type: () -> int
        return self.total