Scala day 35 (spark-sql)
使用 spark-sql 對讀取 xlsx 的 dataframe 做操作
build.sbt
name := "streaming-test"
version := "0.1"
scalaVersion := "2.11.0"
dependencyOverrides += "com.fasterxml.jackson.core" % "jackson-core" % "2.8.7"
dependencyOverrides += "com.fasterxml.jackson.core" % "jackson-databind" % "2.8.7"
dependencyOverrides += "com.fasterxml.jackson.module" % "jackson-module-scala_2.11" % "2.8.7"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.2.0"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.2.0"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.2.0"
libraryDependencies += "com.springml" % "spark-sftp_2.11" % "1.1.1"
libraryDependencies += "org.scalatest" %% "scalatest" % "3.0.5" % "test"
libraryDependencies += "mysql" % "mysql-connector-java" % "5.1.16"
libraryDependencies += "com.crealytics" %% "spark-excel" % "0.9.8"
ReadXlsxTest.scala
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{SQLContext, SparkSession}
import org.scalatest.FunSuite
class ReadXlsxTest extends FunSuite {
test("read xlsx") {
val spark = SparkSession
.builder
.appName("Spark-csv")
.master("local[2]")
.getOrCreate()
val xlsxPath = "/Volumes/Transcend/1-program-workspace/2-intellij-workspace/streaming-test/properties/mechinconfig_V2R1.xlsx"
val xlsxSchema = StructType(
List(
StructField("isBigSystem", StringType, true),
StructField("lineId", StringType, true),
StructField("spinningId", StringType, true),
StructField("spindleId", StringType, true),
StructField("tagName", StringType, true),
StructField("common", StringType, true)
)
)
val df = spark.sqlContext.read.format("com.crealytics.spark.excel")
.option("useHeader", "true")
.option("sheetName", "BB")
.option("path", xlsxPath)
.option("treatEmptyValuesAsNulls", "false")
.option("inferSchema", "false")
.option("addColorColumns", "true")
.schema(xlsxSchema)
.load()
df.filter((_.getAs[String]("lineId") != null))
.filter(_.getAs[String]("spinningId") != null)
.foreach(row => {
val lineId = row.getAs[String]("lineId")
val spinningId = row.getAs[String]("spinningId")
val fileName = lineId + "_" + spinningId
println(fileName)
})
}
test("spark sql use table") {
val spark = SparkSession
.builder
.appName("Spark-csv")
.master("local[2]")
.getOrCreate()
val xlsxPath = "/Volumes/Transcend/1-program-workspace/2-intellij-workspace/streaming-test/properties/mechinconfig_V2R1.xlsx"
val xlsxSchema = StructType(
List(
StructField("isBigSystem", StringType, true),
StructField("lineId", StringType, true),
StructField("spinningId", StringType, true),
StructField("spindleId", StringType, true),
StructField("tagName", StringType, true),
StructField("common", StringType, true)
)
)
val df = spark.sqlContext.read.format("com.crealytics.spark.excel")
.option("useHeader", "true")
.option("sheetName", "BB")
.option("path", xlsxPath)
.option("treatEmptyValuesAsNulls", "false")
.option("inferSchema", "false")
.option("addColorColumns", "true")
.schema(xlsxSchema)
.load()
import spark.implicits._
df.filter($"tagName" === "L33-A-G301-RPM").show()
//df.show()
}
test("select spark sql") {
val spark = SparkSession
.builder
.appName("Spark-csv")
.master("local[2]")
.getOrCreate()
val xlsxPath = "/Volumes/Transcend/1-program-workspace/2-intellij-workspace/streaming-test/properties/mechinconfig_V2R1.xlsx"
val xlsxSchema = StructType(
List(
StructField("isBigSystem", StringType, true),
StructField("lineId", StringType, true),
StructField("spinningId", StringType, true),
StructField("spindleId", StringType, true),
StructField("tagName", StringType, true),
StructField("common", StringType, true)
)
)
val df = spark.sqlContext.read.format("com.crealytics.spark.excel")
.option("useHeader", "true")
.option("sheetName", "BB")
.option("path", xlsxPath)
.option("treatEmptyValuesAsNulls", "false")
.option("inferSchema", "false")
.option("addColorColumns", "true")
.schema(xlsxSchema)
.load()
import spark.implicits._
df.createGlobalTempView("machine_config")
df.show()
//spark.sql("SELECT * FROM global_temp.machine_config").show()
//spark.sql("select lineId from global_temp.machine_config").show()
}
test("spark_select") {
val spark = SparkSession
.builder
.appName("Spark-csv")
.master("local[2]")
.getOrCreate()
val xlsxPath = "/Volumes/Transcend/1-program-workspace/2-intellij-workspace/streaming-test/properties/mechinconfig_V2R1.xlsx"
val xlsxSchema = StructType(
List(
StructField("isBigSystem", StringType, true),
StructField("lineId", StringType, true),
StructField("spinningId", StringType, true),
StructField("spindleId", StringType, true),
StructField("tagName", StringType, true),
StructField("common", StringType, true)
)
)
val df = spark.sqlContext.read.format("com.crealytics.spark.excel")
.option("useHeader", "true")
.option("sheetName", "BB")
.option("path", xlsxPath)
.option("treatEmptyValuesAsNulls", "false")
.option("inferSchema", "false")
.option("addColorColumns", "true")
.schema(xlsxSchema)
.load()
import spark.implicits._
df.createTempView("machine_config")
//df.createGlobalTempView("machine_config")
spark.sql("select lineId from machine_config").show()
}
}
參考資料 :
spark-sql