Introduction to Spark 2.0

Overview of the Dataset, DataFrame and RDD APIs:

A Resilient Distributed Dataset (RDD) is the fundamental data structure of Spark. It is an immutable distributed collection of objects. Each RDD is divided into logical partitions, which may be computed on different nodes of the cluster.
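For instance, here is a minimal sketch that distributes a local collection across three partitions (the SparkSession entry point used to obtain the SparkContext here is covered later in this post):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder
  .master("local")
  .appName("rdd example")
  .getOrCreate()

// Distribute a local collection across 3 logical partitions.
val numbers = spark.sparkContext.parallelize(1 to 10, 3)

println(numbers.getNumPartitions)        // 3
println(numbers.map(_ * 2).reduce(_ + _)) // transformations are lazy; reduce is an action (110)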

But because the RDD API leaves little room for advanced optimizations, the focus moved to the DataFrame API.

DataFrames brought custom memory management and runtime code generation, which greatly improved performance. As a result, over the last year most of the improvements went into the DataFrame API.

A DataFrame is a Dataset organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood. DataFrames can be constructed from a wide array of sources such as: structured data files, tables in Hive, external databases, or existing RDDs.
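As a quick illustration, here is a minimal sketch that builds a DataFrame from a JSON file, reusing the spark session from the sketch above; the file name people.json and its columns are assumptions for illustration only:

// Reuses the spark session from the sketch above; people.json and its columns
// are placeholders. Spark expects one JSON object per line.
val df = spark.read.json("people.json")

df.printSchema()                  // schema inferred from the JSON documents
df.select("name", "age").show()   // untyped, column-based operations
df.filter("age > 21").show()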

Though the DataFrame API solved many issues, it was not a good enough replacement for the RDD API. The major issues with the DataFrame API were the lack of compile-time safety and the inability to work with domain objects, which held people back from using it everywhere. With the introduction of the Dataset API in Spark 1.6, that gap was filled.

A Dataset is a distributed collection of data. Dataset is a new interface added in Spark 1.6 that provides the benefits of RDDs (strong typing, ability to use powerful lambda functions) with the benefits of Spark SQL’s optimized execution engine. A Dataset can be constructed from JVM objects and then manipulated using functional transformations (map, flatMap, filter, etc.).
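A minimal sketch of that workflow, reusing the spark session from above and assuming a hypothetical Person case class:

import spark.implicits._

// Person is a hypothetical domain class, used only for illustration.
case class Person(name: String, age: Long)

// Construct a Dataset from JVM objects...
val people = Seq(Person("Alice", 29), Person("Bob", 35), Person("Carol", 19)).toDS()

// ...and manipulate it with functional transformations.
val adultNames = people.filter(_.age >= 21).map(_.name)   // Dataset[String]
adultNames.show()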

In Spark 2.0, the Dataset API becomes a stable API. The Dataset API combined with the DataFrame API should cover most of the use cases where RDD was used earlier. So as a Spark developer, it is advisable to start embracing these two APIs over the RDD API from Spark 2.0 onwards.

Points to be discussed:

Datasets: Starting in Spark 2.0, DataFrame is just a type alias for Dataset of Row. Both the typed methods (e.g. map, filter, groupByKey) and the untyped methods (e.g. select, groupBy) are available on the Dataset class. Also, this new combined Dataset interface is the abstraction used for Structured Streaming.
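The following sketch contrasts the two styles on the same Dataset, reusing the Person case class and the spark.implicits._ import from the earlier example:

// Reuses Person and the spark.implicits._ import from the sketch above.
val ds = Seq(Person("Alice", 29), Person("Bob", 35), Person("Alice", 19)).toDS()

// Typed: lambdas over Person objects, checked at compile time.
val typedCounts = ds.filter(_.age > 20).groupByKey(_.name).count()

// Untyped: expressions over named columns, resolved only at runtime.
val untypedCounts = ds.filter("age > 20").groupBy("name").count()

typedCounts.show()
untypedCounts.show()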

For a long time, RDD was the standard abstraction of Spark. From Spark 2.0, Dataset becomes the new abstraction layer for Spark. The RDD API will still be available, but it becomes a low-level API used mostly for runtime and library development. Most user code will be written against the Dataset abstraction and its untyped view, the DataFrame API.

The Dataset API generalizes the DataFrame API that was released in Spark 1.3. Dataset together with DataFrame brings better performance and flexibility to the platform compared to the RDD API. Dataset will also replace RDD as the abstraction for streaming in future releases.

The major difference is that a Dataset is a collection of domain-specific objects, whereas an RDD is a collection of arbitrary objects. The domain-object part of the definition refers to the schema of the Dataset. So the Dataset API is always strongly typed and optimized using the schema, while the RDD API is not.

The Dataset definition also covers the DataFrame API: a DataFrame is simply a Dataset of Row, where the schema is not checked at compile time. This makes Dataset the single new abstraction replacing RDD from earlier versions of Spark.
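A small sketch of moving between the two views, again reusing Person and the implicits import from the earlier example; column names on a DataFrame are only checked at runtime, while fields on a typed Dataset are checked at compile time:

import org.apache.spark.sql.{DataFrame, Dataset}

// Moving between the untyped and typed views of the same data (reusing Person).
val rows: DataFrame = Seq(Person("Alice", 29), Person("Bob", 35)).toDF()

// rows.select("agee") would only fail at runtime: column names on a DataFrame
// are not checked by the compiler.

val typed: Dataset[Person] = rows.as[Person]   // attach the domain type
typed.map(p => p.age + 1).show()               // field access is now compile-time checked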

SparkSession: A new entry point that replaces the old SQLContext and HiveContext. For users of the DataFrame API, a common source of confusion in Spark is which “context” to use. Now you can use SparkSession, which subsumes both, as a single entry point, as demonstrated in the example below. Note that the old SQLContext and HiveContext are still kept for backward compatibility.

In earlier versions of Spark, SparkContext was the entry point for Spark. As RDD was the main API, it was created and manipulated using the context APIs. For every other API, we needed a different context: for streaming we needed StreamingContext, for SQL SQLContext and for Hive HiveContext. But as the Dataset and DataFrame APIs are becoming the new standard, we need an entry point built for them. So in Spark 2.0, we have a new entry point for the Dataset and DataFrame APIs called SparkSession.

SparkSession is essentially a combination of SQLContext, HiveContext and (in the future) StreamingContext. All the APIs available on those contexts are available on SparkSession as well. Internally, a SparkSession holds a SparkContext for the actual computation.

 

val sparkSession = SparkSession.builder
.master("local")
.appName("spark session example")
.getOrCreate()

New Accumulator API: Spark 2.0 introduces a new Accumulator API that has a simpler type hierarchy and supports specialization for primitive types. The old Accumulator API has been deprecated but is retained for backward compatibility.
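A minimal sketch of the new API, using the sparkSession created above; the accumulator name and the numbers being counted are arbitrary:

// Create a long accumulator through the SparkContext held by the session.
val evenCount = sparkSession.sparkContext.longAccumulator("even counter")

sparkSession.sparkContext
  .parallelize(1 to 100)
  .foreach(n => if (n % 2 == 0) evenCount.add(1))

println(evenCount.value)   // 50, accumulated across all tasks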

 

Catalog API: In Spark 2.0, a new Catalog API is introduced to access metadata. You can fetch the list of databases as well as the list of tables using this API.

To list all databases:

val sparkSession = SparkSession.builder
           .master("local")
           .appName("catalog example")
           .getOrCreate()

val catalog = sparkSession.catalog
catalog.listDatabases().select("name").show()
To list all tables:
val sparkSession = SparkSession.builder
           .master("local")
           .appName("example")
           .getOrCreate()

val catalog = sparkSession.catalog

catalog.listTables().select("name").show()

This blog provides a quick introduction to Spark 2.0 and demonstrates the basic functionality of the new APIs. This is just the start of using Spark with Scala; from next week onwards we will keep working on this tutorial to make it grow, adding more functionality and more modules to it together. If you have any changes or suggestions, feel free to share them 🙂 Stay tuned.
You can access the full code from here.

 


Elasticsearch Graph capabilities

Step 1: Download and install Elasticsearch


a) Download Elasticsearch using the following command:


wget https://download.elastic.co/elasticsearch/release/org/elasticsearch/distribution/tar/elasticsearch/2.3.2/elasticsearch-2.3.2.tar.gz

b) After downloading, untar it using this command:

tar -xzf elasticsearch-2.3.2.tar.gz

c) Go to the Elasticsearch directory:

cd elasticsearch-2.3.2

Step 2: Install Graph into Elasticsearch

a) Install the license plugin (required for Graph):

bin/plugin install license

b) Install the Graph plugin:

bin/plugin install graph

Step 3: Install Graph into Kibana

bin/kibana plugin --install elasticsearch/graph/latest

Step 4: Run Elasticsearch

bin/elasticsearch

Step 5: Run Kibana

bin/kibana

Step 6: Ingest data into Elasticsearch
Download the CSV from the following link and ingest it into Elasticsearch, either using curl or by following my last blog to insert spreadsheet data into Elasticsearch directly.

https://drive.google.com/file/d/0BxeFnKIg5Lg_UEFqTkdNMmt6SzQ/view

Step 7: Go to the Graph UI

After ingesting the data, go to

http://HOST:PORT/app/graph 

You would see this screen

[Screenshot: initial Graph UI screen]

Step 8: Select your index and fields

In this step, select your index from the drop-down box and also select Item, Region and Rep from the field column.

Step 9: Type your query in the search box to get the relation graph

Suppose we type central pencil in the search box.

You will see something like this:

[Screenshot: graph of the relations between central and pencil]

 

Click on the link between central and pencil; a link summary then appears on the right-hand side.

The link summary explains that:

a) 104 documents contain pencil.

b) 192 documents contain central.

c) 72 documents contain both pencil and central.

Data ingestion from Google spreadsheet to Elasticsearch

In this blog we elaborate on how to ingest data from a Google spreadsheet into Elasticsearch.

There are seven steps to ingest data from a Google spreadsheet into Elasticsearch. Please follow the steps below:

Step – 1) Log in to your Google account.

Step – 2) Open the spreadsheet and search for the add-on.

Open the spreadsheet, click on Add-ons → Get add-ons, and type elasticsearch in the search box. You will see the screen below.

[Screenshot: searching the add-on store for elasticsearch]

 

Now click to add the Elasticsearch add-on. After adding it, you have to grant it permission. Once permission is granted, the Elasticsearch add-on will be available in your account.

 

Step – 3) Open the Elasticsearch add-on:

– Now click on Add-ons; you will see the screen below.

[Screenshot: Add-ons menu]

 

Step – 4) Fill in the cluster information:

Click on Send to cluster. You will now see the screen below.

[Screenshot: cluster host, port, username and password form]

Here, on the right-hand side, you have to type the Host and Port along with the Username and Password.

Step – 5) Test the connection:

After filling in all the details, click on Test to check the connection with Elasticsearch. You should see the message “Successfully connected to your cluster”. Click Save and then click Edit Data Details.

Step – 6) Edit the data details:

After clicking Edit Data Details, select the id column and type the index name and type name into which you want to ingest this spreadsheet data. You will see the screen below.

[Screenshot: Edit Data Details form]

 

Step – 7) Push to cluster:

After filling in all the details, click on Push to Cluster. You will see the screen below.

[Screenshot: successful ingestion message]

 

After pushing the data into the cluster, you will see the message “Success! The data is accessible here”.

Now click the here link in that message to see your ingested data in Elasticsearch.

 

Kafka & ZooKeeper | Multi Node Cluster Setup


In this blog we explain the setup of a Kafka & ZooKeeper multi-node cluster in a distributed environment.

What is Apache Kafka?

Apache Kafka is a high-throughput distributed messaging system designed to allow a single cluster to serve as the central data backbone for a large organization. It can be elastically and transparently expanded without downtime. Data streams are partitioned and spread over a cluster of machines to allow data streams larger than the capacity of any single machine and to allow clusters of coordinated consumers.

What is ZooKeeper?

ZooKeeper is a centralized service for maintaining configuration information, naming, providing distributed synchronization, and providing group services. All of these kinds of services are used in some form or another by distributed applications. Each time they are implemented, there is a lot of work that goes into fixing the bugs and race conditions that are inevitable. Because of the difficulty of implementing these kinds of services, applications initially usually skimp on them, which makes them brittle in the presence of change and difficult to manage. Even when done correctly, different implementations of these services lead to management complexity when the applications are deployed.

Learn more about ZooKeeper on the ZooKeeper Wiki.

Prerequisites

  1. Install Java if you do not have it already. You can get it from here
  2. Kafka Binary files : http://kafka.apache.org/downloads.html

Installation

  • First download the Kafka binary tarball on all your instances and extract it:
$ tar -xzvf kafka_2.11-0.9.0.1.tgz
$ mv kafka_2.11-0.9.0.1 kafka
  • On all the instances, only two configuration files need to be changed, i.e. zookeeper.properties & server.properties

Let’s start by editing “zookeeper.properties” on all the instances:

$ vi ~/kafka/config/zookeeper.properties
# The number of milliseconds of each tick
tickTime=2000

# The number of ticks that the initial synchronization phase can take
initLimit=10

# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5

# Directory where ZooKeeper stores its data (the myid file created below lives here)
dataDir=/tmp/zookeeper

# Port on which clients (the Kafka brokers) connect
clientPort=2181

# zoo servers
server.1=x.x.x.x:2888:3888
server.2=x.x.x.x:2888:3888
server.3=x.x.x.x:2888:3888
# add more servers here if you want

Now edit “server.properties” on all the instances and update the following:

$ vi ~/kafka/config/server.properties
# Increase by one per node, i.e. 1, 2, 3, ...
broker.id=1
# IP of the current node
host.name=x.x.x.x
zookeeper.connect=x.x.x.x:2181,x.x.x.x:2181,x.x.x.x:2181
  • After this, go to /tmp on every instance and create the following:
$ cd /tmp/
$ mkdir zookeeper #Zookeeper temp dir
$ cd zookeeper
$ touch myid  #Zookeeper temp file
$ echo '1' >> myid #Add Server ID for Respective Instances i.e. "server.1 and server.2 etc"
  • Now that everything is configured, start ZooKeeper and the Kafka server on all instances:

$ bin/zookeeper-server-start.sh ~/kafka/config/zookeeper.properties

$ bin/kafka-server-start.sh ~/kafka/config/server.properties
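As an optional smoke test of the cluster, the following Scala sketch sends a single message using the kafka-clients library (assumed as a dependency); the topic name test-topic is a placeholder, and the brokers are assumed to listen on the default port 9092:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object ClusterSmokeTest extends App {
  val props = new Properties()
  // Replace x.x.x.x with the broker IPs configured in server.properties.
  props.put("bootstrap.servers", "x.x.x.x:9092,x.x.x.x:9092,x.x.x.x:9092")
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

  val producer = new KafkaProducer[String, String](props)
  // "test-topic" is a placeholder: create it first or enable auto topic creation.
  producer.send(new ProducerRecord[String, String]("test-topic", "key-1", "hello from the cluster")).get()
  producer.close()
}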

We will keep looking at how to provide more useful tutorials and will keep adding more content together. If you have any suggestions, feel free to share them 🙂 Stay tuned.

Tutorial : DataFrame API Functionalities using Spark 1.6

In the previous tutorial, we explained SparkSQL and DataFrame operations using Spark 1.6. In this tutorial we cover the DataFrame API functionality, and we provide a running example of each feature for better support. Let’s begin the tutorial and discuss the DataFrame API operations using Spark 1.6.

DataFrame API Examples Using Different Types of Functionality

The different types of DataFrame operations are:

1. Actions
2. Basic functions
3. Operations

Here we use a JSON document named cars.json and generate a table based on the schema in the JSON document.
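A minimal Spark 1.6 sketch of the three kinds of operations against cars.json; the columns used below (such as make and year) are assumptions about the file’s schema and should be adjusted to the actual content:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("cars"))
val sqlContext = new SQLContext(sc)

val cars = sqlContext.read.json("cars.json")   // expects one JSON object per line

cars.printSchema()                  // basic function: inspect the inferred schema
cars.select("make", "year").show()  // operation: column projection
println(cars.count())               // action: materialize and count the rows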


Tutorial : Spark SQL and DataFrames Operations using Spark 1.6

In the previous tutorial, we explained Spark Core and RDD functionality. In this tutorial we cover Spark SQL and DataFrame operations on data from different sources such as JSON, text and CSV files, and we provide a running example of each feature for better support. Let’s begin the tutorial and discuss the SparkSQL and DataFrame operations using Spark 1.6.

SparkSQL

Spark SQL is a component on top of Spark Core that introduces a new data abstraction called SchemaRDD, which provides support for structured and semi-structured data. Spark SQL executes SQL queries written using either a basic SQL syntax or HiveQL, and it can also be used to read data from an existing Hive installation. It provides a programming abstraction called DataFrame and can act as a distributed SQL query engine.
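For example, here is a short sketch of running a SQL query over a DataFrame in Spark 1.6, reusing the sqlContext and the cars.json assumptions from the sketch above:

// Reuses the sqlContext and cars.json assumptions from the earlier sketch.
val df = sqlContext.read.json("cars.json")
df.registerTempTable("cars")   // Spark 1.6 API for exposing a DataFrame to SQL

val oldCars = sqlContext.sql("SELECT make, year FROM cars WHERE year < 2000")
oldCars.show()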

Tutorial : Quick overview of Spark 1.6 Core Functionality

In this blog we discuss Spark 1.6 core functionality and provide a quick introduction to using Spark. It demonstrates the basic functionality of RDDs; later on we demonstrate the Spark SQL and DataFrame API functionality. We have tried to cover the basics of Spark 1.6 core functionality and programming contexts.

Introduction to Apache Spark

Spark is a powerful open-source processing engine built around speed, ease of use, and sophisticated analytics. It is a cluster computing framework originally developed in the AMPLab at the University of California, Berkeley, and later donated to the Apache Software Foundation, where it remains today. Apache Spark is a lightning-fast cluster computing technology, designed for fast computation. It is a framework for performing general data analytics on a distributed computing cluster like Hadoop. The main feature of Spark is its in-memory cluster computing, which increases the processing speed of an application and provides faster data processing than MapReduce. It runs on top of an existing Hadoop cluster and can access the Hadoop data store (HDFS); it can also process structured data in Hive and streaming data from HDFS, Flume, Kafka, and Twitter.
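As a small taste of the RDD functionality covered in this tutorial, here is a classic word-count sketch for Spark 1.6; the input path input.txt is a placeholder:

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("rdd basics"))

val lines  = sc.textFile("input.txt")          // input.txt is a placeholder path
val counts = lines
  .flatMap(_.split("\\s+"))                    // transformation: split lines into words
  .map(word => (word, 1))                      // transformation: pair each word with 1
  .reduceByKey(_ + _)                          // transformation: sum the counts per word

counts.take(10).foreach(println)               // action: trigger the computation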