[Bug] Fix bug that query meta fields has been sent twice (#4529)

Fix bug that query meta fields has been sent twice.
This bug is introduced by #4330 , and related PR is #4450
This commit is contained in:
Mingyu Chen
2020-09-04 09:31:14 +08:00
committed by GitHub
parent 8d60352737
commit 72f04ebdb8
4 changed files with 80 additions and 44 deletions

View File

@ -120,6 +120,9 @@ public class DebugUtil {
}
public static String printId(final TUniqueId id) {
if (id == null) {
return "";
}
StringBuilder builder = new StringBuilder();
builder.append(Long.toHexString(id.hi)).append("-").append(Long.toHexString(id.lo));
return builder.toString();

View File

@ -21,8 +21,8 @@ import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.Arrays;
@ -59,8 +59,8 @@ public class ProfileManager {
public static final String SQL_STATEMENT = "Sql Statement";
public static final String USER = "User";
public static final String DEFAULT_DB = "Default Db";
public static final String IS_CACHED = "IsCached";
public static final String IS_CACHED = "Is Cached";
public static final ArrayList<String> PROFILE_HEADERS = new ArrayList(
Arrays.asList(QUERY_ID, USER, DEFAULT_DB, SQL_STATEMENT, QUERY_TYPE,
START_TIME, END_TIME, TOTAL_TIME, QUERY_STATE));

View File

@ -589,41 +589,62 @@ public class StmtExecutor {
context.getState().setOk();
}
private void sendChannel(MysqlChannel channel, List<CacheProxy.CacheValue> cacheValues, boolean isEos)
// send values from cache.
// return true if the meta fields has been sent, otherwise, return false.
// the meta fields must be sent right before the first batch of data(or eos flag).
// so if it has data(or eos is true), this method must return true.
private boolean sendCachedValues(MysqlChannel channel, List<CacheProxy.CacheValue> cacheValues,
SelectStmt selectStmt, boolean isSendFields, boolean isEos)
throws Exception {
RowBatch batch = null;
boolean isSend = isSendFields;
for (CacheBeProxy.CacheValue value : cacheValues) {
batch = value.getRowBatch();
if (!isSend) {
// send meta fields before sending first data batch.
sendFields(selectStmt.getColLabels(), selectStmt.getResultExprs());
isSend = true;
}
for (ByteBuffer row : batch.getBatch().getRows()) {
channel.sendOnePacket(row);
}
context.updateReturnRows(batch.getBatch().getRows().size());
}
if (isEos) {
if (batch != null) {
statisticsForAuditLog = batch.getQueryStatistics();
}
if (!isSend) {
sendFields(selectStmt.getColLabels(), selectStmt.getResultExprs());
isSend = true;
}
context.getState().setEof();
return;
}
return isSend;
}
private boolean handleCacheStmt(CacheAnalyzer cacheAnalyzer,MysqlChannel channel) throws Exception {
/**
* Handle the SelectStmt via Cache.
*/
private void handleCacheStmt(CacheAnalyzer cacheAnalyzer, MysqlChannel channel, SelectStmt selectStmt) throws Exception {
RowBatch batch = null;
CacheBeProxy.FetchCacheResult cacheResult = cacheAnalyzer.getCacheData();
CacheMode mode = cacheAnalyzer.getCacheMode();
SelectStmt newSelectStmt = selectStmt;
boolean isSendFields = false;
if (cacheResult != null) {
isCached = true;
if (cacheAnalyzer.getHitRange() == Cache.HitRange.Full) {
sendChannel(channel, cacheResult.getValueList(), true);
return true;
sendCachedValues(channel, cacheResult.getValueList(), newSelectStmt, isSendFields, true);
return;
}
//rewrite sql
// rewrite sql
if (mode == CacheMode.Partition) {
if (cacheAnalyzer.getHitRange() == Cache.HitRange.Left) {
sendChannel(channel, cacheResult.getValueList(), false);
isSendFields = sendCachedValues(channel, cacheResult.getValueList(), newSelectStmt, isSendFields, false);
}
SelectStmt newSelectStmt = cacheAnalyzer.getRewriteStmt();
newSelectStmt = cacheAnalyzer.getRewriteStmt();
newSelectStmt.reset();
analyzer = new Analyzer(context.getCatalog(), context);
newSelectStmt.analyze(analyzer);
@ -641,6 +662,10 @@ public class StmtExecutor {
batch = coord.getNext();
if (batch.getBatch() != null) {
cacheAnalyzer.copyRowBatch(batch);
if (!isSendFields) {
sendFields(newSelectStmt.getColLabels(), newSelectStmt.getResultExprs());
isSendFields = true;
}
for (ByteBuffer row : batch.getBatch().getRows()) {
channel.sendOnePacket(row);
}
@ -652,13 +677,19 @@ public class StmtExecutor {
}
if (cacheResult != null && cacheAnalyzer.getHitRange() == Cache.HitRange.Right) {
sendChannel(channel, cacheResult.getValueList(), false);
isSendFields = sendCachedValues(channel, cacheResult.getValueList(), newSelectStmt, isSendFields, false);
}
cacheAnalyzer.updateCache();
if (!isSendFields) {
sendFields(newSelectStmt.getColLabels(), newSelectStmt.getResultExprs());
isSendFields = true;
}
statisticsForAuditLog = batch.getQueryStatistics();
context.getState().setEof();
return false;
return;
}
// Process a select statement.
@ -682,6 +713,17 @@ public class StmtExecutor {
return;
}
RowBatch batch;
MysqlChannel channel = context.getMysqlChannel();
boolean isOutfileQuery = queryStmt.hasOutFileClause();
// Sql and PartitionCache
CacheAnalyzer cacheAnalyzer = new CacheAnalyzer(context, parsedStmt, planner);
if (cacheAnalyzer.enableCache() && !isOutfileQuery && queryStmt instanceof SelectStmt) {
handleCacheStmt(cacheAnalyzer, channel, (SelectStmt) queryStmt);
return;
}
// send result
// 1. If this is a query with OUTFILE clause, eg: select * from tbl1 into outfile xxx,
// We will not send real query result to client. Instead, we only send OK to client with
@ -690,21 +732,7 @@ public class StmtExecutor {
// Query OK, 10 rows affected (0.01 sec)
//
// 2. If this is a query, send the result expr fields first, and send result data back to client.
RowBatch batch;
MysqlChannel channel = context.getMysqlChannel();
boolean isOutfileQuery = queryStmt.hasOutFileClause();
boolean isSendFields = false;
if (!isOutfileQuery) {
sendFields(queryStmt.getColLabels(), queryStmt.getResultExprs());
}
//Sql and PartitionCache
CacheAnalyzer cacheAnalyzer = new CacheAnalyzer(context, parsedStmt, planner);
if (cacheAnalyzer.enableCache()) {
handleCacheStmt(cacheAnalyzer, channel);
return;
}
coord = new Coordinator(context, analyzer, planner);
QeProcessorImpl.INSTANCE.registerQuery(context.queryId(),
new QeProcessorImpl.QueryInfo(context, originStmt.originStmt, coord));

View File

@ -28,11 +28,13 @@ import org.apache.doris.analysis.SelectStmt;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.StatementBase;
import org.apache.doris.analysis.TableRef;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.RangePartitionInfo;
import org.apache.doris.catalog.PartitionType;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.PartitionType;
import org.apache.doris.catalog.RangePartitionInfo;
import org.apache.doris.common.Config;
import org.apache.doris.common.Status;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.planner.OlapScanNode;
@ -40,11 +42,10 @@ import org.apache.doris.planner.Planner;
import org.apache.doris.planner.ScanNode;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.RowBatch;
import org.apache.doris.common.Config;
import org.apache.doris.common.Status;
import org.apache.doris.thrift.TUniqueId;
import com.google.common.collect.Lists;
import org.apache.doris.thrift.TUniqueId;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -179,9 +180,11 @@ public class CacheAnalyzer {
private CacheMode innerCheckCacheMode(long now) {
if (!enableCache()) {
LOG.debug("cache is disabled. queryid {}", DebugUtil.printId(queryId));
return CacheMode.NoNeed;
}
if (!(parsedStmt instanceof SelectStmt) || scanNodes.size() == 0) {
LOG.debug("not a select stmt or no scan node. queryid {}", DebugUtil.printId(queryId));
return CacheMode.NoNeed;
}
MetricRepo.COUNTER_QUERY_TABLE.increase(1L);
@ -192,6 +195,7 @@ public class CacheAnalyzer {
for (int i = 0; i < scanNodes.size(); i++) {
ScanNode node = scanNodes.get(i);
if (!(node instanceof OlapScanNode)) {
LOG.debug("query contains non-olap table. queryid {}", DebugUtil.printId(queryId));
return CacheMode.None;
}
OlapScanNode oNode = (OlapScanNode) node;
@ -217,6 +221,7 @@ public class CacheAnalyzer {
}
if (!enablePartitionCache()) {
LOG.debug("partition query cache is disabled. queryid {}", DebugUtil.printId(queryId));
return CacheMode.None;
}
@ -224,33 +229,34 @@ public class CacheAnalyzer {
//Only one table can be updated in Config.cache_last_version_interval_second range
for (int i = 1; i < tblTimeList.size(); i++) {
if ((now - tblTimeList.get(i).latestTime) < Config.cache_last_version_interval_second * 1000) {
LOG.info("the time of other tables is newer than {}", Config.cache_last_version_interval_second);
LOG.debug("the time of other tables is newer than {} s, queryid {}",
Config.cache_last_version_interval_second, DebugUtil.printId(queryId));
return CacheMode.None;
}
}
olapTable = latestTable.olapTable;
if (olapTable.getPartitionInfo().getType() != PartitionType.RANGE) {
LOG.debug("the partition of OlapTable not RANGE type");
LOG.debug("the partition of OlapTable not RANGE type, queryid {}", DebugUtil.printId(queryId));
return CacheMode.None;
}
partitionInfo = (RangePartitionInfo) olapTable.getPartitionInfo();
List<Column> columns = partitionInfo.getPartitionColumns();
//Partition key has only one column
if (columns.size() != 1) {
LOG.info("the size of columns for partition key is {}", columns.size());
LOG.debug("more than one partition column, queryid {}", columns.size(), DebugUtil.printId(queryId));
return CacheMode.None;
}
partColumn = columns.get(0);
//Check if group expr contain partition column
if (!checkGroupByPartitionKey(this.selectStmt, partColumn)) {
LOG.info("not group by partition key, key {}", partColumn.getName());
LOG.debug("group by columns does not contains all partition column, queryid {}", DebugUtil.printId(queryId));
return CacheMode.None;
}
//Check if whereClause have one CompoundPredicate of partition column
List<CompoundPredicate> compoundPredicates = Lists.newArrayList();
getPartitionKeyFromSelectStmt(this.selectStmt, partColumn, compoundPredicates);
if (compoundPredicates.size() != 1) {
LOG.info("the predicate size include partition key has {}", compoundPredicates.size());
LOG.debug("empty or more than one predicates contain partition column, queryid {}", DebugUtil.printId(queryId));
return CacheMode.None;
}
partitionPredicate = compoundPredicates.get(0);
@ -265,22 +271,21 @@ public class CacheAnalyzer {
CacheProxy.FetchCacheResult cacheResult = null;
cacheMode = innerCheckCacheMode(0);
if (cacheMode == CacheMode.NoNeed) {
return cacheResult;
return null;
}
if (cacheMode == CacheMode.None) {
LOG.info("check cache mode {}, queryid {}", cacheMode, DebugUtil.printId(queryId));
return cacheResult;
return null;
}
Status status = new Status();
cacheResult = cache.getCacheData(status);
if (status.ok() && cacheResult != null) {
LOG.info("hit cache, mode {}, queryid {}, all count {}, value count {}, row count {}, data size {}",
LOG.debug("hit cache, mode {}, queryid {}, all count {}, value count {}, row count {}, data size {}",
cacheMode, DebugUtil.printId(queryId),
cacheResult.all_count, cacheResult.value_count,
cacheResult.row_count, cacheResult.data_size);
} else {
LOG.info("miss cache, mode {}, queryid {}, code {}, msg {}", cacheMode,
LOG.debug("miss cache, mode {}, queryid {}, code {}, msg {}", cacheMode,
DebugUtil.printId(queryId), status.getErrorCode(), status.getErrorMsg());
cacheResult = null;
}