Using spark-sql to operate on a DataFrame read from an xlsx file

The examples below load an Excel sheet with the com.crealytics spark-excel data source, then query the resulting DataFrame through the Dataset API and through SQL temp views.

build.sbt

name := "streaming-test"
version := "0.1"
scalaVersion := "2.11.0"

dependencyOverrides += "com.fasterxml.jackson.core" % "jackson-core" % "2.8.7"
dependencyOverrides += "com.fasterxml.jackson.core" % "jackson-databind" % "2.8.7"
dependencyOverrides += "com.fasterxml.jackson.module" % "jackson-module-scala_2.11" % "2.8.7"


libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.2.0"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.2.0"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.2.0"
libraryDependencies += "com.springml" % "spark-sftp_2.11" % "1.1.1"
libraryDependencies += "org.scalatest" %% "scalatest" % "3.0.5" % "test"
libraryDependencies += "mysql" % "mysql-connector-java" % "5.1.16"
libraryDependencies += "com.crealytics" %% "spark-excel" % "0.9.8"

ReadXlsxTest.scala

import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.SparkSession
import org.scalatest.FunSuite

class ReadXlsxTest extends FunSuite {

  test("read xlsx") {
    val spark = SparkSession
      .builder
      .appName("Spark-csv")
      .master("local[2]")
      .getOrCreate()
    val xlsxPath = "/Volumes/Transcend/1-program-workspace/2-intellij-workspace/streaming-test/properties/mechinconfig_V2R1.xlsx"
    val xlsxSchema = StructType(
      List(
        StructField("isBigSystem", StringType, true),
        StructField("lineId", StringType, true),
        StructField("spinningId", StringType, true),
        StructField("spindleId", StringType, true),
        StructField("tagName", StringType, true),
        StructField("common", StringType, true)
      )
    )
    val df = spark.read.format("com.crealytics.spark.excel")
      .option("useHeader", "true")
      .option("sheetName", "BB")
      .option("path", xlsxPath)
      .option("treatEmptyValuesAsNulls", "false")
      .option("inferSchema", "false")
      .option("addColorColumns", "true")
      .schema(xlsxSchema)
      .load()

    df.filter(_.getAs[String]("lineId") != null)
      .filter(_.getAs[String]("spinningId") != null)
      .foreach(row => {
        val lineId = row.getAs[String]("lineId")
        val spinningId = row.getAs[String]("spinningId")
        val fileName = lineId + "_" + spinningId
        println(fileName)
      })
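
    // A hedged alternative: println inside foreach runs on the executors, so in
    // cluster mode the output ends up in executor logs rather than on the driver
    // console. Collecting the distinct (lineId, spinningId) pairs first keeps
    // the printing on the driver:
    import org.apache.spark.sql.functions.col
    df.filter(col("lineId").isNotNull && col("spinningId").isNotNull)
      .select("lineId", "spinningId")
      .distinct()
      .collect()
      .foreach(row => println(row.getString(0) + "_" + row.getString(1)))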
  }


  test("spark sql use table") {

    val spark = SparkSession
      .builder
      .appName("Spark-csv")
      .master("local[2]")
      .getOrCreate()
    val xlsxPath = "/Volumes/Transcend/1-program-workspace/2-intellij-workspace/streaming-test/properties/mechinconfig_V2R1.xlsx"

    val xlsxSchema = StructType(
      List(
        StructField("isBigSystem", StringType, true),
        StructField("lineId", StringType, true),
        StructField("spinningId", StringType, true),
        StructField("spindleId", StringType, true),
        StructField("tagName", StringType, true),
        StructField("common", StringType, true)
      )
    )

    val df = spark.read.format("com.crealytics.spark.excel")
      .option("useHeader", "true")
      .option("sheetName", "BB")
      .option("path", xlsxPath)
      .option("treatEmptyValuesAsNulls", "false")
      .option("inferSchema", "false")
      .option("addColorColumns", "true")
      .schema(xlsxSchema)
      .load()
    import spark.implicits._
    df.filter($"tagName" === "L33-A-G301-RPM").show()
    //df.show()
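
    // The same filter can be written without the implicits import, either as a
    // SQL expression string or with the col() helper:
    df.filter("tagName = 'L33-A-G301-RPM'").show()
    df.filter(org.apache.spark.sql.functions.col("tagName") === "L33-A-G301-RPM").show()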
  }

  test("select spark sql") {
    val spark = SparkSession
      .builder
      .appName("Spark-csv")
      .master("local[2]")
      .getOrCreate()
    val xlsxPath = "/Volumes/Transcend/1-program-workspace/2-intellij-workspace/streaming-test/properties/mechinconfig_V2R1.xlsx"

    val xlsxSchema = StructType(
      List(
        StructField("isBigSystem", StringType, true),
        StructField("lineId", StringType, true),
        StructField("spinningId", StringType, true),
        StructField("spindleId", StringType, true),
        StructField("tagName", StringType, true),
        StructField("common", StringType, true)
      )
    )

    val df = spark.read.format("com.crealytics.spark.excel")
      .option("useHeader", "true")
      .option("sheetName", "BB")
      .option("path", xlsxPath)
      .option("treatEmptyValuesAsNulls", "false")
      .option("inferSchema", "false")
      .option("addColorColumns", "true")
      .schema(xlsxSchema)
      .load()
    df.createGlobalTempView("machine_config")

    df.show()
    //spark.sql("SELECT * FROM global_temp.machine_config").show()
    //spark.sql("select lineId from global_temp.machine_config").show()
  }

  test("spark_select") {
    val spark = SparkSession
      .builder
      .appName("Spark-csv")
      .master("local[2]")
      .getOrCreate()
    val xlsxPath = "/Volumes/Transcend/1-program-workspace/2-intellij-workspace/streaming-test/properties/mechinconfig_V2R1.xlsx"

    val xlsxSchema = StructType(
      List(
        StructField("isBigSystem", StringType, true),
        StructField("lineId", StringType, true),
        StructField("spinningId", StringType, true),
        StructField("spindleId", StringType, true),
        StructField("tagName", StringType, true),
        StructField("common", StringType, true)
      )
    )

    val df = spark.read.format("com.crealytics.spark.excel")
      .option("useHeader", "true")
      .option("sheetName", "BB")
      .option("path", xlsxPath)
      .option("treatEmptyValuesAsNulls", "false")
      .option("inferSchema", "false")
      .option("addColorColumns", "true")
      .schema(xlsxSchema)
      .load()
    df.createTempView("machine_config")
    //df.createGlobalTempView("machine_config")
    spark.sql("select lineId from machine_config").show()
  }
}
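
A single test class can be run from sbt with the testOnly task, assuming the default test configuration:

sbt "testOnly ReadXlsxTest"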
