This document describes the current stable version of Celery (5.6). For development docs, go here.

celery.worker.consumer.consumer

Worker Consumer Blueprint.

This module contains the components responsible for consuming messages from the broker, processing the messages and keeping the broker connections up and running.

class celery.worker.consumer.consumer.Consumer(on_task_request, init_callback=<function noop>, hostname=None, pool=None, app=None, timer=None, controller=None, hub=None, amqheartbeat=None, worker_options=None, disable_rate_limits=False, initial_prefetch_count=2, prefetch_multiplier=1, **kwargs)[source]

Consumer blueprint.

class Blueprint(steps=None, name=None, on_start=None, on_close=None, on_stopped=None)[source]

Consumer blueprint.

default_steps = ['celery.worker.consumer.connection:Connection', 'celery.worker.consumer.mingle:Mingle', 'celery.worker.consumer.events:Events', 'celery.worker.consumer.gossip:Gossip', 'celery.worker.consumer.heart:Heart', 'celery.worker.consumer.control:Control', 'celery.worker.consumer.tasks:Tasks', 'celery.worker.consumer.delayed_delivery:DelayedDelivery', 'celery.worker.consumer.consumer:Evloop', 'celery.worker.consumer.agent:Agent']
name = 'Consumer'
shutdown(parent)[source]
Strategies

alias of dict

add_task_queue(queue, exchange=None, exchange_type=None, routing_key=None, **options)[source]
apply_eta_task(task)[source]

Method called by the timer to apply a task with an ETA/countdown.

broker_connection_retry_attempt = 0

Counter to track number of conn retry attempts to broker. Will be reset to 0 once successful

bucket_for_task(type)[source]
call_soon(p, *args, **kwargs)[source]
call_soon_ack(p, *args, **kwargs)[source]

Execute ack/reject callback immediately, bypassing _pending_operations.

In synloop (gevent/eventlet) workers, the standard call_soon defers callbacks to _pending_operations, which are only drained at the top of the next synloop iteration — after drain_events() returns. With acks_late and prefetch_multiplier=1 this means the broker cannot deliver the next message until an unrelated AMQP frame arrives, adding 50-400 ms of latency between every pair of tasks.

This method is intentionally scoped to ack/reject callbacks, which only write an AMQP basic.ack/basic.reject frame to the broker socket. Other call_soon users (e.g. remote-control commands from gPidbox) continue to use deferred execution to preserve greenlet-safety.

cancel_active_requests()[source]

Cancel active requests during shutdown.

Cancels all active requests that either do not require late acknowledgments or, if they do, have not been acknowledged yet.

Does not cancel successful tasks, even if they have not been acknowledged yet.

cancel_task_queue(queue)[source]
connect()[source]

Establish the broker connection used for consuming tasks.

Retries establishing the connection if the broker_connection_retry setting is enabled

connection_for_read(heartbeat=None)[source]
connection_for_write(url=None, heartbeat=None)[source]
create_task_handler(promise=<class 'vine.promises.promise'>)[source]
ensure_connected(conn)[source]
first_connection_attempt = True

This flag will be turned off after the first failed connection attempt.

init_callback = None

Optional callback called the first time the worker is ready to receive tasks.

loop_args()[source]
property max_prefetch_count
on_close()[source]
on_connection_error_after_connected(exc)[source]
on_connection_error_before_connected(exc)[source]
on_decode_error(message, exc)[source]

Callback called if an error occurs while decoding a message.

Simply logs the error and acknowledges the message so it doesn’t enter a loop.

Parameters:
  • message (kombu.Message) – The message received.

  • exc (Exception) – The exception being handled.

on_invalid_task(body, message, exc)[source]
on_ready()[source]
on_send_event_buffered()[source]
on_unknown_message(body, message)[source]
on_unknown_task(body, message, exc)[source]
perform_pending_operations()[source]
pool = None

The current worker pool instance.

register_with_event_loop(hub)[source]
reset_rate_limits()[source]
restart_count = -1
shutdown()[source]
start()[source]
stop()[source]
timer = None

A timer used for high-priority internal tasks, such as sending heartbeats.

update_strategies()[source]
class celery.worker.consumer.consumer.Evloop(parent, **kwargs)[source]

Event loop service.

Note

This is always started last.

label = 'event loop'
last = True
name = 'celery.worker.consumer.consumer.Evloop'
patch_all(c)[source]
start(c)[source]
celery.worker.consumer.consumer.dump_body(m, body)[source]

Format message body for debugging purposes.