Kafka on a Mac
I went to the Apache site and pulled down the binary: http://www-us.apache.org/dist/kafka/0.11.0.2/kafka_2.11-0.11.0.2.tgz
Then I unpacked it.
Start ZooKeeper (ZK)
cd kafka_2.11-0.11.0.2
export JAVA_HOME=`/usr/libexec/java_home -v 1.8`
bin/zookeeper-server-start.sh config/zookeeper.properties
Start kafka
cd kafka_2.11-0.11.0.2
bin/kafka-server-start.sh config/server.properties
Kafka create new topic
cd kafka_2.11-0.11.0.2
bin/kafka-topics.sh --create --zookeeper localhost:2181 --partitions 1 --replication-factor 1 --topic test
or
bin/kafka-topics.sh --create \
    --zookeeper localhost:2181 \
    --replication-factor 1 --partitions 13 \
    --topic my-topic
Kafka list topics
bin/kafka-topics.sh --list --zookeeper localhost:2181
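If the same create command needs to be run from a script, one option is to build the argument list in Python. This is only a sketch: the helper name kafka_topics_create_args is my own, and it uses nothing beyond the flags already shown above.

```python
import shlex


def kafka_topics_create_args(zookeeper, topic, partitions=1, replication_factor=1):
    """Build the argument list for `bin/kafka-topics.sh --create`.

    Hypothetical helper; it only assembles the flags used in these notes.
    """
    return [
        "bin/kafka-topics.sh", "--create",
        "--zookeeper", zookeeper,
        "--partitions", str(partitions),
        "--replication-factor", str(replication_factor),
        "--topic", topic,
    ]


# Show the command line that would be run for the 13-partition topic.
args = kafka_topics_create_args("localhost:2181", "my-topic", partitions=13)
print(" ".join(shlex.quote(a) for a in args))
```

From inside the kafka_2.11-0.11.0.2 directory the list can be handed to subprocess.run(args, check=True).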
Console Producer
To generate messages:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
Now type four or more messages in this window, pressing return after each one.
Console Consumer
From the beginning
bin/kafka-console-consumer.sh \
    --bootstrap-server localhost:9092 \
    --topic my-topic \
    --from-beginning
Python Producer
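To add lots of messages, use the kafka-python client:
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092')
for n in range(100):
    # send() is asynchronous; it only queues the message
    producer.send('my-topic', "Message{} bla bla bla".format(n).encode())
# Block until every queued message has actually been sent
producer.flush()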
Diagnosing Issues
- Check ZK
echo ruok | nc localhost 2181
You should see imok
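The same check can be done without nc. Below is a minimal sketch using a plain socket; the function names zk_four_letter and zk_is_ok are mine, and it assumes the ZooKeeper server accepts four-letter commands (the 3.4.x server bundled with this Kafka release does; newer ZooKeeper versions may need them whitelisted).

```python
import socket


def zk_four_letter(command, host="localhost", port=2181, timeout=5.0):
    """Send a ZooKeeper four-letter command and return the reply as text."""
    with socket.create_connection((host, port), timeout=timeout) as sock:
        sock.sendall(command.encode("ascii"))
        chunks = []
        while True:
            data = sock.recv(4096)
            if not data:  # ZooKeeper closes the connection after replying
                break
            chunks.append(data)
    return b"".join(chunks).decode("utf-8", errors="replace")


def zk_is_ok(host="localhost", port=2181):
    """Return True if ZooKeeper answers `ruok` with `imok`."""
    try:
        return zk_four_letter("ruok", host, port).strip() == "imok"
    except OSError:  # connection refused, timeout, etc.
        return False
```

Calling zk_is_ok() against the local ZooKeeper should return True when it is healthy, and zk_four_letter("dump") returns the same text as the nc command.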
- Check to see if Kafka has connected to ZK
echo dump | nc localhost 2181
And I see
Session Sets (3):
0 expire at Sun Dec 03 17:43:27 GST 2017:
0 expire at Sun Dec 03 17:43:30 GST 2017:
1 expire at Sun Dec 03 17:43:33 GST 2017:
    0x1601c9a25190000
ephemeral nodes dump:
Sessions with Ephemerals (1):
0x1601c9a25190000:
    /controller
    /brokers/ids/0
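The interesting part of that dump is the ephemeral node list: a /brokers/ids/N entry means broker N is registered. As a rough sketch, the output can be parsed like this; the function names are mine, and the parser assumes the layout matches the dump shown here.

```python
SAMPLE_DUMP = """\
Session Sets (3):
0 expire at Sun Dec 03 17:43:27 GST 2017:
0 expire at Sun Dec 03 17:43:30 GST 2017:
1 expire at Sun Dec 03 17:43:33 GST 2017:
    0x1601c9a25190000
ephemeral nodes dump:
Sessions with Ephemerals (1):
0x1601c9a25190000:
    /controller
    /brokers/ids/0
"""


def ephemeral_nodes(dump_text):
    """Map each session id to the ephemeral znode paths it owns."""
    sessions = {}
    current = None
    in_ephemerals = False
    for raw in dump_text.splitlines():
        line = raw.strip()
        if line.startswith("Sessions with Ephemerals"):
            in_ephemerals = True
        elif in_ephemerals and line.startswith("0x") and line.endswith(":"):
            current = line[:-1]  # drop the trailing colon
            sessions[current] = []
        elif in_ephemerals and line.startswith("/") and current is not None:
            sessions[current].append(line)
    return sessions


def registered_broker_ids(dump_text):
    """Return the sorted broker ids found under /brokers/ids/."""
    prefix = "/brokers/ids/"
    return sorted(
        int(path[len(prefix):])
        for paths in ephemeral_nodes(dump_text).values()
        for path in paths
        if path.startswith(prefix)
    )


print(registered_broker_ids(SAMPLE_DUMP))  # prints [0]
```

An empty list would suggest that no broker has registered with ZooKeeper yet.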
Or use the ZooKeeper shell:
bin/zookeeper-shell.sh localhost:2181 <<< "ls /brokers/ids"
To see more info (especially the broker list), do this:
bin/zookeeper-shell.sh localhost:2181 <<< "get /brokers/ids/0"
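The znode content is a JSON document, so the host:port to use as the broker list can be pulled out with a few lines of Python. This is a sketch under assumptions: the EXAMPLE string below is illustrative rather than copied from a real broker, and I am assuming the registration carries "endpoints", "host" and "port" fields.

```python
import json


def broker_endpoints(registration_json):
    """Return host:port strings from a /brokers/ids/N registration."""
    info = json.loads(registration_json)
    endpoints = info.get("endpoints") or []
    if endpoints:
        # "PLAINTEXT://host:9092" -> "host:9092"
        return [e.split("://", 1)[-1] for e in endpoints]
    # Fall back to the separate host and port fields
    return ["{}:{}".format(info["host"], info["port"])]


# Illustrative registration, not real output from these notes
EXAMPLE = (
    '{"endpoints":["PLAINTEXT://localhost:9092"],'
    '"host":"localhost","port":9092,"jmx_port":-1,"version":4}'
)
print(",".join(broker_endpoints(EXAMPLE)))  # prints localhost:9092
```

Paste the JSON line printed by the "get" command in place of EXAMPLE to get the value for --broker-list or bootstrap_servers.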
Good reference site
A good link is http://cloudurable.com/blog/kafka-tutorial-kafka-from-command-line/index.html
Production Kafka Testing
Create new Topic
bin/kafka-topics.sh --create --zookeeper phm01.eq.com:2181,phm02.eq.com:2181,phm03.eq.com:2181 --partitions 5 --replication-factor 1 --topic timtest
Created topic "timtest".
To delete a topic, try this:
bin/kafka-topics.sh --delete --zookeeper phm01.eq.com:2181,phm02.eq.com:2181,phm03.eq.com:2181 --topic timtest --if-exists
Note that the topic is only marked for deletion unless the brokers run with delete.topic.enable=true.
Producer
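from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='phm01.eq.com:6667')
for n in range(10000):
    # send() is asynchronous; it only queues the message
    producer.send('timtest', "Message{} bla bla bla".format(n).encode())
# Block until every queued message has actually been sent
producer.flush()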
This will add 10k test messages.
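To confirm the messages really arrived, and to see how they spread over the 5 partitions, the futures returned by send() can be checked. A minimal sketch, assuming a kafka-python style producer whose send() returns a future whose get() yields metadata with a partition attribute; the helper name send_and_count is mine.

```python
from collections import Counter


def send_and_count(producer, topic, count, timeout=10):
    """Send numbered test messages and count deliveries per partition."""
    futures = [
        producer.send(topic, "Message{} bla bla bla".format(n).encode())
        for n in range(count)
    ]
    producer.flush()  # wait for the queued messages to go out
    per_partition = Counter()
    for future in futures:
        metadata = future.get(timeout=timeout)  # raises if delivery failed
        per_partition[metadata.partition] += 1
    return dict(per_partition)
```

With the producer above this would be called as send_and_count(producer, 'timtest', 10000); the counts in the returned dict should add up to 10000.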
Consumer
from kafka import KafkaConsumer

consumer = KafkaConsumer('timtest',
                         bootstrap_servers='phm01.eq.com:6667',
                         group_id="Con4",
                         auto_offset_reset='earliest')  # 'smallest' is the deprecated spelling
cnt = 0
consumer.poll()               # trigger the group join so partitions get assigned
consumer.seek_to_beginning()  # rewind every assigned partition
print("Consumer done")
for msg in consumer:
    print('{} {}'.format(cnt, msg))
    cnt += 1
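Please note: a consumer group resumes from its last committed offset, and auto_offset_reset only applies when the group has no committed offset. So this will only re-read the topic (queue) if the group id is changed. Otherwise it will wait until new messages are placed into the topic (queue).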
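One simple way to force a full re-read on every run is to generate a new group id each time. A small sketch; fresh_group_id is a hypothetical helper of mine, not part of kafka-python.

```python
import uuid


def fresh_group_id(prefix="Con"):
    """Return a group id that no earlier run has committed offsets for."""
    return "{}-{}".format(prefix, uuid.uuid4().hex[:8])


print(fresh_group_id())
```

Passing group_id=fresh_group_id() to KafkaConsumer, together with auto_offset_reset='earliest', should make the consumer start from the beginning of the topic. The trade-off is that every run leaves another unused group behind on the cluster.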