[Opt][VecLoad] Opt the vec stream load performance (#9772)
Co-authored-by: lihaopeng <lihaopeng@baidu.com>
This commit is contained in:
@ -48,6 +48,10 @@ public:
|
||||
bool write_slot(const SlotDescriptor* slot_desc, Tuple* tuple, const char* data, int len,
|
||||
bool copy_string, bool need_escape, MemPool* pool);
|
||||
|
||||
void write_string_column(const SlotDescriptor* slot_desc,
|
||||
vectorized::MutableColumnPtr* column_ptr, const char* data,
|
||||
size_t len);
|
||||
|
||||
bool write_column(const SlotDescriptor* slot_desc, vectorized::MutableColumnPtr* column_ptr,
|
||||
const char* data, size_t len, bool copy_string, bool need_escape);
|
||||
|
||||
|
||||
@ -166,6 +166,24 @@ inline bool TextConverter::write_slot(const SlotDescriptor* slot_desc, Tuple* tu
|
||||
return true;
|
||||
}
|
||||
|
||||
inline void TextConverter::write_string_column(const SlotDescriptor* slot_desc,
|
||||
vectorized::MutableColumnPtr* column_ptr,
|
||||
const char* data, size_t len) {
|
||||
vectorized::IColumn* col_ptr = column_ptr->get();
|
||||
// \N means it's NULL
|
||||
if (LIKELY(slot_desc->is_nullable())) {
|
||||
auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(column_ptr->get());
|
||||
if ((len == 2 && data[0] == '\\' && data[1] == 'N') || len == SQL_NULL_DATA) {
|
||||
nullable_column->insert_data(nullptr, 0);
|
||||
return;
|
||||
} else {
|
||||
nullable_column->get_null_map_data().push_back(0);
|
||||
col_ptr = &nullable_column->get_nested_column();
|
||||
}
|
||||
}
|
||||
reinterpret_cast<vectorized::ColumnString*>(col_ptr)->insert_data(data, len);
|
||||
}
|
||||
|
||||
inline bool TextConverter::write_column(const SlotDescriptor* slot_desc,
|
||||
vectorized::MutableColumnPtr* column_ptr, const char* data,
|
||||
size_t len, bool copy_string, bool need_escape) {
|
||||
|
||||
@ -190,18 +190,7 @@ Status DeltaWriter::write(const vectorized::Block* block, const std::vector<int>
|
||||
return Status::OLAPInternalError(OLAP_ERR_ALREADY_CANCELLED);
|
||||
}
|
||||
|
||||
int start = 0, end = 0;
|
||||
const size_t num_rows = row_idxs.size();
|
||||
for (; start < num_rows;) {
|
||||
auto count = end + 1 - start;
|
||||
if (end == num_rows - 1 || (row_idxs[end + 1] - row_idxs[start]) != count) {
|
||||
_mem_table->insert(block, row_idxs[start], count);
|
||||
start += count;
|
||||
end = start;
|
||||
} else {
|
||||
end++;
|
||||
}
|
||||
}
|
||||
_mem_table->insert(block, row_idxs);
|
||||
|
||||
if (_mem_table->need_to_agg()) {
|
||||
_mem_table->shrink_memtable_by_agg();
|
||||
|
||||
@ -114,7 +114,7 @@ int MemTable::RowInBlockComparator::operator()(const RowInBlock* left,
|
||||
*_pblock, -1);
|
||||
}
|
||||
|
||||
void MemTable::insert(const vectorized::Block* block, size_t row_pos, size_t num_rows) {
|
||||
void MemTable::insert(const vectorized::Block* block, const std::vector<int>& row_idxs) {
|
||||
if (_is_first_insertion) {
|
||||
_is_first_insertion = false;
|
||||
auto cloneBlock = block->clone_without_columns();
|
||||
@ -125,8 +125,9 @@ void MemTable::insert(const vectorized::Block* block, size_t row_pos, size_t num
|
||||
_init_agg_functions(block);
|
||||
}
|
||||
}
|
||||
auto num_rows = row_idxs.size();
|
||||
size_t cursor_in_mutableblock = _input_mutable_block.rows();
|
||||
_input_mutable_block.add_rows(block, row_pos, num_rows);
|
||||
_input_mutable_block.add_rows(block, row_idxs.data(), row_idxs.data() + num_rows);
|
||||
size_t input_size = block->allocated_bytes() * num_rows / block->rows();
|
||||
_mem_usage += input_size;
|
||||
_mem_tracker->consume(input_size);
|
||||
@ -245,11 +246,15 @@ template <bool is_final>
|
||||
void MemTable::_collect_vskiplist_results() {
|
||||
VecTable::Iterator it(_vec_skip_list.get());
|
||||
vectorized::Block in_block = _input_mutable_block.to_block();
|
||||
// TODO: should try to insert data by column, not by row. to opt the code
|
||||
if (_keys_type == KeysType::DUP_KEYS) {
|
||||
std::vector<int> row_pos_vec;
|
||||
DCHECK(in_block.rows() <= std::numeric_limits<int>::max());
|
||||
row_pos_vec.reserve(in_block.rows());
|
||||
for (it.SeekToFirst(); it.Valid(); it.Next()) {
|
||||
_output_mutable_block.add_row(&in_block, it.key()->_row_pos);
|
||||
row_pos_vec.emplace_back(it.key()->_row_pos);
|
||||
}
|
||||
_output_mutable_block.add_rows(&in_block, row_pos_vec.data(),
|
||||
row_pos_vec.data() + in_block.rows());
|
||||
} else {
|
||||
size_t idx = 0;
|
||||
for (it.SeekToFirst(); it.Valid(); it.Next()) {
|
||||
|
||||
@ -52,7 +52,7 @@ public:
|
||||
|
||||
inline void insert(const Tuple* tuple) { (this->*_insert_fn)(tuple); }
|
||||
// insert tuple from (row_pos) to (row_pos+num_rows)
|
||||
void insert(const vectorized::Block* block, size_t row_pos, size_t num_rows);
|
||||
void insert(const vectorized::Block* block, const std::vector<int>& row_idxs);
|
||||
|
||||
void shrink_memtable_by_agg();
|
||||
|
||||
|
||||
@ -20,11 +20,11 @@
|
||||
#include <fmt/format.h>
|
||||
|
||||
#include <iostream>
|
||||
#include <sstream>
|
||||
|
||||
#include "exec/exec_node.h"
|
||||
#include "exec/plain_text_line_reader.h"
|
||||
#include "exec/text_converter.h"
|
||||
#include "exec/text_converter.hpp"
|
||||
#include "exprs/expr_context.h"
|
||||
#include "util/utf8_check.h"
|
||||
|
||||
@ -111,22 +111,10 @@ Status VBrokerScanner::_fill_dest_columns(const Slice& line,
|
||||
continue;
|
||||
}
|
||||
|
||||
RETURN_IF_ERROR(_write_text_column(value.data, value.size, src_slot_desc,
|
||||
&columns[dest_index], _state));
|
||||
_text_converter->write_string_column(src_slot_desc, &columns[dest_index], value.data,
|
||||
value.size);
|
||||
}
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status VBrokerScanner::_write_text_column(char* value, int value_length, SlotDescriptor* slot,
|
||||
vectorized::MutableColumnPtr* column_ptr,
|
||||
RuntimeState* state) {
|
||||
if (!_text_converter->write_column(slot, column_ptr, value, value_length, true, false)) {
|
||||
std::stringstream ss;
|
||||
ss << "Fail to convert text value:'" << value << "' to " << slot->type() << " on column:`"
|
||||
<< slot->col_name() + "`";
|
||||
return Status::InternalError(ss.str());
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
} // namespace doris::vectorized
|
||||
|
||||
@ -39,9 +39,6 @@ public:
|
||||
private:
|
||||
std::unique_ptr<TextConverter> _text_converter;
|
||||
|
||||
Status _write_text_column(char* value, int length, SlotDescriptor* slot,
|
||||
MutableColumnPtr* column_ptr, RuntimeState* state);
|
||||
|
||||
Status _fill_dest_columns(const Slice& line, std::vector<MutableColumnPtr>& columns);
|
||||
};
|
||||
} // namespace doris::vectorized
|
||||
|
||||
Reference in New Issue
Block a user