This document specifies basic Apache Kafka configuration and consumer/producer implementation. It does not cover setting up an AWS machine or AWS VPC network configuration; however, references to tutorials covering these topics are included below.
Amazon Linux 2 AMI 2.0.20181024 x86_64 HVM gp2
Edit kafka-server-start.sh and zookeeper-server-start.sh (after completing the Kafka installation). Change the section:
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
to:
export KAFKA_HEAP_OPTS="-Xmx256M -Xms128M"
The -Xms parameter specifies the minimum heap size and the -Xmx parameter the maximum heap size. The configuration above sets them to 128M and 256M respectively.
Assign an Elastic IP to your machine. You can find information about Elastic IPs here.
Configure security groups. You need to configure the network in order to open port 9092
(the Kafka default port). You can find instructions here.
Install Java JDK using this tutorial.
Open port 9092
on server firewall:
sudo iptables -A INPUT -i eth0 -p tcp --dport 9092 -j ACCEPT
sudo useradd kafka -m
Add the new user to the wheel group, so that it has sufficient privileges to install packages and dependencies:
sudo usermod -aG wheel kafka
sudo su kafka
cd ~
wget "https://www.apache.org/dist/kafka/2.1.1/kafka_2.11-2.1.1.tgz"
Create a folder named kafka and unzip the downloaded archive into it:
mkdir kafka
cd kafka
tar -xvzf ~/kafka_2.11-2.1.1.tgz --strip 1
Edit the config/server.properties file. This configuration is needed, as we need to send and read Kafka messages from outside the AWS network:
vim ~/kafka/config/server.properties
Add the following line at the end of the file:
listeners=PLAINTEXT://xx.xx.xx.xx:9092
Where xx.xx.xx.xx
is the Public DNS (IPv4) assigned to the EC2 machine (use the full DNS address, not the IP, e.g.: ec2-15-127-209-111.eu-central-1.compute.amazonaws.com).
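For reference, the relevant fragment of server.properties might then look as follows. On EC2 the public DNS name works as a bind address because it resolves to the instance's private IP from inside the VPC; a commonly used alternative layout (sketched below, with placeholder addresses) separates the bind address from the address published to clients via the advertised.listeners setting:

```properties
# Bind on all local interfaces
listeners=PLAINTEXT://0.0.0.0:9092
# Address that clients outside AWS will use to reach the broker (placeholder DNS name)
advertised.listeners=PLAINTEXT://ec2-xx-xx-xx-xx.eu-central-1.compute.amazonaws.com:9092
```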
Logout from user kafka using exit
command.
Configure the zookeeper service. Create the file /etc/init.d/zookeeper and assign permissions, so that user kafka has access to the file.
cd /etc/init.d/
sudo vim zookeeper
sudo chmod u=rwx,g=rx,o=rx zookeeper
sudo mkdir /var/log/zookeeper
sudo chown -R kafka /var/log/zookeeper/
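The contents of the init script are not shown in this guide; a minimal sketch of /etc/init.d/zookeeper might look like the following (the install path under /home/kafka/kafka and the log location are assumptions based on the steps above; the kafka service script follows the same pattern with kafka-server-start.sh and config/server.properties):

```shell
#!/bin/bash
# Minimal init script sketch for zookeeper (assumes Kafka installed in /home/kafka/kafka)
KAFKA_HOME=/home/kafka/kafka
LOG=/var/log/zookeeper/zookeeper.log

case "$1" in
  start)
    # Run the server as user kafka, detached, logging to /var/log/zookeeper
    sudo -u kafka nohup "$KAFKA_HOME/bin/zookeeper-server-start.sh" \
      "$KAFKA_HOME/config/zookeeper.properties" > "$LOG" 2>&1 &
    ;;
  stop)
    sudo -u kafka "$KAFKA_HOME/bin/zookeeper-server-stop.sh"
    ;;
  status)
    pgrep -f zookeeper.properties > /dev/null \
      && echo "zookeeper is running (pid $(pgrep -f zookeeper.properties))" \
      || echo "zookeeper is not running"
    ;;
  *)
    echo "Usage: $0 {start|stop|status}"
    ;;
esac
```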
Configure the kafka service. Create the file /etc/init.d/kafka and assign permissions, so that user kafka has access to the file.
cd /etc/init.d/
sudo vim kafka
sudo chmod u=rwx,g=rx,o=rx kafka
sudo mkdir /var/log/kafka
sudo chown -R kafka /var/log/kafka/
Start zookeeper service
sudo service zookeeper start
sudo service zookeeper status
Expected output:
zookeeper is running (pid 2646)
Start kafka service
sudo service kafka start
sudo service kafka status
Expected output:
kafka is running (pid 2649)
Apache Kafka uses some common terminology. It is useful to define the key concepts:
Topic: a feed channel to which messages are broadcast
Producer: a process that sends messages to a specific topic
Consumer: a process that subscribes to a specific topic and retrieves messages
Partition: a subdivision of a topic, used for scalability and parallelism
Broker: a single node of the physical Kafka cluster
Create a topic named openTopic:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic openTopic
List existing topics:
bin/kafka-topics.sh --zookeeper localhost:2181 --list
In the output you should see the previously created topic, named openTopic:
__consumer_offsets
openTopic
To test the overall Kafka configuration and installation, perform the following steps:
Log in to the AWS server over SSH twice, in two windows or tabs, and switch to the kafka user in each:
sudo su kafka
In the first window, run a simple Kafka producer:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic openTopic
This script reads input from the keyboard, so whatever you type here is sent to the Kafka topic openTopic.
In the second window, run a Kafka consumer:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic openTopic --from-beginning
Now every message you type in the producer window should be visible in the consumer window.
You can also use a producer written in Python. The Python code should be run from a Databricks Spark cluster with the following specification:
Databricks Runtime Version: 4.3 (includes Apache Spark 2.3.1, Scala 2.11). Python version 3.
To send and receive messages from the Kafka cluster in Python, you need to use the kafka-python
library. Note that the previous library, kafka,
is now deprecated and may not work with new versions of Kafka.
The simple producer below sends the message lorem ipsum dolor to the topic openTopic.
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['xx.xx.xx.xx:9092'])
msg = "lorem ipsum dolor"
# kafka-python expects message values as bytes, so encode the string before sending
producer.send('openTopic', msg.encode('utf-8')).get(timeout=30)
Where xx.xx.xx.xx
is the Public DNS (IPv4) assigned to the EC2 machine.
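In Python 3, kafka-python requires message values to be bytes, which is why the example above encodes the string. Alternatively, a value_serializer can be passed to KafkaProducer so that plain strings are encoded automatically. The serializer itself can be checked without a broker:

```python
# Serializer suitable for KafkaProducer(value_serializer=...): str -> bytes
def serialize(value):
    return value.encode('utf-8')

msg = serialize("lorem ipsum dolor")
print(msg)  # b'lorem ipsum dolor'
```

With producer = KafkaProducer(bootstrap_servers=['xx.xx.xx.xx:9092'], value_serializer=serialize), the call producer.send('openTopic', 'lorem ipsum dolor') would then accept a plain string directly.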
This producer opens the file police-department-incidents.csv
and sends it to Kafka at a rate of 1 record per 5 seconds.
import csv
import time
import datetime
from kafka import KafkaProducer

policeData = csv.reader(open("police-department-incidents.csv"), delimiter=",")
kafkaTopic = 'openTopic'
producer = KafkaProducer(bootstrap_servers=['xx.xx.xx.xx:9092'])

for IncidntNum, Category, Descript, DayOfWeek, Date, Time, PdDistrict, Resolution, Address, X, Y, Location, PdId in policeData:
    # skip the CSV header row
    if IncidntNum != 'IncidntNum':
        record = ";".join([datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                           IncidntNum, Category, Descript, DayOfWeek, Date, Time, X, Y])
        # kafka-python expects bytes, so encode the record before sending
        producer.send(kafkaTopic, record.encode('utf-8')).get(timeout=30)
        time.sleep(5)
Where xx.xx.xx.xx
is the Public DNS (IPv4) assigned to the EC2 machine.
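The record format used in the loop above (a send timestamp followed by selected CSV fields, semicolon-separated) can be factored into a helper and verified without a Kafka connection. The sample field values below are made up for illustration:

```python
import datetime

def build_payload(ts, incident_num, category, descript, day_of_week, date, time_str, x, y):
    """Join the selected CSV fields into the semicolon-delimited record sent to Kafka."""
    fields = [ts.strftime("%Y-%m-%d %H:%M:%S"),
              incident_num, category, descript, day_of_week, date, time_str, x, y]
    return ";".join(fields).encode('utf-8')  # kafka-python expects bytes

payload = build_payload(datetime.datetime(2019, 1, 1, 12, 0, 0),
                        "180123456", "THEFT", "Petty theft", "Monday",
                        "01/01/2019", "12:00", "-122.42", "37.77")
print(payload)  # b'2019-01-01 12:00:00;180123456;THEFT;Petty theft;Monday;01/01/2019;12:00;-122.42;37.77'
```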
You can also use a consumer written in Python. The Python code should be run from a Databricks Spark cluster with the following specification:
Databricks Runtime Version: 4.3 (includes Apache Spark 2.3.1, Scala 2.11). Python version 2.
The simple consumer below receives messages from the topic openTopic, pausing 4 seconds after each message.
from kafka import KafkaConsumer
import time

consumer = KafkaConsumer(
    'openTopic',
    bootstrap_servers=['xx.xx.xx.xx:9092'])

for message in consumer:
    print(message.value)
    time.sleep(4)
Where xx.xx.xx.xx
is the Public DNS (IPv4) assigned to the EC2 machine.
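Two details are worth noting about the consumer above: message.value arrives as raw bytes (so under Python 3 you would normally decode it before use), and by default KafkaConsumer starts at the latest offset; passing auto_offset_reset='earliest' together with a group_id to the constructor mimics the console consumer's --from-beginning flag. The decoding step can be checked standalone:

```python
# message.value from kafka-python is raw bytes; decode before further processing
def decode_value(raw):
    return raw.decode('utf-8')

print(decode_value(b"lorem ipsum dolor"))  # lorem ipsum dolor
```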
Below are common issues that may occur during Kafka setup and execution. Note that the solutions to these issues are described in the sections above.
Cannot allocate memory while starting the Kafka server.
Possible cause: the JVM allocates insufficient memory to run Kafka. You need to change the KAFKA_HEAP_OPTS
parameter in the kafka-server-start.sh
file.
Possible cause: Kafka port 9092
is not open.
Possible cause: assuming that Kafka port 9092
is open, check the Kafka configuration file server.properties
for the listeners
specification.
Possible cause: assuming that port 9092
is open and the listeners
configuration is correct, the cause may be the Kafka timeout configuration. You may need to set timeout=30
explicitly in the producer, e.g. producer.send(topic, msg).get(timeout=30).