This document describes the current stable version of Celery (5.3). For development docs, go here.

Source code for celery.bin.celery

"""Celery Command Line Interface."""
import os
import pathlib
import sys
import traceback

try:
    from importlib.metadata import entry_points
except ImportError:
    from importlib_metadata import entry_points

import click
import click.exceptions
from click.types import ParamType
from click_didyoumean import DYMGroup
from click_plugins import with_plugins

from celery import VERSION_BANNER
from celery.app.utils import find_app
from celery.bin.amqp import amqp
from celery.bin.base import CeleryCommand, CeleryOption, CLIContext
from celery.bin.beat import beat
from celery.bin.call import call
from celery.bin.control import control, inspect, status
from celery.bin.events import events
from celery.bin.graph import graph
from celery.bin.list import list_
from celery.bin.logtool import logtool
from celery.bin.migrate import migrate
from celery.bin.multi import multi
from celery.bin.purge import purge
from celery.bin.result import result
from celery.bin.shell import shell
from celery.bin.upgrade import upgrade
from celery.bin.worker import worker

UNABLE_TO_LOAD_APP_MODULE_NOT_FOUND = click.style("""
Unable to load celery application.
The module {0} was not found.""", fg='red')

UNABLE_TO_LOAD_APP_ERROR_OCCURRED = click.style("""
Unable to load celery application.
While trying to load the module {0} the following error occurred:
{1}""", fg='red')

UNABLE_TO_LOAD_APP_APP_MISSING = click.style("""
Unable to load celery application.
{0}""")


[docs]class App(ParamType): """Application option.""" name = "application"
[docs] def convert(self, value, param, ctx): try: return find_app(value) except ModuleNotFoundError as e: if e.name != value: exc = traceback.format_exc() self.fail( UNABLE_TO_LOAD_APP_ERROR_OCCURRED.format(value, exc) ) self.fail(UNABLE_TO_LOAD_APP_MODULE_NOT_FOUND.format(e.name)) except AttributeError as e: attribute_name = e.args[0].capitalize() self.fail(UNABLE_TO_LOAD_APP_APP_MISSING.format(attribute_name)) except Exception: exc = traceback.format_exc() self.fail( UNABLE_TO_LOAD_APP_ERROR_OCCURRED.format(value, exc) )
APP = App() if sys.version_info >= (3, 10): _PLUGINS = entry_points(group='celery.commands') else: try: _PLUGINS = entry_points().get('celery.commands', []) except AttributeError: _PLUGINS = entry_points().select(group='celery.commands') @with_plugins(_PLUGINS) @click.group(cls=DYMGroup, invoke_without_command=True) @click.option('-A', '--app', envvar='APP', cls=CeleryOption, type=APP, help_group="Global Options") @click.option('-b', '--broker', envvar='BROKER_URL', cls=CeleryOption, help_group="Global Options") @click.option('--result-backend', envvar='RESULT_BACKEND', cls=CeleryOption, help_group="Global Options") @click.option('--loader', envvar='LOADER', cls=CeleryOption, help_group="Global Options") @click.option('--config', envvar='CONFIG_MODULE', cls=CeleryOption, help_group="Global Options") @click.option('--workdir', cls=CeleryOption, type=pathlib.Path, callback=lambda _, __, wd: os.chdir(wd) if wd else None, is_eager=True, help_group="Global Options") @click.option('-C', '--no-color', envvar='NO_COLOR', is_flag=True, cls=CeleryOption, help_group="Global Options") @click.option('-q', '--quiet', is_flag=True, cls=CeleryOption, help_group="Global Options") @click.option('--version', cls=CeleryOption, is_flag=True, help_group="Global Options") @click.option('--skip-checks', envvar='SKIP_CHECKS', cls=CeleryOption, is_flag=True, help_group="Global Options", help="Skip Django core checks on startup.") @click.pass_context def celery(ctx, app, broker, result_backend, loader, config, workdir, no_color, quiet, version, skip_checks): """Celery command entrypoint.""" if version: click.echo(VERSION_BANNER) ctx.exit() elif ctx.invoked_subcommand is None: click.echo(ctx.get_help()) ctx.exit() if loader: # Default app takes loader from this env (Issue #1066). os.environ['CELERY_LOADER'] = loader if broker: os.environ['CELERY_BROKER_URL'] = broker if result_backend: os.environ['CELERY_RESULT_BACKEND'] = result_backend if config: os.environ['CELERY_CONFIG_MODULE'] = config if skip_checks: os.environ['CELERY_SKIP_CHECKS'] = skip_checks ctx.obj = CLIContext(app=app, no_color=no_color, workdir=workdir, quiet=quiet) # User options worker.params.extend(ctx.obj.app.user_options.get('worker', [])) beat.params.extend(ctx.obj.app.user_options.get('beat', [])) events.params.extend(ctx.obj.app.user_options.get('events', [])) for command in celery.commands.values(): command.params.extend(ctx.obj.app.user_options.get('preload', [])) @celery.command(cls=CeleryCommand) @click.pass_context def report(ctx): """Shows information useful to include in bug-reports.""" app = ctx.obj.app app.loader.import_default_modules() ctx.obj.echo(app.bugreport()) celery.add_command(purge) celery.add_command(call) celery.add_command(beat) celery.add_command(list_) celery.add_command(result) celery.add_command(migrate) celery.add_command(status) celery.add_command(worker) celery.add_command(events) celery.add_command(inspect) celery.add_command(control) celery.add_command(graph) celery.add_command(upgrade) celery.add_command(logtool) celery.add_command(amqp) celery.add_command(shell) celery.add_command(multi) # Monkey-patch click to display a custom error # when -A or --app are used as sub-command options instead of as options # of the global command. previous_show_implementation = click.exceptions.NoSuchOption.show WRONG_APP_OPTION_USAGE_MESSAGE = """You are using `{option_name}` as an option of the {info_name} sub-command: celery {info_name} {option_name} celeryapp <...> The support for this usage was removed in Celery 5.0. Instead you should use `{option_name}` as a global option: celery {option_name} celeryapp {info_name} <...>""" def _show(self, file=None): if self.option_name in ('-A', '--app'): self.ctx.obj.error( WRONG_APP_OPTION_USAGE_MESSAGE.format( option_name=self.option_name, info_name=self.ctx.info_name), fg='red' ) previous_show_implementation(self, file=file) click.exceptions.NoSuchOption.show = _show
[docs]def main() -> int: """Start celery umbrella command. This function is the main entrypoint for the CLI. :return: The exit code of the CLI. """ return celery(auto_envvar_prefix="CELERY")