Pregel. A demonstration.

If you are looking for a simple and clear demonstration of Pregel and its use in the PageRank algorithm, you are headed to the right place!

This blog post has two parts :

  • One, a Pregel implementation in Python. This instructive Python code, written by Michael Nielsen, helps us understand the Pregel concept better.
  • Two, usage of the GraphX Pregel API, which lets you write just a few lines of code to process graphs in parallel. The official documentation of the GraphX Pregel API is confusing and does not provide complete code for a Pregel implementation of PageRank. Read on to find a complete working example in the last section of this blog.

Note : One of my previous blog posts, on Parallelism in Apache Spark, talks more about Apache Spark and GraphX, featuring some toy examples. If you are a beginner to Apache Spark and its ecosystem, I suggest you skim through that post first; otherwise, continue reading here.

Intro :

To truly appreciate the workings and efficiency of Pregel, we would have to use large datasets and process them on many clusters. In our scenario, it isn’t possible to show that. However, it is possible to understand the Pregel concepts by running toy example code, as you will see in the next section. Lastly, we will see how to deploy on a larger setup (simulated, in pseudo-distributed mode).

I have used Ubuntu 14.04.5 on a machine with 8 GB of RAM and an Intel i7 processor that supports multi-threading. I have installed Hadoop and Apache Spark in pseudo-distributed mode.

Pregel implementation in Python :

The Code :

I have directly forked the code that I found on GitHub (by Michael Nielsen). Learning about Pregel and its implementation couldn’t be easier once you understand this code.

Refer to the original code if you want to run it on your machine. The code below is partial and is for explanation only.

import collections
import threading

class Vertex():

    def __init__(self, id, value, out_vertices):
        self.id = id
        self.value = value
        self.out_vertices = out_vertices
        self.incoming_messages = []
        self.outgoing_messages = []
        self.active = True
        self.superstep = 0

class Pregel():

    def __init__(self, vertices, num_workers):
        self.vertices = vertices
        self.num_workers = num_workers

    def worker(self, vertex):
        # Assigns each vertex to a worker, by hashing
        return hash(vertex) % self.num_workers

    def superstep(self):
        # Completes a single superstep: one thread per vertex partition
        workers = []
        for vertex_list in self.partition.values():
            worker = Worker(vertex_list)
            workers.append(worker)
            worker.start()
        for worker in workers:
            worker.join()

class Worker(threading.Thread):

    def __init__(self, vertices):
        threading.Thread.__init__(self)
        self.vertices = vertices

    def run(self):
        self.superstep()

    def superstep(self):
        # Runs the update method of every active vertex in this partition
        for vertex in self.vertices:
            if vertex.active:
                vertex.update()

  • It defines classes for vertices (Vertex class), worker threads (Worker class) and the framework itself (Pregel class).
  • Each Vertex is defined in such a way that it can communicate with other vertices by sending or receiving messages.
  • The superstep function in the Pregel class creates the threads (Worker class), and every thread runs in parallel. Of course, race conditions and other conflicts need to be handled separately.
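To make that control flow concrete, here is a minimal, single-threaded sketch of the superstep loop. The names CountingVertex and run_bsp are my own illustration, not part of Nielsen's code, and the threading is deliberately left out to show just the compute/communicate rhythm:

```python
class Vertex:
    def __init__(self, id, value, out_vertices):
        self.id = id
        self.value = value
        self.out_vertices = out_vertices
        self.incoming_messages = []
        self.outgoing_messages = []
        self.active = True
        self.superstep = 0

class CountingVertex(Vertex):
    # Hypothetical vertex program: count supersteps, then vote to halt.
    def update(self):
        if self.superstep < 3:
            self.value += 1
            self.outgoing_messages = [(v, self.value) for v in self.out_vertices]
        else:
            self.active = False

def run_bsp(vertices):
    # Single-threaded stand-in for the framework's run loop: repeat
    # supersteps until every vertex has voted to halt.
    while any(v.active for v in vertices):
        for v in vertices:                      # compute phase
            if v.active:
                v.update()
        for v in vertices:                      # advance the clock, clear inboxes
            v.superstep += 1
            v.incoming_messages = []
        for v in vertices:                      # communication phase
            for (target, message) in v.outgoing_messages:
                target.incoming_messages.append((v, message))
            v.outgoing_messages = []

a = CountingVertex(0, 0, [])
b = CountingVertex(1, 0, [])
a.out_vertices = [b]
b.out_vertices = [a]
run_bsp([a, b])
print(a.value, b.value)  # each vertex updated itself in supersteps 0, 1 and 2
```

In the real implementation the compute phase is spread across Worker threads; the barrier between phases is what makes the model "bulk synchronous".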

The implementation of PageRank using Pregel can be accessed at this link. Let’s focus on the Pregel-related functions and see how they compute the PageRank. Refer to the original code if you want to run it on your machine. The code below is partial and is for explanation only.

from pregel import Vertex, Pregel
from numpy import mat, eye, zeros, ones

num_workers = 4
num_vertices = 10

def pagerank_test(vertices):
    # Computes the PageRank vector using a standard matrix-theoretic
    # approach, for comparison with the Pregel-based computation.
    I = mat(eye(num_vertices))
    G = zeros((num_vertices, num_vertices))
    for vertex in vertices:
        num_out_vertices = len(vertex.out_vertices)
        for out_vertex in vertex.out_vertices:
            G[out_vertex.id, vertex.id] = 1.0/num_out_vertices
    P = (1.0/num_vertices)*mat(ones((num_vertices, 1)))
    return 0.15*((I-0.85*G).I)*P

def pagerank_pregel(vertices):
    # Computes the PageRank vector using Pregel.
    p = Pregel(vertices, num_workers)
    p.run()
    return mat([vertex.value for vertex in p.vertices]).transpose()

class PageRankVertex(Vertex):

    def update(self):
        # This routine has a bug when there are pages with no outgoing
        # links (never the case for our tests).  This problem can be
        # solved by introducing Aggregators into the Pregel framework,
        # but as an initial demonstration this works fine.
        if self.superstep < 50:
            self.value = 0.15 / num_vertices + 0.85*sum(
                [pagerank for (vertex, pagerank) in self.incoming_messages])
            outgoing_pagerank = self.value / len(self.out_vertices)
            self.outgoing_messages = [(vertex, outgoing_pagerank)
                                      for vertex in self.out_vertices]
        else:
            self.active = False

  • The Vertex and Pregel classes are imported from the Pregel code.
  • The pagerank_test() function computes the PageRank using a standard matrix approach. This function is mainly for comparison with Pregel.
  • The pagerank_pregel() function computes the PageRank in bulk synchronous parallel fashion. Notice that Pregel’s run() method starts the threads, and every vertex is processed in parallel. The communication between the vertices also happens during this step.
  • The PageRankVertex class keeps track of the supersteps, and it updates every vertex with new values and messages.
  • The code also generates a random list of vertices and compares the computation of the test PageRank function against PageRank using Pregel. You can observe the output in the next section.
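To see what the 50 supersteps converge to, here is a standalone power-iteration version of the same update rule. The 3-page link structure below is a hypothetical example of my own, not taken from the original code:

```python
# Hypothetical 3-page link graph: 0 -> 1, 0 -> 2, 1 -> 2, 2 -> 0
out_links = {0: [1, 2], 1: [2], 2: [0]}
n = 3
ranks = [1.0 / n] * n  # start with a uniform distribution

for step in range(50):  # same 50 supersteps as PageRankVertex
    # communication phase: each page sends rank/out-degree to its targets
    incoming = [0.0] * n
    for page, outs in out_links.items():
        share = ranks[page] / len(outs)
        for target in outs:
            incoming[target] += share
    # compute phase: same formula as PageRankVertex.update()
    ranks = [0.15 / n + 0.85 * incoming[p] for p in range(n)]

print(ranks)  # the ranks sum to ~1.0; page 2, with two in-links, ranks highest
```

Each loop iteration corresponds to one superstep: the `incoming` list plays the role of the per-vertex inboxes, and the list comprehension is the vertex program applied to every vertex.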

Experimental setup and Analysis :

  • Download the code from the original links provided in the sections above.
  • Make sure Python is installed on your system.
  • From the terminal, type the following commands :
    • $ python
  • The output :

[Screenshot: terminal output comparing the test PageRank and Pregel PageRank values]

  • The test computation is based on the random values generated by the test code; the Pregel computation is based on parallel processing.
  • The difference in the PageRank values is minimal, so it is clear that the Pregel implementation produces the correct values for the same random input.
  • As mentioned earlier in the post, it is not feasible to measure the performance gain on local computers. Pregel is meant for large-scale clusters, and the design shows significant performance gains when complex graphs are processed.
  • Of course, the Python code I just demonstrated cannot be used on large-scale clusters. That is why we move on to the next section, a Pregel implementation using the GraphX Pregel API.

Pregel implementation using GraphX Pregel API :

The Code :

In this context, it is the Pregel implementation that provides the real parallelism; the PageRank computation alone doesn’t show any parallelism.

The following snippet is the main part of BSP PageRank code.

/* Pregel Setup */

val primaryMessage = 0.0 // The initial message value
val numberOfIterations = 120 // Number of supersteps the PageRank computation has to run
val probability = 0.15 // The reset probability: a surfer jumps to a random vertex with probability 0.15, and follows a link with probability 0.85

// The Pregel() function takes the following functions as callbacks.

def vertex(id: VertexId, value: Double, sumValue: Double): Double = probability + (1.0 - probability) * sumValue
def outgoingMessage(edge: EdgeTriplet[Double, Double]): Iterator[(VertexId, Double)] = Iterator((edge.dstId, edge.srcAttr * edge.attr))
def combineMessages(a: Double, b: Double): Double = a + b

// The following code calls the Pregel() function, which runs one BSP superstep of PageRank per iteration

val pagerankGraph = Pregel(primaryGraph, primaryMessage, numberOfIterations)(vertex, outgoingMessage, combineMessages)
println("The Final PageRank graph : <VertexID, Final PageRank of that Vertex> ")

  • As per the documentation, the Pregel object is passed the functions vertex(), outgoingMessage() and combineMessages().
  • The vertex() function calculates the PageRank probability of a vertex.
  • The outgoingMessage() function is responsible for message communication along the edges.
  • combineMessages() handles multiple messages that have to be sent to the same vertex, by combining (summing) them.
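As a sketch of how these three callbacks interact, here is a plain-Python analogue of a single superstep. This is not the GraphX API itself; the edge list, weights and function names below are my own illustration of the same message flow:

```python
probability = 0.15

def vertex(vertex_id, value, sum_value):
    # new rank from the combined incoming message (vprog)
    return probability + (1.0 - probability) * sum_value

def outgoing_message(src_rank, edge_weight, dst_id):
    # each edge carries srcAttr * attr to its destination (sendMsg)
    return (dst_id, src_rank * edge_weight)

def combine_messages(a, b):
    # messages bound for the same vertex are summed (mergeMsg)
    return a + b

# one superstep over a hypothetical edge list of (src, dst, weight)
edges = [(0, 2, 0.5), (1, 2, 1.0), (2, 0, 1.0), (0, 1, 0.5)]
ranks = {0: 1.0, 1: 1.0, 2: 1.0}

inbox = {}
for src, dst, w in edges:
    dst_id, msg = outgoing_message(ranks[src], w, dst)
    inbox[dst_id] = combine_messages(inbox[dst_id], msg) if dst_id in inbox else msg

# every vertex applies the vertex program to its combined inbox
ranks = {vid: vertex(vid, ranks[vid], inbox.get(vid, 0.0)) for vid in ranks}
print(ranks)
```

Vertex 2 receives two messages (0.5 from vertex 0 and 1.0 from vertex 1), which combine_messages reduces to 1.5 before the vertex program runs, which is exactly the role combineMessages() plays in the GraphX call.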

Experimental setup and Analysis :

  • Install Apache Spark and Hadoop in pseudo-distributed mode. This link helps you do that.
  • Once Spark is installed, open your terminal and change to the directory where the Scala files are downloaded. Alternatively, you can download all the files from here.
  • Execute $ spark-shell
  • Execute scala> :load
  • You will get the output as shown in this link for these nodes. (You can modify the nodes according to your usage.)
  • Reading the output shown in the above link takes some effort because it is recorded terminal output. To simplify, I will just focus on the initial and final page ranks.
  • Initial PageRank output : [screenshot of the initial PageRank values]
  • Final PageRank output : [screenshot of the final PageRank values]

Conclusion :

I must admit it is a lengthy post, but it is worth following. I believe this is one of the best ways to understand Pregel! Thank you. 🙂


Exploring Parallelism in Apache Spark.


What is Spark?

  • Apache Spark is an open-source data processing framework which offers in-memory computing techniques and a graph computation API. It works with a variety of file systems.
  • The Spark developers claim it can run programs up to 100 times faster than Hadoop (a popular Big Data framework) in memory, and 10 times faster on disk, on clusters that are mostly ordinary servers and commodity hardware. It also requires less code to be written.

Why Spark?

  • It is designed to overcome serious limitations of the one-pass computation nature of MapReduce and the non-scalable MPI programming model.
  • It provides fault tolerance and is easy to combine with SQL, streaming data, machine learning, graphs, etc.
  • Mainly, parallel processing!

What parallel-paradigm features does Spark boast?

Note :

Before we start working through examples to understand the parallel action, make sure you have Apache Spark installed on your system. Another blog post gives clear instructions to get Spark running on your machine. It will also be helpful if you are familiar with the “Hello, World!” examples.

I haven’t worked on clusters or on any commodity servers. This is run on my local machine, which has a quad-core processor, and the parallelism is exploited across the cores, i.e. threads run on each core. If you want to get this working on a large cluster of computers, Spark lets you configure the machines. You can control the degree of parallelism too.


The goal of an RDD (Resilient Distributed Dataset) is to let you work with distributed collections as you would with local ones. An RDD is built through parallel transformations like map, filter, reduceByKey, join, etc. These exploit data-parallel systems and hence data parallelism.

Take a look at the code below.

val count = sc.parallelize(1 to NUM_SAMPLES).filter { _ =>
  val x = math.random
  val y = math.random
  x*x + y*y < 1
}.count()
println(s"Pi is roughly ${4.0 * count / NUM_SAMPLES}")

  • Parallelized collections are created by calling SparkContext’s parallelize method on an existing collection in your driver program.
  • The elements of the collection are copied to form an RDD that can be operated on in parallel.
  • Spark will run one task for each partition of the RDD. Typically you would want 2-4 partitions for each CPU in your cluster.
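For comparison, here is what that filter-and-count pipeline computes, written as plain sequential Python (no Spark required). Spark's contribution is distributing the samples across partitions and tasks; the arithmetic is the same Monte Carlo estimate of Pi:

```python
import random

NUM_SAMPLES = 100_000
random.seed(0)  # fixed seed so the run is reproducible

def inside(_):
    # does a uniformly random point in the unit square fall inside
    # the quarter circle of radius 1?
    x, y = random.random(), random.random()
    return x * x + y * y < 1

# sequential stand-in for sc.parallelize(1 to NUM_SAMPLES).filter(inside).count()
count = sum(1 for i in range(NUM_SAMPLES) if inside(i))
pi_estimate = 4.0 * count / NUM_SAMPLES
print(f"Pi is roughly {pi_estimate}")
```

The fraction of points inside the quarter circle approaches Pi/4, so multiplying by 4 recovers the estimate; with 100,000 samples it lands close to 3.14.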

GraphX Pregel API.

  • Graphs created by modern applications can be very large (potentially terabytes or petabytes in size); thus, a single node (computer) will not be able to hold all these vertices in memory.
  • The way forward is to partition the graph and parallelise the processing across many machines.
  • Pregel employs a vertex-centric approach to processing large distributed graphs (a bulk synchronous parallel model). Pregel computations consist of a sequence of iterations, called supersteps.

Let’s understand the type signature of the Pregel operator, followed by an implementation of the SSSP (single-source shortest path) algorithm.

class GraphOps[VD, ED] {
  def pregel[A]
      (initialMsg: A,
       maxIter: Int = Int.MaxValue,
       activeDir: EdgeDirection = EdgeDirection.Out)
      (vprog: (VertexId, VD, A) => VD,
       sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
       mergeMsg: (A, A) => A)
    : Graph[VD, ED] = {
    // Receive the initial message at each vertex
    var g = mapVertices( (vid, vdata) => vprog(vid, vdata, initialMsg) ).cache()
    // Compute the initial messages
    var messages = g.mapReduceTriplets(sendMsg, mergeMsg)
    var activeMessages = messages.count()
    // Loop until no messages remain or maxIter is reached
    var i = 0
    while (activeMessages > 0 && i < maxIter) {
      // Receive the messages and update the vertices.
      g = g.joinVertices(messages)(vprog).cache()
      val oldMessages = messages
      // Send new messages, skipping edges where neither side received a message. We must cache
      // messages so it can be materialized on the next line, allowing us to uncache the previous
      // iteration.
      messages = g.mapReduceTriplets(
        sendMsg, mergeMsg, Some((oldMessages, activeDir))).cache()
      activeMessages = messages.count()
      i += 1
    }
    g
  }
}
Now we can use the Pregel operator to express computations such as single-source shortest path. The implementation can be found in this GitHub link.
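To make the vprog / sendMsg / mergeMsg interplay concrete, here is a hedged single-machine sketch of SSSP in the same Pregel style. The graph, vertex numbering and helper names are my own; this is a Python illustration of the pattern, not the GraphX code:

```python
import math

# Hypothetical weighted digraph as (src, dst, weight) triplets
edges = [(0, 1, 2.0), (0, 2, 5.0), (1, 2, 1.0), (2, 3, 1.0)]
num_vertices = 4
source = 0

# Every vertex starts at infinity except the source (the initial message)
dist = {v: (0.0 if v == source else math.inf) for v in range(num_vertices)}

def vprog(vid, attr, msg):
    # vertex program: keep the shorter distance
    return min(attr, msg)

def send_msg(src, dst, weight):
    # relax the edge only if it improves the destination's distance
    candidate = dist[src] + weight
    return [(dst, candidate)] if candidate < dist[dst] else []

def merge_msg(a, b):
    # a vertex keeps the minimum of all offered distances
    return min(a, b)

# BSP loop: iterate supersteps until no messages are generated
while True:
    inbox = {}
    for s, d, w in edges:
        for (target, msg) in send_msg(s, d, w):
            inbox[target] = merge_msg(inbox[target], msg) if target in inbox else msg
    if not inbox:
        break  # every vertex is inactive: the algorithm has converged
    for target, msg in inbox.items():
        dist[target] = vprog(target, dist[target], msg)

print(dist)  # shortest distances from vertex 0
```

The termination condition mirrors the `activeMessages > 0` check in the operator above: the computation stops as soon as a superstep produces no messages.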


I have made an attempt to demonstrate simple examples with RDDs and the GraphX Pregel API. I believe this is enough to get us started on more complex graph problems. Of course, this is not the only way to do so; there might be other, better examples for exploring these parallelism techniques.