StormWindowDemo

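关于 Storm Window 原理以及用法的 Demo：BaseWindowTopology 演示基于计数（Count）的滑动窗口与滚动窗口；TimestampWindowTopology 演示基于事件时间戳的滑动窗口，以及水印（watermark）和最大延迟（lag）对“迟到” tuple 的处理。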

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.topology.base.BaseWindowedBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
import org.apache.storm.windowing.TupleWindow;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import static org.apache.storm.topology.base.BaseWindowedBolt.Count;
public class BaseWindowTopology {
private static long startTime;
static String relativeTime() {
return (System.currentTimeMillis() - startTime) / 1000.0 + "s ";
}
public static void main(String[] args) throws Exception {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("integer", new RegularIntegerSpout(), 1);
builder.setBolt("peek", new PeekBolt(), 1)
.shuffleGrouping("integer");
builder.setBolt("slidingsum", new SlidingWindowSumBolt().withWindow(new Count(30), new Count(10)), 1)
.shuffleGrouping("peek");
builder.setBolt("tumblingavg", new TumblingWindowAvgBolt().withTumblingWindow(new Count(3)), 1)
.shuffleGrouping("slidingsum");
builder.setBolt("printer", new PrinterBolt(), 1).shuffleGrouping("tumblingavg");
Config conf = new Config();
// conf.setDebug(true);
if (args != null && args.length > 0) {
conf.setNumWorkers(1);
StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
} else {
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test", conf, builder.createTopology());
// Utils.sleep(100000);
//cluster.killTopology("test");
// cluster.shutdown();
//觉得打印结果能够说明问题时,自己手动停止程序
}
}
private static class RegularIntegerSpout extends BaseRichSpout {
private SpoutOutputCollector collector;
private int id = 0;
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("value"));
}
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.collector = collector;
}
@Override
public void nextTuple() {
Utils.sleep(1000);
collector.emit(new Values(++id));//id作为value发送
}
@Override
public void ack(Object msgId) {
}
@Override
public void fail(Object msgId) {
}
}
private static class PrinterBolt extends BaseBasicBolt {//打印最后的结果
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
System.out.println("fields: " + tuple.getFields().toList() + " values: " + tuple.getValues());
}
@Override
public void declareOutputFields(OutputFieldsDeclarer ofd) {
}
}
private static class PeekBolt extends BaseBasicBolt {//查看从Spout中发出的tuple
static boolean begin = false;
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
System.out.println("Spout: fields: " + tuple.getFields().toList() + " values: " + tuple.getValues());
collector.emit(tuple.getValues());
if (!begin) {
startTime = System.currentTimeMillis();
begin = true;
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer ofd) {
ofd.declare(new Fields("value"));
}
}
private static class SlidingWindowSumBolt extends BaseWindowedBolt {
private int sum = 0;
private OutputCollector collector;
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
@Override
public void execute(TupleWindow inputWindow) {
/*
* The inputWindow gives a view of
* (a) all the events in the window
* (b) events that expired since last activation of the window
* (c) events that newly arrived since last activation of the window
*/
List<Tuple> tuplesInWindow = inputWindow.get();
List<Tuple> newTuples = inputWindow.getNew();
List<Tuple> expiredTuples = inputWindow.getExpired();
/*
* Instead of iterating over all the tuples in the window to compute
* the sum, the values for the new events are added and old events are
* subtracted. Similar optimizations might be possible in other
* windowing computations.
*/
System.out.println("+++++++++++++++++++++++++++++++++++++++++++++++++");
System.out.println("Time: " + relativeTime() + "in SlidingWindowSumBolt");
System.out.println("tuplesInWindow: " + tupleToString(tuplesInWindow));
System.out.println("newTuples: " + tupleToString(newTuples));
System.out.println("expiredTuples: " + tupleToString(expiredTuples));
System.out.println("+++++++++++++++++++++++++++++++++++++++++++++++++");
for (Tuple tuple : newTuples) {
sum += (int) tuple.getValue(0);
}
for (Tuple tuple : expiredTuples) {
sum -= (int) tuple.getValue(0);
}
collector.emit(new Values(sum));
}
static List<String> tupleToString(List<Tuple> tuples) {
List<String> ret = new ArrayList<>();
for (Tuple t : tuples) {
ret.add(t.getValues().toString());
}
return ret;
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("sum"));
}
}
/*
*Computes tumbling window average
*/
private static class TumblingWindowAvgBolt extends BaseWindowedBolt {
private OutputCollector collector;
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
@Override
public void execute(TupleWindow inputWindow) {
int sum = 0;
List<Tuple> tuplesInWindow = inputWindow.get();
if (tuplesInWindow.size() > 0) {
/*
* Since this is a tumbling window calculation,
* we use all the tuples in the window to compute the avg.
*/
System.out.println("+++++++++++++++++++++++++++++++++++++++++++++++++");
System.out.println("Time: " + relativeTime() + "in TumblingWindowAvgBolt");
for (Tuple tuple : tuplesInWindow) {
System.out.println(" values: " + tuple.getValues());
sum += (int) tuple.getValue(0);
}
System.out.println("+++++++++++++++++++++++++++++++++++++++++++++++++");
collector.emit(new Values(sum / tuplesInWindow.size()));
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("avg"));
}
}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.topology.base.BaseWindowedBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
import org.apache.storm.windowing.TupleWindow;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.TimeUnit;
/**
 * Demonstrates Storm's timestamp (event-time) sliding windows with watermarks and a maximum lag.
 *
 * <p>Pipeline: {@code timestamp -> peek -> slidingsum}. The spout generates synthetic event
 * timestamps. Once per pass through a fixed list of seconds, one tuple is deliberately made
 * "late", so the effect of the watermark and lag settings can be observed in the printed windows.
 */
public class TimestampWindowTopology {
    /**
     * Wall-clock millis at which the first tuple reached PeekBolt. It is written by the PeekBolt
     * executor thread and read by the window bolt's executor thread, so it is volatile. It is only
     * meaningful when all components share one JVM: local mode, or the single worker set in main.
     */
    private static volatile long startTime;

    /**
     * "HH:mm:ss" formatter, one instance per thread. SimpleDateFormat is not thread-safe, and
     * formatting happens on the spout's thread as well as on both bolts' threads.
     */
    private static final ThreadLocal<SimpleDateFormat> df = new ThreadLocal<SimpleDateFormat>() {
        @Override
        protected SimpleDateFormat initialValue() {
            return new SimpleDateFormat("HH:mm:ss");
        }
    };

    /** Formats epoch millis as local "HH:mm:ss" using this thread's formatter. */
    private static String formatTime(long millis) {
        return df.get().format(new Date(millis));
    }

    /**
     * Builds and submits the topology. With an argument, submits it to the cluster under that
     * name; otherwise runs it in a {@link LocalCluster} that is never shut down.
     */
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("timestamp", new TimestampSpout(), 1);
        builder.setBolt("peek", new PeekBolt(), 1)
                .shuffleGrouping("timestamp");
        // Window bolt settings: window length = 20s, sliding interval = 10s,
        // watermark emit frequency = 1s, max lag = 5s.
        BaseWindowedBolt windowBolt = new WatchSlidingWindowBolt()
                .withWindow(new BaseWindowedBolt.Duration(20, TimeUnit.SECONDS), new BaseWindowedBolt.Duration(10, TimeUnit.SECONDS))
                .withTimestampField("timestamp")
                .withWatermarkInterval(new BaseWindowedBolt.Duration(1, TimeUnit.SECONDS))
                .withLag(new BaseWindowedBolt.Duration(5, TimeUnit.SECONDS));
        builder.setBolt("slidingsum", windowBolt, 1)
                .shuffleGrouping("peek");
        Config conf = new Config();
        if (args != null && args.length > 0) {
            conf.setNumWorkers(1);
            StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
        } else {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("test", conf, builder.createTopology());
            // Deliberately never killed/shut down: stop the program manually once the printed
            // output has illustrated the windowing behaviour.
        }
    }

    /** Returns the seconds elapsed since {@link #startTime}, formatted like {@code "12.345s "}. */
    private static String relativeTime() {
        return (System.currentTimeMillis() - startTime) / 1000.0 + "s ";
    }

    /**
     * Emits (timestamp, id, value) tuples about 10 times per second, without message ids.
     *
     * <p>Timestamps are {@code localMidnight + hour + second}. The second cycles through
     * {@link #SECONDS}, and the hour advances by one on every pass and is never wrapped, so event
     * time only moves forward. The only exception is the deliberately late tuple in the middle of
     * each pass after the first, which is shifted one hour back.
     */
    private static class TimestampSpout extends BaseRichSpout {
        /** Custom second-of-hour values cycled through for the generated timestamps. */
        private static final int[] SECONDS = new int[]{4, 5, 7, 18, 26, 34, 35, 47, 48, 56};

        private SpoutOutputCollector collector;
        private int id = 0;
        /** Hours after {@link #baseMillis} for the current pass; starts at 1 on the first tuple. */
        private int hour = 0;
        /** Local midnight of the day the spout was opened, in epoch millis. */
        private long baseMillis;

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("timestamp", "id", "value"));
        }

        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.collector = collector;
            Calendar midnight = Calendar.getInstance();
            midnight.set(Calendar.HOUR_OF_DAY, 0);
            midnight.set(Calendar.MINUTE, 0);
            midnight.set(Calendar.SECOND, 0);
            midnight.set(Calendar.MILLISECOND, 0);
            baseMillis = midnight.getTimeInMillis();
        }

        @Override
        public void nextTuple() {
            Utils.sleep(100); // roughly 10 tuples per second
            int loopIndex = id % SECONDS.length;
            if (loopIndex == 0) {
                // Each completed pass through SECONDS advances event time by one hour. The hour is
                // not wrapped at 24: wrapping would move event time ~23h backwards and make every
                // later tuple fall behind the watermark.
                hour++;
            }
            // The middle tuple of each pass is deliberately shifted one hour back to make it
            // "late". The first pass is excluded (id > SECONDS.length) because no earlier
            // watermark exists yet for it to be late against.
            boolean late = loopIndex == SECONDS.length / 2 && id > SECONDS.length;
            long timestamp = getTimestamp(late ? hour - 1 : hour, SECONDS[loopIndex]);
            collector.emit(new Values(timestamp, id++, formatTime(timestamp)));
        }

        /** Returns epoch millis for {@code hour} hours and {@code second} seconds after local midnight. */
        private long getTimestamp(int hour, int second) {
            return baseMillis + TimeUnit.HOURS.toMillis(hour) + TimeUnit.SECONDS.toMillis(second);
        }

        @Override
        public void ack(Object msgId) {
            // Tuples are emitted without a message id (unreliable), so there is nothing to track.
        }

        @Override
        public void fail(Object msgId) {
            // See ack(): no replay for unreliable tuples.
        }
    }

    /**
     * Pass-through bolt that prints every spout tuple. It also records {@link #startTime} when the
     * first tuple arrives.
     */
    private static class PeekBolt extends BaseBasicBolt {
        // Only touched by this bolt's single task (parallelism 1).
        static boolean begin = false;

        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            // Record the start time before forwarding anything downstream.
            if (!begin) {
                startTime = System.currentTimeMillis();
                begin = true;
            }
            System.out.println("Spout: fields: " + tuple.getFields().toList() + " values: " + tupleToString(tuple));
            collector.emit(tuple.getValues());
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer ofd) {
            ofd.declare(new Fields("timestamp", "id", "value"));
        }
    }

    /** Window bolt that only prints the current, new and expired tuples of each window. */
    private static class WatchSlidingWindowBolt extends BaseWindowedBolt {
        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        }

        @Override
        public void execute(TupleWindow inputWindow) {
            List<Tuple> tuplesInWindow = inputWindow.get();
            List<Tuple> newTuples = inputWindow.getNew();
            List<Tuple> expiredTuples = inputWindow.getExpired();
            System.out.println("+++++++++++++++++++++++++++++++++++++++++++++++++");
            System.out.println("Time: " + relativeTime() + "in WatchSlidingWindowBolt");
            System.out.println("tuplesInWindow: " + tuplesToString(tuplesInWindow));
            System.out.println("newTuples: " + tuplesToString(newTuples));
            System.out.println("expiredTuples: " + tuplesToString(expiredTuples));
            System.out.println("+++++++++++++++++++++++++++++++++++++++++++++++++");
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
        }
    }

    /** Renders each tuple with {@link #tupleToString(Tuple)} and returns the list's toString(). */
    private static String tuplesToString(List<Tuple> tuples) {
        List<String> ret = new ArrayList<>(tuples.size());
        for (Tuple t : tuples) {
            ret.add(tupleToString(t));
        }
        return ret.toString();
    }

    /**
     * Renders a (timestamp, id, value) tuple. Field 0 is read as epoch millis and shown as
     * HH:mm:ss; field 1 as an Integer; field 2 as a String.
     */
    private static String tupleToString(Tuple t) {
        String timestamp = formatTime(t.getLong(0));
        int id = t.getInteger(1);
        return "[timestamp: " + timestamp + ", id: " + id + ",value: " + t.getString(2) + "]";
    }
}