kafka day 4 (apache avro)
apache avro 介紹
pom.xml
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.8.1</version>
</dependency>
準備一個 LogLine.avsc,以 json 定義 schema 格式
namespace 會對應到 java 的 package 名稱,name 就是 java 的 class 名稱,fields 就是欄位資訊
{"namespace": "com.mykafka.bean",
"type": "record",
"name": "LogLine",
"fields": [
{"name": "ip", "type": "string"},
{"name": "timestamp", "type": "long"},
{"name": "url", "type": "string"},
{"name": "referrer", "type": "string"},
{"name": "useragent", "type": "string"},
{"name": "sessionid", "type": ["null","int"], "default": null}
]
}
再使用 avro-tools-1.8.1.jar build 出該 entity :
java -jar avro-tools-1.8.1.jar compile schema {avsc檔位置} {entity要產生的目錄}
daniel@Danielde-MBP > cd /Volumes/Transcend/workspace/mykafkademo
daniel@Danielde-MBP > /Volumes/Transcend/workspace/mykafkademo > cd lib
daniel@Danielde-MBP > /Volumes/Transcend/workspace/mykafkademo/lib > ll
total 90880
-rwxrwxrwx 1 daniel staff 11M 5 29 17:46 avro-kcql-0.5.jar
-rwxrwxrwx 1 daniel staff 33M 5 29 15:39 avro-tools-1.8.1.jar
daniel@Danielde-MBP > /Volumes/Transcend/workspace/mykafkademo/lib > java -jar avro-tools-1.8.1.jar compile schema ../src/main/resources/LogLine.avsc .
Input files to compile:
../src/main/resources/LogLine.avsc
log4j:WARN No appenders could be found for logger (AvroVelocityLogChute).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
daniel@Danielde-MBP > /Volumes/Transcend/workspace/mykafkademo/lib > ll
total 91136
-rwxrwxrwx 1 daniel staff 11M 5 29 17:46 avro-kcql-0.5.jar
-rwxrwxrwx 1 daniel staff 33M 5 29 15:39 avro-tools-1.8.1.jar
drwxrwxrwx 1 daniel staff 128K 5 29 18:45 com
MyArvoGenarater.java
將實體 serialize 成 avro 檔案存放 :
import java.io.File;
import java.io.IOException;
import java.util.Date;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.specific.SpecificDatumWriter;
import com.mykafka.bean.LogLine;
/**
 * Demonstrates serializing Avro-generated {@code LogLine} records to an Avro
 * container file, showing the three construction styles the generated class
 * supports: no-arg + setters, the all-args constructor, and the builder.
 */
public class MyArvoGenarater {

    public static void main(String[] args) {
        // Style 1: no-arg constructor + setters.
        LogLine logLine1 = new LogLine();
        logLine1.setIp("111");
        logLine1.setTimestamp(getSysTime());
        logLine1.setUrl("url1");
        logLine1.setReferrer("ref1");
        logLine1.setUseragent("agent1");
        logLine1.setSessionid(1);

        // Style 2: all-args constructor.
        LogLine logLine2 = new LogLine("222", getSysTime(), "url2", "ref2", "agent2", 2);

        // Style 3: generated builder.
        LogLine logLine3 = LogLine.newBuilder().setIp("333").setReferrer("ref3").setSessionid(3)
                .setTimestamp(getSysTime()).setUrl("url3").setUseragent("agent3").build();

        DatumWriter<LogLine> userDatumWriter = new SpecificDatumWriter<LogLine>(LogLine.class);
        // try-with-resources guarantees the writer (and its file handle) is closed
        // even if create() or an append() throws; the original only closed on success.
        try (DataFileWriter<LogLine> dataFileWriter = new DataFileWriter<LogLine>(userDatumWriter)) {
            dataFileWriter.create(logLine1.getSchema(), new File("/Volumes/Transcend/workspace/mykafkademo/avrofiles/logLine.avro"));
            dataFileWriter.append(logLine1);
            dataFileWriter.append(logLine2);
            dataFileWriter.append(logLine3);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /** Returns the current time as epoch milliseconds for the record timestamp. */
    private static long getSysTime() {
        // Equivalent to new Date().getTime() without allocating a legacy Date.
        return System.currentTimeMillis();
    }
}
執行後產生的檔案 :
MyArvoReader.java
將 avro 檔案 deserialize 成 java 的 entity :
package com.mykafka.arvo.task;
import java.io.File;
import java.io.IOException;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.io.DatumReader;
import org.apache.avro.specific.SpecificDatumReader;
import com.mykafka.bean.LogLine;
/**
 * Deserializes {@code LogLine} records from the Avro container file produced by
 * the generator demo and prints each record to stdout.
 */
public class MyArvoReader {

    public static void main(String[] args) {
        String avroFile = "/Volumes/Transcend/workspace/mykafkademo/avrofiles/logLine.avro";
        DatumReader<LogLine> logLineDatumReader = new SpecificDatumReader<LogLine>(LogLine.class);
        // try-with-resources closes the reader's underlying file handle;
        // the original never closed it (resource leak).
        try (DataFileReader<LogLine> dataFileReader =
                new DataFileReader<LogLine>(new File(avroFile), logLineDatumReader)) {
            LogLine logLine = null;
            while (dataFileReader.hasNext()) {
                // Passing the previous instance lets Avro reuse it instead of
                // allocating a new object for every record.
                logLine = dataFileReader.next(logLine);
                System.out.println(logLine);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
測試畫面