This document describes the current stable version of pytest_celery (1.0). For development docs, go here.
Source code for pytest_celery.api.broker
"""Broker components represents Celery's broker instances.
This module provides the base API for creating new broker components by
defining the base classes for broker nodes and clusters.
"""
from __future__ import annotations
from pytest_celery.api.base import CeleryTestCluster
from pytest_celery.api.base import CeleryTestNode
from pytest_celery.defaults import DEFAULT_WORKER_ENV
[docs]
class CeleryTestBroker(CeleryTestNode):
"""This is specialized node type for handling celery brokers nodes. It is
used to encapsulate a broker instance.
Responsibility Scope:
Handling broker specific requirements and configuration.
"""
[docs]
@classmethod
def default_config(cls) -> dict:
"""Default node configurations if not overridden by the user."""
return {
"url": DEFAULT_WORKER_ENV["CELERY_BROKER_URL"],
"host_url": DEFAULT_WORKER_ENV["CELERY_BROKER_URL"],
}
[docs]
def restart(self, reload_container: bool = True, force: bool = False) -> None:
"""Override restart method to update the app broker url with new
container values."""
super().restart(reload_container, force)
if self.app:
self.app.conf.update(
broker_url=self.config()["host_url"],
)
[docs]
class CeleryBrokerCluster(CeleryTestCluster):
"""This is a specialized cluster type for handling celery brokers. It is
used to define which broker instances are available for the test.
Responsibility Scope:
Provude useful methods for managing a cluster of celery brokers.
"""
[docs]
@classmethod
def default_config(cls) -> dict:
"""Default cluster configurations if not overridden by the user."""
return {
"urls": [DEFAULT_WORKER_ENV["CELERY_BROKER_URL"]],
"host_urls": [DEFAULT_WORKER_ENV["CELERY_BROKER_URL"]],
}