Source code for shakemap.coremods.base

# stdlib imports
import argparse
import inspect
import logging
from abc import ABC, abstractmethod
import os.path
from collections import OrderedDict

# third party imports
from lxml import etree

# local imports
from shakemap.utils.config import get_config_paths
from shakemap.utils.logging import get_logging_config


class CoreModule(ABC):
    """
    Base class for any module in coremods which gets called by the shake
    program.

    Subclasses set ``command_name`` (used as the program name in the
    argument parser built by ``parseArgs``) and must implement
    ``execute``. A subclass may also assign a ``contents`` attribute
    exposing a ``writeContents()`` method; ``writeContents`` on this
    class delegates to it when present.
    """

    command_name = ""

    #
    # targets and dependencies are assumed to live in the event's "current"
    # directory (and must therefore be prefixed with 'products/' if they
    # are found in the products directory); configs are assumed to be
    # found in the profile's config directory (and, thus, event-specific
    # configs like model_select.conf should be listed in dependencies, not
    # configs).
    #
    # targets should be regexp strings, e.g.:
    #       r'this\.txt'
    #       r'.*\.json'
    #       r'cont_.*\.json'
    # dependencies and configs should be tuples of globbable strings and
    # a value of True if the dependency is a requirement, and False if
    # it is optional, e.g.:
    #       ('this.txt', True)
    #       ('*.json', False)
    #
    targets = None
    dependencies = None
    configs = None

    def __init__(self, eventid):
        """
        Instantiate a CoreModule class with an event ID.

        Args:
            eventid: Identifier of the event; stored on ``self._eventid``.

        Raises:
            IndexError: If the "loggers" section of the logging
                configuration is empty.
        """
        self._eventid = eventid
        log_config = get_logging_config()
        # Materialize the keys before indexing: a plain dict's keys() is a
        # view on Python 3 and cannot be subscripted directly. Wrapping in
        # list() also works if keys() already returns a list.
        log_name = list(log_config["loggers"].keys())[0]
        self.logger = logging.getLogger(log_name)

    @abstractmethod
    def execute(self):
        """Run the module. Must be implemented by every subclass."""
        pass

    def parseArgs(self, arglist):
        """
        This is the default parseArgs which is sufficient for most
        modules. It will respond to '-h' or '--help' but nothing else. If
        a module needs to accept command line arguments, it will need to
        override this method.

        Args:
            arglist (list): Command line arguments that follow this
                module's name.

        Returns:
            list: The arguments collected in ``args.rem``, i.e. whatever
            this module did not consume.
        """
        parser = argparse.ArgumentParser(
            prog=self.__class__.command_name,
            description=inspect.getdoc(self.__class__),
        )
        #
        # This line should be in any module that overrides this
        # method. It will collect up everything after the current
        # module's options in args.rem, which should be returned
        # by this function. Note: doing parser.parse_known_args()
        # will not work as it will suck up any later modules'
        # options that are the same as this one's.
        #
        parser.add_argument("rem", nargs=argparse.REMAINDER, help=argparse.SUPPRESS)
        args = parser.parse_args(arglist)
        return args.rem

    def writeContents(self):
        """
        Delegate to ``self.contents.writeContents()`` when the module has
        defined a ``contents`` attribute; otherwise do nothing.
        """
        # Only the attribute lookup is guarded, so an AttributeError raised
        # inside contents.writeContents() itself is not swallowed.
        try:
            contents = self.contents
        except AttributeError:
            return
        contents.writeContents()
class Contents(object):
    """
    Helper class for creating and updating the contents.xml file.

    Entries are accumulated in ``self.contents`` (keyed by file id) via
    ``addFile`` and merged into ``<data_path>/<eventid>/current/pdl/
    contents.xml`` by ``writeContents``.
    """

    def __init__(self, page_title, page_slug, eventid):
        """
        Args:
            page_title: Title of the page the added files belong to, or
                None.
            page_slug: Slug of that page, or None.
            eventid: Event ID used to locate the event's pdl directory.
        """
        self.contents = {}
        self.page = {"title": page_title, "slug": page_slug}
        self._eventid = eventid

    def addFile(self, key, title, caption, filename, mime_type):
        """
        Register a file (or an additional format of an existing file).

        Args:
            key: Identifier of the content entry.
            title: Title of the entry (ignored if ``key`` already exists).
            caption: Caption of the entry (ignored if ``key`` already
                exists).
            filename: File name; it is stored prefixed with "download".
            mime_type: MIME type of this format.
        """
        file_format = {
            "filename": os.path.join("download", filename),
            "type": mime_type,
        }
        if key in self.contents:
            # Known entry: this is just another format of the same product.
            self.contents[key]["formats"].append(file_format)
            return

        entry = {"title": title, "caption": caption}
        # The page is attached unless both its title and slug are None.
        if self.page["title"] is not None or self.page["slug"] is not None:
            entry["page"] = self.page
        entry["formats"] = [file_format]
        self.contents[key] = entry

    def readContents(self, contents_file):
        """
        Parse an existing contents.xml file.

        Args:
            contents_file: Path (or file object) accepted by
                ``etree.parse``.

        Returns:
            OrderedDict: Mapping of file id to a dict with "title",
            "caption" and "formats" keys, plus a "page" key
            ({"title", "slug"}) for files referenced by a page element.
        """
        root = etree.parse(contents_file).getroot()
        contents = OrderedDict()
        pages = {}
        for child in root:
            if child.tag == "file":
                key = child.attrib["id"]
                title = child.attrib["title"]
                # Reset per file so a file without a <caption> child does
                # not inherit the caption of the previous file.
                caption = ""
                formats = []
                for fchild in child:
                    if fchild.tag == "caption":
                        # An empty caption element has text None; keep a
                        # string so the entry can be written back out.
                        caption = fchild.text or ""
                    elif fchild.tag == "format":
                        formats.append(
                            {
                                "filename": fchild.attrib["href"],
                                "type": fchild.attrib["type"],
                            }
                        )
                contents[key] = {
                    "title": title,
                    "caption": caption,
                    "formats": formats,
                }
            elif child.tag == "page":
                slug = child.attrib["slug"]
                pages[slug] = {
                    "title": child.attrib["title"],
                    "slug": slug,
                    "files": [fchild.attrib["refid"] for fchild in child],
                }
            # Any other child of the root is ignored.

        # assign the pages information into the relevant content dictionary
        for slug, page in pages.items():
            for file_id in page["files"]:
                if file_id in contents:
                    contents[file_id]["page"] = {
                        "title": page["title"],
                        "slug": slug,
                    }
        return contents

    def writeContents(self):
        """
        Create or update the event's contents.xml file.

        Does nothing when no files have been added. If the file already
        exists, its entries are read first and then overridden by the
        entries held in ``self.contents`` that share the same key.
        """
        if not self.contents:
            return

        # create or update the contents.xml file
        _, data_path = get_config_paths()
        pdldir = os.path.join(data_path, self._eventid, "current", "pdl")
        os.makedirs(pdldir, exist_ok=True)

        contents_file = os.path.join(pdldir, "contents.xml")
        if os.path.isfile(contents_file):
            # TODO: should we ensure that keys are globally unique?
            contents = self.readContents(contents_file)
            contents.update(self.contents)
        else:
            contents = self.contents

        root = etree.Element("contents")
        pages = {}  # dictionary with slugs as keys
        for key, cdict in contents.items():
            file_el = etree.SubElement(root, "file")
            file_el.set("title", cdict["title"])
            file_el.set("id", key)
            caption_el = etree.SubElement(file_el, "caption")
            caption_el.text = etree.CDATA(cdict["caption"])
            for file_format in cdict["formats"]:
                format_el = etree.SubElement(file_el, "format")
                format_el.set("href", file_format["filename"])
                format_el.set("type", file_format["type"])
            if "page" in cdict:
                slug = cdict["page"]["slug"]
                # The first file seen for a slug determines the page title.
                page = pages.setdefault(
                    slug, {"title": cdict["page"]["title"], "files": []}
                )
                page["files"].append(key)

        for slug, page_dict in pages.items():
            page_el = etree.SubElement(root, "page")
            page_el.set("title", page_dict["title"])
            page_el.set("slug", slug)
            for filekey in page_dict["files"]:
                ref_el = etree.SubElement(page_el, "file")
                ref_el.set("refid", filekey)

        xmlstr = etree.tostring(root, xml_declaration=True)
        # The context manager closes the file even if the write fails.
        with open(contents_file, "wt") as f:
            f.write(xmlstr.decode("utf-8"))