[link]
Strong consistency is easy to reason about, but it typically requires coordination, which increases latency. Weaker consistency can improve performance but is difficult to reason about. This paper presents a program analysis and a distributed protocol that together run transactions with coordination-free strong consistency.

**Analyzing Transactions.** We model a database as a finite map from objects to integers. Transactions are ordered sequences of reads, writes, computations, and prints; this is formalized below. A transaction T executes against a database D to produce a new database state D' and a log G' of printed values. Formally, eval(D, T) = <D', G'>. Symbolic tables categorize the behavior of a transaction based on the initial database state. Formally, a symbolic table for transaction T is a binary relation Q of pairs <P, T'>, where P is a first-order logic formula describing the contents of a database and T' is a transaction such that T and T' are observationally equivalent when run on any database satisfying P. A symbolic table can also be built for a set of transactions. Transactions are expressed in a language L, which is essentially IMP extended with database reads, database writes, and prints. A fairly simple recursive algorithm walks backwards through a program computing symbolic tables; essentially, it traces all paths through the program's control flow. There is also a higher-level language L++ which can be compiled to L.

**Homeostasis Protocol.** Assume data is partitioned (not replicated) across a cluster of K nodes. We model a distributed database as a pair <D, Loc> where D is a database and Loc is a function from objects to an index between 1 and K. Each transaction T runs on a site i; formally, l(T) = i. For simplicity, we assume that transactions only write to objects local to the site on which they run. Each transaction reads fresh versions of values at its own site and stale versions of values at other sites. Nodes establish treaties with one another such that operating on stale data does not affect the correctness of transactions. This is best explained by way of example. Imagine the following transaction is running on a site where x is remote:

```
x' = r(x)
if x' > 0:
  write(y = 1)
else:
  write(y = 2)
```

If we establish the treaty x > 0, then it doesn't matter what the actual value of x is. We now formalize this notion. Given a database D, a local-remote partition is a function p from objects to booleans. We can represent a database D with respect to a local-remote partition p as a pair (l, r), where l is a vector of the values of objects x such that p(x) and r is a vector of the values of objects x such that not p(x). In words, we model a database as disjoint sets of local and remote values. We say <(l, r), G> = <(l', r'), G'> if l = l', r = r', and G = G'. Given a database D, a local-remote partition p, a transaction T, and sets of vectors L and R, we say (L, R) is a local-remote slice (LR-slice) for T if eval((l, r), T) = eval((l, r'), T) for all l in L and all r, r' in R. In words, (L, R) is an LR-slice for T if T's behavior depends only on the local values. A global treaty Gamma is a subset of possible database states. A global treaty is valid for a set of transactions {T1, ..., Tn} if ({l | (l, r) in Gamma}, {r | (l, r) in Gamma}) is an LR-slice for every Ti.
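To make the validity condition concrete, here is a minimal sketch using the Z3 Python bindings (my choice of bindings; the paper's prototype drives Z3 from Java) that checks the treaty x > 0 from the example above: if no two treaty-satisfying values of the remote object x can lead to different writes, the transaction's behavior is independent of x.

```python
# Check that the treaty x > 0 makes the example transaction's write
# independent of the remote value x. x1 and x2 are two candidate values
# of x, both satisfying the treaty; if they could produce different
# writes, the solver would return sat and the treaty would be invalid.
from z3 import Int, If, And, Solver, unsat

x1, x2 = Int("x1"), Int("x2")

def written_y(x):
    # Symbolic effect of the transaction: y = 1 if x > 0 else y = 2.
    return If(x > 0, 1, 2)

s = Solver()
s.add(And(x1 > 0, x2 > 0))             # both states satisfy the treaty
s.add(written_y(x1) != written_y(x2))  # ...but the writes differ
assert s.check() == unsat              # unsat: no such pair; treaty valid
```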
The homeostasis protocol proceeds in rounds, where each round has three phases:

1. Treaty generation. The system generates a treaty for the current database state.
2. Normal execution. Transactions execute without coordination, reading a snapshot of remote values. After a site executes a transaction, it checks whether the transaction would bring the database to a state outside the treaty. If not, the transaction is committed. If so, we enter the next phase.
3. Cleanup. All sites synchronize and communicate all values that have changed since the last round. All sites then run the transaction that caused the violation. Finally, we enter the next round.

**Generating Treaties.** Two big questions remain: how do we generate treaties, and how do we enforce them? Given an initial database state D, we could always pick Gamma = {D}, but this would force us to synchronize after every single database modification. We want to pick treaties that let us run as long as possible before synchronizing. We could pick the predicate P in the symbolic table that D satisfies, but P is not guaranteed to be a valid treaty. Instead, we take the predicate P and divide it into a set of local treaties P1, ..., PK such that the conjunction of the local treaties implies P. Moreover, each local treaty must be satisfied by the current database. The conjunction of the local treaties is our global treaty, and it is guaranteed to be valid. Finding good local treaties is not easy; in general, the problem quickly becomes undecidable. We therefore limit ourselves to linear arithmetic and leverage SMT solvers to do the heavy lifting for us. First, we decompose the global treaty into a conjunction of linear constraints. We then generate templates from the constraints and instantiate them using Z3 (see the sketch at the end of this summary).

**Homeostasis in Practice.** Roy et al. present a homeostasis prototype. An offline preprocessing component takes in L++ transactions and computes joint symbolic tables, using tricks to keep the tables small. It then initializes the global and local treaties. The online execution component executes the homeostasis protocol described above. The prototype is implemented in Java over MySQL; the analysis uses ANTLR 4 and Z3.
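As a rough illustration of the template-instantiation step (my own reconstruction, not the paper's exact templates): suppose the global treaty is x1 + x2 > 0, with x1 on site 1 and x2 on site 2, and suppose the current (made-up) state is x1 = 60, x2 = 50. We can ask Z3 to instantiate local treaties of the form xi >= ci.

```python
# Instantiate local treaties x1 >= c1 and x2 >= c2 such that (a) their
# conjunction implies the global treaty x1 + x2 > 0 and (b) both hold in
# the current database state.
from z3 import Ints, ForAll, Implies, And, Solver, sat

x1, x2, c1, c2 = Ints("x1 x2 c1 c2")

s = Solver()
# (a) validity: every state satisfying both local treaties satisfies
# the global treaty.
s.add(ForAll([x1, x2],
             Implies(And(x1 >= c1, x2 >= c2), x1 + x2 > 0)))
# (b) the current state (x1 = 60, x2 = 50) satisfies both local treaties.
s.add(60 >= c1, 50 >= c2)

if s.check() == sat:
    m = s.model()
    print(f"local treaties: x1 >= {m[c1]}, x2 >= {m[c2]}")
```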
[link]
Building fault-tolerant systems is hard, like really hard. There are a couple of approaches to building fault-tolerant systems, but none are perfect.

- In a bottom-up approach, we verify that individual components of a system are fault-tolerant and then glue the components together. Unfortunately, fault tolerance is not closed under composition: the combination of two fault-tolerant systems may not be fault-tolerant.
- In a top-down approach, we inject faults into an existing system and see whether or not it fails. Fault injection can discover shallow bugs, but it has trouble surfacing bugs that require more complex failure scenarios (e.g., a network partition followed by a particular sequence of machine failures).

Lineage-driven Fault Injection (LDFI) is a top-down approach that uses lineage and fault injection to carefully discover fault-tolerance bugs in distributed systems. If a bug is found, the lineage of the bug is given to the user to help discover its root cause. If no bugs are found, LDFI provides some guarantee that, within the given bounds, no such bugs exist for that particular configuration. In a nutshell, LDFI takes a program written in something like Bloom, inputs to the program, and some parameters (e.g., to bound the length of the execution, to bound the number of faults, etc.). It runs the given program on the given input and computes a provenance graph of the output. It then carefully selects a small number of faults that would invalidate every derivation, injects these faults into the system, and checks whether they surface a bug. This is repeated until a bug is found or until no such fault sets remain. In this paper, Alvaro et al. present LDFI and an LDFI implementation named Molly.

#### System Model

LDFI injects faults, but not every kind of fault. We'll explore dropped messages, failed nodes, and network partitions. We won't explore message reordering or crash recovery. While we sacrifice generality, we gain tractability. LDFI is governed by three parameters:

1. LDFI does not operate over arbitrarily long executions. A parameter EOT (end of time) prescribes the maximum logical time of any execution examined by LDFI.
2. Distributed systems typically tolerate some number of faults, but they cannot possibly tolerate, for example, complete message loss. A parameter EFF (end of finite failures) < EOT sets a logical time after which LDFI will not introduce faults. The window between EFF and EOT gives a distributed system time to recover from message losses.
3. A parameter Crashes sets an upper limit on the number of node crashes LDFI will consider.

A failure specification is a three-tuple (EOT, EFF, Crashes). Molly automatically finds a good failure specification by repeatedly increasing EOT until programs can produce meaningful results and increasing EFF until faults occur. We assume programs are written in Dedalus and that pre- and postconditions are expressed as special relations pre and post in the program.
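To get a feel for what a failure specification bounds, here is a small illustrative sketch (my own, not Molly's code) enumerating the fault candidates a specification admits: message drops may only occur at logical times up to EFF, and at most Crashes nodes may fail.

```python
# Enumerate the faults admitted by a failure specification (EOT, EFF,
# Crashes): message drops at logical times 1..EFF, and up to `crashes`
# node failures at times 1..EOT. LDFI searches over combinations of these.
from itertools import combinations

def fault_candidates(nodes, eot, eff, crashes):
    drops = [(src, dst, t)
             for src in nodes for dst in nodes if src != dst
             for t in range(1, eff + 1)]
    crash_sets = list(combinations(
        [(n, t) for n in nodes for t in range(1, eot + 1)], crashes))
    return drops, crash_sets

drops, crash_sets = fault_candidates(["A", "B", "C"], eot=4, eff=2, crashes=1)
print(len(drops), "possible message drops;", len(crash_sets), "possible crashes")
```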
#### LDFI

Consider an interaction between Molly and a user trying to implement a fault-tolerant broadcast protocol between three nodes A, B, and C, where A begins with a message to broadcast. Our correctness condition asserts that if a message is delivered to any non-failed node, it is delivered to all of them.

- Implementation 1. Assume the user writes an incredibly naive broadcast protocol in which A sends a copy of the message to B and C once. Molly drops the message from A to B, inducing a bug.
- Implementation 2. The user then amends the program so that A continually sends the message to B and C. Molly drops the message from A to B and then crashes A. C has the message but B doesn't: a bug.
- Implementation 3. The user then amends the program so that all three nodes continuously broadcast the message. Now, Molly cannot find a set of dropped messages or node crashes that prevents all three nodes from obtaining the message.
- Implementation 4. While implementation 3 is fault-tolerant, it is also inefficient. The user amends the protocol so that nodes broadcast messages only until they receive an ack from the other nodes. Molly can devise a sequence of message drops and node crashes that should prevent the message from being delivered to all three nodes, but when it runs the system again with those faults, the message is still delivered.
- Implementation 5. A "classic" implementation of broadcast, in which a node broadcasts any message it receives once, is found to be buggy by Molly.

#### Molly

Molly begins by rewriting a Dedalus program into a Datalog program. Each relation is augmented with a time column.

```
foo(A,B) ==> foo(A,B,T)
bar(B,C) ==> bar(B,C,T)
baz(A,C) ==> baz(A,C,T)
```

The time column of every predicate in the body of a rule is bound to the same variable T.

```
_ :- foo(A,B), bar(B,C) ==> _ :- foo(A,B,T), bar(B,C,T)
```

The head of every deductive rule is bound to T.

```
baz(A,C) :- foo(A,B), bar(B,C) ==> baz(A,C,T) :- foo(A,B,T), bar(B,C,T)
```

The head of every inductive rule is bound to T + 1.

```
baz(A,C) :- foo(A,B), bar(B,C) ==> baz(A,C,T+1) :- foo(A,B,T), bar(B,C,T)
```

For asynchronous rules, we introduce a Clock(From, To, T) relation which contains an entry (n, m, T) if node n sent a message to node m at time T. The body of an asynchronous rule at node n whose head is destined for node m is augmented with a Clock(n, m, T) predicate, while the head is bound to T + 1. Molly can add and remove entries from Clock to simulate faults.

```
foo(A,B) :- foo(B,A) ==> foo(A,B,T+1) :- foo(B,A,T), Clock(B,A,T)
```

Molly then rewrites the Datalog program to maintain its own provenance graph and extracts lineage from the graph via recursive queries that walk it. Given an execution of the Datalog program, Molly generates a CNF formula in which each conjunct is a disjunction x1 or ... or xn of message drops and node failures, any one of which would invalidate a particular derivation of the correct outcome. If the formula is unsatisfiable, no admissible combination of faults can invalidate every derivation, and the program is fault-tolerant within the failure specification. If the formula is satisfiable, each satisfying assignment represents a set of faults to inject as a potential counterexample. Molly uses an SMT solver to solve these formulas.
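As a toy sketch of this final step (my own encoding, not Molly's actual one): consider the two derivations of the good outcome in implementation 1, one for B and one for C. Each contributes a clause of faults that would break it, and a satisfying assignment is a candidate fault set to inject next.

```python
# Each clause lists faults, any one of which invalidates one derivation
# of the good outcome; a satisfying assignment simultaneously invalidates
# all derivations and is the next fault scenario to inject.
from z3 import Bools, Or, Solver, sat

drop_ab, drop_ac, crash_a = Bools("drop_A_B_1 drop_A_C_1 crash_A_1")

s = Solver()
s.add(Or(drop_ab, crash_a))  # kills B's derivation (A's time-1 send to B)
s.add(Or(drop_ac, crash_a))  # kills C's derivation (A's time-1 send to C)

if s.check() == sat:
    print("candidate fault set:", s.model())  # e.g. crash A at time 1
```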
[link]
Storm was Twitter's first stream processing system. Unfortunately, it wasn't good enough for a number of reasons. Heron is a Storm-API-compliant rewrite of Storm.

**Motivation.** In Storm, computation is expressed as a directed graph, or topology, where vertexes are computations and edges transport tuples. Storm topologies are run on a cluster of workers overseen by a central Nimbus. Each worker node runs multiple worker processes; each worker process runs a JVM, which runs multiple executors; and each executor runs multiple tasks. The executors run multiple threads, and each worker also has a multi-threaded supervisor. This worker architecture was far too complex. Multiple components were making scheduling decisions (e.g., the OS schedules processes, the JVM schedules executors, and executors schedule tasks), which made it hard to predict when certain tasks would be run. Moreover, putting different types of tasks on the same executor complicated logs, exception handling, garbage collection, etc. The Storm scheduler was also not good at scheduling tasks with different resource requirements. And because workers were so heavily multi-threaded, messages had to traverse many thread boundaries. The Nimbus was a complicated piece of code that did too much; it often became a bottleneck and was a single point of failure. Its scheduling was so poor that Twitter used to reserve nodes to exclusively run a single topology. The Nimbus also communicated with workers through ZooKeeper, which became a bottleneck. Finally, Storm did not implement backpressure: when bolts became overloaded, tuples were simply dropped.

**Design Alternatives.** Twitter considered extending and modifying Storm to fix its problems, but Storm's flaws were deeply entrenched in its design, so fixing it in place would have been difficult. Twitter also considered adopting another existing stream processing system, but didn't want to break the Storm API and have to rewrite a large number of applications. In the end, a rewrite seemed like the best bet.

**Data Model and API.** Heron follows the exact same API as Storm. Computation is expressed as a directed graph where vertexes are spouts (sources of tuples) or bolts (tuple processors) and edges transfer tuples between vertexes. Users provide logical plans, which are expanded to physical plans in order to exploit data parallelism. Heron provides at-least-once and at-most-once semantics.
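As a small illustration of logical-to-physical plan expansion (my own model, not Heron's API; the vertex names are made up): each spout or bolt is replicated according to a parallelism hint, and each logical edge fans out to all replicas of its destination.

```python
# Expand a logical topology (vertices with parallelism hints) into a
# physical plan where each vertex is replicated and every logical edge
# fans out between the replicas.
logical = {
    "tweet-spout": {"parallelism": 2, "downstream": ["parse-bolt"]},
    "parse-bolt":  {"parallelism": 3, "downstream": ["count-bolt"]},
    "count-bolt":  {"parallelism": 2, "downstream": []},
}

for name, spec in logical.items():
    for i in range(spec["parallelism"]):
        targets = [f"{dst}#{j}"
                   for dst in spec["downstream"]
                   for j in range(logical[dst]["parallelism"])]
        print(f"{name}#{i} -> {targets}")
```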
**Architecture Overview.** Users submit Heron topologies to Aurora, though Heron is also able to run on top of Mesos, YARN, ECS, etc. Each topology is run as a set of containers. One container runs the Topology Master (TM). The rest each run a Stream Manager (SM), a Metrics Manager (MM), and multiple Heron Instances (HI). Topology state is kept in ZooKeeper, and the TM can have a standby. All communication is done via protobufs.

**Topology Master.** The TM is responsible for overseeing the execution of a topology and reporting its status. The TM holds an ephemeral node in ZooKeeper to ensure there is only ever one TM and so that other components can discover it.

**Stream Manager.** Stream Managers are responsible for routing tuples. The k Stream Managers form a clique. Though O(k^2) connections is a lot, the number of Heron Instances can scale independently of k. Stream Managers communicate via TCP, short-circuiting when delivering within a container. Heron, unlike Storm, implements backpressure. Three kinds of backpressure strategies are considered:

- TCP Backpressure. Heron Instances communicate with Stream Managers via TCP, which provides a natural form of backpressure: if a TCP consumer is too slow, the TCP producer slows down. This form of backpressure is easy to implement. However, multiple logical edges are multiplexed over a single SM-to-SM TCP connection, which means that nodes are sometimes inadvertently slowed down when they shouldn't be.
- Spout Backpressure. When a bolt is overloaded, the SM in charge of the bolt tells the spouts feeding into it to slow down. This is somewhat inefficient in that slowing an intermediate node might be sufficient.
- Stage-by-Stage Backpressure. Backpressure is propagated backwards from vertex to vertex.

Heron implements TCP and Spout Backpressure. Each socket is associated with a queue whose size has a high and a low watermark. If the queue size exceeds the high watermark, backpressure is applied until it drops below the low watermark (see the sketch at the end of this summary).

**Heron Instances.** Each Heron Instance runs a single JVM which runs a single task, which makes debugging significantly easier. Heron Instances cannot be single-threaded because slow user code could prevent things like metrics from being reported in a timely manner. So, Heron implements Heron Instances with two threads: a Gateway Thread and a Task Execution Thread. The Gateway Thread communicates with the SM and the MM, as well as with the Task Execution Thread, which runs user code and gathers metrics. The two threads communicate through a set of one-directional queues whose sizes can be tuned to avoid bad GC behavior.

**Metrics Manager.** The Metrics Manager, well, manages metrics. It reports metrics to the TM and to a Twitter monitoring system.
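Here is the promised sketch of the high/low watermark policy (a minimal model of my own, not Heron's implementation): backpressure starts when a queue crosses its high watermark and is released only once the queue drains below its low watermark; the gap between the two thresholds prevents rapid on/off oscillation.

```python
# Backpressure starts when the queue size crosses the high watermark and
# is released only after the size falls below the low watermark.
class BackpressureQueue:
    def __init__(self, high: int, low: int):
        assert low < high
        self.high, self.low = high, low
        self.size = 0
        self.backpressure = False

    def enqueue(self, n: int = 1):
        self.size += n
        if self.size > self.high:
            self.backpressure = True   # signal upstream (e.g., spouts) to slow down

    def dequeue(self, n: int = 1):
        self.size = max(0, self.size - n)
        if self.backpressure and self.size < self.low:
            self.backpressure = False  # resume the normal send rate

q = BackpressureQueue(high=100, low=50)
q.enqueue(150); print(q.backpressure)  # True: above the high watermark
q.dequeue(60);  print(q.backpressure)  # still True: not yet below low
q.dequeue(60);  print(q.backpressure)  # False: drained below the low watermark
```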
[link]
This paper describes Spark SQL, which integrates relational processing with Spark's procedural API. It builds on the experience of previous efforts like Shark and introduces two major additions to bridge the gap between relational and procedural processing:

1. A DataFrame API that tightly integrates relational and procedural processing by allowing both relational and procedural operations on multiple data sources.
2. Catalyst, a highly extensible optimizer which makes it easy to add new data sources and optimizations.

#### Programming Interface

Spark SQL uses a nested data model based on Hive and supports all major SQL data types along with complex (e.g., array, map) and user-defined data types. It ships with a schema inference algorithm for JSON and other semistructured data. This algorithm is also used for inferring the schemas of RDDs (Resilient Distributed Datasets) of Python objects. The algorithm attempts to infer a static tree structure of STRUCT types (which in turn may contain basic types, arrays, etc.) in one pass over the data. It starts by finding the most specific Spark SQL type for each record and then merges these using an associative "most specific supertype" function that generalizes the types of each field.

A DataFrame is a distributed collection of rows with the same schema, equivalent to a table in an RDBMS. DataFrames are similar to Spark's native RDDs in that they are evaluated lazily, but unlike RDDs, they have a schema. A DataFrame represents a logical plan, and a physical plan is built only when an output function like save is called. Deferring execution in this way leaves more room for optimization. Moreover, DataFrames are analyzed eagerly to check that column names and data types are valid. DataFrames can be queried using both SQL and a DSL which includes all the common relational operators like select, where, join, and groupBy. All of these operators build up an abstract syntax tree (AST) of the expression (think of an expression as a column in a table), which is then optimized by Catalyst. Spark SQL can cache data in memory using columnar storage, which is more efficient than Spark's native cache, which simply stores data as JVM objects. The DataFrame API supports user-defined functions (UDFs), which can use the full Spark API internally and can be registered easily. To query native datasets, Spark SQL creates a logical data scan operator (pointing to the RDD) which is compiled into a physical operator that accesses fields of the native objects in place, extracting only the fields needed for a query. This is better than traditional object-relational mapping (ORM), which translates an entire object into a different format. Spark MLlib implemented a new API based on the pipeline concept (think of a pipeline as a graph of transformations on the data) and chose DataFrames as the format for exchanging data between pipeline stages, which makes it much easier to expose MLlib's algorithms in Spark SQL.
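To make the DataFrame API concrete, here is a small sketch in modern PySpark (the paper's examples use Scala; the column names and data here are made up): the relational operators build up a logical plan, and nothing executes until an output action like collect is invoked.

```python
# select/where/groupBy build an AST that Catalyst optimizes; no Spark job
# runs until the collect() action triggers physical planning and execution.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("example").getOrCreate()

df = spark.createDataFrame(
    [("alice", "CA", 3), ("bob", "NY", 5), ("carol", "CA", 2)],
    ["name", "state", "logins"],
)

# A purely logical plan so far: filter, then aggregate per state.
per_state = (df.where(F.col("logins") > 2)
               .groupBy("state")
               .agg(F.sum("logins").alias("total")))

print(per_state.collect())  # triggers optimization and execution
```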
#### Catalyst

Catalyst is an extensible optimizer based on standard features of Scala. It supports both rule-based and cost-based optimization and makes it easy to add new optimization techniques and data sources to Spark SQL. At its core, Catalyst is a general library for representing trees and manipulating them by applying rules. The tree is the main data type in Catalyst; it is composed of node objects, where each node has a type and zero or more children. Rules are functions that transform one tree into another. Trees offer a transform method that applies a pattern-matching function recursively to all the nodes of the tree, transforming only the matching nodes. Rules can match multiple patterns in the same transform call and can contain arbitrary Scala code, which lifts the restriction of working only in a domain-specific language (DSL); a toy sketch of this tree-and-rules model appears at the end of this summary. Catalyst groups rules into batches and executes each batch until it reaches a fixed point (i.e., until the tree stops changing). This means that each rule can be simple and self-contained while still producing a global effect on the tree. Since both nodes and trees are immutable, optimizations can easily be performed in parallel as well.

Spark SQL uses Catalyst in four phases:

1. **Logical Plan Analysis**, which resolves attribute references (attributes whose type is unknown or which have not yet been matched to an input table). It uses a Catalog object that tracks the tables in all data sources in order to resolve references.
2. **Logical Optimization**, which applies standard rule-based optimizations to the logical plan, including constant folding, predicate pushdown, projection pruning, null propagation, Boolean expression simplification, etc.
3. **Physical Planning**, in which Spark SQL generates multiple physical plans for a single logical plan and then selects one of them using a cost model. It also performs some rule-based physical optimizations.
4. **Code Generation**, in which Catalyst uses quasiquotes (provided by Scala) to construct an AST that can be fed to the Scala compiler, generating bytecode at runtime.

Extensions can be added even without understanding how Catalyst works. For example, to add a data source, one implements a createRelation function that takes a set of key-value parameters and returns a BaseRelation object if the source is successfully loaded. The BaseRelation can then implement interfaces that let Spark SQL access the data. Similarly, user-defined types (UDTs) are mapped to Catalyst's built-in types, so one provides a mapping from an object of the UDT to a Catalyst row of built-in types and an inverse mapping back.

#### Future Work

Some recent work has shown that it is quite easy to add special planning rules to optimize for specific use cases. For example, researchers on the ADAM project added a new rule that uses interval trees instead of normal joins for a computational genomics problem. Similarly, other work has used Catalyst to improve the generality of online aggregation to support nested aggregate queries. These examples show that Spark and Spark SQL are quite easy to adapt to new use cases as well. As I mentioned previously, I am experimenting with Spark SQL, and it does look promising. I have implemented some operators, and it is indeed quite easy to extend. I am now looking forward to building something more concrete on top of it.
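Here is the promised toy sketch of the tree-and-rules model (in Python rather than Catalyst's Scala, with made-up node types): a transform that walks an expression tree bottom-up and applies a rule at every node, here performing constant folding on addition.

```python
# Expression trees as plain dataclasses; transform rebuilds the tree
# bottom-up, applying the rule at every node and changing only matches.
from dataclasses import dataclass

@dataclass
class Lit:
    value: int

@dataclass
class Attr:
    name: str

@dataclass
class Add:
    left: object
    right: object

def transform(node, rule):
    if isinstance(node, Add):
        node = Add(transform(node.left, rule), transform(node.right, rule))
    return rule(node)

def fold_constants(node):
    # The "pattern match": an Add of two literals collapses to one literal.
    if isinstance(node, Add) and isinstance(node.left, Lit) and isinstance(node.right, Lit):
        return Lit(node.left.value + node.right.value)
    return node

tree = Add(Attr("x"), Add(Lit(1), Lit(2)))
print(transform(tree, fold_constants))  # Add(left=Attr(name='x'), right=Lit(value=3))
```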