apache avro 介紹

pom.xml

<dependency>
	<groupId>org.apache.avro</groupId>
	<artifactId>avro</artifactId>
	<version>1.8.1</version>
</dependency>

準備一個 LogLine.avsc 檔案,以 JSON 格式定義 Avro 的 schema

namespace 會對應到 java 的 package 名稱,name 就是 java 的 class 名稱,fields 就是欄位資訊

{"namespace": "com.mykafka.bean",
  "type": "record",
  "name": "LogLine",
  "fields": [
    {"name": "ip", "type": "string"},
    {"name": "timestamp",  "type": "long"},
    {"name": "url",  "type": "string"},
    {"name": "referrer",  "type": "string"},
    {"name": "useragent",  "type": "string"},
    {"name": "sessionid",  "type": ["null","int"], "default": null}
  ]
}

再使用 avro-tools-1.8.1.jar 依照 schema 產生對應的 entity 類別 :

java -jar avro-tools-1.8.1.jar compile schema {avsc檔位置} {entity要產生的目錄}

daniel@Danielde-MBP > cd /Volumes/Transcend/workspace/mykafkademo
daniel@Danielde-MBP > /Volumes/Transcend/workspace/mykafkademo > cd lib
daniel@Danielde-MBP > /Volumes/Transcend/workspace/mykafkademo/lib > ll
total 90880
-rwxrwxrwx  1 daniel  staff    11M  5 29 17:46 avro-kcql-0.5.jar
-rwxrwxrwx  1 daniel  staff    33M  5 29 15:39 avro-tools-1.8.1.jar
 daniel@Danielde-MBP > /Volumes/Transcend/workspace/mykafkademo/lib > java -jar avro-tools-1.8.1.jar compile schema ../src/main/resources/LogLine.avsc .
Input files to compile:
  ../src/main/resources/LogLine.avsc
log4j:WARN No appenders could be found for logger (AvroVelocityLogChute).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
 daniel@Danielde-MBP > /Volumes/Transcend/workspace/mykafkademo/lib > ll
total 91136
-rwxrwxrwx  1 daniel  staff    11M  5 29 17:46 avro-kcql-0.5.jar
-rwxrwxrwx  1 daniel  staff    33M  5 29 15:39 avro-tools-1.8.1.jar
drwxrwxrwx  1 daniel  staff   128K  5 29 18:45 com

kafka_day4_1.jpg

MyArvoGenarater.java

將實體 serialize 成 avro 檔案存放

import java.io.File;
import java.io.IOException;
import java.util.Date;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.specific.SpecificDatumWriter;
import com.mykafka.bean.LogLine;

/**
 * Demo that serializes three {@link LogLine} records into one Avro data file.
 *
 * <p>The three records are created in three different ways (setters, the
 * all-arguments constructor, and the builder) to show each option side by side.
 */
public class MyArvoGenarater {

	/** Output file used when no path is given on the command line. */
	private static final String DEFAULT_AVRO_FILE = "/Volumes/Transcend/workspace/mykafkademo/avrofiles/logLine.avro";

	/**
	 * Builds three sample records and appends them to an Avro data file.
	 *
	 * <p>An {@link IOException} raised while creating or writing the file is
	 * printed to standard error; it is not rethrown.
	 *
	 * @param args optional; when present, {@code args[0]} replaces the default output path
	 */
	public static void main(String[] args) {
		File avroFile = new File(args != null && args.length > 0 ? args[0] : DEFAULT_AVRO_FILE);

		// 1) no-arg constructor + setters
		LogLine logLine1 = new LogLine();
		logLine1.setIp("111");
		logLine1.setTimestamp(getSysTime());
		logLine1.setUrl("url1");
		logLine1.setReferrer("ref1");
		logLine1.setUseragent("agent1");
		logLine1.setSessionid(1);

		// 2) all-arguments constructor (argument order follows the schema field order)
		LogLine logLine2 = new LogLine("222", getSysTime(), "url2", "ref2", "agent2", 2);

		// 3) builder
		LogLine logLine3 = LogLine.newBuilder()
				.setIp("333")
				.setReferrer("ref3")
				.setSessionid(3)
				.setTimestamp(getSysTime())
				.setUrl("url3")
				.setUseragent("agent3")
				.build();

		DatumWriter<LogLine> userDatumWriter = new SpecificDatumWriter<LogLine>(LogLine.class);
		// try-with-resources closes the writer even when create/append throws,
		// so the file handle is never leaked.
		try (DataFileWriter<LogLine> dataFileWriter = new DataFileWriter<LogLine>(userDatumWriter)) {
			dataFileWriter.create(logLine1.getSchema(), avroFile);
			dataFileWriter.append(logLine1);
			dataFileWriter.append(logLine2);
			dataFileWriter.append(logLine3);
		} catch (IOException e) {
			e.printStackTrace();
		}
	}

	/**
	 * Returns the current system time.
	 *
	 * @return milliseconds since the Unix epoch
	 */
	private static long getSysTime() {
		return new Date().getTime();
	}

}

執行後產生的檔案 : kafka_day4_2.jpg

MyArvoReader.java

將 avro 檔案 deserialize 成 java 的 entity :

package com.mykafka.arvo.task;

import java.io.File;
import java.io.IOException;

import org.apache.avro.file.DataFileReader;
import org.apache.avro.io.DatumReader;
import org.apache.avro.specific.SpecificDatumReader;

import com.mykafka.bean.LogLine;

/**
 * Demo that reads an Avro data file and prints every {@link LogLine} record in it.
 */
public class MyArvoReader {

	/** Input file used when no path is given on the command line. */
	private static final String DEFAULT_AVRO_FILE = "/Volumes/Transcend/workspace/mykafkademo/avrofiles/logLine.avro";

	/**
	 * Deserializes each record of the Avro file and prints it to standard output.
	 *
	 * <p>An {@link IOException} raised while opening or reading the file is
	 * printed to standard error; it is not rethrown.
	 *
	 * @param args optional; when present, {@code args[0]} replaces the default input path
	 */
	public static void main(String[] args) {
		File avroFile = new File(args != null && args.length > 0 ? args[0] : DEFAULT_AVRO_FILE);
		DatumReader<LogLine> logLineDatumReader = new SpecificDatumReader<LogLine>(LogLine.class);

		// try-with-resources closes the reader (and the underlying file) on every path.
		try (DataFileReader<LogLine> dataFileReader = new DataFileReader<LogLine>(avroFile, logLineDatumReader)) {
			// The previous record is handed back to next() as the reuse argument.
			LogLine logLine = null;
			while (dataFileReader.hasNext()) {
				logLine = dataFileReader.next(logLine);
				System.out.println(logLine);
			}
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
}

測試畫面 kafka_day4_3.jpg