Files
doris/be/src/olap/reader.cpp
thinker 2a2f12ca51 [refactor & fix](exce & olap) refactor reader: rename Reader to TabletReader (#7544)
1. Consider the responsibility of Reader,  Rename Reader to TabletReader, I think the new name TabletReader can represent its function exactly,  it is more suitable and meaningful
2. add virtual keyword for the destructor of OlapScanner, because VOlapScanner is derived from it
3. refactor struct ReaderParams and KeysParam as TabletReader's inner struct,guard by TabletReader name scope, it's also more reasonable
4. reduce OlapScanner's member data amount, just use _parent->member_data is simpler
5. bugfix: TupleReader has the same memeber data _collect_iter to its parent class Reader, this usage is dangerous, the writer may make some mistake, so i delete TupleReader::_collect_iter to fix it.
6. call set_tablet_reader() in OlapScanner::prepare() to setup _tablet_reader, VOlapScanner should override set_tablet_reader to new BlockReader instead,  use this way to avoid new Reader twice by reset unique_ptr _tablet_reader
7. if the member data is a inseparable part of a class, i suggest using normal variable while not pointer variable, because pointer bring a indirect lay and must handle coping and destructing carefully, it's not necessary
8. some other small changes for readability or design
2022-01-06 00:00:32 +08:00

835 lines
38 KiB
C++

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "olap/reader.h"
#include <parallel_hashmap/phmap.h>
#include <boost/algorithm/string/case_conv.hpp>
#include <charconv>
#include <unordered_set>
#include "olap/bloom_filter_predicate.h"
#include "olap/collect_iterator.h"
#include "olap/comparison_predicate.h"
#include "olap/in_list_predicate.h"
#include "olap/null_predicate.h"
#include "olap/row.h"
#include "olap/row_block.h"
#include "olap/row_cursor.h"
#include "olap/rowset/beta_rowset_reader.h"
#include "olap/rowset/column_data.h"
#include "olap/schema.h"
#include "olap/storage_engine.h"
#include "olap/tablet.h"
#include "runtime/mem_pool.h"
#include "runtime/mem_tracker.h"
#include "runtime/string_value.hpp"
#include "util/date_func.h"
#include "util/mem_util.hpp"
using std::nothrow;
using std::set;
using std::vector;
namespace doris {
void TabletReader::ReaderParams::check_validation() const {
if (UNLIKELY(version.first == -1)) {
LOG(FATAL) << "version is not set. tablet=" << tablet->full_name();
}
}
std::string TabletReader::ReaderParams::to_string() const {
std::stringstream ss;
ss << "tablet=" << tablet->full_name() << " reader_type=" << reader_type
<< " aggregation=" << aggregation << " version=" << version << " start_key_include=" << start_key_include
<< " end_key_include=" << end_key_include;
for (const auto& key : start_key) {
ss << " keys=" << key;
}
for (const auto& key : end_key) {
ss << " end_keys=" << key;
}
for (auto& condition : conditions) {
ss << " conditions=" << apache::thrift::ThriftDebugString(condition);
}
return ss.str();
}
std::string TabletReader::KeysParam::to_string() const {
std::stringstream ss;
ss << "start_key_include=" << start_key_include << " end_key_include=" << end_key_include;
for (auto& start_key : start_keys) {
ss << " keys=" << start_key.to_string();
}
for (auto& end_key : end_keys) {
ss << " end_keys=" << end_key.to_string();
}
return ss.str();
}
TabletReader::~TabletReader() {
VLOG_NOTICE << "merged rows:" << _merged_rows;
_conditions.finalize();
if (!_all_conditions.empty()) {
_all_conditions.finalize();
}
_delete_handler.finalize();
for (auto pred : _col_predicates) {
delete pred;
}
for (auto pred : _value_col_predicates) {
delete pred;
}
}
OLAPStatus TabletReader::init(const ReaderParams& read_params) {
// TODO(yingchun): monitor
_tracker.reset(new MemTracker(-1, read_params.tablet->full_name()));
_predicate_mem_pool.reset(new MemPool(_tracker.get()));
OLAPStatus res = _init_params(read_params);
if (res != OLAP_SUCCESS) {
LOG(WARNING) << "fail to init reader when init params. res:" << res
<< ", tablet_id:" << read_params.tablet->tablet_id()
<< ", schema_hash:" << read_params.tablet->schema_hash()
<< ", reader type:" << read_params.reader_type
<< ", version:" << read_params.version;
}
return res;
}
// When only one rowset has data, and this rowset is nonoverlapping, we can read directly without aggregation
bool TabletReader::_optimize_for_single_rowset(const std::vector<RowsetReaderSharedPtr>& rs_readers) {
bool has_delete_rowset = false;
bool has_overlapping = false;
int nonoverlapping_count = 0;
for (const auto& rs_reader : rs_readers) {
if (rs_reader->rowset()->rowset_meta()->delete_flag()) {
has_delete_rowset = true;
break;
}
if (rs_reader->rowset()->rowset_meta()->num_rows() > 0) {
if (rs_reader->rowset()->rowset_meta()->is_segments_overlapping()) {
// when there are overlapping segments, can not do directly read
has_overlapping = true;
break;
} else if (++nonoverlapping_count > 1) {
break;
}
}
}
return !has_overlapping && nonoverlapping_count == 1 && !has_delete_rowset;
}
OLAPStatus TabletReader::_capture_rs_readers(const ReaderParams& read_params,
std::vector<RowsetReaderSharedPtr>* valid_rs_readers) {
const std::vector<RowsetReaderSharedPtr>* rs_readers = &read_params.rs_readers;
if (rs_readers->empty()) {
LOG(WARNING) << "fail to acquire data sources. tablet=" << _tablet->full_name();
return OLAP_ERR_VERSION_NOT_EXIST;
}
bool eof = false;
bool is_lower_key_included = _keys_param.start_key_include;
bool is_upper_key_included = _keys_param.end_key_include;
for (int i = 0; i < _keys_param.start_keys.size(); ++i) {
// lower bound
RowCursor& start_key = _keys_param.start_keys[i];
RowCursor& end_key = _keys_param.end_keys[i];
if (!is_lower_key_included) {
if (compare_row_key(start_key, end_key) >= 0) {
VLOG_NOTICE << "return EOF when lower key not include"
<< ", start_key=" << start_key.to_string()
<< ", end_key=" << end_key.to_string();
eof = true;
break;
}
} else {
if (compare_row_key(start_key, end_key) > 0) {
VLOG_NOTICE << "return EOF when lower key include="
<< ", start_key=" << start_key.to_string()
<< ", end_key=" << end_key.to_string();
eof = true;
break;
}
}
_is_lower_keys_included.push_back(is_lower_key_included);
_is_upper_keys_included.push_back(is_upper_key_included);
}
if (eof) {
return OLAP_SUCCESS;
}
bool need_ordered_result = true;
if (read_params.reader_type == READER_QUERY) {
if (_tablet->tablet_schema().keys_type() == DUP_KEYS) {
// duplicated keys are allowed, no need to merge sort keys in rowset
need_ordered_result = false;
}
if (_aggregation) {
// compute engine will aggregate rows with the same key,
// it's ok for rowset to return unordered result
need_ordered_result = false;
}
}
_reader_context.reader_type = read_params.reader_type;
_reader_context.tablet_schema = &_tablet->tablet_schema();
_reader_context.need_ordered_result = need_ordered_result;
_reader_context.return_columns = &_return_columns;
_reader_context.seek_columns = &_seek_columns;
_reader_context.load_bf_columns = &_load_bf_columns;
_reader_context.load_bf_all_columns = &_load_bf_all_columns;
_reader_context.conditions = &_conditions;
_reader_context.all_conditions = &_all_conditions;
_reader_context.predicates = &_col_predicates;
_reader_context.value_predicates = &_value_col_predicates;
_reader_context.lower_bound_keys = &_keys_param.start_keys;
_reader_context.is_lower_keys_included = &_is_lower_keys_included;
_reader_context.upper_bound_keys = &_keys_param.end_keys;
_reader_context.is_upper_keys_included = &_is_upper_keys_included;
_reader_context.delete_handler = &_delete_handler;
_reader_context.stats = &_stats;
_reader_context.runtime_state = read_params.runtime_state;
_reader_context.use_page_cache = read_params.use_page_cache;
_reader_context.sequence_id_idx = _sequence_col_idx;
*valid_rs_readers = *rs_readers;
return OLAP_SUCCESS;
}
OLAPStatus TabletReader::_init_params(const ReaderParams& read_params) {
read_params.check_validation();
_aggregation = read_params.aggregation;
_need_agg_finalize = read_params.need_agg_finalize;
_reader_type = read_params.reader_type;
_tablet = read_params.tablet;
_init_conditions_param(read_params);
_init_load_bf_columns(read_params);
OLAPStatus res = _init_delete_condition(read_params);
if (res != OLAP_SUCCESS) {
OLAP_LOG_WARNING("fail to init delete param. [res=%d]", res);
return res;
}
res = _init_return_columns(read_params);
if (res != OLAP_SUCCESS) {
OLAP_LOG_WARNING("fail to init return columns. [res=%d]", res);
return res;
}
res = _init_keys_param(read_params);
if (res != OLAP_SUCCESS) {
LOG(WARNING) << "fail to init keys param. res=" << res;
return res;
}
_init_seek_columns();
_collect_iter.init(this);
if (_tablet->tablet_schema().has_sequence_col()) {
auto sequence_col_idx = _tablet->tablet_schema().sequence_col_idx();
DCHECK_NE(sequence_col_idx, -1);
for (auto col : _return_columns) {
// query has sequence col
if (col == sequence_col_idx) {
_sequence_col_idx = sequence_col_idx;
break;
}
}
}
return res;
}
OLAPStatus TabletReader::_init_return_columns(const ReaderParams& read_params) {
if (read_params.reader_type == READER_QUERY) {
_return_columns = read_params.return_columns;
if (!_delete_handler.empty()) {
// We need to fetch columns which there are deletion conditions on them.
set<uint32_t> column_set(_return_columns.begin(), _return_columns.end());
for (const auto& conds : _delete_handler.get_delete_conditions()) {
for (const auto& cond_column : conds.del_cond->columns()) {
if (column_set.find(cond_column.first) == column_set.end()) {
column_set.insert(cond_column.first);
_return_columns.push_back(cond_column.first);
}
}
}
}
for (auto id : read_params.return_columns) {
if (_tablet->tablet_schema().column(id).is_key()) {
_key_cids.push_back(id);
} else {
_value_cids.push_back(id);
}
}
} else if (read_params.return_columns.empty()) {
for (size_t i = 0; i < _tablet->tablet_schema().num_columns(); ++i) {
_return_columns.push_back(i);
if (_tablet->tablet_schema().column(i).is_key()) {
_key_cids.push_back(i);
} else {
_value_cids.push_back(i);
}
}
VLOG_NOTICE << "return column is empty, using full column as default.";
} else if (read_params.reader_type == READER_CHECKSUM) {
_return_columns = read_params.return_columns;
for (auto id : read_params.return_columns) {
if (_tablet->tablet_schema().column(id).is_key()) {
_key_cids.push_back(id);
} else {
_value_cids.push_back(id);
}
}
} else {
OLAP_LOG_WARNING("fail to init return columns. [reader_type=%d return_columns_size=%u]",
read_params.reader_type, read_params.return_columns.size());
return OLAP_ERR_INPUT_PARAMETER_ERROR;
}
std::sort(_key_cids.begin(), _key_cids.end(), std::greater<uint32_t>());
return OLAP_SUCCESS;
}
void TabletReader::_init_seek_columns() {
std::unordered_set<uint32_t> column_set(_return_columns.begin(), _return_columns.end());
for (auto& it : _conditions.columns()) {
column_set.insert(it.first);
}
size_t max_key_column_count = 0;
for (const auto& key : _keys_param.start_keys) {
max_key_column_count = std::max(max_key_column_count, key.field_count());
}
for (const auto& key : _keys_param.end_keys) {
max_key_column_count = std::max(max_key_column_count, key.field_count());
}
for (size_t i = 0; i < _tablet->tablet_schema().num_columns(); i++) {
if (i < max_key_column_count || column_set.find(i) != column_set.end()) {
_seek_columns.push_back(i);
}
}
}
OLAPStatus TabletReader::_init_keys_param(const ReaderParams& read_params) {
if (read_params.start_key.empty()) {
return OLAP_SUCCESS;
}
_keys_param.start_key_include = read_params.start_key_include;
_keys_param.end_key_include = read_params.end_key_include;
size_t start_key_size = read_params.start_key.size();
//_keys_param.start_keys.resize(start_key_size);
std::vector<RowCursor>(start_key_size).swap(_keys_param.start_keys);
size_t scan_key_size = read_params.start_key.front().size();
if (scan_key_size > _tablet->tablet_schema().num_columns()) {
LOG(WARNING)
<< "Input param are invalid. Column count is bigger than num_columns of schema. "
<< "column_count=" << scan_key_size
<< ", schema.num_columns=" << _tablet->tablet_schema().num_columns();
return OLAP_ERR_INPUT_PARAMETER_ERROR;
}
std::vector<uint32_t> columns(scan_key_size);
std::iota(columns.begin(), columns.end(), 0);
std::shared_ptr<Schema> schema =
std::make_shared<Schema>(_tablet->tablet_schema().columns(), columns);
for (size_t i = 0; i < start_key_size; ++i) {
if (read_params.start_key[i].size() != scan_key_size) {
OLAP_LOG_WARNING("The start_key.at(%ld).size == %ld, not equals the %ld", i,
read_params.start_key[i].size(), scan_key_size);
return OLAP_ERR_INPUT_PARAMETER_ERROR;
}
OLAPStatus res = _keys_param.start_keys[i].init_scan_key(
_tablet->tablet_schema(), read_params.start_key[i].values(), schema);
if (res != OLAP_SUCCESS) {
OLAP_LOG_WARNING("fail to init row cursor. [res=%d]", res);
return res;
}
res = _keys_param.start_keys[i].from_tuple(read_params.start_key[i]);
if (res != OLAP_SUCCESS) {
OLAP_LOG_WARNING("fail to init row cursor from Keys. [res=%d key_index=%ld]", res, i);
return res;
}
}
size_t end_key_size = read_params.end_key.size();
//_keys_param.end_keys.resize(end_key_size);
std::vector<RowCursor>(end_key_size).swap(_keys_param.end_keys);
for (size_t i = 0; i < end_key_size; ++i) {
if (read_params.end_key[i].size() != scan_key_size) {
OLAP_LOG_WARNING("The end_key.at(%ld).size == %ld, not equals the %ld", i,
read_params.end_key[i].size(), scan_key_size);
return OLAP_ERR_INPUT_PARAMETER_ERROR;
}
OLAPStatus res = _keys_param.end_keys[i].init_scan_key(
_tablet->tablet_schema(), read_params.end_key[i].values(), schema);
if (res != OLAP_SUCCESS) {
OLAP_LOG_WARNING("fail to init row cursor. [res=%d]", res);
return res;
}
res = _keys_param.end_keys[i].from_tuple(read_params.end_key[i]);
if (res != OLAP_SUCCESS) {
OLAP_LOG_WARNING("fail to init row cursor from Keys. [res=%d key_index=%ld]", res, i);
return res;
}
}
//TODO:check the valid of start_key and end_key.(eg. start_key <= end_key)
return OLAP_SUCCESS;
}
void TabletReader::_init_conditions_param(const ReaderParams& read_params) {
_conditions.set_tablet_schema(&_tablet->tablet_schema());
_all_conditions.set_tablet_schema(&_tablet->tablet_schema());
for (const auto& condition : read_params.conditions) {
ColumnPredicate* predicate = _parse_to_predicate(condition);
if (predicate != nullptr) {
if (_tablet->tablet_schema()
.column(_tablet->field_index(condition.column_name))
.aggregation() != FieldAggregationMethod::OLAP_FIELD_AGGREGATION_NONE) {
_value_col_predicates.push_back(predicate);
} else {
_col_predicates.push_back(predicate);
OLAPStatus status = _conditions.append_condition(condition);
DCHECK_EQ(OLAP_SUCCESS, status);
}
OLAPStatus status = _all_conditions.append_condition(condition);
DCHECK_EQ(OLAP_SUCCESS, status);
}
}
// Only key column bloom filter will push down to storage engine
for (const auto& filter : read_params.bloom_filters) {
_col_predicates.emplace_back(_parse_to_predicate(filter));
}
}
#define COMPARISON_PREDICATE_CONDITION_VALUE(NAME, PREDICATE) \
ColumnPredicate* TabletReader::_new_##NAME##_pred(const TabletColumn& column, int index, \
const std::string& cond, bool opposite) const { \
ColumnPredicate* predicate = nullptr; \
switch (column.type()) { \
case OLAP_FIELD_TYPE_TINYINT: { \
int8_t value = 0; \
std::from_chars(cond.data(), cond.data() + cond.size(), value); \
predicate = new PREDICATE<int8_t>(index, value, opposite); \
break; \
} \
case OLAP_FIELD_TYPE_SMALLINT: { \
int16_t value = 0; \
std::from_chars(cond.data(), cond.data() + cond.size(), value); \
predicate = new PREDICATE<int16_t>(index, value, opposite); \
break; \
} \
case OLAP_FIELD_TYPE_INT: { \
int32_t value = 0; \
std::from_chars(cond.data(), cond.data() + cond.size(), value); \
predicate = new PREDICATE<int32_t>(index, value, opposite); \
break; \
} \
case OLAP_FIELD_TYPE_BIGINT: { \
int64_t value = 0; \
std::from_chars(cond.data(), cond.data() + cond.size(), value); \
predicate = new PREDICATE<int64_t>(index, value, opposite); \
break; \
} \
case OLAP_FIELD_TYPE_LARGEINT: { \
int128_t value = 0; \
StringParser::ParseResult result; \
value = StringParser::string_to_int<__int128>(cond.data(), cond.size(), &result); \
predicate = new PREDICATE<int128_t>(index, value, opposite); \
break; \
} \
case OLAP_FIELD_TYPE_DECIMAL: { \
decimal12_t value = {0, 0}; \
value.from_string(cond); \
predicate = new PREDICATE<decimal12_t>(index, value, opposite); \
break; \
} \
case OLAP_FIELD_TYPE_CHAR: { \
StringValue value; \
size_t length = std::max(static_cast<size_t>(column.length()), cond.length()); \
char* buffer = reinterpret_cast<char*>(_predicate_mem_pool->allocate(length)); \
memset(buffer, 0, length); \
memory_copy(buffer, cond.c_str(), cond.length()); \
value.len = length; \
value.ptr = buffer; \
predicate = new PREDICATE<StringValue>(index, value, opposite); \
break; \
} \
case OLAP_FIELD_TYPE_VARCHAR: \
case OLAP_FIELD_TYPE_STRING: { \
StringValue value; \
int32_t length = cond.length(); \
char* buffer = reinterpret_cast<char*>(_predicate_mem_pool->allocate(length)); \
memory_copy(buffer, cond.c_str(), length); \
value.len = length; \
value.ptr = buffer; \
predicate = new PREDICATE<StringValue>(index, value, opposite); \
break; \
} \
case OLAP_FIELD_TYPE_DATE: { \
uint24_t value = timestamp_from_date(cond); \
predicate = new PREDICATE<uint24_t>(index, value, opposite); \
break; \
} \
case OLAP_FIELD_TYPE_DATETIME: { \
uint64_t value = timestamp_from_datetime(cond); \
predicate = new PREDICATE<uint64_t>(index, value, opposite); \
break; \
} \
case OLAP_FIELD_TYPE_BOOL: { \
int32_t ivalue = 0; \
auto result = std::from_chars(cond.data(), cond.data() + cond.size(), ivalue); \
bool value = false; \
if (result.ec == std::errc()) { \
if (ivalue == 0) { \
value = false; \
} else { \
value = true; \
} \
} else { \
StringParser::ParseResult parse_result; \
value = StringParser::string_to_bool(cond.data(), cond.size(), &parse_result); \
} \
predicate = new PREDICATE<bool>(index, value, opposite); \
break; \
} \
default: \
break; \
} \
\
return predicate; \
}
COMPARISON_PREDICATE_CONDITION_VALUE(eq, EqualPredicate)
COMPARISON_PREDICATE_CONDITION_VALUE(ne, NotEqualPredicate)
COMPARISON_PREDICATE_CONDITION_VALUE(lt, LessPredicate)
COMPARISON_PREDICATE_CONDITION_VALUE(le, LessEqualPredicate)
COMPARISON_PREDICATE_CONDITION_VALUE(gt, GreaterPredicate)
COMPARISON_PREDICATE_CONDITION_VALUE(ge, GreaterEqualPredicate)
ColumnPredicate* TabletReader::_parse_to_predicate(
const std::pair<std::string, std::shared_ptr<IBloomFilterFuncBase>>& bloom_filter) {
int32_t index = _tablet->field_index(bloom_filter.first);
if (index < 0) {
return nullptr;
}
const TabletColumn& column = _tablet->tablet_schema().column(index);
return BloomFilterColumnPredicateFactory::create_column_predicate(index, bloom_filter.second,
column.type());
}
ColumnPredicate* TabletReader::_parse_to_predicate(const TCondition& condition, bool opposite) const {
// TODO: not equal and not in predicate is not pushed down
int32_t index = _tablet->field_index(condition.column_name);
if (index < 0) {
return nullptr;
}
const TabletColumn& column = _tablet->tablet_schema().column(index);
ColumnPredicate* predicate = nullptr;
if ((condition.condition_op == "*=" || condition.condition_op == "!*=" ||
condition.condition_op == "=" || condition.condition_op == "!=") &&
condition.condition_values.size() == 1) {
predicate = condition.condition_op == "*=" || condition.condition_op == "="
? _new_eq_pred(column, index, condition.condition_values[0], opposite)
: _new_ne_pred(column, index, condition.condition_values[0], opposite);
} else if (condition.condition_op == "<<") {
predicate = _new_lt_pred(column, index, condition.condition_values[0], opposite);
} else if (condition.condition_op == "<=") {
predicate = _new_le_pred(column, index, condition.condition_values[0], opposite);
} else if (condition.condition_op == ">>") {
predicate = _new_gt_pred(column, index, condition.condition_values[0], opposite);
} else if (condition.condition_op == ">=") {
predicate = _new_ge_pred(column, index, condition.condition_values[0], opposite);
} else if ((condition.condition_op == "*=" || condition.condition_op == "!*=") &&
condition.condition_values.size() > 1) {
switch (column.type()) {
case OLAP_FIELD_TYPE_TINYINT: {
phmap::flat_hash_set<int8_t> values;
int8_t value = 0;
for (auto& cond_val : condition.condition_values) {
std::from_chars(cond_val.data(), cond_val.data() + cond_val.size(), value);
values.insert(value);
}
if (condition.condition_op == "*=") {
predicate = new InListPredicate<int8_t>(index, std::move(values), opposite);
} else {
predicate = new NotInListPredicate<int8_t>(index, std::move(values), opposite);
}
break;
}
case OLAP_FIELD_TYPE_SMALLINT: {
phmap::flat_hash_set<int16_t> values;
int16_t value = 0;
for (auto& cond_val : condition.condition_values) {
std::from_chars(cond_val.data(), cond_val.data() + cond_val.size(), value);
values.insert(value);
}
if (condition.condition_op == "*=") {
predicate = new InListPredicate<int16_t>(index, std::move(values), opposite);
} else {
predicate = new NotInListPredicate<int16_t>(index, std::move(values), opposite);
}
break;
}
case OLAP_FIELD_TYPE_INT: {
phmap::flat_hash_set<int32_t> values;
int32_t value = 0;
for (auto& cond_val : condition.condition_values) {
std::from_chars(cond_val.data(), cond_val.data() + cond_val.size(), value);
values.insert(value);
}
if (condition.condition_op == "*=") {
predicate = new InListPredicate<int32_t>(index, std::move(values), opposite);
} else {
predicate = new NotInListPredicate<int32_t>(index, std::move(values), opposite);
}
break;
}
case OLAP_FIELD_TYPE_BIGINT: {
phmap::flat_hash_set<int64_t> values;
int64_t value = 0;
for (auto& cond_val : condition.condition_values) {
std::from_chars(cond_val.data(), cond_val.data() + cond_val.size(), value);
values.insert(value);
}
if (condition.condition_op == "*=") {
predicate = new InListPredicate<int64_t>(index, std::move(values), opposite);
} else {
predicate = new NotInListPredicate<int64_t>(index, std::move(values), opposite);
}
break;
}
case OLAP_FIELD_TYPE_LARGEINT: {
phmap::flat_hash_set<int128_t> values;
int128_t value = 0;
StringParser::ParseResult result;
for (auto& cond_val : condition.condition_values) {
value = StringParser::string_to_int<__int128>(cond_val.c_str(), cond_val.size(),
&result);
values.insert(value);
}
if (condition.condition_op == "*=") {
predicate = new InListPredicate<int128_t>(index, std::move(values), opposite);
} else {
predicate = new NotInListPredicate<int128_t>(index, std::move(values), opposite);
}
break;
}
case OLAP_FIELD_TYPE_DECIMAL: {
phmap::flat_hash_set<decimal12_t> values;
for (auto& cond_val : condition.condition_values) {
decimal12_t value = {0, 0};
value.from_string(cond_val);
values.insert(value);
}
if (condition.condition_op == "*=") {
predicate = new InListPredicate<decimal12_t>(index, std::move(values), opposite);
} else {
predicate = new NotInListPredicate<decimal12_t>(index, std::move(values), opposite);
}
break;
}
case OLAP_FIELD_TYPE_CHAR: {
phmap::flat_hash_set<StringValue> values;
for (auto& cond_val : condition.condition_values) {
StringValue value;
size_t length = std::max(static_cast<size_t>(column.length()), cond_val.length());
char* buffer = reinterpret_cast<char*>(_predicate_mem_pool->allocate(length));
memset(buffer, 0, length);
memory_copy(buffer, cond_val.c_str(), cond_val.length());
value.len = length;
value.ptr = buffer;
values.insert(value);
}
if (condition.condition_op == "*=") {
predicate = new InListPredicate<StringValue>(index, std::move(values), opposite);
} else {
predicate = new NotInListPredicate<StringValue>(index, std::move(values), opposite);
}
break;
}
case OLAP_FIELD_TYPE_VARCHAR:
case OLAP_FIELD_TYPE_STRING:{
phmap::flat_hash_set<StringValue> values;
for (auto& cond_val : condition.condition_values) {
StringValue value;
int32_t length = cond_val.length();
char* buffer = reinterpret_cast<char*>(_predicate_mem_pool->allocate(length));
memory_copy(buffer, cond_val.c_str(), length);
value.len = length;
value.ptr = buffer;
values.insert(value);
}
if (condition.condition_op == "*=") {
predicate = new InListPredicate<StringValue>(index, std::move(values), opposite);
} else {
predicate = new NotInListPredicate<StringValue>(index, std::move(values), opposite);
}
break;
}
case OLAP_FIELD_TYPE_DATE: {
phmap::flat_hash_set<uint24_t> values;
for (auto& cond_val : condition.condition_values) {
uint24_t value = timestamp_from_date(cond_val);
values.insert(value);
}
if (condition.condition_op == "*=") {
predicate = new InListPredicate<uint24_t>(index, std::move(values), opposite);
} else {
predicate = new NotInListPredicate<uint24_t>(index, std::move(values), opposite);
}
break;
}
case OLAP_FIELD_TYPE_DATETIME: {
phmap::flat_hash_set<uint64_t> values;
for (auto& cond_val : condition.condition_values) {
uint64_t value = timestamp_from_datetime(cond_val);
values.insert(value);
}
if (condition.condition_op == "*=") {
predicate = new InListPredicate<uint64_t>(index, std::move(values), opposite);
} else {
predicate = new NotInListPredicate<uint64_t>(index, std::move(values), opposite);
}
break;
}
// OLAP_FIELD_TYPE_BOOL is not valid in this case.
default:
break;
}
} else if (boost::to_lower_copy(condition.condition_op) == "is") {
predicate = new NullPredicate(
index, boost::to_lower_copy(condition.condition_values[0]) == "null", opposite);
}
return predicate;
}
void TabletReader::_init_load_bf_columns(const ReaderParams& read_params) {
_init_load_bf_columns(read_params, &_conditions, &_load_bf_columns);
_init_load_bf_columns(read_params, &_all_conditions, &_load_bf_all_columns);
}
void TabletReader::_init_load_bf_columns(const ReaderParams& read_params, Conditions* conditions,
std::set<uint32_t>* load_bf_columns) {
// add all columns with condition to load_bf_columns
for (const auto& cond_column : conditions->columns()) {
if (!_tablet->tablet_schema().column(cond_column.first).is_bf_column()) {
continue;
}
for (const auto& cond : cond_column.second->conds()) {
if (cond->op == OP_EQ ||
(cond->op == OP_IN && cond->operand_set.size() < MAX_OP_IN_FIELD_NUM)) {
load_bf_columns->insert(cond_column.first);
}
}
}
// remove columns which have same value between start_key and end_key
int min_scan_key_len = _tablet->tablet_schema().num_columns();
for (const auto& start_key : read_params.start_key) {
min_scan_key_len = std::min(min_scan_key_len, static_cast<int>(start_key.size()));
}
for (const auto& end_key : read_params.end_key) {
min_scan_key_len = std::min(min_scan_key_len, static_cast<int>(end_key.size()));
}
int max_equal_index = -1;
for (int i = 0; i < read_params.start_key.size(); ++i) {
int j = 0;
for (; j < min_scan_key_len; ++j) {
if (read_params.start_key[i].get_value(j) != read_params.end_key[i].get_value(j)) {
break;
}
}
if (max_equal_index < j - 1) {
max_equal_index = j - 1;
}
}
for (int i = 0; i < max_equal_index; ++i) {
load_bf_columns->erase(i);
}
// remove the max_equal_index column when it's not varchar
// or longer than number of short key fields
if (max_equal_index == -1) {
return;
}
FieldType type = _tablet->tablet_schema().column(max_equal_index).type();
if ((type != OLAP_FIELD_TYPE_VARCHAR && type != OLAP_FIELD_TYPE_STRING)|| max_equal_index + 1 > _tablet->num_short_key_columns()) {
load_bf_columns->erase(max_equal_index);
}
}
OLAPStatus TabletReader::_init_delete_condition(const ReaderParams& read_params) {
if (read_params.reader_type == READER_CUMULATIVE_COMPACTION) {
return OLAP_SUCCESS;
}
_tablet->obtain_header_rdlock();
OLAPStatus ret = _delete_handler.init(_tablet->tablet_schema(), _tablet->delete_predicates(),
read_params.version.second, this);
_tablet->release_header_lock();
// Only BASE_COMPACTION need set filter_delete = true
// other reader type:
// QUERY will filter the row in query layer to keep right result use where clause.
// CUMULATIVE_COMPACTION will lost the filter_delete info of base rowset
if (read_params.reader_type == READER_BASE_COMPACTION) {
_filter_delete = true;
}
return ret;
}
} // namespace doris