From ef233701b39d37c1671a04cebb1f2fdf48941bbc Mon Sep 17 00:00:00 2001 From: HappenLee Date: Tue, 8 Feb 2022 11:04:09 +0800 Subject: [PATCH] [feature](vec)(load) Support vtablet sink to enable insert into by using vec query engine (#7957) Support vtablet sink to enable insert into query in vec query engine --- be/src/exec/data_sink.cpp | 3 +- be/src/exec/tablet_info.cpp | 19 +-- be/src/exec/tablet_info.h | 2 +- be/src/olap/olap_define.h | 3 + be/src/olap/row_block2.cpp | 2 - be/src/runtime/plan_fragment_executor.cpp | 9 +- be/src/vec/core/block.cpp | 5 +- be/src/vec/sink/vtablet_sink.cpp | 169 ++++++++++++---------- be/src/vec/sink/vtablet_sink.h | 5 +- 9 files changed, 115 insertions(+), 102 deletions(-) diff --git a/be/src/exec/data_sink.cpp b/be/src/exec/data_sink.cpp index 22ec175917..2a25948225 100644 --- a/be/src/exec/data_sink.cpp +++ b/be/src/exec/data_sink.cpp @@ -145,8 +145,7 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink Status status; DCHECK(thrift_sink.__isset.olap_table_sink); if (is_vec) { - // sink->reset(new stream_load::VOlapTableSink(pool, row_desc, output_exprs, &status)); - return Status::NotSupported("VOlapTableSink is not supported yet"); + sink->reset(new stream_load::VOlapTableSink(pool, row_desc, output_exprs, &status)); } else { sink->reset(new stream_load::OlapTableSink(pool, row_desc, output_exprs, &status)); } diff --git a/be/src/exec/tablet_info.cpp b/be/src/exec/tablet_info.cpp index a1195d5b35..cbeda55bd1 100644 --- a/be/src/exec/tablet_info.cpp +++ b/be/src/exec/tablet_info.cpp @@ -530,15 +530,15 @@ bool VOlapTablePartitionParam::find_tablet(BlockRow* block_row, const VOlapTable Status VOlapTablePartitionParam::_create_partition_keys(const std::vector& t_exprs, BlockRow* part_key) { for (int i = 0; i < t_exprs.size(); i++) { - RETURN_IF_ERROR(_create_partition_key(t_exprs[i], part_key->first, + RETURN_IF_ERROR(_create_partition_key(t_exprs[i], part_key, _partition_slot_locs[i])); } return Status::OK(); } -Status VOlapTablePartitionParam::_create_partition_key(const TExprNode& t_expr, vectorized::Block* block, +Status VOlapTablePartitionParam::_create_partition_key(const TExprNode& t_expr, BlockRow* part_key, uint16_t pos) { - auto column = std::move(*block->get_by_position(pos).column).mutate(); + auto column = std::move(*part_key->first->get_by_position(pos).column).mutate(); switch (t_expr.node_type) { case TExprNodeType::DATE_LITERAL: { vectorized::VecDateTimeValue dt; @@ -586,18 +586,6 @@ Status VOlapTablePartitionParam::_create_partition_key(const TExprNode& t_expr, } case TExprNodeType::STRING_LITERAL: { int len = t_expr.string_literal.value.size(); const char* str_val = t_expr.string_literal.value.c_str(); - - // CHAR is a fixed-length string and needs to use the length in the slot definition, - // VARVHAR is a variable-length string and needs to use the length of the string itself - // padding 0 to CHAR field -// if (TYPE_CHAR == slot_desc->type().type && len < slot_desc->type().len) { -// auto new_ptr = (char*)_mem_pool->allocate(slot_desc->type().len); -// memset(new_ptr, 0, slot_desc->type().len); -// memcpy(new_ptr, str_val, len); -// -// str_val = new_ptr; -// len = slot_desc->type().len; -// } column->insert_data(str_val, len); break; } case TExprNodeType::BOOL_LITERAL: { @@ -609,6 +597,7 @@ Status VOlapTablePartitionParam::_create_partition_key(const TExprNode& t_expr, return Status::InternalError(ss.str()); } } + part_key->second = column->size() - 1; return Status::OK(); } diff --git a/be/src/exec/tablet_info.h b/be/src/exec/tablet_info.h index 53221baa04..f47566351a 100644 --- a/be/src/exec/tablet_info.h +++ b/be/src/exec/tablet_info.h @@ -273,7 +273,7 @@ public: private: Status _create_partition_keys(const std::vector& t_exprs, BlockRow* part_key); - Status _create_partition_key(const TExprNode& t_expr, vectorized::Block* block, uint16_t pos); + Status _create_partition_key(const TExprNode& t_expr, BlockRow* part_key, uint16_t pos); uint32_t _compute_dist_hash(BlockRow* key) const; diff --git a/be/src/olap/olap_define.h b/be/src/olap/olap_define.h index a9ac731333..c72017785d 100644 --- a/be/src/olap/olap_define.h +++ b/be/src/olap/olap_define.h @@ -56,6 +56,9 @@ static const uint16_t OLAP_VARCHAR_MAX_LENGTH = 65535; // the max length supported for string type 2GB static const uint32_t OLAP_STRING_MAX_LENGTH = 2147483647; +// the max length supported for vec string type 1MB +static constexpr auto MAX_SIZE_OF_VEC_STRING = 1024l * 1024; + // the max length supported for array static const uint16_t OLAP_ARRAY_MAX_LENGTH = 65535; diff --git a/be/src/olap/row_block2.cpp b/be/src/olap/row_block2.cpp index 877f6a21a2..dda02b335d 100644 --- a/be/src/olap/row_block2.cpp +++ b/be/src/olap/row_block2.cpp @@ -96,8 +96,6 @@ Status RowBlockV2::convert_to_row_block(RowCursor* helper, RowBlock* dst) { } Status RowBlockV2::_copy_data_to_column(int cid, doris::vectorized::MutableColumnPtr& origin_column) { - constexpr auto MAX_SIZE_OF_VEC_STRING = 1024l * 1024; - auto* column = origin_column.get(); bool nullable_mark_array[_selected_size]; diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp index 1e5ad0cc4c..92aefc4d28 100644 --- a/be/src/runtime/plan_fragment_executor.cpp +++ b/be/src/runtime/plan_fragment_executor.cpp @@ -296,8 +296,14 @@ Status PlanFragmentExecutor::open_vectorized_internal() { if (_collect_query_statistics_with_every_batch) { _collect_query_statistics(); } - RETURN_IF_ERROR(_sink->send(runtime_state(), block)); + + auto st =_sink->send(runtime_state(), block); + if (st.is_end_of_file()) { + break; + } + RETURN_IF_ERROR(st); } + { SCOPED_TIMER(profile()->total_time_counter()); _collect_query_statistics(); @@ -318,6 +324,7 @@ Status PlanFragmentExecutor::open_vectorized_internal() { return Status::OK(); } + Status PlanFragmentExecutor::get_vectorized_internal(::doris::vectorized::Block** block) { if (_done) { *block = nullptr; diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp index 3664a2a5fa..5a8800154a 100644 --- a/be/src/vec/core/block.cpp +++ b/be/src/vec/core/block.cpp @@ -777,10 +777,9 @@ doris::Tuple* Block::deep_copy_tuple(const doris::TupleDescriptor& desc, MemPool for (int i = 0; i < desc.slots().size(); ++i) { auto slot_desc = desc.slots()[i]; - auto column_ptr = get_by_position(column_offset + i).column; - auto data_ref = column_ptr->get_data_at(row); + auto data_ref = get_by_position(column_offset + i).column->get_data_at(row); - if (data_ref.size == 0) { + if (data_ref.data == nullptr) { dst->set_null(slot_desc->null_indicator_offset()); continue; } else { diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp index baf3b04a56..8e165c3215 100644 --- a/be/src/vec/sink/vtablet_sink.cpp +++ b/be/src/vec/sink/vtablet_sink.cpp @@ -55,16 +55,19 @@ Status VOlapTableSink::open(RuntimeState* state) { Status VOlapTableSink::send(RuntimeState* state, vectorized::Block* input_block) { Status status = Status::OK(); - if (UNLIKELY(input_block->rows() == 0)) { return status; } + + auto rows = input_block->rows(); + auto bytes = input_block->bytes(); + if (UNLIKELY(rows == 0)) { return status; } SCOPED_TIMER(_profile->total_time_counter()); - _number_input_rows += input_block->rows(); + _number_input_rows += rows; // update incrementally so that FE can get the progress. // the real 'num_rows_load_total' will be set when sink being closed. - state->update_num_rows_load_total(input_block->rows()); - state->update_num_bytes_load_total(input_block->bytes()); - DorisMetrics::instance()->load_rows->increment(input_block->rows()); - DorisMetrics::instance()->load_bytes->increment(input_block->bytes()); + state->update_num_rows_load_total(rows); + state->update_num_bytes_load_total(bytes); + DorisMetrics::instance()->load_rows->increment(rows); + DorisMetrics::instance()->load_bytes->increment(bytes); vectorized::Block block(input_block->get_columns_with_type_and_name()); if (!_output_vexpr_ctxs.empty()) { @@ -75,21 +78,26 @@ Status VOlapTableSink::send(RuntimeState* state, vectorized::Block* input_block) } auto num_rows = block.rows(); - int num_invalid_rows = 0; + int filtered_rows = 0; { SCOPED_RAW_TIMER(&_validate_data_ns); - _filter_vec.resize(num_rows); - num_invalid_rows = _validate_data(state, &block, reinterpret_cast(_filter_vec.data())); - _number_filtered_rows += num_invalid_rows; + _filter_bitmap.Reset(block.rows()); + bool stop_processing = false; + RETURN_IF_ERROR(_validate_data(state, &block, &_filter_bitmap, &filtered_rows, &stop_processing)); + _number_filtered_rows += filtered_rows; + if (stop_processing) { + // should be returned after updating "_number_filtered_rows", to make sure that load job can be cancelled + // because of "data unqualified" + return Status::EndOfFile("Encountered unqualified data, stop processing"); + } } BlockRow block_row; SCOPED_RAW_TIMER(&_send_data_ns); - // TODO(cmy): vtablet_sink does not implement this "stop_processing" logic. // This is just for passing compilation. bool stop_processing = false; for (int i = 0; i < num_rows; ++i) { - if (num_invalid_rows > 0 && _filter_vec[i] != 0) { + if (filtered_rows > 0 && _filter_bitmap.Get(i)) { continue; } const VOlapTablePartition* partition = nullptr; @@ -103,6 +111,9 @@ Status VOlapTableSink::send(RuntimeState* state, vectorized::Block* input_block) return buf.data(); }, &stop_processing)); _number_filtered_rows++; + if (stop_processing) { + return Status::EndOfFile("Encountered unqualified data, stop processing"); + } continue; } _partition_ids.emplace(partition->id); @@ -113,6 +124,11 @@ Status VOlapTableSink::send(RuntimeState* state, vectorized::Block* input_block) _number_output_rows++; } } + + // check intolerable failure + for (auto index_channel : _channels) { + RETURN_IF_ERROR(index_channel->check_intolerable_failure()); + } return Status::OK(); } @@ -122,14 +138,15 @@ Status VOlapTableSink::close(RuntimeState* state, Status exec_status) { return OlapTableSink::close(state, exec_status); } -int VOlapTableSink::_validate_data(doris::RuntimeState* state, doris::vectorized::Block* block, - bool* filter_map) { - // TODO(cmy): implement it - return 0; -#if 0 +Status VOlapTableSink::_validate_data(RuntimeState* state, vectorized::Block* block, Bitmap* filter_bitmap, int* filtered_rows, + bool* stop_processing) { const auto num_rows = block->rows(); - // set all row is valid - memset(filter_map, 0, num_rows * sizeof(bool)); + fmt::memory_buffer error_msg; + auto set_invalid_and_append_error_msg = [&](int row) { + filter_bitmap->Set(row, true); + return state->append_error_msg_to_file([]() -> std::string { return ""; }, + [&error_msg]() -> std::string { return error_msg.data(); }, stop_processing); + }; for (int i = 0; i < _output_tuple_desc->slots().size(); ++i) { SlotDescriptor* desc = _output_tuple_desc->slots()[i]; @@ -139,12 +156,13 @@ int VOlapTableSink::_validate_data(doris::RuntimeState* state, doris::vectorized if (desc->is_nullable() && desc->type() == TYPE_OBJECT) { const auto& null_map = vectorized::check_and_get_column(*column) ->get_null_map_data(); + fmt::format_to(error_msg, "null is not allowed for bitmap column, column_name: {}; ", + desc->col_name()); + for (int j = 0; j < num_rows; ++j) { - if (!filter_map[j]) { + if (!filter_bitmap->Get(j)) { if (null_map[j]) { - state->append_error_msg_to_file("", std::string("null is not allowed for " - "bitmap column, column_name: ") + desc->col_name() + ";"); - filter_map[j] = true; + RETURN_IF_ERROR(set_invalid_and_append_error_msg(j)); } } } @@ -154,67 +172,68 @@ int VOlapTableSink::_validate_data(doris::RuntimeState* state, doris::vectorized switch (desc->type().type) { case TYPE_CHAR: - case TYPE_VARCHAR: { + case TYPE_VARCHAR: + case TYPE_STRING: { const auto column_string = assert_cast(real_column_ptr.get()); for (int j = 0; j < num_rows; ++j) { - if (!filter_map[j]) { + if (!filter_bitmap->Get(j)) { auto str_val = column_string->get_data_at(j); + bool invalid = str_val.size > std::min(desc->type().len, (int)MAX_SIZE_OF_VEC_STRING); + + error_msg.clear(); if (str_val.size > desc->type().len) { - state->append_error_msg_to_file("", fmt::format( - "the length of input is too long than schema. " - "column_name: {}; input_str: [{}] schema length: {}; actual length: {}; ", - desc->col_name(), str_val.to_string(), - desc->type().len, str_val.size)); - filter_map[j] = true; + fmt::format_to(error_msg, "{}", "the length of input is too long than schema. "); + fmt::format_to(error_msg, "column_name: {}; ", desc->col_name()); + fmt::format_to(error_msg, "input str: [{}] ", str_val.to_string()); + fmt::format_to(error_msg, "schema length: {}; ", desc->type().len); + fmt::format_to(error_msg, "actual length: {}; ", str_val.size); + } else if (str_val.size > MAX_SIZE_OF_VEC_STRING) { + fmt::format_to(error_msg, "{}", "the length of input string is too long than vec schema. "); + fmt::format_to(error_msg, "column_name: {}; ", desc->col_name()); + fmt::format_to(error_msg, "input str: [{}] ", str_val.to_string()); + fmt::format_to(error_msg, "schema length: {}; ", MAX_SIZE_OF_VEC_STRING); + fmt::format_to(error_msg, "actual length: {}; ", str_val.size); + } + + if (invalid) { + RETURN_IF_ERROR(set_invalid_and_append_error_msg(j)); } } } break; } - // TODO: Support TYPE_STRING in the future -// case TYPE_STRING: { -// StringValue* str_val = (StringValue*)slot; -// if (str_val->len > desc->type().MAX_STRING_LENGTH) { -// ss << "the length of input is too long than schema. " -// << "column_name: " << desc->col_name() << "; " -// << "first 128 bytes of input_str: [" << std::string(str_val->ptr, 128) -// << "] " -// << "schema length: " << desc->type().MAX_STRING_LENGTH << "; " -// << "actual length: " << str_val->len << "; "; -// row_valid = false; -// continue; -// } -// break; -// } case TYPE_DECIMALV2: { auto column_decimal = const_cast *>(assert_cast *>(real_column_ptr.get())); for (int j = 0; j < num_rows; ++j) { - if (!filter_map[j]) { + if (!filter_bitmap->Get(j)) { auto dec_val = binary_cast( column_decimal->get_data()[j]); + error_msg.clear(); + bool invalid = false; + if (dec_val.greater_than_scale(desc->type().scale)) { auto code = dec_val.round(&dec_val, desc->type().scale, HALF_UP); column_decimal->get_data()[j] = binary_cast( dec_val); if (code != E_DEC_OK) { - state->append_error_msg_to_file("", "round one decimal failed.value=" + - dec_val.to_string()); - filter_map[j] = true; + fmt::format_to(error_msg, "round one decimal failed.value={}; ", dec_val.to_string()); + invalid = true; } } - if (dec_val > _max_decimalv2_val[i] || dec_val < _min_decimalv2_val[i]) { - state->append_error_msg_to_file("", fmt::format( - "decimal value is not valid for definition, column={}, " - "value={}, precision={}, scale= {};", - desc->col_name(), dec_val.to_string(), desc->type().precision, - desc->type().scale)); - filter_map[j] = true; + fmt::format_to(error_msg, "decimal value is not valid for definition, column={}", desc->col_name()); + fmt::format_to(error_msg, ", value={}", dec_val.to_string()); + fmt::format_to(error_msg, ", precision={}, scale={}; ", desc->type().precision, desc->type().scale); + invalid = true; + } + + if (invalid) { + RETURN_IF_ERROR(set_invalid_and_append_error_msg(j)); } } } @@ -224,13 +243,17 @@ int VOlapTableSink::_validate_data(doris::RuntimeState* state, doris::vectorized auto column_string = assert_cast(real_column_ptr.get()); for (int j = 0; j < num_rows; ++j) { - if (!filter_map[j]) { + if (!filter_bitmap->Get(j)) { auto str_val = column_string->get_data_at(j); - if (!HyperLogLog::is_valid(Slice(str_val.data, str_val.size))) { - state->append_error_msg_to_file("", std::string( - "Content of HLL type column is invalid column_name: " + desc->col_name() + - ";")); - filter_map[j] = true; + bool invalid = false; + error_msg.clear(); + if(!HyperLogLog::is_valid(Slice(str_val.data, str_val.size))) { + fmt::format_to(error_msg, "Content of HLL type column is invalid. column name: {}; ", desc->col_name()); + invalid = true; + } + + if (invalid) { + RETURN_IF_ERROR(set_invalid_and_append_error_msg(j)); } } } @@ -250,15 +273,10 @@ int VOlapTableSink::_validate_data(doris::RuntimeState* state, doris::vectorized if (!desc->is_nullable() && column_ptr) { const auto& null_map = column_ptr->get_null_map_data(); for (int j = 0; j < null_map.size(); ++j) { - if (null_map[j] && !filter_map[j]) { - filter_map[j] = true; - std::stringstream ss; - ss << "null value for not null column, column=" << desc->col_name(); -#if BE_TEST - LOG(INFO) << ss.str(); -#else - state->append_error_msg_to_file("", ss.str()); -#endif + fmt::format_to(error_msg, "null value for not null column, column={}; ", + desc->col_name()); + if (null_map[j] && !filter_bitmap->Get(j)) { + RETURN_IF_ERROR(set_invalid_and_append_error_msg(j)); } } block->get_by_position(i).column = column_ptr->get_nested_column_ptr(); @@ -268,12 +286,11 @@ int VOlapTableSink::_validate_data(doris::RuntimeState* state, doris::vectorized } } - auto filter_row = 0; + *filtered_rows = 0; for (int i = 0; i < num_rows; ++i) { - filter_row += filter_map[i]; + *filtered_rows += filter_bitmap->Get(i); } - return filter_row; -#endif + return Status::OK(); } } // namespace stream_load diff --git a/be/src/vec/sink/vtablet_sink.h b/be/src/vec/sink/vtablet_sink.h index ec40d02327..8785b5ba18 100644 --- a/be/src/vec/sink/vtablet_sink.h +++ b/be/src/vec/sink/vtablet_sink.h @@ -54,11 +54,12 @@ private: // make input data valid for OLAP table // return number of invalid/filtered rows. // invalid row number is set in Bitmap - int _validate_data(RuntimeState* state, vectorized::Block* block, bool* filter_map); + // set stop_processing is we want to stop the whole process now. + Status _validate_data(RuntimeState* state, vectorized::Block* block, Bitmap* filter_bitmap, int* filtered_rows, + bool* stop_processing); VOlapTablePartitionParam* _vpartition = nullptr; std::vector _output_vexpr_ctxs; - std::vector _filter_vec; }; } // namespace stream_load