This document describes the current stable version of Celery (4.2). For development docs, go here.

Source code for celery.beat

# -*- coding: utf-8 -*-
"""The periodic task scheduler."""
from __future__ import absolute_import, unicode_literals

import copy
import errno
import heapq
import os
import shelve
import sys
import time
import traceback
from collections import namedtuple
from functools import total_ordering
from threading import Event, Thread

from billiard import ensure_multiprocessing
from billiard.common import reset_signals
from billiard.context import Process
from kombu.utils.functional import maybe_evaluate, reprcall
from kombu.utils.objects import cached_property

from . import __version__, platforms, signals
from .five import (items, monotonic, python_2_unicode_compatible, reraise,
                   values)
from .schedules import crontab, maybe_schedule
from .utils.imports import load_extension_class_names, symbol_by_name
from .utils.log import get_logger, iter_open_logger_fds
from .utils.time import humanize_seconds

__all__ = (
    'SchedulingError', 'ScheduleEntry', 'Scheduler',
    'PersistentScheduler', 'Service', 'EmbeddedService',
)

event_t = namedtuple('event_t', ('time', 'priority', 'entry'))

logger = get_logger(__name__)
debug, info, error, warning = (logger.debug, logger.info,
                               logger.error, logger.warning)

DEFAULT_MAX_INTERVAL = 300  # 5 minutes


[docs]class SchedulingError(Exception):
"""An error occurred while scheduling a task."""
[docs]@total_ordering @python_2_unicode_compatible class ScheduleEntry(object): """An entry in the scheduler. Arguments: name (str): see :attr:`name`. schedule (~celery.schedules.schedule): see :attr:`schedule`. args (Tuple): see :attr:`args`. kwargs (Dict): see :attr:`kwargs`. options (Dict): see :attr:`options`. last_run_at (~datetime.datetime): see :attr:`last_run_at`. total_run_count (int): see :attr:`total_run_count`. relative (bool): Is the time relative to when the server starts? """ #: The task name name = None #: The schedule (:class:`~celery.schedules.schedule`) schedule = None #: Positional arguments to apply. args = None #: Keyword arguments to apply. kwargs = None #: Task execution options. options = None #: The time and date of when this task was last scheduled. last_run_at = None #: Total number of times this task has been scheduled. total_run_count = 0 def __init__(self, name=None, task=None, last_run_at=None, total_run_count=None, schedule=None, args=(), kwargs={}, options={}, relative=False, app=None): self.app = app self.name = name self.task = task self.args = args self.kwargs = kwargs self.options = options self.schedule = maybe_schedule(schedule, relative, app=self.app) self.last_run_at = last_run_at or self.default_now() self.total_run_count = total_run_count or 0
[docs] def default_now(self):
return self.schedule.now() if self.schedule else self.app.now() _default_now = default_now # compat def _next_instance(self, last_run_at=None): """Return new instance, with date and count fields updated.""" return self.__class__(**dict( self, last_run_at=last_run_at or self.default_now(), total_run_count=self.total_run_count + 1, )) __next__ = next = _next_instance # for 2to3 def __reduce__(self): return self.__class__, ( self.name, self.task, self.last_run_at, self.total_run_count, self.schedule, self.args, self.kwargs, self.options, )
[docs] def update(self, other): """Update values from another entry. Will only update "editable" fields: ``task``, ``schedule``, ``args``, ``kwargs``, ``options``. """ self.__dict__.update({ 'task': other.task, 'schedule': other.schedule, 'args': other.args, 'kwargs': other.kwargs, 'options': other.options,
})
[docs] def is_due(self): """See :meth:`~celery.schedule.schedule.is_due`."""
return self.schedule.is_due(self.last_run_at) def __iter__(self): return iter(items(vars(self))) def __repr__(self): return '<{name}: {0.name} {call} {0.schedule}'.format( self, call=reprcall(self.task, self.args or (), self.kwargs or {}), name=type(self).__name__, ) def __lt__(self, other): if isinstance(other, ScheduleEntry): # How the object is ordered doesn't really matter, as # in the scheduler heap, the order is decided by the # preceding members of the tuple ``(time, priority, entry)``. # # If all that's left to order on is the entry then it can # just as well be random. return id(self) < id(other) return NotImplemented
[docs] def editable_fields_equal(self, other): for attr in ('task', 'args', 'kwargs', 'options', 'schedule'): if getattr(self, attr) != getattr(other, attr): return False
return True def __eq__(self, other): """Test schedule entries equality. Will only compare "editable" fields: ``task``, ``schedule``, ``args``, ``kwargs``, ``options``. """ return self.editable_fields_equal(other) def __ne__(self, other): """Test schedule entries inequality. Will only compare "editable" fields: ``task``, ``schedule``, ``args``, ``kwargs``, ``options``. """
return not self == other
[docs]class Scheduler(object): """Scheduler for periodic tasks. The :program:`celery beat` program may instantiate this class multiple times for introspection purposes, but then with the ``lazy`` argument set. It's important for subclasses to be idempotent when this argument is set. Arguments: schedule (~celery.schedules.schedule): see :attr:`schedule`. max_interval (int): see :attr:`max_interval`. lazy (bool): Don't set up the schedule. """ Entry = ScheduleEntry #: The schedule dict/shelve. schedule = None #: Maximum time to sleep between re-checking the schedule. max_interval = DEFAULT_MAX_INTERVAL #: How often to sync the schedule (3 minutes by default) sync_every = 3 * 60 #: How many tasks can be called before a sync is forced. sync_every_tasks = None _last_sync = None _tasks_since_sync = 0 logger = logger # compat def __init__(self, app, schedule=None, max_interval=None, Producer=None, lazy=False, sync_every_tasks=None, **kwargs): self.app = app self.data = maybe_evaluate({} if schedule is None else schedule) self.max_interval = (max_interval or app.conf.beat_max_loop_interval or self.max_interval) self.Producer = Producer or app.amqp.Producer self._heap = None self.old_schedulers = None self.sync_every_tasks = ( app.conf.beat_sync_every if sync_every_tasks is None else sync_every_tasks) if not lazy: self.setup_schedule()
[docs] def install_default_entries(self, data): entries = {} if self.app.conf.result_expires and \ not self.app.backend.supports_autoexpire: if 'celery.backend_cleanup' not in data: entries['celery.backend_cleanup'] = { 'task': 'celery.backend_cleanup', 'schedule': crontab('0', '4', '*'), 'options': {'expires': 12 * 3600}}
self.update_from_dict(entries)
[docs] def apply_entry(self, entry, producer=None): info('Scheduler: Sending due task %s (%s)', entry.name, entry.task) try: result = self.apply_async(entry, producer=producer, advance=False) except Exception as exc: # pylint: disable=broad-except error('Message Error: %s\n%s', exc, traceback.format_stack(), exc_info=True) else:
debug('%s sent. id->%s', entry.task, result.id)
[docs] def adjust(self, n, drift=-0.010): if n and n > 0: return n + drift
return n
[docs] def is_due(self, entry):
return entry.is_due() def _when(self, entry, next_time_to_run, mktime=time.mktime): adjust = self.adjust return (mktime(entry.default_now().timetuple()) + (adjust(next_time_to_run) or 0))
[docs] def populate_heap(self, event_t=event_t, heapify=heapq.heapify): """Populate the heap with the data contained in the schedule.""" priority = 5 self._heap = [] for entry in values(self.schedule): is_due, next_call_delay = entry.is_due() self._heap.append(event_t( self._when( entry, 0 if is_due else next_call_delay ) or 0, priority, entry ))
heapify(self._heap) # pylint disable=redefined-outer-name
[docs] def tick(self, event_t=event_t, min=min, heappop=heapq.heappop, heappush=heapq.heappush): """Run a tick - one iteration of the scheduler. Executes one due task per call. Returns: float: preferred delay in seconds for next call. """ adjust = self.adjust max_interval = self.max_interval if (self._heap is None or not self.schedules_equal(self.old_schedulers, self.schedule)): self.old_schedulers = copy.copy(self.schedule) self.populate_heap() H = self._heap if not H: return max_interval event = H[0] entry = event[2] is_due, next_time_to_run = self.is_due(entry) if is_due: verify = heappop(H) if verify is event: next_entry = self.reserve(entry) self.apply_entry(entry, producer=self.producer) heappush(H, event_t(self._when(next_entry, next_time_to_run), event[1], next_entry)) return 0 else: heappush(H, verify) return min(verify[0], max_interval)
return min(adjust(next_time_to_run) or max_interval, max_interval)
[docs] def schedules_equal(self, old_schedules, new_schedules): if set(old_schedules.keys()) != set(new_schedules.keys()): return False for name, old_entry in old_schedules.items(): new_entry = new_schedules.get(name) if not new_entry: return False if new_entry != old_entry: return False
return True
[docs] def should_sync(self): return ( (not self._last_sync or (monotonic() - self._last_sync) > self.sync_every) or (self.sync_every_tasks and self._tasks_since_sync >= self.sync_every_tasks)
)
[docs] def reserve(self, entry): new_entry = self.schedule[entry.name] = next(entry)
return new_entry
[docs] def apply_async(self, entry, producer=None, advance=True, **kwargs): # Update time-stamps and run counts before we actually execute, # so we have that done if an exception is raised (doesn't schedule # forever.) entry = self.reserve(entry) if advance else entry task = self.app.tasks.get(entry.task) try: if task: return task.apply_async(entry.args, entry.kwargs, producer=producer, **entry.options) else: return self.send_task(entry.task, entry.args, entry.kwargs, producer=producer, **entry.options) except Exception as exc: # pylint: disable=broad-except reraise(SchedulingError, SchedulingError( "Couldn't apply scheduled task {0.name}: {exc}".format( entry, exc=exc)), sys.exc_info()[2]) finally: self._tasks_since_sync += 1 if self.should_sync():
self._do_sync()
[docs] def send_task(self, *args, **kwargs):
return self.app.send_task(*args, **kwargs)
[docs] def setup_schedule(self):
self.install_default_entries(self.data) def _do_sync(self): try: debug('beat: Synchronizing schedule...') self.sync() finally: self._last_sync = monotonic() self._tasks_since_sync = 0
[docs] def sync(self):
pass
[docs] def close(self):
self.sync()
[docs] def add(self, **kwargs): entry = self.Entry(app=self.app, **kwargs) self.schedule[entry.name] = entry
return entry def _maybe_entry(self, name, entry): if isinstance(entry, self.Entry): entry.app = self.app return entry return self.Entry(**dict(entry, name=name, app=self.app))
[docs] def update_from_dict(self, dict_): self.schedule.update({ name: self._maybe_entry(name, entry) for name, entry in items(dict_)
})
[docs] def merge_inplace(self, b): schedule = self.schedule A, B = set(schedule), set(b) # Remove items from disk not in the schedule anymore. for key in A ^ B: schedule.pop(key, None) # Update and add new items in the schedule for key in B: entry = self.Entry(**dict(b[key], name=key, app=self.app)) if schedule.get(key): schedule[key].update(entry) else:
schedule[key] = entry def _ensure_connected(self): # callback called for each retry while the connection # can't be established. def _error_handler(exc, interval): error('beat: Connection error: %s. ' 'Trying again in %s seconds...', exc, interval) return self.connection.ensure_connection( _error_handler, self.app.conf.broker_connection_max_retries )
[docs] def get_schedule(self):
return self.data
[docs] def set_schedule(self, schedule):
self.data = schedule schedule = property(get_schedule, set_schedule)
[docs] @cached_property def connection(self):
return self.app.connection_for_write()
[docs] @cached_property def producer(self):
return self.Producer(self._ensure_connected(), auto_declare=False) @property def info(self):
return ''
[docs]class PersistentScheduler(Scheduler): """Scheduler backed by :mod:`shelve` database.""" persistence = shelve known_suffixes = ('', '.db', '.dat', '.bak', '.dir') _store = None def __init__(self, *args, **kwargs): self.schedule_filename = kwargs.get('schedule_filename') Scheduler.__init__(self, *args, **kwargs) def _remove_db(self): for suffix in self.known_suffixes: with platforms.ignore_errno(errno.ENOENT): os.remove(self.schedule_filename + suffix) def _open_schedule(self): return self.persistence.open(self.schedule_filename, writeback=True) def _destroy_open_corrupted_schedule(self, exc): error('Removing corrupted schedule file %r: %r', self.schedule_filename, exc, exc_info=True) self._remove_db() return self._open_schedule()
[docs] def setup_schedule(self): try: self._store = self._open_schedule() # In some cases there may be different errors from a storage # backend for corrupted files. Example - DBPageNotFoundError # exception from bsddb. In such case the file will be # successfully opened but the error will be raised on first key # retrieving. self._store.keys() except Exception as exc: # pylint: disable=broad-except self._store = self._destroy_open_corrupted_schedule(exc) self._create_schedule() tz = self.app.conf.timezone stored_tz = self._store.get(str('tz')) if stored_tz is not None and stored_tz != tz: warning('Reset: Timezone changed from %r to %r', stored_tz, tz) self._store.clear() # Timezone changed, reset db! utc = self.app.conf.enable_utc stored_utc = self._store.get(str('utc_enabled')) if stored_utc is not None and stored_utc != utc: choices = {True: 'enabled', False: 'disabled'} warning('Reset: UTC changed from %s to %s', choices[stored_utc], choices[utc]) self._store.clear() # UTC setting changed, reset db! entries = self._store.setdefault(str('entries'), {}) self.merge_inplace(self.app.conf.beat_schedule) self.install_default_entries(self.schedule) self._store.update({ str('__version__'): __version__, str('tz'): tz, str('utc_enabled'): utc, }) self.sync() debug('Current schedule:\n' + '\n'.join(
repr(entry) for entry in values(entries))) def _create_schedule(self): for _ in (1, 2): try: self._store[str('entries')] except KeyError: # new schedule db try: self._store[str('entries')] = {} except KeyError as exc: self._store = self._destroy_open_corrupted_schedule(exc) continue else: if str('__version__') not in self._store: warning('DB Reset: Account for new __version__ field') self._store.clear() # remove schedule at 2.2.2 upgrade. elif str('tz') not in self._store: warning('DB Reset: Account for new tz field') self._store.clear() # remove schedule at 3.0.8 upgrade elif str('utc_enabled') not in self._store: warning('DB Reset: Account for new utc_enabled field') self._store.clear() # remove schedule at 3.0.9 upgrade break
[docs] def get_schedule(self):
return self._store[str('entries')]
[docs] def set_schedule(self, schedule):
self._store[str('entries')] = schedule schedule = property(get_schedule, set_schedule)
[docs] def sync(self): if self._store is not None:
self._store.sync()
[docs] def close(self): self.sync()
self._store.close() @property def info(self):
return ' . db -> {self.schedule_filename}'.format(self=self)
[docs]class Service(object): """Celery periodic task service.""" scheduler_cls = PersistentScheduler def __init__(self, app, max_interval=None, schedule_filename=None, scheduler_cls=None): self.app = app self.max_interval = (max_interval or app.conf.beat_max_loop_interval) self.scheduler_cls = scheduler_cls or self.scheduler_cls self.schedule_filename = ( schedule_filename or app.conf.beat_schedule_filename) self._is_shutdown = Event() self._is_stopped = Event() def __reduce__(self): return self.__class__, (self.max_interval, self.schedule_filename, self.scheduler_cls, self.app)
[docs] def start(self, embedded_process=False): info('beat: Starting...') debug('beat: Ticking with max interval->%s', humanize_seconds(self.scheduler.max_interval)) signals.beat_init.send(sender=self) if embedded_process: signals.beat_embedded_init.send(sender=self) platforms.set_process_title('celery beat') try: while not self._is_shutdown.is_set(): interval = self.scheduler.tick() if interval and interval > 0.0: debug('beat: Waking up %s.', humanize_seconds(interval, prefix='in ')) time.sleep(interval) if self.scheduler.should_sync(): self.scheduler._do_sync() except (KeyboardInterrupt, SystemExit): self._is_shutdown.set() finally:
self.sync()
[docs] def sync(self): self.scheduler.close()
self._is_stopped.set()
[docs] def stop(self, wait=False): info('beat: Shutting down...') self._is_shutdown.set()
wait and self._is_stopped.wait() # block until shutdown done.
[docs] def get_scheduler(self, lazy=False, extension_namespace='celery.beat_schedulers'): filename = self.schedule_filename aliases = dict( load_extension_class_names(extension_namespace) or {}) return symbol_by_name(self.scheduler_cls, aliases=aliases)( app=self.app, schedule_filename=filename, max_interval=self.max_interval, lazy=lazy,
)
[docs] @cached_property def scheduler(self):
return self.get_scheduler() class _Threaded(Thread): """Embedded task scheduler using threading.""" def __init__(self, app, **kwargs): super(_Threaded, self).__init__() self.app = app self.service = Service(app, **kwargs) self.daemon = True self.name = 'Beat' def run(self): self.app.set_current() self.service.start() def stop(self): self.service.stop(wait=True) try: ensure_multiprocessing() except NotImplementedError: # pragma: no cover _Process = None else: class _Process(Process): # noqa def __init__(self, app, **kwargs): super(_Process, self).__init__() self.app = app self.service = Service(app, **kwargs) self.name = 'Beat' def run(self): reset_signals(full=False) platforms.close_open_fds([ sys.__stdin__, sys.__stdout__, sys.__stderr__, ] + list(iter_open_logger_fds())) self.app.set_default() self.app.set_current() self.service.start(embedded_process=True) def stop(self): self.service.stop() self.terminate()
[docs]def EmbeddedService(app, max_interval=None, **kwargs): """Return embedded clock service. Arguments: thread (bool): Run threaded instead of as a separate process. Uses :mod:`multiprocessing` by default, if available. """ if kwargs.pop('thread', False) or _Process is None: # Need short max interval to be able to stop thread # in reasonable time. return _Threaded(app, max_interval=1, **kwargs)
return _Process(app, max_interval=max_interval, **kwargs)