This document describes the current stable version of Kombu (5.3). For development docs, go here.

Source code for kombu.transport.zookeeper

# copyright: (c) 2010 - 2013 by Mahendra M.
# license: BSD, see LICENSE for more details.

"""Zookeeper transport module for kombu.

Zookeeper based transport. This transport uses the built-in kazoo Zookeeper
based queue implementation.

**References**

- https://zookeeper.apache.org/doc/current/recipes.html#sc_recipes_Queues
- https://kazoo.readthedocs.io/en/latest/api/recipe/queue.html

**Limitations**
This queue does not offer reliable consumption.  An entry is removed from
the queue prior to being processed.  So if an error occurs, the consumer
has to re-queue the item or it will be lost.

Features
========
* Type: Virtual
* Supports Direct: Yes
* Supports Topic: Yes
* Supports Fanout: No
* Supports Priority: Yes
* Supports TTL: No

Connection String
=================
Connects to a zookeeper node as:

.. code-block::

    zookeeper://SERVER:PORT/VHOST

The <vhost> becomes the base for all the other znodes.  So we can use
it like a vhost.


Transport Options
=================

"""

from __future__ import annotations

import os
import socket
from queue import Empty

from kombu.utils.encoding import bytes_to_str, ensure_bytes
from kombu.utils.json import dumps, loads

from . import virtual

try:
    import kazoo
    from kazoo.client import KazooClient
    from kazoo.recipe.queue import Queue

    KZ_CONNECTION_ERRORS = (
        kazoo.exceptions.SystemErrorException,
        kazoo.exceptions.ConnectionLossException,
        kazoo.exceptions.MarshallingErrorException,
        kazoo.exceptions.UnimplementedException,
        kazoo.exceptions.OperationTimeoutException,
        kazoo.exceptions.NoAuthException,
        kazoo.exceptions.InvalidACLException,
        kazoo.exceptions.AuthFailedException,
        kazoo.exceptions.SessionExpiredException,
    )

    KZ_CHANNEL_ERRORS = (
        kazoo.exceptions.RuntimeInconsistencyException,
        kazoo.exceptions.DataInconsistencyException,
        kazoo.exceptions.BadArgumentsException,
        kazoo.exceptions.MarshallingErrorException,
        kazoo.exceptions.UnimplementedException,
        kazoo.exceptions.OperationTimeoutException,
        kazoo.exceptions.ApiErrorException,
        kazoo.exceptions.NoNodeException,
        kazoo.exceptions.NoAuthException,
        kazoo.exceptions.NodeExistsException,
        kazoo.exceptions.NoChildrenForEphemeralsException,
        kazoo.exceptions.NotEmptyException,
        kazoo.exceptions.SessionExpiredException,
        kazoo.exceptions.InvalidCallbackException,
        socket.error,
    )
except ImportError:
    kazoo = None
    KZ_CONNECTION_ERRORS = KZ_CHANNEL_ERRORS = ()

DEFAULT_PORT = 2181

__author__ = 'Mahendra M <mahendra.m@gmail.com>'


[docs] class Channel(virtual.Channel): """Zookeeper Channel.""" _client = None _queues = {} def __init__(self, connection, **kwargs): super().__init__(connection, **kwargs) vhost = self.connection.client.virtual_host self._vhost = '/{}'.format(vhost.strip('/')) def _get_path(self, queue_name): return os.path.join(self._vhost, queue_name) def _get_queue(self, queue_name): queue = self._queues.get(queue_name, None) if queue is None: queue = Queue(self.client, self._get_path(queue_name)) self._queues[queue_name] = queue # Ensure that the queue is created len(queue) return queue def _put(self, queue, message, **kwargs): return self._get_queue(queue).put( ensure_bytes(dumps(message)), priority=self._get_message_priority(message, reverse=True), ) def _get(self, queue): queue = self._get_queue(queue) msg = queue.get() if msg is None: raise Empty() return loads(bytes_to_str(msg)) def _purge(self, queue): count = 0 queue = self._get_queue(queue) while True: msg = queue.get() if msg is None: break count += 1 return count def _delete(self, queue, *args, **kwargs): if self._has_queue(queue): self._purge(queue) self.client.delete(self._get_path(queue)) def _size(self, queue): queue = self._get_queue(queue) return len(queue) def _new_queue(self, queue, **kwargs): if not self._has_queue(queue): queue = self._get_queue(queue) def _has_queue(self, queue): return self.client.exists(self._get_path(queue)) is not None def _open(self): conninfo = self.connection.client hosts = [] if conninfo.alt: for host_port in conninfo.alt: if host_port.startswith('zookeeper://'): host_port = host_port[len('zookeeper://'):] if not host_port: continue try: host, port = host_port.split(':', 1) host_port = (host, int(port)) except ValueError: if host_port == conninfo.hostname: host_port = (host_port, conninfo.port or DEFAULT_PORT) else: host_port = (host_port, DEFAULT_PORT) hosts.append(host_port) host_port = (conninfo.hostname, conninfo.port or DEFAULT_PORT) if host_port not in hosts: hosts.insert(0, host_port) conn_str = ','.join([f'{h}:{p}' for h, p in hosts]) conn = KazooClient(conn_str) conn.start() return conn @property def client(self): if self._client is None: self._client = self._open() return self._client
[docs] class Transport(virtual.Transport): """Zookeeper Transport.""" Channel = Channel polling_interval = 1 default_port = DEFAULT_PORT connection_errors = ( virtual.Transport.connection_errors + KZ_CONNECTION_ERRORS ) channel_errors = ( virtual.Transport.channel_errors + KZ_CHANNEL_ERRORS ) driver_type = 'zookeeper' driver_name = 'kazoo' def __init__(self, *args, **kwargs): if kazoo is None: raise ImportError('The kazoo library is not installed') super().__init__(*args, **kwargs)
[docs] def driver_version(self): return kazoo.__version__