1 读取文件案例思考
1)spout数据源:数据库、文件、MQ(比如:Kafka)
2)数据源是数据库:只适合读取数据库的配置文件
3)数据源是文件:只适合测试、讲课用(因为集群是分布式集群)
4)企业产生的log文件处理步骤:
(1)读出内容写入MQ
(2)Storm再处理
2 分组策略(Stream Grouping)
stream grouping用来定义一个stream应该如何分配给Bolts上面的多个Executors(多线程、多并发)。
Storm里面有7种类型的stream grouping
1)Shuffle Grouping: 随机分组,轮询,平均分配。随机派发stream里面的tuple,保证每个bolt接收到的tuple数目大致相同。
2)Fields Grouping:按字段分组,比如按userid来分组,具有同样userid的tuple会被分到相同的Bolts里的一个task,而不同的userid则会被分配到不同的bolts里的task。
3)All Grouping:广播发送,对于每一个tuple,所有的bolts都会收到。
4)Global Grouping:全局分组,这个tuple被分配到storm中的一个bolt的其中一个task。再具体一点就是分配给id值最低的那个task。
5)Non Grouping:不分组,这stream grouping个分组的意思是说stream不关心到底谁会收到它的tuple。目前这种分组和Shuffle grouping是一样的效果。在多线程情况下不平均分配。
6)Direct Grouping:直接分组,这是一种比较特别的分组方法,用这种分组意味着消息的发送者指定由消息接收者的哪个task处理这个消息。只有被声明为Direct Stream的消息流可以声明这种分组方法。而且这种消息tuple必须使用emitDirect方法来发射。消息处理者可以通过TopologyContext来获取处理它的消息的task的id (OutputCollector.emit方法也会返回task的id)。
7)Local or shuffle grouping:如果目标bolt有一个或者多个task在同一个工作进程中,tuple将会被随机发送给这些tasks。否则,和普通的Shuffle Grouping行为一致。
8)测试
(1)spout并发度修改为2,bolt并发度修改为1,shuffleGrouping模式
builder.setSpout(“WebLogSpout”, new WebLogSpout(),2); builder.setBolt(“WebLogBolt”, new WebLogBolt(), 1).shuffleGrouping(“WebLogSpout”); Spout开两个线程会对数据读取两份,打印出来就是2份。如果数据源是消息队列,就不会出来读取两份的数据(统一消费者组,只能有一个消费者) Thread-33-WebLogBolt-executor[1 1]lines:60 session_id:CYYH6Y2345GHI899OFG4V9U567 |
(2)spout并发度修改为1,bolt并发度修改为2,noneGrouping模式
builder.setSpout(“WebLogSpout”, new WebLogSpout(),1); builder.setBolt(“WebLogBolt”, new WebLogBolt(), 2).noneGrouping(“WebLogSpout”); 每个bolt接收到的单词不同。 Thread-33-WebLogBolt-executor[1 1]lines:14 session_id:VVVYH6Y4V4SFXZ56JIPDPB4V678 Thread-34-WebLogBolt-executor[2 2]lines:16 session_id:VVVYH6Y4V4SFXZ56JIPDPB4V678 |
(3)spout并发度修改为1,bolt并发度修改为2,fieldsGrouping
builder.setSpout(“WebLogSpout”, new WebLogSpout(),1); builder.setBolt(“WebLogBolt”, new WebLogBolt(), 2).fieldsGrouping(“WebLogSpout”, new Fields(“log”)); 基于web案例不明显,后续案例比较明显 |
(4)spout并发度修改为1,bolt并发度修改为2,allGrouping(“spout”);
builder.setSpout(“WebLogSpout”, new WebLogSpout(),1); builder.setBolt(“WebLogBolt”, new WebLogBolt(), 2).allGrouping(“WebLogSpout”); 每一个bolt获取到的数据都是一样的。 Thread-43-WebLogBolt-executor[1 1]lines:30 session_id:VVVYH6Y4V4SFXZ56JIPDPB4V678 Thread-23-WebLogBolt-executor[2 2]lines:30 session_id:VVVYH6Y4V4SFXZ56JIPDPB4V678 |
(5)spout并发度修改为1,bolt并发度修改为2,globalGrouping(“spout”);
builder.setSpout(“WebLogSpout”, new WebLogSpout(),1); builder.setBolt(“WebLogBolt”, new WebLogBolt(), 2).globalGrouping(“WebLogSpout”); Task的id最低的bolt获取到了所有数据。 Thread-28-WebLogBolt-executor[1 1]lines:30 session_id:VVVYH6Y4V4SFXZ56JIPDPB4V678 |
想要了解跟多关于大数据培训课程内容欢迎关注尚硅谷大数据培训,尚硅谷除了这些技术文章外还有免费的高质量大数据培训课程视频供广大学员下载学习
上一篇: 大数据培训课程之网站日志处理案例
下一篇: Java培训课程之zset操作