From 73c3de43138e9014ff2aed38ffc995fd6307727f Mon Sep 17 00:00:00 2001 From: Yingchun Lai <405403881@qq.com> Date: Wed, 3 Jun 2020 19:55:53 +0800 Subject: [PATCH] [refactor] Simple refactor on class Reader (#3691) This is a simple refactor patch on class Reader without any functional changes. Main refactor points: - Remove some useless return value - Use range loop - Use empty() instead of size() for some STL containers size judgement - Use in-class initialization instead of initialize in constructor function - Some other small refactor --- be/src/olap/reader.cpp | 164 ++++++++++++++--------------------------- be/src/olap/reader.h | 77 +++++++------------ 2 files changed, 81 insertions(+), 160 deletions(-) diff --git a/be/src/olap/reader.cpp b/be/src/olap/reader.cpp index a19dd03b20..66de5dfb62 100644 --- a/be/src/olap/reader.cpp +++ b/be/src/olap/reader.cpp @@ -43,13 +43,12 @@ class CollectIterator { public: ~CollectIterator(); - // Hold reader point to get reader params, - // set reverse to true if need read in reverse order. - OLAPStatus init(Reader* reader); + // Hold reader point to get reader params + void init(Reader* reader); OLAPStatus add_child(RowsetReaderSharedPtr rs_reader); - // Get top row of the heap, NULL if reach end. + // Get top row of the heap, nullptr if reach end. const RowCursor* current_row(bool* delete_flag) const { if (_cur_child != nullptr) { return _cur_child->current_row(delete_flag); @@ -143,7 +142,7 @@ private: RowsetReaderSharedPtr _rs_reader; const RowCursor* _current_row = nullptr; bool _is_delete = false; - Reader* _reader; + Reader* _reader = nullptr; RowCursor _row_cursor; // point to rows inside `_row_block` RowBlock* _row_block = nullptr; @@ -188,7 +187,7 @@ CollectIterator::~CollectIterator() { } } -OLAPStatus CollectIterator::init(Reader* reader) { +void CollectIterator::init(Reader* reader) { _reader = reader; // when aggregate is enabled or key_type is DUP_KEYS, we don't merge // multiple data to aggregate for performance in user fetch @@ -197,7 +196,6 @@ OLAPStatus CollectIterator::init(Reader* reader) { _reader->_tablet->keys_type() == KeysType::DUP_KEYS)) { _merge = false; } - return OLAP_SUCCESS; } OLAPStatus CollectIterator::add_child(RowsetReaderSharedPtr rs_reader) { @@ -227,7 +225,7 @@ inline OLAPStatus CollectIterator::_merge_next(const RowCursor** row, bool* dele _heap.push(_cur_child); _cur_child = _heap.top(); } else if (res == OLAP_ERR_DATA_EOF) { - if (_heap.size() > 0) { + if (!_heap.empty()) { _cur_child = _heap.top(); } else { _cur_child = nullptr; @@ -275,7 +273,7 @@ bool CollectIterator::ChildCtxComparator::operator()(const ChildCtx* a, const Ch } void CollectIterator::clear() { - while (_heap.size() > 0) { + while (!_heap.empty()) { _heap.pop(); } for (auto child : _children) { @@ -287,14 +285,7 @@ void CollectIterator::clear() { _child_idx = 0; } -Reader::Reader() - : _next_key_index(0), - _aggregation(false), - _version_locked(false), - _reader_type(READER_QUERY), - _next_delete_flag(false), - _next_key(NULL), - _merged_rows(0) { +Reader::Reader() { _tracker.reset(new MemTracker(-1)); _predicate_mem_pool.reset(new MemPool(_tracker.get())); } @@ -304,25 +295,23 @@ Reader::~Reader() { } OLAPStatus Reader::init(const ReaderParams& read_params) { - OLAPStatus res = OLAP_SUCCESS; - - res = _init_params(read_params); + OLAPStatus res = _init_params(read_params); if (res != OLAP_SUCCESS) { - LOG(WARNING) << "fail to init reader when init params.res" << res - << ", tablet_id :" << read_params.tablet->tablet_id() + LOG(WARNING) << "fail to init reader when init params. res:" << res + << ", tablet_id:" << read_params.tablet->tablet_id() << ", schema_hash:" << read_params.tablet->schema_hash() << ", reader type:" << read_params.reader_type - << ", version:" << read_params.version.first << "-" << read_params.version.second; + << ", version:" << read_params.version; return res; } res = _capture_rs_readers(read_params); if (res != OLAP_SUCCESS) { - LOG(WARNING) << "fail to init reader when _capture_rs_readers.res" << res - << ", tablet_id :" << read_params.tablet->tablet_id() + LOG(WARNING) << "fail to init reader when _capture_rs_readers. res:" << res + << ", tablet_id:" << read_params.tablet->tablet_id() << ", schema_hash:" << read_params.tablet->schema_hash() << ", reader_type:" << read_params.reader_type - << ", version:" << read_params.version.first << "-" << read_params.version.second; + << ", version:" << read_params.version; return res; } @@ -410,7 +399,7 @@ OLAPStatus Reader::_unique_key_next_row(RowCursor* row_cursor, MemPool* mem_pool init_row_with_others(row_cursor, *_next_key, mem_pool, agg_pool); int64_t merged_count = 0; - while (NULL != _next_key) { + while (nullptr != _next_key) { auto res = _collect_iter->next(&_next_key, &_next_delete_flag); if (res != OLAP_SUCCESS) { if (res != OLAP_ERR_DATA_EOF) { @@ -421,7 +410,7 @@ OLAPStatus Reader::_unique_key_next_row(RowCursor* row_cursor, MemPool* mem_pool // we will not do aggregation in two case: // 1. DUP_KEYS keys type has no semantic to aggregate, - // 2. to make cost of each scan round reasonable, we will control merged_count. + // 2. to make cost of each scan round reasonable, we will control merged_count. if (_aggregation && merged_count > config::doris_scanner_row_num) { agg_finalize_row(_value_cids, row_cursor, mem_pool); break; @@ -463,23 +452,20 @@ void Reader::close() { OLAPStatus Reader::_capture_rs_readers(const ReaderParams& read_params) { const std::vector* rs_readers = &read_params.rs_readers; - if (rs_readers->size() < 1) { - LOG(WARNING) << "fail to acquire data sources. tablet=" << _tablet->full_name() - << ", version=" << _version.first << "-" << _version.second; + if (rs_readers->empty()) { + LOG(WARNING) << "fail to acquire data sources. tablet=" << _tablet->full_name(); return OLAP_ERR_VERSION_NOT_EXIST; } bool eof = false; - RowCursor* start_key = nullptr; - RowCursor* end_key = nullptr; - bool is_lower_key_included = false; - bool is_upper_key_included = false; for (int i = 0; i < _keys_param.start_keys.size(); ++i) { - start_key = _keys_param.start_keys[i]; - end_key = _keys_param.end_keys[i]; - if (_keys_param.end_range.compare("lt") == 0) { + RowCursor* start_key = _keys_param.start_keys[i]; + RowCursor* end_key = _keys_param.end_keys[i]; + bool is_lower_key_included = false; + bool is_upper_key_included = false; + if (_keys_param.end_range == "lt") { is_upper_key_included = false; - } else if (_keys_param.end_range.compare("le") == 0) { + } else if (_keys_param.end_range == "le") { is_upper_key_included = true; } else { LOG(WARNING) << "reader params end_range is error. " @@ -487,7 +473,7 @@ OLAPStatus Reader::_capture_rs_readers(const ReaderParams& read_params) { return OLAP_ERR_READER_GET_ITERATOR_ERROR; } - if (_keys_param.range.compare("gt") == 0) { + if (_keys_param.range == "gt") { if (end_key != nullptr && compare_row_key(*start_key, *end_key) >= 0) { VLOG(3) << "return EOF when range=" << _keys_param.range << ", start_key=" << start_key->to_string() @@ -496,7 +482,7 @@ OLAPStatus Reader::_capture_rs_readers(const ReaderParams& read_params) { break; } is_lower_key_included = false; - } else if (_keys_param.range.compare("ge") == 0) { + } else if (_keys_param.range == "ge") { if (end_key != nullptr && compare_row_key(*start_key, *end_key) > 0) { VLOG(3) << "return EOF when range=" << _keys_param.range << ", start_key=" << start_key->to_string() @@ -505,7 +491,7 @@ OLAPStatus Reader::_capture_rs_readers(const ReaderParams& read_params) { break; } is_lower_key_included = true; - } else if (_keys_param.range.compare("eq") == 0) { + } else if (_keys_param.range == "eq") { is_lower_key_included = true; is_upper_key_included = true; } else { @@ -513,6 +499,7 @@ OLAPStatus Reader::_capture_rs_readers(const ReaderParams& read_params) { << "range=" << _keys_param.to_string(); return OLAP_ERR_READER_GET_ITERATOR_ERROR; } + _is_lower_keys_included.push_back(is_lower_key_included); _is_upper_keys_included.push_back(is_upper_key_included); } @@ -550,15 +537,12 @@ OLAPStatus Reader::_capture_rs_readers(const ReaderParams& read_params) { _reader_context.use_page_cache = read_params.use_page_cache; for (auto& rs_reader : *rs_readers) { RETURN_NOT_OK(rs_reader->init(&_reader_context)); - _rs_readers.push_back(rs_reader); - } - - for (auto& rs_reader : _rs_readers) { OLAPStatus res = _collect_iter->add_child(rs_reader); if (res != OLAP_SUCCESS && res != OLAP_ERR_DATA_EOF) { LOG(WARNING) << "failed to add child to iterator"; return res; } + _rs_readers.push_back(rs_reader); } _next_key = _collect_iter->current_row(&_next_delete_flag); @@ -567,26 +551,16 @@ OLAPStatus Reader::_capture_rs_readers(const ReaderParams& read_params) { OLAPStatus Reader::_init_params(const ReaderParams& read_params) { read_params.check_validation(); - OLAPStatus res = OLAP_SUCCESS; + _aggregation = read_params.aggregation; _need_agg_finalize = read_params.need_agg_finalize; _reader_type = read_params.reader_type; _tablet = read_params.tablet; - _version = read_params.version; - res = _init_conditions_param(read_params); - if (res != OLAP_SUCCESS) { - OLAP_LOG_WARNING("fail to init conditions param. [res=%d]", res); - return res; - } + _init_conditions_param(read_params); + _init_load_bf_columns(read_params); - res = _init_load_bf_columns(read_params); - if (res != OLAP_SUCCESS) { - OLAP_LOG_WARNING("fail to init load bloom filter columns. [res=%d]", res); - return res; - } - - res = _init_delete_condition(read_params); + OLAPStatus res = _init_delete_condition(read_params); if (res != OLAP_SUCCESS) { OLAP_LOG_WARNING("fail to init delete param. [res=%d]", res); return res; @@ -604,11 +578,7 @@ OLAPStatus Reader::_init_params(const ReaderParams& read_params) { return res; } - res = _init_seek_columns(); - if (res != OLAP_SUCCESS) { - OLAP_LOG_WARNING("fail to init seek columns. [res=%d]", res); - return res; - } + _init_seek_columns(); _collect_iter = new CollectIterator(); _collect_iter->init(this); @@ -621,7 +591,6 @@ OLAPStatus Reader::_init_return_columns(const ReaderParams& read_params) { _return_columns = read_params.return_columns; if (_delete_handler.conditions_num() != 0 && read_params.aggregation) { set column_set(_return_columns.begin(), _return_columns.end()); - for (auto conds : _delete_handler.get_delete_conditions()) { for (auto cond_column : conds.del_cond->columns()) { if (column_set.find(cond_column.first) == column_set.end()) { @@ -638,7 +607,7 @@ OLAPStatus Reader::_init_return_columns(const ReaderParams& read_params) { _value_cids.push_back(id); } } - } else if (read_params.return_columns.size() == 0) { + } else if (read_params.return_columns.empty()) { for (size_t i = 0; i < _tablet->tablet_schema().num_columns(); ++i) { _return_columns.push_back(i); if (_tablet->tablet_schema().column(i).is_key()) { @@ -669,7 +638,7 @@ OLAPStatus Reader::_init_return_columns(const ReaderParams& read_params) { } -OLAPStatus Reader::_init_seek_columns() { +void Reader::_init_seek_columns() { std::unordered_set column_set(_return_columns.begin(), _return_columns.end()); for (auto& it : _conditions.columns()) { column_set.insert(it.first); @@ -690,15 +659,10 @@ OLAPStatus Reader::_init_seek_columns() { _seek_columns.push_back(i); } } - return OLAP_SUCCESS; } OLAPStatus Reader::_init_keys_param(const ReaderParams& read_params) { - OLAPStatus res = OLAP_SUCCESS; - - _next_key_index = 0; - - if (read_params.start_key.size() == 0) { + if (read_params.start_key.empty()) { return OLAP_SUCCESS; } @@ -706,14 +670,14 @@ OLAPStatus Reader::_init_keys_param(const ReaderParams& read_params) { _keys_param.end_range = read_params.end_range; size_t start_key_size = read_params.start_key.size(); - _keys_param.start_keys.resize(start_key_size, NULL); + _keys_param.start_keys.resize(start_key_size, nullptr); for (size_t i = 0; i < start_key_size; ++i) { - if ((_keys_param.start_keys[i] = new(nothrow) RowCursor()) == NULL) { + if ((_keys_param.start_keys[i] = new(nothrow) RowCursor()) == nullptr) { OLAP_LOG_WARNING("fail to new RowCursor!"); return OLAP_ERR_MALLOC_ERROR; } - res = _keys_param.start_keys[i]->init_scan_key(_tablet->tablet_schema(), + OLAPStatus res = _keys_param.start_keys[i]->init_scan_key(_tablet->tablet_schema(), read_params.start_key[i].values()); if (res != OLAP_SUCCESS) { OLAP_LOG_WARNING("fail to init row cursor. [res=%d]", res); @@ -735,7 +699,7 @@ OLAPStatus Reader::_init_keys_param(const ReaderParams& read_params) { return OLAP_ERR_MALLOC_ERROR; } - res = _keys_param.end_keys[i]->init_scan_key(_tablet->tablet_schema(), + OLAPStatus res = _keys_param.end_keys[i]->init_scan_key(_tablet->tablet_schema(), read_params.end_key[i].values()); if (res != OLAP_SUCCESS) { OLAP_LOG_WARNING("fail to init row cursor. [res=%d]", res); @@ -754,24 +718,20 @@ OLAPStatus Reader::_init_keys_param(const ReaderParams& read_params) { return OLAP_SUCCESS; } -OLAPStatus Reader::_init_conditions_param(const ReaderParams& read_params) { - OLAPStatus res = OLAP_SUCCESS; - +void Reader::_init_conditions_param(const ReaderParams& read_params) { _conditions.set_tablet_schema(&_tablet->tablet_schema()); - for (int i = 0; i < read_params.conditions.size(); ++i) { - _conditions.append_condition(read_params.conditions[i]); - ColumnPredicate* predicate = _parse_to_predicate(read_params.conditions[i]); - if (predicate != NULL) { + for (const auto& condition : read_params.conditions) { + _conditions.append_condition(condition); + ColumnPredicate* predicate = _parse_to_predicate(condition); + if (predicate != nullptr) { _col_predicates.push_back(predicate); } } - - return res; } #define COMPARISON_PREDICATE_CONDITION_VALUE(NAME, PREDICATE) \ ColumnPredicate* Reader::_new_##NAME##_pred(const TabletColumn& column, int index, const std::string& cond) { \ - ColumnPredicate* predicate = NULL; \ + ColumnPredicate* predicate = nullptr; \ switch (column.type()) { \ case OLAP_FIELD_TYPE_TINYINT: { \ std::stringstream ss(cond); \ @@ -872,9 +832,8 @@ ColumnPredicate* Reader::_parse_to_predicate(const TCondition& condition) { if (column.aggregation() != FieldAggregationMethod::OLAP_FIELD_AGGREGATION_NONE) { return nullptr; } - ColumnPredicate* predicate = NULL; - if (condition.condition_op == "*=" - && condition.condition_values.size() == 1) { + ColumnPredicate* predicate = nullptr; + if (condition.condition_op == "*=" && condition.condition_values.size() == 1) { predicate = _new_eq_pred(column, index, condition.condition_values[0]); } else if (condition.condition_op == "<<") { predicate = _new_lt_pred(column, index, condition.condition_values[0]); @@ -884,8 +843,7 @@ ColumnPredicate* Reader::_parse_to_predicate(const TCondition& condition) { predicate = _new_gt_pred(column, index, condition.condition_values[0]); } else if (condition.condition_op == ">=") { predicate = _new_ge_pred(column, index, condition.condition_values[0]); - } else if (condition.condition_op == "*=" - && condition.condition_values.size() > 1) { + } else if (condition.condition_op == "*=" && condition.condition_values.size() > 1) { switch (column.type()) { case OLAP_FIELD_TYPE_TINYINT: { std::set values; @@ -1003,20 +961,12 @@ ColumnPredicate* Reader::_parse_to_predicate(const TCondition& condition) { default: break; } } else if (condition.condition_op == "is") { - bool is_null = false; - if (condition.condition_values[0] == "null") { - is_null = true; - } else { - is_null = false; - } - predicate = new NullPredicate(index, is_null); + predicate = new NullPredicate(index, condition.condition_values[0] == "null"); } return predicate; } -OLAPStatus Reader::_init_load_bf_columns(const ReaderParams& read_params) { - OLAPStatus res = OLAP_SUCCESS; - +void Reader::_init_load_bf_columns(const ReaderParams& read_params) { // add all columns with condition to _load_bf_columns for (const auto& cond_column : _conditions.columns()) { for (const Cond* cond : cond_column.second->conds()) { @@ -1068,16 +1018,13 @@ OLAPStatus Reader::_init_load_bf_columns(const ReaderParams& read_params) { // remove the max_equal_index column when it's not varchar // or longer than number of short key fields - FieldType type = OLAP_FIELD_TYPE_NONE; if (max_equal_index == -1) { - return res; + return; } - type = _tablet->tablet_schema().column(max_equal_index).type(); + FieldType type = _tablet->tablet_schema().column(max_equal_index).type(); if (type != OLAP_FIELD_TYPE_VARCHAR || max_equal_index + 1 > _tablet->num_short_key_columns()) { _load_bf_columns.erase(max_equal_index); } - - return res; } OLAPStatus Reader::_init_delete_condition(const ReaderParams& read_params) { @@ -1087,7 +1034,6 @@ OLAPStatus Reader::_init_delete_condition(const ReaderParams& read_params) { _tablet->delete_predicates(), read_params.version.second); _tablet->release_header_lock(); - return ret; } else { return OLAP_SUCCESS; diff --git a/be/src/olap/reader.h b/be/src/olap/reader.h index 0041c62350..f4ffa59ae9 100644 --- a/be/src/olap/reader.h +++ b/be/src/olap/reader.h @@ -51,8 +51,8 @@ class RuntimeState; // mainly include tablet, data version and fetch range. struct ReaderParams { TabletSharedPtr tablet; - ReaderType reader_type; - bool aggregation; + ReaderType reader_type = READER_QUERY; + bool aggregation = false; bool need_agg_finalize = true; // 1. when read column data page: // for compaction, schema_change, check_sum: we don't use page cache @@ -60,7 +60,7 @@ struct ReaderParams { // 2. when read column index page // if config::disable_storage_page_cache is false, we use page cache bool use_page_cache = false; - Version version; + Version version = Version(-1, 0); // possible values are "gt", "ge", "eq" std::string range; // possible values are "lt", "le" @@ -71,34 +71,21 @@ struct ReaderParams { // The ColumnData will be set when using Merger, eg Cumulative, BE. std::vector rs_readers; std::vector return_columns; - RuntimeProfile* profile; - RuntimeState* runtime_state; - - ReaderParams() : - reader_type(READER_QUERY), - aggregation(false), - version(-1, 0), - profile(NULL), - runtime_state(NULL) { - start_key.clear(); - end_key.clear(); - conditions.clear(); - rs_readers.clear(); - } + RuntimeProfile* profile = nullptr; + RuntimeState* runtime_state = nullptr; void check_validation() const { - if (version.first == -1) { + if (UNLIKELY(version.first == -1)) { LOG(FATAL) << "verison is not set. tablet=" << tablet->full_name(); } } std::string to_string() { std::stringstream ss; - ss << "tablet=" << tablet->full_name() << " reader_type=" << reader_type << " aggregation=" << aggregation - << " version=" << version.first << "-" << version.second + << " version=" << version << " range=" << range << " end_range=" << end_range; @@ -110,8 +97,8 @@ struct ReaderParams { ss << " end_keys=" << key; } - for (int i = 0, size = conditions.size(); i < size; ++i) { - ss << " conditions=" << apache::thrift::ThriftDebugString(conditions[i]); + for (auto& condition : conditions) { + ss << " conditions=" << apache::thrift::ThriftDebugString(condition); } return ss.str(); @@ -150,12 +137,12 @@ public: private: struct KeysParam { ~KeysParam() { - for (int32_t i = 0; i < start_keys.size(); i++) { - SAFE_DELETE(start_keys[i]); + for (auto start_key : start_keys) { + SAFE_DELETE(start_key); } - for (int32_t i = 0; i < end_keys.size(); i++) { - SAFE_DELETE(end_keys[i]); + for (auto end_key : end_keys) { + SAFE_DELETE(end_key); } } @@ -164,12 +151,12 @@ private: ss << "range=" << range << " end_range=" << end_range; - for (int i = 0, size = this->start_keys.size(); i < size; ++i) { - ss << " keys=" << start_keys[i]->to_string(); + for (auto start_key : start_keys) { + ss << " keys=" << start_key->to_string(); } - for (int i = 0, size = this->end_keys.size(); i < size; ++i) { - ss << " end_keys=" << end_keys[i]->to_string(); + for (auto end_key : end_keys) { + ss << " end_keys=" << end_key->to_string(); } return ss.str(); @@ -189,7 +176,7 @@ private: OLAPStatus _init_keys_param(const ReaderParams& read_params); - OLAPStatus _init_conditions_param(const ReaderParams& read_params); + void _init_conditions_param(const ReaderParams& read_params); ColumnPredicate* _new_eq_pred(const TabletColumn& column, int index, const std::string& cond); ColumnPredicate* _new_ne_pred(const TabletColumn& column, int index, const std::string& cond); @@ -203,9 +190,9 @@ private: OLAPStatus _init_delete_condition(const ReaderParams& read_params); OLAPStatus _init_return_columns(const ReaderParams& read_params); - OLAPStatus _init_seek_columns(); + void _init_seek_columns(); - OLAPStatus _init_load_bf_columns(const ReaderParams& read_params); + void _init_load_bf_columns(const ReaderParams& read_params); OLAPStatus _dup_key_next_row(RowCursor* row_cursor, MemPool* mem_pool, ObjectPool* agg_pool, bool* eof); OLAPStatus _agg_key_next_row(RowCursor* row_cursor, MemPool* mem_pool, ObjectPool* agg_pool, bool* eof); @@ -220,44 +207,32 @@ private: std::vector _return_columns; std::vector _seek_columns; - Version _version; - TabletSharedPtr _tablet; - - // _own_rs_readers is data source that reader aquire from tablet, so we need to - // release these when reader closing - std::vector _own_rs_readers; std::vector _rs_readers; RowsetReaderContext _reader_context; - KeysParam _keys_param; std::vector _is_lower_keys_included; std::vector _is_upper_keys_included; - int32_t _next_key_index; - Conditions _conditions; std::vector _col_predicates; - DeleteHandler _delete_handler; OLAPStatus (Reader::*_next_row_func)(RowCursor* row_cursor, MemPool* mem_pool, ObjectPool* agg_pool, bool* eof) = nullptr; - bool _aggregation; + bool _aggregation = false; // for agg query, we don't need to finalize when scan agg object data bool _need_agg_finalize = true; - bool _version_locked; - ReaderType _reader_type; - bool _next_delete_flag; - const RowCursor* _next_key; + ReaderType _reader_type = READER_QUERY; + bool _next_delete_flag = false; + const RowCursor* _next_key = nullptr; CollectIterator* _collect_iter = nullptr; std::vector _key_cids; std::vector _value_cids; - uint64_t _merged_rows; - + uint64_t _merged_rows = 0; OlapReaderStatistics _stats; - DISALLOW_COPY_AND_ASSIGN(Reader); + DISALLOW_COPY_AND_ASSIGN(Reader); }; } // namespace doris