This document describes the current stable version of Kombu (5.3). For development docs, go here.

Source code for kombu.transport.pyro

"""Pyro transport module for kombu.

Pyro transport, and Kombu Broker daemon.

Requires the :mod:`Pyro4` library to be installed.

Features
========
* Type: Virtual
* Supports Direct: Yes
* Supports Topic: Yes
* Supports Fanout: No
* Supports Priority: No
* Supports TTL: No

Connection String
=================

To use the Pyro transport with Kombu, use an url of the form:

.. code-block::

    pyro://localhost/kombu.broker

The hostname is where the transport will be looking for a Pyro name server,
which is used in turn to locate the kombu.broker Pyro service.
This broker can be launched by simply executing this transport module directly,
with the command: ``python -m kombu.transport.pyro``

Transport Options
=================
"""


from __future__ import annotations

import sys
from queue import Empty, Queue

from kombu.exceptions import reraise
from kombu.log import get_logger
from kombu.utils.objects import cached_property

from . import virtual

try:
    import Pyro4 as pyro
    from Pyro4.errors import NamingError
    from Pyro4.util import SerializerBase
except ImportError:          # pragma: no cover
    pyro = NamingError = SerializerBase = None

DEFAULT_PORT = 9090
E_NAMESERVER = """\
Unable to locate pyro nameserver on host {0.hostname}\
"""
E_LOOKUP = """\
Unable to lookup '{0.virtual_host}' in pyro nameserver on host {0.hostname}\
"""

logger = get_logger(__name__)


[docs] class Channel(virtual.Channel): """Pyro Channel."""
[docs] def close(self): super().close() if self.shared_queues: self.shared_queues._pyroRelease()
[docs] def queues(self): return self.shared_queues.get_queue_names()
def _new_queue(self, queue, **kwargs): if queue not in self.queues(): self.shared_queues.new_queue(queue) def _has_queue(self, queue, **kwargs): return self.shared_queues.has_queue(queue) def _get(self, queue, timeout=None): queue = self._queue_for(queue) return self.shared_queues.get(queue) def _queue_for(self, queue): if queue not in self.queues(): self.shared_queues.new_queue(queue) return queue def _put(self, queue, message, **kwargs): queue = self._queue_for(queue) self.shared_queues.put(queue, message) def _size(self, queue): return self.shared_queues.size(queue) def _delete(self, queue, *args, **kwargs): self.shared_queues.delete(queue) def _purge(self, queue): return self.shared_queues.purge(queue)
[docs] def after_reply_message_received(self, queue): pass
@cached_property def shared_queues(self): return self.connection.shared_queues
[docs] class Transport(virtual.Transport): """Pyro Transport.""" Channel = Channel #: memory backend state is global. # TODO: To be checked whether state can be per-Transport global_state = virtual.BrokerState() default_port = DEFAULT_PORT driver_type = driver_name = 'pyro' def __init__(self, client, **kwargs): super().__init__(client, **kwargs) self.state = self.global_state def _open(self): logger.debug("trying Pyro nameserver to find the broker daemon") conninfo = self.client try: nameserver = pyro.locateNS(host=conninfo.hostname, port=self.default_port) except NamingError: reraise(NamingError, NamingError(E_NAMESERVER.format(conninfo)), sys.exc_info()[2]) try: # name of registered pyro object uri = nameserver.lookup(conninfo.virtual_host) return pyro.Proxy(uri) except NamingError: reraise(NamingError, NamingError(E_LOOKUP.format(conninfo)), sys.exc_info()[2])
[docs] def driver_version(self): return pyro.__version__
@cached_property def shared_queues(self): return self._open()
if pyro is not None: SerializerBase.register_dict_to_class("queue.Empty", lambda cls, data: Empty()) @pyro.expose @pyro.behavior(instance_mode="single") class KombuBroker: """Kombu Broker used by the Pyro transport. You have to run this as a separate (Pyro) service. """ def __init__(self): self.queues = {} def get_queue_names(self): return list(self.queues) def new_queue(self, queue): if queue in self.queues: return # silently ignore the fact that queue already exists self.queues[queue] = Queue() def has_queue(self, queue): return queue in self.queues def get(self, queue): return self.queues[queue].get(block=False) def put(self, queue, message): self.queues[queue].put(message) def size(self, queue): return self.queues[queue].qsize() def delete(self, queue): del self.queues[queue] def purge(self, queue): while True: try: self.queues[queue].get(blocking=False) except Empty: break # launch a Kombu Broker daemon with the command: # ``python -m kombu.transport.pyro`` if __name__ == "__main__": print("Launching Broker for Kombu's Pyro transport.") with pyro.Daemon() as daemon: print("(Expecting a Pyro name server at {}:{})" .format(pyro.config.NS_HOST, pyro.config.NS_PORT)) with pyro.locateNS() as ns: print("You can connect with Kombu using the url " "'pyro://{}/kombu.broker'".format(pyro.config.NS_HOST)) uri = daemon.register(KombuBroker) ns.register("kombu.broker", uri) daemon.requestLoop()