尚硅谷 Big Data Technology with Hadoop (MapReduce) (New), Chapter 7: MapReduce Extension Case Studies

7.3 Case Study: Finding Mutual Blog Friends

1. Requirements

The following is friend-list data from a blog. On each line, the name before the colon is a user, and the entries after the colon are all of that user's friends. The friendships in this data are one-directional: for example, G lists A as a friend, but A does not list G.

Find every pair of users who share at least one mutual friend, and list who those mutual friends are.

(1) Input data

A:B,C,D,F,E,O
B:A,C,E,K
C:F,A,D,I
D:A,E,F,L
E:B,C,D,M,L
F:A,B,C,D,E,O,M
G:A,C,D,E,F
H:A,C,D,E,O
I:A,O
J:B,O
K:A,C,D
L:D,E,F
M:E,F,G
O:A,H,I,J

2. Requirements Analysis

Solve it with two chained MapReduce jobs. First find out whose friend each of A, B, C, ... is; that is, for each user, collect everyone who lists that user as a friend (a small local sketch after the first job's output below makes this concrete). A second job can then pair up those users, since any two users who list the same person share that person as a mutual friend.

Output of the first job (each friend, followed by everyone who lists that friend):

A	I,K,C,B,G,F,H,O,D,
B	A,F,J,E,
C	A,E,B,H,F,G,K,
D	G,C,K,A,L,F,E,H,
E	G,M,L,H,A,F,B,D,
F	L,M,D,C,G,A,
G	M,
H	O,
I	O,C,
J	O,
K	B,
L	D,E,
M	E,F,
O	A,H,I,J,F,
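To make the inversion concrete, here is a minimal local sketch of the first job's logic in plain Java (no Hadoop required; the class name, the two sample lines, and the TreeMap standing in for the shuffle are all illustrative, not part of the original code):

import java.util.*;

public class InvertFriendsSketch {
    public static void main(String[] args) {
        // Two sample lines of the input data
        String[] lines = {"A:B,C,D,F,E,O", "B:A,C,E,K"};
        Map<String, List<String>> friendToPersons = new TreeMap<>();
        for (String line : lines) {
            String[] fields = line.split(":");
            String person = fields[0];
            for (String friend : fields[1].split(",")) {
                // Same emission as the Mapper: <friend, person>; the TreeMap
                // plays the role of the shuffle, grouping values by key
                friendToPersons.computeIfAbsent(friend, k -> new ArrayList<>()).add(person);
            }
        }
        // Prints each friend with everyone who lists them,
        // e.g. "C -> [A, B]" for this two-line sample
        friendToPersons.forEach((f, ps) -> System.out.println(f + " -> " + ps));
    }
}

On the full input, this grouping yields exactly the fourteen lines shown above.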

Output of the second job (each pair of users, followed by their mutual friends; see the pairing sketch after this listing):

A-B	E C
A-C	D F
A-D	E F
A-E	D B C
A-F	O B C D E
A-G	F E C D
A-H	E C D O
A-I	O
A-J	O B
A-K	D C
A-L	F E D
A-M	E F
B-C	A
B-D	A E
B-E	C
B-F	E A C
B-G	C E A
B-H	A E C
B-I	A
B-K	C A
B-L	E
B-M	E
B-O	A
C-D	A F
C-E	D
C-F	D A
C-G	D F A
C-H	D A
C-I	A
C-K	A D
C-L	D F
C-M	F
C-O	I A
D-E	L
D-F	A E
D-G	E A F
D-H	A E
D-I	A
D-K	A
D-L	E F
D-M	F E
D-O	A
E-F	D M C B
E-G	C D
E-H	C D
E-J	B
E-K	C D
E-L	D
F-G	D C A E
F-H	A D O E C
F-I	O A
F-J	B O
F-K	D C A
F-L	E D
F-M	E
F-O	A
G-H	D C E A
G-I	A
G-K	D A C
G-L	D F E
G-M	E F
G-O	A
H-I	O A
H-J	O
H-K	A C D
H-L	D E
H-M	E
H-O	A
I-J	O
I-K	A
I-O	A
K-L	D
K-O	A
L-M	E F
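The crucial detail in the second job is sorting each person list before pairing, so that a given pair is always keyed in one canonical order (A-B, never B-A) no matter which friend's record produced it. Here is a minimal local sketch of that pairing step (the class name and sample line are illustrative):

import java.util.Arrays;

public class PairFriendsSketch {
    public static void main(String[] args) {
        // One record of the first job's output: friend C is listed by these users
        String line = "C\tA,E,B,H,F,G,K,";
        String[] parts = line.split("\t");
        String friend = parts[0];
        String[] persons = parts[1].split(",");   // the trailing comma yields no extra token
        Arrays.sort(persons);                     // canonical pair order: smaller-larger
        for (int i = 0; i < persons.length - 1; i++) {
            for (int j = i + 1; j < persons.length; j++) {
                // Same emission as the second Mapper: <person-person, friend>
                System.out.println(persons[i] + "-" + persons[j] + "\t" + friend);
            }
        }
    }
}

Running it prints one line per pair, e.g. A-B, A-E, and B-E each paired with value C, which is exactly why C appears among the mutual friends of those pairs in the listing above.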

3. Code Implementation

(1) Mapper class for the first job

package com.atguigu.mapreduce.friends;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class OneShareFriendsMapper extends Mapper<LongWritable, Text, Text, Text>{

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {

        // 1 Read one line, e.g. A:B,C,D,F,E,O
        String line = value.toString();

        // 2 Split on ":"
        String[] fields = line.split(":");

        // 3 Extract the person and their friends
        String person = fields[0];
        String[] friends = fields[1].split(",");

        // 4 Write out
        for (String friend : friends) {
            // Emit <friend, person>
            context.write(new Text(friend), new Text(person));
        }
    }
}
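For the sample line A:B,C,D,F,E,O, this mapper emits <B,A>, <C,A>, <D,A>, <F,A>, <E,A>, <O,A>; after the shuffle, each reduce call of the first job therefore receives one friend together with every user who lists that friend.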

(2) Reducer class for the first job

package com.atguigu.mapreduce.friends;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class OneShareFriendsReducer extends Reducer<Text, Text, Text, Text>{

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        StringBuffer sb = new StringBuffer();

        // 1 Concatenate every person who lists this friend
        for (Text person : values) {
            sb.append(person).append(",");
        }

        // 2 Write out <friend, person,person,...>
        context.write(key, new Text(sb.toString()));
    }
}
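Each output line of this reducer is the friend, a tab, and a comma-joined list with a trailing comma (e.g. the line for A in the first job's output above). The trailing comma is harmless downstream, because Java's String.split(",") discards trailing empty strings, so the second job's mapper recovers exactly the person names.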

(3) Driver class for the first job

package com.atguigu.mapreduce.friends;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class OneShareFriendsDriver {

    public static void main(String[] args) throws Exception {
        // 1 Get the Job instance
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        // 2 Set the jar path via the driver class
        job.setJarByClass(OneShareFriendsDriver.class);

        // 3 Set the Mapper and Reducer classes
        job.setMapperClass(OneShareFriendsMapper.class);
        job.setReducerClass(OneShareFriendsReducer.class);

        // 4 Set the map output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // 5 Set the final output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // 6 Set the job's input and output directories
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 7 Submit the job
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
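Assuming the classes are packaged into a jar (the jar name and HDFS paths below are illustrative, not part of the original material), the first job can be submitted with:

hadoop jar friends.jar com.atguigu.mapreduce.friends.OneShareFriendsDriver /friends/input /friends/oneshare

The output directory /friends/oneshare then serves as the input of the second job.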

(4) Mapper class for the second job

package com.atguigu.mapreduce.friends;
import java.io.IOException;
import java.util.Arrays;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class TwoShareFriendsMapper extends Mapper<LongWritable, Text, Text, Text>{

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {

        // Input line, e.g. A  I,K,C,B,G,F,H,O,D,
        // i.e. friend \t person,person,person
        String line = value.toString();
        String[] friend_persons = line.split("\t");

        String friend = friend_persons[0];
        String[] persons = friend_persons[1].split(",");

        // Sort so every pair is keyed in one canonical order (A-B, never B-A)
        Arrays.sort(persons);

        for (int i = 0; i < persons.length - 1; i++) {
            for (int j = i + 1; j < persons.length; j++) {
                // Emit <person-person, friend>, so all mutual friends of the
                // same person-person pair are grouped into one reduce call
                context.write(new Text(persons[i] + "-" + persons[j]), new Text(friend));
            }
        }
    }
}
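Note the fan-out of this mapper: a friend listed by n users produces n(n-1)/2 pair records; for example, the line for friend A (nine users) produces 36 records. That is fine at this scale, but the quadratic growth is worth keeping in mind for very popular users.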

(5) Reducer class for the second job

package com.atguigu.mapreduce.friends;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class TwoShareFriendsReducer extends Reducer<Text, Text, Text, Text>{

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        StringBuffer sb = new StringBuffer();

        // Concatenate all mutual friends of this person-person pair
        for (Text friend : values) {
            sb.append(friend).append(" ");
        }
        context.write(key, new Text(sb.toString()));
    }
}
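For the key A-B, for example, the values C and E arrive from two separate mapper emissions and are written out as one line. The order of values within a reduce call is not guaranteed, which is why the mutual friends in the listing above appear in arbitrary order.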

(6) Driver class for the second job

package com.atguigu.mapreduce.friends;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TwoShareFriendsDriver {

    public static void main(String[] args) throws Exception {
        // 1 Get the Job instance
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        // 2 Set the jar path via the driver class
        job.setJarByClass(TwoShareFriendsDriver.class);

        // 3 Set the Mapper and Reducer classes
        job.setMapperClass(TwoShareFriendsMapper.class);
        job.setReducerClass(TwoShareFriendsReducer.class);

        // 4 Set the map output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // 5 Set the final output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // 6 Set the job's input and output directories
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 7 Submit the job
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
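Continuing the illustrative jar and paths from the first job, the second job reads the first job's output directory, completing the chain:

hadoop jar friends.jar com.atguigu.mapreduce.friends.TwoShareFriendsDriver /friends/oneshare /friends/twoshare

The files in /friends/twoshare then contain the pair listing shown in the analysis above.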