This document describes the current stable version of pytest_celery (1.3). For development docs, go here.
Source code for pytest_celery.vendors.worker.volume
"""The pytest-celery plugin provides a set of built-in components called
:ref:`vendors`.
This module is part of the :ref:`built-in-worker` vendor.
"""
from __future__ import annotations
import inspect
from types import ModuleType
from typing import Any
from celery import Celery
from celery.app.base import PendingConfiguration
from pytest_celery.vendors.worker.defaults import DEFAULT_WORKER_APP_NAME
[docs]
class WorkerInitialContent:
    """This class is responsible for generating the initial content for the
    worker container volume.

    Responsibility Scope:
        Prepare the worker container with the required filesystem, configurations and
        dependencies for your project.
    """

    class Parser:
        """Parser for the initial content of the worker container."""

        def imports_str(self, modules: set[ModuleType]) -> str:
            """Parse the given modules and return a string with the import
            statements.

            Args:
                modules (set[ModuleType]): A set of modules to parse.

            Returns:
                str: "from module import \\*" statements, one per line.
            """
            return "".join(f"from {module.__name__} import *\n" for module in modules)

        def imports_src(self, modules: set[ModuleType]) -> dict:
            """Parse the given modules and return a dict with the source code
            of the modules.

            Args:
                modules (set[ModuleType]): A set of modules to parse.

            Returns:
                dict: Maps "<dotted/name/as/path>.py" to the module source
                encoded as bytes.
            """
            return {
                f"{module.__name__.replace('.', '/')}.py": inspect.getsource(module).encode()
                for module in modules
            }

        def app_name(self, name: str | None = None) -> str:
            """Generates the Celery app initialization string.

            Args:
                name (str | None, optional): The app name. Defaults to None,
                    in which case DEFAULT_WORKER_APP_NAME is used.

            Returns:
                str: app = Celery(name)
            """
            name = name or DEFAULT_WORKER_APP_NAME
            # repr() yields the same single-quoted literal as before for ordinary
            # names, and still produces valid Python when the name contains quotes
            # or backslashes.
            return f"app = Celery({name!r})"

        def config(self, app: Celery | None = None) -> str:
            """Generates the Celery app configuration changes.

            Args:
                app (Celery | None, optional): Celery app with possibly changed config. Defaults to None.

            Raises:
                TypeError: If the app.conf.changes property is not a dict.

            Returns:
                str: config = {key: value, ...} or config = None
            """
            app = app or Celery(DEFAULT_WORKER_APP_NAME)

            # Accessing the app.conf.changes.data property will trigger the PendingConfiguration to be resolved
            # and the changes will be applied to the app.conf, so we make a clone app to avoid affecting the
            # original app object.
            tmp_app = Celery(app.main)
            tmp_app.conf = app.conf
            changes = tmp_app.conf.changes.copy()
            if isinstance(changes, PendingConfiguration):
                changes = changes.data.changes
            if not isinstance(changes, dict):
                raise TypeError(f"Unexpected type for app.conf.changes: {type(changes)}")

            # "deprecated_settings" is excluded from the generated config. It is
            # filtered out rather than deleted so that a missing key does not raise
            # and the mapping we were handed is left untouched.
            entries = [
                f"\t{key!r}: {value!r}"
                for key, value in changes.items()
                if key != "deprecated_settings"
            ]
            if not entries:
                return "config = None"
            return "config = {\n" + ",\n".join(entries) + "\n}"

    def __init__(
        self,
        app_module: ModuleType | None = None,
        utils_module: ModuleType | None = None,
    ) -> None:
        """Creates an initial content for the worker container. Defaults to
        built-in plugin-provided modules.

        Args:
            app_module (ModuleType | None, optional): App module adjusted for parsing. Defaults to None.
            utils_module (ModuleType | None, optional): Utils module for running python code in the
                worker container. Defaults to None.
        """
        self.parser = self.Parser()
        self._initial_content: dict[str, Any] = {
            "__init__.py": b"",
            "imports": {},  # Placeholder item, consumed when app.py is generated
        }
        self.set_app_module(app_module)
        self.set_utils_module(utils_module)
        self.set_app_name()
        self.set_config_from_object()

    def __eq__(self, other: object) -> bool:
        """Compare by generated content, or by raw state when the content
        cannot be generated yet."""
        if not isinstance(other, WorkerInitialContent):
            return False
        try:
            return self.generate() == other.generate()
        except ValueError:
            # At least one side is not ready to generate (missing app/utils
            # module or imports), so compare the underlying state instead.
            compared = (
                "_app_module_src",
                "_utils_module_src",
                "_initial_content",
                "_app",
                "_config",
            )
            return all(getattr(self, attr) == getattr(other, attr) for attr in compared)

    def set_app_module(self, app_module: ModuleType | None = None) -> None:
        """Injects an app module into the initial content.

        Args:
            app_module (ModuleType | None, optional): Module whose source becomes
                the app.py template. None clears the current template.
        """
        self._app_module_src: str | None = inspect.getsource(app_module) if app_module else None

    def set_utils_module(self, utils_module: ModuleType | None = None) -> None:
        """Injects a utils module into the initial content.

        Args:
            utils_module (ModuleType | None, optional): Module whose source becomes
                utils.py. None clears the current source.
        """
        self._utils_module_src: str | None = inspect.getsource(utils_module) if utils_module else None

    def add_modules(self, name: str, modules: set[ModuleType]) -> None:
        """Adds a set of modules to the initial content.

        Args:
            name (str): Arbitrary unique name for the set of modules.
            modules (set[ModuleType]): A set of modules to add.

        Raises:
            ValueError: If name or modules is empty.
        """
        if not name:
            raise ValueError("name cannot be empty")
        if not modules:
            raise ValueError("modules cannot be empty")

        # Parse everything before touching the stored content, so a module whose
        # source cannot be read does not leave a half-registered import group.
        import_lines = self.parser.imports_str(modules)
        sources = self.parser.imports_src(modules)
        self._initial_content["imports"][name] = import_lines
        self._initial_content.update(sources)

    def set_app_name(self, name: str | None = None) -> None:
        """Sets the Celery app name.

        Args:
            name (str | None, optional): The app name. Defaults to None.
        """
        name = name or DEFAULT_WORKER_APP_NAME
        self._app = self.parser.app_name(name)

    def set_config_from_object(self, app: Celery | None = None) -> None:
        """Sets the Celery app configuration from the given app.

        Args:
            app (Celery | None, optional): Celery app with possibly changed config. Defaults to None.
        """
        self._config = self.parser.config(app)

    def generate(self) -> dict:
        """Generates the initial content for the worker container.

        Returns:
            dict: Initial content volume for the worker container. The "imports"
            placeholder is replaced by the rendered "app.py" and "utils.py" entries.

        Raises:
            ValueError: If the app module, the utils module or the imports
                have not been set.
        """
        # Shallow copy: _generate_app_py() pops "imports" from the copy only.
        initial_content = self._initial_content.copy()
        initial_content["app.py"] = self._generate_app_py(initial_content)
        initial_content["utils.py"] = self._generate_utils_py()
        return initial_content

    def _generate_app_py(self, initial_content: dict) -> bytes:
        """Generates the app.py file for the worker container.

        The "imports" entry is removed from ``initial_content`` as a side effect.

        Args:
            initial_content (dict): The current initial content.

        Raises:
            ValueError: If the app module or the imports have not been set.

        Returns:
            bytes: The generated app.py file.
        """
        if not self._app_module_src:
            raise ValueError("Please use set_app_module() to define the app module before generating initial content")
        if not initial_content["imports"]:
            raise ValueError("Please use add_modules() to add the required imports before generating initial content")

        _imports: dict | Any = initial_content.pop("imports")
        # Concatenate the import groups in insertion order. Joining the values
        # directly keeps group names truly arbitrary (dots, brackets and braces
        # would otherwise be interpreted by str.format).
        imports = "".join(_imports.values())

        # Placeholder statements expected in the app module template, paired
        # with the text that replaces each of them.
        replacements = (
            ("imports = None", imports),
            (f'app = Celery("{DEFAULT_WORKER_APP_NAME}")', self._app),
            ("config = None", self._config),
        )

        # Render on a local copy so the stored template keeps its placeholders
        # and later set_app_name()/set_config_from_object() calls still apply.
        rendered = self._app_module_src
        for placeholder, value in replacements:
            rendered = rendered.replace(placeholder, value)
        return rendered.encode()

    def _generate_utils_py(self) -> bytes:
        """Generates the utils.py file for the worker container.

        Raises:
            ValueError: If the utils module has not been set.

        Returns:
            bytes: The generated utils.py file.
        """
        if not self._utils_module_src:
            raise ValueError("Please set_utils_module() before generating initial content")
        return self._utils_module_src.encode()