This document describes the current stable version of Celery (5.2). For development docs, go here.

Source code for celery.worker.components

"""Worker-level Bootsteps."""
import atexit
import warnings

from kombu.asynchronous import Hub as _Hub
from kombu.asynchronous import get_event_loop, set_event_loop
from kombu.asynchronous.semaphore import DummyLock, LaxBoundedSemaphore
from kombu.asynchronous.timer import Timer as _Timer

from celery import bootsteps
from celery._state import _set_task_join_will_block
from celery.exceptions import ImproperlyConfigured
from celery.platforms import IS_WINDOWS
from celery.utils.log import worker_logger as logger

__all__ = ('Timer', 'Hub', 'Pool', 'Beat', 'StateDB', 'Consumer')

GREEN_POOLS = {'eventlet', 'gevent'}

ERR_B_GREEN = """\
-B option doesn't work with eventlet/gevent pools: \
use standalone beat instead.\
"""

W_POOL_SETTING = """
The worker_pool setting shouldn't be used to select the eventlet/gevent
pools, instead you *must use the -P* argument so that patches are applied
as early as possible.
"""


[docs]class Timer(bootsteps.Step): """Timer bootstep."""
[docs] def create(self, w): if w.use_eventloop: # does not use dedicated timer thread. w.timer = _Timer(max_interval=10.0) else: if not w.timer_cls: # Default Timer is set by the pool, as for example, the # eventlet pool needs a custom timer implementation. w.timer_cls = w.pool_cls.Timer w.timer = self.instantiate(w.timer_cls, max_interval=w.timer_precision, on_error=self.on_timer_error, on_tick=self.on_timer_tick)
[docs] def on_timer_error(self, exc): logger.error('Timer error: %r', exc, exc_info=True)
[docs] def on_timer_tick(self, delay): logger.debug('Timer wake-up! Next ETA %s secs.', delay)
[docs]class Hub(bootsteps.StartStopStep): """Worker starts the event loop.""" requires = (Timer,) def __init__(self, w, **kwargs): w.hub = None super().__init__(w, **kwargs)
[docs] def include_if(self, w): return w.use_eventloop
[docs] def create(self, w): w.hub = get_event_loop() if w.hub is None: required_hub = getattr(w._conninfo, 'requires_hub', None) w.hub = set_event_loop(( required_hub if required_hub else _Hub)(w.timer)) self._patch_thread_primitives(w) return self
[docs] def start(self, w): pass
[docs] def stop(self, w): w.hub.close()
[docs] def terminate(self, w): w.hub.close()
def _patch_thread_primitives(self, w): # make clock use dummy lock w.app.clock.mutex = DummyLock() # multiprocessing's ApplyResult uses this lock. try: from billiard import pool except ImportError: # pragma: no cover pass else: pool.Lock = DummyLock
[docs]class Pool(bootsteps.StartStopStep): """Bootstep managing the worker pool. Describes how to initialize the worker pool, and starts and stops the pool during worker start-up/shutdown. Adds attributes: * autoscale * pool * max_concurrency * min_concurrency """ requires = (Hub,) def __init__(self, w, autoscale=None, **kwargs): w.pool = None w.max_concurrency = None w.min_concurrency = w.concurrency self.optimization = w.optimization if isinstance(autoscale, str): max_c, _, min_c = autoscale.partition(',') autoscale = [int(max_c), min_c and int(min_c) or 0] w.autoscale = autoscale if w.autoscale: w.max_concurrency, w.min_concurrency = w.autoscale super().__init__(w, **kwargs)
[docs] def close(self, w): if w.pool: w.pool.close()
[docs] def terminate(self, w): if w.pool: w.pool.terminate()
[docs] def create(self, w): semaphore = None max_restarts = None if w.app.conf.worker_pool in GREEN_POOLS: # pragma: no cover warnings.warn(UserWarning(W_POOL_SETTING)) threaded = not w.use_eventloop or IS_WINDOWS procs = w.min_concurrency w.process_task = w._process_task if not threaded: semaphore = w.semaphore = LaxBoundedSemaphore(procs) w._quick_acquire = w.semaphore.acquire w._quick_release = w.semaphore.release max_restarts = 100 if w.pool_putlocks and w.pool_cls.uses_semaphore: w.process_task = w._process_task_sem allow_restart = w.pool_restarts pool = w.pool = self.instantiate( w.pool_cls, w.min_concurrency, initargs=(w.app, w.hostname), maxtasksperchild=w.max_tasks_per_child, max_memory_per_child=w.max_memory_per_child, timeout=w.time_limit, soft_timeout=w.soft_time_limit, putlocks=w.pool_putlocks and threaded, lost_worker_timeout=w.worker_lost_wait, threads=threaded, max_restarts=max_restarts, allow_restart=allow_restart, forking_enable=True, semaphore=semaphore, sched_strategy=self.optimization, app=w.app, ) _set_task_join_will_block(pool.task_join_will_block) return pool
[docs] def info(self, w): return {'pool': w.pool.info if w.pool else 'N/A'}
[docs] def register_with_event_loop(self, w, hub): w.pool.register_with_event_loop(hub)
[docs]class Beat(bootsteps.StartStopStep): """Step used to embed a beat process. Enabled when the ``beat`` argument is set. """ label = 'Beat' conditional = True def __init__(self, w, beat=False, **kwargs): self.enabled = w.beat = beat w.beat = None super().__init__(w, beat=beat, **kwargs)
[docs] def create(self, w): from celery.beat import EmbeddedService if w.pool_cls.__module__.endswith(('gevent', 'eventlet')): raise ImproperlyConfigured(ERR_B_GREEN) b = w.beat = EmbeddedService(w.app, schedule_filename=w.schedule_filename, scheduler_cls=w.scheduler) return b
[docs]class StateDB(bootsteps.Step): """Bootstep that sets up between-restart state database file.""" def __init__(self, w, **kwargs): self.enabled = w.statedb w._persistence = None super().__init__(w, **kwargs)
[docs] def create(self, w): w._persistence = w.state.Persistent(w.state, w.statedb, w.app.clock) atexit.register(w._persistence.save)
[docs]class Consumer(bootsteps.StartStopStep): """Bootstep starting the Consumer blueprint.""" last = True
[docs] def create(self, w): if w.max_concurrency: prefetch_count = max(w.max_concurrency, 1) * w.prefetch_multiplier else: prefetch_count = w.concurrency * w.prefetch_multiplier c = w.consumer = self.instantiate( w.consumer_cls, w.process_task, hostname=w.hostname, task_events=w.task_events, init_callback=w.ready_callback, initial_prefetch_count=prefetch_count, pool=w.pool, timer=w.timer, app=w.app, controller=w, hub=w.hub, worker_options=w.options, disable_rate_limits=w.disable_rate_limits, prefetch_multiplier=w.prefetch_multiplier, ) return c