[fix](ut) repair segcompaction ut (#38165) (#38225)

cherry-pick #38165
This commit is contained in:
zhannngchen
2024-08-09 15:52:18 +08:00
committed by GitHub
parent 4aae68f62f
commit 8a682d43ec
2 changed files with 243 additions and 265 deletions

View File

@ -46,7 +46,6 @@ list(REMOVE_ITEM UT_FILES
${CMAKE_CURRENT_SOURCE_DIR}/olap/rowset/segment_v2/frame_of_reference_page_test.cpp
${CMAKE_CURRENT_SOURCE_DIR}/olap/rowset/segment_v2/plain_page_test.cpp
${CMAKE_CURRENT_SOURCE_DIR}/olap/rowset/segment_v2/rle_page_test.cpp
${CMAKE_CURRENT_SOURCE_DIR}/olap/segcompaction_test.cpp
${CMAKE_CURRENT_SOURCE_DIR}/runtime/decimal_value_test.cpp
${CMAKE_CURRENT_SOURCE_DIR}/runtime/result_buffer_mgr_test.cpp
${CMAKE_CURRENT_SOURCE_DIR}/util/decompress_test.cpp

View File

@ -23,7 +23,6 @@
#include <vector>
#include "common/config.h"
#include "env/env_posix.h"
#include "gen_cpp/AgentService_types.h"
#include "gen_cpp/olap_file.pb.h"
#include "io/fs/local_file_system.h"
@ -47,14 +46,12 @@ namespace doris {
using namespace ErrorCode;
static const uint32_t MAX_PATH_LEN = 1024;
static std::unique_ptr<StorageEngine> l_engine;
static StorageEngine* l_engine;
static const std::string lTestDir = "./data_test/data/segcompaction_test";
class SegCompactionTest : public testing::Test {
public:
SegCompactionTest() : _data_dir(std::make_unique<DataDir>(lTestDir)) {
_data_dir->update_capacity();
}
SegCompactionTest() = default;
void SetUp() {
config::enable_segcompaction = true;
@ -76,18 +73,32 @@ public:
doris::EngineOptions options;
options.store_paths = paths;
Status s = doris::StorageEngine::open(options, &l_engine);
l_engine = new StorageEngine(options);
ExecEnv::GetInstance()->set_storage_engine(l_engine);
Status s = l_engine->open();
EXPECT_TRUE(s.ok()) << s.to_string();
ExecEnv* exec_env = doris::ExecEnv::GetInstance();
_data_dir = new DataDir(lTestDir, 1000000000);
static_cast<void>(_data_dir->init());
static_cast<void>(_data_dir->update_capacity());
EXPECT_TRUE(io::global_local_filesystem()->create_directory(lTestDir).ok());
l_engine->start_bg_threads();
s = l_engine->start_bg_threads();
EXPECT_TRUE(s.ok()) << s.to_string();
}
void TearDown() {
l_engine.reset();
SAFE_DELETE(_data_dir);
EXPECT_TRUE(io::global_local_filesystem()->delete_directory(lTestDir).ok());
if (l_engine != nullptr) {
l_engine->stop();
delete l_engine;
l_engine = nullptr;
ExecEnv::GetInstance()->set_storage_engine(nullptr);
}
config::enable_segcompaction = false;
}
@ -122,7 +133,8 @@ protected:
}
// (k1 int, k2 varchar(20), k3 int) keys (k1, k2)
void create_tablet_schema(TabletSchemaSPtr tablet_schema, KeysType keystype) {
void create_tablet_schema(TabletSchemaSPtr tablet_schema, KeysType keystype,
int num_value_col = 1) {
TabletSchemaPB tablet_schema_pb;
tablet_schema_pb.set_keys_type(keystype);
tablet_schema_pb.set_num_short_key_columns(2);
@ -152,15 +164,18 @@ protected:
column_2->set_is_nullable(true);
column_2->set_is_bf_column(false);
ColumnPB* column_3 = tablet_schema_pb.add_column();
column_3->set_unique_id(3);
column_3->set_name("v1");
column_3->set_type("INT");
column_3->set_length(4);
column_3->set_is_key(false);
column_3->set_is_nullable(false);
column_3->set_is_bf_column(false);
column_3->set_aggregation("SUM");
for (int i = 1; i <= num_value_col; i++) {
ColumnPB* v_column = tablet_schema_pb.add_column();
v_column->set_unique_id(2 + i);
v_column->set_name(fmt::format("v{}", i));
v_column->set_type("INT");
v_column->set_length(4);
v_column->set_is_key(false);
v_column->set_is_nullable(false);
v_column->set_is_bf_column(false);
v_column->set_default_value(std::to_string(i * 10));
v_column->set_aggregation("SUM");
}
tablet_schema->init_from_pb(tablet_schema_pb);
}
@ -192,16 +207,11 @@ protected:
l_engine->create_tablet(req, &profile);
rowset_writer_context->tablet = l_engine->tablet_manager()->get_tablet(TTabletId tablet_id);
#endif
std::shared_ptr<DataDir> data_dir = std::make_shared<DataDir>(lTestDir);
TabletMetaSharedPtr tablet_meta = std::make_shared<TabletMeta>();
tablet_meta->_tablet_id = 1;
tablet_meta->set_partition_id(10000);
static_cast<void>(tablet_meta->set_partition_id(10000));
tablet_meta->_schema = tablet_schema;
auto tablet = std::make_shared<Tablet>(*l_engine, tablet_meta, data_dir.get(), "test_str");
char* tmp_str = (char*)malloc(20);
strncpy(tmp_str, "test_tablet_name", 20);
tablet->_full_name = tmp_str;
auto tablet = std::make_shared<Tablet>(*l_engine, tablet_meta, _data_dir, "test_str");
// tablet->key
rowset_writer_context->tablet = tablet;
}
@ -217,7 +227,7 @@ protected:
}
private:
std::unique_ptr<DataDir> _data_dir;
DataDir* _data_dir = nullptr;
};
TEST_F(SegCompactionTest, SegCompactionThenRead) {
@ -239,27 +249,25 @@ TEST_F(SegCompactionTest, SegCompactionThenRead) {
std::unique_ptr<RowsetWriter> rowset_writer;
s = RowsetFactory::create_rowset_writer(writer_context, false, &rowset_writer);
EXPECT_EQ(Status::OK(), s);
RowCursor input_row;
input_row.init(tablet_schema);
EXPECT_TRUE(s.ok());
// for segment "i", row "rid"
// k1 := rid*10 + i
// k2 := k1 * 10
// k3 := rid
for (int i = 0; i < num_segments; ++i) {
vectorized::Arena arena;
vectorized::Block block = tablet_schema->create_block();
auto columns = block.mutate_columns();
for (int rid = 0; rid < rows_per_segment; ++rid) {
uint32_t k1 = rid * 100 + i;
uint32_t k2 = i;
uint32_t k3 = rid;
input_row.set_field_content(0, reinterpret_cast<char*>(&k1), &arena);
input_row.set_field_content(1, reinterpret_cast<char*>(&k2), &arena);
input_row.set_field_content(2, reinterpret_cast<char*>(&k3), &arena);
s = rowset_writer->add_row(input_row);
EXPECT_EQ(Status::OK(), s);
columns[0]->insert_data((const char*)&k1, sizeof(k1));
columns[1]->insert_data((const char*)&k2, sizeof(k2));
columns[2]->insert_data((const char*)&k3, sizeof(k3));
}
s = rowset_writer->add_block(&block);
EXPECT_TRUE(s.ok());
s = rowset_writer->flush();
EXPECT_EQ(Status::OK(), s);
sleep(1);
@ -281,7 +289,7 @@ TEST_F(SegCompactionTest, SegCompactionThenRead) {
RowsetReaderContext reader_context;
reader_context.tablet_schema = tablet_schema;
// use this type to avoid cache from other ut
reader_context.reader_type = READER_CUMULATIVE_COMPACTION;
reader_context.reader_type = ReaderType::READER_CUMULATIVE_COMPACTION;
reader_context.need_ordered_result = true;
std::vector<uint32_t> return_columns = {0, 1, 2};
reader_context.return_columns = &return_columns;
@ -321,6 +329,7 @@ TEST_F(SegCompactionTest, SegCompactionThenRead) {
}
EXPECT_EQ(Status::Error<END_OF_FILE>(""), s);
EXPECT_EQ(rowset->rowset_meta()->num_rows(), num_rows_read);
EXPECT_EQ(num_rows_read, num_segments * rows_per_segment);
EXPECT_TRUE(rowset_reader->get_segment_num_rows(&segment_num_rows).ok());
size_t total_num_rows = 0;
for (const auto& i : segment_num_rows) {
@ -347,10 +356,7 @@ TEST_F(SegCompactionTest, SegCompactionInterleaveWithBig_ooooOOoOooooooooO) {
std::unique_ptr<RowsetWriter> rowset_writer;
s = RowsetFactory::create_rowset_writer(writer_context, false, &rowset_writer);
EXPECT_EQ(Status::OK(), s);
RowCursor input_row;
input_row.init(tablet_schema);
EXPECT_TRUE(s.ok());
// for segment "i", row "rid"
// k1 := rid*10 + i
@ -359,85 +365,90 @@ TEST_F(SegCompactionTest, SegCompactionInterleaveWithBig_ooooOOoOooooooooO) {
int num_segments = 4;
uint32_t rows_per_segment = 4096;
for (int i = 0; i < num_segments; ++i) {
vectorized::Arena arena;
vectorized::Block block = tablet_schema->create_block();
auto columns = block.mutate_columns();
for (int rid = 0; rid < rows_per_segment; ++rid) {
uint32_t k1 = rid * 100 + i;
uint32_t k2 = i;
uint32_t k3 = rid;
input_row.set_field_content(0, reinterpret_cast<char*>(&k1), &arena);
input_row.set_field_content(1, reinterpret_cast<char*>(&k2), &arena);
input_row.set_field_content(2, reinterpret_cast<char*>(&k3), &arena);
s = rowset_writer->add_row(input_row);
EXPECT_EQ(Status::OK(), s);
columns[0]->insert_data((const char*)&k1, sizeof(k1));
columns[1]->insert_data((const char*)&k2, sizeof(k2));
columns[2]->insert_data((const char*)&k3, sizeof(k3));
}
s = rowset_writer->add_block(&block);
EXPECT_TRUE(s.ok());
s = rowset_writer->flush();
EXPECT_EQ(Status::OK(), s);
}
num_segments = 2;
rows_per_segment = 6400;
for (int i = 0; i < num_segments; ++i) {
vectorized::Arena arena;
vectorized::Block block = tablet_schema->create_block();
auto columns = block.mutate_columns();
for (int rid = 0; rid < rows_per_segment; ++rid) {
uint32_t k1 = rid * 100 + i;
uint32_t k2 = i;
uint32_t k3 = rid;
input_row.set_field_content(0, reinterpret_cast<char*>(&k1), &arena);
input_row.set_field_content(1, reinterpret_cast<char*>(&k2), &arena);
input_row.set_field_content(2, reinterpret_cast<char*>(&k3), &arena);
s = rowset_writer->add_row(input_row);
EXPECT_EQ(Status::OK(), s);
columns[0]->insert_data((const char*)&k1, sizeof(k1));
columns[1]->insert_data((const char*)&k2, sizeof(k2));
columns[2]->insert_data((const char*)&k3, sizeof(k3));
}
s = rowset_writer->add_block(&block);
EXPECT_TRUE(s.ok());
s = rowset_writer->flush();
EXPECT_EQ(Status::OK(), s);
}
num_segments = 1;
rows_per_segment = 4096;
for (int i = 0; i < num_segments; ++i) {
vectorized::Arena arena;
vectorized::Block block = tablet_schema->create_block();
auto columns = block.mutate_columns();
for (int rid = 0; rid < rows_per_segment; ++rid) {
uint32_t k1 = rid * 100 + i;
uint32_t k2 = i;
uint32_t k3 = rid;
input_row.set_field_content(0, reinterpret_cast<char*>(&k1), &arena);
input_row.set_field_content(1, reinterpret_cast<char*>(&k2), &arena);
input_row.set_field_content(2, reinterpret_cast<char*>(&k3), &arena);
s = rowset_writer->add_row(input_row);
EXPECT_EQ(Status::OK(), s);
columns[0]->insert_data((const char*)&k1, sizeof(k1));
columns[1]->insert_data((const char*)&k2, sizeof(k2));
columns[2]->insert_data((const char*)&k3, sizeof(k3));
}
s = rowset_writer->add_block(&block);
EXPECT_TRUE(s.ok());
s = rowset_writer->flush();
EXPECT_EQ(Status::OK(), s);
}
num_segments = 1;
rows_per_segment = 6400;
for (int i = 0; i < num_segments; ++i) {
vectorized::Arena arena;
vectorized::Block block = tablet_schema->create_block();
auto columns = block.mutate_columns();
for (int rid = 0; rid < rows_per_segment; ++rid) {
uint32_t k1 = rid * 100 + i;
uint32_t k2 = i;
uint32_t k3 = rid;
input_row.set_field_content(0, reinterpret_cast<char*>(&k1), &arena);
input_row.set_field_content(1, reinterpret_cast<char*>(&k2), &arena);
input_row.set_field_content(2, reinterpret_cast<char*>(&k3), &arena);
s = rowset_writer->add_row(input_row);
EXPECT_EQ(Status::OK(), s);
columns[0]->insert_data((const char*)&k1, sizeof(k1));
columns[1]->insert_data((const char*)&k2, sizeof(k2));
columns[2]->insert_data((const char*)&k3, sizeof(k3));
}
s = rowset_writer->add_block(&block);
EXPECT_TRUE(s.ok());
s = rowset_writer->flush();
EXPECT_EQ(Status::OK(), s);
}
num_segments = 8;
rows_per_segment = 4096;
for (int i = 0; i < num_segments; ++i) {
vectorized::Arena arena;
vectorized::Block block = tablet_schema->create_block();
auto columns = block.mutate_columns();
for (int rid = 0; rid < rows_per_segment; ++rid) {
uint32_t k1 = rid * 100 + i;
uint32_t k2 = i;
uint32_t k3 = rid;
input_row.set_field_content(0, reinterpret_cast<char*>(&k1), &arena);
input_row.set_field_content(1, reinterpret_cast<char*>(&k2), &arena);
input_row.set_field_content(2, reinterpret_cast<char*>(&k3), &arena);
s = rowset_writer->add_row(input_row);
EXPECT_EQ(Status::OK(), s);
columns[0]->insert_data((const char*)&k1, sizeof(k1));
columns[1]->insert_data((const char*)&k2, sizeof(k2));
columns[2]->insert_data((const char*)&k3, sizeof(k3));
}
s = rowset_writer->add_block(&block);
EXPECT_TRUE(s.ok());
s = rowset_writer->flush();
EXPECT_EQ(Status::OK(), s);
sleep(1);
@ -445,17 +456,18 @@ TEST_F(SegCompactionTest, SegCompactionInterleaveWithBig_ooooOOoOooooooooO) {
num_segments = 1;
rows_per_segment = 6400;
for (int i = 0; i < num_segments; ++i) {
vectorized::Arena arena;
vectorized::Block block = tablet_schema->create_block();
auto columns = block.mutate_columns();
for (int rid = 0; rid < rows_per_segment; ++rid) {
uint32_t k1 = rid * 100 + i;
uint32_t k2 = i;
uint32_t k3 = rid;
input_row.set_field_content(0, reinterpret_cast<char*>(&k1), &arena);
input_row.set_field_content(1, reinterpret_cast<char*>(&k2), &arena);
input_row.set_field_content(2, reinterpret_cast<char*>(&k3), &arena);
s = rowset_writer->add_row(input_row);
EXPECT_EQ(Status::OK(), s);
columns[0]->insert_data((const char*)&k1, sizeof(k1));
columns[1]->insert_data((const char*)&k2, sizeof(k2));
columns[2]->insert_data((const char*)&k3, sizeof(k3));
}
s = rowset_writer->add_block(&block);
EXPECT_TRUE(s.ok());
s = rowset_writer->flush();
EXPECT_EQ(Status::OK(), s);
sleep(1);
@ -491,10 +503,7 @@ TEST_F(SegCompactionTest, SegCompactionInterleaveWithBig_OoOoO) {
std::unique_ptr<RowsetWriter> rowset_writer;
s = RowsetFactory::create_rowset_writer(writer_context, false, &rowset_writer);
EXPECT_EQ(Status::OK(), s);
RowCursor input_row;
input_row.init(tablet_schema);
EXPECT_TRUE(s.ok());
// for segment "i", row "rid"
// k1 := rid*10 + i
@ -503,85 +512,90 @@ TEST_F(SegCompactionTest, SegCompactionInterleaveWithBig_OoOoO) {
int num_segments = 1;
uint32_t rows_per_segment = 6400;
for (int i = 0; i < num_segments; ++i) {
vectorized::Arena arena;
vectorized::Block block = tablet_schema->create_block();
auto columns = block.mutate_columns();
for (int rid = 0; rid < rows_per_segment; ++rid) {
uint32_t k1 = rid * 100 + i;
uint32_t k2 = i;
uint32_t k3 = rid;
input_row.set_field_content(0, reinterpret_cast<char*>(&k1), &arena);
input_row.set_field_content(1, reinterpret_cast<char*>(&k2), &arena);
input_row.set_field_content(2, reinterpret_cast<char*>(&k3), &arena);
s = rowset_writer->add_row(input_row);
EXPECT_EQ(Status::OK(), s);
columns[0]->insert_data((const char*)&k1, sizeof(k1));
columns[1]->insert_data((const char*)&k2, sizeof(k2));
columns[2]->insert_data((const char*)&k3, sizeof(k3));
}
s = rowset_writer->add_block(&block);
EXPECT_TRUE(s.ok());
s = rowset_writer->flush();
EXPECT_EQ(Status::OK(), s);
}
num_segments = 1;
rows_per_segment = 4096;
for (int i = 0; i < num_segments; ++i) {
vectorized::Arena arena;
vectorized::Block block = tablet_schema->create_block();
auto columns = block.mutate_columns();
for (int rid = 0; rid < rows_per_segment; ++rid) {
uint32_t k1 = rid * 100 + i;
uint32_t k2 = i;
uint32_t k3 = rid;
input_row.set_field_content(0, reinterpret_cast<char*>(&k1), &arena);
input_row.set_field_content(1, reinterpret_cast<char*>(&k2), &arena);
input_row.set_field_content(2, reinterpret_cast<char*>(&k3), &arena);
s = rowset_writer->add_row(input_row);
EXPECT_EQ(Status::OK(), s);
columns[0]->insert_data((const char*)&k1, sizeof(k1));
columns[1]->insert_data((const char*)&k2, sizeof(k2));
columns[2]->insert_data((const char*)&k3, sizeof(k3));
}
s = rowset_writer->add_block(&block);
EXPECT_TRUE(s.ok());
s = rowset_writer->flush();
EXPECT_EQ(Status::OK(), s);
}
num_segments = 1;
rows_per_segment = 6400;
for (int i = 0; i < num_segments; ++i) {
vectorized::Arena arena;
vectorized::Block block = tablet_schema->create_block();
auto columns = block.mutate_columns();
for (int rid = 0; rid < rows_per_segment; ++rid) {
uint32_t k1 = rid * 100 + i;
uint32_t k2 = i;
uint32_t k3 = rid;
input_row.set_field_content(0, reinterpret_cast<char*>(&k1), &arena);
input_row.set_field_content(1, reinterpret_cast<char*>(&k2), &arena);
input_row.set_field_content(2, reinterpret_cast<char*>(&k3), &arena);
s = rowset_writer->add_row(input_row);
EXPECT_EQ(Status::OK(), s);
columns[0]->insert_data((const char*)&k1, sizeof(k1));
columns[1]->insert_data((const char*)&k2, sizeof(k2));
columns[2]->insert_data((const char*)&k3, sizeof(k3));
}
s = rowset_writer->add_block(&block);
EXPECT_TRUE(s.ok());
s = rowset_writer->flush();
EXPECT_EQ(Status::OK(), s);
}
num_segments = 1;
rows_per_segment = 4096;
for (int i = 0; i < num_segments; ++i) {
vectorized::Arena arena;
vectorized::Block block = tablet_schema->create_block();
auto columns = block.mutate_columns();
for (int rid = 0; rid < rows_per_segment; ++rid) {
uint32_t k1 = rid * 100 + i;
uint32_t k2 = i;
uint32_t k3 = rid;
input_row.set_field_content(0, reinterpret_cast<char*>(&k1), &arena);
input_row.set_field_content(1, reinterpret_cast<char*>(&k2), &arena);
input_row.set_field_content(2, reinterpret_cast<char*>(&k3), &arena);
s = rowset_writer->add_row(input_row);
EXPECT_EQ(Status::OK(), s);
columns[0]->insert_data((const char*)&k1, sizeof(k1));
columns[1]->insert_data((const char*)&k2, sizeof(k2));
columns[2]->insert_data((const char*)&k3, sizeof(k3));
}
s = rowset_writer->add_block(&block);
EXPECT_TRUE(s.ok());
s = rowset_writer->flush();
EXPECT_EQ(Status::OK(), s);
}
num_segments = 1;
rows_per_segment = 6400;
for (int i = 0; i < num_segments; ++i) {
vectorized::Arena arena;
vectorized::Block block = tablet_schema->create_block();
auto columns = block.mutate_columns();
for (int rid = 0; rid < rows_per_segment; ++rid) {
uint32_t k1 = rid * 100 + i;
uint32_t k2 = i;
uint32_t k3 = rid;
input_row.set_field_content(0, reinterpret_cast<char*>(&k1), &arena);
input_row.set_field_content(1, reinterpret_cast<char*>(&k2), &arena);
input_row.set_field_content(2, reinterpret_cast<char*>(&k3), &arena);
s = rowset_writer->add_row(input_row);
EXPECT_EQ(Status::OK(), s);
columns[0]->insert_data((const char*)&k1, sizeof(k1));
columns[1]->insert_data((const char*)&k2, sizeof(k2));
columns[2]->insert_data((const char*)&k3, sizeof(k3));
}
s = rowset_writer->add_block(&block);
EXPECT_TRUE(s.ok());
s = rowset_writer->flush();
EXPECT_EQ(Status::OK(), s);
sleep(1);
@ -615,69 +629,59 @@ TEST_F(SegCompactionTest, SegCompactionThenReadUniqueTableSmall) {
std::unique_ptr<RowsetWriter> rowset_writer;
s = RowsetFactory::create_rowset_writer(writer_context, false, &rowset_writer);
EXPECT_EQ(Status::OK(), s);
EXPECT_TRUE(s.ok());
RowCursor input_row;
input_row.init(tablet_schema);
vectorized::Arena arena;
uint32_t k1 = 0;
uint32_t k2 = 0;
uint32_t k3 = 0;
vectorized::Block block = tablet_schema->create_block();
auto columns = block.mutate_columns();
// segment#0
k1 = k2 = 1;
k3 = 1;
input_row.set_field_content(0, reinterpret_cast<char*>(&k1), &arena);
input_row.set_field_content(1, reinterpret_cast<char*>(&k2), &arena);
input_row.set_field_content(2, reinterpret_cast<char*>(&k3), &arena);
s = rowset_writer->add_row(input_row);
EXPECT_EQ(Status::OK(), s);
columns[0]->insert_data((const char*)&k1, sizeof(k1));
columns[1]->insert_data((const char*)&k2, sizeof(k2));
columns[2]->insert_data((const char*)&k3, sizeof(k3));
k1 = k2 = 4;
k3 = 1;
input_row.set_field_content(0, reinterpret_cast<char*>(&k1), &arena);
input_row.set_field_content(1, reinterpret_cast<char*>(&k2), &arena);
input_row.set_field_content(2, reinterpret_cast<char*>(&k3), &arena);
s = rowset_writer->add_row(input_row);
EXPECT_EQ(Status::OK(), s);
columns[0]->insert_data((const char*)&k1, sizeof(k1));
columns[1]->insert_data((const char*)&k2, sizeof(k2));
columns[2]->insert_data((const char*)&k3, sizeof(k3));
k1 = k2 = 6;
k3 = 1;
input_row.set_field_content(0, reinterpret_cast<char*>(&k1), &arena);
input_row.set_field_content(1, reinterpret_cast<char*>(&k2), &arena);
input_row.set_field_content(2, reinterpret_cast<char*>(&k3), &arena);
s = rowset_writer->add_row(input_row);
EXPECT_EQ(Status::OK(), s);
columns[0]->insert_data((const char*)&k1, sizeof(k1));
columns[1]->insert_data((const char*)&k2, sizeof(k2));
columns[2]->insert_data((const char*)&k3, sizeof(k3));
s = rowset_writer->add_block(&block);
EXPECT_TRUE(s.ok());
s = rowset_writer->flush();
EXPECT_EQ(Status::OK(), s);
sleep(1);
// segment#1
k1 = k2 = 2;
k3 = 1;
input_row.set_field_content(0, reinterpret_cast<char*>(&k1), &arena);
input_row.set_field_content(1, reinterpret_cast<char*>(&k2), &arena);
input_row.set_field_content(2, reinterpret_cast<char*>(&k3), &arena);
s = rowset_writer->add_row(input_row);
EXPECT_EQ(Status::OK(), s);
columns[0]->insert_data((const char*)&k1, sizeof(k1));
columns[1]->insert_data((const char*)&k2, sizeof(k2));
columns[2]->insert_data((const char*)&k3, sizeof(k3));
k1 = k2 = 4;
k3 = 2;
input_row.set_field_content(0, reinterpret_cast<char*>(&k1), &arena);
input_row.set_field_content(1, reinterpret_cast<char*>(&k2), &arena);
input_row.set_field_content(2, reinterpret_cast<char*>(&k3), &arena);
s = rowset_writer->add_row(input_row);
EXPECT_EQ(Status::OK(), s);
columns[0]->insert_data((const char*)&k1, sizeof(k1));
columns[1]->insert_data((const char*)&k2, sizeof(k2));
columns[2]->insert_data((const char*)&k3, sizeof(k3));
k1 = k2 = 6;
k3 = 2;
input_row.set_field_content(0, reinterpret_cast<char*>(&k1), &arena);
input_row.set_field_content(1, reinterpret_cast<char*>(&k2), &arena);
input_row.set_field_content(2, reinterpret_cast<char*>(&k3), &arena);
s = rowset_writer->add_row(input_row);
EXPECT_EQ(Status::OK(), s);
columns[0]->insert_data((const char*)&k1, sizeof(k1));
columns[1]->insert_data((const char*)&k2, sizeof(k2));
columns[2]->insert_data((const char*)&k3, sizeof(k3));
s = rowset_writer->add_block(&block);
EXPECT_TRUE(s.ok());
s = rowset_writer->flush();
EXPECT_EQ(Status::OK(), s);
sleep(1);
@ -685,28 +689,24 @@ TEST_F(SegCompactionTest, SegCompactionThenReadUniqueTableSmall) {
// segment#2
k1 = k2 = 3;
k3 = 1;
input_row.set_field_content(0, reinterpret_cast<char*>(&k1), &arena);
input_row.set_field_content(1, reinterpret_cast<char*>(&k2), &arena);
input_row.set_field_content(2, reinterpret_cast<char*>(&k3), &arena);
s = rowset_writer->add_row(input_row);
EXPECT_EQ(Status::OK(), s);
columns[0]->insert_data((const char*)&k1, sizeof(k1));
columns[1]->insert_data((const char*)&k2, sizeof(k2));
columns[2]->insert_data((const char*)&k3, sizeof(k3));
k1 = k2 = 6;
k3 = 3;
input_row.set_field_content(0, reinterpret_cast<char*>(&k1), &arena);
input_row.set_field_content(1, reinterpret_cast<char*>(&k2), &arena);
input_row.set_field_content(2, reinterpret_cast<char*>(&k3), &arena);
s = rowset_writer->add_row(input_row);
EXPECT_EQ(Status::OK(), s);
columns[0]->insert_data((const char*)&k1, sizeof(k1));
columns[1]->insert_data((const char*)&k2, sizeof(k2));
columns[2]->insert_data((const char*)&k3, sizeof(k3));
k1 = k2 = 9;
k3 = 1;
input_row.set_field_content(0, reinterpret_cast<char*>(&k1), &arena);
input_row.set_field_content(1, reinterpret_cast<char*>(&k2), &arena);
input_row.set_field_content(2, reinterpret_cast<char*>(&k3), &arena);
s = rowset_writer->add_row(input_row);
EXPECT_EQ(Status::OK(), s);
columns[0]->insert_data((const char*)&k1, sizeof(k1));
columns[1]->insert_data((const char*)&k2, sizeof(k2));
columns[2]->insert_data((const char*)&k3, sizeof(k3));
s = rowset_writer->add_block(&block);
EXPECT_TRUE(s.ok());
s = rowset_writer->flush();
EXPECT_EQ(Status::OK(), s);
sleep(1);
@ -714,28 +714,24 @@ TEST_F(SegCompactionTest, SegCompactionThenReadUniqueTableSmall) {
// segment#3
k1 = k2 = 4;
k3 = 3;
input_row.set_field_content(0, reinterpret_cast<char*>(&k1), &arena);
input_row.set_field_content(1, reinterpret_cast<char*>(&k2), &arena);
input_row.set_field_content(2, reinterpret_cast<char*>(&k3), &arena);
s = rowset_writer->add_row(input_row);
EXPECT_EQ(Status::OK(), s);
columns[0]->insert_data((const char*)&k1, sizeof(k1));
columns[1]->insert_data((const char*)&k2, sizeof(k2));
columns[2]->insert_data((const char*)&k3, sizeof(k3));
k1 = k2 = 9;
k3 = 2;
input_row.set_field_content(0, reinterpret_cast<char*>(&k1), &arena);
input_row.set_field_content(1, reinterpret_cast<char*>(&k2), &arena);
input_row.set_field_content(2, reinterpret_cast<char*>(&k3), &arena);
s = rowset_writer->add_row(input_row);
EXPECT_EQ(Status::OK(), s);
columns[0]->insert_data((const char*)&k1, sizeof(k1));
columns[1]->insert_data((const char*)&k2, sizeof(k2));
columns[2]->insert_data((const char*)&k3, sizeof(k3));
k1 = k2 = 12;
k3 = 1;
input_row.set_field_content(0, reinterpret_cast<char*>(&k1), &arena);
input_row.set_field_content(1, reinterpret_cast<char*>(&k2), &arena);
input_row.set_field_content(2, reinterpret_cast<char*>(&k3), &arena);
s = rowset_writer->add_row(input_row);
EXPECT_EQ(Status::OK(), s);
columns[0]->insert_data((const char*)&k1, sizeof(k1));
columns[1]->insert_data((const char*)&k2, sizeof(k2));
columns[2]->insert_data((const char*)&k3, sizeof(k3));
s = rowset_writer->add_block(&block);
EXPECT_TRUE(s.ok());
s = rowset_writer->flush();
EXPECT_EQ(Status::OK(), s);
sleep(1);
@ -743,12 +739,12 @@ TEST_F(SegCompactionTest, SegCompactionThenReadUniqueTableSmall) {
// segment#4
k1 = k2 = 25;
k3 = 1;
input_row.set_field_content(0, reinterpret_cast<char*>(&k1), &arena);
input_row.set_field_content(1, reinterpret_cast<char*>(&k2), &arena);
input_row.set_field_content(2, reinterpret_cast<char*>(&k3), &arena);
s = rowset_writer->add_row(input_row);
EXPECT_EQ(Status::OK(), s);
columns[0]->insert_data((const char*)&k1, sizeof(k1));
columns[1]->insert_data((const char*)&k2, sizeof(k2));
columns[2]->insert_data((const char*)&k3, sizeof(k3));
s = rowset_writer->add_block(&block);
EXPECT_TRUE(s.ok());
s = rowset_writer->flush();
EXPECT_EQ(Status::OK(), s);
sleep(1);
@ -756,12 +752,12 @@ TEST_F(SegCompactionTest, SegCompactionThenReadUniqueTableSmall) {
// segment#5
k1 = k2 = 26;
k3 = 1;
input_row.set_field_content(0, reinterpret_cast<char*>(&k1), &arena);
input_row.set_field_content(1, reinterpret_cast<char*>(&k2), &arena);
input_row.set_field_content(2, reinterpret_cast<char*>(&k3), &arena);
s = rowset_writer->add_row(input_row);
EXPECT_EQ(Status::OK(), s);
columns[0]->insert_data((const char*)&k1, sizeof(k1));
columns[1]->insert_data((const char*)&k2, sizeof(k2));
columns[2]->insert_data((const char*)&k3, sizeof(k3));
s = rowset_writer->add_block(&block);
EXPECT_TRUE(s.ok());
s = rowset_writer->flush();
EXPECT_EQ(Status::OK(), s);
sleep(1);
@ -779,7 +775,7 @@ TEST_F(SegCompactionTest, SegCompactionThenReadUniqueTableSmall) {
RowsetReaderContext reader_context;
reader_context.tablet_schema = tablet_schema;
// use this type to avoid cache from other ut
reader_context.reader_type = READER_CUMULATIVE_COMPACTION;
reader_context.reader_type = ReaderType::READER_CUMULATIVE_COMPACTION;
reader_context.need_ordered_result = true;
std::vector<uint32_t> return_columns = {0, 1, 2};
reader_context.return_columns = &return_columns;
@ -849,69 +845,60 @@ TEST_F(SegCompactionTest, SegCompactionThenReadAggTableSmall) {
std::unique_ptr<RowsetWriter> rowset_writer;
s = RowsetFactory::create_rowset_writer(writer_context, false, &rowset_writer);
EXPECT_EQ(Status::OK(), s);
EXPECT_TRUE(s.ok());
RowCursor input_row;
input_row.init(tablet_schema);
vectorized::Arena arena;
uint32_t k1 = 0;
uint32_t k2 = 0;
uint32_t k3 = 0;
vectorized::Block block = tablet_schema->create_block();
auto columns = block.mutate_columns();
// segment#0
k1 = k2 = 1;
k3 = 1;
input_row.set_field_content(0, reinterpret_cast<char*>(&k1), &arena);
input_row.set_field_content(1, reinterpret_cast<char*>(&k2), &arena);
input_row.set_field_content(2, reinterpret_cast<char*>(&k3), &arena);
s = rowset_writer->add_row(input_row);
EXPECT_EQ(Status::OK(), s);
columns[0]->insert_data((const char*)&k1, sizeof(k1));
columns[1]->insert_data((const char*)&k2, sizeof(k2));
columns[2]->insert_data((const char*)&k3, sizeof(k3));
k1 = k2 = 4;
k3 = 1;
input_row.set_field_content(0, reinterpret_cast<char*>(&k1), &arena);
input_row.set_field_content(1, reinterpret_cast<char*>(&k2), &arena);
input_row.set_field_content(2, reinterpret_cast<char*>(&k3), &arena);
s = rowset_writer->add_row(input_row);
EXPECT_EQ(Status::OK(), s);
columns[0]->insert_data((const char*)&k1, sizeof(k1));
columns[1]->insert_data((const char*)&k2, sizeof(k2));
columns[2]->insert_data((const char*)&k3, sizeof(k3));
k1 = k2 = 6;
k3 = 1;
input_row.set_field_content(0, reinterpret_cast<char*>(&k1), &arena);
input_row.set_field_content(1, reinterpret_cast<char*>(&k2), &arena);
input_row.set_field_content(2, reinterpret_cast<char*>(&k3), &arena);
s = rowset_writer->add_row(input_row);
EXPECT_EQ(Status::OK(), s);
columns[0]->insert_data((const char*)&k1, sizeof(k1));
columns[1]->insert_data((const char*)&k2, sizeof(k2));
columns[2]->insert_data((const char*)&k3, sizeof(k3));
s = rowset_writer->add_block(&block);
EXPECT_TRUE(s.ok());
s = rowset_writer->flush();
EXPECT_EQ(Status::OK(), s);
sleep(1);
// segment#1
k1 = k2 = 2;
k3 = 1;
input_row.set_field_content(0, reinterpret_cast<char*>(&k1), &arena);
input_row.set_field_content(1, reinterpret_cast<char*>(&k2), &arena);
input_row.set_field_content(2, reinterpret_cast<char*>(&k3), &arena);
s = rowset_writer->add_row(input_row);
EXPECT_EQ(Status::OK(), s);
columns[0]->insert_data((const char*)&k1, sizeof(k1));
columns[1]->insert_data((const char*)&k2, sizeof(k2));
columns[2]->insert_data((const char*)&k3, sizeof(k3));
k1 = k2 = 4;
k3 = 2;
input_row.set_field_content(0, reinterpret_cast<char*>(&k1), &arena);
input_row.set_field_content(1, reinterpret_cast<char*>(&k2), &arena);
input_row.set_field_content(2, reinterpret_cast<char*>(&k3), &arena);
s = rowset_writer->add_row(input_row);
EXPECT_EQ(Status::OK(), s);
columns[0]->insert_data((const char*)&k1, sizeof(k1));
columns[1]->insert_data((const char*)&k2, sizeof(k2));
columns[2]->insert_data((const char*)&k3, sizeof(k3));
k1 = k2 = 6;
k3 = 2;
input_row.set_field_content(0, reinterpret_cast<char*>(&k1), &arena);
input_row.set_field_content(1, reinterpret_cast<char*>(&k2), &arena);
input_row.set_field_content(2, reinterpret_cast<char*>(&k3), &arena);
s = rowset_writer->add_row(input_row);
EXPECT_EQ(Status::OK(), s);
columns[0]->insert_data((const char*)&k1, sizeof(k1));
columns[1]->insert_data((const char*)&k2, sizeof(k2));
columns[2]->insert_data((const char*)&k3, sizeof(k3));
s = rowset_writer->add_block(&block);
EXPECT_TRUE(s.ok());
s = rowset_writer->flush();
EXPECT_EQ(Status::OK(), s);
sleep(1);
@ -919,28 +906,24 @@ TEST_F(SegCompactionTest, SegCompactionThenReadAggTableSmall) {
// segment#2
k1 = k2 = 3;
k3 = 1;
input_row.set_field_content(0, reinterpret_cast<char*>(&k1), &arena);
input_row.set_field_content(1, reinterpret_cast<char*>(&k2), &arena);
input_row.set_field_content(2, reinterpret_cast<char*>(&k3), &arena);
s = rowset_writer->add_row(input_row);
EXPECT_EQ(Status::OK(), s);
columns[0]->insert_data((const char*)&k1, sizeof(k1));
columns[1]->insert_data((const char*)&k2, sizeof(k2));
columns[2]->insert_data((const char*)&k3, sizeof(k3));
k1 = k2 = 6;
k3 = 3;
input_row.set_field_content(0, reinterpret_cast<char*>(&k1), &arena);
input_row.set_field_content(1, reinterpret_cast<char*>(&k2), &arena);
input_row.set_field_content(2, reinterpret_cast<char*>(&k3), &arena);
s = rowset_writer->add_row(input_row);
EXPECT_EQ(Status::OK(), s);
columns[0]->insert_data((const char*)&k1, sizeof(k1));
columns[1]->insert_data((const char*)&k2, sizeof(k2));
columns[2]->insert_data((const char*)&k3, sizeof(k3));
k1 = k2 = 9;
k3 = 1;
input_row.set_field_content(0, reinterpret_cast<char*>(&k1), &arena);
input_row.set_field_content(1, reinterpret_cast<char*>(&k2), &arena);
input_row.set_field_content(2, reinterpret_cast<char*>(&k3), &arena);
s = rowset_writer->add_row(input_row);
EXPECT_EQ(Status::OK(), s);
columns[0]->insert_data((const char*)&k1, sizeof(k1));
columns[1]->insert_data((const char*)&k2, sizeof(k2));
columns[2]->insert_data((const char*)&k3, sizeof(k3));
s = rowset_writer->add_block(&block);
EXPECT_TRUE(s.ok());
s = rowset_writer->flush();
EXPECT_EQ(Status::OK(), s);
sleep(1);
@ -948,28 +931,24 @@ TEST_F(SegCompactionTest, SegCompactionThenReadAggTableSmall) {
// segment#3
k1 = k2 = 4;
k3 = 3;
input_row.set_field_content(0, reinterpret_cast<char*>(&k1), &arena);
input_row.set_field_content(1, reinterpret_cast<char*>(&k2), &arena);
input_row.set_field_content(2, reinterpret_cast<char*>(&k3), &arena);
s = rowset_writer->add_row(input_row);
EXPECT_EQ(Status::OK(), s);
columns[0]->insert_data((const char*)&k1, sizeof(k1));
columns[1]->insert_data((const char*)&k2, sizeof(k2));
columns[2]->insert_data((const char*)&k3, sizeof(k3));
k1 = k2 = 9;
k3 = 2;
input_row.set_field_content(0, reinterpret_cast<char*>(&k1), &arena);
input_row.set_field_content(1, reinterpret_cast<char*>(&k2), &arena);
input_row.set_field_content(2, reinterpret_cast<char*>(&k3), &arena);
s = rowset_writer->add_row(input_row);
EXPECT_EQ(Status::OK(), s);
columns[0]->insert_data((const char*)&k1, sizeof(k1));
columns[1]->insert_data((const char*)&k2, sizeof(k2));
columns[2]->insert_data((const char*)&k3, sizeof(k3));
k1 = k2 = 12;
k3 = 1;
input_row.set_field_content(0, reinterpret_cast<char*>(&k1), &arena);
input_row.set_field_content(1, reinterpret_cast<char*>(&k2), &arena);
input_row.set_field_content(2, reinterpret_cast<char*>(&k3), &arena);
s = rowset_writer->add_row(input_row);
EXPECT_EQ(Status::OK(), s);
columns[0]->insert_data((const char*)&k1, sizeof(k1));
columns[1]->insert_data((const char*)&k2, sizeof(k2));
columns[2]->insert_data((const char*)&k3, sizeof(k3));
s = rowset_writer->add_block(&block);
EXPECT_TRUE(s.ok());
s = rowset_writer->flush();
EXPECT_EQ(Status::OK(), s);
sleep(1);
@ -977,12 +956,12 @@ TEST_F(SegCompactionTest, SegCompactionThenReadAggTableSmall) {
// segment#4
k1 = k2 = 25;
k3 = 1;
input_row.set_field_content(0, reinterpret_cast<char*>(&k1), &arena);
input_row.set_field_content(1, reinterpret_cast<char*>(&k2), &arena);
input_row.set_field_content(2, reinterpret_cast<char*>(&k3), &arena);
s = rowset_writer->add_row(input_row);
EXPECT_EQ(Status::OK(), s);
columns[0]->insert_data((const char*)&k1, sizeof(k1));
columns[1]->insert_data((const char*)&k2, sizeof(k2));
columns[2]->insert_data((const char*)&k3, sizeof(k3));
s = rowset_writer->add_block(&block);
EXPECT_TRUE(s.ok());
s = rowset_writer->flush();
EXPECT_EQ(Status::OK(), s);
sleep(1);
@ -990,12 +969,12 @@ TEST_F(SegCompactionTest, SegCompactionThenReadAggTableSmall) {
// segment#5
k1 = k2 = 26;
k3 = 1;
input_row.set_field_content(0, reinterpret_cast<char*>(&k1), &arena);
input_row.set_field_content(1, reinterpret_cast<char*>(&k2), &arena);
input_row.set_field_content(2, reinterpret_cast<char*>(&k3), &arena);
s = rowset_writer->add_row(input_row);
EXPECT_EQ(Status::OK(), s);
columns[0]->insert_data((const char*)&k1, sizeof(k1));
columns[1]->insert_data((const char*)&k2, sizeof(k2));
columns[2]->insert_data((const char*)&k3, sizeof(k3));
s = rowset_writer->add_block(&block);
EXPECT_TRUE(s.ok());
s = rowset_writer->flush();
EXPECT_EQ(Status::OK(), s);
sleep(1);
@ -1013,7 +992,7 @@ TEST_F(SegCompactionTest, SegCompactionThenReadAggTableSmall) {
RowsetReaderContext reader_context;
reader_context.tablet_schema = tablet_schema;
// use this type to avoid cache from other ut
reader_context.reader_type = READER_CUMULATIVE_COMPACTION;
reader_context.reader_type = ReaderType::READER_CUMULATIVE_COMPACTION;
reader_context.need_ordered_result = true;
std::vector<uint32_t> return_columns = {0, 1, 2};
reader_context.return_columns = &return_columns;