Source code for monte.hamiltonian_mc

"""
Module containing Markov Chains Monte Carlo sampler utilizing Hamiltonian Monte Carlo (HMC)
algorithm. HMC uses Hamiltonian dynamics to sample by moving a particle through the posterior
to perform numerical integration that is  then corrected by Metropolis acceptance criterion.
"""

import numpy as np
import monte._checks
from monte import BaseSampler


[docs]class HamiltonianMC(BaseSampler):
[docs] def __init__(self, log_posterior, log_posterior_gradient) -> None: """ Initializes the problem sampler object :param log_posterior: Log-probability of the target distribution to be sampled from :type log_posterior: callable :param log_posterior_gradient: Log-probability of the gradient of the target distribution to be sampled from :type log_posterior_gradient: callable """ super().__init__() monte._checks._check_posterior(log_posterior) monte._checks._check_posterior_gradient(log_posterior_gradient) self.log_posterior = log_posterior self.log_posterior_gradient = log_posterior_gradient
def _hamiltonian(self, theta, rho, inverse_metric, **kwargs): """ Calculates the Hamiltonian of the current particle. Mass of the particle is taken as :math:`m=1`. The most general form of kinetic energy of a physical object is used: :math:`K(\rho) = \frac{|\rho|^2}{2m}`. :param theta: Vector of current values of parameter(s) :type theta: ndarray :param rho: Particle momentum vector :type rho: ndarray :return: Hamiltonian of the current particle :rtype: float """ return 0.5 * np.dot(np.matmul(rho, inverse_metric), rho) - self.log_posterior( theta, **kwargs ) def _leapfrog(self, theta, rho, epsilon, l, inverse_metric, **kwargs): """ Leapfrom integrator used to simulate the movement of the particle :param theta: Vector of current values of parameter(s) :type theta: ndarray :param rho: Particle momentum vector :type rho: ndarray :param epsilon: Discretization time or timestep :type epsilon: float :param l: Number of leapfrog steps taken :type l: int :param inverse_metric: Diagonal estimate of the covariance of the posterior :type inverse_metric: ndarray :return: Parameter and momentum vectors :rtype: ndarray, ndarray """ rho -= 0.5 * epsilon * self.log_posterior_gradient(theta, **kwargs) for _ in range(l - 1): theta += epsilon * np.matmul(inverse_metric, rho) rho -= epsilon * self.log_posterior_gradient(theta, **kwargs) theta += epsilon * np.matmul(inverse_metric, rho) rho -= 0.5 * epsilon * self.log_posterior_gradient(theta, **kwargs) return theta, rho def _iterate(self, theta_current, epsilon, l, metric, inverse_metric, **kwargs): """ Single iteration of the sampler :param theta_current: Current parameter vector :type theta_current: ndarray :param epsilon: Discretization time or timestep :type epsilon: float :param l: Number of leapfrog steps taken :type l: int :param metric: Covariance matrix of the momentum vector sampling distribution :type metric: ndarray :param inverse_metric: Diagonal estimate of the covariance of the posterior :type inverse_metric: ndarray :return: New value of parameter vector, acceptance information :rtype: ndarray, int """ rho_current = np.random.multivariate_normal( mean=np.zeros(theta_current.shape[0]), cov=metric ) theta_proposed, rho_updated = self._leapfrog( theta_current.copy(), rho_current.copy(), epsilon, l, inverse_metric, **kwargs, ) alpha = ( min( 0, -( self._hamiltonian( theta_proposed, rho_updated, inverse_metric, **kwargs ) - self._hamiltonian( theta_current, rho_current, inverse_metric, **kwargs ) ), ), ) u = np.log(np.random.random()) if u <= alpha: theta_new = theta_proposed a = 1 else: theta_new = theta_current a = 0 return theta_new, a
[docs] def sample(self, iter, warmup, theta, epsilon, l, metric=None, lag=1, **kwargs): """ Samples from the log_posterior distribution :param iter: Number of iterations of the algorithm :type iter: int :param warmup: Number of warmup steps of the algorithm. These are discarded so that the only samples recorded are the ones obtained after the Markov chain has reached the stationary distribution :type warmup: int :param theta: Vector of initial values of parameter(s) :type theta: ndarray :param epsilon: Discretization time or timestep :type epsilon: float :param l: Number of leapfrog steps taken :type l: int :param metric: Covariance matrix of the momentum vector sampling distribution :type metric: ndarray :param lag: Sampler lag. Parameter specifying every how many iterations will the sample be recorded. Used to limit autocorrelation of the samples. If `lag=1`, every sample is recorded, if `lag=3` each third sample is recorded, etc. , defaults to 1 :type lag: int, optional :return: Numpy arrays of samples and acceptance information for every algorithm iteration. :rtype: ndarray, ndarray """ monte._checks._check_parameters( iter=iter, warmup=warmup, epsilon=epsilon, l=l, lag=lag ) theta = monte._checks._check_theta(theta) metric = monte._checks._check_metric(metric, theta) samples = np.zeros((iter, theta.shape[0])) acceptances = np.zeros(iter) lp = np.zeros(iter) inverse_metric = np.linalg.inv(metric) for i in range(warmup): theta, a = self._iterate( theta, epsilon, l, metric, inverse_metric, **kwargs ) for i in range(iter): for _ in range(lag): theta, a = self._iterate( theta, epsilon, l, metric, inverse_metric, **kwargs ) samples[i] = theta acceptances[i] = a lp[i] = self.log_posterior(theta, **kwargs) self.samples = samples self.acceptances = acceptances self.lp = lp return samples, acceptances