大数据培训课程之实时计算网站PV案例

发布时间:2020年05月13日作者:atguigu浏览次数:1,357

实时计算网站PV案例

0)基础知识准备

1)需求

统计网站的 PV(Page View,页面浏览量)。

2)需求分析

方案一:

定义 static long pv,用 synchronized 控制累加操作。(不可行)

原因:synchronized 和 Lock 只能在单个 JVM 内保证互斥;Storm 拓扑的多个 worker 运行在不同的 JVM 进程中,static 变量也不会在进程间共享,因此该方案在集群环境下无效。

方案二:

在 shuffleGrouping 下,用单个 Bolt 实例统计到的 pv 乘以 Executor 并发数来估算总 PV。

驱动函数中配置如下: builder.setSpout("PVSpout", new PVSpout(), 1); builder.setBolt("PVBolt1", new PVBolt1(), 4).shuffleGrouping("PVSpout"); 在 PVBolt1 中输出时乘以并发数: System.err.println("threadid:" + Thread.currentThread().getId() + "  pv:" + pv * 4); 这样做的依据是 shuffleGrouping 会把 tuple 随机且近似均匀地分配给 4 个 PVBolt1 实例,因此单个实例的计数乘以 4 约等于全局 PV。

优点:简单、计算量小

缺点:稍有误差,但绝大多数场景能接受

方案三:

PVBolt1 以多并发进行局部汇总,PVSumBolt 以单线程进行全局汇总。

线程安全:多线程处理的结果和单线程一致

优点:绝对准确;如果改用 fieldsGrouping,还可以得到中间结果,如单个用户的访问 PV(访问深度等)。

缺点:计算量稍大,且多一个Bolt

大数据培训课程

3)案例实操

(1)创建数据输入源PVSpout

package com.atguigu.storm.pv; import java.io.BufferedReader; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStreamReader; import java.util.Map;   import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.IRichSpout; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Values;   public class PVSpout implements IRichSpout{          private static final long serialVersionUID = 1L;        private SpoutOutputCollector collector ;        private BufferedReader reader;               @SuppressWarnings(“rawtypes”)        @Override        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {               this.collector = collector;                             try {                      reader = new BufferedReader(new InputStreamReader(new FileInputStream(“e:/website.log”),”UTF-8″));                                    } catch (Exception e) {                      e.printStackTrace();               }        }          @Override        public void close() {                             try {                      if (reader != null) {                             reader.close();                      }               } catch (IOException e) {                      e.printStackTrace();               }        }          @Override        public void activate() {                      }          @Override        public void deactivate() {                      }          private String str;               @Override        public void nextTuple() {                             try {                      while((str = reader.readLine()) != null){                                                         collector.emit(new Values(str));                                                         Thread.sleep(500);                      }               } catch (Exception e) {                         
           }        }          @Override        public void ack(Object msgId) {        }                 @Override        public void fail(Object msgId) {                      }          @Override        public void declareOutputFields(OutputFieldsDeclarer declarer) {               declarer.declare(new Fields(“log”));        }          @Override        public Map<String, Object> getComponentConfiguration() {               return null;        } }

(2)创建数据处理 Bolt:PVBolt1

package com.atguigu.storm.pv; import java.util.Map; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.IRichBolt; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values;   public class PVBolt1 implements IRichBolt {               private static final long serialVersionUID = 1L;        private OutputCollector collector;        private long pv = 0;          @SuppressWarnings(“rawtypes”)        @Override        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {               this.collector = collector;        }          @Override        public void execute(Tuple input) {               // 获取传递过来的数据               String logline = input.getString(0);                 // 截取出sessionid               String session_id = logline.split(“\t”)[1];                 // 根据会话id不同统计pv次数               if (session_id != null) {                      pv++;               }                 // 提交               collector.emit(new Values(Thread.currentThread().getId(), pv));                 System.err.println(“threadid:” + Thread.currentThread().getId() + ”  pv:” + pv);        }          @Override        public void declareOutputFields(OutputFieldsDeclarer declarer) {               declarer.declare(new Fields(“thireadID”, “pv”));        }          @Override        public void cleanup() {          }          @Override        public Map<String, Object> getComponentConfiguration() {               return null;        } }

(3)创建PVSumBolt

package com.atguigu.storm.pv; import java.util.HashMap; import java.util.Iterator; import java.util.Map; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.IRichBolt; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.tuple.Tuple;   public class PVSumBolt implements IRichBolt {          private static final long serialVersionUID = 1L;        private Map<Long, Long> counts = new HashMap<>();          @SuppressWarnings(“rawtypes”)        @Override        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {          }          @Override        public void execute(Tuple input) {               Long threadID = input.getLong(0);               Long pv = input.getLong(1);                 counts.put(threadID, pv);                 long word_sum = 0;                 Iterator<Long> iterator = counts.values().iterator();                 while (iterator.hasNext()) {                      word_sum += iterator.next();               }                 System.err.println(“pv_all:” + word_sum);        }          @Override        public void cleanup() {        }          @Override        public void declareOutputFields(OutputFieldsDeclarer declarer) {          }          @Override        public Map<String, Object> getComponentConfiguration() {               return null;        } }

(4)驱动

package com.atguigu.storm.pv; import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.StormSubmitter; import org.apache.storm.topology.TopologyBuilder;   public class PVMain {          public static void main(String[] args) {               TopologyBuilder builder = new TopologyBuilder();                             builder.setSpout(“PVSpout”, new PVSpout(), 1);               builder.setBolt(“PVBolt1”, new PVBolt1(), 4).shuffleGrouping(“PVSpout”);               builder.setBolt(“PVSumBolt”, new PVSumBolt(), 1).shuffleGrouping(“PVBolt1”);                             Config conf = new Config();                             conf.setNumWorkers(2);                             if (args.length > 0) {                      try {                             StormSubmitter.submitTopology(args[0], conf, builder.createTopology());                      } catch (Exception e) {                             e.printStackTrace();                      }               }else {                      LocalCluster cluster = new LocalCluster();                      cluster.submitTopology(“pvtopology”, conf, builder.createTopology());               }        } }

(5)测试,执行程序输出如下结果

threadid:161  pv:1 pv_all:1 threadid:164  pv:1 pv_all:2 threadid:161  pv:2 pv_all:3 threadid:172  pv:1 pv_all:4 threadid:164  pv:2 pv_all:5 threadid:164  pv:3 pv_all:6 threadid:162  pv:1 pv_all:7 threadid:161  pv:3 pv_all:8 threadid:172  pv:2 pv_all:9 threadid:164  pv:4 pv_all:10 threadid:162  pv:2 pv_all:11 threadid:172  pv:3 pv_all:12 threadid:164  pv:5 pv_all:13 threadid:164  pv:6 pv_all:14 threadid:161  pv:4 pv_all:15 threadid:161  pv:5 pv_all:16 threadid:164  pv:7 pv_all:17 threadid:172  pv:4 pv_all:18 threadid:172  pv:5 pv_all:19 threadid:161  pv:6 pv_all:20 threadid:162  pv:3 pv_all:21 threadid:164  pv:8 pv_all:22 threadid:172  pv:6 pv_all:23 threadid:164  pv:9 pv_all:24 threadid:161  pv:7 pv_all:25 threadid:162  pv:4 pv_all:26 threadid:162  pv:5 pv_all:27 threadid:162  pv:6 pv_all:28 threadid:164  pv:10 pv_all:29 threadid:161  pv:8 pv_all:30

想要了解更多关于大数据培训课程内容欢迎关注尚硅谷大数据培训,尚硅谷除了这些技术文章外还有免费的高质量大数据培训课程视频供广大学员下载学习


上一篇:
下一篇:
相关课程

java培训 大数据培训 前端培训

关于尚硅谷
教育理念
名师团队
学员心声
资源下载
视频下载
资料下载
工具下载
加入我们
招聘岗位
岗位介绍
招贤纳师
联系我们
全国统一咨询电话:010-56253825
地址:北京市昌平区宏福科技园2号楼3层(北京校区)

深圳市宝安区西部硅谷大厦B座C区一层(深圳校区)

上海市松江区谷阳北路166号大江商厦3层(上海校区)

武汉市东湖高新开发区东湖网谷(武汉校区)

西安市雁塔区和发智能大厦B座3层(西安校区)

成都市成华区北辰星拱青创园综合楼3层(成都校区)