[Heartbeat] Support fe heartbeat use thrift protocol to get stable response (#5027)

This PR is to support fe master get fe heartbeat response by thrift protocol instead of http protocol.
This commit is contained in:
caiconghui
2020-12-16 23:38:04 +08:00
committed by GitHub
parent 650536d53e
commit dfa413335f
6 changed files with 197 additions and 20 deletions

View File

@ -62,8 +62,10 @@ public class ClientPool {
brokerPoolConfig.setMaxWaitMillis(500); // wait for the connection
}
public static GenericPool<HeartbeatService.Client> heartbeatPool =
new GenericPool("HeartbeatService", heartbeatConfig, heartbeatTimeoutMs);
public static GenericPool<HeartbeatService.Client> backendHeartbeatPool =
new GenericPool("HeartbeatService", heartbeatConfig, heartbeatTimeoutMs);
public static GenericPool<FrontendService.Client> frontendHeartbeatPool =
new GenericPool<>("FrontendService", heartbeatConfig, heartbeatTimeoutMs);
public static GenericPool<FrontendService.Client> frontendPool =
new GenericPool("FrontendService", backendConfig, backendTimeoutMs);
public static GenericPool<BackendService.Client> backendPool =

View File

@ -1301,4 +1301,13 @@ public class Config extends ConfigBase {
*/
@ConfField
public static boolean enable_alpha_rowset = false;
/**
* This config is used to solve fe heartbeat response read_timeout problem,
* When config is set to be true, master will get fe heartbeat response by thrift protocol
* instead of http protocol. In order to maintain compatibility with the old version,
* the default is false, and the configuration cannot be changed to true until all fe are upgraded.
*/
@ConfField(mutable = true, masterOnly = true)
public static boolean enable_fe_heartbeat_by_thrift = false;
}

View File

@ -37,6 +37,7 @@ import org.apache.doris.common.PatternMatcher;
import org.apache.doris.common.ThriftServerContext;
import org.apache.doris.common.ThriftServerEventProcessor;
import org.apache.doris.common.UserException;
import org.apache.doris.common.Version;
import org.apache.doris.load.EtlStatus;
import org.apache.doris.load.LoadJob;
import org.apache.doris.load.MiniEtlTaskInfo;
@ -64,6 +65,9 @@ import org.apache.doris.thrift.TExecPlanFragmentParams;
import org.apache.doris.thrift.TFeResult;
import org.apache.doris.thrift.TFetchResourceResult;
import org.apache.doris.thrift.TFinishTaskRequest;
import org.apache.doris.thrift.TFrontendPingFrontendRequest;
import org.apache.doris.thrift.TFrontendPingFrontendResult;
import org.apache.doris.thrift.TFrontendPingFrontendStatusCode;
import org.apache.doris.thrift.TGetDbsParams;
import org.apache.doris.thrift.TGetDbsResult;
import org.apache.doris.thrift.TGetTablesParams;
@ -959,6 +963,40 @@ public class FrontendServiceImpl implements FrontendService.Iface {
return new TStatus(TStatusCode.CANCELLED);
}
@Override
public TFrontendPingFrontendResult ping(TFrontendPingFrontendRequest request) throws TException {
boolean isReady = Catalog.getCurrentCatalog().isReady();
TFrontendPingFrontendResult result = new TFrontendPingFrontendResult();
result.setStatus(TFrontendPingFrontendStatusCode.OK);
if (isReady) {
if (request.getClusterId() != Catalog.getCurrentCatalog().getClusterId()) {
result.setStatus(TFrontendPingFrontendStatusCode.FAILED);
result.setMsg("invalid cluster id: " + Catalog.getCurrentCatalog().getClusterId());
}
if (result.getStatus() == TFrontendPingFrontendStatusCode.OK) {
if (!request.getToken().equals(Catalog.getCurrentCatalog().getToken())) {
result.setStatus(TFrontendPingFrontendStatusCode.FAILED);
result.setMsg("invalid token: " + Catalog.getCurrentCatalog().getToken());
}
}
if (result.status == TFrontendPingFrontendStatusCode.OK) {
// cluster id and token are valid, return replayed journal id
long replayedJournalId = Catalog.getCurrentCatalog().getReplayedJournalId();
result.setMsg("success");
result.setReplayedJournalId(replayedJournalId);
result.setQueryPort(Config.query_port);
result.setRpcPort(Config.rpc_port);
result.setVersion(Version.DORIS_BUILD_VERSION + "-" + Version.DORIS_BUILD_SHORT_HASH);
}
} else {
result.setStatus(TFrontendPingFrontendStatusCode.FAILED);
result.setMsg("not ready");
}
return result;
}
private TNetworkAddress getClientAddr() {
ThriftServerContext connectionContext = ThriftServerEventProcessor.getConnectionContext();
// For NonBlockingServer, we can not get client ip.

View File

@ -30,12 +30,16 @@ import org.apache.doris.http.rest.BootstrapFinishAction;
import org.apache.doris.persist.HbPackage;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.system.HeartbeatResponse.HbStatus;
import org.apache.doris.thrift.FrontendService;
import org.apache.doris.thrift.HeartbeatService;
import org.apache.doris.thrift.TBackendInfo;
import org.apache.doris.thrift.TBrokerOperationStatus;
import org.apache.doris.thrift.TBrokerOperationStatusCode;
import org.apache.doris.thrift.TBrokerPingBrokerRequest;
import org.apache.doris.thrift.TBrokerVersion;
import org.apache.doris.thrift.TFrontendPingFrontendRequest;
import org.apache.doris.thrift.TFrontendPingFrontendResult;
import org.apache.doris.thrift.TFrontendPingFrontendStatusCode;
import org.apache.doris.thrift.THeartbeatResult;
import org.apache.doris.thrift.TMasterInfo;
import org.apache.doris.thrift.TNetworkAddress;
@ -106,11 +110,7 @@ public class HeartbeatMgr extends MasterDaemon {
// send frontend heartbeat
List<Frontend> frontends = Catalog.getCurrentCatalog().getFrontends(null);
String masterFeNodeName = "";
for (Frontend frontend : frontends) {
if (frontend.getHost().equals(masterInfo.get().getNetworkAddress().getHostname())) {
masterFeNodeName = frontend.getNodeName();
}
FrontendHeartbeatHandler handler = new FrontendHeartbeatHandler(frontend,
Catalog.getCurrentCatalog().getClusterId(),
Catalog.getCurrentCatalog().getToken());
@ -151,12 +151,6 @@ public class HeartbeatMgr extends MasterDaemon {
}
} // end for all results
// we also add a 'mocked' master Frontends heartbeat response to synchronize master info to other Frontends.
hbPackage.addHbResponse(new FrontendHbResponse(masterFeNodeName,
Config.query_port, Config.rpc_port, Catalog.getCurrentCatalog().getEditLog().getMaxJournalId(),
System.currentTimeMillis(), Version.DORIS_BUILD_VERSION + "-" + Version.DORIS_BUILD_SHORT_HASH));
// write edit log
Catalog.getCurrentCatalog().getEditLog().logHeartbeat(hbPackage);
}
@ -221,7 +215,7 @@ public class HeartbeatMgr extends MasterDaemon {
TNetworkAddress beAddr = new TNetworkAddress(backend.getHost(), backend.getHeartbeatPort());
boolean ok = false;
try {
client = ClientPool.heartbeatPool.borrowObject(beAddr);
client = ClientPool.backendHeartbeatPool.borrowObject(beAddr);
TMasterInfo copiedMasterInfo = new TMasterInfo(masterInfo.get());
copiedMasterInfo.setBackendIp(backend.getHost());
@ -256,9 +250,9 @@ public class HeartbeatMgr extends MasterDaemon {
Strings.isNullOrEmpty(e.getMessage()) ? "got exception" : e.getMessage());
} finally {
if (ok) {
ClientPool.heartbeatPool.returnObject(beAddr, client);
ClientPool.backendHeartbeatPool.returnObject(beAddr, client);
} else {
ClientPool.heartbeatPool.invalidateObject(beAddr, client);
ClientPool.backendHeartbeatPool.invalidateObject(beAddr, client);
}
}
}
@ -282,13 +276,20 @@ public class HeartbeatMgr extends MasterDaemon {
// heartbeat to self
if (Catalog.getCurrentCatalog().isReady()) {
return new FrontendHbResponse(fe.getNodeName(), Config.query_port, Config.rpc_port,
Catalog.getCurrentCatalog().getReplayedJournalId(), System.currentTimeMillis(),
Catalog.getCurrentCatalog().getMaxJournalId(), System.currentTimeMillis(),
Version.DORIS_BUILD_VERSION + "-" + Version.DORIS_BUILD_SHORT_HASH);
} else {
return new FrontendHbResponse(fe.getNodeName(), "not ready");
}
}
if (Config.enable_fe_heartbeat_by_thrift) {
return getHeartbeatResponseByThrift();
} else {
return getHeartbeatResponseByHttp();
}
}
private HeartbeatResponse getHeartbeatResponseByHttp() {
String url = "http://" + fe.getHost() + ":" + Config.http_port
+ "/api/bootstrap?cluster_id=" + clusterId + "&token=" + token;
try {
@ -338,6 +339,35 @@ public class HeartbeatMgr extends MasterDaemon {
Strings.isNullOrEmpty(e.getMessage()) ? "got exception" : e.getMessage());
}
}
private HeartbeatResponse getHeartbeatResponseByThrift() {
FrontendService.Client client = null;
TNetworkAddress addr = new TNetworkAddress(fe.getHost(), fe.getRpcPort());
boolean ok = false;
try {
client = ClientPool.frontendHeartbeatPool.borrowObject(addr);
TFrontendPingFrontendRequest request = new TFrontendPingFrontendRequest(clusterId, token);
TFrontendPingFrontendResult result = client.ping(request);
ok = true;
if (result.getStatus() == TFrontendPingFrontendStatusCode.OK) {
return new FrontendHbResponse(fe.getNodeName(), result.getQueryPort(),
result.getRpcPort(), result.getReplayedJournalId(),
System.currentTimeMillis(), result.getVersion());
} else {
return new FrontendHbResponse(fe.getNodeName(), result.getMsg());
}
} catch (Exception e) {
return new FrontendHbResponse(fe.getNodeName(),
Strings.isNullOrEmpty(e.getMessage()) ? "got exception" : e.getMessage());
} finally {
if (ok) {
ClientPool.frontendHeartbeatPool.returnObject(addr, client);
} else {
ClientPool.frontendHeartbeatPool.invalidateObject(addr, client);
}
}
}
}
// broker heartbeat handler

View File

@ -20,6 +20,7 @@ package org.apache.doris.system;
import mockit.Expectations;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.FsBroker;
import org.apache.doris.common.Config;
import org.apache.doris.common.GenericPool;
import org.apache.doris.common.Pair;
import org.apache.doris.common.util.Util;
@ -27,12 +28,17 @@ import org.apache.doris.ha.FrontendNodeType;
import org.apache.doris.system.HeartbeatMgr.BrokerHeartbeatHandler;
import org.apache.doris.system.HeartbeatMgr.FrontendHeartbeatHandler;
import org.apache.doris.system.HeartbeatResponse.HbStatus;
import org.apache.doris.thrift.FrontendService;
import org.apache.doris.thrift.TBrokerOperationStatus;
import org.apache.doris.thrift.TBrokerOperationStatusCode;
import org.apache.doris.thrift.TBrokerPingBrokerRequest;
import org.apache.doris.thrift.TFrontendPingFrontendRequest;
import org.apache.doris.thrift.TFrontendPingFrontendResult;
import org.apache.doris.thrift.TFrontendPingFrontendStatusCode;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TPaloBrokerService;
import org.apache.thrift.TException;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@ -67,7 +73,7 @@ public class HeartbeatMgrTest {
}
@Test
public void testFrontendHbHandler() {
public void testFrontendHbHandlerWithHttp() {
new MockUp<Util>() {
@Mock
public String getResultForUrl(String urlStr, String encodedAuthInfo,
@ -90,11 +96,11 @@ public class HeartbeatMgrTest {
}
}
};
Config.enable_fe_heartbeat_by_thrift = false;
System.out.println(" config " + Config.enable_fe_heartbeat_by_thrift);
Frontend fe = new Frontend(FrontendNodeType.FOLLOWER, "test", "192.168.1.1", 9010);
FrontendHeartbeatHandler handler = new FrontendHeartbeatHandler(fe, 12345, "abcd");
HeartbeatResponse response = handler.call();
Assert.assertTrue(response instanceof FrontendHbResponse);
FrontendHbResponse hbResponse = (FrontendHbResponse) response;
Assert.assertEquals(191224, hbResponse.getReplayedJournalId());
@ -113,7 +119,78 @@ public class HeartbeatMgrTest {
Assert.assertEquals(0, hbResponse.getQueryPort());
Assert.assertEquals(0, hbResponse.getRpcPort());
Assert.assertEquals(HbStatus.BAD, hbResponse.getStatus());
}
@Test
public void testFrontendHbHandlerWithThirft(@Mocked FrontendService.Client client) throws TException {
new MockUp<GenericPool<FrontendService.Client>>() {
@Mock
public FrontendService.Client borrowObject(TNetworkAddress address) throws Exception {
return client;
}
@Mock
public void returnObject(TNetworkAddress address, FrontendService.Client object) {
return;
}
@Mock
public void invalidateObject(TNetworkAddress address, FrontendService.Client object) {
return;
}
};
TFrontendPingFrontendRequest normalRequest = new TFrontendPingFrontendRequest(12345, "abcd");
TFrontendPingFrontendResult normalResult = new TFrontendPingFrontendResult();
normalResult.setStatus(TFrontendPingFrontendStatusCode.OK);
normalResult.setMsg("success");
normalResult.setReplayedJournalId(191224);
normalResult.setQueryPort(9131);
normalResult.setRpcPort(9121);
normalResult.setVersion("test");
TFrontendPingFrontendRequest badRequest = new TFrontendPingFrontendRequest(12345, "abcde");
TFrontendPingFrontendResult badResult = new TFrontendPingFrontendResult();
badResult.setStatus(TFrontendPingFrontendStatusCode.FAILED);
badResult.setMsg("not ready");
new Expectations() {
{
client.ping(normalRequest);
minTimes = 0;
result = normalResult;
client.ping(badRequest);
minTimes = 0;
result = badResult;
}
};
Config.enable_fe_heartbeat_by_thrift = true;
Frontend fe = new Frontend(FrontendNodeType.FOLLOWER, "test", "192.168.1.1", 9010);
FrontendHeartbeatHandler handler = new FrontendHeartbeatHandler(fe, 12345, "abcd");
HeartbeatResponse response = handler.call();
Assert.assertTrue(response instanceof FrontendHbResponse);
FrontendHbResponse hbResponse = (FrontendHbResponse) response;
Assert.assertEquals(191224, hbResponse.getReplayedJournalId());
Assert.assertEquals(9131, hbResponse.getQueryPort());
Assert.assertEquals(9121, hbResponse.getRpcPort());
Assert.assertEquals(HbStatus.OK, hbResponse.getStatus());
Assert.assertEquals("test", hbResponse.getVersion());
Frontend fe2 = new Frontend(FrontendNodeType.FOLLOWER, "test2", "192.168.1.2", 9010);
handler = new FrontendHeartbeatHandler(fe2, 12345, "abcde");
response = handler.call();
Assert.assertTrue(response instanceof FrontendHbResponse);
hbResponse = (FrontendHbResponse) response;
Assert.assertEquals(0, hbResponse.getReplayedJournalId());
Assert.assertEquals(0, hbResponse.getQueryPort());
Assert.assertEquals(0, hbResponse.getRpcPort());
Assert.assertEquals(HbStatus.BAD, hbResponse.getStatus());
Assert.assertEquals("not ready", hbResponse.getMsg());
}
@Test