This document describes the current stable version of Celery (3.1). For development docs, go here.

Source code for celery.bin.worker

# -*- coding: utf-8 -*-
"""

The :program:`celery worker` command (previously known as ``celeryd``)

.. program:: celery worker

.. seealso::

    See :ref:`preload-options`.

.. cmdoption:: -c, --concurrency

    Number of child processes processing the queue. The default
    is the number of CPUs available on your system.

.. cmdoption:: -P, --pool

    Pool implementation:

    prefork (default), eventlet, gevent, solo or threads.

.. cmdoption:: -f, --logfile

    Path to log file. If no logfile is specified, `stderr` is used.

.. cmdoption:: -l, --loglevel

    Logging level, choose between `DEBUG`, `INFO`, `WARNING`,
    `ERROR`, `CRITICAL`, or `FATAL`.

.. cmdoption:: -n, --hostname

    Set custom hostname, e.g. 'w1.%h'. Expands: %h (hostname),
    %n (name) and %d, (domain).

.. cmdoption:: -B, --beat

    Also run the `celery beat` periodic task scheduler. Please note that
    there must only be one instance of this service.

.. cmdoption:: -Q, --queues

    List of queues to enable for this worker, separated by comma.
    By default all configured queues are enabled.
    Example: `-Q video,image`

.. cmdoption:: -I, --include

    Comma separated list of additional modules to import.
    Example: -I foo.tasks,bar.tasks

.. cmdoption:: -s, --schedule

    Path to the schedule database if running with the `-B` option.
    Defaults to `celerybeat-schedule`. The extension ".db" may be
    appended to the filename.

.. cmdoption:: -O

    Apply optimization profile.  Supported: default, fair

.. cmdoption:: --scheduler

    Scheduler class to use. Default is celery.beat.PersistentScheduler

.. cmdoption:: -S, --statedb

    Path to the state database. The extension '.db' may
    be appended to the filename. Default: {default}

.. cmdoption:: -E, --events

    Send events that can be captured by monitors like :program:`celery events`,
    `celerymon`, and others.

.. cmdoption:: --without-gossip

    Do not subscribe to other workers events.

.. cmdoption:: --without-mingle

    Do not synchronize with other workers at startup.

.. cmdoption:: --without-heartbeat

    Do not send event heartbeats.

.. cmdoption:: --heartbeat-interval

    Interval in seconds at which to send worker heartbeat

.. cmdoption:: --purge

    Purges all waiting tasks before the daemon is started.
    **WARNING**: This is unrecoverable, and the tasks will be
    deleted from the messaging server.

.. cmdoption:: --time-limit

    Enables a hard time limit (in seconds int/float) for tasks.

.. cmdoption:: --soft-time-limit

    Enables a soft time limit (in seconds int/float) for tasks.

.. cmdoption:: --maxtasksperchild

    Maximum number of tasks a pool worker can execute before it's
    terminated and replaced by a new worker.

.. cmdoption:: --pidfile

    Optional file used to store the workers pid.

    The worker will not start if this file already exists
    and the pid is still alive.

.. cmdoption:: --autoscale

    Enable autoscaling by providing
    max_concurrency, min_concurrency. Example::

        --autoscale=10,3

    (always keep 3 processes, but grow to 10 if necessary)

.. cmdoption:: --autoreload

    Enable autoreloading.

.. cmdoption:: --no-execv

    Don't do execv after multiprocessing child fork.

"""
from __future__ import absolute_import, unicode_literals

import sys

from celery import concurrency
from celery.bin.base import Command, Option, daemon_options
from celery.bin.celeryd_detach import detached_celeryd
from celery.five import string_t
from celery.platforms import maybe_drop_privileges
from celery.utils import default_nodename
from celery.utils.log import LOG_LEVELS, mlevel

__all__ = ['worker', 'main']

__MODULE_DOC__ = __doc__


[docs]class worker(Command): """Start worker instance. Examples:: celery worker --app=proj -l info celery worker -A proj -l info -Q hipri,lopri celery worker -A proj --concurrency=4 celery worker -A proj --concurrency=1000 -P eventlet celery worker --autoscale=10,0 """ doc = __MODULE_DOC__ # parse help from this too namespace = 'celeryd' enable_config_from_cmdline = True supports_args = False
[docs] def run_from_argv(self, prog_name, argv=None, command=None): command = sys.argv[0] if command is None else command argv = sys.argv[1:] if argv is None else argv # parse options before detaching so errors can be handled. options, args = self.prepare_args( *self.parse_options(prog_name, argv, command)) self.maybe_detach([command] + argv) return self(*args, **options)
[docs] def maybe_detach(self, argv, dopts=['-D', '--detach']): if any(arg in argv for arg in dopts): argv = [v for v in argv if v not in dopts] # will never return detached_celeryd(self.app).execute_from_commandline(argv) raise SystemExit(0)
[docs] def run(self, hostname=None, pool_cls=None, app=None, uid=None, gid=None, loglevel=None, logfile=None, pidfile=None, state_db=None, **kwargs): maybe_drop_privileges(uid=uid, gid=gid) # Pools like eventlet/gevent needs to patch libs as early # as possible. pool_cls = (concurrency.get_implementation(pool_cls) or self.app.conf.CELERYD_POOL) if self.app.IS_WINDOWS and kwargs.get('beat'): self.die('-B option does not work on Windows. ' 'Please run celery beat as a separate service.') hostname = self.host_format(default_nodename(hostname)) if loglevel: try: loglevel = mlevel(loglevel) except KeyError: # pragma: no cover self.die('Unknown level {0!r}. Please use one of {1}.'.format( loglevel, '|'.join( l for l in LOG_LEVELS if isinstance(l, string_t)))) return self.app.Worker( hostname=hostname, pool_cls=pool_cls, loglevel=loglevel, logfile=logfile, # node format handled by celery.app.log.setup pidfile=self.node_format(pidfile, hostname), state_db=self.node_format(state_db, hostname), **kwargs ).start()
[docs] def with_pool_option(self, argv): # this command support custom pools # that may have to be loaded as early as possible. return (['-P'], ['--pool'])
[docs] def get_options(self): conf = self.app.conf return ( Option('-c', '--concurrency', default=conf.CELERYD_CONCURRENCY, type='int'), Option('-P', '--pool', default=conf.CELERYD_POOL, dest='pool_cls'), Option('--purge', '--discard', default=False, action='store_true'), Option('-l', '--loglevel', default=conf.CELERYD_LOG_LEVEL), Option('-n', '--hostname'), Option('-B', '--beat', action='store_true'), Option('-s', '--schedule', dest='schedule_filename', default=conf.CELERYBEAT_SCHEDULE_FILENAME), Option('--scheduler', dest='scheduler_cls'), Option('-S', '--statedb', default=conf.CELERYD_STATE_DB, dest='state_db'), Option('-E', '--events', default=conf.CELERY_SEND_EVENTS, action='store_true', dest='send_events'), Option('--time-limit', type='float', dest='task_time_limit', default=conf.CELERYD_TASK_TIME_LIMIT), Option('--soft-time-limit', dest='task_soft_time_limit', default=conf.CELERYD_TASK_SOFT_TIME_LIMIT, type='float'), Option('--maxtasksperchild', dest='max_tasks_per_child', default=conf.CELERYD_MAX_TASKS_PER_CHILD, type='int'), Option('--queues', '-Q', default=[]), Option('--exclude-queues', '-X', default=[]), Option('--include', '-I', default=[]), Option('--autoscale'), Option('--autoreload', action='store_true'), Option('--no-execv', action='store_true', default=False), Option('--without-gossip', action='store_true', default=False), Option('--without-mingle', action='store_true', default=False), Option('--without-heartbeat', action='store_true', default=False), Option('--heartbeat-interval', type='int'), Option('-O', dest='optimization'), Option('-D', '--detach', action='store_true'), ) + daemon_options() + tuple(self.app.user_options['worker'])
[docs]def main(app=None): # Fix for setuptools generated scripts, so that it will # work with multiprocessing fork emulation. # (see multiprocessing.forking.get_preparation_data()) if __name__ != '__main__': # pragma: no cover sys.modules['__main__'] = sys.modules[__name__] from billiard import freeze_support freeze_support() worker(app=app).execute_from_commandline()
if __name__ == '__main__': # pragma: no cover main()