"""Terminal console for following a server's log output and sending commands.

The UI shows the output of ``LOG_COMMAND`` for the selected server in a
scrolling list, and writes each line entered in the edit box to the path
given by ``PIPE_PATH`` for that server.
"""

import fcntl
import glob
import os
import subprocess

import urwid

try:
    # local_config is expected to define SERVER_PATH, PIPE_PATH and
    # LOG_COMMAND; the values below are used when it is not importable.
    from local_config import *  # noqa: F401,F403
except ImportError:
    SERVER_PATH = "testing/home@*"
    PIPE_PATH = "testing/home@%s/slacker/pipe"
    LOG_COMMAND = ["/usr/bin/env", "python3", "testing/log_simulator.py"]


class JournalViewer:
    """List box of text lines, fed by hand or by a child process' output.

    Attributes:
        status: text widget that receives the running line count.
        loop: main loop used to watch the child's output pipe.
        root: top-level widget of the viewer (the list box).
        proc: the running ``subprocess.Popen`` object, or ``None``.
        pipe: text file object wrapping the read end of the output pipe,
            or ``None`` when no pipe is open.
        pipefd: file descriptor number of ``pipe``, or ``None``.
    """

    def __init__(self, status, loop):
        self.status = status
        self.loop = loop
        self.list_walker = urwid.SimpleFocusListWalker([])
        self.list_box = urwid.ListBox(self.list_walker)
        self.root = self.list_box
        self.proc = None
        self.pipe = None
        self.pipefd = None
        # Handle returned by loop.watch_file(); None while nothing is watched.
        self._watch_handle = None

    def write(self, text):
        """Append ``text`` to the list, one widget per ``\\n``-separated line.

        The status widget is updated with the total number of lines.
        """
        for line in str(text).split("\n"):
            self.list_walker.append(urwid.Text(line))
        lc = len(self.list_walker)
        self.status.set_text("%d line%s" % (lc, "" if lc == 1 else "s"))

    def proc_open(self, argv):
        """Start ``argv`` and stream its stdout and stderr into the viewer.

        Any process started by a previous call is shut down first.

        Args:
            argv: argument list passed to ``subprocess.Popen``.

        Raises:
            OSError: if the pipe cannot be created or the process cannot be
                started.
        """
        self.proc_close()

        self.pipefd, writefd = os.pipe()
        try:
            # The read end must not block: proc_write() drains whatever is
            # available from inside the event loop.
            flags = fcntl.fcntl(self.pipefd, fcntl.F_GETFL)
            fcntl.fcntl(self.pipefd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
            self.pipe = os.fdopen(self.pipefd)
            self.proc = subprocess.Popen(argv, stdout=writefd, stderr=writefd)
        finally:
            # The child holds its own copy of the write end. Closing ours
            # avoids leaking a descriptor per call and lets the read end
            # report EOF once the child exits.
            os.close(writefd)
        self._watch_handle = self.loop.watch_file(self.pipefd, self.proc_write)

    def proc_close(self):
        """Stop watching the pipe, close it, and kill and reap the child.

        Safe to call when nothing is running.
        """
        self._stop_watching()
        if self.pipe is not None:
            # Close through the file object rather than os.close(): the file
            # object owns the descriptor and would otherwise close the same
            # number again later, possibly after it has been reused.
            self.pipe.close()
            self.pipe = None
            self.pipefd = None
        if self.proc is not None:
            self.proc.kill()
            # Reap the child so it does not linger as a zombie.
            self.proc.wait()
            self.proc = None

    def _stop_watching(self):
        """Remove the pipe from the event loop if it is being watched."""
        if self._watch_handle is not None:
            self.loop.remove_watch_file(self._watch_handle)
            self._watch_handle = None

    def proc_write(self):
        """Event-loop callback: append pending child output to the viewer."""
        data = self.pipe.read()
        if data:
            self.write(data)
        elif data == "":
            # End of file: the child closed its output. Stop watching, since
            # a pipe at EOF stays readable and would fire on every iteration.
            self._stop_watching()


class ServerSelectorPopup(urwid.WidgetWrap):
    """Pop-up listing one button per available server."""

    def __init__(self, parent, console):
        """Build the pop-up.

        Args:
            parent: launcher whose ``close_pop_up()`` is called on selection.
            console: object whose ``select_server()`` receives the choice.
        """
        self.parent = parent
        self.console = console

        buttons = []
        for server in ServerSelector.list_servers():
            button = urwid.Button(server)
            urwid.connect_signal(button, "click", self.click)
            buttons.append(button)

        filler = urwid.Filler(urwid.Pile(buttons))
        # AttrMap instead of the deprecated AttrWrap, matching Console.
        super().__init__(urwid.AttrMap(filler, "popup"))

    def click(self, button):
        """Close the pop-up and switch the console to the clicked server."""
        server = button.label
        self.parent.close_pop_up()
        self.console.select_server(server)


class ServerSelector(urwid.PopUpLauncher):
    """Button that opens a ServerSelectorPopup when clicked."""

    @staticmethod
    def list_servers():
        """Return the sorted server names found by globbing SERVER_PATH.

        The name is the part of each matching path between the first and
        second ``@``.
        """
        return sorted(path.split("@")[1] for path in glob.glob(SERVER_PATH))

    def __init__(self, console):
        self.console = console
        self.button = urwid.Button("...")
        super().__init__(self.button)
        urwid.connect_signal(self.button, "click", lambda _: self.open_pop_up())

    def create_pop_up(self):
        """Return the widget shown when the pop-up is opened."""
        return ServerSelectorPopup(self, self.console)

    def get_pop_up_parameters(self):
        """Return the position and size of the pop-up overlay."""
        return {'left': 0, 'top': 1, 'overlay_width': 20, 'overlay_height': 5}


class Console:
    """Top-level UI: status header, log viewer body, command line footer."""

    def __init__(self, loop):
        """Build the widget tree and select the initial server.

        Prefers the server named "trench" when present, otherwise the first
        server in sorted order.

        Raises:
            SystemExit: with status 1 when no server matches SERVER_PATH.
        """
        self.loop = loop

        self.edit_pre = ServerSelector(self)
        self.edit_pre_attrmap = urwid.AttrMap(self.edit_pre, "button", "button focus")
        self.edit = urwid.Edit()
        self.edit_attrmap = urwid.AttrMap(self.edit, "edit", "edit focus")
        self.edit_box = urwid.Columns(
            [self.edit_pre_attrmap, ("weight", 6, self.edit_attrmap)]
        )

        self.status = urwid.Text("", align="center")
        self.status_attrmap = urwid.AttrMap(self.status, "status", None)

        self.viewer = JournalViewer(self.status, self.loop)
        self.root = urwid.Frame(
            self.viewer.root, header=self.status_attrmap, footer=self.edit_box
        )

        # Focus on the edit box
        self.root.focus_position = "footer"
        self.edit_box.focus_position = 1

        servers = ServerSelector.list_servers()
        if not servers:
            print("There are no servers in '%s'" % SERVER_PATH)
            # SystemExit rather than exit(): the latter is injected by the
            # site module and is not guaranteed to exist.
            raise SystemExit(1)

        self.select_server("trench" if "trench" in servers else servers[0])

    def unhandled_input(self, text):
        """Handle keys no widget consumed.

        "tab" toggles focus between the viewer and the edit box; "enter"
        sends the current edit text as a command and clears the edit box.
        """
        if text == "tab":
            if self.root.focus_position == "body":
                self.root.focus_position = "footer"
                self.edit_box.focus_position = 1
            else:
                self.root.focus_position = "body"
        elif text == "enter":
            command = self.edit.get_edit_text()
            self.edit.set_edit_text("")
            self.execute(command)

    def select_server(self, server):
        """Switch the viewer to ``server`` and restart the log command.

        Every LOG_COMMAND argument containing "%s" has the server name
        substituted in.
        """
        self.server = server
        self.viewer.write("*** Now viewing logs from %s ***" % server)
        self.edit_pre.button.set_label(server)
        argv = [arg % server if "%s" in arg else arg for arg in LOG_COMMAND]
        self.viewer.proc_open(argv)

    def execute(self, command):
        """Write ``command`` plus a newline to the current server's pipe path.

        The path is echoed to the viewer first. A failure to open or write
        the path is reported in the viewer instead of crashing the UI.
        """
        path = PIPE_PATH % self.server
        self.viewer.write(path)
        try:
            with open(path, "w") as fp:
                fp.write(command + "\n")
        except OSError as err:
            self.viewer.write("*** Could not send command: %s ***" % err)


palette = [
    ("button", "white", "dark red"),
    ("button focus", "white, bold", "light red"),
    ("status", "black", "white"),
    ("edit focus", "white", "black"),
    ("edit", "white", "dark gray"),
    ("popup", "white", "dark red"),
]

loop = urwid.MainLoop(None, palette, pop_ups=True)
console = Console(loop)
loop.widget = console.root
loop.unhandled_input = console.unhandled_input

try:
    loop.run()
except KeyboardInterrupt:
    raise SystemExit(0) from None
finally:
    # Do not leave the log command running after the UI is gone.
    console.viewer.proc_close()