触发shuffle的常见算子:distinct、groupByKey、reduceByKey、aggregateByKey、join、cogroup、repartition等。
要解决数据倾斜的问题,首先要定位数据倾斜发生在什么地方,首先是哪个stage,直接在Web UI上看就可以,然后查看运行耗时的task,查看数据是否倾斜了!
根据这个task,根据stage划分原理,推算出数据倾斜发生在哪个shuffle类算子上。
查看导致数据倾斜的key的数据分布情况
根据执行操作的不同,可以有很多种查看key分布的方式:
1,如果是Spark SQL中的group by、join语句导致的数据倾斜,那么就查询一下SQL中使用的表的key分布情况。
2,如果是Spark RDD执行shuffle算子导致的数据倾斜,那么可以在Spark作业中加入查看key分布的代码,比如RDD.countByKey()。然后对统计出来各个key出现的次数,collect、take到客户端打印一下,就可以看到key的分布情况。
比如针对wordCount案例,最后的reduceByKey算子导致了数据倾斜:
val sampledPairs = pairs.sample(false,0.1) //对pairs采样10%
val sampledWordCounts = sampledPairs.countByKey()
sampledWordCounts.foreach(println(_))
数据倾斜的解决办法
方案一:使用Hive ETL预处理数据
适用场景:导致数据倾斜的是Hive表,Hive表中的数据本身很不均匀,业务场景需要频繁使用Spark对Hive表执行某个分析操作。
实现思路:提前将join等操作执行,进行Hive阶段的ETL。将导致数据倾斜的shuffle前置。
优缺点:实现简单,Spark作业性能提升,但是Hive ETL还是会发生数据倾斜,导致Hive ETL的速度很慢。
实践经验:将数据倾斜提前到上游的Hive ETL,每天就执行一次,慢就慢点吧。
方案二:过滤少数导致倾斜的key
适用场景:少数几个key导致数据倾斜,而且对计算本身影响并不大的话。
实现思路:比如Spark SQL中直接用where条件过滤掉这些key,如果是RDD的话,用filter算子过滤掉这些key。如果是动态判断哪些key的数据量最多然后再进行过滤,那么可以使用sample算子对RDD进行采样,然后计算出每个key的数量,取数据量最多的key过滤掉即可。
优缺点:实现简单,效果也好。缺点是一般情况下导致倾斜的key还是很多的,不会是少数。
解决方案三:提高shuffle操作的并行度
适用场景:直接面对数据倾斜的简单解决方案。
实现思路:对RDD执行shuffle算子时,给shuffle算子传入一个参数,比如reduceByKey(1000),该参数就设置了这个shuffle算子执行的shuffle read task的数量。对于Spark SQL中的shuffle类语句,比如group by,join等,需要设置一个参数,即spark.sql.shuffle.partitions,该参数默认值是200,对于很多场景来说有点过小。
优缺点:简单能缓解,缺点是没有根除问题,效果有限。
解决方案四:两阶段聚合(局部聚合+全局聚合)
适用场景:对RDD执行reduceByKey等聚合类shuffle算子或者在Spark SQL中使用group by语句进行分组聚合时,比较适合这种方案。
实现思路:先局部聚合,给每个key打一个小范围的随机数,比如10以内的随机数,相当于分成10份,一个task分成10个task。聚合聚合后,去掉key上的随机数前缀,再次进行全局聚合操作。
优缺点:大幅度缓解数据倾斜,缺点是仅适用于聚合类的shuffle操作。
解决方案五:将reduce join转为map join
Spark已经取代Hadoop成为最活跃的开源大数据项目,但是,在选择大数据框架时,企业不能因此就厚此薄彼
近日,著名大数据专家Bernard Marr在一篇文章中分析了Spark和 Hadoop 的异同
Hadoop和Spark均是大数据框架,都提供了一些执行常见大数据任务的工具,但确切地说,它们所执行的任务并不相同,彼此也并不排斥
虽然在特定的情况下,Spark据称要比Hadoop快100倍,但它本身没有一个分布式存储系统
而分布式存储是如今许多大数据项目的基础,它可以将 PB 级的数据集存储在几乎无限数量的普通计算机的硬盘上,并提供了良好的可扩展性,只需要随着数据集的增大增加硬盘
因此,Spark需要一个第三方的分布式存储,也正是因为这个原因,许多大数据项目都将Spark安装在Hadoop之上,这样,Spark的高级分析应用程序就可以使用存储在HDFS中的数据了
与Hadoop相比,Spark真正的优势在于速度,Spark的大部分操作都是在内存中,而Hadoop的MapReduce系统会在每次操作之后将所有数据写回到物理存储介质上,这是为了确保在出现问题时能够完全恢复,但Spark的弹性分布式数据存储也能实现这一点
另外,在高级数据处理(如实时流处理、机器学习)方面,Spark的功能要胜过Hadoop
在Bernard看来,这一点连同其速度优势是Spark越来越受欢迎的真正原因
实时处理意味着可以在数据捕获的瞬间将其提交给分析型应用程序,并立即获得反馈
在各种各样的大数据应用程序中,这种处理的用途越来越多,比如,零售商使用的推荐引擎、制造业中的工业机械性能监控
Spark平台的速度和流数据处理能力也非常适合机器学习算法,这类算法可以自我学习和改进,直到找到问题的理想解决方案
这种技术是最先进制造系统(如预测零件何时损坏)和无人驾驶汽车的核心
Spark有自己的机器学习库MLib,而Hadoop系统则需要借助第三方机器学习库,如Apache Mahout
实际上,虽然Spark和Hadoop存在一些功能上的重叠,但它们都不是商业产品,并不存在真正的竞争关系,而通过为这类免费系统提供技术支持赢利的公司往往同时提供两种服务
例如,Cloudera 就既提供 Spark服务也提供 Hadoop服务,并会根据客户的需要提供最合适的建议
Bernard认为,虽然Spark发展迅速,但它尚处于起步阶段,安全和技术支持基础设施方还不发达,在他看来,Spark在开源社区活跃度的上升,表明企业用户正在寻找已存储数据的创新用法