Catalyst

Encoder
A container of the serialization/deserialization (serde) expressions used by a Dataset
- Serialize, Deserialize
- ExpressionEncoder (the only implementation)
- RowEncoder (maps external Row objects to and from the internal row format)
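
The following toy model in Python makes the serde idea concrete. It assumes a dataclass stands in for a JVM case class and a plain tuple stands in for an InternalRow. `ToyEncoder`, `to_row`, and `from_row` are hypothetical names, not Spark's API. Spark's ExpressionEncoder holds serializer and deserializer expressions rather than Python functions.

```python
from dataclasses import astuple, dataclass, fields
from typing import Any, Generic, List, Tuple, Type, TypeVar

T = TypeVar("T")


@dataclass
class Person:
    name: str
    age: int


class ToyEncoder(Generic[T]):
    """Hypothetical encoder: a schema plus a serializer/deserializer pair."""

    def __init__(self, cls: Type[T]) -> None:
        self.cls = cls
        # Schema derived from the class fields: (column name, type name).
        self.schema: List[Tuple[str, str]] = [(f.name, f.type.__name__) for f in fields(cls)]

    def to_row(self, obj: T) -> Tuple[Any, ...]:
        # Serializer: external object -> flat internal row.
        return astuple(obj)

    def from_row(self, row: Tuple[Any, ...]) -> T:
        # Deserializer: internal row -> external object.
        return self.cls(*row)


enc = ToyEncoder(Person)
row = enc.to_row(Person("Ann", 42))
print(enc.schema)         # [('name', 'str'), ('age', 'int')]
print(row)                # ('Ann', 42)
print(enc.from_row(row))  # Person(name='Ann', age=42)
```
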
InternalRow
- UnsafeRow (Tungsten binary format)
- JoinedRow (presents two rows as one, used by joins)
- BaseGenericInternalRow
- GenericInternalRow
- SpecificInternalRow (mutable, type-specialized)
- MutableUnsafeRow (supports in-place updates)
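
Below is a minimal sketch of the JoinedRow idea, assuming rows are Python sequences. One wrapper is re-pointed at each (left, right) pair. Each ordinal is read from whichever side owns it, so no combined row is copied. `JoinedRowSketch` and `with_rows` are hypothetical names.

```python
from typing import Any, Sequence


class JoinedRowSketch:
    """Hypothetical view that presents two rows as one without copying values."""

    def __init__(self, left: Sequence[Any] = (), right: Sequence[Any] = ()) -> None:
        self.left, self.right = left, right

    def with_rows(self, left: Sequence[Any], right: Sequence[Any]) -> "JoinedRowSketch":
        # Re-point the same wrapper at a new pair instead of allocating a new row.
        self.left, self.right = left, right
        return self

    def __len__(self) -> int:
        return len(self.left) + len(self.right)

    def __getitem__(self, i: int) -> Any:
        # Ordinals [0, len(left)) read from the left row, the rest from the right row.
        n = len(self.left)
        return self.left[i] if i < n else self.right[i - n]


joined = JoinedRowSketch()
pairs = [((1, "a"), (10.0,)), ((2, "b"), (20.0,))]
out = [list(joined.with_rows(lrow, rrow)) for lrow, rrow in pairs]
print(out)  # [[1, 'a', 10.0], [2, 'b', 20.0]]
```
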
TreeNode
- Expression
- QueryPlan
  - LogicalPlan
    - resolved, canonicalized
    - UnaryNode, BinaryNode, LeafNode, others
  - SparkPlan
    - UnaryExecNode, BinaryExecNode, LeafExecNode, others
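
Below is a sketch of the immutable-tree-plus-transform pattern that TreeNode provides. It assumes a tiny hypothetical `Node` class. `transform_up` here is a post-order rewrite in the spirit of Catalyst's `transformUp`, not its implementation. The example rule folds an addition of two literals.

```python
from dataclasses import dataclass
from typing import Callable, Tuple


@dataclass(frozen=True)
class Node:
    """Hypothetical immutable tree node, loosely modeled on a Catalyst TreeNode."""
    op: str
    children: Tuple["Node", ...] = ()
    value: object = None

    def with_new_children(self, children: Tuple["Node", ...]) -> "Node":
        # Reuse this node when nothing changed; otherwise build a modified copy.
        return self if children == self.children else Node(self.op, children, self.value)

    def transform_up(self, rule: Callable[["Node"], "Node"]) -> "Node":
        # Post-order: rewrite the children first, then apply the rule to the rebuilt parent.
        node = self.with_new_children(tuple(c.transform_up(rule) for c in self.children))
        return rule(node)


def lit(v: object) -> Node:
    return Node("lit", value=v)


def fold_add(n: Node) -> Node:
    # Rule: add(lit a, lit b) -> lit(a + b); any other node is returned unchanged.
    if n.op == "add" and all(c.op == "lit" for c in n.children):
        return lit(sum(c.value for c in n.children))
    return n


expr = Node("add", (Node("add", (lit(1), lit(2))), Node("attr", value="x")))
folded = expr.transform_up(fold_add)
print(folded.children[0])  # Node(op='lit', children=(), value=3)
```
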
Rule
- Rule
- RuleExecutor
  - Seq[Batch]
    - Strategy
      - Once
      - FixedPoint
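
The batch/strategy loop can be sketched as follows, assuming plans are any values comparable with `==` and rules are plain functions. `Once` runs a batch a single time. `FixedPoint(n)` repeats it until the plan stops changing or `n` iterations are reached. All names are hypothetical stand-ins.

```python
from dataclasses import dataclass
from typing import Any, Callable, List, Sequence, Union

Plan = Any
RuleFn = Callable[[Plan], Plan]


@dataclass
class Once:
    max_iterations: int = 1


@dataclass
class FixedPoint:
    max_iterations: int


@dataclass
class Batch:
    name: str
    strategy: Union[Once, FixedPoint]
    rules: Sequence[RuleFn]


def execute(plan: Plan, batches: List[Batch]) -> Plan:
    """Hypothetical RuleExecutor loop: apply each batch's rules in order, repeatedly."""
    for batch in batches:
        for _ in range(batch.strategy.max_iterations):
            before = plan
            for rule in batch.rules:
                plan = rule(plan)
            if plan == before:  # fixed point: this iteration changed nothing
                break
    return plan


def drop_double_not(plan: str) -> str:
    # Removes one "NOT NOT " per call, so several passes are needed to finish.
    return plan.replace("NOT NOT ", "", 1)


plan = "NOT NOT NOT NOT NOT x"
print(execute(plan, [Batch("once", Once(), [drop_double_not])]))            # NOT NOT NOT x
print(execute(plan, [Batch("fixed", FixedPoint(100), [drop_double_not])]))  # NOT x
```
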
Parser
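- ANTLR 4 (lexer and parser): Adaptive LL(*) parsing; generates Listener and Visitor interfaces
- SQL text -> parser -> AstBuilder -> unresolved LogicalPlan (UnresolvedRelation, UnresolvedFunction, UnresolvedAttribute); Dataset/DataFrame operations build the same kind of unresolved plan directly through the API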
Analyzer

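- Rule-based: rules are grouped into batches, and each batch runs under an execution strategy (Once or FixedPoint)
- Unresolved LogicalPlan + Catalog/Metastore lookups + rules -> semantic validation and transformation (resolving, removing, modifying nodes) -> Analyzed LogicalPlan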

| Batch | Strategy | Rules |
|---|---|---|
| Hints | FixedPoint | ResolveBroadcastHints, ResolveCoalesceHints, RemoveAllHints |
| Simple Sanity Check | Once | LookupFunctions |
| Substitution | FixedPoint | CTESubstitution, WindowsSubstitution, EliminateUnions, SubstituteUnresolvedOrdinals |
| Resolution | FixedPoint | ResolveTableValuedFunctions, ResolveRelations, ResolveReferences, ResolveCreateNamedStruct, ResolveDeserializer, ResolveNewInstance, ResolveUpCast, ResolveGroupingAnalytics, ResolvePivot, ResolveOrdinalInOrderByAndGroupBy, ResolveMissingReferences, ExtractGenerator, ResolveGenerate, ResolveFunctions, ResolveAliases, ResolveSubquery, ResolveWindowOrder, ResolveWindowFrame, ResolveNaturalAndUsingJoin, ExtractWindowExpressions, GlobalAggregates, ResolveAggregateFunctions, TimeWindowing, ResolveInlineTables, TypeCoercion.typeCoercionRules, extendedResolutionRules |
| Post-Hoc Resolution | Once | postHocResolutionRules |
| View | Once | AliasViewChild |
| Nondeterministic | Once | PullOutNondeterministic |
| UDF | Once | HandleNullInputsForUDF |
| FixNullability | Once | FixNullability |
| ResolveTimeZone | Once | ResolveTimeZone |
| Cleanup | FixedPoint | CleanupAliases |
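
Below is a toy version of two Resolution rules. It assumes a hypothetical dict-based catalog and string column names. The first rule replaces an unresolved relation with a catalog-backed one, in the spirit of ResolveRelations. The second binds projected names to the child's output, in the spirit of ResolveReferences. Unknown tables or columns raise an error, much as analysis errors do in Spark.

```python
from dataclasses import dataclass
from typing import Dict, List

# Hypothetical catalog: table name -> column names.
CATALOG: Dict[str, List[str]] = {"people": ["id", "name", "age"]}


@dataclass
class UnresolvedRelation:
    table: str


@dataclass
class Relation:
    table: str
    output: List[str]


@dataclass
class Project:
    columns: List[str]
    child: object
    resolved: bool = False


def resolve_relations(plan: object) -> object:
    # Replace UnresolvedRelation leaves with relations looked up in the catalog.
    if isinstance(plan, Project):
        return Project(plan.columns, resolve_relations(plan.child), plan.resolved)
    if isinstance(plan, UnresolvedRelation):
        if plan.table not in CATALOG:
            raise LookupError(f"table not found: {plan.table}")
        return Relation(plan.table, CATALOG[plan.table])
    return plan


def resolve_references(plan: object) -> object:
    # Bind projected names to the child's output once the child is resolved.
    if isinstance(plan, Project) and isinstance(plan.child, Relation):
        missing = [c for c in plan.columns if c not in plan.child.output]
        if missing:
            raise LookupError(f"cannot resolve {missing}; input columns: {plan.child.output}")
        return Project(plan.columns, plan.child, resolved=True)
    return plan


analyzed = resolve_references(resolve_relations(Project(["name", "age"], UnresolvedRelation("people"))))
print(analyzed.resolved)  # True
```
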
Optimizer
- Analyzed LogicalPlan -> RBO (Rule-Based Optimizer) -> Optimized LogicalPlan

| Batch | Strategy | Rules |
|---|---|---|
| Eliminate Distinct | FixedPoint | EliminateDistinct |
| Finish Analysis | Once | EliminateSubqueryAliases, EliminateView, ReplaceExpressions, ComputeCurrentTime, GetCurrentDatabase, RewriteDistinctAggregates, ReplaceDeduplicateWithAggregate |
| Union | Once | CombineUnions |
| LocalRelation early | FixedPoint | ConvertToLocalRelation, PropagateEmptyRelation |
| Pullup Correlated Expressions | Once | PullupCorrelatedPredicates |
| Subquery | Once | OptimizeSubqueries |
| Replace Operators | FixedPoint | RewriteExceptAll, RewriteIntersectAll, ReplaceIntersectWithSemiJoin, ReplaceExceptWithFilter, ReplaceExceptWithAntiJoin, ReplaceDistinctWithAggregate |
| Aggregate | FixedPoint | RemoveLiteralFromGroupExpressions, RemoveRepetitionFromGroupExpressions |
| Join Reorder | Once | CostBasedJoinReorder |
| Remove Redundant Sorts | Once | RemoveRedundantSorts |
| Decimal Optimizations | FixedPoint | DecimalAggregates |
| Object Expressions Optimization | FixedPoint | EliminateMapObjects, CombineTypedFilters |
| LocalRelation | FixedPoint | ConvertToLocalRelation, PropagateEmptyRelation |
| Extract PythonUDF From JoinCondition | Once | PullOutPythonUDFInJoinCondition |
| Check Cartesian Products | Once | CheckCartesianProducts |
| RewriteSubquery | Once | RewritePredicateSubquery, ColumnPruning, CollapseProject, RemoveRedundantProject |
| UpdateAttributeReferences | Once | UpdateNullabilityInAttributeReferences |

- Operator optimization rules (operatorOptimizationRuleSet; custom rules can be appended via extendedOperatorOptimizationRules): PushProjectionThroughUnion, ReorderJoin, EliminateOuterJoin, PushPredicateThroughJoin, PushDownPredicate, LimitPushDown, ColumnPruning, CollapseRepartition, CollapseProject, CollapseWindow, CombineFilters, CombineLimits, CombineUnions, NullPropagation, ConstantPropagation, FoldablePropagation, OptimizeIn, ConstantFolding, ReorderAssociativeOperator, LikeSimplification, BooleanSimplification, SimplifyConditionals, RemoveDispensableExpressions, SimplifyBinaryComparison, PruneFilters, EliminateSorts, SimplifyCasts, SimplifyCaseConversionExpressions, RewriteCorrelatedScalarSubquery, EliminateSerialization, RemoveRedundantAliases, RemoveRedundantProject, SimplifyExtractValueOps, CombineConcats
- Non-excludable rules (cannot be disabled through spark.sql.optimizer.excludedRules): PushProjectionThroughUnion, EliminateDistinct, EliminateSubqueryAliases, EliminateView, ReplaceExpressions, ComputeCurrentTime, GetCurrentDatabase, RewriteDistinctAggregates, ReplaceDeduplicateWithAggregate, ReplaceIntersectWithSemiJoin, ReplaceExceptWithFilter, ReplaceExceptWithAntiJoin, RewriteExceptAll, RewriteIntersectAll, ReplaceDistinctWithAggregate, PullupCorrelatedPredicates, RewriteCorrelatedScalarSubquery, RewritePredicateSubquery, PullOutPythonUDFInJoinCondition
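
Two operator-optimization rules can be sketched as a single rewrite pass run to a fixed point: combining adjacent filters, and pushing a filter below a projection. The sketch assumes hypothetical `Scan`/`Project`/`Filter` classes, predicates kept as opaque strings, and projections without aliases. The real PushDownPredicate also has to substitute aliases and check determinism.

```python
from dataclasses import dataclass
from typing import Tuple


@dataclass(frozen=True)
class Scan:
    table: str


@dataclass(frozen=True)
class Project:
    cols: Tuple[str, ...]
    child: object


@dataclass(frozen=True)
class Filter:
    conds: Tuple[str, ...]  # conjuncts as opaque strings (hypothetical representation)
    child: object


def rewrite(plan: object) -> object:
    """One top-down pass of two toy rules in the spirit of CombineFilters and PushDownPredicate."""
    if isinstance(plan, Filter) and isinstance(plan.child, Filter):
        # Combine: Filter(a, Filter(b, x)) -> Filter(b AND a, x)
        return Filter(plan.child.conds + plan.conds, rewrite(plan.child.child))
    if isinstance(plan, Filter) and isinstance(plan.child, Project):
        # Push down: Filter(a, Project(cols, x)) -> Project(cols, Filter(a, x)); assumes no aliases.
        return Project(plan.child.cols, rewrite(Filter(plan.conds, plan.child.child)))
    if isinstance(plan, Filter):
        return Filter(plan.conds, rewrite(plan.child))
    if isinstance(plan, Project):
        return Project(plan.cols, rewrite(plan.child))
    return plan


def optimize(plan: object, max_iterations: int = 100) -> object:
    # Repeat the pass until the plan stops changing (FixedPoint-style).
    for _ in range(max_iterations):
        new_plan = rewrite(plan)
        if new_plan == plan:
            break
        plan = new_plan
    return plan


plan = Filter(("age > 18",), Project(("name", "age"), Filter(("name IS NOT NULL",), Scan("people"))))
print(optimize(plan))
```
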
Planner & Execution

- CBO (Cost-Based Optimizer): uses statistics to guide shuffle and join decisions
- SparkPlan, SparkPlanner, QueryExecution
- Distribution (required by an operator), Partitioning (produced by an operator), SortOrder
- SparkPlanInfo (metadata, metrics)
- SparkStrategy implementations (Aggregation, BasicOperators, FlatMapGroupsWithStateStrategy, InMemoryScans, JoinSelection, SpecialLimits, StatefulAggregationStrategy, StreamingDeduplicationStrategy, StreamingRelationStrategy)
- Physical preparation rules (CollapseCodegenStages, PlanSubqueries, ReuseSubquery, ReuseExchange, EnsureRequirements)
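
Below is a simplified sketch of size-based join selection, in the spirit of the JoinSelection strategy. It assumes an inner join, ignores hints, and uses Spark's documented 10 MB default for `spark.sql.autoBroadcastJoinThreshold`. The real strategy checks more conditions (join type, build-side eligibility, ShuffledHashJoin preferences), so treat the returned strings as labels only.

```python
AUTO_BROADCAST_JOIN_THRESHOLD = 10 * 1024 * 1024  # default of spark.sql.autoBroadcastJoinThreshold (10 MB)


def select_join(left_bytes: int, right_bytes: int, equi_join: bool,
                threshold: int = AUTO_BROADCAST_JOIN_THRESHOLD) -> str:
    """Simplified, inner-join-only sketch of size-based physical join selection."""
    smaller = min(left_bytes, right_bytes)
    side = "right" if right_bytes <= left_bytes else "left"  # build (broadcast) the smaller side
    if equi_join:
        if smaller <= threshold:
            return f"BroadcastHashJoin(build={side})"
        return "SortMergeJoin"
    # Without equality keys there is nothing to hash or sort on.
    if smaller <= threshold:
        return f"BroadcastNestedLoopJoin(build={side})"
    return "CartesianProduct"


print(select_join(5_000_000_000, 2_000_000, equi_join=True))       # BroadcastHashJoin(build=right)
print(select_join(5_000_000_000, 3_000_000_000, equi_join=True))   # SortMergeJoin
print(select_join(5_000_000_000, 3_000_000_000, equi_join=False))  # CartesianProduct
```
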
Aggregation
- Aggregation buffer (schema, attributes)
- Aggregate modes: Partial, PartialMerge, Final, Complete
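
The aggregation modes are easiest to see with AVG, whose buffer is (sum, count). In this sketch (function names are hypothetical), each partition runs in Partial mode. A Final step then merges the buffers and evaluates sum / count. Complete mode would do both in one operator without an intermediate shuffle. PartialMerge only merges buffers without producing a final value.

```python
from typing import Iterable, List, Optional, Tuple

Buffer = Tuple[float, int]  # AVG's aggregation buffer: (sum, count)


def partial(values: Iterable[float]) -> Buffer:
    # Partial mode: fold the raw input rows of one partition into a fresh buffer.
    total, count = 0.0, 0
    for v in values:
        total, count = total + v, count + 1
    return total, count


def merge(a: Buffer, b: Buffer) -> Buffer:
    # Buffer merge, the step PartialMerge and Final both apply to incoming buffers.
    return a[0] + b[0], a[1] + b[1]


def final(buffers: List[Buffer]) -> Optional[float]:
    # Final mode: merge every partial buffer, then evaluate sum / count.
    acc: Buffer = (0.0, 0)
    for b in buffers:
        acc = merge(acc, b)
    return acc[0] / acc[1] if acc[1] else None  # AVG over no rows is NULL


partitions = [[1.0, 2.0], [3.0], [4.0, 5.0, 6.0]]
print(final([partial(p) for p in partitions]))  # 3.5
```
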
Window
```antlr
windowSpec
    : name=identifier                  #windowRef
    | '('name=identifier')'            #windowRef
    | '('
      ( CLUSTER BY partition+=expression (',' partition+=expression)*
      | ((PARTITION | DISTRIBUTE) BY partition+=expression (',' partition+=expression)*)?
        ((ORDER | SORT) BY sortItem (',' sortItem)*)?)
      windowFrame?
      ')'                              #windowDef
    ;

windowFrame
    : frameType=RANGE start=frameBound
    | frameType=ROWS start=frameBound
    | frameType=RANGE BETWEEN start=frameBound AND end=frameBound
    | frameType=ROWS BETWEEN start=frameBound AND end=frameBound
    ;

frameBound
    : UNBOUNDED boundType=(PRECEDING | FOLLOWING)
    | boundType=CURRENT ROW
    | expression boundType=(PRECEDING | FOLLOWING)
    ;
```
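
Below is a toy evaluator for one frame shape from the window grammar, `ROWS BETWEEN n PRECEDING AND CURRENT ROW`. It assumes rows are `(k, t, v)` tuples and the query is `SUM(v) OVER (PARTITION BY k ORDER BY t ...)`. ROWS frames count physical rows, whereas a RANGE frame would compare values of the ORDER BY key. This is not how WindowExec is implemented.

```python
from collections import defaultdict
from typing import Dict, List, Tuple


def rows_frame_sum(rows: List[Tuple[str, int, float]], preceding: int) -> Dict[Tuple[str, int], float]:
    """SUM(v) OVER (PARTITION BY k ORDER BY t ROWS BETWEEN <preceding> PRECEDING AND CURRENT ROW)."""
    parts: Dict[str, List[Tuple[int, float]]] = defaultdict(list)
    for k, t, v in rows:
        parts[k].append((t, v))  # PARTITION BY k
    out: Dict[Tuple[str, int], float] = {}
    for k, part in parts.items():
        part.sort()  # ORDER BY t within the partition
        for i, (t, _) in enumerate(part):
            lo = max(0, i - preceding)  # frame start, counted in physical rows
            out[(k, t)] = sum(v for _, v in part[lo:i + 1])
    return out


rows = [("a", 1, 10.0), ("a", 2, 20.0), ("a", 3, 30.0), ("b", 1, 5.0)]
print(rows_frame_sum(rows, preceding=1))
```
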
Join
```antlr
joinRelation
    : (joinType) JOIN right=relationPrimary joinCriteria?
    | NATURAL joinType JOIN right=relationPrimary
    ;

joinType
    : INNER?
    | CROSS
    | LEFT OUTER?
    | LEFT SEMI
    | RIGHT OUTER?
    | FULL OUTER?
    | LEFT? ANTI
    ;

joinCriteria
    : ON booleanExpression
    | USING identifierList
    ;
```
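
The semantics of several `joinType` alternatives can be checked with a tiny hash join. This assumes rows are tuples and the join key sits at the same position on both sides. The function and its `how` strings are hypothetical. As in SQL, NULL keys never match.

```python
from typing import Any, Dict, List, Tuple

Row = Tuple[Any, ...]


def equi_join(left: List[Row], right: List[Row], key: int, how: str) -> List[Row]:
    """Toy hash join on column `key`; `how` is inner, left_outer, left_semi or left_anti."""
    right_width = len(right[0]) if right else 0
    build: Dict[Any, List[Row]] = {}
    for rrow in right:  # build side
        if rrow[key] is not None:  # NULL keys never match
            build.setdefault(rrow[key], []).append(rrow)
    out: List[Row] = []
    for lrow in left:  # stream side
        matches = build.get(lrow[key], []) if lrow[key] is not None else []
        if how == "inner":
            out.extend(lrow + rrow for rrow in matches)
        elif how == "left_outer":
            if matches:
                out.extend(lrow + rrow for rrow in matches)
            else:
                out.append(lrow + (None,) * right_width)  # pad the right side with NULLs
        elif how == "left_semi":
            if matches:
                out.append(lrow)  # left columns only, at most once per left row
        elif how == "left_anti":
            if not matches:
                out.append(lrow)
        else:
            raise ValueError(f"unsupported join type: {how}")
    return out


left = [(1, "a"), (2, "b"), (3, "c")]
right = [(1, "x"), (1, "y"), (3, "z")]
for how in ("inner", "left_outer", "left_semi", "left_anti"):
    print(how, equi_join(left, right, key=0, how=how))
```
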
Tungsten
- Memory Management and Binary Processing
- Cache-aware computation (CPU L1/L2/L3 caches: cache hits, cache locality)
- Code generation (Janino, WholeStageCodegen)
- No virtual function dispatches
- Intermediate data kept in CPU registers instead of memory
- Loop unrolling and SIMD
Columnar
- In-memory columnar format
Codegen/Janino/JIT
- HashAggregate
- BroadcastHashJoin
- SortMergeJoin
- RDDScan
- DataSourceScan
- WholeStageCodegen
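
The contrast between the iterator-per-operator (Volcano) model and whole-stage code generation can be sketched in Python. The first version chains generator operators. The second generates the source of one fused loop and compiles it once. Python's `exec` merely stands in for Janino here and gains none of the register or JIT benefits; the point is the shape of the generated code. All function names are hypothetical.

```python
from typing import Callable, Iterable, Iterator


# Volcano style: each operator pulls rows one at a time from its child operator.
def scan(data: Iterable[int]) -> Iterator[int]:
    yield from data


def filter_op(child: Iterator[int], pred: Callable[[int], bool]) -> Iterator[int]:
    return (v for v in child if pred(v))


def project_op(child: Iterator[int], fn: Callable[[int], int]) -> Iterator[int]:
    return (fn(v) for v in child)


def volcano_sum(data: Iterable[int]) -> int:
    return sum(project_op(filter_op(scan(data), lambda v: v > 10), lambda v: v * 2))


# Whole-stage style: generate source for a single fused loop, then compile it once.
def codegen_sum(pred_src: str, proj_src: str) -> Callable[[Iterable[int]], int]:
    src = (
        "def fused(data):\n"
        "    acc = 0\n"
        "    for v in data:\n"
        f"        if {pred_src}:\n"
        f"            acc += {proj_src}\n"
        "    return acc\n"
    )
    namespace: dict = {}
    exec(compile(src, "<generated>", "exec"), namespace)
    return namespace["fused"]


data = [5, 12, 20, 7]
fused = codegen_sum("v > 10", "v * 2")
print(volcano_sum(data), fused(data))  # 64 64
```
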
Hint
- Join, Shuffle
- BROADCASTJOIN, MAPJOIN, STREAMTABLE, INDEX, COALESCE, REPARTITION
Statistics
- Table level (sizeInBytes, rowCount, hints)
- Column level (distinctCount, min, max, nullCount, avgLen, maxLen, histogram)
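
Column statistics feed cardinality estimates. The minimal sketch below assumes values are uniformly distributed between the column's min and max and that no histogram is available. It estimates the selectivity of `col < literal` and the resulting row count, rounded up. Spark's filter estimation follows a similar idea but handles many more cases; the function names here are hypothetical.

```python
import math


def less_than_selectivity(min_v: float, max_v: float, literal: float) -> float:
    """Estimated fraction of rows with col < literal, assuming uniform values in [min_v, max_v]."""
    if literal <= min_v:
        return 0.0  # no value can be below the minimum
    if literal > max_v:
        return 1.0  # every value is below the literal
    return (literal - min_v) / (max_v - min_v)


def estimate_rows(row_count: int, selectivity: float) -> int:
    # Round up so a non-zero selectivity never estimates zero rows.
    return math.ceil(row_count * selectivity)


sel = less_than_selectivity(0, 100, 25)
print(sel, estimate_rows(1000, sel))  # 0.25 250
```
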
Adapter
- Metadata
- Metrics
Data Source
- Federation
- Partitions
- Transactional
- Vectorized reading and pushdown
- Parquet
- ORC
- CarbonData
Session
- Application -(1:n)-> Session (context) -(1:n)-> Job: one application hosts many sessions, and each session runs many jobs
- Shared and cached data across sessions
Catalog
- Configuration
- View
- Function
- External Catalog
Cache
- persist(StorageLevel.MEMORY_AND_DISK), the default storage level used by Dataset.cache()
Compression
- Cached tables are stored in an in-memory columnar format
- Columnar batch size (spark.sql.inMemoryColumnarStorage.batchSize) defaults to 10000 rows
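
The sketch below combines batching with one compression scheme, assuming a column is a Python list. The column is cut into batches of `batch_size` rows, mirroring the 10000 default, and each batch is run-length encoded. Run-length encoding is one of the schemes Spark's columnar cache can choose; the helper names here are hypothetical.

```python
from itertools import groupby
from typing import Any, List, Sequence, Tuple

Runs = List[Tuple[Any, int]]


def to_batches(column: Sequence[Any], batch_size: int = 10000) -> List[Sequence[Any]]:
    # Split one column into fixed-size batches; the last batch may be shorter.
    return [column[i:i + batch_size] for i in range(0, len(column), batch_size)]


def run_length_encode(batch: Sequence[Any]) -> Runs:
    # Collapse consecutive equal values into (value, run_length) pairs.
    return [(v, len(list(g))) for v, g in groupby(batch)]


def run_length_decode(runs: Runs) -> List[Any]:
    return [v for v, n in runs for _ in range(n)]


column = ["US"] * 15000 + ["CN"] * 7000
encoded = [run_length_encode(b) for b in to_batches(column)]
print(encoded)  # [[('US', 10000)], [('US', 5000), ('CN', 5000)], [('CN', 2000)]]
```
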
ShuffleService
- Standalone Mode
- Executor Process