Atguigu Big Data Technology: Telecom Customer Service

3.2.2 Data Consumption

If all of the steps above succeed, we can start writing the HBase code that consumes the data: read the messages produced to Kafka and store them in HBase in real time.

Approach:

  a) Write a Kafka consumer that reads the messages cached in the Kafka cluster and prints them to the console, to verify that consumption works (a minimal console-only sketch of this step follows the list);
  b) Once the data can be read from Kafka, it can be written to HBase, so wrap the relevant HBase API calls and write every record read from Kafka into HBase;
  c) The two steps above are enough to consume and store the data, but for the sake of decoupling, some properties are externalized into configuration files and the common HBase operations are encapsulated in a utility class.
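For step a), a small console-only consumer can be used to verify that records arrive from Kafka before any HBase code is written. The sketch below is not part of the original project (the class name ConsoleConsumerCheck is a placeholder); it inlines the same connection settings that are later externalized into kafka.properties and uses the kafka-clients dependency added in the pom below.

package com.atguigu.kafka;

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// Hypothetical verification class for step a): print records only, no HBase yet
public class ConsoleConsumerCheck {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop102:9092,hadoop103:9092,hadoop104:9092");
        props.put("group.id", "g1");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("calllog"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                // Only print; HBase persistence is added in step b)
                System.out.println(record.value());
            }
        }
    }
}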

Create a new module: ct_consumer

pom.xml configuration:

<dependencies>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
        <scope>test</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.11.0.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-client -->
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>1.3.1</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-server -->
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-server</artifactId>
        <version>1.3.1</version>
    </dependency>
</dependencies>

1) New class: HBaseConsumer

This class reads the data cached in Kafka and then persists it through the HBase API.

package com.atguigu.kafka;

import java.util.Collections;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import com.atguigu.dao.HBaseDAO;
import com.atguigu.utils.PropertiesUtil;

public class HBaseConsumer {

    public static void main(String[] args) {
        // Create the Kafka consumer from the externalized kafka.properties
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(PropertiesUtil.properties);
        // Subscribe to the call-log topic
        kafkaConsumer.subscribe(Collections.singletonList(PropertiesUtil.getProperty("kafka.topic")));

        HBaseDAO hBaseDAO = new HBaseDAO();
        while (true) {
            // Poll Kafka for new records and write each one to HBase
            ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
            for (ConsumerRecord<String, String> cr : records) {
                System.out.println(cr.value());
                hBaseDAO.put(cr.value());
            }
        }
    }
}

2) New class: PropertiesUtil

This class externalizes the parameters the project commonly needs, decoupling the code from its configuration and making it easier to adjust.

package com.atguigu.utils;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

public class PropertiesUtil {

    public static Properties properties = null;

    static {
        try {
            // Load the configuration from kafka.properties on the classpath
            InputStream inputStream = ClassLoader.getSystemResourceAsStream("kafka.properties");
            properties = new Properties();
            properties.load(inputStream);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static String getProperty(String key) {
        return properties.getProperty(key);
    }
}

3) Create a kafka.properties file and place it under the resources directory

bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092
group.id=g1
enable.auto.commit=true
auto.commit.interval.ms=30000
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

# Kafka topic to consume
kafka.topic=calllog

# HBase settings
hbase.namespace=ns_telecom
hbase.table.name=ns_telecom:calllog
hbase.regions.count=6
# Flag written with every record to mark the caller side (read as hbase.caller.flag in HBaseDAO)
hbase.caller.flag=1

4) Place hdfs-site.xml, core-site.xml, hbase-site.xml and log4j.properties under the resources directory

5) New class: HBaseUtil

This class encapsulates common HBase operations, such as creating the namespace, creating the table, and so on.

package com.atguigu.utils;

import java.io.IOException;
import java.text.DecimalFormat;
import java.util.Iterator;
import java.util.TreeSet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.util.Bytes;

public class HBaseUtil {

    /**
     * Check whether an HBase table exists
     *
     * @throws IOException
     * @throws ZooKeeperConnectionException
     * @throws MasterNotRunningException
     */
    public static boolean isExistTable(Configuration conf, String tableName) {
        // Administrative operations on HBase require an HBaseAdmin instance
        HBaseAdmin admin = null;
        try {
            admin = new HBaseAdmin(conf);
            return admin.tableExists(tableName);
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                if (admin != null) admin.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        return false;
    }

    /**
     * Initialize the namespace
     */
    public static void initNamespace(Configuration conf, String namespace) {
        HBaseAdmin admin = null;
        try {
            admin = new HBaseAdmin(conf);
            // A namespace is similar to a schema in a relational database; think of it as a folder
            NamespaceDescriptor ns = NamespaceDescriptor
                    .create(namespace)
                    .addConfiguration("creator", "WHAlex")
                    .addConfiguration("create_time", String.valueOf(System.currentTimeMillis()))
                    .build();
            admin.createNamespace(ns);
        } catch (MasterNotRunningException e) {
            e.printStackTrace();
        } catch (ZooKeeperConnectionException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (null != admin) {
                try {
                    admin.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    /**
     * Create a table
     *
     * @param tableName
     * @param columnFamily
     * @throws IOException
     * @throws ZooKeeperConnectionException
     * @throws MasterNotRunningException
     */
    public static void createTable(Configuration conf, String tableName, String... columnFamily) {
        HBaseAdmin admin = null;
        try {
            admin = new HBaseAdmin(conf);
            // Check whether the table already exists
            if (isExistTable(conf, tableName)) {
                System.out.println("Table already exists: " + tableName);
                System.exit(0);
            } else {
                // The table does not exist: instantiate a table descriptor from the table name
                HTableDescriptor tableDescriptor = new HTableDescriptor(TableName.valueOf(tableName));
                for (String cf : columnFamily) {
                    tableDescriptor.addFamily(new HColumnDescriptor(cf).setMaxVersions(3));
                }
                // Register the coprocessor (enabled later in the project)
                // tableDescriptor.addCoprocessor("com.atguigu.coprocessor.CalleeWriteObserver");
                int regions = Integer.valueOf(PropertiesUtil.getProperty("hbase.regions.count"));
                admin.createTable(tableDescriptor, getSplitKeys(regions));
                System.out.println("Table created: " + tableName);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                if (admin != null) admin.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Generate the pre-split keys,
     * e.g. {"00|", "01|", "02|", "03|", "04|", "05|"}
     * @return
     */
    private static byte[][] getSplitKeys(int regions) {
        String[] keys = new String[regions];
        // Assumes the partition number never needs more than two digits;
        // change the design if it does, or if more flexibility is required
        DecimalFormat df = new DecimalFormat("00");
        for (int i = 0; i < regions; i++) {
            // e.g. for regions = 6: {"00|", "01|", "02|", "03|", "04|", "05|"}
            keys[i] = df.format(i) + "|";
        }
        byte[][] splitKeys = new byte[keys.length][];
        // Sort the split keys in ascending byte order
        TreeSet<byte[]> rows = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
        for (int i = 0; i < keys.length; i++) {
            rows.add(Bytes.toBytes(keys[i]));
        }
        Iterator<byte[]> rowKeyIter = rows.iterator();
        int i = 0;
        while (rowKeyIter.hasNext()) {
            byte[] tempRow = rowKeyIter.next();
            rowKeyIter.remove();
            splitKeys[i] = tempRow;
            i++;
        }
        return splitKeys;
    }

    /**
     * Generate the rowkey
     *
     * @param regionHash
     * @param call1
     * @param dateTime
     * @param call2
     * @param flag
     * @param duration
     * @return
     */
    public static String genRowKey(String regionHash, String call1, String dateTime, String call2, String flag, String duration) {
        StringBuilder sb = new StringBuilder();
        sb.append(regionHash + "_")
                .append(call1 + "_")
                .append(dateTime + "_")
                .append(call2 + "_")
                .append(flag + "_")
                .append(duration);
        return sb.toString();
    }

    /**
     * Generate the partition (region) code
     * @return
     */
    public static String genPartitionCode(String call1, String callTime, int regions) {
        int len = call1.length();
        // Last 4 digits of the phone number
        String last4Num = call1.substring(len - 4);
        // Year and month of the call time
        String first4Num = callTime.replace("-", "").substring(0, 6);
        // XOR the two values, then take the modulo of the configured number of regions
        int hashCode = (Integer.valueOf(last4Num) ^ Integer.valueOf(first4Num)) % regions;
        return new DecimalFormat("00").format(hashCode);
    }
}
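The rowkey helpers can be unit-tested locally without a cluster, using the junit dependency already declared in the pom. Below is a minimal sketch (the class HBaseUtilTest is an assumption, not part of the original project); it checks the shape of the partition code and the field order of the rowkey for the sample record used later, rather than hard-coding a specific hash value.

package com.atguigu.utils;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import org.junit.Test;

// Hypothetical unit test for the rowkey helpers
public class HBaseUtilTest {

    @Test
    public void testGenPartitionCodeAndRowKey() {
        int regions = 6;
        // Sample record fields: caller, callee, call time (yyyyMMddHHmmss), duration
        String call1 = "15596505995";
        String call2 = "17519874292";
        String dateTime = "20170311003019";

        // The partition code must be two digits and smaller than the region count
        String regionHash = HBaseUtil.genPartitionCode(call1, dateTime, regions);
        assertEquals(2, regionHash.length());
        assertTrue(Integer.parseInt(regionHash) < regions);

        // Rowkey fields are joined with underscores in a fixed order
        String rowKey = HBaseUtil.genRowKey(regionHash, call1, dateTime, call2, "1", "0652");
        assertEquals(regionHash + "_15596505995_20170311003019_17519874292_1_0652", rowKey);
        System.out.println(rowKey);
    }
}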

6) New class: HBaseDAO

This class performs the actual data-saving operations and implements the rowkey generation rules.

package com.atguigu.dao;

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;

import com.atguigu.utils.HBaseUtil;
import com.atguigu.utils.PropertiesUtil;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class HBaseDAO {

    private int regions;
    private String namespace;
    private String tableName;
    private String flag;
    private SimpleDateFormat simpleDateFormat;
    private static Configuration conf = null;
    private HTable callLogTable;

    static {
        conf = HBaseConfiguration.create();
    }

    public HBaseDAO() {
        simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        tableName = PropertiesUtil.getProperty("hbase.table.name");
        regions = Integer.valueOf(PropertiesUtil.getProperty("hbase.regions.count"));
        namespace = PropertiesUtil.getProperty("hbase.namespace");
        flag = PropertiesUtil.getProperty("hbase.caller.flag");
        // Create the namespace and the pre-split table on first use
        if (!HBaseUtil.isExistTable(conf, tableName)) {
            HBaseUtil.initNamespace(conf, namespace);
            HBaseUtil.createTable(conf, tableName, "f1", "f2");
        }
    }

    /**
     * Put one log record into the HTable, e.g.
     * 15596505995,17519874292,2017-03-11 00:30:19,0652
     * @param log
     */
    public void put(String log) {
        try {
            callLogTable = new HTable(conf, tableName);
            // Split the record: caller, callee, call time, duration
            String[] splits = log.split(",");
            String call1 = splits[0];
            String call2 = splits[1];
            String dateAndTime = splits[2];
            String timestamp = null;
            try {
                timestamp = String.valueOf(simpleDateFormat.parse(dateAndTime).getTime());
            } catch (ParseException e) {
                e.printStackTrace();
            }
            String date = dateAndTime.split(" ")[0].replace("-", "");
            String time = dateAndTime.split(" ")[1].replace(":", "");
            String duration = splits[3];
            // Build the rowkey: partition code + caller + time + callee + flag + duration
            String regionHash = HBaseUtil.genPartitionCode(call1, date, regions);
            String rowKey = HBaseUtil.genRowKey(regionHash, call1, date + time, call2, flag, duration);
            Put put = new Put(Bytes.toBytes(rowKey));
            put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("call1"), Bytes.toBytes(call1));
            put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("call2"), Bytes.toBytes(call2));
            put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("date_time"), Bytes.toBytes(date + time));
            put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("date_time_ts"), Bytes.toBytes(timestamp));
            put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("duration"), Bytes.toBytes(duration));
            put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("flag"), Bytes.toBytes(flag));
            callLogTable.put(put);
            // Close the table handle after each write to avoid leaking connections
            callLogTable.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
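With HDFS, ZooKeeper, HBase and the resources files above on the classpath, HBaseDAO can be smoke-tested on its own before the Kafka consumer is wired in. A minimal sketch (the class HBaseDAOSmokeTest is a placeholder, not part of the original project); after it runs, the row should be visible by scanning the ns_telecom:calllog table in the HBase shell.

package com.atguigu.dao;

// Hypothetical smoke test: requires a running HBase cluster and the
// kafka.properties / hbase-site.xml described above on the classpath
public class HBaseDAOSmokeTest {

    public static void main(String[] args) {
        HBaseDAO hBaseDAO = new HBaseDAO();
        // Sample call record: caller, callee, call time, duration
        hBaseDAO.put("15596505995,17519874292,2017-03-11 00:30:19,0652");
        System.out.println("Record written, check the ns_telecom:calllog table in the HBase shell.");
    }
}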