This document describes the current stable version of pytest_celery (1.3). For development docs, go here.

Source code for pytest_celery.api.setup

"""The test setup represents the main entry point for accessing the celery
architecture from the test.

This module provides the base API for creating new test setups.
"""

from __future__ import annotations

from celery import Celery

from pytest_celery.api.backend import CeleryBackendCluster
from pytest_celery.api.backend import CeleryTestBackend
from pytest_celery.api.broker import CeleryBrokerCluster
from pytest_celery.api.broker import CeleryTestBroker
from pytest_celery.api.worker import CeleryTestWorker
from pytest_celery.api.worker import CeleryWorkerCluster
from pytest_celery.defaults import DEFAULT_WORKER_APP_NAME
from pytest_celery.defaults import RESULT_TIMEOUT
from pytest_celery.vendors.worker.tasks import ping


[docs] class CeleryTestSetup: """The test setup is the main entry point for accessing the celery architecture from the test. It is the glue that holds all of the relevant components of the specific test case environment. Each test case will have its own test setup instance, which is created for the test case by the plugin and is configured for the specific run and its given configurations. Responsibility Scope: Provide useful access to the celery architecture from the test. """ def __init__( self, worker_cluster: CeleryWorkerCluster, broker_cluster: CeleryBrokerCluster, backend_cluster: CeleryBackendCluster, app: Celery = None, ): """Setup the base components of a setup. Args: worker_cluster (CeleryWorkerCluster): Precorfigured worker cluster. broker_cluster (CeleryBrokerCluster): Precorfigured broker cluster. backend_cluster (CeleryBackendCluster): Precorfigured backend cluster. app (Celery, optional): Celery app configured for all of the nodes. Defaults to None. """ self._worker_cluster = worker_cluster self._broker_cluster = broker_cluster self._backend_cluster = backend_cluster self._app = app # Special internal ping task, does not conflict with user "ping" tasks self.ping = ping def __len__(self) -> int: """The total number of nodes in the setup.""" nodes_count = 0 if self.broker_cluster: nodes_count += len(self.broker_cluster) if self.backend_cluster: nodes_count += len(self.backend_cluster) if self.worker_cluster: nodes_count += len(self.worker_cluster) return nodes_count @property def app(self) -> Celery: """The celery app configured for all of the nodes.""" return self._app @property def backend_cluster(self) -> CeleryBackendCluster | None: """The backend cluster of this setup.""" return self._backend_cluster @property def backend(self) -> CeleryTestBackend | None: """The first backend node of the backend cluster.""" return self._backend_cluster[0] if self._backend_cluster else None # type: ignore @property def broker_cluster(self) -> CeleryBrokerCluster | None: """The broker cluster of this setup.""" return self._broker_cluster @property def broker(self) -> CeleryTestBroker | None: """The first broker node of the broker cluster.""" return self._broker_cluster[0] if self._broker_cluster else None # type: ignore @property def worker_cluster(self) -> CeleryWorkerCluster | None: """The worker cluster of this setup.""" return self._worker_cluster @property def worker(self) -> CeleryTestWorker | None: """The first worker node of the worker cluster.""" return self._worker_cluster[0] if self._worker_cluster else None # type: ignore
[docs] @classmethod def name(cls) -> str: """The name of the setup.""" # TODO: Possibly not needed/required refactoring return DEFAULT_WORKER_APP_NAME
[docs] @classmethod def config(cls, celery_worker_cluster_config: dict) -> dict: """Creates a configuration dict to be used by app.config_from_object(). The configuration is compiled from all of the nodes in the setup. Args: celery_worker_cluster_config (dict): The configuration of the worker cluster. Returns: dict: Celery-aware configuration dict. """ if not celery_worker_cluster_config: raise ValueError("celery_worker_cluster_config is empty") celery_broker_cluster_config: dict = celery_worker_cluster_config.get("celery_broker_cluster_config", {}) celery_backend_cluster_config: dict = celery_worker_cluster_config.get("celery_backend_cluster_config", {}) config = {} if celery_broker_cluster_config: config["broker_url"] = ";".join(celery_broker_cluster_config["host_urls"]) if celery_backend_cluster_config: config["result_backend"] = ";".join(celery_backend_cluster_config["host_urls"]) return config
[docs] @classmethod def update_app_config(cls, app: Celery) -> None: """Hook for updating the app configuration in a subclass. Args: app (Celery): App after initial configuration. """
[docs] @classmethod def create_setup_app(cls, celery_setup_config: dict, celery_setup_app_name: str) -> Celery: """Creates a celery app for the setup. Args: celery_setup_config (dict): Celery configuration dict. celery_setup_app_name (str): Celery app name. Returns: Celery: Celery app configured for this setup. """ if celery_setup_config is None: raise ValueError("celery_setup_config is None") if not celery_setup_app_name: raise ValueError("celery_setup_app_name is empty") app = Celery(celery_setup_app_name) app.config_from_object(celery_setup_config) cls.update_app_config(app) return app
[docs] def teardown(self) -> None: """Teardown the setup."""
[docs] def ready(self, ping: bool = False, control: bool = False, docker: bool = True) -> bool: """Check if the setup is ready for testing. All of the confirmations are optional. Warning: Enabling additional confirmations may hurt performance. Disabling all confirmations may result in false positive results. Use with caution. Args: ping (bool, optional): Confirm via ping task. Defaults to False. control (bool, optional): Confirm via ping control. Defaults to False. docker (bool, optional): Confirm via docker container status. Defaults to True. Returns: bool: True if the setup is ready for testing (all required confirmations passed). """ ready = ( self._is_task_ping_ready(ping) and self._is_control_ping_ready(control) and self._is_docker_ready(docker) ) if ready: self._set_app_for_all_nodes() return ready
def _is_docker_ready(self, docker: bool) -> bool: """Check if the node's containers are ready. Args: docker (bool): Flag to enable docker readiness check. Returns: bool: True if the node's containers are ready, False otherwise. """ if not docker: return True return ( (not self.broker_cluster or self.broker_cluster.ready()) and (not self.backend_cluster or self.backend_cluster.ready()) and (not self.worker_cluster or self.worker_cluster.ready()) ) def _is_control_ping_ready(self, control: bool) -> bool: """Check if worker nodes respond to control ping. Args: control (bool): Flag to enable control ping check. Returns: bool: True if control pings are successful, False otherwise. """ if not control: return True responses = self.app.control.ping() return all([all([res["ok"] == "pong" for _, res in response.items()]) for response in responses]) def _is_task_ping_ready(self, ping: bool) -> bool: """Check if worker nodes respond to ping task. Args: ping (bool): Flag to enable ping task check. Returns: bool: True if ping tasks are successful, False otherwise. """ if not ping: return True worker: CeleryTestWorker for worker in self.worker_cluster: # type: ignore res = self.ping.s().apply_async(queue=worker.worker_queue) if res.get(timeout=RESULT_TIMEOUT) != "pong": return False return True def _set_app_for_all_nodes(self) -> None: """Set the app instance for all nodes in the setup. This ensures each node has a reference to the centralized Celery app instance. """ nodes: tuple = tuple() if self.broker_cluster: nodes += self.broker_cluster.nodes if self.backend_cluster: nodes += self.backend_cluster.nodes if self.worker_cluster: nodes += self.worker_cluster.nodes for node in nodes: node._app = self.app