# stdlib imports
import os.path
import zipfile
import tempfile
from shutil import copyfile
import concurrent.futures as cf
from collections import OrderedDict
from functools import partial
# third party imports
import fiona
from impactutils.io.smcontainers import ShakeMapOutputContainer
import numpy as np
from configobj import ConfigObj
from openquake.hazardlib import imt as OQIMT
# local imports
from .base import CoreModule, Contents
from shakemap.utils.config import (
get_data_path,
get_config_paths,
get_configspec,
get_custom_validator,
config_error,
)
from shakemap.utils.utils import get_object_from_config
from shakelib.plotting.contour import contour
from shakemap.c.pcontour import pcontour
from shakelib.utils.imt_string import oq_to_file
class ShapeModule(CoreModule):
    """
    shape -- Generate shape files of the ground motion parameters
    """

    command_name = "shape"
    targets = [r"products/shape\.zip"]
    dependencies = [("products/shake_result.hdf", True)]

    def __init__(self, eventid):
        """
        Instantiate a ShapeModule class with an event ID.
        """
        super(ShapeModule, self).__init__(eventid)
        self.contents = Contents("GIS Shape Files", "shape files", eventid)

    def execute(self):
        """
        Create shape files.

        Raises:
            NotADirectoryError: When the event data directory does not exist.
            FileNotFoundError: When the shake_result HDF file does not
                exist.
        """
        install_path, data_path = get_config_paths()

        # Locate the event's products directory and the results HDF file.
        products_dir = os.path.join(data_path, self._eventid, "current", "products")
        if not os.path.isdir(products_dir):
            raise NotADirectoryError(f"{products_dir} is not a valid directory.")
        hdf_file = os.path.join(products_dir, "shake_result.hdf")
        if not os.path.isfile(hdf_file):
            raise FileNotFoundError(f"{hdf_file} does not exist.")

        # Open the ShakeMapOutputContainer; only gridded output can be
        # contoured into polygons.
        container = ShakeMapOutputContainer.load(hdf_file)
        if container.getDataType() != "grid":
            raise NotImplementedError(
                "shape module can only contour gridded data, not sets of points"
            )

        # Read and validate the products configuration.
        spec_file = get_configspec("products")
        validator = get_custom_validator()
        config = ConfigObj(
            os.path.join(install_path, "config", "products.conf"),
            configspec=spec_file,
        )
        outcome = config.validate(validator)
        if not isinstance(outcome, bool) or not outcome:
            config_error(config, outcome)

        create_polygons(
            container,
            products_dir,
            self.logger,
            config["products"]["mapping"]["max_workers"],
            method=config["products"]["shape"]["method"],
        )
        container.close()

        self.contents.addFile(
            "shakemap_shapefiles",
            "ShakeMap Shape Files",
            "Shape Files.",
            "shape.zip",
            "application/zip",
        )
[docs]def create_polygons(container, datadir, logger, max_workers, method="pcontour"):
"""Generates a set of closed polygons (with or without holes) using the
specified method (either pcontour or skimage), and uses fiona to convert
the resulting GeoJSON objects into ESRI-style shape files which are then
zipped into an archive along with .prj, .lyr, and metadata .xml files. A
warning will be emitted if .lyr, or .xml files cannot be found for the
ground motion parameter in question.
Args:
container (ShakeMapOutputContainer): An open ShakeMap output
container object.
datadir (str): The products directory for the event in question.
logger (logger): This module's logger object.
method (str): Contouring implementation to use (either 'pcontour' or
'skimage')
Returns:
(nothing): Nothing.
"""
# gmice info for shakelib.plotting.contour
config = container.getConfig()
gmice = get_object_from_config("gmice", "modeling", config)
gmice_imts = gmice.DEFINED_FOR_INTENSITY_MEASURE_TYPES
gmice_pers = gmice.DEFINED_FOR_SA_PERIODS
component = list(container.getComponents())[0]
imts = container.getIMTs(component)
if method == "pcontour":
schema = {
"properties": OrderedDict(
[
("AREA", "float:13.3"),
("PERIMETER", "float:14.3"),
("PGAPOL_", "int:12"),
("PGAPOL_ID", "int:12"),
("GRID_CODE", "int:12"),
("PARAMVALUE", "float:14.4"),
]
),
"geometry": "Polygon",
}
elif method == "skimage":
schema = {
"properties": OrderedDict(
[
("value", "float:14.4"),
("units", "str"),
("color", "str"),
("weight", "float:13.3"),
]
),
"geometry": "MultiLineString",
}
else:
raise ValueError(f"Unknown contouring method {method}")
smdata = os.path.join(get_data_path(), "gis")
# Make a directory for the files to live in prior to being zipped
alist = []
with tempfile.TemporaryDirectory(dir=datadir) as tdir:
for imt in imts:
gdict = container.getIMTGrids(imt, component)
fgrid = gdict["mean"]
if imt == "MMI":
fname = "mi"
elif imt == "PGV":
fname = "pgv"
else:
fname = oq_to_file(imt)
if method == "pcontour":
my_gmice = None
if imt == "MMI":
contour_levels = np.arange(0.1, 10.2, 0.2)
elif imt == "PGV":
fgrid = np.exp(fgrid)
cont_max = np.ceil(np.max(fgrid)) + 2.0
contour_levels = np.arange(1.0, cont_max, 2.0)
if contour_levels.size == 0:
contour_levels = np.array([1.0])
else:
fgrid = np.exp(fgrid)
cont_max = (np.ceil(100 * np.max(fgrid)) + 2.0) / 100.0
contour_levels = np.arange(0.01, cont_max, 0.02)
if contour_levels.size == 0:
contour_levels = np.array([0.01])
else:
# skimage method chooses its own levels
contour_levels = None
# but wants gmice info
oqimt = OQIMT.from_string(imt)
if (
imt == "MMI"
or not isinstance(oqimt, tuple(gmice_imts))
or (isinstance(oqimt, OQIMT.SA) and oqimt.period not in gmice_pers)
):
my_gmice = None
else:
my_gmice = gmice
a = {
"fgrid": fgrid,
"dx": gdict["mean_metadata"]["dx"],
"dy": gdict["mean_metadata"]["dy"],
"xmin": gdict["mean_metadata"]["xmin"],
"ymax": gdict["mean_metadata"]["ymax"],
"contour_levels": contour_levels,
"tdir": tdir,
"fname": fname,
"schema": schema,
"imt": imt,
"gmice": my_gmice,
"gdict": gdict,
}
alist.append(a)
copyfile(
os.path.join(smdata, "WGS1984.prj"), os.path.join(tdir, fname + ".prj")
)
lyrfile = os.path.join(smdata, fname + ".lyr")
if not os.path.isfile(lyrfile):
logger.warning("No " + fname + ".lyr file in " + smdata)
else:
copyfile(lyrfile, os.path.join(tdir, fname + ".lyr"))
xmlfile = os.path.join(smdata, fname + ".shp.xml")
if not os.path.isfile(xmlfile):
logger.warning("No " + fname + ".shp.xml file in " + smdata)
else:
copyfile(xmlfile, os.path.join(tdir, fname + ".shp.xml"))
worker = partial(make_shape_files, method=method)
if max_workers > 0:
with cf.ProcessPoolExecutor(max_workers=max_workers) as ex:
results = ex.map(worker, alist)
list(results)
else:
for adict in alist:
worker(adict)
zfilename = os.path.join(datadir, "shape.zip")
zfile = zipfile.ZipFile(zfilename, mode="w", compression=zipfile.ZIP_DEFLATED)
filelist = []
for (dirpath, dirnames, filenames) in os.walk(tdir):
filelist.extend(filenames)
break
for sfile in filelist:
zfile.write(os.path.join(tdir, sfile), sfile)
zfile.close()
def make_shape_files(adict, method="pcontour"):
    """Contour one IMT grid and write the features to an ESRI shape file.

    Args:
        adict (dict): Work-unit dictionary as assembled by
            :func:`create_polygons`; carries the grid ('fgrid'), grid
            geometry ('dx', 'dy', 'xmin', 'ymax'), contour levels, the
            output directory ('tdir') and base name ('fname'), the fiona
            schema, and the 'imt', 'gmice', and 'gdict' entries used by
            the skimage path.
        method (str): Contouring implementation, 'pcontour' or 'skimage'.

    Raises:
        ValueError: If *method* is not one of the supported methods.
    """
    if method == "pcontour":
        geojson = pcontour(
            adict["fgrid"],
            adict["dx"],
            adict["dy"],
            adict["xmin"],
            adict["ymax"],
            adict["contour_levels"],
            4,
            0,
            fmt=1,
        )
        features = geojson["features"]
    elif method == "skimage":
        features = contour(adict["gdict"], adict["imt"], 10, adict["gmice"])
    else:
        raise ValueError("Unknown contour method.")

    shp_path = os.path.join(adict["tdir"], adict["fname"] + ".shp")
    with fiona.open(shp_path, "w", "ESRI Shapefile", adict["schema"]) as sink:
        for feature in features:
            sink.write(feature)