Source code for shakemap.coremods.transfer_base

# stdlib imports
import argparse
import inspect
import os.path
from datetime import datetime
import sys
import logging
import glob
import shutil
import re

# third party imports
from configobj import ConfigObj
from validate import Validator
from impactutils.io.smcontainers import ShakeMapOutputContainer
from impactutils.rupture import constants

# local imports
from .base import CoreModule
from shakemap.utils.config import get_config_paths, get_data_path, config_error

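# Name of the sentinel file that, if present in an event's data directory,
# blocks any transfer of that event's products (see execute() below).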
NO_TRANSFER = "NO_TRANSFER"
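# Name of the marker file recording when (and by which command) the event's
# data directory was last backed up; its presence skips further backups.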
SAVE_FILE = ".saved"


class TransferBaseModule(CoreModule):
    """
    Base class for transfer modules.
    """

    def __init__(self, eventid, cancel=False):
        """
        Instantiate a CoreModule class with an event ID.
        """
        self._eventid = eventid

    def execute(self):
        install_path, data_path = get_config_paths()
        self.datadir = os.path.join(data_path, self._eventid, "current")
        if not os.path.isdir(self.datadir):
            raise NotADirectoryError(f"{self.datadir} is not a valid directory.")

        # look for the presence of a NO_TRANSFER file in the datadir
        notransfer = os.path.join(self.datadir, NO_TRANSFER)
        if os.path.isfile(notransfer):
            self.logger.info(f"Event has a {NO_TRANSFER} file blocking transfer.")
            return

        # get the path to the transfer.conf spec file
        configspec = os.path.join(get_data_path(), "transferspec.conf")

        # look for an event-specific transfer.conf file
        transfer_conf = os.path.join(self.datadir, "transfer.conf")
        if not os.path.isfile(transfer_conf):
            # if not there, use the system one
            transfer_conf = os.path.join(install_path, "config", "transfer.conf")
            if not os.path.isfile(transfer_conf):
                raise FileNotFoundError(f"{transfer_conf} does not exist.")

        # get the config information for transfer
        self.config = ConfigObj(transfer_conf, configspec=configspec)
        results = self.config.validate(Validator())
        if not isinstance(results, bool) or not results:
            config_error(self.config, results)

        # get the output container with all the things in it
        products_dir = os.path.join(self.datadir, "products")
        datafile = os.path.join(products_dir, "shake_result.hdf")
        if not os.path.isfile(datafile):
            raise FileNotFoundError(f"{datafile} does not exist.")

        # open the ShakeMapOutputContainer and extract the data
        container = ShakeMapOutputContainer.load(datafile)

        # extract the info.json object from the container
        self.info = container.getMetadata()
        container.close()

        # check for the presence of a .saved file. If found, do nothing.
        # Otherwise, create the backup directory.
        save_file = os.path.join(self.datadir, SAVE_FILE)
        if not os.path.isfile(save_file):
            logging.info("Making backup directory...")
            self._make_backup(data_path)
            with open(save_file, "wt") as f:
                tnow = datetime.utcnow().strftime(constants.TIMEFMT)
                f.write(f"Saved {tnow} by {self.command_name}\n")
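
    # Illustrative sketch (not part of the original module): concrete transfer
    # modules are expected to run this execute() first, so the NO_TRANSFER
    # check, config validation, and backup all happen before any products
    # leave the machine. "MyTransfer" and "_send_products" are hypothetical
    # names:
    #
    #     class MyTransfer(TransferBaseModule):
    #         command_name = "mytransfer"
    #
    #         def execute(self):
    #             super().execute()  # shared checks, config load, backup
    #             properties, product_properties = self.getProperties(self.info)
    #             self._send_products(properties, product_properties)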

    def getProperties(self, info, props=None):
        properties = {}
        product_properties = {}

        # origin info
        origin = info["input"]["event_information"]
        properties["eventsource"] = origin["netid"]
        # The netid could be a valid part of the eventsourcecode, so we have
        # to check here if it ***starts with*** the netid. This fix should
        # already be done by the time we get here, but this is just an
        # insurance check.
        if origin["eventsourcecode"].startswith(origin["netid"]):
            eventsourcecode = origin["eventsourcecode"].replace(origin["netid"], "", 1)
        else:
            eventsourcecode = origin["eventsourcecode"]
        properties["eventsourcecode"] = eventsourcecode
        properties["code"] = origin["productcode"]
        properties["source"] = origin["productsource"]
        properties["type"] = origin["producttype"]
        properties["magnitude"] = float(origin["magnitude"])
        properties["latitude"] = float(origin["latitude"])
        properties["longitude"] = float(origin["longitude"])
        properties["depth"] = float(origin["depth"])
        try:
            properties["eventtime"] = datetime.strptime(
                origin["origin_time"], constants.TIMEFMT
            )
        except ValueError:
            properties["eventtime"] = datetime.strptime(
                origin["origin_time"], constants.ALT_TIMEFMT
            )

        product_properties["event-type"] = origin["event_type"]
        product_properties["event-description"] = origin["event_description"]

        # other metadata
        if "MMI" in info["output"]["ground_motions"]:
            mmi_info = info["output"]["ground_motions"]["MMI"]
            product_properties["maxmmi"] = mmi_info["max"]
            product_properties["maxmmi-grid"] = mmi_info["max_grid"]
        if "PGV" in info["output"]["ground_motions"]:
            pgv_info = info["output"]["ground_motions"]["PGV"]
            product_properties["maxpgv"] = pgv_info["max"]
            product_properties["maxpgv-grid"] = pgv_info["max_grid"]
        if "PGA" in info["output"]["ground_motions"]:
            pga_info = info["output"]["ground_motions"]["PGA"]
            product_properties["maxpga"] = pga_info["max"]
            product_properties["maxpga-grid"] = pga_info["max_grid"]
        if "SA(0.3)" in info["output"]["ground_motions"]:
            psa03_info = info["output"]["ground_motions"]["SA(0.3)"]
            product_properties["maxpsa03"] = psa03_info["max"]
            product_properties["maxpsa03-grid"] = psa03_info["max_grid"]
        if "SA(1.0)" in info["output"]["ground_motions"]:
            psa10_info = info["output"]["ground_motions"]["SA(1.0)"]
            product_properties["maxpsa10"] = psa10_info["max"]
            product_properties["maxpsa10-grid"] = psa10_info["max_grid"]
        if "SA(3.0)" in info["output"]["ground_motions"]:
            psa30_info = info["output"]["ground_motions"]["SA(3.0)"]
            product_properties["maxpsa30"] = psa30_info["max"]
            product_properties["maxpsa30-grid"] = psa30_info["max_grid"]

        mapinfo = info["output"]["map_information"]
        product_properties["minimum-longitude"] = mapinfo["min"]["longitude"]
        product_properties["minimum-latitude"] = mapinfo["min"]["latitude"]
        product_properties["maximum-longitude"] = mapinfo["max"]["longitude"]
        product_properties["maximum-latitude"] = mapinfo["max"]["latitude"]

        vinfo = info["processing"]["shakemap_versions"]
        product_properties["process-timestamp"] = vinfo["process_time"]
        product_properties["version"] = vinfo["map_version"]
        product_properties["map-status"] = vinfo["map_status"]
        product_properties["shakemap-code-version"] = vinfo["shakemap_revision"]

        # if this process is being run manually, set the review-status
        # property to "reviewed". If automatic, then set to "automatic".
        if props is None or "review-status" not in props:
            product_properties["review-status"] = "automatic"
            if sys.stdout is not None and sys.stdout.isatty():
                product_properties["review-status"] = "reviewed"

        # what gmice was used for the model calculations
        gmice = info["processing"]["ground_motion_modules"]["gmice"]["module"]
        product_properties["gmice"] = gmice

        if props:
            for this_prop, value in props.items():
                product_properties[this_prop] = value
        return (properties, product_properties)
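
    # Illustrative sketch (not part of the original module): the first
    # dictionary returned above identifies the product (source, code, origin
    # parameters); the second becomes key/value metadata attached to it. The
    # props argument lets a caller override or extend that metadata:
    #
    #     properties, product_properties = self.getProperties(
    #         self.info, props={"review-status": "reviewed"}
    #     )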

    def _make_backup(self, data_path):
        data_dir = os.path.join(data_path, self._eventid)
        current_dir = os.path.join(data_dir, "current")
        backup_dirs = glob.glob(os.path.join(data_dir, "backup*"))
        latest_version = 0
        # get the most recent version number
        for backup_dir in backup_dirs:
            if not os.path.isdir(backup_dir):
                continue
            match = re.search("[0-9]*$", backup_dir)
            if match is not None:
                version = int(match.group())
                if version > latest_version:
                    latest_version = version

        new_version = latest_version + 1
        backup = os.path.join(data_dir, "backup%04i" % new_version)
        shutil.copytree(current_dir, backup)
        logging.debug(f"Created backup directory {backup}")
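
    # Illustrative note (not part of the original module): the "[0-9]*$"
    # search above pulls the version number off the end of each existing
    # backup directory name, so successive calls produce backup0001,
    # backup0002, and so on. For example:
    #
    #     re.search("[0-9]*$", "backup0007").group()  # -> "0007"
    #     "backup%04i" % 8                            # -> "backup0008"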

    def parseArgs(self, arglist):
        """
        Set up the object to accept the --cancel flag.
        """
        parser = argparse.ArgumentParser(
            prog=self.__class__.command_name,
            description=inspect.getdoc(self.__class__),
        )
        helpstr = "Cancel this event."
        parser.add_argument(
            "-c", "--cancel", help=helpstr, action="store_true", default=False
        )
        helpstr = (
            'Send products to the PDL server configured in "devconfig" '
            "in the transfer.conf configuration file rather than the "
            'default "configfile".'
        )
        parser.add_argument(
            "-d", "--dev", help=helpstr, action="store_true", default=False
        )
        helpstr = (
            "Print the PDL command that would be executed, and then "
            "quit without doing anything. WARNING: do not use this "
            "option; it is currently not enabled."
        )
        parser.add_argument(
            "-r", "--dryrun", help=helpstr, action="store_true", default=False
        )
        #
        # This argument should be in any module that overrides this one.
        # It will collect everything after the current module's options
        # into args.rem, which should be returned by this function.
        # Note: parser.parse_known_args() will not work here, as it would
        # suck up any later modules' options that are the same as this
        # one's.
        #
        parser.add_argument("rem", nargs=argparse.REMAINDER, help=argparse.SUPPRESS)
        args = parser.parse_args(arglist)
        self.cancel = args.cancel
        self.usedevconfig = args.dev
        self.dryrun = args.dryrun
        return args.rem
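
    # Illustrative sketch (not part of the original module): per the comment
    # above, a module that overrides parseArgs() should define its own
    # argparse.REMAINDER argument so later modules' options pass through.
    # "--myflag" is a hypothetical option:
    #
    #     def parseArgs(self, arglist):
    #         parser = argparse.ArgumentParser(
    #             prog=self.__class__.command_name,
    #             description=inspect.getdoc(self.__class__),
    #         )
    #         parser.add_argument("--myflag", action="store_true", default=False)
    #         parser.add_argument("rem", nargs=argparse.REMAINDER,
    #                             help=argparse.SUPPRESS)
    #         args = parser.parse_args(arglist)
    #         self.myflag = args.myflag
    #         return args.rem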