Follower don't forward non-query statement to master repeatedly (#5160)
Co-authored-by: lanhuajian <lanhuajian@sankuai.com>
This commit is contained in:
@ -89,7 +89,7 @@ public class SystemAction extends WebBaseAction {
|
||||
// forward to master
|
||||
String showProcStmt = "SHOW PROC \"" + procPath + "\"";
|
||||
MasterOpExecutor masterOpExecutor = new MasterOpExecutor(new OriginStatement(showProcStmt, 0),
|
||||
ConnectContext.get(), RedirectStatus.FORWARD_NO_SYNC);
|
||||
ConnectContext.get(), RedirectStatus.FORWARD_NO_SYNC, true);
|
||||
try {
|
||||
masterOpExecutor.execute();
|
||||
} catch (Exception e) {
|
||||
|
||||
@ -79,7 +79,7 @@ public class ShowProcAction extends RestBaseAction {
|
||||
context.setQualifiedUser(ConnectContext.get().getQualifiedUser());
|
||||
context.setRemoteIP(ConnectContext.get().getRemoteIP());
|
||||
MasterOpExecutor masterOpExecutor = new MasterOpExecutor(new OriginStatement(showProcStmt, 0), context,
|
||||
RedirectStatus.FORWARD_NO_SYNC);
|
||||
RedirectStatus.FORWARD_NO_SYNC, true);
|
||||
LOG.debug("need to transfer to Master. stmt: {}", context.getStmtId());
|
||||
|
||||
try {
|
||||
|
||||
@ -104,7 +104,7 @@ public class SystemController extends BaseController {
|
||||
String showProcStmt = "SHOW PROC \"" + procPath + "\"";
|
||||
|
||||
MasterOpExecutor masterOpExecutor = new MasterOpExecutor(new OriginStatement(showProcStmt, 0),
|
||||
ConnectContext.get(), RedirectStatus.FORWARD_NO_SYNC);
|
||||
ConnectContext.get(), RedirectStatus.FORWARD_NO_SYNC, true);
|
||||
try {
|
||||
masterOpExecutor.execute();
|
||||
} catch (Exception e) {
|
||||
|
||||
@ -43,7 +43,9 @@ public class MasterOpExecutor {
|
||||
// the total time of thrift connectTime add readTime and writeTime
|
||||
private int thriftTimeoutMs;
|
||||
|
||||
public MasterOpExecutor(OriginStatement originStmt, ConnectContext ctx, RedirectStatus status) {
|
||||
private boolean shouldNotRetry;
|
||||
|
||||
public MasterOpExecutor(OriginStatement originStmt, ConnectContext ctx, RedirectStatus status, boolean isQuery) {
|
||||
this.originStmt = originStmt;
|
||||
this.ctx = ctx;
|
||||
if (status.isNeedToWaitJournalSync()) {
|
||||
@ -52,6 +54,8 @@ public class MasterOpExecutor {
|
||||
this.waitTimeoutMs = 0;
|
||||
}
|
||||
this.thriftTimeoutMs = ctx.getSessionVariable().getQueryTimeoutS() * 1000;
|
||||
// if isQuery=false, we shouldn't retry twice when catch exception because of Idempotency
|
||||
this.shouldNotRetry = !isQuery;
|
||||
}
|
||||
|
||||
public void execute() throws Exception {
|
||||
@ -110,7 +114,7 @@ public class MasterOpExecutor {
|
||||
if (!ok) {
|
||||
throw e;
|
||||
}
|
||||
if (e.getType() == TTransportException.TIMED_OUT) {
|
||||
if (shouldNotRetry || e.getType() == TTransportException.TIMED_OUT) {
|
||||
throw e;
|
||||
} else {
|
||||
LOG.warn("Forward statement "+ ctx.getStmtId() +" to Master " + thriftAddress + " twice", e);
|
||||
|
||||
@ -385,7 +385,8 @@ public class StmtExecutor {
|
||||
}
|
||||
|
||||
private void forwardToMaster() throws Exception {
|
||||
masterOpExecutor = new MasterOpExecutor(originStmt, context, redirectStatus);
|
||||
boolean isQuery = parsedStmt instanceof QueryStmt;
|
||||
masterOpExecutor = new MasterOpExecutor(originStmt, context, redirectStatus, isQuery);
|
||||
LOG.debug("need to transfer to Master. stmt: {}", context.getStmtId());
|
||||
masterOpExecutor.execute();
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user