274 lines
8.4 KiB
Python
274 lines
8.4 KiB
Python
from dataclasses import dataclass
|
|
import math
|
|
|
|
# 2**-23: numerically equal to the machine epsilon of IEEE 754 single precision.
feps = math.ldexp(1.0, -23)

# 2**-1022: numerically equal to the smallest positive normal double.
tiniest = math.ldexp(1.0, -1022)

# Sentinels compared by identity inside the wrapper built by wrap_untrustworthy:
# passing `check` returns the evaluation count so far ...
check = object()
# ... and passing `final` returns the (fopt, xopt, feval_count) results.
final = object()
|
|
|
|
def color_factory(color):
    """Build a printer that wraps its argument in the ANSI SGR code `color`.

    The returned function prints ``ESC[<color>m<s>ESC[m`` followed by the
    usual newline from print(), and returns None.
    """

    def emit(s):
        return print(f"\033[{color}m{s}\033[m")

    return emit


# One printer per SGR code; the number in each name is the code it emits.
m1 = color_factory(1)

m30 = color_factory(30)
m31 = color_factory(31)
m32 = color_factory(32)
m33 = color_factory(33)
m34 = color_factory(34)
m35 = color_factory(35)
m36 = color_factory(36)
m37 = color_factory(37)

m90 = color_factory(90)
m91 = color_factory(91)
m92 = color_factory(92)
m93 = color_factory(93)
m94 = color_factory(94)
m95 = color_factory(95)
m96 = color_factory(96)
m97 = color_factory(97)
|
|
|
|
|
|
class ExhaustedTrialsError(Exception):
    """Signals that an objective wrapper has used up its evaluation budget.

    Raised by the wrapper returned from wrap_untrustworthy when it was built
    with raising=True and is called after n_trials evaluations.
    """
|
|
|
|
|
|
def scalar_softplus(x):
    """Return softplus(x) = log(1 + exp(x)) for a single scalar.

    Both tails are short-circuited so that math.exp is never called with an
    argument that would overflow, and so the result is an exact float at the
    extremes.
    """
    # At or above this, the function simply returns x as a float
    # (presumably where log1p(exp(x)) already rounds to x in double precision).
    upper_cutoff = 33.276435657655455
    # At or below this, the function returns exactly zero
    # (presumably where exp(x) underflows to 0.0 in double precision).
    lower_cutoff = -745.13330078125

    if x >= upper_cutoff:
        return float(x)
    if x <= lower_cutoff:
        return 0.0
    return math.log1p(math.exp(x))
|
|
|
|
|
|
def phi(d):
    """Approximate the fixed point of x = (1 + x) ** (1 / (d + 1)).

    phi(1) = golden ratio
    phi(2) = plastic constant
    phi(3) = the positive real root of x**4-x-1

    The value is found by plain fixed-point iteration from x = 2.0, using 30
    steps for d == 1 and max(10, 28 - d) steps otherwise.
    """
    steps = 30 if d == 1 else max(10, 28 - d)
    x = 2.0
    for _ in range(steps):
        x = (1 + x) ** (1 / (d + 1))
    return x
|
|
|
|
|
|
def wrap_untrustworthy(
    objective, n_trials, *, raising=False, bounding=None, softplus=False, eps=0.0
):
    """Wrap `objective` so evaluations are counted and the best result is kept.

    The returned callable takes a candidate `x` and returns the objective
    value. It also answers two sentinel queries, compared by identity:

    * called with `check`, it returns the number of evaluations so far;
    * called with `final`, it returns ``(fopt, xopt, feval_count)`` for the
      best evaluation recorded (asserting that at least one was recorded).

    Parameters:
        objective: callable evaluated as ``objective(x)``.
        n_trials: evaluation budget, or None for no budget. Only evaluations
            within the budget can update the recorded best.
        raising: when true and a budget is set, raise ExhaustedTrialsError
            instead of evaluating once the budget has been used up.
        bounding: when not None, `x` is first replaced by
            ``do_bounding(x, bounding)`` (defined elsewhere in the project).
        softplus: when true, return ``scalar_softplus(fx) + eps`` instead of
            the raw objective value. The recorded best keeps the raw value.
        eps: offset added to the returned value only when `softplus` is true.

    `x` must provide a ``copy()`` method; the (possibly bounded) `x` is
    copied when it becomes the new best.
    """
    # also handles bounding now, so it may be used for other purposes as well. whoops.
    feval_count = 0
    best_so_far = None

    def _objective(x):
        nonlocal feval_count, best_so_far
        if x is check:
            return feval_count
        if x is final:
            assert best_so_far is not None
            fopt, xopt = best_so_far
            return fopt, xopt, feval_count
        # Guard against n_trials=None: without a budget there is nothing to
        # exhaust, and comparing an int against None would raise TypeError.
        if raising and n_trials is not None and feval_count >= n_trials:
            raise ExhaustedTrialsError()
        if bounding is not None:
            x = do_bounding(x, bounding)
        fx = objective(x)
        feval_count += 1
        # Evaluations past the budget are still counted and returned, but
        # they are never allowed to become the recorded best.
        if n_trials is None or feval_count <= n_trials:
            if best_so_far is None or fx < best_so_far[0]:
                best_so_far = (fx, x.copy())
        return scalar_softplus(fx) + eps if softplus else fx

    return _objective
|
|
|
|
|
|
@dataclass
class KeyData:
    """The components of a result key, as split apart by decode_key."""

    # The full key exactly as it was given to decode_key.
    key: str
    # Integer parsed from the key's "d<digits>" field.
    d: int
    # Integer parsed from the key's "n<digits>" field.
    n: int
    # Text between the "n" field and "_cube_" (presumably the optimizer name).
    opt: str
    # Text between "_cube_" and "_on_cube" (presumably the objective name).
    obj: str
    # Integer parsed from the bracketed suffix of the key.
    run: int
|
|
|
|
|
|
def decode_key(key, _filtering=False):
    """Split a result key into its fields and return them as a KeyData.

    example key (a bracketed run number such as "[1]" is expected to follow):
    COWrap_d03_n130_freelunch_qpso_ps16_cube_go_amgm_on_cube

    Returns None for keys from the period when the optimizer name lacked its
    "_cube" part, and, when `_filtering` is true, for the two objectives
    listed below. Raises ValueError if the numeric fields or the run number
    are not valid base-10 integers, and AssertionError if anything follows
    "_on_cube" before the bracket.
    """
    head, _, bracketed = key.partition("[")
    run_text = bracketed.partition("]")[0]

    remainder = head.removeprefix("COWrap_")
    d_text, _, remainder = remainder.partition("_")
    n_text, _, remainder = remainder.partition("_")
    opt, _, remainder = remainder.partition("_cube_")
    obj, _, leftover = remainder.partition("_on_cube")

    if not obj and opt.endswith("_on_cube"):
        return None  # fcmaes_biteopt was missing the _cube in its name for a while
    if _filtering and obj in ("go_stochastic", "go_xinsheyang01"):
        return None  # these are random
    assert not leftover, leftover

    # Keyword arguments are evaluated left to right, so the conversions fail
    # in the order d, n, run.
    return KeyData(
        key=key,
        d=int(d_text.removeprefix("d"), 10),
        n=int(n_text.removeprefix("n"), 10),
        opt=opt,
        obj=obj,
        run=int(run_text, 10),
    )
|
|
|
|
|
|
class AcquireForWriting:
    """
    A context manager that allows for very crude file-locking-like
    functionality when the FileLock module is missing.

    With the filelock package in use, a FileLock on "<filepath>.lock" is held
    for the duration of the block and the block receives `filepath` itself.

    Without it, an empty marker/staging file "<filepath>~" is created and
    handed to the block instead. On a clean exit its contents (if non-empty)
    are copied over `filepath` and the staging file is removed. If the block
    raises, the staging file is removed without touching `filepath`.
    """

    def __init__(self, filepath, usingfilelock=None):
        """Prepare to guard `filepath`.

        usingfilelock=None auto-detects the filelock package; a true value
        requires it (the import error propagates if it is missing); a false
        value forces the crude marker-file fallback.
        """
        from pathlib import Path

        self.filepath = Path(filepath)
        # NOTE: _locking must be assigned before _altpath is read, because
        # the property picks its suffix based on it.
        if usingfilelock is None:
            try:
                from filelock import FileLock
            except ModuleNotFoundError:
                self._locking = False
                self.lock = None
            else:
                self._locking = True
                self.lock = FileLock(self._altpath)
        elif usingfilelock:
            from filelock import FileLock

            self._locking = True
            self.lock = FileLock(self._altpath)
        else:
            self._locking = False
            self.lock = None

    @property
    def _altpath(self):
        """Path of the lock file (locking mode) or of the staging file."""
        suffix = ".lock" if self._locking else "~"
        return self.filepath.with_suffix(self.filepath.suffix + suffix)

    def __enter__(self):
        """Acquire the file and return the path the caller should write to."""
        if self._locking:
            self.lock.__enter__()
            return self.filepath

        from time import sleep

        # Give another writer up to three seconds to finish before giving up.
        for _ in range(3):
            if self._altpath.exists():
                sleep(1)
        assert not self._altpath.exists(), f"file is locked: {self.filepath}"
        # Creating the (empty) staging file is what marks the target as taken.
        self._altpath.write_bytes(b"")
        return self._altpath

    def __exit__(self, *exc):
        """Release the file, committing staged data only on a clean exit."""
        if self._locking:
            self.lock.__exit__(*exc)
            return

        if exc != (None, None, None):
            # The block failed: discard whatever was staged, but still remove
            # the marker. Leaving it behind would make every later
            # acquisition fail with "file is locked".
            self._altpath.unlink(missing_ok=True)
            return

        assert self._altpath.exists(), f"file went missing: {self.filepath}"
        try:
            data = self._altpath.read_bytes()
            # An untouched (empty) staging file means "nothing to commit".
            if data:
                self.filepath.write_bytes(data)
        finally:
            self._altpath.unlink()
        # A plain move of the staging file onto the target was considered,
        # but that assumes os.rename overwrites existing files.
|
|
|
|
|
|
def perform_another_experimental_scoring_method(results):
    """Score optimizers by the rank, and count, of their best placements.

    `results` maps an objective name to a list of ``(fopt, opt_name)`` pairs.
    If the first entry of the first list has three items, every entry is
    taken to be ``(fopt, opt_name, history)``: the scoring is then repeated
    once per history index, using ``history[i]`` in place of fopt, and the
    per-index scores are averaged.

    Returns a dict mapping each optimizer name to a float score. An optimizer
    whose best placement has rank r, reached c times, gets
    ``sum(2**(1 - r) / 2**i for i in range(c))``.
    """
    if len(results) and len(something := next(iter(results.values()))[0]) == 3:
        per_step_scores = {}
        for step in range(len(something[2])):
            # for k, v in results.items(): for vi in v: assert len(vi) == 3, vi
            snapshot = {
                name: [(entry[2][step], entry[1]) for entry in entries]
                for name, entries in results.items()
            }
            step_scores = perform_another_experimental_scoring_method(snapshot)
            for name, value in step_scores.items():
                per_step_scores.setdefault(name, []).append(value)
        return {
            name: sum(values) / len(values)
            for name, values in per_step_scores.items()
        }

    # Per objective: optimizer names grouped by result, lowest result first.
    tiers_by_objective = {}
    all_opt_names = set()
    for obj_name, obj_res in results.items():
        grouped = {}
        for fopt, opt_name in obj_res:
            grouped.setdefault(fopt, []).append(opt_name)
            all_opt_names.add(opt_name)
        tiers_by_objective[obj_name] = dict(sorted(grouped.items()))

    limited_by_floating_point_precision = 53

    # opt_name -> (rank at which it first placed, times it placed at that rank)
    best_ranks_and_counts = {}
    for outer_rank in range(1, limited_by_floating_point_precision + 1):
        for tiers in tiers_by_objective.values():
            for opt_names in tiers.values():
                changed = False
                for opt_name in set(opt_names):
                    if opt_name not in best_ranks_and_counts:
                        best_ranks_and_counts[opt_name] = (outer_rank, 1)
                        changed = True
                        continue
                    rank, count = best_ranks_and_counts[opt_name]
                    if rank == outer_rank:
                        best_ranks_and_counts[opt_name] = (rank, count + 1)
                        changed = True
                # Only the first tier that awards anything counts for this
                # objective at this rank.
                if changed:
                    break

    scores = {name: 0.0 for name in all_opt_names}
    for opt_name, (rank, count) in best_ranks_and_counts.items():
        points = 2 ** (1 - rank)
        capped = min(count, limited_by_floating_point_precision)
        scores[opt_name] = sum(points / 2**i for i in range(capped))

    return scores
|
|
|
|
|
|
def needs_rerun(key, value):
    """Return True when the stored result `value` for `key` should be redone.

    A result is rejected when its duration is negative, when it has no
    history field, when it predates the timestamp cutoff below, when its
    history is empty, or when its objective is on the (currently empty)
    exclusion list.
    """
    if (
        value["duration"] < 0.0
        or "history" not in value
        or value["timestamp"] < 1683295630.0  # bugged history field
        or not value["history"]  # not sure what happened here
    ):
        return True

    # The value is unused, but the lookup is kept: a result without an
    # "xopt" entry raises KeyError here, exactly as before.
    n_dim = len(value["xopt"])

    # Objective names whose results should always be rerun (none right now).
    ng = []

    kd = decode_key(key)
    assert kd is not None, key
    # print("filtered", key, file=__import__("sys").stderr)
    return kd.obj in ng
|
|
|
|
|
|
def merge_summaries(all_summaries):
    """Merge several summary dicts into one, renumbering the run suffixes.

    Only needed because the filenames were botched at one point.

    Entries for which needs_rerun is true are dropped. With a single summary
    the surviving entries keep their keys. With several, each entry is stored
    under the first free "<name>[i]" slot for i in 1..99, unless an equal
    value is met on the way, in which case it is treated as a duplicate and
    skipped. An entry that finds no free slot is silently left out.
    """
    how_many = len(all_summaries)
    if how_many == 0:
        return {}
    if how_many == 1:
        only = all_summaries[0]
        return {k: v for k, v in only.items() if not needs_rerun(k, v)}

    merged = {}
    for summary in all_summaries:
        for key, value in summary.items():
            if needs_rerun(key, value):
                continue
            stem = key.partition("[")[0]
            for slot in range(1, 100):
                candidate = f"{stem}[{slot}]"
                if candidate not in merged:
                    merged[candidate] = value
                    break
                # Plain equality works because the values are plain data.
                if merged[candidate] == value:
                    break  # already exists (probably; duration is unreliable)
    return merged
|
|
|
|
|
|
try:
|
|
import numpy
|
|
except ModuleNotFoundError:
|
|
pass
|
|
else:
|
|
from utils_np import *
|