Big Data Training Course: Storm Real-Time Word Count Example

Published: May 12, 2020 | Author: atguigu

Real-Time Word Count Example

1) Requirement

Count, in real time, how many times each word emitted into the Storm framework has appeared.

2) Analysis

Design a topology that counts how often each word appears in the incoming text.

The topology consists of three parts:

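(1) WordCountSpout: the data source. It emits one sentence chosen at random from a set of known English sentences (the sample code below simplifies this to one fixed sentence every 2 seconds).

(2) WordCountSplitBolt: splits each line of text (a sentence) into individual words.

(3) WordCountBolt: accumulates the running count of each word.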
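Before wiring anything into Storm, it helps to know what the three components compute together. The following is a minimal single-threaded sketch in plain Java with no Storm APIs; the class name `WordCountReference` and the sample sentences are illustrative assumptions. It splits each sentence on whitespace and counts the words, which is the total the topology's counters should add up to for the same input.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical reference model of spout -> split bolt -> count bolt,
// computed in one thread without Storm.
class WordCountReference {

    // Split every sentence on whitespace and count each word's occurrences.
    static Map<String, Long> countWords(List<String> sentences) {
        return sentences.stream()
                .flatMap(line -> Arrays.stream(line.trim().split("\\s+")))
                .filter(word -> !word.isEmpty()) // an empty line yields one "" token
                .collect(Collectors.groupingBy(word -> word, TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        List<String> sentences = Arrays.asList(
                "i am ximen love jinlian",
                "i am ximen love jinlian");
        // prints {am=2, i=2, jinlian=2, love=2, ximen=2}
        System.out.println(countWords(sentences));
    }
}
```

A `TreeMap` is used only so the printed result has a stable, alphabetical order.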

3) Hands-on

(1) Create the spout

```java
package com.atguigu.storm.wordcount;

import java.util.Map;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

public class WordCountSpout extends BaseRichSpout {

    private static final long serialVersionUID = 1L;
    private SpoutOutputCollector collector;

    @Override
    public void nextTuple() {
        // 1 Emit simulated data: one fixed sentence per call
        collector.emit(new Values("i am ximen love jinlian"));

        // 2 Sleep 2 seconds to throttle the stream
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the executor can shut down cleanly
            Thread.currentThread().interrupt();
        }
    }

    @SuppressWarnings("rawtypes")
    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        // Keep the collector so nextTuple() can emit tuples
        this.collector = collector;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // A single field holding the whole sentence
        declarer.declare(new Fields("line"));
    }
}
```
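The analysis describes a spout that picks a random sentence, while the spout above always emits the same one. The selection step could be sketched in plain Java as follows; `SentencePicker` and its sample sentences are hypothetical. In a real spout, `next()` would be called from `nextTuple()` and its result passed to `collector.emit(new Values(...))`.

```java
import java.util.Random;

// Hypothetical helper: picks one sentence at random from a fixed array,
// as the analysis describes for WordCountSpout.
class SentencePicker {
    private final String[] sentences;
    private final Random random;

    SentencePicker(String[] sentences, long seed) {
        if (sentences.length == 0) {
            throw new IllegalArgumentException("need at least one sentence");
        }
        this.sentences = sentences.clone();
        this.random = new Random(seed); // a fixed seed makes runs reproducible
    }

    // Returns a uniformly random sentence from the array.
    String next() {
        return sentences[random.nextInt(sentences.length)];
    }

    public static void main(String[] args) {
        SentencePicker picker = new SentencePicker(new String[] {
                "i am ximen love jinlian",
                "the cow jumped over the moon",
                "an apple a day keeps the doctor away"}, 42L);
        for (int i = 0; i < 3; i++) {
            System.out.println(picker.next());
        }
    }
}
```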

(2) Create the bolt that splits sentences into words

```java
package com.atguigu.storm.wordcount;

import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class WordCountSplitBolt extends BaseRichBolt {

    private static final long serialVersionUID = 1L;
    private OutputCollector collector;

    @Override
    public void execute(Tuple input) {
        // 1 Get the line (sentence) sent by the spout
        String line = input.getString(0);

        // 2 Split it into words on spaces
        String[] arrWords = line.split(" ");

        // 3 Emit one (word, 1) tuple per word
        for (String word : arrWords) {
            collector.emit(new Values(word, 1));
        }
    }

    @SuppressWarnings("rawtypes")
    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        // Keep the collector so execute() can emit tuples
        this.collector = collector;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // "word" is the field the counting bolt groups on
        declarer.declare(new Fields("word", "num"));
    }
}
```
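`line.split(" ")` works for the single-spaced sample sentence, but if a line contains repeated spaces it produces empty strings, which the counting bolt would then count as a "word". A whitespace-tolerant alternative is sketched below in plain Java; `Tokenizer` is a hypothetical name, and lower-casing or stripping punctuation is deliberately left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper contrasting split(" ") with a whitespace-tolerant split.
class Tokenizer {

    // Same approach as the bolt above: split on exactly one space character.
    static List<String> splitOnSingleSpace(String line) {
        return Arrays.asList(line.split(" "));
    }

    // Trim, split on runs of whitespace, and drop the lone "" an empty line produces.
    static List<String> splitOnWhitespace(String line) {
        List<String> words = new ArrayList<>();
        for (String w : line.trim().split("\\s+")) {
            if (!w.isEmpty()) {
                words.add(w);
            }
        }
        return words;
    }

    public static void main(String[] args) {
        String line = "i am  ximen";                      // two spaces before "ximen"
        System.out.println(splitOnSingleSpace(line)); // prints [i, am, , ximen]
        System.out.println(splitOnWhitespace(line));  // prints [i, am, ximen]
    }
}
```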

(3) Create the bolt that totals the word counts

```java
package com.atguigu.storm.wordcount;

import java.util.HashMap;
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

public class WordCountBolt extends BaseRichBolt {

    private static final long serialVersionUID = 1L;
    // Running counts for the words routed to this task
    private Map<String, Integer> map = new HashMap<String, Integer>();

    @Override
    public void execute(Tuple input) {
        // 1 Get the word and its count from the tuple
        String word = input.getString(0);
        Integer num = input.getInteger(1);

        // 2 Add to the word's running total
        if (map.containsKey(word)) {
            Integer count = map.get(word);
            map.put(word, count + num);
        } else {
            map.put(word, num);
        }

        // Print the thread id, the word, and its current total
        System.err.println(Thread.currentThread().getId() + "  word:" + word + "  num:" + map.get(word));
    }

    @SuppressWarnings("rawtypes")
    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        // Nothing to initialize: this bolt emits no tuples
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // No output: this is the last bolt in the topology
    }
}
```
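The `containsKey`/`get`/`put` sequence in `execute` can be written more compactly with `Map.merge` (Java 8+). The plain-Java sketch below uses a hypothetical `RunningCounter` class to stand in for the bolt's state; each `add` call models one `(word, num)` tuple arriving at the same task and returns the new total, which is what the bolt prints.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for WordCountBolt's in-memory state.
class RunningCounter {
    private final Map<String, Integer> counts = new HashMap<>();

    // Models one execute() call: add num to the word's running total
    // (inserting num if the word is new) and return the new total.
    int add(String word, int num) {
        return counts.merge(word, num, Integer::sum);
    }

    int get(String word) {
        return counts.getOrDefault(word, 0);
    }

    public static void main(String[] args) {
        RunningCounter counter = new RunningCounter();
        counter.add("love", 1);
        counter.add("am", 1);
        System.out.println(counter.add("love", 1)); // prints 2
    }
}
```

Like the original bolt, this keeps counts only in memory, so they are lost if the worker process restarts.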

(4) Create the topology's main program

```java
package com.atguigu.storm.wordcount;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

public class WordCountMain {

    // throws Exception: LocalCluster's constructor and submitTopology declare
    // checked exceptions in some Storm versions
    public static void main(String[] args) throws Exception {
        // 1 Prepare a TopologyBuilder and wire spout -> split bolt -> count bolt
        TopologyBuilder builder = new TopologyBuilder();

        builder.setSpout("WordCountSpout", new WordCountSpout(), 1);
        // Sentences are distributed randomly across 2 split-bolt executors
        builder.setBolt("WordCountSplitBolt", new WordCountSplitBolt(), 2).shuffleGrouping("WordCountSpout");
        // Tuples with the same "word" value always go to the same count-bolt task
        builder.setBolt("WordCountBolt", new WordCountBolt(), 4).fieldsGrouping("WordCountSplitBolt", new Fields("word"));

        // 2 Create a Config that sets how many workers this topology needs
        Config conf = new Config();
        conf.setNumWorkers(2);

        // 3 Submit the topology: two modes, local mode and cluster mode
        if (args.length > 0) {
            try {
                // 4 Cluster submission; args[0] is the topology name
                StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
            } catch (Exception e) {
                e.printStackTrace();
            }
        } else {
            // 5 Local-mode submission, convenient for testing in the IDE
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology("wordcounttopology", conf, builder.createTopology());
        }
    }
}
```
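`fieldsGrouping("WordCountSplitBolt", new Fields("word"))` guarantees that all tuples with the same `word` value reach the same WordCountBolt task, while `shuffleGrouping` spreads sentences randomly across the split-bolt executors. Conceptually, fields grouping hashes the grouping field and takes the result modulo the number of target tasks. The sketch below models only that idea in plain Java; `FieldsGroupingModel` is hypothetical and its hash may differ from Storm's internal one, so the task numbers it prints will not match a real run.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of fields grouping: hash(field) mod numTasks.
// Not Storm's exact hashing; only the "same key -> same task" property matters.
class FieldsGroupingModel {

    static int chooseTask(String word, int numTasks) {
        // floorMod keeps the index non-negative even when hashCode() is negative
        return Math.floorMod(word.hashCode(), numTasks);
    }

    // Assigns each word to a task index and groups the words per task.
    static Map<Integer, List<String>> route(String[] words, int numTasks) {
        Map<Integer, List<String>> byTask = new LinkedHashMap<>();
        for (String word : words) {
            byTask.computeIfAbsent(chooseTask(word, numTasks), t -> new ArrayList<>()).add(word);
        }
        return byTask;
    }

    public static void main(String[] args) {
        String[] words = "i am ximen love jinlian i am".split(" ");
        // 4 tasks, matching the parallelism hint given to WordCountBolt
        System.out.println(route(words, 4));
    }
}
```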

(5) Test

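The output shows that thread 132 handles only the words am and love, while thread 169 handles i, ximen, and jinlian. This is the benefit of fields grouping: every occurrence of a given word is routed to the same WordCountBolt task, so that task's count for the word is complete. (The thread IDs come from this particular run and will differ on other runs.)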

```
132  word:am  num:1
132  word:love  num:1
169  word:i  num:1
169  word:ximen  num:1
169  word:jinlian  num:1
```
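To see why the grouping matters, consider what would happen if WordCountBolt used shuffle grouping instead: occurrences of one word could land on different tasks, and each task's HashMap would then hold only a partial count. The plain-Java sketch below (hypothetical `GroupingComparison`, with round-robin standing in for shuffle grouping's random choice) counts the same stream both ways and reports how many tasks end up holding a count for one word.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical comparison: per-task counts under key-based vs. round-robin routing.
class GroupingComparison {

    // Builds numTasks independent count maps and routes each word to one of them.
    static List<Map<String, Integer>> countPerTask(String[] words, int numTasks, boolean byKey) {
        List<Map<String, Integer>> tasks = new ArrayList<>();
        for (int i = 0; i < numTasks; i++) {
            tasks.add(new HashMap<>());
        }
        for (int i = 0; i < words.length; i++) {
            int task = byKey
                    ? Math.floorMod(words[i].hashCode(), numTasks) // fields-grouping style
                    : i % numTasks;                               // round-robin, shuffle stand-in
            tasks.get(task).merge(words[i], 1, Integer::sum);
        }
        return tasks;
    }

    // Counts how many tasks hold a (possibly partial) count for the given word.
    static long tasksHolding(List<Map<String, Integer>> tasks, String word) {
        return tasks.stream().filter(m -> m.containsKey(word)).count();
    }

    public static void main(String[] args) {
        String[] words = "i am i am i am i am".split(" ");
        System.out.println(tasksHolding(countPerTask(words, 4, true), "i"));  // prints 1
        System.out.println(tasksHolding(countPerTask(words, 4, false), "i")); // prints 2
    }
}
```

With key-based routing one task holds the full count of 4 for "i"; with round-robin the 4 occurrences are split 2 and 2 across two tasks, and neither task's printed total is correct on its own.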
