# This document describes the current stable version of Celery (4.0).
# For development docs, see the Celery development documentation.
# Source code for celery.bin.logtool
# -*- coding: utf-8 -*-
"""The :program:`celery logtool` command.
.. program:: celery logtool
"""
from __future__ import absolute_import, unicode_literals
import re
from collections import Counter
from fileinput import FileInput
from .base import Command
# Names exported by ``from <this module> import *``.
__all__ = ['logtool']
# Matches a line that begins with ``[`` followed by a ``YYYY-MM-DD`` date
# and a space.  Audit.feed() treats lines that do not match as trace
# (continuation) lines.
RE_LOG_START = re.compile(r'^\[\d\d\d\d\-\d\d-\d\d ')
# Matches a line containing ``] Received`` (lazy prefix, anchored at the
# start of the line by ``match``).
RE_TASK_RECEIVED = re.compile(r'.+?\] Received')
# Matches a line containing ``] Task``.
RE_TASK_READY = re.compile(r'.+?\] Task')
# Captures two groups: a run of word characters/dots that is immediately
# followed by ``[...]``, and the text inside those brackets.  Callers in
# this module unpack the groups as ``(task_name, task_id)``.  At least one
# character must precede the name and at least one must follow the ``]``.
RE_TASK_INFO = re.compile(r'.+?([\w\.]+)\[(.+?)\].+')
# Captures everything after ``name[...]`` and a single space; Audit.feed()
# uses that capture as the task's result text.
RE_TASK_RESULT = re.compile(r'.+?[\w\.]+\[.+?\] (.+)')
# ``str.format`` template.  It expects a ``task`` mapping with the keys
# ``total``, ``errors``, ``succeeded``, ``completed`` and ``types``; the
# ``types`` value must expose a ``format`` attribute (see ``_task_counts``).
REPORT_FORMAT = """
Report
======
Task total: {task[total]}
Task errors: {task[errors]}
Task success: {task[succeeded]}
Task completed: {task[completed]}
Tasks
=====
{task[types].format}
"""
class _task_counts(list):
@property
def format(self):
return '\n'.join('{0}: {1}'.format(*i) for i in self)
def task_info(line):
    """Return the groups ``RE_TASK_INFO`` captures from ``line``.

    The pattern has two groups, which callers in this module unpack as
    ``(task_name, task_id)``.

    Raises:
        AttributeError: if ``line`` does not match ``RE_TASK_INFO``
            (the match object is ``None``).
    """
    return RE_TASK_INFO.match(line).groups()
class Audit(object):
    """Accumulate per-task statistics from log lines.

    Lines are supplied one at a time through :meth:`feed`, or in bulk from
    files through :meth:`run`.

    Arguments:
        on_task_error (Callable): Optional.  Called as
            ``(line, task_name, task_id, result)`` for every finished task
            whose result text does not contain ``'succeeded'``.
        on_trace (Callable): Optional.  Called with the text of every line
            that does not start with a log timestamp; the first such line
            after a timestamped line is prefixed with that line.
        on_debug (Callable): Optional.  Called with every timestamped line
            that is not handled as a "received" or "ready" task line.
    """

    def __init__(self, on_task_error=None, on_trace=None, on_debug=None):
        self.ids = set()             # ids of tasks seen as received
        self.names = {}              # task id -> task name
        self.results = {}            # task id -> result text (or None)
        self.ready = set()           # ids of tasks seen as finished
        self.task_types = Counter()  # task name -> times received
        self.task_errors = 0         # finished-task lines without 'succeeded'
        self.on_task_error = on_task_error
        self.on_trace = on_trace
        self.on_debug = on_debug
        self.prev_line = None        # last timestamped line, for on_trace

    def run(self, files):
        """Feed every line of ``files`` to :meth:`feed` and return ``self``.

        ``files`` is handed to :class:`fileinput.FileInput` unchanged.
        """
        reader = FileInput(files)
        try:
            for line in reader:
                self.feed(line)
        finally:
            # Release the open file even when feed() or a callback raises.
            # (try/finally rather than ``with``: FileInput is not a context
            # manager on Python 2.)
            reader.close()
        return self

    def task_received(self, line, task_name, task_id):
        """Record that ``task_id`` (of type ``task_name``) was received."""
        self.names[task_id] = task_name
        self.ids.add(task_id)
        self.task_types[task_name] += 1

    def task_ready(self, line, task_name, task_id, result):
        """Record that ``task_id`` finished with the given ``result`` text.

        A result that is missing (``None``/empty) or does not contain
        ``'succeeded'`` is counted as an error.
        """
        self.ready.add(task_id)
        self.results[task_id] = result
        # ``not result`` guards against None: feed() passes None when the
        # result text could not be parsed, and ``in None`` raises TypeError.
        if not result or 'succeeded' not in result:
            self.task_error(line, task_name, task_id, result)

    def task_error(self, line, task_name, task_id, result):
        """Count one task error and notify ``on_task_error`` if set."""
        self.task_errors += 1
        if self.on_task_error:
            self.on_task_error(line, task_name, task_id, result)

    def feed(self, line):
        """Classify a single log ``line`` and update the statistics.

        Lines that do not start with a log timestamp are reported through
        ``on_trace``.  Timestamped lines are handled as "received" or
        "ready" task lines when they match the corresponding pattern *and*
        contain a parsable ``name[id]``; every other timestamped line is
        reported through ``on_debug``.
        """
        if not RE_LOG_START.match(line):
            if self.on_trace:
                self.on_trace('\n'.join(filter(None, [self.prev_line, line])))
            # Only the first continuation line is joined to its log line.
            self.prev_line = None
            return

        received = RE_TASK_RECEIVED.match(line)
        if received or RE_TASK_READY.match(line):
            # May still be None: a "Received"/"Task" message is not
            # guaranteed to carry a ``name[id]`` part.
            info = RE_TASK_INFO.match(line)
        else:
            info = None

        if info is None:
            if self.on_debug:
                self.on_debug(line)
        elif received:
            task_name, task_id = info.groups()
            self.task_received(line, task_name, task_id)
        else:
            task_name, task_id = info.groups()
            found = RE_TASK_RESULT.match(line)
            result = found.group(1) if found else None
            self.task_ready(line, task_name, task_id, result)
        self.prev_line = line

    def incomplete_tasks(self):
        """Return the set of task ids that were received but never ready.

        This is a set difference, not a symmetric difference: a task that
        finished but whose "received" line was never seen is not
        incomplete.
        """
        return self.ids - self.ready

    def report(self):
        """Return the collected statistics as a nested dict.

        The result has a single ``'task'`` key whose value holds ``types``
        (most common first), ``total``, ``errors``, ``completed`` and
        ``succeeded``.
        """
        return {
            'task': {
                'types': _task_counts(self.task_types.most_common()),
                'total': len(self.ids),
                'errors': self.task_errors,
                'completed': len(self.ready),
                'succeeded': len(self.ready) - self.task_errors,
            }
        }
[docs]class logtool(Command):
"""The ``celery logtool`` command."""
args = """<action> [arguments]
..... stats [file1|- [file2 [...]]]
..... traces [file1|- [file2 [...]]]
..... errors [file1|- [file2 [...]]]
..... incomplete [file1|- [file2 [...]]]
..... debug [file1|- [file2 [...]]]
"""
def run(self, what=None, *files, **kwargs):
    """Dispatch to the handler for the action named by ``what``.

    Arguments:
        what (str): One of ``stats``, ``traces``, ``errors``,
            ``incomplete`` or ``debug``.
        *files: Passed to the selected handler as a single tuple.
        **kwargs: Accepted and ignored.

    Returns:
        Whatever the selected handler returns.

    Raises:
        self.UsageError: if ``what`` is missing or empty.
        self.Error: if ``what`` is not a known action.
    """
    handlers = {
        'stats': self.stats,
        'traces': self.traces,
        'errors': self.errors,
        'incomplete': self.incomplete,
        'debug': self.debug,
    }
    if not what:
        raise self.UsageError('missing action')
    if what not in handlers:
        known = '|'.join(handlers)
        raise self.Error(
            'action {0} not in {1}'.format(what, known),
        )
    handler = handlers[what]
    return handler(files)
def incomplete(self, files):
    """Audit ``files`` and report each incomplete task via ``self.error``.

    One ``'Did not complete: ...'`` message is emitted per task id
    returned by ``Audit.incomplete_tasks()``.
    """
    # Audit.run() returns the audit itself, so the calls can be chained.
    unfinished = Audit().run(files).incomplete_tasks()
    for task_id in unfinished:
        self.error('Did not complete: %r' % (task_id,))