[fix](group_commit)Fix bound checking problem when reading wal block (#31112)
This commit is contained in:
@ -18,6 +18,7 @@
|
||||
#include "wal_reader.h"
|
||||
|
||||
#include "common/logging.h"
|
||||
#include "common/sync_point.h"
|
||||
#include "gutil/strings/split.h"
|
||||
#include "olap/wal/wal_manager.h"
|
||||
#include "runtime/runtime_state.h"
|
||||
@ -61,11 +62,17 @@ Status WalReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
|
||||
//convert to dst block
|
||||
vectorized::Block dst_block;
|
||||
int index = 0;
|
||||
auto columns = block->get_columns_with_type_and_name();
|
||||
if (_column_id_count != columns.size() || columns.size() != _tuple_descriptor->slots().size()) {
|
||||
auto output_block_columns = block->get_columns_with_type_and_name();
|
||||
size_t output_block_column_size = output_block_columns.size();
|
||||
TEST_SYNC_POINT_CALLBACK("WalReader::set_column_id_count", &_column_id_count);
|
||||
TEST_SYNC_POINT_CALLBACK("WalReader::set_out_block_column_size", &output_block_column_size);
|
||||
if (_column_id_count != src_block.columns() ||
|
||||
output_block_column_size != _tuple_descriptor->slots().size()) {
|
||||
return Status::InternalError(
|
||||
"not equal _column_id_count={} vs columns size={} vs tuple_descriptor size={}",
|
||||
std::to_string(_column_id_count), std::to_string(columns.size()),
|
||||
"not equal wal _column_id_count={} vs wal block columns size={}, "
|
||||
"output block columns size={} vs tuple_descriptor size={}",
|
||||
std::to_string(_column_id_count), std::to_string(src_block.columns()),
|
||||
std::to_string(output_block_column_size),
|
||||
std::to_string(_tuple_descriptor->slots().size()));
|
||||
}
|
||||
for (auto slot_desc : _tuple_descriptor->slots()) {
|
||||
@ -78,9 +85,9 @@ Status WalReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
|
||||
if (column_ptr != nullptr && slot_desc->is_nullable()) {
|
||||
column_ptr = make_nullable(column_ptr);
|
||||
}
|
||||
dst_block.insert(
|
||||
index, vectorized::ColumnWithTypeAndName(std::move(column_ptr), columns[index].type,
|
||||
columns[index].name));
|
||||
dst_block.insert(index, vectorized::ColumnWithTypeAndName(
|
||||
std::move(column_ptr), output_block_columns[index].type,
|
||||
output_block_columns[index].name));
|
||||
index++;
|
||||
}
|
||||
block->swap(dst_block);
|
||||
|
||||
@ -20,6 +20,7 @@
|
||||
#include <vector>
|
||||
|
||||
#include "common/object_pool.h"
|
||||
#include "common/sync_point.h"
|
||||
#include "gen_cpp/Descriptors_types.h"
|
||||
#include "gen_cpp/PlanNodes_types.h"
|
||||
#include "io/fs/local_file_system.h"
|
||||
@ -40,12 +41,16 @@ public:
|
||||
init();
|
||||
_profile = _runtime_state.runtime_profile();
|
||||
_runtime_state.init_mem_trackers();
|
||||
static_cast<void>(_runtime_state.init(unique_id, query_options, query_globals, _env));
|
||||
WARN_IF_ERROR(_runtime_state.init(_unique_id, _query_options, _query_globals, _env),
|
||||
"fail to init _runtime_state");
|
||||
}
|
||||
void init();
|
||||
void generate_scanner(std::shared_ptr<VFileScanner>& scanner);
|
||||
|
||||
void TearDown() override {
|
||||
static_cast<void>(io::global_local_filesystem()->delete_directory(wal_dir));
|
||||
WARN_IF_ERROR(_scan_node->close(&_runtime_state), "fail to close scan_node")
|
||||
WARN_IF_ERROR(io::global_local_filesystem()->delete_directory(_wal_dir),
|
||||
fmt::format("fail to delete dir={}", _wal_dir));
|
||||
SAFE_STOP(_env->_wal_manager);
|
||||
}
|
||||
|
||||
@ -53,16 +58,16 @@ protected:
|
||||
virtual void SetUp() override {}
|
||||
|
||||
private:
|
||||
void init_desc_table();
|
||||
void _init_desc_table();
|
||||
|
||||
ExecEnv* _env = nullptr;
|
||||
std::string wal_dir = std::string(getenv("DORIS_HOME")) + "/wal_test";
|
||||
int64_t db_id = 1;
|
||||
int64_t tb_id = 2;
|
||||
int64_t txn_id = 789;
|
||||
int64_t version = 0;
|
||||
int64_t backend_id = 1001;
|
||||
std::string label = "test";
|
||||
std::string _wal_dir = std::string(getenv("DORIS_HOME")) + "/wal_test";
|
||||
int64_t _db_id = 1;
|
||||
int64_t _tb_id = 2;
|
||||
int64_t _txn_id = 789;
|
||||
int64_t _version = 0;
|
||||
int64_t _backend_id = 1001;
|
||||
std::string _label = "test";
|
||||
|
||||
TupleId _dst_tuple_id = 0;
|
||||
RuntimeState _runtime_state;
|
||||
@ -73,12 +78,18 @@ private:
|
||||
ScannerCounter _counter;
|
||||
std::vector<TExpr> _pre_filter;
|
||||
TPlanNode _tnode;
|
||||
TUniqueId unique_id;
|
||||
TQueryOptions query_options;
|
||||
TQueryGlobals query_globals;
|
||||
TUniqueId _unique_id;
|
||||
TQueryOptions _query_options;
|
||||
TQueryGlobals _query_globals;
|
||||
std::shared_ptr<NewFileScanNode> _scan_node = nullptr;
|
||||
std::vector<TFileRangeDesc> _ranges;
|
||||
TFileRangeDesc _range_desc;
|
||||
TFileScanRange _scan_range;
|
||||
std::unique_ptr<ShardedKVCache> _kv_cache = nullptr;
|
||||
std::unique_ptr<TMasterInfo> _master_info = nullptr;
|
||||
};
|
||||
|
||||
void VWalScannerTest::init_desc_table() {
|
||||
void VWalScannerTest::_init_desc_table() {
|
||||
TDescriptorTable t_desc_table;
|
||||
|
||||
// table descriptors
|
||||
@ -118,6 +129,7 @@ void VWalScannerTest::init_desc_table() {
|
||||
slot_desc.nullIndicatorBit = -1;
|
||||
slot_desc.colName = "c1";
|
||||
slot_desc.slotIdx = 1;
|
||||
slot_desc.col_unique_id = 0;
|
||||
slot_desc.isMaterialized = true;
|
||||
|
||||
t_desc_table.slotDescriptors.push_back(slot_desc);
|
||||
@ -145,6 +157,7 @@ void VWalScannerTest::init_desc_table() {
|
||||
slot_desc.nullIndicatorBit = -1;
|
||||
slot_desc.colName = "c2";
|
||||
slot_desc.slotIdx = 2;
|
||||
slot_desc.col_unique_id = 1;
|
||||
slot_desc.isMaterialized = true;
|
||||
|
||||
t_desc_table.slotDescriptors.push_back(slot_desc);
|
||||
@ -172,6 +185,7 @@ void VWalScannerTest::init_desc_table() {
|
||||
slot_desc.nullIndicatorBit = -1;
|
||||
slot_desc.colName = "c3";
|
||||
slot_desc.slotIdx = 3;
|
||||
slot_desc.col_unique_id = 2;
|
||||
slot_desc.isMaterialized = true;
|
||||
|
||||
t_desc_table.slotDescriptors.push_back(slot_desc);
|
||||
@ -196,9 +210,10 @@ void VWalScannerTest::init_desc_table() {
|
||||
|
||||
void VWalScannerTest::init() {
|
||||
config::group_commit_wal_max_disk_limit = "100M";
|
||||
init_desc_table();
|
||||
static_cast<void>(io::global_local_filesystem()->create_directory(
|
||||
wal_dir + "/" + std::to_string(db_id) + "/" + std::to_string(tb_id)));
|
||||
_init_desc_table();
|
||||
WARN_IF_ERROR(io::global_local_filesystem()->create_directory(
|
||||
_wal_dir + "/" + std::to_string(_db_id) + "/" + std::to_string(_tb_id)),
|
||||
"fail to creat directory");
|
||||
|
||||
// Node Id
|
||||
_tnode.node_id = 0;
|
||||
@ -210,73 +225,87 @@ void VWalScannerTest::init() {
|
||||
_tnode.file_scan_node.tuple_id = 0;
|
||||
_tnode.__isset.file_scan_node = true;
|
||||
|
||||
_scan_node = std::make_shared<NewFileScanNode>(&_obj_pool, _tnode, *_desc_tbl);
|
||||
_scan_node->_output_tuple_desc = _runtime_state.desc_tbl().get_tuple_descriptor(_dst_tuple_id);
|
||||
WARN_IF_ERROR(_scan_node->init(_tnode, &_runtime_state), "fail to init scan_node");
|
||||
WARN_IF_ERROR(_scan_node->prepare(&_runtime_state), "fail to prepare scan_node");
|
||||
|
||||
_range_desc.start_offset = 0;
|
||||
_range_desc.size = 1000;
|
||||
_ranges.push_back(_range_desc);
|
||||
_scan_range.ranges = _ranges;
|
||||
_scan_range.__isset.params = true;
|
||||
_scan_range.params.format_type = TFileFormatType::FORMAT_WAL;
|
||||
_kv_cache.reset(new ShardedKVCache(48));
|
||||
|
||||
_runtime_state._wal_id = _txn_id;
|
||||
|
||||
_master_info.reset(new TMasterInfo());
|
||||
_env = ExecEnv::GetInstance();
|
||||
_env->_master_info = new TMasterInfo();
|
||||
_env->_master_info = _master_info.get();
|
||||
_env->_master_info->network_address.hostname = "host name";
|
||||
_env->_master_info->network_address.port = backend_id;
|
||||
_env->_master_info->network_address.port = _backend_id;
|
||||
_env->_master_info->backend_id = 1001;
|
||||
_env->_wal_manager = WalManager::create_shared(_env, wal_dir);
|
||||
_env->_wal_manager = WalManager::create_shared(_env, _wal_dir);
|
||||
std::string base_path;
|
||||
auto st = _env->_wal_manager->_init_wal_dirs_info();
|
||||
st = _env->_wal_manager->create_wal_path(db_id, tb_id, txn_id, label, base_path);
|
||||
st = _env->_wal_manager->create_wal_path(_db_id, _tb_id, _txn_id, _label, base_path);
|
||||
std::string src = "./be/test/exec/test_data/wal_scanner/wal";
|
||||
std::string dst = wal_dir + "/" + std::to_string(db_id) + "/" + std::to_string(tb_id) + "/" +
|
||||
std::to_string(version) + "_" + std::to_string(backend_id) + "_" +
|
||||
std::to_string(txn_id) + "_" + label;
|
||||
std::string dst = _wal_dir + "/" + std::to_string(_db_id) + "/" + std::to_string(_tb_id) + "/" +
|
||||
std::to_string(_version) + "_" + std::to_string(_backend_id) + "_" +
|
||||
std::to_string(_txn_id) + "_" + _label;
|
||||
std::filesystem::copy(src, dst);
|
||||
}
|
||||
|
||||
TEST_F(VWalScannerTest, normal) {
|
||||
std::vector<size_t> index_vector;
|
||||
index_vector.emplace_back(0);
|
||||
index_vector.emplace_back(1);
|
||||
index_vector.emplace_back(2);
|
||||
// config::group_commit_replay_wal_dir = wal_dir;
|
||||
NewFileScanNode scan_node(&_obj_pool, _tnode, *_desc_tbl);
|
||||
scan_node._output_tuple_desc = _runtime_state.desc_tbl().get_tuple_descriptor(_dst_tuple_id);
|
||||
static_cast<void>(scan_node.init(_tnode, &_runtime_state));
|
||||
auto status = scan_node.prepare(&_runtime_state);
|
||||
EXPECT_TRUE(status.ok());
|
||||
|
||||
std::vector<TFileRangeDesc> ranges;
|
||||
TFileRangeDesc range_desc;
|
||||
{
|
||||
range_desc.start_offset = 0;
|
||||
range_desc.size = 1000;
|
||||
}
|
||||
ranges.push_back(range_desc);
|
||||
TFileScanRange scan_range;
|
||||
scan_range.ranges = ranges;
|
||||
scan_range.__isset.params = true;
|
||||
scan_range.params.format_type = TFileFormatType::FORMAT_WAL;
|
||||
std::unique_ptr<ShardedKVCache> _kv_cache;
|
||||
_kv_cache.reset(new ShardedKVCache(48));
|
||||
_runtime_state._wal_id = txn_id;
|
||||
VFileScanner scanner(&_runtime_state, &scan_node, -1, scan_range, _profile, _kv_cache.get());
|
||||
scanner._is_load = false;
|
||||
void VWalScannerTest::generate_scanner(std::shared_ptr<VFileScanner>& scanner) {
|
||||
scanner = std::make_shared<VFileScanner>(&_runtime_state, _scan_node.get(), -1, _scan_range,
|
||||
_profile, _kv_cache.get());
|
||||
scanner->_is_load = false;
|
||||
vectorized::VExprContextSPtrs _conjuncts;
|
||||
std::unordered_map<std::string, ColumnValueRangeType> _colname_to_value_range;
|
||||
std::unordered_map<std::string, int> _colname_to_slot_id;
|
||||
static_cast<void>(scanner.prepare(_conjuncts, &_colname_to_value_range, &_colname_to_slot_id));
|
||||
WARN_IF_ERROR(scanner->prepare(_conjuncts, &_colname_to_value_range, &_colname_to_slot_id),
|
||||
"fail to prepare scanner");
|
||||
}
|
||||
|
||||
TEST_F(VWalScannerTest, normal) {
|
||||
std::shared_ptr<VFileScanner> scanner = nullptr;
|
||||
generate_scanner(scanner);
|
||||
std::unique_ptr<vectorized::Block> block(new vectorized::Block());
|
||||
bool eof = false;
|
||||
auto st = scanner.get_block(&_runtime_state, block.get(), &eof);
|
||||
EXPECT_EQ(3, block->rows());
|
||||
auto st = scanner->get_block(&_runtime_state, block.get(), &eof);
|
||||
ASSERT_TRUE(st.ok());
|
||||
EXPECT_EQ(3, block->rows());
|
||||
block->clear();
|
||||
st = scanner.get_block(&_runtime_state, block.get(), &eof);
|
||||
st = scanner->get_block(&_runtime_state, block.get(), &eof);
|
||||
ASSERT_TRUE(st.ok());
|
||||
EXPECT_EQ(0, block->rows());
|
||||
ASSERT_TRUE(eof);
|
||||
static_cast<void>(scanner.close(&_runtime_state));
|
||||
static_cast<void>(scan_node.close(&_runtime_state));
|
||||
WARN_IF_ERROR(scanner->close(&_runtime_state), "fail to close scanner");
|
||||
}
|
||||
|
||||
{
|
||||
std::stringstream ss;
|
||||
scan_node.runtime_profile()->pretty_print(&ss);
|
||||
LOG(INFO) << ss.str();
|
||||
}
|
||||
TEST_F(VWalScannerTest, fail_with_not_equal) {
|
||||
auto sp = SyncPoint::get_instance();
|
||||
Defer defer {[sp] {
|
||||
sp->clear_call_back("WalReader::set_column_id_count");
|
||||
sp->clear_call_back("WalReader::set_out_block_column_size");
|
||||
}};
|
||||
sp->set_call_back("WalReader::set_column_id_count",
|
||||
[](auto&& args) { *try_any_cast<int64_t*>(args[0]) = 2; });
|
||||
sp->set_call_back("WalReader::set_out_block_column_size",
|
||||
[](auto&& args) { *try_any_cast<size_t*>(args[0]) = 2; });
|
||||
sp->enable_processing();
|
||||
|
||||
std::shared_ptr<VFileScanner> scanner = nullptr;
|
||||
generate_scanner(scanner);
|
||||
std::unique_ptr<vectorized::Block> block(new vectorized::Block());
|
||||
bool eof = false;
|
||||
auto st = scanner->get_block(&_runtime_state, block.get(), &eof);
|
||||
ASSERT_FALSE(st.ok());
|
||||
auto msg = st.to_string();
|
||||
auto pos = msg.find("not equal");
|
||||
ASSERT_TRUE(pos != msg.npos);
|
||||
WARN_IF_ERROR(scanner->close(&_runtime_state), "fail to close scanner");
|
||||
}
|
||||
|
||||
} // namespace vectorized
|
||||
|
||||
Reference in New Issue
Block a user