import itertools
import time
import os
import datetime
# third-party imports
import numpy
from csep.utils.log import LoggingMixin
from csep.core.regions import CartesianGrid2D, create_space_magnitude_region
from csep.models import Polygon
from csep.utils.calc import bin1d_vec
from csep.utils.time_utils import decimal_year, datetime_to_utc_epoch
from csep.core.catalogs import AbstractBaseCatalog
from csep.utils.constants import SECONDS_PER_ASTRONOMICAL_YEAR
from csep.utils.plots import plot_spatial_dataset
# idea: should this be a SpatialDataSet and the class below a SpaceMagnitudeDataSet, because of functions like
# get_latitudes() and get_longitudes()?
# or should this class be refactored to use the underlying region?
# idea: this needs to handle non-cartesian regions, so maybe (lons, lats) should be a single variable like locations
# note: these are specific to 2D data sets and some minor refactoring needs to happen here.
# todo: add a mask to the dataset that has the shape of data. consider using the numpy.ma module to hold these values
class GriddedDataSet(LoggingMixin):
"""Represents space-magnitude discretized seismicity implementation.
Map-based and discrete forecasts, such as those provided by time-independent forecasts can be stored using this format.
This object will provide some convenience routines and functionality around the numpy.ndarray primative that
actually stores the space-time magnitude forecast.
Earthquake forecasts based on discrete space-magnitude regions are read into this format by default. This format can
support multiple types of region including 2d and 3d cartesian meshes. The appropriate region must be provided. By default
the magniutde is always treated as the 'fast' dimension of the numpy.array.
Attributes:
data (numpy.ndarray): 2d numpy.ndarray containing the spatial and magnitude bins with magnitudes being the fast dimension.
region: csep.utils.spatial.2DCartesianGrid class containing the mapping of the data points into the region.
mags: list or numpy.ndarray class containing the lower (inclusive) magnitude values from the discretized
magnitudes. The magnitude bins should be regularly spaced.
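
    Example:
        A minimal, illustrative sketch; it assumes ``region`` is an existing
        :class:`csep.core.regions.CartesianGrid2D` instance (any name not defined in this module is a
        placeholder)::

            import numpy
            # rates stored as (n_space_bins, n_mag_bins); magnitude is the fast dimension
            rates = numpy.ones((region.num_nodes, 10))
            gds = GriddedDataSet(data=rates, region=region, name='uniform-example')
            total = gds.sum()  # equals rates.sum(), since the default scale is 1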
"""
def __init__(self, data=None, region=None, name=None):
""" Constructs GriddedSeismicity class.
Attributes:
data (numpy.ndarray): numpy.ndarray
region:
name:
time_horizon:
"""
super().__init__()
# note: do not access this member through _data, always use .data.
self._data = data
self.region = region
self.name = name
        # this value lets us scale the forecast without copying the underlying data, and makes the calls
        # idempotent
self._scale = 1
@property
def data(self):
""" Contains the spatio-magnitude forecast as 2d numpy.ndarray.
The dimensions of this array are (num_spatial_bins, num_magnitude_bins). The spatial bins can be indexed through
a look up table as part of the region class. The magnitude bins used are stored as directly as an attribute of
class.
"""
return self._data * self._scale
@property
def event_count(self):
""" Returns a sum of the forecast data """
return self.sum()
def sum(self):
""" Sums over all of the forecast data"""
return numpy.sum(self.data)
def spatial_counts(self, cartesian=False):
""" Returns the counts (or rates) of earthquakes within each spatial bin.
Args:
cartesian (bool): if true, will return a 2d grid representing the bounding box of the forecast
Returns:
ndarray containing the count in each bin
"""
if cartesian:
return self.region.get_cartesian(self.data)
else:
return self.data
def get_latitudes(self):
""" Returns the latitude of the lower left node of the spatial grid"""
return self.region.origins()[:,1]
def get_longitudes(self):
""" Returns the lognitude of the lower left node of the spatial grid """
return self.region.origins()[:,0]
def get_valid_midpoints(self):
""" Returns the midpoints of the valid testing region
Returns:
lons (numpy.array), lats (numpy.array): two numpy arrays containing the valid midpoints from the forecast
"""
latitudes = []
longitudes = []
for idx in range(self.region.num_nodes):
            if self.region.bbox_mask[idx] == 0:
latitudes.append(self.region.midpoints()[idx,1])
longitudes.append(self.region.midpoints()[idx,0])
return numpy.array(longitudes), numpy.array(latitudes)
@property
def polygons(self):
return self.region.polygons
def get_index_of(self, lons, lats):
""" Returns the index of lons, lats in spatial region
See csep.utils.spatial.CartesianGrid2D for more details.
Args:
lons: ndarray-like
lats: ndarray-like
Returns:
idx: ndarray-like
Raises:
ValueError: if lons or lats are outside of the region.
"""
return self.region.get_index_of(lons, lats)
def scale(self, val):
"""Scales forecast by floating point value.
Args:
            val: int, float, or ndarray. This value multiplicatively scales the values in the forecast. Use a value of
                1 to recover the original values of the forecast.
Raises:
ValueError: must be int, float, or ndarray
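
        Example:
            An illustrative sketch; ``forecast`` is assumed to be an existing :class:`GriddedDataSet` (or subclass)
            whose stored rates describe a 5-year period::

                forecast.scale(1 / 5)   # rescale to a 1-year equivalent
                forecast.scale(1)       # idempotent: recovers the original rates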
"""
self._scale = val
return self
    def to_dict(self):
        raise NotImplementedError()
@classmethod
def from_dict(cls, adict):
raise NotImplementedError()
class MarkedGriddedDataSet(GriddedDataSet):
"""
    Represents a gridded forecast in CSEP. The data must be stored as a 2D numpy array where the fast dimension is magnitude.
    The shape of this array is (n_space_bins, n_mag_bins) and can be large.
"""
def __init__(self, magnitudes=None, *args, **kwargs):
super().__init__(*args, **kwargs)
self.region = create_space_magnitude_region(self.region, magnitudes)
@property
def magnitudes(self):
return self.region.magnitudes
@property
def min_magnitude(self):
""" Returns the lowest magnitude bin edge """
return numpy.min(self.magnitudes)
@property
def num_mag_bins(self):
return len(self.magnitudes)
def get_magnitudes(self):
""" Returns the left edge of the magnitude bins. """
return self.magnitudes
@property
def num_nodes(self):
return self.region.num_nodes
def spatial_counts(self, cartesian=False):
"""
Integrates over magnitudes to return the spatial version of the forecast.
Args:
cartesian (bool): if true, will return a 2d grid representing the bounding box of the forecast
Returns:
ndarray containing the count in each bin
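
        Example:
            A sketch assuming ``forecast`` is an existing :class:`MarkedGriddedDataSet` with a valid region::

                counts = forecast.spatial_counts()              # 1d, one value per spatial cell
                grid = forecast.spatial_counts(cartesian=True)  # 2d, shaped like the region's bounding box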
"""
if cartesian:
return self.region.get_cartesian(numpy.sum(self.data, axis=1))
else:
return numpy.sum(self.data, axis=1)
def magnitude_counts(self):
""" Returns counts of events in magnitude bins """
return numpy.sum(self.data, axis=0)
def get_magnitude_index(self, mags, tol=0.00001):
""" Returns the indices into the magnitude bins of selected magnitudes
Note: the right-most bin is treated as extending to infinity.
        Args:
            mags (array-like): list of magnitudes
            tol (float): tolerance used when mapping magnitudes to bin edges
Returns:
idm (array-like): indices corresponding to mags
Raises:
ValueError
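
        Example:
            A sketch assuming the forecast magnitude bin edges are [4.0, 4.1, ..., 8.9]::

                idm = forecast.get_magnitude_index([4.05, 8.95])
                # -> array([0, 49]); the right-most bin extends to infinity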
"""
idm = bin1d_vec(mags, self.magnitudes, tol=tol, right_continuous=True)
if numpy.any(idm == -1):
raise ValueError("mags outside the range of forecast magnitudes.")
return idm
class GriddedForecast(MarkedGriddedDataSet):
""" Class to represent grid-based forecasts """
def __init__(self, start_time=None, end_time=None, *args, **kwargs):
"""
Constructor for GriddedForecast class
Args:
start_time (datetime.datetime):
end_time (datetime.datetime):
"""
super().__init__(*args, **kwargs)
self.start_time = start_time
self.end_time = end_time
def scale_to_test_date(self, test_datetime):
""" Scales forecast data by the fraction of the date.
Uses the concept of decimal years to keep track of leap years. See the csep.utils.time_utils.decimal_year for
details on the implementation. If datetime is before the start_date or after the end_date, we will scale the
forecast by unity.
These datetime objects can be timezone aware in UTC timezone or both not time aware. This function will raise a
TypeError according to the specifications of datetime module if these conditions are not met.
Args:
test_datetime (datetime.datetime): date to scale the forecast
in_place (bool): if false, creates a deep copy of the object and scales that instead
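
        Example:
            An illustrative sketch; the dates are hypothetical::

                from datetime import datetime, timezone
                # scale a one-year forecast down to roughly the first half of its period
                forecast.start_time = datetime(2020, 1, 1, tzinfo=timezone.utc)
                forecast.end_time = datetime(2021, 1, 1, tzinfo=timezone.utc)
                forecast.scale_to_test_date(datetime(2020, 7, 1, tzinfo=timezone.utc))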
"""
# Note: this will throw a TypeError if datetimes are not either both time aware or not time aware.
if test_datetime >= self.end_time:
return self
if test_datetime <= self.start_time:
return self
fore_dur = decimal_year(self.end_time) - decimal_year(self.start_time)
        # we are adding one day, because tests are considered to occur at the end of the day specified by test_datetime.
test_date_dec = decimal_year(test_datetime + datetime.timedelta(1))
fore_frac = (test_date_dec - decimal_year(self.start_time)) / fore_dur
res = self.scale(fore_frac)
return res
def target_event_rates(self, target_catalog, scale=False):
""" Generates data set of target event rates given a target data.
The data should already be scaled to the same length as the forecast time horizon. Explicit checks for these
cases are not conducted in this function.
If scale=True then the target event rates will be scaled down to the rates for one day. This choice of time
can be made without a loss of generality. Please see Rhoades, D. A., D. Schorlemmer, M. C. Gerstenberger,
A. Christophersen, J. D. Zechar, and M. Imoto (2011). Efficient testing of earthquake forecasting models,
Acta Geophys 59 728-747.
Args:
target_catalog (csep.core.data.AbstractBaseCatalog): data containing target events
scale (bool): if true, rates will be scaled to one day.
Returns:
out (tuple): target_event_rates, n_fore. target event rates are the
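
        Example:
            A sketch; ``catalog`` is assumed to be a filtered :class:`csep.core.catalogs.AbstractBaseCatalog`::

                target_rates, n_fore = forecast.target_event_rates(catalog, scale=False)
                # one rate per observed event; n_fore is the forecast's total expected count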
"""
        if not isinstance(target_catalog, AbstractBaseCatalog):
            raise TypeError("target_catalog must be a csep.core.catalogs.AbstractBaseCatalog instance.")
if scale:
            # first get a copy so we don't contaminate the rates of the forecast; this can be quite large for global files.
# if we run into memory problems, we can implement a sparse form of the forecast.
data = numpy.copy(self.data)
# straight-forward implementation, relies on correct start and end time
elapsed_days = (self.end_time - self.start_time).days
# scale the data down to days
data = data / elapsed_days
else:
# just pull reference to stored data
data = self.data
# get longitudes and latitudes of target events
lons = target_catalog.get_longitudes()
lats = target_catalog.get_latitudes()
mags = target_catalog.get_magnitudes()
        # this array no longer keeps track of event locations; however, they can be recovered from the data if needed.
rates = self.get_rates(lons, lats, mags, data=data)
        # we return the sum of data, because data might be scaled within this function
return rates, numpy.sum(data)
def get_rates(self, lons, lats, mags, data=None, ret_inds=False):
""" Returns the rate associated with a longitude, latitude, and magnitude.
Args:
lon: longitude of interest
lat: latitude of interest
mag: magnitude of interest
data: optional, if not none then use this data value provided with the forecast
Returns:
rates (float or ndarray)
Raises:
RuntimeError: lons lats and mags must be the same length
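
        Example:
            A sketch with hypothetical coordinates assumed to lie inside the forecast region::

                rates = forecast.get_rates([-120.5], [35.5], [5.05])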
"""
        if len(lons) != len(lats) or len(lats) != len(mags):
raise RuntimeError("lons, lats, and mags must have the same length.")
        # get index of lon and lat values; raises ValueError if lons, lats are not in the region
        idx = self.get_index_of(lons, lats)
        # get index of magnitude bins; raises ValueError if mags fall outside the forecast magnitude range
idm = self.get_magnitude_index(mags)
# retrieve rates from internal data structure
if data is None:
rates = self.data[idx,idm]
else:
rates = data[idx,idm]
if ret_inds:
return rates, (idx, idm)
else:
return rates
@classmethod
def from_custom(cls, func, func_args=(), **kwargs):
""" Creates MarkedGriddedDataSet class from custom parsing function.
Args:
func (callable): function will be called as func(*func_args).
func_args (tuple): arguments to pass to func
**kwargs: keyword arguments to pass to the GriddedForecast class constructor.
Returns:
:class:`csep.core.forecasts.GriddedForecast`: forecast object
Note:
The loader function `func` needs to return a tuple that contains (data, region, magnitudes). data is a
:class:`numpy.ndarray`, region is a :class:`CartesianGrid2D<csep.core.regions.CartesianGrid2D>`, and
magnitudes are a :class:`numpy.ndarray` consisting of the magnitude bin edges. See the function
:meth:`load_ascii<csep.core.forecasts.GriddedForecast.load_ascii>` for an example.
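
        Example:
            A hypothetical loader sketch; ``my_loader`` and ``'my_forecast.bin'`` are placeholders::

                def my_loader(fname):
                    # ... parse fname and build the forecast components ...
                    return data, region, magnitudes

                fore = GriddedForecast.from_custom(my_loader, func_args=('my_forecast.bin',), name='custom')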
"""
data, region, magnitudes = func(*func_args)
        # try to ensure that data and region are compatible with one another, but we can only rely on heuristics
return cls(data=data, region=region, magnitudes=magnitudes, **kwargs)
@classmethod
def load_ascii(cls, ascii_fname, start_date=None, end_date=None, name=None, swap_latlon=False):
""" Reads Forecast file from CSEP1 ascii format.
The ascii format from CSEP1 testing centers. The ASCII format does not contain headers. The format is listed here:
Lon_0, Lon_1, Lat_0, Lat_1, z_0, z_1, Mag_0, Mag_1, Rate, Flag
For the purposes of defining region objects and magnitude bins use the Lat_0 and Lon_0 values along with Mag_0.
We can assume that the magnitude bins are regularly spaced to allow us to compute Deltas.
The file is row-ordered so that magnitude bins are fastest then followed by space.
Args:
ascii_fname: file name of csep forecast in .dat format
swap_latlon (bool): if true, read forecast spatial cells as lat_0, lat_1, lon_0, lon_1
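
        Example:
            A sketch; the file path and dates are hypothetical::

                from datetime import datetime, timezone
                fore = GriddedForecast.load_ascii('forecast.dat',
                                                  start_date=datetime(2010, 1, 1, tzinfo=timezone.utc),
                                                  end_date=datetime(2015, 1, 1, tzinfo=timezone.utc))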
"""
# Load data
data = numpy.loadtxt(ascii_fname)
        # since numpy.unique returns sorted values, take the first-occurrence indices, sort those, and index back
        # into the original array. same for the magnitudes below.
all_polys = data[:, :4]
all_poly_mask = data[:, -1]
sorted_idx = numpy.sort(numpy.unique(all_polys, return_index=True, axis=0)[1], kind='stable')
unique_poly = all_polys[sorted_idx]
# gives the flag for a spatial cell in the order it was presented in the file
poly_mask = all_poly_mask[sorted_idx]
        # create magnitude bins from Mag_0, ignoring Mag_1 because the bins are regularly spaced (except possibly
        # the last); we don't want a binary search for this
all_mws = data[:, -4]
sorted_idx = numpy.sort(numpy.unique(all_mws, return_index=True)[1], kind='stable')
mws = all_mws[sorted_idx]
# csep1 stores the lat lons as min values and not (x,y) tuples
if swap_latlon:
bboxes = [((i[2], i[0]), (i[3], i[0]), (i[3], i[1]), (i[2], i[1])) for i in unique_poly]
else:
bboxes = [((i[0], i[2]), (i[0], i[3]), (i[1], i[3]), (i[1], i[2])) for i in unique_poly]
# the spatial cells are arranged fast in latitude, so this only works for the specific csep1 file format
dh = float(unique_poly[0, 3] - unique_poly[0, 2])
        # create CartesianGrid2D of points
region = CartesianGrid2D([Polygon(bbox) for bbox in bboxes], dh, mask=poly_mask)
# get dims of 2d np.array
n_mag_bins = len(mws)
n_poly = region.num_nodes
# reshape rates into correct 2d format
rates = data[:,-2].reshape(n_poly, n_mag_bins)
# create / return class
if name is None:
name = os.path.basename(ascii_fname[:-4])
gds = cls(start_date, end_date, magnitudes=mws, name=name, region=region, data=rates)
return gds
def plot(self, ax=None, show=False, log=True, extent=None, set_global=False, plot_args=None):
""" Plot gridded forecast according to plate-carree projection
Args:
show (bool): if true, show the figure. this call is blocking.
plot_args (optional/dict): dictionary containing plotting arguments for making figures
Returns:
axes: matplotlib.Axes.axes
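
        Example:
            A sketch overriding a default plot argument::

                ax = forecast.plot(plot_args={'title': 'Example forecast'})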
"""
# no mutable function arguments
if self.start_time is None or self.end_time is None:
time = 'forecast period'
else:
start = decimal_year(self.start_time)
end = decimal_year(self.end_time)
time = f'{round(end-start,3)} years'
plot_args = plot_args or {}
plot_args.setdefault('figsize', (10, 10))
plot_args.setdefault('title', self.name)
# this call requires internet connection and basemap
if log:
plot_args.setdefault('clabel', f'log10 M{self.min_magnitude}+ rate per cell per {time}')
with numpy.errstate(divide='ignore'):
ax = plot_spatial_dataset(numpy.log10(self.spatial_counts(cartesian=True)), self.region, ax=ax,
show=show, extent=extent, set_global=set_global, plot_args=plot_args)
else:
plot_args.setdefault('clabel', f'M{self.min_magnitude}+ rate per cell per {time}')
ax = plot_spatial_dataset(self.spatial_counts(cartesian=True), self.region, ax=ax,show=show, extent=extent,
set_global=set_global, plot_args=plot_args)
return ax
class CatalogForecast(LoggingMixin):
""" Catalog based forecast defined as a family of stochastic event sets. """
def __init__(self, filename=None, catalogs=None, name=None,
filter_spatial=False, filters=None, apply_mct=False,
region=None, expected_rates=None, start_time=None, end_time=None,
n_cat=None, event=None, loader=None, catalog_type='ascii',
catalog_format='native', store=True, apply_filters=False):
"""
        The region information can be provided alongside the catalogs, if they are stored in one of the supported file formats.
        It is assumed that the region for each catalog is identical. If the regions are not provided with the catalog files,
        they must be provided explicitly. The california testing region can be loaded using :func:`csep.utils.spatial.california_relm_region`.

        There are a few different ways this class can be constructed: from catalogs already in memory, from a file on
        disk through a loader function, or lazily from a generator for forecasts too large to hold in memory at once.

        The region is not required to load a forecast or to perform basic operations on a forecast, such as counting events.
        Any binning of events in space or magnitude will require a spatial region or magnitude bin definitions, respectively.
Args:
filename (str): Path to the file or directory containing the forecast.
catalogs: iterable of :class:`csep.core.catalogs.AbstractBaseCatalog`
filter_spatial (bool): if true, will filter to area defined in space region
apply_mct (bool): this should be provided if a time-dependent magnitude completeness model should be
applied to the forecast
            filters (iterable): list of catalog filter strings; applied to each catalog when apply_filters is true
            region: :class:`csep.core.regions.CartesianGrid2D` including magnitude bins
start_time (datetime.datetime): start time of the forecast
end_time (datetime.datetime): end time of the forecast
name (str): name of the forecast, will be used for defaults in plotting and other places
n_cat (int): number of catalogs in the forecast
event (:class:`csep.models.Event`): if the forecast is associated with a particular event
store (bool): if true, will store catalogs on object in memory. this should only be made false if working
with very large forecast files that cannot be stored in memory
apply_filters (bool): if true, filters will be applied automatically to the catalogs as the forecast
is iterated through
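
        Example:
            A sketch of the typical entry point; the file path is hypothetical::

                import csep
                forecast = csep.load_catalog_forecast('ucerf3_synthetic_catalogs.bin')
                for catalog in forecast:
                    print(catalog.event_count)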
"""
super().__init__()
# used for labeling plots, filenames, etc, should be human readable
self.name = name
# path to forecast location
self.filename = filename
# should be iterable
self.catalogs = catalogs or []
self._catalogs = []
# should be a generator function
self.loader = loader
# used if the forecast is associated with a particular event
self.event = event
        # if false, no filters will be applied when iterating through the forecast
self.apply_filters = apply_filters
# these can be used to filter catalogs to a desired experiment region
self.filters = filters or []
self.filter_spatial = filter_spatial
self.apply_mct = apply_mct
# data format used for loading catalogs
self.catalog_type = catalog_type
self.catalog_format = catalog_format
# should be a MarkedGriddedDataSet
self.expected_rates = expected_rates
self._event_counts = []
# defines the space, time, and magnitude region of the forecasts
self.region = region
# start and end time of the forecast
self.start_time = start_time
self.end_time = end_time
# stores catalogs in memory
self.store = store
        # time horizon in years (epoch times are in milliseconds)
if self.start_time is not None and self.end_time is not None:
self.time_horizon_years = (self.end_epoch - self.start_epoch) / SECONDS_PER_ASTRONOMICAL_YEAR / 1000
# number of simulated catalogs
self.n_cat = n_cat
# used to handle the iteration over catalogs
self._idx = 0
# load catalogs if catalogs aren't provided, this might be a generator
if not self.catalogs:
self._load_catalogs()
def __iter__(self):
return self
def __next__(self):
""" Allows the class to be used in a for-loop. Handles the case where the catalogs are stored as a list or
loaded in using a generator function. The latter solves the problem where memory is a concern or all of the
catalogs should not be held in memory at once. """
is_generator = True
try:
n_items = len(self.catalogs)
is_generator = False
assert self.n_cat == n_items
# here, we have reached the end of the list, simply reset the index to the front
if self._idx >= self.n_cat:
self._idx = 0
raise StopIteration()
catalog = self.catalogs[self._idx]
self._idx += 1
except TypeError:
            # handle generator case. a generator does not define __len__, while a list does.
try:
catalog = next(self.catalogs)
self._idx += 1
except StopIteration:
# gets a new generator function after the old one is exhausted
if not self.store:
self.catalogs = self.loader(format=self.catalog_format, filename=self.filename,
region=self.region, name=self.name)
else:
self.catalogs = self._catalogs
del self._catalogs
if self.apply_filters:
self.apply_filters = False
self.n_cat = self._idx
self._idx = 0
raise StopIteration()
# apply filtering to catalogs, these can throw errors if not configured properly
if self.apply_filters:
if self.filters:
catalog = catalog.filter(self.filters)
if self.apply_mct:
catalog = catalog.apply_mct(self.event.magnitude, datetime_to_utc_epoch(self.event.time))
if self.filter_spatial:
catalog = catalog.filter_spatial(self.region)
self._event_counts.append(catalog.event_count)
if is_generator and self.store:
self._catalogs.append(catalog)
        # return the potentially filtered catalog
return catalog
def _load_catalogs(self):
self.catalogs = self.loader(format=self.catalog_format, filename=self.filename, region=self.region, name=self.name)
@property
def start_epoch(self):
return datetime_to_utc_epoch(self.start_time)
@property
def end_epoch(self):
return datetime_to_utc_epoch(self.end_time)
@property
def magnitudes(self):
""" Returns left bin-edges of magnitude bins """
return self.region.magnitudes
@property
def min_magnitude(self):
""" Returns smallest magnitude bin edge of forecast """
return numpy.min(self.region.magnitudes)
def spatial_counts(self, cartesian=False):
""" Returns the expected spatial counts from forecast """
if self.expected_rates is None:
self.get_expected_rates()
return self.expected_rates.spatial_counts(cartesian=cartesian)
def magnitude_counts(self):
""" Returns expected magnitude counts from forecast """
if self.expected_rates is None:
self.get_expected_rates()
return self.expected_rates.magnitude_counts()
def get_event_counts(self, verbose=True):
""" Returns a numpy array containing the number of event counts for each catalog.
Note: This function can take a while to compute if called without already iterating through a forecast that
is being stored on disk. This should only happen to large forecasts that have been initialized with
store = False. This should only happen on the first iteration of the catalog.
Returns:
(numpy.array): event counts with size equal of catalogs in forecast
"""
if len(self._event_counts) == 0:
# event counts is filled while iterating over the catalog
t0 = time.time()
for i, _ in enumerate(self):
if verbose:
tens_exp = numpy.floor(numpy.log10(i + 1))
if (i + 1) % 10 ** tens_exp == 0:
t1 = time.time()
print(f'Processed {i + 1} catalogs in {t1 - t0:.2f} seconds', flush=True)
return numpy.array(self._event_counts)
def get_expected_rates(self, verbose=False):
""" Compute the expected rates in space-magnitude bins
Args:
catalogs_iterable (iterable): collection of catalogs, should be filtered outside the function
data (csep.core.AbstractBaseCatalog): observation data
Return:
:class:`csep.core.forecasts.GriddedForecast`
list of tuple(lon, lat, magnitude) events that were skipped in binning. if data was filtered in space
and magnitude beforehand this list shoudl be empty.
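
        Example:
            A sketch; assumes the forecast was constructed with a space-magnitude region::

                expected = forecast.get_expected_rates()
                expected.plot()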
"""
        # self.n_cat might be None here if the catalogs haven't been loaded and it's not yet specified.
if self.region is None or self.region.magnitudes is None:
raise AttributeError("Forecast must have space-magnitude regions to compute expected rates.")
# need to compute expected rates, else return.
if self.expected_rates is None:
t0 = time.time()
data = numpy.empty([])
for i, cat in enumerate(self):
            # compute space-magnitude counts from each catalog, forcing the catalog region to use the forecast region
cat.region = self.region
gridded_counts = cat.spatial_magnitude_counts()
if i == 0:
data = numpy.array(gridded_counts)
else:
data += numpy.array(gridded_counts)
# output status
if verbose:
tens_exp = numpy.floor(numpy.log10(i + 1))
if (i + 1) % 10 ** tens_exp == 0:
t1 = time.time()
print(f'Processed {i + 1} catalogs in {t1 - t0:.3f} seconds', flush=True)
# after we iterate through the catalogs, we know self.n_cat
data = data / self.n_cat
self.expected_rates = GriddedForecast(self.start_time, self.end_time, data=data, region=self.region,
magnitudes=self.magnitudes, name=self.name)
return self.expected_rates
    def plot(self, plot_args=None, verbose=True, **kwargs):
plot_args = plot_args or {}
if self.expected_rates is None:
self.get_expected_rates(verbose=verbose)
args_dict = {'title': self.name,
'grid_labels': True,
'grid': True,
'borders': True,
'feature_lw': 0.5,
'basemap': 'ESRI_terrain',
}
args_dict.update(plot_args)
ax = self.expected_rates.plot(**kwargs, plot_args=args_dict)
return ax
def get_dataframe(self):
"""Return a single dataframe with all of the events from all of the catalogs."""
raise NotImplementedError("get_dataframe is not implemented.")
    def write_ascii(self, fname, header=True, loader=None):
        """ Writes catalog forecast to ASCII format
Args:
fname (str): Output filename of forecast
header (bool): If true, write header information; else, do not write header.
Returns:
NoneType
"""
raise NotImplementedError('write_ascii is not implemented!')
@classmethod
def load_ascii(cls, fname, **kwargs):
""" Loads ASCII format for data forecast.
Args:
fname (str): path to file or directory containing forecast files
Returns:
            :class:`csep.core.forecasts.CatalogForecast`
"""
raise NotImplementedError("load_ascii is not implemented!")