This document describes the current stable version of Celery (5.4). For development docs, go here.

Source code for celery.worker.heartbeat

"""Heartbeat service.

This is the internal thread responsible for sending heartbeat events
at regular intervals (may not be an actual thread).
"""
from celery.signals import heartbeat_sent
from celery.utils.sysinfo import load_average

from .state import SOFTWARE_INFO, active_requests, all_total_count

__all__ = ('Heart',)


[docs] class Heart: """Timer sending heartbeats at regular intervals. Arguments: timer (kombu.asynchronous.timer.Timer): Timer to use. eventer (celery.events.EventDispatcher): Event dispatcher to use. interval (float): Time in seconds between sending heartbeats. Default is 2 seconds. """ def __init__(self, timer, eventer, interval=None): self.timer = timer self.eventer = eventer self.interval = float(interval or 2.0) self.tref = None # Make event dispatcher start/stop us when enabled/disabled. self.eventer.on_enabled.add(self.start) self.eventer.on_disabled.add(self.stop) # Only send heartbeat_sent signal if it has receivers. self._send_sent_signal = ( heartbeat_sent.send if heartbeat_sent.receivers else None) def _send(self, event, retry=True): if self._send_sent_signal is not None: self._send_sent_signal(sender=self) return self.eventer.send(event, freq=self.interval, active=len(active_requests), processed=all_total_count[0], loadavg=load_average(), retry=retry, **SOFTWARE_INFO)
[docs] def start(self): if self.eventer.enabled: self._send('worker-online') self.tref = self.timer.call_repeatedly( self.interval, self._send, ('worker-heartbeat',), )
[docs] def stop(self): if self.tref is not None: self.timer.cancel(self.tref) self.tref = None if self.eventer.enabled: self._send('worker-offline', retry=False)