#!/usr/bin/env python

# stdlib imports
import json
import os
import tempfile
import warnings
from datetime import datetime, timedelta

# third party imports
import matplotlib as mpl
import matplotlib.cm as cm
import matplotlib.colors as colors
import matplotlib.pyplot as plt
import numpy as np
import simplekml
from configobj import ConfigObj
from folium.utilities import mercator_transform
from impactutils.textformat.text import set_num_precision
from mapio.shake import ShakeGrid
# from impactutils.time.ancient_time import HistoricTime as ShakeDateTime
# import pytz

# local imports
from gfail.stats import computeStats
from gfail.utilities import get_alert, loadlayers, parseConfigLayers
# from gfail.utilities import is_grid_point_source

# temporary until mapio is updated
warnings.filterwarnings("ignore")

# plt.switch_backend('agg')
DFCOLORS = [
[0.94, 0.94, 0.70, 0.7],
[0.90, 0.78, 0.18, 0.7],
[0.92, 0.45, 0.03, 0.7],
[0.75, 0.22, 0.36, 0.7],
[0.36, 0.16, 0.70, 0.7],
[0.12, 0.12, 0.39, 0.7],
]
DFBINS = [0.002, 0.01, 0.02, 0.05, 0.1, 0.2, 0.5]
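# Note: each color in DFCOLORS fills the interval between two consecutive
# DFBINS edges, so DFBINS must contain exactly one more entry than DFCOLORS
# (make_rgba and make_legend both rely on this).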


def hazdev(
maplayerlist,
configs,
shakemap,
outfolder=None,
alpha=0.7,
shakethreshtype="pga",
shakethresh=10.0,
prefLS="Nowicki Jessee and others (2018)",
prefLQ="Zhu and others (2017)",
pop_file=None,
defaultcolors=True,
point=True,
pager_alert="",
eventsource="",
eventsourcecode="",
createpngs=True,
gf_version=1,
pdlcall=False,
):
"""Create all files needed for product page creation
Assumes gfail has been run already with -w flag
Args:
maplayerlist (list): List of model outputs from gfail.
configs (list): List of dictionaries of config files corresponding to
each model in maplayerlist and in the same order.
shakemap (str): path to shakemap .xml file.
outfolder (str): Location in which to save outputs. If None, will use
current directory.
alpha (float): Transparency to use for overlay pngs, value from 0 to 1.
shakethreshtype (str): Type of ground motion to use for shakethresh,
'pga', 'pgv', or 'mmi'.
        shakethresh (float or list): Shaking threshold(s), in %g for pga and
            cm/s for pgv, or intensity for mmi. Used for Hagg and Exposure
            computation.
prefLS (str): shortref of "preferred" landslide model.
prefLQ (str): shortref of "preferred" liquefaction model.
pop_file (str): file path to population file used to compute
population-based alert levels.
        defaultcolors (bool): If True, will use DFCOLORS for all layers
            instead of determining new ones. This will crash if any of the
            layers have a different number of bins than DFCOLORS has colors.
        point (bool): if True, event is a point source and a warning should
            be displayed.
        pager_alert (str): PAGER alert level, e.g., 'green', 'pending', ...
eventsource (str): net id (e.g., 'us')
eventsourcecode (str): event code (e.g. '123456pq')
createpngs (bool): if True, create pngs for web map
gf_version (int): ground failure version
pdlcall (bool): True if callgf was called by pdl automatically,
false if called manually (or otherwise).
Returns:
Files that need to be sent to comcat for hazdev to create the product
webpage including:
- info.json
- transparent png overlays of all models
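    Example:
        A minimal sketch of a call (paths and variable names here are
        hypothetical; real inputs come from a prior ``gfail -w`` run):

        .. code-block:: python

            filenames = hazdev(
                [ls_maplayer, lq_maplayer],  # model outputs from gfail
                [ls_config, lq_config],  # matching config dictionaries
                "/data/us123456pq/grid.xml",
                outfolder="/data/us123456pq",
                eventsource="us",
                eventsourcecode="123456pq",
            )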
"""
event_id = maplayerlist[0]["model"]["description"]["event_id"]
if pop_file is None:
# Read in default paths to get location of the population grid
default_file = os.path.join(os.path.expanduser("~"), ".gfail_defaults")
defaults = ConfigObj(default_file)
pop_file = defaults["popfile"]
if outfolder is None:
outfolder = os.path.join(os.getcwd(), event_id)
filenames = []
# Separate the LS and LQ models
concLS = []
concLQ = []
lsmodels = []
lqmodels = []
logLS = []
limLS = []
colLS = []
logLQ = []
limLQ = []
colLQ = []
for conf, maplayer in zip(configs, maplayerlist):
mdict = maplayer["model"]["description"]
# config = ConfigObj(conf)
if "landslide" in mdict["parameters"]["modeltype"].lower():
title = maplayer["model"]["description"]["name"]
plotorder, logscale, lims, colormaps, maskthreshes = parseConfigLayers(
maplayer, conf, keys=["model"]
)
logLS.append(logscale[0])
limLS.append(lims[0])
colLS.append(colormaps[0])
concLS.append(title)
if "godt" in maplayer["model"]["description"]["name"].lower():
probthresh = None
id1 = "godt_2008"
maxP = 1.0
else:
# Since logistic models can't equal one, need to eliminate
# placeholder zeros before computing stats
if "jessee" in maplayer["model"]["description"]["name"].lower():
id1 = "jessee_2018"
probthresh = 0.002
maxP = 0.26
else:
id1 = "nowicki_2014_global"
probthresh = 0.0
maxP = 1.0
if "std" in list(maplayer.keys()):
stdgrid2D = maplayer["std"]["grid"]
else:
stdgrid2D = None
stats = computeStats(
maplayer["model"]["grid"],
stdgrid2D=stdgrid2D,
shakefile=shakemap,
shakethresh=shakethresh,
probthresh=probthresh,
pop_file=pop_file,
shakethreshtype=shakethreshtype,
maxP=maxP,
)
metadata = maplayer["model"]["description"]
if len(maplayer) > 1:
inputs = {}
inkeys = list(maplayer.keys())
for key in inkeys:
if key != "model":
newkey = maplayer[key]["label"]
inputs[newkey] = maplayer[key]["description"]
metadata["inputs"] = inputs
if title == prefLS:
on = True
ls_haz_alert, ls_pop_alert, _, _, ls_alert, _ = get_alert(
stats["hagg_0.10g"], 0.0, stats["exp_pop_0.10g"], 0.0
)
lsext = get_zoomextent(maplayer["model"]["grid"])
ls_haz_value = set_num_precision(stats["hagg_0.10g"], 2, "float")
ls_pop_value = set_num_precision(stats["exp_pop_0.10g"], 2, "int")
else:
on = False
ls_haz_alert = None
ls_pop_alert = None
ls_haz_value = None
ls_pop_value = None
ls_alert = None
lsext = None
ls_haz_std = None
ls_pop_std = None
ls_hlim = None
ls_elim = None
ls_hp = None
ls_hq = None
ls_ep = None
ls_eq = None
ls_haz_1std_range = None
ls_haz_2std_range = None
ls_pop_1std_range = None
ls_pop_2std_range = None
if stdgrid2D is not None and title == prefLS:
ph = stats["p_hagg_0.10g"]
qh = stats["q_hagg_0.10g"]
pe = stats["p_exp_0.10g"]
qe = stats["q_exp_0.10g"]
hmax = stats["hlim_0.10g"]
emax = stats["elim_0.10g"]
ls_haz_std = float("%.4f" % stats["hagg_std_0.10g"])
ls_pop_std = float("%.4f" % stats["exp_std_0.10g"])
ls_hlim = float("%.4f" % hmax)
ls_elim = float("%.4f" % emax)
ls_hp = float("%.4f" % ph)
ls_hq = float("%.4f" % qh)
ls_ep = float("%.4f" % pe)
ls_eq = float("%.4f" % qe)
ls_haz_1std_range = stats["hagg_1std_range_0.10g"]
ls_haz_2std_range = stats["hagg_2std_range_0.10g"]
ls_pop_1std_range = stats["pop_1std_range_0.10g"]
ls_pop_2std_range = stats["pop_2std_range_0.10g"]
edict = {
"id": id1,
"title": metadata["name"],
"overlay": "%s.png" % id1,
"extent": "%s_extent.json" % id1,
"units": metadata["units"],
"preferred": on,
"alert": ls_alert,
"hazard_alert": {
"color": ls_haz_alert,
"value": ls_haz_value,
"std": ls_haz_std,
"parameter": "Aggregate Hazard",
"units": "km^2",
},
"population_alert": {
"color": ls_pop_alert,
"value": ls_pop_value,
"std": ls_pop_std,
"parameter": "Population exposure",
"units": "people",
},
"bin_edges": list(lims[0]),
"probability": {
"max": float("%.2f" % stats["Max"]),
"std": float("%.2f" % stats["Std"]),
"hagg0.1g": float("%.2f" % stats["hagg_0.10g"]),
"popexp0.1g": float(
"%.2f" % stats["exp_pop_0.10g"],
),
"hagg0.1g_std": ls_haz_std,
"popexp0.1g_std": ls_pop_std,
"hlim0.1g": ls_hlim,
"elim0.1g": ls_elim,
"p_hagg": ls_hp,
"q_hagg": ls_hq,
"p_exp": ls_ep,
"q_exp": ls_eq,
"hagg_1std": ls_haz_1std_range,
"hagg_2std": ls_haz_2std_range,
"pop_1std": ls_pop_1std_range,
"pop_2std": ls_pop_2std_range,
},
"longref": metadata["longref"],
"parameters": metadata["parameters"],
"zoomext": lsext,
}
            # Replace any NaNs with Nones so info.json is valid
for key in edict["probability"]:
if np.isscalar(edict["probability"][key]):
if np.isnan(edict["probability"][key]):
edict["probability"][key] = None
lsmodels.append(edict)
elif "liquefaction" in mdict["parameters"]["modeltype"].lower():
title = maplayer["model"]["description"]["name"]
plotorder, logscale, lims, colormaps, maskthreshes = parseConfigLayers(
maplayer, conf, keys=["model"]
)
logLQ.append(logscale[0])
limLQ.append(lims[0])
colLQ.append(colormaps[0])
concLQ.append(title)
if "2015" in maplayer["model"]["description"]["name"].lower():
id1 = "zhu_2015"
probthresh = 0.0
maxP = 1.0
elif "2017" in maplayer["model"]["description"]["name"].lower():
id1 = "zhu_2017_general"
probthresh = 0.005
maxP = 0.487
else:
probthresh = None
maxP = 1.0
id1 = None
if "std" in list(maplayer.keys()):
stdgrid2D = maplayer["std"]["grid"]
else:
stdgrid2D = None
stats = computeStats(
maplayer["model"]["grid"],
stdgrid2D=stdgrid2D,
shakefile=shakemap,
shakethresh=shakethresh,
pop_file=pop_file,
shakethreshtype=shakethreshtype,
probthresh=probthresh,
maxP=maxP,
)
metadata = maplayer["model"]["description"]
if len(maplayer) > 1:
inputs = {}
inkeys = list(maplayer.keys())
for key in inkeys:
if key != "model":
newkey = maplayer[key]["label"]
inputs[newkey] = maplayer[key]["description"]
metadata["inputs"] = inputs
if title == prefLQ:
on = True
_, _, lq_haz_alert, lq_pop_alert, _, lq_alert = get_alert(
0.0, stats["hagg_0.10g"], 0.0, stats["exp_pop_0.10g"]
)
lqext = get_zoomextent(maplayer["model"]["grid"])
lq_haz_value = set_num_precision(stats["hagg_0.10g"], 2, "float")
lq_pop_value = set_num_precision(stats["exp_pop_0.10g"], 2, "int")
else:
on = False
lq_haz_alert = None
lq_pop_alert = None
lq_haz_value = None
lq_pop_value = None
lq_alert = None
lqext = None
lq_haz_std = None
lq_pop_std = None
lq_hlim = None
lq_elim = None
lq_hp = None
lq_hq = None
lq_ep = None
lq_eq = None
lq_haz_1std_range = None
lq_haz_2std_range = None
lq_pop_1std_range = None
lq_pop_2std_range = None
if stdgrid2D is not None and title == prefLQ:
ph = stats["p_hagg_0.10g"]
qh = stats["q_hagg_0.10g"]
pe = stats["p_exp_0.10g"]
qe = stats["q_exp_0.10g"]
hmax = stats["hlim_0.10g"]
emax = stats["elim_0.10g"]
lq_haz_std = float("%.2f" % stats["hagg_std_0.10g"])
lq_pop_std = float("%.2f" % stats["exp_std_0.10g"])
lq_hlim = float("%.4f" % hmax)
lq_elim = float("%.4f" % emax)
lq_hp = float("%.4f" % ph)
lq_hq = float("%.4f" % qh)
lq_ep = float("%.4f" % pe)
lq_eq = float("%.4f" % qe)
lq_haz_1std_range = stats["hagg_1std_range_0.10g"]
lq_haz_2std_range = stats["hagg_2std_range_0.10g"]
lq_pop_1std_range = stats["pop_1std_range_0.10g"]
lq_pop_2std_range = stats["pop_2std_range_0.10g"]
edict = {
"id": id1,
"title": metadata["name"],
"overlay": "%s.png" % id1,
"extent": "%s_extent.json" % id1,
"units": metadata["units"],
"preferred": on,
"alert": lq_alert,
"hazard_alert": {
"color": lq_haz_alert,
"value": lq_haz_value,
"std": lq_haz_std,
"parameter": "Aggregate Hazard",
"units": "km^2",
},
"population_alert": {
"color": lq_pop_alert,
"value": lq_pop_value,
"std": lq_pop_std,
"parameter": "Population exposure",
"units": "people",
},
"bin_edges": list(lims[0]),
"probability": {
"max": float("%.2f" % stats["Max"]),
"std": float("%.2f" % stats["Std"]),
"hagg0.1g": float("%.2f" % stats["hagg_0.10g"]),
"popexp0.1g": float("%.2f" % stats["exp_pop_0.10g"]),
"hagg0.1g_std": lq_haz_std,
"popexp0.1g_std": lq_pop_std,
"hlim0.1g": lq_hlim,
"elim0.1g": lq_elim,
"p_hagg": lq_hp,
"q_hagg": lq_hq,
"p_exp": lq_ep,
"q_exp": lq_eq,
"hagg_1std": lq_haz_1std_range,
"hagg_2std": lq_haz_2std_range,
"pop_1std": lq_pop_1std_range,
"pop_2std": lq_pop_2std_range,
},
"longref": metadata["longref"],
"parameters": metadata["parameters"],
"zoomext": lqext,
}
            # Replace any NaNs with Nones so info.json is valid
for key in edict["probability"]:
if np.isscalar(edict["probability"][key]):
if np.isnan(edict["probability"][key]):
edict["probability"][key] = None
lqmodels.append(edict)
        else:
            raise Exception(
                "model type is undefined, check "
                "maplayer['model']['description']['parameters']"
                "['modeltype'] to ensure it is defined"
            )
if defaultcolors:
for ls in lsmodels:
ls["bin_colors"] = DFCOLORS
for lq in lqmodels:
lq["bin_colors"] = DFCOLORS
else:
defaultcolormap = cm.CMRmap_r
# Get colors and stuff into dictionaries
sync, colorlistLS, reflims = setupcolors(
prefLS, concLS, limLS, colLS, defaultcolormap, logscale=logLS, alpha=alpha
)
if reflims is None:
raise Exception(
"Check input config files, they must all have the "
"same number of bin edges"
)
else:
# Stuff colors into dictionary
for ls in lsmodels:
ls["bin_colors"] = list(colorlistLS)
sync, colorlistLQ, reflims = setupcolors(
prefLQ, concLQ, limLQ, colLQ, defaultcolormap, logscale=logLQ, alpha=alpha
)
if reflims is None:
raise Exception(
"Check input config files, they must all have the "
"same number of bin edges"
)
else:
# Stuff colors into dictionary
for lq in lqmodels:
lq["bin_colors"] = list(colorlistLQ)
# Create pngs
if createpngs:
pngfiles = create_png(outfolder, lsmodels, lqmodels)
filenames.append(pngfiles)
# If PAGER alert is pending, overwrite our alerts
if pager_alert == "pending":
for ls in lsmodels:
ls["alert"] = "pending"
ls["hazard_alert"]["color"] = "pending"
ls["population_alert"]["color"] = "pending"
for lq in lqmodels:
lq["alert"] = "pending"
lq["hazard_alert"]["color"] = "pending"
lq["population_alert"]["color"] = "pending"
# Create info.json
infojson = create_info(
outfolder,
lsmodels,
lqmodels,
gf_version,
eventsource,
eventsourcecode,
point,
pdlcall,
)
filenames.append(infojson)
return filenames


def create_png(
event_dir,
lsmodels=None,
lqmodels=None,
mercator=True,
lsmask=0.002,
lqmask=0.005,
legends=False,
):
"""
Creates transparent PNG file for website.
Args:
        event_dir (str): Directory containing ground failure results.
lsmodels (list): List of dictionaries of model summary info compiled
by the hazdev function. If not specified, code will search for
the hdf5 files for the preferred model and will create this
dictionary and will apply default colorbars and bins.
lqmodels (list): Same as above for liquefaction.
mercator (bool): Project raster to web mercator
lsmask (float): Mask all landslide cells with probabilities below this
threshold
lqmask (float): Mask all liquefaction cells with probabilities below
this threshold
legends (bool): if True, will produce png files of legends for each
preferred model
Returns:
.png map overlays and .json files specifying their mapped extents
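    Example:
        A minimal sketch (the event directory is hypothetical and is
        expected to contain the model hdf5 files written by gfail):

        .. code-block:: python

            overlays = create_png("/data/us123456pq", lsmodels=None,
                                  lqmodels=None, mercator=True)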
"""
filenames = []
files = os.listdir(event_dir)
if lsmodels is None:
# Read in the "preferred" model for landslides
ls_mod_file = [f for f in files if "jessee_2018.hdf5" in f]
if len(ls_mod_file) == 1:
ls_file = os.path.join(event_dir, ls_mod_file[0])
ls_mod = loadlayers(ls_file)
filesnippet = "jessee_2018"
out = make_rgba(
ls_mod["model"]["grid"],
mask=lsmask,
mercator=mercator,
levels=DFBINS,
colorlist=DFCOLORS,
)
rgba_img, ls_extent, lmin, lmax, cmap = out
filen = os.path.join(event_dir, "%s_extent.json" % filesnippet)
filenames.append(filen)
with open(filen, "w") as f:
json.dump(ls_extent, f)
            filen = os.path.join(event_dir, "%s.png" % filesnippet)
            plt.imsave(filen, rgba_img, vmin=lmin, vmax=lmax, cmap=cmap)
            filenames.append(filen)
else:
raise OSError(
"Preferred landslide model result (%s) not found." % ls_mod_file
)
else:
for lsm in lsmodels:
# if lsm['preferred']:
filesnippet = lsm["id"]
fsh = "%s.hdf5" % filesnippet
ls_mod_file = [f for f in files if fsh in f]
if len(ls_mod_file) == 1:
ls_file = os.path.join(event_dir, ls_mod_file[0])
ls_mod = loadlayers(ls_file)
else:
raise OSError("Specified landslide model result (%s) not found." % fsh)
out = make_rgba(
ls_mod["model"]["grid"],
mask=lsmask,
mercator=mercator,
levels=DFBINS,
colorlist=DFCOLORS,
)
rgba_img, ls_extent, lmin, lmax, cmap = out
filen = os.path.join(event_dir, "%s_extent.json" % filesnippet)
filenames.append(filen)
with open(filen, "w") as f:
json.dump(ls_extent, f)
            filen = os.path.join(event_dir, "%s.png" % filesnippet)
            plt.imsave(filen, rgba_img, vmin=lmin, vmax=lmax, cmap=cmap)
            filenames.append(filen)
if lqmodels is None:
# read in preferred model for liquefaction if none specified
lq_mod_file = [f2 for f2 in files if "zhu_2017_general.hdf5" in f2]
if len(lq_mod_file) == 1:
lq_file = os.path.join(event_dir, lq_mod_file[0])
lq_mod = loadlayers(lq_file)
filesnippet = "zhu_2017_general"
            tempbins = list(DFBINS)  # copy so module-level DFBINS is not mutated
            tempbins[0] = 0.005
out = make_rgba(
lq_mod["model"]["grid"],
mask=lqmask,
mercator=mercator,
levels=tempbins,
colorlist=DFCOLORS,
)
rgba_img, lq_extent, lmin, lmax, cmap = out
filen = os.path.join(event_dir, "%s_extent.json" % filesnippet)
filenames.append(filen)
with open(filen, "w") as f:
json.dump(lq_extent, f)
filen = os.path.join(event_dir, "%s.png" % filesnippet)
plt.imsave(filen, rgba_img, vmin=lmin, vmax=lmax, cmap=cmap)
filenames.append(filen)
else:
raise OSError(
"Preferred liquefaction model result (%s) not found." % lq_mod_file
)
else:
for lqm in lqmodels:
# if lqm['preferred']:
filesnippet = lqm["id"]
            tempbins = list(DFBINS)  # copy so module-level DFBINS is not mutated
            tempbins[0] = 0.005
fsh = "%s.hdf5" % filesnippet
lq_mod_file = [f2 for f2 in files if fsh in f2]
if len(lq_mod_file) == 1:
lq_file = os.path.join(event_dir, lq_mod_file[0])
lq_mod = loadlayers(lq_file)
else:
raise OSError(
"Specified liquefaction model result (%s) not found." % fsh
)
out = make_rgba(
lq_mod["model"]["grid"],
mask=lqmask,
mercator=mercator,
levels=tempbins,
colorlist=DFCOLORS,
)
rgba_img, lq_extent, lmin, lmax, cmap = out
filen = os.path.join(event_dir, "%s_extent.json" % filesnippet)
filenames.append(filen)
with open(filen, "w") as f:
json.dump(lq_extent, f)
filen = os.path.join(event_dir, "%s.png" % filesnippet)
plt.imsave(filen, rgba_img, vmin=lmin, vmax=lmax, cmap=cmap)
filenames.append(filen)
if legends:
lsname, lqname = make_legends(lqmin=lqmask, lsmin=lsmask, outfolder=event_dir)
filenames.append(lsname)
filenames.append(lqname)
return filenames


def create_info(
event_dir,
lsmodels,
lqmodels,
gf_version=1,
eventsource="",
eventsourcecode="",
point=True,
pdlcall=False,
):
"""Create info.json for ground failure product.
Args:
        event_dir (str): Directory containing ground failure results.
lsmodels (list): List of dictionaries of model summary info compiled
by the hazdev function. If not specified, code will search for
the hdf5 files for the preferred model and will create this
dictionary and will apply default colorbars and bins.
lqmodels (list): Same as above for liquefaction.
gf_version (int): ground failure version
eventsource (str): net id (e.g., 'us')
eventsourcecode (str): event code (e.g. '123456pq')
point (bool): if True, event is a point source and warning should be
displayed
pdlcall (bool): True if callgf was called by pdl automatically
Returns:
creates info.json for this event
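    Example:
        A minimal sketch (hypothetical directory; lsmodels and lqmodels are
        the dictionaries built by hazdev):

        .. code-block:: python

            files = create_info("/data/us123456pq", lsmodels, lqmodels,
                                gf_version=1, eventsource="us",
                                eventsourcecode="123456pq")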
"""
filenames = []
# Find the shakemap grid.xml file
with open(os.path.join(event_dir, "shakefile.txt"), "r") as f:
shakefile = f.read()
files = os.listdir(event_dir)
# Get all info from dictionaries of preferred events, add in extent
# and filename
ls_alert = None
lq_alert = None
lsext = None
lqext = None
for lsm in lsmodels:
# Add extent and filename for preferred model
if lsm["preferred"]:
filesnippet = lsm["id"]
# Read in extents
flnm = "%s_extent.json" % filesnippet
ls_extent_file = [f for f in files if flnm in f]
if len(ls_extent_file) == 1:
ls_file = os.path.join(event_dir, ls_extent_file[0])
with open(ls_file) as f:
ls_extent = json.load(f)
else:
raise OSError("Landslide extent not found.")
lsm["extent"] = ls_extent
# lsm['filename'] = flnm
lsext = lsm["zoomext"] # Get zoom extent
ls_alert = lsm["alert"]
rmkeys = ["bin_edges", "bin_colors", "zoomext"]
else:
# Remove any alert keys
rmkeys = [
"bin_edges",
"bin_colors",
"zoomext",
"population_alert",
"alert",
"hazard_alert",
]
# Deal with extent
try:
with open(os.path.join(event_dir, lsm["extent"])) as ff:
extent1 = json.load(ff)
except BaseException:
extent1 = None
lsm["extent"] = extent1
for key in rmkeys:
if key in lsm:
lsm.pop(key)
for lqm in lqmodels:
if lqm["preferred"]:
filesnippet = lqm["id"]
# Read in extents
flnm = "%s_extent.json" % filesnippet
lq_extent_file = [f2 for f2 in files if flnm in f2]
if len(lq_extent_file) == 1:
lq_file = os.path.join(event_dir, lq_extent_file[0])
with open(lq_file) as f:
lq_extent = json.load(f)
else:
raise OSError("Liquefaction extent not found.")
lqm["extent"] = lq_extent
# lqm['filename'] = flnm
lqext = lqm["zoomext"] # Get zoom extent
lq_alert = lqm["alert"]
rmkeys = ["bin_edges", "bin_colors", "zoomext"]
else:
# Remove any alert keys
rmkeys = [
"bin_edges",
"bin_colors",
"zoomext",
"population_alert",
"alert",
"hazard_alert",
]
# Deal with extent
try:
with open(os.path.join(event_dir, lqm["extent"])) as ff:
extent1 = json.load(ff)
except BaseException:
extent1 = None
lqm["extent"] = extent1
for key in rmkeys:
if key in lqm:
lqm.pop(key)
# Try to get event info
shake_grid = ShakeGrid.load(shakefile, adjust="res")
event_dict = shake_grid.getEventDict()
sm_dict = shake_grid.getShakeDict()
base_url = "https://earthquake.usgs.gov/earthquakes/eventpage/"
# Is this a point source?
# point = is_grid_point_source(shake_grid)
net = eventsource
code = eventsourcecode
time = event_dict["event_timestamp"].strftime("%Y-%m-%dT%H:%M:%SZ")
event_url = "%s%s%s#executive" % (base_url, net, code)
# Get extents that work for both unless one is green and the other isn't
if lq_alert == "green" and ls_alert != "green" and ls_alert is not None:
xmin = lsext["xmin"]
xmax = lsext["xmax"]
ymin = lsext["ymin"]
ymax = lsext["ymax"]
elif lq_alert != "green" and ls_alert == "green" and lq_alert is not None:
xmin = lqext["xmin"]
xmax = lqext["xmax"]
ymin = lqext["ymin"]
ymax = lqext["ymax"]
else:
xmin = np.min((lqext["xmin"], lsext["xmin"]))
xmax = np.max((lqext["xmax"], lsext["xmax"]))
ymin = np.min((lqext["ymin"], lsext["ymin"]))
ymax = np.max((lqext["ymax"], lsext["ymax"]))
# Should we display the warning about point source?
rupture_warning = False
if point and event_dict["magnitude"] > 6.5:
rupture_warning = True
# Figure out if realtime (pdl run and same week as event)
if pdlcall and datetime.utcnow() - event_dict["event_timestamp"] < timedelta(
days=7
):
realtime = True
else:
realtime = False
# Create info.json for website rendering and metadata purposes
info_dict = {
"Summary": {
"code": code,
"net": net,
"magnitude": event_dict["magnitude"],
"depth": event_dict["depth"],
"time": time,
"lat": event_dict["lat"],
"lon": event_dict["lon"],
"location": event_dict["event_description"],
"event_url": event_url,
"gf_version": gf_version,
"gf_time": datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ"),
"shakemap_version": sm_dict["shakemap_version"],
"shakemap_source": sm_dict["shakemap_originator"],
"shakemap_time": sm_dict["process_timestamp"].strftime(
"%Y-%m-%dT%H:%M:%SZ"
),
"rupture_warning": rupture_warning,
"point_source": point,
"zoom_extent": [xmin, xmax, ymin, ymax],
"realtime": realtime,
},
"Landslides": lsmodels,
"Liquefaction": lqmodels,
}
info_file = os.path.join(event_dir, "info.json")
with open(info_file, "w") as f:
json.dump(info_dict, f)
fixfile(info_file)
filenames.append(info_file)
return filenames


def fixfile(filename):
"""Replace any Nan or Infinity values with null in info.json
Args:
filename (str): full path to info.json file
Returns: same file with invalid entries replaced with valid
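    Example:
        A short sketch (path hypothetical). json.dump writes bare
        NaN/Infinity tokens, which are not valid JSON; fixfile rewrites
        them as null in place:

        .. code-block:: python

            fixfile("/data/us123456pq/info.json")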
"""
if os.path.exists(filename):
with open(filename, "r") as file:
filedata = file.read()
if "NaN" in filedata:
filedata = filedata.replace("NaN", "null")
if "Infinity" in filedata:
filedata = filedata.replace("-Infinity", "null")
filedata = filedata.replace("Infinity", "null")
with open(filename, "w") as file:
file.write(filedata)


def make_legends(
lqmin=0.005, lsmin=0.002, outfolder=None, orientation="horizontal", transparent=True
):
"""Make png file of legends to go with pngs using DFCOLORS and DFBINS
Args:
lqmin (float): minimum visible value of liquefaction probability
lsmin (float): same as above for landslides
outfolder (str): folder to place pngs of legends
orientation (str): orientation of colorbar, 'horizontal' or 'vertical'
transparent (bool): if True, background will be transparent
Returns:
tuple: (lsfilename, lqfilename), locations of files that were created.
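    Example:
        A minimal sketch (output folder hypothetical):

        .. code-block:: python

            lsleg, lqleg = make_legends(outfolder="/data/us123456pq")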
"""
    # Make liquefaction colorbar
    binedges = list(DFBINS)  # copy so module-level DFBINS is not mutated
    binedges[0] = lqmin
    if outfolder is None:
        lqfilename = "legend_liquefaction.png"
    else:
        lqfilename = os.path.join(outfolder, "legend_liquefaction.png")
    make_legend(
        binedges,
        DFCOLORS,
        filename=lqfilename,
        orientation=orientation,
        title="Liquefaction probability",
        transparent=transparent,
    )
    # --------------------------
    # Make landslide legend
    binedges2 = list(DFBINS)  # copy so module-level DFBINS is not mutated
    binedges2[0] = lsmin
    if outfolder is None:
        lsfilename = "legend_landslide.png"
    else:
        lsfilename = os.path.join(outfolder, "legend_landslide.png")
    make_legend(
        binedges2,
        DFCOLORS,
        filename=lsfilename,
        orientation=orientation,
        title="Landslide probability",
        transparent=transparent,
    )
return lsfilename, lqfilename


def create_kmz(maplayer, outfile, mask=None, levels=None, colorlist=None, qdict=None):
"""
Create kmz files of models
Args:
maplayer (dict):
Dictionary of one model result formatted like:
.. code-block:: python
{
'grid': mapio grid2D object,
'label': 'label for colorbar and top line of subtitle',
'type': 'output or input to model',
'description': 'description for subtitle'
}
        outfile (str):
            Path for the output file; a .kmz extension is enforced.
mask (float):
make all cells below this value transparent
levels (array):
list of bin edges for each color, must be same length
colorlist (array):
list of colors for each bin, should be length one less than levels
qdict (dict):
dictionary of quantile grids
    Returns:
        Path to the kmz file that was created.
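    Example:
        A minimal sketch (paths hypothetical; the maplayer dictionary is as
        loaded by gfail.utilities.loadlayers):

        .. code-block:: python

            layers = loadlayers("/data/us123456pq/jessee_2018.hdf5")
            kmz = create_kmz(layers["model"],
                             "/data/us123456pq/jessee_2018.kmz",
                             mask=0.002)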
"""
# Figure out lims
if levels is None:
levels = DFBINS
if colorlist is None:
colorlist = DFCOLORS
if len(levels) - 1 != len(colorlist):
raise Exception("len(levels) must be one longer than len(colorlist)")
# Make place to put temporary files
temploc = tempfile.TemporaryDirectory()
# Figure out file names
name, ext = os.path.splitext(outfile)
basename = os.path.basename(name)
if ext != ".kmz":
ext = ".kmz"
filename = "%s%s" % (name, ext)
mapfile = os.path.join(temploc.name, "%s.tiff" % basename)
legshort = "%s_legend.png" % basename
legfile = os.path.join(temploc.name, legshort)
# Make colored geotiff
out = make_rgba(maplayer["grid"], mask=mask, levels=levels, colorlist=colorlist)
rgba_img, extent, lmin, lmax, cmap = out
# Save as a tiff
plt.imsave(mapfile, rgba_img, vmin=lmin, vmax=lmax, cmap=cmap)
if qdict is not None:
for quant, quantdict in qdict.items():
out = make_rgba(
quantdict["grid"], mask=mask, levels=levels, colorlist=colorlist
)
rgba_img, extent, lmin, lmax, cmap = out
qfile = os.path.join(temploc.name, "%s_%s.tiff" % (basename, quant))
plt.imsave(qfile, rgba_img, vmin=lmin, vmax=lmax, cmap=cmap)
# Start creating kmz
L = simplekml.Kml()
# Set zoom window
doc = L.document # have to put lookat in root document directory
doc.altitudemode = simplekml.AltitudeMode.relativetoground
boundaries1 = get_zoomextent(maplayer["grid"])
doc.lookat.latitude = np.mean([boundaries1["ymin"], boundaries1["ymax"]])
doc.lookat.longitude = np.mean([boundaries1["xmax"], boundaries1["xmin"]])
doc.lookat.altitude = 0.0
# dist in m from point
doc.lookat.range = (boundaries1["ymax"] - boundaries1["ymin"]) * 111.0 * 1000.0
doc.description = (
"USGS near-real-time earthquake-triggered %s model for event id %s"
% (
maplayer["description"]["parameters"]["modeltype"],
maplayer["description"]["event_id"],
)
)
prob = L.newgroundoverlay(name=maplayer["label"])
prob.icon.href = "files/%s.tiff" % basename
prob.latlonbox.north = extent[3]
prob.latlonbox.south = extent[2]
prob.latlonbox.east = extent[1]
prob.latlonbox.west = extent[0]
L.addfile(mapfile)
if qdict is not None:
for quant, quantdict in qdict.items():
qfile = os.path.join(temploc.name, "%s_%s.tiff" % (basename, quant))
qlayer = L.newgroundoverlay(name=quantdict["label"], visibility=0)
qlayer.icon.href = "files/%s_%s.tiff" % (basename, quant)
qlayer.latlonbox.north = extent[3]
qlayer.latlonbox.south = extent[2]
qlayer.latlonbox.east = extent[1]
qlayer.latlonbox.west = extent[0]
L.addfile(qfile)
# Add legend and USGS icon as screen overlays
# Make legend
make_legend(levels, colorlist, filename=legfile, title=maplayer["label"])
size1 = simplekml.Size(x=0.3, xunits=simplekml.Units.fraction)
leg = L.newscreenoverlay(name="Legend", size=size1)
leg.icon.href = "files/%s" % legshort
leg.screenxy = simplekml.ScreenXY(
x=0.2, y=0.05, xunits=simplekml.Units.fraction, yunits=simplekml.Units.fraction
)
L.addfile(legfile)
size2 = simplekml.Size(x=0.15, xunits=simplekml.Units.fraction)
icon = L.newscreenoverlay(name="USGS", size=size2)
icon.icon.href = "files/USGS_ID_white.png"
icon.screenxy = simplekml.ScreenXY(
x=0.8, y=0.95, xunits=simplekml.Units.fraction, yunits=simplekml.Units.fraction
)
L.addfile(
os.path.join(
os.path.dirname(os.path.abspath(__file__)),
os.pardir,
"content",
"USGS_ID_white.png",
)
)
L.savekmz(filename)
return filename


# noinspection PyArgumentList
def get_zoomextent(grid, propofmax=0.3):
"""
    Get the extent that contains all values with probabilities exceeding
    a threshold, used to determine the ideal zoom level for the interactive
    map. If nothing is above the threshold, the full extent is used.
Args:
grid: grid2d of model output
propofmax (float): Proportion of maximum that should be fully included
within the bounds.
Returns:
* boundaries: a dictionary with keys 'xmin', 'xmax', 'ymin', and
'ymax' that defines the zoomed boundaries in geographic coordinates.
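    Example:
        A minimal sketch (path hypothetical):

        .. code-block:: python

            grid = loadlayers("/data/us123456pq/jessee_2018.hdf5")["model"]["grid"]
            bounds = get_zoomextent(grid, propofmax=0.3)
            print(bounds["xmin"], bounds["xmax"])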
"""
maximum = np.nanmax(grid.getData())
xmin, xmax, ymin, ymax = grid.getBounds()
if np.isnan(maximum):
# If no finite values, use entire extent for zoom
return dict(xmin=xmin, xmax=xmax, ymin=ymin, ymax=ymax)
lons = np.linspace(xmin, xmax, grid.getGeoDict().nx)
lats = np.linspace(ymax, ymin, grid.getGeoDict().ny)
    if maximum <= 0.0:
        # If nothing is above the threshold, use full extent
        boundaries1 = dict(xmin=xmin, xmax=xmax, ymin=ymin, ymax=ymax)
        return boundaries1
threshold = propofmax * maximum
row, col = np.where(grid.getData() > float(threshold))
lonmin = lons[col].min()
lonmax = lons[col].max()
latmin = lats[row].min()
latmax = lats[row].max()
# need to handle 180 crossing
if xmin > xmax:
xmin += 360.0
if lonmin > lonmax:
lonmin += 360.0
boundaries1 = {}
if xmin < lonmin:
boundaries1["xmin"] = lonmin
else:
boundaries1["xmin"] = xmin
if xmax > lonmax:
boundaries1["xmax"] = lonmax
else:
boundaries1["xmax"] = xmax
if ymin < latmin:
boundaries1["ymin"] = latmin
else:
boundaries1["ymin"] = ymin
if ymax > latmax:
boundaries1["ymax"] = latmax
else:
boundaries1["ymax"] = ymax
return boundaries1


def make_rgba(grid2D, levels, colorlist, mask=None, mercator=False):
"""
Make an rgba (red, green, blue, alpha) grid out of raw data values and
provide extent and limits needed to save as an image file
Args:
        grid2D: Mapio Grid2D object of result to map
levels (list): list of bin edges for each color, must be same length
colorlist (list): list of colors for each bin, should be length one
less than levels
mask (float): mask all values below this value
mercator (bool): project to web mercator (needed for leaflet, not
for kmz)
Returns:
tuple: (rgba_img, extent, lmin, lmax, cmap), where:
* rgba_img: rgba (red green blue alpha) image
        * extent: list of outside corners of image,
          [minlon, maxlon, minlat, maxlat]
* lmin: lowest bin edge
* lmax: highest bin edge
* cmap: colormap corresponding to image
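    Example:
        A minimal sketch using the module defaults (path hypothetical):

        .. code-block:: python

            grid = loadlayers("/data/us123456pq/jessee_2018.hdf5")["model"]["grid"]
            rgba_img, extent, lmin, lmax, cmap = make_rgba(
                grid, DFBINS, DFCOLORS, mask=0.002)
            plt.imsave("overlay.png", rgba_img, vmin=lmin, vmax=lmax,
                       cmap=cmap)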
"""
data1 = grid2D.getData().copy() # Copy because values are edited in place
if mask is not None:
data1[data1 < mask] = float("nan")
geodict = grid2D.getGeoDict()
extent = [
geodict.xmin - 0.5 * geodict.dx,
geodict.xmax + 0.5 * geodict.dx,
geodict.ymin - 0.5 * geodict.dy,
geodict.ymax + 0.5 * geodict.dy,
]
lmin = levels[0]
lmax = levels[-1]
data2 = np.clip(data1, lmin, lmax)
cmap = mpl.colors.ListedColormap(colorlist)
norm = mpl.colors.BoundaryNorm(levels, cmap.N)
data2 = np.ma.array(data2, mask=np.isnan(data1))
rgba_img = cmap(norm(data2))
if mercator:
rgba_img = mercator_transform(rgba_img, (extent[2], extent[3]), origin="upper")
return rgba_img, extent, lmin, lmax, cmap


def make_legend(
levels,
colorlist,
filename=None,
orientation="horizontal",
title=None,
transparent=False,
):
"""Make legend file
Args:
levels (list): list of bin edges for each color, must be same length
colorlist (list): list of colors for each bin, should be length one
less than levels
        filename (str): Location of output legend file. If None, the legend
            is displayed instead of saved.
orientation (str): orientation of colorbar, 'horizontal' or 'vertical'
title (str): title of legend (usually units)
transparent (bool): if True, background will be transparent
    Returns:
        Saves the legend figure to filename, or displays it if filename is
        None.
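    Example:
        A minimal sketch using the module defaults (output path
        hypothetical):

        .. code-block:: python

            make_legend(DFBINS, DFCOLORS, filename="legend.png",
                        title="Probability")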
"""
fontsize = 16
labels = ["< %1.1f%%" % (levels[0] * 100.0,)]
for db in levels:
if db < 0.01:
labels.append("%1.1f" % (db * 100,))
else:
labels.append("%1.0f" % (db * 100.0,))
if orientation == "vertical":
# Flip order to darker on top
labels = labels[::-1]
colors1 = colorlist[::-1]
fig, axes = plt.subplots(len(colors1) + 1, 1, figsize=(3.0, len(colors1) - 1.7))
clearind = len(axes) - 1
maxind = 0
else:
colors1 = colorlist
fig, axes = plt.subplots(
1, len(colorlist) + 1, figsize=(len(colorlist) + 1.7, 0.8)
)
# DPI = fig.get_dpi()
# fig.set_size_inches(440/DPI, 83/DPI)
clearind = 0
maxind = len(axes) - 1
for i, ax in enumerate(axes):
ax.set_ylim((0.0, 1.0))
ax.set_xlim((0.0, 1.0))
# draw square
        if i == clearind:
            color1 = list(colors1[0])  # copy to avoid mutating shared colors
            color1[-1] = 0.0  # make completely transparent
            if orientation == "vertical":
                label = labels[i + 1]
            else:
                label = labels[0]
        else:
            if orientation == "vertical":
                label = "%s-%s%%" % (labels[i + 1], labels[i])
                color1 = list(colors1[i])
            else:
                label = "%s-%s%%" % (labels[i], labels[i + 1])
                color1 = list(colors1[i - 1])
            color1[-1] = 0.8  # make less transparent
if i == maxind:
label = "> %1.0f%%" % (levels[-2] * 100.0)
ax.set_facecolor(color1)
if orientation == "vertical":
ax.text(
1.1, 0.5, label, fontsize=fontsize, rotation="horizontal", va="center"
)
else:
ax.set_xlabel(label, fontsize=fontsize, rotation="horizontal")
ax.set_yticks([])
ax.set_xticks([])
plt.setp(ax.get_yticklabels(), visible=False)
plt.setp(ax.get_xticklabels(), visible=False)
if orientation == "vertical":
fig.suptitle(title.title(), weight="bold", fontsize=fontsize + 2)
plt.subplots_adjust(hspace=0.01, right=0.4, top=0.82)
else:
fig.suptitle(title.title(), weight="bold", fontsize=fontsize + 2)
# , left=0.01, right=0.99, top=0.99, bottom=0.01)
plt.subplots_adjust(wspace=0.1, top=0.6)
# plt.tight_layout()
if filename is not None:
fig.savefig(filename, bbox_inches="tight", transparent=transparent)
else:
plt.show()


def setupcolors(
sync,
plotorder,
lims,
colormaps,
defaultcolormap=cm.CMRmap_r,
logscale=None,
alpha=None,
):
"""Get colors that will be used for all colorbars from reference grid
Args:
        sync (str): If False, syncing is skipped; otherwise the
            shortref of the model which should serve as the template for
            the colorbars used by all other models. All other models must
            have the exact same number of bins.
plotorder (list): List of keys of shortrefs of the grids that will be
plotted.
lims (*): Nx1 list of tuples or numpy arrays corresponding to
plotorder defining the bin edges to use for each model.
Example:
.. code-block:: python
[(0., 0.1, 0.2, 0.3), np.linspace(0., 1.5, 15)]
colormaps (list): List of strings of matplotlib colormaps (e.g.
cm.autumn_r) corresponding to plotorder
defaultcolormap (matplotlib colormap): Colormap to use if
colormaps is not defined. default cm.CMRmap_r
logscale (*): If not None, then a list of booleans corresponding to
plotorder stating whether to use log scaling in determining colors
alpha (*): list of transparencies
Returns:
tuple: (sync, colorlist, lim1) where:
* sync (bool): whether or not colorbars are/can be synced
* colorlist (list): list of rgba colors that will be applied to all
models regardless of bin edge values
* lim1 (array): bin edges of model to which others are synced
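    Example:
        A sketch syncing two models to the first model's bins (shortrefs
        hypothetical):

        .. code-block:: python

            sync, colorlist, lim1 = setupcolors(
                "Model A (2018)",
                ["Model A (2018)", "Model B (2008)"],
                [(0.0, 0.1, 0.2, 0.3), (0.0, 0.1, 0.2, 0.3)],
                [cm.CMRmap_r, cm.CMRmap_r],
            )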
"""
if not sync:
return False, None, None
elif sync in plotorder:
k = [indx for indx, key in enumerate(plotorder) if key in sync][0]
        # Make sure lims exist and all have the same number of bins
if logscale is not None:
logs = logscale[k]
else:
logs = False
lim1 = np.array(lims[k])
sum1 = 0
for lim in lims:
if lim is None:
sum1 += 1
continue
if len(lim) != len(lim1):
sum1 += 1
continue
if sum1 > 0:
print(
"Cannot sync colorbars, different number of bins or lims "
"not specified"
)
sync = False
return sync, None, None
if colormaps[k] is not None:
palette1 = colormaps[k]
else:
palette1 = defaultcolormap
# palette1.set_bad(clear_color, alpha=0.0)
if logs:
cNorm = colors.LogNorm(vmin=lim1[0], vmax=lim1[-1])
# geometric mean for midpoints
midpts = np.sqrt(lim1[1:] * lim1[:-1])
else:
cNorm = colors.Normalize(vmin=lim1[0], vmax=lim1[-1])
midpts = (lim1[1:] - lim1[:-1]) / 2 + lim1[:-1]
scalarMap = cm.ScalarMappable(norm=cNorm, cmap=palette1)
colorlist = []
for value in midpts:
colorlist.append(scalarMap.to_rgba(value, alpha=alpha))
sync = True
else:
print("Cannot sync colorbars, different number of bins or lims not specified")
sync = False
colorlist = None
lim1 = None
return sync, colorlist, lim1
if __name__ == "__main__":
pass