258 lines
8.9 KiB
Python
258 lines
8.9 KiB
Python
|
# i've separated numpy-dependent methods from the rest of the utils.
|
||
|
from project import project
|
||
|
from utils import AcquireForWriting, merge_summaries, feps, m33, m34, m93
|
||
|
import numpy as np
|
||
|
|
||
|
|
||
|
def do_bounding(x, method="clip"):
|
||
|
if method == "clip":
|
||
|
x = np.clip(x, 0, 1)
|
||
|
elif method == "proj":
|
||
|
# projects x back into the unit hypercube, poorly.
|
||
|
if any(x < 0) or any(x > 1):
|
||
|
x = 2 * x - 1
|
||
|
x /= np.max(np.abs(x)) + feps
|
||
|
x = (x + 1) / 2
|
||
|
assert all(x >= 0) and all(x <= 1), x
|
||
|
elif method == "pro2":
|
||
|
# a little more logical.
|
||
|
# FIXME: we need a way to determine the previous (or center) x somehow?
|
||
|
if any(x < 0) or any(x > 1):
|
||
|
x = project(best_so_far[1], x, eps=feps)
|
||
|
elif method == "prcl":
|
||
|
# over-engineered clipping with projection-sliding. (yeah don't ask)
|
||
|
# FIXME: we need a way to determine the previous (or center) x somehow?
|
||
|
from bitten_snes import _project_with
|
||
|
|
||
|
x = _project_with(x, old, np.array([[0.0, 1.0] * n_dim]), clipping=0.5)
|
||
|
elif method == "tria":
|
||
|
hp = np.pi / 2
|
||
|
x = np.abs(np.arcsin(np.sin(x * hp)) / hp)
|
||
|
elif method == "sine":
|
||
|
x = np.square(np.sin(0.5 * np.pi * x))
|
||
|
elif method == "ssin":
|
||
|
x = np.square(np.sin(0.5 * np.pi * (np.arcsinh(x - 0.5) + 0.5)))
|
||
|
elif method == "pycma":
|
||
|
raise Exception("TODO: workaround this like pycma does.") # old ver or new ver?
|
||
|
return x
|
||
|
|
||
|
|
||
|
class OWrap:
|
||
|
def __init__(self, objective, n_trials, frugal_percent=1.0, greedy_percent=2.0):
|
||
|
self.feval_count = 0
|
||
|
self.best_so_far = None
|
||
|
self.warning = None
|
||
|
self.objective = objective
|
||
|
self.n_trials = n_trials
|
||
|
self.__name__ = objective.__name__ # for evolopy
|
||
|
self.frugal_percent = float(frugal_percent)
|
||
|
self.greedy_percent = float(greedy_percent)
|
||
|
|
||
|
def __str__(self):
|
||
|
return (
|
||
|
"<OWrap on "
|
||
|
+ str(getattr(self.objective, "__name__", str(self.objective)))
|
||
|
+ ">"
|
||
|
)
|
||
|
|
||
|
def __call__(self, x, *args, **kwargs):
|
||
|
if getattr(x, "get_x", None): # zoopt
|
||
|
x = x.get_x()
|
||
|
if type(x) is list: # opytimizer
|
||
|
x = np.array(x)
|
||
|
|
||
|
if x.ndim == 2: # flatten column vectors
|
||
|
assert x.shape[1] == 1, x.shape
|
||
|
x = x.T[0]
|
||
|
|
||
|
if not self.warning and (any(x < 0) or any(x > 1.00000001)):
|
||
|
self.warning = "bounds"
|
||
|
# assert False, x
|
||
|
|
||
|
if not all(np.isfinite(x)):
|
||
|
if not self.warning:
|
||
|
m33("x is not finite (NaN or Inf or -Inf)")
|
||
|
self.warning = "finite"
|
||
|
x[~np.isfinite(x)] = 0.5
|
||
|
x = np.clip(x, 0, 1)
|
||
|
# assert all(np.isfinite(x)), "x is not finite (NaN or Inf or -Inf)"
|
||
|
fx = self.objective(x)
|
||
|
assert np.isfinite(fx), "f(x) is not finite (NaN or Inf or -Inf)"
|
||
|
self.feval_count += 1
|
||
|
|
||
|
if self.feval_count <= self.n_trials:
|
||
|
if self.best_so_far is None or fx < self.best_so_far[0]:
|
||
|
self.best_so_far = (fx, x)
|
||
|
|
||
|
return float(fx)
|
||
|
|
||
|
def finish(self, optimizer_name):
|
||
|
if self.warning == "bounds":
|
||
|
m33(f"{optimizer_name} did not abide to bounds")
|
||
|
if self.warning == "finite":
|
||
|
m33(f"{optimizer_name} passed a non-finite value")
|
||
|
if self.feval_count >= self.n_trials * self.greedy_percent:
|
||
|
m33(f"{optimizer_name} got greedy ({self.feval_count}>{self.n_trials})")
|
||
|
# if self.feval_count <= self.n_trials * 0.95:
|
||
|
if self.feval_count < self.n_trials * self.frugal_percent:
|
||
|
m34(f"{optimizer_name} was frugal ({self.feval_count}<{self.n_trials})")
|
||
|
return self.best_so_far
|
||
|
|
||
|
@property
|
||
|
def fopt(self):
|
||
|
return None if self.best_so_far is None else self.best_so_far[0]
|
||
|
|
||
|
@property
|
||
|
def xopt(self):
|
||
|
return None if self.best_so_far is None else self.best_so_far[1]
|
||
|
|
||
|
|
||
|
class COWrap:
|
||
|
def __init__(self, objective, *, optimizer, n_trials, n_dim, **kwargs):
|
||
|
self._objective = objective
|
||
|
self.optimizer = optimizer
|
||
|
self.n_trials = n_trials
|
||
|
self.n_dim = n_dim
|
||
|
self.kwargs = kwargs
|
||
|
self._dirty = False
|
||
|
|
||
|
from pathlib import Path
|
||
|
|
||
|
# self.cache_dir = Path("./cache")
|
||
|
self.cache_dir = Path("~/thursday-cache").expanduser()
|
||
|
self._cached_summaries = None
|
||
|
self.reset_objective()
|
||
|
|
||
|
def __str__(self):
|
||
|
return (
|
||
|
"<COWrap on "
|
||
|
+ str(getattr(self.ow.objective, "__name__", str(self.ow.objective)))
|
||
|
+ ">"
|
||
|
)
|
||
|
|
||
|
def __name__(self):
|
||
|
return str(getattr(self.ow.objective, "__name__", str(self.ow.objective)))
|
||
|
|
||
|
def __call__(self, x, *args, **kwargs):
|
||
|
assert not self._ran, "please run .finish() before continuing!"
|
||
|
result = self.ow.__call__(x, *args, **kwargs)
|
||
|
self._dirty = True
|
||
|
return result
|
||
|
|
||
|
@property
|
||
|
def objective(self):
|
||
|
return self._objective
|
||
|
|
||
|
@objective.setter
|
||
|
def objective(self, new_objective):
|
||
|
# don't do this or it defeats the purpose: self._cached_summaries = None
|
||
|
self._objective = new_objective
|
||
|
self.reset_objective()
|
||
|
|
||
|
@property
|
||
|
def cache_name(self):
|
||
|
opt_name = self.optimizer.__name__
|
||
|
return f"COWrap_d{self.n_dim:02}_n{self.n_trials:03}_{opt_name}"
|
||
|
|
||
|
@property
|
||
|
def cache_key(self):
|
||
|
opt_name = self.optimizer.__name__
|
||
|
obj_name = self._objective.__name__
|
||
|
return f"{self.cache_name}_{obj_name}[{self._run}]"
|
||
|
|
||
|
@property
|
||
|
def cache_file(self):
|
||
|
opt_name = self.optimizer.__name__
|
||
|
return self.cache_dir / f"{self.cache_name}.json"
|
||
|
|
||
|
@property
|
||
|
def cache_file_fucked(self):
|
||
|
opt_name = self.optimizer.__name__
|
||
|
return self.cache_dir / f"{self.cache_name}_{opt_name}.json"
|
||
|
|
||
|
@property # TODO: write a setter as well?
|
||
|
def cached_summaries(self):
|
||
|
if self._cached_summaries is not None:
|
||
|
return self._cached_summaries
|
||
|
from json import loads
|
||
|
|
||
|
if not self.cache_dir.exists() or not (
|
||
|
self.cache_file.exists() or self.cache_file_fucked.exists()
|
||
|
):
|
||
|
return {}
|
||
|
# text = self.cache_file.read_text()
|
||
|
# if not text:
|
||
|
# return {}
|
||
|
# summaries = loads(text)
|
||
|
# self._cached_summaries = summaries
|
||
|
all_summaries = []
|
||
|
for cf in (self.cache_file, self.cache_file_fucked):
|
||
|
if cf.exists(): # at least one exists at this point...
|
||
|
if text := cf.read_text(): # ...but not every file contains anything
|
||
|
all_summaries.append(loads(text))
|
||
|
self._cached_summaries = merge_summaries(all_summaries)
|
||
|
return self._cached_summaries
|
||
|
|
||
|
def reset_objective(self):
|
||
|
self._dirty = False
|
||
|
self.ow = OWrap(self._objective, self.n_trials, **self.kwargs)
|
||
|
self._check_cache()
|
||
|
|
||
|
def _check_cache(self):
|
||
|
# assert not self._dirty # useless for a private method
|
||
|
self._run = 1
|
||
|
self._ran = False
|
||
|
while self.cache_key in self.cached_summaries:
|
||
|
self._run += 1
|
||
|
|
||
|
def cached(self, run):
|
||
|
assert not self._dirty
|
||
|
old_run = self._run
|
||
|
self._run = run
|
||
|
summary = self.cached_summaries.get(self.cache_key, None)
|
||
|
self._run = old_run
|
||
|
if summary is None:
|
||
|
return None
|
||
|
assert "fopt" in summary, summary
|
||
|
assert "xopt" in summary, summary
|
||
|
assert "duration" in summary, summary
|
||
|
fopt = float(summary["fopt"])
|
||
|
xopt = np.array(summary["xopt"], np.float64)
|
||
|
duration = float(summary["duration"])
|
||
|
return fopt, xopt
|
||
|
|
||
|
def finish(self, opt_name=None):
|
||
|
from json import dumps
|
||
|
from time import time
|
||
|
|
||
|
assert self._dirty
|
||
|
self._ran = True
|
||
|
if opt_name is not None and opt_name != self.optimizer.__name__:
|
||
|
m93("Warning: opt_name mistmatch")
|
||
|
|
||
|
assert self.ow.best_so_far is not None
|
||
|
# fopt, xopt = self.ow.best_so_far
|
||
|
fopt, xopt = self.ow.finish(self.optimizer.__name__)
|
||
|
summary = dict(
|
||
|
fopt=float(fopt),
|
||
|
xopt=[float(x) for x in xopt],
|
||
|
# duration=float(-1), # old, bad for uniqueness
|
||
|
duration=float(-time()),
|
||
|
)
|
||
|
|
||
|
with AcquireForWriting(self.cache_file) as fp:
|
||
|
self._cached_summaries = None # force reload
|
||
|
self._check_cache() # refresh ._run and thereby .cache_key
|
||
|
summaries = self.cached_summaries
|
||
|
summaries[self.cache_key] = summary
|
||
|
text = dumps(summaries, separators=(",", ":"))
|
||
|
fp.write_text(text)
|
||
|
|
||
|
if self.cache_file_fucked.exists():
|
||
|
# safe to delete now that i've written and tested merge_summaries.
|
||
|
self.cache_file_fucked.unlink()
|
||
|
|
||
|
self._cached_summaries = None # force reload in case of other writes
|
||
|
self.reset_objective()
|
||
|
return fopt, xopt
|