kafka day 6 (custom partitioning)
Architecture diagram:
Create the topic (systemLogTopic) with 4 partitions:
[root@daniel-3-test-master1 ~]# kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 4 --topic systemLogTopic
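Before wiring up the code, you can confirm the topic really has 4 partitions by describing it with the same tool:

[root@daniel-3-test-master1 ~]# kafka-topics --describe --zookeeper localhost:2181 --topic systemLogTopic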
LogLevelPartitioner.java
A custom Partitioner: if logLevel (the message key) is info, the record goes to partition 1; warning goes to partition 2, and so on:
package com.mykafka.partition;

import java.util.Map;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

public class LogLevelPartitioner implements Partitioner {

    public void configure(Map<String, ?> configs) {}

    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // The key is the log level string (serialized with StringSerializer).
        String logLevel = (String) key;
        return getLevelPartition(logLevel);
    }

    public void close() {}

    // Map each log level to a fixed partition; anything else falls back to partition 0.
    private int getLevelPartition(String logLevel) {
        if ("info".equals(logLevel)) {
            return 1;
        } else if ("warning".equals(logLevel)) {
            return 2;
        } else if ("error".equals(logLevel)) {
            return 3;
        } else {
            return 0;
        }
    }
}
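Since partition() only looks at the key, the mapping can be sanity-checked without a running broker. A minimal sketch (this test class is not part of the original project; it assumes it sits in the same com.mykafka.partition package):

public class LogLevelPartitionerTest {
    public static void main(String[] args) {
        LogLevelPartitioner p = new LogLevelPartitioner();
        // Expected: info -> 1, warning -> 2, error -> 3, debug (fallback) -> 0
        for (String level : new String[] { "info", "warning", "error", "debug" }) {
            // keyBytes, valueBytes, and cluster can be null because partition() ignores them
            System.out.println(level + " -> " + p.partition("systemLogTopic", level, null, "msg", null, null));
        }
    }
}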
LogLevelProducer.java
package com.mykafka.task;

import java.util.Properties;
import java.util.UUID;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class LogLevelProducer implements Runnable {

    public static String[] LOG_LEVEL = { "info", "warning", "debug", "error" };

    private KafkaProducer<String, String> producer;
    private String topic;

    public LogLevelProducer(String brokers, String topic) {
        Properties prop = createProducerConfig(brokers);
        this.producer = new KafkaProducer<String, String>(prop);
        this.topic = topic;
    }

    private static Properties createProducerConfig(String brokers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", brokers);
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Plug in the custom partitioner so the key decides the partition.
        props.put("partitioner.class", "com.mykafka.partition.LogLevelPartitioner");
        return props;
    }

    public void run() {
        for (int i = 0; i < 10; i++) {
            final String logMsg = genRandomLog();
            final String logLevel = LOG_LEVEL[i % 4];
            producer.send(new ProducerRecord<String, String>(topic, logLevel, logMsg), new Callback() {
                public void onCompletion(RecordMetadata metadata, Exception e) {
                    if (e != null) {
                        e.printStackTrace();
                        return; // metadata may be null when the send failed
                    }
                    System.out.println("Sent: " + logMsg + ", logLevel: " + logLevel + ", Partition: " + metadata.partition());
                }
            });
        }
        producer.flush(); // make sure buffered records go out before the thread exits
    }

    private String genRandomLog() {
        // A random UUID stands in for a real log message.
        return UUID.randomUUID().toString();
    }
}
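As an aside, the same routing could be done without a custom partitioner: ProducerRecord has an overload that takes the partition number explicitly, and when a record carries an explicit partition, partitioner.class is not consulted. A sketch (partition 1 matches the info mapping above; the message text is just an example):

// ProducerRecord(topic, partition, key, value): the caller picks the partition directly.
producer.send(new ProducerRecord<String, String>("systemLogTopic", 1, "info", "some log message"));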
LogLevelConsumer.java
package com.mykafka.task;

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class LogLevelConsumer implements Runnable {

    private KafkaConsumer<String, String> consumer;
    private String topic;

    public LogLevelConsumer(String brokers, String groupId, String topic) {
        Properties prop = createConsumerConfig(brokers, groupId);
        this.consumer = new KafkaConsumer<>(prop);
        this.topic = topic;
        this.consumer.subscribe(Arrays.asList(this.topic));
    }

    private static Properties createConsumerConfig(String brokers, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", brokers);
        props.put("group.id", groupId);
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public void run() {
        while (true) {
            // poll for new records (newer client versions take a Duration instead of a long)
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("Receive message: " + record.value() + ", key: " + record.key() + ", Partition: " + record.partition() + ", Offset: " + record.offset());
            }
        }
    }
}
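Because each log level maps to a known partition, a consumer that only cares about one level can use assign() instead of subscribe() to read just that partition. A minimal sketch (partition 2 is warning per LogLevelPartitioner; this needs an import of org.apache.kafka.common.TopicPartition, and note that assign() opts out of consumer-group rebalancing):

// In place of this.consumer.subscribe(...) in the constructor:
// read only the "warning" partition instead of the whole topic.
consumer.assign(Arrays.asList(new TopicPartition("systemLogTopic", 2)));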
LogLevelMain.java
import com.mykafka.task.LogLevelConsumer;
import com.mykafka.task.LogLevelProducer;

public class LogLevelMain {

    public static void main(String[] args) {
        String brokers = "192.168.61.105:9092";
        String groupId = "logLevel-group-1";
        String topic = "systemLogTopic";

        LogLevelProducer producerThread = new LogLevelProducer(brokers, topic);
        Thread t1 = new Thread(producerThread);
        t1.start();

        LogLevelConsumer consumerThread = new LogLevelConsumer(brokers, groupId, topic);
        Thread t2 = new Thread(consumerThread);
        t2.start();
    }
}
Test output:
Each message is routed to the partition that matches its key, and the consumer then reads it back from the corresponding partition.
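With 10 records cycling through the four levels, the console output looks roughly like this (each <uuid> is a random value, and Sent/Receive lines interleave depending on timing):

Sent: <uuid>, logLevel: info, Partition: 1
Sent: <uuid>, logLevel: warning, Partition: 2
Sent: <uuid>, logLevel: debug, Partition: 0
Sent: <uuid>, logLevel: error, Partition: 3
Receive message: <uuid>, key: info, Partition: 1, Offset: 0
Receive message: <uuid>, key: warning, Partition: 2, Offset: 0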