Source code for shakemap.coremods.plotregr

# stdlib
import os.path
import concurrent.futures as cf
import json

# third party
from configobj import ConfigObj
import matplotlib.pyplot as plt
import numpy as np

# neic imports
from import ShakeMapOutputContainer
from openquake.hazardlib import imt

# local imports
from shakemap.utils.config import (
from .base import CoreModule, Contents
from shakelib.utils.imt_string import oq_to_file

[docs]class PlotRegr(CoreModule): """ plotregr -- Plot the regression curves from a data file """ command_name = "plotregr" targets = [r"products/.*_regr\.png"] dependencies = [("products/shake_result.hdf", True)] def __init__(self, eventid): super(PlotRegr, self).__init__(eventid) self.contents = Contents("Regression Plots", "regression", eventid)
[docs] def execute(self): """ Raises: NotADirectoryError: When the event data directory does not exist. FileNotFoundError: When the the shake_result HDF file does not exist. """ install_path, data_path = get_config_paths() datadir = os.path.join(data_path, self._eventid, "current", "products") if not os.path.isdir(datadir): raise NotADirectoryError(f"{datadir} is not a valid directory.") datafile = os.path.join(datadir, "shake_result.hdf") if not os.path.isfile(datafile): raise FileNotFoundError(f"{datafile} does not exist.") # Open the ShakeMapOutputContainer and extract the data oc = ShakeMapOutputContainer.load(datafile) if oc.getDataType() != "grid": raise NotImplementedError( "plotregr module can only operate on " "gridded data not sets of points" ) # get the path to the products.conf file, load the config config_file = os.path.join(install_path, "config", "products.conf") spec_file = get_configspec("products") validator = get_custom_validator() config = ConfigObj(config_file, configspec=spec_file) results = config.validate(validator) if not isinstance(results, bool) or not results: config_error(config, results) # If mapping runs in parallel, then we want this module too, as well. # Otherwise we get weird errors from matplotlib max_workers = config["products"]["mapping"]["max_workers"] components = oc.getComponents() for component in components: # # Cheating here a bit by assuming that the IMTs are the same # as the regression IMTs # rockgrid = {} soilgrid = {} rocksd = {} soilsd = {} imtlist = oc.getIMTs(component) for myimt in imtlist: rockgrid[myimt], _ = oc.getArray(["attenuation", "rock", myimt], "mean") soilgrid[myimt], _ = oc.getArray(["attenuation", "soil", myimt], "mean") rocksd[myimt], _ = oc.getArray(["attenuation", "rock", myimt], "std") soilsd[myimt], _ = oc.getArray(["attenuation", "soil", myimt], "std") distances, _ = oc.getArray(["attenuation", "distances"], "rrup") stations = oc.getStationDict() if component == "GREATER_OF_TWO_HORIZONTAL": ctype = "" cfile = "" else: ctype = f" {component}" cfile = f"_{component}" # # Make plots # alist = [] for myimt in imtlist: a = { "myimt": myimt, "rockgrid": rockgrid, "soilgrid": soilgrid, "rocksd": rocksd, "soilsd": soilsd, "stations": stations, "distances": distances, "eventid": self._eventid, "datadir": datadir, "component": cfile, } alist.append(a) if myimt == "MMI": self.contents.addFile( "miRegr", "Intensity Regression", f"Regression plot of macroseismic intensity{ctype}.", f"mmi_regr{cfile}.png", "image/png", ) elif myimt == "PGA": self.contents.addFile( "pgaRegr", "PGA Regression", "Regression plot of peak " "ground acceleration (%%g)%s." % ctype, f"pga_regr{cfile}.png", "image/png", ) elif myimt == "PGV": self.contents.addFile( "pgvRegr", "PGV Regression", f"Regression plot of peak ground velocity (cm/s){ctype}.", f"pgv_regr{cfile}.png", "image/png", ) else: oqimt = imt.from_string(myimt) period = str(oqimt.period) filebase = oq_to_file(myimt) psacap = ( "Regression plot of " + period + " sec 5%% damped pseudo-spectral " "acceleration(%%g)%s." % ctype ) self.contents.addFile( filebase + "Regr", "PSA " + period + " sec Regression", psacap, filebase + f"_regr{cfile}.png", "image/png", ) if max_workers > 0: with cf.ProcessPoolExecutor(max_workers=max_workers) as ex: results =, alist) list(results) else: for adict in alist: make_plots(adict) # # Make attenuation_curves.json # jdict = {"eventid": self._eventid} jdict["gmpe"] = {} for site in ["soil", "rock"]: jdict["gmpe"][site] = {} for myimt in imtlist: jdict["gmpe"][site][myimt] = {} jdict["gmpe"][site][myimt]["mean"] = ( oc.getArray(["attenuation", site, myimt], "mean")[0] .round(decimals=5) .tolist() ) jdict["gmpe"][site][myimt]["stddev"] = ( oc.getArray(["attenuation", site, myimt], "std")[0] .round(decimals=5) .tolist() ) jdict["distances"] = {} for dtype in ["repi", "rhypo", "rjb", "rrup"]: jdict["distances"][dtype] = ( oc.getArray(["attenuation", "distances"], dtype)[0] .round(decimals=5) .tolist() ) jdict["mean_bias"] = {} info = oc.getMetadata() for myimt in imtlist: jdict["mean_bias"][myimt] = info["output"]["ground_motions"][myimt][ "bias" ] jstring = json.dumps(jdict, allow_nan=False) jfile = os.path.join(datadir, f"attenuation_curves{cfile}.json") f = open(jfile, "wt") f.write(jstring) f.close() oc.close() cap = "Nominal attenuation curves" self.contents.addFile( "attenuationCurves", "Attenuation Curves", cap, jfile, "application/json", )
[docs]def make_plots(adict): myimt = adict["myimt"] eventid = adict["eventid"] datadir = adict["datadir"] rockgrid = adict["rockgrid"] soilgrid = adict["soilgrid"] rocksd = adict["rocksd"] soilsd = adict["soilsd"] stations = adict["stations"] distances = adict["distances"] cfile = adict["component"] plt.figure(figsize=(10, 10)) plt.semilogx(distances, rockgrid[myimt], "r", label="rock") plt.semilogx(distances, soilgrid[myimt], "g", label="soil") plt.semilogx( distances, rockgrid[myimt] + rocksd[myimt], "r--", label="rock +/- stddev" ) plt.semilogx(distances, rockgrid[myimt] - rocksd[myimt], "r--") plt.semilogx( distances, soilgrid[myimt] + soilsd[myimt], "g--", label="soil +/- stddev" ) plt.semilogx(distances, soilgrid[myimt] - soilsd[myimt], "g--") for station in stations["features"]: dist = station["properties"]["distances"]["rrup"] if dist > distances[-1]: continue if station["properties"]["station_type"] == "seismic": symbol = "^" if myimt == "MMI": value = station["properties"]["intensity"] if value != "null": plt.semilogx(dist, value, symbol + "k", mfc="none") else: imtstr = myimt.lower() value = np.nan for chan in station["properties"]["channels"]: if chan["name"].endswith("Z") or chan["name"].endswith("U"): continue for amp in chan["amplitudes"]: if amp["name"] != imtstr: continue if amp["flag"] != "" and amp["flag"] != "0": break if amp["value"] is None or amp["value"] == "null": break if isinstance(amp["value"], str): thisamp = float(amp["value"]) else: thisamp = amp["value"] if thisamp <= 0: break if myimt == "PGV": tmpval = np.log(thisamp) else: tmpval = np.log(thisamp / 100.0) if np.isnan(value) or tmpval > value: value = tmpval break if not np.isnan(value): plt.semilogx(dist, value, symbol + "k", mfc="none") else: symbol = "o" if myimt == "MMI": amp = station["properties"]["intensity"] flag = station["properties"]["intensity_flag"] if flag == "" or flag == "0": if amp is not None and amp != "null": if isinstance(amp, str): value = float(amp) else: value = amp plt.semilogx(dist, value, symbol + "k", mfc="none") else: imtstr = myimt.lower() for thing in station["properties"]["pgm_from_mmi"]: if thing["name"] != imtstr: continue amp = thing["value"] if amp is not None and amp != "null" and amp != 0: if myimt == "PGV": amp = np.log(amp) else: amp = np.log(amp / 100.0) plt.semilogx(dist, amp, symbol + "k", mfc="none") break plt.title(eventid + ": " + myimt + " mean") plt.xlabel("Rrup (km)") if myimt == "MMI": plt.ylabel("MMI") elif myimt == "PGV": plt.ylabel("PGV ln(cm/s)") else: plt.ylabel(myimt + " ln(g)") plt.legend() fileimt = oq_to_file(myimt) pfile = os.path.join(datadir, fileimt + f"_regr{cfile}.png") plt.savefig(pfile) plt.close()