diff --git a/be/src/agent/utils.cpp b/be/src/agent/utils.cpp index 99d0f5d094..1d045b7189 100644 --- a/be/src/agent/utils.cpp +++ b/be/src/agent/utils.cpp @@ -127,7 +127,7 @@ AgentStatus MasterServerClient::finish_task( FrontendServiceConnection client( _client_cache, _master_info.network_address, - MASTER_CLIENT_TIMEOUT, + config::thrift_rpc_timeout_ms, &client_status); if (!client_status.ok()) { @@ -142,7 +142,7 @@ AgentStatus MasterServerClient::finish_task( client->finishTask(*result, request); } catch (TTransportException& e) { OLAP_LOG_WARNING("master client, retry finishTask: %s", e.what()); - client_status = client.reopen(MASTER_CLIENT_TIMEOUT); + client_status = client.reopen(config::thrift_rpc_timeout_ms); if (!client_status.ok()) { OLAP_LOG_WARNING("master client, get client from cache failed." @@ -172,7 +172,7 @@ AgentStatus MasterServerClient::report(const TReportRequest request, TMasterResu FrontendServiceConnection client( _client_cache, _master_info.network_address, - MASTER_CLIENT_TIMEOUT, + config::thrift_rpc_timeout_ms, &client_status); if (!client_status.ok()) { @@ -193,7 +193,7 @@ AgentStatus MasterServerClient::report(const TReportRequest request, TMasterResu // if not TIMED_OUT, retry OLAP_LOG_WARNING("master client, retry report: %s", e.what()); - client_status = client.reopen(MASTER_CLIENT_TIMEOUT); + client_status = client.reopen(config::thrift_rpc_timeout_ms); if (!client_status.ok()) { OLAP_LOG_WARNING("master client, get client from cache failed." "host: %s, port: %d, code: %d", diff --git a/be/src/agent/utils.h b/be/src/agent/utils.h index db3d0a3407..63f0507757 100644 --- a/be/src/agent/utils.h +++ b/be/src/agent/utils.h @@ -34,8 +34,6 @@ namespace doris { -const uint32_t MASTER_CLIENT_TIMEOUT = 3000; - // client cache // All service client should be defined in client_cache.h //class MasterServiceClient; diff --git a/be/src/common/config.h b/be/src/common/config.h index fec970752f..a2dc8a16dc 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -386,6 +386,12 @@ namespace config { // sync tablet_meta when modifing meta CONF_Bool(sync_tablet_meta, "false"); + + // default thrift rpc timeout ms + CONF_Int32(thrift_rpc_timeout_ms, "5000"); + + // txn commit rpc timeout + CONF_Int32(txn_commit_rpc_timeout_ms, "10000"); } // namespace config } // namespace doris diff --git a/be/src/exec/schema_scanner/frontend_helper.h b/be/src/exec/schema_scanner/frontend_helper.h index 3f86bde2e8..b13e92f26b 100644 --- a/be/src/exec/schema_scanner/frontend_helper.h +++ b/be/src/exec/schema_scanner/frontend_helper.h @@ -59,11 +59,21 @@ public: static std::string extract_db_name(const std::string& full_name); + // for default timeout + static Status rpc( + const std::string& ip, + const int32_t port, + std::function&)> callback) { + + return rpc(ip, port, callback, config::thrift_rpc_timeout_ms); + } + static Status rpc( const std::string& ip, const int32_t port, std::function&)> callback, - int timeout_ms = 5000); + int timeout_ms); + private: static ExecEnv* _s_exec_env; }; diff --git a/be/src/http/action/mini_load.cpp b/be/src/http/action/mini_load.cpp index ab5ccbe594..5bf151e6a7 100644 --- a/be/src/http/action/mini_load.cpp +++ b/be/src/http/action/mini_load.cpp @@ -94,7 +94,6 @@ const std::string LABEL_KEY = "label"; const std::string SUB_LABEL_KEY = "sub_label"; const std::string FILE_PATH_KEY = "file_path"; const char* k_100_continue = "100-continue"; -const int64_t THRIFT_RPC_TIMEOUT_MS = 3000; // 3 sec MiniLoadAction::MiniLoadAction(ExecEnv* exec_env) : _exec_env(exec_env) { @@ -156,7 +155,7 @@ Status MiniLoadAction::_load( const TNetworkAddress& master_address = _exec_env->master_info()->network_address; Status status; FrontendServiceConnection client( - _exec_env->frontend_client_cache(), master_address, THRIFT_RPC_TIMEOUT_MS, &status); + _exec_env->frontend_client_cache(), master_address, config::thrift_rpc_timeout_ms, &status); if (!status.ok()) { std::stringstream ss; ss << "Connect master failed, with address(" @@ -192,7 +191,7 @@ Status MiniLoadAction::_load( LOG(WARNING) << "Retrying mini load from master(" << master_address.hostname << ":" << master_address.port << ") because: " << e.what(); - status = client.reopen(THRIFT_RPC_TIMEOUT_MS); + status = client.reopen(config::thrift_rpc_timeout_ms); if (!status.ok()) { LOG(WARNING) << "Client repoen failed. with address(" << master_address.hostname << ":" << master_address.port << ")"; @@ -204,7 +203,7 @@ Status MiniLoadAction::_load( << master_address.hostname << ":" << master_address.port << ") got unknown result: " << e.what(); - status = client.reopen(THRIFT_RPC_TIMEOUT_MS); + status = client.reopen(config::thrift_rpc_timeout_ms); if (!status.ok()) { LOG(WARNING) << "Client repoen failed. with address(" << master_address.hostname << ":" << master_address.port << ")"; @@ -253,7 +252,7 @@ Status MiniLoadAction::check_auth( const TNetworkAddress& master_address = _exec_env->master_info()->network_address; Status status; FrontendServiceConnection client( - _exec_env->frontend_client_cache(), master_address, THRIFT_RPC_TIMEOUT_MS, &status); + _exec_env->frontend_client_cache(), master_address, config::thrift_rpc_timeout_ms, &status); if (!status.ok()) { std::stringstream ss; ss << "Connect master failed, with address(" @@ -270,7 +269,7 @@ Status MiniLoadAction::check_auth( LOG(WARNING) << "Retrying mini load from master(" << master_address.hostname << ":" << master_address.port << ") because: " << e.what(); - status = client.reopen(THRIFT_RPC_TIMEOUT_MS); + status = client.reopen(config::thrift_rpc_timeout_ms); if (!status.ok()) { LOG(WARNING) << "Client repoen failed. with address(" << master_address.hostname << ":" << master_address.port << ")"; @@ -282,7 +281,7 @@ Status MiniLoadAction::check_auth( << master_address.hostname << ":" << master_address.port << ") got unknown result: " << e.what(); - status = client.reopen(THRIFT_RPC_TIMEOUT_MS); + status = client.reopen(config::thrift_rpc_timeout_ms); if (!status.ok()) { LOG(WARNING) << "Client repoen failed. with address(" << master_address.hostname << ":" << master_address.port << ")"; diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp index f99c4cff7a..d43d085ad9 100644 --- a/be/src/http/action/stream_load.cpp +++ b/be/src/http/action/stream_load.cpp @@ -300,7 +300,7 @@ Status StreamLoadAction::_handle(StreamLoadContext* ctx) { master_addr.hostname, master_addr.port, [&request, &result] (FrontendServiceConnection& client) { client->loadTxnCommit(result, request); - }, 10000)); + }, config::txn_commit_rpc_timeout_ms)); #else result = k_stream_load_commit_result; #endif diff --git a/fe/src/main/java/org/apache/doris/analysis/ShowFrontendsStmt.java b/fe/src/main/java/org/apache/doris/analysis/ShowFrontendsStmt.java index 52c84fd5e5..38af38de62 100644 --- a/fe/src/main/java/org/apache/doris/analysis/ShowFrontendsStmt.java +++ b/fe/src/main/java/org/apache/doris/analysis/ShowFrontendsStmt.java @@ -46,6 +46,10 @@ public class ShowFrontendsStmt extends ShowStmt { public ShowResultSetMetaData getMetaData() { ShowResultSetMetaData.Builder builder = ShowResultSetMetaData.builder(); for (String title : FrontendsProcNode.TITLE_NAMES) { + // hide hostname for SHOW FRONTENDS stmt + if (title.equals("HostName")) { + continue; + } builder.addColumn(new Column(title, ScalarType.createVarchar(30))); } return builder.build(); diff --git a/fe/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/src/main/java/org/apache/doris/catalog/Catalog.java index 4b53877cd1..61a2502569 100644 --- a/fe/src/main/java/org/apache/doris/catalog/Catalog.java +++ b/fe/src/main/java/org/apache/doris/catalog/Catalog.java @@ -4125,9 +4125,10 @@ public class Catalog { DropInfo info = new DropInfo(db.getId(), table.getId(), -1L); editLog.logDropTable(info); - Catalog.getCurrentColocateIndex().removeTable(table.getId()); - ColocatePersistInfo colocateInfo = ColocatePersistInfo.CreateForRemoveTable(table.getId()); - editLog.logColocateRemoveTable(colocateInfo); + if (Catalog.getCurrentColocateIndex().removeTable(table.getId())) { + ColocatePersistInfo colocateInfo = ColocatePersistInfo.CreateForRemoveTable(table.getId()); + editLog.logColocateRemoveTable(colocateInfo); + } } finally { db.writeUnlock(); } diff --git a/fe/src/main/java/org/apache/doris/catalog/ColocateTableIndex.java b/fe/src/main/java/org/apache/doris/catalog/ColocateTableIndex.java index faf18c9b3e..f91dfa477e 100644 --- a/fe/src/main/java/org/apache/doris/catalog/ColocateTableIndex.java +++ b/fe/src/main/java/org/apache/doris/catalog/ColocateTableIndex.java @@ -108,12 +108,12 @@ public class ColocateTableIndex implements Writable { balancingGroups.remove(groupId); } - public void removeTable(long tableId) { + public boolean removeTable(long tableId) { long groupId; readLock(); try { if (!table2Group.containsKey(tableId)) { - return; + return false; } groupId = table2Group.get(tableId); } finally { @@ -121,6 +121,7 @@ public class ColocateTableIndex implements Writable { } removeTableFromGroup(tableId, groupId); + return true; } private void removeTableFromGroup(long tableId, long groupId) { diff --git a/fe/src/main/java/org/apache/doris/common/proc/BackendsProcDir.java b/fe/src/main/java/org/apache/doris/common/proc/BackendsProcDir.java index feeb912d6f..6d0ace0e2a 100644 --- a/fe/src/main/java/org/apache/doris/common/proc/BackendsProcDir.java +++ b/fe/src/main/java/org/apache/doris/common/proc/BackendsProcDir.java @@ -25,6 +25,7 @@ import org.apache.doris.common.Pair; import org.apache.doris.common.util.DebugUtil; import org.apache.doris.common.util.ListComparator; import org.apache.doris.common.util.TimeUtils; +import org.apache.doris.service.FrontendOptions; import org.apache.doris.system.Backend; import org.apache.doris.system.SystemInfoService; @@ -37,8 +38,6 @@ import com.google.common.collect.Lists; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import java.net.InetAddress; -import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Collections; import java.util.LinkedList; @@ -114,14 +113,6 @@ public class BackendsProcDir implements ProcDirInterface { continue; } - String hostName = "N/A"; - try { - InetAddress address = InetAddress.getByName(backend.getHost()); - hostName = address.getHostName(); - } catch (UnknownHostException e) { - continue; - } - watch.start(); Integer tabletNum = Catalog.getCurrentInvertedIndex().getTabletNumByBackendId(backendId); watch.stop(); @@ -130,7 +121,7 @@ public class BackendsProcDir implements ProcDirInterface { backendInfo.add(backend.getOwnerClusterName()); backendInfo.add(backend.getHost()); if (Strings.isNullOrEmpty(clusterName)) { - backendInfo.add(hostName); + backendInfo.add(FrontendOptions.getHostnameByIp(backend.getHost())); backendInfo.add(String.valueOf(backend.getHeartbeatPort())); backendInfo.add(String.valueOf(backend.getBePort())); backendInfo.add(String.valueOf(backend.getHttpPort())); diff --git a/fe/src/main/java/org/apache/doris/common/proc/FrontendsProcNode.java b/fe/src/main/java/org/apache/doris/common/proc/FrontendsProcNode.java index b2043427c2..fe42982b2e 100644 --- a/fe/src/main/java/org/apache/doris/common/proc/FrontendsProcNode.java +++ b/fe/src/main/java/org/apache/doris/common/proc/FrontendsProcNode.java @@ -22,11 +22,15 @@ import org.apache.doris.catalog.Catalog; import org.apache.doris.common.Config; import org.apache.doris.common.Pair; import org.apache.doris.common.util.TimeUtils; +import org.apache.doris.service.FrontendOptions; import org.apache.doris.system.Frontend; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.List; @@ -36,12 +40,16 @@ import java.util.List; * SHOW PROC /frontends/ */ public class FrontendsProcNode implements ProcNodeInterface { + private static final Logger LOG = LogManager.getLogger(FrontendsProcNode.class); + public static final ImmutableList TITLE_NAMES = new ImmutableList.Builder() - .add("Name").add("Host").add("EditLogPort").add("HttpPort").add("QueryPort").add("RpcPort") + .add("Name").add("IP").add("HostName").add("EditLogPort").add("HttpPort").add("QueryPort").add("RpcPort") .add("Role").add("IsMaster").add("ClusterId").add("Join").add("Alive") .add("ReplayedJournalId").add("LastHeartbeat").add("IsHelper").add("ErrMsg") .build(); + public static final int HOSTNAME_INDEX = 2; + private Catalog catalog; public FrontendsProcNode(Catalog catalog) { @@ -80,6 +88,8 @@ public class FrontendsProcNode implements ProcNodeInterface { List info = new ArrayList(); info.add(fe.getNodeName()); info.add(fe.getHost()); + + info.add(FrontendOptions.getHostnameByIp(fe.getHost())); info.add(Integer.toString(fe.getEditLogPort())); info.add(Integer.toString(Config.http_port)); diff --git a/fe/src/main/java/org/apache/doris/http/rest/MetricsAction.java b/fe/src/main/java/org/apache/doris/http/rest/MetricsAction.java index 5f1219fe3c..35c8bfeb54 100644 --- a/fe/src/main/java/org/apache/doris/http/rest/MetricsAction.java +++ b/fe/src/main/java/org/apache/doris/http/rest/MetricsAction.java @@ -39,7 +39,7 @@ public class MetricsAction extends RestBaseAction { @Override public void execute(BaseRequest request, BaseResponse response) { response.setContentType("text/plain"); - response.getContent().append(MetricRepo.getMetric(new PrometheusMetricVisitor("palo_fe"))); + response.getContent().append(MetricRepo.getMetric(new PrometheusMetricVisitor("doris_fe"))); sendResult(request, response); } } diff --git a/fe/src/main/java/org/apache/doris/load/Load.java b/fe/src/main/java/org/apache/doris/load/Load.java index 00688ed415..2187e61e99 100644 --- a/fe/src/main/java/org/apache/doris/load/Load.java +++ b/fe/src/main/java/org/apache/doris/load/Load.java @@ -424,7 +424,7 @@ public class Load { if (tbl != null && tbl.getType() == TableType.OLAP && ((OlapTable) tbl).getState() == OlapTableState.RESTORE) { throw new DdlException("Table " + tbl.getName() + " is in restore process. " - + "Can not load into it"); + + "Can not load into it"); } } } finally { @@ -1015,20 +1015,6 @@ public class Load { } } - public boolean isLabelUsed(String fullDbName, String label, long timestamp) throws DdlException { - Database db = Catalog.getInstance().getDb(fullDbName); - if (db == null) { - throw new DdlException("Db does not exist. name: " + fullDbName); - } - - readLock(); - try { - return isLabelUsed(db.getId(), label, timestamp, true); - } finally { - readUnlock(); - } - } - /* * 1. if label is already used, and this is not a retry request, * throw exception ("Label already used") @@ -1552,8 +1538,8 @@ public class Load { public long getLatestJobIdByLabel(long dbId, String labelValue) { LoadJob job = null; long jobId = 0; + readLock(); try { - readLock(); List loadJobs = this.dbToLoadJobs.get(dbId); if (loadJobs == null) { return 0; @@ -1585,21 +1571,22 @@ public class Load { public List> getLoadJobUnfinishedInfo(long jobId) { LinkedList> infos = new LinkedList>(); TabletInvertedIndex invertedIndex = Catalog.getCurrentInvertedIndex(); - readLock(); + + LoadJob loadJob = getLoadJob(jobId); + if (loadJob == null + || (loadJob.getState() != JobState.LOADING && loadJob.getState() != JobState.QUORUM_FINISHED)) { + return infos; + } + + long dbId = loadJob.getDbId(); + Database db = Catalog.getInstance().getDb(dbId); + if (db == null) { + return infos; + } + + db.readLock(); try { - LoadJob loadJob = getLoadJob(jobId); - if (loadJob == null - || (loadJob.getState() != JobState.LOADING && loadJob.getState() != JobState.QUORUM_FINISHED)) { - return infos; - } - - long dbId = loadJob.getDbId(); - Database db = Catalog.getInstance().getDb(dbId); - if (db == null) { - return infos; - } - - db.readLock(); + readLock(); try { Map tabletMap = loadJob.getIdToTabletLoadInfo(); for (long tabletId : tabletMap.keySet()) { @@ -1657,11 +1644,10 @@ public class Load { } // end for tablet } finally { - db.readUnlock(); + readUnlock(); } - } finally { - readUnlock(); + db.readUnlock(); } // sort by version, backendId diff --git a/fe/src/main/java/org/apache/doris/qe/ShowExecutor.java b/fe/src/main/java/org/apache/doris/qe/ShowExecutor.java index 3486de7fef..162a17c627 100644 --- a/fe/src/main/java/org/apache/doris/qe/ShowExecutor.java +++ b/fe/src/main/java/org/apache/doris/qe/ShowExecutor.java @@ -1076,6 +1076,11 @@ public class ShowExecutor { final ShowFrontendsStmt showStmt = (ShowFrontendsStmt) stmt; List> infos = Lists.newArrayList(); FrontendsProcNode.getFrontendsInfo(Catalog.getCurrentCatalog(), infos); + + for (List row : infos) { + row.remove(FrontendsProcNode.HOSTNAME_INDEX); + } + resultSet = new ShowResultSet(showStmt.getMetaData(), infos); } diff --git a/fe/src/main/java/org/apache/doris/service/FrontendOptions.java b/fe/src/main/java/org/apache/doris/service/FrontendOptions.java index bcdff8aa5e..1febf5bbc5 100644 --- a/fe/src/main/java/org/apache/doris/service/FrontendOptions.java +++ b/fe/src/main/java/org/apache/doris/service/FrontendOptions.java @@ -100,6 +100,18 @@ public class FrontendOptions { return localAddr.getHostName(); } + public static String getHostnameByIp(String ip) { + String hostName = "N/A"; + try { + InetAddress address = InetAddress.getByName(ip); + hostName = address.getHostName(); + } catch (UnknownHostException e) { + LOG.info("unknow host for {}", ip, e); + hostName = "unknown"; + } + return hostName; + } + private static void analyzePriorityCidrs() { String prior_cidrs = Config.priority_networks; if (Strings.isNullOrEmpty(prior_cidrs)) { @@ -122,5 +134,6 @@ public class FrontendOptions { } return false; } + }