[FEAT MERGE] Hbase compatibility phase 2

Co-authored-by: 884244693 <884244693@qq.com>
Co-authored-by: stuBirFly <1065492934@qq.com>
Co-authored-by: JackShi148 <jackshi896@gmail.com>
This commit is contained in:
shenyunlong.syl 2024-11-23 04:44:53 +00:00 committed by ob-robot
parent 7887fcce92
commit 35cabca404
31 changed files with 4841 additions and 1359 deletions

View File

@ -767,6 +767,33 @@ int ob_write_string(AllocatorT &allocator, const ObString &src, ObString &dst, b
return ret;
}
template <typename AllocatorT>
int ob_write_string(AllocatorT &allocator, const ObString &src, ObString &dst, int len, int padding = 0, bool c_style = false)
{
int ret = OB_SUCCESS;
const ObString::obstr_size_t src_len = src.length();
char *ptr = NULL;
if (NULL == (ptr = static_cast<char *>(allocator.alloc(len + (c_style ? 1 : 0))))) {
dst.assign(NULL, 0);
ret = OB_ALLOCATE_MEMORY_FAILED;
LIB_LOG(ERROR, "allocate memory failed", K(ret), "size", len);
} else if (OB_NOT_NULL(src.ptr()) && 0 < src_len) {
int copy_len = src_len > len ? len : src_len;
MEMCPY(ptr, src.ptr(), copy_len);
if (len > copy_len) {
MEMSET(ptr + copy_len, padding, len - copy_len);
}
if (c_style) {
ptr[len] = 0;
}
dst.assign_ptr(ptr, len);
} else {
MEMSET(ptr, padding, len);
dst.assign_ptr(ptr, len);
}
return ret;
}
template <typename AllocatorT>
int ob_strip_space(AllocatorT &allocator, const ObString &src, ObString &dst)
{

View File

@ -5540,7 +5540,7 @@ TEST_F(TestBatchExecute, htable_scan_with_filter)
htable_filter.set_valid(true);
{
fprintf(stderr, "case: = binary comparator\n");
htable_filter.set_filter(ObString::make_string("SingleColumnValueFilter('cf1', 'cq1', =, 'binary:xy_string2', true, false)"));
htable_filter.set_filter(ObString::make_string("SingleColumnValueFilter('cf1_filter', 'cq1', =, 'binary:xy_string2', true, false)"));
ObTableEntityIterator *iter = nullptr;
ASSERT_EQ(OB_SUCCESS, the_table->execute_query(query, iter));
@ -5582,7 +5582,7 @@ TEST_F(TestBatchExecute, htable_scan_with_filter)
fprintf(stderr, "case: > binary comparator\n");
// check
{
htable_filter.set_filter(ObString::make_string("SingleColumnValueFilter('cf1', 'cq1', >, 'binary:w', true, false)"));
htable_filter.set_filter(ObString::make_string("SingleColumnValueFilter('cf1_filter', 'cq1', >, 'binary:w', true, false)"));
ObTableEntityIterator *iter = nullptr;
ASSERT_EQ(OB_SUCCESS, the_table->execute_query(query, iter));
@ -5627,7 +5627,7 @@ TEST_F(TestBatchExecute, htable_scan_with_filter)
fprintf(stderr, "case: < binary comparator\n");
// check
{
htable_filter.set_filter(ObString::make_string("SingleColumnValueFilter('cf1', 'cq1', <, 'binary:w', true, false)"));
htable_filter.set_filter(ObString::make_string("SingleColumnValueFilter('cf1_filter', 'cq1', <, 'binary:w', true, false)"));
ObTableEntityIterator *iter = nullptr;
ASSERT_EQ(OB_SUCCESS, the_table->execute_query(query, iter));
@ -5672,7 +5672,7 @@ TEST_F(TestBatchExecute, htable_scan_with_filter)
fprintf(stderr, "case: >= binary comparator\n");
// check
{
htable_filter.set_filter(ObString::make_string("SingleColumnValueFilter('cf1', 'cq1', >=, 'binary:xx_string1', true, false)"));
htable_filter.set_filter(ObString::make_string("SingleColumnValueFilter('cf1_filter', 'cq1', >=, 'binary:xx_string1', true, false)"));
ObTableEntityIterator *iter = nullptr;
ASSERT_EQ(OB_SUCCESS, the_table->execute_query(query, iter));
@ -5717,7 +5717,7 @@ TEST_F(TestBatchExecute, htable_scan_with_filter)
fprintf(stderr, "case: <= binary comparator\n");
// check
{
htable_filter.set_filter(ObString::make_string("SingleColumnValueFilter('cf1', 'cq1', <=, 'binary:ab_string2', true, false)"));
htable_filter.set_filter(ObString::make_string("SingleColumnValueFilter('cf1_filter', 'cq1', <=, 'binary:ab_string2', true, false)"));
ObTableEntityIterator *iter = nullptr;
ASSERT_EQ(OB_SUCCESS, the_table->execute_query(query, iter));
@ -5762,7 +5762,7 @@ TEST_F(TestBatchExecute, htable_scan_with_filter)
fprintf(stderr, "case: = prefix comparator\n");
// check
{
htable_filter.set_filter(ObString::make_string("SingleColumnValueFilter('cf1', 'cq1', =, 'binaryprefix:a', true, false)"));
htable_filter.set_filter(ObString::make_string("SingleColumnValueFilter('cf1_filter', 'cq1', =, 'binaryprefix:a', true, false)"));
ObTableEntityIterator *iter = nullptr;
ASSERT_EQ(OB_SUCCESS, the_table->execute_query(query, iter));
@ -5807,7 +5807,7 @@ TEST_F(TestBatchExecute, htable_scan_with_filter)
fprintf(stderr, "case: != prefix comparator\n");
// check
{
htable_filter.set_filter(ObString::make_string("SingleColumnValueFilter('cf1', 'cq1', !=, 'binaryprefix:x', true, false)"));
htable_filter.set_filter(ObString::make_string("SingleColumnValueFilter('cf1_filter', 'cq1', !=, 'binaryprefix:x', true, false)"));
ObTableEntityIterator *iter = nullptr;
ASSERT_EQ(OB_SUCCESS, the_table->execute_query(query, iter));
@ -5852,7 +5852,7 @@ TEST_F(TestBatchExecute, htable_scan_with_filter)
fprintf(stderr, "case: = substring comparator\n");
// check
{
htable_filter.set_filter(ObString::make_string("SingleColumnValueFilter('cf1', 'cq1', =, 'substring:string1', true, false)"));
htable_filter.set_filter(ObString::make_string("SingleColumnValueFilter('cf1_filter', 'cq1', =, 'substring:string1', true, false)"));
ObTableEntityIterator *iter = nullptr;
ASSERT_EQ(OB_SUCCESS, the_table->execute_query(query, iter));
@ -5896,7 +5896,7 @@ TEST_F(TestBatchExecute, htable_scan_with_filter)
fprintf(stderr, "case: != substring comparator\n");
// check
{
htable_filter.set_filter(ObString::make_string("SingleColumnValueFilter('cf1', 'cq1', !=, 'substring:string1', true, false)"));
htable_filter.set_filter(ObString::make_string("SingleColumnValueFilter('cf1_filter', 'cq1', !=, 'substring:string1', true, false)"));
ObTableEntityIterator *iter = nullptr;
ASSERT_EQ(OB_SUCCESS, the_table->execute_query(query, iter));
@ -6065,7 +6065,7 @@ TEST_F(TestBatchExecute, htable_scan_with_filter)
fprintf(stderr, "case: WhileMatchFilter\n");
// check
{
htable_filter.set_filter(ObString::make_string("While SingleColumnValueFilter('cf1', 'cq1', !=, 'substring:string1', true, false)"));
htable_filter.set_filter(ObString::make_string("While SingleColumnValueFilter('cf1_filter', 'cq1', !=, 'substring:string1', true, false)"));
ObTableEntityIterator *iter = nullptr;
ASSERT_EQ(OB_SUCCESS, the_table->execute_query(query, iter));

File diff suppressed because it is too large Load Diff

View File

@ -1,4 +1,10 @@
/**
#ifndef ob_hfilter_HEADER_H
#define ob_hfilter_HEADER_H 1
#define ob_hfilter_IN_HEADER 1
#line 6 "../../../src/observer/table/htable_filter_lex.hxx"
#line 7 "../../../src/observer/table/htable_filter_lex.lxx"
/**
* Copyright (c) 2023 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
@ -9,13 +15,6 @@
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#ifndef ob_hfilter_HEADER_H
#define ob_hfilter_HEADER_H 1
#define ob_hfilter_IN_HEADER 1
#line 6 "../../../src/observer/table/htable_filter_lex.hxx"
#line 7 "../../../src/observer/table/htable_filter_lex.lxx"
#define USING_LOG_PREFIX SERVER
#include "observer/table/ob_htable_filter_parser.h"
#include "observer/table/ob_htable_filters.h"
@ -26,7 +25,7 @@ using namespace oceanbase::common;
#line 18 "../../../src/observer/table/htable_filter_lex.hxx"
#line 29 "../../../src/observer/table/htable_filter_lex.hxx"
#define YY_INT_ALIGNED short int
@ -358,9 +357,9 @@ extern int ob_hfilter_lex \
#undef YY_DECL
#endif
#line 117 "../../../src/observer/table/htable_filter_lex.lxx"
#line 138 "../../../src/observer/table/htable_filter_lex.lxx"
#line 353 "../../../src/observer/table/htable_filter_lex.hxx"
#line 364 "../../../src/observer/table/htable_filter_lex.hxx"
#undef ob_hfilter_IN_HEADER
#endif /* ob_hfilter_HEADER_H */

View File

@ -4,6 +4,17 @@
%option extra-type="oceanbase::table::ObHTableFilterParser *"
%option prefix="ob_hfilter_"
%top{
/**
* Copyright (c) 2023 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#define USING_LOG_PREFIX SERVER
#include "observer/table/ob_htable_filter_parser.h"
#include "observer/table/ob_htable_filters.h"
@ -88,14 +99,24 @@ RowFilter { return RowFilter; }
ValueFilter { return ValueFilter; }
QualifierFilter { return QualifierFilter; }
SingleColumnValueFilter { return SingleColumnValueFilter; }
SingleColumnValueExcludeFilter { return SingleColumnValueExcludeFilter; }
DependentColumnFilter { return DependentColumnFilter; }
PageFilter { return PageFilter; }
RandomRowFilter { return RandomRowFilter; }
ColumnPaginationFilter { return ColumnPaginationFilter; }
ColumnPrefixFilter { return ColumnPrefixFilter; }
FirstKeyOnlyFilter { return FirstKeyOnlyFilter; }
KeyOnlyFilter { return KeyOnlyFilter; }
FuzzyRowFilter { return FuzzyRowFilter; }
TimestampsFilter { return TimestampsFilter; }
ColumnValueFilter { return ColumnValueFilter; }
MultiRowRangeFilter { return MultiRowRangeFilter; }
InclusiveStopFilter { return InclusiveStopFilter; }
ColumnRangeFilter { return ColumnRangeFilter; }
MultipleColumnPrefixFilter { return MultipleColumnPrefixFilter; }
FamilyFilter { return FamilyFilter; }
ColumnCountGetFilter { return ColumnCountGetFilter; }
FirstKeyValueMatchingQualifiersFilter { return FirstKeyValueMatchingQualifiersFilter; }
CheckAndMutateFilter { return CheckAndMutateFilter; }
PrefixFilter { return PrefixFilter; }
TableCompareFilter { return TableCompareFilter; }

File diff suppressed because it is too large Load Diff

View File

@ -1,14 +1,3 @@
/**
* Copyright (c) 2023 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
/* A Bison parser, made by GNU Bison 2.4.1. */
@ -43,6 +32,27 @@
This special exception was added by the Free Software Foundation in
version 2.2 of Bison. */
/* "%code requires" blocks. */
/* Line 1676 of yacc.c */
#line 1 "../../../src/observer/table/htable_filter_tab.yxx"
/**
* Copyright (c) 2023 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
/* Line 1676 of yacc.c */
#line 56 "../../../src/observer/table/htable_filter_tab.hxx"
/* Tokens. */
#ifndef YYTOKENTYPE
@ -59,32 +69,42 @@
ColumnCountGetFilter = 263,
ColumnPrefixFilter = 264,
ColumnPaginationFilter = 265,
CheckAndMutateFilter = 266,
PrefixFilter = 267,
FirstKeyOnlyFilter = 268,
KeyOnlyFilter = 269,
TimestampsFilter = 270,
TableCompareFilter = 271,
RandomRowFilter = 272,
LESS = 273,
LESS_OR_EQUAL = 274,
EQUAL = 275,
NOT_EQUAL = 276,
GREATER = 277,
GREATER_OR_EQUAL = 278,
NO_OP = 279,
IS = 280,
IS_NOT = 281,
BOOL_VALUE = 282,
STRING_VALUE = 283,
INT_VALUE = 284,
OR = 285,
AND = 286,
T_OR = 287,
T_AND = 288,
WHILE = 289,
SKIP = 290,
ERROR = 291
FirstKeyValueMatchingQualifiersFilter = 266,
CheckAndMutateFilter = 267,
PrefixFilter = 268,
FirstKeyOnlyFilter = 269,
KeyOnlyFilter = 270,
TimestampsFilter = 271,
MultiRowRangeFilter = 272,
InclusiveStopFilter = 273,
FuzzyRowFilter = 274,
ColumnValueFilter = 275,
TableCompareFilter = 276,
RandomRowFilter = 277,
DependentColumnFilter = 278,
SingleColumnValueExcludeFilter = 279,
ColumnRangeFilter = 280,
MultipleColumnPrefixFilter = 281,
FamilyFilter = 282,
LESS = 283,
LESS_OR_EQUAL = 284,
EQUAL = 285,
NOT_EQUAL = 286,
GREATER = 287,
GREATER_OR_EQUAL = 288,
NO_OP = 289,
IS = 290,
IS_NOT = 291,
BOOL_VALUE = 292,
STRING_VALUE = 293,
INT_VALUE = 294,
OR = 295,
AND = 296,
T_OR = 297,
T_AND = 298,
WHILE = 299,
SKIP = 300,
ERROR = 301
};
#endif
@ -95,19 +115,23 @@ typedef union YYSTYPE
{
/* Line 1676 of yacc.c */
#line 24 "../../../src/observer/table/htable_filter_tab.yxx"
#line 50 "../../../src/observer/table/htable_filter_tab.yxx"
int32_t ival;
int64_t lval;
oceanbase::table::hfilter::CompareOperator cmp_op;
oceanbase::table::ObHTableFilterParser::SimpleString sval;
oceanbase::table::hfilter::Filter *fval;
oceanbase::table::KeyRange *rval;
oceanbase::common::ObSEArray<oceanbase::ObString, 8> *sarray;
oceanbase::common::ObSEArray<int64_t, 8> *array;
oceanbase::common::ObSEArray<oceanbase::table::KeyRange*, 8> *range_array;
oceanbase::common::ObSEArray<oceanbase::table::hfilter::ObPair<oceanbase::ObString, oceanbase::ObString>*, 8> *fuzzy_array;
/* Line 1676 of yacc.c */
#line 100 "../../../src/observer/table/htable_filter_tab.hxx"
#line 135 "../../../src/observer/table/htable_filter_tab.hxx"
} YYSTYPE;
# define YYSTYPE_IS_TRIVIAL 1
# define yystype YYSTYPE /* obsolescent; will be withdrawn */

View File

@ -1,3 +1,29 @@
%code requires {
/**
* Copyright (c) 2023 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
}
%code top {
/**
* Copyright (c) 2023 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
}
/* Location tracking. */
%locations
/* Pure yylex. */
@ -27,12 +53,16 @@ using namespace oceanbase::common;
oceanbase::table::hfilter::CompareOperator cmp_op;
oceanbase::table::ObHTableFilterParser::SimpleString sval;
oceanbase::table::hfilter::Filter *fval;
oceanbase::table::KeyRange *rval;
oceanbase::common::ObSEArray<oceanbase::ObString, 8> *sarray;
oceanbase::common::ObSEArray<int64_t, 8> *array;
oceanbase::common::ObSEArray<oceanbase::table::KeyRange*, 8> *range_array;
oceanbase::common::ObSEArray<oceanbase::table::hfilter::ObPair<oceanbase::ObString, oceanbase::ObString>*, 8> *fuzzy_array;
};
%token RowFilter ValueFilter QualifierFilter SingleColumnValueFilter PageFilter ColumnCountGetFilter ColumnPrefixFilter ColumnPaginationFilter
%token CheckAndMutateFilter PrefixFilter FirstKeyOnlyFilter KeyOnlyFilter TimestampsFilter
%token TableCompareFilter RandomRowFilter
%token RowFilter ValueFilter QualifierFilter SingleColumnValueFilter PageFilter ColumnCountGetFilter ColumnPrefixFilter ColumnPaginationFilter FirstKeyValueMatchingQualifiersFilter
%token CheckAndMutateFilter PrefixFilter FirstKeyOnlyFilter KeyOnlyFilter TimestampsFilter MultiRowRangeFilter InclusiveStopFilter FuzzyRowFilter ColumnValueFilter
%token TableCompareFilter RandomRowFilter DependentColumnFilter SingleColumnValueExcludeFilter ColumnRangeFilter MultipleColumnPrefixFilter FamilyFilter
%token LESS LESS_OR_EQUAL EQUAL NOT_EQUAL GREATER GREATER_OR_EQUAL NO_OP IS IS_NOT
%token <ival> BOOL_VALUE
%token <sval> STRING_VALUE
@ -50,6 +80,10 @@ using namespace oceanbase::common;
%type <sval> family qualifier comparator
%type <cmp_op> compare_op
%type <array> timestamp_list
%type <range_array> range_list prefix_list
%type <rval> range
%type <sarray> param_list
%type <fuzzy_array> fuzzy_list
%start result_filter
%%
@ -328,6 +362,75 @@ simple_filter:
YYABORT;
}
}
| SingleColumnValueExcludeFilter '(' family ',' qualifier ',' compare_op ',' comparator ',' BOOL_VALUE ',' BOOL_VALUE ')'
{
int &ret = parse_ctx->error_code_ = OB_SUCCESS;
hfilter::Comparable *comparable = nullptr;
if (OB_FAIL(parse_ctx->create_comparator($9, comparable))) {
LOG_WARN("failed to create comparator", K(ret));
} else {
ObString family($3.len_, $3.str_);
ObString qualifier($5.len_, $5.str_);
bool filter_if_missing = ($11 == 1);
bool latest_version_only = ($13 == 1);
hfilter::SingleColumnValueExcludeFilter *filter = NULL;
$$ = filter = OB_NEWx(hfilter::SingleColumnValueExcludeFilter, parse_ctx->allocator(), family, qualifier, $7, comparable);
if (nullptr == $$) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("no memory", K(ret));
} else if (OB_FAIL(parse_ctx->store_filter($$))) {
LOG_WARN("failed to store filter", K(ret));
} else {
filter->set_filter_if_missing(filter_if_missing);
filter->set_latest_version_only(latest_version_only);
}
}
if (OB_SUCCESS != ret) {
ob_hfilter_error(&(@$), parse_ctx, "failed to parse SingleColumnValueExcludeFilter");
YYABORT;
}
}
| DependentColumnFilter '(' family ',' qualifier ',' BOOL_VALUE ')'
{
int &ret = parse_ctx->error_code_ = OB_SUCCESS;
ObString family($3.len_, $3.str_);
ObString qualifier($5.len_, $5.str_);
bool drop_dependent_column = ($7 == 1);
$$ = OB_NEWx(hfilter::DependentColumnFilter, parse_ctx->allocator(), family, qualifier, drop_dependent_column);
if (nullptr == $$) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("no memory", K(ret));
} else if (OB_FAIL(parse_ctx->store_filter($$))) {
LOG_WARN("failed to store filter", K(ret));
}
if (OB_SUCCESS != ret) {
ob_hfilter_error(&(@$), parse_ctx, "failed to parse DependentColumnFilter");
YYABORT;
}
}
| DependentColumnFilter '(' family ',' qualifier ',' BOOL_VALUE ',' compare_op ',' comparator ')'
{
int &ret = parse_ctx->error_code_ = OB_SUCCESS;
hfilter::Comparable *comparable = nullptr;
if (OB_FAIL(parse_ctx->create_comparator($11, comparable))) {
LOG_WARN("failed to create comparator", K(ret));
} else {
ObString family($3.len_, $3.str_);
ObString qualifier($5.len_, $5.str_);
bool drop_dependent_column = ($7 == 1);
$$ = OB_NEWx(hfilter::DependentColumnFilter, parse_ctx->allocator(), family, qualifier, drop_dependent_column, $9, comparable);
if (nullptr == $$) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("no memory", K(ret));
} else if (OB_FAIL(parse_ctx->store_filter($$))) {
LOG_WARN("failed to store filter", K(ret));
}
}
if (OB_SUCCESS != ret) {
ob_hfilter_error(&(@$), parse_ctx, "failed to parse DependentColumnFilter");
YYABORT;
}
}
| PageFilter '(' INT_VALUE ')'
{
int &ret = parse_ctx->error_code_ = OB_SUCCESS;
@ -440,6 +543,25 @@ simple_filter:
YYABORT;
}
}
| FuzzyRowFilter '(' fuzzy_list ')'
{
int &ret = parse_ctx->error_code_ = OB_SUCCESS;
if (nullptr != parse_ctx->allocator() && nullptr != $3) {
$$ = OB_NEWx(hfilter::FuzzyRowFilter, parse_ctx->allocator(), (*parse_ctx->allocator()), *$3);
} else {
$$ = nullptr;
}
if (nullptr == $$) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("no memory", K(ret));
} else if (OB_FAIL(parse_ctx->store_filter($$))) {
LOG_WARN("failed to store filter", K(ret));
}
if (OB_SUCCESS != ret) {
ob_hfilter_error(&(@$), parse_ctx, "failed to parse FuzzyRowFilter");
YYABORT;
}
}
| TimestampsFilter '(' timestamp_list ',' BOOL_VALUE ')'
{
int &ret = parse_ctx->error_code_ = OB_SUCCESS;
@ -455,6 +577,126 @@ simple_filter:
YYABORT;
}
}
| ColumnValueFilter '(' family ',' qualifier ',' compare_op ',' comparator ')'
{
int &ret = parse_ctx->error_code_ = OB_SUCCESS;
hfilter::Comparable *comparable = nullptr;
if (OB_FAIL(parse_ctx->create_comparator($9, comparable))) {
LOG_WARN("failed to create comparator", K(ret));
} else {
ObString family($3.len_, $3.str_);
ObString qualifier($5.len_, $5.str_);
hfilter::ColumnValueFilter *filter = NULL;
$$ = filter = OB_NEWx(hfilter::ColumnValueFilter, parse_ctx->allocator(), family, qualifier, $7, comparable);
if (nullptr == $$) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("no memory", K(ret));
} else if (OB_FAIL(parse_ctx->store_filter($$))) {
LOG_WARN("failed to store filter", K(ret));
}
}
if (OB_SUCCESS != ret) {
ob_hfilter_error(&(@$), parse_ctx, "failed to parse ColumnValueFilter");
YYABORT;
}
}
| MultiRowRangeFilter '(' range_list ')'
{
int &ret = parse_ctx->error_code_ = OB_SUCCESS;
$$ = OB_NEWx(hfilter::MultiRowRangeFilter, parse_ctx->allocator(), $3);
if (nullptr == $$) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("no memory", K(ret));
} else if (OB_FAIL(parse_ctx->store_filter($$))) {
LOG_WARN("failed to store filter", K(ret));
}
if (OB_SUCCESS != ret) {
ob_hfilter_error(&(@$), parse_ctx, "failed to parse MultiRowRangeFilter");
YYABORT;
}
}
| InclusiveStopFilter '(' STRING_VALUE ')'
{
typedef ObSEArray<KeyRange*, 8> RangeArray;
int &ret = parse_ctx->error_code_ = OB_SUCCESS;
RangeArray* arr = OB_NEWx(RangeArray, parse_ctx->allocator());
KeyRange* range = OB_NEWx(KeyRange, parse_ctx->allocator());
if (nullptr == arr || nullptr == range) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("no memory", K(ret));
} else {
range->set_max(parse_ctx->create_ob_string($3));
range->set_max_inclusive(true);
if (OB_FAIL(arr->push_back(range))) {
LOG_WARN("failed to push back", K(ret));
} else {
$$ = OB_NEWx(hfilter::InclusiveStopFilter, parse_ctx->allocator(), arr, parse_ctx->create_ob_string($3));
}
if (nullptr == $$) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("no memory", K(ret));
} else if (OB_FAIL(parse_ctx->store_filter($$))) {
LOG_WARN("failed to store filter", K(ret));
}
}
if (OB_SUCCESS != ret) {
ob_hfilter_error(&(@$), parse_ctx, "failed to parse InclusiveStopFilter");
YYABORT;
}
}
| ColumnRangeFilter '(' range_list ')'
{
int &ret = parse_ctx->error_code_ = OB_SUCCESS;
$$ = OB_NEWx(hfilter::ColumnRangeFilter, parse_ctx->allocator(), $3);
if (nullptr == $$) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("no memory", K(ret));
} else if (OB_FAIL(parse_ctx->store_filter($$))) {
LOG_WARN("failed to store filter", K(ret));
}
if (OB_SUCCESS != ret) {
ob_hfilter_error(&(@$), parse_ctx, "failed to parse ColumnRangeFilter");
YYABORT;
}
}
| MultipleColumnPrefixFilter '(' prefix_list ')'
{
int &ret = parse_ctx->error_code_ = OB_SUCCESS;
$$ = OB_NEWx(hfilter::MultipleColumnPrefixFilter, parse_ctx->allocator(), $3);
if (nullptr == $$) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("no memory", K(ret));
} else if (OB_FAIL(parse_ctx->store_filter($$))) {
LOG_WARN("failed to store filter", K(ret));
}
if (OB_SUCCESS != ret) {
ob_hfilter_error(&(@$), parse_ctx, "failed to parse MultipleColumnPrefixFilter");
YYABORT;
}
}
| FamilyFilter '(' compare_op ',' comparator ')'
{
int &ret = parse_ctx->error_code_ = OB_SUCCESS;
hfilter::Comparable *comparable = nullptr;
if (OB_FAIL(parse_ctx->create_comparator($5, comparable))) {
LOG_WARN("failed to create comparator", K(ret));
} else {
$$ = OB_NEWx(hfilter::FamilyFilter, parse_ctx->allocator(), $3, comparable);
if (nullptr == $$) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("no memory", K(ret));
} else if (OB_FAIL(static_cast<hfilter::FamilyFilter*>($$)->check_arguments())) {
LOG_WARN("failed to check arguments", K(ret));
} else if (OB_FAIL(parse_ctx->store_filter($$))) {
LOG_WARN("failed to store filter", K(ret));
}
}
if (OB_SUCCESS != ret) {
ob_hfilter_error(&(@$), parse_ctx, "failed to parse FamilyFilter");
YYABORT;
}
}
| ColumnCountGetFilter '(' INT_VALUE ')'
{
int &ret = parse_ctx->error_code_ = OB_SUCCESS;
@ -470,6 +712,21 @@ simple_filter:
YYABORT;
}
}
| FirstKeyValueMatchingQualifiersFilter '(' param_list ')'
{
int &ret = parse_ctx->error_code_ = OB_SUCCESS;
$$ = OB_NEWx(hfilter::FirstKeyValueMatchingQualifiersFilter, parse_ctx->allocator(), $3);
if (nullptr == $$) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("no memory", K(ret));
} else if (OB_FAIL(parse_ctx->store_filter($$))) {
LOG_WARN("failed to store filter", K(ret));
}
if (OB_SUCCESS != ret) {
ob_hfilter_error(&(@$), parse_ctx, "failed to parse FirstKeyValueMatchingQualifiersFilter");
YYABORT;
}
}
| CheckAndMutateFilter '(' compare_op ',' comparator ',' family ',' qualifier ',' BOOL_VALUE')'
{
int &ret = parse_ctx->error_code_ = OB_SUCCESS;
@ -536,6 +793,73 @@ comparator:
STRING_VALUE
{ $$ = $1; }
;
param_list:
STRING_VALUE
{
typedef ObSEArray<ObString, 8> param_array;
int &ret = parse_ctx->error_code_ = OB_SUCCESS;
$$ = OB_NEWx(param_array, parse_ctx->allocator());
if (nullptr == $$) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("no memory", K(ret));
} else if (OB_FAIL($$->push_back(parse_ctx->create_ob_string($1)))) {
LOG_WARN("failed to push back", K(ret));
}
if (OB_SUCCESS != ret) {
ob_hfilter_error(&(@$), parse_ctx, "failed to parse param list");
YYABORT;
}
}
| param_list ',' STRING_VALUE
{
int &ret = parse_ctx->error_code_ = OB_SUCCESS;
if (OB_FAIL($1->push_back(parse_ctx->create_ob_string($3)))) {
LOG_WARN("failed to push back", K(ret));
}
$$ = $1;
if (OB_SUCCESS != ret) {
ob_hfilter_error(&(@$), parse_ctx, "failed to parse param list");
YYABORT;
}
}
;
fuzzy_list:
STRING_VALUE ',' STRING_VALUE
{
typedef oceanbase::table::hfilter::ObPair<oceanbase::ObString, oceanbase::ObString> FuzzyKey;
typedef ObSEArray<FuzzyKey*, 8> ParamArray;
int &ret = parse_ctx->error_code_ = OB_SUCCESS;
$$ = OB_NEWx(ParamArray, parse_ctx->allocator());
FuzzyKey *pair = OB_NEWx(FuzzyKey, parse_ctx->allocator(), parse_ctx->create_ob_string($1), parse_ctx->create_ob_string($3));
if (nullptr == $$ || nullptr == pair) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("no memory", K(ret));
} else if (OB_FAIL($$->push_back(pair))) {
LOG_WARN("failed to push back", K(ret));
}
if (OB_SUCCESS != ret) {
ob_hfilter_error(&(@$), parse_ctx, "failed to parse param list");
YYABORT;
}
}
| fuzzy_list ',' STRING_VALUE ',' STRING_VALUE
{
typedef oceanbase::table::hfilter::ObPair<oceanbase::ObString, oceanbase::ObString> FuzzyKey;
int &ret = parse_ctx->error_code_ = OB_SUCCESS;
FuzzyKey *pair = OB_NEWx(FuzzyKey, parse_ctx->allocator(), parse_ctx->create_ob_string($3), parse_ctx->create_ob_string($5));
if (nullptr == pair) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("no memory", K(ret));
} else if (OB_FAIL($1->push_back(pair))) {
LOG_WARN("failed to push back", K(ret));
}
$$ = $1;
if (OB_SUCCESS != ret) {
ob_hfilter_error(&(@$), parse_ctx, "failed to parse param list");
YYABORT;
}
}
;
timestamp_list:
INT_VALUE
{
@ -566,4 +890,116 @@ timestamp_list:
}
}
;
%%
range:
STRING_VALUE ',' BOOL_VALUE ',' STRING_VALUE ',' BOOL_VALUE
{
int &ret = parse_ctx->error_code_ = OB_SUCCESS;
$$ = OB_NEWx(KeyRange, parse_ctx->allocator(), parse_ctx->create_ob_string($1), $3, parse_ctx->create_ob_string($5), $7);
if (nullptr == $$) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("no memory", K(ret));
}
if (OB_SUCCESS != ret) {
ob_hfilter_error(&(@$), parse_ctx, "failed to parse range");
YYABORT;
}
}
;
range_list:
range
{
typedef ObSEArray<KeyRange*, 8> RangeArray;
int &ret = parse_ctx->error_code_ = OB_SUCCESS;
$$ = OB_NEWx(RangeArray, parse_ctx->allocator());
if (nullptr == $$) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("no memory", K(ret));
} else if (OB_FAIL($$->push_back($1))) {
LOG_WARN("failed to push back", K(ret));
}
if (OB_SUCCESS != ret) {
ob_hfilter_error(&(@$), parse_ctx, "failed to parse range list");
YYABORT;
}
}
| range_list ',' range
{
int &ret = parse_ctx->error_code_ = OB_SUCCESS;
if (OB_FAIL($1->push_back($3))) {
LOG_WARN("failed to push back", K(ret));
}
$$ = $1;
if (OB_SUCCESS != ret) {
ob_hfilter_error(&(@$), parse_ctx, "failed to parse range list");
YYABORT;
}
}
;
prefix_list:
STRING_VALUE
{
typedef ObSEArray<KeyRange*, 8> RangeArray;
int &ret = parse_ctx->error_code_ = OB_SUCCESS;
ObString prefix = parse_ctx->create_ob_string($1);
KeyRange *range = OB_NEWx(KeyRange, parse_ctx->allocator());
if (nullptr == $$) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("no memory", K(ret));
} else {
range->set_min(prefix);
range->set_min_inclusive(true);
$$ = OB_NEWx(RangeArray, parse_ctx->allocator());
if (nullptr == $$) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("no memory", K(ret));
} else if (OB_FAIL(ObHTableUtils::get_prefix_key_range(*parse_ctx->allocator(), prefix, range))) {
LOG_WARN("failed to create key range", K(ret));
} else if (OB_FAIL($$->push_back(range))) {
LOG_WARN("failed to push back", K(ret));
}
}
if (OB_SUCCESS != ret) {
ob_hfilter_error(&(@$), parse_ctx, "failed to parse prefix list");
YYABORT;
}
}
| prefix_list ',' STRING_VALUE
{
int &ret = parse_ctx->error_code_ = OB_SUCCESS;
ObString prefix = parse_ctx->create_ob_string($3);
KeyRange *range = OB_NEWx(KeyRange, parse_ctx->allocator());
if (nullptr == $$) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("no memory", K(ret));
} else {
range->set_min(prefix);
range->set_min_inclusive(true);
if (OB_FAIL(ObHTableUtils::get_prefix_key_range(*parse_ctx->allocator(), prefix, range))) {
LOG_WARN("failed to create key range", K(ret));
} else if (OB_FAIL($1->push_back(range))) {
LOG_WARN("failed to push back", K(ret));
}
$$ = $1;
}
if (OB_SUCCESS != ret) {
ob_hfilter_error(&(@$), parse_ctx, "failed to parse prefix list");
YYABORT;
}
}
| /*EMPTY*/
{
typedef ObSEArray<KeyRange*, 8> RangeArray;
int &ret = parse_ctx->error_code_ = OB_SUCCESS;
$$ = OB_NEWx(RangeArray, parse_ctx->allocator());
if (nullptr == $$) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("no memory", K(ret));
}
if (OB_SUCCESS != ret) {
ob_hfilter_error(&(@$), parse_ctx, "failed to parse prefix list");
YYABORT;
}
}
;
%%

View File

@ -555,13 +555,13 @@ int ObHTableScanMatcher::set_to_new_row(const ObHTableCell &arg_curr_row)
return ret;
}
int ObHTableScanMatcher::get_next_cell_hint(common::ObIAllocator &allocator, const ObHTableCell &cell, ObHTableCell *&new_cell)
int ObHTableScanMatcher::get_next_cell_hint(const ObHTableCell &cell, ObHTableCell *&new_cell)
{
int ret = OB_SUCCESS;
if (hfilter_ == NULL) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("filter is null", K(ret));
} else if (OB_FAIL(hfilter_->get_next_cell_hint(allocator, cell, new_cell))) {
} else if (OB_FAIL(hfilter_->get_next_cell_hint(allocator_, cell, new_cell))) {
LOG_WARN("failed to get next hint cell of filter", K(ret), K(cell));
}
return ret;
@ -570,7 +570,8 @@ int ObHTableScanMatcher::get_next_cell_hint(common::ObIAllocator &allocator, con
////////////////////////////////////////////////////////////////
ObHTableRowIterator::ObHTableRowIterator(const ObTableQuery &query)
: ObTableQueryResultIterator(&query),
allocator_("HtblRowItAloc", OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID()),
allocator_("HtbRowItAloc", OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID()),
forward_range_alloc_("HtbForwRanAloc", OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID()),
child_op_(NULL),
curr_cell_(),
scan_order_(query.get_scan_order()),
@ -601,8 +602,8 @@ ObHTableRowIterator::ObHTableRowIterator(const ObTableQuery &query)
scanner_context_(NULL)
{
if (query.get_scan_ranges().count() != 0) {
start_row_key_ = query.get_scan_ranges().at(ObHTableConstants::COL_IDX_K).start_key_;
stop_row_key_ = query.get_scan_ranges().at(ObHTableConstants::COL_IDX_K).end_key_;
start_row_key_ = query.get_scan_ranges().at(0).start_key_;
stop_row_key_ = query.get_scan_ranges().at(0).end_key_;
}
}
@ -627,7 +628,7 @@ int ObHTableRowIterator::get_next_cell_hint()
{
int ret = OB_SUCCESS;
ObHTableCell *hint_cell = nullptr;
if (OB_FAIL(matcher_->get_next_cell_hint(allocator_, curr_cell_, hint_cell))) {
if (OB_FAIL(matcher_->get_next_cell_hint(curr_cell_, hint_cell))) {
LOG_WARN("failed to get next hint cell of filter", K(ret), K_(curr_cell));
} else {
if (hint_cell != NULL) {
@ -662,11 +663,10 @@ int ObHTableRowIterator::get_next_cell_hint()
try_record_expired_rowkey(cur_version + skipped_count, tmp_cell->get_rowkey());
}
cur_version = 1;
next_cell->~ObHTableCell();
next_cell = nullptr;
}
}
hint_cell->~ObHTableCell();
allocator_.reuse();
} else if (OB_FAIL(next_cell())) {
LOG_WARN("failed to get next cell", K(ret));
}
@ -674,10 +674,11 @@ int ObHTableRowIterator::get_next_cell_hint()
return ret;
}
int ObHTableRowIterator::get_next_result(ObTableQueryIterableResult *&out_result)
int ObHTableRowIterator::get_next_result(ObTableQueryDListResult *&out_result)
{
int ret = OB_SUCCESS;
out_result = &one_iterable_hbase_row_;
one_iterable_hbase_row_.reset();
if (OB_FAIL(get_next_result_internal(out_result))) {
LOG_WARN("fail to get next result", K(ret));
}
@ -751,7 +752,10 @@ int ObHTableRowIterator::get_next_result_internal(ResultType*& result)
LOG_WARN("matcher should have valid first cell", K(ret));
} else if (NULL != hfilter_) {
const ObHTableCell &first_cell = curr_cell_;
if (hfilter_->filter_row_key(first_cell)) {
bool filtered = false;
if (OB_FAIL(hfilter_->filter_row_key(first_cell, filtered))) {
LOG_WARN("failed to filter row key", K(ret), K(first_cell));
} else if (filtered) {
// filter out the current row and fetch the next row
hfilter_->reset();
LOG_DEBUG("filter_row_key skip the row", K(ret));
@ -762,6 +766,7 @@ int ObHTableRowIterator::get_next_result_internal(ResultType*& result)
}
while (OB_SUCC(ret) && loop) {
match_code = ObHTableMatchCode::DONE_SCAN; // initialize
curr_cell_.set_family(family_name_);
if (OB_FAIL(matcher_->match(curr_cell_, match_code))) {
LOG_WARN("failed to match cell", K(ret));
} else {
@ -841,7 +846,7 @@ int ObHTableRowIterator::get_next_result_internal(ResultType*& result)
// when scan return OB_ITER_END, cur_cell_ will be empty
// but the ret code will be covered
if (NULL != curr_cell_.get_ob_row()) {
seek_or_skip_to_next_col(curr_cell_);
ret = seek_or_skip_to_next_col(curr_cell_);
}
break;
case ObHTableMatchCode::SEEK_NEXT_ROW:
@ -881,7 +886,7 @@ void ObHTableRowIterator::init_table_group_value()
}
}
int ObHTableRowIterator::add_new_row(const ObNewRow &row, ObTableQueryIterableResult *&out_result)
int ObHTableRowIterator::add_new_row(const ObNewRow &row, ObTableQueryDListResult *&out_result)
{
int ret = OB_SUCCESS;
if (OB_FAIL(out_result->add_row(row, family_name_))) {
@ -921,6 +926,7 @@ int ObHTableRowIterator::seek(ObHTableCell &key, int32_t &skipped_count)
break;
}
}
allocator_.reuse();
++skipped_count;
}
return ret;
@ -953,29 +959,29 @@ int ObHTableRowIterator::seek_first_cell_on_hint(const ObNewRow *ob_row)
ObTableCtx &forward_tb_ctx = forward_child_op->get_scan_executor()->get_table_ctx();
ObNewRange &forward_range = forward_tb_ctx.get_key_ranges().at(0);
forward_tb_ctx.set_limit(-1);
forward_range_alloc_.reuse();
if (scan_order_ == ObQueryFlag::Forward) {
forward_range.end_key_ = stop_row_key_;
} else {
forward_range.end_key_.reset();
forward_range.end_key_.set_max_row();
}
ObObj *start_key = nullptr;
if (ob_row->count_ < 3) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("ob_row start_key obj_cnt_ is less than 3", K(ret), K(ob_row->count_));
} else if (OB_ISNULL(start_key = static_cast<ObObj *>(allocator_.alloc(sizeof(ObObj) * 3)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("allocate memory for start_obj failed", K(ret));
} else if (OB_ISNULL(start_key = static_cast<ObObj *>(forward_range_alloc_.alloc(sizeof(ObObj) * 3)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("allocate memory for start_obj failed", K(ret));
} else {
for (int i = 0; OB_SUCC(ret) && i < ob_row->count_ && i < 3; ++i) {
if (OB_FAIL(ob_write_obj(allocator_, ob_row->cells_[i], start_key[i]))) {
if (OB_FAIL(ob_write_obj(forward_range_alloc_, ob_row->cells_[i], start_key[i]))) {
LOG_WARN("failed to write obj", K(ret));
}
}
}
if (OB_SUCC(ret)) {
forward_range.start_key_.assign(start_key, 3);
ObNewRow *first_cell_on_row = NULL;
ObNewRow *first_cell_on_row = nullptr;
if (OB_FAIL(rescan_and_get_next_row(forward_child_op, first_cell_on_row))) {
if (OB_ITER_END != ret) {
LOG_WARN("failed to rescan and get next row", K(ret));
@ -988,10 +994,8 @@ int ObHTableRowIterator::seek_first_cell_on_hint(const ObNewRow *ob_row)
LOG_DEBUG("iterator end", K_(has_more_cells));
}
} else {
curr_cell_.reset(allocator_);
if (OB_FAIL(curr_cell_.deep_copy_ob_row(first_cell_on_row, allocator_))) {
LOG_WARN("failed to deep copy ob row", K(ret), K(first_cell_on_row));
}
allocator_.reuse();
curr_cell_.set_ob_row(first_cell_on_row);
}
}
return ret;
@ -1026,7 +1030,6 @@ int ObHTableRowIterator::seek_or_skip_to_next_row(const ObHTableCell &cell)
}
cur_version = 1;
next_cell->~ObHTableCell();
allocator_.reuse();
}
}
matcher_->clear_curr_row();
@ -1132,7 +1135,12 @@ void ObHTableRowIterator::try_record_expired_rowkey(const ObString &rowkey)
////////////////////////////////////////////////////////////////
ObHTableReversedRowIterator::ObHTableReversedRowIterator(const ObTableQuery &query)
: ObHTableRowIterator(query), spec_(nullptr), forward_tb_ctx_(allocator_), expr_frame_info_(allocator_)
: ObHTableRowIterator(query),
iter_allocator_("HtbRevIterAloc", OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID()),
reversed_range_alloc_("HtbRevRanAloc", OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID()),
spec_(nullptr),
forward_tb_ctx_(iter_allocator_),
expr_frame_info_(iter_allocator_)
{}
ObHTableReversedRowIterator::~ObHTableReversedRowIterator()
@ -1151,36 +1159,13 @@ ObHTableReversedRowIterator::~ObHTableReversedRowIterator()
}
}
int ObHTableReversedRowIterator::seek_to_max_row()
{
int ret = OB_SUCCESS;
ObTableCtx &reversed_tb_ctx = child_op_->get_scan_executor()->get_table_ctx();
ObNewRange &reversed_range = reversed_tb_ctx.get_key_ranges().at(0);
reversed_range.reset();
reversed_range.start_key_.set_min_row();
reversed_range.end_key_.set_max_row();
reversed_tb_ctx.set_limit(1);
ObNewRow *last_cell = NULL;
if (OB_FAIL(rescan_and_get_next_row(child_op_, last_cell))) {
if (OB_ITER_END == ret) {
LOG_INFO("no data in table", K(ret));
} else {
LOG_WARN("failed to rescan and get next row", K(ret));
}
} else {
stop_row_key_.reset();
stop_row_key_.assign(last_cell->cells_, last_cell->count_);
}
return ret;
}
int ObHTableReversedRowIterator::init()
{
int ret = OB_SUCCESS;
if (is_inited()) {
ret = OB_INIT_TWICE;
LOG_WARN("reverse iter inited twice", K(ret));
} else if (OB_ISNULL(forward_child_op_ = OB_NEWx(ObTableApiScanRowIterator, &allocator_))) {
} else if (OB_ISNULL(forward_child_op_ = OB_NEWx(ObTableApiScanRowIterator, &iter_allocator_))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("forward_child_op_ alloc memory error", K(ret));
} else if (start_row_key_.length() == 0) {
@ -1203,41 +1188,22 @@ int ObHTableReversedRowIterator::init()
} else {
// curr_cell_ needs to be initialized, and the pointer should be directed
// to either the last `startKey` or the first cell of the preceding row.
if (stop_row_key_.is_max_row() && OB_FAIL(seek_to_max_row())) {
if (OB_ITER_END != ret) {
LOG_WARN("seek to max row failed", K(ret));
}
} else {
ObHTableFirstOnRowCell tmp_cell(stop_row_key_.get_obj_ptr()[ObHTableConstants::COL_IDX_K].get_varchar());
ObHTableCell *new_cell = NULL;
ObHTableUtils::create_first_cell_on_row(allocator_, tmp_cell, new_cell);
if (OB_FAIL(seek_first_cell_on_hint(static_cast<ObHTableCellEntity *>(new_cell)->get_ob_row()))) {
if (OB_ITER_END == ret) {
has_more_cells_ = true;
ret = OB_SUCCESS;
ObHTableFirstOnRowCell cell(stop_row_key_.get_obj_ptr()[ObHTableConstants::COL_IDX_K].get_varchar());
if (OB_FAIL(seek_or_skip_to_next_row(cell))) {
LOG_WARN("failed to seek to next row in init", K(ret));
}
} else {
LOG_WARN("failed to rescan and get next row", K(ret));
}
} else if (0 < ObHTableUtils::compare_rowkey(curr_cell_.get_rowkey(),
stop_row_key_.get_obj_ptr()[ObHTableConstants::COL_IDX_K].get_varchar())) {
ObHTableFirstOnRowCell cell(stop_row_key_.get_obj_ptr()[ObHTableConstants::COL_IDX_K].get_varchar());
if (OB_FAIL(seek_or_skip_to_next_row(cell))) {
LOG_WARN("failed to seek to next row in init", K(ret));
}
} else if (0 >= ObHTableUtils::compare_rowkey(curr_cell_.get_rowkey(),
start_row_key_.get_obj_ptr()[ObHTableConstants::COL_IDX_K].get_varchar())) {
ObNewRow *tmp_next_row = nullptr;
ObHTableCell *new_cell = nullptr;
if (OB_FAIL(child_op_->get_next_row(tmp_next_row))) {
if (OB_ITER_END == ret) {
has_more_cells_ = false;
curr_cell_.reset(allocator_);
ret = OB_ITER_END;
LOG_DEBUG("no data in table", K(ret));
} else {
LOG_WARN("failed to rescan and get next row", K(ret));
}
new_cell->~ObHTableCell();
} else if (OB_FAIL(ObHTableUtils::create_first_cell_on_row(
allocator_, tmp_next_row->get_cell(ObHTableConstants::COL_IDX_K).get_string(), new_cell))) {
LOG_WARN("failed to create first cell on row", K(ret));
} else if (OB_FAIL(seek_first_cell_on_hint(static_cast<ObHTableCellEntity *>(new_cell)->get_ob_row()))) {
LOG_WARN("failed to get first cell on row in init", K(ret));
}
}
if (OB_SUCC(ret)) {
is_inited_ = true;
}
@ -1248,7 +1214,10 @@ int ObHTableReversedRowIterator::next_cell()
{
int ret = OB_SUCCESS;
ObNewRow *ob_row = NULL;
if (OB_FAIL(forward_child_op_->get_next_row(ob_row))) {
ObHTableCellEntity cell_clone;
if (OB_FAIL(cell_clone.deep_copy_ob_row(curr_cell_.get_ob_row(), allocator_))) {
LOG_WARN("allocate memory for row_key failed", K(ret));
} else if (OB_FAIL(forward_child_op_->get_next_row(ob_row))) {
if (OB_ITER_END == ret) {
// cover recode when ret = OB_OB_ITER_END
ret = OB_SUCCESS;
@ -1260,21 +1229,18 @@ int ObHTableReversedRowIterator::next_cell()
} else {
LOG_WARN("failed to get next row", K(ret));
}
} else if (0 == ObHTableUtils::compare_rowkey(
curr_cell_.get_rowkey(), ob_row->get_cell(ObHTableConstants::COL_IDX_K).get_varchar())) {
} else if (FALSE_IT(curr_cell_.set_ob_row(ob_row))) {
} else if (0 == ObHTableUtils::compare_rowkey(cell_clone.get_rowkey(), curr_cell_.get_rowkey())) {
// same rowkey
curr_cell_.reset(allocator_);
if (OB_FAIL(curr_cell_.deep_copy_ob_row(ob_row, allocator_))) {
LOG_WARN("failed to deep copy ob row", K(ret));
}
try_record_expired_rowkey(curr_cell_);
} else {
if (OB_FAIL(seek_or_skip_to_next_row(curr_cell_))) {
if (OB_FAIL(seek_or_skip_to_next_row(cell_clone))) {
if (OB_ITER_END != ret) {
LOG_WARN("failed to seek to next row when get next cell", K(ret));
}
}
}
allocator_.reuse();
return ret;
}
@ -1290,15 +1256,18 @@ int ObHTableReversedRowIterator::seek_or_skip_to_next_row(const ObHTableCell &ce
{
int ret = OB_SUCCESS;
ObTableCtx &reversed_tb_ctx = child_op_->get_scan_executor()->get_table_ctx();
reversed_tb_ctx.set_limit(1);
ObNewRange &reverse_range = reversed_tb_ctx.get_key_ranges().at(0);
ObString rowkey_clone;
reversed_range_alloc_.reuse();
reverse_range.start_key_ = start_row_key_;
ObObj *end_key = nullptr;
if (OB_ISNULL(end_key = static_cast<ObObj *>(allocator_.alloc(sizeof(ObObj) * 3)))) {
if (OB_ISNULL(end_key = static_cast<ObObj *>(reversed_range_alloc_.alloc(sizeof(ObObj) * 3)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("allocate memory for end_key failed", K(ret));
} else if (OB_FAIL(ob_write_string(reversed_range_alloc_, cell.get_rowkey(), rowkey_clone))) {
LOG_WARN("allocate memory for row_key failed", K(ret));
} else {
end_key[ObHTableConstants::COL_IDX_K].set_varbinary(cell.get_rowkey());
end_key[ObHTableConstants::COL_IDX_K].set_varbinary(rowkey_clone);
end_key[ObHTableConstants::COL_IDX_Q].set_min_value();
end_key[ObHTableConstants::COL_IDX_T].set_min_value();
reverse_range.end_key_.assign(end_key, 3);
@ -1309,7 +1278,6 @@ int ObHTableReversedRowIterator::seek_or_skip_to_next_row(const ObHTableCell &ce
} else {
LOG_DEBUG("reverse scan has no more cell", K(ret), K(has_more_cells_));
has_more_cells_ = false;
curr_cell_.reset(allocator_);
}
} else {
ObObj hint_cell[3];
@ -1336,18 +1304,25 @@ int ObHTableReversedRowIterator::seek_or_skip_to_next_col(const ObHTableCell &ce
ObNewRange &column_range = forward_tb_ctx_.get_key_ranges().at(0);
ObObj *start_key = nullptr;
ObObj *end_key = nullptr;
if (OB_ISNULL(start_key = static_cast<ObObj *>(allocator_.alloc(sizeof(ObObj) * 3)))) {
forward_range_alloc_.reuse();
ObString rowkey_clone;
ObString qualifier_clone;
if (OB_ISNULL(start_key = static_cast<ObObj *>(forward_range_alloc_.alloc(sizeof(ObObj) * 3)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("allocate memory for start_key failed", K(ret));
} else if (OB_ISNULL(end_key = static_cast<ObObj *>(allocator_.alloc(sizeof(ObObj) * 3)))) {
} else if (OB_ISNULL(end_key = static_cast<ObObj *>(forward_range_alloc_.alloc(sizeof(ObObj) * 3)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("allocate memory for end_key failed", K(ret));
} else if (OB_FAIL(ob_write_string(forward_range_alloc_, cell.get_rowkey(), rowkey_clone))) {
LOG_WARN("allocate memory for row_key failed", K(ret));
} else if (OB_FAIL(ob_write_string(forward_range_alloc_, cell.get_qualifier(), qualifier_clone))) {
LOG_WARN("allocate memory for qualifier_key failed", K(ret));
} else {
end_key[ObHTableConstants::COL_IDX_K].set_varbinary(cell.get_rowkey());
end_key[ObHTableConstants::COL_IDX_K].set_varbinary(rowkey_clone);
end_key[ObHTableConstants::COL_IDX_Q].set_max_value();
end_key[ObHTableConstants::COL_IDX_T].set_max_value();
start_key[ObHTableConstants::COL_IDX_K].set_varbinary(cell.get_rowkey());
start_key[ObHTableConstants::COL_IDX_Q].set_varbinary(cell.get_qualifier());
start_key[ObHTableConstants::COL_IDX_K].set_varbinary(rowkey_clone);
start_key[ObHTableConstants::COL_IDX_Q].set_varbinary(qualifier_clone);
start_key[ObHTableConstants::COL_IDX_T].set_max_value();
column_range.start_key_.assign(start_key, 3);
column_range.end_key_.assign(end_key, 3);
@ -1363,10 +1338,20 @@ int ObHTableReversedRowIterator::seek_or_skip_to_next_col(const ObHTableCell &ce
}
}
} else {
curr_cell_.reset(allocator_);
if (OB_FAIL(curr_cell_.deep_copy_ob_row(ob_next_column_row, allocator_))) {
LOG_WARN("failed to deep copy ob row", K(ret));
}
allocator_.reuse();
curr_cell_.set_ob_row(ob_next_column_row);
}
}
return ret;
}
int ObHTableReversedRowIterator::seek(ObHTableCell &key, int32_t &skipped_count)
{
int ret = OB_SUCCESS;
UNUSED(skipped_count);
if (OB_FAIL(seek_first_cell_on_hint(static_cast<ObHTableCellEntity *>(&key)->get_ob_row()))) {
if (OB_ITER_END != ret) {
LOG_WARN("failed to seek first cell on hint", K(ret));
}
}
return ret;
@ -1403,7 +1388,7 @@ int ObHTableReversedRowIterator::init_async_forward_tb_ctx()
LOG_WARN("fail to construct column items", K(ret));
} else if (OB_FAIL(forward_tb_ctx_.generate_table_schema_for_cg())) {
LOG_WARN("fail to generate table schema", K(ret));
} else if (OB_FAIL(ObTableExprCgService::generate_exprs(forward_tb_ctx_, allocator_, expr_frame_info_))) {
} else if (OB_FAIL(ObTableExprCgService::generate_exprs(forward_tb_ctx_, iter_allocator_, expr_frame_info_))) {
LOG_WARN("fail to generate exprs", K(ret), K(forward_tb_ctx_));
} else if (OB_FAIL(ObTableExprCgService::alloc_exprs_memory(forward_tb_ctx_, expr_frame_info_))) {
LOG_WARN("fail to alloc expr memory", K(ret));
@ -1416,7 +1401,7 @@ int ObHTableReversedRowIterator::init_async_forward_tb_ctx()
forward_tb_ctx_.set_expr_info(&expr_frame_info_);
}
if (OB_SUCC(ret)) {
if (OB_FAIL(ObTableSpecCgService::generate<TABLE_API_EXEC_SCAN>(allocator_, forward_tb_ctx_, spec_))) {
if (OB_FAIL(ObTableSpecCgService::generate<TABLE_API_EXEC_SCAN>(iter_allocator_, forward_tb_ctx_, spec_))) {
LOG_WARN("fail to generate scan spec", K(ret), K(forward_tb_ctx_));
}
}
@ -1482,7 +1467,8 @@ ObHTableFilterOperator::ObHTableFilterOperator(const ObTableQuery &query, table:
is_first_result_(true),
check_existence_only_(false),
scanner_context_(),
is_inited_(false)
is_inited_(false),
row_count_(0)
{
}
@ -1500,7 +1486,8 @@ ObHTableFilterOperator::ObHTableFilterOperator(const ObTableQuery &query,
is_first_result_(true),
check_existence_only_(false),
scanner_context_(),
is_inited_(false)
is_inited_(false),
row_count_(0)
{}
ObHTableFilterOperator::~ObHTableFilterOperator()
@ -1510,6 +1497,11 @@ ObHTableFilterOperator::~ObHTableFilterOperator()
}
}
bool ObHTableFilterOperator::reach_limit()
{
return query_->get_limit() > 0 && row_count_ >= query_->get_limit();
}
bool ObHTableFilterOperator::reach_caching_limit(int num_of_row)
{
return caching_ >= 0 && num_of_row >= caching_;
@ -1544,7 +1536,7 @@ int ObHTableFilterOperator::get_next_result_internal(ResultType *&next_result)
{
int ret = OB_SUCCESS;
bool has_filter_row = (NULL != filter_) && (filter_->has_filter_row());
ResultType *htable_row = nullptr;
ObTableQueryDListResult *htable_row = nullptr;
// ObNewRow first_entity;
// ObObj first_entity_cells[4];
// first_entity.cells_ = first_entity_cells;
@ -1637,6 +1629,11 @@ int ObHTableFilterOperator::get_next_result_internal(ResultType *&next_result)
break;
}
}
num_of_row++;
row_count_++;
if (reach_limit()) {
row_iterator_->no_more_result();
}
} // end while
if (!row_iterator_->has_more_result()) {
@ -1691,6 +1688,7 @@ int ObHTableFilterOperator::init(common::ObIAllocator *allocator)
} else if (OB_FAIL(filter_parser_.parse_filter(hfilter_string, filter_))) {
LOG_WARN("failed to parse filter", K(ret), K(hfilter_string));
} else {
filter_->set_reversed(ObQueryFlag::Reverse == query_->get_scan_order());
row_iterator_->set_hfilter(filter_);
}
if (OB_SUCC(ret) && OB_NOT_NULL(ob_kv_params_.ob_params_)) {

View File

@ -267,7 +267,7 @@ public:
const ObHTableCell* get_curr_row() const;
void clear_curr_row() { curr_row_.set_ob_row(NULL); }
int set_to_new_row(const ObHTableCell &curr_row);
int get_next_cell_hint(common::ObIAllocator &allocator, const ObHTableCell &cell, ObHTableCell *&new_cell);
int get_next_cell_hint(const ObHTableCell &cell, ObHTableCell *&new_cell);
bool is_curr_row_empty() const { return NULL == curr_row_.get_ob_row(); }
ObHTableMatchCode merge_filter_return_code(const ObHTableCell &cell,
const ObHTableMatchCode match_code,
@ -294,13 +294,14 @@ public:
};
/// Fetch next row
virtual int get_next_result(ObTableQueryResult *&one_row) override;
virtual int get_next_result(ObTableQueryIterableResult *&one_row) override;
int seek(ObHTableCell &key, int32_t &skipped_count);
int get_next_result(ObTableQueryDListResult *&one_result);
virtual int seek(ObHTableCell &key, int32_t &skipped_count);
virtual void set_scan_result(table::ObTableApiScanRowIterator *scan_result) override
{
child_op_ = scan_result;
}
bool has_more_result() const { return has_more_cells_; }
void no_more_result() { has_more_cells_ = false; }
void set_hfilter(table::hfilter::Filter *hfilter);
void set_ttl(int32_t ttl_value);
virtual int init()
@ -320,6 +321,7 @@ public:
private:
template <typename ResultType>
int get_next_result_internal(ResultType*& result);
virtual int get_next_result(ObTableQueryIterableResult *&one_result) override { return OB_SUCCESS; }
protected:
virtual int rescan_and_get_next_row(table::ObTableApiScanRowIterator *tb_op, ObNewRow *&ob_next_row);
@ -333,7 +335,7 @@ protected:
int get_next_cell_hint();
virtual int add_new_row(const ObNewRow &row, ObTableQueryResult *&out_result);
int add_new_row(const ObNewRow &row, ObTableQueryIterableResult *&out_result);
int add_new_row(const ObNewRow &row, ObTableQueryDListResult *&out_result);
protected:
// try record expired rowkey accord cell's timestamp
@ -345,6 +347,7 @@ protected:
protected:
common::ObArenaAllocator allocator_; // used for deep copy of curr_cell_
common::ObArenaAllocator forward_range_alloc_; // used for forward range
table::ObTableApiScanRowIterator *child_op_;
ObHTableCellEntity curr_cell_;
common::ObQueryFlag::ScanOrder scan_order_;
@ -365,7 +368,7 @@ private:
int32_t max_version_; // Column family max_version
table::ObTableQueryResult one_hbase_row_;
table::ObTableQueryIterableResult one_iterable_hbase_row_;
table::ObTableQueryDListResult one_iterable_hbase_row_;
ObHTableColumnTracker *column_tracker_;
ObHTableWildcardColumnTracker column_tracker_wildcard_;
ObHTableExplicitColumnTracker column_tracker_explicit_;
@ -390,16 +393,17 @@ public:
private:
virtual int next_cell() override;
int seek(ObHTableCell &key, int32_t &skipped_count) override;
virtual int seek_or_skip_to_next_row(const ObHTableCell &cell) override;
virtual int seek_or_skip_to_next_col(const ObHTableCell &cell) override;
int seek_first_cell_on_row(const ObNewRow *ob_row);
int init_forward_tb_ctx();
int init_async_forward_tb_ctx();
virtual table::ObTableApiScanRowIterator* get_forward_child_op() { return forward_child_op_; }
int create_forward_child_op();
int seek_to_max_row();
private:
common::ObArenaAllocator iter_allocator_; // used for forward iter
common::ObArenaAllocator reversed_range_alloc_; // used for reversed range
table::ObTableApiScanRowIterator *forward_child_op_;
ObTableApiCacheGuard cache_guard_;
ObTableApiSpec *spec_;
@ -415,6 +419,7 @@ public:
ObHTableFilterOperator(const ObTableQuery &query, table::ObTableQueryIterableResult &one_result);
virtual ~ObHTableFilterOperator();
/// Fetch next batch result
bool reach_limit();
bool reach_caching_limit(int num_of_row);
virtual int get_next_result(ObTableQueryResult *&one_result) override;
virtual int get_next_result(ObTableQueryIterableResult *&one_result) override;
@ -462,6 +467,7 @@ private:
bool check_existence_only_;
ScannerContext scanner_context_;
bool is_inited_;
int32_t row_count_;
};
} // end namespace table

File diff suppressed because it is too large Load Diff

View File

@ -55,7 +55,7 @@ public:
/// If this returns true, the scan will terminate.
virtual bool filter_all_remaining() = 0;
/// Filters a row based on the row key.
virtual bool filter_row_key(const ObHTableCell &first_row_cell) = 0;
virtual int filter_row_key(const ObHTableCell &first_row_cell, bool &filtered) = 0;
/// Will filter use hint to seek.
virtual bool is_hinting_filter() = 0;
/// A way to filter based on the column family, column qualifier and/or the column value.
@ -64,7 +64,7 @@ public:
virtual int transform_cell(ObIAllocator &allocator, ObHTableCellEntity &cell) = 0;
/// Chance to alter the list of Cells to be submitted.
virtual int filter_row_cells(const RowCells &cells) = 0;
virtual int filter_row_cells(const table::ObTableQueryIterableResult &cells) = 0;
virtual int filter_row_cells(ObTableQueryDListResult &cells) = 0;
/// Last chance to veto row based on previous filterCell(Cell) calls.
virtual bool filter_row() = 0;
/// for tableApi filter
@ -76,7 +76,7 @@ public:
virtual int64_t get_format_filter_string_length() const = 0;
virtual int get_format_filter_string(char *buf, int64_t buf_len, int64_t &pos) const = 0;
virtual int get_next_cell_hint(common::ObIAllocator &allocator, const ObHTableCell &cell, ObHTableCell *&new_cell) = 0;
void set_reversed(bool reversed) { is_reversed_ = reversed; }
virtual void set_reversed(bool reversed) { is_reversed_ = reversed; }
bool is_reversed() const { return is_reversed_; }
virtual void set_hbase_version(const ObString &version) { hbase_major_version_ = version[0] - '0'; }
int8_t get_hbase_major_version() const { return hbase_major_version_; }
@ -115,14 +115,14 @@ public:
virtual void reset() override {}
virtual bool filter_all_remaining() override { return false; }
virtual bool filter_row_key(const ObHTableCell &first_row_cell) override
{ UNUSED(first_row_cell); if (filter_all_remaining()) return true; else return false; }
virtual int filter_row_key(const ObHTableCell &first_row_cell, bool &filtered) override
{ UNUSED(first_row_cell); if (filter_all_remaining()) filtered = true; else filtered = false; return OB_SUCCESS; }
virtual bool is_hinting_filter() override { return false; }
virtual int transform_cell(ObIAllocator &allocator, ObHTableCellEntity &cell) override
{ UNUSED(cell); return common::OB_SUCCESS; }
virtual int filter_row_cells(const RowCells &cells) override
{ UNUSED(cells); return common::OB_SUCCESS; }
virtual int filter_row_cells(const table::ObTableQueryIterableResult &cells) override
virtual int filter_row_cells(ObTableQueryDListResult &cells) override
{UNUSED(cells); return common::OB_SUCCESS;}
virtual bool filter_row() override { return false; }
virtual bool has_filter_row() override { return false; }
@ -240,6 +240,7 @@ public:
virtual int get_format_filter_string(char *buf, int64_t buf_len, int64_t &pos) const = 0;
protected:
bool compare_row(CompareOperator op, Comparable &comparator, const ObHTableCell &cell);
bool compare_family(CompareOperator op, Comparable &comparator, const ObHTableCell &cell);
bool compare_qualifier(CompareOperator op, Comparable &comparator, const ObHTableCell &cell);
bool compare_value(CompareOperator op, Comparable &comparator, const ObHTableCell &cell);
protected:
@ -260,7 +261,7 @@ public:
{}
virtual ~RowFilter();
virtual void reset() override;
virtual bool filter_row_key(const ObHTableCell &first_row_cell) override;
virtual int filter_row_key(const ObHTableCell &first_row_cell, bool &filtered) override;
virtual int filter_cell(const ObHTableCell &cell, ReturnCode &ret_code) override;
virtual bool filter_row() override;
virtual int64_t get_format_filter_string_length() const override;
@ -313,6 +314,50 @@ private:
DISALLOW_COPY_AND_ASSIGN(ValueFilter);
};
// this filter is used to filter based on the target column value and cells with the same timestamp
class DependentColumnFilter: public CompareFilter
{
public:
DependentColumnFilter(const ObString &family, const ObString &qualifier, bool drop_dependent_column)
:CompareFilter(CompareOperator::NO_OP, NULL),
family_(family),
qualifier_(qualifier),
drop_dependent_column_(drop_dependent_column),
is_inited_(false)
{}
DependentColumnFilter(const ObString &family, const ObString &qualifier, bool drop_dependent_column, CompareOperator cmp_op, Comparable *comparator)
:CompareFilter(cmp_op, comparator),
family_(family),
qualifier_(qualifier),
drop_dependent_column_(drop_dependent_column),
is_inited_(false)
{}
virtual ~DependentColumnFilter();
virtual bool has_filter_row() override { return true; }
virtual int64_t get_format_filter_string_length() const override;
virtual int get_format_filter_string(char *buf, int64_t buf_len, int64_t &pos) const override;
TO_STRING_KV("filter", "DependentColumnFilter",
K_(family),
K_(qualifier),
K_(drop_dependent_column),
"cmp_op", compare_operator_to_string(cmp_op_),
K_(comparator));
virtual int filter_cell(const ObHTableCell &cell, ReturnCode &ret_code) override;
virtual int filter_row_cells(ObTableQueryDListResult &cells) override;
virtual void reset() override;
int init();
private:
ObString family_;
ObString qualifier_;
bool drop_dependent_column_; // default as false
bool is_inited_;
common::hash::ObHashSet<int64_t> stamp_set_;
// disallow copy
DISALLOW_COPY_AND_ASSIGN(DependentColumnFilter);
};
/// represents an ordered List of Filters which will be evaluated with a specified boolean operator
/// FilterList.Operator.MUST_PASS_ALL (AND) or FilterList.Operator.MUST_PASS_ONE (OR).
class FilterListBase: public FilterBase
@ -333,6 +378,7 @@ public:
virtual int64_t get_format_filter_string_length() const override;
virtual int get_format_filter_string(char *buf, int64_t buf_len, int64_t &pos) const override;
virtual const char *filter_name() const { return "FilterListBase"; }
virtual void set_reversed(bool reversed) override;
virtual void set_hbase_version(const ObString &version) override;
TO_STRING_KV("filter", "FilterList",
@ -357,7 +403,7 @@ public:
virtual ~FilterListAND();
virtual void reset() override;
virtual bool filter_all_remaining() override;
virtual bool filter_row_key(const ObHTableCell &first_row_cell) override;
virtual int filter_row_key(const ObHTableCell &first_row_cell, bool &filtered) override;
virtual int filter_cell(const ObHTableCell &cell, ReturnCode &ret_code) override;
virtual int transform_cell(ObIAllocator &allocator, ObHTableCellEntity &cell) override;
virtual bool filter_row() override;
@ -379,7 +425,7 @@ public:
virtual ~FilterListOR();
virtual void reset() override;
virtual bool filter_all_remaining() override;
virtual bool filter_row_key(const ObHTableCell &first_row_cell) override;
virtual int filter_row_key(const ObHTableCell &first_row_cell, bool &filtered) override;
virtual int filter_cell(const ObHTableCell &cell, ReturnCode &ret_code) override;
virtual int transform_cell(ObIAllocator &allocator, ObHTableCellEntity &cell) override;
virtual bool filter_row() override;
@ -401,7 +447,7 @@ public:
{}
virtual ~SkipFilter();
virtual void reset() override;
virtual bool filter_row_key(const ObHTableCell &first_row_cell) override;
virtual int filter_row_key(const ObHTableCell &first_row_cell, bool &filtered) override;
virtual bool filter_row() override;
virtual bool has_filter_row() override { return true; }
virtual int transform_cell(ObIAllocator &allocator, ObHTableCellEntity &cell) override;
@ -430,11 +476,12 @@ public:
virtual void reset() override;
virtual bool filter_all_remaining() override;
virtual bool filter_row_key(const ObHTableCell &first_row_cell) override;
virtual int filter_row_key(const ObHTableCell &first_row_cell, bool &filtered) override;
virtual int filter_cell(const ObHTableCell &cell, ReturnCode &ret_code) override;
virtual int transform_cell(ObIAllocator &allocator, ObHTableCellEntity &cell) override;
virtual bool filter_row() override;
virtual bool has_filter_row() override { return true; }
virtual void set_reversed(bool reversed) override { if (nullptr != filter_) { filter_->set_reversed(reversed); } }
virtual void set_hbase_version(const ObString &version) override { filter_->set_hbase_version(version); }
virtual int64_t get_format_filter_string_length() const override;
virtual int get_format_filter_string(char *buf, int64_t buf_len, int64_t &pos) const override;
@ -480,8 +527,9 @@ public:
K_(latest_version_only));
private:
bool filter_column_value(const ObHTableCell &cell);
bool match_column(const ObHTableCell &cell);
private:
protected:
bool match_family_column(const ObHTableCell &cell);
protected:
ObString family_;
ObString qualifier_;
CompareOperator cmp_op_;
@ -495,6 +543,31 @@ private:
DISALLOW_COPY_AND_ASSIGN(SingleColumnValueFilter);
};
// Simple filter that filters out matched cells filtered by SingleColumnValueFilter
class SingleColumnValueExcludeFilter: public SingleColumnValueFilter
{
public:
SingleColumnValueExcludeFilter(const ObString &family, const ObString &qualifier, CompareOperator cmp_op, Comparable *comparator)
:SingleColumnValueFilter(family, qualifier, cmp_op, comparator)
{}
virtual ~SingleColumnValueExcludeFilter();
virtual int filter_row_cells(ObTableQueryDListResult &cells) override;
virtual bool has_filter_row() override { return true; }
virtual int64_t get_format_filter_string_length() const override;
virtual int get_format_filter_string(char *buf, int64_t buf_len, int64_t &pos) const override;
TO_STRING_KV("filter", "SingleColumnValueExcludeFilter",
K_(family),
K_(qualifier),
"cmp_op", compare_operator_to_string(cmp_op_),
K_(comparator),
K_(filter_if_missing),
K_(latest_version_only));
private:
// disallow copy
DISALLOW_COPY_AND_ASSIGN(SingleColumnValueExcludeFilter);
};
/// Simple filter that limits results to a specific page size
class PageFilter: public FilterBase
{
@ -527,7 +600,7 @@ public:
filter_out_row_(false)
{}
~RandomRowFilter() {}
virtual bool filter_row_key(const ObHTableCell &first_row_cell) override;
virtual int filter_row_key(const ObHTableCell &first_row_cell, bool &filtered) override;
virtual int filter_cell(const ObHTableCell &cell, ReturnCode &ret_code) override;
void reset() override;
virtual bool has_filter_row() override { return true; }
@ -631,6 +704,141 @@ private:
DISALLOW_COPY_AND_ASSIGN(KeyOnlyFilter);
};
template<typename L, typename R>
struct ObPair
{
L left_; R right_;
public:
ObPair() : left_(), right_() {}
ObPair(const L &l, const R &r): left_(l), right_(r) {}
inline bool operator==(const ObPair &b) const
{
return b.left_ == left_ && b.right_ == right_;
}
TO_STRING_KV(K_(left), K_(right));
};
typedef ObPair<ObString, ObPair<ObString, ObString>> NextRowType; // <rowkey candidate, <fuzzy key, fuzzy meta>>
struct NextRowsComparator
{
bool reversed_;
NextRowsComparator(bool reversed) : reversed_(reversed) {}
bool operator() (const NextRowType *le, const NextRowType *re)
{
bool b_ret = false;
if (nullptr != le && nullptr != re) {
if (reversed_) {
b_ret = (le->left_ <= re->left_);
} else {
b_ret = (le->left_ >= re->left_);
} // to build a min-root heap
}
return b_ret;
}
int get_error_code() { return OB_SUCCESS; }
};
class Order
{
public:
virtual bool lt(char lhs, char rhs) = 0;
virtual bool gt(char lhs, char rhs) = 0;
virtual void inc(char &val) = 0;
virtual bool isMax(char val) = 0;
virtual int8_t min() const = 0;
};
class ASC : public Order
{
virtual bool lt(char lhs, char rhs) override { return lhs < rhs; }
virtual bool gt(char lhs, char rhs) override { return lhs > rhs; }
virtual void inc(char &val) override { val += 1; }
virtual bool isMax(char val) override { return val == -1; }
virtual int8_t min() const override { return 0; }
};
class DESC : public Order
{
virtual bool lt(char lhs, char rhs) override { return lhs > rhs; }
virtual bool gt(char lhs, char rhs) override { return lhs < rhs; }
virtual void inc(char &val) override { val -= 1; }
virtual bool isMax(char val) override { return val == 0; }
virtual int8_t min() const override { return -1; }
};
class RowTracker
{
public:
RowTracker(ObIAllocator &allocator, common::ObIArray<ObPair<ObString, ObString>*> &fuzzy_key_data, bool is_reversed)
: allocator_(allocator),
is_reversed_(is_reversed),
cmp_(is_reversed),
next_rows_(cmp_, &allocator),
fuzzy_key_data_(fuzzy_key_data) ,
order_(nullptr),
inited_(false)
{}
~RowTracker() {}
int init(const ObString &cur_row);
int update_tracker(const ObString &cur_row, bool &done); // initialize or update next_rows usuing current_cell
int next_row(ObString &row); // get top row from next_rows_
TO_STRING_KV("FuzzyRowFilter", "RowTracker", K_(is_reversed), K_(next_rows), K_(fuzzy_key_data));
private:
int update_with(const ObString &cur_row, const ObPair<ObString, ObString> &fuzzy_data);
// compare cur_cell rowkey and the given rowkey considering scan order
bool less_than(const ObString &cur_row, const ObString &next_row);
private:
ObIAllocator &allocator_;
bool is_reversed_;
NextRowsComparator cmp_;
ObBinaryHeap<NextRowType*, NextRowsComparator> next_rows_;
ObIArray<ObPair<ObString, ObString>*> &fuzzy_key_data_;
Order *order_;
bool inited_;
DISALLOW_COPY_AND_ASSIGN(RowTracker);
};
class FuzzyRowFilter: public FilterBase
{
public:
FuzzyRowFilter(ObIAllocator &allocator, ObIArray<ObPair<ObString, ObString>*> &fuzzy_key)
: allocator_(allocator),
row_tracker_(nullptr),
done_(false),
filter_row_(false),
included_(false),
is_inited_(false),
fuzzy_key_(fuzzy_key)
{}
virtual ~FuzzyRowFilter() {
if (nullptr != row_tracker_) {
row_tracker_->~RowTracker();
}
}
int init(const ObHTableCell &cell);
virtual bool filter_all_remaining() override { return done_; }
virtual int filter_cell(const ObHTableCell &cell, ReturnCode &ret_code) override;
virtual int get_next_cell_hint(common::ObIAllocator &allocator, const ObHTableCell &cell, ObHTableCell *&new_cell) override;
virtual bool is_hinting_filter() override { return true; }
virtual bool filter_row() override { return filter_row_; }
void reset() override;
virtual int64_t get_format_filter_string_length() const override;
virtual int get_format_filter_string(char *buf, int64_t buf_len, int64_t &pos) const override;
TO_STRING_KV("filter", "FuzzyRowFilter", K_(row_tracker), K_(fuzzy_key), K_(is_inited));
private:
static bool fuzzy_match(const ObString &rowkey, const ObPair<ObString, ObString> &fuzzy_key);
private:
ObIAllocator &allocator_;
RowTracker *row_tracker_;
bool done_;
bool filter_row_;
bool included_;
bool is_inited_;
ObIArray<ObPair<ObString, ObString>*> &fuzzy_key_;
DISALLOW_COPY_AND_ASSIGN(FuzzyRowFilter);
};
class ObTimestampNode
{
public:
@ -687,6 +895,146 @@ private:
DISALLOW_COPY_AND_ASSIGN(TimestampsFilter);
};
/// Simple filter that filter cells by column family and qualifier.
class ColumnValueFilter: public FilterBase
{
public:
ColumnValueFilter(ObString &cf, ObString &qualifier, CompareOperator cmp_op, Comparable *comparator)
: cf_(cf),
qualifier_(qualifier),
cmp_op_(cmp_op),
comparator_(comparator),
column_found_(false)
{}
~ColumnValueFilter() {}
virtual int filter_cell(const ObHTableCell &cell, ReturnCode &ret_code) override;
void reset() override;
virtual int64_t get_format_filter_string_length() const override;
virtual int get_format_filter_string(char *buf, int64_t buf_len, int64_t &pos) const override;
TO_STRING_KV("filter", "ColumnValueFilter", K_(cf), K_(qualifier), K_(cmp_op), KP_(comparator));
private:
ObString cf_;
ObString qualifier_;
CompareOperator cmp_op_;
Comparable *comparator_;
bool column_found_;
DISALLOW_COPY_AND_ASSIGN(ColumnValueFilter);
};
/// Simple filter that returns rows in range.
class MultiRowRangeFilter: public FilterBase
{
public:
MultiRowRangeFilter(common::ObSEArray<KeyRange*, 8>* origin_range)
: origin_range_(origin_range),
inited_(false),
nodes_(),
ranges_(),
range_(nullptr),
return_code_(),
done_(false)
{}
virtual ~MultiRowRangeFilter() {}
virtual int init();
virtual int filter_cell(const ObHTableCell &cell, ReturnCode &ret_code) override { UNUSED(cell); ret_code = return_code_; return OB_SUCCESS; }
virtual int get_next_cell_hint(common::ObIAllocator &allocator, const ObHTableCell &cell, ObHTableCell *&new_cell) override;
virtual bool is_hinting_filter() override { return true; }
virtual int filter_row_key(const ObHTableCell &first_row_cell, bool &filtered) override;
virtual bool filter_all_remaining() override { return done_; }
virtual int64_t get_format_filter_string_length() const override;
virtual int get_format_filter_string(char *buf, int64_t buf_len, int64_t &pos) const override;
TO_STRING_KV("filter", "MultiRowRangeFilter");
protected:
common::ObSEArray<KeyRange*, 8>* origin_range_;
bool inited_;
private:
common::ObSEArray<ObKeyRangeNode, 8> nodes_;
ObKeyRangeTree ranges_;
KeyRange* range_;
ReturnCode return_code_;
bool done_;
DISALLOW_COPY_AND_ASSIGN(MultiRowRangeFilter);
};
/// Simple filter that returns rows before stop key.
class InclusiveStopFilter: public MultiRowRangeFilter
{
public:
InclusiveStopFilter(common::ObSEArray<KeyRange*, 8>* origin_range, ObString stop_key)
: MultiRowRangeFilter(origin_range),
stop_key_(stop_key)
{}
~InclusiveStopFilter() {}
virtual int64_t get_format_filter_string_length() const override;
virtual int get_format_filter_string(char *buf, int64_t buf_len, int64_t &pos) const override;
TO_STRING_KV("filter", "InclusiveStopFilter");
private:
ObString stop_key_;
DISALLOW_COPY_AND_ASSIGN(InclusiveStopFilter);
};
/// Simple filter that returns columns in range.
class ColumnRangeFilter: public FilterBase
{
public:
ColumnRangeFilter(common::ObSEArray<KeyRange*, 8>* origin_range)
: origin_range_(origin_range),
inited_(false),
nodes_(),
ranges_(),
range_(nullptr)
{}
virtual ~ColumnRangeFilter() {}
virtual int init();
virtual int filter_cell(const ObHTableCell &cell, ReturnCode &ret_code) override;
void reset() override;
virtual bool is_hinting_filter() override { return true; }
virtual int get_next_cell_hint(common::ObIAllocator &allocator, const ObHTableCell &cell, ObHTableCell *&new_cell) override;
virtual int64_t get_format_filter_string_length() const override;
virtual int get_format_filter_string(char *buf, int64_t buf_len, int64_t &pos) const override;
TO_STRING_KV("filter", "ColumnRangeFilter");
protected:
common::ObSEArray<KeyRange*, 8>* origin_range_;
bool inited_;
private:
common::ObSEArray<ObKeyRangeNode, 8> nodes_;
ObKeyRangeTree ranges_;
KeyRange* range_;
// disallow copy
DISALLOW_COPY_AND_ASSIGN(ColumnRangeFilter);
};
/// Simple filter based on the multiple lead portion of Column names
class MultipleColumnPrefixFilter: public ColumnRangeFilter
{
public:
MultipleColumnPrefixFilter(common::ObSEArray<KeyRange*, 8>* origin_range)
: ColumnRangeFilter(origin_range)
{}
~MultipleColumnPrefixFilter() {}
virtual int64_t get_format_filter_string_length() const override;
virtual int get_format_filter_string(char *buf, int64_t buf_len, int64_t &pos) const override;
TO_STRING_KV("filter", "MultipleColumnPrefixFilter");
private:
DISALLOW_COPY_AND_ASSIGN(MultipleColumnPrefixFilter);
};
/// Simple filter that filter cells with column family.
class FamilyFilter : public CompareFilter
{
public:
FamilyFilter(CompareOperator cmp_op, Comparable *comparator)
: CompareFilter(cmp_op, comparator)
{}
virtual ~FamilyFilter() {}
virtual int filter_cell(const ObHTableCell &cell, ReturnCode &ret_code) override;
virtual int64_t get_format_filter_string_length() const override;
virtual int get_format_filter_string(char *buf, int64_t buf_len, int64_t &pos) const override;
TO_STRING_KV("filter", "FamilyFilter");
private:
DISALLOW_COPY_AND_ASSIGN(FamilyFilter);
};
/// Simple filter that returns first N columns on row only.
class ColumnCountGetFilter: public FilterBase
{
@ -697,7 +1045,7 @@ public:
{}
virtual ~ColumnCountGetFilter() {}
virtual void reset() override;
virtual bool filter_row_key(const ObHTableCell &first_row_cell) override;
virtual int filter_row_key(const ObHTableCell &first_row_cell, bool &filtered) override;
virtual bool filter_all_remaining() override;
virtual int filter_cell(const ObHTableCell &cell, ReturnCode &ret_code) override;
virtual int64_t get_format_filter_string_length() const override;
@ -711,6 +1059,26 @@ private:
DISALLOW_COPY_AND_ASSIGN(ColumnCountGetFilter);
};
/// Simple filter that returns cells before matching qualifiers.
class FirstKeyValueMatchingQualifiersFilter: public FilterBase
{
public:
FirstKeyValueMatchingQualifiersFilter(common::ObSEArray<ObString, 8> *qualifiers)
: found_(false),
qualifiers_(qualifiers)
{}
virtual ~FirstKeyValueMatchingQualifiersFilter() {}
virtual int filter_cell(const ObHTableCell &cell, ReturnCode &ret_code) override;
void reset() override;
virtual int64_t get_format_filter_string_length() const override;
virtual int get_format_filter_string(char *buf, int64_t buf_len, int64_t &pos) const override;
TO_STRING_KV("filter", "FirstKeyValueMatchingQualifiersFilter");
private:
bool found_;
common::ObSEArray<ObString, 8> *qualifiers_;
DISALLOW_COPY_AND_ASSIGN(FirstKeyValueMatchingQualifiersFilter);
};
/// CheckAndMutateFilter is used to implement the check logic of CheckAndMutate
/// @see https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/client/Table.html#checkAndMutate-byte:A-byte:A-
class CheckAndMutateFilter: public FilterBase

View File

@ -93,6 +93,15 @@ ObString ObHTableCellEntity::get_value() const
{
return ob_row_->get_cell(ObHTableConstants::COL_IDX_V).get_varchar();
}
ObString ObHTableCellEntity::get_family() const
{
return family_;
}
void ObHTableCellEntity::set_family(const ObString family) {
family_ = family;
}
////////////////////////////////////////////////////////////////
ObString ObHTableCellEntity2::get_rowkey() const
{
@ -227,8 +236,9 @@ ObString ObHTableCellEntity3::get_value() const
}
////////////////////////////////////////////////////////////////
int ObHTableUtils::create_last_cell_on_row_col(
common::ObIAllocator &allocator, const ObHTableCell &cell, ObHTableCell *&new_cell)
int ObHTableUtils::create_last_cell_on_row_col(common::ObIAllocator &allocator,
const ObHTableCell &cell,
ObHTableCell *&new_cell)
{
int ret = OB_SUCCESS;
ObString rowkey_clone;
@ -257,8 +267,10 @@ int ObHTableUtils::create_last_cell_on_row_col(
return ret;
}
int ObHTableUtils::create_first_cell_on_row_col(common::ObIAllocator &allocator, const ObHTableCell &cell,
const common::ObString &qualifier, ObHTableCell *&new_cell)
int ObHTableUtils::create_first_cell_on_row_col(common::ObIAllocator &allocator,
const ObHTableCell &cell,
const common::ObString &qualifier,
ObHTableCell *&new_cell)
{
int ret = OB_SUCCESS;
ObString rowkey_clone;
@ -287,8 +299,9 @@ int ObHTableUtils::create_first_cell_on_row_col(common::ObIAllocator &allocator,
return ret;
}
int ObHTableUtils::create_last_cell_on_row(
common::ObIAllocator &allocator, const ObHTableCell &cell, ObHTableCell *&new_cell)
int ObHTableUtils::create_last_cell_on_row(common::ObIAllocator &allocator,
const ObHTableCell &cell,
ObHTableCell *&new_cell)
{
int ret = OB_SUCCESS;
ObString rowkey_clone;
@ -315,9 +328,9 @@ int ObHTableUtils::create_last_cell_on_row(
}
int ObHTableUtils::create_first_cell_on_row_col_ts(common::ObIAllocator &allocator,
const ObHTableCell &cell,
const int64_t timestamp,
ObHTableCell *&new_cell)
const ObHTableCell &cell,
const int64_t timestamp,
ObHTableCell *&new_cell)
{
int ret = OB_SUCCESS;
ObString rowkey_clone;
@ -346,14 +359,22 @@ int ObHTableUtils::create_first_cell_on_row_col_ts(common::ObIAllocator &allocat
return ret;
}
int ObHTableUtils::create_first_cell_on_row(
common::ObIAllocator &allocator, const ObHTableCell &cell, ObHTableCell *&new_cell)
int ObHTableUtils::create_first_cell_on_row(common::ObIAllocator &allocator,
const ObHTableCell &cell,
ObHTableCell *&new_cell)
{
return create_first_cell_on_row(allocator, cell.get_rowkey(), new_cell);
}
int ObHTableUtils::create_first_cell_on_row(common::ObIAllocator &allocator,
const ObString &row_key,
ObHTableCell *&new_cell)
{
int ret = OB_SUCCESS;
ObString rowkey_clone;
ObObj *first_cell = nullptr;
ObNewRow *ob_row = nullptr;
if (OB_FAIL(ob_write_string(allocator, cell.get_rowkey(), rowkey_clone))) {
if (OB_FAIL(ob_write_string(allocator, row_key, rowkey_clone))) {
LOG_WARN("failed to clone rowkey", K(ret));
} else if (OB_ISNULL(first_cell = static_cast<ObObj *>(allocator.alloc(sizeof(ObObj) * 3)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
@ -609,3 +630,95 @@ int ObHTableUtils::generate_hbase_bytes(ObIAllocator& allocator, int32_t len, ch
}
return ret;
}
int ObHTableUtils::get_prefix_key_range(common::ObIAllocator &allocator, ObString prefix, KeyRange *range)
{
int ret = OB_SUCCESS;
bool is_max = true;
ObString prefix_end;
if (OB_FAIL(ob_write_string(allocator, prefix, prefix_end))) {
LOG_WARN("failed to clone prefix", K(ret), K(prefix));
} else if (!prefix_end.empty()) {
char* ptr = prefix_end.ptr();
bool loop = true;
for (int i = prefix_end.length() - 1; i >= 0 && loop; i--) {
if (ptr[i] != -1) {
is_max = false;
ptr[i] += 1;
loop = false;
prefix_end.set_length(i + 1);
}
}
if (is_max) {
prefix_end.reset();
}
range->set_max(prefix_end);
range->set_max_inclusive(false);
}
return ret;
}
int ObHTableUtils::merge_key_range(ObKeyRangeTree& tree)
{
int ret = OB_SUCCESS;
ObKeyRangeNode* curr_range = tree.get_first();
ObKeyRangeNode* merge_range = nullptr;
if (nullptr == curr_range) {
} else if (!curr_range->get_value()->valid()) {
ret = common::OB_INVALID_ARGUMENT;
LOG_WARN("invalid key range", KR(ret), KPC(curr_range));
} else if (OB_FAIL(tree.get_next(curr_range, merge_range))) {
LOG_WARN("fail to get next node from tree", KR(ret), KPC(merge_range));
}
bool loop = true;
while (OB_SUCC(ret) && merge_range != nullptr && loop) {
if (!merge_range->get_value()->valid()) {
ret = common::OB_INVALID_ARGUMENT;
LOG_WARN("invalid row range", KR(ret), KPC(merge_range));
} else if (curr_range->get_value()->max().empty()) {
loop = false;
} else {
KeyRange* l_val = curr_range->get_value();
KeyRange* r_val = merge_range->get_value();
// With overlap in the ranges
if (l_val->max().compare(r_val->min()) > 0
|| (l_val->max().compare(r_val->min()) == 0 &&
(l_val->max_inclusive() || r_val->min_inclusive()) )) {
if (r_val->max().empty()) {
l_val->set_max(ObString());
tree.remove(merge_range);
loop = false;
} else {
int cmp = l_val->max().compare(r_val->max());
if (cmp < 0) {
l_val->set_max(r_val->max());
l_val->set_max_inclusive(r_val->max_inclusive());
} else if (cmp == 0) {
l_val->set_max_inclusive(l_val->max_inclusive() | r_val->max_inclusive());
}
tree.remove(merge_range);
}
} else {
curr_range = merge_range;
}
if (loop && OB_FAIL(tree.get_next(curr_range, merge_range))) {
LOG_WARN("fail to get next node from tree", KR(ret), KPC(merge_range));
}
}
}
loop = (nullptr != curr_range);
while (OB_SUCC(ret) && loop) {
if (OB_FAIL(tree.get_next(curr_range, merge_range))) {
LOG_WARN("fail to get next node from tree", KR(ret), KPC(merge_range));
} else if (merge_range == NULL) {
loop = false;
} else {
tree.remove(merge_range);
}
}
return ret;
}

View File

@ -35,6 +35,7 @@ public:
virtual common::ObString get_qualifier() const = 0;
virtual int64_t get_timestamp() const = 0;
virtual common::ObString get_value() const = 0;
virtual common::ObString get_family() const = 0;
enum class Type
{
FIRST_ON_ROW = 0 /*virtual cell which present the first cell on row*/,
@ -76,12 +77,15 @@ public:
virtual common::ObString get_qualifier() const override;
virtual int64_t get_timestamp() const override;
virtual common::ObString get_value() const override;
virtual common::ObString get_family() const override;
void set_value(ObString value) const;
void set_family(const ObString family);
virtual Type get_type() const { return type_; }
private:
common::ObNewRow *ob_row_;
Type type_;
ObString family_;
DISALLOW_COPY_AND_ASSIGN(ObHTableCellEntity);
};
@ -109,6 +113,7 @@ public:
virtual common::ObString get_rowkey() const override { return common::ObString(); }
virtual common::ObString get_qualifier() const override { return common::ObString(); }
virtual common::ObString get_value() const override { return common::ObString(); }
virtual common::ObString get_family() const override { return common::ObString(); }
private:
DISALLOW_COPY_AND_ASSIGN(ObHTableEmptyCell);
};
@ -189,6 +194,7 @@ public:
virtual common::ObString get_qualifier() const override;
virtual int64_t get_timestamp() const override;
virtual common::ObString get_value() const override;
virtual common::ObString get_family() const override { return common::ObString(); }
virtual Type get_type() const { return Type::NORMAL; }
int get_value(ObString &str) const;
private:
@ -209,6 +215,7 @@ public:
virtual common::ObString get_qualifier() const override;
virtual int64_t get_timestamp() const override;
virtual common::ObString get_value() const override;
virtual common::ObString get_family() const override { return common::ObString(); }
bool last_get_is_null() const { return last_get_is_null_; }
virtual Type get_type() const { return Type::NORMAL; }
private:
@ -316,19 +323,110 @@ private:
DISALLOW_COPY_AND_ASSIGN(ObNegativeTimeRange);
};
class KeyRange
{
public:
KeyRange()
: min_(),
min_inclusive_(false),
max_(),
max_inclusive_(false)
{}
KeyRange(ObString min, bool min_inclusive, ObString max, bool max_inclusive)
: min_(min),
min_inclusive_(min_inclusive),
max_(max),
max_inclusive_(max_inclusive)
{}
~KeyRange() {}
ObString min() const { return min_; }
bool min_inclusive() const { return min_inclusive_; }
ObString max() const { return max_; }
bool max_inclusive() const { return max_inclusive_; }
void set_min(ObString value) { min_ = value; }
void set_min_inclusive(bool value) { min_inclusive_ = value; }
void set_max(ObString value) { max_ = value; }
void set_max_inclusive(bool value) { max_inclusive_ = value; }
bool valid()
{
return min_.empty()
| max_.empty()
| (min_.compare(max_) < 0)
| (min_.compare(max_) == 0 && max_inclusive_);
}
bool contain(ObString row_key)
{
int l_cmp = min_.compare(row_key);
int r_cmp = max_.empty()? 1 : max_.compare(row_key);
bool l_contain = l_cmp < 0 || (l_cmp == 0 && min_inclusive_);
bool r_contain = r_cmp > 0 || (r_cmp == 0 && max_inclusive_);
return l_contain & r_contain;
}
int compare(const KeyRange* other) const
{
int ret = 0;
if (nullptr == other) {
} else if (min() == other->min()) {
ret = static_cast<int>(other->min_inclusive() - (int)min_inclusive());
} else {
ret = min().compare(other->min());
}
return ret;
}
TO_STRING_KV(K_(min), K_(min_inclusive), K_(max), K_(max_inclusive));
private:
ObString min_;
bool min_inclusive_;
ObString max_;
bool max_inclusive_;
};
class ObKeyRangeNode {
public:
ObKeyRangeNode(): value_(nullptr) {};
ObKeyRangeNode(KeyRange* value): value_(value) {};
~ObKeyRangeNode() {};
RBNODE(ObKeyRangeNode, rblink);
KeyRange* get_value() const { return value_; }
int compare(const ObKeyRangeNode *node) const
{
int ret = 0;
if (nullptr != value_ && nullptr != node) {
KeyRange* other_range = node->get_value();
ret = value_->compare(other_range);
}
return ret;
}
TO_STRING_KV(K_(value));
private:
KeyRange* value_;
};
typedef container::ObRbTree<ObKeyRangeNode, container::ObDummyCompHelper<ObKeyRangeNode>> ObKeyRangeTree;
class ObHTableUtils
{
public:
/// Create a Cell that is larger than all other possible Cells for the given Cell's rk:cf:q
static int create_last_cell_on_row_col(common::ObIAllocator &allocator, const ObHTableCell &cell, ObHTableCell *&new_cell);
/// Create a Cell that is smaller than all other possible Cells for the given Cell's rk:cf and passed qualifier.
static int create_first_cell_on_row_col(common::ObIAllocator &allocator, const ObHTableCell &cell, const common::ObString &qualifier, ObHTableCell *&new_cell);
static int create_first_cell_on_row_col(common::ObIAllocator &allocator,
const ObHTableCell &cell,
const common::ObString &qualifier,
ObHTableCell *&new_cell);
/// Create a Cell that is smaller than all other possible Cells for the given Cell's rk:ts and passed timestamp.
static int create_first_cell_on_row_col_ts(common::ObIAllocator &allocator, const ObHTableCell &cell, const int64_t timestamp, ObHTableCell *&new_cell);
static int create_first_cell_on_row_col_ts(common::ObIAllocator &allocator,
const ObHTableCell &cell,
const int64_t timestamp,
ObHTableCell *&new_cell);
/// Create a Cell that is larger than all other possible Cells for the given Cell's row.
static int create_last_cell_on_row(common::ObIAllocator &allocator, const ObHTableCell &cell, ObHTableCell *&new_cell);
/// Create a Cell that is smaller than all other possible Cells for the given Cell's row.
static int create_first_cell_on_row(common::ObIAllocator &allocator, const ObHTableCell &cell, ObHTableCell *&new_cell);
static int create_first_cell_on_row(common::ObIAllocator &allocator, const ObString &row_key, ObHTableCell *&new_cell);
static int compare_qualifier(const common::ObString &cq1, const common::ObString &cq2);
static int compare_rowkey(const common::ObString &rk1, const common::ObString &rk2);
@ -356,6 +454,8 @@ public:
static int get_kv_hbase_client_scanner_timeout_period(uint64_t tenant_id);
static int get_hbase_scanner_timeout(uint64_t tenant_id);
static int generate_hbase_bytes(ObIAllocator& allocator, int32_t len, char*& val);
static int get_prefix_key_range(common::ObIAllocator &allocator, ObString prefix, KeyRange *range);
static int merge_key_range(ObKeyRangeTree& tree);
private:
ObHTableUtils() = delete;
~ObHTableUtils() = delete;

View File

@ -1767,10 +1767,10 @@ int ObTableCtx::init_das_context(ObDASCtx &das_ctx)
}
if (OB_SUCC(ret)) {
if (OB_ISNULL(local_table_loc)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("local_table_loc is NULL", K(ret));
} else if (OB_FAIL(exec_ctx_.get_das_ctx().extended_tablet_loc(*local_table_loc,
if (OB_ISNULL(local_table_loc)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("local_table_loc is NULL", K(ret));
} else if (OB_FAIL(exec_ctx_.get_das_ctx().extended_tablet_loc(*local_table_loc,
index_tablet_id_,
tablet_loc))) {
LOG_WARN("fail to extend tablet loc", K(ret), K(index_tablet_id_));
@ -1809,10 +1809,10 @@ int ObTableCtx::init_related_tablet_map(ObDASCtx &das_ctx)
ret = OB_SCHEMA_EAGAIN;
LOG_WARN("fail to get table schema", KR(ret), K(related_table_id));
} else if (OB_FAIL(relative_table_schema->get_part_id_and_tablet_id_by_idx(part_idx,
subpart_idx,
related_part_id,
related_first_level_part_id,
related_tablet_id))) {
subpart_idx,
related_part_id,
related_first_level_part_id,
related_tablet_id))) {
LOG_WARN("get part by idx failed", K(ret), K(part_idx), K(subpart_idx), K(related_table_id));
} else if (OB_FAIL(related_tablet_map.add_related_tablet_id(tablet_id_,
related_table_id,

View File

@ -370,6 +370,7 @@ public:
OB_INLINE common::ObTableID get_ref_table_id() const { return ref_table_id_; }
OB_INLINE common::ObTableID get_index_table_id() const { return index_table_id_; }
OB_INLINE common::ObTabletID get_tablet_id() const { return tablet_id_; }
OB_INLINE common::ObTabletID get_index_tablet_id() const { return index_tablet_id_; }
OB_INLINE common::ObString &get_table_name() { return table_name_; }
OB_INLINE const share::ObLSID& get_ls_id() const { return ls_id_; }
OB_INLINE int64_t get_timeout_ts() const { return timeout_ts_; }
@ -522,6 +523,7 @@ public:
OB_INLINE void set_ls_id(share::ObLSID &ls_id) { ls_id_ = ls_id; }
OB_INLINE void set_audit_ctx(ObTableAuditCtx *ctx) { audit_ctx_ = ctx; }
OB_INLINE void set_tablet_id(common::ObTabletID &tablet_id) { tablet_id_ = tablet_id; }
OB_INLINE void set_index_tablet_id(common::ObTabletID &tablet_id) { index_tablet_id_ = tablet_id; }
// for scan
OB_INLINE void set_scan(const bool &is_scan) { is_scan_ = is_scan; }
OB_INLINE void set_scan_order(const common::ObQueryFlag::ScanOrder scan_order) { scan_order_ = scan_order; }

View File

@ -200,8 +200,7 @@ public:
inline void set_limit(int32_t limit) { limit_ = limit; }
inline void set_offset(int32_t offset) { offset_ = offset; }
private:
virtual int get_next_result(ObTableQueryIterableResult*& one_result) override { return OB_NOT_SUPPORTED; }
virtual int get_next_result(ObTableQueryIterableResult *& one_result) override { return OB_NOT_SUPPORTED; }
private:
table::ObTableQueryResult *one_result_;
common::ObNewRow *last_row_;

File diff suppressed because it is too large Load Diff

View File

@ -28,9 +28,14 @@ namespace observer
struct ObTableHbaseMutationInfo : public ObTableInfoBase
{
ObTableHbaseMutationInfo(common::ObIAllocator &allocator):
ctx_(allocator) {}
ObTableHbaseMutationInfo(common::ObIAllocator &allocator)
: ctx_(allocator)
{}
int init(const schema::ObSimpleTableSchemaV2 *simple_table_schema, share::schema::ObSchemaGetterGuard &schema_guard);
TO_STRING_KV(K(ctx_));
private:
table::ObTableCtx ctx_;
};
@ -39,6 +44,8 @@ struct ObTableHbaseMutationInfo : public ObTableInfoBase
class ObTableLSExecuteP: public ObTableRpcProcessor<obrpc::ObTableRpcProxy::ObRpc<obrpc::OB_TABLE_API_LS_EXECUTE> >
{
typedef ObTableRpcProcessor<obrpc::ObTableRpcProxy::ObRpc<obrpc::OB_TABLE_API_LS_EXECUTE> > ParentType;
class LSExecuteIter;
public:
explicit ObTableLSExecuteP(const ObGlobalContext &gctx);
virtual ~ObTableLSExecuteP();
@ -46,7 +53,6 @@ public:
protected:
virtual int check_arg() override;
int init_multi_schema_info(const ObString& arg_tablegroup_name);
virtual int try_process() override;
virtual void reset_ctx() override;
@ -54,6 +60,7 @@ protected:
virtual int before_process();
virtual table::ObTableEntityType get_entity_type() override { return arg_.entity_type_; }
virtual bool is_kv_processor() override { return true; }
private:
int get_ls_id(ObLSID &ls_id, const share::schema::ObSimpleTableSchemaV2 *simple_table_schema);
int check_arg_for_query_and_mutate(const table::ObTableSingleOp &single_op);
@ -62,73 +69,137 @@ private:
table::ObKvSchemaCacheGuard *schema_cache_guard,
const ObSimpleTableSchemaV2 *simple_table_schema,
table::ObTableTabletOpResult &tablet_result);
int execute_tablet_query_and_mutate(const table::ObTableTabletOp &tablet_op, table::ObTableTabletOpResult&tablet_result);
int execute_tablet_query_and_mutate(const uint64_t table_id, const table::ObTableTabletOp &tablet_op, table::ObTableTabletOpResult&tablet_result);
int execute_tablet_query_and_mutate(const uint64_t table_id,
const table::ObTableTabletOp &tablet_op,
table::ObTableTabletOpResult&tablet_result);
int execute_single_query_and_mutate(const uint64_t table_id,
const common::ObTabletID tablet_id,
const table::ObTableSingleOp &single_op,
table::ObTableSingleOpResult &result);
// batch service
int execute_tablet_batch_ops(const table::ObTableTabletOp &tablet_op,
uint64_t table_id,
table::ObKvSchemaCacheGuard *schema_cache_guard,
const ObSimpleTableSchemaV2 *simple_table_schema,
table::ObTableTabletOpResult&tablet_result);
uint64_t table_id,
table::ObKvSchemaCacheGuard *schema_cache_guard,
const ObSimpleTableSchemaV2 *simple_table_schema,
table::ObTableTabletOpResult&tablet_result);
int try_process_tablegroup(table::ObTableLSOpResult& ls_result);
int try_process_tablegroup(table::ObTableLSOpResult &ls_result);
int modify_htable_quailfier_and_timestamp(const table::ObITableEntity *entity,
const ObString& qualifier,
int64_t timestamp,
table::ObTableOperationType::Type type);
int init_batch_ctx(table::ObTableBatchCtx &batch_ctx,
const table::ObTableTabletOp &tablet_op,
ObIArray<table::ObTableOperation> &table_operations,
uint64_t table_id,
table::ObKvSchemaCacheGuard *shcema_cache_guard,
const ObSimpleTableSchemaV2 *table_schema,
table::ObTableTabletOpResult &tablet_result);
int init_tb_ctx(table::ObTableCtx &tb_ctx,
const table::ObTableTabletOp &tablet_op,
int init_batch_ctx(const table::ObTableTabletOp &tablet_op,
ObIArray<table::ObTableOperation> &table_operations,
uint64_t table_id,
table::ObKvSchemaCacheGuard *shcema_cache_guard,
const ObSimpleTableSchemaV2 *table_schema,
table::ObTableTabletOpResult &tablet_result,
table::ObTableBatchCtx &batch_ctx);
int init_tb_ctx(const table::ObTableTabletOp &tablet_op,
const table::ObTableOperation &table_operation,
table::ObKvSchemaCacheGuard *shcema_cache_guard,
const ObSimpleTableSchemaV2 *table_schema);
const ObSimpleTableSchemaV2 *table_schema,
table::ObTableCtx &tb_ctx);
int add_dict_and_bm_to_result_entity(const table::ObTableTabletOp &tablet_op,
table::ObTableTabletOpResult &tablet_result);
int execute_ls_op(table::ObTableLSOpResult &ls_result);
int execute_ls_op_tablegroup(table::ObTableLSOpResult& ls_result);
int execute_ls_op_tablegroup(table::ObTableLSOpResult &ls_result);
// void record_aft_rtn_rows(const table::ObTableLSOpResult &ls_result);
private:
class LSExecuteIter {
int find_and_add_op(ObIArray<std::pair<uint64_t, table::ObTableTabletOp>>& tablet_ops,
uint64_t real_table_id,
const table::ObTableSingleOp& single_op,
bool &is_found);
int add_new_op(ObIArray<std::pair<uint64_t, table::ObTableTabletOp>>& tablet_ops,
uint64_t real_table_id, uint64_t real_tablet_id,
table::ObTableTabletOp& origin_tablet_op,
const table::ObTableSingleOp& single_op);
int partition_single_op(table::ObTableTabletOp& tablet_op,
common::ObIArray<std::pair<uint64_t, table::ObTableTabletOp>>& tablet_ops);
public:
LSExecuteIter(ObTableLSExecuteP &outer_exectute_process);
virtual ~LSExecuteIter();
int construct_delete_family_op(const table::ObTableSingleOp &single_op,
const ObTableHbaseMutationInfo &mutation_info,
table::ObTableTabletOp &tablet_op);
virtual int init();
virtual void reset() {};
virtual int get_next_batch_ctx(table::ObTableTabletOpResult &tablet_result,
table::ObTableBatchCtx *&batch_ctx) = 0;
virtual int set_tablet_ops(table::ObTableTabletOp &tablet_ops)
{
tablet_ops_(tablet_ops);
return OB_SUCCESS;
}
protected:
int init_batch_ctx(uint64_t table_id,
table::ObTableSingleOp &single_op,
table::ObKvSchemaCacheGuard *shcema_cache_guard,
const ObSimpleTableSchemaV2 *simple_table_schema,
table::ObTableTabletOpResult &tablet_result,
table::ObTableBatchCtx &batch_ctx);
int init_tb_ctx(table::ObTableSingleOp &single_op,
table::ObKvSchemaCacheGuard *shcema_cache_guard,
const ObSimpleTableSchemaV2 *table_schema,
table::ObTableCtx &tb_ctx);
protected:
ObTableLSExecuteP &outer_exectute_process_;
table::ObTableTabletOp tablet_ops_;
common::ObTabletID tablet_id_;
common::ObArenaAllocator allocator_;
ObArray<std::pair<std::pair<table::ObTableOperationType::Type, uint64_t>, table::ObTableBatchCtx *>> batch_ctxs_;
uint64_t ops_timestamp_;
DISALLOW_COPY_AND_ASSIGN(LSExecuteIter);
};
class ObTableLSExecuteIter : public LSExecuteIter {
public:
ObTableLSExecuteIter(ObTableLSExecuteP &outer_exectute_process);
virtual ~ObTableLSExecuteIter() {};
int get_next_batch_ctx(table::ObTableTabletOpResult &tablet_result,
table::ObTableBatchCtx *&batch_ctx) override;
DISALLOW_COPY_AND_ASSIGN(ObTableLSExecuteIter);
};
class HTableLSExecuteIter : public LSExecuteIter {
public:
explicit HTableLSExecuteIter(ObTableLSExecuteP &outer_exectute_process);
virtual ~HTableLSExecuteIter() {};
int init();
int set_tablet_ops(table::ObTableTabletOp &tablet_ops) override;
void reset() override;
table::ObTableTabletOp &get_same_ctx_ops() { return same_ctx_ops_; }
int get_next_batch_ctx(table::ObTableTabletOpResult &tablet_result,
table::ObTableBatchCtx *&batch_ctx) override;
private:
int find_real_table_id(const ObString &family_name, uint64_t &real_table_id);
int find_real_tablet_id(uint64_t arg_table_id, uint64_t real_table_id, ObTabletID &real_tablet_id);
int convert_batch_ctx(table::ObTableBatchCtx &batch_ctx);
int init_multi_schema_info(const ObString &arg_tablegroup_name);
int construct_delete_family_op(const table::ObTableSingleOp &single_op,
const ObTableHbaseMutationInfo &mutation_info);
int get_family_from_op(table::ObTableSingleOp &curr_single_op,
ObString &family);
int modify_htable_quailfier_and_timestamp(const table::ObTableSingleOp &curr_single_op,
table::ObTableOperationType::Type type,
int64_t now_ms);
private:
table::ObTableTabletOp same_ctx_ops_;
uint64_t curr_op_index_;
ObString table_group_name_;
ObArray<ObTableHbaseMutationInfo *> hbase_infos_;
ObArray<table::ObTableOperation> table_operations_;
DISALLOW_COPY_AND_ASSIGN(HTableLSExecuteIter);
};
private:
common::ObArenaAllocator allocator_;
ObArray<ObTableHbaseMutationInfo*> hbase_infos_;
table::ObTableEntityFactory<table::ObTableSingleOpEntity> *default_entity_factory_;
table::ObTableLSExecuteCreateCbFunctor cb_functor_;
table::ObTableLSExecuteEndTransCb *cb_;
};
} // end namespace observer
} // end namespace oceanbase
} // end namespace observer
} // end namespace oceanbase
#endif /* _OB_TABLE_BATCH_EXECUTE_PROCESSOR_H */

View File

@ -45,7 +45,15 @@ public:
virtual int get_next_result(ObTableQueryResult *&one_result) override;
virtual bool has_more_result() const override { return binary_heap_.count() == 0; }
virtual bool has_more_result() const override {
bool has_more_result = true;
if (binary_heap_.count() == 0) {
has_more_result = false;
} else if (query_->get_limit() > 0 && row_count_ > query_->get_limit()) {
has_more_result = false;
}
return has_more_result;
}
virtual void set_query(const ObTableQuery *query) override { query_ = query; };
@ -112,6 +120,7 @@ private:
common::ObSEArray<ObTableQueryResultIterator *, 8> inner_result_iters_;
bool is_inited_;
int64_t row_count_;
};
template <typename Row, typename Compare>
@ -122,7 +131,8 @@ ObMergeTableQueryResultIterator<Row, Compare>::ObMergeTableQueryResultIterator(c
allocator_(allocator),
one_result_(&one_result),
binary_heap_(compare_),
is_inited_(false)
is_inited_(false),
row_count_(0)
{
inner_result_iters_.set_attr(ObMemAttr(MTL_ID(), "MergeInnerIters"));
}
@ -207,12 +217,24 @@ int ObMergeTableQueryResultIterator<Row, Compare>::get_next_result(ObTableQueryR
ret = OB_ITER_END;
} else {
one_result = one_result_;
int limit = query_->get_batch() <= 0 ?
static_cast<int64_t>(ObTableQueryResult::get_max_packet_buffer_length() - 1024)
: query_->get_batch();
while (!binary_heap_.empty() && one_result_->get_row_count() < limit) {
ObRowCacheIterator* cache_iterator = binary_heap_.top();
if (OB_FAIL(binary_heap_.pop())) {
int batch = query_->get_batch() <= 0
? static_cast<int64_t>(ObTableQueryResult::get_max_packet_buffer_length() - 1024)
: query_->get_batch();
int32_t caching = query_->get_ob_params().ob_params_->get_caching() > 0
? query_->get_ob_params().ob_params_->get_caching()
: INT32_MAX;
ObString curr_key;
int curr_row_count = 0;
while (OB_SUCC(ret) && has_more_result() && one_result_->get_row_count() < batch && curr_row_count <= caching) {
ObRowCacheIterator *cache_iterator = binary_heap_.top();
ObString curr_iter_key = cache_iterator->row_.get_cell(ObHTableConstants::COL_IDX_K).get_string();
if (curr_key.compare(curr_iter_key) != 0) {
curr_row_count++;
row_count_++;
if (OB_FAIL(ob_write_string(allocator_, curr_iter_key, curr_key))) {
SERVER_LOG(WARN, "allocate memory for row_key failed", K(ret));
}
} else if (OB_FAIL(binary_heap_.pop())) {
SERVER_LOG(WARN, "fail to pop items", K(ret));
} else if (OB_FAIL(compare_.get_error_code())) {
SERVER_LOG(WARN, "fail to compare items", K(ret));
@ -226,14 +248,17 @@ int ObMergeTableQueryResultIterator<Row, Compare>::get_next_result(ObTableQueryR
if (OB_FAIL(cache_iterator->get_next_row(row))) {
if (ret != OB_ITER_END) {
SERVER_LOG(WARN, "fail to get next row from ObRowCacheIterator");
} else if (row.is_valid()) {
if (OB_FAIL(ob_write_row(cache_iterator->allocator_, row, cache_iterator->row_))) {
SERVER_LOG(WARN, "fail to copy row", K(ret));
} else if (OB_FAIL(binary_heap_.push(cache_iterator))) {
SERVER_LOG(WARN, "fail to push items", K(ret));
} else if (FALSE_IT(is_iter_valid = true)) {
} else if (OB_FAIL(compare_.get_error_code())) {
SERVER_LOG(WARN, "fail to compare items", K(ret));
} else {
ret = OB_SUCCESS;
if (row.is_valid()) {
if (OB_FAIL(ob_write_row(cache_iterator->allocator_, row, cache_iterator->row_))) {
SERVER_LOG(WARN, "fail to copy row", K(ret));
} else if (OB_FAIL(binary_heap_.push(cache_iterator))) {
SERVER_LOG(WARN, "fail to push items", K(ret));
} else if (FALSE_IT(is_iter_valid = true)) {
} else if (OB_FAIL(compare_.get_error_code())) {
SERVER_LOG(WARN, "fail to compare items", K(ret));
}
}
}
} else {

View File

@ -121,10 +121,8 @@ private:
const ObObj &ts,
const ObObj &value);
int get_old_row(table::ObTableApiSpec &scan_spec, ObNewRow *&row);
int execute_htable_delete();
int execute_htable_delete(const ObTableOperation &table_operation, ObTableCtx *&tb_ctx);
int execute_htable_put();
int execute_htable_put(const ObTableOperation &table_operation, ObTableCtx *&tb_ctx);
int execute_htable_delete(const table::ObTableOperation &table_operation, table::ObTableCtx *&tb_ctx);
int execute_htable_put(const table::ObTableOperation &table_operation, table::ObTableCtx *&tb_ctx);
int execute_htable_increment(table::ObTableApiSpec &scan_spec);
int execute_htable_insert(const table::ObITableEntity &new_entity);
int execute_htable_mutation(table::ObTableQueryResultIterator *result_iterator);

View File

@ -44,8 +44,6 @@ void ObTableQueryAsyncSession::set_result_iterator(ObTableQueryResultIterator *q
}
}
int ObTableQueryAsyncSession::init()
{
int ret = OB_SUCCESS;
@ -503,7 +501,9 @@ int ObTableQueryAsyncP::get_query_session(uint64_t sessid, ObTableQueryAsyncSess
LOG_WARN("fail to insert session to query map", K(ret), K(sessid));
MTL(ObTableQueryASyncMgr*)->free_query_session(query_session);
} else {}
} else if (ObQueryOperationType::QUERY_NEXT == arg_.query_type_ || ObQueryOperationType::QUERY_END == arg_.query_type_) {
} else if (ObQueryOperationType::QUERY_NEXT == arg_.query_type_ ||
ObQueryOperationType::QUERY_END == arg_.query_type_ ||
ObQueryOperationType::QUERY_RENEW == arg_.query_type_) {
if (OB_FAIL(MTL(ObTableQueryASyncMgr*)->get_query_session(sessid, query_session))) {
LOG_WARN("fail to get query session from query sync mgr", K(ret), K(sessid));
} else if (OB_ISNULL(query_session)) {
@ -1399,6 +1399,8 @@ int ObTableQueryAsyncP::try_process()
} else {
ret = process_query_next();
}
} else if (ObQueryOperationType::QUERY_RENEW == arg_.query_type_) {
result_.query_session_id_ = query_session_id_;
} else if (ObQueryOperationType::QUERY_END == arg_.query_type_) {
ret = process_query_end();
}

View File

@ -156,7 +156,8 @@ public:
select_columns_(),
result_iterator_(nullptr),
query_ctx_(allocator_),
lease_timeout_period_(60 * 1000 * 1000)
lease_timeout_period_(60 * 1000 * 1000),
row_count_(0)
{}
~ObTableQueryAsyncSession() {}
@ -183,6 +184,7 @@ public:
public:
sql::TransState* get_trans_state() {return &trans_state_;}
transaction::ObTxDesc* get_trans_desc() {return trans_desc_;}
int64_t &get_row_count() { return row_count_; }
void set_trans_desc(transaction::ObTxDesc *trans_desc) { trans_desc_ = trans_desc; }
private:
bool in_use_;
@ -202,6 +204,7 @@ private:
sql::TransState trans_state_;
uint64_t lease_timeout_period_;
transaction::ObTxDesc *trans_desc_;
int64_t row_count_;
};
/**

View File

@ -190,6 +190,7 @@ void ObTableQueryUtils::destroy_result_iterator(ObTableQueryResultIterator *resu
{
if (OB_NOT_NULL(result_iter)) {
result_iter->~ObTableQueryResultIterator();
result_iter = nullptr;
}
}

View File

@ -238,6 +238,7 @@ struct ObTableInfoBase {
K(schema_cache_guard_),
K(schema_version_));
int64_t table_id_;
ObString real_table_name_;
const share::schema::ObSimpleTableSchemaV2* simple_schema_;
table::ObKvSchemaCacheGuard schema_cache_guard_;
int64_t schema_version_;

View File

@ -109,10 +109,13 @@ int ObTableApiScanExecutor::prepare_das_task()
{
int ret = OB_SUCCESS;
ObIDASTaskOp *task_op = nullptr;
ObDASTabletLoc *tablet_loc = tsc_rtdef_.scan_rtdef_.table_loc_->get_first_tablet_loc();
if (OB_FAIL(das_ref_.create_das_task(tablet_loc,
DAS_OP_TABLE_SCAN,
task_op))) {
ObDASTabletLoc *tablet_loc = nullptr;
if (OB_FAIL(tsc_rtdef_.scan_rtdef_.table_loc_->get_tablet_loc_by_id(tb_ctx_.get_index_tablet_id(), tablet_loc))) {
LOG_WARN("fail to get tablet loc", K(ret), K(tsc_rtdef_), K(tb_ctx_.get_index_tablet_id()));
} else if (OB_ISNULL(tablet_loc)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("not found tablet loc", K(ret), K(tsc_rtdef_), K(tb_ctx_.get_index_tablet_id()));
} else if (OB_FAIL(das_ref_.create_das_task(tablet_loc, DAS_OP_TABLE_SCAN, task_op))) {
LOG_WARN("fail to prepare das task", K(ret));
} else {
scan_op_ = static_cast<ObDASScanOp *>(task_op);

View File

@ -32178,12 +32178,12 @@ static const _error _error_OB_KV_FILTER_PARSE_ERROR = {
.mysql_errno = -1,
.sqlstate = "HY000",
.str_error = "OBKV filter parse error",
.str_user_error = "Filter parse errror, the input filter string is: '%.*s'",
.str_user_error = "Filter parse error, the input filter string is: '%.*s'",
.oracle_errno = 600,
.oracle_str_error = "ORA-00600: internal error code, arguments: -10514, OBKV filter parse error",
.oracle_str_user_error = "ORA-00600: internal error code, arguments: -10514, Filter parse errror, the input filter string is: '%.*s'",
.oracle_str_user_error = "ORA-00600: internal error code, arguments: -10514, Filter parse error, the input filter string is: '%.*s'",
.ob_str_error = "OBE-00600: internal error code, arguments: -10514, OBKV filter parse error",
.ob_str_user_error = "OBE-00600: internal error code, arguments: -10514, Filter parse errror, the input filter string is: '%.*s'"
.ob_str_user_error = "OBE-00600: internal error code, arguments: -10514, Filter parse error, the input filter string is: '%.*s'"
};
static const _error _error_OB_KV_REDIS_PARSE_ERROR = {
.error_name = "OB_KV_REDIS_PARSE_ERROR",
@ -32192,12 +32192,12 @@ static const _error _error_OB_KV_REDIS_PARSE_ERROR = {
.mysql_errno = -1,
.sqlstate = "HY000",
.str_error = "OBKV redis protocol parse error",
.str_user_error = "Redis protocol parse errror, the input redis string is: '%.*s'",
.str_user_error = "Redis protocol parse error, the input redis string is: '%.*s'",
.oracle_errno = 600,
.oracle_str_error = "ORA-00600: internal error code, arguments: -10515, OBKV redis protocol parse error",
.oracle_str_user_error = "ORA-00600: internal error code, arguments: -10515, Redis protocol parse errror, the input redis string is: '%.*s'",
.oracle_str_user_error = "ORA-00600: internal error code, arguments: -10515, Redis protocol parse error, the input redis string is: '%.*s'",
.ob_str_error = "OBE-00600: internal error code, arguments: -10515, OBKV redis protocol parse error",
.ob_str_user_error = "OBE-00600: internal error code, arguments: -10515, Redis protocol parse errror, the input redis string is: '%.*s'"
.ob_str_user_error = "OBE-00600: internal error code, arguments: -10515, Redis protocol parse error, the input redis string is: '%.*s'"
};
static const _error _error_OB_KV_HBASE_INCR_FIELD_IS_NOT_LONG = {
.error_name = "OB_KV_HBASE_INCR_FIELD_IS_NOT_LONG",

View File

@ -2716,8 +2716,8 @@ DEFINE_ERROR_EXT(OB_KV_ROWKEY_COUNT_NOT_MATCH, -10510, -1, "HY000", "OBKV rowkey
DEFINE_ERROR_EXT(OB_KV_COLUMN_TYPE_NOT_MATCH, -10511, -1, "HY000", "OBKV column type not match", "Column type for '%.*s' not match, schema column type is '%.*s', input column type is '%.*s'");
DEFINE_ERROR_EXT(OB_KV_COLLATION_MISMATCH, -10512, -1, "HY000", "OBKV collation type not match", "Collation type for '%.*s' not match, schema collation type is '%.*s', input collation type is '%.*s'");
DEFINE_ERROR_EXT(OB_KV_SCAN_RANGE_MISSING, -10513, -1, "HY000", "OBKV scan range missing", "Scan range missing, input scan range cell count is '%ld', which should equal to rowkey count '%ld'");
DEFINE_ERROR_EXT(OB_KV_FILTER_PARSE_ERROR, -10514, -1, "HY000", "OBKV filter parse error", "Filter parse errror, the input filter string is: '%.*s'");
DEFINE_ERROR_EXT(OB_KV_REDIS_PARSE_ERROR, -10515, -1, "HY000", "OBKV redis protocol parse error", "Redis protocol parse errror, the input redis string is: '%.*s'");
DEFINE_ERROR_EXT(OB_KV_FILTER_PARSE_ERROR, -10514, -1, "HY000", "OBKV filter parse error", "Filter parse error, the input filter string is: '%.*s'");
DEFINE_ERROR_EXT(OB_KV_REDIS_PARSE_ERROR, -10515, -1, "HY000", "OBKV redis protocol parse error", "Redis protocol parse error, the input redis string is: '%.*s'");
DEFINE_ERROR_EXT(OB_KV_HBASE_INCR_FIELD_IS_NOT_LONG, -10516, -1, "HY000", "When invoking the Increment interface only HBase cells with a length of 8 can be converted to int64_t", "When invoking the Increment interface, only HBase cells with a length of 8 can be converted to int64_t. the current length of the HBase cell is '%d'.");
DEFINE_ERROR_EXT(OB_KV_REDIS_ERROR, -10517, -1, "HY000", "OBKV redis error", "Redis err need return to client");
DEFINE_ERROR(OB_KV_ODP_TIMEOUT, -10650, -1, "HY000", "ODP process timeout");

View File

@ -4257,8 +4257,8 @@ constexpr int OB_ERR_INVALID_DATE_MSG_FMT_V2 = -4219;
#define OB_KV_COLUMN_TYPE_NOT_MATCH__USER_ERROR_MSG "Column type for '%.*s' not match, schema column type is '%.*s', input column type is '%.*s'"
#define OB_KV_COLLATION_MISMATCH__USER_ERROR_MSG "Collation type for '%.*s' not match, schema collation type is '%.*s', input collation type is '%.*s'"
#define OB_KV_SCAN_RANGE_MISSING__USER_ERROR_MSG "Scan range missing, input scan range cell count is '%ld', which should equal to rowkey count '%ld'"
#define OB_KV_FILTER_PARSE_ERROR__USER_ERROR_MSG "Filter parse errror, the input filter string is: '%.*s'"
#define OB_KV_REDIS_PARSE_ERROR__USER_ERROR_MSG "Redis protocol parse errror, the input redis string is: '%.*s'"
#define OB_KV_FILTER_PARSE_ERROR__USER_ERROR_MSG "Filter parse error, the input filter string is: '%.*s'"
#define OB_KV_REDIS_PARSE_ERROR__USER_ERROR_MSG "Redis protocol parse error, the input redis string is: '%.*s'"
#define OB_KV_HBASE_INCR_FIELD_IS_NOT_LONG__USER_ERROR_MSG "When invoking the Increment interface, only HBase cells with a length of 8 can be converted to int64_t. the current length of the HBase cell is '%d'."
#define OB_KV_REDIS_ERROR__USER_ERROR_MSG "Redis err need return to client"
#define OB_KV_ODP_TIMEOUT__USER_ERROR_MSG "ODP process timeout"
@ -8939,10 +8939,10 @@ constexpr int OB_ERR_INVALID_DATE_MSG_FMT_V2 = -4219;
#define OB_KV_COLLATION_MISMATCH__OBE_USER_ERROR_MSG "OBE-00600: internal error code, arguments: -10512, Collation type for '%.*s' not match, schema collation type is '%.*s', input collation type is '%.*s'"
#define OB_KV_SCAN_RANGE_MISSING__ORA_USER_ERROR_MSG "ORA-00600: internal error code, arguments: -10513, Scan range missing, input scan range cell count is '%ld', which should equal to rowkey count '%ld'"
#define OB_KV_SCAN_RANGE_MISSING__OBE_USER_ERROR_MSG "OBE-00600: internal error code, arguments: -10513, Scan range missing, input scan range cell count is '%ld', which should equal to rowkey count '%ld'"
#define OB_KV_FILTER_PARSE_ERROR__ORA_USER_ERROR_MSG "ORA-00600: internal error code, arguments: -10514, Filter parse errror, the input filter string is: '%.*s'"
#define OB_KV_FILTER_PARSE_ERROR__OBE_USER_ERROR_MSG "OBE-00600: internal error code, arguments: -10514, Filter parse errror, the input filter string is: '%.*s'"
#define OB_KV_REDIS_PARSE_ERROR__ORA_USER_ERROR_MSG "ORA-00600: internal error code, arguments: -10515, Redis protocol parse errror, the input redis string is: '%.*s'"
#define OB_KV_REDIS_PARSE_ERROR__OBE_USER_ERROR_MSG "OBE-00600: internal error code, arguments: -10515, Redis protocol parse errror, the input redis string is: '%.*s'"
#define OB_KV_FILTER_PARSE_ERROR__ORA_USER_ERROR_MSG "ORA-00600: internal error code, arguments: -10514, Filter parse error, the input filter string is: '%.*s'"
#define OB_KV_FILTER_PARSE_ERROR__OBE_USER_ERROR_MSG "OBE-00600: internal error code, arguments: -10514, Filter parse error, the input filter string is: '%.*s'"
#define OB_KV_REDIS_PARSE_ERROR__ORA_USER_ERROR_MSG "ORA-00600: internal error code, arguments: -10515, Redis protocol parse error, the input redis string is: '%.*s'"
#define OB_KV_REDIS_PARSE_ERROR__OBE_USER_ERROR_MSG "OBE-00600: internal error code, arguments: -10515, Redis protocol parse error, the input redis string is: '%.*s'"
#define OB_KV_HBASE_INCR_FIELD_IS_NOT_LONG__ORA_USER_ERROR_MSG "ORA-00600: internal error code, arguments: -10516, When invoking the Increment interface, only HBase cells with a length of 8 can be converted to int64_t. the current length of the HBase cell is '%d'."
#define OB_KV_HBASE_INCR_FIELD_IS_NOT_LONG__OBE_USER_ERROR_MSG "OBE-00600: internal error code, arguments: -10516, When invoking the Increment interface, only HBase cells with a length of 8 can be converted to int64_t. the current length of the HBase cell is '%d'."
#define OB_KV_REDIS_ERROR__ORA_USER_ERROR_MSG "ORA-00600: internal error code, arguments: -10517, Redis err need return to client"

View File

@ -17,6 +17,7 @@
#include "rpc/obrpc/ob_rpc_packet.h"
#include "sql/engine/expr/ob_expr_lob_utils.h"
#include "ob_table_object.h"
#include "observer/table/ob_htable_utils.h"
using namespace oceanbase::table;
using namespace oceanbase::common;
@ -1157,9 +1158,13 @@ void ObTableQuery::reset()
bool ObTableQuery::is_valid() const
{
return (limit_ == -1 || limit_ > 0)
&& (offset_ >= 0)
&& key_ranges_.count() > 0;
bool valid = false;
if (OB_NOT_NULL(ob_params_.ob_params_) && ob_params_.ob_params_->get_param_type() == ParamType::HBase) {
valid = key_ranges_.count() > 0;
} else {
valid = (limit_ == -1 || limit_ > 0) && (offset_ >= 0) && key_ranges_.count() > 0;
}
return valid;
}
int ObTableQuery::add_scan_range(common::ObNewRange &scan_range)
@ -1853,6 +1858,63 @@ OB_SERIALIZE_MEMBER_IF(ObHTableFilter,
limit_per_row_per_cf_,
offset_per_row_per_cf_,
filter_string_);
////////////////////////////////////////////////////////////////
int ObTableQueryIterableResultBase::append_family(ObNewRow &row, ObString family_name)
{
int ret = OB_SUCCESS;
// qualifier obj
ObObj &qualifier = row.get_cell(ObHTableConstants::COL_IDX_Q);
// get length and create buffer
int64_t sep_pos = family_name.length() + 1;
int64_t buf_size = sep_pos + qualifier.get_data_length();
char *buf = nullptr;
if (OB_ISNULL(buf = static_cast<char *>(allocator_.alloc(buf_size)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("failed to alloc family and qualifier buffer", K(ret));
} else {
MEMCPY(buf, family_name.ptr(), family_name.length());
buf[family_name.length()] = '\0';
MEMCPY(buf + sep_pos, qualifier.get_data_ptr(), qualifier.get_data_length());
ObString family_qualifier(buf_size, buf);
qualifier.set_varbinary(family_qualifier);
}
return ret;
}
int ObTableQueryIterableResultBase::transform_lob_cell(ObNewRow &row, const int64_t lob_storage_count)
{
int ret = OB_SUCCESS;
// check properties count and row count
const int64_t N = row.get_count();
ObObj *lob_cells = nullptr;
if (OB_ISNULL(lob_cells = static_cast<ObObj*>(allocator_.alloc(sizeof(ObObj) * lob_storage_count)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("fail to alloc cells buffer", K(ret), K(lob_storage_count));
}
for (int i = 0, lob_cell_idx = 0; OB_SUCC(ret) && i < N; ++i) {
const ObObj &cell = row.get_cell(i);
ObObjType type = cell.get_type();
if (is_lob_storage(type)) {
ObString real_data;
if (cell.has_lob_header()) {
if (OB_FAIL(sql::ObTextStringHelper::read_real_string_data(&allocator_, cell, real_data))) {
LOG_WARN("fail to read real string date", K(ret), K(cell));
} else if (lob_cell_idx >= lob_storage_count) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected index count", K(ret), K(lob_cell_idx), K(lob_storage_count));
} else {
lob_cells[lob_cell_idx].set_lob_value(type, real_data.ptr(), real_data.length());
lob_cells[lob_cell_idx].set_collation_type(cell.get_collation_type());
row.get_cell(i) = lob_cells[lob_cell_idx]; // switch lob cell
lob_cell_idx++;
}
}
}
}
return ret;
}
////////////////////////////////////////////////////////////////
ObTableQueryResult::ObTableQueryResult()
:row_count_(0),
@ -2108,6 +2170,23 @@ int ObTableQueryResult::add_all_row(const ObTableQueryResult &other)
return ret;
}
int ObTableQueryResult::add_all_row(ObTableQueryIterableResultBase &other)
{
int ret = OB_SUCCESS;
ObNewRow row;
while (OB_SUCC(ret) && OB_SUCC(other.get_row(row))) {
if (OB_FAIL(add_row(row))) {
LOG_WARN("fail to add row from ObTableQueryIterableResultBase", K(ret));
}
}
if (ret == OB_ARRAY_OUT_OF_RANGE || ret == OB_ITER_END) {
ret = OB_SUCCESS;
} else {
LOG_WARN("add_all_row get error", K(ret));
}
return ret;
}
int64_t ObTableQueryResult::get_result_size()
{
if (0 >= fixed_result_size_) {
@ -2229,10 +2308,10 @@ OB_DEF_DESERIALIZE(ObTableQueryResult)
return ret;
}
////////////////////////////////////////////////////////////////
ObTableQueryIterableResult::ObTableQueryIterableResult()
: allocator_(ObModIds::TABLE_PROC, OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID()),
current_(0),
row_count_(0)
: ObTableQueryIterableResultBase(),
current_(0)
{
}
@ -2244,29 +2323,34 @@ void ObTableQueryIterableResult::reset_except_property()
current_ = 0;
rows_.reset();
}
int ObTableQueryIterableResult::add_all_row(ObTableQueryIterableResult &other) {
int ObTableQueryIterableResult::add_all_row(ObTableQueryDListResult &other)
{
int ret = OB_SUCCESS;
ObNewRow row;
ObHTableCellEntity *row = nullptr;
while (OB_SUCC(ret) && OB_SUCC(other.get_row(row))) {
ObNewRow copy_row;
if (OB_FAIL(ob_write_row(allocator_, row, copy_row))) {
if (OB_FAIL(ob_write_row(allocator_, *row->get_ob_row(), copy_row))) {
LOG_WARN("fail to copy_row ", K(ret), K(row));
} else if (OB_FAIL(rows_.push_back(copy_row))) {
LOG_WARN("fail to push_back to rows", K(ret));
} else {
row_count_++;
ObString family_name = row->get_family();
if (OB_FAIL(append_family(copy_row, family_name))) {
LOG_WARN("fail to append family to row", K(ret), K(copy_row));
} else if (OB_FAIL(rows_.push_back(copy_row))) {
LOG_WARN("fail to push_back to rows", K(ret));
} else {
row_count_++;
}
}
}
if (ret == OB_ARRAY_OUT_OF_RANGE) {
if (ret == OB_ITER_END) {
ret = OB_SUCCESS;
} else {
LOG_WARN("add_all_row get error", K(ret));
}
return ret;
}
int ObTableQueryIterableResult::get_row(ObNewRow& row) {
int ObTableQueryIterableResult::get_row(ObNewRow& row)
{
int ret = 0;
if (OB_FAIL(rows_.at(current_, row))) {
if (ret == OB_ARRAY_OUT_OF_RANGE) {
@ -2282,62 +2366,16 @@ int ObTableQueryIterableResult::get_row(ObNewRow& row) {
return ret;
}
int ObTableQueryIterableResult::append_family(const ObNewRow &row, ObString family_name)
int ObTableQueryIterableResult::add_row(const common::ObNewRow &row, ObString family_name)
{
int ret = OB_SUCCESS;
// qualifier obj
ObNewRow &row_op = const_cast<ObNewRow &>(row);
ObObj &qualifier = row_op.get_cell(ObHTableConstants::COL_IDX_Q);
// get length and create buffer
int64_t sep_pos = family_name.length() + 1;
int64_t buf_size = sep_pos + qualifier.get_data_length();
char *buf = nullptr;
if (OB_ISNULL(buf = static_cast<char *>(allocator_.alloc(buf_size)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("failed to alloc family and qualifier buffer", K(ret));
} else {
MEMCPY(buf, family_name.ptr(), family_name.length());
buf[family_name.length()] = '\0';
MEMCPY(buf + sep_pos, qualifier.get_data_ptr(), qualifier.get_data_length());
ObString family_qualifier(buf_size, buf);
qualifier.set_varbinary(family_qualifier);
}
return ret;
}
int ObTableQueryIterableResult::add_row(const common::ObNewRow &row, ObString family_name) {
int ret = OB_SUCCESS;
// 1. check properties count and row count
const int64_t N = row.get_count();
// 2. construct new row
// construct new row
ObNewRow new_row = row;
ObObj *lob_cells = nullptr;
const int64_t lob_storage_count = get_lob_storage_count(new_row);
if (lob_storage_count != 0) {
if (OB_ISNULL(lob_cells = static_cast<ObObj*>(allocator_.alloc(sizeof(ObObj) * lob_storage_count)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("fail to alloc cells buffer", K(ret), K(lob_storage_count));
}
for (int i = 0, lob_cell_idx = 0; OB_SUCC(ret) && i < N; ++i) {
const ObObj &cell = new_row.get_cell(i);
ObObjType type = cell.get_type();
if (is_lob_storage(type)) {
ObString real_data;
if (cell.has_lob_header()) {
if (OB_FAIL(sql::ObTextStringHelper::read_real_string_data(&allocator_, cell, real_data))) {
LOG_WARN("fail to read real string date", K(ret), K(cell));
} else if (lob_cell_idx >= lob_storage_count) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected index count", K(ret), K(lob_cell_idx), K(lob_storage_count));
} else {
lob_cells[lob_cell_idx].set_lob_value(type, real_data.ptr(), real_data.length());
lob_cells[lob_cell_idx].set_collation_type(cell.get_collation_type());
new_row.get_cell(i) = lob_cells[lob_cell_idx]; // switch lob cell
lob_cell_idx++;
}
}
}
if (OB_FAIL(transform_lob_cell(new_row, lob_storage_count))) {
LOG_WARN("fail to switch lob cell", K(ret));
}
}
if (OB_SUCC(ret)) {
@ -2379,7 +2417,154 @@ bool ObTableQueryIterableResult::reach_batch_size_or_result_size(const int32_t b
return reach_size;
}
////////////////////////////////////////////////////////////////
ObTableQueryDListResult::ObTableQueryDListResult()
: ObTableQueryIterableResultBase(),
cell_list_()
{
}
ObTableQueryDListResult:: ~ObTableQueryDListResult()
{
}
int ObTableQueryDListResult::add_row(const common::ObNewRow &row, ObString family_name) {
int ret = OB_SUCCESS;
// construct new row
ObNewRow new_row = row;
const int64_t lob_storage_count = get_lob_storage_count(new_row);
if (lob_storage_count != 0) {
if (OB_FAIL(transform_lob_cell(new_row, lob_storage_count))) {
LOG_WARN("fail to swtich lob cell", K(ret));
}
}
if (OB_SUCC(ret)) {
ObNewRow *copy_row = nullptr;
if (OB_ISNULL(copy_row = OB_NEWx(ObNewRow, (&allocator_)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("fail to alloc ObNewRow buffer", K(ret));
} else if (OB_FAIL(ob_write_row(allocator_, new_row, *copy_row))) {
LOG_WARN("fail to copy row", K(ret), K(new_row));
} else {
ObHTableCellEntity *cell_entity = nullptr;
if (OB_ISNULL(cell_entity = OB_NEWx(ObHTableCellEntity, (&allocator_), copy_row))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("fail to alloc ObHTableCellEntity buffer", K(ret));
} else {
cell_entity->set_family(family_name);
ObCellNode *new_node = nullptr;
if (OB_ISNULL(new_node = OB_NEWx(ObCellNode, (&allocator_), cell_entity))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("fail to alloc ObHTableCellEntity buffer", K(ret));
} else {
if (cell_list_.add_first(new_node)) {
++row_count_;
} else {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("fail to add new_node to cell_list", K(ret));
}
}
}
}
}
return ret;
}
int ObTableQueryDListResult::add_all_row(ObTableQueryIterableResultBase &other)
{
int ret = OB_SUCCESS;
ObNewRow row;
while (OB_SUCC(ret) && OB_SUCC(other.get_row(row))) {
ObNewRow *new_row = nullptr;
// need to deep copy row, because other will free its row when it's empty
if (OB_ISNULL(new_row = OB_NEWx(ObNewRow, (&allocator_)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("fail to allocate memory for ObNewRow", K(ret));
} else if (OB_FAIL(ob_write_row(allocator_, row, *new_row))) {
LOG_WARN("fail to copy row", K(ret), K(row));
} else {
ObHTableCellEntity *cell_entity = nullptr;
if (OB_ISNULL(cell_entity = OB_NEWx(ObHTableCellEntity, (&allocator_), new_row))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("fail to allocate memory for ObHTableCellEntity", K(ret));
} else {
ObCellNode *node = nullptr;
if (OB_ISNULL(node = OB_NEWx(ObCellNode, (&allocator_), cell_entity))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("fail to allocate memory for ObCellNode", K(ret));
} else if (OB_FAIL(cell_list_.add_first(node))) {
LOG_WARN("fail to add to cell_list", K(ret));
} else {
++row_count_;
}
}
}
}
if (ret == OB_ITER_END || ret == OB_ARRAY_OUT_OF_RANGE) {
ret = OB_SUCCESS;
} else {
LOG_WARN("add_all_row get error", K(ret));
}
return ret;
}
int ObTableQueryDListResult::get_row(ObNewRow &row)
{
int ret = OB_SUCCESS;
ObCellNode *node = cell_list_.remove_last();
if (OB_ISNULL(node)) {
if (0 != row_count_) {
ret = OB_ERR_NULL_VALUE;
LOG_WARN("fail to get row", K(ret), K(cell_list_));
} else {
ret = OB_ITER_END;
cell_list_.reset();
allocator_.reset();
}
} else {
--row_count_;
row = *node->get_data()->get_ob_row();
}
return ret;
}
int ObTableQueryDListResult::get_row(ObHTableCellEntity *&row) {
int ret = OB_SUCCESS;
ObCellNode *node = cell_list_.remove_last();
if (OB_ISNULL(node)) {
if (0 != row_count_) {
ret = OB_ERR_NULL_VALUE;
LOG_WARN("fail to get row from DListResult", K(ret), K(cell_list_));
} else {
ret = OB_ITER_END;
cell_list_.reset();
allocator_.reset();
}
} else {
--row_count_;
row = node->get_data();
}
return ret;
}
bool ObTableQueryDListResult::reach_batch_size_or_result_size(const int32_t batch_count, const int64_t max_result_size)
{
bool reach_size = false;
if (batch_count > 0 && this->get_row_count() >= batch_count) {
LOG_DEBUG("reach batch limit", K(batch_count));
reach_size = true;
}
return reach_size;
}
void ObTableQueryDListResult::reset()
{
cell_list_.reset();
allocator_.reset();
row_count_ = 0;
}
////////////////////////////////////////////////////////////////
uint64_t ObTableQueryAndMutate::get_checksum()
{

View File

@ -21,6 +21,7 @@
#include "lib/container/ob_se_array.h"
#include "lib/hash/ob_hashmap.h"
#include "lib/list/ob_dlist.h"
#include "lib/list/ob_dlink_node.h"
#include "lib/net/ob_addr.h"
#include "common/ob_common_types.h"
#include "common/ob_range.h"
@ -62,6 +63,7 @@ enum class ObTableEntityType
ET_HKV = 2,
ET_REDIS = 3
};
class ObHTableCellEntity;
class ObTableBitMap {
public:
@ -325,6 +327,7 @@ enum class ObQueryOperationType : int {
QUERY_START = 0,
QUERY_NEXT = 1,
QUERY_END = 2,
QUERY_RENEW = 3,
QUERY_MAX
};
@ -870,6 +873,7 @@ public:
ObKVParamsBase(): param_type_(ParamType::HBase) {}
virtual ~ObKVParamsBase() = default;
OB_INLINE ParamType get_param_type() { return param_type_; }
virtual int32_t get_caching() const = 0;
virtual int serialize(char *buf, const int64_t buf_len, int64_t &pos) const = 0;
virtual int deserialize(const char *buf, const int64_t data_len, int64_t &pos) = 0;
virtual int64_t get_serialize_size() const = 0;
@ -896,7 +900,8 @@ public:
~ObHBaseParams() {};
OB_INLINE ParamType get_param_type() { return param_type_; }
OB_INLINE void set_caching(const int32_t caching ) { caching_ = caching; }
OB_INLINE int32_t get_caching() const { return caching_; }
OB_INLINE void set_caching(const int32_t caching) { caching_ = caching; }
OB_INLINE void set_call_timeout_(const int32_t call_timeout) { call_timeout_ = call_timeout; }
OB_INLINE void set_allow_partial_results(const bool allow_partial_results) { allow_partial_results_ = allow_partial_results; }
OB_INLINE void set_is_cache_block(const bool is_cache_block) { is_cache_block_ = is_cache_block; }
@ -1190,6 +1195,38 @@ inline void ObTableQueryAndMutate::set_entity_factory(ObITableEntityFactory *ent
mutations_.set_entity_factory(entity_factory);
}
class ObTableQueryIterableResultBase
{
public:
ObTableQueryIterableResultBase()
: allocator_(ObModIds::TABLE_PROC, OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID()),
row_count_(0)
{};
virtual ~ObTableQueryIterableResultBase() {}
virtual int add_row(const common::ObNewRow &row, ObString family_name) { UNUSED(row); UNUSED(family_name); return OB_SUCCESS; }
virtual int add_row(const common::ObIArray<ObObj> &row) { UNUSED(row); return OB_SUCCESS; }
virtual int add_all_row(ObTableQueryIterableResultBase &other) { UNUSED(other); return OB_SUCCESS; };
virtual bool reach_batch_size_or_result_size(const int32_t batch_count, const int64_t max_result_size) { UNUSED(batch_count); UNUSED(max_result_size); return true; }
virtual int get_row(ObNewRow &row) { UNUSED(row); return OB_SUCCESS; }
int64_t get_row_count() const { return row_count_; };
int64_t &get_row_count() { return row_count_; }
protected:
int append_family(ObNewRow &row, ObString family_name);
int transform_lob_cell(ObNewRow &row, const int64_t lob_storage_count);
OB_INLINE int64_t get_lob_storage_count(const common::ObNewRow &row) const
{
int64_t count = 0;
for (int64_t i = 0; i < row.get_count(); ++i) {
if (is_lob_storage(row.get_cell(i).get_type())) {
count++;
}
}
return count;
}
protected:
common::ObArenaAllocator allocator_;
int64_t row_count_;
};
class ObTableQueryResult: public ObTableEntityIterator
{
@ -1210,6 +1247,7 @@ public:
virtual int add_row(const common::ObIArray<ObObj> &row);
int add_all_property(const ObTableQueryResult &other);
int add_all_row(const ObTableQueryResult &other);
int add_all_row(ObTableQueryIterableResultBase &other);
void save_row_count_only(const int row_count) { reset(); row_count_ += row_count; }
int64_t get_row_count() const { return row_count_; }
int64_t get_property_count() const { return properties_names_.count(); }
@ -1248,37 +1286,39 @@ private:
ObTableEntity curr_entity_;
};
class ObTableQueryIterableResult final
class ObTableQueryDListResult: public ObTableQueryIterableResultBase
{
using ObCellNode = common::ObDLinkNode<ObHTableCellEntity*>;
using ObCellDLinkedList = common::ObDList<ObCellNode>;
public:
ObTableQueryDListResult();
~ObTableQueryDListResult();
virtual int add_row(const common::ObNewRow &row, ObString family_name) override;
virtual int add_all_row(ObTableQueryIterableResultBase &other) override;
virtual bool reach_batch_size_or_result_size(const int32_t batch_count, const int64_t max_result_size) override;
virtual int get_row(ObNewRow &row) override;
int get_row(ObHTableCellEntity *&row);
ObCellDLinkedList &get_cell_list() { return cell_list_; }
void reset();
private:
ObCellDLinkedList cell_list_;
};
class ObTableQueryIterableResult: public ObTableQueryIterableResultBase
{
OB_UNIS_VERSION(1);
public:
ObTableQueryIterableResult();
int add_row(const common::ObNewRow &row, ObString family_name);
int add_all_row(ObTableQueryIterableResult &other);
int64_t get_row_count() const { return row_count_; }
int add_row(const common::ObIArray<ObObj> &row);
bool reach_batch_size_or_result_size(const int32_t batch_count, const int64_t max_result_size);
virtual int add_row(const common::ObNewRow &row, ObString family_name) override;
int add_all_row(ObTableQueryDListResult &other);
virtual int add_row(const common::ObIArray<ObObj> &row) override;
virtual bool reach_batch_size_or_result_size(const int32_t batch_count, const int64_t max_result_size) override;
void save_row_count_only(const int row_count) { reset_except_property(); row_count_ += row_count; }
int get_row(ObNewRow &row);
virtual int get_row(ObNewRow &row) override;
void reset_except_property();
private:
int append_family(const ObNewRow &row, ObString family_name);
OB_INLINE int64_t get_lob_storage_count(const common::ObNewRow &row) const
{
int64_t count = 0;
for (int64_t i = 0; i < row.get_count(); ++i) {
if (is_lob_storage(row.get_cell(i).get_type())) {
count++;
}
}
return count;
}
private:
common::ObArenaAllocator allocator_;
int64_t current_ = 0;
int64_t row_count_;
public:
common::ObArray<ObNewRow> rows_;
@ -1671,6 +1711,15 @@ public:
OB_INLINE void set_deserialize_allocator(common::ObIAllocator *allocator) { deserialize_alloc_ = allocator; }
OB_INLINE const ObTableSingleOp &at(int64_t idx) const { return single_ops_.at(idx); }
OB_INLINE ObTableSingleOp &at(int64_t idx) { return single_ops_.at(idx); }
OB_INLINE void operator()(const ObTableTabletOp &other){
reset();
this->tablet_id_ = other.get_tablet_id();
this->option_flag_ = other.get_option_flag();
this->single_ops_ = other.single_ops_;
}
OB_INLINE void reset() {
single_ops_.reset();
}
OB_INLINE const ObTabletID &get_tablet_id() const { return tablet_id_; }
OB_INLINE const uint64_t &get_option_flag() const { return option_flag_; }
OB_INLINE bool is_same_type() const { return is_same_type_; }
@ -1691,7 +1740,8 @@ public:
OB_INLINE void set_is_ls_same_prop_name(bool is_same) { is_ls_same_properties_names_ = is_same; }
OB_INLINE void set_tablet_id(uint64_t tablet_id) { tablet_id_ = ObTabletID(tablet_id); }
OB_INLINE void set_tablet_id(ObTabletID tablet_id) { tablet_id_ = tablet_id; }
OB_INLINE void set_option_flag(uint64_t option_flag) { option_flag_ = option_flag; }
OB_INLINE int add_single_op(ObTableSingleOp single_op) { return single_ops_.push_back(single_op); }