kafka day 1 (run kafka)
Download Kafka
Start the ZooKeeper server:
./bin/zookeeper-server-start.sh ./config/zookeeper.properties
Start the Kafka server:
./bin/kafka-server-start.sh ./config/server.properties
Test with a Single Node-Single Broker configuration
Create topic Hello-kafka:
[cloudera@quickstart kafka_2.11-1.0.0]$ ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic Hello-kafka
Created topic "Hello-kafka".
This creates a topic named "Hello-kafka" with a single partition and a replication factor of one.
List topics:
[cloudera@quickstart kafka_2.11-1.0.0]$ ./bin/kafka-topics.sh --list --zookeeper localhost:2181
Hello-kafka
Start Producer to Send Messages
(the broker address localhost:9092 below comes from config/server.properties)
[cloudera@quickstart kafka_2.11-1.0.0]$ ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Hello-kafka
>Hello
>My first Message
>My second Message
>
Start Consumer to Receive Messages (offsets are stored in ZooKeeper)
[cloudera@quickstart kafka_2.11-1.0.0]$ ./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic Hello-kafka --from-beginning
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
Hello
My first Message
My second Message
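As the warning suggests, the old ZooKeeper-based consumer is deprecated; the same read with the new consumer (which stores offsets in Kafka itself) would be:
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic Hello-kafka --from-beginning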
Delete a topic:
[cloudera@quickstart kafka_2.11-1.0.0]$ bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic Hello-kafka
Topic Hello-kafka is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
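To make deletion take effect, delete.topic.enable can be set in config/server.properties (broker restart required); a minimal sketch:
delete.topic.enable=true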
Location of the ZooKeeper configuration file under Cloudera Manager:
/etc/zookeeper/conf/zoo.cfg
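For reference, a minimal zoo.cfg usually looks roughly like this (illustrative values, not taken from this cluster):
tickTime=2000
dataDir=/var/lib/zookeeper
clientPort=2181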
After Cloudera Manager installs Kafka, the commands are located here:
[root@daniel-3-test-slave1 local]# ll /usr/bin | grep kafka
lrwxrwxrwx 1 root root 43 May 23 13:33 kafka-broker-api-versions -> /etc/alternatives/kafka-broker-api-versions
lrwxrwxrwx 1 root root 30 May 23 13:33 kafka-config -> /etc/alternatives/kafka-config
lrwxrwxrwx 1 root root 40 May 23 13:33 kafka-console-consumer -> /etc/alternatives/kafka-console-consumer
lrwxrwxrwx 1 root root 40 May 23 13:33 kafka-console-producer -> /etc/alternatives/kafka-console-producer
lrwxrwxrwx 1 root root 39 May 23 13:33 kafka-consumer-groups -> /etc/alternatives/kafka-consumer-groups
lrwxrwxrwx 1 root root 47 May 23 13:33 kafka-consumer-offset-checker -> /etc/alternatives/kafka-consumer-offset-checker
lrwxrwxrwx 1 root root 42 May 23 13:33 kafka-consumer-perf-test -> /etc/alternatives/kafka-consumer-perf-test
lrwxrwxrwx 1 root root 50 May 23 13:33 kafka-preferred-replica-election -> /etc/alternatives/kafka-preferred-replica-election
lrwxrwxrwx 1 root root 42 May 23 13:33 kafka-producer-perf-test -> /etc/alternatives/kafka-producer-perf-test
lrwxrwxrwx 1 root root 43 May 23 13:33 kafka-reassign-partitions -> /etc/alternatives/kafka-reassign-partitions
lrwxrwxrwx 1 root root 33 May 23 13:33 kafka-run-class -> /etc/alternatives/kafka-run-class
lrwxrwxrwx 1 root root 30 May 23 13:33 kafka-sentry -> /etc/alternatives/kafka-sentry
lrwxrwxrwx 1 root root 30 May 23 13:33 kafka-topics -> /etc/alternatives/kafka-topics
[root@daniel-3-test-slave1 local]# ll /opt/cloudera/parcels/KAFKA-3.0.0-1.3.0.0.p0.40/bin/
total 52
-rwxr-xr-x 1 root root 684 Oct 6 2017 kafka-broker-api-versions
-rwxr-xr-x 1 root root 671 Oct 6 2017 kafka-config
-rwxr-xr-x 1 root root 681 Oct 6 2017 kafka-console-consumer
-rwxr-xr-x 1 root root 681 Oct 6 2017 kafka-console-producer
-rwxr-xr-x 1 root root 680 Oct 6 2017 kafka-consumer-groups
-rwxr-xr-x 1 root root 688 Oct 6 2017 kafka-consumer-offset-checker
-rwxr-xr-x 1 root root 683 Oct 6 2017 kafka-consumer-perf-test
-rwxr-xr-x 1 root root 691 Oct 6 2017 kafka-preferred-replica-election
-rwxr-xr-x 1 root root 683 Oct 6 2017 kafka-producer-perf-test
-rwxr-xr-x 1 root root 684 Oct 6 2017 kafka-reassign-partitions
-rwxr-xr-x 1 root root 674 Oct 6 2017 kafka-run-class
-rwxr-xr-x 1 root root 671 Oct 6 2017 kafka-sentry
-rwxr-xr-x 1 root root 671 Oct 6 2017 kafka-topics
The commands are also here, and the Kafka Connect scripts live here too:
[root@daniel-3-test-master1 ~]# ll /opt/cloudera/parcels/KAFKA-3.0.0-1.3.0.0.p0.40/lib/kafka/bin
total 124
-rwxr-xr-x 1 root root 1335 Oct 6 2017 connect-distributed.sh
-rwxr-xr-x 1 root root 1332 Oct 6 2017 connect-standalone.sh
-rwxr-xr-x 1 root root 861 Oct 6 2017 kafka-acls.sh
-rwxr-xr-x 1 root root 873 Oct 6 2017 kafka-broker-api-versions.sh
-rwxr-xr-x 1 root root 864 Oct 6 2017 kafka-configs.sh
-rwxr-xr-x 1 root root 945 Oct 6 2017 kafka-console-consumer.sh
-rwxr-xr-x 1 root root 944 Oct 6 2017 kafka-console-producer.sh
-rwxr-xr-x 1 root root 871 Oct 6 2017 kafka-consumer-groups.sh
-rwxr-xr-x 1 root root 872 Oct 6 2017 kafka-consumer-offset-checker.sh
-rwxr-xr-x 1 root root 948 Oct 6 2017 kafka-consumer-perf-test.sh
-rwxr-xr-x 1 root root 869 Oct 6 2017 kafka-delete-records.sh
-rwxr-xr-x 1 root root 862 Oct 6 2017 kafka-mirror-maker.sh
-rwxr-xr-x 1 root root 886 Oct 6 2017 kafka-preferred-replica-election.sh
-rwxr-xr-x 1 root root 959 Oct 6 2017 kafka-producer-perf-test.sh
-rwxr-xr-x 1 root root 874 Oct 6 2017 kafka-reassign-partitions.sh
-rwxr-xr-x 1 root root 868 Oct 6 2017 kafka-replay-log-producer.sh
-rwxr-xr-x 1 root root 874 Oct 6 2017 kafka-replica-verification.sh
-rwxr-xr-x 1 root root 7027 Oct 6 2017 kafka-run-class.sh
-rwxr-xr-x 1 root root 1961 Oct 6 2017 kafka-sentry.sh
-rwxr-xr-x 1 root root 1394 Oct 6 2017 kafka-server-start.sh
-rwxr-xr-x 1 root root 975 Oct 6 2017 kafka-server-stop.sh
-rwxr-xr-x 1 root root 870 Oct 6 2017 kafka-simple-consumer-shell.sh
-rwxr-xr-x 1 root root 945 Oct 6 2017 kafka-streams-application-reset.sh
-rwxr-xr-x 1 root root 863 Oct 6 2017 kafka-topics.sh
-rwxr-xr-x 1 root root 958 Oct 6 2017 kafka-verifiable-consumer.sh
-rwxr-xr-x 1 root root 958 Oct 6 2017 kafka-verifiable-producer.sh
-rwxr-xr-x 1 root root 867 Oct 6 2017 zookeeper-security-migration.sh
-rwxr-xr-x 1 root root 1393 Oct 6 2017 zookeeper-server-start.sh
-rwxr-xr-x 1 root root 978 Oct 6 2017 zookeeper-server-stop.sh
-rwxr-xr-x 1 root root 968 Oct 6 2017 zookeeper-shell.sh
Test commands:
kafka-topics --zookeeper 192.168.61.105:2181 --list
kafka-topics --create --zookeeper 192.168.61.105:2181 --replication-factor 1 --partitions 1 --topic daniel-topic-1
kafka-topics --describe --zookeeper 192.168.61.105:2181 --topic daniel-topic-1
kafka-console-producer --broker-list 192.168.61.105:9092 --topic daniel-topic-1
kafka-console-consumer --bootstrap-server 192.168.61.105:9092 --topic daniel-topic-1 --from-beginning
Java programs
Producer:
package com.mykafka.task;

import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.serialization.StringSerializer;

public class MyMessageProducer {

    private final static String TOPIC = "daniel-topic-1";

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.61.105:9092");
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "34");
        // Key and value are both Strings, so use StringSerializer for both
        // (the original used LongSerializer for the key, which does not match
        // Producer<String, String>).
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        for (int i = 1; i <= 3; i++) {
            String value = "value3_" + i;
            // No key is given, so messages are spread across partitions round-robin.
            ProducerRecord<String, String> msg = new ProducerRecord<String, String>(TOPIC, value);
            System.out.println("send message : " + value);
            producer.send(msg);
        }

        // Print the partition metadata for the topic.
        List<PartitionInfo> partitions = producer.partitionsFor(TOPIC);
        for (PartitionInfo p : partitions) {
            System.out.println(p);
        }
        System.out.println("send message over.");
        // Wait up to 100 ms for outstanding sends, then close.
        producer.close(100, TimeUnit.MILLISECONDS);
    }
}
Consumer:
package com.mykafka.task;

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class MyMessageConsumer {

    private final static String TOPIC = "daniel-topic-1";

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.61.105:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "daniel-consumer-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList(TOPIC));

        // Partitions are only assigned after the first poll(), so
        // seekToBeginning() would have no effect before it; call poll() once
        // to join the group and receive an assignment.
        consumer.poll(0);
        // Read from the beginning of every assigned partition.
        consumer.seekToBeginning(consumer.assignment());

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s %n", record.offset(), record.key(), record.value());
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
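Because this consumer uses the new API, its committed offsets live in Kafka rather than ZooKeeper; they could be checked with the kafka-consumer-groups tool listed earlier, for example:
kafka-consumer-groups --bootstrap-server 192.168.61.105:9092 --describe --group daniel-consumer-group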
Broker configuration parameters:
- broker.id: every broker has a unique id.
- auto.create.topics.enable
  If set to true, a topic is created automatically in three situations (see the sketch after this list):
  - When a producer starts writing messages to the topic
  - When a consumer starts reading messages from the topic
  - When any client requests metadata for the topic
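A minimal sketch of these two broker settings in config/server.properties (the values are illustrative):
broker.id=0
auto.create.topics.enable=true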
Topic configuration parameters:
- num.partitions
  Determines how many partitions a topic gets when it is created.
  With num.partitions=1, an auto-created topic gets only 1 partition.
  Changed to 3, an auto-created topic gets 3 partitions.
  Creating a topic from the command line with the partition count explicitly set to 5:
[root@daniel-3-test-slave1 local]# kafka-topics --create --zookeeper 192.168.61.105:2181 --replication-factor 1 --partitions 5 --topic daniel-topic-4
The log shows that 5 partitions are still created, so num.partitions only affects topics the system auto-creates:
18/05/25 08:28:35 INFO zkclient.ZkClient: zookeeper state changed (SyncConnected)
18/05/25 08:28:35 INFO admin.AdminUtils$: Topic creation {"version":1,"partitions":{"4":[34],"1":[34],"0":[34],"2":[34],"3":[34]}}
Created topic "daniel-topic-4".
Keep in mind that the number of partitions for a topic can only be increased, never decreased. This means that if a topic needs to have fewer partitions than num.partitions, care will need to be taken to manually create the topic.
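Since the partition count can only go up, increasing it later uses --alter; for example, growing the topic created above from 5 to 6 partitions:
kafka-topics --zookeeper 192.168.61.105:2181 --alter --topic daniel-topic-4 --partitions 6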
- log.retention.ms: how long Kafka retains messages.
- log.retention.bytes: how much message data Kafka retains, per partition. If it is set to 1 GB and the topic has 8 partitions, up to 8 GB can be retained; -1 means no limit.
If both log.retention.ms and log.retention.bytes are set, messages are deleted as soon as either limit is reached (see the sketch below).
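A sketch of the two retention settings in config/server.properties (illustrative values: 7 days and 1 GB per partition):
# keep messages for 7 days (in milliseconds)
log.retention.ms=604800000
# keep at most 1 GB per partition; -1 means unlimited
log.retention.bytes=1073741824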