kafka day 7 (kafka connector demo)
Connectors and Tasks
Connectors come in two flavors, SourceConnectors and SinkConnectors:
- SourceConnectors import data from another system
- SinkConnectors export data to another system
For example: DB -> JDBCSourceConnector -> Kafka -> HDFSSinkConnector -> HDFS
A Connector is responsible for breaking one job into a set of Tasks that are distributed across the Kafka Connect workers.
Tasks likewise come in two kinds: SourceTask and SinkTask.
- A Connector implementation can also: 1. monitor the external system for data changes 2. request task reconfiguration when those changes require it (see the sketch below)
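To make that concrete, here is a minimal SourceConnector sketch. It is not any shipped connector: the class name FileSetSourceConnector, the files/task.files properties, and the one-file-per-task split are all made up for illustration. taskConfigs() is where the single job gets split into at most maxTasks task configurations, and context.requestTaskReconfiguration() is how a connector tells the framework that the external system changed and the tasks should be regenerated. The FileSetSourceTask it refers to is sketched a little further down.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.source.SourceConnector;

// Hypothetical connector for illustration: it hands each task its own slice of a
// comma-separated "files" list.
public class FileSetSourceConnector extends SourceConnector {
    private Map<String, String> props;

    @Override
    public void start(Map<String, String> props) {
        this.props = props;
        // A real connector could start a watcher thread here and call
        // context.requestTaskReconfiguration() whenever files appear or disappear,
        // so the framework re-invokes taskConfigs() and redistributes the work.
    }

    @Override
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        // This is where one job is split into up to maxTasks task configurations.
        String[] files = props.get("files").split(",");
        List<Map<String, String>> taskConfigs = new ArrayList<>();
        for (int i = 0; i < Math.min(maxTasks, files.length); i++) {
            Map<String, String> taskConfig = new HashMap<>(props);
            taskConfig.put("task.files", files[i]); // naive one-file-per-task split
            taskConfigs.add(taskConfig);
        }
        return taskConfigs;
    }

    @Override
    public Class<? extends Task> taskClass() {
        return FileSetSourceTask.class; // the task class is sketched below
    }

    @Override
    public void stop() {
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef();
    }

    @Override
    public String version() {
        return "0.0.1";
    }
}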
The data a connector copies to or from Kafka has to be represented as a partitioned stream, and each partition is an ordered sequence of records identified by offsets.
Sometimes this mapping is obvious: if the source is a set of log files, each log file can be treated as a partition, every line in the file is a record, and the line number is the offset.
Another example is the JDBC connector, which can treat each table as a partition. Defining an offset is harder there; a timestamp column can serve as the offset for deciding whether new rows have appeared.
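Here is the matching SourceTask, again only a minimal sketch paired with the hypothetical connector above: the file name is the source partition and the line number is the source offset, which is exactly the log-file mapping described here. Tailing the file for new lines, error handling, and offset recovery are left out (offset recovery is touched on near the end of this post).

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

// Hypothetical task paired with FileSetSourceConnector above: one file per task,
// the file name is the source partition and the line number is the source offset.
public class FileSetSourceTask extends SourceTask {
    private String file;
    private String topic;
    private BufferedReader reader;
    private long lineNumber = 0;

    @Override
    public void start(Map<String, String> props) {
        file = props.get("task.files");
        topic = props.get("topic");
        try {
            reader = new BufferedReader(new FileReader(file));
        } catch (IOException e) {
            throw new RuntimeException("cannot open " + file, e);
        }
    }

    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        List<SourceRecord> records = new ArrayList<>();
        try {
            String line;
            while ((line = reader.readLine()) != null) {
                lineNumber++;
                records.add(new SourceRecord(
                        Collections.singletonMap("file", file),        // source partition
                        Collections.singletonMap("line", lineNumber),  // source offset
                        topic,
                        Schema.STRING_SCHEMA,
                        line));
            }
        } catch (IOException e) {
            throw new RuntimeException("error reading " + file, e);
        }
        return records;
    }

    @Override
    public void stop() {
        try {
            reader.close();
        } catch (IOException e) {
            // ignore on shutdown
        }
    }

    @Override
    public String version() {
        return "0.0.1";
    }
}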
Partitions and Records
A partition is an ordered sequence of key-value records. Keys and values can have complex structures, represented with the org.apache.kafka.connect.data API; many primitive types as well as arrays, structs, and nested data structures are supported.
So that each record can be interpreted, a Schema (built with the SchemaBuilder class) may accompany every record.
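A small example of building such a schema and record with SchemaBuilder; the demo.User name and its fields are made up purely for illustration.

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

public class SchemaDemo {
    public static void main(String[] args) {
        // A struct schema with three fields, one of them optional.
        Schema userSchema = SchemaBuilder.struct().name("demo.User")
                .field("id", Schema.INT64_SCHEMA)
                .field("name", Schema.STRING_SCHEMA)
                .field("email", Schema.OPTIONAL_STRING_SCHEMA)
                .build();

        // A record value built against that schema.
        Struct user = new Struct(userSchema)
                .put("id", 1L)
                .put("name", "daniel");

        user.validate(); // throws DataException if a required field is missing or mistyped
        System.out.println(user);
    }
}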
The runtime data format does not assume any particular serialization format; a Converter sits between the org.apache.kafka.connect.data runtime format and the data serialized as byte[].
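As a quick sketch of what that Converter does, the JsonConverter configured later in this post turns a runtime Schema plus value into bytes; with schemas.enable=true the bytes carry the schema next to the payload, which is the {"schema":...,"payload":...} envelope that shows up in the console consumer output further down.

import java.nio.charset.StandardCharsets;
import java.util.Collections;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.json.JsonConverter;

public class ConverterDemo {
    public static void main(String[] args) {
        JsonConverter converter = new JsonConverter();
        // false = this instance converts record values (true would mean keys).
        converter.configure(Collections.singletonMap("schemas.enable", "true"), false);

        // Runtime format (Schema + value) -> serialized byte[] for the Kafka topic.
        byte[] bytes = converter.fromConnectData("connect-test", Schema.STRING_SCHEMA, "aaa");
        System.out.println(new String(bytes, StandardCharsets.UTF_8));
        // Prints: {"schema":{"type":"string","optional":false},"payload":"aaa"}
    }
}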
Besides the key and value, each record also carries a partition ID and an offset. The framework periodically commits the offsets of data that has already been processed, so after a failure, processing can resume from the last committed offset and avoid reprocessing data that was already handled.
connector standalone demo
This demo uses the KAFKA parcel that ships with CDH:
cd /opt/cloudera/parcels/KAFKA/lib/kafka
You should see the following directory layout:
[root@daniel-3-test-master1 kafka]# ll
total 64
-rwxr-xr-x 1 root root 28824 Oct 6 2017 LICENSE
-rwxr-xr-x 1 root root 336 Oct 6 2017 NOTICE
drwxr-xr-x 2 root root 4096 Jul 11 15:54 bin
drwxr-xr-x 2 root root 4096 Oct 6 2017 cloudera
lrwxrwxrwx 1 root root 15 Oct 6 2017 config -> /etc/kafka/conf
drwxr-xr-x 2 root root 12288 Oct 6 2017 libs
drwxr-xr-x 2 root root 4096 Jul 11 15:21 logs
drwxr-xr-x 2 root root 4096 Oct 6 2017 site-docs
The Connect startup scripts are under the bin directory:
[root@daniel-3-test-master1 kafka]# ll bin | grep connect
-rwxr-xr-x 1 root root 1335 Oct 6 2017 connect-distributed.sh
-rwxr-xr-x 1 root root 1332 Oct 6 2017 connect-standalone.sh
Next, add the following four files under config:
[root@daniel-3-test-master1 kafka]# ll config/ | grep connect
-rw-r--r-- 1 root root 300 Jul 11 15:34 connect-log4j.properties
-rw-r--r-- 1 root root 132 Jul 11 16:54 connect-sink.properties
-rw-r--r-- 1 root root 130 Jul 11 16:41 connect-source.properties
-rw-r--r-- 1 root root 540 Jul 11 15:59 connect-standalone.properties
You can refer to the config samples that ship with Kafka:
[root@daniel-3-test-master1 /]# cd /opt/cloudera/parcels/KAFKA-3.0.0-1.3.0.0.p0.40/etc/kafka/conf.dist
[root@daniel-3-test-master1 conf.dist]# ll
total 52
-rw-r--r-- 1 root root 906 Oct 6 2017 connect-console-sink.properties
-rw-r--r-- 1 root root 909 Oct 6 2017 connect-console-source.properties
-rw-r--r-- 1 root root 5807 Oct 6 2017 connect-distributed.properties
-rw-r--r-- 1 root root 883 Oct 6 2017 connect-file-sink.properties
-rw-r--r-- 1 root root 881 Oct 6 2017 connect-file-source.properties
-rw-r--r-- 1 root root 1111 Oct 6 2017 connect-log4j.properties
-rw-r--r-- 1 root root 2730 Oct 6 2017 connect-standalone.properties
-rw-r--r-- 1 root root 4696 Oct 6 2017 log4j.properties
-rw-r--r-- 1 root root 6954 Oct 6 2017 server.properties
-rw-r--r-- 1 root root 1032 Oct 6 2017 tools-log4j.properties
connect-log4j.properties :
root.logger=INFO,console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n
connect-standalone.properties :
bootstrap.servers=daniel-3-test-master1:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
connect-source.properties :
name=local-file-source
connector.class=FileStreamSourceConnector
tasks.max=1
file=/tmp/connectorfile/test.txt
topic=connect-test
connect-sink.properties :
name=local-file-sink
connector.class=FileStreamSinkConnector
tasks.max=1
file=/tmp/connectorfile/test.sink.txt
topics=connect-test
Then start connect-standalone.sh:
./bin/connect-standalone.sh config/connect-standalone.properties config/connect-source.properties config/connect-sink.properties
Once it is up, the following command shows whether the connect-test topic has been created:
./bin/kafka-topics.sh --list --zookeeper localhost:2181
Then start a kafka-console-consumer.sh to read every record from connect-test:
./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic connect-test --from-beginning
Then go to /tmp/connectorfile:
cd /tmp/connectorfile
and append some data to test.txt:
echo "aaa" >> test.txt
echo "bbb" >> test.txt
echo "ccc" >> test.txt
kafka-console-consumer.sh will print the following messages:
18/07/11 17:22:39 INFO consumer.ConsumerFetcherThread: [ConsumerFetcherThread-console-consumer-9532_daniel-3-test-master1-1531300958401-ae423226-0-34]: Starting
18/07/11 17:22:39 INFO consumer.ConsumerFetcherManager: [ConsumerFetcherManager-1531300958446] Added fetcher for partitions ArrayBuffer([connect-test-2, initOffset -1 to broker BrokerEndPoint(34,daniel-3-test-master1.is-land.taipei,9092)] , [connect-test-1, initOffset -1 to broker BrokerEndPoint(34,daniel-3-test-master1.is-land.taipei,9092)] , [connect-test-0, initOffset -1 to broker BrokerEndPoint(34,daniel-3-test-master1.is-land.taipei,9092)] )
{"schema":{"type":"string","optional":false},"payload":"aaa"}
{"schema":{"type":"string","optional":false},"payload":"bbb"}
{"schema":{"type":"string","optional":false},"payload":"ccc"}
and test.sink.txt gets the following data appended:
[root@daniel-3-test-master1 connectorfile]# cat test.sink.txt
aaa
bbb
ccc
If ccc is changed to ccc_update:
[root@daniel-3-test-master1 connectorfile]# cat test.txt
aaa
bbb
ccc_update
one more record, update, is appended to the topic:
{"schema":{"type":"string","optional":false},"payload":"aaa"}
{"schema":{"type":"string","optional":false},"payload":"bbb"}
{"schema":{"type":"string","optional":false},"payload":"ccc"}
{"schema":{"type":"string","optional":false},"payload":"update"}
and the content of test.sink.txt becomes:
[root@daniel-3-test-master1 connectorfile]# cat test.sink.txt
aaa
bbb
ccc
update
Next, remove ccc_update from test.txt:
[root@daniel-3-test-master1 connectorfile]# cat test.txt
aaa
bbb
The connector does nothing in response:
[root@daniel-3-test-master1 connectorfile]# cat test.sink.txt
aaa
bbb
ccc
update
Then keep appending data:
echo "ddd" >> test.txt
echo "eee" >> test.txt
echo "fff" >> test.txt
echo "ggg" >> test.txt
At this point the data no longer lines up:
[root@daniel-3-test-master1 connectorfile]# cat test.sink.txt
aaa
bbb
ccc
update
ggg
and the topic contains:
{"schema":{"type":"string","optional":false},"payload":"aaa"}
{"schema":{"type":"string","optional":false},"payload":"bbb"}
{"schema":{"type":"string","optional":false},"payload":"ccc"}
{"schema":{"type":"string","optional":false},"payload":"update"}{"schema":{"type":"string","optional":false},"payload":""}
{"schema":{"type":"string","optional":false},"payload":"ggg"}
The steps so far appended a, b, c, c_update, d, e, f, g. Next the topic is deleted and both files are emptied, but /tmp/connect.offsets is left in place. This time h, i, j, k, l, m, n, o are appended:
[root@daniel-3-test-master1 connectorfile]# echo "hhh" >> test.txt
[root@daniel-3-test-master1 connectorfile]# echo "iii" >> test.txt
[root@daniel-3-test-master1 connectorfile]# echo "jjj" >> test.txt
[root@daniel-3-test-master1 connectorfile]# echo "kkk" >> test.txt
[root@daniel-3-test-master1 connectorfile]# echo "lll" >> test.txt
[root@daniel-3-test-master1 connectorfile]# echo "mmm" >> test.txt
[root@daniel-3-test-master1 connectorfile]# echo "nnn" >> test.txt
[root@daniel-3-test-master1 connectorfile]# echo "ooo" >> test.txt
Nothing gets written until n is appended, so we can assume the previously stored offset was recorded at the 7th entry of a, b, c, d, e, f, g.
[root@daniel-3-test-master1 connectorfile]# cat test.sink.txt
nnn
ooo
and the topic contains:
{"schema":{"type":"string","optional":false},"payload":"nnn"}
{"schema":{"type":"string","optional":false},"payload":"ooo"}
- So the conclusion is: in standalone mode the connector's offsets are stored in the file set by offset.storage.file.filename in connect-standalone.properties, so if you want to rerun the whole test from scratch, remember to delete that file.
Otherwise the old offset is still remembered. Re-adding a line to the file then does nothing, because it still falls under the previous offset, and nothing is emitted until you append past it; for example, if 3 lines had been added before, the first 3 lines of a fresh test produce nothing and only the 4th does.
These are just the results of my experiments; to know for sure you would have to dig into the source code.
rm -f /tmp/connect.offsets
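A rough sketch of why it behaves that way, written as an addition to the hypothetical FileSetSourceTask from earlier (the "file"/"line" keys are the made-up ones from that sketch): on startup a SourceTask can read back its last committed offset through the framework, and anything at or below that offset is simply never re-emitted.

import java.util.Collections;
import java.util.Map;

// Addition to the FileSetSourceTask sketched earlier: on startup, ask the framework
// for the last committed offset of this task's partition (the file), then skip
// everything up to that point. In standalone mode those offsets live in the file
// configured by offset.storage.file.filename, which is why they survive even after
// the topic and the data files have been wiped.
@Override
public void start(Map<String, String> props) {
    file = props.get("task.files");
    topic = props.get("topic");
    Map<String, Object> lastOffset =
            context.offsetStorageReader().offset(Collections.singletonMap("file", file));
    long resumeFromLine = (lastOffset == null) ? 0L : ((Number) lastOffset.get("line")).longValue();
    // ...open the file and skip resumeFromLine lines before emitting new records...
}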
Kafka Connect REST API
- GET http://192.168.61.105:8083/connectors lists the connectors registered on this worker (the Connect REST interface listens on port 8083 by default)
connector distributed demo
Create the topics that distributed mode needs:
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic connect-configs --replication-factor 1 --partitions 1 --config cleanup.policy=compact
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic connect-offsets --replication-factor 1 --partitions 1 --config cleanup.policy=compact
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic connect-status --replication-factor 1 --partitions 1 --config cleanup.policy=compact
./bin/kafka-topics.sh --list --zookeeper localhost:2181
Edit connect-distributed.properties:
bootstrap.servers=daniel-3-test-master1:9092
Start connect-distributed:
./bin/connect-distributed.sh ./config/connect-distributed.properties
Contents of config/connect-distributed.properties:
bootstrap.servers=daniel-3-test-master1:9092
group.id=connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
config.storage.topic=connect-configs
config.storage.replication.factor=1
status.storage.topic=connect-status
status.storage.replication.factor=1
offset.flush.interval.ms=10000
Create the connectors via the REST API
- create the source connector
curl -X POST -H "Content-Type: application/json" --data '{"name": "distributed-local-file-source", "config": {"connector.class":"FileStreamSourceConnector", "tasks.max":"3", "file":"/tmp/connectorfile/test.txt", "topic":"connect-test" }}' http://localhost:8083/connectors
- create the sink connector
curl -X POST -H "Content-Type: application/json" --data '{"name": "distributed-local-file-sink", "config": {"connector.class":"FileStreamSinkConnector", "tasks.max":"3", "file":"/tmp/connectorfile/test.sink.txt", "topics":"connect-test" }}' http://localhost:8083/connectors
- delete the connectors
curl -X DELETE localhost:8083/connectors/distributed-local-file-source
curl -X DELETE localhost:8083/connectors/distributed-local-file-sink
- some useful commands
./bin/kafka-topics.sh --list --zookeeper localhost:2181
./bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic connect-test
./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic connect-test --from-beginning
References
connect devguide
connect managing
kafka github
kafka-connect-jdbc