Tutorial: Spark SQL and DataFrame Operations Using Spark 1.6

In the previous tutorial, we explained Spark Core and RDD functionality. In this tutorial we cover Spark SQL and DataFrame operations over different sources such as JSON, text, and CSV data files, and we provide a running example for each feature. Let's begin the tutorial and discuss Spark SQL and DataFrame operations using Spark 1.6.

Spark SQL

Spark SQL is a component on top of Spark Core that introduces a data abstraction called SchemaRDD, which provides support for structured and semi-structured data. Spark SQL executes SQL queries written in either basic SQL syntax or HiveQL, and it can also read data from an existing Hive installation. It provides a programming abstraction called DataFrame and can act as a distributed SQL query engine.
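For the Hive case mentioned above, Spark 1.6 provides a HiveContext that extends SQLContext. The snippet below is a minimal sketch, assuming the spark-hive module is on the classpath and that a Hive table exists; the table name is purely hypothetical.

// HiveContext can run HiveQL and read tables from an existing Hive metastore.
// Assumes sc is an existing SparkContext and spark-hive is on the classpath.
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
val hiveResult = hiveContext.sql("SELECT * FROM some_hive_table") // hypothetical table name
hiveResult.show()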

Create SQL Context

To create a basic SQLContext, all you need is an existing SparkContext:

val sc = SparkCommon.sparkContext
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
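If you also want the implicit RDD-to-DataFrame conversions (for example toDF(), used later in this tutorial), import them from the sqlContext you just created:

// Brings in implicit conversions such as rdd.toDF()
import sqlContext.implicits._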

Basic Query

To run a query against a table, we call the sql() method on the SQLContext. The first thing we need to do is tell Spark SQL about some data to query. In this case we load some car data from JSON and register it as a temporary table named "Cars1" so we can query it with SQL.

[{"itemNo" : 1, "name" : "ferrari", "speed" : 259 , "weight": 800},  {"itemNo" : 2, "name" : "jaguar", "speed" : 274 , "weight":998},  {"itemNo" : 3, "name" : "mercedes", "speed" : 340 , "weight": 1800},  {"itemNo" : 4, "name" : "audi", "speed" : 345 , "weight": 875},  {"itemNo" : 5, "name" : "lamborghini", "speed" : 355 , "weight": 1490},{"itemNo" : 6, "name" : "chevrolet", "speed" : 260 , "weight": 900},  {"itemNo" : 7, "name" : "ford", "speed" : 250 , "weight": 1061},  {"itemNo" : 8, "name" : "porche", "speed" : 320 , "weight": 1490},  {"itemNo" : 9, "name" : "bmw", "speed" : 325 , "weight": 1190},  {"itemNo" : 10, "name" : "mercedes-benz", "speed" : 312 , "weight": 1567}]
object BasicQueryExample {

  val sc = SparkCommon.sparkContext
  val sqlContext = new org.apache.spark.sql.SQLContext(sc)

  def main(args: Array[String]) {

    import sqlContext.implicits._

    // Load cars1.json into a DataFrame
    val input = sqlContext.read.json("src/main/resources/cars1.json")

    // Register the DataFrame as a temporary table named Cars1
    input.registerTempTable("Cars1")

    // Query the table with plain SQL
    val result = sqlContext.sql("SELECT * FROM Cars1")

    result.show()
  }
}

case class Cars1(name: String)
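Once Cars1 is registered as a temporary table, any standard SQL can be run against it. For example, a small sketch (using the column names from cars1.json above and the sqlContext from the listing) that returns only the faster cars:

// Cars with speed greater than 300, fastest first, against the Cars1 temp table registered above
val fastCars = sqlContext.sql("SELECT name, speed FROM Cars1 WHERE speed > 300 ORDER BY speed DESC")
fastCars.show()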


Interoperating with RDDs

Spark SQL supports two methods for converting existing RDDs into DataFrames.

1. Inferring the Schema using Reflection

The Scala interface for Spark SQL supports automatically converting an RDD of case class instances into a DataFrame. The case class defines the schema of the table: the names of the case class arguments are read using reflection and become the names of the columns. The RDD can be implicitly converted to a DataFrame and then registered as a table, and that table can be used in subsequent SQL statements.

1, Grapes, 25
2, Guava, 28
3, Gooseberry, 39
4, Raisins, 23
5, Naseberry, 23
// Enclosing object and contexts assumed here, following the other examples in this tutorial
object InferringSchema {

  val sc = SparkCommon.sparkContext
  val sqlContext = new org.apache.spark.sql.SQLContext(sc)

  def main(args: Array[String]) {

    // Needed for the implicit toDF() conversion
    import sqlContext.implicits._

    /**
      * Create RDD and Apply Transformations
      */
    val fruits = sc.textFile("src/main/resources/fruits.txt")
      .map(_.split(","))
      .map(frt => Fruits(frt(0).trim.toInt, frt(1), frt(2).trim.toInt))
      .toDF()

    /**
      * Store the DataFrame Data in a Table
      */
    fruits.registerTempTable("fruits")

    /**
      * Select Query on DataFrame
      */
    val records = sqlContext.sql("SELECT * FROM fruits")

    /**
      * To see the result data of the records DataFrame
      */
    records.show()
  }
}

case class Fruits(id: Int, name: String, quantity: Int)
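Because the fruits temporary table is registered, subsequent SQL statements can filter or aggregate it as well. A small sketch, assuming the table and columns defined above:

// Fruits with quantity greater than 25, against the fruits temp table registered above
val largeStock = sqlContext.sql("SELECT name, quantity FROM fruits WHERE quantity > 25")
largeStock.show()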
2. Programmatically Specifying the Schema

A DataFrame can also be created through a programmatic interface that lets you construct a schema and then apply it to an existing RDD. This takes three steps: create an RDD of Rows from the original RDD; create the schema, represented by a StructType matching the structure of the Rows in that RDD; and apply the schema to the RDD of Rows via the createDataFrame method provided by SQLContext.

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object ProgrammaticallySchema {

  val sc = SparkCommon.sparkContext

  // sc is an existing SparkContext.
  val sqlContext = new org.apache.spark.sql.SQLContext(sc)

  def main(args: Array[String]) {

    // Create an RDD
    val fruit = sc.textFile("src/main/resources/fruits.txt")

    // The schema is encoded in a string
    val schemaString = "id name"

    // Generate the schema based on the schema string
    val schema = StructType(
      schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
    schema.foreach(println)

    // Convert records of the RDD (fruit) to Rows; only the first two columns are used here
    val rowRDD = fruit.map(_.split(",")).map(p => Row(p(0), p(1).trim))
    rowRDD.foreach(println)

    // Apply the schema to the RDD.
    val fruitDataFrame = sqlContext.createDataFrame(rowRDD, schema)
    fruitDataFrame.foreach(println)

    // Register the DataFrame as a table.
    fruitDataFrame.registerTempTable("fruit")

    /**
      * SQL statements can be run by using the sql methods provided by sqlContext.
      */
    val results = sqlContext.sql("SELECT * FROM fruit")
    results.show()
  }
}
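In the example above every field is declared as StringType. If you want typed columns, the schema can mix field types. Here is a small hedged sketch (not part of the original example) that gives the id column an integer type, reusing the fruit RDD and sqlContext from the listing above:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Hypothetical variant of the schema above, with a typed id column
val typedSchema = StructType(Seq(
  StructField("id", IntegerType, nullable = true),
  StructField("name", StringType, nullable = true)))

val typedRowRDD = fruit.map(_.split(",")).map(p => Row(p(0).trim.toInt, p(1).trim))
val typedFruitDF = sqlContext.createDataFrame(typedRowRDD, typedSchema)
typedFruitDF.printSchema()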


Creating DataFrames

A DataFrame is a distributed collection of data organized into named columns. Conceptually, it is equivalent to a relational table, with good optimization techniques underneath. A DataFrame can be constructed from a variety of sources such as Hive tables, structured data files, external databases, or existing RDDs. Here we use a JSON document named cars.json with the following content and generate a table based on the schema in the JSON document.

[{"itemNo" : 1, "name" : "Ferrari", "speed" : 259 , "weight": 800},  {"itemNo" : 2, "name" : "Jaguar", "speed" : 274 , "weight":998},  {"itemNo" : 3, "name" : "Mercedes", "speed" : 340 , "weight": 1800},  {"itemNo" : 4, "name" : "Audi", "speed" : 345 , "weight": 875},  {"itemNo" : 5, "name" : "Lamborghini", "speed" : 355 , "weight": 1490}]
package com.tutorial.sparksql

import com.tutorial.utils.SparkCommon

object CreatingDataFrames {

 val sc = SparkCommon.sparkContext

 /**
 * Create a Scala Spark SQL Context.
 */
 val sqlContext = new org.apache.spark.sql.SQLContext(sc)

 def main(args: Array[String]) {
 /**
 * Create the DataFrame
 */
 val df = sqlContext.read.json("src/main/resources/cars.json")

 /**
 * Show the Data
 */
 df.show()

 }
}


Data Sources

Spark SQL supports a number of structured data sources, including Hive tables, JSON, and Parquet files. It can operate on all of these through the common DataFrame interface.
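Parquet is the default data source in Spark 1.6, so DataFrames can be written to and read back from Parquet without extra options. A minimal sketch, assuming df is a DataFrame such as the one created in the previous section (the output path is only an illustration):

// Write a DataFrame as Parquet and read it back (path is hypothetical)
df.write.parquet("src/main/resources/cars_parquet")
val parquetDF = sqlContext.read.parquet("src/main/resources/cars_parquet")
parquetDF.show()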

DataFrame Operations on a JSON File

Here we include some basic examples of structured data processing using DataFrames. As an example, the following creates a DataFrame based on the content of a JSON file. Read a JSON document named cars.json with the following content and generate a table based on the schema in the JSON document.

[{"itemNo" : "1", "name" : "ferrari", "speed" : "259" , "weight": "800"},{"itemNo" : "2", "name" : "jaguar", "speed" : "274" , "weight": "998"},{"itemNo" : "3", "name" : "mercedes", "speed" : "340" , "weight": "1800"},{"itemNo" : "4", "name" : "audi", "speed" : "345" , "weight": "875"},{"itemNo" : "5", "name" : "lamborghini", "speed" : "355" , "weight": "1490"}]
import org.apache.spark.sql.DataFrame

object DataFrameOperations {

 val sc = SparkCommon.sparkContext

 /**
 * Use the following command to create the SQLContext.
 */
 val ssc = SparkCommon.sparkSQLContext

 val schemaOptions = Map("header" -> "true", "inferSchema" -> "true")

 def main(args: Array[String]) {

 /**
 * Create the DataFrame
 */
 val cars = "src/main/resources/cars.json"

 /**
 * Read the JSON document
 * Use the following command to read the JSON document named cars.json.
 * The data is shown as a table with the fields: itemNo, name, speed and weight.
 */
 */
 val empDataFrame: DataFrame = ssc.read.format("json").options(schemaOptions).load(cars)

 /**
 * Show the Data
* If you want to see the data in the DataFrame, then use the following command.
 */
 empDataFrame.show()
 /**
 * printSchema Method
 * If you want to see the Structure (Schema) of the DataFrame, then use the following command
 */
 empDataFrame.printSchema()

 /**
 * select Method
 * Use the following command to fetch the name column from the DataFrame.
 */
 empDataFrame.select("name").show()

 /**
 * filter Method
 * Select the cars whose speed is greater than 300 (speed > 300).
 */
 empDataFrame.filter(empDataFrame("speed") > 300).show()

 /**
 * groupBy Method
 * Count the number of cars that have the same speed.
 */
 empDataFrame.groupBy("speed").count().show()

  }

}
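Beyond select, filter and groupBy, the same DataFrame supports other relational operators such as sorting and aggregations. A short hedged sketch, reusing empDataFrame from the example above:

// Sort cars by speed in descending order
empDataFrame.orderBy(empDataFrame("speed").desc).show()

// Average weight per speed group, using a built-in aggregate function
import org.apache.spark.sql.functions.avg
empDataFrame.groupBy("speed").agg(avg("weight")).show()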

DataFrame Operations on a Text File

As an example, the following creates a DataFrame based on the content of a text file. Read a text document named fruits.txt with the following content and generate a table based on the schema in the text document.

1, Grapes, 25
2, Guava, 28
3, Gooseberry, 39
4, Raisins, 23
5, Naseberry, 23

 
// Enclosing object and contexts assumed here, following the other examples in this tutorial
object DataFrameTextFile {

  val sc = SparkCommon.sparkContext
  val sqlContext = new org.apache.spark.sql.SQLContext(sc)

  def main(args: Array[String]) {

    // Needed for the implicit toDF() conversion
    import sqlContext.implicits._

    /**
      * Create RDD and Apply Transformations
      */
    val fruits = sc.textFile("src/main/resources/fruits.txt")
      .map(_.split(","))
      .map(frt => Fruits(frt(0).trim.toInt, frt(1), frt(2).trim.toInt))
      .toDF()

    /**
      * Store the DataFrame Data in a Table
      */
    fruits.registerTempTable("fruits")

    /**
      * Select Query on DataFrame
      */
    val records = sqlContext.sql("SELECT * FROM fruits")

    /**
      * To see the result data of the records DataFrame
      */
    records.show()
  }
}

case class Fruits(id: Int, name: String, quantity: Int)


DataFrame Operations on a CSV File

As an example, the following creates a DataFrame based on the content of a CSV file. Read a CSV document named cars.csv with the following content and generate a table based on the schema in the CSV document.

year,make,model,comment,blank
"2012","Tesla","S","No comment",

1997,Ford,E350,"Go get one now they are going fast",
2015,Chevy,Volt


// Enclosing object and imports assumed here, following the other examples in this tutorial
import java.util.UUID

import org.apache.spark.sql.SQLContext

object DataFrameCsvFile {

  val sc = SparkCommon.sparkContext
  val sqlContext: SQLContext = SparkCommon.sparkSQLContext

  def main(args: Array[String]) {

    // Read the CSV file with the spark-csv package
    val df = sqlContext.read
      .format("com.databricks.spark.csv")
      .option("header", "true") // Use first line of all files as header
      .option("inferSchema", "true") // Automatically infer data types
      .load("src/main/resources/cars.csv")

    df.show()
    df.printSchema()

    // Select two columns and write them back out as CSV
    val selectedData = df.select("year", "model")
    selectedData.write
      .format("com.databricks.spark.csv")
      .option("header", "true")
      .save(s"src/main/resources/${UUID.randomUUID()}")

    println("OK")
  }
}
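The spark-csv package also accepts options on write, for example a custom delimiter, and the writer supports save modes such as overwrite. A small sketch, assuming the selectedData DataFrame from the example above and a hypothetical output path:

// Write with a tab delimiter and overwrite any existing output
import org.apache.spark.sql.SaveMode

selectedData.write
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .option("delimiter", "\t")
  .mode(SaveMode.Overwrite)
  .save("src/main/resources/cars_tsv")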


For more details see here.

We will keep looking at ways to make these tutorials more useful and will keep adding more content over time. If you have any suggestions, feel free to share them 🙂 Stay tuned.
