Skip to content

Commit

Permalink
support broker load
Browse files Browse the repository at this point in the history
  • Loading branch information
924060929 committed Oct 25, 2024
1 parent 10db0f8 commit 98ab59f
Show file tree
Hide file tree
Showing 10 changed files with 219 additions and 159 deletions.
23 changes: 21 additions & 2 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/EnvFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,11 @@
import org.apache.doris.load.loadv2.LoadManager;
import org.apache.doris.load.routineload.RoutineLoadManager;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.stats.StatsErrorEstimator;
import org.apache.doris.nereids.trees.plans.distribute.DistributePlanner;
import org.apache.doris.nereids.trees.plans.distribute.DistributedPlan;
import org.apache.doris.nereids.trees.plans.distribute.FragmentIdMapping;
import org.apache.doris.planner.GroupCommitPlanner;
import org.apache.doris.planner.PlanFragment;
import org.apache.doris.planner.Planner;
Expand Down Expand Up @@ -147,8 +151,23 @@ public Coordinator createCoordinator(ConnectContext context, Analyzer analyzer,
public Coordinator createCoordinator(Long jobId, TUniqueId queryId, DescriptorTable descTable,
List<PlanFragment> fragments, List<ScanNode> scanNodes,
String timezone, boolean loadZeroTolerance, boolean enableProfile) {
return new Coordinator(jobId, queryId, descTable, fragments, scanNodes, timezone, loadZeroTolerance,
enableProfile);
if (SessionVariable.canUseNereidsDistributePlanner()) {
ConnectContext connectContext = new ConnectContext();
connectContext.setQueryId(queryId);
StatementContext statementContext = new StatementContext(
connectContext, new OriginStatement("", 0)
);
DistributePlanner distributePlanner = new DistributePlanner(statementContext, fragments);
FragmentIdMapping<DistributedPlan> distributedPlans = distributePlanner.plan();

return new NereidsSqlCoordinator(
jobId, queryId, descTable, fragments, distributedPlans.valueList(),
scanNodes, timezone, loadZeroTolerance, enableProfile
);
}
return new Coordinator(
jobId, queryId, descTable, fragments, scanNodes, timezone, loadZeroTolerance, enableProfile
);
}

public GroupCommitPlanner createGroupCommitPlanner(Database db, OlapTable table, List<String> targetColumnNames,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -736,6 +736,10 @@ public CascadesContext getCascadesContext() {
return cascadesContext;
}

public ConnectContext getConnectContext() {
return cascadesContext.getConnectContext();
}

public static PhysicalProperties buildInitRequireProperties() {
return PhysicalProperties.GATHER;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.doris.qe;

import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.DescriptorTable;
import org.apache.doris.analysis.StorageBackend;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.FsBroker;
Expand All @@ -26,7 +27,6 @@
import org.apache.doris.common.Status;
import org.apache.doris.common.UserException;
import org.apache.doris.common.profile.ExecutionProfile;
import org.apache.doris.common.profile.SummaryProfile;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.stats.StatsErrorEstimator;
Expand Down Expand Up @@ -77,30 +77,39 @@
public class NereidsSqlCoordinator extends Coordinator {
private static final Logger LOG = LogManager.getLogger(NereidsSqlCoordinator.class);

private final SqlCoordinatorContext coordinatorContext;
protected final SqlCoordinatorContext coordinatorContext;

private volatile SqlPipelineTask executionTask;
protected volatile SqlPipelineTask executionTask;

public NereidsSqlCoordinator(ConnectContext context, Analyzer analyzer,
NereidsPlanner planner, StatsErrorEstimator statsErrorEstimator) {
super(context, analyzer, planner, statsErrorEstimator);

this.coordinatorContext = SqlCoordinatorContext.build(planner, this);
this.coordinatorContext = SqlCoordinatorContext.buildForSql(planner, this);
this.coordinatorContext.setJobProcessor(buildJobProcessor(coordinatorContext));

Preconditions.checkState(!planner.getFragments().isEmpty()
&& coordinatorContext.instanceNum > 0, "Fragment and Instance can not be empty˚");
&& coordinatorContext.instanceNum.get() > 0, "Fragment and Instance can not be empty˚");
}

public NereidsSqlCoordinator(Long jobId, TUniqueId queryId, DescriptorTable descTable,
List<PlanFragment> fragments, List<PipelineDistributedPlan> distributedPlans,
List<ScanNode> scanNodes, String timezone, boolean loadZeroTolerance,
boolean enableProfile) {
super(jobId, queryId, descTable, fragments, scanNodes, timezone, loadZeroTolerance, enableProfile);
this.coordinatorContext = SqlCoordinatorContext.buildForLoad(
this, jobId, queryId, fragments, distributedPlans, scanNodes,
descTable, timezone, loadZeroTolerance, enableProfile
);
}

@Override
public void exec() throws Exception {
coordinatorContext.updateProfileIfPresent(SummaryProfile::setAssignFragmentTime);

enqueue(coordinatorContext.connectContext);

processTopSink(coordinatorContext, coordinatorContext.planner);
processTopSink(coordinatorContext, coordinatorContext.topDistributedPlan);

QeProcessorImpl.INSTANCE.registerInstances(coordinatorContext.queryId, coordinatorContext.instanceNum);
QeProcessorImpl.INSTANCE.registerInstances(coordinatorContext.queryId, coordinatorContext.instanceNum.get());

Map<DistributedPlanWorker, TPipelineFragmentParamsList> workerToFragments
= ThriftPlansBuilder.plansToThrift(coordinatorContext);
Expand All @@ -110,14 +119,14 @@ public void exec() throws Exception {

@Override
public boolean isTimeout() {
return System.currentTimeMillis() > coordinatorContext.timeoutDeadline;
return System.currentTimeMillis() > coordinatorContext.timeoutDeadline.get();
}

@Override
public void cancel(Status cancelReason) {
coordinatorContext.getQueueToken().ifPresent(QueueToken::cancel);

for (ScanNode scanNode : coordinatorContext.planner.getScanNodes()) {
for (ScanNode scanNode : coordinatorContext.scanNodes) {
scanNode.stop();
}

Expand Down Expand Up @@ -326,7 +335,7 @@ public List<FragmentInstanceInfo> getFragmentInstanceInfos() {

@Override
public List<PlanFragment> getFragments() {
return coordinatorContext.planner.getFragments();
return coordinatorContext.fragments;
}

@Override
Expand Down Expand Up @@ -389,7 +398,7 @@ public void close() {
}

try {
for (ScanNode scanNode : coordinatorContext.planner.getScanNodes()) {
for (ScanNode scanNode : coordinatorContext.scanNodes) {
scanNode.stop();
}
} catch (Throwable t) {
Expand All @@ -401,9 +410,8 @@ protected void cancelInternal(Status cancelReason) {
coordinatorContext.withLock(() -> coordinatorContext.getJobProcessor().cancel(cancelReason));
}

private void processTopSink(SqlCoordinatorContext coordinatorContext, NereidsPlanner nereidsPlanner)
throws AnalysisException {
PipelineDistributedPlan topPlan = (PipelineDistributedPlan) nereidsPlanner.getDistributedPlans().last();
protected void processTopSink(
SqlCoordinatorContext coordinatorContext, PipelineDistributedPlan topPlan) throws AnalysisException {
setForArrowFlight(coordinatorContext, topPlan);
setForBroker(coordinatorContext, topPlan);
}
Expand Down Expand Up @@ -474,7 +482,7 @@ private boolean shouldQueue(ConnectContext context) {
return false;
}
// a query with ScanNode need not queue only when all its scan node is SchemaScanNode
for (ScanNode scanNode : coordinatorContext.planner.getScanNodes()) {
for (ScanNode scanNode : coordinatorContext.scanNodes) {
if (!(scanNode instanceof SchemaScanNode)) {
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3373,11 +3373,11 @@ public static boolean canUseNereidsDistributePlanner() {
}
ConnectContext connectContext = ConnectContext.get();
if (connectContext == null) {
return false;
return true;
}
StatementContext statementContext = connectContext.getStatementContext();
if (statementContext == null) {
return false;
return true;
}
StatementBase parsedStatement = statementContext.getParsedStatement();
if (!(parsedStatement instanceof LogicalPlanAdapter)) {
Expand Down
Loading

0 comments on commit 98ab59f

Please sign in to comment.