This document describes the current stable version of pytest_celery (1.3). For development docs, go here.

Source code for pytest_celery.vendors.memcached.fixtures

"""The pytest-celery plugin provides a set of built-in components called
:ref:`vendors`.

This module is part of the Memcached Backend vendor.
"""

# mypy: disable-error-code="misc"

from __future__ import annotations

import pytest
from pytest_docker_tools import container
from pytest_docker_tools import fxtr

from pytest_celery.vendors.memcached.api import MemcachedTestBackend
from pytest_celery.vendors.memcached.container import MemcachedContainer
from pytest_celery.vendors.memcached.defaults import MEMCACHED_CONTAINER_TIMEOUT


@pytest.fixture
def celery_memcached_backend(default_memcached_backend: MemcachedContainer) -> MemcachedTestBackend:
    """Provide a MemcachedTestBackend node wrapping the default container.

    The node is yielded to the requesting test; once the test is done with
    the fixture, the node's ``teardown()`` is called.

    Args:
        default_memcached_backend (MemcachedContainer): Instantiated
            MemcachedContainer to wrap.

    Yields:
        MemcachedTestBackend: Node built on top of the given container.
    """
    node = MemcachedTestBackend(default_memcached_backend)
    yield node
    # Runs after the yield, i.e. during fixture finalization.
    node.teardown()
@pytest.fixture
def default_memcached_backend_cls() -> type[MemcachedContainer]:
    """Container class used for the default Memcached backend.

    Override this fixture to apply custom configuration globally.
    See also: :ref:`vendor-class`.

    Returns:
        type[MemcachedContainer]: API for managing the vendor's container.
    """
    container_cls = MemcachedContainer
    return container_cls
# Container fixture for the default Memcached backend, built with the
# pytest_docker_tools ``container`` factory.
# NOTE(review): the "{...}" strings are presumably resolved by
# pytest_docker_tools against fixtures of the same name at setup time -
# confirm against the pytest_docker_tools documentation.
default_memcached_backend = container(
    # Image name comes from the ``default_memcached_backend_image`` fixture.
    image="{default_memcached_backend_image}",
    # Port bindings and environment are supplied through fixture references.
    ports=fxtr("default_memcached_backend_ports"),
    environment=fxtr("default_memcached_backend_env"),
    # Attach to the network exposed by ``default_pytest_celery_network``.
    network="{default_pytest_celery_network.name}",
    # The wrapper class is fixed to MemcachedContainer here; it does not
    # follow an overridden ``default_memcached_backend_cls`` fixture.
    wrapper_class=MemcachedContainer,
    timeout=MEMCACHED_CONTAINER_TIMEOUT,
)
@pytest.fixture
def default_memcached_backend_env(default_memcached_backend_cls: type[MemcachedContainer]) -> dict:
    """Environment variables for the Memcached backend container.

    Args:
        default_memcached_backend_cls (type[MemcachedContainer]): Container
            class whose ``initial_env()`` is queried. See also:
            :ref:`vendor-class`.

    Returns:
        dict: Items to pass to the container's environment.
    """
    env = default_memcached_backend_cls.initial_env()
    return env
@pytest.fixture
def default_memcached_backend_image(default_memcached_backend_cls: type[MemcachedContainer]) -> str:
    """Docker image for the Memcached backend container.

    Args:
        default_memcached_backend_cls (type[MemcachedContainer]): Container
            class whose ``image()`` is queried. See also:
            :ref:`vendor-class`.

    Returns:
        str: Docker image name.
    """
    image_name = default_memcached_backend_cls.image()
    return image_name
@pytest.fixture
def default_memcached_backend_ports(default_memcached_backend_cls: type[MemcachedContainer]) -> dict:
    """Port bindings for the Memcached backend container.

    Args:
        default_memcached_backend_cls (type[MemcachedContainer]): Container
            class whose ``ports()`` is queried. See also:
            :ref:`vendor-class`.

    Returns:
        dict: Port bindings.
    """
    port_bindings = default_memcached_backend_cls.ports()
    return port_bindings