Databricks integration with Amazon EC2 Kafka server

© Mariusz Rafało

This document describes basic Apache Kafka configuration and a consumer/producer implementation. It does not cover setting up an AWS machine or configuring an AWS VPC network. As a result, there are several prerequisites to satisfy before you begin working with Kafka.

Prerequisites

Ensure that all of the tasks specified below have been completed.

  1. You have an AWS EC2 machine up and running on Amazon Linux. At least a t2.medium instance is required, as Kafka needs at least 4 GB of RAM. Select the Amazon Linux 2 AMI 2.0.20181024 x86_64 HVM gp2 image.
  2. You have SSH access to your machine (e.g., via a private key).
  3. You have an Elastic IP assigned to your machine. You can find information about Elastic IPs here.
  4. You have downloaded the latest Kafka version: http://ftp.man.poznan.pl/apache/kafka/2.1.0/kafka_2.11-2.1.0.tgz.
  5. You have installed and started Apache Kafka according to this tutorial.
  6. Your EC2 machine is accessible from the Internet via security groups. You need to configure the network to open port 9092 (Kafka's default port). You can find instructions here.
  7. Kafka is properly installed and the Kafka service is running on the default port (9092).
  8. All commands are executed from the kafka folder.

Basic terms

Apache Kafka uses some common terminology, so it is useful to define the key concepts:

Topic: a feed channel to which messages are broadcast

Producer: a process that sends messages to a specific topic

Consumer: a process that subscribes to a specific topic and retrieves its messages

Partition: a subdivision of a topic, used for scalability

Broker: a single node of the physical Kafka cluster
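
As a quick illustration, here is how these terms map onto the kafka-python client used later in this document (a minimal sketch; the broker address is a placeholder):

from kafka import KafkaProducer, KafkaConsumer

# producer: a client process that writes to a topic
producer = KafkaProducer(bootstrap_servers=['xx.xx.xx.xx:9092'])
producer.send('openTopic', b'hello')   # 'openTopic' is the topic (feed channel)
producer.flush()                       # make sure the message is actually sent

# consumer: a client process that subscribes to the same topic
consumer = KafkaConsumer('openTopic', bootstrap_servers=['xx.xx.xx.xx:9092'])
print(consumer.partitions_for_topic('openTopic'))   # the partitions backing the topic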

Kafka configuration

This part covers Kafka network configuration and setting up a new topic.

In order to configure access to Kafka from outside the VPC, add the following line to config/server.properties:

advertised.host.name = xx.xx.xx.xx

where xx.xx.xx.xx is the Elastic IP assigned to the EC2 machine.
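
Note that in Kafka 2.1.0 advertised.host.name is deprecated in favour of advertised.listeners; if you prefer the newer setting, the equivalent line would be:

advertised.listeners=PLAINTEXT://xx.xx.xx.xx:9092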

Open port 9092 on the local firewall:

sudo iptables -A INPUT -i eth0 -p tcp --dport 9092 -j ACCEPT
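
To confirm the port is reachable from outside the VPC, a plain TCP check is enough; the sketch below assumes the same Elastic IP placeholder used earlier:

import socket

# attempt a TCP connection to the broker port; connect_ex returns 0 on success
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(5)
print('port 9092 open' if s.connect_ex(('xx.xx.xx.xx', 9092)) == 0 else 'port 9092 closed')
s.close()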

Create a Kafka topic called openTopic:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic openTopic
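
If you prefer to create the topic from Python instead of the shell script, kafka-python ships an admin client (available in kafka-python 1.4.4 and later); a minimal sketch:

from kafka.admin import KafkaAdminClient, NewTopic

# create the same topic programmatically: 1 partition, replication factor 1
admin = KafkaAdminClient(bootstrap_servers='localhost:9092')
admin.create_topics([NewTopic(name='openTopic', num_partitions=1, replication_factor=1)])
admin.close()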

Verify topic creation by listing all Kafka topics:

bin/kafka-topics.sh --zookeeper localhost:2181 --list

In the resulting list, you should see the previously created topic, openTopic:

[ec2-user@ip-10-10-40-153 kafka]$ bin/kafka-topics.sh --zookeeper localhost:2181 --list
TutorialTopic
__consumer_offsets
openTopic
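
The same check can be performed remotely, e.g. from a Databricks notebook, to confirm that the advertised address and security groups are set up correctly (a sketch; replace the placeholder with your Elastic IP):

from kafka import KafkaConsumer

# connect from outside the VPC and list all visible topics
consumer = KafkaConsumer(bootstrap_servers=['xx.xx.xx.xx:9092'], api_version=(0, 10))
print(consumer.topics())   # should include 'openTopic'
consumer.close()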

Test Kafka environment locally

To test the overall Kafka configuration and installation, follow these steps:

Connect to the AWS machine via SSH

Open an SSH client (preferably a multi-tabbed client, such as MTPuTTY or Cmder). Connect to the AWS machine twice, in two separate windows.

In the first window, run a simple Kafka producer:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic openTopic

This script reads input from the keyboard, so whatever you type here is sent to the Kafka topic openTopic.

In the second window, run a Kafka consumer:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic openTopic --from-beginning

Now every message you type in the producer window should appear in the consumer window.

To list the Kafka consumer groups:

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

Python Kafka producer

You can also use a producer written in Python. The Python code should be run from a Databricks Spark cluster with the following specification:

Databricks Runtime Version: 4.3 (includes Apache Spark 2.3.1, Scala 2.11). Python version 2.

The simple producer below sends a lorem ipsum dolor message to the topic openTopic.

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['xx.xx.xx.xx:9092'], api_version=(0, 10))
msg = "lorem ipsum dolor"
producer.send('openTopic', msg)
producer.flush()  # block until the buffered message is actually sent
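
With no serializer configured, kafka-python expects message values as bytes (a plain str works on Python 2 but not on Python 3). A variant that works on either version passes an explicit value_serializer:

from kafka import KafkaProducer

# serialize unicode strings to UTF-8 bytes before sending
producer = KafkaProducer(bootstrap_servers=['xx.xx.xx.xx:9092'],
                         api_version=(0, 10),
                         value_serializer=lambda v: v.encode('utf-8'))
producer.send('openTopic', 'lorem ipsum dolor')
producer.flush()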

The producer below opens the file police-department-incidents.csv and sends it to Kafka at a rate of one record every 5 seconds. Note that it reads the file from the EC2 machine's local disk and connects to localhost, so it runs on the EC2 machine itself.

import csv
import time
import datetime
from kafka import KafkaProducer

policeData = csv.reader(open("/home/ec2-user/police/police-department-incidents.csv"), delimiter=",")
kafkaTopic = 'police'
producer = KafkaProducer(bootstrap_servers=['localhost:9092'], api_version=(0, 10))

for IncidntNum, Category, Descript, DayOfWeek, Date, Time, PdDistrict, Resolution, Address, X, Y, Location, PdId in policeData:
    if IncidntNum != 'IncidntNum':  # skip the CSV header row
        # prefix each record with the send timestamp and join the fields with semicolons
        producer.send(kafkaTopic,
                      datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + ";" + IncidntNum + ";" +
                      Category + ";" + Descript + ";" + DayOfWeek + ";" + Date + ";" + Time + ";" +
                      X + ";" + Y)
        time.sleep(5)
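
A matching consumer for the police topic would split each semicolon-delimited record back into its fields. This is only a sketch of the receiving side; the field positions follow the producer above:

from kafka import KafkaConsumer

consumer = KafkaConsumer('police',
                         bootstrap_servers=['xx.xx.xx.xx:9092'],
                         api_version=(0, 10))

for message in consumer:
    # the producer joined: timestamp;IncidntNum;Category;Descript;DayOfWeek;Date;Time;X;Y
    fields = message.value.split(';')
    sentAt, incidentNum, category = fields[0], fields[1], fields[2]
    print(sentAt, incidentNum, category)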

Python Kafka consumer

You can also use a consumer written in Python. The Python code should be run from a Databricks Spark cluster with the following specification:

Databricks Runtime Version: 4.3 (includes Apache Spark 2.3.1, Scala 2.11). Python version 2.

The simple consumer below receives messages from the topic openTopic, pausing 4 seconds after each message.

from kafka import KafkaConsumer
import time

consumer = KafkaConsumer('openTopic',
                         bootstrap_servers=['xx.xx.xx.xx:9092'],
                         api_version=(0, 10))

for message in consumer:
    print(message.value)
    time.sleep(4)
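
Since the consumer runs on a Databricks Spark cluster, an alternative worth noting is Spark Structured Streaming's built-in Kafka source, which reads the same topic as a streaming DataFrame (a sketch; replace the placeholder with your Elastic IP):

# read the topic as a streaming DataFrame; 'spark' is the notebook's SparkSession
df = (spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "xx.xx.xx.xx:9092")
      .option("subscribe", "openTopic")
      .load())

# Kafka delivers values as binary, so cast to string before displaying
display(df.selectExpr("CAST(value AS STRING)"))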