This document describes Celery 2.4. For development docs, go here.
celery.worker.buckets
This module implements the rate limiting of tasks by having a token bucket queue for each task type.
When a task is allowed to be processed it’s moved over to the ready_queue.
The celery.worker.mediator is then responsible for moving tasks from the ready_queue to the worker pool.
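The hand-off from the ready_queue to the pool can be pictured with a rough sketch. This is only an illustration under stated assumptions, not Celery's mediator: `mediate` and `pool_apply` are hypothetical names, tasks are plain strings, and `queue` is the Python 3 name of the `Queue` module.

```python
import queue


def mediate(ready_queue, pool_apply):
    """Move every task currently in ready_queue to the pool callback.

    Hypothetical stand-in for the mediator: it drains the queue
    without blocking and returns the number of tasks moved.
    """
    moved = 0
    while True:
        try:
            task = ready_queue.get_nowait()
        except queue.Empty:
            return moved
        pool_apply(task)
        moved += 1


ready_queue = queue.Queue()
for name in ("feed.refresh", "video.compress"):
    ready_queue.put(name)  # tasks that already passed their rate limit

executed = []  # stands in for the worker pool
moved = mediate(ready_queue, executed.append)
```

In this sketch the "pool" is just a list, so the order in which tasks leave the ready_queue is easy to inspect.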
copyright:
license: BSD, see LICENSE for more details.
-
class celery.worker.buckets.FastQueue(maxsize=0)
Queue.Queue supporting the interface of TokenBucketQueue.
-
clear()
-
expected_time(tokens=1)
-
items
-
wait(block=True)
-
-
exception celery.worker.buckets.RateLimitExceeded
The token bucket’s rate limit has been exceeded.
-
class celery.worker.buckets.TaskBucket(task_registry)
This is a collection of token buckets, each task type having its own token bucket. If the task type doesn’t have a rate limit, it will have a plain Queue object instead of a TokenBucketQueue.
The put() operation forwards the task to its appropriate bucket, while the get() operation iterates over the buckets and retrieves the first available item.
Say we have three types of tasks in the registry: celery.ping, feed.refresh and video.compress. The TaskBucket will then consist of the following items:
{"celery.ping": TokenBucketQueue(fill_rate=300), "feed.refresh": Queue(), "video.compress": TokenBucketQueue(fill_rate=2)}
The get operation will iterate over these until one of the buckets is able to return an item. The underlying data structure is a dict, so the order is ignored here.
Parameters: task_registry – The task registry used to get the task type class for a given task name.
-
add_bucket_for_type(task_name)
Add a bucket for a task type.
Will read the task’s rate limit and create a TokenBucketQueue if it has one. If the task doesn’t have a rate limit, FastQueue will be used instead.
-
clear()
Delete the data in all of the buckets.
-
empty()
Returns True if all of the buckets are empty.
-
get(block=True, timeout=None)
Retrieve the task from the first available bucket.
Available here means that there is an item in the queue and a token can be consumed from its bucket.
-
get_bucket_for_type(task_name)
Get the bucket for a particular task type.
-
get_nowait()
-
init_with_registry()
Initialize with buckets for all the task types in the registry.
-
items
Flattens the data in all of the buckets into a single list.
-
put(request)
Put a TaskRequest into the appropriate bucket.
-
put_nowait(request)
Put a TaskRequest into the appropriate bucket.
-
qsize()
Get the total size of all the queues.
-
refresh()
Refresh rate limits for all task types in the registry.
-
update_bucket_for_type(task_name)
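The put() and get() behaviour of TaskBucket described above can be sketched in a few lines. This is a simplified illustration, not the Celery implementation: `SketchTaskBucket` is a hypothetical class, every bucket is a plain queue with no rate limit, and a request is assumed to be a `(task_name, payload)` tuple rather than a TaskRequest.

```python
import queue


class SketchTaskBucket:
    """Hypothetical, simplified stand-in for TaskBucket (no rate limits)."""

    def __init__(self, task_names):
        # One bucket per task type, keyed by task name.
        self.buckets = {name: queue.Queue() for name in task_names}

    def put(self, request):
        # Forward the request to the bucket of its task type.
        task_name, _payload = request
        self.buckets[task_name].put_nowait(request)

    def get_nowait(self):
        # Return an item from the first bucket that can provide one.
        for bucket in self.buckets.values():
            try:
                return bucket.get_nowait()
            except queue.Empty:
                continue
        raise queue.Empty()

    def qsize(self):
        # Total size of all the buckets.
        return sum(bucket.qsize() for bucket in self.buckets.values())

    def empty(self):
        # True only if every bucket is empty.
        return all(bucket.empty() for bucket in self.buckets.values())


tasks = SketchTaskBucket(["celery.ping", "feed.refresh", "video.compress"])
tasks.put(("video.compress", "clip.avi"))
tasks.put(("feed.refresh", "http://example.com/rss"))
total = tasks.qsize()
first = tasks.get_nowait()
```

Which request comes back first depends on the iteration order of the dict, not on the order in which requests were put.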
-
-
class celery.worker.buckets.TokenBucketQueue(fill_rate, queue=None, capacity=1)
Queue with rate limited get operations.
This uses the token bucket algorithm to rate limit the queue on get operations.
Parameters:
- fill_rate – The rate in tokens/second that the bucket will be refilled.
- capacity – Maximum number of tokens in the bucket. Default is 1.
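For readers unfamiliar with the token bucket algorithm, a minimal sketch follows. It only illustrates the roles of fill_rate and capacity; `SketchTokenBucket`, `can_consume` and the injectable `clock` argument are hypothetical names for this example, and starting with a full bucket is an assumption, not something this page states.

```python
import time


class SketchTokenBucket:
    """Hypothetical token bucket; not Celery's implementation."""

    def __init__(self, fill_rate, capacity=1, clock=time.monotonic):
        self.fill_rate = float(fill_rate)  # tokens added per second
        self.capacity = float(capacity)    # maximum tokens held
        self._clock = clock
        self._tokens = self.capacity       # assumption: start full
        self._timestamp = clock()

    def _refill(self):
        # Add tokens for the time elapsed, never exceeding capacity.
        now = self._clock()
        elapsed = now - self._timestamp
        self._tokens = min(self.capacity,
                           self._tokens + elapsed * self.fill_rate)
        self._timestamp = now

    def can_consume(self, tokens=1):
        """Consume tokens and return True if enough are available."""
        self._refill()
        if tokens <= self._tokens:
            self._tokens -= tokens
            return True
        return False

    def expected_time(self, tokens=1):
        """Seconds until the requested number of tokens is available."""
        self._refill()
        missing = max(0.0, tokens - self._tokens)
        return missing / self.fill_rate


# A fake clock makes the behaviour deterministic.
now = [0.0]
bucket = SketchTokenBucket(fill_rate=2, capacity=1, clock=lambda: now[0])
first = bucket.can_consume()    # the bucket starts full
second = bucket.can_consume()   # no time has passed, so no token is left
wait_for = bucket.expected_time()
now[0] += 0.5                   # at 2 tokens/second this refills one token
third = bucket.can_consume()
```

With capacity left at 1 the bucket cannot accumulate a burst, so consumption is limited to roughly one item every 1/fill_rate seconds.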
-
exception RateLimitExceeded
The token bucket’s rate limit has been exceeded.
-
TokenBucketQueue.clear()
Delete all data in the queue.
-
TokenBucketQueue.empty()
Returns True if the queue is empty.
-
TokenBucketQueue.expected_time(tokens=1)
Returns the expected time in seconds until a new token should be available.
-
TokenBucketQueue.get(block=True)
Remove and return an item from the queue.
Raises:
- RateLimitExceeded – If a token could not be consumed from the token bucket (consuming from the queue too fast).
- Queue.Empty – If an item is not immediately available.
-
TokenBucketQueue.get_nowait()
Remove and return an item from the queue without blocking.
Raises:
- RateLimitExceeded – If a token could not be consumed from the token bucket (consuming from the queue too fast).
- Queue.Empty – If an item is not immediately available.
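The two failure modes listed above can be illustrated with a small stand-in queue. This is a sketch under assumptions, not the Celery class: `SketchRateLimitedQueue` and the local `RateLimitExceeded` are hypothetical, `queue` is the Python 3 name of the `Queue` module, and checking for an empty queue before consuming a token is a choice made for this example only.

```python
import queue
import time


class RateLimitExceeded(Exception):
    """Local stand-in for the exception documented above."""


class SketchRateLimitedQueue:
    """Hypothetical queue whose get_nowait() is limited by a token bucket."""

    def __init__(self, fill_rate, capacity=1, clock=time.monotonic):
        self.queue = queue.Queue()
        self.fill_rate = float(fill_rate)
        self.capacity = float(capacity)
        self._clock = clock
        self._tokens = self.capacity  # assumption: start with a full bucket
        self._timestamp = clock()

    def _take_token(self):
        # Refill for the elapsed time, then try to consume one token.
        now = self._clock()
        refill = (now - self._timestamp) * self.fill_rate
        self._tokens = min(self.capacity, self._tokens + refill)
        self._timestamp = now
        if self._tokens >= 1:
            self._tokens -= 1
            return True
        return False

    def put_nowait(self, item):
        self.queue.put_nowait(item)

    def get_nowait(self):
        if self.queue.empty():
            raise queue.Empty()        # nothing to hand out
        if not self._take_token():
            raise RateLimitExceeded()  # consuming too fast
        return self.queue.get_nowait()


now = [0.0]  # fake clock, so the example does not depend on real time
limited = SketchRateLimitedQueue(fill_rate=1, clock=lambda: now[0])
limited.put_nowait("a")
limited.put_nowait("b")
first = limited.get_nowait()
try:
    limited.get_nowait()
    throttled = False
except RateLimitExceeded:
    throttled = True
now[0] += 1.0  # one second later a token is available again
second = limited.get_nowait()
```

A caller that sees RateLimitExceeded would typically try again later, for example after the delay reported by expected_time().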
-
TokenBucketQueue.items
Underlying data. Do not modify.
-
TokenBucketQueue.put(item, block=True)
Put an item onto the queue.
-
TokenBucketQueue.put_nowait(item)
Put an item into the queue without blocking.
Raises: Queue.Full – If a free slot is not immediately available.
-
TokenBucketQueue.qsize()
Returns the size of the queue.
-
TokenBucketQueue.wait(block=False)
Wait until a token can be retrieved from the bucket and return the next item.
-
celery.worker.buckets.chain_from_iterable()
chain.from_iterable(iterable) --> chain object
Alternate chain() constructor taking a single iterable argument that evaluates lazily.
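As a brief illustration, the standard library's `itertools.chain.from_iterable` flattens one level of nesting. The sample lists below are made up for the example.

```python
from itertools import chain

# Three hypothetical buckets, one of them empty.
buckets = [["ping-1", "ping-2"], [], ["compress-1"]]

# The chain object is lazy; list() forces the evaluation.
flattened = list(chain.from_iterable(buckets))
```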