From dfa413335f54b72839f1fc47bb1100d3b8f85bf7 Mon Sep 17 00:00:00 2001 From: caiconghui <55968745+caiconghui@users.noreply.github.com> Date: Wed, 16 Dec 2020 23:38:04 +0800 Subject: [PATCH] [Heartbeat] Support fe heartbeat use thrift protocol to get stable response (#5027) This PR lets the fe master get fe heartbeat responses via the thrift protocol instead of the http protocol. --- .../org/apache/doris/common/ClientPool.java | 6 +- .../java/org/apache/doris/common/Config.java | 9 ++ .../doris/service/FrontendServiceImpl.java | 38 +++++++++ .../org/apache/doris/system/HeartbeatMgr.java | 58 +++++++++---- .../apache/doris/system/HeartbeatMgrTest.java | 83 ++++++++++++++++++- gensrc/thrift/FrontendService.thrift | 23 ++++- 6 files changed, 197 insertions(+), 20 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/ClientPool.java b/fe/fe-core/src/main/java/org/apache/doris/common/ClientPool.java index 0f0ac40b95..62b3d12dcd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/ClientPool.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/ClientPool.java @@ -62,8 +62,10 @@ public class ClientPool { brokerPoolConfig.setMaxWaitMillis(500); // wait for the connection } - public static GenericPool heartbeatPool = - new GenericPool("HeartbeatService", heartbeatConfig, heartbeatTimeoutMs); + public static GenericPool backendHeartbeatPool = + new GenericPool("HeartbeatService", heartbeatConfig, heartbeatTimeoutMs); + public static GenericPool frontendHeartbeatPool = + new GenericPool<>("FrontendService", heartbeatConfig, heartbeatTimeoutMs); public static GenericPool frontendPool = new GenericPool("FrontendService", backendConfig, backendTimeoutMs); public static GenericPool backendPool = diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java index 656099578a..83e5237888 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java +++
b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java @@ -1301,4 +1301,13 @@ public class Config extends ConfigBase { */ @ConfField public static boolean enable_alpha_rowset = false; + + /** + * This config is used to solve the fe heartbeat response read_timeout problem. + * When this config is set to true, the master gets fe heartbeat responses via the thrift protocol + * instead of the http protocol. To maintain compatibility with old versions, + * the default is false, and this config must not be changed to true until all fe nodes are upgraded. + */ + @ConfField(mutable = true, masterOnly = true) + public static boolean enable_fe_heartbeat_by_thrift = false; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index ca93063597..7cd1c64cc4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -37,6 +37,7 @@ import org.apache.doris.common.PatternMatcher; import org.apache.doris.common.ThriftServerContext; import org.apache.doris.common.ThriftServerEventProcessor; import org.apache.doris.common.UserException; +import org.apache.doris.common.Version; import org.apache.doris.load.EtlStatus; import org.apache.doris.load.LoadJob; import org.apache.doris.load.MiniEtlTaskInfo; @@ -64,6 +65,9 @@ import org.apache.doris.thrift.TExecPlanFragmentParams; import org.apache.doris.thrift.TFeResult; import org.apache.doris.thrift.TFetchResourceResult; import org.apache.doris.thrift.TFinishTaskRequest; +import org.apache.doris.thrift.TFrontendPingFrontendRequest; +import org.apache.doris.thrift.TFrontendPingFrontendResult; +import org.apache.doris.thrift.TFrontendPingFrontendStatusCode; import org.apache.doris.thrift.TGetDbsParams; import org.apache.doris.thrift.TGetDbsResult; import org.apache.doris.thrift.TGetTablesParams; @@ -959,6 +963,40 @@
public class FrontendServiceImpl implements FrontendService.Iface { return new TStatus(TStatusCode.CANCELLED); } + @Override + public TFrontendPingFrontendResult ping(TFrontendPingFrontendRequest request) throws TException { + boolean isReady = Catalog.getCurrentCatalog().isReady(); + TFrontendPingFrontendResult result = new TFrontendPingFrontendResult(); + result.setStatus(TFrontendPingFrontendStatusCode.OK); + if (isReady) { + if (request.getClusterId() != Catalog.getCurrentCatalog().getClusterId()) { + result.setStatus(TFrontendPingFrontendStatusCode.FAILED); + result.setMsg("invalid cluster id: " + Catalog.getCurrentCatalog().getClusterId()); + } + + if (result.getStatus() == TFrontendPingFrontendStatusCode.OK) { + if (!request.getToken().equals(Catalog.getCurrentCatalog().getToken())) { + result.setStatus(TFrontendPingFrontendStatusCode.FAILED); + result.setMsg("invalid token: " + Catalog.getCurrentCatalog().getToken()); + } + } + + if (result.status == TFrontendPingFrontendStatusCode.OK) { + // cluster id and token are valid, return replayed journal id + long replayedJournalId = Catalog.getCurrentCatalog().getReplayedJournalId(); + result.setMsg("success"); + result.setReplayedJournalId(replayedJournalId); + result.setQueryPort(Config.query_port); + result.setRpcPort(Config.rpc_port); + result.setVersion(Version.DORIS_BUILD_VERSION + "-" + Version.DORIS_BUILD_SHORT_HASH); + } + } else { + result.setStatus(TFrontendPingFrontendStatusCode.FAILED); + result.setMsg("not ready"); + } + return result; + } + private TNetworkAddress getClientAddr() { ThriftServerContext connectionContext = ThriftServerEventProcessor.getConnectionContext(); // For NonBlockingServer, we can not get client ip. 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java index cd5da57c8a..3b197b8d45 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java @@ -30,12 +30,16 @@ import org.apache.doris.http.rest.BootstrapFinishAction; import org.apache.doris.persist.HbPackage; import org.apache.doris.service.FrontendOptions; import org.apache.doris.system.HeartbeatResponse.HbStatus; +import org.apache.doris.thrift.FrontendService; import org.apache.doris.thrift.HeartbeatService; import org.apache.doris.thrift.TBackendInfo; import org.apache.doris.thrift.TBrokerOperationStatus; import org.apache.doris.thrift.TBrokerOperationStatusCode; import org.apache.doris.thrift.TBrokerPingBrokerRequest; import org.apache.doris.thrift.TBrokerVersion; +import org.apache.doris.thrift.TFrontendPingFrontendRequest; +import org.apache.doris.thrift.TFrontendPingFrontendResult; +import org.apache.doris.thrift.TFrontendPingFrontendStatusCode; import org.apache.doris.thrift.THeartbeatResult; import org.apache.doris.thrift.TMasterInfo; import org.apache.doris.thrift.TNetworkAddress; @@ -106,11 +110,7 @@ public class HeartbeatMgr extends MasterDaemon { // send frontend heartbeat List frontends = Catalog.getCurrentCatalog().getFrontends(null); - String masterFeNodeName = ""; for (Frontend frontend : frontends) { - if (frontend.getHost().equals(masterInfo.get().getNetworkAddress().getHostname())) { - masterFeNodeName = frontend.getNodeName(); - } FrontendHeartbeatHandler handler = new FrontendHeartbeatHandler(frontend, Catalog.getCurrentCatalog().getClusterId(), Catalog.getCurrentCatalog().getToken()); @@ -151,12 +151,6 @@ public class HeartbeatMgr extends MasterDaemon { } } // end for all results - // we also add a 'mocked' master Frontends heartbeat response to synchronize master info to other Frontends. 
- hbPackage.addHbResponse(new FrontendHbResponse(masterFeNodeName, - Config.query_port, Config.rpc_port, Catalog.getCurrentCatalog().getEditLog().getMaxJournalId(), - System.currentTimeMillis(), Version.DORIS_BUILD_VERSION + "-" + Version.DORIS_BUILD_SHORT_HASH)); - - // write edit log Catalog.getCurrentCatalog().getEditLog().logHeartbeat(hbPackage); } @@ -221,7 +215,7 @@ public class HeartbeatMgr extends MasterDaemon { TNetworkAddress beAddr = new TNetworkAddress(backend.getHost(), backend.getHeartbeatPort()); boolean ok = false; try { - client = ClientPool.heartbeatPool.borrowObject(beAddr); + client = ClientPool.backendHeartbeatPool.borrowObject(beAddr); TMasterInfo copiedMasterInfo = new TMasterInfo(masterInfo.get()); copiedMasterInfo.setBackendIp(backend.getHost()); @@ -256,9 +250,9 @@ public class HeartbeatMgr extends MasterDaemon { Strings.isNullOrEmpty(e.getMessage()) ? "got exception" : e.getMessage()); } finally { if (ok) { - ClientPool.heartbeatPool.returnObject(beAddr, client); + ClientPool.backendHeartbeatPool.returnObject(beAddr, client); } else { - ClientPool.heartbeatPool.invalidateObject(beAddr, client); + ClientPool.backendHeartbeatPool.invalidateObject(beAddr, client); } } } @@ -282,13 +276,20 @@ public class HeartbeatMgr extends MasterDaemon { // heartbeat to self if (Catalog.getCurrentCatalog().isReady()) { return new FrontendHbResponse(fe.getNodeName(), Config.query_port, Config.rpc_port, - Catalog.getCurrentCatalog().getReplayedJournalId(), System.currentTimeMillis(), + Catalog.getCurrentCatalog().getMaxJournalId(), System.currentTimeMillis(), Version.DORIS_BUILD_VERSION + "-" + Version.DORIS_BUILD_SHORT_HASH); } else { return new FrontendHbResponse(fe.getNodeName(), "not ready"); } } + if (Config.enable_fe_heartbeat_by_thrift) { + return getHeartbeatResponseByThrift(); + } else { + return getHeartbeatResponseByHttp(); + } + } + private HeartbeatResponse getHeartbeatResponseByHttp() { String url = "http://" + fe.getHost() + ":" + 
Config.http_port + "/api/bootstrap?cluster_id=" + clusterId + "&token=" + token; try { @@ -338,6 +339,35 @@ public class HeartbeatMgr extends MasterDaemon { Strings.isNullOrEmpty(e.getMessage()) ? "got exception" : e.getMessage()); } } + + private HeartbeatResponse getHeartbeatResponseByThrift() { + FrontendService.Client client = null; + TNetworkAddress addr = new TNetworkAddress(fe.getHost(), fe.getRpcPort()); + boolean ok = false; + try { + client = ClientPool.frontendHeartbeatPool.borrowObject(addr); + TFrontendPingFrontendRequest request = new TFrontendPingFrontendRequest(clusterId, token); + TFrontendPingFrontendResult result = client.ping(request); + ok = true; + if (result.getStatus() == TFrontendPingFrontendStatusCode.OK) { + return new FrontendHbResponse(fe.getNodeName(), result.getQueryPort(), + result.getRpcPort(), result.getReplayedJournalId(), + System.currentTimeMillis(), result.getVersion()); + + } else { + return new FrontendHbResponse(fe.getNodeName(), result.getMsg()); + } + } catch (Exception e) { + return new FrontendHbResponse(fe.getNodeName(), + Strings.isNullOrEmpty(e.getMessage()) ? 
"got exception" : e.getMessage()); + } finally { + if (ok) { + ClientPool.frontendHeartbeatPool.returnObject(addr, client); + } else { + ClientPool.frontendHeartbeatPool.invalidateObject(addr, client); + } + } + } } // broker heartbeat handler diff --git a/fe/fe-core/src/test/java/org/apache/doris/system/HeartbeatMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/system/HeartbeatMgrTest.java index bb2aaa010e..d1089d0500 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/system/HeartbeatMgrTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/system/HeartbeatMgrTest.java @@ -20,6 +20,7 @@ package org.apache.doris.system; import mockit.Expectations; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.FsBroker; +import org.apache.doris.common.Config; import org.apache.doris.common.GenericPool; import org.apache.doris.common.Pair; import org.apache.doris.common.util.Util; @@ -27,12 +28,17 @@ import org.apache.doris.ha.FrontendNodeType; import org.apache.doris.system.HeartbeatMgr.BrokerHeartbeatHandler; import org.apache.doris.system.HeartbeatMgr.FrontendHeartbeatHandler; import org.apache.doris.system.HeartbeatResponse.HbStatus; +import org.apache.doris.thrift.FrontendService; import org.apache.doris.thrift.TBrokerOperationStatus; import org.apache.doris.thrift.TBrokerOperationStatusCode; import org.apache.doris.thrift.TBrokerPingBrokerRequest; +import org.apache.doris.thrift.TFrontendPingFrontendRequest; +import org.apache.doris.thrift.TFrontendPingFrontendResult; +import org.apache.doris.thrift.TFrontendPingFrontendStatusCode; import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TPaloBrokerService; +import org.apache.thrift.TException; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -67,7 +73,7 @@ public class HeartbeatMgrTest { } @Test - public void testFrontendHbHandler() { + public void testFrontendHbHandlerWithHttp() { new MockUp() { @Mock public String getResultForUrl(String 
urlStr, String encodedAuthInfo, @@ -90,11 +96,11 @@ public class HeartbeatMgrTest { } } }; - + Config.enable_fe_heartbeat_by_thrift = false; + System.out.println(" config " + Config.enable_fe_heartbeat_by_thrift); Frontend fe = new Frontend(FrontendNodeType.FOLLOWER, "test", "192.168.1.1", 9010); FrontendHeartbeatHandler handler = new FrontendHeartbeatHandler(fe, 12345, "abcd"); HeartbeatResponse response = handler.call(); - Assert.assertTrue(response instanceof FrontendHbResponse); FrontendHbResponse hbResponse = (FrontendHbResponse) response; Assert.assertEquals(191224, hbResponse.getReplayedJournalId()); @@ -113,7 +119,78 @@ public class HeartbeatMgrTest { Assert.assertEquals(0, hbResponse.getQueryPort()); Assert.assertEquals(0, hbResponse.getRpcPort()); Assert.assertEquals(HbStatus.BAD, hbResponse.getStatus()); + } + @Test + public void testFrontendHbHandlerWithThirft(@Mocked FrontendService.Client client) throws TException { + new MockUp>() { + @Mock + public FrontendService.Client borrowObject(TNetworkAddress address) throws Exception { + return client; + } + + @Mock + public void returnObject(TNetworkAddress address, FrontendService.Client object) { + return; + } + + @Mock + public void invalidateObject(TNetworkAddress address, FrontendService.Client object) { + return; + } + }; + + TFrontendPingFrontendRequest normalRequest = new TFrontendPingFrontendRequest(12345, "abcd"); + TFrontendPingFrontendResult normalResult = new TFrontendPingFrontendResult(); + normalResult.setStatus(TFrontendPingFrontendStatusCode.OK); + normalResult.setMsg("success"); + normalResult.setReplayedJournalId(191224); + normalResult.setQueryPort(9131); + normalResult.setRpcPort(9121); + normalResult.setVersion("test"); + + TFrontendPingFrontendRequest badRequest = new TFrontendPingFrontendRequest(12345, "abcde"); + TFrontendPingFrontendResult badResult = new TFrontendPingFrontendResult(); + badResult.setStatus(TFrontendPingFrontendStatusCode.FAILED); + badResult.setMsg("not ready"); + 
+ new Expectations() { + { + client.ping(normalRequest); + minTimes = 0; + result = normalResult; + + client.ping(badRequest); + minTimes = 0; + result = badResult; + } + }; + + Config.enable_fe_heartbeat_by_thrift = true; + + Frontend fe = new Frontend(FrontendNodeType.FOLLOWER, "test", "192.168.1.1", 9010); + FrontendHeartbeatHandler handler = new FrontendHeartbeatHandler(fe, 12345, "abcd"); + HeartbeatResponse response = handler.call(); + + Assert.assertTrue(response instanceof FrontendHbResponse); + FrontendHbResponse hbResponse = (FrontendHbResponse) response; + Assert.assertEquals(191224, hbResponse.getReplayedJournalId()); + Assert.assertEquals(9131, hbResponse.getQueryPort()); + Assert.assertEquals(9121, hbResponse.getRpcPort()); + Assert.assertEquals(HbStatus.OK, hbResponse.getStatus()); + Assert.assertEquals("test", hbResponse.getVersion()); + + Frontend fe2 = new Frontend(FrontendNodeType.FOLLOWER, "test2", "192.168.1.2", 9010); + handler = new FrontendHeartbeatHandler(fe2, 12345, "abcde"); + response = handler.call(); + + Assert.assertTrue(response instanceof FrontendHbResponse); + hbResponse = (FrontendHbResponse) response; + Assert.assertEquals(0, hbResponse.getReplayedJournalId()); + Assert.assertEquals(0, hbResponse.getQueryPort()); + Assert.assertEquals(0, hbResponse.getRpcPort()); + Assert.assertEquals(HbStatus.BAD, hbResponse.getStatus()); + Assert.assertEquals("not ready", hbResponse.getMsg()); } @Test diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index 445d0279ee..35182f56be 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -657,6 +657,25 @@ struct TSnapshotLoaderReportRequest { 5: optional i32 total_num } +enum TFrontendPingFrontendStatusCode { + OK = 0, + FAILED = 1 +} + +struct TFrontendPingFrontendRequest { + 1: required i32 clusterId + 2: required string token +} + +struct TFrontendPingFrontendResult { + 1: required TFrontendPingFrontendStatusCode 
status + 2: required string msg + 3: required i32 queryPort + 4: required i32 rpcPort + 5: required i64 replayedJournalId + 6: required string version +} + service FrontendService { TGetDbsResult getDbNames(1:TGetDbsParams params) TGetTablesResult getTableNames(1:TGetTablesParams params) @@ -692,4 +711,6 @@ service FrontendService { TStreamLoadPutResult streamLoadPut(1: TStreamLoadPutRequest request) Status.TStatus snapshotLoaderReport(1: TSnapshotLoaderReportRequest request) -} + + TFrontendPingFrontendResult ping(1: TFrontendPingFrontendRequest request) +} \ No newline at end of file