Jamzy Wang

life is a struggle,be willing to do,be happy to bear~~~

Hadoop常见问题解决方案总结

2015-05-09 20:32

原创声明:本作品采用知识共享署名-非商业性使用 3.0 版本许可协议进行许可,欢迎转载,演绎,但是必须保留本文的署名(包含链接),且不得用于商业目的。

作业执行性能

(1) reduce执行很慢

reduce执行很慢的一个原因是数据数据分桶不均导致个别reduce节点的数据量远大于其他节点。解决方案是检查 分桶是否均匀,如不均匀考虑使用自定义的Partitioner(使用 IntHashPartitioner / MapIntPartioner:可以根据int前缀/后缀决定分到哪个桶),如果均匀考虑单个reduce的数据里是否过大,如果单个reduce的数据量过大则应该考虑增加reduce的数量。

(2) 数据倾斜(Data Skew)问题

由于数据本身的分布特点导致reduce阶段分桶不均,个别(1,2个)reduce严重拖慢整个任务的效率。以WordCount为例可以如下方式解决:

增加一轮任务,以随机数作为partition key

1
2
Map<doc> -> <rndkey, word1, count1>, <rndkey, word2, count2>
Reduce<rndkey, wordi, counti> -> <word, count>

第二轮再按word聚合

1
2
Map:直接输出
Reduce<wordi, counti> -> <word, count>

(3) map执行很慢

输入数据文件大小不均衡,个别的输入文件较其他输入文件要大很多。一种解决方案是切分个别大文件。

(4) 绝大多数的map任务都完成了,只有少数几个map老是进度很慢,完不成

Hadoop会把任务分配到许多个节点,其中一些慢的节点会限制整体程序的执行速度。这时Hadoop会引入“推测执行”过程:因为作业中大多数任务都已经完成,hadoop平台会在几个空闲节点上调度执行剩余任务复制,当任务完成时会向JobTracker通告。任何一个首先完成的复制任务将成为权威复制,如果其他复制任务还在推测执行中,hadoop会告诉TaskTracker去终止这些任务并丢弃其输出,然后reducer会从首先完成的mapper那里获取输入数据。推测执行为默认启用,设置 JobConf的mapred.map.tasks.speculative.executionmapred.reduce.tasks.speculative.execution 为false可以禁止推测执行。

(5) 小文件 、大文件

数据块的默认大小是64M,如果一个文件的大小小于64M,那么它也要占据一个数据块的大小。一个block在NameNode中对应一条记录(一般一条记录占用150字节),如果是大量的小文件,会消耗大量内存,所以要使用archive的方式来实现归并小文件。

默认情况下,每个map最多处理一个HDFS文件,所以用MapReduce处理大量的小文件时,就会产生过多的map task,线程管理开销将会增加作业时间处理大量小文件的速度远远小于处理同等大小的大文件的速度。因此Hadoop建议存储大文件。CombineFileInputFormat可以将若干个Split打包成一个,目的是避免过多的Map任务(因为Split的数目决定了Map的数目,大量的Mapper Task创建销毁开销将是巨大的)

(6)作业的某个任务阻塞了,长时间占用资源不释放,如何处理?

这种场景通常是由于软件Bug、数据特殊性等原因导致的,会让程序阻塞,任务运行停滞不前。这种事情经常发生,由于任务长时间占用着资源但不使用(如果不采取一定的手段,可能永远不会被使用,造成“资源泄露”),会导致资源利用率下降,对系统不利,那么,Hadoop MapReduce遇到这种情况如何处理呢?

在TaskTracker上,每个任务会定期向TaskTracker汇报新的进度(如果进度不变则不汇报),并由TaskTracker进一步汇报给JobTracker。当某个任务被阻塞时,它的进度将停滞不前,此时任务不会向TaskTracker汇报进度,这样,一定达到超时时间上限,TaskTracker会将该任务杀掉,并将任务状态(KILLED)汇报给JobTracker,进而触发JobTracker重新调度该任务。

在实际应用场景中,有些正常的作业,其任务可能长时间没有读入或者输出,比如读取数据库的Map Task或者需要连接其他外部系统的Task,对于这类应用,在编写Mapper或Reducer时,应当启动一个额外的线程通过Reporter组件定期向TaskTracker汇报心跳。

(7)任务JVM重用

mapred.job.reuse.jvm.num.tasks 默认为1,即每个Task都启动一个JVM来运行任务,当值为-1时,表示JVM可以无限制重用。当值为-1时,TaskTracker先判断当前当前节点是否有slot剩余,如果没有slot槽位才会判断当前分配的slot槽位中的JVM是否已经将当前task任务运行完,如果task已经运行完,才会复用当前JVM(同一Job的JVM才会复用)

注意:当一个Job的Task(尤其Task耗时很小)数目很大,由于频繁的JVM停启会造成很大开销,进行JVM复用会使同一个Job的一些静态数据得到共享,从而是集群性能得到很大提升。但是JVM重用会导致在同一个JVM中的碎片增加,导致JVM性能变差。

作业故障处理

在生产环境中,用户代码存在软件错误、进程崩溃、机器故障等都会导致失败。Hadoop判断的失败有不同级别类型,针对不同级别的失败有不同的处理对策,这就是MapReduce的容错机制。

(1) map或者reduce任务失败

任务失败分为3种情况:Task失败、子进程JVM退出、超时检测被关闭。

1.任务失败。最常见的是Map或Reduce任务的失败,即写的本身MR代码导致失败。发生Map或Reduce失败的时候,子任务JVM进程会在退出之前向上一级TaskTracker发送错误报告。错误报告最后会记录在用户的错误日志里面,TaskTracker会将此次task attempt标记为failed,释放一个任务槽slot用来运行另一个任务。

2.子进程JVM突然退出。可能由于JVM的bug导致,从而导致MapReduce用户代码执行失败。在这种情况下,TaskTracker会监控到进程以便退出,并将此次尝试标记为“failed”失败。

3.关闭了超时连接(把超时timeout设置成0)。所以长时间运行的任务永不会被标记failed。在这种情况下,被挂起的任务永远不会释放其所占用的任务槽slot,并随时间推移会降低整个集群的性能。

当任务执行失败后会重试,超过重试次数(默认为4),整个Job会失败。Hadoop 提供配置参数 mapred.max.ap.failures.percent 解决这个问题。如果一个 Job 有 200 个 MapTask ,参数设置为5,则单个 Job 最多允许 10 个 MapTask 失败(200×5%=10),其可以在配置文件 mapred-site.xml 里修改。

(2) TaskTracker失败

正常情况下,TaskTracker 会通过心跳向 JobTracker 通信,如果 TaskTracker 发生故障则心跳会停止,JobTracker 会将 TaskTracker 从等待任务调度的池中移除。TaskTracker失败分为两种情况:

1.map 阶段的情况。如果属于未完成的作业,reduce 阶段无法获取本地 Map 输出的文件结果,任务都需要重新调度和执行,只要是Map阶段失败必然是重新执行这个任务。

2.reduce 阶段的情况。自然是执行未完成的 reduce 任务。因为 reduce 只要执行完了就会把输出写到 HDFS 上。

(3) JobTracker失败

jobtracker和hdfs的namenode一样也存在单点故障,JobTracker失败会导致作业最终失败。hadoop框架里包含zookeeper框架,zookeeper可以结合jobtracker,用几台机器同时部署jobtracker,保证一台出故障,有一台马上能补充上,不过这种方式也没法恢复正在跑的mapreduce任务。

(4) map结果存放磁盘损坏

如果节点没有挂,只是存放map任务结果的磁盘损坏了,则分两种情况:如果map任务的结果已经传输给了所有reduce,那么所有reduce任务可以正常执行下去,整个job会正常执行。如果尚有部分reduce任务没有获得map任务,那么当reduce任务读取那个已经运行完成的map任务(但结果已经损坏)时,会尝试读取若干次,如果尝试次数超过了某个上限值,则会通过RPC告诉所在的TaskTracker该map任务的结果已经损坏,而TaskTracker则进一步通过RPC告诉JobTracker,JobTracker收到该消息后,会重新调度该map任务进而重新计算生成结果。

目前Hadoop实现中,reduce任务尝试读取map任务结果的时间间隔是指数形式递增的,计算公式是 10000*1.3^noFailedFetches,其中noFailedFetches取值范围为 MAX{10, numMaps/30},也就是说,如果map task数目是300,则需要尝试10次才会发现map结果结果已经损坏,尝试时间间隔分别是10s,13s,21s,28s,37s,48s,62s,81s和106s,需要非常长的时间才能发现。

常用技巧

(1) 多路输出实现

一般情况下map/reduce的输出是一个存储在HDFS上文件的形式,如果希望一个map/reduce任务希望产出多份输出数据怎么办?一种解决方案如下:

hadoop命令增加一个参数:

1
-outputformat org.apache.hadoop.mapred.lib.SuffixMultipleTextOutputFormat

reduce阶段每行的 value 末尾增加’#A’, ‘#B’, … 不同的字母会被输出到对应的 part 文件:part-00000-A, part-00000-B 最多支持26 路输出 A~Z 如果输出数据只有key没有value,那么#A 前面需要补一个 ’\t’

(2) map/reduce程序如何做本地测试

在调试mapper,reducer时,可使用如下命令在本地调试:cat filename | mapper.sh | sort –k1,3 | reducer.sh,其中mapper.sh和reducer.sh可以是任何可执行文件。

易错点

(1) map任务前的数据分割不保证相同的key在一起。因此在reduce阶段需要先做merge(边merge边sort)。

(2) map任务的输出中相同的key会被copy到同一个reducer,但是一个reducer并不只放一个key,因此在对同一个key下的数据进行操作时,仍需判断当前行的key是否与上一行的key相同。

Ref

Comments