From 4d5f92cce7a8e7affd4e91af6d256111fbfcc93e Mon Sep 17 00:00:00 2001 From: Salieri1969 Date: Thu, 17 Jan 2019 17:59:33 +0800 Subject: [PATCH] Add EsScanNode (#450) --- be/src/exec/CMakeLists.txt | 1 + be/src/exec/es_scan_node.cpp | 792 ++++++++++++++++++ be/src/exec/es_scan_node.h | 89 ++ be/src/exec/exec_node.cpp | 6 + be/src/exprs/expr.h | 4 + be/src/exprs/expr_context.h | 1 + be/src/gen_cpp/CMakeLists.txt | 3 + be/src/runtime/client_cache.h | 3 + be/src/runtime/exec_env.h | 3 + be/src/runtime/exec_env_init.cpp | 4 + be/test/exec/CMakeLists.txt | 1 + be/test/exec/es_scan_node_test.cpp | 154 ++++ gensrc/script/doris_builtins_functions.py | 2 +- .../PaloExternalDataSourceService.thrift | 36 +- gensrc/thrift/Status.thrift | 7 +- run-ut.sh | 1 + 16 files changed, 1103 insertions(+), 4 deletions(-) create mode 100644 be/src/exec/es_scan_node.cpp create mode 100644 be/src/exec/es_scan_node.h create mode 100644 be/test/exec/es_scan_node_test.cpp diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt index 1d512caa0f..ac101769ef 100644 --- a/be/src/exec/CMakeLists.txt +++ b/be/src/exec/CMakeLists.txt @@ -62,6 +62,7 @@ set(EXEC_FILES mysql_scanner.cpp csv_scan_node.cpp csv_scanner.cpp + es_scan_node.cpp spill_sort_node.cc union_node.cpp union_node_ir.cpp diff --git a/be/src/exec/es_scan_node.cpp b/be/src/exec/es_scan_node.cpp new file mode 100644 index 0000000000..39492fd2e7 --- /dev/null +++ b/be/src/exec/es_scan_node.cpp @@ -0,0 +1,792 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "es_scan_node.h" + +#include +#include +#include + +#include "gen_cpp/PlanNodes_types.h" +#include "gen_cpp/Exprs_types.h" +#include "runtime/runtime_state.h" +#include "runtime/row_batch.h" +#include "runtime/string_value.h" +#include "runtime/tuple_row.h" +#include "runtime/client_cache.h" +#include "util/runtime_profile.h" +#include "util/debug_util.h" +#include "service/backend_options.h" +#include "olap/olap_common.h" +#include "olap/utils.h" +#include "exprs/expr_context.h" +#include "exprs/expr.h" +#include "exprs/in_predicate.h" +#include "exprs/slot_ref.h" + +namespace doris { + +// $0 = column type (e.g. INT) +const string ERROR_INVALID_COL_DATA = "Data source returned inconsistent column data. " + "Expected value of type $0 based on column metadata. 
This likely indicates a " + "problem with the data source library."; +const string ERROR_MEM_LIMIT_EXCEEDED = "DataSourceScanNode::$0() failed to allocate " + "$1 bytes for $2."; + +EsScanNode::EsScanNode( + ObjectPool* pool, + const TPlanNode& tnode, + const DescriptorTbl& descs) : + ScanNode(pool, tnode, descs), + _tuple_id(tnode.es_scan_node.tuple_id), + _scan_range_idx(0) { + if (tnode.es_scan_node.__isset.properties) { + _properties = tnode.es_scan_node.properties; + } +} + +EsScanNode::~EsScanNode() { +} + +Status EsScanNode::prepare(RuntimeState* state) { + VLOG(1) << "EsScanNode::Prepare"; + + RETURN_IF_ERROR(ScanNode::prepare(state)); + _tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id); + if (_tuple_desc == nullptr) { + std::stringstream ss; + ss << "es tuple descriptor is null, _tuple_id=" << _tuple_id; + LOG(WARNING) << ss.str(); + return Status(ss.str()); + } + _env = state->exec_env(); + + return Status::OK; +} + +Status EsScanNode::open(RuntimeState* state) { + VLOG(1) << "EsScanNode::Open"; + + RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::OPEN)); + RETURN_IF_CANCELLED(state); + SCOPED_TIMER(_runtime_profile->total_time_counter()); + RETURN_IF_ERROR(ExecNode::open(state)); + + // TExtOpenParams.row_schema + vector cols; + for (const SlotDescriptor* slot : _tuple_desc->slots()) { + TExtColumnDesc col; + col.__set_name(slot->col_name()); + col.__set_type(slot->type().to_thrift()); + cols.emplace_back(std::move(col)); + } + TExtTableSchema row_schema; + row_schema.cols = std::move(cols); + row_schema.__isset.cols = true; + + // TExtOpenParams.predicates + vector > predicates; + vector predicate_to_conjunct; + for (int i = 0; i < _conjunct_ctxs.size(); ++i) { + VLOG(1) << "conjunct: " << _conjunct_ctxs[i]->root()->debug_string(); + vector disjuncts; + if (get_disjuncts(_conjunct_ctxs[i], _conjunct_ctxs[i]->root(), disjuncts)) { + predicates.emplace_back(std::move(disjuncts)); + predicate_to_conjunct.push_back(i); + } + } + + // open 
every scan range + vector conjunct_accepted_times(_conjunct_ctxs.size(), 0); + for (int i = 0; i < _scan_ranges.size(); ++i) { + TEsScanRange& es_scan_range = _scan_ranges[i]; + + if (es_scan_range.es_hosts.empty()) { + std::stringstream ss; + ss << "es fail to open: hosts empty"; + LOG(WARNING) << ss.str(); + return Status(ss.str()); + } + + + // TExtOpenParams + TExtOpenParams params; + params.__set_query_id(state->query_id()); + _properties["index"] = es_scan_range.index; + if (es_scan_range.__isset.type) { + _properties["type"] = es_scan_range.type; + } + _properties["shard_id"] = std::to_string(es_scan_range.shard_id); + params.__set_properties(_properties); + params.__set_row_schema(row_schema); + params.__set_batch_size(state->batch_size()); + params.__set_predicates(predicates); + TExtOpenResult result; + + // choose an es node, local is the first choice + std::string localhost = BackendOptions::get_localhost(); + bool is_success = false; + for (int j = 0; j < 2; ++j) { + for (auto& es_host : es_scan_range.es_hosts) { + if ((j == 0 && es_host.hostname != localhost) + || (j == 1 && es_host.hostname == localhost)) { + continue; + } + Status status = open_es(es_host, result, params); + if (status.ok()) { + is_success = true; + _addresses.push_back(es_host); + _scan_handles.push_back(result.scan_handle); + if (result.__isset.accepted_conjuncts) { + for (int index : result.accepted_conjuncts) { + conjunct_accepted_times[predicate_to_conjunct[index]]++; + } + } + break; + } else if (status.code() == TStatusCode::ES_SHARD_NOT_FOUND) { + // if shard not found, try other nodes + LOG(WARNING) << "shard not found on es node: " + << ", address=" << es_host + << ", scan_range_idx=" << i << ", try other nodes"; + } else { + LOG(WARNING) << "es open error: scan_range_idx=" << i + << ", address=" << es_host + << ", msg=" << status.get_error_msg(); + return status; + } + } + if (is_success) { + break; + } + } + + if (!is_success) { + std::stringstream ss; + ss << "es open 
error: scan_range_idx=" << i + << ", can't find shard on any node"; + return Status(ss.str()); + } + } + + // remove those conjuncts that accepted by all scan ranges + for (int i = predicate_to_conjunct.size() - 1; i >= 0; i--) { + int conjunct_index = predicate_to_conjunct[i]; + if (conjunct_accepted_times[conjunct_index] == _scan_ranges.size()) { + _conjunct_ctxs.erase(_conjunct_ctxs.begin() + conjunct_index); + } + } + + for (int i = 0; i < _conjunct_ctxs.size(); ++i) { + if (!check_left_conjuncts(_conjunct_ctxs[i]->root())) { + return Status("esquery could only be executed on es, but could not push down to es"); + } + } + + return Status::OK; +} + +Status EsScanNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) { + VLOG(1) << "EsScanNode::GetNext"; + + RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT)); + RETURN_IF_CANCELLED(state); + SCOPED_TIMER(_runtime_profile->total_time_counter()); + SCOPED_TIMER(materialize_tuple_timer()); + + // create tuple + MemPool* tuple_pool = row_batch->tuple_data_pool(); + int64_t tuple_buffer_size; + uint8_t* tuple_buffer = nullptr; + RETURN_IF_ERROR(row_batch->resize_and_allocate_tuple_buffer(state, &tuple_buffer_size, &tuple_buffer)); + Tuple* tuple = reinterpret_cast(tuple_buffer); + + // get batch + TExtGetNextResult result; + RETURN_IF_ERROR(get_next_from_es(result)); + VLOG(1) << "es get next success: result=" << apache::thrift::ThriftDebugString(result); + _offsets[_scan_range_idx] += result.rows.num_rows; + + // convert + VLOG(1) << "begin to convert: scan_range_idx=" << _scan_range_idx + << ", num_rows=" << result.rows.num_rows; + vector& cols = result.rows.cols; + // indexes of the next non-null value in the row batch, per column. 
+ vector cols_next_val_idx(_tuple_desc->slots().size(), 0); + for (int row_idx = 0; row_idx < result.rows.num_rows; row_idx++) { + if (reached_limit()) { + *eos = true; + break; + } + RETURN_IF_ERROR(materialize_row(tuple_pool, tuple, cols, row_idx, cols_next_val_idx)); + TupleRow* tuple_row = row_batch->get_row(row_batch->add_row()); + tuple_row->set_tuple(0, tuple); + if (ExecNode::eval_conjuncts(_conjunct_ctxs.data(), _conjunct_ctxs.size(), tuple_row)) { + row_batch->commit_last_row(); + tuple = reinterpret_cast( + reinterpret_cast(tuple) + _tuple_desc->byte_size()); + ++_num_rows_returned; + } + } + + VLOG(1) << "finish one batch: num_rows=" << row_batch->num_rows(); + COUNTER_SET(_rows_returned_counter, _num_rows_returned); + if (result.__isset.eos && result.eos) { + VLOG(1) << "es finish one scan_range: scan_range_idx=" << _scan_range_idx; + ++_scan_range_idx; + } + if (_scan_range_idx == _scan_ranges.size()) { + *eos = true; + } + + return Status::OK; +} + +Status EsScanNode::close(RuntimeState* state) { + if (is_closed()) return Status::OK; + VLOG(1) << "EsScanNode::Close"; + RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::CLOSE)); + SCOPED_TIMER(_runtime_profile->total_time_counter()); + + for (int i = 0; i < _addresses.size(); ++i) { + TExtCloseParams params; + params.__set_scan_handle(_scan_handles[i]); + TExtCloseResult result; + +#ifndef BE_TEST + const TNetworkAddress& address = _addresses[i]; + try { + Status status; + ExtDataSourceServiceClientCache* client_cache = _env->extdatasource_client_cache(); + ExtDataSourceServiceConnection client(client_cache, address, 10000, &status); + if (!status.ok()) { + LOG(WARNING) << "es create client error: scan_range_idx=" << i + << ", address=" << address + << ", msg=" << status.get_error_msg(); + return status; + } + + try { + VLOG(1) << "es close param=" << apache::thrift::ThriftDebugString(params); + client->close(result, params); + } catch (apache::thrift::transport::TTransportException& e) { + 
LOG(WARNING) << "es close retrying, because: " << e.what(); + RETURN_IF_ERROR(client.reopen()); + client->close(result, params); + } + } catch (apache::thrift::TException &e) { + std::stringstream ss; + ss << "es close error: scan_range_idx=" << i + << ", msg=" << e.what(); + LOG(WARNING) << ss.str(); + return Status(TStatusCode::THRIFT_RPC_ERROR, ss.str(), false); + } + + VLOG(1) << "es close result=" << apache::thrift::ThriftDebugString(result); + Status status(result.status); + if (!status.ok()) { + LOG(WARNING) << "es close error: : scan_range_idx=" << i + << ", msg=" << status.get_error_msg(); + return status; + } +#else + TStatus status; + result.__set_status(status); +#endif + } + + RETURN_IF_ERROR(ExecNode::close(state)); + return Status::OK; +} + +void EsScanNode::debug_string(int indentation_level, stringstream* out) const { + *out << string(indentation_level * 2, ' '); + *out << "EsScanNode(tupleid=" << _tuple_id; + *out << ")" << std::endl; + + for (int i = 0; i < _children.size(); ++i) { + _children[i]->debug_string(indentation_level + 1, out); + } +} + +Status EsScanNode::set_scan_ranges(const vector& scan_ranges) { + for (int i = 0; i < scan_ranges.size(); ++i) { + TScanRangeParams scan_range = scan_ranges[i]; + DCHECK(scan_range.scan_range.__isset.es_scan_range); + TEsScanRange es_scan_range = scan_range.scan_range.es_scan_range; + _scan_ranges.push_back(es_scan_range); + } + + _offsets.resize(scan_ranges.size(), 0); + return Status::OK; +} + +Status EsScanNode::open_es(TNetworkAddress& address, TExtOpenResult& result, TExtOpenParams& params) { + + VLOG(1) << "es open param=" << apache::thrift::ThriftDebugString(params); +#ifndef BE_TEST + try { + ExtDataSourceServiceClientCache* client_cache = _env->extdatasource_client_cache(); + Status status; + ExtDataSourceServiceConnection client(client_cache, address, 10000, &status); + if (!status.ok()) { + std::stringstream ss; + ss << "es create client error: address=" << address + << ", msg=" << 
status.get_error_msg(); + return Status(ss.str()); + } + + try { + client->open(result, params); + } catch (apache::thrift::transport::TTransportException& e) { + LOG(WARNING) << "es open retrying, because: " << e.what(); + RETURN_IF_ERROR(client.reopen()); + client->open(result, params); + } + VLOG(1) << "es open result=" << apache::thrift::ThriftDebugString(result); + return Status(result.status); + } catch (apache::thrift::TException &e) { + std::stringstream ss; + ss << "es open error: address=" << address << ", msg=" << e.what(); + return Status(ss.str()); + } +#else + TStatus status; + result.__set_status(status); + result.__set_scan_handle("0"); + return Status(status); +#endif +} + +// legacy conjuncts must not contain match function +bool EsScanNode::check_left_conjuncts(Expr* conjunct) { + if (is_match_func(conjunct)) { + return false; + } else { + int num_children = conjunct->get_num_children(); + for (int child_idx = 0; child_idx < num_children; ++child_idx) { + if (!check_left_conjuncts(conjunct->get_child(child_idx))) { + return false; + } + } + return true; + } +} + +bool EsScanNode::get_disjuncts(ExprContext* context, Expr* conjunct, + vector& disjuncts) { + if (TExprNodeType::BINARY_PRED == conjunct->node_type()) { + if (conjunct->children().size() != 2) { + VLOG(1) << "get disjuncts fail: number of childs is not 2"; + return false; + } + SlotRef* slotRef; + TExprOpcode::type op; + Expr* expr; + if (TExprNodeType::SLOT_REF == conjunct->get_child(0)->node_type()) { + expr = conjunct->get_child(1); + slotRef = (SlotRef*)(conjunct->get_child(0)); + op = conjunct->op(); + } else if (TExprNodeType::SLOT_REF == conjunct->get_child(1)->node_type()) { + expr = conjunct->get_child(0); + slotRef = (SlotRef*)(conjunct->get_child(1)); + op = conjunct->op(); + } else { + VLOG(1) << "get disjuncts fail: no SLOT_REF child"; + return false; + } + + SlotDescriptor* slot_desc = get_slot_desc(slotRef); + if (slot_desc == nullptr) { + VLOG(1) << "get disjuncts fail: 
slot_desc is null"; + return false; + } + + TExtLiteral literal; + if (!to_ext_literal(context, expr, &literal)) { + VLOG(1) << "get disjuncts fail: can't get literal, node_type=" + << expr->node_type(); + return false; + } + + TExtColumnDesc columnDesc; + columnDesc.__set_name(slot_desc->col_name()); + columnDesc.__set_type(slot_desc->type().to_thrift()); + TExtBinaryPredicate binaryPredicate; + binaryPredicate.__set_col(columnDesc); + binaryPredicate.__set_op(op); + binaryPredicate.__set_value(std::move(literal)); + TExtPredicate predicate; + predicate.__set_node_type(TExprNodeType::BINARY_PRED); + predicate.__set_binary_predicate(binaryPredicate); + disjuncts.push_back(std::move(predicate)); + return true; + } else if (is_match_func(conjunct)) { + // if this is a function call expr and function name is match, then push + // down it to es + TExtFunction match_function; + match_function.__set_func_name(conjunct->fn().name.function_name); + vector query_conditions; + + + TExtLiteral literal; + if (!to_ext_literal(context, conjunct->get_child(1), &literal)) { + VLOG(1) << "get disjuncts fail: can't get literal, node_type=" + << conjunct->get_child(1)->node_type(); + return false; + } + + query_conditions.push_back(std::move(literal)); + match_function.__set_values(query_conditions); + TExtPredicate predicate; + predicate.__set_node_type(TExprNodeType::FUNCTION_CALL); + predicate.__set_ext_function(match_function); + disjuncts.push_back(std::move(predicate)); + return true; + } else if (TExprNodeType::IN_PRED == conjunct->node_type()) { + TExtInPredicate ext_in_predicate; + vector in_pred_values; + InPredicate* pred = dynamic_cast(conjunct); + ext_in_predicate.__set_is_not_in(pred->is_not_in()); + if (Expr::type_without_cast(pred->get_child(0)) != TExprNodeType::SLOT_REF) { + return false; + } + + SlotRef* slot_ref = (SlotRef*)(conjunct->get_child(0)); + SlotDescriptor* slot_desc = get_slot_desc(slot_ref); + if (slot_desc == nullptr) { + return false; + } + 
TExtColumnDesc columnDesc; + columnDesc.__set_name(slot_desc->col_name()); + columnDesc.__set_type(slot_desc->type().to_thrift()); + ext_in_predicate.__set_col(columnDesc); + + for (int i = 1; i < pred->children().size(); ++i) { + // varchar, string, all of them are string type, but varchar != string + // TODO add date, datetime support? + if (pred->get_child(0)->type().is_string_type()) { + if (!pred->get_child(i)->type().is_string_type()) { + return false; + } + } else { + if (pred->get_child(i)->type().type != pred->get_child(0)->type().type) { + return false; + } + } + TExtLiteral literal; + if (!to_ext_literal(context, pred->get_child(i), &literal)) { + VLOG(1) << "get disjuncts fail: can't get literal, node_type=" + << pred->get_child(i)->node_type(); + return false; + } + in_pred_values.push_back(literal); + } + ext_in_predicate.__set_values(in_pred_values); + TExtPredicate predicate; + predicate.__set_node_type(TExprNodeType::IN_PRED); + predicate.__set_in_predicate(ext_in_predicate); + disjuncts.push_back(std::move(predicate)); + return true; + } else if (TExprNodeType::COMPOUND_PRED == conjunct->node_type()) { + if (TExprOpcode::COMPOUND_OR != conjunct->op()) { + VLOG(1) << "get disjuncts fail: op is not COMPOUND_OR"; + return false; + } + if (!get_disjuncts(context, conjunct->get_child(0), disjuncts)) { + return false; + } + if (!get_disjuncts(context, conjunct->get_child(1), disjuncts)) { + return false; + } + return true; + } else { + VLOG(1) << "get disjuncts fail: node type is " << conjunct->node_type() + << ", should be BINARY_PRED or COMPOUND_PRED"; + return false; + } +} + +bool EsScanNode::is_match_func(Expr* conjunct) { + if (TExprNodeType::FUNCTION_CALL == conjunct->node_type() + && conjunct->fn().name.function_name == "esquery") { + return true; + } + return false; +} + +SlotDescriptor* EsScanNode::get_slot_desc(SlotRef* slotRef) { + std::vector slot_ids; + slotRef->get_slot_ids(&slot_ids); + SlotDescriptor* slot_desc = nullptr; + for 
(SlotDescriptor* slot : _tuple_desc->slots()) { + if (slot->id() == slot_ids[0]) { + slot_desc = slot; + break; + } + } + return slot_desc; +} + +bool EsScanNode::to_ext_literal(ExprContext* context, Expr* expr, TExtLiteral* literal) { + literal->__set_node_type(expr->node_type()); + switch (expr->node_type()) { + case TExprNodeType::BOOL_LITERAL: { + TBoolLiteral bool_literal; + void* value = context->get_value(expr, NULL); + bool_literal.__set_value(*reinterpret_cast(value)); + literal->__set_bool_literal(bool_literal); + return true; + } + case TExprNodeType::DATE_LITERAL: { + void* value = context->get_value(expr, NULL); + DateTimeValue date_value = *reinterpret_cast(value); + char str[MAX_DTVALUE_STR_LEN]; + date_value.to_string(str); + TDateLiteral date_literal; + date_literal.__set_value(str); + literal->__set_date_literal(date_literal); + return true; + } + case TExprNodeType::FLOAT_LITERAL: { + TFloatLiteral float_literal; + void* value = context->get_value(expr, NULL); + float_literal.__set_value(*reinterpret_cast(value)); + literal->__set_float_literal(float_literal); + return true; + } + case TExprNodeType::INT_LITERAL: { + TIntLiteral int_literal; + void* value = context->get_value(expr, NULL); + int_literal.__set_value(*reinterpret_cast(value)); + literal->__set_int_literal(int_literal); + return true; + } + case TExprNodeType::STRING_LITERAL: { + TStringLiteral string_literal; + void* value = context->get_value(expr, NULL); + string_literal.__set_value(*reinterpret_cast(value)); + literal->__set_string_literal(string_literal); + return true; + } + case TExprNodeType::DECIMAL_LITERAL: { + TDecimalLiteral decimal_literal; + void* value = context->get_value(expr, NULL); + decimal_literal.__set_value(reinterpret_cast(value)->to_string()); + literal->__set_decimal_literal(decimal_literal); + return true; + } + case TExprNodeType::LARGE_INT_LITERAL: { + char buf[48]; + int len = 48; + void* value = context->get_value(expr, NULL); + char* v = 
LargeIntValue::to_string(*reinterpret_cast<__int128*>(value), buf, &len); + TLargeIntLiteral large_int_literal; + large_int_literal.__set_value(v); + literal->__set_large_int_literal(large_int_literal); + return true; + } + default: + return false; + } +} + +Status EsScanNode::get_next_from_es(TExtGetNextResult& result) { + TExtGetNextParams params; + params.__set_scan_handle(_scan_handles[_scan_range_idx]); + params.__set_offset(_offsets[_scan_range_idx]); + + // getNext + const TNetworkAddress &address = _addresses[_scan_range_idx]; +#ifndef BE_TEST + try { + Status create_client_status; + ExtDataSourceServiceClientCache *client_cache = _env->extdatasource_client_cache(); + ExtDataSourceServiceConnection client(client_cache, address, 10000, &create_client_status); + if (!create_client_status.ok()) { + LOG(WARNING) << "es create client error: scan_range_idx=" << _scan_range_idx + << ", address=" << address + << ", msg=" << create_client_status.get_error_msg(); + return create_client_status; + } + + try { + VLOG(1) << "es get_next param=" << apache::thrift::ThriftDebugString(params); + client->getNext(result, params); + } catch (apache::thrift::transport::TTransportException& e) { + std::stringstream ss; + ss << "es get_next error: scan_range_idx=" << _scan_range_idx + << ", msg=" << e.what(); + LOG(WARNING) << ss.str(); + RETURN_IF_ERROR(client.reopen()); + return Status(TStatusCode::THRIFT_RPC_ERROR, ss.str(), false); + } + } catch (apache::thrift::TException &e) { + std::stringstream ss; + ss << "es get_next error: scan_range_idx=" << _scan_range_idx + << ", msg=" << e.what(); + LOG(WARNING) << ss.str(); + return Status(TStatusCode::THRIFT_RPC_ERROR, ss.str(), false); + } +#else + TStatus status; + result.__set_status(status); + result.__set_eos(true); + TExtColumnData col_data; + std::vector is_null; + is_null.push_back(false); + col_data.__set_is_null(is_null); + std::vector int_vals; + int_vals.push_back(1); + int_vals.push_back(2); + 
col_data.__set_int_vals(int_vals); + std::vector cols; + cols.push_back(col_data); + TExtRowBatch rows; + rows.__set_cols(cols); + rows.__set_num_rows(2); + result.__set_rows(rows); + return Status(status); +#endif + + // check result + VLOG(1) << "es get_next result=" << apache::thrift::ThriftDebugString(result); + Status get_next_status(result.status); + if (!get_next_status.ok()) { + LOG(WARNING) << "es get_next error: scan_range_idx=" << _scan_range_idx + << ", address=" << address + << ", msg=" << get_next_status.get_error_msg(); + return get_next_status; + } + if (!result.__isset.rows || !result.rows.__isset.num_rows) { + std::stringstream ss; + ss << "es get_next error: scan_range_idx=" << _scan_range_idx + << ", msg=rows or num_rows not in result"; + LOG(WARNING) << ss.str(); + return Status(ss.str()); + } + + return Status::OK; +} + +Status EsScanNode::materialize_row(MemPool* tuple_pool, Tuple* tuple, + const vector& cols, int row_idx, + vector& cols_next_val_idx) { + tuple->init(_tuple_desc->byte_size()); + + for (int i = 0; i < _tuple_desc->slots().size(); ++i) { + const SlotDescriptor* slot_desc = _tuple_desc->slots()[i]; + + if (!slot_desc->is_materialized()) { + continue; + } + + void* slot = tuple->get_slot(slot_desc->tuple_offset()); + const TExtColumnData& col = cols[i]; + + if (col.is_null[row_idx]) { + tuple->set_null(slot_desc->null_indicator_offset()); + continue; + } else { + tuple->set_not_null(slot_desc->null_indicator_offset()); + } + + int val_idx = cols_next_val_idx[i]++; + switch (slot_desc->type().type) { + case TYPE_CHAR: + case TYPE_VARCHAR: { + if (val_idx >= col.string_vals.size()) { + return Status(strings::Substitute(ERROR_INVALID_COL_DATA, "STRING")); + } + const string& val = col.string_vals[val_idx]; + size_t val_size = val.size(); + char* buffer = reinterpret_cast(tuple_pool->try_allocate_unaligned(val_size)); + if (UNLIKELY(buffer == NULL)) { + string details = strings::Substitute(ERROR_MEM_LIMIT_EXCEEDED, 
"MaterializeNextRow", + val_size, "string slot"); + return tuple_pool->mem_tracker()->MemLimitExceeded(NULL, details, val_size); + } + memcpy(buffer, val.data(), val_size); + reinterpret_cast(slot)->ptr = buffer; + reinterpret_cast(slot)->len = val_size; + break; + } + case TYPE_TINYINT: + if (val_idx >= col.byte_vals.size()) { + return Status(strings::Substitute(ERROR_INVALID_COL_DATA, "TINYINT")); + } + *reinterpret_cast(slot) = col.byte_vals[val_idx]; + break; + case TYPE_SMALLINT: + if (val_idx >= col.short_vals.size()) { + return Status(strings::Substitute(ERROR_INVALID_COL_DATA, "SMALLINT")); + } + *reinterpret_cast(slot) = col.short_vals[val_idx]; + break; + case TYPE_INT: + if (val_idx >= col.int_vals.size()) { + return Status(strings::Substitute(ERROR_INVALID_COL_DATA, "INT")); + } + *reinterpret_cast(slot) = col.int_vals[val_idx]; + break; + case TYPE_BIGINT: + if (val_idx >= col.long_vals.size()) { + return Status(strings::Substitute(ERROR_INVALID_COL_DATA, "BIGINT")); + } + *reinterpret_cast(slot) = col.long_vals[val_idx]; + break; + case TYPE_DOUBLE: + if (val_idx >= col.double_vals.size()) { + return Status(strings::Substitute(ERROR_INVALID_COL_DATA, "DOUBLE")); + } + *reinterpret_cast(slot) = col.double_vals[val_idx]; + break; + case TYPE_FLOAT: + if (val_idx >= col.double_vals.size()) { + return Status(strings::Substitute(ERROR_INVALID_COL_DATA, "FLOAT")); + } + *reinterpret_cast(slot) = col.double_vals[val_idx]; + break; + case TYPE_BOOLEAN: + if (val_idx >= col.bool_vals.size()) { + return Status(strings::Substitute(ERROR_INVALID_COL_DATA, "BOOLEAN")); + } + *reinterpret_cast(slot) = col.bool_vals[val_idx]; + break; + case TYPE_DATE: + case TYPE_DATETIME: { + if (val_idx >= col.long_vals.size() || + !reinterpret_cast(slot)->from_unixtime(col.long_vals[val_idx])) { + return Status(strings::Substitute(ERROR_INVALID_COL_DATA, "TYPE_DATE|TYPE_DATETIME")); + } + break; + } + case TYPE_DECIMAL: { + if (val_idx >= col.binary_vals.size()) { + return 
Status(strings::Substitute(ERROR_INVALID_COL_DATA, "DECIMAL")); + } + const string& val = col.binary_vals[val_idx]; + *reinterpret_cast(slot) = *reinterpret_cast(&val); + break; + } + default: + DCHECK(false); + } + } + return Status::OK; +} + +} diff --git a/be/src/exec/es_scan_node.h b/be/src/exec/es_scan_node.h new file mode 100644 index 0000000000..18640a006d --- /dev/null +++ b/be/src/exec/es_scan_node.h @@ -0,0 +1,89 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#ifndef BDG_PALO_BE_SRC_QUERY_EXEC_ES_SCAN_NODE_H +#define BDG_PALO_BE_SRC_QUERY_EXEC_ES_SCAN_NODE_H + +#include +#include + +#include "runtime/descriptors.h" +#include "runtime/tuple.h" +#include "exec/scan_node.h" +#include "exprs/slot_ref.h" +#include "runtime/exec_env.h" +#include "gen_cpp/TExtDataSourceService.h" +#include "gen_cpp/PaloExternalDataSourceService_types.h" + +namespace doris { + +class TupleDescriptor; +class RuntimeState; +class Status; + +class EsScanNode : public ScanNode { +public: + EsScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs); + ~EsScanNode(); + + virtual Status prepare(RuntimeState* state) override; + virtual Status open(RuntimeState* state) override; + virtual Status get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) override; + virtual Status close(RuntimeState* state) override; + virtual Status set_scan_ranges(const std::vector& scan_ranges) override; + +protected: + // Write debug string of this into out. + virtual void debug_string(int indentation_level, std::stringstream* out) const; + +private: + Status open_es(TNetworkAddress& address, TExtOpenResult& result, TExtOpenParams& params); + Status materialize_row(MemPool* tuple_pool, Tuple* tuple, + const vector& cols, int next_row_idx, + vector& cols_next_val_idx); + Status get_next_from_es(TExtGetNextResult& result); + + bool get_disjuncts(ExprContext* context, Expr* conjunct, vector& disjuncts); + bool to_ext_literal(ExprContext* context, Expr* expr, TExtLiteral* literal); + + bool is_match_func(Expr* conjunct); + + SlotDescriptor* get_slot_desc(SlotRef* slotRef); + + // check if open result meets condition + // 1. 
check if left conjuncts contain "match" function, since match function could only be executed on es + bool check_left_conjuncts(Expr* conjunct); + +private: + TupleId _tuple_id; + std::map _properties; + const TupleDescriptor* _tuple_desc; + ExecEnv* _env; + std::vector _scan_ranges; + + // scan range's iterator, used in get_next() + int _scan_range_idx; + + // store every scan range's netaddress/handle/offset + std::vector _addresses; + std::vector _scan_handles; + std::vector _offsets; +}; + +} + +#endif diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp index 65b04c9f17..a71eaa9da0 100644 --- a/be/src/exec/exec_node.cpp +++ b/be/src/exec/exec_node.cpp @@ -30,6 +30,7 @@ #include "exec/partitioned_aggregation_node.h" #include "exec/new_partitioned_aggregation_node.h" #include "exec/csv_scan_node.h" +#include "exec/es_scan_node.h" #include "exec/pre_aggregation_node.h" #include "exec/hash_join_node.h" #include "exec/broker_scan_node.h" @@ -359,6 +360,10 @@ Status ExecNode::create_node(RuntimeState* state, ObjectPool* pool, const TPlanN *node = pool->add(new MysqlScanNode(pool, tnode, descs)); return Status::OK; + case TPlanNodeType::ES_SCAN_NODE: + *node = pool->add(new EsScanNode(pool, tnode, descs)); + return Status::OK; + case TPlanNodeType::SCHEMA_SCAN_NODE: *node = pool->add(new SchemaScanNode(pool, tnode, descs)); return Status::OK; @@ -507,6 +512,7 @@ void ExecNode::collect_nodes(TPlanNodeType::type node_type, vector* n void ExecNode::collect_scan_nodes(vector* nodes) { collect_nodes(TPlanNodeType::OLAP_SCAN_NODE, nodes); collect_nodes(TPlanNodeType::BROKER_SCAN_NODE, nodes); + collect_nodes(TPlanNodeType::ES_SCAN_NODE, nodes); } void ExecNode::init_runtime_profile(const std::string& name) { diff --git a/be/src/exprs/expr.h b/be/src/exprs/expr.h index 85717410cc..0b3d02fc43 100644 --- a/be/src/exprs/expr.h +++ b/be/src/exprs/expr.h @@ -159,6 +159,10 @@ public: return _node_type; } + const TFunction& fn() const { + return _fn; + } + bool 
is_slotref() const { return _is_slotref; } diff --git a/be/src/exprs/expr_context.h b/be/src/exprs/expr_context.h index 158911aa02..9c3b8ddde7 100644 --- a/be/src/exprs/expr_context.h +++ b/be/src/exprs/expr_context.h @@ -174,6 +174,7 @@ private: friend class ScalarFnCall; friend class InPredicate; friend class OlapScanNode; + friend class EsScanNode; /// FunctionContexts for each registered expression. The FunctionContexts are created /// and owned by this ExprContext. diff --git a/be/src/gen_cpp/CMakeLists.txt b/be/src/gen_cpp/CMakeLists.txt index 57449caf56..3212d4d2c2 100644 --- a/be/src/gen_cpp/CMakeLists.txt +++ b/be/src/gen_cpp/CMakeLists.txt @@ -32,6 +32,9 @@ set(SRC_FILES ${GEN_CPP_DIR}/HeartbeatService_types.cpp ${GEN_CPP_DIR}/PaloInternalService_constants.cpp ${GEN_CPP_DIR}/PaloInternalService_types.cpp + ${GEN_CPP_DIR}/PaloExternalDataSourceService_constants.cpp + ${GEN_CPP_DIR}/PaloExternalDataSourceService_types.cpp + ${GEN_CPP_DIR}/TExtDataSourceService.cpp ${GEN_CPP_DIR}/FrontendService.cpp ${GEN_CPP_DIR}/FrontendService_constants.cpp ${GEN_CPP_DIR}/FrontendService_types.cpp diff --git a/be/src/runtime/client_cache.h b/be/src/runtime/client_cache.h index dcdd9a92b9..b5780c2043 100644 --- a/be/src/runtime/client_cache.h +++ b/be/src/runtime/client_cache.h @@ -274,6 +274,9 @@ typedef ClientConnection FrontendServiceConnection; class TPaloBrokerServiceClient; typedef ClientCache BrokerServiceClientCache; typedef ClientConnection BrokerServiceConnection; +class TExtDataSourceServiceClient; +typedef ClientCache ExtDataSourceServiceClientCache; +typedef ClientConnection ExtDataSourceServiceConnection; } diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index c1d929e778..fb07a7c792 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -54,6 +54,7 @@ class WebPageHandler; class BackendServiceClient; class FrontendServiceClient; class TPaloBrokerServiceClient; +class TExtDataSourceServiceClient; template class 
ClientCache; // Execution environment for queries/plan fragments. @@ -88,6 +89,7 @@ public: ClientCache* client_cache() { return _client_cache; } ClientCache* frontend_client_cache() { return _frontend_client_cache; } ClientCache* broker_client_cache() { return _broker_client_cache; } + ClientCache* extdatasource_client_cache() { return _extdatasource_client_cache; } MemTracker* process_mem_tracker() { return _mem_tracker; } PoolMemTrackerRegistry* pool_mem_trackers() { return _pool_mem_trackers; } ThreadResourceMgr* thread_mgr() { return _thread_mgr; } @@ -131,6 +133,7 @@ private: ClientCache* _client_cache = nullptr; ClientCache* _frontend_client_cache = nullptr; ClientCache* _broker_client_cache = nullptr; + ClientCache* _extdatasource_client_cache = nullptr; MemTracker* _mem_tracker = nullptr; PoolMemTrackerRegistry* _pool_mem_trackers = nullptr; ThreadResourceMgr* _thread_mgr = nullptr; diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index 896356f23c..930286e519 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -54,6 +54,7 @@ #include "gen_cpp/BackendService.h" #include "gen_cpp/FrontendService.h" #include "gen_cpp/TPaloBrokerService.h" +#include "gen_cpp/TExtDataSourceService.h" #include "gen_cpp/HeartbeatService_types.h" namespace doris { @@ -71,6 +72,7 @@ Status ExecEnv::_init(const std::vector& store_paths) { _client_cache = new BackendServiceClientCache(); _frontend_client_cache = new FrontendServiceClientCache(); _broker_client_cache = new BrokerServiceClientCache(); + _extdatasource_client_cache = new ExtDataSourceServiceClientCache(); _mem_tracker = nullptr; _pool_mem_trackers = new PoolMemTrackerRegistry(); _thread_mgr = new ThreadResourceMgr(); @@ -97,6 +99,7 @@ Status ExecEnv::_init(const std::vector& store_paths) { _client_cache->init_metrics(DorisMetrics::metrics(), "backend"); _frontend_client_cache->init_metrics(DorisMetrics::metrics(), "frontend"); 
_broker_client_cache->init_metrics(DorisMetrics::metrics(), "broker"); + _extdatasource_client_cache->init_metrics(DorisMetrics::metrics(), "extdatasource"); _result_mgr->init(); _cgroups_mgr->init_cgroups(); _etl_job_mgr->init(); @@ -198,6 +201,7 @@ void ExecEnv::_destory() { delete _pool_mem_trackers; delete _mem_tracker; delete _broker_client_cache; + delete _extdatasource_client_cache; delete _frontend_client_cache; delete _client_cache; delete _result_mgr; diff --git a/be/test/exec/CMakeLists.txt b/be/test/exec/CMakeLists.txt index 6d7dbe6c95..7b683602ce 100644 --- a/be/test/exec/CMakeLists.txt +++ b/be/test/exec/CMakeLists.txt @@ -43,6 +43,7 @@ ADD_BE_TEST(plain_text_line_reader_lzop_test) ADD_BE_TEST(broker_reader_test) ADD_BE_TEST(broker_scanner_test) ADD_BE_TEST(broker_scan_node_test) +ADD_BE_TEST(es_scan_node_test) ADD_BE_TEST(olap_table_info_test) ADD_BE_TEST(olap_table_sink_test) #ADD_BE_TEST(schema_scan_node_test) diff --git a/be/test/exec/es_scan_node_test.cpp b/be/test/exec/es_scan_node_test.cpp new file mode 100644 index 0000000000..adc2c0d8e3 --- /dev/null +++ b/be/test/exec/es_scan_node_test.cpp @@ -0,0 +1,154 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include +#include + +#include "common/object_pool.h" +#include "exec/es_scan_node.h" +#include "gen_cpp/PlanNodes_types.h" +#include "runtime/mem_pool.h" +#include "runtime/descriptors.h" +#include "runtime/runtime_state.h" +#include "runtime/row_batch.h" +#include "runtime/string_value.h" +#include "runtime/tuple_row.h" +#include "util/runtime_profile.h" +#include "util/debug_util.h" + +using std::vector; + +namespace doris { + +// mock +class EsScanNodeTest : public testing::Test { +public: + EsScanNodeTest() : _runtime_state("EsScanNodeTest") { + _runtime_state._instance_mem_tracker.reset(new MemTracker()); + TDescriptorTable t_desc_table; + + // table descriptors + TTableDescriptor t_table_desc; + + t_table_desc.id = 0; + t_table_desc.tableType = TTableType::ES_TABLE; + t_table_desc.numCols = 0; + t_table_desc.numClusteringCols = 0; + t_table_desc.__isset.esTable = true; + t_desc_table.tableDescriptors.push_back(t_table_desc); + t_desc_table.__isset.tableDescriptors = true; + // TSlotDescriptor + int offset = 1; + int i = 0; + // id + { + TSlotDescriptor t_slot_desc; + t_slot_desc.__set_slotType(TypeDescriptor(TYPE_INT).to_thrift()); + t_slot_desc.__set_columnPos(i); + t_slot_desc.__set_byteOffset(offset); + t_slot_desc.__set_nullIndicatorByte(0); + t_slot_desc.__set_nullIndicatorBit(-1); + t_slot_desc.__set_slotIdx(i); + t_slot_desc.__set_isMaterialized(true); + t_desc_table.slotDescriptors.push_back(t_slot_desc); + offset += sizeof(int); + } + + TTupleDescriptor t_tuple_desc; + t_tuple_desc.id = 0; + t_tuple_desc.byteSize = offset; + t_tuple_desc.numNullBytes = 1; + t_tuple_desc.tableId = 0; + t_tuple_desc.__isset.tableId = true; + t_desc_table.__isset.slotDescriptors = true; + t_desc_table.tupleDescriptors.push_back(t_tuple_desc); + + DescriptorTbl::create(&_obj_pool, t_desc_table, &_desc_tbl); + _runtime_state.set_desc_tbl(_desc_tbl); + + // Node Id + _tnode.node_id = 0; + _tnode.node_type = TPlanNodeType::SCHEMA_SCAN_NODE; + _tnode.num_children = 0; + 
_tnode.limit = -1; + _tnode.row_tuples.push_back(0); + _tnode.nullable_tuples.push_back(false); + _tnode.es_scan_node.tuple_id = 0; + std::map properties; + _tnode.es_scan_node.__set_properties(properties); + _tnode.__isset.es_scan_node = true; + } + +protected: + virtual void SetUp() { + } + virtual void TearDown() { + } + TPlanNode _tnode; + ObjectPool _obj_pool; + DescriptorTbl* _desc_tbl; + RuntimeState _runtime_state; +}; + + +TEST_F(EsScanNodeTest, normal_use) { + EsScanNode scan_node(&_obj_pool, _tnode, *_desc_tbl); + Status status = scan_node.prepare(&_runtime_state); + ASSERT_TRUE(status.ok()); + TEsScanRange es_scan_range; + es_scan_range.__set_index("index1"); + es_scan_range.__set_type("docs"); + es_scan_range.__set_shard_id(0); + TNetworkAddress es_host; + es_host.__set_hostname("host"); + es_host.__set_port(8200); + std::vector es_hosts; + es_hosts.push_back(es_host); + es_scan_range.__set_es_hosts(es_hosts); + TScanRange scan_range; + scan_range.__set_es_scan_range(es_scan_range); + TScanRangeParams scan_range_params; + scan_range_params.__set_scan_range(scan_range); + std::vector scan_ranges; + scan_ranges.push_back(scan_range_params); + + status = scan_node.set_scan_ranges(scan_ranges); + ASSERT_TRUE(status.ok()); + std::stringstream out; + scan_node.debug_string(1, &out); + LOG(WARNING) << out.str(); + + status = scan_node.open(&_runtime_state); + ASSERT_TRUE(status.ok()); + RowBatch row_batch(scan_node._row_descriptor, _runtime_state.batch_size(), new MemTracker(-1)); + bool eos = false; + status = scan_node.get_next(&_runtime_state, &row_batch, &eos); + ASSERT_TRUE(status.ok()); + ASSERT_EQ(2, row_batch.num_rows()); + ASSERT_TRUE(eos); + + status = scan_node.close(&_runtime_state); + ASSERT_TRUE(status.ok()); +} + +} + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} + diff --git a/gensrc/script/doris_builtins_functions.py b/gensrc/script/doris_builtins_functions.py index 
2caf3472a2..2714520ddb 100755 --- a/gensrc/script/doris_builtins_functions.py +++ b/gensrc/script/doris_builtins_functions.py @@ -438,7 +438,7 @@ visible_functions = [ [['coalesce'], 'DATETIME', ['DATETIME', '...'], ''], [['coalesce'], 'DECIMAL', ['DECIMAL', '...'], ''], - [['match'], 'BOOLEAN', ['VARCHAR', 'VARCHAR'], + [['esquery'], 'BOOLEAN', ['VARCHAR', 'VARCHAR'], '_ZN5doris11ESFunctions5matchEPN' '9doris_udf15FunctionContextERKNS1_9StringValES6_'], diff --git a/gensrc/thrift/PaloExternalDataSourceService.thrift b/gensrc/thrift/PaloExternalDataSourceService.thrift index 6b86b3f2de..678bbc15f1 100644 --- a/gensrc/thrift/PaloExternalDataSourceService.thrift +++ b/gensrc/thrift/PaloExternalDataSourceService.thrift @@ -57,6 +57,7 @@ struct TExtLiteral { // The column and the value are guaranteed to be type compatible in Impala, // but they are not necessarily the same type, so the data source // implementation may need to do an implicit cast. +// > < = != >= <= struct TExtBinaryPredicate { // Column on which the predicate is applied. Always set. 1: optional TExtColumnDesc col @@ -66,10 +67,40 @@ struct TExtBinaryPredicate { 3: optional TExtLiteral value } +struct TExtInPredicate { + 1: optional bool is_not_in + // Column on which the predicate is applied. Always set. + 2: optional TExtColumnDesc col + // Values on the right side of the IN predicate. Always set. 
+ 3: optional list values +} + +struct TExtLikePredicate { + 1: optional TExtColumnDesc col + 2: optional TExtLiteral value +} + +struct TExtIsNullPredicate { + 1: optional bool is_not_null + 2: optional TExtColumnDesc col +} + +struct TExtFunction { + 1: optional string func_name + // input parameter column descs + 2: optional list cols + // input parameter column literals + 3: optional list values +} + // a union of all predicates struct TExtPredicate { 1: required Exprs.TExprNodeType node_type 2: optional TExtBinaryPredicate binary_predicate + 3: optional TExtInPredicate in_predicate + 4: optional TExtLikePredicate like_predicate + 5: optional TExtIsNullPredicate is_null_predicate + 6: optional TExtFunction ext_function } // A union over all possible return types for a column of data @@ -82,7 +113,7 @@ struct TExtColumnData { // Only one is set, only non-null values are set. this indicates one column data for a row batch 2: optional list bool_vals; - 3: optional binary byte_vals; + 3: optional list byte_vals; 4: optional list short_vals; 5: optional list int_vals; 6: optional list long_vals; @@ -170,6 +201,7 @@ struct TExtOpenResult { // An opaque handle used in subsequent getNext()/close() calls. Required. 2: optional string scan_handle + 3: optional list accepted_conjuncts } // Parameters to getNext() @@ -215,4 +247,4 @@ service TExtDataSourceService { TExtGetNextResult getNext(1: TExtGetNextParams params); // 1. 
es will release the context when receiving the data TExtCloseResult close(1: TExtCloseParams params); -} \ No newline at end of file +} diff --git a/gensrc/thrift/Status.thrift b/gensrc/thrift/Status.thrift index ea3b119615..abebc39afb 100644 --- a/gensrc/thrift/Status.thrift +++ b/gensrc/thrift/Status.thrift @@ -35,7 +35,12 @@ enum TStatusCode { MINIMUM_RESERVATION_UNAVAILABLE, PUBLISH_TIMEOUT, LABEL_ALREADY_EXISTS, - DATA_QUALITY_ERROR, + ES_INTERNAL_ERROR, + ES_INDEX_NOT_FOUND, + ES_SHARD_NOT_FOUND, + ES_INVALID_CONTEXTID, + ES_INVALID_OFFSET, + ES_REQUEST_ERROR } struct TStatus { diff --git a/run-ut.sh b/run-ut.sh index c3e710751f..96ed3f544e 100755 --- a/run-ut.sh +++ b/run-ut.sh @@ -162,6 +162,7 @@ ${DORIS_TEST_BINARY_DIR}/exec/plain_text_line_reader_lz4frame_test ${DORIS_TEST_BINARY_DIR}/exec/plain_text_line_reader_lzop_test ${DORIS_TEST_BINARY_DIR}/exec/broker_scanner_test ${DORIS_TEST_BINARY_DIR}/exec/broker_scan_node_test +${DORIS_TEST_BINARY_DIR}/exec/es_scan_node_test ${DORIS_TEST_BINARY_DIR}/exec/olap_table_info_test ${DORIS_TEST_BINARY_DIR}/exec/olap_table_sink_test