"""Terminal console for following a server's log and sending it commands.

The screen has three parts: a status header, a scrolling log view fed by the
output of ``LOG_COMMAND``, and a footer holding a server selector button and
a command line. Commands typed on the command line are written to the
selected server's pipe (``PIPE_PATH``).
"""

import fcntl
import glob
import os
import subprocess

import urwid

from trem_colors import trem_to_markup, trem_palette

try:
    # Site-specific settings; expected to provide the three names below.
    from local_config import *
except ImportError:
    # Glob pattern matching one path per server; the text after "@" in each
    # match is used as the server name.
    SERVER_PATH = "testing/home@*"
    # File commands are written to; "%s" is replaced with the server name.
    PIPE_PATH = "testing/home@%s/slacker/pipe"
    # Command whose output is shown in the log view. Any argument containing
    # "%s" has the server name substituted into it.
    LOG_COMMAND = ["/usr/bin/env", "python3", "testing/log_simulator.py"]


class JournalViewer:
    """Scrolling log view fed by the output of a child process.

    Attributes:
        status: text widget that receives the "position/total" status line.
        loop: the urwid main loop, used to watch the child's output pipe.
        root: the widget to place in the layout (the list box).
        following: when true, the view jumps to the newest line on update.
    """

    def __init__(self, status, loop):
        self.status = status
        self.loop = loop
        self.list_walker = urwid.SimpleFocusListWalker([])
        self.list_box = urwid.ListBox(self.list_walker)
        self.root = self.list_box
        self.proc = None
        self.pipe = None
        self.pipefd = None
        self._watch_handle = None
        self.following = True

    # FIXME: call this whenever the user scrolls through the log
    def update(self):
        """Refresh the status line and, when following, jump to the last line."""
        lc = len(self.list_walker)
        if lc == 0:
            # An empty list box has no focus position to set or report.
            position = 0
        else:
            if self.following:
                self.list_box.set_focus(lc - 1)
            position = self.list_box.focus_position + 1
        self.status.set_text("%d/%d line%s, [F]ollow: %s" % (
            position,
            lc,
            "" if lc == 1 else "s",
            "on" if self.following else "off"))

    def write(self, text, color_override=None):
        """Append ``text`` to the log, one widget per non-empty line.

        Args:
            text: value to display; converted with ``str()`` and split on
                newlines.
            color_override: palette entry name applied to the whole line. When
                ``None`` the line is passed through ``trem_to_markup`` instead.
        """
        for line in str(text).split("\n"):
            # Splitting text that ends in "\n" yields a trailing empty string
            # (presumably the usual case for output read from the pipe), so
            # empty pieces are dropped rather than shown as blank rows.
            if not line:
                continue
            if color_override is None:
                markup = trem_to_markup(line)
            else:
                markup = (color_override, line)
            self.list_walker.append(urwid.Text(markup))
        self.update()

    def _proc_close(self):
        """Stop watching, close the pipe and reap the current child, if any."""
        if self._watch_handle is not None:
            self.loop.remove_watch_file(self._watch_handle)
            self._watch_handle = None
        if self.pipe is not None:
            # Close through the file object, not os.close() on the raw
            # descriptor: the file object owns the descriptor and would close
            # that number a second time when garbage-collected, by which time
            # the number may already belong to the next pipe.
            self.pipe.close()
            self.pipe = None
            self.pipefd = None
        if self.proc is not None:
            self.proc.kill()
            # Collect the exit status so the killed child is not left behind
            # as a zombie.
            self.proc.wait()
            self.proc = None

    def proc_open(self, argv):
        """Start ``argv`` and stream its stdout and stderr into the log.

        Any previously started process is killed and its pipe released first.

        Args:
            argv: command and arguments, as accepted by ``subprocess.Popen``.

        Raises:
            OSError: if the pipe cannot be created or the command cannot be
                started.
        """
        self._proc_close()

        self.pipefd, writefd = os.pipe()
        try:
            # Non-blocking, so reading "everything available" from the event
            # loop callback returns instead of waiting for the child to exit.
            fl = fcntl.fcntl(self.pipefd, fcntl.F_GETFL)
            fcntl.fcntl(self.pipefd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
            self.pipe = os.fdopen(self.pipefd, encoding="utf-8",
                                  errors="replace")
            self.proc = subprocess.Popen(argv, stdout=writefd, stderr=writefd)
        finally:
            # The child has its own copy of the write end; keeping ours open
            # would leak a descriptor per call and prevent the read end from
            # ever reporting end-of-file.
            os.close(writefd)
        self._watch_handle = self.loop.watch_file(self.pipefd, self.proc_write)

    def proc_write(self):
        """Event loop callback: move pending child output into the log."""
        data = self.pipe.read()
        if data:
            self.write(data)
        elif data == "":
            # End-of-file: every write end is closed. The descriptor would
            # stay readable forever, so stop watching it to avoid spinning.
            self.loop.remove_watch_file(self._watch_handle)
            self._watch_handle = None


class ServerSelectorPopup(urwid.WidgetWrap):
    """Pop-up listing one button per available server."""

    def __init__(self, parent, console):
        """
        Args:
            parent: the pop-up launcher, closed once a server is chosen.
            console: object whose ``select_server`` receives the choice.
        """
        self.parent = parent
        self.console = console
        buttons = []
        for server in ServerSelector.list_servers():
            button = urwid.Button(server)
            urwid.connect_signal(button, "click", self.click)
            buttons.append(button)
        filler = urwid.Filler(urwid.Pile(buttons))
        super().__init__(urwid.AttrMap(filler, "popup"))

    def click(self, button):
        """Close the pop-up and switch to the server named on ``button``."""
        # The button label is the only place the server name is carried.
        server = button.label
        self.parent.close_pop_up()
        self.console.select_server(server)


class ServerSelector(urwid.PopUpLauncher):
    """Button showing the current server; clicking it opens the server list."""

    @staticmethod
    def list_servers():
        """Return the sorted names of all servers matching ``SERVER_PATH``.

        The name is the part of each matching path that follows the first "@".
        """
        return sorted(path.split("@")[1] for path in glob.glob(SERVER_PATH))

    def __init__(self, console):
        self.console = console
        self.button = urwid.Button("...")
        super().__init__(self.button)
        urwid.connect_signal(self.button, "click",
                             lambda _: self.open_pop_up())

    def create_pop_up(self):
        """Build the pop-up widget shown when the launcher is opened."""
        return ServerSelectorPopup(self, self.console)

    def get_pop_up_parameters(self):
        """Return the pop-up's placement relative to the launcher button."""
        return {'left': 0, 'top': 1, 'overlay_width': 20, 'overlay_height': 5}


class Console:
    """Top-level layout: status header, log view body, command-line footer."""

    def __init__(self, loop):
        """Build the widgets and start following the default server.

        Exits the program with status 1 when no server matches
        ``SERVER_PATH``.
        """
        self.loop = loop

        self.edit_pre = ServerSelector(self)
        self.edit_pre_attrmap = urwid.AttrMap(self.edit_pre, "button",
                                              "button focus")
        self.edit = urwid.Edit(" $ ")
        self.edit_attrmap = urwid.AttrMap(self.edit, "edit", "edit focus")
        self.edit_box = urwid.Columns(
            [self.edit_pre_attrmap, ("weight", 6, self.edit_attrmap)]
        )

        self.status = urwid.Text("", align="center")
        self.status_attrmap = urwid.AttrMap(self.status, "status", None)

        self.viewer = JournalViewer(self.status, self.loop)
        self.root = urwid.Frame(self.viewer.root,
                                header=self.status_attrmap,
                                footer=self.edit_box)

        # Focus on the edit box
        self.root.focus_position = "footer"
        self.edit_box.focus_position = 1

        servers = ServerSelector.list_servers()
        if not servers:
            print("There are no servers in '%s'" % SERVER_PATH)
            raise SystemExit(1)

        self.select_server("trench" if "trench" in servers else servers[0])

    def unhandled_input(self, text):
        """Handle keys that no focused widget consumed.

        "tab" toggles focus between the log view and the command line,
        "enter" runs the command line's contents, and "f" toggles following.
        """
        if text == "tab":
            if self.root.focus_position == "body":
                self.root.focus_position = "footer"
                self.edit_box.focus_position = 1
            else:
                self.root.focus_position = "body"
        elif text == "enter":
            command = self.edit.get_edit_text()
            self.edit.set_edit_text("")
            self.execute(command)
        elif text == "f":
            self.viewer.following = not self.viewer.following
            self.viewer.update()

    def select_server(self, server):
        """Switch the log view and the command target to ``server``."""
        self.server = server
        self.viewer.write("Now viewing logs from %s" % server,
                          color_override="system")
        self.edit_pre.button.set_label(server)

        argv = [arg % server if "%s" in arg else arg for arg in LOG_COMMAND]
        self.viewer.proc_open(argv)

    def execute(self, command):
        """Echo ``command`` to the log and write it to the server's pipe."""
        self.viewer.write("%s $ %s" % (self.server, command),
                          color_override="system")
        try:
            with open(PIPE_PATH % self.server, "w") as fp:
                fp.write(command + "\n")
        except OSError as err:
            # A missing or unwritable pipe should not take the whole UI down;
            # report it in the log instead.
            self.viewer.write("Could not send command: %s" % err,
                              color_override="system")


palette = [
    # UI elements
    ("button", "black", "white"),
    ("button focus", "black", "white"),
    ("status", "black", "white"),
    ("edit focus", "white", "dark gray"),
    ("edit", "white", "black"),
    ("popup", "black", "white"),

    # Log text
    ("system", "white, bold", "dark gray"),
] \
+ trem_palette

screen = urwid.raw_display.Screen()
screen.set_terminal_properties(colors=2**24)
screen.register_palette(palette)

# The loop is created without a widget because Console needs the loop to
# build its viewer; the widget and input handler are attached afterwards.
loop = urwid.MainLoop(None, screen=screen, pop_ups=True)
console = Console(loop)
loop.widget = console.root
loop.unhandled_input = console.unhandled_input

try:
    loop.run()
except KeyboardInterrupt:
    # Ctrl-C is the normal way out; exit quietly with status 0.
    raise SystemExit(0)