3.3.4 需求实现
1) 创建类:CountDurationMapper
package com.atguigu.analysis.mapper;

import com.atguigu.analysis.kv.impl.ComDimension;
import com.atguigu.analysis.kv.impl.ContactDimension;
import com.atguigu.analysis.kv.impl.DateDimension;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.io.Text;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/**
 * Reads call-log rows from HBase and, for every caller-side record, emits the
 * call duration keyed by each (contact, date-granularity) combination:
 * year, year+month, and year+month+day — for both parties of the call.
 *
 * Row-key layout (underscore-separated), e.g.
 *   01_15837312345_20170810141024_13738909097_1_0180
 *   [regionCode]_[call1]_[dateTime]_[call2]_[flag]_[duration]
 * flag "1" marks the caller-side copy of the record; flag "0" the callee-side
 * duplicate, which is skipped so each call is counted once.
 */
public class CountDurationMapper extends TableMapper<ComDimension, Text> {

    /** Lookup table: phone number -> contact display name. */
    private Map<String, String> contacts;

    /** Reused output key; Hadoop serializes on write, so mutation between writes is safe. */
    private ComDimension comDimension = new ComDimension();

    /** Populates the hard-coded phone-number -> name mapping. */
    private void initContact() {
        contacts = new HashMap<String, String>();
        contacts.put("15369468720", "李雁");
        contacts.put("19920860202", "卫艺");
        contacts.put("18411925860", "仰莉");
        contacts.put("14473548449", "陶欣悦");
        contacts.put("18749966182", "施梅梅");
        contacts.put("19379884788", "金虹霖");
        contacts.put("19335715448", "魏明艳");
        contacts.put("18503558939", "华贞");
        contacts.put("13407209608", "华啟倩");
        contacts.put("15596505995", "仲采绿");
        contacts.put("17519874292", "卫丹");
        contacts.put("15178485516", "戚丽红");
        contacts.put("19877232369", "何翠柔");
        contacts.put("18706287692", "钱溶艳");
        contacts.put("18944239644", "钱琳");
        contacts.put("17325302007", "缪静欣");
        contacts.put("18839074540", "焦秋菊");
        contacts.put("19879419704", "吕访琴");
        contacts.put("16480981069", "沈丹");
        contacts.put("18674257265", "褚美丽");
    }

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        initContact();
    }

    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
            throws IOException, InterruptedException {
        // Row key example: 01_15837312345_20170810141024_13738909097_1_0180
        String rowKey = Bytes.toString(value.getRow());
        String[] values = rowKey.split("_");
        String flag = values[4];

        // Only process caller-side records; skip the callee-side duplicates.
        if (StringUtils.equals(flag, "0")) return;

        String dateTime = values[2];
        String duration = values[5];
        String call1 = values[1];
        String call2 = values[3];

        // parseInt avoids the needless Integer boxing of Integer.valueOf.
        int year = Integer.parseInt(dateTime.substring(0, 4));
        int month = Integer.parseInt(dateTime.substring(4, 6));
        int day = Integer.parseInt(dateTime.substring(6, 8));

        // -1 marks "all" at that granularity (whole year / whole month).
        DateDimension dateDimensionYear = new DateDimension(year, -1, -1);
        DateDimension dateDimensionMonth = new DateDimension(year, month, -1);
        DateDimension dateDimensionDay = new DateDimension(year, month, day);

        // One reusable Text for all six writes; Hadoop copies it on write.
        Text durationText = new Text(duration);

        // First phone number: emit at year / month / day granularity.
        ContactDimension contactDimension1 = new ContactDimension(call1, contacts.get(call1));
        comDimension.setContactDimension(contactDimension1);
        comDimension.setDateDimension(dateDimensionYear);
        context.write(comDimension, durationText);
        comDimension.setDateDimension(dateDimensionMonth);
        context.write(comDimension, durationText);
        comDimension.setDateDimension(dateDimensionDay);
        context.write(comDimension, durationText);

        // Second phone number: same three granularities.
        ContactDimension contactDimension2 = new ContactDimension(call2, contacts.get(call2));
        comDimension.setContactDimension(contactDimension2);
        comDimension.setDateDimension(dateDimensionYear);
        context.write(comDimension, durationText);
        comDimension.setDateDimension(dateDimensionMonth);
        context.write(comDimension, durationText);
        comDimension.setDateDimension(dateDimensionDay);
        context.write(comDimension, durationText);
    }
}
2) 创建类:CountDurationReducer
package com.atguigu.analysis.reducer;

import com.atguigu.analysis.kv.impl.ComDimension;
import com.atguigu.analysis.kv.impl.CountDurationValue;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Aggregates the per-call durations emitted by CountDurationMapper for each
 * (contact, date) dimension key, producing the call count and the total
 * duration in seconds for that key.
 */
public class CountDurationReducer
        extends Reducer<ComDimension, Text, ComDimension, CountDurationValue> {

    @Override
    protected void reduce(ComDimension key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        int count = 0;
        int sumDuration = 0;
        for (Text text : values) {
            count++;
            // parseInt avoids Integer boxing; each value is one call's duration.
            sumDuration += Integer.parseInt(text.toString());
        }
        context.write(key, new CountDurationValue(count, sumDuration));
    }
}
3) 创建类:CountDurationRunner
package com.atguigu.analysis.runner;

import com.atguigu.analysis.format.MySQLOutputFormat;
import com.atguigu.analysis.kv.impl.ComDimension;
import com.atguigu.analysis.kv.impl.CountDurationValue;
import com.atguigu.analysis.mapper.CountDurationMapper;
import com.atguigu.analysis.reducer.CountDurationReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

/**
 * MapReduce driver: scans the HBase call-log table through
 * CountDurationMapper, aggregates with CountDurationReducer, and writes
 * results to MySQL via MySQLOutputFormat.
 */
public class CountDurationRunner implements Tool {

    private Configuration conf = null;

    @Override
    public void setConf(Configuration conf) {
        // Layer HBase defaults (hbase-site.xml etc.) on top of the Hadoop conf.
        this.conf = HBaseConfiguration.create(conf);
    }

    @Override
    public Configuration getConf() {
        return this.conf;
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = this.getConf();

        Job job = Job.getInstance(conf, "CALL_LOG_ANALYSIS");
        job.setJarByClass(CountDurationRunner.class);

        // Mapper: read from the HBase call-log table.
        this.setHBaseInputConfig(job);

        // Reducer: aggregate (count, total duration) per dimension key.
        job.setReducerClass(CountDurationReducer.class);
        job.setOutputKeyClass(ComDimension.class);
        job.setOutputValueClass(CountDurationValue.class);

        // Output: write aggregated rows to MySQL.
        job.setOutputFormatClass(MySQLOutputFormat.class);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    /** Wires the HBase scan over ns_telecom:calllog into the job as its input. */
    private void setHBaseInputConfig(Job job) {
        Configuration conf = job.getConfiguration();
        HBaseAdmin admin = null;
        try {
            admin = new HBaseAdmin(conf);
            // Fail fast when the source table is missing.
            if (!admin.tableExists("ns_telecom:calllog"))
                throw new RuntimeException("Unable to find the specified table.");

            Scan scan = new Scan();
            scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME,
                    Bytes.toBytes("ns_telecom:calllog"));
            TableMapReduceUtil.initTableMapperJob("ns_telecom:calllog", scan,
                    CountDurationMapper.class, ComDimension.class, Text.class, job, true);
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (admin != null)
                try {
                    admin.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
        }
    }

    public static void main(String[] args) {
        try {
            int status = ToolRunner.run(new CountDurationRunner(), args);
            // BUGFIX: print the result BEFORE exiting — the original called
            // System.exit(status) first, so these messages never ran.
            if (status == 0) {
                System.out.println("运行成功");
            } else {
                System.out.println("运行失败");
            }
            System.exit(status);
        } catch (Exception e) {
            System.out.println("运行失败");
            e.printStackTrace();
        }
    }
}
上一篇: 尚硅谷大数据技术之电信客服
下一篇: 尚硅谷大数据技术之电信客服