This document describes the current stable version of Celery (5.2). For development docs, go here.

Source code for celery.bin.worker

"""Program used to start a Celery worker instance."""

import os
import sys

import click
from click import ParamType
from click.types import StringParamType

from celery import concurrency
from celery.bin.base import (COMMA_SEPARATED_LIST, LOG_LEVEL,
                             CeleryDaemonCommand, CeleryOption,
                             handle_preload_options)
from celery.concurrency.base import BasePool
from celery.exceptions import SecurityError
from celery.platforms import (EX_FAILURE, EX_OK, detached,
                              maybe_drop_privileges)
from celery.utils.log import get_logger
from celery.utils.nodenames import default_nodename, host_format, node_format

logger = get_logger(__name__)


[docs]class CeleryBeat(ParamType): """Celery Beat flag.""" name = "beat"
[docs] def convert(self, value, param, ctx): if ctx.obj.app.IS_WINDOWS and value: self.fail('-B option does not work on Windows. ' 'Please run celery beat as a separate service.') return value
[docs]class WorkersPool(click.Choice): """Workers pool option.""" name = "pool" def __init__(self): """Initialize the workers pool option with the relevant choices.""" super().__init__(concurrency.get_available_pool_names())
[docs] def convert(self, value, param, ctx): # Pools like eventlet/gevent needs to patch libs as early # as possible. if isinstance(value, type) and issubclass(value, BasePool): return value value = super().convert(value, param, ctx) worker_pool = ctx.obj.app.conf.worker_pool if value == 'prefork' and worker_pool: # If we got the default pool through the CLI # we need to check if the worker pool was configured. # If the worker pool was configured, we shouldn't use the default. value = concurrency.get_implementation(worker_pool) else: value = concurrency.get_implementation(value) if not value: value = concurrency.get_implementation(worker_pool) return value
[docs]class Hostname(StringParamType): """Hostname option.""" name = "hostname"
[docs] def convert(self, value, param, ctx): return host_format(default_nodename(value))
[docs]class Autoscale(ParamType): """Autoscaling parameter.""" name = "<min workers>, <max workers>"
[docs] def convert(self, value, param, ctx): value = value.split(',') if len(value) > 2: self.fail("Expected two comma separated integers or one integer." f"Got {len(value)} instead.") if len(value) == 1: try: value = (int(value[0]), 0) except ValueError: self.fail(f"Expected an integer. Got {value} instead.") try: return tuple(reversed(sorted(map(int, value)))) except ValueError: self.fail("Expected two comma separated integers." f"Got {value.join(',')} instead.")
CELERY_BEAT = CeleryBeat() WORKERS_POOL = WorkersPool() HOSTNAME = Hostname() AUTOSCALE = Autoscale() C_FAKEFORK = os.environ.get('C_FAKEFORK')
[docs]def detach(path, argv, logfile=None, pidfile=None, uid=None, gid=None, umask=None, workdir=None, fake=False, app=None, executable=None, hostname=None): """Detach program by argv.""" fake = 1 if C_FAKEFORK else fake # `detached()` will attempt to touch the logfile to confirm that error # messages won't be lost after detaching stdout/err, but this means we need # to pre-format it rather than relying on `setup_logging_subsystem()` like # we can elsewhere. logfile = node_format(logfile, hostname) with detached(logfile, pidfile, uid, gid, umask, workdir, fake, after_forkers=False): try: if executable is not None: path = executable os.execv(path, [path] + argv) return EX_OK except Exception: # pylint: disable=broad-except if app is None: from celery import current_app app = current_app app.log.setup_logging_subsystem( 'ERROR', logfile, hostname=hostname) logger.critical("Can't exec %r", ' '.join([path] + argv), exc_info=True) return EX_FAILURE
@click.command(cls=CeleryDaemonCommand, context_settings={'allow_extra_args': True}) @click.option('-n', '--hostname', default=host_format(default_nodename(None)), cls=CeleryOption, type=HOSTNAME, help_group="Worker Options", help="Set custom hostname (e.g., 'w1@%%h'). " "Expands: %%h (hostname), %%n (name) and %%d, (domain).") @click.option('-D', '--detach', cls=CeleryOption, is_flag=True, default=False, help_group="Worker Options", help="Start worker as a background process.") @click.option('-S', '--statedb', cls=CeleryOption, type=click.Path(), callback=lambda ctx, _, value: value or ctx.obj.app.conf.worker_state_db, help_group="Worker Options", help="Path to the state database. The extension '.db' may be " "appended to the filename.") @click.option('-l', '--loglevel', default='WARNING', cls=CeleryOption, type=LOG_LEVEL, help_group="Worker Options", help="Logging level.") @click.option('optimization', '-O', default='default', cls=CeleryOption, type=click.Choice(('default', 'fair')), help_group="Worker Options", help="Apply optimization profile.") @click.option('--prefetch-multiplier', type=int, metavar="<prefetch multiplier>", callback=lambda ctx, _, value: value or ctx.obj.app.conf.worker_prefetch_multiplier, cls=CeleryOption, help_group="Worker Options", help="Set custom prefetch multiplier value " "for this worker instance.") @click.option('-c', '--concurrency', type=int, metavar="<concurrency>", callback=lambda ctx, _, value: value or ctx.obj.app.conf.worker_concurrency, cls=CeleryOption, help_group="Pool Options", help="Number of child processes processing the queue. " "The default is the number of CPUs available" " on your system.") @click.option('-P', '--pool', default='prefork', type=WORKERS_POOL, cls=CeleryOption, help_group="Pool Options", help="Pool implementation.") @click.option('-E', '--task-events', '--events', is_flag=True, default=None, cls=CeleryOption, help_group="Pool Options", help="Send task-related events that can be captured by monitors" " like celery events, celerymon, and others.") @click.option('--time-limit', type=float, cls=CeleryOption, help_group="Pool Options", help="Enables a hard time limit " "(in seconds int/float) for tasks.") @click.option('--soft-time-limit', type=float, cls=CeleryOption, help_group="Pool Options", help="Enables a soft time limit " "(in seconds int/float) for tasks.") @click.option('--max-tasks-per-child', type=int, cls=CeleryOption, help_group="Pool Options", help="Maximum number of tasks a pool worker can execute before " "it's terminated and replaced by a new worker.") @click.option('--max-memory-per-child', type=int, cls=CeleryOption, help_group="Pool Options", help="Maximum amount of resident memory, in KiB, that may be " "consumed by a child process before it will be replaced " "by a new one. If a single task causes a child process " "to exceed this limit, the task will be completed and " "the child process will be replaced afterwards.\n" "Default: no limit.") @click.option('--purge', '--discard', is_flag=True, cls=CeleryOption, help_group="Queue Options") @click.option('--queues', '-Q', type=COMMA_SEPARATED_LIST, cls=CeleryOption, help_group="Queue Options") @click.option('--exclude-queues', '-X', type=COMMA_SEPARATED_LIST, cls=CeleryOption, help_group="Queue Options") @click.option('--include', '-I', type=COMMA_SEPARATED_LIST, cls=CeleryOption, help_group="Queue Options") @click.option('--without-gossip', is_flag=True, cls=CeleryOption, help_group="Features") @click.option('--without-mingle', is_flag=True, cls=CeleryOption, help_group="Features") @click.option('--without-heartbeat', is_flag=True, cls=CeleryOption, help_group="Features", ) @click.option('--heartbeat-interval', type=int, cls=CeleryOption, help_group="Features", ) @click.option('--autoscale', type=AUTOSCALE, cls=CeleryOption, help_group="Features", ) @click.option('-B', '--beat', type=CELERY_BEAT, cls=CeleryOption, is_flag=True, help_group="Embedded Beat Options") @click.option('-s', '--schedule-filename', '--schedule', callback=lambda ctx, _, value: value or ctx.obj.app.conf.beat_schedule_filename, cls=CeleryOption, help_group="Embedded Beat Options") @click.option('--scheduler', cls=CeleryOption, help_group="Embedded Beat Options") @click.pass_context @handle_preload_options def worker(ctx, hostname=None, pool_cls=None, app=None, uid=None, gid=None, loglevel=None, logfile=None, pidfile=None, statedb=None, **kwargs): """Start worker instance. Examples -------- $ celery --app=proj worker -l INFO $ celery -A proj worker -l INFO -Q hipri,lopri $ celery -A proj worker --concurrency=4 $ celery -A proj worker --concurrency=1000 -P eventlet $ celery worker --autoscale=10,0 """ try: app = ctx.obj.app if ctx.args: try: app.config_from_cmdline(ctx.args, namespace='worker') except (KeyError, ValueError) as e: # TODO: Improve the error messages raise click.UsageError( "Unable to parse extra configuration from command line.\n" f"Reason: {e}", ctx=ctx) if kwargs.get('detach', False): argv = ['-m', 'celery'] + sys.argv[1:] if '--detach' in argv: argv.remove('--detach') if '-D' in argv: argv.remove('-D') return detach(sys.executable, argv, logfile=logfile, pidfile=pidfile, uid=uid, gid=gid, umask=kwargs.get('umask', None), workdir=kwargs.get('workdir', None), app=app, executable=kwargs.get('executable', None), hostname=hostname) maybe_drop_privileges(uid=uid, gid=gid) worker = app.Worker( hostname=hostname, pool_cls=pool_cls, loglevel=loglevel, logfile=logfile, # node format handled by celery.app.log.setup pidfile=node_format(pidfile, hostname), statedb=node_format(statedb, hostname), no_color=ctx.obj.no_color, quiet=ctx.obj.quiet, **kwargs) worker.start() return worker.exitcode except SecurityError as e: ctx.obj.error(e.args[0]) ctx.exit(1)