[feature](thrift) Add FE thrift rpc redirect master address (#25371)
Signed-off-by: Jack Drogon <jack.xsuperman@gmail.com>
This commit is contained in:
@ -260,6 +260,13 @@ public class FrontendServiceImpl implements FrontendService.Iface {
|
||||
private ConcurrentHashMap<Long, Integer> multiTableFragmentInstanceIdIndexMap =
|
||||
new ConcurrentHashMap<>(64);
|
||||
|
||||
private static TNetworkAddress getMasterAddress() {
|
||||
Env env = Env.getCurrentEnv();
|
||||
String masterHost = env.getMasterHost();
|
||||
int masterRpcPort = env.getMasterRpcPort();
|
||||
return new TNetworkAddress(masterHost, masterRpcPort);
|
||||
}
|
||||
|
||||
public FrontendServiceImpl(ExecuteEnv exeEnv) {
|
||||
masterImpl = new MasterImpl();
|
||||
this.exeEnv = exeEnv;
|
||||
@ -1221,6 +1228,7 @@ public class FrontendServiceImpl implements FrontendService.Iface {
|
||||
if (!Env.getCurrentEnv().isMaster()) {
|
||||
status.setStatusCode(TStatusCode.NOT_MASTER);
|
||||
status.addToErrorMsgs(NOT_MASTER_ERR_MSG);
|
||||
result.setMasterAddress(getMasterAddress());
|
||||
LOG.error("failed to get beginTxn: {}", NOT_MASTER_ERR_MSG);
|
||||
return result;
|
||||
}
|
||||
@ -1611,6 +1619,7 @@ public class FrontendServiceImpl implements FrontendService.Iface {
|
||||
if (!Env.getCurrentEnv().isMaster()) {
|
||||
status.setStatusCode(TStatusCode.NOT_MASTER);
|
||||
status.addToErrorMsgs(NOT_MASTER_ERR_MSG);
|
||||
result.setMasterAddress(getMasterAddress());
|
||||
LOG.error("failed to get commitTxn: {}", NOT_MASTER_ERR_MSG);
|
||||
return result;
|
||||
}
|
||||
@ -1798,6 +1807,7 @@ public class FrontendServiceImpl implements FrontendService.Iface {
|
||||
if (!Env.getCurrentEnv().isMaster()) {
|
||||
status.setStatusCode(TStatusCode.NOT_MASTER);
|
||||
status.addToErrorMsgs(NOT_MASTER_ERR_MSG);
|
||||
result.setMasterAddress(getMasterAddress());
|
||||
LOG.error("failed to get rollbackTxn: {}", NOT_MASTER_ERR_MSG);
|
||||
return result;
|
||||
}
|
||||
@ -2801,6 +2811,7 @@ public class FrontendServiceImpl implements FrontendService.Iface {
|
||||
if (!Env.getCurrentEnv().isMaster()) {
|
||||
status.setStatusCode(TStatusCode.NOT_MASTER);
|
||||
status.addToErrorMsgs(NOT_MASTER_ERR_MSG);
|
||||
result.setMasterAddress(getMasterAddress());
|
||||
LOG.error("failed to get getSnapshot: {}", NOT_MASTER_ERR_MSG);
|
||||
return result;
|
||||
}
|
||||
@ -2887,6 +2898,7 @@ public class FrontendServiceImpl implements FrontendService.Iface {
|
||||
if (!Env.getCurrentEnv().isMaster()) {
|
||||
status.setStatusCode(TStatusCode.NOT_MASTER);
|
||||
status.addToErrorMsgs(NOT_MASTER_ERR_MSG);
|
||||
result.setMasterAddress(getMasterAddress());
|
||||
LOG.error("failed to get restoreSnapshot: {}", NOT_MASTER_ERR_MSG);
|
||||
return result;
|
||||
}
|
||||
@ -3005,6 +3017,7 @@ public class FrontendServiceImpl implements FrontendService.Iface {
|
||||
if (!Env.getCurrentEnv().isMaster()) {
|
||||
status.setStatusCode(TStatusCode.NOT_MASTER);
|
||||
status.addToErrorMsgs(NOT_MASTER_ERR_MSG);
|
||||
result.setMasterAddress(getMasterAddress());
|
||||
LOG.error("failed to get getMasterToken: {}", NOT_MASTER_ERR_MSG);
|
||||
return result;
|
||||
}
|
||||
|
||||
@ -569,6 +569,7 @@ struct TBeginTxnResult {
|
||||
2: optional i64 txn_id
|
||||
3: optional string job_status // if label already used, set status of existing job
|
||||
4: optional i64 db_id
|
||||
5: optional Types.TNetworkAddress master_address
|
||||
}
|
||||
|
||||
// StreamLoad request, used to load a streaming to engine
|
||||
@ -743,6 +744,7 @@ struct TCommitTxnRequest {
|
||||
|
||||
struct TCommitTxnResult {
|
||||
1: optional Status.TStatus status
|
||||
2: optional Types.TNetworkAddress master_address
|
||||
}
|
||||
|
||||
struct TLoadTxn2PCRequest {
|
||||
@ -779,6 +781,7 @@ struct TRollbackTxnRequest {
|
||||
|
||||
struct TRollbackTxnResult {
|
||||
1: optional Status.TStatus status
|
||||
2: optional Types.TNetworkAddress master_address
|
||||
}
|
||||
|
||||
struct TLoadTxnRollbackRequest {
|
||||
@ -1099,6 +1102,7 @@ struct TGetSnapshotResult {
|
||||
1: optional Status.TStatus status
|
||||
2: optional binary meta
|
||||
3: optional binary job_info
|
||||
4: optional Types.TNetworkAddress master_address
|
||||
}
|
||||
|
||||
struct TTableRef {
|
||||
@ -1123,6 +1127,7 @@ struct TRestoreSnapshotRequest {
|
||||
|
||||
struct TRestoreSnapshotResult {
|
||||
1: optional Status.TStatus status
|
||||
2: optional Types.TNetworkAddress master_address
|
||||
}
|
||||
|
||||
struct TGetMasterTokenRequest {
|
||||
@ -1134,6 +1139,7 @@ struct TGetMasterTokenRequest {
|
||||
struct TGetMasterTokenResult {
|
||||
1: optional Status.TStatus status
|
||||
2: optional string token
|
||||
3: optional Types.TNetworkAddress master_address
|
||||
}
|
||||
|
||||
typedef TGetBinlogRequest TGetBinlogLagRequest
|
||||
|
||||
Reference in New Issue
Block a user