497 lines
19 KiB
Python
497 lines
19 KiB
Python
from go_benchmark_lists import *
|
|
from go_benchmarks import problems_2d, problems_3d, problems_4d
|
|
from notwacube import book_of_optimizers
|
|
from prog80 import prog
|
|
from utils import OWrap, COWrap, m1, m33, m36
|
|
from utils import perform_another_experimental_scoring_method
|
|
import numpy as np
|
|
|
|
all_problems = {
|
|
2: problems_2d,
|
|
3: problems_3d,
|
|
4: problems_4d,
|
|
}
|
|
|
|
tiny_offset = 1.1102230246251565e-16
|
|
|
|
|
|
def make_transform(lo, hi):
|
|
mul = max(lo, hi) - min(lo, hi)
|
|
add = min(lo, hi)
|
|
return mul, add
|
|
|
|
|
|
def flipit(transformations, flippy):
|
|
flip_dim = flippy - 1
|
|
mul, add = transformations[flip_dim]
|
|
desired_mul, desired_add = -1, 1 # f(g(x)) = f(1 - x)
|
|
mul, add = mul * desired_mul, mul * desired_add + add
|
|
transformations[flip_dim] = (mul, add)
|
|
|
|
|
|
def make_objective(problem, n_dim, *, fix_stuff=0):
|
|
obj = problem(n_dim)
|
|
name = problem.__name__
|
|
flippy = 0 # when positive, helps removes positive correlations in solutions
|
|
trippy = None # when not None, moves solution away from center: (dim, dir)
|
|
|
|
if fix_stuff >= 0:
|
|
if name == "Deb03":
|
|
# this problem has the wrong bounds for some reason, so we have to patch it.
|
|
obj._bounds = list(zip([0.0] * obj.N, [1.0] * obj.N))
|
|
elif name == "Csendes" or name == "Infinity":
|
|
# this problem is weird... let's avoid division by zero, okay?
|
|
# these problems are duplicates of each other. weird.
|
|
replacement = n_dim * (2 + np.sin(1))
|
|
_fun = obj.fun
|
|
obj.fun = lambda x: replacement if np.any(x == 0.0) else _fun(x)
|
|
elif name == "Keane":
|
|
# another problem that may attempt to divide by zero.
|
|
_fun = obj.fun
|
|
obj.fun = lambda x: 0.0 if np.all(x**2 == 0.0) else _fun(x)
|
|
elif name == "Kowalik":
|
|
# this divide by zero actually approaches infinity when not clipped.
|
|
# TODO: there seems to be some confusion about which is `a`
|
|
# and which is `b` between the equations and the code.
|
|
# hmm it seems like the code is right, judging by this:
|
|
# https://www.itl.nist.gov/div898/strd/nls/data/LINKS/DATA/MGH09.dat
|
|
helper = lambda x: np.where(x < 0, -1, 1) * (
|
|
np.abs(x) + tiny_offset
|
|
)
|
|
# 1.0 - 1e-16 == 0.9999999999999999
|
|
# 1.0 + 1e-16 == 1.0
|
|
# 1.0 + 2e-16 == 1.0000000000000002
|
|
# 1 / 1.1102230246251565e-16**2 == 8.112963841460668e+31
|
|
a, b = obj.a, obj.b
|
|
obj.fun = (
|
|
lambda x: sum(
|
|
b - (x[0] * (a**2 + a * x[1]) / helper(a**2 + a * x[2] + x[3]))
|
|
)
|
|
** 2
|
|
)
|
|
elif name == "Gulf":
|
|
# just another division by zero.
|
|
adjust = np.array([tiny_offset, 0.0, 0.0])
|
|
_fun = obj.fun
|
|
obj.fun = lambda x: _fun(x + adjust)
|
|
|
|
if fix_stuff >= 1:
|
|
stuff = {2: too_positive_2, 3: too_positive_3, 4: too_positive_4}[n_dim]
|
|
if name.lower() in stuff:
|
|
# too positively correlated, do some evil.
|
|
ind = stuff.index(name.lower())
|
|
flippy = ind % n_dim + 1 # uniformly select a dimension to "flip"
|
|
|
|
if fix_stuff >= 2:
|
|
stuff = {2: too_centered_2, 3: too_centered_3, 4: too_centered_4}[n_dim]
|
|
if name.lower() in stuff:
|
|
# uniformly select offsets to "trip".
|
|
ind = stuff.index(name.lower())
|
|
trippy = (ind % n_dim, ind // n_dim % n_dim) # (dim, dir)
|
|
|
|
transformations = [make_transform(lo, hi) for lo, hi in obj.bounds]
|
|
|
|
if flippy:
|
|
flipit(transformations, flippy)
|
|
|
|
def objective(x):
|
|
# assert all(xi >= 0.0 for xi in x), list(float(xi) for xi in x)
|
|
# assert all(xi <= 1.0 for xi in x), list(float(xi) for xi in x)
|
|
if trippy:
|
|
x = list(x) # mostly to create a copy
|
|
ind = trippy[0]
|
|
x[ind] = 1 - (1 - x[ind]) ** 0.5 if trippy[1] else x[ind] ** 0.5
|
|
x = [xi * mul + add for xi, (mul, add) in zip(x, transformations)]
|
|
# if problem.__name__.startswith("Deb"): print(x)
|
|
return obj.fun(np.array(x, copy=False))
|
|
|
|
objective.__name__ = f"go_{problem.__name__.lower()}_on_cube"
|
|
objective.__realname__ = problem.__name__
|
|
# objective.__qualname__ = problem.__name__
|
|
return objective
|
|
|
|
|
|
def make_objectives(n_dim, n_trials=None, fix_stuff=0):
|
|
problems = all_problems[n_dim]
|
|
return [make_objective(problem, n_dim, fix_stuff=fix_stuff) for problem in problems]
|
|
|
|
|
|
def find_objective(query, n_dim=None):
|
|
results = []
|
|
for p_dim, problems in all_problems.items():
|
|
if n_dim is not None and p_dim != n_dim:
|
|
continue
|
|
for problem in problems:
|
|
if problem.__name__.lower() == query.lower():
|
|
results.append(problem)
|
|
assert results, "no results found for name " + repr(query)
|
|
return results[0]
|
|
|
|
|
|
λ = lambda q: make_objective(find_objective(q, 2), 2, fix_stuff=2)
|
|
GO_BENCHMARK_2D_PROBLEMS = list(
|
|
map(λ, totally_fine_2 + too_positive_2 + too_centered_2)
|
|
)
|
|
|
|
λ = lambda q: make_objective(find_objective(q, 3), 3, fix_stuff=2)
|
|
GO_BENCHMARK_3D_PROBLEMS = list(
|
|
map(λ, totally_fine_3 + too_positive_3 + too_centered_3)
|
|
)
|
|
|
|
λ = lambda q: make_objective(find_objective(q, 4), 4, fix_stuff=2)
|
|
GO_BENCHMARK_4D_PROBLEMS = list(
|
|
map(λ, totally_fine_4 + too_positive_4 + too_centered_4)
|
|
)
|
|
|
|
GO_BENCHMARKS = {
|
|
2: GO_BENCHMARK_2D_PROBLEMS,
|
|
3: GO_BENCHMARK_3D_PROBLEMS,
|
|
4: GO_BENCHMARK_4D_PROBLEMS,
|
|
}
|
|
|
|
for problem_list in GO_BENCHMARKS.values():
|
|
for problem in problem_list:
|
|
# print(problem.__realname__)
|
|
assert (
|
|
problem.__realname__ != "Csendes"
|
|
), "please use Infinity instead; it's basically equivalent"
|
|
|
|
|
|
if __name__ == "__main__":
|
|
from tqdm import tqdm
|
|
import sys
|
|
|
|
def fib(n):
|
|
return pow(2 << n, n + 1, (4 << 2 * n) - (2 << n) - 1) % (2 << n)
|
|
|
|
def optimizer_filter(name):
|
|
# worst scoring optimizers: (awards=(5, 3, 2, 1)) (obj=146, opt=389) (dims=2, evals=80)
|
|
# evosax_pbt with (score: 0, price: 593) -593
|
|
# worst scoring optimizers: (awards=(5, 3, 2, 1)) (obj=70, opt=389) (dims=3, evals=1440)
|
|
# evosax_pbt with (score: 0, price: 315) -315
|
|
return not (
|
|
False
|
|
# or name.startswith("freelunch_krillherd_") # too buggy
|
|
or name == "ng_fcma_cube" # WTF HOW ARE YOU NAN'ING ON HYPERSPHERE
|
|
or name == "ngx_fcmas03_cube" # same thing but quintic
|
|
or name == "ngx_zero_cube" # 0 score, high price
|
|
# or name == "ngx_microcma_cube" # 0 score, high price
|
|
# or name == "ng_multiscalecma_cube" # 0 score, high price
|
|
or name == "evosax_pbt_cube" # 0 score, high price
|
|
or name == "evosax_guidedes_cube" # does get *a little* score...
|
|
or name == "freelunch_sa_ps04_cube"
|
|
or name == "ngx_microcma_cube"
|
|
)
|
|
|
|
def stfu(please_be_quiet=None):
|
|
if please_be_quiet is None:
|
|
please_be_quiet = (
|
|
"ng_ascmadethird_cube",
|
|
"ng_cmabounded_cube",
|
|
"ng_cmatuning_cube",
|
|
"ng_chaincmapowell_cube",
|
|
"ng_chainnaivetbpsacmapowell_cube",
|
|
"ng_paraportfolio_cube",
|
|
"ng_rescaledcma_cube",
|
|
)
|
|
|
|
for optimizer in optimizers:
|
|
name = optimizer.__name__
|
|
if any(obnoxious in name for obnoxious in please_be_quiet):
|
|
import warnings
|
|
from cma.evolution_strategy import InjectionWarning
|
|
|
|
warnings.simplefilter("ignore", InjectionWarning) # not our fault
|
|
break
|
|
|
|
def mark(opt_name):
|
|
return (
|
|
"\033[95m@"
|
|
if opt_name in ("another_random_cube", "quasirandom_cube")
|
|
else "\033[96m!"
|
|
if "nelder" in opt_name
|
|
else " "
|
|
)
|
|
|
|
def fancy_output(opt_name, score, price):
|
|
name = opt_name.removesuffix("_cube")
|
|
if type(score) is float:
|
|
assert type(price) is float, "type mismatch"
|
|
# unweight = 10 # len(optimizers) # sum(place_scores)
|
|
unweight = multiple * np.sqrt(len(optimizers))
|
|
stats = f"(score:{score * unweight:4.0f}, price:{price * unweight:4.0f})"
|
|
else:
|
|
stats = f"(score:{score:4}, price:{price:4})"
|
|
color = 0
|
|
reset = "\033[m"
|
|
# this will need adjusting depending on your terminal colors:
|
|
gradient = (32, 92, 93, 33, 91) # good to bad
|
|
|
|
if score == 0 and price == 0:
|
|
pass # wat?
|
|
elif score < 0 and price < 0:
|
|
color = 35 # wat?!
|
|
elif score > 0 and price == 0:
|
|
color = gradient[0] # good
|
|
elif score == 0 and price > 0:
|
|
color = gradient[4] # awful
|
|
elif score > price:
|
|
color = gradient[1] # ok
|
|
elif score == price:
|
|
color = gradient[2] # meh
|
|
elif score < price:
|
|
color = gradient[3] # bad
|
|
color = f"\033[{color}m"
|
|
|
|
s = f"{mark(opt_name)} {name:<32}{reset} with {color}{stats}{reset}"
|
|
|
|
delta = score - price
|
|
if type(score) is float:
|
|
delta *= unweight
|
|
color = 0
|
|
if delta > 6:
|
|
color = gradient[0]
|
|
elif delta < -6:
|
|
color = gradient[4]
|
|
elif delta > 1:
|
|
color = gradient[1]
|
|
elif delta < -1:
|
|
color = gradient[3]
|
|
else:
|
|
color = gradient[2]
|
|
color = f"\033[{color}m"
|
|
s += f" {color}{float(delta):+.0f}{reset}"
|
|
# s += f" {color}{delta:+}{reset}"
|
|
|
|
return s
|
|
|
|
def prune_results(results, multiple):
|
|
# if there are more than `multiple` results for one optimizer+objective pair,
|
|
# then trim the bottom and top until there are only `multiple` left.
|
|
new_results = {}
|
|
for obj_name, obj_res in results.items():
|
|
new_res = {}
|
|
for fopt, opt_name in sorted(obj_res):
|
|
new_res.setdefault(opt_name, []).append(fopt)
|
|
for opt_name, fopts in new_res.items():
|
|
# in the event that an odd number of results needs to be trimmed,
|
|
# prefer trimming from the bottom (i.e. worse solutions get removed first).
|
|
down = (len(fopts) - multiple) // 2
|
|
up = len(fopts) - (len(fopts) - multiple + 1) // 2
|
|
# print("asdf", len(fopts), down, up)
|
|
new_res[opt_name] = fopts[down:up]
|
|
for opt_name, fopts in new_res.items():
|
|
if not no_summary:
|
|
assert len(fopts) == multiple, (len(fopts), multiple)
|
|
if len(fopts) == multiple:
|
|
for fopt in fopts:
|
|
new_results.setdefault(obj_name, []).append((fopt, opt_name))
|
|
return results
|
|
|
|
reset = "\033[m"
|
|
|
|
quieter = True
|
|
please_stop_the_spam = True
|
|
no_summary = True
|
|
|
|
if 1:
|
|
multiple = 2
|
|
run_anyway = 3 # run_anyway = 7
|
|
always_run_anyway = True
|
|
else:
|
|
multiple = 1
|
|
run_anyway = 3
|
|
always_run_anyway = False
|
|
|
|
#percents = dict(frugal_percent=0.1, greedy_percent=1.5)
|
|
percents = dict(frugal_percent=1.0, greedy_percent=2.0)
|
|
|
|
book = book_of_optimizers
|
|
which = book[sys.argv[1]] if len(sys.argv) > 1 else book["standard"]
|
|
n_dim = int(sys.argv[2]) if len(sys.argv) > 2 else -2
|
|
n_trials = int(sys.argv[3]) if len(sys.argv) > 3 else fib(abs(n_dim) + 4) * 10
|
|
|
|
place_names = ("1st", "2nd", "3rd", "4th")
|
|
assert n_dim < 0, "unsupported in this version"
|
|
n_dim = abs(n_dim)
|
|
place_scores = (5, 3, 2, 1)
|
|
objectives = GO_BENCHMARKS[n_dim] # * multiple
|
|
|
|
optimizers = list(which) # copy
|
|
before = len(optimizers)
|
|
# if which is not book["everything"]:
|
|
optimizers = [opt for opt in optimizers if optimizer_filter(opt.__name__)]
|
|
after = len(optimizers)
|
|
s = "s" if before - after != 1 else ""
|
|
print(f"Pruned {before - after} unwanted optimizer{s}.")
|
|
|
|
ms = f" ({multiple} times)" if multiple != 1 else ""
|
|
n_obj = len(objectives)
|
|
n_opt = len(optimizers)
|
|
print(f"Optimizing {n_obj} objectives{ms} with {n_opt} optimizers...")
|
|
|
|
stfu()
|
|
|
|
pseudo_shuffled = lambda stuff: sorted(stuff, key=lambda obj: hash(repr(obj)))
|
|
|
|
results = {}
|
|
for optimizer in prog(pseudo_shuffled(optimizers), pref="m"):
|
|
opt_name = optimizer.__name__
|
|
wrapped = None
|
|
for objective in prog(pseudo_shuffled(objectives), pref="s"):
|
|
obj_name = objective.__name__
|
|
obj_realname = getattr(objective, "__realname__", obj_name)
|
|
|
|
if wrapped is None:
|
|
wrapped = COWrap(
|
|
objective,
|
|
optimizer=optimizer,
|
|
n_trials=n_trials,
|
|
n_dim=n_dim,
|
|
**percents,
|
|
)
|
|
else:
|
|
wrapped.objective = objective # 10+ times faster
|
|
|
|
run = 1
|
|
while (cache := wrapped.cached(run)) is not None:
|
|
run += 1
|
|
fopt, xopt = cache
|
|
results.setdefault(obj_name, []).append((fopt, opt_name))
|
|
|
|
note = (lambda s: None) if quieter else m36
|
|
|
|
once = False
|
|
while (
|
|
run <= multiple
|
|
or (always_run_anyway or not once)
|
|
and run_anyway
|
|
and run <= run_anyway
|
|
):
|
|
# assert run == wrapped._run, (run, wrapped._run)
|
|
if run != (_run := wrapped._run):
|
|
m33(f"Note: updating local run count from {run} to {_run}.")
|
|
run = _run
|
|
continue # check conditions again
|
|
|
|
note(
|
|
f"Using {opt_name} to optimize {obj_realname} ({obj_name}) [{run}] ..."
|
|
)
|
|
_ = optimizer(wrapped, n_trials=n_trials, n_dim=n_dim, with_count=False)
|
|
fopt, xopt = wrapped.finish()
|
|
results.setdefault(obj_name, []).append((fopt, opt_name))
|
|
once = True
|
|
run += 1
|
|
|
|
all_results = results
|
|
results = prune_results(results, multiple)
|
|
|
|
scores, prices = {}, {}
|
|
all_opt_names = set()
|
|
for obj_name, obj_res in results.items():
|
|
if not please_stop_the_spam:
|
|
print()
|
|
m1(f"{obj_name}:")
|
|
all_res = {}
|
|
for fopt, opt_name in obj_res:
|
|
all_res.setdefault(fopt, []).append(opt_name)
|
|
all_opt_names.add(opt_name)
|
|
scores.setdefault(opt_name, 0.0)
|
|
prices.setdefault(opt_name, 0.0)
|
|
sorted_res = sorted(all_res)
|
|
score_insignificance = sum(
|
|
len(all_res[fopt])
|
|
for _, fopt in zip(range(len(place_scores)), sorted_res)
|
|
)
|
|
price_insignificance = sum(
|
|
len(all_res[fopt])
|
|
for _, fopt in zip(range(len(place_scores)), reversed(sorted_res))
|
|
)
|
|
# print("score 1/x:", obj_name, score_insignificance)
|
|
# print("price 1/x:", obj_name, price_insignificance)
|
|
for i, fopt in enumerate(sorted_res):
|
|
# if i >= len(place_scores): # TODO: just make this part of the loop.
|
|
# break
|
|
mi = len(all_res) - i - 1
|
|
if i < len(place_scores):
|
|
for opt_name in all_res[fopt]:
|
|
scores[opt_name] = (
|
|
scores[opt_name] + place_scores[i] / score_insignificance
|
|
)
|
|
if mi < len(place_scores):
|
|
for opt_name in all_res[fopt]:
|
|
prices[opt_name] = (
|
|
prices[opt_name] + place_scores[mi] / price_insignificance
|
|
)
|
|
|
|
more_scores = perform_another_experimental_scoring_method(results)
|
|
|
|
for blah, points in zip(("best", "worst"), (scores, prices)):
|
|
if not no_summary:
|
|
print(
|
|
f"\n\033[1m{blah} scoring optimizers:\033[m"
|
|
f" (awards={place_scores})"
|
|
f" (obj={len(objectives)}, opt={len(optimizers)})"
|
|
f" (dims={n_dim}, evals={n_trials})"
|
|
)
|
|
for opt_name, opt_point in sorted(points.items(), key=lambda t: -t[1]):
|
|
# place = place_names[i] if i < len(place_names) else " "
|
|
# delta = scores.get(opt_name, 0) - prices.get(opt_name, 0)
|
|
if not no_summary:
|
|
print(
|
|
fancy_output(
|
|
opt_name, scores.get(opt_name, 0), prices.get(opt_name, 0)
|
|
)
|
|
)
|
|
|
|
positive, negative = [], []
|
|
for opt_name in sorted(all_opt_names):
|
|
delta = scores.get(opt_name, 0) - prices.get(opt_name, 0)
|
|
# note: this intentionally includes delta == 0 in both positive and negative.
|
|
if delta >= 0:
|
|
if opt_name not in positive:
|
|
positive.append(opt_name)
|
|
if delta <= 0:
|
|
if opt_name not in negative:
|
|
negative.append(opt_name)
|
|
|
|
if no_summary:
|
|
print(
|
|
f"\n\033[1malternatively scored optimizers:\033[m"
|
|
f" (awards={place_scores})"
|
|
f" (obj={len(objectives)}, opt={len(optimizers)})"
|
|
f" (dims={n_dim}, evals={n_trials})"
|
|
)
|
|
for opt_name, opt_score in sorted(more_scores.items(), key=lambda t: -t[1]):
|
|
# if opt_score < 1: continue
|
|
stats = f"{opt_score:18.16f}"
|
|
name = opt_name.removesuffix("_cube")
|
|
color = "\033[1m" if opt_score > 1.0 else "\033[33m" if opt_score < 1.0 else ""
|
|
s = f"{mark(opt_name)} {name:<32}{reset} with {color}{stats}{reset}"
|
|
# s += f" {color}{float(delta):+.0f}{reset}"
|
|
print(s)
|
|
|
|
text = "# this file was automatically generated by go_benchmark_it.py,\n"
|
|
text += "# any changes may be overwritten!\n"
|
|
|
|
text += "PREVIOUSLY_POSITIVE = [\n"
|
|
text += "".join(f' "{opt_name}",\n' for opt_name in positive)
|
|
text += "]\n"
|
|
|
|
text += "PREVIOUSLY_NEGATIVE = [\n"
|
|
text += "".join(f' "{opt_name}",\n' for opt_name in negative)
|
|
text += "]\n"
|
|
|
|
if positive or negative:
|
|
try:
|
|
__import__("pathlib").Path("previous.py").write_text(text)
|
|
except PermissionError:
|
|
print("# failed to write previous.py, ignoring...")
|
|
|
|
if len(sys.argv) > 1 and sys.argv[1] in ("positive", "negative"):
|
|
all_old_opt_names = set(opt.__name__ for opt in optimizers)
|
|
C = set(("quasirandom_cube", "another_random_cube"))
|
|
if sys.argv[1] == "positive" and set(positive) - C == all_old_opt_names - C:
|
|
exit(2) # no changes
|
|
if sys.argv[1] == "negative" and set(negative) - C == all_old_opt_names - C:
|
|
exit(2) # no changes
|