kafka day 3 (Custom Serializer)
Write A Custom Serializer
pom.xml
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.0.1-kafka-3.1.0</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.7.4</version>
</dependency>
Customer.java
public class Customer {
private int customerID;
private String customerName;
public Customer() {}
public Customer(int customerID , String customerName) {
this.customerID = customerID;
this.customerName = customerName;
}
public int getCustomerID() {
return customerID;
}
public void setCustomerID(int customerID) {
this.customerID = customerID;
}
public String getCustomerName() {
return customerName;
}
public void setCustomerName(String customerName) {
this.customerName = customerName;
}
public String toString() {
return "### customerID : " + customerID + " , customerName : " + customerName;
}
}
CustomerSerializer.java
import java.util.Map;
import org.apache.kafka.common.serialization.Serializer;
import com.fasterxml.jackson.databind.ObjectMapper;
public class CustomerSerializer implements Serializer<Customer> {
public void configure(Map<String, ?> configs, boolean isKey) {
}
public byte[] serialize(String topic, Customer data) {
byte[] retVal = null;
ObjectMapper objectMapper = new ObjectMapper();
try {
retVal = objectMapper.writeValueAsString(data).getBytes();
} catch (Exception e) {
e.printStackTrace();
}
return retVal;
}
public void close() {
}
}
CustomerDeserializer.java
import java.util.Map;
import org.apache.kafka.common.serialization.Deserializer;
import com.fasterxml.jackson.databind.ObjectMapper;
public class CustomerDeserializer implements Deserializer<Customer> {
public void configure(Map<String, ?> configs, boolean isKey) {
}
public Customer deserialize(String topic, byte[] data) {
ObjectMapper mapper = new ObjectMapper();
Customer customer = null;
try {
System.out.println("deserialize...");
customer = mapper.readValue(data, Customer.class);
System.out.println(customer);
} catch (Exception e) {
e.printStackTrace();
}
return customer;
}
public void close() {
}
}
CustomerProducer.java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import com.mykafka.bean.Customer;
public class CustomerProducer implements Runnable {
private final KafkaProducer<String, Customer> producer;
private final String topic;
public CustomerProducer(String brokers, String topic) {
Properties prop = createProducerConfig(brokers);
this.producer = new KafkaProducer<String, Customer>(prop);
this.topic = topic;
}
private static Properties createProducerConfig(String brokers) {
Properties props = new Properties();
props.put("bootstrap.servers", brokers);
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "com.mykafka.bean.CustomerSerializer");
return props;
}
public void run() {
List<Customer> users = new ArrayList<Customer>();
users.add(new Customer(1, "tom"));
users.add(new Customer(2, "harry"));
users.add(new Customer(3, "daniel"));
for (Customer user : users) {
//topic , key , value
producer.send(new ProducerRecord<String, Customer>(topic, user.getCustomerName(), user), new Callback() {
public void onCompletion(RecordMetadata metadata, Exception e) {
long offset = metadata.offset();
if (e != null) {
e.printStackTrace();
}
System.out.println("Sent:" + user.toString() + " , offset -> " + offset);
}
});
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
producer.close();
}
}
CustomerConsumer.java
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import com.mykafka.bean.Customer;
public class CustomerConsumer implements Runnable {
private final KafkaConsumer<String, Customer> consumer;
private final String topic;
public CustomerConsumer(String brokers, String groupId, String topic) {
Properties prop = createConsumerConfig(brokers, groupId);
this.consumer = new KafkaConsumer<String, Customer>(prop);
this.topic = topic;
this.consumer.subscribe(Arrays.asList(this.topic));
}
private static Properties createConsumerConfig(String brokers, String groupId) {
Properties props = new Properties();
props.put("bootstrap.servers", brokers);
props.put("group.id", groupId);
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("auto.offset.reset", "earliest");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "com.mykafka.bean.CustomerDeserializer");
return props;
}
public void run() {
while (true) {
ConsumerRecords<String, Customer> records = consumer.poll(100);
for (final ConsumerRecord<String, Customer> record : records) {
if(record != null) {
String key = record.key();
long offset = record.offset();
Customer customer = record.value();
String id = String.valueOf(customer.getCustomerID());
String name = customer.getCustomerName();
System.out.println("Receive: id -> " + id + " , name -> " + name + " , key -> " + key + " , offset -> " + offset);
}
}
}
}
}
CustomerMain.java
package com.mykafka.task;
public class CustomerMain {
public static void main(String[] args) {
String brokers = "192.168.61.105:9092";
String groupId = "group01";
String topic = "daniel-topic-2";
CustomerProducer producerThread = new CustomerProducer(brokers, topic);
Thread t1 = new Thread(producerThread);
t1.start();
CustomerConsumer consumerThread = new CustomerConsumer(brokers, groupId, topic);
Thread t2 = new Thread(consumerThread);
t2.start();
}
}