Using Spark SQL to work with MySQL

build.sbt

name := "streaming-test"
version := "0.1"
scalaVersion := "2.11.8" // Spark 2.2.0 artifacts are built for Scala 2.11

// Pin Jackson to one version to avoid binary conflicts between Spark and third-party libraries
dependencyOverrides += "com.fasterxml.jackson.core" % "jackson-core" % "2.8.7"
dependencyOverrides += "com.fasterxml.jackson.core" % "jackson-databind" % "2.8.7"
dependencyOverrides += "com.fasterxml.jackson.module" % "jackson-module-scala_2.11" % "2.8.7"

libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.2.0"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.2.0"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.2.0"
libraryDependencies += "com.springml" % "spark-sftp_2.11" % "1.1.1"
libraryDependencies += "org.scalatest" %% "scalatest" % "3.0.5" % "test"
libraryDependencies += "mysql" % "mysql-connector-java" % "8.0.11"
libraryDependencies += "com.crealytics" %% "spark-excel" % "0.9.8"

MySqlConnectTest.scala

import org.scalatest.FunSuite
import java.sql.{Connection, DriverManager}
import java.util.Properties

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

class MySqlConnectTest extends FunSuite {

  test("TestConnection") {
    val url = "jdbc:mysql://192.168.61.107:3306/testdb"
    val driver = "com.mysql.jdbc.Driver"
    val username = "daniel"
    val password = "MyNewPass1!"
    var connection:Connection = null
    try {
      Class.forName(driver)
      connection = DriverManager.getConnection(url, username, password)
      val statement = connection.createStatement
      val rs = statement.executeQuery("SELECT id, name, tel FROM customers")
      while (rs.next) {
        val id = rs.getString("id")
        val name = rs.getString("name")
        val tel = rs.getString("tel")
        println("id = %s, name = %s, tel = %s".format(id,name,tel))
      }
    } catch {
      case e: Exception => e.printStackTrace
    }
    connection.close
  }
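
  // A hedged variant, not in the original notes: when the lookup value comes from
  // outside the program, a PreparedStatement avoids building SQL by string
  // concatenation. The id value "3" is only an illustration.
  test("TestPreparedStatement") {
    val connection = DriverManager.getConnection(
      "jdbc:mysql://192.168.61.107:3306/testdb", "daniel", "MyNewPass1!")
    try {
      val ps = connection.prepareStatement("SELECT id, name, tel FROM customers WHERE id = ?")
      ps.setString(1, "3") // bound as a parameter, never concatenated into the SQL text
      val rs = ps.executeQuery()
      while (rs.next()) {
        println("id = %s, name = %s, tel = %s".format(
          rs.getString("id"), rs.getString("name"), rs.getString("tel")))
      }
    } finally {
      connection.close()
    }
  }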

  test("spark-shell-query") {
    val csvPath ="/Volumes/Transcend/1-program-workspace/2-intellij-workspace/streaming-test/csvfile"
    val spark = SparkSession
      .builder
      .appName("Spark-shell-jdbc")
      .master("local[2]")
      .getOrCreate()

    val jdbcDF = spark.read
      .format("jdbc")
      .option("url", "jdbc:mysql://192.168.61.107:3306/testdb")
      .option("dbtable", "customers")
      .option("user", "daniel")
      .option("password", "MyNewPass1!")
      .load()

    jdbcDF.show()
  }
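
  // A sketch under stated assumptions, not from the original notes: in Spark 2.2 the
  // "dbtable" option also accepts a parenthesized subquery, and the read can be split
  // across executors with partitionColumn/lowerBound/upperBound/numPartitions
  // (all four must be set together; the column must be numeric in 2.2, and the
  // bounds below assume id falls roughly in 1..100).
  test("spark-shell-query-partitioned") {
    val spark = SparkSession
      .builder
      .appName("Spark-shell-jdbc")
      .master("local[2]")
      .getOrCreate()

    val jdbcDF = spark.read
      .format("jdbc")
      .option("url", "jdbc:mysql://192.168.61.107:3306/testdb")
      .option("dbtable", "(SELECT id, name, tel FROM customers) AS c")
      .option("user", "daniel")
      .option("password", "MyNewPass1!")
      .option("partitionColumn", "id")
      .option("lowerBound", "1")
      .option("upperBound", "100")
      .option("numPartitions", "4")
      .load()

    jdbcDF.show()
  }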

  test("spark-shell-insert") {
    val csvPath ="/Volumes/Transcend/1-program-workspace/2-intellij-workspace/streaming-test/csvfile"
    val spark = SparkSession
      .builder
      .appName("Spark-shell-jdbc")
      .master("local[2]")
      .getOrCreate()

    // Each array element encodes one row as "id name tel"
    val personRDD = spark.sparkContext
      .parallelize(Array("1 tom 111", "3 jerry 222", "4 kitty 333"))
      .map(_.split(" "))
    val schema = StructType(
      List(
        StructField("id", StringType, true),
        StructField("name", StringType, true),
        StructField("tel", StringType, true)
      )
    )
    
    val rowRDD = personRDD.map(p => Row(p(0).trim, p(1).trim, p(2).trim))
    val personDataFrame = spark.createDataFrame(rowRDD, schema)
    val prop = new Properties()
    prop.put("user", "daniel")
    prop.put("password", "MyNewPass1!")
    personDataFrame.write.mode("append").jdbc("jdbc:mysql://192.168.61.107:3306/testdb", "customers", prop)
  }
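
  // A shorter alternative, assuming the same table layout: with spark.implicits._
  // the rows can be built from a Seq of tuples, skipping the manual Row/StructType
  // plumbing used above. The sample rows here are illustrative only.
  test("spark-shell-insert-implicits") {
    val spark = SparkSession
      .builder
      .appName("Spark-shell-jdbc")
      .master("local[2]")
      .getOrCreate()
    import spark.implicits._

    val personDF = Seq(("5", "lucy", "444"), ("6", "snoopy", "555"))
      .toDF("id", "name", "tel")

    val prop = new Properties()
    prop.put("user", "daniel")
    prop.put("password", "MyNewPass1!")
    // mode("append") keeps existing rows; "overwrite" would drop and recreate the table
    personDF.write.mode("append").jdbc("jdbc:mysql://192.168.61.107:3306/testdb", "customers", prop)
  }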

  test("test-udf") {
    val spark = SparkSession
      .builder
      .appName("Spark-shell-jdbc")
      .master("local[2]")
      .getOrCreate()

    spark.udf.register("strLen", (s: String) => s.length)

    val jdbcDF = spark.read
      .format("jdbc")
      .option("url", "jdbc:mysql://192.168.61.107:3306/testdb")
      .option("dbtable", "customers")
      .option("user", "daniel")
      .option("password", "MyNewPass1!")
      .load()

    jdbcDF.createOrReplaceTempView("people")

    val sqlDF = spark.sql("SELECT id, strLen(name) AS nameLen FROM people WHERE id = '3'")
    sqlDF.show()
  }
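
  // A sketch, not in the original notes: the same logic through the DataFrame API,
  // using org.apache.spark.sql.functions.udf instead of registering the function
  // for SQL queries.
  test("test-udf-dataframe-api") {
    val spark = SparkSession
      .builder
      .appName("Spark-shell-jdbc")
      .master("local[2]")
      .getOrCreate()

    import org.apache.spark.sql.functions.udf
    val strLen = udf((s: String) => s.length)

    val jdbcDF = spark.read
      .format("jdbc")
      .option("url", "jdbc:mysql://192.168.61.107:3306/testdb")
      .option("dbtable", "customers")
      .option("user", "daniel")
      .option("password", "MyNewPass1!")
      .load()

    jdbcDF.select(jdbcDF("id"), strLen(jdbcDF("name")).as("nameLen")).show()
  }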
}