Source code for csep.utils.calc

# Third-party imports
import numpy
import scipy.interpolate

# PyCSEP imports
from csep.core.exceptions import CSEPException
from csep.utils.stats import binned_ecdf, sup_dist, get_quantiles
from csep.utils import flat_map_to_ndarray


def nearest_index(array, value):
    """ Returns the index of the value in the array that is closest to the specified value. """
    array = numpy.asarray(array)
    idx = (numpy.abs(array - value)).argmin()
    return idx

def find_nearest(array, value):
    """ Returns the value from the array that is closest to the specified value. """
    array = numpy.asarray(array)
    idx = nearest_index(array, value)
    return array[idx]

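# Illustrative usage (a minimal sketch, not part of the original module; the sample array
# and query value are assumptions). Both helpers select the nearest element:
#
#   >>> a = [0.0, 0.5, 1.0]
#   >>> nearest_index(a, 0.4)
#   1
#   >>> find_nearest(a, 0.4)
#   0.5
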
def func_inverse(x, y, val, kind='nearest', **kwargs):
    """ Returns the value of a function at `val`, based on interpolation of the (x, y) pairs. """
    f = scipy.interpolate.interp1d(x, y, kind=kind, **kwargs)
    return f(val)

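# Illustrative usage (a sketch, not part of the original module; the sample points are
# assumptions). Passing the dependent values as `x` and the independent values as `y`
# interpolates the inverse relation, which is the use suggested by the name:
#
#   >>> xs = numpy.array([0.0, 1.0, 2.0])
#   >>> ys = xs ** 2
#   >>> float(func_inverse(ys, xs, 4.0, kind='linear'))   # approximate x where x**2 == 4
#   2.0
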
def discretize(data, bin_edges, right_continuous=False):
    """ Returns an array, the same length as data, containing the discretized value for each point.

    Instead of returning the counts of each bin, this returns the data mapped onto the lower edges
    of their bins, such that any value with bin_edges[0] <= x_new < bin_edges[1] becomes
    bin_edges[0]. This implementation requires bin edges that fully contain the data.
    """
    bin_edges = numpy.array(bin_edges)
    if bin_edges.size == 0:
        raise ValueError("bin_edges must not be empty")
    if bin_edges[1] < bin_edges[0]:
        raise ValueError("bin_edges must be increasing")
    data = numpy.array(data)
    idx = bin1d_vec(data, bin_edges, right_continuous=right_continuous)
    if numpy.any(idx == -1):
        raise CSEPException("Discretized values should all be within bin_edges")
    x_new = bin_edges[idx]
    return x_new

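# Illustrative usage (a sketch, not part of the original module; the edges and data are
# assumptions). Values are snapped down to the lower edge of the bin that contains them, and
# a value outside the edges raises CSEPException:
#
#   >>> discretize([0.12, 0.48], [0.0, 0.1, 0.2, 0.3, 0.4, 0.5])
#   array([0.1, 0.4])
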
def bin1d_vec(p, bins, tol=None, right_continuous=False):
    """Efficient implementation of a binning routine on a 1D Cartesian grid.

    Returns the indices of the points into bins. Bins are inclusive on the lower bound and
    exclusive on the upper bound. If a point does not fall within the bins, -1 is returned for
    that point. The last bin extends to infinity when right_continuous is set to true.

    Args:
        p (array-like): point(s) to be placed into bins
        bins (array-like): bins to consider for binning, must be monotonically increasing
        tol (float): optional absolute tolerance used in place of the floating point tolerance
        right_continuous (bool): if true, consider last bin extending to infinity

    Returns:
        idx (array-like): indexes hashed into grid

    Raises:
        ValueError:
    """
    bins = numpy.array(bins)
    p = numpy.array(p)
    a0 = numpy.min(bins)
    # if the user supplies only a single bin, force right_continuous to True; the value of h is arbitrary
    if bins.size == 1:
        right_continuous = True
        h = 1
    else:
        h = bins[1] - bins[0]

    a0_tol = numpy.abs(a0) * numpy.finfo(numpy.float64).eps
    h_tol = numpy.abs(h) * numpy.finfo(numpy.float64).eps
    p_tol = numpy.abs(p) * numpy.finfo(numpy.float64).eps

    # absolute tolerance
    if tol is None:
        idx = numpy.floor((p + (p_tol + a0_tol) - a0) / (h - h_tol))
    else:
        idx = numpy.floor((p + (tol + a0_tol) - a0) / (h - h_tol))
    if h < 0:
        raise ValueError("grid spacing must be positive and monotonically increasing.")
    # account for floating point uncertainties by considering extreme case
    if right_continuous:
        # set upper bin index to last
        try:
            idx[(idx < 0)] = -1
            idx[(idx >= len(bins) - 1)] = len(bins) - 1
        except TypeError:
            if idx >= len(bins) - 1:
                idx = len(bins) - 1
            if idx < 0:
                idx = -1
    else:
        try:
            idx[((idx < 0) | (idx >= len(bins)))] = -1
        except TypeError:
            if idx < 0 or idx >= len(bins):
                idx = -1
    try:
        idx = idx.astype(numpy.int64)
    except AttributeError:
        idx = int(idx)
    return idx

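# Illustrative usage (a sketch, not part of the original module; the grid and points are
# assumptions). With edges [0.0, 0.1, ..., 0.9], 0.05 falls into bin 0 and 0.95 into bin 9,
# while a point beyond the grid returns -1 unless right_continuous is set:
#
#   >>> edges = numpy.arange(0.0, 1.0, 0.1)
#   >>> bin1d_vec([0.05, 0.95], edges)
#   array([0, 9])
#   >>> bin1d_vec([1.2], edges)                          # outside the grid
#   array([-1])
#   >>> bin1d_vec([1.2], edges, right_continuous=True)   # last bin extends to infinity
#   array([9])
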
def _compute_likelihood(gridded_data, apprx_rate_density, expected_cond_count, n_obs):
    # compute pseudo likelihood
    idx = gridded_data != 0

    # the returned likelihood is: -inf if there are observations at idx but no apprx_rate_density there
    #                             -expected_cond_count if there are no target earthquakes
    n_events = numpy.sum(gridded_data)
    if n_events == 0:
        return (-expected_cond_count, numpy.nan)
    else:
        with numpy.errstate(divide='ignore'):
            likelihood = numpy.sum(gridded_data[idx] * numpy.log(apprx_rate_density[idx])) - expected_cond_count

    # cannot compute the spatial statistic score if there are no target events or the forecast is undersampled
    if n_obs == 0 or expected_cond_count == 0:
        return (likelihood, numpy.nan)

    # normalize the rate density to sum to unity
    norm_apprx_rate_density = apprx_rate_density / numpy.sum(apprx_rate_density)

    # value could be: -inf if no value in apprx_rate_density
    #                 nan if n_cat is 0
    with numpy.errstate(divide='ignore'):
        likelihood_norm = numpy.sum(gridded_data[idx] * numpy.log(norm_apprx_rate_density[idx])) / n_events

    return (likelihood, likelihood_norm)


def _compute_approximate_likelihood(gridded_data, apprx_forecasted_rate):
    """ Computes the approximate likelihood from Rhoades et al., 2011; Equation 4

    Args:
        gridded_data (ndarray): observed counts on spatial grid
        apprx_forecasted_rate (ndarray): mean rates from forecast

    Notes:
        Mean rates from the forecast are assumed to not have any zeros.
    """
    n_obs = numpy.sum(gridded_data)
    return numpy.sum(gridded_data * numpy.log10(apprx_forecasted_rate)) - n_obs


def _compute_spatial_statistic(gridded_data, log10_probability_map):
    """ Aggregates the log10 probability map over the cells where earthquakes occurred.

    Args:
        gridded_data: observed counts on spatial grid
        log10_probability_map: log10 probabilities defined on the same grid
    """
    # returns a unique set of indexes corresponding to cells where earthquakes occurred
    # this should implement similar logic to the spatial tests wrt undersampling.
    # technically, if there are no target eqs you can't compute this statistic.
    if numpy.sum(gridded_data) == 0:
        return numpy.nan
    idx = numpy.unique(numpy.argwhere(gridded_data))
    return numpy.sum(log10_probability_map[idx])


def _distribution_test(stochastic_event_set_data, observation_data):

    # for cached files we want to write this with memmap
    union_catalog = flat_map_to_ndarray(stochastic_event_set_data)
    min_time = 0.0
    max_time = numpy.max([numpy.max(numpy.ceil(union_catalog)), numpy.max(numpy.ceil(observation_data))])

    # build test_distribution with 100 data points
    num_points = 100
    tms = numpy.linspace(min_time, max_time, num_points, endpoint=True)

    # get combined ecdf and obs ecdf
    combined_ecdf = binned_ecdf(union_catalog, tms)
    obs_ecdf = binned_ecdf(observation_data, tms)

    # build test distribution
    n_cat = len(stochastic_event_set_data)
    test_distribution = []
    for i in range(n_cat):
        test_ecdf = binned_ecdf(stochastic_event_set_data[i], tms)
        # a None ecdf indicates there were zero events in the catalog
        if test_ecdf is not None:
            d = sup_dist(test_ecdf[1], combined_ecdf[1])
            test_distribution.append(d)
    d_obs = sup_dist(obs_ecdf[1], combined_ecdf[1])

    # score evaluation
    _, quantile = get_quantiles(test_distribution, d_obs)

    return test_distribution, d_obs, quantile


def cleaner_range(start, end, h):
    """ Returns an array of bin edges that does not contain floating point wander.

    Floating point wander can occur when repeatedly adding floating point numbers together.
    The errors propagate and become worse over the sum. This function generates the values
    on an integer grid and converts them back to floating point numbers through multiplication.

    Args:
        start (float): first bin edge
        end (float): last bin edge
        h (float): magnitude spacing

    Returns:
        bin_edges (numpy.ndarray)
    """
    # convert to integers to prevent accumulating floating point errors
    const = 100000
    start = numpy.floor(const * start)
    end = numpy.floor(const * end)
    d = const * h
    return numpy.arange(start, end + d / 2, d) / const


def first_nonnan(arr, axis=0, invalid_val=-1):
    # index of the first non-NaN value along axis; invalid_val where the slice is all NaN
    mask = arr == arr
    return numpy.where(mask.any(axis=axis), mask.argmax(axis=axis), invalid_val)


def last_nonnan(arr, axis=0, invalid_val=-1):
    # index of the last non-NaN value along axis; invalid_val where the slice is all NaN
    mask = arr == arr
    val = arr.shape[axis] - numpy.flip(mask, axis=axis).argmax(axis=axis) - 1
    return numpy.where(mask.any(axis=axis), val, invalid_val)
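

# Illustrative usage (a sketch, not part of the original module; the values below are
# assumptions). cleaner_range builds magnitude bin edges on an integer grid so repeated
# addition of the spacing does not accumulate error, and the *_nonnan helpers locate the
# valid entries along an axis:
#
#   >>> cleaner_range(3.95, 8.95, 0.1)           # 51 edges: 3.95, 4.05, ..., 8.95
#   >>> a = numpy.array([[numpy.nan, 1.0, 2.0, numpy.nan]])
#   >>> first_nonnan(a, axis=1)
#   array([1])
#   >>> last_nonnan(a, axis=1)
#   array([2])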