This document describes the current stable version of Celery (3.1). For development docs, see the documentation for the development version.

celery.worker.consumer

celery.worker.consumer

This module contains the components responsible for consuming messages from the broker, processing the messages and keeping the broker connections up and running.

class celery.worker.consumer.Consumer(on_task_request, init_callback=<function noop>, hostname=None, pool=None, app=None, timer=None, controller=None, hub=None, amqheartbeat=None, worker_options=None, disable_rate_limits=False, initial_prefetch_count=2, prefetch_multiplier=1, **kwargs)[source]
class Blueprint(steps=None, name=None, app=None, on_start=None, on_close=None, on_stopped=None)[source]
default_steps = ['celery.worker.consumer:Connection', 'celery.worker.consumer:Mingle', 'celery.worker.consumer:Events', 'celery.worker.consumer:Gossip', 'celery.worker.consumer:Heart', 'celery.worker.consumer:Control', 'celery.worker.consumer:Tasks', 'celery.worker.consumer:Evloop', 'celery.worker.consumer:Agent']
name = 'Consumer'
shutdown(parent)[source]
Strategies

alias of dict

add_task_queue(queue, exchange=None, exchange_type=None, routing_key=None, **options)[source]
apply_eta_task(task)[source]

Method called by the timer to apply a task with an ETA/countdown.

bucket_for_task(type)[source]
cancel_task_queue(queue)[source]
connect()[source]

Establish the broker connection.

Will retry establishing the connection if the BROKER_CONNECTION_RETRY setting is enabled.

create_task_handler()[source]
in_shutdown = False

Set when the consumer is shutting down.

init_callback = None

Optional callback called the first time the worker is ready to receive tasks.

loop_args()[source]
on_close()[source]
on_decode_error(message, exc)[source]

Callback called if an error occurs while decoding a received message.

Simply logs the error and acknowledges the message so it doesn’t enter a loop.

Parameters:
  • message – The message with errors.
  • exc – The original exception instance.
on_invalid_task(body, message, exc)[source]
on_ready()[source]
on_unknown_message(body, message)[source]
on_unknown_task(body, message, exc)[source]
pool = None

The current worker pool instance.

register_with_event_loop(hub)[source]
reset_rate_limits()[source]
restart_count = -1
shutdown()[source]
start()[source]
stop()[source]
timer = None

A timer used for high-priority internal tasks, such as sending heartbeats.

update_strategies()[source]
class celery.worker.consumer.Connection(c, **kwargs)[source]
info(c, params='N/A')[source]
name = u'celery.worker.consumer.Connection'
shutdown(c)[source]
start(c)[source]
class celery.worker.consumer.Events(c, send_events=None, **kwargs)[source]
name = u'celery.worker.consumer.Events'
requires = (step:celery.worker.consumer.Connection{()},)
shutdown(c)[source]
start(c)[source]
stop(c)[source]
class celery.worker.consumer.Heart(c, without_heartbeat=False, heartbeat_interval=None, **kwargs)[source]
name = u'celery.worker.consumer.Heart'
requires = (step:celery.worker.consumer.Events{(step:celery.worker.consumer.Connection{()},)},)
shutdown(c)
start(c)[source]
stop(c)[source]
class celery.worker.consumer.Control(c, **kwargs)[source]
include_if(c)[source]
name = u'celery.worker.consumer.Control'
requires = (step:celery.worker.consumer.Tasks{(step:celery.worker.consumer.Mingle{(step:celery.worker.consumer.Events{(step:celery.worker.consumer.Connection{()},)},)},)},)
class celery.worker.consumer.Tasks(c, **kwargs)[source]
info(c)[source]
name = u'celery.worker.consumer.Tasks'
requires = (step:celery.worker.consumer.Mingle{(step:celery.worker.consumer.Events{(step:celery.worker.consumer.Connection{()},)},)},)
shutdown(c)[source]
start(c)[source]
stop(c)[source]
class celery.worker.consumer.Evloop(parent, **kwargs)[source]
label = 'event loop'
last = True
name = u'celery.worker.consumer.Evloop'
patch_all(c)[source]
start(c)[source]
class celery.worker.consumer.Agent(c, **kwargs)[source]
conditional = True
create(c)[source]
name = u'celery.worker.consumer.Agent'
requires = (step:celery.worker.consumer.Connection{()},)
class celery.worker.consumer.Mingle(c, without_mingle=False, **kwargs)[source]
compatible_transport(app)[source]
compatible_transports = set(['redis', 'amqp'])
label = 'Mingle'
name = u'celery.worker.consumer.Mingle'
requires = (step:celery.worker.consumer.Events{(step:celery.worker.consumer.Connection{()},)},)
start(c)[source]
class celery.worker.consumer.Gossip(c, without_gossip=False, interval=5.0, **kwargs)[source]
call_task(task)[source]
compatible_transport(app)[source]
compatible_transports = set(['redis', 'amqp'])
election(id, topic, action=None)[source]
get_consumers(channel)[source]
label = 'Gossip'
name = u'celery.worker.consumer.Gossip'
on_elect(event)[source]
on_elect_ack(event)[source]
on_message(prepare, message)[source]
on_node_join(worker)[source]
on_node_leave(worker)[source]
on_node_lost(worker)[source]
periodic()[source]
register_timer()[source]
requires = (step:celery.worker.consumer.Mingle{(step:celery.worker.consumer.Events{(step:celery.worker.consumer.Connection{()},)},)},)
start(c)[source]
celery.worker.consumer.dump_body(m, body)[source]