尚硅谷大数据技术之电信客服

3.2.5 数据消费方案优化

现在我们要使用

使用HBase查找数据时,尽可能的使用rowKey去精准的定位数据位置,而非使用ColumnValueFilter或者SingleColumnValueFilter,按照单元格Cell中的Value过滤数据,这样做在数据量巨大的情况下,效率是极低的——如果要涉及到全表扫描。所以尽量不要做这样可怕的事情。注意,这并非ColumnValueFilter就无用武之地。现在,我们将使用协处理器,将数据一分为二。

思路:

  1. a) 编写协处理器类,用于协助处理HBase的相关操作(增删改查)
  2. b) 在协处理器中,一条主叫日志成功插入后,将该日志切换为被叫视角再次插入一次,放入到与主叫日志不同的列族中。
  3. c) 重新创建hbase表,并设置为该表设置协处理器。
  4. d) 编译项目,发布协处理器的jar包到hbase的lib目录下,并群发该jar包
  5. e) 修改hbase-site.xml文件,设置协处理器,并群发该hbase-site.xml文件

编码:

1) 新建协处理器类:CalleeWriteObserver,并覆写postPut方法,该方法会在数据成功插入之后被回调。

package com.atguigu.coprocessor;

import com.atguigu.utils.HBaseUtil;

import com.atguigu.utils.PropertiesUtil;

import org.apache.commons.lang.StringUtils;

import org.apache.hadoop.hbase.TableName;

import org.apache.hadoop.hbase.client.*;

import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;

import org.apache.hadoop.hbase.coprocessor.ObserverContext;

import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;

import org.apache.hadoop.hbase.regionserver.wal.WALEdit;

import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;

import java.text.ParseException;

import java.text.SimpleDateFormat;

/**

 * 用于实现主叫日志插入成功之后,同时插入一条被叫日志

 */

public class CalleeWriteObserver extends BaseRegionObserver{

    @Override

    public void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException {

        super.postPut(e, put, edit, durability);

        //1、获取需要操作的表

        String targetTableName = PropertiesUtil.getProperty("hbase.table.name");

        //2、获取当前操作的表

        String currentTableName = e.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString();

        //3、判断需要操作的表是否就是当前表,如果不是,则return

        if (!StringUtils.equals(targetTableName, currentTableName)) return;

        //4、得到当前插入数据的值并封装新的数据,oriRowkey举例:01_15369468720_20170727081033_13720860202_1_0180

        String oriRowKey = Bytes.toString(put.getRow());

        String[] splits = oriRowKey.split("_");

        String flag = splits[4];

        //如果当前插入的是被叫数据,则直接返回(因为默认提供的数据全部为主叫数据)

        if(StringUtils.equals(flag, "0")) return;

        //当前插入的数据描述

        String caller = splits[1];

        String callee = splits[3];

        String dateTime = splits[2];

        String duration = splits[5];

        String timestamp = null;

        try {

            SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss");

            timestamp = String.valueOf(sdf.parse(dateTime).getTime());

        } catch (ParseException e1) {

            e1.printStackTrace();

        }

        //组装新的数据所在分区号

        int regions = Integer.valueOf(PropertiesUtil.getProperty("hbase.regions.count"));

        String regionHash = HBaseUtil.genPartitionCode(callee, dateTime, regions);

        String newFlag = "0";

        String rowKey = HBaseUtil.genRowKey(regionHash, callee, dateTime, caller, newFlag, duration);

        //开始存放被叫数据

        Put newPut = new Put(Bytes.toBytes(rowKey));

        newPut.add(Bytes.toBytes("f2"), Bytes.toBytes("call1"), Bytes.toBytes(callee));

        newPut.add(Bytes.toBytes("f2"), Bytes.toBytes("call2"), Bytes.toBytes(caller));

        newPut.add(Bytes.toBytes("f2"), Bytes.toBytes("date_time"), Bytes.toBytes(dateTime));

        newPut.add(Bytes.toBytes("f2"), Bytes.toBytes("date_time_ts"), Bytes.toBytes(timestamp));

        newPut.add(Bytes.toBytes("f2"), Bytes.toBytes("duration"), Bytes.toBytes(duration));

        newPut.add(Bytes.toBytes("f2"), Bytes.toBytes("flag"), Bytes.toBytes(newFlag));

        HTableInterface hTable = e.getEnvironment().getTable(TableName.valueOf(targetTableName));

        hTable.put(newPut);

        hTable.close();

    }

}

2) 重新创建hbase表,并设置为该表设置协处理器。在“表描述器”中调用addCoprocessor方法进行协处理器的设置,大概是这样的:(你需要找到你的建表的那部分代码,添加如下逻辑)

tableDescriptor.addCoprocessor("com.atguigu.coprocessor.CalleeWriteObserver");

3.2.6 协处理器

重新编译项目,发布jar包到hbase的lib目录下(注意需群发):

$ scp lib/ct_consumer-1.0-SNAPSHOT.jar hadoop102:/opt/module/hbase/lib/

$ scp lib/ct_consumer-1.0-SNAPSHOT.jar hadoop103:/opt/module/hbase/lib/

重新修改hbase-site.xml

<property>

    <name>hbase.coprocessor.region.classes</name>

    <value>com.atguigu.coprocessor.CalleeWriteObserver</value>

</property>

修改后群发:

$ xsync hbase/

完成以上步骤后,重新消费数据进行测试。