This document describes the current stable version of Celery (5.3). For development docs, go here.

Source code for celery.events.cursesmon

"""Graphical monitor of Celery events using curses."""

import curses
import sys
import threading
from datetime import datetime
from itertools import count
from math import ceil
from textwrap import wrap
from time import time

from celery import VERSION_BANNER, states
from celery.app import app_or_default
from celery.utils.text import abbr, abbrtask

__all__ = ('CursesMonitor', 'evtop')

BORDER_SPACING = 4
LEFT_BORDER_OFFSET = 3
UUID_WIDTH = 36
STATE_WIDTH = 8
TIMESTAMP_WIDTH = 8
MIN_WORKER_WIDTH = 15
MIN_TASK_WIDTH = 16

# this module is considered experimental
# we don't care about coverage.

STATUS_SCREEN = """\
events: {s.event_count} tasks:{s.task_count} workers:{w_alive}/{w_all}
"""


[docs]class CursesMonitor: # pragma: no cover """A curses based Celery task monitor.""" keymap = {} win = None screen_delay = 10 selected_task = None selected_position = 0 selected_str = 'Selected: ' foreground = curses.COLOR_BLACK background = curses.COLOR_WHITE online_str = 'Workers online: ' help_title = 'Keys: ' help = ('j:down k:up i:info t:traceback r:result c:revoke ^c: quit') greet = f'celery events {VERSION_BANNER}' info_str = 'Info: ' def __init__(self, state, app, keymap=None): self.app = app self.keymap = keymap or self.keymap self.state = state default_keymap = { 'J': self.move_selection_down, 'K': self.move_selection_up, 'C': self.revoke_selection, 'T': self.selection_traceback, 'R': self.selection_result, 'I': self.selection_info, 'L': self.selection_rate_limit, } self.keymap = dict(default_keymap, **self.keymap) self.lock = threading.RLock()
[docs] def format_row(self, uuid, task, worker, timestamp, state): mx = self.display_width # include spacing detail_width = mx - 1 - STATE_WIDTH - 1 - TIMESTAMP_WIDTH uuid_space = detail_width - 1 - MIN_TASK_WIDTH - 1 - MIN_WORKER_WIDTH if uuid_space < UUID_WIDTH: uuid_width = uuid_space else: uuid_width = UUID_WIDTH detail_width = detail_width - uuid_width - 1 task_width = int(ceil(detail_width / 2.0)) worker_width = detail_width - task_width - 1 uuid = abbr(uuid, uuid_width).ljust(uuid_width) worker = abbr(worker, worker_width).ljust(worker_width) task = abbrtask(task, task_width).ljust(task_width) state = abbr(state, STATE_WIDTH).ljust(STATE_WIDTH) timestamp = timestamp.ljust(TIMESTAMP_WIDTH) row = f'{uuid} {worker} {task} {timestamp} {state} ' if self.screen_width is None: self.screen_width = len(row[:mx]) return row[:mx]
@property def screen_width(self): _, mx = self.win.getmaxyx() return mx @property def screen_height(self): my, _ = self.win.getmaxyx() return my @property def display_width(self): _, mx = self.win.getmaxyx() return mx - BORDER_SPACING @property def display_height(self): my, _ = self.win.getmaxyx() return my - 10 @property def limit(self): return self.display_height
[docs] def find_position(self): if not self.tasks: return 0 for i, e in enumerate(self.tasks): if self.selected_task == e[0]: return i return 0
[docs] def move_selection_up(self): self.move_selection(-1)
[docs] def move_selection_down(self): self.move_selection(1)
[docs] def move_selection(self, direction=1): if not self.tasks: return pos = self.find_position() try: self.selected_task = self.tasks[pos + direction][0] except IndexError: self.selected_task = self.tasks[0][0]
keyalias = {curses.KEY_DOWN: 'J', curses.KEY_UP: 'K', curses.KEY_ENTER: 'I'}
[docs] def handle_keypress(self): try: key = self.win.getkey().upper() except Exception: # pylint: disable=broad-except return key = self.keyalias.get(key) or key handler = self.keymap.get(key) if handler is not None: handler()
[docs] def alert(self, callback, title=None): self.win.erase() my, mx = self.win.getmaxyx() y = blank_line = count(2) if title: self.win.addstr(next(y), 3, title, curses.A_BOLD | curses.A_UNDERLINE) next(blank_line) callback(my, mx, next(y)) self.win.addstr(my - 1, 0, 'Press any key to continue...', curses.A_BOLD) self.win.refresh() while 1: try: return self.win.getkey().upper() except Exception: # pylint: disable=broad-except pass
[docs] def selection_rate_limit(self): if not self.selected_task: return curses.beep() task = self.state.tasks[self.selected_task] if not task.name: return curses.beep() my, mx = self.win.getmaxyx() r = 'New rate limit: ' self.win.addstr(my - 2, 3, r, curses.A_BOLD | curses.A_UNDERLINE) self.win.addstr(my - 2, len(r) + 3, ' ' * (mx - len(r))) rlimit = self.readline(my - 2, 3 + len(r)) if rlimit: reply = self.app.control.rate_limit(task.name, rlimit.strip(), reply=True) self.alert_remote_control_reply(reply)
[docs] def alert_remote_control_reply(self, reply): def callback(my, mx, xs): y = count(xs) if not reply: self.win.addstr( next(y), 3, 'No replies received in 1s deadline.', curses.A_BOLD + curses.color_pair(2), ) return for subreply in reply: curline = next(y) host, response = next(subreply.items()) host = f'{host}: ' self.win.addstr(curline, 3, host, curses.A_BOLD) attr = curses.A_NORMAL text = '' if 'error' in response: text = response['error'] attr |= curses.color_pair(2) elif 'ok' in response: text = response['ok'] attr |= curses.color_pair(3) self.win.addstr(curline, 3 + len(host), text, attr) return self.alert(callback, 'Remote Control Command Replies')
[docs] def readline(self, x, y): buffer = '' curses.echo() try: i = 0 while 1: ch = self.win.getch(x, y + i) if ch != -1: if ch in (10, curses.KEY_ENTER): # enter break if ch in (27,): buffer = '' break buffer += chr(ch) i += 1 finally: curses.noecho() return buffer
[docs] def revoke_selection(self): if not self.selected_task: return curses.beep() reply = self.app.control.revoke(self.selected_task, reply=True) self.alert_remote_control_reply(reply)
[docs] def selection_info(self): if not self.selected_task: return def alert_callback(mx, my, xs): my, mx = self.win.getmaxyx() y = count(xs) task = self.state.tasks[self.selected_task] info = task.info(extra=['state']) infoitems = [ ('args', info.pop('args', None)), ('kwargs', info.pop('kwargs', None)) ] + list(info.items()) for key, value in infoitems: if key is None: continue value = str(value) curline = next(y) keys = key + ': ' self.win.addstr(curline, 3, keys, curses.A_BOLD) wrapped = wrap(value, mx - 2) if len(wrapped) == 1: self.win.addstr( curline, len(keys) + 3, abbr(wrapped[0], self.screen_width - (len(keys) + 3))) else: for subline in wrapped: nexty = next(y) if nexty >= my - 1: subline = ' ' * 4 + '[...]' self.win.addstr( nexty, 3, abbr(' ' * 4 + subline, self.screen_width - 4), curses.A_NORMAL, ) return self.alert( alert_callback, f'Task details for {self.selected_task}', )
[docs] def selection_traceback(self): if not self.selected_task: return curses.beep() task = self.state.tasks[self.selected_task] if task.state not in states.EXCEPTION_STATES: return curses.beep() def alert_callback(my, mx, xs): y = count(xs) for line in task.traceback.split('\n'): self.win.addstr(next(y), 3, line) return self.alert( alert_callback, f'Task Exception Traceback for {self.selected_task}', )
[docs] def selection_result(self): if not self.selected_task: return def alert_callback(my, mx, xs): y = count(xs) task = self.state.tasks[self.selected_task] result = (getattr(task, 'result', None) or getattr(task, 'exception', None)) for line in wrap(result or '', mx - 2): self.win.addstr(next(y), 3, line) return self.alert( alert_callback, f'Task Result for {self.selected_task}', )
[docs] def display_task_row(self, lineno, task): state_color = self.state_colors.get(task.state) attr = curses.A_NORMAL if task.uuid == self.selected_task: attr = curses.A_STANDOUT timestamp = datetime.utcfromtimestamp( task.timestamp or time(), ) timef = timestamp.strftime('%H:%M:%S') hostname = task.worker.hostname if task.worker else '*NONE*' line = self.format_row(task.uuid, task.name, hostname, timef, task.state) self.win.addstr(lineno, LEFT_BORDER_OFFSET, line, attr) if state_color: self.win.addstr(lineno, len(line) - STATE_WIDTH + BORDER_SPACING - 1, task.state, state_color | attr)
[docs] def draw(self): with self.lock: win = self.win self.handle_keypress() x = LEFT_BORDER_OFFSET y = blank_line = count(2) my, _ = win.getmaxyx() win.erase() win.bkgd(' ', curses.color_pair(1)) win.border() win.addstr(1, x, self.greet, curses.A_DIM | curses.color_pair(5)) next(blank_line) win.addstr(next(y), x, self.format_row('UUID', 'TASK', 'WORKER', 'TIME', 'STATE'), curses.A_BOLD | curses.A_UNDERLINE) tasks = self.tasks if tasks: for row, (_, task) in enumerate(tasks): if row > self.display_height: break if task.uuid: lineno = next(y) self.display_task_row(lineno, task) # -- Footer next(blank_line) win.hline(my - 6, x, curses.ACS_HLINE, self.screen_width - 4) # Selected Task Info if self.selected_task: win.addstr(my - 5, x, self.selected_str, curses.A_BOLD) info = 'Missing extended info' detail = '' try: selection = self.state.tasks[self.selected_task] except KeyError: pass else: info = selection.info() if 'runtime' in info: info['runtime'] = '{:.2f}'.format(info['runtime']) if 'result' in info: info['result'] = abbr(info['result'], 16) info = ' '.join( f'{key}={value}' for key, value in info.items() ) detail = '... -> key i' infowin = abbr(info, self.screen_width - len(self.selected_str) - 2, detail) win.addstr(my - 5, x + len(self.selected_str), infowin) # Make ellipsis bold if detail in infowin: detailpos = len(infowin) - len(detail) win.addstr(my - 5, x + len(self.selected_str) + detailpos, detail, curses.A_BOLD) else: win.addstr(my - 5, x, 'No task selected', curses.A_NORMAL) # Workers if self.workers: win.addstr(my - 4, x, self.online_str, curses.A_BOLD) win.addstr(my - 4, x + len(self.online_str), ', '.join(sorted(self.workers)), curses.A_NORMAL) else: win.addstr(my - 4, x, 'No workers discovered.') # Info win.addstr(my - 3, x, self.info_str, curses.A_BOLD) win.addstr( my - 3, x + len(self.info_str), STATUS_SCREEN.format( s=self.state, w_alive=len([w for w in self.state.workers.values() if w.alive]), w_all=len(self.state.workers), ), curses.A_DIM, ) # Help self.safe_add_str(my - 2, x, self.help_title, curses.A_BOLD) self.safe_add_str(my - 2, x + len(self.help_title), self.help, curses.A_DIM) win.refresh()
[docs] def safe_add_str(self, y, x, string, *args, **kwargs): if x + len(string) > self.screen_width: string = string[:self.screen_width - x] self.win.addstr(y, x, string, *args, **kwargs)
[docs] def init_screen(self): with self.lock: self.win = curses.initscr() self.win.nodelay(True) self.win.keypad(True) curses.start_color() curses.init_pair(1, self.foreground, self.background) # exception states curses.init_pair(2, curses.COLOR_RED, self.background) # successful state curses.init_pair(3, curses.COLOR_GREEN, self.background) # revoked state curses.init_pair(4, curses.COLOR_MAGENTA, self.background) # greeting curses.init_pair(5, curses.COLOR_BLUE, self.background) # started state curses.init_pair(6, curses.COLOR_YELLOW, self.foreground) self.state_colors = {states.SUCCESS: curses.color_pair(3), states.REVOKED: curses.color_pair(4), states.STARTED: curses.color_pair(6)} for state in states.EXCEPTION_STATES: self.state_colors[state] = curses.color_pair(2) curses.cbreak()
[docs] def resetscreen(self): with self.lock: curses.nocbreak() self.win.keypad(False) curses.echo() curses.endwin()
[docs] def nap(self): curses.napms(self.screen_delay)
@property def tasks(self): return list(self.state.tasks_by_time(limit=self.limit)) @property def workers(self): return [hostname for hostname, w in self.state.workers.items() if w.alive]
class DisplayThread(threading.Thread): # pragma: no cover def __init__(self, display): self.display = display self.shutdown = False super().__init__() def run(self): while not self.shutdown: self.display.draw() self.display.nap() def capture_events(app, state, display): # pragma: no cover def on_connection_error(exc, interval): print('Connection Error: {!r}. Retry in {}s.'.format( exc, interval), file=sys.stderr) while 1: print('-> evtop: starting capture...', file=sys.stderr) with app.connection_for_read() as conn: try: conn.ensure_connection(on_connection_error, app.conf.broker_connection_max_retries) recv = app.events.Receiver(conn, handlers={'*': state.event}) display.resetscreen() display.init_screen() recv.capture() except conn.connection_errors + conn.channel_errors as exc: print(f'Connection lost: {exc!r}', file=sys.stderr)
[docs]def evtop(app=None): # pragma: no cover """Start curses monitor.""" app = app_or_default(app) state = app.events.State() display = CursesMonitor(state, app) display.init_screen() refresher = DisplayThread(display) refresher.start() try: capture_events(app, state, display) except Exception: refresher.shutdown = True refresher.join() display.resetscreen() raise except (KeyboardInterrupt, SystemExit): refresher.shutdown = True refresher.join() display.resetscreen()
if __name__ == '__main__': # pragma: no cover evtop()