Skip to content

Commit

Permalink
new scheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
924060929 committed Oct 25, 2024
1 parent 5f10b21 commit 81726f3
Show file tree
Hide file tree
Showing 163 changed files with 4,385 additions and 437 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -157,9 +157,10 @@ struct AggregateFunctionLinearHistogramData {
left += interval;
bucket_json.AddMember("upper", left, allocator);
count = (idx == key) ? count_ : 0;
bucket_json.AddMember("count", static_cast<uint64_t>(count), allocator);
bucket_json.AddMember("count", count, allocator);
acc_count += count;
bucket_json.AddMember("acc_count", static_cast<uint64_t>(acc_count), allocator);
bucket_json.AddMember("acc_count", acc_count, allocator);

bucket_arr.PushBack(bucket_json, allocator);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,17 @@
import org.apache.doris.load.loadv2.LoadJobScheduler;
import org.apache.doris.load.loadv2.LoadManager;
import org.apache.doris.load.routineload.RoutineLoadManager;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.stats.StatsErrorEstimator;
import org.apache.doris.planner.GroupCommitPlanner;
import org.apache.doris.planner.PlanFragment;
import org.apache.doris.planner.Planner;
import org.apache.doris.planner.ScanNode;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.Coordinator;
import org.apache.doris.qe.NereidsSqlCoordinator;
import org.apache.doris.qe.OriginStatement;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.transaction.GlobalTransactionMgr;
Expand Down Expand Up @@ -134,6 +137,9 @@ public BrokerLoadJob createBrokerLoadJob() {

/**
 * Creates the query coordinator for the given plan.
 *
 * <p>When the plan was produced by the Nereids planner and the session permits the
 * Nereids distribute planner, the distribute-aware {@link NereidsSqlCoordinator} is
 * used; otherwise the legacy {@link Coordinator} is returned.
 */
public Coordinator createCoordinator(ConnectContext context, Analyzer analyzer, Planner planner,
        StatsErrorEstimator statsErrorEstimator) {
    boolean useNereidsCoordinator = planner instanceof NereidsPlanner
            && SessionVariable.canUseNereidsDistributePlanner();
    if (useNereidsCoordinator) {
        NereidsPlanner nereidsPlanner = (NereidsPlanner) planner;
        return new NereidsSqlCoordinator(context, analyzer, nereidsPlanner, statsErrorEstimator);
    }
    return new Coordinator(context, analyzer, planner, statsErrorEstimator);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ public class SummaryProfile {
NEREIDS_REWRITE_TIME,
NEREIDS_OPTIMIZE_TIME,
NEREIDS_TRANSLATE_TIME,
NEREIDS_DISTRIBUTE_TIME,
WORKLOAD_GROUP,
ANALYSIS_TIME,
PLAN_TIME,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ public Plan planWithLock(LogicalPlan plan, PhysicalProperties requireProperties,
}
}

private Plan planWithoutLock(
protected Plan planWithoutLock(
LogicalPlan plan, ExplainLevel explainLevel,
boolean showPlanProcess, PhysicalProperties requireProperties) {
// resolve column, table and function
Expand Down Expand Up @@ -309,7 +309,7 @@ private Plan planWithoutLock(
return physicalPlan;
}

private LogicalPlan preprocess(LogicalPlan logicalPlan) {
/**
 * Runs the statement's configured plan preprocessors over the logical plan
 * before analysis begins.
 */
protected LogicalPlan preprocess(LogicalPlan logicalPlan) {
    PlanPreprocessors preprocessors = new PlanPreprocessors(statementContext);
    return preprocessors.process(logicalPlan);
}

Expand All @@ -320,7 +320,7 @@ private void initCascadesContext(LogicalPlan plan, PhysicalProperties requirePro
}
}

private void analyze(boolean showPlanProcess) {
protected void analyze(boolean showPlanProcess) {
if (LOG.isDebugEnabled()) {
LOG.debug("Start analyze plan");
}
Expand All @@ -335,7 +335,7 @@ private void analyze(boolean showPlanProcess) {
/**
* Logical plan rewrite based on a series of heuristic rules.
*/
private void rewrite(boolean showPlanProcess) {
protected void rewrite(boolean showPlanProcess) {
if (LOG.isDebugEnabled()) {
LOG.debug("Start rewrite plan");
}
Expand All @@ -347,7 +347,7 @@ private void rewrite(boolean showPlanProcess) {
}

// DependsRules: EnsureProjectOnTopJoin.class
private void optimize() {
protected void optimize() {
if (LOG.isDebugEnabled()) {
LOG.debug("Start optimize plan");
}
Expand All @@ -358,7 +358,7 @@ private void optimize() {
}
}

private void splitFragments(PhysicalPlan resultPlan) {
protected void splitFragments(PhysicalPlan resultPlan) {
if (resultPlan instanceof PhysicalSqlCache) {
return;
}
Expand Down Expand Up @@ -453,7 +453,7 @@ private void splitFragments(PhysicalPlan resultPlan) {
}
}

private void distribute(PhysicalPlan physicalPlan, ExplainLevel explainLevel) {
protected void distribute(PhysicalPlan physicalPlan, ExplainLevel explainLevel) {
boolean canUseNereidsDistributePlanner = SessionVariable.canUseNereidsDistributePlanner();
if ((!canUseNereidsDistributePlanner && explainLevel.isPlanLevel)) {
return;
Expand All @@ -463,18 +463,21 @@ private void distribute(PhysicalPlan physicalPlan, ExplainLevel explainLevel) {
}

splitFragments(physicalPlan);
doDistribute(canUseNereidsDistributePlanner);
}

/**
 * Produces distributed plans from the split fragments.
 *
 * <p>No-op unless the Nereids distribute planner is enabled for this session.
 * Fix: the diff residue left two consecutive assignments to {@code distributedPlans};
 * the first ({@code new DistributePlanner(fragments)}) was the pre-change line and is
 * dead code, immediately overwritten — only the {@code (this, fragments)} form is kept.
 */
protected void doDistribute(boolean canUseNereidsDistributePlanner) {
    if (!canUseNereidsDistributePlanner) {
        return;
    }

    distributedPlans = new DistributePlanner(this, fragments).plan();
    // Record the distribute-phase timing in the query's summary profile when an executor exists.
    if (statementContext.getConnectContext().getExecutor() != null) {
        statementContext.getConnectContext().getExecutor().getSummaryProfile().setNereidsDistributeTime();
    }
}

private PhysicalPlan postProcess(PhysicalPlan physicalPlan) {
/**
 * Applies the post-processing passes of the current cascades context to the
 * optimized physical plan.
 */
protected PhysicalPlan postProcess(PhysicalPlan physicalPlan) {
    PlanPostProcessors postProcessors = new PlanPostProcessors(cascadesContext);
    return postProcessors.process(physicalPlan);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,18 +47,19 @@
import org.apache.doris.qe.ShortCircuitQueryContext;
import org.apache.doris.qe.cache.CacheAnalyzer;
import org.apache.doris.statistics.Statistics;
import org.apache.doris.system.Backend;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.sparkproject.guava.base.Throwables;

import java.io.Closeable;
import java.util.ArrayList;
Expand Down Expand Up @@ -169,6 +170,8 @@ public class StatementContext implements Closeable {

private List<PlannerHook> plannerHooks = new ArrayList<>();

private Backend groupCommitMergeBackend;

/**
 * Creates a context bound to the current thread's {@code ConnectContext},
 * with no original statement and a fragment id starting at 0.
 */
public StatementContext() {
    this(ConnectContext.get(), null, 0);
}
Expand Down Expand Up @@ -558,4 +561,13 @@ public TableId getTableId(TableIf tableIf) {
this.tableIdMapping.put(tableIdentifier, tableId);
return tableId;
}

/** Returns the backend chosen for group-commit merge, or null if none was set. */
public Backend getGroupCommitMergeBackend() {
    return groupCommitMergeBackend;
}

/** Records the backend chosen for group-commit merge for this statement. */
public void setGroupCommitMergeBackend(Backend groupCommitMergeBackend) {
    this.groupCommitMergeBackend = groupCommitMergeBackend;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,6 @@ private void ensureChildrenRewritten() {

// some rule return new plan tree, which the number of new plan node > 1,
// we should transform this new plan nodes too.
// NOTICE: this relay on pull up cte anchor
if (isTraverseChildren.test(plan)) {
pushChildrenJobs(plan);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,6 @@ protected final RewriteResult rewrite(Plan plan, List<Rule> rules, RewriteJobCon
}
Plan newPlan = newPlans.get(0);
if (!newPlan.deepEquals(plan)) {
// don't remove this comment, it can help us to trace some bug when developing.

NereidsTracer.logRewriteEvent(rule.toString(), pattern, plan, newPlan);
String traceBefore = null;
if (showPlanProcess) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ public void execute() {
RewriteJobContext newRewriteJobContext = rewriteJobContext.withChildrenVisited(true);
pushJob(new PlanTreeRewriteTopDownJob(newRewriteJobContext, context, isTraverseChildren, rules));

// NOTICE: this relay on pull up cte anchor
if (isTraverseChildren.test(rewriteJobContext.plan)) {
pushChildrenJobs(newRewriteJobContext);
}
Expand Down
Loading

0 comments on commit 81726f3

Please sign in to comment.