Skip to content

Commit

Permalink
Pipe/Subscription: avoid executing rollbackFromValidateTask or `rol…
Browse files Browse the repository at this point in the history
…lbackFromValidate` multiple times in retry with rollback scenarios (apache#13825)
  • Loading branch information
VGalaxies committed Oct 25, 2024
1 parent ad1e7e1 commit dfc12fd
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,11 @@ public abstract class AbstractOperatePipeProcedureV2
// Only used in rollback to reduce the number of network calls
protected boolean isRollbackFromOperateOnDataNodesSuccessful = false;

// Only used in rollback to avoid executing rollbackFromValidateTask multiple times
// Pure in-memory object, not involved in snapshot serialization and deserialization.
// TODO: consider serializing this variable later
protected boolean isRollbackFromValidateTaskSuccessful = false;

// This variable should not be serialized into procedure store,
// putting it here is just for convenience
protected AtomicReference<PipeTaskInfo> pipeTaskInfo;
Expand Down Expand Up @@ -298,10 +303,13 @@ protected void rollbackState(ConfigNodeProcedureEnv env, OperatePipeTaskState st

switch (state) {
case VALIDATE_TASK:
try {
rollbackFromValidateTask(env);
} catch (Exception e) {
LOGGER.warn("ProcedureId {}: Failed to rollback from validate task.", getProcId(), e);
if (!isRollbackFromValidateTaskSuccessful) {
try {
rollbackFromValidateTask(env);
isRollbackFromValidateTaskSuccessful = true;
} catch (Exception e) {
LOGGER.warn("ProcedureId {}: Failed to rollback from validate task.", getProcId(), e);
}
}
break;
case CALCULATE_INFO_FOR_TASK:
Expand Down Expand Up @@ -330,7 +338,7 @@ protected void rollbackState(ConfigNodeProcedureEnv env, OperatePipeTaskState st
break;
case OPERATE_ON_DATA_NODES:
try {
// We have to make sure that rollbackFromOperateOnDataNodes is executed before
// We have to make sure that rollbackFromOperateOnDataNodes is executed after
// rollbackFromWriteConfigNodeConsensus, because rollbackFromOperateOnDataNodes is
// executed based on the consensus of config nodes that is written by
// rollbackFromWriteConfigNodeConsensus
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ public abstract class AbstractOperateSubscriptionProcedure

private static final int RETRY_THRESHOLD = 1;

// Only used in rollback to avoid executing rollbackFromValidate multiple times
// Pure in-memory object, not involved in snapshot serialization and deserialization.
// TODO: consider serializing this variable later
protected boolean isRollbackFromValidateSuccessful = false;

protected AtomicReference<SubscriptionInfo> subscriptionInfo;

protected AtomicReference<SubscriptionInfo> acquireLockInternal(
Expand Down Expand Up @@ -250,15 +255,18 @@ protected void rollbackState(ConfigNodeProcedureEnv env, OperateSubscriptionStat

switch (state) {
case VALIDATE:
try {
rollbackFromValidate(env);
} catch (Exception e) {
LOGGER.warn(
"ProcedureId {}: Failed to rollback from state [{}], because {}",
getProcId(),
state,
e.getMessage(),
e);
if (!isRollbackFromValidateSuccessful) {
try {
rollbackFromValidate(env);
isRollbackFromValidateSuccessful = true;
} catch (Exception e) {
LOGGER.warn(
"ProcedureId {}: Failed to rollback from state [{}], because {}",
getProcId(),
state,
e.getMessage(),
e);
}
}
break;
case OPERATE_ON_CONFIG_NODES:
Expand Down

0 comments on commit dfc12fd

Please sign in to comment.