This document describes an older version of Celery (2.5).

celery.worker.consumer

This module contains the component responsible for consuming messages from the broker, processing the messages and keeping the broker connections up and running.

copyright: (c) 2009 - 2012 by Ask Solem.
license: BSD, see LICENSE for more details.

  • start() is an infinite loop, which only iterates again if the connection is lost. For each iteration (at start, or if the connection is lost) it calls reset_connection(), and starts the consumer by calling consume_messages().

  • reset_connection() clears the internal queues, establishes a new connection to the broker, and sets up the task consumer (+ QoS) and the broadcast remote control command consumer.

    Also if events are enabled it configures the event dispatcher and starts up the heartbeat thread.

  • Finally it can consume messages. consume_messages() is simply an infinite loop waiting for events on the AMQP channels.

    Both the task consumer and the broadcast consumer use the same callback: receive_message().

  • For each message received, the receive_message() method is called. It checks the payload of the message for either a task key or a control key.

    If the message is a task, it verifies the validity of the message, converts it to a celery.worker.job.Request, and sends it to on_task().

    If the message is a control command the message is passed to on_control(), which in turn dispatches the control command using the control dispatcher.

    It also tries to handle malformed or invalid messages properly, so the worker doesn’t choke on them and die. Invalid messages are logged and acknowledged immediately, so they are not redelivered again and again.

  • If the task has an ETA/countdown, the task is moved to the eta_schedule so the timer2.Timer can schedule it at its deadline. Tasks without an ETA are moved immediately to the ready_queue, so they can be picked up by the Mediator to be sent to the pool.

  • When a task with an ETA is received the QoS prefetch count is also incremented, so another message can be reserved. When the ETA is met the prefetch count is decremented again, though this cannot happen immediately because amqplib doesn’t support doing broker requests across threads. Instead the current prefetch count is kept as a shared counter, so as soon as consume_messages() detects that the value has changed it will send out the actual QoS event to the broker.

  • Notice that when the connection is lost all internal queues are cleared because we can no longer ack the messages reserved in memory. However, this is not dangerous as the broker will resend them to another worker when the channel is closed.

  • WARNING: stop() does not close the connection! This is because some pre-acked messages may be in processing, and they need to be finished before the channel is closed. For celeryd this means the pool must finish the tasks it has acked early, then close the connection.
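
The start(), reset_connection() and consume_messages() cycle described above can be sketched as a toy model. This is not the Celery implementation: ConnectionLost, SketchConsumer and the batches argument are hypothetical names invented for the illustration, and the loop is allowed to end so that the example terminates.

```python
class ConnectionLost(Exception):
    """Hypothetical stand-in for a broker connection error."""


class SketchConsumer:
    """Toy model of the start() / reset_connection() / consume_messages() cycle."""

    def __init__(self, batches):
        # Each batch is the list of messages delivered before the connection drops.
        self.batches = list(batches)
        self.reserved = []     # stands in for the internal queues
        self.processed = []
        self.connections = 0
        self.current = []

    def reset_connection(self):
        # Messages reserved on the old connection can no longer be acked: drop them.
        self.reserved.clear()
        self.connections += 1
        self.current = self.batches.pop(0)

    def consume_messages(self):
        for msg in self.current:
            self.reserved.append(msg)
            self.processed.append(msg)
        if self.batches:
            # Simulate losing the connection while more messages are pending.
            raise ConnectionLost()

    def start(self):
        while True:
            self.reset_connection()
            try:
                self.consume_messages()
            except ConnectionLost:
                continue  # connection lost: iterate again
            break  # only so the sketch terminates; the real loop runs forever


consumer = SketchConsumer([["a", "b"], ["c"]])
consumer.start()
print(consumer.connections, consumer.processed)
```

The second connection starts with an empty reserved list, which mirrors the point that in-memory messages are dropped on reconnect and left for the broker to redeliver.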

class celery.worker.consumer.Component(parent, **kwargs)
create(w)
last = True
name = 'consumer'
namespace = 'worker'
class celery.worker.consumer.Consumer(ready_queue, eta_schedule, logger, init_callback=noop, send_events=False, hostname=None, initial_prefetch_count=2, pool=None, app=None, priority_timer=None, controller=None)

Listen for messages received from the broker and move them to the ready queue for task processing.

apply_eta_task(task)

Method called by the timer to apply a task with an ETA/countdown.

broadcast_consumer = None

The consumer used to consume broadcast commands.

close_connection()

Closes the current broker connection and all open channels.

connection = None

The broker connection.

consume_messages()

Consume messages forever (or until an exception is raised).

eta_schedule = None

Timer for tasks with an ETA/countdown.

event_dispatcher = None

A celery.events.EventDispatcher for sending events.

heart = None

The thread that sends event heartbeats at regular intervals. The heartbeats are used by monitors to detect that a worker went offline/disappeared.

hostname = None

The current hostname. Defaults to the system hostname.

info

Returns information about this consumer instance as a dict.

This is also the consumer-related info returned by celeryctl stats.

init_callback = None

Optional callback to be called when the connection is established. Will only be called once, even if the connection is lost and re-established.

initial_prefetch_count = 0

Initial QoS prefetch count for the task channel.

logger = None

The logger instance to use. Defaults to the default Celery logger.

maybe_conn_error(fun)

Applies function but ignores any connection or channel errors raised.
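
A minimal sketch of the idea, written as a free function rather than a method. BrokerConnectionError and BrokerChannelError are hypothetical placeholders for whatever error classes the transport in use raises; they are not Celery or kombu names.

```python
class BrokerConnectionError(Exception):
    """Hypothetical connection-level error."""


class BrokerChannelError(Exception):
    """Hypothetical channel-level error."""


def maybe_conn_error(fun, errors=(BrokerConnectionError, BrokerChannelError)):
    """Call fun(), swallowing connection and channel errors only."""
    try:
        fun()
    except errors:
        pass  # the connection is already gone, nothing left to clean up


def close_broken_channel():
    raise BrokerChannelError("channel already closed")


# The channel error is ignored; any other exception would still propagate.
maybe_conn_error(close_broken_channel)
```

This pattern is useful during shutdown, where closing a consumer on an already-dead connection should not abort the rest of the cleanup.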

on_control(body, message)

Process remote control command message.

on_decode_error(message, exc)

Callback called if an error occurs while decoding a message received.

Simply logs the error and acknowledges the message so it doesn’t enter a loop.

Parameters:
  • message – The message with errors.
  • exc – The original exception instance.
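
The log-then-acknowledge behaviour can be sketched as follows. FakeMessage is a hypothetical stand-in for a broker message object, and the log text is illustrative rather than the wording Celery uses.

```python
import logging

logger = logging.getLogger("consumer.sketch")


class FakeMessage:
    """Hypothetical stand-in for a broker message with an ack() method."""

    def __init__(self, body):
        self.body = body
        self.acknowledged = False

    def ack(self):
        self.acknowledged = True


def on_decode_error(message, exc):
    # Log first, so the raw payload is recorded before the message is discarded.
    logger.critical("Can't decode message body: %r (body: %r)", exc, message.body)
    # Acknowledge so the broker does not redeliver the same undecodable message.
    message.ack()


bad = FakeMessage(b"\xff\xfe not json")
on_decode_error(bad, ValueError("invalid payload"))
```
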
on_task(task)

Handle received task.

If the task has an ETA we enter it into the ETA schedule, otherwise we move it to the ready queue for immediate processing.
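
The routing decision can be sketched like this. SketchTask and SketchRouter are hypothetical; a heap stands in for the timer-based ETA schedule, and a plain integer stands in for the QoS prefetch count, which the overview says is raised while an ETA task is held and lowered again when it becomes due.

```python
import heapq
from collections import deque
from datetime import datetime


class SketchTask:
    """Hypothetical task request carrying an optional ETA."""

    def __init__(self, name, eta=None):
        self.name = name
        self.eta = eta


class SketchRouter:
    def __init__(self, prefetch_count=2):
        self.ready_queue = deque()
        self.eta_schedule = []   # heap of (eta, sequence number, task)
        self.prefetch_count = prefetch_count
        self._seq = 0            # tie-breaker so tasks are never compared

    def on_task(self, task):
        if task.eta is not None:
            # Holding an ETA task occupies a slot, so reserve one more message.
            self.prefetch_count += 1
            heapq.heappush(self.eta_schedule, (task.eta, self._seq, task))
            self._seq += 1
        else:
            self.ready_queue.append(task)

    def apply_eta_task(self, task):
        # Called when the ETA is met: hand the task over and release the slot.
        self.ready_queue.append(task)
        self.prefetch_count -= 1


router = SketchRouter()
router.on_task(SketchTask("send_email"))
router.on_task(SketchTask("cleanup", eta=datetime(2012, 1, 1, 12, 0)))
peak = router.prefetch_count

# Simulate the timer firing for the scheduled task.
_, _, due = heapq.heappop(router.eta_schedule)
router.apply_eta_task(due)
```
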

pidbox_node = None

The process mailbox (kombu pidbox node).

pool = None

The current worker pool instance.

priority_timer = None

A timer used for high-priority internal tasks, such as sending heartbeats.

ready_queue = None

The queue that holds tasks ready for immediate processing.

receive_message(body, message)

Handles incoming messages.

Parameters:
  • body – The message body.
  • message – The kombu message object.
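
A rough sketch of the dispatch described in the module overview, which checks the payload for a task key or a control key. The on_task and on_control callbacks are passed in as arguments here only to keep the example self-contained, FakeMessage is hypothetical, and the conversion of the body to a Request object is omitted.

```python
import logging

logger = logging.getLogger("consumer.sketch")


class FakeMessage:
    """Hypothetical stand-in for a broker message with an ack() method."""

    def __init__(self):
        self.acknowledged = False

    def ack(self):
        self.acknowledged = True


def receive_message(body, message, on_task, on_control):
    if isinstance(body, dict):
        if "task" in body:
            # The worker would validate the body and build a Request here.
            return on_task(body)
        if "control" in body:
            return on_control(body, message)
    # Neither key present: log and ack so the message is not redelivered.
    logger.warning("Discarding invalid message: %r", body)
    message.ack()


tasks, controls = [], []


def record_control(body, message):
    controls.append(body)


receive_message({"task": "tasks.add", "args": [2, 2]}, FakeMessage(),
                tasks.append, record_control)
receive_message({"control": {"command": "ping"}}, FakeMessage(),
                tasks.append, record_control)
invalid = FakeMessage()
receive_message("garbage", invalid, tasks.append, record_control)
```
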
reset_connection()

Re-establish the broker connection and set up consumers, heartbeat and the event dispatcher.

reset_pidbox_node()

Sets up the process mailbox.

restart_heartbeat()

Restart the heartbeat thread.

This thread sends heartbeat events at intervals so monitors can tell if the worker is off-line/missing.

send_events = False

Enable/disable events.

start()

Start the consumer.

Automatically survives intermittent connection failure, and will retry establishing the connection and restart consuming messages.

stop()

Stop consuming.

Does not close the broker connection, so be sure to call close_connection() when you are finished with it.

stop_consumers(close_connection=True)

Stop consuming tasks and broadcast commands. Also stops the heartbeat thread and event dispatcher.

Parameters:
  • close_connection – Set to False to skip closing the broker connection.
stop_pidbox_node()
task_consumer = None

The consumer used to consume task messages.

update_strategies()
celery.worker.consumer.INVALID_TASK_ERROR = 'Received invalid task message: %s\nThe message has been ignored and discarded.\n\nPlease ensure your message conforms to the task\nmessage protocol as described here: http://bit.ly/hYj41y\n\nThe full contents of the message body was:\n%s\n'

Error message for when an invalid task message is received.

celery.worker.consumer.PREFETCH_COUNT_MAX = 65535

The prefetch count can’t exceed the maximum value of an unsigned short (16 bits, 65535).
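
The limit comes from the wire format: in AMQP 0-9-1 the prefetch-count field of basic.qos is a 16-bit unsigned integer. The sketch below uses Python's struct module to show where the boundary lies; fits_in_short is a hypothetical helper and is not part of Celery.

```python
import struct

PREFETCH_COUNT_MAX = 0xFFFF  # 65535, the largest unsigned 16-bit value


def fits_in_short(n):
    """Return True if n can be encoded as a big-endian unsigned short."""
    try:
        struct.pack(">H", n)
    except struct.error:
        return False
    return True


print(fits_in_short(PREFETCH_COUNT_MAX))      # the maximum itself still fits
print(fits_in_short(PREFETCH_COUNT_MAX + 1))  # one more does not
```
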

class celery.worker.consumer.QoS(consumer, initial_value, logger)

Quality of Service for Channel.

For thread-safe increment/decrement of a channel’s prefetch count value.

Parameters:
  • consumer – A kombu.messaging.Consumer instance.
  • initial_value – Initial prefetch count value.
  • logger – Logger used to log debug messages.
decrement(n=1)

Decrement the current prefetch count value by n.

decrement_eventually(n=1)

Decrement the value, but do not update the qos.

The MainThread will be responsible for calling update() when necessary.

increment(n=1)

Increment the current prefetch count value by n.

prev = None
set(pcount)

Set channel prefetch_count setting.

update()

Update prefetch count with current value.
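
A minimal sketch of how such a thread-safe counter might work, assuming a consumer object that exposes qos(prefetch_count=...). RecordingConsumer and SketchQoS are hypothetical, the floor of 1 on decrement is an assumption of this sketch, and the real class also handles logging and the PREFETCH_COUNT_MAX limit.

```python
import threading


class RecordingConsumer:
    """Hypothetical consumer that records every QoS request it receives."""

    def __init__(self):
        self.calls = []

    def qos(self, prefetch_count):
        self.calls.append(prefetch_count)


class SketchQoS:
    def __init__(self, consumer, initial_value):
        self.consumer = consumer
        self.value = initial_value
        self.prev = None            # last value actually sent to the broker
        self._mutex = threading.Lock()

    def increment(self, n=1):
        with self._mutex:
            self.value += n
            return self.set(self.value)

    def decrement(self, n=1):
        with self._mutex:
            self.value = max(self.value - n, 1)  # assumed floor of 1
            return self.set(self.value)

    def decrement_eventually(self, n=1):
        # Safe to call from another thread: only the shared counter changes.
        with self._mutex:
            self.value = max(self.value - n, 1)

    def set(self, pcount):
        # Talk to the broker only when the value actually changed.
        if pcount != self.prev:
            self.consumer.qos(prefetch_count=pcount)
            self.prev = pcount
        return pcount

    def update(self):
        with self._mutex:
            return self.set(self.value)


consumer = RecordingConsumer()
qos = SketchQoS(consumer, 2)
qos.update()                # sends 2
qos.increment()             # sends 3
qos.decrement_eventually()  # counter drops to 2, nothing is sent yet
pending = list(consumer.calls)
qos.update()                # the main thread now sends 2
```

Deferring the broker request to update() matches the overview's note that the prefetch count cannot be changed from the timer thread, so the consuming loop applies it once it sees that the value has changed.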

celery.worker.consumer.UNKNOWN_TASK_ERROR = 'Received unregistered task of type %s.\nThe message has been ignored and discarded.\n\nDid you remember to import the module containing this task?\nOr maybe you are using relative imports?\nPlease see http://bit.ly/gLye1c for more information.\n\nThe full contents of the message body was:\n%s\n'

Error message for when an unregistered task is received.
