Write A Custom Serializer

pom.xml

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>1.0.1-kafka-3.1.0</version>
</dependency>
 <dependency>
  <groupId>com.fasterxml.jackson.core</groupId>
  <artifactId>jackson-databind</artifactId>
  <version>2.7.4</version>
</dependency>

Customer.java

public class Customer {
	
	private int customerID;
	
	private String customerName;
	
	public Customer() {}
	
	public Customer(int customerID , String customerName) {
		this.customerID = customerID;
		this.customerName = customerName;
	}
	
	public int getCustomerID() {
		return customerID;
	}

	public void setCustomerID(int customerID) {
		this.customerID = customerID;
	}

	public String getCustomerName() {
		return customerName;
	}

	public void setCustomerName(String customerName) {
		this.customerName = customerName;
	}
	
	public String toString() {
		return "### customerID : " + customerID + " , customerName : " + customerName;
	}
}

CustomerSerializer.java

import java.util.Map;
import org.apache.kafka.common.serialization.Serializer;
import com.fasterxml.jackson.databind.ObjectMapper;

public class CustomerSerializer implements Serializer<Customer> {

	public void configure(Map<String, ?> configs, boolean isKey) {

	}

	public byte[] serialize(String topic, Customer data) {
		byte[] retVal = null;
	    ObjectMapper objectMapper = new ObjectMapper();
	    try {
	      retVal = objectMapper.writeValueAsString(data).getBytes();
	    } catch (Exception e) {
	      e.printStackTrace();
	    }
	    return retVal;
	}

	public void close() {
	}
}

CustomerDeserializer.java

import java.util.Map;
import org.apache.kafka.common.serialization.Deserializer;
import com.fasterxml.jackson.databind.ObjectMapper;

public class CustomerDeserializer implements Deserializer<Customer> {

	public void configure(Map<String, ?> configs, boolean isKey) {

	}

	public Customer deserialize(String topic, byte[] data) {
		ObjectMapper mapper = new ObjectMapper();
		Customer customer = null;
		try {
			System.out.println("deserialize...");
			customer = mapper.readValue(data, Customer.class);
			System.out.println(customer);
		} catch (Exception e) {
			e.printStackTrace();
		}
		return customer;
	}

	public void close() {
	}
}

CustomerProducer.java

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import com.mykafka.bean.Customer;

public class CustomerProducer implements Runnable {

	private final KafkaProducer<String, Customer> producer;
	private final String topic;

	public CustomerProducer(String brokers, String topic) {
		Properties prop = createProducerConfig(brokers);
		this.producer = new KafkaProducer<String, Customer>(prop);
		this.topic = topic;
	}

	private static Properties createProducerConfig(String brokers) {
		Properties props = new Properties();
		props.put("bootstrap.servers", brokers);
		props.put("acks", "all");
		props.put("retries", 0);
		props.put("batch.size", 16384);
		props.put("linger.ms", 1);
		props.put("buffer.memory", 33554432);
		props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		props.put("value.serializer", "com.mykafka.bean.CustomerSerializer");
		return props;
	}

	public void run() {
		List<Customer> users = new ArrayList<Customer>();
		users.add(new Customer(1, "tom"));
		users.add(new Customer(2, "harry"));
		users.add(new Customer(3, "daniel"));
		
		for (Customer user : users) {
			//topic , key , value
			producer.send(new ProducerRecord<String, Customer>(topic, user.getCustomerName(), user), new Callback() {
				public void onCompletion(RecordMetadata metadata, Exception e) {
					long offset = metadata.offset();
					if (e != null) {
						e.printStackTrace();
					}
					System.out.println("Sent:" + user.toString() + " , offset -> " + offset);
				}
			});
			try {
				Thread.sleep(1000);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
		producer.close();
	}
}

CustomerConsumer.java


import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import com.mykafka.bean.Customer;

public class CustomerConsumer implements Runnable {

	private final KafkaConsumer<String, Customer> consumer;

	private final String topic;

	public CustomerConsumer(String brokers, String groupId, String topic) {
		Properties prop = createConsumerConfig(brokers, groupId);
		this.consumer = new KafkaConsumer<String, Customer>(prop);
		this.topic = topic;
		this.consumer.subscribe(Arrays.asList(this.topic));
	}

	private static Properties createConsumerConfig(String brokers, String groupId) {
		Properties props = new Properties();
		props.put("bootstrap.servers", brokers);
		props.put("group.id", groupId);
		props.put("enable.auto.commit", "true");
		props.put("auto.commit.interval.ms", "1000");
		props.put("session.timeout.ms", "30000");
		props.put("auto.offset.reset", "earliest");
		props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		props.put("value.deserializer", "com.mykafka.bean.CustomerDeserializer");
		return props;
	}

	public void run() {
		while (true) {
			ConsumerRecords<String, Customer> records = consumer.poll(100);
			for (final ConsumerRecord<String, Customer> record : records) {
				if(record != null) {
					String key = record.key();
					long offset = record.offset();
					Customer customer = record.value();
					String id = String.valueOf(customer.getCustomerID());
					String name = customer.getCustomerName();
					System.out.println("Receive: id -> " + id + " , name -> " + name + " , key -> " + key + " , offset -> " + offset);
				}
			}
		}
	}
}

CustomerMain.java

package com.mykafka.task;

public class CustomerMain {

	public static void main(String[] args) {
		String brokers = "192.168.61.105:9092";
		String groupId = "group01";
		String topic = "daniel-topic-2";

		CustomerProducer producerThread = new CustomerProducer(brokers, topic);
		Thread t1 = new Thread(producerThread);
		t1.start();

		CustomerConsumer consumerThread = new CustomerConsumer(brokers, groupId, topic);
		Thread t2 = new Thread(consumerThread);
		t2.start();
	}
}

測試畫面

kafka_day3_1.jpg