This document describes the current stable version of pytest_celery (1.3). For development docs, go here.
Source code for pytest_celery.vendors.worker.container
"""The pytest-celery plugin provides a set of built-in components called
:ref:`vendors`.
This module is part of the :ref:`built-in-worker` vendor.
"""
from __future__ import annotations
from types import ModuleType
from celery import Celery
from pytest_celery.api.container import CeleryTestContainer
from pytest_celery.vendors.worker.defaults import DEFAULT_WORKER_ENV
from pytest_celery.vendors.worker.defaults import DEFAULT_WORKER_LOG_LEVEL
from pytest_celery.vendors.worker.defaults import DEFAULT_WORKER_NAME
from pytest_celery.vendors.worker.defaults import DEFAULT_WORKER_PORTS
from pytest_celery.vendors.worker.defaults import DEFAULT_WORKER_PYTEST_CELERY_PKG
from pytest_celery.vendors.worker.defaults import DEFAULT_WORKER_QUEUE
from pytest_celery.vendors.worker.defaults import DEFAULT_WORKER_VERSION
from pytest_celery.vendors.worker.volume import WorkerInitialContent
[docs]
class CeleryWorkerContainer(CeleryTestContainer):
"""This is the base class for all Celery worker containers. It is
preconfigured for a built-in Celery worker image and should be customized
for your own worker image.
The purpose of this class is manipulating the container volume and
configurations to warm up the worker container according to the test case requirements.
Responsibility Scope:
Prepare the worker container with the required filesystem, configurations and
dependencies of your project.
"""
[docs]
@classmethod
def command(
cls,
*args: str,
debugpy: bool = False,
wait_for_client: bool = True,
**kwargs: dict,
) -> list[str]:
args = args or tuple()
cmd = list()
if debugpy:
cmd.extend(
[
"python",
"-m",
"debugpy",
"--listen",
"0.0.0.0:5678",
]
)
if wait_for_client:
cmd.append("--wait-for-client")
cmd.append("-m")
cmd.extend(
[
"celery",
"-A",
"app",
"worker",
f"--loglevel={cls.log_level()}",
"-n",
f"{cls.worker_name()}@%h",
"-Q",
f"{cls.worker_queue()}",
*args,
]
)
return cmd
def _wait_port(self, port: str) -> int:
# Not needed for worker container
raise NotImplementedError
@property
def ready_prompt(self) -> str:
return "ready."
[docs]
@classmethod
def version(cls) -> str:
"""Celery version to use for the worker container."""
return DEFAULT_WORKER_VERSION
[docs]
@classmethod
def log_level(cls) -> str:
"""Celery worker log level."""
return DEFAULT_WORKER_LOG_LEVEL
[docs]
@classmethod
def worker_name(cls) -> str:
"""Celery worker name."""
return DEFAULT_WORKER_NAME
[docs]
@classmethod
def worker_queue(cls) -> str:
"""Celery worker queue."""
return DEFAULT_WORKER_QUEUE
[docs]
@classmethod
def app_module(cls) -> ModuleType:
"""A preconfigured module that contains the Celery app instance.
The module is manipulated at runtime to inject the required
configurations from the test case.
"""
from pytest_celery.vendors.worker.content import app
return app
[docs]
@classmethod
def utils_module(cls) -> ModuleType:
"""A utility helper module for running python code in the worker
container context."""
from pytest_celery.vendors.worker.content import utils
return utils
[docs]
@classmethod
def tasks_modules(cls) -> set:
"""Tasks modules."""
from pytest_celery.vendors.worker import tasks as default_tasks
return {default_tasks}
[docs]
@classmethod
def signals_modules(cls) -> set:
"""Signals handlers modules.
This is an optional feature that can be used to inject signals
handlers that needs to in the context of the worker container.
"""
return set()
[docs]
@classmethod
def pytest_celery_pkg(cls) -> str:
"""The pytest-celery package to install in the worker container.
Returns:
str: pip install spec for pytest-celery.
"""
return DEFAULT_WORKER_PYTEST_CELERY_PKG
[docs]
@classmethod
def buildargs(cls) -> dict:
"""Build arguments for the built-in worker image."""
return {
"CELERY_VERSION": cls.version(),
"CELERY_LOG_LEVEL": cls.log_level(),
"CELERY_WORKER_NAME": cls.worker_name(),
"CELERY_WORKER_QUEUE": cls.worker_queue(),
"PYTEST_CELERY_PKG": cls.pytest_celery_pkg(),
}
[docs]
@classmethod
def initial_env(cls, celery_worker_cluster_config: dict, initial: dict | None = None) -> dict:
"""Defines the environment variables for the worker container.
See more: pytest_docker_tools.container()
Args:
celery_worker_cluster_config (dict): Environment variables to set.
initial (dict | None, optional): Additional variables. Defaults to None.
Returns:
dict: Environment variables set for the worker container from the test case.
"""
env = initial or {}
env = {
**DEFAULT_WORKER_ENV.copy(),
**env,
}
config_mappings = [
("celery_broker_cluster_config", "CELERY_BROKER_URL"),
("celery_backend_cluster_config", "CELERY_RESULT_BACKEND"),
]
for config_key, env_key in config_mappings:
cluster_config = celery_worker_cluster_config.get(config_key)
if cluster_config:
env[env_key] = ";".join(cluster_config["urls"])
else:
del env[env_key]
return env
[docs]
@classmethod
def initial_content(
cls,
worker_tasks: set | None = None,
worker_signals: set | None = None,
worker_app: Celery | None = None,
app_module: ModuleType | None = None,
utils_module: ModuleType | None = None,
) -> dict:
"""Defines the initial content of the worker container.
See more: pytest_docker_tools.volume()
Args:
worker_tasks (set | None, optional): Set of tasks modules. Defaults to None.
worker_signals (set | None, optional): Set of signals handlers modules. Defaults to None.
worker_app (Celery | None, optional): Celery app instance. Defaults to None.
app_module (ModuleType | None, optional): app module. Defaults to None.
utils_module (ModuleType | None, optional): utils module. Defaults to None.
Returns:
dict: Custom volume content for the worker container.
"""
if app_module is None:
app_module = cls.app_module()
if utils_module is None:
utils_module = cls.utils_module()
if worker_tasks is None:
worker_tasks = cls.tasks_modules()
content = WorkerInitialContent()
content.set_app_module(app_module)
content.set_utils_module(utils_module)
content.add_modules("tasks", worker_tasks)
if worker_signals:
content.add_modules("signals", worker_signals)
if worker_app:
content.set_app_name(worker_app.main)
content.set_config_from_object(worker_app)
return content.generate()
[docs]
@classmethod
def ports(cls) -> dict | None:
"""Ports to expose from the worker container."""
return DEFAULT_WORKER_PORTS