目標

使用 intelliJ 開發一個 Hello World 程式

install scala plugin

spark-streaming_day2_1.jpg spark-streaming_day2_2.jpg spark-streaming_day2_3.jpg spark-streaming_day2_4.jpg

建立一個新的 project

spark-streaming_day2_5.jpg spark-streaming_day2_6.jpg spark-streaming_day2_7.jpg

寫一個測試程式 StreamingTest

spark-streaming_day2_8.jpg spark-streaming_day2_9.jpg spark-streaming_day2_10.jpg

使用 intelliJ build jar

spark-streaming_day2_11.jpg spark-streaming_day2_12.jpg spark-streaming_day2_13.jpg spark-streaming_day2_14.jpg spark-streaming_day2_15.jpg spark-streaming_day2_16.jpg spark-streaming_day2_17.jpg

啟動一個 netcat

spark-streaming_day2_18.jpg

先直接 run main function 測試看看

spark-streaming_day2_19.jpg

出現下列錯誤訊息

spark-streaming_day2_20.jpg

修改 sbt 檔,從 2.10 版變成 2.11

spark-streaming_day2_21.jpg

測試執行成功

spark-streaming_day2_22.jpg

接著 demo 讓 spark-streaming 從某個目錄抓取檔案內容處理

在本機建立一個目錄路徑為 /Users/daniel/test/streamFile

修改程式

將原來的 ssc.socketTextStream 改成 ssc.textFileStream

import org.apache.spark._
import org.apache.spark.streaming._

object StreamingTest {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    val ssc = new StreamingContext(conf, Seconds(5))
    //val lines = ssc.socketTextStream("localhost", 9988)
    val lines = ssc.textFileStream("file:///Users/daniel/test/streamFile")
    val words = lines.flatMap(_.split(" "))
    val pairs = words.map(word => (word, 1))
    val wordCounts = pairs.reduceByKey(_ + _)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

測試執行成功

spark-streaming_day2_23.jpg

使用 spark2-submit 測試

  • 流程圖 spark-streaming_day2_24.jpg

1.先把 compiler 好的 jar 丟到 master1 上面

scp streaming-test.jar root@192.168.61.105:/tmp/streaming-test

2.在使用 spark2-submit 把 spark job 送到 yarn 上執行

spark2-submit --class com.streaming.test.StreamingTest --master yarn /tmp/streaming-test/streaming-test.jar
  • 成功後會再 yarn 上看到執行的 job spark-streaming_day2_25.jpg

    3.再把檔案(testFile.txt)丟到該目錄底下(/tmp/streaming-test/testfiles)

  • 但 master1 及 slave 1 都要有該檔案及目錄,因為是分散式處理的關係 spark-streaming_day2_26.jpg spark-streaming_day2_27.jpg

    會在 master1 上看到執行結果

    spark-streaming_day2_28.jpg