We have open-sourced Apache Spark and Apache Spark SQL interfaces for Succinct as a Spark Package. The package facilitates compressing RDDs in Apache Spark and DataFrames in Apache Spark SQL and enables queries directly on the compressed representation.
Interface
Succinct on Apache Spark exposes the following APIs:
- A SuccinctRDD API that views an RDD as an unstructured “flat-file” and enables queries on its compressed representation.
- A SuccinctKVRDD API that provides a key-value abstraction for the data, and supports search and random-access over the values.
- A SuccinctJsonRDD API that enables random access and search on a collection of compressed JSON documents.
- A DataFrame API that integrates with the Apache Spark SQL interface via Data Sources, and supports SQL queries on compressed structured data. The Apache Spark SQL interface is currently experimental and efficient only for selected SQL operators; we aim to make the integration more efficient in future releases.
SuccinctRDD
SuccinctRDD provides a “flat file” view for an RDD, where the entire RDD is viewed as a single unstructured file. For instance, applications in log analytics can directly employ SuccinctRDD to perform efficient search (e.g., errors for debugging) and random access (e.g., extract logs at certain timestamps) on unstructured logs. This functionality is similar to Lucene, which uses full-text indexing to enable search over unstructured data.
API
SuccinctRDD supports the following operations:
Operation | Meaning |
---|---|
extract(offset, length) | Provides random access to any offset in the flat file. |
search(query) | Returns offsets (into the flat file) of all occurrences of the query string in the uncompressed flat file, while operating directly on the compressed RDD. Applications can search for arbitrary strings in the text; query strings are not limited to specific words. |
count(query) | Returns the number of occurrences of the query string in the uncompressed flat file; this is an extremely fast operation. |
Example
SuccinctRDD can be used as follows:
```scala
import edu.berkeley.cs.succinct._
```
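For instance, a SuccinctRDD can be constructed from a text RDD and queried as in the sketch below; the input path and query strings are illustrative, and the implicit `.succinct` conversion on `RDD[Array[Byte]]` is assumed from the package's Scala API:

```scala
import edu.berkeley.cs.succinct._

// sc is the SparkContext; the input path is illustrative
val wikiData = sc.textFile("/path/to/wiki").map(_.getBytes)

// Compress the RDD into a SuccinctRDD
// (assumes the package's implicit .succinct conversion on RDD[Array[Byte]])
val wikiSuccinctData = wikiData.succinct

// Count occurrences of "Berkeley" directly on the compressed representation
val berkeleyCount = wikiSuccinctData.count("Berkeley")

// Offsets (into the flat file) of all occurrences of "Berkeley"
val berkeleyOffsets = wikiSuccinctData.search("Berkeley")

// Extract 100 bytes starting at offset 0
val bytes = wikiSuccinctData.extract(0, 100)
```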
Performance
We compare the performance of Succinct on Apache Spark against Apache Spark’s native (uncompressed) RDDs. We use a 40GB collection of Wikipedia articles, combined into a single text corpus. All benchmarks were run on an Amazon EC2 cluster with five c3.4xlarge instances (one used as a master), each with 30GB RAM and 16 vCPUs. Note that Succinct on Apache Spark requires significantly less memory (roughly 23GB) and could easily run the entire benchmark on a single server; we use a larger cluster to compare against the best-case scenario for native Apache Spark, when everything fits in memory. The search queries use words with a varying number of occurrences (1–10,000), distributed uniformly at random across 10 bins (1–1,000, 1,000–2,000, etc.). Note that the y-axis is on a log scale.
We note that Succinct on Apache Spark provides significant speedups over native Apache Spark (as much as two to three orders of magnitude), even when the native Apache Spark RDD fits in memory, while executing queries on compressed data. These are the benefits of avoiding data scans, the second problem we highlighted at the outset of the post. Also note that Succinct on Apache Spark requires roughly half the memory of the native Apache Spark RDD, and can thus provide such low-latency performance for a larger range of input dataset sizes than native Apache Spark.
Comments
Input Constraints. We don’t support non-ASCII characters in the input for now, since the algorithms depend on using certain non-ASCII characters as internal symbols.
Construction Time. Another constraint to consider is the construction time for Succinct data structures. As with any block compression scheme, Succinct requires a non-trivial amount of time to compress an input dataset. It is strongly advised that the SuccinctRDD be cached in memory (using RDD.cache()) and persisted on disk after construction completes, so that the constructed data structures can be reused without triggering re-construction:
```scala
import edu.berkeley.cs.succinct._
```
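A sketch of caching and persisting a constructed SuccinctRDD might look as follows; the `save` method and the `sc.succinctFile` loader are assumptions based on the package's helpers, and the paths are illustrative:

```scala
import edu.berkeley.cs.succinct._

// Construct the SuccinctRDD (path and implicit .succinct conversion as before)
val wikiSuccinctData = sc.textFile("/path/to/wiki").map(_.getBytes).succinct

// Cache the compressed representation in memory for repeated queries
wikiSuccinctData.cache()

// Persist the constructed data structures to disk
// (save is an assumed helper from the package API)
wikiSuccinctData.save("/path/to/succinct-wiki")

// Later: reload without re-triggering construction
// (sc.succinctFile is an assumed helper from the package API)
val reloaded = sc.succinctFile("/path/to/succinct-wiki")
```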
SuccinctKVRDD
The SuccinctKVRDD interface models semi-structured data, similar to that seen in document stores or key-value stores. Example use-cases include searching across a collection of Wikipedia articles and extracting relevant parts of the article text, or searching across tweets stored in a key-value store and retrieving tweets from a particular user. The functionality of the SuccinctKVRDD is akin to Elasticsearch, which supports search on documents over a document-store interface.
The SuccinctKVRDD implements the RDD[(K, Array[Byte])] interface, where the key can be of any specified (ordered) type while the value is a serialized array of bytes.
API
SuccinctKVRDD supports the following operations:
Operation | Meaning |
---|---|
get(key) | Random access functionality similar to that of typical key-value stores and document stores; the value or the document is returned. |
extract(key, offset, length) | Random access within a value/document given the relative offset. This may be useful when an application may not want to access the entire document, but only a subset of the document. |
search(query) | Similar to flat file interface, but now returns the keys (or document identifiers) whose values (or document text) contain the query string. |
searchOffsets(query) | Finds actual matches for the search query, and returns (key, offset) pairs corresponding to each match. The offset is relative to the beginning of the value/document. |
count(query) | Returns the number of actual matches for the search query across the values/documents. |
Example
SuccinctKVRDD can be used as follows:
```scala
import edu.berkeley.cs.succinct.kv._
```
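A minimal sketch of building and querying a SuccinctKVRDD appears below; the document IDs, input path, and the implicit `.succinctKV` conversion on `RDD[(K, Array[Byte])]` are assumptions for illustration:

```scala
import edu.berkeley.cs.succinct.kv._

// Key each Wikipedia article by a Long document ID; data and keys are illustrative
val articles = sc.textFile("/path/to/wiki").zipWithIndex()
val kvData = articles.map { case (text, docId) => (docId, text.getBytes) }

// Compress into a SuccinctKVRDD
// (assumes the package's implicit .succinctKV conversion on RDD[(K, Array[Byte])])
val succinctKV = kvData.succinctKV

// Random access: fetch the value (document) for key 42
val doc = succinctKV.get(42L)

// Extract 100 bytes at offset 10 within the value for key 42
val snippet = succinctKV.extract(42L, 10, 100)

// Keys whose values contain "Berkeley"
val matchingKeys = succinctKV.search("Berkeley")

// Number of matches across all values
val numMatches = succinctKV.count("Berkeley")
```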
Performance
We evaluate the performance of Succinct on Apache Spark against native Apache Spark RDDs for the key-value store and document store functionality, using a setup similar to that of the flat file interface. The only change is that, for the KV store, we use a 50GB dataset composed of metadata for video streams from Conviva; for the document store, we use the same Wikipedia dataset, but now each article has a unique document ID.
As with the flat file interface, Succinct on Apache Spark for the key-value store functionality achieves two to three orders of magnitude faster performance compared to Apache Spark’s native RDD.
For search performance, we compare Succinct’s performance against both Apache Spark’s native RDD and Elasticsearch, a popular document store that supports search across documents.
Interestingly, Succinct on Apache Spark is roughly 2.75x faster than Elasticsearch! This is despite Elasticsearch not having the overhead of Apache Spark’s job execution and having all of its data fit completely in memory. Succinct on Apache Spark achieves this speedup while requiring roughly 2.5x less memory than Elasticsearch (due to compression, and to storing no additional indexes)! As earlier, Succinct is over two orders of magnitude faster than Apache Spark’s native RDDs. Random access on documents has performance similar to that for flat files.
Comments
The input constraints for SuccinctRDD carry over to the SuccinctKVRDD as well. Similar to the flat-file interface, we suggest that the KV data be persisted to disk for repeated-use scenarios:
```scala
import edu.berkeley.cs.succinct.kv._
```
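A sketch of persisting and reloading the KV data is shown below; the `save` method and the `sc.succinctKV` loader are assumptions based on the package's helpers, and the paths are illustrative:

```scala
import edu.berkeley.cs.succinct.kv._

// succinctKV is the SuccinctKVRDD constructed earlier
succinctKV.cache()                        // keep the compressed KV data in memory

// Persist the constructed data structures to disk (save is an assumed helper)
succinctKV.save("/path/to/succinct-kv")

// Reload later without re-triggering construction (loader name is an assumption)
val reloadedKV = sc.succinctKV[Long]("/path/to/succinct-kv")
```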
DataFrame
Apache Spark 1.3 added a new DataFrame API that provides powerful and convenient operators to work with structured data. We provide access to Succinct-encoded data through the DataFrame API via Data Sources as an experimental feature. While it is quite efficient for several filters, we are working on efficiently supporting other operators as well.
At a high level, the DataFrame interface enables search and random access on structured data like tables. An example use-case for this interface is point queries on columnar stores. For instance, given a table with the schema [UserID, Location, D.O.B., Salary], we might want to search for all users who were born between 1980 and 1985.
API
Since Succinct is implemented as a Data Source in Apache Spark SQL, the API for DataFrames remains unchanged. The API documentation for DataFrames can be found in the Apache Spark SQL documentation.
Example
The DataFrame API can be used as follows:
```scala
import edu.berkeley.cs.succinct.sql._
```
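A sketch of writing and reading a DataFrame through the Succinct Data Source follows; the data source name `edu.berkeley.cs.succinct.sql`, the paths, and the column names are assumptions for illustration:

```scala
import edu.berkeley.cs.succinct.sql._

// df is an existing DataFrame, e.g. with columns UserID, Location, DOB, Salary
// Write it in Succinct's compressed format (data source name is an assumption)
df.write.format("edu.berkeley.cs.succinct.sql").save("/path/to/succinct-table")

// Read it back through the Data Source API; queries run on the compressed data
val succinctDF = sqlContext.read
  .format("edu.berkeley.cs.succinct.sql")
  .load("/path/to/succinct-table")

// Standard DataFrame operators work unchanged
val born80to85 = succinctDF.filter("DOB >= 1980 AND DOB <= 1985").select("UserID")
born80to85.show()
```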
Performance
For this interface, we compare Succinct’s performance to the columnar Parquet Data Source supported natively by Apache Spark. We revisit the 40GB Conviva dataset, which has roughly 98 columns and 43.7 million rows, and consider the following SQL query:
```sql
SELECT * FROM conviva_table WHERE col[i] = value
```
We vary the columns and the value being matched, and analyze how the performance of this query changes as the number of matched rows increases. This particular query involves both search (to perform the filter based on the WHERE clause) and random access (to obtain all the other column values for the matched rows, based on the SELECT * projection). We note that Succinct’s latency for random access increases with the amount of data extracted, and its latency for search increases with the number of matched results.
This is evident in the results above; interestingly, Succinct can find as many as 100,000 matches with latency lower than that for Parquet’s columnar representation.
Comments
The DataFrame API for Succinct is experimental for now, and only supports selected data types and filters. The supported Apache Spark SQL types include:
```scala
BooleanType
```
The supported filters include:
```scala
StringStartsWith
```
Note that certain SQL operations, like joins, might be inefficient on the DataFrame API for now. We plan on improving the performance for generic SQL operations in a future release.
Getting Started
Succinct on Apache Spark includes a few examples that illustrate the usage of its API. To run these examples, we provide convenient scripts in the bin/ directory. In particular, to execute the Wikipedia Search example using SuccinctRDD, run:
```bash
./bin/wiki-search [num-partitions]
```
The num-partitions parameter is simply the number of partitions that the original dataset should be divided into when creating Succinct data structures. It defaults to 1; note that due to Java constraints, we do not yet support partitions larger than 2GB.
The KV Search and Table Search examples are executed similarly.
Requirements and Dependency
Succinct on Apache Spark requires Apache Spark 1.6+.
Maven
In your pom.xml, add:
```xml
<dependencies>
```
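A dependency block along the following lines should work; the groupId and artifactId are assumptions based on the amplab/succinct Spark Packages listing, and the version matches the jar referenced below:

```xml
<dependencies>
  <!-- Coordinates are an assumption based on the amplab/succinct Spark Packages listing -->
  <dependency>
    <groupId>amplab</groupId>
    <artifactId>succinct</artifactId>
    <version>0.1.7</version>
  </dependency>
</dependencies>
```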
SBT
Add the dependency to your SBT project by adding the following to build.sbt (see the Spark Packages listing for spark-submit and Maven instructions):
```scala
resolvers += "Spark Packages Repo" at "http://dl.bintray.com/spark-packages/maven"
```
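With the resolver in place, the dependency line would look roughly like the following; the organization and artifact names are assumptions based on the Spark Packages listing:

```scala
// build.sbt: coordinates are an assumption based on the amplab/succinct Spark Packages listing
libraryDependencies += "amplab" % "succinct" % "0.1.7"
```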
The succinct-spark jar file can also be added to a Spark shell using the --jars command line option. For example, to include it when starting the Spark shell:
```bash
bin/spark-shell --jars succinct-0.1.7.jar
```