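Catalyst
Encoder
A container of the serializer/deserializer (serde) expressions that a Dataset uses to convert between JVM objects and Spark's internal rows
- Serialize, Deserialize
- ExpressionEncoder (the only Encoder implementation)
  - RowEncoder (builds an ExpressionEncoder that maps external Rows to internal rows and back)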
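As a rough illustration of the serde idea (a toy in plain Python, not Spark's API), the hypothetical `ToyEncoder` below plays the role of an ExpressionEncoder. Its serializer flattens an object into a row tuple field by field, and its deserializer rebuilds the object from that row. Spark generates Catalyst expressions for both directions instead of calling Python methods.

```python
from dataclasses import dataclass, fields


@dataclass
class Person:  # hypothetical user type, playing the role of a Scala case class
    name: str
    age: int


class ToyEncoder:
    """Toy stand-in for an ExpressionEncoder (hypothetical, not Spark's API)."""

    def __init__(self, cls):
        self.cls = cls
        # the "schema": one entry per field, in declaration order
        self.schema = tuple(f.name for f in fields(cls))

    def serialize(self, obj):
        # object -> flat row (Spark would produce an InternalRow here)
        return tuple(getattr(obj, name) for name in self.schema)

    def deserialize(self, row):
        # flat row -> object, matching fields by position
        return self.cls(*row)


enc = ToyEncoder(Person)
row = enc.serialize(Person("Ann", 42))
print(row)                   # ('Ann', 42)
print(enc.deserialize(row))  # Person(name='Ann', age=42)
```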
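InternalRow
- UnsafeRow (Tungsten binary row format)
- JoinedRow (presents two rows as one, used in joins)
- BaseGenericInternalRow
  - GenericInternalRow
  - SpecificInternalRow (mutable; fields can be modified in place)
- MutableUnsafeRow (supports in-place updates)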
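JoinedRow avoids copying by forwarding each field access to one of its two underlying rows. A minimal Python sketch of that ordinal arithmetic, assuming plain tuples stand in for InternalRows (the class name is reused for readability; this is not Spark code):

```python
class JoinedRow:
    """Toy model: presents two rows as one without copying their fields."""

    def __init__(self, left, right):
        self.left = left
        self.right = right

    def __len__(self):
        # number of fields of the combined row
        return len(self.left) + len(self.right)

    def __getitem__(self, i):
        if not 0 <= i < len(self):
            raise IndexError(i)
        # ordinals below len(left) come from the left row, the rest from the right row
        if i < len(self.left):
            return self.left[i]
        return self.right[i - len(self.left)]


joined = JoinedRow((1, "a"), (2.5, True, None))
print(len(joined), joined[1], joined[2])  # 5 a 2.5
```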
TreeNode
- Expression
- QueryPlan
  - LogicalPlan
    - resolved, canonicalized
    - UnaryNode, BinaryNode, LeafNode, others
  - SparkPlan
    - UnaryExecNode, BinaryExecNode, LeafExecNode, others
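The TreeNode hierarchy exists so that rules can rewrite trees generically. Below is a toy Python sketch that assumes frozen dataclasses serve as immutable nodes. `transform_up` rewrites the children first and then offers the node to a rule. `constant_folding` is a hypothetical, much reduced analogue of Catalyst's ConstantFolding rule. Nodes the rule does not match come back unchanged, much as an unmatched partial function leaves a Catalyst node alone.

```python
from dataclasses import dataclass, fields, replace


class TreeNode:
    """Toy tree node; subclasses are frozen dataclasses whose TreeNode fields are children."""

    def transform_up(self, rule):
        # Post-order: rewrite every child first, then offer this node to the rule.
        new_children = {
            f.name: getattr(self, f.name).transform_up(rule)
            for f in fields(self)
            if isinstance(getattr(self, f.name), TreeNode)
        }
        node = replace(self, **new_children) if new_children else self
        return rule(node)


@dataclass(frozen=True)
class Literal(TreeNode):
    value: int


@dataclass(frozen=True)
class Attribute(TreeNode):
    name: str


@dataclass(frozen=True)
class Add(TreeNode):
    left: TreeNode
    right: TreeNode


def constant_folding(node):
    # Hypothetical rule: fold Add(Literal, Literal) into a single Literal.
    if isinstance(node, Add) and isinstance(node.left, Literal) and isinstance(node.right, Literal):
        return Literal(node.left.value + node.right.value)
    return node  # unmatched nodes are returned unchanged


expr = Add(Attribute("a"), Add(Literal(1), Literal(2)))
print(expr.transform_up(constant_folding))
# Add(left=Attribute(name='a'), right=Literal(value=3))
```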
Rule
- Rule
- RuleExecutor
  - Seq[Batch]
    - Strategy
      - Once
      - FixedPoint
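A RuleExecutor runs its batches in order. Within a batch the rules are applied in sequence, and the strategy decides whether to repeat. The toy Python executor below assumes that plans are plain comparable values (here, integers) and that `halve_if_even` is a hypothetical rule. It stops a batch when one pass changes nothing or when the strategy's iteration limit is reached, with `Once` modeled as a limit of one iteration.

```python
from dataclasses import dataclass
from typing import Any, Callable, List


class Once:
    """Strategy: run the batch a single time."""
    max_iterations = 1


@dataclass
class FixedPoint:
    """Strategy: repeat the batch until the plan stops changing or the limit is hit."""
    max_iterations: int


@dataclass
class Batch:
    name: str
    strategy: Any  # Once() or FixedPoint(n)
    rules: List[Callable[[Any], Any]]


def execute(batches, plan):
    """Toy RuleExecutor: apply each batch's rules in order, as its strategy allows."""
    for batch in batches:
        iteration = 1
        while True:
            before = plan
            for rule in batch.rules:
                plan = rule(plan)
            if plan == before:  # fixed point: a full pass changed nothing
                break
            if iteration >= batch.strategy.max_iterations:  # iteration limit reached
                break
            iteration += 1
    return plan


def halve_if_even(n):
    """Hypothetical rule on an integer 'plan'."""
    return n // 2 if n % 2 == 0 else n


print(execute([Batch("once", Once(), [halve_if_even])], 40))            # 20
print(execute([Batch("fixed", FixedPoint(100), [halve_if_even])], 40))  # 5
```

Spark compares plans with `fastEquals` and logs a warning when a FixedPoint batch hits its limit. The sketch mirrors only the two stopping conditions.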
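Parser
- ANTLR 4 (lexer, parser): Adaptive LL(*) parsing, with listener and visitor support
- SQL text -> AstBuilder -> unresolved LogicalPlan (UnresolvedRelation, UnresolvedFunction, UnresolvedAttribute); the DataFrame/Dataset API builds the same kind of unresolved plan directly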
Analyzer
- Rule-based, organized into batches that each run with a strategy (Once or FixedPoint)
- Unresolved LogicalPlan + Catalog/Metastore + rules -> semantically validates and transforms (resolves, removes, modifies) -> analyzed LogicalPlan
Batch | Strategy | Rules |
---|---|---|
Hints | FixedPoint | ResolveBroadcastHints, ResolveCoalesceHints, RemoveAllHints |
Simple Sanity Check | Once | LookupFunctions |
Substitution | FixedPoint | CTESubstitution, WindowsSubstitution, EliminateUnions, SubstituteUnresolvedOrdinals |
Resolution | FixedPoint | ResolveTableValuedFunctions, ResolveRelations, ResolveReferences, ResolveCreateNamedStruct, ResolveDeserializer, ResolveNewInstance, ResolveUpCast, ResolveGroupingAnalytics, ResolvePivot, ResolveOrdinalInOrderByAndGroupBy, ResolveMissingReferences, ExtractGenerator, ResolveGenerate, ResolveFunctions, ResolveAliases, ResolveSubquery, ResolveWindowOrder, ResolveWindowFrame, ResolveNaturalAndUsingJoin, ExtractWindowExpressions, GlobalAggregates, ResolveAggregateFunctions, TimeWindowing, ResolveInlineTables, TypeCoercion.typeCoercionRules, extendedResolutionRules |
Post-Hoc Resolution | Once | postHocResolutionRules |
View | Once | AliasViewChild |
Nondeterministic | Once | PullOutNondeterministic |
UDF | Once | HandleNullInputsForUDF |
FixNullability | Once | FixNullability |
ResolveTimeZone | Once | ResolveTimeZone |
Cleanup | FixedPoint | CleanupAliases |
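To show what the Resolution batch does to an unresolved plan, here is a toy Python model of two of its rules, ResolveRelations and ResolveReferences. The node classes, the dictionary catalog and the `ordinal` field (standing in for Spark's unique ExprId) are simplifying assumptions. The point is only that unknown tables or columns fail analysis, while known ones are bound to catalog metadata.

```python
from dataclasses import dataclass
from typing import Tuple


@dataclass(frozen=True)
class UnresolvedRelation:
    table: str


@dataclass(frozen=True)
class Relation:
    table: str
    output: Tuple[str, ...]


@dataclass(frozen=True)
class UnresolvedAttribute:
    name: str


@dataclass(frozen=True)
class AttributeReference:
    name: str
    ordinal: int  # toy stand-in for Spark's unique ExprId


@dataclass(frozen=True)
class Project:
    columns: tuple
    child: object


class AnalysisError(Exception):
    pass


def resolve_relations(plan, catalog):
    """Toy ResolveRelations: look table names up in the catalog."""
    if isinstance(plan, UnresolvedRelation):
        if plan.table not in catalog:
            raise AnalysisError(f"Table or view not found: {plan.table}")
        return Relation(plan.table, tuple(catalog[plan.table]))
    if isinstance(plan, Project):
        return Project(plan.columns, resolve_relations(plan.child, catalog))
    return plan


def resolve_references(plan):
    """Toy ResolveReferences: bind column names to the child's output."""
    if isinstance(plan, Project) and isinstance(plan.child, Relation):
        output = plan.child.output
        resolved = []
        for col in plan.columns:
            if isinstance(col, UnresolvedAttribute):
                if col.name not in output:
                    raise AnalysisError(f"cannot resolve '{col.name}' given input columns: {list(output)}")
                col = AttributeReference(col.name, output.index(col.name))
            resolved.append(col)
        return Project(tuple(resolved), plan.child)
    return plan


catalog = {"people": ["name", "age"]}  # hypothetical catalog contents
parsed = Project((UnresolvedAttribute("age"),), UnresolvedRelation("people"))
analyzed = resolve_references(resolve_relations(parsed, catalog))
print(analyzed)
```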
Optimizer
- Analyzed LogicalPlan -> RBO (Rule-Based Optimizer) -> optimized LogicalPlan
Batch | Strategy | Rules |
---|---|---|
Eliminate Distinct | FixedPoint | EliminateDistinct |
Finish Analysis | Once | EliminateSubqueryAliases, EliminateView, ReplaceExpressions, ComputeCurrentTime, GetCurrentDatabase, RewriteDistinctAggregates, ReplaceDeduplicateWithAggregate |
Union | Once | CombineUnions |
LocalRelation early | FixedPoint | ConvertToLocalRelation, PropagateEmptyRelation |
Pullup Correlated Expressions | Once | PullupCorrelatedPredicates |
Subquery | Once | OptimizeSubqueries |
Replace Operators | FixedPoint | RewriteExceptAll, RewriteIntersectAll, ReplaceIntersectWithSemiJoin, ReplaceExceptWithFilter, ReplaceExceptWithAntiJoin, ReplaceDistinctWithAggregate |
Aggregate | FixedPoint | RemoveLiteralFromGroupExpressions, RemoveRepetitionFromGroupExpressions |
Join Reorder | Once | CostBasedJoinReorder |
Remove Redundant Sorts | Once | RemoveRedundantSorts |
Decimal Optimizations | FixedPoint | DecimalAggregates |
Object Expressions Optimization | FixedPoint | EliminateMapObjects, CombineTypedFilters |
LocalRelation | FixedPoint | ConvertToLocalRelation, PropagateEmptyRelation |
Extract PythonUDF From JoinCondition | Once | PullOutPythonUDFInJoinCondition |
Check Cartesian Products | Once | CheckCartesianProducts |
RewriteSubquery | Once | RewritePredicateSubquery, ColumnPruning, CollapseProject, RemoveRedundantProject |
UpdateAttributeReferences | Once | UpdateNullabilityInAttributeReferences |
- extendedOperatorOptimizationRules
  PushProjectionThroughUnion, ReorderJoin, EliminateOuterJoin, PushPredicateThroughJoin, PushDownPredicate, LimitPushDown, ColumnPruning, CollapseRepartition, CollapseProject, CollapseWindow, CombineFilters, CombineLimits, CombineUnions, NullPropagation, ConstantPropagation, FoldablePropagation, OptimizeIn, ConstantFolding, ReorderAssociativeOperator, LikeSimplification, BooleanSimplification, SimplifyConditionals, RemoveDispensableExpressions, SimplifyBinaryComparison, PruneFilters, EliminateSorts, SimplifyCasts, SimplifyCaseConversionExpressions, RewriteCorrelatedScalarSubquery, EliminateSerialization, RemoveRedundantAliases, RemoveRedundantProject, SimplifyExtractValueOps, CombineConcats
- Non-excludable rules (cannot be disabled through spark.sql.optimizer.excludedRules)
  PushProjectionThroughUnion, EliminateDistinct, EliminateSubqueryAliases, EliminateView, ReplaceExpressions, ComputeCurrentTime, GetCurrentDatabase, RewriteDistinctAggregates, ReplaceDeduplicateWithAggregate, ReplaceIntersectWithSemiJoin, ReplaceExceptWithFilter, ReplaceExceptWithAntiJoin, RewriteExceptAll, RewriteIntersectAll, ReplaceDistinctWithAggregate, PullupCorrelatedPredicates, RewriteCorrelatedScalarSubquery, RewritePredicateSubquery, PullOutPythonUDFInJoinCondition
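Most optimizer rules are local pattern rewrites. Here is a toy Python analogue of CombineFilters. It assumes predicates are plain strings and that each Filter holds a frozenset of conjuncts, so duplicates disappear when filters are merged. Stacked Filter nodes collapse into one. Unlike the real rule, this sketch only walks chains of Filters and ignores every other operator.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Scan:
    table: str


@dataclass(frozen=True)
class Filter:
    conditions: frozenset  # conjuncts, e.g. frozenset({"a > 1", "b = 2"})
    child: object


def combine_filters(plan):
    """Toy CombineFilters: merge stacked Filters into one, de-duplicating conjuncts."""
    if not isinstance(plan, Filter):
        return plan
    child = combine_filters(plan.child)  # bottom-up, so a whole chain collapses in one pass
    if isinstance(child, Filter):
        return Filter(plan.conditions | child.conditions, child.child)
    return Filter(plan.conditions, child)


plan = Filter(frozenset({"a > 1"}), Filter(frozenset({"b = 2", "a > 1"}), Scan("t")))
print(combine_filters(plan))
```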
Planner & Execution
- CBO (Cost-Based Optimizer): uses statistics for shuffle and join decisions
- SparkPlan, SparkPlanner, QueryExecution
- Distribution, Partitioning, SortOrder
- SparkPlanInfo (metadata, metrics)
- SparkStrategy (Aggregation, BasicOperators, FlatMapGroupsWithStateStrategy, InMemoryScans, JoinSelection, SpecialLimits, StatefulAggregationStrategy, StreamingDeduplicationStrategy, StreamingRelationStrategy)
- Preparation rules applied to the physical plan (CollapseCodegenStages, PlanSubqueries, ReuseSubquery, ReuseExchange, EnsureRequirements)
Aggregation
- Aggregation buffer (schema, attributes)
- Aggregate modes: Partial, PartialMerge, Final, Complete
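The aggregation buffer is what lets one aggregate run in several modes. Here is a minimal Python sketch for `avg` that assumes a `(sum, count)` buffer. In Partial mode, each partition folds its raw rows into a buffer. In Final mode, the buffers are merged (as PartialMerge would do) and the result is evaluated. Complete mode goes from raw input to the result in a single step. The function names are illustrative, not Spark's.

```python
from functools import reduce

EMPTY = (0, 0)  # aggregation buffer for avg: (sum, count)


def update(buffer, value):
    """Partial/Complete: fold one raw input value into the buffer (nulls are skipped)."""
    s, c = buffer
    return buffer if value is None else (s + value, c + 1)


def merge(a, b):
    """PartialMerge/Final: combine two buffers; no raw input involved."""
    return (a[0] + b[0], a[1] + b[1])


def evaluate(buffer):
    """Final/Complete: turn the buffer into the result."""
    s, c = buffer
    return None if c == 0 else s / c


# Partial: each partition (e.g. before the shuffle) builds its own buffer.
partitions = [[1, 2, 3], [4, None], []]
partial_buffers = [reduce(update, part, EMPTY) for part in partitions]

# Final: after the shuffle, merge the partial buffers and evaluate.
final = evaluate(reduce(merge, partial_buffers, EMPTY))

# Complete: raw input to result in one step, with no partial/final split.
complete = evaluate(reduce(update, [1, 2, 3, 4, None], EMPTY))

print(partial_buffers, final, complete)  # [(6, 3), (4, 1), (0, 0)] 2.5 2.5
```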
Window
windowSpec
    : name=identifier                                            #windowRef
    | '(' name=identifier ')'                                    #windowRef
    | '('
      ( CLUSTER BY partition+=expression (',' partition+=expression)*
      | ((PARTITION | DISTRIBUTE) BY partition+=expression (',' partition+=expression)*)?
        ((ORDER | SORT) BY sortItem (',' sortItem)*)?)
      windowFrame?
      ')'                                                        #windowDef
    ;
windowFrame
    : frameType=RANGE start=frameBound
    | frameType=ROWS start=frameBound
    | frameType=RANGE BETWEEN start=frameBound AND end=frameBound
    | frameType=ROWS BETWEEN start=frameBound AND end=frameBound
    ;
frameBound
    : UNBOUNDED boundType=(PRECEDING | FOLLOWING)
    | boundType=CURRENT ROW
    | expression boundType=(PRECEDING | FOLLOWING)
    ;
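As an executable reading of the `ROWS` branch of `windowFrame`, here is a small Python function that computes `SUM(v) OVER (PARTITION BY p ORDER BY o ROWS BETWEEN ...)` over a list of dicts. It assumes `None` encodes `UNBOUNDED` and `0` encodes `CURRENT ROW`. It does not model `RANGE` frames, which bound the window by value rather than by row count.

```python
from collections import defaultdict


def rows_frame_sum(rows, partition_by, order_by, value, preceding, following):
    """SUM(value) OVER (PARTITION BY .. ORDER BY .. ROWS BETWEEN ..), ROWS frames only.

    preceding / following: a row count, 0 for CURRENT ROW, None for UNBOUNDED.
    """
    partitions = defaultdict(list)
    for row in rows:
        partitions[row[partition_by]].append(row)
    result = []
    for part in partitions.values():
        part.sort(key=lambda r: r[order_by])
        last = len(part) - 1
        for i, row in enumerate(part):
            lo = 0 if preceding is None else max(0, i - preceding)
            hi = last if following is None else min(last, i + following)
            result.append({**row, "sum": sum(r[value] for r in part[lo:hi + 1])})
    return result


rows = [
    {"p": "a", "o": 2, "v": 20},
    {"p": "a", "o": 1, "v": 10},
    {"p": "b", "o": 1, "v": 5},
    {"p": "a", "o": 3, "v": 30},
]
# ROWS BETWEEN 1 PRECEDING AND CURRENT ROW
print([r["sum"] for r in rows_frame_sum(rows, "p", "o", "v", 1, 0)])     # [10, 30, 50, 5]
# ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
print([r["sum"] for r in rows_frame_sum(rows, "p", "o", "v", None, 0)])  # [10, 30, 60, 5]
```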
Join
joinRelation
    : (joinType) JOIN right=relationPrimary joinCriteria?
    | NATURAL joinType JOIN right=relationPrimary
    ;
joinType
    : INNER?
    | CROSS
    | LEFT OUTER?
    | LEFT SEMI
    | RIGHT OUTER?
    | FULL OUTER?
    | LEFT? ANTI
    ;
joinCriteria
    : ON booleanExpression
    | USING identifierList
    ;
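The `joinType` alternatives differ mainly in which rows survive. The toy hash join below is written in Python and assumes that rows are dicts and that the right input is the build side. It shows the semantics for inner, `LEFT SEMI` and `LEFT ANTI` joins. A semi join keeps each matching left row once, with only the left columns. An anti join keeps the left rows that have no match. Null keys never match.

```python
from collections import defaultdict


def hash_join(left, right, key, join_type="inner"):
    """Toy equi-join on `key`; builds a hash table on the right side.

    Supports "inner", "left_semi" and "left_anti". Null keys never match.
    """
    table = defaultdict(list)
    for rrow in right:
        if rrow[key] is not None:
            table[rrow[key]].append(rrow)
    out = []
    for lrow in left:
        matches = table.get(lrow[key], []) if lrow[key] is not None else []
        if join_type == "inner":
            out.extend({**lrow, **rrow} for rrow in matches)
        elif join_type == "left_semi":
            if matches:  # keep the left row once, without right columns
                out.append(lrow)
        elif join_type == "left_anti":
            if not matches:  # keep only left rows with no match
                out.append(lrow)
        else:
            raise ValueError(f"unsupported join type: {join_type}")
    return out


left = [{"id": 1, "a": "x"}, {"id": 2, "a": "y"}, {"id": None, "a": "z"}]
right = [{"id": 1, "b": "p"}, {"id": 1, "b": "q"}, {"id": None, "b": "r"}]
print(hash_join(left, right, "id", "left_semi"))  # [{'id': 1, 'a': 'x'}]
```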
Tungsten
- Memory management and binary processing
- Cache-aware computation (CPU L1/L2/L3 caches: cache hits, cache locality)
- Code generation (Janino, WholeStageCodegen)
  - No virtual function dispatches
  - Intermediate data kept in CPU registers rather than in memory
  - Loop unrolling and SIMD
Columnar
In-memory columnar format
Codegen/Janino/JIT
- HashAggregate
- BroadcastHashJoin
- SortMergeJoin
- RDDScan
- DataSourceScan
- WholeStageCodegen
Hint
- Join, Shuffle
- BROADCASTJOIN, MAPJOIN, STREAMTABLE, INDEX, COALESCE, REPARTITION
Statistics
- Table (sizeInBytes, rowCount, hints)
- Column (distinctCount, min, max, nullCount, avgLen, maxLen, histogram)
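Statistics and hints meet in join planning. The following simplified Python sketch picks a strategy for an inner equi-join, loosely following Spark 2.x JoinSelection. If either side carries a broadcast hint, that hint wins. Otherwise, a side whose `sizeInBytes` is within `spark.sql.autoBroadcastJoinThreshold` (default 10 MB, and `-1` disables it) is broadcast. If neither applies, a sort-merge join is used. The sketch deliberately leaves out join-type restrictions on the build side, ShuffledHashJoin and `spark.sql.join.preferSortMergeJoin`.

```python
BROADCAST_HINTS = {"BROADCAST", "BROADCASTJOIN", "MAPJOIN"}
# Default of spark.sql.autoBroadcastJoinThreshold: 10 MB; -1 disables size-based broadcast.
DEFAULT_THRESHOLD = 10 * 1024 * 1024


def choose_join(left_bytes, right_bytes, left_hint=None, right_hint=None,
                threshold=DEFAULT_THRESHOLD):
    """Simplified inner equi-join selection: returns (strategy, build side)."""
    sizes = {"left": left_bytes, "right": right_bytes}
    # 1. A broadcast hint wins, regardless of the statistics.
    candidates = [side for side, hint in (("left", left_hint), ("right", right_hint))
                  if hint in BROADCAST_HINTS]
    # 2. Otherwise fall back to sizeInBytes from the statistics.
    if not candidates:
        candidates = [side for side, size in sizes.items() if 0 <= size <= threshold]
    if candidates:
        # broadcast the smaller of the eligible sides
        return "BroadcastHashJoin", min(candidates, key=sizes.get)
    return "SortMergeJoin", None


MB = 1024 * 1024
print(choose_join(1000 * MB, 1 * MB))  # ('BroadcastHashJoin', 'right')
```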
Adapter
- Metadata
- Metrics
Data Source
- Federation
- Partitions
- Transactional
- Vectorization (pushdown)
- Parquet
- ORC
- CarbonData
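Session
- Application -1:n-> Session (context) -1:n-> Job
- Sessions of one application share the SparkContext and cached data; each session keeps its own SQL configuration, temporary views and registered functions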
Catalog
- Configuration
- View
- Function
- External Catalog
Cache
- Dataset.cache() is persist(StorageLevel.MEMORY_AND_DISK), the default storage level for Datasets
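Compression
- Cached tables are stored in an in-memory columnar format, compressed by default (spark.sql.inMemoryColumnarStorage.compressed=true)
- Rows per column batch: spark.sql.inMemoryColumnarStorage.batchSize, default 10000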
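Here is a toy Python model of the cached layout. Rows are cut into batches of at most `batchSize` rows, each batch is stored column by column, and each column can then be compressed. Run-length encoding, shown here, is one of the schemes Spark's in-memory columnar cache can choose per column (dictionary encoding is another). The function names are hypothetical.

```python
def to_column_batches(rows, batch_size=10000):
    """Split rows into batches of at most batch_size rows, stored column by column."""
    return [
        [list(column) for column in zip(*rows[start:start + batch_size])]
        for start in range(0, len(rows), batch_size)
    ]


def run_length_encode(column):
    """Compress one column of a batch into (value, run length) pairs."""
    runs = []
    for value in column:
        if runs and runs[-1][0] == value:
            runs[-1][1] += 1
        else:
            runs.append([value, 1])
    return [tuple(run) for run in runs]


rows = [(i, "x" if i < 3 else "y") for i in range(5)]
batches = to_column_batches(rows, batch_size=2)
print(len(batches), batches[0])  # 3 [[0, 1], ['x', 'x']]
# With the default batch size all five rows fit in one batch:
print([run_length_encode(b[1]) for b in to_column_batches(rows)])  # [[('x', 3), ('y', 2)]]
```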
ShuffleService
- Standalone Mode
- Executor Process