This document describes the current stable version of pytest_celery (1.3). For development docs, go here.
Source code for pytest_celery.vendors.worker.fixtures
"""The pytest-celery plugin provides a set of built-in components called
:ref:`vendors`.
This module is part of the :ref:`built-in-worker` vendor.
"""
# mypy: disable-error-code="misc"
from __future__ import annotations
from types import ModuleType
import pytest
from celery import Celery
from pytest_docker_tools import build
from pytest_docker_tools import container
from pytest_docker_tools import fxtr
from pytest_docker_tools import volume
from pytest_celery.api.worker import CeleryTestWorker
from pytest_celery.vendors.worker.container import CeleryWorkerContainer
from pytest_celery.vendors.worker.defaults import DEFAULT_WORKER_CONTAINER_TIMEOUT
from pytest_celery.vendors.worker.defaults import DEFAULT_WORKER_VOLUME
from pytest_celery.vendors.worker.defaults import WORKER_DOCKERFILE_ROOTDIR
[docs]
@pytest.fixture
def celery_setup_worker(
default_worker_cls: type[CeleryTestWorker],
default_worker_container: CeleryWorkerContainer,
default_worker_app: Celery,
) -> CeleryTestWorker:
"""Creates a CeleryTestWorker instance. Responsible for tearing down the
node.
Args:
default_worker_cls (type[CeleryTestWorker]): Interface class.
default_worker_container (CeleryWorkerContainer): Instantiated CeleryWorkerContainer.
default_worker_app (Celery): Celery app instance.
"""
worker = default_worker_cls(
container=default_worker_container,
app=default_worker_app,
)
yield worker
worker.teardown()
[docs]
@pytest.fixture
def default_worker_cls() -> type[CeleryTestWorker]:
"""Default worker class. Override to apply custom configuration globally.
See also: :ref:`vendor-class`.
Returns:
type[CeleryTestWorker]: API for managing the vendor's node.
"""
return CeleryTestWorker
[docs]
@pytest.fixture
def default_worker_container_cls() -> type[CeleryWorkerContainer]:
"""Default worker container class. Override to apply custom configuration
globally.
See also: :ref:`vendor-class`.
Returns:
type[CeleryWorkerContainer]: API for managing the vendor's container.
"""
return CeleryWorkerContainer
[docs]
@pytest.fixture(scope="session")
def default_worker_container_session_cls() -> type[CeleryWorkerContainer]:
"""Default worker container session class. Override to apply custom
configuration globally.
See also: :ref:`vendor-class`.
Returns:
type[CeleryWorkerContainer]: API for managing the vendor's container.
"""
return CeleryWorkerContainer
default_worker_container = container(
image="{celery_base_worker_image.id}",
ports=fxtr("default_worker_ports"),
environment=fxtr("default_worker_env"),
network="{default_pytest_celery_network.name}",
volumes={"{default_worker_volume.name}": DEFAULT_WORKER_VOLUME},
wrapper_class=CeleryWorkerContainer,
timeout=DEFAULT_WORKER_CONTAINER_TIMEOUT,
command=fxtr("default_worker_command"),
)
celery_base_worker_image = build(
path=WORKER_DOCKERFILE_ROOTDIR,
tag="pytest-celery/components/worker:default",
buildargs={
"CELERY_VERSION": fxtr("default_worker_celery_version"),
"CELERY_LOG_LEVEL": fxtr("default_worker_celery_log_level"),
"CELERY_WORKER_NAME": fxtr("default_worker_celery_worker_name"),
"CELERY_WORKER_QUEUE": fxtr("default_worker_celery_worker_queue"),
"PYTEST_CELERY_PKG": fxtr("default_worker_pytest_celery_pkg"),
},
)
default_worker_volume = volume(
initial_content=fxtr("default_worker_initial_content"),
)
[docs]
@pytest.fixture(scope="session")
def default_worker_celery_version(default_worker_container_session_cls: type[CeleryWorkerContainer]) -> str:
"""Celery version for this worker.
Args:
default_worker_container_session_cls (type[CeleryWorkerContainer]): See also: :ref:`vendor-class`.
Returns:
str: Celery version.
"""
return default_worker_container_session_cls.version()
[docs]
@pytest.fixture(scope="session")
def default_worker_celery_log_level(default_worker_container_session_cls: type[CeleryWorkerContainer]) -> str:
"""Log level for this worker.
Args:
default_worker_container_session_cls (type[CeleryWorkerContainer]): See also: :ref:`vendor-class`.
Returns:
str: Log level.
"""
return default_worker_container_session_cls.log_level()
[docs]
@pytest.fixture(scope="session")
def default_worker_celery_worker_name(default_worker_container_session_cls: type[CeleryWorkerContainer]) -> str:
"""Name of the worker.
Args:
default_worker_container_session_cls (type[CeleryWorkerContainer]): See also: :ref:`vendor-class`.
Returns:
str: Worker name.
"""
return default_worker_container_session_cls.worker_name()
[docs]
@pytest.fixture(scope="session")
def default_worker_celery_worker_queue(default_worker_container_session_cls: type[CeleryWorkerContainer]) -> str:
"""Worker queue for this worker.
Args:
default_worker_container_session_cls (type[CeleryWorkerContainer]): See also: :ref:`vendor-class`.
Returns:
str: Worker queue.
"""
return default_worker_container_session_cls.worker_queue()
[docs]
@pytest.fixture(scope="session")
def default_worker_pytest_celery_pkg(default_worker_container_session_cls: type[CeleryWorkerContainer]) -> str:
"""The pytest-celery package to install in the worker container.
Args:
default_worker_container_session_cls (type[CeleryWorkerContainer]): See also: :ref:`vendor-class`.
Returns:
str: pip install spec for pytest-celery.
"""
return default_worker_container_session_cls.pytest_celery_pkg()
[docs]
@pytest.fixture
def default_worker_command(default_worker_container_cls: type[CeleryWorkerContainer]) -> list[str]:
"""Command to run the container.
Args:
default_worker_container_cls (type[CeleryWorkerContainer]): See also: :ref:`vendor-class`.
Returns:
list[str]: Docker CMD instruction.
"""
return default_worker_container_cls.command()
[docs]
@pytest.fixture
def default_worker_env(
default_worker_container_cls: type[CeleryWorkerContainer],
celery_worker_cluster_config: dict,
) -> dict:
"""Environment variables for this worker.
Args:
default_worker_container_cls (type[CeleryWorkerContainer]): See also: :ref:`vendor-class`.
celery_worker_cluster_config (dict): Broker & Backend clusters configuration.
Returns:
dict: Items to pass to the container's environment.
"""
return default_worker_container_cls.initial_env(celery_worker_cluster_config)
[docs]
@pytest.fixture
def default_worker_initial_content(
default_worker_container_cls: type[CeleryWorkerContainer],
default_worker_app_module: ModuleType,
default_worker_utils_module: ModuleType,
default_worker_tasks: set,
default_worker_signals: set,
default_worker_app: Celery,
) -> dict:
"""Initial content for this worker's volume.
This is applied on a worker container when using the following volume configuration:
.. code-block:: python
default_worker_container = container(
...
volumes={"{default_worker_volume.name}": DEFAULT_WORKER_VOLUME},
...
)
.. note::
More volumes may be added additionally.
Args:
default_worker_container_cls (type[CeleryWorkerContainer]): See also: :ref:`vendor-class`.
default_worker_app_module (ModuleType): App module to inject.
default_worker_utils_module (ModuleType): Utils module to inject.
default_worker_tasks (set): Tasks modules to inject.
default_worker_signals (set): Signals modules to inject.
default_worker_app (Celery): Celery app to initialize the worker with.
Returns:
dict: Initial volume content (dict of files).
"""
return default_worker_container_cls.initial_content(
app_module=default_worker_app_module,
utils_module=default_worker_utils_module,
worker_tasks=default_worker_tasks,
worker_signals=default_worker_signals,
worker_app=default_worker_app,
)
[docs]
@pytest.fixture
def default_worker_ports(default_worker_container_cls: type[CeleryWorkerContainer]) -> dict | None:
"""Port bindings for this vendor.
Args:
default_worker_container_cls (type[CeleryWorkerContainer]): See also: :ref:`vendor-class`.
Returns:
dict: Port bindings.
"""
return default_worker_container_cls.ports()
[docs]
@pytest.fixture
def default_worker_app_module(default_worker_container_cls: type[CeleryWorkerContainer]) -> ModuleType:
"""App module for this worker.
Args:
default_worker_container_cls (type[CeleryWorkerContainer]): See also: :ref:`vendor-class`.
Returns:
ModuleType: App module.
"""
return default_worker_container_cls.app_module()
[docs]
@pytest.fixture
def default_worker_utils_module(default_worker_container_cls: type[CeleryWorkerContainer]) -> ModuleType:
"""Utils module for this worker.
Args:
default_worker_container_cls (type[CeleryWorkerContainer]): See also: :ref:`vendor-class`.
Returns:
ModuleType: Utils module.
"""
return default_worker_container_cls.utils_module()
[docs]
@pytest.fixture
def default_worker_tasks(default_worker_container_cls: type[CeleryWorkerContainer]) -> set:
"""Tasks modules set for this worker.
Args:
default_worker_container_cls (type[CeleryWorkerContainer]): See also: :ref:`vendor-class`.
Returns:
set: Tasks modules.
"""
return default_worker_container_cls.tasks_modules()
[docs]
@pytest.fixture
def default_worker_signals(default_worker_container_cls: type[CeleryWorkerContainer]) -> set:
"""Signals modules set for this worker.
Args:
default_worker_container_cls (type[CeleryWorkerContainer]): See also: :ref:`vendor-class`.
Returns:
set: Signals modules.
"""
return default_worker_container_cls.signals_modules()
[docs]
@pytest.fixture
def default_worker_app(celery_setup_app: Celery) -> Celery:
"""Celery app instance for this worker.
Args:
celery_setup_app (Celery): See also: :ref:`vendor-class`.
Returns:
Celery: Celery app instance.
"""
return celery_setup_app