thursday/utils_np.py

314 lines
11 KiB
Python

# i've separated numpy-dependent methods from the rest of the utils.
from time import time
from utils import AcquireForWriting, merge_summaries, feps, m33, m34, m93
import numpy as np
def project(p, a, eps=1e-8):
    """Move from ``p`` towards ``a``, stopping at the unit hypercube's surface.

    With ``d = p - a``, the result has the form ``p - t * d``, i.e. a point on
    the segment that starts at ``p`` and heads towards ``a``.  ``t`` is the
    smallest per-coordinate step at which that segment reaches a ``0`` or
    ``1`` face (this reading assumes ``p`` itself lies within ``[0, 1]``).

    Args:
        p: starting point (anything ``np.asanyarray`` accepts).
        a: target point, same shape as ``p``.
        eps: differences with magnitude at or below this are treated as zero;
            also used in place of tiny denominators to avoid dividing by zero.

    Returns:
        * ``a`` clipped to ``[0, 1]`` (as a new array) when ``p`` and ``a``
          differ by at most ``eps`` in every coordinate,
        * the pulled-back point when a face is reached at or before ``a``,
        * ``a`` itself otherwise.

    The caller's ``a`` is never modified.
    """
    # https://www.desmos.com/calculator/gdcu0ivk0i
    p = np.asanyarray(p)
    a = np.asanyarray(a)
    d = p - a

    if np.all(np.abs(d) <= eps):
        # we might still be inching out of bounds, so just to be sure.
        # np.clip returns a fresh array; asanyarray may have handed us the
        # caller's own array, which must not be written to.
        return np.clip(a, 0, 1)

    # reciprocal of d, with near-zero entries replaced by +/-eps.
    inner = 1 / np.where(np.abs(d) > eps, d, np.where(d >= 0, eps, -eps))
    small = -np.abs(p - 1)  # np.minimum(1 - p, p - 1)
    large = np.abs(p)  # np.maximum(0 - p, p - 0)
    # per coordinate: the larger of the two face-crossing steps;
    # across coordinates: the first face that is reached.
    rescale = np.min(np.maximum(inner * small, inner * large))

    if rescale <= 1:
        # back off by a hair so the result does not overshoot the face.
        return p - max(0, rescale - 1e-8) * d
    return a
def do_bounding(x, method="clip", center=None):
    """Map ``x`` into the unit hypercube using the named strategy.

    Args:
        x: array of coordinates.
        method: one of
            ``"clip"``  - clamp every coordinate to ``[0, 1]``;
            ``"proj"``  - if anything is out of bounds, rescale the whole
                          vector around 0.5 so that it fits;
            ``"pro2"``  - if anything is out of bounds, call ``project`` from
                          ``center`` towards ``x``;
            ``"prcl"``  - delegate to ``bitten_snes._project_with``;
            ``"tria"``  - fold with a triangle wave;
            ``"sine"``  - fold with a squared sine;
            ``"ssin"``  - squared sine applied after an ``arcsinh`` squash;
            ``"pycma"`` - not implemented, always raises.
            Any other value leaves ``x`` untouched.
        center: previous (or center) point, needed by ``"pro2"`` (only when
            ``x`` is actually out of bounds) and by ``"prcl"``.

    Returns:
        The bounded array (or ``x`` itself for an unrecognized method).

    Raises:
        ValueError: ``center`` is required by the chosen method but missing.
        Exception: ``method == "pycma"``.
    """
    if method == "clip":
        x = np.clip(x, 0, 1)

    elif method == "proj":
        # projects x back into the unit hypercube, poorly.
        if np.any(x < 0) or np.any(x > 1):
            x = 2 * x - 1  # [0, 1] -> [-1, 1]
            x /= np.max(np.abs(x)) + feps  # shrink to fit strictly inside
            x = (x + 1) / 2  # [-1, 1] -> [0, 1]
            assert np.all(x >= 0) and np.all(x <= 1), x

    elif method == "pro2":
        # a little more logical.
        if np.any(x < 0) or np.any(x > 1):
            if center is None:
                raise ValueError('method "pro2" requires a center point')
            x = project(center, x, eps=feps)

    elif method == "prcl":
        # over-engineered clipping with projection-sliding. (yeah don't ask)
        if center is None:
            raise ValueError('method "prcl" requires a center point')
        from bitten_snes import _project_with

        n_dim = len(x)
        # NOTE(review): the bounds array is built exactly as it was originally
        # written (a single row of alternating 0.0/1.0) -- confirm that this
        # is the layout _project_with expects.
        x = _project_with(x, center, np.array([[0.0, 1.0] * n_dim]), clipping=0.5)

    elif method == "tria":
        hp = np.pi / 2
        x = np.abs(np.arcsin(np.sin(x * hp)) / hp)

    elif method == "sine":
        x = np.square(np.sin(0.5 * np.pi * x))

    elif method == "ssin":
        x = np.square(np.sin(0.5 * np.pi * (np.arcsinh(x - 0.5) + 0.5)))

    elif method == "pycma":
        raise Exception("TODO: workaround this like pycma does.")  # old ver or new ver?

    return x
class OWrap:
    """Callable wrapper around an objective that sanitizes inputs and keeps score.

    Every call normalizes ``x`` (unwrapping, flattening, replacing non-finite
    entries, clipping to ``[0, 1]``), evaluates the objective, and counts the
    evaluation.  While the count is within ``n_trials`` the best
    ``(fx, x)`` pair is tracked and periodically appended to ``history``.
    """

    def __init__(
        self,
        objective,
        n_trials,
        frugal_percent=1.0,
        greedy_percent=2.0,
        history_frequency=10,
    ):
        """Set up counters for ``objective`` with a budget of ``n_trials``.

        Args:
            objective: callable taking an array and returning a finite scalar.
            n_trials: evaluations beyond this count are still performed but
                no longer update ``best_so_far`` or ``history``.
            frugal_percent: ``finish`` reports the optimizer as frugal when
                ``feval_count < n_trials * frugal_percent``.
            greedy_percent: ``finish`` reports the optimizer as greedy when
                ``feval_count >= n_trials * greedy_percent``.
            history_frequency: record the best value every this many
                evaluations; values ``<= 0`` disable recording.
        """
        self.feval_count = 0
        self.best_so_far = None  # (fx, x) of the best in-budget evaluation
        self.warning = None  # None, "bounds", or "finite"
        self.objective = objective
        self.n_trials = n_trials
        # for evolopy. falls back the same way __str__ does, so callables
        # without a __name__ (functools.partial, instances) can be wrapped.
        self.__name__ = getattr(objective, "__name__", str(objective))
        self.frugal_percent = float(frugal_percent)
        self.greedy_percent = float(greedy_percent)
        self.history_frequency = history_frequency
        self.history = []

    def __str__(self):
        name = str(getattr(self.objective, "__name__", str(self.objective)))
        return "<OWrap on " + name + ">"

    def __call__(self, x, *args, **kwargs):
        """Evaluate the objective at a sanitized ``x`` and return it as a float.

        Extra positional and keyword arguments are accepted and ignored.
        Non-finite entries of an array argument are overwritten in place with
        0.5, so the caller's array may be modified.

        Raises:
            AssertionError: ``x`` is 2-D but not a column vector, or the
                objective returned a non-finite value.
        """
        if getattr(x, "get_x", None):  # zoopt
            x = x.get_x()

        if isinstance(x, (list, tuple)):  # opytimizer
            x = np.array(x)
        if x.ndim == 2:  # flatten column vectors
            assert x.shape[1] == 1, x.shape
            x = x.T[0]

        # only the first problem encountered is remembered.
        if not self.warning and (any(x < 0) or any(x > 1.00000001)):
            self.warning = "bounds"
            # assert False, x
        if not all(np.isfinite(x)):
            if not self.warning:
                m33("x is not finite (NaN or Inf or -Inf)")
                self.warning = "finite"
            x[~np.isfinite(x)] = 0.5
        x = np.clip(x, 0, 1)  # a fresh array; safe to keep in best_so_far
        # assert all(np.isfinite(x)), "x is not finite (NaN or Inf or -Inf)"

        fx = self.objective(x)
        assert np.isfinite(fx), "f(x) is not finite (NaN or Inf or -Inf)"

        self.feval_count += 1
        if self.feval_count <= self.n_trials:
            if self.best_so_far is None or fx < self.best_so_far[0]:
                self.best_so_far = (fx, x)
            if self.history_frequency > 0:
                if self.feval_count % self.history_frequency == 0:
                    self.history.append(self.best_so_far[0])

        return float(fx)

    def finish(self, optimizer_name):
        """Report anything noteworthy about the run and return ``best_so_far``.

        Messages are emitted through ``m33``/``m34`` for a recorded warning
        and for evaluation counts outside the frugal/greedy thresholds.
        """
        if self.warning == "bounds":
            m33(f"{optimizer_name} did not abide to bounds")
        if self.warning == "finite":
            m33(f"{optimizer_name} passed a non-finite value")
        if self.feval_count >= self.n_trials * self.greedy_percent:
            m33(f"{optimizer_name} got greedy ({self.feval_count}>{self.n_trials})")
        # if self.feval_count <= self.n_trials * 0.95:
        if self.feval_count < self.n_trials * self.frugal_percent:
            m34(f"{optimizer_name} was frugal ({self.feval_count}<{self.n_trials})")
        return self.best_so_far

    @property
    def fopt(self):
        """Best objective value seen within budget, or ``None`` before any call."""
        return None if self.best_so_far is None else self.best_so_far[0]

    @property
    def xopt(self):
        """Input that produced ``fopt``, or ``None`` before any call."""
        return None if self.best_so_far is None else self.best_so_far[1]
class COWrap:
    """Caching layer on top of ``OWrap``.

    Evaluations are forwarded to an internal ``OWrap``.  ``finish`` stores a
    summary of the run (best value, best point, history, timing) in a JSON
    file shared by all objectives for one optimizer/dimension/budget
    combination.  Repeated runs of the same objective get increasing run
    numbers in their cache key.
    """

    def __init__(self, objective, *, optimizer, n_trials, n_dim, **kwargs):
        """Wrap ``objective`` for ``optimizer``.

        Args:
            objective: the function to minimize; its ``__name__`` is part of
                the cache key.
            optimizer: object whose ``__name__`` identifies the optimizer.
            n_trials: evaluation budget, forwarded to ``OWrap``.
            n_dim: problem dimensionality (only used for naming the cache).
            **kwargs: forwarded verbatim to ``OWrap``.
        """
        self._objective = objective
        self.optimizer = optimizer
        self.n_trials = n_trials
        self.n_dim = n_dim
        self.kwargs = kwargs
        self._dirty = False  # True between the first evaluation and finish()
        self._history = None  # history of the most recently finished run
        from pathlib import Path

        # self.cache_dir = Path("./cache")
        self.cache_dir = Path("~/thursday-cache").expanduser()
        self._cached_summaries = None  # lazily loaded by .cached_summaries
        self.reset_objective()

    def __str__(self):
        name = str(getattr(self.ow.objective, "__name__", str(self.ow.objective)))
        return "<COWrap on " + name + ">"

    # NOTE(review): unlike OWrap, where __name__ is a plain string attribute,
    # this is a regular method and has to be called -- confirm whether a
    # property was intended before changing it.
    def __name__(self):
        return str(getattr(self.ow.objective, "__name__", str(self.ow.objective)))

    def __call__(self, x, *args, **kwargs):
        """Forward one evaluation to the inner ``OWrap``; starts the timer on first use."""
        assert not self._ran, "please run .finish() before continuing!"
        if not self._dirty:
            self.start_time = time()
        result = self.ow.__call__(x, *args, **kwargs)
        self._dirty = True
        return result

    @property
    def objective(self):
        return self._objective

    @objective.setter
    def objective(self, new_objective):
        # don't do this or it defeats the purpose: self._cached_summaries = None
        self._objective = new_objective
        self.reset_objective()

    @property
    def cache_name(self):
        """Base name shared by every objective for this optimizer/dim/budget."""
        opt_name = self.optimizer.__name__
        return f"COWrap_d{self.n_dim:02}_n{self.n_trials:03}_{opt_name}"

    @property
    def history(self):
        """History of the last finished run; only valid while no run is in progress."""
        assert not self._dirty
        assert self._history
        return self._history

    @property
    def cache_key(self):
        """Key of the current objective and run number inside the cache file."""
        obj_name = self._objective.__name__
        return f"{self.cache_name}_{obj_name}[{self._run}]"

    @property
    def cache_file(self):
        return self.cache_dir / f"{self.cache_name}.json"

    @property
    def cache_file_fucked(self):
        """Legacy path with the optimizer name duplicated; still read, then removed by finish()."""
        opt_name = self.optimizer.__name__
        return self.cache_dir / f"{self.cache_name}_{opt_name}.json"

    @property  # TODO: write a setter as well?
    def cached_summaries(self):
        """Summaries loaded from disk (both cache files merged), memoized.

        Returns a fresh, un-memoized empty dict when nothing exists on disk.
        """
        if self._cached_summaries is not None:
            return self._cached_summaries

        from json import loads

        if not self.cache_dir.exists() or not (
            self.cache_file.exists() or self.cache_file_fucked.exists()
        ):
            return {}

        all_summaries = []
        for cf in (self.cache_file, self.cache_file_fucked):
            if cf.exists():  # at least one exists at this point...
                if text := cf.read_text():  # ...but not every file contains anything
                    all_summaries.append(loads(text))
        self._cached_summaries = merge_summaries(all_summaries)
        return self._cached_summaries

    def reset_objective(self):
        """Start over with a fresh ``OWrap`` and re-derive the run number."""
        self._dirty = False
        self.ow = OWrap(self._objective, self.n_trials, **self.kwargs)
        self._check_cache()

    def _check_cache(self):
        """Advance ``_run`` to the first run number without a cached summary."""
        # assert not self._dirty  # useless for a private method
        self._run = 1
        self._ran = False
        while self.cache_key in self.cached_summaries:
            self._run += 1

    def cached(self, run):
        """Return ``(fopt, xopt, history)`` for the given run, or ``None`` if absent."""
        assert not self._dirty

        # cache_key depends on ._run, so borrow it for the lookup and make
        # sure it is put back no matter what.
        old_run = self._run
        self._run = run
        try:
            summary = self.cached_summaries.get(self.cache_key, None)
        finally:
            self._run = old_run

        if summary is None:
            return None

        assert "fopt" in summary, summary
        assert "xopt" in summary, summary
        assert "timestamp" in summary, summary
        assert "history" in summary, summary

        fopt = float(summary["fopt"])
        xopt = np.array(summary["xopt"], np.float64)
        history = list(summary["history"])
        assert history, "history cannot be empty"  # this should get filtered now
        return fopt, xopt, history

    def finish(self, opt_name=None):
        """Close the current run, persist its summary, and reset for the next one.

        Args:
            opt_name: optional sanity check; a warning is emitted via ``m93``
                when it differs from ``optimizer.__name__``.

        Returns:
            ``(fopt, xopt)`` as reported by the inner ``OWrap``.
        """
        from json import dumps

        assert self._dirty
        self._ran = True

        if opt_name is not None and opt_name != self.optimizer.__name__:
            m93("Warning: opt_name mistmatch")

        assert self.ow.best_so_far is not None
        # fopt, xopt = self.ow.best_so_far
        fopt, xopt = self.ow.finish(self.optimizer.__name__)

        # pad the history with the final value in case the optimizer stopped early.
        expected_length = self.n_trials // self.ow.history_frequency
        history = [float(fval) for fval in self.ow.history]
        history += [fopt] * (expected_length - len(history))

        finish_time = time()
        summary = dict(
            fopt=float(fopt),
            xopt=[float(x) for x in xopt],
            # timestamp=float(-1),  # old, bad for uniqueness
            timestamp=finish_time,
            # optional: (for now?)
            history=history,
            duration=finish_time - self.start_time,
        )
        self._history = summary["history"]
        assert self._history, "why"

        with AcquireForWriting(self.cache_file) as fp:
            self._cached_summaries = None  # force reload
            self._check_cache()  # refresh ._run and thereby .cache_key
            summaries = self.cached_summaries
            summaries[self.cache_key] = summary
            text = dumps(summaries, separators=(",", ":"))
            fp.write_text(text)

        # NOTE(review): the legacy file is removed only after the
        # AcquireForWriting context has exited, so the merged cache is
        # finalized first -- confirm against AcquireForWriting's semantics.
        if self.cache_file_fucked.exists():
            # safe to delete now that i've written and tested merge_summaries.
            self.cache_file_fucked.unlink()

        self._cached_summaries = None  # force reload in case of other writes
        self.reset_objective()
        return fopt, xopt