Skip to content

Commit

Permalink
[fix](task) Abort creating replica task if sending RPC failed (#42276)
Browse files Browse the repository at this point in the history
  • Loading branch information
w41ter authored Oct 24, 2024
1 parent 152cc2c commit 84370ff
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ protected void createShadowIndexReplica() throws AlterCancelException {
ok = false;
}

if (!ok) {
if (!ok || !countDownLatch.getStatus().ok()) {
// create replicas failed. just cancel the job
// clear tasks and show the failed replicas to user
AgentTaskQueue.removeBatchTask(batchTask, TTaskType.CREATE);
Expand Down
11 changes: 8 additions & 3 deletions fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -979,7 +979,7 @@ private void checkAndPrepareMeta() {
ok = true;
}

if (ok) {
if (ok && latch.getStatus().ok()) {
if (LOG.isDebugEnabled()) {
LOG.debug("finished to create all restored replicas. {}", this);
}
Expand Down Expand Up @@ -1043,8 +1043,13 @@ private void checkAndPrepareMeta() {
.map(item -> "(backendId = " + item.getKey() + ", tabletId = " + item.getValue() + ")")
.collect(Collectors.toList());
String idStr = Joiner.on(", ").join(subList);
status = new Status(ErrCode.COMMON_ERROR,
"Failed to create replicas for restore. unfinished marks: " + idStr);
String reason = "TIMEDOUT";
if (!latch.getStatus().ok()) {
reason = latch.getStatus().getErrorMsg();
}
String errMsg = String.format(
"Failed to create replicas for restore: %s, unfinished marks: %s", reason, idStr);
status = new Status(ErrCode.COMMON_ERROR, errMsg);
return;
}
LOG.info("finished to prepare meta. {}", this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,20 @@ public synchronized boolean markedCountDown(K key, V value) {
return false;
}

public synchronized boolean markedCountDownWithStatus(K key, V value, Status status) {
// update status first before countDown.
// so that the waiting thread will get the correct status.
if (st.ok()) {
st = status;
}

if (marks.remove(key, value)) {
super.countDown();
return true;
}
return false;
}

public synchronized List<Entry<K, V>> getLeftMarks() {
return Lists.newArrayList(marks.entries());
}
Expand Down
20 changes: 10 additions & 10 deletions fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -157,9 +157,11 @@ public void run() {
BackendService.Client client = null;
TNetworkAddress address = null;
boolean ok = false;
String errMsg = "";
try {
Backend backend = Env.getCurrentSystemInfo().getBackend(backendId);
if (backend == null || !backend.isAlive()) {
errMsg = String.format("backend %d is not alive", backendId);
continue;
}
List<AgentTask> tasks = this.backendIdToTasks.get(backendId);
Expand All @@ -169,30 +171,28 @@ public void run() {
client = ClientPool.backendPool.borrowObject(address);
List<TAgentTaskRequest> agentTaskRequests = new LinkedList<TAgentTaskRequest>();
for (AgentTask task : tasks) {
try {
agentTaskRequests.add(toAgentTaskRequest(task));
} catch (Exception e) {
task.failed();
throw e;
}
agentTaskRequests.add(toAgentTaskRequest(task));
}
client.submitTasks(agentTaskRequests);
if (LOG.isDebugEnabled()) {
for (AgentTask task : tasks) {
if (LOG.isDebugEnabled()) {
LOG.debug("send task: type[{}], backend[{}], signature[{}]",
task.getTaskType(), backendId, task.getSignature());
}
LOG.debug("send task: type[{}], backend[{}], signature[{}]",
task.getTaskType(), backendId, task.getSignature());
}
}
ok = true;
} catch (Exception e) {
LOG.warn("task exec error. backend[{}]", backendId, e);
errMsg = String.format("task exec error: %s. backend[%d]", e.getMessage(), backendId);
} finally {
if (ok) {
ClientPool.backendPool.returnObject(address, client);
} else {
ClientPool.backendPool.invalidateObject(address, client);
List<AgentTask> tasks = this.backendIdToTasks.get(backendId);
for (AgentTask task : tasks) {
task.failedWithMsg(errMsg);
}
}
}
} // end for backend
Expand Down
4 changes: 4 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/task/AgentTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,10 @@ public void failed() {
++this.failedTimes;
}

public void failedWithMsg(String errMsg) {
failed();
}

public int getFailedTimes() {
return this.failedTimes;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,23 @@ public void countDownToZero(String errMsg) {
}
}

@Override
public void failedWithMsg(String errMsg) {
super.failedWithMsg(errMsg);

// CreateReplicaTask will not trigger a retry in ReportTask. Therefore, it needs to
// be marked as failed here and all threads waiting for the result of
// CreateReplicaTask need to be awakened.
if (this.latch != null) {
Status s = new Status(TStatusCode.CANCELLED, errMsg);
latch.markedCountDownWithStatus(getBackendId(), getTabletId(), s);
if (LOG.isDebugEnabled()) {
LOG.debug("CreateReplicaTask failed with msg: {}, tablet: {}, backend: {}",
errMsg, getTabletId(), getBackendId());
}
}
}

public void setLatch(MarkedCountDownLatch<Long, Long> latch) {
this.latch = latch;
}
Expand Down

0 comments on commit 84370ff

Please sign in to comment.