This document is for Kombu's development version, which can be significantly different from previous releases. Get the stable docs here: 5.0.

Source code for kombu.asynchronous.aws.connection

"""Amazon AWS Connection."""

from __future__ import annotations

from email import message_from_bytes
from email.mime.message import MIMEMessage

from vine import promise, transform

from kombu.asynchronous.aws.ext import AWSRequest, get_response
from kombu.asynchronous.http import Headers, Request, get_client


def message_from_headers(hdr):
    bs = "\r\n".join("{}: {}".format(*h) for h in hdr)
    return message_from_bytes(bs.encode())


__all__ = (
    'AsyncHTTPSConnection', 'AsyncConnection',
)


class AsyncHTTPResponse:
    """Async HTTP Response."""

    def __init__(self, response):
        self.response = response
        self._msg = None
        self.version = 10

    def read(self, *args, **kwargs):
        return self.response.body

    def getheader(self, name, default=None):
        return self.response.headers.get(name, default)

    def getheaders(self):
        return list(self.response.headers.items())

    @property
    def msg(self):
        if self._msg is None:
            self._msg = MIMEMessage(message_from_headers(self.getheaders()))
        return self._msg

    @property
    def status(self):
        return self.response.code

    @property
    def reason(self):
        if self.response.error:
            return self.response.error.message
        return ''

    def __repr__(self):
        return repr(self.response)


[docs]class AsyncHTTPSConnection: """Async HTTP Connection.""" Request = Request Response = AsyncHTTPResponse method = 'GET' path = '/' body = None default_ports = {'http': 80, 'https': 443} def __init__(self, strict=None, timeout=20.0, http_client=None): self.headers = [] self.timeout = timeout self.strict = strict self.http_client = http_client or get_client()
[docs] def request(self, method, path, body=None, headers=None): self.path = path self.method = method if body is not None: try: read = body.read except AttributeError: self.body = body else: self.body = read() if headers is not None: self.headers.extend(list(headers.items()))
[docs] def getrequest(self): headers = Headers(self.headers) return self.Request(self.path, method=self.method, headers=headers, body=self.body, connect_timeout=self.timeout, request_timeout=self.timeout, validate_cert=False)
[docs] def getresponse(self, callback=None): request = self.getrequest() request.then(transform(self.Response, callback)) return self.http_client.add_request(request)
[docs] def set_debuglevel(self, level): pass
[docs] def connect(self): pass
[docs] def close(self): pass
[docs] def putrequest(self, method, path): self.method = method self.path = path
[docs] def putheader(self, header, value): self.headers.append((header, value))
[docs] def endheaders(self): pass
[docs] def send(self, data): if self.body: self.body += data else: self.body = data
def __repr__(self): return f'<AsyncHTTPConnection: {self.getrequest()!r}>'
[docs]class AsyncConnection: """Async AWS Connection.""" def __init__(self, sqs_connection, http_client=None, **kwargs): self.sqs_connection = sqs_connection self._httpclient = http_client or get_client()
[docs] def get_http_connection(self): return AsyncHTTPSConnection(http_client=self._httpclient)
def _mexe(self, request, sender=None, callback=None): callback = callback or promise() conn = self.get_http_connection() if callable(sender): sender(conn, request.method, request.path, request.body, request.headers, callback) else: conn.request(request.method, request.url, request.body, request.headers) conn.getresponse(callback=callback) return callback
class AsyncAWSQueryConnection(AsyncConnection): """Async AWS Query Connection.""" STATUS_CODE_OK = 200 STATUS_CODE_REQUEST_TIMEOUT = 408 STATUS_CODE_NETWORK_CONNECT_TIMEOUT_ERROR = 599 STATUS_CODE_INTERNAL_ERROR = 500 STATUS_CODE_BAD_GATEWAY = 502 STATUS_CODE_SERVICE_UNAVAILABLE_ERROR = 503 STATUS_CODE_GATEWAY_TIMEOUT = 504 STATUS_CODES_SERVER_ERRORS = ( STATUS_CODE_INTERNAL_ERROR, STATUS_CODE_BAD_GATEWAY, STATUS_CODE_SERVICE_UNAVAILABLE_ERROR ) STATUS_CODES_TIMEOUT = ( STATUS_CODE_REQUEST_TIMEOUT, STATUS_CODE_NETWORK_CONNECT_TIMEOUT_ERROR, STATUS_CODE_GATEWAY_TIMEOUT ) def __init__(self, sqs_connection, http_client=None, http_client_params=None, **kwargs): if not http_client_params: http_client_params = {} super().__init__(sqs_connection, http_client, **http_client_params) def make_request(self, operation, params_, path, verb, callback=None): # noqa params = params_.copy() if operation: params['Action'] = operation signer = self.sqs_connection._request_signer # defaults for non-get signing_type = 'standard' param_payload = {'data': params} if verb.lower() == 'get': # query-based opts signing_type = 'presignurl' param_payload = {'params': params} request = AWSRequest(method=verb, url=path, **param_payload) signer.sign(operation, request, signing_type=signing_type) prepared_request = request.prepare() return self._mexe(prepared_request, callback=callback) def get_list(self, operation, params, markers, path='/', parent=None, verb='POST', callback=None): # noqa return self.make_request( operation, params, path, verb, callback=transform( self._on_list_ready, callback, parent or self, markers, operation ), ) def get_object(self, operation, params, path='/', parent=None, verb='GET', callback=None): # noqa return self.make_request( operation, params, path, verb, callback=transform( self._on_obj_ready, callback, parent or self, operation ), ) def get_status(self, operation, params, path='/', parent=None, verb='GET', callback=None): # noqa return self.make_request( operation, params, path, verb, callback=transform( self._on_status_ready, callback, parent or self, operation ), ) def _on_list_ready(self, parent, markers, operation, response): service_model = self.sqs_connection.meta.service_model if response.status == self.STATUS_CODE_OK: _, parsed = get_response( service_model.operation_model(operation), response.response ) return parsed elif ( response.status in self.STATUS_CODES_TIMEOUT or response.status in self.STATUS_CODES_SERVER_ERRORS ): # When the server returns a timeout or 50X server error, # the response is interpreted as an empty list. # This prevents hanging the Celery worker. return [] else: raise self._for_status(response, response.read()) def _on_obj_ready(self, parent, operation, response): service_model = self.sqs_connection.meta.service_model if response.status == self.STATUS_CODE_OK: _, parsed = get_response( service_model.operation_model(operation), response.response ) return parsed else: raise self._for_status(response, response.read()) def _on_status_ready(self, parent, operation, response): service_model = self.sqs_connection.meta.service_model if response.status == self.STATUS_CODE_OK: httpres, _ = get_response( service_model.operation_model(operation), response.response ) return httpres.code else: raise self._for_status(response, response.read()) def _for_status(self, response, body): context = 'Empty body' if not body else 'HTTP Error' return Exception("Request {} HTTP {} {} ({})".format( context, response.status, response.reason, body ))