針對 spark job 在 yarn 上面執行的效能調整,在 GCP 上面使用三台機器做一個 hadoop cluster 做測試.

測試檔案的資料格式大概長這樣,然後大小為 1G , 10200000 筆資料 :

jackalhelix68714,picbear.com/ebcbuzz/cannotmappin,2018-10-22 06:08:11,2018-10-22 06:08:57,33,530214
butterflyforest75334240,www.theguardian.com/media/iplayer,2018-10-13 12:07:15,2018-10-13 12:08:34,70,2487
lightningsmall9571044,picbear.com/ebcbuzz/cannotmappin,2018-10-17 06:07:15,2018-10-17 06:08:10,467706,58

主要就是跑 spark job 處理上面的檔案資料後,寫入 Redis.

一台機器的規格

4 Core , 15G

Hadoop Cluster 規格,每一台保留 2 Core 及 5G 的 Memory 給系統使用 :

Memory Total 30 GB
VCores Total 6

執行 “update.sh” 測試 :

(1)spark-submit 參數 : 
	yarn cluster mode , driver-memory 5g , executor-memory 5g , executor-cores 2 , num-executors 3
(2)yarn 執行狀態 : 
	Containers Running : 3 個, Memory Used : 18GB , VCores Used : 3
(3)執行時間 : 
	17:18:52 ~ 18:21:44 , 63 分鐘
(4)redis 資料 : 
	約 20399995 筆

使用上面的 spark-submit 參數,一開始只有 3 個 container 在跑,資源也沒有用滿.跑了 1 小時左右. 所以調整了一下參數,算法如下 :

1台機器 2 Core / 每個 executor 1 個 Core 所以每一台機器最多只能跑 2 個 executor. 1台機器 2 個 executor * 3 台,所以最多有 6 個 executor 可以執行,但扣掉 1 個 Driver 所以剩 5 個 executor. 1台機器的 Memory 有 10G / 2 個 executor,1 個 executor 可以使用的 Memory 公式算法大概是 :

1 個 executor 可以使用的 Memory - Min(384 , 0.1 * (1 個 executor 可以使用的 Memory)) - (Driver 使用的 Memory + Min(384 , (0.1 * (Driver 使用的 Memory))))

或著是將 0.1 改成 0.7

1 個 executor 可以使用的 Memory - Min(384 , 0.7 * (1 個 executor 可以使用的 Memory)) - (Driver 使用的 Memory + Min(384 , (0.7 * (Driver 使用的 Memory))))

所以大概是

5G - Min(384 , 0.1 * 5G) = 5G - 0.5G = 4.5G
10G - 4.5G 剩 5.5 給 driver,上面的例子 driver-memory 給 5g,實際會用到 5 + 0.5 = 5.5,所以可能不夠用.

如果是用 0.7 算的話

5G - Min(384 , 0.7 * 5G) = 5G - 3.5G = 1.5G 完全不夠給 Driver.

所以修改 spark-submit 參數如下 :

(1)spark-submit 參數 : 
	yarn cluster mode , driver-memory 7g , executor-memory 7g , executor-cores 1 , num-executors 5
(2)yarn 執行狀態 : 
	Containers Running : 6 個 , Memory Used : 48 GB , VCores Used : 6 core
(3)執行時間 : 
	10:17:34 ~ 10:57:48 , 40 分鐘
(4)redis 資料 : 

所以 Memory 的計算,0.1 的算法

5 個 executors * (7G + 7G * 0.1) = 5 * 7.7 = 38.5
1 個 driver * (7G + 7G * 0.1) = 7.7
38.5 + 7.7 = 46.2

0.7 的算法

5 個 executors * (7G + 7G * 0.7) = 5 * 11.9 = 59.5
1 個 driver * (7G + 7G * 0.7) = 11.9
59.5 + 11.9 = 71.4

Memory 實際使用了 48G,所以感覺用 0.7 去估比較保險.

提升機器規格做測試,一台機器的規格

8 Core , 30G

Hadoop Cluster 規格,每一台保留 2 Core 及 8G 的 Memory 給系統使用 :

Memory Total 66 GB
VCores Total 18

執行 “update.sh” 測試 :

(1)spark-submit 參數 : 
	yarn cluster mode , driver-memory 7g , executor-memory 7g , executor-cores 2 , num-executors 8
(2)yarn 執行狀態 : 
	Containers Running : 6 個 , Memory Used : 48 GB , VCores Used : 6 core
(3)執行時間 : 
	17:37:57 ~ 18:12:07 , 35 分鐘
(4)redis 資料 : 
	info keyspace 
	n1 -> db0:keys=6799307
	n2 -> db0:keys=6801872
	n3 -> db0:keys=6798816

每 1 個 executor 應該要是 2 個 core 但在 Yarn 上面看到的還是只有 6 個 core.

修改 capacity-scheduler.xml 的 yarn.scheduler.capacity.resource-calculator 參數,原本是

 <property>
    <name>yarn.scheduler.capacity.resource-calculator</name>
    <value>org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator</value>
    <description>
      The ResourceCalculator implementation to be used to compare
      Resources in the scheduler.
      The default i.e. DefaultResourceCalculator only uses Memory while
      DominantResourceCalculator uses dominant-resource to compare
      multi-dimensional resources such as Memory, CPU etc.
    </description>
  </property>

改成使用 DominantResourceCalculator :

 <property>
    <name>yarn.scheduler.capacity.resource-calculator</name>
    <value>org.apache.hadoop.yarn.util.resource.DominantResourceCalculator</value>
    <description>
      The ResourceCalculator implementation to be used to compare
      Resources in the scheduler.
      The default i.e. DefaultResourceCalculator only uses Memory while
      DominantResourceCalculator uses dominant-resource to compare
      multi-dimensional resources such as Memory, CPU etc.
    </description>
  </property>

修改 capacity-scheduler.xml 只需要下

yarn rmadmin -refreshQueues

參考 yarn 資訊不準確 修改後再測試

(1)spark-submit 參數 : 
  yarn cluster mode , driver-memory 7g , executor-memory 7g , executor-cores 2 , num-executors 6
(2)yarn 執行狀態 : 
  Containers Running : 6 個 , Memory Used : 48 GB , VCores Used : 12 core
  spark 執行狀態 : 
   Task : 8 個 , 
   executor-cores : 2 Cores
(3)執行時間 : 
  14:54:19 ~ 15:26:05 , 32 分鐘
(4)redis 資料 : 20399989 筆左右

可以看到 core 是正確的了,但執行時間並沒有提升太多.

再次提升機器規格做測試,一台機器的規格

16 Core , 60G

Hadoop Cluster 規格,每一台保留 4 Core 及 12G 的 Memory 給系統使用 :

Memory Total 144 GB
VCores Total 36

這次提升了很多 core 跟 memory 所以重新定義 yarn 的參數 yarn-site.xml :

yarn.scheduler.maximum-allocation-mb : 49152/3 = 16384 = 16G
yarn.scheduler.minimum-allocation-mb : 49152/9 = 5461 = 5.3 G
yarn.scheduler.maximum-allocation-vcores : 36/3 = 12
yarn.scheduler.minimum-allocation-vcores : 36/9 = 4 Core
yarn.nodemanager.resource.memory-mb : 1024 * 48 = 49152
yarn.nodemanager.resource.cpu-vcores : 36 Core

執行 “update.sh” 測試 :

(1)spark-submit 參數 : 
	yarn cluster mode , driver-memory 8g , executor-memory 5g , executor-cores 4 , num-executors 24
(2)yarn 執行狀態 : 
	Containers Running : 12 個 , Memory Used : 127.99 GB , VCores Used : 48 core
  spark 執行狀態 : 
   executors : 11 個 executor + 1 個 driver
   Task : 8 個
   executor-cores : 4 Cores
(3)執行時間 : 
	17:42:14 ~ 18:14 , 32 分鐘
(4)redis 資料 : 20400541 筆左右

執行 “update.sh” 測試 :

(1)spark-submit 參數 : 
  yarn cluster mode , driver-memory 12g , executor-memory 5g , executor-cores 2 , num-executors 24
(2)yarn 執行狀態 : 
  Containers Running : 12 個 , Memory Used : 133.33 GB , VCores Used : 48 core
  spark 執行狀態 : 
   executors : 11 個 executor + 1 個 driver
   Task : 8 個
   executor-cores : 2 Cores
(3)執行時間 : 
  18:19:23  ~ 18:49 , 30 分鐘
(4)redis 資料 : 20400541 筆左右

不管怎麼調速度都差不多,而且 spark 的 task 最多都只切到 8 個 :

hadoop_day2_1.jpg

於是想到會不會跟 HDFS 的 Block 數有關,因為程式有寫到 sc.textFile 而主程式架構大致上為 :

sc.textFile(inDir).map { str =>
    ...
}.filter(
    ...
).reduceByKey(
    ...
).map { case (id, person) =>
    ...
}.reduceByKey(
    ...
).foreach { case (id, person) =>
    ...
}

查看目前的 blocksize

> hdfs getconf -confKey dfs.blocksize
134217728

修改 hdfs block size 將 128MB 改成 64M,修改 hdfs-site.xml :

<property>
      <name>dfs.blocksize</name>
      <value>134217728</value>
</property>

改成

<property>
<name>dfs.blocksize</name>
<value>67108864</value>
</property>

改完之後重啟 Hadoop cluster,記得 HDFS 的檔案要重新放,不然舊 HDFS 檔案的 blocksize 還會是舊的.

執行 “update.sh” 測試 :

(1)spark-submit 參數 : 
  yarn cluster mode , driver-memory 9g , driver-cores 12 , executor-memory 14g(15g會超過) , executor-cores 10 , num-executors 5
(2)yarn 執行狀態 : 
  Containers Running : 6 個 , Memory Used : 90.66  GB , VCores Used : 72 core
  spark 執行狀態 : 
   executors : 5 個 executor + 1 個 driver
   Task : 15 個
   executor-cores : 10 Cores
   driver : 5G
(3)執行時間 : 
  12:36:38 ~ 12:55:41 , 20 分鐘

這時候 Task 就變 15 個了,而且時間又快了 10 分鐘,公式如下

HDFS File Size / HDFS Block Size

所以原本的 8 個 task 是

1024M / 128M = 8

修改後變

1024M / 64M = 16 (實際只有15)

所以看起來增加 task 數,可以加快執行速度。

hadoop_day2_2.jpg

但由於要讀取 HDFS 檔案的關係 task 數受限於 HDFS 的 Block size.那想再增加 task 數該怎麼辦. 這時候可以利用 repartition 或 coalesce 這兩個 API 改變 partition 的數量. 目前理解是要增加 partition 數使用 repartition,要減少 partition 數要使用 coalesce. 減少 partition 時使用 coalesce 的話好像不會進行 shuffle,其他的動作都會產生 shuffle 增加額外的負擔.

先切 100 個 partition 測試,程式改成這樣 :

sc.textFile(inDir).map { str =>
    ...
}.filter(
    ...
).reduceByKey(
    ...
).repartition(100)
.map { case (id, person) =>
    ...
}.reduceByKey(
    ...
).foreach { case (id, person) =>
    ...
}

執行 “update.sh” 測試 :

(1)spark-submit 參數 : 
  yarn cluster mode , driver-memory 9g , driver-cores 12 , executor-memory 14g(15g會超過) , executor-cores 10 , num-executors 5
(2)yarn 執行狀態 : 
  Containers Running : 6 個 , Memory Used : 90.66  GB , VCores Used : 72 core
  spark 執行狀態 : 
   executors : 5 個 executor + 1 個 driver
   Task : 15 個
   executor-cores : 10 Cores
   driver : 5G
(3)執行時間 : 
  16:37:50 ~ 16:46:46  , 9 分鐘

速度又提升不少只花了 9 分鐘.

hadoop_day2_3.jpg

那調成 500 看看其他不變,repartition(500)

(3)執行時間 : 
  17:09:03 ~ 17:17:47 , 8 分鐘

hadoop_day2_4.jpg

看來 partition 切到一個大小速度就上不去了.在執行的過程中其實 RUNNING 的 task 只有約 50 個在執行. 有可能跟 Containers 的資源大小有關,可以在測測.

接下來把 foreach 改成 foreachPartition 試試看 :

sc.textFile(inDir).map { str =>
    ...
}.filter(
    ...
).reduceByKey(
    ...
).repartition(500)
.map { case (id, person) =>
    ...
}.reduceByKey(
    ...
).foreachPartition { case (id, person) =>
    ...
}

執行 “update.sh” 測試 :

(1)spark-submit 參數 : 
  yarn cluster mode , driver-memory 9g , driver-cores 12 , executor-memory 14g(15g會超過) , executor-cores 10 , num-executors 5
(2)yarn 執行狀態 : 
  Containers Running : 6 個 , Memory Used : 90.66  GB , VCores Used : 72 core
  spark 執行狀態 : 
   executors : 5 個 executor + 1 個 driver
   Task : 500 個(實際同時 running 的只有 50 個左右)
   executor-cores : 10 Cores
   driver : 5G
(3)執行時間 : 
  18:03:52 ~ 18:10 , 7 分鐘

hadoop_day2_5.jpg

在這邊的案例使用 foreachPartition 筆 foreach 快了 2 分鐘左右.

hadoop_day2_6.jpg

hadoop_day2_7.jpg

不管怎麼測,實際 RUNNING 的 task 上限大概是 50 左右,可能要再加大 excutoer 的 core 跟 memory 才可以再提升 RUNNING 的 task 數.

繼續提升機器規格做測試,一台機器的規格

32 Core , 120G

Hadoop Cluster 規格,每一台保留 6 Core 及 20G 的 Memory 給系統使用 :

Memory Total 100 GB
VCores Total 26

重新定義 yarn 的參數 yarn-site.xml :

yarn.scheduler.maximum-allocation-mb : 307200/3 = 102400 = 100G
yarn.scheduler.minimum-allocation-mb : 307200/12 = 25600 = 25 G
yarn.scheduler.maximum-allocation-vcores : 26/3 = 8 Core
yarn.scheduler.minimum-allocation-vcores : 26/13 = 2 Core
yarn.nodemanager.resource.memory-mb : 1024 * 100 = 102400
yarn.nodemanager.resource.cpu-vcores : 26 Core

執行 “update.sh” 測試 :

(1)spark-submit 參數 : 
	yarn cluster mode , driver-memory 20g , driver-cores 6 , executor-memory 8g , executor-cores 6 , num-executors 11
(2)yarn 執行狀態 : 
	Containers Running : 12 個 , Memory Used : 300 GB , VCores Used : 72 core
  spark 執行狀態 : 
   executors : 11 個 executor + 1 個 driver
   executor-cores : 6 Cores
(3)執行時間 : 
	11:04:05 ~ 11:13:26 , 9 分鐘
(4)redis 資料 : 20399991 筆左右

hadoop_day2_8.jpg

資源加大後提升到最多可以有 66 個 Task 同時在 RUNNING.

調整 GC

在 spark-defaults.conf 加上參數

spark.executor.extraJavaOptions -XX:+UseG1GC

沒有使用 G1GC 的時間 :
hadoop_day2_9.jpg 調整後的時間 :
hadoop_day2_10.jpg

KryoSerializer

改成使用 KryoSerializer,並將自定義的 class registerKryoClasses :

val conf = new SparkConf()
conf.set("spark.sql.warehouse.dif", "/tmp")
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.registerKryoClasses(
  Array(classOf[Person], classOf[Location], classOf[LatLon], classOf[Visit], classOf[Url], classOf[Url])
)

hadoop_day2_12.jpg

調高 JedisCluster 的 Connection 上限(預設是 8)

val jedisClusterNodes = new HashSet[HostAndPort]()
config.split(',').foreach { str =>
    val Array(ip, _port) = str.split(':')
    jedisClusterNodes.add(new HostAndPort(ip, _port.toInt))
}

val cpcfg =  new GenericObjectPoolConfig()
cpcfg.setMaxTotal(10000)
cpcfg.setMaxIdle(10000)

val jc = new JedisCluster(jedisClusterNodes ,5000,5000,20,
    AesUtil.decrypt(keyDir.split(",")(1) ,keyDir.split(",")(0)) , cpcfg)

spark.memory.fraction

hadoop_day2_11.jpg

所以調整 performance 的重點整理

1. 程式寫法及 API 的使用
2. task 數量的調整
3. core 及 memory 的分配

一些心得 :

1. executer 的 core 數影響到可以跑的 task 數量,1 個 core 大概開 1 個 task 左右,task 數量介於 (core 數 * 1) 
2. yarn 在算 memory 時會影響到實際能開多少個 container,所以雖然 num-executors 可能開 5 個,但資源沒開好,可能最後只會有 4 個 container 在跑
3. (executer 數量少資源多) 開的 task 比 (executer 數量多資源少) 還多.所以實際 Runnig 的 task 越多效能越好.
4. 在 spark-defaults.conf 加上 spark.executor.extraJavaOptions -XX:+UseG1GC 指定使用 G1GC 可以縮短許多 GC 的時間.
5. 使用 yarn external shuffle service 提昇性能.

參考資料
performance skill
update HDFS block size
map & mapPartitions
yarn UI params
manager-cpu-yarn-cluster
external shuffle service

  • 其他參考

一個 spark application 所使用的資源為:

cores = spark.driver.cores + spark.executor.cores * spark.executor.instances
memory = spark.driver.memory + spark.yarn.driver.memoryOverhead + (spark.executor.memory + spark.yarn.executor.memoryOverhead) * spark.executor.instances