Add EsScanNode (#450)

This commit is contained in:
Salieri1969
2019-01-17 17:59:33 +08:00
committed by ZHAO Chun
parent 723ef04f51
commit 4d5f92cce7
16 changed files with 1103 additions and 4 deletions

View File

@ -62,6 +62,7 @@ set(EXEC_FILES
mysql_scanner.cpp
csv_scan_node.cpp
csv_scanner.cpp
es_scan_node.cpp
spill_sort_node.cc
union_node.cpp
union_node_ir.cpp

View File

@ -0,0 +1,792 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "es_scan_node.h"
#include <string>
#include <boost/algorithm/string.hpp>
#include <gutil/strings/substitute.h>
#include "gen_cpp/PlanNodes_types.h"
#include "gen_cpp/Exprs_types.h"
#include "runtime/runtime_state.h"
#include "runtime/row_batch.h"
#include "runtime/string_value.h"
#include "runtime/tuple_row.h"
#include "runtime/client_cache.h"
#include "util/runtime_profile.h"
#include "util/debug_util.h"
#include "service/backend_options.h"
#include "olap/olap_common.h"
#include "olap/utils.h"
#include "exprs/expr_context.h"
#include "exprs/expr.h"
#include "exprs/in_predicate.h"
#include "exprs/slot_ref.h"
namespace doris {
// $0 = column type (e.g. INT)
const string ERROR_INVALID_COL_DATA = "Data source returned inconsistent column data. "
"Expected value of type $0 based on column metadata. This likely indicates a "
"problem with the data source library.";
const string ERROR_MEM_LIMIT_EXCEEDED = "DataSourceScanNode::$0() failed to allocate "
"$1 bytes for $2.";
EsScanNode::EsScanNode(
ObjectPool* pool,
const TPlanNode& tnode,
const DescriptorTbl& descs) :
ScanNode(pool, tnode, descs),
_tuple_id(tnode.es_scan_node.tuple_id),
_scan_range_idx(0) {
if (tnode.es_scan_node.__isset.properties) {
_properties = tnode.es_scan_node.properties;
}
}
EsScanNode::~EsScanNode() {
}
Status EsScanNode::prepare(RuntimeState* state) {
VLOG(1) << "EsScanNode::Prepare";
RETURN_IF_ERROR(ScanNode::prepare(state));
_tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id);
if (_tuple_desc == nullptr) {
std::stringstream ss;
ss << "es tuple descriptor is null, _tuple_id=" << _tuple_id;
LOG(WARNING) << ss.str();
return Status(ss.str());
}
_env = state->exec_env();
return Status::OK;
}
Status EsScanNode::open(RuntimeState* state) {
VLOG(1) << "EsScanNode::Open";
RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::OPEN));
RETURN_IF_CANCELLED(state);
SCOPED_TIMER(_runtime_profile->total_time_counter());
RETURN_IF_ERROR(ExecNode::open(state));
// TExtOpenParams.row_schema
vector<TExtColumnDesc> cols;
for (const SlotDescriptor* slot : _tuple_desc->slots()) {
TExtColumnDesc col;
col.__set_name(slot->col_name());
col.__set_type(slot->type().to_thrift());
cols.emplace_back(std::move(col));
}
TExtTableSchema row_schema;
row_schema.cols = std::move(cols);
row_schema.__isset.cols = true;
// TExtOpenParams.predicates
vector<vector<TExtPredicate> > predicates;
vector<int> predicate_to_conjunct;
for (int i = 0; i < _conjunct_ctxs.size(); ++i) {
VLOG(1) << "conjunct: " << _conjunct_ctxs[i]->root()->debug_string();
vector<TExtPredicate> disjuncts;
if (get_disjuncts(_conjunct_ctxs[i], _conjunct_ctxs[i]->root(), disjuncts)) {
predicates.emplace_back(std::move(disjuncts));
predicate_to_conjunct.push_back(i);
}
}
// open every scan range
vector<int> conjunct_accepted_times(_conjunct_ctxs.size(), 0);
for (int i = 0; i < _scan_ranges.size(); ++i) {
TEsScanRange& es_scan_range = _scan_ranges[i];
if (es_scan_range.es_hosts.empty()) {
std::stringstream ss;
ss << "es fail to open: hosts empty";
LOG(WARNING) << ss.str();
return Status(ss.str());
}
// TExtOpenParams
TExtOpenParams params;
params.__set_query_id(state->query_id());
_properties["index"] = es_scan_range.index;
if (es_scan_range.__isset.type) {
_properties["type"] = es_scan_range.type;
}
_properties["shard_id"] = std::to_string(es_scan_range.shard_id);
params.__set_properties(_properties);
params.__set_row_schema(row_schema);
params.__set_batch_size(state->batch_size());
params.__set_predicates(predicates);
TExtOpenResult result;
// choose an es node, local is the first choice
std::string localhost = BackendOptions::get_localhost();
bool is_success = false;
for (int j = 0; j < 2; ++j) {
for (auto& es_host : es_scan_range.es_hosts) {
if ((j == 0 && es_host.hostname != localhost)
|| (j == 1 && es_host.hostname == localhost)) {
continue;
}
Status status = open_es(es_host, result, params);
if (status.ok()) {
is_success = true;
_addresses.push_back(es_host);
_scan_handles.push_back(result.scan_handle);
if (result.__isset.accepted_conjuncts) {
for (int index : result.accepted_conjuncts) {
conjunct_accepted_times[predicate_to_conjunct[index]]++;
}
}
break;
} else if (status.code() == TStatusCode::ES_SHARD_NOT_FOUND) {
// if shard not found, try other nodes
LOG(WARNING) << "shard not found on es node: "
<< ", address=" << es_host
<< ", scan_range_idx=" << i << ", try other nodes";
} else {
LOG(WARNING) << "es open error: scan_range_idx=" << i
<< ", address=" << es_host
<< ", msg=" << status.get_error_msg();
return status;
}
}
if (is_success) {
break;
}
}
if (!is_success) {
std::stringstream ss;
ss << "es open error: scan_range_idx=" << i
<< ", can't find shard on any node";
return Status(ss.str());
}
}
// remove those conjuncts that accepted by all scan ranges
for (int i = predicate_to_conjunct.size() - 1; i >= 0; i--) {
int conjunct_index = predicate_to_conjunct[i];
if (conjunct_accepted_times[conjunct_index] == _scan_ranges.size()) {
_conjunct_ctxs.erase(_conjunct_ctxs.begin() + conjunct_index);
}
}
for (int i = 0; i < _conjunct_ctxs.size(); ++i) {
if (!check_left_conjuncts(_conjunct_ctxs[i]->root())) {
return Status("esquery could only be executed on es, but could not push down to es");
}
}
return Status::OK;
}
Status EsScanNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) {
VLOG(1) << "EsScanNode::GetNext";
RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT));
RETURN_IF_CANCELLED(state);
SCOPED_TIMER(_runtime_profile->total_time_counter());
SCOPED_TIMER(materialize_tuple_timer());
// create tuple
MemPool* tuple_pool = row_batch->tuple_data_pool();
int64_t tuple_buffer_size;
uint8_t* tuple_buffer = nullptr;
RETURN_IF_ERROR(row_batch->resize_and_allocate_tuple_buffer(state, &tuple_buffer_size, &tuple_buffer));
Tuple* tuple = reinterpret_cast<Tuple*>(tuple_buffer);
// get batch
TExtGetNextResult result;
RETURN_IF_ERROR(get_next_from_es(result));
VLOG(1) << "es get next success: result=" << apache::thrift::ThriftDebugString(result);
_offsets[_scan_range_idx] += result.rows.num_rows;
// convert
VLOG(1) << "begin to convert: scan_range_idx=" << _scan_range_idx
<< ", num_rows=" << result.rows.num_rows;
vector<TExtColumnData>& cols = result.rows.cols;
// indexes of the next non-null value in the row batch, per column.
vector<int> cols_next_val_idx(_tuple_desc->slots().size(), 0);
for (int row_idx = 0; row_idx < result.rows.num_rows; row_idx++) {
if (reached_limit()) {
*eos = true;
break;
}
RETURN_IF_ERROR(materialize_row(tuple_pool, tuple, cols, row_idx, cols_next_val_idx));
TupleRow* tuple_row = row_batch->get_row(row_batch->add_row());
tuple_row->set_tuple(0, tuple);
if (ExecNode::eval_conjuncts(_conjunct_ctxs.data(), _conjunct_ctxs.size(), tuple_row)) {
row_batch->commit_last_row();
tuple = reinterpret_cast<Tuple*>(
reinterpret_cast<uint8_t*>(tuple) + _tuple_desc->byte_size());
++_num_rows_returned;
}
}
VLOG(1) << "finish one batch: num_rows=" << row_batch->num_rows();
COUNTER_SET(_rows_returned_counter, _num_rows_returned);
if (result.__isset.eos && result.eos) {
VLOG(1) << "es finish one scan_range: scan_range_idx=" << _scan_range_idx;
++_scan_range_idx;
}
if (_scan_range_idx == _scan_ranges.size()) {
*eos = true;
}
return Status::OK;
}
Status EsScanNode::close(RuntimeState* state) {
if (is_closed()) return Status::OK;
VLOG(1) << "EsScanNode::Close";
RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::CLOSE));
SCOPED_TIMER(_runtime_profile->total_time_counter());
for (int i = 0; i < _addresses.size(); ++i) {
TExtCloseParams params;
params.__set_scan_handle(_scan_handles[i]);
TExtCloseResult result;
#ifndef BE_TEST
const TNetworkAddress& address = _addresses[i];
try {
Status status;
ExtDataSourceServiceClientCache* client_cache = _env->extdatasource_client_cache();
ExtDataSourceServiceConnection client(client_cache, address, 10000, &status);
if (!status.ok()) {
LOG(WARNING) << "es create client error: scan_range_idx=" << i
<< ", address=" << address
<< ", msg=" << status.get_error_msg();
return status;
}
try {
VLOG(1) << "es close param=" << apache::thrift::ThriftDebugString(params);
client->close(result, params);
} catch (apache::thrift::transport::TTransportException& e) {
LOG(WARNING) << "es close retrying, because: " << e.what();
RETURN_IF_ERROR(client.reopen());
client->close(result, params);
}
} catch (apache::thrift::TException &e) {
std::stringstream ss;
ss << "es close error: scan_range_idx=" << i
<< ", msg=" << e.what();
LOG(WARNING) << ss.str();
return Status(TStatusCode::THRIFT_RPC_ERROR, ss.str(), false);
}
VLOG(1) << "es close result=" << apache::thrift::ThriftDebugString(result);
Status status(result.status);
if (!status.ok()) {
LOG(WARNING) << "es close error: : scan_range_idx=" << i
<< ", msg=" << status.get_error_msg();
return status;
}
#else
TStatus status;
result.__set_status(status);
#endif
}
RETURN_IF_ERROR(ExecNode::close(state));
return Status::OK;
}
void EsScanNode::debug_string(int indentation_level, stringstream* out) const {
*out << string(indentation_level * 2, ' ');
*out << "EsScanNode(tupleid=" << _tuple_id;
*out << ")" << std::endl;
for (int i = 0; i < _children.size(); ++i) {
_children[i]->debug_string(indentation_level + 1, out);
}
}
Status EsScanNode::set_scan_ranges(const vector<TScanRangeParams>& scan_ranges) {
for (int i = 0; i < scan_ranges.size(); ++i) {
TScanRangeParams scan_range = scan_ranges[i];
DCHECK(scan_range.scan_range.__isset.es_scan_range);
TEsScanRange es_scan_range = scan_range.scan_range.es_scan_range;
_scan_ranges.push_back(es_scan_range);
}
_offsets.resize(scan_ranges.size(), 0);
return Status::OK;
}
Status EsScanNode::open_es(TNetworkAddress& address, TExtOpenResult& result, TExtOpenParams& params) {
VLOG(1) << "es open param=" << apache::thrift::ThriftDebugString(params);
#ifndef BE_TEST
try {
ExtDataSourceServiceClientCache* client_cache = _env->extdatasource_client_cache();
Status status;
ExtDataSourceServiceConnection client(client_cache, address, 10000, &status);
if (!status.ok()) {
std::stringstream ss;
ss << "es create client error: address=" << address
<< ", msg=" << status.get_error_msg();
return Status(ss.str());
}
try {
client->open(result, params);
} catch (apache::thrift::transport::TTransportException& e) {
LOG(WARNING) << "es open retrying, because: " << e.what();
RETURN_IF_ERROR(client.reopen());
client->open(result, params);
}
VLOG(1) << "es open result=" << apache::thrift::ThriftDebugString(result);
return Status(result.status);
} catch (apache::thrift::TException &e) {
std::stringstream ss;
ss << "es open error: address=" << address << ", msg=" << e.what();
return Status(ss.str());
}
#else
TStatus status;
result.__set_status(status);
result.__set_scan_handle("0");
return Status(status);
#endif
}
// legacy conjuncts must not contain match function
bool EsScanNode::check_left_conjuncts(Expr* conjunct) {
if (is_match_func(conjunct)) {
return false;
} else {
int num_children = conjunct->get_num_children();
for (int child_idx = 0; child_idx < num_children; ++child_idx) {
if (!check_left_conjuncts(conjunct->get_child(child_idx))) {
return false;
}
}
return true;
}
}
bool EsScanNode::get_disjuncts(ExprContext* context, Expr* conjunct,
vector<TExtPredicate>& disjuncts) {
if (TExprNodeType::BINARY_PRED == conjunct->node_type()) {
if (conjunct->children().size() != 2) {
VLOG(1) << "get disjuncts fail: number of childs is not 2";
return false;
}
SlotRef* slotRef;
TExprOpcode::type op;
Expr* expr;
if (TExprNodeType::SLOT_REF == conjunct->get_child(0)->node_type()) {
expr = conjunct->get_child(1);
slotRef = (SlotRef*)(conjunct->get_child(0));
op = conjunct->op();
} else if (TExprNodeType::SLOT_REF == conjunct->get_child(1)->node_type()) {
expr = conjunct->get_child(0);
slotRef = (SlotRef*)(conjunct->get_child(1));
op = conjunct->op();
} else {
VLOG(1) << "get disjuncts fail: no SLOT_REF child";
return false;
}
SlotDescriptor* slot_desc = get_slot_desc(slotRef);
if (slot_desc == nullptr) {
VLOG(1) << "get disjuncts fail: slot_desc is null";
return false;
}
TExtLiteral literal;
if (!to_ext_literal(context, expr, &literal)) {
VLOG(1) << "get disjuncts fail: can't get literal, node_type="
<< expr->node_type();
return false;
}
TExtColumnDesc columnDesc;
columnDesc.__set_name(slot_desc->col_name());
columnDesc.__set_type(slot_desc->type().to_thrift());
TExtBinaryPredicate binaryPredicate;
binaryPredicate.__set_col(columnDesc);
binaryPredicate.__set_op(op);
binaryPredicate.__set_value(std::move(literal));
TExtPredicate predicate;
predicate.__set_node_type(TExprNodeType::BINARY_PRED);
predicate.__set_binary_predicate(binaryPredicate);
disjuncts.push_back(std::move(predicate));
return true;
} else if (is_match_func(conjunct)) {
// if this is a function call expr and function name is match, then push
// down it to es
TExtFunction match_function;
match_function.__set_func_name(conjunct->fn().name.function_name);
vector<TExtLiteral> query_conditions;
TExtLiteral literal;
if (!to_ext_literal(context, conjunct->get_child(1), &literal)) {
VLOG(1) << "get disjuncts fail: can't get literal, node_type="
<< conjunct->get_child(1)->node_type();
return false;
}
query_conditions.push_back(std::move(literal));
match_function.__set_values(query_conditions);
TExtPredicate predicate;
predicate.__set_node_type(TExprNodeType::FUNCTION_CALL);
predicate.__set_ext_function(match_function);
disjuncts.push_back(std::move(predicate));
return true;
} else if (TExprNodeType::IN_PRED == conjunct->node_type()) {
TExtInPredicate ext_in_predicate;
vector<TExtLiteral> in_pred_values;
InPredicate* pred = dynamic_cast<InPredicate*>(conjunct);
ext_in_predicate.__set_is_not_in(pred->is_not_in());
if (Expr::type_without_cast(pred->get_child(0)) != TExprNodeType::SLOT_REF) {
return false;
}
SlotRef* slot_ref = (SlotRef*)(conjunct->get_child(0));
SlotDescriptor* slot_desc = get_slot_desc(slot_ref);
if (slot_desc == nullptr) {
return false;
}
TExtColumnDesc columnDesc;
columnDesc.__set_name(slot_desc->col_name());
columnDesc.__set_type(slot_desc->type().to_thrift());
ext_in_predicate.__set_col(columnDesc);
for (int i = 1; i < pred->children().size(); ++i) {
// varchar, string, all of them are string type, but varchar != string
// TODO add date, datetime support?
if (pred->get_child(0)->type().is_string_type()) {
if (!pred->get_child(i)->type().is_string_type()) {
return false;
}
} else {
if (pred->get_child(i)->type().type != pred->get_child(0)->type().type) {
return false;
}
}
TExtLiteral literal;
if (!to_ext_literal(context, pred->get_child(i), &literal)) {
VLOG(1) << "get disjuncts fail: can't get literal, node_type="
<< pred->get_child(i)->node_type();
return false;
}
in_pred_values.push_back(literal);
}
ext_in_predicate.__set_values(in_pred_values);
TExtPredicate predicate;
predicate.__set_node_type(TExprNodeType::IN_PRED);
predicate.__set_in_predicate(ext_in_predicate);
disjuncts.push_back(std::move(predicate));
return true;
} else if (TExprNodeType::COMPOUND_PRED == conjunct->node_type()) {
if (TExprOpcode::COMPOUND_OR != conjunct->op()) {
VLOG(1) << "get disjuncts fail: op is not COMPOUND_OR";
return false;
}
if (!get_disjuncts(context, conjunct->get_child(0), disjuncts)) {
return false;
}
if (!get_disjuncts(context, conjunct->get_child(1), disjuncts)) {
return false;
}
return true;
} else {
VLOG(1) << "get disjuncts fail: node type is " << conjunct->node_type()
<< ", should be BINARY_PRED or COMPOUND_PRED";
return false;
}
}
bool EsScanNode::is_match_func(Expr* conjunct) {
if (TExprNodeType::FUNCTION_CALL == conjunct->node_type()
&& conjunct->fn().name.function_name == "esquery") {
return true;
}
return false;
}
SlotDescriptor* EsScanNode::get_slot_desc(SlotRef* slotRef) {
std::vector<SlotId> slot_ids;
slotRef->get_slot_ids(&slot_ids);
SlotDescriptor* slot_desc = nullptr;
for (SlotDescriptor* slot : _tuple_desc->slots()) {
if (slot->id() == slot_ids[0]) {
slot_desc = slot;
break;
}
}
return slot_desc;
}
bool EsScanNode::to_ext_literal(ExprContext* context, Expr* expr, TExtLiteral* literal) {
literal->__set_node_type(expr->node_type());
switch (expr->node_type()) {
case TExprNodeType::BOOL_LITERAL: {
TBoolLiteral bool_literal;
void* value = context->get_value(expr, NULL);
bool_literal.__set_value(*reinterpret_cast<bool*>(value));
literal->__set_bool_literal(bool_literal);
return true;
}
case TExprNodeType::DATE_LITERAL: {
void* value = context->get_value(expr, NULL);
DateTimeValue date_value = *reinterpret_cast<DateTimeValue*>(value);
char str[MAX_DTVALUE_STR_LEN];
date_value.to_string(str);
TDateLiteral date_literal;
date_literal.__set_value(str);
literal->__set_date_literal(date_literal);
return true;
}
case TExprNodeType::FLOAT_LITERAL: {
TFloatLiteral float_literal;
void* value = context->get_value(expr, NULL);
float_literal.__set_value(*reinterpret_cast<float*>(value));
literal->__set_float_literal(float_literal);
return true;
}
case TExprNodeType::INT_LITERAL: {
TIntLiteral int_literal;
void* value = context->get_value(expr, NULL);
int_literal.__set_value(*reinterpret_cast<int32_t*>(value));
literal->__set_int_literal(int_literal);
return true;
}
case TExprNodeType::STRING_LITERAL: {
TStringLiteral string_literal;
void* value = context->get_value(expr, NULL);
string_literal.__set_value(*reinterpret_cast<string*>(value));
literal->__set_string_literal(string_literal);
return true;
}
case TExprNodeType::DECIMAL_LITERAL: {
TDecimalLiteral decimal_literal;
void* value = context->get_value(expr, NULL);
decimal_literal.__set_value(reinterpret_cast<DecimalValue*>(value)->to_string());
literal->__set_decimal_literal(decimal_literal);
return true;
}
case TExprNodeType::LARGE_INT_LITERAL: {
char buf[48];
int len = 48;
void* value = context->get_value(expr, NULL);
char* v = LargeIntValue::to_string(*reinterpret_cast<__int128*>(value), buf, &len);
TLargeIntLiteral large_int_literal;
large_int_literal.__set_value(v);
literal->__set_large_int_literal(large_int_literal);
return true;
}
default:
return false;
}
}
Status EsScanNode::get_next_from_es(TExtGetNextResult& result) {
TExtGetNextParams params;
params.__set_scan_handle(_scan_handles[_scan_range_idx]);
params.__set_offset(_offsets[_scan_range_idx]);
// getNext
const TNetworkAddress &address = _addresses[_scan_range_idx];
#ifndef BE_TEST
try {
Status create_client_status;
ExtDataSourceServiceClientCache *client_cache = _env->extdatasource_client_cache();
ExtDataSourceServiceConnection client(client_cache, address, 10000, &create_client_status);
if (!create_client_status.ok()) {
LOG(WARNING) << "es create client error: scan_range_idx=" << _scan_range_idx
<< ", address=" << address
<< ", msg=" << create_client_status.get_error_msg();
return create_client_status;
}
try {
VLOG(1) << "es get_next param=" << apache::thrift::ThriftDebugString(params);
client->getNext(result, params);
} catch (apache::thrift::transport::TTransportException& e) {
std::stringstream ss;
ss << "es get_next error: scan_range_idx=" << _scan_range_idx
<< ", msg=" << e.what();
LOG(WARNING) << ss.str();
RETURN_IF_ERROR(client.reopen());
return Status(TStatusCode::THRIFT_RPC_ERROR, ss.str(), false);
}
} catch (apache::thrift::TException &e) {
std::stringstream ss;
ss << "es get_next error: scan_range_idx=" << _scan_range_idx
<< ", msg=" << e.what();
LOG(WARNING) << ss.str();
return Status(TStatusCode::THRIFT_RPC_ERROR, ss.str(), false);
}
#else
TStatus status;
result.__set_status(status);
result.__set_eos(true);
TExtColumnData col_data;
std::vector<bool> is_null;
is_null.push_back(false);
col_data.__set_is_null(is_null);
std::vector<int32_t> int_vals;
int_vals.push_back(1);
int_vals.push_back(2);
col_data.__set_int_vals(int_vals);
std::vector<TExtColumnData> cols;
cols.push_back(col_data);
TExtRowBatch rows;
rows.__set_cols(cols);
rows.__set_num_rows(2);
result.__set_rows(rows);
return Status(status);
#endif
// check result
VLOG(1) << "es get_next result=" << apache::thrift::ThriftDebugString(result);
Status get_next_status(result.status);
if (!get_next_status.ok()) {
LOG(WARNING) << "es get_next error: scan_range_idx=" << _scan_range_idx
<< ", address=" << address
<< ", msg=" << get_next_status.get_error_msg();
return get_next_status;
}
if (!result.__isset.rows || !result.rows.__isset.num_rows) {
std::stringstream ss;
ss << "es get_next error: scan_range_idx=" << _scan_range_idx
<< ", msg=rows or num_rows not in result";
LOG(WARNING) << ss.str();
return Status(ss.str());
}
return Status::OK;
}
Status EsScanNode::materialize_row(MemPool* tuple_pool, Tuple* tuple,
const vector<TExtColumnData>& cols, int row_idx,
vector<int>& cols_next_val_idx) {
tuple->init(_tuple_desc->byte_size());
for (int i = 0; i < _tuple_desc->slots().size(); ++i) {
const SlotDescriptor* slot_desc = _tuple_desc->slots()[i];
if (!slot_desc->is_materialized()) {
continue;
}
void* slot = tuple->get_slot(slot_desc->tuple_offset());
const TExtColumnData& col = cols[i];
if (col.is_null[row_idx]) {
tuple->set_null(slot_desc->null_indicator_offset());
continue;
} else {
tuple->set_not_null(slot_desc->null_indicator_offset());
}
int val_idx = cols_next_val_idx[i]++;
switch (slot_desc->type().type) {
case TYPE_CHAR:
case TYPE_VARCHAR: {
if (val_idx >= col.string_vals.size()) {
return Status(strings::Substitute(ERROR_INVALID_COL_DATA, "STRING"));
}
const string& val = col.string_vals[val_idx];
size_t val_size = val.size();
char* buffer = reinterpret_cast<char*>(tuple_pool->try_allocate_unaligned(val_size));
if (UNLIKELY(buffer == NULL)) {
string details = strings::Substitute(ERROR_MEM_LIMIT_EXCEEDED, "MaterializeNextRow",
val_size, "string slot");
return tuple_pool->mem_tracker()->MemLimitExceeded(NULL, details, val_size);
}
memcpy(buffer, val.data(), val_size);
reinterpret_cast<StringValue*>(slot)->ptr = buffer;
reinterpret_cast<StringValue*>(slot)->len = val_size;
break;
}
case TYPE_TINYINT:
if (val_idx >= col.byte_vals.size()) {
return Status(strings::Substitute(ERROR_INVALID_COL_DATA, "TINYINT"));
}
*reinterpret_cast<int8_t*>(slot) = col.byte_vals[val_idx];
break;
case TYPE_SMALLINT:
if (val_idx >= col.short_vals.size()) {
return Status(strings::Substitute(ERROR_INVALID_COL_DATA, "SMALLINT"));
}
*reinterpret_cast<int16_t*>(slot) = col.short_vals[val_idx];
break;
case TYPE_INT:
if (val_idx >= col.int_vals.size()) {
return Status(strings::Substitute(ERROR_INVALID_COL_DATA, "INT"));
}
*reinterpret_cast<int32_t*>(slot) = col.int_vals[val_idx];
break;
case TYPE_BIGINT:
if (val_idx >= col.long_vals.size()) {
return Status(strings::Substitute(ERROR_INVALID_COL_DATA, "BIGINT"));
}
*reinterpret_cast<int64_t*>(slot) = col.long_vals[val_idx];
break;
case TYPE_DOUBLE:
if (val_idx >= col.double_vals.size()) {
return Status(strings::Substitute(ERROR_INVALID_COL_DATA, "DOUBLE"));
}
*reinterpret_cast<double*>(slot) = col.double_vals[val_idx];
break;
case TYPE_FLOAT:
if (val_idx >= col.double_vals.size()) {
return Status(strings::Substitute(ERROR_INVALID_COL_DATA, "FLOAT"));
}
*reinterpret_cast<float*>(slot) = col.double_vals[val_idx];
break;
case TYPE_BOOLEAN:
if (val_idx >= col.bool_vals.size()) {
return Status(strings::Substitute(ERROR_INVALID_COL_DATA, "BOOLEAN"));
}
*reinterpret_cast<int8_t*>(slot) = col.bool_vals[val_idx];
break;
case TYPE_DATE:
case TYPE_DATETIME: {
if (val_idx >= col.long_vals.size() ||
!reinterpret_cast<DateTimeValue*>(slot)->from_unixtime(col.long_vals[val_idx])) {
return Status(strings::Substitute(ERROR_INVALID_COL_DATA, "TYPE_DATE|TYPE_DATETIME"));
}
break;
}
case TYPE_DECIMAL: {
if (val_idx >= col.binary_vals.size()) {
return Status(strings::Substitute(ERROR_INVALID_COL_DATA, "DECIMAL"));
}
const string& val = col.binary_vals[val_idx];
*reinterpret_cast<DecimalValue*>(slot) = *reinterpret_cast<const DecimalValue*>(&val);
break;
}
default:
DCHECK(false);
}
}
return Status::OK;
}
}

View File

@ -0,0 +1,89 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#ifndef BDG_PALO_BE_SRC_QUERY_EXEC_ES_SCAN_NODE_H
#define BDG_PALO_BE_SRC_QUERY_EXEC_ES_SCAN_NODE_H
#include <memory>
#include <vector>
#include "runtime/descriptors.h"
#include "runtime/tuple.h"
#include "exec/scan_node.h"
#include "exprs/slot_ref.h"
#include "runtime/exec_env.h"
#include "gen_cpp/TExtDataSourceService.h"
#include "gen_cpp/PaloExternalDataSourceService_types.h"
namespace doris {
class TupleDescriptor;
class RuntimeState;
class Status;
class EsScanNode : public ScanNode {
public:
EsScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
~EsScanNode();
virtual Status prepare(RuntimeState* state) override;
virtual Status open(RuntimeState* state) override;
virtual Status get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) override;
virtual Status close(RuntimeState* state) override;
virtual Status set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) override;
protected:
// Write debug string of this into out.
virtual void debug_string(int indentation_level, std::stringstream* out) const;
private:
Status open_es(TNetworkAddress& address, TExtOpenResult& result, TExtOpenParams& params);
Status materialize_row(MemPool* tuple_pool, Tuple* tuple,
const vector<TExtColumnData>& cols, int next_row_idx,
vector<int>& cols_next_val_idx);
Status get_next_from_es(TExtGetNextResult& result);
bool get_disjuncts(ExprContext* context, Expr* conjunct, vector<TExtPredicate>& disjuncts);
bool to_ext_literal(ExprContext* context, Expr* expr, TExtLiteral* literal);
bool is_match_func(Expr* conjunct);
SlotDescriptor* get_slot_desc(SlotRef* slotRef);
// check if open result meets condition
// 1. check if left conjuncts contain "match" function, since match function could only be executed on es
bool check_left_conjuncts(Expr* conjunct);
private:
TupleId _tuple_id;
std::map<std::string, std::string> _properties;
const TupleDescriptor* _tuple_desc;
ExecEnv* _env;
std::vector<TEsScanRange> _scan_ranges;
// scan range's iterator, used in get_next()
int _scan_range_idx;
// store every scan range's netaddress/handle/offset
std::vector<TNetworkAddress> _addresses;
std::vector<std::string> _scan_handles;
std::vector<int> _offsets;
};
}
#endif

View File

@ -30,6 +30,7 @@
#include "exec/partitioned_aggregation_node.h"
#include "exec/new_partitioned_aggregation_node.h"
#include "exec/csv_scan_node.h"
#include "exec/es_scan_node.h"
#include "exec/pre_aggregation_node.h"
#include "exec/hash_join_node.h"
#include "exec/broker_scan_node.h"
@ -359,6 +360,10 @@ Status ExecNode::create_node(RuntimeState* state, ObjectPool* pool, const TPlanN
*node = pool->add(new MysqlScanNode(pool, tnode, descs));
return Status::OK;
case TPlanNodeType::ES_SCAN_NODE:
*node = pool->add(new EsScanNode(pool, tnode, descs));
return Status::OK;
case TPlanNodeType::SCHEMA_SCAN_NODE:
*node = pool->add(new SchemaScanNode(pool, tnode, descs));
return Status::OK;
@ -507,6 +512,7 @@ void ExecNode::collect_nodes(TPlanNodeType::type node_type, vector<ExecNode*>* n
void ExecNode::collect_scan_nodes(vector<ExecNode*>* nodes) {
collect_nodes(TPlanNodeType::OLAP_SCAN_NODE, nodes);
collect_nodes(TPlanNodeType::BROKER_SCAN_NODE, nodes);
collect_nodes(TPlanNodeType::ES_SCAN_NODE, nodes);
}
void ExecNode::init_runtime_profile(const std::string& name) {

View File

@ -159,6 +159,10 @@ public:
return _node_type;
}
const TFunction& fn() const {
return _fn;
}
bool is_slotref() const {
return _is_slotref;
}

View File

@ -174,6 +174,7 @@ private:
friend class ScalarFnCall;
friend class InPredicate;
friend class OlapScanNode;
friend class EsScanNode;
/// FunctionContexts for each registered expression. The FunctionContexts are created
/// and owned by this ExprContext.

View File

@ -32,6 +32,9 @@ set(SRC_FILES
${GEN_CPP_DIR}/HeartbeatService_types.cpp
${GEN_CPP_DIR}/PaloInternalService_constants.cpp
${GEN_CPP_DIR}/PaloInternalService_types.cpp
${GEN_CPP_DIR}/PaloExternalDataSourceService_constants.cpp
${GEN_CPP_DIR}/PaloExternalDataSourceService_types.cpp
${GEN_CPP_DIR}/TExtDataSourceService.cpp
${GEN_CPP_DIR}/FrontendService.cpp
${GEN_CPP_DIR}/FrontendService_constants.cpp
${GEN_CPP_DIR}/FrontendService_types.cpp

View File

@ -274,6 +274,9 @@ typedef ClientConnection<FrontendServiceClient> FrontendServiceConnection;
class TPaloBrokerServiceClient;
typedef ClientCache<TPaloBrokerServiceClient> BrokerServiceClientCache;
typedef ClientConnection<TPaloBrokerServiceClient> BrokerServiceConnection;
class TExtDataSourceServiceClient;
typedef ClientCache<TExtDataSourceServiceClient> ExtDataSourceServiceClientCache;
typedef ClientConnection<TExtDataSourceServiceClient> ExtDataSourceServiceConnection;
}

View File

@ -54,6 +54,7 @@ class WebPageHandler;
class BackendServiceClient;
class FrontendServiceClient;
class TPaloBrokerServiceClient;
class TExtDataSourceServiceClient;
template<class T> class ClientCache;
// Execution environment for queries/plan fragments.
@ -88,6 +89,7 @@ public:
ClientCache<BackendServiceClient>* client_cache() { return _client_cache; }
ClientCache<FrontendServiceClient>* frontend_client_cache() { return _frontend_client_cache; }
ClientCache<TPaloBrokerServiceClient>* broker_client_cache() { return _broker_client_cache; }
ClientCache<TExtDataSourceServiceClient>* extdatasource_client_cache() { return _extdatasource_client_cache; }
MemTracker* process_mem_tracker() { return _mem_tracker; }
PoolMemTrackerRegistry* pool_mem_trackers() { return _pool_mem_trackers; }
ThreadResourceMgr* thread_mgr() { return _thread_mgr; }
@ -131,6 +133,7 @@ private:
ClientCache<BackendServiceClient>* _client_cache = nullptr;
ClientCache<FrontendServiceClient>* _frontend_client_cache = nullptr;
ClientCache<TPaloBrokerServiceClient>* _broker_client_cache = nullptr;
ClientCache<TExtDataSourceServiceClient>* _extdatasource_client_cache = nullptr;
MemTracker* _mem_tracker = nullptr;
PoolMemTrackerRegistry* _pool_mem_trackers = nullptr;
ThreadResourceMgr* _thread_mgr = nullptr;

View File

@ -54,6 +54,7 @@
#include "gen_cpp/BackendService.h"
#include "gen_cpp/FrontendService.h"
#include "gen_cpp/TPaloBrokerService.h"
#include "gen_cpp/TExtDataSourceService.h"
#include "gen_cpp/HeartbeatService_types.h"
namespace doris {
@ -71,6 +72,7 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths) {
_client_cache = new BackendServiceClientCache();
_frontend_client_cache = new FrontendServiceClientCache();
_broker_client_cache = new BrokerServiceClientCache();
_extdatasource_client_cache = new ExtDataSourceServiceClientCache();
_mem_tracker = nullptr;
_pool_mem_trackers = new PoolMemTrackerRegistry();
_thread_mgr = new ThreadResourceMgr();
@ -97,6 +99,7 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths) {
_client_cache->init_metrics(DorisMetrics::metrics(), "backend");
_frontend_client_cache->init_metrics(DorisMetrics::metrics(), "frontend");
_broker_client_cache->init_metrics(DorisMetrics::metrics(), "broker");
_extdatasource_client_cache->init_metrics(DorisMetrics::metrics(), "extdatasource");
_result_mgr->init();
_cgroups_mgr->init_cgroups();
_etl_job_mgr->init();
@ -198,6 +201,7 @@ void ExecEnv::_destory() {
delete _pool_mem_trackers;
delete _mem_tracker;
delete _broker_client_cache;
delete _extdatasource_client_cache;
delete _frontend_client_cache;
delete _client_cache;
delete _result_mgr;

View File

@ -43,6 +43,7 @@ ADD_BE_TEST(plain_text_line_reader_lzop_test)
ADD_BE_TEST(broker_reader_test)
ADD_BE_TEST(broker_scanner_test)
ADD_BE_TEST(broker_scan_node_test)
ADD_BE_TEST(es_scan_node_test)
ADD_BE_TEST(olap_table_info_test)
ADD_BE_TEST(olap_table_sink_test)
#ADD_BE_TEST(schema_scan_node_test)

View File

@ -0,0 +1,154 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <gtest/gtest.h>
#include <string>
#include "common/object_pool.h"
#include "exec/es_scan_node.h"
#include "gen_cpp/PlanNodes_types.h"
#include "runtime/mem_pool.h"
#include "runtime/descriptors.h"
#include "runtime/runtime_state.h"
#include "runtime/row_batch.h"
#include "runtime/string_value.h"
#include "runtime/tuple_row.h"
#include "util/runtime_profile.h"
#include "util/debug_util.h"
using std::vector;
namespace doris {
// mock
class EsScanNodeTest : public testing::Test {
public:
EsScanNodeTest() : _runtime_state("EsScanNodeTest") {
_runtime_state._instance_mem_tracker.reset(new MemTracker());
TDescriptorTable t_desc_table;
// table descriptors
TTableDescriptor t_table_desc;
t_table_desc.id = 0;
t_table_desc.tableType = TTableType::ES_TABLE;
t_table_desc.numCols = 0;
t_table_desc.numClusteringCols = 0;
t_table_desc.__isset.esTable = true;
t_desc_table.tableDescriptors.push_back(t_table_desc);
t_desc_table.__isset.tableDescriptors = true;
// TSlotDescriptor
int offset = 1;
int i = 0;
// id
{
TSlotDescriptor t_slot_desc;
t_slot_desc.__set_slotType(TypeDescriptor(TYPE_INT).to_thrift());
t_slot_desc.__set_columnPos(i);
t_slot_desc.__set_byteOffset(offset);
t_slot_desc.__set_nullIndicatorByte(0);
t_slot_desc.__set_nullIndicatorBit(-1);
t_slot_desc.__set_slotIdx(i);
t_slot_desc.__set_isMaterialized(true);
t_desc_table.slotDescriptors.push_back(t_slot_desc);
offset += sizeof(int);
}
TTupleDescriptor t_tuple_desc;
t_tuple_desc.id = 0;
t_tuple_desc.byteSize = offset;
t_tuple_desc.numNullBytes = 1;
t_tuple_desc.tableId = 0;
t_tuple_desc.__isset.tableId = true;
t_desc_table.__isset.slotDescriptors = true;
t_desc_table.tupleDescriptors.push_back(t_tuple_desc);
DescriptorTbl::create(&_obj_pool, t_desc_table, &_desc_tbl);
_runtime_state.set_desc_tbl(_desc_tbl);
// Node Id
_tnode.node_id = 0;
_tnode.node_type = TPlanNodeType::SCHEMA_SCAN_NODE;
_tnode.num_children = 0;
_tnode.limit = -1;
_tnode.row_tuples.push_back(0);
_tnode.nullable_tuples.push_back(false);
_tnode.es_scan_node.tuple_id = 0;
std::map<std::string, std::string> properties;
_tnode.es_scan_node.__set_properties(properties);
_tnode.__isset.es_scan_node = true;
}
protected:
virtual void SetUp() {
}
virtual void TearDown() {
}
TPlanNode _tnode;
ObjectPool _obj_pool;
DescriptorTbl* _desc_tbl;
RuntimeState _runtime_state;
};
TEST_F(EsScanNodeTest, normal_use) {
EsScanNode scan_node(&_obj_pool, _tnode, *_desc_tbl);
Status status = scan_node.prepare(&_runtime_state);
ASSERT_TRUE(status.ok());
TEsScanRange es_scan_range;
es_scan_range.__set_index("index1");
es_scan_range.__set_type("docs");
es_scan_range.__set_shard_id(0);
TNetworkAddress es_host;
es_host.__set_hostname("host");
es_host.__set_port(8200);
std::vector<TNetworkAddress> es_hosts;
es_hosts.push_back(es_host);
es_scan_range.__set_es_hosts(es_hosts);
TScanRange scan_range;
scan_range.__set_es_scan_range(es_scan_range);
TScanRangeParams scan_range_params;
scan_range_params.__set_scan_range(scan_range);
std::vector<TScanRangeParams> scan_ranges;
scan_ranges.push_back(scan_range_params);
status = scan_node.set_scan_ranges(scan_ranges);
ASSERT_TRUE(status.ok());
std::stringstream out;
scan_node.debug_string(1, &out);
LOG(WARNING) << out.str();
status = scan_node.open(&_runtime_state);
ASSERT_TRUE(status.ok());
RowBatch row_batch(scan_node._row_descriptor, _runtime_state.batch_size(), new MemTracker(-1));
bool eos = false;
status = scan_node.get_next(&_runtime_state, &row_batch, &eos);
ASSERT_TRUE(status.ok());
ASSERT_EQ(2, row_batch.num_rows());
ASSERT_TRUE(eos);
status = scan_node.close(&_runtime_state);
ASSERT_TRUE(status.ok());
}
}
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

View File

@ -438,7 +438,7 @@ visible_functions = [
[['coalesce'], 'DATETIME', ['DATETIME', '...'], ''],
[['coalesce'], 'DECIMAL', ['DECIMAL', '...'], ''],
[['match'], 'BOOLEAN', ['VARCHAR', 'VARCHAR'],
[['esquery'], 'BOOLEAN', ['VARCHAR', 'VARCHAR'],
'_ZN5doris11ESFunctions5matchEPN'
'9doris_udf15FunctionContextERKNS1_9StringValES6_'],

View File

@ -57,6 +57,7 @@ struct TExtLiteral {
// The column and the value are guaranteed to be type compatible in Impala,
// but they are not necessarily the same type, so the data source
// implementation may need to do an implicit cast.
// > < = != >= <=
struct TExtBinaryPredicate {
// Column on which the predicate is applied. Always set.
1: optional TExtColumnDesc col
@ -66,10 +67,40 @@ struct TExtBinaryPredicate {
3: optional TExtLiteral value
}
struct TExtInPredicate {
1: optional bool is_not_in
// Column on which the predicate is applied. Always set.
2: optional TExtColumnDesc col
// Value on the right side of the binary predicate. Always set.
3: optional list<TExtLiteral> values
}
struct TExtLikePredicate {
1: optional TExtColumnDesc col
2: optional TExtLiteral value
}
struct TExtIsNullPredicate {
1: optional bool is_not_null
2: optional TExtColumnDesc col
}
struct TExtFunction {
1: optional string func_name
// input parameter column descs
2: optional list<TExtColumnDesc> cols
// input parameter column literals
3: optional list<TExtLiteral> values
}
// a union of all predicates
struct TExtPredicate {
1: required Exprs.TExprNodeType node_type
2: optional TExtBinaryPredicate binary_predicate
3: optional TExtInPredicate in_predicate
4: optional TExtLikePredicate like_predicate
5: optional TExtIsNullPredicate is_null_predicate
6: optional TExtFunction ext_function
}
// A union over all possible return types for a column of data
@ -82,7 +113,7 @@ struct TExtColumnData {
// Only one is set, only non-null values are set. this indicates one column data for a row batch
2: optional list<bool> bool_vals;
3: optional binary byte_vals;
3: optional list<byte> byte_vals;
4: optional list<i16> short_vals;
5: optional list<i32> int_vals;
6: optional list<i64> long_vals;
@ -170,6 +201,7 @@ struct TExtOpenResult {
// An opaque handle used in subsequent getNext()/close() calls. Required.
2: optional string scan_handle
3: optional list<i32> accepted_conjuncts
}
// Parameters to getNext()
@ -215,4 +247,4 @@ service TExtDataSourceService {
TExtGetNextResult getNext(1: TExtGetNextParams params);
// 1. es will release the context when receiving the data
TExtCloseResult close(1: TExtCloseParams params);
}
}

View File

@ -35,7 +35,12 @@ enum TStatusCode {
MINIMUM_RESERVATION_UNAVAILABLE,
PUBLISH_TIMEOUT,
LABEL_ALREADY_EXISTS,
DATA_QUALITY_ERROR,
ES_INTERNAL_ERROR,
ES_INDEX_NOT_FOUND,
ES_SHARD_NOT_FOUND,
ES_INVALID_CONTEXTID,
ES_INVALID_OFFSET,
ES_REQUEST_ERROR
}
struct TStatus {

View File

@ -162,6 +162,7 @@ ${DORIS_TEST_BINARY_DIR}/exec/plain_text_line_reader_lz4frame_test
${DORIS_TEST_BINARY_DIR}/exec/plain_text_line_reader_lzop_test
${DORIS_TEST_BINARY_DIR}/exec/broker_scanner_test
${DORIS_TEST_BINARY_DIR}/exec/broker_scan_node_test
${DORIS_TEST_BINARY_DIR}/exec/es_scan_node_test
${DORIS_TEST_BINARY_DIR}/exec/olap_table_info_test
${DORIS_TEST_BINARY_DIR}/exec/olap_table_sink_test