Hadoop
Hadoop 是一个提供分布式存储和分布式计算的框架,为大量数据的存储和计算提供了一个可靠的平台支持。现在 Hadoop 和其它相关的衍生产品构成了大数据生态系统。
两大神兽HDFS
HDFS 是 Hadoop 提供的一个分布式存储的文件系统,基本思想就是分而存之,让多台计算机分别存储一个大文件的一部分,这样就解决了大文件无法在单台计算机上存储和无法在单台计算机上快速计算的问题。
整体架构与磁盘一样,HDFS 也有块的概念,将一个大文件进行拆分,每一部分就称为一个块 (block)。在 Hadoop 2.x 中,默认一个块大小为 128M。当有一个 1M 的数据存放到 HDFS 中时并不会占用一个 128M 的数据空间,而是占有 1M,这是和磁盘块不同的地方。
一个块大小为 128M 主要原因是为了让寻道时间小于传输时间的 1%,假设传输速率为 100m/s,寻道时间为 10ms,则 (10ms / 1000) / (1%) * 100m/s = 100M。
HDFS 默认情况下,会为每个数据块存储 3 份,并存储在不同的计算机上,这就是其备份机制。HDFS 的可靠性就体现在这里。
这些数据块分别由多台不同的计算机存储,而记录这些块在那台机器上存储、块大小是多少、属于哪个文件的信息称为元数据,也可以称为是描述数据的数据。
元数据由一个单独的进程来维护,这个进程称为 NameNode。一般会由一台单独的计算机作为 NameNode 节点,也就是在那台计算机上启动一个 NameNode 进程。
管理这些数据块的工作也是由一个单独的进程来完成,这个进程为 DataNode。数据块则是由多台计算机协同存储的,在每台计算机上都会启动一个 DataNode 进程,并时刻与 NameNode 进行通信。
-
NameNode
NameNode 用于维护着元数据信息,为了提高查找效率,NameNode 会将元数据信息存放在内存中。所以,NameNode 节点必须是大内存的,这也是 HDFS 的一个瓶颈。
内存是不可靠的,所以元数据信息也会持久化到磁盘一份,这个稍后再说。
由于每个数据块都会产生一条元数据信息,如果 HDFS 中存放大量小文件,就会产生大量的元数据信息,这样 NameNode 的内存就会很快就会撑爆。(最简单的解决办法就是将文件进行合并,然后上传到 HDFS 中)
-
DataNode
DataNode 是一个维护其所在计算机的数据块的进程,主要工作可以分为读和写两部分,也就是检索和存储功能。同时 DataNode 也会定时向 NameNode 报告其健康状况和其所维护的数据块列表。
NameNode 是 HDFS 的命门,如果 NameNode 挂掉之后 HDFS 就彻底的无法提供服务了,并且存储在内存中的元数据信息也会丢失,那样就永远无法提供服务了。所以,为防止元数据丢失问题,HDFS 有了就 SecondaryNameNode 来帮助 NameNode 进行元数据的持久化,有高可用机制来进行 NameNode 的主备切换工作。
-
SecondaryNameNode
SecondaryNameNode 可以说是 NameNode 的辅助节点,在一定程度上也可以起到备份节点的作用。在 NameNode 中会有一个名为 FSimage 的旧的元数据持久化文件和一个名为 Edits Log 的预写日志。
SecondaryNameNode 会定期询问 NameNode 是否需要将 FSimage 和 Edits Log 进行合并 (称为检查点),通过设置间隔时间和 Edits Log 的文件大小阈值来限定是否需要合并。
SecondaryNameNode 从 NameNode 拉取过来 FSimage 和 Edits Log 后,会根据预写日志进行重演,然后合并到 FSimage 中,最后将合并后的 FSimage 发给 NameNode,并且自身也会存储一份。
当 NameNode 重启的时候,就会读取 FSimage 中的持久化文件进行元数据的恢复。当 NameNode 节点磁盘也坏的时候,SecondaryNameNode 保存的 FSimage 也可以一定程度上进行元数据的恢复,但是会丢失一部分数据数据。
基本流程
在介绍完 HDFS 基本组成后,我们再看看 HDFS 读写操作流程:
-
写操作
当客户端上传文件的时候,会先向 NameNode 发送一个写请求,NameNode 会先对身份和时候已经存在这个文件进行一个校验。校验通过后,就会给客户端一个确认消息,告诉它可以上传。
客户端收到确认消息后,就会对文件进行分块,分块操作是在客户端进行的。分块完成后,就会向 NameNode 询问第一个数据块的存放地址,NameNode 会根据动态感知机制,为这个数据块找到一个合适的存储位置,然后将 DataNode 的地址返回给客户端。
客户端在收到上传地址后,就会与 DataNode 进行通信并上传,由于有备份机制, DataNode 在收到数据后,会发送给备份 DataNode。
数据块传输完成后,就会给客户端发送一个确认消息。然后客户端告诉 NameNode 上传完成,再向 NameNode 发送上传第二个数据块的请求,以此类推。
由此可见 HDFS 的写操作是串行进行的。
写操作流程 -
读操作
客户端会向 NameNode 发送一个读操作请求,NameNode 会返回最近通信的存储请求文件的 DataNode 的节点地址。
客户端收到地址后,就会从 DataNode 中并行读取,然后将读取到的数据在客户端进行合并。
如果,读取失败后,客户端就会想 NameNode 重新发送请求,NameNode 会从其它备份节点中选择一个,返回给客户端。
读操作流程
HDFS 就简单说到这里。
MapReduce
MapReduce 是 Hadoop 提供的一个分布式计算框架,基本思想就是分而算之和移动程序不移动数据,也就是针对每个数据块进行运算 (MapTask),最后将每个节点的运算结果进行汇总 (ReduceTask)。
MapReduce 流程MapReduce 的工作可以基本分为读取、Shuffle和输出三步:
-
分片
分片是 MR 程序对输入文件分割的一个文件集的引用。不同于 HDFS 中的数据块,一个分片就代表着一个 MapTask 输入数据的引用。
MapReduce 程序会为每个分片都会启动一个 MapTask 进程,让其专门处理这个分片所引用的数据。
默认情况下,一个数据块就对应一个分片,这样主要是为了避免数据在网络上传输,只需要将 MapTask 程序发送到数据块所在的节点就行了,这就是数据不动程序动。
这样就会产生数据块过大和数据块过小两种情况:
- 如果数据块过小(大量小文件),这样每个数据块作为一个分片,就会启动大量的 MapTask。而每个 MapTask 都是一个进程,这样就把大量的时间花费在了创建线程销毁线程上了。MapReduce 提供了 CombineFileInputFormat 类,将所有数据块作为一个分片,也就是只启动一个 MapTask。
- 如果数据块过大,那就降低了并行度,无法发挥分布式计算的优势。可以根据具体的业务,将数据块大小调整为合适的尺寸。
-
输入 / InputFormat
InputFormat 用来为 MR 程序提供计算分片和获取对应分片的 Reader 服务:
public abstract class InputFormat<K, V> { // 计算分片 public abstract List<InputSplit> getSplits(JobContext context); // 根据 split 获取数据 Reader public abstract RecordReader<K,V> createRecordReader(InputSplit split, TaskAttemptContext context); }
getSplits() 方法是由客户端进行调用的,然后将分片信息存放到 HDFS 中。ApplicationMaster 会从 HDFS 中进行拉取,并根据分片信息,选择最优的位置在 Worker 上启动 MapTask。
-
MapTask
我们写 MR 程序的时候,Map 端都会继承 Mapper:
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> { public abstract class Context implements MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> { } // 初始化 protected void setup(Context context) { } // 我们一般都会重写这个方法 // 默认是,将 key-value 原样输出 protected void map(KEYIN key, VALUEIN value, Context context){ context.write((KEYOUT) key, (VALUEOUT) value); } // 清理工作 protected void cleanup(Context context) { } // Map 任务启动的时候,就会调用这个方法 public void run(Context context) throws IOException, InterruptedException { setup(context); try { // 内部就是调用 RecordReader 的 nextKeyValue() 方法 while (context.nextKeyValue()) { // RecordReader 的 getCurrentKey() 和 getCurrentValue() 方法 map(context.getCurrentKey(), context.getCurrentValue(), context); } } finally { cleanup(context); } } }
这里只是简单的看一下工作流程,具体的细节在剖析源码的时候再看。
Mapper 将从 RecordReader 中获取到的 key-value 交给 map() 方法去做具体的运算工作,最后我们会调用 context.write() 方法将处理后的 key-value 写出。
这样就进行到了 Map 端的 Shuffle 操作。
-
Shuffle
Shuffle 操作可以分为 Map 端和 Reduce 端两部分,总的来说,Shuffle 就做了分区、聚合和排序三件事。
Map 端调用完 context.write() 方法后,就会通过 RecordWriter 将 key-value 按 key 进行分区,并写入到环形缓冲区中,并会在环形缓冲区中进行一次快排操作。
缓冲区大小默认为 100M,阈值为 80%,也就是缓冲区写满 80% 的时候就会发生溢写操作,将缓冲区的数据溢写到磁盘,每次溢写都会产生一个新的文件。可以将缓冲区大小设的更大一些,尽量避免溢写的发生。
当 Map 端将数据写完后,会将溢写文件进行合并,然后按在进行一次归并排序,这样就产生了分区且排序后的 key-value,来等待 Reduce 端的拉取。
如果设置了 Combiner 的话,就会在溢写的时候执行和最后合并数据的时候执行,并不是只执行一次。
分区数是由 ReduceTasK 的数量来决定的,默认使用 HashPartiton 进行分区操作,当然,也可以根据业务需求进行自定义分区:
// 自定义 partition public class PhonePartition extends Partitioner<Text, Text> { @Override public int getPartition(Text key, Text value, int numReduceTask) { String phoneNumber = key.toString(); if(phoneNumber.startsWith("137")){ return 0; } if(phoneNumber.startsWith("138")){ return 1; } if(phoneNumber.startsWith("139")){ return 2; } if(phoneNumber.startsWith("135")){ return 3; } if(phoneNumber.startsWith("136")){ return 4; } return 5; } }
排序操作默认使用的是字典排序,也可以自定义排序器:
// 方式一 public class MyPair implements WritableComparable<SortPair> { private String first; private int second; // ... @Override public int compareTo(SortPair sortPair) { String anoFirst = sortPair.getFirst(); int firstComp = first.compareTo(anoFirst); if(firstComp != 0){ return firstComp; } else { int anoSecond = sortPair.getSecond(); return second - anoSecond; } } // ... } // 方式二 public class MyPartitioner extends Partitioner<Text, Text> { @Override public int getPartition(Text text, Text text2, int i) { return text.compareTo(text2); } } // Job job.setPartitionerClass(MyPartitioner.class);
Reduce 端 Shuffle 就是对从 Map 端拉取过来的数据进行一次聚合操作,将相同 key 的 value 方法一起,并暴露给 ReduceTask 一个 value 迭代器。
-
输出
输出就比较简单了,将数据输出到指定介质中。
YARN
Yarn 是一个独立的资源调度框架,由 ResourceManger 和 NodeManager 两部分构成:
-
ResourceManger
ResourceManger 用来处理用户提交的任务请求,并维护 NodeManager 的节点信息。
-
NodeManager
NodeManager 主要用于资源管理、任务管理和 Container (容器) 管理、
-
ApplicationMaster
ApplicationMaster 是 ResourceManger 在 NodeManager 上启动个一个负责管理特定任务的进程,ResourceManger 只负责分派任务,不复制管理任务,这个工作就是由 ApplicationMaster 来完成的。
-
Container
资源管理单位,ApplicationMaster 向 ResourceManager 请求任务所需要的资源的时候,ResourceManager 分配给它的资源就可以理解为一个 Container。
网友评论