This document describes the current stable version of Celery (4.0). For development docs, go here.

Source code for celery.worker.pidbox

"""Worker Pidbox (remote control)."""
from __future__ import absolute_import, unicode_literals
import socket
import threading
from kombu.common import ignore_errors
from kombu.utils.encoding import safe_str
from celery.utils.collections import AttributeDict
from celery.utils.functional import pass1
from celery.utils.log import get_logger
from . import control

__all__ = ['Pidbox', 'gPidbox']

logger = get_logger(__name__)
debug, error, info = logger.debug, logger.error, logger.info


[docs]class Pidbox(object): """Worker mailbox.""" consumer = None def __init__(self, c): self.c = c self.hostname = c.hostname self.node = c.app.control.mailbox.Node( safe_str(c.hostname), handlers=control.Panel.data, state=AttributeDict( app=c.app, hostname=c.hostname, consumer=c, tset=pass1 if c.controller.use_eventloop else set), ) self._forward_clock = self.c.app.clock.forward
[docs] def on_message(self, body, message): # just increase clock as clients usually don't # have a valid clock to adjust with. self._forward_clock() try: self.node.handle_message(body, message) except KeyError as exc: error('No such control command: %s', exc) except Exception as exc: error('Control command error: %r', exc, exc_info=True) self.reset()
[docs] def start(self, c): self.node.channel = c.connection.channel() self.consumer = self.node.listen(callback=self.on_message) self.consumer.on_decode_error = c.on_decode_error
[docs] def on_stop(self): pass
[docs] def stop(self, c): self.on_stop() self.consumer = self._close_channel(c)
[docs] def reset(self): self.stop(self.c) self.start(self.c)
def _close_channel(self, c): if self.node and self.node.channel: ignore_errors(c, self.node.channel.close)
[docs] def shutdown(self, c): self.on_stop() if self.consumer: debug('Canceling broadcast consumer...') ignore_errors(c, self.consumer.cancel) self.stop(self.c)
[docs]class gPidbox(Pidbox): """Worker pidbox (greenlet).""" _node_shutdown = None _node_stopped = None _resets = 0
[docs] def start(self, c): c.pool.spawn_n(self.loop, c)
[docs] def on_stop(self): if self._node_stopped: self._node_shutdown.set() debug('Waiting for broadcast thread to shutdown...') self._node_stopped.wait() self._node_stopped = self._node_shutdown = None
[docs] def reset(self): self._resets += 1
def _do_reset(self, c, connection): self._close_channel(c) self.node.channel = connection.channel() self.consumer = self.node.listen(callback=self.on_message) self.consumer.consume()
[docs] def loop(self, c): resets = [self._resets] shutdown = self._node_shutdown = threading.Event() stopped = self._node_stopped = threading.Event() try: with c.connection_for_read() as connection: info('pidbox: Connected to %s.', connection.as_uri()) self._do_reset(c, connection) while not shutdown.is_set() and c.connection: if resets[0] < self._resets: resets[0] += 1 self._do_reset(c, connection) try: connection.drain_events(timeout=1.0) except socket.timeout: pass finally: stopped.set()