This document describes the current stable version of Celery (3.1). For development docs, go here.

Source code for celery.worker.pidbox

from __future__ import absolute_import

import socket
import threading

from kombu.common import ignore_errors
from kombu.utils.encoding import safe_str

from celery.datastructures import AttributeDict
from celery.utils.log import get_logger

from . import control

__all__ = ['Pidbox', 'gPidbox']

logger = get_logger(__name__)
debug, error, info = logger.debug, logger.error,

[docs]class Pidbox(object): consumer = None def __init__(self, c): self.c = c self.hostname = c.hostname self.node = safe_str(c.hostname),, state=AttributeDict(, hostname=c.hostname, consumer=c), ) self._forward_clock =
[docs] def on_message(self, body, message): # just increase clock as clients usually don't # have a valid clock to adjust with. self._forward_clock() try: self.node.handle_message(body, message) except KeyError as exc: error('No such control command: %s', exc) except Exception as exc: error('Control command error: %r', exc, exc_info=True) self.reset()
[docs] def start(self, c): = self.consumer = self.node.listen(callback=self.on_message) self.consumer.on_decode_error = c.on_decode_error
[docs] def on_stop(self): pass
[docs] def stop(self, c): self.on_stop() self.consumer = self._close_channel(c)
[docs] def reset(self): """Sets up the process mailbox.""" self.stop(self.c) self.start(self.c)
def _close_channel(self, c): if self.node and ignore_errors(c,
[docs] def shutdown(self, c): self.on_stop() if self.consumer: debug('Canceling broadcast consumer...') ignore_errors(c, self.consumer.cancel) self.stop(self.c)
[docs]class gPidbox(Pidbox): _node_shutdown = None _node_stopped = None _resets = 0
[docs] def start(self, c): c.pool.spawn_n(self.loop, c)
[docs] def on_stop(self): if self._node_stopped: self._node_shutdown.set() debug('Waiting for broadcast thread to shutdown...') self._node_stopped.wait() self._node_stopped = self._node_shutdown = None
[docs] def reset(self): self._resets += 1
def _do_reset(self, c, connection): self._close_channel(c) = self.consumer = self.node.listen(callback=self.on_message) self.consumer.consume()
[docs] def loop(self, c): resets = [self._resets] shutdown = self._node_shutdown = threading.Event() stopped = self._node_stopped = threading.Event() try: with c.connect() as connection: info('pidbox: Connected to %s.', connection.as_uri()) self._do_reset(c, connection) while not shutdown.is_set() and c.connection: if resets[0] < self._resets: resets[0] += 1 self._do_reset(c, connection) try: connection.drain_events(timeout=1.0) except socket.timeout: pass finally: stopped.set()