尚硅谷大数据技术之电信客服
3.2.3 数据查询方式一
使用scan查看HBase中是否正确存储了数据,同时尝试使用过滤器查询扫描指定通话时间点的数据。进行该单元测试前,需要先运行数据采集任务,确保HBase中已有数据存在。
新建工具过滤器工具类:HBaseFilterUtil
package com.atguigu.utils; import org.apache.hadoop.hbase.filter.*; import org.apache.hadoop.hbase.util.Bytes; import java.util.Collection; public class HBaseFilterUtil { /** * 获得相等过滤器。相当于SQL的 [字段] = [值] * * @param cf 列族名 * @param col 列名 * @param val 值 * @return 过滤器 */ public static Filter eqFilter(String cf, String col, byte[] val) { SingleColumnValueFilter f = new SingleColumnValueFilter(Bytes.toBytes(cf), Bytes.toBytes(col), CompareFilter.CompareOp.EQUAL, val); f.setLatestVersionOnly(true); f.setFilterIfMissing(true); return f; } /** * 获得大于过滤器。相当于SQL的 [字段] > [值] * * @param cf 列族名 * @param col 列名 * @param val 值 * @return 过滤器 */ public static Filter gtFilter(String cf, String col, byte[] val) { SingleColumnValueFilter f = new SingleColumnValueFilter(Bytes.toBytes(cf), Bytes.toBytes(col), CompareFilter.CompareOp.GREATER, val); f.setLatestVersionOnly(true); f.setFilterIfMissing(true); return f; } /** * 获得大于等于过滤器。相当于SQL的 [字段] >= [值] * * @param cf 列族名 * @param col 列名 * @param val 值 * @return 过滤器 */ public static Filter gteqFilter(String cf, String col, byte[] val) { SingleColumnValueFilter f = new SingleColumnValueFilter(Bytes.toBytes(cf), Bytes.toBytes(col), CompareFilter.CompareOp.GREATER_OR_EQUAL, val); f.setLatestVersionOnly(true); f.setFilterIfMissing(true); return f; } /** * 获得小于过滤器。相当于SQL的 [字段] < [值] * * @param cf 列族名 * @param col 列名 * @param val 值 * @return 过滤器 */ public static Filter ltFilter(String cf, String col, byte[] val) { SingleColumnValueFilter f = new SingleColumnValueFilter(Bytes.toBytes(cf), Bytes.toBytes(col), CompareFilter.CompareOp.LESS, val); f.setLatestVersionOnly(true); f.setFilterIfMissing(true); return f; } /** * 获得小于等于过滤器。相当于SQL的 [字段] <= [值] * * @param cf 列族名 * @param col 列名 * @param val 值 * @return 过滤器 */ public static Filter lteqFilter(String cf, String col, byte[] val) { SingleColumnValueFilter f = new SingleColumnValueFilter(Bytes.toBytes(cf), Bytes.toBytes(col), CompareFilter.CompareOp.LESS_OR_EQUAL, val); f.setLatestVersionOnly(true); f.setFilterIfMissing(true); return f; } /** * 获得不等于过滤器。相当于SQL的 [字段] != [值] * * @param cf 列族名 * @param col 列名 * @param val 值 * @return 过滤器 */ public static Filter neqFilter(String cf, String col, byte[] val) { SingleColumnValueFilter f = new SingleColumnValueFilter(Bytes.toBytes(cf), Bytes.toBytes(col), CompareFilter.CompareOp.NOT_EQUAL, val); f.setLatestVersionOnly(true); f.setFilterIfMissing(true); return f; } /** * 和过滤器 相当于SQL的 的 and * * @param filters 多个过滤器 * @return 过滤器 */ public static Filter andFilter(Filter... filters) { FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL); if (filters != null && filters.length > 0) { if (filters.length > 1) { for (Filter f : filters) { filterList.addFilter(f); } } if (filters.length == 1) { return filters[0]; } } return filterList; } /** * 和过滤器 相当于SQL的 的 and * * @param filters 多个过滤器 * @return 过滤器 */ public static Filter andFilter(Collection<Filter> filters) { return andFilter(filters.toArray(new Filter[0])); } /** * 或过滤器 相当于SQL的 or * * @param filters 多个过滤器 * @return 过滤器 */ public static Filter orFilter(Filter... filters) { FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ONE); if (filters != null && filters.length > 0) { for (Filter f : filters) { filterList.addFilter(f); } } return filterList; } /** * 或过滤器 相当于SQL的 or * * @param filters 多个过滤器 * @return 过滤器 */ public static Filter orFilter(Collection<Filter> filters) { return orFilter(filters.toArray(new Filter[0])); } /** * 非空过滤器 相当于SQL的 is not null * * @param cf 列族 * @param col 列 * @return 过滤器 */ public static Filter notNullFilter(String cf, String col) { SingleColumnValueFilter filter = new SingleColumnValueFilter(Bytes.toBytes(cf), Bytes.toBytes(col), CompareFilter.CompareOp.NOT_EQUAL, new NullComparator()); filter.setFilterIfMissing(true); filter.setLatestVersionOnly(true); return filter; } /** * 空过滤器 相当于SQL的 is null * * @param cf 列族 * @param col 列 * @return 过滤器 */ public static Filter nullFilter(String cf, String col) { SingleColumnValueFilter filter = new SingleColumnValueFilter(Bytes.toBytes(cf), Bytes.toBytes(col), CompareFilter.CompareOp.EQUAL, new NullComparator()); filter.setFilterIfMissing(false); filter.setLatestVersionOnly(true); return filter; } /** * 子字符串过滤器 相当于SQL的 like '%[val]%' * * @param cf 列族 * @param col 列 * @param sub 子字符串 * @return 过滤器 */ public static Filter subStringFilter(String cf, String col, String sub) { SingleColumnValueFilter filter = new SingleColumnValueFilter(Bytes.toBytes(cf), Bytes.toBytes(col), CompareFilter.CompareOp.EQUAL, new SubstringComparator(sub)); filter.setFilterIfMissing(true); filter.setLatestVersionOnly(true); return filter; } /** * 正则过滤器 相当于SQL的 rlike '[regex]' * * @param cf 列族 * @param col 列 * @param regex 正则表达式 * @return 过滤器 */ public static Filter regexFilter(String cf, String col, String regex) { SingleColumnValueFilter filter = new SingleColumnValueFilter(Bytes.toBytes(cf), Bytes.toBytes(col), CompareFilter.CompareOp.EQUAL, new RegexStringComparator(regex)); filter.setFilterIfMissing(true); filter.setLatestVersionOnly(true); return filter; } } |
新建单元测试类:HBaseScanTest1(这是个当前情景被废弃的方案,现用方案:HBaseScanTest2后续讲解)
package com. atguigu; import com.atguigu.utils.HBaseFilterUtil; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.util.Bytes; import org.junit.Test; import java.io.IOException; import java.text.ParseException; import java.text.SimpleDateFormat; public class HBaseScanTest1 { private static Configuration conf = null; static{ conf = HBaseConfiguration.create(); } @Test public void scanTest() throws IOException { HTable hTable = new HTable(conf, "ns_telecom:calllog"); Scan scan = new Scan(); SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd"); String startTimePoint = null; String endTimePoint = null; try { startTimePoint = String.valueOf(simpleDateFormat.parse("2017-01-1").getTime()); endTimePoint = String.valueOf(simpleDateFormat.parse("2017-03-01").getTime()); } catch (ParseException e) { e.printStackTrace(); } Filter filter1 = HBaseFilterUtil.gteqFilter("f1", "date_time_ts", Bytes.toBytes(startTimePoint)); Filter filter2 = HBaseFilterUtil.ltFilter("f1", "date_time_ts", Bytes.toBytes(endTimePoint)); Filter filterList = HBaseFilterUtil.andFilter(filter1, filter2); scan.setFilter(filterList); ResultScanner resultScanner = hTable.getScanner(scan); //每一个rowkey对应一个result for(Result result : resultScanner){ //每一个rowkey里面包含多个cell Cell[] cells = result.rawCells(); for(Cell c: cells){ // System.out.println("行:" + Bytes.toString(CellUtil.cloneRow(c))); // System.out.println("列族:" + Bytes.toString(CellUtil.cloneFamily(c))); // System.out.println("列:" + Bytes.toString(CellUtil.cloneQualifier(c))); // System.out.println("值:" + Bytes.toString(CellUtil.cloneValue(c))); System.out.println(Bytes.toString(CellUtil.cloneRow(c)) + "," + Bytes.toString(CellUtil.cloneFamily(c)) + ":" + Bytes.toString(CellUtil.cloneQualifier(c)) + "," + Bytes.toString(CellUtil.cloneValue(c))); } } } } |