This document describes the current stable version of Celery (4.1). For development docs, go here.

Source code for celery.backends.couchdb

# -*- coding: utf-8 -*-
"""CouchDB result store backend."""
from __future__ import absolute_import, unicode_literals
from kombu.utils.url import _parse_url
from celery.exceptions import ImproperlyConfigured
from .base import KeyValueStoreBackend
try:
    import pycouchdb
except ImportError:
    pycouchdb = None  # noqa

__all__ = ['CouchBackend']

ERR_LIB_MISSING = """\
You need to install the pycouchdb library to use the CouchDB result backend\
"""


[docs]class CouchBackend(KeyValueStoreBackend): """CouchDB backend. Raises: celery.exceptions.ImproperlyConfigured: if module :pypi:`pycouchdb` is not available. """ container = 'default' scheme = 'http' host = 'localhost' port = 5984 username = None password = None def __init__(self, url=None, *args, **kwargs): super(CouchBackend, self).__init__(*args, **kwargs) self.url = url if pycouchdb is None: raise ImproperlyConfigured(ERR_LIB_MISSING) uscheme = uhost = uport = uname = upass = ucontainer = None if url: _, uhost, uport, uname, upass, ucontainer, _ = _parse_url(url) # noqa ucontainer = ucontainer.strip('/') if ucontainer else None self.scheme = uscheme or self.scheme self.host = uhost or self.host self.port = int(uport or self.port) self.container = ucontainer or self.container self.username = uname or self.username self.password = upass or self.password self._connection = None def _get_connection(self): """Connect to the CouchDB server.""" if self.username and self.password: conn_string = '%s://%s:%s@%s:%s' % ( self.scheme, self.username, self.password, self.host, str(self.port)) server = pycouchdb.Server(conn_string, authmethod='basic') else: conn_string = '%s://%s:%s' % ( self.scheme, self.host, str(self.port)) server = pycouchdb.Server(conn_string) try: return server.database(self.container) except pycouchdb.exceptions.NotFound: return server.create(self.container) @property def connection(self): if self._connection is None: self._connection = self._get_connection() return self._connection
[docs] def get(self, key): try: return self.connection.get(key)['value'] except pycouchdb.exceptions.NotFound: return None
[docs] def set(self, key, value): data = {'_id': key, 'value': value} try: self.connection.save(data) except pycouchdb.exceptions.Conflict: # document already exists, update it data = self.connection.get(key) data['value'] = value self.connection.save(data)
[docs] def mget(self, keys): return [self.get(key) for key in keys]
[docs] def delete(self, key): self.connection.delete(key)