from dataclasses import dataclass
import math

feps = 2.0**-23.0  # equals the machine epsilon of IEEE-754 single precision
tiniest = 2.0**-1022.0  # equals the smallest positive normal IEEE-754 double
check = object()  # secret "key" to pass to wrap_untrustworthy to extract feval_count
final = object()  # secret "key" to pass to wrap_untrustworthy to extract results


def color_factory(color):
    """Return a printer that wraps its argument in the ANSI SGR code `color`."""

    def emit(s):
        print(f"\033[{color}m{s}\033[m")

    return emit


# one printer per SGR code: 1 is conventionally bold, 30-37 the standard
# foreground colors, and 90-97 their bright counterparts.
m1 = color_factory(1)
m30 = color_factory(30)
m31 = color_factory(31)
m32 = color_factory(32)
m33 = color_factory(33)
m34 = color_factory(34)
m35 = color_factory(35)
m36 = color_factory(36)
m37 = color_factory(37)
m90 = color_factory(90)
m91 = color_factory(91)
m92 = color_factory(92)
m93 = color_factory(93)
m94 = color_factory(94)
m95 = color_factory(95)
m96 = color_factory(96)
m97 = color_factory(97)


class ExhaustedTrialsError(Exception):
    """Raised by a wrapped objective once its evaluation budget is used up."""


def scalar_softplus(x):
    """
    Compute log(1 + exp(x)) for a single number without overflowing.

    Large inputs are returned as-is (as a float) and very negative inputs
    yield exactly 0.0; only the range in between goes through exp/log1p.
    """
    # NOTE(review): the cutoffs look like the points where, in double precision,
    # softplus(x) rounds to x and where exp(x) underflows to zero -- confirm.
    if x >= 33.276435657655455:
        return float(x)
    if x <= -745.13330078125:
        return 0.0
    return math.log1p(math.exp(x))


def phi(d):
    """
    Approximate the fixed point of x = (1 + x) ** (1 / (d + 1)).

    Uses a fixed number of iterations starting from 2.0: 30 when d == 1,
    otherwise max(10, 28 - d).
    """
    # phi(1) = golden ratio
    # phi(2) = plastic constant
    # phi(3) = the positive real root of x**4-x-1
    iterations = 30 if d == 1 else max(10, 28 - d)
    exponent = 1 / (d + 1)
    x = 2.0
    for _ in range(iterations):
        x = pow(1 + x, exponent)
    return x


def wrap_untrustworthy(
    objective, n_trials, *, raising=False, bounding=None, softplus=False, eps=0.0
):
    """
    Wrap `objective` so that evaluations are counted and the best one is kept.

    The returned callable behaves like `objective`, with these additions:

    - calling it with the module-level `check` sentinel returns the number of
      evaluations performed so far, without evaluating anything.
    - calling it with the module-level `final` sentinel returns the tuple
      `(fopt, xopt, feval_count)`; at least one evaluation must have happened.
    - when `raising` is true and `n_trials` evaluations have already been
      made, ExhaustedTrialsError is raised instead of evaluating again.
    - when `bounding` is not None, the input is first passed through
      `do_bounding(x, bounding)`, which must be provided by this module.
    - when `softplus` is true, `scalar_softplus(fx) + eps` is returned in
      place of the raw objective value (`eps` is ignored otherwise).

    Only the first `n_trials` evaluations may update the recorded best; pass
    `n_trials=None` for no limit. The recorded best always holds the raw
    objective value and a `.copy()` of the (possibly bounded) input.
    """
    # also handles bounding now, so it may be used for other purposes as well. whoops.
    feval_count = 0
    best_so_far = None

    def _objective(x):
        nonlocal feval_count, best_so_far

        if x is check:
            return feval_count
        if x is final:
            assert best_so_far is not None
            fopt, xopt = best_so_far
            return fopt, xopt, feval_count

        # n_trials=None means "unlimited", so there is no budget to exhaust.
        # (comparing against None here used to raise a TypeError.)
        if raising and n_trials is not None and feval_count >= n_trials:
            raise ExhaustedTrialsError()

        if bounding is not None:
            x = do_bounding(x, bounding)

        fx = objective(x)
        feval_count += 1

        within_budget = n_trials is None or feval_count <= n_trials
        if within_budget and (best_so_far is None or fx < best_so_far[0]):
            best_so_far = (fx, x.copy())

        if softplus:
            return scalar_softplus(fx) + eps
        return fx

    return _objective


@dataclass
class KeyData:
    """The pieces of a result key, as split apart by decode_key."""

    key: str  # the original, undecoded key
    d: int  # number parsed from the "d.." field
    n: int  # number parsed from the "n.." field
    opt: str  # optimizer name: the text before "_cube_"
    obj: str  # objective name: the text between "_cube_" and "_on_cube"
    run: int  # number found between the square brackets


def decode_key(key, _filtering=False):
    """
    Split a result key into a KeyData, or return None when it is rejected.

    None is returned for keys with no objective part whose optimizer part
    ends in "_on_cube", and, when `_filtering` is true, for the objectives
    "go_stochastic" and "go_xinsheyang01".

    The key must carry a "[run]" suffix; int() raises ValueError without it.
    """
    # example key:
    # COWrap_d03_n130_freelunch_qpso_ps16_cube_go_amgm_on_cube
    head, _, run = key.partition("[")
    run = run.partition("]")[0]

    rest = head.removeprefix("COWrap_")
    d, _, rest = rest.partition("_")
    n, _, rest = rest.partition("_")
    opt, _, rest = rest.partition("_cube_")
    obj, _, rest = rest.partition("_on_cube")

    if not obj and opt.endswith("_on_cube"):
        return None  # fcmaes_biteopt was missing the _cube in its name for a while
    if _filtering and obj in ("go_stochastic", "go_xinsheyang01"):
        return None  # these are random
    assert not rest, rest  # nothing may follow "_on_cube"

    return KeyData(
        key=key,
        d=int(d.removeprefix("d"), 10),
        n=int(n.removeprefix("n"), 10),
        opt=opt,
        obj=obj,
        run=int(run, 10),
    )


class AcquireForWriting:
    """
    A context manager that allows for very crude file-locking-like
    functionality when the FileLock module is missing.

    With filelock, a FileLock on "<file>.lock" is held for the duration of the
    block and the real path is handed to the caller.

    Without filelock, an empty scratch file "<file>~" is created and handed to
    the caller instead. On a clean exit, its contents (if any) are written to
    the real path. The scratch file is removed on exit either way.

    `usingfilelock` may be True (require filelock), False (never use it),
    or None (use it if it can be imported).
    """

    def __init__(self, filepath, usingfilelock=None):
        from pathlib import Path

        self.filepath = Path(filepath)

        if usingfilelock is None:
            try:
                from filelock import FileLock
            except ModuleNotFoundError:
                FileLock = None
        elif usingfilelock:
            from filelock import FileLock
        else:
            FileLock = None

        # _altpath depends on _locking, so this must be assigned first.
        self._locking = FileLock is not None
        self.lock = FileLock(self._altpath) if self._locking else None

    @property
    def _altpath(self):
        """Path of the lock file (with filelock) or scratch file (without)."""
        suffix = ".lock" if self._locking else "~"
        return self.filepath.with_suffix(self.filepath.suffix + suffix)

    def __enter__(self):
        if self._locking:
            self.lock.__enter__()
            return self.filepath

        from time import sleep

        scratch = self._altpath
        # give another writer up to three seconds to finish before giving up.
        for _ in range(3):
            if scratch.exists():
                sleep(1)
        assert not scratch.exists(), f"file is locked: {self.filepath}"
        scratch.write_bytes(b"")
        return scratch

    def __exit__(self, *exc):
        if self._locking:
            self.lock.__exit__(*exc)
            return

        scratch = self._altpath
        if exc != (None, None, None):
            # the block failed: leave the real file alone, but still drop the
            # scratch file, or every later acquisition would see it as a lock.
            scratch.unlink(missing_ok=True)
            return

        assert scratch.exists(), f"file went missing: {self.filepath}"
        try:
            data = scratch.read_bytes()
            if data:  # an untouched scratch file must not clobber the real one
                self.filepath.write_bytes(data)
        finally:
            scratch.unlink()


def perform_another_experimental_scoring_method(results):
    """
    Score optimizers by how early, and how often, they place per objective.

    `results` maps an objective name to a list of `(fopt, opt_name)` pairs.
    Alternatively, every entry may be a triple whose third item is a history
    sequence; then this scoring is repeated with `fopt` replaced by each
    history element in turn, and the per-optimizer scores are averaged. Only
    the first entry of the first objective is inspected to tell which.

    Within an objective, optimizers are grouped by equal `fopt` and groups are
    visited in ascending order. In round r (starting at 1), each objective
    contributes its first group that has an optimizer either still unranked
    (it gets rank r) or already of rank r (its count goes up by one).

    Returns a dict of optimizer name to score, where a score is
    `2 ** (1 - rank)` times the sum of `2 ** -i` for i below its count,
    or 0.0 for optimizers that were never ranked.
    """
    if len(results) and len(something := next(iter(results.values()))[0]) == 3:
        history_length = len(something[2])
        each = {}
        for i in range(history_length):
            # rebuild the results as plain pairs, using the i-th history value.
            snapshot = {
                name: [(entry[2][i], entry[1]) for entry in entries]
                for name, entries in results.items()
            }
            partial = perform_another_experimental_scoring_method(snapshot)
            for name, value in partial.items():
                each.setdefault(name, []).append(value)
        return {name: sum(values) / len(values) for name, values in each.items()}

    # per objective: {fopt: [optimizers that achieved it]}, best fopt first.
    new_results = {}
    all_opt_names = set()
    for obj_name, obj_res in results.items():
        all_res = {}
        for fopt, opt_name in obj_res:
            all_res.setdefault(fopt, []).append(opt_name)
            all_opt_names.add(opt_name)
        new_results[obj_name] = dict(sorted(all_res.items()))

    limited_by_floating_point_precision = 53
    best_ranks_and_counts = {}
    for outer_rank in range(1, limited_by_floating_point_precision + 1):
        for all_res in new_results.values():
            for opt_names in all_res.values():
                dirty = False
                for opt_name in set(opt_names):
                    known = best_ranks_and_counts.get(opt_name)
                    if known is None:
                        best_ranks_and_counts[opt_name] = (outer_rank, 1)
                        dirty = True
                    elif known[0] == outer_rank:
                        best_ranks_and_counts[opt_name] = (known[0], known[1] + 1)
                        dirty = True
                if dirty:
                    break  # this objective has contributed to this round

    scores = {name: 0.0 for name in all_opt_names}
    for opt_name, (rank, count) in best_ranks_and_counts.items():
        points = 2 ** (1 - rank)
        count = min(count, limited_by_floating_point_precision)
        scores[opt_name] = sum(points / 2**i for i in range(count))
    return scores


def needs_rerun(key, value):
    """
    Decide whether the summary entry `value` stored under `key` is unusable.

    True is returned for a negative duration, a missing or empty history,
    a timestamp below 1683295630.0, or an objective listed in `ng`.
    The key must be one that decode_key accepts.
    """
    if value["duration"] < 0.0 or "history" not in value:
        return True
    if value["timestamp"] < 1683295630.0:  # bugged history field
        return True
    if not value["history"]:  # not sure what happened here
        return True

    n_dim = len(value["xopt"])  # unused, but a missing "xopt" still raises here
    ng = []  # objectives to force a rerun of; currently none

    kd = decode_key(key)
    assert kd is not None, key
    return kd.obj in ng


def merge_summaries(all_summaries):
    """
    Combine several summary dicts into one, renumbering the runs.

    Entries for which needs_rerun is true are dropped. Each remaining entry
    is stored under its key with the run number replaced by the lowest of
    1..99 that is still unused, unless an equal entry is encountered first,
    in which case it is treated as a duplicate and skipped. An entry that
    finds all 99 slots occupied by different values is silently discarded.

    A single summary is only filtered; its keys are kept as they are.
    """
    # i only needed to write this because i effed up my filenames at one point. oh well.
    if not all_summaries:
        return {}
    if len(all_summaries) == 1:
        return {k: v for k, v in all_summaries[0].items() if not needs_rerun(k, v)}

    new_summaries = {}
    for summary in all_summaries:
        for key, value in summary.items():
            if needs_rerun(key, value):
                continue
            stem = key.partition("[")[0]
            for i in range(1, 100):
                new_key = f"{stem}[{i}]"
                if new_key not in new_summaries:
                    new_summaries[new_key] = value
                    break
                if new_summaries[new_key] == value:  # this works 'cause it's POD
                    break  # already exists (probably; duration is unreliable)
    return new_summaries