314 lines
11 KiB
Python
314 lines
11 KiB
Python
# i've separated numpy-dependent methods from the rest of the utils.
|
|
from time import time
|
|
from utils import AcquireForWriting, merge_summaries, feps, m33, m34, m93
|
|
import numpy as np
|
|
|
|
|
|
def project(p, a, eps=1e-8):
    """Shorten the step from `p` to `a` so the result stays in the unit hypercube.

    The segment starting at `p` and ending at `a` is intersected with the
    bounds 0 and 1 on every axis; if a bound is crossed before `a` is
    reached, the point just short of the first crossing is returned.

    Args:
        p: starting point (array-like). NOTE(review): the boundary math
            presumably expects `p` to already lie within [0, 1] -- confirm.
        a: candidate point (array-like), possibly out of bounds.
        eps: per-axis differences no larger than this are treated as zero,
            both for the "points coincide" shortcut and to avoid dividing
            by (nearly) zero.

    Returns:
        A numpy array: `a` itself when no bound is crossed, a clipped copy of
        `a` when `p` and `a` coincide within `eps`, or the shortened point
        otherwise. The inputs are never modified.
    """
    # https://www.desmos.com/calculator/gdcu0ivk0i
    p = np.asanyarray(p)
    a = np.asanyarray(a)
    d = p - a
    if np.all(np.abs(d) <= eps):
        # we might still be inching out of bounds, so just to be sure.
        # np.clip returns a new array, leaving the caller's `a` untouched.
        return np.clip(a, 0, 1)

    # reciprocal of the step, with tiny components replaced by +/- eps
    # (keeping their sign) so the division is always well-defined.
    inner = 1 / np.where(np.abs(d) > eps, d, np.where(d >= 0, eps, -eps))
    small = -np.abs(p - 1)  # np.minimum(1 - p, p - 1)
    large = np.abs(p)  # np.maximum(0 - p, p - 0)
    # per axis, the larger product is the fraction of the step at which a
    # bound is hit; the smallest of those over all axes is the first hit.
    rescale = np.min(np.maximum(inner * small, inner * large))

    if rescale <= 1:
        # stop marginally (1e-8 of the step) before the bound itself.
        return p - max(0, rescale - 1e-8) * d
    return a
|
|
|
|
|
|
def do_bounding(x, method="clip", old=None):
    """Map a candidate vector `x` into the unit hypercube using `method`.

    Args:
        x: numpy array of coordinates.
        method: one of
            "clip"  -- clamp every coordinate to [0, 1].
            "proj"  -- if out of bounds, rescale around the cube's center.
            "pro2"  -- if out of bounds, shorten the step from `old` to `x`
                       with `project`.
            "prcl"  -- delegate to `bitten_snes._project_with`.
            "tria"  -- fold with a triangle wave.
            "sine"  -- sin^2(pi/2 * x).
            "ssin"  -- like "sine", after squashing x with arcsinh around 0.5.
            "pycma" -- not implemented.
            Any other value leaves `x` unchanged.
        old: the previous (or center) point that "pro2" and "prcl" project
            from. Optional so existing two-argument calls keep working.

    Returns:
        The bounded array.

    Raises:
        ValueError: if "pro2" needs to project, or "prcl" is used, and `old`
            was not provided.
        Exception: for method "pycma".
    """
    if method == "clip":
        x = np.clip(x, 0, 1)
    elif method == "proj":
        # projects x back into the unit hypercube, poorly.
        if np.any(x < 0) or np.any(x > 1):
            x = 2 * x - 1
            # out-of-place division so integer arrays work as well.
            x = x / (np.max(np.abs(x)) + feps)
            x = (x + 1) / 2
            assert np.all(x >= 0) and np.all(x <= 1), x
    elif method == "pro2":
        # a little more logical.
        if np.any(x < 0) or np.any(x > 1):
            if old is None:
                raise ValueError('method "pro2" requires the previous point `old`')
            x = project(old, x, eps=feps)
    elif method == "prcl":
        # over-engineered clipping with projection-sliding. (yeah don't ask)
        if old is None:
            raise ValueError('method "prcl" requires the previous point `old`')
        from bitten_snes import _project_with

        n_dim = len(x)
        # NOTE(review): this builds a single row of 2 * n_dim alternating
        # bounds; confirm that is the layout _project_with expects (as
        # opposed to one [0.0, 1.0] row per dimension).
        x = _project_with(x, old, np.array([[0.0, 1.0] * n_dim]), clipping=0.5)
    elif method == "tria":
        hp = np.pi / 2
        x = np.abs(np.arcsin(np.sin(x * hp)) / hp)
    elif method == "sine":
        x = np.square(np.sin(0.5 * np.pi * x))
    elif method == "ssin":
        x = np.square(np.sin(0.5 * np.pi * (np.arcsinh(x - 0.5) + 0.5)))
    elif method == "pycma":
        raise Exception("TODO: workaround this like pycma does.")  # old ver or new ver?
    return x
|
|
|
|
|
|
class OWrap:
    """Objective wrapper that sanitizes inputs and records optimizer progress.

    Every call normalizes the argument to a 1-D array clipped to [0, 1],
    evaluates the wrapped objective, counts the evaluation, and -- while the
    trial budget has not been exceeded -- tracks the best (lowest) result and
    samples it into `history` every `history_frequency` evaluations.

    Attributes are plain and public: `feval_count`, `best_so_far` (a
    `(fx, x)` tuple or None), `warning` (None, "bounds" or "finite"),
    `history` (list of best-so-far values).
    """

    def __init__(
        self,
        objective,
        n_trials,
        frugal_percent=1.0,
        greedy_percent=2.0,
        history_frequency=10,
    ):
        """Wrap `objective` with a budget of `n_trials` evaluations.

        Args:
            objective: callable taking a 1-D array and returning a finite scalar.
            n_trials: number of evaluations that count toward `best_so_far`.
            frugal_percent: fraction of `n_trials` below which `finish`
                reports the optimizer as frugal.
            greedy_percent: multiple of `n_trials` at or above which `finish`
                reports the optimizer as greedy.
            history_frequency: sample the best value every this many
                evaluations; values <= 0 disable history.
        """
        self.feval_count = 0
        self.best_so_far = None
        self.warning = None
        self.objective = objective
        self.n_trials = n_trials
        # for evolopy. falls back like __str__ does, so callables without a
        # __name__ (e.g. functools.partial objects) can be wrapped too.
        self.__name__ = getattr(objective, "__name__", str(objective))
        self.frugal_percent = float(frugal_percent)
        self.greedy_percent = float(greedy_percent)
        self.history_frequency = history_frequency
        self.history = []

    def __str__(self):
        return (
            "<OWrap on "
            + str(getattr(self.objective, "__name__", str(self.objective)))
            + ">"
        )

    def __call__(self, x, *args, **kwargs):
        """Evaluate the objective at `x` and return the result as a float.

        Extra positional and keyword arguments are accepted and ignored.
        The caller's `x` is never modified.

        Raises:
            AssertionError: if `x` is 2-D but not a column vector, or if the
                objective returns a non-finite value.
        """
        if getattr(x, "get_x", None):  # zoopt
            x = x.get_x()
        if isinstance(x, (list, tuple)):  # opytimizer
            x = np.array(x)

        if x.ndim == 2:  # flatten column vectors
            assert x.shape[1] == 1, x.shape
            x = x.T[0]

        if not self.warning and (np.any(x < 0) or np.any(x > 1.00000001)):
            self.warning = "bounds"

        finite = np.isfinite(x)
        if not np.all(finite):
            if not self.warning:
                m33("x is not finite (NaN or Inf or -Inf)")
                self.warning = "finite"
            # substitute into a fresh array instead of writing into the
            # optimizer's own (possibly read-only) buffer.
            x = np.where(finite, x, 0.5)
        x = np.clip(x, 0, 1)

        fx = self.objective(x)
        assert np.isfinite(fx), "f(x) is not finite (NaN or Inf or -Inf)"
        self.feval_count += 1

        if self.feval_count <= self.n_trials:
            if self.best_so_far is None or fx < self.best_so_far[0]:
                self.best_so_far = (fx, x)

            # sampled only within the budget: best_so_far is guaranteed to
            # be set here, and history never grows past
            # n_trials // history_frequency entries.
            if (
                self.history_frequency > 0
                and self.feval_count % self.history_frequency == 0
            ):
                self.history.append(self.best_so_far[0])

        return float(fx)

    def finish(self, optimizer_name):
        """Report any recorded warnings for `optimizer_name` and return `best_so_far`.

        Messages are emitted through the `m33`/`m34` helpers when the
        optimizer left the bounds, passed non-finite values, or used notably
        more or fewer evaluations than the budget.
        """
        if self.warning == "bounds":
            m33(f"{optimizer_name} did not abide to bounds")
        if self.warning == "finite":
            m33(f"{optimizer_name} passed a non-finite value")
        if self.feval_count >= self.n_trials * self.greedy_percent:
            m33(f"{optimizer_name} got greedy ({self.feval_count}>{self.n_trials})")
        # if self.feval_count <= self.n_trials * 0.95:
        if self.feval_count < self.n_trials * self.frugal_percent:
            m34(f"{optimizer_name} was frugal ({self.feval_count}<{self.n_trials})")
        return self.best_so_far

    @property
    def fopt(self):
        """Best objective value seen within the budget, or None."""
        return None if self.best_so_far is None else self.best_so_far[0]

    @property
    def xopt(self):
        """Input that produced `fopt`, or None."""
        return None if self.best_so_far is None else self.best_so_far[1]
|
|
|
|
|
|
class COWrap:
    """Caching objective wrapper built on top of `OWrap`.

    Calls are forwarded to an internal `OWrap`. `finish` turns the outcome of
    a run into a summary dict (fopt, xopt, timestamp, history, duration) and
    stores it in a JSON file under `cache_dir`, keyed by `cache_key`, which
    combines the optimizer name, dimensions, trial count, objective name and
    a run counter.
    """

    def __init__(self, objective, *, optimizer, n_trials, n_dim, **kwargs):
        """Set up the wrapper; extra keyword arguments are passed on to `OWrap`."""
        self._objective = objective
        self.optimizer = optimizer
        self.n_trials = n_trials
        self.n_dim = n_dim
        self.kwargs = kwargs
        self._dirty = False
        self._history = None
        self.start_time = None  # set by the first call of each run

        from pathlib import Path

        # self.cache_dir = Path("./cache")
        self.cache_dir = Path("~/thursday-cache").expanduser()
        self._cached_summaries = None
        self.reset_objective()

    def __str__(self):
        return (
            "<COWrap on "
            + str(getattr(self.ow.objective, "__name__", str(self.ow.objective)))
            + ">"
        )

    def __name__(self):
        # NOTE(review): OWrap exposes __name__ as a plain string attribute,
        # whereas here it is a method that must be called. Kept as-is for
        # compatibility; confirm whether a property was intended.
        return str(getattr(self.ow.objective, "__name__", str(self.ow.objective)))

    def __call__(self, x, *args, **kwargs):
        """Forward the evaluation to the inner `OWrap`, timing the run from its first call."""
        assert not self._ran, "please run .finish() before continuing!"
        if not self._dirty:
            self.start_time = time()
        result = self.ow.__call__(x, *args, **kwargs)
        self._dirty = True
        return result

    @property
    def objective(self):
        return self._objective

    @objective.setter
    def objective(self, new_objective):
        # don't do this or it defeats the purpose: self._cached_summaries = None
        self._objective = new_objective
        self.reset_objective()

    @property
    def cache_name(self):
        """Base name shared by every entry of this optimizer/dimension/budget combination."""
        opt_name = self.optimizer.__name__
        return f"COWrap_d{self.n_dim:02}_n{self.n_trials:03}_{opt_name}"

    @property
    def history(self):
        """History of the most recently finished run."""
        assert not self._dirty
        assert self._history
        return self._history

    @property
    def cache_key(self):
        """Key of the current objective and run number within the cache file."""
        obj_name = self._objective.__name__
        return f"{self.cache_name}_{obj_name}[{self._run}]"

    @property
    def cache_file(self):
        """Path of the JSON file holding this configuration's summaries."""
        return self.cache_dir / f"{self.cache_name}.json"

    @property
    def cache_file_fucked(self):
        """Path of the legacy cache file, whose name repeats the optimizer name."""
        opt_name = self.optimizer.__name__
        return self.cache_dir / f"{self.cache_name}_{opt_name}.json"

    @property  # TODO: write a setter as well?
    def cached_summaries(self):
        """Summaries loaded from the cache files, merged with `merge_summaries`.

        Returns an empty dict, without memoizing it, when neither the
        regular nor the legacy cache file exists. Otherwise the merged
        result is memoized until `_cached_summaries` is reset to None.
        """
        if self._cached_summaries is not None:
            return self._cached_summaries
        from json import loads

        found_any = False
        all_summaries = []
        for cf in (self.cache_file, self.cache_file_fucked):
            # read first and handle absence afterwards: a separate exists()
            # check could be invalidated by another process in between.
            try:
                text = cf.read_text()
            except (FileNotFoundError, NotADirectoryError):
                continue
            found_any = True
            if text:  # not every file contains anything
                all_summaries.append(loads(text))

        if not found_any:
            return {}
        self._cached_summaries = merge_summaries(all_summaries)
        return self._cached_summaries

    def reset_objective(self):
        """Start over with a fresh `OWrap` and recompute the next free run number."""
        self._dirty = False
        self.ow = OWrap(self._objective, self.n_trials, **self.kwargs)
        self._check_cache()

    def _check_cache(self):
        # assert not self._dirty  # useless for a private method
        self._run = 1
        self._ran = False
        # advance to the first run number that has no cached summary yet.
        while self.cache_key in self.cached_summaries:
            self._run += 1

    def cached(self, run):
        """Return `(fopt, xopt, history)` cached for run number `run`, or None.

        Raises:
            AssertionError: if evaluations are pending, or the cached summary
                lacks a required field or has an empty history.
        """
        assert not self._dirty
        old_run = self._run
        self._run = run  # temporarily, so that cache_key targets `run`
        try:
            summary = self.cached_summaries.get(self.cache_key, None)
        finally:
            self._run = old_run
        if summary is None:
            return None
        assert "fopt" in summary, summary
        assert "xopt" in summary, summary
        assert "timestamp" in summary, summary
        assert "history" in summary, summary
        fopt = float(summary["fopt"])
        xopt = np.array(summary["xopt"], np.float64)
        history = list(summary["history"])
        assert history, "history cannot be empty"  # this should get filtered now
        return fopt, xopt, history

    def finish(self, opt_name=None):
        """Conclude the current run, store its summary in the cache, and reset.

        Args:
            opt_name: optional name to cross-check against the optimizer's
                `__name__`; a mismatch only produces a warning.

        Returns:
            `(fopt, xopt)` of the finished run.

        Raises:
            AssertionError: if nothing was evaluated since the last reset, or
                the resulting history is empty.
        """
        from json import dumps

        assert self._dirty
        self._ran = True
        if opt_name is not None and opt_name != self.optimizer.__name__:
            m93("Warning: opt_name mistmatch")

        assert self.ow.best_so_far is not None
        # fopt, xopt = self.ow.best_so_far
        fopt, xopt = self.ow.finish(self.optimizer.__name__)

        frequency = self.ow.history_frequency
        # a non-positive frequency means OWrap recorded no history at all.
        expected_length = self.n_trials // frequency if frequency > 0 else 0
        history = [float(fval) for fval in self.ow.history]
        # pad short histories; plain floats keep the summary JSON-serializable.
        history += [float(fopt)] * (expected_length - len(history))

        finish_time = time()
        summary = dict(
            fopt=float(fopt),
            xopt=[float(x) for x in xopt],
            # timestamp=float(-1),  # old, bad for uniqueness
            timestamp=finish_time,
            # optional: (for now?)
            history=history,
            duration=finish_time - self.start_time,
        )
        self._history = summary["history"]
        assert self._history, "why"

        with AcquireForWriting(self.cache_file) as fp:
            self._cached_summaries = None  # force reload
            self._check_cache()  # refresh ._run and thereby .cache_key
            summaries = self.cached_summaries
            summaries[self.cache_key] = summary
            text = dumps(summaries, separators=(",", ":"))
            fp.write_text(text)

        # the legacy file's contents were merged into what was just written,
        # so it can go. done only after the write block completed without
        # error; missing_ok covers another process having removed it already.
        self.cache_file_fucked.unlink(missing_ok=True)

        self._cached_summaries = None  # force reload in case of other writes
        self.reset_objective()
        return fopt, xopt
|