Real-Time Word Count Case Study
1) Requirement
Count, in real time, the total occurrences of each word emitted into the Storm framework.
2) Analysis
Design a topology that counts the frequency of each word appearing in the text.
The topology consists of three parts:
(1) WordCountSpout: the data source; it randomly emits one sentence from a known set of English sentences.
(2) WordCountSplitBolt: splits each line of text (a sentence) into individual words.
(3) WordCountBolt: accumulates the running count for each word.
3) Implementation
(1) Create the spout
package com.atguigu.storm.wordcount;

import java.util.Map;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

public class WordCountSpout extends BaseRichSpout {

    private static final long serialVersionUID = 1L;

    private SpoutOutputCollector collector;

    @Override
    public void nextTuple() {
        // 1 Emit a mock sentence
        collector.emit(new Values("i am ximen love jinlian"));

        // 2 Sleep for 2 seconds
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @SuppressWarnings("rawtypes")
    @Override
    public void open(Map arg0, TopologyContext arg1, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("love"));
    }
}
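The analysis above describes the spout as emitting a random sentence from a known set, but the code always emits the same fixed sentence. A minimal sketch of the random-selection logic, extracted out of the Storm spout so it stands alone (the sentence pool is hypothetical, added only for illustration):

```java
import java.util.Random;

public class SentencePicker {
    // Hypothetical sentence pool; replace with your own test data
    private static final String[] SENTENCES = {
        "i am ximen love jinlian",
        "the cow jumped over the moon",
        "an apple a day keeps the doctor away"
    };

    private final Random random = new Random();

    // Pick one sentence at random; inside the spout this result would be
    // handed to collector.emit(new Values(sentence)) in nextTuple()
    public String nextSentence() {
        return SENTENCES[random.nextInt(SENTENCES.length)];
    }
}
```

In the spout, `nextTuple()` would then emit `nextSentence()` instead of the hard-coded string.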
(2) Create the bolt that splits sentences into words
package com.atguigu.storm.wordcount;

import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class WordCountSplitBolt extends BaseRichBolt {

    private static final long serialVersionUID = 1L;

    private OutputCollector collector;

    @Override
    public void execute(Tuple input) {
        // 1 Get the incoming line of data
        String line = input.getString(0);

        // 2 Split it into words
        String[] arrWords = line.split(" ");

        // 3 Emit each word with an initial count of 1
        for (String word : arrWords) {
            collector.emit(new Values(word, 1));
        }
    }

    @SuppressWarnings("rawtypes")
    @Override
    public void prepare(Map arg0, TopologyContext arg1, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "num"));
    }
}
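One caveat with splitting on a single space: consecutive spaces produce empty tokens, which would then be counted as "words". Splitting on the regex `\s+` is a common defensive alternative; this is general `String.split` behavior in Java, not something the original bolt does:

```java
public class SplitDemo {
    // split(" ") keeps empty tokens between consecutive spaces;
    // split("\\s+") collapses any run of whitespace into one delimiter
    public static int countTokens(String line, String regex) {
        return line.split(regex).length;
    }
}
```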
(3) Create the bolt that aggregates word counts
package com.atguigu.storm.wordcount;

import java.util.HashMap;
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

public class WordCountBolt extends BaseRichBolt {

    private static final long serialVersionUID = 1L;

    private Map<String, Integer> map = new HashMap<String, Integer>();

    @Override
    public void execute(Tuple input) {
        // 1 Get the incoming word and count
        String word = input.getString(0);
        Integer num = input.getInteger(1);

        // 2 Accumulate the count for this word
        if (map.containsKey(word)) {
            Integer count = map.get(word);
            map.put(word, count + num);
        } else {
            map.put(word, num);
        }

        System.err.println(Thread.currentThread().getId() + " word:" + word + " num:" + map.get(word));
    }

    @SuppressWarnings("rawtypes")
    @Override
    public void prepare(Map arg0, TopologyContext arg1, OutputCollector collector) {
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer arg0) {
        // No output
    }
}
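The containsKey/put branch above can also be written with `Map.merge` (Java 8+), which performs the same lookup-and-add in one call. A standalone sketch of just the counting step:

```java
import java.util.HashMap;
import java.util.Map;

public class CountDemo {
    private final Map<String, Integer> map = new HashMap<>();

    // Equivalent to the if/else accumulation in WordCountBolt.execute:
    // insert num if the word is absent, otherwise add num to the old value
    public int add(String word, int num) {
        map.merge(word, num, Integer::sum);
        return map.get(word);
    }
}
```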
(4) Create the topology main program
package com.atguigu.storm.wordcount;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

public class WordCountMain {

    public static void main(String[] args) {
        // 1 Prepare a TopologyBuilder
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("WordCountSpout", new WordCountSpout(), 1);
        builder.setBolt("WordCountSplitBolt", new WordCountSplitBolt(), 2).shuffleGrouping("WordCountSpout");
        builder.setBolt("WordCountBolt", new WordCountBolt(), 4).fieldsGrouping("WordCountSplitBolt", new Fields("word"));

        // 2 Create a Config to specify the number of workers this topology needs
        Config conf = new Config();
        conf.setNumWorkers(2);

        // 3 Submit the topology in one of two modes: local or cluster
        if (args.length > 0) {
            try {
                // 4 Cluster (distributed) submission
                StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
            } catch (Exception e) {
                e.printStackTrace();
            }
        } else {
            // 5 Local-mode submission
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology("wordcounttopology", conf, builder.createTopology());
        }
    }
}
(5) Test
Observe that thread 132 only processes the words am and love, while thread 169 processes the words i, ximen, and jinlian. This is the benefit of fields grouping: tuples with the same value of the grouping field always go to the same task.
132 word:am num:1
132 word:love num:1
169 word:i num:1
169 word:ximen num:1
169 word:jinlian num:1
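The reason each word sticks to one thread is how fields grouping chooses the target task: a deterministic hash of the grouping field, so equal values always route to the same task. A simplified, hypothetical model of that routing (Storm's actual hash function differs; the determinism is the point being illustrated):

```java
public class GroupingDemo {
    // Deterministically map a word to one of numTasks tasks.
    // floorMod keeps the index non-negative even for negative hash codes.
    public static int taskFor(String word, int numTasks) {
        return Math.floorMod(word.hashCode(), numTasks);
    }
}
```

Because `taskFor` depends only on the word, every occurrence of "am" lands on the same task, which is why its running count in one thread's map stays correct.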