Read Data From Kafka Stream and Store It into MongoDB.

Use Case:

In this tutorial we will create a topic in Kafka, produce some data in JSON format with a producer, and store that data in MongoDB.
For example, here we will send a colour and its hexadecimal code as JSON through Kafka and insert it into a MongoDB collection.
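
For instance, a single message might look like this (a sample record; the field names colour and code match the schema we define in step 11):

{"colour": "Red", "code": "#FF0000"}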

Versions we are using:

Kafka 0.10.0
Spark 2.0.2
Scala 2.11.8

Steps:

1. Start ZooKeeper.
Run the command below:

bin/zookeeper-server-start.sh config/zookeeper.properties

2. Start the Kafka server on your local machine in standalone mode.
Run the command below:

bin/kafka-server-start.sh config/server.properties

3. Create a topic on the Kafka server to which you want to produce messages.

For example:

We want to create a topic named ColourCode.

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic ColourCode

Note:

You can check the list of topics using the following command:

bin/kafka-topics.sh --list --zookeeper localhost:2181

4. Kafka comes with a command-line client that will take input from a file or from standard input
and send it out as messages to the Kafka cluster. By default, each line will be sent as a separate
message.

Run the producer and then type a few messages into the console to send to the server:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic ColourCode

5. Kafka also has a command-line consumer that will dump the messages to standard output.

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic ColourCode --from-beginning

6. Before writing the program, we need to include the following dependencies in build.sbt:

"org.apache.spark" % "spark-streaming-kafka-0-10_2.11" % "2.1.0",
"org.apache.spark" %% "spark-streaming" % "2.1.0",
"com.stratio.datasource" % "spark-mongodb_2.11" % "0.12.0"

You can get these from the Maven Repository:

https://mvnrepository.com/
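
As a reference, here is a minimal build.sbt sketch assuming Scala 2.11.8; the project name is a placeholder, and spark-core and spark-sql are added here because SparkSession (used below) needs them:

name := "KafkaToMongo"

version := "1.0"

scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "2.1.0",
  "org.apache.spark" %% "spark-sql" % "2.1.0",
  "org.apache.spark" %% "spark-streaming" % "2.1.0",
  "org.apache.spark" % "spark-streaming-kafka-0-10_2.11" % "2.1.0",
  "com.stratio.datasource" % "spark-mongodb_2.11" % "0.12.0"
)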

7. First, create a SparkSession and set the master.

val spark = SparkSession.builder().master("local[*]").config("spark.some.config.option", "some-value").getOrCreate()
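
The code in this and the following steps assumes imports along these lines (a sketch; the exact package for MongodbConfig may differ slightly between versions of the Stratio connector):

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import com.stratio.datasource.mongodb.config.MongodbConfig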

8. Now create a StreamingContext, which is used for Kafka streaming.

val ssc = new StreamingContext(spark.sparkContext, Seconds(2))

Here we pass the underlying SparkContext as spark.sparkContext, along with a batch interval of 2 seconds.

9. Now create a val containing all the Kafka parameter values.

val kafkaParams = Map[String, Object]("bootstrap.servers" -> "localhost:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "use_a_separate_group_id_for_each_stream",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean))

• Here bootstrap.servers is a list of host/port pairs used to establish the initial connection to the Kafka cluster.
• key.deserializer is the deserializer class for keys, which implements the Deserializer interface.
• value.deserializer is the deserializer class for values, which implements the Deserializer interface.
• group.id is a unique string that identifies the consumer group this consumer belongs to.
Note:

You can also refer to the other consumer configuration properties here: http://kafka.apache.org/documentation.html#newconsumerconfigs

10. Now create vals to set the parameters for MongoDB.

For example:

val mongoDbFormat = "com.stratio.datasource.mongodb"
val mongoDbDatabase = "ColoursInfo"
val mongoDbCollection = "Colour"

val mongoDbOptions = Map(MongodbConfig.Host -> "localhost:27017",
MongodbConfig.Database -> mongoDbDatabase,
MongodbConfig.Collection -> mongoDbCollection)

where

mongoDbFormat is the format in which we store data in MongoDB using this library,
mongoDbDatabase is the MongoDB database in which we want to save the collection,
mongoDbCollection is the collection in which the data will be stored.

By default MongoDB listens on port 27017, which is what we use in the options.
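
Also make sure a local MongoDB instance is running before you start the job. How you start it depends on your installation; with the default data directory in place it can be as simple as:

mongod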

11. We can get data in any format from the Kafka stream; let's assume we are getting data in JSON format.
We can specify a schema for the data using StructType.

For example, since the incoming data has two fields, colour and code, we can specify the schema like this:

val schemaString = "colour code"

val fields = schemaString.split(" ").map(fieldname => StructField(fieldname, StringType, nullable = true))

val schema = StructType(fields)
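
The split/map above is just a compact way to build the schema; written out explicitly it is equivalent to:

val schema = StructType(Seq(
  StructField("colour", StringType, nullable = true),
  StructField("code", StringType, nullable = true)))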

12. We need to pass the topic name as a parameter to the KafkaUtils method, so:

val topic1 = Array("ColourCode")

13. The Kafka library provides the KafkaUtils class, whose createDirectStream method we can use
to fetch the Kafka streaming data as key-value pairs.

val stream = KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent, Subscribe[String, String](topic1, kafkaParams))

14. The stream val holds a DStream. We will convert that stream into a DataFrame: for each batch we take
the RDD of message values, convert it into a DataFrame, and store that DataFrame in MongoDB.

stream.map(record => record.value).foreachRDD { rdd =>
  val colourDf = spark.read.schema(schema).json(rdd)
  colourDf.write.format(mongoDbFormat).mode(SaveMode.Append).options(mongoDbOptions).save()
}

15. At the end, start the streaming context.

ssc.start()
ssc.awaitTermination()
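
Putting steps 7 through 15 together, the whole job looks roughly like this (a consolidated sketch of the snippets above; the object name and appName are placeholders, and the Stratio import path may vary by connector version):

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import com.stratio.datasource.mongodb.config.MongodbConfig

object KafkaToMongo {
  def main(args: Array[String]): Unit = {
    // Spark session and streaming context (2-second batches)
    val spark = SparkSession.builder().master("local[*]").appName("KafkaToMongo").getOrCreate()
    val ssc = new StreamingContext(spark.sparkContext, Seconds(2))

    // Kafka consumer configuration
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "use_a_separate_group_id_for_each_stream",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean))

    // MongoDB target: database ColoursInfo, collection Colour
    val mongoDbFormat = "com.stratio.datasource.mongodb"
    val mongoDbOptions = Map(
      MongodbConfig.Host -> "localhost:27017",
      MongodbConfig.Database -> "ColoursInfo",
      MongodbConfig.Collection -> "Colour")

    // Schema for the incoming JSON: two string fields, colour and code
    val schema = StructType("colour code".split(" ")
      .map(fieldName => StructField(fieldName, StringType, nullable = true)))

    // Subscribe to the ColourCode topic
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent, Subscribe[String, String](Array("ColourCode"), kafkaParams))

    // For each micro-batch, parse the JSON values and append them to MongoDB
    stream.map(record => record.value).foreachRDD { rdd =>
      val colourDf = spark.read.schema(schema).json(rdd)
      colourDf.write.format(mongoDbFormat).mode(SaveMode.Append).options(mongoDbOptions).save()
    }

    ssc.start()
    ssc.awaitTermination()
  }
}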

16. Now go to the directory where the project resides and run it.

17. Produce some messages in the producer console for the consumer to consume.

Note:

Since we have defined the schema for JSON data, enter the data in JSON format. If you enter data in any
other format, it will show up as null in the MongoDB collection.

In the consumer you will see the same messages produced by the producer, irrespective of the format.

18. Now you can see this data inserted in the MongoDB collection.

Here the name of our database is ColoursInfo and the collection name is Colour, as configured in step 10.
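
For example, you can inspect the inserted documents from the mongo shell (assuming the shell is installed locally):

mongo
use ColoursInfo
db.Colour.find().pretty()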