This document describes the current stable version of Celery (4.1). For development docs, go here.

Source code for celery.utils.debug

# -*- coding: utf-8 -*-
"""Utilities for debugging memory usage, blocking calls, etc."""
from __future__ import absolute_import, print_function, unicode_literals

import os
import sys
import traceback

from contextlib import contextmanager
from functools import partial
from pprint import pprint

from celery.five import WhateverIO, items, range
from celery.platforms import signals

try:
    from psutil import Process
except ImportError:
    Process = None  # noqa

__all__ = [
    'blockdetection', 'sample_mem', 'memdump', 'sample',
    'humanbytes', 'mem_rss', 'ps', 'cry',
]

UNITS = (
    (2 ** 40.0, 'TB'),
    (2 ** 30.0, 'GB'),
    (2 ** 20.0, 'MB'),
    (2 ** 10.0, 'KB'),
    (0.0, 'b'),
)

_process = None
_mem_sample = []


def _on_blocking(signum, frame):
    import inspect
    raise RuntimeError(
        'Blocking detection timed-out at: {0}'.format(
            inspect.getframeinfo(frame)
        )
    )


@contextmanager
def blockdetection(timeout):
    """Context that raises an exception if process is blocking.

    Uses ``SIGALRM`` to detect blocking functions.
    """
    if not timeout:
        yield
    else:
        old_handler = signals['ALRM']
        old_handler = None if old_handler == _on_blocking else old_handler

        signals['ALRM'] = _on_blocking

        try:
            yield signals.arm_alarm(timeout)
        finally:
            if old_handler:
                signals['ALRM'] = old_handler
            signals.reset_alarm()


[docs]def sample_mem(): """Sample RSS memory usage. Statistics can then be output by calling :func:`memdump`. """ current_rss = mem_rss() _mem_sample.append(current_rss) return current_rss
def _memdump(samples=10): # pragma: no cover S = _mem_sample prev = list(S) if len(S) <= samples else sample(S, samples) _mem_sample[:] = [] import gc gc.collect() after_collect = mem_rss() return prev, after_collect
[docs]def memdump(samples=10, file=None): # pragma: no cover """Dump memory statistics. Will print a sample of all RSS memory samples added by calling :func:`sample_mem`, and in addition print used RSS memory after :func:`gc.collect`. """ say = partial(print, file=file) if ps() is None: say('- rss: (psutil not installed).') return prev, after_collect = _memdump(samples) if prev: say('- rss (sample):') for mem in prev: say('- > {0},'.format(mem)) say('- rss (end): {0}.'.format(after_collect))
[docs]def sample(x, n, k=0): """Given a list `x` a sample of length ``n`` of that list is returned. For example, if `n` is 10, and `x` has 100 items, a list of every tenth. item is returned. ``k`` can be used as offset. """ j = len(x) // n for _ in range(n): try: yield x[k] except IndexError: break k += j
def hfloat(f, p=5): """Convert float to value suitable for humans. Arguments: f (float): The floating point number. p (int): Floating point precision (default is 5). """ i = int(f) return i if i == f else '{0:.{p}}'.format(f, p=p) def humanbytes(s): """Convert bytes to human-readable form (e.g., KB, MB).""" return next( '{0}{1}'.format(hfloat(s / div if div else s), unit) for div, unit in UNITS if s >= div )
[docs]def mem_rss(): """Return RSS memory usage as a humanized string.""" p = ps() if p is not None: return humanbytes(_process_memory_info(p).rss)
[docs]def ps(): # pragma: no cover """Return the global :class:`psutil.Process` instance. Note: Returns :const:`None` if :pypi:`psutil` is not installed. """ global _process if _process is None and Process is not None: _process = Process(os.getpid()) return _process
def _process_memory_info(process): try: return process.memory_info() except AttributeError: return process.get_memory_info() def cry(out=None, sepchr='=', seplen=49): # pragma: no cover """Return stack-trace of all active threads. See Also: Taken from https://gist.github.com/737056. """ import threading out = WhateverIO() if out is None else out P = partial(print, file=out) # get a map of threads by their ID so we can print their names # during the traceback dump tmap = {t.ident: t for t in threading.enumerate()} sep = sepchr * seplen for tid, frame in items(sys._current_frames()): thread = tmap.get(tid) if not thread: # skip old junk (left-overs from a fork) continue P('{0.name}'.format(thread)) P(sep) traceback.print_stack(frame, file=out) P(sep) P('LOCAL VARIABLES') P(sep) pprint(frame.f_locals, stream=out) P('\n') return out.getvalue()