# stdlib imports
import os.path
from io import StringIO
import json
# third party imports
from libcomcat.search import get_event_by_id
from libcomcat.classes import DetailEvent
import pandas as pd
import numpy as np
# local imports
from .base import CoreModule
from shakemap.utils.config import get_config_paths
from shakemap.utils.dataframe import dataframe_to_xml
# Get rid of stupid pandas warning
pd.options.mode.chained_assignment = None
# number of seconds to search for event matching origin time
TIMEWINDOW = 60
# distance in decimal degrees to search for event matching coordinates
DEGWINDOW = 0.1
# +/- magnitude threshold to search for matching events
MAGWINDOW = 0.2
required_columns = ["station", "lat", "lon", "network"]
channel_groups = [["[a-z]{2}e", "[a-z]{2}n", "[a-z]{2}z"], ["h1", "h2", "z"], ["unk"]]
pgm_cols = ["pga", "pgv", "psa03", "psa10", "psa30"]
optional = ["location", "distance", "reference", "intensity", "source"]
# what are the DYFI columns and what do we rename them to?
DYFI_COLUMNS_REPLACE = {
"Geocoded box": "station",
"CDI": "intensity",
"Latitude": "lat",
"Longitude": "lon",
"No. of responses": "nresp",
"Hypocentral distance": "distance",
}
OLD_DYFI_COLUMNS_REPLACE = {
"ZIP/Location": "station",
"CDI": "intensity",
"Latitude": "lat",
"Longitude": "lon",
"No. of responses": "nresp",
"Epicentral distance": "distance",
}
MIN_RESPONSES = 1 # minimum number of DYFI responses per grid
[docs]class DYFIModule(CoreModule):
"""
dyfi -- Search ComCat for DYFI data and turn it into a ShakeMap data file.
"""
command_name = "dyfi"
[docs] def execute(self):
"""
Write info.json metadata file.
Raises:
NotADirectoryError: When the event data directory does not exist.
FileNotFoundError: When the the shake_result HDF file does not
exist.
"""
_, data_path = get_config_paths()
datadir = os.path.join(data_path, self._eventid, "current")
if not os.path.isdir(datadir):
os.makedirs(datadir)
# try to find the event by our event id
try:
detail = get_event_by_id(self._eventid)
dataframe, msg = _get_dyfi_dataframe(detail)
except Exception as e:
fmt = 'Could not retrieve DYFI data for %s - error "%s"'
self.logger.warning(fmt % (self._eventid, str(e)))
return
if dataframe is None:
self.logger.info(msg)
return
reference = "USGS Did You Feel It? System"
xmlfile = os.path.join(datadir, "dyfi_dat.xml")
dataframe_to_xml(dataframe, xmlfile, reference)
self.logger.info("Wrote %i DYFI records to %s" % (len(dataframe), xmlfile))
def _get_dyfi_dataframe(detail_or_url, inputfile=None, min_nresp=MIN_RESPONSES):
if inputfile:
with open(inputfile, "rb") as f:
rawdata = f.read()
if "json" in inputfile:
df = _parse_geocoded_json(rawdata, min_nresp)
else:
df = _parse_geocoded_csv(rawdata, min_nresp)
if df is None:
msg = f"Could not read file {inputfile}"
else:
if isinstance(detail_or_url, str):
detail = DetailEvent(detail_or_url)
else:
detail = detail_or_url
df, msg = _parse_dyfi_detail(detail, min_nresp)
if df is None:
return None, msg
df["netid"] = "DYFI"
df["source"] = "USGS (Did You Feel It?)"
df.columns = df.columns.str.upper()
return (df, "")
def _parse_dyfi_detail(detail, min_nresp):
if not detail.hasProduct("dyfi"):
msg = f"{detail.url} has no DYFI product at this time."
dataframe = None
return (dataframe, msg)
dyfi = detail.getProducts("dyfi")[0]
# search the dyfi product, see which of the geocoded
# files (1km or 10km) it has. We're going to select the data from
# whichever of the two has more entries with >= 3 responses,
# preferring 1km if there is a tie.
df_10k = pd.DataFrame({"a": []})
df_1k = pd.DataFrame({"a": []})
# get 1km data set, if exists
if len(dyfi.getContentsMatching("dyfi_geo_1km.geojson")):
bytes_1k, _ = dyfi.getContentBytes("dyfi_geo_1km.geojson")
df_1k = _parse_geocoded_json(bytes_1k, min_nresp)
return df_1k, ""
# get 10km data set, if exists
if len(dyfi.getContentsMatching("dyfi_geo_10km.geojson")):
bytes_10k, _ = dyfi.getContentBytes("dyfi_geo_10km.geojson")
df_10k = _parse_geocoded_json(bytes_10k, min_nresp)
return None, "Only 10km dataset found, ignoring."
if not len(df):
# try to get a text file data set
if not len(dyfi.getContentsMatching("cdi_geo.txt")):
return (None, "No geocoded datasets are available for this event.")
bytes_geo, _ = dyfi.getContentBytes("cdi_geo.txt")
df = _parse_geocoded_csv(bytes_geo, min_nresp)
return None, "Only cdi_geo.txt found, ignoring."
return df, ""
def _parse_geocoded_csv(bytes_data, min_nresp):
# the dataframe we want has columns:
# 'intensity', 'distance', 'lat', 'lon', 'station', 'nresp'
# the cdi geo file has:
# Geocoded box, CDI, No. of responses, Hypocentral distance,
# Latitude, Longitude, Suspect?, City, State
# download the text file, turn it into a dataframe
text_geo = bytes_data.decode("utf-8")
if text_geo.find("502 Proxy Error"):
return pd.DataFrame([])
lines = text_geo.split("\n")
if not len(lines):
return pd.DataFrame([])
columns = lines[0].split(":")[1].split(",")
columns = [col.strip() for col in columns]
fileio = StringIO(text_geo)
df = pd.read_csv(fileio, skiprows=1, names=columns)
if "ZIP/Location" in columns:
df = df.rename(index=str, columns=OLD_DYFI_COLUMNS_REPLACE)
else:
df = df.rename(index=str, columns=DYFI_COLUMNS_REPLACE)
df = df.drop(["Suspect?", "City", "State"], axis=1)
df = df[df["nresp"] >= min_nresp]
return df
def _parse_geocoded_json(bytes_data, min_nresp):
text_data = bytes_data.decode("utf-8")
try:
jdict = json.loads(text_data)
except Exception:
return pd.DataFrame([])
if len(jdict["features"]) == 0:
return pd.DataFrame(data={})
prop_columns = list(jdict["features"][0]["properties"].keys())
columns = ["lat", "lon"] + prop_columns
arrays = [[] for col in columns]
df_dict = dict(zip(columns, arrays))
for feature in jdict["features"]:
for column in prop_columns:
if column == "name":
prop = feature["properties"][column]
prop = prop[0 : prop.find("<br>")]
else:
prop = feature["properties"][column]
df_dict[column].append(prop)
# the geojson defines a box, so let's grab the center point
lons = [c[0] for c in feature["geometry"]["coordinates"][0]]
lats = [c[1] for c in feature["geometry"]["coordinates"][0]]
clon = np.mean(lons)
clat = np.mean(lats)
df_dict["lat"].append(clat)
df_dict["lon"].append(clon)
df = pd.DataFrame(df_dict)
df = df.rename(
index=str, columns={"cdi": "intensity", "dist": "distance", "name": "station"}
)
if df is not None:
df = df[df["nresp"] >= min_nresp]
return df