Spark Storage

Concept

Stores

  • Memory, Disk, and Off-Heap

Levels

  • persist(storageLevel) marks an RDD to be stored at the given level; cache() is shorthand for persist(StorageLevel.MEMORY_ONLY) (see the usage sketch after the StorageLevel source below)
class StorageLevel private(
    private var _useDisk: Boolean,
    private var _useMemory: Boolean,
    private var _useOffHeap: Boolean,
    private var _deserialized: Boolean,
    private var _replication: Int = 1)
  extends Externalizable {

  // TODO: Also add fields for caching priority, dataset ID, and flushing.
  private def this(flags: Int, replication: Int) {
    this((flags & 8) != 0, (flags & 4) != 0, (flags & 2) != 0, (flags & 1) != 0, replication)
  }

  def this() = this(false, true, false, false)  // For deserialization

  def useDisk: Boolean = _useDisk
  def useMemory: Boolean = _useMemory
  def useOffHeap: Boolean = _useOffHeap
  def deserialized: Boolean = _deserialized
  def replication: Int = _replication

  // ...
}

object StorageLevel {
  val NONE = new StorageLevel(false, false, false, false)
  val DISK_ONLY = new StorageLevel(true, false, false, false)
  val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
  val MEMORY_ONLY = new StorageLevel(false, true, false, true)
  val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
  val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
  val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
  val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
  val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
  val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
  val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
  val OFF_HEAP = new StorageLevel(true, true, true, false, 1)

  // ...
}
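
A quick usage sketch of the levels above. This assumes an existing SparkContext named sc and a hypothetical input path; it is an illustration, not part of the original notes.

import org.apache.spark.storage.StorageLevel

// Minimal sketch: `sc` is assumed to be a live SparkContext; the path is hypothetical.
val lines  = sc.textFile("hdfs:///logs/app.log")
val errors = lines.filter(_.contains("ERROR"))

// cache() would be shorthand for persist(StorageLevel.MEMORY_ONLY).
// A storage level can only be assigned once per RDD, so pick one call.
errors.persist(StorageLevel.MEMORY_AND_DISK_SER)   // serialized in memory, spill to disk

errors.count()        // the first action computes the RDD and stores its blocks
errors.unpersist()    // release the cached blocks when no longer needed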

Master/Slave

  • Driver/Executor: SparkContext -> SparkEnv -> BlockTransferService (NettyBlockTransferService), BlockManagerMaster (a reference to BlockManagerMasterEndpoint, which lives on the driver), BlockManager (one per driver and per executor)
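
To make the split concrete, here is a deliberately simplified sketch of the master/slave relationship. The class names echo Spark's, but these are illustrative stand-ins rather than the real implementations, which are wired up inside SparkEnv.

import scala.collection.mutable

// Stand-in for the driver-side BlockManagerMasterEndpoint:
// it only tracks which executors hold which blocks.
class MasterEndpointSketch {
  private val locations = mutable.Map.empty[String, Set[String]]
  def updateBlockInfo(blockId: String, executorId: String): Unit =
    locations(blockId) = locations.getOrElse(blockId, Set.empty) + executorId
  def getLocations(blockId: String): Set[String] =
    locations.getOrElse(blockId, Set.empty)
}

// Stand-in for a per-executor BlockManager: it stores blocks locally and
// reports every status change to the master (over RPC in the real system).
class BlockManagerSketch(executorId: String, master: MasterEndpointSketch) {
  private val localBlocks = mutable.Map.empty[String, Array[Byte]]
  def putBlock(blockId: String, bytes: Array[Byte]): Unit = {
    localBlocks(blockId) = bytes                 // MemoryStore / DiskStore in the real code
    master.updateBlockInfo(blockId, executorId)  // keep the driver's view up to date
  }
  def getLocal(blockId: String): Option[Array[Byte]] = localBlocks.get(blockId)
}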

RPC

  • BlockManager registers with, and reports block status updates to, the driver's BlockManagerMasterEndpoint over RPC

Update (Read/Write)

  • RDD -> BlockManager -> Remote: BlockTransferService (fetch/upload), Local: MemoryStore / DiskStore (get/put)
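
A hedged sketch of the read path described above: try local memory, then local disk, then a remote copy. The helper functions are hypothetical stand-ins for MemoryStore.get, DiskStore.get, BlockManagerMaster.getLocations and a BlockTransferService fetch; the real logic lives in BlockManager.

object ReadPathSketch {
  // Hypothetical stubs standing in for the real stores and RPC calls.
  def memoryStoreGet(blockId: String): Option[Array[Byte]] = None
  def diskStoreGet(blockId: String): Option[Array[Byte]] = None
  def locationsFromMaster(blockId: String): Seq[String] = Seq.empty
  def fetchRemote(location: String, blockId: String): Option[Array[Byte]] = None

  // Local memory first, then local disk, then any remote holder reported by the master.
  def getBlock(blockId: String): Option[Array[Byte]] =
    memoryStoreGet(blockId)
      .orElse(diskStoreGet(blockId))
      .orElse {
        locationsFromMaster(blockId).iterator
          .map(loc => fetchRemote(loc, blockId))
          .collectFirst { case Some(bytes) => bytes }
      }
}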

MemoryManager

  • acquire/release, get/set

  • MemoryMode(ON_HEAP, OFF_HEAP)
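
In rough outline, the contract looks like the following. Signatures are paraphrased (simplified types, sketch names), not copied from org.apache.spark.memory.MemoryManager.

// Paraphrased shape of the MemoryManager contract: acquire*/release* calls,
// each tagged with a MemoryMode (on-heap vs off-heap).
sealed trait MemoryMode
object MemoryMode {
  case object ON_HEAP  extends MemoryMode
  case object OFF_HEAP extends MemoryMode
}

abstract class MemoryManagerSketch {
  // Storage side: returns whether the requested bytes could be granted.
  def acquireStorageMemory(blockId: String, numBytes: Long, mode: MemoryMode): Boolean
  def acquireUnrollMemory(blockId: String, numBytes: Long, mode: MemoryMode): Boolean
  def releaseStorageMemory(numBytes: Long, mode: MemoryMode): Unit

  // Execution side: returns how many bytes were actually granted to the task.
  def acquireExecutionMemory(numBytes: Long, taskAttemptId: Long, mode: MemoryMode): Long
  def releaseExecutionMemory(numBytes: Long, taskAttemptId: Long, mode: MemoryMode): Unit
}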

StaticMemoryManager

  • MaxStorageMemory = systemMaxMemory * spark.storage.memoryFraction * spark.storage.safetyFraction

  • MaxExecutionMemory = systemMaxMemory * spark.shuffle.memoryFraction * spark.shuffle.safetyFraction
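
A worked example of the two formulas, using the legacy-mode defaults as I recall them (spark.storage.memoryFraction = 0.6, spark.storage.safetyFraction = 0.9, spark.shuffle.memoryFraction = 0.2, spark.shuffle.safetyFraction = 0.8). The 10 GB heap is only an illustration.

val systemMaxMemory    = 10L * 1024 * 1024 * 1024              // assumed 10 GB heap
val maxStorageMemory   = (systemMaxMemory * 0.6 * 0.9).toLong  // ≈ 5.4 GB for storage
val maxExecutionMemory = (systemMaxMemory * 0.2 * 0.8).toLong  // ≈ 1.6 GB for execution (shuffle)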

UnifiedMemoryManager

  • MemoryPool(StorageMemoryPool, ExecutionMemoryPool)

  • acquireExecutionMemory, acquireStorageMemory, acquireUnrollMemory

  • getMaxMemory = (systemMemory - reservedMemory) * spark.memory.fraction
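
A worked example of the unified formula, assuming the usual defaults: reservedMemory = 300 MB, spark.memory.fraction = 0.6 (Spark 2.x; 0.75 in 1.6) and spark.memory.storageFraction = 0.5. The 10 GB heap is an illustration, not a recommendation.

val systemMemory   = 10L * 1024 * 1024 * 1024                        // assumed 10 GB heap
val reservedMemory = 300L * 1024 * 1024                              // fixed 300 MB reserve
val maxMemory      = ((systemMemory - reservedMemory) * 0.6).toLong  // ≈ 5.8 GB shared pool
val storageRegion  = (maxMemory * 0.5).toLong                        // ≈ 2.9 GB soft storage region
// The boundary is soft: execution may borrow from storage and evict cached blocks,
// while storage may borrow free execution memory but never evicts running tasks.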