This document describes the current stable version of Celery (5.0).

celery.app.control

Worker Remote Control Client.

Client for worker remote control commands. Server implementation is in celery.worker.control.

class celery.app.control.Control(app=None)

Worker remote control client.

class Mailbox(namespace, type='direct', connection=None, clock=None, accept=None, serializer=None, producer_pool=None, queue_ttl=None, queue_expires=None, reply_queue_ttl=None, reply_queue_expires=10.0)

Process Mailbox.

Node(hostname=None, state=None, channel=None, handlers=None)
abcast(command, kwargs=None)
accept = ['json']
call(destination, command, kwargs=None, timeout=None, callback=None, channel=None)
cast(destination, command, kwargs=None)
connection = None
exchange = None
exchange_fmt = '%s.pidbox'
get_queue(hostname)
get_reply_queue()
multi_call(command, kwargs=None, timeout=1, limit=None, callback=None, channel=None)
namespace = None
node_cls

alias of Node

oid
producer_or_acquire(producer=None, channel=None)
producer_pool
reply_exchange = None
reply_exchange_fmt = 'reply.%s.pidbox'
reply_queue
serializer = None
type = 'direct'
add_consumer(queue, exchange=None, exchange_type='direct', routing_key=None, options=None, destination=None, **kwargs)

Tell all (or specific) workers to start consuming from a new queue.

Only the queue name is required: if only the queue is specified, the exchange and routing key are set to the same name (as automatic queues do).

Note

This command does not respect the default queue/exchange options in the configuration.

Parameters
  • queue (str) – Name of queue to start consuming from.

  • exchange (str) – Optional name of exchange.

  • exchange_type (str) – Type of exchange (defaults to ‘direct’).

  • routing_key (str) – Optional routing key.

  • options (Dict) – Additional options as supported by kombu.entity.Queue.from_dict().

  • destination (List) – If set, a list of the hosts to send the command to. When empty, the command is broadcast to all workers.

See also

broadcast() for supported keyword arguments.
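As a minimal sketch of the call described above, the helper below asks workers to start consuming from a queue and waits for their replies. It assumes `control` is an `app.control` instance (or any object with the same `add_consumer()` method). The helper name `start_consuming`, the queue name, and the worker name are illustrative and not part of Celery.

```python
def start_consuming(control, queue, workers=None, timeout=2.0):
    """Ask workers to consume from ``queue`` and return their replies.

    ``control`` is assumed to behave like ``app.control``.
    ``workers`` is an optional list of worker node names; ``None``
    targets all workers.
    """
    # Only the queue name is given, so per the description above the
    # exchange and routing key take the same name as the queue.
    return control.add_consumer(
        queue,
        destination=workers,
        reply=True,       # forwarded to broadcast(): wait for replies
        timeout=timeout,  # forwarded to broadcast(): seconds to wait
    )


# Usage (needs a configured Celery app and a reachable broker):
# replies = start_consuming(app.control, "priority.high", ["worker1@example.com"])
```

Passing `reply=True` is a choice made for this sketch; without it the command is sent without waiting for any reply.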

autoscale(max, min, destination=None, **kwargs)

Change worker(s) autoscale setting.

See also

Supports the same arguments as broadcast().

broadcast(command, arguments=None, destination=None, connection=None, reply=False, timeout=1.0, limit=None, callback=None, channel=None, pattern=None, matcher=None, **extra_kwargs)

Broadcast a control command to the celery workers.

Parameters
  • command (str) – Name of command to send.

  • arguments (Dict) – Keyword arguments for the command.

  • destination (List) – If set, a list of the hosts to send the command to. When empty, the command is broadcast to all workers.

  • connection (kombu.Connection) – Custom broker connection to use. If not set, a connection will be acquired from the pool.

  • reply (bool) – Wait for and return the reply.

  • timeout (float) – Timeout in seconds to wait for the reply.

  • limit (int) – Limit number of replies.

  • callback (Callable) – Callback called immediately for each reply received.

  • pattern (str) – Custom pattern string to match.

  • matcher (Callable) – Custom matcher used to match the pattern.
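The parameters above can be combined as in the following sketch, which sends a command, waits for replies, and records each responding hostname through the callback. It assumes `control` behaves like `app.control` and that each reply handed to the callback is a `{hostname: payload}` mapping. The helper name `broadcast_and_collect` is hypothetical.

```python
def broadcast_and_collect(control, command, arguments=None, workers=None,
                          timeout=1.0, limit=None):
    """Send ``command`` and return ``(replies, hostnames_seen)``."""
    seen = []

    def on_reply(reply):
        # Assumption: each reply is a {hostname: payload} dict, so
        # iterating it yields the hostname(s) that answered.
        seen.extend(reply)

    replies = control.broadcast(
        command,
        arguments=arguments or {},
        destination=workers,  # None means "all workers"
        reply=True,           # wait for and return the replies
        timeout=timeout,      # seconds to wait for replies
        limit=limit,          # stop after this many replies, if set
        callback=on_reply,    # called as each reply arrives
    )
    return replies, seen


# Usage (needs a configured Celery app and a reachable broker):
# replies, seen = broadcast_and_collect(app.control, "ping", timeout=0.5)
```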

cancel_consumer(queue, destination=None, **kwargs)

Tell all (or specific) workers to stop consuming from queue.

See also

Supports the same arguments as broadcast().

disable_events(destination=None, **kwargs)

Tell all (or specific) workers to disable events.

See also

Supports the same arguments as broadcast().

discard_all(connection=None)

Discard all waiting tasks.

All tasks waiting for execution are discarded and deleted from the messaging server.

Parameters

connection (kombu.Connection) – Optional specific connection instance to use. If not provided, a connection will be acquired from the connection pool.

Returns

the number of tasks discarded.

Return type

int

election(id, topic, action=None, connection=None)
enable_events(destination=None, **kwargs)

Tell all (or specific) workers to enable events.

See also

Supports the same arguments as broadcast().

heartbeat(destination=None, **kwargs)

Tell worker(s) to send a heartbeat immediately.

See also

Supports the same arguments as broadcast().

inspect
ping(destination=None, timeout=1.0, **kwargs)

Ping all (or specific) workers.

Returns

List of {'hostname': reply} dictionaries.

Return type

List[Dict]

See also

broadcast() for supported keyword arguments.
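Because the result is a list of single-entry dictionaries, a little post-processing is often useful. The sketch below extracts the hostnames that answered and reports which expected workers stayed silent. Both function names are hypothetical, and the `{'ok': 'pong'}` payload in the usage comment is only an illustrative reply value.

```python
def responding_workers(replies):
    """Return the sorted hostnames found in a ping() result."""
    hostnames = set()
    for reply in replies or []:
        # Each reply is a {'hostname': reply} dictionary.
        hostnames.update(reply)
    return sorted(hostnames)


def missing_workers(replies, expected):
    """Return the sorted expected hostnames that did not reply."""
    return sorted(set(expected) - set(responding_workers(replies)))


# Usage (needs a configured Celery app and a reachable broker):
# replies = app.control.ping(timeout=0.5)
# e.g. [{'worker1@example.com': {'ok': 'pong'}}]
# silent = missing_workers(replies, ["worker1@example.com", "worker2@example.com"])
```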

pool_grow(n=1, destination=None, **kwargs)

Tell all (or specific) workers to grow the pool by n.

See also

Supports the same arguments as broadcast().

pool_restart(modules=None, reload=False, reloader=None, destination=None, **kwargs)

Restart the execution pools of all or specific workers.

Keyword Arguments
  • modules (Sequence[str]) – List of modules to reload.

  • reload (bool) – Flag to enable module reloading. Default is False.

  • reloader (Any) – Function to reload a module.

  • destination (Sequence[str]) – List of worker names to send this command to.

See also

Supports the same arguments as broadcast().

pool_shrink(n=1, destination=None, **kwargs)

Tell all (or specific) workers to shrink the pool by n.

See also

Supports the same arguments as broadcast().

purge(connection=None)

Discard all waiting tasks.

All tasks waiting for execution are discarded and deleted from the messaging server.

Parameters

connection (kombu.Connection) – Optional specific connection instance to use. If not provided, a connection will be acquired from the connection pool.

Returns

the number of tasks discarded.

Return type

int

rate_limit(task_name, rate_limit, destination=None, **kwargs)

Tell workers to set a new rate limit for a task by type.

Parameters
  • task_name (str) – Name of task to change rate limit for.

  • rate_limit (int, str) – The rate limit as tasks per second, or a rate limit string (‘100/m’, etc.). See celery.task.base.Task.rate_limit for more information.

See also

broadcast() for supported keyword arguments.
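A short sketch of setting a per-minute limit using the rate limit string form described above. It assumes `control` behaves like `app.control`. The helper name `throttle` and the task name in the usage comment are illustrative only.

```python
def throttle(control, task_name, per_minute, workers=None):
    """Limit ``task_name`` to ``per_minute`` executions per minute."""
    # Build a rate limit string such as '100/m'.
    rate = f"{int(per_minute)}/m"
    return control.rate_limit(
        task_name,
        rate,
        destination=workers,  # None means "all workers"
        reply=True,           # forwarded to broadcast(): wait for replies
    )


# Usage (needs a configured Celery app and a reachable broker):
# throttle(app.control, "tasks.send_email", 100)
```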

revoke(task_id, destination=None, terminate=False, signal='SIGTERM', **kwargs)

Tell all (or specific) workers to revoke a task by id (or list of ids).

If a task is revoked, the workers will ignore the task and not execute it.

Parameters
  • task_id (Union[str, list]) – Id of the task to revoke (or list of ids).

  • terminate (bool) – Also terminate the process currently working on the task (if any).

  • signal (str) – Name of signal to send to the process if terminate is enabled. Default is SIGTERM.

See also

broadcast() for supported keyword arguments.
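The following sketch wraps the call so that a single id and a list of ids are handled the same way. It assumes `control` behaves like `app.control`. The helper name `cancel_tasks` and the ids in the usage comment are hypothetical.

```python
def cancel_tasks(control, task_ids, kill_running=False, signal="SIGTERM"):
    """Revoke one task id or several, optionally terminating running ones.

    Returns the list of ids that were sent for revocation.
    """
    # Accept either a single id string or any iterable of ids.
    ids = [task_ids] if isinstance(task_ids, str) else list(task_ids)
    control.revoke(
        ids,
        terminate=kill_running,  # also terminate the working process, if any
        signal=signal,           # only relevant when terminate is enabled
    )
    return ids


# Usage (needs a configured Celery app and a reachable broker):
# cancel_tasks(app.control, "hypothetical-task-id")
# cancel_tasks(app.control, ["id-1", "id-2"], kill_running=True)
```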

shutdown(destination=None, **kwargs)

Shutdown worker(s).

See also

Supports the same arguments as broadcast().

terminate(task_id, destination=None, signal='SIGTERM', **kwargs)

Tell all (or specific) workers to terminate a task by id (or list of ids).

See also

This is just a shortcut to revoke() with the terminate argument enabled.

time_limit(task_name, soft=None, hard=None, destination=None, **kwargs)

Tell workers to set time limits for a task by type.

Parameters
  • task_name (str) – Name of task to change time limits for.

  • soft (float) – New soft time limit (in seconds).

  • hard (float) – New hard time limit (in seconds).

  • **kwargs (Any) – arguments passed on to broadcast().
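A minimal sketch of changing both limits at once, assuming `control` behaves like `app.control`. The check that the soft limit is below the hard limit is added by this sketch and is not stated as part of the API above. The helper name `set_time_limits` and the task name are illustrative.

```python
def set_time_limits(control, task_name, soft=None, hard=None, workers=None):
    """Set soft/hard time limits (in seconds) for ``task_name``."""
    # Sanity check added by this sketch: a soft limit that is not
    # below the hard limit would not get a chance to fire first.
    if soft is not None and hard is not None and soft >= hard:
        raise ValueError("soft limit should be lower than the hard limit")
    return control.time_limit(
        task_name,
        soft=soft,
        hard=hard,
        destination=workers,  # None means "all workers"
        reply=True,           # forwarded to broadcast(): wait for replies
    )


# Usage (needs a configured Celery app and a reachable broker):
# set_time_limits(app.control, "tasks.crawl_the_web", soft=60, hard=120)
```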

class celery.app.control.Inspect(destination=None, timeout=1.0, callback=None, connection=None, app=None, limit=None, pattern=None, matcher=None)

API for app.control.inspect.

active(safe=None)
active_queues()
app = None
clock()
conf(with_defaults=False)
hello(from_node, revoked=None)
memdump(samples=10)
memsample()
objgraph(type='Request', n=200, max_depth=10)
ping(destination=None)
query_task(*ids)
registered(*taskinfoitems)
registered_tasks(*taskinfoitems)
report()
reserved(safe=None)
revoked()
scheduled(safe=None)
stats()
celery.app.control.flatten_reply(reply)

Flatten node replies.

Convert from a list of replies in this format:

[{'a@example.com': reply},
 {'b@example.com': reply}]

into this format:

{'a@example.com': reply,
 'b@example.com': reply}
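The conversion above amounts to merging a list of dictionaries into one. The function below is an illustrative stand-alone sketch of that transformation, not Celery's implementation. The name `flatten_node_replies` is hypothetical. In this sketch, if the same node name appears more than once, the later reply replaces the earlier one.

```python
def flatten_node_replies(replies):
    """Merge a list of {node_name: reply} dicts into a single dict."""
    merged = {}
    for reply in replies:
        # Later entries overwrite earlier ones for duplicate node names.
        merged.update(reply)
    return merged


# Usage:
# flatten_node_replies([{'a@example.com': 1}, {'b@example.com': 2}])
# returns {'a@example.com': 1, 'b@example.com': 2}
```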