View Javadoc
1   package gov.usgs.volcanoes.winston.in;
2   
3   import java.util.ArrayList;
4   import java.util.Iterator;
5   import java.util.List;
6   
7   import gov.usgs.earthworm.Menu;
8   import gov.usgs.earthworm.MenuItem;
9   import gov.usgs.earthworm.SCN;
10  import gov.usgs.earthworm.WaveServer;
11  import gov.usgs.earthworm.message.TraceBuf;
12  import gov.usgs.volcanoes.core.time.Ew;
13  import gov.usgs.volcanoes.core.time.Time;
14  import gov.usgs.volcanoes.winston.db.Channels;
15  import gov.usgs.volcanoes.winston.db.Data;
16  import gov.usgs.volcanoes.winston.db.Input;
17  import gov.usgs.volcanoes.winston.db.WinstonDatabase;
18  
19  /**
20   *
21   * @author Dan Cervelli
22   */
23  public class WaveServerCollector extends Thread {
24    public static final int COLLECT = 1;
25    public static final int FILL_GAPS = 2;
26    private final int mode;
27    private final WinstonDatabase winston;
28    private final Channels winstonStations;
29    private final Input input;
30    private final Data data;
31    private final int interval; // seconds
32    private final int maxSize; // seconds
33    private final int delay;
34  
35    private final List<SCN> channels;
36    private final String name;
37    private final WaveServer waveServer;
38  
39    public WaveServerCollector(final String n, final WinstonDatabase w, final WaveServer ws,
40        final int i, final int m, final int d, final int md) {
41      name = n;
42      winston = w;
43      input = new Input(winston);
44      winstonStations = new Channels(winston);
45      data = new Data(winston);
46      interval = i;
47      maxSize = m;
48      delay = d;
49      mode = md;
50      waveServer = ws;
51      channels = new ArrayList<SCN>();
52    }
53  
54    public void startCollecting() {
55      start();
56    }
57  
58    public void stopCollecting() {
59  
60    }
61  
62    public void addStation(final SCN ci) {
63      channels.add(ci);
64    }
65  
66    public void fillGap(final SCN scn, final double t1, final double t2) {
67      double ct = t1;
68      waveServer.connect();
69      while (ct < t2) {
70        final long ts = System.currentTimeMillis();
71        List<TraceBuf> tbs = null;
72        if (t2 - ct > maxSize) {
73          tbs = waveServer.getTraceBufs(scn.station, scn.channel, scn.network, Time.j2kToEw(ct - 5),
74              Time.j2kToEw(ct + maxSize + 5));
75          ct += maxSize;
76        } else {
77          tbs = waveServer.getTraceBufs(scn.station, scn.channel, scn.network, Time.j2kToEw(ct - 5),
78              Time.j2kToEw(t2 + 5));
79          ct = t2;
80        }
81        if (tbs != null && tbs.size() > 0) {
82          for (final Object o : tbs) {
83            final TraceBuf tb = (TraceBuf) o;
84            tb.createBytes();
85            input.inputTraceBuf(tb, false);
86          }
87        }
88        final long te = System.currentTimeMillis();
89        System.out.println("Chunk: " + ((double) (te - ts) / 1000) + "s");
90      }
91      waveServer.close();
92    }
93  
94    public void fillGaps() {
95      final Menu menu = waveServer.getMenu();
96      final Iterator<SCN> it = channels.iterator();
97      while (it.hasNext()) {
98        final SCN scn = it.next();
99        final String code = scn.toString().replace('_', '$');
100       System.out.println("[" + name + "/" + code + "]: ");
101 
102       final double now = Ew.now();
103 
104       final MenuItem mi = menu.getItem(scn);
105       if (mi == null)
106         continue;
107 
108       final List<double[]> gaps = data.findGaps(code, Time.ewToj2k(mi.startTime), now);
109       final double[] span = data.getTimeSpan(code);
110       final double fdt = span[0];
111       if (fdt > Time.ewToj2k(mi.startTime)) {
112         gaps.add(new double[] {Time.ewToj2k(mi.startTime), fdt});
113       }
114       if (gaps != null) {
115         final Iterator<double[]> it2 = gaps.iterator();
116         while (it2.hasNext()) {
117           final double[] gap = it2.next();
118           System.out.println((gap[1] - gap[0]) + "s, " + gap[0] + " -> " + gap[1]);
119           fillGap(scn, gap[0] - 5, gap[1] + 5);
120         }
121       }
122     }
123   }
124 
125   public void collect() {
126     final Iterator<SCN> it = channels.iterator();
127     while (it.hasNext()) {
128       final long ts = System.currentTimeMillis();
129       final SCN scn = it.next();
130       final String code = scn.toString().replace('_', '$');
131       System.out.print("[" + name + "/" + code + "]: ");
132 
133       boolean stationOk = true;
134       if (!winstonStations.channelExists(code)) {
135         System.out.print("creating new station in Winston; ");
136         stationOk = winstonStations.createChannel(code);
137       }
138       if (!stationOk)
139         continue;
140 
141       final double[] span = data.getTimeSpan(code);
142       final double now = Ew.now();
143       double last = Time.j2kToEw(span[1]);
144       if (last == -1E300)
145         last = now - 10 * 60;
146 
147       final List<TraceBuf> tbs =
148           waveServer.getTraceBufs(scn.station, scn.channel, scn.network, last, now);
149       if (tbs == null || tbs.size() == 0) {
150         System.out.print("wave server returned no data; ");
151       } else {
152         for (final Object o : tbs) {
153           final TraceBuf tb = (TraceBuf) o;
154           tb.createBytes();
155           input.inputTraceBuf(tb, false);
156         }
157       }
158 
159       final long te = System.currentTimeMillis();
160 
161       System.out.println("done. " + ((double) (te - ts) / 1000) + "s");
162     }
163   }
164 
165   @Override
166   public void run() {
167     try {
168       Thread.sleep(delay * 1000);
169     } catch (final Exception e) {
170     }
171     if (mode == COLLECT) {
172       while (true) {
173         try {
174           collect();
175           Thread.sleep(interval * 1000);
176         } catch (final Exception e) {
177           e.printStackTrace();
178         }
179       }
180     } else {
181       fillGaps();
182     }
183   }
184 }