import org.scalatest.FunSuite
import java.sql.{Connection, DriverManager}
import java.util.Properties
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
/**
 * Integration tests that talk to a MySQL database, first through plain JDBC
 * and then through Spark's JDBC data source.
 *
 * Every test reads from or writes to the `customers` table of the database
 * addressed by `JdbcUrl`, so the suite only passes when that server is
 * reachable with the credentials below.
 */
class MySqlConnectTest extends FunSuite {

  // Connection settings shared by every test in this suite.
  // NOTE(review): credentials are hard-coded in source; consider loading them
  // from configuration or the environment instead.
  private val JdbcUrl = "jdbc:mysql://192.168.61.107:3306/testdb"
  private val JdbcDriver = "com.mysql.jdbc.Driver"
  private val JdbcUser = "daniel"
  private val JdbcPassword = "MyNewPass1!"
  private val CustomersTable = "customers"

  /** Returns the local two-thread SparkSession used by the Spark tests, creating it if needed. */
  private def localSpark(): SparkSession =
    SparkSession
      .builder
      .appName("Spark-shell-jdbc")
      .master("local[2]")
      .getOrCreate()

  /** Loads the `customers` table through Spark's JDBC data source. */
  private def readCustomers(spark: SparkSession): org.apache.spark.sql.DataFrame =
    spark.read
      .format("jdbc")
      .option("url", JdbcUrl)
      .option("dbtable", CustomersTable)
      .option("user", JdbcUser)
      .option("password", JdbcPassword)
      .load()

  test("TestConnection") {
    Class.forName(JdbcDriver)
    // Failures are allowed to propagate so the test actually fails when the
    // database cannot be reached or queried, instead of printing a stack
    // trace and then hitting a NullPointerException on a null connection.
    val connection: Connection = DriverManager.getConnection(JdbcUrl, JdbcUser, JdbcPassword)
    try {
      val statement = connection.createStatement()
      try {
        val rs = statement.executeQuery("SELECT id, name, tel FROM customers")
        try {
          while (rs.next()) {
            val id = rs.getString("id")
            val name = rs.getString("name")
            val tel = rs.getString("tel")
            println(s"id = $id, name = $name, tel = $tel")
          }
        } finally {
          rs.close()
        }
      } finally {
        statement.close()
      }
    } finally {
      // Always release the connection, even when the query or iteration fails.
      connection.close()
    }
  }

  test("spark-shell-query") {
    val spark = localSpark()
    readCustomers(spark).show()
  }

  test("spark-shell-insert") {
    val spark = localSpark()

    // Each input line is "<id> <name> <tel>", separated by single spaces.
    val personRDD = spark.sparkContext
      .parallelize(Array("1 tom 111", "3 jerry 222", "4 kitty 333"))
      .map(_.split(" "))

    val schema = StructType(
      List(
        StructField("id", StringType, nullable = true),
        StructField("name", StringType, nullable = true),
        StructField("tel", StringType, nullable = true)
      )
    )

    val rowRDD = personRDD.map(p => Row(p(0).trim, p(1).trim, p(2).trim))
    val personDataFrame = spark.createDataFrame(rowRDD, schema)

    val prop = new Properties()
    prop.put("user", JdbcUser)
    prop.put("password", JdbcPassword)

    // "append" adds the rows to the existing table rather than replacing it.
    personDataFrame.write.mode("append").jdbc(JdbcUrl, CustomersTable, prop)
  }

  test("test-udf") {
    val spark = localSpark()

    // Null-safe length: a NULL name yields NULL instead of throwing a
    // NullPointerException inside the UDF.
    spark.udf.register("strLen", (s: String) => Option(s).map(_.length))

    readCustomers(spark).createOrReplaceTempView("people")
    val sqlDF = spark.sql("SELECT id , strLen(name) nameLen FROM people where id='3'")
    sqlDF.show()
  }
}