Spark-Streaming day 4 (Spark Streaming Custom Receiver)
Goal
Write a custom Spark Streaming Receiver that reads files from FTP.
Flow
- Copy the files from the FTP server to a local directory
- Read the files in that directory into an RDD and apply some processing to the data (append "_encrypt" to each line)
- Copy the processed files back to FTP
- Delete the original files on the FTP server as well as the local copies
The receiver that drives these steps is sketched right below, followed by the full implementation.
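Before diving into the FTP-specific code, this is roughly the minimal shape every Spark Streaming custom receiver takes (the class name and the dummy record below are purely illustrative): onStart() has to return quickly, so the real work runs on its own thread, hands records to Spark with store(), and watches isStopped to know when to quit.

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Illustrative skeleton only: emits a dummy record once per second.
class SkeletonReceiver extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {
  def onStart(): Unit = {
    new Thread("Skeleton Receiver") {
      override def run(): Unit = {
        while (!isStopped) {
          store("some record")   // hand one record to Spark Streaming
          Thread.sleep(1000)
        }
      }
    }.start()
  }
  def onStop(): Unit = {
    // nothing to clean up: the thread above exits once isStopped becomes true
  }
}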
Writing a custom Receiver (CustomReceiver.scala)
package com.streaming.receiver

import java.io.{BufferedReader, File, FileInputStream, InputStreamReader}
import java.nio.charset.StandardCharsets

import com.streaming.test.FtpFileTest
import com.streamingjava.test.FTPDirTest
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class CustomReceiver(identity: String, username: String, password: String, host: String,
                     sourcePath: String, targetPath: String)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  def onStart(): Unit = {
    // Pull the current contents of the FTP source directory down to the local target directory,
    // then start a thread that feeds those files into Spark Streaming.
    FtpFileTest.copyDir(identity, username, password, host, sourcePath, targetPath)
    new Thread("Custom FTP Receiver") {
      override def run(): Unit = receive()
    }.start()
  }

  override def onStop(): Unit = {
    // Nothing to clean up: the receiving thread checks isStopped and exits on its own.
  }

  private def receive(): Unit = {
    try {
      val localFiles = getListOfFiles(new File(targetPath))
      for (localFile <- localFiles) {
        val reader = new BufferedReader(
          new InputStreamReader(new FileInputStream(localFile), StandardCharsets.UTF_8))
        var userInput = reader.readLine()
        // Push every line of the local copy into Spark, tagged with "_encrypt".
        while (!isStopped && userInput != null) {
          store(userInput + "_encrypt")
          userInput = reader.readLine()
        }
        reader.close()
        // Upload the processed file back to FTP, then delete both the FTP original and the local copy.
        FtpFileTest.copytoFTP(identity, username, password, host,
          targetPath + "/" + localFile.getName, "/tmp/streaming-test/complete")
        FTPDirTest.deleteFile(username, password, host, sourcePath + "/" + localFile.getName)
        localFile.delete()
      }
      // restart() asynchronously calls onStop() and then onStart() again,
      // so the FTP source directory is effectively polled in a loop.
      restart("Trying to connect again")
    } catch {
      case e: java.net.ConnectException =>
        restart("Error connecting to " + host, e)
      case t: Throwable =>
        restart("Error receiving data", t)
    }
  }

  // Only regular files under the local target directory are processed.
  def getListOfFiles(dir: File): List[File] = dir.listFiles.filter(_.isFile).toList
}
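As a design note, the "_encrypt" suffix does not have to be appended inside the receiver: the receiver could simply store() the raw lines, and the enrichment could live in the DStream pipeline instead. A small sketch of that variant, reusing CustomReceiver and the same connection values that appear in CustomReceiverTest below (the helper method is hypothetical, not part of the original code):

import com.streaming.receiver.CustomReceiver
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream

// Sketch of the alternative: assumes the receiver only emits raw lines,
// so the suffix is appended as an ordinary DStream transformation.
def encryptedLines(ssc: StreamingContext): DStream[String] = {
  val raw = ssc.receiverStream(new CustomReceiver(
    null, "daniel", "123456", "192.168.61.106",
    "/tmp/streaming-test/sourcefile", "/Users/daniel/test/streamFile"))
  raw.map(_ + "_encrypt")
}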
CustomReceiverTest.scala
- Use the StreamingContext's receiverStream to plug in the custom CustomReceiver:
- ssc.receiverStream(new CustomReceiver(identity, username, password, host, sourcePath, targetPath))
package com.streaming.test

import com.streaming.receiver.CustomReceiver
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object CustomReceiverTest {
  def main(args: Array[String]): Unit = {
    val username = "daniel"
    val password = "123456"
    val host = "192.168.61.106"
    val sftpPort = 22
    val identity: String = null
    val sourcePath = "/tmp/streaming-test/sourcefile"
    val targetPath = "/Users/daniel/test/streamFile"

    val conf = new SparkConf().setMaster("local[2]").setAppName("FileWordCount")
    val ssc = new StreamingContext(conf, Seconds(1))

    // Plug the custom receiver into the streaming context.
    val lines = ssc.receiverStream(
      new CustomReceiver(identity, username, password, host, sourcePath, targetPath))

    lines.foreachRDD { myRdd =>
      // foreachRDD never passes a null RDD, so this check always succeeds;
      // see the sketch after this listing for a sturdier version.
      if (myRdd != null) {
        val test = myRdd.map(x => x)
        test.saveAsTextFile("/Users/daniel/test/streamFile_result")
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
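One caveat in the loop above: saveAsTextFile throws if the output directory already exists, so writing every batch to the same fixed path only survives the first non-empty batch. A common workaround (sketch only, reusing the same base path as the example) is to skip empty RDDs and suffix the batch time:

// Drop-in replacement for the foreachRDD block above: each non-empty micro-batch
// goes to its own directory named after the batch time, so saveAsTextFile never collides.
lines.foreachRDD { (rdd, time) =>
  if (!rdd.isEmpty()) {
    rdd.saveAsTextFile(s"/Users/daniel/test/streamFile_result/batch-${time.milliseconds}")
  }
}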
FTPDirTest.java
spark-sftp_2.11 has no way to copy an entire directory, and under the hood it actually uses Java's jsch. I tried re-implementing this part in Scala, but ran into a problem: when the Scala code tried to reference the Java inner class ChannelSftp.LsEntry, it could not resolve it. So I kept the helper in Java and exposed methods for the Scala code to call:
package com.streamingjava.test;

import com.jcraft.jsch.*;

import java.util.ArrayList;
import java.util.List;
import java.util.Vector;

public class FTPDirTest {

    public static void main(String[] args) {
        String username = "daniel";
        String password = "123456";
        String pemFileLocation = "/Users/daniel/test/testFolder/";
        String pemPassphrase = "";
        String host = "192.168.61.106";
        int sftpPort = 22;
        String identity = null;
        String target = "/tmp/streaming-test";

        List<String> files = getDirFileList(username, password, host, target);
        for (String file : files) {
            System.out.println(file);
        }
    }

    // List the *.txt files in a remote directory.
    public static List<String> getDirFileList(String username, String password, String host, String targetDir) {
        List<String> fileList = new ArrayList<String>();
        try {
            ChannelSftp sftpChannel = createSFTPChannel("", username, password, host);
            sftpChannel.cd(targetDir);
            Vector<ChannelSftp.LsEntry> childFiles = sftpChannel.ls("*.txt");
            for (ChannelSftp.LsEntry lsEntry : childFiles) {
                String entryName = lsEntry.getFilename();
                fileList.add(entryName);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return fileList;
    }

    // Delete a single remote file.
    public static void deleteFile(String username, String password, String host, String deleteFile) {
        try {
            ChannelSftp sftpChannel = createSFTPChannel("", username, password, host);
            sftpChannel.rm(deleteFile);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // Currently only lists the entries under source; the actual per-file copy
    // is driven from FtpFileTest.copyDir on the Scala side.
    private static void copyDir(ChannelSftp sftpChannel, String source, String target) throws Exception {
        System.out.println("Copying files from " + source + " to " + target);
        sftpChannel.cd(source);
        sftpChannel.lcd(target);
        Vector<ChannelSftp.LsEntry> childFiles = sftpChannel.ls(".");
        for (ChannelSftp.LsEntry lsEntry : childFiles) {
            String entryName = lsEntry.getFilename();
            System.out.println("File Entry " + entryName);
        }
    }

    // Open an SFTP channel with jsch. Note: the session is never disconnected,
    // so every call opens a new connection.
    private static ChannelSftp createSFTPChannel(String identity, String username, String password, String host) throws Exception {
        JSch jsch = new JSch();
        boolean useIdentity = identity != null && !identity.isEmpty();
        if (useIdentity) {
            jsch.addIdentity(identity);
        }
        Session session = jsch.getSession(username, host, 22);
        session.setConfig("StrictHostKeyChecking", "no");
        if (!useIdentity) {
            session.setPassword(password);
        }
        session.connect();
        Channel channel = session.openChannel("sftp");
        channel.connect();
        return (ChannelSftp) channel;
    }
}
FtpFileTest.scala
package com.streaming.test

import com.springml.sftp.client.SFTPClient
import com.streamingjava.test.FTPDirTest

import scala.collection.JavaConverters._

object FtpFileTest {

  def main(args: Array[String]): Unit = {
    val username = "daniel"
    val password = "123456"
    val host = "192.168.61.106"
    val sftpPort = 22
    val identity: String = null
    //val sourcePath = "/tmp/streaming-test/testFile.txt"
    //val targetPath = "/Users/daniel/test/testFolder/"
    //copyFile(identity, username, password, host, sourcePath, targetPath)
    val sourcePath = "/tmp/streaming-test"
    val targetPath = "/Users/daniel/test/testFolder"
    copyDir(identity, username, password, host, sourcePath, targetPath)
  }

  // Copy a single remote file to the local target path.
  def copyFile(identity: String, username: String, password: String, host: String,
               sourcePath: String, targetPath: String): Unit = {
    val sftpClient = new SFTPClient(identity, username, password, host)
    sftpClient.copy(sourcePath, targetPath)
  }

  // Copy a local file back to the remote target path.
  def copytoFTP(identity: String, username: String, password: String, host: String,
                sourcePath: String, targetPath: String): Unit = {
    val sftpClient = new SFTPClient(identity, username, password, host)
    sftpClient.copyToFTP(sourcePath, targetPath)
  }

  // "Copy a directory" by listing the remote files through the Java helper
  // and copying them one by one.
  def copyDir(identity: String, username: String, password: String, host: String,
              source: String, target: String): Unit = {
    val jfiles: java.util.List[String] = FTPDirTest.getDirFileList(username, password, host, source)
    //import scala.collection.JavaConverters
    //val sfiles = JavaConverters.asScalaIteratorConverter(jfiles.iterator()).asScala.seq
    //import scala.collection.JavaConverters._
    val sfiles = jfiles.iterator.asScala
    for (sfileName <- sfiles) {
      val sourceFile = source + "/" + sfileName
      copyFile(identity, username, password, host, sourceFile, target)
    }
  }

  def moveFile(): Unit = {
    // not implemented yet
  }
}
- Converting a Java List to a Scala collection:
  Use the Java class to fetch the file list:
  val jfiles: java.util.List[String] = FTPDirTest.getDirFileList(username, password, host, source)
- Convert it to a Scala seq through JavaConverters:
  import scala.collection.JavaConverters
  val sfiles = JavaConverters.asScalaIteratorConverter(jfiles.iterator()).asScala.seq
- Simplified version:
  import scala.collection.JavaConverters._
  val sfiles = jfiles.iterator.asScala
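A tiny self-contained example of the same conversion, independent of the FTP code (the file names here are made up):

import java.util.Arrays
import scala.collection.JavaConverters._

object JavaListToScalaExample {
  def main(args: Array[String]): Unit = {
    // Pretend this java.util.List came back from FTPDirTest.getDirFileList(...)
    val jfiles: java.util.List[String] = Arrays.asList("a.txt", "b.txt")

    // Option 1: wrap the Java iterator as a Scala Iterator
    val sfiles = jfiles.iterator.asScala
    sfiles.foreach(println)

    // Option 2: convert the whole list to an immutable Scala List
    val scalaList: List[String] = jfiles.asScala.toList
    println(scalaList)
  }
}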
Side note: hdfs dfs -setrep -R 1 / recursively sets the HDFS replication factor of every path to 1 (handy on a single-node test setup).