This document describes the current stable version of Celery (4.2). For development docs, go here.

Source code for celery.worker.state

# -*- coding: utf-8 -*-
"""Internal worker state (global).

This includes the currently active and reserved tasks,
statistics, and revoked tasks.
"""
from __future__ import absolute_import, print_function, unicode_literals

import os
import platform
import shelve
import sys
import weakref
import zlib

from kombu.serialization import pickle, pickle_protocol
from kombu.utils.objects import cached_property

from celery import __version__
from celery.exceptions import WorkerShutdown, WorkerTerminate
from celery.five import Counter
from celery.utils.collections import LimitedSet

__all__ = (
    'SOFTWARE_INFO', 'reserved_requests', 'active_requests',
    'total_count', 'revoked', 'task_reserved', 'maybe_shutdown',
    'task_accepted', 'task_ready', 'Persistent',
)

#: Worker software/platform information.
SOFTWARE_INFO = {
    'sw_ident': 'py-celery',
    'sw_ver': __version__,
    'sw_sys': platform.system(),
}

#: maximum number of revokes to keep in memory.
REVOKES_MAX = 50000

#: how many seconds a revoke will be active before
#: being expired when the max limit has been exceeded.
REVOKE_EXPIRES = 10800

#: Mapping of reserved task_id->Request.
requests = {}

#: set of all reserved :class:`~celery.worker.request.Request`'s.
reserved_requests = weakref.WeakSet()

#: set of currently active :class:`~celery.worker.request.Request`'s.
active_requests = weakref.WeakSet()

#: count of tasks accepted by the worker, sorted by type.
total_count = Counter()

#: count of all tasks accepted by the worker
all_total_count = [0]

#: the list of currently revoked tasks.  Persistent if ``statedb`` set.
revoked = LimitedSet(maxlen=REVOKES_MAX, expires=REVOKE_EXPIRES)

should_stop = None
should_terminate = None


def reset_state():
    requests.clear()
    reserved_requests.clear()
    active_requests.clear()
    total_count.clear()
    all_total_count[:] = [0]
    revoked.clear()


[docs]def maybe_shutdown(): """Shutdown if flags have been set.""" if should_stop is not None and should_stop is not False: raise WorkerShutdown(should_stop) elif should_terminate is not None and should_terminate is not False:
raise WorkerTerminate(should_terminate) def task_reserved(request, add_request=requests.__setitem__, add_reserved_request=reserved_requests.add): """Update global state when a task has been reserved.""" add_request(request.id, request) add_reserved_request(request)
[docs]def task_accepted(request, _all_total_count=all_total_count, add_active_request=active_requests.add, add_to_total_count=total_count.update): """Update global state when a task has been accepted.""" add_active_request(request) add_to_total_count({request.name: 1})
all_total_count[0] += 1 def task_ready(request, remove_request=requests.pop, discard_active_request=active_requests.discard, discard_reserved_request=reserved_requests.discard): """Update global state when a task is ready.""" remove_request(request.id, None) discard_active_request(request) discard_reserved_request(request) C_BENCH = os.environ.get('C_BENCH') or os.environ.get('CELERY_BENCH') C_BENCH_EVERY = int(os.environ.get('C_BENCH_EVERY') or os.environ.get('CELERY_BENCH_EVERY') or 1000) if C_BENCH: # pragma: no cover import atexit from billiard.process import current_process from celery.five import monotonic from celery.utils.debug import memdump, sample_mem all_count = 0 bench_first = None bench_start = None bench_last = None bench_every = C_BENCH_EVERY bench_sample = [] __reserved = task_reserved __ready = task_ready if current_process()._name == 'MainProcess': @atexit.register def on_shutdown(): if bench_first is not None and bench_last is not None: print('- Time spent in benchmark: {0!r}'.format( bench_last - bench_first)) print('- Avg: {0}'.format( sum(bench_sample) / len(bench_sample))) memdump()
[docs] def task_reserved(request): # noqa """Called when a task is reserved by the worker.""" global bench_start global bench_first now = None if bench_start is None: bench_start = now = monotonic() if bench_first is None: bench_first = now
return __reserved(request)
[docs] def task_ready(request): # noqa """Called when a task is completed.""" global all_count global bench_start global bench_last all_count += 1 if not all_count % bench_every: now = monotonic() diff = now - bench_start print('- Time spent processing {0} tasks (since first ' 'task received): ~{1:.4f}s\n'.format(bench_every, diff)) sys.stdout.flush() bench_start = bench_last = now bench_sample.append(diff) sample_mem()
return __ready(request)
[docs]class Persistent(object): """Stores worker state between restarts. This is the persistent data stored by the worker when :option:`celery worker --statedb` is enabled. Currently only stores revoked task id's. """ storage = shelve protocol = pickle_protocol compress = zlib.compress decompress = zlib.decompress _is_open = False def __init__(self, state, filename, clock=None): self.state = state self.filename = filename self.clock = clock self.merge()
[docs] def open(self): return self.storage.open( self.filename, protocol=self.protocol, writeback=True,
)
[docs] def merge(self):
self._merge_with(self.db)
[docs] def sync(self): self._sync_with(self.db)
self.db.sync()
[docs] def close(self): if self._is_open: self.db.close()
self._is_open = False
[docs] def save(self): self.sync()
self.close() def _merge_with(self, d): self._merge_revoked(d) self._merge_clock(d) return d def _sync_with(self, d): self._revoked_tasks.purge() d.update({ str('__proto__'): 3, str('zrevoked'): self.compress(self._dumps(self._revoked_tasks)), str('clock'): self.clock.forward() if self.clock else 0, }) return d def _merge_clock(self, d): if self.clock: d[str('clock')] = self.clock.adjust(d.get(str('clock')) or 0) def _merge_revoked(self, d): try: self._merge_revoked_v3(d[str('zrevoked')]) except KeyError: try: self._merge_revoked_v2(d.pop(str('revoked'))) except KeyError: pass # purge expired items at boot self._revoked_tasks.purge() def _merge_revoked_v3(self, zrevoked): if zrevoked: self._revoked_tasks.update(pickle.loads(self.decompress(zrevoked))) def _merge_revoked_v2(self, saved): if not isinstance(saved, LimitedSet): # (pre 3.0.18) used to be stored as a dict return self._merge_revoked_v1(saved) self._revoked_tasks.update(saved) def _merge_revoked_v1(self, saved): add = self._revoked_tasks.add for item in saved: add(item) def _dumps(self, obj): return pickle.dumps(obj, protocol=self.protocol) @property def _revoked_tasks(self): return self.state.revoked
[docs] @cached_property def db(self): self._is_open = True
return self.open()