import csv
import gzip
import json
import operator
import os
import datetime
# 3rd party required for core package
import numpy
import pandas
# CSEP Imports
from csep.utils.time_utils import epoch_time_to_utc_datetime, datetime_to_utc_epoch, strptime_to_utc_datetime, \
millis_to_days, parse_string_format, days_to_millis, strptime_to_utc_epoch, utc_now_datetime, create_utc_datetime
from csep.utils.stats import min_or_none, max_or_none
from csep.utils.calc import discretize
from csep.core.regions import CartesianGrid2D
import csep.utils.comcat as comcat
import csep.utils.geonet as geonet
from csep.core.exceptions import CSEPSchedulerException, CSEPCatalogException, CSEPIOException
from csep.utils.calc import bin1d_vec
from csep.utils.constants import CSEP_MW_BINS
from csep.utils.log import LoggingMixin
from csep.utils.readers import csep_ascii
from csep.utils.file import get_file_extension
from csep.utils.plots import plot_catalog
class AbstractBaseCatalog(LoggingMixin):
"""
Abstract catalog base class for PyCSEP catalogs. This class should not and cannot be used on its own. This just
provides the interface for implementing custom catalog classes.
"""
dtype = None
def __init__(self, filename=None, data=None, catalog_id=None, format=None, name=None, region=None,
compute_stats=True, filters=None, metadata=None, date_accessed=None):
""" Standard catalog format for CSEP catalogs. Primary event data are stored in structured numpy array. Additional
metadata are available by the event_id in the catalog metadata information.
Args:
filename: location of catalog
data (numpy.ndarray or eventlist): catalog data
catalog_id: catalog id number (used for stochastic event set forecasts)
format: identification used for serialization
name: human readable name of catalog
region: spatial and magnitude region
compute_stats: whether statistics should be computed for the catalog
filters (str or list): filtering operations to apply to the catalog
metadata (dict): additional information for events
date_accessed (str): time string when catalog was accessed
"""
super().__init__()
self.filename = filename
self.catalog_id = catalog_id
self.format = format
self.name = name
self.region = region
self.compute_stats = compute_stats
self.filters = filters or []
self.date_accessed = date_accessed or utc_now_datetime() # type datetime.datetime
# used to store additional event information based on the event_id key; if no event_id is present,
# defaults to an integer index
self.metadata = metadata or {}
# cleans the catalog to set as ndarray, see setter.
self.catalog = data # type: numpy.ndarray
# compute summary statistics if requested and data are present
if data is not None and self.compute_stats:
self.update_catalog_stats()
def __eq__(self, other):
""" Compares whether two catalogs are equal by comparing their dicts. """
return self.to_dict() == other.to_dict()
def __str__(self):
self.update_catalog_stats()
s = f'''
Name: {self.name}
Start Date: {self.start_time}
End Date: {self.end_time}
Latitude: ({self.min_latitude}, {self.max_latitude})
Longitude: ({self.min_longitude}, {self.max_longitude})
Min Mw: {self.min_magnitude}
Max Mw: {self.max_magnitude}
Event Count: {self.event_count}
'''
return s
def to_dict(self):
"""
Serializes class to json dictionary.
Returns:
catalog as dict
"""
excluded = ['_catalog']
out = {}
for k, v in self.__dict__.items():
# note: if 'v' is callable that implies that we have a function bound to a class-member. this happens
# for the catalog forecast and requires excluding this value.
if not callable(v) and k not in excluded:
if hasattr(v, 'to_dict'):
new_v = v.to_dict()
else:
new_v = v
if k.startswith('_'):
out[k[1:]] = new_v
else:
out[k] = new_v
out['catalog'] = []
for line in list(self.catalog.tolist()):
new_line = []
for item in line:
# try to decode, if it fails just use original, we use this to handle string-based event_ids
try:
item = item.decode('utf-8')
except (AttributeError, UnicodeDecodeError):
pass
finally:
new_line.append(item)
out['catalog'].append(new_line)
return out
@property
def event_count(self):
""" Number of events in catalog """
return self.get_number_of_events()
@classmethod
def load_catalog(cls, filename, loader=csep_ascii, **kwargs):
raise NotImplementedError("subclass should implement load_catalog function.")
@classmethod
def from_dict(cls, adict, **kwargs):
""" Creates a class from the dictionary representation of the class state. The catalog is serialized into a list of
tuples that contain the event information in the order defined by the dtype.
This needs to handle reading in region information at some point.
"""
region_loader = {
'CartesianGrid2D': CartesianGrid2D
}
# could these be class values? can be changed later.
exclude = ['_catalog', 'region']
time_members = ['date_accessed', 'start_time', 'end_time']
catalog = adict.get('catalog', None)
out = cls(data=catalog, **kwargs)
# here we are looping over the items in the class and finding the associated value in the dict
for k, v in out.__dict__.items():
if k not in exclude:
if k not in time_members:
try:
setattr(out, k, adict[k])
except KeyError:
pass
else:
setattr(out, k, _none_or_datetime(adict[k]))
else:
if k == 'region':
# tries to read the class id from the region dict; fails silently if it doesn't exist
try:
class_id = adict[k].get('class_id', None)
if class_id is None:
class_id = 'CartesianGrid2D'
setattr(out, k, region_loader[class_id].from_dict(adict[k]))
except AttributeError:
pass
return out
@classmethod
def from_dataframe(cls, df, **kwargs):
"""
Creates catalog from dataframe. The dataframe must have columns equivalent to the fields
defined in the catalog dtype.
For example:
cat = CSEPCatalog()
df = cat.get_dataframe()
new_cat = CSEPCatalog.from_dataframe(df)
Args:
df (pandas.DataFrame): pandas dataframe
**kwargs:
Returns:
Catalog
"""
catalog_id = None
try:
catalog_id = df['catalog_id'].iloc[0]
except KeyError:
pass
col_list = list(cls.dtype.names)
# we want a structured array, not a record array; this returns only the core attributes listed in the
# dtype and loses information about the region and event metadata
catalog = numpy.ascontiguousarray(df[col_list].to_records(index=False), dtype=cls.dtype)
out_cls = cls(data=catalog, catalog_id=catalog_id, **kwargs)
return out_cls
@classmethod
def load_json(cls, filename, **kwargs):
""" Loads catalog from JSON file """
with open(filename, 'r') as f:
adict = json.load(f)
return cls.from_dict(adict, **kwargs)
def write_json(self, filename):
""" Writes catalog to json file
Args:
filename (str): path to save file
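Example:
A minimal round-trip sketch (the file path is hypothetical):
>>> catalog.write_json('catalog.json')
>>> same_catalog = CSEPCatalog.load_json('catalog.json')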
"""
with open(filename, 'w') as f:
json.dump(self.to_dict(), f, indent=4, separators=(',', ': '), sort_keys=True, default=str)
@property
def catalog(self):
return self._catalog
@property
def data(self):
return self._catalog
@catalog.setter
def catalog(self, val):
"""
Ensures that catalogs stored in formats other than numpy.ndarray are converted to numpy.ndarray.
Note:
This requires that catalog classes implement the self._get_catalog_as_ndarray() function.
This function should return structured numpy.ndarray.
The catalog will remain None if assigned that way in the constructor.
"""
self._catalog = val
if self._catalog is not None:
self._catalog = self._get_catalog_as_ndarray()
# ensure that people are behaving, somewhat non-pythonic but needed
if not isinstance(self._catalog, numpy.ndarray):
raise ValueError("Error: Catalog must be numpy.ndarray! Ensure that self._get_catalog_as_ndarray()" +
" returns an ndarray")
if self.compute_stats and self._catalog is not None:
self.update_catalog_stats()
def _get_catalog_as_ndarray(self):
"""
Converts the eventlist into a structured numpy.ndarray.
This function is called anytime a catalog is assigned to self.catalog. Its purpose is to ensure that
the catalog is parsed into the correct format and to prevent users of the catalog classes from
assigning improper data types. It also acts as a convenience to allow easy assignment of different
types to the catalog. The default implementation expects the data to be arranged as a collection of
tuples corresponding to the catalog data type.
Note:
A failure state exists if self.catalog is not bound to the instance explicitly.
"""
# short-circuit
if isinstance(self.catalog, numpy.ndarray):
return self.catalog
# if catalog is not a numpy array, class must have dtype information
catalog_length = len(self.catalog)
catalog = numpy.empty(catalog_length, dtype=self.dtype)
if catalog_length == 0:
return catalog
if isinstance(self.catalog[0], (list, tuple)):
for i, event in enumerate(self.catalog):
catalog[i] = tuple(event)
elif isinstance(self.catalog[0], (comcat.SummaryEvent, geonet.SummaryEvent)):
for i, event in enumerate(self.catalog):
catalog[i] = (event.id, datetime_to_utc_epoch(event.time),
event.latitude, event.longitude, event.depth, event.magnitude)
else:
raise TypeError("Catalog data must be a list of event tuples with order:\n"
f"{', '.join(self.dtype.names)} or \n"
"a list of SummaryEvent objects.")
return catalog
def write_ascii(self, filename, write_header=True, write_empty=True, append=False, id_col='id'):
"""
Write catalog in csep2 ascii format.
This format only uses the required variables from the catalog and should work by default. It can be
overridden if an event_id (or other columns) should be used. By default, the routine will look for a
column in the catalog array called 'id' and will populate the event_id column with these values. If the
'id' column is not found, this column is left blank.
Short format description (comma separated values):
longitude, latitude, M, time_string format="%Y-%m-%dT%H:%M:%S.%f", depth, catalog_id, [event_id]
Args:
filename (str): the file location to write the ascii catalog file
write_header (bool): Write header string (default True)
write_empty (bool): Write file even if there are no events in the catalog
append (bool): If true, append to the filename
id_col (str): name of event_id column (if included)
Returns:
NoneType
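Example:
A minimal sketch writing a catalog with a header row (the file path is hypothetical):
>>> catalog.write_ascii('catalog.csv', write_header=True)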
"""
# longitude, latitude, M, epoch_time (time in millisecond since Unix epoch in GMT), depth, catalog_id, event_id
header = ['lon', 'lat', 'mag', 'time_string', 'depth', 'catalog_id', 'event_id']
if append:
write_string = 'a'
else:
write_string = 'w'
with open(filename, write_string, newline='') as outfile:
writer = csv.DictWriter(outfile, fieldnames=header, delimiter=',')
if write_header:
writer.writeheader()
if write_empty and self.event_count == 0:
return
# create iterator from catalog columns
try:
event_ids = self.catalog[id_col]
except ValueError:
event_ids = [''] * self.event_count
row_iter = zip(self.get_longitudes(),
self.get_latitudes(),
self.get_magnitudes(),
self.get_epoch_times(),
self.get_depths(),
# populate list with `self.event_count` elements with val self.catalog_id
[self.catalog_id] * self.event_count,
event_ids)
# write csv file using DictWriter interface
for row in row_iter:
try:
event_id = row[6].decode('utf-8')
except AttributeError:
event_id = row[6]
# create dictionary for each row
adict = {'lon': row[0],
'lat': row[1],
'mag': row[2],
'time_string': str(epoch_time_to_utc_datetime(row[3]).replace(tzinfo=None)).replace(' ', 'T'),
'depth': row[4],
'catalog_id': row[5],
'event_id': event_id}
writer.writerow(adict)
def to_dataframe(self, with_datetime=False):
"""
Returns pandas Dataframe describing the catalog. Explicitly casts to pandas DataFrame.
Note:
The dataframe will be in the format of the original catalog. If you require that the
dataframe be in the CSEP ZMAP format, you must explicitly convert the catalog.
Returns:
(pandas.DataFrame): This function must return a pandas DataFrame
Raises:
ValueError: If self._catalog cannot be passed to pandas.DataFrame constructor, this function
must be overridden in the child class.
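Example:
A minimal sketch; 'origin_time' is part of the default catalog dtype:
>>> df = catalog.to_dataframe(with_datetime=True)
>>> df['magnitude'].max()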
"""
df = pandas.DataFrame(self.catalog)
df['counts'] = 1
df['catalog_id'] = self.catalog_id
if with_datetime:
df['datetime'] = df['origin_time'].map(epoch_time_to_utc_datetime)
df.index = df['datetime']
# queries the region for the index of each event
if self.region is not None:
df['region_id'] = self.region.get_index_of(self.get_longitudes(), self.get_latitudes())
try:
# bin magnitudes
df['mag_id'] = self.get_mag_idx()
except AttributeError:
pass
return df
def get_mag_idx(self):
""" Return magnitude index from region magnitudes """
try:
return bin1d_vec(self.get_magnitudes(), self.region.magnitudes, tol=0.00001, right_continuous=True)
except AttributeError:
raise CSEPCatalogException("Cannot return magnitude index without self.region.magnitudes")
def get_spatial_idx(self):
""" Return spatial index of region for a longitudes and latitudes in catalog. """
try:
region_idx = self.region.get_index_of(self.get_longitudes(), self.get_latitudes())
except AttributeError:
raise CSEPCatalogException("Must have region information to compute region index.")
return region_idx
def get_event_ids(self):
return self.catalog['id']
def get_number_of_events(self):
"""
Computes the number of events from a catalog by checking its length.
:returns: number of events in catalog, zero if catalog is None
"""
if self.catalog is not None:
return self.catalog.shape[0]
else:
return 0
def get_epoch_times(self):
"""
Returns the datetime of the event as the UTC epoch time (aka unix timestamp)
"""
return self.catalog['origin_time']
def get_cumulative_number_of_events(self):
"""
Returns the cumulative number of events in the catalog.
Primarily used for plotting purposes.
Returns:
numpy.array: numpy array of the cumulative number of events, empty array if catalog is empty.
"""
return numpy.cumsum(numpy.ones(self.event_count))
def get_magnitudes(self):
"""
Returns magnitudes of all events in catalog
"""
return self.catalog['magnitude']
def get_datetimes(self):
"""
Returns datetime objects from the timestamp representation in the catalog.
:returns: list of datetimes from events in catalog
"""
return list(map(epoch_time_to_utc_datetime, self.get_epoch_times()))
def get_latitudes(self):
"""
Returns latitudes of all events in catalog
"""
return self.catalog['latitude']
def get_longitudes(self):
"""
Returns longitudes of all events in catalog
Returns:
(numpy.array): longitudes
"""
return self.catalog['longitude']
def get_bbox(self):
"""
Returns bounding box of all events in the catalog
Returns:
(numpy.array): [lon_min, lon_max, lat_min, lat_max]
"""
bbox = numpy.array([numpy.min(self.catalog['longitude']),
numpy.max(self.catalog['longitude']),
numpy.min(self.catalog['latitude']),
numpy.max(self.catalog['latitude'])])
return bbox
def get_depths(self):
""" Returns depths of all events in catalog """
return self.catalog['depth']
def filter(self, statements=None, in_place=True):
"""
Filters the catalog based on statements. This function takes about 60% of the run-time when processing
UCERF3-ETAS simulations, and likely other simulations as well, so implementations should try to limit
how often it is called.
Args:
statements (str, iter): logical statements to evaluate, e.g., ['magnitude > 4.0', 'year >= 1995']
in_place (bool): return new instance of catalog
Returns:
self: instance of AbstractBaseCatalog, so that this function can be chained.
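Example:
A minimal sketch chaining a magnitude and a datetime filter (values are illustrative):
>>> filtered = catalog.filter(['magnitude >= 3.95',
... 'datetime >= 2019-07-06 03:19:54.0'], in_place=False)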
"""
if not self.filters and statements is None:
raise CSEPCatalogException("Must provide filter statements to function or class to filter")
# programmatically assign operators
operators = {'>': operator.gt,
'<': operator.lt,
'>=': operator.ge,
'<=': operator.le,
'==': operator.eq}
# filter catalogs, implied logical and
if statements is None:
statements = self.filters
if isinstance(statements, str):
name = statements.split(' ')[0]
if name == 'datetime':
_, oper, date, time = statements.split(' ')
name = 'origin_time'
# can be a datetime.datetime object or a datetime string. if we want to support filtering on metadata
# it can happen here, but we need to determine what to do when entries are not present, because the
# metadata does not need to be square
value = strptime_to_utc_epoch(' '.join([date, time]))
filtered = self.catalog[operators[oper](self.catalog[name], float(value))]
else:
name, oper, value = statements.split(' ')
filtered = self.catalog[operators[oper](self.catalog[name], float(value))]
elif isinstance(statements, (list, tuple)):
# slower but at the convenience of not having to call multiple times
filters = list(statements)
filtered = numpy.copy(self.catalog)
for filt in filters:
name = filt.split(' ')[0]
# create indexing array, start with all events
if name == 'datetime':
_, oper, date, time = filt.split(' ')
# we map the requested datetime to an epoch time so we act like the user requested origin_time
name = 'origin_time'
value = strptime_to_utc_epoch(' '.join([date, time]))
filtered = filtered[operators[oper](filtered[name], float(value))]
else:
name, oper, value = filt.split(' ')
filtered = filtered[operators[oper](filtered[name], float(value))]
else:
raise ValueError('statements should be either a string or list or tuple of strings')
# can return new instance of class or original instance
self.filters = statements
if in_place:
self.catalog = filtered
return self
else:
# make and return new object
cls = self.__class__
inst = cls(data=filtered, catalog_id=self.catalog_id, format=self.format, name=self.name,
region=self.region, filters=statements)
return inst
def filter_spatial(self, region=None, update_stats=False, in_place=True):
"""
Removes events outside of the region. This takes some time and should be used sparingly, typically
for isolating a region near the mainshock or inside a testing region. This should not be used to create gridded-style data sets.
Args:
region: csep.utils.spatial.Region
update_stats (bool): if true will update catalog statistics
in_place (bool): if false, will create a new instance of the catalog preserving state
Returns:
self
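Example:
A minimal sketch, assuming the RELM California testing region suits the catalog:
>>> from csep.core.regions import california_relm_region
>>> catalog = catalog.filter_spatial(region=california_relm_region())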
"""
if region is None and self.region is None:
raise CSEPCatalogException("region provided to function or bound to the catalog instance.")
# update the region to the new region
if region is not None:
self.region = region
mask = self.region.get_masked(self.get_longitudes(), self.get_latitudes())
# logical index uses opposite boolean values than masked arrays.
filtered = self.catalog[~mask]
if in_place:
self.catalog = filtered
if update_stats:
self.update_catalog_stats()
return self
else:
cls = self.__class__
inst = cls(data=filtered, catalog_id=self.catalog_id, format=self.format, name=self.name,
region=self.region, compute_stats=update_stats)
return inst
def apply_mct(self, m_main, event_epoch, mc=2.5):
"""
Applies time-dependent magnitude of completeness following a mainshock. Taken
from Eq. (15) from Helmstetter et al., 2006.
Args:
m_main (float): mainshock magnitude
event_epoch: epoch time in millis of event
mc (float): magnitude of completeness
Returns:
self
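Example:
A minimal sketch for an M7.1 mainshock (the event time is illustrative):
>>> main_epoch = datetime_to_utc_epoch(datetime.datetime(2019, 7, 6, 3, 19, 54))
>>> catalog = catalog.apply_mct(7.1, main_epoch, mc=2.5)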
"""
def compute_mct(t, m):
return m - 4.5 - 0.75 * numpy.log10(t)
# compute critical time for efficiency
t_crit_days = 10 ** -((mc - m_main + 4.5) / 0.75)
t_crit_millis = days_to_millis(t_crit_days)
times = self.get_epoch_times()
mws = self.get_magnitudes()
# catalog times are stored in milliseconds
t_crit_epoch = t_crit_millis + event_epoch
# another short-circuit, again assumes that catalogs are sorted in time
if times[0] > t_crit_epoch:
return self
# this is used to index the array, starting with accepting all events
keep = numpy.ones(self.event_count, dtype=bool)
for i, (mw, time) in enumerate(zip(mws, times)):
# we can break bc events are sorted in time
if time > t_crit_epoch:
break
if time < event_epoch:
continue
time_from_mshock_in_days = millis_to_days(time - event_epoch)
mct = compute_mct(time_from_mshock_in_days, m_main)
# ignore events with mw < mct
if mw < mct:
keep[i] = False
filtered = self.catalog[keep]
self.catalog = filtered
return self
def get_csep_format(self):
"""
This method should be overwritten for catalog formats that do not adhere to the CSEP ZMAP catalog format. For
those that do, this method will return the catalog as is.
"""
raise NotImplementedError('get_csep_format() not implemented.')
def update_catalog_stats(self):
""" Compute summary statistics of events in catalog """
# update min and max values
self.min_magnitude = min_or_none(self.get_magnitudes())
self.max_magnitude = max_or_none(self.get_magnitudes())
self.min_latitude = min_or_none(self.get_latitudes())
self.max_latitude = max_or_none(self.get_latitudes())
self.min_longitude = min_or_none(self.get_longitudes())
self.max_longitude = max_or_none(self.get_longitudes())
self.start_time = epoch_time_to_utc_datetime(min_or_none(self.get_epoch_times()))
self.end_time = epoch_time_to_utc_datetime(max_or_none(self.get_epoch_times()))
def spatial_counts(self):
"""
Returns counts of events within the discrete spatial region.
We figure out the index of the polygons and create a map that relates the spatial coordinate in the
Cartesian grid with the polygon in the region.
Returns:
ndarray containing the event count in each spatial bin
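Example:
A minimal sketch, assuming a region is bound to the catalog:
>>> gridded_counts = catalog.spatial_counts()
>>> gridded_counts.shape == (catalog.region.num_nodes,)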
"""
# make sure region is specified with catalog
if self.region is None:
raise CSEPSchedulerException("Cannot create binned rates without region information.")
if self.event_count == 0:
return numpy.zeros(self.region.num_nodes)
n_poly = self.region.num_nodes
event_counts = numpy.zeros(n_poly)
# this function could throw ValueError if points are outside of the region
idx = self.region.get_index_of(self.get_longitudes(), self.get_latitudes())
numpy.add.at(event_counts, idx, 1)
return event_counts
def spatial_event_probability(self):
# make sure region is specified with catalog
if self.region is None:
raise CSEPSchedulerException("Cannot create binned probabilities without region information.")
if self.event_count == 0:
return numpy.zeros(self.region.num_nodes)
n_poly = self.region.num_nodes
event_flag = numpy.zeros(n_poly)
idx = self.region.get_index_of(self.get_longitudes(), self.get_latitudes())
event_flag[idx] = 1
return event_flag
def magnitude_counts(self, mag_bins=None, tol=0.00001, retbins=False):
""" Computes the count of events within mag_bins
Args:
mag_bins: magnitude bins; defaults to csep.utils.constants.CSEP_MW_BINS
tol (float): tolerance for comparisons within magnitude bins
retbins (bool): if true, also return the bins used
Returns:
numpy.ndarray: the counts of the events in each magnitude bin
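Example:
A minimal sketch with explicit bin edges (values are illustrative; numpy is imported at module level):
>>> mag_bins = numpy.arange(4.0, 8.0, 0.1)
>>> counts = catalog.magnitude_counts(mag_bins=mag_bins)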
"""
# todo: keep track of events that are ignored
if mag_bins is None:
try:
# a forecast is a type of region, but region does not need a magnitude
mag_bins = self.region.magnitudes
except AttributeError:
# use default magnitude bins from csep
mag_bins = CSEP_MW_BINS
self.region.magnitudes = mag_bins
self.region.num_mag_bins = len(mag_bins)
out = numpy.zeros(len(mag_bins))
if self.event_count == 0:
if retbins:
return (mag_bins, out)
else:
return out
idx = bin1d_vec(self.get_magnitudes(), mag_bins, tol=tol, right_continuous=True)
numpy.add.at(out, idx, 1)
if retbins:
return (mag_bins, out)
else:
return out
def spatial_magnitude_counts(self, mag_bins=None, tol=0.00001):
""" Return counts of events in space-magnitude region.
We figure out the index of the polygons and create a map that relates the spatial coordinate in the
Cartesian grid with the polygon in region.
Args:
mag_bins (list, numpy.array): magnitude bins (optional), if empty tries to use magnitude bins associated with region
tol (float): tolerance for comparisons within magnitude bins
Returns:
output: unnormalized event count in each space-magnitude bin, 2d ndarray of shape (n_spatial_bins, n_mag_bins)
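Example:
A minimal sketch, assuming the catalog region defines magnitude bins:
>>> counts = catalog.spatial_magnitude_counts()
>>> counts.shape == (catalog.region.num_nodes, len(catalog.region.magnitudes))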
"""
# make sure region is specified with catalog
if self.region is None:
raise CSEPCatalogException("Cannot create binned rates without region information.")
if self.region.magnitudes is None and mag_bins is None:
raise CSEPCatalogException("Region must have magnitudes or mag_bins must be defined to "
"compute space magnitude binning.")
# prefer user supplied mag_bins
if mag_bins is None:
mag_bins = self.region.magnitudes
# short-circuit if zero-events in catalog... return array of zeros
n_poly = self.region.num_nodes
event_counts = numpy.zeros((n_poly, len(mag_bins)))
if self.event_count != 0:
# this will throw ValueError if points outside range
spatial_idx = self.region.get_index_of(self.get_longitudes(), self.get_latitudes())
# also throwing the same error
mag_idx = bin1d_vec(self.get_magnitudes(), mag_bins, tol=tol, right_continuous=True)
for idx in range(spatial_idx.shape[0]):
if mag_idx[idx] == -1:
raise ValueError("at least one magnitude value outside of the valid region.")
event_counts[(spatial_idx[idx], mag_idx[idx])] += 1
return event_counts
def length_in_seconds(self):
""" Returns catalog length in seconds assuming that the catalog is sorted by time. """
dts = self.get_datetimes()
elapsed_time = (dts[-1] - dts[0]).total_seconds()
return elapsed_time
def get_bvalue(self, mag_bins=None, return_error=True):
"""
Estimates the b-value of a catalog from Marzocchi and Sandri (2003). First, tries to use the magnitude bins
provided to the function. If those are not provided, tries the magnitude bins associated with the region.
If that fails, uses the default magnitude bins provided in constants.
Args:
mag_bins (list or array_like): monotonically increasing set of magnitude bin edges
return_error (bool): if true, also return the standard error
Returns:
bval (float): b-value
err (float): std. err
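Example:
A minimal sketch using the default CSEP magnitude bins:
>>> bval, err = catalog.get_bvalue(return_error=True)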
"""
if self.get_number_of_events() == 0:
return None
# this might fail if magnitudes are not aligned
if mag_bins is None:
try:
mag_bins = self.region.magnitudes
except AttributeError:
mag_bins = CSEP_MW_BINS
mws = discretize(self.get_magnitudes(), mag_bins)
dmw = mag_bins[1] - mag_bins[0]
# compute the p term from eq 3.10 in marzocchi and sandri [2003]
def p():
top = dmw
bottom = numpy.mean(mws) - numpy.min(mws)
# this might happen if all mags are the same, or 1 event in catalog
if bottom == 0:
return None
return 1 + top / bottom
bottom = numpy.log(10) * dmw
p = p()
if p is None:
return None
bval = 1.0 / bottom * numpy.log(p)
if return_error:
err = (1 - p) / (numpy.log(10) * dmw * numpy.sqrt(self.event_count * p))
return (bval, err)
else:
return bval
def b_positive(self):
""" Implements the b-positive indicator from Nicholas van der Elst """
pass
def plot(self, ax=None, show=False, extent=None, set_global=False, plot_args=None):
""" Plot catalog according to plate-carree projection
Args:
ax (`matplotlib.pyplot.axes`): Previous axes onto which catalog can be drawn
show (bool): if true, show the figure. this call is blocking.
extent (list): Force an extent [lon_min, lon_max, lat_min, lat_max]
set_global (bool): if true, plot the entire globe
plot_args (optional/dict): dictionary containing plotting arguments for making figures
Returns:
axes: matplotlib.Axes.axes
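Example:
A minimal sketch; downloading the basemap requires an internet connection:
>>> ax = catalog.plot(show=True, plot_args={'markersize': 4, 'title': 'My Catalog'})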
"""
# no mutable function arguments
plot_args_default = {
'basemap': 'ESRI_terrain',
'markersize': 2,
'markercolor': 'red',
'alpha': 0.3,
'mag_scale': 7,
'legend': True,
'grid_labels': True,
'legend_loc': 3,
'figsize': (8, 8),
'title': self.name,
'mag_ticks': False
}
# Plot the region border (if it exists) by default
try:
# This will throw error if catalog does not have region
_ = self.region.num_nodes
plot_args_default['region_border'] = True
except AttributeError:
pass
plot_args = plot_args or {}
plot_args_default.update(plot_args)
# this call requires internet connection and basemap
ax = plot_catalog(self, ax=ax, show=show, extent=extent,
set_global=set_global, plot_args=plot_args_default)
return ax
class CSEPCatalog(AbstractBaseCatalog):
"""
Standard catalog class for PyCSEP catalog operations.
"""
dtype = numpy.dtype([('id', 'S256'),
('origin_time', '<i8'),
('latitude', '<f8'),
('longitude', '<f8'),
('depth', '<f8'),
('magnitude', '<f8')])
def __init__(self, **kwargs):
""" Standard catalog format for CSEP catalogs. Primary event data are stored in structured numpy array. Additional
metadata are available by the event_id in the catalog metadata information.
Args:
filename: location of catalog
data (numpy.ndarray or eventlist): catalog data
catalog_id: catalog id number (used for stochastic event set forecasts)
format: identification used for serialization
name: human readable name of catalog
region: spatial and magnitude region
compute_stats: whether statistics should be computed for the catalog
filters (str or list): filtering operations to apply to the catalog
metadata (dict): additional information for events
date_accessed (str): time string when catalog was accessed
"""
super().__init__(**kwargs)
@classmethod
def load_ascii_catalogs(cls, filename, **kwargs):
""" Loads multiple catalogs in csep-ascii format.
This function can load multiple catalogs stored in a single file. This is typically called to
load a catalog-based forecast, but it could also load a collection of catalogs stored in the same file.
Args:
filename (str): filepath or directory of catalog files
**kwargs (dict): passed to class constructor
Return:
yields CSEPCatalog class
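Example:
A minimal sketch that materializes the generator (the file path is hypothetical):
>>> catalogs = list(CSEPCatalog.load_ascii_catalogs('forecast_catalogs.csv'))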
"""
def parse_filename(filename):
# this works for unix
basename = str(os.path.basename(filename.rstrip('/')).split('.')[0])
split_fname = basename.split('_')
name = split_fname[0]
start_time = strptime_to_utc_datetime(split_fname[1], format="%Y-%m-%dT%H-%M-%S-%f")
return (name, start_time)
def read_float(val):
"""Returns val as float or None if unable"""
try:
val = float(val)
except (TypeError, ValueError):
val = None
return val
def is_header_line(line):
if line[0].lower() == 'lon':
return True
else:
return False
def read_catalog_line(line):
# convert to correct types
lon = read_float(line[0])
lat = read_float(line[1])
magnitude = read_float(line[2])
# maybe fractional seconds are not included
origin_time = line[3]
if origin_time:
try:
origin_time = strptime_to_utc_epoch(line[3], format='%Y-%m-%dT%H:%M:%S.%f')
except ValueError:
origin_time = strptime_to_utc_epoch(line[3], format='%Y-%m-%dT%H:%M:%S')
depth = read_float(line[4])
catalog_id = int(line[5])
event_id = line[6]
# temporary event
temp_event = (event_id, origin_time, lat, lon, depth, magnitude)
return temp_event, catalog_id
# infer the catalog name from the filename, unless the user specifies one
try:
name_from_file, start_time = parse_filename(filename)
kwargs.setdefault('name', name_from_file)
except (ValueError, IndexError):
pass
# handle all catalogs in single file
if os.path.isfile(filename):
with open(filename, 'r', newline='') as input_file:
catalog_reader = csv.reader(input_file, delimiter=',')
# csv treats everything as a string; convert to correct types
events = []
# all catalogs should start at zero
prev_id = None
for line in catalog_reader:
# skip header line on first read if included in file
if prev_id is None:
if is_header_line(line):
continue
# read line and return catalog id
temp_event, catalog_id = read_catalog_line(line)
empty = False
# OK if event_id is empty
if all([val in (None, '') for val in temp_event[1:]]):
empty = True
# first event is when prev_id is none, catalog_id should always start at zero
if prev_id is None:
prev_id = 0
# if the first catalog doesn't start at zero
if catalog_id != prev_id:
if not empty:
events = [temp_event]
else:
events = []
for cat_id in range(catalog_id):
yield cls(data=[], catalog_id=cat_id, **kwargs)
prev_id = catalog_id
continue
# accumulate event if catalog_id is the same as previous event
if catalog_id == prev_id:
if not all([val in (None, '') for val in temp_event]):
events.append(temp_event)
prev_id = catalog_id
# create and yield class if the events are from different catalogs
elif catalog_id == prev_id + 1:
yield cls(data=events, catalog_id=prev_id, **kwargs)
# add event to new event list
if not empty:
events = [temp_event]
else:
events = []
prev_id = catalog_id
# this implies there are empty catalogs, because they are not listed in the ascii file
elif catalog_id > prev_id + 1:
yield cls(data=events, catalog_id=prev_id, **kwargs)
# e.g., if prev_id = 0 and catalog_id = 2, then we skipped one catalog; in general, we skip catalog_id - prev_id - 1 catalogs
num_empty_catalogs = catalog_id - prev_id - 1
# first yield empty catalog classes
for cat_id in range(num_empty_catalogs):
yield cls(data=[], catalog_id=catalog_id - num_empty_catalogs + cat_id, **kwargs)
prev_id = catalog_id
# add event to new event list
if not empty:
events = [temp_event]
else:
events = []
else:
raise ValueError(
"catalog_id should be monotonically increasing and events should be ordered by catalog_id")
# yield final catalog, note: since this is just loading catalogs, it has no idea how many should be there
cat = cls(data=events, catalog_id=prev_id, **kwargs)
yield cat
elif os.path.isdir(filename):
raise NotImplementedError("reading from directory or batched files not implemented yet!")
@classmethod
def load_catalog(cls, filename, loader=csep_ascii, **kwargs):
""" Loads catalog stored in CSEP1 ascii format """
catalog_id = None
try:
event_list, catalog_id = loader(filename, return_catalog_id=True)
except TypeError:
event_list = loader(filename)
new_class = cls(data=event_list, catalog_id=catalog_id, **kwargs)
return new_class
class UCERF3Catalog(AbstractBaseCatalog):
"""
Catalog class for the UCERF3-ETAS binary file format.
:var header_dtype: numpy.dtype description of synthetic catalog header.
:var event_dtype: numpy.dtype description of ucerf3 catalog format
"""
# binary format of UCERF3 catalog
header_dtype = numpy.dtype([("file_version", ">i2"), ("catalog_size", ">i4")])
def __init__(self, **kwargs):
# initialize parent constructor
super().__init__(**kwargs)
@classmethod
def load_catalogs(cls, filename, **kwargs):
"""
Loads catalogs based on the merged binary file format of UCERF3. File format is described at
https://scec.usc.edu/scecpedia/CSEP2_Storing_Stochastic_Event_Sets#Introduction.
There is also the load_catalog method that will work on the individual binary output of the UCERF3-ETAS
model.
Args:
filename (str): filename of binary stochastic event set
kwargs (dict): keyword arguments to pass to class constructor
Returns:
list of catalogs of type UCERF3Catalog
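Example:
A minimal sketch iterating a merged binary file (the file path is hypothetical):
>>> for u3_catalog in UCERF3Catalog.load_catalogs('results_complete.bin'):
... print(u3_catalog.catalog_id, u3_catalog.event_count)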
"""
# handle uncompressed binary file
if get_file_extension(filename) == 'bin':
with open(filename, 'rb') as catalog_file:
# parse 4-byte header from merged file
number_simulations_in_set = numpy.fromfile(catalog_file, dtype='>i4', count=1)[0]
# load all catalogs from merged file
for catalog_id in range(number_simulations_in_set):
version = numpy.fromfile(catalog_file, dtype=">i2", count=1)[0]
dtype = cls._get_header_dtype(version)
header = numpy.fromfile(catalog_file, dtype=dtype, count=1)
catalog_size = header['catalog_size'][0]
# read catalog
catalog = numpy.fromfile(catalog_file, dtype=cls._get_catalog_dtype(version), count=catalog_size)
# pass the catalog_id to the constructor in case we want to store it in a database
u3_catalog = cls(filename=filename, data=catalog, catalog_id=catalog_id, **kwargs)
u3_catalog.dtype = dtype
yield u3_catalog
# handle compressed file by decompressing inline
elif get_file_extension(filename) == 'gz':
with gzip.open(filename, 'rb') as catalog_file:
number_simulations_in_set = numpy.frombuffer(catalog_file.read(4), dtype='>i4')[0]
for catalog_id in range(number_simulations_in_set):
version = numpy.frombuffer(catalog_file.read(2), dtype='>i2')[0]
dtype = cls._get_header_dtype(version)
header_bytes = dtype.itemsize
header = numpy.frombuffer(catalog_file.read(header_bytes), dtype=dtype)
catalog_size = header['catalog_size'][0]
catalog_dtype = cls._get_catalog_dtype(version)
event_bytes = catalog_dtype.itemsize
catalog = numpy.frombuffer(
catalog_file.read(event_bytes*catalog_size),
dtype=catalog_dtype,
count=catalog_size
)
u3_catalog = cls(filename=filename, data=catalog, catalog_id=catalog_id, **kwargs)
u3_catalog.dtype = dtype
yield u3_catalog
@classmethod
def load_catalog(cls, filename, loader=None, **kwargs):
version = numpy.fromfile(filename, dtype=">i2", count=1)[0]
header = numpy.fromfile(filename, dtype=cls._get_header_dtype(version), count=1)
catalog_size = header['catalog_size'][0]
# assign dtype to make sure that it is bound to the instance of the class
dtype = cls._get_catalog_dtype(version)
event_list = numpy.fromfile(filename, dtype=cls._get_catalog_dtype(version), count=catalog_size)
new_class = cls(filename=filename, data=event_list, **kwargs)
new_class.dtype = dtype
return new_class
def get_csep_format(self):
n = len(self.catalog)
# allocate array for csep catalog
csep_catalog = numpy.zeros(n, dtype=CSEPCatalog.dtype)
for i, event in enumerate(self.catalog):
csep_catalog[i] = (i,
event['origin_time'],
event['latitude'],
event['longitude'],
event['depth'],
event['magnitude'])
return CSEPCatalog(data=csep_catalog, catalog_id=self.catalog_id, filename=self.filename, format='csep',
name=self.name, region=self.region, compute_stats=self.compute_stats, filters=self.filters,
metadata=self.metadata, date_accessed=self.date_accessed)
@staticmethod
def _get_catalog_dtype(version):
"""
Get catalog dtype from version number
Args:
version:
Returns:
"""
if version == 1:
dtype = numpy.dtype([("rupture_id", ">i4"),
("parent_id", ">i4"),
("generation", ">i2"),
("origin_time", ">i8"),
("latitude", ">f8"),
("longitude", ">f8"),
("depth", ">f8"),
("magnitude", ">f8"),
("dist_to_parent", ">f8"),
("erf_index", ">i4"),
("fss_index", ">i4"),
("grid_node_index", ">i4")])
elif version >= 2:
dtype = numpy.dtype([("rupture_id", ">i4"),
("parent_id", ">i4"),
("generation", ">i2"),
("origin_time", ">i8"),
("latitude", ">f8"),
("longitude", ">f8"),
("depth", ">f8"),
("magnitude", ">f8"),
("dist_to_parent", ">f8"),
("erf_index", ">i4"),
("fss_index", ">i4"),
("grid_node_index", ">i4"),
("etas_k", ">f8")])
else:
raise ValueError("unknown catalog version, cannot read catalog.")
return dtype
@staticmethod
def _get_header_dtype(version):
if version == 1 or version == 2:
dtype = numpy.dtype([("catalog_size", ">i4")])
elif version >= 3:
dtype = numpy.dtype([("num_orignal_ruptures", ">i4"),
("seed", ">i8"),
("index", ">i4"),
("hist_rupt_start_id", ">i4"),
("hist_rupt_end_id", ">i4"),
("trig_rupt_start_id", ">i4"),
("trig_rupt_end_id", ">i4"),
("sim_start_epoch", ">i8"),
("sim_end_epoch", ">i8"),
("num_spont", ">i4"),
("num_supraseis", ">i4"),
("min_mag", ">f8"),
("max_mag", ">f8"),
("catalog_size", ">i4")])
else:
raise ValueError("unknown catalog version, cannot parse catalog header.")
return dtype
# helps to parse time-strings
def _none_or_datetime(value):
if isinstance(value, datetime.datetime):
return value
if value is not None:
format = parse_string_format(value)
value = strptime_to_utc_datetime(value, format=format)
return value