# System imports
import os
import os.path
import socket
import json
from collections import OrderedDict
import sys
from datetime import datetime, timezone
import time as time
import psutil
import copy
import shutil
import logging
from logging.handlers import TimedRotatingFileHandler
import subprocess
import shlex
import sqlite3
# Third-party imports
import daemon
import lockfile
from shapely.geometry import Point
from configobj import ConfigObj
from validate import Validator
from shapely.geometry import Polygon
from impactutils.rupture.origin import write_event_file
from impactutils.rupture import constants
# Local imports
from shakemap.utils.config import get_config_paths, get_configspec, config_error
from shakemap.utils.amps import AmplitudeHandler
MAX_SIZE = 4096


def send_queue(command, data, port=9755):
    """
    Send a command and its data to the queue process.

    Args:
        command (str): A valid command for the queue (e.g., 'origin').
        data (JSON serializable): The data associated with the command.
            Could be int, float, string, dictionary, list, etc. Must be
            JSON serializable. Must be less than MAX_SIZE bytes when
            serialized.

    Returns:
        nothing: Nothing

    Raises:
        RuntimeError: If the serialized data is larger than MAX_SIZE.
        OSError: If there is a problem with the socket or connection.
        TypeError: If the supplied data is not JSON serializable.
    """
    qdata = {"type": command, "data": data}
    qstr = json.dumps(qdata).encode("utf8")
    if len(qstr) > MAX_SIZE:
        raise RuntimeError("Data portion too large")
    csocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        csocket.connect(("127.0.0.1", port))
        # send() may transmit only part of the payload; sendall() loops
        # until every byte is delivered (or raises OSError).
        csocket.sendall(qstr)
    finally:
        # Close the socket even if connect() or sendall() fails, so a
        # failed send does not leak a file descriptor.
        csocket.close()
    return
def str_to_seconds(tstring):
    """Convert time strings to seconds. Strings can be of the
    form:
        <int>  (minutes)
        <int>m (minutes)
        <int>h (hours)
        <int>d (days)
        <int>y (years)

    Args:
        tstring (str): An integer followed by an (optional)
            'm', 'h', 'd', 'y'.

    Returns:
        int: The number of seconds represented by the input string. If
        the string represents a negative number, -1 is returned.

    Raises:
        ValueError: If the value cannot be converted to seconds.
    """
    # Seconds per unit suffix; a bare integer is treated as minutes.
    multipliers = {"m": 60, "h": 3600, "d": 86400, "y": 31536000}
    if tstring and tstring[-1] in multipliers:
        secs = multipliers[tstring[-1]] * int(tstring[:-1])
    else:
        secs = 60 * int(tstring)
    # Negative durations are normalized to -1, which callers interpret
    # as "disabled".
    return -1 if secs < 0 else secs
def parse_config(config):
    """Parse the config object to get usable data.

    Args:
        config (ConfigObj object): The result of parsing the file's
            configuration file.

    Returns:
        dict: A cleaned up version of the input.
    """
    servers = config["servers"]
    if "" in servers:
        servers.remove("")
    if "localhost" not in servers:
        servers.append("localhost")
    # Convert the human-readable durations into seconds.
    for age_key in (
        "old_event_age",
        "future_event_age",
        "associate_interval",
        "max_trigger_wait",
    ):
        config[age_key] = str_to_seconds(config[age_key])
    parsed_boxes = OrderedDict()
    for boxname, boxspec in config["boxes"].items():
        values = [float(v) for v in boxspec.split(",")]
        threshold_mag = values.pop(0)
        # The config lists lat, lon pairs, but we want lon, lat, so pair
        # the odd-indexed values with the even-indexed ones.
        ring = list(zip(values[1::2], values[0::2]))
        # Close the polygon by repeating the first vertex.
        ring.append(ring[0])
        parsed_boxes[boxname] = {"mag": threshold_mag, "poly": Polygon(ring)}
    config["boxes"] = parsed_boxes
    config["repeats"] = {
        float(mag): [str_to_seconds(t) for t in times]
        for mag, times in config["repeats"].items()
    }
    config["network_delays"] = {
        net: str_to_seconds(delay)
        for net, delay in config["network_delays"].items()
    }
    return config
def get_config(install_path):
    """Read the config and get it into a usable state.

    Args:
        install_path (str): The install path of the current profile.

    Returns:
        dict: A dictionary of configuration data.
    """
    config_file = os.path.join(install_path, "config", "queue.conf")
    raw_config = ConfigObj(config_file, configspec=get_configspec("queue"))
    # validate() returns True on success, or a results structure when
    # something failed; config_error() reports and exits on failure.
    validation = raw_config.validate(Validator())
    if not isinstance(validation, bool) or not validation:
        config_error(raw_config, validation)
    return parse_config(raw_config.dict())
[docs]class Queue(object):
def __init__(self, pargs):
current_time = int(time.time())
self.MEMORY_UPDATE_TIME = current_time
self.ASSOCIATE_UPDATE_TIME = current_time
self.DB_MAINTENANCE_TIME = current_time
self.children = {}
self.attached = pargs.attached
self.install_path, self.data_path = get_config_paths()
self.config = get_config(self.install_path)
#
# Get shake.conf for the autorun modules
#
config_file = os.path.join(self.install_path, "config", "shake.conf")
spec_file = get_configspec("shake")
shake_config = ConfigObj(config_file, configspec=spec_file)
results = shake_config.validate(Validator())
if not isinstance(results, bool) or not results:
config_error(shake_config, results)
self.shake_cmds = shlex.split(shake_config["autorun_modules"])
#
# Turn this process into a daemon
#
self.logpath = os.path.join(self.install_path, "logs")
if not os.path.isdir(self.logpath):
os.makedirs(self.logpath)
pidfile = os.path.join(self.logpath, "queue.pid")
self.filelock = lockfile.FileLock(pidfile)
if self.filelock.is_locked():
if pargs.break_lock:
self.filelock.break_lock()
else:
logger = self.getLogger()
logger.error(
"pid lock file '%s' exists, can't start "
"sm_queue; exiting..." % (pidfile)
)
sys.exit(-1)
[docs] def queueMainLoop(self):
context = daemon.DaemonContext(
working_directory=self.data_path, pidfile=self.filelock
)
with self.getContext(context):
self.logger = self.getLogger()
#
# Create the database for running and queued events.
#
self.eventQueue = EventQueue(self.install_path)
#
# Create the socket
#
qsocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
qsocket.bind(("", self.config["port"]))
# Set a timeout so that we can occasionally look for other
# things to do
qsocket.settimeout(30)
qsocket.listen(5)
#
# Get a connection to the event database
#
self.ampHandler = AmplitudeHandler(self.install_path, self.data_path)
self.logger.info("sm_queue initiated")
#
# At startup we want to see what was running when this process
# shut down and try to restart them.
#
running = self.eventQueue.getRunningEvents()
for eventid, command in running:
self.logger.info(f"Startup: Running event {eventid}")
event = self.ampHandler.getEvent(eventid)
# Update the XML because the DB may have newer information
self.writeEventXml(event)
p = subprocess.Popen(command)
self.children[eventid] = {"popen": p, "start_time": time.time()}
while True:
#
# Do routine stuff
#
self.doPeriodicTasks()
#
# Now wait for a connection
#
try:
(clientsocket, address) = qsocket.accept()
except socket.timeout:
#
# Normal timeout; do routine tasks and then go
# back to waiting for a connection
#
continue
#
# Got a connection
#
hostname, _, _ = socket.gethostbyaddr(address[0])
# hostname = socket.getfqdn(hostname)
self.logger.info(f"Got connection from {hostname} at port {address[1]}")
if hostname not in self.config["servers"]:
self.logger.warning(
f"Connection from {hostname} refused: not in valid servers list"
)
clientsocket.close()
continue
#
# The accept() should guarantee that there's something
# to read, but something could go wrong...
#
try:
clientsocket.settimeout(5)
data = clientsocket.recv(MAX_SIZE)
except socket.timeout:
self.logger.warning(
"Did not get data from connection, " "continuing"
)
clientsocket.close()
continue
else:
clientsocket.close()
#
# Decode the data and do something
#
try:
cmd = json.loads(data.decode("utf8"))
except json.decoder.JSONDecodeError:
self.logger.warning(
f"Couldn't decode data from {hostname}: ignoring"
)
continue
if (
not isinstance(cmd, dict)
or "type" not in cmd
or "data" not in cmd
or "id" not in cmd["data"]
):
self.logger.warning(f"Bad data from {hostname}: ignoring")
continue
if cmd["type"] == "origin":
self.logger.info(
f"Received \"origin\" for event {cmd['data']['id']}"
)
if "action" in cmd["data"]:
action = cmd["data"]["action"]
else:
action = "Origin received"
self.processOrigin(cmd["data"], action)
elif cmd["type"] == "cancel":
self.logger.info(
f"Received \"cancel\" for event {cmd['data']['id']}"
)
self.processCancel(cmd["data"])
else:
self.logger.info(
'Received "%s" for event %s' % cmd["type"], cmd["data"]["id"]
)
self.processOther(cmd["data"], cmd["type"])
    def doPeriodicTasks(self):
        """Check for finished children and start any needed timed repeats.

        Also runs the associator on its configured interval, logs memory
        usage hourly, and performs daily database maintenance.

        Returns:
            nothing: Nothing.
        """
        #
        # Do routine stuff:
        # first check for repeats to queue
        #
        current_time = int(time.time())
        repeats = self.ampHandler.getRepeats()
        # Each entry is (eventid, origin time, list of repeat timestamps);
        # the origin time is unused here.
        for eventid, otime, rep_list in repeats:
            # Consume repeats whose scheduled time has passed; rep_list
            # becomes None when exhausted, which also ends the loop.
            while rep_list is not None and rep_list[0] < current_time:
                event = self.ampHandler.getEvent(eventid)
                if eventid in self.children:
                    #
                    # Event is already running; pop this repeat and move on
                    #
                    rep_list.pop(0)
                    if len(rep_list) == 0:
                        rep_list = None
                    event["repeats"] = rep_list
                    self.ampHandler.insertEvent(event, update=True)
                    continue
                # Pop the expired repeat and persist the shortened list
                # before dispatching.
                rep_list.pop(0)
                if len(rep_list) == 0:
                    rep_list = None
                event["repeats"] = rep_list
                self.ampHandler.insertEvent(event, update=True)
                if event["lastrun"] == 0:
                    # This is a delayed first run or possibly just a very
                    # old event being updated
                    event_dir = os.path.join(self.data_path, eventid, "current")
                    if not os.path.isdir(event_dir):
                        self.logger.info(
                            f"Queueing event {eventid} after network delay"
                        )
                        self.dispatchEvent(event, "Event added")
                    else:
                        self.logger.info(f"Queueing old event {eventid} for update")
                        self.dispatchEvent(event, "Event updated")
                else:
                    self.logger.info(f"Queueing repeat of event {eventid}")
                    self.dispatchEvent(event, "Scheduled repeat")
                # At most one dispatch per event per pass.
                break
        #
        # Run the associator and dispatch events with new data
        #
        if (
            self.config["associate_interval"] >= 0
            and self.ASSOCIATE_UPDATE_TIME + self.config["associate_interval"]
            < current_time
        ):
            self.ASSOCIATE_UPDATE_TIME = current_time
            self.associateAll()
        #
        # Reap any dead children, and then try to run queued events
        #
        _ = self.reapChildren()
        self.runQueuedEvents()
        #
        # Print memory usage once per hour to see how much we're leaking...
        #
        if self.MEMORY_UPDATE_TIME + 3600 < current_time:
            self.MEMORY_UPDATE_TIME = current_time
            process = psutil.Process(os.getpid())
            # uss ("unique set size") in MB; fall back to 0 on platforms
            # where memory_full_info() lacks the attribute.
            mem = getattr(process.memory_full_info(), "uss", 0) / 1048576.0
            self.logger.info(f"Currently using {mem:.1f} MB")
        #
        # Do the occasional DB cleanup once per day; keep amps for 30
        # days and events for 1 year
        #
        if self.DB_MAINTENANCE_TIME + 86400 < current_time:
            self.DB_MAINTENANCE_TIME = current_time
            #
            # First do the association to make sure we don't drop any
            # amps that might associate
            #
            if self.config["associate_interval"] >= 0:
                self.ASSOCIATE_UPDATE_TIME = current_time
                self.associateAll()
            #
            # Now clean out the amps and events
            #
            self.ampHandler.cleanAmps(threshold=30)
            self.ampHandler.cleanEvents(threshold=365)
        return
[docs] def getLogger(self):
"""Set up a logger for this process.
Returns:
logging.logger: An instance of a logger.
"""
if not os.path.isdir(self.logpath):
os.makedirs(self.logpath)
logger = logging.getLogger("queue_logger")
logger.setLevel(logging.INFO)
if self.attached:
handler = logging.StreamHandler()
else:
logfile = os.path.join(self.logpath, "queue.log")
handler = TimedRotatingFileHandler(logfile, when="midnight", backupCount=30)
formatter = logging.Formatter(
fmt="%(asctime)s - %(levelname)s - %(message)s", datefmt="%Y-%m-%d %H:%M:%S"
)
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.propagate = False
return logger
[docs] def associateAll(self):
"""Do the associateAll method of the the AmplitudeHandler and
then process all of the events with updated data.
Returns:
nothing: Nothing.
"""
event_list = self.ampHandler.associateAll(pretty_print=True)
for eventid in event_list:
event = self.ampHandler.getEvent(eventid)
self.processOrigin(event, "Data association")
return
[docs] def writeEventXml(self, event):
"""Create the event directory if it doesn't exist and write/re-write
the event.xml file
Args:
event (dict): The event data structure.
Returns:
nothing: Nothing.
"""
ttemp = event["time"]
try:
dt = datetime.strptime(ttemp, constants.TIMEFMT)
except ValueError:
try:
dt = datetime.strptime(ttemp, constants.ALT_TIMEFMT)
except ValueError:
self.logger.error(f"Can't parse input time {ttemp}")
return
event["time"] = dt
event_dir = os.path.join(self.data_path, event["id"], "current")
if not os.path.isdir(event_dir):
os.makedirs(event_dir)
event_xml = os.path.join(event_dir, "event.xml")
self.logger.info(f"Writing event {event['id']} to event.xml")
val = write_event_file(event, event_xml)
if val:
self.logger.error(f"Error writing event.xml: {val}")
event["time"] = ttemp
return
[docs] def moveEventDirectory(self, oldid, newid):
"""Change the name of an existing event directory to a new ID."""
try:
shutil.move(
os.path.join(self.data_path, oldid), os.path.join(self.data_path, newid)
)
except shutil.Error as e:
self.logger(
f"Error trying to move data directory {oldid} to {newid}: {str(e)}"
)
return
[docs] def processOrigin(self, event, action):
"""Determine if an event should be processed (or reprocessed) and
dispatch it for processing.
Args:
event (dict): The event data structure.
action (str): The "type" of the trigger that caused this function
to be called.
Returns:
nothing: Nothing.
"""
current_time = int(time.time())
force_run = False
dispatch = True
#
# See if we already have this event, make a decision
#
existing = self.ampHandler.getEvent(event["id"])
if existing is None and "alt_eventids" in event:
#
# We haven't processed this ID, but the ID may have changed
#
for eid in event["alt_eventids"].split(","):
if eid == event["id"]:
continue
alt_exists = self.ampHandler.getEvent(eid)
if alt_exists is None:
continue
#
# We processed this event under a different ID
# If the event is currently running with the old ID, kill it
#
if eid in self.children:
self.children[eid]["popen"].kill()
self.children[eid]["popen"].wait()
del self.children[eid]
self.eventQueue.deleteRunningEvent(eid)
# Delete the old event from the database
self.ampHandler.deleteEvent(eid)
# Move the old event directory to the new ID
self.moveEventDirectory(eid, event["id"])
# Now treat the the new event ID as a new event.
existing = None
# But force it to run (because we want to update the event ID),
# bypassing date and magnitude checks that come later
force_run = True
break
if existing is None:
#
# This is a new event (or an event with a changed ID)
#
update = False
# Do we want to run this event?
if not force_run and self.magnitudeTooSmall(event):
self.logger.info(
f"Event {event['id']} (mag={event['mag']:f}) too small, skipping"
)
return
if not force_run and self.eventTooOldOrInFuture(event):
self.logger.info(
f"Event {event['id']} too old or too far in the future, skipping"
)
return
#
# Looks like we'll be running this event, get the repeats
# (if any) and toss the ones that have already passed
#
replist = None
try:
dt = datetime.strptime(event["time"], constants.TIMEFMT)
except ValueError:
try:
dt = datetime.strptime(event["time"], constants.ALT_TIMEFMT)
except ValueError:
self.logger.error(f"Can't parse input time {event['time']}")
return
event_timestamp = int(dt.replace(tzinfo=timezone.utc).timestamp())
for mag in sorted(self.config["repeats"].keys(), reverse=True):
if event["mag"] > mag:
replist = [
x + event_timestamp
for x in self.config["repeats"][mag]
if event_timestamp + x > current_time
]
break
#
# The first time we run an event, we need to check its
# network ID against those in the delay list. If present,
# we add the required delay as the first repeat, but
# don't dispatch the event. If the delay time has already
# passed, just treat this as a normal event
#
if event["netid"] in self.config["network_delays"]:
delay = self.config["network_delays"][event["netid"]]
if event_timestamp + delay > current_time:
self.logger.info(
"Delaying processing event %s due to "
"network delay configuration." % (event["id"])
)
replist.insert(0, event_timestamp + delay)
dispatch = False
event["repeats"] = replist if len(replist) > 0 else None
event["lastrun"] = 0
else:
#
# We've run this event before
#
update = True
#
# We want to update the event info in the database but
# save the lastrun and repeats settings
#
event["lastrun"] = existing["lastrun"]
event["repeats"] = copy.copy(existing["repeats"])
#
# Insert or update the event info in the database, then
# possibly queue the event to be run.
#
self.ampHandler.insertEvent(event, update=update)
if dispatch is True:
self.dispatchEvent(event, action)
return
[docs] def processOther(self, data, action):
"""A trigger has been issued for an event. Treat this as
an origin update. If the event in question is not in our
database, ignore the message.
Args:
data (dict): The event information dictionary.
action (str): The "type" of the trigger that caused this function
to be called.
Returns:
nothing: Nothing.
"""
eventid = data["id"]
existing = self.ampHandler.getEvent(eventid)
if existing:
self.processOrigin(existing, action)
else:
if "alt_eventids" in data:
for eid in data["alt_eventids"].split(","):
if eid == eventid:
continue
existing = self.ampHandler.getEvent(eid)
if existing:
self.processOrigin(existing, action)
return
self.logger.info(
'Trigger of action "%s" is for unprocessed '
"event %s: ignoring" % (action, data["id"])
)
return
[docs] def processCancel(self, data):
"""We've received a cancellation of an event: run 'shake cancel'.
Args:
data (dict): The dictionary must have an event ID
under the 'id' key.
Returns:
nothing: Nothing.
"""
eventid = data["id"]
existing = self.ampHandler.getEvent(eventid)
if existing:
self.dispatchEvent(data, "cancel")
existing["repeats"] = None
self.ampHandler.insertEvent(existing, update=True)
return
if "alt_eventids" in data:
for eid in data["alt_eventids"].split(","):
if eid == eventid:
continue
existing = self.ampHandler.getEvent(eid)
if existing:
self.dispatchEvent(existing, "cancel")
existing["repeats"] = None
self.ampHandler.insertEvent(existing, update=True)
return
self.logger.info(f"cancel is for unprocessed event {eventid}: ignoring")
return
[docs] def magnitudeTooSmall(self, event):
"""Return False if the magnitude is greater than the threshold
magnitude of the first metro box within which it falls, or
the global minmag if it does not fall within a box; return
true otherwise.
Args:
event (dict): The event dictionary; must contain at least "mag",
"lon", and "lat" keys.
Returns:
bool: True if the event is too small to process; False otherwise.
"""
mag = event["mag"]
lon = event["lon"]
lat = event["lat"]
pt = Point((lon, lat))
for boxname in sorted(self.config["boxes"]):
boxdict = self.config["boxes"][boxname]
if pt.within(boxdict["poly"]):
if mag >= boxdict["mag"]:
return False
else:
return True
#
# Not in any boxes
#
if mag >= self.config["minmag"]:
return False
return True
[docs] def eventTooOldOrInFuture(self, event):
"""Return True if the event is too old or too far in the future to
process; return False otherwise.
Args:
event (dict): The event data structure.
Returns:
bool: True if the event is older than old_event_age or is
more than future_event_age in the future; returns False otherwise.
"""
current_time = time.time()
try:
event_time = (
datetime.strptime(event["time"], constants.TIMEFMT)
.replace(tzinfo=timezone.utc)
.timestamp()
)
except ValueError:
event_time = (
datetime.strptime(event["time"], constants.ALT_TIMEFMT)
.replace(tzinfo=timezone.utc)
.timestamp()
)
if (
self.config["old_event_age"] >= 0
and event_time + self.config["old_event_age"] < current_time
):
return True
if (
self.config["future_event_age"] >= 0
and event_time - self.config["future_event_age"] > current_time
):
return True
return False
    def dispatchEvent(self, event, action):
        """Queue a run for the specified event.

        Args:
            event (dict): The data structure of the event to process.
            action (str): 'cancel', 'test', or some other string. 'cancel'
                starts the cancel process, 'test' queues the process
                'echo eventid'. Any other string queues the
                shake process to be run at the next opportunity.
                See the configuration file 'queue.conf' for the
                exact commands that will be run.
        Returns:
            nothing: Nothing.
        """
        eventid = event["id"]
        if action == "cancel":
            #
            # Cancellations aren't queued, they're run immediately
            #
            self.logger.info(f"Canceling event {eventid}")
            if eventid in self.children:
                self.logger.info(f"Event {eventid} is running; killing...")
                self.children[eventid]["popen"].kill()
                self.children[eventid]["popen"].wait()
                del self.children[eventid]
                self.eventQueue.deleteRunningEvent(eventid)
            # Substitute the configured shake executable path and the
            # event ID into the cancel command template.
            cmd = self.config["cancel_command"].replace(
                "shake", self.config["shake_path"]
            )
            cmd = cmd.replace("<EVID>", eventid)
            cmd = cmd.split()
            p = subprocess.Popen(cmd)
            self.children[eventid] = {"popen": p, "start_time": time.time()}
            self.eventQueue.insertRunningEvent(eventid, cmd)
            return
        self.logger.info(f'Queueing event {eventid} due to action "{action}"')
        #
        # Add the action as the assemble/augment comment, or replace the
        # comment if it is already there.
        #
        # NOTE(review): self.shake_cmds is mutated in place, so the '-c'
        # comment inserted here persists across calls and is overwritten
        # on the next dispatch.
        for ix, shcmd in enumerate(self.shake_cmds):
            if shcmd not in ["assemble", "augment"]:
                continue
            if len(self.shake_cmds) == ix + 1:  # This shouldn't happen
                self.shake_cmds.append("-c")
                self.shake_cmds.append(f'"{action}"')
            elif self.shake_cmds[ix + 1] == "-c":
                self.shake_cmds[ix + 2] = f'"{action}"'
            else:
                self.shake_cmds.insert(ix + 1, "-c")
                self.shake_cmds.insert(ix + 2, f'"{action}"')
            break
        if action == "test":
            # Test mode: run 'echo ...' instead of the real shake command.
            cmd = self.config["shake_command"].replace("shake", "echo")
        else:
            cmd = self.config["shake_command"].replace(
                "shake", self.config["shake_path"]
            )
        cmd = cmd.replace("<EVID>", eventid)
        cmd = cmd.split() + self.shake_cmds
        self.eventQueue.queueEvent(eventid, cmd, event["mag"])
        return
    def runQueuedEvents(self):
        """If there is space, run events from the queue.

        Events are taken in descending magnitude order. An event is
        shelved (dequeued, with a near-term repeat scheduled where
        needed) rather than run if it is already running, will repeat
        soon anyway, or was run very recently.

        Returns:
            nothing: Nothing.
        """
        if len(self.children) >= self.config["max_subprocesses"]:
            self.logger.info("Processing queue is full; waiting for open " "slots.")
            return
        current_time = int(time.time())
        # Maximum time we're willing to let a queued trigger wait.
        mtw = self.config["max_trigger_wait"]
        queued = self.eventQueue.getQueuedEvents()
        for eventid, command in queued:
            event = self.ampHandler.getEvent(eventid)
            if eventid in self.children:
                #
                # Event is currently running, don't run it but make sure
                # there's a repeat pretty soon
                #
                if event["repeats"]:
                    if event["repeats"][0] > current_time + mtw:
                        event["repeats"].insert(0, current_time + mtw)
                else:
                    event["repeats"] = [current_time + mtw]
                self.ampHandler.insertEvent(event, update=True)
                self.logger.info(
                    f"Event {event['id']} is currently running, shelving this update"
                )
                self.eventQueue.dequeueEvent(eventid)
                continue
            if event["repeats"]:
                # Negative delta_t means the next repeat is in the future.
                delta_t = current_time - event["repeats"][0]
                if delta_t > -mtw:
                    # We're due for a rerun anyway, so just leave the
                    # event queued
                    self.logger.info(
                        f"Event {eventid} will repeat soon, shelving this update"
                    )
                    self.eventQueue.dequeueEvent(eventid)
                    continue
            if current_time - event["lastrun"] < mtw:
                #
                # We ran this event very recently, but don't have a repeat
                # scheduled in the near future, so let's skip this one
                # but make sure something happens relatively soon
                #
                if event["repeats"]:
                    event["repeats"].insert(0, current_time + mtw)
                else:
                    event["repeats"] = [current_time + mtw]
                self.ampHandler.insertEvent(event, update=True)
                self.logger.info(
                    f"Event {event['id']} ran recently, shelving this update"
                )
                self.eventQueue.dequeueEvent(eventid)
                continue
            self.logger.info(f"Running event {eventid}")
            # Update the XML because the DB may have newer information
            self.writeEventXml(event)
            p = subprocess.Popen(command)
            self.children[eventid] = {"popen": p, "start_time": time.time()}
            self.eventQueue.dequeueEvent(eventid)
            self.eventQueue.insertRunningEvent(eventid, command)
            if len(self.children) >= self.config["max_subprocesses"]:
                self.logger.info("Processing queue is full; waiting for open " "slots.")
                break
        return
[docs] def reapChildren(self):
"""
Look through the list of child processes, reap the ones that have
finished, and kill any that are taking too long.
Returns:
nothing: Nothing. Completed or killed child processes are removed
from the list of children.
"""
to_delete = []
current_time = time.time()
for eventid, info in self.children.items():
returncode = info["popen"].poll()
if returncode is not None:
self.logger.info(
"Reaped child for event %s (return code %d)" % (eventid, returncode)
)
event = self.ampHandler.getEvent(eventid)
if event:
event["lastrun"] = current_time
self.ampHandler.insertEvent(event, update=True)
to_delete.append(eventid)
self.eventQueue.deleteRunningEvent(eventid)
continue
#
# Kill children who take too long
#
if info["start_time"] + self.config["max_process_time"] < current_time:
self.logger.warning(f"Event {eventid} taking too long, killing")
info["popen"].kill()
info["popen"].wait()
self.logger.warning(f"Reaped child for killed event {eventid}")
to_delete.append(eventid)
self.eventQueue.deleteRunningEvent(eventid)
for eventid in to_delete:
del self.children[eventid]
return to_delete
[docs] def getContext(self, context):
"""Returns a context based on the value of the 'attached' argument.
If attached is True, then the function returns an instance of the
Dummycontext; if it is False the function returns the 'context'
argument.
Args:
context (Context manager): A valid context manager.
Returns:
Context manager: If attached is True, the function returns an
instance of the Dummycontext; if False, returns
the 'context' argument.
"""
if self.attached:
return Dummycontext()
else:
return context
class Dummycontext(object):
    """A no-op context manager: usable with 'with' but does nothing on
    entry or exit.
    """

    def __enter__(self):
        # Hand back the instance so 'with ... as x' still binds something.
        return self

    def __exit__(self, *exc_info):
        # Returns None implicitly, so exceptions are never suppressed.
        pass
class EventQueue(object):
    """Class to maintain some persistence in the event of a program
    crash or shutdown. The db file can be removed if the operator wants
    a fresh start.
    """

    def __init__(self, ipath):
        """Open (creating if necessary) the event-queue database.

        Args:
            ipath (str): The install path; the database lives at
                <ipath>/data/event_queue.db.
        Raises:
            RuntimeError: If the database connection cannot be made.
        """
        # Column definitions for the two tables. BUG FIX: the id column
        # type was the single token "INTEGER_PRIMARY_KEY", which SQLite
        # accepts as an (unknown) declared type but which does NOT
        # create a primary key; it must be "INTEGER PRIMARY KEY".
        queued_events = OrderedDict(
            [
                ("id", "INTEGER PRIMARY KEY"),
                ("eventid", "TEXT UNIQUE"),
                ("command", "TEXT"),
                ("mag", "REAL"),
            ]
        )
        running_events = OrderedDict(
            [
                ("id", "INTEGER PRIMARY KEY"),
                ("eventid", "TEXT UNIQUE"),
                ("command", "TEXT"),
            ]
        )
        tables = {"queued": queued_events, "running": running_events}
        self.db_file = os.path.join(ipath, "data", "event_queue.db")
        db_exists = os.path.isfile(self.db_file)
        self._connection = sqlite3.connect(self.db_file, timeout=15)
        if self._connection is None:
            raise RuntimeError(f"Could not connect to {self.db_file}")
        self._connection.isolation_level = "EXCLUSIVE"
        self._cursor = self._connection.cursor()
        self._cursor.execute("PRAGMA foreign_keys = ON")
        # WAL mode allows concurrent readers while this process writes.
        # (The original issued this pragma a second time after table
        # creation; once is sufficient.)
        self._cursor.execute("PRAGMA journal_mode = WAL")
        if not db_exists:
            # First run: create the tables and their indexes.
            for table, tdict in tables.items():
                nuggets = [f"{column} {ctype}" for column, ctype in tdict.items()]
                createcmd = f"CREATE TABLE {table} (" + ",".join(nuggets) + ")"
                self._cursor.execute(createcmd)
            self._cursor.execute("CREATE INDEX queue_index ON " "queued(eventid)")
            self._cursor.execute("CREATE INDEX mag_index ON " "queued(mag)")
            self._cursor.execute("CREATE INDEX event_index ON " "running(eventid)")

    def __del__(self):
        """Destructor: commit and close the connection if still open."""
        if hasattr(self, "_connection") and self._connection is not None:
            self._disconnect()

    def _disconnect(self):
        # Commit pending work, then tear down the cursor and connection.
        self.commit()
        self._cursor.close()
        self._connection.close()
        self._connection = None
        self._cursor = None

    def commit(self):
        """Commit any operations to the database."""
        self._connection.commit()

    def getQueuedEvents(self):
        """Return the queued events, biggest magnitude first.

        Returns:
            list: (eventid, command-list) tuples in descending
            magnitude order.
        """
        query = "SELECT eventid, command, mag FROM queued ORDER BY mag DESC"
        self._cursor.execute(query)
        return [(row[0], json.loads(row[1])) for row in self._cursor.fetchall()]

    def queueEvent(self, eventid, command, mag):
        """Add (or replace) an event in the queued table."""
        query = "REPLACE INTO queued (eventid, command, mag) VALUES (?, ?, ?)"
        self._cursor.execute(query, (eventid, json.dumps(command), mag))
        self.commit()

    def dequeueEvent(self, eventid):
        """Remove an event from the queued table."""
        query = "DELETE FROM queued WHERE eventid = ?"
        self._cursor.execute(query, (eventid,))
        self.commit()

    def getRunningEvents(self):
        """Return a list of (eventid, command-list) tuples for the events
        marked as currently running."""
        query = "SELECT eventid, command FROM running"
        self._cursor.execute(query)
        return [(row[0], json.loads(row[1])) for row in self._cursor.fetchall()]

    def insertRunningEvent(self, eventid, command):
        """Mark an event as currently running."""
        query = "INSERT INTO running (eventid, command) VALUES (?, ?)"
        self._cursor.execute(query, (eventid, json.dumps(command)))
        self.commit()

    def deleteRunningEvent(self, eventid):
        """Remove an event from the running table."""
        query = "DELETE FROM running WHERE eventid = ?"
        self._cursor.execute(query, (eventid,))
        self.commit()