633 lines
22 KiB
C++
633 lines
22 KiB
C++
/**
|
|
* Copyright (c) 2021 OceanBase
|
|
* OceanBase CE is licensed under Mulan PubL v2.
|
|
* You can use this software according to the terms and conditions of the Mulan PubL v2.
|
|
* You may obtain a copy of Mulan PubL v2 at:
|
|
* http://license.coscl.org.cn/MulanPubL-2.0
|
|
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
|
|
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
|
|
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
|
|
* See the Mulan PubL v2 for more details.
|
|
*/
|
|
|
|
#define private public
|
|
#define protected public
|
|
|
|
#include "storage/column_store/ob_cg_scanner.h"
|
|
#include "ob_index_block_data_prepare.h"
|
|
|
|
namespace oceanbase
|
|
{
|
|
using namespace storage;
|
|
using namespace common;
|
|
namespace blocksstable
|
|
{
|
|
class TestCGScanner : public TestIndexBlockDataPrepare
|
|
{
|
|
public:
|
|
TestCGScanner();
|
|
virtual ~TestCGScanner();
|
|
static void SetUpTestCase();
|
|
static void TearDownTestCase();
|
|
|
|
virtual void SetUp();
|
|
virtual void TearDown();
|
|
void prepare_cg_query_param(const bool is_reverse_scan, const ObVersionRange &scan_version, const uint32_t cg_idx);
|
|
void destroy_cg_query_param();
|
|
void check_data(ObCGRowScanner *cg_scanner, int64_t start, int64_t locate_count, sql::ObDatum *datums, const bool is_reverse);
|
|
|
|
void test_border(const bool is_reverse);
|
|
void test_random(const bool is_reverse);
|
|
|
|
|
|
public:
|
|
ObArenaAllocator allocator_;
|
|
ObDatumRow start_row_;
|
|
ObDatumRow end_row_;
|
|
ObTableAccessParam access_param_;
|
|
};
|
|
|
|
TestCGScanner::TestCGScanner()
|
|
: TestIndexBlockDataPrepare("Test cg sstable row scanner", ObMergeType::MAJOR_MERGE)
|
|
{
|
|
is_cg_data_ = true;
|
|
}
|
|
|
|
TestCGScanner::~TestCGScanner()
|
|
{
|
|
}
|
|
|
|
void TestCGScanner::SetUpTestCase()
|
|
{
|
|
TestIndexBlockDataPrepare::SetUpTestCase();
|
|
}
|
|
|
|
void TestCGScanner::TearDownTestCase()
|
|
{
|
|
TestIndexBlockDataPrepare::TearDownTestCase();
|
|
}
|
|
|
|
void TestCGScanner::prepare_cg_query_param(const bool is_reverse_scan, const ObVersionRange &scan_version, const uint32_t cg_idx)
|
|
{
|
|
schema_cols_.set_allocator(&allocator_);
|
|
schema_cols_.init(table_schema_.get_column_count());
|
|
ASSERT_EQ(OB_SUCCESS, table_schema_.get_column_ids(schema_cols_));
|
|
access_param_.iter_param_.table_id_ = table_schema_.get_table_id();
|
|
access_param_.iter_param_.tablet_id_ = table_schema_.get_table_id();
|
|
access_param_.iter_param_.is_same_schema_column_ = true;
|
|
access_param_.iter_param_.has_virtual_columns_ = false;
|
|
access_param_.iter_param_.vectorized_enabled_ = true;
|
|
access_param_.iter_param_.pd_storage_flag_.pd_blockscan_ = true;
|
|
access_param_.iter_param_.pd_storage_flag_.pd_filter_ = true;
|
|
access_param_.iter_param_.pd_storage_flag_.use_stmt_iter_pool_ = true;
|
|
access_param_.iter_param_.pd_storage_flag_.use_column_store_ = true;
|
|
access_param_.iter_param_.read_info_ = &cg_read_info_;
|
|
|
|
//jsut for test
|
|
ObQueryFlag query_flag(ObQueryFlag::Forward,
|
|
true, /*is daily merge scan*/
|
|
true, /*is read multiple macro block*/
|
|
true, /*sys task scan, read one macro block in single io*/
|
|
false /*full row scan flag, obsoleted*/,
|
|
false,/*index back*/
|
|
false); /*query_stat*/
|
|
query_flag.set_not_use_row_cache();
|
|
query_flag.set_use_block_cache();
|
|
if (is_reverse_scan) {
|
|
query_flag.scan_order_ = ObQueryFlag::ScanOrder::Reverse;
|
|
} else {
|
|
query_flag.scan_order_ = ObQueryFlag::ScanOrder::Forward;
|
|
}
|
|
ASSERT_EQ(OB_SUCCESS, context_.init(query_flag,
|
|
store_ctx_,
|
|
allocator_,
|
|
allocator_,
|
|
scan_version));
|
|
context_.ls_id_ = ls_id_;
|
|
context_.limit_param_ = nullptr;
|
|
context_.is_inited_ = true;
|
|
}
|
|
|
|
void TestCGScanner::destroy_cg_query_param()
|
|
{
|
|
access_param_.reset();
|
|
schema_cols_.reset();
|
|
context_.reset();
|
|
read_info_.reset();
|
|
}
|
|
|
|
void TestCGScanner::SetUp()
|
|
{
|
|
TestIndexBlockDataPrepare::SetUp();
|
|
ASSERT_EQ(OB_SUCCESS, start_row_.init(allocator_, TEST_COLUMN_CNT));
|
|
ASSERT_EQ(OB_SUCCESS, end_row_.init(allocator_, TEST_COLUMN_CNT));
|
|
ObLSID ls_id(ls_id_);
|
|
ObTabletID tablet_id(tablet_id_);
|
|
ObLSHandle ls_handle;
|
|
ObLSService *ls_svr = MTL(ObLSService*);
|
|
ASSERT_EQ(OB_SUCCESS, ls_svr->get_ls(ls_id, ls_handle, ObLSGetMod::STORAGE_MOD));
|
|
|
|
ASSERT_EQ(OB_SUCCESS, ls_handle.get_ls()->get_tablet(tablet_id, tablet_handle_));
|
|
sstable_.key_.table_type_ = ObITable::TableType::NORMAL_COLUMN_GROUP_SSTABLE;
|
|
}
|
|
|
|
void TestCGScanner::TearDown()
|
|
{
|
|
tablet_handle_.reset();
|
|
TestIndexBlockDataPrepare::TearDown();
|
|
}
|
|
|
|
void TestCGScanner::check_data(
|
|
ObCGRowScanner *cg_scanner,
|
|
int64_t start,
|
|
int64_t locate_count,
|
|
sql::ObDatum *datums,
|
|
const bool is_reverse)
|
|
{
|
|
int ret = OB_SUCCESS;
|
|
uint64_t count = 0;
|
|
int64_t get_count = 0;
|
|
int64_t data_count = MIN(row_cnt_ - start, locate_count);
|
|
int64_t end = MIN(row_cnt_ - 1, start + locate_count - 1);
|
|
int64_t sql_batch_size = 256;
|
|
while (OB_SUCC(ret)) {
|
|
if (OB_FAIL(cg_scanner->get_next_rows(count, sql_batch_size))) {
|
|
if (OB_ITER_END != ret) {
|
|
STORAGE_LOG(WARN, "Fail to get next rows", K(ret));
|
|
ASSERT_EQ(OB_ITER_END, ret);
|
|
} else {
|
|
for (int64_t i = 0; i < count; i++) {
|
|
//STORAGE_LOG(INFO, "get next row info", K(get_count), K(i), K(datums[i].get_int()));
|
|
if (!is_reverse) {
|
|
ASSERT_EQ(start + get_count, datums[i].get_int()) << i << " start " << start << " locate_count " << locate_count;
|
|
} else {
|
|
ASSERT_EQ(end - get_count, datums[i].get_int()) << i << " start " << start << " locate_count " << locate_count;
|
|
}
|
|
get_count++;
|
|
}
|
|
}
|
|
} else {
|
|
for (int64_t i = 0; i < count; i++) {
|
|
//STORAGE_LOG(INFO, "get next row info", K(get_count), K(i), K(datums[i].get_int()));
|
|
if (!is_reverse) {
|
|
ASSERT_EQ(start + get_count, datums[i].get_int()) << i << " start " << start << " locate_count " << locate_count;
|
|
} else {
|
|
ASSERT_EQ(end - get_count, datums[i].get_int()) << i << " start " << start << " locate_count " << locate_count;
|
|
}
|
|
get_count++;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
void TestCGScanner::test_random(const bool is_reverse)
|
|
{
|
|
ObVersionRange scan_version;
|
|
scan_version.base_version_ = 1;
|
|
scan_version.multi_version_start_ = 1;
|
|
scan_version.snapshot_version_ = INT64_MAX;
|
|
// prepare query param
|
|
prepare_cg_query_param(is_reverse, scan_version, 0);
|
|
sql::ObExecContext exec_ctx(allocator_);
|
|
sql::ObEvalCtx eval_ctx(exec_ctx);
|
|
sql::ObPushdownExprSpec expr_spec(allocator_);
|
|
static int64_t array_cnt = 1024;
|
|
eval_ctx.batch_idx_ = 0;
|
|
eval_ctx.batch_size_ = 256;
|
|
expr_spec.max_batch_size_ = 256;
|
|
void *datum_buf = allocator_.alloc((sizeof(sql::ObDatum) + sizeof(int64_t)) * array_cnt * 2);
|
|
sql::ObDatum *datums = new (datum_buf) sql::ObDatum[array_cnt];
|
|
eval_ctx.frames_ = (char **)(&datum_buf);
|
|
sql::ObPushdownOperator op(eval_ctx, expr_spec);
|
|
ObFixedArray<int32_t, ObIAllocator> out_cols_project;
|
|
out_cols_project.set_allocator(&allocator_);
|
|
out_cols_project.init(1);
|
|
out_cols_project.push_back(0);
|
|
sql::ExprFixedArray exprs;
|
|
exprs.set_allocator(&allocator_);
|
|
void *expr_buf = allocator_.alloc(sizeof(sql::ObExpr));
|
|
ASSERT_NE(nullptr, expr_buf);
|
|
sql::ObExpr *expr = reinterpret_cast<sql::ObExpr *>(expr_buf);
|
|
expr->reset();
|
|
expr->res_buf_off_ = sizeof(sql::ObDatum) * array_cnt;
|
|
expr->res_buf_len_ = sizeof(int64_t);
|
|
char *ptr = (char *)datum_buf + expr->res_buf_off_;
|
|
for (int64_t i = 0; i < array_cnt; i++) {
|
|
datums[i].ptr_ = ptr;
|
|
ptr += expr->res_buf_len_;
|
|
}
|
|
|
|
exprs.init(1);
|
|
exprs.push_back(expr);
|
|
access_param_.iter_param_.out_cols_project_ = &out_cols_project;
|
|
access_param_.iter_param_.output_exprs_ = &exprs;
|
|
access_param_.iter_param_.op_ = &op;
|
|
void *buf = allocator_.alloc(sizeof(ObBlockRowStore));
|
|
ASSERT_NE(nullptr, buf);
|
|
ObBlockRowStore *block_row_store = new (buf) ObBlockRowStore(context_);
|
|
ASSERT_EQ(OB_SUCCESS, block_row_store->init(access_param_));
|
|
context_.block_row_store_ = block_row_store;
|
|
|
|
buf = allocator_.alloc(sizeof(ObCGRowScanner));
|
|
ASSERT_NE(nullptr, buf);
|
|
ObCGRowScanner *cg_scanner = new (buf) ObCGRowScanner();
|
|
ObSSTableWrapper wrapper;
|
|
wrapper.sstable_ = &sstable_;
|
|
ASSERT_EQ(OB_SUCCESS, cg_scanner->init(access_param_.iter_param_, context_, wrapper));
|
|
|
|
int retry_cnt = 15;
|
|
while (retry_cnt > 0) {
|
|
int64_t start = ObRandom::rand(0, row_cnt_ - 1);
|
|
int64_t locate_count = ObRandom::rand(1, row_cnt_);
|
|
|
|
STORAGE_LOG(INFO, "start to locate random range", K(retry_cnt), K(start), K(locate_count), K(row_cnt_));
|
|
ASSERT_EQ(OB_SUCCESS, cg_scanner->locate(ObCSRange(start, locate_count)));
|
|
check_data(cg_scanner, start, locate_count, datums, is_reverse);
|
|
|
|
retry_cnt--;
|
|
}
|
|
|
|
STORAGE_LOG(INFO, "test random finished");
|
|
|
|
allocator_.free(datum_buf);
|
|
datum_buf = nullptr;
|
|
expr->reset();
|
|
allocator_.free(expr);
|
|
expr = nullptr;
|
|
out_cols_project.reset();
|
|
allocator_.free(block_row_store);
|
|
block_row_store = nullptr;
|
|
cg_scanner->reset();
|
|
allocator_.free(cg_scanner);
|
|
cg_scanner = nullptr;
|
|
destroy_cg_query_param();
|
|
}
|
|
|
|
void TestCGScanner::test_border(const bool is_reverse)
|
|
{
|
|
ObVersionRange scan_version;
|
|
scan_version.base_version_ = 1;
|
|
scan_version.multi_version_start_ = 1;
|
|
scan_version.snapshot_version_ = INT64_MAX;
|
|
// prepare query param
|
|
prepare_cg_query_param(is_reverse, scan_version, 0);
|
|
sql::ObExecContext exec_ctx(allocator_);
|
|
sql::ObEvalCtx eval_ctx(exec_ctx);
|
|
sql::ObPushdownExprSpec expr_spec(allocator_);
|
|
static int64_t array_cnt = 1024;
|
|
eval_ctx.batch_idx_ = 0;
|
|
eval_ctx.batch_size_ = 256;
|
|
expr_spec.max_batch_size_ = 256;
|
|
void *datum_buf = allocator_.alloc((sizeof(sql::ObDatum) + sizeof(int64_t)) * array_cnt * 2);
|
|
sql::ObDatum *datums = new (datum_buf) sql::ObDatum[array_cnt];
|
|
eval_ctx.frames_ = (char **)(&datum_buf);
|
|
sql::ObPushdownOperator op(eval_ctx, expr_spec);
|
|
ObFixedArray<int32_t, ObIAllocator> out_cols_project;
|
|
out_cols_project.set_allocator(&allocator_);
|
|
out_cols_project.init(1);
|
|
out_cols_project.push_back(0);
|
|
sql::ExprFixedArray exprs;
|
|
exprs.set_allocator(&allocator_);
|
|
void *expr_buf = allocator_.alloc(sizeof(sql::ObExpr));
|
|
ASSERT_NE(nullptr, expr_buf);
|
|
sql::ObExpr *expr = reinterpret_cast<sql::ObExpr *>(expr_buf);
|
|
expr->reset();
|
|
expr->res_buf_off_ = sizeof(sql::ObDatum) * array_cnt;
|
|
expr->res_buf_len_ = sizeof(int64_t);
|
|
char *ptr = (char *)datum_buf + expr->res_buf_off_;
|
|
for (int64_t i = 0; i < array_cnt; i++) {
|
|
datums[i].ptr_ = ptr;
|
|
ptr += expr->res_buf_len_;
|
|
}
|
|
|
|
exprs.init(1);
|
|
exprs.push_back(expr);
|
|
access_param_.iter_param_.out_cols_project_ = &out_cols_project;
|
|
access_param_.iter_param_.output_exprs_ = &exprs;
|
|
access_param_.iter_param_.op_ = &op;
|
|
void *buf = allocator_.alloc(sizeof(ObBlockRowStore));
|
|
ASSERT_NE(nullptr, buf);
|
|
ObBlockRowStore *block_row_store = new (buf) ObBlockRowStore(context_);
|
|
ASSERT_EQ(OB_SUCCESS, block_row_store->init(access_param_));
|
|
context_.block_row_store_ = block_row_store;
|
|
|
|
buf = allocator_.alloc(sizeof(ObCGRowScanner));
|
|
ASSERT_NE(nullptr, buf);
|
|
ObCGRowScanner *cg_scanner = new (buf) ObCGRowScanner();
|
|
ObSSTableWrapper wrapper;
|
|
wrapper.sstable_ = &sstable_;
|
|
ASSERT_EQ(OB_SUCCESS, cg_scanner->init(access_param_.iter_param_, context_, wrapper));
|
|
|
|
int64_t start = 0;
|
|
int64_t locate_count = 0;
|
|
ASSERT_EQ(OB_INVALID_ARGUMENT, cg_scanner->locate(ObCSRange(start, locate_count)));
|
|
|
|
start = row_cnt_;
|
|
locate_count = 1;
|
|
ASSERT_EQ(OB_ITER_END, cg_scanner->locate(ObCSRange(start, locate_count)));
|
|
|
|
start = row_cnt_ - 99;
|
|
locate_count = 100;
|
|
ASSERT_EQ(OB_SUCCESS, cg_scanner->locate(ObCSRange(start, locate_count)));
|
|
check_data(cg_scanner, start, locate_count, datums, is_reverse);
|
|
|
|
start = 0;
|
|
locate_count = row_cnt_ + 1;
|
|
ASSERT_EQ(OB_SUCCESS, cg_scanner->locate(ObCSRange(start, locate_count)));
|
|
check_data(cg_scanner, start, locate_count, datums, is_reverse);
|
|
|
|
STORAGE_LOG(INFO, "test border finished");
|
|
|
|
allocator_.free(datum_buf);
|
|
datum_buf = nullptr;
|
|
expr->reset();
|
|
allocator_.free(expr);
|
|
expr = nullptr;
|
|
out_cols_project.reset();
|
|
allocator_.free(block_row_store);
|
|
block_row_store = nullptr;
|
|
cg_scanner->reset();
|
|
allocator_.free(cg_scanner);
|
|
cg_scanner = nullptr;
|
|
destroy_cg_query_param();
|
|
}
|
|
|
|
TEST_F(TestCGScanner, test_large_micro_selected)
|
|
{
|
|
ObVersionRange scan_version;
|
|
scan_version.base_version_ = 1;
|
|
scan_version.multi_version_start_ = 1;
|
|
scan_version.snapshot_version_ = INT64_MAX;
|
|
bool is_reverse_scan = false;
|
|
// prepare query param
|
|
prepare_cg_query_param(is_reverse_scan, scan_version, 0);
|
|
sql::ObExecContext exec_ctx(allocator_);
|
|
sql::ObEvalCtx eval_ctx(exec_ctx);
|
|
sql::ObPushdownExprSpec expr_spec(allocator_);
|
|
static int64_t array_cnt = 1024;
|
|
eval_ctx.batch_idx_ = 0;
|
|
eval_ctx.batch_size_ = 256;
|
|
expr_spec.max_batch_size_ = 256;
|
|
void *datum_buf = allocator_.alloc((sizeof(sql::ObDatum) + sizeof(int64_t)) * array_cnt * 2);
|
|
sql::ObDatum *datums = new (datum_buf) sql::ObDatum[array_cnt];
|
|
eval_ctx.frames_ = (char **)(&datum_buf);
|
|
sql::ObPushdownOperator op(eval_ctx, expr_spec);
|
|
ObFixedArray<int32_t, ObIAllocator> out_cols_project;
|
|
out_cols_project.set_allocator(&allocator_);
|
|
out_cols_project.init(1);
|
|
out_cols_project.push_back(0);
|
|
sql::ExprFixedArray exprs;
|
|
exprs.set_allocator(&allocator_);
|
|
void *expr_buf = allocator_.alloc(sizeof(sql::ObExpr));
|
|
ASSERT_NE(nullptr, expr_buf);
|
|
sql::ObExpr *expr = reinterpret_cast<sql::ObExpr *>(expr_buf);
|
|
expr->reset();
|
|
expr->res_buf_off_ = sizeof(sql::ObDatum) * array_cnt;
|
|
expr->res_buf_len_ = sizeof(int64_t);
|
|
char *ptr = (char *)datum_buf + expr->res_buf_off_;
|
|
for (int64_t i = 0; i < array_cnt; i++) {
|
|
datums[i].ptr_ = ptr;
|
|
ptr += expr->res_buf_len_;
|
|
}
|
|
|
|
exprs.init(1);
|
|
exprs.push_back(expr);
|
|
access_param_.iter_param_.out_cols_project_ = &out_cols_project;
|
|
access_param_.iter_param_.output_exprs_ = &exprs;
|
|
access_param_.iter_param_.op_ = &op;
|
|
void *buf = allocator_.alloc(sizeof(ObBlockRowStore));
|
|
ASSERT_NE(nullptr, buf);
|
|
ObBlockRowStore *block_row_store = new (buf) ObBlockRowStore(context_);
|
|
ASSERT_EQ(OB_SUCCESS, block_row_store->init(access_param_));
|
|
context_.block_row_store_ = block_row_store;
|
|
|
|
buf = allocator_.alloc(sizeof(ObCGRowScanner));
|
|
ASSERT_NE(nullptr, buf);
|
|
ObCGRowScanner *cg_scanner = new (buf) ObCGRowScanner();
|
|
ObSSTableWrapper wrapper;
|
|
wrapper.sstable_ = &sstable_;
|
|
ASSERT_EQ(OB_SUCCESS, cg_scanner->init(access_param_.iter_param_, context_, wrapper));
|
|
|
|
int64_t start = 0;
|
|
int64_t locate_count = row_cnt_;
|
|
int64_t sql_batch_size = 256;
|
|
int64_t total_cnt = row_cnt_ / 10;
|
|
ObCGBitmap bitmap(allocator_);
|
|
bitmap.init(row_cnt_, false);
|
|
bitmap.reuse(0);
|
|
for (int64_t i = 0; i < total_cnt; i++) {
|
|
bitmap.set(i * 10);
|
|
}
|
|
ASSERT_EQ(OB_SUCCESS, cg_scanner->locate(ObCSRange(start, locate_count), &bitmap));
|
|
int ret = OB_SUCCESS;
|
|
uint64_t count = 0;
|
|
int64_t get_count = 0;
|
|
while (OB_SUCC(ret)) {
|
|
if (OB_FAIL(cg_scanner->get_next_rows(count, sql_batch_size))) {
|
|
if (OB_ITER_END != ret) {
|
|
STORAGE_LOG(WARN, "Fail to get next rows", K(ret));
|
|
ASSERT_EQ(OB_ITER_END, ret);
|
|
} else {
|
|
for (uint64_t i = 0; i < count; i++) {
|
|
// STORAGE_LOG(INFO, "get next row info", K(i), K(count), K(datums[i].get_int()));
|
|
ASSERT_EQ(get_count * 10, datums[i].get_int());
|
|
get_count++;
|
|
}
|
|
}
|
|
} else {
|
|
for (uint64_t i = 0; i < count; i++) {
|
|
// STORAGE_LOG(INFO, "get next row info", K(i), K(count), K(datums[i].get_int()));
|
|
ASSERT_EQ(get_count * 10, datums[i].get_int());
|
|
get_count++;
|
|
}
|
|
}
|
|
}
|
|
ASSERT_EQ(get_count, total_cnt);
|
|
|
|
STORAGE_LOG(INFO, "test large micro selected finished");
|
|
|
|
allocator_.free(datum_buf);
|
|
datum_buf = nullptr;
|
|
expr->reset();
|
|
allocator_.free(expr);
|
|
expr = nullptr;
|
|
out_cols_project.reset();
|
|
allocator_.free(block_row_store);
|
|
block_row_store = nullptr;
|
|
cg_scanner->reset();
|
|
allocator_.free(cg_scanner);
|
|
cg_scanner = nullptr;
|
|
destroy_cg_query_param();
|
|
}
|
|
|
|
TEST_F(TestCGScanner, test_border)
|
|
{
|
|
test_border(false);
|
|
}
|
|
|
|
TEST_F(TestCGScanner, test_random)
|
|
{
|
|
test_random(false);
|
|
}
|
|
|
|
TEST_F(TestCGScanner, test_border_reverse)
|
|
{
|
|
test_border(true);
|
|
}
|
|
|
|
TEST_F(TestCGScanner, test_random_reverse)
|
|
{
|
|
test_random(true);
|
|
}
|
|
|
|
TEST_F(TestCGScanner, test_filter)
|
|
{
|
|
ObVersionRange scan_version;
|
|
scan_version.base_version_ = 1;
|
|
scan_version.multi_version_start_ = 1;
|
|
scan_version.snapshot_version_ = INT64_MAX;
|
|
// prepare query param
|
|
bool is_reverse = true;
|
|
prepare_cg_query_param(is_reverse, scan_version, 0);
|
|
sql::ObExecContext exec_ctx(allocator_);
|
|
sql::ObEvalCtx eval_ctx(exec_ctx);
|
|
sql::ObPushdownExprSpec expr_spec(allocator_);
|
|
|
|
static int64_t array_cnt = 1024;
|
|
eval_ctx.batch_idx_ = 0;
|
|
eval_ctx.batch_size_ = 256;
|
|
expr_spec.max_batch_size_ = 256;
|
|
void *datum_buf = allocator_.alloc((sizeof(sql::ObDatum) + sizeof(int64_t)) * array_cnt * 2);
|
|
sql::ObDatum *datums = new (datum_buf) sql::ObDatum[array_cnt];
|
|
eval_ctx.frames_ = (char **)(&datum_buf);
|
|
sql::ObPushdownOperator op(eval_ctx, expr_spec);
|
|
ObFixedArray<int32_t, ObIAllocator> out_cols_project;
|
|
out_cols_project.set_allocator(&allocator_);
|
|
out_cols_project.init(1);
|
|
out_cols_project.push_back(0);
|
|
sql::ExprFixedArray exprs;
|
|
exprs.set_allocator(&allocator_);
|
|
void *expr_buf = allocator_.alloc(sizeof(sql::ObExpr));
|
|
ASSERT_NE(nullptr, expr_buf);
|
|
sql::ObExpr *expr = reinterpret_cast<sql::ObExpr *>(expr_buf);
|
|
expr->reset();
|
|
expr->res_buf_off_ = sizeof(sql::ObDatum) * array_cnt;
|
|
expr->res_buf_len_ = sizeof(int64_t);
|
|
char *ptr = (char *)datum_buf + expr->res_buf_off_;
|
|
for (int64_t i = 0; i < array_cnt; i++) {
|
|
datums[i].ptr_ = ptr;
|
|
ptr += expr->res_buf_len_;
|
|
}
|
|
|
|
sql::ObPushdownWhiteFilterNode white_filter(allocator_);
|
|
white_filter.op_type_ = sql::WHITE_OP_EQ;
|
|
sql::ObWhiteFilterExecutor filter(allocator_, white_filter, op);
|
|
filter.cg_col_offsets_.init(1);
|
|
filter.col_params_.init(1);
|
|
const ObColumnParam *col_param = nullptr;
|
|
filter.col_params_.push_back(col_param);
|
|
filter.cg_col_offsets_.push_back(0);
|
|
filter.n_cols_ = 1;
|
|
ObObj filter_obj;
|
|
filter_obj.set_int(10);
|
|
int64_t arg_cnt = 2;
|
|
void *expr_buf1 = allocator_.alloc(sizeof(sql::ObExpr));
|
|
void *expr_buf2 = allocator_.alloc(sizeof(sql::ObExpr*) * arg_cnt);
|
|
void *expr_buf3 = allocator_.alloc(sizeof(sql::ObExpr) * arg_cnt);
|
|
filter.filter_.expr_ = reinterpret_cast<sql::ObExpr *>(expr_buf1);
|
|
filter.filter_.expr_->arg_cnt_ = arg_cnt;
|
|
filter.filter_.expr_->args_ = reinterpret_cast<sql::ObExpr **>(expr_buf2);
|
|
filter.datum_params_.init(1);
|
|
ObDatum arg_datum;
|
|
void *datum_buf2 = allocator_.alloc((sizeof(int64_t)) * 32);
|
|
filter.filter_.expr_->args_[0] = reinterpret_cast<sql::ObExpr *>(expr_buf3);
|
|
filter.filter_.expr_->args_[0]->obj_meta_ = filter_obj.get_meta();
|
|
filter.filter_.expr_->args_[0]->datum_meta_.type_ = filter_obj.get_meta().get_type();
|
|
arg_datum.ptr_ = (char *)datum_buf2;
|
|
arg_datum.from_obj(filter_obj);
|
|
filter.datum_params_.push_back(arg_datum);
|
|
filter.filter_.expr_->args_[1] = reinterpret_cast<sql::ObExpr *>(expr_buf3) + 1;
|
|
filter.filter_.expr_->args_[1]->type_ = T_REF_COLUMN;
|
|
filter.cmp_func_ = get_datum_cmp_func(filter.filter_.expr_->args_[0]->obj_meta_, filter.filter_.expr_->args_[0]->obj_meta_);
|
|
|
|
exprs.init(1);
|
|
exprs.push_back(expr);
|
|
access_param_.iter_param_.out_cols_project_ = &out_cols_project;
|
|
access_param_.iter_param_.output_exprs_ = &exprs;
|
|
access_param_.iter_param_.op_ = &op;
|
|
void *buf = allocator_.alloc(sizeof(ObBlockRowStore));
|
|
ASSERT_NE(nullptr, buf);
|
|
ObBlockRowStore *block_row_store = new (buf) ObBlockRowStore(context_);
|
|
ASSERT_EQ(OB_SUCCESS, block_row_store->init(access_param_));
|
|
context_.block_row_store_ = block_row_store;
|
|
|
|
buf = allocator_.alloc(sizeof(ObCGRowScanner));
|
|
ASSERT_NE(nullptr, buf);
|
|
ObCGRowScanner *cg_scanner = new (buf) ObCGRowScanner();
|
|
ObSSTableWrapper wrapper;
|
|
wrapper.sstable_ = &sstable_;
|
|
ASSERT_EQ(OB_SUCCESS, cg_scanner->init(access_param_.iter_param_, context_, wrapper));
|
|
|
|
sql::PushdownFilterInfo pd_filter;
|
|
pd_filter.filter_ = &filter;
|
|
pd_filter.is_pd_filter_ = true;
|
|
pd_filter.col_capacity_ = 1;
|
|
pd_filter.batch_size_ = 256;
|
|
pd_filter.allocator_ = &allocator_;
|
|
void *buf3 = allocator_.alloc(sizeof(blocksstable::ObStorageDatum) * 1);
|
|
pd_filter.datum_buf_ = new (buf3) blocksstable::ObStorageDatum[1]();
|
|
buf3 = allocator_.alloc(sizeof(char *) * pd_filter.batch_size_);
|
|
pd_filter.cell_data_ptrs_ = reinterpret_cast<const char **>(buf3);
|
|
buf3 = allocator_.alloc(sizeof(int32_t) * pd_filter.batch_size_);
|
|
pd_filter.row_ids_ = reinterpret_cast<int32_t *>(buf3);
|
|
pd_filter.skip_bit_ = to_bit_vector(allocator_.alloc(ObBitVector::memory_size(256)));
|
|
pd_filter.is_inited_ = true;
|
|
|
|
int64_t start = 5;
|
|
int64_t locate_count = 25;
|
|
ObCGBitmap parent_bitmap(allocator_);
|
|
ObCGBitmap bitmap(allocator_);
|
|
bitmap.init(locate_count, false);
|
|
bitmap.reuse(start);
|
|
ASSERT_EQ(OB_SUCCESS, cg_scanner->locate(ObCSRange(start, locate_count)));
|
|
ASSERT_EQ(OB_SUCCESS, cg_scanner->apply_filter(nullptr, pd_filter, locate_count, &parent_bitmap, bitmap));
|
|
|
|
ASSERT_EQ(OB_SUCCESS, cg_scanner->locate(ObCSRange(start, locate_count), &bitmap));
|
|
uint64_t count = 0;
|
|
ASSERT_EQ(OB_ITER_END, cg_scanner->get_next_rows(count, 256));
|
|
ASSERT_EQ(1, count);
|
|
ASSERT_EQ(datums[0].get_int(), 10);
|
|
|
|
STORAGE_LOG(INFO, "test filter finished");
|
|
|
|
allocator_.free(datum_buf);
|
|
datum_buf = nullptr;
|
|
expr->reset();
|
|
allocator_.free(expr);
|
|
expr = nullptr;
|
|
allocator_.free(expr_buf1);
|
|
allocator_.free(expr_buf2);
|
|
allocator_.free(expr_buf3);
|
|
allocator_.free(datum_buf2);
|
|
pd_filter.reset();
|
|
out_cols_project.reset();
|
|
allocator_.free(block_row_store);
|
|
block_row_store = nullptr;
|
|
cg_scanner->reset();
|
|
allocator_.free(cg_scanner);
|
|
cg_scanner = nullptr;
|
|
destroy_cg_query_param();
|
|
}
|
|
|
|
}
|
|
}
|
|
|
|
int main(int argc, char **argv)
|
|
{
|
|
system("rm -f test_cg_scanner.log*");
|
|
OB_LOGGER.set_file_name("test_cg_scanner.log", true, true);
|
|
oceanbase::common::ObLogger::get_logger().set_log_level("INFO");
|
|
testing::InitGoogleTest(&argc, argv);
|
|
return RUN_ALL_TESTS();
|
|
}
|