Pregel. A demonstration.

If you are looking for a simple, clear demonstration of Pregel and its application to the PageRank algorithm, you have come to the right place!

This blog post has two parts :

  • One, a Pregel implementation in Python. This instructive Python code, written by Michael Nielsen, helps us understand the Pregel concept better.
  • Two, usage of the GraphX Pregel API, which lets you process graphs in parallel with just a few lines of code. The official documentation of the GraphX Pregel API is confusing and does not provide complete code for a Pregel implementation of PageRank. Read on to the last section of this post for complete working code.

Note : One of my previous blog posts, on parallelism in Apache Spark, says more about Apache Spark and GraphX, featuring some toy examples. If you are a beginner to Apache Spark and its ecosystem, I suggest you skim through that post; otherwise, continue reading here.

Intro :

To properly appreciate the workings and efficiency of Pregel, we would have to process large datasets on a large cluster, which isn’t possible in our scenario. However, it is possible to understand the Pregel concepts by running toy example code; you will see how to do that in the next section. Lastly, we will see how deployment on a large cluster can be simulated in pseudo-distributed mode.

I used Ubuntu 14.04.5 on a machine with 8 GB of RAM and a multi-threaded Intel i7 processor, with Hadoop and Apache Spark installed in pseudo-distributed mode.

Pregel implementation in Python :

The Code :

I have forked the code directly from GitHub (by Michael Nielsen). Understanding this code is one of the easiest ways to learn about Pregel and its implementation.

Refer to the original code if you want to run it on your machine; the code below is partial and meant for explanation.

pregel.py

import collections
import threading

class Vertex():

    def __init__(self,id,value,out_vertices):
        self.id = id
        self.value = value
        self.out_vertices = out_vertices
        self.incoming_messages = []
        self.outgoing_messages = []
        self.active = True
        self.superstep = 0

class Pregel():

    def __init__(self,vertices,num_workers):
        self.vertices = vertices
        self.num_workers = num_workers
        # Partition the vertices among the workers. (In the original code
        # this happens inside run(), which also redistributes messages
        # between supersteps and checks whether any vertex is still active.)
        self.partition = collections.defaultdict(list)
        for vertex in vertices:
            self.partition[self.worker(vertex)].append(vertex)

    def worker(self,vertex):
        # Hash-based assignment of a vertex to one of num_workers workers
        return hash(vertex) % self.num_workers

    def superstep(self):
        # One superstep: each worker thread updates its share of the vertices
        workers = []
        for vertex_list in self.partition.values():
            worker = Worker(vertex_list)
            workers.append(worker)
            worker.start()
        for worker in workers:
            worker.join()

class Worker(threading.Thread):

    def __init__(self,vertices):
        threading.Thread.__init__(self)
        self.vertices = vertices

    def run(self):
        self.superstep()

    def superstep(self):
        for vertex in self.vertices:
            if vertex.active:
                vertex.update()

  • It defines classes for the vertices (Vertex), the worker threads (Worker) and the coordinating Pregel class.
  • Each Vertex is defined so that it can communicate with other vertices by sending and receiving messages.
  • The superstep() method of the Pregel class creates the worker threads (Worker), which run in parallel. Of course, race conditions and other conflicts need to be handled separately.

The implementation of PageRank using Pregel can be accessed at this link. Let’s focus on the PageRank functions and see how they compute the PageRank. Refer to the original code if you want to run it on your machine; the code below is partial and meant for explanation.

pagerank.py

from numpy import mat, eye, zeros, ones  # used by the matrix-based test
from pregel import Vertex, Pregel

# num_vertices and num_workers are module-level constants in the original code

def pagerank_test(vertices):
    I = mat(eye(num_vertices))
    G = zeros((num_vertices,num_vertices))
    for vertex in vertices:
        num_out_vertices = len(vertex.out_vertices)
        for out_vertex in vertex.out_vertices:
            G[out_vertex.id,vertex.id] = 1.0/num_out_vertices
    P = (1.0/num_vertices)*mat(ones((num_vertices,1)))
    return 0.15*((I-0.85*G).I)*P

def pagerank_pregel(vertices):
    p = Pregel(vertices,num_workers)
    p.run()
    return mat([vertex.value for vertex in p.vertices]).transpose()

class PageRankVertex(Vertex):

    def update(self):
        # This routine has a bug when there are pages with no outgoing
        # links (never the case for our tests).  This problem can be
        # solved by introducing Aggregators into the Pregel framework,
        # but as an initial demonstration this works fine.
        if self.superstep < 50:
            self.value = 0.15 / num_vertices + 0.85*sum(
                [pagerank for (vertex,pagerank) in self.incoming_messages])
            outgoing_pagerank = self.value / len(self.out_vertices)
            self.outgoing_messages = [(vertex,outgoing_pagerank)
                                      for vertex in self.out_vertices]
        else:
            self.active = False

  • The Vertex and Pregel classes are imported from the Pregel code.
  • The pagerank_test() function computes the PageRank using a standard matrix approach (see the equation after this list). This function is mainly for comparison with Pregel.
  • The pagerank_pregel() function computes the PageRank in bulk synchronous parallel fashion. Notice that p.run() starts the threads and every vertex is processed in parallel; the communication between the vertices happens during this step too.
  • The PageRankVertex class keeps track of the supersteps and updates every vertex with new values and messages.
  • The code also generates a random graph and compares the results of the test PageRank function and PageRank using Pregel. You can observe the output in the next section.
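For reference, pagerank_test() solves the damped PageRank system in closed form, with damping factor 0.85 and n = num_vertices:

\[ p = 0.15\,(I - 0.85\,G)^{-1}\,\tfrac{1}{n}\mathbf{1} \]

where G is the column-stochastic link matrix built from the out-edges, exactly as the loop in the code constructs it.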

Experimental setup and Analysis :

  • Download the code from the original links provided in the sections above.
  • Make sure Python is installed on your system.
  • From the terminal, type the following command :
    • $ python pagerank.py
  • The output :

(Screenshot: terminal output comparing the matrix-based PageRank values with the Pregel-computed values.)

  • The test computation runs on a randomly generated graph; the Pregel computation processes the same graph using parallel workers.
  • The difference between the two sets of PageRank values is negligible, so it is clear that the Pregel implementation produces correct values for the same random graph.
  • As mentioned earlier in the post, it is not feasible to measure the performance gain on a local computer. Pregel is meant for large clusters, and its design shows real gains when large, complex graphs are processed.
  • Of course, the Python code I just demonstrated cannot be used on large clusters. This is why we move on to the next section, Pregel implementation using the GraphX Pregel API.

Pregel implementation using GraphX Pregel API :

The Code :

In this context, it is the Pregel machinery that provides the real parallelism: the PageRank update at each vertex is a simple sequential computation, but Pregel runs it across all vertices in parallel.

The following snippet is the main part of BSP PageRank code.


/* Pregel Setup */

val primaryMessage = 0.0 // The initial message value
val numberOfIterations = 120 // Maximum number of PageRank iterations
val probability = 0.15 // The reset (damping) probability: a surfer jumps to a random vertex with probability 0.15 and follows an out-link with probability 0.85

// The Pregel() function takes the following functions as callbacks.

def vertex(id: VertexId, value: Double, sumValue: Double): Double = probability + (1.0 - probability) * sumValue
def outgoingMessage(edge: EdgeTriplet[Double, Double]): Iterator[(VertexId, Double)] = Iterator((edge.dstId, edge.srcAttr * edge.attr))
def combineMessages(a: Double, b: Double): Double = a + b

// The following call runs the BSP PageRank, one Pregel superstep per iteration

val pagerankGraph = Pregel(primaryGraph, primaryMessage, numberOfIterations)(vertex, outgoingMessage, combineMessages)
println("The Final PageRank graph : <VertexID, Final PageRank of that Vertex> ")
pagerankGraph.vertices.take(10)

  • As per the documentation, the Pregel object is passed the functions vertex(), outgoingMessage() and combineMessages().
  • The vertex() function computes the new PageRank value of a vertex from the sum of its incoming messages.
  • The outgoingMessage() function is responsible for the message communication along the edges.
  • combineMessages() merges multiple messages destined for the same vertex by summing them.
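The snippet above assumes a primaryGraph whose vertex attributes hold the current rank and whose edge attributes hold the fraction of rank carried along each edge. Here is a minimal sketch of how such a graph could be built; the edge-list path is a placeholder and this setup is my own, not taken from the official example:

import org.apache.spark.graphx.{Graph, GraphLoader}

// Load an edge list and weight each edge by 1/outDegree of its source vertex,
// so that edge.srcAttr * edge.attr splits a vertex's rank evenly among its links.
val rawGraph = GraphLoader.edgeListFile(sc, "hdfs:///user/sb/edges.txt") // placeholder path
val primaryGraph: Graph[Double, Double] = rawGraph
  .outerJoinVertices(rawGraph.outDegrees) { (id, _, degOpt) => degOpt.getOrElse(0) }
  .mapTriplets(e => 1.0 / e.srcAttr) // edge attribute = 1/outDegree
  .mapVertices((id, _) => 1.0)       // every vertex starts with rank 1.0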

Experimental setup and Analysis :

  • Install Apache Spark and Hadoop in pseudo-distributed mode. This link helps you do that.
  • Once Spark is installed, open your terminal and change to the directory containing the downloaded Scala files. Alternatively, you can download all the files from here.
  • Execute $ spark-shell
  • Execute scala> :load pregel_pagerank.scala
  • You will get the output as shown in this link for these nodes. (You can modify nodes according to your usage)
  • Reading the output in the above link takes effort because it is a recorded terminal session. To simplify, I will focus only on the initial and final PageRanks.
  • Initial PageRank output : (screenshot)
  • Final PageRank output : (screenshot)

Conclusion :

I must admit this is a lengthy post, but it is worth following. I believe this is one of the best ways to understand Pregel! Thank you. 🙂

Parallelisation of ant colony algorithms.

Intro :

  • Since ant colony algorithms take a long time to solve complex problems, a parallel ant colony algorithm is devised.
  • Parallelism is traditionally achieved using multi-core CPUs, and these models need extensive CPU support.
  • Here, the MMAS (Max-Min Ant System) algorithm is parallelised on Apache Spark’s computing platform.
  • MMAS is combined with Apache Spark’s MapReduce-style operations. My previous blog post demonstrates the usage of Apache Spark.

What happens in MMAS?

  • Initialisation of the ants, cities and tabu lists.
  • The pheromone value (an abstraction of the chemical substance deposited by ants) is initialised to its maximum value.
  • m ants are placed randomly at n cities.
  • Each ant updates a tabu list of the cities it has passed through.
  • Ant paths are then constructed according to the transition probability below.
  • The probability that ant k moves from city i to city j at time t is

    \[ p_{ij}^{k}(t) = \frac{[\tau_{ij}(t)]^{\alpha}\,[\eta_{ij}(t)]^{\beta}}{\sum_{l \in \mathrm{allowed}_k} [\tau_{il}(t)]^{\alpha}\,[\eta_{il}(t)]^{\beta}} \]
  • where τij(t) is the amount of pheromone deposited for the transition from city i to j at time t, α is a parameter controlling the influence of τij(t), ηij(t) is the desirability of the transition ij at time t (prior knowledge, typically 1/dij, where dij is the distance), and β is a parameter controlling the influence of ηij(t).

MapReduce and Apache Spark overview.

  • A grasp of Hadoop’s MapReduce and Apache Spark is important before we dive into the implementation.
  • You can refer to the MapReduce tutorial for more info.
  • My previous blog post has a clear explanation of Apache Spark and its parallelism features.

Implementing the MMAS using Spark’s MapReduce Framework.

The code :

  • The code for the implementation can be accessed here.
  • Spark’s data-level parallelism starts by initialising the m ants and calling parallelize() to convert the collection into an RDD.
  • The map stage iterates over the cities at every step :
    • map() applies the original MMAS tour-construction algorithm.
    • It produces a key-value pair where the tour distance is the key and the path travelled is the value.
    • Distances are sorted using sortByKey().
  • The reduce stage compares the walking distances of every two candidate tours, resulting in the shortest path.
  • take() fetches the shortest distance and the corresponding path.
  • The MapReduce tasks are iterated to obtain the final shortest path value. (A sketch of the pipeline follows this list.)
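A rough Spark sketch of that pipeline, where constructTour is a hypothetical stand-in for the real MMAS tour-construction step (not the code from the repository):

// One RDD element per ant; each map call builds a tour and returns
// (tourLength, path); sorting by key and taking the head yields the shortest tour.
val numAnts = 50
def constructTour(ant: Int): (Double, List[Int]) =
  (scala.util.Random.nextDouble(), List(ant)) // placeholder for the real MMAS step

val tours = sc.parallelize(1 to numAnts).map(constructTour) // map stage
val best = tours.sortByKey().take(1)                        // shortest distance and its path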

Experimental Setup and Results :

  • Make sure you have Apache Spark and the Hadoop streaming framework installed.
  • If not, you can follow instructions from here.
  • If your system is ready with Apache Spark and Hadoop, download the MMAS MapReduce code and run the following command (the mapper and reducer script names are assumed to be mmas_mapper.py and mmas_reducer.py) :
    • bin/hadoop jar contrib/streaming/hadoop-streaming.jar
      -file /home/sb/mmas_mapper.py -mapper /home/sb/mmas_mapper.py
      -file /home/sb/mmas_reducer.py -reducer /home/sb/mmas_reducer.py
      -input /user/sb/cs336/input.txt -output /user/sb/cs336
    • Change the paths according to your HDFS directories.
  • The input and output files can be accessed in the same directory :
  • Mapper Input : <Edge, Pheromone>
    Reducer Output : <BestEdge, Its Pheromone>
    Code output : BestEdge

Conclusion :

The MapReduce approach to parallelising the MMAS shows significant performance gains when run on large clusters.

 

Parallel Search.

Intro.

Parallel search is popularly known as SMP (Symmetric Multi-Processing) search; the search speed improves with the number of additional processors.

The graph below gives a clear idea of how the parallel speedup is achieved.

(Screenshot: graph of search speedup versus number of processors.)

There are several works related to this topic, the most popular being Lazy SMP, which mainly deals with parallelising a chess engine. (GitHub link)

The following section about Lazy SMP is drawn mainly from this document.

Lazy SMP.

  • Lazy SMP is a relatively new parallel search algorithm in which threads have minimal to no communication between them, other than through a shared hash table.
  • When parallelisation was implemented in the engine this way, it gave about a 1.6x speedup with 4 threads on 4 cores.
  • The idea is quite simple: with a shared hash table, let all threads search the full chess tree and race to the finish line. The hash table ensures that positions fully explored by any thread do not have to be re-explored.
  • The algorithm is very different from other parallel search algorithms: by using a lockless hash table it avoids any communication or synchronisation overhead, trading this for a larger search overhead. (A toy sketch of the idea follows.)
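To make the idea concrete, here is a toy sketch of my own (in Scala, not the engine’s C++): several threads race over the same work and share nothing but a concurrent hash table. All names here are hypothetical.

import scala.collection.concurrent.TrieMap

object LazySmpSketch {
  // position key -> cached score; the only state shared between threads
  val sharedTable = TrieMap.empty[Long, Int]

  // Stub standing in for a real alpha-beta search of one position:
  // a position already in the table is not re-explored.
  def search(positionKey: Long, depth: Int): Int =
    sharedTable.getOrElseUpdate(positionKey, depth * depth)

  def main(args: Array[String]): Unit = {
    val threads = (1 to 4).map(_ => new Thread(() => search(0L, 9)))
    threads.foreach(_.start()) // all threads race on the same root position
    threads.foreach(_.join())
    println(s"cached entries: ${sharedTable.size}")
  }
}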

The lazy SMP test.

The goal of this test is to improve the chess engine on multi-core CPUs using the lazy SMP algorithm.

(Screenshot: lazy SMP test results, including time to depth for 1, 2 and 4 threads.)

The main result taken from the lazy SMP test is the time to depth, which shows the actual speedup gained from the threads searching to depth 9. With 2 threads there was a speedup of 1.14, and 4 threads doubled the speed of 1 thread, with a speedup of 2.01.

Implementation.

The implementation of lazy SMP can be found here. The code was forked from another source; the excerpt below is just the part of it used for experimentation. I made changes such as varying the number of nodes, and I deliberately removed some search parameters to observe the behaviour of the code. The results were inconclusive, so I cannot present a complete explanation in this section; I believe a lot more can be done when this is explored further.


void getBestMoveAtDepth(Board *b, MoveList *legalMoves, int depth, int alpha,
        int beta, int *bestMoveIndex, int *bestScore, unsigned int startMove,
        int threadID, SearchPV *pvLine) {
    // SearchParameters *searchParams = &(searchParamsArray[threadID]);
    SearchStatistics *searchStats = &(searchStatsArray[threadID]);
    SearchPV line;
    int color = b->getPlayerToMove();
    int tempMove = -1;
    int score = -MATE_SCORE;
    *bestScore = -INFTY;
    SearchStackInfo *ssi = &(ssInfo[threadID][1]);

    // Push current position to two fold stack
    twoFoldPositions[threadID].push(b->getZobristKey());

    // Helps prevent stalls when using more threads than physical cores,
    // presumably by preventing this function from completing before the thread
    // is able to detach.
    std::this_thread::yield();

    for (unsigned int i = startMove; i < legalMoves->size(); i++) {
        // Output current move info to the GUI. Only do so if 5 seconds of
        // search have elapsed to avoid clutter
        uint64_t timeSoFar = getTimeElapsed(searchParamsArray[0].startTime);
        uint64_t nps = 1000 * getNodes() / timeSoFar;
        if (threadID == 0 && timeSoFar > 5 * ONE_SECOND)
            cout << "info depth " << depth << " currmove " << moveToString(legalMoves->get(i))
                 << " currmovenumber " << i+1 << " nodes " << getNodes() << " nps " << nps << endl;

        Board copy = b->staticCopy();
        copy.doMove(legalMoves->get(i), color);
        searchStats->nodes++;

        int startSq = getStartSq(legalMoves->get(i));
        int endSq = getEndSq(legalMoves->get(i));
        int pieceID = b->getPieceOnSquare(color, startSq);
        ssi->counterMoveHistory = searchParamsArray[threadID].counterMoveHistory[pieceID][endSq];

        if (i != 0) {
            // Later moves: try a null-window (scout) search first ...
            score = -PVS(copy, depth-1, -alpha-1, -alpha, threadID, true, ssi, &line);
            if (alpha < score && score < beta) {
                // ... and re-search with the full window if it failed high
                line.pvLength = 0;
                score = -PVS(copy, depth-1, -beta, -alpha, threadID, false, ssi, &line);
            }
        }
        else {
            // First move: full-window principal variation search
            score = -PVS(copy, depth-1, -beta, -alpha, threadID, false, ssi, &line);
        }

        // Stop condition. If stopping, return search results from incomplete
        // search, if any.
        if (isStop || stopSignal)
            break;

        if (score > *bestScore) {
            *bestScore = score;
            if (score > alpha) {
                alpha = score;
                tempMove = (int) i;
                changePV(legalMoves->get(i), pvLine, &line);
            }
            // To get a PV if failing low
            else if (i == 0)
                changePV(legalMoves->get(i), pvLine, &line);
        }

        if (score >= beta)
            break;
    }

    twoFoldPositions[threadID].pop();

    *bestMoveIndex = tempMove;

    // This thread is finished running.
    threadsRunningMutex.lock();
    threadsRunning--;
    threadsRunningMutex.unlock();
}

Though I wasn’t able to comprehend the code in complete detail, it is clear that this function is responsible for implementing the lazy SMP algorithm.

Given all the relevant inputs, the function searches the moves from start to end in parallel and, based on the score parameter, performs a recursive depth-wise search. The computation is carefully done without conflicts between the threads.

Exploring Parallelism in Apache Spark.


What is Spark?

  • Apache Spark is an open-source data processing framework which offers in-memory computing techniques and a graph computation API. It works on any file system.
  • The Spark developers claim it can run programs up to 100 times faster than Hadoop (a popular big data framework) in memory and 10 times faster on disk, on clusters of mostly ordinary servers and commodity hardware. Less code needs to be written, too.


Why Spark?

  • It is designed to overcome serious limitations of the one-pass computation model of MapReduce and the hard-to-scale MPI programming model.
  • It provides fault tolerance and is easy to combine with SQL, streaming data, machine learning, graphs, etc.
  • Mainly, parallel processing!


What parallel-paradigm features does Spark boast?

Note :

Before we start working through examples to understand the parallel action, make sure you have Apache Spark installed on your system. Another blog post gives clear instructions for getting Spark running on your machine. It will also help if you are familiar with the “Hello, World!” examples.

I haven’t worked on clusters or commodity servers; everything here runs on my local machine, which has a quad-core processor, so the parallelism is exploited across the cores, i.e. threads run on each core. If you want this working on a large cluster of computers, Spark lets you configure the machines, and you can control the degree of parallelism too.

RDD.

The goal of an RDD (Resilient Distributed Dataset) is to let you work with distributed collections as you would with local ones. RDDs are built through parallel transformations like map, filter, reduceByKey, join etc., and thus exploit data parallelism.

Take a look at the code below.


val count = sc.parallelize(1 to NUM_SAMPLES).filter { _ =>
  val x = math.random
  val y = math.random
  x*x + y*y < 1
}.count()
println(s"Pi is roughly ${4.0 * count / NUM_SAMPLES}")

  • Parallelized collections are created by calling SparkContext’s parallelize method on an existing collection in your driver program.
  • The elements of the collection are copied to form an RDD that can be operated on in parallel.
  • Spark runs one task for each partition of the RDD. Typically you would want 2-4 partitions for each CPU in your cluster (see the example below).
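For instance, the number of partitions, and hence the degree of parallelism, can be set explicitly as a second argument to parallelize :

val data = Array(1, 2, 3, 4, 5)
// Cut the dataset into 10 partitions; Spark schedules one task per partition.
val distData = sc.parallelize(data, 10)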

GraphX Pregel API.

  • Graphs created by modern applications can be very large (potentially terabytes or petabytes in size); thus a single node (computer) will not be able to hold all the vertices in memory.
  • The way forward is to partition the graph and parallelise its processing across many machines.
  • Pregel employs a vertex-centric approach to processing large distributed graphs (a bulk synchronous parallel model). Pregel computations consist of a sequence of iterations, called supersteps.

Let’s understand the type signature of the Pregel operator, followed by an implementation that uses it (the SSSP algorithm).


class GraphOps[VD, ED] {
  def pregel[A]
      (initialMsg: A,
       maxIter: Int = Int.MaxValue,
       activeDir: EdgeDirection = EdgeDirection.Out)
      (vprog: (VertexId, VD, A) => VD,
       sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
       mergeMsg: (A, A) => A)
    : Graph[VD, ED] = {
    // Receive the initial message at each vertex
    var g = mapVertices( (vid, vdata) => vprog(vid, vdata, initialMsg) ).cache()
    // Compute the initial messages
    var messages = g.mapReduceTriplets(sendMsg, mergeMsg)
    var activeMessages = messages.count()
    // Loop until no messages remain or maxIter is reached
    var i = 0
    while (activeMessages > 0 && i < maxIter) {
      // Receive the messages and update the vertices.
      g = g.joinVertices(messages)(vprog).cache()
      val oldMessages = messages
      // Send new messages, skipping edges where neither side received a message. We must cache
      // messages so it can be materialized on the next line, allowing us to uncache the previous
      // iteration.
      messages = g.mapReduceTriplets(
        sendMsg, mergeMsg, Some((oldMessages, activeDir))).cache()
      activeMessages = messages.count()
      i += 1
    }
    g
  }
}

Now we can use the Pregel operator to express computations such as single-source shortest paths (SSSP). A complete implementation can be found at this GitHub link; a sketch adapted from the GraphX programming guide follows.
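In this sketch, essentially the example from the GraphX programming guide, vertex attributes hold the current best distance from the source, and messages carry candidate shorter distances:

import org.apache.spark.graphx.{Graph, VertexId}
import org.apache.spark.graphx.util.GraphGenerators

// A random graph whose edge attributes hold distances
val graph: Graph[Long, Double] =
  GraphGenerators.logNormalGraph(sc, numVertices = 100).mapEdges(e => e.attr.toDouble)
val sourceId: VertexId = 42 // the source vertex

// All vertices except the source start at distance infinity
val initialGraph = graph.mapVertices((id, _) =>
  if (id == sourceId) 0.0 else Double.PositiveInfinity)

val sssp = initialGraph.pregel(Double.PositiveInfinity)(
  (id, dist, newDist) => math.min(dist, newDist), // vertex program: keep the smaller distance
  triplet => { // send a message only along edges that improve the destination's distance
    if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
      Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
    } else {
      Iterator.empty
    }
  },
  (a, b) => math.min(a, b) // merge messages: take the minimum
)
println(sssp.vertices.collect.mkString("\n"))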

Conclusion

I have attempted to demonstrate simple examples with RDDs and the GraphX Pregel API. I believe this is enough to get us started on more complex graph problems, though of course it is not the only way; there are other examples worth exploring for parallelism techniques.

Cache and Branch-Prediction Profiler.

This blog post demonstrates the usage of Cachegrind, a tool that counts how often your code hits and misses in the cache. It gives level-wise cache and branch-prediction analysis.

Overview :

Given below are some technical terms and abbreviations that are useful to know before we explore Cachegrind.

  • Ir : Instruction cache reads.
  • I1mr : L1 instruction cache read misses.
  • ILmr : Last-level (L3) instruction cache read misses.
  • Dr : Data cache reads.
  • D1mr : L1 data cache read misses.
  • DLmr : Last-level (L3) data cache read misses.
  • Dw : Data cache writes.
  • D1mw : L1 data cache write misses.
  • DLmw : Last-level (L3) data cache write misses.
  • Bc : Conditional branches executed.
  • Bcm : Conditional branches mis-predicted.
  • Bi : Indirect branches executed.
  • Bim : Indirect branches mis-predicted.


Installation :

Cachegrind is one of the utilities in the tool suite called Valgrind; therefore Valgrind has to be installed on your system.

If you are using an Ubuntu machine, enter this command in the Terminal :

sudo apt-get install valgrind

For other OS platforms, you need to build from source. Follow the instructions given on this website.


Usage :

Cache insights along with branch prediction analysis :

$ valgrind --tool=cachegrind --branch-sim=yes ./a.out

Function level analysis :

$ cg_annotate cachegrind.out.<pid>

The output file cachegrind.out.<pid> is generated in the directory where valgrind was run.

Line by line analysis :

$ cg_annotate cachegrind.out.<pid> --auto=yes


Demonstration :

For demonstration purposes, I am using a C program which performs an in-place M x N matrix transposition on a fixed input.

Compilation :

(Screenshots: compiling the program and running it under Cachegrind.)

Insights (Output) :

(Screenshot: the Cachegrind summary output.)

The observations show that the instruction cache miss rate is 0.10% and the branch mis-prediction rate is 5.1% for the transpose code.


Limitations :

  • It doesn’t account for the activity of other processes.
  • It doesn’t account for virtual-to-physical address mappings; hence the simulation is not a true representation of what happens in the cache.
  • Valgrind’s custom threads implementation schedules threads differently from the standard one, which could warp the results for threaded programs.