Kafka Day 5 (Kafka Serialization Using Apache Avro)
Kafka Serialization Using Apache Avro
Architecture diagram:
Set up a Schema Registry node: download it from the Confluent Platform site; this article uses confluent-4.1.1.
Directory structure:
Reference:
Confluent Platform
Edit the configuration file inside confluent-4.1.1:
confluent-4.1.1/etc/schema-registry/schema-registry.properties
Set the ZooKeeper URL here:
kafkastore.connection.url=192.168.61.105:2181
Once it is changed, run the startup command (the configuration file must be passed in, or it will not start):
./schema-registry-start ../etc/schema-registry/schema-registry.properties
daniel@Danielde-MBP > /Volumes/Transcend/kafka-test/confluent-4.1.1/bin > ./schema-registry-start ../etc/schema-registry/schema-registry.properties
[2018-05-29 19:24:19,150] INFO SchemaRegistryConfig values:
metric.reporters = []
kafkastore.sasl.kerberos.kinit.cmd = /usr/bin/kinit
response.mediatype.default = application/vnd.schemaregistry.v1+json
kafkastore.ssl.trustmanager.algorithm = PKIX
authentication.realm =
ssl.keystore.type = JKS
kafkastore.topic = _schemas
metrics.jmx.prefix = kafka.schema.registry
kafkastore.ssl.enabled.protocols = TLSv1.2,TLSv1.1,TLSv1
kafkastore.topic.replication.factor = 3
ssl.truststore.password = [hidden]
kafkastore.timeout.ms = 500
host.name = danielde-mbp.is-land.taipei
kafkastore.bootstrap.servers = []
schema.registry.zk.namespace = schema_registry
kafkastore.sasl.kerberos.ticket.renew.window.factor = 0.8
kafkastore.sasl.kerberos.service.name =
schema.registry.resource.extension.class = []
ssl.endpoint.identification.algorithm =
compression.enable = false
kafkastore.ssl.truststore.type = JKS
avro.compatibility.level = backward
kafkastore.ssl.protocol = TLS
kafkastore.ssl.provider =
kafkastore.ssl.truststore.location =
response.mediatype.preferred = [application/vnd.schemaregistry.v1+json, application/vnd.schemaregistry+json, application/json]
kafkastore.ssl.keystore.type = JKS
authentication.skip.paths = []
ssl.truststore.type = JKS
kafkastore.ssl.truststore.password = [hidden]
access.control.allow.origin =
ssl.truststore.location =
ssl.keystore.password = [hidden]
port = 8081
kafkastore.ssl.keystore.location =
metrics.tag.map = {}
master.eligibility = true
ssl.client.auth = false
kafkastore.ssl.keystore.password = [hidden]
kafkastore.security.protocol = PLAINTEXT
ssl.trustmanager.algorithm =
authentication.method = NONE
request.logger.name = io.confluent.rest-utils.requests
ssl.key.password = [hidden]
kafkastore.zk.session.timeout.ms = 30000
kafkastore.sasl.mechanism = GSSAPI
kafkastore.sasl.kerberos.ticket.renew.jitter = 0.05
kafkastore.ssl.key.password = [hidden]
zookeeper.set.acl = false
schema.registry.inter.instance.protocol = http
authentication.roles = [*]
metrics.num.samples = 2
ssl.protocol = TLS
schema.registry.group.id = schema-registry
kafkastore.ssl.keymanager.algorithm = SunX509
kafkastore.connection.url = 192.168.61.105:2181
debug = false
listeners = [http://0.0.0.0:8081]
kafkastore.group.id =
ssl.provider =
ssl.enabled.protocols = []
shutdown.graceful.ms = 1000
ssl.keystore.location =
ssl.cipher.suites = []
kafkastore.ssl.endpoint.identification.algorithm =
kafkastore.ssl.cipher.suites =
access.control.allow.methods =
kafkastore.sasl.kerberos.min.time.before.relogin = 60000
ssl.keymanager.algorithm =
metrics.sample.window.ms = 30000
kafkastore.init.timeout.ms = 60000
(io.confluent.kafka.schemaregistry.rest.SchemaRegistryConfig:179)
[2018-05-29 19:24:19,192] INFO Logging initialized @705ms (org.eclipse.jetty.util.log:186)
[2018-05-29 19:24:19,792] INFO Initializing KafkaStore with broker endpoints: PLAINTEXT://daniel-3-test-master1.is-land.taipei:9092 (io.confluent.kafka.schemaregistry.storage.KafkaStore:103)
[2018-05-29 19:24:20,197] INFO Validating schemas topic _schemas (io.confluent.kafka.schemaregistry.storage.KafkaStore:228)
[2018-05-29 19:24:20,203] WARN The replication factor of the schema topic _schemas is less than the desired one of 3. If this is a production environment, it's crucial to add more brokers and increase the replication factor of the topic. (io.confluent.kafka.schemaregistry.storage.KafkaStore:242)
[2018-05-29 19:24:20,302] INFO Initialized last consumed offset to -1 (io.confluent.kafka.schemaregistry.storage.KafkaStoreReaderThread:138)
[2018-05-29 19:24:20,303] INFO [kafka-store-reader-thread-_schemas]: Starting (io.confluent.kafka.schemaregistry.storage.KafkaStoreReaderThread:66)
[2018-05-29 19:24:20,385] INFO Wait to catch up until the offset of the last message at 5 (io.confluent.kafka.schemaregistry.storage.KafkaStore:277)
[2018-05-29 19:24:20,396] INFO Joining schema registry with Zookeeper-based coordination (io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry:212)
[2018-05-29 19:24:20,410] INFO Created schema registry namespace 192.168.61.105:2181/schema_registry (io.confluent.kafka.schemaregistry.masterelector.zookeeper.ZookeeperMasterElector:161)
[2018-05-29 19:24:20,454] INFO Successfully elected the new master: {"host":"danielde-mbp.is-land.taipei","port":8081,"master_eligibility":true,"scheme":"http","version":1} (io.confluent.kafka.schemaregistry.masterelector.zookeeper.ZookeeperMasterElector:102)
[2018-05-29 19:24:20,471] INFO Wait to catch up until the offset of the last message at 6 (io.confluent.kafka.schemaregistry.storage.KafkaStore:277)
[2018-05-29 19:24:20,471] INFO Successfully elected the new master: {"host":"danielde-mbp.is-land.taipei","port":8081,"master_eligibility":true,"scheme":"http","version":1} (io.confluent.kafka.schemaregistry.masterelector.zookeeper.ZookeeperMasterElector:102)
[2018-05-29 19:24:20,553] INFO Adding listener: http://0.0.0.0:8081 (io.confluent.rest.Application:190)
[2018-05-29 19:24:20,600] INFO jetty-9.2.24.v20180105 (org.eclipse.jetty.server.Server:327)
[2018-05-29 19:24:21,081] INFO HV000001: Hibernate Validator 5.1.3.Final (org.hibernate.validator.internal.util.Version:27)
[2018-05-29 19:24:21,209] INFO Started o.e.j.s.ServletContextHandler@67304a40{/,null,AVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler:744)
[2018-05-29 19:24:21,219] INFO Started NetworkTrafficServerConnector@71812481{HTTP/1.1}{0.0.0.0:8081} (org.eclipse.jetty.server.NetworkTrafficServerConnector:266)
[2018-05-29 19:24:21,220] INFO Started @2740ms (org.eclipse.jetty.server.Server:379)
[2018-05-29 19:24:21,220] INFO Server started, listening for requests... (io.confluent.kafka.schemaregistry.rest.SchemaRegistryMain:45)
Open a browser to verify that the Schema Registry is up.
Next, prepare the producer and consumer.
Download the jar.
The code needs io.confluent.kafka.serializers.KafkaAvroDeserializer, but it could not be fetched from the Confluent repository; it turned out that avro-kcql-0.5.jar also bundles it.
MyAvroProducer.java
The producer must be given the schema.registry.url and configured with KafkaAvroSerializer:
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import com.mykafka.bean.LogLine;
import com.mykafka.util.EventGenerator;

public class MyAvroProducer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.61.105:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Serialize the value with Avro and register its schema in the Schema Registry.
        props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        props.put("schema.registry.url", "http://localhost:8081/");

        String topic = "my-avro-topic-1";
        Producer<String, LogLine> producer = new KafkaProducer<String, LogLine>(props);

        for (long i = 0; i < 3; i++) {
            LogLine event = EventGenerator.getNext();
            ProducerRecord<String, LogLine> record =
                    new ProducerRecord<String, LogLine>(topic, event.getIp().toString(), event);
            try {
                System.out.println("send ip : " + event.getIp() + " , url : " + event.getUrl());
                // send() returns a Future; get() blocks until the broker acknowledges the record.
                RecordMetadata metadata = producer.send(record).get();
                long offset = metadata.offset();
                System.out.println("producer set offset : " + offset);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
        producer.flush();
        producer.close();
    }
}
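LogLine and EventGenerator come from the previous post and are not shown here. For reference, a minimal sketch of what EventGenerator might look like, assuming LogLine is the class generated by Avro code generation from the LogLine schema shown later in this post (the helper itself and its sample values are hypothetical):

import java.util.Random;

import com.mykafka.bean.LogLine;

public class EventGenerator {

    private static final Random RANDOM = new Random();

    // Builds a fake log event with the builder that Avro code generation
    // adds to the LogLine class; the fields match the registered schema.
    public static LogLine getNext() {
        return LogLine.newBuilder()
                .setIp("192.168.0." + RANDOM.nextInt(255))
                .setTimestamp(System.currentTimeMillis())
                .setUrl("/index.html")
                .setReferrer("http://example.com")
                .setUseragent("Mozilla/5.0")
                .setSessionid(null)   // optional field, defaults to null
                .build();
    }
}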
MyAvroConsumer.java
The consumer must also be given the schema.registry.url and configured with KafkaAvroDeserializer:
import java.util.Arrays;
import java.util.Properties;

import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class MyAvroConsumer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.61.105:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        props.put("schema.registry.url", "http://localhost:8081");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        String topic = "my-avro-topic-1";
        // KafkaAvroDeserializer returns the value as a GenericRecord, not a String.
        final Consumer<String, GenericRecord> consumer = new KafkaConsumer<String, GenericRecord>(props);
        consumer.subscribe(Arrays.asList(topic));

        try {
            while (true) {
                ConsumerRecords<String, GenericRecord> records = consumer.poll(100);
                for (ConsumerRecord<String, GenericRecord> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s \n",
                            record.offset(), record.key(), record.value());
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            consumer.close();
        }
    }
}
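Because the value arrives as a GenericRecord, individual fields can also be read by name instead of printing the whole record. A minimal sketch (the helper class is hypothetical; the field names follow the registered schema):

import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class LogLinePrinter {

    // Reads individual Avro fields by name from the GenericRecord value.
    static void print(ConsumerRecord<String, GenericRecord> record) {
        GenericRecord value = record.value();
        System.out.println("offset = " + record.offset()
                + ", ip = " + value.get("ip")
                + ", url = " + value.get("url")
                + ", sessionid = " + value.get("sessionid"));
    }
}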
Test output:
The Schema Registry can also be operated on through its RESTful API (a small programmatic example is shown at the end of this section).
- List subjects (http://localhost:8081/subjects/):
- List the versions of a subject (http://localhost:8081/subjects/my-avro-topic-1-value/versions):
- View a subject version (http://localhost:8081/subjects/my-avro-topic-1-value/versions/1):
Reformatted, the response looks like this:
{"subject":"my-avro-topic-1-value", "version":1, "id":21, "schema":"{ \"type\":\"record\", \"name\":\"LogLine\", \"namespace\":\"com.mykafka.bean\", \"fields\":[ {\"name\":\"ip\",\"type\":\"string\"}, {\"name\":\"timestamp\",\"type\":\"long\"}, {\"name\":\"url\",\"type\":\"string\"}, {\"name\":\"referrer\",\"type\":\"string\"}, {\"name\":\"useragent\",\"type\":\"string\"}, {\"name\":\"sessionid\",\"type\":[\"null\",\"int\"],\"default\":null} ]}" }
- View the schema of a subject version (http://localhost:8081/subjects/my-avro-topic-1-value/versions/1/schema):
The schema is the same as the Avro file (LogLine.avro) defined in the previous post; the producer and consumer parse the JSON-formatted records through this schema:
{"type":"record", "name":"LogLine", "namespace":"com.mykafka.bean", "fields":[ {"name":"ip","type":"string"}, {"name":"timestamp","type":"long"}, {"name":"url","type":"string"}, {"name":"referrer","type":"string"}, {"name":"useragent","type":"string"}, {"name":"sessionid","type":["null","int"],"default":null} ] }
There are also update and delete operations that can be tried later.
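As a quick example of calling the REST API from code instead of a browser, here is a minimal sketch that uses only the JDK's HttpURLConnection to list the registered subjects (the same endpoint as the first query above; the expected output in the comment is illustrative):

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class SchemaRegistryQuery {

    public static void main(String[] args) throws Exception {
        // Same endpoint as the browser example above: lists all registered subjects.
        URL url = new URL("http://localhost:8081/subjects");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestProperty("Accept", "application/vnd.schemaregistry.v1+json");

        try (BufferedReader reader = new BufferedReader(new InputStreamReader(conn.getInputStream()))) {
            String line;
            while ((line = reader.readLine()) != null) {
                System.out.println(line);   // e.g. ["my-avro-topic-1-value"]
            }
        } finally {
            conn.disconnect();
        }
    }
}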
Reference:
https://docs.confluent.io/current/schema-registry/docs/api.html#post–subjects-(string-