This document describes the current stable version of Celery (4.4). For development docs, go here.

Source code for celery.backends.riak

# -*- coding: utf-8 -*-
"""Riak result store backend."""
from __future__ import absolute_import, unicode_literals

import sys
import warnings

from kombu.utils.url import _parse_url

from celery.exceptions import CeleryWarning, ImproperlyConfigured

from .base import KeyValueStoreBackend

try:
    import riak
    from riak import RiakClient
    from riak.resolver import last_written_resolver
except ImportError:  # pragma: no cover
    riak = RiakClient = last_written_resolver = None  # noqa

__all__ = ('RiakBackend',)

E_BUCKET_NAME = """\
Riak bucket names must be composed of ASCII characters only, not: {0!r}\
"""

W_UNSUPPORTED_PYTHON_VERSION = """\
Python {}.{} is unsupported by the client library \
https://pypi.org/project/riak\
""".format(sys.version_info.major, sys.version_info.minor)


if sys.version_info[0] == 3:
    if sys.version_info.minor >= 7:
        warnings.warn(CeleryWarning(W_UNSUPPORTED_PYTHON_VERSION))

    def to_bytes(string):
        return string.encode() if isinstance(string, str) else string

    def str_decode(string, encoding):
        return to_bytes(string).decode(encoding)

else:

    def str_decode(string, encoding):
        return string.decode('ascii')


def is_ascii(string):
    try:
        str_decode(string, 'ascii')
    except UnicodeDecodeError:
        return False
    return True


[docs]class RiakBackend(KeyValueStoreBackend): """Riak result backend. Raises: celery.exceptions.ImproperlyConfigured: if module :pypi:`riak` is not available. """ # TODO: allow using other protocols than protobuf ? #: default protocol used to connect to Riak, might be `http` or `pbc` protocol = 'pbc' #: default Riak bucket name (`default`) bucket_name = 'celery' #: default Riak server hostname (`localhost`) host = 'localhost' #: default Riak server port (8087) port = 8087 _bucket = None def __init__(self, host=None, port=None, bucket_name=None, protocol=None, url=None, *args, **kwargs): super(RiakBackend, self).__init__(*args, **kwargs) self.url = url if not riak: raise ImproperlyConfigured( 'You need to install the riak library to use the ' 'Riak backend.') uhost = uport = upass = ubucket = None if url: _, uhost, uport, _, upass, ubucket, _ = _parse_url(url) if ubucket: ubucket = ubucket.strip('/') config = self.app.conf.get('riak_backend_settings', None) if config is not None: if not isinstance(config, dict): raise ImproperlyConfigured( 'Riak backend settings should be grouped in a dict') else: config = {} self.host = uhost or config.get('host', self.host) self.port = int(uport or config.get('port', self.port)) self.bucket_name = ubucket or config.get('bucket', self.bucket_name) self.protocol = protocol or config.get('protocol', self.protocol) # riak bucket must be ascii letters or numbers only if not is_ascii(self.bucket_name): raise ValueError(E_BUCKET_NAME.format(self.bucket_name)) self._client = None def _get_client(self): """Get client connection.""" if self._client is None or not self._client.is_alive(): self._client = RiakClient(protocol=self.protocol, host=self.host, pb_port=self.port) self._client.resolver = last_written_resolver return self._client def _get_bucket(self): """Connect to our bucket.""" if ( self._client is None or not self._client.is_alive() or not self._bucket ): self._bucket = self.client.bucket(self.bucket_name) return self._bucket @property def client(self): return self._get_client() @property def bucket(self): return self._get_bucket()
[docs] def get(self, key): return self.bucket.get(key).data
[docs] def set(self, key, value, state): _key = self.bucket.new(key, data=value) _key.store()
[docs] def mget(self, keys): return [self.get(key).data for key in keys]
[docs] def delete(self, key): self.bucket.delete(key)