Source code for pylfi.inferences.mcmc_abc

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import copy
import multiprocessing
from multiprocessing import Lock, RLock

import numpy as np
import scipy.stats as stats
from pathos.pools import ProcessPool
from pylfi.inferences import ABCBase, PilotStudyMissing, SamplingNotPerformed
from pylfi.journal import Journal
from pylfi.utils import advance_PRNG_state, generate_seed_sequence
from tqdm.auto import tqdm


[docs]class MCMCABC(ABCBase): """Class implementing the MCMC ABC algorithm. """ def __init__( self, observation, simulator, stat_calc, priors, log=True ): super().__init__( observation=observation, simulator=simulator, stat_calc=stat_calc, priors=priors, inference_scheme="MCMC ABC", log=log )
[docs] def tune( self, prop_scale=0.5, epsilon=None, tune_iter=500, tune_interval=100, stat_weight=1., stat_scale=1., seed=None, use_pilot=False ): """Tune the proposal scale So how do we choose sd for the proposal distribution? There are some papers that suggest Metropolis-Hastings is most efficient when you accept 23.4% of proposed samples, and it turns out that lowering step size increases the probability of accepting a proposal. PyMC3 will spend the first 500 steps increasing and decreasing the step size to try to find the best value of sd that will give you an acceptance rate of 23.4% (you can even set different acceptance rates). The problem is that if you change the step size while sampling, you lose the guarantees that your samples (asymptotically) come from the target distribution, so you should typically discard these. Also, there is typically a lot more adaptation going on in those first steps than just step_size. """ if self._log: self.logger.info("Run MCMC tuner.") if use_pilot: if not self._done_pilot_study: msg = ("In order to use tuning from pilot study, the " "pilot_study method must be run in advance.") raise PilotStudyMissing(msg) else: if epsilon is None: msg = ("epsilon must be passed.") raise ValueError(msg) self._epsilon = epsilon self._stat_scale = stat_scale self._n_samples = n_samples self._burn = burn self._stat_weight = stat_weight self._prop_scale = prop_scale if self._log: t_range = tqdm(range(n_iter), desc=f"[Sampling progress] Chain {position+1}", position=position, leave=False, colour='green') else: t_range = range(n_iter) seeds = generate_seed_sequence(seed, n_jobs) n_accepted = 0 # Initialize chain thetas_current, _, _ = self._draw_initial_sample(seed) # Compute current logpdf # (only needs to be re-computed if proposal is accepted) logpdf_current = self._compute_logpdf(thetas_current) # Metropolis algorithm for i in t_range: # Advance PRNG state next_gen = advance_PRNG_state(seed, i) # Draw proposal thetas_proposal = self._draw_proposal(thetas_current, next_gen) # Compute proposal logpdf logpdf_proposal = self._compute_logpdf(thetas_proposal) # Compute acceptance probability alpha = self._acceptance_prob(logpdf_proposal, logpdf_current) # Draw a uniform random number u = self._draw_uniform(next_gen) # Metropolis reject/accept step if u < alpha: # Simulator call to generate simulated data sim = self._simulator(*thetas_proposal) # Calculate summary statistics of simulated data if isinstance(sim, tuple): sim_sumstat = self._stat_calc(*sim) else: sim_sumstat = self._stat_calc(sim) # Compute distance between summary statistics distance = self.distance(sim_sumstat, self._obs_sumstat, weight=self._stat_weight, scale=self._stat_scale ) # ABC reject/accept step if distance <= self._epsilon: thetas_current = thetas_proposal # Increase accepted counter n_accepted += 1 # Re-compute current logpdf logpdf_current = self._compute_logpdf(thetas_current) if tune_now: pass # self._done_tuning = True
[docs] def sample( self, n_samples, epsilon=None, prop_scale=0.5, burn=100, tune=True, tune_iter=500, tune_interval=100, stat_weight=1., stat_scale=1., use_pilot=False, chains=2, seed=None, return_journal=False ): """ tune: bool Flag for tuning. Defaults to True. tune_interval: int The frequency of tuning. Defaults to 100 iterations. Due to multiprocessing, estimation time (iteration per loop, total time, etc.) could be unstable, but the progress bar works perfectly. A good choice for the number of jobs is the number of cores or processors on your computer. If your processor supports hyperthreading, you can select an even higher number of jobs. The number of jobs is set to the number of cores found in the system by default. There are some papers that suggest Metropolis-Hastings is most efficient when you accept 23.4% of proposed samples, and it turns out that lowering step size increases the probability of accepting a proposal. PyMC3 will spend the first 500 steps increasing and decreasing the step size to try to find the best value of sd that will give you an acceptance rate of 23.4% (you can even set different acceptance rates). burn : either burn away or add to n_samples """ if self._log: self.logger.info("Run MCMC sampler.") if use_pilot: if not self._done_pilot_study: msg = ("In order to use tuning from pilot study, the " "pilot_study method must be run in advance.") raise PilotStudyMissing(msg) else: if epsilon is None: msg = ("epsilon must be passed.") raise ValueError(msg) self._epsilon = epsilon self._quantile = None self._stat_scale = stat_scale self._n_samples = n_samples self._burn = burn self._stat_weight = stat_weight # These are set in base instead #self._prior_logpdfs = [prior.logpdf for prior in self._priors] #self._rng = np.random.default_rng #self._uniform_distr = stats.uniform(loc=0, scale=1) # mcmc knobs self._prop_scale = prop_scale # force equal, n_samples n_samples, chains, tasks, seeds = self.batches(n_samples, chains, seed, force_equal=True ) # n_samples + burn if self._log: # for managing output contention initializer = tqdm.set_lock(RLock(),) initargs = (tqdm.get_lock(),) else: initializer = None initargs = None with ProcessPool(chains) as pool: r0, r1, r2, r3 = zip( *pool.map( self._sample, tasks, range(chains), seeds, initializer=initializer, initargs=initargs ) ) #self._original_samples = np.stack(r0) self._original_samples = np.concatenate(r0, axis=0) self._samples = copy.deepcopy(self._original_samples) self._distances = np.concatenate(r1, axis=0) self._sum_stats = np.concatenate(r2, axis=0) self._n_accepted = np.sum(r3) self._done_sampling = True if return_journal: return self.journal()
def _sample(self, n_samples, position, seed): """Sample n_samples from posterior.""" n_iter = n_samples + self._burn - 1 if self._log: t_range = tqdm(range(n_iter), desc=f"[Sampling progress] Chain {position+1}", position=position, leave=False, colour='green') else: t_range = range(n_iter) n_accepted = 0 samples = [] distances = [] sum_stats = [] # Initialize chain thetas_current, distance, sim_sumstat = self._draw_initial_sample(seed) samples.append(thetas_current) distances.append(distance) sum_stats.append(sim_sumstat) # Compute current logpdf # (only needs to be re-computed if proposal is accepted) logpdf_current = self._compute_logpdf(thetas_current) # Metropolis algorithm for i in t_range: # Advance PRNG state next_gen = advance_PRNG_state(seed, i) # Draw proposal thetas_proposal = self._draw_proposal(thetas_current, next_gen) # Compute proposal logpdf logpdf_proposal = self._compute_logpdf(thetas_proposal) # Compute acceptance probability alpha = self._acceptance_prob(logpdf_proposal, logpdf_current) # Draw a uniform random number u = self._draw_uniform(next_gen) # Metropolis reject/accept step if u < alpha: # Simulator call to generate simulated data sim = self._simulator(*thetas_proposal) # Calculate summary statistics of simulated data if isinstance(sim, tuple): sim_sumstat = self._stat_calc(*sim) else: sim_sumstat = self._stat_calc(sim) # Compute distance between summary statistics distance = self.distance(sim_sumstat, self._obs_sumstat, weight=self._stat_weight, scale=self._stat_scale ) # ABC reject/accept step if distance <= self._epsilon: thetas_current = thetas_proposal # Increase accepted counter n_accepted += 1 # Re-compute current logpdf logpdf_current = self._compute_logpdf(thetas_current) # Update chain samples.append(thetas_current) distances.append(distance) sum_stats.append(sim_sumstat) if self._log: t_range.clear() # Remove burn-in samples samples = samples[self._burn:] distances = distances[self._burn:] sum_stats = sum_stats[self._burn:] return [samples, distances, sum_stats, n_accepted] def _acceptance_prob(self, logpdf_proposal, logpdf_current): """Compute Metropolis acceptance probability Since the proposal density is symmetric, the ratio of proposal densities in the Metropolis-Hastings algorithm cancels, and we are left with what is known as the Metropolis algorithm where we only need to evaluate the ratio of the prior densities. """ # Compute Metropolis ratio r = np.exp(logpdf_proposal - logpdf_current) # Compute acceptance probability alpha = np.minimum(1., r) return alpha def _draw_initial_sample(self, seed): """Draw first posterior sample from prior via Rejection ABC algorithm.""" sample = None n_sims = 0 while sample is None: # Advance PRNG state next_gen = advance_PRNG_state(seed, n_sims) # Draw proposal parameters from priors thetas = [prior.rvs(seed=next_gen) for prior in self._priors] # Simulator call to generate simulated data sim = self._simulator(*thetas) # Calculate summary statistics of simulated data if isinstance(sim, tuple): sim_sumstat = self._stat_calc(*sim) else: sim_sumstat = self._stat_calc(sim) # Increase simulations counter n_sims += 1 # Compute distance between summary statistics distance = self.distance(sim_sumstat, self._obs_sumstat, weight=self._stat_weight, scale=self._stat_scale ) # ABC reject/accept step if distance <= self._epsilon: sample = thetas return sample, distance, sim_sumstat def _draw_proposal(self, thetas_current, next_gen): """Suggest new positions""" # Gaussian proposal distribution (which is symmetric) proposal_distr = stats.norm( loc=thetas_current, scale=self._prop_scale, ) # Draw proposal parameters thetas_proposal = proposal_distr.rvs( random_state=self._rng(seed=next_gen) ).tolist() if not isinstance(thetas_proposal, list): thetas_proposal = [thetas_proposal] return thetas_proposal def _compute_logpdf(self, thetas): """ Compute the joint prior log density for thetas. In case of multiple parameters, the joint prior logpdf is computed Note that where the proposal log density needs to be computed for each new proposal, the current log density only needs to be (re-)computed if a proposal is accepted. """ ''' logpdf = np.array( [prior_logpdf(theta)] for prior_logpdf, theta in zip(self._prior_logpdfs, thetas) ).prod() ''' logpdf = np.array([prior_logpdf(theta) for prior_logpdf, theta in zip(self._prior_logpdfs, thetas)] ).prod() return logpdf def _draw_uniform(self, next_gen): """Draw a uniform random number""" return self._uniform_distr.rvs(random_state=self._rng(seed=next_gen)) def _tune_scale_table(self): """Proposal scale lookup table. Function retrieved from PyMC3 source code. Tunes the scaling parameter for the proposal distribution according to the acceptance rate over the last tune_interval: Rate Variance adaptation ---- ------------------- <0.001 x 0.1 <0.05 x 0.5 <0.2 x 0.9 >0.5 x 1.1 >0.75 x 2 >0.95 x 10 """ if acc_rate < 0.001: # reduce by 90 percent return scale * 0.1 elif acc_rate < 0.05: # reduce by 50 percent return scale * 0.5 elif acc_rate < 0.2: # reduce by ten percent return scale * 0.9 elif acc_rate > 0.95: # increase by factor of ten return scale * 10.0 elif acc_rate > 0.75: # increase by double return scale * 2.0 elif acc_rate > 0.5: # increase by ten percent return scale * 1.1 return scale @property def prop_scale(self): try: return self._prop_scale except AttributeError: msg = ("stat_scale inaccessible. A call to a method where the" "attribute is set must be carried out first.") raise MissingParameter(msg)
[docs] def journal(self): """ Create and return an instance of Journal class. Returns ------- """ if not self._done_sampling: msg = ("In order to access the journal, the " "sample method must be run in advance.") raise SamplingNotPerformed(msg) if self._log: self.logger.info(f"Write results to journal.") accept_ratio = self._n_accepted / self._n_samples print(f"{accept_ratio=}") journal = Journal() journal._write_to_journal( inference_scheme=self._inference_scheme, observation=self._obs_data, simulator=self._simulator, stat_calc=self._stat_calc, priors=self._priors, n_samples=self._n_samples, n_chains=1, n_sims=1, samples=self._samples, accept_ratio=accept_ratio, epsilon=self._epsilon, quantile=self._quantile ) return journal
if __name__ == "__main__": import arviz as az import matplotlib.pyplot as plt import pylfi import scipy.stats as stats import seaborn as sns N = 1000 mu_true = 163 sigma_true = 15 true_parameter_values = [mu_true, sigma_true] # likelihood = stats.norm(loc=mu_true, scale=sigma_true) likelihood = pylfi.Prior('norm', loc=mu_true, scale=sigma_true, name='likelihood' ) obs_data = likelihood.rvs(size=N, seed=30) # simulator model def gaussian_model(mu, sigma, seed=43, n_samples=1000): """Simulator model""" # sim = stats.norm(loc=mu, scale=sigma).rvs(size=n_samples) model = pylfi.Prior('norm', loc=mu, scale=sigma, name='model') sim = model.rvs(size=n_samples, seed=seed) return sim # summary stats def summary_calculator(data): """returns summary statistic(s)""" sumstat = np.array([np.mean(data), np.std(data)]) # sumstat = np.mean(sim) return sumstat s_obs = summary_calculator(obs_data) # print(f"{s_obs=}") # priors mu = pylfi.Prior('norm', loc=165, scale=2, name='mu', tex=r'$\mu$') sigma = pylfi.Prior('norm', loc=17, scale=4, name='sigma', tex=r'$\sigma$') # mu = pylfi.Prior('uniform', loc=160, scale=10, name='mu') # sigma = pylfi.Prior('uniform', loc=10, scale=10, name='sigma') priors = [mu, sigma] # initialize sampler sampler = MCMCABC(obs_data, gaussian_model, summary_calculator, priors, log=True ) sampler.pilot_study(3000, quantile=0.1, stat_scale="mad", n_jobs=4, seed=4 ) journal = sampler.sample(4000, use_pilot=True, chains=4, burn=1000, seed=42, return_journal=True ) df = journal.df print(df["mu"].mean()) print(df["sigma"].mean()) sns.jointplot( data=df, x="mu", y="sigma", kind="kde", fill=True ) journal = sampler.reg_adjust( method='loclinear', transform=True, kernel='epkov', return_journal=True ) df = journal.df print(df["mu"].mean()) print(df["sigma"].mean()) sns.jointplot( data=df, x="mu", y="sigma", kind="kde", fill=True ) plt.show()