Get Data From MongoDB and Store It Back to MongoDB

Use Case:

We will create a database and a collection in MongoDB, load its data into a Spark DataFrame, and then write the data back to a MongoDB collection.

Versions Used:

Scala-2.11.8
Spark-2.0.2

Steps:

1. First we need to create a database in MongoDB. If the database already exists, we can use it. To create (or switch to) the database, run the following command in the mongo shell.
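
For example, in the mongo shell (the database name Customer matches the connection URI used in the later steps):

use Customer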

2. Create a collection in the database. Here, in the Customer database, create a collection named CustomerInfo.
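
For example, in the mongo shell:

db.createCollection("CustomerInfo")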

3. Now insert some data into the collection.
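
For example (the field names here are only illustrative; the later steps assume the documents have an age field):

db.CustomerInfo.insertMany([
  { "name" : "John", "age" : 25 },
  { "name" : "Mary", "age" : 30 }
])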

4. Now we have these documents in our CustomerInfo collection.
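
To verify, the documents can be listed from the mongo shell:

db.CustomerInfo.find().pretty()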

5. Now, to connect MongoDB with Spark, we first need to include two dependency libraries in our project:

"org.mongodb.spark" % "mongo-spark-connector_2.11" % "2.0.0",
"com.stratio.datasource" % "spark-mongodb_2.11" % "0.11.1"
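
If you are using sbt, these entries go into libraryDependencies. A minimal build.sbt might look like the sketch below (the spark-core and spark-sql artifacts are added here as an assumption, since only the connector dependencies are listed above):

name := "spark-mongo-integration"

scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "2.0.2",
  "org.apache.spark" %% "spark-sql" % "2.0.2",
  "org.mongodb.spark" % "mongo-spark-connector_2.11" % "2.0.0",
  "com.stratio.datasource" % "spark-mongodb_2.11" % "0.11.1"
)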

6. Now start programming with Spark. In version 2.0.2 we do not need to create a SparkContext separately; instead we create a SparkSession as shown below.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("Spark Mongo Integration")
  .master("local[*]")
  .config("spark.some.config.option", "some-value")
  .getOrCreate()

7. This version of the MongoDB Spark connector provides two classes, ReadConfig and WriteConfig, which we can use to read from and write to MongoDB respectively.

Here we will first create a ReadConfig object, which we will use to read data from MongoDB.

Use the command below:

import com.mongodb.spark.config.ReadConfig

val readConfig = ReadConfig(Map("uri" -> "mongodb://localhost:27017/Customer.CustomerInfo"))

Here we can specify different options for ReadConfig. Refer to https://docs.mongodb.com/spark-connector/v2.0/scala/read-from-mongodb/ for the full list of options.
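
As a sketch, ReadConfig can also carry options beyond the URI, for example a read preference (option names as per the connector documentation linked above):

val readConfigSecondary = ReadConfig(Map("uri" -> "mongodb://localhost:27017/Customer.CustomerInfo", "readPreference.name" -> "secondaryPreferred"))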

8. Now use the MongoSpark.load() method to load the collection into a DataFrame.
MongoSpark.load() can take the SparkSession and the ReadConfig object as arguments.

import com.mongodb.spark.MongoSpark

val info = MongoSpark.load(spark, readConfig)

info.printSchema()

println("Total count of records is :" + info.count())

println("First element of data is :" + info.first)

Note:

• SparkContext has an implicit method called loadFromMongoDB() to load data from MongoDB.

For example:

sc.loadFromMongoDB() // Uses the SparkConf for configuration

sc.loadFromMongoDB(ReadConfig(Map("uri" -> "mongodb://example.com/database.collection"))) // Uses the ReadConfig

9. We can also modify the data, apply some operations on it, and store the result back to MongoDB in a new collection.

For example:

If we want to add 3 to the age column and store the result in a new column called Current_age, we can do that as follows.

First, create a WriteConfig object:

import com.mongodb.spark.config.WriteConfig

val writeConfig = WriteConfig(Map("uri" -> "mongodb://localhost:27017/Customer.CustomerInfo_new"))

Then apply the operation on the DataFrame we loaded earlier with the ReadConfig object.

For example:

val info_updated = info.withColumn("Current_age", info("age") + 3)

10. The MongoSpark.save() method accepts a WriteConfig object, so we use this method to save the data back to MongoDB.

MongoSpark.save(info_updated.write.mode("overwrite"), writeConfig)

info_updated.show()

We can now see the newly created collection in MongoDB.
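
To verify from the mongo shell:

db.CustomerInfo_new.find().pretty()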

Note:

• RDDs have an implicit helper method saveToMongoDB() to write data to MongoDB.

• For example, the following uses an RDD named documents and calls its saveToMongoDB() method without any arguments to save the documents to the collection specified in the SparkConf:

• documents.saveToMongoDB() // Uses the SparkConf for configuration

• Call saveToMongoDB() with a WriteConfig object to specify a different MongoDB server address, database and collection

• documents.saveToMongoDB(WriteConfig(Map("uri" -> "mongodb://example.com/Customer.CustomerInfo"))) // Uses the WriteConfig
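
Putting the steps together, here is a minimal end-to-end sketch (assuming MongoDB is running on localhost:27017 and the Customer.CustomerInfo collection from the earlier steps exists):

import org.apache.spark.sql.SparkSession
import com.mongodb.spark.MongoSpark
import com.mongodb.spark.config.{ReadConfig, WriteConfig}

object SparkMongoIntegration {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("Spark Mongo Integration")
      .master("local[*]")
      .getOrCreate()

    // Read the CustomerInfo collection into a DataFrame
    val readConfig = ReadConfig(Map("uri" -> "mongodb://localhost:27017/Customer.CustomerInfo"))
    val info = MongoSpark.load(spark, readConfig)
    info.printSchema()

    // Add 3 to the age column and store it as Current_age
    val info_updated = info.withColumn("Current_age", info("age") + 3)

    // Write the result to a new collection
    val writeConfig = WriteConfig(Map("uri" -> "mongodb://localhost:27017/Customer.CustomerInfo_new"))
    MongoSpark.save(info_updated.write.mode("overwrite"), writeConfig)

    info_updated.show()

    spark.stop()
  }
}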
