import warnings
from dataclasses import dataclass, field
from functools import cached_property
from typing import Any, Dict, Union
import numpy as np
import pandas as pd
from estimagic.differentiation.derivatives import first_derivative, second_derivative
from estimagic.exceptions import InvalidFunctionError, NotAvailableError
from estimagic.inference.ml_covs import (
cov_cluster_robust,
cov_hessian,
cov_jacobian,
cov_robust,
cov_strata_robust,
)
from estimagic.inference.shared import (
FreeParams,
calculate_ci,
calculate_estimation_summary,
calculate_free_estimates,
calculate_p_values,
calculate_summary_data_estimation,
get_derivative_case,
transform_covariance,
transform_free_cov_to_cov,
transform_free_values_to_params_tree,
)
from estimagic.optimization.optimize import maximize
from estimagic.optimization.optimize_result import OptimizeResult
from estimagic.parameters.block_trees import block_tree_to_matrix, matrix_to_block_tree
from estimagic.parameters.conversion import Converter, get_converter
from estimagic.parameters.space_conversion import InternalParams
from estimagic.shared.check_option_dicts import (
check_numdiff_options,
check_optimization_options,
)
from estimagic.utilities import get_rng, to_pickle
[docs]def estimate_ml(
loglike,
params,
optimize_options,
*,
lower_bounds=None,
upper_bounds=None,
constraints=None,
logging=False,
log_options=None,
loglike_kwargs=None,
numdiff_options=None,
jacobian=None,
jacobian_kwargs=None,
hessian=None,
hessian_kwargs=None,
design_info=None,
):
"""Do a maximum likelihood (ml) estimation.
This is a high level interface of our lower level functions for maximization,
numerical differentiation and inference. It does the full workflow for maximum
likelihood estimation with just one function call.
While we have good defaults, you can still configure each aspect of each step
via the optional arguments of this function. If you find it easier to do the
maximization separately, you can do so and just provide the optimal parameters as
``params`` and set ``optimize_options=False``
Args:
loglike (callable): Likelihood function that takes a params (and potentially
other keyword arguments) and returns a dictionary that has at least the
entries "value" (a scalar float) and "contributions" (a 1d numpy array or
pytree) with the log likelihood contribution per individual.
params (pytree): A pytree containing the estimated or start parameters of the
likelihood model. If the supplied parameters are estimated parameters, set
optimize_options to False. Pytrees can be a numpy array, a pandas Series, a
DataFrame with "value" column, a float and any kind of (nested) dictionary
or list containing these elements. See :ref:`params` for examples.
optimize_options (dict, str or False): Keyword arguments that govern the
numerical optimization. Valid entries are all arguments of
:func:`~estimagic.optimization.optimize.minimize` except for those that are
passed explicilty to ``estimate_ml``. If you pass False as optimize_options
you signal that ``params`` are already the optimal parameters and no
numerical optimization is needed. If you pass a str as optimize_options it
is used as the ``algorithm`` option.
lower_bounds (pytree): A pytree with the same structure as params with lower
bounds for the parameters. Can be ``-np.inf`` for parameters with no lower
bound.
upper_bounds (pytree): As lower_bounds. Can be ``np.inf`` for parameters with
no upper bound.
constraints (list, dict): List with constraint dictionaries or single dict.
See :ref:`constraints`.
logging (pathlib.Path, str or False): Path to sqlite3 file (which typically has
the file extension ``.db``. If the file does not exist, it will be created.
The dashboard can only be used when logging is used.
log_options (dict): Additional keyword arguments to configure the logging.
- "fast_logging": A boolean that determines if "unsafe" settings are used
to speed up write processes to the database. This should only be used for
very short running criterion functions where the main purpose of the log
is a real-time dashboard and it would not be catastrophic to get a
corrupted database in case of a sudden system shutdown. If one evaluation
of the criterion function (and gradient if applicable) takes more than
100 ms, the logging overhead is negligible.
- "if_table_exists": (str) One of "extend", "replace", "raise". What to
do if the tables we want to write to already exist. Default "extend".
- "if_database_exists": (str): One of "extend", "replace", "raise". What to
do if the database we want to write to already exists. Default "extend".
loglike_kwargs (dict): Additional keyword arguments for loglike.
numdiff_options (dict): Keyword arguments for the calculation of numerical
derivatives for the calculation of standard errors. See
:ref:`first_derivative` for details.
jacobian (callable or None): A function that takes ``params`` and potentially
other keyword arguments and returns the jacobian of loglike["contributions"]
with respect to the params. Note that you only need to pass a Jacobian
function if you have a closed form Jacobian. If you pass None, a numerical
Jacobian will be calculated.
jacobian_kwargs (dict): Additional keyword arguments for the Jacobian function.
hessian (callable or None or False): A function that takes ``params`` and
potentially other keyword arguments and returns the Hessian of
loglike["value"] with respect to the params. If you pass None, a numerical
Hessian will be calculated. If you pass ``False``, you signal that no
Hessian should be calculated. Thus, no result that requires the Hessian will
be calculated.
hessian_kwargs (dict): Additional keyword arguments for the Hessian function.
design_info (pandas.DataFrame): DataFrame with one row per observation that
contains some or all of the variables "psu" (primary sampling unit),
"strata" and "fpc" (finite population corrector). See
:ref:`robust_likelihood_inference` for details.
Returns:
LikelihoodResult: A LikelihoodResult object.
"""
# ==================================================================================
# Check and process inputs
# ==================================================================================
is_optimized = optimize_options is False
if not is_optimized:
if isinstance(optimize_options, str):
optimize_options = {"algorithm": optimize_options}
check_optimization_options(
optimize_options,
usage="estimate_ml",
algorithm_mandatory=True,
)
jac_case = get_derivative_case(jacobian)
hess_case = get_derivative_case(hessian)
check_numdiff_options(numdiff_options, "estimate_ml")
numdiff_options = {} if numdiff_options in (None, False) else numdiff_options
loglike_kwargs = {} if loglike_kwargs is None else loglike_kwargs
constraints = [] if constraints is None else constraints
jacobian_kwargs = {} if jacobian_kwargs is None else jacobian_kwargs
hessian_kwargs = {} if hessian_kwargs is None else hessian_kwargs
# ==================================================================================
# Calculate estimates via maximization (if necessary)
# ==================================================================================
if is_optimized:
estimates = params
opt_res = None
else:
opt_res = maximize(
criterion=loglike,
criterion_kwargs=loglike_kwargs,
params=params,
lower_bounds=lower_bounds,
upper_bounds=upper_bounds,
constraints=constraints,
logging=logging,
log_options=log_options,
**optimize_options,
)
estimates = opt_res.params
# ==================================================================================
# Do first function evaluations at estimated parameters
# ==================================================================================
try:
loglike_eval = loglike(estimates, **loglike_kwargs)
except (KeyboardInterrupt, SystemExit):
raise
except Exception as e:
msg = "Error while evaluating loglike at estimated params."
raise InvalidFunctionError(msg) from e
if callable(jacobian):
try:
jacobian_eval = jacobian(estimates, **jacobian_kwargs)
except (KeyboardInterrupt, SystemExit):
raise
except Exception as e:
msg = "Error while evaluating closed form jacobian at estimated params."
raise InvalidFunctionError(msg) from e
else:
jacobian_eval = None
if callable(hessian):
try:
hessian_eval = hessian(estimates, **hessian_kwargs)
except (KeyboardInterrupt, SystemExit):
raise
except Exception as e:
msg = "Error while evaluating closed form hessian at estimated params."
raise InvalidFunctionError(msg) from e
else:
hessian_eval = None
# ==================================================================================
# Get the converter for params and function outputs
# ==================================================================================
converter, internal_estimates = get_converter(
params=estimates,
constraints=constraints,
lower_bounds=lower_bounds,
upper_bounds=upper_bounds,
func_eval=loglike_eval,
primary_key="contributions",
scaling=False,
scaling_options=None,
derivative_eval=jacobian_eval,
)
# ==================================================================================
# Calculate internal jacobian
# ==================================================================================
if jac_case == "closed-form":
int_jac = converter.derivative_to_internal(
jacobian_eval, internal_estimates.values
)
elif jac_case == "numerical":
def func(x):
p = converter.params_from_internal(x)
loglike_eval = loglike(p, **loglike_kwargs)["contributions"]
out = converter.func_to_internal(loglike_eval)
return out
jac_res = first_derivative(
func=func,
params=internal_estimates.values,
lower_bounds=internal_estimates.lower_bounds,
upper_bounds=internal_estimates.upper_bounds,
**numdiff_options,
)
int_jac = jac_res["derivative"]
else:
int_jac = None
if constraints in [None, []] and jacobian_eval is None and int_jac is not None:
loglike_contribs = loglike_eval
if isinstance(loglike_contribs, dict) and "contributions" in loglike_contribs:
loglike_contribs = loglike_contribs["contributions"]
jacobian_eval = matrix_to_block_tree(
int_jac,
outer_tree=loglike_contribs,
inner_tree=estimates,
)
if jacobian_eval is None:
_no_jac_reason = (
"no closed form jacobian was provided and there are constraints"
)
else:
_no_jac_reason = None
# ==================================================================================
# Calculate internal Hessian
# ==================================================================================
if hess_case == "skip":
int_hess = None
elif hess_case == "numerical":
def func(x):
p = converter.params_from_internal(x)
loglike_eval = loglike(p, **loglike_kwargs)["value"]
out = converter.func_to_internal(loglike_eval)
return out
hess_res = second_derivative(
func=func,
params=internal_estimates.values,
lower_bounds=internal_estimates.lower_bounds,
upper_bounds=internal_estimates.upper_bounds,
**numdiff_options,
)
int_hess = hess_res["derivative"]
elif hess_case == "closed-form" and constraints:
raise NotImplementedError(
"Closed-form Hessians are not yet compatible with constraints."
)
elif hess_case == "closed-form":
int_hess = block_tree_to_matrix(
hessian_eval,
outer_tree=params,
inner_tree=params,
)
else:
raise ValueError()
if constraints in [None, []] and hessian_eval is None and int_hess is not None:
hessian_eval = matrix_to_block_tree(
int_hess,
outer_tree=params,
inner_tree=params,
)
if hessian_eval is None:
if hess_case == "skip":
_no_hess_reason = "the hessian calculation was explicitly skipped."
else:
_no_hess_reason = (
"no closed form hessian was provided and there are constraints"
)
else:
_no_hess_reason = None
# ==================================================================================
# create a LikelihoodResult object
# ==================================================================================
free_estimates = calculate_free_estimates(estimates, internal_estimates)
res = LikelihoodResult(
_params=estimates,
_converter=converter,
_optimize_result=opt_res,
_jacobian=jacobian_eval,
_no_jacobian_reason=_no_jac_reason,
_hessian=hessian_eval,
_no_hessian_reason=_no_hess_reason,
_internal_jacobian=int_jac,
_internal_hessian=int_hess,
_design_info=design_info,
_internal_estimates=internal_estimates,
_free_estimates=free_estimates,
_has_constraints=constraints not in [None, []],
)
return res
[docs]@dataclass
class LikelihoodResult:
"""Likelihood estimation results object."""
_params: Any
_internal_estimates: InternalParams
_free_estimates: FreeParams
_converter: Converter
_has_constraints: bool
_optimize_result: Union[OptimizeResult, None] = None
_jacobian: Any = None
_no_jacobian_reason: Union[str, None] = None
_hessian: Any = None
_no_hessian_reason: Union[str, None] = None
_internal_jacobian: Union[np.ndarray, None] = None
_internal_hessian: Union[np.ndarray, None] = None
_design_info: Union[pd.DataFrame, None] = None
_cache: Dict = field(default_factory=dict)
def __post_init__(self):
if self._internal_jacobian is None and self._internal_hessian is None:
raise ValueError(
"At least one of _internal_jacobian or _internal_hessian must be "
"not None."
)
elif self._internal_jacobian is None:
valid_methods = ["hessian"]
elif self._internal_hessian is None:
valid_methods = ["jacobian"]
else:
valid_methods = ["jacobian", "hessian", "robust"]
if self._design_info is not None:
if "psu" in self._design_info:
valid_methods.append("cluster_robust")
if {"strata", "psu", "fpc"}.issubset(self._design_info):
valid_methods.append("strata_robust")
self._valid_methods = set(valid_methods)
def _get_free_cov(
self,
method,
n_samples,
bounds_handling,
seed,
):
if method not in self._valid_methods:
msg = f"Invalid method: {method}. Valid methods are {self._valid_methods}."
raise ValueError(msg)
args = (method, n_samples, bounds_handling, seed)
is_cached = args in self._cache
if is_cached:
free_cov = self._cache[args]
else:
free_cov = _calculate_free_cov_ml(
method=method,
internal_estimates=self._internal_estimates,
converter=self._converter,
internal_jacobian=self._internal_jacobian,
internal_hessian=self._internal_hessian,
n_samples=n_samples,
design_info=self._design_info,
bounds_handling=bounds_handling,
seed=seed,
)
if seed is not None:
self._cache[args] = free_cov
elif self._converter.has_transforming_constraints:
msg = (
"seed is set to None and constraints are transforming. This leads "
"to randomness in the result. To avoid random behavior, choose a "
"non-None seed."
)
warnings.warn(msg)
return free_cov
@property
def params(self):
return self._params
@property
def optimize_result(self):
return self._optimize_result
@property
def jacobian(self):
if self._jacobian is None:
raise NotAvailableError(
f"No jacobian is available because {self._no_jacobian_reason}."
)
return self._jacobian
@property
def hessian(self):
if self._hessian is None:
raise NotAvailableError(
f"No hessian is available because {self._no_hessian_reason}."
)
return self._hessian
@cached_property
def _se(self):
return self.se()
@cached_property
def _cov(self):
return self.cov()
@cached_property
def _summary(self):
return self.summary()
@cached_property
def _ci(self):
return self.ci()
@cached_property
def _p_values(self):
return self.p_values()
[docs] def se(
self,
method="jacobian",
n_samples=10_000,
bounds_handling="clip",
seed=None,
):
"""Calculate standard errors.
Args:
method (str): One of "jacobian", "hessian", "robust", "cluster_robust",
"strata_robust". Default "jacobian". "cluster_robust" is only available
if design_info containts a columns called "psu" that identifies the
primary sampling unit. "strata_robust" is only available if the columns
"strata", "fpc" and "psu" are in design_info.
n_samples (int): Number of samples used to transform the covariance matrix
of the internal parameter vector into the covariance matrix of the
external parameters. For background information about internal and
external params see :ref:`implementation_of_constraints`. This is only
used if you are using constraints.
bounds_handling (str): One of "clip", "raise", "ignore". Determines how
bounds are handled. If "clip", confidence intervals are clipped at the
bounds. Standard errors are only adjusted if a sampling step is
necessary due to additional constraints. If "raise" and any lower or
upper bound is binding, we raise an Error. If "ignore", boundary
problems are simply ignored.
seed (int): Seed for the random number generator. Only used if there are
transforming constraints.
Returns:
Any: A pytree with the same structure as params containing standard errors
for the parameter estimates.
"""
free_cov = self._get_free_cov(
method=method,
n_samples=n_samples,
bounds_handling=bounds_handling,
seed=seed,
)
free_se = np.sqrt(np.diagonal(free_cov))
se = transform_free_values_to_params_tree(
values=free_se,
free_params=self._free_estimates,
params=self._params,
)
return se
[docs] def cov(
self,
method="jacobian",
n_samples=10_000,
bounds_handling="clip",
return_type="pytree",
seed=None,
):
"""Calculate the variance-covariance (matrix) of the estimated parameters.
Args:
method (str): One of "jacobian", "hessian", "robust", "cluster_robust",
"strata_robust". Default "jacobian". "cluster_robust" is only available
if design_info containts a columns called "psu" that identifies the
primary sampling unit. "strata_robust" is only available if the columns
"strata", "fpc" and "psu" are in design_info.
n_samples (int): Number of samples used to transform the covariance matrix
of the internal parameter vector into the covariance matrix of the
external parameters. For background information about internal and
external params see :ref:`implementation_of_constraints`. This is only
used if you are using constraints.
bounds_handling (str): One of "clip", "raise", "ignore". Determines how
bounds are handled. If "clip", confidence intervals are clipped at the
bounds. Standard errors are only adjusted if a sampling step is
necessary due to additional constraints. If "raise" and any lower or
upper bound is binding, we raise an Error. If "ignore", boundary
problems are simply ignored.
return_type (str): One of "pytree", "array" or "dataframe". Default pytree.
If "array", a 2d numpy array with the covariance is returned. If
"dataframe", a pandas DataFrame with parameter names in the
index and columns are returned.
seed (int): Seed for the random number generator. Only used if there are
transforming constraints.
Returns:
Any: The covariance matrix of the estimated parameters as block-pytree,
numpy.ndarray or pandas.DataFrame.
"""
free_cov = self._get_free_cov(
method=method,
n_samples=n_samples,
bounds_handling=bounds_handling,
seed=seed,
)
cov = transform_free_cov_to_cov(
free_cov=free_cov,
free_params=self._free_estimates,
params=self._params,
return_type=return_type,
)
return cov
[docs] def summary(
self,
method="jacobian",
n_samples=10_000,
ci_level=0.95,
bounds_handling="clip",
seed=None,
):
"""Create a summary of estimation results.
Args:
method (str): One of "jacobian", "hessian", "robust", "cluster_robust",
"strata_robust". Default "jacobian". "cluster_robust" is only available
if design_info containts a columns called "psu" that identifies the
primary sampling unit. "strata_robust" is only available if the columns
"strata", "fpc" and "psu" are in design_info.
ci_level (float): Confidence level for the calculation of confidence
intervals. The default is 0.95.
n_samples (int): Number of samples used to transform the covariance matrix
of the internal parameter vector into the covariance matrix of the
external parameters. For background information about internal and
external params see :ref:`implementation_of_constraints`. This is only
used if you are using constraints.
bounds_handling (str): One of "clip", "raise", "ignore". Determines how
bounds are handled. If "clip", confidence intervals are clipped at the
bounds. Standard errors are only adjusted if a sampling step is
necessary due to additional constraints. If "raise" and any lower or
upper bound is binding, we raise an Error. If "ignore", boundary
problems are simply ignored.
seed (int): Seed for the random number generator. Only used if there are
transforming constraints.
Returns:
Any: The estimation summary as pytree of DataFrames.
"""
summary_data = calculate_summary_data_estimation(
self,
free_estimates=self._free_estimates,
method=method,
ci_level=ci_level,
n_samples=n_samples,
bounds_handling=bounds_handling,
seed=seed,
)
summary = calculate_estimation_summary(
summary_data=summary_data,
names=self._free_estimates.all_names,
free_names=self._free_estimates.free_names,
)
return summary
[docs] def ci(
self,
method="jacobian",
n_samples=10_000,
ci_level=0.95,
bounds_handling="clip",
seed=None,
):
"""Calculate confidence intervals.
Args:
method (str): One of "jacobian", "hessian", "robust", "cluster_robust",
"strata_robust". Default "jacobian". "cluster_robust" is only available
if design_info containts a columns called "psu" that identifies the
primary sampling unit. "strata_robust" is only available if the columns
"strata", "fpc" and "psu" are in design_info.
ci_level (float): Confidence level for the calculation of confidence
intervals. The default is 0.95.
n_samples (int): Number of samples used to transform the covariance matrix
of the internal parameter vector into the covariance matrix of the
external parameters. For background information about internal and
external params see :ref:`implementation_of_constraints`. This is only
used if you are using constraints.
bounds_handling (str): One of "clip", "raise", "ignore". Determines how
bounds are handled. If "clip", confidence intervals are clipped at the
bounds. Standard errors are only adjusted if a sampling step is
necessary due to additional constraints. If "raise" and any lower or
upper bound is binding, we raise an Error. If "ignore", boundary
problems are simply ignored.
seed (int): Seed for the random number generator. Only used if there are
transforming constraints.
Returns:
Any: Pytree with the same structure as params containing lower bounds of
confidence intervals.
Any: Pytree with the same structure as params containing upper bounds of
confidence intervals.
"""
free_cov = self._get_free_cov(
method=method,
n_samples=n_samples,
bounds_handling=bounds_handling,
seed=seed,
)
free_lower, free_upper = calculate_ci(
free_values=self._free_estimates.values,
free_standard_errors=np.sqrt(np.diagonal(free_cov)),
ci_level=ci_level,
)
lower, upper = (
transform_free_values_to_params_tree(
values, free_params=self._free_estimates, params=self._params
)
for values in (free_lower, free_upper)
)
return lower, upper
[docs] def p_values(
self,
method="jacobian",
n_samples=10_000,
bounds_handling="clip",
seed=None,
):
"""Calculate p-values.
Args:
method (str): One of "jacobian", "hessian", "robust", "cluster_robust",
"strata_robust". Default "jacobian". "cluster_robust" is only available
if design_info containts a columns called "psu" that identifies the
primary sampling unit. "strata_robust" is only available if the columns
"strata", "fpc" and "psu" are in design_info.
ci_level (float): Confidence level for the calculation of confidence
intervals. The default is 0.95.
n_samples (int): Number of samples used to transform the covariance matrix
of the internal parameter vector into the covariance matrix of the
external parameters. For background information about internal and
external params see :ref:`implementation_of_constraints`. This is only
used if you are using constraints.
bounds_handling (str): One of "clip", "raise", "ignore". Determines how
bounds are handled. If "clip", confidence intervals are clipped at the
bounds. Standard errors are only adjusted if a sampling step is
necessary due to additional constraints. If "raise" and any lower or
upper bound is binding, we raise an Error. If "ignore", boundary
problems are simply ignored.
seed (int): Seed for the random number generator. Only used if there are
transforming constraints.
Returns:
Any: Pytree with the same structure as params containing p-values.
Any: Pytree with the same structure as params containing p-values.
"""
free_cov = self._get_free_cov(
method=method,
n_samples=n_samples,
bounds_handling=bounds_handling,
seed=seed,
)
free_p_values = calculate_p_values(
free_values=self._free_estimates.values,
free_standard_errors=np.sqrt(np.diagonal(free_cov)),
)
p_values = transform_free_values_to_params_tree(
free_p_values, free_params=self._free_estimates, params=self._params
)
return p_values
[docs] def to_pickle(self, path):
"""Save the LikelihoodResult object to pickle.
Args:
path (str, pathlib.Path): A str or pathlib.path ending in .pkl or .pickle.
"""
to_pickle(self, path=path)
def _calculate_free_cov_ml(
method,
internal_estimates,
converter,
internal_jacobian,
internal_hessian,
n_samples,
design_info,
bounds_handling,
seed,
):
if method == "jacobian":
int_cov = cov_jacobian(internal_jacobian)
elif method == "hessian":
int_cov = cov_hessian(internal_hessian)
elif method == "robust":
int_cov = cov_robust(jac=internal_jacobian, hess=internal_hessian)
elif method == "cluster_robust":
int_cov = cov_cluster_robust(
jac=internal_jacobian, hess=internal_hessian, design_info=design_info
)
elif method == "strata_robust":
int_cov = cov_strata_robust(
jac=internal_jacobian, hess=internal_hessian, design_info=design_info
)
rng = get_rng(seed)
free_cov = transform_covariance(
internal_params=internal_estimates,
internal_cov=int_cov,
converter=converter,
rng=rng,
n_samples=n_samples,
bounds_handling=bounds_handling,
)
return free_cov