[refactor]refactor lazy materialized (#8834)

[refactor]refactor lazy materialized (#8834)
This commit is contained in:
wangbo
2022-05-06 19:16:35 +08:00
committed by GitHub
parent c1707ca388
commit a9831f87f2
15 changed files with 166 additions and 104 deletions

View File

@ -126,7 +126,7 @@ void OlapScanNode::_init_counter(RuntimeState* state) {
_rows_vec_cond_counter = ADD_COUNTER(_segment_profile, "RowsVectorPredFiltered", TUnit::UNIT);
_vec_cond_timer = ADD_TIMER(_segment_profile, "VectorPredEvalTime");
_short_cond_timer = ADD_TIMER(_segment_profile, "ShortPredEvalTime");
_pred_col_read_timer = ADD_TIMER(_segment_profile, "PredColumnReadTime");
_first_read_timer = ADD_TIMER(_segment_profile, "FirstReadTime");
_lazy_read_timer = ADD_TIMER(_segment_profile, "LazyReadTime");
_output_col_timer = ADD_TIMER(_segment_profile, "OutputColumnTime");

View File

@ -290,7 +290,7 @@ protected:
RuntimeProfile::Counter* _rows_vec_cond_counter = nullptr;
RuntimeProfile::Counter* _vec_cond_timer = nullptr;
RuntimeProfile::Counter* _short_cond_timer = nullptr;
RuntimeProfile::Counter* _pred_col_read_timer = nullptr;
RuntimeProfile::Counter* _first_read_timer = nullptr;
RuntimeProfile::Counter* _lazy_read_timer = nullptr;
RuntimeProfile::Counter* _output_col_timer = nullptr;

View File

@ -545,7 +545,7 @@ void OlapScanner::update_counter() {
// COUNTER_UPDATE(_parent->_filtered_rows_counter, stats.num_rows_filtered);
COUNTER_UPDATE(_parent->_vec_cond_timer, stats.vec_cond_ns);
COUNTER_UPDATE(_parent->_short_cond_timer, stats.short_cond_ns);
COUNTER_UPDATE(_parent->_pred_col_read_timer, stats.pred_col_read_ns);
COUNTER_UPDATE(_parent->_first_read_timer, stats.first_read_ns);
COUNTER_UPDATE(_parent->_lazy_read_timer, stats.lazy_read_ns);
COUNTER_UPDATE(_parent->_output_col_timer, stats.output_col_ns);
COUNTER_UPDATE(_parent->_rows_vec_cond_counter, stats.rows_vec_cond_filtered);

View File

@ -43,7 +43,7 @@ enum class PredicateType {
IN_LIST = 7,
NOT_IN_LIST = 8,
IS_NULL = 9,
NOT_IS_NULL = 10,
IS_NOT_NULL = 10,
BF = 11, // BloomFilter
};

View File

@ -30,7 +30,7 @@ NullPredicate::NullPredicate(uint32_t column_id, bool is_null, bool opposite)
: ColumnPredicate(column_id), _is_null(opposite != is_null) {}
PredicateType NullPredicate::type() const {
return _is_null ? PredicateType::IS_NULL : PredicateType::NOT_IS_NULL;
return _is_null ? PredicateType::IS_NULL : PredicateType::IS_NOT_NULL;
}
void NullPredicate::evaluate(VectorizedRowBatch* batch) const {

View File

@ -265,7 +265,7 @@ struct OlapReaderStatistics {
int64_t rows_vec_del_cond_filtered = 0;
int64_t vec_cond_ns = 0;
int64_t short_cond_ns = 0;
int64_t pred_col_read_ns = 0;
int64_t first_read_ns = 0;
int64_t lazy_read_ns = 0;
int64_t output_col_ns = 0;

View File

@ -523,7 +523,7 @@ Status SegmentIterator::next_batch(RowBlockV2* block) {
// phase 1: read rows selected by various index (indicated by _row_bitmap) into block
// when using lazy-materialization-read, only columns with predicates are read
{
SCOPED_RAW_TIMER(&_opts.stats->pred_col_read_ns);
SCOPED_RAW_TIMER(&_opts.stats->first_read_ns);
do {
uint32_t range_from;
uint32_t range_to;
@ -607,22 +607,45 @@ Status SegmentIterator::next_batch(RowBlockV2* block) {
/* ---------------------- for vecterization implementation ---------------------- */
/**
* For storage layer data type, can be measured from two perspectives:
* 1 Whether the type can be read in a fast way(batch read using SIMD)
* Such as integer type and float type, this type can be read in SIMD way.
* For the type string/bitmap/hll, they can not be read in batch way, so read this type data is slow.
* If a type can be read fast, we can try to eliminate Lazy Materialization, because we think for this type, seek cost > read cost.
* This is an estimate, if we want more precise cost, statistics collection is necessary(this is a todo).
* In short, when returned non-pred columns contains string/hll/bitmap, we using Lazy Materialization.
* Otherwish, we disable it.
*
* When Lazy Materialization enable, we need to read column at least two times.
* Firt time to read Pred col, second time to read non-pred.
* Here's an interesting question to research, whether read Pred col once is the best plan.
* (why not read Pred col twice or more?)
*
* When Lazy Materialization disable, we just need to read once.
*
*
* 2 Whether the predicate type can be evaluate in a fast way(using SIMD to eval pred)
* Such as integer type and float type, they can be eval fast.
* But for BloomFilter/string/date, they eval slow.
* If a type can be eval fast, we use vectorizaion to eval it.
* Otherwise, we use short-circuit to eval it.
*
*
*/
// todo(wb) need a UT here
void SegmentIterator::_vec_init_lazy_materialization() {
_is_pred_column.resize(_schema.columns().size(), false);
// including short_cir_pred_col_id_set and vec_pred_col_id_set
// including short/vec/delete pred
std::set<ColumnId> pred_column_ids;
_is_all_column_basic_type = true;
bool is_predicate_column_exists = false;
bool is_non_predicate_column_exists = false;
_lazy_materialization_read = false;
std::set<ColumnId> del_cond_id_set;
_opts.delete_condition_predicates->get_all_column_ids(del_cond_id_set);
if (!_col_predicates.empty() || !del_cond_id_set.empty()) {
is_predicate_column_exists = true;
std::set<ColumnId> short_cir_pred_col_id_set; // using set for distinct cid
std::set<ColumnId> vec_pred_col_id_set;
@ -632,13 +655,16 @@ void SegmentIterator::_vec_init_lazy_materialization() {
_is_pred_column[cid] = true;
pred_column_ids.insert(cid);
// Step1: check pred using short eval or vec eval
if (type == OLAP_FIELD_TYPE_VARCHAR || type == OLAP_FIELD_TYPE_CHAR ||
type == OLAP_FIELD_TYPE_STRING || predicate->type() == PredicateType::BF ||
predicate->type() == PredicateType::IN_LIST ||
predicate->type() == PredicateType::NOT_IN_LIST) {
predicate->type() == PredicateType::NOT_IN_LIST ||
predicate->type() == PredicateType::IS_NULL ||
predicate->type() == PredicateType::IS_NOT_NULL || type == OLAP_FIELD_TYPE_DATE ||
type == OLAP_FIELD_TYPE_DECIMAL) {
short_cir_pred_col_id_set.insert(cid);
_short_cir_eval_predicate.push_back(predicate);
_is_all_column_basic_type = false;
} else {
vec_pred_col_id_set.insert(predicate->column_id());
if (_pre_eval_block_predicate == nullptr) {
@ -653,75 +679,84 @@ void SegmentIterator::_vec_init_lazy_materialization() {
if (!del_cond_id_set.empty()) {
short_cir_pred_col_id_set.insert(del_cond_id_set.begin(), del_cond_id_set.end());
pred_column_ids.insert(del_cond_id_set.begin(), del_cond_id_set.end());
_is_all_column_basic_type = false;
for (auto cid : del_cond_id_set) {
_is_pred_column[cid] = true;
}
}
if (_schema.column_ids().size() > pred_column_ids.size()) {
for (auto cid : _schema.column_ids()) {
if (!_is_pred_column[cid]) {
_non_predicate_columns.push_back(cid);
is_non_predicate_column_exists = true;
// todo(wb) make a cost-based lazy-materialization framework
// check non-pred column type to decide whether using lazy-materialization
FieldType type = _schema.column(cid)->type();
if (_is_all_column_basic_type &&
(type == OLAP_FIELD_TYPE_HLL || type == OLAP_FIELD_TYPE_OBJECT ||
type == OLAP_FIELD_TYPE_VARCHAR || type == OLAP_FIELD_TYPE_CHAR ||
type == OLAP_FIELD_TYPE_STRING)) {
_is_all_column_basic_type = false;
}
}
}
}
_vec_pred_column_ids.assign(vec_pred_col_id_set.cbegin(), vec_pred_col_id_set.cend());
_short_cir_pred_column_ids.assign(short_cir_pred_col_id_set.cbegin(),
short_cir_pred_col_id_set.cend());
} else {
_is_all_column_basic_type = false;
is_non_predicate_column_exists = true;
}
if (!_vec_pred_column_ids.empty()) {
_is_need_vec_eval = true;
}
if (!_short_cir_pred_column_ids.empty()) {
_is_need_short_eval = true;
}
// Step 2: check non-predicate read costs to determine whether need lazy materialization
// fill _non_predicate_columns.
// note(wb) For block schema, query layer and storage layer may have some diff
// query layer block schema not contains delete column, but storage layer appends delete column to end of block schema
// When output block to query layer, delete column can be skipped.
// _schema.column_ids() stands for storage layer block schema, so it contains delete columnid
// we just regard delete column as common pred column here.
if (_schema.column_ids().size() > pred_column_ids.size()) {
for (auto cid : _schema.column_ids()) {
_non_predicate_columns.push_back(cid);
if (!_is_pred_column[cid]) {
_non_predicate_columns.push_back(cid);
FieldType type = _schema.column(cid)->type();
// todo(wb) maybe we can make read char type faster
// todo(wb) support map/array type
// todo(wb) consider multiple integer columns cost, such as 1000 columns, maybe lazy materialization faster
if (!_lazy_materialization_read &&
(_is_need_vec_eval ||
_is_need_short_eval) && // only when pred exists, we need to consider lazy materialization
(type == OLAP_FIELD_TYPE_HLL || type == OLAP_FIELD_TYPE_OBJECT ||
type == OLAP_FIELD_TYPE_VARCHAR || type == OLAP_FIELD_TYPE_CHAR ||
type == OLAP_FIELD_TYPE_STRING || type == OLAP_FIELD_TYPE_BOOL ||
type == OLAP_FIELD_TYPE_DATE || type == OLAP_FIELD_TYPE_DATETIME ||
type == OLAP_FIELD_TYPE_DECIMAL)) {
_lazy_materialization_read = true;
}
}
}
}
// note(wb) in following cases we disable lazy materialization
// case 1: when all column is basic type(is_all_column_basic_type = true)
// because we think `seek and read` cost > read page cost, lazy materialize may cause more `seek and read`, so disable it
// case 2: all column is predicate column
// case 3: all column is not predicate column
// todo(wb) need further research more lazy materialization rule, such as get more info from `statistics` for better decision
if (_is_all_column_basic_type) {
std::set<ColumnId> pred_set(_vec_pred_column_ids.begin(), _vec_pred_column_ids.end());
// Step 3: fill column ids for read and output
if (_lazy_materialization_read) {
// insert pred cid to first_read_columns
for (auto cid : pred_column_ids) {
_first_read_column_ids.push_back(cid);
}
} else if (!_is_need_vec_eval &&
!_is_need_short_eval) { // no pred exists, just read and output column
for (int i = 0; i < _schema.num_column_ids(); i++) {
auto cid = _schema.column_id(i);
_first_read_column_ids.push_back(cid);
}
} else { // pred exits, but we can eliminate lazy materialization
// insert pred/non-pred cid to first read columns
std::set<ColumnId> pred_id_set;
pred_id_set.insert(_short_cir_pred_column_ids.begin(), _short_cir_pred_column_ids.end());
pred_id_set.insert(_vec_pred_column_ids.begin(), _vec_pred_column_ids.end());
std::set<ColumnId> non_pred_set(_non_predicate_columns.begin(),
_non_predicate_columns.end());
// when _is_all_column_basic_type = true, _first_read_column_ids should keep the same order with _schema.column_ids which stands for return column order
for (int i = 0; i < _schema.num_column_ids(); i++) {
auto cid = _schema.column_id(i);
if (pred_set.find(cid) != pred_set.end()) {
if (pred_id_set.find(cid) != pred_id_set.end()) {
_first_read_column_ids.push_back(cid);
} else if (non_pred_set.find(cid) != non_pred_set.end()) {
_first_read_column_ids.push_back(cid);
// in this case, non-predicate column should also be filtered by sel idx, so we regard it as pred columns
// when _lazy_materialization_read = false, non-predicate column should also be filtered by sel idx, so we regard it as pred columns
_is_pred_column[cid] = true;
}
}
} else if (is_predicate_column_exists && !is_non_predicate_column_exists) {
_first_read_column_ids.assign(pred_column_ids.cbegin(), pred_column_ids.cend());
} else if (!is_predicate_column_exists && is_non_predicate_column_exists) {
for (auto cid : _non_predicate_columns) {
_first_read_column_ids.push_back(cid);
}
} else {
_lazy_materialization_read = true;
_first_read_column_ids.assign(pred_column_ids.cbegin(), pred_column_ids.cend());
}
// make _schema_block_id_map
@ -787,7 +822,7 @@ void SegmentIterator::_output_non_pred_columns(vectorized::Block* block) {
Status SegmentIterator::_read_columns_by_index(uint32_t nrows_read_limit, uint32_t& nrows_read,
bool set_block_rowid) {
SCOPED_RAW_TIMER(&_opts.stats->pred_col_read_ns);
SCOPED_RAW_TIMER(&_opts.stats->first_read_ns);
do {
uint32_t range_from;
uint32_t range_to;
@ -818,7 +853,7 @@ Status SegmentIterator::_read_columns_by_index(uint32_t nrows_read_limit, uint32
void SegmentIterator::_evaluate_vectorization_predicate(uint16_t* sel_rowid_idx,
uint16_t& selected_size) {
SCOPED_RAW_TIMER(&_opts.stats->vec_cond_ns);
if (_vec_pred_column_ids.empty()) {
if (!_is_need_vec_eval) {
for (uint32_t i = 0; i < selected_size; ++i) {
sel_rowid_idx[i] = i;
}
@ -859,7 +894,7 @@ void SegmentIterator::_evaluate_vectorization_predicate(uint16_t* sel_rowid_idx,
void SegmentIterator::_evaluate_short_circuit_predicate(uint16_t* vec_sel_rowid_idx,
uint16_t* selected_size_ptr) {
SCOPED_RAW_TIMER(&_opts.stats->short_cond_ns);
if (_short_cir_pred_column_ids.empty()) {
if (!_is_need_short_eval) {
return;
}
@ -911,17 +946,19 @@ Status SegmentIterator::next_batch(vectorized::Block* block) {
if (UNLIKELY(!_inited)) {
RETURN_IF_ERROR(_init(true));
_inited = true;
if (!_vec_pred_column_ids.empty() || !_short_cir_pred_column_ids.empty()) {
if (_lazy_materialization_read) {
_block_rowids.resize(_opts.block_row_max);
}
_current_return_columns.resize(_schema.columns().size());
for (size_t i = 0; i < _schema.num_column_ids(); i++) {
auto cid = _schema.column_id(i);
if (_is_pred_column[cid]) {
auto column_desc = _schema.column(cid);
_current_return_columns[cid] = Schema::get_predicate_column_nullable_ptr(
column_desc->type(), column_desc->is_nullable());
_current_return_columns[cid]->reserve(_opts.block_row_max);
if (_is_need_vec_eval || _is_need_short_eval) {
for (size_t i = 0; i < _schema.num_column_ids(); i++) {
auto cid = _schema.column_id(i);
if (_is_pred_column[cid]) {
auto column_desc = _schema.column(cid);
_current_return_columns[cid] = Schema::get_predicate_column_nullable_ptr(
column_desc->type(), column_desc->is_nullable());
_current_return_columns[cid]->reserve(_opts.block_row_max);
}
}
}
}
@ -947,48 +984,46 @@ Status SegmentIterator::next_batch(vectorized::Block* block) {
return Status::EndOfFile("no more data in segment");
}
// when no predicate(include delete condition) is provided, output column directly
if (_vec_pred_column_ids.empty() && _short_cir_pred_column_ids.empty()) {
if (!_is_need_vec_eval && !_is_need_short_eval) {
_output_non_pred_columns(block);
} else { // need predicate evaluation
} else {
uint16_t selected_size = nrows_read;
uint16_t sel_rowid_idx[selected_size];
// step 1: evaluate vectorization predicate
_evaluate_vectorization_predicate(sel_rowid_idx, selected_size);
// When predicate column and no-predicate column are both basic type, lazy materialization is eliminate
// So output block directly after vectorization evaluation
if (_is_all_column_basic_type) {
RETURN_IF_ERROR(_output_column_by_sel_idx(block, _first_read_column_ids, sel_rowid_idx,
selected_size));
} else {
// step 2: evaluate short ciruit predicate
// todo(wb) research whether need to read short predicate after vectorization evaluation
// to reduce cost of read short circuit columns.
// In SSB test, it make no difference; So need more scenarios to test
_evaluate_short_circuit_predicate(sel_rowid_idx, &selected_size);
// step 2: evaluate short ciruit predicate
// todo(wb) research whether need to read short predicate after vectorization evaluation
// to reduce cost of read short circuit columns.
// In SSB test, it make no difference; So need more scenarios to test
_evaluate_short_circuit_predicate(sel_rowid_idx, &selected_size);
// step3: read non_predicate column
if (!_non_predicate_columns.empty()) {
_read_columns_by_rowids(_non_predicate_columns, _block_rowids, sel_rowid_idx,
selected_size, &_current_return_columns);
if (!_lazy_materialization_read) {
Status ret = _output_column_by_sel_idx(block, _first_read_column_ids, sel_rowid_idx,
selected_size);
if (!ret.ok()) {
return ret;
}
// step4: output columns
// 4.1 output non-predicate column
_output_non_pred_columns(block);
// 4.2 get union of short_cir_pred and vec_pred
std::set<ColumnId> pred_column_ids;
pred_column_ids.insert(_short_cir_pred_column_ids.begin(),
_short_cir_pred_column_ids.end());
pred_column_ids.insert(_vec_pred_column_ids.begin(), _vec_pred_column_ids.end());
// 4.3 output short circuit and predicate column
RETURN_IF_ERROR(_output_column_by_sel_idx(block, pred_column_ids, sel_rowid_idx,
selected_size));
// shrink char_type suffix zero data
block->shrink_char_type_column_suffix_zero(_char_type_idx);
return ret;
}
// step3: read non_predicate column
_read_columns_by_rowids(_non_predicate_columns, _block_rowids, sel_rowid_idx, selected_size,
&_current_return_columns);
// step4: output columns
// 4.1 output non-predicate column
_output_non_pred_columns(block);
// 4.3 output short circuit and predicate column
// when lazy materialization enables, _first_read_column_ids = distinct(_short_cir_pred_column_ids + _vec_pred_column_ids)
// see _vec_init_lazy_materialization
// todo(wb) need to tell input columnids from output columnids
RETURN_IF_ERROR(_output_column_by_sel_idx(block, _first_read_column_ids, sel_rowid_idx,
selected_size));
}
// shrink char_type suffix zero data

View File

@ -146,9 +146,10 @@ private:
// remember the rowids we've read for the current row block.
// could be a local variable of next_batch(), kept here to reuse vector memory
std::vector<rowid_t> _block_rowids;
bool _is_need_vec_eval = false;
bool _is_need_short_eval = false;
// fields for vectorization execution
bool _is_all_column_basic_type;
std::vector<ColumnId>
_vec_pred_column_ids; // keep columnId of columns for vectorized predicate evaluation
std::vector<ColumnId>

View File

@ -38,6 +38,8 @@ PROPERTIES (
)
"""
sql "set enable_vectorized_engine = false"
sql """insert into ${table1} values
(9,10,11,12),
(9,10,11,12),
@ -57,6 +59,8 @@ PROPERTIES (
(5,6,7,8)
"""
sql "set enable_vectorized_engine = true"
test {
// siteid column not contain null
sql "select siteid,citycode,userid,pv from ${table1} where siteid = 21 "

View File

@ -37,6 +37,9 @@ suite("test_dup_tab_basic_varchar_nullable") {
"storage_format" = "V2"
)
"""
sql "set enable_vectorized_engine = false"
sql """insert into ${table1} values(null,'qie3','yy','lj'),
(null,'hehe',null,'lala'),
('beijing','xuanwu','wugui',null),
@ -47,6 +50,8 @@ suite("test_dup_tab_basic_varchar_nullable") {
('tengxun2','qie',null,'lj')
"""
sql "set enable_vectorized_engine = true"
// read single column
test {
sql "select city from ${table1} order by city"

View File

@ -38,6 +38,7 @@ PROPERTIES (
)
"""
sql "set enable_vectorized_engine = false"
sql """insert into ${table1} values
('a1','a2','a3','a4'),
@ -48,6 +49,7 @@ PROPERTIES (
('e1','e2','e3','e4'),
(null,'e2',null,'e4')
"""
sql "set enable_vectorized_engine = true"
qt_read_single_column_1 "select city from ${table1} where city in ('a1','e1')"
qt_read_single_column_2 "select city from ${table1} where city not in ('a1','e1')"

View File

@ -39,6 +39,8 @@ PROPERTIES (
"""
sql "set enable_vectorized_engine = false"
sql """insert into ${table1} values
(1, '2021-04-01', '2021-04-02', '2021-04-03'),
(1, '2021-03-01', '2021-03-02', '2021-03-03'),
@ -47,6 +49,8 @@ PROPERTIES (
(null, '2021-05-01', 'null', '2021-04-03')
"""
sql "set enable_vectorized_engine = true"
qt_sql1 "select date1 from ${table1} order by date1"
// read single column

View File

@ -39,6 +39,8 @@ PROPERTIES (
"""
sql "set enable_vectorized_engine = false"
sql """insert into ${table1} values
(1,'2021-01-01 23:10:01','2021-01-02 23:10:04','2021-01-02 22:10:04'),
(2,'2021-02-01 23:10:01','2021-02-02 23:10:04','2021-03-02 22:10:04'),
@ -48,6 +50,8 @@ PROPERTIES (
(null,'2021-06-01 23:10:01',null,'2021-06-02 22:10:04')
"""
sql "set enable_vectorized_engine = true"
qt_read_single_column_1 "select datetime1 from ${table1}"
qt_read_single_column_2 "select siteid from ${table1}"

View File

@ -38,6 +38,8 @@ PROPERTIES (
)
"""
sql "set enable_vectorized_engine = false"
sql """insert into ${table1} values(1.1,1.2,1.3,1.4),
(1.1,2.2,2.3,3.4),
(2.1,2.2,2.3,2.4),
@ -46,6 +48,8 @@ PROPERTIES (
(null,2,null,4)
"""
sql "set enable_vectorized_engine = true"
// query decimal
test {
sql "select siteid from ${table1} order by siteid"

View File

@ -42,6 +42,8 @@ PROPERTIES (
)
"""
sql "set enable_vectorized_engine = false"
sql """insert into ${table1} values(1,2,3.1,4.2,5.3,5.4,'a1','a2'),
(2,3,4.1,5.2,6.3,7.4,'b1','b2'),
(3,4,5.1,6.2,7.3,8.4,'c1','c2'),
@ -50,6 +52,7 @@ PROPERTIES (
(5,6,5.1,8.2,6.3,11.4,'e1','e2'),
(null,7,null,8,null,9,null,'e3')
"""
sql "set enable_vectorized_engine = true"
// read int and string
test {