Scala day 34 (spark-excel)
Using spark-excel to read xlsx files.
build.sbt
spark-excel runs into a Jackson version incompatibility with Spark 2.2.0, so dependencyOverrides is used here to override the newer Jackson artifacts and pin them to 2.8.7.
name := "streaming-test"
version := "0.1"
scalaVersion := "2.11.0"
dependencyOverrides += "com.fasterxml.jackson.core" % "jackson-core" % "2.8.7"
dependencyOverrides += "com.fasterxml.jackson.core" % "jackson-databind" % "2.8.7"
dependencyOverrides += "com.fasterxml.jackson.module" % "jackson-module-scala_2.11" % "2.8.7"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.2.0"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.2.0"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.2.0"
libraryDependencies += "com.springml" % "spark-sftp_2.11" % "1.1.1"
libraryDependencies += "org.scalatest" %% "scalatest" % "3.0.5" % "test"
libraryDependencies += "mysql" % "mysql-connector-java" % "5.1.16"
libraryDependencies += "com.crealytics" %% "spark-excel" % "0.9.8"
ReadXlsxTest.scala
- StructField("common", StringType, true): if the third argument (nullable) is set to false, a NullPointerException is thrown whenever the cell has no value (a short sketch of that variant follows the execution output below).
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.scalatest.FunSuite

class ReadXlsxTest extends FunSuite {

  test("read xlsx") {
    val spark = SparkSession
      .builder
      .appName("Spark-csv")
      .master("local[2]")
      .getOrCreate()

    val xlsxPath = "/Volumes/Transcend/1-program-workspace/2-intellij-workspace/streaming-test/properties/mechinconfig_V2R1.xlsx"

    // All columns are declared nullable (third argument = true);
    // with nullable = false an empty cell triggers a NullPointerException.
    val xlsxSchema = StructType(
      List(
        StructField("isBigSystem", StringType, true),
        StructField("lineId", StringType, true),
        StructField("spinningId", StringType, true),
        StructField("spindleId", StringType, true),
        StructField("tagName", StringType, true),
        StructField("common", StringType, true)
      )
    )

    val df = spark.sqlContext.read.format("com.crealytics.spark.excel")
      .option("useHeader", "true")                // first row contains the column names
      .option("sheetName", "BB")                  // read only the sheet named "BB"
      .option("path", xlsxPath)
      .option("treatEmptyValuesAsNulls", "false")
      .option("inferSchema", "false")             // use the explicit schema above instead
      .option("addColorColumns", "true")
      .schema(xlsxSchema)
      .load()

    // Keep only rows that have both lineId and spinningId, then print lineId_spinningId.
    df.filter(_.getAs[String]("lineId") != null)
      .filter(_.getAs[String]("spinningId") != null)
      .foreach(row => {
        val lineId = row.getAs[String]("lineId")
        val spinningId = row.getAs[String]("spinningId")
        val fileName = lineId + "_" + spinningId
        println(fileName)
      })
  }
}
- Execution result
18/06/27 19:00:27 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, localhost, executor driver, partition 1, PROCESS_LOCAL, 143776 bytes)
18/06/27 19:00:27 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
18/06/27 19:00:27 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
18/06/27 19:00:27 INFO CodeGenerator: Code generated in 25.279182 ms
18/06/27 19:00:27 INFO CodeGenerator: Code generated in 12.674182 ms
L33_1
L37_8
L33_2
L37_8
L33_3
L37_9
L33_4
L37_9
L33_5
...
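On the nullable point from the note above: the sketch below (not from the original post, column names reused from the schema) shows the variant that triggers the problem, with common declared as non-nullable while the sheet still has empty cells in that column.
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Sketch: same "common" column as above, but declared non-nullable.
// Per the note above, spark-excel 0.9.8 throws a NullPointerException
// as soon as a row has an empty "common" cell when this schema is used.
val riskySchema = StructType(
  List(
    StructField("lineId", StringType, true),
    StructField("spinningId", StringType, true),
    StructField("common", StringType, false) // nullable = false -> NPE on empty cells
  )
)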
References:
spark-excel : https://github.com/crealytics/spark-excel