StormWindowDemo 发表于 2017-06-20 | 分类于 大数据 | 阅读次数 关于Storm Window原理以及用法的Demo123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216import org.apache.storm.Config;import org.apache.storm.LocalCluster;import org.apache.storm.StormSubmitter;import org.apache.storm.spout.SpoutOutputCollector;import org.apache.storm.task.OutputCollector;import org.apache.storm.task.TopologyContext;import org.apache.storm.topology.BasicOutputCollector;import org.apache.storm.topology.OutputFieldsDeclarer;import org.apache.storm.topology.TopologyBuilder;import org.apache.storm.topology.base.BaseBasicBolt;import org.apache.storm.topology.base.BaseRichSpout;import org.apache.storm.topology.base.BaseWindowedBolt;import org.apache.storm.tuple.Fields;import org.apache.storm.tuple.Tuple;import org.apache.storm.tuple.Values;import org.apache.storm.utils.Utils;import org.apache.storm.windowing.TupleWindow;import java.util.ArrayList;import java.util.List;import java.util.Map;import static org.apache.storm.topology.base.BaseWindowedBolt.Count;public class BaseWindowTopology { private static long startTime; static String relativeTime() { return (System.currentTimeMillis() - startTime) / 1000.0 + "s "; } public static void main(String[] args) throws Exception { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("integer", new RegularIntegerSpout(), 1); builder.setBolt("peek", new PeekBolt(), 1) .shuffleGrouping("integer"); builder.setBolt("slidingsum", new SlidingWindowSumBolt().withWindow(new Count(30), new Count(10)), 1) .shuffleGrouping("peek"); builder.setBolt("tumblingavg", new TumblingWindowAvgBolt().withTumblingWindow(new Count(3)), 1) .shuffleGrouping("slidingsum"); builder.setBolt("printer", new PrinterBolt(), 1).shuffleGrouping("tumblingavg"); Config conf = new Config(); // conf.setDebug(true); if (args != null && args.length > 0) { conf.setNumWorkers(1); StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology()); } else { LocalCluster cluster = new LocalCluster(); cluster.submitTopology("test", conf, builder.createTopology()); // Utils.sleep(100000); //cluster.killTopology("test"); // cluster.shutdown(); //觉得打印结果能够说明问题时,自己手动停止程序 } } private static class RegularIntegerSpout extends BaseRichSpout { private SpoutOutputCollector collector; private int id = 0; @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("value")); } @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.collector = collector; } @Override public void nextTuple() { Utils.sleep(1000); collector.emit(new Values(++id));//id作为value发送 } @Override public void ack(Object msgId) { } @Override public void fail(Object msgId) { } } private static class PrinterBolt extends BaseBasicBolt {//打印最后的结果 @Override public void execute(Tuple tuple, BasicOutputCollector collector) { System.out.println("fields: " + tuple.getFields().toList() + " values: " + tuple.getValues()); } @Override public void declareOutputFields(OutputFieldsDeclarer ofd) { } } private static class PeekBolt extends BaseBasicBolt {//查看从Spout中发出的tuple static boolean begin = false; @Override public void execute(Tuple tuple, BasicOutputCollector collector) { System.out.println("Spout: fields: " + tuple.getFields().toList() + " values: " + tuple.getValues()); collector.emit(tuple.getValues()); if (!begin) { startTime = System.currentTimeMillis(); begin = true; } } @Override public void declareOutputFields(OutputFieldsDeclarer ofd) { ofd.declare(new Fields("value")); } } private static class SlidingWindowSumBolt extends BaseWindowedBolt { private int sum = 0; private OutputCollector collector; @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; } @Override public void execute(TupleWindow inputWindow) { /* * The inputWindow gives a view of * (a) all the events in the window * (b) events that expired since last activation of the window * (c) events that newly arrived since last activation of the window */ List<Tuple> tuplesInWindow = inputWindow.get(); List<Tuple> newTuples = inputWindow.getNew(); List<Tuple> expiredTuples = inputWindow.getExpired(); /* * Instead of iterating over all the tuples in the window to compute * the sum, the values for the new events are added and old events are * subtracted. Similar optimizations might be possible in other * windowing computations. */ System.out.println("+++++++++++++++++++++++++++++++++++++++++++++++++"); System.out.println("Time: " + relativeTime() + "in SlidingWindowSumBolt"); System.out.println("tuplesInWindow: " + tupleToString(tuplesInWindow)); System.out.println("newTuples: " + tupleToString(newTuples)); System.out.println("expiredTuples: " + tupleToString(expiredTuples)); System.out.println("+++++++++++++++++++++++++++++++++++++++++++++++++"); for (Tuple tuple : newTuples) { sum += (int) tuple.getValue(0); } for (Tuple tuple : expiredTuples) { sum -= (int) tuple.getValue(0); } collector.emit(new Values(sum)); } static List<String> tupleToString(List<Tuple> tuples) { List<String> ret = new ArrayList<>(); for (Tuple t : tuples) { ret.add(t.getValues().toString()); } return ret; } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("sum")); } }/* *Computes tumbling window average */ private static class TumblingWindowAvgBolt extends BaseWindowedBolt { private OutputCollector collector; @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; } @Override public void execute(TupleWindow inputWindow) { int sum = 0; List<Tuple> tuplesInWindow = inputWindow.get(); if (tuplesInWindow.size() > 0) { /* * Since this is a tumbling window calculation, * we use all the tuples in the window to compute the avg. */ System.out.println("+++++++++++++++++++++++++++++++++++++++++++++++++"); System.out.println("Time: " + relativeTime() + "in TumblingWindowAvgBolt"); for (Tuple tuple : tuplesInWindow) { System.out.println(" values: " + tuple.getValues()); sum += (int) tuple.getValue(0); } System.out.println("+++++++++++++++++++++++++++++++++++++++++++++++++"); collector.emit(new Values(sum / tuplesInWindow.size())); } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("avg")); } }} 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187import org.apache.storm.Config;import org.apache.storm.LocalCluster;import org.apache.storm.StormSubmitter;import org.apache.storm.spout.SpoutOutputCollector;import org.apache.storm.task.OutputCollector;import org.apache.storm.task.TopologyContext;import org.apache.storm.topology.BasicOutputCollector;import org.apache.storm.topology.OutputFieldsDeclarer;import org.apache.storm.topology.TopologyBuilder;import org.apache.storm.topology.base.BaseBasicBolt;import org.apache.storm.topology.base.BaseRichSpout;import org.apache.storm.topology.base.BaseWindowedBolt;import org.apache.storm.tuple.Fields;import org.apache.storm.tuple.Tuple;import org.apache.storm.tuple.Values;import org.apache.storm.utils.Utils;import org.apache.storm.windowing.TupleWindow;import java.text.SimpleDateFormat;import java.util.*;import java.util.concurrent.TimeUnit;public class TimestampWindowTopology { private static long startTime; private static SimpleDateFormat df = new SimpleDateFormat("HH:mm:ss"); public static void main(String[] args) throws Exception { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("timestamp", new TimestampSpout(), 1); builder.setBolt("peek", new PeekBolt(), 1) .shuffleGrouping("timestamp"); //WindowBolt参数:Window length = 20s, sliding interval = 10s, watermark emit frequency = 1s, max lag = 5s BaseWindowedBolt windowBolt = new WatchSlidingWindowBolt() .withWindow(new BaseWindowedBolt.Duration(20, TimeUnit.SECONDS), new BaseWindowedBolt.Duration(10, TimeUnit.SECONDS)) .withTimestampField("timestamp") .withWatermarkInterval(new BaseWindowedBolt.Duration(1, TimeUnit.SECONDS)) .withLag(new BaseWindowedBolt.Duration(5, TimeUnit.SECONDS)); builder.setBolt("slidingsum", windowBolt, 1) .shuffleGrouping("peek"); Config conf = new Config(); if (args != null && args.length > 0) { conf.setNumWorkers(1); StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology()); } else { LocalCluster cluster = new LocalCluster(); cluster.submitTopology("test", conf, builder.createTopology()); // Utils.sleep(100000); //cluster.killTopology("test"); // cluster.shutdown(); // 觉得打印结果能够说明问题时,自己手动停止程序 } } private static String relativeTime() { return (System.currentTimeMillis() - startTime) / 1000.0 + "s "; } private static class TimestampSpout extends BaseRichSpout { private SpoutOutputCollector collector; private int id = 0; private long timestamp; private int hour=0; private boolean needToChange=false;//标记是否该修改当前小时 //自定义时间戳的秒 private int[] seconds = new int[]{4, 5, 7, 18, 26, 34, 35, 47, 48, 56 }; @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("timestamp", "id", "value")); } @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.collector = collector; } @Override public void nextTuple() { Utils.sleep(100);//一秒钟10个tuple int loopIndex=id % seconds.length; int sec = seconds[loopIndex];//秒循环用自定义数组的值 if(loopIndex==0){ hour=(hour+1)%24;//每当秒用完一次循环,小时加一 } if(loopIndex==seconds.length/2&&id>seconds.length){ //每一秒发射的中间一个tuple故意设置为“迟到的”,但是第一次产生水印时还没有旧的水印,故加上id>seconds.length排除第一次的“迟到”tuple needToChange=true; hour--;//将时间戳提前1小时,来产生“迟到的”tuple } timestamp = getTimestamp(hour, sec); Date date = new Date(timestamp); String value = df.format(date); collector.emit(new Values(timestamp, id++, value)); if(needToChange){//发射完之后将小时改回来 needToChange=false; hour++; } } private long getTimestamp(int hour, int second) { Calendar now = Calendar.getInstance(); now.set(Calendar.HOUR_OF_DAY, hour); now.set(Calendar.MINUTE, 0); now.set(Calendar.SECOND, second); return now.getTimeInMillis(); } @Override public void ack(Object msgId) { } @Override public void fail(Object msgId) { } } private static class PeekBolt extends BaseBasicBolt { static boolean begin = false; @Override public void execute(Tuple tuple, BasicOutputCollector collector) { System.out.println("Spout: fields: " + tuple.getFields().toList() + " values: " + tupleToString(tuple)); collector.emit(tuple.getValues()); if (!begin) { startTime = System.currentTimeMillis(); begin = true; } } @Override public void declareOutputFields(OutputFieldsDeclarer ofd) { ofd.declare(new Fields("timestamp", "id", "value")); } } private static class WatchSlidingWindowBolt extends BaseWindowedBolt { @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { } @Override public void execute(TupleWindow inputWindow) { List<Tuple> tuplesInWindow = inputWindow.get(); List<Tuple> newTuples = inputWindow.getNew(); List<Tuple> expiredTuples = inputWindow.getExpired(); System.out.println("+++++++++++++++++++++++++++++++++++++++++++++++++"); System.out.println("Time: " + relativeTime() + "in WatchSlidingWindowBolt"); System.out.println("tuplesInWindow: " + tuplesToString(tuplesInWindow)); System.out.println("newTuples: " + tuplesToString(newTuples)); System.out.println("expiredTuples: " + tuplesToString(expiredTuples)); System.out.println("+++++++++++++++++++++++++++++++++++++++++++++++++"); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { } } private static String tuplesToString(List<Tuple> tuples) { List<String> ret = new ArrayList<>(); for (Tuple t : tuples) { ret.add(tupleToString(t)); } return ret.toString(); } private static String tupleToString(Tuple t) { Date date = new Date(t.getLong(0)); String timestamp = df.format(date); int id = t.getInteger(1); String ret = "[timestamp: " + timestamp + ", id: " + id + ",value: " + t.getString(2) + "]"; return ret; }}