Source code for shakelib.station

# stdlib imports
import sqlite3
import xml.etree.ElementTree as ET
import copy
from collections import OrderedDict
import re

# third party imports
import numpy as np

# local imports


TABLES = OrderedDict((
    ('station',
     OrderedDict((
         ('id', 'str primary key'),  # id is net.sta
         ('network', 'str'),
         ('code', 'str'),
         ('name', 'str'),
         ('lat', 'float'),
         ('lon', 'float'),
         ('elev', 'float'),
         ('vs30', 'float'),
         ('instrumented', 'int')
     ))
     ),
    ('imt',
     OrderedDict((
         ('id', 'integer primary key'),
         ('imt_type', 'str')
     ))
     ),
    ('amp',
     OrderedDict((
         ('id', 'integer primary key'),
         ('station_id', 'str'),
         ('imt_id', 'int'),
         ('original_channel', 'str'),
         ('orientation', 'str'),
         ('amp', 'float'),
         ('uncertainty', 'float'),
         ('flag', 'str')
     ))
     )
))

#
# These are the netid's that indicate MMI data
#
CIIM_TUPLE = ('dyfi', 'mmi', 'intensity', 'ciim')


[docs]class StationList(object): """ A class to facilitate reading ShakeMap formatted XML fies of peak amplitudes and MMI, and produce tables of station data. Seismic stations are considered to be 'instrumented'; MMI data is not instrumented and is indicated in the ShakeMap XML with a ``netid`` attribute of "DYFI," "MMI," "INTENSITY," or "CIIM." .. note:: Typically the user will call the class method :meth:`fromXML` to create a :class:`StationList` object the first time a set of station files are processed. (Or, as an alternative, the user can call :meth:`loadFromXML` and :meth:`fillTables` sequentially.) This will create a database at the location specified by the ``dbfile`` parameter to :meth:`fromXML`. Subsequent programs can use the default constructor to simply load ``dbfile``. """ def __init__(self, db): """ The default constructor reads a pre-built SQLite database of station data. Args: dbfile (str): A SQLite database file containing pre-processed station data. Returns: A :class:`StationList` object. """ self.db = db self.cursor = self.db.cursor() def __del__(self): """ Closes out the database when the object is destroyed. """ self.db.commit() self.cursor.close() self.db.close() # def _load_features(self, xmlfile): # each station is a feature in a geojson file # tree = ET.parse(xmlfile) # root = tree.getroot() # imt_translate = {} # for sl in root.iter('stationlist'): # for station in sl: # feature = {'type':'Feature'} # netid = station.attrib['netid'] # code = station.attrib['code'] # feature['id'] = '%s.%s' % (netid,code) # lat = code = float(station.attrib['lat']) # lon = code = float(station.attrib['lon']) # feature['geometry'] = \ # {'type':'Point','coordinates':[lon,lat]} # properties = {} # properties['name'] = station.attrib['name'] # properties['intensity_flag'] = station.attrib['']#?? # properties['distance'] = \ # float(station.attrib['distance']) # properties['location'] = station.attrib['loc'] # properties['code'] = station.attrib['code'] # properties['commType'] = station.attrib['commtype'] # properties['source'] = station.attrib['source'] # properties['network'] = station.attrib['netid'] # properties['instrumentType'] = \ # station.attrib['insttype'] # properties['intensity'] = \ # float(station.attrib['intensity']) # channels = [] # for comp in station: # channel = {'name':comp.attrib['name']} # for component in comp: # pgmdict, imtset = self._getGroundMotions( # component, imt_translate) # pass
[docs] @classmethod def loadFromSQL(cls, sql, dbfile=':memory:'): """ Create a new object from saved SQL code (see :meth:`dumpToSQL`). Args: sql (str): SQL code to create and populate the database dbfile (str): The path to a file in which the database will reside. The default is ':memory:' for an in-memory database. Returns: :class:`Stationlist` object. """ db = sqlite3.connect(dbfile) self = cls(db) self.cursor.executescript(sql) return self
[docs] def dumpToSQL(self): """ Dump the database as a string of SQL code (see :meth:`loadFromSQL`). Args: None Returns: A string of SQL sufficient to restore and repopulate the database. """ return "\n".join(list(self.db.iterdump()))
[docs] @classmethod def loadFromXML(cls, xmlfiles, dbfile=':memory:'): """ Create a StationList object by reading one or more ShakeMap XML input files. Args: xmlfiles (sequence of str): Sequence of ShakeMap XML input files to read. dbfile (str): Path to a file into which to write the SQLite database. The default is ':memory:' for an in-memory database. Returns: :class:`StationList` object """ # Create the database and tables db = sqlite3.connect(dbfile) self = cls(db) self._createTables() return self.addData(xmlfiles)
[docs] def getGeoJson(self): jdict = {'type': 'FeatureCollection', 'features': []} self.cursor.execute( 'SELECT id, network, code, name, lat, lon, elev, vs30, ' 'instrumented from station' ) sta_rows = self.cursor.fetchall() for sta in sta_rows: # print('doing station %s' % (str(sta[2]))) if str(sta[2]).startswith(sta[1] + '.'): myid = str(sta[2]) else: myid = sta[1] + '.' + str(sta[2]) feature = { 'type': 'Feature', 'id': myid, 'properties': { 'code': str(sta[2]), 'name': sta[3], 'instrumentType': 'UNK' if sta[8] is True else 'OBSERVED', 'source': sta[1], 'network': sta[1], 'commType': 'UNK', 'location': '', 'intensity': None, 'intensity_flag': '', 'pga': None, 'pgv': None, 'distance': None, 'channels': [] }, 'geometry': { 'type': 'Point', 'coordinates': [sta[5], sta[4]] } } self.cursor.execute( 'SELECT a.amp, i.imt_type, a.original_channel, a.flag ' 'FROM amp a, imt i ' 'WHERE a.station_id = "%s" ' 'AND a.imt_id = i.id' % (str(sta[0])) ) amp_rows = self.cursor.fetchall() channels = {} for amp in amp_rows: # print('doing channel %s imt %s' % (amp[2], amp[1])) if amp[2] not in channels: channels[amp[2]] = {'name': amp[2], 'amplitudes': []} if amp[0] == 'NULL': myamp = np.nan else: myamp = amp[0] if amp[1] == 'PGV': value = np.exp(myamp) units = 'cm/s' elif amp[1] == 'MMI': value = myamp units = 'intensity' else: value = np.exp(myamp) * 100 units = '%g' this_amp = {'name': amp[1].lower(), 'value': '%.4f' % value, 'units': units, 'flag': str(amp[3]) } channels[amp[2]]['amplitudes'].append(this_amp) for channel in channels.values(): feature['properties']['channels'].append(channel) jdict['features'].append(feature) return jdict
[docs] def addData(self, xmlfiles): """ Create a StationList object by reading one or more ShakeMap XML input files. Args: xmlfiles (sequence of str): Sequence of ShakeMap XML input files to read. Returns: :class:`StationList` object """ # Parse the xml into a dictionary stationdict = {} imtset = set() for xmlfile in xmlfiles: stationdict, ims = self._filter_station(xmlfile, stationdict) imtset |= ims # fill the database and create the object from it self._loadFromDict(stationdict, imtset) return self
def _loadFromDict(self, stationdict, imtset): """ Internal method to turn the station dictionary created from the ShakeMap XML input files into a SQLite database. Args: stationdictlist (list of stationdicts): A list of station dictionaries returned by _filter_station(). dbfile (string): The path to which the SQLite database will be written. Returns: :class:`StationList` object """ # # Get the current IMTs and their IDs and add any new ones # imts_in_db = self.getIMTtypes() new_imts = imtset - imts_in_db if any(new_imts): self.cursor.executemany('INSERT INTO imt (imt_type) VALUES (?)', zip(new_imts)) self.db.commit() # Now get the updated list of IMTs and their IDs query = 'SELECT imt_type, id FROM imt' self.cursor.execute(query) imt_hash = dict(self.cursor.fetchall()) # # Get the station codes for all the stations in the db # query = 'SELECT id FROM station' self.cursor.execute(query) sta_set = set([z[0] for z in self.cursor.fetchall()]) # # Insert any new stations into the station table # station_rows = [] for sta_id, station_tpl in stationdict.items(): if sta_id in sta_set: continue else: sta_set.add(sta_id) station_attributes, comp_dict = station_tpl lat = station_attributes['lat'] lon = station_attributes['lon'] network = station_attributes['netid'] code = station_attributes['code'] if 'name' in station_attributes: name = station_attributes['name'] else: name = None if 'elev' in station_attributes: elev = station_attributes['elev'] else: elev = None if 'vs30' in station_attributes: vs30 = station_attributes['vs30'] else: vs30 = None instrumented = int(network.lower() not in CIIM_TUPLE) station_rows.append((sta_id, network, code, name, lat, lon, elev, vs30, instrumented)) self.cursor.executemany( 'INSERT INTO station (id, network, code, name, lat, lon, ' 'elev, vs30, instrumented) VALUES ' '(?, ?, ?, ?, ?, ?, ?, ?, ?)', station_rows ) self.db.commit() # # Now add the amps, first get the current set so we don't add # any duplicates; a unique amp will be (station_id, imt_id, # original_channel) # query = 'SELECT station_id, imt_id, original_channel FROM amp' self.cursor.execute(query) amp_rows = self.cursor.fetchall() # Create a unique identifier for each amp so we don't repeat any amp_set = set([str(v[0]) + '.' + str(v[1]) + '.' + str(v[2]) for v in amp_rows]) # # Insert the amps for each component # amp_rows = [] for sta_id, station_tpl in stationdict.items(): station_attributes, comp_dict = station_tpl instrumented = int(station_attributes['netid'].lower() not in CIIM_TUPLE) for original_channel, pgm_dict in comp_dict.items(): orientation = self._getOrientation(original_channel) for imt_type, imt_dict in pgm_dict.items(): if (instrumented == 0) and (imt_type != 'MMI'): continue imtid = imt_hash[imt_type] amp_id = str(sta_id) + '.' + str(imtid) + '.' + \ str(original_channel) if amp_id in amp_set: continue else: amp_set.add(amp_id) amp = imt_dict['value'] flag = imt_dict['flag'] if np.isnan(amp) or (amp <= 0): amp = 'NULL' flag = 'G' elif imt_type == 'MMI': pass elif imt_type == 'PGV': amp = np.log(amp) else: amp = np.log(amp / 100.0) amp_rows.append((sta_id, imtid, original_channel, orientation, amp, flag)) self.cursor.executemany( 'INSERT INTO amp (station_id, imt_id, original_channel, ' 'orientation, amp, flag) VALUES (?, ?, ?, ?, ?, ?)', amp_rows ) self.db.commit() return
[docs] def getIMTtypes(self): """ Return a set of IMT types found in the database Args: None Returns: A set of IMT types """ self.cursor.execute('SELECT imt_type FROM imt') return set([z[0] for z in self.cursor.fetchall()])
[docs] def getStationDictionary(self, instrumented=True): """ Return a dictionary of the instrumented or non-instrumented stations. The keys describe the parameter, the values are Numpy arrays of the parameter in question. For the standard set of ShakeMap IMTs (mmi, pga, pgv, psa03, psa10, psa30), the keys in the dictionary would be: 'id', 'network', 'code', 'name', 'lat', 'lon', 'elev', 'vs30', 'instrumented', 'PGA', 'PGV', 'SA(0.3)', 'SA(1.0)', 'SA(3.0)' For the non-instrumented dictionary, the keys would be: 'id', 'network', 'code', 'name', 'lat', 'lon', 'elev', 'vs30', 'instrumented', 'MMI' The **id** column is **network** and **code** concatenated with a period (".") between them. All ground motion units are natural log units. Distances are in km. Args: instrumented (bool): Set to True if the dictionary is to contain the instrumented stations, or to False if the dictionary is to contain the non-instrumented (MMI) stations. Returns: dict: A dictionary of Numpy arrays. """ columns = list(TABLES['station'].keys()) dstr = ', '.join(columns) self.cursor.execute( 'SELECT %s FROM station where instrumented = %d' % (dstr, instrumented) ) station_rows = self.cursor.fetchall() nstation_rows = len(station_rows) if not nstation_rows: return None station_columns = list(zip(*station_rows)) df = OrderedDict() for ic, col in enumerate(columns): df[col] = np.array(station_columns[ic]) for imt in self.getIMTtypes(): if (instrumented and 'MMI' in imt) or \ (not instrumented and 'MMI' not in imt): continue df[imt] = np.full(nstation_rows, np.nan) id_dict = dict(zip(df['id'], range(nstation_rows))) # # Get all of the unflagged amps with the proper orientation # self.cursor.execute( 'SELECT a.amp, i.imt_type, a.station_id FROM ' 'amp a, station s, imt i WHERE a.flag = "0" ' 'AND s.id = a.station_id ' 'AND a.imt_id = i.id ' 'AND s.instrumented = %d AND a.orientation NOT IN ("Z", "U") ' 'AND a.amp IS NOT NULL' % (instrumented) ) amp_rows = self.cursor.fetchall() # # Go through all the amps and put them into the data frame # for this_row in amp_rows: # # Set the cell to the peak amp # rowidx = id_dict[this_row[2]] cval = df[this_row[1]][rowidx] amp = this_row[0] if np.isnan(cval) or (cval < amp): df[this_row[1]][rowidx] = amp return df
@staticmethod def _getOrientation(orig_channel): """ Return a character representing the orientation of a channel. Args: orig_channel (string): String representing the seed channel (e.g. 'HNZ'). The final character is assumed to be the (uppercase) orientation. Returns: Character representing the channel orientation. One of 'N', 'E', 'Z', 'H' (for horizontal), or 'U' (for unknown). """ if orig_channel == 'mmi': orientation = 'H' # mmi is arbitrarily horizontal elif orig_channel[-1] in ('N', 'E', 'Z'): orientation = orig_channel[-1] elif orig_channel[-1] == "K": # Channel is "UNK"; assume horizontal orientation = 'H' else: orientation = 'U' # this is unknown return orientation @staticmethod def _getGroundMotions(comp, imt_translate): """ Get a dictionary of peak ground motions (values and flags). Output keys are one of: [pga,pgv,psa03,psa10,psa30] Even if flags are not specified in the input, they will be guaranteed to at least have a flag of '0'. """ pgmdict = {} imtset = set() for pgm in comp: if pgm.tag == '#text': continue key = pgm.tag if key not in imt_translate: if key in ('acc', 'pga'): new_key = 'PGA' elif key in ('vel', 'pgv'): new_key = 'PGV' elif 'mmi' in key: new_key = 'MMI' elif 'psa' in key: pp = get_imt_period(key) new_key = 'SA(' + str(pp) + ')' else: raise ValueError('Unknown amp type in input: %s' % key) imt_translate[key] = new_key else: new_key = imt_translate[key] key = new_key if 'value' in pgm.attrib: try: value = float(pgm.attrib['value']) except ValueError: print('Unknown value in XML: ', pgm.attrib['value'], ' for amp ', pgm.tag) continue else: print('No value for amp ', pgm.tag) continue if 'flag' in pgm.attrib and pgm.attrib['flag'] != '': flag = pgm.attrib['flag'] else: flag = '0' pgmdict[key] = {'value': value, 'flag': flag} imtset.add(key) return pgmdict, imtset def _filter_station(self, xmlfile, stationdict): """ Filter individual xmlfile into a stationdict data structure. Args: xmlfile (string): Path to ShakeMap XML input file (or file-like object) containing station data. Returns: stationdict data structure """ imt_translate = {} imtset = set() tree = ET.parse(xmlfile) root = tree.getroot() for sl in root.iter('stationlist'): for station in sl: if station.tag != 'station': continue # look at the station attributes to figure out if this is a # DYFI-type station or a station with instruments measuring # PGA, PGV, etc. attributes = station.attrib.copy() netid = attributes['netid'] instrumented = int(netid.lower() not in CIIM_TUPLE) code = attributes['code'] if code.startswith(netid + '.'): sta_id = code code = code.replace(netid + '.', '') else: sta_id = netid + '.' + code if sta_id in stationdict: compdict = stationdict[sta_id][1] else: compdict = {} for comp in station: compname = comp.attrib['name'] if 'Intensity Questionnaire' in str(compname): compdict['mmi'] = {} continue tpgmdict, ims = self._getGroundMotions(comp, imt_translate) if compname in compdict: pgmdict = compdict[compname] else: pgmdict = {} pgmdict.update(tpgmdict) # copy the VALUES, not REFERENCES, of the component list # into our growing dictionary compdict[compname] = copy.copy(pgmdict) imtset |= ims if ('intensity' in attributes) and (instrumented == 0): if 'mmi' not in compdict: compdict['mmi'] = {} compdict['mmi']['MMI'] = \ {'value': float(attributes['intensity']), 'flag': '0'} imtset.add('MMI') stationdict[sta_id] = (attributes, copy.copy(compdict)) return stationdict, imtset def _createTables(self): """ Build the database tables. """ for table in TABLES.keys(): sql = 'CREATE TABLE %s (' % table nuggets = [] for column, ctype in TABLES[table].items(): nuggets.append('%s %s' % (column, ctype)) sql += ','.join(nuggets) + ')' self.cursor.execute(sql) self.db.commit() return
[docs]def get_imt_period(imt): p = re.search('(?<=psa)\d+', imt) return float(p.group(0)[:-1] + '.' + p.group(0)[-1])