[fix] return different result in different hbase version
This commit is contained in:
parent
34be91fe62
commit
71cb406baf
@ -1703,6 +1703,9 @@ int ObHTableFilterOperator::init(common::ObIAllocator *allocator)
|
||||
check_existence_only_ = hbase_params->check_existence_only_;
|
||||
row_iterator_->set_allow_partial_results(hbase_params->allow_partial_results_);
|
||||
row_iterator_->set_is_cache_block(hbase_params->is_cache_block_);
|
||||
if (NULL != filter_ && !hbase_params->hbase_version_.empty()) {
|
||||
filter_->set_hbase_version(hbase_params->hbase_version_);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -16,7 +16,8 @@ using namespace oceanbase::common;
|
||||
using namespace oceanbase::table;
|
||||
using namespace oceanbase::table::hfilter;
|
||||
Filter::Filter()
|
||||
:is_reversed_(false)
|
||||
: is_reversed_(false),
|
||||
hbase_major_version_(1)
|
||||
{
|
||||
}
|
||||
|
||||
@ -432,6 +433,18 @@ void FilterListBase::reset()
|
||||
} // end for
|
||||
}
|
||||
|
||||
void FilterListBase::set_hbase_version(const ObString &version)
|
||||
{
|
||||
const int64_t N = filters_.count();
|
||||
for (int64_t i = 0; i < N; ++i) {
|
||||
Filter* filter = nullptr;
|
||||
if (nullptr != (filter = filters_.at(i))) {
|
||||
filter->set_hbase_version(version);
|
||||
}
|
||||
} // end for
|
||||
Filter::set_hbase_version(version);
|
||||
}
|
||||
|
||||
// statement is "$list_filter_name $operator $filter0, $filter1, ..., $filtern"
|
||||
// eg: "FilterListAND AND ValueFilter EQUAL, ValueFilter GREATER"
|
||||
int64_t FilterListBase::get_format_filter_string_length() const
|
||||
@ -589,7 +602,11 @@ int FilterListAND::filter_cell(const ObHTableCell &cell, ReturnCode &ret_code)
|
||||
LOG_WARN("failed to filter cell", K(ret));
|
||||
loop = false;
|
||||
} else {
|
||||
ret_code = merge_return_code(ret_code, local_rc);
|
||||
if (get_hbase_major_version() < 2) {
|
||||
ret_code = local_rc;
|
||||
} else {
|
||||
ret_code = merge_return_code(ret_code, local_rc);
|
||||
}
|
||||
switch (ret_code) {
|
||||
case ReturnCode::INCLUDE_AND_NEXT_COL:
|
||||
case ReturnCode::INCLUDE:
|
||||
|
@ -78,9 +78,12 @@ public:
|
||||
virtual int get_next_cell_hint(common::ObIAllocator &allocator, const ObHTableCell &cell, ObHTableCell *&new_cell) = 0;
|
||||
void set_reversed(bool reversed) { is_reversed_ = reversed; }
|
||||
bool is_reversed() const { return is_reversed_; }
|
||||
virtual void set_hbase_version(const ObString &version) { hbase_major_version_ = version[0] - '0'; }
|
||||
int8_t get_hbase_major_version() const { return hbase_major_version_; }
|
||||
VIRTUAL_TO_STRING_KV("filter", "Filter");
|
||||
protected:
|
||||
bool is_reversed_;
|
||||
int8_t hbase_major_version_;
|
||||
private:
|
||||
DISALLOW_COPY_AND_ASSIGN(Filter);
|
||||
};
|
||||
@ -330,6 +333,7 @@ public:
|
||||
virtual int64_t get_format_filter_string_length() const override;
|
||||
virtual int get_format_filter_string(char *buf, int64_t buf_len, int64_t &pos) const override;
|
||||
virtual const char *filter_name() const { return "FilterListBase"; }
|
||||
virtual void set_hbase_version(const ObString &version) override;
|
||||
|
||||
TO_STRING_KV("filter", "FilterList",
|
||||
"op", operator_to_string(op_),
|
||||
@ -431,6 +435,7 @@ public:
|
||||
virtual int transform_cell(ObIAllocator &allocator, ObHTableCellEntity &cell) override;
|
||||
virtual bool filter_row() override;
|
||||
virtual bool has_filter_row() override { return true; }
|
||||
virtual void set_hbase_version(const ObString &version) override { filter_->set_hbase_version(version); }
|
||||
virtual int64_t get_format_filter_string_length() const override;
|
||||
virtual int get_format_filter_string(char *buf, int64_t buf_len, int64_t &pos) const override;
|
||||
|
||||
|
@ -3285,9 +3285,10 @@ DEF_TO_STRING(ObTableBitMap)
|
||||
OB_SERIALIZE_MEMBER_SIMPLE(ObHBaseParams,
|
||||
caching_,
|
||||
call_timeout_,
|
||||
flag_);
|
||||
flag_,
|
||||
hbase_version_);
|
||||
|
||||
int ObHBaseParams::deep_copy(ObKVParamsBase *hbase_params) const
|
||||
int ObHBaseParams::deep_copy(ObIAllocator &allocator, ObKVParamsBase *hbase_params) const
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (hbase_params == nullptr || hbase_params->get_param_type() != ParamType::HBase) {
|
||||
@ -3300,6 +3301,9 @@ int ObHBaseParams::deep_copy(ObKVParamsBase *hbase_params) const
|
||||
param->is_cache_block_ = is_cache_block_;
|
||||
param->allow_partial_results_ = allow_partial_results_;
|
||||
param->check_existence_only_ = check_existence_only_;
|
||||
if (OB_FAIL(ob_write_string(allocator, hbase_version_, param->hbase_version_))) {
|
||||
LOG_WARN("failed to write string", K(ret), K_(hbase_version));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
@ -3358,7 +3362,7 @@ int ObKVParams::deep_copy(ObIAllocator &allocator, ObKVParams &ob_params) const
|
||||
ob_params.set_allocator(&allocator);
|
||||
if (OB_FAIL(ob_params.alloc_ob_params(ob_params_->get_param_type(), ob_params.ob_params_))) {
|
||||
LOG_WARN("alloc ob params error", K(ob_params_->get_param_type()), K(ret));
|
||||
} else if (OB_FAIL(ob_params_->deep_copy(ob_params.ob_params_))) {
|
||||
} else if (OB_FAIL(ob_params_->deep_copy(allocator, ob_params.ob_params_))) {
|
||||
LOG_WARN("ob_params_ deep_copy error", K(ret));
|
||||
}
|
||||
}
|
||||
|
@ -873,7 +873,7 @@ public:
|
||||
virtual int serialize(char *buf, const int64_t buf_len, int64_t &pos) const = 0;
|
||||
virtual int deserialize(const char *buf, const int64_t data_len, int64_t &pos) = 0;
|
||||
virtual int64_t get_serialize_size() const = 0;
|
||||
virtual int deep_copy(ObKVParamsBase *ob_params) const = 0;
|
||||
virtual int deep_copy(ObIAllocator &allocator, ObKVParamsBase *ob_params) const = 0;
|
||||
virtual int64_t to_string(char* buf, const int64_t buf_len) const = 0;
|
||||
protected:
|
||||
ParamType param_type_;
|
||||
@ -882,12 +882,14 @@ protected:
|
||||
class ObHBaseParams : public ObKVParamsBase
|
||||
{
|
||||
public:
|
||||
const ObString DEFAULT_HBASE_VERSION = ObString("1.3.6");
|
||||
ObHBaseParams()
|
||||
: caching_(0),
|
||||
call_timeout_(0),
|
||||
allow_partial_results_(false),
|
||||
is_cache_block_(true),
|
||||
check_existence_only_(false)
|
||||
check_existence_only_(false),
|
||||
hbase_version_(DEFAULT_HBASE_VERSION)
|
||||
{
|
||||
param_type_ = ParamType::HBase;
|
||||
}
|
||||
@ -899,14 +901,15 @@ public:
|
||||
OB_INLINE void set_allow_partial_results(const bool allow_partial_results) { allow_partial_results_ = allow_partial_results; }
|
||||
OB_INLINE void set_is_cache_block(const bool is_cache_block) { is_cache_block_ = is_cache_block; }
|
||||
OB_INLINE void set_check_existence_only(const bool check_existence_only) {check_existence_only_ = check_existence_only; }
|
||||
int deep_copy(ObKVParamsBase *ob_params) const;
|
||||
int deep_copy(ObIAllocator &allocator, ObKVParamsBase *ob_params) const override;
|
||||
NEED_SERIALIZE_AND_DESERIALIZE;
|
||||
TO_STRING_KV( K_(param_type),
|
||||
K_(caching),
|
||||
K_(call_timeout),
|
||||
K_(allow_partial_results),
|
||||
K_(is_cache_block),
|
||||
K_(check_existence_only));
|
||||
K_(check_existence_only),
|
||||
K_(hbase_version));
|
||||
public:
|
||||
int32_t caching_;
|
||||
int32_t call_timeout_;
|
||||
@ -919,6 +922,7 @@ public:
|
||||
bool check_existence_only_ : 1;
|
||||
};
|
||||
};
|
||||
ObString hbase_version_;
|
||||
};
|
||||
|
||||
class ObKVParams
|
||||
|
Loading…
x
Reference in New Issue
Block a user