This document describes Celery 2.4.

celery.worker.buckets

This module implements rate limiting of tasks by keeping a token bucket queue for each task type. When a task is allowed to be processed, it is moved over to the ready_queue.

The celery.worker.mediator is then responsible for moving tasks from the ready_queue to the worker pool.
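The hand-off described above can be pictured with a minimal sketch. It is not the mediator's actual implementation: `drain_ready_queue` and `apply_to_pool` are hypothetical names, and the code is written for Python 3, where the `Queue` module is named `queue`.

```python
from queue import Queue, Empty


def drain_ready_queue(ready_queue, apply_to_pool):
    """Move every item currently in ready_queue to the pool callback.

    Hypothetical helper: apply_to_pool stands in for whatever hands a
    task to the worker pool. Returns the number of tasks moved.
    """
    moved = 0
    while True:
        try:
            task = ready_queue.get_nowait()
        except Empty:
            # Nothing left that has already passed its rate limit.
            return moved
        apply_to_pool(task)
        moved += 1
```

Only tasks that have already passed their rate limit reach the ready_queue, so a consumer like this one never needs to know about token buckets.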

copyright: (c) 2009 - 2011 by Ask Solem.
license: BSD, see LICENSE for more details.

class celery.worker.buckets.FastQueue(maxsize=0)

Queue.Queue supporting the interface of TokenBucketQueue.

clear()
expected_time(tokens=1)
items
wait(block=True)
exception celery.worker.buckets.RateLimitExceeded

The token bucket’s rate limit has been exceeded.

class celery.worker.buckets.TaskBucket(task_registry)

This is a collection of token buckets, each task type having its own token bucket. If the task type doesn’t have a rate limit, it will have a plain Queue object instead of a TokenBucketQueue.

The put() operation forwards the task to its appropriate bucket, while the get() operation iterates over the buckets and retrieves the first available item.

Say we have three types of tasks in the registry: celery.ping, feed.refresh and video.compress. The TaskBucket will then consist of the following items:

{"celery.ping": TokenBucketQueue(fill_rate=300),
 "feed.refresh": Queue(),
 "video.compress": TokenBucketQueue(fill_rate=2)}

The get operation will iterate over these until one of the buckets is able to return an item. The underlying data structure is a dict, so the order is ignored here.
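The put/get dispatch described above can be sketched as follows. This is a simplified illustration, not the TaskBucket source: `SimpleTaskBucket` is a hypothetical class, it leaves out rate limiting, and it is written for Python 3, where the `Queue` module is named `queue`.

```python
from queue import Queue, Empty


class SimpleTaskBucket:
    """Hypothetical stand-in for TaskBucket: one plain queue per task name."""

    def __init__(self, task_names):
        self.buckets = {name: Queue() for name in task_names}

    def put(self, task_name, payload):
        # Forward the item to the bucket that belongs to its task type.
        self.buckets[task_name].put_nowait(payload)

    def get_nowait(self):
        # Try each bucket in turn and return the first item found.
        for name, bucket in self.buckets.items():
            try:
                return name, bucket.get_nowait()
            except Empty:
                continue
        # Every bucket was empty.
        raise Empty()


bucket = SimpleTaskBucket(["celery.ping", "feed.refresh", "video.compress"])
bucket.put("feed.refresh", "refresh feed 1")
first = bucket.get_nowait()
```

In the real class, a rate-limited bucket that cannot hand out a token is skipped in the same way an empty one is skipped here.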

Parameters: task_registry – The task registry used to get the task type class for a given task name.
add_bucket_for_type(task_name)

Add a bucket for a task type.

Will read the task’s rate limit and create a TokenBucketQueue if it has one. If the task doesn’t have a rate limit, FastQueue will be used instead.

clear()

Delete the data in all of the buckets.

empty()

Returns True if all of the buckets are empty.

get(block=True, timeout=None)

Retrieve the task from the first available bucket.

A bucket is available when there is an item in its queue and a token can be consumed from it.

get_bucket_for_type(task_name)

Get the bucket for a particular task type.

get_nowait()
init_with_registry()

Initialize with buckets for all the task types in the registry.

items

Flattens the data in all of the buckets into a single list.

put(request)

Put a TaskRequest into the appropriate bucket.

put_nowait(request)

Put a TaskRequest into the appropriate bucket.

qsize()

Get the total size of all the queues.

refresh()

Refresh rate limits for all task types in the registry.

update_bucket_for_type(task_name)
class celery.worker.buckets.TokenBucketQueue(fill_rate, queue=None, capacity=1)

Queue with rate limited get operations.

This uses the token bucket algorithm to rate limit the queue on get operations.

Parameters:
  • fill_rate – The rate in tokens/second that the bucket will be refilled.
  • capacity – Maximum number of tokens in the bucket. Default is 1.
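To make the two parameters concrete, here is a minimal token bucket sketch. It illustrates the general algorithm only and is not the class used by this module: `SimpleTokenBucket`, its `clock` argument and its method names are assumptions chosen for the example.

```python
import time


class SimpleTokenBucket:
    """Hypothetical token bucket: refills at fill_rate tokens per second."""

    def __init__(self, fill_rate, capacity=1, clock=time.monotonic):
        self.fill_rate = float(fill_rate)
        self.capacity = float(capacity)
        self._clock = clock              # injectable so tests can fake time
        self._tokens = float(capacity)   # the bucket starts full
        self._timestamp = clock()

    def _refill(self):
        # Add the tokens earned since the last call, capped at capacity.
        now = self._clock()
        earned = (now - self._timestamp) * self.fill_rate
        self._tokens = min(self.capacity, self._tokens + earned)
        self._timestamp = now
        return self._tokens

    def can_consume(self, tokens=1):
        # Take the tokens if they are available; otherwise refuse.
        if tokens <= self._refill():
            self._tokens -= tokens
            return True
        return False

    def expected_time(self, tokens=1):
        # Seconds until the requested number of tokens will be available.
        missing = max(0.0, tokens - self._refill())
        return missing / self.fill_rate
```

With fill_rate=2 and capacity=1, one item can be taken immediately and the next becomes available half a second later. A larger capacity allows short bursts above the fill rate.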
exception RateLimitExceeded

The token bucket’s rate limit has been exceeded.

TokenBucketQueue.clear()

Delete all data in the queue.

TokenBucketQueue.empty()

Returns True if the queue is empty.

TokenBucketQueue.expected_time(tokens=1)

Returns the expected time in seconds of when a new token should be available.

TokenBucketQueue.get(block=True)

Remove and return an item from the queue.

Raises:
  • RateLimitExceeded – If a token could not be consumed from the token bucket (consuming from the queue too fast).
  • Queue.Empty – If an item is not immediately available.
TokenBucketQueue.get_nowait()

Remove and return an item from the queue without blocking.

Raises:
  • RateLimitExceeded – If a token could not be consumed from the token bucket (consuming from the queue too fast).
  • Queue.Empty – If an item is not immediately available.
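The two failure modes listed above can be shown with a self-contained sketch. It does not import Celery: `SketchTokenBucketQueue` and the local `RateLimitExceeded` are stand-ins, the order of the two checks is an assumption, and the code is written for Python 3, where the `Queue` module is named `queue`.

```python
import time
from queue import Queue, Empty


class RateLimitExceeded(Exception):
    """Local stand-in for the exception of the same name."""


class SketchTokenBucketQueue:
    """Hypothetical queue whose get_nowait() is limited by a token bucket."""

    def __init__(self, fill_rate, capacity=1, clock=time.monotonic):
        self.queue = Queue()
        self.fill_rate = float(fill_rate)
        self.capacity = float(capacity)
        self._clock = clock
        self._tokens = float(capacity)
        self._last = clock()

    def put(self, item):
        self.queue.put(item)

    def _take_token(self):
        # Refill according to elapsed time, then try to take one token.
        now = self._clock()
        earned = (now - self._last) * self.fill_rate
        self._tokens = min(self.capacity, self._tokens + earned)
        self._last = now
        if self._tokens >= 1:
            self._tokens -= 1
            return True
        return False

    def get_nowait(self):
        # Assumed order: report an empty queue before spending a token.
        if self.queue.empty():
            raise Empty()
        if not self._take_token():
            raise RateLimitExceeded()
        return self.queue.get_nowait()
```

A caller would typically treat Empty as "nothing to do" and RateLimitExceeded as "try again later", which is why the two are reported separately.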
TokenBucketQueue.items

Underlying data. Do not modify.

TokenBucketQueue.put(item, block=True)

Put an item onto the queue.

TokenBucketQueue.put_nowait(item)

Put an item into the queue without blocking.

Raises:
  • Queue.Full – If a free slot is not immediately available.
TokenBucketQueue.qsize()

Returns the size of the queue.

TokenBucketQueue.wait(block=False)

Wait until a token can be retrieved from the bucket and return the next item.
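One way to picture this behaviour is a retry loop that sleeps for the time reported by expected_time(). The sketch below is an assumption about how such a loop could look, not the method's actual source: `wait_for_item` and `FlakyQueue` are hypothetical, and `RateLimitExceeded` is a local stand-in.

```python
import time


class RateLimitExceeded(Exception):
    """Local stand-in for celery.worker.buckets.RateLimitExceeded."""


def wait_for_item(bucket_queue, sleep=time.sleep):
    """Retry get_nowait(), sleeping for expected_time() after each refusal."""
    while True:
        try:
            return bucket_queue.get_nowait()
        except RateLimitExceeded:
            sleep(bucket_queue.expected_time())


class FlakyQueue:
    """Hypothetical demo queue: refuses twice, then hands out one item."""

    def __init__(self):
        self.refusals_left = 2

    def get_nowait(self):
        if self.refusals_left:
            self.refusals_left -= 1
            raise RateLimitExceeded()
        return "task"

    def expected_time(self, tokens=1):
        return 0.25


naps = []  # records the requested sleeps instead of really sleeping
result = wait_for_item(FlakyQueue(), sleep=naps.append)
```

Sleeping for the expected time avoids busy-waiting while the bucket refills.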

celery.worker.buckets.chain_from_iterable()

chain.from_iterable(iterable) --> chain object

Alternate chain() constructor taking a single iterable argument that evaluates lazily.
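As a short illustration, the standard library's `itertools.chain.from_iterable` can flatten the contents of several buckets into one list. The bucket contents below are made-up sample data.

```python
from itertools import chain

# Sample data: each task type maps to the items waiting in its bucket.
buckets = {
    "celery.ping": ["p1"],
    "feed.refresh": ["f1", "f2"],
    "video.compress": [],
}

# from_iterable takes one iterable of iterables and yields their items
# lazily; list() forces the evaluation.
flat = list(chain.from_iterable(buckets.values()))
```

Because the chain object is lazy, nothing is copied until the result is iterated.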