This document describes the current stable version of Celery (5.4). For development docs, go here.

Source code for

"""Implementation for the shortcuts."""
from contextlib import contextmanager

from kombu.utils.objects import cached_property

[docs] class Events: """Implements""" receiver_cls = '' dispatcher_cls = '' state_cls = '' def __init__(self, app=None): = app @cached_property def Receiver(self): return self.receiver_cls, reverse='events.Receiver') @cached_property def Dispatcher(self): return self.dispatcher_cls, reverse='events.Dispatcher') @cached_property def State(self): return self.state_cls, reverse='events.State')
[docs] @contextmanager def default_dispatcher(self, hostname=None, enabled=True, buffer_while_offline=False): with as prod: # pylint: disable=too-many-function-args # This is a property pylint... with self.Dispatcher(prod.connection, hostname, enabled,, buffer_while_offline) as d: yield d