#!/usr/bin/env python
"""
This module contains functions that can be used to run Newmark-based
mechanistic landslide models.
"""
# stdlib imports
import os.path
# import warnings
import collections
import tempfile
import shutil
# local imports
from mapio.shake import ShakeGrid
from mapio.gdal import GDALGrid
from mapio.reader import read, get_file_geodict
from mapio.geodict import GeoDict
from gfail.spatial import trim_ocean2
# third party imports
import numpy as np
def godt2008(
shakefile,
config,
uncertfile=None,
saveinputs=False,
displmodel=None,
bounds=None,
slopediv=100.0,
codiv=10.0,
numstd=None,
trimfile=None,
):
"""
    This function runs the Godt and others (2008) global method for a given
    ShakeMap. The Factor of Safety is calculated using infinite slope analysis
    assuming dry conditions. The method applies a threshold Newmark
    displacement and estimates areal coverage by repeating the calculation
    for each slope quantile.
Args:
shakefile (str): Path to shakemap xml file.
        config (ConfigObj): Parsed config file containing the inputs
            required to run the model.
uncertfile (str): Path to shakemap uncertainty xml file (optional).
        saveinputs (bool): Whether to also return the model input layers;
            False (default) returns only the model output (one layer).
        displmodel (str): Newmark displacement regression model to use
            (defaults to ``'J_PGA_M'`` if not specified here or in the
            config file).
            * ``'J_PGA'`` -- PGA-based model, equation 6 from
              Jibson (2007).
            * ``'J_PGA_M'`` -- PGA and M-based model, equation 7 from
              Jibson (2007).
            * ``'RS_PGA_M'`` -- PGA and M-based model from Rathje and
              Saygili (2009).
            * ``'RS_PGA_PGV'`` -- PGA and PGV-based model, equation 6
              from Saygili and Rathje (2008).
bounds (dict): Optional dictionary with keys 'xmin', 'xmax', 'ymin',
'ymax' that defines a subset of the shakemap area to compute.
slopediv (float): Divide slope by this number to get slope in degrees
(Verdin datasets need to be divided by 100).
codiv (float): Divide cohesion input layer by this number
(For Godt method, need to divide by 10 because that is how it was
calibrated).
numstd (float): Number of (+/-) standard deviations to use if
uncertainty is computed (uncertfile must be supplied).
        trimfile (str): Path to a shapefile of Earth's land masses used to
            trim offshore areas from the model output.
    Returns:
        dict: Dictionary of output layers (and input layers if
        saveinputs=True), where each entry is a dictionary of the form:

        .. code-block:: python

            {
                'grid': mapio grid2D object,
                'label': 'label for colorbar and top line of subtitle',
                'type': 'output or input to model',
                'description': {
                    'name': 'short reference of model',
                    'longref': 'full model reference',
                    'units': 'units of output',
                    'shakemap': 'information about shakemap used',
                    'event_id': 'shakemap event id',
                    'parameters': 'dictionary of model parameters used'
                }
            }
Raises:
        NameError: when unable to parse the config correctly (probably a
            formatting issue in the config file) or when unable to find the
            shakefile (ShakeMap filepath) -- these cause the program to end.
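    Example:
        A minimal sketch of a typical call (the file paths and config
        contents shown are hypothetical):

        .. code-block:: python

            from configobj import ConfigObj

            # hypothetical input files
            config = ConfigObj('config_godt_2008.ini')
            maplayers = godt2008('shakemap_grid.xml', config,
                                 saveinputs=True)
            prob = maplayers['model']['grid'].getData()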
"""
# TODO:
# - Add 'all' -- averages Dn from all four equations, add term to
# convert PGA and PGV to Ia and use other equations, add Ambraseys and
# Menu (1988) option.
# Empty refs
slopesref = "unknown"
slopelref = "unknown"
cohesionlref = "unknown"
cohesionsref = "unknown"
frictionsref = "unknown"
frictionlref = "unknown"
modellref = "unknown"
modelsref = "unknown"
# See if trimfile exists
if trimfile is not None:
if not os.path.exists(trimfile):
            print(
                "Specified trimfile does not exist: %s\n"
                "Ocean will not be trimmed" % trimfile
            )
trimfile = None
        elif os.path.splitext(trimfile)[1] != ".shp":
            print("trimfile must be a shapefile, ocean will not be trimmed")
            trimfile = None
# Parse config
try: # May want to add error handling so if refs aren't given, just
# includes unknown
slopefilepath = config["godt_2008"]["layers"]["slope"]["filepath"]
cohesionfile = config["godt_2008"]["layers"]["cohesion"]["file"]
frictionfile = config["godt_2008"]["layers"]["friction"]["file"]
thick = float(config["godt_2008"]["parameters"]["thick"])
uwt = float(config["godt_2008"]["parameters"]["uwt"])
nodata_cohesion = float(config["godt_2008"]["parameters"]["nodata_cohesion"])
nodata_friction = float(config["godt_2008"]["parameters"]["nodata_friction"])
dnthresh = float(config["godt_2008"]["parameters"]["dnthresh"])
fsthresh = float(config["godt_2008"]["parameters"]["fsthresh"])
acthresh = float(config["godt_2008"]["parameters"]["acthresh"])
try:
slopemin = float(config["godt_2008"]["parameters"]["slopemin"])
except Exception:
slopemin = 0.01
print("No slopemin found in config file, using 0.01 deg for slope minimum")
except Exception as e:
raise NameError("Could not parse configfile, %s" % e)
if displmodel is None:
try:
displmodel = config["godt_2008"]["parameters"]["displmodel"]
except Exception:
print("No regression model specified, using default of J_PGA_M")
displmodel = "J_PGA_M"
# TO DO: ADD ERROR CATCHING ON UNITS, MAKE SURE THEY ARE WHAT THEY SHOULD
# BE FOR THIS MODEL
try: # Try to fetch source information from config
modelsref = config["godt_2008"]["shortref"]
modellref = config["godt_2008"]["longref"]
slopesref = config["godt_2008"]["layers"]["slope"]["shortref"]
slopelref = config["godt_2008"]["layers"]["slope"]["longref"]
cohesionsref = config["godt_2008"]["layers"]["cohesion"]["shortref"]
cohesionlref = config["godt_2008"]["layers"]["cohesion"]["longref"]
frictionsref = config["godt_2008"]["layers"]["friction"]["shortref"]
frictionlref = config["godt_2008"]["layers"]["friction"]["longref"]
except Exception:
print("Was not able to retrieve all references from config file. Continuing")
# Figure out how/if need to cut anything
geodict = ShakeGrid.getFileGeoDict(shakefile) # , adjust='res')
if bounds is not None: # Make sure bounds are within ShakeMap Grid
if geodict.xmin < geodict.xmax: # only if signs are not opposite
if (
geodict.xmin > bounds["xmin"]
or geodict.xmax < bounds["xmax"]
or geodict.ymin > bounds["ymin"]
or geodict.ymax < bounds["ymax"]
):
print(
"Specified bounds are outside shakemap area, using "
"ShakeMap bounds instead."
)
bounds = None
if bounds is not None:
tempgdict = GeoDict.createDictFromBox(
bounds["xmin"],
bounds["xmax"],
bounds["ymin"],
bounds["ymax"],
geodict.dx,
geodict.dy,
inside=False,
)
# If Shakemap geodict crosses 180/-180 line, fix geodict so things
# don't break
if geodict.xmin > geodict.xmax:
if tempgdict.xmin < 0:
geodict._xmin -= 360.0
else:
geodict._xmax += 360.0
geodict = geodict.getBoundsWithin(tempgdict)
slpfile = os.path.join(slopefilepath, "slope_min.bil")
basegeodict = get_file_geodict(slpfile)
if basegeodict == geodict:
sampledict = geodict
else:
sampledict = basegeodict.getBoundsWithin(geodict)
# Do we need to subdivide baselayer?
if "divfactor" in config["godt_2008"].keys():
divfactor = float(config["godt_2008"]["divfactor"])
if divfactor != 1.0:
            # Adjust sampledict so everything will be resampled (cut one
            # cell off each edge so the new grid stays inside the bounds)
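            # e.g., with divfactor = 2.0, each input cell becomes a 2 x 2
            # block of cells at half the original grid spacing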
newxmin = (
sampledict.xmin
- sampledict.dx / 2.0
+ sampledict.dx / (2.0 * divfactor)
+ sampledict.dx
)
newymin = (
sampledict.ymin
- sampledict.dy / 2.0
+ sampledict.dy / (2.0 * divfactor)
+ sampledict.dy
)
newxmax = (
sampledict.xmax
+ sampledict.dx / 2.0
- sampledict.dx / (2.0 * divfactor)
- sampledict.dx
)
newymax = (
sampledict.ymax
+ sampledict.dy / 2.0
- sampledict.dy / (2.0 * divfactor)
- sampledict.dy
)
newdx = sampledict.dx / divfactor
newdy = sampledict.dy / divfactor
sampledict = GeoDict.createDictFromBox(
newxmin, newxmax, newymin, newymax, newdx, newdy, inside=True
)
tmpdir = tempfile.mkdtemp()
# Load in ShakeMap and get new geodictionary
temp = ShakeGrid.load(shakefile) # , adjust='res')
pga = temp.getLayer("pga")
pga = pga.interpolate2(sampledict, method="linear")
pgv = temp.getLayer("pgv")
pgv = pgv.interpolate2(sampledict, method="linear")
# sampledict = pga.getGeoDict()
t2 = temp.getEventDict()
M = t2["magnitude"]
event_id = t2["event_id"]
shakedict = temp.getShakeDict()
del temp
# read in uncertainty if present
if uncertfile is not None:
try:
temp = ShakeGrid.load(uncertfile) # , adjust='res')
uncertpga = temp.getLayer("stdpga")
uncertpga = uncertpga.interpolate2(sampledict, method="linear")
            uncertpgv = temp.getLayer("stdpgv")
            uncertpgv = uncertpgv.interpolate2(sampledict, method="linear")
except Exception:
print("Could not read uncertainty file, ignoring uncertainties")
uncertfile = None
if numstd is None:
numstd = 1.0
    # Read in the seven slope quantile files and divide by slopediv to get
    # slope in degrees (the Verdin input files store slope multiplied by 100)
slopes = []
quantiles = [
"slope_min.bil",
"slope10.bil",
"slope30.bil",
"slope50.bil",
"slope70.bil",
"slope90.bil",
"slope_max.bil",
]
for quant in quantiles:
tmpslp = read(os.path.join(slopefilepath, quant), samplegeodict=sampledict)
tgd = tmpslp.getGeoDict()
if tgd != sampledict:
raise Exception("Input layers are not aligned to same geodict")
else:
slopes.append(tmpslp.getData() / slopediv)
slopestack = np.dstack(slopes)
# Change any zero slopes to a very small number to avoid dividing by
# zero later
slopestack[slopestack == 0] = 1e-8
# Read in the cohesion and friction files and duplicate layers so they
# are same shape as slope structure
tempco = read(
cohesionfile,
samplegeodict=sampledict,
resample=True,
method="nearest",
doPadding=True,
padValue=np.nan,
)
tempco = tempco.getData()[:, :, np.newaxis] / codiv
cohesion = np.repeat(tempco, 7, axis=2)
cohesion[cohesion == -999.9] = nodata_cohesion
cohesion = np.nan_to_num(cohesion)
cohesion[cohesion == 0] = nodata_cohesion
tempfric = read(
frictionfile,
samplegeodict=sampledict,
resample=True,
method="nearest",
doPadding=True,
padValue=np.nan,
)
tempfric = tempfric.getData().astype(float)[:, :, np.newaxis]
friction = np.repeat(tempfric, 7, axis=2)
friction[friction == -9999] = nodata_friction
friction = np.nan_to_num(friction)
friction[friction == 0] = nodata_friction
    # Compute the static Factor of Safety using dry infinite slope analysis
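    # FS = cohesion / (uwt * thick * sin(slope)) + tan(friction) / tan(slope)
    # with cohesion in kPa, unit weight uwt in kN/m^3, thickness in m, and
    # slope and friction angles in degrees (converted to radians below)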
FS = cohesion / (uwt * thick * np.sin(slopestack * (np.pi / 180.0))) + np.tan(
friction * (np.pi / 180.0)
) / np.tan(slopestack * (np.pi / 180.0))
FS[FS < fsthresh] = fsthresh
# Compute critical acceleration, in g
# This gives ac in g, equations that multiply by g give ac in m/s2
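    # Ac = (FS - 1) * sin(slope) is the Newmark sliding-block relation for
    # a planar (infinite slope) failure surface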
Ac = (FS - 1) * np.sin(slopestack * (np.pi / 180.0)).astype(float)
Ac[Ac < acthresh] = acthresh
# Get PGA in g (PGA is %g in ShakeMap, convert to g)
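    # (repeat across the third axis so the shaking layers align with the
    # seven slope quantile layers)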
PGA = np.repeat(pga.getData()[:, :, np.newaxis] / 100.0, 7, axis=2).astype(float)
if "PGV" in displmodel: # Load in PGV also, in cm/sec
PGV = np.repeat(pgv.getData()[:, :, np.newaxis], 7, axis=2).astype(float)
else:
PGV = None
if uncertfile is not None:
stdpga = np.repeat(uncertpga.getData()[:, :, np.newaxis], 7, axis=2).astype(
float
)
stdpgv = np.repeat(uncertpgv.getData()[:, :, np.newaxis], 7, axis=2).astype(
float
)
        # ShakeMap sigma layers are standard deviations of the natural log
        # of ground motion, so shift in log space to estimate
        # PGA +/- numstd standard deviations
PGAmin = np.exp(np.log(PGA * 100) - numstd * stdpga) / 100
PGAmax = np.exp(np.log(PGA * 100) + numstd * stdpga) / 100
if "PGV" in displmodel:
PGVmin = np.exp(np.log(PGV) - numstd * stdpgv)
PGVmax = np.exp(np.log(PGV) + numstd * stdpgv)
else:
PGVmin = None
PGVmax = None
# Ignore errors so still runs when Ac > PGA, just leaves nan instead
# of crashing.
np.seterr(invalid="ignore")
Dn, logDnstd, logtype = NMdisp(Ac, PGA, model=displmodel, M=M, PGV=PGV)
if uncertfile is not None:
Dnmin, logDnstdmin, logtype = NMdisp(
Ac, PGAmin, model=displmodel, M=M, PGV=PGVmin
)
Dnmax, logDnstdmax, logtype = NMdisp(
Ac, PGAmax, model=displmodel, M=M, PGV=PGVmax
)
PROB = Dn.copy()
PROB[PROB < dnthresh] = 0.0
PROB[PROB >= dnthresh] = 1.0
PROB = np.sum(PROB, axis=2)
if uncertfile is not None:
PROBmin = Dnmin.copy()
PROBmin[PROBmin <= dnthresh] = 0.0
PROBmin[PROBmin > dnthresh] = 1.0
PROBmin = np.sum(PROBmin, axis=2)
PROBmax = Dnmax.copy()
PROBmax[PROBmax <= dnthresh] = 0.0
PROBmax[PROBmax > dnthresh] = 1.0
PROBmax = np.sum(PROBmax, axis=2)
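    # Convert the count of slope quantiles exceeding the displacement
    # threshold into the corresponding areal coverage (proportion of each
    # cell affected) following Godt and others (2008)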
PROB[PROB == 1.0] = 0.01
PROB[PROB == 2.0] = 0.10
PROB[PROB == 3.0] = 0.30
PROB[PROB == 4.0] = 0.50
PROB[PROB == 5.0] = 0.70
PROB[PROB == 6.0] = 0.90
PROB[PROB == 7.0] = 0.99
if uncertfile is not None:
PROBmin[PROBmin == 1.0] = 0.01
PROBmin[PROBmin == 2.0] = 0.10
PROBmin[PROBmin == 3.0] = 0.30
PROBmin[PROBmin == 4.0] = 0.50
PROBmin[PROBmin == 5.0] = 0.70
PROBmin[PROBmin == 6.0] = 0.90
PROBmin[PROBmin == 7.0] = 0.99
PROBmax[PROBmax == 1.0] = 0.01
PROBmax[PROBmax == 2.0] = 0.10
PROBmax[PROBmax == 3.0] = 0.30
PROBmax[PROBmax == 4.0] = 0.50
PROBmax[PROBmax == 5.0] = 0.70
PROBmax[PROBmax == 6.0] = 0.90
PROBmax[PROBmax == 7.0] = 0.99
if slopemin is not None:
PROB[slopestack[:, :, 6] <= slopemin] = 0.0
# uncert too
if uncertfile is not None:
PROBmin[slopestack[:, :, 6] <= slopemin] = 0.0
PROBmax[slopestack[:, :, 6] <= slopemin] = 0.0
    # Turn the outputs and inputs into grids and put them in the maplayers
    # dictionary
maplayers = collections.OrderedDict()
shakedetail = "%s_ver%s" % (shakedict["shakemap_id"], shakedict["shakemap_version"])
description = {
"name": modelsref,
"longref": modellref,
"units": "Proportion of Area Affected",
"shakemap": shakedetail,
"event_id": event_id,
"parameters": {
"displmodel": displmodel,
"thickness_m": thick,
"unitwt_kNm3": uwt,
"dnthresh_cm": dnthresh,
"acthresh_g": acthresh,
"fsthresh": fsthresh,
"modeltype": "Landslide",
},
}
PROBgrid = GDALGrid(PROB, sampledict)
if trimfile is not None:
PROBgrid = trim_ocean2(PROBgrid, trimfile)
maplayers["model"] = {
"grid": PROBgrid,
"label": "Landslide - Proportion of Area Affected",
"type": "output",
"description": description,
}
if uncertfile is not None:
PROBmingrid = GDALGrid(PROBmin, sampledict)
PROBmaxgrid = GDALGrid(PROBmax, sampledict)
if trimfile is not None:
PROBmingrid = trim_ocean2(PROBmingrid, trimfile)
PROBmaxgrid = trim_ocean2(PROBmaxgrid, trimfile)
maplayers["modelmin"] = {
"grid": PROBmingrid,
"label": "Landslide Probability-%1.2fstd" % numstd,
"type": "output",
"description": description,
}
maplayers["modelmax"] = {
"grid": PROBmaxgrid,
"label": "Landslide Probability+%1.2fstd" % numstd,
"type": "output",
"description": description,
}
    if saveinputs:
maplayers["pga"] = {
"grid": GDALGrid(PGA[:, :, 0], sampledict),
"label": "PGA (g)",
"type": "input",
"description": {"units": "g", "shakemap": shakedetail},
}
if "PGV" in displmodel:
maplayers["pgv"] = {
"grid": GDALGrid(PGV[:, :, 0], sampledict),
"label": "PGV (cm/s)",
"type": "input",
"description": {"units": "cm/s", "shakemap": shakedetail},
}
maplayers["minFS"] = {
"grid": GDALGrid(np.min(FS, axis=2), sampledict),
"label": "Min Factor of Safety",
"type": "input",
"description": {"units": "unitless"},
}
maplayers["max slope"] = {
"grid": GDALGrid(slopestack[:, :, -1], sampledict),
"label": r"Maximum slope ($^\circ$)",
"type": "input",
"description": {
"units": "degrees",
"name": slopesref,
"longref": slopelref,
},
}
maplayers["cohesion"] = {
"grid": GDALGrid(cohesion[:, :, 0], sampledict),
"label": "Cohesion (kPa)",
"type": "input",
"description": {
"units": "kPa (adjusted)",
"name": cohesionsref,
"longref": cohesionlref,
},
}
maplayers["friction angle"] = {
"grid": GDALGrid(friction[:, :, 0], sampledict),
"label": r"Friction angle ($^\circ$)",
"type": "input",
"description": {
"units": "degrees",
"name": frictionsref,
"longref": frictionlref,
},
}
if uncertfile is not None:
maplayers["pgamin"] = {
"grid": GDALGrid(PGAmin[:, :, 0], sampledict),
"label": "PGA - %1.2fstd (g)" % numstd,
"type": "input",
"description": {"units": "g", "shakemap": shakedetail},
}
maplayers["pgamax"] = {
"grid": GDALGrid(PGAmax[:, :, 0], sampledict),
"label": "PGA + %1.2fstd (g)" % numstd,
"type": "input",
"description": {"units": "g", "shakemap": shakedetail},
}
if "PGV" in displmodel:
if uncertfile is not None:
maplayers["pgvmin"] = {
"grid": GDALGrid(PGVmin[:, :, 0], sampledict),
"label": "PGV - %1.2fstd (cm/s)" % numstd,
"type": "input",
"description": {"units": "cm/s", "shakemap": shakedetail},
}
maplayers["pgvmax"] = {
"grid": GDALGrid(PGVmax[:, :, 0], sampledict),
"label": "PGV + %1.2fstd (cm/s)" % numstd,
"type": "input",
"description": {"units": "cm/s", "shakemap": shakedetail},
}
shutil.rmtree(tmpdir)
return maplayers
def NMdisp(Ac, PGA, model="J_PGA", M=None, PGV=None):
"""
    Compute Newmark displacement from critical acceleration and PGA (plus
    M and/or PGV, depending on the model) using published regression models.
Args:
Ac (array): NxM array of critical accelerations in units of g.
PGA (array): NxM Array of PGA values in units of g.
model (str):
            * ``'J_PGA'`` -- PGA-only model from Jibson (2007), equation 6.
              Applicable for magnitude range of dataset (5.3-7.6).
            * ``'J_PGA_M'`` -- PGA- and M-based Newmark displacement model
              from Jibson (2007), equation 7. Applicable for magnitude
              range of dataset (5.3-7.6).
* ``'RS_PGA_M'`` -- PGA and M-based Newmark displacement model
from Rathje and Saygili (2009).
* ``'RS_PGA_PGV'`` -- PGA and PGV-based model from Saygili and
Rathje (2008) -- eq 6.
* ``'BT_PGA_M'`` -- PGA and M-based model from Bray and
Travasarou (2007) assuming natural fundamental period of
sliding mass Ts = 0 (eq 6).
M (float): Magnitude -- only needed for models with M in the name.
        PGV (array): NxM array of PGV values in units of cm/sec -- only
            needed for models with PGV in the name.
Returns:
tuple: (Dn, logDnstd, logtype) where:
* Dn: Newmark displacement in cm
* logDnstd: Log of standard deviation of Dn
* logtype: Type of log used in logDnstd (log10 or ln)
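    Example:
        A minimal sketch of a call (the input values are made up):

        .. code-block:: python

            import numpy as np

            # made-up example values
            Ac = np.array([[0.1, 0.2]])   # critical acceleration (g)
            PGA = np.array([[0.4, 0.4]])  # peak ground acceleration (g)
            Dn, logDnstd, logtype = NMdisp(Ac, PGA, model='J_PGA_M', M=7.0)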
"""
# Deal with non-array inputs
    if isinstance(Ac, (float, int)):
        flag = 1
    else:
        flag = 0
# Ignore errors so still runs when Ac > PGA, just leaves nan instead of
# crashing
np.seterr(invalid="ignore")
if model == "J_PGA":
C1 = 0.215 # additive constant in newmark displacement calculation
C2 = 2.341 # first exponential constant
C3 = -1.438 # second exponential constant
Dn = np.array(
10.0 ** (C1 + np.log10(((1 - Ac / PGA) ** C2) * (Ac / PGA) ** C3))
)
Dn[np.isnan(Dn)] = 0.0
logDnstd = np.ones(np.shape(Dn)) * 0.51
logtype = "log10"
elif model == "J_PGA_M":
if M is None:
raise Exception("M (magnitude) not found, cannot use RS_PGA_M model")
else:
C1 = -2.71 # additive constant in newmark displacement calculation
C2 = 2.335 # first exponential constant
C3 = -1.478 # second exponential constant
C4 = 0.424
Dn = np.array(
10.0
** (C1 + np.log10(((1 - Ac / PGA) ** C2) * (Ac / PGA) ** C3) + C4 * M)
)
Dn[np.isnan(Dn)] = 0.0
logDnstd = np.ones(np.shape(Dn)) * 0.454
logtype = "log10"
elif model == "RS_PGA_M":
if M is None:
raise Exception("You must enter a value for M to use the RS_PGA_M model")
C1 = 4.89
C2 = -4.85
C3 = -19.64
C4 = 42.49
C5 = -29.06
C6 = 0.72
C7 = 0.89
# Equation from Saygili and Rathje (2008)/Rathje and Saygili (2009)
Dn = np.array(
np.exp(
C1
+ C2 * (Ac / PGA)
+ C3 * (Ac / PGA) ** 2
+ C4 * (Ac / PGA) ** 3
+ C5 * (Ac / PGA) ** 4
+ C6 * np.log(PGA)
+ C7 * (M - 6)
)
)
Dn[np.isnan(Dn)] = 0.0
logDnstd = 0.732 + 0.789 * (Ac / PGA) - 0.539 * (Ac / PGA) ** 2
logtype = "ln"
elif model == "RS_PGA_PGV":
if PGV is None:
raise Exception("You must enter a value for M to use the RS_PGA_PGV model")
C1 = -1.56
C2 = -4.58
C3 = -20.84
C4 = 44.75
C5 = -30.50
C6 = -0.64
C7 = 1.55
# Equation from Saygili and Rathje (2008)/Rathje and Saygili (2009)
Dn = np.array(
np.exp(
C1
+ C2 * (Ac / PGA)
+ C3 * (Ac / PGA) ** 2
+ C4 * (Ac / PGA) ** 3
+ C5 * (Ac / PGA) ** 4
+ C6 * np.log(PGA)
+ C7 * np.log(PGV)
)
)
Dn[np.isnan(Dn)] = 0.0
logDnstd = 0.405 + 0.524 * (Ac / PGA)
logtype = "ln"
elif model == "BT_PGA_M":
if M is None:
raise Exception("You must enter a value for M to use the BT_PGA_M model")
Dn = np.array(
np.exp(
-0.22
- 2.83 * np.log(Ac)
- 0.333 * (np.log(Ac)) ** 2
+ 0.566 * np.log(Ac) * np.log(PGA)
+ 3.04 * np.log(PGA)
- 0.244 * (np.log(PGA)) ** 2
+ 0.278 * (M - 7.0)
)
)
Dn[np.isnan(Dn)] = 0.0
        # Bray and Travasarou (2007) report sigma = 0.66 in natural log units
        logDnstd = np.ones(np.shape(Dn)) * 0.66
        logtype = "ln"
    else:
        raise Exception("Unknown displacement model: %s" % model)
if flag == 1:
Dn = float(Dn)
logDnstd = float(logDnstd)
return Dn, logDnstd, logtype