This document describes the current stable version of Kombu (5.3). For development docs, go here.

Connection/Producer Pools - kombu.pools

Public resource pools.

class kombu.pools.PoolGroup(limit=None, close_after_fork=True)[source]

Collection of resource pools.

create(resource, limit)[source]
class kombu.pools.ProducerPool(connections, *args, **kwargs)[source]

Pool of kombu.Producer instances.

class Producer(channel, exchange=None, routing_key=None, serializer=None, auto_declare=None, compression=None, on_return=None)

Message Producer.

Arguments:

channel (kombu.Connection, ChannelT): Connection or channel. exchange (kombu.entity.Exchange, str): Optional default exchange. routing_key (str): Optional default routing key. serializer (str): Default serializer. Default is “json”. compression (str): Default compression method.

Default is no compression.

auto_declare (bool): Automatically declare the default exchange

at instantiation. Default is True.

on_return (Callable): Callback to call for undeliverable messages,

when the mandatory or immediate arguments to publish() is used. This callback needs the following signature: (exception, exchange, routing_key, message). Note that the producer needs to drain events to use this feature.

auto_declare = True

By default, if a defualt exchange is set, that exchange will be declare when publishing a message.

property channel
close()
compression = None

Default compression method. Disabled by default.

property connection
declare()

Declare the exchange.

Note:

This happens automatically at instantiation when the auto_declare flag is enabled.

exchange = None

Default exchange

maybe_declare(entity, retry=False, **retry_policy)

Declare exchange if not already declared during this session.

on_return = None

Basic return callback.

publish(body, routing_key=None, delivery_mode=None, mandatory=False, immediate=False, priority=0, content_type=None, content_encoding=None, serializer=None, headers=None, compression=None, exchange=None, retry=False, retry_policy=None, declare=None, expiration=None, timeout=None, **properties)

Publish message to the specified exchange.

Arguments:

body (Any): Message body. routing_key (str): Message routing key. delivery_mode (enum): See delivery_mode. mandatory (bool): Currently not supported. immediate (bool): Currently not supported. priority (int): Message priority. A number between 0 and 9. content_type (str): Content type. Default is auto-detect. content_encoding (str): Content encoding. Default is auto-detect. serializer (str): Serializer to use. Default is auto-detect. compression (str): Compression method to use. Default is none. headers (Dict): Mapping of arbitrary headers to pass along

with the message body.

exchange (kombu.entity.Exchange, str): Override the exchange.

Note that this exchange must have been declared.

declare (Sequence[EntityT]): Optional list of required entities

that must have been declared before publishing the message. The entities will be declared using maybe_declare().

retry (bool): Retry publishing, or declaring entities if the

connection is lost.

retry_policy (Dict): Retry configuration, this is the keywords

supported by ensure().

expiration (float): A TTL in seconds can be specified per message.

Default is no expiration.

timeout (float): Set timeout to wait maximum timeout second

for message to publish.

**properties (Any): Additional message properties, see AMQP spec.

release()
revive(channel)

Revive the producer after connection loss.

routing_key = ''

Default routing key.

serializer = None

Default serializer to use. Default is JSON.

close_after_fork = True
close_resource(resource)[source]
create_producer()[source]
new()[source]
prepare(p)[source]
release(resource)[source]
setup()[source]
kombu.pools.get_limit()[source]

Get current connection pool limit.

kombu.pools.register_group(group)[source]

Register group (can be used as decorator).

kombu.pools.reset(*args, **kwargs)[source]

Reset all pools by closing open resources.

kombu.pools.set_limit(limit, force=False, reset_after=False, ignore_errors=False)[source]

Set new connection pool limit.