博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
spark性能调优
阅读量:6818 次
发布时间:2019-06-26

本文共 13503 字,大约阅读时间需要 45 分钟。

hot3.png

1、分配更多的资源

  1.1 分配的资源有:executor、cup per executor、memory per executor、driver memory

  1.2 如何分配:在spark-submit提交时设置相应的参数  

复制代码

/usr/local/spark/bin/spark-submit \--class cn.spark.sparktest.core.WordCountCluster \--num-executors 3 \  配置executor的数量 --driver-memory 100m \  配置driver的内存(影响不大) --executor-memory 100m \  配置每个executor的内存大小 --executor-cores 3 \  配置每个executor的cpu core数量 /usr/local/SparkTest-0.0.1-SNAPSHOT-jar-with-dependencies.jar \

复制代码

  1.3 调节到多大(原则:能使用的资源有多大,就尽量调节到最大的大小)

    第一种,spark standalone,公司集群上,搭建了一套spark集群,应该清楚每台机器还能够给你使用的,还有多少内存和多少个cpu,然后根据此来进行配置;

      比如:有20台机器,每台有2个cpu和4G的内存,那么如果配置20个executor,那个每个executor内存分配4G和2个cpu

    第二种,yarn,应该去查看spark作业提交到的资源队列大概有多少资源;

      比如:如果有500G内存和100个cpu core,那么如果分配50个executor,那么每个executor分配的cpu core为2个

2、调节并行度

  2.1 并行度:其实是指,spark作业中,各个stage的task数量,也就代表了spark作业在各个stage的并行度

  2.2 配置方法:

spark.default.parallelism SparkConf conf = new SparkConf().set("spark.default.parallelism", "500")

  2.3 调节原则:应该配置到足够大,大到可以完全利用你的集群资源

    2.3.1 task数量:至少设置成与spark application的总cpu数量相同

      比如:总共150个cpu core,配置150个task,一起运行,差不多同一时间运行完毕

    2.3.2 官方推荐 task数据设置为spark application的总cpu数量的2~3倍

      比如:总共150个cupcore,配置300~500个task

3、将rdd进行持久化

  3.1 持久化的原则

    3.1.1 Rdd的架构重构和优化

      尽量复用Rdd,差不多的Rdd进行抽象为一个公共的Rdd,供后面使用

    3.1.2 公共Rdd一定要进行持久化

      对应对次计算和使用的Rdd,一定要进行持久化

    3.1.3 持久化是可以序列化的

      首先采用纯内存的持久化方式,如果出现OOM异常,则采用纯内存+序列化的方法,如果依然存在OOM异常,使用内存+磁盘,以及内存+磁盘+序列化的方法

    3.1.4 为了数据的高可靠性,而且内存充足时,可以使用双副本机制进行持久化

  3.2 持久化的代码实现

    .persist(StorageLevel.MEMORY_ONLY())

  3.3 持久化等级

    StorageLevel.MEMORY_ONLY()    纯内存    等效于   .cache()

    序列化的:后缀带有_SER 如:StorageLevel.MEMORY_ONLY_SER()   内存+序列化

        后缀带有_DISK 表示磁盘,如:MEMORY_AND_DISK() 内存+磁盘

        后缀带有_2表示副本数,如:MEMORY_AND_DISK_2() 内存+磁盘且副本数为2

4、将每个task中都使用的大的外部变量作为广播变量

  4.1 没有使用广播变量的缺点

    默认情况,task使用到了外部变量,每个task都会获取一份外部变量的副本,会占用不必要的内存消耗,导致在Rdd持久化时不能写入到内存,只能持久化到磁盘中,增加了IO读写操作。

    同时,在task创建对象时,内存不足,进行频繁的GC操作,降低效率

  4.2 使用广播变量的好处

    广播变量不是每个task保存一份,而是每个executor保存一份。

    广播变量初始化时,在Driver上生成一份副本,task运行时需要用到广播变量中的数据,首次使用会在本地的Executor对应的BlockManager中尝试获取变量副本;如果本地没有,那么就会从Driver远程拉取变量副本,并保存到本地的BlockManager中;此后这个Executor中的task使用到的数据都从本地的BlockManager中直接获取。

    Executor中的BlockManager除了从远程的Driver中拉取变量副本,也可能从其他节点的BlockManager中拉取数据,距离越近越好。

5、使用KryoSerializer进行序列化

  5.1 使用KryoSerializer序列化的好处

    默认情况,spark使用的是java的序列化机制,ObjectOutputStream / ObjectInputStream,对象输入输出流机制,来进行序列化。

    该序列化的好处是方便使用,但必须实现Serializable接口,缺点是效率低,速度慢,序列化后的占用空间大

    KryoSerializer序列化机制,效率高,速度快,占用空间小(只有java序列化的1/10),可以减少网络传输

  5.2 使用方法

      //配置使用KryoSerializer进行序列化        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")        //(为了使序列化效果达到最优)注册自定义的类型使用KryoSerializer序列化        .registerKryoClasses(new Class[]{ExtractSession.class,FilterCount.class,SessionDetail.class,Task.class,Top10Session.class,Top10.class,VisitAggr.class});

  5.3 使用KryoSerializer序列化的场景

    5.3.1 算子函数中使用到的外部变量,使用KryoSerializer后,可以优化网络传输效率,优化集群中内存的占用和消耗

    5.3.2 持久化Rdd,优化内存占用,task过程中创建对象,减少GC次数

    5.3.3 shuffle过程,优化网络的传输性能

6、使用fastutil代替java标准的集合框架

  6.1 fastutil是什么

    fastutil扩展了java标准的集合框架,占用内存更小,存取速度更快,还提供了双向迭代器,并对引用类型使用等号(=)进行比较

  6.2 使用方法

    6.2.1 在pom.xml文件中引入相应的jar包

fastutil
fastutil
5.0.9

    6.2.2 在java代码中使用fastutil相应的集合框架

复制代码

//使用fastutil代替java utilfinal Map
extractSessionIndexs=new HashMap
();//final Map
> extractSessionIndexs=new HashMap
>();//使用fastutil代替java utilIntList extractIndexSet= new IntArrayList();//extractIndexSet= new ArrayList
();

复制代码

  6.3 适用场景

    6.3.1 fastutil尽量提供了在任何情况下都是速度最快的集合框架

    6.3.2 如果算子函数中使用到了外部变量,第一,可以使用广播变量进行优化;第二、可以使用KryoSerializer序列化进行优化;第三可以使用fastutil代替java 标准的集合框架进行优化

    6.3.3 算子函数中如果出现较大的集合,可以考虑使用fastutil进行重构

7、调节数据本地化等待时长

  7.1 什么事数据本地化等待时长

    每个task在哪个节点上执行是根据spark的task分配算法进行预先计算好的,但是可能由于该节点的资源或者计算能力满了,该task无法分配到该节点上,默认会等待3s,如果还是不能分配到该节点上,就会选择比较差的本地化级别,比如说,将task分配到原节点比较近的节点进行计算。

  7.2 数据本地化级别

    PROCESS_LOCAL:(默认),进程本地化,在同一个节点中执行,数据在执行task的executor中的BlockManager中,性能最优

    NODE_LOCAL:节点本地化,数据和task在同一节点的不同executor中,数据需要进行进程间的传输

    NO_PRE:对于task来说,数据从哪里获取都一样,没有好坏之分

    RACK_LOCAL:机架本地化,数据和task在一个机架的不同节点上,数据需要进行网络传输

    ANY:数据和task可能在集群中的任何地方,而且不在一个机架上,性能最差

  7.3 如何调节

    通过查看日志,日志里会显示,starting task ....,PROCESS_LOCAL、NODE_LOCAL、等信息  

//设置数据本地化等待时间(单位为s)conf.set("spark.locality.wait", "5");

 

1、降低cache操作的内存占比

  1.1 为什么要降低cache操作的内存占比

    spark的堆内存分别两部分,一部分用来给Rdd的缓存进行使用,另一部分供spark算子函数运行使用,存放函数中的对象

    默认情况下,供Rdd缓存使用的占0.6,但是,有些时候,不需要那么多的缓存,反而函数计算需要更多的内存,这样导致频繁的minor gc和full gc,导致性能降低。

  1.2 如何调节

    根据spark作业的yarn界面,如果有频繁的gc,就需要调节 

//调节cache操作的内存占比conf.set("spark.storage.memoryFraction", "0.4");

2、调节executor的堆外内存

  2.1 什么情况下进行调节

    当spark作业中,是不是的报错,shuffle file cannot find,executro、task lost,out of memory等,可能是堆外内存不足,导致executor挂掉,task拉取该executor的数据是无法获取到,导致以上错误,甚至spark作业崩溃。

  2.2 如何调节

    在spark作业的提交脚本中,修改spark.yarn.executor.memoryOverhead参数(默认为300多M)

复制代码

/usr/local/spark/bin/spark-submit \--class com.ibeifeng.sparkstudy.WordCount \--num-executors 80 \--driver-memory 6g \--executor-memory 6g \--executor-cores 3 \--master yarn-cluster \--queue root.default \--conf spark.yarn.executor.memoryOverhead=2048 \        调节堆外内存 --conf spark.core.connection.ack.wait.timeout=300 \       调节连接时间 /usr/local/spark/spark.jar

复制代码

3、调节连接等待时间

  3.1 什么情况下需要调节

    当一个executor的blockManager需要从其他的executor的blockManager中拉取数据,但是目标executor正处在gc阶段,此时源executor会进入等待连接状态(默认60s),如果多次拉取失败则会报   一串filed id  uuid(dsfsss-12323-sdsdsd-wewe) not found ,file lost,甚至spark作用直接崩溃。

  3.2 如何调节

    在spark作业的提交脚本中,修改conf spark.core.connection.ack.wait.timeout参数(默认为60s)

1、开启map端输出文件的合并机制

  1.1 为什么要开启map端输出文件的合并机制

    默认情况下,map端的每个task会为reduce端的每个task生成一个输出文件,reduce段的每个task拉取map端每个task生成的相应文件

        

    开启后,map端只会在并行执行的task生成reduce端task数目的文件,下一批map端的task执行时,会复用首次生成的文件

    

  1.2 如何开启

//开启map端输出文件的合并机制conf.set("spark.shuffle.consolidateFiles", "true");

 2、调节map端内存缓冲区

  2.1 为什么要调节map端内存缓冲区

    默认情况下,shuffle的map task,输出的文件到内存缓冲区,当内存缓冲区满了,才会溢写spill操作到磁盘,如果该缓冲区比较小,而map端输出文件又比较大,会频繁的出现溢写到磁盘,影响性能。

  2.2 如何调整

//设置map 端内存缓冲区大小(默认32k)conf.set("spark.shuffle.file.buffer", "64k");

3、调节reduce端内存占比

  3.1 为什么要调节reduce端内存占比

    reduce task 在进行汇聚,聚合等操作时,实际上使用的是自己对应的executor内存,默认情况下executor分配给reduce进行聚合的内存比例是0.2,如果拉取的文件比较大,会频繁溢写到本地磁盘,影响性能。

  3.2 如何调整

//设置reduce端内存占比conf.set("spark.shuffle.memoryFraction", "0.4");

 4、修改shuffle管理器

  4.1 有哪些shuffle管理器

    HashShuffleManager:1.2.x版本前的默认选择

    SortShuffleManager:1.2.x版本之后的默认选择,会对每个task要处理的数据进行排序;同时,可以避免像HashShuffleManager那么默认去创建多份磁盘文件,而是一个task只会写入一个磁盘文件,不同reduce task需要的的数据使用offset来进行划分。

    tungsten-sort(钨丝):1.5.x之后的出现,和SortShuffleManager相似,但是它本事实现了一套内存管理机制,性能有了很大的提高,而且避免了shuffle过程中产生大量的OOM、GC等相关问题。

  4.2 如何选择

    4.2.1 如果不需要排序,建议使用HashShuffleManager以提高性能

    4.2.2 如果需要排序,建议使用SortShuffleManager

    4.2.3 如果不需要排序,但是希望每个task输出的文件都合并到一个文件中,可以去调节bypassMergeThreshold这个阀值(默认为200),因为在合并文件的时候会进行排序,所以应该让该阀值大于reduce task数量。

    4.2.4 如果需要排序,而且版本在1.5.x或者更高,可以尝试使用tungsten-sort

  4.3 在项目中如何使用

//设置spark shuffle manager (hash,sort,tungsten-sort)conf.set("spark.shuffle.manager", "tungsten-sort");        //设置文件合并的阀值conf.set("spark.shuffle.sort.bypassMergeThreshold", "550");

 

1、使用MapPartitions代替map

  1.1 为什么要死使用MapPartitions代替map

    普通的map,每条数据都会传入function中进行计算一次;而是用MapPartitions时,function会一次接受所有partition的数据出入到function中计算一次,性能较高。

    但是如果内存不足时,使用MapPartitions,一次将所有的partition数据传入,可能会发生OOM异常

  1.2 如何使用

    有map的操作的地方,都可以使用MapPartitions进行替换

复制代码

/**         * 使用mapPartitionsToPair代替mapToPair         */        JavaPairRDD
sessionRowPairRdd =dateRangeRdd.mapPartitionsToPair(new PairFlatMapFunction
, String, Row>() { private static final long serialVersionUID = 1L; public Iterable
> call(Iterator
rows) throws Exception { List
> list=new ArrayList
>(); while (rows.hasNext()) { Row row=rows.next(); list.add(new Tuple2
(row.getString(2), row)); } return list; } }); /*JavaPairRDD
sessionRowPairRdd = dateRangeRdd .mapToPair(new PairFunction
() { private static final long serialVersionUID = 1L; // 先将数据映射为
public Tuple2
call(Row row) throws Exception { return new Tuple2
(row.getString(2), row); } });*/

复制代码

2、使用coalesce对过滤后的Rdd进行重新分区和压缩

  2.1 为什么使用coalesce

    默认情况下,经过过滤后的数据的分区数和原分区数是一样的,这就导致过滤后各个分区中的数据可能差距很大,在之后的操作中造成数据倾斜

    使用coalesce可以使过滤后的Rdd的分区数减少,并让每个分区中的数据趋于平等

  2.2 如何使用   

复制代码

//过滤符合要求的ClickCategoryIdRow        filteredSessionRdd.filter(new Function
, Boolean>() { private static final long serialVersionUID = 1L; public Boolean call(Tuple2
tuple2) throws Exception { return (Long.valueOf(tuple2._2.getLong(6))!=null)?true:false; } }) //使用coalesce将过滤后的数据重新分区和压缩,时新的分区中的数据大致相等 .coalesce(100)

复制代码

3、使用foreachPartition替代foreach

  3.1 为什么使用foreachPartition

    默认使用的foreach,每条数据都会传入function进行计算;如果操作数据库,每条数据都会获取一个数据库连接并发送sql进行保存,消耗资源比较大,性能低。

    使用foreachPartition,会把所用partition的数据一次出入function,只需要获取一次数据库连接,性能高。

  3.2 如何使用

复制代码

/**         * 使用foreachPartition替代foreach         */        sessionRdd.join(sessionRowPairRdd).foreachPartition(new VoidFunction
>>>() { private static final long serialVersionUID = 1L; public void call(Iterator
>> iterator) throws Exception { List
sessionDetails=new ArrayList
(); if (iterator.hasNext()) { Tuple2
> tuple2=iterator.next(); String sessionId=tuple2._1; Row row=tuple2._2._2; SessionDetail sessionDetail=new SessionDetail(); sessionDetail.setSessionId(sessionId); sessionDetail.setTaskId((int)taskId); sessionDetail.setUserId((int)row.getLong(1)); sessionDetails.add(sessionDetail); } DaoFactory.getSessionDetailDao().batchInsertSessionDao(sessionDetails); } }); /* sessionRdd.join(sessionRowPairRdd).foreach(new VoidFunction
>>() { private static final long serialVersionUID = 1L; public void call(Tuple2
> tuple2) throws Exception { String sessionId=tuple2._1; Row row=tuple2._2._2; SessionDetail sessionDetail=new SessionDetail(); sessionDetail.setSessionId(sessionId); sessionDetail.setTaskId((int)taskId); sessionDetail.setUserId((int)row.getLong(1));   DaoFactory.getSessionDetailDao().insertSessionDao(sessionDetail); } });*/

复制代码

 4、使用repartition进行调整并行度

  4.1 为什么要使用repartition

    spark.default.parallelism设置的并行度只能对没有Spark SQL(DataFrame)的阶段有用,对Spark SQL的并行度是无法设置的,该并行度是通过hdfs文件所在的block块决定的。

    可以通过repartition调整之后的并行度

  4.2 如何使用 

sqlContext.sql("select * from user_visit_action where date >= '" + startDate + "' and date <= '" + endDate + "'").javaRDD()    //使用repartition调整并行度    .repartition(100)

 5、使用reduceByKey进行本地聚合

  5.1 reduceByKey有哪些优点

    reduceByKey相对于普通的shuffle操作(如groupByKey)的一个最大的优点,会进行map端的本地聚合,从而减少文件的输出,减少磁盘IO,网络传输,内存占比以及reduce端的聚合操作数据。

  5.2 使用场景

    只有是针对每个不同的key进行相应的操作都可以使用reduceByKey进行处理

1、调节reduce端缓冲区大小避免OOM异常

  1.1 为什么要调节reduce端缓冲区大小

    对于map端不断产生的数据,reduce端会不断拉取一部分数据放入到缓冲区,进行聚合处理;

    当map端数据特别大时,reduce端的task拉取数据是可能全部的缓冲区都满了,此时进行reduce聚合处理时创建大量的对象,导致OOM异常;

  1.2 如何调节reduce端缓冲区大小

    当由于以上的原型导致OOM异常出现是,可以通过减小reduce端缓冲区大小来避免OOM异常的出现

    但是如果在内存充足的情况下,可以适当增大reduce端缓冲区大小,从而减少reduce端拉取数据的次数,提供性能。

//调节reduce端缓存的大小(默认48M)conf.set("spark.reducer.maxSizeInFlight", "24");

2、解决JVM GC导致的shuffle文件拉取失败

  2.1 问题描述

    下一个stage的task去拉取上一个stage的task的输出文件时,如果正好上一个stage正处在full gc的情况下(所有线程后停止运行),它们之间是通过netty进行通信的,就会出现很长时间拉取不到数据,此时就会报shuffle file not found的错误;但是下一个stage又重新提交task就不会出现问题了。

  2.2 如何解决

    调节最大尝试拉取次数:spark.shuffle.io.maxRetries 默认为3次

    调节每次拉取最大的等待时长:spark.shuffle.io.retryWait 默认为5秒

//调节拉取文件的最大尝试次数(默认3次)conf.set("spark.shuffle.io.maxRetries", "60");        //调节每次拉取数据时最大等待时长(默认为5s)conf.set("spark.shuffle.io.retryWait", "5s");

 3、yarn队列资源不足导致application直接失败

  3.1 问题描述

    如果yarn上的spark作业已经消耗了一部分资源,如果现在再提交一个spark作业,可能会出现以下两个情况:第一、发现yarn资源不足,直接打印fail的log,直接就失败;第二、发现yarn资源不足,该作业就一直处于等待状态,等待分配资源执行。

  3.2 如何解决

    如果发生了上面的第一种问题,可以通过以下方式解决

    方法一:限制同一时间内只有一个spark作业提交到yarn上,确保spark作业的资源是充足的(调节同一时间内每个spark能充分使用yarn的最大资源)。

    方法二:将长时间的spark作业和短时间的spark作业分别提交到不同的队列里(通过线程池的方式实现)。

4、序列化导致的错误

  4.1 问题描述

    如果日志信息出现了Serializable、Serialize等错误信息

  4.2 如何解决

    4.2.1 如果算子函数中使用到外部的自定义的变量,自定义类型需要实行Serializable接口

    4.2.2 如果RDD中使用到自定义的数据类型,自定义类型需要实行Serializable接口

    4.2.3 以上两种情况的类型,不能使用第三方提供的没有实现Serializable接口的类型

5、算子函数返回NULL导致的错误

  5.1 问题描述

    有些算子函数需要有返回值,但是有些数据,就是不想返回任何数据,此时如果返回NULL,可能会导致错误。

  5.2 如何解决

    先返回一个固定的值,之后进行过滤掉指定的数据即可。

6、yarn-cluster模式的JVM内存溢出无法执行的问题

  5.1 问题描述

    有些spark作业,在yarn-client模式下是可以运行的,但在yarn-cluster模式下,会报出JVM的PermGen(永久代)的内存溢出,OOM.

    出现以上原因是:yarn-client模式下,driver运行在本地机器上,spark使用的JVM的PermGen的配置,是本地的默认配置128M;

          但在yarn-cluster模式下,driver运行在集群的某个节点上,spark使用的JVM的PermGen是没有经过默认配置的,默认是82M,故有时会出现PermGen Out of Memory error log.

  5.2 如何处理

    在spark-submit脚本中设置PermGen

    --conf spark.driver.extraJavaOptions="-XX:PermSize=128M -XX:MaxPermSize=256M"(最小128M,最大256M)

    如果使用spark sql,sql中使用大量的or语句,可能会报出jvm stack overflow,jvm栈内存溢出,此时可以把复杂的sql简化为多个简单的sql进行处理即可。

 7、checkpoint的使用

  7.1 checkpoint的作用

    默认持久化的Rdd会保存到内存或磁盘中,下次使用该Rdd时直接冲缓存中获取,不需要重新计算;如果内存或者磁盘中文件丢失,再次使用该Rdd时需要重新进行。

    如果将持久化的Rdd进行checkpoint处理,会把内存写入到hdfs文件系统中,此时如果再次使用持久化的Rdd,但文件丢失后,会从hdfs中获取Rdd并重新进行缓存。

  7.2 如何使用

    首先设置checkpoint目录

//设置checkpoint目录javaSparkContext.checkpointFile("hdfs://hadoop-senior.ibeifeng.com:8020/user/yanglin/spark/checkpoint/UserVisitSessionAnalyzeSpark");

    将缓存后的Rdd进行checkpoint处理

//将缓存后的Rdd进行checkpointsessionRowPairRdd.checkpoint();

 

 

 

转载于:https://my.oschina.net/hblt147/blog/1572767

你可能感兴趣的文章
thinkphp5 多图片拖拽上传,自己写的,不足之处请指正~
查看>>
将Unicon字符串转成汉字String C#
查看>>
Centos 6.7 4TB 硬盘LVM 水平扩容
查看>>
iOS11上手体验
查看>>
第一天的Oracle
查看>>
UML类图几种关系的总结
查看>>
Linux修改时区的正确方法【修改时间,需要修改软连接,靠谱】
查看>>
文件下载功能的实现【本文是excel下载】
查看>>
ffmpeg mp4 to hls
查看>>
XML 文件解析--标签内容长度限制问题
查看>>
maven 与多模块构建
查看>>
ubuntu14.04 配置tomcat8
查看>>
VirtualBox体验及介绍
查看>>
Ubuntu 12.04 下安装 JDK 7
查看>>
1&gt;s.cpp(465) : error C2448: “main”: 函数样式初始值设定项类似函数定义 问题的解决方法...
查看>>
Error in Javac compilation for JSP
查看>>
XWifiMouse早期写的一个Android鼠标App
查看>>
Android AIDL 客户端和服务端配置
查看>>
制作自己的镜像(二)
查看>>
運維之下標題
查看>>