"""
Process a ShakeMap, based on the configuration and data found in
shake_data.hdf, and produce output in shake_result.hdf.
"""
import os
import argparse
import inspect
import os.path
import time as time
import copy
from time import gmtime, strftime
import shutil
from datetime import date
import json
import numpy as np
import numpy.ma as ma
from openquake.hazardlib import imt
import openquake.hazardlib.const as oqconst
import fiona
import cartopy.io.shapereader as shpreader
from shapely.geometry import shape
import concurrent.futures as cf
from impactutils.rupture.point_rupture import PointRupture
from impactutils.rupture.distance import Distance, get_distance, get_distance_measures
from impactutils.io.smcontainers import ShakeMapOutputContainer
from impactutils.rupture import constants
# local imports
from mapio.geodict import GeoDict
from mapio.grid2d import Grid2D
from .base import CoreModule, Contents
from shakelib.sites import Sites
from shakelib.multigmpe import MultiGMPE
from shakelib.virtualipe import VirtualIPE
from shakelib.utils.utils import get_extent, thirty_sec_min, thirty_sec_max
from shakelib.utils.imt_string import oq_to_file
from shakelib.utils.containers import ShakeMapInputContainer
from shakemap.utils.config import get_config_paths
from shakemap.utils.utils import get_object_from_config
from shakemap._version import get_versions
from shakemap.utils.generic_amp import get_generic_amp_factors
from shakemap.c.clib import make_sigma_matrix, geodetic_distance_fast, make_sd_array
# from shakemap.utils.exception import TerminateShakeMap
from shakelib.directivity.rowshandel2013 import Rowshandel2013
#
# default_stddev_inter: This is a stand-in for tau when the gmpe set
# doesn't provide it. It is an educated guess based
# on the NGA-west, Akkar et al, and BC Hydro gmpes.
# It's not perfect, but probably isn't too far off.
# It is only used when the GMPE(s) don't provide a
# breakdown of the uncertainty terms. When used,
# this value is multiplied by the total standard
# deviation to get tau. The square of tau is then
# subtracted from the squared total stddev and the
# square root of the result is then used as the
# within-event stddev (phi).
#
SM_CONSTS = {"default_stddev_inter": 0.55, "default_stddev_inter_mmi": 0.55}
[docs]class DataFrame:
def __init__(self):
df = None # noqa
imts = None # noqa
sx = None # noqa
dx = None # noqa
[docs]class ModelModule(CoreModule):
"""
model -- Interpolate ground motions to a grid or list of locations.
"""
command_name = "model"
targets = [r"products/shake_result\.hdf"]
dependencies = [("shake_data.hdf", True)]
no_seismic = False
no_macroseismic = False
no_rupture = False
use_simulations = False
rock_vs30 = 760.0
soil_vs30 = 180.0
def __init__(self, eventid):
super(ModelModule, self).__init__(eventid)
self.contents = Contents(None, None, eventid)
#
# Set up a bunch of dictionaries that will be keyed to IMTs
#
self.nominal_bias = {} # holds an average bias for each IMT
self.psd_raw = {} # raw phi (intra-event stddev) of the output points
self.psd = {} # phi (intra-event stddev) of the output points
self.tsd = {} # tau (inter-event stddev) of the output points
#
# These are arrays (keyed by IMT) of the station data that will be
# used to compute the bias and do the interpolation, they are filled
# in the _fillDataArrays method
#
self.sta_per_ix = {}
self.sta_lons_rad = {}
self.sta_lats_rad = {}
self.sta_resids = {}
self.sta_phi = {}
self.sta_tau = {}
self.sta_sig_extra = {}
self.sta_rrups = {}
#
# These are useful matrices that we compute in the bias function
# that we can reuse in the MVN function
#
self.T_D = {}
self.cov_WD_WD_inv = {}
self.mu_H_yD = {}
self.cov_HH_yD = {}
#
# Some variables and arrays used in both the bias and MVN functions
#
self.no_native_flag = {}
self.imt_types = {}
self.len_types = {}
self.imt_Y_ind = {}
#
# These hold the main outputs of the MVN
#
self.outgrid = {} # Holds the interpolated output arrays keyed by IMT
self.outsd = {} # Holds the standard deviation arrays keyed by IMT
self.outphi = {} # Holds the intra-event standard deviation arrays
self.outtau = {} # Holds the inter-event standard deviation arrays
#
# Places to put the results for the attenuation plots
#
self.atten_rock_mean = {}
self.atten_soil_mean = {}
self.atten_rock_sd = {}
self.atten_soil_sd = {}
[docs] def parseArgs(self, arglist):
"""
Set up the object to accept the --no_seismic, --no_macroseismic,
and --no_rupture flags.
"""
parser = argparse.ArgumentParser(
prog=self.__class__.command_name, description=inspect.getdoc(self.__class__)
)
parser.add_argument(
"-s",
"--no_seismic",
action="store_true",
help="Exclude instrumental seismic data from "
"the processing, ignoring any that may exist in "
"the input directory.",
)
parser.add_argument(
"-m",
"--no_macroseismic",
action="store_true",
help="Exclude macroseismic data from the "
"processing, ignoring any that may exist in the "
"input directory.",
)
parser.add_argument(
"-r",
"--no_rupture",
action="store_true",
help="Exclude a rupture model from the "
"processing, ignoring any that may exist in the "
"input directory.",
)
#
# This line should be in any modules that overrides this
# one. It will collect up everything after the current
# modules options in args.rem, which should be returned
# by this function. Note: doing parser.parse_known_args()
# will not work as it will suck up any later modules'
# options that are the same as this one's.
#
parser.add_argument("rem", nargs=argparse.REMAINDER, help=argparse.SUPPRESS)
args = parser.parse_args(arglist)
if args.no_seismic:
self.no_seismic = True
if args.no_macroseismic:
self.no_macroseismic = True
if args.no_rupture:
self.no_rupture = True
return args.rem
[docs] def execute(self):
"""
Interpolate ground motions to a grid or list of locations.
Raises:
NotADirectoryError: When the event data directory does not exist.
FileNotFoundError: When the the shake_data HDF file does not exist.
"""
self.logger.debug("Starting model...")
# ---------------------------------------------------------------------
# Make the input container and extract the config
# ---------------------------------------------------------------------
self._setInputContainer()
self.config = self.ic.getConfig()
self.sim_imt_paths = [x for x in self.ic.getArrays() if "simulations" in x]
if len(self.sim_imt_paths):
self.use_simulations = True
self.config["use_simulations"] = self.use_simulations
# ---------------------------------------------------------------------
# Clear away results from previous runs
# ---------------------------------------------------------------------
self._clearProducts()
# ---------------------------------------------------------------------
# Retrieve a bunch of config options and set them as attributes
# ---------------------------------------------------------------------
self._setConfigOptions()
# ---------------------------------------------------------------------
# Instantiate the gmpe, gmice, and ipe
# Here we make a placeholder gmpe so that we can make the
# rupture and distance contexts; later we'll make the
# IMT-specific gmpes
# ---------------------------------------------------------------------
self.default_gmpe = MultiGMPE.__from_config__(self.config)
self.gmice = get_object_from_config("gmice", "modeling", self.config)
if (
self.config["ipe_modules"][self.config["modeling"]["ipe"]][0]
== "VirtualIPE"
):
pgv_imt = imt.from_string("PGV")
ipe_gmpe = MultiGMPE.__from_config__(self.config, filter_imt=pgv_imt)
self.ipe = VirtualIPE.__fromFuncs__(ipe_gmpe, self.gmice)
else:
ipe = get_object_from_config("ipe", "modeling", self.config)
if "vs30" not in ipe.REQUIRES_SITES_PARAMETERS:
# REQUIRES_SITES_PARAMETERS is now a frozen set so we have to
# work around it
tmpset = set(ipe.REQUIRES_SITES_PARAMETERS)
tmpset.add("vs30")
ipe.REQUIRES_SITES_PARAMETERS = frozenset(tmpset)
ipe.DEFINED_FOR_INTENSITY_MEASURE_COMPONENT = (
oqconst.IMC.GREATER_OF_TWO_HORIZONTAL
)
self.ipe = MultiGMPE.__from_list__([ipe], [1.0])
ipe_sd_types = self.ipe.DEFINED_FOR_STANDARD_DEVIATION_TYPES
if len(ipe_sd_types) == 1:
self.ipe_total_sd_only = True
self.ipe_stddev_types = [oqconst.StdDev.TOTAL]
else:
self.ipe_total_sd_only = False
self.ipe_stddev_types = [
oqconst.StdDev.TOTAL,
oqconst.StdDev.INTER_EVENT,
oqconst.StdDev.INTRA_EVENT,
]
# ---------------------------------------------------------------------
# Get the rupture object and rupture context
# ---------------------------------------------------------------------
self.rupture_obj = self.ic.getRuptureObject()
# If the --no_rupture flag is used, switch to a PointRupture
if self.no_rupture:
self.rupture_obj = PointRupture(self.rupture_obj._origin)
if self.config["modeling"]["mechanism"] is not None:
self.rupture_obj._origin.setMechanism(
mech=self.config["modeling"]["mechanism"]
)
self.rx = self.rupture_obj.getRuptureContext([self.default_gmpe])
# TODO: figure out how to not have to do this
if self.rx.rake is None:
self.rx.rake = 0
#
# Set up the coordinates for the attenuation curves
#
repi = np.logspace(-1, 3, 200)
pt = self.rupture_obj._origin.getHypo()
self.atten_coords = {
"lons": np.full_like(repi, pt.x),
"lats": np.array([pt.y + x / 111.0 for x in repi]),
}
self.point_source = PointRupture(self.rupture_obj._origin)
# ---------------------------------------------------------------------
# The output locations: either a grid or a list of points
# ---------------------------------------------------------------------
self.logger.debug("Setting output params...")
self._setOutputParams()
landmask = self._getLandMask()
# We used to do this, but we've decided not to. Leaving the code
# in place in case we change our minds.
# if landmask is not None and np.all(landmask):
# raise TerminateShakeMap("Mapped area is entirely water")
# ---------------------------------------------------------------------
# If the gmpe doesn't break down its stardard deviation into
# within- and between-event terms, we need to handle things
# somewhat differently.
# ---------------------------------------------------------------------
gmpe_sd_types = self.default_gmpe.DEFINED_FOR_STANDARD_DEVIATION_TYPES
if len(gmpe_sd_types) == 1:
self.gmpe_total_sd_only = True
self.gmpe_stddev_types = [oqconst.StdDev.TOTAL]
else:
self.gmpe_total_sd_only = False
self.gmpe_stddev_types = [
oqconst.StdDev.TOTAL,
oqconst.StdDev.INTER_EVENT,
oqconst.StdDev.INTRA_EVENT,
]
# ---------------------------------------------------------------------
# Are we going to include directivity?
# ---------------------------------------------------------------------
# Config option?
dir_conf = self.config["modeling"]["directivity"]
# Is the rupture not a point source?
rup_check = not isinstance(self.rupture_obj, PointRupture)
if dir_conf and rup_check:
self.do_directivity = True
# The following attribute will be used to store a list of tuples,
# where each tuple will contain the 1) result of the directivity
# model (for the periods defined by Rowshandel2013) and 2) the
# associated distance context. The distance context is needed
# within the _gmas function for figuring out which of the results
# should be used when combining it with the GMPE result. We store
# the pre-defined period first and interpolate later because there
# is some optimization to doing it this way (some of the
# calculation is period independent).
self.dir_results = []
# But if we want to save the results that were actually used for
# each IMT, so we use a dictionary. This uses keys that are
# the same as self.outgrid.
self.dir_output = {}
else:
self.do_directivity = False
# ---------------------------------------------------------------------
# Station data: Create DataFrame(s) with the input data:
# df1 for instrumented data
# df2 for non-instrumented data
# ---------------------------------------------------------------------
self.logger.debug("Setting data frames...")
self._setDataFrames()
# ---------------------------------------------------------------------
# Add the predictions, etc. to the data frames
# ---------------------------------------------------------------------
self.logger.debug("Populating data frames...")
self._populateDataFrames()
# ---------------------------------------------------------------------
# Try to make all the derived IMTs possible from MMI (if we have MMI)
# ---------------------------------------------------------------------
self._deriveIMTsFromMMI()
# ---------------------------------------------------------------------
# Now make MMI from the station data where possible
# ---------------------------------------------------------------------
self._deriveMMIFromIMTs()
self.logger.debug("Getting combined IMTs")
# ---------------------------------------------------------------------
# Get the combined set of input and output IMTs, their periods,
# and an index dictionary, then make the cross-correlation function
# ---------------------------------------------------------------------
if self.use_simulations:
#
# Ignore what is in the configuration and make maps only for the
# IMTs that are in the set of simulations (and MMI).
#
self.combined_imt_set = set([x.split("/")[-1] for x in self.sim_imt_paths])
self.sim_df = {}
for imtstr in self.combined_imt_set:
dset, _ = self.ic.getArray(["simulations"], imtstr)
self.sim_df[imtstr] = dset
self.combined_imt_set |= set(["MMI"])
else:
self.combined_imt_set = self.imt_out_set.copy()
for ndf in self.dataframes:
self.combined_imt_set |= getattr(self, ndf).imts
self.imt_per, self.imt_per_ix = _get_period_arrays(self.combined_imt_set)
self.ccf = get_object_from_config("ccf", "modeling", self.config, self.imt_per)
self.logger.debug("Doing bias")
# ---------------------------------------------------------------------
# Do the bias for all of the input and output IMTs. Hold on
# to some of the products that will be used for the interpolation.
# The "raw" values are the stddevs that have not been inflated by
# the additional sigma (if any) of the point-source to finite
# rupture approximation.
# ---------------------------------------------------------------------
# ---------------------------------------------------------------------
# Do some prep, the bias, and the directivity prep
# ---------------------------------------------------------------------
self._fillDataArrays()
self._computeBias()
self._computeDirectivityPredictionLocations()
# ---------------------------------------------------------------------
# Now do the MVN with the intra-event residuals
# ---------------------------------------------------------------------
self.logger.debug("Doing MVN...")
if self.max_workers > 0:
with cf.ThreadPoolExecutor(max_workers=self.max_workers) as ex:
results = ex.map(self._computeMVN, self.imt_out_set)
list(results) # Check threads for possible exceptions, etc.
else:
for imt_str in self.imt_out_set:
self._computeMVN(imt_str)
self._applyCustomMask()
# ---------------------------------------------------------------------
# Output the data and metadata
# ---------------------------------------------------------------------
product_path = os.path.join(self.datadir, "products")
if not os.path.isdir(product_path):
os.mkdir(product_path)
oc = ShakeMapOutputContainer.create(
os.path.join(product_path, "shake_result.hdf")
)
# ---------------------------------------------------------------------
# Might as well stick the whole config in the result
# ---------------------------------------------------------------------
oc.setConfig(self.config)
# ---------------------------------------------------------------------
# We're going to need masked arrays of the output grids later, so
# make them now.
# ---------------------------------------------------------------------
moutgrid = self._getMaskedGrids(landmask)
# ---------------------------------------------------------------------
# Get the info dictionary that will become info.json, and
# store it in the output container
# ---------------------------------------------------------------------
info = self._getInfo(moutgrid)
oc.setMetadata(info)
# ---------------------------------------------------------------------
# Add the rupture JSON as a text string
# ---------------------------------------------------------------------
oc.setRuptureDict(self.rupture_obj._geojson)
# ---------------------------------------------------------------------
# Fill the station dictionary for stationlist.json and add it to
# the output container
# ---------------------------------------------------------------------
sjdict = self._fillStationJSON()
oc.setStationDict(sjdict)
# ---------------------------------------------------------------------
# Add the output grids or points to the output; include some
# metadata.
# ---------------------------------------------------------------------
if self.do_grid:
self._storeGriddedData(oc)
else:
self._storePointData(oc)
self._storeAttenuationData(oc)
oc.close()
self.ic.close()
self.contents.addFile(
"shakemapHDF",
"Comprehensive ShakeMap HDF Data File",
"HDF file containing all ShakeMap results.",
"shake_result.hdf",
"application/x-bag",
)
# -------------------------------------------------------------------------
# End execute()
# -------------------------------------------------------------------------
def _setInputContainer(self):
"""
Open the input container and set
the event's current data directory.
Raises:
NotADirectoryError: When the event data directory does not exist.
FileNotFoundError: When the the shake_data HDF file does not exist.
"""
#
# Find the shake_data.hdf file
#
_, data_path = get_config_paths()
datadir = os.path.join(data_path, self._eventid, "current")
if not os.path.isdir(datadir):
raise NotADirectoryError(f"{datadir} is not a valid directory.")
datafile = os.path.join(datadir, "shake_data.hdf")
if not os.path.isfile(datafile):
raise FileNotFoundError(f"{datafile} does not exist.")
self.datadir = datadir
self.ic = ShakeMapInputContainer.load(datafile)
def _clearProducts(self):
"""
Function to delete an event's products directory if it exists.
Returns:
nothing
"""
products_path = os.path.join(self.datadir, "products")
if os.path.isdir(products_path):
shutil.rmtree(products_path, ignore_errors=True)
pdl_path = os.path.join(self.datadir, "pdl")
if os.path.isdir(pdl_path):
shutil.rmtree(pdl_path, ignore_errors=True)
def _setConfigOptions(self):
"""
Pull various useful configuration options out of the config
dictionary.
Returns:
nothing
"""
# ---------------------------------------------------------------------
# Processing parameters
# ---------------------------------------------------------------------
self.max_workers = self.config["system"]["max_workers"]
# ---------------------------------------------------------------------
# Do we apply the generic amplification factors?
# ---------------------------------------------------------------------
self.apply_gafs = self.config["modeling"]["apply_generic_amp_factors"]
# ---------------------------------------------------------------------
# Bias parameters
# ---------------------------------------------------------------------
self.do_bias = self.config["modeling"]["bias"]["do_bias"]
self.bias_max_range = self.config["modeling"]["bias"]["max_range"]
self.bias_max_mag = self.config["modeling"]["bias"]["max_mag"]
self.bias_max_dsigma = self.config["modeling"]["bias"]["max_delta_sigma"]
# ---------------------------------------------------------------------
# Outlier parameters
# ---------------------------------------------------------------------
self.do_outliers = self.config["data"]["outlier"]["do_outliers"]
self.outlier_deviation_level = self.config["data"]["outlier"]["max_deviation"]
self.outlier_max_mag = self.config["data"]["outlier"]["max_mag"]
self.outlier_valid_stations = self.config["data"]["outlier"]["valid_stations"]
# ---------------------------------------------------------------------
# These are the IMTs we want to make
# ---------------------------------------------------------------------
self.imt_out_set = set(self.config["interp"]["imt_list"])
# ---------------------------------------------------------------------
# The x and y resolution of the output grid
# ---------------------------------------------------------------------
self.smdx = self.config["interp"]["prediction_location"]["xres"]
self.smdy = self.config["interp"]["prediction_location"]["yres"]
self.nmax = self.config["interp"]["prediction_location"]["nmax"]
# ---------------------------------------------------------------------
# Get the Vs30 file name
# ---------------------------------------------------------------------
self.vs30default = self.config["data"]["vs30default"]
self.vs30_file = self.config["data"]["vs30file"]
if not self.vs30_file:
self.vs30_file = None
self.mask_file = self.config["data"]["maskfile"]
if not self.mask_file:
self.mask_file = None
def _setOutputParams(self):
"""
Set variables dealing with the output grid or points
Returns:
nothing
"""
if self.use_simulations:
self.do_grid = True
imt_grp = self.sim_imt_paths[0]
groups = imt_grp.split("/")
myimt = groups[-1]
del groups[-1]
data, geodict = self.ic.getArray(groups, myimt)
self.W = geodict["xmin"]
self.E = geodict["xmax"]
self.S = geodict["ymin"]
self.N = geodict["ymax"]
self.smdx = geodict["dx"]
self.smdy = geodict["dy"]
self.sites_obj_out = Sites.fromBounds(
self.W,
self.E,
self.S,
self.N,
self.smdx,
self.smdy,
defaultVs30=self.vs30default,
vs30File=self.vs30_file,
padding=True,
resample=True,
)
self.smnx, self.smny = self.sites_obj_out.getNxNy()
self.sx_out = self.sites_obj_out.getSitesContext()
lons, lats = np.meshgrid(self.sx_out.lons, self.sx_out.lats)
self.sx_out.lons = lons.copy()
self.sx_out.lats = lats.copy()
self.lons = lons.flatten()
self.lats = lats.flatten()
self.depths = np.zeros_like(lats)
dist_obj_out = Distance.fromSites(
self.default_gmpe, self.sites_obj_out, self.rupture_obj
)
elif (
self.config["interp"]["prediction_location"]["file"]
and self.config["interp"]["prediction_location"]["file"] != "None"
):
#
# FILE: Open the file and get the output points
#
self.do_grid = False
in_sites = np.genfromtxt(
self.config["interp"]["prediction_location"]["file"],
autostrip=True,
unpack=True,
dtype=[np.double, np.double, np.double, "<U80"],
)
if np.size(in_sites) == 0:
self.logger.info("Points file is empty; nothing to do")
return
elif np.size(in_sites) == 1:
lons, lats, vs30, idents = in_sites.item()
self.idents = [idents]
else:
try:
lons, lats, vs30, self.idents = zip(*in_sites)
except Exception:
lons, lats, vs30, self.idents = zip(in_sites)
self.lons = np.array(lons).reshape(1, -1)
self.lats = np.array(lats).reshape(1, -1)
self.vs30 = np.array(vs30).reshape(1, -1)
self.depths = np.zeros_like(self.lats)
self.W = thirty_sec_min(np.min(self.lons))
self.E = thirty_sec_max(np.max(self.lons))
self.S = thirty_sec_min(np.min(self.lats))
self.N = thirty_sec_max(np.max(self.lats))
self.smnx = np.size(self.lons)
self.smny = 1
dist_obj_out = Distance(
self.default_gmpe, self.lons, self.lats, self.depths, self.rupture_obj
)
self.sites_obj_out = Sites.fromBounds(
self.W,
self.E,
self.S,
self.N,
self.smdx,
self.smdy,
defaultVs30=self.vs30default,
vs30File=self.vs30_file,
padding=True,
resample=True,
)
self.sx_out = self.sites_obj_out.getSitesContext(
{"lats": self.lats, "lons": self.lons}
)
# Replace the Vs30 from the grid (or default) with the Vs30
# provided with the site list.
if np.any(self.vs30 > 0):
self.sx_out.vs30 = self.vs30
Sites._addDepthParameters(self.sx_out)
else:
#
# GRID: Figure out the grid parameters and get output points
#
self.do_grid = True
if self.config["interp"]["prediction_location"]["extent"]:
self.W, self.S, self.E, self.N = self.config["interp"][
"prediction_location"
]["extent"]
else:
self.W, self.E, self.S, self.N = get_extent(
config=self.config, ipe=self.ipe, rupture=self.rupture_obj
)
# Adjust resolution to be under nmax
self._adjustResolution()
self.sites_obj_out = Sites.fromBounds(
self.W,
self.E,
self.S,
self.N,
self.smdx,
self.smdy,
defaultVs30=self.vs30default,
vs30File=self.vs30_file,
padding=True,
resample=True,
)
self.smnx, self.smny = self.sites_obj_out.getNxNy()
self.sx_out = self.sites_obj_out.getSitesContext()
lons, lats = np.meshgrid(self.sx_out.lons, self.sx_out.lats)
self.sx_out.lons = lons.copy()
self.sx_out.lats = lats.copy()
self.lons = lons.flatten()
self.lats = lats.flatten()
self.depths = np.zeros_like(lats)
dist_obj_out = Distance.fromSites(
self.default_gmpe, self.sites_obj_out, self.rupture_obj
)
#
# TODO: This will break if the IPE needs distance measures
# that the GMPE doesn't; should make this a union of the
# requirements of both
#
self.dx_out = dist_obj_out.getDistanceContext()
#
# Set up the sites and distance contexts for the attenuation curves
#
self.atten_sx_rock = self.sites_obj_out.getSitesContext(
self.atten_coords, rock_vs30=self.rock_vs30
)
self.atten_sx_soil = self.sites_obj_out.getSitesContext(
self.atten_coords, rock_vs30=self.soil_vs30
)
self.atten_dx = Distance(
self.default_gmpe,
self.atten_coords["lons"],
self.atten_coords["lats"],
np.zeros_like(self.atten_coords["lons"]),
rupture=self.point_source,
).getDistanceContext()
self.lons_out_rad = np.radians(self.lons).flatten()
self.lats_out_rad = np.radians(self.lats).flatten()
self.flip_lons = False
if self.W > 0 and self.E < 0:
self.flip_lons = True
self.lons_out_rad[self.lons_out_rad < 0] += 2 * np.pi
def _setDataFrames(self):
"""
Extract the StationList object from the input container and
fill the DataFrame class and keep a list of dataframes.
- df1 holds the instrumented data (PGA, PGV, SA)
- df2 holds the non-instrumented data (MMI)
"""
self.dataframes = []
try:
self.stations = self.ic.getStationList()
except AttributeError:
return
if self.stations is None:
return
for dfid, val in (("df1", True), ("df2", False)):
if dfid == "df1" and self.no_seismic:
continue
if dfid == "df2" and self.no_macroseismic:
continue
sdf, imts = self.stations.getStationDictionary(
instrumented=val, min_nresp=self.config["data"]["min_nresp"]
)
if sdf is not None:
df = DataFrame()
df.df = sdf
df.imts = imts
setattr(self, dfid, df)
self.dataframes.append(dfid)
# Flag the stations in the bad stations list from the config
if not hasattr(self, "df1"):
return
evdt = date(
self.rupture_obj._origin.time.year,
self.rupture_obj._origin.time.month,
self.rupture_obj._origin.time.day,
)
nostart = date(1970, 1, 1)
self.df1.df["flagged"] = np.full_like(self.df1.df["lon"], 0, dtype=np.bool)
if "bad_stations" not in self.config["data"]:
return
for sid, dates in self.config["data"]["bad_stations"].items():
ondate, offdate = dates.split(":")
year, month, day = map(int, ondate.split("-"))
ondt = date(year, month, day)
if offdate:
year, month, day = map(int, offdate.split("-"))
offdt = date(year, month, day)
else:
offdt = None
bad = False
if (ondt == nostart or ondt <= evdt) and (offdt is None or offdt >= evdt):
bad = True
if bad:
self.df1.df["flagged"] |= self.df1.df["id"] == sid
def _populateDataFrames(self):
"""
Make the sites and distance contexts for each dataframe then
compute the predictions for the IMTs in that dataframe.
"""
for dfid in self.dataframes:
dfn = getattr(self, dfid)
df = dfn.df
# -----------------------------------------------------------------
# Get the sites and distance contexts
# -----------------------------------------------------------------
df["depth"] = np.zeros_like(df["lon"])
lldict = {"lons": df["lon"], "lats": df["lat"]}
dfn.sx = self.sites_obj_out.getSitesContext(lldict)
dfn.sx_rock = copy.deepcopy(dfn.sx)
dfn.sx_rock.vs30 = np.full_like(dfn.sx.vs30, self.rock_vs30)
dfn.sx_soil = copy.deepcopy(dfn.sx)
dfn.sx_soil.vs30 = np.full_like(dfn.sx.vs30, self.soil_vs30)
dist_obj = Distance(
self.default_gmpe, df["lon"], df["lat"], df["depth"], self.rupture_obj
)
dfn.dx = dist_obj.getDistanceContext()
# -----------------------------------------------------------------
# Are we doing directivity?
# -----------------------------------------------------------------
if self.do_directivity is True:
self.logger.info(f"Directivity for {dfid}...")
time1 = time.time()
dir_df = Rowshandel2013(
self.rupture_obj._origin,
self.rupture_obj,
df["lat"].reshape((1, -1)),
df["lon"].reshape((1, -1)),
df["depth"].reshape((1, -1)),
dx=1.0,
T=Rowshandel2013.getPeriods(),
a_weight=0.5,
mtype=1,
)
self.dir_results.append((dir_df, dfn.dx))
directivity_time = time.time() - time1
self.logger.debug(
f"Directivity {dfid} evaluation time: {directivity_time:f} sec"
)
# -----------------------------------------------------------------
# Do the predictions and other bookkeeping for each IMT
# -----------------------------------------------------------------
imt_set = self.imt_out_set | set(dfn.imts)
for imtstr in imt_set:
oqimt = imt.from_string(imtstr)
gmpe = None
not_supported = False
if imtstr != "MMI":
try:
gmpe = MultiGMPE.__from_config__(self.config, filter_imt=oqimt)
except KeyError:
self.logger.warn(
f"Input IMT {imtstr} not supported by GMPE: ignoring"
)
not_supported = True
if not_supported:
pmean = np.full_like(df[imtstr], np.nan)
pmean_rock = np.full_like(df[imtstr], np.nan)
pmean_soil = np.full_like(df[imtstr], np.nan)
pstddev = [None] * 3
pstddev[0] = np.full_like(df[imtstr], np.nan)
pstddev[1] = np.full_like(df[imtstr], np.nan)
pstddev[2] = np.full_like(df[imtstr], np.nan)
pstddev_rock = [None] * 1
pstddev_soil = [None] * 1
pstddev_rock[0] = np.full_like(df[imtstr], np.nan)
pstddev_soil[0] = np.full_like(df[imtstr], np.nan)
else:
pmean, pstddev = self._gmas(
gmpe, dfn.sx, dfn.dx, oqimt, self.apply_gafs
)
pmean_rock, pstddev_rock = self._gmas(
gmpe, dfn.sx_rock, dfn.dx, oqimt, self.apply_gafs
)
pmean_soil, pstddev_soil = self._gmas(
gmpe, dfn.sx_soil, dfn.dx, oqimt, self.apply_gafs
)
df[imtstr + "_pred"] = pmean
df[imtstr + "_pred_sigma"] = pstddev[0]
df[imtstr + "_pred_rock"] = pmean_rock
df[imtstr + "_pred_sigma_rock"] = pstddev_rock[0]
df[imtstr + "_pred_soil"] = pmean_soil
df[imtstr + "_pred_sigma_soil"] = pstddev_soil[0]
if imtstr != "MMI":
total_only = self.gmpe_total_sd_only
tau_guess = SM_CONSTS["default_stddev_inter"]
else:
total_only = self.ipe_total_sd_only
tau_guess = SM_CONSTS["default_stddev_inter_mmi"]
if total_only:
df[imtstr + "_pred_tau"] = tau_guess * pstddev[0]
df[imtstr + "_pred_phi"] = np.sqrt(
pstddev[0] ** 2 - df[imtstr + "_pred_tau"] ** 2
)
else:
df[imtstr + "_pred_tau"] = pstddev[1]
df[imtstr + "_pred_phi"] = pstddev[2]
#
# If we're just computing the predictions of an output
# IMT, then we can skip the residual and outlier stuff
#
if imtstr not in df:
continue
#
# Compute the total residual
#
df[imtstr + "_residual"] = df[imtstr] - df[imtstr + "_pred"]
# -------------------------------------------------------------
# Do the outlier flagging if we have a fault, or we don't
# have a fault but the event magnitude is under the limit
# -------------------------------------------------------------
if self.do_outliers and (
not isinstance(self.rupture_obj, PointRupture)
or self.rx.mag <= self.outlier_max_mag
):
#
# Make a boolean array of stations that have been
# manually rehabilitated by the operator
#
is_valid = np.full(np.shape(df["id"]), False, dtype=bool)
for valid in self.outlier_valid_stations:
is_valid |= valid == df["id"]
#
# turn off nan warnings for this statement
#
np.seterr(invalid="ignore")
flagged = (
np.abs(df[imtstr + "_residual"])
> self.outlier_deviation_level * df[imtstr + "_pred_sigma"]
) & (~is_valid)
np.seterr(invalid="warn")
#
# Add NaN values to the list of outliers
#
flagged |= np.isnan(df[imtstr + "_residual"])
self.logger.debug(
"IMT: %s, flagged: %d" % (imtstr, np.sum(flagged))
)
df[imtstr + "_outliers"] = flagged
else:
#
# Not doing outliers, but should still flag NaNs
#
flagged = np.isnan(df[imtstr + "_residual"])
df[imtstr + "_outliers"] = flagged
#
# If uncertainty hasn't been set for MMI, give it
# the default value
#
if imtstr == "MMI" and all(df["MMI_sd"] == 0):
df["MMI_sd"][:] = self.config["data"]["default_mmi_stddev"]
#
# Get the lons/lats in radians while we're at it
#
df["lon_rad"] = np.radians(df["lon"])
df["lat_rad"] = np.radians(df["lat"])
if self.flip_lons:
df["lon_rad"][df["lon_rad"] < 0] += 2 * np.pi
#
# It will be handy later on to have the rupture distance
# in the dataframes
#
dd = get_distance(
["rrup"], df["lat"], df["lon"], df["depth"], self.rupture_obj
)
df["rrup"] = dd["rrup"]
if dd["rrup_var"] is not None:
df["rrup_var"] = dd["rrup_var"]
else:
df["rrup_var"] = np.zeros_like(dd["rrup"])
def _deriveIMTsFromMMI(self):
"""
Compute all the IMTs possible from MMI
TODO: This logic needs to be revisited. We should probably make what
we have to to do the CMS to make the needed output IMTs, but
for now, we're just going to use what we have and the ccf.
"""
if "df2" not in self.dataframes:
return
df2 = self.df2.df
for gmice_imt in self.gmice.DEFINED_FOR_INTENSITY_MEASURE_TYPES:
if imt.SA == gmice_imt:
iterlist = self.gmice.DEFINED_FOR_SA_PERIODS
else:
iterlist = [None]
for period in iterlist:
if period:
oqimt = gmice_imt(period)
else:
oqimt = gmice_imt()
imtstr = str(oqimt)
np.seterr(invalid="ignore")
df2[imtstr], _ = self.gmice.getGMfromMI(
df2["MMI"], oqimt, dists=df2["rrup"], mag=self.rx.mag
)
df2[imtstr][
df2["MMI"] < self.config["data"]["min_mmi_convert"]
] = np.nan
np.seterr(invalid="warn")
df2[imtstr + "_sd"] = np.full_like(
df2["MMI"], self.gmice.getMI2GMsd()[oqimt]
)
self.df2.imts.add(imtstr)
#
# Get the predictions and stddevs
#
not_supported = False
try:
gmpe = MultiGMPE.__from_config__(self.config, filter_imt=oqimt)
except KeyError:
self.logger.warn(
f"Input IMT {imtstr} not supported by GMPE: ignoring"
)
not_supported = True
if not_supported:
pmean = np.full_like(df2["MMI"], np.nan)
pstddev = [None] * 3
pstddev[0] = np.full_like(df2["MMI"], np.nan)
pstddev[1] = np.full_like(df2["MMI"], np.nan)
pstddev[2] = np.full_like(df2["MMI"], np.nan)
else:
pmean, pstddev = self._gmas(
gmpe, self.df2.sx, self.df2.dx, oqimt, self.apply_gafs
)
pmean_rock, pstddev_rock = self._gmas(
gmpe, self.df2.sx_rock, self.df2.dx, oqimt, self.apply_gafs
)
pmean_soil, pstddev_soil = self._gmas(
gmpe, self.df2.sx_soil, self.df2.dx, oqimt, self.apply_gafs
)
df2[imtstr + "_pred"] = pmean
df2[imtstr + "_pred_sigma"] = pstddev[0]
df2[imtstr + "_pred_rock"] = pmean_rock
df2[imtstr + "_pred_sigma_rock"] = pstddev_rock[0]
df2[imtstr + "_pred_soil"] = pmean_soil
df2[imtstr + "_pred_sigma_soil"] = pstddev_soil[0]
if imtstr != "MMI":
total_only = self.gmpe_total_sd_only
tau_guess = SM_CONSTS["default_stddev_inter"]
else:
total_only = self.ipe_total_sd_only
tau_guess = SM_CONSTS["default_stddev_inter_mmi"]
if total_only:
df2[imtstr + "_pred_tau"] = tau_guess * pstddev[0]
df2[imtstr + "_pred_phi"] = np.sqrt(
pstddev[0] ** 2 - df2[imtstr + "_pred_tau"] ** 2
)
else:
df2[imtstr + "_pred_tau"] = pstddev[1]
df2[imtstr + "_pred_phi"] = pstddev[2]
df2[imtstr + "_residual"] = df2[imtstr] - pmean
df2[imtstr + "_outliers"] = np.isnan(df2[imtstr + "_residual"])
df2[imtstr + "_outliers"] |= df2["MMI_outliers"]
def _deriveMMIFromIMTs(self):
"""
Make derived MMI from each of the IMTs in the input (for
which the GMICE is defined; then select the best MMI for
each station based on a list of "preferred" IMTs; also
calculate the predicted MMI and the residual.
"""
if "df1" not in self.dataframes:
return
df1 = self.df1.df
gmice_imts = [
imt.__name__ for imt in self.gmice.DEFINED_FOR_INTENSITY_MEASURE_TYPES
]
gmice_pers = self.gmice.DEFINED_FOR_SA_PERIODS
np.seterr(invalid="ignore")
df1["MMI"] = self.gmice.getPreferredMI(df1, dists=df1["rrup"], mag=self.rx.mag)
np.seterr(invalid="warn")
df1["MMI_sd"] = self.gmice.getPreferredSD()
if df1["MMI_sd"] is not None:
df1["MMI_sd"] = np.full_like(df1["lon"], df1["MMI_sd"])
for imtstr in self.df1.imts:
oqimt = imt.from_string(imtstr)
if not (
oqimt.string in gmice_imts
or (oqimt.string.startswith("SA") and "SA" in gmice_imts)
):
continue
if "SA" in oqimt.string and oqimt.period not in gmice_pers:
continue
np.seterr(invalid="ignore")
df1["derived_MMI_from_" + imtstr], _ = self.gmice.getMIfromGM(
df1[imtstr], oqimt, dists=df1["rrup"], mag=self.rx.mag
)
np.seterr(invalid="warn")
df1["derived_MMI_from_" + imtstr + "_sd"] = np.full_like(
df1[imtstr], self.gmice.getGM2MIsd()[oqimt]
)
preferred_imts = ["PGV", "PGA", "SA(1.0)", "SA(0.3)", "SA(3.0)"]
if df1["MMI"] is None:
df1["MMI"] = np.full_like(df1["lon"], np.nan)
df1["MMI_sd"] = np.full_like(df1["lon"], np.nan)
df1["MMI_outliers"] = np.full_like(df1["lon"], True, dtype=np.bool)
for imtstr in preferred_imts:
if "derived_MMI_from_" + imtstr in df1:
ixx = (np.isnan(df1["MMI"]) | df1["MMI_outliers"]) & ~(
np.isnan(df1["derived_MMI_from_" + imtstr])
| df1[imtstr + "_outliers"]
)
df1["MMI"][ixx] = df1["derived_MMI_from_" + imtstr][ixx]
df1["MMI_sd"][ixx] = df1["derived_MMI_from_" + imtstr + "_sd"][ixx]
df1["MMI_outliers"][ixx] = False
self.df1.imts.add("MMI")
#
# Get the prediction and stddevs
#
gmpe = None
pmean, pstddev = self._gmas(
gmpe, self.df1.sx, self.df1.dx, imt.from_string("MMI"), self.apply_gafs
)
pmean_rock, pstddev_rock = self._gmas(
gmpe, self.df1.sx_rock, self.df1.dx, imt.from_string("MMI"), self.apply_gafs
)
pmean_soil, pstddev_soil = self._gmas(
gmpe, self.df1.sx_soil, self.df1.dx, imt.from_string("MMI"), self.apply_gafs
)
df1["MMI" + "_pred"] = pmean
df1["MMI" + "_pred_sigma"] = pstddev[0]
df1["MMI" + "_pred_rock"] = pmean_rock
df1["MMI" + "_pred_sigma_rock"] = pstddev_rock[0]
df1["MMI" + "_pred_soil"] = pmean_soil
df1["MMI" + "_pred_sigma_soil"] = pstddev_soil[0]
if self.ipe_total_sd_only:
tau_guess = SM_CONSTS["default_stddev_inter_mmi"]
df1["MMI" + "_pred_tau"] = tau_guess * pstddev[0]
df1["MMI" + "_pred_phi"] = np.sqrt(
pstddev[0] ** 2 - df1["MMI" + "_pred_tau"] ** 2
)
else:
df1["MMI" + "_pred_tau"] = pstddev[1]
df1["MMI" + "_pred_phi"] = pstddev[2]
df1["MMI" + "_residual"] = df1["MMI"] - pmean
df1["MMI" + "_outliers"] |= np.isnan(df1["MMI" + "_residual"])
def _fillDataArrays(self):
"""
For each IMT get lists of the amplitudes that can contribute
to the bias and the interpolation. Keep lists of IMT, period
index, lons, lats, residuals, tau, phi, additional uncertainty,
and rupture distance.
"""
imtsets = {}
sasets = {}
for ndf in ("df1", "df2"):
df = getattr(self, ndf, None)
if df is None:
continue
imtsets[ndf], sasets[ndf] = _get_imt_lists(df)
for imtstr in self.imt_out_set:
#
# Fill the station arrays; here we use lists and append to
# them because it is much faster than appending to a numpy
# array; after the loop, the lists are converted to numpy
# arrays:
#
lons_rad = [] # longitude (in radians) of the input station
lats_rad = [] # latitude (in radians) of the input station
resids = [] # The residual of the input IMT
tau = [] # The between-event stddev of the input IMT
phi = [] # The within-event stddev of the input IMT
sig_extra = [] # Additional stddev of the input IMT
rrups = [] # The rupture distance of the input station
per_ix = []
for ndf in ("df1", "df2"):
tdf = getattr(self, ndf, None)
if tdf is None:
continue
sdf = tdf.df
for i in range(np.size(sdf["lon"])):
#
# Each station can provide 0, 1, or 2 IMTs:
#
for imtin in _get_nearest_imts(
imtstr, imtsets[ndf][i], sasets[ndf][i]
):
per_ix.append(self.imt_per_ix[imtin])
lons_rad.append(sdf["lon_rad"][i])
lats_rad.append(sdf["lat_rad"][i])
resids.append(sdf[imtin + "_residual"][i])
tau.append(sdf[imtin + "_pred_tau"][i])
phi.append(sdf[imtin + "_pred_phi"][i])
sig_extra.append(sdf[imtin + "_sd"][i])
rrups.append(sdf["rrup"][i])
self.sta_per_ix[imtstr] = np.array(per_ix)
self.sta_lons_rad[imtstr] = np.array(lons_rad)
self.sta_lats_rad[imtstr] = np.array(lats_rad)
if self.flip_lons:
self.sta_lons_rad[imtstr][self.sta_lons_rad[imtstr] < 0] += 2 * np.pi
self.sta_resids[imtstr] = np.array(resids).reshape((-1, 1))
self.sta_tau[imtstr] = np.array(tau).reshape((-1, 1))
self.sta_phi[imtstr] = np.array(phi).reshape((-1, 1))
self.sta_sig_extra[imtstr] = np.array(sig_extra)
self.sta_rrups[imtstr] = np.array(rrups)
def _computeBias(self):
"""
Compute a bias for all of the IMTs in the outputs
"""
for imtstr in self.imt_out_set:
time1 = time.time()
#
# Get the index of the (pseudo-) period of the output IMT
#
outperiod_ix = self.imt_per_ix[imtstr]
#
# Get the data and distance-limited residuals for computing
# the bias
#
sta_per_ix = self.sta_per_ix[imtstr]
sta_lons_rad = self.sta_lons_rad[imtstr]
sta_lats_rad = self.sta_lats_rad[imtstr]
sta_tau = self.sta_tau[imtstr]
sta_phi = self.sta_phi[imtstr]
sta_sig_extra = self.sta_sig_extra[imtstr]
dix = self.sta_rrups[imtstr] > self.bias_max_range
sta_resids_dl = self.sta_resids[imtstr].copy()
if len(dix) > 0:
sta_resids_dl[dix] = 0.0
#
# If there are no stations, bail out
#
nsta = np.size(sta_lons_rad)
if nsta == 0:
self.mu_H_yD[imtstr] = 0.0
self.cov_HH_yD[imtstr] = 0.0
self.nominal_bias[imtstr] = 0.0
nom_variance = 0.0
bias_time = time.time() - time1
#
# Write the nominal values of the bias and its stddev to log
#
self.logger.debug(
"%s: nom bias %f nom stddev %f; %d stations (time=%f sec)"
% (
imtstr,
self.nominal_bias[imtstr],
np.sqrt(nom_variance),
nsta,
bias_time,
)
)
continue
#
# Set up the IMT indices
#
imt_types = np.sort(np.unique(sta_per_ix))
len_types = len(imt_types)
self.imt_types[imtstr] = imt_types
self.len_types[imtstr] = len_types
sa_inds = {}
for i in range(len_types):
sa_inds[imt_types[i]] = np.where(sta_per_ix == imt_types[i])[0]
if outperiod_ix not in imt_types:
self.no_native_flag[imtstr] = True
imt_types_alt = np.sort(
np.concatenate([imt_types, np.array([outperiod_ix])])
)
self.imt_Y_ind[imtstr] = np.where(outperiod_ix == imt_types_alt)[0][0]
else:
self.no_native_flag[imtstr] = False
#
# Build T_D and corr_HH_D
#
if self.no_native_flag[imtstr] is False:
T_D = np.zeros((nsta, len_types))
for i in range(len_types):
T_D[sa_inds[imt_types[i]], i] = sta_tau[sa_inds[imt_types[i]], 0]
corr_HH_D = np.zeros((len_types, len_types))
ones = np.ones(len_types, dtype=np.long).reshape((-1, 1))
t1 = imt_types.reshape((1, -1)) * ones
t2 = imt_types.reshape((-1, 1)) * ones.T
self.ccf.getCorrelation(t1, t2, corr_HH_D)
else:
T_D = np.zeros((nsta, len_types + 1))
for i in range(len_types + 1):
if i == self.imt_Y_ind[imtstr]:
continue
if i < self.imt_Y_ind[imtstr]:
T_D[sa_inds[imt_types[i]], i] = sta_tau[
sa_inds[imt_types[i]], 0
]
else:
T_D[sa_inds[imt_types[i - 1]], i] = sta_tau[
sa_inds[imt_types[i - 1]], 0
]
corr_HH_D = np.zeros((len_types + 1, len_types + 1))
ones = np.ones(len_types + 1, dtype=np.long).reshape((-1, 1))
t1 = imt_types_alt.reshape((1, -1)) * ones
t2 = imt_types_alt.reshape((-1, 1)) * ones.T
self.ccf.getCorrelation(t1, t2, corr_HH_D)
#
# Make cov_WD_WD_inv (Sigma_22_inv)
#
matrix22 = np.empty((nsta, nsta), dtype=np.double)
geodetic_distance_fast(
sta_lons_rad, sta_lats_rad, sta_lons_rad, sta_lats_rad, matrix22
)
ones = np.ones(nsta, dtype=np.long).reshape((-1, 1))
t1_22 = sta_per_ix.reshape((1, -1)) * ones
t2_22 = sta_per_ix.reshape((-1, 1)) * ones.T
self.ccf.getCorrelation(t1_22, t2_22, matrix22)
sta_phi_flat = sta_phi.flatten()
make_sigma_matrix(matrix22, sta_phi_flat, sta_phi_flat)
np.fill_diagonal(matrix22, np.diag(matrix22) + sta_sig_extra ** 2)
cov_WD_WD_inv = np.linalg.pinv(matrix22)
#
# Hold on to some things we'll need later
#
self.T_D[imtstr] = T_D
self.cov_WD_WD_inv[imtstr] = cov_WD_WD_inv
#
# Compute the bias mu_H_yD and cov_HH_yD pieces
#
cov_HH_yD = np.linalg.pinv(
np.linalg.multi_dot([T_D.T, cov_WD_WD_inv, T_D])
+ np.linalg.pinv(corr_HH_D)
)
mu_H_yD = np.linalg.multi_dot(
[cov_HH_yD, T_D.T, cov_WD_WD_inv, sta_resids_dl]
)
if self.do_bias and (
not isinstance(self.rupture_obj, PointRupture)
or self.rx.mag <= self.bias_max_mag
):
self.mu_H_yD[imtstr] = mu_H_yD
else:
self.mu_H_yD[imtstr] = np.zeros_like(mu_H_yD)
self.cov_HH_yD[imtstr] = cov_HH_yD
#
# Get the nominal bias and variance
#
delta_B_yD = T_D.dot(mu_H_yD)
self.nominal_bias[imtstr] = np.mean(delta_B_yD)
sig_B_yD = np.sqrt(np.diag(np.linalg.multi_dot([T_D, cov_HH_yD, T_D.T])))
nom_variance = np.mean(sig_B_yD)
bias_time = time.time() - time1
#
# Write the nominal values of the bias and its stddev to log
#
self.logger.debug(
"%s: nom bias %f nom stddev %f; %d stations (time=%f sec)"
% (
imtstr,
self.nominal_bias[imtstr],
np.sqrt(nom_variance),
nsta,
bias_time,
)
)
def _computeDirectivityPredictionLocations(self):
"""
Figure out if we need the directivity factors, and if so, pre-calculate
them. These will be used later in _computeMVN.
"""
if self.do_directivity is True:
self.logger.info("Directivity for prediction locations...")
time1 = time.time()
# Precompute directivity at all periods
dir_out = Rowshandel2013(
self.rupture_obj._origin,
self.rupture_obj,
self.lats.reshape((1, -1)),
self.lons.reshape((1, -1)),
np.zeros_like((len(self.lats), 1)),
dx=1.0,
T=Rowshandel2013.getPeriods(),
a_weight=0.5,
mtype=1,
)
self.dir_results.append((dir_out, self.dx_out))
# Precompute directivity for the attenuation curves
dir_out = Rowshandel2013(
self.rupture_obj._origin,
self.rupture_obj,
self.atten_coords["lats"].reshape((1, -1)),
self.atten_coords["lons"].reshape((1, -1)),
np.zeros_like((len(self.atten_coords["lats"]), 1)),
dx=1.0,
T=Rowshandel2013.getPeriods(),
a_weight=0.5,
mtype=1,
)
self.dir_results.append((dir_out, self.atten_dx))
directivity_time = time.time() - time1
self.logger.debug(
f"Directivity prediction evaluation time: {directivity_time:f} sec"
)
else:
self.directivity = None
def _computeMVN(self, imtstr):
"""
Do the MVN computations
"""
self.logger.debug(f"computeMVN: doing IMT {imtstr}")
time1 = time.time()
#
# Get the index of the (pesudo-) period of the output IMT
#
outperiod_ix = self.imt_per_ix[imtstr]
#
# Get the predictions at the output points
#
oqimt = imt.from_string(imtstr)
if imtstr != "MMI":
gmpe = MultiGMPE.__from_config__(self.config, filter_imt=oqimt)
else:
gmpe = self.ipe
pout_mean, pout_sd = self._gmas(
gmpe, self.sx_out, self.dx_out, oqimt, self.apply_gafs
)
if self.use_simulations:
if imtstr == "MMI":
pout_mean = self.gmice.getPreferredMI(
self.sim_df, dists=self.dx_out.rrup, mag=self.rx.mag
)
else:
pout_mean = self.sim_df[imtstr]
#
# While we have the gmpe for this IMT, we should make
# the attenuation curves
#
x_mean, x_sd = self._gmas(
gmpe, self.atten_sx_rock, self.atten_dx, oqimt, self.apply_gafs
)
self.atten_rock_mean[imtstr] = x_mean
self.atten_rock_sd[imtstr] = x_sd[0]
x_mean, x_sd = self._gmas(
gmpe, self.atten_sx_soil, self.atten_dx, oqimt, self.apply_gafs
)
self.atten_soil_mean[imtstr] = x_mean
self.atten_soil_sd[imtstr] = x_sd[0]
#
# Get an array of the within-event standard deviations for the
# output IMT at the output points
#
if imtstr != "MMI":
total_only = self.gmpe_total_sd_only
tau_guess = SM_CONSTS["default_stddev_inter"]
else:
total_only = self.ipe_total_sd_only
tau_guess = SM_CONSTS["default_stddev_inter_mmi"]
if total_only:
self.tsd[imtstr] = tau_guess * pout_sd[0]
self.psd[imtstr] = np.sqrt(pout_sd[0] ** 2 - self.tsd[imtstr] ** 2)
self.psd_raw[imtstr] = np.sqrt(pout_sd[1] ** 2 - self.tsd[imtstr] ** 2)
else:
self.tsd[imtstr] = pout_sd[1]
self.psd[imtstr] = pout_sd[2]
self.psd_raw[imtstr] = pout_sd[5]
#
# If there are no data, just use the unbiased prediction
# and the stddev
#
nsta = np.size(self.sta_lons_rad[imtstr])
if nsta == 0:
self.outgrid[imtstr] = pout_mean
self.outsd[imtstr] = pout_sd[0]
self.outphi[imtstr] = self.psd[imtstr]
self.outtau[imtstr] = self.tsd[imtstr]
# Special stuff for the MMI priors. When we make this apply to other
# IMTs we'll need to mess around with this block
if imtstr == "MMI":
self.MMI_add_uncertainty = np.array([])
self.MMI_Sigma_HH_YD = np.array([])
self.MMI_C = np.array([])
self.MMI_sta_per_ix = np.array([])
return
pout_sd2_phi = np.power(self.psd[imtstr], 2.0)
sta_per_ix = self.sta_per_ix[imtstr]
sta_phi = self.sta_phi[imtstr]
sta_lons_rad = self.sta_lons_rad[imtstr]
sta_lats_rad = self.sta_lats_rad[imtstr]
len_output = np.size(self.tsd[imtstr])
if self.no_native_flag[imtstr] is False:
T_Y0 = np.zeros((len_output, self.len_types[imtstr]))
T_Y0[:, np.where(outperiod_ix == self.imt_types[imtstr])[0][0]] = self.tsd[
imtstr
].ravel()
else:
T_Y0 = np.zeros((len_output, self.len_types[imtstr] + 1))
T_Y0[:, self.imt_Y_ind[imtstr]] = self.tsd[imtstr].ravel()
T_D = self.T_D[imtstr]
cov_WD_WD_inv = self.cov_WD_WD_inv[imtstr]
#
# Now do the MVN itself...
#
dtime = mtime = ddtime = ctime = stime = atime = 0
C = np.empty_like(T_Y0[0 : self.smnx, :])
C_tmp1 = np.empty_like(C)
C_tmp2 = np.empty_like(C)
s_tmp1 = np.empty((self.smnx), dtype=np.float64).reshape((-1, 1))
s_tmp2 = np.empty((self.smnx), dtype=np.float64).reshape((-1, 1))
s_tmp3 = np.empty((self.smnx), dtype=np.float64)
ampgrid = np.zeros_like(pout_mean)
cov_WY_WY_WD = np.zeros_like(pout_mean)
sdgrid_tau = np.zeros_like(pout_mean)
# Stuff that doesn't change within the loop:
lons_out_rad = self.lons_out_rad
lats_out_rad = self.lats_out_rad
d12_cols = self.smnx
ones = np.ones(d12_cols, dtype=np.long).reshape((-1, 1))
t1_12 = sta_per_ix.reshape((1, -1)) * ones
t2_12 = np.full((d12_cols, nsta), outperiod_ix, dtype=np.long)
# sdsta is the standard deviation of the stations
sdsta_phi = sta_phi.flatten()
matrix12_phi = np.empty(t2_12.shape, dtype=np.double)
rcmatrix_phi = np.empty(t2_12.shape, dtype=np.double)
# Allocate the full C matrix only for the desired IMT (MMI). This
# is for generating realizations. Someday we will want to make this
# apply to other IMTs and become an optional flag for this module
# where we save the priors optionally
# Note: We would have to set up C_complete = {} at the top of the
# module, and then here, it would be C_complete[imtstr] = ...
if imtstr == "MMI":
C_complete = np.empty_like(T_Y0)
for iy in range(self.smny):
ss = iy * self.smnx
se = (iy + 1) * self.smnx
time4 = time.time()
geodetic_distance_fast(
sta_lons_rad,
sta_lats_rad,
lons_out_rad[ss:se],
lats_out_rad[ss:se],
matrix12_phi,
)
ddtime += time.time() - time4
time4 = time.time()
self.ccf.getCorrelation(t1_12, t2_12, matrix12_phi)
ctime += time.time() - time4
time4 = time.time()
# sdarr_phi is the standard deviation of the within-event
# residuals at the output sites
sdarr_phi = self.psd[imtstr][iy, :]
make_sigma_matrix(matrix12_phi, sdsta_phi, sdarr_phi)
stime += time.time() - time4
time4 = time.time()
#
# Sigma12 * Sigma22^-1 is known as the 'regression
# coefficient' matrix (rcmatrix)
#
np.dot(matrix12_phi, cov_WD_WD_inv, out=rcmatrix_phi)
dtime += time.time() - time4
time4 = time.time()
#
# We only want the diagonal elements of the conditional
# covariance matrix, so there is no point in doing the
# full solution with the dot product, e.g.:
# sdgrid[ss:se] = pout_sd2[ss:se] -
# np.diag(rcmatrix.dot(sigma21))
#
# make_sd_array is a Cython function that is optimized to find
# the diagonal of the covariance matrix.
#
make_sd_array(cov_WY_WY_WD, pout_sd2_phi, iy, rcmatrix_phi, matrix12_phi)
#
# Equation B32 of Engler et al. (2021)
#
np.subtract(T_Y0[ss:se, :], np.dot(rcmatrix_phi, T_D, out=C_tmp1), out=C)
#
# This is the MVN solution for the conditional mean
# It is an implementation of the equation just below
# equation B25 in Engler et al. (2021):
#
# mu_Y_yD = mu_Y + C mu_H_yD + cov_WY_WD cov_WD_WD^-1 zeta
#
# but we break it up for efficiency.
#
s_tmp1r = np.dot(C, self.mu_H_yD[imtstr], out=s_tmp1).reshape((-1,))
s_tmp2r = np.dot(rcmatrix_phi, self.sta_resids[imtstr], out=s_tmp2).reshape(
(-1,)
)
ampgrid[iy, :] = np.add(
np.add(pout_mean[iy, :], s_tmp1r, out=s_tmp1r), s_tmp2r, out=s_tmp2r
)
atime += time.time() - time4
time4 = time.time()
#
# We're doing this:
#
# sdgrid_tau[iy, :] = np.diag(
# C.dot(self.cov_HH_yD[imtstr].dot(C.T)))
#
# to find the between-event part of the diagonal of the conditional
# covariance. This is the second term of equation B27 of Engler
# et al. (2021). The code below is faster and uses less memory than
# actually implementing the above equation.
#
np.dot(C, self.cov_HH_yD[imtstr], C_tmp1)
sdgrid_tau[iy, :] = np.sum(
np.multiply(C_tmp1, C, out=C_tmp2), out=s_tmp3, axis=1
)
# This is where we would have a flag if we were saving the priors
# as an option
if imtstr == "MMI":
# Save C to complete full size C
C_complete[ss:se, :] = C
mtime += time.time() - time4
#
# This processing can result in MMI values that go beyond
# the 1 to 10 bounds of MMI, so we apply that constraint again
# here
#
if imtstr == "MMI":
ampgrid = np.clip(ampgrid, 1.0, 10.0)
#
# The conditional mean
#
self.outgrid[imtstr] = ampgrid
#
# The outputs are the conditional total stddev, the conditional
# between-event stddev (tau), and the prior within-event stddev (phi)
#
self.outsd[imtstr] = np.sqrt(
np.add(cov_WY_WY_WD, sdgrid_tau, out=cov_WY_WY_WD), out=cov_WY_WY_WD
)
# self.outphi[imtstr] = np.sqrt(cov_WY_WY_WD)
self.outphi[imtstr] = self.psd[imtstr]
self.outtau[imtstr] = np.sqrt(sdgrid_tau, out=sdgrid_tau)
# Special stuff for the MMI priors. When we make this apply to other
# IMTs we'll need to mess around with this block
if imtstr == "MMI":
self.MMI_add_uncertainty = self.sta_sig_extra[imtstr]
self.MMI_Sigma_HH_YD = self.cov_HH_yD[imtstr]
self.MMI_C = C_complete
self.MMI_sta_per_ix = sta_per_ix
self.logger.debug(f"\ttime for {imtstr} distance={ddtime:f}")
self.logger.debug(f"\ttime for {imtstr} correlation={ctime:f}")
self.logger.debug(f"\ttime for {imtstr} sigma={stime:f}")
self.logger.debug(f"\ttime for {imtstr} rcmatrix={dtime:f}")
self.logger.debug(f"\ttime for {imtstr} amp calc={atime:f}")
self.logger.debug(f"\ttime for {imtstr} sd calc={mtime:f}")
self.logger.debug(f"total time for {imtstr}={time.time() - time1:f}")
def _applyCustomMask(self):
"""Apply custom masks to IMT grid outputs."""
if self.mask_file:
mask = self._getMask(self.mask_file)
for grid in self.outgrid.values():
grid[~mask] = np.nan
def _getLandMask(self):
"""
Get the landmask for this map. Land will be False, water will
be True (because of the way masked arrays work).
"""
if "CALLED_FROM_PYTEST" in os.environ:
oceans = None
else:
oceans = shpreader.natural_earth(
category="physical", name="ocean", resolution="10m"
)
return self._getMask(oceans)
def _getMask(self, vector=None):
"""
Get a masked array for this map corresponding to the given vector
feature.
"""
if not self.do_grid:
return None
gd = GeoDict.createDictFromBox(
self.W, self.E, self.S, self.N, self.smdx, self.smdy
)
bbox = (gd.xmin, gd.ymin, gd.xmax, gd.ymax)
if vector is None:
return np.zeros((gd.ny, gd.nx), dtype=np.bool)
with fiona.open(vector) as c:
tshapes = list(c.items(bbox=bbox))
shapes = []
for tshp in tshapes:
shapes.append(shape(tshp[1]["geometry"]))
if len(shapes):
grid = Grid2D.rasterizeFromGeometry(shapes, gd, fillValue=0.0)
return grid.getData().astype(np.bool)
else:
return np.zeros((gd.ny, gd.nx), dtype=np.bool)
def _getMaskedGrids(self, bmask):
"""
For each grid in the output, generate a grid with the water areas
masked out.
"""
moutgrid = {}
if not self.do_grid:
for imtout in self.imt_out_set:
moutgrid[imtout] = self.outgrid[imtout]
return moutgrid
for imtout in self.imt_out_set:
moutgrid[imtout] = ma.masked_array(self.outgrid[imtout], mask=bmask)
return moutgrid
def _getInfo(self, moutgrid):
"""
Create an info dictionary that can be made into the info.json file.
"""
#
# Get the map grade
#
mean_rat, mygrade = _get_map_grade(
self.do_grid, self.outsd, self.psd_raw, moutgrid
)
# ---------------------------------------------------------------------
# This is the metadata for creating info.json
# ---------------------------------------------------------------------
st = "strec"
ip = "input"
ei = "event_information"
op = "output"
gm = "ground_motions"
mi = "map_information"
un = "uncertainty"
pp = "processing"
gmm = "ground_motion_modules"
ms = "miscellaneous"
sv = "shakemap_versions"
sr = "site_response"
info = self._info
info[ip] = {}
info[ip][ei] = {}
info[ip][ei]["depth"] = str(self.rx.hypo_depth)
info[ip][ei]["event_id"] = self._eventid
# look for the presence of a strec_results file and read it in
_, data_path = get_config_paths()
datadir = os.path.join(data_path, self._eventid, "current")
strecfile = os.path.join(datadir, "strec_results.json")
if os.path.isfile(strecfile):
strec_results = json.load(open(strecfile, "rt"))
info[st] = strec_results
# the following items are primarily useful for PDL
origin = self.rupture_obj._origin
info[ip][ei]["eventsource"] = origin.netid
info[ip][ei]["netid"] = origin.netid
# The netid could be a valid part of the eventsourcecode, so we have
# to check here if it ***starts with*** the netid
if origin.id.startswith(origin.netid):
eventsourcecode = origin.id.replace(origin.netid, "", 1)
else:
eventsourcecode = origin.id
info[ip][ei]["eventsourcecode"] = eventsourcecode
info[ip][ei]["id"] = origin.id
info[ip][ei]["productcode"] = origin.productcode
info[ip][ei]["productsource"] = self.config["system"]["source_network"]
info[ip][ei]["producttype"] = self.config["system"]["product_type"]
info[ip][ei]["event_ref"] = getattr(origin, "reference", None)
info[ip][ei]["fault_ref"] = self.rupture_obj.getReference()
if "df2" in self.dataframes:
info[ip][ei]["intensity_observations"] = str(np.size(self.df2.df["lon"]))
else:
info[ip][ei]["intensity_observations"] = "0"
info[ip][ei]["latitude"] = str(self.rx.hypo_lat)
info[ip][ei]["longitude"] = str(self.rx.hypo_lon)
info[ip][ei]["location"] = origin.locstring
info[ip][ei]["magnitude"] = str(self.rx.mag)
info[ip][ei]["origin_time"] = origin.time.strftime(constants.TIMEFMT)
if "df1" in self.dataframes:
info[ip][ei]["seismic_stations"] = str(np.size(self.df1.df["lon"]))
else:
info[ip][ei]["seismic_stations"] = "0"
info[ip][ei]["src_mech"] = origin.mech
if self.config["system"]["source_description"] != "":
info[ip][ei]["event_description"] = self.config["system"][
"source_description"
]
else:
info[ip][ei]["event_description"] = origin.locstring
# This AND src_mech?
# look at the origin information for indications that this
# event is a scenario
condition1 = (
hasattr(origin, "event_type") and origin.event_type.lower() == "scenario"
)
condition2 = origin.id.endswith("_se")
if condition1 or condition2:
info[ip][ei]["event_type"] = "SCENARIO"
else:
info[ip][ei]["event_type"] = "ACTUAL"
info[op] = {}
info[op][gm] = {}
for myimt in self.imt_out_set:
info[op][gm][myimt] = {}
if myimt == "MMI":
units = "intensity"
elif myimt == "PGV":
units = "cms"
else:
units = "g"
info[op][gm][myimt]["units"] = units
if myimt in self.nominal_bias:
info[op][gm][myimt]["bias"] = _string_round(self.nominal_bias[myimt], 3)
else:
info[op][gm][myimt]["bias"] = None
if myimt == "MMI":
info[op][gm][myimt]["max_grid"] = _string_round(
np.max(self.outgrid[myimt]), 3
)
info[op][gm][myimt]["max"] = _string_round(np.max(moutgrid[myimt]), 3)
else:
info[op][gm][myimt]["max_grid"] = _string_round(
np.exp(np.max(self.outgrid[myimt])), 3
)
info[op][gm][myimt]["max"] = _string_round(
np.exp(np.max(moutgrid[myimt])), 3
)
info[op][mi] = {}
info[op][mi]["grid_points"] = {}
info[op][mi]["grid_points"]["longitude"] = str(self.smnx)
info[op][mi]["grid_points"]["latitude"] = str(self.smny)
info[op][mi]["grid_points"]["units"] = ""
info[op][mi]["grid_spacing"] = {}
info[op][mi]["grid_spacing"]["longitude"] = _string_round(self.smdx, 7)
info[op][mi]["grid_spacing"]["latitude"] = _string_round(self.smdy, 7)
info[op][mi]["grid_spacing"]["units"] = "degrees"
info[op][mi]["grid_span"] = {}
if self.E <= 0 and self.W >= 0:
info[op][mi]["grid_span"]["longitude"] = _string_round(
self.E + 360.0 - self.W, 3
)
else:
info[op][mi]["grid_span"]["longitude"] = _string_round(self.E - self.W, 3)
info[op][mi]["grid_span"]["latitude"] = _string_round(self.N - self.S, 3)
info[op][mi]["grid_span"]["units"] = "degrees"
info[op][mi]["min"] = {}
info[op][mi]["max"] = {}
min_long = self.W
max_long = self.E
if self.rx.hypo_lon < 0:
if min_long > 0: # Crossing the 180 from the negative side
min_long = min_long - 360
else:
if max_long < 0: # Crossing the 180 from the positive side
max_long = max_long + 360
info[op][mi]["min"]["longitude"] = _string_round(min_long, 3)
info[op][mi]["max"]["longitude"] = _string_round(max_long, 3)
info[op][mi]["min"]["latitude"] = _string_round(self.S, 3)
info[op][mi]["max"]["latitude"] = _string_round(self.N, 3)
info[op][mi]["min"]["units"] = "degrees"
info[op][mi]["max"]["units"] = "degrees"
info[op][un] = {}
info[op][un]["grade"] = mygrade
info[op][un]["mean_uncertainty_ratio"] = _string_round(mean_rat, 3)
if "df2" in self.dataframes:
info[op][un]["total_flagged_mi"] = str(
np.sum(self.df2.df["MMI_outliers"] | np.isnan(self.df2.df["MMI"]))
)
else:
info[op][un]["total_flagged_mi"] = "0"
if "df1" in self.dataframes:
all_flagged = np.full(self.df1.df["lon"].shape, False, dtype=np.bool)
for imtstr in self.df1.imts:
if "MMI" in imtstr:
continue
all_flagged |= self.df1.df[imtstr + "_outliers"] | np.isnan(
self.df1.df[imtstr]
)
all_flagged |= self.df1.df["flagged"]
info[op][un]["total_flagged_pgm"] = str(np.sum(all_flagged))
else:
info[op][un]["total_flagged_pgm"] = "0"
info[pp] = {}
info[pp][gmm] = {}
info[pp][gmm]["gmpe"] = {}
info[pp][gmm]["gmpe"]["module"] = str(self.config["modeling"]["gmpe"])
info[pp][gmm]["gmpe"]["reference"] = ""
info[pp][gmm]["ipe"] = {}
info[pp][gmm]["ipe"]["module"] = str(
self.config["ipe_modules"][self.config["modeling"]["ipe"]][0]
)
info[pp][gmm]["ipe"]["reference"] = ""
info[pp][gmm]["gmice"] = {}
info[pp][gmm]["gmice"]["module"] = str(
self.config["gmice_modules"][self.config["modeling"]["gmice"]][0]
)
info[pp][gmm]["gmice"]["reference"] = ""
info[pp][gmm]["ccf"] = {}
info[pp][gmm]["ccf"]["module"] = str(
self.config["ccf_modules"][self.config["modeling"]["ccf"]][0]
)
info[pp][gmm]["ccf"]["reference"] = ""
info[pp][gmm]["basin_correction"] = {}
info[pp][gmm]["basin_correction"]["module"] = "None"
info[pp][gmm]["basin_correction"]["reference"] = ""
info[pp][gmm]["directivity"] = {}
info[pp][gmm]["directivity"]["module"] = "None"
info[pp][gmm]["directivity"]["reference"] = ""
info[pp][ms] = {}
info[pp][ms]["bias_max_dsigma"] = str(self.bias_max_dsigma)
info[pp][ms]["bias_max_mag"] = str(self.bias_max_mag)
info[pp][ms]["bias_max_range"] = str(self.bias_max_range)
info[pp][ms]["median_dist"] = "True"
info[pp][ms]["do_outliers"] = self.do_outliers
info[pp][ms]["outlier_deviation_level"] = str(self.outlier_deviation_level)
info[pp][ms]["outlier_max_mag"] = str(self.outlier_max_mag)
info[pp][sv] = {}
info[pp][sv]["shakemap_revision"] = get_versions()["version"]
info[pp][sv]["shakemap_revision_id"] = get_versions()["full-revisionid"]
info[pp][sv]["process_time"] = strftime(constants.ALT_TIMEFMT, gmtime())
info[pp][sv]["map_version"] = self.ic.getVersionHistory()["history"][-1][2]
info[pp][sv]["map_comment"] = self.ic.getVersionHistory()["history"][-1][3]
info[pp][sv]["map_data_history"] = self.ic.getVersionHistory()["history"]
info[pp][sv]["map_status"] = self.config["system"]["map_status"]
info[pp][sr] = {}
info[pp][sr]["vs30default"] = str(self.vs30default)
info[pp][sr]["site_correction"] = "GMPE native"
return info
def _fillStationJSON(self):
"""
Get the station JSON dictionary and then add a bunch of stuff to it.
"""
if not hasattr(self, "stations") or self.stations is None:
return {"eventid": self._eventid, "features": []}
sjdict = {}
# ---------------------------------------------------------------------
# Compute a bias for all the output IMTs in the data frames
# ---------------------------------------------------------------------
for ndf in self.dataframes:
sdf = getattr(self, ndf).df
for myimt in self.imt_out_set:
if type(self.mu_H_yD[myimt]) is float:
mybias = sdf[myimt + "_pred_tau"] * self.mu_H_yD[myimt]
mybias_sig = np.sqrt(
sdf[myimt + "_pred_tau"] ** 2 * self.cov_HH_yD[myimt]
)
else:
mybias = sdf[myimt + "_pred_tau"] * self.mu_H_yD[myimt][0]
mybias_sig = np.sqrt(
sdf[myimt + "_pred_tau"] ** 2 * self.cov_HH_yD[myimt][0, 0]
)
sdf[myimt + "_bias"] = mybias.flatten()
sdf[myimt + "_bias_sigma"] = mybias_sig.flatten()
# ---------------------------------------------------------------------
# Add the station data. The stationlist object has the original
# data and produces a GeoJSON object (a dictionary, really), but
# we need to add peak values and flagging that has been done here.
# ---------------------------------------------------------------------
#
# First make a dictionary of distances
#
dist_dict = {"df1": {}, "df2": {}}
for ndf in self.dataframes:
dx = getattr(self, ndf).dx
for dm in get_distance_measures():
dm_arr = getattr(dx, dm, None)
if dm_arr is not None:
dist_dict[ndf][dm] = dm_arr
else:
continue
if dm in ("rrup", "rjb"):
dm_var = getattr(dx, dm + "_var", None)
if dm_var is not None:
dist_dict[ndf][dm + "_var"] = dm_var
else:
dist_dict[ndf][dm + "_var"] = np.zeros_like(dm_arr)
#
# Get the index for each station ID
#
sjdict = self.stations.getGeoJson()
sta_ix = {"df1": {}, "df2": {}}
for ndf in self.dataframes:
sdf = getattr(self, ndf).df
sta_ix[ndf] = dict(zip(sdf["id"], range(len(sdf["id"]))))
#
# Now go through the GeoJSON and add various properties and
# amps from the df_dict dictionaries
#
sjdict_copy = copy.copy(sjdict["features"])
for station in sjdict_copy:
if station["id"] in sta_ix["df1"]:
ndf = "df1"
station["properties"]["station_type"] = "seismic"
elif station["id"] in sta_ix["df2"]:
ndf = "df2"
station["properties"]["station_type"] = "macroseismic"
else:
# We're probably using --no_seismic or --no_macroseismic
if self.no_seismic or self.no_macroseismic:
sjdict["features"].remove(station)
continue
else:
raise ValueError(f"Unknown station {station['id']} in stationlist")
dfx = getattr(self, ndf)
sdf = dfx.df
six = sta_ix[ndf][station["id"]]
#
# Set the 'intensity', 'pga', and 'pga' peak properties
#
if (
"MMI" in sdf
and not sdf["MMI_outliers"][six]
and not np.isnan(sdf["MMI"][six])
):
station["properties"]["intensity"] = float(f"{sdf['MMI'][six]:.1f}")
station["properties"]["intensity_stddev"] = sdf["MMI_sd"][six]
if "MMI_nresp" in sdf:
station["properties"]["nresp"] = int(sdf["MMI_nresp"][six])
else:
station["properties"]["nresp"] = "null"
else:
station["properties"]["intensity"] = "null"
station["properties"]["intensity_stddev"] = "null"
station["properties"]["nresp"] = "null"
if (
"PGA" in sdf
and not sdf["PGA_outliers"][six]
and not np.isnan(sdf["PGA"][six])
and (ndf != "df1" or not sdf["flagged"][six])
):
station["properties"]["pga"] = _round_float(
np.exp(sdf["PGA"][six]) * 100, 4
)
else:
station["properties"]["pga"] = "null"
if (
"PGV" in sdf
and not sdf["PGV_outliers"][six]
and not np.isnan(sdf["PGV"][six])
and (ndf != "df1" or not sdf["flagged"][six])
):
station["properties"]["pgv"] = _round_float(np.exp(sdf["PGV"][six]), 4)
else:
station["properties"]["pgv"] = "null"
#
# Add vs30
#
station["properties"]["vs30"] = _round_float(dfx.sx.vs30[six], 2)
#
# Add the predictions so we can plot residuals
#
station["properties"]["predictions"] = []
for key in sdf.keys():
if not key.endswith("_pred"):
continue
myamp = sdf[key][six]
myamp_rock = sdf[key + "_rock"][six]
myamp_soil = sdf[key + "_soil"][six]
tau_str = "ln_tau"
phi_str = "ln_phi"
sigma_str = "ln_sigma"
sigma_str_rock = "ln_sigma_rock"
sigma_str_soil = "ln_sigma_soil"
bias_str = "ln_bias"
bias_sigma_str = "ln_bias_sigma"
if key.startswith("PGV"):
value = np.exp(myamp)
value_rock = np.exp(myamp_rock)
value_soil = np.exp(myamp_soil)
units = "cm/s"
elif key.startswith("MMI"):
value = myamp
value_rock = myamp_rock
value_soil = myamp_soil
units = "intensity"
tau_str = "tau"
phi_str = "phi"
sigma_str = "sigma"
sigma_str_rock = "sigma_rock"
sigma_str_soil = "sigma_soil"
bias_str = "bias"
bias_sigma_str = "bias_sigma"
else:
value = np.exp(myamp) * 100
value_rock = np.exp(myamp_rock) * 100
value_soil = np.exp(myamp_soil) * 100
units = "%g"
if self.gmpe_total_sd_only:
mytau = 0
else:
mytau = sdf[key + "_tau"][six]
myphi = sdf[key + "_phi"][six]
mysigma = np.sqrt(mytau ** 2 + myphi ** 2)
mysigma_rock = sdf[key + "_sigma_rock"][six]
mysigma_soil = sdf[key + "_sigma_soil"][six]
imt_name = key.lower().replace("_pred", "")
if imt_name.upper() in self.imt_out_set:
mybias = sdf[imt_name.upper() + "_bias"][six]
mybias_sigma = sdf[imt_name.upper() + "_bias_sigma"][six]
else:
mybias = "null"
mybias_sigma = "null"
station["properties"]["predictions"].append(
{
"name": imt_name,
"value": _round_float(value, 4),
"value_rock": _round_float(value_rock, 4),
"value_soil": _round_float(value_soil, 4),
"units": units,
tau_str: _round_float(mytau, 4),
phi_str: _round_float(myphi, 4),
sigma_str: _round_float(mysigma, 4),
sigma_str_rock: _round_float(mysigma_rock, 4),
sigma_str_soil: _round_float(mysigma_soil, 4),
bias_str: _round_float(mybias, 4),
bias_sigma_str: _round_float(mybias_sigma, 4),
}
)
#
# For df1 stations, add the MMIs comverted from PGM
#
if ndf == "df1":
station["properties"]["mmi_from_pgm"] = []
for myimt in getattr(self, ndf).imts:
if myimt == "MMI":
continue
dimtstr = "derived_MMI_from_" + myimt
if dimtstr not in sdf:
continue
imt_name = myimt.lower()
myamp = sdf[dimtstr][six]
mysd = sdf[dimtstr + "_sd"][six]
if np.isnan(myamp):
myamp = "null"
mysd = "null"
flag = "0"
else:
if sdf[myimt + "_outliers"][six] == 1:
flag = "Outlier"
else:
flag = "0"
station["properties"]["mmi_from_pgm"].append(
{
"name": imt_name,
"value": _round_float(myamp, 2),
"sigma": _round_float(mysd, 2),
"flag": flag,
}
)
#
# For df2 stations, add the PGMs converted from MMI
#
if ndf == "df2":
station["properties"]["pgm_from_mmi"] = []
for myimt in getattr(self, ndf).imts:
if myimt == "MMI":
continue
imt_name = myimt.lower()
myamp = sdf[myimt][six]
mysd = sdf[myimt + "_sd"][six]
if myimt == "PGV":
value = np.exp(myamp)
units = "cm/s"
else:
value = np.exp(myamp) * 100
units = "%g"
if np.isnan(value):
value = "null"
mysd = "null"
flag = "0"
else:
if sdf[myimt + "_outliers"][six] == 1:
flag = "Outlier"
else:
flag = "0"
station["properties"]["pgm_from_mmi"].append(
{
"name": imt_name,
"value": _round_float(value, 4),
"units": units,
"ln_sigma": _round_float(mysd, 4),
"flag": flag,
}
)
#
# Set the generic distance property (this is rrup)
#
station["properties"]["distance"] = _round_float(sdf["rrup"][six], 3)
station["properties"]["distance_stddev"] = _round_float(
np.sqrt(sdf["rrup_var"][six]), 3
)
#
# Set the specific distances properties
#
station["properties"]["distances"] = {}
for dm, dm_arr in dist_dict[ndf].items():
station["properties"]["distances"][dm] = _round_float(dm_arr[six], 3)
#
# Set the outlier flags
#
mflag = "0"
if ndf == "df1" and sdf["flagged"][six]:
mflag = "ManuallyFlagged"
for channel in station["properties"]["channels"]:
for amp in channel["amplitudes"]:
if amp["flag"] != "0":
amp["flag"] += "," + mflag
else:
amp["flag"] = mflag
Name = amp["name"].upper()
if sdf[Name + "_outliers"][six]:
if amp["flag"] == "0":
amp["flag"] = "Outlier"
else:
amp["flag"] += ",Outlier"
sjdict["metadata"] = {"eventid": self._eventid}
return sjdict
def _storeGriddedData(self, oc):
"""
Store gridded data in the output container.
"""
metadata = {}
min_long = self.W
max_long = self.E
if self.rx.hypo_lon < 0:
if min_long > 0: # Crossing the 180 from the negative side
min_long = min_long - 360
else:
if max_long < 0: # Crossing the 180 from the positive side
max_long = max_long + 360
metadata["xmin"] = min_long
metadata["xmax"] = max_long
metadata["ymin"] = self.S
metadata["ymax"] = self.N
metadata["nx"] = self.smnx
metadata["ny"] = self.smny
metadata["dx"] = self.smdx
metadata["dy"] = self.smdy
#
# Put the Vs30 grid in the output container
#
_, units, digits = _get_layer_info("vs30")
metadata["units"] = units
metadata["digits"] = digits
oc.setArray([], "vs30", self.sx_out.vs30, metadata=metadata)
#
# Now do the distance grids
#
metadata["units"] = "km"
metadata["digits"] = 4
for dm in get_distance_measures():
dm_arr = getattr(self.dx_out, dm, None)
if dm_arr is None:
continue
oc.setArray(["distances"], dm, dm_arr, metadata=metadata)
if dm in ("rrup", "rjb"):
dm_var = getattr(self.dx_out, dm + "_var", None)
if dm_var is None:
dm_var = np.zeros_like(dm_arr)
dm_std = np.sqrt(dm_var)
oc.setArray(["distances"], dm + "_std", dm_std, metadata=metadata)
#
# Output the data and uncertainty grids
#
component = self.config["interp"]["component"]
std_metadata = copy.copy(metadata)
for key, value in self.outgrid.items():
# set the data grid
_, units, digits = _get_layer_info(key)
metadata["units"] = units
metadata["digits"] = digits
# set the mean and uncertainty grids
std_layername, units, digits = _get_layer_info(key + "_sd")
std_metadata["units"] = units
std_metadata["digits"] = digits
oc.setIMTGrids(
key,
component,
value,
metadata,
self.outsd[key],
std_metadata,
self.outphi[key],
self.outtau[key],
)
if key == "MMI":
sub_groups = ["imts", component, key]
oc.setArray(sub_groups, "add_uncertainty", self.MMI_add_uncertainty)
oc.setArray(sub_groups, "Sigma_HH_YD", self.MMI_Sigma_HH_YD)
oc.setArray(sub_groups, "C", self.MMI_C)
oc.setArray(sub_groups, "sta_per_ix", self.MMI_sta_per_ix)
oc.setArray(sub_groups, "sta_phi", self.sta_phi["MMI"])
oc.setArray(sub_groups, "sta_lons_rad", self.sta_lons_rad["MMI"])
oc.setArray(sub_groups, "sta_lats_rad", self.sta_lats_rad["MMI"])
#
# Directivity
#
del metadata["units"]
del metadata["digits"]
if self.do_directivity is True:
for k, v in self.dir_output.items():
imtstr, _, _ = _get_layer_info(k)
oc.setArray(["directivity"], imtstr, v, metadata=metadata)
def _storePointData(self, oc):
"""
Store point data in the output container.
"""
#
# Store the Vs30
#
vs30_metadata = {"units": "m/s", "digits": 4}
oc.setArray([], "vs30", self.sx_out.vs30.flatten(), metadata=vs30_metadata)
#
# Store the distances
#
distance_metadata = {"units": "km", "digits": 4}
for dm in get_distance_measures():
dm_arr = getattr(self.dx_out, dm, None)
if dm_arr is None:
continue
oc.setArray(["distances"], dm, dm_arr.flatten(), metadata=distance_metadata)
if dm in ("rrup", "rjb"):
dm_var = getattr(self.dx_out, dm + "_var", None)
if dm_var is None:
dm_var = np.zeros_like(dm_arr)
oc.setArray(
["distances"],
dm + "_std",
np.sqrt(dm_var).flatten(),
metadata=distance_metadata,
)
#
# Store the IMTs
#
ascii_ids = np.array(
[np.char.encode(x, encoding="ascii") for x in self.idents]
).flatten()
component = self.config["interp"]["component"]
for key, value in self.outgrid.items():
# set the data grid
_, units, digits = _get_layer_info(key)
mean_metadata = {"units": units, "digits": digits}
# set the uncertainty grid
std_layername, units, digits = _get_layer_info(key + "_sd")
std_metadata = {"units": units, "digits": digits}
oc.setIMTArrays(
key,
component,
self.sx_out.lons.flatten(),
self.sx_out.lats.flatten(),
ascii_ids,
value.flatten(),
mean_metadata,
self.outsd[key].flatten(),
std_metadata,
self.outphi[key].flatten(),
self.outtau[key].flatten(),
)
def _storeAttenuationData(self, oc):
"""
Output arrays of rock and soil attenuation curves
"""
for dist_type in ["repi", "rhypo", "rrup", "rjb"]:
oc.setArray(
["attenuation", "distances"],
dist_type,
getattr(self.atten_dx, dist_type, None),
)
imtstrs = self.atten_rock_mean.keys()
for imtstr in imtstrs:
oc.setArray(
["attenuation", "rock", imtstr], "mean", self.atten_rock_mean[imtstr]
)
oc.setArray(
["attenuation", "soil", imtstr], "mean", self.atten_soil_mean[imtstr]
)
oc.setArray(
["attenuation", "rock", imtstr], "std", self.atten_rock_sd[imtstr]
)
oc.setArray(
["attenuation", "soil", imtstr], "std", self.atten_soil_sd[imtstr]
)
return
#
# Helper function to call get_mean_and_stddevs for the
# appropriate object given the IMT and describe the
# MultiGMPE structure.
#
def _gmas(self, gmpe, sx, dx, oqimt, apply_gafs):
"""
This is a helper function to call get_mean_and_stddevs for the
appropriate object given the IMT.
Args:
gmpe:
A GMPE instance.
sx:
Sites context.
dx:
Distance context.
oqimt:
List of OpenQuake IMTs.
apply_gafs (boolean):
Whether or not to apply the generic
amplification factors to the GMPE output.
Returns:
tuple: Tuple of two items:
- Numpy array of means,
- List of numpy array of standard deviations corresponding to
therequested stddev_types.
"""
if "MMI" in oqimt:
pe = self.ipe
sd_types = self.ipe_stddev_types
if not self.use_simulations:
if not hasattr(self, "_info"):
self._info = {"multigmpe": {}}
else:
self._info = {}
else:
pe = gmpe
sd_types = self.gmpe_stddev_types
if not self.use_simulations:
# --------------------------------------------------------------------
# Describe the MultiGMPE
# --------------------------------------------------------------------
if not hasattr(self, "_info"):
self._info = {"multigmpe": {}}
self._info["multigmpe"][str(oqimt)] = gmpe.__describe__()
else:
self._info = {}
mean, stddevs = pe.get_mean_and_stddevs(
copy.deepcopy(sx), self.rx, copy.deepcopy(dx), oqimt, sd_types
)
# Include generic amp factors?
if apply_gafs:
gafs = get_generic_amp_factors(sx, str(oqimt))
if gafs is not None:
mean += gafs
# Does directivity apply to this imt?
row_pers = Rowshandel2013.getPeriods()
if oqimt.string == "PGA":
imt_ok = False
elif oqimt.string == "PGV" or oqimt.string == "MMI":
tper = 1.0
imt_ok = True
elif "SA" in oqimt.string:
tper = oqimt.period
min_per = np.min(row_pers)
max_per = np.max(row_pers)
if (tper >= min_per) and (tper <= max_per):
imt_ok = True
else:
imt_ok = False
else:
imt_ok = False
# Did we calculate directivity?
calc_dir = self.do_directivity
if calc_dir and imt_ok:
# Use distance context to figure out which directivity result
# we need to use.
all_fd = None
for dirdf, tmpdx in self.dir_results:
if dx == tmpdx:
all_fd = dirdf.getFd()
break
if all_fd is None:
raise RuntimeError(
"Failed to detect dataframe for directivity calculation."
)
# Does oqimt match any of those periods?
if tper in row_pers:
fd = all_fd[row_pers.index(tper)]
else:
# Log(period) interpolation.
apers = np.array(row_pers)
per_below = np.max(apers[apers < tper])
per_above = np.min(apers[apers > tper])
fd_below = all_fd[row_pers.index(per_below)]
fd_above = all_fd[row_pers.index(per_above)]
x1 = np.log(per_below)
x2 = np.log(per_above)
fd = fd_below + (np.log(tper) - x1) * (fd_above - fd_below) / (x2 - x1)
# Reshape to match the mean
fd = fd.reshape(mean.shape)
# Store the interpolated grid
imtstr = str(oqimt)
self.dir_output[imtstr] = fd
if oqimt.string == "MMI":
mean *= np.exp(fd)
else:
mean += fd
return mean, stddevs
def _adjustResolution(self):
"""
This is a helper function to adjust the resolution to be under
the maximum value specified in the config.
"""
# We want to only use resolutions that are multiples of 1 minute or
# an integer division of 1 minute.
one_minute = 1 / 60
multiples = np.arange(1, 11)
divisions = 1 / multiples
factors = np.sort(np.unique(np.concatenate((divisions, multiples))))
ok_res = one_minute * factors
latspan = self.N - self.S
# Deal with possible 180 longitude disontinuity
if self.E > self.W:
lonspan = self.E - self.W
else:
xmax = self.E + 360
lonspan = xmax - self.W
nx = np.floor(lonspan / self.smdx) + 1
ny = np.floor(latspan / self.smdy) + 1
ngrid = nx * ny
nmax = self.nmax
if ngrid > nmax:
self.logger.info(
"Extent and resolution of shakemap results in "
"too many grid points. Adjusting resolution..."
)
self.logger.info(f"Longitude span: {lonspan:f}")
self.logger.info(f"Latitude span: {latspan:f}")
self.logger.info(f"Current dx: {self.smdx:f}")
self.logger.info(f"Current dy: {self.smdy:f}")
self.logger.info("Current number of grid points: %i" % ngrid)
self.logger.info("Max grid points allowed: %i" % nmax)
target_res = (
-(latspan + lonspan)
- np.sqrt(
latspan ** 2 + lonspan ** 2 + 2 * latspan * lonspan * (2 * nmax - 1)
)
) / (2 * (1 - nmax))
if np.any(ok_res > target_res):
sel_res = np.min(ok_res[ok_res > target_res])
else:
sel_res = np.max(ok_res)
self.smdx = sel_res
self.smdy = sel_res
self.logger.info(f"Updated dx: {self.smdx:f}")
self.logger.info(f"Updatd dy: {self.smdy:f}")
nx = np.floor(lonspan / self.smdx) + 1
ny = np.floor(latspan / self.smdy) + 1
self.logger.info("Updated number of grid points: %i" % (nx * ny))
def _round_float(val, digits):
if ma.is_masked(val) or val == "--" or val == "null" or np.isnan(val):
return None
return float(("%." + str(digits) + "f") % val)
def _string_round(val, digits):
if ma.is_masked(val) or val == "--" or val == "null" or np.isnan(val):
return None
return str(_round_float(val, digits))
def _get_period_arrays(*args):
"""
Return 1) a sorted array of the periods represented by the IMT list(s)
in the input, and 2) a dictionary of the IMTs and their indices.
Args:
*args (list): One or more lists of IMTs.
Returns:
array, dict: Numpy array of the (sorted) periods represented by the
IMTs in the input list(s), and a dictionary of the IMTs and their
indices into the period array.
"""
imt_per = set()
imt_per_ix = {}
for imt_list in args:
if imt_list is None:
continue
for imtstr in imt_list:
if imtstr == "PGA":
period = 0.01
elif imtstr in ("PGV", "MMI"):
period = 1.0
else:
period = _get_period_from_imt(imtstr)
imt_per.add(period)
imt_per_ix[imtstr] = period
imt_per = sorted(imt_per)
for imtstr, period in imt_per_ix.items():
imt_per_ix[imtstr] = imt_per.index(period)
return np.array(imt_per), imt_per_ix
def _get_period_from_imt(imtstr):
"""
Return a float representing the period of the SA IMT in the input.
Args:
imtstr (str): A string representing an SA IMT.
Returns:
float: The period of the SA IMT as a float.
"""
return float(imtstr.replace("SA(", "").replace(")", ""))
def _get_nearest_imts(imtstr, imtset, saset):
"""
Return the input IMT, or it's closest surrogarte (or bracket) found
in imtset.
Args:
imtstr (str): An (OQ-style) IMT string.
imtset (list): A list of IMTs to search for imtstr or its closest
surrogate (or bracket).
saset (list): The SA IMTs found in imtset.
Returns:
tuple: The IMT, it's closest surrogate, or a bracket of SAs with
periods on either side of the IMT's period, from the IMTs in intset.
"""
if imtstr in imtset:
return (imtstr,)
#
# If we're here, then we know that IMT isn't in the inputs. Try
# some alternatives.
#
if imtstr == "PGA":
#
# Use the highest frequency in the inputs
#
if len(saset):
return (sorted(saset, key=_get_period_from_imt)[0],)
else:
return ()
elif imtstr == "PGV":
#
# PGV has no surrogate
#
return ()
elif imtstr == "MMI":
#
# MMI has no surrogate
#
return ()
elif imtstr.startswith("SA("):
#
# We know the actual IMT isn't here, so get the bracket
#
return _get_sa_bracket(imtstr, saset)
else:
raise ValueError(f"Unknown IMT {imtstr} in get_imt_bracket")
def _get_sa_bracket(myimt, saset):
"""
For a given SA IMT, look through the input SAs and return a tuple of
a) a pair of IMT strings representing the periods bracketing the given
period; or b) the single IMT representing the first or last period in
the input list if the given period is off the end of the list.
Args:
myper (float): The period to search for in the input lists.
saset (list): A list of SA IMTs.
Returns:
tuple: One or two strings representing the IMTs closest to or
bracketing the input IMT.
"""
if not len(saset):
return ()
#
# Stick the target IMT into a copy of the list of SAs, then sort
# the list by period.
#
ss = saset.copy()
ss.append(myimt)
tmplist = sorted(ss, key=_get_period_from_imt)
nimt = len(tmplist)
#
# Get the index of the target IMT in the sorted list
#
myix = tmplist.index(myimt)
#
# If the target IMT is off the end of the list, return the
# appropriate endpoint; else return the pair of IMTs that
# bracket the target.
#
if myix == 0:
return (tmplist[1],)
elif myix == nimt - 1:
return (tmplist[-2],)
else:
return (tmplist[myix - 1], tmplist[myix + 1])
def _get_imt_lists(df):
"""
Given a data frame, return a list of lists of valid IMTS for
each station in the dataframe; also return a list of the valid
SA IMTs for each station.
Args:
df (DataFrame): A DataFrame.
Returns:
list, list: Two lists of lists: each list contains lists
corresponding to the stations in the data frame: the first
list contains all of the valid IMTs for that station, the
second list contains just the valid SA IMTs for the station.
"""
imtlist = []
salist = []
nlist = np.size(df.df["lon"])
for ix in range(nlist):
valid_imts = []
sa_imts = []
if "flagged" not in df.df or not df.df["flagged"][ix]:
for this_imt in df.imts:
if (
not np.isnan(df.df[this_imt + "_residual"][ix])
and not df.df[this_imt + "_outliers"][ix]
):
valid_imts.append(this_imt)
if this_imt.startswith("SA("):
sa_imts.append(this_imt)
imtlist.append(valid_imts)
salist.append(sa_imts)
return imtlist, salist
def _get_map_grade(do_grid, outsd, psd, moutgrid):
"""
Computes a 'grade' for the map. Essentially looks at the ratio of
the computed PGA uncertainty to the predicted PGA uncertainty for
the area inside the MMI 6 contour. If the maximum MMI is less than
6, or the map is not a grid, the grade and mean ratio are set to '--'.
Args:
do_grid (bool): Is the map a grid (True) or a list of points
(False)?
outsd (dict): A dictionary of computed uncertainty arrays.
psd (dict): A dictionary of predicted uncertainty arrays.
moutgrid (dict): A dictionary of landmasked output ground
motion arrays.
Returns:
tuple: The mean uncertainty ratio and the letter grade.
"""
mean_rat = "--"
mygrade = "--"
if not do_grid or "PGA" not in outsd or "PGA" not in psd or "MMI" not in moutgrid:
return mean_rat, mygrade
sd_rat = outsd["PGA"] / psd["PGA"]
mmimasked = ma.masked_less(moutgrid["MMI"], 6.0)
mpgasd_rat = ma.masked_array(sd_rat, mask=mmimasked.mask)
if not np.all(mpgasd_rat.mask):
gvals = [0.96, 0.98, 1.05, 1.25]
grades = ["A", "B", "C", "D", "F"]
mean_rat = mpgasd_rat.mean()
for ix, val in enumerate(gvals):
if mean_rat <= val:
mygrade = grades[ix]
break
if mygrade == "--":
mygrade = "F"
return mean_rat, mygrade
def _get_layer_info(layer):
"""
We need a way to get units information about intensity measure types
and translate between OpenQuake naming convention and ShakeMap grid naming
convention.
Args:
layer (str): ShakeMap grid name.
Returns:
tuple: Tuple including:
- OpenQuake naming convention,
- units,
- significant digits.
"""
layer_out = layer
layer_units = ""
layer_digits = 4 # number of significant digits
if layer.endswith("_sd"):
layer_out = oq_to_file(layer.replace("_sd", ""))
layer_out = layer_out + "_sd"
else:
layer_out = oq_to_file(layer)
if layer.startswith("SA"):
layer_units = "ln(g)"
elif layer.startswith("PGA"):
layer_units = "ln(g)"
elif layer.startswith("PGV"):
layer_units = "ln(cm/s)"
elif layer.startswith("MMI"):
layer_units = "intensity"
layer_digits = 2
elif layer.startswith("vs30"):
layer_units = "m/s"
else:
raise ValueError(f"Unknown layer type: {layer}")
return (layer_out, layer_units, layer_digits)