package gov.usgs.earthquake.distribution.roundrobinnotifier;

import gov.usgs.earthquake.distribution.DefaultNotificationListener;
import gov.usgs.earthquake.distribution.DefaultNotificationReceiver;
import gov.usgs.earthquake.distribution.ListenerNotifier;
import gov.usgs.earthquake.distribution.Notification;
import gov.usgs.earthquake.distribution.NotificationEvent;
import gov.usgs.earthquake.distribution.NotificationIndex;
import gov.usgs.earthquake.distribution.NotificationListener;
import gov.usgs.util.DefaultConfigurable;
import gov.usgs.util.Ini;
import io.nats.client.SubscribeOptions;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

/* loaded from: input_file:gov/usgs/earthquake/distribution/roundrobinnotifier/RoundRobinListenerNotifier.class */
public class RoundRobinListenerNotifier extends DefaultConfigurable implements ListenerNotifier, Runnable {
    private static final Logger LOGGER = Logger.getLogger(RoundRobinListenerNotifier.class.getName());
    private static final ArrayList<String> AUTOLOADED_INDEXES = new ArrayList<>();
    private final DefaultNotificationReceiver receiver;
    private long statusInterval = SubscribeOptions.DEFAULT_ORDERED_HEARTBEAT;
    private final HashMap<NotificationListener, ListenerNotifierThread> listeners = new HashMap<>();
    private Thread thread = null;

    public RoundRobinListenerNotifier(DefaultNotificationReceiver defaultNotificationReceiver) {
        this.receiver = defaultNotificationReceiver;
    }

    @Override // gov.usgs.util.DefaultConfigurable, gov.usgs.util.Configurable
    public void startup() throws Exception {
        if (this.thread == null) {
            this.thread = new Thread(this);
            this.thread.start();
            requeue();
        }
    }

    @Override // gov.usgs.util.DefaultConfigurable, gov.usgs.util.Configurable
    public void shutdown() {
        if (this.thread != null) {
            this.thread.interrupt();
            this.thread = null;
        }
    }

    @Override // gov.usgs.earthquake.distribution.ListenerNotifier
    public void addNotificationListener(NotificationListener notificationListener) throws Exception {
        if (this.listeners.containsKey(notificationListener)) {
            return;
        }
        ListenerNotifierThread listenerNotifierThread = new ListenerNotifierThread(notificationListener);
        this.listeners.put(notificationListener, listenerNotifierThread);
        listenerNotifierThread.start();
    }

    @Override // gov.usgs.earthquake.distribution.ListenerNotifier
    public void removeNotificationListener(NotificationListener notificationListener) throws Exception {
        if (this.listeners.containsKey(notificationListener)) {
            this.listeners.remove(notificationListener).stop();
        }
    }

    @Override // gov.usgs.earthquake.distribution.ListenerNotifier
    public void notifyListeners(NotificationEvent notificationEvent) throws Exception {
        notifyListeners(notificationEvent, this.listeners.values());
    }

    protected void notifyListeners(NotificationEvent notificationEvent, Collection<ListenerNotifierThread> collection) throws Exception {
        Iterator<ListenerNotifierThread> it = collection.iterator();
        while (it.hasNext()) {
            it.next().notify(notificationEvent);
        }
    }

    @Override // java.lang.Runnable
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                Thread.sleep(this.statusInterval);
                for (ListenerNotifierThread listenerNotifierThread : this.listeners.values()) {
                    listenerNotifierThread.requeueErrors();
                    LOGGER.fine(Ini.SECTION_START + this.receiver.getName() + "-notifier] listener " + listenerNotifierThread.getListener().getName() + " " + listenerNotifierThread.getQueue().size() + " queued, " + listenerNotifierThread.getErrorQueue().size() + " to retry");
                }
            } catch (InterruptedException e) {
            } catch (Exception e2) {
                LOGGER.log(Level.WARNING, "exception running notifier status", (Throwable) e2);
            }
        }
    }

    protected void requeue() throws Exception {
        NotificationIndex notificationIndex = this.receiver.getNotificationIndex();
        ArrayList arrayList = new ArrayList();
        for (ListenerNotifierThread listenerNotifierThread : this.listeners.values()) {
            NotificationListener listener = listenerNotifierThread.getListener();
            if ((listener instanceof DefaultNotificationListener) && ((DefaultNotificationListener) listener).getNotificationIndex() != null) {
                if (!AUTOLOADED_INDEXES.contains(notificationIndex.getName() + "|" + listener.getName())) {
                    arrayList.add(listenerNotifierThread);
                }
            }
        }
        if (arrayList.size() == 0) {
            return;
        }
        LOGGER.fine(Ini.SECTION_START + this.receiver.getName() + "-notifier] requeuing notifications");
        Iterator<Notification> it = notificationIndex.findNotifications((List<String>) null, (List<String>) null, (List<String>) null).iterator();
        while (it.hasNext()) {
            notifyListeners(new NotificationEvent(this.receiver, it.next()), arrayList);
        }
        LOGGER.fine(Ini.SECTION_START + this.receiver.getName() + "-notifier] done requeuing notifications");
    }
}
