尚硅谷大数据技术之电信客服

3.3.4 需求实现

1) 创建类:CountDurationMapper

package com.atguigu.analysis.mapper;

import com.atguigu.analysis.kv.impl.ComDimension;

import com.atguigu.analysis.kv.impl.ContactDimension;

import com.atguigu.analysis.kv.impl.DateDimension;

import org.apache.commons.lang.StringUtils;

import org.apache.hadoop.hbase.client.Result;

import org.apache.hadoop.hbase.io.ImmutableBytesWritable;

import org.apache.hadoop.hbase.mapreduce.TableMapper;

import org.apache.hadoop.hbase.util.Bytes;

import org.apache.hadoop.io.Text;

import java.io.IOException;

import java.util.HashMap;

import java.util.Map;

public class CountDurationMapper extends TableMapper<ComDimension, Text>{

    //存放联系人电话与姓名的映射

    private Map<String, String> contacts;

    private byte[] family = Bytes.toBytes(“f1”);

    private ComDimension comDimension = new ComDimension();

    private void initContact(){

        contacts = new HashMap<String, String>();

        contacts.put(“15369468720”, “李雁”);

        contacts.put(“19920860202”, “卫艺”);

        contacts.put(“18411925860”, “仰莉”);

        contacts.put(“14473548449”, “陶欣悦”);

        contacts.put(“18749966182”, “施梅梅”);

        contacts.put(“19379884788”, “金虹霖”);

        contacts.put(“19335715448”, “魏明艳”);

        contacts.put(“18503558939”, “华贞”);

        contacts.put(“13407209608”, “华啟倩”);

        contacts.put(“15596505995”, “仲采绿”);

        contacts.put(“17519874292”, “卫丹”);

        contacts.put(“15178485516”, “戚丽红”);

        contacts.put(“19877232369”, “何翠柔”);

        contacts.put(“18706287692”, “钱溶艳”);

        contacts.put(“18944239644”, “钱琳”);

        contacts.put(“17325302007”, “缪静欣”);

        contacts.put(“18839074540”, “焦秋菊”);

        contacts.put(“19879419704”, “吕访琴”);

        contacts.put(“16480981069”, “沈丹”);

        contacts.put(“18674257265”, “褚美丽”);

    }

    @Override

    protected void setup(Context context) throws IOException, InterruptedException {

        initContact();

    }

    @Override

    protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {

        //01_15837312345_20170810141024_13738909097_1_0180

        String rowKey = Bytes.toString(value.getRow());

        String[] values = rowKey.split(“_”);

        String flag = values[4];

        //只拿到主叫数据即可

        if(StringUtils.equals(flag, “0”)) return;

        String date_time = values[2];

        String duration = values[5];

        String call1 = values[1];

        String call2 = values[3];

        int year = Integer.valueOf(date_time.substring(0, 4));

        int month = Integer.valueOf(date_time.substring(4, 6));

        int day = Integer.valueOf(date_time.substring(6, 8));

        DateDimension dateDimensionYear = new DateDimension(year, -1, -1);

        DateDimension dateDimensionMonth = new DateDimension(year, month, -1);

        DateDimension dateDimensionDay = new DateDimension(year, month, day);

        //第一个电话号码

        ContactDimension contactDimension1 = new ContactDimension(call1, contacts.get(call1));

        comDimension.setContactDimension(contactDimension1);

        comDimension.setDateDimension(dateDimensionYear);

        context.write(comDimension, new Text(duration));

        comDimension.setDateDimension(dateDimensionMonth);

        context.write(comDimension, new Text(duration));

        comDimension.setDateDimension(dateDimensionDay);

        context.write(comDimension, new Text(duration));

        //第二个电话号码

        ContactDimension contactDimension2 = new ContactDimension(call2, contacts.get(call2));

        comDimension.setContactDimension(contactDimension2);

        comDimension.setDateDimension(dateDimensionYear);

        context.write(comDimension, new Text(duration));

        comDimension.setDateDimension(dateDimensionMonth);

        context.write(comDimension, new Text(duration));

        comDimension.setDateDimension(dateDimensionDay);

        context.write(comDimension, new Text(duration));

    }

}

2) 创建类:CountDurationReducer

package com.atguigu.analysis.reducer;

import com.atguigu.analysis.kv.impl.ComDimension;

import com.atguigu.analysis.kv.impl.CountDurationValue;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class CountDurationReducer extends Reducer<ComDimension, Text, ComDimension, CountDurationValue>{

    @Override

    protected void reduce(ComDimension key, Iterable<Text> values, Context context) throws IOException, InterruptedException {

        int count = 0;

        int sumDuration = 0;

        for(Text text : values){

            count ++;

            sumDuration += Integer.valueOf(text.toString());

        }

        CountDurationValue countDurationValue = new CountDurationValue(count, sumDuration);

        context.write(key, countDurationValue);

    }

}

3) 创建类:CountDurationRunner

package com.atguigu.analysis.runner;

import com.atguigu.analysis.format.MySQLOutputFormat;

import com.atguigu.analysis.kv.impl.ComDimension;

import com.atguigu.analysis.kv.impl.CountDurationValue;

import com.atguigu.analysis.mapper.CountDurationMapper;

import com.atguigu.analysis.reducer.CountDurationReducer;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.conf.Configured;

import org.apache.hadoop.hbase.HBaseConfiguration;

import org.apache.hadoop.hbase.client.HBaseAdmin;

import org.apache.hadoop.hbase.client.Scan;

import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;

import org.apache.hadoop.hbase.util.Bytes;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.util.Tool;

import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

public class CountDurationRunner implements Tool{

    private Configuration conf = null;

    @Override

    public void setConf(Configuration conf) {

        this.conf = HBaseConfiguration.create(conf);

    }

    @Override

    public Configuration getConf() {

        return this.conf;

    }

    @Override

    public int run(String[] args) throws Exception {

        //得到conf对象

        Configuration conf = this.getConf();

        //创建Job

        Job job = Job.getInstance(conf, “CALL_LOG_ANALYSIS”);

        job.setJarByClass(CountDurationRunner.class);

        //为Job设置Mapper

        this.setHBaseInputConfig(job);

        //为Job设置Reducer

        job.setReducerClass(CountDurationReducer.class);

        job.setOutputKeyClass(ComDimension.class);

        job.setOutputValueClass(CountDurationValue.class);

        //为Job设置OutputFormat

        job.setOutputFormatClass(MySQLOutputFormat.class);

        return job.waitForCompletion(true) ? 0 : 1;

    }

    private void setHBaseInputConfig(Job job) {

        Configuration conf = job.getConfiguration();

        HBaseAdmin admin = null;

        try {

            admin = new HBaseAdmin(conf);

            //如果表不存在则直接返回,抛个异常也挺好

            if(!admin.tableExists(“ns_telecom:calllog”)) throw new RuntimeException(“Unable to find the specified table.”);

            Scan scan = new Scan();

            scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(“ns_telecom:calllog”));

            TableMapReduceUtil.initTableMapperJob(“ns_telecom:calllog”, scan,

                    CountDurationMapper.class, ComDimension.class, Text.class,

                    job, true);

        } catch (IOException e) {

            e.printStackTrace();

        }finally {

            if(admin != null) try {

                admin.close();

            } catch (IOException e) {

                e.printStackTrace();

            }

        }

    }

    public static void main(String[] args) {

        try {

            int status = ToolRunner.run(new CountDurationRunner(), args);

            System.exit(status);

            if(status == 0){

                System.out.println(“运行成功”);

            }else {

                System.out.println(“运行失败”);

            }

        } catch (Exception e) {

            System.out.println(“运行失败”);

            e.printStackTrace();

        }

    }

}


上一篇:
下一篇:
关于尚硅谷
教育理念
名师团队
学员心声
资源下载
视频下载
资料下载
工具下载
加入我们
招聘岗位
岗位介绍
招贤纳师
联系我们
全国统一咨询电话:010-56253825
地址:北京市昌平区宏福科技园综合楼6层(北京校区)

深圳市宝安区西部硅谷大厦B座C区一层(深圳校区)

上海市松江区谷阳北路166号大江商厦6层(上海校区)

武汉市东湖高新开发区东湖网谷(武汉校区)