Tutorial: How to load and save data from different data sources in Spark 2.0.2

In this blog we will discuss Spark 2.0.2. It demonstrates the basic functionality of Spark 2.0.2 and describes how to load and save data in Spark 2.0.2. We have tried to cover the basics of Spark 2.0.2 core functionality, such as reading and writing data from different sources (CSV, JSON and text files).

Loading and saving CSV file

As an example, the following creates a DataFrame based on the content of a CSV file. Read a CSV document named team.csv with the following content and generate a table based on the schema in the CSV document.

id,name,age,location
1,Mahendra Singh Dhoni ,38,ranchi
2,Virat Kohli,25,delhi
3,Shikhar Dhawan,25,mumbai
4,Rohit Sharma,33,mumbai
5,Stuart Binny,22,chennai

 

import java.util.UUID

import org.apache.spark.sql.SparkSession

def main(args: Array[String]) {
  val spark = SparkSession
    .builder()
    .master("local")
    .appName("Spark2.0")
    .getOrCreate()
  // Read the CSV file, treating the first line as a header.
  val df = spark.read.option("header", "true")
    .csv("src/main/resources/team.csv")
  // Select two columns and write them back out as CSV with a header.
  val selectedData = df.select("name", "age")
  selectedData.write.option("header", "true")
    .csv(s"src/main/resources/${UUID.randomUUID()}")
  println("OK")
}

[Screenshots: CSV load and save output]

Loading and saving JSON file

Here we include some basic examples of structured data processing using DataFrames. As an example, the following creates a DataFrame based on the content of a JSON file. Read a JSON document named cars_price.json with the following content and generate a table based on the schema in the JSON document.

[{"itemNo" : 1, "name" : "Ferrari", "price" : 52000000 , "kph": 6.1},  {"itemNo" : 2, "name" : "Jaguar", "price" : 15000000 , "kph": 3.4},  {"itemNo" : 3, "name" : "Mercedes", "price" : 10000000, "kph": 3}, {"itemNo" : 4, "name" : "Audi", "price" : 5000000 , "kph": 3.6}, {"itemNo" : 5, "name" : "Lamborghini", "price" : 5000000 , "kph": 2.9}]
import java.util.UUID

import org.apache.spark.sql.SparkSession

def main(args: Array[String]) {
  val spark = SparkSession
    .builder()
    .master("local")
    .appName("Spark2.0")
    .getOrCreate()
  // Read the JSON file; the schema is inferred from the documents.
  val df = spark.read
    .json("src/main/resources/cars_price.json")
  // Select two columns and write them back out as JSON.
  val selectedData = df.select("name", "price")
  selectedData.write
    .json(s"src/main/resources/${UUID.randomUUID()}")
  println("OK")
}

[Screenshots: JSON load and save output]

Loading and saving text file

As an example, the following creates a Dataset based on the content of a text file. Read a text document named rklick.txt with the following content, split it into words and save the words back to disk.

Rklick creative technology company providing key digital services.
Focused on helping our clients to build a successful business on web and mobile.
We are mature and dynamic solution oriented IT full service product development and customized consulting services company. Since 2007 we're dedicated team of techno-whiz engaged in providing outstanding solutions to diverse group of business entities across North America.
Known as Professional Technologists, we help our customers enable latest technology stack or deliver quality cost effective products and applications
import java.util.UUID

import org.apache.spark.sql.SparkSession

def main(args: Array[String]) {
  val spark = SparkSession
    .builder()
    .master("local")
    .appName("Spark2.0")
    .getOrCreate()
  import spark.implicits._
  // Read the text file as a Dataset[String], one element per line.
  val rklickData = spark.read.text("src/main/resources/rklick.txt").as[String]
  // Split each line into words and save them as a text file.
  val rklickWords = rklickData.flatMap(value => value.split("\\s+"))
  rklickWords.write.text(s"src/main/resources/${UUID.randomUUID()}")
  println("OK")
}

[Screenshots: text file load and save output]

We will keep working on these tutorials and adding more content to them. If you have any suggestions, feel free to share them 🙂 Stay tuned.


Tutorial: Quick overview of Spark 2.0.1 Core Functionality

In this blog we will discuss Spark 2.0.1 Core Functionality. It demonstrates the basic functionality of Spark 2.0.1 and describes SparkSession, Spark SQL and the DataFrame API. We have tried to cover the basics of Spark 2.0.1 core functionality and SparkSession.

SparkSession:

SparkSession is the new entry point of Spark. In previous versions (1.6.x) of Spark, SparkContext was the entry point; in Spark 2.0.1, SparkSession is the entry point. A Spark session internally has a Spark context for actual computation. As we know, RDD was the main API, and it was created and manipulated using the context APIs. For every other API we needed to use a different context: for streaming we needed a StreamingContext, for SQL a SQLContext and for Hive a HiveContext. But as the Dataset and DataFrame APIs are becoming the new standard APIs, we need an entry point built for them. So in Spark 2.0.1 we have a new entry point for the Dataset and DataFrame APIs called SparkSession.

Creating SparkSession:

Here we describe how to create SparkSession.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder
  .master("local")
  .appName("spark example")
  .getOrCreate()

Once we have created the SparkSession, we can use it to read data.

Read data using SparkSession

It looks exactly like reading using SQLContext; you can now easily replace all your SQLContext code with SparkSession.

val spark = SparkSession.builder
  .master("local")
  .appName("spark example")
  .getOrCreate()

// Reading looks just like it did with SQLContext.
val df = spark.read.option("header", "true")
  .csv("src/main/resources/team.csv")

Creating DataFrames:

With the help of SparkSession, applications can create DataFrames from an existing RDD or from external data sources. As an example, the following creates a DataFrame based on the content of a CSV file:

val dataFrame = spark.read.option("header", "true")
  .csv("src/main/resources/team.csv")

dataFrame.show()

[Screenshot: CSV DataFrame output]

The following creates a DataFrame based on the content of a JSON file:

val dataFrame = spark.read
  .json("src/main/resources/cars_price.json")

dataFrame.show()

[Screenshot: JSON DataFrame output]

Creating Datasets:

Dataset is a new abstraction in Spark, introduced as an alpha API in Spark 1.6; it becomes a stable API in Spark 2.0.1. Datasets are similar to RDDs; however, instead of using Java serialization or Kryo, they use a specialized Encoder to serialize the objects for processing or transmitting over the network. The major difference is that a Dataset is a collection of domain-specific objects, whereas an RDD is a collection of any objects. The domain-object part of the definition signifies the schema part of the Dataset, so the Dataset API is always strongly typed and optimized using the schema, where RDD is not. The Dataset definition also covers the DataFrame API: a DataFrame is a special Dataset where there are no compile-time checks on the schema. This makes Dataset the new single abstraction replacing RDD from earlier versions of Spark. We read data using the read.text API, which is similar to the textFile API of RDD. The following creates a Dataset based on the content of a text file:

import spark.implicits._
val rklickData = spark.read.text("src/main/resources/rklick.txt").as[String]
val rklickWords = rklickData.flatMap(value => value.split("\\s+"))
val rklickGroupedWords = rklickWords.groupByKey(_.toLowerCase)
val rklickWordCount = rklickGroupedWords.count()
rklickWordCount.show()

[Screenshot: word count Dataset output]

We will keep working on these tutorials and adding more content to them. If you have any suggestions, feel free to share them 🙂 Stay tuned.

 

Introduction to Spark 2.0

Overview of the Dataset, DataFrame and RDD APIs:

Resilient Distributed Dataset (RDD) is a fundamental data structure of Spark. It is an immutable distributed collection of objects. Each dataset in an RDD is divided into logical partitions, which may be computed on different nodes of the cluster.
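As a quick illustration, here is a minimal RDD sketch (the local collection and numbers are made up for illustration, not taken from the original post):

import org.apache.spark.sql.SparkSession

// Minimal RDD sketch: distribute a local collection, transform it lazily, then run an action.
val spark = SparkSession.builder().master("local").appName("rdd example").getOrCreate()
val numbers = spark.sparkContext.parallelize(Seq(1, 2, 3, 4, 5)) // split into logical partitions
val doubled = numbers.map(_ * 2)                                 // transformation (lazy)
println(doubled.reduce(_ + _))                                   // action: prints 30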

But due to issues with advanced optimizations, the focus moved to DataFrames.

DataFrames brought custom memory management and runtime code generation, which greatly improved performance, so over the last year most of the improvements went into the DataFrame API.

A DataFrame is a Dataset organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood. DataFrames can be constructed from a wide array of sources such as: structured data files, tables in Hive, external databases, or existing RDDs.
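For instance, one of those sources is an existing RDD. A minimal sketch, assuming an illustrative Player case class and hand-written rows (not data from the original post):

import org.apache.spark.sql.SparkSession

case class Player(name: String, age: Int) // illustrative domain class

val spark = SparkSession.builder().master("local").appName("df from rdd").getOrCreate()
import spark.implicits._

// Turn an existing RDD of case-class objects into a DataFrame with named columns.
val playersRdd = spark.sparkContext.parallelize(Seq(Player("Virat Kohli", 25), Player("Rohit Sharma", 33)))
val playersDf = playersRdd.toDF()
playersDf.printSchema()
playersDf.show()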

Though the DataFrame API solved many issues, it was not a good enough replacement for the RDD API. One of the major issues with the DataFrame API was the lack of compile-time safety and the inability to work with domain objects. This held people back from using the DataFrame API everywhere. But with the introduction of the Dataset API in 1.6, we were able to fill the gap.

A Dataset is a distributed collection of data. Dataset is a new interface added in Spark 1.6 that provides the benefits of RDDs (strong typing, ability to use powerful lambda functions) with the benefits of Spark SQL’s optimized execution engine. A Dataset can be constructed from JVM objects and then manipulated using functional transformations (map, flatMap, filter, etc.).
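A minimal sketch of that idea, using an illustrative Car case class (the values are made up):

import org.apache.spark.sql.SparkSession

case class Car(name: String, price: Long) // illustrative domain class

val spark = SparkSession.builder().master("local").appName("dataset example").getOrCreate()
import spark.implicits._

// Construct a Dataset from JVM objects and apply typed, functional transformations.
val cars = Seq(Car("Ferrari", 52000000L), Car("Audi", 5000000L)).toDS()
val expensive = cars.filter(_.price > 10000000L) // field access is checked at compile time
expensive.map(_.name).show()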

So in Spark 2.0, the Dataset API becomes a stable API, and the Dataset API combined with the DataFrame API should be able to cover most of the use cases where RDD was used earlier. So as a Spark developer, it is advised to start embracing these two APIs over the RDD API from Spark 2.0.

Points to be discussed:

Datasets: Starting in Spark 2.0, DataFrame is just a type alias for Dataset of Row. Both the typed methods (e.g. map, filter, groupByKey) and the untyped methods (e.g. select, groupBy) are available on the Dataset class. Also, this new combined Dataset interface is the abstraction used for Structured Streaming.
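For example, both styles can be mixed on the same data; a small sketch with made-up values:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local").appName("typed vs untyped").getOrCreate()
import spark.implicits._

val words = Seq("spark", "dataset", "dataframe", "spark").toDS()

// Typed methods work directly with the element type.
words.filter(_.startsWith("s")).groupByKey(identity).count().show()

// Untyped methods address columns by name; a DataFrame is just Dataset[Row].
words.toDF("word").groupBy("word").count().show()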

For long, RDD was the standard abstraction of Spark. But from Spark 2.0, Dataset becomes the new abstraction layer for Spark. Though the RDD API will still be available, it becomes a low-level API, used mostly for runtime and library development. All user-land code will be written against the Dataset abstraction and its subset, the DataFrame API.

Dataset is a superset of the DataFrame API, which was released in Spark 1.3. Dataset together with the DataFrame API brings better performance and flexibility to the platform compared to the RDD API. Dataset will also replace RDD as the abstraction for streaming in future releases.

The major difference is that a Dataset is a collection of domain-specific objects, whereas an RDD is a collection of any objects. The domain-object part of the definition signifies the schema part of the Dataset, so the Dataset API is always strongly typed and optimized using the schema, where RDD is not.

The Dataset definition also covers the DataFrame API: a DataFrame is a special Dataset where there are no compile-time checks on the schema. This makes Dataset the new single abstraction replacing RDD from earlier versions of Spark.

SparkSession: A new entry point that replaces the old SQLContext and HiveContext. For users of the DataFrame API, a common source of confusion in Spark is which "context" to use. Now you can use SparkSession, which subsumes both, as a single entry point, as demonstrated below. Note that the old SQLContext and HiveContext are still kept for backward compatibility.

In earlier versions of Spark, SparkContext was the entry point for Spark. As RDD was the main API, it was created and manipulated using the context APIs. For every other API we needed to use a different context: for streaming we needed a StreamingContext, for SQL a SQLContext and for Hive a HiveContext. But as the Dataset and DataFrame APIs are becoming the new standard APIs, we need an entry point built for them. So in Spark 2.0 we have a new entry point for the Dataset and DataFrame APIs called SparkSession.

SparkSession is essentially a combination of SQLContext, HiveContext and a future StreamingContext. All the APIs available on those contexts are available on SparkSession as well. A Spark session internally has a Spark context for actual computation.

 

val sparkSession = SparkSession.builder
  .master("local")
  .appName("spark session example")
  .getOrCreate()

New Accumulator API: A new Accumulator API has been designed that has a simpler type hierarchy and supports specialization for primitive types. The old Accumulator API has been deprecated but retained for backward compatibility.
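As a minimal sketch of the new API (the accumulator name and data are illustrative), SparkContext exposes typed helpers such as longAccumulator:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local").appName("accumulator example").getOrCreate()
val sc = spark.sparkContext

// The new API provides typed accumulators such as longAccumulator and doubleAccumulator.
val evenCounter = sc.longAccumulator("even numbers")
sc.parallelize(1 to 10).foreach { n => if (n % 2 == 0) evenCounter.add(1) }
println(evenCounter.value) // 5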

 

Catalog API: In Spark 2.0, a new Catalog API is introduced to access metadata. You can fetch the list of databases as well as the list of tables using this API.

To list all databases:

val sparkSession = SparkSession.builder
  .master("local")
  .appName("catalog example")
  .getOrCreate()

val catalog = sparkSession.catalog
catalog.listDatabases().select("name").show()
To list all tables:
val sparkSession = SparkSession.builder
  .master("local")
  .appName("example")
  .getOrCreate()

val catalog = sparkSession.catalog
catalog.listTables().select("name").show()

This blog provides a quick introduction to using Spark 2.0 and demonstrates the basic functionality of the new APIs. This is the start of using Spark with Scala; from next week onwards we will keep working on this tutorial to make it grow. We will look at how we can add more functionality to it, and then add more modules to it together. If you have any changes, feel free to send them.
If you have any suggestions, feel free to share them 🙂 Stay tuned.
You can access the full code from here.

 

Tutorial: DataFrame API Functionalities using Spark 1.6

In the previous tutorial, we explained SparkSQL and DataFrame operations using Spark 1.6. In this tutorial we cover DataFrame API functionalities, and we have provided a running example of each functionality for better support. Let's begin the tutorial and discuss the DataFrame API operations using Spark 1.6.

DataFrame API Example Using Different types of Functionalities

The different types of DataFrame operations are:

1. Action
2. Basic
3. Operations

Here we use a JSON document named cars.json and generate a table based on the schema in the JSON document.
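The full walkthrough is not reproduced here, but as a hedged sketch of what those three kinds of operations might look like on such a file (the path and field names below are assumptions based on the cars_price.json example earlier in this series):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val conf = new SparkConf().setMaster("local").setAppName("dataframe operations")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)

// Assumed location and schema for cars.json.
val cars = sqlContext.read.json("src/main/resources/cars.json")

cars.printSchema()                           // basic: inspect the inferred schema
cars.select("name", "price").show()          // operation: projection
cars.filter(cars("price") > 10000000).show() // operation: filtering on a column
println(cars.count())                        // action: materialize and count the rows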


Tutorial: Spark SQL and DataFrames Operations using Spark 1.6

In the previous tutorial, we explained Spark Core and RDD functionalities. In this tutorial we cover Spark SQL and DataFrame operations on different sources such as JSON, text and CSV data files, and we have provided a running example of each functionality for better support. Let's begin the tutorial and discuss the SparkSQL and DataFrame operations using Spark 1.6.

SparkSQL

Spark SQL is a component on top of Spark Core that introduces a new data abstraction called SchemaRDD, which provides support for structured and semi-structured data. Spark SQL can execute SQL queries written using either a basic SQL syntax or HiveQL, and it can also be used to read data from an existing Hive installation. It provides a programming abstraction called DataFrame and can act as a distributed SQL query engine.
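As a minimal sketch of that workflow in Spark 1.6 (the inline data is made up for illustration), a DataFrame can be registered as a temporary table and queried with SQL:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val conf = new SparkConf().setMaster("local").setAppName("spark sql example")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

// Build a small DataFrame inline, register it as a temp table and query it with SQL.
val team = Seq(("Virat Kohli", 25), ("Rohit Sharma", 33)).toDF("name", "age")
team.registerTempTable("team")
sqlContext.sql("SELECT name FROM team WHERE age > 30").show()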

Tutorial: Quick overview of Spark 1.6 Core Functionality

In this blog we will discuss Spark 1.6 Core Functionality and provide a quick introduction to using Spark. It demonstrates the basic functionality of RDDs; later on we demonstrate Spark SQL and DataFrame API functionality. We have tried to cover the basics of Spark 1.6 core functionality and programming contexts.

Introduction to Apache Spark

Spark is a powerful open source processing engine built around speed, ease of use, and sophisticated analytics. It is a cluster computing framework originally developed in the AMPLab at the University of California, Berkeley, and later donated to the Apache Software Foundation, where it remains today. Apache Spark is a lightning-fast cluster computing technology designed for fast computation. It is a framework for performing general data analytics on a distributed computing cluster like Hadoop. The main feature of Spark is its in-memory cluster computing, which increases the processing speed of an application and provides faster data processing than MapReduce. It runs on top of an existing Hadoop cluster and accesses the Hadoop data store (HDFS); it can also process structured data in Hive and streaming data from HDFS, Flume, Kafka and Twitter.

Dribbling with Spark 1.6 GraphX Components

GraphX provides distributed in-memory graph computation. The GraphX API enables users to view data both as graphs and as collections (i.e., RDDs) without data movement or duplication.

In this example, we process a small social network with users as vertices and relations between users as edges, and find out these details (a small sketch follows the list below):

  • Evaluate what’s the most important users in the graph
  • Find all three users graph where every two users are connected
  • Find pair of users where connection in each direction between them
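A minimal GraphX sketch along those lines; the vertices, edges and the use of PageRank to rank users are illustrative assumptions, not the actual data or code from the post:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph}

val conf = new SparkConf().setMaster("local").setAppName("graphx example")
val sc = new SparkContext(conf)

// Illustrative users (vertices) and "follows" relations (edges).
val users = sc.parallelize(Seq((1L, "dhoni"), (2L, "kohli"), (3L, "dhawan")))
val relations = sc.parallelize(Seq(Edge(1L, 2L, "follows"), Edge(2L, 3L, "follows"), Edge(3L, 1L, "follows")))
val graph = Graph(users, relations)

// PageRank is one way to estimate the most important users in the graph.
val ranks = graph.pageRank(0.0001).vertices
ranks.join(users).sortBy(_._2._1, ascending = false).collect().foreach(println)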
