Succinct RDDs in Apache Spark

We have open-sourced Apache Spark and Apache Spark SQL interfaces for Succinct as a Spark Package. The package facilitates compressing RDDs in Apache Spark and DataFrames in Apache Spark SQL and enables queries directly on the compressed representation.

Interface

Succinct on Apache Spark exposes the following APIs:

  • A SuccinctRDD API that views an RDD as an unstructured “flat-file” and enables queries on its compressed representation.
  • A SuccinctKVRDD API that provides a key-value abstraction for the data, and supports search and random-access over the values.
  • A SuccinctJsonRDD API that enables random access and search on a collection of compressed JSON documents.
  • A DataFrame API that integrates with the Apache Spark SQL interface via Data Sources, and supports SQL queries on compressed structured data. The Apache Spark SQL interface is currently experimental, and only efficient for selected SQL operators. We aim to make the Apache Spark SQL integration more efficient in future releases.

SuccinctRDD

SuccinctRDD provides a “flat file” view of an RDD, where the entire RDD is viewed as a single unstructured file. For instance, applications in log analytics can directly employ SuccinctRDD to perform efficient search (e.g., for errors while debugging) and random access (e.g., extracting logs at certain timestamps) on unstructured logs. This functionality is similar to Lucene, which uses full-text indexing to enable search over unstructured data.

API

SuccinctRDD supports the following operations:

  • extract(offset, length): Provides random access to any offset in the flat file, returning length bytes starting at offset.
  • search(query): Returns the offsets (into the uncompressed flat file) of all occurrences of the query string, while operating directly on the compressed RDD. Query strings can be arbitrary strings; they are not limited to specific words in the text.
  • count(query): Returns the number of occurrences of the query string in the uncompressed flat file; this is an extremely fast operation.

Example

SuccinctRDD can be used as follows:

import edu.berkeley.cs.succinct._

// Read text data from file; sc is the SparkContext
val wikiData = sc.textFile("/path/to/data").map(_.getBytes)

// Converts the wikiData RDD to a SuccinctRDD, serializing each record into an
// array of bytes. We persist the RDD in memory to perform in-memory queries.
val wikiSuccinctData = wikiData.succinct.persist()

// Count the number of occurrences of "Berkeley" in the RDD
val berkeleyOccCount = wikiSuccinctData.count("Berkeley")
println("# of times Berkeley appears in text = " + berkeleyOccCount)

// Find all offsets of occurrences of "Berkeley" in the RDD
val searchOffsets = wikiSuccinctData.search("Berkeley")
println("First 10 locations in the RDD where Berkeley occurs: ")
searchOffsets.take(10).foreach(println)

// Find all occurrences of the regular expression "(stanford|berkeley)\\.edu"
val regexOccurrences = wikiSuccinctData.regexSearch("(stanford|berkeley)\\.edu")
println("# of matches for the regular expression (stanford|berkeley)\\.edu = " + regexOccurrences.count)

// Extract 10 bytes at offset 5 in the RDD
val extractedData = wikiSuccinctData.extract(5, 10)
println("Extracted data = [" + new String(extractedData) + "]")

Performance

We compare the performance of Succinct on Apache Spark against Apache Spark’s native (uncompressed) RDDs. We use a 40GB collection of Wikipedia articles, combined into a single text corpus. All benchmarks were run on an Amazon EC2 cluster of five c3.4xlarge instances (one used as the master), each with 30GB of RAM and 16 vCPUs. Note that Succinct on Apache Spark requires significantly less memory (roughly 23GB) and could easily run the entire benchmark on a single server; we use the larger cluster to compare against the best-case scenario for native Apache Spark, where everything fits in memory. The search queries use words whose occurrence counts vary between 1 and 10,000, drawn uniformly at random from 10 bins (1–1,000, 1,001–2,000, and so on). Note that the y-axis is on a log scale.

We note that Succinct on Apache Spark provides significant speedups over native Apache Spark (as much as two to three orders of magnitude), even when the native Apache Spark RDD fits in memory, while executing queries directly on compressed data. These are the benefits of avoiding data scans, the second problem we highlighted at the outset of the post. Also note that Succinct on Apache Spark requires roughly 2x less memory than the native Apache Spark RDD, and can thus deliver this low-latency performance over a larger range of input dataset sizes than native Apache Spark.

Comments

Input Constraints. We do not support non-ASCII characters in the input for now, since the algorithms rely on certain non-ASCII characters as internal symbols.
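
If an input corpus might contain non-ASCII characters, one workaround is to strip or replace them before construction. A minimal sketch of such a preprocessing step (the regex-based cleanup below is our own illustration, not part of the package):

// Drop any byte outside the printable ASCII range before constructing the SuccinctRDD
// (the regex and the replacement policy here are illustrative assumptions).
val asciiOnly = sc.textFile("/path/to/data")
  .map(_.replaceAll("[^\\x20-\\x7E]", " "))
  .map(_.getBytes)

val succinctAscii = asciiOnly.succinct.persist()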

Construction Time. Another constraint to consider is the construction time for the Succinct data structures. As with any block compression scheme, Succinct requires a non-trivial amount of time to compress an input dataset. It is strongly advised that the SuccinctRDD be cached in memory (using RDD.cache()) and persisted on disk after construction completes, so that the constructed data structures can be reused without triggering re-construction:

import edu.berkeley.cs.succinct._

// Read text data from file; sc is the SparkContext
val wikiData = sc.textFile("/path/to/data").map(_.getBytes)

// Construct the succinct RDD and save it as follows
wikiData.saveAsSuccinctFile("/path/to/data")

// Load into memory again as follows; sc is the SparkContext
val loadedSuccinctRDD = sc.succinctFile("/path/to/data")

SuccinctKVRDD

The SuccinctKVRDD interface models semi-structured data, similar to that found in document stores or key-value stores. Example use-cases include searching across a collection of Wikipedia articles and extracting relevant parts of the article text, or searching across tweets stored in a key-value store and retrieving the tweets from a particular user. The functionality of SuccinctKVRDD is akin to Elasticsearch, which supports search over documents through a document-store interface.

SuccinctKVRDD implements the RDD[(K, Array[Byte])] interface, where the key can be of any specified (ordered) type while the value is a serialized array of bytes.
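
Keys are not restricted to numeric identifiers; any ordered key type works. As a rough sketch, assuming an input file with one tab-separated "title<TAB>text" record per line (this input format is our own assumption), article titles could serve as String keys:

import edu.berkeley.cs.succinct.kv._

// Hypothetical input: one "title<TAB>text" record per line
val articles = sc.textFile("/path/to/articles")
  .map(_.split("\t", 2))
  .map(fields => (fields(0), fields(1).getBytes))

// String is an ordered key type, so it can back a SuccinctKVRDD
val articlesByTitle = articles.succinctKV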

API

SuccinctKVRDD supports the following operations:

  • get(key): Random access functionality similar to that of typical key-value stores and document stores; returns the value or document for the given key.
  • extract(key, offset, length): Random access within a value/document at the given relative offset. This is useful when an application needs only a subset of a document rather than the entire document.
  • search(query): Similar to the flat-file interface, but returns the keys (or document identifiers) whose values (or document text) contain the query string.
  • searchOffsets(query): Finds the actual matches for the search query and returns a (key, offset) pair for each match, where the offset is relative to the beginning of the value/document.
  • count(query): Returns the number of actual matches for the search query across all values/documents.

Example

SuccinctKVRDD can be used as follows:

import edu.berkeley.cs.succinct.kv._

// Read text data from file and assign each record a unique key; sc is the SparkContext
val wikiData = sc.textFile("/path/to/data").map(_.getBytes)
val wikiKVData = wikiData.zipWithIndex().map(t => (t._2, t._1))

// Construct the SuccinctKVRDD from the (key, value) pairs
val succinctKVRDD = wikiKVData.succinctKV

// Get the value for key 0
val value = succinctKVRDD.get(0)
println("Value corresponding to key 0 = " + new String(value))

// Fetch 3 bytes at offset 1 for the value corresponding to key = 0
val valueData = succinctKVRDD.extract(0, 1, 3)
println("Value data for key 0 at offset 1 and length 3 = " + new String(valueData))

// Count the number of occurrences of "Berkeley" across all values
val count = succinctKVRDD.count("Berkeley")
println("Number of times Berkeley occurs in the values: " + count)

// Get the individual occurrences of Berkeley as offsets into each value
val searchOffsets = succinctKVRDD.searchOffsets("Berkeley")
println("First 10 matches for Berkeley as (key, offset) pairs: ")
searchOffsets.take(10).foreach(println)

// Search for values containing "Berkeley", and fetch the corresponding keys
val keys = succinctKVRDD.search("Berkeley")
println("First 10 keys matching the search query:")
keys.take(10).foreach(println)

// Regex search to find values containing matches of "(stanford|berkeley)\\.edu",
// and fetch the corresponding keys
val regexKeys = succinctKVRDD.regexSearch("(stanford|berkeley)\\.edu")
println("First 10 keys matching the regex query:")
regexKeys.take(10).foreach(println)

Performance

We evaluate the performance of Succinct on Apache Spark against native Apache Spark RDDs for the key-value store and document store functionality, using a setup similar to that of the flat file interface. The only change is the dataset: for the KV store we use a 50GB dataset of metadata for video streams from Conviva, while for the document store we use the same Wikipedia dataset, but with each article assigned a unique documentID.

As with the flat file interface, Succinct on Apache Spark for the key-value store functionality achieves two to three orders of magnitude faster performance compared to Apache Spark’s native RDD.

For search performance, we compare Succinct’s performance against both Apache Spark’s native RDD, as well as against Elasticsearch – a popular document-store that supports search across documents.

Interestingly, Succinct on Apache Spark is roughly 2.75x faster than Elasticsearch, even though Elasticsearch does not incur the overhead of Apache Spark’s job execution and has all of its data fit completely in memory. Succinct on Apache Spark achieves this speedup while requiring roughly 2.5x less memory than Elasticsearch (due to compression, and due to storing no additional indexes). As earlier, Succinct is over two orders of magnitude faster than Apache Spark’s native RDDs. Random access on documents has performance similar to that for flat files.

Comments

The input constraints for SuccinctRDD carry over to the SuccinctKVRDD as well. Similar to the flat-file interface, we suggest that the KV data be persisted to disk for repeated-use scenarios:

import edu.berkeley.cs.succinct.kv._

// Read data from file; sc is the SparkContext
val wikiData = sc.textFile("/path/to/data").map(_.getBytes)
val wikiKVData = wikiData.zipWithIndex().map(t => (t._2, t._1))

// Construct the SuccinctKVRDD and save it as follows
wikiKVData.saveAsSuccinctKV("/path/to/data")

// Load into memory again as follows; sc is the SparkContext
val loadedSuccinctKVRDD = sc.succinctKV("/path/to/data")

DataFrame

Apache Spark 1.3 added a new DataFrame API that provides powerful and convenient operators to work with structured data. We provide access to Succinct-encoded data through the DataFrame API via Data Sources as an experimental feature. While it is quite efficient for several filters, we are working on several interesting projects to efficiently support other operators.

At a high level, the DataFrame interface enables search and random access on structured data like tables. An example use-case for this interface is point queries on columnar stores. For instance, given a table with the schema [UserID, Location, D.O.B., Salary], we might want to search for all users who were born between 1980 and 1985.
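
As a rough sketch of that use-case (the path, column names, and the assumption that D.O.B. is stored as a year are our own; loading Succinct DataFrames is covered in the example below):

// Load a Succinct-encoded table of users (hypothetical path and schema)
val users = sqlContext.succinctFile("/path/to/users")

// Range query: users born between 1980 and 1985
val matches = users.filter("DOB >= 1980 AND DOB <= 1985").select("UserID").collect()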

API

Since Succinct is implemented as a Data Source in Apache Spark SQL, the API for DataFrames remains unchanged. The API documentation for DataFrames can be found here.

Example

The DataFrame API can be used as follows:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import edu.berkeley.cs.succinct.sql._

// Create a schema
val citySchema = StructType(Seq(
  StructField("Name", StringType, false),
  StructField("Length", IntegerType, true),
  StructField("Area", DoubleType, false),
  StructField("Airport", BooleanType, true)))

// Create an RDD of Rows with some data
val cityRDD = sparkContext.parallelize(Seq(
  Row("San Francisco", 12, 44.52, true),
  Row("Palo Alto", 12, 22.33, false),
  Row("Munich", 8, 3.14, true)))

// Create a data frame from the RDD and the schema
val cityDataFrame = sqlContext.createDataFrame(cityRDD, citySchema)

// Save the DataFrame in the "Succinct" format
cityDataFrame.write.format("edu.berkeley.cs.succinct.sql").save("/path/to/data")

// Read the Succinct DataFrame from the saved path
val succinctCities = sqlContext.succinctFile("/path/to/data")

// Filter and prune
val bigCities = succinctCities.filter("Area >= 22.0").select("Name").collect

// Alternately, use the DataFrameReader API:
cityDataFrame.write.format("edu.berkeley.cs.succinct.sql").save("/path/to/data")
val succinctCities2 = sqlContext.read.format("edu.berkeley.cs.succinct.sql").load("/path/to/data")
val smallCities = succinctCities2.filter("Area <= 10.0").select("Name").collect

Performance

For this interface, we compare Succinct’s performance against the columnar Parquet data source supported natively by Apache Spark. We revisit the 40GB Conviva dataset, which has roughly 98 columns and 43.7 million rows, and consider the following SQL query:

SELECT * FROM conviva_table WHERE col[i]=value

We vary the column and the value being matched, and analyze how the performance of this query changes as the number of matched rows increases. This particular query involves both search (to perform the filter specified by the WHERE clause) and random access (to obtain all the other column values for the matched rows, due to the SELECT * projection). Note that Succinct’s latency for random access increases with the amount of data extracted, while its latency for search increases with the number of matched results.

This is evident in the results above; interestingly, Succinct can find as many as 100,000 matches at a latency lower than that of Parquet’s columnar representation.
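
For reference, the same kind of query can be issued through Apache Spark SQL over the Succinct data source; a minimal sketch (the path, table name, column name, and value below are placeholders of our own):

// Load the Succinct-encoded table and register it for SQL queries
// (path, column name, and value are hypothetical placeholders).
val conviva = sqlContext.succinctFile("/path/to/conviva")
conviva.registerTempTable("conviva_table")

val rows = sqlContext.sql("SELECT * FROM conviva_table WHERE col_i = 'value'").collect()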

Comments

The DataFrame API for Succinct is experimental for now, and only supports selected data types and filters. The supported Apache Spark SQL types include:

BooleanType
ByteType
ShortType
IntegerType
LongType
FloatType
DoubleType
DecimalType
StringType

The supported filters include:

StringStartsWith
StringEndsWith
StringContains
EqualTo
LessThan
LessThanOrEqual
GreaterThan
GreaterThanOrEqual
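
These filters are pushed down to the Succinct data source when expressed through the usual DataFrame operations; a minimal sketch using the succinctCities DataFrame from the earlier example (the specific predicates below are illustrative):

// StringStartsWith: city names beginning with "San"
val sanCities = succinctCities.filter("Name LIKE 'San%'").select("Name").collect()

// GreaterThanOrEqual: cities with an area of at least 20.0
val largeCities = succinctCities.filter("Area >= 20.0").collect()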

Note that certain SQL operations, such as joins, might be inefficient with the DataFrame API for now. We plan to improve the performance of generic SQL operations in a future release.

Getting Started

Succinct on Apache Spark includes a few examples that illustrate the usage of its API. We provide convenient scripts for running them in the bin/ directory. In particular, to execute the Wikipedia search example using SuccinctRDD, run:

./bin/wiki-search [num-partitions]

The num-partitions parameter is the number of partitions the original dataset should be divided into when creating the Succinct data structures; it defaults to 1. Note that, due to Java constraints, we do not yet support partitions larger than 2GB.
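
When constructing a SuccinctRDD directly, the partition count can be controlled in the same way, for example by specifying it when reading the input; a minimal sketch (the partition count below is an arbitrary placeholder chosen to keep each partition well under 2GB):

// Read the input with an explicit partition count so that no partition exceeds 2GB
val numPartitions = 32  // placeholder; pick based on the input size
val wikiData = sc.textFile("/path/to/data", numPartitions).map(_.getBytes)
val wikiSuccinctData = wikiData.succinct.persist()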

The KV Search and Table Search examples are executed similarly.

Requirements and Dependency

Succinct on Apache Spark requires Apache Spark 1.6+.

Maven

In your pom.xml, add:

<dependencies>
  <!-- list of dependencies -->
  <dependency>
    <groupId>amplab</groupId>
    <artifactId>succinct</artifactId>
    <version>0.1.7</version>
  </dependency>
</dependencies>

<repositories>
  <!-- list of other repositories -->
  <repository>
    <id>SparkPackagesRepo</id>
    <url>http://dl.bintray.com/spark-packages/maven</url>
  </repository>
</repositories>

SBT

Add the dependency to your SBT project by adding the following to build.sbt (see the Spark Packages listing for spark-submit and Maven instructions):

resolvers += "Spark Packages Repo" at "http://dl.bintray.com/spark-packages/maven"
libraryDependencies += "amplab" % "succinct" % "0.1.7"

The succinct-spark jar file can also be added to a Spark shell using the --jars command line option. For example, to include it when starting the Spark shell:

$ bin/spark-shell --jars succinct-0.1.7.jar
