#!/usr/bin/env python3
#
# Bayesian coinflip,
# numerically discretized distribution functions
# by Daniel Nevermann
#
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
# ***
# *** global variables
# ***
trueBias = 0.3 # true coin bias
nX = 101 # numerical discretization
xx = [i*1.0/(nX-1.0) for i in range(nX)]
pp = [1.0/nX for _ in range(nX)] # starting prior
damp_const = 0.0 # damping of the update; 0 = pure Bayesian updating
# ***
# *** Bayesian updating
# ***
def updatePrior():
    """Bayesian inference for coin flips;
    the marginal corresponds to the normalization of the posterior"""
    evidence = 0.0 if (np.random.uniform() > trueBias) else 1.0 # coin flip
    pp_old = pp.copy() # full prior
    for iX in range(nX):
        prior = pp[iX]
        likelihood = xx[iX] if (evidence == 1) else 1.0 - xx[iX]
        pp[iX] = likelihood * prior # Bayes theorem
    norm = sum(pp)
    for iX in range(nX): # normalization and damping
        pp[iX] = pp[iX] / norm
        pp[iX] = pp_old[iX] * damp_const + pp[iX] * (1-damp_const)
    return evidence
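# for a flat prior, the exact posterior after h heads in n flips is the
# Beta(h+1, n-h+1) density; betaPosterior() below is an illustrative helper
# (not called by the animation) evaluating it on the same grid for comparison
import math
def betaPosterior(h, n):
    """exact Beta(h+1, n-h+1) posterior, as probabilities on the grid xx"""
    logB = math.lgamma(h+1.0) + math.lgamma(n-h+1.0) - math.lgamma(n+2.0)
    dens = [math.exp(h*math.log(max(x, 1.0e-12)) +
                     (n-h)*math.log(max(1.0-x, 1.0e-12)) - logB)
            for x in xx]
    return [d/(nX-1.0) for d in dens] # density times grid spacing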
# ***
# *** Animation
# ***
fig, ax = plt.subplots()
bar_container = ax.bar(xx, pp, width=0.05)
def update(frame):
    evidence = updatePrior()
    for iX, bar in enumerate(bar_container):
        bar.set_height(pp[iX])
    ax.set_title(f'Observation: {frame + 1}, Evidence: {evidence:.1f}')
    return bar_container
# ***
# *** main
# ***
# Set up animation parameters
ani = FuncAnimation(fig, update, frames=2000, repeat=False, blit=False, interval=10)
# Display the animated plot
plt.xlabel("Coin Bias Hypothesis")
plt.ylabel("Probability")
plt.title("Bayesian Inference: Updating Prior")
plt.ylim(0, 0.3)
plt.show()
# sidebar: np.clip() clips an array to a certain range
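# e.g. (illustrative): np.clip(np.array([-0.5, 0.2, 1.7]), 0.0, 1.0)
# returns array([0. , 0.2, 1. ]); used below to keep Metropolis proposals
# inside the open interval (0, 1)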
#!/usr/bin/env python3
#
# Monte Carlo Bayes when two coins are flipped
#
import numpy as np
import matplotlib.pyplot as plt
def simulate_data(num_trials, b1, b2):
    """generating simulated data"""
    results = []
    chosen_coins = []
    for _ in range(num_trials):
        coin = np.random.choice([0, 1]) # 0 for coin1, 1 for coin2
        chosen_coins.append(coin)
        if coin == 0:
            results.append(np.random.rand() < b1)
        else:
            results.append(np.random.rand() < b2)
    return np.array(results), np.array(chosen_coins)
def log_likelihood(data, b1, b2):
    """log likelihood for tossing coins; the coin identity is not
    observed, so a coin is drawn at random for every flip, giving a
    one-sample Monte Carlo estimate of the marginal likelihood"""
    log_lik = 0
    for result in data:
        coin = np.random.choice([0, 1]) # randomly select coin
        p = b1 if coin == 0 else b2
        log_lik += np.log(p if result else 1 - p)
    return log_lik
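# for reference (an illustrative sketch, not called below): the exact
# per-flip marginal likelihood averages over the uniform coin choice,
# p = 0.5*b1 + 0.5*b2, removing the sampling noise of log_likelihood()
def log_likelihood_marginal(data, b1, b2):
    """exact marginal log likelihood over the latent coin choice"""
    log_lik = 0.0
    for result in data:
        p = 0.5*b1 + 0.5*b2 # coin drawn uniformly at random
        log_lik += np.log(p if result else 1.0 - p)
    return log_lik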
class VariationalMetropolis:
    """Metropolis sampler for the two coin biases; all in one class"""
    def __init__(self, data, initial_b1=0.5, initial_b2=0.5, proposal_scale=0.05):
        self.data = data
        self.b1 = initial_b1
        self.b2 = initial_b2
        self.proposal_scale = proposal_scale
        self.samples = []

    def propose(self):
        """selecting potential new b1 and b2 values"""
        eps = 0.001
        b1_large = self.b1 + np.random.normal(0, self.proposal_scale)
        b2_large = self.b2 + np.random.normal(0, self.proposal_scale)
        new_b1 = np.clip(b1_large, eps, 1.0-eps)
        new_b2 = np.clip(b2_large, eps, 1.0-eps)
        return new_b1, new_b2

    def acceptance_ratio(self, new_b1, new_b2):
        """calculating the acceptance ratio"""
        old_ll = log_likelihood(self.data, self.b1, self.b2)
        new_ll = log_likelihood(self.data, new_b1, new_b2)
        return np.exp(new_ll - old_ll)

    def run(self, iterations):
        tenPercent = max(1, iterations//10) # progress-report interval
        for ii in range(iterations):
            if (ii%tenPercent==0):
                print(f'# walking {ii:6d}')
            new_b1, new_b2 = self.propose()
            alpha = self.acceptance_ratio(new_b1, new_b2)
            if np.random.rand() < alpha:
                self.b1, self.b2 = new_b1, new_b2
            self.samples.append((self.b1, self.b2))
        return self.samples
#
# main
#
true_b1 = 0.2
true_b2 = 0.2
num_trials = 100
n_MC = 10000
# generating data
print(f'# tossing coins {num_trials:d} times')
observed_data, chosen_coins = simulate_data(num_trials, true_b1, true_b2)
# running variational Metropolis
vm = VariationalMetropolis(observed_data)
print('# Monte Carlo walk in parameter space')
samples = vm.run(n_MC)
# extracting samples
samples = np.array(samples)
b1_samples = samples[:, 0]
b2_samples = samples[:, 1]
# plotting results
plt.figure(figsize=(12, 6))
plt.subplot(1, 2, 1)
plt.hist(b1_samples, bins=50, density=True, color='blue',
         alpha=0.7, label='Posterior of b1')
plt.axvline(true_b1, color='red', linestyle='--', label='True b1')
plt.xlabel('b1')
plt.ylabel('Density')
plt.legend()
plt.subplot(1, 2, 2)
plt.hist(b2_samples, bins=50, density=True, color='green',
         alpha=0.7, label='Posterior of b2')
plt.axvline(true_b2, color='red', linestyle='--', label='True b2')
plt.xlabel('b2')
plt.ylabel('Density')
plt.legend()
plt.tight_layout()
plt.show()
#!/usr/bin/env python3
# fitting the function
# f(t) = sin(t*(1.0+1.5*sin(0.3*t)))
# via linear time prediction
#
# straightforward least-squares fit
# minimized using PyTorch
import torch, math
import matplotlib.pyplot as plt
nSamples = 100
nIter = 1000
nPar = 3
paraMeters = torch.randn(nPar, requires_grad=True)
eps = 0.05
def allResiduals(f, parMet):
    """residuals of the linear time-prediction model"""
    nF = len(f)
    nPar = len(parMet)
    nRes = nF+1-nPar
    allRes = torch.zeros(nRes)
    for i in range(nPar-1, nF):
        predicted = parMet[0] # constant term
        for pp in range(1, nPar):
            predicted = predicted + parMet[pp]*f[i-pp]
        allRes[i+1-nPar] = f[i] - predicted
    return allRes
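# for nPar = 3 the predictor reads f(t) ≈ parMet[0] + parMet[1]*f(t-1)
# + parMet[2]*f(t-2), a linear AR(2) model; residuals exist for t ≥ nPar-1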
#
# main: synthetic data; generation and plotting
#
f = torch.ones(nSamples)
for i in range(nSamples):
    x = 0.2*i
    f[i] = f[i]*math.sin(x*(1.0+1.5*math.sin(0.3*x)))
if (1==2): # flip to (1==1) to inspect the raw data
    plt.plot(f, label='f(t)')
    plt.title('f(t) = sin(t*(1.0+1.5*sin(0.3*t)))')
    plt.xlabel('time')
    plt.ylabel('f(t)')
    plt.legend()
    plt.show()
#
# main: minimizing least-square loss
#
for iIter in range(nIter):
    errors = allResiduals(f, paraMeters)
    loss = errors.pow(2).sum()/len(errors) # mean squared residual
    loss.backward()
    with torch.no_grad(): # plain gradient-descent step
        paraMeters -= eps*paraMeters.grad
        paraMeters.grad = None
    if (iIter%100==0):
        print(f'{iIter:5d} {loss.item():8.4f}')
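#
# the same update could be driven by the optimizer API (a sketch with
# identical step size, not used above):
#   opt = torch.optim.SGD([paraMeters], lr=eps)
#   # per iteration: opt.zero_grad(); loss.backward(); opt.step()
#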
#
# main: output
#
a_est = paraMeters[0]
b_est = paraMeters[1]
c_est = paraMeters[2]
print("Estimated paraMeters:")
print(f'# a = {a_est:7.3f}')
print(f'# b = {b_est:7.3f}')
print(f'# c = {c_est:7.3f}')
#
# visualizing the fit
#
errors = allResiduals(f, paraMeters)
small_f = f[nPar-1:]
predicted_f = small_f - errors
plt.plot(small_f, label='True Data')
plt.plot(predicted_f.detach(), label='Model Prediction',
         linestyle='--')
plt.title('Model Fit vs. True Data')
plt.xlabel('time')
plt.ylabel('f(t)')
plt.legend()
plt.show()
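#
# cross-check (an illustrative addition, not part of the fit above): the same
# least-squares problem has a closed-form solution via torch.linalg.lstsq,
# with design matrix [1, f(t-1), f(t-2)] and targets f(t)
#
X = torch.stack([torch.ones(nSamples-2), f[1:-1], f[:-2]], dim=1)
sol = torch.linalg.lstsq(X, f[2:].unsqueeze(1)).solution.squeeze()
print(f'# closed form: a = {sol[0]:7.3f}, b = {sol[1]:7.3f}, c = {sol[2]:7.3f}')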
#!/usr/bin/env python3
#
# Bayesian Monte Carlo sampling for the 2D linear model
# y = true_a + true_b*x1 + true_c*x2 + noise
#
import torch
import matplotlib.pyplot as plt
# Generate synthetic data
torch.manual_seed(42)
# Parameters
true_a = 1.0
true_b = 2.0
true_c = -1.5
n_samples = 1000
# Generate predictors and response
x1 = torch.rand(n_samples) * 10 # Random values between 0 and 10
x2 = torch.rand(n_samples) * 5 # Random values between 0 and 5
noise = torch.randn(n_samples) * 0.5
y = true_a + true_b * x1 + true_c * x2 + noise
def log_prior(a, b, c):
    """Log-prior distribution:
    broad normal priors for a, b and c;
    sigma is left with an implicit flat prior."""
    prior_a = torch.distributions.Normal(0, 10).log_prob(a)
    prior_b = torch.distributions.Normal(0, 10).log_prob(b)
    prior_c = torch.distributions.Normal(0, 10).log_prob(c)
    return prior_a + prior_b + prior_c
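# for reference, Normal(0, 10).log_prob(x) evaluates the closed form
#   log p(x) = -x**2/200 - log(10) - 0.5*log(2*pi)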
def log_likelihood(a, b, c, sigma, x1, x2, y):
    """Log-likelihood of the data."""
    mean = a + b * x1 + c * x2
    likelihood = torch.distributions.Normal(mean, sigma).log_prob(y)
    return likelihood.sum()
def log_posterior(a, b, c, sigma, x1, x2, y):
    """Log-posterior = Log-prior + Log-likelihood."""
    return log_prior(a, b, c) +\
           log_likelihood(a, b, c, sigma, x1, x2, y)
def metropolis_hastings(x1, x2, y, num_samples=5000, step_size=0.1):
    """Metropolis-Hastings sampler."""
    samples = []
    # Initialize parameters
    a = torch.tensor(0.0)
    b = torch.tensor(0.0)
    c = torch.tensor(0.0)
    sigma = torch.tensor(1.0)
    current_log_posterior = log_posterior(a, b, c, sigma, x1, x2, y)
    for iIter in range(num_samples):
        if (iIter%1000==0):
            print(f'# {iIter:6d}')
        # Propose new values; the multiplicative update keeps sigma > 0
        a_new = a + torch.randn(1) * step_size
        b_new = b + torch.randn(1) * step_size
        c_new = c + torch.randn(1) * step_size
        sigma_new = sigma * (1.0 + (torch.rand(1)-0.5)*step_size)
        # Compute the log-posterior for the proposed values
        proposed_log_posterior =\
            log_posterior(a_new, b_new, c_new, sigma_new, x1, x2, y)
        # Accept/reject step
        acceptance_ratio = torch.exp(proposed_log_posterior -\
                                     current_log_posterior)
        if torch.rand(1) < acceptance_ratio:
            a, b, c, sigma = a_new, b_new, c_new, sigma_new
            current_log_posterior = proposed_log_posterior
        # Store the current sample
        samples.append((a.item(), b.item(), c.item(), sigma.item()))
    return samples
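# Aside: the multiplicative sigma proposal is not symmetric, so the plain
# Metropolis ratio above omits the Hastings proposal-density correction;
# for the small step_size used here the resulting bias is minor. A common
# symmetric-in-log alternative would be (a sketch, not used above):
#   sigma_new = sigma * torch.exp(torch.randn(1) * step_size)
# whose Hastings factor sigma_new/sigma enters the acceptance ratio.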
# Run MCMC
print("Running MCMC...")
samples = metropolis_hastings(x1, x2, y, num_samples=5000, step_size=0.05)
# Extract samples
samples_tensor = torch.tensor(samples)
a_samples = samples_tensor[:, 0]
b_samples = samples_tensor[:, 1]
c_samples = samples_tensor[:, 2]
s_samples = samples_tensor[:, 3]
# Compute Means and Variances
mean_a = a_samples.mean().item()
mean_b = b_samples.mean().item()
mean_c = c_samples.mean().item()
mean_s = s_samples.mean().item()
var_a = a_samples.var().item()
var_b = b_samples.var().item()
var_c = c_samples.var().item()
var_s = s_samples.var().item()
# Print Results
print(f'a mean/variance: {mean_a:8.4f} {var_a:8.3f} | true {true_a:5.2f}')
print(f'b mean/variance: {mean_b:8.4f} {var_b:8.3f} | true {true_b:5.2f}')
print(f'c mean/variance: {mean_c:8.4f} {var_c:8.3f} | true {true_c:5.2f}')
print(f'sigma mean/variance: {mean_s:8.4f} {var_s:8.3f}')
# Plot Posterior Distributions
plt.figure(figsize=(15, 5))
plt.subplot(1, 3, 1)
plt.hist(a_samples.numpy(), bins=30, density=True, alpha=0.7, color='blue')
plt.title("Posterior of a")
plt.xlabel("a")
plt.ylabel("Density")
plt.subplot(1, 3, 2)
plt.hist(b_samples.numpy(), bins=30, density=True, alpha=0.7, color='green')
plt.title("Posterior of b")
plt.xlabel("b")
plt.ylabel("Density")
plt.subplot(1, 3, 3)
plt.hist(c_samples.numpy(), bins=30, density=True, alpha=0.7, color='red')
plt.title("Posterior of c")
plt.xlabel("c")
plt.ylabel("Density")
plt.tight_layout()
plt.show()
# sidebar: y.roll(shifts=-n) performs an n-step cyclic shift of a tensor (to the left)
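# e.g. (illustrative): torch.arange(5).roll(shifts=-1)
# returns tensor([1, 2, 3, 4, 0])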
#!/usr/bin/env python3
#
# fitting the function
# f(t) = sin(t*(1.0+1.5*sin(0.3*t)))
# via linear time prediction
# using Monte-Carlo sampling of the Bayes posterior
#
import torch, math
import matplotlib.pyplot as plt
torch.manual_seed(12)
nSamples = 100
nMC = 5000 # number of MC steps
#
# synthetic data generation
#
y = torch.ones(nSamples+2)
for i in range(len(y)):
    x = 0.2*i
    y[i] = y[i]*math.sin(x*(1.0+1.5*math.sin(0.3*x))) # no noise needed
x1 = y.roll(shifts=-1) # cyclic shift
x2 = y.roll(shifts=-2)
y = y[:-2] # cut last two
x1 = x1[:-2]
x2 = x2[:-2]
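# with the left shifts, x1[i] = y[i+1] and x2[i] = y[i+2]; the linear model
# below therefore predicts y[i] from its two successors, a time-reversed AR(2)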
def log_prior(a, b, c):
    """Log-prior distribution:
    broad normal priors for a, b and c;
    sigma is left with an implicit flat prior."""
    prior_a = torch.distributions.Normal(0, 10).log_prob(a)
    prior_b = torch.distributions.Normal(0, 10).log_prob(b)
    prior_c = torch.distributions.Normal(0, 10).log_prob(c)
    return prior_a + prior_b + prior_c
def log_likelihood(a, b, c, sigma, x1, x2, y):
    """Log-likelihood of the data."""
    mean = a + b * x1 + c * x2
    likelihood = torch.distributions.Normal(mean, sigma).log_prob(y)
    return likelihood.sum()
def log_posterior(a, b, c, sigma, x1, x2, y):
    """Log-posterior = Log-prior + Log-likelihood."""
    return log_prior(a, b, c) +\
           log_likelihood(a, b, c, sigma, x1, x2, y)
def metropolis_hastings(x1, x2, y, num_samples, step_size):
    """Metropolis-Hastings sampler."""
    samples = []
    # Initialize parameters
    a = torch.tensor(0.0)
    b = torch.tensor(0.0)
    c = torch.tensor(0.0)
    sigma = torch.tensor(1.0)
    current_log_posterior = log_posterior(a, b, c, sigma, x1, x2, y)
    for iIter in range(num_samples):
        if (iIter%1000==0):
            print(f'# {iIter:6d}')
        # Propose new values; the multiplicative update keeps sigma > 0
        a_new = a + torch.randn(1) * step_size
        b_new = b + torch.randn(1) * step_size
        c_new = c + torch.randn(1) * step_size
        sigma_new = sigma * (1.0 + (torch.rand(1)-0.5)*step_size)
        # Compute the log-posterior for the proposed values
        proposed_log_posterior =\
            log_posterior(a_new, b_new, c_new, sigma_new, x1, x2, y)
        # Accept/reject step
        acceptance_ratio = torch.exp(proposed_log_posterior -\
                                     current_log_posterior)
        if torch.rand(1) < acceptance_ratio:
            a, b, c, sigma = a_new, b_new, c_new, sigma_new
            current_log_posterior = proposed_log_posterior
        # Store the current sample
        samples.append((a.item(), b.item(), c.item(), sigma.item()))
    return samples
#
# running Metropolis - MC
#
print("Running MCMC...")
samples = metropolis_hastings(x1, x2, y, num_samples=nMC, step_size=0.05)
#
# means and variances
#
samples_tensor = torch.tensor(samples)
a_samples = samples_tensor[:, 0]
b_samples = samples_tensor[:, 1]
c_samples = samples_tensor[:, 2]
s_samples = samples_tensor[:, 3]
mean_a = a_samples.mean().item()
mean_b = b_samples.mean().item()
mean_c = c_samples.mean().item()
mean_s = s_samples.mean().item()
var_a = a_samples.var().item()
var_b = b_samples.var().item()
var_c = c_samples.var().item()
var_s = s_samples.var().item()
print(f'a mean/variance: {mean_a:8.4f} {var_a:8.3f}')
print(f'b mean/variance: {mean_b:8.4f} {var_b:8.3f}')
print(f'c mean/variance: {mean_c:8.4f} {var_c:8.3f}')
print(f'sigma mean/variance: {mean_s:8.4f} {var_s:8.3f}')
#
# plotting posterior distributions
#
plt.figure(figsize=(15, 5))
plt.subplot(1, 3, 1)
plt.hist(a_samples.numpy(), bins=30, density=True, alpha=0.7, color='blue')
plt.title("Posterior of a")
plt.xlabel("a")
plt.ylabel("Density")
plt.subplot(1, 3, 2)
plt.hist(b_samples.numpy(), bins=30, density=True, alpha=0.7, color='green')
plt.title("Posterior of b")
plt.xlabel("b")
plt.ylabel("Density")
plt.subplot(1, 3, 3)
plt.hist(c_samples.numpy(), bins=30, density=True, alpha=0.7, color='red')
plt.title("Posterior of c")
plt.xlabel("c")
plt.ylabel("Density")
plt.tight_layout()
plt.show()
#
# visualizing the fit
#
predicted_y = mean_a + mean_b*x1 + mean_c*x2
plt.plot(y, label='True Data')
plt.plot(predicted_y.detach(), label='Model Prediction',
         linestyle='--')
plt.title('Model Fit vs. True Data')
plt.xlabel('time')
plt.ylabel('y(t)')
plt.legend()
plt.show()