使用 SparkSession 的 read

  • 可讀取目錄底下所有的 csv 資料,加上 option(“header”,”true”) 可濾掉 header,但缺點是無法知道哪些知道是在哪個 file 裡
    spark.read.option("header","true").format("com.databricks.spark.cvs").csv(csvDir).foreach(println(_))
    

使用 Sparkcontext 的 wholeTextFiles

  • 可取得 RDD[(String, String)] 的型態,第一個 String 放檔案絕對路徑,第二個 String 放檔案的資料
    spark.sparkContext.wholeTextFiles(csvDir).foreach(pairdata => println(pairdata._1 + " **** " + pairdata._2))
    

使用 filter、map、drop 及 fold 等操作

  • filter 選擇檔案開頭為 C 的,透過 map 將第一個值轉成檔名,第二個值由於是 csv 資料先透過 split(“\n”)轉成 array,第一行是標題,所以 drop 掉,再透過 fold 將 (dataLine + tempElement + “\n”) 一直壘加到 dataLine(初始值為””).
    val cdata = spark.sparkContext.wholeTextFiles(csvDir)
    .filter(x => x._1.substring(x._1.lastIndexOf("/") + 1).split("_")(0) == "C")
    .map(x => (x._1.substring(x._1.lastIndexOf("/") + 1).split("[.]")(0) , x._2.split("\n").drop(1).fold("") {
      (dataLine , tempElement) => dataLine + tempElement + "\n"
    }))
    

透過 DataFrame 的 write 將資料寫入 HDFS

  • 由於 cdata 已經是屬於 RDD,如果沒有先 collect(),在透過 sc.parallelize 轉 DataFrame 會出錯. 這種方式寫檔會再以檔名為目錄下,產生很多小檔.而且資料大時先 collect 會有風險.
    import spark.implicits._
    cdata.collect().foreach(csvFile => {
    val csvDataDF = sc.parallelize(csvFile._2.stripLineEnd.split("\n").toSeq).toDF()
    csvDataDF.write.mode(SaveMode.Append).csv("hdfs://192.168.61.105/tmp/csvfiles/" + csvFile._1)
    })
    

使用 HDFS 的 api 將資料寫入 HDFS

  • 對資料一直 append 寫檔,會根據檔名為一個檔案,對各檔案一直 append 資料.這邊要注意副本數的設定,我測試的環境複本數為 1,所以這邊不設定的話會出現 Under-Replicated Blocks 的問題.
cdata.collect().foreach(csvFile => {
  val mergeHDFSFilePath = "hdfs://192.168.61.105/tmp/csvfiles/" + csvFile._1
  val conf = new Configuration()
  conf.setBoolean("dfs.support.append" , true)
  conf.setInt("dfs.replication", 1) //會根據環境的副本數決定
  //conf.addResource(new Path("/Volumes/Transcend/1-program-workspace/2-intellij-workspace/streaming-test/config/hdfs-site.xml"))
  val fs = FileSystem.get(URI.create(mergeHDFSFilePath) , conf)
  val hdfsPath = new org.apache.hadoop.fs.Path(mergeHDFSFilePath)
  val out = fs.exists(hdfsPath) match {
    case true => new BufferedWriter(new OutputStreamWriter(fs.append(hdfsPath)))
    case false => new BufferedWriter(new OutputStreamWriter(fs.create(hdfsPath)))
  }
  out.write(csvFile._2)
  out.flush()
})

DataBeanTest.scala

import java.io.{BufferedWriter, OutputStreamWriter}
import java.net.URI

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.types.StructType
import org.scalatest.FunSuite

class DataBeanTest extends FunSuite {

  test("read Data") {
    val userSchema = new StructType()
      .add("TagName", "string")
      .add("TimeStamp", "string")
      .add("Min", "integer")
      .add("Max", "integer")
      .add("Avg", "double")

    val spark = SparkSession
      .builder
      .appName("Spark-csv")
      .master("local[2]")
      .getOrCreate()

    val csvDir = "/Volumes/Transcend/1-program-workspace/2-intellij-workspace/streaming-test/csvfile"
    /*
    val csvDF = spark.read
      .option("header","true")
      .schema(userSchema)
      .csv("/Volumes/Transcend/1-program-workspace/2-intellij-workspace/streaming-test/csvfile")
    */
      //df.coalesce(1).write.format("com.databricks.spark.cvs").save("...path...")
    spark.read.option("header","true").format("com.databricks.spark.cvs").csv(csvDir).foreach(println(_))
  }

  test("use wholeTextFiles") {
    val spark = SparkSession
      .builder
      .appName("Spark-csv")
      .master("local[2]")
      .getOrCreate()
    val csvDir = "/Volumes/Transcend/1-program-workspace/2-intellij-workspace/streaming-test/csvfile"
    //(filePath , fileData)
    spark.sparkContext.wholeTextFiles(csvDir).foreach(pairdata => println(pairdata._1 + " **** " + pairdata._2))
  }

  test("read wholeTextFiles get fileName") {
    val spark = SparkSession
      .builder
      .appName("Spark-csv")
      .master("local[2]")
      .getOrCreate()
    val csvDir = "/Volumes/Transcend/1-program-workspace/2-intellij-workspace/streaming-test/csvfile"

    spark.sparkContext.wholeTextFiles(csvDir).map(x => x._1.substring(x._1.lastIndexOf("/") + 1).split("_")(0)).foreach(println(_))
  }

  test("read wholeTextFiles filter fileName") {
    val spark = SparkSession
      .builder
      .appName("Spark-csv")
      .master("local[2]")
      .getOrCreate()
    val csvDir = "/Volumes/Transcend/1-program-workspace/2-intellij-workspace/streaming-test/csvfile"

    spark.sparkContext.wholeTextFiles(csvDir).filter(x => x._1.substring(x._1.lastIndexOf("/") + 1).split("_")(0) == "C").foreach(println(_))
  }

  test("remove csv string header") {
    val data = "TagName,TimeStamp,Min,Max,Avg\nL41-F200A-HH,2018-05-09 10:18:25,0,0,0\nL41-F200A-HK,2018-05-10 11:18:25,0,0,0"
    val line = data.split("\n").drop(1).fold("") {
      (dataLine , tempElement) => dataLine + tempElement + "\n"
    }
    println(line)
  }

  test("onlyGetCaculateData") {
    val spark = SparkSession
      .builder
      .appName("Spark-csv")
      .master("local[2]")
      .getOrCreate()
    val csvDir = "/Volumes/Transcend/1-program-workspace/2-intellij-workspace/streaming-test/csvfile"

    val cdata = spark.sparkContext.wholeTextFiles(csvDir)
      .filter(x => x._1.substring(x._1.lastIndexOf("/") + 1).split("_")(0) == "C")
      .map(x => (x._1.substring(x._1.lastIndexOf("/") + 1).split("[.]")(0) , x._2.split("\n").drop(1).fold("") {
        (dataLine , tempElement) => dataLine + tempElement + "\n"
      }))
    cdata.foreach(println(_))
  }

  test("getCdataToHdfs_1") {
    val spark = SparkSession
      .builder
      .appName("Spark-csv")
      .master("local[4]")
      .getOrCreate()
    val csvDir = "/Volumes/Transcend/1-program-workspace/2-intellij-workspace/streaming-test/csvfile"
    val sc = spark.sparkContext

    val cdata:RDD[Tuple2[String,String]] = sc.wholeTextFiles(csvDir)
      .filter(x => x._1.substring(x._1.lastIndexOf("/") + 1).split("_")(0) == "C")
      .map(x => (x._1.substring(x._1.lastIndexOf("/") + 1).split("[.]")(0) , x._2.split("\n").drop(1).fold("") {
        (dataLine , tempElement) => dataLine + tempElement + "\n"
      }))

    import spark.implicits._
    cdata.collect().foreach(csvFile => {
      val csvDataDF = sc.parallelize(csvFile._2.stripLineEnd.split("\n").toSeq).toDF()
      csvDataDF.write.mode(SaveMode.Append).csv("hdfs://192.168.61.105/tmp/csvfiles/" + csvFile._1)
    })
  }

  test("getCdataToHdfs_2") {
    val spark = SparkSession
      .builder
      .appName("Spark-csv")
      .master("local[4]")
      .getOrCreate()
    val csvDir = "/Volumes/Transcend/1-program-workspace/2-intellij-workspace/streaming-test/csvfile"
    val sc = spark.sparkContext
    val cdata:RDD[Tuple2[String,String]] = sc.wholeTextFiles(csvDir)
      .filter(x => x._1.substring(x._1.lastIndexOf("/") + 1).split("_")(0) == "C")
      .map(x => (x._1.substring(x._1.lastIndexOf("/") + 1).split("[.]")(0) , x._2.split("\n").drop(1).fold("") {
        (dataLine , tempElement) => dataLine + tempElement + "\n"
      }))

    cdata.collect().foreach(csvFile => {
      val mergeHDFSFilePath = "hdfs://192.168.61.105/tmp/csvfiles/" + csvFile._1
      val conf = new Configuration()
      conf.setBoolean("dfs.support.append" , true)
      conf.setInt("dfs.replication", 1) //會根據環境的副本數決定
      //conf.addResource(new Path("/Volumes/Transcend/1-program-workspace/2-intellij-workspace/streaming-test/config/hdfs-site.xml"))
      val fs = FileSystem.get(URI.create(mergeHDFSFilePath) , conf)
      val hdfsPath = new org.apache.hadoop.fs.Path(mergeHDFSFilePath)
      val out = fs.exists(hdfsPath) match {
        case true => new BufferedWriter(new OutputStreamWriter(fs.append(hdfsPath)))
        case false => new BufferedWriter(new OutputStreamWriter(fs.create(hdfsPath)))
      }
      out.write(csvFile._2)
      out.flush()
    })
  }
}

  • 原本使用 hdfs api 操作如果 receiveRdd 不下 collect,會出現 task not serialize. 換個方式 receiveRdd 使用 foreachPartition 之後就可以了.
package com.fareast.scala.streaming.job

import java.io.{BufferedWriter, OutputStreamWriter}
import java.net.URI
import java.util.Properties

import com.fareast.scala.bean.UserBean
import com.fareast.scala.receivers.FtpFileReadReceiver
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object FTPReadStreamingJob {

  def main(args: Array[String]): Unit = {
    val reader = scala.io.Source.fromURL(getClass.getResource("/config.properties")).bufferedReader()
    val prop = new Properties()
    prop.load(reader)
    val ftpUsername = prop.getProperty("ftp.username")
    val ftpPassword = prop.getProperty("ftp.password")
    val ftpHost = prop.getProperty("ftp.host")
    val ftpInpath = prop.getProperty("ftp.inPath")
    val ftpOutpath = prop.getProperty("ftp.outPath")
    val hdfsOutpath = prop.getProperty("hdfs.outPath")
    val xlsxConfigPath = prop.getProperty("xlsx.configPath")


    val conf = new SparkConf().setMaster("local[2]").setAppName("FileWordCount")
    val ssc = new StreamingContext(conf, Seconds(5))
    val ftpUser = UserBean(ftpUsername,ftpPassword)
    val receiveRdds = ssc.receiverStream(new FtpFileReadReceiver(ftpUser,ftpHost,ftpInpath,ftpOutpath))

    receiveRdds.foreachRDD(receiveRdd => receiveRdd.groupByKey().foreachPartition(receiveFileInfo => {
      receiveFileInfo.foreach(receiveFile => {
        val hdfsConf = new Configuration()
        hdfsConf.setBoolean("dfs.support.append" , true)
        hdfsConf.setInt("dfs.replication", 1)
        val fileName = receiveFile._1
        if(fileName.split("_")(0) == "C") {
          val mergeHDFSFilePath = "hdfs://" + hdfsOutpath + "/caculatedata/" + fileName
          val out = getOutput(mergeHDFSFilePath)

          var isHead = true
          val fileDatas = receiveFile._2
          fileDatas.foreach(
            fileData => {
              if(!isHead) {
                out.write(fileData)
                out.newLine()
                out.flush()
              }
              isHead = false
            }
          )
          out.close()
          //fs.close()
        } else if(fileName.split("_")(0) == "R") {
          val mergeHDFSFilePath = "hdfs://" + hdfsOutpath + "/rowdata/" + fileName
          val out = getOutput(mergeHDFSFilePath)
          val fileDatas = receiveFile._2
          var isHead = true
          fileDatas.foreach(
            fileData => {
              if(!isHead) {
                out.write(fileData)
                out.newLine()
                out.flush()
              }
              isHead = false
            }
          )
          out.close()
          //fs.close()
        }
        println("========== fileName : " + fileName + " END ==========")
      })
    }))

/*
    receiveRdds.foreachRDD(receiveRdd => receiveRdd.groupByKey.collect.foreach(receiveFile => {
      val hdfsConf = new Configuration()
      hdfsConf.setBoolean("dfs.support.append" , true)
      hdfsConf.setInt("dfs.replication", 1)

      val fileName = receiveFile._1
      if(fileName.split("_")(0) == "C") {
        val mergeHDFSFilePath = "hdfs://" + hdfsOutpath + "/caculatedata/" + fileName
        val out = getOutput(mergeHDFSFilePath)

        var isHead = true
        val fileDatas = receiveFile._2
        fileDatas.foreach(
          fileData => {
            if(!isHead) {
              out.write(fileData)
              out.newLine()
              out.flush()
            }
            isHead = false
          }
        )
        out.close()
        //fs.close()
      } else if(fileName.split("_")(0) == "R") {
        val mergeHDFSFilePath = "hdfs://" + hdfsOutpath + "/rowdata/" + fileName
        val out = getOutput(mergeHDFSFilePath)
        val fileDatas = receiveFile._2
        var isHead = true
        fileDatas.foreach(
          fileData => {
            if(!isHead) {
              out.write(fileData)
              out.newLine()
              out.flush()
            }
            isHead = false
          }
        )
        out.close()
        //fs.close()
      }
      println("========== fileName : " + fileName + " END ==========")
    }))
*/

    /*
    receiveRdds.foreachRDD(receiveRdd => {
      if(receiveRdd != null) {
        if(!receiveRdd.isEmpty()) {
          val fileName = receiveRdd.first()._1
          val fileData = receiveRdd.map(data => data._2)
          //println("#### fileName ---> " + fileName)
          //println("#### fileData ---> " + fileData)

          if(fileName.split("_")(0) == "C") {
            fileData.saveAsTextFile("hdfs://" + hdfsPath + "/caculatedata/" + fileName)
          } else if(fileName.split("_")(0) == "R") {
            fileData.saveAsTextFile("hdfs://" + hdfsPath + "/rowdata/" + fileName)
          }
        }
      }
    })
    */
    ssc.start()
    ssc.awaitTermination()
  }

  def getOutput(mergeHDFSFilePath:String): BufferedWriter = {
    val hdfsConf = new Configuration()
    hdfsConf.setBoolean("dfs.support.append" , true)
    hdfsConf.setInt("dfs.replication", 1)
    val fs = FileSystem.get(URI.create(mergeHDFSFilePath) , hdfsConf)
    val mergehdfsPath = new org.apache.hadoop.fs.Path(mergeHDFSFilePath)
    val out = fs.exists(mergehdfsPath) match {
      case true => new BufferedWriter(new OutputStreamWriter(fs.append(mergehdfsPath)))
      case false => new BufferedWriter(new OutputStreamWriter(fs.create(mergehdfsPath)))
    }
    out
  }
}