This document describes the current stable version of Celery (5.2). For development docs, go here.

Source code for celery.events.dispatcher

"""Event dispatcher sends events."""

import os
import threading
import time
from collections import defaultdict, deque

from kombu import Producer

from celery.app import app_or_default
from celery.utils.nodenames import anon_nodename
from celery.utils.time import utcoffset

from .event import Event, get_exchange, group_from

__all__ = ('EventDispatcher',)


class EventDispatcher:
    """Dispatches event messages.

    Arguments:
        connection (kombu.Connection): Connection to the broker.

        hostname (str): Hostname to identify ourselves as,
            by default uses the hostname returned by
            :func:`~celery.utils.anon_nodename`.

        groups (Sequence[str]): List of groups to send events for.
            :meth:`send` will ignore send requests to groups not in this
            list.  If this is :const:`None`, all events will be sent.
            Example groups include ``"task"`` and ``"worker"``.

        enabled (bool): Set to :const:`False` to not actually publish any
            events, making :meth:`send` a no-op.

        channel (kombu.Channel): Can be used instead of `connection` to
            specify an exact channel to use when sending events.

        buffer_while_offline (bool): If enabled events will be buffered
            while the connection is down.  :meth:`flush` must be called
            as soon as the connection is re-established.

        app: Application instance; resolved through
            :func:`~celery.app.app_or_default` when not given.

        serializer (str): Serializer passed to the producer.  Defaults to
            the app's ``event_serializer`` setting.

        delivery_mode (int): Value passed as ``delivery_mode`` on every
            publish (default: 1).

        buffer_group (Set[str]): Groups whose events :meth:`send` keeps in
            memory and :meth:`flush` publishes together as one message.

        buffer_limit (int): Number of buffered events in one group at
            which :meth:`send` calls :meth:`flush` (default: 24).

        on_send_buffered (Callable): Called without arguments by
            :meth:`send` after an event was buffered and the buffer limit
            was not reached.

    Note:
        You need to :meth:`close` this after use.
    """

    DISABLED_TRANSPORTS = {'sql'}

    app = None

    # set of callbacks to be called when :meth:`enabled`.
    on_enabled = None

    # set of callbacks to be called when :meth:`disabled`.
    on_disabled = None

    def __init__(self, connection=None, hostname=None, enabled=True,
                 channel=None, buffer_while_offline=True, app=None,
                 serializer=None, groups=None, delivery_mode=1,
                 buffer_group=None, buffer_limit=24, on_send_buffered=None):
        self.app = app_or_default(app or self.app)
        self.connection = connection
        self.channel = channel
        self.hostname = hostname or anon_nodename()
        self.buffer_while_offline = buffer_while_offline
        self.buffer_group = buffer_group or frozenset()
        self.buffer_limit = buffer_limit
        self.on_send_buffered = on_send_buffered
        # group name -> list of events waiting to be sent as one batch.
        self._group_buffer = defaultdict(list)
        self.mutex = threading.Lock()
        self.producer = None
        # (event, routing_key, exc) tuples for publishes that failed.
        self._outbound_buffer = deque()
        self.serializer = serializer or self.app.conf.event_serializer
        self.on_enabled = set()
        self.on_disabled = set()
        self.groups = set(groups or [])
        self.tzoffset = [-time.timezone, -time.altzone]
        self.clock = self.app.clock
        self.delivery_mode = delivery_mode
        if not connection and channel:
            self.connection = channel.connection.client
        self.enabled = enabled
        conninfo = self.connection or self.app.connection_for_write()
        self.exchange = get_exchange(conninfo,
                                     name=self.app.conf.event_exchange)
        if conninfo.transport.driver_type in self.DISABLED_TRANSPORTS:
            self.enabled = False
        # Set before enable() so the instance is fully initialised by the
        # time the enable hook runs.
        self.headers = {'hostname': self.hostname}
        self.pid = os.getpid()
        if self.enabled:
            self.enable()

    def __enter__(self):
        return self

    def __exit__(self, *exc_info):
        self.close()

    def enable(self):
        """Create the producer, mark as enabled and run ``on_enabled``."""
        self.producer = Producer(self.channel or self.connection,
                                 exchange=self.exchange,
                                 serializer=self.serializer,
                                 auto_declare=False)
        self.enabled = True
        for callback in self.on_enabled:
            callback()

    def disable(self):
        """Mark as disabled, :meth:`close` and run ``on_disabled``.

        Does nothing when the dispatcher is already disabled.
        """
        if self.enabled:
            self.enabled = False
            self.close()
            for callback in self.on_disabled:
                callback()

    def _create_event(self, type, fields, blind, Event, utcoffset):
        """Build one event; shared by :meth:`publish` and :meth:`send`."""
        # A blind event carries no clock value and must not advance
        # the logical clock either.
        clock = None if blind else self.clock.forward()
        return Event(type, hostname=self.hostname, utcoffset=utcoffset(),
                     pid=self.pid, clock=clock, **fields)

    def publish(self, type, fields, producer,
                blind=False, Event=Event, **kwargs):
        """Publish event using custom :class:`~kombu.Producer`.

        Arguments:
            type (str): Event type name, with group separated by dash (`-`).
            fields: Dictionary of event fields, must be json serializable.
            producer (kombu.Producer): Producer instance to use:
                only the ``publish`` method will be called.
            retry (bool): Retry in the event of connection failure.
            retry_policy (Mapping): Map of custom retry policy options.
                See :meth:`~kombu.Connection.ensure`.
            blind (bool): Don't set logical clock value (also don't forward
                the internal logical clock).
            Event (Callable): Event type used to create event.
                Defaults to :func:`Event`.
            utcoffset (Callable): Function returning the current
                utc offset in hours.

        Remaining keyword arguments are forwarded to :meth:`_publish`.
        """
        # ``utcoffset`` arrives through **kwargs; honour it as documented
        # instead of always calling the module-level default.  It is read
        # with ``get`` so it is still forwarded to ``_publish`` as before.
        event = self._create_event(type, fields, blind, Event,
                                   kwargs.get('utcoffset', utcoffset))
        with self.mutex:
            return self._publish(event, producer,
                                 routing_key=type.replace('-', '.'), **kwargs)

    def _publish(self, event, producer, routing_key, retry=False,
                 retry_policy=None, utcoffset=utcoffset):
        """Send ``event`` through ``producer`` to the dispatcher's exchange.

        If publishing raises and ``buffer_while_offline`` is set, the
        ``(event, routing_key, exc)`` tuple is appended to the outbound
        buffer instead of propagating the error; otherwise the exception
        is re-raised.

        ``utcoffset`` is not used by this method; the parameter is retained
        so callers that pass it keep working.
        """
        exchange = self.exchange
        try:
            producer.publish(
                event,
                routing_key=routing_key,
                exchange=exchange.name,
                retry=retry,
                retry_policy=retry_policy,
                declare=[exchange],
                serializer=self.serializer,
                headers=self.headers,
                delivery_mode=self.delivery_mode,
            )
        except Exception as exc:  # pylint: disable=broad-except
            if not self.buffer_while_offline:
                raise
            self._outbound_buffer.append((event, routing_key, exc))

    def send(self, type, blind=False, utcoffset=utcoffset, retry=False,
             retry_policy=None, Event=Event, **fields):
        """Send event.

        Arguments:
            type (str): Event type name, with group separated by dash (`-`).
            retry (bool): Retry in the event of connection failure.
            retry_policy (Mapping): Map of custom retry policy options.
                See :meth:`~kombu.Connection.ensure`.
            blind (bool): Don't set logical clock value (also don't forward
                the internal logical clock).
            Event (Callable): Event type used to create event,
                defaults to :func:`Event`.
            utcoffset (Callable): Function returning the current utc offset
                in hours.
            **fields (Any): Event fields -- must be json serializable.
        """
        if not self.enabled:
            return
        groups, group = self.groups, group_from(type)
        if groups and group not in groups:
            return
        if group in self.buffer_group:
            # Buffered groups are batched in memory and sent by flush().
            event = self._create_event(type, fields, blind, Event, utcoffset)
            buf = self._group_buffer[group]
            buf.append(event)
            if len(buf) >= self.buffer_limit:
                self.flush()
            elif self.on_send_buffered:
                self.on_send_buffered()
        else:
            return self.publish(type, fields, self.producer, blind=blind,
                                Event=Event, retry=retry,
                                retry_policy=retry_policy,
                                utcoffset=utcoffset)

    def flush(self, errors=True, groups=True):
        """Flush the outbound buffer.

        Arguments:
            errors (bool): Retry publishing the events whose earlier
                publish failed.  The outbound buffer is always emptied
                afterwards, so an event that fails again during this
                retry is not kept.
            groups (bool): Publish every non-empty group buffer as a
                single message with routing key ``<group>.multi``.
        """
        if errors:
            buf = list(self._outbound_buffer)
            try:
                with self.mutex:
                    for event, routing_key, _ in buf:
                        self._publish(event, self.producer, routing_key)
            finally:
                self._outbound_buffer.clear()
        if groups:
            with self.mutex:
                for group, events in self._group_buffer.items():
                    if not events:
                        # Nothing buffered: don't publish an empty batch.
                        continue
                    # Publish a snapshot: on failure ``_publish`` stores
                    # the object it was given in the outbound buffer, and
                    # clearing the live list below must not empty that
                    # stored batch as well.
                    batch = list(events)
                    self._publish(batch, self.producer, '%s.multi' % group)
                    events.clear()

    def extend_buffer(self, other):
        """Copy the outbound buffer of another instance."""
        self._outbound_buffer.extend(other._outbound_buffer)

    def close(self):
        """Close the event dispatcher.

        Releases the mutex if it is currently locked and drops the
        reference to the producer.
        """
        if self.mutex.locked():
            self.mutex.release()
        self.producer = None

    def _get_publisher(self):
        return self.producer

    def _set_publisher(self, producer):
        self.producer = producer
    publisher = property(_get_publisher, _set_publisher)  # XXX compat