Hive - a petabyte scale data warehouse using Hadoop
Thusoo, Ashish; Sarma, Joydeep Sen; Jain, Namit; Shao, Zheng; Chakka, Prasad; Zhang, Ning; Anthony, Suresh; Liu, Hao; Murthy, Raghotham
2010
[Hive](http://infolab.stanford.edu/~ragho/hive-icde2010.pdf) is an open-source data warehousing solution built on top of Hadoop. It supports an SQL-like query language called HiveQL, whose queries are compiled into MapReduce jobs that are executed on Hadoop. While Hive relies on Hadoop to execute queries, it greatly reduces the effort that goes into writing and maintaining hand-coded MapReduce jobs.
# Data Model
Hive supports database concepts like tables, columns, rows and partitions. Both primitive (integer, float, string) and complex data types (map, list, struct) are supported. Moreover, these types can be composed to support structures of arbitrary complexity. Tables are serialized/deserialized using a default serializer/deserializer (SerDe). Any new data format or type can be supported by implementing the SerDe and ObjectInspector Java interfaces.
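For instance, the paper illustrates type composition with a table whose last column nests a struct inside a map inside a list:

```sql
CREATE TABLE t1(
  st STRING,
  fl FLOAT,
  li LIST<MAP<STRING, STRUCT<p1:INT, p2:INT>>>
);
```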
# Query Language
The Hive query language (HiveQL) consists of a subset of SQL along with some extensions. The language is very SQL-like and supports features like subqueries, joins, Cartesian products, group by, aggregation, describe and more. MapReduce programs can also be used in Hive queries. A sample query using MapReduce would look like this:
```sql
MAP inputdata USING 'python mapper.py' AS (word, count)
CLUSTER BY word
REDUCE word, count USING 'python reduce.py';
```
This query uses `mapper.py` to transform `inputdata` into `(word, count)` pairs, distributes the data to reducers by hashing on the `word` column (specified by `CLUSTER BY`) and aggregates the pairs using `reduce.py`.
Notice that Hive allows the order of `FROM` and `SELECT/MAP/REDUCE` to be swapped within a sub-query. This allows different transformation results to be inserted into different tables, partitions, HDFS files or local directories as part of the same query, and reduces the number of scans over the input data.
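A hedged sketch of such a multi-insert query (the table and column names here are illustrative, not from the paper):

```sql
FROM dummytable
INSERT OVERWRITE TABLE word_counts
  SELECT word, count(1) GROUP BY word
INSERT OVERWRITE DIRECTORY '/tmp/long_words'
  SELECT word WHERE length(word) > 10;
```

Both inserts are served by a single scan of `dummytable`.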
HiveQL has some limitations:
* Only equi-joins are supported.
* Data cannot be appended to existing tables/partitions; all inserts overwrite the existing data.
* `INSERT INTO`, `UPDATE` and `DELETE` are not supported, which makes it easier to handle reader and writer concurrency.
# Data Storage
While a table is the logical unit of data in Hive, the data is actually stored in HDFS directories. A **table** is stored as a directory in HDFS, a **partition** of a table as a subdirectory within that directory, and a **bucket** as a file within the table or partition directory. Partitions can be created either when creating a table or by using `INSERT/ALTER` statements. The partitioning information is used to prune data when running queries. For example, suppose we create a partition for `day=monday` using the query
```sql
ALTER TABLE dummytable ADD PARTITION (day='monday');
```
Next, we run the query:

```sql
SELECT * FROM dummytable WHERE day='monday';
```
Suppose the data in `dummytable` is stored under the `/user/hive/data/dummytable` directory. This query will then scan only the subdirectory `/user/hive/data/dummytable/day=monday`, not the whole table directory.
A **bucket** is a file within the leaf directory of a table or a partition. It can be used to prune data when the user runs a `SAMPLE` query.
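A hedged example of such a sampling query, assuming `dummytable` was created with 32 buckets on some column:

```sql
SELECT * FROM dummytable TABLESAMPLE(BUCKET 1 OUT OF 32);
```

Hive can answer this by reading a single bucket file instead of scanning the full table.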
Any data already stored in HDFS can be queried by creating an `EXTERNAL TABLE` and specifying the data's location with the `LOCATION` clause. When an external table is dropped, only its metadata is deleted, not the data itself.
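A minimal sketch (the path and schema are illustrative assumptions):

```sql
CREATE EXTERNAL TABLE extlogs(ts STRING, msg STRING)
LOCATION '/user/data/logs';
```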
Hive implements `LazySerDe` as the default SerDe. It deserializes rows into internal objects lazily, so that the cost of deserializing a column is incurred only when the column is needed. Hive also provides a `RegexSerDe`, which allows the use of regular expressions to parse columns out of a row. Hive supports various file formats like `TextInputFormat`, `SequenceFileInputFormat` and `RCFileInputFormat`; other formats can be implemented and specified in the query itself. For example,
```sql
CREATE TABLE dummytable(key INT, value STRING)
STORED AS SEQUENCEFILE;
```
# System Architecture and Components
* **Metastore** - Stores system catalog and metadata about tables, columns and partitions.
* **Driver** - Manages life cycle of a HiveQL statement as it moves through Hive.
* **Query Compiler** - Compiles query into a directed acyclic graph of MapReduce tasks.
* **Execution Engine** - Executes the tasks produced by the compiler in proper dependency order.
* **Hive Server** - Provides a thrift interface and a JDBC/ODBC server.
* **Client components** - CLI, web UI, and JDBC/ODBC driver.
* **Extensibility Interfaces** - Interfaces for SerDe, ObjectInspector, UDF (User Defined Function) and UDAF (User-Defined Aggregate Function).
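As a hedged example of the UDF extensibility point, a compiled Java UDF can be registered and invoked from HiveQL (the class and function names here are illustrative):

```sql
CREATE TEMPORARY FUNCTION to_upper AS 'com.example.hive.udf.ToUpper';
SELECT to_upper(value) FROM dummytable;
```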
### Life Cycle of a query
The query is submitted via the CLI, the web UI or any other interface. It goes to the compiler and undergoes parse, type-check and semantic analysis phases using metadata from the Metastore. The compiler generates a logical plan, which the rule-based optimizer optimizes into a DAG of MapReduce and HDFS tasks. The execution engine then executes these tasks in the correct order using Hadoop.
### Metastore
The Metastore stores all information about the tables, their partitions, schemas, columns and their types, etc. It runs on a traditional RDBMS (so that the latency for metadata queries is very small) and uses an open-source ORM layer called DataNucleus. The Metastore is backed up regularly. To make sure that the system scales with the number of queries, no metadata calls are made from the mappers/reducers of a job; any metadata needed by a mapper or reducer is passed through XML plan files generated by the compiler.
### Query Compiler
The Hive query compiler works similarly to traditional database compilers.
* Antlr is used to generate the Abstract Syntax Tree (AST) of the query.
* A logical plan is created using information from the Metastore. An intermediate representation called the query block (QB) tree is used when transforming the AST into an operator DAG; nested queries define the parent-child relationships in the QB tree.
* The optimization logic consists of a chain of transformations in which the output of one transformation is the input of the next. Each transformation comprises a **walk** over the operator DAG. Each visited **node** in the DAG is tested against different **rules**; if a rule is satisfied, its corresponding **processor** is invoked. A **Dispatcher** maintains the mapping of rules to processors and does the rule matching, while a **GraphWalker** manages the overall traversal.
* The logical plan generated in the previous step is split into multiple MapReduce and HDFS tasks. Nodes in the plan correspond to physical operators and edges represent the flow of data between operators.

The optimizations applied by default include:
* **Column Pruning** - Only the columns needed in the query processing are projected.
* **Predicate Pushdown** - Predicates are pushed down to the scan so that rows are filtered as early as possible.
* **Partition Pruning** - Predicates on partitioned columns are used to prune out files of partitions that do not satisfy the predicate.
* **Map-Side Joins** - If one of the tables involved in a join is very small, it is replicated to all the mappers and the join is performed in the map phase, avoiding the reduce phase altogether.
* **Join Reordering** - Larger tables are streamed rather than materialized in memory in the reducer, which reduces memory requirements.
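A hedged example of requesting a map-side join via a query hint (the table names are illustrative):

```sql
SELECT /*+ MAPJOIN(smalltable) */ b.key, s.value
FROM bigtable b JOIN smalltable s ON (b.key = s.key);
```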
Some optimizations are not enabled by default but can be activated by setting certain flags. These include:
* Repartitioning data to handle skew in `GROUP BY` processing. This is achieved by performing the `GROUP BY` in two MapReduce stages: in the first, data is distributed randomly to the reducers and partial aggregates are computed; in the second, these partial aggregates are distributed on the `GROUP BY` columns to the reducers.
* Hash-based partial aggregation in the mappers, which reduces the data sent from the mappers to the reducers and thus the time spent sorting and merging it.
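In open-source Hive these correspond to configuration flags set per session; the exact flag names below are an assumption based on later Hive releases, not taken from the paper:

```sql
-- two-stage GROUP BY for skewed data
SET hive.groupby.skewindata=true;
-- hash-based partial aggregation in the mappers
SET hive.map.aggr=true;
```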
### Execution Engine
The execution engine executes the tasks in the order of their dependencies. A MapReduce task first serializes its part of the plan into a `plan.xml` file. This file is added to the job cache, and mappers and reducers are spawned to execute the relevant sections of the operator DAG. The final results are stored in a temporary location and then moved to the final destination (in the case of, say, an `INSERT OVERWRITE` query).
# Future Work
The paper mentions the following areas for improvements:
* HiveQL should subsume SQL syntax.
* Implementing a cost-based optimizer and using adaptive optimization techniques.
* Columnar storage to improve scan performance.
* Enhancing the JDBC/ODBC drivers for Hive to integrate with other BI tools.
* Multi-query optimization and generic n-way join in a single MapReduce job.
Published in ICDE 2010 (IEEE Computer Society); summary via Local Bibsonomy.