[Doris On ES] Add docvalue limitation for doc_values scan and enable doc_values scan by default (#4055)
@@ -40,6 +40,7 @@ public:
     static constexpr const char* KEY_QUERY = "query";
     static constexpr const char* KEY_BATCH_SIZE = "batch_size";
     static constexpr const char* KEY_TERMINATE_AFTER = "limit";
+    static constexpr const char* KEY_DOC_VALUES_MODE = "doc_values_mode";
     ESScanReader(const std::string& target, const std::map<std::string, std::string>& props, bool doc_value_mode);
     ~ESScanReader();

@@ -76,14 +76,20 @@ std::string ESScrollQueryBuilder::build(const std::map<std::string, std::string>
     // note: add `query` for this value....
     es_query_dsl.AddMember("query", query_node, allocator);
     bool pure_docvalue = true;
-    // check docvalue sacan optimization
-    if (docvalue_context.size() == 0 || docvalue_context.size() < fields.size()) {
-        pure_docvalue = false;
-    } else {
-        for (auto& select_field : fields) {
-            if (docvalue_context.find(select_field) == docvalue_context.end()) {
-                pure_docvalue = false;
-                break;
-            }
-        }
-    }
+    // Doris FE already has checked docvalue-scan optimization
+    if (properties.find(ESScanReader::KEY_DOC_VALUES_MODE) != properties.end()) {
+        pure_docvalue = atoi(properties.at(ESScanReader::KEY_DOC_VALUES_MODE).c_str());
+    } else {
+        // check docvalue scan optimization, used for compatibility
+        if (docvalue_context.size() == 0 || docvalue_context.size() < fields.size()) {
+            pure_docvalue = false;
+        } else {
+            for (auto& select_field : fields) {
+                if (docvalue_context.find(select_field) == docvalue_context.end()) {
+                    pure_docvalue = false;
+                    break;
+                }
+            }
+        }
+    }
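
The hunk above changes the BE-side scroll-request builder: when the scan properties carry the new `doc_values_mode` flag, the BE trusts the decision the FE already made; only when the flag is absent (an older FE) does it fall back to re-checking the docvalue context itself. Below is a minimal stand-alone sketch of that decision order, written in Java to match the FE code in this commit; `DocValueModeSketch` and `resolvePureDocValue` are illustrative names, not part of the Doris code base.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DocValueModeSketch {
    static final String KEY_DOC_VALUES_MODE = "doc_values_mode";

    static boolean resolvePureDocValue(Map<String, String> properties,
                                       Map<String, String> docValueContext,
                                       List<String> fields) {
        if (properties.containsKey(KEY_DOC_VALUES_MODE)) {
            // Mirrors the atoi() call in the C++ code; the FE sends "0" or "1".
            return Integer.parseInt(properties.get(KEY_DOC_VALUES_MODE)) != 0;
        }
        // Compatibility path for older FEs: every selected field must have doc_values.
        if (docValueContext.isEmpty() || docValueContext.size() < fields.size()) {
            return false;
        }
        for (String field : fields) {
            if (!docValueContext.containsKey(field)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Map<String, String> properties = new HashMap<>();
        Map<String, String> docValueContext = new HashMap<>();
        docValueContext.put("k1", "k1");
        List<String> fields = Arrays.asList("k1", "k2");

        // No flag (old FE): falls back to the context check -> false, k2 has no doc_values.
        System.out.println(resolvePureDocValue(properties, docValueContext, fields));

        // Flag present (new FE): the explicit decision wins -> true.
        properties.put(KEY_DOC_VALUES_MODE, "1");
        System.out.println(resolvePureDocValue(properties, docValueContext, fields));
    }
}
```
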
@@ -4036,6 +4036,7 @@ public class Catalog {
                 sb.append("\"type\" = \"").append(esTable.getMappingType()).append("\",\n");
                 sb.append("\"transport\" = \"").append(esTable.getTransport()).append("\",\n");
                 sb.append("\"enable_docvalue_scan\" = \"").append(esTable.isDocValueScanEnable()).append("\",\n");
+                sb.append("\"max_docvalue_fields\" = \"").append(esTable.maxDocValueFields()).append("\",\n");
                 sb.append("\"enable_keyword_sniff\" = \"").append(esTable.isKeywordSniffEnable()).append("\"\n");
                 sb.append(")");
             } else if (table.getType() == TableType.HIVE) {
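
With the extra append, SHOW CREATE TABLE for an ES external table now reports `max_docvalue_fields` alongside the existing flags. A small sketch of the assembled property block with made-up values (the real code pulls them from the `EsTable` instance); `ShowCreatePropsSketch` is a hypothetical name used only for illustration.

```java
public class ShowCreatePropsSketch {
    public static void main(String[] args) {
        String mappingType = "_doc";
        String transport = "http";
        boolean enableDocValueScan = true;   // now defaults to true
        int maxDocValueFields = 20;          // DEFAULT_MAX_DOCVALUE_FIELDS
        boolean enableKeywordSniff = true;

        StringBuilder sb = new StringBuilder();
        sb.append("\"type\" = \"").append(mappingType).append("\",\n");
        sb.append("\"transport\" = \"").append(transport).append("\",\n");
        sb.append("\"enable_docvalue_scan\" = \"").append(enableDocValueScan).append("\",\n");
        sb.append("\"max_docvalue_fields\" = \"").append(maxDocValueFields).append("\",\n");
        sb.append("\"enable_keyword_sniff\" = \"").append(enableKeywordSniff).append("\"\n");
        sb.append(")");

        // Prints the tail of the properties clause, e.g.:
        // "type" = "_doc",
        // "transport" = "http",
        // "enable_docvalue_scan" = "true",
        // "max_docvalue_fields" = "20",
        // "enable_keyword_sniff" = "true"
        // )
        System.out.println(sb);
    }
}
```
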
@@ -57,28 +57,51 @@ public class EsTable extends Table {
     public static final String TYPE = "type";
     public static final String TRANSPORT = "transport";
     public static final String VERSION = "version";
+    public static final String DOC_VALUES_MODE = "doc_values_mode";

     public static final String TRANSPORT_HTTP = "http";
     public static final String TRANSPORT_THRIFT = "thrift";
     public static final String DOC_VALUE_SCAN = "enable_docvalue_scan";
     public static final String KEYWORD_SNIFF = "enable_keyword_sniff";
+    public static final String MAX_DOCVALUE_FIELDS = "max_docvalue_fields";

     private String hosts;
     private String[] seeds;
     private String userName = "";
     private String passwd = "";
     // index name can be specific index、wildcard matched or alias.
     private String indexName;

     // which type used for `indexName`, default to `_doc`
     private String mappingType = "_doc";
     private String transport = "http";
     // only save the partition definition, save the partition key,
     // partition list is got from es cluster dynamically and is saved in esTableState
     private PartitionInfo partitionInfo;
     private EsTablePartitions esTablePartitions;
-    private boolean enableDocValueScan = false;
-    private boolean enableKeywordSniff = true;
+
+    // Whether to enable docvalues scan optimization for fetching fields more fast, default to true
+    private boolean enableDocValueScan = true;
+    // Whether to enable sniffing keyword for filtering more reasonable, default to true
+    private boolean enableKeywordSniff = true;
+    // if the number of fields which value extracted from `doc_value` exceeding this max limitation
+    // would downgrade to extract value from `stored_fields`
+    private int maxDocValueFields = DEFAULT_MAX_DOCVALUE_FIELDS;
+
+    // Solr doc_values vs stored_fields performance-smackdown indicate:
+    // It is possible to notice that retrieving an high number of fields leads
+    // to a sensible worsening of performance if DocValues are used.
+    // Instead, the (almost) surprising thing is that, by returning less than 20 fields,
+    // DocValues performs better than stored fields and the difference gets little as the number of fields returned increases.
+    // Asking for 9 DocValues fields and 1 stored field takes an average query time is 6.86 (more than returning 10 stored fields)
+    // Here we have a slightly conservative value of 20, but at the same time we also provide configurable parameters for expert-using
+    // @see `MAX_DOCVALUE_FIELDS`
+    private static final int DEFAULT_MAX_DOCVALUE_FIELDS = 20;

     // version would be used to be compatible with different ES Cluster
     public EsMajorVersion majorVersion = null;

     // tableContext is used for being convenient to persist some configuration parameters uniformly
     private Map<String, String> tableContext = new HashMap<>();

     // record the latest and recently exception when sync ES table metadata (mapping, shard location)
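
The new fields flip the default for `enable_docvalue_scan` to true and introduce `max_docvalue_fields` with a conservative default of 20, beyond which the scan downgrades to reading from `stored_fields`/`_source`. A hedged sketch of how the settings resolve for a table created without any of these properties; `EsScanDefaultsSketch` and the `getOrDefault` lookups are illustrative only, the real parsing lives in `EsTable` as shown in the later hunks.

```java
import java.util.HashMap;
import java.util.Map;

public class EsScanDefaultsSketch {
    static final int DEFAULT_MAX_DOCVALUE_FIELDS = 20;

    public static void main(String[] args) {
        Map<String, String> createTableProps = new HashMap<>(); // user supplied nothing

        // With this change the doc_values optimization is on unless explicitly disabled.
        boolean enableDocValueScan =
                Boolean.parseBoolean(createTableProps.getOrDefault("enable_docvalue_scan", "true"));
        boolean enableKeywordSniff =
                Boolean.parseBoolean(createTableProps.getOrDefault("enable_keyword_sniff", "true"));
        int maxDocValueFields = Integer.parseInt(
                createTableProps.getOrDefault("max_docvalue_fields",
                        String.valueOf(DEFAULT_MAX_DOCVALUE_FIELDS)));

        System.out.println(enableDocValueScan);  // true
        System.out.println(enableKeywordSniff);  // true
        System.out.println(maxDocValueFields);   // 20
    }
}
```
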
@@ -104,6 +127,10 @@ public class EsTable extends Table {
         return esMetaStateTracker.searchContext().docValueFieldsContext();
     }

+    public int maxDocValueFields() {
+        return maxDocValueFields;
+    }
+
     public boolean isDocValueScanEnable() {
         return enableDocValueScan;
     }
@@ -166,8 +193,6 @@ public class EsTable extends Table {
                         + properties.get(VERSION).trim() + " ,`enable_docvalue_scan`"
                         + " shoud be like 'true' or 'false', value should be double quotation marks");
             }
-        } else {
-            enableDocValueScan = false;
         }

         if (properties.containsKey(KEYWORD_SNIFF)) {
@@ -194,6 +219,17 @@ public class EsTable extends Table {
                         + " but value is " + transport);
             }
         }
+
+        if (properties.containsKey(MAX_DOCVALUE_FIELDS)) {
+            try {
+                maxDocValueFields = Integer.parseInt(properties.get(MAX_DOCVALUE_FIELDS).trim());
+                if (maxDocValueFields < 0) {
+                    maxDocValueFields = 0;
+                }
+            } catch (Exception e) {
+                maxDocValueFields = DEFAULT_MAX_DOCVALUE_FIELDS;
+            }
+        }
         tableContext.put("hosts", hosts);
         tableContext.put("userName", userName);
         tableContext.put("passwd", passwd);
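
The parsing above clamps negative values of `max_docvalue_fields` to 0 and silently falls back to the default on anything unparsable. A self-contained sketch of that behaviour with a few example inputs; `ParseMaxDocValueFields` is a hypothetical class name used only for this illustration.

```java
import java.util.Collections;
import java.util.Map;

public class ParseMaxDocValueFields {
    static final int DEFAULT_MAX_DOCVALUE_FIELDS = 20;

    static int parse(Map<String, String> properties) {
        int maxDocValueFields = DEFAULT_MAX_DOCVALUE_FIELDS;
        if (properties.containsKey("max_docvalue_fields")) {
            try {
                maxDocValueFields = Integer.parseInt(properties.get("max_docvalue_fields").trim());
                if (maxDocValueFields < 0) {
                    maxDocValueFields = 0;   // negative input effectively disables doc_values fetching
                }
            } catch (Exception e) {
                maxDocValueFields = DEFAULT_MAX_DOCVALUE_FIELDS;   // unparsable input keeps the default
            }
        }
        return maxDocValueFields;
    }

    public static void main(String[] args) {
        System.out.println(parse(Collections.singletonMap("max_docvalue_fields", "5")));    // 5
        System.out.println(parse(Collections.singletonMap("max_docvalue_fields", "-3")));   // 0
        System.out.println(parse(Collections.singletonMap("max_docvalue_fields", "oops"))); // 20, fallback
        System.out.println(parse(Collections.<String, String>emptyMap()));                  // 20, property absent
    }
}
```
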
@@ -205,6 +241,7 @@ public class EsTable extends Table {
         }
         tableContext.put("enableDocValueScan", String.valueOf(enableDocValueScan));
         tableContext.put("enableKeywordSniff", String.valueOf(enableKeywordSniff));
+        tableContext.put("maxDocValueFields", String.valueOf(maxDocValueFields));
     }

     public TTableDescriptor toThrift() {
@@ -294,6 +331,13 @@ public class EsTable extends Table {
         } else {
             enableKeywordSniff = true;
         }
+        if (tableContext.containsKey("maxDocValueFields")) {
+            try {
+                maxDocValueFields = Integer.parseInt(tableContext.get("maxDocValueFields"));
+            } catch (Exception e) {
+                maxDocValueFields = DEFAULT_MAX_DOCVALUE_FIELDS;
+            }
+        }

         PartitionType partType = PartitionType.valueOf(Text.readString(in));
         if (partType == PartitionType.UNPARTITIONED) {
@@ -18,6 +18,7 @@
 package org.apache.doris.planner;

 import org.apache.doris.analysis.Analyzer;
+import org.apache.doris.analysis.SlotDescriptor;
 import org.apache.doris.analysis.TupleDescriptor;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.EsTable;
@@ -40,9 +41,6 @@ import org.apache.doris.thrift.TScanRange;
 import org.apache.doris.thrift.TScanRangeLocation;
 import org.apache.doris.thrift.TScanRangeLocations;

-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -50,6 +48,9 @@ import com.google.common.collect.Multimap;
 import com.google.common.collect.Range;
 import com.google.common.collect.Sets;

+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -59,7 +60,7 @@ import java.util.Random;
 import java.util.Set;

 public class EsScanNode extends ScanNode {

     private static final Logger LOG = LogManager.getLogger(EsScanNode.class);

     private final Random random = new Random(System.currentTimeMillis());
@@ -80,10 +81,10 @@ public class EsScanNode extends ScanNode {
     @Override
     public void init(Analyzer analyzer) throws UserException {
         super.init(analyzer);

         assignBackends();
     }

     @Override
     public int getNumInstances() {
         return shardScanRanges.size();
@@ -93,7 +94,7 @@ public class EsScanNode extends ScanNode {
     public List<TScanRangeLocations> getScanRangeLocations(long maxScanRangeLength) {
         return shardScanRanges;
     }

     @Override
     public void finalize(Analyzer analyzer) throws UserException {
         if (isFinalized) {
|
||||
isFinalized = true;
|
||||
}
|
||||
|
||||
/**
|
||||
* return whether can use the doc_values scan
|
||||
* 0 and 1 are returned to facilitate Doris BE processing
|
||||
*
|
||||
* @param desc the fields needs to read from ES
|
||||
* @param docValueContext the mapping for docvalues fields from origin field to doc_value fields
|
||||
* @return
|
||||
*/
|
||||
private int useDocValueScan(TupleDescriptor desc, Map<String, String> docValueContext) {
|
||||
ArrayList<SlotDescriptor> slotDescriptors = desc.getSlots();
|
||||
List<String> selectedFields = new ArrayList<>(slotDescriptors.size());
|
||||
for (SlotDescriptor slotDescriptor : slotDescriptors) {
|
||||
selectedFields.add(slotDescriptor.getColumn().getName());
|
||||
}
|
||||
if (selectedFields.size() > table.maxDocValueFields()) {
|
||||
return 0;
|
||||
}
|
||||
Set<String> docValueFields = docValueContext.keySet();
|
||||
boolean useDocValue = true;
|
||||
for (String selectedField : selectedFields) {
|
||||
if (!docValueFields.contains(selectedField)) {
|
||||
useDocValue = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
return useDocValue ? 1 : 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void toThrift(TPlanNode msg) {
|
||||
if (EsTable.TRANSPORT_HTTP.equals(table.getTransport())) {
|
||||
@@ -123,6 +152,7 @@ public class EsScanNode extends ScanNode {
         esScanNode.setProperties(properties);
         if (table.isDocValueScanEnable()) {
             esScanNode.setDocvalue_context(table.docValueContext());
+            properties.put(EsTable.DOC_VALUES_MODE, String.valueOf(useDocValueScan(desc, table.docValueContext())));
         }
         if (table.isKeywordSniffEnable() && table.fieldsContext().size() > 0) {
             esScanNode.setFields_context(table.fieldsContext());
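
`useDocValueScan` is the FE half of the new contract: it returns 1 only when every selected column has a doc_values counterpart and the column count stays within `max_docvalue_fields`, and the result is shipped to the BE through the `doc_values_mode` scan property set in `toThrift`. Below is a stand-alone sketch with the column names passed in directly instead of a `TupleDescriptor`, so it runs on its own; `UseDocValueScanSketch` is an illustrative name, not part of the code base.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class UseDocValueScanSketch {

    // Returns 1 when every selected column can be read from doc_values and the
    // column count stays within max_docvalue_fields; 0 means the BE should read
    // from _source / stored fields instead.
    static int useDocValueScan(List<String> selectedFields,
                               Map<String, String> docValueContext,
                               int maxDocValueFields) {
        if (selectedFields.size() > maxDocValueFields) {
            return 0;
        }
        Set<String> docValueFields = docValueContext.keySet();
        for (String selectedField : selectedFields) {
            if (!docValueFields.contains(selectedField)) {
                return 0;
            }
        }
        return 1;
    }

    public static void main(String[] args) {
        Map<String, String> docValueContext = new HashMap<>();
        docValueContext.put("k1", "k1");
        docValueContext.put("k2", "k2.keyword");

        // Both columns have doc_values and 2 <= 20 -> 1
        System.out.println(useDocValueScan(Arrays.asList("k1", "k2"), docValueContext, 20));
        // k3 (e.g. an analyzed text column) has no doc_values -> 0
        System.out.println(useDocValueScan(Arrays.asList("k1", "k3"), docValueContext, 20));
        // Too many columns for the configured limit -> 0
        System.out.println(useDocValueScan(Arrays.asList("k1", "k2"), docValueContext, 1));
    }
}
```

Because the FE already encodes the decision as 0/1, the BE only needs to reconstruct it from the docvalue context when talking to an older frontend.
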
@@ -169,9 +199,9 @@ public class EsScanNode extends ScanNode {
             }
         }
         if (LOG.isDebugEnabled()) {
             LOG.debug("partition prune finished, unpartitioned index [{}], "
                     + "partitioned index [{}]",
                     String.join(",", unPartitionedIndices),
                     String.join(",", partitionedIndices));
         }
         int beIndex = random.nextInt(backendList.size());
@@ -241,7 +271,7 @@ public class EsScanNode extends ScanNode {
      * if the index name is an alias or index pattern, then the es table is related
      * with one or more indices some indices could be pruned by using partition info
      * in index settings currently only support range partition setting
      *
      * @param partitionInfo
      * @return
      * @throws AnalysisException
@@ -254,7 +284,7 @@ public class EsScanNode extends ScanNode {
         switch (partitionInfo.getType()) {
             case RANGE: {
                 RangePartitionInfo rangePartitionInfo = (RangePartitionInfo) partitionInfo;
                 Map<Long, Range<PartitionKey>> keyRangeById = rangePartitionInfo.getIdToRange(false);
                 partitionPruner = new RangePartitionPruner(keyRangeById, rangePartitionInfo.getPartitionColumns(),
                         columnFilters);
                 return partitionPruner.prune();