"""Nested 80-column terminal progress bars drawn with ANSI escape sequences.

The bars are pinned to the bottom rows of the terminal by shrinking the
scrolling region, so regular output keeps scrolling above them.  Wrap a sized
iterable with :func:`prog` to get a ready-to-iterate :class:`Prog80`.
"""

import atexit
import sys
import time

# TODO: make this a namedtuple? or one of those newfangles dataclasses.
options = dict(
    ascii=False,  # TODO
    color=True,
    flush=True,
    patient=True,  # TODO: better name?
)

# progress bars that have begun but not yet finished, outermost first.
active = []
# number of terminal rows currently reserved for progress bars.
allocated = 0

DEBUG = False


def get_terminal_size(file=sys.stderr):
    """Return ``(cols, rows)`` for the terminal behind *file*.

    The size is queried from *file*'s descriptor first.  If that fails, or
    reports a non-positive dimension, ``shutil.get_terminal_size()`` is used
    instead.
    """
    from os import get_terminal_size as os_gts

    cols, rows = 0, 0
    try:
        cols, rows = os_gts(file.fileno())
    except (AttributeError, OSError, ValueError):
        # no usable descriptor: not a tty (OSError), a closed stream
        # (ValueError), or an object without fileno() (AttributeError).
        pass
    if cols <= 0 or rows <= 0:
        from shutil import get_terminal_size as shutil_gts

        # NOTE: this uses sys.stdout instead of our file!
        cols, rows = shutil_gts()
    return cols, rows


def extend72(s, mode=""):
    """Stretch the style string *s* to exactly 72 characters.

    Everything but the last character of *s* is repeated to fill 71 columns,
    and the last character of *s* is then appended as the final column.  When
    *mode* contains "t", "d" or "a", the last character of *s* is dropped
    first (the bar renderer treats it as an alternate "cursor" character).
    """
    if "t" in mode or "d" in mode or "a" in mode:
        s = s[:-1]
    return ((s[0] + s[1:-1]) * 72)[:71] + s[-1]


@atexit.register  # in the absence of a try-except for KeyboardInterrupt...
def reset(*, file=sys.stderr):
    """Clear below the cursor and reset the scrolling region.

    The sequence is wrapped in a cursor-save/cursor-restore pair.  It is
    written to *file* (flushed), or returned as a string when *file* is None.
    This function is also registered to run at interpreter exit.
    """
    # TODO: flesh this out?
    # TODO: fancy backspace "\b" stuff to hide potentially unsupported escape sequences.
    s = "\0337\033[J\033[r\0338"
    if file is None:
        return s
    try:
        print(end=s, file=file, flush=True)
    except (OSError, ValueError):
        # the stream may already be closed by the time atexit handlers run;
        # there is nothing left to reset in that case.
        pass


def alloc(lines):
    """Return the escape sequence that makes room for *lines* bar rows.

    Nothing is printed here; the caller emits the returned string.
    """
    # first, perform a reset of whatever scrolling region may already be set.
    # the cursor is (almost) always reset after performing a reset command,
    # so wrap the command in a cursor-save and cursor-restore.
    s = "\0337\033[r\0338"
    # now, do some magic to ensure there's enough empty lines for the progress bars
    # *without* forcing the screen to scroll if there already is.
    # TODO: clear these lines too? hmm, that would overwrite bars though, wouldn't it?
    s += "\n" * lines  # this part is simple enough; just print some newlines.
    s += "\0338"  # however, that resets the cursor's column, so restore its position.
    # the position might not be on the same row as the line of text it was on before,
    s += f"\033[{lines}B\033[{lines}A"  # so move it down (clamped) and back up again.
    # s += "\0337"  # that should be everything, so store the adjusted cursor for later.
    return s


class Prog80:
    """An 80-column progress bar: an 8-character blurb plus a 72-column bar.

    Instances move through the states ``initialized -> prepared -> began <->
    progressing -> finished``; invalid transitions trip an assertion.
    Iterating an instance drives the whole lifecycle and redraws the bar.
    """

    # TODO: use different(-ly placed) delimiters when length < 72?

    # Each style has an unfilled pattern "A", a filled pattern "B", and an
    # optional "mode" string of single-letter flags, as used by `bar`,
    # `needs_update` and `extend72`:
    #   "i"  draw the A pattern itself, with the done part in inverse video
    #        (only when options["color"] is set).
    #   "p"  always render as if progressing (show the cursor character).
    #   "r"  take the filled part from the right end of the B pattern.
    #   "P"  take full+1 leading characters of B while progressing.
    #   "t"/"d"/"a"  swap the final "cursor" character between alternatives
    #        (3 or 2 sub-steps per column, or alternating per column).
    styles = dict(
        # TODO: maybe use namedtuples for these?
        # TODO: maybe use a separate field for the "cursor" characters?
        #       this ought to eliminate the need for "d", "t" and "P" modes.
        bar=dict(A=" ", B="█▌", mode="r"),
        bar2=dict(A=" ▌", B="██", mode="dp"),  # TODO: what's the non-'p' version?
        bracketed=dict(A="[" + " " * 70 + "]", B="[" + "=" * 70 + "]", mode="P"),
        circumslider=dict(A="o", B="oô", mode="pr"),
        crossslider=dict(A="†", B="†‡", mode="pr"),
        dotslider=dict(A="-", B="-÷", mode="pr"),
        dotted=dict(A=".", B="!?", mode="r"),
        equally=dict(A=".", B="="),
        fractional=dict(
            A="·····¬·····¬·····¼·····¬·····¬·····½·····¬·····¬·····¾·····¬·····¬·····!",
            B="xƒ",
            mode="ir",
        ),
        longdotslider=dict(A="—", B="—÷", mode="pr"),
        meme=dict(  # slightly modified
            A="According to all known laws of aviation, there is no way a bee should be",
            B="able to fly. Its wings are too small to get its fat body off the ground.",
            mode="P",
        ),
        middledots=dict(A="•", B="◘"),
        money=dict(A="¢", B="$"),
        # pacman=dict(A="-", B="#"),
        # bracketedpacman=dict(A="[" + "-" * 70 + "]", B="[" + "#" * 70 + "]", mode="P"),
        pacman=dict(A="[" + "-" * 70 + "]", B="[" + "#" * 70 + "]", mode="P"),
        piping=dict(A="=", B="=|", mode="pr"),
        ruler=dict(A="90123456780", B="90123456780", mode="P"),
        slider=dict(A="_", B="_⌂", mode="pr"),
        smile=dict(A="☺", B="☻"),
        song=dict(
            A=" - ♪ - ♪ - ♪ - ♪",
            B=" ♫ ♫ ♫ ♫ ♫ ♫ ♫ ♫ ♫ ♫ ♫ ♫ ♫ ♫ ♫ ♫ ♫ ♫",
        ),
        sticks=dict(A="/", B="\\|", mode="p"),
        virus=dict(A=".▀", B=" TECHNO ▄", mode="ap"),  # »Don't touch the keyboard«
    )

    # Rate formatting table, scanned in order by `format_rate`.  Each entry is
    # (unit preference, upper threshold in items per second, formatter); the
    # first entry whose preference matches (or is "*") and whose threshold is
    # not below the rate wins.  Every formatter yields a 7-character string.
    bands = [  # TODO: come up with a better name for this variable.
        ("*", -4.94065645841246544177e-324, lambda x: "Waiting"),
        ("*", 0.0, lambda x: "Running"),
        ("*", 1.3888888888888887e-08, lambda x: "TooSlow"),
        ("*", 0.00027776388888888886, lambda x: f"{x * 60 * 60:6.4f}/h"[1:]),
        ("*", 0.001666583333333333, lambda x: f"{x * 60 * 60:5.3f}/h"),
        ("s", 0.01666583333333333, lambda x: f"{x * 60:6.4f}/m"[1:]),
        ("m", 0.01666583333333333, lambda x: f"{x * 60:6.4f}/m"[1:]),
        ("s", 0.09995, lambda x: f"{x * 60:5.3f}/m"),
        ("s", 0.9999499999999999, lambda x: f"{x:6.4f}/s"[1:]),
        ("m", 0.16665833333333332, lambda x: f"{x * 60:5.3f}/m"),
        ("m", 1.666583333333333, lambda x: f"{x * 60:5.2f}/m"),
        ("s", 9.999499999999998, lambda x: f"{x:5.3f}/s"),
        ("s", 12.000833333333333, lambda x: f"{x:5.2f}/s"),
        ("m", 16.66583333333333, lambda x: f"{x * 60:5.1f}/m"),
        ("m", 166.6583333333333, lambda x: f"{x * 60:4.0f}./m"),
        ("*", 0.002777638888888889, lambda x: f"{x * 60 * 60:5.3f}/h"),
        ("*", 0.027776388888888885, lambda x: f"{x * 60 * 60:5.2f}/h"),
        ("*", 0.20001388888888888, lambda x: f"{x * 60 * 60:5.1f}/h"),
        ("h", 0.2777638888888888, lambda x: f"{x * 60 * 60:5.1f}/h"),
        ("h", 2.7776388888888883, lambda x: f"{x * 60 * 60:4.0f}./h"),
        ("*", 1.666583333333333, lambda x: f"{x * 60:5.2f}/m"),
        ("*", 12.000833333333333, lambda x: f"{x * 60:5.1f}/m"),
        ("*", 99.99499999999999, lambda x: f"{x:5.2f}/s"),
        ("*", 999.9499999999999, lambda x: f"{x:5.1f}/s"),
        ("*", 9999.499999999998, lambda x: f"{x:4.0f}./s"),
        ("*", 1.7976931348623157e308, lambda x: "TooFast"),
    ]

    # TODO: default to sys.stderr as well!
    def __init__(
        self,
        it,
        length,
        *,
        printer=print,
        style=None,
        display="elapsed",
        preference="*",
    ):
        """Wrap the iterable *it*, which is expected to yield *length* items.

        *printer* is called as ``printer(end=text)`` to emit escape sequences.
        *style* names an entry of `styles`; unknown names fall back to
        "fractional".  *display* picks the blurb ("elapsed", "eta" or "rate")
        and *preference* is the rate unit preference passed to `format_rate`.
        """
        self.i = 0  # number of items completed so far
        self.it = iter(it)
        self.length = length
        self.printer = printer
        self.depth = 0  # 1-based nesting depth, assigned in begin()
        self.start, self.end = 0, 0
        self.cols, self.rows = 0, 0
        self.preference = preference
        self.previous_update = 0.0  # time of the last redraw, for throttling
        self._state = None
        self.state = "initialized"
        self.style = style if style in self.styles else "fractional"  # TODO: warning
        # self.style = style if style in self.styles else "slider"  # TODO: warning
        # self.style = style if style in self.styles else list(self.styles)[-1]
        self.display = display
        assert self.display in ("elapsed", "eta", "rate")

    @property
    def state(self):
        """The current lifecycle state name."""
        return self._state

    @state.setter
    def state(self, new_state):
        """Move to *new_state*, asserting that the transition is allowed."""
        if self._state is not None and new_state == self._state:
            return  # just ignore this
        # maps each state to the set of states it may be entered from.
        valid = {
            "initialized": {None},
            "prepared": {"initialized"},
            "began": {"prepared", "progressing"},
            "progressing": {"began"},
            "finished": {"began"},
        }
        assert new_state in valid, f"unknown state {new_state}"
        assert (
            self._state in valid[new_state]
        ), f"invalid state transition from {self._state} to {new_state}"
        self._state = new_state

    def __len__(self):
        """Return the expected number of items."""
        return self.length

    @property
    def unfilled(self):
        """The 72-column pattern for the not-yet-done part of the bar."""
        return extend72(self.styles[self.style]["A"], self.styling)

    @property
    def filled(self):
        """The 72-column pattern for the done part of the bar."""
        return extend72(self.styles[self.style]["B"], self.styling)

    @property
    def styling(self):
        """The mode flags of the current style ("" when it has none)."""
        return self.styles[self.style].get("mode", "")

    @property
    def full(self):
        """Number of filled columns, 0 to 72 (72 when length is not positive)."""
        return self.i * 72 // self.length if self.length > 0 else 72

    @property
    def needs_update(self):
        """Whether the latest step changed what the bar would look like."""
        if self.length <= 0:
            return True
        # "t" and "d" styles have 3 or 2 visible sub-steps per column.
        w = 72 * 3 if "t" in self.styling else 72 * 2 if "d" in self.styling else 72
        return self.i * w // self.length != (self.i - 1) * w // self.length

    @property
    def elapsed(self):
        """Seconds since begin(), frozen at the finish time once finished."""
        now = self.end if self.state == "finished" else time.time()
        return now - self.start

    @property
    def rate(self):
        """Completed items per second, or the largest band threshold if no time has passed."""
        assert self.state in (
            "began",
            "progressing",
            "finished",
        ), "invalid state for this"
        elapsed = self.elapsed
        if elapsed == 0.0:
            return self.bands[-1][1]  # closest thing to infinity
        return self.i / elapsed

    @property
    def eta(self):
        """Estimated time remaining.  Not implemented yet."""
        raise NotImplementedError("TODO")

    @classmethod
    def format_rate(cls, rate, preference="*"):
        """Format *rate* (items per second) as an 8-character blurb.

        *preference* selects which unit-specific entries of `bands` may be
        used in addition to the "*" entries.
        """
        for pref, threshold, formater in cls.bands:
            if (pref == "*" or pref == preference) and rate <= threshold:
                return formater(rate) + " "
        return "????????"  # normally unreachable (unless perhaps rate is Inf or NaN?)

    @classmethod
    def format_time(cls, duration, preference="*"):
        """Format *duration* (seconds) as a signed 8-character blurb.

        *preference* is accepted for symmetry with `format_rate` but unused.
        """
        o = "-" if duration < 0.0 else "+"
        duration = abs(duration)
        if duration < 60.0:
            # subtracting half a hundredth turns the format's rounding into truncation.
            o += f"{max(0.0, duration - 0.005):5.2f}s "
        elif duration < 3600.0:
            x = int(duration)
            o += f"{x // 60:2d}m{x % 60:02d}s "
        elif duration < 86400.0:
            x = int(duration)
            o += f"{x // 3600:2d}h{x // 60 % 60:02d}m "
        elif duration < 8640000.0:
            x = int(duration)
            o += f"{x // 86400:2d}d{x // 3600 % 24:02d}h "
        else:
            o += "TooLong"
        return o

    @property
    def blurb(self):  # only the first 8 characters that don't constitute the bar at all
        """The 8-character text shown before the bar, chosen by `display`."""
        if self.display == "rate":
            return self.format_rate(self.rate, self.preference)
        elif self.display == "elapsed":
            return self.format_time(self.elapsed, self.preference)
        elif self.display == "eta":
            return self.format_time(-self.eta, self.preference)

    @property
    def bar(self):  # the full 80 characters available in any state
        """The complete line to draw: blurb plus bar, with color codes if enabled."""
        if options["color"]:
            color1 = "\033[7m"
            color2 = "\033[m\033[1m"
            color_reset = "\033[m"
        else:
            color1 = ""
            color2 = ""
            color_reset = ""

        if self.state in ("began", "finished", "progressing"):
            s = self.blurb
            if options["color"] and "i" in self.styling:
                # inverse style: the A pattern is always shown and the done
                # part is merely highlighted.
                left = self.unfilled[: self.full]
                right = self.unfilled[self.full :]
                if self.state == "progressing" or "p" in self.styling:
                    # slices rather than right[0]: right is empty once full == 72.
                    s += color1 + left + color2 + right[:1]
                    s += color_reset + right[1:]
                else:
                    s += color1 + left
                    s += color_reset + right
            else:
                filled = self.filled
                style = self.styles[self.style]
                progressing = self.state == "progressing" or "p" in self.styling
                # candidate "cursor" characters for the "t", "d" and "a" modes.
                t = (filled[-1], style["A"][-1], style["B"][-1])
                if "t" in self.styling and self.length > 0:
                    triple = self.i * 72 * 3 // self.length
                    filled = filled[:-1] + t[triple % 3]
                elif "d" in self.styling and self.length > 0:
                    double = self.i * 72 * 2 // self.length
                    filled = filled[:-1] + t[double % 2 + 1]
                elif "a" in self.styling and self.length > 0:
                    filled = filled[:-1] + t[self.full % 2 + 1]
                s += color2
                if progressing:
                    # one extra column is spent on the cursor character.
                    if "r" in self.styling:
                        s += filled[-self.full - 1 :]
                    elif "P" in self.styling:
                        s += filled[: self.full + 1]
                    else:
                        s += filled[: self.full] + filled[-1]
                    s += color_reset + self.unfilled[self.full + 1 :]
                else:
                    if "r" in self.styling and self.full > 0:
                        s += filled[-self.full :]
                    else:
                        s += filled[: self.full]
                    s += color_reset + self.unfilled[self.full :]
        elif self.state == "prepared":
            s = "Ready..." + " " * 72
        else:
            s = "watdafuk" + "?" * 72
        # assert len(s) == 80 + len(color2) + len(color_reset), (self.full, self.state, repr(s))
        return s

    @property
    def row(self):
        """The 1-based terminal row this bar is drawn on."""
        assert self.state in ("began", "progressing"), "invalid state for this"
        return self.rows - allocated + self.depth

    def __iter__(self):
        """Yield the wrapped items, drawing and updating the bar around them.

        With a length of zero nothing is drawn and nothing is yielded.
        """
        # TODO: check env-var override for disabling and colors!
        if self.length == 0:
            return  # this is a generator, so a bare return simply ends it.
        if self.i == 0:
            self.begin()
        for x in self.it:
            self.progress(False)
            yield x
            self.progress(True)
        self.progressing = False  # NOTE: not read anywhere in this file.
        self.finish()

    # TODO: is there a case where i actually need this?
    # def __next__(self):
    #     pass

    def prepare(self):
        """Mark the bar as prepared and return self for chaining."""
        self.state = "prepared"
        return self

    def begin(self):
        """Start timing, reserve a terminal row, and draw every active bar."""
        global allocated
        self.start = time.time()
        assert self not in active, "what"
        active.append(self)
        self.depth = len(active)

        (_cols, _rows) = get_terminal_size()
        # propagate terminal size to parents (and self) in case it has since changed:
        for pb in active:
            pb.cols, pb.rows = _cols, _rows

        # TODO: this should be handleable in most cases; hide the top bars for a while.
        #       need at least 1 blank line for regular output (or just for the cursor to sit)
        #       i guess just act as though only a subset of `active` are active for a while?
        assert self.rows - 1 > self.depth, "ran out of space for progress bars"

        s = alloc(self.depth)  # TODO: only do this as necessary? (based on allocated)

        # keep track of how many lines are available for progress bars.
        newly = len(active) - allocated if len(active) > allocated else 0
        # TODO: allow the user to allocate many at once.
        allocated += newly

        self.state = "began"

        s += "\0337"
        for pb in active:
            # TODO: respect 30 fps timers?
            s += f"\033[{pb.row};1H\033[K{pb.bar}"

        # prevent the user from accidentally printing over the progress bars.
        s += f"\033[1;{self.rows - allocated}r"
        s += "\0338"  # finally, return to the user's (properly adjusted) line and column.
        self.printer(end=s)

        if DEBUG:
            print(repr(s).replace("\\x1b", "\\033"))
            time.sleep(0.5)

    def progress(self, finishing=False):
        """Record a step and redraw the bar if it changed.

        Called with ``finishing=False`` just before an item is handed out and
        with ``finishing=True`` once the consumer is done with it, which is
        when the completed-item count is incremented.
        """
        if finishing:
            self.i += 1
            self.state = "began"
        else:
            self.state = "progressing"

        if not options["patient"] or self.needs_update:
            now = time.time()
            if now >= self.previous_update + 0.03333333333333333:  # limit to 30 fps
                self.previous_update = now
                s = f"\0337\033[{self.row};1H\033[K{self.bar}\0338"
                self.printer(end=s)

    def finish(self):
        """Stop timing, retire this bar, and erase its row.

        When this was the last active bar, the reserved area is cleared and
        the scrolling region is reset.
        """
        global allocated
        self.end = time.time()
        row = self.row  # we need this before we transition states
        assert self.depth == len(active) and active[-1] is self, "what"
        active.pop()
        self.state = "finished"

        s = ""
        if len(active) == 0:
            s += "\0337\033[J\033[r\0338"
            allocated = 0
        else:
            s += f"\0337\033[{row};1H\033[K\0338"
        self.printer(end=s)


def prog(it, *, file=sys.stderr, pref="*"):
    """Wrap the sized iterable *it* in a prepared `Prog80` that draws to *file*.

    *pref* is the rate unit preference handed to `Prog80`.

    Raises:
        TypeError: if ``len(it)`` is not supported.
    """

    def printy(*args, **kwargs):
        # route everything to the chosen stream, flushing as configured.
        print(*args, **kwargs, file=file, flush=options["flush"])

    try:
        length = len(it)
    except TypeError as err:
        raise TypeError(
            "prog(iterator) only works with iterators with a known length"
        ) from err
    return Prog80(it, length, printer=printy, preference=pref).prepare()