1 编译与CDH集成
下载Spark源码:Spark3.2.1<br /> 官方下载:[https://spark.apache.org/downloads.html](https://spark.apache.org/downloads.html)<br />选择源码包:<br /><br />注意:Spark3只支持Scala2.12+<br />指定CDH环境hadoop、hive、scala版本并编译Spark<br />编译命令:<br />./dev/make-distribution.sh --name 3.0.0-cdh6.3.2 --tgz -Phive -Phive-thriftserver -Pyarn -Phadoop-3.0.0 -Dhadoop.version=3.0.0-cdh6.3.2 -DskipTests clean package -X<br />编译好的gz包在当前目录下,上传至部署服务器中
2 部署Spark
解压gz包:
tar -zxvf /opt/spark-3.2.1-bin-3.0.0-cdh6.3.2.tar.gz
赋予root权限:
chown -R root:root /opt/spark-3.2.1-bin-3.0.0-cdh6.3.2
建立软连接方便管理:
ln -s /opt/spark-3.2.1-bin-3.0.0-cdh6.3.2 /opt/spark3
修改配置:
cp -r /opt/spark3/conf/spark-defaults.conf.template /opt/spark3/conf/spark-defaults.conf
cp -r /opt/spark3/conf/spark-env.sh.template /opt/spark3/conf/spark-env.sh
cp -r /opt/spark3/conf/workers.template /opt/spark3/conf/workers
拷贝cdh自带spark的默认配置spark-default.conf并修改为如下配置:
spark.authenticate=falsespark.driver.log.dfsDir=/user/spark3/driverLogsspark.driver.log.persistToDfs.enabled=truespark.dynamicAllocation.enabled=truespark.dynamicAllocation.executorIdleTimeout=60spark.dynamicAllocation.cachedExecutorIdleTimeout=600spark.dynamicAllocation.minExecutors=0spark.dynamicAllocation.maxExecutors=100spark.dynamicAllocation.schedulerBacklogTimeout=1spark.eventLog.enabled=truespark.io.encryption.enabled=falsespark.network.crypto.enabled=falsespark.serializer=org.apache.spark.serializer.KryoSerializerspark.shuffle.service.enabled=truespark.shuffle.service.port=7338spark.ui.enabled=truespark.ui.killEnabled=truespark.lineage.log.dir=/var/log/spark3/lineagespark.lineage.enabled=truespark.master=yarnspark.submit.deployMode=clientspark.eventLog.dir=hdfs://hdfs/user/spark3/applicationHistoryspark.yarn.historyServer.address=http://data-master2.bj.sm:18088spark.driver.extraLibraryPath=/opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/lib/hadoop/lib/nativespark.executor.extraLibraryPath=/opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/lib/hadoop/lib/nativespark.yarn.am.extraLibraryPath=/opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/lib/hadoop/lib/nativespark.yarn.config.gatewayPath=/opt/cloudera/parcelsspark.yarn.config.replacementPath={{HADOOP_COMMON_HOME}}/../../..spark.yarn.historyServer.allowTracking=true
spark的环境配置spark-env.sh并修改为如下配置:
#shumei spark env propertiesexport SPARK_HOME=/opt/spark3export HADOOP_HOME=/opt/cloudera/parcels/CDH/lib/hadoopexport HADOOP_CONF_DIR=/etc/hadoop/confexport YARN_CONF_DIR=/etc/hadoop/conf.cloudera.yarn
workers中添加所有datanode节点:
data-slave1.bj.smdata-slave2.bj.smdata-slave3.bj.smdata-slave4.bj.smdata-slave5.bj.smdata-slave6.bj.smdata-slave7.bj.smdata-slave8.bj.smdata-slave9.bj.smdata-slave10.bj.smdata-slave11.bj.smdata-slave12.bj.smdata-slave13.bj.smdata-slave14.bj.smdata-slave15.bj.smdata-slave16.bj.smdata-slave17.bj.smdata-slave18.bj.smdata-slave19.bj.smdata-slave20.bj.smdata-slave21.bj.smdata-slave22.bj.smdata-slave23.bj.smdata-slave24.bj.smdata-slave25.bj.smdata-slave26.bj.sm
在conf下添加cdh环境配置文件软连接:
ln -s /etc/hadoop/conf/core-site.xml /opt/spark3/conf/core-site.xml
ln -s /etc/hadoop/conf/hdfs-site.xml /opt/spark3/conf/hdfs-site.xml
ln -s /etc/hive/conf/hive-site.xml /opt/spark3/conf/hive-site.xml
测试spark-shell是否正常启动:
/opt/spark3/bin/spark-shell
在需要Spark的机器上部署即可,无需分布式部署
3 配置
3.1 调度配置
调度相关的配置主要有:
- 数据本地性相关,可以控制判断数据本地性的时间 - 任务调度的模式,如FIFO, FAIR等
- 黑名单机制,如失败多少次这个executor拉入黑名单、失败多少次节点拉入黑名单、黑名单后多长时间能再次启用
- 推测执行相关,为长时间的task重新启动一个task,哪个先完成就使用哪个的结果,并Kill掉另一个task。
- 任务相关,比如使用的CPU、重试的次数等
# 开启speculative,默认关闭spark.speculation=true# 检测周期,单位毫秒spark.speculation.interval=100# 任务完成的百分比,比如同一个stage中task的完成占比spark.speculation.quantile=0.75# 任务延迟的比例,比如当75%的task都完成,那么取他们的中位数跟还未执行完的任务作对比。如果超过1.5倍,则开启推测执行。spark.speculation.multiplier=1.5spark.cores.max当运行在standalone和mesos,应用可以使用的最大核数。如果没有配置,默认是 spark.deploy.defaultCores。spark.locality.wait默认值,3s启动数据本地任务等待多长时间后启动非本地节点。数据本地性有多种级别,进程本地性、节点本地性、机架本地性或其他。也可以根据不同的级别配置等待时间,如 spark.locality.wait.node 等。如果任务时间长或者很少看见本地性可以调整该配置。spark.locality.wait.node默认值,spark.locality.wait定义节点本地性的等待时间,比如,可以配置0跳过节点本地性,直接使用rack本地性。spark.locality.wait.process默认值,spark.locality.wait定义进程本地性的等待时间spark.locality.wait.rack定义机架本地性的等待时间spark.scheduler.maxRegisteredResourcesWaitingTime默认值,30s调度任务前等待资源注册的时间spark.scheduler.minRegisteredResourcesRatio默认值,yarn模式下0.8,standalone和mesos模式下0.0spark.scheduler.mode默认值,FIFO在相同的SparkContext中提交的任务调度模式,默认先进先出,也可以是FAIR公平调度spark.scheduler.revive.interval默认值,1s检查worker资源准备的间隔时间spark.scheduler.listenerbus.eventqueue.capacity默认值,10000Spark监听总线的事件队列长度,必须大于0.如果监听的事件被丢弃,可以增加该值。增加会使driver需要更多的内存。spark.scheduler.blacklist.unschedulableTaskSetTimeout默认值,120sspark.blacklist.enabled默认值,false如果为true,阻止spark在失败多次而进入黑名单的executor上 调度任务。spark.blacklist.timeout默认值,1h当节点或执行者被当做黑名单时的时间,之后会从黑名单移除正常参与执行任务。spark.blacklist.task.maxTaskAttemptsPerExecutor默认值,1试验特性,对于一个task,在一个executor进入黑名单前可以执行重试多少次。spark.blacklist.task.maxTaskAttemptsPerNode默认值,2试验特性,对于一个task,在一个node成为黑名单前可以执行重试多少次。spark.blacklist.stage.maxFailedTasksPerExecutor默认值,2试验特性,executor针对某个stage成为黑名单需要失败多少个任务。spark.blacklist.stage.maxFailedExecutorsPerNode默认值,2试验特性,在一个节点上,有几个executor针对某个stage成为黑名单,这个node才算黑名单。spark.blacklist.application.maxFailedTasksPerExecutor默认值,2试验特性,有多少不同的任务在同一个executor失败后,这个executor针对整个应用成为黑名单。黑名单的executor当 spark.blacklist.timeout 超时后,会被自动添加到资源池中。注意如果使用了动态分配,那么executor可能会被标记为空闲,从而被资源调度框架回收。spark.blacklist.application.maxFailedExecutorsPerNode默认值,2试验特性,针对整个应用,有多少不同的executor成为黑名单后,这个节点也会被标记为黑名单。spark.blacklist.killBlacklistedExecutors默认值,false如果为true,允许spark立即删除黑名单的executor。如果node被标记为黑名单,那么上面那所有的executor都会被kill。spark.blacklist.application.fetchFailure.enabled默认值,false如果配置为true,当executor执行fetch操作报错时,将直接会拉入黑名单。如果使用外部shuffle服务,那么整个节点将会被拉入黑名单。spark.speculation默认值,false如果为true,将会针对任务进行推测执行。比如某个任务执行缓慢,会再开启一个任务,哪个执行快用哪个作为结果。spark.speculation.interval默认值,100ms针对任务进行探测的间隔时间spark.speculation.multiplier默认值,1.5任务比均值慢多少将会执行推测spark.speculation.quantile默认值,0.75在进行推测前任务需要完成多少spark.task.cpus默认值,1每个task允许使用的核数spark.task.maxFailures默认值,4任务在放弃执行前可以允许的失败次数。不同的任务的失败次数,不会导致job失败。spark.task.reaper.enabled默认值,false启用任务的停止监控。当配置成true,任何任务被kill,都会被executor监控到,直到任务完成。参考 spark.task.reaper.* 配置。spark.task.reaper.pollingInterval默认值,10s当enabled为true时,这个配置控制executor多长时间检测一次。spark.task.reaper.threadDump默认值,true当任务停止后dump出日志。spark.task.reaper.killTimeout默认值,-1用来配置任务无法终止时,使用JVM进行停止的等待时间。-1为禁用。spark.stage.maxConsecutiveAttempts默认值,4在stage停止前可以尝试的次数。
