This blog post has two parts:
- One, a Pregel implementation in Python. This interesting Python code, written by Michael Nielsen, helps us understand the Pregel concept better.
- Two, usage of the GraphX Pregel API, with which you can write just a few lines of code to process graphs in parallel. The official documentation of the GraphX Pregel API is confusing and does not provide complete code for a Pregel implementation of PageRank. A complete working version is in the last section of this post.
Note: One of my previous blog posts, on Parallelism in Apache Spark, talks more about Apache Spark and GraphX with some toy examples. If you are new to Apache Spark and its ecosystem, I suggest skimming through that post first; otherwise, continue reading here.
To appreciate how Pregel works and how efficient it is, we would have to process large datasets on a cluster with many machines. That isn't possible in our scenario, but we can still understand Pregel's concepts by running toy example code, as the next section shows. Lastly, we will see how to deploy on large clusters (simulated here in pseudo-distributed mode).
I used Ubuntu 14.04.5 on a machine with 8 GB of RAM and an Intel i7 processor that supports multi-threading. I installed Hadoop and Apache Spark in pseudo-distributed mode.
Pregel implementation in Python:
The code:
I forked this code directly from GitHub (by Michael Nielsen). Learning about Pregel and its implementation is much easier once you understand this code.
Refer to the original code if you want to run it on your machine. The code below is partial and is for explanation only.
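```python
import collections
import threading


class Vertex():
    def __init__(self, id, value, out_vertices):
        self.id = id
        self.value = value
        self.out_vertices = out_vertices  # vertices this vertex can send messages to
        self.incoming_messages = []       # (sender, message) pairs received this superstep
        self.outgoing_messages = []       # (receiver, message) pairs to deliver next superstep
        self.active = True                # False once the vertex votes to halt
        self.superstep = 0


class Pregel():
    def __init__(self, vertices, num_workers):
        self.vertices = vertices
        self.num_workers = num_workers

    def worker(self, vertex):
        # Assign each vertex to a worker by hashing it.
        return hash(vertex) % self.num_workers

    def superstep(self):
        # self.partition (worker id -> list of vertices) is built elsewhere
        # in the full code, which is not shown here.
        workers = []
        for vertex_list in self.partition.values():
            worker = Worker(vertex_list)
            workers.append(worker)
            worker.start()
        # Wait for every worker: joining acts as the superstep barrier.
        for worker in workers:
            worker.join()


class Worker(threading.Thread):
    def __init__(self, vertices):
        threading.Thread.__init__(self)
        self.vertices = vertices

    def run(self):
        self.superstep()

    def superstep(self):
        # Update every active vertex assigned to this worker;
        # update() is defined by a Vertex subclass such as PageRankVertex.
        for vertex in self.vertices:
            if vertex.active:
                vertex.update()
```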
- The code defines three classes: Vertex for the vertices, Worker for the threads, and Pregel to coordinate the computation.
- Each Vertex can communicate with other vertices by sending and receiving messages.
- The superstep() method of the Pregel class creates one thread (a Worker) per partition of vertices, and these threads run concurrently; the join() calls act as the barrier that ends the superstep. (In CPython, the global interpreter lock prevents threads from executing Python bytecode truly in parallel, so this toy demonstrates the model rather than a speedup.) Of course, race conditions and other conflicts need to be handled separately.
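The partial snippet above leaves out the driver loop that builds the partition and delivers messages between supersteps. The following is a minimal, self-contained sketch of such a loop, not Nielsen's code. It makes a few assumptions: it partitions by `id` rather than `hash(vertex)`, uses plain `threading.Thread` targets instead of a `Worker` class, lets an incoming message reactivate a halted vertex (as described in the Pregel paper), and uses a hypothetical `MaxValueVertex` that spreads the largest value through the graph. `run_bsp()` and `worker_superstep()` are likewise hypothetical names.

```python
import collections
import threading


class Vertex:
    def __init__(self, id, value, out_vertices):
        self.id = id
        self.value = value
        self.out_vertices = out_vertices
        self.incoming_messages = []
        self.outgoing_messages = []
        self.active = True
        self.superstep = 0


class MaxValueVertex(Vertex):
    """Hypothetical example vertex: spreads the largest value in the graph."""

    def update(self):
        best = max([self.value] + [msg for (_, msg) in self.incoming_messages])
        if self.superstep == 0 or best > self.value:
            self.value = best
            # Tell every out-neighbour about the (possibly new) maximum.
            self.outgoing_messages = [(v, self.value) for v in self.out_vertices]
        else:
            # Nothing changed: send nothing and vote to halt.
            self.active = False


def worker_superstep(vertices):
    for vertex in vertices:
        if vertex.active:
            vertex.update()


def run_bsp(vertices, num_workers=2):
    """Hypothetical driver: compute phase, barrier, then message delivery."""
    partition = collections.defaultdict(list)
    for vertex in vertices:
        partition[vertex.id % num_workers].append(vertex)  # by id, not hash(vertex)
    supersteps = 0
    while any(v.active for v in vertices):
        # 1. Compute phase: one thread per partition.
        threads = [threading.Thread(target=worker_superstep, args=(part,))
                   for part in partition.values()]
        for t in threads:
            t.start()
        # 2. Barrier: wait until every worker has finished this superstep.
        for t in threads:
            t.join()
        # 3. Communication phase: deliver messages for the next superstep.
        for v in vertices:
            v.superstep += 1
            v.incoming_messages = []
        for v in vertices:
            for (target, msg) in v.outgoing_messages:
                target.incoming_messages.append((v, msg))
                target.active = True  # a message reactivates a halted vertex
            v.outgoing_messages = []
        supersteps += 1
    return supersteps


if __name__ == "__main__":
    # A directed ring 0 -> 1 -> 2 -> 3 -> 0 with initial values 3, 1, 4, 2.
    ring = [MaxValueVertex(i, val, []) for i, val in enumerate([3, 1, 4, 2])]
    for i, v in enumerate(ring):
        v.out_vertices = [ring[(i + 1) % len(ring)]]
    run_bsp(ring)
    print([v.value for v in ring])  # [4, 4, 4, 4]
```

Because each vertex only writes its own state during the compute phase, and messages are delivered single-threaded after the barrier, the threads never touch shared data in this sketch.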
The implementation of PageRank using Pregel is available at this link. Let's focus on the pagerank_pregel() function and see how it computes PageRank. As before, refer to the original code if you want to run it on your machine; the code below is partial and is for explanation only.
```python
from numpy import mat, eye, zeros, ones
from pregel import Vertex, Pregel

num_workers = 4    # example value; set in the full script
num_vertices = 10  # example value; set in the full script


def pagerank_test(vertices):
    # Closed-form PageRank: 0.15 * (I - 0.85 G)^-1 * P, with P = (1/N) * ones.
    I = mat(eye(num_vertices))
    G = zeros((num_vertices, num_vertices))
    for vertex in vertices:
        num_out_vertices = len(vertex.out_vertices)
        for out_vertex in vertex.out_vertices:
            G[out_vertex.id, vertex.id] = 1.0 / num_out_vertices
    P = (1.0 / num_vertices) * mat(ones((num_vertices, 1)))
    return 0.15 * ((I - 0.85 * G).I) * P


def pagerank_pregel(vertices):
    p = Pregel(vertices, num_workers)
    p.run()
    return mat([vertex.value for vertex in p.vertices]).transpose()


class PageRankVertex(Vertex):
    def update(self):
        # This routine has a bug when there are pages with no outgoing
        # links (never the case for our tests). This problem can be
        # solved by introducing Aggregators into the Pregel framework,
        # but as an initial demonstration this works fine.
        if self.superstep < 50:
            self.value = 0.15 / num_vertices + 0.85 * sum(
                [pagerank for (vertex, pagerank) in self.incoming_messages])
            outgoing_pagerank = self.value / len(self.out_vertices)
            self.outgoing_messages = [(vertex, outgoing_pagerank)
                                      for vertex in self.out_vertices]
        else:
            self.active = False
```
- The Vertex and Pregel classes are imported from the Pregel code (pregel.py).
- The pagerank_test() function computes PageRank using a standard matrix approach. It exists mainly for comparison with Pregel.
- The pagerank_pregel() function computes PageRank in the bulk synchronous parallel (BSP) model. p.run() starts the worker threads, which update the vertices concurrently, and the vertices exchange messages between supersteps.
- The PageRankVertex class defines update(): for the first 50 supersteps, each vertex recomputes its value from its incoming messages and sends its rank, split evenly among its out-vertices; after that, the vertex deactivates (votes to halt).
- The full script also generates a random graph (random links between vertices) and compares the PageRank computed by the test function with the PageRank computed using Pregel. You can see the output in the next section.
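Why the two computations should agree follows from the update rule in PageRankVertex.update(). Writing $N$ for num_vertices, $d_u$ for the number of out-vertices of $u$, and $G$ for the matrix built in pagerank_test() ($G_{vu} = 1/d_u$ when $u$ links to $v$), a short derivation:

```latex
\begin{aligned}
p_v^{(t+1)} &= \frac{0.15}{N} + 0.85 \sum_{u \to v} \frac{p_u^{(t)}}{d_u}
  \quad\Longleftrightarrow\quad
  p^{(t+1)} = \frac{0.15}{N}\,\mathbf{1} + 0.85\, G\, p^{(t)} \\
p^{*} &= \frac{0.15}{N}\,\mathbf{1} + 0.85\, G\, p^{*}
  \;\Longrightarrow\;
  p^{*} = 0.15\,(I - 0.85\,G)^{-1} P, \qquad P = \frac{1}{N}\,\mathbf{1}
\end{aligned}
```

The last expression is exactly what pagerank_test() returns. Because every column of $G$ sums to 1, each superstep shrinks the distance to $p^{*}$ (in the 1-norm) by a factor of 0.85, so 50 supersteps bring the Pregel result very close to the closed form.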
Experimental setup and analysis:
- Download the code from the original links provided in the sections above.
- Make sure Python is installed on your system.
- From the terminal, run the following command:
- $ python pagerank.py
- The output:
- Both computations use the same randomly generated graph: the test computation uses the matrix formula, while the Pregel computation uses message passing between worker threads.
- The difference between the two sets of PageRank values is very small, which indicates that the Pregel implementation produces correct values for the same random graph.
- As mentioned earlier in the post, it is not feasible to measure the performance gain on a local computer. Pregel is meant for large-scale clusters, where its design yields significant performance gains when processing complex graphs.
- Of course, the Python code I just demonstrated cannot be used on large-scale clusters. This is why we move on to the next section, a Pregel implementation using the GraphX Pregel API.
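The comparison described above can be reproduced without threads or the pregel.py module. The sketch below is a self-contained illustration only: NUM_VERTICES, OUT_DEGREE, random_graph(), pagerank_closed_form() and pagerank_message_passing() are hypothetical names, giving every vertex 4 random out-links is an assumption, and the supersteps run sequentially rather than on worker threads. It also solves the linear system with numpy.linalg.solve instead of inverting the matrix as pagerank_test() does.

```python
import random

import numpy as np

NUM_VERTICES = 10  # hypothetical graph size
OUT_DEGREE = 4     # hypothetical number of random out-links per vertex
DAMPING = 0.85


def random_graph(n, k, seed=0):
    """Out-link lists: vertex u links to k distinct, randomly chosen vertices."""
    rng = random.Random(seed)
    return [rng.sample(range(n), k) for _ in range(n)]


def pagerank_closed_form(out_links):
    """Solve (I - 0.85 G) p = (0.15 / N) 1, the system behind pagerank_test()."""
    n = len(out_links)
    G = np.zeros((n, n))
    for u, targets in enumerate(out_links):
        for v in targets:
            G[v, u] = 1.0 / len(targets)
    rhs = np.full(n, (1 - DAMPING) / n)
    return np.linalg.solve(np.eye(n) - DAMPING * G, rhs)


def pagerank_message_passing(out_links, supersteps=50):
    """Sequential supersteps following the same rule as PageRankVertex.update()."""
    n = len(out_links)
    values = np.full(n, 1.0 / n)
    incoming = [[] for _ in range(n)]  # no messages before superstep 0
    for _ in range(supersteps):
        # Compute phase: every vertex updates from the messages it received.
        values = np.array([(1 - DAMPING) / n + DAMPING * sum(msgs) for msgs in incoming])
        # Communication phase: each vertex splits its rank evenly over its out-links.
        incoming = [[] for _ in range(n)]
        for u, targets in enumerate(out_links):
            for v in targets:
                incoming[v].append(values[u] / len(targets))
    return values


if __name__ == "__main__":
    links = random_graph(NUM_VERTICES, OUT_DEGREE)
    exact = pagerank_closed_form(links)
    bsp = pagerank_message_passing(links)
    print("Closed form:", exact)
    print("Message passing:", bsp)
    print("Norm of the difference:", np.linalg.norm(bsp - exact))
```

The norm of the difference plays the same role as the comparison printed by the original script: a small value means the message-passing supersteps have converged to the closed-form PageRank vector.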
Pregel implementation using GraphX Pregel API:
The code:
Here, the Pregel implementation of PageRank shows real parallelism; the serial PageRank computation alone doesn't show any.
- The link to the serial PageRank code.
- The link to the Bulk Synchronous Parallel (BSP) PageRank code, which I have written.
The following snippet is the main part of the BSP PageRank code.
```scala
import org.apache.spark.graphx._

/* Pregel setup */
// primaryGraph is built earlier in the full script; it is assumed here to be a
// Graph[Double, Double] whose edge attribute is 1 / outDegree of the source vertex.
val primaryMessage = 0.0      // initial message delivered to every vertex in superstep 0
val numberOfIterations = 120  // maximum number of supersteps
val probability = 0.15        // reset probability: with probability 0.15 the random surfer
                              // jumps to a random page, otherwise (0.85) it follows a link

// The Pregel() operator calls back into the following three functions.
def vertex(id: VertexId, value: Double, sumValue: Double): Double =
  probability + (1.0 - probability) * sumValue

def outgoingMessage(edge: EdgeTriplet[Double, Double]): Iterator[(VertexId, Double)] =
  Iterator((edge.dstId, edge.srcAttr * edge.attr))

def combineMessages(a: Double, b: Double): Double = a + b

// Run PageRank as a sequence of bulk synchronous supersteps.
val pagerankGraph = Pregel(primaryGraph, primaryMessage, numberOfIterations)(
  vertex, outgoingMessage, combineMessages)

// Print the result graph (not the input primaryGraph).
println("The Final PageRank graph : <VertexID, Final PageRank of that Vertex>")
pagerankGraph.vertices.take(10).foreach(println)
```
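- As per the documentation, the Pregel operator takes the graph, the initial message, and the maximum number of iterations, followed by the three functions vertex(), outgoingMessage(), and combineMessages() (its vprog, sendMsg, and mergeMsg parameters).
- vertex() computes a vertex's new PageRank from the sum of its incoming messages.
- outgoingMessage() is responsible for message communication: for each edge, it sends the source vertex's rank, weighted by the edge attribute, to the destination vertex.
- combineMessages() combines (sums) multiple messages sent to the same vertex into one.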
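To see what the Pregel operator does with these three callbacks, here is a toy, single-process Python emulation of its superstep loop. It is not the GraphX implementation or API: pregel(), outgoing_message() and with_out_degree_weights() are hypothetical names, the 1/outDegree edge weights are an assumption about how primaryGraph is built, and the active-edge rule only approximates GraphX's default EdgeDirection.Either.

```python
PROBABILITY = 0.15  # same reset probability as the Scala snippet


def vertex(vid, value, sum_value):
    """Mirrors the Scala vertex(): new rank from the summed incoming messages."""
    return PROBABILITY + (1.0 - PROBABILITY) * sum_value


def outgoing_message(src_attr, edge_attr, dst_id):
    """Mirrors outgoingMessage(): send the source rank, weighted by the edge attribute."""
    return [(dst_id, src_attr * edge_attr)]


def combine_messages(a, b):
    """Mirrors combineMessages(): messages to the same vertex are summed."""
    return a + b


def with_out_degree_weights(links):
    """Turn (src, dst) pairs into (src, dst, 1 / outDegree(src)) triples."""
    out_degree = {}
    for src, _ in links:
        out_degree[src] = out_degree.get(src, 0) + 1
    return [(src, dst, 1.0 / out_degree[src]) for src, dst in links]


def pregel(values, edges, initial_msg, max_iterations, vprog, send_msg, merge_msg):
    """Toy emulation of a Pregel superstep loop over (src, dst, attr) edges."""
    # Superstep 0: every vertex runs vprog on the initial message.
    values = {vid: vprog(vid, val, initial_msg) for vid, val in values.items()}
    active = set(values)  # vertices that received a message in the last round
    for _ in range(max_iterations):
        inbox = {}
        for src, dst, attr in edges:
            # Like EdgeDirection.Either: an edge sends if either endpoint is active.
            if src in active or dst in active:
                for target, msg in send_msg(values[src], attr, dst):
                    inbox[target] = merge_msg(inbox[target], msg) if target in inbox else msg
        if not inbox:
            break  # no messages in flight, so the computation stops early
        # Only vertices that received a message run vprog in this superstep.
        for vid, msg in inbox.items():
            values[vid] = vprog(vid, values[vid], msg)
        active = set(inbox)
    return values


if __name__ == "__main__":
    links = [(0, 1), (1, 2), (2, 0), (0, 2)]  # hypothetical 3-vertex graph
    ranks = pregel({v: 0.0 for v in range(3)}, with_out_degree_weights(links),
                   0.0, 120, vertex, outgoing_message, combine_messages)
    print(ranks)
```

In this sketch, a vertex that receives no message in a superstep keeps its old value, mirroring the Pregel rule that such vertices are skipped. Like the Scala vertex() function, the ranks are not divided by the number of vertices, so for this graph they sum to 3 rather than 1.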
Experimental setup and analysis:
- Install Apache Spark and Hadoop in pseudo-distributed mode. This link helps you do that.
- Once Spark is installed, open your terminal and change to the directory where the Scala files are downloaded. Alternatively, you can download all the files from here.
- Execute $ spark-shell
- Execute scala> :load pregel_pagerank.scala
- You will get the output shown in this link for these nodes. (You can modify the nodes to suit your usage.)
- The output at the link above is a raw terminal recording and takes effort to read, so I will focus only on the initial and final PageRanks.
- Initial PageRank output:
- Final PageRank output:
I admit this is a lengthy post, but it is worth following. I believe it is one of the best ways to understand Pregel! Thank you. 🙂