Download Kafka

Start the ZooKeeper server:

./bin/zookeeper-server-start.sh ./config/zookeeper.properties

Start the Kafka server:

./bin/kafka-server-start.sh ./config/server.properties

Test with a Single Node-Single Broker configuration

Create the topic Hello-kafka:

[cloudera@quickstart kafka_2.11-1.0.0]$ ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic Hello-kafka
Created topic "Hello-kafka".

This creates a topic named "Hello-kafka" with a single partition and a replication factor of 1.
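Topics can also be created programmatically. Below is a minimal sketch using Kafka's AdminClient API (available since 0.11); the class name and the localhost:9092 broker address are assumptions for illustration:

package com.mykafka.task;

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class MyTopicCreator {

	public static void main(String[] args) throws Exception {
		Properties props = new Properties();
		props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
		try (AdminClient admin = AdminClient.create(props)) {
			// one partition, replication factor 1, same as the CLI command above
			NewTopic topic = new NewTopic("Hello-kafka", 1, (short) 1);
			admin.createTopics(Collections.singletonList(topic)).all().get();
		}
	}
}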

List topics:

[cloudera@quickstart kafka_2.11-1.0.0]$ ./bin/kafka-topics.sh --list --zookeeper localhost:2181
Hello-kafka

Start a producer to send messages. The broker address (localhost:9092) comes from config/server.properties:

[cloudera@quickstart kafka_2.11-1.0.0]$ ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Hello-kafka
>Hello
>My first Message
>My second Message
>

Start a consumer to receive messages (with the old consumer, offsets are stored in ZooKeeper):

[cloudera@quickstart kafka_2.11-1.0.0]$ ./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic Hello-kafka --from-beginning
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
Hello
My first Message
My second Message
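As the warning suggests, the same topic can be read with the new consumer by passing --bootstrap-server instead of --zookeeper (offsets are then stored in Kafka itself rather than in ZooKeeper):

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic Hello-kafka --from-beginning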

Delete a topic:

[cloudera@quickstart kafka_2.11-1.0.0]$ bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic Hello-kafka
Topic Hello-kafka is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

[image: kafka_day1_1.jpg]

ZooKeeper configuration file location under Cloudera Manager:

/etc/zookeeper/conf/zoo.cfg

After installing Kafka through Cloudera Manager, the commands are located at:

[root@daniel-3-test-slave1 local]# ll /usr/bin | grep kafka
lrwxrwxrwx 1 root root        43 May 23 13:33 kafka-broker-api-versions -> /etc/alternatives/kafka-broker-api-versions
lrwxrwxrwx 1 root root        30 May 23 13:33 kafka-config -> /etc/alternatives/kafka-config
lrwxrwxrwx 1 root root        40 May 23 13:33 kafka-console-consumer -> /etc/alternatives/kafka-console-consumer
lrwxrwxrwx 1 root root        40 May 23 13:33 kafka-console-producer -> /etc/alternatives/kafka-console-producer
lrwxrwxrwx 1 root root        39 May 23 13:33 kafka-consumer-groups -> /etc/alternatives/kafka-consumer-groups
lrwxrwxrwx 1 root root        47 May 23 13:33 kafka-consumer-offset-checker -> /etc/alternatives/kafka-consumer-offset-checker
lrwxrwxrwx 1 root root        42 May 23 13:33 kafka-consumer-perf-test -> /etc/alternatives/kafka-consumer-perf-test
lrwxrwxrwx 1 root root        50 May 23 13:33 kafka-preferred-replica-election -> /etc/alternatives/kafka-preferred-replica-election
lrwxrwxrwx 1 root root        42 May 23 13:33 kafka-producer-perf-test -> /etc/alternatives/kafka-producer-perf-test
lrwxrwxrwx 1 root root        43 May 23 13:33 kafka-reassign-partitions -> /etc/alternatives/kafka-reassign-partitions
lrwxrwxrwx 1 root root        33 May 23 13:33 kafka-run-class -> /etc/alternatives/kafka-run-class
lrwxrwxrwx 1 root root        30 May 23 13:33 kafka-sentry -> /etc/alternatives/kafka-sentry
lrwxrwxrwx 1 root root        30 May 23 13:33 kafka-topics -> /etc/alternatives/kafka-topics
[root@daniel-3-test-slave1 local]# ll /opt/cloudera/parcels/KAFKA-3.0.0-1.3.0.0.p0.40/bin/
total 52
-rwxr-xr-x 1 root root 684 Oct  6  2017 kafka-broker-api-versions
-rwxr-xr-x 1 root root 671 Oct  6  2017 kafka-config
-rwxr-xr-x 1 root root 681 Oct  6  2017 kafka-console-consumer
-rwxr-xr-x 1 root root 681 Oct  6  2017 kafka-console-producer
-rwxr-xr-x 1 root root 680 Oct  6  2017 kafka-consumer-groups
-rwxr-xr-x 1 root root 688 Oct  6  2017 kafka-consumer-offset-checker
-rwxr-xr-x 1 root root 683 Oct  6  2017 kafka-consumer-perf-test
-rwxr-xr-x 1 root root 691 Oct  6  2017 kafka-preferred-replica-election
-rwxr-xr-x 1 root root 683 Oct  6  2017 kafka-producer-perf-test
-rwxr-xr-x 1 root root 684 Oct  6  2017 kafka-reassign-partitions
-rwxr-xr-x 1 root root 674 Oct  6  2017 kafka-run-class
-rwxr-xr-x 1 root root 671 Oct  6  2017 kafka-sentry
-rwxr-xr-x 1 root root 671 Oct  6  2017 kafka-topics

They also exist here, and the Kafka Connect .sh scripts are in this directory as well:

[root@daniel-3-test-master1 ~]# ll /opt/cloudera/parcels/KAFKA-3.0.0-1.3.0.0.p0.40/lib/kafka/bin
total 124
-rwxr-xr-x 1 root root 1335 Oct  6  2017 connect-distributed.sh
-rwxr-xr-x 1 root root 1332 Oct  6  2017 connect-standalone.sh
-rwxr-xr-x 1 root root  861 Oct  6  2017 kafka-acls.sh
-rwxr-xr-x 1 root root  873 Oct  6  2017 kafka-broker-api-versions.sh
-rwxr-xr-x 1 root root  864 Oct  6  2017 kafka-configs.sh
-rwxr-xr-x 1 root root  945 Oct  6  2017 kafka-console-consumer.sh
-rwxr-xr-x 1 root root  944 Oct  6  2017 kafka-console-producer.sh
-rwxr-xr-x 1 root root  871 Oct  6  2017 kafka-consumer-groups.sh
-rwxr-xr-x 1 root root  872 Oct  6  2017 kafka-consumer-offset-checker.sh
-rwxr-xr-x 1 root root  948 Oct  6  2017 kafka-consumer-perf-test.sh
-rwxr-xr-x 1 root root  869 Oct  6  2017 kafka-delete-records.sh
-rwxr-xr-x 1 root root  862 Oct  6  2017 kafka-mirror-maker.sh
-rwxr-xr-x 1 root root  886 Oct  6  2017 kafka-preferred-replica-election.sh
-rwxr-xr-x 1 root root  959 Oct  6  2017 kafka-producer-perf-test.sh
-rwxr-xr-x 1 root root  874 Oct  6  2017 kafka-reassign-partitions.sh
-rwxr-xr-x 1 root root  868 Oct  6  2017 kafka-replay-log-producer.sh
-rwxr-xr-x 1 root root  874 Oct  6  2017 kafka-replica-verification.sh
-rwxr-xr-x 1 root root 7027 Oct  6  2017 kafka-run-class.sh
-rwxr-xr-x 1 root root 1961 Oct  6  2017 kafka-sentry.sh
-rwxr-xr-x 1 root root 1394 Oct  6  2017 kafka-server-start.sh
-rwxr-xr-x 1 root root  975 Oct  6  2017 kafka-server-stop.sh
-rwxr-xr-x 1 root root  870 Oct  6  2017 kafka-simple-consumer-shell.sh
-rwxr-xr-x 1 root root  945 Oct  6  2017 kafka-streams-application-reset.sh
-rwxr-xr-x 1 root root  863 Oct  6  2017 kafka-topics.sh
-rwxr-xr-x 1 root root  958 Oct  6  2017 kafka-verifiable-consumer.sh
-rwxr-xr-x 1 root root  958 Oct  6  2017 kafka-verifiable-producer.sh
-rwxr-xr-x 1 root root  867 Oct  6  2017 zookeeper-security-migration.sh
-rwxr-xr-x 1 root root 1393 Oct  6  2017 zookeeper-server-start.sh
-rwxr-xr-x 1 root root  978 Oct  6  2017 zookeeper-server-stop.sh
-rwxr-xr-x 1 root root  968 Oct  6  2017 zookeeper-shell.sh

Test commands:

kafka-topics --zookeeper 192.168.61.105:2181 --list
kafka-topics --create --zookeeper 192.168.61.105:2181 --replication-factor 1 --partitions 1 --topic daniel-topic-1
kafka-topics --describe --zookeeper 192.168.61.105:2181 --topic daniel-topic-1
kafka-console-producer --broker-list 192.168.61.105:9092 --topic daniel-topic-1
kafka-console-consumer --bootstrap-server 192.168.61.105:9092 --topic daniel-topic-1 --from-beginning

Java programs

Producer :

package com.mykafka.task;

import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.serialization.StringSerializer;

public class MyMessageProducer {

	private final static String TOPIC = "daniel-topic-1";

	public static void main(String[] args) {
		Properties props = new Properties();
		props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.61.105:9092");
		props.put(ProducerConfig.CLIENT_ID_CONFIG, "34");
		// keys and values are both Strings, so use StringSerializer for both
		props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
		props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
		Producer<String, String> producer = new KafkaProducer<String, String>(props);
		for (int i = 1; i <= 3; i++) {
			String value = "value3_" + i;
			ProducerRecord<String, String> msg = new ProducerRecord<String, String>(TOPIC, value);
			System.out.println("send message : " + value);
			producer.send(msg);
		}
		// print the partition layout of the topic
		List<PartitionInfo> partitions = producer.partitionsFor(TOPIC);
		for (PartitionInfo p : partitions) {
			System.out.println(p);
		}
		System.out.println("send message over.");
		// wait up to 100 ms for buffered records to be delivered before closing
		producer.close(100, TimeUnit.MILLISECONDS);
	}
}
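send() returns immediately and delivers in the background. A hedged sketch of checking the delivery result with a callback; this fragment would replace the producer.send(msg) call inside the loop above:

			// send with a callback instead of fire-and-forget;
			// metadata carries the partition and offset assigned to the record
			producer.send(msg, (metadata, exception) -> {
				if (exception != null) {
					exception.printStackTrace();
				} else {
					System.out.println("sent to partition " + metadata.partition()
							+ ", offset " + metadata.offset());
				}
			});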

Consumer :

package com.mykafka.task;

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class MyMessageConsumer {

	private final static String TOPIC = "daniel-topic-1";

	public static void main(String[] args) {
		Properties props = new Properties();
		props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.61.105:9092");
		props.put(ConsumerConfig.GROUP_ID_CONFIG, "daniel-consumer-group");
		props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
		props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(java.util.Arrays.asList(TOPIC));
        // at this point the consumer has no partition assignment yet,
        // so seekToBeginning() would not work; poll() first to get an assignment
        consumer.poll(0);
        // read from the beginning of the topic
        consumer.seekToBeginning(consumer.assignment());
        while (true) {
			ConsumerRecords<String, String> records = consumer.poll(100);
			for (ConsumerRecord<String, String> record : records) {
				System.out.printf("offset = %d, key = %s, value = %s \n", record.offset(), record.key(), record.value());
				try {
					Thread.sleep(1000);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
		}
	}
}
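By default the consumer auto-commits offsets in the background. A minimal sketch of committing manually after each batch, assuming the same setup as MyMessageConsumer above with one extra property:

		// in addition to the MyMessageConsumer properties above:
		props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

		while (true) {
			ConsumerRecords<String, String> records = consumer.poll(100);
			for (ConsumerRecord<String, String> record : records) {
				System.out.printf("offset = %d, value = %s \n", record.offset(), record.value());
			}
			// commit offsets only after the whole batch has been processed
			consumer.commitSync();
		}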

Broker parameter settings:

  • broker.id: every broker has a unique id
  • auto.create.topics.enable
    If set to true, a topic is created automatically in three cases (see the sketch after this list):
    1. When a producer starts writing messages to the topic
    2. When a consumer starts reading messages from the topic
    3. When any client requests metadata for the topic
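For case 3, a metadata request alone is enough. A sketch reusing the producer above; the topic name daniel-topic-auto is hypothetical and must not exist yet, and the broker must have auto.create.topics.enable=true:

		// requesting metadata for a topic that does not exist yet
		// triggers auto-creation when auto.create.topics.enable=true
		List<PartitionInfo> partitions = producer.partitionsFor("daniel-topic-auto");
		System.out.println("auto-created with " + partitions.size() + " partitions");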

Topic parameter settings:

  • num.partitions: how many partitions a topic gets when it is created. [image: kafka_day1_2.jpg]
    When it is set to 1, an auto-created topic will have only 1 partition: [image: kafka_day1_3.jpg]
    When it is changed to 3, an auto-created topic will have 3 partitions: [image: kafka_day1_4.jpg]

Create a topic from the command line, specifying 5 partitions:

[root@daniel-3-test-slave1 local]# kafka-topics --create --zookeeper 192.168.61.105:2181 --replication-factor 1 --partitions 5 --topic daniel-topic-4

The log shows that 5 partitions are still created, so num.partitions only takes effect when the system auto-creates a topic:

18/05/25 08:28:35 INFO zkclient.ZkClient: zookeeper state changed (SyncConnected)
18/05/25 08:28:35 INFO admin.AdminUtils$: Topic creation {"version":1,"partitions":{"4":[34],"1":[34],"0":[34],"2":[34],"3":[34]}}
Created topic "daniel-topic-4".

Keep in mind that the number of partitions for a topic can only be increased, never decreased. This means that if a topic needs to have fewer partitions than num.partitions, care will need to be taken to manually create the topic.
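Adding partitions can also be done programmatically; a minimal sketch using AdminClient.createPartitions (added in Kafka 1.0), assuming props holds the bootstrap.servers setting as in the producer above:

		// grow daniel-topic-1 to 5 partitions; asking for fewer would fail
		try (AdminClient admin = AdminClient.create(props)) {
			admin.createPartitions(
					Collections.singletonMap("daniel-topic-1", NewPartitions.increaseTo(5)))
				.all().get();
		}

The CLI equivalent is kafka-topics --alter --zookeeper 192.168.61.105:2181 --topic daniel-topic-1 --partitions 5.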

  • log.retention.ms: how long Kafka retains messages. [image: kafka_day1_5.jpg]

  • log.retention.bytes: the maximum size of messages Kafka retains, per partition; if set to 1 GB on a topic with 8 partitions, up to 8 GB can be retained in total. -1 means no limit. [image: kafka_day1_6.jpg]

If both log.retention.ms and log.retention.bytes are set, messages are deleted as soon as either of the two limits is reached.
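The per-topic counterparts of these broker settings are retention.ms and retention.bytes, and they can be changed at runtime. A sketch using AdminClient.alterConfigs (the pre-2.3 API matching this Kafka version); the 7-day / 1 GB values are arbitrary examples:

		// retention.ms = 7 days, retention.bytes = 1 GB per partition;
		// whichever limit is hit first triggers deletion
		ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, "daniel-topic-1");
		Config config = new Config(Arrays.asList(
				new ConfigEntry("retention.ms", "604800000"),
				new ConfigEntry("retention.bytes", "1073741824")));
		try (AdminClient admin = AdminClient.create(props)) {
			admin.alterConfigs(Collections.singletonMap(resource, config)).all().get();
		}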