package gov.usgs.earthquake.nats;

import gov.usgs.earthquake.aws.FileTrackingListener;
import gov.usgs.earthquake.distribution.ConfigurationException;
import gov.usgs.earthquake.distribution.DefaultNotificationReceiver;
import gov.usgs.earthquake.distribution.URLNotificationJSONConverter;
import gov.usgs.earthquake.distribution.WebSocketNotificationReceiver;
import gov.usgs.util.Config;
import gov.usgs.util.FileUtils;
import gov.usgs.util.Ini;
import io.nats.streaming.Message;
import io.nats.streaming.MessageHandler;
import io.nats.streaming.Subscription;
import io.nats.streaming.SubscriptionOptions;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.nio.charset.StandardCharsets;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.json.Json;
import javax.json.JsonObject;
import javax.json.JsonReader;

/* loaded from: input_file:gov/usgs/earthquake/nats/NATSStreamingNotificationReceiver.class */
public class NATSStreamingNotificationReceiver extends DefaultNotificationReceiver implements MessageHandler {
    private static final Logger LOGGER = Logger.getLogger(DefaultNotificationReceiver.class.getName());
    public static String TRACKING_FILE_NAME_PROPERTY = FileTrackingListener.TRACKING_FILE_PROEPRTY;
    public static String UPDATE_SEQUENCE_AFTER_EXCEPTION_PROPERTY = "updateSequenceAfterException";
    public static String SEQUENCE_PROPERTY = WebSocketNotificationReceiver.SEQUENCE_PROPERTY;
    public static String DEFAULT_TRACKING_FILE_NAME_PROPERTY = "data/STANReceiverInfo.json";
    public static String DEFAULT_UPDATE_SEQUENCE_AFTER_EXCEPTION_PROPERTY = "true";
    private Subscription subscription;
    private String subject;
    private String trackingFileName;
    private boolean updateSequenceAfterException;
    private NATSClient client = new NATSClient();
    private long sequence = 0;
    private boolean exceptionThrown = false;

    @Override // gov.usgs.earthquake.distribution.DefaultNotificationReceiver, gov.usgs.util.DefaultConfigurable, gov.usgs.util.Configurable
    public void configure(Config config) throws Exception {
        super.configure(config);
        this.client.configure(config);
        this.subject = config.getProperty(NATSClient.SUBJECT_PROPERTY);
        if (this.subject == null) {
            throw new ConfigurationException(NATSClient.SUBJECT_PROPERTY + " is a required parameter");
        }
        this.trackingFileName = config.getProperty(TRACKING_FILE_NAME_PROPERTY, DEFAULT_TRACKING_FILE_NAME_PROPERTY);
        this.updateSequenceAfterException = Boolean.parseBoolean(config.getProperty(UPDATE_SEQUENCE_AFTER_EXCEPTION_PROPERTY, DEFAULT_UPDATE_SEQUENCE_AFTER_EXCEPTION_PROPERTY));
    }

    @Override // gov.usgs.earthquake.distribution.DefaultNotificationReceiver, gov.usgs.util.DefaultConfigurable, gov.usgs.util.Configurable
    public void startup() throws Exception {
        super.startup();
        this.client.startup();
        JsonObject readTrackingFile = readTrackingFile();
        if (readTrackingFile != null && readTrackingFile.getString(NATSClient.SERVER_HOST_PROPERTY).equals(this.client.getServerHost()) && readTrackingFile.getString(NATSClient.SERVER_PORT_PROPERTY).equals(this.client.getServerPort()) && readTrackingFile.getString(NATSClient.CLUSTER_ID_PROPERTY).equals(this.client.getClusterId()) && readTrackingFile.getString(NATSClient.CLIENT_ID_PROPERTY).equals(this.client.getClientId()) && readTrackingFile.getString(NATSClient.SUBJECT_PROPERTY).equals(this.subject)) {
            this.sequence = Long.parseLong(readTrackingFile.get(SEQUENCE_PROPERTY).toString());
        }
        this.subscription = this.client.getConnection().subscribe(this.subject, this, new SubscriptionOptions.Builder().startAtSequence(this.sequence).build());
    }

    @Override // gov.usgs.earthquake.distribution.DefaultNotificationReceiver, gov.usgs.util.DefaultConfigurable, gov.usgs.util.Configurable
    public void shutdown() throws Exception {
        try {
            writeTrackingFile();
        } catch (Exception e) {
            LOGGER.log(Level.WARNING, Ini.SECTION_START + getName() + "] failed to write to tracking file");
        }
        try {
            this.subscription.unsubscribe();
        } catch (Exception e2) {
            LOGGER.log(Level.WARNING, Ini.SECTION_START + getName() + "] failed to unsubscribe from NATS channel");
        }
        this.subscription = null;
        this.client.shutdown();
        super.shutdown();
    }

    public void writeTrackingFile() throws Exception {
        FileUtils.writeFileThenMove(new File(this.trackingFileName + "_tmp"), new File(this.trackingFileName), Json.createObjectBuilder().add(NATSClient.SERVER_HOST_PROPERTY, this.client.getServerHost()).add(NATSClient.SERVER_PORT_PROPERTY, this.client.getServerPort()).add(NATSClient.CLUSTER_ID_PROPERTY, this.client.getClusterId()).add(NATSClient.CLIENT_ID_PROPERTY, this.client.getClientId()).add(NATSClient.SUBJECT_PROPERTY, this.subject).add(SEQUENCE_PROPERTY, this.sequence).build().toString().getBytes());
    }

    public JsonObject readTrackingFile() throws Exception {
        JsonObject jsonObject = null;
        File file2 = new File(this.trackingFileName);
        if (file2.exists()) {
            JsonReader createReader = Json.createReader(new ByteArrayInputStream(FileUtils.readFile(file2)));
            jsonObject = createReader.readObject();
            createReader.close();
        }
        return jsonObject;
    }

    @Override // io.nats.streaming.MessageHandler
    public void onMessage(Message message) {
        try {
            receiveNotification(URLNotificationJSONConverter.parseJSON(new ByteArrayInputStream(message.getData())));
            if (!this.exceptionThrown || this.updateSequenceAfterException) {
                this.sequence = message.getSequence();
                writeTrackingFile();
            }
        } catch (Exception e) {
            this.exceptionThrown = true;
            LOGGER.log(Level.WARNING, Ini.SECTION_START + getName() + "] exception handling NATSStreaming message." + (!this.updateSequenceAfterException ? " Will no longer update sequence; restart PDL to reprocess." : "") + " Stack Trace: " + e);
            LOGGER.log(Level.FINE, Ini.SECTION_START + getName() + "] Message: " + message.getData());
        }
    }

    public String getTrackingFileName() {
        return this.trackingFileName;
    }

    public void setTrackingFileName(String str) {
        this.trackingFileName = str;
    }

    public NATSClient getClient() {
        return this.client;
    }

    public void setClient(NATSClient nATSClient) {
        this.client = nATSClient;
    }

    public String getSubject() {
        return this.subject;
    }

    public void setSubject(String str) {
        this.subject = str;
    }
}
