1 package gov.usgs.volcanoes.winston.in;
2
3 import java.util.ArrayList;
4 import java.util.Iterator;
5 import java.util.List;
6
7 import gov.usgs.earthworm.Menu;
8 import gov.usgs.earthworm.MenuItem;
9 import gov.usgs.earthworm.SCN;
10 import gov.usgs.earthworm.WaveServer;
11 import gov.usgs.earthworm.message.TraceBuf;
12 import gov.usgs.volcanoes.core.time.Ew;
13 import gov.usgs.volcanoes.core.time.Time;
14 import gov.usgs.volcanoes.winston.db.Channels;
15 import gov.usgs.volcanoes.winston.db.Data;
16 import gov.usgs.volcanoes.winston.db.Input;
17 import gov.usgs.volcanoes.winston.db.WinstonDatabase;
18
19
20
21
22
23 public class WaveServerCollector extends Thread {
24 public static final int COLLECT = 1;
25 public static final int FILL_GAPS = 2;
26 private final int mode;
27 private final WinstonDatabase winston;
28 private final Channels winstonStations;
29 private final Input input;
30 private final Data data;
31 private final int interval;
32 private final int maxSize;
33 private final int delay;
34
35 private final List<SCN> channels;
36 private final String name;
37 private final WaveServer waveServer;
38
39 public WaveServerCollector(final String n, final WinstonDatabase w, final WaveServer ws,
40 final int i, final int m, final int d, final int md) {
41 name = n;
42 winston = w;
43 input = new Input(winston);
44 winstonStations = new Channels(winston);
45 data = new Data(winston);
46 interval = i;
47 maxSize = m;
48 delay = d;
49 mode = md;
50 waveServer = ws;
51 channels = new ArrayList<SCN>();
52 }
53
54 public void startCollecting() {
55 start();
56 }
57
58 public void stopCollecting() {
59
60 }
61
62 public void addStation(final SCN ci) {
63 channels.add(ci);
64 }
65
66 public void fillGap(final SCN scn, final double t1, final double t2) {
67 double ct = t1;
68 waveServer.connect();
69 while (ct < t2) {
70 final long ts = System.currentTimeMillis();
71 List<TraceBuf> tbs = null;
72 if (t2 - ct > maxSize) {
73 tbs = waveServer.getTraceBufs(scn.station, scn.channel, scn.network, Time.j2kToEw(ct - 5),
74 Time.j2kToEw(ct + maxSize + 5));
75 ct += maxSize;
76 } else {
77 tbs = waveServer.getTraceBufs(scn.station, scn.channel, scn.network, Time.j2kToEw(ct - 5),
78 Time.j2kToEw(t2 + 5));
79 ct = t2;
80 }
81 if (tbs != null && tbs.size() > 0) {
82 for (final Object o : tbs) {
83 final TraceBuf tb = (TraceBuf) o;
84 tb.createBytes();
85 input.inputTraceBuf(tb, false);
86 }
87 }
88 final long te = System.currentTimeMillis();
89 System.out.println("Chunk: " + ((double) (te - ts) / 1000) + "s");
90 }
91 waveServer.close();
92 }
93
94 public void fillGaps() {
95 final Menu menu = waveServer.getMenu();
96 final Iterator<SCN> it = channels.iterator();
97 while (it.hasNext()) {
98 final SCN scn = it.next();
99 final String code = scn.toString().replace('_', '$');
100 System.out.println("[" + name + "/" + code + "]: ");
101
102 final double now = Ew.now();
103
104 final MenuItem mi = menu.getItem(scn);
105 if (mi == null)
106 continue;
107
108 final List<double[]> gaps = data.findGaps(code, Time.ewToj2k(mi.startTime), now);
109 final double[] span = data.getTimeSpan(code);
110 final double fdt = span[0];
111 if (fdt > Time.ewToj2k(mi.startTime)) {
112 gaps.add(new double[] {Time.ewToj2k(mi.startTime), fdt});
113 }
114 if (gaps != null) {
115 final Iterator<double[]> it2 = gaps.iterator();
116 while (it2.hasNext()) {
117 final double[] gap = it2.next();
118 System.out.println((gap[1] - gap[0]) + "s, " + gap[0] + " -> " + gap[1]);
119 fillGap(scn, gap[0] - 5, gap[1] + 5);
120 }
121 }
122 }
123 }
124
125 public void collect() {
126 final Iterator<SCN> it = channels.iterator();
127 while (it.hasNext()) {
128 final long ts = System.currentTimeMillis();
129 final SCN scn = it.next();
130 final String code = scn.toString().replace('_', '$');
131 System.out.print("[" + name + "/" + code + "]: ");
132
133 boolean stationOk = true;
134 if (!winstonStations.channelExists(code)) {
135 System.out.print("creating new station in Winston; ");
136 stationOk = winstonStations.createChannel(code);
137 }
138 if (!stationOk)
139 continue;
140
141 final double[] span = data.getTimeSpan(code);
142 final double now = Ew.now();
143 double last = Time.j2kToEw(span[1]);
144 if (last == -1E300)
145 last = now - 10 * 60;
146
147 final List<TraceBuf> tbs =
148 waveServer.getTraceBufs(scn.station, scn.channel, scn.network, last, now);
149 if (tbs == null || tbs.size() == 0) {
150 System.out.print("wave server returned no data; ");
151 } else {
152 for (final Object o : tbs) {
153 final TraceBuf tb = (TraceBuf) o;
154 tb.createBytes();
155 input.inputTraceBuf(tb, false);
156 }
157 }
158
159 final long te = System.currentTimeMillis();
160
161 System.out.println("done. " + ((double) (te - ts) / 1000) + "s");
162 }
163 }
164
165 @Override
166 public void run() {
167 try {
168 Thread.sleep(delay * 1000);
169 } catch (final Exception e) {
170 }
171 if (mode == COLLECT) {
172 while (true) {
173 try {
174 collect();
175 Thread.sleep(interval * 1000);
176 } catch (final Exception e) {
177 e.printStackTrace();
178 }
179 }
180 } else {
181 fillGaps();
182 }
183 }
184 }