This document is for Kombu's development version, which can be significantly different from previous releases. Get the stable docs here: 5.3.

Amazon SQS Transport - kombu.transport.SQS

Amazon SQS transport module for Kombu.

This package implements an AMQP-like interface on top of Amazon's SQS service, with the goal of being optimized for high performance and reliability.

The default settings for this module are now focused on high performance in task queue situations where tasks are small, idempotent, and run very fast.

SQS Features supported by this transport

Long Polling

https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-long-polling.html

Long polling is enabled by setting the wait_time_seconds transport option to a value between 1 and 20 seconds. A value of 0 disables long polling, and 20 seconds is the maximum Amazon supports. The default is 10 seconds.

When wait_time_seconds is greater than 0, each ReceiveMessage API call to AWS will block on the server side for up to that many seconds, returning early only when a message arrives. This reduces empty responses and lowers SQS request costs compared to short polling (wait_time_seconds=0).
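As a rough illustration of the range described above, the following sketch builds a transport options dict and rejects values outside 0 to 20. The helper name long_polling_options is hypothetical and not part of Kombu; only the wait_time_seconds key comes from this document.

```python
def long_polling_options(wait_time_seconds):
    """Return transport options for the requested long-polling wait.

    Hypothetical helper (not part of Kombu): it only checks the
    0-20 second range that SQS accepts for long polling.
    """
    if not 0 <= wait_time_seconds <= 20:
        raise ValueError("wait_time_seconds must be between 0 and 20")
    return {'wait_time_seconds': wait_time_seconds}


# 20 is the longest wait SQS allows; 0 would fall back to short polling.
options = long_polling_options(20)
```

The resulting dict could then be assigned to the transport options of your connection or Celery app.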

Polling Interval

The polling_interval transport option (default: 1 second) controls the client-side delay added between consecutive polls when a poll returns no messages. After a successful (non-empty) poll the next poll is scheduled immediately; after an empty poll the transport waits polling_interval seconds before trying again.

Important — additive timing when both options are set:

wait_time_seconds and polling_interval are applied at different layers and their delays compose additively. For an empty poll the total time before the next ReceiveMessage call is issued is:

effective interval = wait_time_seconds + polling_interval

For example, with the defaults (wait_time_seconds=10, polling_interval=1) an empty queue is polled at most once every ~11 seconds: the AWS call blocks for up to 10 s, then the client waits an additional 1 s before rescheduling.
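The arithmetic above can be sketched as a small function. The function name empty_poll_interval is made up for this illustration; it only restates the formula and does not call Kombu or AWS.

```python
def empty_poll_interval(wait_time_seconds, polling_interval):
    """Worst-case seconds between two ReceiveMessage calls on an empty queue.

    Illustrative only: the server-side wait and the client-side delay
    are simply added together, as in the formula above.
    """
    return wait_time_seconds + polling_interval


# Defaults: the call blocks for up to 10 s, then the client waits 1 s.
defaults = empty_poll_interval(wait_time_seconds=10, polling_interval=1)

# Short polling only: the client-side delay is the sole timing control.
short_polling = empty_poll_interval(wait_time_seconds=0, polling_interval=5)
```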

To make polling_interval the sole timing control (e.g. for short polling), set wait_time_seconds=0:

app.conf.broker_transport_options = {
    'wait_time_seconds': 0,   # disable server-side long polling
    'polling_interval': 5,    # client waits 5 s after each empty poll
}

To rely entirely on long polling and remove the extra client-side delay, set polling_interval=0:

app.conf.broker_transport_options = {
    'wait_time_seconds': 10,  # server holds connection up to 10 s
    'polling_interval': 0,    # no additional client-side wait
}

Batch API Actions

https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-batch-api.html

The default behavior of the SQS Channel.drain_events() method is to request up to the ‘prefetch_count’ messages on every request to SQS. These messages are stored locally in a deque object and passed back to the Transport until the deque is empty, before triggering a new API call to Amazon.

This behavior dramatically speeds up the rate that you can pull tasks from SQS when you have short-running tasks (or a large number of workers).
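The buffering idea can be sketched without any AWS calls. This is a simplified model, not Kombu's implementation: BufferedReceiver and fake_fetch are hypothetical names, and the fake fetch function stands in for a batched request to SQS.

```python
from collections import deque


class BufferedReceiver:
    """Serve messages from a local deque, refilling it one batch at a time."""

    def __init__(self, fetch_batch, prefetch_count):
        self._fetch_batch = fetch_batch      # callable(max_messages) -> list
        self._prefetch_count = prefetch_count
        self._buffer = deque()
        self.api_calls = 0                   # number of batch requests issued

    def get(self):
        # Only issue a new request once the local buffer has been drained.
        if not self._buffer:
            self.api_calls += 1
            self._buffer.extend(self._fetch_batch(self._prefetch_count))
        if not self._buffer:
            raise LookupError("queue is empty")
        return self._buffer.popleft()


# A stand-in for the remote queue, holding 25 pending messages.
pending = deque(f"task-{i}" for i in range(25))


def fake_fetch(max_messages):
    """Return up to max_messages pending messages, like one batched receive."""
    return [pending.popleft() for _ in range(min(max_messages, len(pending)))]


receiver = BufferedReceiver(fake_fetch, prefetch_count=10)
received = [receiver.get() for _ in range(25)]
```

In this model, 25 messages are delivered with three batch requests (10, 10, and 5 messages) instead of 25 single-message requests.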

When a Celery worker has multiple queues to monitor, it will pull down up to ‘prefetch_count’ messages from queueA and work on them all before moving on to queueB. If queueB is empty, it will wait until ‘polling_interval’ expires (plus any server-side long-polling delay from ‘wait_time_seconds’) before moving back and checking on queueA.

Message Attributes

https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-message-metadata.html

SQS supports sending message attributes along with the message body. To use this feature, pass ‘message_attributes’ as a keyword argument to the basic_publish method.
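A minimal sketch of what such a ‘message_attributes’ value might look like, assuming it follows the attribute shape used by the SQS API (a DataType plus a typed value per attribute). The helper string_attributes and the attribute names are hypothetical, and the commented publish call assumes the keyword argument is forwarded to basic_publish.

```python
def string_attributes(**values):
    """Build String-typed message attributes in the SQS API shape.

    Hypothetical helper: every value is converted with str() and sent
    with DataType 'String'.
    """
    return {
        name: {'DataType': 'String', 'StringValue': str(value)}
        for name, value in values.items()
    }


message_attributes = string_attributes(tenant='customer-123', priority=5)

# Assumed usage with an existing Kombu producer (not executed here):
#   producer.publish(body, routing_key='my-queue',
#                    message_attributes=message_attributes)
```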

Other Features supported by this transport

Predefined Queues

The default behavior of this transport is to use a single AWS credential pair in order to manage all SQS queues (e.g. listing queues, creating queues, polling queues, deleting messages).

If it is preferable for your environment to use multiple AWS credentials, you can use the ‘predefined_queues’ setting inside the ‘transport_options’ map. This setting allows you to specify the SQS queue URL and AWS credentials for each of your queues. For example, if you have two queues (which both already exist in AWS), you can tell this transport about them as follows:

transport_options = {
  'predefined_queues': {
    'queue-1': {
      'url': 'https://sqs.us-east-1.amazonaws.com/xxx/aaa',
      'access_key_id': 'a',
      'secret_access_key': 'b',
      'backoff_policy': {1: 10, 2: 20, 3: 40, 4: 80, 5: 320, 6: 640}, # optional
      'backoff_tasks': ['svc.tasks.tasks.task1'] # optional
    },
    'queue-2.fifo': {
      'url': 'https://sqs.us-east-1.amazonaws.com/xxx/bbb.fifo',
      'access_key_id': 'c',
      'secret_access_key': 'd',
      'backoff_policy': {1: 10, 2: 20, 3: 40, 4: 80, 5: 320, 6: 640}, # optional
      'backoff_tasks': ['svc.tasks.tasks.task2'] # optional
    },
  },
  'sts_role_arn': 'arn:aws:iam::<xxx>:role/STSTest', # optional
  'sts_token_timeout': 900, # optional
  'sts_token_buffer_time': 0, # optional, added in 5.6.0
}

Note that FIFO and standard queues must be named accordingly (the name of a FIFO queue must end with the .fifo suffix).

backoff_policy and backoff_tasks are optional arguments. After a task fails, they automatically change the message visibility timeout so that the listed tasks can have a different delay before each retry.

AWS STS authentication is supported through sts_role_arn and sts_token_timeout. sts_role_arn is the ARN of the IAM role to assume. sts_token_timeout is the token lifetime in seconds; it defaults to 900, which is also the minimum. Once that period has elapsed, a new token is created.

Predefined Exchanges

When using a fanout exchange with this transport, messages are sent to an AWS SNS, which then forwards the messages to all subscribed queues.

The default behavior of this transport is to create the SNS topic when the exchange is first declared. However, it is also possible to use a predefined SNS topic instead of letting the transport create it.

transport_options = {
  'predefined_exchanges': {
    'exchange-1': {
      'arn': 'arn:aws:sns:us-east-1:xxx:exchange-1',
      'access_key_id': 'a',
      'secret_access_key': 'b',
    },
    'exchange-2.fifo': {
      'arn': 'arn:aws:sns:us-east-1:xxx:exchange-2',
      'access_key_id': 'c',
      'secret_access_key': 'd',
    },
  }
}

Added in version 5.6.0: sts_token_buffer_time (seconds) is how long before its actual expiration time the token is refreshed. It defaults to 0 (no buffer) and should be less than sts_token_timeout.
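To make the relationship between sts_token_timeout and sts_token_buffer_time concrete, here is a small sketch of the timing arithmetic. It is not Kombu's implementation: the function token_refresh_time is hypothetical and assumes the buffer is ignored unless it is shorter than the token lifetime.

```python
from datetime import datetime, timedelta, timezone


def token_refresh_time(issued_at, token_timeout, buffer_time=0):
    """Return when a token issued at issued_at should be refreshed.

    Illustrative only: a buffer that is negative or not shorter than the
    token lifetime is ignored, so the token is never treated as expired
    the moment it is issued.
    """
    if not 0 <= buffer_time < token_timeout:
        buffer_time = 0
    return issued_at + timedelta(seconds=token_timeout - buffer_time)


issued = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)

# A 900 s token with a 60 s buffer is refreshed 840 s after it was issued.
refresh_at = token_refresh_time(issued, token_timeout=900, buffer_time=60)
```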

If you authenticate using Okta (e.g. calling gimme-aws-creds), you can also specify a ‘session_token’ to connect to a queue. Note that those tokens have a limited lifetime and are therefore only suited for short-lived tests.

Client config

In some cases you may need to override the botocore config. You can do it as follows:

transport_options = {
  'client-config': {
    'connect_timeout': 5,
  },
}

For a complete list of settings you can adjust using this option see https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html
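As a slightly fuller sketch, several botocore settings can be combined under the same key. The values below are placeholders chosen for illustration; connect_timeout, read_timeout, and retries are botocore Config parameters, but whether these particular values suit your workload is an assumption.

```python
# Placeholder values; tune them for your own environment.
transport_options = {
    'client-config': {
        'connect_timeout': 5,            # seconds to wait when opening a connection
        'read_timeout': 30,              # seconds to wait for a response
        'retries': {'max_attempts': 3},  # botocore retry settings
    },
}
```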

Features

  • Type: Virtual

  • Supports Direct: Yes

  • Supports Topic: Yes

  • Supports Fanout: Yes

  • Supports Priority: No

  • Supports TTL: No

Transport

class kombu.transport.SQS.Transport(client, **kwargs)

SQS Transport.

Additional queue attributes can be supplied to SQS during queue creation by passing an sqs-creation-attributes key in transport_options. sqs-creation-attributes must be a dict whose key-value pairs correspond with Attributes in the CreateQueue SQS API.

For example, to have SQS queues created with server-side encryption enabled using the default Amazon Managed Customer Master Key, you can set the KmsMasterKeyId attribute. When the queue is initially created by Kombu, encryption will be enabled.

from kombu.transport.SQS import Transport

transport = Transport(
    ...,
    transport_options={
        'sqs-creation-attributes': {
            'KmsMasterKeyId': 'alias/aws/sqs',
        },
    }
)

Added in version 5.6.

Queue tags can be applied to SQS queues during creation by passing a queue_tags key in transport_options. queue_tags must be a dict of tag key-value pairs.

from kombu.transport.SQS import Transport

transport = Transport(
    ...,
    transport_options={
        'queue_tags': {
            'Environment': 'production',
            'Team': 'backend',
        },
    }
)

The ApproximateReceiveCount message attribute is fetched by this transport by default. Requested message attributes can be changed by setting fetch_message_attributes in the transport options.

from kombu.transport.SQS import Transport

transport = Transport(
    ...,
    transport_options={
        'fetch_message_attributes': ["All"],  # Get all of the MessageSystemAttributeNames (formerly AttributeNames)
    }
)
# Preferred - A dict specifying system and custom message attributes
transport = Transport(
    ...,
    transport_options={
        'fetch_message_attributes': {
            'MessageSystemAttributeNames': ["SenderId", "SentTimestamp"],
            'MessageAttributeNames': ['S3MessageBodyKey']
        },
    }
)
class Channel(*args, **kwargs)

SQS Channel.

channel_errors = (<class 'amqp.exceptions.ChannelError'>, <class 'botocore.exceptions.BotoCoreError'>)

Tuple of errors that can happen due to channel/method failure.

connection_errors = (<class 'amqp.exceptions.ConnectionError'>, <class 'botocore.exceptions.BotoCoreError'>, <class 'OSError'>)

Tuple of errors that can happen due to connection failure.

property default_connection_params
default_port = None

port number used when no port is specified.

driver_name = 'sqs'

Name of driver library (e.g. ‘py-amqp’, ‘redis’).

driver_type = 'sqs'

Type of driver, can be used to separate transports using the AMQP protocol (driver_type: ‘amqp’), Redis (driver_type: ‘redis’), etc…

implements = {'asynchronous': True, 'exchange_type': frozenset({'direct'}), 'heartbeats': False}
polling_interval = 1

Time to sleep between unsuccessful polls.

wait_time_seconds = 0

Channel

class kombu.transport.SQS.Channel(*args, **kwargs)

SQS Channel.

B64_REGEX = re.compile(b'^(?:[A-Za-z0-9+/]{4})*(?:[A-Za-z0-9+/]{2}==|[A-Za-z0-9+/]{3}=)?$')
class QoS(channel, prefetch_count=0)

Quality of Service guarantees implementation for SQS.

apply_backoff_policy(routing_key, delivery_tag, backoff_policy, backoff_tasks)
extract_task_name_and_number_of_retries(delivery_tag)
reject(delivery_tag, requeue=False)

Remove from transactional state and requeue message.

asynsqs(queue=None)
basic_ack(delivery_tag, multiple=False)

Acknowledge message.

basic_cancel(consumer_tag)

Cancel consumer by consumer tag.

basic_consume(queue, no_ack, callback, consumer_tag, *args, **kwargs)

Consume from queue.

canonical_queue_name(queue_name)
close()

Close channel.

Cancel all consumers, and requeue unacked messages.

property conninfo
default_region = 'us-east-1'
default_visibility_timeout = 1800
default_wait_time_seconds = 10
domain_format = 'kombu%(vhost)s'
drain_events(timeout=None, callback=None, **kwargs)

Return a single payload message from one of our queues.

Raises:

Queue.Empty – if no messages are available.

property endpoint_url
entity_name(name, table={33: 95, 34: 95, 35: 95, 36: 95, 37: 95, 38: 95, 39: 95, 40: 95, 41: 95, 42: 95, 43: 95, 44: 95, 46: 45, 47: 95, 58: 95, 59: 95, 60: 95, 61: 95, 62: 95, 63: 95, 64: 95, 91: 95, 92: 95, 93: 95, 94: 95, 96: 95, 123: 95, 124: 95, 125: 95, 126: 95})

Format AMQP queue name into a legal SQS queue name.
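The default table in the signature above maps character codes to replacements: every ASCII punctuation character except '-' and '_' becomes '_' (95), and '.' (46) becomes '-' (45). The sketch below rebuilds that table and applies it with str.translate. The function name sanitize is hypothetical, and the real method may do more than this character translation.

```python
import string

# Every ASCII punctuation character except '-' and '_' maps to '_' ...
table = {ord(c): ord('_') for c in string.punctuation if c not in '-_'}
# ... apart from '.', which maps to '-'.
table[ord('.')] = ord('-')


def sanitize(name):
    """Replace characters according to the translation table above."""
    return name.translate(table)


legal_name = sanitize('my.queue@v1')
```

Here the dot becomes a hyphen and the at sign becomes an underscore, giving 'my-queue_v1'.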

property fanout: SNS

Provides SNS fanout functionality.

This method returns the fanout instance. If an instance of the fanout class has not been initialised, then initialise it.

Returns:

An instance of SNS fanout class.

property fetch_message_attributes
static generate_sts_session_token(role_arn: str, token_expiry_seconds: int)
generate_sts_session_token_with_buffer(role_arn, token_expiry_seconds, token_buffer_seconds=0)

Generate STS session credentials with an optional expiration buffer.

The buffer is only applied if it is less than token_expiry_seconds to prevent an expired token.

property get_message_attributes: dict[str, Any]

Get the message attributes to be fetched from SQS.

Ensures ‘ApproximateReceiveCount’ is included in the system attributes if a list is provided. The number of retries is managed by SQS (specifically by the ApproximateReceiveCount message attribute).

  • See: the extract_task_name_and_number_of_retries method of class QoS(virtual.QoS)

Returns:

A dictionary with SQS message attribute fetch config.
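One way to picture this normalization is the sketch below. It is an assumption-laden illustration rather than Kombu's code: the function normalize_fetch_attributes is hypothetical, and it adds ‘ApproximateReceiveCount’ for both the list and dict forms unless "All" is already requested.

```python
def normalize_fetch_attributes(option):
    """Turn a list or dict option into a fetch config dict.

    Hypothetical normalization: a plain list is treated as system
    attribute names, and 'ApproximateReceiveCount' is always requested
    so that retries can be counted.
    """
    required = 'ApproximateReceiveCount'
    if isinstance(option, dict):
        system = list(option.get('MessageSystemAttributeNames', []))
        custom = list(option.get('MessageAttributeNames', []))
    else:
        system = list(option or [])
        custom = []
    # "All" already covers every system attribute, so nothing is appended.
    if 'All' not in system and required not in system:
        system.append(required)
    result = {'MessageSystemAttributeNames': system}
    if custom:
        result['MessageAttributeNames'] = custom
    return result


from_list = normalize_fetch_attributes(['SenderId'])
from_dict = normalize_fetch_attributes({
    'MessageSystemAttributeNames': ['SenderId', 'SentTimestamp'],
    'MessageAttributeNames': ['S3MessageBodyKey'],
})
```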

get_sts_credentials()
property is_secure
static is_sts_token_refresh_required(name: Any, client_map: dict[str, BaseClient], expire_time: datetime | None = None) -> bool

Checks if the STS token needs renewing.

This method will check different STS expiry times depending on the service the token was used for.

Parameters:
  • name – Either the queue name or exchange name

  • client_map – Map of client names to boto3 clients. Either the queue or exchange map

  • expire_time – The datetime when the token expires.

Returns:

True if the token needs renewing, False otherwise.

new_sqs_client(region, access_key_id, secret_access_key, session_token=None)

Create a new SQS client.

Parameters:
  • region – The AWS region to use.

  • access_key_id – The AWS access key ID for authenticating with boto.

  • secret_access_key – The AWS secret access key for authenticating with boto.

  • session_token – The AWS session token for authenticating with boto, if required.

Returns:

A Boto SQS client.

property port
property predefined_exchanges

Map of exchange_name to predefined SNS client.

property predefined_queues

Map of queue_name to predefined queue settings.

property queue_name_prefix
property region
property regioninfo
remove_stale_sns_subscriptions(exchange_name: str) -> None

Removes any stale SNS topic subscriptions.

This method will check that any SQS subscriptions on the SNS topic are associated with SQS queues. If not, it will remove the stale subscription. This method will only work if the ‘supports_fanout’ property is True.

Parameters:

exchange_name – The exchange to check for stale subscriptions

Returns:

None
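The stale-subscription check described above can be sketched as a pure function over already-fetched data. This is not Kombu's implementation: find_stale_subscriptions and the sample ARNs are hypothetical, and the subscription dicts are assumed to carry the Protocol, Endpoint, and SubscriptionArn keys that SNS returns when listing a topic's subscriptions.

```python
def find_stale_subscriptions(subscriptions, existing_queue_arns):
    """Return ARNs of SQS subscriptions whose queue no longer exists.

    Illustrative only: non-SQS subscriptions (e.g. email) are ignored.
    """
    existing = set(existing_queue_arns)
    return [
        sub['SubscriptionArn']
        for sub in subscriptions
        if sub['Protocol'] == 'sqs' and sub['Endpoint'] not in existing
    ]


# Made-up sample data for the illustration.
subscriptions = [
    {'Protocol': 'sqs', 'Endpoint': 'arn:aws:sqs:us-east-1:123:live',
     'SubscriptionArn': 'arn:aws:sns:us-east-1:123:topic:sub-1'},
    {'Protocol': 'sqs', 'Endpoint': 'arn:aws:sqs:us-east-1:123:gone',
     'SubscriptionArn': 'arn:aws:sns:us-east-1:123:topic:sub-2'},
    {'Protocol': 'email', 'Endpoint': 'ops@example.com',
     'SubscriptionArn': 'arn:aws:sns:us-east-1:123:topic:sub-3'},
]
stale = find_stale_subscriptions(subscriptions, ['arn:aws:sqs:us-east-1:123:live'])
```

Each ARN in the result would then be a candidate for an unsubscribe call.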

sqs(queue=None)
property sqs_base64_encoding
property supports_fanout

property transport_options
property visibility_timeout
property wait_time_seconds: int

Back-off policy

The back-off policy uses the SQS visibility timeout mechanism to alter the time between task retries. It changes the visibility timeout of a specific message from the queue's default visibility timeout to the timeout configured in the policy. The number of retries is managed by SQS (specifically by the ApproximateReceiveCount message attribute) and no further action is required by the user.

Configuring the queues and backoff policy:

broker_transport_options = {
    'predefined_queues': {
        'my-q': {
            'url': 'https://ap-southeast-2.queue.amazonaws.com/123456/my-q',
            'access_key_id': 'xxx',
            'secret_access_key': 'xxx',
            'backoff_policy': {1: 10, 2: 20, 3: 40, 4: 80, 5: 320, 6: 640},
            'backoff_tasks': ['svc.tasks.tasks.task1']
        }
    }
}

backoff_policy: dictionary where the key is the number of retries and the value is the delay in seconds between retries (i.e. the SQS visibility timeout).

backoff_tasks: list of task names to which the above policy applies.

The above policy:

Attempt       Delay
2nd attempt   20 seconds
3rd attempt   40 seconds
4th attempt   80 seconds
5th attempt   320 seconds
6th attempt   640 seconds
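The lookup implied by backoff_policy and backoff_tasks can be sketched as follows. This is a simplified illustration, not the transport's code: the function backoff_delay is hypothetical, and it assumes the policy is keyed by the number of retries as described above.

```python
def backoff_delay(task_name, retries, backoff_policy, backoff_tasks):
    """Return the visibility timeout in seconds, or None to keep the default.

    Illustrative only: the policy applies solely to tasks named in
    backoff_tasks, and retry counts missing from the policy fall back
    to the queue's default visibility timeout.
    """
    if task_name not in backoff_tasks:
        return None
    return backoff_policy.get(retries)


policy = {1: 10, 2: 20, 3: 40, 4: 80, 5: 320, 6: 640}
tasks = ['svc.tasks.tasks.task1']

delay = backoff_delay('svc.tasks.tasks.task1', 3, policy, tasks)
```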

Fair Queue Support (only available from version 5.7.0+)

Kombu supports Amazon SQS Fair Queues, which provide improved message processing fairness by ensuring that messages from different message groups are processed in a balanced manner.

Fair Queues are designed to prevent a single message group (or tenant) from monopolizing consumer resources, which can happen with standard queues that handle multi-tenant workloads with unbalanced message distribution.

When publishing messages to a Fair Queue, you should provide a MessageGroupId by passing it as a keyword argument to the publish method. Kombu sends MessageGroupId only when it is provided. For FIFO queues, AWS requires a message group ID; if you omit it, Kombu assigns a default group ID. For standard queues using fair queueing, AWS should not reject a message that lacks a group ID, but fairness is not guaranteed without one. Example:

producer.publish(
    message,
    routing_key='my-fair-queue',
    # Required for FIFO queues; if not provided, Kombu will assign a default
    # group id. Needed for fair queue functionality on standard queues;
    # otherwise fairness will not be guaranteed.
    MessageGroupId='customer-123',
)
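The group ID rules described above can be summarised in a short sketch. It is an illustration under stated assumptions, not Kombu's implementation: resolve_message_group_id is a hypothetical function, and the literal 'default' used for FIFO queues is an assumed placeholder for whatever default group ID Kombu assigns.

```python
def resolve_message_group_id(queue_name, message_group_id=None):
    """Decide which MessageGroupId, if any, accompanies a message.

    Illustrative only: an explicit ID always wins, FIFO queues get a
    fallback ID, and standard queues send nothing when no ID is given.
    """
    if message_group_id is not None:
        return message_group_id
    if queue_name.endswith('.fifo'):
        return 'default'  # assumed placeholder for the default group ID
    return None


explicit = resolve_message_group_id('my-fair-queue', 'customer-123')
fifo_fallback = resolve_message_group_id('orders.fifo')
standard_without_id = resolve_message_group_id('my-fair-queue')
```

In the last case no group ID is sent, so the message is accepted but takes no part in fair scheduling.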

Benefits of using Fair Queues with Kombu:

  • Improved message processing fairness across message groups

  • Better workload distribution among consumers

  • Eliminates the noisy neighbor problem

For more information, refer to the AWS documentation on Fair Queues: https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-fair-queues.html

Amazon SQS Transport - kombu.transport.SQS.exceptions

AWS SQS and SNS exceptions.

exception kombu.transport.SQS.exceptions.AccessDeniedQueueException

Bases: KombuError

Raised when access to the AWS queue is denied.

This may occur if the permissions are not correctly set or the credentials are invalid.

exception kombu.transport.SQS.exceptions.DoesNotExistQueueException

Bases: KombuError

The specified queue doesn’t exist.

exception kombu.transport.SQS.exceptions.InvalidQueueException

Bases: KombuError

Predefined queues are being used and configuration is not valid.

exception kombu.transport.SQS.exceptions.UnableToSubscribeQueueToTopicException

Bases: KombuError

Raised when unable to subscribe a queue to an SNS topic.

exception kombu.transport.SQS.exceptions.UnableToUnsubscribeQueueFromTopicException

Bases: KombuError

Raised when unable to unsubscribe a queue from an SNS topic.

exception kombu.transport.SQS.exceptions.UndefinedExchangeException

Bases: KombuError

Predefined exchanges are being used and an undefined exchange/SNS topic was used.

exception kombu.transport.SQS.exceptions.UndefinedQueueException

Bases: KombuError

Predefined queues are being used and an undefined queue was used.

Amazon SQS Transport - kombu.transport.SQS.SNS

Amazon SNS fanout support for the AWS SQS transport module for Kombu.

This module provides a SNS class that can be used to manage SNS topics and subscriptions. It’s primarily used to provide fanout support via AWS Simple Notification Service (SNS) topics and subscriptions. The module also provides methods for handling the lifecycle of these topics.

class kombu.transport.SQS.SNS.SNS(channel: Channel)

Bases: object

A class to manage AWS Simple Notification Service (SNS) for fanout exchanges.

This class maintains caches of SNS subscriptions, clients, topic ARNs, and so on, to enable efficient management of SNS topics and subscriptions.

get_client(exchange_name: str | None = None)

Get or create a Boto SNS client.

If an SNS client has already been initialised for this Channel instance, return it. If not, create a new SNS client, add it to this Channel instance and return it.

If the exchange is defined in the predefined_exchanges, then return the client for the exchange and handle any STS token renewal.

Parameters:

exchange_name – The name of the exchange

Returns:

A Boto SNS client.

initialise_exchange(exchange_name: str) -> None

Initialise SNS topic for a fanout exchange.

This method will create the SNS topic if it doesn’t exist, and check for any SNS topic subscriptions that no longer exist. If there are any SNS topic subscriptions that no longer exist, then they will be removed.

Parameters:

exchange_name – The name of the exchange.

Returns:

None

publish(exchange_name: str, message: str, message_attributes: dict | None = None, request_params: dict | None = None) -> None

Send a notification to AWS Simple Notification Service (SNS).

Parameters:
  • exchange_name – The name of the exchange.

  • message – The message to be sent as a JSON string

  • message_attributes – Attributes for the message.

  • request_params – Additional parameters for SNS notification.

Returns:

None

static serialise_message_attributes(message_attributes: dict | None) -> dict

Serialises SQS message attributes into SNS format.

Parameters:

message_attributes – A dictionary of message attributes

Returns:

A dictionary of serialised message attributes in SNS format.