# from collections import namedtuple
from functools import partial

try:
    from plots import Plot, CleanPlot, CleanPlotUnity, SquareCenterPlot
except ModuleNotFoundError:  # matplotlib might not be installed
    pass

# NOTE: these comments were written before autograd died, RIP.
# i'll probably wind up using aesara, or JAX if i'm desperate,
# so this kind of thing will have to be done differently.
# TODO: define gradients by hand,
#       import both numpy and autograd.numpy,
#       use numpy for forward passes.
# TODO: add gradient-checking stuff.
# import autograd.numpy as np
import numpy as np


invphi = (np.sqrt(5) - 1) / 2  # 1/phi
invphi2 = (3 - np.sqrt(5)) / 2  # 1/phi^2


class rpartial(partial):
    """`rpartial` functions like `functools.partial`, except that the
    arguments given at call time are prepended to the stored positional
    arguments instead of appended after them."""

    # https://stackoverflow.com/a/11831662
    def __call__(self, *args, **kwargs):
        kw = self.keywords.copy()
        kw.update(kwargs)
        # stored positional args go last; pass the merged keywords (kw),
        # not just the raw call-time kwargs.
        return self.func(*(args + self.args), **kw)
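

# A small usage sketch (illustrative, not part of the original module):
# contrasting `partial` and `rpartial` on a function whose argument order
# matters.
def _example_rpartial():
    sub = lambda a, b: a - b
    assert partial(sub, 10)(3) == 7    # stored arg first: sub(10, 3)
    assert rpartial(sub, 10)(3) == -7  # stored arg last:  sub(3, 10)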


class Timer:
    def __init__(self, title="Untitled timer", *, verbose=True, cumulative=False):
        from time import time

        self.title = str(title)
        self.verbose = bool(verbose)
        self.cumulative = bool(cumulative)
        self.time = time
        self.reset()

    def reset(self):
        self.then = None
        self.now = None
        self.cum = 0.0

    def __enter__(self):
        self.then = self.time()
        return self  # so `with Timer(...) as t:` binds the timer itself

    def __exit__(self, exc_type, exc, exc_tb):
        self.now = self.time()
        if self.cumulative:
            self.cum += self.now - self.then
        if self.verbose:
            self.print()

    def print(self, f=None):
        from sys import stderr

        elapsed = self.cum if self.cumulative else self.now - self.then
        f = stderr if f is None else f
        print(f"{self.title:>30}: {elapsed:8.3f} seconds", file=f)


class CumTimer(Timer):
    def __init__(self, title="Untitled cumulative timer"):
        super().__init__(title, verbose=False, cumulative=True)
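

# A usage sketch (illustrative, not part of the original module): a one-shot
# Timer prints on exit, while a CumTimer accumulates across several blocks
# and is printed explicitly.
def _example_timers():
    with Timer("one-shot block"):  # prints to stderr on exit
        sum(range(100_000))
    t = CumTimer("accumulated work")
    for _ in range(3):
        with t:
            sum(range(100_000))
    t.print()  # prints the total time of the three blocks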


def div0(a, b):
    """Return the element-wise division of array `a` by array `b`,
    whereby division by zero equals zero."""
    a = np.asanyarray(a)
    b = np.asanyarray(b)
    with np.errstate(divide="ignore", invalid="ignore"):
        c = np.true_divide(a, b)
    if np.ndim(c) == 0:  # if np.isscalar(c):
        return c if np.isfinite(c) else c.dtype.type(0)
    c[~np.isfinite(c)] = 0  # -inf, inf, and NaN get set to 0
    return c
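

# Quick illustrative checks (not part of the original module) of div0's
# convention that both x/0 and 0/0 come out as 0.
def _example_div0():
    out = div0([1.0, 2.0, 0.0], [2.0, 0.0, 0.0])
    assert np.allclose(out, [0.5, 0.0, 0.0])
    assert div0(1.0, 0.0) == 0.0  # scalar inputs give a scalar back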


def invsqrt(a):
    """Return the element-wise inverse square-root of the array `a`."""
    return np.reciprocal(np.sqrt(a))


def sab(a, axis=None):
    """Return the sum of absolute values of
    the array `a` along its axis `axis`."""
    return np.sum(np.abs(a), axis=axis)


def ssq(a, axis=None):
    """Return the halved sum-of-squares of
    the array `a` along its axis `axis`."""
    return 0.5 * np.sum(np.square(a), axis=axis)


def rms(a, axis=None):
    """Return the root-mean-square of
    the array `a` along its axis `axis`."""
    return np.sqrt(np.mean(np.square(a), axis=axis))


def cossim(x, y):
    """Return the cosine-similarity of
    the arrays `x` and `y`.
    (The exact behavior of non-vector arguments is undefined for now.)"""
    return np.dot(x, y) / (np.linalg.norm(x) * np.linalg.norm(y))
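

# Illustrative checks (not part of the original module) relating the small
# reductions above to their closed forms on a concrete vector.
def _example_reductions():
    v = np.array([3.0, -4.0])
    assert sab(v) == 7.0                      # |3| + |-4|
    assert ssq(v) == 12.5                     # (9 + 16) / 2
    assert np.isclose(rms(v), np.sqrt(12.5))  # sqrt((9 + 16) / 2)
    assert np.isclose(cossim(v, 2 * v), 1.0)  # parallel vectors
    assert np.isclose(cossim(v, np.array([3.0, 4.0])), -7.0 / 25.0)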


class Regulator:  # NOTE: should've been called Regularizer, whoops?
    # TODO: support linear constraints.

    def __init__(self, *args, r1=0.0, r2=0.0, r2s=0.0, rinf=0.0, ridges=None):
        # this way i don't have to worry about arbitrary ordering:
        assert len(args) == 0, "all arguments of Regulator must be given by keywords"

        self.r1 = float(r1)  # L1 aka Lasso
        self.r2 = float(r2)  # L2 aka Euclidean
        self.r2s = float(r2s)  # L2-squared aka Ridge
        self.rinf = float(rinf)  # L-infinity (L1-ball)

        # "ridges" are just Ridge penalties with a non-zero bias,
        # pulling the solution towards `point` by `coeff`.
        # this is used in optimizers such as in arxiv:1801.02982.
        # FIXME: but it isn't part of the proximal equation!
        self.ridges = []
        if ridges is not None:
            for coeff, point in ridges:
                self.push_ridge(coeff, point)

    def push_ridge(self, coeff, point):
        # asarray avoids a copy where possible; np.array(..., copy=False)
        # raises on NumPy >= 2.0 whenever a copy would actually be needed.
        pair = (float(coeff), np.asarray(point, dtype=float))
        self.ridges.append(pair)

    def copy(self):
        """creates a shallow copy of the Regulator."""
        return Regulator(
            r1=self.r1, r2=self.r2, r2s=self.r2s, rinf=self.rinf, ridges=self.ridges
        )

    def regulate(self, x):
        # avoid using += here for autograd.
        y = 0
        if self.r1 > 0:
            y = y + self.r1 * np.sum(np.abs(x))
        if self.r2 > 0 or self.r2s > 0:
            sum_of_squares = np.sum(np.square(x))
            if self.r2 > 0:
                y = y + self.r2 * np.sqrt(sum_of_squares)
            if self.r2s > 0:
                y = y + self.r2s * 0.5 * sum_of_squares
        if self.rinf > 0:
            y = y + self.rinf * np.max(np.abs(x))
        for coeff, point in self.ridges:
            y = y + coeff * ssq(x - point)
        return y

    def solve(self, x, g, rate=1.0):  # solves the proximal
        if rate > 0:
            y = x - rate * g
        else:
            assert rate == 0, rate
            return x
        for coeff, point in self.ridges:
            y += rate * coeff * point

        if self.r1 > 0:
            y = np.maximum(np.abs(y) - self.r1 * rate, 0) * np.sign(y)
        if self.r2 > 0:
            y = np.maximum(1 - rate * self.r2 / np.linalg.norm(y), 0) * y
        if self.rinf > 0:
            y = np.maximum(1 - rate * self.rinf / np.max(np.abs(y)), 0) * y

        denominator = 1
        if self.r2s > 0:
            denominator += rate * self.r2s
        for coeff, _ in self.ridges:
            denominator += rate * coeff
        return y / denominator
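

# A sketch (not from the original code) of how Regulator is presumably meant
# to be used: `regulate` adds the penalty value to an objective, while `solve`
# applies the corresponding proximal update after a plain gradient step.
# The quadratic objective below is purely illustrative.
def _example_regulator():
    A = np.array([[3.0, 1.0], [1.0, 2.0]])
    b = np.array([1.0, -1.0])
    loss = lambda x: 0.5 * x @ A @ x - b @ x
    grad = lambda x: A @ x - b

    reg = Regulator(r1=0.1, r2s=0.01)
    x = np.zeros(2)
    for _ in range(200):
        x = reg.solve(x, grad(x), rate=0.1)  # proximal-gradient step
    return loss(x) + reg.regulate(x)  # regularized objective at the solution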


class RandomIndices:
    def __init__(self, n: int):
        self.reset(n)

    def reset(self, n=None):
        if n is not None:
            assert n >= 1, n
            self.n = n
        # reshuffle; reset() without an argument keeps the old `n`.
        self.indices = np.arange(self.n)
        self.i = 0
        np.random.shuffle(self.indices)

    def draw(self, n):
        # yields `n` pairs of (step, random index).
        return zip(range(n), self)

    def __next__(self):
        ret = self.indices[self.i]
        self.i += 1
        if self.i == self.n:
            self.i = 0
            self.reset()
        return ret

    def __iter__(self):
        return self
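

# A sketch (not from the original code) of drawing minibatch indices: the
# iterator walks a shuffled permutation and reshuffles whenever it wraps
# around, so every index appears once per pass.
def _example_random_indices():
    np.random.seed(0)  # only to make the sketch reproducible
    order = RandomIndices(5)
    batch = [int(index) for _, index in order.draw(5)]
    assert sorted(batch) == [0, 1, 2, 3, 4]  # one full pass, each index once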


def estimate_condition(f, g, x, scale=1.0, iters=100):
    """Estimate the convexity and L-smoothness of the function `f`.
    `g` should return the gradient of `f`.
    Noise is centered around `x` with scale `scale`."""

    # i.e. the condition of the hessian.

    norm2 = lambda a: np.linalg.norm(a.ravel())
    convexity, smoothness = np.inf, 0

    fx, gx = f(x), g(x)
    for i in range(iters):
        y = x + np.random.normal(0, scale, size=x.shape)
        fy, gy = f(y), g(y)

        if 0:
            # f(y) >= f(x) + g(x) @ (y - x) + σ * ssq(x - y)
            σ = (fy - fx - np.dot(gx, (y - x))) / ssq(y - x)
            if σ < convexity:
                convexity = σ

            # norm2(g(x) - g(y)) <= L * norm2(x - y)
            L = norm2(gx - gy) / norm2(y - x)
            # NOTE: i think the sqrt in norm2 can be pulled out like:
            # np.sqrt(np.sum(np.square(gx - gy)) / np.sum(np.square(x - y)))
            if L > smoothness:
                smoothness = L
        else:
            # (gx - gy) @ (x - y) >= σ * norm2(x - y)**2
            blah = (gx - gy) @ (x - y) / np.sum(np.square(x - y))
            if blah < convexity:
                convexity = blah
            # f(y) >= f(x) + dot(g(x), y - x) + σ/2 * sum(square(x - y))
            # TODO: is this still correct? it's a different definition...
            if blah > smoothness:
                smoothness = blah

    return convexity, smoothness
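

# A sanity-check sketch (not from the original code): on the quadratic
# f(x) = x @ A @ x / 2 the sampled estimates are Rayleigh quotients of A,
# so convexity and smoothness approach A's extreme eigenvalues (1 and 10)
# from inside as `iters` grows.
def _example_estimate_condition():
    A = np.diag([1.0, 10.0])
    f = lambda x: 0.5 * x @ A @ x
    g = lambda x: A @ x
    lo, hi = estimate_condition(f, g, np.zeros(2), scale=1.0, iters=1000)
    assert 0.99 <= lo <= hi <= 10.01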


def minimum_enclosing_ball(xa, xb, ra, rb):
    """Given the *squared* radii `ra` and `rb` of balls
    centered at `xa` and `xb` respectively,
    return the center and squared radius of a ball
    encompassing their (possibly negative) intersection."""

    # np.asfarray was removed in NumPy 2.0, so coerce to float explicitly.
    xa, xb = np.asarray(xa, dtype=float), np.asarray(xb, dtype=float)
    delnorm2 = np.sum(np.square(xa - xb))
    if delnorm2 >= abs(ra - rb):
        # neither ball swallows the other's cap: the enclosing ball is
        # centered on the segment between `xa` and `xb`, at the center of
        # the sphere where the two boundaries intersect.
        xc = ((xa + xb) - (ra - rb) / delnorm2 * (xa - xb)) / 2
        rc = ra - np.square(delnorm2 + ra - rb) / delnorm2 / 4
        return xc, rc
    elif delnorm2 < ra - rb:
        return xb, rb
    else:
        return xa, ra
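

# An illustrative check (not from the original code): two balls of radius 2
# whose centers are 2 apart intersect in a circle of squared radius 3
# centered at their midpoint.
def _example_minimum_enclosing_ball():
    xc, rc = minimum_enclosing_ball([0.0, 0.0], [2.0, 0.0], 4.0, 4.0)
    assert np.allclose(xc, [1.0, 0.0]) and np.isclose(rc, 3.0)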


def gss(f, a, b, tol=1e-5):  # golden section search
    """Given a function `f` with a single local minimum in
    the interval [`a`, `b`], return a subset interval
    [`c`, `d`] that contains the minimum with `d - c <= tol`."""
    # via https://en.wikipedia.org/wiki/Golden-section_search

    a, b = min(a, b), max(a, b)
    h = b - a
    if h <= tol:
        return (a, b)

    # required steps to achieve tolerance:
    n = int(np.ceil(np.log(tol / h) / np.log(invphi)))

    c = a + invphi2 * h
    d = a + invphi * h
    y_c = f(c)
    y_d = f(d)

    for k in range(n - 1):
        if y_c < y_d:
            b, d, y_d = d, c, y_c
            h = invphi * h
            c = a + invphi2 * h
            y_c = f(c)
        else:
            a, c, y_c = c, d, y_d
            h = invphi * h
            d = a + invphi * h
            y_d = f(d)

    if y_c < y_d:
        return (a, d)
    else:
        return (c, b)


def gssm(f, a, b, tol=1e-5):  # golden section search (scalar result)
    return np.mean(gss(f, a, b, tol=tol))
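

# A usage sketch (not from the original code): bracketing the minimum of a
# shifted parabola. `gss` returns a narrow interval around the minimizer and
# `gssm` returns its midpoint.
def _example_gss():
    f = lambda x: (x - 2.0) ** 2
    lo, hi = gss(f, 0.0, 5.0, tol=1e-6)
    assert lo <= 2.0 <= hi and hi - lo <= 1e-6
    assert abs(gssm(f, 0.0, 5.0) - 2.0) < 1e-6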


__all__ = """
Plot
CleanPlot
CleanPlotUnity
SquareCenterPlot

RandomIndices
Regulator

partial
rpartial
invsqrt
sab
ssq
rms
cossim

estimate_condition
minimum_enclosing_ball
gss
""".split()