This document describes the current stable version of Celery (3.1). For development docs, go here.

Source code for celery.worker.heartbeat

# -*- coding: utf-8 -*-
"""
    celery.worker.heartbeat
    ~~~~~~~~~~~~~~~~~~~~~~~

    This module provides :class:`Heart`, which sends worker heartbeat
    events at regular intervals using a timer.

"""
from __future__ import absolute_import

from celery.utils.sysinfo import load_average

from .state import SOFTWARE_INFO, active_requests, all_total_count

__all__ = ['Heart']


class Heart(object):
    """Timer sending heartbeats at regular intervals.

    On construction the instance registers :meth:`start` and :meth:`stop`
    with the event dispatcher's ``on_enabled`` / ``on_disabled`` callback
    sets.

    :param timer: Timer instance.
    :param eventer: Event dispatcher used to send the event.
    :keyword interval: Time in seconds between heartbeats.
                       Default is 2 seconds.

    """

    def __init__(self, timer, eventer, interval=None):
        self.timer = timer
        self.eventer = eventer
        # A falsy interval (``None`` or ``0``) falls back to 2 seconds.
        self.interval = float(interval or 2.0)
        # Entry returned by ``timer.call_repeatedly`` while heartbeats are
        # scheduled; ``None`` when nothing is scheduled.
        self.tref = None

        # Make event dispatcher start/stop us when enabled/disabled.
        self.eventer.on_enabled.add(self.start)
        self.eventer.on_disabled.add(self.stop)

    def _send(self, event):
        """Send ``event`` through the dispatcher with worker statistics.

        The event carries the heartbeat frequency, the number of active
        requests, the processed-task counter, the load average and the
        fields of ``SOFTWARE_INFO``.

        :param event: Event type name, e.g. ``'worker-heartbeat'``.
        :returns: Whatever ``eventer.send`` returns.

        """
        return self.eventer.send(event, freq=self.interval,
                                 active=len(active_requests),
                                 processed=all_total_count[0],
                                 loadavg=load_average(),
                                 **SOFTWARE_INFO)

    def _cancel_timer(self):
        """Cancel the scheduled heartbeat entry, if there is one."""
        if self.tref is not None:
            self.timer.cancel(self.tref)
            self.tref = None

    def start(self):
        """Announce the worker and schedule the periodic heartbeat.

        Does nothing when the event dispatcher is disabled.  Otherwise
        sends ``worker-online`` and schedules ``worker-heartbeat`` every
        ``interval`` seconds.

        """
        if not self.eventer.enabled:
            return
        # Drop any entry left over from an earlier start(); overwriting
        # ``tref`` without cancelling would leave that entry running with
        # no way to cancel it, producing duplicate heartbeats.
        self._cancel_timer()
        self._send('worker-online')
        self.tref = self.timer.call_repeatedly(
            self.interval, self._send, ('worker-heartbeat', ),
        )

    def stop(self):
        """Cancel the periodic heartbeat and announce the worker offline.

        The timer entry is always cancelled; ``worker-offline`` is only
        sent while the event dispatcher is still enabled.

        """
        self._cancel_timer()
        if self.eventer.enabled:
            self._send('worker-offline')