尚硅谷大数据技术之电信客服
3.2.5 数据消费方案优化
现在我们要使用
使用HBase查找数据时,尽可能的使用rowKey去精准的定位数据位置,而非使用ColumnValueFilter或者SingleColumnValueFilter,按照单元格Cell中的Value过滤数据,这样做在数据量巨大的情况下,效率是极低的——如果要涉及到全表扫描。所以尽量不要做这样可怕的事情。注意,这并非ColumnValueFilter就无用武之地。现在,我们将使用协处理器,将数据一分为二。
思路:
- a) 编写协处理器类,用于协助处理HBase的相关操作(增删改查)
- b) 在协处理器中,一条主叫日志成功插入后,将该日志切换为被叫视角再次插入一次,放入到与主叫日志不同的列族中。
- c) 重新创建hbase表,并设置为该表设置协处理器。
- d) 编译项目,发布协处理器的jar包到hbase的lib目录下,并群发该jar包
- e) 修改hbase-site.xml文件,设置协处理器,并群发该hbase-site.xml文件
编码:
1) 新建协处理器类:CalleeWriteObserver,并覆写postPut方法,该方法会在数据成功插入之后被回调。
package com.atguigu.coprocessor; import com.atguigu.utils.HBaseUtil; import com.atguigu.utils.PropertiesUtil; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.*; import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.util.Bytes; import java.io.IOException; import java.text.ParseException; import java.text.SimpleDateFormat; /** * 用于实现主叫日志插入成功之后,同时插入一条被叫日志 */ public class CalleeWriteObserver extends BaseRegionObserver{ @Override public void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException { super.postPut(e, put, edit, durability); //1、获取需要操作的表 String targetTableName = PropertiesUtil.getProperty("hbase.table.name"); //2、获取当前操作的表 String currentTableName = e.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString(); //3、判断需要操作的表是否就是当前表,如果不是,则return if (!StringUtils.equals(targetTableName, currentTableName)) return; //4、得到当前插入数据的值并封装新的数据,oriRowkey举例:01_15369468720_20170727081033_13720860202_1_0180 String oriRowKey = Bytes.toString(put.getRow()); String[] splits = oriRowKey.split("_"); String flag = splits[4]; //如果当前插入的是被叫数据,则直接返回(因为默认提供的数据全部为主叫数据) if(StringUtils.equals(flag, "0")) return; //当前插入的数据描述 String caller = splits[1]; String callee = splits[3]; String dateTime = splits[2]; String duration = splits[5]; String timestamp = null; try { SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss"); timestamp = String.valueOf(sdf.parse(dateTime).getTime()); } catch (ParseException e1) { e1.printStackTrace(); } //组装新的数据所在分区号 int regions = Integer.valueOf(PropertiesUtil.getProperty("hbase.regions.count")); String regionHash = HBaseUtil.genPartitionCode(callee, dateTime, regions); String newFlag = "0"; String rowKey = HBaseUtil.genRowKey(regionHash, callee, dateTime, caller, newFlag, duration); //开始存放被叫数据 Put newPut = new Put(Bytes.toBytes(rowKey)); newPut.add(Bytes.toBytes("f2"), Bytes.toBytes("call1"), Bytes.toBytes(callee)); newPut.add(Bytes.toBytes("f2"), Bytes.toBytes("call2"), Bytes.toBytes(caller)); newPut.add(Bytes.toBytes("f2"), Bytes.toBytes("date_time"), Bytes.toBytes(dateTime)); newPut.add(Bytes.toBytes("f2"), Bytes.toBytes("date_time_ts"), Bytes.toBytes(timestamp)); newPut.add(Bytes.toBytes("f2"), Bytes.toBytes("duration"), Bytes.toBytes(duration)); newPut.add(Bytes.toBytes("f2"), Bytes.toBytes("flag"), Bytes.toBytes(newFlag)); HTableInterface hTable = e.getEnvironment().getTable(TableName.valueOf(targetTableName)); hTable.put(newPut); hTable.close(); } } |
2) 重新创建hbase表,并设置为该表设置协处理器。在“表描述器”中调用addCoprocessor方法进行协处理器的设置,大概是这样的:(你需要找到你的建表的那部分代码,添加如下逻辑)
tableDescriptor.addCoprocessor("com.atguigu.coprocessor.CalleeWriteObserver"); |
3.2.6 协处理器
重新编译项目,发布jar包到hbase的lib目录下(注意需群发):
$ scp lib/ct_consumer-1.0-SNAPSHOT.jar hadoop102:/opt/module/hbase/lib/ $ scp lib/ct_consumer-1.0-SNAPSHOT.jar hadoop103:/opt/module/hbase/lib/ |
重新修改hbase-site.xml:
<property> <name>hbase.coprocessor.region.classes</name> <value>com.atguigu.coprocessor.CalleeWriteObserver</value> </property> |
修改后群发:
$ xsync hbase/ |
完成以上步骤后,重新消费数据进行测试。