1. MapReduce

  • MapReduce is a programming framework for writing distributed computation programs.

  • Because MR is a framework, you focus on the processing logic rather than on how the program is physically executed.

  • Strengths: high fault tolerance; handles very large data sets.
  • Weaknesses: not suited to real-time computing, stream computing, or DAG (iterative graph) computing.

2. Core Idea

  • MapReduce is split into two parts: Map and Reduce.
  • Map means mapping: the input data is mapped into key-value pairs <key, value>.
  • Reduce means reduction: the KV pairs from the Map phase are merged into the desired result.
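
A tiny worked example of this KV flow, counting words in the single line "hello world hello" (the data is illustrative):

  Map:     "hello world hello" -> (hello, 1), (world, 1), (hello, 1)
  Shuffle: group by key        -> (hello, [1, 1]), (world, [1])
  Reduce:  sum per key         -> (hello, 2), (world, 1)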

3. Workflow

1) Simplified Flow

The simplified MapReduce flow: Input -> Mapper -> Reducer -> Output
1. Input to Mapper is handled by the InputFormat, producing <k1, v1>.
2. The Mapper transforms <k1, v1> into <k2, v2>.
3. Between Mapper and Reducer is the shuffle phase: the <k2, v2> pairs are partitioned and sorted.
4. The Reducer processes them into the desired <k3, v3> form.
5. Reducer to Output is handled by the OutputFormat.
  • The whole flow above is written as code, packaged into a jar, and submitted to the YARN ResourceManager (see the launch command below).
  • That submission includes the ApplicationMaster program, the command that starts the ApplicationMaster, the user program, and so on.
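
For reference, a job packaged this way is typically launched from the client node with a command of the following form (the jar name, main class, and HDFS paths are placeholders, not from the original notes):

hadoop jar wc.jar com.example.mr.WcDriver /input /output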

2) Detailed Flow

  • Suppose there are three DataNodes, holding files of 128 MB, 200 MB and 100 MB respectively.
  1. The MR program is submitted on the client node; YarnRunner asks the ResourceManager for an Application.

  2. The RM returns the Application's resource staging path and the job id to YarnRunner.

  3. YarnRunner uploads the resources needed to run the job to HDFS.

  4. Once the resources are uploaded, it requests that an MRAppMaster be started.

  5. The RM initializes the user's request as a Task.

  6. The NodeManager on the first DataNode picks up the Task.

  7. That NodeManager creates a Container and launches the MRAppMaster.

  8. The Container copies the job resources from HDFS to local disk (the job jar, the job configuration job.xml, and the input split metadata).

  9. The MRAppMaster asks the RM for resources to run the MapTasks.

  10. The RM assigns the MapTask work to the other two NodeManagers; each of them picks up its task and creates a container.

  11. The MRAppMaster sends the program start-up script to the two NodeManagers that received tasks; each starts a MapTask, and the MapTasks partition and sort the data:

    • Mapper phase

    • Going through InputFormat -> FileInputFormat -> TextInputFormat, the getSplits method produces the array of splits.

      • The three files are cut into 0~128 MB, 0~128 MB plus 128~200 MB, and 0~100 MB: four splits in total (see the split sketch after this list).
      • There are as many MapTasks as there are splits.
    • A RecordReader (obtained via createRecordReader) processes each split, parsing it into <k1, v1>.
    • The Mapper maps <k1, v1> to <k2, v2>; this is carried out by setup(), map(), cleanup() and run().
    • The KV pairs are written to the output collector, a circular in-memory buffer, where they are partitioned and sorted.
    • Shuffle phase
    • Data (KV pairs) coming out of the Mapper enters the circular buffer and is assigned a partition number p (a logical split, not a physical one): (key, value, p).
    • When the circular buffer overflows, the (key, value, p) records are spilled; the sort is a two-level sort over the indexes: first by partition p, then by key within each partition.
      • A Combiner can optionally be applied here, merging (key, value) pairs after the partition sort and before spilling to disk.
    • If the buffer spills several times, steps 2~3 are repeated; the spilled files are then merge-sorted (and the Combiner applied again, if configured).
    • The result is written to disk; that final file is the MapTask's output.
  12. After all MapTasks have finished, the MRAppMaster asks the RM for containers to run the ReduceTasks.

  13. Each ReduceTask fetches the data of its partition from the MapTasks:

    • Suppose there are n MapTasks (four here), each of which produced m partitions.
    • m ReduceTasks are started, one per partition; the MRAppMaster requests containers from the RM, so they run on whichever NodeManagers the RM allocates.
    • Each ReduceTask receives the data of its partition from all n MapTasks.
    • Each ReduceTask merge-sorts the n pieces of data and groups them by key (the grouping can be customized).
    • The grouped data is fed into reduce(), turned into <k3, v3>, and written out through the OutputFormat.
  14. When the program finishes, the MRAppMaster asks the RM to deregister itself.
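
A minimal sketch of the split computation mentioned in step 11, assuming the default 128 MB split size and FileInputFormat's rule that the tail of a file is only split off while the remainder exceeds 1.1x the split size (this is a simplified illustration, not the real Hadoop source):

// SplitSketch.java -- illustrative only
public class SplitSketch {
    public static void main(String[] args) {
        long splitSize = 128;            // MB; by default equal to the HDFS block size
        double slop = 1.1;               // keep splitting only while remaining > 1.1 x splitSize
        long[] files = {128, 200, 100};  // the three files from the example above, in MB
        for (long size : files) {
            long offset = 0;
            long remaining = size;
            while (remaining / (double) splitSize > slop) {
                System.out.println(offset + "~" + (offset + splitSize) + " MB");
                offset += splitSize;
                remaining -= splitSize;
            }
            if (remaining > 0) {
                System.out.println(offset + "~" + (offset + remaining) + " MB");
            }
        }
        // prints 0~128, 0~128, 128~200, 0~100 -> four splits, hence four MapTasks
    }
}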

4. Example

  • Note: Hadoop does not use Java's data types directly; it ships its own serializable types.
  • Apart from String, which maps to Text, the other Java types map to a Hadoop type with a Writable suffix, e.g. int maps to IntWritable.
  • A MapReduce program has three main parts: Mapper.java, Reducer.java and Driver.java.
  • Sometimes custom pieces are added, such as:
    • a custom InputFormat, defining how data is read;
    • a serializable bean, a custom data type that can replace the LongWritable, Text, Text, IntWritable used below;
    • a custom partitioner (the default is hash partitioning);
    • a Combiner;
    • a custom OutputFormat, defining how data is written out.
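
A quick sketch of converting between plain Java values and the corresponding Writables (these are standard Hadoop classes):

// wrapping and unwrapping Hadoop's Writable types
IntWritable one = new IntWritable(1);        // int  -> IntWritable
int n = one.get();                           // IntWritable -> int
Text word = new Text("hello");               // String -> Text
String s = word.toString();                  // Text -> String
LongWritable offset = new LongWritable(0L);  // long -> LongWritable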
// WcMapper.java
public class WcMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    /**
     * LongWritable, Text, Text, IntWritable are the types of <k1, v1> and <k2, v2>.
     * LongWritable is the byte offset of the line from the start of the file.
     * Text is Hadoop's String.
     * Context is an abstract class; see the Hadoop docs for details.
     */
    private Text word = new Text();
    private IntWritable one = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Get one line of data
        String line = value.toString();
        // Split on spaces
        String[] words = line.split(" ");
        // Iterate and map each word to (word, 1)
        for (String w : words) {
            this.word.set(w);
            context.write(this.word, this.one);
        }
    }
}

// WcReducer.java
public class WcReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    /**
     * The type parameters are <k2, v2> and <k3, v3>.
     * The Reducer's input types are the Mapper's output types.
     */
    private IntWritable total = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // values holds all the values that share the same key
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        // Copy the sum into the reusable IntWritable
        total.set(sum);
        // context.write expects the Writable types declared above
        context.write(key, total);
    }
}


// WcDriver.java
public class WcDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 1. Get a Job instance
        Job job = Job.getInstance(new Configuration());
        // 2. Set the jar/class path
        job.setJarByClass(WcDriver.class);
        // 3. Set the Mapper and Reducer
        job.setMapperClass(WcMapper.class);
        job.setReducerClass(WcReducer.class);
        // 4. Set the Mapper and Reducer output types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // job.setNumReduceTasks(10); // sets the number of ReduceTasks
        // 5. Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // 6. Submit the Job
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}

5. Custom InputFormat

1) Built-in InputFormats

  • InputFormats that already exist:
InputFormat      Split method                     RecordReader
Text             FileInputFormat's getSplits      LineRecordReader
KeyValue         FileInputFormat's getSplits      KeyValueLineRecordReader
NLine            custom: one split per N lines    LineRecordReader
CombineText      custom                           CombineFileRecordReader
FixedLength      FileInputFormat's getSplits      FixedLengthRecordReader
SequenceFile     FileInputFormat's getSplits      SequenceFileRecordReader
  • TextInputFormat uses each line's byte offset in the file as the key and the line's content as the value.
  • KeyValueTextInputFormat treats each line as one record, split by a separator into key and value.
  • NLineInputFormat: the InputSplit handled by each map task is not divided by block but by N lines, so the number of splits differs from FileInputFormat's.
  • CombineFileRecordReader is similar to LineRecordReader, except that it reads across files (useful for many small files).
  • SequenceFileRecordReader is used when several MapReduce jobs are chained, one job's binary output feeding the next.
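
A hedged sketch of how these are selected in a driver; the separator property, line count and split size shown are illustrative values:

// choosing a non-default InputFormat in the driver
job.setInputFormatClass(KeyValueTextInputFormat.class);
// separator between key and value for KeyValueTextInputFormat (tab is the default)
job.getConfiguration().set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", "\t");

// or: one split per 3 lines
job.setInputFormatClass(NLineInputFormat.class);
NLineInputFormat.setNumLinesPerSplit(job, 3);

// or: pack many small files into splits of at most 4 MB
job.setInputFormatClass(CombineTextInputFormat.class);
CombineTextInputFormat.setMaxInputSplitSize(job, 4 * 1024 * 1024);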

2) A Custom InputFormat

/**
 * Merge many small files into one SequenceFile.
 * The custom InputFormat: WholeFileInputFormat.java
 */
public class WholeFileInputFormat extends FileInputFormat<Text, BytesWritable> {
    // The generic types Text and BytesWritable are the key and value types
    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        return false; // never split a file
    }

    @Override
    public RecordReader<Text, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) {
        return new WholeFileRecordReader();
    }
}

// The custom RecordReader: WholeFileRecordReader.java
public class WholeFileRecordReader extends RecordReader<Text, BytesWritable> {
    private boolean notRead = true;
    private Text key = new Text();
    private BytesWritable value = new BytesWritable();
    private FSDataInputStream inputStream;
    private FileSplit fs;

    /**
     * Initialization; the framework calls this once.
     */
    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        // Open an input stream on the file
        fs = (FileSplit) split; // the split is a file split here
        Path path = fs.getPath(); // the file's path
        FileSystem fileSystem = path.getFileSystem(context.getConfiguration());
        inputStream = fileSystem.open(path); // stream is now open
    }

    /**
     * Read the next KV pair.
     * @return true if a pair was read
     */
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (notRead) { // actually read the file
            // the key is the file path
            key.set(fs.getPath().toString());
            // the value is the whole file content
            byte[] buf = new byte[(int) fs.getLength()]; // buffer as long as the file
            IOUtils.readFully(inputStream, buf, 0, buf.length);
            value.set(buf, 0, buf.length);

            notRead = false;
            return true;
        } else { // the file has already been read
            return false;
        }
    }

    /**
     * The key read so far.
     * @return the current key
     */
    @Override
    public Text getCurrentKey() throws IOException, InterruptedException {
        return key;
    }

    /**
     * The value read so far.
     */
    @Override
    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    // Current reading progress
    @Override
    public float getProgress() throws IOException, InterruptedException {
        return notRead ? 0 : 1; // either not read yet or fully read
    }

    // Release resources
    @Override
    public void close() throws IOException {
        IOUtils.closeStream(inputStream); // close the stream
    }
}

/**
 * WholeFileDriver.java
 * The Mapper and Reducer have no extra work to do here, so they are not set;
 * if you need them, just add them.
 */
public class WholeFileDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 1. Get a Job instance
        Job job = Job.getInstance(new Configuration());
        // 2. Set the jar/class path
        job.setJarByClass(WholeFileDriver.class);
        /* 3. Set the Mapper and Reducer
         * job.setMapperClass(Mapper.class);
         * job.setReducerClass(Reducer.class);
         */

        // 4. Set the Mapper and Reducer output types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(BytesWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(BytesWritable.class);

        // 5. Set the InputFormat and OutputFormat
        job.setInputFormatClass(WholeFileInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        // 6. Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // 7. Submit the Job
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}

6. Custom Serialization

public class FlowBean implements Writable { // a custom, hand-written serializable type
    private long upFlow;
    private long downFlow;
    private long sumFlow;

    // An empty constructor so the framework can create instances via reflection
    public FlowBean() {
    }

    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }

    public void set(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    // Serialization: out is the data sink provided by the framework; write the three fields
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    // Deserialization: in is the data source provided by the framework; fill in the fields.
    // Note: read the fields in the same order they were written.
    @Override
    public void readFields(DataInput in) throws IOException {
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }
}
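
A minimal sketch of using the bean as the map output value, assuming a hypothetical phone-traffic input where each line is "phone \t upFlow \t downFlow" (FlowMapper and the field positions are illustrative, not from the original notes):

// FlowMapper.java (illustrative)
public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
    private Text phone = new Text();
    private FlowBean flow = new FlowBean();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] fields = value.toString().split("\t");
        phone.set(fields[0]);                                            // phone number as the key
        flow.set(Long.parseLong(fields[1]), Long.parseLong(fields[2]));  // upFlow, downFlow
        context.write(phone, flow);                                      // FlowBean is serialized via write()
    }
}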

7. Custom Partitioner

// Default partitioning: determines the partition number
(key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;

// Custom partitioner: MyPartitioner.java
public class MyPartitioner extends Partitioner<Text, FlowBean> {
    @Override
    public int getPartition(Text text, FlowBean flowBean, int numPartitions) {
        String phone = text.toString();
        switch (phone.substring(0, 3)) {
            case "136":
                return 0;
            case "137":
                return 1;
            case "138":
                return 2;
            default:
                return 3;
        }
    }
}

// In the driver, point the job at the Partitioner class
job.setPartitionerClass(MyPartitioner.class);
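
One detail worth making explicit (it is only hinted at in the driver comment earlier): the number of ReduceTasks should match the number of partitions returned above; with more than one but fewer reducers than partition numbers the job typically fails at runtime, while extra reducers just produce empty output files:

// 4 partition numbers (0..3) are returned above -> 4 ReduceTasks
job.setNumReduceTasks(4);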

8. Sorting, Combiner and GroupingComparator

  • Sorting is one of the most important operations in the MapReduce framework, and it is mandatory: map output is always sorted by key.
// FlowBean.java: make the bean a WritableComparable so it can be used as a sort key
public class FlowBean implements WritableComparable<FlowBean> {
    // (fields, write() and readFields() as in section 6)
    @Override
    public int compareTo(FlowBean o) {
        return Long.compare(o.sumFlow, this.sumFlow); // descending by total flow
    }
}
  • The Combiner is an MR component besides the Mapper and Reducer (see the sketch at the end of this section).

    • It pre-aggregates each MapTask's local results and runs inside the MapTask.
    • The precondition for using it is that it must not affect the business logic (summing is safe, averaging is not).
    • The Combiner runs on data that has already been sorted (the spill data).
  • GroupingComparator: groups the data on the Reduce side by one or several fields.

public class OrderBean implements WritableComparable<OrderBean> {
    private String orderId;
    private String productId;
    private double price;

    @Override
    public String toString() {
        return orderId + "\t" + productId + "\t" + price;
    }

    public String getOrderId() {
        return orderId;
    }
    // other getters/setters omitted

    @Override // secondary sort: by orderId, then by price descending
    public int compareTo(OrderBean o) {
        int compare = this.orderId.compareTo(o.orderId);
        if (compare == 0) {
            return Double.compare(o.price, this.price);
        } else {
            return compare;
        }
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(orderId);
        out.writeUTF(productId);
        out.writeDouble(price);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.orderId = in.readUTF();
        this.productId = in.readUTF();
        this.price = in.readDouble();
    }
}

// NullWritable is the empty value type
public class OrderMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable> {
    private OrderBean orderBean = new OrderBean();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // parsing omitted: fill orderBean from the line, then context.write(orderBean, NullWritable.get());
    }
}

// Grouping comparator: group only by orderId
public class OrderComparator extends WritableComparator {
    protected OrderComparator() {
        super(OrderBean.class, true); // true: create instances so the keys can be deserialized
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        OrderBean oa = (OrderBean) a;
        OrderBean ob = (OrderBean) b;
        return oa.getOrderId().compareTo(ob.getOrderId());
    }
}

// Reducer: the first record of each group is the highest-priced order for that orderId
public class OrderReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable> {
    @Override
    protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        context.write(key, NullWritable.get());
    }
}

// Add these in the driver
job.setMapOutputValueClass(NullWritable.class);
job.setGroupingComparatorClass(OrderComparator.class);
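
For the Combiner bullet above, a minimal sketch in the WordCount setting: because summing per-word counts is associative, the existing Reducer can be reused as the Combiner (a dedicated Combiner class would simply be another Reducer whose input and output types match the map output):

// in the WordCount driver: pre-aggregate each MapTask's local output
job.setCombinerClass(WcReducer.class);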

9. Custom OutputFormat

  • Inside the Reducer:
// here the key is an object that carries several fields
protected void reduce(xxxx key, Iterable<xxxx> values, Context context);
  • OutputFormat: turns KV pairs into files.

  • TextOutputFormat: writes the output as plain text.

  • SequenceFileOutputFormat: writes binary output, which is convenient as the input of a follow-up MapReduce job.
  • To write a custom OutputFormat:
    1. extend FileOutputFormat;
    2. override the RecordWriter, specifically the write() method that decides how the data is written out.
public class MyOutputFormat extends FileOutputFormat<LongWritable, Text> {
    @Override
    public RecordWriter<LongWritable, Text> getRecordWriter(TaskAttemptContext job) throws IOException {
        MyRecordWriter myRecordWriter = new MyRecordWriter();
        myRecordWriter.initialize(job);
        return myRecordWriter;
    }
}

public class MyRecordWriter extends RecordWriter<LongWritable, Text> {
    private FSDataOutputStream user;
    private FSDataOutputStream other;
    // private FileOutputStream user;
    // private FileOutputStream other;

    public void initialize(TaskAttemptContext job) throws IOException {
        // Custom initialization: open one output file per category
        String outdir = job.getConfiguration().get(FileOutputFormat.OUTDIR);
        // Get the file system
        FileSystem fileSystem = FileSystem.get(job.getConfiguration());
        user = fileSystem.create(new Path(outdir + "/user.log"));
        other = fileSystem.create(new Path(outdir + "/other.log"));
        // user = new FileOutputStream("d:\\user.log");
        // other = new FileOutputStream("d:\\other.log");
    }

    @Override
    public void write(LongWritable key, Text value) throws IOException {
        // Write the KV pair out; called once per KV pair
        String out = value.toString() + "\n";
        if (out.contains("user")) {
            user.write(out.getBytes());
        } else {
            other.write(out.getBytes());
        }
    }

    @Override
    public void close(TaskAttemptContext context) throws IOException {
        // Release resources
        IOUtils.closeStream(user);
        IOUtils.closeStream(other);
    }
}

// Add this in the driver
job.setOutputFormatClass(MyOutputFormat.class);

10. Join

  • Reduce-side join: the join itself is done with MapReduce, in the Reduce phase (a driver sketch follows the code below).
// RJMapper.java
public class RJMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable> {
    private OrderBean orderBean = new OrderBean();
    private String filename;

    @Override
    protected void setup(Context context) {
        // setup runs once per MapTask, then map for each record, and finally cleanup
        FileSplit fs = (FileSplit) context.getInputSplit();
        filename = fs.getPath().getName();
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] fields = value.toString().split("\t");
        if (filename.equals("order.txt")) {
            // the record comes from the order file
            orderBean.setId(fields[0]);
            orderBean.setPid(fields[1]);
            orderBean.setAmount(Integer.parseInt(fields[2]));
            orderBean.setPname("");
            // setPname must still be called, even with an empty string
        } else {
            // the record comes from the product file
            orderBean.setPid(fields[0]);
            orderBean.setPname(fields[1]);
            orderBean.setId("");
            orderBean.setAmount(0);
        }
        context.write(orderBean, NullWritable.get());
    }
}

// RJComparator.java
public class RJComparator extends WritableComparator {
    protected RJComparator() {
        super(OrderBean.class, true); // true: create instances so the keys can be deserialized
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        OrderBean oa = (OrderBean) a;
        OrderBean ob = (OrderBean) b;
        return oa.getPid().compareTo(ob.getPid()); // group by pid only
    }
}

// RJReducer.java
public class RJReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable> {
    @Override
    protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        // Because of the sort order, the first record in each group is the product record (non-empty pname)
        Iterator<NullWritable> iterator = values.iterator();
        iterator.next();
        String pname = key.getPname();
        // The remaining records are orders: fill in the product name and write them out
        while (iterator.hasNext()) {
            iterator.next();
            key.setPname(pname);
            context.write(key, NullWritable.get());
        }
    }
}
// OrderBean.java (in a new package): the serializable join key
public class OrderBean implements WritableComparable<OrderBean> {
    private String id;
    private String pid;
    private int amount;
    private String pname;
    // getters and setters omitted

    @Override
    public String toString() {
        return id + "\t" + pname + "\t" + amount;
    }

    @Override
    public int compareTo(OrderBean o) {
        // sort rule: by pid, then by pname descending so the product record comes first in each group
        int compare = this.pid.compareTo(o.pid);
        if (compare == 0) {
            return o.pname.compareTo(this.pname);
        } else {
            return compare;
        }
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(id);
        out.writeUTF(pid);
        out.writeInt(amount);
        out.writeUTF(pname);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.id = in.readUTF();
        this.pid = in.readUTF();
        this.amount = in.readInt();
        this.pname = in.readUTF();
    }
}
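
The original notes stop at the bean; here is a hedged sketch of the driver wiring for this reduce-side join (RJDriver is a made-up name; the other class names follow the snippets above, and the paths are placeholders):

// RJDriver.java (illustrative)
public class RJDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(RJDriver.class);
        job.setMapperClass(RJMapper.class);
        job.setReducerClass(RJReducer.class);
        job.setMapOutputKeyClass(OrderBean.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(OrderBean.class);
        job.setOutputValueClass(NullWritable.class);
        // group every record with the same pid into one reduce() call
        job.setGroupingComparatorClass(RJComparator.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));  // directory containing order.txt and pd.txt
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}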