from .go_benchmark_lists import *
from .go_benchmarks import all_problems
from .parties import parties
from .utilities import OWrap, COWrap, m1, m33, m36, prog
import numpy as np

# added to the magnitude of a denominator to keep it away from exactly zero.
tiny_offset = 1.1102230246251565e-16


def make_transform(lo, hi):
    """Return ``(mul, add)`` such that ``x * mul + add`` maps [0, 1] onto the bounds.

    The bounds may be given in either order; ``add`` is the smaller of the two
    and ``mul`` is the (non-negative) width of the interval.
    """
    mul = max(lo, hi) - min(lo, hi)
    add = min(lo, hi)
    return mul, add


def flipit(transformations, flippy):
    """Mirror one dimension of ``transformations`` in place.

    ``flippy`` is a 1-based dimension number. The stored ``(mul, add)`` pair
    for that dimension is replaced so that it is applied to ``1 - x``
    instead of ``x``.
    """
    flip_dim = flippy - 1
    mul, add = transformations[flip_dim]
    desired_mul, desired_add = -1, 1  # f(g(x)) = f(1 - x)
    mul, add = mul * desired_mul, mul * desired_add + add
    transformations[flip_dim] = (mul, add)


def make_objective(problem, size, *, fix_stuff=0):
    """Wrap a benchmark problem class as an objective over the unit cube.

    Args:
        problem: a benchmark class; it is instantiated as ``problem(size)`` and
            must expose ``fun`` and ``bounds``.
        size: number of dimensions.
        fix_stuff: patch level.
            ``>= 0`` patches a handful of problems by name (wrong bounds,
            divisions by zero).
            ``>= 1`` additionally mirrors one dimension of the problems listed
            in ``too_positive_<size>``.
            ``>= 2`` additionally warps one dimension of the problems listed
            in ``too_centered_<size>``.

    Returns:
        A function taking a point with coordinates in [0, 1] and returning
        ``obj.fun`` evaluated at the corresponding point within the problem's
        bounds. It carries ``__name__`` (``go_<name>_on_cube``) and
        ``__realname__`` (the problem class name).

    Raises:
        KeyError: if ``fix_stuff >= 1`` and ``size`` is not in 2..6.
    """
    obj = problem(size)
    name = problem.__name__

    flippy = 0  # when positive, helps removes positive correlations in solutions
    trippy = None  # when not None, moves solution away from center: (dim, dir)

    if fix_stuff >= 0:
        if name == "Deb03":
            # this problem has the wrong bounds for some reason, so we have to patch it.
            obj._bounds = list(zip([0.0] * obj.N, [1.0] * obj.N))
        elif name == "Csendes" or name == "Infinity":
            # this problem is weird... let's avoid division by zero, okay?
            # these problems are duplicates of each other. weird.
            replacement = size * (2 + np.sin(1))
            _fun = obj.fun
            obj.fun = lambda x: replacement if np.any(x == 0.0) else _fun(x)
        elif name == "Keane":
            # another problem that may attempt to divide by zero.
            _fun = obj.fun
            obj.fun = lambda x: 0.0 if np.all(x**2 == 0.0) else _fun(x)
        elif name == "Kowalik":
            # this divide by zero actually approaches infinity when not clipped.
            # TODO: there seems to be some confusion about which is `a`
            #       and which is `b` between the equations and the code.
            #       hmm it seems like the code is right, judging by this:
            # https://www.itl.nist.gov/div898/strd/nls/data/LINKS/DATA/MGH09.dat
            helper = lambda x: np.where(x < 0, -1, 1) * (np.abs(x) + tiny_offset)
            # 1.0 - 1e-16 == 0.9999999999999999
            # 1.0 + 1e-16 == 1.0
            # 1.0 + 2e-16 == 1.0000000000000002
            # 1 / 1.1102230246251565e-16**2 == 8.112963841460668e+31
            a, b = obj.a, obj.b
            # Kowalik is a least-squares problem: square each residual and
            # then sum. (Squaring the sum of the residuals instead yields a
            # different function that is zero wherever the residuals cancel.)
            obj.fun = lambda x: sum(
                (b - (x[0] * (a**2 + a * x[1]) / helper(a**2 + a * x[2] + x[3]))) ** 2
            )
        elif name == "Gulf":
            # just another division by zero.
            adjust = np.array([tiny_offset, 0.0, 0.0])
            _fun = obj.fun
            obj.fun = lambda x: _fun(x + adjust)

    if fix_stuff >= 1:
        stuff = {
            2: too_positive_2,
            3: too_positive_3,
            4: too_positive_4,
            5: too_positive_5,
            6: too_positive_6,
        }[size]
        if name.lower() in stuff:
            # too positively correlated, do some evil.
            ind = stuff.index(name.lower())
            flippy = ind % size + 1  # uniformly select a dimension to "flip"

    if fix_stuff >= 2:
        stuff = {
            2: too_centered_2,
            3: too_centered_3,
            4: too_centered_4,
            5: too_centered_5,
            6: too_centered_6,
        }[size]
        if name.lower() in stuff:
            # uniformly select offsets to "trip".
            ind = stuff.index(name.lower())
            trippy = (ind % size, ind // size % size)  # (dim, dir)

    transformations = [make_transform(lo, hi) for lo, hi in obj.bounds]
    if flippy:
        flipit(transformations, flippy)

    def objective(x):
        # assert all(xi >= 0.0 for xi in x), list(float(xi) for xi in x)
        # assert all(xi <= 1.0 for xi in x), list(float(xi) for xi in x)
        if trippy:
            x = list(x)  # mostly to create a copy
            ind = trippy[0]
            # square-root warp of a single coordinate, towards 0 or towards 1.
            x[ind] = (1 - (1 - x[ind]) ** 0.5) if trippy[1] else x[ind] ** 0.5
        x = [xi * mul + add for xi, (mul, add) in zip(x, transformations)]
        # if problem.__name__.startswith("Deb"): print(x)
        # `x` is always a freshly built list here, so a copy is unavoidable.
        # np.array(x, copy=False) raises ValueError for that on NumPy >= 2.0;
        # np.asarray behaves the same on every NumPy version.
        return obj.fun(np.asarray(x))

    objective.__name__ = f"go_{problem.__name__.lower()}_on_cube"
    objective.__realname__ = problem.__name__
    # objective.__qualname__ = problem.__name__
    return objective


def make_objectives(size, budget=None, fix_stuff=0):
    """Build an objective for every problem registered under ``size``.

    ``budget`` is accepted but not used by this function.
    """
    problems = all_problems[size]
    return [make_objective(problem, size, fix_stuff=fix_stuff) for problem in problems]


def find_objective(query, size=None):
    """Look up a problem class by name, ignoring case.

    Args:
        query: the problem's class name.
        size: when not None, only problems registered under this
            dimensionality are considered.

    Returns:
        The first matching problem class.

    Raises:
        AssertionError: if no problem matches.
    """
    results = []
    for p_dim, problems in all_problems.items():
        if size is not None and p_dim != size:
            continue
        for problem in problems:
            if problem.__name__.lower() == query.lower():
                results.append(problem)
    assert results, "no results found for name " + repr(query)
    return results[0]


# one list of ready-made objectives per dimensionality. `λ` is rebound before
# each use and is consumed immediately by the `map` that follows it.
λ = lambda q: make_objective(find_objective(q, 2), 2, fix_stuff=2)
GO_BENCHMARK_2D_PROBLEMS = list(
    map(λ, totally_fine_2 + too_positive_2 + too_centered_2)
)

λ = lambda q: make_objective(find_objective(q, 3), 3, fix_stuff=2)
GO_BENCHMARK_3D_PROBLEMS = list(
    map(λ, totally_fine_3 + too_positive_3 + too_centered_3)
)

λ = lambda q: make_objective(find_objective(q, 4), 4, fix_stuff=2)
GO_BENCHMARK_4D_PROBLEMS = list(
    map(λ, totally_fine_4 + too_positive_4 + too_centered_4)
)

λ = lambda q: make_objective(find_objective(q, 5), 5, fix_stuff=2)
GO_BENCHMARK_5D_PROBLEMS = list(
    map(λ, totally_fine_5 + too_positive_5 + too_centered_5)
)

λ = lambda q: make_objective(find_objective(q, 6), 6, fix_stuff=2)
GO_BENCHMARK_6D_PROBLEMS = list(
    map(λ, totally_fine_6 + too_positive_6 + too_centered_6)
)

# objectives keyed by their number of dimensions.
GO_BENCHMARKS = {
    2: GO_BENCHMARK_2D_PROBLEMS,
    3: GO_BENCHMARK_3D_PROBLEMS,
    4: GO_BENCHMARK_4D_PROBLEMS,
    5: GO_BENCHMARK_5D_PROBLEMS,
    6: GO_BENCHMARK_6D_PROBLEMS,
}

for problem_list in GO_BENCHMARKS.values():
    for problem in problem_list:
        # print(problem.__realname__)
        assert (
            problem.__realname__ != "Csendes"
        ), "please use Infinity instead; it's basically equivalent"


class Runner:
    """Runs an optimizer on an objective one or more times and collects results.

    The wrapper object created for one call to `run` is kept and offered for
    reuse on the next call.
    """

    def __init__(self, multiple=2, run_anyway=3, always_run_anyway=True, quiet=False):
        """Store the run-count settings.

        Args:
            multiple: `run` keeps going while the run number is <= this.
            run_anyway: when truthy, a second upper bound on the run number
                that may permit runs beyond `multiple`.
            always_run_anyway: when true, the `run_anyway` bound applies to
                every iteration; otherwise only until one fresh run has been
                performed within the current call.
            quiet: suppress the per-run progress note.
        """
        self.multiple = multiple
        self.run_anyway = run_anyway
        self.always_run_anyway = always_run_anyway
        self.quiet = quiet
        self._wrapped = None

    def run(
        self, optimizer, objective, size, budget, frugal_percent=1.0, greedy_percent=2.0
    ):
        """Optimize `objective` with `optimizer`, possibly several times.

        Cached runs reported by the wrapper are collected first; fresh runs
        follow until the run-count conditions are exhausted.

        Returns:
            A list of ``(fopt, optimizer_name, history)`` tuples, one per run.
        """
        note = (lambda s: None) if self.quiet else m36
        warn = m33

        opt_name = optimizer.__name__
        obj_name = objective.__name__
        obj_realname = getattr(objective, "__realname__", obj_name)

        wrapped_kwargs = dict(
            objective=objective,
            optimizer=optimizer,
            budget=budget,
            size=size,
            frugal_percent=frugal_percent,
            greedy_percent=greedy_percent,
        )
        if self._wrapped is None:
            wrapped = COWrap(**wrapped_kwargs)
        else:
            # this can be 10+ times faster.
            wrapped = self._wrapped.attempt_reuse(**wrapped_kwargs)
        self._wrapped = wrapped

        # consume whatever runs the wrapper already has cached.
        results = []
        run = 1
        while (cache := wrapped.cached(run)) is not None:
            run += 1
            fopt, xopt, history = cache
            results.append((fopt, opt_name, history))

        once = False
        # `and` binds tighter than `or`: this reads as
        # (run <= multiple) or (<allowed to run anyway> and run <= run_anyway).
        while (
            run <= self.multiple
            or (self.always_run_anyway or not once)
            and self.run_anyway
            and run <= self.run_anyway
        ):
            # assert run == wrapped._run, (run, wrapped._run)
            if run != (_run := wrapped._run):
                warn(f"Note: updating local run count from {run} to {_run}.")
                run = _run
                continue  # check conditions again

            note(
                f"Using {opt_name} to optimize {obj_realname} ({obj_name}) [{run}] ..."
            )
            _ = optimizer(wrapped, size=size, budget=budget)
            fopt, xopt = wrapped.finish()
            result = (fopt, opt_name, wrapped.history)
            results.append(result)
            once = True
            run += 1

        return results


def main(argv, display=True):
    """Benchmark a party of optimizers and print a ranking.

    Args:
        argv: command-line style arguments:
            ``argv[1]`` is a key into `parties` (default ``"standard"``);
            ``argv[2]`` is the negated number of dimensions (default ``-2``);
            ``argv[3]`` is the evaluation budget
            (default ``fib(abs(size) + 4) * 10``).
        display: when false, the per-objective and summary printing is skipped.

    Side effects:
        Writes ``previous.py`` in the current directory, listing the
        optimizers whose score minus price was >= 0 and <= 0 respectively.
        When ``argv[1]`` is ``"positive"`` or ``"negative"`` and the matching
        list equals the set of optimizers that were run (ignoring the two
        random baselines), the process exits with status 2.
    """
    from .utilities import fib
    from .utilities import prune_results, perform_another_experimental_scoring_method
    from pathlib import Path
    import sys

    def stfu(please_be_quiet=None):
        # silence cma's InjectionWarning if any known-noisy optimizer is in use.
        if please_be_quiet is None:
            please_be_quiet = (
                "ng_ascmadethird_cube",
                "ng_cmabounded_cube",
                "ng_cmatuning_cube",
                "ng_chaincmapowell_cube",
                "ng_chainnaivetbpsacmapowell_cube",
                "ng_paraportfolio_cube",
                "ng_rescaledcma_cube",
            )
        for optimizer in optimizers:
            name = optimizer.__name__
            if any(obnoxious in name for obnoxious in please_be_quiet):
                import warnings
                from cma.evolution_strategy import InjectionWarning

                warnings.simplefilter("ignore", InjectionWarning)  # not our fault
                break

    def mark(opt_name):
        # a colored one-character prefix: "@" for the random baselines,
        # "!" for anything containing "nelder", blank otherwise.
        return (
            "\033[95m@"
            if opt_name in ("another_random_cube", "quasirandom_cube")
            else "\033[96m!"
            if "nelder" in opt_name
            else " "
        )

    def print_place(ind, names, which="best"):
        # print every optimizer sharing a place; all but the first say "(tied)".
        color = "\033[32m" if which == "best" else "\033[31m"
        reset, placed, tied = "\033[m", place_names[ind], " \033[90m(tied)\033[m"
        for i, name in enumerate(sorted(set(names))):
            print(tied if i else f" {color}{placed}:{reset}", name)

    def fancy_output(opt_name, score, price):
        # format one summary line: name, (score, price) and their difference.
        name = opt_name.removesuffix("_cube")
        if type(score) is float:
            assert type(price) is float, "type mismatch"
            # unweight = 10  # len(optimizers)  # sum(place_scores)
            unweight = runner.multiple * np.sqrt(len(optimizers))
            stats = f"(score:{score * unweight:4.0f}, price:{price * unweight:4.0f})"
        else:
            stats = f"(score:{score:4}, price:{price:4})"
        color = 0
        reset = "\033[m"
        # this will need adjusting depending on your terminal colors:
        gradient = (32, 92, 93, 33, 91)  # good to bad
        if score == 0 and price == 0:
            pass  # wat?
        elif score < 0 and price < 0:
            color = 35  # wat?!
        elif score > 0 and price == 0:
            color = gradient[0]  # good
        elif score == 0 and price > 0:
            color = gradient[4]  # awful
        elif score > price:
            color = gradient[1]  # ok
        elif score == price:
            color = gradient[2]  # meh
        elif score < price:
            color = gradient[3]  # bad
        color = f"\033[{color}m"
        s = f"{mark(opt_name)} {name:<32}{reset} with {color}{stats}{reset}"
        delta = score - price
        if type(score) is float:
            delta *= unweight
        if delta > 6:
            color = gradient[0]
        elif delta < -6:
            color = gradient[4]
        elif delta > 1:
            color = gradient[1]
        elif delta < -1:
            color = gradient[3]
        else:
            color = gradient[2]
        color = f"\033[{color}m"
        s += f" {color}{float(delta):+.0f}{reset}"
        # s += f" {color}{delta:+}{reset}"
        return s

    reset = "\033[m"

    quieter = False
    summarize_each_objective = True
    old_summary = True
    note = (lambda s: None) if quieter else m36

    runner = Runner()
    percents = dict(frugal_percent=1.0, greedy_percent=2.0)

    which = parties[argv[1]] if len(argv) > 1 else parties["standard"]
    size = int(argv[2]) if len(argv) > 2 else -2
    budget = int(argv[3]) if len(argv) > 3 else fib(abs(size) + 4) * 10

    place_names = ("1st", "2nd", "3rd", "4th")
    place_scores = (5, 3, 2, 1)

    assert size < 0, "unsupported in this version"
    size = abs(size)
    objectives = GO_BENCHMARKS[size]  # * multiple
    optimizers = list(which)  # copy

    ms = (
        f" ({runner.multiple}+{runner.run_anyway - runner.multiple} times)"
        if runner.multiple != 1
        else ""
    )
    n_obj = len(objectives)
    n_opt = len(optimizers)
    print(f"Optimizing {n_obj} objectives{ms} with {n_opt} optimizers...")

    stfu()

    pseudo_shuffled = lambda stuff: sorted(stuff, key=lambda obj: hash(repr(obj)))

    results = {}
    for optimizer in prog(pseudo_shuffled(optimizers), pref="m"):
        for objective in prog(pseudo_shuffled(objectives), pref="s"):
            new_results = runner.run(optimizer, objective, size, budget)
            results.setdefault(objective.__name__, []).extend(new_results)

    all_results = results
    results = prune_results(results, runner.multiple, _check=old_summary)

    scores, prices = {}, {}
    all_opt_names = set()
    places = min(len(place_names), len(place_scores))
    for obj_name, obj_res in results.items():
        if display and summarize_each_objective:
            print()
            m1(f"{obj_name}:")

        # group optimizer names by the value they achieved; ties share a place.
        all_res = {}
        for fopt, opt_name, _extra in obj_res:
            all_res.setdefault(fopt, []).append(opt_name)
            all_opt_names.add(opt_name)
            scores.setdefault(opt_name, 0.0)
            prices.setdefault(opt_name, 0.0)

        sorted_res = sorted(all_res)
        best_res = sorted_res[:places]
        worst_res = sorted_res[-places:]
        # awards are divided by the number of optimizers that received one.
        score_insignificance = sum(len(all_res[fopt]) for fopt in best_res)
        price_insignificance = sum(len(all_res[fopt]) for fopt in worst_res)
        # print("score 1/x:", obj_name, score_insignificance)
        # print("price 1/x:", obj_name, price_insignificance)

        for i, fopt in enumerate(best_res):
            for opt_name in all_res[fopt]:
                scores[opt_name] += place_scores[i] / score_insignificance
            if display and summarize_each_objective:
                print_place(i, all_res[fopt], "best")

        for i, fopt in enumerate(worst_res):
            # count back from the end of the slice that was actually obtained,
            # so that the very worst result is always "1st" worst even when
            # there are fewer distinct results than places.
            mi = len(worst_res) - i - 1
            for opt_name in all_res[fopt]:
                prices[opt_name] += place_scores[mi] / price_insignificance
            if display and summarize_each_objective:
                print_place(mi, all_res[fopt], "worst")

    if display:
        more_scores = perform_another_experimental_scoring_method(results)

    for blah, points in zip(("best", "worst"), (scores, prices)):
        if display and old_summary:
            print(
                f"\n\033[1m{blah} scoring optimizers:\033[m"
                f" (awards={place_scores})"
                f" (obj={len(objectives)}, opt={len(optimizers)})"
                f" (dims={size}, evals={budget})"
            )
        for opt_name, _opt_point in sorted(points.items(), key=lambda t: -t[1]):
            # place = place_names[i] if i < len(place_names) else " "
            # delta = scores.get(opt_name, 0) - prices.get(opt_name, 0)
            if display and old_summary:
                print(
                    fancy_output(
                        opt_name, scores.get(opt_name, 0), prices.get(opt_name, 0)
                    )
                )

    positive, negative = [], []
    for opt_name in sorted(all_opt_names):
        delta = scores.get(opt_name, 0) - prices.get(opt_name, 0)
        # note: this intentionally includes delta == 0 in both positive and negative.
        if delta >= 0:
            if opt_name not in positive:
                positive.append(opt_name)
        if delta <= 0:
            if opt_name not in negative:
                negative.append(opt_name)

    if display and not old_summary:
        print(
            f"\n\033[1malternatively scored optimizers:\033[m"
            f" (obj={len(objectives)}, opt={len(optimizers)})"
            f" (dims={size}, evals={budget})"
        )
        for opt_name, opt_score in sorted(more_scores.items(), key=lambda t: -t[1]):
            # if opt_score < 1: continue
            stats = f"{opt_score:18.16f}"
            name = opt_name.removesuffix("_cube")
            color = (
                "\033[1m" if opt_score > 1.0 else "\033[33m" if opt_score < 1.0 else ""
            )
            s = f"{mark(opt_name)} {name:<32}{reset} with {color}{stats}{reset}"
            # s += f" {color}{float(delta):+.0f}{reset}"
            print(s)

    text = "# this file was automatically generated by go_benchmark_it.py,\n"
    text += "# any changes may be overwritten!\n"
    text += "PREVIOUSLY_POSITIVE = [\n"
    text += "".join(f' "{opt_name}",\n' for opt_name in positive)
    text += "]\n"
    text += "PREVIOUSLY_NEGATIVE = [\n"
    text += "".join(f' "{opt_name}",\n' for opt_name in negative)
    text += "]\n"

    if positive or negative:
        try:
            # the output is Python source, so write it as UTF-8 explicitly
            # rather than in the platform's default encoding.
            Path("previous.py").write_text(text, encoding="utf-8")
        except PermissionError:
            print("# failed to write previous.py, ignoring...")

    if len(argv) > 1 and argv[1] in ("positive", "negative"):
        all_old_opt_names = set(opt.__name__ for opt in optimizers)
        C = set(("quasirandom_cube", "another_random_cube"))
        # sys.exit rather than the builtin exit: the latter is installed by
        # the `site` module and is not guaranteed to exist in every run mode.
        if argv[1] == "positive" and set(positive) - C == all_old_opt_names - C:
            sys.exit(2)  # no changes
        if argv[1] == "negative" and set(negative) - C == all_old_opt_names - C:
            sys.exit(2)  # no changes