数据移动的关键要素
将大量数据移入和移出Hadoop面临很多挑战,包括数据一致性和资源对数据源和目标的影响。然而,在深入研究这些技术之前,我们需要讨论在处理数据移动时应该注意的因素。
幂等
在编程中,一个幂等操作的特点是其任意多次执行所产生的影响均与一次执行的影响相同。换句话说,幂等操作无论执行多少次都会产生相同的结果。在关系数据库中,插入通常不是幂等的,因为多次执行不会产生相同的结果数据库状态。或者,更新通常是幂等的,因为它们将产生相同的最终结果。
无论何时写入数据,都应该考虑幂等性,Hadoop中的数据入口和出口没有很大区别。分布式日志收集框架如何处理数据重传?如何在多个任务并行插入数据库的MapReduce作业中确保幂等行为?
聚合
数据聚合过程组合了多个数据元素。在数据入口的上下文中,这将大量小文件移动到HDFS。在实际操作中,我们可能会面临NameNode内存以及MapReduce执行时间慢等问题,将文件或数据聚合在一起可以缓解此类问题,这是一个需要考虑的功能。
数据格式转换
数据格式转换将一种数据转换为另一种格式的操作。通常,源数据格式不适合在MapReduce等工具中进行处理。例如,如果源数据采用多行XML或JSON格式,则可能需要考虑预处理步骤。这会将数据转换为可以拆分的形式,例如每行一个JSON或XML元素,或者将其转换为Avro等格式。
数据压缩
数据压缩不仅有助于减少静态数据的占用空间,而且在读取和写入数据时也具有I/O优势。
可用性和可恢复性
可恢复性允许入口或出口工具在操作失败时重试。由于任何数据源,接收器或Hadoop本身都不可能100%可用,因此在发生故障时可重试非常重要。
可靠的数据传输和数据验证
在数据传输中,检查正确性的方法是验证数据在传输过程中是否发生损坏。当使用异构系统(如Hadoop数据入口和出口)时,数据通过不同主机,网络和协议传输只会增加数据传输过程中出现问题的可能性。检查原始数据(如存储设备)正确性的常用方法是循环冗余校验(CRC),这是HDFS内部用于维护块级完整性的常用方法。
此外,由于生成数据的软件存在错误,源数据本身可能存在问题。在入口时执行数据验证允许进行一次性检查,而不是在发生问题时处理数据的所有下游消费者,强迫这些消费者必须更新以处理数据中的错误。
资源消耗和性能
资源消耗和性能分别是系统资源利用率和系统效率的度量。入口和出口工具通常不会对系统施加大量负载(资源消耗),除非有非常可观的数据量。对于性能,要考虑的问题包括工具是否并行执行操作,如果是,提供了什么机制来调整并行度。如果数据源是生产数据库并且正在使用MapReduce提取该数据,请不要使用大量并发map任务来导入数据。
监控
监控确保功能在自动化系统中按预期执行。对于数据入口和出口,监控分为两部分:确保入口和出口中涉及的进程存活,并验证源和目标数据是否按预期生成。监控还应包括验证正在移动的数据量是否达到预期水平; 数据中意外的下降或高电流将提醒潜在的系统问题或软件错误。
推测执行
MapReduce具有一个称为推测(Speculative)执行的功能,可以在作业结束时为仍在执行的任务启动备份,这有助于防止缓慢的硬件影响作业执行时间。但是,这种做法也可能有问题,如果使用map任务执行插入关系数据库,你应该知道可以有两个并行进程插入相同的数据。
补充:推测执行(Speculative Execution)是指在集群环境下运行MapReduce,可能是程序Bug,负载不均或者其他的一些问题,导致在一个JOB下的多个TASK速度不一致,比如有的任务已经完成,有的却只跑了10%,根据木桶原理,这些任务将成为整个JOB的短板,如果集群启动了推测执行,这时为了最大限度的提高短板,Hadoop会为该task启动备份任务,让speculative task与原始task同时处理一份数据,哪个先运行完,则将哪个结果作为最终结果,并且在运行完成后Kill掉另外一个任务。
将数据移入Hadoop
在Hadoop中处理数据的第一步是将其提供给Hadoop。有两种主要方法可用于将数据移入Hadoop:在HDFS层(数据推送)写入外部数据,或在MapReduce层读取外部数据(更像是拉取)。在MapReduce中读取数据具有以下优点:操作可以轻松并行并具有容错能力。然而,并非所有数据都可以使用MapReduce访问,例如在日志文件下,其他系统需要依赖传输,包括用于最终数据hop的HDFS。
本节,我们将介绍将源数据移动到Hadoop的方法,将使用上一节中的设计注意事项作为检查和理解不同工具的标准。
HDFS命令行
Hadoop捆绑了许多方法来将数据导入HDFS。本节将介绍这些内置工具如何满足数据移动中的各种需求,可以使用的第一个也是最简单的工具是HDFS命令行。
为作业选择正确的数据获取工具
本节中的低级工具适用于一次性文件移动,或者处理基于文件的旧数据源和目标。但是,以这种方式移动数据很轻易就会被Flume和Kafka(本章稍后介绍)等工具所淘汰,这些工具提供了自动数据移动管道。
注:Kafka是一个更好的平台,用于从A到B(B可以是Hadoop集群)移动数据,而不是老式的“复制文件”。使用Kafka,只需将数据泵入其中,就拥有了实时(例如通过Storm)或离线/批量作业(例如通过Camus)消费数据。这种方法将在之后的章节中介绍。
使用CLI加载文件
如果需要手动执行,那么HDFS命令行界面(CLI)就是最合适的工具。它允许执行在常规Linux文件系统上可执行的大多数操作。本节,我们将重点介绍如何将数据从本地文件系统复制到HDFS中。
问题
使用shell将文件复制到HDFS。
解决方案
HDFS命令行界面可用于一次性移动,或者可以将其合并到脚本中以进行一系列移动。
讨论
使用hadoop命令将文件从本地磁盘复制到HDFS:
$ hadoop fs -put local-file.txt hdfs-file.txt
Hadoop -put命令的行为与Linux中的Linux cp命令不同,如果目标已存在,则会被覆盖; 在Hadoop中,副本失败并显示错误:
put: `hdfs-file.txt': File exists
必须添加-f选项以强制覆盖文件:
$ hadoop fs -put -f local-file.txt hdfs-file.txt
与Linux cp命令非常相似,可以使用相同的命令复制多个文件。在这种情况下,最后一个参数必须是HDFS中复制本地文件的目录:
$ hadoop fs -put local-file1.txt local-file2.txt /hdfs/dest/
可以使用Linux管道将命令输出传递到HDFS文件——使用相同的-put命令并在其后添加单独的连字符,这告诉Hadoop从标准输入读取:
$ echo "the cat sat on the mat" | hadoop fs -put - hdfs-file.txt
要测试文件或目录是否存在,请使用-test命令和-e或-d选项分别测试文件或目录是否存在。如果文件或目录存在,则命令的代码为0;如果不存在,则为1:
$ hadoop fs -test -e hdfs-file.txt
$ echo $?
1
$ hadoop fs -touchz hdfs-file.txt
$ hadoop fs -test -e hdfs-file.txt
$ echo $?
$ hadoop fs -test -d hdfs-file.txt
$ echo $?
1
如果只想在HDFS中“touch”文件(创建一个新的空文件),那么touchz选项可以完成该工作:
CLI专为交互式HDFS活动而设计,它也可以合并到脚本中,以用于自动执行某些任务。CLI的缺点是级别较低,并且没有内置任何自动化机制。它需要为每个命令分配一个fork,如果在bash脚本中使用可能没问题,但如果试图将HDFS功能集成到Python或Java应用程序中,可能就会出现问题。在这种情况下,为每个命令启动外部进程的开销可能也是想要避免的。
使用REST加载文件
CLI便于快速运行命令和编写脚本。但是,它会产生为每个命令分配一个单独进程的开销,这可能是想要避免的,特别是编程语言与HDFS连接时。
问题
没有HDFS本机接口的编程语言如何与HDFS交互。
解决方案
使用Hadoop的WebHDFS接口,该接口为HDFS操作提供全功能的REST API。
讨论
在开始之前,需要确保在集群上启用WebHDFS(默认不启动),这由dfs.webhdfs.enabled属性控制。如果未启用,则需要更新hdfs-site.xml并添加以下内容:
在这种技术中,我们将介绍在不安全的Hadoop集群上运行WebHDFS.3的情况。如果正在使用安全的Hadoop集群,则不会提供user.name参数。
相反,我们将在与WebHDFS交互之前使用kinit对Kerberos进行身份验证,然后在curl命令行中提供-negotiate -u:youruser。
警告:如果为已经关闭了安全性的集群启用WebHDFS,则可以轻松地将其用作集群中任意用户命令(只需将URL中的用户名更改为簇)。建议仅在打开安全性的情况下运行WebHDFS。
要想在此技术中使用HTTP与NameNode进行通信,需要知道运行NameNode RPC服务的主机和端口,这是使用dfs.namenode.http-address属性配置的。在伪分布式设置中,这很可能设置为0.0.0.0:50070。我们假设其余技术的伪分布式——替换适当的主机和端口进行设置。
首先使用CLI在HDFS中创建文件:
$ echo "the cat sat on the mat" | hadoop fs -put - /tmp/hdfs-file.txt
使用WebHDFS获取有关该文件的各种有趣的元数据(用户名替换为以下URL中的aholmes):
命令语法由两部分组成:一是路径;二是正在执行的操作。但是,需要提供执行操作的用户名,否则HDFS将假定你是一个访问受限的匿名用户。
图5.1 解析WebHDFS URL路径
从HDFS读取文件只需将OPEN指定为operation:
使用WebHDFS编写文件分为两步:第一步通知NameNode创建新文件的意图,可以使用HTTP PUT命令执行此操作:
此时,文件尚未写入。只是让NameNode有机会确定要写入哪个DataNode,这是在“Location”标头中指定的。需要获取该URL,然后发出第二个HTTP PUT执行实际写入:
可以通过读取文件来验证写入是否成功:
WebHDFS支持可以使用常规命令行执行所有HDFS操作,它更有用,因为它可以访问结构化JSON表单中的元数据,从而更容易解析数据。
值得一提的是WebHDFS提供的一些附加功能。首先,文件的第一个块存放数据位置。NameNode将客户端重定向到承载第一个块的DataNode,提供强大的数据位置。对于文件中的后续块,DataNode充当代理,并将数据流入保存块数据的节点或从中保存数据。
WebHDFS还与Hadoop的安全身份验证集成,这意味着可以启用Kerberos并在HTTP请求中使用委派令牌。此外,API将保持跨Hadoop版本的兼容性,这意味着目前发布的命令将适用于未来版本的Hadoop(反之亦然)。 这是一个有用的工具,用于访问运行不同Hadoop版本的多个集群。
表5.1 WebHDFS库
当客户端可以访问所有NameNode和DataNode时,WebHDFS非常有用。在锁定环境中,情况可能并非如此,可能需要查看HttpFS。
从防火墙后面访问HDFS
生产Hadoop环境通常被锁定以保护这些集群中的数据。部分安全程序可能包括将集群置于防火墙之后,如果尝试从防火墙外部读取或写入HDFS,这将是一件麻烦事。 这种技术着眼于HttpFS网关,它可以使用HTTP(通常在防火墙上打开)提供HDFS访问。
问题
想要写入HDFS,但有一个防火墙限制对NameNode或DataNode的访问。
解决方案
使用HttpFS网关,它是一个独立的服务器,可通过HTTP提供对HDFS的访问。因为它是一个单独的服务而且是HTTP,所以可以配置为在任何可以访问Hadoop节点的主机上运行,并且可以打开防火墙规则以允许流量到服务。
讨论
HttpFS非常有用,因为它不仅允许使用REST访问HDFS,而且具有完整的Hadoop文件系统实现,这意味着可以使用CLI和本机HDFS Java客户端与HDFS进行通信,如图5.2所示。
图5.2 HttpFS网关架构
要启动并运行HttpFS,必须指定代理用户。这是将运行HttpFS进程的用户,此用户也将在Hadoop中配置为代理用户。假设有一个名为foxyproxy的用户,你将其指定为代理用户。你用以下代码更新core-site.xml:
基本上,这表明Hadoop应该只接受来自主机localhost的代理请求,并且foxyproxy可以冒充任何用户(你可以通过提供以逗号分隔的组列表来锁定可以模拟的用户集名)。更改用户名,主机和组值,以便它们在环境中有意义。
在对core-site.xml进行更改后,我们需要启动HttpFS进程:
$ sbin/httpfs.sh start
现在,可以使用WebHDFS发出与之前技术中相同的curl命令。这是关于HttpFS网关的好处之一 :语法完全相同。要在根目录上执行目录列表,需要执行以下操作:
此curl命令与先前技术中使用的curl命令的唯一区别是端口号。默认情况下,HttpFS在端口14000上运行,但可以通过编辑httpfs-env.sh来更改。表5.2中显示了可以在文件中更改的一些有趣属性。
表5.2 HttpFS属性
可以在httpfs-site.xml中配置其他Kerberos以及用户和组级别设置。
WebHDFS和HttpFS之间的差异
WebHDFS和HttpFS之间的主要区别在于客户端对所有数据节点的可访问性。如果客户端可以访问所有数据节点,那么WebHDFS将正常工作,因为读取和写入文件涉及客户端直接与数据节点通信以进行数据传输。另一方面,如果位于防火墙之后,客户端可能无法访问所有数据节点,在这种情况下,HttpFS选项最适合。使用HttpFS,服务器将与数据节点通信,客户端只需要与单个HttpFS服务器通信。
如果可以,请选择WebHDFS,因为客户端直接与数据节点通信具有固有的优势:这允许轻松扩展多个主机并发客户端数量,而不会遇到通过HttpFS流式传输数据的网络瓶颈。如果客户端本身在数据节点上运行,则更是如此,因为将通过直接从本地文件系统而不是网络流式传输本地托管的HDFS数据块来使用WebHDFS的优势。
使用NFS挂载Hadoop
通常,如果Hadoop数据可以作为文件系统的常规安装来访问,那么使用Hadoop数据要容易得多。这允许使用现有脚本,工具和编程语言,并与HDFS中的数据进行交互。本节介绍如何使用NFS挂载轻松地将数据复制到HDFS中和从HDFS复制数据。
问题
将HDFS视为常规Linux文件系统,并使用标准Linux工具与HDFS进行交互。
解决方案
使用Hadoop的NFS实现来访问HDFS中的数据。
讨论
在Hadoop 2.1之前,NFS安装HDFS的唯一方法是使用FUSE。由于各种性能和可靠性问题,不建议将其用于一般用途。它还引入了额外的负担,要求在任何客户端计算机上安装驱动程序(换句话说,它没有提供NFS网关)。
Hadoop中的新NFS实现解决了旧的基于FUSE系统的所有缺点。这是一个合适的NFSv3实现,允许运行一个或多个NFS网关以提高可用性和吞吐量。
要启动并运行NFS服务,首先需要停止在主机上运行的NFS服务。在Linux系统上,可以使用以下命令实现:
图5.3 Hadoop NFS
接下来,需要启动Hadoop NFS服务。启动的第一个服务是portmap,它为协议及其关联的传输和端口提供注册服务。在受限的端口上运行,因此需要以root用户身份启动:
$ sudo hadoop-daemon.sh start portmap
接下来,你需要启动实际的NFS服务,运行此服务的用户一定要与运行HDFS的用户相同,这一点非常重要:
$ hadoop-daemon.sh start nfs3
通过运行rpcinfo和showmount来验证服务是否正在运行,应该看到类似于以下的输出:
现在,需要在主机目录上安装HDFS。以下示例选择/hdfs作为安装目录。第二个mount命令验证是否已创建安装:
现在,可以使用挂载的文件系统直接操作HDFS。
使用NFS网关时需要考虑以下几点:
HDFS是仅附加文件系统。可以附加到文件,但不能执行随机写入。如果需要使用支持随机写入的文件系统来使用Hadoop,那么应该看看MapR的Hadoop distribution。
Hadoop 2.2版不支持Hadoop安全验证(Kerberos),并且有一个添加该支持的开放票证。
在Hadoop 2.4(或3.0)之前,不支持代理用户。这实质上意味着以前版本的Hadoop将以超级用户身份执行所有命令,因为要求NFS网关作为与HDFS本身相同的用户运行。
由于这些限制,建议将NFS网关保留用于实验用途,或者用于不考虑用户级安全性的单租户集群。
使用DistCp在集群内和集群间复制数据
如果移入或移出Hadoop的数据量很大,通过单个主机汇集数据,一定要尽可能优化数据移动。DistCp可以在Hadoop集群之间以及进出NFS安装的数据之间高效复制数据。
问题
在Hadoop集群之间高效复制大量数据,并且进行增量复制。
解决方案
使用DistCp,一种内置于Hadoop中的并行文件复制工具。
讨论
本节,我们将首先介绍DistCp的重要配置。之后,我们将继续查看使用DistCp的特定方案,以及配置和运行DistCp的最佳方法。
此技术涵盖了Hadoop 2中可用的DistCp新版本,名为DistCp 2。此代码被反向移植到Hadoop 1.2.0中,可通过使用distcp2作为命令启用Hadoop 2来替换现有的DistCp,然后就可以正常使用distcp命令。
DistCp 2支持与DistCp的旧版本相同的命令行参数集,并带来了许多有用的优势:
使用大量文件时减少了设置和执行时间,因为驱动程序不再需要预处理所有输入(现在这已推迟到mapper)。
具有功能齐全的Java接口,无需Java客户端将参数序列化为字符串。
原子提交允许全部复制语义。
使用-update跳过目标中已存在的文件,如果文件属性与源文件不同,将导致文件属性发生更改。
作为副本的一部分,不再跳过空目录。
DistCp使用仅map的MapReduce作业来执行复制。以下是一个非常简单的示例,在单个Hadoop集群中用于将源目录,/ hello,复制到目标目录,/world:
$ hadoop distcp /hello /world
如果/ world目录尚不存在,则此命令将创建/ world目录,然后将/ hello(其所有文件和目录递归)的内容复制到/ world。
处理已存在的目标文件
目标中已存在的文件和目录保持不变(即使文件不同)。
可以通过查看作业完成时转储到标准输出的SKIP计数器来查看跳过的文件数:
-update和-overwrite参数巧妙地改变了复制内容的行为。如果没有这些选项,如果源是目录,则在目标目录下创建该目录。使用-update或-overwrite参数,仅复制文件和子目录,而不复制源目录。通过一个例子证明这一点:
忽略错误
当使用DistCp复制大量文件时,使用-i标志执行命令以忽略错误是明智的。这样,单个错误不会导致整个复制过程失败,可以通过使用-update选项重新发出相同的DistCp命令来再次尝试复制失败文件。
动态复制策略
DistCp的默认行为是通过均匀地传播所有文件以使所有mapper复制大致相同的字节数来为每个mapper预分配工作。从理论上讲,这听起来像是一种公平分配工作的好方法,但实际上,诸如硬件,硬件错误和配置不良等因素往往导致长尾工作执行,少数落后的mapper占用时间比其他要长。
使用DistCp 2,可以使用替代策略,其中mapper直接接收工作而不是预先分配,这被称为动态复制策略,使用-strategy动态参数激活,添加此参数的效果是改进复制时间。
原子提交
DistCp 2的另一个有用功能是原子提交。DistCp默认将每个文件写入临时文件,然后移动到最终目标。这意味着无法撤消在作业中遇到错误之前复制的文件。
因此,原子提交允许在复制所有文件时将实际的“提交”推迟到作业结束,这样如果遇到错误,你将看不到任何部分写入,可以使用-atomic参数启用此功能。
并行性和mapper数量
目前,DistCp最细的工作单元是文件级别。因此,无论文件多大,都只使用一个mapper来复制,提高作业的mapper数量对提高复制速度没有任何影响。
默认情况下,DistCp使用20个mapper运行,每个mapper副本对应的文件由选择的复制策略确定。Hadoop开发人员考虑了mapper数量的默认设置,选择正确的值是想要使用多少网络带宽以及希望在复制期间占用多少任务的函数,可以通过指定-m后跟的值来更改mapper的数量。
带宽
最后一个考虑因素是复制期间使用的网络带宽。大型副本可能会使集群之间的网络饱和。企业中网络运营人员保持运行良好的一种方法是使用-bandwidth参数来指定每个map任务在复制期间消耗的带宽量上限。此参数的值以兆字节/秒(MBps)为单位。
其他
到目前为止,我们已经看到了DistCp中一些更有趣的选项。要查看完整的选项列表,可以运行distcp命令,或者查看Hadoop文档。
网友评论