Skip to content

Commit

Permalink
[pick](mtmv) pick 4 PR from master apache#41472 apache#40106 apache#4…
Browse files Browse the repository at this point in the history
…0173 apache#42206 (apache#42325)

pick from master

pr: apache#41472
commitId: 2745e04

pr: apache#40106
commitId: 0fdb1ee

pr: apache#40173
commitId: 0d07e3d

pr: apache#42206
commitId: 2bcaa5b
  • Loading branch information
seawinde authored Oct 24, 2024
1 parent d27cab0 commit a1690f0
Show file tree
Hide file tree
Showing 24 changed files with 980 additions and 97 deletions.
5 changes: 5 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,11 @@ public MTMV() {
mvRwLock = new ReentrantReadWriteLock(true);
}

@Override
public boolean needReadLockWhenPlan() {
return true;
}

public MTMVRefreshInfo getRefreshInfo() {
readMvLock();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.doris.nereids.glue.LogicalPlanAdapter;
import org.apache.doris.nereids.parser.NereidsParser;
import org.apache.doris.nereids.properties.PhysicalProperties;
import org.apache.doris.nereids.rules.RuleType;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.commands.ExplainCommand.ExplainLevel;
import org.apache.doris.nereids.trees.plans.commands.info.CreateMTMVInfo;
Expand All @@ -39,6 +40,7 @@
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.SessionVariable;

import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;

import java.util.List;
Expand All @@ -55,6 +57,9 @@ public static ConnectContext createMTMVContext(MTMV mtmv) {
ctx.getState().reset();
ctx.setThreadLocalInfo();
ctx.getSessionVariable().allowModifyMaterializedViewData = true;
// Disable add default limit rule to avoid refresh data wrong
ctx.getSessionVariable().setDisableNereidsRules(
String.join(",", ImmutableSet.of(RuleType.ADD_DEFAULT_LIMIT.name())));
Optional<String> workloadGroup = mtmv.getWorkloadGroup();
if (workloadGroup.isPresent()) {
ctx.getSessionVariable().setWorkloadGroup(workloadGroup.get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ public class CascadesContext implements ScheduleContext {
private final Optional<CascadesContext> parent;

private final Set<MaterializationContext> materializationContexts;
private final Set<List<String>> materializationRewrittenSuccessSet = new HashSet<>();
private boolean isLeadingJoin = false;

private boolean isLeadingDisableJoinReorder = false;
Expand Down Expand Up @@ -368,6 +369,14 @@ public void addMaterializationContext(MaterializationContext materializationCont
this.materializationContexts.add(materializationContext);
}

public Set<List<String>> getMaterializationRewrittenSuccessSet() {
return materializationRewrittenSuccessSet;
}

public void addMaterializationRewrittenSuccess(List<String> materializationQualifier) {
this.materializationRewrittenSuccessSet.add(materializationQualifier);
}

/**
* getAndCacheSessionVariable
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,8 @@ public synchronized void addTableReadLock(TableIf tableIf) {
String fullTableName = tableIf.getNameWithFullQualifiers();
String resourceName = "tableReadLock(" + fullTableName + ")";
plannerResources.push(new CloseableResource(
resourceName, Thread.currentThread().getName(), originStatement.originStmt, tableIf::readUnlock));
resourceName, Thread.currentThread().getName(),
originStatement == null ? null : originStatement.originStmt, tableIf::readUnlock));
}

/** releasePlannerResources */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.doris.nereids.memo;

import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.Pair;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.rules.exploration.mv.StructInfo;
Expand Down Expand Up @@ -126,6 +127,9 @@ public void refresh(Group group, CascadesContext cascadesContext) {
List<Set<BitSet>> childrenTableMap = new LinkedList<>();
if (groupExpression.children().isEmpty()) {
BitSet leaf = constructLeaf(groupExpression, cascadesContext);
if (leaf.isEmpty()) {
break;
}
groupExpressionMap.put(leaf, Pair.of(groupExpression, new LinkedList<>()));
continue;
}
Expand Down Expand Up @@ -163,9 +167,19 @@ public void refresh(Group group, CascadesContext cascadesContext) {
private BitSet constructLeaf(GroupExpression groupExpression, CascadesContext cascadesContext) {
Plan plan = groupExpression.getPlan();
BitSet tableMap = new BitSet();
boolean enableMaterializedViewNestRewrite = cascadesContext.getConnectContext().getSessionVariable()
.isEnableMaterializedViewNestRewrite();
if (plan instanceof LogicalCatalogRelation) {
TableIf table = ((LogicalCatalogRelation) plan).getTable();
// If disable materialized view nest rewrite, and mv already rewritten successfully once, doesn't construct
// table id map for nest mv rewrite
if (!enableMaterializedViewNestRewrite
&& cascadesContext.getMaterializationRewrittenSuccessSet().contains(table.getFullQualifiers())) {
return tableMap;

}
tableMap.set(cascadesContext.getStatementContext()
.getTableId(((LogicalCatalogRelation) plan).getTable()).asInt());
.getTableId(table).asInt());
}
// one row relation / CTE consumer
return tableMap;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import org.apache.doris.nereids.rules.exploration.mv.MaterializedViewFilterProjectJoinRule;
import org.apache.doris.nereids.rules.exploration.mv.MaterializedViewFilterProjectScanRule;
import org.apache.doris.nereids.rules.exploration.mv.MaterializedViewFilterScanRule;
import org.apache.doris.nereids.rules.exploration.mv.MaterializedViewOnlyJoinRule;
import org.apache.doris.nereids.rules.exploration.mv.MaterializedViewOnlyScanRule;
import org.apache.doris.nereids.rules.exploration.mv.MaterializedViewProjectAggregateRule;
import org.apache.doris.nereids.rules.exploration.mv.MaterializedViewProjectFilterAggregateRule;
Expand Down Expand Up @@ -228,7 +227,6 @@ public class RuleSet {
.build();

public static final List<Rule> MATERIALIZED_VIEW_RULES = planRuleFactories()
.add(MaterializedViewOnlyJoinRule.INSTANCE)
.add(MaterializedViewProjectJoinRule.INSTANCE)
.add(MaterializedViewFilterJoinRule.INSTANCE)
.add(MaterializedViewFilterProjectJoinRule.INSTANCE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.doris.nereids.rules.exploration.mv.mapping.ExpressionMapping;
import org.apache.doris.nereids.rules.exploration.mv.mapping.RelationMapping;
import org.apache.doris.nereids.rules.exploration.mv.mapping.SlotMapping;
import org.apache.doris.nereids.rules.rewrite.MergeProjects;
import org.apache.doris.nereids.trees.expressions.Alias;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
Expand Down Expand Up @@ -261,6 +262,12 @@ protected List<Plan> doRewrite(StructInfo queryStructInfo, CascadesContext casca
// Rewrite query by view
rewrittenPlan = rewriteQueryByView(matchMode, queryStructInfo, viewStructInfo, viewToQuerySlotMapping,
rewrittenPlan, materializationContext, cascadesContext);
// If rewrite successfully, try to get mv read lock to avoid data inconsistent,
// try to get lock which should added before RBO
if (materializationContext instanceof AsyncMaterializationContext && !materializationContext.isSuccess()) {
cascadesContext.getStatementContext()
.addTableReadLock(((AsyncMaterializationContext) materializationContext).getMtmv());
}
rewrittenPlan = MaterializedViewUtils.rewriteByRules(cascadesContext,
childContext -> {
Rewriter.getWholeTreeRewriter(childContext).execute();
Expand Down Expand Up @@ -354,6 +361,13 @@ protected List<Plan> doRewrite(StructInfo queryStructInfo, CascadesContext casca
rewrittenPlanOutput, queryPlan.getOutput()));
continue;
}
// Merge project
rewrittenPlan = MaterializedViewUtils.rewriteByRules(cascadesContext,
childContext -> {
Rewriter.getCteChildrenRewriter(childContext,
ImmutableList.of(Rewriter.bottomUp(new MergeProjects()))).execute();
return childContext.getRewritePlan();
}, rewrittenPlan, queryPlan);
if (!isOutputValid(queryPlan, rewrittenPlan)) {
LogicalProperties logicalProperties = rewrittenPlan.getLogicalProperties();
materializationContext.recordFailReason(queryStructInfo,
Expand All @@ -363,11 +377,11 @@ protected List<Plan> doRewrite(StructInfo queryStructInfo, CascadesContext casca
logicalProperties, queryPlan.getLogicalProperties()));
continue;
}
recordIfRewritten(queryStructInfo.getOriginalPlan(), materializationContext);
trySetStatistics(materializationContext, cascadesContext);
rewriteResults.add(rewrittenPlan);
// if rewrite successfully, try to regenerate mv scan because it maybe used again
materializationContext.tryReGenerateScanPlan(cascadesContext);
recordIfRewritten(queryStructInfo.getOriginalPlan(), materializationContext, cascadesContext);
}
return rewriteResults;
}
Expand Down Expand Up @@ -852,8 +866,9 @@ protected boolean checkMaterializationPattern(StructInfo structInfo, CascadesCon
return checkQueryPattern(structInfo, cascadesContext);
}

protected void recordIfRewritten(Plan plan, MaterializationContext context) {
protected void recordIfRewritten(Plan plan, MaterializationContext context, CascadesContext cascadesContext) {
context.setSuccess(true);
cascadesContext.addMaterializationRewrittenSuccess(context.generateMaterializationIdentifier());
if (plan.getGroupExpression().isPresent()) {
context.addMatchedGroup(plan.getGroupExpression().get().getOwnerGroup().getGroupId(), true);
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

/**
* This is responsible for join pattern such as project on filter on join
* Needed because variant data type would have filter on join directly, such as query query3_5 in variant_mv.groovy
*/
public class MaterializedViewProjectFilterJoinRule extends AbstractMaterializedViewJoinRule {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
import org.apache.doris.nereids.util.ExpressionUtils;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.OriginStatement;
import org.apache.doris.qe.SessionVariable;

import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -212,8 +213,8 @@ public static List<StructInfo> extractStructInfo(Plan plan, Plan originalPlan, C
structInfosBuilder.add(structInfo);
}
}
return structInfosBuilder.build();
}
return structInfosBuilder.build();
}
// if plan doesn't belong to any group, construct it directly
return ImmutableList.of(StructInfo.of(plan, originalPlan, cascadesContext));
Expand Down Expand Up @@ -267,11 +268,22 @@ public static Plan rewriteByRules(
CascadesContext rewrittenPlanContext = CascadesContext.initContext(
cascadesContext.getStatementContext(), rewrittenPlan,
cascadesContext.getCurrentJobContext().getRequiredProperties());
// Tmp old disable rule variable
Set<String> oldDisableRuleNames = rewrittenPlanContext.getStatementContext().getConnectContext()
.getSessionVariable()
.getDisableNereidsRuleNames();
rewrittenPlanContext.getStatementContext().getConnectContext().getSessionVariable()
.setDisableNereidsRules(String.join(",", ImmutableSet.of(RuleType.ADD_DEFAULT_LIMIT.name())));
rewrittenPlanContext.getStatementContext().invalidCache(SessionVariable.DISABLE_NEREIDS_RULES);
try {
rewrittenPlanContext.getConnectContext().setSkipAuth(true);
rewrittenPlan = planRewriter.apply(rewrittenPlanContext);
} finally {
rewrittenPlanContext.getConnectContext().setSkipAuth(false);
// Recover old disable rules variable
rewrittenPlanContext.getStatementContext().getConnectContext().getSessionVariable()
.setDisableNereidsRules(String.join(",", oldDisableRuleNames));
rewrittenPlanContext.getStatementContext().invalidCache(SessionVariable.DISABLE_NEREIDS_RULES);
}
Map<ExprId, Slot> exprIdToNewRewrittenSlot = Maps.newLinkedHashMap();
for (Slot slot : rewrittenPlan.getOutput()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.apache.doris.analysis.AlterViewStmt;
import org.apache.doris.analysis.ColWithComment;
import org.apache.doris.analysis.TableName;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.TableIf;
Expand All @@ -31,19 +30,13 @@
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.Util;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.analyzer.UnboundResultSink;
import org.apache.doris.nereids.properties.PhysicalProperties;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.plans.commands.ExplainCommand.ExplainLevel;
import org.apache.doris.nereids.util.PlanUtils;
import org.apache.doris.qe.ConnectContext;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;

import java.util.List;
import java.util.Set;

/** AlterViewInfo */
public class AlterViewInfo extends BaseViewInfo {
Expand Down Expand Up @@ -83,18 +76,6 @@ public void init(ConnectContext ctx) throws UserException {
createFinalCols(outputs);
}

/**validate*/
public void validate(ConnectContext ctx) throws UserException {
NereidsPlanner planner = new NereidsPlanner(ctx.getStatementContext());
planner.planWithLock(new UnboundResultSink<>(logicalQuery), PhysicalProperties.ANY, ExplainLevel.NONE);
Set<String> colSets = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER);
for (Column col : finalCols) {
if (!colSets.add(col.getName())) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_DUP_FIELDNAME, col.getName());
}
}
}

/**translateToLegacyStmt*/
public AlterViewStmt translateToLegacyStmt(ConnectContext ctx) {
List<ColWithComment> cols = Lists.newArrayList();
Expand Down
Loading

0 comments on commit a1690f0

Please sign in to comment.