Kafka custom partitioning

Architecture diagram: kafka_day6_1.jpg

Create the topic (systemLogTopic)

The topic needs 4 partitions, one per log level:

[root@daniel-3-test-master1 ~]# kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 4 --topic systemLogTopic

kafka_day6_2.jpg
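
Besides checking the console output, the partition count can also be verified from Java. A minimal sketch using AdminClient (a hypothetical helper class; assumes kafka-clients 0.11+, where AdminClient was introduced, and the broker address used later in this post):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.TopicDescription;

public class DescribeSystemLogTopic {
	public static void main(String[] args) throws Exception {
		Properties props = new Properties();
		props.put("bootstrap.servers", "192.168.61.105:9092");
		try (AdminClient admin = AdminClient.create(props)) {
			// Block on the future for our single topic and print its partition count.
			TopicDescription desc = admin.describeTopics(Collections.singletonList("systemLogTopic"))
					.values().get("systemLogTopic").get();
			System.out.println("Partitions: " + desc.partitions().size());  // expect 4
		}
	}
}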

LogLevelPartitioner.java

A custom Partitioner: if the logLevel (the record key) is info, the message goes to partition 1; if it is warning, to partition 2, and so on:


package com.mykafka.partition;

import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

public class LogLevelPartitioner implements Partitioner {

	@Override
	public void configure(Map<String, ?> configs) {}

	@Override
	public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
		// The record key carries the log level; map it to a fixed partition.
		String logLevel = (String) key;
		return getLevelPartition(logLevel);
	}

	@Override
	public void close() {}

	// info -> 1, warning -> 2, error -> 3, anything else (e.g. debug) -> 0
	private int getLevelPartition(String logLevel) {
		if ("info".equals(logLevel)) {
			return 1;
		} else if ("warning".equals(logLevel)) {
			return 2;
		} else if ("error".equals(logLevel)) {
			return 3;
		} else {
			return 0;
		}
	}
}
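
Since getLevelPartition never touches the cluster metadata, the mapping is easy to sanity-check without a running broker. A quick hypothetical check (not part of the original project):

public class LogLevelPartitionerCheck {
	public static void main(String[] args) {
		LogLevelPartitioner p = new LogLevelPartitioner();
		for (String level : new String[] { "info", "warning", "error", "debug" }) {
			// cluster can be null here because this partitioner never reads it
			System.out.println(level + " -> partition " + p.partition("systemLogTopic", level, null, "msg", null, null));
		}
		// Expected: info -> 1, warning -> 2, error -> 3, debug -> 0
	}
}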

LogLevelProducer.java

package com.mykafka.task;

import java.util.Properties;
import java.util.UUID;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class LogLevelProducer implements Runnable {

	public static String[] LOG_LEVEL = { "info", "warning", "debug", "error" };
	private KafkaProducer<String, String> producer;
	private String topic;

	public LogLevelProducer(String brokers, String topic) {
		Properties prop = createProducerConfig(brokers);
		this.producer = new KafkaProducer<String, String>(prop);
		this.topic = topic;
	}

	private static Properties createProducerConfig(String brokers) {
		Properties props = new Properties();
		props.put("bootstrap.servers", brokers);
		props.put("acks", "all");
		props.put("retries", 0);
		props.put("batch.size", 16384);
		props.put("linger.ms", 1);
		props.put("buffer.memory", 33554432);
		props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		// Plug in the custom partitioner so the key decides the partition.
		props.put("partitioner.class", "com.mykafka.partition.LogLevelPartitioner");
		return props;
	}

	public void run() {
		for (int i = 0; i < 10; i++) {
			final String logMsg = genRandomLog();
			final String logLevel = LOG_LEVEL[i % 4];  // cycle through the four levels
			producer.send(new ProducerRecord<String, String>(topic, logLevel, logMsg), new Callback() {
				public void onCompletion(RecordMetadata metadata, Exception e) {
					if (e != null) {
						e.printStackTrace();
					} else {
						// metadata is only non-null on success
						System.out.println("Sent: " + logMsg + ", logLevel: " + logLevel + ", Partition: " + metadata.partition());
					}
				}
			});
		}
		producer.close();  // flush pending records before the thread exits
	}

	private String genRandomLog() {
		return UUID.randomUUID().toString();
	}
}
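
For comparison, a custom partitioner is not the only way to control placement: ProducerRecord also has a constructor that takes an explicit partition number, and when a partition is set on the record itself the configured partitioner is bypassed. Inside run(), the send could instead look like this (partition 2 and the message text are just illustrative):

// Pin this record to partition 2 directly; partitioner.class is not consulted.
producer.send(new ProducerRecord<String, String>("systemLogTopic", 2, "warning", "some warning log"));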

LogLevelConsumer.java

package com.mykafka.task;

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class LogLevelConsumer implements Runnable {

	private KafkaConsumer<String, String> consumer;
	private String topic;

	public LogLevelConsumer(String brokers, String groupId, String topic) {
		Properties prop = createConsumerConfig(brokers, groupId);
		this.consumer = new KafkaConsumer<>(prop);
		this.topic = topic;
		this.consumer.subscribe(Arrays.asList(this.topic));
	}

	private static Properties createConsumerConfig(String brokers, String groupId) {
		Properties props = new Properties();
		props.put("bootstrap.servers", brokers);
		props.put("group.id", groupId);
		props.put("enable.auto.commit", "true");
		props.put("auto.commit.interval.ms", "1000");
		props.put("session.timeout.ms", "30000");
		props.put("auto.offset.reset", "earliest");
		props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		return props;
	}

	public void run() {
		while (true) {
			ConsumerRecords<String, String> records = consumer.poll(100);
			for (ConsumerRecord<String, String> record : records) {
				System.out.println("Received message: " + record.value() + ", key: " + record.key() + ", Partition: " + record.partition() + ", Offset: " + record.offset());
			}
		}
	}
}
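
As a side note, if only one log level matters to you, the group subscription can be skipped: consumer.assign pins a consumer to a specific partition directly. A minimal hypothetical sketch reusing the mapping above (info lives in partition 1; the class name and group-free config are assumptions):

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class InfoLogConsumer {
	public static void main(String[] args) {
		Properties props = new Properties();
		props.put("bootstrap.servers", "192.168.61.105:9092");
		props.put("enable.auto.commit", "false");  // this sketch never commits offsets
		props.put("auto.offset.reset", "earliest");
		props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

		// assign() pins the consumer to the "info" partition and bypasses
		// consumer-group rebalancing entirely.
		consumer.assign(Arrays.asList(new TopicPartition("systemLogTopic", 1)));
		while (true) {
			ConsumerRecords<String, String> records = consumer.poll(100);
			for (ConsumerRecord<String, String> record : records) {
				System.out.println("info log: " + record.value() + ", Offset: " + record.offset());
			}
		}
	}
}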

LogLevelMain.java

import com.mykafka.task.LogLevelConsumer;
import com.mykafka.task.LogLevelProducer;

public class LogLevelMain {

	public static void main(String[] args) {
		String brokers = "192.168.61.105:9092";
		String groupId = "logLevel-group-1";
		String topic = "systemLogTopic";

		LogLevelProducer producerThread = new LogLevelProducer(brokers, topic);
		Thread t1 = new Thread(producerThread);
		t1.start();

		LogLevelConsumer consumerThread = new LogLevelConsumer(brokers, groupId, topic);
		Thread t2 = new Thread(consumerThread);
		t2.start();
	}
}

Test output: kafka_day6_3.jpg

You can see that each message is routed to a partition according to its key, and the consumer then reads it back from the corresponding partition.
kafka_day6_4.jpg