This week I read up on [GraphX](https://amplab.cs.berkeley.edu/wp-content/uploads/2014/02/graphx.pdf), a distributed graph computation framework that unifies graph-parallel and data-parallel computation. Graph-parallel systems express iterative algorithms efficiently (by exploiting the static graph structure) but do not perform well on operations that require a more general view of the graph, such as operations that move data out of the graph. Data-parallel systems perform well on such tasks, but implementing graph algorithms directly on them is inefficient because of complex joins and excessive data movement. This is the gap that GraphX fills by allowing the same data to be viewed and operated upon both as a graph and as a table.

### Preliminaries

Let $G = (V, E)$ be a graph where $V = \{1, \ldots, n\}$ is the set of vertices and $E$ is the set of $m$ directed edges. Each directed edge is a tuple of the form $(i, j) \in E$ where $i \in V$ is the source vertex and $j \in V$ is the target vertex. The vertex properties are represented as $P_V(i)$ where $i \in V$, and the edge properties as $P_E(i, j)$ for edge $(i, j) \in E$. The collection of all the properties is $P = (P_V, P_E)$. The combination of graph structure and properties defines a property graph $G(P) = (V, E, P)$.

Graph-parallel systems consist of a property graph $G = (V, E, P)$ and a vertex-program $Q$ that is instantiated simultaneously on all the vertices. The execution on vertex $v$, called $Q(v)$, interacts with the executions on adjacent vertices through message passing or shared state, and can read and modify properties on the vertex, its edges and its adjacent vertices. $Q$ can run in two different modes:

* **bulk-synchronous mode**: all vertex-programs run concurrently in a sequence of supersteps.
* **asynchronous mode**: vertex-programs run as and when resources become available, subject to constraints on whether neighbouring vertex-programs can run concurrently.
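
To make these definitions concrete, here is a minimal sketch in plain Python (not GraphX code) of a property graph stored as two dictionaries and a vertex-program run in bulk-synchronous supersteps. The function name `run_supersteps`, the "adopt the smallest label" vertex-program and the sample graph are all assumptions made for illustration; they do not come from the paper.

```python
from typing import Dict, Tuple

VertexId = int
Edge = Tuple[VertexId, VertexId]  # (source i, target j)


def run_supersteps(
    vertex_props: Dict[VertexId, int],
    edge_props: Dict[Edge, str],
    max_supersteps: int = 50,
) -> Tuple[Dict[VertexId, int], int]:
    """Run a toy vertex-program in bulk-synchronous supersteps.

    Hypothetical vertex-program Q(v): keep the smallest label among v's own
    label and the labels sent by its in-neighbours.
    Returns the final vertex properties and the number of supersteps that
    changed at least one vertex.
    """
    props = dict(vertex_props)
    for superstep in range(max_supersteps):
        # Message passing: each vertex sends its label along its out-edges.
        inbox: Dict[VertexId, list] = {}
        for (src, dst) in edge_props:
            inbox.setdefault(dst, []).append(props[src])
        # Every Q(v) reads only the state of the previous superstep,
        # so all vertices can be updated concurrently.
        new_props = {
            v: min([label] + inbox.get(v, [])) for v, label in props.items()
        }
        if new_props == props:  # no vertex changed: converged
            return props, superstep
        props = new_props
    return props, max_supersteps


# Property graph G(P) = (V, E, P): P_V maps vertex ids to labels,
# P_E maps directed edges (i, j) to an (unused) edge property.
P_V = {1: 1, 2: 2, 3: 3, 4: 4}
P_E = {(1, 2): "a", (2, 3): "b", (4, 3): "c"}
final_props, supersteps = run_supersteps(P_V, P_E)
```

On this sample graph, label 1 reaches vertices 2 and 3 along the directed edges, while vertex 4 has no incoming edge and keeps its own label.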

The **Gather-Apply-Scatter (GAS)** decomposition model breaks a vertex-program down into purely edge-parallel and vertex-parallel stages. The associative *gather* function collects the inbound messages at each vertex, the *apply* function operates only on the vertex and updates its value, and the *scatter* function computes the message to be sent along each edge and can be safely executed in parallel. GraphX uses the bulk-synchronous model and adopts the GAS decomposition.

### GraphX Data Model

The GraphX data model consists of immutable collections and property graphs. Collections consist of unordered tuples (key-value pairs) and are used to represent unstructured data. The property graph combines the structural information (in the form of collections of vertices and edges) with properties describing this structure. Properties are just collections of the form $(i, P_V(i))$ and $((i, j), P_E(i, j))$.

The collections of vertices and edges are represented using RDDs (Resilient Distributed Datasets). Edges can be partitioned according to a user-defined function. Within a partition, edges are clustered by source vertex id and there is an unclustered index on target vertex id. The vertices are hash-partitioned by id and stored in a hash index within a partition. Each vertex partition contains a bitmask which allows for set intersection and filtering. It also contains a routing table that logically maps a vertex id to the set of edge partitions containing its adjacent edges. This table is used when constructing triplets and is stored as a compressed bitmap.

### Operators

Other than standard data-parallel operators like `filter`, `map`, `leftJoin`, and `reduceByKey`, GraphX supports the following graph-parallel operators:

* `graph`: constructs a property graph given a collection of edges and vertices.
* `vertices`, `edges`: decompose the graph into a collection of vertices or edges by extracting the vertex or edge RDDs.
* `mapV`, `mapE`: transform the vertex or edge collection.
* `triplets`: returns a collection of the form $((i, j), (P_V(i), P_E(i, j), P_V(j)))$. The operator essentially requires a multi-way join between the vertex and edge RDDs. This operation is optimized by shifting the site of the joins to the edges, using the routing table, so that only vertex data needs to be shuffled.
* `leftJoin`: given a collection of vertices and a graph, returns a new graph which incorporates the properties of matching vertices from the given collection into the given graph without changing the underlying graph structure.
* `subgraph`: returns a subgraph of the original graph by applying predicates on edges and vertices.
* `mrTriplets` (MapReduce triplets): logical composition of `triplets` followed by `map` and `reduceByKey`. It is the building block of graph-parallel algorithms.

All these operators can be expressed in terms of relational operators and can be composed together to express different graph-parallel abstractions. The paper shows how these operators can be used to construct an enhanced version of Pregel based on GAS. It also shows how to express the connected components algorithm and the `coarsen` operator.

### Structural Index Reuse

Collections and graphs, being immutable, share the structural indexes associated with each vertex and edge partition, both to reduce memory overhead and to accelerate local graph operations. Most of the operators preserve the structural indexes so that they can be reused. For operators like `subgraph`, which restrict the graph, the bitmask is used to construct the restricted view.

### Distributed Join Optimization

##### Incremental View Maintenance

The number of vertices that change between successive steps of an iterative graph algorithm decreases as the computation converges. After each operation, GraphX tracks which vertices have changed by maintaining a bitmask. When materializing a vertex view, it reuses values from the previous view for vertices which have not changed and ships only the vertices which have changed.
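
The idea of shipping only changed vertices can be sketched in a few lines of plain Python. This is an illustrative sketch, not GraphX's implementation: the function names `changed_vertices` and `refresh_view` are hypothetical, a Python `set` stands in for the bitmask, and a dictionary stands in for the vertex view replicated to one edge partition.

```python
from typing import Dict, Set, Tuple


def changed_vertices(
    old_props: Dict[int, float], new_props: Dict[int, float]
) -> Set[int]:
    """Ids whose property differs between two iterations (stand-in for the bitmask)."""
    return {v for v, value in new_props.items() if old_props.get(v) != value}


def refresh_view(
    cached_view: Dict[int, float],
    new_props: Dict[int, float],
    changed: Set[int],
) -> Tuple[Dict[int, float], int]:
    """Refresh a cached vertex view by shipping only the changed vertices.

    Returns the refreshed view and the number of vertex values shipped.
    """
    # Only changed vertices that this partition actually references are sent.
    shipped = {v: new_props[v] for v in changed if v in cached_view}
    view = dict(cached_view)  # unchanged vertices reuse their previous values
    view.update(shipped)
    return view, len(shipped)


# Sample data (hypothetical): only vertex 2 changed in the last iteration.
old_ranks = {1: 1.0, 2: 1.0, 3: 1.0}
new_ranks = {1: 1.0, 2: 0.5, 3: 1.0}
partition_view = {1: 1.0, 2: 1.0}  # this edge partition references vertices 1 and 2

changed = changed_vertices(old_ranks, new_ranks)
view, shipped_count = refresh_view(partition_view, new_ranks, changed)
```

Here only one value is shipped instead of the whole view, and the saving grows as fewer vertices change in later iterations.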

Change tracking also enables another optimization when using the `mrTriplets` operation: `mrTriplets` supports an optional argument called *skipStale*. When this option is enabled, `mrTriplets` is not applied to edges originating from vertices that have not changed since the last iteration. This optimization uses the same bitmask that incremental view maintenance uses.

##### Automatic Join Elimination

GraphX implements a JVM bytecode analyzer that determines whether the source or target vertex attributes are referenced in an `mrTriplets` UDF (for the map step). Since edges already contain the vertex ids, the 3-way join can be reduced to a 2-way join if only the source or only the target vertex attributes are needed (as in the PageRank algorithm), and the join can be eliminated completely if none of the vertex attributes are referenced.

### Sequential Scan vs Index Scan

Reusing structural indices reduces computation cost in iterative algorithms, but it prevents the physical data from shrinking even when only a fraction of the vertices is still active. To counter this issue, GraphX switches from a sequential scan to a bitmap index scan when the fraction of active vertices drops below 0.8. Since edges are clustered by source vertex id, a bitmap index scan can efficiently join edges and vertices together.

### Other Optimizations

* Though GraphX uses Spark's shuffle mechanism, it materializes shuffled data in memory, unlike Spark, which materializes shuffle data on disk and relies on the OS buffer cache to cache the data. The rationale behind this modification is that graph algorithms tend to be communication-intensive, and the inability to control when buffers are flushed can lead to additional overhead.
* When implementing the join step, vertices routed to the same target are batched, converted from row-oriented to column-oriented format, compressed with the LZF algorithm and then sent to their destination.
* During shuffling, integers are encoded using a variable-length encoding scheme where, in each byte, the lower 7 bits encode the value and the highest-order bit indicates whether another byte is needed to encode the value. Smaller integers can therefore be encoded with fewer bytes and since, in most cases, vertex ids need far fewer than 64 bits, the technique helps to reduce the amount of data to be moved.

### System Evaluation

GraphX was evaluated against graph algorithms implemented over Spark 0.8.1, Giraph 1.0 and GraphLab 2.2 for both graph-parallel computation tasks and end-to-end graph analytics pipelines. Key observations:

* GraphLab benefits from its native runtime and performs best among all the implementations for both the PageRank and Connected Components algorithms.
* For the connected components algorithm, Giraph benefits from using edge cuts but suffers from Hadoop overhead.
* GraphX outperforms the idiomatic implementation of PageRank on Spark, benefiting from the various optimizations discussed earlier.
* As more machines are added, GraphX does not scale linearly, but its speedup still exceeds that achieved by GraphLab (for PageRank).
* GraphX outperforms Giraph and GraphLab for a multi-step, end-to-end graph analytics pipeline that parses Wikipedia articles to build a link graph, runs PageRank on the link graph and joins the top 20 articles with their text.

GraphX provides a small set of core graph-processing operators, implemented on top of relational operators, by efficiently encoding graphs as collections of edges and vertices with two indexing data structures. While it does lag behind specialised systems like Giraph and GraphLab on graph-parallel computation tasks, GraphX does not aim at speeding up such tasks. It instead aims to provide an efficient workflow for end-to-end graph analytics by combining data-parallel and graph-parallel computation in the same framework.
Given that it outperforms all the specialised systems in end-to-end runtime for graph pipelines and makes development easier by eliminating the need to learn and maintain multiple systems, it does seem to be a promising candidate for the use case it is attempting to solve.
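
As a footnote to the shuffle optimizations listed under Other Optimizations, the variable-length integer encoding can be sketched as follows. This is a generic 7-bits-per-byte encoder written in plain Python, not GraphX's actual code. The function names `encode_varint` and `decode_varint` are hypothetical, and emitting the least significant 7-bit group first is an assumption, since the summary above does not specify the byte order.

```python
def encode_varint(n: int) -> bytes:
    """Encode a non-negative integer using 7 value bits per byte.

    The highest-order bit of each byte is set when another byte follows.
    Assumption: the least significant 7-bit group is written first.
    """
    if n < 0:
        raise ValueError("only non-negative integers are supported")
    out = bytearray()
    while True:
        low7 = n & 0x7F
        n >>= 7
        if n:
            out.append(low7 | 0x80)  # continuation bit set: more bytes follow
        else:
            out.append(low7)  # last byte: continuation bit clear
            return bytes(out)


def decode_varint(data: bytes) -> int:
    """Decode a single integer produced by encode_varint."""
    value = 0
    shift = 0
    for byte in data:
        value |= (byte & 0x7F) << shift
        if not byte & 0x80:  # continuation bit clear: this was the last byte
            return value
        shift += 7
    raise ValueError("truncated varint")
```

A small vertex id such as 300 fits in two bytes instead of the eight bytes of a fixed-width 64-bit integer, while an id that uses all 64 bits takes ten bytes, so the scheme pays off only when most ids are small.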
