import torch
import matplotlib.pyplot as plt
import torch.distributions as dist
import copy
from functools import partial
# Matplotlib retina
%config InlineBackend.figure_format = 'retina'
Bayesian Optimization

This notebook builds a Bayesian linear regression (BLR) surrogate with sinusoidal features, fits it to noisy observations of a 1-D function, and then runs Bayesian optimization over a fixed candidate pool using three acquisition functions: probability of improvement (PI), upper confidence bound (UCB), and Thompson sampling.
x_lin = torch.linspace(-3, 3, 100).reshape(-1, 1)
#f = lambda x: 2*torch.sin(x) + torch.sin(2*x**(1.1) + 1) + 0.3*x + 10
f = lambda x: 2*torch.sin(x) + 0.5*torch.sin(3*x + 0.5) + 5
plt.plot(x_lin, f(x_lin), label='f(x)')
def phi(x, degree=2):
    # Sinusoidal feature map:
    # degree=1 -> [1, x]
    # degree>=2 -> [1, x] plus, for each d in 2..degree,
    #              [sin(d*x), cos(d*x), sin(d*x + 0.5), cos(d*x + 0.5)]
    x_new = torch.cat([torch.ones_like(x), x], dim=1)
    for d in range(2, degree + 1):
        x_new = torch.cat([x_new, torch.sin(d*x), torch.cos(d*x),
                           torch.sin(d*x + 0.5), torch.cos(d*x + 0.5)], dim=1)
    return x_new
def num_params(degree):
    # Number of columns produced by phi for a given degree
    if degree == 1:
        return 2  # [1, x]
    else:
        return 2 + 4*(degree - 1)
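A quick sanity check (added here for illustration; x_check is a throwaway name) that the feature width from phi agrees with num_params at every degree:

# Illustrative check: phi's feature count should match num_params(degree)
x_check = torch.zeros(5, 1)
for deg in range(1, 7):
    assert phi(x_check, deg).shape[1] == num_params(deg)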
class BLR:
    def __init__(self, mu, sigma, sigma_noise, degree=2):
        self.current_mean = mu
        self.current_sigma = sigma
        self.sigma_noise = sigma_noise  # observation-noise standard deviation
        self.is_prior_set = True
        self.N = 0
        self.X_total = None
        self.y_total = None
        self.degree = degree

    def __repr__(self):
        return f'BLR(mu={self.current_mean},\n sigma={self.current_sigma}, \nsigma_noise={self.sigma_noise})'

    def update(self, X, y):
        """
        X: (n_points, 1) raw inputs; featurized internally via phi
        y: (n_points, 1)
        """
        if not self.is_prior_set:
            raise Exception('Prior not set')
        n_points = X.shape[0]
        X_orig = X
        X = phi(X, self.degree)

        # Conjugate Gaussian update of the weight posterior
        self.current_sigma_inverse = torch.inverse(self.current_sigma)
        XTX = torch.matmul(X.T, X)
        SN_inverse = self.current_sigma_inverse + XTX / self.sigma_noise**2
        SN = torch.inverse(SN_inverse)

        self.current_mean = torch.matmul(
            SN,
            torch.matmul(self.current_sigma_inverse, self.current_mean)
            + torch.matmul(X.T, y).ravel() / self.sigma_noise**2,
        )
        self.current_sigma = SN
        self.N += n_points

        # Keep the raw observations for plotting and candidate filtering
        if self.X_total is None:
            self.X_total = X_orig
            self.y_total = y
        else:
            self.X_total = torch.cat((self.X_total, X_orig), 0)
            self.y_total = torch.cat((self.y_total, y), 0)
    def predict(self, X):
        if not self.is_prior_set:
            raise Exception('Prior not set')
        X = phi(X, self.degree)

        # Predictive mean and variance at each input
        predictive_mean = torch.matmul(X, self.current_mean)
        predictive_variance = self.sigma_noise**2 + torch.diag(X @ self.current_sigma @ X.T)

        return predictive_mean, predictive_variance
    def plot_predictive(self, X, ax=None):
        X_orig = X
        # Posterior predictive mean and variance (reuses predict, which
        # already adds sigma_noise**2 to the variance)
        posterior_mean, posterior_variance = self.predict(X)
        posterior_std = torch.sqrt(posterior_variance)

        if ax is None:
            fig, ax = plt.subplots(figsize=(10, 6))
        ax.set_title("Posterior Distribution")
        # Shade the mean +/- 2 standard deviations band
        ax.fill_between(X_orig.ravel().numpy(),
                        (posterior_mean - 2 * posterior_std).numpy(),
                        (posterior_mean + 2 * posterior_std).numpy(),
                        color='r', alpha=0.2, label='Posterior')
        ax.plot(X_orig.ravel().numpy(), posterior_mean.numpy(), 'r-', label='Posterior Mean')
        if self.X_total is not None:
            ax.scatter(self.X_total.numpy(), self.y_total.numpy(),
                       c='purple', marker='*', label='Observed Data', s=200)
        ax.legend()
        ax.set_xlim(X_orig.ravel().min(), X_orig.ravel().max())
        return ax
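For reference, update implements the standard conjugate Gaussian update for linear regression in feature space. With features $\Phi = \phi(X)$, weight prior $\mathcal{N}(m_0, S_0)$, and noise standard deviation $\sigma$,

$$
S_N^{-1} = S_0^{-1} + \frac{1}{\sigma^2}\Phi^\top\Phi,
\qquad
m_N = S_N\!\left(S_0^{-1} m_0 + \frac{1}{\sigma^2}\Phi^\top y\right),
$$

and predict returns the posterior predictive at a new input $x$:

$$
p(y \mid x, \mathcal{D}) = \mathcal{N}\!\left(\phi(x)^\top m_N,\; \sigma^2 + \phi(x)^\top S_N \phi(x)\right).
$$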
d = 2
phi_x = phi(x_lin, degree=d)
for i in range(num_params(d)):
    plt.plot(x_lin, phi_x[:, i], label=r'$\phi_{}(x)$'.format(i))
plt.legend()
def init_prior(d):
    prior_mu = torch.zeros(num_params(d))   # zero-mean prior over weights
    prior_sigma = torch.eye(num_params(d))  # unit isotropic prior covariance
    sigma_noise = 1.0                       # assumed observation-noise std
    return prior_mu, prior_sigma, sigma_noise

def init_prior_params(d):
    prior_params = init_prior(d)
    return prior_params + (d, )

init_prior_params(d)
(tensor([0., 0., 0., 0., 0., 0.]),
tensor([[1., 0., 0., 0., 0., 0.],
[0., 1., 0., 0., 0., 0.],
[0., 0., 1., 0., 0., 0.],
[0., 0., 0., 1., 0., 0.],
[0., 0., 0., 0., 1., 0.],
[0., 0., 0., 0., 0., 1.]]),
1.0,
2)
d = 2
blr = BLR(*init_prior_params(d))
blr
BLR(mu=tensor([0., 0., 0., 0., 0., 0.]),
sigma=tensor([[1., 0., 0., 0., 0., 0.],
[0., 1., 0., 0., 0., 0.],
[0., 0., 1., 0., 0., 0.],
[0., 0., 0., 1., 0., 0.],
[0., 0., 0., 0., 1., 0.],
[0., 0., 0., 0., 0., 1.]]),
sigma_noise=1.0)
ax = blr.plot_predictive(x_lin)
torch.manual_seed(0)
N_TOT = 30
X_dataset = torch.distributions.Uniform(-3, 3).sample((N_TOT, 1))
f_dataset = f(X_dataset)
y_dataset = f_dataset + torch.distributions.Normal(0, 1.0).sample((N_TOT, 1))
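So the synthetic dataset follows the observation model

$$
y_i = f(x_i) + \varepsilon_i, \qquad \varepsilon_i \sim \mathcal{N}(0, 1), \qquad x_i \sim \mathcal{U}(-3, 3),
$$

with $N = 30$ points; the noise standard deviation of 1 matches the sigma_noise assumed in init_prior.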
def plot_dataset(ax=None):
    if ax is None:
        fig, ax = plt.subplots()
    # Plot the dataset
    ax.scatter(X_dataset.numpy(), y_dataset.numpy(), c='g', marker='x', label='Whole Dataset')
    ax.plot(x_lin, f(x_lin), label='f(x)', c='k')
    ax.legend()
    ax.set_xlim(-3, 3)
plot_dataset()
# Add the first data point to the model
d = 2
blr = BLR(*init_prior_params(d))
blr.update(X_dataset[:1], y_dataset[:1])
blr
BLR(mu=tensor([ 1.2552, -0.0282, -0.0564, 1.2539, 0.5517, 1.1274]),
sigma=tensor([[ 7.5003e-01, 5.6144e-03, 1.1225e-02, -2.4972e-01, -1.0987e-01,
-2.2453e-01],
[ 5.6144e-03, 9.9987e-01, -2.5212e-04, 5.6087e-03, 2.4677e-03,
5.0430e-03],
[ 1.1225e-02, -2.5212e-04, 9.9950e-01, 1.1214e-02, 4.9338e-03,
1.0083e-02],
[-2.4972e-01, 5.6087e-03, 1.1214e-02, 7.5054e-01, -1.0976e-01,
-2.2430e-01],
[-1.0987e-01, 2.4677e-03, 4.9338e-03, -1.0976e-01, 9.5171e-01,
-9.8688e-02],
[-2.2453e-01, 5.0430e-03, 1.0083e-02, -2.2430e-01, -9.8688e-02,
7.9832e-01]]),
sigma_noise=1.0)
ax = blr.plot_predictive(x_lin)
#plot_dataset(ax=ax)
# Add the next data point to the model
blr.update(X_dataset[1:2], y_dataset[1:2])
ax = blr.plot_predictive(x_lin)
plot_dataset(ax=ax)
blr
BLR(mu=tensor([ 2.7943, 1.9381, -0.1648, 0.3441, 0.0203, 0.3810]),
sigma=tensor([[ 5.0107e-01, -3.1243e-01, 2.8769e-02, -1.0255e-01, -2.3920e-02,
-1.0379e-01],
[-3.1243e-01, 5.9358e-01, 2.2160e-02, 1.9360e-01, 1.1227e-01,
1.5928e-01],
[ 2.8769e-02, 2.2160e-02, 9.9826e-01, 8.4362e-04, -1.1228e-03,
1.5747e-03],
[-1.0255e-01, 1.9360e-01, 8.4362e-04, 6.6355e-01, -1.6056e-01,
-2.9567e-01],
[-2.3920e-02, 1.1227e-01, -1.1228e-03, -1.6056e-01, 9.2204e-01,
-1.4037e-01],
[-1.0379e-01, 1.5928e-01, 1.5747e-03, -2.9567e-01, -1.4037e-01,
7.3977e-01]]),
sigma_noise=1.0)
# Instead, add the two data points at once
d = 2
blr_new = BLR(*init_prior_params(d))
blr_new.update(X_dataset[:2], y_dataset[:2])
blr
BLR(mu=tensor([ 2.7943, 1.9381, -0.1648, 0.3441, 0.0203, 0.3810]),
sigma=tensor([[ 5.0107e-01, -3.1243e-01, 2.8769e-02, -1.0255e-01, -2.3920e-02,
-1.0379e-01],
[-3.1243e-01, 5.9358e-01, 2.2160e-02, 1.9360e-01, 1.1227e-01,
1.5928e-01],
[ 2.8769e-02, 2.2160e-02, 9.9826e-01, 8.4362e-04, -1.1228e-03,
1.5747e-03],
[-1.0255e-01, 1.9360e-01, 8.4362e-04, 6.6355e-01, -1.6056e-01,
-2.9567e-01],
[-2.3920e-02, 1.1227e-01, -1.1228e-03, -1.6056e-01, 9.2204e-01,
-1.4037e-01],
[-1.0379e-01, 1.5928e-01, 1.5747e-03, -2.9567e-01, -1.4037e-01,
7.3977e-01]]),
sigma_noise=1.0)
blr_new
BLR(mu=tensor([ 2.7943, 1.9381, -0.1648, 0.3441, 0.0203, 0.3810]),
sigma=tensor([[ 5.0107e-01, -3.1243e-01, 2.8769e-02, -1.0255e-01, -2.3920e-02,
-1.0379e-01],
[-3.1243e-01, 5.9358e-01, 2.2160e-02, 1.9360e-01, 1.1227e-01,
1.5928e-01],
[ 2.8769e-02, 2.2160e-02, 9.9826e-01, 8.4362e-04, -1.1228e-03,
1.5747e-03],
[-1.0255e-01, 1.9360e-01, 8.4362e-04, 6.6355e-01, -1.6056e-01,
-2.9567e-01],
[-2.3920e-02, 1.1227e-01, -1.1228e-03, -1.6056e-01, 9.2204e-01,
-1.4037e-01],
[-1.0379e-01, 1.5928e-01, 1.5747e-03, -2.9567e-01, -1.4037e-01,
7.3977e-01]]),
sigma_noise=1.0)
blr_new.plot_predictive(x_lin)
# Entire dataset
def plot_fit(degree):
    blr = BLR(*init_prior_params(degree))
    blr.update(X_dataset, y_dataset)
    ax = blr.plot_predictive(x_lin)
    plot_dataset(ax=ax)
    plt.title(f'Degree {degree}')
    plt.show()

plot_fit(1)
plot_fit(2)
plot_fit(3)
plot_fit(4)
plot_fit(6)
d = 2
blr = BLR(*init_prior_params(d))

# Fit first 2 points
blr.update(X_dataset[:2], y_dataset[:2])
blr.current_mean
tensor([ 2.7943, 1.9381, -0.1648, 0.3441, 0.0203, 0.3810])
def plot_maximum(blr, ax=None):
    if ax is None:
        fig, ax = plt.subplots()
    # Plot the dataset
    ax.scatter(X_dataset.numpy(), y_dataset.numpy(), c='g', marker='x', label='Whole Dataset')
    ax.plot(x_lin, f(x_lin), label='f(x)', c='k')
    ax.set_xlim(-3, 3)

    # Plot the current maximum (the incumbent)
    y_max = blr.y_total.max()
    x_max = blr.X_total[blr.y_total.argmax()]

    ax.axhline(y_max, c='k', linestyle='--', label='Current Max')
    ax.axvline(x_max, c='k', linestyle='--')

    # Plot BLR predictive
    ax = blr.plot_predictive(x_lin, ax=ax)

    # Put legend outside of plot
    ax.legend(bbox_to_anchor=(1.05, 1), loc='upper left', borderaxespad=0.)

    return ax
plot_maximum(blr)
def plot_maximum_and_acq(blr, acq_fn, candidate_points_x):
    fig, ax = plt.subplots(nrows=2, sharex=True)

    plot_maximum(blr, ax=ax[0])

    # Mark locations of candidate points on the acquisition plot
    ax[1].scatter(candidate_points_x, torch.zeros_like(candidate_points_x),
                  c='r', marker='x', label='Candidate Points')
    alpha = acq_fn(blr, x_lin, blr.y_total.max())
    ax[1].plot(x_lin, alpha, label=r'Acquisition Function $\alpha(x)$')

    # Chosen candidate: the candidate maximizing the acquisition function
    alp_candidate = acq_fn(blr, candidate_points_x, blr.y_total.max())
    x_candidate = candidate_points_x[alp_candidate.argmax()]
    ax[1].axvline(x_candidate, c='k', linestyle='--', label='Chosen Candidate')

    # Legend outside of plot
    ax[1].legend(bbox_to_anchor=(1.05, 1), loc='upper left', borderaxespad=0.)
def plot_filled_cdf(mu, sigma, above):
    # Create a normal distribution with the specified mean and standard deviation
    normal_dist = dist.Normal(mu, sigma)

    # Generate a range of y values
    y_values = torch.linspace(mu - 10 * sigma, mu + 10 * sigma, 2000)

    # Calculate the probability density at each y value
    pdf_values = normal_dist.log_prob(y_values).exp()

    # Create a mask to select values above the specified "above" value
    mask = y_values >= above

    # Plot a horizontal line at "above" (the PDF is drawn sideways:
    # density on the x-axis, y on the y-axis)
    plt.axhline(above, c='k', linestyle='--', label=f'Above {above}')

    # Plot the PDF
    plt.plot(pdf_values.numpy(), y_values.numpy(), label='PDF')

    # P(Y > above) = Phi((mu - above) / sigma), Phi the standard normal CDF
    z0 = torch.as_tensor((mu - above) / sigma)
    phi = dist.Normal(0, 1).cdf(z0)

    # Fill the area under the PDF curve for values above "above"
    plt.fill_betweenx(y_values.numpy(), pdf_values.numpy(),
                      where=mask, alpha=0.2, color='r',
                      label=f'CDF Above {above} = {phi:.4f}')

    plt.xlabel('PDF')
    plt.ylabel('Y')
    plt.legend()
    plt.show()
import ipywidgets as widgets
from IPython.display import display

# Create interactive widgets for mu, sigma, and above
mu_widget = widgets.FloatSlider(value=0.5, min=-1, max=1, step=0.1, description='Mean:')
sigma_widget = widgets.FloatSlider(value=0.5, min=0.1, max=2, step=0.1, description='Standard Deviation:')
above_widget = widgets.FloatSlider(value=0.6, min=-1, max=1, step=0.1, description='Above Value:')

# Create an interactive output
out = widgets.interactive_output(plot_filled_cdf, {'mu': mu_widget, 'sigma': sigma_widget, 'above': above_widget})

# Display the widgets and output
display(widgets.VBox([mu_widget, sigma_widget, above_widget, out]))
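The shaded tail area is exactly the quantity behind the probability-of-improvement acquisition: for a Gaussian predictive $\mathcal{N}(\mu, \sigma^2)$ and threshold $y^+$,

$$
P(y > y^+) \;=\; 1 - \Phi\!\left(\frac{y^+ - \mu}{\sigma}\right) \;=\; \Phi\!\left(\frac{\mu - y^+}{\sigma}\right),
$$

where $\Phi$ is the standard normal CDF. alpha_PI below computes this with the incumbent $y^+ = \max_i y_i$ and a small exploration offset $\epsilon$.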
def alpha_PI(model, x_lin, y_max, eps=1e-1):
    """
    model: BLR model
    x_lin: (n_points, d)
    y_max: current maximum
    eps: exploration parameter
    """
    # Get the predictive mean and variance
    predictive_mean, predictive_variance = model.predict(x_lin)

    # Standardized improvement margin
    z0 = (predictive_mean - y_max - eps) / torch.sqrt(predictive_variance)

    # Evaluate the CDF of the standard normal at z0
    alpha = torch.distributions.Normal(0, 1).cdf(z0)

    return alpha
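That is,

$$
\alpha_{\mathrm{PI}}(x) = \Phi\!\left(\frac{\mu(x) - y^+ - \epsilon}{\sigma(x)}\right),
$$

where a larger $\epsilon$ demands a larger margin of improvement before a point scores highly, pushing the search toward exploration.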
def setdiff1d(a, b):
    # Elements of a that do not appear in b (exact floating-point match)
    mask = ~a.unsqueeze(1).eq(b).any(dim=1)
    return torch.masked_select(a, mask)
alpha = alpha_PI(blr, x_lin, blr.y_total.max())
candidate_points_x = setdiff1d(X_dataset, blr.X_total).reshape(-1, 1)
plot_maximum_and_acq(blr, alpha_PI, candidate_points_x)
y_max = blr.y_total.max()
mu_0, var_0 = blr.predict(x_lin[0:1])  # predict returns the predictive variance
above = y_max.item()
plot_filled_cdf(mu_0.item(), var_0.item()**0.5, above)
def bo_loop(blr, n_iter, acq_fun):
    for iteration in range(n_iter):
        # Pool-based BO: candidates are the dataset points not yet observed
        candidate_points_x = setdiff1d(X_dataset, blr.X_total).reshape(-1, 1)
        candidate_points_y = setdiff1d(y_dataset, blr.y_total)

        plot_maximum_and_acq(blr, acq_fun, candidate_points_x)

        # Refresh the incumbent before scoring candidates
        y_max = blr.y_total.max()
        alpha_candidate = acq_fun(blr, candidate_points_x, y_max)
        to_add_idx = alpha_candidate.argmax()
        to_add_x = candidate_points_x[to_add_idx].view(-1, 1)
        to_add_y = candidate_points_y[to_add_idx].view(-1, 1)
        print(f"Index: {to_add_idx.item()}, x: {to_add_x.item():0.3f}, y: {to_add_y.item():0.3f}")
        blr.update(to_add_x, to_add_y)
        print(f'Iteration {iteration+1} f(x+) = {blr.y_total.max():0.3f}')
        print()
blr_copy = copy.deepcopy(blr)
bo_loop(blr_copy, 10, alpha_PI)
Index: 24, x: 2.716, y: 4.546
Iteration 1 f(x+) = 6.479
Index: 18, x: 1.090, y: 7.590
Iteration 2 f(x+) = 7.590
Index: 3, x: 0.804, y: 6.802
Iteration 3 f(x+) = 7.590
Index: 6, x: 0.794, y: 4.239
Iteration 4 f(x+) = 7.590
Index: 12, x: 1.186, y: 6.964
Iteration 5 f(x+) = 7.590
Index: 12, x: 1.800, y: 6.170
Iteration 6 f(x+) = 7.590
Index: 14, x: 2.491, y: 7.997
Iteration 7 f(x+) = 7.997
Index: 4, x: 2.379, y: 6.983
Iteration 8 f(x+) = 7.997
Index: 14, x: 2.245, y: 6.776
Iteration 9 f(x+) = 7.997
Index: 15, x: 0.317, y: 7.446
Iteration 10 f(x+) = 7.997
def alpha_ucb(model, x_lin, y_max, beta=100):
    """
    model: BLR model
    x_lin: (n_points, d)
    y_max: current maximum (unused; kept for a uniform acquisition signature)
    beta: exploration parameter
    """
    # Get the predictive mean and variance
    predictive_mean, predictive_variance = model.predict(x_lin)

    # Calculate the UCB acquisition function
    alpha = predictive_mean + beta * torch.sqrt(predictive_variance)

    return alpha
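This is the upper confidence bound

$$
\alpha_{\mathrm{UCB}}(x) = \mu(x) + \beta\,\sigma(x):
$$

small $\beta$ greedily maximizes the predictive mean, while large $\beta$ rewards predictive uncertainty and explores more. Note that the default $\beta = 100$ in the signature is far more exploratory than the values compared below ($\beta \in \{0.1, 1, 10\}$).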
alpha_ucb_0_1 = partial(alpha_ucb, beta=0.1)
alpha_ucb_1 = partial(alpha_ucb, beta=1)
alpha_ucb_10 = partial(alpha_ucb, beta=10)
blr_copy = copy.deepcopy(blr)
bo_loop(blr_copy, 10, alpha_ucb_0_1)
Index: 24, x: 2.716, y: 4.546
Iteration 1 f(x+) = 6.479
Index: 19, x: 2.491, y: 7.997
Iteration 2 f(x+) = 7.997
Index: 5, x: 2.379, y: 6.983
Iteration 3 f(x+) = 7.997
Index: 19, x: 2.245, y: 6.776
Iteration 4 f(x+) = 7.997
Index: 14, x: 1.800, y: 6.170
Iteration 5 f(x+) = 7.997
Index: 13, x: 1.186, y: 6.964
Iteration 6 f(x+) = 7.997
Index: 15, x: 1.090, y: 7.590
Iteration 7 f(x+) = 7.997
Index: 3, x: 0.804, y: 6.802
Iteration 8 f(x+) = 7.997
Index: 5, x: 0.794, y: 4.239
Iteration 9 f(x+) = 7.997
Index: 15, x: 0.317, y: 7.446
Iteration 10 f(x+) = 7.997
blr_copy = copy.deepcopy(blr)
bo_loop(blr_copy, 10, alpha_ucb_10)
Index: 24, x: 2.716, y: 4.546
Iteration 1 f(x+) = 6.479
Index: 8, x: -0.907, y: 2.925
Iteration 2 f(x+) = 6.479
Index: 0, x: -2.469, y: 2.848
Iteration 3 f(x+) = 6.479
Index: 16, x: 1.090, y: 7.590
Iteration 4 f(x+) = 7.590
Index: 12, x: 1.186, y: 6.964
Iteration 5 f(x+) = 7.590
Index: 2, x: 0.804, y: 6.802
Iteration 6 f(x+) = 7.590
Index: 11, x: 1.800, y: 6.170
Iteration 7 f(x+) = 7.590
Index: 5, x: 0.794, y: 4.239
Iteration 8 f(x+) = 7.590
Index: 12, x: 2.491, y: 7.997
Iteration 9 f(x+) = 7.997
Index: 3, x: 2.379, y: 6.983
Iteration 10 f(x+) = 7.997
def alpha_thompson(model, x_lin, y_max, n_samples=100):
    """
    model: BLR model
    x_lin: (n_points, d)
    y_max: current maximum (unused; kept for a uniform acquisition signature)
    n_samples: unused here; a single posterior sample is drawn
    """
    torch.manual_seed(0)
    # Get the predictive mean and variance
    predictive_mean, predictive_variance = model.predict(x_lin)

    # Sample from the (marginal) posterior predictive at each point
    posterior_sample = dist.Normal(predictive_mean, torch.sqrt(predictive_variance)).sample()

    # The sampled values themselves serve as the acquisition function
    alpha = posterior_sample

    return alpha
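alpha_thompson draws an independent sample at every query point from the marginal predictive, and the torch.manual_seed(0) call makes each draw deterministic rather than fresh. A more faithful Thompson sample for a Bayesian linear model draws a single weight vector from the weight posterior and evaluates that one function everywhere, so the sample stays correlated across inputs. A minimal sketch under that interpretation (alpha_thompson_weights is a name introduced here, and current_sigma may need a small diagonal jitter for numerical stability):

def alpha_thompson_weights(model, x_lin, y_max):
    # Draw one weight vector w ~ N(m_N, S_N) from the weight posterior
    w = dist.MultivariateNormal(model.current_mean,
                                covariance_matrix=model.current_sigma).sample()
    # Evaluate the sampled function f_w(x) = phi(x)^T w at the query points
    return phi(x_lin, model.degree) @ w

With this version the chosen candidate maximizes one coherent posterior draw rather than pointwise noise; it can be passed to bo_loop in place of alpha_thompson.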
blr_copy = copy.deepcopy(blr)
bo_loop(blr_copy, 10, alpha_thompson)
Index: 5, x: 2.379, y: 6.983
Iteration 1 f(x+) = 6.983
Index: 20, x: 2.245, y: 6.776
Iteration 2 f(x+) = 6.983
Index: 18, x: 2.491, y: 7.997
Iteration 3 f(x+) = 7.997
Index: 21, x: 2.716, y: 4.546
Iteration 4 f(x+) = 7.997
Index: 18, x: -0.617, y: 1.858
Iteration 5 f(x+) = 7.997
Index: 17, x: 1.090, y: 7.590
Iteration 6 f(x+) = 7.997
Index: 14, x: 1.800, y: 6.170
Iteration 7 f(x+) = 7.997
Index: 13, x: 1.186, y: 6.964
Iteration 8 f(x+) = 7.997
Index: 16, x: 0.317, y: 7.446
Iteration 9 f(x+) = 7.997
Index: 3, x: 0.804, y: 6.802
Iteration 10 f(x+) = 7.997