This document is for Kombu's development version, which can be significantly different from previous releases.
Redis Transport - kombu.transport.redis¶
Redis transport module for Kombu.
Features¶
Type: Virtual
Supports Direct: Yes
Supports Topic: Yes
Supports Fanout: Yes
Supports Priority: Yes
Supports TTL: No
Connection String¶
Connection string has the following format:
redis://[USER:PASSWORD@]REDIS_ADDRESS[:PORT][/VIRTUALHOST]
rediss://[USER:PASSWORD@]REDIS_ADDRESS[:PORT][/VIRTUALHOST]
To use Sentinel for dynamic Redis discovery, the connection string has the following format:
sentinel://[USER:PASSWORD@]SENTINEL_ADDRESS[:PORT]
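As a rough illustration of how the parts of these URLs line up, the sketch below splits a made-up URL with Python's standard urllib.parse. The credentials, host and database number are placeholders, and Kombu's own URL handling may differ in details; the sketch only shows which component is which.

```python
from urllib.parse import urlparse

# Placeholder URL following the redis:// format above; none of these values are real.
url = "redis://user:secret@localhost:6379/0"

parts = urlparse(url)
components = {
    "scheme": parts.scheme,                  # "redis", or "rediss" for TLS
    "user": parts.username,                  # optional USER
    "password": parts.password,              # optional PASSWORD
    "address": parts.hostname,               # REDIS_ADDRESS
    "port": parts.port,                      # optional PORT
    "virtual_host": parts.path.lstrip("/"),  # VIRTUALHOST, typically the database number
}
print(components)
```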
Transport Options¶
sep
ack_emulation: (bool) If set to True, the transport will simulate the acknowledgement mechanism of the AMQP protocol.
unacked_key
unacked_index_key
unacked_mutex_key
unacked_mutex_expire
visibility_timeout
unacked_restore_limit
fanout_prefix
fanout_patterns
global_keyprefix: (str) The global key prefix to be prepended to all keys used by Kombu.
socket_timeout
socket_connect_timeout
socket_keepalive
socket_keepalive_options
queue_order_strategy
max_connections
health_check_interval
retry_on_timeout
priority_steps
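Transport options are supplied as a dictionary through the transport_options argument of Connection. Because a misspelled option name is easy to overlook, the sketch below checks a dictionary against the names listed above. The helper unknown_options is hypothetical (it is not part of Kombu), and the option values are illustrative rather than recommendations.

```python
# Option names copied from the list above.
KNOWN_OPTIONS = {
    "sep", "ack_emulation", "unacked_key", "unacked_index_key",
    "unacked_mutex_key", "unacked_mutex_expire", "visibility_timeout",
    "unacked_restore_limit", "fanout_prefix", "fanout_patterns",
    "global_keyprefix", "socket_timeout", "socket_connect_timeout",
    "socket_keepalive", "socket_keepalive_options", "queue_order_strategy",
    "max_connections", "health_check_interval", "retry_on_timeout",
    "priority_steps",
}


def unknown_options(options):
    """Hypothetical helper: return option names that are not in the list above."""
    return sorted(set(options) - KNOWN_OPTIONS)


# Illustrative values only.
transport_options = {
    "visibility_timeout": 3600,             # seconds
    "global_keyprefix": "myapp:",           # made-up prefix
    "queue_order_strategy": "round_robin",
    "visability_timeout": 60,               # deliberate typo to show the check
}

print(unknown_options(transport_options))

# A checked dictionary would then be passed along the lines of:
# kombu.Connection("redis://localhost:6379/0", transport_options=transport_options)
```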
Transport¶
- class kombu.transport.redis.Transport(*args, **kwargs)[source]¶
Redis Transport.
- class Channel(*args, **kwargs)¶
Redis Channel.
- class QoS(*args, **kwargs)¶
Redis Ack Emulation.
- ack(delivery_tag)¶
Acknowledge message and remove from transactional state.
- append(message, delivery_tag)¶
Append message to transactional state.
- pipe_or_acquire(pipe=None, client=None)¶
- reject(delivery_tag, requeue=False)¶
Remove from transactional state and requeue message.
- restore_at_shutdown = True¶
If disabled, unacked messages won’t be restored at shutdown.
- restore_by_tag(tag, client=None, leftmost=False)¶
- restore_unacked(client=None)¶
Restore all unacknowledged messages.
- restore_visible(start=0, num=10, interval=10)¶
Restore any pending unacknowledged messages.
To be filled in for visibility_timeout style implementations.
Note:¶
This is implementation optional, and currently only used by the Redis transport.
- property unacked_index_key¶
- property unacked_key¶
- property unacked_mutex_expire¶
- property unacked_mutex_key¶
- property visibility_timeout¶
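The idea behind restore_visible and visibility_timeout can be pictured with a small in-memory model: each delivered message is remembered together with its delivery time, and anything left unacknowledged for longer than the timeout becomes eligible for redelivery. The class below is hypothetical and uses plain Python containers instead of Redis; it is a conceptual sketch, not Kombu's implementation.

```python
class UnackedModel:
    """Toy in-memory model of ack emulation with a visibility timeout."""

    def __init__(self, visibility_timeout=3600):
        self.visibility_timeout = visibility_timeout
        self.unacked = {}   # delivery_tag -> (message, delivered_at)
        self.queue = []     # messages waiting to be delivered again

    def append(self, message, delivery_tag, now):
        # Remember the message until it is acknowledged.
        self.unacked[delivery_tag] = (message, now)

    def ack(self, delivery_tag):
        # Acknowledged messages are dropped from the unacked state.
        self.unacked.pop(delivery_tag, None)

    def restore_visible(self, now):
        # Requeue every message whose visibility timeout has elapsed.
        cutoff = now - self.visibility_timeout
        expired = [tag for tag, (_, ts) in self.unacked.items() if ts <= cutoff]
        for tag in expired:
            message, _ = self.unacked.pop(tag)
            self.queue.append(message)
        return expired


model = UnackedModel(visibility_timeout=3600)
model.append("task-a", "tag-1", now=0)
model.append("task-b", "tag-2", now=3000)
model.ack("tag-2")                          # task-b was processed in time
restored = model.restore_visible(now=4000)  # task-a has been unacked for 4000 s
print(restored, model.queue)
```

In this model a consumer that takes longer than the timeout to acknowledge a message would see that message delivered a second time, which is why the timeout is usually chosen to exceed the longest expected processing time.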
- ack_emulation = True¶
- property active_queues¶
Set of queues being consumed from (excluding fanout queues).
- property async_pool¶
- basic_cancel(consumer_tag)¶
Cancel consumer by consumer tag.
- basic_consume(queue, *args, **kwargs)¶
Consume from queue.
- property client¶
Client used to publish messages, BRPOP etc.
- close()¶
Close channel.
Cancel all consumers, and requeue unacked messages.
- conn_or_acquire(client=None)¶
- connection_class¶
alias of Connection
- connection_class_ssl¶
alias of SSLConnection
- fanout_patterns = True¶
If enabled the fanout exchange will support patterns in routing and binding keys (like a topic exchange but using PUB/SUB).
Enabled by default since Kombu 4.x. Disable for backwards compatibility with Kombu 3.x.
- fanout_prefix = True¶
Transport option to disable fanout keyprefix. Can also be a string, in which case it changes the default prefix (‘/{db}.’) into something else. The prefix must include a leading slash and a trailing dot.
Enabled by default since Kombu 4.x. Disable for backwards compatibility with Kombu 3.x.
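The prefix rule above is easy to get wrong when supplying a custom string. The sketch below expands the default template for database 0 and applies the stated rule to two candidate prefixes. check_fanout_prefix is a hypothetical helper, and the candidate prefixes are made up.

```python
DEFAULT_FANOUT_PREFIX = "/{db}."  # default shown in this section


def check_fanout_prefix(prefix):
    """Hypothetical check: the prefix needs a leading slash and a trailing dot."""
    return prefix.startswith("/") and prefix.endswith(".")


# With database number 0 the default template expands to "/0."
expanded = DEFAULT_FANOUT_PREFIX.format(db=0)
print(expanded)

print(check_fanout_prefix("/myapp."))  # satisfies the rule
print(check_fanout_prefix("myapp"))    # missing both the slash and the dot
```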
- from_transport_options = ('body_encoding', 'deadletter_queue', 'sep', 'ack_emulation', 'unacked_key', 'unacked_index_key', 'unacked_mutex_key', 'unacked_mutex_expire', 'visibility_timeout', 'unacked_restore_limit', 'fanout_prefix', 'fanout_patterns', 'global_keyprefix', 'socket_timeout', 'socket_connect_timeout', 'socket_keepalive', 'socket_keepalive_options', 'queue_order_strategy', 'max_connections', 'health_check_interval', 'retry_on_timeout', 'priority_steps')¶
- get_table(exchange)¶
Get table of bindings for exchange.
- global_keyprefix = ''¶
The global key prefix will be prepended to all keys used by Kombu, which can be useful when a Redis database is shared by different users. By default, no prefix is prepended.
- health_check_interval = 25¶
- keyprefix_fanout = '/{db}.'¶
- keyprefix_queue = '_kombu.binding.%s'¶
- max_connections = 10¶
- property pool¶
- priority(n)¶
- priority_steps = [0, 3, 6, 9]¶
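One way to picture how priority(n) and priority_steps relate is as a bucket lookup: a message priority is rounded down to the nearest configured step, and each non-zero step gets its own queue key built from the queue name, sep and the step. The sketch below works under that assumption. The function names are hypothetical, the input is assumed to lie between 0 and 9, and the exact key naming should be checked against the Kombu source.

```python
from bisect import bisect

PRIORITY_STEPS = [0, 3, 6, 9]  # default shown in this section
SEP = "\x06\x16"               # default separator shown in this section


def priority_step(n, steps=PRIORITY_STEPS):
    """Round priority n (assumed 0..9) down to the nearest configured step."""
    return steps[bisect(steps, n) - 1]


def queue_key(queue, n):
    """Assumed naming: step 0 uses the plain queue name, other steps append sep + step."""
    step = priority_step(n)
    return f"{queue}{SEP}{step}" if step else queue


for n in (0, 2, 3, 5, 9):
    print(n, priority_step(n), repr(queue_key("tasks", n)))
```

Under this reading, adding more entries to priority_steps gives finer-grained priorities at the cost of more keys to poll.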
- queue_order_strategy = 'round_robin'¶
Order in which we consume from queues.
Can be either a string alias or a cycle strategy class:
round_robin (round_robin_cycle). Make sure each queue has an equal opportunity to be consumed from.
sorted (sorted_cycle). Consume from queues in alphabetical order. If the first queue in the sorted list always contains messages, then the rest of the queues will never be consumed from.
priority (priority_cycle). Consume from queues in original order, so that if the first queue always contains messages, the rest of the queues in the list will never be consumed from.
The default is to consume from queues in round robin.
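The difference between the three strategies shows up most clearly when every queue always has messages waiting. The toy simulation below models that case. simulate is a hypothetical function written for this illustration and does not use Kombu's cycle classes.

```python
def simulate(strategy, queues, turns):
    """Return the queue served on each turn, assuming no queue is ever empty."""
    order = list(queues)
    served = []
    for _ in range(turns):
        # "sorted" looks at queues alphabetically; the others keep the current order.
        candidates = sorted(order) if strategy == "sorted" else list(order)
        chosen = candidates[0]  # the first queue with messages wins
        served.append(chosen)
        if strategy == "round_robin":
            # Move the queue that was just served to the back of the line.
            order.remove(chosen)
            order.append(chosen)
    return served


queues = ["b", "a", "c"]
for strategy in ("round_robin", "sorted", "priority"):
    print(strategy, simulate(strategy, queues, 4))
```

In this model only round_robin ever reaches every queue; sorted keeps serving "a" and priority keeps serving "b", which matches the starvation caveat described above.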
- retry_on_timeout = None¶
- sep = '\x06\x16'¶
- socket_connect_timeout = None¶
- socket_keepalive = None¶
- socket_keepalive_options = None¶
- socket_timeout = None¶
- property subclient¶
Pub/Sub connection used to consume fanout queues.
- supports_fanout = True¶
Flag set if the channel supports fanout exchanges.
- unacked_index_key = 'unacked_index'¶
- unacked_key = 'unacked'¶
- unacked_mutex_expire = 300¶
- unacked_mutex_key = 'unacked_mutex'¶
- unacked_restore_limit = None¶
- visibility_timeout = 3600¶
- channel_errors = (<class 'amqp.exceptions.ChannelError'>, <class 'redis.exceptions.DataError'>, <class 'redis.exceptions.InvalidResponse'>, <class 'redis.exceptions.ResponseError'>)¶
Tuple of errors that can happen due to channel/method failure.
- connection_errors = (<class 'amqp.exceptions.ConnectionError'>, <class 'kombu.exceptions.InconsistencyError'>, <class 'OSError'>, <class 'OSError'>, <class 'OSError'>, <class 'redis.exceptions.ConnectionError'>, <class 'redis.exceptions.BusyLoadingError'>, <class 'redis.exceptions.AuthenticationError'>, <class 'redis.exceptions.TimeoutError'>)¶
Tuple of errors that can happen due to connection failure.
- default_port = 6379¶
Port number used when no port is specified.
- driver_name = 'redis'¶
Name of driver library (e.g. ‘py-amqp’, ‘redis’).
- driver_type = 'redis'¶
Type of driver, can be used to separate transports using the AMQP protocol (driver_type: ‘amqp’), Redis (driver_type: ‘redis’), etc…
- implements = {'asynchronous': True, 'exchange_type': frozenset({'direct', 'fanout', 'topic'}), 'heartbeats': False}¶
- polling_interval = None¶
Time to sleep between unsuccessful polls.
Channel¶
- class kombu.transport.redis.Channel(*args, **kwargs)[source]¶
Redis Channel.
- class QoS(*args, **kwargs)¶
Redis Ack Emulation.
- ack(delivery_tag)¶
Acknowledge message and remove from transactional state.
- append(message, delivery_tag)¶
Append message to transactional state.
- pipe_or_acquire(pipe=None, client=None)¶
- reject(delivery_tag, requeue=False)¶
Remove from transactional state and requeue message.
- restore_at_shutdown = True¶
If disabled, unacked messages won’t be restored at shutdown.
- restore_by_tag(tag, client=None, leftmost=False)¶
- restore_unacked(client=None)¶
Restore all unacknowledged messages.
- restore_visible(start=0, num=10, interval=10)¶
Restore any pending unacknowledged messages.
To be filled in for visibility_timeout style implementations.
Note:¶
This is implementation optional, and currently only used by the Redis transport.
- property unacked_index_key¶
- property unacked_key¶
- property unacked_mutex_expire¶
- property unacked_mutex_key¶
- property visibility_timeout¶
- ack_emulation = True¶
- property active_queues¶
Set of queues being consumed from (excluding fanout queues).
- property async_pool¶
- property client¶
Client used to publish messages, BRPOP etc.
- connection_class¶
alias of Connection
- connection_class_ssl¶
alias of SSLConnection
- fanout_patterns = True¶
If enabled the fanout exchange will support patterns in routing and binding keys (like a topic exchange but using PUB/SUB).
Enabled by default since Kombu 4.x. Disable for backwards compatibility with Kombu 3.x.
- fanout_prefix = True¶
Transport option to disable fanout keyprefix. Can also be a string, in which case it changes the default prefix (‘/{db}.’) into something else. The prefix must include a leading slash and a trailing dot.
Enabled by default since Kombu 4.x. Disable for backwards compatibility with Kombu 3.x.
- from_transport_options = ('body_encoding', 'deadletter_queue', 'sep', 'ack_emulation', 'unacked_key', 'unacked_index_key', 'unacked_mutex_key', 'unacked_mutex_expire', 'visibility_timeout', 'unacked_restore_limit', 'fanout_prefix', 'fanout_patterns', 'global_keyprefix', 'socket_timeout', 'socket_connect_timeout', 'socket_keepalive', 'socket_keepalive_options', 'queue_order_strategy', 'max_connections', 'health_check_interval', 'retry_on_timeout', 'priority_steps')¶
- global_keyprefix = ''¶
The global key prefix will be prepended to all keys used by Kombu, which can be useful when a Redis database is shared by different users. By default, no prefix is prepended.
- health_check_interval = 25¶
- keyprefix_fanout = '/{db}.'¶
- keyprefix_queue = '_kombu.binding.%s'¶
- max_connections = 10¶
- property pool¶
- priority_steps = [0, 3, 6, 9]¶
- queue_order_strategy = 'round_robin'¶
Order in which we consume from queues.
Can be either a string alias or a cycle strategy class:
round_robin (round_robin_cycle). Make sure each queue has an equal opportunity to be consumed from.
sorted (sorted_cycle). Consume from queues in alphabetical order. If the first queue in the sorted list always contains messages, then the rest of the queues will never be consumed from.
priority (priority_cycle). Consume from queues in original order, so that if the first queue always contains messages, the rest of the queues in the list will never be consumed from.
The default is to consume from queues in round robin.
- retry_on_timeout = None¶
- sep = '\x06\x16'¶
- socket_connect_timeout = None¶
- socket_keepalive = None¶
- socket_keepalive_options = None¶
- socket_timeout = None¶
- property subclient¶
Pub/Sub connection used to consume fanout queues.
- supports_fanout = True¶
Flag set if the channel supports fanout exchanges.
- unacked_index_key = 'unacked_index'¶
- unacked_key = 'unacked'¶
- unacked_mutex_expire = 300¶
- unacked_mutex_key = 'unacked_mutex'¶
- unacked_restore_limit = None¶
- visibility_timeout = 3600¶
SentinelChannel¶
- class kombu.transport.redis.SentinelChannel(*args, **kwargs)[source]¶
Channel with explicit Redis Sentinel knowledge.
Broker url is supposed to look like:
sentinel://0.0.0.0:26379;sentinel://0.0.0.0:26380/...
where each sentinel is separated by a ;.
Other arguments for the sentinel should come from the transport options (see transport_options of Connection).
You must provide at least one option in Transport options:
master_name - name of the Redis group to poll
Example:¶
>>> import kombu
>>> c = kombu.Connection(
...     'sentinel://sentinel1:26379;sentinel://sentinel2:26379',
...     transport_options={'master_name': 'mymaster'}
... )
>>> c.connect()
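To make the ;-separated broker URL format concrete, the sketch below splits the example URL into (host, port) pairs using only the standard library. It is an illustration of the format, not the parsing that SentinelChannel performs internally, and the host names are the placeholders from the example above.

```python
from urllib.parse import urlparse

broker_url = "sentinel://sentinel1:26379;sentinel://sentinel2:26379"

# Each sentinel is separated by ";", so split first and parse each part.
sentinels = []
for part in broker_url.split(";"):
    parsed = urlparse(part)
    sentinels.append((parsed.hostname, parsed.port))

# master_name is the option the section above describes as required.
transport_options = {"master_name": "mymaster"}

print(sentinels)
print(transport_options)
```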
- connection_class¶
alias of SentinelManagedConnection
- connection_class_ssl¶
alias of SentinelManagedSSLConnection
- from_transport_options = ('body_encoding', 'deadletter_queue', 'sep', 'ack_emulation', 'unacked_key', 'unacked_index_key', 'unacked_mutex_key', 'unacked_mutex_expire', 'visibility_timeout', 'unacked_restore_limit', 'fanout_prefix', 'fanout_patterns', 'global_keyprefix', 'socket_timeout', 'socket_connect_timeout', 'socket_keepalive', 'socket_keepalive_options', 'queue_order_strategy', 'max_connections', 'health_check_interval', 'retry_on_timeout', 'priority_steps', 'master_name', 'min_other_sentinels', 'sentinel_kwargs')¶