This document describes the current stable version of Kombu (4.2).
Producers¶
Basics¶
You can create a producer using a Connection:
>>> producer = connection.Producer()
You can also instantiate Producer directly; it takes a channel or a connection as an argument:
>>> from kombu import Connection, Producer
>>> with Connection('amqp://') as conn:
...     with conn.channel() as channel:
...         producer = Producer(channel)
Once you have a producer instance, you can publish messages with its publish() method.
In most cases you will get the connection from a connection pool. Such a connection can be stale, or it can be lost in the middle of sending a message. Retrying is a good way to handle these intermittent failures:
>>> producer.publish({'hello': 'world'}, ..., retry=True)
In addition, a retry policy can be specified. This is a dictionary of parameters supported by the retry_over_time() function:
>>> producer.publish(
...     {'hello': 'world'}, ...,
...     retry=True,
...     retry_policy={
...         'interval_start': 0,  # First retry immediately,
...         'interval_step': 2,   # then increase by 2s for every retry,
...         'interval_max': 30,   # but don't exceed 30s between retries.
...         'max_retries': 30,    # Give up after 30 tries.
...     },
... )
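To get a feel for what the policy above means in practice, the waits it implies can be tabulated in plain Python. This is only a sketch of the arithmetic described in the comments (start, step, cap, retry count); the helper name retry_delays is hypothetical, and the exact schedule Kombu produces internally may differ in detail.

```python
def retry_delays(interval_start=0, interval_step=2, interval_max=30,
                 max_retries=30):
    """Return the wait (in seconds) before each retry.

    Hypothetical helper for illustration; it is not part of Kombu.
    """
    delays = []
    for attempt in range(max_retries):
        # Grow linearly from interval_start, but never exceed interval_max.
        wait = min(interval_start + attempt * interval_step, interval_max)
        delays.append(wait)
    return delays


delays = retry_delays()
total_wait = sum(delays)  # worst-case time spent sleeping between retries
```

With these values the cap is reached after the fifteenth retry, so most of the worst-case waiting time comes from the capped retries rather than from the ramp-up.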
The declare argument lets you pass a list of entities that must be declared before sending the message. This is especially important when using the retry flag, since the broker may actually restart during a retry, in which case non-durable entities are removed.
Say you are writing a task queue, and the workers may not have started yet, so the queues aren’t declared. In this case you need to define the exchange and also declare the queue, so that the message is delivered to the queue while the workers are offline:
>>> from kombu import Exchange, Queue
>>> task_queue = Queue('tasks', Exchange('tasks'), routing_key='tasks')
>>> producer.publish(
...     {'hello': 'world'}, ...,
...     retry=True,
...     exchange=task_queue.exchange,
...     routing_key=task_queue.routing_key,
...     declare=[task_queue],  # declares exchange, queue and binds.
... )
Bypassing routing by using the anon-exchange¶
You may deliver to a queue directly, bypassing the broker’s routing mechanisms, by using the “anon-exchange”: set the exchange parameter to the empty string, and set the routing key to the name of the queue:
>>> producer.publish(
...     {'hello': 'world'},
...     exchange='',
...     routing_key=task_queue.name,
... )
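The difference between the two routing paths can be illustrated with a toy in-memory model. This is not Kombu code and not a real broker: the ToyBroker class and its methods are hypothetical, and it only mimics the rule that the anon-exchange delivers to the queue whose name equals the routing key, while a named exchange delivers only through explicit bindings.

```python
class ToyBroker:
    """Tiny in-memory stand-in for a broker (illustration only)."""

    def __init__(self):
        self.queues = {}    # queue name -> list of delivered bodies
        self.bindings = {}  # (exchange, routing_key) -> set of queue names

    def declare_queue(self, name):
        self.queues.setdefault(name, [])

    def bind(self, queue, exchange, routing_key):
        self.bindings.setdefault((exchange, routing_key), set()).add(queue)

    def publish(self, body, exchange, routing_key):
        if exchange == '':
            # Anon-exchange: the routing key is taken as the queue name.
            targets = [routing_key] if routing_key in self.queues else []
        else:
            # Named exchange: only bound queues receive the message.
            targets = sorted(self.bindings.get((exchange, routing_key), ()))
        for name in targets:
            self.queues[name].append(body)
        return targets


broker = ToyBroker()
broker.declare_queue('tasks')

# Delivered directly, even though no binding exists.
direct = broker.publish({'hello': 'world'}, exchange='', routing_key='tasks')

# Not delivered: the 'tasks' exchange has no binding to the queue yet.
unrouted = broker.publish({'hello': 'world'}, exchange='tasks',
                          routing_key='tasks')
```

In this model the queue still has to exist before publishing; the anon-exchange skips the binding step, not the queue declaration.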
Serialization¶
JSON is the default serializer when a non-string object is passed to publish(), but you can also specify a different serializer:
>>> producer.publish({'hello': 'world'}, serializer='pickle')
See Serialization for more information.
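One practical reason to pick a serializer deliberately is that formats differ in which Python types survive a round trip. The sketch below uses only the standard library json and pickle modules directly, not Kombu's serializer registry, so it shows the behaviour of the formats themselves rather than of publish().

```python
import json
import pickle

body = {'hello': 'world', 'point': (1, 2)}

# JSON has no tuple type, so the tuple comes back as a list.
json_copy = json.loads(json.dumps(body))

# pickle preserves Python-specific types such as tuples.
pickle_copy = pickle.loads(pickle.dumps(body))

json_kept_tuple = isinstance(json_copy['point'], tuple)
pickle_kept_tuple = isinstance(pickle_copy['point'], tuple)
```

Keep in mind that unpickling data from an untrusted source can execute arbitrary code, so pickle is only appropriate when every producer on the broker is trusted.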
Reference¶
class kombu.Producer(channel, exchange=None, routing_key=None, serializer=None, auto_declare=None, compression=None, on_return=None)
    Message Producer.
    Parameters:
    - channel (kombu.Connection, ChannelT) – Connection or channel.
    - exchange (Exchange, str) – Optional default exchange.
    - routing_key (str) – Optional default routing key.
    - serializer (str) – Default serializer. Default is “json”.
    - compression (str) – Default compression method. Default is no compression.
    - auto_declare (bool) – Automatically declare the default exchange at instantiation. Default is True.
    - on_return (Callable) – Callback to call for undeliverable messages, when the mandatory or immediate arguments to publish() are used. This callback needs the following signature: (exception, exchange, routing_key, message). Note that the producer needs to drain events to use this feature.
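A callback matching the on_return signature given above might look like the following sketch. The function body, the returned list, and the simulated call at the end are assumptions made for illustration; in real use the callback would be passed as on_return= when creating the producer, and the message argument would be a message object supplied by the library rather than a plain dict.

```python
returned = []


def on_return(exception, exchange, routing_key, message):
    """Record a message that was handed back as undeliverable."""
    returned.append({
        'reason': str(exception),
        'exchange': exchange,
        'routing_key': routing_key,
        'message': message,
    })


# Simulated invocation with stand-in values, to show the argument order.
on_return(RuntimeError('NO_ROUTE'), 'tasks', 'missing', {'hello': 'world'})
```

Collecting returned messages in a list keeps the callback cheap; deciding whether to republish, log, or drop them can then happen outside the event loop.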
    auto_declare = True
        By default, if a default exchange is set, that exchange will be declared when publishing a message.
    compression = None
        Default compression method. Disabled by default.
    declare()
        Declare the exchange.
        Note: This happens automatically at instantiation when the auto_declare flag is enabled.
    exchange = None
        Default exchange.
    maybe_declare(entity, retry=False, **retry_policy)
        Declare exchange if not already declared during this session.
    on_return = None
        Basic return callback.
    publish(body, routing_key=None, delivery_mode=None, mandatory=False, immediate=False, priority=0, content_type=None, content_encoding=None, serializer=None, headers=None, compression=None, exchange=None, retry=False, retry_policy=None, declare=None, expiration=None, **properties)
        Publish message to the specified exchange.
        Parameters:
        - body (Any) – Message body.
        - routing_key (str) – Message routing key.
        - delivery_mode (enum) – See delivery_mode.
        - mandatory (bool) – Currently not supported.
        - immediate (bool) – Currently not supported.
        - priority (int) – Message priority. A number between 0 and 9.
        - content_type (str) – Content type. Default is auto-detect.
        - content_encoding (str) – Content encoding. Default is auto-detect.
        - serializer (str) – Serializer to use. Default is auto-detect.
        - compression (str) – Compression method to use. Default is none.
        - headers (Dict) – Mapping of arbitrary headers to pass along with the message body.
        - exchange (Exchange, str) – Override the exchange. Note that this exchange must have been declared.
        - declare (Sequence[EntityT]) – Optional list of required entities that must have been declared before publishing the message. The entities will be declared using maybe_declare().
        - retry (bool) – Retry publishing, or declaring entities, if the connection is lost.
        - retry_policy (Dict) – Retry configuration: the keyword arguments supported by ensure().
        - expiration (float) – A TTL in seconds can be specified per message. Default is no expiration.
        - **properties (Any) – Additional message properties, see AMQP spec.
    revive(channel)
        Revive the producer after connection loss.
    routing_key = u''
        Default routing key.
    serializer = None
        Default serializer to use. Default is JSON.