Kafka on a Mac

I downloaded the binary from the Apache site: http://www-us.apache.org/dist/kafka/0.11.0.2/kafka_2.11-0.11.0.2.tgz

Then I unpacked it.

Start ZooKeeper

cd kafka_2.11-0.11.0.2
export JAVA_HOME=`/usr/libexec/java_home -v 1.8`
bin/zookeeper-server-start.sh config/zookeeper.properties

Start Kafka

cd kafka_2.11-0.11.0.2
bin/kafka-server-start.sh config/server.properties

Kafka create new topic

cd kafka_2.11-0.11.0.2
bin/kafka-topics.sh --create --zookeeper localhost:2181 --partitions 1 --replication-factor 1 --topic test

or

bin/kafka-topics.sh --create \
  --zookeeper localhost:2181 \
  --replication-factor 1 --partitions 13 \
  --topic my-topic

Kafka list topics

bin/kafka-topics.sh --list --zookeeper localhost:2181

Console Producer

To generate messages, start the console producer. The broker listens on port 9092 by default:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

Now type in this window, pressing Return after each message.

Enter four or more messages.

Console Consumer

From the beginning

bin/kafka-console-consumer.sh \
    --bootstrap-server localhost:9092 \
    --topic my-topic \
    --from-beginning

Python Producer

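To add lots of messages, use this code (it needs the kafka-python package).

from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092')
for n in range(100):
    # send() only queues the message; delivery happens in the background
    producer.send('my-topic', "Message{} bla bla bla".format(n).encode())
producer.flush()  # wait for queued messages to go out before the script exits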
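
If you want to confirm that every message actually reached the broker, something like the sketch below may help. The function name send_test_messages is my own and not part of kafka-python. It only relies on the producer having send() and flush(), and on send() returning a future whose get() blocks until the broker acknowledges the record.

```python
def send_test_messages(producer, topic, count):
    """Send `count` numbered messages and wait until all are acknowledged."""
    futures = [
        producer.send(topic, "Message{} bla bla bla".format(n).encode())
        for n in range(count)
    ]
    producer.flush()  # push anything still buffered to the broker
    # Each future resolves to the record metadata (topic, partition, offset);
    # get() raises if the send failed or the timeout expires.
    return [future.get(timeout=10) for future in futures]


# Usage against the local broker (requires kafka-python and a running Kafka):
# from kafka import KafkaProducer
# producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092')
# metadata = send_test_messages(producer, 'my-topic', 100)
# print(len(metadata), "messages acknowledged")
```

Waiting on every future is slower than fire-and-forget, but for a test load it tells you immediately when the broker address or topic is wrong.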

Diagnosing Issues

  • Check ZK
echo ruok | nc localhost 2181

You should see imok
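
The same check can be scripted. This is a minimal sketch using only the Python standard library; the helper names zk_four_letter and zk_is_ok are mine, and it assumes ZooKeeper accepts four-letter commands on the client port, as it does in the setup above.

```python
import socket


def zk_four_letter(cmd, host="localhost", port=2181, timeout=5.0):
    """Send a ZooKeeper four-letter command and return the reply as text."""
    with socket.create_connection((host, port), timeout=timeout) as sock:
        sock.sendall(cmd.encode("ascii"))
        chunks = []
        while True:
            data = sock.recv(4096)
            if not data:  # the server closes the connection after replying
                break
            chunks.append(data)
    return b"".join(chunks).decode("utf-8", errors="replace")


def zk_is_ok(host="localhost", port=2181):
    """Return True if ZooKeeper answers 'imok' to 'ruok'."""
    try:
        return zk_four_letter("ruok", host, port).strip() == "imok"
    except OSError:  # connection refused, timeout, and similar
        return False


# Usage: print("ZooKeeper OK" if zk_is_ok() else "ZooKeeper not responding")
```

zk_four_letter("dump") returns the same text as the nc dump command used in the next check.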

  • Check to see if Kafka has connected to ZK
echo dump | nc localhost 2181

And I see

Session Sets (3):
0 expire at Sun Dec 03 17:43:27 GST 2017:
0 expire at Sun Dec 03 17:43:30 GST 2017:
1 expire at Sun Dec 03 17:43:33 GST 2017:
    0x1601c9a25190000
ephemeral nodes dump:
Sessions with Ephemerals (1):
0x1601c9a25190000:
    /controller
    /brokers/ids/0
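
The lines to look for are the ephemeral nodes: /brokers/ids/0 means broker 0 has registered itself. A rough sketch for pulling the broker ids out of that text follows. The function names are mine, and it assumes the dump lists each znode path on its own line, as in the output above.

```python
def ephemeral_nodes(dump_text):
    """Return the znode paths listed in a ZooKeeper 'dump' reply."""
    return [line.strip() for line in dump_text.splitlines()
            if line.strip().startswith("/")]


def registered_broker_ids(dump_text):
    """Return the ids of brokers that own an ephemeral /brokers/ids/<id> node."""
    prefix = "/brokers/ids/"
    return sorted(
        int(path[len(prefix):])
        for path in ephemeral_nodes(dump_text)
        if path.startswith(prefix) and path[len(prefix):].isdigit()
    )


sample = """\
Sessions with Ephemerals (1):
0x1601c9a25190000:
    /controller
    /brokers/ids/0
"""
print(registered_broker_ids(sample))  # [0]
```

An empty list means no broker currently holds a session with ZooKeeper.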

Or use the ZooKeeper shell:

bin/zookeeper-shell.sh localhost:2181 <<< "ls /brokers/ids"

To see more info (especially the broker list), do this:

bin/zookeeper-shell.sh localhost:2181 <<< "get /brokers/ids/0"
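
The broker registration is stored as a single line of JSON, mixed in with the shell's connection messages and znode stats. The sketch below picks that line out and returns the advertised endpoints. The function name is mine, and the sample string is only my assumption of what a 0.11 broker registers (with a made-up host name), not captured output.

```python
import json


def broker_endpoints(shell_output):
    """Find the JSON line in `get /brokers/ids/<id>` output and return its endpoints."""
    for line in shell_output.splitlines():
        line = line.strip()
        if line.startswith("{"):
            info = json.loads(line)
            # Fall back to host/port if the registration has no endpoints list.
            return info.get("endpoints") or [
                "{}:{}".format(info["host"], info["port"])
            ]
    return []


# Illustrative sample only (assumed field layout, hypothetical host name).
sample = (
    '{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},'
    '"endpoints":["PLAINTEXT://my-mac.local:9092"],"jmx_port":-1,'
    '"host":"my-mac.local","timestamp":"1512308607000","port":9092,"version":4}\n'
    'cZxid = 0x1b\n'
)
print(broker_endpoints(sample))  # ['PLAINTEXT://my-mac.local:9092']
```

The host and port shown there are what --broker-list and bootstrap_servers need to point at.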

Good reference site

A good link is http://cloudurable.com/blog/kafka-tutorial-kafka-from-command-line/index.html

Production Kafka Testing

Create new Topic

bin/kafka-topics.sh --create --zookeeper phm01.eq.com:2181,phm02.eq.com:2181,phm03.eq.com:2181 --partitions 5 --replication-factor 1 --topic timtest

Created topic "timtest".

To delete a topic, try this (--if-exists is a plain flag and takes no value):

bin/kafka-topics.sh --delete --zookeeper phm01.eq.com:2181,phm02.eq.com:2181,phm03.eq.com:2181 --topic timtest --if-exists

Producer

from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='phm01.eq.com:6667')
for n in range(10000):
    producer.send('timtest', "Message{} bla bla bla".format(n).encode())

This will add 10k test messages.

Consumer

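from kafka import KafkaConsumer

consumer = KafkaConsumer('timtest',
           bootstrap_servers='phm01.eq.com:6667',
           group_id="Con4",
           auto_offset_reset='earliest')  # 'smallest' is the old, deprecated spelling

cnt = 0
consumer.poll()               # triggers the group join and partition assignment
consumer.seek_to_beginning()  # rewinds whatever partitions are assigned
print("Consumer ready")
for msg in consumer:          # blocks waiting for messages; Ctrl-C to stop
    print('{} {}'.format(cnt, msg))
    cnt += 1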

Please note: this will only re-read the topic (queue) if the group id is reset, for example by changing it to one that has not been used before. Otherwise it will wait until new messages are placed into the topic.
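
One way to get a fresh group id on every run is to generate it. This is only a sketch: the helper name rereading_consumer_config and the "Con" prefix are mine, while the keyword arguments are standard kafka-python KafkaConsumer settings.

```python
import uuid


def rereading_consumer_config(bootstrap_servers, base_group="Con"):
    """Build KafkaConsumer keyword arguments that force a read from the start."""
    return {
        "bootstrap_servers": bootstrap_servers,
        # A group id that has never been used has no committed offsets...
        "group_id": "{}-{}".format(base_group, uuid.uuid4().hex[:8]),
        # ...so the consumer falls back to this policy and starts at the oldest message.
        "auto_offset_reset": "earliest",
        # Stop iterating after 5 s without messages instead of waiting forever.
        "consumer_timeout_ms": 5000,
    }


# Usage (requires kafka-python and a reachable broker):
# from kafka import KafkaConsumer
# consumer = KafkaConsumer('timtest', **rereading_consumer_config('phm01.eq.com:6667'))
# for msg in consumer:
#     print(msg.offset, msg.value)
```

Each run leaves a new consumer group behind on the cluster, so this is better suited to testing than to a long-running job.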