This document describes the current stable version of Celery (5.2). For development docs, go here.

Source code for celery.apps.beat

"""Beat command-line program.

This module is the 'program-version' of :mod:`celery.beat`.

It does everything necessary to run that module
as an actual application, like installing signal handlers
and so on.
"""
import numbers
import socket
import sys
from datetime import datetime

from celery import VERSION_BANNER, beat, platforms
from celery.utils.imports import qualname
from celery.utils.log import LOG_LEVELS, get_logger
from celery.utils.time import humanize_seconds

__all__ = ('Beat',)

STARTUP_INFO_FMT = """
LocalTime -> {timestamp}
Configuration ->
    . broker -> {conninfo}
    . loader -> {loader}
    . scheduler -> {scheduler}
{scheduler_info}
    . logfile -> {logfile}@%{loglevel}
    . maxinterval -> {hmax_interval} ({max_interval}s)
""".strip()

logger = get_logger('celery.beat')


[docs]class Beat: """Beat as a service.""" Service = beat.Service app = None def __init__(self, max_interval=None, app=None, socket_timeout=30, pidfile=None, no_color=None, loglevel='WARN', logfile=None, schedule=None, scheduler=None, scheduler_cls=None, # XXX use scheduler redirect_stdouts=None, redirect_stdouts_level=None, **kwargs): self.app = app = app or self.app either = self.app.either self.loglevel = loglevel self.logfile = logfile self.schedule = either('beat_schedule_filename', schedule) self.scheduler_cls = either( 'beat_scheduler', scheduler, scheduler_cls) self.redirect_stdouts = either( 'worker_redirect_stdouts', redirect_stdouts) self.redirect_stdouts_level = either( 'worker_redirect_stdouts_level', redirect_stdouts_level) self.max_interval = max_interval self.socket_timeout = socket_timeout self.no_color = no_color self.colored = app.log.colored( self.logfile, enabled=not no_color if no_color is not None else no_color, ) self.pidfile = pidfile if not isinstance(self.loglevel, numbers.Integral): self.loglevel = LOG_LEVELS[self.loglevel.upper()]
[docs] def run(self): print(str(self.colored.cyan( f'celery beat v{VERSION_BANNER} is starting.'))) self.init_loader() self.set_process_title() self.start_scheduler()
[docs] def setup_logging(self, colorize=None): if colorize is None and self.no_color is not None: colorize = not self.no_color self.app.log.setup(self.loglevel, self.logfile, self.redirect_stdouts, self.redirect_stdouts_level, colorize=colorize)
[docs] def start_scheduler(self): if self.pidfile: platforms.create_pidlock(self.pidfile) service = self.Service( app=self.app, max_interval=self.max_interval, scheduler_cls=self.scheduler_cls, schedule_filename=self.schedule, ) print(self.banner(service)) self.setup_logging() if self.socket_timeout: logger.debug('Setting default socket timeout to %r', self.socket_timeout) socket.setdefaulttimeout(self.socket_timeout) try: self.install_sync_handler(service) service.start() except Exception as exc: logger.critical('beat raised exception %s: %r', exc.__class__, exc, exc_info=True) raise
[docs] def banner(self, service): c = self.colored return str( c.blue('__ ', c.magenta('-'), c.blue(' ... __ '), c.magenta('-'), c.blue(' _\n'), c.reset(self.startup_info(service))), )
[docs] def init_loader(self): # Run the worker init handler. # (Usually imports task modules and such.) self.app.loader.init_worker() self.app.finalize()
[docs] def startup_info(self, service): scheduler = service.get_scheduler(lazy=True) return STARTUP_INFO_FMT.format( conninfo=self.app.connection().as_uri(), timestamp=datetime.now().replace(microsecond=0), logfile=self.logfile or '[stderr]', loglevel=LOG_LEVELS[self.loglevel], loader=qualname(self.app.loader), scheduler=qualname(scheduler), scheduler_info=scheduler.info, hmax_interval=humanize_seconds(scheduler.max_interval), max_interval=scheduler.max_interval, )
[docs] def set_process_title(self): arg_start = 'manage' in sys.argv[0] and 2 or 1 platforms.set_process_title( 'celery beat', info=' '.join(sys.argv[arg_start:]), )
[docs] def install_sync_handler(self, service): """Install a `SIGTERM` + `SIGINT` handler saving the schedule.""" def _sync(signum, frame): service.sync() raise SystemExit() platforms.signals.update(SIGTERM=_sync, SIGINT=_sync)