Source code for estimagic.estimation.estimate_ml

from dataclasses import dataclass
from typing import Any
from typing import Union

import numpy as np
import pandas as pd
from estimagic.differentiation.derivatives import first_derivative
from estimagic.differentiation.derivatives import second_derivative
from estimagic.exceptions import InvalidFunctionError
from estimagic.exceptions import NotAvailableError
from estimagic.inference.ml_covs import cov_cluster_robust
from estimagic.inference.ml_covs import cov_hessian
from estimagic.inference.ml_covs import cov_jacobian
from estimagic.inference.ml_covs import cov_robust
from estimagic.inference.ml_covs import cov_strata_robust
from estimagic.inference.shared import calculate_ci
from estimagic.inference.shared import calculate_inference_quantities
from estimagic.inference.shared import calculate_p_values
from estimagic.inference.shared import check_is_optimized_and_derivative_case
from estimagic.inference.shared import get_derivative_case
from estimagic.inference.shared import transform_covariance
from estimagic.optimization.optimize import maximize
from estimagic.optimization.optimize_result import OptimizeResult
from estimagic.parameters.block_trees import block_tree_to_matrix
from estimagic.parameters.block_trees import matrix_to_block_tree
from estimagic.parameters.conversion import Converter
from estimagic.parameters.conversion import get_converter
from estimagic.shared.check_option_dicts import check_numdiff_options
from estimagic.shared.check_option_dicts import check_optimization_options
from estimagic.utilities import to_pickle


[docs]def estimate_ml( loglike, params, optimize_options, *, lower_bounds=None, upper_bounds=None, constraints=None, logging=False, log_options=None, loglike_kwargs=None, numdiff_options=None, jacobian=None, jacobian_kwargs=None, hessian=None, hessian_kwargs=None, design_info=None, ): """Do a maximum likelihood (ml) estimation. This is a high level interface of our lower level functions for maximization, numerical differentiation and inference. It does the full workflow for maximum likelihood estimation with just one function call. While we have good defaults, you can still configure each aspect of each step via the optional arguments of this function. If you find it easier to do the maximization separately, you can do so and just provide the optimal parameters as ``params`` and set ``optimize_options=False``. Args: loglike (callable): Likelihood function that takes a params (and potentially other keyword arguments) and returns a dictionary that has at least the entries "value" (a scalar float) and "contributions" (a 1d numpy array or pytree) with the log likelihood contribution per individual. params (pytree): A pytree containing the estimated or start parameters of the likelihood model. If the supplied parameters are estimated parameters, set optimize_options to False. Pytrees can be a numpy array, a pandas Series, a DataFrame with "value" column, a float and any kind of (nested) dictionary or list containing these elements. See :ref:`params` for examples. optimize_options (dict, str or False): Keyword arguments that govern the numerical optimization. Valid entries are all arguments of :func:`~estimagic.optimization.optimize.minimize` except for those that are passed explicilty to ``estimate_ml``. If you pass False as optimize_options you signal that ``params`` are already the optimal parameters and no numerical optimization is needed. If you pass a str as optimize_options it is used as the ``algorithm`` option. lower_bounds (pytree): A pytree with the same structure as params with lower bounds for the parameters. Can be ``-np.inf`` for parameters with no lower bound. upper_bounds (pytree): As lower_bounds. Can be ``np.inf`` for parameters with no upper bound. constraints (list, dict): List with constraint dictionaries or single dict. See :ref:`constraints`. logging (pathlib.Path, str or False): Path to sqlite3 file (which typically has the file extension ``.db``. If the file does not exist, it will be created. The dashboard can only be used when logging is used. log_options (dict): Additional keyword arguments to configure the logging. - "fast_logging": A boolean that determines if "unsafe" settings are used to speed up write processes to the database. This should only be used for very short running criterion functions where the main purpose of the log is a real-time dashboard and it would not be catastrophic to get a corrupted database in case of a sudden system shutdown. If one evaluation of the criterion function (and gradient if applicable) takes more than 100 ms, the logging overhead is negligible. - "if_table_exists": (str) One of "extend", "replace", "raise". What to do if the tables we want to write to already exist. Default "extend". - "if_database_exists": (str): One of "extend", "replace", "raise". What to do if the database we want to write to already exists. Default "extend". loglike_kwargs (dict): Additional keyword arguments for loglike. numdiff_options (dict): Keyword arguments for the calculation of numerical derivatives for the calculation of standard errors. See :ref:`first_derivative` for details. jacobian (callable or None): A function that takes ``params`` and potentially other keyword arguments and returns the jacobian of loglike["contributions"] with respect to the params. Note that you only need to pass a Jacobian function if you have a closed form Jacobian. If you pass None, a numerical Jacobian will be calculated. jacobian_kwargs (dict): Additional keyword arguments for the Jacobian function. hessian (callable or None or False): A function that takes ``params`` and potentially other keyword arguments and returns the Hessian of loglike["value"] with respect to the params. If you pass None, a numerical Hessian will be calculated. If you pass ``False``, you signal that no Hessian should be calculated. Thus, no result that requires the Hessian will be calculated. hessian_kwargs (dict): Additional keyword arguments for the Hessian function. design_info (pandas.DataFrame): DataFrame with one row per observation that contains some or all of the variables "psu" (primary sampling unit), "strata" and "fpc" (finite population corrector). See :ref:`robust_likelihood_inference` for details. Returns: LikelihoodResult: A LikelihoodResult object. """ # ================================================================================== # Check and process inputs # ================================================================================== is_optimized = optimize_options is False if not is_optimized: if isinstance(optimize_options, str): optimize_options = {"algorithm": optimize_options} check_optimization_options( optimize_options, usage="estimate_ml", algorithm_mandatory=True, ) jac_case = get_derivative_case(jacobian) hess_case = get_derivative_case(hessian) check_is_optimized_and_derivative_case(is_optimized, jac_case) check_is_optimized_and_derivative_case(is_optimized, hess_case) check_numdiff_options(numdiff_options, "estimate_ml") numdiff_options = {} if numdiff_options in (None, False) else numdiff_options loglike_kwargs = {} if loglike_kwargs is None else loglike_kwargs constraints = [] if constraints is None else constraints jacobian_kwargs = {} if jacobian_kwargs is None else jacobian_kwargs hessian_kwargs = {} if hessian_kwargs is None else hessian_kwargs # ================================================================================== # Calculate estimates via maximization (if necessary) # ================================================================================== if is_optimized: estimates = params opt_res = None else: opt_res = maximize( criterion=loglike, criterion_kwargs=loglike_kwargs, params=params, lower_bounds=lower_bounds, upper_bounds=upper_bounds, constraints=constraints, logging=logging, log_options=log_options, **optimize_options, ) estimates = opt_res.params # ================================================================================== # Do first function evaluations at estimated parameters # ================================================================================== try: loglike_eval = loglike(estimates, **loglike_kwargs) except (KeyboardInterrupt, SystemExit): raise except Exception as e: msg = "Error while evaluating loglike at estimated params." raise InvalidFunctionError(msg) from e if callable(jacobian): try: jacobian_eval = jacobian(estimates, **jacobian_kwargs) except (KeyboardInterrupt, SystemExit): raise except Exception as e: msg = "Error while evaluating closed form jacobian at estimated params." raise InvalidFunctionError(msg) from e else: jacobian_eval = None if callable(hessian): try: hessian_eval = hessian(estimates, **hessian_kwargs) except (KeyboardInterrupt, SystemExit): raise except Exception as e: msg = "Error while evaluating closed form hessian at estimated params." raise InvalidFunctionError(msg) from e else: hessian_eval = None # ================================================================================== # Get the converter for params and function outputs # ================================================================================== converter, internal_estimates = get_converter( func=loglike, params=estimates, constraints=constraints, lower_bounds=lower_bounds, upper_bounds=upper_bounds, func_eval=loglike_eval, primary_key="contributions", scaling=False, scaling_options=None, derivative_eval=jacobian_eval, ) # ================================================================================== # Calculate internal jacobian # ================================================================================== if jac_case == "closed-form": x = converter.params_to_internal(estimates) int_jac = converter.derivative_to_internal(jacobian_eval, x) elif jac_case == "numerical": def func(x): p = converter.params_from_internal(x) loglike_eval = loglike(p, **loglike_kwargs)["contributions"] out = converter.func_to_internal(loglike_eval) return out jac_res = first_derivative( func=func, params=internal_estimates.values, lower_bounds=internal_estimates.lower_bounds, upper_bounds=internal_estimates.upper_bounds, **numdiff_options, ) int_jac = jac_res["derivative"] else: int_jac = None if constraints in [None, []] and jacobian_eval is None and int_jac is not None: loglike_contribs = loglike_eval if isinstance(loglike_contribs, dict) and "contributions" in loglike_contribs: loglike_contribs = loglike_contribs["contributions"] jacobian_eval = matrix_to_block_tree( int_jac, outer_tree=loglike_contribs, inner_tree=estimates, ) if jacobian_eval is None: _no_jac_reason = ( "no closed form jacobian was provided and there are constraints" ) else: _no_jac_reason = None # ================================================================================== # Calculate internal Hessian # ================================================================================== if hess_case == "skip": int_hess = None elif hess_case == "numerical": def func(x): p = converter.params_from_internal(x) loglike_eval = loglike(p, **loglike_kwargs)["value"] out = converter.func_to_internal(loglike_eval) return out hess_res = second_derivative( func=func, params=internal_estimates.values, lower_bounds=internal_estimates.lower_bounds, upper_bounds=internal_estimates.upper_bounds, **numdiff_options, ) int_hess = hess_res["derivative"] elif hess_case == "closed-form" and constraints: raise NotImplementedError( "Closed-form Hessians are not yet compatible with constraints." ) elif hess_case == "closed-form": int_hess = block_tree_to_matrix( hessian_eval, outer_tree=params, inner_tree=params, ) else: raise ValueError() if constraints in [None, []] and hessian_eval is None and int_hess is not None: hessian_eval = matrix_to_block_tree( int_hess, outer_tree=params, inner_tree=params, ) if hessian_eval is None: if hess_case == "skip": _no_hess_reason = "the hessian calculation was explicitly skipped." else: _no_hess_reason = ( "no closed form hessian was provided and there are constraints" ) else: _no_hess_reason = None # ================================================================================== # create a LikelihoodResult object # ================================================================================== res = LikelihoodResult( params=estimates, _converter=converter, optimize_result=opt_res, _jacobian=jacobian_eval, _no_jacobian_reason=_no_jac_reason, _hessian=hessian_eval, _no_hessian_reason=_no_hess_reason, _internal_jacobian=int_jac, _internal_hessian=int_hess, _design_info=design_info, _flat_params=internal_estimates, _has_constraints=constraints not in [None, []], ) return res
@dataclass class LikelihoodResult: params: Any _flat_params: Any _converter: Converter _has_constraints: bool optimize_result: Union[OptimizeResult, None] = None _jacobian: Any = None _no_jacobian_reason: Union[str, None] = None _hessian: Any = None _no_hessian_reason: Union[str, None] = None _internal_jacobian: Union[np.ndarray, None] = None _internal_hessian: Union[np.ndarray, None] = None _design_info: Union[pd.DataFrame, None] = None def __post_init__(self): if self._internal_jacobian is None and self._internal_hessian is None: raise ValueError( "At least one of _internal_jacobian or _internal_hessian must be " "not None." ) elif self._internal_jacobian is None: valid_methods = ["hessian"] elif self._internal_hessian is None: valid_methods = ["jacobian"] else: valid_methods = ["jacobian", "hessian", "robust"] if self._design_info is not None: if "psu" in self._design_info: valid_methods.append("cluster_robust") if {"strata", "psu", "fpc"}.issubset(self._design_info): valid_methods.append("strata_robust") self._valid_methods = set(valid_methods) def _get_free_cov( self, method, n_samples, bounds_handling, seed, ): if method not in self._valid_methods: raise ValueError() internal_jac = self._internal_jacobian internal_hess = self._internal_hessian design_info = self._design_info converter = self._converter flat_params = self._flat_params if method == "jacobian": int_cov = cov_jacobian(internal_jac) elif method == "hessian": int_cov = cov_hessian(internal_hess) elif method == "robust": int_cov = cov_robust(jac=internal_jac, hess=internal_hess) elif method == "cluster_robust": int_cov = cov_cluster_robust( jac=internal_jac, hess=internal_hess, design_info=design_info ) elif method == "strata_robust": int_cov = cov_strata_robust( jac=internal_jac, hess=internal_hess, design_info=design_info ) else: raise ValueError(f"Invalid method: {method}") np.random.seed(seed) free_cov = transform_covariance( flat_params=flat_params, internal_cov=int_cov, converter=converter, n_samples=n_samples, bounds_handling=bounds_handling, ) return free_cov @property def jacobian(self): if self._jacobian is None: raise NotAvailableError( f"No jacobian is available because {self._no_jacobian_reason}." ) return self._jacobian @property def hessian(self): if self._hessian is None: raise NotAvailableError( f"No hessian is available because {self._no_hessian_reason}." ) return self._hessian @property def _se(self): return self.se() @property def _cov(self): return self.cov() @property def _summary(self): return self.summary() @property def _ci(self): return self.ci() @property def _p_values(self): return self.p_values() def se( self, method="jacobian", n_samples=10_000, bounds_handling="clip", seed=None, ): """Calculate standard errors. Args: method (str): One of "jacobian", "hessian", "robust", "cluster_robust", "strata_robust". Default "jacobian". "cluster_robust" is only available if design_info containts a columns called "psu" that identifies the primary sampling unit. "strata_robust" is only available if the columns "strata", "fpc" and "psu" are in design_info. n_samples (int): Number of samples used to transform the covariance matrix of the internal parameter vector into the covariance matrix of the external parameters. For background information about internal and external params see :ref:`implementation_of_constraints`. This is only used if you are using constraints. bounds_handling (str): One of "clip", "raise", "ignore". Determines how bounds are handled. If "clip", confidence intervals are clipped at the bounds. Standard errors are only adjusted if a sampling step is necessary due to additional constraints. If "raise" and any lower or upper bound is binding, we raise an Error. If "ignore", boundary problems are simply ignored. seed (int): Seed for the random number generator. Only used if there are transforming constraints. Returns: Any: A pytree with the same structure as params containing standard errors for the parameter estimates. """ free_cov = self._get_free_cov( method=method, n_samples=n_samples, bounds_handling=bounds_handling, seed=seed, ) helper = np.full(len(self._flat_params.values), np.nan) helper[self._flat_params.free_mask] = np.sqrt(np.diagonal(free_cov)) out = self._converter.params_from_internal(helper) return out def cov( self, method="jacobian", n_samples=10_000, bounds_handling="clip", return_type="pytree", seed=None, ): """Calculate the variance-covariance matrix of the estimated parameters. Args: method (str): One of "jacobian", "hessian", "robust", "cluster_robust", "strata_robust". Default "jacobian". "cluster_robust" is only available if design_info containts a columns called "psu" that identifies the primary sampling unit. "strata_robust" is only available if the columns "strata", "fpc" and "psu" are in design_info. n_samples (int): Number of samples used to transform the covariance matrix of the internal parameter vector into the covariance matrix of the external parameters. For background information about internal and external params see :ref:`implementation_of_constraints`. This is only used if you are using constraints. bounds_handling (str): One of "clip", "raise", "ignore". Determines how bounds are handled. If "clip", confidence intervals are clipped at the bounds. Standard errors are only adjusted if a sampling step is necessary due to additional constraints. If "raise" and any lower or upper bound is binding, we raise an Error. If "ignore", boundary problems are simply ignored. return_type (str): One of "pytree", "array" or "dataframe". Default pytree. If "array", a 2d numpy array with the covariance is returned. If "dataframe", a pandas DataFrame with parameter names in the index and columns are returned. seed (int): Seed for the random number generator. Only used if there are transforming constraints. Returns: Any: The covariance matrix of the estimated parameters as block-pytree or numpy array. """ free_cov = self._get_free_cov( method=method, n_samples=n_samples, bounds_handling=bounds_handling, seed=seed, ) if return_type == "array": out = free_cov elif return_type == "dataframe": free_index = np.array(self._flat_params.names)[self._flat_params.free_mask] out = pd.DataFrame(data=free_cov, columns=free_index, index=free_index) elif return_type == "pytree": if len(free_cov) != len(self._flat_params.values): raise NotAvailableError( "Covariance matrices in block-pytree format are only available if " "there are no constraints that reduce the number of free " "parameters." ) out = matrix_to_block_tree(free_cov, self.params, self.params) return out def summary( self, method="jacobian", n_samples=10_000, ci_level=0.95, bounds_handling="clip", seed=None, ): """Create a summary of estimation results. Args: method (str): One of "jacobian", "hessian", "robust", "cluster_robust", "strata_robust". Default "jacobian". "cluster_robust" is only available if design_info containts a columns called "psu" that identifies the primary sampling unit. "strata_robust" is only available if the columns "strata", "fpc" and "psu" are in design_info. ci_level (float): Confidence level for the calculation of confidence intervals. The default is 0.95. n_samples (int): Number of samples used to transform the covariance matrix of the internal parameter vector into the covariance matrix of the external parameters. For background information about internal and external params see :ref:`implementation_of_constraints`. This is only used if you are using constraints. bounds_handling (str): One of "clip", "raise", "ignore". Determines how bounds are handled. If "clip", confidence intervals are clipped at the bounds. Standard errors are only adjusted if a sampling step is necessary due to additional constraints. If "raise" and any lower or upper bound is binding, we raise an Error. If "ignore", boundary problems are simply ignored. seed (int): Seed for the random number generator. Only used if there are transforming constraints. Returns: Any: The estimation summary as pytree of DataFrames. """ free_cov = self._get_free_cov( method=method, n_samples=n_samples, bounds_handling=bounds_handling, seed=seed, ) summary = calculate_inference_quantities( estimates=self.params, internal_estimates=self._flat_params, free_cov=free_cov, ci_level=ci_level, ) return summary def ci( self, method="jacobian", n_samples=10_000, ci_level=0.95, bounds_handling="clip", seed=None, ): """Calculate confidence intervals. Args: method (str): One of "jacobian", "hessian", "robust", "cluster_robust", "strata_robust". Default "jacobian". "cluster_robust" is only available if design_info containts a columns called "psu" that identifies the primary sampling unit. "strata_robust" is only available if the columns "strata", "fpc" and "psu" are in design_info. ci_level (float): Confidence level for the calculation of confidence intervals. The default is 0.95. n_samples (int): Number of samples used to transform the covariance matrix of the internal parameter vector into the covariance matrix of the external parameters. For background information about internal and external params see :ref:`implementation_of_constraints`. This is only used if you are using constraints. bounds_handling (str): One of "clip", "raise", "ignore". Determines how bounds are handled. If "clip", confidence intervals are clipped at the bounds. Standard errors are only adjusted if a sampling step is necessary due to additional constraints. If "raise" and any lower or upper bound is binding, we raise an Error. If "ignore", boundary problems are simply ignored. seed (int): Seed for the random number generator. Only used if there are transforming constraints. Returns: Any: Pytree with the same structure as params containing lower bounds of confidence intervals. Any: Pytree with the same structure as params containing upper bounds of confidence intervals. """ free_cov = self._get_free_cov( method=method, n_samples=n_samples, bounds_handling=bounds_handling, seed=seed, ) free_values = self._flat_params.values[self._flat_params.free_mask] free_se = np.sqrt(np.diagonal(free_cov)) free_lower, free_upper = calculate_ci(free_values, free_se, ci_level) helper = np.full(len(self._flat_params.values), np.nan) helper[self._flat_params.free_mask] = free_lower lower = self._converter.params_from_internal(helper) helper = np.full(len(self._flat_params.values), np.nan) helper[self._flat_params.free_mask] = free_upper upper = self._converter.params_from_internal(helper) return lower, upper def p_values( self, method="jacobian", n_samples=10_000, bounds_handling="clip", seed=None, ): """Calculate p_values. Args: method (str): One of "jacobian", "hessian", "robust", "cluster_robust", "strata_robust". Default "jacobian". "cluster_robust" is only available if design_info containts a columns called "psu" that identifies the primary sampling unit. "strata_robust" is only available if the columns "strata", "fpc" and "psu" are in design_info. ci_level (float): Confidence level for the calculation of confidence intervals. The default is 0.95. n_samples (int): Number of samples used to transform the covariance matrix of the internal parameter vector into the covariance matrix of the external parameters. For background information about internal and external params see :ref:`implementation_of_constraints`. This is only used if you are using constraints. bounds_handling (str): One of "clip", "raise", "ignore". Determines how bounds are handled. If "clip", confidence intervals are clipped at the bounds. Standard errors are only adjusted if a sampling step is necessary due to additional constraints. If "raise" and any lower or upper bound is binding, we raise an Error. If "ignore", boundary problems are simply ignored. seed (int): Seed for the random number generator. Only used if there are transforming constraints. Returns: Any: Pytree with the same structure as params containing lower bounds of confidence intervals. Any: Pytree with the same structure as params containing upper bounds of confidence intervals. """ free_cov = self._get_free_cov( method=method, n_samples=n_samples, bounds_handling=bounds_handling, seed=seed, ) free_values = self._flat_params.values[self._flat_params.free_mask] free_se = np.sqrt(np.diagonal(free_cov)) free_p_values = calculate_p_values(free_values, free_se) helper = np.full(len(self._flat_params.values), np.nan) helper[self._flat_params.free_mask] = free_p_values out = self._converter.params_from_internal(helper) return out def to_pickle(self, path): """Save the LikelihoodResult object to pickle. Args: path (str, pathlib.Path): A str or pathlib.path ending in .pkl or .pickle. """ to_pickle(self, path=path)