Read Data from a Kafka Stream and Store It into MongoDB.

Use Case:

In this tutorial we will create a topic in Kafka, use a producer to produce some data in JSON format, and store that data in MongoDB.
For example, here we will send a colour and its hexadecimal code as JSON through Kafka and insert it into a MongoDB collection.
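
For instance, a producer message might look like the following (a hypothetical sample; the field names colour and code match the schema defined later in this tutorial):

{"colour": "red", "code": "#FF0000"}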

Versions we are using:

Kafka: 0.10.0
Spark: 2.0.2
Scala: 2.11.8

Steps:

1. Start ZooKeeper.
Run the command below:

bin/zookeeper-server-start.sh config/zookeeper.properties

2. Start the Kafka server on your local machine in standalone mode.
Run the command below:

bin/kafka-server-start.sh config/server.properties

3. Create a topic on the Kafka server from which you want to produce messages.

For example:

We want to create a topic named ColourCode.

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic ColourCode

Note:

You can check the list of topics using the following command:

bin/kafka-topics.sh --list --zookeeper localhost:2181

4. Kafka comes with a command line client that will take input from a file or from standard input
and send it out as messages to the Kafka cluster. By default, each line will be sent as a separate
message.

Run the producer and then type a few messages into the console to send to the server:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic ColourCode

5. Kafka also has a command line consumer that will dump out messages to standard output.

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic ColourCode --from-beginning

6. Now, before writing the program, we need to include the following dependencies in build.sbt:

"org.apache.spark" % "spark-streaming-kafka-0-10_2.11" % "2.1.0",
"org.apache.spark" %% "spark-streaming" % "2.1.0",
"com.stratio.datasource" % "spark-mongodb_2.11" % "0.12.0"

You can find these on the Maven Repository:

https://mvnrepository.com/
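
For reference, a minimal build.sbt wiring these dependencies together might look like this (the project name, Scala version, and the extra spark-sql dependency are assumptions, not taken from the original post):

name := "kafka-mongo-streaming"   // assumed project name
version := "1.0"
scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-streaming-kafka-0-10_2.11" % "2.1.0",
  "org.apache.spark" %% "spark-streaming" % "2.1.0",
  "org.apache.spark" %% "spark-sql" % "2.1.0",               // needed for SparkSession; assumption
  "com.stratio.datasource" % "spark-mongodb_2.11" % "0.12.0"
)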

7. First create a SparkSession and set the master.

val spark = SparkSession.builder().master("local[*]").config("spark.some.config.option", "some-value").getOrCreate()
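
Note: the code from here onwards assumes roughly the following imports (a sketch based on the dependencies above; the exact package path of MongodbConfig may differ between versions of the Stratio connector):

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import com.stratio.datasource.mongodb.config.MongodbConfig // assumed path; check your connector version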

8. Now create a StreamingContext, which is used for Kafka streaming.

val ssc = new StreamingContext(spark.sparkContext, Seconds(2))

Here we pass the underlying SparkContext as spark.sparkContext; Seconds(2) is the batch interval.

9. Now create one val containing all the Kafka parameters.

val kafkaParams = Map[String, Object]("bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "use_a_separate_group_id_for_each_stream",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean))

• Here bootstrap.servers is a list of host/port pairs used to establish the initial connection to the Kafka cluster.
• key.deserializer is the deserializer class for keys; it must implement the Deserializer interface.
• value.deserializer is the deserializer class for values; it must implement the Deserializer interface.
• group.id is a unique string that identifies the consumer group this consumer belongs to.
Note:

You can also refer to the many other consumer properties here: http://kafka.apache.org/documentation.html#newconsumerconfigs

10. Now create vals to set the parameters for MongoDB.

For example :

val mongoDbFormat = "com.stratio.datasource.mongodb"
val mongoDbDatabase = "ColoursInfo"
val mongoDbCollection = "Colour"

val mongoDbOptions = Map(MongodbConfig.Host -> "localhost:27017",
  MongodbConfig.Database -> mongoDbDatabase,
  MongodbConfig.Collection -> mongoDbCollection)

where

mongoDbFormat is the data source format used to write to MongoDB through this library,
mongoDbDatabase is the MongoDB database in which we want to save the collection,
mongoDbCollection is the collection in that database in which the data will be stored.

The default MongoDB port is 27017, which is part of the host option above.

11. We can get data in any format from the Kafka stream. Let's assume we are getting data in JSON format.
We can specify a schema for the data using StructType.

For example, since the incoming data has two fields, colour and code, we can specify the schema like this:

val schemaString = "colour code"

val fields = schemaString.split(" ").map(fieldname => StructField(fieldname, StringType, nullable = true))

val schema = StructType(fields)
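
Equivalently, the same schema could be written out explicitly (a sketch using the same two field names):

val schema = StructType(Seq(
  StructField("colour", StringType, nullable = true), // colour name, e.g. "red"
  StructField("code", StringType, nullable = true)))  // hexadecimal code, e.g. "#FF0000"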

12. We need to pass the topic name as a parameter to the KafkaUtils method, so:

val topic1 = Array("ColourCode")

13. The Kafka library provides the KafkaUtils class, whose createDirectStream method we can use to fetch the Kafka streaming data as key-value pairs.

val stream = KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent, Subscribe[String, String](topic1, kafkaParams))

14. We get a DStream in the stream val. We will convert that stream into a DataFrame: for each micro-batch we take the underlying RDD of JSON strings, read it with the schema defined above, and append the resulting DataFrame to MongoDB.

stream.map(record => record.value).foreachRDD { rdd =>
  val colourDf = spark.read.schema(schema).json(rdd)
  colourDf.write.format(mongoDbFormat).mode(SaveMode.Append).options(mongoDbOptions).save()
}

15. At the end, start the streaming context.

ssc.start()
ssc.awaitTermination()

16. Now go to the directory where the project resides and run it.

17. Produce some messages in the producer console for the consumer to consume.

Note:

Since we expect data in JSON format, enter the data as JSON. If you enter data in any other format, the fields will show up as null in the MongoDB collection.

In the console consumer you will see the same messages produced by the producer, irrespective of the format.

18. Now you can see this data inserted in the MongoDB collection.

Here the name of our database is ColoursInfo and the collection name is Colour.

So we are able to see the data inserted in the MongoDB collection.
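
For example, you can verify the insert from the mongo shell (assuming a default local MongoDB instance):

use ColoursInfo
db.Colour.find().pretty()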

Kafka & ZooKeeper | Multi Node Cluster Setup

In this blog we will explain the setup of a Kafka & ZooKeeper multi-node cluster in a distributed environment.

What is Apache Kafka?

Apache Kafka is a high-throughput distributed messaging system designed to allow a single cluster to serve as the central data backbone for a large organization. It can be elastically and transparently expanded without downtime. Data streams are partitioned and spread over a cluster of machines to allow data streams larger than the capability of any single machine and to allow clusters of co-ordinated consumers.

What is ZooKeeper?

ZooKeeper is a centralized service for maintaining configuration information, naming, providing distributed synchronization, and providing group services. All of these kinds of services are used in some form or another by distributed applications. Each time they are implemented there is a lot of work that goes into fixing the bugs and race conditions that are inevitable. Because of the difficulty of implementing these kinds of services, applications initially usually skimp on them, which makes them brittle in the presence of change and difficult to manage. Even when done correctly, different implementations of these services lead to management complexity when the applications are deployed.

Learn more about ZooKeeper on the ZooKeeper Wiki.

Prerequisites

  1. Install Java if you do not have it already. You can get it from here
  2. Kafka Binary files : http://kafka.apache.org/downloads.html

Installation

  • Now, first download the Kafka tarball or binaries on all your instances and extract them:
$ tar -xzvf kafka_2.11-0.9.0.1.tgz
$ mv kafka_2.11-0.9.0.1 kafka
  • On all the instances, you only need two files to be changed, i.e. zookeeper.properties & server.properties

Let's start by editing "zookeeper.properties" on all the instances:

$ vi ~/kafka/config/zookeeper.properties
# The number of milliseconds of each tick
tickTime=2000
 
# The number of ticks that the initial synchronization phase can take
initLimit=10
 
# The number of ticks that can pass between 
# sending a request and getting an acknowledgement
syncLimit=5

# zoo servers
server.1=x.x.x.x:2888:3888
server.2=x.x.x.x:2888:3888
server.3=x.x.x.x:2888:3888
#add here more servers if you want
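
Note: the zookeeper.properties file that ships with Kafka also sets the data directory and client port. The rest of this guide assumes values along these lines (assumptions chosen to match the /tmp/zookeeper directory and port 2181 used below):

# where the snapshot and the myid file are stored
dataDir=/tmp/zookeeper
# the port at which the clients will connect
clientPort=2181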

Now edit "server.properties" on all instances and update the following:

$ vi ~/kafka/config/server.properties
# increase broker.id by one per node
broker.id=1
# current node IP
host.name=x.x.x.x
zookeeper.connect=x.x.x.x:2181,x.x.x.x:2181,x.x.x.x:2181
  • After this, go to /tmp on every instance and create the following:
$ cd /tmp/
$ mkdir zookeeper #Zookeeper temp dir
$ cd zookeeper
$ touch myid  #Zookeeper temp file
$ echo '1' >> myid #Add Server ID for Respective Instances i.e. "server.1 and server.2 etc"
  • Now that everything is done, start ZooKeeper and the Kafka server on all instances:

$ bin/zookeeper-server-start.sh ~/kafka/config/zookeeper.properties

$ bin/kafka-server-start.sh ~/kafka/config/server.properties
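
Once all the brokers are up, you can sanity-check the cluster by creating a replicated topic from any node (a hypothetical example; substitute one of your ZooKeeper addresses):

$ ~/kafka/bin/kafka-topics.sh --create --zookeeper x.x.x.x:2181 --replication-factor 3 --partitions 3 --topic cluster-test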

We will keep looking at how we can provide more useful tutorials and add more content to this over time. If you have any suggestions, feel free to share them with us 🙂 Stay tuned.