Kafka Serializing Using Apache Avro

Architecture diagram: kafka_day5_1.jpg

Prepare a machine for the Schema Registry. Download the Confluent Platform; confluent-4.1.1 is used here.

Directory structure: kafka_day5_2.jpg

Reference:
Confluent Platform

Modify the configuration file inside confluent-4.1.1:

confluent-4.1.1/etc/schema-registry/schema-registry.properties

Change the ZooKeeper URL here:

kafkastore.connection.url=192.168.61.105:2181

After editing, run the startup command (the configuration file must be passed in, otherwise the registry will not start):

./schema-registry-start ../etc/schema-registry/schema-registry.properties

 daniel@Danielde-MBP > /Volumes/Transcend/kafka-test/confluent-4.1.1/bin > ./schema-registry-start ../etc/schema-registry/schema-registry.properties
[2018-05-29 19:24:19,150] INFO SchemaRegistryConfig values:
	metric.reporters = []
	kafkastore.sasl.kerberos.kinit.cmd = /usr/bin/kinit
	response.mediatype.default = application/vnd.schemaregistry.v1+json
	kafkastore.ssl.trustmanager.algorithm = PKIX
	authentication.realm =
	ssl.keystore.type = JKS
	kafkastore.topic = _schemas
	metrics.jmx.prefix = kafka.schema.registry
	kafkastore.ssl.enabled.protocols = TLSv1.2,TLSv1.1,TLSv1
	kafkastore.topic.replication.factor = 3
	ssl.truststore.password = [hidden]
	kafkastore.timeout.ms = 500
	host.name = danielde-mbp.is-land.taipei
	kafkastore.bootstrap.servers = []
	schema.registry.zk.namespace = schema_registry
	kafkastore.sasl.kerberos.ticket.renew.window.factor = 0.8
	kafkastore.sasl.kerberos.service.name =
	schema.registry.resource.extension.class = []
	ssl.endpoint.identification.algorithm =
	compression.enable = false
	kafkastore.ssl.truststore.type = JKS
	avro.compatibility.level = backward
	kafkastore.ssl.protocol = TLS
	kafkastore.ssl.provider =
	kafkastore.ssl.truststore.location =
	response.mediatype.preferred = [application/vnd.schemaregistry.v1+json, application/vnd.schemaregistry+json, application/json]
	kafkastore.ssl.keystore.type = JKS
	authentication.skip.paths = []
	ssl.truststore.type = JKS
	kafkastore.ssl.truststore.password = [hidden]
	access.control.allow.origin =
	ssl.truststore.location =
	ssl.keystore.password = [hidden]
	port = 8081
	kafkastore.ssl.keystore.location =
	metrics.tag.map = {}
	master.eligibility = true
	ssl.client.auth = false
	kafkastore.ssl.keystore.password = [hidden]
	kafkastore.security.protocol = PLAINTEXT
	ssl.trustmanager.algorithm =
	authentication.method = NONE
	request.logger.name = io.confluent.rest-utils.requests
	ssl.key.password = [hidden]
	kafkastore.zk.session.timeout.ms = 30000
	kafkastore.sasl.mechanism = GSSAPI
	kafkastore.sasl.kerberos.ticket.renew.jitter = 0.05
	kafkastore.ssl.key.password = [hidden]
	zookeeper.set.acl = false
	schema.registry.inter.instance.protocol = http
	authentication.roles = [*]
	metrics.num.samples = 2
	ssl.protocol = TLS
	schema.registry.group.id = schema-registry
	kafkastore.ssl.keymanager.algorithm = SunX509
	kafkastore.connection.url = 192.168.61.105:2181
	debug = false
	listeners = [http://0.0.0.0:8081]
	kafkastore.group.id =
	ssl.provider =
	ssl.enabled.protocols = []
	shutdown.graceful.ms = 1000
	ssl.keystore.location =
	ssl.cipher.suites = []
	kafkastore.ssl.endpoint.identification.algorithm =
	kafkastore.ssl.cipher.suites =
	access.control.allow.methods =
	kafkastore.sasl.kerberos.min.time.before.relogin = 60000
	ssl.keymanager.algorithm =
	metrics.sample.window.ms = 30000
	kafkastore.init.timeout.ms = 60000
 (io.confluent.kafka.schemaregistry.rest.SchemaRegistryConfig:179)
[2018-05-29 19:24:19,192] INFO Logging initialized @705ms (org.eclipse.jetty.util.log:186)
[2018-05-29 19:24:19,792] INFO Initializing KafkaStore with broker endpoints: PLAINTEXT://daniel-3-test-master1.is-land.taipei:9092 (io.confluent.kafka.schemaregistry.storage.KafkaStore:103)
[2018-05-29 19:24:20,197] INFO Validating schemas topic _schemas (io.confluent.kafka.schemaregistry.storage.KafkaStore:228)
[2018-05-29 19:24:20,203] WARN The replication factor of the schema topic _schemas is less than the desired one of 3. If this is a production environment, it's crucial to add more brokers and increase the replication factor of the topic. (io.confluent.kafka.schemaregistry.storage.KafkaStore:242)
[2018-05-29 19:24:20,302] INFO Initialized last consumed offset to -1 (io.confluent.kafka.schemaregistry.storage.KafkaStoreReaderThread:138)
[2018-05-29 19:24:20,303] INFO [kafka-store-reader-thread-_schemas]: Starting (io.confluent.kafka.schemaregistry.storage.KafkaStoreReaderThread:66)
[2018-05-29 19:24:20,385] INFO Wait to catch up until the offset of the last message at 5 (io.confluent.kafka.schemaregistry.storage.KafkaStore:277)
[2018-05-29 19:24:20,396] INFO Joining schema registry with Zookeeper-based coordination (io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry:212)
[2018-05-29 19:24:20,410] INFO Created schema registry namespace 192.168.61.105:2181/schema_registry (io.confluent.kafka.schemaregistry.masterelector.zookeeper.ZookeeperMasterElector:161)
[2018-05-29 19:24:20,454] INFO Successfully elected the new master: {"host":"danielde-mbp.is-land.taipei","port":8081,"master_eligibility":true,"scheme":"http","version":1} (io.confluent.kafka.schemaregistry.masterelector.zookeeper.ZookeeperMasterElector:102)
[2018-05-29 19:24:20,471] INFO Wait to catch up until the offset of the last message at 6 (io.confluent.kafka.schemaregistry.storage.KafkaStore:277)
[2018-05-29 19:24:20,471] INFO Successfully elected the new master: {"host":"danielde-mbp.is-land.taipei","port":8081,"master_eligibility":true,"scheme":"http","version":1} (io.confluent.kafka.schemaregistry.masterelector.zookeeper.ZookeeperMasterElector:102)
[2018-05-29 19:24:20,553] INFO Adding listener: http://0.0.0.0:8081 (io.confluent.rest.Application:190)
[2018-05-29 19:24:20,600] INFO jetty-9.2.24.v20180105 (org.eclipse.jetty.server.Server:327)
[2018-05-29 19:24:21,081] INFO HV000001: Hibernate Validator 5.1.3.Final (org.hibernate.validator.internal.util.Version:27)
[2018-05-29 19:24:21,209] INFO Started o.e.j.s.ServletContextHandler@67304a40{/,null,AVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler:744)
[2018-05-29 19:24:21,219] INFO Started NetworkTrafficServerConnector@71812481{HTTP/1.1}{0.0.0.0:8081} (org.eclipse.jetty.server.NetworkTrafficServerConnector:266)
[2018-05-29 19:24:21,220] INFO Started @2740ms (org.eclipse.jetty.server.Server:379)
[2018-05-29 19:24:21,220] INFO Server started, listening for requests... (io.confluent.kafka.schemaregistry.rest.SchemaRegistryMain:45)

Open a browser to check:

kafka_day5_3.jpg
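Besides the browser, the registry can also be checked programmatically. Below is a minimal sketch (the RegistryCheck class name is my own; it only assumes the registry listens on http://localhost:8081 as configured above) that lists the registered subjects:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class RegistryCheck {

	public static void main(String[] args) throws Exception {
		// Query the list of registered subjects; a freshly started registry returns an empty JSON array: []
		URL url = new URL("http://localhost:8081/subjects");
		HttpURLConnection conn = (HttpURLConnection) url.openConnection();
		conn.setRequestMethod("GET");
		try (BufferedReader reader = new BufferedReader(new InputStreamReader(conn.getInputStream()))) {
			System.out.println("HTTP " + conn.getResponseCode() + " : " + reader.readLine());
		}
		conn.disconnect();
	}
}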

Next, prepare the producer and consumer.

Download the jar

The code uses io.confluent.kafka.serializers.KafkaAvroDeserializer, but it could not be fetched from the Confluent repository; it turned out the class is also bundled inside avro-kcql-0.5.jar.
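A quick way to confirm the classes are really on the classpath after adding the jar is a plain Class.forName check (the DependencyCheck class name is my own):

public class DependencyCheck {

	public static void main(String[] args) throws ClassNotFoundException {
		// Throws ClassNotFoundException if the Confluent Avro (de)serializers are not on the classpath
		Class.forName("io.confluent.kafka.serializers.KafkaAvroSerializer");
		Class.forName("io.confluent.kafka.serializers.KafkaAvroDeserializer");
		System.out.println("Confluent Avro (de)serializers found on the classpath");
	}
}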

MyAvroProducer.java

Provide the schema.registry.url and configure KafkaAvroSerializer as the value serializer:


import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import com.mykafka.bean.LogLine;
import com.mykafka.util.EventGenerator;

public class MyAvroProducer {

	public static void main(String[] args) {
		Properties props = new Properties();
		props.put("bootstrap.servers", "192.168.61.105:9092");
		props.put("acks", "all");
		props.put("retries", 0);
		props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
		props.put("schema.registry.url", "http://localhost:8081/");
		String topic = "my-avro-topic-1";

		Producer<String, LogLine> producer = new KafkaProducer<String, LogLine>(props);
		for(long i = 0; i < 3; i++) {
			LogLine event = EventGenerator.getNext();
			ProducerRecord<String, LogLine> record = new ProducerRecord<String, LogLine>(topic, event.getIp().toString(), event);
			try {
				System.out.println("send ip : " + event.getIp() + " , url : " + event.getUrl());
				RecordMetadata metadata = producer.send(record).get();
				long offset = metadata.offset();
				System.out.println("producer set offset : " + offset);
			} catch (InterruptedException e) {
				e.printStackTrace();
			} catch (ExecutionException e) {
				e.printStackTrace();
			}
		}
		producer.flush();
		producer.close();
	}

}
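LogLine and EventGenerator come from the previous post in this series: LogLine is the class generated from the Avro schema (shown again near the end of this post), and EventGenerator simply produces LogLine instances to send. For readers who skipped that post, a minimal hypothetical version, with made-up field values, could look roughly like this:

import com.mykafka.bean.LogLine;

public class EventGenerator {

	// Hypothetical sketch: build a LogLine with the Avro-generated builder (all values are made up)
	public static LogLine getNext() {
		return LogLine.newBuilder()
				.setIp("10.0.0." + (int) (Math.random() * 255))
				.setTimestamp(System.currentTimeMillis())
				.setUrl("/index.html")
				.setReferrer("http://www.example.com")
				.setUseragent("Mozilla/5.0")
				.setSessionid(null)
				.build();
	}
}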

MyAvroConsumer.java

Provide the schema.registry.url and configure KafkaAvroDeserializer as the value deserializer:


import java.util.Arrays;
import java.util.Properties;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class MyAvroConsumer {

	public static void main(String[] args) {
		Properties props = new Properties();
		props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.61.105:9092");
		props.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
		props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
		props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"io.confluent.kafka.serializers.KafkaAvroDeserializer");
		props.put("schema.registry.url", "http://localhost:8081");
		props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

		String topic = "my-avro-topic-1";
		final Consumer<String, GenericRecord> consumer = new KafkaConsumer<String, GenericRecord>(props);
		consumer.subscribe(Arrays.asList(topic));

		try {
			while (true) {
				ConsumerRecords<String, GenericRecord> records = consumer.poll(100);
				for (ConsumerRecord<String, GenericRecord> record : records) {
					System.out.printf("offset = %d, key = %s, value = %s \n", record.offset(), record.key(), record.value());
				}
			}
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			consumer.close();
		}
	}
}

Test output: kafka_day5_4.jpg
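Note that without extra configuration KafkaAvroDeserializer hands back GenericRecord objects, which is why the consumer above only prints record.value(). The Confluent deserializer also supports a specific.avro.reader switch that makes it return the generated LogLine class directly. A minimal sketch of that variant (MySpecificAvroConsumer is my own name; everything else mirrors the consumer above):

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import com.mykafka.bean.LogLine;

public class MySpecificAvroConsumer {

	public static void main(String[] args) {
		Properties props = new Properties();
		props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.61.105:9092");
		props.put(ConsumerConfig.GROUP_ID_CONFIG, "group2");
		props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
		props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroDeserializer");
		props.put("schema.registry.url", "http://localhost:8081");
		// The extra switch: deserialize into the generated SpecificRecord class instead of GenericRecord
		props.put("specific.avro.reader", "true");
		props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

		final Consumer<String, LogLine> consumer = new KafkaConsumer<String, LogLine>(props);
		consumer.subscribe(Arrays.asList("my-avro-topic-1"));
		try {
			while (true) {
				ConsumerRecords<String, LogLine> records = consumer.poll(100);
				for (ConsumerRecord<String, LogLine> record : records) {
					// Fields are read through the generated getters instead of GenericRecord.get(...)
					System.out.println("ip = " + record.value().getIp() + " , url = " + record.value().getUrl());
				}
			}
		} finally {
			consumer.close();
		}
	}
}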

The Schema Registry can also be operated through its RESTful API:

  • View the subjects (http://localhost:8081/subjects/) : kafka_day5_5.jpg
  • View a subject's versions (http://localhost:8081/subjects/my-avro-topic-1-value/versions) : kafka_day5_6.jpg
  • View the content of a subject version (http://localhost:8081/subjects/my-avro-topic-1-value/versions/1) : kafka_day5_7.jpg. Reformatted for readability:
    {"subject":"my-avro-topic-1-value",
     "version":1,
     "id":21,
     "schema":"{
    \"type\":\"record\",
    \"name\":\"LogLine\",
    \"namespace\":\"com.mykafka.bean\",
    \"fields\":[
      {\"name\":\"ip\",\"type\":\"string\"},
      {\"name\":\"timestamp\",\"type\":\"long\"},
      {\"name\":\"url\",\"type\":\"string\"},
      {\"name\":\"referrer\",\"type\":\"string\"},
      {\"name\":\"useragent\",\"type\":\"string\"},
      {\"name\":\"sessionid\",\"type\":[\"null\",\"int\"],\"default\":null}
    ]}"
    }
    
  • View the schema of a subject version (http://localhost:8081/subjects/my-avro-topic-1-value/versions/1/schema) : kafka_day5_7.jpg. The schema is the same as the Avro schema file (LogLine.avro) defined in the previous post; the producer and consumer use this schema to serialize and deserialize the records:
    {"type":"record",
     "name":"LogLine",
     "namespace":"com.mykafka.bean",
     "fields":[
    {"name":"ip","type":"string"},
    {"name":"timestamp","type":"long"},
    {"name":"url","type":"string"},
    {"name":"referrer","type":"string"},
    {"name":"useragent","type":"string"},
    {"name":"sessionid","type":["null","int"],"default":null}
    ]
    }
    

    There are also update and delete operations that can be tried afterwards; a small read-only example follows.
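Reading the latest registered schema can also be done from Java through the schema-registry client that ships as a dependency of the Avro serializer (it should be inside the same jar that provided KafkaAvroDeserializer). A minimal read-only sketch, assuming the registry from this post at localhost:8081:

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaMetadata;

public class SchemaRegistryClientDemo {

	public static void main(String[] args) throws Exception {
		// 100 is the maximum number of schemas the client caches per subject
		CachedSchemaRegistryClient client = new CachedSchemaRegistryClient("http://localhost:8081", 100);

		// Same information as the /subjects/my-avro-topic-1-value/versions/1 call shown above
		SchemaMetadata latest = client.getLatestSchemaMetadata("my-avro-topic-1-value");
		System.out.println("id : " + latest.getId());
		System.out.println("version : " + latest.getVersion());
		System.out.println("schema : " + latest.getSchema());
	}
}

Updates (registering a new schema version) and deletes go through the POST and DELETE endpoints described in the API reference below.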

    Reference:
    https://docs.confluent.io/current/schema-registry/docs/api.html#post–subjects-(string-