# stdlib imports
import pathlib
import tempfile
import shutil
import logging
import collections
from timeit import default_timer as timer
# third party imports
import numpy as np
from mapio.reader import read, get_file_geodict
from mapio.writer import write
from mapio.shake import ShakeGrid
from mapio.geodict import GeoDict
from mapio.grid2d import Grid2D
# local imports
from gfail.spatial import trim_ocean2
COEFFS = {}
TERMS = {}
TERMLAYERS = {}
SHAKE_LAYERS = []
[docs]class LogisticModelBase(object):
"""TODO needs documentation
Args:
object (_type_): _description_
Raises:
Exception: _description_
Exception: _description_
Returns:
_type_: _description_
"""
COEFFS = {}
TERMS = {}
TERMLAYERS = {}
SHAKELAYERS = []
do_coverage = False
prob_units = None
notes = ""
slopefile = None
nonzero = None
slopemin = "none"
slopemax = "none"
def __init__(
self,
shakefile,
config,
bounds=None,
uncertfile=None,
trimfile=None,
slopefile=None,
saveinputs=False,
):
"""Initialize LogisticModelBase object.
Args:
shakefile (str): Path to ShakeMap grid.xml file.
config (dict): Dict like object as a result of reading in "logbase"
format INI files.
bounds (dict): Fields are 'xmin', 'xmax', 'ymin', 'ymax'.
uncertfile (str): Path to ShakeMap uncertainty.xml file.
trimfile (str): Path to shapefile to use for masking ocean pixels.
slopefile (str): File containing slope data to be used for slope
masking.
saveinputs (bool): Save input layer grids with model output.
Notes: All input data grids are loaded in one at a time, and saved to
a temporary folder that is deleted when the object is deleted. The
file names loaded are stored in the self.layers dictionary.
"""
logging.info("Initializing model...")
self.tempdir = tempfile.mkdtemp()
self.config = config
self.bounds = bounds
self.uncertfile = uncertfile
self.trimfile = trimfile
self.saveinputs = saveinputs
self.modeltype = config["gfetype"]
logging.info("Loading raw ShakeMap...")
start_shake = timer()
self.raw_shake_grid = ShakeGrid.load(shakefile, adjust="res")
self.get_sample_dict() # sets self.sampledict
logging.info(f"Loading resampled ShakeMap with geodict: {self.sampledict}")
self.shake_grid = ShakeGrid.load(
shakefile,
samplegeodict=self.sampledict,
resample=True,
doPadding=True,
method="linear",
adjust="res",
)
logging.info(f"Load shake elapsed: {timer() - start_shake:1.2f}")
start_error = timer()
if self.uncertfile is not None:
self.error_grid = ShakeGrid.load(
self.uncertfile,
samplegeodict=self.sampledict,
resample=True,
doPadding=True,
method="linear",
adjust="res",
)
else:
self.error_grid = None
logging.info(f"Load uncertainty elapsed: {timer() - start_error:1.2f}")
logging.info("Loaded resampled ShakeMap.")
self.shakedict = self.shake_grid.getShakeDict()
self.eventdict = self.shake_grid.getEventDict()
# make sure that if this model requires slope parameters, a slope file
# has either been passed in or configured as a layer
logging.info("Checking slope parameters...")
if slopefile is not None:
self.slopefile = slopefile
elif "slopefile" in config:
self.slopefile = config["slopefile"]
elif "slope" in config["layers"]:
self.slopefile = config["layers"]["slope"]["file"]
else:
self.slopefile = None
if self.slopefile is not None:
self.set_slope_params(config)
logging.info("Checking references, units, interpolations...")
(self.modelrefs, self.longrefs, self.shortrefs) = self.validate_refs(config)
self.units = self.validate_units(config)
if self.prob_units is None:
# this normally will be defined in a subclass
self.prob_units = "Probability of any occurrence"
self.interpolations = config["interpolations"]
logging.info("Looping over layers...")
self.layers = {}
for key, layer in config["layers"].items():
layerfile = pathlib.Path(layer["file"])
logging.info(f'Reading {layer["file"]}...')
interp = self.interpolations[key]
grid = read(
layerfile,
samplegeodict=self.sampledict,
method=interp,
resample=True,
doPadding=True,
interp_approach="rasterio",
)
self.pre_process(key, grid)
outfile = pathlib.Path(self.tempdir, f"{key}.cdf")
logging.info(f"Writing sampled layer {key}...")
start_write = timer()
write(grid, outfile, "netcdf")
logging.info(f"write {key} elapsed: {timer() - start_write:1.2f}")
self.layers[key] = outfile
for key in self.SHAKELAYERS:
shake_grid = self.shake_grid.getLayer(key)
# shake_grid = raw_shake_grid.interpolate2(self.sampledict, method="linear")
outfile = pathlib.Path(self.tempdir, f"{key}.cdf")
logging.info(f"Writing sampled layer {key}...")
start_write = timer()
write(shake_grid, outfile, "netcdf")
logging.info(f"write {key} elapsed: {timer() - start_write:1.2f}")
self.layers[key] = outfile
if uncertfile is not None:
start_write = timer()
self.save_uncertainty_layers()
logging.info(f"write uncertainty elapsed: {timer() - start_write:1.2f}")
if "slope" not in config["layers"] and self.slopefile is not None:
self.read_slope()
if self.slopefile is not None:
# establish the indices of the slope grid
# where values are > slopemin and <= slopemax
self.get_slope_non_zero()
[docs] def read_slope(self):
interp = "linear"
if "slope" in self.interpolations:
interp = self.interpolations["slope"]
fdict = get_file_geodict(self.slopefile)
if fdict.isAligned(self.sampledict):
grid = read(self.slopefile, samplegeodict=self.sampledict)
else:
grid = read(
self.slopefile,
samplegeodict=self.sampledict,
method=interp,
resample=True,
doPadding=True,
interp_approach="rasterio",
)
outfile = pathlib.Path(self.tempdir, "slope.cdf")
logging.info("Writing sampled layer slope...")
write(grid, outfile, "netcdf")
self.layers["slope"] = outfile
[docs] def get_slope_non_zero(self):
slopemin = self.slopemin
slopemax = self.slopemax
if self.slopemin == "none":
nx = self.sampledict.nx
ny = self.sampledict.ny
self.nonzero = np.ones((ny, nx), dtype=bool)
return
slopefile = self.layers["slope"]
slope = read(slopefile)._data
# do whatever conversions are necessary to get slope in degrees.
# this method is implemented in the child class.
slope = self.modify_slope(slope)
self.nonzero = np.array([(slope > slopemin) & (slope <= slopemax)])
self.nonzero = self.nonzero[0, :, :]
del slope
[docs] def modify_slope(self, slope):
"""This method should be implemented by child classes."""
pass
[docs] def set_slope_params(self, config):
# Find slope thresholds, if applicable
self.slopemin = "none"
self.slopemax = "none"
if "slopefile" in config:
try:
self.slopemin = float(config["slopemin"])
self.slopemax = float(config["slopemax"])
except BaseException:
print(
"Could not find slopemin and/or slopemax in config, "
"limits. No slope thresholds will be applied."
)
self.slopemin = "none"
self.slopemax = "none"
[docs] def get_units(self, layer):
"""Get units for an input layer.
Args:
layer (str): name of layer.
Returns:
str: units.
"""
try:
# should work for everything except ground motion layers
units = self.units[layer]
except BaseException:
if "pga" in layer.lower():
units = "%g"
elif "pgv" in layer.lower():
units = "cm/s"
elif "mmi" in layer.lower():
units = "intensity"
else:
units = ""
return units
[docs] def validate_units(self, cmodel):
"""Validate model units.
Args:
cmodel (dict): Sub-dictionary from config for specific model.
Returns:
dict: Model units.
"""
units = {}
for key in cmodel["layers"].keys():
if "units" in cmodel["layers"][key]:
units[key] = cmodel["layers"][key]["units"]
else:
raise Exception("No unit string configured for layer %s" % key)
return units
[docs] def validate_refs(self, cmodel):
"""Validate references for models and layers.
Args:
cmodel (dict): Sub-dictionary from config for specific model.
Returns:
tuple: (modelrefs, longrefs, shortrefs) where:
* modelrefs: dictionary of citation information for model
keys='longref', 'shortref'
* shortrefs: dictionary containing short reference for each
input layer
* longrefs: dictionary containing full references for each
input layer
"""
longrefs = {}
shortrefs = {}
modelrefs = {}
for key in cmodel["layers"].keys():
if "longref" in cmodel["layers"][key]:
longrefs[key] = cmodel["layers"][key]["longref"]
else:
print("No longref provided for layer %s" % key)
longrefs[key] = "unknown"
if "shortref" in cmodel["layers"][key]:
shortrefs[key] = cmodel["layers"][key]["shortref"]
else:
print("No shortref provided for layer %s" % key)
shortrefs[key] = "unknown"
try:
modelrefs["longref"] = cmodel["longref"]
except BaseException:
print("No model longref provided")
modelrefs["longref"] = "unknown"
try:
modelrefs["shortref"] = cmodel["shortref"]
except BaseException:
print("No model shortref provided")
modelrefs["shortref"] = "unknown"
return modelrefs, longrefs, shortrefs
[docs] def save_uncertainty_layers(self):
for shakelayer in self.SHAKELAYERS:
key = f"std{shakelayer}"
method = "linear"
if key in self.config["interpolations"].keys():
method = self.interpolations[key]
if self.error_grid is not None:
gm_grid = self.error_grid.getLayer(key)
gm_grid = gm_grid.interpolate2(self.sampledict, method=method)
filename = pathlib.Path(self.tempdir) / f"{key}.cdf"
self.layers[key] = filename
write(gm_grid, filename, "netcdf")
[docs] def pre_process(self, key, grid):
"""This method should be implemented by child classes."""
pass
[docs] def get_sample_dict(self):
baselayer = self.config["baselayer"]
basefile = self.config["layers"][baselayer]["file"]
basedict = get_file_geodict(basefile)
shake_dict = self.raw_shake_grid.getGeoDict()
intersection = shake_dict.getIntersection(basedict)
for layername, layer in self.config["layers"].items():
layerfile = layer["file"]
layerdict = get_file_geodict(layerfile)
intersection = layerdict.getIntersection(intersection)
self.sampledict = intersection
# self.sampledict = None
# shake_geodict = self.shake_grid.getGeoDict()
# if 'baselayer' in self.config:
# baselayer = self.config['baselayer']
# fname = self.config['layers'][baselayer]['file']
# basefile = pathlib.Path(fname)
# base_geodict = get_file_geodict(basefile)
# if self.bounds is None:
# self.sampledict = base_geodict.getBoundsWithin(shake_geodict)
# else:
# mod_geodict = GeoDict.createDictFromBox(self.bounds['xmin'],
# self.bounds['xmax'],
# self.bounds['ymin'],
# self.bounds['ymax'],
# base_geodict.dx,
# base_geodict.dy
# )
# if not shake_geodict.contains(mod_geodict):
# msg = ('Desired sample bounds must be within the bounds '
# 'of the ShakeMap.')
# raise Exception(msg)
# # intersection should have the resolution of the base geodict
# intersection = shake_geodict.getIntersection(mod_geodict)
# self.sampledict = mod_geodict.getBoundsWithin(intersection)
# else:
# self.sampledict = shake_geodict.copy()
# we may have some config that tells us to resample even the base layer
# by some factor of resolution.
self.subdivide()
[docs] def subdivide(self):
# Do we need to subdivide baselayer?
if "divfactor" in self.config.keys():
divfactor = float(self.config["divfactor"])
if divfactor != 1.0:
# adjust sampledict so everything will be resampled
newxmin = (
self.sampledict.xmin
- self.sampledict.dx / 2.0
+ self.sampledict.dx / (2.0 * divfactor)
)
newymin = (
self.sampledict.ymin
- self.sampledict.dy / 2.0
+ self.sampledict.dy / (2.0 * divfactor)
)
newxmax = (
self.sampledict.xmax
+ self.sampledict.dx / 2.0
- self.sampledict.dx / (2.0 * divfactor)
)
newymax = (
self.sampledict.ymax
+ self.sampledict.dy / 2.0
- self.sampledict.dy / (2.0 * divfactor)
)
newdx = self.sampledict.dx / divfactor
newdy = self.sampledict.dy / divfactor
if np.abs(newxmax) > 180.0:
newxmax = np.sign(newxmax) * 180.0
if np.abs(newxmin) > 180.0:
newxmin = np.sign(newxmin) * 180.0
self.sampledict = GeoDict.createDictFromBox(
newxmin, newxmax, newymin, newymax, newdx, newdy, inside=True
)
[docs] def calculate(self):
"""Calculate the probability, and sigma (if possible).
Returns:
dict: Must contain at least one sub dictionary with
key "model". That dictionary should look like:
{'grid': Grid2D object contanining probability values,
'label': String used for display label
'type': 'output',
'description': String description of model.
}
This method uses an "accumulator" array, and loads in data layers
from the temporary directory one *term* at a time. As some terms can
be a combination of layers, this may mean that multiple grids are
loaded into memory simultaneously, in addition to the accumulator grid.
Layer grids are deleted from memory once they have been added to the
accumulator array.
"""
nx = self.sampledict.nx
ny = self.sampledict.ny
# dictionary to hold output
rdict = collections.OrderedDict()
X = np.ones((ny, nx)) * self.COEFFS["b0"]
for term, operation in self.TERMS.items():
logging.info(f"Reading layer {term}")
start_term = timer()
coeff = self.COEFFS[term]
layers = self.TERMLAYERS[term].split(",")
layers = [layer.strip() for layer in layers]
for layer in layers:
layerfile = self.layers[layer]
loadstr = f'{layer} = read("{layerfile}",apply_nan=True)'
globaldict = {f"{layer}": layer, "read": read}
logging.info(f"Reading sampled layer {layer}...")
exec(loadstr, globaldict)
# this should be a reference...
globals()[layer] = globaldict[layer]
if self.saveinputs:
units = self.get_units(layer)
rdict[layer] = {
"grid": globaldict[layer],
"label": f"{layer} ({units})",
"type": "input",
"description": {"units": units},
}
if layer in self.shortrefs:
rdict[layer]["description"]["name"] = self.shortrefs[layer]
if layer in self.longrefs:
rdict[layer]["description"]["longref"] = self.longrefs[layer]
try:
msg = f"Executing layer operation {operation} * {coeff}..."
logging.info(msg)
# replace the macro MW with the magnitude value from
# the shakemap
magstr = f'{self.eventdict["magnitude"]:.1f}'
operation = operation.replace("MW", magstr)
X += eval(operation) * coeff
except Exception as e:
msg = (
f"{str(e)}: Unable to calculate term {term}: "
f"operation {operation}*{coeff}."
)
raise Exception(msg)
for layer in layers:
del globals()[layer]
logging.info(f"Read term elapsed: {timer() - start_term:1.2f}")
logging.info("Calculating probability...")
# save off the X grid for potential use by
# uncertainty calculations
outfile = pathlib.Path(self.tempdir, "X.cdf")
logging.info("Writing intermediate layer X...")
grid = Grid2D(data=X, geodict=self.sampledict)
start_writex = timer()
write(grid, outfile, "netcdf")
logging.info(f"wite X elapsed: {timer() - start_writex:1.2f}")
self.layers["X"] = outfile
P = 1 / (1 + np.exp(-X))
del X
start_modify = timer()
P = self.modify_probability(P)
logging.info(f"modify prob elapsed: {timer() - start_modify:1.2f}")
# save off the intermediate P grid for potential use by
# uncertainty calculations
outfile = pathlib.Path(self.tempdir, "P.cdf")
logging.info("Writing intermediate layer P...")
grid = Grid2D(data=P, geodict=self.sampledict)
start_writep = timer()
write(grid, outfile, "netcdf")
logging.info(f"wite P elapsed: {timer() - start_writep:1.2f}")
self.layers["P"] = outfile
sigma_grid = None
if self.uncertfile is not None:
start_uncertainty = timer()
sigma_grid = self.calculate_uncertainty()
logging.info(f"uncertainty elapsed: {timer() - start_uncertainty:1.2f}")
# because non-coverage P grid is used for uncertainty,
# we cannot convert to areal coverage until that is complete.
if self.do_coverage:
start_coverage = timer()
P = self.calculate_coverage(P)
logging.info(f"coverage elapsed: {timer() - start_coverage:1.2f}")
# apply slope cutoffs
if self.nonzero is not None:
P = P * self.nonzero
# create Grid2D object for P
p_grid = Grid2D(data=P, geodict=self.sampledict)
description = self.get_description()
# trim off ocean pixels if user wanted that
if self.trimfile is not None:
start_trim = timer()
# Turn all offshore cells to nan
p_grid = trim_ocean2(p_grid, self.trimfile)
if sigma_grid is not None:
sigma_grid = trim_ocean2(sigma_grid, self.trimfile)
logging.info(f"trim elapsed: {timer() - start_trim:1.2f}")
rdict["model"] = {
"grid": p_grid,
"label": "%s estimate - %s"
% (self.modeltype.capitalize(), self.prob_units.title()),
"type": "output",
"description": description,
}
if sigma_grid is not None:
rdict["std"] = {
"grid": sigma_grid,
"label": (
"%s estimate - %s (std)"
% (self.modeltype.capitalize(), self.prob_units.title())
),
"type": "output",
"description": description,
}
return rdict
[docs] def get_description(self):
shakedetail = "%s_ver%s" % (
self.shakedict["shakemap_id"],
self.shakedict["shakemap_version"],
)
description = {
"name": self.modelrefs["shortref"],
"longref": self.modelrefs["longref"],
"units": self.units,
"shakemap": shakedetail,
"event_id": self.eventdict["event_id"],
"parameters": {
"slopemin": self.slopemin,
"slopemax": self.slopemax,
"modeltype": self.modeltype,
"notes": self.notes,
},
}
if "vs30max" in self.config.keys():
description["vs30max"] = float(self.config["vs30max"])
if "minpgv" in self.config.keys():
description["minpgv"] = float(self.config["minpgv"])
return description
[docs] def calculate_coverage(self, P):
"""This method should be implemented by child classes."""
pass
[docs] def calculate_uncertainty(self):
"""This method should be implemented by child classes."""
pass
[docs] def modify_probability(self, P):
"""This method should be implemented by child classes."""
pass
def __del__(self):
if pathlib.Path(self.tempdir).exists():
shutil.rmtree(self.tempdir)