尚硅谷大数据技术之电信客服
3.2.2 数据消费
如果以上操作均成功,则开始编写操作HBase的代码,用于消费数据,将产生的数据实时存储在HBase中。
思路:
- a) 编写Kafka消费者,读取Kafka集群中缓存的消息,并打印到控制台,以观察数据是否读取成功;
- b) 既然能够读取到Kafka中的数据,就可以将其写入HBase,因此调用HBase API的相关方法,把从Kafka中读取出来的数据写入HBase表;
- c) 以上两步已经能够完成消费数据、存储数据的任务,但为了解耦,还需要将一些常用属性外部化到配置文件中,并把HBase的通用方法封装到工具类中。
创建新的module项目:ct_consumer
pom.xml文件配置:
```xml
<dependencies>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
        <scope>test</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
    <!-- 注意:以下版本号仅供参考,请与集群中实际使用的Kafka、HBase版本保持一致 -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.11.0.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>1.3.1</version>
    </dependency>
</dependencies>
```
1)新建类:HBaseConsumer
该类主要用于读取Kafka中缓存的数据,然后调用HBase API将数据持久化到HBase表中。
```java
package com.atguigu.kafka;

import java.util.Collections;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import com.atguigu.dao.HBaseDAO;
import com.atguigu.utils.PropertiesUtil;

public class HBaseConsumer {

    public static void main(String[] args) {
        // 创建Kafka消费者,消费者配置直接复用kafka.properties中的属性
        // 配置文件中多余的hbase.*属性不影响消费者创建,只会打印“未知配置”的警告
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(PropertiesUtil.properties);

        // 订阅主题,键名需与kafka.properties中的kafka.topics保持一致
        kafkaConsumer.subscribe(Collections.singletonList(PropertiesUtil.getProperty("kafka.topics")));

        HBaseDAO hBaseDAO = new HBaseDAO();

        // 循环拉取数据,打印到控制台并写入HBase
        while (true) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
            for (ConsumerRecord<String, String> cr : records) {
                System.out.println(cr.value());
                hBaseDAO.put(cr.value());
            }
        }
    }
}
```
2)新建类:PropertiesUtil
该类主要用于将项目所需的常用参数外部化,便于解耦和配置。
```java
package com.atguigu.utils;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

public class PropertiesUtil {

    public static Properties properties = null;

    static {
        try {
            // 加载配置属性
            InputStream inputStream = ClassLoader.getSystemResourceAsStream("kafka.properties");
            properties = new Properties();
            properties.load(inputStream);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static String getProperty(String key) {
        return properties.getProperty(key);
    }
}
```
3)创建kafka.properties文件,并放置于resources目录下
```properties
bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092
group.id=g1
enable.auto.commit=true
auto.commit.interval.ms=30000
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
# 设置kafka主题
kafka.topics=calllog
# HBase相关配置(键名需与代码中PropertiesUtil.getProperty的参数保持一致)
hbase.namespace=ns_telecom
hbase.table.name=ns_telecom:calllog
hbase.regions.count=6
# 主叫标记,写入HBase时作为flag字段(此处假设1表示主叫)
hbase.caller.flag=1
```
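kafka.properties就绪后,可以用一个简单的测试类验证PropertiesUtil能否正确加载这些配置(下面的PropertiesUtilTest类名为示意,并非项目必需):

```java
package com.atguigu.utils;

public class PropertiesUtilTest {

    public static void main(String[] args) {
        // 读取kafka.properties中的配置项,验证静态代码块是否正确加载了配置
        System.out.println(PropertiesUtil.getProperty("kafka.topics"));      // 预期输出:calllog
        System.out.println(PropertiesUtil.getProperty("hbase.table.name"));  // 预期输出:ns_telecom:calllog
    }
}
```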
4)将hdfs-site.xml、core-site.xml、hbase-site.xml、log4j.properties放置于resources目录
5)新建类:HBaseUtil
该类主要用于封装一些HBase的常用操作,比如创建命名空间,创建表等等。
```java
package com.atguigu.utils;

import java.io.IOException;
import java.text.DecimalFormat;
import java.util.Iterator;
import java.util.TreeSet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.util.Bytes;

public class HBaseUtil {

    /**
     * 判断HBase表是否存在
     */
    public static boolean isExistTable(Configuration conf, String tableName) {
        // 操作HBase表必须创建HBaseAdmin对象
        HBaseAdmin admin = null;
        try {
            admin = new HBaseAdmin(conf);
            return admin.tableExists(tableName);
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                if (admin != null) admin.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        return false;
    }

    /**
     * 初始化命名空间
     */
    public static void initNamespace(Configuration conf, String namespace) {
        HBaseAdmin admin = null;
        try {
            admin = new HBaseAdmin(conf);
            // 命名空间类似于关系型数据库中的schema,可以想象成文件夹
            NamespaceDescriptor ns = NamespaceDescriptor
                    .create(namespace)
                    .addConfiguration("creator", "WHAlex")
                    .addConfiguration("create_time", String.valueOf(System.currentTimeMillis()))
                    .build();
            admin.createNamespace(ns);
        } catch (MasterNotRunningException e) {
            e.printStackTrace();
        } catch (ZooKeeperConnectionException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (null != admin) {
                try {
                    admin.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    /**
     * 创建表(按预分区键预先切分region)
     *
     * @param conf         HBase配置
     * @param tableName    表名
     * @param columnFamily 列族,可传入多个
     */
    public static void createTable(Configuration conf, String tableName, String... columnFamily) {
        HBaseAdmin admin = null;
        try {
            admin = new HBaseAdmin(conf);
            // 判断表是否存在
            if (isExistTable(conf, tableName)) {
                // 存在
                System.out.println("表已经存在:" + tableName);
                System.exit(0);
            } else {
                // 不存在
                // 通过表名实例化“表描述器”
                HTableDescriptor tableDescriptor = new HTableDescriptor(TableName.valueOf(tableName));
                for (String cf : columnFamily) {
                    tableDescriptor.addFamily(new HColumnDescriptor(cf).setMaxVersions(3));
                }
                // 添加协处理器
                // tableDescriptor.addCoprocessor("com.atguigu.coprocessor.CalleeWriteObserver");
                int regions = Integer.valueOf(PropertiesUtil.getProperty("hbase.regions.count"));
                admin.createTable(tableDescriptor, getSplitKeys(regions));
                System.out.println("表创建成功:" + tableName);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                if (admin != null) admin.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 生成预分区键
     * 例如:{"00|", "01|", "02|", "03|", "04|", "05|"}
     */
    private static byte[][] getSplitKeys(int regions) {
        String[] keys = new String[regions];
        // 这里默认不会超过两位数的分区,如果超过,需要变更设计;如果需要灵活操作,也需要变更设计
        DecimalFormat df = new DecimalFormat("00");
        for (int i = 0; i < regions; i++) {
            // 例如:如果regions = 6,则:{"00|", "01|", "02|", "03|", "04|", "05|"}
            keys[i] = df.format(i) + "|";
        }
        byte[][] splitKeys = new byte[keys.length][];
        // 升序排序
        TreeSet<byte[]> rows = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
        for (int i = 0; i < keys.length; i++) {
            rows.add(Bytes.toBytes(keys[i]));
        }
        Iterator<byte[]> rowKeyIter = rows.iterator();
        int i = 0;
        while (rowKeyIter.hasNext()) {
            byte[] tempRow = rowKeyIter.next();
            rowKeyIter.remove();
            splitKeys[i] = tempRow;
            i++;
        }
        return splitKeys;
    }

    /**
     * 生成rowkey
     * 格式:分区号_主叫_通话时间_被叫_标记_通话时长
     */
    public static String genRowKey(String regionHash, String call1, String dateTime,
                                   String call2, String flag, String duration) {
        StringBuilder sb = new StringBuilder();
        sb.append(regionHash + "_")
          .append(call1 + "_")
          .append(dateTime + "_")
          .append(call2 + "_")
          .append(flag + "_")
          .append(duration);
        return sb.toString();
    }

    /**
     * 生成分区号
     */
    public static String genPartitionCode(String call1, String callTime, int regions) {
        int len = call1.length();
        // 取出后4位电话号码
        String last4Num = call1.substring(len - 4);
        // 取出年月(yyyyMM)
        String first4Num = callTime.replace("-", "").substring(0, 6);
        // 异或后与初始化设定的region个数求模
        int hashCode = (Integer.valueOf(last4Num) ^ Integer.valueOf(first4Num)) % regions;
        return new DecimalFormat("00").format(hashCode);
    }
}
```
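为了更直观地理解分区号与rowkey的生成规则,下面给出一个简单的演示(RowKeyDemo类名为示意,样例数据取自前文的数据格式):

```java
package com.atguigu.utils;

public class RowKeyDemo {

    public static void main(String[] args) {
        // 以一条样例数据为例:15596505995,17519874292,2017-03-11 00:30:19,0652
        String call1 = "15596505995";
        String call2 = "17519874292";
        String date = "20170311";   // 日期部分,已去掉“-”
        String time = "003019";     // 时间部分,已去掉“:”
        String flag = "1";          // 主叫标记
        String duration = "0652";   // 通话时长

        // 1)主叫号码后4位与通话年月异或,再对region个数取模,得到两位分区号
        String regionHash = HBaseUtil.genPartitionCode(call1, date, 6);
        // 2)将分区号、主叫、时间、被叫、标记、时长用“_”拼接成rowkey
        String rowKey = HBaseUtil.genRowKey(regionHash, call1, date + time, call2, flag, duration);

        // 输出形如:00_15596505995_20170311003019_17519874292_1_0652
        System.out.println(rowKey);
    }
}
```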
6)新建类:HBaseDAO
该类主要用于执行具体的数据保存操作,并实现rowkey的生成规则等。
```java
package com.atguigu.dao;

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

import com.atguigu.utils.HBaseUtil;
import com.atguigu.utils.PropertiesUtil;

public class HBaseDAO {

    private int regions;
    private String namespace;
    private String tableName;
    private String flag;
    private SimpleDateFormat simpleDateFormat;
    private static Configuration conf = null;
    private HTable callLogTable;

    static {
        conf = HBaseConfiguration.create();
    }

    public HBaseDAO() {
        simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        tableName = PropertiesUtil.getProperty("hbase.table.name");
        regions = Integer.valueOf(PropertiesUtil.getProperty("hbase.regions.count"));
        namespace = PropertiesUtil.getProperty("hbase.namespace");
        flag = PropertiesUtil.getProperty("hbase.caller.flag");
        // 表不存在时,先初始化命名空间,再按预分区创建表
        if (!HBaseUtil.isExistTable(conf, tableName)) {
            HBaseUtil.initNamespace(conf, namespace);
            HBaseUtil.createTable(conf, tableName, "f1", "f2");
        }
    }

    /**
     * 将当前数据put到HTable中
     * 数据样例:15596505995,17519874292,2017-03-11 00:30:19,0652
     *
     * @param log 一条通话记录
     */
    public void put(String log) {
        try {
            callLogTable = new HTable(conf, tableName);
            String[] splits = log.split(",");
            String call1 = splits[0];
            String call2 = splits[1];
            String dateAndTime = splits[2];

            String timestamp = null;
            try {
                timestamp = String.valueOf(simpleDateFormat.parse(dateAndTime).getTime());
            } catch (ParseException e) {
                e.printStackTrace();
            }

            String date = dateAndTime.split(" ")[0].replace("-", "");
            String time = dateAndTime.split(" ")[1].replace(":", "");
            String duration = splits[3];

            // 生成分区号与rowkey
            String regionHash = HBaseUtil.genPartitionCode(call1, date, regions);
            String rowKey = HBaseUtil.genRowKey(regionHash, call1, date + time, call2, flag, duration);

            Put put = new Put(Bytes.toBytes(rowKey));
            put.add(Bytes.toBytes("f1"), Bytes.toBytes("call1"), Bytes.toBytes(call1));
            put.add(Bytes.toBytes("f1"), Bytes.toBytes("call2"), Bytes.toBytes(call2));
            put.add(Bytes.toBytes("f1"), Bytes.toBytes("date_time"), Bytes.toBytes(date + time));
            put.add(Bytes.toBytes("f1"), Bytes.toBytes("date_time_ts"), Bytes.toBytes(timestamp));
            put.add(Bytes.toBytes("f1"), Bytes.toBytes("duration"), Bytes.toBytes(duration));
            put.add(Bytes.toBytes("f1"), Bytes.toBytes("flag"), Bytes.toBytes(flag));

            callLogTable.put(put);
            // 写完后关闭HTable,避免连接泄露
            callLogTable.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
```
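消费程序运行一段时间后,可以通过HBase客户端API简单验证数据是否写入成功。下面是一个基于HBase 1.x Connection/Table API的扫描示例(ScanDemo类名为示意,仅供参考):

```java
package com.atguigu.dao;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

import com.atguigu.utils.PropertiesUtil;

public class ScanDemo {

    public static void main(String[] args) throws IOException {
        // 读取resources目录下的hbase-site.xml等配置
        Configuration conf = HBaseConfiguration.create();
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Table table = connection.getTable(
                     TableName.valueOf(PropertiesUtil.getProperty("hbase.table.name")))) {
            Scan scan = new Scan();
            scan.setCaching(10);
            try (ResultScanner scanner = table.getScanner(scan)) {
                int count = 0;
                for (Result result : scanner) {
                    // 打印rowkey,确认数据已经写入
                    System.out.println(Bytes.toString(result.getRow()));
                    if (++count >= 10) {
                        break;
                    }
                }
            }
        }
    }
}
```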