From 0f00febd2138bef48807a6a2c42b8452dd5d61bf Mon Sep 17 00:00:00 2001 From: "Yunfeng,Wu" Date: Wed, 4 Dec 2019 12:57:45 +0800 Subject: [PATCH] Optimize Doris On Elasticsearch performance (#2237) Pure DocValue optimization for doris-on-es Future TODO: currently we check whether pure_docvalue is enabled for every tuple scanned, which is wasteful; the check should be performed once for the whole scan, outside the per-tuple path. I will address this in a follow-up change. --- .gitignore | 2 +- be/src/exec/es/es_scan_reader.cpp | 2 +- be/src/exec/es/es_scroll_parser.cpp | 111 +++++++--- be/src/exec/es/es_scroll_parser.h | 2 +- be/src/exec/es/es_scroll_query.cpp | 37 +++- be/src/exec/es/es_scroll_query.h | 2 +- be/src/exec/es_http_scan_node.cpp | 8 +- be/src/exec/es_http_scan_node.h | 1 + be/src/exec/es_http_scanner.cpp | 4 +- be/src/exec/es_http_scanner.h | 3 +- be/test/exec/es_scan_reader_test.cpp | 3 +- .../org/apache/doris/catalog/Catalog.java | 1 + .../org/apache/doris/catalog/EsTable.java | 197 ++++++++++++++---- .../org/apache/doris/common/FeConstants.java | 2 +- .../apache/doris/common/FeMetaVersion.java | 6 + .../apache/doris/external/EsRestClient.java | 21 +- .../apache/doris/external/EsStateStore.java | 145 +++++++++---- .../org/apache/doris/planner/EsScanNode.java | 4 + gensrc/thrift/PlanNodes.thrift | 18 ++ 19 files changed, 447 insertions(+), 122 deletions(-) diff --git a/.gitignore b/.gitignore index 1d6007dc00..50bfde9a87 100644 --- a/.gitignore +++ b/.gitignore @@ -20,4 +20,4 @@ thirdparty/installed /Default/ be/cmake-build be/cmake-build-debug -be/cmake-build-release +be/cmake-build-release \ No newline at end of file diff --git a/be/src/exec/es/es_scan_reader.cpp b/be/src/exec/es/es_scan_reader.cpp index 8fc3fa8ea8..ef97652893 100644 --- a/be/src/exec/es/es_scan_reader.cpp +++ b/be/src/exec/es/es_scan_reader.cpp @@ -27,7 +27,7 @@ #include "exec/es/es_scroll_query.h" namespace doris { -const std::string REUQEST_SCROLL_FILTER_PATH = 
"filter_path=_scroll_id,hits.hits._source,hits.total,_id,hits.hits._source.fields"; +const std::string REUQEST_SCROLL_FILTER_PATH = "filter_path=_scroll_id,hits.hits._source,hits.total,_id,hits.hits._source.fields,hits.hits.fields"; const std::string REQUEST_SCROLL_PATH = "_scroll"; const std::string REQUEST_PREFERENCE_PREFIX = "&preference=_shards:"; const std::string REQUEST_SEARCH_SCROLL_PATH = "/_search/scroll"; diff --git a/be/src/exec/es/es_scroll_parser.cpp b/be/src/exec/es/es_scroll_parser.cpp index ea6a1e03aa..b605be3349 100644 --- a/be/src/exec/es/es_scroll_parser.cpp +++ b/be/src/exec/es/es_scroll_parser.cpp @@ -105,28 +105,52 @@ static const string ERROR_COL_DATA_IS_ARRAY = "Data source returned an array for } while (false) -#define RETURN_ERROR_IF_PARSING_FAILED(result, type) \ +#define RETURN_ERROR_IF_PARSING_FAILED(result, col, type) \ do { \ if (result != StringParser::PARSE_SUCCESS) { \ - return Status::RuntimeError(strings::Substitute(ERROR_INVALID_COL_DATA, type_to_string(type))); \ + std::stringstream ss; \ + ss << "Expected value of type: " \ + << type_to_string(type) \ + << "; but found type: " << json_type_to_string(col.GetType()) \ + << "; Docuemnt source slice is : " << json_value_to_string(col); \ + return Status::RuntimeError(ss.str()); \ } \ } while (false) +#define RETURN_ERROR_IF_CAST_FORMAT_ERROR(col, type) \ + do { \ + std::stringstream ss; \ + ss << "Expected value of type: " \ + << type_to_string(type) \ + << "; but found type: " << json_type_to_string(col.GetType()) \ + << "; Docuemnt slice is : " << json_value_to_string(col); \ + return Status::RuntimeError(ss.str()); \ + } while (false) + template -static Status get_int_value(const rapidjson::Value &col, PrimitiveType type, void* slot) { +static Status get_int_value(const rapidjson::Value &col, PrimitiveType type, void* slot, bool pure_doc_value) { + + if (col.IsNumber()) { *reinterpret_cast(slot) = (T)(sizeof(T) < 8 ? 
col.GetInt() : col.GetInt64()); return Status::OK(); } + if (pure_doc_value && col.IsArray()) { + *reinterpret_cast(slot) = (T)(sizeof(T) < 8 ? col[0].GetInt() : col[0].GetInt64()); + return Status::OK(); + } + + RETURN_ERROR_IF_COL_IS_ARRAY(col, type); RETURN_ERROR_IF_COL_IS_NOT_STRING(col, type); + StringParser::ParseResult result; const std::string& val = col.GetString(); size_t len = col.GetStringLength(); T v = StringParser::string_to_int(val.c_str(), len, &result); - RETURN_ERROR_IF_PARSING_FAILED(result, type); + RETURN_ERROR_IF_PARSING_FAILED(result, col, type); if (sizeof(T) < 16) { *reinterpret_cast(slot) = v; @@ -139,13 +163,18 @@ static Status get_int_value(const rapidjson::Value &col, PrimitiveType type, voi } template -static Status get_float_value(const rapidjson::Value &col, PrimitiveType type, void* slot) { +static Status get_float_value(const rapidjson::Value &col, PrimitiveType type, void* slot, bool pure_doc_value) { DCHECK(sizeof(T) == 4 || sizeof(T) == 8); if (col.IsNumber()) { *reinterpret_cast(slot) = (T)(sizeof(T) == 4 ? col.GetFloat() : col.GetDouble()); return Status::OK(); } + if (pure_doc_value && col.IsArray()) { + *reinterpret_cast(slot) = (T)(sizeof(T) == 4 ? 
col[0].GetFloat() : col[0].GetDouble()); + return Status::OK(); + } + RETURN_ERROR_IF_COL_IS_ARRAY(col, type); RETURN_ERROR_IF_COL_IS_NOT_STRING(col, type); @@ -153,7 +182,7 @@ static Status get_float_value(const rapidjson::Value &col, PrimitiveType type, v const std::string& val = col.GetString(); size_t len = col.GetStringLength(); T v = StringParser::string_to_float(val.c_str(), len, &result); - RETURN_ERROR_IF_PARSING_FAILED(result, type); + RETURN_ERROR_IF_PARSING_FAILED(result, col, type); *reinterpret_cast(slot) = v; return Status::OK(); @@ -231,19 +260,19 @@ int ScrollParser::get_total() { return _total; } - Status ScrollParser::fill_tuple(const TupleDescriptor* tuple_desc, - Tuple* tuple, MemPool* tuple_pool, bool* line_eof) { + Tuple* tuple, MemPool* tuple_pool, bool* line_eof, const std::map& docvalue_context) { *line_eof = true; if (_size <= 0 || _line_index >= _size) { return Status::OK(); } const rapidjson::Value& obj = _inner_hits_node[_line_index++]; - const rapidjson::Value& line = obj[FIELD_SOURCE]; - if (!line.IsObject()) { - return Status::InternalError("Parse inner hits failed"); + bool pure_doc_value = false; + if (obj.HasMember("fields")) { + pure_doc_value = true; } + const rapidjson::Value& line = obj.HasMember(FIELD_SOURCE) ? obj[FIELD_SOURCE] : obj["fields"]; tuple->init(tuple_desc->byte_size()); for (int i = 0; i < tuple_desc->slots().size(); ++i) { @@ -253,7 +282,11 @@ Status ScrollParser::fill_tuple(const TupleDescriptor* tuple_desc, continue; } - const char* col_name = slot_desc->col_name().c_str(); + // if pure_doc_value enabled, docvalue_context must contains the key + // todo: need move all `pure_docvalue` for every tuple outside fill_tuple + // should check pure_docvalue for one table scan not every tuple + const char* col_name = pure_doc_value ? 
docvalue_context.at(slot_desc->col_name()).c_str() : slot_desc->col_name().c_str(); + rapidjson::Value::ConstMemberIterator itr = line.FindMember(col_name); if (itr == line.MemberEnd()) { tuple->set_null(slot_desc->null_indicator_offset()); @@ -268,15 +301,23 @@ Status ScrollParser::fill_tuple(const TupleDescriptor* tuple_desc, switch (type) { case TYPE_CHAR: case TYPE_VARCHAR: { - RETURN_ERROR_IF_COL_IS_ARRAY(col, type); // sometimes elasticsearch user post some not-string value to Elasticsearch Index. // because of reading value from _source, we can not process all json type and then just transfer the value to original string representation // this may be a tricky, but we can workaround this issue std::string val; - if (!col.IsString()) { - val = json_value_to_string(col); + if (pure_doc_value) { + if (!col[0].IsString()) { + val = json_value_to_string(col[0]); + } else { + val = col[0].GetString(); + } } else { - val = col.GetString(); + RETURN_ERROR_IF_COL_IS_ARRAY(col, type); + if (!col.IsString()) { + val = json_value_to_string(col); + } else { + val = col.GetString(); + } } size_t val_size = val.length(); char* buffer = reinterpret_cast(tuple_pool->try_allocate_unaligned(val_size)); @@ -292,7 +333,7 @@ Status ScrollParser::fill_tuple(const TupleDescriptor* tuple_desc, } case TYPE_TINYINT: { - Status status = get_int_value(col, type, slot); + Status status = get_int_value(col, type, slot, pure_doc_value); if (!status.ok()) { return status; } @@ -300,7 +341,7 @@ Status ScrollParser::fill_tuple(const TupleDescriptor* tuple_desc, } case TYPE_SMALLINT: { - Status status = get_int_value(col, type, slot); + Status status = get_int_value(col, type, slot, pure_doc_value); if (!status.ok()) { return status; } @@ -308,7 +349,7 @@ Status ScrollParser::fill_tuple(const TupleDescriptor* tuple_desc, } case TYPE_INT: { - Status status = get_int_value(col, type, slot); + Status status = get_int_value(col, type, slot, pure_doc_value); if (!status.ok()) { return status; } @@ 
-316,7 +357,7 @@ Status ScrollParser::fill_tuple(const TupleDescriptor* tuple_desc, } case TYPE_BIGINT: { - Status status = get_int_value(col, type, slot); + Status status = get_int_value(col, type, slot, pure_doc_value); if (!status.ok()) { return status; } @@ -324,7 +365,7 @@ Status ScrollParser::fill_tuple(const TupleDescriptor* tuple_desc, } case TYPE_LARGEINT: { - Status status = get_int_value<__int128>(col, type, slot); + Status status = get_int_value<__int128>(col, type, slot, pure_doc_value); if (!status.ok()) { return status; } @@ -332,7 +373,7 @@ Status ScrollParser::fill_tuple(const TupleDescriptor* tuple_desc, } case TYPE_DOUBLE: { - Status status = get_float_value(col, type, slot); + Status status = get_float_value(col, type, slot, pure_doc_value); if (!status.ok()) { return status; } @@ -340,7 +381,7 @@ Status ScrollParser::fill_tuple(const TupleDescriptor* tuple_desc, } case TYPE_FLOAT: { - Status status = get_float_value(col, type, slot); + Status status = get_float_value(col, type, slot, pure_doc_value); if (!status.ok()) { return status; } @@ -357,6 +398,10 @@ Status ScrollParser::fill_tuple(const TupleDescriptor* tuple_desc, *reinterpret_cast(slot) = col.GetInt(); break; } + if (pure_doc_value && col.IsArray()) { + *reinterpret_cast(slot) = col[0].GetBool(); + break; + } RETURN_ERROR_IF_COL_IS_ARRAY(col, type); RETURN_ERROR_IF_COL_IS_NOT_STRING(col, type); @@ -366,7 +411,7 @@ Status ScrollParser::fill_tuple(const TupleDescriptor* tuple_desc, StringParser::ParseResult result; bool b = StringParser::string_to_bool(val.c_str(), val_size, &result); - RETURN_ERROR_IF_PARSING_FAILED(result, type); + RETURN_ERROR_IF_PARSING_FAILED(result, col, type); *reinterpret_cast(slot) = b; break; } @@ -375,7 +420,19 @@ Status ScrollParser::fill_tuple(const TupleDescriptor* tuple_desc, case TYPE_DATETIME: { if (col.IsNumber()) { if (!reinterpret_cast(slot)->from_unixtime(col.GetInt64(), "+08:00")) { - return 
Status::InternalError(strings::Substitute(ERROR_INVALID_COL_DATA, type_to_string(type))); + RETURN_ERROR_IF_CAST_FORMAT_ERROR(col, type); + } + + if (type == TYPE_DATE) { + reinterpret_cast(slot)->cast_to_date(); + } else { + reinterpret_cast(slot)->set_type(TIME_DATETIME); + } + break; + } + if (pure_doc_value && col.IsArray()) { + if (!reinterpret_cast(slot)->from_unixtime(col[0].GetInt64(), "+08:00")) { + RETURN_ERROR_IF_CAST_FORMAT_ERROR(col, type); } if (type == TYPE_DATE) { @@ -393,11 +450,11 @@ Status ScrollParser::fill_tuple(const TupleDescriptor* tuple_desc, const std::string& val = col.GetString(); size_t val_size = col.GetStringLength(); if (!ts_slot->from_date_str(val.c_str(), val_size)) { - return Status::InternalError(strings::Substitute(ERROR_INVALID_COL_DATA, type_to_string(type))); + RETURN_ERROR_IF_CAST_FORMAT_ERROR(col, type); } if (ts_slot->year() < 1900) { - return Status::InternalError(strings::Substitute(ERROR_INVALID_COL_DATA, type_to_string(type))); + RETURN_ERROR_IF_CAST_FORMAT_ERROR(col, type); } if (type == TYPE_DATE) { diff --git a/be/src/exec/es/es_scroll_parser.h b/be/src/exec/es/es_scroll_parser.h index 5af75a85ee..29c5cf3748 100644 --- a/be/src/exec/es/es_scroll_parser.h +++ b/be/src/exec/es/es_scroll_parser.h @@ -35,7 +35,7 @@ public: Status parse(const std::string& scroll_result); Status fill_tuple(const TupleDescriptor* _tuple_desc, Tuple* tuple, - MemPool* mem_pool, bool* line_eof); + MemPool* mem_pool, bool* line_eof, const std::map& docvalue_context); const std::string& get_scroll_id(); int get_total(); diff --git a/be/src/exec/es/es_scroll_query.cpp b/be/src/exec/es/es_scroll_query.cpp index fb91b5f117..2cb6fdc289 100644 --- a/be/src/exec/es/es_scroll_query.cpp +++ b/be/src/exec/es/es_scroll_query.cpp @@ -17,7 +17,6 @@ #include "exec/es/es_scroll_query.h" -#include #include #include "common/logging.h" @@ -64,7 +63,7 @@ std::string ESScrollQueryBuilder::build_clear_scroll_body(const std::string& scr std::string 
ESScrollQueryBuilder::build(const std::map& properties, const std::vector& fields, - std::vector& predicates) { + std::vector& predicates, const std::map& docvalue_context) { rapidjson::Document es_query_dsl; rapidjson::Document::AllocatorType &allocator = es_query_dsl.GetAllocator(); es_query_dsl.SetObject(); @@ -75,15 +74,39 @@ std::string ESScrollQueryBuilder::build(const std::map BooleanQueryBuilder::to_query(predicates, &scratch_document, &query_node); // note: add `query` for this value.... es_query_dsl.AddMember("query", query_node, allocator); - // just filter the selected fields for reducing the network cost - if (fields.size() > 0) { - rapidjson::Value source_node(rapidjson::kArrayType); - for (auto iter = fields.begin(); iter != fields.end(); iter++) { - rapidjson::Value field(iter->c_str(), allocator); + bool pure_docvalue = true; + // check docvalue sacan optimization + if (docvalue_context.size() == 0 || docvalue_context.size() < fields.size()) { + pure_docvalue = false; + } else { + for (auto& select_field : fields) { + if (docvalue_context.find(select_field) == docvalue_context.end()) { + pure_docvalue = false; + break; + } + } + } + rapidjson::Value source_node(rapidjson::kArrayType); + if (pure_docvalue) { + for (auto& select_field : fields) { + rapidjson::Value field(docvalue_context.at(select_field).c_str(), allocator); source_node.PushBack(field, allocator); } + } else { + for (auto& select_field : fields) { + rapidjson::Value field(select_field.c_str(), allocator); + source_node.PushBack(field, allocator); + } + } + + // just filter the selected fields for reducing the network cost + if (pure_docvalue) { + es_query_dsl.AddMember("stored_fields", "_none_", allocator); + es_query_dsl.AddMember("docvalue_fields", source_node, allocator); + } else { es_query_dsl.AddMember("_source", source_node, allocator); } + int size = atoi(properties.at(ESScanReader::KEY_BATCH_SIZE).c_str()); rapidjson::Value sort_node(rapidjson::kArrayType); // use the 
scroll-scan mode for scan index documents diff --git a/be/src/exec/es/es_scroll_query.h b/be/src/exec/es/es_scroll_query.h index 0f6c20457a..959ec455dd 100644 --- a/be/src/exec/es/es_scroll_query.h +++ b/be/src/exec/es/es_scroll_query.h @@ -35,6 +35,6 @@ public: // @note: predicates should processed before pass it to this method, // tie breaker for predicate wheather can push down es can reference the push-down filters static std::string build(const std::map& properties, - const std::vector& fields, std::vector& predicates); + const std::vector& fields, std::vector& predicates, const std::map& docvalue_context); }; } diff --git a/be/src/exec/es_http_scan_node.cpp b/be/src/exec/es_http_scan_node.cpp index 855be987f4..1f4fcc2e21 100644 --- a/be/src/exec/es_http_scan_node.cpp +++ b/be/src/exec/es_http_scan_node.cpp @@ -55,6 +55,10 @@ Status EsHttpScanNode::init(const TPlanNode& tnode, RuntimeState* state) { // use TEsScanNode _properties = tnode.es_scan_node.properties; + + if (tnode.es_scan_node.__isset.docvalue_context) { + _docvalue_context = tnode.es_scan_node.docvalue_context; + } return Status::OK(); } @@ -333,7 +337,7 @@ Status EsHttpScanNode::scanner_scan( memset(tuple, 0, _tuple_desc->num_null_bytes()); // Get from scanner - RETURN_IF_ERROR(scanner->get_next(tuple, tuple_pool, &scanner_eof)); + RETURN_IF_ERROR(scanner->get_next(tuple, tuple_pool, &scanner_eof, _docvalue_context)); if (scanner_eof) { continue; } @@ -426,7 +430,7 @@ void EsHttpScanNode::scanner_worker(int start_idx, int length, std::promisebatch_size()); properties[ESScanReader::KEY_HOST_PORT] = get_host_port(es_scan_range.es_hosts); properties[ESScanReader::KEY_QUERY] - = ESScrollQueryBuilder::build(properties, _column_names, _predicates); + = ESScrollQueryBuilder::build(properties, _column_names, _predicates, _docvalue_context); // start scanner to scan std::unique_ptr scanner(new EsHttpScanner( diff --git a/be/src/exec/es_http_scan_node.h b/be/src/exec/es_http_scan_node.h index 
982beb85cf..9990a76c27 100644 --- a/be/src/exec/es_http_scan_node.h +++ b/be/src/exec/es_http_scan_node.h @@ -99,6 +99,7 @@ private: std::vector _scanner_threads; std::vector> _scanners_status; std::map _properties; + std::map _docvalue_context; std::vector _scan_ranges; std::vector _column_names; diff --git a/be/src/exec/es_http_scanner.cpp b/be/src/exec/es_http_scanner.cpp index f142770c94..4eecaa0066 100644 --- a/be/src/exec/es_http_scanner.cpp +++ b/be/src/exec/es_http_scanner.cpp @@ -88,7 +88,7 @@ Status EsHttpScanner::open() { return Status::OK(); } -Status EsHttpScanner::get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof) { +Status EsHttpScanner::get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof, const std::map& docvalue_context) { SCOPED_TIMER(_read_timer); if (_line_eof && _batch_eof) { *eof = true; @@ -107,7 +107,7 @@ Status EsHttpScanner::get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof) { COUNTER_UPDATE(_rows_read_counter, 1); SCOPED_TIMER(_materialize_timer); RETURN_IF_ERROR(_es_scroll_parser->fill_tuple( - _tuple_desc, tuple, tuple_pool, &_line_eof)); + _tuple_desc, tuple, tuple_pool, &_line_eof, docvalue_context)); if (!_line_eof) { break; } diff --git a/be/src/exec/es_http_scanner.h b/be/src/exec/es_http_scanner.h index ed4cf9bd8b..23ebe7c1c6 100644 --- a/be/src/exec/es_http_scanner.h +++ b/be/src/exec/es_http_scanner.h @@ -1,3 +1,4 @@ + // Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. 
See the NOTICE file // distributed with this work for additional information @@ -66,7 +67,7 @@ public: Status open(); - Status get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof); + Status get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof, const std::map& docvalue_context); void close(); diff --git a/be/test/exec/es_scan_reader_test.cpp b/be/test/exec/es_scan_reader_test.cpp index c45c8d1755..32f8af5def 100644 --- a/be/test/exec/es_scan_reader_test.cpp +++ b/be/test/exec/es_scan_reader_test.cpp @@ -222,7 +222,8 @@ TEST_F(MockESServerTest, workflow) { props[ESScanReader::KEY_SHARD] = "0"; props[ESScanReader::KEY_BATCH_SIZE] = "1"; std::vector predicates; - props[ESScanReader::KEY_QUERY] = ESScrollQueryBuilder::build(props, fields, predicates); + std::map docvalue_context; + props[ESScanReader::KEY_QUERY] = ESScrollQueryBuilder::build(props, fields, predicates, docvalue_context); ESScanReader reader(target, props); auto st = reader.open(); ASSERT_TRUE(st.ok()); diff --git a/fe/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/src/main/java/org/apache/doris/catalog/Catalog.java index 5926411181..bfba9ab84b 100644 --- a/fe/src/main/java/org/apache/doris/catalog/Catalog.java +++ b/fe/src/main/java/org/apache/doris/catalog/Catalog.java @@ -3973,6 +3973,7 @@ public class Catalog { sb.append("\"index\" = \"").append(esTable.getIndexName()).append("\",\n"); sb.append("\"type\" = \"").append(esTable.getMappingType()).append("\",\n"); sb.append("\"transport\" = \"").append(esTable.getTransport()).append("\"\n"); + sb.append("\"enable_docvalue_scan\" = \"").append(esTable.isDocValueScanEnable()).append("\"\n"); sb.append(")"); } sb.append(";"); diff --git a/fe/src/main/java/org/apache/doris/catalog/EsTable.java b/fe/src/main/java/org/apache/doris/catalog/EsTable.java index 66fecdc4c4..1b856c26a7 100644 --- a/fe/src/main/java/org/apache/doris/catalog/EsTable.java +++ b/fe/src/main/java/org/apache/doris/catalog/EsTable.java @@ -18,37 +18,44 @@ package 
org.apache.doris.catalog; import org.apache.doris.common.DdlException; +import org.apache.doris.common.FeMetaVersion; import org.apache.doris.common.io.Text; +import org.apache.doris.external.EsMajorVersion; import org.apache.doris.external.EsTableState; import org.apache.doris.thrift.TEsTable; import org.apache.doris.thrift.TTableDescriptor; import org.apache.doris.thrift.TTableType; - -import com.google.common.base.Strings; - import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; - +import com.google.common.base.Strings; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.io.UnsupportedEncodingException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.zip.Adler32; public class EsTable extends Table { private static final Logger LOG = LogManager.getLogger(EsTable.class); + public static final Set DEFAULT_DOCVALUE_DISABLED_FIELDS = new HashSet<>(Arrays.asList("text")); + public static final String HOSTS = "hosts"; public static final String USER = "user"; public static final String PASSWORD = "password"; public static final String INDEX = "index"; public static final String TYPE = "type"; public static final String TRANSPORT = "transport"; + public static final String VERSION = "version"; public static final String TRANSPORT_HTTP = "http"; public static final String TRANSPORT_THRIFT = "thrift"; + public static final String DOC_VALUE_SCAN = "enable_docvalue_scan"; private String hosts; private String[] seeds; @@ -61,18 +68,56 @@ public class EsTable extends Table { // partition list is got from es cluster dynamically and is saved in esTableState private PartitionInfo partitionInfo; private EsTableState esTableState; + private boolean enableDocValueScan = false; + + public EsMajorVersion majorVersion = null; + + private Map tableContext = new HashMap<>(); + + // used to indicate 
which fields can get from ES docavalue + // because elasticsearch can have "fields" feature, field can have + // two or more types, the first type maybe have not docvalue but other + // can have, such as (text field not have docvalue, but keyword can have): + // "properties": { + // "city": { + // "type": "text", + // "fields": { + // "raw": { + // "type": "keyword" + // } + // } + // } + // } + // then the docvalue context provided the mapping between the select field and real request field : + // {"city": "city.raw"} + // use select city from table, if enable the docvalue, we will fetch the `city` field value from `city.raw` + private Map docValueContext = new HashMap<>(); public EsTable() { super(TableType.ELASTICSEARCH); } public EsTable(long id, String name, List schema, - Map properties, PartitionInfo partitionInfo) throws DdlException { + Map properties, PartitionInfo partitionInfo) throws DdlException { super(id, name, TableType.ELASTICSEARCH, schema); this.partitionInfo = partitionInfo; validate(properties); } + + public void addDocValueField(String name, String fieldsName) { + docValueContext.put(name, fieldsName); + } + + public Map docValueContext() { + return docValueContext; + } + + public boolean isDocValueScanEnable() { + return enableDocValueScan; + } + + private void validate(Map properties) throws DdlException { if (properties == null) { throw new DdlException("Please set properties of elasticsearch table, " @@ -104,6 +149,29 @@ public class EsTable extends Table { } indexName = properties.get(INDEX).trim(); + // Explicit setting for cluster version to avoid detecting version failure + if (properties.containsKey(VERSION)) { + try { + majorVersion = EsMajorVersion.parse(properties.get(VERSION).trim()); + } catch (Exception e) { + throw new DdlException("fail to parse ES major version, version= " + + properties.get(VERSION).trim() + ", shoud be like '6.5.3' "); + } + } + + // Explicit setting for cluster version to avoid detecting version failure + 
if (properties.containsKey(DOC_VALUE_SCAN)) { + try { + enableDocValueScan = Boolean.parseBoolean(properties.get(DOC_VALUE_SCAN).trim()); + } catch (Exception e) { + throw new DdlException("fail to parse enable_docvalue_scan, enable_docvalue_scan= " + + properties.get(VERSION).trim() + " ,`enable_docvalue_scan`" + + " shoud be like 'true' or 'false', value should be double quotation marks"); + } + } else { + enableDocValueScan = false; + } + if (!Strings.isNullOrEmpty(properties.get(TYPE)) && !Strings.isNullOrEmpty(properties.get(TYPE).trim())) { mappingType = properties.get(TYPE).trim(); @@ -116,6 +184,16 @@ public class EsTable extends Table { + " but value is " + transport); } } + tableContext.put("hosts", hosts); + tableContext.put("userName", userName); + tableContext.put("passwd", passwd); + tableContext.put("indexName", indexName); + tableContext.put("mappingType", mappingType); + tableContext.put("transport", transport); + if (majorVersion != null) { + tableContext.put("majorVersion", majorVersion.toString()); + } + tableContext.put("enableDocValueScan", String.valueOf(enableDocValueScan)); } public TTableDescriptor toThrift() { @@ -137,18 +215,24 @@ public class EsTable extends Table { adler32.update(name.getBytes(charsetName)); // type adler32.update(type.name().getBytes(charsetName)); - // host - adler32.update(hosts.getBytes(charsetName)); - // username - adler32.update(userName.getBytes(charsetName)); - // passwd - adler32.update(passwd.getBytes(charsetName)); - // mysql db - adler32.update(indexName.getBytes(charsetName)); - // mysql table - adler32.update(mappingType.getBytes(charsetName)); - // transport - adler32.update(transport.getBytes(charsetName)); + if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_68) { + for (Map.Entry entry : tableContext.entrySet()) { + adler32.update(entry.getValue().getBytes(charsetName)); + } + } else { + // host + adler32.update(hosts.getBytes(charsetName)); + // username + 
adler32.update(userName.getBytes(charsetName)); + // passwd + adler32.update(passwd.getBytes(charsetName)); + // index name + adler32.update(indexName.getBytes(charsetName)); + // mappingType + adler32.update(mappingType.getBytes(charsetName)); + // transport + adler32.update(transport.getBytes(charsetName)); + } } catch (UnsupportedEncodingException e) { LOG.error("encoding error", e); return -1; @@ -160,34 +244,75 @@ public class EsTable extends Table { @Override public void write(DataOutput out) throws IOException { super.write(out); - Text.writeString(out, hosts); - Text.writeString(out, userName); - Text.writeString(out, passwd); - Text.writeString(out, indexName); - Text.writeString(out, mappingType); + out.writeInt(tableContext.size()); + for (Map.Entry entry : tableContext.entrySet()) { + Text.writeString(out, entry.getKey()); + Text.writeString(out, entry.getValue()); + } Text.writeString(out, partitionInfo.getType().name()); partitionInfo.write(out); - Text.writeString(out, transport); } @Override public void readFields(DataInput in) throws IOException { super.readFields(in); - hosts = Text.readString(in); - seeds = hosts.split(","); - userName = Text.readString(in); - passwd = Text.readString(in); - indexName = Text.readString(in); - mappingType = Text.readString(in); - PartitionType partType = PartitionType.valueOf(Text.readString(in)); - if (partType == PartitionType.UNPARTITIONED) { - partitionInfo = SinglePartitionInfo.read(in); - } else if (partType == PartitionType.RANGE) { - partitionInfo = RangePartitionInfo.read(in); + if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_68) { + int size = in.readInt(); + for (int i = 0; i < size; ++i) { + String key = Text.readString(in); + String value = Text.readString(in); + tableContext.put(key, value); + } + hosts = tableContext.get("hosts"); + seeds = hosts.split(","); + userName = tableContext.get("userName"); + passwd = tableContext.get("passwd"); + indexName = 
tableContext.get("indexName"); + mappingType = tableContext.get("mappingType"); + transport = tableContext.get("transport"); + if (tableContext.containsKey("majorVersion")) { + try { + majorVersion = EsMajorVersion.parse(tableContext.get("majorVersion")); + } catch (Exception e) { + majorVersion = EsMajorVersion.V_5_X; + } + } + + enableDocValueScan = Boolean.parseBoolean(tableContext.get("enableDocValueScan")); + + PartitionType partType = PartitionType.valueOf(Text.readString(in)); + if (partType == PartitionType.UNPARTITIONED) { + partitionInfo = SinglePartitionInfo.read(in); + } else if (partType == PartitionType.RANGE) { + partitionInfo = RangePartitionInfo.read(in); + } else { + throw new IOException("invalid partition type: " + partType); + } } else { - throw new IOException("invalid partition type: " + partType); + hosts = Text.readString(in); + seeds = hosts.split(","); + userName = Text.readString(in); + passwd = Text.readString(in); + indexName = Text.readString(in); + mappingType = Text.readString(in); + PartitionType partType = PartitionType.valueOf(Text.readString(in)); + if (partType == PartitionType.UNPARTITIONED) { + partitionInfo = SinglePartitionInfo.read(in); + } else if (partType == PartitionType.RANGE) { + partitionInfo = RangePartitionInfo.read(in); + } else { + throw new IOException("invalid partition type: " + partType); + } + transport = Text.readString(in); + // for upgrading write + tableContext.put("hosts", hosts); + tableContext.put("userName", userName); + tableContext.put("passwd", passwd); + tableContext.put("indexName", indexName); + tableContext.put("mappingType", mappingType); + tableContext.put("transport", transport); + tableContext.put("enableDocValueScan", "false"); } - transport = Text.readString(in); } public String getHosts() { diff --git a/fe/src/main/java/org/apache/doris/common/FeConstants.java b/fe/src/main/java/org/apache/doris/common/FeConstants.java index 2bb1020835..83e2a890a0 100644 --- 
a/fe/src/main/java/org/apache/doris/common/FeConstants.java +++ b/fe/src/main/java/org/apache/doris/common/FeConstants.java @@ -38,5 +38,5 @@ public class FeConstants { // general model // Current meta data version. Use this version to write journals and image - public static int meta_version = FeMetaVersion.VERSION_67; + public static int meta_version = FeMetaVersion.VERSION_CURRENT; } diff --git a/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java b/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java index 0ea579d032..abdc5640e6 100644 --- a/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java +++ b/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java @@ -138,10 +138,16 @@ public final class FeMetaVersion { public static final int VERSION_63 = 63; // for table create time public static final int VERSION_64 = 64; + // support sql mode, change sql_mode from string to long public static final int VERSION_65 = 65; // routine load/stream load persist session variables public static final int VERSION_66 = 66; // load_mem_limit session variable public static final int VERSION_67 = 67; + // for es table context + public static final int VERSION_68 = 68; + + // note: when increment meta version, should assign the latest version to VERSION_CURRENT + public static final int VERSION_CURRENT = VERSION_68; } diff --git a/fe/src/main/java/org/apache/doris/external/EsRestClient.java b/fe/src/main/java/org/apache/doris/external/EsRestClient.java index e63a7af58d..73759c4a4e 100644 --- a/fe/src/main/java/org/apache/doris/external/EsRestClient.java +++ b/fe/src/main/java/org/apache/doris/external/EsRestClient.java @@ -93,6 +93,25 @@ public class EsRestClient { } + /** + * + * Get the Elasticsearch cluster version + * + * @return + */ + public EsMajorVersion version() { + Map versionMap = get("/", "version"); + + EsMajorVersion majorVersion; + try { + majorVersion = EsMajorVersion.parse(versionMap.get("version")); + } catch (Exception e) { + 
LOG.warn("detect es version failure on node [{}]", currentNode); + return EsMajorVersion.V_5_X; + } + return majorVersion; + } + /** * execute request for specific path * @@ -110,7 +129,7 @@ public class EsRestClient { // maybe should add HTTP schema to the address // actually, at this time we can only process http protocol - // NOTE. currentNode may have some spaces. + // NOTE. currentNode may have some spaces. // User may set a config like described below: // hosts: "http://192.168.0.1:8200, http://192.168.0.2:8200" // then currentNode will be "http://192.168.0.1:8200", " http://192.168.0.2:8200" diff --git a/fe/src/main/java/org/apache/doris/external/EsStateStore.java b/fe/src/main/java/org/apache/doris/external/EsStateStore.java index 5361254e06..030136b9da 100644 --- a/fe/src/main/java/org/apache/doris/external/EsStateStore.java +++ b/fe/src/main/java/org/apache/doris/external/EsStateStore.java @@ -31,22 +31,11 @@ import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; import org.apache.doris.common.util.Daemon; - +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Maps; import com.google.common.collect.Range; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.json.JSONObject; - -import java.io.IOException; -import java.util.Collections; -import java.util.Comparator; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; - import okhttp3.Authenticator; import okhttp3.Call; import okhttp3.Credentials; @@ -54,22 +43,31 @@ import okhttp3.OkHttpClient; import okhttp3.Request; import okhttp3.Response; import okhttp3.Route; +import java.io.IOException; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import 
java.util.stream.Collectors; + + +import org.json.JSONObject; + /** * it is used to call es api to get shard allocation state - * */ public class EsStateStore extends Daemon { - + private static final Logger LOG = LogManager.getLogger(EsStateStore.class); private Map esTables; - + public EsStateStore() { super(Config.es_state_sync_interval_second * 1000); esTables = Maps.newConcurrentMap(); } - + public void registerTable(EsTable esTable) { if (Catalog.isCheckpointThread()) { return; @@ -77,17 +75,22 @@ public class EsStateStore extends Daemon { esTables.put(esTable.getId(), esTable); LOG.info("register a new table [{}] to sync list", esTable.toString()); } - + public void deRegisterTable(long tableId) { esTables.remove(tableId); LOG.info("deregister table [{}] from sync list", tableId); } - + protected void runOneCycle() { for (EsTable esTable : esTables.values()) { try { EsRestClient client = new EsRestClient(esTable.getSeeds(), esTable.getUserName(), esTable.getPasswd()); + // if the user did not specify the es version, try to get the remote cluster version + // in the future, we may need this version + if (esTable.majorVersion == null) { + esTable.majorVersion = client.version(); + } String indexMetaData = client.getIndexMetaData(esTable.getIndexName()); if (indexMetaData == null) { continue; @@ -106,16 +109,19 @@ public class EsStateStore extends Daemon { } } } - + // should call this method to init the state store after loading image // the rest of tables will be added or removed by replaying edit log + // when fe starts to load image, should call this method to init the state store + public void loadTableFromCatalog() { if (Catalog.isCheckpointThread()) { return; } List dbIds = Catalog.getCurrentCatalog().getDbIds(); - for(Long dbId : dbIds) { + for (Long dbId : dbIds) { Database database = Catalog.getCurrentCatalog().getDb(dbId); + List tables = database.getTables(); for (Table table : tables) { if (table.getType() == TableType.ELASTICSEARCH) { @@ -124,7 +130,7
@@ public class EsStateStore extends Daemon { } } } - + private EsTableState loadEsIndexMetadataV55(final EsTable esTable) { OkHttpClient.Builder clientBuilder = new OkHttpClient.Builder(); clientBuilder.authenticator(new Authenticator() { @@ -136,7 +142,7 @@ public class EsStateStore extends Daemon { }); String[] seeds = esTable.getSeeds(); for (String seed : seeds) { - String url = seed + "/_cluster/state?indices=" + String url = seed + "/_cluster/state?indices=" + esTable.getIndexName() + "&metric=routing_table,nodes,metadata&expand_wildcards=open"; String basicAuth = ""; @@ -155,8 +161,7 @@ public class EsStateStore extends Daemon { if (esTableState != null) { return esTableState; } - } - catch (Exception e) { + } catch (Exception e) { LOG.warn("errors while parse response msg {}", responseStr, e); } } else { @@ -168,17 +173,75 @@ public class EsStateStore extends Daemon { } return null; } - + @VisibleForTesting - public EsTableState parseClusterState55(String responseStr, EsTable esTable) + public EsTableState parseClusterState55(String responseStr, EsTable esTable) throws DdlException, AnalysisException, ExternalDataSourceException { JSONObject jsonObject = new JSONObject(responseStr); String clusterName = jsonObject.getString("cluster_name"); JSONObject nodesMap = jsonObject.getJSONObject("nodes"); - + // we build the doc value context for fields maybe used for scanning + // "properties": { + // "city": { + // "type": "text", // text field does not have docvalue + // "fields": { + // "raw": { + // "type": "keyword" + // } + // } + // } + // } + // then the docvalue context provided the mapping between the select field and real request field : + // {"city": "city.raw"} JSONObject indicesMetaMap = jsonObject.getJSONObject("metadata").getJSONObject("indices"); + JSONObject indexMetaMap = indicesMetaMap.optJSONObject(esTable.getIndexName()); + if (esTable.isDocValueScanEnable() && indexMetaMap != null) { + JSONObject mappings = 
indexMetaMap.optJSONObject("mappings"); + JSONObject rootSchema = mappings.optJSONObject(esTable.getMappingType()); + JSONObject schema = rootSchema.optJSONObject("properties"); + List cols = esTable.getFullSchema(); + for (Column col : cols) { + String colName = col.getName(); + if (!schema.has(colName)) { + continue; + } + JSONObject fieldObject = schema.optJSONObject(colName); + String fieldType = fieldObject.optString("type"); + if (EsTable.DEFAULT_DOCVALUE_DISABLED_FIELDS.contains(fieldType)) { + JSONObject fieldsObject = fieldObject.optJSONObject("fields"); + if (fieldsObject != null) { + for (String key : fieldsObject.keySet()) { + JSONObject innerTypeObject = fieldsObject.optJSONObject(key); + if (EsTable.DEFAULT_DOCVALUE_DISABLED_FIELDS.contains(innerTypeObject.optString("type"))) { + continue; + } + if (innerTypeObject.has("doc_values")) { + boolean docValue = innerTypeObject.getBoolean("doc_values"); + if (docValue) { + esTable.addDocValueField(colName, colName); + } + } else { + // a : {c : {}} -> a -> a.c + esTable.addDocValueField(colName, colName + "." 
+ key); + } + } + } + // skip this field + continue; + } + // set doc_value = false manually + if (fieldObject.has("doc_values")) { + boolean docValue = fieldObject.optBoolean("doc_values"); + if (!docValue) { + continue; + } + } + esTable.addDocValueField(colName, colName); + } + } + JSONObject indicesRoutingMap = jsonObject.getJSONObject("routing_table").getJSONObject("indices"); - EsTableState esTableState = new EsTableState(); + EsTableState esTableState = new EsTableState(); PartitionInfo partitionInfo = null; if (esTable.getPartitionInfo() != null) { if (esTable.getPartitionInfo() instanceof RangePartitionInfo) { @@ -196,30 +259,32 @@ public class EsStateStore extends Daemon { idx++; } sb.append(")"); - LOG.debug("begin to parse es table [{}] state from cluster state," + LOG.debug("begin to parse es table [{}] state from cluster state," + " with partition info [{}]", esTable.getName(), sb.toString()); } } else if (esTable.getPartitionInfo() instanceof SinglePartitionInfo) { - LOG.debug("begin to parse es table [{}] state from cluster state, " + LOG.debug("begin to parse es table [{}] state from cluster state, " + "with no partition info", esTable.getName()); } else { - throw new ExternalDataSourceException("es table only support range partition, " - + "but current partition type is " - + esTable.getPartitionInfo().getType()); + throw new ExternalDataSourceException("es table only support range partition, " + + "but current partition type is " + + esTable.getPartitionInfo().getType()); } } + + for (String indexName : indicesRoutingMap.keySet()) { - EsIndexState indexState = EsIndexState.parseIndexStateV55(indexName, - indicesRoutingMap, nodesMap, + EsIndexState indexState = EsIndexState.parseIndexStateV55(indexName, + indicesRoutingMap, nodesMap, indicesMetaMap, partitionInfo); esTableState.addIndexState(indexName, indexState); LOG.debug("add index {} to es table {}", indexState, esTable.getName()); } - + if (partitionInfo instanceof RangePartitionInfo) { // 
sort the index state according to partition key and then add to range map List esIndexStates = esTableState.getPartitionedIndexStates().values() - .stream().collect(Collectors.toList()); + .stream().collect(Collectors.toList()); Collections.sort(esIndexStates, new Comparator() { @Override public int compare(EsIndexState o1, EsIndexState o2) { @@ -229,11 +294,11 @@ public class EsStateStore extends Daemon { long partitionId = 0; for (EsIndexState esIndexState : esIndexStates) { Range range = ((RangePartitionInfo) partitionInfo).handleNewSinglePartitionDesc( - esIndexState.getPartitionDesc(), + esIndexState.getPartitionDesc(), partitionId); esTableState.addPartition(esIndexState.getIndexName(), partitionId); esIndexState.setPartitionId(partitionId); - ++ partitionId; + ++partitionId; LOG.debug("add parition to es table [{}] with range [{}]", esTable.getName(), range); } } diff --git a/fe/src/main/java/org/apache/doris/planner/EsScanNode.java b/fe/src/main/java/org/apache/doris/planner/EsScanNode.java index 6c55d35e4d..6ba1f1791c 100644 --- a/fe/src/main/java/org/apache/doris/planner/EsScanNode.java +++ b/fe/src/main/java/org/apache/doris/planner/EsScanNode.java @@ -119,7 +119,11 @@ public class EsScanNode extends ScanNode { properties.put(EsTable.PASSWORD, table.getPasswd()); TEsScanNode esScanNode = new TEsScanNode(desc.getId().asInt()); esScanNode.setProperties(properties); + if (table.isDocValueScanEnable()) { + esScanNode.setDocvalue_context(table.docValueContext()); + } msg.es_scan_node = esScanNode; + } // TODO(ygl) assign backend that belong to the same cluster diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index 908fe4dc04..9b15b30a48 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -198,6 +198,24 @@ struct TBrokerScanNode { struct TEsScanNode { 1: required Types.TTupleId tuple_id 2: optional map properties + // used to indicate which fields can be fetched from ES docvalue + // because elasticsearch
supports the "fields" feature, a field can have + // two or more types; the first type may not have docvalue but another + // can have it, e.g. a text field does not have docvalue, but its keyword sub-field can: + // "properties": { + // "city": { + // "type": "text", + // "fields": { + // "raw": { + // "type": "keyword" + // } + // } + // } + // } + // then the docvalue context provides the mapping between the selected field and the real request field: + // {"city": "city.raw"} + // for `select city from table`, if docvalue is enabled, we will fetch the `city` field value from `city.raw` + 3: optional map docvalue_context } struct TMiniLoadEtlFunction {