Kafka day 1 (run Kafka)
Download Kafka
Start the ZooKeeper server:
./bin/zookeeper-server-start.sh ./config/zookeeper.properties
Start the Kafka server:
./bin/kafka-server-start.sh ./config/server.properties
Test with a Single Node-Single Broker configuration
Create the topic Hello-kafka:
[cloudera@quickstart kafka_2.11-1.0.0]$ ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic Hello-kafka
Created topic "Hello-kafka".
This creates a topic named "Hello-kafka" with a single partition and a replication factor of 1.
List topics:
[cloudera@quickstart kafka_2.11-1.0.0]$ ./bin/kafka-topics.sh --list --zookeeper localhost:2181
Hello-kafka
Start Producer to Send Messages
The broker's listening port (9092 here) is configured in config/server.properties.
[cloudera@quickstart kafka_2.11-1.0.0]$ ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Hello-kafka
>Hello
>My first Message
>My second Message
>
Start Consumer to Receive Messages (offsets are stored in ZooKeeper)
[cloudera@quickstart kafka_2.11-1.0.0]$ ./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic Hello-kafka --from-beginning
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
Hello
My first Message
My second Message
Delete a topic:
[cloudera@quickstart kafka_2.11-1.0.0]$ bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic Hello-kafka
Topic Hello-kafka is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
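The note above means the delete request only takes effect when the broker is configured with delete.topic.enable=true. As a rough sketch (not part of the Kafka tooling), the following Java snippet reads the text of a server.properties file and reports that setting. The class name DeleteTopicCheck, the method name, and the defaultValue parameter are made up for illustration. Which default applies when the key is absent depends on the Kafka version, so the caller passes it in.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

final class DeleteTopicCheck {
    // Returns the value of delete.topic.enable found in the given server.properties text,
    // or defaultValue when the key is absent (the real default depends on the Kafka version).
    static boolean isTopicDeletionEnabled(String serverPropertiesText, boolean defaultValue) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(serverPropertiesText));
        } catch (IOException e) {
            throw new IllegalStateException("could not parse properties text", e);
        }
        String raw = props.getProperty("delete.topic.enable");
        return raw == null ? defaultValue : Boolean.parseBoolean(raw.trim());
    }

    public static void main(String[] args) {
        // Hypothetical file content, used only for this demonstration
        String text = "broker.id=0\ndelete.topic.enable=true\n";
        System.out.println(isTopicDeletionEnabled(text, false)); // prints true
    }
}
```

In practice the text would come from the broker's own server.properties (or the equivalent setting in Cloudera Manager) rather than from a string literal.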

Location of the ZooKeeper config file under Cloudera Manager:
/etc/zookeeper/conf/zoo.cfg
After Kafka is installed through Cloudera Manager, the commands are located here:
[root@daniel-3-test-slave1 local]# ll /usr/bin | grep kafka
lrwxrwxrwx 1 root root        43 May 23 13:33 kafka-broker-api-versions -> /etc/alternatives/kafka-broker-api-versions
lrwxrwxrwx 1 root root        30 May 23 13:33 kafka-config -> /etc/alternatives/kafka-config
lrwxrwxrwx 1 root root        40 May 23 13:33 kafka-console-consumer -> /etc/alternatives/kafka-console-consumer
lrwxrwxrwx 1 root root        40 May 23 13:33 kafka-console-producer -> /etc/alternatives/kafka-console-producer
lrwxrwxrwx 1 root root        39 May 23 13:33 kafka-consumer-groups -> /etc/alternatives/kafka-consumer-groups
lrwxrwxrwx 1 root root        47 May 23 13:33 kafka-consumer-offset-checker -> /etc/alternatives/kafka-consumer-offset-checker
lrwxrwxrwx 1 root root        42 May 23 13:33 kafka-consumer-perf-test -> /etc/alternatives/kafka-consumer-perf-test
lrwxrwxrwx 1 root root        50 May 23 13:33 kafka-preferred-replica-election -> /etc/alternatives/kafka-preferred-replica-election
lrwxrwxrwx 1 root root        42 May 23 13:33 kafka-producer-perf-test -> /etc/alternatives/kafka-producer-perf-test
lrwxrwxrwx 1 root root        43 May 23 13:33 kafka-reassign-partitions -> /etc/alternatives/kafka-reassign-partitions
lrwxrwxrwx 1 root root        33 May 23 13:33 kafka-run-class -> /etc/alternatives/kafka-run-class
lrwxrwxrwx 1 root root        30 May 23 13:33 kafka-sentry -> /etc/alternatives/kafka-sentry
lrwxrwxrwx 1 root root        30 May 23 13:33 kafka-topics -> /etc/alternatives/kafka-topics
[root@daniel-3-test-slave1 local]# ll /opt/cloudera/parcels/KAFKA-3.0.0-1.3.0.0.p0.40/bin/
total 52
-rwxr-xr-x 1 root root 684 Oct  6  2017 kafka-broker-api-versions
-rwxr-xr-x 1 root root 671 Oct  6  2017 kafka-config
-rwxr-xr-x 1 root root 681 Oct  6  2017 kafka-console-consumer
-rwxr-xr-x 1 root root 681 Oct  6  2017 kafka-console-producer
-rwxr-xr-x 1 root root 680 Oct  6  2017 kafka-consumer-groups
-rwxr-xr-x 1 root root 688 Oct  6  2017 kafka-consumer-offset-checker
-rwxr-xr-x 1 root root 683 Oct  6  2017 kafka-consumer-perf-test
-rwxr-xr-x 1 root root 691 Oct  6  2017 kafka-preferred-replica-election
-rwxr-xr-x 1 root root 683 Oct  6  2017 kafka-producer-perf-test
-rwxr-xr-x 1 root root 684 Oct  6  2017 kafka-reassign-partitions
-rwxr-xr-x 1 root root 674 Oct  6  2017 kafka-run-class
-rwxr-xr-x 1 root root 671 Oct  6  2017 kafka-sentry
-rwxr-xr-x 1 root root 671 Oct  6  2017 kafka-topics
The scripts are also here, including the Kafka Connect .sh scripts:
[root@daniel-3-test-master1 ~]# ll /opt/cloudera/parcels/KAFKA-3.0.0-1.3.0.0.p0.40/lib/kafka/bin
total 124
-rwxr-xr-x 1 root root 1335 Oct  6  2017 connect-distributed.sh
-rwxr-xr-x 1 root root 1332 Oct  6  2017 connect-standalone.sh
-rwxr-xr-x 1 root root  861 Oct  6  2017 kafka-acls.sh
-rwxr-xr-x 1 root root  873 Oct  6  2017 kafka-broker-api-versions.sh
-rwxr-xr-x 1 root root  864 Oct  6  2017 kafka-configs.sh
-rwxr-xr-x 1 root root  945 Oct  6  2017 kafka-console-consumer.sh
-rwxr-xr-x 1 root root  944 Oct  6  2017 kafka-console-producer.sh
-rwxr-xr-x 1 root root  871 Oct  6  2017 kafka-consumer-groups.sh
-rwxr-xr-x 1 root root  872 Oct  6  2017 kafka-consumer-offset-checker.sh
-rwxr-xr-x 1 root root  948 Oct  6  2017 kafka-consumer-perf-test.sh
-rwxr-xr-x 1 root root  869 Oct  6  2017 kafka-delete-records.sh
-rwxr-xr-x 1 root root  862 Oct  6  2017 kafka-mirror-maker.sh
-rwxr-xr-x 1 root root  886 Oct  6  2017 kafka-preferred-replica-election.sh
-rwxr-xr-x 1 root root  959 Oct  6  2017 kafka-producer-perf-test.sh
-rwxr-xr-x 1 root root  874 Oct  6  2017 kafka-reassign-partitions.sh
-rwxr-xr-x 1 root root  868 Oct  6  2017 kafka-replay-log-producer.sh
-rwxr-xr-x 1 root root  874 Oct  6  2017 kafka-replica-verification.sh
-rwxr-xr-x 1 root root 7027 Oct  6  2017 kafka-run-class.sh
-rwxr-xr-x 1 root root 1961 Oct  6  2017 kafka-sentry.sh
-rwxr-xr-x 1 root root 1394 Oct  6  2017 kafka-server-start.sh
-rwxr-xr-x 1 root root  975 Oct  6  2017 kafka-server-stop.sh
-rwxr-xr-x 1 root root  870 Oct  6  2017 kafka-simple-consumer-shell.sh
-rwxr-xr-x 1 root root  945 Oct  6  2017 kafka-streams-application-reset.sh
-rwxr-xr-x 1 root root  863 Oct  6  2017 kafka-topics.sh
-rwxr-xr-x 1 root root  958 Oct  6  2017 kafka-verifiable-consumer.sh
-rwxr-xr-x 1 root root  958 Oct  6  2017 kafka-verifiable-producer.sh
-rwxr-xr-x 1 root root  867 Oct  6  2017 zookeeper-security-migration.sh
-rwxr-xr-x 1 root root 1393 Oct  6  2017 zookeeper-server-start.sh
-rwxr-xr-x 1 root root  978 Oct  6  2017 zookeeper-server-stop.sh
-rwxr-xr-x 1 root root  968 Oct  6  2017 zookeeper-shell.sh
Test commands:
kafka-topics --zookeeper 192.168.61.105:2181 --list
kafka-topics --create --zookeeper 192.168.61.105:2181 --replication-factor 1 --partitions 1 --topic daniel-topic-1
kafka-topics --describe --zookeeper 192.168.61.105:2181 --topic daniel-topic-1
kafka-console-producer --broker-list 192.168.61.105:9092 --topic daniel-topic-1
kafka-console-consumer --bootstrap-server 192.168.61.105:9092 --topic daniel-topic-1 --from-beginning
Java programs
Producer :
package com.mykafka.task;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.serialization.StringSerializer;
public class MyMessageProducer {
	private final static String TOPIC = "daniel-topic-1";
	public static void main(String[] args) {
		Properties props = new Properties();
		props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.61.105:9092");
		props.put(ProducerConfig.CLIENT_ID_CONFIG, "34");
		// Keys and values are both declared as String, so both serializers are StringSerializer
		props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
		props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
		Producer<String, String> producer = new KafkaProducer<String, String>(props);
		for (int i = 1; i <= 3; i++) {
			String value = "value3_" + i;
			// No key is given, so the producer chooses the partition
			ProducerRecord<String, String> msg = new ProducerRecord<String, String>(TOPIC, value);
			System.out.println("send message : " + value);
			// send() is asynchronous: the record is buffered and sent in the background
			producer.send(msg);
		}
		// Print the partition metadata of the topic
		List<PartitionInfo> partitions = producer.partitionsFor(TOPIC);
		for (PartitionInfo p : partitions) {
			System.out.println(p);
		}
		System.out.println("send message over.");
		// Flush buffered records first, because close() below only waits 100 ms
		producer.flush();
		producer.close(100, TimeUnit.MILLISECONDS);
	}
}
Consumer :
package com.mykafka.task;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
public class MyMessageConsumer {
	private final static String TOPIC = "daniel-topic-1";
	public static void main(String[] args) {
		Properties props = new Properties();
		props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.61.105:9092");
		props.put(ConsumerConfig.GROUP_ID_CONFIG, "daniel-consumer-group");
		props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
		props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
		KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
		consumer.subscribe(Collections.singletonList(TOPIC));
		// At this point the consumer has not joined the group yet (no heartbeat, no assignment),
		// so seekToBeginning() would not work. Call poll() once first.
		consumer.poll(0);
		// Read from the beginning
		consumer.seekToBeginning(consumer.assignment());
		while (true) {
			// Wait up to 100 ms for new records
			ConsumerRecords<String, String> records = consumer.poll(100);
			for (ConsumerRecord<String, String> record : records) {
				System.out.printf("offset = %d, key = %s, value = %s \n", record.offset(), record.key(), record.value());
				try {
					// Slow down the output so each record is easy to follow
					Thread.sleep(1000);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
		}
	}
}
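The consumer above forces a read from the start by calling poll(0) followed by seekToBeginning(). A commonly used alternative for a consumer group that has no committed offsets yet is the auto.offset.reset=earliest setting. The sketch below only builds the configuration, using the plain string keys that the ConsumerConfig constants stand for, so it runs without a broker. The class and method names are hypothetical, and unlike seekToBeginning() this setting has no effect once the group has committed offsets.

```java
import java.util.Properties;

final class ConsumerProps {
    private static final String STRING_DESERIALIZER =
            "org.apache.kafka.common.serialization.StringDeserializer";

    // Builds the same consumer settings as the program above, plus auto.offset.reset.
    static Properties earliestConsumerProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        props.setProperty("key.deserializer", STRING_DESERIALIZER);
        props.setProperty("value.deserializer", STRING_DESERIALIZER);
        // Start from the earliest offset when the group has no committed offset
        props.setProperty("auto.offset.reset", "earliest");
        return props;
    }

    public static void main(String[] args) {
        Properties props = earliestConsumerProps("192.168.61.105:9092", "daniel-consumer-group");
        System.out.println(props.getProperty("auto.offset.reset")); // prints earliest
    }
}
```

The returned Properties object could be passed to new KafkaConsumer<String, String>(props) in place of the props built in MyMessageConsumer.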
Broker settings:
- broker.id: every broker has an id, which must be unique within the cluster
- auto.create.topics.enable: when set to true, a topic is created automatically in three situations:
  - When a producer starts writing messages to the topic
  - When a consumer starts reading messages from the topic
  - When any client requests metadata for the topic
 
Topic settings:
- num.partitions
Determines how many partitions a newly created topic is given by default.
 With the original value of 1, an auto-created topic has only 1 partition.
 After changing it to 3, an auto-created topic has 3 partitions.
  
Creating a topic from the command line with the partition count set to 5:
[root@daniel-3-test-slave1 local]# kafka-topics --create --zookeeper 192.168.61.105:2181 --replication-factor 1 --partitions 5 --topic daniel-topic-4
The log shows that 5 partitions are still created, so num.partitions only applies when the system auto-creates a topic:
18/05/25 08:28:35 INFO zkclient.ZkClient: zookeeper state changed (SyncConnected)
18/05/25 08:28:35 INFO admin.AdminUtils$: Topic creation {"version":1,"partitions":{"4":[34],"1":[34],"0":[34],"2":[34],"3":[34]}}
Created topic "daniel-topic-4".
Keep in mind that the number of partitions for a topic can only be increased, never decreased. This means that if a topic needs to have fewer partitions than num.partitions, care will need to be taken to manually create the topic.
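- log.retention.ms: how long Kafka retains messages.
- log.retention.bytes: how many bytes of messages Kafka retains. The limit applies per partition, so a setting of 1 GB on a topic with 8 partitions allows up to 8 GB to be retained. -1 means no limit.
If both log.retention.ms and log.retention.bytes are set, messages are deleted as soon as either criterion is met.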
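The two retention settings can be illustrated with a small model. This is a simplified sketch, not Kafka's actual cleanup code (which works on whole log segments). RetentionSketch and its methods are hypothetical names, and the numbers reuse the 1 GB and 8 partitions example above.

```java
final class RetentionSketch {
    static final long NO_LIMIT = -1L;

    // Upper bound on retained bytes for a whole topic: the per-partition limit
    // times the number of partitions. Returns NO_LIMIT when the limit is -1.
    static long topicRetentionBytes(long retentionBytesPerPartition, int partitions) {
        if (retentionBytesPerPartition == NO_LIMIT) {
            return NO_LIMIT;
        }
        return Math.multiplyExact(retentionBytesPerPartition, (long) partitions);
    }

    // Data becomes eligible for deletion as soon as EITHER limit is exceeded.
    static boolean eligibleForDeletion(long ageMs, long partitionBytes,
                                       long retentionMs, long retentionBytes) {
        boolean tooOld = retentionMs != NO_LIMIT && ageMs > retentionMs;
        boolean tooBig = retentionBytes != NO_LIMIT && partitionBytes > retentionBytes;
        return tooOld || tooBig;
    }

    public static void main(String[] args) {
        long oneGiB = 1024L * 1024L * 1024L;
        System.out.println(topicRetentionBytes(oneGiB, 8)); // prints 8589934592
    }
}
```

The OR in eligibleForDeletion mirrors the rule that meeting either criterion is enough, so a small time limit can remove data long before the size limit is reached, and vice versa.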