[Fix] Fix missing audit log for insert-select when connected to a follower FE (#36481)

## Proposed changes

pick #36472
This commit is contained in:
wangbo
2024-06-20 15:16:16 +08:00
committed by GitHub
parent c5bb0e3a21
commit 88e02c836d
9 changed files with 128 additions and 85 deletions

View File

@ -289,6 +289,7 @@ public class StreamLoadPlanner {
params.setDescTbl(analyzer.getDescTbl().toThrift());
params.setCoord(new TNetworkAddress(FrontendOptions.getLocalHostAddress(), Config.rpc_port));
params.setCurrentConnectFe(new TNetworkAddress(FrontendOptions.getLocalHostAddress(), Config.rpc_port));
TPlanFragmentExecParams execParams = new TPlanFragmentExecParams();
// user load id (streamLoadTask.id) as query id
@ -521,6 +522,7 @@ public class StreamLoadPlanner {
pipParams.setDescTbl(analyzer.getDescTbl().toThrift());
pipParams.setCoord(new TNetworkAddress(FrontendOptions.getLocalHostAddress(), Config.rpc_port));
pipParams.setCurrentConnectFe(new TNetworkAddress(FrontendOptions.getLocalHostAddress(), Config.rpc_port));
pipParams.setQueryId(loadId);
pipParams.per_exch_num_senders = Maps.newHashMap();
pipParams.destinations = Lists.newArrayList();

View File

@ -233,6 +233,10 @@ public class ConnectContext {
private Exec exec;
private boolean runProcedure = false;
// isProxy marks a context that serves a request forwarded from another FE.
// Such a context is only used in one thread, so the flag is thread-safe by default.
private boolean isProxy = false;
public void setUserQueryTimeout(int queryTimeout) {
if (queryTimeout > 0) {
sessionVariable.setQueryTimeoutS(queryTimeout);
@ -355,6 +359,7 @@ public class ConnectContext {
mysqlChannel = new MysqlChannel(connection, this);
} else if (isProxy) {
mysqlChannel = new ProxyMysqlChannel();
this.isProxy = isProxy;
} else {
mysqlChannel = new DummyMysqlChannel();
}
@ -1146,4 +1151,8 @@ public class ConnectContext {
/**
 * Replaces this context's {@code userVars} map with the given map.
 * The reference is stored as-is; no copy is made.
 *
 * @param userVars the map to store in this context
 */
public void setUserVars(Map<String, LiteralExpr> userVars) {
this.userVars = userVars;
}
/**
 * Returns the {@code isProxy} flag of this context.
 * The flag is set when the context is constructed in proxy mode, which is used
 * for a request forwarded from another FE.
 *
 * @return the current value of the {@code isProxy} field
 */
public boolean isProxy() {
return isProxy;
}
}

View File

@ -131,6 +131,7 @@ import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.protobuf.ByteString;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.ImmutableTriple;
import org.apache.commons.lang3.tuple.Triple;
import org.apache.logging.log4j.LogManager;
@ -197,6 +198,10 @@ public class Coordinator implements CoordInterface {
private final TQueryGlobals queryGlobals = new TQueryGlobals();
private TQueryOptions queryOptions;
private TNetworkAddress coordAddress;
// The audit log is written by the FE that the client is connected to. If a query is forwarded,
// we send the address of the connected FE to the BE,
// so that the BE reports query statistics to the connected FE.
private TNetworkAddress currentConnectFE;
// protects all fields below
private final Lock lock = new ReentrantLock();
@ -551,6 +556,13 @@ public class Coordinator implements CoordInterface {
}
coordAddress = new TNetworkAddress(localIP, Config.rpc_port);
if (ConnectContext.get() != null && ConnectContext.get().isProxy() && !StringUtils.isEmpty(
ConnectContext.get().getCurrentConnectedFEIp())) {
currentConnectFE = new TNetworkAddress(ConnectContext.get().getCurrentConnectedFEIp(),
Config.rpc_port);
} else {
currentConnectFE = coordAddress;
}
this.idToBackend = Env.getCurrentSystemInfo().getIdToBackend();
if (LOG.isDebugEnabled()) {
@ -3803,6 +3815,7 @@ public class Coordinator implements CoordInterface {
params.params.setSenderId(i);
params.params.setNumSenders(instanceExecParams.size());
params.setCoord(coordAddress);
params.setCurrentConnectFe(currentConnectFE);
params.setBackendNum(backendNum++);
params.setQueryGlobals(queryGlobals);
params.setQueryOptions(queryOptions);
@ -3908,6 +3921,7 @@ public class Coordinator implements CoordInterface {
params.setDestinations(destinations);
params.setNumSenders(instanceExecParams.size());
params.setCoord(coordAddress);
params.setCurrentConnectFe(currentConnectFE);
params.setQueryGlobals(queryGlobals);
params.setQueryOptions(queryOptions);
params.query_options.setEnablePipelineEngine(true);

View File

@ -19,6 +19,7 @@ package org.apache.doris.resource.workloadschedpolicy;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.Config;
import org.apache.doris.common.Pair;
import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.plugin.audit.AuditEvent;
import org.apache.doris.thrift.TQueryStatistics;
@ -30,22 +31,36 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;
// NOTE: no lock is used when updating beToQueryStatsMap, because that would need
// a global lock shared by all BEs, which we want to avoid.
// In some corner cases this may lose a statistics update, for example:
// time1: the clear logic decides that query 1 has timed out
// time2: query 1 is updated by a report
// time3: the clear logic removes query 1
// Losing query stats in this case is acceptable: the query report timeout is 60s by default,
// so when this happens we should first find out why the BE has not reported for so long.
public class WorkloadRuntimeStatusMgr extends MasterDaemon {
private static final Logger LOG = LogManager.getLogger(WorkloadRuntimeStatusMgr.class);
private Map<Long, Map<String, TQueryStatistics>> beToQueryStatsMap = Maps.newConcurrentMap();
private Map<Long, Long> beLastReportTime = Maps.newConcurrentMap();
private Map<String, Long> queryLastReportTime = Maps.newConcurrentMap();
private Map<Long, BeReportInfo> beToQueryStatsMap = Maps.newConcurrentMap();
private final ReentrantReadWriteLock queryAuditEventLock = new ReentrantReadWriteLock();
private List<AuditEvent> queryAuditEventList = Lists.newLinkedList();
private class BeReportInfo {
volatile long beLastReportTime;
BeReportInfo(long beLastReportTime) {
this.beLastReportTime = beLastReportTime;
}
Map<String, Pair<Long, TQueryStatistics>> queryStatsMap = Maps.newConcurrentMap();
}
public WorkloadRuntimeStatusMgr() {
super("workload-runtime-stats-thread", Config.workload_runtime_status_thread_interval_ms);
}
@ -116,45 +131,65 @@ public class WorkloadRuntimeStatusMgr extends MasterDaemon {
return;
}
long beId = params.backend_id;
Map<String, TQueryStatistics> queryIdMap = beToQueryStatsMap.get(beId);
beLastReportTime.put(beId, System.currentTimeMillis());
if (queryIdMap == null) {
queryIdMap = Maps.newConcurrentMap();
queryIdMap.putAll(params.query_statistics_map);
beToQueryStatsMap.put(beId, queryIdMap);
// NOTE(wb): a single BE sends its update requests one at a time,
// so no global lock is needed for beToQueryStatsMap here;
// it is enough that each put/remove/get for one BE is an atomic operation.
long currentTime = System.currentTimeMillis();
BeReportInfo beReportInfo = beToQueryStatsMap.get(beId);
if (beReportInfo == null) {
beReportInfo = new BeReportInfo(currentTime);
beToQueryStatsMap.put(beId, beReportInfo);
} else {
long currentTime = System.currentTimeMillis();
for (Map.Entry<String, TQueryStatistics> entry : params.query_statistics_map.entrySet()) {
queryIdMap.put(entry.getKey(), entry.getValue());
queryLastReportTime.put(entry.getKey(), currentTime);
beReportInfo.beLastReportTime = currentTime;
}
for (Map.Entry<String, TQueryStatistics> entry : params.query_statistics_map.entrySet()) {
beReportInfo.queryStatsMap.put(entry.getKey(), Pair.of(currentTime, (TQueryStatistics) entry.getValue()));
}
}
/**
 * Drops statistics that have not been refreshed within
 * {@code Config.be_report_query_statistics_timeout_ms}.
 *
 * <p>A BE whose latest report is older than the timeout is removed together with
 * all of its query entries. For the remaining BEs, each query entry whose own
 * report time is older than the timeout is removed.
 */
void clearReportTimeoutBeStatistics() {
    // Primitive long: avoids unboxing a Long on every comparison.
    final long currentTime = System.currentTimeMillis();
    final long timeoutMs = Config.be_report_query_statistics_timeout_ms;

    // Iterate entries instead of keySet() + get(): one lookup per element, and the
    // value can never be null even if the mapping is removed while we iterate.
    for (Map.Entry<Long, BeReportInfo> beEntry : beToQueryStatsMap.entrySet()) {
        BeReportInfo beReportInfo = beEntry.getValue();

        // 1 clear report timeout be
        if (currentTime - beReportInfo.beLastReportTime > timeoutMs) {
            beToQueryStatsMap.remove(beEntry.getKey());
            continue;
        }

        // 2 clear report timeout query of a be that is still alive
        for (Map.Entry<String, Pair<Long, TQueryStatistics>> queryEntry
                : beReportInfo.queryStatsMap.entrySet()) {
            long queryLastReportTime = queryEntry.getValue().first;
            if (currentTime - queryLastReportTime > timeoutMs) {
                beReportInfo.queryStatsMap.remove(queryEntry.getKey());
            }
        }
    }
}
// NOTE: currently getQueryStatisticsMap must be called before beToQueryStatsMap is cleared,
// so no lock or null check is needed when visiting beToQueryStatsMap.
public Map<String, TQueryStatistics> getQueryStatisticsMap() {
// 1 merge query stats in all be
Set<Long> beIdSet = beToQueryStatsMap.keySet();
Map<String, TQueryStatistics> retQueryMap = Maps.newHashMap();
Map<String, TQueryStatistics> resultQueryMap = Maps.newHashMap();
for (Long beId : beIdSet) {
Map<String, TQueryStatistics> currentQueryMap = beToQueryStatsMap.get(beId);
Set<String> queryIdSet = currentQueryMap.keySet();
BeReportInfo beReportInfo = beToQueryStatsMap.get(beId);
Set<String> queryIdSet = beReportInfo.queryStatsMap.keySet();
for (String queryId : queryIdSet) {
TQueryStatistics retQuery = retQueryMap.get(queryId);
TQueryStatistics curQueryStats = beReportInfo.queryStatsMap.get(queryId).second;
TQueryStatistics retQuery = resultQueryMap.get(queryId);
if (retQuery == null) {
retQuery = new TQueryStatistics();
retQueryMap.put(queryId, retQuery);
resultQueryMap.put(queryId, retQuery);
}
TQueryStatistics curQueryStats = currentQueryMap.get(queryId);
mergeQueryStatistics(retQuery, curQueryStats);
}
}
return retQueryMap;
}
public Map<Long, Map<String, TQueryStatistics>> getBeQueryStatsMap() {
return beToQueryStatsMap;
return resultQueryMap;
}
private void mergeQueryStatistics(TQueryStatistics dst, TQueryStatistics src) {
@ -168,44 +203,6 @@ public class WorkloadRuntimeStatusMgr extends MasterDaemon {
}
}
void clearReportTimeoutBeStatistics() {
// 1 clear report timeout be
Set<Long> beNeedToRemove = new HashSet<>();
Set<Long> currentBeIdSet = beToQueryStatsMap.keySet();
Long currentTime = System.currentTimeMillis();
for (Long beId : currentBeIdSet) {
Long lastReportTime = beLastReportTime.get(beId);
if (lastReportTime != null
&& currentTime - lastReportTime > Config.be_report_query_statistics_timeout_ms) {
beNeedToRemove.add(beId);
}
}
for (Long beId : beNeedToRemove) {
beToQueryStatsMap.remove(beId);
beLastReportTime.remove(beId);
}
// 2 clear report timeout query
Set<String> queryNeedToClear = new HashSet<>();
Long newCurrentTime = System.currentTimeMillis();
Set<String> queryLastReportTimeKeySet = queryLastReportTime.keySet();
for (String queryId : queryLastReportTimeKeySet) {
Long lastReportTime = queryLastReportTime.get(queryId);
if (lastReportTime != null
&& newCurrentTime - lastReportTime > Config.be_report_query_statistics_timeout_ms) {
queryNeedToClear.add(queryId);
}
}
Set<Long> beIdSet = beToQueryStatsMap.keySet();
for (String queryId : queryNeedToClear) {
for (Long beId : beIdSet) {
beToQueryStatsMap.get(beId).remove(queryId);
}
queryLastReportTime.remove(queryId);
}
}
// Acquires the write lock of queryAuditEventLock, blocking until it is available.
// The matching unlock is not done here; the caller has to release the lock.
private void queryAuditEventLogWriteLock() {
queryAuditEventLock.writeLock().lock();
}