Source code for shakemap.utils.probs

# stdlib imports
import re

# third party imports
import numpy as np
import logging
from strec.subtype import SubductionSelector


[docs]def get_weights(origin, config): """Get list of GMPEs and their weights for a given earthquake. Args: origin (Origin object): ShakeMap Origin object, containing earthquake info. config (dict-like): Configuration information regarding earthquake type. Returns: tuple: Tuple with elements that are: - list of strings indicating the GMPEs selected for this earthquake. - ndarray (float) of GMPE weights. - Pandas series containing STREC output. """ tprobs, strec_results = get_probs(origin, config) gmpelist = [] weightlist = [] # remove all probabilities that are == 0 probs = {} for key, value in tprobs.items(): if value > 0.0: probs[key] = value all_keylist = list(probs.keys()) # let's have the code default to use the slab data if config["tectonic_regions"]["subduction"]: use_slab = config["tectonic_regions"]["subduction"]["use_slab"] else: use_slab = True for region, rdict in config["tectonic_regions"].items(): if (region == "subduction") and use_slab: if "crustal" in probs or "subduction_0" in probs: if "crustal" in probs: topkey = "crustal" else: topkey = "subduction_0" gmpelist += rdict["crustal"]["gmpe"] weightlist.append(probs[topkey]) if "interface" in probs or "subduction_1" in probs: if "interface" in probs: midkey = "interface" else: midkey = "subduction_1" gmpelist += rdict["interface"]["gmpe"] weightlist.append(probs[midkey]) if "intraslab" in probs or "subduction_2" in probs: if "intraslab" in probs: botkey = "intraslab" else: botkey = "subduction_2" gmpelist += rdict["intraslab"]["gmpe"] weightlist.append(probs[botkey]) else: pat = re.compile(region + "_") keylist = sorted(list(filter(pat.search, all_keylist))) if len(keylist): for key in keylist: weightlist.append(probs[key]) idx = int(key.split("_")[1]) gmpelist.append(rdict["gmpe"][idx]) weightlist = np.array(weightlist) logging.debug(f"gmpelist: {gmpelist}") logging.debug(f"weightlist: {weightlist}") gmmdict = {"gmpelist": gmpelist, "weightlist": weightlist} # # Here we get the region-specific ipe, gmice, and ccf. If they are # not specified in the config, we use None, and let the value # fall back to whatever is specified in the system config. # if strec_results["TectonicRegion"] == "Active": gmmdict["ipe"] = config["tectonic_regions"]["acr"].get("ipe", None) gmmdict["gmice"] = config["tectonic_regions"]["acr"].get("gmice", None) gmmdict["ccf"] = config["tectonic_regions"]["acr"].get("ccf", None) elif strec_results["TectonicRegion"] == "Stable": gmmdict["ipe"] = config["tectonic_regions"]["scr"].get("ipe", None) gmmdict["gmice"] = config["tectonic_regions"]["scr"].get("gmice", None) gmmdict["ccf"] = config["tectonic_regions"]["scr"].get("ccf", None) elif strec_results["TectonicRegion"] == "Subduction": gmmdict["ipe"] = config["tectonic_regions"]["subduction"].get("ipe", None) gmmdict["gmice"] = config["tectonic_regions"]["subduction"].get("gmice", None) gmmdict["ccf"] = config["tectonic_regions"]["subduction"].get("ccf", None) elif strec_results["TectonicRegion"] == "Volcanic": gmmdict["ipe"] = config["tectonic_regions"]["volcanic"].get("ipe", None) gmmdict["gmice"] = config["tectonic_regions"]["volcanic"].get("gmice", None) gmmdict["ccf"] = config["tectonic_regions"]["volcanic"].get("ccf", None) return gmmdict, strec_results
[docs]def get_probs(origin, config): """Calculate probabilities for each earthquake type. The results here contain probabilities that can be rolled up in many ways: - The probabilities of acr, scr, volcanic, and subduction should sum to one. - The probabilities of acr_X,scr_X,volcanic_X, crustal, interface and intraslab should sum to 1. - The probabilities of acr_X should sum to acr, and so on. Args: origin (Origin object): ShakeMap Origin object, containing earthquake info. config (dict-like): Configuration information regarding earthquake type. Returns: (dict, dict): Probabilities for each earthquake type, with fields: - acr Probability that the earthquake is in an active region. - acr_X Probability that the earthquake is in a depth layer of ACR, starting from the top. - scr Probability that the earthquake is in a stable region. - scr_X Probability that the earthquake is in a depth layer of SCR, starting from the top. - volcanic Probability that the earthquake is in a volcanic region. - volcanic_X Probability that the earthquake is in a depth layer of Volcanic, starting from the top. - subduction Probability that the earthquake is in a subduction zone. - crustal Probability that the earthquake is in the crust above an interface. - interface Probability that the earthquake is on the interface. - intraslab Probability that the earthquake is in the slab below interface. STREC results """ selector = SubductionSelector() lat, lon, depth, mag = origin.lat, origin.lon, origin.depth, origin.mag if origin.id is not None and not origin.id.startswith(origin.netid): eid = origin.netid + origin.id else: eid = origin.id tensor_params = None if hasattr(origin, "moment"): tensor_params = origin.moment strec_results = selector.getSubductionType( lat, lon, depth, eid, tensor_params=tensor_params ) region_probs = get_region_probs(eid, depth, strec_results, config) in_subduction = strec_results["TectonicRegion"] == "Subduction" above_slab = not np.isnan(strec_results["SlabModelDepth"]) use_slab = config["tectonic_regions"]["subduction"]["use_slab"] if use_slab: if in_subduction: subduction_probs = get_subduction_probs( strec_results, depth, mag, config, above_slab ) for key, value in subduction_probs.items(): subduction_probs[key] = value * region_probs["subduction"] # If we are in a subduction zone then we don't want the # keys for subduction_0, 1, 2 (which are the generic vertical # subduction subtypes that are not informed by the slab model because # it isn't available) if "subduction_0" in region_probs: del region_probs["subduction_0"] if "subduction_1" in region_probs: del region_probs["subduction_1"] if "subduction_2" in region_probs: del region_probs["subduction_2"] else: # If we are NOT in a subduction zone we may or may not need subduction # probabilities (depending on distance and the configured taper). But # either way, we will not have access to the slab model and so we have # to use the generic vertical subtypes subduction_probs = { "crustal": region_probs["subduction_0"], "interface": region_probs["subduction_1"], "intraslab": region_probs["subduction_2"], } region_probs.update(subduction_probs) else: logging.info('"use_slab" is False so no slab used in finding GMPE ' "weights.") return (region_probs, strec_results)
[docs]def get_region_probs(eid, depth, strec_results, config): """ Calculate the regional probabilities (not including subduction interface etc). Args: eid (str): Earthquake ID (i.e., us1000cdn0) depth (float): Depth of earthquake. strec_results (Series): Pandas series containing STREC output. config (dict-like): Configuration information regarding earthquake type. Returns: dict: Probabilities for each earthquake type, with fields: - acr Probability that the earthquake is in an active region. - acr_X Probability that the earthquake is in a depth layer of ACR, starting from the top. - scr Probability that the earthquake is in a stable region. - scr_X Probability that the earthquake is in a depth layer of SCR, starting from the top. - volcanic Probability that the earthquake is in a volcanic region. - volcanic_X Probability that the earthquake is in a depth layer of Volcanic, starting from the top. - subduction Probability that the earthquake is in a subduction zone. """ region_probs = {} region_mapping = { "scr": "DistanceToStable", "acr": "DistanceToActive", "volcanic": "DistanceToVolcanic", "subduction": "DistanceToSubduction", } layer_probs = {} for region, rdict in config["tectonic_regions"].items(): distance = strec_results[region_mapping[region]] # If we're considering subduction zones but not IN a subduction zone x1 = 0.0 p2 = 0.0 p1 = 1.0 x2 = rdict["horizontal_buffer"] region_prob = get_probability(distance, x1, p1, x2, p2) region_probs[region] = region_prob region_layer_probs = {} # now do the weights for each depth zone for i in range(0, len(rdict["min_depth"])): # First, taper from -1 to 0 for the lower end x1 = rdict["min_depth"][i] - rdict["vertical_buffer"] / 2 p1 = -1.0 x2 = rdict["min_depth"][i] p2 = 0.0 p_layer1 = get_probability(depth, x1, p1, x2, p2) # Then, taper from 0 to -1 for the higher end x1 = rdict["max_depth"][i] p1 = 0.0 x2 = rdict["max_depth"][i] + rdict["vertical_buffer"] / 2 p2 = -1.0 p_layer2 = get_probability(depth, x1, p1, x2, p2) # Lastly, combine to get probability curve for layer i region_layer_probs["%s_%i" % (region, i)] = 1 + p_layer1 + p_layer2 probsum = sum([lp for lp in list(region_layer_probs.values())]) if probsum > 0: for key, value in region_layer_probs.items(): region_layer_probs[key] = value / probsum # running list of all region layer probabilities layer_probs.update(region_layer_probs) # divide the weights by the total weight probsum = sum(list(region_probs.values())) for region, prob in region_probs.items(): region_probs[region] = prob / probsum pat = re.compile(region) layerkeys = list(layer_probs.keys()) reg_layers = list(filter(pat.search, layerkeys)) for reg_layer in reg_layers: layer_probs[reg_layer] = layer_probs[reg_layer] * region_probs[region] region_probs.update(layer_probs) return region_probs
[docs]def get_subduction_probs(strec_results, depth, mag, config, above_slab): """Get probabilities of earthquake being crustal, interface or intraslab. Args: strec_results (Series): Pandas series containing STREC output. depth (float): Depth of earthquake. mag (float): Earthquake magnitude. config (dict-like): Configuration information regarding earthquake type. above_slab (bool): Is earthquake above a defined slab? Returns: dict: Probabilities for each earthquake type, with fields: - crustal Probability that the earthquake is in the crust above an interface. - interface Probability that the earthquake is on the interface. - intraslab Probability that the earthquake is in the slab below interface. """ subcfg = config["subduction"] if above_slab: # the angle between moment tensor and slab kagan = strec_results["KaganAngle"] # can be nan # Depth to slab slab_depth = strec_results["SlabModelDepth"] # Error in depth to slab slab_depth_error = strec_results["SlabModelDepthUncertainty"] # what is the effective bottom of the interface zone? max_interface_depth = strec_results["SlabModelMaximumDepth"] # Calculate the probability of interface given the # (absolute value of) difference between hypocenter and depth to slab. dz = np.abs(depth - slab_depth) x1 = subcfg["p_int_hypo"]["x1"] + slab_depth_error x2 = subcfg["p_int_hypo"]["x2"] + slab_depth_error p1 = subcfg["p_int_hypo"]["p1"] p2 = subcfg["p_int_hypo"]["p2"] p_int_hypo = get_probability(dz, x1, p1, x2, p2) # Calculate probability of interface given Kagan's angle if np.isfinite(kagan): x1 = subcfg["p_int_kagan"]["x1"] x2 = subcfg["p_int_kagan"]["x2"] p1 = subcfg["p_int_kagan"]["p1"] p2 = subcfg["p_int_kagan"]["p2"] p_int_kagan = get_probability(kagan, x1, p1, x2, p2) else: p_int_kagan = subcfg["p_kagan_default"] # Calculate probability that event occurred above bottom of seismogenic # zone, given to us by the Slab model. x1 = max_interface_depth + subcfg["p_int_sz"]["x1"] x2 = max_interface_depth + subcfg["p_int_sz"]["x2"] p1 = subcfg["p_int_sz"]["p1"] p2 = subcfg["p_int_sz"]["p2"] p_int_sz = get_probability(depth, x1, p1, x2, p2) # Calculate combined probability of interface p_int = p_int_hypo * p_int_kagan * p_int_sz # Calculate probability that the earthquake lies above the slab # and is thus crustal. x1 = subcfg["p_crust_slab"]["x1"] x2 = subcfg["p_crust_slab"]["x2"] p1 = subcfg["p_crust_slab"]["p1"] p2 = subcfg["p_crust_slab"]["p2"] p_crust_slab = get_probability((depth - slab_depth), x1, p1, x2, p2) # Calculate probability that the earthquake lies within the crust x1 = subcfg["p_crust_hypo"]["x1"] x2 = subcfg["p_crust_hypo"]["x2"] p1 = subcfg["p_crust_hypo"]["p1"] p2 = subcfg["p_crust_hypo"]["p2"] p_crust_hypo = get_probability(depth, x1, p1, x2, p2) # Calculate probability of crustal p_crustal = (1 - p_int) * p_crust_slab * p_crust_hypo # Calculate probability of intraslab p_slab = 1 - (p_int + p_crustal) else: slab_depth = subcfg["default_slab_depth"] # Calculate the probability that an earthquake is interface # given magnitude x1 = subcfg["p_int_mag"]["x1"] p1 = subcfg["p_int_mag"]["p1"] x2 = subcfg["p_int_mag"]["x2"] p2 = subcfg["p_int_mag"]["p2"] p_int_mag = get_probability(mag, x1, p1, x2, p2) # Calculate the probability that the earthquake is # interface given depth (two part function). # upper portion of function x1 = subcfg["p_int_dep_no_slab_upper"]["x1"] p1 = subcfg["p_int_dep_no_slab_upper"]["p1"] x2 = subcfg["p_int_dep_no_slab_upper"]["x2"] p2 = subcfg["p_int_dep_no_slab_upper"]["p2"] p_int_depth_upper = get_probability(depth, x1, p1, x2, p2) # lower portion of function x1 = subcfg["p_int_dep_no_slab_lower"]["x1"] p1 = subcfg["p_int_dep_no_slab_lower"]["p1"] x2 = subcfg["p_int_dep_no_slab_lower"]["x2"] p2 = subcfg["p_int_dep_no_slab_lower"]["p2"] p_int_depth_lower = get_probability(depth, x1, p1, x2, p2) p_int_depth = p_int_depth_upper + p_int_depth_lower # This functional form is used so that the probability of interface # inflates and appraoches 1 as magnitude gets large, assuming # that the ramp function for p_int_mag is zero at small magnitudes # and 1 at large magnitudes. p_int = p_int_depth + (1 - p_int_depth) * p_int_mag if depth > slab_depth: p_crustal = 0.0 p_slab = 1 - p_int else: p_crustal = 1 - p_int p_slab = 0.0 probs = {"crustal": p_crustal, "interface": p_int, "intraslab": p_slab} return probs
[docs]def get_probability(x, x1, p1, x2, p2): """Calculate probability using a ramped function. The subsections and parameters below reflect a series of ramp functions we use to calculate various probabilities. :: p1 |----+ | \ | \ | \ p2 | +------- | +----------------- x1 x2 Args: x (float): Quantity for which we want corresponding probability. x1 (float): Minimum X value. p1 (float): Probability at or below minimum X value. x2 (float): Maximum X value. p2 (float): Probability at or below maximum X value. Returns: float: Probability at input x value. """ if x <= x1: prob = p1 elif x >= x2: prob = p2 else: slope = (p1 - p2) / (x1 - x2) intercept = p1 - slope * x1 prob = x * slope + intercept return prob