.. _examples_range: ======= range ======= :Release: |version| :Date: |today| .. contents:: :local: :depth: 2 Description =========== This example project demonstrates how to mess around with the workers cluster. It uses a list of Celery versions to create two test configurations: 1. Running against each Celery worker by itself. 2. Running against a cluster of all Celery workers. It demonstrate the flexibility of the library, and how you might use it to test your own code. .. note:: Testing different worker versions one by one or in a cluster is useful for testing migrations and compatibility between different versions. It can be used to define test suites that can verify a migration is working as expected, or that a new version is compatible with the old one, etc. Breakdown ========= File Structure ~~~~~~~~~~~~~~ The following diagram lists the relevant files in the project. .. code-block:: text range/ ├── tests/ │ ├── __init__.py │ ├── conftest.py │ └── test_range.py │ └── test_range_cluster.py └── requirements.txt conftest.py ~~~~~~~~~~~ We've created a helper function that will return a list of Celery versions that are between two given versions. It uses the `pypi.org `_ API to get the list of versions and filters them by the given range. .. code-block:: python def get_celery_versions(start_version: str, end_version: str) -> list[str]: url = "https://pypi.org/pypi/celery/json" response = requests.get(url) data = response.json() all_versions = data["releases"].keys() filtered_versions = [ v for v in all_versions if ( parse_version(start_version) <= parse_version(v) <= parse_version(end_version) and not parse_version(v).is_prerelease ) ] return sorted(filtered_versions, key=parse_version) .. _examples_range_test_range: test_range.py ~~~~~~~~~~~~~ In this scenario, we reconfigure the :func:`default worker ` to run against each Celery version in the list. This will add additional `parametization `_ to the worker fixture, and will generate a different worker for each Celery version. .. code-block:: python class TestRange: @pytest.fixture(scope="session", params=get_celery_versions("v4.4.7", "v5.0.0")) def default_worker_celery_version(self, request: pytest.FixtureRequest) -> str: return request.param Following up with this simple test case will produce a test run for each Celery version in the list. .. code-block:: python def test_ping(self, celery_setup: CeleryTestSetup, default_worker_celery_version: str): sig: Signature = ping.s() res: AsyncResult = sig.apply_async() assert res.get(timeout=RESULT_TIMEOUT) == "pong" .. note:: When using `pytest-xdist `_ to run tests in parallel, this will create a test run for each Celery version in the list, in parallel. .. code-block:: text tests/test_range.py::TestRange::test_ping[4.4.7-celery_setup_worker-celery_redis_broker-celery_redis_backend] tests/test_range.py::TestRange::test_ping[5.0.0-celery_setup_worker-celery_redis_broker-celery_redis_backend] tests/test_range.py::TestRange::test_ping[4.4.7-celery_setup_worker-celery_rabbitmq_broker-celery_redis_backend] tests/test_range.py::TestRange::test_ping[5.0.0-celery_setup_worker-celery_rabbitmq_broker-celery_redis_backend] Notice how it still runs against all the brokers and backends, as we running against default settings. test_range_cluster.py ~~~~~~~~~~~~~~~~~~~~~ In this scenario, we generate a list of workers per version, and then configure the :func:`celery_worker_cluster ` to include all of them. Once using a range of Celery versions, and once using a fixed list. .. code-block:: python versions_range = get_celery_versions("v5.0.0", "v5.0.5") versions_list = ["v4.4.7", "v5.2.7", "v5.3.0"] The ``generate_workers`` is a helper function that builds worker containers on the fly using the APIs from `pytest-docker-tools `_. Our focus should be on the ``worker_containers`` list, which will contain the names of the generated worker containers fixtures. .. code-block:: python def generate_workers(versions: list[str]) -> list[str]: worker_containers = list() for v in versions: img = f"worker_v{v.replace('.', '_')}_image" globals()[img] = build( path=WORKER_DOCKERFILE_ROOTDIR, tag=f"pytest-celery/examples/worker:v{v}", buildargs={ "CELERY_VERSION": v, "CELERY_LOG_LEVEL": fxtr("default_worker_celery_log_level"), "CELERY_WORKER_NAME": fxtr("default_worker_celery_worker_name"), "CELERY_WORKER_QUEUE": fxtr("default_worker_celery_worker_queue"), }, ) cnt = f"worker_v{v.replace('.', '_')}_container" globals()[cnt] = container( image="{" + f"{img}.id" + "}", ports=fxtr("default_worker_ports"), environment=fxtr("default_worker_env"), network="{default_pytest_celery_network.name}", volumes={"{default_worker_volume.name}": DEFAULT_WORKER_VOLUME}, wrapper_class=CeleryWorkerContainer, timeout=DEFAULT_WORKER_CONTAINER_TIMEOUT, ) worker_containers.append(cnt) return worker_containers Next, we configure the :func:`celery_worker_cluster ` to include all the workers, and then run a simple test case to verify the cluster is configured as expected. .. code-block:: python class TestClusterList: @pytest.fixture(params=[generate_workers(versions_list)]) def celery_worker_cluster(self, request: pytest.FixtureRequest) -> CeleryWorkerCluster: nodes: list[CeleryWorkerContainer] = [request.getfixturevalue(worker) for worker in request.param] cluster = CeleryWorkerCluster(*nodes) yield cluster cluster.teardown() def test_worker_cluster_with_fixed_list(self, celery_setup: CeleryTestSetup, subtests: SubTests): worker: CeleryTestWorker for version, worker in zip(versions_list, celery_setup.worker_cluster): with subtests.test(msg=f"Found worker {version} in cluster"): assert f"{worker.hostname()} {version}" in worker.logs() class TestClusterRange: @pytest.fixture(params=[generate_workers(versions_range)]) def celery_worker_cluster(self, request: pytest.FixtureRequest) -> CeleryWorkerCluster: nodes: list[CeleryWorkerContainer] = [request.getfixturevalue(worker) for worker in request.param] cluster = CeleryWorkerCluster(*nodes) yield cluster cluster.teardown() def test_worker_cluster_with_versions_range(self, celery_setup: CeleryTestSetup, subtests: SubTests): worker: CeleryTestWorker for version, worker in zip(versions_range, celery_setup.worker_cluster): with subtests.test(msg=f"Found worker v{version} in cluster"): assert f"{worker.hostname()} v{version}" in worker.logs() Running everything in parallel will produce the following output: .. code-block:: text PASSED tests/test_range.py::TestRange::test_ping[5.0.0-celery_setup_worker-celery_redis_broker-celery_redis_backend] PASSED tests/test_range.py::TestRange::test_ping[4.4.7-celery_setup_worker-celery_redis_broker-celery_redis_backend] PASSED tests/test_range.py::TestRange::test_ping[4.4.7-celery_setup_worker-celery_rabbitmq_broker-celery_redis_backend] PASSED tests/test_range.py::TestRange::test_ping[5.0.0-celery_setup_worker-celery_rabbitmq_broker-celery_redis_backend] [Found worker v4.4.7 in cluster] SUBPASS tests/test_range_cluster.py::TestClusterList::test_worker_cluster_with_fixed_list[celery_worker_cluster0-celery_redis_broker-celery_redis_backend] [Found worker v4.4.7 in cluster] SUBPASS tests/test_range_cluster.py::TestClusterList::test_worker_cluster_with_fixed_list[celery_worker_cluster0-celery_rabbitmq_broker-celery_redis_backend] [Found worker v5.2.7 in cluster] SUBPASS tests/test_range_cluster.py::TestClusterList::test_worker_cluster_with_fixed_list[celery_worker_cluster0-celery_redis_broker-celery_redis_backend] [Found worker v5.2.7 in cluster] SUBPASS tests/test_range_cluster.py::TestClusterList::test_worker_cluster_with_fixed_list[celery_worker_cluster0-celery_rabbitmq_broker-celery_redis_backend] [Found worker v5.3.0 in cluster] SUBPASS tests/test_range_cluster.py::TestClusterList::test_worker_cluster_with_fixed_list[celery_worker_cluster0-celery_redis_broker-celery_redis_backend] [Found worker v5.3.0 in cluster] SUBPASS tests/test_range_cluster.py::TestClusterList::test_worker_cluster_with_fixed_list[celery_worker_cluster0-celery_rabbitmq_broker-celery_redis_backend] PASSED tests/test_range_cluster.py::TestClusterList::test_worker_cluster_with_fixed_list[celery_worker_cluster0-celery_redis_broker-celery_redis_backend] PASSED tests/test_range_cluster.py::TestClusterList::test_worker_cluster_with_fixed_list[celery_worker_cluster0-celery_rabbitmq_broker-celery_redis_backend] [Found worker v5.0.0 in cluster] SUBPASS tests/test_range_cluster.py::TestClusterRange::test_worker_cluster_with_versions_range[celery_worker_cluster0-celery_redis_broker-celery_redis_backend] [Found worker v5.0.1 in cluster] SUBPASS tests/test_range_cluster.py::TestClusterRange::test_worker_cluster_with_versions_range[celery_worker_cluster0-celery_redis_broker-celery_redis_backend] [Found worker v5.0.2 in cluster] SUBPASS tests/test_range_cluster.py::TestClusterRange::test_worker_cluster_with_versions_range[celery_worker_cluster0-celery_redis_broker-celery_redis_backend] [Found worker v5.0.3 in cluster] SUBPASS tests/test_range_cluster.py::TestClusterRange::test_worker_cluster_with_versions_range[celery_worker_cluster0-celery_redis_broker-celery_redis_backend] [Found worker v5.0.4 in cluster] SUBPASS tests/test_range_cluster.py::TestClusterRange::test_worker_cluster_with_versions_range[celery_worker_cluster0-celery_redis_broker-celery_redis_backend] [Found worker v5.0.5 in cluster] SUBPASS tests/test_range_cluster.py::TestClusterRange::test_worker_cluster_with_versions_range[celery_worker_cluster0-celery_redis_broker-celery_redis_backend] PASSED tests/test_range_cluster.py::TestClusterRange::test_worker_cluster_with_versions_range[celery_worker_cluster0-celery_redis_broker-celery_redis_backend] [Found worker v5.0.0 in cluster] SUBPASS tests/test_range_cluster.py::TestClusterRange::test_worker_cluster_with_versions_range[celery_worker_cluster0-celery_rabbitmq_broker-celery_redis_backend] [Found worker v5.0.1 in cluster] SUBPASS tests/test_range_cluster.py::TestClusterRange::test_worker_cluster_with_versions_range[celery_worker_cluster0-celery_rabbitmq_broker-celery_redis_backend] [Found worker v5.0.2 in cluster] SUBPASS tests/test_range_cluster.py::TestClusterRange::test_worker_cluster_with_versions_range[celery_worker_cluster0-celery_rabbitmq_broker-celery_redis_backend] [Found worker v5.0.3 in cluster] SUBPASS tests/test_range_cluster.py::TestClusterRange::test_worker_cluster_with_versions_range[celery_worker_cluster0-celery_rabbitmq_broker-celery_redis_backend] [Found worker v5.0.4 in cluster] SUBPASS tests/test_range_cluster.py::TestClusterRange::test_worker_cluster_with_versions_range[celery_worker_cluster0-celery_rabbitmq_broker-celery_redis_backend] [Found worker v5.0.5 in cluster] SUBPASS tests/test_range_cluster.py::TestClusterRange::test_worker_cluster_with_versions_range[celery_worker_cluster0-celery_rabbitmq_broker-celery_redis_backend] PASSED tests/test_range_cluster.py::TestClusterRange::test_worker_cluster_with_versions_range[celery_worker_cluster0-celery_rabbitmq_broker-celery_redis_backend]