Source code for estimagic.estimation.msm_weighting

import functools

import numpy as np
import pandas as pd
from pybaum import tree_just_flatten
from scipy.linalg import block_diag

from estimagic.inference.bootstrap import bootstrap
from estimagic.parameters.block_trees import block_tree_to_matrix, matrix_to_block_tree
from estimagic.parameters.tree_registry import get_registry
from estimagic.utilities import robust_inverse

[docs]def get_moments_cov( data, calculate_moments, *, moment_kwargs=None, bootstrap_kwargs=None ): """Bootstrap the covariance matrix of the moment conditions. Args: data (pandas.DataFrame): DataFrame with empirical data. calculate_moments (callable): Function that calculates that takes data and moment_kwargs as arguments and returns a 1d numpy array or pandas Series with moment conditions. moment_kwargs (dict): Additional keyword arguments for calculate_moments. bootstrap_kwargs (dict): Additional keyword arguments that govern the bootstrapping. Allowed arguments are "n_draws", "seed", "n_cores", "batch_evaluator", "cluster_by" and "error_handling". For details see the bootstrap function. Returns: pandas.DataFrame or numpy.ndarray: The covariance matrix of the moment conditions for msm estimation. """ moment_kwargs = {} if moment_kwargs is None else moment_kwargs bootstrap_kwargs = {} if bootstrap_kwargs is None else bootstrap_kwargs valid_bs_kwargs = { "n_cores", "n_draws", "seed", "batch_evaluator", "cluster_by", "error_handling", "existing_result", "outcome_kwargs", } problematic = set(bootstrap_kwargs).difference(valid_bs_kwargs) if problematic: raise ValueError(f"Invalid bootstrap_kwargs: {problematic}") first_eval = calculate_moments(data, **moment_kwargs) registry = get_registry(extended=True) @functools.wraps(calculate_moments) def func(data, **kwargs): raw = calculate_moments(data, **kwargs) out = pd.Series( tree_just_flatten(raw, registry=registry) ) # xxxx won't be necessary soon! return out cov_arr = bootstrap( data=data, outcome=func, outcome_kwargs=moment_kwargs, **bootstrap_kwargs ).cov() if isinstance(cov_arr, pd.DataFrame): cov_arr = cov_arr.to_numpy() # xxxx won't be necessary soon cov = matrix_to_block_tree(cov_arr, first_eval, first_eval) return cov
def get_weighting_matrix( moments_cov, method, empirical_moments, clip_value=1e-6, return_type="pytree" ): """Calculate a weighting matrix from moments_cov. Args: moments_cov (pandas.DataFrame or numpy.ndarray): Square DataFrame or Array with the covariance matrix of the moment conditions for msm estimation. method (str): One of "optimal", "diagonal", or "identity". empirical_moments (pytree): Pytree containing empirical moments. Used to get the tree structure clip_value (float): Bound at which diagonal elements of the moments_cov are clipped to avoid dividing by zero. return_type (str): One of "pytree", "array" or "pytree_and_array" Returns: pandas.DataFrame or numpy.ndarray: Weighting matrix with the same shape as moments_cov. """ fast_path = isinstance(moments_cov, np.ndarray) and moments_cov.ndim == 2 if fast_path: _internal_cov = moments_cov else: _internal_cov = block_tree_to_matrix( moments_cov, outer_tree=empirical_moments, inner_tree=empirical_moments, ) if method == "optimal": array_weights = robust_inverse(_internal_cov) elif method == "diagonal": diagonal_values = 1 / np.clip(np.diagonal(_internal_cov), clip_value, np.inf) array_weights = np.diag(diagonal_values) elif method == "identity": array_weights = np.identity(_internal_cov.shape[0]) else: raise ValueError(f"Invalid method: {method}") if return_type == "array" or (fast_path and "_and_" not in return_type): out = array_weights elif fast_path: out = (array_weights, array_weights) else: tree_weights = matrix_to_block_tree( array_weights, outer_tree=empirical_moments, inner_tree=empirical_moments, ) if return_type == "pytree": out = tree_weights else: out = (tree_weights, array_weights) return out def _assemble_block_diagonal_matrix(matrices): """Build a block diagonal matrix out of matrices. Args: matrices (list): List of square numpy arrays or DataFrames with the building blocks for the block diagonal matrix. Returns: pandas.DataFrame or numpy.ndarray: The block diagonal matrix. """ values = block_diag(*matrices) if all(isinstance(mat, pd.DataFrame) for mat in matrices): to_concat = [pd.Series(index=mat.index, dtype=float) for mat in matrices] combined_index = pd.concat(to_concat).index out = pd.DataFrame(values, index=combined_index, columns=combined_index) else: out = values return out