443 lines
16 KiB
Python
443 lines
16 KiB
Python
|
import atexit
|
||
|
import sys
|
||
|
import time
|
||
|
|
||
|
# TODO: make this a namedtuple? or one of those newfangles dataclasses.
|
||
|
options = dict(
|
||
|
ascii=False, # TODO
|
||
|
color=True,
|
||
|
flush=True,
|
||
|
patient=True, # TODO: better name?
|
||
|
)
|
||
|
|
||
|
active = []
|
||
|
allocated = 0
|
||
|
|
||
|
DEBUG = False
|
||
|
|
||
|
|
||
|
def get_terminal_size(file=sys.stderr):
|
||
|
from os import get_terminal_size as os_gts
|
||
|
|
||
|
cols, rows = 0, 0
|
||
|
try:
|
||
|
cols, rows = os_gts(file.fileno())
|
||
|
except OSError:
|
||
|
pass
|
||
|
if cols <= 0 or rows <= 0:
|
||
|
from shutil import get_terminal_size as shutil_gts
|
||
|
|
||
|
# NOTE: this uses sys.stdout instead of our file!
|
||
|
cols, rows = shutil_gts()
|
||
|
return cols, rows
|
||
|
|
||
|
|
||
|
def extend72(s, mode=""):
|
||
|
if "t" in mode or "d" in mode or "a" in mode:
|
||
|
s = s[:-1]
|
||
|
return ((s[0] + s[1:-1]) * 72)[:71] + s[-1]
|
||
|
|
||
|
|
||
|
@atexit.register # in the absence of a try-except for KeyboardInterrupt...
|
||
|
def reset(*, file=sys.stderr):
|
||
|
# TODO: flesh this out?
|
||
|
# TODO: fancy backspace "\b" stuff to hide potentially unsupported escape sequences.
|
||
|
s = "\0337\033[J\033[r\0338"
|
||
|
if file is not None:
|
||
|
print(end=s, file=file, flush=True)
|
||
|
else:
|
||
|
return s
|
||
|
|
||
|
|
||
|
def alloc(lines):
|
||
|
# first, perform a reset of whatever scrolling region may already be set.
|
||
|
# the cursor is (almost) always reset after performing a reset command,
|
||
|
# so wrap the command in a cursor-save and cursor-restore.
|
||
|
s = "\0337\033[r\0338"
|
||
|
|
||
|
# now, do some magic to ensure there's enough empty lines for the progress bars
|
||
|
# *without* forcing the screen to scroll if there already is.
|
||
|
# TODO: clear these lines too? hmm, that would overwrite bars though, wouldn't it?
|
||
|
s += "\n" * lines # this part is simple enough; just print some newlines.
|
||
|
s += "\0338" # however, that resets the cursor's column, so restore its position.
|
||
|
# the position might not be on the same row as the line of text it was on before,
|
||
|
s += f"\033[{lines}B\033[{lines}A" # so move it down (clamped) and back up again.
|
||
|
# s += "\0337" # that should be everything, so store the adjusted cursor for later.
|
||
|
|
||
|
return s
|
||
|
|
||
|
|
||
|
class Prog80:
|
||
|
# TODO: use different(-ly placed) delimiters when length < 72?
|
||
|
styles = dict(
|
||
|
# TODO: maybe use namedtuples for these?
|
||
|
# TODO: maybe use a separate field for the "cursor" characters?
|
||
|
# this ought to eliminate the need for "d", "t" and "P" modes.
|
||
|
bar=dict(A=" ", B="█▌", mode="r"),
|
||
|
bar2=dict(A=" ▌", B="██", mode="dp"), # TODO: what's the non-'p' version?
|
||
|
bracketed=dict(A="[" + " " * 70 + "]", B="[" + "=" * 70 + "]", mode="P"),
|
||
|
circumslider=dict(A="o", B="oô", mode="pr"),
|
||
|
crossslider=dict(A="†", B="†‡", mode="pr"),
|
||
|
dotslider=dict(A="-", B="-÷", mode="pr"),
|
||
|
dotted=dict(A=".", B="!?", mode="r"),
|
||
|
equally=dict(A=".", B="="),
|
||
|
fractional=dict(
|
||
|
A="·····¬·····¬·····¼·····¬·····¬·····½·····¬·····¬·····¾·····¬·····¬·····!",
|
||
|
B="xƒ",
|
||
|
mode="ir",
|
||
|
),
|
||
|
longdotslider=dict(A="—", B="—÷", mode="pr"),
|
||
|
meme=dict( # slightly modified
|
||
|
A="According to all known laws of aviation, there is no way a bee should be",
|
||
|
B="able to fly. Its wings are too small to get its fat body off the ground.",
|
||
|
mode="P",
|
||
|
),
|
||
|
middledots=dict(A="•", B="◘"),
|
||
|
money=dict(A="¢", B="$"),
|
||
|
# pacman=dict(A="-", B="#"),
|
||
|
# bracketedpacman=dict(A="[" + "-" * 70 + "]", B="[" + "#" * 70 + "]", mode="P"),
|
||
|
pacman=dict(A="[" + "-" * 70 + "]", B="[" + "#" * 70 + "]", mode="P"),
|
||
|
piping=dict(A="=", B="=|", mode="pr"),
|
||
|
ruler=dict(A="90123456780", B="90123456780", mode="P"),
|
||
|
slider=dict(A="_", B="_⌂", mode="pr"),
|
||
|
smile=dict(A="☺", B="☻"),
|
||
|
song=dict(
|
||
|
A=" - ♪ - ♪ - ♪ - ♪",
|
||
|
B=" ♫ ♫ ♫ ♫ ♫ ♫ ♫ ♫ ♫ ♫ ♫ ♫ ♫ ♫ ♫ ♫ ♫ ♫",
|
||
|
),
|
||
|
sticks=dict(A="/", B="\\|", mode="p"),
|
||
|
virus=dict(A=".▀", B=" TECHNO ▄", mode="ap"), # »Don't touch the keyboard«
|
||
|
)
|
||
|
|
||
|
bands = [ # TODO: come up with a better name for this variable.
|
||
|
("*", -4.94065645841246544177e-324, lambda x: "Waiting"),
|
||
|
("*", 0.0, lambda x: "Running"),
|
||
|
("*", 1.3888888888888887e-08, lambda x: "TooSlow"),
|
||
|
("*", 0.00027776388888888886, lambda x: f"{x * 60 * 60:6.4f}/h"[1:]),
|
||
|
("*", 0.001666583333333333, lambda x: f"{x * 60 * 60:5.3f}/h"),
|
||
|
("s", 0.01666583333333333, lambda x: f"{x * 60:6.4f}/m"[1:]),
|
||
|
("m", 0.01666583333333333, lambda x: f"{x * 60:6.4f}/m"[1:]),
|
||
|
("s", 0.09995, lambda x: f"{x * 60:5.3f}/m"),
|
||
|
("s", 0.9999499999999999, lambda x: f"{x:6.4f}/s"[1:]),
|
||
|
("m", 0.16665833333333332, lambda x: f"{x * 60:5.3f}/m"),
|
||
|
("m", 1.666583333333333, lambda x: f"{x * 60:5.2f}/m"),
|
||
|
("s", 9.999499999999998, lambda x: f"{x:5.3f}/s"),
|
||
|
("s", 12.000833333333333, lambda x: f"{x:5.2f}/s"),
|
||
|
("m", 16.66583333333333, lambda x: f"{x * 60:5.1f}/m"),
|
||
|
("m", 166.6583333333333, lambda x: f"{x * 60:4.0f}./m"),
|
||
|
("*", 0.002777638888888889, lambda x: f"{x * 60 * 60:5.3f}/h"),
|
||
|
("*", 0.027776388888888885, lambda x: f"{x * 60 * 60:5.2f}/h"),
|
||
|
("*", 0.20001388888888888, lambda x: f"{x * 60 * 60:5.1f}/h"),
|
||
|
("h", 0.2777638888888888, lambda x: f"{x * 60 * 60:5.1f}/h"),
|
||
|
("h", 2.7776388888888883, lambda x: f"{x * 60 * 60:4.0f}./h"),
|
||
|
("*", 1.666583333333333, lambda x: f"{x * 60:5.2f}/m"),
|
||
|
("*", 12.000833333333333, lambda x: f"{x * 60:5.1f}/m"),
|
||
|
("*", 99.99499999999999, lambda x: f"{x:5.2f}/s"),
|
||
|
("*", 999.9499999999999, lambda x: f"{x:5.1f}/s"),
|
||
|
("*", 9999.499999999998, lambda x: f"{x:4.0f}./s"),
|
||
|
("*", 1.7976931348623157e308, lambda x: f"TooFast"),
|
||
|
]
|
||
|
|
||
|
# TODO: default to sys.stderr as well!
|
||
|
def __init__(
|
||
|
self,
|
||
|
it,
|
||
|
length,
|
||
|
*,
|
||
|
printer=print,
|
||
|
style=None,
|
||
|
display="elapsed",
|
||
|
preference="*",
|
||
|
):
|
||
|
self.i = 0
|
||
|
self.it = iter(it)
|
||
|
self.length = length
|
||
|
self.printer = printer
|
||
|
self.depth = 0
|
||
|
self.start, self.end = 0, 0
|
||
|
self.cols, self.rows = 0, 0
|
||
|
self.preference = preference
|
||
|
self.previous_update = 0.0
|
||
|
self._state = None
|
||
|
self.state = "initialized"
|
||
|
self.style = style if style in self.styles else "fractional" # TODO: warning
|
||
|
# self.style = style if style in self.styles else "slider" # TODO: warning
|
||
|
# self.style = style if style in self.styles else list(self.styles)[-1]
|
||
|
self.display = display
|
||
|
assert self.display in ("elapsed", "eta", "rate")
|
||
|
|
||
|
@property
|
||
|
def state(self):
|
||
|
return self._state
|
||
|
|
||
|
@state.setter
|
||
|
def state(self, new_state):
|
||
|
if self._state is not None and new_state == self._state:
|
||
|
return # just ignore this
|
||
|
valid = {
|
||
|
"initialized": {None},
|
||
|
"prepared": {"initialized"},
|
||
|
"began": {"prepared", "progressing"},
|
||
|
"progressing": {"began"},
|
||
|
"finished": {"began"},
|
||
|
}
|
||
|
assert new_state in valid, f"unknown state {new_state}"
|
||
|
assert (
|
||
|
self._state in valid[new_state]
|
||
|
), f"invalid state transition from {self._state} to {new_state}"
|
||
|
self._state = new_state
|
||
|
|
||
|
def __len__(self):
|
||
|
return self.length
|
||
|
|
||
|
@property
|
||
|
def unfilled(self):
|
||
|
return extend72(self.styles[self.style]["A"], self.styling)
|
||
|
|
||
|
@property
|
||
|
def filled(self):
|
||
|
return extend72(self.styles[self.style]["B"], self.styling)
|
||
|
|
||
|
@property
|
||
|
def styling(self):
|
||
|
return self.styles[self.style].get("mode", "")
|
||
|
|
||
|
@property
|
||
|
def full(self):
|
||
|
return self.i * 72 // self.length if self.length > 0 else 72
|
||
|
|
||
|
@property
|
||
|
def needs_update(self):
|
||
|
if self.length <= 0:
|
||
|
return True
|
||
|
w = 72 * 3 if "t" in self.styling else 72 * 2 if "d" in self.styling else 72
|
||
|
return self.i * w // self.length != (self.i - 1) * w // self.length
|
||
|
|
||
|
@property
|
||
|
def elapsed(self):
|
||
|
now = self.end if self.state == "finished" else time.time()
|
||
|
return now - self.start
|
||
|
|
||
|
@property
|
||
|
def rate(self):
|
||
|
assert self.state in (
|
||
|
"began",
|
||
|
"progressing",
|
||
|
"finished",
|
||
|
), "invalid state for this"
|
||
|
elapsed = self.elasped
|
||
|
if elapsed == 0.0:
|
||
|
return self.bands[-1][1] # closest thing to infinity
|
||
|
return self.i / elapsed
|
||
|
|
||
|
@property
|
||
|
def eta(self):
|
||
|
raise NotImplementedError("TODO")
|
||
|
|
||
|
@classmethod
|
||
|
def format_rate(cls, rate, preference="*"):
|
||
|
for pref, threshold, formater in cls.bands:
|
||
|
if (pref == "*" or pref == preference) and rate <= threshold:
|
||
|
return formater(rate) + " "
|
||
|
return "????????" # normally unreachable (unless perhaps rate is Inf or NaN?)
|
||
|
|
||
|
@classmethod
|
||
|
def format_time(cls, duration, preference="*"):
|
||
|
o = "-" if duration < 0.0 else "+"
|
||
|
duration = abs(duration)
|
||
|
if duration < 60.0:
|
||
|
o += f"{max(0.0, duration - 0.005):5.2f}s "
|
||
|
elif duration < 3600.0:
|
||
|
x = int(duration)
|
||
|
o += f"{x // 60:2d}m{x % 60:02d}s "
|
||
|
elif duration < 86400.0:
|
||
|
x = int(duration)
|
||
|
o += f"{x // 3600:2d}h{x // 60 % 60:02d}m "
|
||
|
elif duration < 8640000.0:
|
||
|
x = int(duration)
|
||
|
o += f"{x // 86400:2d}d{x // 3600 % 24:02d}h "
|
||
|
else:
|
||
|
o += "TooLong"
|
||
|
return o
|
||
|
|
||
|
@property
|
||
|
def blurb(self): # only the first 8 characters that don't constitute the bar at all
|
||
|
if self.display == "rate":
|
||
|
return self.format_rate(self.rate, self.preference)
|
||
|
elif self.display == "elapsed":
|
||
|
return self.format_time(self.elapsed, self.preference)
|
||
|
elif self.display == "eta":
|
||
|
return self.format_time(-self.eta, self.preference)
|
||
|
|
||
|
@property
|
||
|
def bar(self): # the full 80 characters available in any state
|
||
|
if options["color"]:
|
||
|
color1 = "\033[7m"
|
||
|
color2 = "\033[m\033[1m"
|
||
|
color_reset = "\033[m"
|
||
|
else:
|
||
|
color1 = ""
|
||
|
color2 = ""
|
||
|
color_reset = ""
|
||
|
|
||
|
if self.state in ("began", "finished", "progressing"):
|
||
|
s = self.blurb
|
||
|
if options["color"] and "i" in self.styling:
|
||
|
left = self.unfilled[: self.full]
|
||
|
right = self.unfilled[self.full :]
|
||
|
if self.state == "progressing" or "p" in self.styling:
|
||
|
s += color1 + left + color2 + right[0]
|
||
|
s += color_reset + right[1:]
|
||
|
else:
|
||
|
s += color1 + left
|
||
|
s += color_reset + right
|
||
|
|
||
|
else:
|
||
|
filled = self.filled
|
||
|
style = self.styles[self.style]
|
||
|
progressing = self.state == "progressing" or "p" in self.styling
|
||
|
t = (filled[-1], style["A"][-1], style["B"][-1])
|
||
|
if "t" in self.styling and self.length > 0:
|
||
|
triple = self.i * 72 * 3 // self.length
|
||
|
filled = filled[:-1] + t[triple % 3]
|
||
|
elif "d" in self.styling and self.length > 0:
|
||
|
double = self.i * 72 * 2 // self.length
|
||
|
filled = filled[:-1] + t[double % 2 + 1]
|
||
|
elif "a" in self.styling and self.length > 0:
|
||
|
filled = filled[:-1] + t[self.full % 2 + 1]
|
||
|
|
||
|
s += color2
|
||
|
if progressing:
|
||
|
if "r" in self.styling:
|
||
|
s += filled[-self.full - 1 :]
|
||
|
elif "P" in self.styling:
|
||
|
s += filled[: self.full + 1]
|
||
|
else:
|
||
|
s += filled[: self.full] + filled[-1]
|
||
|
s += color_reset + self.unfilled[self.full + 1 :]
|
||
|
else:
|
||
|
if "r" in self.styling and self.full > 0:
|
||
|
s += filled[-self.full :]
|
||
|
else:
|
||
|
s += filled[: self.full]
|
||
|
s += color_reset + self.unfilled[self.full :]
|
||
|
elif self.state == "prepared":
|
||
|
s = "Ready..." + " " * 72
|
||
|
else:
|
||
|
s = "watdafuk" + "?" * 72
|
||
|
# assert len(s) == 80 + len(color2) + len(color_reset), (self.full, self.state, repr(s))
|
||
|
return s
|
||
|
|
||
|
@property
|
||
|
def row(self):
|
||
|
assert self.state in ("began", "progressing"), "invalid state for this"
|
||
|
return self.rows - allocated + self.depth
|
||
|
|
||
|
def __iter__(self):
|
||
|
# TODO: check env-var override for disabling and colors!
|
||
|
if self.length == 0:
|
||
|
return self.it
|
||
|
if self.i == 0:
|
||
|
self.begin()
|
||
|
for x in self.it:
|
||
|
self.progress(False)
|
||
|
yield x
|
||
|
self.progress(True)
|
||
|
self.progressing = False
|
||
|
self.finish()
|
||
|
|
||
|
# TODO: is there a case where i actually need this?
|
||
|
# def __next__(self):
|
||
|
# pass
|
||
|
|
||
|
def prepare(self):
|
||
|
self.state = "prepared"
|
||
|
return self
|
||
|
|
||
|
def begin(self):
|
||
|
global allocated
|
||
|
self.start = time.time()
|
||
|
assert self not in active, "what"
|
||
|
active.append(self)
|
||
|
self.depth = len(active)
|
||
|
|
||
|
(_cols, _rows) = get_terminal_size()
|
||
|
# propagate terminal size to parents (and self) in case it has since changed:
|
||
|
for pb in active:
|
||
|
pb.cols, pb.rows = _cols, _rows
|
||
|
|
||
|
# TODO: this should be handleable in most cases; hide the top bars for a while.
|
||
|
# need at least 1 blank line for regular output (or just for the cursor to sit)
|
||
|
# i guess just act as though only a subset of `active` are active for a while?
|
||
|
assert self.rows - 1 > self.depth, "ran out of space for progress bars"
|
||
|
|
||
|
s = alloc(self.depth) # TODO: only do this as necessary? (based on allocated)
|
||
|
|
||
|
# keep track of how many lines are available for progress bars.
|
||
|
newly = len(active) - allocated if len(active) > allocated else 0
|
||
|
# TODO: allow the user to allocate many at once.
|
||
|
allocated += newly
|
||
|
self.state = "began"
|
||
|
|
||
|
s += "\0337"
|
||
|
if 1:
|
||
|
for pb in active:
|
||
|
# TODO: respect 30 fps timers?
|
||
|
s += f"\033[{pb.row};1H\033[K{pb.bar}"
|
||
|
else: # this isn't as flexible, but it works...?
|
||
|
# FIXME: progress() is still broken!
|
||
|
for pb in active:
|
||
|
s += f"\n\033[K{pb.bar}"
|
||
|
# prevent the user from accidentally printing over the progress bars.
|
||
|
s += f"\033[1;{self.rows - allocated}r"
|
||
|
s += "\0338" # finally, return to the user's (properly adjusted) line and column.
|
||
|
self.printer(end=s)
|
||
|
# time.sleep(2); exit()
|
||
|
|
||
|
if DEBUG:
|
||
|
print(repr(s).replace("\\x1b", "\\033"))
|
||
|
time.sleep(0.5)
|
||
|
|
||
|
def progress(self, finishing=False):
|
||
|
if finishing:
|
||
|
self.i += 1
|
||
|
self.state = "began"
|
||
|
else:
|
||
|
self.state = "progressing"
|
||
|
|
||
|
if not options["patient"] or self.needs_update:
|
||
|
now = time.time()
|
||
|
if now >= self.previous_update + 0.03333333333333333: # limit to 30 fps
|
||
|
self.previous_update = now
|
||
|
s = f"\0337\033[{self.row};1H\033[K{self.bar}\0338"
|
||
|
self.printer(end=s)
|
||
|
|
||
|
def finish(self):
|
||
|
global allocated
|
||
|
self.end = time.time()
|
||
|
row = self.row # we need this before we transition states
|
||
|
assert self.depth == len(active) and active[-1] is self, "what"
|
||
|
active.pop()
|
||
|
self.state = "finished"
|
||
|
s = ""
|
||
|
if len(active) == 0:
|
||
|
s += "\0337\033[J\033[r\0338"
|
||
|
allocated = 0
|
||
|
else:
|
||
|
s += f"\0337\033[{row};1H\033[K\0338"
|
||
|
self.printer(end=s)
|
||
|
|
||
|
|
||
|
def prog(it, *, file=sys.stderr, pref="*"):
|
||
|
def printy(*args, **kwargs):
|
||
|
# print(repr(kwargs.get("end", "?").replace("\\x1b", "\\033")))
|
||
|
# time.sleep(2.5)
|
||
|
print(*args, **kwargs, file=file, flush=options["flush"])
|
||
|
# time.sleep(2.5)
|
||
|
|
||
|
try:
|
||
|
l = len(it)
|
||
|
except TypeError:
|
||
|
raise TypeError("prog(iterator) only works with iterators with a known length")
|
||
|
return Prog80(it, l, printer=printy, preference=pref).prepare()
|