之前都是在spark shell上执行,当数据量达到一定程度,我们可以利用Spark的集群模式来运行,增加算力,而且本地小数据量验证成功的代码可以直接放到集群上跑。
这一小节从提交一个集群环境下的Spark Job出发,讨论了在集群运行Spark Job时的配置项,再讲到Spark基础的架构,最后讲解了一下Spark Job性能调试的经验。
一、spark-submit应用部署
Spark使用spark-submit脚本来启动集群部署模式,进入/usr/bin/目录,可以看到一个spark-submit脚本,输入spark-submit —help可以看到一些配置选项介绍,截取一部分如下图:
以下是一个spark-submit提交scala应用的shell脚本的示例,我们可以看到在主程序job_submit中,我们从控制台传入了四个所需参数,对spark-submit进行了配置,比如class/master/queue等等,以下会逐一讲解,最后我们传入了JAR_FILE及其参数,这是我们的需要执行的打包的scala程序:
所以spark-submit的一般格式是这样的:
|
|
这里主要讲几个比较常用的配置项:
- — master: 集群的master URL,上面的示例中我们传入了yarn集群
有以下可以接收的值:
值 | 描述 | |
---|---|---|
spark://host:port | 连接到指定端口的Spark独立集群,默认Saprk独立主节点端口为7077,例如spark://23.195.26.187:7077 | |
mesos://host:port | 连接到指定端口的Mesos集群,默认为5050端口 | |
yarn | yarn集群,需设置环境变量HADOOP_CONF_DIR为HADOOP配置目录 | |
local | 本地模式,使用单核 | |
local[N] | 本地模式,N个核 | |
local[*] | 本地模式 | 尽可能多的核 |
— class: 运行Java或scala程序时应用的主类,示例中为com.stat.UserAdProfileGenerator
— delopy-mode: 选择在客户端client还是在集群cluster启动驱动器
— name: 该应用显示在Spark网页用户界面上的名称,示例中未设置该参数
— files: 需放到该应用工作目录中的文件列表,我们程序中往往会使用一些数据文件,可以放在这里
— py-files: 需添加到PYTHONPATH中的文件列表,可以包含.py/.egg/.zip文件
— driver-memory: 驱动器进程使用的内存量,示例为16G
— executor-memory: 执行器进程使用的内存量,示例为16G
— num-executors: 指定执行器数量,只在yarn集群模式下使用,示例为50个
— exectors-cores: 执行器节点的核心数,只在yarn集群模式下使用,示例为1个
以上这些选项主要包括两部分,第一部分为调度的信息,比如master、driver-memory、exectors-cores,这部分和我们的性能调试息息相关,后面会讲到怎么去做调整;第二部分是依赖,比如files、py-files。
上面出现了一些陌生的概念,比如驱动器、执行器、集群等等,这些都是Spark架构中的概念,分别看一下。
二、Spark基础架构
下图为Spark架构组成图,有三个主要组成部分,Driver Program、Cluster Manager和Worker Node。
- Driver Program:驱动器程序,负责中央协调,调度各个分布式工作节点;
- Cluster Manager:集群管理器,控制整个集群,监控worker。自带的集群管理器称为独立集群管理器,也可以运行在Hadoop YARN或者Apache Mesos上;
- Worker Node:工作节点,负责控制计算节点,启动Executor或者Driver。
- Executor:执行器节点,运行在worker node上的一个进程;
分别具体介绍一下驱动器节点、执行器节点和集群管理器,如下:
驱动器节点
作用:执行main()方法的进程,比如创建SparkContext、创建RDD、RDD转化和行动操作的代码,当我们启动Spark shell时,就启动了Spark驱动器程序,而当驱动器程序一旦终止,Spark应用也就结束了。
职责:
- 所有的Spark程序不外乎创建RDD,RDD转化操作生成新的RDD,最后行动操作存储RDD的数据,这样就构成了一个有向无环图(DAG)的操作逻辑图,当驱动器节点启动时,Spark把逻辑图转为物理执行计划,即转变为一系列步骤(Stage),每个步骤包含多个任务(Task),这些任务打包后被送到集群进行分布式计算。Task是Spark中最小的工作单元。
- 驱动器节点启动,生成物理执行计划,驱动器程序负责协调各执行器进程之间的各个任务,执行器进程启动后会在驱动器上注册自己,以在驱动器上有每个执行器进程的完整记录,每个执行器节点代表一个能够处理任务和存储RDD数据的进程。Spark会把分配任务给合适的执行器进程,执行器进程会缓存数据,驱动器跟踪数据缓存任务,从而调度之后的任务,尽可能减少数据网络传输。
执行器节点
- 作用:负责在Spark作业中运行任务,任务间相互独立;与Spark任务同时启动,伴随整个生命周期,即使执行器节点发生了异常奔溃,Saprk也可继续执行
- 职责:
- 负责运行组成Spark应用的任务,并将结果返回给驱动器进程;
- 通过自身的块管理器(Block Manager)为用户程序中要求缓存的RDD提供内存式存储。RDD是直接缓存在执行器进程中的,所以可以在运行时充分利用缓存数据提高运算速度。
集群管理器
- 作用:Spark依赖于集群管理器启动执行器节点,在某些特殊情况下,也会依赖集群管理器来启动驱动器节点。Spark有自带的独立集群管理器,也可以运行在其他外部集群管理器上,如YARN和Mesos等。
这里主要说一下YARN集群管理器,Yarn(Yet Another Resource Negotiator)是Hadoop的资源管理层,它旨在提供一个通用且灵活的框架来管理Hadoop集群中的计算资源。它也是一种主/从结构,由三个部件组成,分别是:
- Resource Manager (RM)
- Node Manager (NM)
- Application Master (AM)
想了解这三个组成部分的细节,可以细读这篇文章Hadoop Yarn的架构
在使用层面,我们看一下如何设置YARN作为集群管理器:
- 设置指向HADOOP配置目录的环境变量:找到HADOOP的配置目录,设为环境变量HADOOP_CONF_DIR => export HADOOP_CONF_DIR = “…”
- 使用spark-submit向主节点URL提交作业 => spark-submit —master yarn ···
再看一下如何配置资源用量,这个在之前已经提到过了:
- —num -executors :设置执行器节点,默认值为2,一般要提高这个值
- —executor -memory: 设置每个执行器的内存用量
- —executor -cores: 设置每个执行器进程从YARN中占用的核心数目
- —queue:设置队列名称,YARN可以将应用调度到多个队列中
这里几个参数的调优会在第四小节讲到。
讲完了spark-summit和架构部分,那如果想在代码层面进行debug,有哪些比较好的方法,下一章节分别介绍一下。
三、Spark UI与日志
Spark可以通过单机和集群模式来部署代码,单机部署的时候我们可以通过断点调试来修正代码,但集群部署却很难用该方法去调试,但我们可以通过日志分析或者Spark UI界面来进行调试和优化我们的代码。
Spark UI
Spark UI是呈现Spark应用性能以及其他信息的前端展示页面,
看一下导航栏,按照数字标注分别是:
- job页面:列出了当前应用的所有任务,以及所有的excutors中action的执行时间、耗时、完成情况;
- Stages:查看应用的所有stage,粒度上要比job更细一些;
- storage:查看应用目前使用了多少缓存,一般由cache persist等操作触发
- environment:展示当前spark所依赖的环境,比如jdk,lib等等
- executors:查看申请使用的内存大小以及shuffle中input和output等数据
- AppName:显示代码中使用setAppName设定应用名字
下图为Spark的UI主页面,这里展示了已完成的Job信息,点击Job进入详细页面,分为两部分,一部分是event timeline,另一部分是进行中和完成的job任务。
更多页面的详细介绍可以参考这个链接
yarn logs
如果我们没法从UI界面中排查出错原因,这时候只有把driver和executor进程所生成的日志来得到更多的信息,因为日志会详细记录各种异常事件,比如内部警告以及用户代码输出的详细异常信息。
在YARN模式下,我们可以在控制台输入:
|
|
示例如下:
app ID一般会在程序执行的时候打印到控制台,或者一般把日志重定向到log文件中,在执行过程中用tail -f 查看执行日志,实时监控执行进程。
四、Spark 性能调优经验
打印信息
可以在程序中打印出一些输出信息,数据量很大的时候,做一些take(10)/show(10)/count()这样的action操作,就可以看到一些变量的中间状态了,比如将yarn作为资源管理器时,直接可以在yarn logs中就可以看到这些信息。这对我们调试也是很有帮助的,可以知道我们的程序是否得到了我们想得到的结果。
并行度调优
在物理执行期间,RDD会分为多个分区,每个分区存储了全体数据的一个子集,然后Spark会给每个分区的数据创建一个Task,Spark一般会根据其底层的存储系统自动推断出合适的并行度,比如从HDFS读数据会为每个文件区块创建一个分区,从混洗后的RDD派生下来的RDD的并行度保持和父一致。
并行度会从两方面影响性能:
- 并行度过低时,会出现资源限制的情况,可以提高并行度来充分利用更多的计算核心
- 并行度过高时,每个分区产生的间接开销累计起来会更大。评价并行度是否过高可以看你的任务是不是在瞬间(毫秒级)完成的,或者任务是不是没有读写任何数据。
如何调优:
- 在数据混洗操作时,对混洗后的RDD指定并行度
- 对任何已有的RDD进行重新分区来获取更多或更少的分区数,重新分区可用repartition(),减少分区可用coalesce(),coalesce不会打乱数据,所以比repartition效率高
以下是一个实例:
|
|
内存管理
1.改变内存比例
- RDD存储(60%):当调用RDD的persist()或cache()方法时,这个RDD分区会被存储到缓存中,Spark会根据spark.storage.memoryFraction限制用来缓存的内存占整个JVM堆空间的比例大小。若超出限制,旧的分区数据会被移出内存。
- 数据混洗与聚合缓存区(20%):当数据进行数据混洗时,Spark会创造一些中间缓存区来存储数据混洗的输出数据。根据spark.shuffle.memoryFraction限定这种缓存区占总内存的比例。
- 用户的代码(20%):spark可以执行任意代码,所以用户的代码可以申请大量内存,它可以访问JVM堆空间中除了分配给RDD存储和数据混洗存储以外的全部空间。
2.改进缓存等级
- cache()操作:它以内存优先即MEMORY_ONLY的存储等级持久化数据,分区不够用的话,旧分区会直接删除。需要用到时会再重新算,这样效率会低。一般不太建议直接使用cache,一旦cache的量很大,就会导致内存溢出。
- persist()操作可以持久化级别,比如使用MEMORY_AND_DISK级别调用persist(),内存如果放不下的旧分区会被写入磁盘,当再次需要用到的时候再从磁盘上读取,代价比重算要低,性能也会稳定一些,比如从数据库中读取数据时,重算(即读取)的代价很大,这个时候就很有效
硬件资源
硬件资源会很大程度上影响Job的效率,主要有以下几个:
- — executor-memory: executor的内存,各种部署模式都可以,当任务失败,报错信息为sparkContext shutdown时,基本是内存不足导致的。可以尝试调大—excutor-memory参数,但若系统条件受限,无法加大内存,可以局部进行调试,把程序按执行步骤检查问题点。
- — executor-cores: executor的核数,仅在yarn模式下
- — num—executors: executor节点的总数,仅在yarn模式下
性能调优可以在实践中不断积累经验,以上只是一些参考。如果想了解更多的调优方法,可以访问Tuning Spark
参考资料:
《Spark大数据分析》