Real-Time Website PV Counting Case
0) Prerequisite knowledge
1) Requirement
Count the website's PV (page views).
2) Requirement analysis
Approach 1:
Define a static long pv and use synchronized to guard the accumulation. (Not feasible)
Reason: synchronized and Lock only work within a single JVM; they are ineffective across multiple JVMs. A Storm topology's executors may be spread across several workers, each of which is a separate JVM process, so every worker would keep its own copy of the counter (see the sketch below).
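A minimal sketch of this rejected approach, for illustration only (the PvCounter class is hypothetical and not part of the case code):

// Rejected approach: a JVM-local counter guarded by synchronized.
// The lock only coordinates threads inside one worker (one JVM);
// each Storm worker process gets its own copy of the static field.
public class PvCounter {
    private static long pv = 0;

    public static synchronized void increment() {
        pv++;
    }

    public static synchronized long get() {
        return pv;
    }
}

Two PVBolt1 executors placed in different workers would each see their own PvCounter.pv, so the per-worker counts can never add up to a correct global total this way.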
Approach 2:
Under shuffleGrouping, estimate the total as pv * the Executor parallelism (here, 4).
Configure the topology in the driver as follows:

builder.setSpout("PVSpout", new PVSpout(), 1);
builder.setBolt("PVBolt1", new PVBolt1(), 4).shuffleGrouping("PVSpout");

When printing in PVBolt1, multiply the local count by the parallelism:

System.err.println("threadid:" + Thread.currentThread().getId() + " pv:" + pv * 4);

This works because shuffleGrouping distributes tuples to the four PVBolt1 executors in round-robin fashion, so each executor's local count is roughly one quarter of the total.
Advantage: simple, with little computation.
Disadvantage: slightly inaccurate, but acceptable in the vast majority of scenarios.
Approach 3:
PVBolt1 runs with multiple executors to do local (partial) aggregation, and PVSumBolt does the global aggregation in a single thread.
Thread safety: the result of the multi-threaded processing is identical to that of single-threaded processing.
Advantage: exactly accurate; if fieldsGrouping is used instead, intermediate values such as the PV of a single user (visit depth, etc.) can also be obtained (see the sketch below).
Disadvantage: slightly more computation, and one extra Bolt.
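A minimal sketch of the fieldsGrouping variant mentioned above. It assumes PVSpout declared new Fields("session_id", "log") instead of just new Fields("log"), which the spout in this case does not do, so this wiring is hypothetical; it would replace the wiring in PVMain's main method and needs an import of org.apache.storm.tuple.Fields:

// Hypothetical wiring: fieldsGrouping routes every tuple with the same
// session_id to the same PVBolt1 executor, so that executor can also
// keep a per-user PV count alongside its local total.
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("PVSpout", new PVSpout(), 1);
builder.setBolt("PVBolt1", new PVBolt1(), 4)
       .fieldsGrouping("PVSpout", new Fields("session_id"));
builder.setBolt("PVSumBolt", new PVSumBolt(), 1)
       .shuffleGrouping("PVBolt1");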

3) Hands-on implementation
(1) Create the data source PVSpout
package com.atguigu.storm.pv;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Map;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

public class PVSpout implements IRichSpout {

    private static final long serialVersionUID = 1L;

    private SpoutOutputCollector collector;
    private BufferedReader reader;

    @SuppressWarnings("rawtypes")
    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        try {
            reader = new BufferedReader(new InputStreamReader(new FileInputStream("e:/website.log"), "UTF-8"));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void close() {
        try {
            if (reader != null) {
                reader.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void activate() {
    }

    @Override
    public void deactivate() {
    }

    private String str;

    @Override
    public void nextTuple() {
        try {
            while ((str = reader.readLine()) != null) {
                collector.emit(new Values(str));
                Thread.sleep(500);
            }
        } catch (Exception e) {
        }
    }

    @Override
    public void ack(Object msgId) {
    }

    @Override
    public void fail(Object msgId) {
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("log"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
(2) Create the data-processing bolt PVBolt1
package com.atguigu.storm.pv;

import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class PVBolt1 implements IRichBolt {

    private static final long serialVersionUID = 1L;

    private OutputCollector collector;
    private long pv = 0;

    @SuppressWarnings("rawtypes")
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        // Get the log line passed in from the spout
        String logline = input.getString(0);

        // Extract the session id (the second tab-separated field)
        String session_id = logline.split("\t")[1];

        // Count a PV hit for each record that carries a session id
        if (session_id != null) {
            pv++;
        }

        // Emit this executor's thread id together with its running total
        collector.emit(new Values(Thread.currentThread().getId(), pv));

        System.err.println("threadid:" + Thread.currentThread().getId() + " pv:" + pv);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("threadID", "pv"));
    }

    @Override
    public void cleanup() {
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
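PVBolt1 splits each log line on a tab and takes the second field as the session id, so e:/website.log is assumed to be a tab-separated file whose second column is a session id. The exact layout is not shown in this case; the two lines below are purely illustrative placeholders that satisfy that assumption (fields separated by tab characters):

1	session_0001	http://www.example.com/index.html
2	session_0002	http://www.example.com/detail.html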
(3) Create PVSumBolt
package com.atguigu.storm.pv;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Tuple;

public class PVSumBolt implements IRichBolt {

    private static final long serialVersionUID = 1L;

    // Latest running total reported by each upstream PVBolt1 executor, keyed by its thread id
    private Map<Long, Long> counts = new HashMap<>();

    @SuppressWarnings("rawtypes")
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    }

    @Override
    public void execute(Tuple input) {
        Long threadID = input.getLong(0);
        Long pv = input.getLong(1);

        // Overwrite this executor's running total, then sum over all executors
        counts.put(threadID, pv);

        long word_sum = 0;
        Iterator<Long> iterator = counts.values().iterator();
        while (iterator.hasNext()) {
            word_sum += iterator.next();
        }

        System.err.println("pv_all:" + word_sum);
    }

    @Override
    public void cleanup() {
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
(4) Driver
package com.atguigu.storm.pv;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;

public class PVMain {
    public static void main(String[] args) {

        TopologyBuilder builder = new TopologyBuilder();

        builder.setSpout("PVSpout", new PVSpout(), 1);
        builder.setBolt("PVBolt1", new PVBolt1(), 4).shuffleGrouping("PVSpout");
        builder.setBolt("PVSumBolt", new PVSumBolt(), 1).shuffleGrouping("PVBolt1");

        Config conf = new Config();
        conf.setNumWorkers(2);

        if (args.length > 0) {
            // Submit to a cluster, using the first argument as the topology name
            try {
                StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
            } catch (Exception e) {
                e.printStackTrace();
            }
        } else {
            // No arguments: run in local mode
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("pvtopology", conf, builder.createTopology());
        }
    }
}
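The driver runs in local mode when no arguments are given and submits to a cluster when a topology name is passed in. One possible way to launch it on a cluster with the standard storm jar command (the jar name is an assumption for illustration):

storm jar storm-pv.jar com.atguigu.storm.pv.PVMain pvtopology

With no arguments, running PVMain directly (for example from the IDE) takes the LocalCluster branch instead.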
(5) Test: running the program produces the following output (threadid/pv is printed by the PVBolt1 executors, pv_all by PVSumBolt)
threadid:161 pv:1 pv_all:1
threadid:164 pv:1 pv_all:2
threadid:161 pv:2 pv_all:3
threadid:172 pv:1 pv_all:4
threadid:164 pv:2 pv_all:5
threadid:164 pv:3 pv_all:6
threadid:162 pv:1 pv_all:7
threadid:161 pv:3 pv_all:8
threadid:172 pv:2 pv_all:9
threadid:164 pv:4 pv_all:10
threadid:162 pv:2 pv_all:11
threadid:172 pv:3 pv_all:12
threadid:164 pv:5 pv_all:13
threadid:164 pv:6 pv_all:14
threadid:161 pv:4 pv_all:15
threadid:161 pv:5 pv_all:16
threadid:164 pv:7 pv_all:17
threadid:172 pv:4 pv_all:18
threadid:172 pv:5 pv_all:19
threadid:161 pv:6 pv_all:20
threadid:162 pv:3 pv_all:21
threadid:164 pv:8 pv_all:22
threadid:172 pv:6 pv_all:23
threadid:164 pv:9 pv_all:24
threadid:161 pv:7 pv_all:25
threadid:162 pv:4 pv_all:26
threadid:162 pv:5 pv_all:27
threadid:162 pv:6 pv_all:28
threadid:164 pv:10 pv_all:29
threadid:161 pv:8 pv_all:30