Source code for pylfi.inferences.abc_base

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import copy
from abc import abstractmethod
from multiprocessing import Lock, RLock

import numpy as np
import scipy.stats as stats
from pathos.pools import ProcessPool
from pylfi.utils import (advance_PRNG_state, check_and_set_jobs,
                         distribute_workload, generate_seed_sequence,
                         setup_logger)
from tqdm.auto import tqdm

VALID_STAT_SCALES = ["sd", "mad"]
VALID_KERNELS = ["gaussian", "epkov"]


class PilotStudyMissing(Exception):
    r"""Failed attempt at accessing pilot study.

    A call to the pilot_study method must be carried out first.
    """
    pass


class SamplingNotPerformed(Exception):
    r"""Failed attempt at accessing posterior samples.

    A call to the sample method must be carried out first.
    """
    pass


class MissingParameter(Exception):
    r"""Failed attempt at accessing hyperparameter value.

    A call to a method where the attribute is set must
    be carried out first.
    """
    pass


[docs]class ABCBase: r"""Base class for Approximate Bayesian Computation algorithms. Parameters ---------- observation : :term:`array_like` The observed data :math:`y_\mathrm{obs}`. simulator : `callable` The simulator model parametrized by unknown model parameters :math:`\theta`. Any regular ``Python`` `callable`, i.e., function or class with ``__call__`` method can be used. stat_calc : `callable` Summary statistics calculator. Must take the return from ``simulator`` as arguments. Any regular ``Python`` `callable`, i.e., function or class with ``__call__`` method can be used. priors : `list` of `.Prior` List containing the priors for the unknown model parameters. The order must be the same as the order of positional arguments in ``simulator``. inference_scheme : `str` Name of inference scheme. To be passed by sub-class in call to base constructor. log : `bool` Whether logger should be displayed or not. Default: ``True``. """ def __init__( self, observation, simulator, stat_calc, priors, inference_scheme, log=True ): self._obs_data = observation self._stat_calc = stat_calc self._simulator = simulator self._priors = priors self._log = log self._prior_logpdfs = [prior.logpdf for prior in self._priors] self._rng = np.random.default_rng self._uniform_distr = stats.uniform(loc=0, scale=1) if isinstance(self._obs_data, tuple): self._obs_sumstat = np.array(self._stat_calc(*self._obs_data)) else: self._obs_sumstat = np.array(self._stat_calc(self._obs_data)) self._inference_scheme = inference_scheme if self._log: self.logger = setup_logger(self.__class__.__name__) self.logger.info(f"Initialize {self._inference_scheme} sampler.") self._done_pilot_study = False self._done_tuning = False self._done_sampling = False
[docs] @abstractmethod def sample(self): r"""To be overwritten by sub-class: should implement sampling from inference scheme and optionally return `.Journal` via the boolean ``return_journal`` keyword argument. The sample method needs specific attribute names. See `.RejABC` class for an example. """ raise NotImplementedError
[docs] @abstractmethod def journal(self): r"""To be overwritten by sub-class: method to write and return `.Journal` instance. See `.RejABC` class for an example. Returns ------- `.Journal` Journal with results and information created by the run of inference schemes. """ raise NotImplementedError
[docs] def distance(self, s_sim, s_obs, weight=1., scale=1.): r"""Weighted Euclidean distance. Computes the weighted Euclidean distance between two 1-D arrays of summary statistics. The ``weight`` parameter can be set to weight the importance of a summary statistic, whereas the ``scale`` parameter can be used to scale particular summary statistics in order to avoid dominance. Parameters ---------- s_sim : {`int`, `float`}, :term:`array_like` Simulated summary statistic(s). s_obs : {`int`, `float`}, :term:`array_like` Observed summary statistic(s). weight : {`int`, `float`}, `numpy.ndarray`, optional Importance weight(s) of summary statistic(s). Should sum to 1. Default: ``1.0``. scale : {`int`, `float`}, `numpy.ndarray`, optional Scale weight(s) of summary statistic(s). Default: ``1.0``. Returns ------- distance : `float` The weighted Euclidean distance between simulated and observed summary statistics. """ if isinstance(s_sim, (int, float)): s_sim = [s_sim] if isinstance(s_obs, (int, float)): s_obs = [s_obs] s_sim = np.asarray(s_sim, dtype=float) s_obs = np.asarray(s_obs, dtype=float) q = np.sqrt(weight) * (s_sim - s_obs) / scale dist = np.linalg.norm(q, ord=2) return dist
[docs] def batches(self, n_samples, n_jobs, seed, force_equal=False): r"""Divide and conquer. Divide the number of samples, ``n_samples``, between ``n_jobs`` workers. If the ABC algorithm uses Markov chains, then ``n_jobs`` is the number chains. If ``n_jobs`` exceeds the number of available CPUs found by `Pathos <https://pathos.readthedocs.io/en/latest/pathos.html>`_ (this might include hardware threads), ``n_jobs`` is set to the number found by `Pathos <https://pathos.readthedocs.io/en/latest/pathos.html>`_. The method also creates a seed for each worker based on the input seed. This ensures reproducibility if wanted. If ``seed=None``, the results will be stochastic between each run of the sampler. When using Markov chains, the ``force_equal`` keyword must be set to ``True``, in order to enforce equal length of chains. The ``n_samples`` that results in equal chain lengths will be found internally and returned. The corrected ``n_samples`` should be used downstream if needed. Parameters ---------- n_samples : `int` Number of (posterior) samples to draw. n_jobs : `int` Number of processes (workers). If ``n_jobs=-1``, then ``n_jobs`` is set to half of the CPUs found by `Pathos <https://pathos.readthedocs.io/en/latest/pathos.html>`_ (we assume half of the CPUs are hardware threads only and ignore those). seed : `int` User-provided seed. force_equal : `bool`, optional If ``True``, ``n_samples`` will be adjusted to enforce equal number of tasks for each worker. Default: ``False``. Returns ------- n_samples : `int` Possibly corrected number of samples (posterior draws). n_jobs : `int` Possibly corrected number of processes (workers). tasks : `int` The number of tasks for each parallel pool worker. seeds : `list` Initial states for parallel pool workers. """ if self._log: n_jobs = check_and_set_jobs(n_jobs, self.logger) else: n_jobs = check_and_set_jobs(n_jobs) seeds = generate_seed_sequence(seed, n_jobs) if self._log: n_samples, tasks = distribute_workload(n_samples, n_jobs, force_equal=force_equal, logger=self.logger ) else: n_samples, tasks = distribute_workload(n_samples, n_jobs, force_equal=force_equal ) return n_samples, n_jobs, tasks, seeds
[docs] def pilot_study( self, n_sim=500, quantile=None, stat_scale=None, stat_weight=1., n_jobs=-1, seed=None, ): r"""Perform pilot study. The pilot study runs the simulator ``n_sim`` times and sets the threshold parameter ``epsilon`` automatically as the q-quantile of simulated distances from the prior predictive distribution. For instance, the 0.5-quantile (the median) will give a threshold that accepts 50% of the simulations. The pilot study can also be used to provide an estimate of the ``stat_scale`` parameter, used in the weighted Euclidean distance, from the prior predictive distribution by passing the ``stat_scale`` keyword as ``sd`` or ``mad``. The ``stat_scale`` parameter is used to avoid dominance of particular summary statistics. ``stat_scale=sd`` scales the summary statistics according to their standard deviation (SD) estimated from the prior predictive samples, and ``stat_scale=mad`` according to their median absolute deviation (MAD). It is important to note that if more than 50% of the prior predictive samples for a particular summary statistic have identical values, MAD will equal zero. In this case, the logger will raise a warning and the scale for the particular summary statistic will be set to SD instead. If there is no variability at all, the scale will be set to 1. It is recommended to check that there are not too many identical samples before setting the scale, to avoid surprises. Parameters ---------- n_sims : `int`, optional Number of simulator runs. quantile : `int` Quantile of the Euclidean distances. stat_scale : `str`, optional Summary statistics scale to estimate; can be set as either ``sd`` (standard deviation) or ``mad`` (median absolute deviation). If ``None``, scale is set to ``1.0``. Default: ``None``. stat_weight : {`int`, `float`}, `numpy.ndarray`, optional Importance weights of summary statistics. Default: ``1.0``. n_jobs : `int`, optional Number of processes (workers). If ``n_jobs=-1``, then ``n_jobs`` is set to half of the CPUs found by `Pathos <https://pathos.readthedocs.io/en/latest/pathos.html>`_ (we assume half of the CPUs are hardware threads only and ignore those). Default: ``-1``. seed : `int` User-provided seed. Will be used to generate seed for each worker. Default: ``None``. """ if quantile is None: msg = ("quantile must be passed. The pilot study sets the " "accept/reject threshold as the provided q-quantile of the " "distances.") raise ValueError(msg) if not 0 < quantile <= 1.0: msg = ("quantile must be a value in (0, 1].") raise ValueError(msg) if isinstance(stat_scale, str): if stat_scale not in VALID_STAT_SCALES: msg = ("scale can be set as either sd (standard deviation) or " "mad (median absolute deviation). If None, it defaults " "to 1.") raise ValueError(msg) if self._log: msg = f"Run pilot study to estimate:\n" msg += f"* epsilon as the {quantile}-quantile of the distances" if stat_scale is not None: msg += f"\n* summary statistics scale ({stat_scale.upper()}) " msg += f"from the prior predictive distribution" self.logger.info(msg) self._quantile = quantile _, n_jobs, tasks, seeds = self.batches(n_sim, n_jobs, seed) if self._log: # for managing output contention initializer = tqdm.set_lock(RLock()) initargs = (Lock(),) else: initializer = None initargs = None if n_jobs == 1: sum_stats = self._pilot_study(tasks[0], 0, seeds[0]) else: with ProcessPool(n_jobs) as pool: results = pool.map(self._pilot_study, tasks, range(n_jobs), seeds, initializer=initializer, initargs=initargs ) sum_stats = np.concatenate(results, axis=0) if stat_scale is None: self._stat_scale = 1. elif stat_scale == "sd": self._stat_scale = self._sd(sum_stats) if 0 in self._stat_scale: idx = np.where(self._stat_scale == 0) if self._log: msg = (f"Encounterd SD = 0 for summary statistic at index:" f" {idx[0]}. Setting this to 1.") self.logger.warn(msg) self._stat_scale[idx] = 1. elif stat_scale == "mad": self._stat_scale = self._mad(sum_stats) if 0 in self._stat_scale: # Check if MAD=0 for some sum stats idx = np.where(self._stat_scale == 0) if self._log: msg = (f"Encounterd MAD = 0 for summary statistic at index:" f" {idx[0]}. Setting this to SD " "(or 1 if also SD = 0).") self.logger.warn(msg) backup_stat_scale = sum_stats.std(axis=0) self._stat_scale[idx] = backup_stat_scale[idx] if 0 in self._stat_scale: # Ensure that replaced sum stat scales is not SD=0 idx = np.where(self._stat_scale == 0) self._stat_scale[idx] = 1. distances = [] for sum_stat in sum_stats: distance = self.distance(sum_stat, self._obs_sumstat, weight=stat_weight, scale=self._stat_scale ) distances.append(distance) distances = np.array(distances, dtype=np.float64) distances[distances == np.inf] = np.NaN self._epsilon = np.nanquantile(distances, self._quantile) self._done_pilot_study = True if self._log: self.logger.info(f"epsilon = {self._epsilon}") self.logger.info(f"stat_scale = {self._stat_scale}")
def _sd(self, a, axis=0): """Standard deviation from the mean""" a = np.asarray(a, dtype=float) #a = a.astype(np.float64) a[a == np.inf] = np.NaN sd = np.sqrt(np.nanmean( np.abs(a - np.nanmean(a, axis=axis))**2, axis=axis)) return sd def _mad(self, a, axis=0): """Median absolute deviation (MAD)""" a = a.astype(np.float64) mad = np.median(np.absolute( a - np.median(a, axis=axis)), axis=axis) return mad def _pilot_study(self, n_sims, position, seed): """Pilot study simulator calls and calculations.""" if self._log: t_range = tqdm(range(n_sims), desc=f"[Simulation progress] CPU {position+1}", position=position, leave=False, colour='green') else: t_range = range(n_sims) sum_stats = [] for i in t_range: next_gen = advance_PRNG_state(seed, i) thetas = [prior.rvs(seed=next_gen) for prior in self._priors] sim = self._simulator(*thetas) if isinstance(sim, tuple): sim_sumstat = self._stat_calc(*sim) else: sim_sumstat = self._stat_calc(sim) sum_stats.append(sim_sumstat) if self._log: t_range.clear() return sum_stats
[docs] def reg_adjust( self, method="loclinear", transform=True, kernel='epkov', return_journal=False ): r"""Post-sampling regression adjustment. Regresses summary statistics on the obtained posterior samples, and corrects the posterior samples for the trend in the relationship. Implementation based on Beaumont et al. (2002) and Blum (2017). References ---------- M. Beaumont, W. Zhang and D.Balding. "Approximate Bayesian Computation in Population Genetics" GENETICS December 1, 2002 vol. 162 no. 4 2025-2035 M. Blum. "Regression approaches for approximate bayesian computation". arXiv preprint arXiv:1707.01254, 2017 - arxiv.org Parameters ---------- method : `str`, optional The regression method to use: * ``linear``: ordinary least squares regression; * ``loclinar``: local linear regression (default). transform : `bool`, optional If ``True``, parameters are regressed on a logarithm scale. A log transformation will both stabilize the variance of the regression model and make it more homoscedastic. Note that the parameters must be positive in order to do a log transform. Default: ``True``. kernel : `str`, optional The smoothing kernel function to use in local regression: * ``gaussian``: Gaussian smoothing kernel; * ``epkov``: Epanechnikov smoothing kernel. return_journal : :obj:`bool`, optional If ``True``, `.Journal` is returned. Default: ``False``. """ if not self._done_sampling: msg = ("In order to perform regression adjustment, the " "sample method must be run in advance.") raise SamplingNotPerformed(msg) if kernel not in VALID_KERNELS: msg = ("kernel must be passed as either 'gaussian' or 'epkov' " "(Epanechnikov).") raise ValueError(msg) self._kernel = kernel self._transform = transform if self._log: self.logger.info(f"Perform {method} regression adjustment.") self._factor = self._stat_weight / self._stat_scale ssim = copy.deepcopy(self._sum_stats) * self._factor sobs = copy.deepcopy(self._obs_sumstat) * self._factor X = ssim - sobs if X.ndim == 1: X = X.reshape(-1, 1) # augment with column of ones X = np.c_[np.ones(X.shape[0]), X] # target thetas = copy.deepcopy(self._original_samples) if self._transform: thetas = np.log(thetas) if method == "linear": self._lra(X, thetas) elif method == "loclinear": self._loclra(X, thetas) else: raise ValueError("Unrecognized regression method.") if return_journal: return self.journal()
def _gaussian_kernel(self, d, h): """Gaussian smoothing kernel function""" return np.exp(-0.5 * (d * d) / (h * h)) def _epkov_kernel(self, d, h): """Epanechnikov smoothing kernel function""" return (1.0 - (d * d) / (h * h)) * (d < h) def _lra(self, X, y): """Linear regression adjustment""" # Compute coefficients (pinv = pseudo-inverse) coef = np.linalg.pinv(X.T @ X) @ X.T @ y alpha = coef[0] beta = coef[1:] # Adjust posterior samples s_sim = self._sum_stats * self._factor s_obs = self._obs_sumstat * self._factor correction = (s_sim - s_obs) @ beta if self._transform: self._samples = np.exp(np.log(self._original_samples) - correction) else: self._samples = self._original_samples - correction def _loclra(self, X, y): """Local linear regression adjustment""" # Compute weights if self._kernel == "gaussian": weights = np.array([self._gaussian_kernel(d, self._epsilon) for d in self._distances]) elif self._kernel == "epkov": weights = np.array([self._epkov_kernel(d, self._epsilon) for d in self._distances]) else: raise ValueError(f'Unrecognized kernel') # Weight matrix W = np.diag(weights) # Compute coefficients (pinv = pseudo-inverse) coef = np.linalg.pinv(X.T @ (W @ X)) @ X.T @ (W @ y) alpha = coef[0] beta = coef[1:] # Adjust posterior samples s_sim = self._sum_stats * self._factor s_obs = self._obs_sumstat * self._factor correction = (s_sim - s_obs) @ beta if self._transform: self._samples = np.exp(np.log(self._original_samples) - correction) else: self._samples = self._original_samples - correction @property def epsilon(self): r"""Tolerance parameter. Returns ------- `float` """ try: return self._epsilon except AttributeError: msg = ("epsilon inaccessible. A call to a method where the" "attribute is set must be carried out first.") raise MissingParameter(msg) @property def quantile(self): r"""The chosen q-quantile. Returns ------- `float` """ try: return self._quantile except AttributeError: msg = ("quantile inaccessible. A call to a method where the" "attribute is set must be carried out first.") raise MissingParameter(msg) @property def stat_scale(self): r"""The summary statistic scales. Returns ------- `numpy.ndarray` """ try: return self._stat_scale except AttributeError: msg = ("stat_scale inaccessible. A call to a method where the" "attribute is set must be carried out first.") raise MissingParameter(msg)