hadoop-MapReduce
一、MapReduce
MapReduce是一个分布式运算程序的编程框架
由于MR是个框架,所以要注重逻辑,而不太需要关心程序的运行
- 优点:高容错性、超大量数据
- 缺点:不适用实时计算、流式计算、有向图计算,就是慢
二、核心思想
- MapReduce分成Map和Reduce两个部分
- Map指映射,将数据映射成KV值
<key, value>
- Reduce指化简,将Map阶段的KV值合并成想要的数据
三、工作流程
1、简化流程
1 | MapReudce的简化流程:Input -> Mapper -> Reducer -> Output |
- 上述的流程都是写成代码,并封装成一个jar包,提交给Yarn ResourceManager
- 这个jar包里面包括:Application Master程序、启动Application Master的命令、用户程序等
2、详细流程
- 假设现在有三个DataNode,每个DataNode里面的文件大小为
128M、200M、100M
MR程序提交到客户端所在的节点,YarnRunner向ResourceManager申请一个Application
RM将该Application的资源路径和作业id返回给YarnRunner
YarnRunner将运行job所需资源提交到HDFS上
程序资源提交完毕后,申请运行MRAppmaster
RM将用户的请求初始化成一个Task
第一个DataNode里面的NodeManager领取到Task
该NodeManager创建容器Container,并产生MRAppmaster
Container从HDFS上拷贝资源到本地,拷贝的是啥东西???
MRAppmaster向RM申请运行MapTask资源
RM将运行MapTask任务分配给另外两个NodeManager,另两个NodeManager分别领取任务并创建容器
MR向两个接收到任务的NodeManager发送程序启动脚本,这两个NodeManager分别启动MapTask,MapTask对数据分区排序:
Mapper阶段
由InputFormat -> FileInputFormat -> TextInputFormat,通过getSplits方法获得Split数组
- 将三个文件切分成:
0~128M
、0~128M
和128~200M
、0~100M
,共四个切片。 - 有多少个切片,就有多少个Map Task。
- 将三个文件切分成:
- 用getRecordReader方法对Split做处理,将split解析成
<k1, v1>
- Mapper将
<k1, v1>
映射成<k2, v2>
,主要是setup(), map(), cleanup(), run()
完成的 - KV值写给outputCollector,环形缓冲区,进行分区排序
- Shuffle阶段
- Mapper出来的数据(KV值)进入环形缓冲区,并获得分区号
p
(逻辑分割,没有进行物理分割):(key,value,p)
- 环形缓冲区溢出,
(key, value, p)
分区排序,这里是二次排序,排序的是索引:先排分区p
,分区内按照key
排序- 可以加Combiner,即分区排序、落盘之前合并
(key, value)
,也可以不加
- 可以加Combiner,即分区排序、落盘之前合并
- 如果环形缓冲区,多次溢出,重复2~3步,再对多次溢出并处理后的数据归并排序、Combiner
- 落盘,最终文件就是Map Task的输出内容
MrAppMaster等待所有MapTask运行完毕后,向RM申请容器,运行ReduceTask
ReduceTask向MapTask获取相应分区的数据
- 假设有n个Map Task(即四个),每个Map Task分了m个区
- 启动Reduce Task,在哪台机器启动呢????,数量为m个,即多少个分区就多少个Reduce Task
- 每个Reduce Task接受n个Map Task相应分区的数据
- 每个Reduce Task对n份数据归并排序,按照key分组,可自定义分组
- 数据输出到Reduce并转成
<k3, v3>
,通过OutputFormat输出
程序运行完毕后,MR会向RM申请注销自己
- 可以看下源码,以后再深究吧,现在太过复杂:Hadoop中Yarnrunner里面submit Job以及AM生成 至Job处理过程源码解析
四、例子
- 注意:hadoop不能直接使用Java的数据类型,hadoop有自带。
- 除了
String
对应Text
,其他Java变量都在后面加个Writable
,如Int
对应Intwritable
- MapReduce程序一共三大件:
Mapper.java
、Reducer.java
和Driver.java
- 有时候会添加一些自定义的东西,如:
- 自定义
InputFormat
,自定义如何读取数据 - 序列化
Bean
,自定义的变量类型,替换掉下面的LongWritable, Text, Text, IntWritable
- 自定义分区
partition
,默认的是哈希分区 - Combiner,
- 自定义
OutputFormat
,自定义如何输出数据
- 自定义
1 | // WcMapper.java |
五、自定义InputFormat
1、InputFormat
- 这里已有的InputFormat
InputFormat | 切片方法 | KV方法 |
---|---|---|
Text | FileInputFormat的方法 | LineRecordReader |
KeyValue | FileInputFormat的方法 | KeyValueLineRecordReader |
NLine | 自定义,N行一个切片 | LineRecordReader |
CombineText | 自定义 | CombineFileRecordReader |
FixedLength | FileInputFormat的方法 | FixedLengthLineRecordReader |
SequenceFile | FileInputFormat的方法 | SequenceFileRecordReader |
- 这个TextInputFormat将每一行的行号作为key,每行内容作为values
- KeyValueInputFormat每一行为一条记录,被分隔符分成key、values
- NLineInputFormat,每个map进程处理的InputSplit不按照Block块划分,按照N来划分,即切片数量不同于FileInputFormat。
- CombineFileRecordReader和LineRecordReader相似,前者是跨文件的
- SequenceFileRecordReader是用于多个MapReduce成序列的情况
2、自定义InputFormat
1 | /** |
六、自定义序列化
1 | public class FlowBean implements Writable{ // 这个就是自己写的定义 |
七、自定义分区
- Partition分区:分区号指KV值去哪个Reduce Task
- MapReduce原理之ReduceTask工作机制
1 | // 默认分区 |
八、自定义Combiner
- 排序:MapReduce框架的最重要操作之一,强制性的
1 | // writableComparable.java |
Combiner是MR中除Mapper、Reducer之外的一个组件
- 对map task局部的结果汇总,积在Map Task中运行
- 这个应用的前提不能影响业务的逻辑
- Combiner的输入数据要有序
GroupingComparator分组,在Reduce截断的数据根据某一个或几个字段分组
1 | public class OrderBean implements WritableComparable<OrderBean>{ |
九、自定义OutputFormat
- Reducer里面:
1 | // 这里的key是一个对象,里面有很多值 |
OutputFormat:KV值到文件
TextOutputFormat:将输出值写成文本
- SequenceFileOutputFormat:输出二进制,有利于后续MapReduce任务的输入
- 自定义OutputFormat
- 继承FileOutputFormat
- 改写RecordWriter,具体改写输出数据的方式write()
1 | public class MyOutputFormat extends FileOutputFormat<LongWritable, Text>{ |
十、Join
- ReduceJoin,通过MapReduce处理数据,完成Join工作
1 | //RJMapper.java |