Dribbling with Spark 1.6 GraphX Components

GraphX provides distributed in-memory graph computing. The GraphX API lets users view the same data both as graphs and as collections (i.e., RDDs) without data movement or duplication.

In this example, we process a small social network, with users as vertices and the relations between users as edges, and find out the following:

  • Determine which users are the most important in the graph
  • Find all three-user subgraphs in which every two users are connected
  • Find the pairs of users that have a path in each direction between them

GraphX keys each vertex by a unique 64-bit long identifier (VertexId), which is why the ID column is read as a long. The CSV file contains the following columns:

ID : Unique ID of the row (vertex). It must be a unique 64-bit long.
RelationId : ID of the user that this row's user is connected to (refers to the ID column).
User : The vertex attribute (the user name).
Relationship : The edge attribute (the relation between the two vertices).

Here we demonstrate graph processing with Spark GraphX in the following steps:

  • Create a dataframe from a CSV file using Spark SQLContext
  • Create a VertexRDD from the dataframe
  • Create an EdgeRDD from the dataframe
  • Build a Graph object from the VertexRDD and EdgeRDD
  • Apply basic graph algorithms:
  1. Page Rank
  2. Triangle Count
  3. Connected Component
  4. Strongly Connected Component

 

  • Dataframe from CSV File Using Spark SQLContext:

First, create a dataframe from the file. The format function sets the file format, such as csv. In Spark 1.6 the csv source comes from the external spark-csv package, which needs to be on the classpath.

val SCHEMA_OPTIONS = Map("header"->"true", "inferSchema"->"true")
val path = "<file-name.csv>"
val df = sqlContext.read.format("csv").options(SCHEMA_OPTIONS).load(path)

You can inspect the dataframe with the show function:

df.show

The output is:

+----+----------+-------+------------+
|  ID|RelationId|   User|Relationship|
+----+----------+-------+------------+
|2000|      2001|    Zim|        Like|
|2001|      2002|   John|      friend|
|2002|      2000|   Alex|      follow|
|2003|      2004|Charlie|        Like|
|2004|      2005|    Xin|    follower|
|2005|      2003|Supriya|        Like|
|2006|      2007|  Jelly|      follow|
|2007|      2008|  Allen|      friend|
|2008|      2009|   Pepi|      friend|
|2009|      2010|  Sandy|      friend|
|2010|      2011|    San|      friend|
|2011|      2012|    Bob|        Like|
|2012|      2013|  Alice|        Like|
|2013|      2014|  Eliza|        Like|
|2014|      2015|  Annie|        Like|
+----+----------+-------+------------+

Now we create a graph component, a case class that holds the column names required to construct the graph object.

case class GraphComponent(primaryIndex: String, vertexAttr: String,
edgeAttr: String, relation: String)

Next we create the graph object. The basic property graph constructor takes an RDD of vertices (with type RDD[(VertexId, V)]) and an RDD of edges (with type RDD[Edge[E]]) and builds a graph (with type Graph[V, E]).

  • VertexRDD from Dataframe:

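The following method builds the vertex RDD from the dataframe:

import org.apache.spark.graphx.{Edge, Graph, PartitionStrategy}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame

/*
* Create VertexRDD: one (vertex ID, vertex attribute) pair per row
*/
def getVertices(df: DataFrame, graph: GraphComponent): RDD[(Long, String)] = df.map {
  case row =>
    val primaryValue = row.getAs[Any](graph.primaryIndex).toString.toLong
    val vertexAttr = row.getAs[Any](graph.vertexAttr).toString
    (primaryValue, vertexAttr)
}
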
  • EdgeRDD from Dataframe:

The following method builds the edge RDD from the dataframe:

/*
* Create EdgeRDD
*/
def getEdges(df: DataFrame, graph: GraphComponent): RDD[Edge[String]] = {
  df.map {
    case row =>
      val primaryValue = row.getAs[Any](graph.primaryIndex).toString.toLong
      val relIndex = row.getAs[Any](graph.relation).toString.toLong
      val edgeAttr = row.getAs[Any](graph.edgeAttr).toString
      Edge(primaryValue, relIndex, edgeAttr)
  }
}

Create a GraphComponent object that maps the column names to their roles:

val graphComponent = GraphComponent("ID", "User", "Relationship", "RelationId")

  • Graph object Using VertexRDD and EdgeRDD:

val vertices = getVertices(df, graphComponent)
val edges = getEdges(df, graphComponent)
val graph = Graph(vertices, edges)

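You can inspect the graph object with the command below:

graph.triplets.collect.foreach(println)

The output of the graph is as follows. Vertex 2015 appears only as an edge destination and has no row of its own, so GraphX gives it the default vertex attribute, null: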

((2000,Zim),(2001,John),Like)
((2001,John),(2002,Alex),friend)
((2002,Alex),(2000,Zim),follow)
((2003,Charlie),(2004,Xin),Like)
((2004,Xin),(2005,Supriya),follower)
((2005,Supriya),(2003,Charlie),Like)
((2006,Jelly),(2007,Allen),follow)
((2007,Allen),(2008,Pepi),friend)
((2008,Pepi),(2009,Sandy),friend)
((2009,Sandy),(2010,San),friend)
((2010,San),(2011,Bob),friend)
((2011,Bob),(2012,Alice),Like)
((2012,Alice),(2013,Eliza),Like)
((2013,Eliza),(2014,Annie),Like)
((2014,Annie),(2015,null),Like)

  • Apply Basic Graph Algorithms:

Once the graph is created, you can apply graph algorithms such as PageRank, triangle count, connected components and strongly connected components.

1. Page Rank Algorithm:

PageRank measures the importance of each vertex in a graph. We run PageRank to find out which vertices (users) are the most important in the graph.

Now we apply the PageRank algorithm:

//Page rank algorithm; 0.001 is the convergence tolerance
val pageRankGraph = graph.pageRank(0.001)

You can see the output of pageRankGraph:

pageRankGraph.triplets.collect.foreach(println)

The output is as follows:

((2000,0.9944867761927632),(2001,0.9944867761927632),1.0)
((2001,0.9944867761927632),(2002,0.9944867761927632),1.0)
((2002,0.9944867761927632),(2000,0.9944867761927632),1.0)
((2003,0.9944867761927632),(2004,0.9944867761927632),1.0)
((2004,0.9944867761927632),(2005,0.9944867761927632),1.0)
((2005,0.9944867761927632),(2003,0.9944867761927632),1.0)
((2006,0.15),(2007,0.27749999999999997),1.0)
((2007,0.27749999999999997),(2008,0.38587499999999997),1.0)
((2008,0.38587499999999997),(2009,0.47799375),1.0)
((2009,0.47799375),(2010,0.5562946875),1.0)
((2010,0.5562946875),(2011,0.622850484375),1.0)
((2011,0.622850484375),(2012,0.67942291171875),1.0)
((2012,0.67942291171875),(2013,0.7275094749609375),1.0)
((2013,0.7275094749609375),(2014,0.7683830537167969),1.0)
((2014,0.7683830537167969),(2015,0.8031255956592774),1.0)
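
The ranks along the chain 2006 → 2007 → … → 2015 can be checked by hand. The sketch below is plain Scala (no Spark needed) and assumes the unnormalized update that GraphX uses with its default reset probability of 0.15: rank(v) = 0.15 + 0.85 × Σ rank(u) / outDegree(u) over the in-neighbours u of v. The object and method names are illustrative and not part of the GraphX API.

```scala
// Minimal sketch: reproduce the chain ranks from the PageRank output by hand.
object ChainPageRank {
  val resetProb = 0.15 // GraphX's default reset probability

  // Rank of a vertex whose only in-neighbour has rank `prev` and out-degree 1.
  def next(prev: Double): Double = resetProb + (1.0 - resetProb) * prev

  // 2006 has no incoming edge, so its rank is just resetProb; every later
  // vertex in the chain is derived from its predecessor.
  val chainRanks: Vector[(Long, Double)] =
    (2006L to 2015L).toVector.zip(Iterator.iterate(resetProb)(next).take(10).toVector)

  def main(args: Array[String]): Unit =
    chainRanks.foreach { case (id, rank) => println(f"$id%d -> $rank%.6f") }
}
```

This gives 0.15 for 2006, 0.2775 for 2007, and so on up to about 0.803126 for 2015, in line with the output above. On a directed cycle the same update has the fixed point 1.0, which the 0.9945 values of users 2000 to 2005 approximate within the chosen tolerance.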
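
Next, join the PageRank output back onto the original graph so that each vertex carries both its rank and its user name:

//Page rank output join with graph object
val graphWithPageRank = graph.outerJoinVertices(pageRankGraph.vertices) {
  case (id, attr, Some(pr)) => (pr, attr)
  case (id, attr, None) => (0.0, attr)
}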

You can see the graph with its PageRank values using the command below:

graphWithPageRank.triplets.collect.foreach(println)

The output is as follows:

((2000,(0.9944867761927632,Zim)),(2001,(0.9944867761927632,John)),Like)
((2001,(0.9944867761927632,John)),(2002,(0.9944867761927632,Alex)),friend)
((2002,(0.9944867761927632,Alex)),(2000,(0.9944867761927632,Zim)),follow)
((2003,(0.9944867761927632,Charlie)),(2004,(0.9944867761927632,Xin)),Like)
((2004,(0.9944867761927632,Xin)),(2005,(0.9944867761927632,Supriya)),follower)
((2005,(0.9944867761927632,Supriya)),(2003,(0.9944867761927632,Charlie)),Like)
((2006,(0.15,Jelly)),(2007,(0.27749999999999997,Allen)),follow)
((2007,(0.27749999999999997,Allen)),(2008,(0.38587499999999997,Pepi)),friend)
((2008,(0.38587499999999997,Pepi)),(2009,(0.47799375,Sandy)),friend)
((2009,(0.47799375,Sandy)),(2010,(0.5562946875,San)),friend)
((2010,(0.5562946875,San)),(2011,(0.622850484375,Bob)),friend)
((2011,(0.622850484375,Bob)),(2012,(0.67942291171875,Alice)),Like)
((2012,(0.67942291171875,Alice)),(2013,(0.7275094749609375,Eliza)),Like)
((2013,(0.7275094749609375,Eliza)),(2014,(0.7683830537167969,Annie)),Like)
((2014,(0.7683830537167969,Annie)),(2015,(0.8031255956592774,null)),Like)

You can find the 10 most important users by PageRank:

println(graphWithPageRank.vertices.top(10)(Ordering.by(_._2._1)).mkString("\n"))

The output is:

(2005,(0.9944867761927632,Supriya))
(2004,(0.9944867761927632,Xin))
(2001,(0.9944867761927632,John))
(2002,(0.9944867761927632,Alex))
(2000,(0.9944867761927632,Zim))
(2003,(0.9944867761927632,Charlie))
(2015,(0.8031255956592774,null))
(2014,(0.7683830537167969,Annie))
(2013,(0.7275094749609375,Eliza))
(2012,(0.67942291171875,Alice))

2. Triangle Count:

Triangle count is very useful in social network analysis. A triangle is a small three-node graph in which every two nodes are connected.

Now we apply the triangle count algorithm:

//Triangle Count Algorithm
val triangleComponent = graph.partitionBy(PartitionStrategy.RandomVertexCut).triangleCount().vertices

The output of the triangle count is as follows:

(2000,1)
(2010,0)
(2002,1)
(2014,0)
(2008,0)
(2006,0)
(2012,0)
(2004,1)
(2001,1)
(2003,1)
(2005,1)
(2015,0)
(2013,0)
(2009,0)
(2011,0)
(2007,0)

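Each value is the number of triangles that pass through that vertex. Here it is 1 for the six vertices (users) that belong to a triangle and 0 for all the others. Those six users form two three-node graphs in which every two users are connected: (2000, 2001, 2002) and (2003, 2004, 2005).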
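
To make the counting rule concrete, here is a minimal plain-Scala sketch (no Spark needed) that counts, for each vertex, the pairs of its neighbours that are themselves connected. The edge list is rebuilt from the ID and RelationId columns shown earlier; the object and value names are illustrative only, and this is not how GraphX distributes the computation.

```scala
// Minimal sketch: per-vertex triangle counts via neighbour-set intersection.
object TriangleSketch {
  // Directed edges (source ID, destination ID) from the sample data.
  val edges: Seq[(Long, Long)] =
    Seq(2000L -> 2001L, 2001L -> 2002L, 2002L -> 2000L,
        2003L -> 2004L, 2004L -> 2005L, 2005L -> 2003L) ++
      (2006L to 2014L).map(i => i -> (i + 1))

  // Triangle counting ignores direction, so build undirected neighbour sets.
  val neighbours: Map[Long, Set[Long]] =
    edges.flatMap { case (s, d) => Seq(s -> d, d -> s) }
      .groupBy(_._1)
      .map { case (v, pairs) => v -> pairs.map(_._2).toSet }

  // For each vertex, count the neighbour pairs (a, b) that share an edge.
  val triangles: Map[Long, Int] = neighbours.map { case (v, ns) =>
    val closedPairs = for {
      a <- ns.toSeq
      b <- ns.toSeq
      if a < b && neighbours(a).contains(b)
    } yield (a, b)
    v -> closedPairs.size
  }
}
```

On this data the sketch yields 1 for users 2000 to 2005 and 0 for everyone on the chain, matching the counts listed above.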

3. Connected Component:

The connected components algorithm labels each vertex with the lowest vertex ID in the connected component that contains it, and returns a graph with that label as the vertex value.

Now we apply the connected components algorithm:

//Connected Component Algorithm
val connectedComponent = graph.connectedComponents().vertices

You can see the result of the connected components:

connectedComponent.collect.foreach(println)

The output is as follows:

(2000,2000)
(2010,2006)
(2002,2000)
(2014,2006)
(2008,2006)
(2006,2006)
(2012,2006)
(2004,2003)
(2001,2000)
(2003,2003)
(2005,2003)
(2015,2006)
(2013,2006)
(2009,2006)
(2011,2006)
(2007,2006)
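
The "lowest vertex ID" labelling can be illustrated with a small plain-Scala sketch (no Spark needed): every vertex starts with its own ID as its label, and the lower label is pushed across each edge, ignoring direction, until nothing changes. The names below are illustrative, and this single-machine loop is only an approximation of what GraphX does in a distributed way.

```scala
// Minimal sketch: connected components by propagating the minimum vertex ID.
object ConnectedSketch {
  // Directed edges (source ID, destination ID) from the sample data.
  val edges: Seq[(Long, Long)] =
    Seq(2000L -> 2001L, 2001L -> 2002L, 2002L -> 2000L,
        2003L -> 2004L, 2004L -> 2005L, 2005L -> 2003L) ++
      (2006L to 2014L).map(i => i -> (i + 1))

  val vertices: Set[Long] = edges.flatMap { case (s, d) => Seq(s, d) }.toSet

  // One pass gives both endpoints of every edge the lower of their two labels;
  // repeat until a pass changes nothing.
  @annotation.tailrec
  def propagate(labels: Map[Long, Long]): Map[Long, Long] = {
    val updated = edges.foldLeft(labels) { case (acc, (s, d)) =>
      val low = math.min(acc(s), acc(d))
      acc.updated(s, low).updated(d, low)
    }
    if (updated == labels) labels else propagate(updated)
  }

  // Start with every vertex labelled by its own ID.
  val components: Map[Long, Long] = propagate(vertices.map(v => v -> v).toMap)
}
```

The result has three distinct labels (2000, 2003 and 2006), the same three components as in the GraphX output above.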

4. Strongly Connected Component:

A pair of vertices u and v are said to be strongly connected to each other if there is a path in each direction between them.

Now we apply the strongly connected components algorithm:

//Strongly Connected Component Algorithm; 3 is the maximum number of iterations
val stronglyConnected = graph.stronglyConnectedComponents(3).vertices

You can see the result of the strongly connected components:

stronglyConnected.collect.foreach(println)

The output is as follows:

(2000,2000)
(2010,2010)
(2002,2002)
(2014,2014)
(2008,2008)
(2006,2006)
(2012,2012)
(2004,2003)
(2001,2001)
(2003,2003)
(2005,2003)
(2015,2015)
(2013,2013)
(2009,2009)
(2011,2011)
(2007,2007)

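In the output above, vertices 2004 and 2005 carry the component ID 2003, so users 2003, 2004 and 2005 are strongly connected: there is a path in each direction between every pair of them. Users 2000, 2001 and 2002 also form a directed cycle, and the merged output at the end of this post labels all three with component 2000. Because the argument 3 caps the number of iterations, a run may stop before every cycle has been labelled, which may explain why the listing above still shows them separately.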
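
The definition can be checked directly on this small graph. The following plain-Scala sketch (no Spark needed) computes, for every vertex, the set of vertices reachable along edge direction, and then groups vertices that can reach each other, labelling each group with its lowest ID. The names are illustrative; this brute-force approach only suits tiny graphs and is not the algorithm GraphX runs.

```scala
// Minimal sketch: strongly connected components via mutual reachability.
object StronglyConnectedSketch {
  // Directed edges (source ID, destination ID) from the sample data.
  val edges: Seq[(Long, Long)] =
    Seq(2000L -> 2001L, 2001L -> 2002L, 2002L -> 2000L,
        2003L -> 2004L, 2004L -> 2005L, 2005L -> 2003L) ++
      (2006L to 2014L).map(i => i -> (i + 1))

  val vertices: Set[Long] = edges.flatMap { case (s, d) => Seq(s, d) }.toSet

  // Outgoing neighbours; vertices without outgoing edges map to the empty set.
  val successors: Map[Long, Set[Long]] =
    edges.groupBy(_._1)
      .map { case (v, es) => v -> es.map(_._2).toSet }
      .withDefaultValue(Set.empty[Long])

  // All vertices reachable from `start` along edge direction (start included).
  def reachable(start: Long): Set[Long] = {
    @annotation.tailrec
    def loop(frontier: Set[Long], seen: Set[Long]): Set[Long] =
      if (frontier.isEmpty) seen
      else {
        val nextFrontier = frontier.flatMap(v => successors(v)) -- seen
        loop(nextFrontier, seen ++ nextFrontier)
      }
    loop(Set(start), Set(start))
  }

  val reach: Map[Long, Set[Long]] = vertices.map(v => v -> reachable(v)).toMap

  // u and v are strongly connected when each reaches the other;
  // label every vertex with the lowest ID in its group.
  val components: Map[Long, Long] = vertices.map { v =>
    v -> reach(v).filter(u => reach(u).contains(v)).min
  }.toMap
}
```

On this data the sketch puts 2000, 2001 and 2002 in component 2000, puts 2003, 2004 and 2005 in component 2003, and leaves every vertex on the chain in a component of its own.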

Now you can merge the output of all the algorithms into a single graph:

We join each result (PageRank, triangle count, connected components and strongly connected components) onto the graph object in turn.

//Triangle Component join with page rank graph output
val triByGraph = graphWithPageRank.outerJoinVertices(triangleComponent) {
  case (id, (rank, attr), Some(tri)) => (rank, tri, attr)
  case (id, (rank, attr), None) => (rank, 0, attr)
}
//Connected Component join with triangle component graph output
val ccByGraph = triByGraph.outerJoinVertices(connectedComponent) {
  case (id, (rank, tri, attr), Some(cc)) => (rank, tri, cc, attr)
  case (id, (rank, tri, attr), None) => (rank, tri, -1L, attr)
}
//Strongly Connected Component join with connected component output
val stByGraph = ccByGraph.outerJoinVertices(stronglyConnected) {
  case (id, (rank, tri, cc, attr), Some(st)) => (rank, tri, cc, st, attr)
  case (id, (rank, tri, cc, attr), None) => (rank, tri, cc, id, attr)
}
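
The Some/None cases in the joins above follow the usual outer-join pattern: every vertex of the left graph is kept, and the matching value from the right side arrives as an Option. A minimal plain-Scala stand-in using Maps (no Spark needed) shows the idea. The outerJoin helper and the vertex 2099 are hypothetical and exist only for this illustration.

```scala
// Minimal sketch: the outer-join pattern behind outerJoinVertices, on plain Maps.
object OuterJoinSketch {
  // Keep every left entry and pass the matching right value, if any, to `f`.
  def outerJoin[A, B, C](left: Map[Long, A], right: Map[Long, B])(
      f: (Long, A, Option[B]) => C): Map[Long, C] =
    left.map { case (id, attr) => id -> f(id, attr, right.get(id)) }

  // 2099 is a made-up vertex with no rank, to exercise the None branch.
  val users: Map[Long, String] = Map(2000L -> "Zim", 2099L -> "Nobody")
  val ranks: Map[Long, Double] = Map(2000L -> 0.9944867761927632)

  val joined: Map[Long, (Double, String)] = outerJoin(users, ranks) {
    case (_, name, Some(rank)) => (rank, name) // a rank was found for this vertex
    case (_, name, None)       => (0.0, name)  // no rank: fall back to a default
  }
}
```

Here joined(2000L) is (0.9944867761927632, "Zim"), while the hypothetical vertex 2099 falls back to (0.0, "Nobody").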

You can see the merged graph with the command below:

stByGraph.triplets.collect.foreach(println)

The final output of the graph is as follows:

((2000,(0.9944867761927632,1,2000,2000,Zim)),(2001,(0.9944867761927632,1,2000,2000,John)),Like)
((2001,(0.9944867761927632,1,2000,2000,John)),(2002,(0.9944867761927632,1,2000,2000,Alex)),friend)
((2002,(0.9944867761927632,1,2000,2000,Alex)),(2000,(0.9944867761927632,1,2000,2000,Zim)),follow)
((2003,(0.9944867761927632,1,2003,2003,Charlie)),(2004,(0.9944867761927632,1,2003,2003,Xin)),Like)
((2004,(0.9944867761927632,1,2003,2003,Xin)),(2005,(0.9944867761927632,1,2003,2003,Supriya)),follower)
((2005,(0.9944867761927632,1,2003,2003,Supriya)),(2003,(0.9944867761927632,1,2003,2003,Charlie)),Like)
((2006,(0.15,0,2006,2006,Jelly)),(2007,(0.27749999999999997,0,2006,2007,Allen)),follow)
((2007,(0.27749999999999997,0,2006,2007,Allen)),(2008,(0.38587499999999997,0,2006,2008,Pepi)),friend)
((2008,(0.38587499999999997,0,2006,2008,Pepi)),(2009,(0.47799375,0,2006,2009,Sandy)),friend)
((2009,(0.47799375,0,2006,2009,Sandy)),(2010,(0.5562946875,0,2006,2010,San)),friend)
((2010,(0.5562946875,0,2006,2010,San)),(2011,(0.622850484375,0,2006,2011,Bob)),friend)
((2011,(0.622850484375,0,2006,2011,Bob)),(2012,(0.67942291171875,0,2006,2012,Alice)),Like)
((2012,(0.67942291171875,0,2006,2012,Alice)),(2013,(0.7275094749609375,0,2006,2013,Eliza)),Like)
((2013,(0.7275094749609375,0,2006,2013,Eliza)),(2014,(0.7683830537167969,0,2006,2014,Annie)),Like)
((2014,(0.7683830537167969,0,2006,2014,Annie)),(2015,(0.8031255956592774,0,2006,2015,null)),Like)

The graph can also be visualized using D3:

[Figure: D3 visualization of the graph]

This is just the start of our work with GraphX. From next week onwards we will keep building on this topic: we will look at how to create more graphs and then add more GraphX modules. If you have any suggestions, feel free to share them with us 🙂 Stay tuned.
