尚硅谷大数据技术之电信客服

3.2.3 数据查询方式

使用scan查看HBase中是否正确存储了数据,同时尝试使用过滤器查询扫描指定通话时间点的数据。进行该单元测试前,需要先运行数据采集任务,确保HBase中已有数据存在。

新建过滤器工具类:HBaseFilterUtil

package com.atguigu.utils;

import org.apache.hadoop.hbase.filter.*;

import org.apache.hadoop.hbase.util.Bytes;

import java.util.Collection;

public class HBaseFilterUtil {

    /**

     * 获得相等过滤器。相当于SQL的 [字段] = [值]

     *

     * @param cf  列族名

     * @param col 列名

     * @param val 值

     * @return 过滤器

     */

    public static Filter eqFilter(String cf, String col, byte[] val) {

        SingleColumnValueFilter f = new SingleColumnValueFilter(Bytes.toBytes(cf), Bytes.toBytes(col), CompareFilter.CompareOp.EQUAL, val);

        f.setLatestVersionOnly(true);

        f.setFilterIfMissing(true);

        return f;

    }

    /**

     * 获得大于过滤器。相当于SQL的 [字段] > [值]

     *

     * @param cf  列族名

     * @param col 列名

     * @param val 值

     * @return 过滤器

     */

    public static Filter gtFilter(String cf, String col, byte[] val) {

        SingleColumnValueFilter f = new SingleColumnValueFilter(Bytes.toBytes(cf), Bytes.toBytes(col), CompareFilter.CompareOp.GREATER, val);

        f.setLatestVersionOnly(true);

        f.setFilterIfMissing(true);

        return f;

    }

    /**

     * 获得大于等于过滤器。相当于SQL的 [字段] >= [值]

     *

     * @param cf  列族名

     * @param col 列名

     * @param val 值

     * @return 过滤器

     */

    public static Filter gteqFilter(String cf, String col, byte[] val) {

        SingleColumnValueFilter f = new SingleColumnValueFilter(Bytes.toBytes(cf), Bytes.toBytes(col), CompareFilter.CompareOp.GREATER_OR_EQUAL, val);

        f.setLatestVersionOnly(true);

        f.setFilterIfMissing(true);

        return f;

    }

    /**

     * 获得小于过滤器。相当于SQL的 [字段] < [值]

     *

     * @param cf  列族名

     * @param col 列名

     * @param val 值

     * @return 过滤器

     */

    public static Filter ltFilter(String cf, String col, byte[] val) {

        SingleColumnValueFilter f = new SingleColumnValueFilter(Bytes.toBytes(cf), Bytes.toBytes(col), CompareFilter.CompareOp.LESS, val);

        f.setLatestVersionOnly(true);

        f.setFilterIfMissing(true);

        return f;

    }

    /**

     * 获得小于等于过滤器。相当于SQL的 [字段] <= [值]

     *

     * @param cf  列族名

     * @param col 列名

     * @param val 值

     * @return 过滤器

     */

    public static Filter lteqFilter(String cf, String col, byte[] val) {

        SingleColumnValueFilter f = new SingleColumnValueFilter(Bytes.toBytes(cf), Bytes.toBytes(col), CompareFilter.CompareOp.LESS_OR_EQUAL, val);

        f.setLatestVersionOnly(true);

        f.setFilterIfMissing(true);

        return f;

    }

    /**

     * 获得不等于过滤器。相当于SQL的 [字段] != [值]

     *

     * @param cf  列族名

     * @param col 列名

     * @param val 值

     * @return 过滤器

     */

    public static Filter neqFilter(String cf, String col, byte[] val) {

        SingleColumnValueFilter f = new SingleColumnValueFilter(Bytes.toBytes(cf), Bytes.toBytes(col), CompareFilter.CompareOp.NOT_EQUAL, val);

        f.setLatestVersionOnly(true);

        f.setFilterIfMissing(true);

        return f;

    }

    /**

     * 和过滤器 相当于SQL的 的 and

     *

     * @param filters 多个过滤器

     * @return 过滤器

     */

    public static Filter andFilter(Filter... filters) {

        FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL);

        if (filters != null && filters.length > 0) {

            if (filters.length > 1) {

                for (Filter f : filters) {

                    filterList.addFilter(f);

                }

            }

            if (filters.length == 1) {

                return filters[0];

            }

        }

        return filterList;

    }

    /**

     * 和过滤器 相当于SQL的 的 and

     *

     * @param filters 多个过滤器

     * @return 过滤器

     */

    public static Filter andFilter(Collection<Filter> filters) {

        return andFilter(filters.toArray(new Filter[0]));

    }

    /**

     * 或过滤器 相当于SQL的 or

     *

     * @param filters 多个过滤器

     * @return 过滤器

     */

    public static Filter orFilter(Filter... filters) {

        FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ONE);

        if (filters != null && filters.length > 0) {

            for (Filter f : filters) {

                filterList.addFilter(f);

            }

        }

        return filterList;

    }

    /**

     * 或过滤器 相当于SQL的 or

     *

     * @param filters 多个过滤器

     * @return 过滤器

     */

    public static Filter orFilter(Collection<Filter> filters) {

        return orFilter(filters.toArray(new Filter[0]));

    }

    /**

     * 非空过滤器 相当于SQL的 is not null

     *

     * @param cf  列族

     * @param col 列

     * @return 过滤器

     */

    public static Filter notNullFilter(String cf, String col) {

        SingleColumnValueFilter filter = new SingleColumnValueFilter(Bytes.toBytes(cf), Bytes.toBytes(col), CompareFilter.CompareOp.NOT_EQUAL, new NullComparator());

        filter.setFilterIfMissing(true);

        filter.setLatestVersionOnly(true);

        return filter;

    }

    /**

     * 空过滤器 相当于SQL的 is null

     *

     * @param cf  列族

     * @param col 列

     * @return 过滤器

     */

    public static Filter nullFilter(String cf, String col) {

        SingleColumnValueFilter filter = new SingleColumnValueFilter(Bytes.toBytes(cf), Bytes.toBytes(col), CompareFilter.CompareOp.EQUAL, new NullComparator());

        filter.setFilterIfMissing(false);

        filter.setLatestVersionOnly(true);

        return filter;

    }

    /**

     * 子字符串过滤器 相当于SQL的 like '%[val]%'

     *

     * @param cf  列族

     * @param col 列

     * @param sub 子字符串

     * @return 过滤器

     */

    public static Filter subStringFilter(String cf, String col, String sub) {

        SingleColumnValueFilter filter = new SingleColumnValueFilter(Bytes.toBytes(cf), Bytes.toBytes(col), CompareFilter.CompareOp.EQUAL, new SubstringComparator(sub));

        filter.setFilterIfMissing(true);

        filter.setLatestVersionOnly(true);

        return filter;

    }

    /**

     * 正则过滤器 相当于SQL的 rlike '[regex]'

     *

     * @param cf    列族

     * @param col   列

     * @param regex 正则表达式

     * @return 过滤器

     */

    public static Filter regexFilter(String cf, String col, String regex) {

        SingleColumnValueFilter filter = new SingleColumnValueFilter(Bytes.toBytes(cf), Bytes.toBytes(col), CompareFilter.CompareOp.EQUAL, new RegexStringComparator(regex));

        filter.setFilterIfMissing(true);

        filter.setLatestVersionOnly(true);

        return filter;

    }

}

新建单元测试类:HBaseScanTest1(该方案在当前场景下已被废弃,现用方案为HBaseScanTest2,后续讲解)

package com. atguigu;

import com.atguigu.utils.HBaseFilterUtil;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.hbase.Cell;

import org.apache.hadoop.hbase.CellUtil;

import org.apache.hadoop.hbase.HBaseConfiguration;

import org.apache.hadoop.hbase.client.HTable;

import org.apache.hadoop.hbase.client.Result;

import org.apache.hadoop.hbase.client.ResultScanner;

import org.apache.hadoop.hbase.client.Scan;

import org.apache.hadoop.hbase.filter.Filter;

import org.apache.hadoop.hbase.util.Bytes;

import org.junit.Test;

import java.io.IOException;

import java.text.ParseException;

import java.text.SimpleDateFormat;

public class HBaseScanTest1 {

    private static Configuration conf = null;

    static{

        conf = HBaseConfiguration.create();

    }

    @Test

    public void scanTest() throws IOException {

        HTable hTable = new HTable(conf, "ns_telecom:calllog");

        Scan scan = new Scan();

        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd");

        String startTimePoint = null;

        String endTimePoint = null;

        try {

            startTimePoint = String.valueOf(simpleDateFormat.parse("2017-01-1").getTime());

            endTimePoint = String.valueOf(simpleDateFormat.parse("2017-03-01").getTime());

        } catch (ParseException e) {

            e.printStackTrace();

        }

        Filter filter1 = HBaseFilterUtil.gteqFilter("f1", "date_time_ts", Bytes.toBytes(startTimePoint));

        Filter filter2 = HBaseFilterUtil.ltFilter("f1", "date_time_ts", Bytes.toBytes(endTimePoint));

        Filter filterList = HBaseFilterUtil.andFilter(filter1, filter2);

        scan.setFilter(filterList);

        ResultScanner resultScanner = hTable.getScanner(scan);

        //每一个rowkey对应一个result

        for(Result result : resultScanner){

            //每一个rowkey里面包含多个cell

            Cell[] cells = result.rawCells();

            for(Cell c: cells){

//                System.out.println("行:" + Bytes.toString(CellUtil.cloneRow(c)));

//                System.out.println("列族:" + Bytes.toString(CellUtil.cloneFamily(c)));

//                System.out.println("列:" + Bytes.toString(CellUtil.cloneQualifier(c)));

//                System.out.println("值:" + Bytes.toString(CellUtil.cloneValue(c)));

                System.out.println(Bytes.toString(CellUtil.cloneRow(c))

                        + ","

                        + Bytes.toString(CellUtil.cloneFamily(c))

                        + ":"

                        + Bytes.toString(CellUtil.cloneQualifier(c))

                        + ","

                        + Bytes.toString(CellUtil.cloneValue(c)));

            }

        }

    }

}