Scala day 32 (Spark with HDFS)
Using SparkSession's read
- Reads every CSV file under the directory; adding option("header","true") strips the headers, but the drawback is that you cannot tell which rows came from which file. (With Spark 2.x, .csv() already selects the built-in CSV source, so the format("com.databricks.spark.csv") call below is redundant.)
spark.read.option("header","true").format("com.databricks.spark.csv").csv(csvDir).foreach(println(_))
Using SparkContext's wholeTextFiles
- Yields an RDD[(String, String)]: the first String holds the file's absolute path, the second holds the file's entire contents (each file is read whole into memory, so this fits many small files best)
spark.sparkContext.wholeTextFiles(csvDir).foreach(pairdata => println(pairdata._1 + " **** " + pairdata._2))
Using filter, map, drop, and fold
- filter keeps the files whose names start with "C"; map turns the first tuple element into the file name, while the second element, being raw CSV text, is first split("\n") into an array, its first line (the header) is dropped, and fold (initial value "") accumulates (dataLine + tempElement + "\n") into dataLine.
val cdata = spark.sparkContext.wholeTextFiles(csvDir)
.filter(x => x._1.substring(x._1.lastIndexOf("/") + 1).split("_")(0) == "C")
.map(x => (x._1.substring(x._1.lastIndexOf("/") + 1).split("[.]")(0) , x._2.split("\n").drop(1).fold("") {
(dataLine , tempElement) => dataLine + tempElement + "\n"
}))
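As an aside, the drop-header-and-rejoin step can also be written more compactly with mkString; a minimal sketch, equivalent to the fold above for non-empty input:
x._2.split("\n").drop(1).mkString("", "\n", "\n")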
Writing data to HDFS with DataFrame's write
- Since cdata is already an RDD, converting it to a DataFrame via sc.parallelize without calling collect() first fails (a SparkContext cannot be used inside an RDD operation).
Writing this way also creates, under a directory named after each file, many small part files, and collecting first is risky once the data grows large.
import spark.implicits._
cdata.collect().foreach(csvFile => {
val csvDataDF = sc.parallelize(csvFile._2.stripLineEnd.split("\n").toSeq).toDF()
csvDataDF.write.mode(SaveMode.Append).csv("hdfs://192.168.61.105/tmp/csvfiles/" + csvFile._1)
})
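One way to avoid the collect() risk is to keep the data distributed and let partitionBy create one sub-directory per file name. A minimal sketch (the column names fileName and line are illustrative, not from the original):
import spark.implicits._
cdata.flatMap { case (fileName, data) => data.stripLineEnd.split("\n").map(line => (fileName, line)) }
  .toDF("fileName", "line")
  .write
  .mode(SaveMode.Append)
  .partitionBy("fileName")
  .csv("hdfs://192.168.61.105/tmp/csvfiles")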
Writing data to HDFS with the HDFS API
- Keeps appending: one HDFS file per file name, with data appended to each file over time. Mind the replication setting here: my test environment's replication factor is 1, so without setting it you get Under-Replicated Blocks (the client-side default of dfs.replication is 3, which a single-node cluster cannot satisfy).
cdata.collect().foreach(csvFile => {
val mergeHDFSFilePath = "hdfs://192.168.61.105/tmp/csvfiles/" + csvFile._1
val conf = new Configuration()
conf.setBoolean("dfs.support.append" , true)
conf.setInt("dfs.replication", 1) // set to match your environment's replication factor
//conf.addResource(new Path("/Volumes/Transcend/1-program-workspace/2-intellij-workspace/streaming-test/config/hdfs-site.xml"))
val fs = FileSystem.get(URI.create(mergeHDFSFilePath) , conf)
val hdfsPath = new org.apache.hadoop.fs.Path(mergeHDFSFilePath)
val out = fs.exists(hdfsPath) match {
case true => new BufferedWriter(new OutputStreamWriter(fs.append(hdfsPath)))
case false => new BufferedWriter(new OutputStreamWriter(fs.create(hdfsPath)))
}
out.write(csvFile._2)
out.flush()
out.close()
})
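After a run, each merged file can be checked with hdfs dfs -cat /tmp/csvfiles/<fileName> (with <fileName> standing for whichever name the loop produced).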
DataBeanTest.scala
import java.io.{BufferedWriter, OutputStreamWriter}
import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.types.StructType
import org.scalatest.FunSuite
class DataBeanTest extends FunSuite {
test("read Data") {
val userSchema = new StructType()
.add("TagName", "string")
.add("TimeStamp", "string")
.add("Min", "integer")
.add("Max", "integer")
.add("Avg", "double")
val spark = SparkSession
.builder
.appName("Spark-csv")
.master("local[2]")
.getOrCreate()
val csvDir = "/Volumes/Transcend/1-program-workspace/2-intellij-workspace/streaming-test/csvfile"
/*
val csvDF = spark.read
.option("header","true")
.schema(userSchema)
.csv("/Volumes/Transcend/1-program-workspace/2-intellij-workspace/streaming-test/csvfile")
*/
//df.coalesce(1).write.format("com.databricks.spark.csv").save("...path...")
spark.read.option("header","true").format("com.databricks.spark.csv").csv(csvDir).foreach(println(_))
}
test("use wholeTextFiles") {
val spark = SparkSession
.builder
.appName("Spark-csv")
.master("local[2]")
.getOrCreate()
val csvDir = "/Volumes/Transcend/1-program-workspace/2-intellij-workspace/streaming-test/csvfile"
//(filePath , fileData)
spark.sparkContext.wholeTextFiles(csvDir).foreach(pairdata => println(pairdata._1 + " **** " + pairdata._2))
}
test("read wholeTextFiles get fileName") {
val spark = SparkSession
.builder
.appName("Spark-csv")
.master("local[2]")
.getOrCreate()
val csvDir = "/Volumes/Transcend/1-program-workspace/2-intellij-workspace/streaming-test/csvfile"
spark.sparkContext.wholeTextFiles(csvDir).map(x => x._1.substring(x._1.lastIndexOf("/") + 1).split("_")(0)).foreach(println(_))
}
test("read wholeTextFiles filter fileName") {
val spark = SparkSession
.builder
.appName("Spark-csv")
.master("local[2]")
.getOrCreate()
val csvDir = "/Volumes/Transcend/1-program-workspace/2-intellij-workspace/streaming-test/csvfile"
spark.sparkContext.wholeTextFiles(csvDir).filter(x => x._1.substring(x._1.lastIndexOf("/") + 1).split("_")(0) == "C").foreach(println(_))
}
test("remove csv string header") {
val data = "TagName,TimeStamp,Min,Max,Avg\nL41-F200A-HH,2018-05-09 10:18:25,0,0,0\nL41-F200A-HK,2018-05-10 11:18:25,0,0,0"
val line = data.split("\n").drop(1).fold("") {
(dataLine , tempElement) => dataLine + tempElement + "\n"
}
println(line)
}
test("onlyGetCaculateData") {
val spark = SparkSession
.builder
.appName("Spark-csv")
.master("local[2]")
.getOrCreate()
val csvDir = "/Volumes/Transcend/1-program-workspace/2-intellij-workspace/streaming-test/csvfile"
val cdata = spark.sparkContext.wholeTextFiles(csvDir)
.filter(x => x._1.substring(x._1.lastIndexOf("/") + 1).split("_")(0) == "C")
.map(x => (x._1.substring(x._1.lastIndexOf("/") + 1).split("[.]")(0) , x._2.split("\n").drop(1).fold("") {
(dataLine , tempElement) => dataLine + tempElement + "\n"
}))
cdata.foreach(println(_))
}
test("getCdataToHdfs_1") {
val spark = SparkSession
.builder
.appName("Spark-csv")
.master("local[4]")
.getOrCreate()
val csvDir = "/Volumes/Transcend/1-program-workspace/2-intellij-workspace/streaming-test/csvfile"
val sc = spark.sparkContext
val cdata:RDD[Tuple2[String,String]] = sc.wholeTextFiles(csvDir)
.filter(x => x._1.substring(x._1.lastIndexOf("/") + 1).split("_")(0) == "C")
.map(x => (x._1.substring(x._1.lastIndexOf("/") + 1).split("[.]")(0) , x._2.split("\n").drop(1).fold("") {
(dataLine , tempElement) => dataLine + tempElement + "\n"
}))
import spark.implicits._
cdata.collect().foreach(csvFile => {
val csvDataDF = sc.parallelize(csvFile._2.stripLineEnd.split("\n").toSeq).toDF()
csvDataDF.write.mode(SaveMode.Append).csv("hdfs://192.168.61.105/tmp/csvfiles/" + csvFile._1)
})
}
test("getCdataToHdfs_2") {
val spark = SparkSession
.builder
.appName("Spark-csv")
.master("local[4]")
.getOrCreate()
val csvDir = "/Volumes/Transcend/1-program-workspace/2-intellij-workspace/streaming-test/csvfile"
val sc = spark.sparkContext
val cdata:RDD[Tuple2[String,String]] = sc.wholeTextFiles(csvDir)
.filter(x => x._1.substring(x._1.lastIndexOf("/") + 1).split("_")(0) == "C")
.map(x => (x._1.substring(x._1.lastIndexOf("/") + 1).split("[.]")(0) , x._2.split("\n").drop(1).fold("") {
(dataLine , tempElement) => dataLine + tempElement + "\n"
}))
cdata.collect().foreach(csvFile => {
val mergeHDFSFilePath = "hdfs://192.168.61.105/tmp/csvfiles/" + csvFile._1
val conf = new Configuration()
conf.setBoolean("dfs.support.append" , true)
conf.setInt("dfs.replication", 1) // set to match your environment's replication factor
//conf.addResource(new Path("/Volumes/Transcend/1-program-workspace/2-intellij-workspace/streaming-test/config/hdfs-site.xml"))
val fs = FileSystem.get(URI.create(mergeHDFSFilePath) , conf)
val hdfsPath = new org.apache.hadoop.fs.Path(mergeHDFSFilePath)
val out = fs.exists(hdfsPath) match {
case true => new BufferedWriter(new OutputStreamWriter(fs.append(hdfsPath)))
case false => new BufferedWriter(new OutputStreamWriter(fs.create(hdfsPath)))
}
out.write(csvFile._2)
out.flush()
out.close()
})
}
}
- Originally, with the HDFS API approach, not calling collect() on receiveRdd raised a Task not serializable error. Switching receiveRdd to foreachPartition made it work.
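The cause is that Hadoop's Configuration and FileSystem are not serializable, so a closure that captures them on the driver cannot be shipped to executors. A minimal sketch of the pattern (rdd here is a stand-in, not a name from the original):
rdd.foreachPartition { records =>
  // Configuration is built here, on the executor, so it is never part of the task closure
  val conf = new org.apache.hadoop.conf.Configuration()
  records.foreach { case (fileName, line) =>
    () // open/append an HDFS stream with conf here, as getOutput does below
  }
}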
package com.fareast.scala.streaming.job
import java.io.{BufferedWriter, OutputStreamWriter}
import java.net.URI
import java.util.Properties
import com.fareast.scala.bean.UserBean
import com.fareast.scala.receivers.FtpFileReadReceiver
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
object FTPReadStreamingJob {
def main(args: Array[String]): Unit = {
val reader = scala.io.Source.fromURL(getClass.getResource("/config.properties")).bufferedReader()
val prop = new Properties()
prop.load(reader)
val ftpUsername = prop.getProperty("ftp.username")
val ftpPassword = prop.getProperty("ftp.password")
val ftpHost = prop.getProperty("ftp.host")
val ftpInpath = prop.getProperty("ftp.inPath")
val ftpOutpath = prop.getProperty("ftp.outPath")
val hdfsOutpath = prop.getProperty("hdfs.outPath")
val xlsxConfigPath = prop.getProperty("xlsx.configPath")
val conf = new SparkConf().setMaster("local[2]").setAppName("FileWordCount")
val ssc = new StreamingContext(conf, Seconds(5))
val ftpUser = UserBean(ftpUsername,ftpPassword)
val receiveRdds = ssc.receiverStream(new FtpFileReadReceiver(ftpUser,ftpHost,ftpInpath,ftpOutpath))
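// the receiver emits (fileName, line) pairs, so groupByKey gathers every line of a file together before writing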
receiveRdds.foreachRDD(receiveRdd => receiveRdd.groupByKey().foreachPartition(receiveFileInfo => {
receiveFileInfo.foreach(receiveFile => {
val hdfsConf = new Configuration()
hdfsConf.setBoolean("dfs.support.append" , true)
hdfsConf.setInt("dfs.replication", 1)
val fileName = receiveFile._1
if(fileName.split("_")(0) == "C") {
val mergeHDFSFilePath = "hdfs://" + hdfsOutpath + "/caculatedata/" + fileName
val out = getOutput(mergeHDFSFilePath)
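// the first element of each file's grouped lines is the CSV header, so isHead skips it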
var isHead = true
val fileDatas = receiveFile._2
fileDatas.foreach(
fileData => {
if(!isHead) {
out.write(fileData)
out.newLine()
out.flush()
}
isHead = false
}
)
out.close()
//fs.close()
} else if(fileName.split("_")(0) == "R") {
val mergeHDFSFilePath = "hdfs://" + hdfsOutpath + "/rowdata/" + fileName
val out = getOutput(mergeHDFSFilePath)
val fileDatas = receiveFile._2
var isHead = true
fileDatas.foreach(
fileData => {
if(!isHead) {
out.write(fileData)
out.newLine()
out.flush()
}
isHead = false
}
)
out.close()
//fs.close()
}
println("========== fileName : " + fileName + " END ==========")
})
}))
/*
receiveRdds.foreachRDD(receiveRdd => receiveRdd.groupByKey.collect.foreach(receiveFile => {
val hdfsConf = new Configuration()
hdfsConf.setBoolean("dfs.support.append" , true)
hdfsConf.setInt("dfs.replication", 1)
val fileName = receiveFile._1
if(fileName.split("_")(0) == "C") {
val mergeHDFSFilePath = "hdfs://" + hdfsOutpath + "/caculatedata/" + fileName
val out = getOutput(mergeHDFSFilePath)
var isHead = true
val fileDatas = receiveFile._2
fileDatas.foreach(
fileData => {
if(!isHead) {
out.write(fileData)
out.newLine()
out.flush()
}
isHead = false
}
)
out.close()
//fs.close()
} else if(fileName.split("_")(0) == "R") {
val mergeHDFSFilePath = "hdfs://" + hdfsOutpath + "/rowdata/" + fileName
val out = getOutput(mergeHDFSFilePath)
val fileDatas = receiveFile._2
var isHead = true
fileDatas.foreach(
fileData => {
if(!isHead) {
out.write(fileData)
out.newLine()
out.flush()
}
isHead = false
}
)
out.close()
//fs.close()
}
println("========== fileName : " + fileName + " END ==========")
}))
*/
/*
receiveRdds.foreachRDD(receiveRdd => {
if(receiveRdd != null) {
if(!receiveRdd.isEmpty()) {
val fileName = receiveRdd.first()._1
val fileData = receiveRdd.map(data => data._2)
//println("#### fileName ---> " + fileName)
//println("#### fileData ---> " + fileData)
if(fileName.split("_")(0) == "C") {
fileData.saveAsTextFile("hdfs://" + hdfsPath + "/caculatedata/" + fileName)
} else if(fileName.split("_")(0) == "R") {
fileData.saveAsTextFile("hdfs://" + hdfsPath + "/rowdata/" + fileName)
}
}
}
})
*/
ssc.start()
ssc.awaitTermination()
}
def getOutput(mergeHDFSFilePath:String): BufferedWriter = {
val hdfsConf = new Configuration()
hdfsConf.setBoolean("dfs.support.append" , true)
hdfsConf.setInt("dfs.replication", 1)
val fs = FileSystem.get(URI.create(mergeHDFSFilePath) , hdfsConf)
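// FileSystem.get returns a cached instance shared across callers of the same URI scheme,
// which is why fs.close() stays commented out at the call sites: closing it would also close the shared instance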
val mergehdfsPath = new org.apache.hadoop.fs.Path(mergeHDFSFilePath)
val out = fs.exists(mergehdfsPath) match {
case true => new BufferedWriter(new OutputStreamWriter(fs.append(mergehdfsPath)))
case false => new BufferedWriter(new OutputStreamWriter(fs.create(mergehdfsPath)))
}
out
}
}