Optimize Doris On Elasticsearch performance (#2237)

Pure DocValue optimization for doris-on-es

Future todo:
Today we check whether pure_docvalue is enabled on every tuple scan, which is not reasonable; the check should be done once for the whole scan, outside the per-tuple path. I will add this todo in the future.
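As a rough sketch of that follow-up (an assumption about the future refactor, not code in this commit), the docvalue decision could be resolved once per scan and reused by the per-tuple loop:

#include <algorithm>
#include <map>
#include <string>
#include <vector>

// Hypothetical helper for the todo above: decide pure_doc_value and the
// per-slot request fields once per scan instead of inside fill_tuple for
// every tuple. All names here are illustrative.
struct ResolvedScanFields {
    bool pure_doc_value;
    std::vector<std::string> request_fields;
};

static ResolvedScanFields resolve_once(
        const std::vector<std::string>& col_names,
        const std::map<std::string, std::string>& docvalue_context) {
    ResolvedScanFields resolved;
    // a pure docvalue scan is possible only if every column has a docvalue field
    resolved.pure_doc_value = std::all_of(
            col_names.begin(), col_names.end(),
            [&](const std::string& name) { return docvalue_context.count(name) > 0; });
    for (const auto& name : col_names) {
        resolved.request_fields.push_back(
                resolved.pure_doc_value ? docvalue_context.at(name) : name);
    }
    return resolved;
}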
This commit is contained in:
Yunfeng,Wu
2019-12-04 12:57:45 +08:00
committed by Mingyu Chen
parent f0c0a715d1
commit 0f00febd21
19 changed files with 447 additions and 122 deletions

.gitignore
View File

@ -20,4 +20,4 @@ thirdparty/installed
/Default/
be/cmake-build
be/cmake-build-debug
be/cmake-build-release
be/cmake-build-release

View File

@ -27,7 +27,7 @@
#include "exec/es/es_scroll_query.h"
namespace doris {
const std::string REUQEST_SCROLL_FILTER_PATH = "filter_path=_scroll_id,hits.hits._source,hits.total,_id,hits.hits._source.fields";
const std::string REUQEST_SCROLL_FILTER_PATH = "filter_path=_scroll_id,hits.hits._source,hits.total,_id,hits.hits._source.fields,hits.hits.fields";
const std::string REQUEST_SCROLL_PATH = "_scroll";
const std::string REQUEST_PREFERENCE_PREFIX = "&preference=_shards:";
const std::string REQUEST_SEARCH_SCROLL_PATH = "/_search/scroll";
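For reference, a minimal sketch of the scroll response shape the extended filter_path preserves; the hit values are illustrative and rapidjson is assumed to be available:

#include <cstdio>
#include "rapidjson/document.h"

int main() {
    // With a docvalue scan, per-hit values arrive under "fields" (each value
    // wrapped in an array) instead of "_source"; the new filter_path keeps
    // "hits.hits.fields" so these values survive response filtering.
    const char* resp = R"({
        "_scroll_id": "c2Nyb2xsX2lk",
        "hits": {"total": 1, "hits": [
            {"fields": {"city.raw": ["beijing"], "age": [30]}}
        ]}
    })";
    rapidjson::Document doc;
    doc.Parse(resp);
    const rapidjson::Value& hit = doc["hits"]["hits"][0];
    // fill_tuple() keys off the presence of "fields" to set pure_doc_value,
    // then reads element [0] of each array-wrapped value.
    if (hit.HasMember("fields")) {
        std::printf("city=%s age=%d\n",
                    hit["fields"]["city.raw"][0].GetString(),
                    hit["fields"]["age"][0].GetInt());
    }
    return 0;
}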

View File

@ -105,28 +105,52 @@ static const string ERROR_COL_DATA_IS_ARRAY = "Data source returned an array for
} while (false)
#define RETURN_ERROR_IF_PARSING_FAILED(result, type) \
#define RETURN_ERROR_IF_PARSING_FAILED(result, col, type) \
do { \
if (result != StringParser::PARSE_SUCCESS) { \
return Status::RuntimeError(strings::Substitute(ERROR_INVALID_COL_DATA, type_to_string(type))); \
std::stringstream ss; \
ss << "Expected value of type: " \
<< type_to_string(type) \
<< "; but found type: " << json_type_to_string(col.GetType()) \
<< "; Docuemnt source slice is : " << json_value_to_string(col); \
return Status::RuntimeError(ss.str()); \
} \
} while (false)
#define RETURN_ERROR_IF_CAST_FORMAT_ERROR(col, type) \
do { \
std::stringstream ss; \
ss << "Expected value of type: " \
<< type_to_string(type) \
<< "; but found type: " << json_type_to_string(col.GetType()) \
<< "; Docuemnt slice is : " << json_value_to_string(col); \
return Status::RuntimeError(ss.str()); \
} while (false)
template <typename T>
static Status get_int_value(const rapidjson::Value &col, PrimitiveType type, void* slot) {
static Status get_int_value(const rapidjson::Value &col, PrimitiveType type, void* slot, bool pure_doc_value) {
if (col.IsNumber()) {
*reinterpret_cast<T*>(slot) = (T)(sizeof(T) < 8 ? col.GetInt() : col.GetInt64());
return Status::OK();
}
if (pure_doc_value && col.IsArray()) {
*reinterpret_cast<T*>(slot) = (T)(sizeof(T) < 8 ? col[0].GetInt() : col[0].GetInt64());
return Status::OK();
}
RETURN_ERROR_IF_COL_IS_ARRAY(col, type);
RETURN_ERROR_IF_COL_IS_NOT_STRING(col, type);
StringParser::ParseResult result;
const std::string& val = col.GetString();
size_t len = col.GetStringLength();
T v = StringParser::string_to_int<T>(val.c_str(), len, &result);
RETURN_ERROR_IF_PARSING_FAILED(result, type);
RETURN_ERROR_IF_PARSING_FAILED(result, col, type);
if (sizeof(T) < 16) {
*reinterpret_cast<T*>(slot) = v;
@ -139,13 +163,18 @@ static Status get_int_value(const rapidjson::Value &col, PrimitiveType type, voi
}
template <typename T>
static Status get_float_value(const rapidjson::Value &col, PrimitiveType type, void* slot) {
static Status get_float_value(const rapidjson::Value &col, PrimitiveType type, void* slot, bool pure_doc_value) {
DCHECK(sizeof(T) == 4 || sizeof(T) == 8);
if (col.IsNumber()) {
*reinterpret_cast<T*>(slot) = (T)(sizeof(T) == 4 ? col.GetFloat() : col.GetDouble());
return Status::OK();
}
if (pure_doc_value && col.IsArray()) {
*reinterpret_cast<T*>(slot) = (T)(sizeof(T) == 4 ? col[0].GetFloat() : col[0].GetDouble());
return Status::OK();
}
RETURN_ERROR_IF_COL_IS_ARRAY(col, type);
RETURN_ERROR_IF_COL_IS_NOT_STRING(col, type);
@ -153,7 +182,7 @@ static Status get_float_value(const rapidjson::Value &col, PrimitiveType type, v
const std::string& val = col.GetString();
size_t len = col.GetStringLength();
T v = StringParser::string_to_float<T>(val.c_str(), len, &result);
RETURN_ERROR_IF_PARSING_FAILED(result, type);
RETURN_ERROR_IF_PARSING_FAILED(result, col, type);
*reinterpret_cast<T*>(slot) = v;
return Status::OK();
@ -231,19 +260,19 @@ int ScrollParser::get_total() {
return _total;
}
Status ScrollParser::fill_tuple(const TupleDescriptor* tuple_desc,
Tuple* tuple, MemPool* tuple_pool, bool* line_eof) {
Tuple* tuple, MemPool* tuple_pool, bool* line_eof, const std::map<std::string, std::string>& docvalue_context) {
*line_eof = true;
if (_size <= 0 || _line_index >= _size) {
return Status::OK();
}
const rapidjson::Value& obj = _inner_hits_node[_line_index++];
const rapidjson::Value& line = obj[FIELD_SOURCE];
if (!line.IsObject()) {
return Status::InternalError("Parse inner hits failed");
bool pure_doc_value = false;
if (obj.HasMember("fields")) {
pure_doc_value = true;
}
const rapidjson::Value& line = obj.HasMember(FIELD_SOURCE) ? obj[FIELD_SOURCE] : obj["fields"];
tuple->init(tuple_desc->byte_size());
for (int i = 0; i < tuple_desc->slots().size(); ++i) {
@ -253,7 +282,11 @@ Status ScrollParser::fill_tuple(const TupleDescriptor* tuple_desc,
continue;
}
const char* col_name = slot_desc->col_name().c_str();
// if pure_doc_value is enabled, docvalue_context must contain the key
// todo: move the per-tuple `pure_docvalue` check outside fill_tuple;
// it should be checked once per table scan, not for every tuple
const char* col_name = pure_doc_value ? docvalue_context.at(slot_desc->col_name()).c_str() : slot_desc->col_name().c_str();
rapidjson::Value::ConstMemberIterator itr = line.FindMember(col_name);
if (itr == line.MemberEnd()) {
tuple->set_null(slot_desc->null_indicator_offset());
@ -268,15 +301,23 @@ Status ScrollParser::fill_tuple(const TupleDescriptor* tuple_desc,
switch (type) {
case TYPE_CHAR:
case TYPE_VARCHAR: {
RETURN_ERROR_IF_COL_IS_ARRAY(col, type);
// sometimes an elasticsearch user posts non-string values into an Elasticsearch index.
// because we read the value from _source, we cannot handle every json type, so we just convert the value back to its original string representation
// this is a bit tricky, but it works around the issue
std::string val;
if (!col.IsString()) {
val = json_value_to_string(col);
if (pure_doc_value) {
if (!col[0].IsString()) {
val = json_value_to_string(col[0]);
} else {
val = col[0].GetString();
}
} else {
val = col.GetString();
RETURN_ERROR_IF_COL_IS_ARRAY(col, type);
if (!col.IsString()) {
val = json_value_to_string(col);
} else {
val = col.GetString();
}
}
size_t val_size = val.length();
char* buffer = reinterpret_cast<char*>(tuple_pool->try_allocate_unaligned(val_size));
@ -292,7 +333,7 @@ Status ScrollParser::fill_tuple(const TupleDescriptor* tuple_desc,
}
case TYPE_TINYINT: {
Status status = get_int_value<int8_t>(col, type, slot);
Status status = get_int_value<int8_t>(col, type, slot, pure_doc_value);
if (!status.ok()) {
return status;
}
@ -300,7 +341,7 @@ Status ScrollParser::fill_tuple(const TupleDescriptor* tuple_desc,
}
case TYPE_SMALLINT: {
Status status = get_int_value<int16_t>(col, type, slot);
Status status = get_int_value<int16_t>(col, type, slot, pure_doc_value);
if (!status.ok()) {
return status;
}
@ -308,7 +349,7 @@ Status ScrollParser::fill_tuple(const TupleDescriptor* tuple_desc,
}
case TYPE_INT: {
Status status = get_int_value<int32_t>(col, type, slot);
Status status = get_int_value<int32_t>(col, type, slot, pure_doc_value);
if (!status.ok()) {
return status;
}
@ -316,7 +357,7 @@ Status ScrollParser::fill_tuple(const TupleDescriptor* tuple_desc,
}
case TYPE_BIGINT: {
Status status = get_int_value<int64_t>(col, type, slot);
Status status = get_int_value<int64_t>(col, type, slot, pure_doc_value);
if (!status.ok()) {
return status;
}
@ -324,7 +365,7 @@ Status ScrollParser::fill_tuple(const TupleDescriptor* tuple_desc,
}
case TYPE_LARGEINT: {
Status status = get_int_value<__int128>(col, type, slot);
Status status = get_int_value<__int128>(col, type, slot, pure_doc_value);
if (!status.ok()) {
return status;
}
@ -332,7 +373,7 @@ Status ScrollParser::fill_tuple(const TupleDescriptor* tuple_desc,
}
case TYPE_DOUBLE: {
Status status = get_float_value<double>(col, type, slot);
Status status = get_float_value<double>(col, type, slot, pure_doc_value);
if (!status.ok()) {
return status;
}
@ -340,7 +381,7 @@ Status ScrollParser::fill_tuple(const TupleDescriptor* tuple_desc,
}
case TYPE_FLOAT: {
Status status = get_float_value<float>(col, type, slot);
Status status = get_float_value<float>(col, type, slot, pure_doc_value);
if (!status.ok()) {
return status;
}
@ -357,6 +398,10 @@ Status ScrollParser::fill_tuple(const TupleDescriptor* tuple_desc,
*reinterpret_cast<int8_t*>(slot) = col.GetInt();
break;
}
if (pure_doc_value && col.IsArray()) {
*reinterpret_cast<int8_t*>(slot) = col[0].GetBool();
break;
}
RETURN_ERROR_IF_COL_IS_ARRAY(col, type);
RETURN_ERROR_IF_COL_IS_NOT_STRING(col, type);
@ -366,7 +411,7 @@ Status ScrollParser::fill_tuple(const TupleDescriptor* tuple_desc,
StringParser::ParseResult result;
bool b =
StringParser::string_to_bool(val.c_str(), val_size, &result);
RETURN_ERROR_IF_PARSING_FAILED(result, type);
RETURN_ERROR_IF_PARSING_FAILED(result, col, type);
*reinterpret_cast<int8_t*>(slot) = b;
break;
}
@ -375,7 +420,19 @@ Status ScrollParser::fill_tuple(const TupleDescriptor* tuple_desc,
case TYPE_DATETIME: {
if (col.IsNumber()) {
if (!reinterpret_cast<DateTimeValue*>(slot)->from_unixtime(col.GetInt64(), "+08:00")) {
return Status::InternalError(strings::Substitute(ERROR_INVALID_COL_DATA, type_to_string(type)));
RETURN_ERROR_IF_CAST_FORMAT_ERROR(col, type);
}
if (type == TYPE_DATE) {
reinterpret_cast<DateTimeValue*>(slot)->cast_to_date();
} else {
reinterpret_cast<DateTimeValue*>(slot)->set_type(TIME_DATETIME);
}
break;
}
if (pure_doc_value && col.IsArray()) {
if (!reinterpret_cast<DateTimeValue*>(slot)->from_unixtime(col[0].GetInt64(), "+08:00")) {
RETURN_ERROR_IF_CAST_FORMAT_ERROR(col, type);
}
if (type == TYPE_DATE) {
@ -393,11 +450,11 @@ Status ScrollParser::fill_tuple(const TupleDescriptor* tuple_desc,
const std::string& val = col.GetString();
size_t val_size = col.GetStringLength();
if (!ts_slot->from_date_str(val.c_str(), val_size)) {
return Status::InternalError(strings::Substitute(ERROR_INVALID_COL_DATA, type_to_string(type)));
RETURN_ERROR_IF_CAST_FORMAT_ERROR(col, type);
}
if (ts_slot->year() < 1900) {
return Status::InternalError(strings::Substitute(ERROR_INVALID_COL_DATA, type_to_string(type)));
RETURN_ERROR_IF_CAST_FORMAT_ERROR(col, type);
}
if (type == TYPE_DATE) {
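For reference, a minimal sketch of the per-slot column-name resolution fill_tuple now performs; the docvalue_context contents here are illustrative:

#include <iostream>
#include <map>
#include <string>

int main() {
    // Illustrative docvalue_context pushed down from the FE (see
    // EsTable.docValueContext() later in this commit): a selected column name
    // maps to the field that actually carries doc values.
    const std::map<std::string, std::string> docvalue_context = {
        {"city", "city.raw"},
        {"age", "age"},
    };
    const bool pure_doc_value = true;  // true when the hit has a "fields" member

    // Mirrors the lookup in fill_tuple(): resolve the request field per slot.
    const std::string col_name = "city";
    const std::string& request_field =
            pure_doc_value ? docvalue_context.at(col_name) : col_name;
    std::cout << request_field << std::endl;  // prints "city.raw"
    return 0;
}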

View File

@ -35,7 +35,7 @@ public:
Status parse(const std::string& scroll_result);
Status fill_tuple(const TupleDescriptor* _tuple_desc, Tuple* tuple,
MemPool* mem_pool, bool* line_eof);
MemPool* mem_pool, bool* line_eof, const std::map<std::string, std::string>& docvalue_context);
const std::string& get_scroll_id();
int get_total();

View File

@ -17,7 +17,6 @@
#include "exec/es/es_scroll_query.h"
#include <boost/algorithm/string/join.hpp>
#include <sstream>
#include "common/logging.h"
@ -64,7 +63,7 @@ std::string ESScrollQueryBuilder::build_clear_scroll_body(const std::string& scr
std::string ESScrollQueryBuilder::build(const std::map<std::string, std::string>& properties,
const std::vector<std::string>& fields,
std::vector<EsPredicate*>& predicates) {
std::vector<EsPredicate*>& predicates, const std::map<std::string, std::string>& docvalue_context) {
rapidjson::Document es_query_dsl;
rapidjson::Document::AllocatorType &allocator = es_query_dsl.GetAllocator();
es_query_dsl.SetObject();
@ -75,15 +74,39 @@ std::string ESScrollQueryBuilder::build(const std::map<std::string, std::string>
BooleanQueryBuilder::to_query(predicates, &scratch_document, &query_node);
// note: add `query` for this value....
es_query_dsl.AddMember("query", query_node, allocator);
// just filter the selected fields for reducing the network cost
if (fields.size() > 0) {
rapidjson::Value source_node(rapidjson::kArrayType);
for (auto iter = fields.begin(); iter != fields.end(); iter++) {
rapidjson::Value field(iter->c_str(), allocator);
bool pure_docvalue = true;
// check whether the docvalue scan optimization applies
if (docvalue_context.size() == 0 || docvalue_context.size() < fields.size()) {
pure_docvalue = false;
} else {
for (auto& select_field : fields) {
if (docvalue_context.find(select_field) == docvalue_context.end()) {
pure_docvalue = false;
break;
}
}
}
rapidjson::Value source_node(rapidjson::kArrayType);
if (pure_docvalue) {
for (auto& select_field : fields) {
rapidjson::Value field(docvalue_context.at(select_field).c_str(), allocator);
source_node.PushBack(field, allocator);
}
} else {
for (auto& select_field : fields) {
rapidjson::Value field(select_field.c_str(), allocator);
source_node.PushBack(field, allocator);
}
}
// just filter the selected fields for reducing the network cost
if (pure_docvalue) {
es_query_dsl.AddMember("stored_fields", "_none_", allocator);
es_query_dsl.AddMember("docvalue_fields", source_node, allocator);
} else {
es_query_dsl.AddMember("_source", source_node, allocator);
}
int size = atoi(properties.at(ESScanReader::KEY_BATCH_SIZE).c_str());
rapidjson::Value sort_node(rapidjson::kArrayType);
// use the scroll-scan mode for scan index documents
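A sketch of the two request bodies build() can now emit, assuming a select on `city` (docvalue field `city.raw`) and `age`; the scroll, size, and sort parts are omitted:

#include <iostream>
#include <string>

int main() {
    // Emitted when every selected field has an entry in docvalue_context
    // (pure_docvalue == true): stored fields are disabled and values come
    // from columnar doc values, skipping _source parsing on the ES side.
    const std::string docvalue_dsl = R"({
        "query": {"match_all": {}},
        "stored_fields": "_none_",
        "docvalue_fields": ["city.raw", "age"]
    })";
    // Fallback when any selected field is missing from docvalue_context:
    // fetch the original document source, as before this commit.
    const std::string source_dsl = R"({
        "query": {"match_all": {}},
        "_source": ["city", "age"]
    })";
    std::cout << docvalue_dsl << "\n" << source_dsl << std::endl;
    return 0;
}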

View File

@ -35,6 +35,6 @@ public:
// @note: predicates should be processed before being passed to this method;
// to decide whether a predicate can be pushed down to es, reference the push-down filters
static std::string build(const std::map<std::string, std::string>& properties,
const std::vector<std::string>& fields, std::vector<EsPredicate*>& predicates);
const std::vector<std::string>& fields, std::vector<EsPredicate*>& predicates, const std::map<std::string, std::string>& docvalue_context);
};
}

View File

@ -55,6 +55,10 @@ Status EsHttpScanNode::init(const TPlanNode& tnode, RuntimeState* state) {
// use TEsScanNode
_properties = tnode.es_scan_node.properties;
if (tnode.es_scan_node.__isset.docvalue_context) {
_docvalue_context = tnode.es_scan_node.docvalue_context;
}
return Status::OK();
}
@ -333,7 +337,7 @@ Status EsHttpScanNode::scanner_scan(
memset(tuple, 0, _tuple_desc->num_null_bytes());
// Get from scanner
RETURN_IF_ERROR(scanner->get_next(tuple, tuple_pool, &scanner_eof));
RETURN_IF_ERROR(scanner->get_next(tuple, tuple_pool, &scanner_eof, _docvalue_context));
if (scanner_eof) {
continue;
}
@ -426,7 +430,7 @@ void EsHttpScanNode::scanner_worker(int start_idx, int length, std::promise<Stat
properties[ESScanReader::KEY_BATCH_SIZE] = std::to_string(_runtime_state->batch_size());
properties[ESScanReader::KEY_HOST_PORT] = get_host_port(es_scan_range.es_hosts);
properties[ESScanReader::KEY_QUERY]
= ESScrollQueryBuilder::build(properties, _column_names, _predicates);
= ESScrollQueryBuilder::build(properties, _column_names, _predicates, _docvalue_context);
// start scanner to scan
std::unique_ptr<EsHttpScanner> scanner(new EsHttpScanner(

View File

@ -99,6 +99,7 @@ private:
std::vector<std::thread> _scanner_threads;
std::vector<std::promise<Status>> _scanners_status;
std::map<std::string, std::string> _properties;
std::map<std::string, std::string> _docvalue_context;
std::vector<TScanRangeParams> _scan_ranges;
std::vector<std::string> _column_names;

View File

@ -88,7 +88,7 @@ Status EsHttpScanner::open() {
return Status::OK();
}
Status EsHttpScanner::get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof) {
Status EsHttpScanner::get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof, const std::map<std::string, std::string>& docvalue_context) {
SCOPED_TIMER(_read_timer);
if (_line_eof && _batch_eof) {
*eof = true;
@ -107,7 +107,7 @@ Status EsHttpScanner::get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof) {
COUNTER_UPDATE(_rows_read_counter, 1);
SCOPED_TIMER(_materialize_timer);
RETURN_IF_ERROR(_es_scroll_parser->fill_tuple(
_tuple_desc, tuple, tuple_pool, &_line_eof));
_tuple_desc, tuple, tuple_pool, &_line_eof, docvalue_context));
if (!_line_eof) {
break;
}

View File

@ -1,3 +1,4 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
@ -66,7 +67,7 @@ public:
Status open();
Status get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof);
Status get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof, const std::map<std::string, std::string>& docvalue_context);
void close();

View File

@ -222,7 +222,8 @@ TEST_F(MockESServerTest, workflow) {
props[ESScanReader::KEY_SHARD] = "0";
props[ESScanReader::KEY_BATCH_SIZE] = "1";
std::vector<EsPredicate*> predicates;
props[ESScanReader::KEY_QUERY] = ESScrollQueryBuilder::build(props, fields, predicates);
std::map<std::string, std::string> docvalue_context;
props[ESScanReader::KEY_QUERY] = ESScrollQueryBuilder::build(props, fields, predicates, docvalue_context);
ESScanReader reader(target, props);
auto st = reader.open();
ASSERT_TRUE(st.ok());

View File

@ -3973,6 +3973,7 @@ public class Catalog {
sb.append("\"index\" = \"").append(esTable.getIndexName()).append("\",\n");
sb.append("\"type\" = \"").append(esTable.getMappingType()).append("\",\n");
sb.append("\"transport\" = \"").append(esTable.getTransport()).append("\"\n");
sb.append("\"enable_docvalue_scan\" = \"").append(esTable.isDocValueScanEnable()).append("\"\n");
sb.append(")");
}
sb.append(";");

View File

@ -18,37 +18,44 @@
package org.apache.doris.catalog;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.io.Text;
import org.apache.doris.external.EsMajorVersion;
import org.apache.doris.external.EsTableState;
import org.apache.doris.thrift.TEsTable;
import org.apache.doris.thrift.TTableDescriptor;
import org.apache.doris.thrift.TTableType;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import com.google.common.base.Strings;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.zip.Adler32;
public class EsTable extends Table {
private static final Logger LOG = LogManager.getLogger(EsTable.class);
public static final Set<String> DEFAULT_DOCVALUE_DISABLED_FIELDS = new HashSet<>(Arrays.asList("text"));
public static final String HOSTS = "hosts";
public static final String USER = "user";
public static final String PASSWORD = "password";
public static final String INDEX = "index";
public static final String TYPE = "type";
public static final String TRANSPORT = "transport";
public static final String VERSION = "version";
public static final String TRANSPORT_HTTP = "http";
public static final String TRANSPORT_THRIFT = "thrift";
public static final String DOC_VALUE_SCAN = "enable_docvalue_scan";
private String hosts;
private String[] seeds;
@ -61,18 +68,56 @@ public class EsTable extends Table {
// partition list is fetched from the es cluster dynamically and saved in esTableState
private PartitionInfo partitionInfo;
private EsTableState esTableState;
private boolean enableDocValueScan = false;
public EsMajorVersion majorVersion = null;
private Map<String, String> tableContext = new HashMap<>();
// used to indicate which fields can be fetched from ES docvalue
// elasticsearch has a multi-"fields" feature: a field can have
// two or more types, and even if the first type has no docvalue, another
// one may have it, such as (a text field has no docvalue, but keyword does):
// "properties": {
// "city": {
// "type": "text",
// "fields": {
// "raw": {
// "type": "keyword"
// }
// }
// }
// }
// the docvalue context then provides the mapping between the selected field and the real request field:
// {"city": "city.raw"}
// for `select city from table`, if docvalue is enabled, we fetch the `city` value from `city.raw`
private Map<String, String> docValueContext = new HashMap<>();
public EsTable() {
super(TableType.ELASTICSEARCH);
}
public EsTable(long id, String name, List<Column> schema,
Map<String, String> properties, PartitionInfo partitionInfo) throws DdlException {
super(id, name, TableType.ELASTICSEARCH, schema);
this.partitionInfo = partitionInfo;
validate(properties);
}
public void addDocValueField(String name, String fieldsName) {
docValueContext.put(name, fieldsName);
}
public Map<String, String> docValueContext() {
return docValueContext;
}
public boolean isDocValueScanEnable() {
return enableDocValueScan;
}
private void validate(Map<String, String> properties) throws DdlException {
if (properties == null) {
throw new DdlException("Please set properties of elasticsearch table, "
@ -104,6 +149,29 @@ public class EsTable extends Table {
}
indexName = properties.get(INDEX).trim();
// Explicit setting of the cluster version, to avoid version detection failure
if (properties.containsKey(VERSION)) {
try {
majorVersion = EsMajorVersion.parse(properties.get(VERSION).trim());
} catch (Exception e) {
throw new DdlException("fail to parse ES major version, version= "
+ properties.get(VERSION).trim() + ", shoud be like '6.5.3' ");
}
}
// Explicit setting of enable_docvalue_scan
if (properties.containsKey(DOC_VALUE_SCAN)) {
try {
enableDocValueScan = Boolean.parseBoolean(properties.get(DOC_VALUE_SCAN).trim());
} catch (Exception e) {
throw new DdlException("fail to parse enable_docvalue_scan, enable_docvalue_scan= "
+ properties.get(VERSION).trim() + " ,`enable_docvalue_scan`"
+ " shoud be like 'true' or 'false', value should be double quotation marks");
}
} else {
enableDocValueScan = false;
}
if (!Strings.isNullOrEmpty(properties.get(TYPE))
&& !Strings.isNullOrEmpty(properties.get(TYPE).trim())) {
mappingType = properties.get(TYPE).trim();
@ -116,6 +184,16 @@ public class EsTable extends Table {
+ " but value is " + transport);
}
}
tableContext.put("hosts", hosts);
tableContext.put("userName", userName);
tableContext.put("passwd", passwd);
tableContext.put("indexName", indexName);
tableContext.put("mappingType", mappingType);
tableContext.put("transport", transport);
if (majorVersion != null) {
tableContext.put("majorVersion", majorVersion.toString());
}
tableContext.put("enableDocValueScan", String.valueOf(enableDocValueScan));
}
public TTableDescriptor toThrift() {
@ -137,18 +215,24 @@ public class EsTable extends Table {
adler32.update(name.getBytes(charsetName));
// type
adler32.update(type.name().getBytes(charsetName));
// host
adler32.update(hosts.getBytes(charsetName));
// username
adler32.update(userName.getBytes(charsetName));
// passwd
adler32.update(passwd.getBytes(charsetName));
// mysql db
adler32.update(indexName.getBytes(charsetName));
// mysql table
adler32.update(mappingType.getBytes(charsetName));
// transport
adler32.update(transport.getBytes(charsetName));
if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_68) {
for (Map.Entry<String, String> entry : tableContext.entrySet()) {
adler32.update(entry.getValue().getBytes(charsetName));
}
} else {
// host
adler32.update(hosts.getBytes(charsetName));
// username
adler32.update(userName.getBytes(charsetName));
// passwd
adler32.update(passwd.getBytes(charsetName));
// index name
adler32.update(indexName.getBytes(charsetName));
// mappingType
adler32.update(mappingType.getBytes(charsetName));
// transport
adler32.update(transport.getBytes(charsetName));
}
} catch (UnsupportedEncodingException e) {
LOG.error("encoding error", e);
return -1;
@ -160,34 +244,75 @@ public class EsTable extends Table {
@Override
public void write(DataOutput out) throws IOException {
super.write(out);
Text.writeString(out, hosts);
Text.writeString(out, userName);
Text.writeString(out, passwd);
Text.writeString(out, indexName);
Text.writeString(out, mappingType);
out.writeInt(tableContext.size());
for (Map.Entry<String, String> entry : tableContext.entrySet()) {
Text.writeString(out, entry.getKey());
Text.writeString(out, entry.getValue());
}
Text.writeString(out, partitionInfo.getType().name());
partitionInfo.write(out);
Text.writeString(out, transport);
}
@Override
public void readFields(DataInput in) throws IOException {
super.readFields(in);
hosts = Text.readString(in);
seeds = hosts.split(",");
userName = Text.readString(in);
passwd = Text.readString(in);
indexName = Text.readString(in);
mappingType = Text.readString(in);
PartitionType partType = PartitionType.valueOf(Text.readString(in));
if (partType == PartitionType.UNPARTITIONED) {
partitionInfo = SinglePartitionInfo.read(in);
} else if (partType == PartitionType.RANGE) {
partitionInfo = RangePartitionInfo.read(in);
if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_68) {
int size = in.readInt();
for (int i = 0; i < size; ++i) {
String key = Text.readString(in);
String value = Text.readString(in);
tableContext.put(key, value);
}
hosts = tableContext.get("hosts");
seeds = hosts.split(",");
userName = tableContext.get("userName");
passwd = tableContext.get("passwd");
indexName = tableContext.get("indexName");
mappingType = tableContext.get("mappingType");
transport = tableContext.get("transport");
if (tableContext.containsKey("majorVersion")) {
try {
majorVersion = EsMajorVersion.parse(tableContext.get("majorVersion"));
} catch (Exception e) {
majorVersion = EsMajorVersion.V_5_X;
}
}
enableDocValueScan = Boolean.parseBoolean(tableContext.get("enableDocValueScan"));
PartitionType partType = PartitionType.valueOf(Text.readString(in));
if (partType == PartitionType.UNPARTITIONED) {
partitionInfo = SinglePartitionInfo.read(in);
} else if (partType == PartitionType.RANGE) {
partitionInfo = RangePartitionInfo.read(in);
} else {
throw new IOException("invalid partition type: " + partType);
}
} else {
throw new IOException("invalid partition type: " + partType);
hosts = Text.readString(in);
seeds = hosts.split(",");
userName = Text.readString(in);
passwd = Text.readString(in);
indexName = Text.readString(in);
mappingType = Text.readString(in);
PartitionType partType = PartitionType.valueOf(Text.readString(in));
if (partType == PartitionType.UNPARTITIONED) {
partitionInfo = SinglePartitionInfo.read(in);
} else if (partType == PartitionType.RANGE) {
partitionInfo = RangePartitionInfo.read(in);
} else {
throw new IOException("invalid partition type: " + partType);
}
transport = Text.readString(in);
// fill tableContext so the next image write uses the new format after upgrading
tableContext.put("hosts", hosts);
tableContext.put("userName", userName);
tableContext.put("passwd", passwd);
tableContext.put("indexName", indexName);
tableContext.put("mappingType", mappingType);
tableContext.put("transport", transport);
tableContext.put("enableDocValueScan", "false");
}
transport = Text.readString(in);
}
public String getHosts() {

View File

@ -38,5 +38,5 @@ public class FeConstants {
// general model
// Current meta data version. Use this version to write journals and image
public static int meta_version = FeMetaVersion.VERSION_67;
public static int meta_version = FeMetaVersion.VERSION_CURRENT;
}

View File

@ -138,10 +138,16 @@ public final class FeMetaVersion {
public static final int VERSION_63 = 63;
// for table create time
public static final int VERSION_64 = 64;
// support sql mode, change sql_mode from string to long
public static final int VERSION_65 = 65;
// routine load/stream load persist session variables
public static final int VERSION_66 = 66;
// load_mem_limit session variable
public static final int VERSION_67 = 67;
// for es table context
public static final int VERSION_68 = 68;
// note: when incrementing the meta version, assign the latest version to VERSION_CURRENT
public static final int VERSION_CURRENT = VERSION_68;
}

View File

@ -93,6 +93,25 @@ public class EsRestClient {
}
/**
*
* Get the Elasticsearch cluster version
*
* @return the major version of the remote cluster, or V_5_X if detection fails
*/
public EsMajorVersion version() {
Map<String, String> versionMap = get("/", "version");
EsMajorVersion majorVersion;
try {
majorVersion = EsMajorVersion.parse(versionMap.get("version"));
} catch (Exception e) {
LOG.warn("detect es version failure on node [{}]", currentNode);
return EsMajorVersion.V_5_X;
}
return majorVersion;
}
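For reference, a minimal sketch of the payload version() relies on: the cluster root endpoint (GET /) returns a "version" object whose "number" field is parsed into a major version; the node values are illustrative:

#include <cstdio>
#include "rapidjson/document.h"

int main() {
    // Illustrative response of GET / on an Elasticsearch node. version()
    // parses the "version" object into an EsMajorVersion and falls back to
    // V_5_X when parsing fails.
    const char* root = R"({
        "name": "node-1",
        "cluster_name": "my-cluster",
        "version": {"number": "6.5.3"}
    })";
    rapidjson::Document doc;
    doc.Parse(root);
    std::printf("version.number = %s\n", doc["version"]["number"].GetString());
    return 0;
}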
/**
* execute request for specific path
*
@ -110,7 +129,7 @@ public class EsRestClient {
// maybe should add HTTP schema to the address
// actually, at this time we can only process http protocol
// NOTE. currentNode may have some spaces.
// User may set a config like described below:
// hosts: "http://192.168.0.1:8200, http://192.168.0.2:8200"
// then currentNode will be "http://192.168.0.1:8200", " http://192.168.0.2:8200"

View File

@ -31,22 +31,11 @@ import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.util.Daemon;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
import com.google.common.collect.Range;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import okhttp3.Authenticator;
import okhttp3.Call;
import okhttp3.Credentials;
@ -54,22 +43,31 @@ import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import okhttp3.Route;
import java.io.IOException;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.json.JSONObject;
/**
* it is used to call es api to get shard allocation state
*
*/
public class EsStateStore extends Daemon {
private static final Logger LOG = LogManager.getLogger(EsStateStore.class);
private Map<Long, EsTable> esTables;
public EsStateStore() {
super(Config.es_state_sync_interval_second * 1000);
esTables = Maps.newConcurrentMap();
}
public void registerTable(EsTable esTable) {
if (Catalog.isCheckpointThread()) {
return;
@ -77,17 +75,22 @@ public class EsStateStore extends Daemon {
esTables.put(esTable.getId(), esTable);
LOG.info("register a new table [{}] to sync list", esTable.toString());
}
public void deRegisterTable(long tableId) {
esTables.remove(tableId);
LOG.info("deregister table [{}] from sync list", tableId);
}
protected void runOneCycle() {
for (EsTable esTable : esTables.values()) {
try {
EsRestClient client = new EsRestClient(esTable.getSeeds(),
esTable.getUserName(), esTable.getPasswd());
// if the user did not specify the es version, try to get it from the remote cluster
// we may need this version in the future
if (esTable.majorVersion == null) {
esTable.majorVersion = client.version();
}
String indexMetaData = client.getIndexMetaData(esTable.getIndexName());
if (indexMetaData == null) {
continue;
@ -106,16 +109,19 @@ public class EsStateStore extends Daemon {
}
}
}
// should call this method to init the state store after loading the image;
// the rest of the tables will be added or removed by replaying the edit log
public void loadTableFromCatalog() {
if (Catalog.isCheckpointThread()) {
return;
}
List<Long> dbIds = Catalog.getCurrentCatalog().getDbIds();
for(Long dbId : dbIds) {
for (Long dbId : dbIds) {
Database database = Catalog.getCurrentCatalog().getDb(dbId);
List<Table> tables = database.getTables();
for (Table table : tables) {
if (table.getType() == TableType.ELASTICSEARCH) {
@ -124,7 +130,7 @@ public class EsStateStore extends Daemon {
}
}
}
private EsTableState loadEsIndexMetadataV55(final EsTable esTable) {
OkHttpClient.Builder clientBuilder = new OkHttpClient.Builder();
clientBuilder.authenticator(new Authenticator() {
@ -136,7 +142,7 @@ public class EsStateStore extends Daemon {
});
String[] seeds = esTable.getSeeds();
for (String seed : seeds) {
String url = seed + "/_cluster/state?indices="
+ esTable.getIndexName()
+ "&metric=routing_table,nodes,metadata&expand_wildcards=open";
String basicAuth = "";
@ -155,8 +161,7 @@ public class EsStateStore extends Daemon {
if (esTableState != null) {
return esTableState;
}
}
catch (Exception e) {
} catch (Exception e) {
LOG.warn("errors while parse response msg {}", responseStr, e);
}
} else {
@ -168,17 +173,75 @@ public class EsStateStore extends Daemon {
}
return null;
}
@VisibleForTesting
public EsTableState parseClusterState55(String responseStr, EsTable esTable)
throws DdlException, AnalysisException, ExternalDataSourceException {
JSONObject jsonObject = new JSONObject(responseStr);
String clusterName = jsonObject.getString("cluster_name");
JSONObject nodesMap = jsonObject.getJSONObject("nodes");
// we build the doc value context for fields that may be used for scanning
// "properties": {
// "city": {
// "type": "text", // text field does not have docvalue
// "fields": {
// "raw": {
// "type": "keyword"
// }
// }
// }
// }
// the docvalue context then provides the mapping between the selected field and the real request field:
// {"city": "city.raw"}
JSONObject indicesMetaMap = jsonObject.getJSONObject("metadata").getJSONObject("indices");
JSONObject indexMetaMap = indicesMetaMap.optJSONObject(esTable.getIndexName());
if (esTable.isDocValueScanEnable() && indexMetaMap != null) {
JSONObject mappings = indexMetaMap.optJSONObject("mappings");
JSONObject rootSchema = mappings.optJSONObject(esTable.getMappingType());
JSONObject schema = rootSchema.optJSONObject("properties");
List<Column> cols = esTable.getFullSchema();
for (Column col : cols) {
String colName = col.getName();
if (!schema.has(colName)) {
continue;
}
JSONObject fieldObject = schema.optJSONObject(colName);
String fieldType = fieldObject.optString("type");
if (EsTable.DEFAULT_DOCVALUE_DISABLED_FIELDS.contains(fieldType)) {
JSONObject fieldsObject = fieldObject.optJSONObject("fields");
if (fieldsObject != null) {
for (String key : fieldsObject.keySet()) {
JSONObject innerTypeObject = fieldsObject.optJSONObject(key);
if (EsTable.DEFAULT_DOCVALUE_DISABLED_FIELDS.contains(innerTypeObject.optString("type"))) {
continue;
}
if (innerTypeObject.has("doc_values")) {
boolean docValue = innerTypeObject.getBoolean("doc_values");
if (docValue) {
esTable.addDocValueField(colName, colName + "." + key);
}
} else {
// a : {c : {}} -> a -> a.c
esTable.addDocValueField(colName, colName + "." + key);
}
}
}
// skip the top-level field itself; only its sub-fields may carry doc values
continue;
}
// the user may have set doc_values = false manually
if (fieldObject.has("doc_values")) {
boolean docValue = fieldObject.optBoolean("doc_values");
if (!docValue) {
continue;
}
}
esTable.addDocValueField(colName, colName);
}
}
JSONObject indicesRoutingMap = jsonObject.getJSONObject("routing_table").getJSONObject("indices");
EsTableState esTableState = new EsTableState();
PartitionInfo partitionInfo = null;
if (esTable.getPartitionInfo() != null) {
if (esTable.getPartitionInfo() instanceof RangePartitionInfo) {
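To make the docvalue-context construction above concrete, a sketch of the mapping it derives from an illustrative schema (all field names are assumptions):

#include <iostream>
#include <map>
#include <string>

int main() {
    // Given an illustrative mapping:
    //   "city": {"type": "text", "fields": {"raw": {"type": "keyword"}}}
    //   "age":  {"type": "long"}
    //   "note": {"type": "keyword", "doc_values": false}
    // text carries no doc values, so "city" resolves to its keyword sub-field,
    // and "note" is skipped because doc_values was disabled manually.
    const std::map<std::string, std::string> derived_context = {
        {"city", "city.raw"},
        {"age", "age"},
    };
    for (const auto& entry : derived_context) {
        std::cout << entry.first << " -> " << entry.second << std::endl;
    }
    return 0;
}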
@ -196,30 +259,32 @@ public class EsStateStore extends Daemon {
idx++;
}
sb.append(")");
LOG.debug("begin to parse es table [{}] state from cluster state,"
LOG.debug("begin to parse es table [{}] state from cluster state,"
+ " with partition info [{}]", esTable.getName(), sb.toString());
}
} else if (esTable.getPartitionInfo() instanceof SinglePartitionInfo) {
LOG.debug("begin to parse es table [{}] state from cluster state, "
LOG.debug("begin to parse es table [{}] state from cluster state, "
+ "with no partition info", esTable.getName());
} else {
throw new ExternalDataSourceException("es table only support range partition, "
+ "but current partition type is "
+ esTable.getPartitionInfo().getType());
throw new ExternalDataSourceException("es table only support range partition, "
+ "but current partition type is "
+ esTable.getPartitionInfo().getType());
}
}
for (String indexName : indicesRoutingMap.keySet()) {
EsIndexState indexState = EsIndexState.parseIndexStateV55(indexName,
indicesRoutingMap, nodesMap,
indicesMetaMap, partitionInfo);
esTableState.addIndexState(indexName, indexState);
LOG.debug("add index {} to es table {}", indexState, esTable.getName());
}
if (partitionInfo instanceof RangePartitionInfo) {
// sort the index state according to partition key and then add to range map
List<EsIndexState> esIndexStates = esTableState.getPartitionedIndexStates().values()
.stream().collect(Collectors.toList());
Collections.sort(esIndexStates, new Comparator<EsIndexState>() {
@Override
public int compare(EsIndexState o1, EsIndexState o2) {
@ -229,11 +294,11 @@ public class EsStateStore extends Daemon {
long partitionId = 0;
for (EsIndexState esIndexState : esIndexStates) {
Range<PartitionKey> range = ((RangePartitionInfo) partitionInfo).handleNewSinglePartitionDesc(
esIndexState.getPartitionDesc(),
partitionId);
esTableState.addPartition(esIndexState.getIndexName(), partitionId);
esIndexState.setPartitionId(partitionId);
++ partitionId;
++partitionId;
LOG.debug("add parition to es table [{}] with range [{}]", esTable.getName(), range);
}
}

View File

@ -119,7 +119,11 @@ public class EsScanNode extends ScanNode {
properties.put(EsTable.PASSWORD, table.getPasswd());
TEsScanNode esScanNode = new TEsScanNode(desc.getId().asInt());
esScanNode.setProperties(properties);
if (table.isDocValueScanEnable()) {
esScanNode.setDocvalue_context(table.docValueContext());
}
msg.es_scan_node = esScanNode;
}
// TODO(ygl) assign backend that belong to the same cluster

View File

@ -198,6 +198,24 @@ struct TBrokerScanNode {
struct TEsScanNode {
1: required Types.TTupleId tuple_id
2: optional map<string,string> properties
// used to indicate which fields can be fetched from ES docvalue
// elasticsearch has a multi-"fields" feature: a field can have
// two or more types, and even if the first type has no docvalue, another
// one may have it, such as (a text field has no docvalue, but keyword does):
// "properties": {
// "city": {
// "type": "text",
// "fields": {
// "raw": {
// "type": "keyword"
// }
// }
// }
// }
// the docvalue context then provides the mapping between the selected field and the real request field:
// {"city": "city.raw"}
// for `select city from table`, if docvalue is enabled, we fetch the `city` value from `city.raw`
3: optional map<string, string> docvalue_context
}
struct TMiniLoadEtlFunction {