[Refactor] Refactor DeleteHandler and Cond module (2nd) (#5030)
* [Refactor] Refactor DeleteHandler and Cond module (#4925) This patch mainly does the following refactoring: - Use int64_t instead of int32_t for 'version' in DeleteHandler - Move some comments from .cpp to .h files, add some new comments in .h files, and also remove some meaningless comments - Use switch...case instead of multiple if...else branches for DeleteConditionHandler::is_condition_value_valid - Use range-based for loops to simplify code - Reduce some compare operations in Cond::del_eval - Improve some branch predictions in Reader - Fix and improve some unit tests
This commit is contained in:
@ -162,7 +162,7 @@ const RowCursor* CollectIterator::Level0Iterator::current_row() const {
|
||||
return _current_row;
|
||||
}
|
||||
|
||||
int32_t CollectIterator::Level0Iterator::version() const {
|
||||
int64_t CollectIterator::Level0Iterator::version() const {
|
||||
return _rs_reader->version().second;
|
||||
}
|
||||
|
||||
@ -172,8 +172,7 @@ OLAPStatus CollectIterator::Level0Iterator::_refresh_current_row() {
|
||||
size_t pos = _row_block->pos();
|
||||
_row_block->get_row(pos, &_row_cursor);
|
||||
if (_row_block->block_status() == DEL_PARTIAL_SATISFIED &&
|
||||
_reader->_delete_handler.is_filter_data(_rs_reader->version().second,
|
||||
_row_cursor)) {
|
||||
_reader->_delete_handler.is_filter_data(version(), _row_cursor)) {
|
||||
_reader->_stats.rows_del_filtered++;
|
||||
_row_block->pos_inc();
|
||||
continue;
|
||||
@ -250,7 +249,7 @@ const RowCursor* CollectIterator::Level1Iterator::current_row() const {
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
int32_t CollectIterator::Level1Iterator::version() const {
|
||||
int64_t CollectIterator::Level1Iterator::version() const {
|
||||
if (_cur_child != nullptr) {
|
||||
return _cur_child->version();
|
||||
}
|
||||
@ -283,7 +282,7 @@ inline OLAPStatus CollectIterator::Level1Iterator::_merge_next(const RowCursor**
|
||||
bool* delete_flag) {
|
||||
_heap->pop();
|
||||
auto res = _cur_child->next(row, delete_flag);
|
||||
if (res == OLAP_SUCCESS) {
|
||||
if (LIKELY(res == OLAP_SUCCESS)) {
|
||||
_heap->push(_cur_child);
|
||||
_cur_child = _heap->top();
|
||||
} else if (res == OLAP_ERR_DATA_EOF) {
|
||||
|
||||
@ -65,7 +65,7 @@ private:
|
||||
|
||||
virtual const RowCursor* current_row() const = 0;
|
||||
|
||||
virtual int32_t version() const = 0;
|
||||
virtual int64_t version() const = 0;
|
||||
|
||||
virtual OLAPStatus next(const RowCursor** row, bool* delete_flag) = 0;
|
||||
virtual ~LevelIterator() = 0;
|
||||
@ -96,14 +96,13 @@ private:
|
||||
|
||||
const RowCursor* current_row() const;
|
||||
|
||||
int32_t version() const;
|
||||
int64_t version() const;
|
||||
|
||||
OLAPStatus next(const RowCursor** row, bool* delete_flag);
|
||||
|
||||
~Level0Iterator();
|
||||
|
||||
private:
|
||||
// refresh_current_row
|
||||
OLAPStatus _refresh_current_row();
|
||||
|
||||
RowsetReaderSharedPtr _rs_reader;
|
||||
@ -125,7 +124,7 @@ private:
|
||||
|
||||
const RowCursor* current_row() const;
|
||||
|
||||
int32_t version() const;
|
||||
int64_t version() const;
|
||||
|
||||
OLAPStatus next(const RowCursor** row, bool* delete_flag);
|
||||
|
||||
|
||||
@ -95,7 +95,7 @@ std::string DeleteConditionHandler::construct_sub_predicates(const TCondition& c
|
||||
} else if (op == ">") {
|
||||
op += ">";
|
||||
}
|
||||
string condition_str = "";
|
||||
string condition_str;
|
||||
if ("IS" == op) {
|
||||
condition_str = condition.column_name + " " + op + " " + condition.condition_values[0];
|
||||
} else {
|
||||
@ -110,58 +110,60 @@ std::string DeleteConditionHandler::construct_sub_predicates(const TCondition& c
|
||||
}
|
||||
|
||||
bool DeleteConditionHandler::is_condition_value_valid(const TabletColumn& column,
|
||||
const TCondition& cond,
|
||||
const std::string& condition_op,
|
||||
const string& value_str) {
|
||||
bool valid_condition = false;
|
||||
FieldType field_type = column.type();
|
||||
if ("IS" == cond.condition_op && ("NULL" == value_str || "NOT NULL" == value_str)) {
|
||||
valid_condition = true;
|
||||
} else if (field_type == OLAP_FIELD_TYPE_TINYINT) {
|
||||
valid_condition = valid_signed_number<int8_t>(value_str);
|
||||
} else if (field_type == OLAP_FIELD_TYPE_SMALLINT) {
|
||||
valid_condition = valid_signed_number<int16_t>(value_str);
|
||||
} else if (field_type == OLAP_FIELD_TYPE_INT) {
|
||||
valid_condition = valid_signed_number<int32_t>(value_str);
|
||||
} else if (field_type == OLAP_FIELD_TYPE_BIGINT) {
|
||||
valid_condition = valid_signed_number<int64_t>(value_str);
|
||||
} else if (field_type == OLAP_FIELD_TYPE_LARGEINT) {
|
||||
valid_condition = valid_signed_number<int128_t>(value_str);
|
||||
} else if (field_type == OLAP_FIELD_TYPE_UNSIGNED_TINYINT) {
|
||||
valid_condition = valid_unsigned_number<uint8_t>(value_str);
|
||||
} else if (field_type == OLAP_FIELD_TYPE_UNSIGNED_SMALLINT) {
|
||||
valid_condition = valid_unsigned_number<uint16_t>(value_str);
|
||||
} else if (field_type == OLAP_FIELD_TYPE_UNSIGNED_INT) {
|
||||
valid_condition = valid_unsigned_number<uint32_t>(value_str);
|
||||
} else if (field_type == OLAP_FIELD_TYPE_UNSIGNED_BIGINT) {
|
||||
valid_condition = valid_unsigned_number<uint64_t>(value_str);
|
||||
} else if (field_type == OLAP_FIELD_TYPE_DECIMAL) {
|
||||
valid_condition = valid_decimal(value_str, column.precision(), column.frac());
|
||||
} else if (field_type == OLAP_FIELD_TYPE_CHAR || field_type == OLAP_FIELD_TYPE_VARCHAR) {
|
||||
if (value_str.size() <= column.length()) {
|
||||
valid_condition = true;
|
||||
}
|
||||
} else if (field_type == OLAP_FIELD_TYPE_DATE || field_type == OLAP_FIELD_TYPE_DATETIME) {
|
||||
valid_condition = valid_datetime(value_str);
|
||||
} else if (field_type == OLAP_FIELD_TYPE_BOOL) {
|
||||
valid_condition = valid_bool(value_str);
|
||||
} else {
|
||||
OLAP_LOG_WARNING("unknown field type. [type=%d]", field_type);
|
||||
if ("IS" == condition_op && ("NULL" == value_str || "NOT NULL" == value_str)) {
|
||||
return true;
|
||||
}
|
||||
return valid_condition;
|
||||
|
||||
FieldType field_type = column.type();
|
||||
switch(field_type) {
|
||||
case OLAP_FIELD_TYPE_TINYINT:
|
||||
return valid_signed_number<int8_t>(value_str);
|
||||
case OLAP_FIELD_TYPE_SMALLINT:
|
||||
return valid_signed_number<int16_t>(value_str);
|
||||
case OLAP_FIELD_TYPE_INT:
|
||||
return valid_signed_number<int32_t>(value_str);
|
||||
case OLAP_FIELD_TYPE_BIGINT:
|
||||
return valid_signed_number<int64_t>(value_str);
|
||||
case OLAP_FIELD_TYPE_LARGEINT:
|
||||
return valid_signed_number<int128_t>(value_str);
|
||||
case OLAP_FIELD_TYPE_UNSIGNED_TINYINT:
|
||||
return valid_unsigned_number<uint8_t>(value_str);
|
||||
case OLAP_FIELD_TYPE_UNSIGNED_SMALLINT:
|
||||
return valid_unsigned_number<uint16_t>(value_str);
|
||||
case OLAP_FIELD_TYPE_UNSIGNED_INT:
|
||||
return valid_unsigned_number<uint32_t>(value_str);
|
||||
case OLAP_FIELD_TYPE_UNSIGNED_BIGINT:
|
||||
return valid_unsigned_number<uint64_t>(value_str);
|
||||
case OLAP_FIELD_TYPE_DECIMAL:
|
||||
return valid_decimal(value_str, column.precision(), column.frac());
|
||||
case OLAP_FIELD_TYPE_CHAR:
|
||||
case OLAP_FIELD_TYPE_VARCHAR:
|
||||
return value_str.size() <= column.length();
|
||||
case OLAP_FIELD_TYPE_DATE:
|
||||
case OLAP_FIELD_TYPE_DATETIME:
|
||||
return valid_datetime(value_str);
|
||||
case OLAP_FIELD_TYPE_BOOL:
|
||||
return valid_bool(value_str);
|
||||
default:
|
||||
OLAP_LOG_WARNING("unknown field type. [type=%d]", field_type);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
OLAPStatus DeleteConditionHandler::check_condition_valid(const TabletSchema& schema,
|
||||
const TCondition& cond) {
|
||||
// 检查指定列名的列是否存在
|
||||
// Check whether the column exists
|
||||
int32_t field_index = schema.field_index(cond.column_name);
|
||||
if (field_index < 0) {
|
||||
OLAP_LOG_WARNING("field is not existent. [field_index=%d]", field_index);
|
||||
return OLAP_ERR_DELETE_INVALID_CONDITION;
|
||||
}
|
||||
|
||||
// 检查指定的列是不是key,是不是float或double类型
|
||||
// Delete conditions should only be applied on key columns or on duplicate key tables, and
|
||||
// the condition column type should not be float or double.
|
||||
const TabletColumn& column = schema.column(field_index);
|
||||
|
||||
if ((!column.is_key() && schema.keys_type() != KeysType::DUP_KEYS) ||
|
||||
column.type() == OLAP_FIELD_TYPE_DOUBLE || column.type() == OLAP_FIELD_TYPE_FLOAT) {
|
||||
LOG(WARNING) << "field is not key column, or storage model is not duplicate, or data type "
|
||||
@ -169,21 +171,17 @@ OLAPStatus DeleteConditionHandler::check_condition_valid(const TabletSchema& sch
|
||||
return OLAP_ERR_DELETE_INVALID_CONDITION;
|
||||
}
|
||||
|
||||
// 检查删除条件中指定的过滤值是否符合每个类型自身的要求
|
||||
// 1. 对于整数类型(int8,int16,in32,int64,uint8,uint16,uint32,uint64),检查是否溢出
|
||||
// 2. 对于decimal类型,检查是否超过建表时指定的精度和标度
|
||||
// 3. 对于date和datetime类型,检查指定的过滤值是否符合日期格式以及是否指定错误的值
|
||||
// 4. 对于string和varchar类型,检查指定的过滤值是否超过建表时指定的长度
|
||||
// Check operator and operands size are matched.
|
||||
if ("*=" != cond.condition_op && "!*=" != cond.condition_op &&
|
||||
cond.condition_values.size() != 1) {
|
||||
OLAP_LOG_WARNING("invalid condition value size. [size=%ld]", cond.condition_values.size());
|
||||
return OLAP_ERR_DELETE_INVALID_CONDITION;
|
||||
}
|
||||
|
||||
for (int i = 0; i < cond.condition_values.size(); i++) {
|
||||
const string& value_str = cond.condition_values[i];
|
||||
if (!is_condition_value_valid(column, cond, value_str)) {
|
||||
LOG(WARNING) << "invalid condition value. [value=" << value_str << "]";
|
||||
// Check each operand is valid
|
||||
for (const auto& condition_value : cond.condition_values) {
|
||||
if (!is_condition_value_valid(column, cond.condition_op, condition_value)) {
|
||||
LOG(WARNING) << "invalid condition value. [value=" << condition_value << "]";
|
||||
return OLAP_ERR_DELETE_INVALID_CONDITION;
|
||||
}
|
||||
}
|
||||
@ -227,32 +225,29 @@ bool DeleteHandler::_parse_condition(const std::string& condition_str, TConditio
|
||||
}
|
||||
|
||||
OLAPStatus DeleteHandler::init(const TabletSchema& schema,
|
||||
const DelPredicateArray& delete_conditions, int32_t version) {
|
||||
const DelPredicateArray& delete_conditions, int64_t version) {
|
||||
DCHECK(!_is_inited) << "reinitialize delete handler.";
|
||||
DCHECK(version >= 0) << "invalid parameters. version=" << version;
|
||||
|
||||
DelPredicateArray::const_iterator it = delete_conditions.begin();
|
||||
for (; it != delete_conditions.end(); ++it) {
|
||||
for (const auto& delete_condition : delete_conditions) {
|
||||
// 跳过版本号大于version的过滤条件
|
||||
if (it->version() > version) {
|
||||
if (delete_condition.version() > version) {
|
||||
continue;
|
||||
}
|
||||
|
||||
DeleteConditions temp;
|
||||
temp.filter_version = it->version();
|
||||
temp.filter_version = delete_condition.version();
|
||||
temp.del_cond = new (std::nothrow) Conditions();
|
||||
|
||||
if (temp.del_cond == nullptr) {
|
||||
LOG(FATAL) << "fail to malloc Conditions. size=" << sizeof(Conditions);
|
||||
return OLAP_ERR_MALLOC_ERROR;
|
||||
}
|
||||
|
||||
temp.del_cond->set_tablet_schema(&schema);
|
||||
for (int i = 0; i != it->sub_predicates_size(); ++i) {
|
||||
for (const auto& sub_predicate : delete_condition.sub_predicates()) {
|
||||
TCondition condition;
|
||||
if (!_parse_condition(it->sub_predicates(i), &condition)) {
|
||||
OLAP_LOG_WARNING("fail to parse condition. [condition=%s]",
|
||||
it->sub_predicates(i).c_str());
|
||||
if (!_parse_condition(sub_predicate, &condition)) {
|
||||
OLAP_LOG_WARNING("fail to parse condition. [condition=%s]", sub_predicate.c_str());
|
||||
return OLAP_ERR_DELETE_INVALID_PARAMETERS;
|
||||
}
|
||||
|
||||
@ -263,9 +258,8 @@ OLAPStatus DeleteHandler::init(const TabletSchema& schema,
|
||||
}
|
||||
}
|
||||
|
||||
for (int i = 0; i != it->in_predicates_size(); ++i) {
|
||||
for (const auto& in_predicate : delete_condition.in_predicates()) {
|
||||
TCondition condition;
|
||||
const InPredicatePB& in_predicate = it->in_predicates(i);
|
||||
condition.__set_column_name(in_predicate.column_name());
|
||||
if (in_predicate.is_not_in()) {
|
||||
condition.__set_condition_op("!*=");
|
||||
@ -290,17 +284,11 @@ OLAPStatus DeleteHandler::init(const TabletSchema& schema,
|
||||
return OLAP_SUCCESS;
|
||||
}
|
||||
|
||||
bool DeleteHandler::is_filter_data(const int32_t data_version, const RowCursor& row) const {
|
||||
if (_del_conds.empty()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
bool DeleteHandler::is_filter_data(const int64_t data_version, const RowCursor& row) const {
|
||||
// 根据语义,存储在_del_conds的删除条件应该是OR关系
|
||||
// 因此,只要数据符合其中一条过滤条件,则返回true
|
||||
std::vector<DeleteConditions>::const_iterator it = _del_conds.begin();
|
||||
|
||||
for (; it != _del_conds.end(); ++it) {
|
||||
if (data_version <= it->filter_version && it->del_cond->delete_conditions_eval(row)) {
|
||||
for (const auto& del_cond : _del_conds) {
|
||||
if (data_version <= del_cond.filter_version && del_cond.del_cond->delete_conditions_eval(row)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
@ -308,14 +296,11 @@ bool DeleteHandler::is_filter_data(const int32_t data_version, const RowCursor&
|
||||
return false;
|
||||
}
|
||||
|
||||
std::vector<int32_t> DeleteHandler::get_conds_version() {
|
||||
std::vector<int32_t> conds_version;
|
||||
std::vector<DeleteConditions>::const_iterator cond_iter = _del_conds.begin();
|
||||
|
||||
for (; cond_iter != _del_conds.end(); ++cond_iter) {
|
||||
conds_version.push_back(cond_iter->filter_version);
|
||||
std::vector<int64_t> DeleteHandler::get_conds_version() {
|
||||
std::vector<int64_t> conds_version;
|
||||
for (const auto& cond : _del_conds) {
|
||||
conds_version.push_back(cond.filter_version);
|
||||
}
|
||||
|
||||
return conds_version;
|
||||
}
|
||||
|
||||
@ -324,19 +309,16 @@ void DeleteHandler::finalize() {
|
||||
return;
|
||||
}
|
||||
|
||||
std::vector<DeleteConditions>::iterator it = _del_conds.begin();
|
||||
|
||||
for (; it != _del_conds.end(); ++it) {
|
||||
it->del_cond->finalize();
|
||||
delete it->del_cond;
|
||||
for (auto& cond : _del_conds) {
|
||||
cond.del_cond->finalize();
|
||||
delete cond.del_cond;
|
||||
}
|
||||
|
||||
_del_conds.clear();
|
||||
_is_inited = false;
|
||||
}
|
||||
|
||||
void DeleteHandler::get_delete_conditions_after_version(
|
||||
int32_t version, std::vector<const Conditions*>* delete_conditions) const {
|
||||
int64_t version, std::vector<const Conditions*>* delete_conditions) const {
|
||||
for (auto& del_cond : _del_conds) {
|
||||
if (del_cond.filter_version > version) {
|
||||
delete_conditions->emplace_back(del_cond.del_cond);
|
||||
|
||||
@ -42,91 +42,103 @@ public:
|
||||
const std::vector<TCondition>& conditions,
|
||||
DeletePredicatePB* del_pred);
|
||||
|
||||
// 检查cond表示的删除条件是否符合要求;
|
||||
// 如果不符合要求,返回OLAP_ERR_DELETE_INVALID_CONDITION;符合要求返回OLAP_SUCCESS
|
||||
OLAPStatus check_condition_valid(const TabletSchema& tablet_schema, const TCondition& cond);
|
||||
|
||||
// construct sub condition from TCondition
|
||||
std::string construct_sub_predicates(const TCondition& condition);
|
||||
|
||||
private:
|
||||
bool is_condition_value_valid(const TabletColumn& column, const TCondition& cond,
|
||||
const string& value);
|
||||
// Validate the condition on the schema.
|
||||
// Return OLAP_SUCCESS, if valid
|
||||
// OLAP_ERR_DELETE_INVALID_CONDITION, otherwise
|
||||
OLAPStatus check_condition_valid(const TabletSchema& tablet_schema, const TCondition& cond);
|
||||
|
||||
// Check whether the condition value is valid according to its type.
|
||||
// 1. For integers (int8, int16, int32, int64, uint8, uint16, uint32, uint64), check whether they overflow
|
||||
// 2. For decimal, check whether precision or scale overflows
|
||||
// 3. For date and datetime, check format and value
|
||||
// 4. For char and varchar, check length
|
||||
bool is_condition_value_valid(const TabletColumn& column,
|
||||
const std::string& condition_op,
|
||||
const string& value_str);
|
||||
};
|
||||
|
||||
// 表示一个删除条件
|
||||
// Represent a delete condition.
|
||||
struct DeleteConditions {
|
||||
int32_t filter_version = 0; // 删除条件版本号
|
||||
Conditions* del_cond = nullptr; // 删除条件
|
||||
int64_t filter_version = 0; // The version of this condition
|
||||
Conditions* del_cond = nullptr; // The delete condition
|
||||
};
|
||||
|
||||
// 这个类主要用于判定一条数据(RowCursor)是否符合删除条件。这个类的使用流程如下:
|
||||
// 1. 使用一个版本号来初始化handler
|
||||
// This class is used for checking whether a row should be deleted.
|
||||
// It is used in the following processes:
|
||||
// 1. Create and initialize a DeleteHandler object:
|
||||
// OLAPStatus res;
|
||||
// DeleteHandler delete_handler;
|
||||
// res = delete_handler.init(tablet, condition_version);
|
||||
// 2. 使用这个handler来判定一条数据是否符合删除条件
|
||||
// bool filter_data;
|
||||
// filter_data = delete_handler.is_filter_data(data_version, row_cursor);
|
||||
// 3. 如果有多条数据要判断,可重复调用delete_handler.is_filter_data(data_version, row_data)
|
||||
// 4. 完成所有数据的判断后,需要销毁delete_handler对象
|
||||
// 2. Use it to check whether a row should be deleted:
|
||||
// bool should_be_deleted = delete_handler.is_filter_data(data_version, row_cursor);
|
||||
// 3. If there are multiple rows, you can invoke function is_filter_data multiple times:
|
||||
// should_be_deleted = delete_handler.is_filter_data(data_version, row_cursor);
|
||||
// 4. After all rows have been checked, you should release this object by calling:
|
||||
// delete_handler.finalize();
|
||||
//
|
||||
// 注:
|
||||
// * 第1步中,在调用init()函数之前,需要对Header文件加读锁
|
||||
// NOTE:
|
||||
// * In the first step, before calling delete_handler.init(), you should lock the tablet's header file.
|
||||
class DeleteHandler {
|
||||
public:
|
||||
typedef std::vector<DeleteConditions>::size_type cond_num_t;
|
||||
DeleteHandler() = default;
|
||||
~DeleteHandler() {
|
||||
finalize();
|
||||
}
|
||||
|
||||
DeleteHandler() : _is_inited(false) {}
|
||||
~DeleteHandler() {}
|
||||
|
||||
// 初始化handler,将从Header文件中取出小于等于指定版本号的删除条件填充到_del_conds中
|
||||
// 调用前需要先对Header文件加读锁
|
||||
// Initialize DeleteHandler, use the delete conditions of this tablet whose version is less than or equal to
|
||||
// 'version' to fill '_del_conds'.
|
||||
// NOTE: You should lock the tablet's header file before calling this function.
|
||||
//
|
||||
// 输入参数:
|
||||
// * tablet: 删除条件和数据所在的tablet
|
||||
// * version: 要取出的删除条件版本号
|
||||
// 返回值:
|
||||
// * OLAP_SUCCESS: 调用成功
|
||||
// * OLAP_ERR_DELETE_INVALID_PARAMETERS: 参数不符合要求
|
||||
// * OLAP_ERR_MALLOC_ERROR: 在填充_del_conds时,分配内存失败
|
||||
// input:
|
||||
// * schema: tablet's schema, the delete conditions and data rows are in this schema
|
||||
// * version: maximum version
|
||||
// return:
|
||||
// * OLAP_SUCCESS: succeed
|
||||
// * OLAP_ERR_DELETE_INVALID_PARAMETERS: input parameters are not valid
|
||||
// * OLAP_ERR_MALLOC_ERROR: alloc memory failed
|
||||
OLAPStatus init(const TabletSchema& schema, const DelPredicateArray& delete_conditions,
|
||||
int32_t version);
|
||||
int64_t version);
|
||||
|
||||
// 判定一条数据是否符合删除条件
|
||||
// Check whether a row should be deleted.
|
||||
//
|
||||
// 输入参数:
|
||||
// * data_version: 待判定数据的版本号
|
||||
// * row: 待判定的一行数据
|
||||
// 返回值:
|
||||
// * true: 数据符合删除条件
|
||||
// * false: 数据不符合删除条件
|
||||
bool is_filter_data(const int32_t data_version, const RowCursor& row) const;
|
||||
// input:
|
||||
// * data_version: the version of this row
|
||||
// * row: the row data to be checked
|
||||
// return:
|
||||
// * true: this row should be deleted
|
||||
// * false: this row should NOT be deleted
|
||||
bool is_filter_data(const int64_t data_version, const RowCursor& row) const;
|
||||
|
||||
// 返回handler中有存有多少条删除条件
|
||||
cond_num_t conditions_num() const { return _del_conds.size(); }
|
||||
// Return the delete conditions' size.
|
||||
size_t conditions_num() const { return _del_conds.size(); }
|
||||
|
||||
bool empty() const { return _del_conds.empty(); }
|
||||
|
||||
// 返回handler中存有的所有删除条件的版本号
|
||||
std::vector<int32_t> get_conds_version();
|
||||
// Return all the versions of the delete conditions.
|
||||
std::vector<int64_t> get_conds_version();
|
||||
|
||||
// 销毁handler对象
|
||||
// Release an instance of this class.
|
||||
void finalize();
|
||||
|
||||
// 获取只读删除条件
|
||||
// Return all the delete conditions.
|
||||
const std::vector<DeleteConditions>& get_delete_conditions() const { return _del_conds; }
|
||||
|
||||
void get_delete_conditions_after_version(
|
||||
int32_t version, std::vector<const Conditions*>* delete_conditions) const;
|
||||
int64_t version, std::vector<const Conditions*>* delete_conditions) const;
|
||||
|
||||
private:
|
||||
// Use regular expression to extract 'column_name', 'op' and 'operands'
|
||||
bool _parse_condition(const std::string& condition_str, TCondition* condition);
|
||||
|
||||
bool _is_inited;
|
||||
bool _is_inited = false;
|
||||
// DeleteConditions in _del_conds are in 'OR' relationship
|
||||
std::vector<DeleteConditions> _del_conds;
|
||||
|
||||
DISALLOW_COPY_AND_ASSIGN(DeleteHandler);
|
||||
};
|
||||
|
||||
} // namespace doris
|
||||
|
||||
@ -87,9 +87,6 @@ static CondOp parse_op_type(const string& op) {
|
||||
return OP_NULL;
|
||||
}
|
||||
|
||||
Cond::Cond()
|
||||
: op(OP_NULL), operand_field(nullptr), min_value_field(nullptr), max_value_field(nullptr) {}
|
||||
|
||||
Cond::~Cond() {
|
||||
delete operand_field;
|
||||
for (auto& it : operand_set) {
|
||||
@ -109,6 +106,7 @@ OLAPStatus Cond::init(const TCondition& tcond, const TabletColumn& column) {
|
||||
}
|
||||
if (op == OP_IS) {
|
||||
// 'is null' or 'is not null'
|
||||
DCHECK_EQ(tcond.condition_values.size(), 1);
|
||||
auto operand = tcond.condition_values.begin();
|
||||
std::unique_ptr<WrapperField> f(WrapperField::create(column, operand->length()));
|
||||
if (f == nullptr) {
|
||||
@ -123,6 +121,7 @@ OLAPStatus Cond::init(const TCondition& tcond, const TabletColumn& column) {
|
||||
}
|
||||
operand_field = f.release();
|
||||
} else if (op != OP_IN && op != OP_NOT_IN) {
|
||||
DCHECK_EQ(tcond.condition_values.size(), 1);
|
||||
auto operand = tcond.condition_values.begin();
|
||||
std::unique_ptr<WrapperField> f(WrapperField::create(column, operand->length()));
|
||||
if (f == nullptr) {
|
||||
@ -132,22 +131,24 @@ OLAPStatus Cond::init(const TCondition& tcond, const TabletColumn& column) {
|
||||
}
|
||||
OLAPStatus res = f->from_string(*operand);
|
||||
if (res != OLAP_SUCCESS) {
|
||||
OLAP_LOG_WARNING("Create field failed. [name=%s, operand=%s, op_type=%d]",
|
||||
OLAP_LOG_WARNING("Convert from string failed. [name=%s, operand=%s, op_type=%d]",
|
||||
tcond.column_name.c_str(), operand->c_str(), op);
|
||||
return res;
|
||||
}
|
||||
operand_field = f.release();
|
||||
} else {
|
||||
DCHECK(op == OP_IN || op == OP_NOT_IN);
|
||||
DCHECK(!tcond.condition_values.empty());
|
||||
for (auto& operand : tcond.condition_values) {
|
||||
std::unique_ptr<WrapperField> f(WrapperField::create(column, operand.length()));
|
||||
if (f == NULL) {
|
||||
if (f == nullptr) {
|
||||
OLAP_LOG_WARNING("Create field failed. [name=%s, operand=%s, op_type=%d]",
|
||||
tcond.column_name.c_str(), operand.c_str(), op);
|
||||
return OLAP_ERR_INPUT_PARAMETER_ERROR;
|
||||
}
|
||||
OLAPStatus res = f->from_string(operand);
|
||||
if (res != OLAP_SUCCESS) {
|
||||
OLAP_LOG_WARNING("Create field failed. [name=%s, operand=%s, op_type=%d]",
|
||||
OLAP_LOG_WARNING("Convert from string failed. [name=%s, operand=%s, op_type=%d]",
|
||||
tcond.column_name.c_str(), operand.c_str(), op);
|
||||
return res;
|
||||
}
|
||||
@ -175,7 +176,7 @@ OLAPStatus Cond::init(const TCondition& tcond, const TabletColumn& column) {
|
||||
|
||||
bool Cond::eval(const RowCursorCell& cell) const {
|
||||
if (cell.is_null() && op != OP_IS) {
|
||||
//任何operand和NULL的运算都是false
|
||||
//任何非OP_IS operand和NULL的运算都是false
|
||||
return false;
|
||||
}
|
||||
|
||||
@ -266,8 +267,6 @@ bool Cond::eval(const std::pair<WrapperField*, WrapperField*>& statistic) const
|
||||
}
|
||||
|
||||
int Cond::del_eval(const std::pair<WrapperField*, WrapperField*>& stat) const {
|
||||
//通过单列上的单个删除条件对version进行过滤。
|
||||
|
||||
// When we apply column statistics, stat maybe null.
|
||||
if (stat.first == nullptr || stat.second == nullptr) {
|
||||
//for string type, the column statistics may be not recorded in block level
|
||||
@ -286,9 +285,11 @@ int Cond::del_eval(const std::pair<WrapperField*, WrapperField*>& stat) const {
|
||||
int ret = DEL_NOT_SATISFIED;
|
||||
switch (op) {
|
||||
case OP_EQ: {
|
||||
if (operand_field->cmp(stat.first) == 0 && operand_field->cmp(stat.second) == 0) {
|
||||
int cmp1 = operand_field->cmp(stat.first);
|
||||
int cmp2 = operand_field->cmp(stat.second);
|
||||
if (cmp1 == 0 && cmp2 == 0) {
|
||||
ret = DEL_SATISFIED;
|
||||
} else if (operand_field->cmp(stat.first) >= 0 && operand_field->cmp(stat.second) <= 0) {
|
||||
} else if (cmp1 >= 0 && cmp2 <= 0) {
|
||||
ret = DEL_PARTIAL_SATISFIED;
|
||||
} else {
|
||||
ret = DEL_NOT_SATISFIED;
|
||||
@ -296,9 +297,11 @@ int Cond::del_eval(const std::pair<WrapperField*, WrapperField*>& stat) const {
|
||||
return ret;
|
||||
}
|
||||
case OP_NE: {
|
||||
if (operand_field->cmp(stat.first) == 0 && operand_field->cmp(stat.second) == 0) {
|
||||
int cmp1 = operand_field->cmp(stat.first);
|
||||
int cmp2 = operand_field->cmp(stat.second);
|
||||
if (cmp1 == 0 && cmp2 == 0) {
|
||||
ret = DEL_NOT_SATISFIED;
|
||||
} else if (operand_field->cmp(stat.first) >= 0 && operand_field->cmp(stat.second) <= 0) {
|
||||
} else if (cmp1 >= 0 && cmp2 <= 0) {
|
||||
ret = DEL_PARTIAL_SATISFIED;
|
||||
} else {
|
||||
ret = DEL_SATISFIED;
|
||||
@ -355,6 +358,8 @@ int Cond::del_eval(const std::pair<WrapperField*, WrapperField*>& stat) const {
|
||||
} else {
|
||||
if (min_value_field->cmp(stat.second) <= 0 && max_value_field->cmp(stat.first) >= 0) {
|
||||
ret = DEL_PARTIAL_SATISFIED;
|
||||
} else {
|
||||
ret = DEL_NOT_SATISFIED;
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
@ -368,6 +373,9 @@ int Cond::del_eval(const std::pair<WrapperField*, WrapperField*>& stat) const {
|
||||
}
|
||||
} else {
|
||||
if (min_value_field->cmp(stat.second) > 0 || max_value_field->cmp(stat.first) < 0) {
|
||||
// When there is no intersection, all entries in the range should be deleted.
|
||||
ret = DEL_SATISFIED;
|
||||
} else {
|
||||
ret = DEL_PARTIAL_SATISFIED;
|
||||
}
|
||||
}
|
||||
@ -380,8 +388,8 @@ int Cond::del_eval(const std::pair<WrapperField*, WrapperField*>& stat) const {
|
||||
} else if (stat.first->is_null() && !stat.second->is_null()) {
|
||||
ret = DEL_PARTIAL_SATISFIED;
|
||||
} else {
|
||||
//不会出现min不为NULL,max为NULL
|
||||
ret = DEL_NOT_SATISFIED;
|
||||
CHECK(false) << "It will not happen when the stat's min is not null and max is null";
|
||||
ret = DEL_SATISFIED;
|
||||
}
|
||||
} else {
|
||||
if (stat.first->is_null() && stat.second->is_null()) {
|
||||
@ -389,19 +397,20 @@ int Cond::del_eval(const std::pair<WrapperField*, WrapperField*>& stat) const {
|
||||
} else if (stat.first->is_null() && !stat.second->is_null()) {
|
||||
ret = DEL_PARTIAL_SATISFIED;
|
||||
} else {
|
||||
CHECK(false) << "It will not happen when the stat's min is not null and max is null";
|
||||
ret = DEL_SATISFIED;
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
default:
|
||||
LOG(WARNING) << "Not supported operation: " << op;
|
||||
break;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
bool Cond::eval(const BloomFilter& bf) const {
|
||||
//通过单列上BloomFilter对block进行过滤。
|
||||
switch (op) {
|
||||
case OP_EQ: {
|
||||
bool existed = false;
|
||||
@ -443,7 +452,6 @@ bool Cond::eval(const BloomFilter& bf) const {
|
||||
}
|
||||
|
||||
bool Cond::eval(const segment_v2::BloomFilter* bf) const {
|
||||
//通过单列上BloomFilter对block进行过滤。
|
||||
switch (op) {
|
||||
case OP_EQ: {
|
||||
bool existed = false;
|
||||
@ -500,7 +508,6 @@ OLAPStatus CondColumn::add_cond(const TCondition& tcond, const TabletColumn& col
|
||||
}
|
||||
|
||||
bool CondColumn::eval(const RowCursor& row) const {
|
||||
//通过一列上的所有查询条件对单行数据进行过滤
|
||||
auto cell = row.cell(_col_index);
|
||||
for (auto& each_cond : _conds) {
|
||||
// As long as there is one condition not satisfied, we can return false
|
||||
@ -512,9 +519,9 @@ bool CondColumn::eval(const RowCursor& row) const {
|
||||
return true;
|
||||
}
|
||||
|
||||
bool CondColumn::eval(const std::pair<WrapperField*, WrapperField*>& statistic) const {
|
||||
//通过一列上的所有查询条件对version进行过滤
|
||||
bool CondColumn::eval(const std::pair<WrapperField*, WrapperField*> &statistic) const {
|
||||
for (auto& each_cond : _conds) {
|
||||
// As long as there is one condition not satisfied, we can return false
|
||||
if (!each_cond->eval(statistic)) {
|
||||
return false;
|
||||
}
|
||||
@ -524,8 +531,6 @@ bool CondColumn::eval(const std::pair<WrapperField*, WrapperField*>& statistic)
|
||||
}
|
||||
|
||||
int CondColumn::del_eval(const std::pair<WrapperField*, WrapperField*>& statistic) const {
|
||||
//通过一列上的所有删除条件对version进行过滤
|
||||
|
||||
/*
|
||||
* the relationship between cond A and B is A & B.
|
||||
* if all delete condition is satisfied, the data can be filtered.
|
||||
@ -560,7 +565,6 @@ int CondColumn::del_eval(const std::pair<WrapperField*, WrapperField*>& statisti
|
||||
}
|
||||
|
||||
bool CondColumn::eval(const BloomFilter& bf) const {
|
||||
//通过一列上的所有BloomFilter索引信息对block进行过滤
|
||||
for (auto& each_cond : _conds) {
|
||||
if (!each_cond->eval(bf)) {
|
||||
return false;
|
||||
@ -607,7 +611,6 @@ OLAPStatus Conditions::append_condition(const TCondition& tcond) {
|
||||
}
|
||||
|
||||
bool Conditions::delete_conditions_eval(const RowCursor& row) const {
|
||||
//通过所有列上的删除条件对rowcursor进行过滤
|
||||
if (_columns.empty()) {
|
||||
return false;
|
||||
}
|
||||
@ -677,8 +680,6 @@ int Conditions::delete_pruning_filter(const std::vector<KeyRange>& zone_maps) co
|
||||
}
|
||||
|
||||
if (del_not_satisfied) {
|
||||
// if the size of condcolumn vector is zero,
|
||||
// the delete condition is not satisfied.
|
||||
ret = DEL_NOT_SATISFIED;
|
||||
} else if (del_partial_satisfied) {
|
||||
ret = DEL_PARTIAL_SATISFIED;
|
||||
|
||||
@ -65,7 +65,7 @@ struct FieldEqual {
|
||||
// 条件二元组,描述了一个条件的操作类型和操作数(1个或者多个)
|
||||
struct Cond {
|
||||
public:
|
||||
Cond();
|
||||
Cond() = default;
|
||||
~Cond();
|
||||
|
||||
OLAPStatus init(const TCondition& tcond, const TabletColumn& column);
|
||||
@ -73,49 +73,53 @@ public:
|
||||
// 用一行数据的指定列同条件进行比较,如果符合过滤条件,
|
||||
// 即按照此条件,行应被过滤掉,则返回true,否则返回false
|
||||
bool eval(const RowCursorCell& cell) const;
|
||||
|
||||
bool eval(const KeyRange& statistic) const;
|
||||
|
||||
// 通过单列上的单个删除条件对version进行过滤
|
||||
int del_eval(const KeyRange& stat) const;
|
||||
|
||||
// 通过单列上BloomFilter对block进行过滤
|
||||
bool eval(const BloomFilter& bf) const;
|
||||
|
||||
bool eval(const segment_v2::BloomFilter* bf) const;
|
||||
|
||||
bool can_do_bloom_filter() const { return op == OP_EQ || op == OP_IN || op == OP_IS; }
|
||||
|
||||
CondOp op;
|
||||
CondOp op = OP_NULL;
|
||||
// valid when op is not OP_IN and OP_NOT_IN
|
||||
WrapperField* operand_field;
|
||||
WrapperField* operand_field = nullptr;
|
||||
// valid when op is OP_IN or OP_NOT_IN
|
||||
typedef std::unordered_set<const WrapperField*, FieldHash, FieldEqual> FieldSet;
|
||||
FieldSet operand_set;
|
||||
// valid when op is OP_IN or OP_NOT_IN, represents the minimum or maximum value of in elements
|
||||
WrapperField* min_value_field;
|
||||
WrapperField* max_value_field;
|
||||
WrapperField* min_value_field = nullptr;
|
||||
WrapperField* max_value_field = nullptr;
|
||||
};
|
||||
|
||||
// 所有归属于同一列上的条件二元组,聚合在一个CondColumn上
|
||||
class CondColumn {
|
||||
public:
|
||||
CondColumn(const TabletSchema& tablet_schema, int32_t index) : _col_index(index) {
|
||||
_conds.clear();
|
||||
_is_key = tablet_schema.column(_col_index).is_key();
|
||||
}
|
||||
~CondColumn();
|
||||
|
||||
// Convert condition's operand from string to Field*, and append this condition to _conds
|
||||
// return true if success, otherwise return false
|
||||
bool add_condition(Cond* condition);
|
||||
OLAPStatus add_cond(const TCondition& tcond, const TabletColumn& column);
|
||||
|
||||
// 对一行数据中的指定列,用所有过滤条件进行比较,如果所有条件都满足,则过滤此行
|
||||
// Return true means this row should be filtered out, otherwise return false
|
||||
bool eval(const RowCursor& row) const;
|
||||
|
||||
// Return true if the rowset should be pruned
|
||||
bool eval(const std::pair<WrapperField*, WrapperField*>& statistic) const;
|
||||
|
||||
// Whether the rowset satisfied delete condition
|
||||
int del_eval(const std::pair<WrapperField*, WrapperField*>& statistic) const;
|
||||
|
||||
// 通过一列上的所有BloomFilter索引信息对block进行过滤
|
||||
// Return true if the block should be filtered out
|
||||
bool eval(const BloomFilter& bf) const;
|
||||
|
||||
// Return true if the block should be filtered out
|
||||
bool eval(const segment_v2::BloomFilter* bf) const;
|
||||
|
||||
bool can_do_bloom_filter() const {
|
||||
@ -133,8 +137,11 @@ public:
|
||||
const std::vector<Cond*>& conds() const { return _conds; }
|
||||
|
||||
private:
|
||||
bool _is_key;
|
||||
int32_t _col_index;
|
||||
friend class Conditions;
|
||||
|
||||
bool _is_key = false;
|
||||
int32_t _col_index = 0;
|
||||
// Conds in _conds are in 'AND' relationship
|
||||
std::vector<Cond*> _conds;
|
||||
};
|
||||
|
||||
@ -163,10 +170,15 @@ public:
|
||||
// 1. column不属于key列
|
||||
// 2. column类型是double, float
|
||||
OLAPStatus append_condition(const TCondition& condition);
|
||||
|
||||
|
||||
// 通过所有列上的删除条件对RowCursor进行过滤
|
||||
// Return true means this row should be filtered out, otherwise return false
|
||||
bool delete_conditions_eval(const RowCursor& row) const;
|
||||
|
||||
// Return true if the rowset should be pruned
|
||||
bool rowset_pruning_filter(const std::vector<KeyRange>& zone_maps) const;
|
||||
|
||||
// Whether the rowset satisfied delete condition
|
||||
int delete_pruning_filter(const std::vector<KeyRange>& zone_maps) const;
|
||||
|
||||
const CondColumns& columns() const { return _columns; }
|
||||
@ -180,7 +192,10 @@ private:
|
||||
|
||||
private:
|
||||
const TabletSchema* _schema = nullptr;
|
||||
// CondColumns in _columns are in 'AND' relationship
|
||||
CondColumns _columns; // list of condition column
|
||||
|
||||
DISALLOW_COPY_AND_ASSIGN(Conditions);
|
||||
};
|
||||
|
||||
} // namespace doris
|
||||
|
||||
@ -46,17 +46,17 @@ void ReaderParams::check_validation() const {
|
||||
}
|
||||
}
|
||||
|
||||
std::string ReaderParams::to_string() {
|
||||
std::string ReaderParams::to_string() const {
|
||||
std::stringstream ss;
|
||||
ss << "tablet=" << tablet->full_name() << " reader_type=" << reader_type
|
||||
<< " aggregation=" << aggregation << " version=" << version << " range=" << range
|
||||
<< " end_range=" << end_range;
|
||||
|
||||
for (auto& key : start_key) {
|
||||
for (const auto& key : start_key) {
|
||||
ss << " keys=" << key;
|
||||
}
|
||||
|
||||
for (auto& key : end_key) {
|
||||
for (const auto& key : end_key) {
|
||||
ss << " end_keys=" << key;
|
||||
}
|
||||
|
||||
@ -66,6 +66,7 @@ std::string ReaderParams::to_string() {
|
||||
|
||||
return ss.str();
|
||||
}
|
||||
|
||||
Reader::KeysParam::~KeysParam() {
|
||||
for (auto start_key : start_keys) {
|
||||
SAFE_DELETE(start_key);
|
||||
@ -91,8 +92,6 @@ std::string Reader::KeysParam::to_string() const {
|
||||
}
|
||||
|
||||
Reader::Reader() : _collect_iter(new CollectIterator()) {
|
||||
_tracker.reset(new MemTracker(-1));
|
||||
_predicate_mem_pool.reset(new MemPool(_tracker.get()));
|
||||
}
|
||||
|
||||
Reader::~Reader() {
|
||||
@ -100,6 +99,9 @@ Reader::~Reader() {
|
||||
}
|
||||
|
||||
OLAPStatus Reader::init(const ReaderParams& read_params) {
|
||||
_tracker.reset(new MemTracker(-1, read_params.tablet->full_name()));
|
||||
_predicate_mem_pool.reset(new MemPool(_tracker.get()));
|
||||
|
||||
OLAPStatus res = _init_params(read_params);
|
||||
if (res != OLAP_SUCCESS) {
|
||||
LOG(WARNING) << "fail to init reader when init params. res:" << res
|
||||
@ -165,10 +167,8 @@ OLAPStatus Reader::_direct_next_row(RowCursor* row_cursor, MemPool* mem_pool, Ob
|
||||
}
|
||||
direct_copy_row(row_cursor, *_next_key);
|
||||
auto res = _collect_iter->next(&_next_key, &_next_delete_flag);
|
||||
if (res != OLAP_SUCCESS) {
|
||||
if (res != OLAP_ERR_DATA_EOF) {
|
||||
return res;
|
||||
}
|
||||
if (UNLIKELY(res != OLAP_SUCCESS && res != OLAP_ERR_DATA_EOF)) {
|
||||
return res;
|
||||
}
|
||||
return OLAP_SUCCESS;
|
||||
}
|
||||
@ -199,15 +199,16 @@ OLAPStatus Reader::_agg_key_next_row(RowCursor* row_cursor, MemPool* mem_pool, O
|
||||
int64_t merged_count = 0;
|
||||
do {
|
||||
auto res = _collect_iter->next(&_next_key, &_next_delete_flag);
|
||||
if (res != OLAP_SUCCESS) {
|
||||
if (res != OLAP_ERR_DATA_EOF) {
|
||||
LOG(WARNING) << "next failed:" << res;
|
||||
return res;
|
||||
}
|
||||
if (UNLIKELY(res == OLAP_ERR_DATA_EOF)) {
|
||||
break;
|
||||
}
|
||||
|
||||
if (_aggregation && merged_count > config::doris_scanner_row_num) {
|
||||
if (UNLIKELY(res != OLAP_SUCCESS)) {
|
||||
LOG(WARNING) << "next failed: " << res;
|
||||
return res;
|
||||
}
|
||||
|
||||
if (UNLIKELY(_aggregation && merged_count > config::doris_scanner_row_num)) {
|
||||
break;
|
||||
}
|
||||
|
||||
@ -245,12 +246,15 @@ OLAPStatus Reader::_unique_key_next_row(RowCursor* row_cursor, MemPool* mem_pool
|
||||
// skip the lower version rows;
|
||||
while (nullptr != _next_key) {
|
||||
auto res = _collect_iter->next(&_next_key, &_next_delete_flag);
|
||||
if (res != OLAP_SUCCESS) {
|
||||
if (res != OLAP_ERR_DATA_EOF) {
|
||||
return res;
|
||||
}
|
||||
if (UNLIKELY(res == OLAP_ERR_DATA_EOF)) {
|
||||
break;
|
||||
}
|
||||
|
||||
if (UNLIKELY(res != OLAP_SUCCESS)) {
|
||||
LOG(WARNING) << "next failed: " << res;
|
||||
return res;
|
||||
}
|
||||
|
||||
// break out of the while loop when aggregation can NOT be done
|
||||
if (!equal_row(_key_cids, *row_cursor, *_next_key)) {
|
||||
agg_finalize_row(_value_cids, row_cursor, mem_pool);
|
||||
@ -298,9 +302,7 @@ OLAPStatus Reader::_capture_rs_readers(const ReaderParams& read_params) {
|
||||
|
||||
bool eof = false;
|
||||
for (int i = 0; i < _keys_param.start_keys.size(); ++i) {
|
||||
RowCursor* start_key = _keys_param.start_keys[i];
|
||||
RowCursor* end_key = _keys_param.end_keys[i];
|
||||
bool is_lower_key_included = false;
|
||||
// upper bound
|
||||
bool is_upper_key_included = false;
|
||||
if (_keys_param.end_range == "lt") {
|
||||
is_upper_key_included = false;
|
||||
@ -312,6 +314,10 @@ OLAPStatus Reader::_capture_rs_readers(const ReaderParams& read_params) {
|
||||
return OLAP_ERR_READER_GET_ITERATOR_ERROR;
|
||||
}
|
||||
|
||||
// lower bound
|
||||
RowCursor* start_key = _keys_param.start_keys[i];
|
||||
RowCursor* end_key = _keys_param.end_keys[i];
|
||||
bool is_lower_key_included = false;
|
||||
if (_keys_param.range == "gt") {
|
||||
if (end_key != nullptr && compare_row_key(*start_key, *end_key) >= 0) {
|
||||
VLOG(3) << "return EOF when range=" << _keys_param.range
|
||||
@ -381,7 +387,7 @@ OLAPStatus Reader::_capture_rs_readers(const ReaderParams& read_params) {
|
||||
RETURN_NOT_OK(rs_reader->init(&_reader_context));
|
||||
OLAPStatus res = _collect_iter->add_child(rs_reader);
|
||||
if (res != OLAP_SUCCESS && res != OLAP_ERR_DATA_EOF) {
|
||||
LOG(WARNING) << "failed to add child to iterator";
|
||||
LOG(WARNING) << "failed to add child to iterator, err=" << res;
|
||||
return res;
|
||||
}
|
||||
if (res == OLAP_SUCCESS) {
|
||||
@ -428,13 +434,12 @@ OLAPStatus Reader::_init_params(const ReaderParams& read_params) {
|
||||
|
||||
if (_tablet->tablet_schema().has_sequence_col()) {
|
||||
_sequence_col_idx = _tablet->tablet_schema().sequence_col_idx();
|
||||
if (_sequence_col_idx != -1) {
|
||||
for (auto col : _return_columns) {
|
||||
// query has sequence col
|
||||
if (col == _sequence_col_idx) {
|
||||
_has_sequence_col = true;
|
||||
break;
|
||||
}
|
||||
DCHECK_NE(_sequence_col_idx, -1);
|
||||
for (auto col : _return_columns) {
|
||||
// query has sequence col
|
||||
if (col == _sequence_col_idx) {
|
||||
_has_sequence_col = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -445,10 +450,10 @@ OLAPStatus Reader::_init_params(const ReaderParams& read_params) {
|
||||
OLAPStatus Reader::_init_return_columns(const ReaderParams& read_params) {
|
||||
if (read_params.reader_type == READER_QUERY) {
|
||||
_return_columns = read_params.return_columns;
|
||||
if (_delete_handler.conditions_num() != 0 && read_params.aggregation) {
|
||||
if (!_delete_handler.empty() && read_params.aggregation) {
|
||||
set<uint32_t> column_set(_return_columns.begin(), _return_columns.end());
|
||||
for (auto conds : _delete_handler.get_delete_conditions()) {
|
||||
for (auto cond_column : conds.del_cond->columns()) {
|
||||
for (const auto& conds : _delete_handler.get_delete_conditions()) {
|
||||
for (const auto& cond_column : conds.del_cond->columns()) {
|
||||
if (column_set.find(cond_column.first) == column_set.end()) {
|
||||
column_set.insert(cond_column.first);
|
||||
_return_columns.push_back(cond_column.first);
|
||||
@ -498,18 +503,15 @@ void Reader::_init_seek_columns() {
|
||||
for (auto& it : _conditions.columns()) {
|
||||
column_set.insert(it.first);
|
||||
}
|
||||
uint32_t max_key_column_count = 0;
|
||||
for (auto key : _keys_param.start_keys) {
|
||||
if (key->field_count() > max_key_column_count) {
|
||||
max_key_column_count = key->field_count();
|
||||
}
|
||||
size_t max_key_column_count = 0;
|
||||
for (const auto& key : _keys_param.start_keys) {
|
||||
max_key_column_count = std::max(max_key_column_count, key->field_count());
|
||||
}
|
||||
for (auto key : _keys_param.end_keys) {
|
||||
if (key->field_count() > max_key_column_count) {
|
||||
max_key_column_count = key->field_count();
|
||||
}
|
||||
for (const auto& key : _keys_param.end_keys) {
|
||||
max_key_column_count = std::max(max_key_column_count, key->field_count());
|
||||
}
|
||||
for (uint32_t i = 0; i < _tablet->tablet_schema().num_columns(); i++) {
|
||||
|
||||
for (size_t i = 0; i < _tablet->tablet_schema().num_columns(); i++) {
|
||||
if (i < max_key_column_count || column_set.find(i) != column_set.end()) {
|
||||
_seek_columns.push_back(i);
|
||||
}
|
||||
@ -547,9 +549,9 @@ OLAPStatus Reader::_init_keys_param(const ReaderParams& read_params) {
|
||||
}
|
||||
|
||||
size_t end_key_size = read_params.end_key.size();
|
||||
_keys_param.end_keys.resize(end_key_size, NULL);
|
||||
_keys_param.end_keys.resize(end_key_size, nullptr);
|
||||
for (size_t i = 0; i < end_key_size; ++i) {
|
||||
if ((_keys_param.end_keys[i] = new (nothrow) RowCursor()) == NULL) {
|
||||
if ((_keys_param.end_keys[i] = new (nothrow) RowCursor()) == nullptr) {
|
||||
OLAP_LOG_WARNING("fail to new RowCursor!");
|
||||
return OLAP_ERR_MALLOC_ERROR;
|
||||
}
|
||||
@ -834,7 +836,10 @@ ColumnPredicate* Reader::_parse_to_predicate(const TCondition& condition) {
|
||||
void Reader::_init_load_bf_columns(const ReaderParams& read_params) {
|
||||
// add all columns with condition to _load_bf_columns
|
||||
for (const auto& cond_column : _conditions.columns()) {
|
||||
for (const Cond* cond : cond_column.second->conds()) {
|
||||
if (!_tablet->tablet_schema().column(cond_column.first).is_bf_column()) {
|
||||
continue;
|
||||
}
|
||||
for (const auto& cond : cond_column.second->conds()) {
|
||||
if (cond->op == OP_EQ ||
|
||||
(cond->op == OP_IN && cond->operand_set.size() < MAX_OP_IN_FIELD_NUM)) {
|
||||
_load_bf_columns.insert(cond_column.first);
|
||||
@ -842,25 +847,13 @@ void Reader::_init_load_bf_columns(const ReaderParams& read_params) {
|
||||
}
|
||||
}
|
||||
|
||||
// remove columns which have no bf stream
|
||||
for (int i = 0; i < _tablet->tablet_schema().num_columns(); ++i) {
|
||||
if (!_tablet->tablet_schema().column(i).is_bf_column()) {
|
||||
_load_bf_columns.erase(i);
|
||||
}
|
||||
}
|
||||
|
||||
// remove columns which have same value between start_key and end_key
|
||||
int min_scan_key_len = _tablet->tablet_schema().num_columns();
|
||||
for (int i = 0; i < read_params.start_key.size(); ++i) {
|
||||
if (read_params.start_key[i].size() < min_scan_key_len) {
|
||||
min_scan_key_len = read_params.start_key[i].size();
|
||||
}
|
||||
for (const auto& start_key : read_params.start_key) {
|
||||
min_scan_key_len = std::min(min_scan_key_len, static_cast<int>(start_key.size()));
|
||||
}
|
||||
|
||||
for (int i = 0; i < read_params.end_key.size(); ++i) {
|
||||
if (read_params.end_key[i].size() < min_scan_key_len) {
|
||||
min_scan_key_len = read_params.end_key[i].size();
|
||||
}
|
||||
for (const auto& end_key : read_params.end_key) {
|
||||
min_scan_key_len = std::min(min_scan_key_len, static_cast<int>(end_key.size()));
|
||||
}
|
||||
|
||||
int max_equal_index = -1;
|
||||
@ -893,19 +886,19 @@ void Reader::_init_load_bf_columns(const ReaderParams& read_params) {
|
||||
}
|
||||
|
||||
OLAPStatus Reader::_init_delete_condition(const ReaderParams& read_params) {
|
||||
if (read_params.reader_type != READER_CUMULATIVE_COMPACTION) {
|
||||
_tablet->obtain_header_rdlock();
|
||||
OLAPStatus ret = _delete_handler.init(
|
||||
_tablet->tablet_schema(), _tablet->delete_predicates(), read_params.version.second);
|
||||
_tablet->release_header_lock();
|
||||
|
||||
if (read_params.reader_type == READER_BASE_COMPACTION) {
|
||||
_filter_delete = true;
|
||||
}
|
||||
return ret;
|
||||
} else {
|
||||
if (read_params.reader_type == READER_CUMULATIVE_COMPACTION) {
|
||||
return OLAP_SUCCESS;
|
||||
}
|
||||
|
||||
_tablet->obtain_header_rdlock();
|
||||
OLAPStatus ret = _delete_handler.init(
|
||||
_tablet->tablet_schema(), _tablet->delete_predicates(), read_params.version.second);
|
||||
_tablet->release_header_lock();
|
||||
|
||||
if (read_params.reader_type == READER_BASE_COMPACTION) {
|
||||
_filter_delete = true;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
} // namespace doris
|
||||
|
||||
@ -76,7 +76,7 @@ struct ReaderParams {
|
||||
|
||||
void check_validation() const;
|
||||
|
||||
std::string to_string();
|
||||
std::string to_string() const;
|
||||
};
|
||||
|
||||
class Reader {
|
||||
|
||||
@ -89,6 +89,7 @@ public:
|
||||
bool empty() const { return _segment_group->empty(); }
|
||||
bool zero_num_rows() const { return _segment_group->zero_num_rows(); }
|
||||
|
||||
// Return true if should be filtered out
|
||||
bool rowset_pruning_filter();
|
||||
int delete_pruning_filter();
|
||||
uint64_t get_filtered_rows();
|
||||
|
||||
@ -95,7 +95,7 @@ RowBlockChanger::RowBlockChanger(const TabletSchema& tablet_schema) {
|
||||
}
|
||||
|
||||
RowBlockChanger::RowBlockChanger(const TabletSchema& tablet_schema,
|
||||
const DeleteHandler& delete_handler) {
|
||||
const DeleteHandler* delete_handler) {
|
||||
_schema_mapping.resize(tablet_schema.num_columns());
|
||||
_delete_handler = delete_handler;
|
||||
}
|
||||
@ -106,8 +106,6 @@ RowBlockChanger::~RowBlockChanger() {
|
||||
SAFE_DELETE(it->default_value);
|
||||
}
|
||||
_schema_mapping.clear();
|
||||
|
||||
_delete_handler.finalize();
|
||||
}
|
||||
|
||||
ColumnMapping* RowBlockChanger::get_mutable_column_mapping(size_t column_index) {
|
||||
@ -449,7 +447,7 @@ OLAPStatus RowBlockChanger::change_row_block(const RowBlock* ref_block, int32_t
|
||||
|
||||
// filter data according to delete conditions specified in DeleteData command
|
||||
if (is_data_left_vec[row_index] == 1) {
|
||||
if (_delete_handler.is_filter_data(data_version, read_helper)) {
|
||||
if (_delete_handler != nullptr && _delete_handler->is_filter_data(data_version, read_helper)) {
|
||||
is_data_left_vec[row_index] = 0;
|
||||
}
|
||||
}
|
||||
@ -1552,7 +1550,7 @@ OLAPStatus SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletRe
|
||||
sc_params.base_tablet = base_tablet;
|
||||
sc_params.new_tablet = new_tablet;
|
||||
sc_params.ref_rowset_readers = rs_readers;
|
||||
sc_params.delete_handler = delete_handler;
|
||||
sc_params.delete_handler = &delete_handler;
|
||||
if (request.__isset.materialized_view_params) {
|
||||
for (auto item : request.materialized_view_params) {
|
||||
AlterMaterializedViewParam mv_param;
|
||||
|
||||
@ -49,7 +49,7 @@ bool count_field(RowCursor* read_helper, RowCursor* write_helper, const TabletCo
|
||||
|
||||
class RowBlockChanger {
|
||||
public:
|
||||
RowBlockChanger(const TabletSchema& tablet_schema, const DeleteHandler& delete_handler);
|
||||
RowBlockChanger(const TabletSchema& tablet_schema, const DeleteHandler* delete_handler);
|
||||
|
||||
RowBlockChanger(const TabletSchema& tablet_schema);
|
||||
|
||||
@ -67,7 +67,7 @@ private:
|
||||
SchemaMapping _schema_mapping;
|
||||
|
||||
// delete handler for filtering data which use specified in DELETE_DATA
|
||||
DeleteHandler _delete_handler;
|
||||
const DeleteHandler* _delete_handler = nullptr;
|
||||
|
||||
DISALLOW_COPY_AND_ASSIGN(RowBlockChanger);
|
||||
};
|
||||
@ -210,7 +210,7 @@ private:
|
||||
TabletSharedPtr base_tablet;
|
||||
TabletSharedPtr new_tablet;
|
||||
std::vector<RowsetReaderSharedPtr> ref_rowset_readers;
|
||||
DeleteHandler delete_handler;
|
||||
DeleteHandler* delete_handler = nullptr;
|
||||
std::unordered_map<std::string, AlterMaterializedViewParam> materialized_params_map;
|
||||
};
|
||||
|
||||
|
||||
@ -571,10 +571,8 @@ void Tablet::delete_expired_stale_rowset() {
|
||||
|
||||
OLAPStatus Tablet::capture_consistent_versions(const Version& spec_version,
|
||||
std::vector<Version>* version_path) const {
|
||||
// OLAPStatus status = _rs_graph.capture_consistent_versions(spec_version, version_path);
|
||||
OLAPStatus status =
|
||||
_timestamped_version_tracker.capture_consistent_versions(spec_version, version_path);
|
||||
|
||||
if (status != OLAP_SUCCESS) {
|
||||
std::vector<Version> missed_versions;
|
||||
calc_missed_versions_unlocked(spec_version.second, &missed_versions);
|
||||
|
||||
@ -34,6 +34,7 @@
|
||||
#include "olap/utils.h"
|
||||
#include "util/file_utils.h"
|
||||
#include "util/logging.h"
|
||||
#include "util/cpu_info.h"
|
||||
|
||||
using namespace std;
|
||||
using namespace doris;
|
||||
@ -450,7 +451,6 @@ protected:
|
||||
|
||||
TEST_F(TestDeleteConditionHandler2, ValidConditionValue) {
|
||||
OLAPStatus res;
|
||||
DeleteConditionHandler cond_handler;
|
||||
std::vector<TCondition> conditions;
|
||||
|
||||
// 测试数据中, k1,k2,k3,k4类型分别为int8, int16, int32, int64
|
||||
@ -572,7 +572,6 @@ TEST_F(TestDeleteConditionHandler2, ValidConditionValue) {
|
||||
|
||||
TEST_F(TestDeleteConditionHandler2, InvalidConditionValue) {
|
||||
OLAPStatus res;
|
||||
DeleteConditionHandler cond_handler;
|
||||
std::vector<TCondition> conditions;
|
||||
|
||||
// 测试k1的值越上界,k1类型为int8
|
||||
@ -783,6 +782,7 @@ TEST_F(TestDeleteConditionHandler2, InvalidConditionValue) {
|
||||
class TestDeleteHandler : public testing::Test {
|
||||
protected:
|
||||
void SetUp() {
|
||||
CpuInfo::init();
|
||||
// Create local data dir for StorageEngine.
|
||||
char buffer[MAX_PATH_LEN];
|
||||
getcwd(buffer, MAX_PATH_LEN);
|
||||
@ -825,7 +825,6 @@ protected:
|
||||
TEST_F(TestDeleteHandler, InitSuccess) {
|
||||
OLAPStatus res;
|
||||
std::vector<TCondition> conditions;
|
||||
DeleteConditionHandler delete_condition_handler;
|
||||
|
||||
// 往头文件中添加过滤条件
|
||||
TCondition condition;
|
||||
@ -896,7 +895,7 @@ TEST_F(TestDeleteHandler, InitSuccess) {
|
||||
res = _delete_handler.init(tablet->tablet_schema(), tablet->delete_predicates(), 4);
|
||||
ASSERT_EQ(OLAP_SUCCESS, res);
|
||||
ASSERT_EQ(4, _delete_handler.conditions_num());
|
||||
std::vector<int32_t> conds_version = _delete_handler.get_conds_version();
|
||||
std::vector<int64_t> conds_version = _delete_handler.get_conds_version();
|
||||
EXPECT_EQ(4, conds_version.size());
|
||||
sort(conds_version.begin(), conds_version.end());
|
||||
EXPECT_EQ(1, conds_version[0]);
|
||||
@ -911,7 +910,6 @@ TEST_F(TestDeleteHandler, InitSuccess) {
|
||||
// 即只有满足一条过滤条件包含的所有子条件,这条数据才会被过滤
|
||||
TEST_F(TestDeleteHandler, FilterDataSubconditions) {
|
||||
OLAPStatus res;
|
||||
DeleteConditionHandler cond_handler;
|
||||
std::vector<TCondition> conditions;
|
||||
|
||||
// 往Header中添加过滤条件
|
||||
@ -973,7 +971,6 @@ TEST_F(TestDeleteHandler, FilterDataSubconditions) {
|
||||
// 即如果存在多个过滤条件,会一次检查数据是否符合这些过滤条件;只要有一个过滤条件符合,则过滤数据
|
||||
TEST_F(TestDeleteHandler, FilterDataConditions) {
|
||||
OLAPStatus res;
|
||||
DeleteConditionHandler cond_handler;
|
||||
std::vector<TCondition> conditions;
|
||||
|
||||
// 往Header中添加过滤条件
|
||||
@ -1054,7 +1051,6 @@ TEST_F(TestDeleteHandler, FilterDataConditions) {
|
||||
// 测试在过滤时,版本号小于数据版本的过滤条件将不起作用
|
||||
TEST_F(TestDeleteHandler, FilterDataVersion) {
|
||||
OLAPStatus res;
|
||||
DeleteConditionHandler cond_handler;
|
||||
std::vector<TCondition> conditions;
|
||||
|
||||
// 往Header中添加过滤条件
|
||||
|
||||
@ -543,7 +543,8 @@ TEST_F(SegmentReaderWriterTest, TestIndex) {
|
||||
while (left > 0) {
|
||||
int rows_read = left > 1024 ? 1024 : left;
|
||||
block.clear();
|
||||
ASSERT_TRUE(iter->next_batch(&block).ok());
|
||||
auto s = iter->next_batch(&block);
|
||||
ASSERT_TRUE(s.ok()) << s.to_string();
|
||||
ASSERT_EQ(rows_read, block.num_rows());
|
||||
ASSERT_EQ(DEL_NOT_SATISFIED, block.delete_state());
|
||||
left -= rows_read;
|
||||
@ -603,6 +604,7 @@ TEST_F(SegmentReaderWriterTest, estimate_segment_size) {
|
||||
|
||||
// segment write
|
||||
std::string dname = "./ut_dir/segment_write_size";
|
||||
FileUtils::remove_all(dname);
|
||||
FileUtils::create_dir(dname);
|
||||
|
||||
SegmentWriterOptions opts;
|
||||
@ -612,10 +614,10 @@ TEST_F(SegmentReaderWriterTest, estimate_segment_size) {
|
||||
std::unique_ptr<fs::WritableBlock> wblock;
|
||||
fs::CreateBlockOptions wblock_opts({fname});
|
||||
Status st = fs::fs_util::block_manager()->create_block(wblock_opts, &wblock);
|
||||
ASSERT_TRUE(st.ok());
|
||||
ASSERT_TRUE(st.ok()) << st.to_string();
|
||||
SegmentWriter writer(wblock.get(), 0, tablet_schema.get(), opts);
|
||||
st = writer.init(10);
|
||||
ASSERT_TRUE(st.ok());
|
||||
ASSERT_TRUE(st.ok()) << st.to_string();
|
||||
|
||||
RowCursor row;
|
||||
auto olap_st = row.init(*tablet_schema);
|
||||
|
||||
Reference in New Issue
Block a user