thursday/prog80.py

443 lines
16 KiB
Python
Raw Normal View History

import atexit
import sys
import time
# TODO: make this a namedtuple? or one of those newfangled dataclasses.
# Module-wide switches consulted while rendering.
options = {
    "ascii": False,  # TODO
    "color": True,  # Prog80.bar emits SGR color escapes when this is set
    "flush": True,  # forwarded as print(flush=...) by the printer prog() builds
    "patient": True,  # TODO: better name? (redraw only when Prog80.needs_update)
}
# Bars that have begun and not yet finished, in the order they began.
active = []
# How many terminal rows are currently reserved for progress bars.
allocated = 0
# When true, Prog80.begin() prints the repr of its escape string and pauses.
DEBUG = False
def get_terminal_size(file=sys.stderr):
    """Return the terminal size as a ``(cols, rows)`` pair.

    The size is first queried from the descriptor behind *file*. When that
    query fails, or reports a non-positive dimension, the result of
    ``shutil.get_terminal_size()`` is returned instead.

    Args:
        file: object whose ``fileno()`` identifies the terminal to measure.
            Objects that cannot provide a usable descriptor (``None``,
            in-memory streams, closed files) simply trigger the fallback.

    Returns:
        A ``(cols, rows)`` pair.
    """
    from os import get_terminal_size as os_gts

    cols, rows = 0, 0
    try:
        cols, rows = os_gts(file.fileno())
    except (AttributeError, OSError, ValueError):
        # AttributeError: `file` has no fileno() at all (e.g. None).
        # ValueError: fileno() was called on a closed file.
        # OSError: the descriptor is not a terminal, or the stream has no
        # descriptor (io.UnsupportedOperation is an OSError).
        # In every case we fall through to the shutil query below.
        pass
    if cols <= 0 or rows <= 0:
        from shutil import get_terminal_size as shutil_gts

        # NOTE: this uses sys.stdout instead of our file!
        cols, rows = shutil_gts()
    return cols, rows
def extend72(s, mode=""):
    """Stretch the pattern *s* into a 72-character bar string.

    Everything except the last character of *s* is repeated to fill the
    first 71 columns; the last character of *s* is placed in column 72.
    If *mode* contains any of the flags ``t``, ``d`` or ``a``, the final
    character of *s* is dropped before stretching.

    Raises:
        IndexError: if the pattern is empty (after the optional drop).
    """
    if any(flag in mode for flag in "tda"):
        s = s[:-1]
    first, last = s[0], s[-1]
    repeated = first + s[1:-1]
    return (repeated * 72)[:71] + last
@atexit.register  # in the absence of a try-except for KeyboardInterrupt...
def reset(*, file=sys.stderr):
    """Emit (or return) the escape sequence that undoes the bar layout.

    The sequence is: save cursor, erase from the cursor onward, reset the
    scrolling region, restore cursor. Because this function is registered
    with ``atexit``, it also runs once at interpreter exit with the default
    *file*.

    Args:
        file: stream to write the sequence to (flushed immediately). Pass
            ``None`` to get the sequence back as a string instead.

    Returns:
        The escape sequence when *file* is ``None``; otherwise ``None``.
    """
    # TODO: flesh this out?
    # TODO: fancy backspace "\b" stuff to hide potentially unsupported escape sequences.
    sequence = "\0337\033[J\033[r\0338"
    if file is None:
        return sequence
    print(end=sequence, file=file, flush=True)
def alloc(lines):
    """Build the escape string that makes room for *lines* progress-bar rows.

    Nothing is printed; the caller receives the string and emits it.

    Args:
        lines: number of rows to make room for.

    Returns:
        The escape/newline string described by the comments below.
    """
    pieces = [
        # first, perform a reset of whatever scrolling region may already be
        # set. the cursor is (almost) always reset after performing a reset
        # command, so wrap the command in a cursor-save and cursor-restore.
        "\0337\033[r\0338",
        # now, do some magic to ensure there's enough empty lines for the
        # progress bars *without* forcing the screen to scroll if there
        # already is: just print some newlines.
        # TODO: clear these lines too? hmm, that would overwrite bars though,
        # wouldn't it?
        "\n" * lines,
        # however, that resets the cursor's column, so restore its position.
        "\0338",
        # the position might not be on the same row as the line of text it
        # was on before, so move it down (clamped) and back up again.
        f"\033[{lines}B\033[{lines}A",
    ]
    # the adjusted cursor is deliberately NOT saved here ("\0337"); callers
    # store it themselves when they need it.
    return "".join(pieces)
class Prog80:
    """An 80-column progress bar wrapped around an iterable.

    Every rendered line consists of an 8-character "blurb" (elapsed time,
    ETA or rate, chosen by ``display``) followed by a 72-character bar.
    Iterating over an instance yields the wrapped items unchanged while
    drawing and refreshing the bar through ``printer``.

    Bars may be nested: each bar that has begun is appended to the
    module-level ``active`` list and is drawn on its own row at the bottom
    of the terminal, below the scrolling region that ``begin()`` installs.

    Lifecycle (enforced by the ``state`` setter):
    ``initialized`` -> ``prepared`` -> ``began`` <-> ``progressing``,
    and ``began`` -> ``finished``.
    """

    # TODO: use different(-ly placed) delimiters when length < 72?
    # Each style has an "unfilled" pattern A, a "filled" pattern B and an
    # optional string of single-letter mode flags, interpreted by `bar`:
    #   r  take the filled part from the right-hand end of pattern B
    #   P  take the filled part as a plain prefix of pattern B (one extra
    #      column while progressing)
    #   p  always draw the "progressing" form, even between items
    #   i  with colors enabled, draw pattern A only, in reverse video
    #   t / d / a  the last column of the filled pattern is swapped between
    #      alternative characters as progress advances (see `bar`)
    # NOTE(review): several patterns below are empty strings. extend72()
    # indexes s[0] and s[-1], so those styles raise IndexError when rendered;
    # presumably the original characters were lost somewhere -- confirm.
    styles = dict(
        # TODO: maybe use namedtuples for these?
        # TODO: maybe use a separate field for the "cursor" characters?
        # this ought to eliminate the need for "d", "t" and "P" modes.
        bar=dict(A=" ", B="█▌", mode="r"),
        bar2=dict(A="", B="██", mode="dp"),  # TODO: what's the non-'p' version?
        bracketed=dict(A="[" + " " * 70 + "]", B="[" + "=" * 70 + "]", mode="P"),
        circumslider=dict(A="o", B="", mode="pr"),
        crossslider=dict(A="", B="†‡", mode="pr"),
        dotslider=dict(A="-", B="", mode="pr"),
        dotted=dict(A=".", B="!?", mode="r"),
        equally=dict(A=".", B="="),
        fractional=dict(
            A="·····¬·····¬·····¼·····¬·····¬·····½·····¬·····¬·····¾·····¬·····¬·····!",
            B="",
            mode="ir",
        ),
        longdotslider=dict(A="", B="—÷", mode="pr"),
        meme=dict(  # slightly modified
            A="According to all known laws of aviation, there is no way a bee should be",
            B="able to fly. Its wings are too small to get its fat body off the ground.",
            mode="P",
        ),
        middledots=dict(A="", B=""),
        money=dict(A="¢", B="$"),
        # pacman=dict(A="-", B="#"),
        # bracketedpacman=dict(A="[" + "-" * 70 + "]", B="[" + "#" * 70 + "]", mode="P"),
        pacman=dict(A="[" + "-" * 70 + "]", B="[" + "#" * 70 + "]", mode="P"),
        piping=dict(A="=", B="=|", mode="pr"),
        ruler=dict(A="90123456780", B="90123456780", mode="P"),
        slider=dict(A="_", B="_⌂", mode="pr"),
        smile=dict(A="", B=""),
        song=dict(
            A=" - ♪ - ♪ - ♪ - ♪",
            B=" ♫ ♫ ♫ ♫ ♫ ♫ ♫ ♫ ♫ ♫ ♫ ♫ ♫ ♫ ♫ ♫ ♫ ♫",
        ),
        sticks=dict(A="/", B="\\|", mode="p"),
        virus=dict(A=".▀", B=" TECHNO ▄", mode="ap"),  # »Don't touch the keyboard«
    )

    # (preference, threshold, formatter) triples, scanned IN ORDER by
    # format_rate(): the first entry whose preference is "*" or equals the
    # requested one, and whose threshold is >= the rate (in items/second),
    # formats the rate. The order therefore matters and is not sorted by
    # threshold across preferences.
    bands = [  # TODO: come up with a better name for this variable.
        ("*", -4.94065645841246544177e-324, lambda x: "Waiting"),
        ("*", 0.0, lambda x: "Running"),
        ("*", 1.3888888888888887e-08, lambda x: "TooSlow"),
        ("*", 0.00027776388888888886, lambda x: f"{x * 60 * 60:6.4f}/h"[1:]),
        ("*", 0.001666583333333333, lambda x: f"{x * 60 * 60:5.3f}/h"),
        ("s", 0.01666583333333333, lambda x: f"{x * 60:6.4f}/m"[1:]),
        ("m", 0.01666583333333333, lambda x: f"{x * 60:6.4f}/m"[1:]),
        ("s", 0.09995, lambda x: f"{x * 60:5.3f}/m"),
        ("s", 0.9999499999999999, lambda x: f"{x:6.4f}/s"[1:]),
        ("m", 0.16665833333333332, lambda x: f"{x * 60:5.3f}/m"),
        ("m", 1.666583333333333, lambda x: f"{x * 60:5.2f}/m"),
        ("s", 9.999499999999998, lambda x: f"{x:5.3f}/s"),
        ("s", 12.000833333333333, lambda x: f"{x:5.2f}/s"),
        ("m", 16.66583333333333, lambda x: f"{x * 60:5.1f}/m"),
        ("m", 166.6583333333333, lambda x: f"{x * 60:4.0f}./m"),
        ("*", 0.002777638888888889, lambda x: f"{x * 60 * 60:5.3f}/h"),
        ("*", 0.027776388888888885, lambda x: f"{x * 60 * 60:5.2f}/h"),
        ("*", 0.20001388888888888, lambda x: f"{x * 60 * 60:5.1f}/h"),
        ("h", 0.2777638888888888, lambda x: f"{x * 60 * 60:5.1f}/h"),
        ("h", 2.7776388888888883, lambda x: f"{x * 60 * 60:4.0f}./h"),
        ("*", 1.666583333333333, lambda x: f"{x * 60:5.2f}/m"),
        ("*", 12.000833333333333, lambda x: f"{x * 60:5.1f}/m"),
        ("*", 99.99499999999999, lambda x: f"{x:5.2f}/s"),
        ("*", 999.9499999999999, lambda x: f"{x:5.1f}/s"),
        ("*", 9999.499999999998, lambda x: f"{x:4.0f}./s"),
        ("*", 1.7976931348623157e308, lambda x: "TooFast"),
    ]

    # TODO: default to sys.stderr as well!
    def __init__(
        self,
        it,
        length,
        *,
        printer=print,
        style=None,
        display="elapsed",
        preference="*",
    ):
        """Wrap *it*, which is expected to produce *length* items.

        Args:
            it: iterable to wrap; ``iter(it)`` is taken immediately.
            length: number of items used to scale the bar. A length of 0
                disables drawing altogether (see ``__iter__``).
            printer: ``print``-like callable; it is always invoked as
                ``printer(end=text)``.
            style: key into ``styles``; unknown or missing styles silently
                fall back to ``"fractional"``.
            display: what the 8-character blurb shows: ``"elapsed"``,
                ``"eta"`` or ``"rate"``.
            preference: unit preference forwarded to ``format_rate`` /
                ``format_time``.
        """
        self.i = 0  # number of items completed so far
        self.it = iter(it)
        self.length = length
        self.printer = printer
        self.depth = 0  # 1-based nesting depth, assigned by begin()
        self.start, self.end = 0, 0  # time.time() stamps set by begin()/finish()
        self.cols, self.rows = 0, 0  # terminal size, refreshed by begin()
        self.preference = preference
        self.previous_update = 0.0  # time of the last redraw (for the fps limit)
        self._state = None
        self.state = "initialized"
        self.style = style if style in self.styles else "fractional"  # TODO: warning
        # self.style = style if style in self.styles else "slider"  # TODO: warning
        # self.style = style if style in self.styles else list(self.styles)[-1]
        self.display = display
        assert self.display in ("elapsed", "eta", "rate")

    @property
    def state(self):
        """Current lifecycle state (a string, or ``None`` before ``__init__``)."""
        return self._state

    @state.setter
    def state(self, new_state):
        """Move to *new_state*, asserting that the transition is legal.

        Re-assigning the current state is silently ignored.
        """
        if self._state is not None and new_state == self._state:
            return  # just ignore this
        # maps each state to the set of states it may be entered from.
        valid = {
            "initialized": {None},
            "prepared": {"initialized"},
            "began": {"prepared", "progressing"},
            "progressing": {"began"},
            "finished": {"began"},
        }
        assert new_state in valid, f"unknown state {new_state}"
        assert (
            self._state in valid[new_state]
        ), f"invalid state transition from {self._state} to {new_state}"
        self._state = new_state

    def __len__(self):
        """Return the length given at construction."""
        return self.length

    @property
    def unfilled(self):
        """The style's ``A`` pattern stretched to 72 columns."""
        return extend72(self.styles[self.style]["A"], self.styling)

    @property
    def filled(self):
        """The style's ``B`` pattern stretched to 72 columns."""
        return extend72(self.styles[self.style]["B"], self.styling)

    @property
    def styling(self):
        """Mode-flag string of the current style (``""`` if it has none)."""
        return self.styles[self.style].get("mode", "")

    @property
    def full(self):
        """Number of filled columns (0..72); 72 when the length is not positive."""
        return self.i * 72 // self.length if self.length > 0 else 72

    @property
    def needs_update(self):
        """Whether the bar looks different than it did one item ago.

        Styles with the ``t`` / ``d`` flags have 3 / 2 sub-steps per column,
        so they are compared at a correspondingly finer resolution.
        """
        if self.length <= 0:
            return True
        w = 72 * 3 if "t" in self.styling else 72 * 2 if "d" in self.styling else 72
        return self.i * w // self.length != (self.i - 1) * w // self.length

    @property
    def elapsed(self):
        """Seconds since ``start``: up to ``end`` once finished, else up to now."""
        now = self.end if self.state == "finished" else time.time()
        return now - self.start

    @property
    def rate(self):
        """Completed items per elapsed second.

        Returns the largest threshold in ``bands`` (the largest finite
        float) when no time has elapsed at all.
        """
        assert self.state in (
            "began",
            "progressing",
            "finished",
        ), "invalid state for this"
        # FIX: this used to read the misspelled `self.elasped`, which raised
        # AttributeError on every call.
        elapsed = self.elapsed
        if elapsed == 0.0:
            return self.bands[-1][1]  # closest thing to infinity
        return self.i / elapsed

    @property
    def eta(self):
        """Not implemented yet; always raises ``NotImplementedError``."""
        raise NotImplementedError("TODO")

    @classmethod
    def format_rate(cls, rate, preference="*"):
        """Format *rate* (items/second) using the first matching band.

        Returns the band's text plus one trailing space, or ``"????????"``
        when no band matches.
        """
        for pref, threshold, formatter in cls.bands:
            if (pref == "*" or pref == preference) and rate <= threshold:
                return formatter(rate) + " "
        return "????????"  # normally unreachable (unless perhaps rate is Inf or NaN?)

    @classmethod
    def format_time(cls, duration, preference="*"):
        """Format *duration* (seconds) as a signed 8-character string.

        The unit pair (seconds, m+s, h+m, d+h) is picked from the magnitude;
        durations of 100 days or more yield ``"TooLong"``. *preference* is
        accepted for symmetry with ``format_rate`` but currently unused.
        """
        o = "-" if duration < 0.0 else "+"
        duration = abs(duration)
        if duration < 60.0:
            # the small bias keeps the 2-decimal rounding from displaying
            # 60.00 for durations just under a minute.
            o += f"{max(0.0, duration - 0.005):5.2f}s "
        elif duration < 3600.0:
            x = int(duration)
            o += f"{x // 60:2d}m{x % 60:02d}s "
        elif duration < 86400.0:
            x = int(duration)
            o += f"{x // 3600:2d}h{x // 60 % 60:02d}m "
        elif duration < 8640000.0:
            x = int(duration)
            o += f"{x // 86400:2d}d{x // 3600 % 24:02d}h "
        else:
            o += "TooLong"
        return o

    @property
    def blurb(self):  # only the first 8 characters that don't constitute the bar at all
        """The text shown before the bar, selected by ``display``."""
        if self.display == "rate":
            return self.format_rate(self.rate, self.preference)
        elif self.display == "elapsed":
            return self.format_time(self.elapsed, self.preference)
        elif self.display == "eta":
            # negated so that the remaining time is shown with a "-" sign.
            return self.format_time(-self.eta, self.preference)

    @property
    def bar(self):  # the full 80 characters available in any state
        """Render the complete line (blurb + bar) for the current state.

        With ``options["color"]`` set, SGR escapes are embedded: reverse
        video for ``i`` styles, bold for the filled part otherwise.
        """
        if options["color"]:
            color1 = "\033[7m"
            color2 = "\033[m\033[1m"
            color_reset = "\033[m"
        else:
            color1 = ""
            color2 = ""
            color_reset = ""
        if self.state in ("began", "finished", "progressing"):
            s = self.blurb
            if options["color"] and "i" in self.styling:
                # "inverse" styles draw pattern A only: the completed part
                # in reverse video, the remainder plain.
                left = self.unfilled[: self.full]
                right = self.unfilled[self.full :]
                if self.state == "progressing" or "p" in self.styling:
                    # highlight the column currently being worked on.
                    s += color1 + left + color2 + right[0]
                    s += color_reset + right[1:]
                else:
                    s += color1 + left
                    s += color_reset + right
            else:
                filled = self.filled
                style = self.styles[self.style]
                progressing = self.state == "progressing" or "p" in self.styling
                # candidate characters for the last column of `filled`.
                t = (filled[-1], style["A"][-1], style["B"][-1])
                if "t" in self.styling and self.length > 0:
                    triple = self.i * 72 * 3 // self.length
                    filled = filled[:-1] + t[triple % 3]
                elif "d" in self.styling and self.length > 0:
                    double = self.i * 72 * 2 // self.length
                    filled = filled[:-1] + t[double % 2 + 1]
                elif "a" in self.styling and self.length > 0:
                    filled = filled[:-1] + t[self.full % 2 + 1]
                s += color2
                if progressing:
                    # one extra column is drawn for the item in flight.
                    if "r" in self.styling:
                        s += filled[-self.full - 1 :]
                    elif "P" in self.styling:
                        s += filled[: self.full + 1]
                    else:
                        s += filled[: self.full] + filled[-1]
                    s += color_reset + self.unfilled[self.full + 1 :]
                else:
                    # guard: filled[-0:] would be the whole string.
                    if "r" in self.styling and self.full > 0:
                        s += filled[-self.full :]
                    else:
                        s += filled[: self.full]
                    s += color_reset + self.unfilled[self.full :]
        elif self.state == "prepared":
            s = "Ready..." + " " * 72
        else:
            s = "watdafuk" + "?" * 72
        # assert len(s) == 80 + len(color2) + len(color_reset), (self.full, self.state, repr(s))
        return s

    @property
    def row(self):
        """1-based terminal row this bar is drawn on (bars occupy the bottom rows)."""
        assert self.state in ("began", "progressing"), "invalid state for this"
        return self.rows - allocated + self.depth

    def __iter__(self):
        """Yield the wrapped items, drawing the bar around each one.

        With a length of 0 nothing is drawn and the items are passed
        through untouched.
        """
        # TODO: check env-var override for disabling and colors!
        if self.length == 0:
            # FIX: this method is a generator, so the former
            # `return self.it` threw the iterator away and silently dropped
            # any items it held. Delegate to it instead.
            yield from self.it
            return
        if self.i == 0:
            self.begin()
        for x in self.it:
            self.progress(False)
            yield x
            self.progress(True)
        # NOTE(review): nothing in this class reads `progressing`; kept in
        # case external code looks at it.
        self.progressing = False
        self.finish()

    # TODO: is there a case where i actually need this?
    # def __next__(self):
    #     pass

    def prepare(self):
        """Enter the ``prepared`` state and return ``self`` (for chaining)."""
        self.state = "prepared"
        return self

    def begin(self):
        """Register this bar, reserve a row for it and draw all active bars."""
        global allocated
        self.start = time.time()
        assert self not in active, "what"
        active.append(self)
        self.depth = len(active)
        (_cols, _rows) = get_terminal_size()
        # propagate terminal size to parents (and self) in case it has since changed:
        for pb in active:
            pb.cols, pb.rows = _cols, _rows
        # TODO: this should be handleable in most cases; hide the top bars for a while.
        # need at least 1 blank line for regular output (or just for the cursor to sit)
        # i guess just act as though only a subset of `active` are active for a while?
        assert self.rows - 1 > self.depth, "ran out of space for progress bars"
        s = alloc(self.depth)  # TODO: only do this as necessary? (based on allocated)
        # keep track of how many lines are available for progress bars.
        newly = len(active) - allocated if len(active) > allocated else 0
        # TODO: allow the user to allocate many at once.
        allocated += newly
        self.state = "began"
        s += "\0337"  # save the (adjusted) cursor before jumping to the bar rows.
        if 1:
            for pb in active:
                # TODO: respect 30 fps timers?
                s += f"\033[{pb.row};1H\033[K{pb.bar}"
        else:  # this isn't as flexible, but it works...?
            # FIXME: progress() is still broken!
            for pb in active:
                s += f"\n\033[K{pb.bar}"
        # prevent the user from accidentally printing over the progress bars.
        s += f"\033[1;{self.rows - allocated}r"
        s += "\0338"  # finally, return to the user's (properly adjusted) line and column.
        self.printer(end=s)
        # time.sleep(2); exit()
        if DEBUG:
            print(repr(s).replace("\\x1b", "\\033"))
            time.sleep(0.5)

    def progress(self, finishing=False):
        """Record progress on one item and redraw the bar if warranted.

        Args:
            finishing: ``False`` just before an item is handed out (state
                becomes ``progressing``); ``True`` once it has been consumed
                (the counter is incremented and state returns to ``began``).
        """
        if finishing:
            self.i += 1
            self.state = "began"
        else:
            self.state = "progressing"
        if not options["patient"] or self.needs_update:
            now = time.time()
            if now >= self.previous_update + 0.03333333333333333:  # limit to 30 fps
                self.previous_update = now
                # save cursor, jump to our row, clear it, draw, restore cursor.
                s = f"\0337\033[{self.row};1H\033[K{self.bar}\0338"
                self.printer(end=s)

    def finish(self):
        """Unregister this bar and erase it from the screen.

        When it was the last active bar, everything below the cursor is
        erased, the scrolling region is reset and the row allocation is
        cleared; otherwise only this bar's row is cleared.
        """
        global allocated
        self.end = time.time()
        row = self.row  # we need this before we transition states
        assert self.depth == len(active) and active[-1] is self, "what"
        active.pop()
        self.state = "finished"
        s = ""
        if len(active) == 0:
            s += "\0337\033[J\033[r\0338"
            allocated = 0
        else:
            s += f"\0337\033[{row};1H\033[K\0338"
        self.printer(end=s)
def prog(it, *, file=sys.stderr, pref="*"):
    """Wrap the sized iterable *it* in a prepared ``Prog80`` progress bar.

    Args:
        it: iterable supporting ``len()``.
        file: stream the bar is printed to.
        pref: unit preference handed to ``Prog80`` as ``preference``.

    Returns:
        A ``Prog80`` instance on which ``prepare()`` has been called.

    Raises:
        TypeError: if ``len(it)`` is not supported.
    """
    try:
        length = len(it)
    except TypeError:
        raise TypeError("prog(iterator) only works with iterators with a known length")

    def _emit(*args, **kwargs):
        # every write goes to `file`; flushing follows the module options.
        print(*args, **kwargs, file=file, flush=options["flush"])

    return Prog80(it, length, printer=_emit, preference=pref).prepare()