"""Benchmark optimizers on global-optimization problems mapped to the unit cube.

Importing this module builds ``GO_BENCHMARKS``: for each supported
dimensionality (2, 3 and 4) a list of objective functions that accept points
in the unit hypercube and evaluate the underlying benchmark problem on its
native bounds. Running it as a script evaluates a set of optimizers on those
objectives, prints a score table and writes ``previous.py``.
"""

from go_benchmark_lists import *
from go_benchmarks import problems_2d, problems_3d, problems_4d
from notwacube import book_of_optimizers
from prog80 import prog
from utils import OWrap, COWrap, m1, m33, m36
from utils import perform_another_experimental_scoring_method
import numpy as np

# problem classes, keyed by dimensionality.
all_problems = {
    2: problems_2d,
    3: problems_3d,
    4: problems_4d,
}

# used to nudge values away from exact zeros that would cause division by zero.
tiny_offset = 1.1102230246251565e-16


def make_transform(lo, hi):
    """Return ``(mul, add)`` such that ``x * mul + add`` maps [0, 1] onto the bounds.

    The bounds may be given in either order; ``add`` is the smaller bound
    and ``mul`` is the (non-negative) width of the interval.
    """
    lower, upper = min(lo, hi), max(lo, hi)
    return upper - lower, lower


def flipit(transformations, flippy):
    """Mirror one dimension of ``transformations`` in place.

    ``flippy`` is a 1-based dimension index. The affected ``(mul, add)`` pair
    is composed with g(x) = 1 - x, so that f(g(x)) = f(1 - x).
    """
    flip_dim = flippy - 1
    mul, add = transformations[flip_dim]
    # (1 - x) * mul + add == x * (-mul) + (mul + add)
    transformations[flip_dim] = (-mul, mul + add)


def make_objective(problem, n_dim, *, fix_stuff=0):
    """Wrap a benchmark problem class as an objective over the unit cube.

    Parameters:
        problem: a problem class; it is instantiated as ``problem(n_dim)`` and
            its ``fun`` and ``bounds`` attributes are used.
        n_dim: the dimensionality to instantiate the problem with.
        fix_stuff: how much patching to apply.
            ``>= 0`` patches known-bad bounds and divisions by zero,
            ``>= 1`` additionally flips one dimension of the problems listed
            in ``too_positive_*``, and
            ``>= 2`` additionally skews one dimension of the problems listed
            in ``too_centered_*``.

    Returns:
        A function taking a point with coordinates in [0, 1]. It carries
        ``__name__`` (``go_<name>_on_cube``) and ``__realname__`` attributes.

    Raises:
        KeyError: if ``fix_stuff >= 1`` and ``n_dim`` is not 2, 3 or 4.
    """
    obj = problem(n_dim)
    name = problem.__name__

    flippy = 0  # when positive, helps removes positive correlations in solutions
    trippy = None  # when not None, moves solution away from center: (dim, dir)

    if fix_stuff >= 0:
        if name == "Deb03":
            # this problem has the wrong bounds for some reason, so we have to patch it.
            obj._bounds = list(zip([0.0] * obj.N, [1.0] * obj.N))

        elif name == "Csendes" or name == "Infinity":
            # this problem is weird... let's avoid division by zero, okay?
            # these problems are duplicates of each other. weird.
            replacement = n_dim * (2 + np.sin(1))
            _fun = obj.fun
            obj.fun = lambda x: replacement if np.any(x == 0.0) else _fun(x)

        elif name == "Keane":
            # another problem that may attempt to divide by zero.
            _fun = obj.fun
            obj.fun = lambda x: 0.0 if np.all(x**2 == 0.0) else _fun(x)

        elif name == "Kowalik":
            # this divide by zero actually approaches infinity when not clipped.
            # TODO: there seems to be some confusion about which is `a`
            #       and which is `b` between the equations and the code.
            #       hmm it seems like the code is right, judging by this:
            # https://www.itl.nist.gov/div898/strd/nls/data/LINKS/DATA/MGH09.dat
            # 1.0 - 1e-16 == 0.9999999999999999
            # 1.0 + 1e-16 == 1.0
            # 1.0 + 2e-16 == 1.0000000000000002
            # 1 / 1.1102230246251565e-16**2 == 8.112963841460668e+31
            a, b = obj.a, obj.b

            def helper(x):
                # push the denominator away from zero while keeping its sign.
                return np.where(x < 0, -1, 1) * (np.abs(x) + tiny_offset)

            def kowalik(x):
                residuals = b - (
                    x[0] * (a**2 + a * x[1]) / helper(a**2 + a * x[2] + x[3])
                )
                # the objective is the sum of *squared* residuals; squaring
                # the sum instead would define an entirely different problem.
                return sum(residuals**2)

            obj.fun = kowalik

        elif name == "Gulf":
            # just another division by zero.
            adjust = np.array([tiny_offset, 0.0, 0.0])
            _fun = obj.fun
            obj.fun = lambda x: _fun(x + adjust)

    if fix_stuff >= 1:
        stuff = {2: too_positive_2, 3: too_positive_3, 4: too_positive_4}[n_dim]
        if name.lower() in stuff:
            # too positively correlated, do some evil.
            ind = stuff.index(name.lower())
            flippy = ind % n_dim + 1  # uniformly select a dimension to "flip"

    if fix_stuff >= 2:
        stuff = {2: too_centered_2, 3: too_centered_3, 4: too_centered_4}[n_dim]
        if name.lower() in stuff:
            # uniformly select offsets to "trip".
            ind = stuff.index(name.lower())
            trippy = (ind % n_dim, ind // n_dim % n_dim)  # (dim, dir)

    transformations = [make_transform(lo, hi) for lo, hi in obj.bounds]
    if flippy:
        flipit(transformations, flippy)

    def objective(x):
        # assert all(xi >= 0.0 for xi in x), list(float(xi) for xi in x)
        # assert all(xi <= 1.0 for xi in x), list(float(xi) for xi in x)
        if trippy:
            x = list(x)  # mostly to create a copy
            ind = trippy[0]
            x[ind] = 1 - (1 - x[ind]) ** 0.5 if trippy[1] else x[ind] ** 0.5
        x = [xi * mul + add for xi, (mul, add) in zip(x, transformations)]
        # if problem.__name__.startswith("Deb"): print(x)
        # np.asarray rather than np.array(..., copy=False): with NumPy 2,
        # copy=False raises when a copy is unavoidable, as it is for a list.
        return obj.fun(np.asarray(x))

    objective.__name__ = f"go_{problem.__name__.lower()}_on_cube"
    objective.__realname__ = problem.__name__
    # objective.__qualname__ = problem.__name__
    return objective


def make_objectives(n_dim, n_trials=None, fix_stuff=0):
    """Wrap every problem registered for ``n_dim`` with `make_objective`.

    ``n_trials`` is accepted but not used.
    """
    problems = all_problems[n_dim]
    return [make_objective(problem, n_dim, fix_stuff=fix_stuff) for problem in problems]


def find_objective(query, n_dim=None):
    """Return the first problem class whose name matches ``query``.

    The comparison ignores case. When ``n_dim`` is given, only the problems
    registered for that dimensionality are searched.

    Raises:
        AssertionError: if no problem matches.
    """
    wanted = query.lower()
    results = [
        problem
        for p_dim, problems in all_problems.items()
        if n_dim is None or p_dim == n_dim
        for problem in problems
        if problem.__name__.lower() == wanted
    ]
    assert results, "no results found for name " + repr(query)
    return results[0]


λ = lambda q: make_objective(find_objective(q, 2), 2, fix_stuff=2)
GO_BENCHMARK_2D_PROBLEMS = list(
    map(λ, totally_fine_2 + too_positive_2 + too_centered_2)
)

λ = lambda q: make_objective(find_objective(q, 3), 3, fix_stuff=2)
GO_BENCHMARK_3D_PROBLEMS = list(
    map(λ, totally_fine_3 + too_positive_3 + too_centered_3)
)

λ = lambda q: make_objective(find_objective(q, 4), 4, fix_stuff=2)
GO_BENCHMARK_4D_PROBLEMS = list(
    map(λ, totally_fine_4 + too_positive_4 + too_centered_4)
)

GO_BENCHMARKS = {
    2: GO_BENCHMARK_2D_PROBLEMS,
    3: GO_BENCHMARK_3D_PROBLEMS,
    4: GO_BENCHMARK_4D_PROBLEMS,
}

for problem_list in GO_BENCHMARKS.values():
    for problem in problem_list:
        # print(problem.__realname__)
        assert (
            problem.__realname__ != "Csendes"
        ), "please use Infinity instead; it's basically equivalent"


if __name__ == "__main__":
    from pathlib import Path
    from tqdm import tqdm
    import sys

    def fib(n):
        """Return the n-th Fibonacci number (fib(0) == 0, fib(1) == 1)."""
        return pow(2 << n, n + 1, (4 << 2 * n) - (2 << n) - 1) % (2 << n)

    def optimizer_filter(name):
        """Return False for optimizers that should be left out of the run."""
        # worst scoring optimizers: (awards=(5, 3, 2, 1)) (obj=146, opt=389) (dims=2, evals=80)
        #   evosax_pbt with (score: 0, price: 593) -593
        # worst scoring optimizers: (awards=(5, 3, 2, 1)) (obj=70, opt=389) (dims=3, evals=1440)
        #   evosax_pbt with (score: 0, price: 315) -315
        unwanted = {
            # name.startswith("freelunch_krillherd_")  # too buggy (not excluded)
            "ng_fcma_cube",  # WTF HOW ARE YOU NAN'ING ON HYPERSPHERE
            "ngx_fcmas03_cube",  # same thing but quintic
            "ngx_zero_cube",  # 0 score, high price
            # "ng_multiscalecma_cube",  # 0 score, high price (not excluded)
            "evosax_pbt_cube",  # 0 score, high price
            "evosax_guidedes_cube",  # does get *a little* score...
            "freelunch_sa_ps04_cube",
            "ngx_microcma_cube",  # 0 score, high price
        }
        return name not in unwanted

    def stfu(please_be_quiet=None):
        """Silence cma's InjectionWarning if any selected optimizer is known to cause it.

        ``please_be_quiet`` is a collection of substrings to look for in the
        names of the global ``optimizers``; a default list is used when None.
        """
        if please_be_quiet is None:
            please_be_quiet = (
                "ng_ascmadethird_cube",
                "ng_cmabounded_cube",
                "ng_cmatuning_cube",
                "ng_chaincmapowell_cube",
                "ng_chainnaivetbpsacmapowell_cube",
                "ng_paraportfolio_cube",
                "ng_rescaledcma_cube",
            )
        for optimizer in optimizers:
            name = optimizer.__name__
            if any(obnoxious in name for obnoxious in please_be_quiet):
                # only import cma when it is actually needed.
                import warnings
                from cma.evolution_strategy import InjectionWarning

                warnings.simplefilter("ignore", InjectionWarning)  # not our fault
                break

    def mark(opt_name):
        """Return a colored one-character marker for noteworthy optimizers."""
        if opt_name in ("another_random_cube", "quasirandom_cube"):
            return "\033[95m@"
        if "nelder" in opt_name:
            return "\033[96m!"
        return " "

    def fancy_output(opt_name, score, price):
        """Format one colored summary line for an optimizer.

        Float scores are rescaled using the globals ``multiple`` and
        ``optimizers`` before being displayed; other types are shown as-is.
        """
        name = opt_name.removesuffix("_cube")
        if type(score) is float:
            assert type(price) is float, "type mismatch"
            # unweight = 10  # len(optimizers)  # sum(place_scores)
            unweight = multiple * np.sqrt(len(optimizers))
            stats = f"(score:{score * unweight:4.0f}, price:{price * unweight:4.0f})"
        else:
            stats = f"(score:{score:4}, price:{price:4})"

        color = 0
        reset = "\033[m"
        # this will need adjusting depending on your terminal colors:
        gradient = (32, 92, 93, 33, 91)  # good to bad
        if score == 0 and price == 0:
            pass  # wat?
        elif score < 0 and price < 0:
            color = 35  # wat?!
        elif score > 0 and price == 0:
            color = gradient[0]  # good
        elif score == 0 and price > 0:
            color = gradient[4]  # awful
        elif score > price:
            color = gradient[1]  # ok
        elif score == price:
            color = gradient[2]  # meh
        elif score < price:
            color = gradient[3]  # bad
        color = f"\033[{color}m"
        s = f"{mark(opt_name)} {name:<32}{reset} with {color}{stats}{reset}"

        delta = score - price
        if type(score) is float:
            delta *= unweight
        if delta > 6:
            color = gradient[0]
        elif delta < -6:
            color = gradient[4]
        elif delta > 1:
            color = gradient[1]
        elif delta < -1:
            color = gradient[3]
        else:
            color = gradient[2]
        color = f"\033[{color}m"
        s += f" {color}{float(delta):+.0f}{reset}"
        # s += f" {color}{delta:+}{reset}"
        return s

    def prune_results(results, multiple):
        """Return a copy of ``results`` with ``multiple`` entries per optimizer+objective.

        ``results`` maps objective names to lists of ``(fopt, opt_name, extra)``.
        If there are more than ``multiple`` results for one pair, the sorted
        results are trimmed from both ends until only ``multiple`` are left.
        Pairs that do not end up with exactly ``multiple`` results are dropped
        (or trip an assertion when the global ``no_summary`` is false).
        """
        new_results = {}
        for obj_name, obj_res in results.items():
            grouped = {}
            for fopt, opt_name, extra in sorted(obj_res):
                fopts, extras = grouped.setdefault(opt_name, ([], []))
                fopts.append(fopt)
                extras.append(extra)

            for opt_name, (fopts, extras) in grouped.items():
                # in the event that an odd number of results needs to be trimmed,
                # prefer trimming from the bottom (i.e. worse solutions get removed first).
                excess = len(fopts) - multiple
                keep = slice(excess // 2, len(fopts) - (excess + 1) // 2)
                fopts, extras = fopts[keep], extras[keep]
                if not no_summary:
                    assert len(fopts) == multiple, (len(fopts), multiple)
                if len(fopts) == multiple:
                    for fopt, extra in zip(fopts, extras):
                        result = (fopt, opt_name, extra)
                        new_results.setdefault(obj_name, []).append(result)
        # hand back the pruned mapping; returning `results` here would
        # silently discard all of the work above.
        return new_results

    reset = "\033[m"

    quieter = True
    please_stop_the_spam = True
    no_summary = True

    if 1:
        multiple = 2
        run_anyway = 3
        # run_anyway = 7
        always_run_anyway = True
    else:
        multiple = 1
        run_anyway = 3
        always_run_anyway = False

    # percents = dict(frugal_percent=0.1, greedy_percent=1.5)
    percents = dict(frugal_percent=1.0, greedy_percent=2.0)

    # usage: [optimizer-set [negative-n_dim [n_trials]]]
    book = book_of_optimizers
    which = book[sys.argv[1]] if len(sys.argv) > 1 else book["standard"]
    n_dim = int(sys.argv[2]) if len(sys.argv) > 2 else -2
    n_trials = int(sys.argv[3]) if len(sys.argv) > 3 else fib(abs(n_dim) + 4) * 10

    place_names = ("1st", "2nd", "3rd", "4th")
    assert n_dim < 0, "unsupported in this version"
    n_dim = abs(n_dim)

    place_scores = (5, 3, 2, 1)
    objectives = GO_BENCHMARKS[n_dim]  # * multiple
    optimizers = list(which)  # copy
    before = len(optimizers)
    # if which is not book["everything"]:
    optimizers = [opt for opt in optimizers if optimizer_filter(opt.__name__)]
    after = len(optimizers)
    s = "s" if before - after != 1 else ""
    print(f"Pruned {before - after} unwanted optimizer{s}.")

    ms = f" ({multiple}+{run_anyway-multiple} times)" if multiple != 1 else ""
    n_obj = len(objectives)
    n_opt = len(optimizers)
    print(f"Optimizing {n_obj} objectives{ms} with {n_opt} optimizers...")

    stfu()

    pseudo_shuffled = lambda stuff: sorted(stuff, key=lambda obj: hash(repr(obj)))

    # results maps objective names to lists of (fopt, opt_name, history).
    results = {}
    for optimizer in prog(pseudo_shuffled(optimizers), pref="m"):
        opt_name = optimizer.__name__
        wrapped = None
        for objective in prog(pseudo_shuffled(objectives), pref="s"):
            obj_name = objective.__name__
            obj_realname = getattr(objective, "__realname__", obj_name)

            if wrapped is None:
                wrapped = COWrap(
                    objective,
                    optimizer=optimizer,
                    n_trials=n_trials,
                    n_dim=n_dim,
                    **percents,
                )
            else:
                wrapped.objective = objective  # 10+ times faster

            # collect whatever runs have already been cached.
            run = 1
            while (cache := wrapped.cached(run)) is not None:
                run += 1
                fopt, xopt, history = cache
                results.setdefault(obj_name, []).append((fopt, opt_name, history))

            note = (lambda s: None) if quieter else m36
            once = False
            while run <= multiple or (
                (always_run_anyway or not once) and run_anyway and run <= run_anyway
            ):
                # assert run == wrapped._run, (run, wrapped._run)
                if run != (_run := wrapped._run):
                    m33(f"Note: updating local run count from {run} to {_run}.")
                    run = _run
                    continue  # check conditions again
                note(
                    f"Using {opt_name} to optimize {obj_realname} ({obj_name}) [{run}] ..."
                )
                _ = optimizer(wrapped, n_trials=n_trials, n_dim=n_dim, with_count=False)
                fopt, xopt = wrapped.finish()
                result = (fopt, opt_name, wrapped.history)
                results.setdefault(obj_name, []).append(result)
                once = True
                run += 1

    all_results = results
    results = prune_results(results, multiple)

    # scores reward landing among the best results for an objective,
    # prices penalize landing among the worst.
    scores, prices = {}, {}
    all_opt_names = set()
    n_places = len(place_scores)
    for obj_name, obj_res in results.items():
        if not please_stop_the_spam:
            print()
            m1(f"{obj_name}:")

        # group optimizer names by the exact value they achieved.
        all_res = {}
        for fopt, opt_name, extra in obj_res:
            all_res.setdefault(fopt, []).append(opt_name)
            all_opt_names.add(opt_name)
            scores.setdefault(opt_name, 0.0)
            prices.setdefault(opt_name, 0.0)

        sorted_res = sorted(all_res)
        # the more optimizers share the awarded places, the less each award is worth.
        score_insignificance = sum(
            len(all_res[fopt]) for fopt in sorted_res[:n_places]
        )
        price_insignificance = sum(
            len(all_res[fopt]) for fopt in sorted_res[-n_places:]
        )
        # print("score 1/x:", obj_name, score_insignificance)
        # print("price 1/x:", obj_name, price_insignificance)

        for i, fopt in enumerate(sorted_res):
            # if i >= len(place_scores):  # TODO: just make this part of the loop.
            #     break
            mi = len(all_res) - i - 1  # place counted from the worst result
            if i < n_places:
                for opt_name in all_res[fopt]:
                    scores[opt_name] = (
                        scores[opt_name] + place_scores[i] / score_insignificance
                    )
            if mi < n_places:
                for opt_name in all_res[fopt]:
                    prices[opt_name] = (
                        prices[opt_name] + place_scores[mi] / price_insignificance
                    )

    more_scores = perform_another_experimental_scoring_method(results)

    for blah, points in zip(("best", "worst"), (scores, prices)):
        if not no_summary:
            print(
                f"\n\033[1m{blah} scoring optimizers:\033[m"
                f" (awards={place_scores})"
                f" (obj={len(objectives)}, opt={len(optimizers)})"
                f" (dims={n_dim}, evals={n_trials})"
            )
        for opt_name, opt_point in sorted(points.items(), key=lambda t: -t[1]):
            # place = place_names[i] if i < len(place_names) else "   "
            # delta = scores.get(opt_name, 0) - prices.get(opt_name, 0)
            if not no_summary:
                print(
                    fancy_output(
                        opt_name, scores.get(opt_name, 0), prices.get(opt_name, 0)
                    )
                )

    positive, negative = [], []
    for opt_name in sorted(all_opt_names):
        delta = scores.get(opt_name, 0) - prices.get(opt_name, 0)
        # note: this intentionally includes delta == 0 in both positive and negative.
        if delta >= 0:
            if opt_name not in positive:
                positive.append(opt_name)
        if delta <= 0:
            if opt_name not in negative:
                negative.append(opt_name)

    if no_summary:
        print(
            f"\n\033[1malternatively scored optimizers:\033[m"
            f" (awards={place_scores})"
            f" (obj={len(objectives)}, opt={len(optimizers)})"
            f" (dims={n_dim}, evals={n_trials})"
        )
        for opt_name, opt_score in sorted(more_scores.items(), key=lambda t: -t[1]):
            # if opt_score < 1: continue
            stats = f"{opt_score:18.16f}"
            name = opt_name.removesuffix("_cube")
            if opt_score > 1.0:
                color = "\033[1m"
            elif opt_score < 1.0:
                color = "\033[33m"
            else:
                color = ""
            s = f"{mark(opt_name)} {name:<32}{reset} with {color}{stats}{reset}"
            # s += f" {color}{float(delta):+.0f}{reset}"
            print(s)

    text = "# this file was automatically generated by go_benchmark_it.py,\n"
    text += "# any changes may be overwritten!\n"
    text += "PREVIOUSLY_POSITIVE = [\n"
    text += "".join(f' "{opt_name}",\n' for opt_name in positive)
    text += "]\n"
    text += "PREVIOUSLY_NEGATIVE = [\n"
    text += "".join(f' "{opt_name}",\n' for opt_name in negative)
    text += "]\n"

    if positive or negative:
        try:
            Path("previous.py").write_text(text)
        except PermissionError:
            print("# failed to write previous.py, ignoring...")

    if len(sys.argv) > 1 and sys.argv[1] in ("positive", "negative"):
        all_old_opt_names = {opt.__name__ for opt in optimizers}
        C = {"quasirandom_cube", "another_random_cube"}
        # exit status 2 signals that the selected set did not change.
        if sys.argv[1] == "positive" and set(positive) - C == all_old_opt_names - C:
            sys.exit(2)  # no changes
        if sys.argv[1] == "negative" and set(negative) - C == all_old_opt_names - C:
            sys.exit(2)  # no changes