[refactor] Simple refactor on class Reader (#3691)

This is a simple refactor patch on class Reader without any functional changes.
Main refactor points:
- Remove some useless return value
- Use range loop
- Use empty() instead of size() for some STL containers size judgement
- Use in-class initialization instead of initialize in constructor function
- Some other small refactor
This commit is contained in:
Yingchun Lai
2020-06-03 19:55:53 +08:00
committed by GitHub
parent ed886a485d
commit 73c3de4313
2 changed files with 81 additions and 160 deletions

View File

@ -43,13 +43,12 @@ class CollectIterator {
public:
~CollectIterator();
// Hold reader point to get reader params,
// set reverse to true if need read in reverse order.
OLAPStatus init(Reader* reader);
// Hold reader point to get reader params
void init(Reader* reader);
OLAPStatus add_child(RowsetReaderSharedPtr rs_reader);
// Get top row of the heap, NULL if reach end.
// Get top row of the heap, nullptr if reach end.
const RowCursor* current_row(bool* delete_flag) const {
if (_cur_child != nullptr) {
return _cur_child->current_row(delete_flag);
@ -143,7 +142,7 @@ private:
RowsetReaderSharedPtr _rs_reader;
const RowCursor* _current_row = nullptr;
bool _is_delete = false;
Reader* _reader;
Reader* _reader = nullptr;
RowCursor _row_cursor; // point to rows inside `_row_block`
RowBlock* _row_block = nullptr;
@ -188,7 +187,7 @@ CollectIterator::~CollectIterator() {
}
}
OLAPStatus CollectIterator::init(Reader* reader) {
void CollectIterator::init(Reader* reader) {
_reader = reader;
// when aggregate is enabled or key_type is DUP_KEYS, we don't merge
// multiple data to aggregate for performance in user fetch
@ -197,7 +196,6 @@ OLAPStatus CollectIterator::init(Reader* reader) {
_reader->_tablet->keys_type() == KeysType::DUP_KEYS)) {
_merge = false;
}
return OLAP_SUCCESS;
}
OLAPStatus CollectIterator::add_child(RowsetReaderSharedPtr rs_reader) {
@ -227,7 +225,7 @@ inline OLAPStatus CollectIterator::_merge_next(const RowCursor** row, bool* dele
_heap.push(_cur_child);
_cur_child = _heap.top();
} else if (res == OLAP_ERR_DATA_EOF) {
if (_heap.size() > 0) {
if (!_heap.empty()) {
_cur_child = _heap.top();
} else {
_cur_child = nullptr;
@ -275,7 +273,7 @@ bool CollectIterator::ChildCtxComparator::operator()(const ChildCtx* a, const Ch
}
void CollectIterator::clear() {
while (_heap.size() > 0) {
while (!_heap.empty()) {
_heap.pop();
}
for (auto child : _children) {
@ -287,14 +285,7 @@ void CollectIterator::clear() {
_child_idx = 0;
}
Reader::Reader()
: _next_key_index(0),
_aggregation(false),
_version_locked(false),
_reader_type(READER_QUERY),
_next_delete_flag(false),
_next_key(NULL),
_merged_rows(0) {
Reader::Reader() {
_tracker.reset(new MemTracker(-1));
_predicate_mem_pool.reset(new MemPool(_tracker.get()));
}
@ -304,25 +295,23 @@ Reader::~Reader() {
}
OLAPStatus Reader::init(const ReaderParams& read_params) {
OLAPStatus res = OLAP_SUCCESS;
res = _init_params(read_params);
OLAPStatus res = _init_params(read_params);
if (res != OLAP_SUCCESS) {
LOG(WARNING) << "fail to init reader when init params.res" << res
<< ", tablet_id :" << read_params.tablet->tablet_id()
LOG(WARNING) << "fail to init reader when init params. res:" << res
<< ", tablet_id:" << read_params.tablet->tablet_id()
<< ", schema_hash:" << read_params.tablet->schema_hash()
<< ", reader type:" << read_params.reader_type
<< ", version:" << read_params.version.first << "-" << read_params.version.second;
<< ", version:" << read_params.version;
return res;
}
res = _capture_rs_readers(read_params);
if (res != OLAP_SUCCESS) {
LOG(WARNING) << "fail to init reader when _capture_rs_readers.res" << res
<< ", tablet_id :" << read_params.tablet->tablet_id()
LOG(WARNING) << "fail to init reader when _capture_rs_readers. res:" << res
<< ", tablet_id:" << read_params.tablet->tablet_id()
<< ", schema_hash:" << read_params.tablet->schema_hash()
<< ", reader_type:" << read_params.reader_type
<< ", version:" << read_params.version.first << "-" << read_params.version.second;
<< ", version:" << read_params.version;
return res;
}
@ -410,7 +399,7 @@ OLAPStatus Reader::_unique_key_next_row(RowCursor* row_cursor, MemPool* mem_pool
init_row_with_others(row_cursor, *_next_key, mem_pool, agg_pool);
int64_t merged_count = 0;
while (NULL != _next_key) {
while (nullptr != _next_key) {
auto res = _collect_iter->next(&_next_key, &_next_delete_flag);
if (res != OLAP_SUCCESS) {
if (res != OLAP_ERR_DATA_EOF) {
@ -421,7 +410,7 @@ OLAPStatus Reader::_unique_key_next_row(RowCursor* row_cursor, MemPool* mem_pool
// we will not do aggregation in two case:
// 1. DUP_KEYS keys type has no semantic to aggregate,
// 2. to make cost of each scan round reasonable, we will control merged_count.
// 2. to make cost of each scan round reasonable, we will control merged_count.
if (_aggregation && merged_count > config::doris_scanner_row_num) {
agg_finalize_row(_value_cids, row_cursor, mem_pool);
break;
@ -463,23 +452,20 @@ void Reader::close() {
OLAPStatus Reader::_capture_rs_readers(const ReaderParams& read_params) {
const std::vector<RowsetReaderSharedPtr>* rs_readers = &read_params.rs_readers;
if (rs_readers->size() < 1) {
LOG(WARNING) << "fail to acquire data sources. tablet=" << _tablet->full_name()
<< ", version=" << _version.first << "-" << _version.second;
if (rs_readers->empty()) {
LOG(WARNING) << "fail to acquire data sources. tablet=" << _tablet->full_name();
return OLAP_ERR_VERSION_NOT_EXIST;
}
bool eof = false;
RowCursor* start_key = nullptr;
RowCursor* end_key = nullptr;
bool is_lower_key_included = false;
bool is_upper_key_included = false;
for (int i = 0; i < _keys_param.start_keys.size(); ++i) {
start_key = _keys_param.start_keys[i];
end_key = _keys_param.end_keys[i];
if (_keys_param.end_range.compare("lt") == 0) {
RowCursor* start_key = _keys_param.start_keys[i];
RowCursor* end_key = _keys_param.end_keys[i];
bool is_lower_key_included = false;
bool is_upper_key_included = false;
if (_keys_param.end_range == "lt") {
is_upper_key_included = false;
} else if (_keys_param.end_range.compare("le") == 0) {
} else if (_keys_param.end_range == "le") {
is_upper_key_included = true;
} else {
LOG(WARNING) << "reader params end_range is error. "
@ -487,7 +473,7 @@ OLAPStatus Reader::_capture_rs_readers(const ReaderParams& read_params) {
return OLAP_ERR_READER_GET_ITERATOR_ERROR;
}
if (_keys_param.range.compare("gt") == 0) {
if (_keys_param.range == "gt") {
if (end_key != nullptr && compare_row_key(*start_key, *end_key) >= 0) {
VLOG(3) << "return EOF when range=" << _keys_param.range
<< ", start_key=" << start_key->to_string()
@ -496,7 +482,7 @@ OLAPStatus Reader::_capture_rs_readers(const ReaderParams& read_params) {
break;
}
is_lower_key_included = false;
} else if (_keys_param.range.compare("ge") == 0) {
} else if (_keys_param.range == "ge") {
if (end_key != nullptr && compare_row_key(*start_key, *end_key) > 0) {
VLOG(3) << "return EOF when range=" << _keys_param.range
<< ", start_key=" << start_key->to_string()
@ -505,7 +491,7 @@ OLAPStatus Reader::_capture_rs_readers(const ReaderParams& read_params) {
break;
}
is_lower_key_included = true;
} else if (_keys_param.range.compare("eq") == 0) {
} else if (_keys_param.range == "eq") {
is_lower_key_included = true;
is_upper_key_included = true;
} else {
@ -513,6 +499,7 @@ OLAPStatus Reader::_capture_rs_readers(const ReaderParams& read_params) {
<< "range=" << _keys_param.to_string();
return OLAP_ERR_READER_GET_ITERATOR_ERROR;
}
_is_lower_keys_included.push_back(is_lower_key_included);
_is_upper_keys_included.push_back(is_upper_key_included);
}
@ -550,15 +537,12 @@ OLAPStatus Reader::_capture_rs_readers(const ReaderParams& read_params) {
_reader_context.use_page_cache = read_params.use_page_cache;
for (auto& rs_reader : *rs_readers) {
RETURN_NOT_OK(rs_reader->init(&_reader_context));
_rs_readers.push_back(rs_reader);
}
for (auto& rs_reader : _rs_readers) {
OLAPStatus res = _collect_iter->add_child(rs_reader);
if (res != OLAP_SUCCESS && res != OLAP_ERR_DATA_EOF) {
LOG(WARNING) << "failed to add child to iterator";
return res;
}
_rs_readers.push_back(rs_reader);
}
_next_key = _collect_iter->current_row(&_next_delete_flag);
@ -567,26 +551,16 @@ OLAPStatus Reader::_capture_rs_readers(const ReaderParams& read_params) {
OLAPStatus Reader::_init_params(const ReaderParams& read_params) {
read_params.check_validation();
OLAPStatus res = OLAP_SUCCESS;
_aggregation = read_params.aggregation;
_need_agg_finalize = read_params.need_agg_finalize;
_reader_type = read_params.reader_type;
_tablet = read_params.tablet;
_version = read_params.version;
res = _init_conditions_param(read_params);
if (res != OLAP_SUCCESS) {
OLAP_LOG_WARNING("fail to init conditions param. [res=%d]", res);
return res;
}
_init_conditions_param(read_params);
_init_load_bf_columns(read_params);
res = _init_load_bf_columns(read_params);
if (res != OLAP_SUCCESS) {
OLAP_LOG_WARNING("fail to init load bloom filter columns. [res=%d]", res);
return res;
}
res = _init_delete_condition(read_params);
OLAPStatus res = _init_delete_condition(read_params);
if (res != OLAP_SUCCESS) {
OLAP_LOG_WARNING("fail to init delete param. [res=%d]", res);
return res;
@ -604,11 +578,7 @@ OLAPStatus Reader::_init_params(const ReaderParams& read_params) {
return res;
}
res = _init_seek_columns();
if (res != OLAP_SUCCESS) {
OLAP_LOG_WARNING("fail to init seek columns. [res=%d]", res);
return res;
}
_init_seek_columns();
_collect_iter = new CollectIterator();
_collect_iter->init(this);
@ -621,7 +591,6 @@ OLAPStatus Reader::_init_return_columns(const ReaderParams& read_params) {
_return_columns = read_params.return_columns;
if (_delete_handler.conditions_num() != 0 && read_params.aggregation) {
set<uint32_t> column_set(_return_columns.begin(), _return_columns.end());
for (auto conds : _delete_handler.get_delete_conditions()) {
for (auto cond_column : conds.del_cond->columns()) {
if (column_set.find(cond_column.first) == column_set.end()) {
@ -638,7 +607,7 @@ OLAPStatus Reader::_init_return_columns(const ReaderParams& read_params) {
_value_cids.push_back(id);
}
}
} else if (read_params.return_columns.size() == 0) {
} else if (read_params.return_columns.empty()) {
for (size_t i = 0; i < _tablet->tablet_schema().num_columns(); ++i) {
_return_columns.push_back(i);
if (_tablet->tablet_schema().column(i).is_key()) {
@ -669,7 +638,7 @@ OLAPStatus Reader::_init_return_columns(const ReaderParams& read_params) {
}
OLAPStatus Reader::_init_seek_columns() {
void Reader::_init_seek_columns() {
std::unordered_set<uint32_t> column_set(_return_columns.begin(), _return_columns.end());
for (auto& it : _conditions.columns()) {
column_set.insert(it.first);
@ -690,15 +659,10 @@ OLAPStatus Reader::_init_seek_columns() {
_seek_columns.push_back(i);
}
}
return OLAP_SUCCESS;
}
OLAPStatus Reader::_init_keys_param(const ReaderParams& read_params) {
OLAPStatus res = OLAP_SUCCESS;
_next_key_index = 0;
if (read_params.start_key.size() == 0) {
if (read_params.start_key.empty()) {
return OLAP_SUCCESS;
}
@ -706,14 +670,14 @@ OLAPStatus Reader::_init_keys_param(const ReaderParams& read_params) {
_keys_param.end_range = read_params.end_range;
size_t start_key_size = read_params.start_key.size();
_keys_param.start_keys.resize(start_key_size, NULL);
_keys_param.start_keys.resize(start_key_size, nullptr);
for (size_t i = 0; i < start_key_size; ++i) {
if ((_keys_param.start_keys[i] = new(nothrow) RowCursor()) == NULL) {
if ((_keys_param.start_keys[i] = new(nothrow) RowCursor()) == nullptr) {
OLAP_LOG_WARNING("fail to new RowCursor!");
return OLAP_ERR_MALLOC_ERROR;
}
res = _keys_param.start_keys[i]->init_scan_key(_tablet->tablet_schema(),
OLAPStatus res = _keys_param.start_keys[i]->init_scan_key(_tablet->tablet_schema(),
read_params.start_key[i].values());
if (res != OLAP_SUCCESS) {
OLAP_LOG_WARNING("fail to init row cursor. [res=%d]", res);
@ -735,7 +699,7 @@ OLAPStatus Reader::_init_keys_param(const ReaderParams& read_params) {
return OLAP_ERR_MALLOC_ERROR;
}
res = _keys_param.end_keys[i]->init_scan_key(_tablet->tablet_schema(),
OLAPStatus res = _keys_param.end_keys[i]->init_scan_key(_tablet->tablet_schema(),
read_params.end_key[i].values());
if (res != OLAP_SUCCESS) {
OLAP_LOG_WARNING("fail to init row cursor. [res=%d]", res);
@ -754,24 +718,20 @@ OLAPStatus Reader::_init_keys_param(const ReaderParams& read_params) {
return OLAP_SUCCESS;
}
OLAPStatus Reader::_init_conditions_param(const ReaderParams& read_params) {
OLAPStatus res = OLAP_SUCCESS;
void Reader::_init_conditions_param(const ReaderParams& read_params) {
_conditions.set_tablet_schema(&_tablet->tablet_schema());
for (int i = 0; i < read_params.conditions.size(); ++i) {
_conditions.append_condition(read_params.conditions[i]);
ColumnPredicate* predicate = _parse_to_predicate(read_params.conditions[i]);
if (predicate != NULL) {
for (const auto& condition : read_params.conditions) {
_conditions.append_condition(condition);
ColumnPredicate* predicate = _parse_to_predicate(condition);
if (predicate != nullptr) {
_col_predicates.push_back(predicate);
}
}
return res;
}
#define COMPARISON_PREDICATE_CONDITION_VALUE(NAME, PREDICATE) \
ColumnPredicate* Reader::_new_##NAME##_pred(const TabletColumn& column, int index, const std::string& cond) { \
ColumnPredicate* predicate = NULL; \
ColumnPredicate* predicate = nullptr; \
switch (column.type()) { \
case OLAP_FIELD_TYPE_TINYINT: { \
std::stringstream ss(cond); \
@ -872,9 +832,8 @@ ColumnPredicate* Reader::_parse_to_predicate(const TCondition& condition) {
if (column.aggregation() != FieldAggregationMethod::OLAP_FIELD_AGGREGATION_NONE) {
return nullptr;
}
ColumnPredicate* predicate = NULL;
if (condition.condition_op == "*="
&& condition.condition_values.size() == 1) {
ColumnPredicate* predicate = nullptr;
if (condition.condition_op == "*=" && condition.condition_values.size() == 1) {
predicate = _new_eq_pred(column, index, condition.condition_values[0]);
} else if (condition.condition_op == "<<") {
predicate = _new_lt_pred(column, index, condition.condition_values[0]);
@ -884,8 +843,7 @@ ColumnPredicate* Reader::_parse_to_predicate(const TCondition& condition) {
predicate = _new_gt_pred(column, index, condition.condition_values[0]);
} else if (condition.condition_op == ">=") {
predicate = _new_ge_pred(column, index, condition.condition_values[0]);
} else if (condition.condition_op == "*="
&& condition.condition_values.size() > 1) {
} else if (condition.condition_op == "*=" && condition.condition_values.size() > 1) {
switch (column.type()) {
case OLAP_FIELD_TYPE_TINYINT: {
std::set<int8_t> values;
@ -1003,20 +961,12 @@ ColumnPredicate* Reader::_parse_to_predicate(const TCondition& condition) {
default: break;
}
} else if (condition.condition_op == "is") {
bool is_null = false;
if (condition.condition_values[0] == "null") {
is_null = true;
} else {
is_null = false;
}
predicate = new NullPredicate(index, is_null);
predicate = new NullPredicate(index, condition.condition_values[0] == "null");
}
return predicate;
}
OLAPStatus Reader::_init_load_bf_columns(const ReaderParams& read_params) {
OLAPStatus res = OLAP_SUCCESS;
void Reader::_init_load_bf_columns(const ReaderParams& read_params) {
// add all columns with condition to _load_bf_columns
for (const auto& cond_column : _conditions.columns()) {
for (const Cond* cond : cond_column.second->conds()) {
@ -1068,16 +1018,13 @@ OLAPStatus Reader::_init_load_bf_columns(const ReaderParams& read_params) {
// remove the max_equal_index column when it's not varchar
// or longer than number of short key fields
FieldType type = OLAP_FIELD_TYPE_NONE;
if (max_equal_index == -1) {
return res;
return;
}
type = _tablet->tablet_schema().column(max_equal_index).type();
FieldType type = _tablet->tablet_schema().column(max_equal_index).type();
if (type != OLAP_FIELD_TYPE_VARCHAR || max_equal_index + 1 > _tablet->num_short_key_columns()) {
_load_bf_columns.erase(max_equal_index);
}
return res;
}
OLAPStatus Reader::_init_delete_condition(const ReaderParams& read_params) {
@ -1087,7 +1034,6 @@ OLAPStatus Reader::_init_delete_condition(const ReaderParams& read_params) {
_tablet->delete_predicates(),
read_params.version.second);
_tablet->release_header_lock();
return ret;
} else {
return OLAP_SUCCESS;

View File

@ -51,8 +51,8 @@ class RuntimeState;
// mainly include tablet, data version and fetch range.
struct ReaderParams {
TabletSharedPtr tablet;
ReaderType reader_type;
bool aggregation;
ReaderType reader_type = READER_QUERY;
bool aggregation = false;
bool need_agg_finalize = true;
// 1. when read column data page:
// for compaction, schema_change, check_sum: we don't use page cache
@ -60,7 +60,7 @@ struct ReaderParams {
// 2. when read column index page
// if config::disable_storage_page_cache is false, we use page cache
bool use_page_cache = false;
Version version;
Version version = Version(-1, 0);
// possible values are "gt", "ge", "eq"
std::string range;
// possible values are "lt", "le"
@ -71,34 +71,21 @@ struct ReaderParams {
// The ColumnData will be set when using Merger, eg Cumulative, BE.
std::vector<RowsetReaderSharedPtr> rs_readers;
std::vector<uint32_t> return_columns;
RuntimeProfile* profile;
RuntimeState* runtime_state;
ReaderParams() :
reader_type(READER_QUERY),
aggregation(false),
version(-1, 0),
profile(NULL),
runtime_state(NULL) {
start_key.clear();
end_key.clear();
conditions.clear();
rs_readers.clear();
}
RuntimeProfile* profile = nullptr;
RuntimeState* runtime_state = nullptr;
void check_validation() const {
if (version.first == -1) {
if (UNLIKELY(version.first == -1)) {
LOG(FATAL) << "verison is not set. tablet=" << tablet->full_name();
}
}
std::string to_string() {
std::stringstream ss;
ss << "tablet=" << tablet->full_name()
<< " reader_type=" << reader_type
<< " aggregation=" << aggregation
<< " version=" << version.first << "-" << version.second
<< " version=" << version
<< " range=" << range
<< " end_range=" << end_range;
@ -110,8 +97,8 @@ struct ReaderParams {
ss << " end_keys=" << key;
}
for (int i = 0, size = conditions.size(); i < size; ++i) {
ss << " conditions=" << apache::thrift::ThriftDebugString(conditions[i]);
for (auto& condition : conditions) {
ss << " conditions=" << apache::thrift::ThriftDebugString(condition);
}
return ss.str();
@ -150,12 +137,12 @@ public:
private:
struct KeysParam {
~KeysParam() {
for (int32_t i = 0; i < start_keys.size(); i++) {
SAFE_DELETE(start_keys[i]);
for (auto start_key : start_keys) {
SAFE_DELETE(start_key);
}
for (int32_t i = 0; i < end_keys.size(); i++) {
SAFE_DELETE(end_keys[i]);
for (auto end_key : end_keys) {
SAFE_DELETE(end_key);
}
}
@ -164,12 +151,12 @@ private:
ss << "range=" << range
<< " end_range=" << end_range;
for (int i = 0, size = this->start_keys.size(); i < size; ++i) {
ss << " keys=" << start_keys[i]->to_string();
for (auto start_key : start_keys) {
ss << " keys=" << start_key->to_string();
}
for (int i = 0, size = this->end_keys.size(); i < size; ++i) {
ss << " end_keys=" << end_keys[i]->to_string();
for (auto end_key : end_keys) {
ss << " end_keys=" << end_key->to_string();
}
return ss.str();
@ -189,7 +176,7 @@ private:
OLAPStatus _init_keys_param(const ReaderParams& read_params);
OLAPStatus _init_conditions_param(const ReaderParams& read_params);
void _init_conditions_param(const ReaderParams& read_params);
ColumnPredicate* _new_eq_pred(const TabletColumn& column, int index, const std::string& cond);
ColumnPredicate* _new_ne_pred(const TabletColumn& column, int index, const std::string& cond);
@ -203,9 +190,9 @@ private:
OLAPStatus _init_delete_condition(const ReaderParams& read_params);
OLAPStatus _init_return_columns(const ReaderParams& read_params);
OLAPStatus _init_seek_columns();
void _init_seek_columns();
OLAPStatus _init_load_bf_columns(const ReaderParams& read_params);
void _init_load_bf_columns(const ReaderParams& read_params);
OLAPStatus _dup_key_next_row(RowCursor* row_cursor, MemPool* mem_pool, ObjectPool* agg_pool, bool* eof);
OLAPStatus _agg_key_next_row(RowCursor* row_cursor, MemPool* mem_pool, ObjectPool* agg_pool, bool* eof);
@ -220,44 +207,32 @@ private:
std::vector<uint32_t> _return_columns;
std::vector<uint32_t> _seek_columns;
Version _version;
TabletSharedPtr _tablet;
// _own_rs_readers is data source that reader aquire from tablet, so we need to
// release these when reader closing
std::vector<RowsetReaderSharedPtr> _own_rs_readers;
std::vector<RowsetReaderSharedPtr> _rs_readers;
RowsetReaderContext _reader_context;
KeysParam _keys_param;
std::vector<bool> _is_lower_keys_included;
std::vector<bool> _is_upper_keys_included;
int32_t _next_key_index;
Conditions _conditions;
std::vector<ColumnPredicate*> _col_predicates;
DeleteHandler _delete_handler;
OLAPStatus (Reader::*_next_row_func)(RowCursor* row_cursor, MemPool* mem_pool, ObjectPool* agg_pool, bool* eof) = nullptr;
bool _aggregation;
bool _aggregation = false;
// for agg query, we don't need to finalize when scan agg object data
bool _need_agg_finalize = true;
bool _version_locked;
ReaderType _reader_type;
bool _next_delete_flag;
const RowCursor* _next_key;
ReaderType _reader_type = READER_QUERY;
bool _next_delete_flag = false;
const RowCursor* _next_key = nullptr;
CollectIterator* _collect_iter = nullptr;
std::vector<uint32_t> _key_cids;
std::vector<uint32_t> _value_cids;
uint64_t _merged_rows;
uint64_t _merged_rows = 0;
OlapReaderStatistics _stats;
DISALLOW_COPY_AND_ASSIGN(Reader);
DISALLOW_COPY_AND_ASSIGN(Reader);
};
} // namespace doris