This document describes the current stable version of Celery (5.4). For development docs, go here.

Source code for

"""Event receiver implementation."""
import time
from operator import itemgetter

from kombu import Queue
from kombu.connection import maybe_channel
from kombu.mixins import ConsumerMixin

from celery import uuid
from import app_or_default
from celery.utils.time import adjust_timestamp

from .event import get_exchange

__all__ = ('EventReceiver',)


_TZGETTER = itemgetter('utcoffset', 'timestamp')

[docs] class EventReceiver(ConsumerMixin): """Capture events. Arguments: connection (kombu.Connection): Connection to the broker. handlers (Mapping[Callable]): Event handlers. This is a map of event type names and their handlers. The special handler `"*"` captures all events that don't have a handler. """ app = None def __init__(self, channel, handlers=None, routing_key='#', node_id=None, app=None, queue_prefix=None, accept=None, queue_ttl=None, queue_expires=None): = app_or_default(app or = maybe_channel(channel) self.handlers = {} if handlers is None else handlers self.routing_key = routing_key self.node_id = node_id or uuid() self.queue_prefix = queue_prefix or = get_exchange( self.connection or, if queue_ttl is None: queue_ttl = if queue_expires is None: queue_expires = self.queue = Queue( '.'.join([self.queue_prefix, self.node_id]),, routing_key=self.routing_key, auto_delete=True, durable=False, message_ttl=queue_ttl, expires=queue_expires, ) self.clock = self.adjust_clock = self.clock.adjust self.forward_clock = self.clock.forward if accept is None: accept = {, 'json'} self.accept = accept
[docs] def process(self, type, event): """Process event by dispatching to configured handler.""" handler = self.handlers.get(type) or self.handlers.get('*') handler and handler(event)
[docs] def get_consumers(self, Consumer, channel): return [Consumer(queues=[self.queue], callbacks=[self._receive], no_ack=True, accept=self.accept)]
[docs] def on_consume_ready(self, connection, channel, consumers, wakeup=True, **kwargs): if wakeup: self.wakeup_workers(channel=channel)
[docs] def itercapture(self, limit=None, timeout=None, wakeup=True): return self.consume(limit=limit, timeout=timeout, wakeup=wakeup)
[docs] def capture(self, limit=None, timeout=None, wakeup=True): """Open up a consumer capturing events. This has to run in the main process, and it will never stop unless :attr:`EventDispatcher.should_stop` is set to True, or forced via :exc:`KeyboardInterrupt` or :exc:`SystemExit`. """ for _ in self.consume(limit=limit, timeout=timeout, wakeup=wakeup): pass
[docs] def wakeup_workers(self, channel=None):'heartbeat', connection=self.connection, channel=channel)
[docs] def event_from_message(self, body, localize=True, now=time.time, tzfields=_TZGETTER, adjust_timestamp=adjust_timestamp, CLIENT_CLOCK_SKEW=CLIENT_CLOCK_SKEW): type = body['type'] if type == 'task-sent': # clients never sync so cannot use their clock value _c = body['clock'] = (self.clock.value or 1) + CLIENT_CLOCK_SKEW self.adjust_clock(_c) else: try: clock = body['clock'] except KeyError: body['clock'] = self.forward_clock() else: self.adjust_clock(clock) if localize: try: offset, timestamp = tzfields(body) except KeyError: pass else: body['timestamp'] = adjust_timestamp(timestamp, offset) body['local_received'] = now() return type, body
def _receive(self, body, message, list=list, isinstance=isinstance): if isinstance(body, list): # celery 4.0+: List of events process, from_message = self.process, self.event_from_message [process(*from_message(event)) for event in body] else: self.process(*self.event_from_message(body)) @property def connection(self): return if else None