Optimize something (#607)

1. Unify the thrift rpc timeout from BE to FE.
    Add a BE config 'thrift_rpc_timeout_ms', default is 5000
2. Add hostname in "show proc '/frontends';" stmt result.
3. Fix a lock order bug in Load.java
This commit is contained in:
Mingyu Chen
2019-01-31 13:30:45 +08:00
committed by lichaoyong
parent ca2ab5da59
commit af445b6cc2
15 changed files with 90 additions and 66 deletions

View File

@ -127,7 +127,7 @@ AgentStatus MasterServerClient::finish_task(
FrontendServiceConnection client(
_client_cache,
_master_info.network_address,
MASTER_CLIENT_TIMEOUT,
config::thrift_rpc_timeout_ms,
&client_status);
if (!client_status.ok()) {
@ -142,7 +142,7 @@ AgentStatus MasterServerClient::finish_task(
client->finishTask(*result, request);
} catch (TTransportException& e) {
OLAP_LOG_WARNING("master client, retry finishTask: %s", e.what());
client_status = client.reopen(MASTER_CLIENT_TIMEOUT);
client_status = client.reopen(config::thrift_rpc_timeout_ms);
if (!client_status.ok()) {
OLAP_LOG_WARNING("master client, get client from cache failed."
@ -172,7 +172,7 @@ AgentStatus MasterServerClient::report(const TReportRequest request, TMasterResu
FrontendServiceConnection client(
_client_cache,
_master_info.network_address,
MASTER_CLIENT_TIMEOUT,
config::thrift_rpc_timeout_ms,
&client_status);
if (!client_status.ok()) {
@ -193,7 +193,7 @@ AgentStatus MasterServerClient::report(const TReportRequest request, TMasterResu
// if not TIMED_OUT, retry
OLAP_LOG_WARNING("master client, retry report: %s", e.what());
client_status = client.reopen(MASTER_CLIENT_TIMEOUT);
client_status = client.reopen(config::thrift_rpc_timeout_ms);
if (!client_status.ok()) {
OLAP_LOG_WARNING("master client, get client from cache failed."
"host: %s, port: %d, code: %d",

View File

@ -34,8 +34,6 @@
namespace doris {
const uint32_t MASTER_CLIENT_TIMEOUT = 3000;
// client cache
// All service client should be defined in client_cache.h
//class MasterServiceClient;

View File

@ -386,6 +386,12 @@ namespace config {
// sync tablet_meta when modifing meta
CONF_Bool(sync_tablet_meta, "false");
// default thrift rpc timeout ms
CONF_Int32(thrift_rpc_timeout_ms, "5000");
// txn commit rpc timeout
CONF_Int32(txn_commit_rpc_timeout_ms, "10000");
} // namespace config
} // namespace doris

View File

@ -59,11 +59,21 @@ public:
static std::string extract_db_name(const std::string& full_name);
// for default timeout
static Status rpc(
const std::string& ip,
const int32_t port,
std::function<void (ClientConnection<FrontendServiceClient>&)> callback) {
return rpc(ip, port, callback, config::thrift_rpc_timeout_ms);
}
static Status rpc(
const std::string& ip,
const int32_t port,
std::function<void (ClientConnection<FrontendServiceClient>&)> callback,
int timeout_ms = 5000);
int timeout_ms);
private:
static ExecEnv* _s_exec_env;
};

View File

@ -94,7 +94,6 @@ const std::string LABEL_KEY = "label";
const std::string SUB_LABEL_KEY = "sub_label";
const std::string FILE_PATH_KEY = "file_path";
const char* k_100_continue = "100-continue";
const int64_t THRIFT_RPC_TIMEOUT_MS = 3000; // 3 sec
MiniLoadAction::MiniLoadAction(ExecEnv* exec_env) :
_exec_env(exec_env) {
@ -156,7 +155,7 @@ Status MiniLoadAction::_load(
const TNetworkAddress& master_address = _exec_env->master_info()->network_address;
Status status;
FrontendServiceConnection client(
_exec_env->frontend_client_cache(), master_address, THRIFT_RPC_TIMEOUT_MS, &status);
_exec_env->frontend_client_cache(), master_address, config::thrift_rpc_timeout_ms, &status);
if (!status.ok()) {
std::stringstream ss;
ss << "Connect master failed, with address("
@ -192,7 +191,7 @@ Status MiniLoadAction::_load(
LOG(WARNING) << "Retrying mini load from master("
<< master_address.hostname << ":" << master_address.port
<< ") because: " << e.what();
status = client.reopen(THRIFT_RPC_TIMEOUT_MS);
status = client.reopen(config::thrift_rpc_timeout_ms);
if (!status.ok()) {
LOG(WARNING) << "Client repoen failed. with address("
<< master_address.hostname << ":" << master_address.port << ")";
@ -204,7 +203,7 @@ Status MiniLoadAction::_load(
<< master_address.hostname << ":" << master_address.port
<< ") got unknown result: " << e.what();
status = client.reopen(THRIFT_RPC_TIMEOUT_MS);
status = client.reopen(config::thrift_rpc_timeout_ms);
if (!status.ok()) {
LOG(WARNING) << "Client repoen failed. with address("
<< master_address.hostname << ":" << master_address.port << ")";
@ -253,7 +252,7 @@ Status MiniLoadAction::check_auth(
const TNetworkAddress& master_address = _exec_env->master_info()->network_address;
Status status;
FrontendServiceConnection client(
_exec_env->frontend_client_cache(), master_address, THRIFT_RPC_TIMEOUT_MS, &status);
_exec_env->frontend_client_cache(), master_address, config::thrift_rpc_timeout_ms, &status);
if (!status.ok()) {
std::stringstream ss;
ss << "Connect master failed, with address("
@ -270,7 +269,7 @@ Status MiniLoadAction::check_auth(
LOG(WARNING) << "Retrying mini load from master("
<< master_address.hostname << ":" << master_address.port
<< ") because: " << e.what();
status = client.reopen(THRIFT_RPC_TIMEOUT_MS);
status = client.reopen(config::thrift_rpc_timeout_ms);
if (!status.ok()) {
LOG(WARNING) << "Client repoen failed. with address("
<< master_address.hostname << ":" << master_address.port << ")";
@ -282,7 +281,7 @@ Status MiniLoadAction::check_auth(
<< master_address.hostname << ":" << master_address.port
<< ") got unknown result: " << e.what();
status = client.reopen(THRIFT_RPC_TIMEOUT_MS);
status = client.reopen(config::thrift_rpc_timeout_ms);
if (!status.ok()) {
LOG(WARNING) << "Client repoen failed. with address("
<< master_address.hostname << ":" << master_address.port << ")";

View File

@ -300,7 +300,7 @@ Status StreamLoadAction::_handle(StreamLoadContext* ctx) {
master_addr.hostname, master_addr.port,
[&request, &result] (FrontendServiceConnection& client) {
client->loadTxnCommit(result, request);
}, 10000));
}, config::txn_commit_rpc_timeout_ms));
#else
result = k_stream_load_commit_result;
#endif

View File

@ -46,6 +46,10 @@ public class ShowFrontendsStmt extends ShowStmt {
public ShowResultSetMetaData getMetaData() {
ShowResultSetMetaData.Builder builder = ShowResultSetMetaData.builder();
for (String title : FrontendsProcNode.TITLE_NAMES) {
// hide hostname for SHOW FRONTENDS stmt
if (title.equals("HostName")) {
continue;
}
builder.addColumn(new Column(title, ScalarType.createVarchar(30)));
}
return builder.build();

View File

@ -4125,9 +4125,10 @@ public class Catalog {
DropInfo info = new DropInfo(db.getId(), table.getId(), -1L);
editLog.logDropTable(info);
Catalog.getCurrentColocateIndex().removeTable(table.getId());
ColocatePersistInfo colocateInfo = ColocatePersistInfo.CreateForRemoveTable(table.getId());
editLog.logColocateRemoveTable(colocateInfo);
if (Catalog.getCurrentColocateIndex().removeTable(table.getId())) {
ColocatePersistInfo colocateInfo = ColocatePersistInfo.CreateForRemoveTable(table.getId());
editLog.logColocateRemoveTable(colocateInfo);
}
} finally {
db.writeUnlock();
}

View File

@ -108,12 +108,12 @@ public class ColocateTableIndex implements Writable {
balancingGroups.remove(groupId);
}
public void removeTable(long tableId) {
public boolean removeTable(long tableId) {
long groupId;
readLock();
try {
if (!table2Group.containsKey(tableId)) {
return;
return false;
}
groupId = table2Group.get(tableId);
} finally {
@ -121,6 +121,7 @@ public class ColocateTableIndex implements Writable {
}
removeTableFromGroup(tableId, groupId);
return true;
}
private void removeTableFromGroup(long tableId, long groupId) {

View File

@ -25,6 +25,7 @@ import org.apache.doris.common.Pair;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.ListComparator;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;
@ -37,8 +38,6 @@ import com.google.common.collect.Lists;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
@ -114,14 +113,6 @@ public class BackendsProcDir implements ProcDirInterface {
continue;
}
String hostName = "N/A";
try {
InetAddress address = InetAddress.getByName(backend.getHost());
hostName = address.getHostName();
} catch (UnknownHostException e) {
continue;
}
watch.start();
Integer tabletNum = Catalog.getCurrentInvertedIndex().getTabletNumByBackendId(backendId);
watch.stop();
@ -130,7 +121,7 @@ public class BackendsProcDir implements ProcDirInterface {
backendInfo.add(backend.getOwnerClusterName());
backendInfo.add(backend.getHost());
if (Strings.isNullOrEmpty(clusterName)) {
backendInfo.add(hostName);
backendInfo.add(FrontendOptions.getHostnameByIp(backend.getHost()));
backendInfo.add(String.valueOf(backend.getHeartbeatPort()));
backendInfo.add(String.valueOf(backend.getBePort()));
backendInfo.add(String.valueOf(backend.getHttpPort()));

View File

@ -22,11 +22,15 @@ import org.apache.doris.catalog.Catalog;
import org.apache.doris.common.Config;
import org.apache.doris.common.Pair;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.system.Frontend;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
@ -36,12 +40,16 @@ import java.util.List;
* SHOW PROC /frontends/
*/
public class FrontendsProcNode implements ProcNodeInterface {
private static final Logger LOG = LogManager.getLogger(FrontendsProcNode.class);
public static final ImmutableList<String> TITLE_NAMES = new ImmutableList.Builder<String>()
.add("Name").add("Host").add("EditLogPort").add("HttpPort").add("QueryPort").add("RpcPort")
.add("Name").add("IP").add("HostName").add("EditLogPort").add("HttpPort").add("QueryPort").add("RpcPort")
.add("Role").add("IsMaster").add("ClusterId").add("Join").add("Alive")
.add("ReplayedJournalId").add("LastHeartbeat").add("IsHelper").add("ErrMsg")
.build();
public static final int HOSTNAME_INDEX = 2;
private Catalog catalog;
public FrontendsProcNode(Catalog catalog) {
@ -80,6 +88,8 @@ public class FrontendsProcNode implements ProcNodeInterface {
List<String> info = new ArrayList<String>();
info.add(fe.getNodeName());
info.add(fe.getHost());
info.add(FrontendOptions.getHostnameByIp(fe.getHost()));
info.add(Integer.toString(fe.getEditLogPort()));
info.add(Integer.toString(Config.http_port));

View File

@ -39,7 +39,7 @@ public class MetricsAction extends RestBaseAction {
@Override
public void execute(BaseRequest request, BaseResponse response) {
response.setContentType("text/plain");
response.getContent().append(MetricRepo.getMetric(new PrometheusMetricVisitor("palo_fe")));
response.getContent().append(MetricRepo.getMetric(new PrometheusMetricVisitor("doris_fe")));
sendResult(request, response);
}
}

View File

@ -424,7 +424,7 @@ public class Load {
if (tbl != null && tbl.getType() == TableType.OLAP
&& ((OlapTable) tbl).getState() == OlapTableState.RESTORE) {
throw new DdlException("Table " + tbl.getName() + " is in restore process. "
+ "Can not load into it");
+ "Can not load into it");
}
}
} finally {
@ -1015,20 +1015,6 @@ public class Load {
}
}
public boolean isLabelUsed(String fullDbName, String label, long timestamp) throws DdlException {
Database db = Catalog.getInstance().getDb(fullDbName);
if (db == null) {
throw new DdlException("Db does not exist. name: " + fullDbName);
}
readLock();
try {
return isLabelUsed(db.getId(), label, timestamp, true);
} finally {
readUnlock();
}
}
/*
* 1. if label is already used, and this is not a retry request,
* throw exception ("Label already used")
@ -1552,8 +1538,8 @@ public class Load {
public long getLatestJobIdByLabel(long dbId, String labelValue) {
LoadJob job = null;
long jobId = 0;
readLock();
try {
readLock();
List<LoadJob> loadJobs = this.dbToLoadJobs.get(dbId);
if (loadJobs == null) {
return 0;
@ -1585,21 +1571,22 @@ public class Load {
public List<List<Comparable>> getLoadJobUnfinishedInfo(long jobId) {
LinkedList<List<Comparable>> infos = new LinkedList<List<Comparable>>();
TabletInvertedIndex invertedIndex = Catalog.getCurrentInvertedIndex();
readLock();
LoadJob loadJob = getLoadJob(jobId);
if (loadJob == null
|| (loadJob.getState() != JobState.LOADING && loadJob.getState() != JobState.QUORUM_FINISHED)) {
return infos;
}
long dbId = loadJob.getDbId();
Database db = Catalog.getInstance().getDb(dbId);
if (db == null) {
return infos;
}
db.readLock();
try {
LoadJob loadJob = getLoadJob(jobId);
if (loadJob == null
|| (loadJob.getState() != JobState.LOADING && loadJob.getState() != JobState.QUORUM_FINISHED)) {
return infos;
}
long dbId = loadJob.getDbId();
Database db = Catalog.getInstance().getDb(dbId);
if (db == null) {
return infos;
}
db.readLock();
readLock();
try {
Map<Long, TabletLoadInfo> tabletMap = loadJob.getIdToTabletLoadInfo();
for (long tabletId : tabletMap.keySet()) {
@ -1657,11 +1644,10 @@ public class Load {
} // end for tablet
} finally {
db.readUnlock();
readUnlock();
}
} finally {
readUnlock();
db.readUnlock();
}
// sort by version, backendId

View File

@ -1076,6 +1076,11 @@ public class ShowExecutor {
final ShowFrontendsStmt showStmt = (ShowFrontendsStmt) stmt;
List<List<String>> infos = Lists.newArrayList();
FrontendsProcNode.getFrontendsInfo(Catalog.getCurrentCatalog(), infos);
for (List<String> row : infos) {
row.remove(FrontendsProcNode.HOSTNAME_INDEX);
}
resultSet = new ShowResultSet(showStmt.getMetaData(), infos);
}

View File

@ -100,6 +100,18 @@ public class FrontendOptions {
return localAddr.getHostName();
}
public static String getHostnameByIp(String ip) {
String hostName = "N/A";
try {
InetAddress address = InetAddress.getByName(ip);
hostName = address.getHostName();
} catch (UnknownHostException e) {
LOG.info("unknow host for {}", ip, e);
hostName = "unknown";
}
return hostName;
}
private static void analyzePriorityCidrs() {
String prior_cidrs = Config.priority_networks;
if (Strings.isNullOrEmpty(prior_cidrs)) {
@ -122,5 +134,6 @@ public class FrontendOptions {
}
return false;
}
}