This document describes the current stable version of Celery (5.4). For development docs, go here.

Source code for kombu.asynchronous.timer

"""Timer scheduling Python callbacks."""

from __future__ import annotations

import heapq
import sys
from collections import namedtuple
from datetime import datetime
from functools import total_ordering
from time import monotonic
from time import time as _time
from typing import TYPE_CHECKING
from weakref import proxy as weakrefproxy

from vine.utils import wraps

from kombu.log import get_logger

if sys.version_info >= (3, 9):
    from zoneinfo import ZoneInfo
else:
    from backports.zoneinfo import ZoneInfo

if TYPE_CHECKING:
    from types import TracebackType

# Names exported by ``from kombu.asynchronous.timer import *``.
__all__ = ('Entry', 'Timer', 'to_timestamp')

logger = get_logger(__name__)

# Fallback for Timer.max_interval (seconds) when the caller gives none.
DEFAULT_MAX_INTERVAL = 2
# Timezone-aware Unix epoch (UTC); to_timestamp() subtracts it from datetimes.
EPOCH = datetime.fromtimestamp(0, ZoneInfo("UTC"))
# True on PyPy; Entry only declares __slots__ when this is False.
IS_PYPY = hasattr(sys, 'pypy_version_info')

# Item stored in the Timer heap. Being a tuple, items compare by eta first,
# then priority, then the entry itself.
scheduled = namedtuple('scheduled', ('eta', 'priority', 'entry'))


def to_timestamp(d, default_timezone=ZoneInfo("UTC"), time=monotonic):
    """Convert datetime to timestamp.

    If ``d`` is not a :class:`~datetime.datetime` it is treated as
    already being a timestamp and is returned unchanged.

    Arguments:
    ---------
        d (datetime.datetime | float): Value to convert.
        default_timezone (datetime.tzinfo): Timezone attached to ``d``
            when it is naive.
        time (Callable): Clock the result is expressed on
            (the monotonic clock by default).

    Returns
    -------
        float: Seconds on the ``time`` clock, never negative, or ``d``
            itself when it is not a datetime.
    """
    if not isinstance(d, datetime):
        return d

    aware = d if d.tzinfo is not None else d.replace(tzinfo=default_timezone)
    # Offset between wall-clock time and the target clock, so the
    # epoch-based value can be shifted onto that clock.
    clock_offset = _time() - time()
    since_epoch = (aware - EPOCH).total_seconds()
    return max(since_epoch - clock_offset, 0)
@total_ordering
class Entry:
    """Schedule Entry.

    Wraps a callable together with its arguments so that it can be
    placed in a :class:`Timer` and later invoked or canceled.

    Arguments:
    ---------
        fun (Callable): Function to call when the entry fires.
        args (Sequence): Positional arguments for ``fun``
            (an empty list when omitted or empty).
        kwargs (Mapping): Keyword arguments for ``fun``
            (an empty dict when omitted or empty).
    """

    if not IS_PYPY:  # pragma: no cover
        __slots__ = (
            'fun', 'args', 'kwargs', 'tref', 'canceled',
            '_last_run', '__weakref__',
        )

    def __init__(self, fun, args=None, kwargs=None):
        self.fun = fun
        self.args = args or []
        self.kwargs = kwargs or {}
        # Weak proxy to ourselves; cancel() sets the flag through it.
        self.tref = weakrefproxy(self)
        self._last_run = None
        self.canceled = False

    def __call__(self):
        """Call the wrapped function and return its result."""
        return self.fun(*self.args, **self.kwargs)

    def cancel(self):
        """Mark the entry as canceled so the timer skips it."""
        try:
            self.tref.canceled = True
        except ReferenceError:  # pragma: no cover
            # The proxied entry is already gone: nothing left to cancel.
            pass

    def __repr__(self):
        fun = self.fun
        # Callables such as functools.partial objects have no __name__;
        # fall back to their repr instead of raising from __repr__.
        name = getattr(fun, '__name__', None)
        if name is None:
            name = repr(fun)
        return '<TimerEntry: {}(*{!r}, **{!r})>'.format(
            name, self.args, self.kwargs)

    # must not use hash() to order entries
    def __lt__(self, other):
        # Arbitrary but consistent ordering, used as the final tie-breaker
        # when heap tuples compare equal on eta and priority.
        return id(self) < id(other)

    @property
    def cancelled(self):
        """Alias of :attr:`canceled` (alternative spelling)."""
        return self.canceled

    @cancelled.setter
    def cancelled(self, value):
        self.canceled = value
class Timer:
    """Async timer implementation.

    Entries are kept in a heap of ``scheduled(eta, priority, entry)``
    tuples. Iterating over the timer yields entries that are due, or
    the number of seconds the caller should wait before polling again.

    Arguments:
    ---------
        max_interval (float): Upper bound, in seconds, on the wait time
            yielded while iterating.  Defaults to ``DEFAULT_MAX_INTERVAL``.
        on_error (Callable): Optional callback receiving the exception
            raised by an entry or by an eta conversion.
        **kwargs: Accepted and ignored.
    """

    Entry = Entry

    on_error = None

    def __init__(self, max_interval=None, on_error=None, **kwargs):
        self.max_interval = float(max_interval or DEFAULT_MAX_INTERVAL)
        self.on_error = on_error or self.on_error
        self._queue = []

    def __enter__(self):
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None
    ) -> None:
        self.stop()

    def call_at(self, eta, fun, args=(), kwargs=None, priority=0):
        """Schedule ``fun(*args, **kwargs)`` to run at ``eta``."""
        kwargs = {} if not kwargs else kwargs
        return self.enter_at(self.Entry(fun, args, kwargs), eta, priority)

    def call_after(self, secs, fun, args=(), kwargs=None, priority=0):
        """Schedule ``fun(*args, **kwargs)`` to run ``secs`` seconds from now."""
        kwargs = {} if not kwargs else kwargs
        return self.enter_after(secs, self.Entry(fun, args, kwargs), priority)

    def call_repeatedly(self, secs, fun, args=(), kwargs=None, priority=0):
        """Schedule ``fun(*args, **kwargs)`` to run every ``secs`` seconds.

        Returns the entry; the repetition stops once it is canceled.
        """
        kwargs = {} if not kwargs else kwargs
        tref = self.Entry(fun, args, kwargs)

        @wraps(fun)
        def _reschedules(*args, **kwargs):
            last, now = tref._last_run, monotonic()
            lsince = (now - tref._last_run) if last else secs
            try:
                if lsince and lsince >= secs:
                    tref._last_run = now
                    return fun(*args, **kwargs)
            finally:
                # Re-enter the same entry unless it was canceled, even if
                # ``fun`` raised.
                if not tref.canceled:
                    last = tref._last_run
                    delay = secs - (now - last) if last else secs
                    self.enter_after(delay, tref, priority)

        tref.fun = _reschedules
        tref._last_run = None
        return self.enter_after(secs, tref, priority)

    def enter_at(self, entry, eta=None, priority=0, time=monotonic):
        """Enter function into the scheduler.

        Arguments:
        ---------
            entry (~kombu.asynchronous.timer.Entry): Item to enter.
            eta (datetime.datetime | float): Scheduled time.  A datetime
                is converted with :func:`to_timestamp`; any other value
                is used as-is as a timestamp.  Defaults to ``time()``.
            priority (int): Stored as the second field of the heap item,
                so it orders entries that share the same ``eta``.
            time (Callable): Clock used when ``eta`` is omitted.

        Returns
        -------
            The entry, or :const:`None` when converting ``eta`` failed
            and the error was passed to ``on_error``.

        Raises
        ------
            Exception: The conversion error, when no ``on_error``
                handler is set.
        """
        if eta is None:
            eta = time()
        if isinstance(eta, datetime):
            try:
                eta = to_timestamp(eta)
            except Exception as exc:
                if not self.handle_error(exc):
                    raise
                return
        return self._enter(eta, priority, entry)

    def enter_after(self, secs, entry, priority=0, time=monotonic):
        """Enter ``entry`` to run ``secs`` seconds after ``time()``."""
        return self.enter_at(entry, time() + float(secs), priority)

    def _enter(self, eta, priority, entry, push=heapq.heappush):
        push(self._queue, scheduled(eta, priority, entry))
        return entry

    def apply_entry(self, entry):
        """Call ``entry``; errors go to ``on_error`` or are logged."""
        try:
            entry()
        except Exception as exc:
            if not self.handle_error(exc):
                logger.error('Error in timer: %r', exc, exc_info=True)

    def handle_error(self, exc_info):
        """Pass ``exc_info`` to ``on_error``; return True if one is set."""
        if self.on_error:
            self.on_error(exc_info)
            return True

    def stop(self):
        pass

    def __iter__(self, min=min, nowfun=monotonic,
                 pop=heapq.heappop, push=heapq.heappush):
        """Iterate over schedule.

        This iterator yields a tuple of ``(wait_seconds, entry)``,
        where if entry is :const:`None` the caller should wait
        for ``wait_seconds`` until it polls the schedule again.
        ``(None, None)`` is yielded while the schedule is empty.
        """
        max_interval = self.max_interval
        queue = self._queue

        while True:
            if queue:
                head = queue[0]
                now, eta = nowfun(), head[0]

                if now < eta:
                    yield min(eta - now, max_interval), None
                else:
                    popped = pop(queue)

                    if popped is head:
                        entry = head[2]
                        # Canceled entries are dropped silently.
                        if not entry.canceled:
                            yield None, entry
                        continue
                    else:
                        # The head changed between the peek and the pop;
                        # put the item back and look again.
                        push(queue, popped)
            else:
                yield None, None

    def clear(self):
        self._queue[:] = []  # atomic, without creating a new list.

    def cancel(self, tref):
        tref.cancel()

    def __len__(self):
        return len(self._queue)

    def __bool__(self):
        # Always truthy, even when empty.  Without this, Python 3 would
        # fall back to __len__ and treat an empty timer as false.
        return True

    # Python 2 spelling of the truth hook, kept for existing references.
    __nonzero__ = __bool__

    @property
    def queue(self, _pop=heapq.heappop):
        """Snapshot of underlying datastructure."""
        # Pop from a copy so the result is in firing order and the real
        # heap is left untouched.
        events = list(self._queue)
        return [_pop(events) for _ in range(len(events))]

    @property
    def schedule(self):
        return self