Source code for shakemap.utils.amps

# import sys
import sqlite3
import os.path
import re
from xml.dom import minidom
from datetime import datetime, timezone, timedelta
from collections import OrderedDict
import time
import defusedxml.ElementTree as dET
import xml.etree.ElementTree as ET
import json
from itertools import zip_longest

# third party libraries
import numpy as np
from openquake.hazardlib.geo.geodetic import geodetic_distance
from impactutils.rupture import constants

# local libraries

# define all of the tables as dictionaries
EVENT = OrderedDict(
    [
        ("id", "INTEGER PRIMARY KEY"),
        ("eventid", "TEXT UNIQUE"),
        ("netid", "TEXT"),
        ("network", "TEXT"),
        ("time", "INTEGER"),
        ("lat", "REAL"),
        ("lon", "REAL"),
        ("depth", "REAL"),
        ("magnitude", "REAL"),
        ("locstring", "TEXT"),
        ("repeats", "TEXT"),
        ("lastrun", "INTEGER"),
    ]
)

STATION = OrderedDict(
    [
        ("id", "INTEGER PRIMARY KEY"),
        ("timestamp", "INTEGER"),
        ("lat", "REAL"),
        ("lon", "REAL"),
        ("network", "TEXT"),
        ("name", "TEXT"),
        ("code", "TEXT"),
    ]
)

CHANNEL = OrderedDict(
    [
        ("id", "INTEGER PRIMARY KEY"),
        ("station_id", "INTEGER REFERENCES station(id) ON DELETE CASCADE"),
        ("channel", "TEXT"),
        ("loc", "TEXT"),
    ]
)

PGM = OrderedDict(
    [
        ("id", "INTEGER PRIMARY KEY"),
        ("channel_id", "INTEGER REFERENCES channel(id) ON DELETE CASCADE"),
        ("imt", "TEXT"),
        ("value", "REAL"),
    ]
)

TABLES = {"event": EVENT, "station": STATION, "channel": CHANNEL, "pgm": PGM}

# database file name
DBFILE = "amps.db"

IMTS = ["acc", "vel", "sa", "pga", "pgv"]
# sometimes (sigh) pga/pgv labeled as acc/vel
IMTDICT = {"acc": "pga", "vel": "pgv"}

# association algorithm - any peak with:
# time > origin - TMIN and time < origin + TMAX
# AND
# distance < DISTANCE
TMIN = 60
TMAX = 180
DISTANCE = 500
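
# A minimal sketch of the association test these constants define. This
# helper is illustrative only (it is not part of the original module); the
# real logic, including duplicate-station resolution, lives in
# AmplitudeHandler.associate() below.
def _association_window_sketch(eqtime, record_time, distance_km):
    """Return True if a peak recorded at record_time (Unix timestamp) and
    distance_km from the origin would associate with an origin at eqtime."""
    return (eqtime - TMIN) < record_time < (eqtime + TMAX) and distance_km < DISTANCE
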

# SQLite has a limit (999) on the number of variables in
# a query; we set our threshold somewhat lower than that for
# safety.
MAX_VARS = 200
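
# Sketch of the chunking pattern MAX_VARS implies (illustrative only, not
# part of the original module; the table name mirrors the schema above, and
# associate() below uses the same idiom): bind at most MAX_VARS ids per
# IN (...) query.
def _chunked_delete_sketch(cursor, sids):
    """Hypothetical helper deleting station rows in chunks of MAX_VARS ids."""
    start = 0
    while start < len(sids):
        end = min(start + MAX_VARS, len(sids))
        varstr = f"({', '.join('?' for _ in sids[start:end])})"
        cursor.execute("DELETE FROM station WHERE id IN " + varstr, sids[start:end])
        start = end
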


class AmplitudeHandler(object):
    """Store and associate strong motion peak amplitudes with
    earthquake events.
    """

    def __init__(self, install_path, data_path):
        """Instantiate amplitude handler with ShakeMap profile paths."""
        self._data_path = data_path
        self._dbfile = os.path.join(install_path, "data", DBFILE)
        db_exists = os.path.isfile(self._dbfile)
        self._connect()
        if not db_exists:
            for table, tdict in TABLES.items():
                createcmd = f"CREATE TABLE {table} ("
                nuggets = []
                for column, ctype in tdict.items():
                    nuggets.append(f"{column} {ctype}")
                createcmd += ",".join(nuggets) + ")"
                self._cursor.execute(createcmd)
            self._cursor.execute("CREATE INDEX station_index ON channel(station_id)")
            self._cursor.execute("CREATE INDEX channel_index ON pgm(channel_id)")
            self._cursor.execute("CREATE INDEX eventid_index ON event(eventid)")
            self._cursor.execute("CREATE INDEX stacode_index ON station(code)")
            self._cursor.execute("CREATE INDEX stanet_index ON station(network)")
            self._cursor.execute("PRAGMA journal_mode = WAL")

    def _connect(self):
        self._connection = sqlite3.connect(self._dbfile, timeout=15)
        if self._connection is None:
            raise RuntimeError(f"Could not connect to {self._dbfile}")
        self._connection.isolation_level = "EXCLUSIVE"
        self._cursor = self._connection.cursor()
        self._cursor.execute("PRAGMA foreign_keys = ON")
        self._cursor.execute("PRAGMA journal_mode = WAL")

    def _disconnect(self):
        self.commit()
        self._cursor.close()
        self._connection.close()
        self._connection = None
        self._cursor = None
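
    # A minimal usage sketch (the profile paths are hypothetical; in
    # practice they come from the ShakeMap profile configuration):
    #
    #     handler = AmplitudeHandler("<install_path>", "<data_path>")
    #     handler.insertAmps("/path/to/amps_file.xml")
    #     event_ids = handler.associateAll()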

    def commit(self):
        """Commit any operations to the database."""
        self._connection.commit()

    def insertEvent(self, event, update=False):
        """Insert an event into the database.

        A directory with the name of event['id'] should exist in data_path.

        Args:
            event (dict): Dictionary containing fields:

                - id: Event ID (i.e., us2008abcd).
                - netid: Network code (i.e., us).
                - network: Network name (i.e., "USGS Network").
                - time: Origin time in UTC (a string parseable by
                  timestr_to_timestamp()).
                - lat: Origin latitude (dd).
                - lon: Origin longitude (dd).
                - depth: Origin depth (km).
                - mag: Earthquake magnitude.
                - locstring: Location string (i.e., '2 mi SE of Reno').
                - repeats: A list of repeat times (optional).
                - lastrun: Timestamp of the last run of the event (optional).
            update (bool): Update an existing event with new info (True)
                or insert a new event (False).

        Returns:
            nothing: Nothing.
        """
        cols = [x for x in EVENT.keys() if x != "id"]
        if update:
            # This makes a string like 'eventid = ?, netid = ?, ...'
            einsert = (
                "UPDATE event SET "
                + ", ".join(
                    [" = ".join(x) for x in zip_longest(cols, [], fillvalue="?")]
                )
                + ' WHERE eventid = "'
                + str(event["id"])
                + '"'
            )
        else:
            einsert = (
                "INSERT INTO event ("
                + ", ".join(cols)
                + ") VALUES ("
                + ", ".join("?" * len(cols))
                + ")"
            )
        if "network" in event:
            network = event["network"]
        else:
            network = ""
        if "repeats" in event and event["repeats"] and len(event["repeats"]) > 0:
            repeats = json.dumps(event["repeats"])
        else:
            repeats = None
        if "lastrun" in event:
            lastrun = event["lastrun"]
        else:
            lastrun = int(time.time())
        self._cursor.execute(
            einsert,
            (
                event["id"],
                event["netid"],
                network,
                timestr_to_timestamp(event["time"]),
                event["lat"],
                event["lon"],
                event["depth"],
                event["mag"],
                event["locstring"],
                repeats,
                lastrun,
            ),
        )
        self.commit()
        return
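
    # Sketch of the dictionary insertEvent() expects (all values are
    # hypothetical; note that "time" must be a string parseable by
    # timestr_to_timestamp(), not a datetime):
    #
    #     event = {
    #         "id": "us2008abcd",
    #         "netid": "us",
    #         "network": "USGS Network",
    #         "time": "2008-05-12T06:28:01Z",
    #         "lat": 31.002,
    #         "lon": 103.322,
    #         "depth": 19.0,
    #         "mag": 7.9,
    #         "locstring": "Eastern Sichuan, China",
    #     }
    #     handler.insertEvent(event)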

    def getEvent(self, eventid):
        """Return the event parameters for the specified event.

        Args:
            eventid (str): The id of the event to query.

        Returns:
            dictionary: A dictionary of the columns of the table and
            their values for the event; None is returned if the event
            is not in the database.
        """
        query = "SELECT * FROM event WHERE eventid = ?"
        self._cursor.execute(query, (eventid,))
        row = self._cursor.fetchone()
        if row is None:
            return None
        cols = [col[0] for col in self._cursor.description]
        event = dict(zip(cols, row))
        #
        # Deal with differences between the database column names
        # and the event keys
        #
        event["id"] = event["eventid"]
        del event["eventid"]
        event["mag"] = event["magnitude"]
        del event["magnitude"]
        event["time"] = datetime.fromtimestamp(event["time"], timezone.utc).strftime(
            constants.TIMEFMT
        )
        if event["repeats"]:
            event["repeats"] = json.loads(event["repeats"])
        return event

    def deleteEvent(self, eventid):
        """Delete the event from the database.

        Args:
            eventid (str): The id of the event to delete.

        Returns:
            nothing: Nothing.
        """
        query = "DELETE FROM event WHERE eventid = ?"
        self._cursor.execute(query, (eventid,))
        self.commit()
        return

    def getRepeats(self):
        """Return all the rows from the event table where the 'repeats'
        column is not NULL.

        Args:
            none

        Returns:
            (list): List of [eventid, origin_time, [repeats]] lists.
        """
        query = "SELECT eventid, time, repeats FROM event WHERE repeats IS NOT NULL"
        self._cursor.execute(query)
        repeats = self._cursor.fetchall()
        replist = []
        for repeat in repeats:
            rep = list(repeat)
            rep[2] = json.loads(rep[2])
            replist.append(rep)
        return replist

    def associateAll(self, pretty_print=False):
        """Associate peak ground motions with appropriate events, and
        write station XML to the file system. Ground motion records
        associated with events will be deleted from the database.

        Args:
            pretty_print (bool): Writes more human-readable XML, but is
                slower and writes larger files. False by default.

        Returns:
            list: The event IDs of the events for which associated data
            were found.
        """
        equery = "SELECT eventid, time, lat, lon FROM event"
        self._cursor.execute(equery)
        events = self._cursor.fetchall()
        associated = []
        for event in events:
            eventid = event[0]
            eqtime = event[1]
            lat = event[2]
            lon = event[3]
            data_list = self.associate(eqtime, lat, lon)
            if len(data_list) == 0:
                continue
            self.writeXML(data_list, eventid, pretty_print)
            associated.append(eventid)
        return associated

    def associateOne(self, eventid, pretty_print=False):
        """Associate peak ground motions with the specified event, and
        write station XML to the file system. Ground motion records
        associated with the event will be deleted from the database.

        Args:
            eventid (str): The event ID of the event to associate.
            pretty_print (bool): Writes more human-readable XML, but is
                slower and writes larger files. False by default.

        Returns:
            int: The number of amps associated with the specified event;
            -1 is returned if the event is not found in the database.
        """
        equery = "SELECT time, lat, lon FROM event WHERE eventid = ?"
        self._cursor.execute(equery, (eventid,))
        event = self._cursor.fetchone()
        if event is None:
            return -1
        data_list = self.associate(event[0], event[1], event[2])
        namps = len(data_list)
        if namps == 0:
            return 0
        self.writeXML(data_list, eventid, pretty_print)
        return namps

    def associate(self, eqtime, eqlat, eqlon):
        """Find peak ground motion records associated with input event info.

        Ground motion records associated with the input event are deleted
        from the database. Note that in the case of duplicate stations, the
        amps from only one will be used; any others will be deleted from
        the database.

        Args:
            eqtime (int): Unix timestamp of earthquake origin.
            eqlat (float): Latitude of earthquake origin.
            eqlon (float): Longitude of earthquake origin.

        Returns:
            list: A list of amps associated with the event. Each row in
            the list has the following columns:

                - code: Station code.
                - channel: Channel (HHE, HHN, etc.).
                - imt: Intensity measure type (pga, pgv, etc.).
                - value: IMT value.
                - lat: Station latitude.
                - lon: Station longitude.
                - netid: Station contributing network.
                - name: String describing station name.
                - distance: Distance (km) from station to origin.
                - flag: Value will be 0.
                - loccode: The location code of the instrument.
        """
        self._cursor.execute("BEGIN EXCLUSIVE")
        time_query = (
            "SELECT id, timestamp, lat, lon, code, network "
            "FROM station WHERE timestamp > ? AND timestamp < ?"
        )
        self._cursor.execute(time_query, ((eqtime - TMIN), (eqtime + TMAX)))
        # numpy array of (id, timestamp, lat, lon, code, network), as strings
        eqdata = np.array(self._cursor.fetchall())
        if not len(eqdata):
            self.commit()
            return []
        dist = geodetic_distance(
            eqlon, eqlat, eqdata[:, 3].astype(float), eqdata[:, 2].astype(float)
        )
        inear = np.where(dist < DISTANCE)[0]
        eqdata = eqdata[inear]
        dist = dist[inear]

        stadict = {}
        junk_sids = []
        for idx, row in enumerate(eqdata):
            sid, timestamp, code, network = [row[x] for x in (0, 1, 4, 5)]
            timestamp = int(timestamp)
            if network not in stadict:
                stadict[network] = {
                    code: {"sid": sid, "timestamp": timestamp, "distance": dist[idx]}
                }
                continue
            elif code not in stadict[network]:
                stadict[network][code] = {
                    "sid": sid,
                    "timestamp": timestamp,
                    "distance": dist[idx],
                }
                continue
            # Duplicate station: keep the record whose timestamp best fits
            # an assumed 4.2 km/s travel time from the origin
            traveltime = dist[idx] / 4.2
            new_dt = abs(abs(eqtime - timestamp) - traveltime)
            old_dt = abs(
                abs(eqtime - stadict[network][code]["timestamp"]) - traveltime
            )
            if old_dt < new_dt:
                junk_sids.append(sid)
                continue
            junk_sids.append(stadict[network][code]["sid"])
            stadict[network][code] = {
                "sid": sid,
                "timestamp": timestamp,
                "distance": dist[idx],
            }
        sta_sids = []
        for netd in stadict.values():
            for coded in netd.values():
                sta_sids.append(coded["sid"])
        if not len(sta_sids):
            self.commit()
            return []

        amp_query = (
            "SELECT s.network, s.name, s.code, s.lat, s.lon, "
            "c.channel, c.loc, p.imt, p.value FROM station s, "
            "channel c, pgm p WHERE s.id IN %s AND "
            "c.station_id = s.id AND p.channel_id = c.id "
            "ORDER BY s.network, s.code, c.channel, p.imt"
        )
        delete_query = "DELETE FROM station WHERE id IN %s"
        # data_list will hold the rows of the dataframe
        nstas = len(sta_sids)
        data_list = []
        start = 0
        while start < nstas:
            end = min(start + MAX_VARS, nstas)
            varstr = f"({', '.join('?' for _ in sta_sids[start:end])})"
            query = amp_query % varstr
            self._cursor.execute(query, sta_sids[start:end])
            amprows = self._cursor.fetchall()
            for row in amprows:
                # data_row = (code, channel_name, imt, value, lat, lon,
                #             network, name, distance, flag, loccode)
                data_row = (
                    row[2],
                    row[5],
                    row[7],
                    row[8],
                    row[3],
                    row[4],
                    row[0],
                    row[1],
                    stadict[row[0]][row[2]]["distance"],
                    0,
                    row[6],
                )
                data_list.append(data_row)
            # Delete the stations now, since we have them queued up
            self._cursor.execute(delete_query % varstr, sta_sids[start:end])
            start = end
        # clean up rows that have been associated but didn't make the cut
        start = 0
        njunk = len(junk_sids)
        while start < njunk:
            end = min(start + MAX_VARS, njunk)
            varstr = f"({', '.join('?' for _ in junk_sids[start:end])})"
            self._cursor.execute(delete_query % varstr, junk_sids[start:end])
            start = end
        self.commit()
        return data_list
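
    # Sketch of consuming associate()'s return value (names are
    # illustrative; the tuple layout matches the docstring above):
    #
    #     rows = handler.associate(eqtime, eqlat, eqlon)
    #     for (code, channel, imt, value, lat, lon, netid, name,
    #             distance, flag, loccode) in rows:
    #         print(f"{netid}.{code} {channel} {imt}={value}")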

    def writeXML(self, data_list, eventid, pretty_print=False):
        """Write the list of tuples as an XML file in the event's
        current directory.

        Args:
            data_list (list): A list of tuples with the following elements:

                - station code
                - channel
                - imt
                - imt value
                - station latitude
                - station longitude
                - station's network id
                - station's name string
                - distance from station to origin
                - imt flag
                - channel's location code
            eventid (str): The event ID of the event associated with the
                data.
            pretty_print (bool): Whether or not to write the XML in a more
                human-readable form. If True, the file will be somewhat
                larger and writing will be somewhat slower.

        Returns:
            nothing: Nothing.
        """
        root = ET.Element("shakemap-data", code_version="4.0")
        create_time = int(time.time())
        stationlist = ET.SubElement(root, "stationlist", created="%i" % create_time)
        oldnet = None
        oldcode = None
        oldchan = None
        oldloc = None
        for row in data_list:
            code, chan, imt, value, lat, lon, net, name, dist, flag, loc = row
            if net != oldnet or code != oldcode:
                if not code.startswith(net + "."):
                    stacode = net + "." + code
                else:
                    stacode = code
                station = ET.SubElement(
                    stationlist,
                    "station",
                    code=stacode,
                    name=name,
                    insttype="",
                    lat=f"{lat:.4f}",
                    lon=f"{lon:.4f}",
                    dist=f"{dist:.4f}",
                    netid=net,
                    commtype="DIG",
                    loc="",
                )
                oldnet = net
                oldcode = code
                oldchan = None
                oldloc = None
            if chan != oldchan or loc != oldloc:
                if not chan.startswith(loc + "."):
                    comp = loc + "." + chan
                else:
                    comp = chan
                component = ET.SubElement(station, "comp", name=comp)
                oldchan = chan
                oldloc = loc
            ET.SubElement(component, imt, value=f"{value:.6f}", flag=str(flag))
        data_folder = os.path.join(self._data_path, eventid, "current")
        if not os.path.isdir(data_folder):
            os.makedirs(data_folder)
        amptime = datetime.utcnow().strftime("%Y%m%d%H%M%S")
        xmlfile = os.path.join(data_folder, f"unassoc_{amptime}_dat.xml")
        if pretty_print:
            pstring = prettify(root)
            with open(xmlfile, "w") as fd:
                fd.write(pstring)
        else:
            tree = ET.ElementTree(root)
            tree.write(xmlfile, encoding="utf-8", xml_declaration=True)
        return
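
    # The file written by writeXML() has roughly this shape (values are
    # hypothetical; the structure is inferred from the ElementTree calls
    # above):
    #
    #     <shakemap-data code_version="4.0">
    #       <stationlist created="1625750096">
    #         <station code="CI.STA" name="Station Name" insttype=""
    #                  lat="34.0000" lon="-118.0000" dist="12.3456"
    #                  netid="CI" commtype="DIG" loc="">
    #           <comp name="--.HHE">
    #             <pga value="12.345678" flag="0"/>
    #           </comp>
    #         </station>
    #       </stationlist>
    #     </shakemap-data>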

    def __del__(self):
        """Destructor."""
        if hasattr(self, "_connection") and self._connection is not None:
            self._disconnect()

    def insertAmps(self, xmlfile):
        """Insert data from an amps file into the database.

        Args:
            xmlfile (str): XML file containing peak ground motion data.
        """
        _, fname = os.path.split(xmlfile)
        try:
            with open(xmlfile, "r") as fd:
                xmlstr = fd.read()
            # sometimes these records have non-ascii bytes in them
            newxmlstr = re.sub(r"[^\x00-\x7F]+", " ", xmlstr)
            # newxmlstr = _invalid_xml_remove(xmlstr)
            newxmlstr = newxmlstr.encode("utf-8", errors="xmlcharrefreplace")
            amps = dET.fromstring(newxmlstr)
        except Exception as e:
            raise Exception(f'Could not parse {xmlfile}, due to error "{str(e)}"')
        if amps.tag != "amplitudes":
            raise Exception(f"{xmlfile} does not appear to be an amplitude XML file.")
        agency = amps.get("agency")
        record = amps.find("record")
        timing = record.find("timing")
        reference = timing.find("reference")
        has_pgm = False
        time_dict = {}
        for child in reference.iter():
            node_name = child.tag
            if node_name == "PGMTime":
                has_pgm = True
            elif node_name == "year":
                time_dict["year"] = int(child.get("value"))
            elif node_name == "month":
                time_dict["month"] = int(child.get("value"))
            elif node_name == "day":
                time_dict["day"] = int(child.get("value"))
            elif node_name == "hour":
                time_dict["hour"] = int(child.get("value"))
            elif node_name == "minute":
                time_dict["minute"] = int(child.get("value"))
            elif node_name == "second":
                time_dict["second"] = int(child.get("value"))
            elif node_name == "msec":
                time_dict["msec"] = int(child.get("value"))
        if has_pgm:
            pgmtime_str = reference.find("PGMTime").text
            try:
                tfmt = constants.TIMEFMT.replace("Z", "")
                pgmdate = datetime.strptime(pgmtime_str[0:19], tfmt).replace(
                    tzinfo=timezone.utc
                )
            except ValueError:
                tfmt = constants.ALT_TIMEFMT.replace("Z", "")
                pgmdate = datetime.strptime(pgmtime_str[0:19], tfmt).replace(
                    tzinfo=timezone.utc
                )
            pgmtime = int(dt_to_timestamp(pgmdate))
        else:
            if not len(time_dict):
                print(f"No time data for file {fname}")
                return
            pgmdate = datetime(
                time_dict["year"],
                time_dict["month"],
                time_dict["day"],
                time_dict["hour"],
                time_dict["minute"],
                time_dict["second"],
            )
            pgmtime = dt_to_timestamp(pgmdate)
        # there are often multiple stations per file, but they're
        # all duplicates of each other, so just grab the information
        # from the first one
        station = record.find("station")
        attrib = dict(station.items())
        lat = float(attrib["lat"])
        lon = float(attrib["lon"])
        code = attrib["code"]
        name = attrib["name"]
        if "net" in attrib:
            network = attrib["net"]
        elif "netid" in attrib:
            network = attrib["netid"]
        else:
            network = agency
        #
        # The station (at this pgmtime +/- 10 seconds) might already exist
        # in the DB; if it does, use it
        #
        self._cursor.execute("BEGIN EXCLUSIVE")
        query = (
            "SELECT id, timestamp FROM station WHERE network = ? AND "
            "code = ? AND timestamp > ? AND timestamp < ?"
        )
        self._cursor.execute(query, (network, code, pgmtime - 10, pgmtime + 10))
        #
        # It's possible that the query returned more than one station; pick
        # the one closest to the new station's pgmtime
        #
        rows = self._cursor.fetchall()
        best_sid = None
        best_time = None
        for row in rows:
            dtime = abs(row[1] - pgmtime)
            if best_time is None or dtime < best_time:
                best_time = dtime
                best_sid = row[0]
        inserted_station = False
        if best_sid is None:
            fmt = (
                "INSERT INTO station "
                "(timestamp, lat, lon, name, code, network) "
                "VALUES (?, ?, ?, ?, ?, ?)"
            )
            self._cursor.execute(fmt, (pgmtime, lat, lon, name, code, network))
            best_sid = self._cursor.lastrowid
            inserted_station = True
        #
        # If the station is already there, it has at least one channel, too
        #
        existing_channels = {}
        if inserted_station is False:
            chan_query = "SELECT channel, id FROM channel WHERE station_id = ?"
            self._cursor.execute(chan_query, (best_sid,))
            rows = self._cursor.fetchall()
            existing_channels = dict(rows)
        # might need these
        insert_channel = (
            "INSERT INTO channel (station_id, channel, loc) VALUES (?, ?, ?)"
        )
        insert_pgm = "INSERT INTO pgm (channel_id, imt, value) VALUES (?, ?, ?)"
        # loop over components
        channels_inserted = 0
        for channel in record.iter("component"):
            # We don't want channels with qual > 4 (assuming qual is Cosmos
            # table 6 value)
            qual = channel.get("qual")
            if qual:
                try:
                    iqual = int(qual)
                except ValueError:
                    # qual is something we don't understand
                    iqual = 0
            else:
                iqual = 0
            if iqual > 4:
                continue
            loc = channel.get("loc")
            if not loc:
                loc = "--"
            cname = channel.get("name")
            if cname in existing_channels:
                best_cid = existing_channels[cname]
                inserted_channel = False
            else:
                self._cursor.execute(insert_channel, (best_sid, cname, loc))
                best_cid = self._cursor.lastrowid
                inserted_channel = True
                channels_inserted += 1
            #
            # Similarly, if the channel is already there, we don't want to
            # insert repeated IMTs (and updating them doesn't make a lot of
            # sense)
            #
            existing_pgms = {}
            if inserted_channel is False:
                pgm_query = "SELECT imt, id FROM pgm WHERE channel_id = ?"
                self._cursor.execute(pgm_query, (best_cid,))
                rows = self._cursor.fetchall()
                existing_pgms = dict(rows)
            # loop over imts in channel
            pgm_list = []
            for pgm in list(channel):
                imt = pgm.tag
                if imt not in IMTS:
                    continue
                try:
                    value = float(pgm.get("value"))
                except ValueError:
                    #
                    # Couldn't interpret the value for some reason
                    #
                    continue
                if imt == "sa":
                    imt = "p" + imt + pgm.get("period").replace(".", "")
                    value = value / 9.81
                if imt in IMTDICT:
                    imt = IMTDICT[imt]
                    if imt == "pga":
                        value = value / 9.81
                if imt in existing_pgms:
                    continue
                pgm_list.append((best_cid, imt, value))
            if len(pgm_list) > 0:
                #
                # Insert the new amps
                #
                self._cursor.executemany(insert_pgm, pgm_list)
            elif inserted_channel:
                #
                # If we didn't insert any amps, but we inserted the channel,
                # delete the channel
                #
                channel_delete = "DELETE FROM channel WHERE id = ?"
                self._cursor.execute(channel_delete, (best_cid,))
                channels_inserted -= 1
            # End of pgm loop
        # End of channel loop
        #
        # If we inserted the station but no channels, delete the station
        #
        if channels_inserted == 0 and inserted_station:
            station_delete = "DELETE FROM station WHERE id = ?"
            self._cursor.execute(station_delete, (best_sid,))
        self.commit()
        return
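
    # Sketch of the amplitude XML insertAmps() consumes (attribute values
    # are hypothetical; the structure is inferred from the parsing code
    # above):
    #
    #     <amplitudes agency="CI">
    #       <record>
    #         <timing>
    #           <reference>
    #             <PGMTime>2021-07-08T12:34:56.0Z</PGMTime>
    #           </reference>
    #         </timing>
    #         <station code="STA" name="Station Name" lat="34.0"
    #                  lon="-118.0" net="CI"/>
    #         <component name="HHE" qual="0" loc="--">
    #           <pga value="12.3"/>
    #           <pgv value="4.5"/>
    #           <sa period="1.0" value="6.7"/>
    #         </component>
    #       </record>
    #     </amplitudes>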

    def cleanAmps(self, threshold=30):
        """Clean out amplitude data that is older than the threshold
        number of days.

        Args:
            threshold (int): Maximum age in days of amplitude data in
                the database.

        Returns:
            int: Number of stations deleted.
        """
        thresh_date = dt_to_timestamp(datetime.utcnow() - timedelta(days=threshold))
        squery = "DELETE FROM station WHERE timestamp < ?"
        self._cursor.execute(squery, [thresh_date])
        nrows = self._cursor.rowcount
        self.commit()
        return nrows

    def cleanEvents(self, threshold=365):
        """Clean out event data that is older than the threshold
        number of days.

        Args:
            threshold (int): Maximum age in days of events in the database.

        Returns:
            int: Number of events deleted.
        """
        thresh_date = dt_to_timestamp(datetime.utcnow() - timedelta(days=threshold))
        equery = "DELETE FROM event WHERE time < %i" % thresh_date
        self._cursor.execute(equery)
        nevents = self._cursor.rowcount
        self.commit()
        return nevents

    def getStats(self):
        """Get summary statistics about the database.

        Returns:
            dict: Fields:

                - events: Number of events in database.
                - stations: Number of stations in database.
                - channels: Number of unique channels in database.
                - pgms: Number of unique pgms in database.
                - event_min: Datetime of earliest event in database.
                - event_max: Datetime of most recent event in database.
                - station_min: Datetime of earliest amplitude data in
                  database.
                - station_max: Datetime of most recent amplitude data in
                  database.
        """
        results = {}
        # event stuff
        equery = "SELECT count(*), min(time), max(time) FROM event"
        self._cursor.execute(equery)
        row = self._cursor.fetchone()
        results["events"] = row[0]
        if row[0] == 0:
            results["event_min"] = None
            results["event_max"] = None
        else:
            results["event_min"] = datetime.fromtimestamp(row[1], timezone.utc)
            results["event_max"] = datetime.fromtimestamp(row[2], timezone.utc)
        # station stuff
        squery = "SELECT count(*), min(timestamp), max(timestamp) FROM station"
        self._cursor.execute(squery)
        row = self._cursor.fetchone()
        results["stations"] = row[0]
        if row[0] == 0:
            results["station_min"] = None
            results["station_max"] = None
        else:
            results["station_min"] = datetime.fromtimestamp(row[1], timezone.utc)
            results["station_max"] = datetime.fromtimestamp(row[2], timezone.utc)
        # channels
        cquery = "SELECT count(*) FROM channel"
        self._cursor.execute(cquery)
        row = self._cursor.fetchone()
        results["channels"] = row[0]
        # pgms
        pquery = "SELECT count(*) FROM pgm"
        self._cursor.execute(pquery)
        row = self._cursor.fetchone()
        results["pgms"] = row[0]
        return results

def dt_to_timestamp(dt):
    """Convert a UTC datetime to a Unix timestamp (integer seconds)."""
    timestamp = int(dt.replace(tzinfo=timezone.utc).timestamp())
    return timestamp

def timestr_to_timestamp(timestr):
    """Convert a time string in constants.TIMEFMT (or ALT_TIMEFMT) form
    to a Unix timestamp (integer seconds)."""
    try:
        timestamp = int(
            datetime.strptime(timestr, constants.TIMEFMT)
            .replace(tzinfo=timezone.utc)
            .timestamp()
        )
    except ValueError:
        timestamp = int(
            datetime.strptime(timestr, constants.ALT_TIMEFMT)
            .replace(tzinfo=timezone.utc)
            .timestamp()
        )
    return timestamp
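
# Round-trip sketch for the two time helpers above (the time string is
# hypothetical and assumes one of constants.TIMEFMT/ALT_TIMEFMT matches it):
#
#     ts = timestr_to_timestamp("2021-07-08T12:34:56Z")
#     dt = datetime.fromtimestamp(ts, timezone.utc)
#     assert dt_to_timestamp(dt) == ts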

# def _invalid_xml_remove(c):
#     # http://stackoverflow.com/questions/1707890/fast-way-to-filter-illegal-xml-unicode-chars-in-python  # noqa
#     illegal_unichrs = [(0x00, 0x08), (0x0B, 0x1F), (0x7F, 0x84), (0x86, 0x9F),
#                        (0xD800, 0xDFFF), (0xFDD0, 0xFDDF), (0xFFFE, 0xFFFF),
#                        (0x1FFFE, 0x1FFFF), (0x2FFFE, 0x2FFFF),
#                        (0x3FFFE, 0x3FFFF), (0x4FFFE, 0x4FFFF),
#                        (0x5FFFE, 0x5FFFF), (0x6FFFE, 0x6FFFF),
#                        (0x7FFFE, 0x7FFFF), (0x8FFFE, 0x8FFFF),
#                        (0x9FFFE, 0x9FFFF), (0xAFFFE, 0xAFFFF),
#                        (0xBFFFE, 0xBFFFF), (0xCFFFE, 0xCFFFF),
#                        (0xDFFFE, 0xDFFFF), (0xEFFFE, 0xEFFFF),
#                        (0xFFFFE, 0xFFFFF),
#                        (0x10FFFE, 0x10FFFF)]
#
#     illegal_ranges = ["%s-%s" % (chr(low), chr(high))
#                       for (low, high) in illegal_unichrs
#                       if low < sys.maxunicode]
#
#     illegal_xml_re = re.compile(u'[%s]' % u''.join(illegal_ranges))
#     if illegal_xml_re.search(c) is not None:
#         # Replace with space
#         return ' '
#     else:
#         return c

def prettify(elem):
    """Return a pretty-printed XML string."""
    rough_string = ET.tostring(elem, "utf-8")
    reparsed = minidom.parseString(rough_string)
    return reparsed.toprettyxml(indent="    ")