fix: support raw_read() for log cache

obdev 2024-05-24 04:39:21 +00:00 committed by ob-robot
parent c1584fc288
commit c10a663da7
11 changed files with 118 additions and 21 deletions

View File

@@ -80,7 +80,7 @@ int ObSimpleLogClusterTestBase::start()
} else if (OB_FAIL(member_region_map_.create(OB_MAX_MEMBER_NUMBER,
ObMemAttr(MTL_ID(), ObModIds::OB_HASH_NODE, ObCtxIds::DEFAULT_CTX_ID)))) {
} else if (OB_FAIL(generate_sorted_server_list_(node_cnt_))) {
} else if (OB_FAIL(init_log_kv_cache_())) {
} else if (OB_FAIL(init_global_kv_cache_())) {
} else {
// If an arb server needs to be added, append it as the last entry of the member list
// TODO by runlin: this is a temporary workaround; we can switch to the regular add/remove-member procedure later
@@ -131,8 +131,6 @@ int ObSimpleLogClusterTestBase::close()
break;
}
}
OB_LOG_KV_CACHE.destroy();
ObKVGlobalCache::get_instance().destroy();
return ret;
}
@@ -170,7 +168,7 @@ int ObSimpleLogClusterTestBase::generate_sorted_server_list_(const int64_t node_
return ret;
}
int ObSimpleLogClusterTestBase::init_log_kv_cache_()
int ObSimpleLogClusterTestBase::init_global_kv_cache_()
{
int ret = OB_SUCCESS;
const int64_t KV_CACHE_WASH_TIMER_INTERVAL_US = 60 * 1000L * 1000L;
@@ -180,9 +178,11 @@ int ObSimpleLogClusterTestBase::init_log_kv_cache_()
&ObTenantMemLimitGetter::get_instance(), DEFAULT_BUCKET_NUM,
DEFAULT_MAX_CACHE_SIZE, lib::ACHUNK_SIZE,
KV_CACHE_WASH_TIMER_INTERVAL_US))) {
PALF_LOG(WARN, "ObKVGlobalCache init failed");
} else if (OB_FAIL(OB_LOG_KV_CACHE.init(OB_LOG_KV_CACHE_NAME, 1))) {
PALF_LOG(WARN, "OB_LOG_KV_CACHE init failed");
if (OB_INIT_TWICE == ret) {
ret = OB_SUCCESS;
} else {
PALF_LOG(WARN, "ObKVGlobalCache init failed", KR(ret));
}
}
return ret;
}

View File

@@ -77,8 +77,7 @@ public:
private:
static int generate_sorted_server_list_(const int64_t node_cnt);
static int init_log_kv_cache_();
static int init_global_kv_cache_();
protected:
static void SetUpTestCase();
static void TearDownTestCase();

View File

@@ -396,6 +396,7 @@ int ObSimpleLogServer::init_log_service_()
palf::PalfOptions opts;
if (disk_opts_.is_valid()) {
opts.disk_options_ = disk_opts_;
opts.enable_log_cache_ = true;
} else {
opts.disk_options_.log_disk_usage_limit_size_ = 2 * 1024 * 1024 * 1024ul;
opts.disk_options_.log_disk_utilization_threshold_ = 80;
@@ -405,6 +406,7 @@ int ObSimpleLogServer::init_log_service_()
opts.disk_options_.log_writer_parallelism_ = 2;
disk_opts_ = opts.disk_options_;
inner_table_disk_opts_ = disk_opts_;
opts.enable_log_cache_ = true;
}
std::string clog_dir = clog_dir_ + "/tenant_1";
allocator_ = OB_NEW(ObTenantMutilAllocator, "TestBase", node_id_);
@@ -413,6 +415,7 @@ int ObSimpleLogServer::init_log_service_()
net_keepalive_ = MTL_NEW(MockNetKeepAliveAdapter, "SimpleLog");
if (OB_FAIL(net_keepalive_->init(&deliver_))) {
} else if (OB_FAIL(init_log_kv_cache_())) {
} else if (OB_FAIL(log_service_.init(opts, clog_dir.c_str(), addr_, allocator_, transport_, &batch_rpc_, &ls_service_,
&location_service_, &reporter_, &log_block_pool_, &sql_proxy_, net_keepalive_, &mock_locality_manager_))) {
SERVER_LOG(ERROR, "init_log_service_ fail", K(ret));
@@ -458,6 +461,10 @@ int ObSimpleLogServer::simple_close(const bool is_shutdown = false)
log_block_pool_.destroy();
guard.click("destroy_palf_env");
if (OB_LOG_KV_CACHE.inited_) {
OB_LOG_KV_CACHE.destroy();
}
if (is_shutdown) {
TG_STOP(batch_rpc_tg_id_);
@@ -496,6 +503,19 @@ int ObSimpleLogServer::simple_restart(const std::string &cluster_name, const int
return ret;
}
int ObSimpleLogServer::init_log_kv_cache_()
{
int ret = OB_SUCCESS;
if (OB_FAIL(OB_LOG_KV_CACHE.init(OB_LOG_KV_CACHE_NAME, 1))) {
if (OB_INIT_TWICE == ret) {
ret = OB_SUCCESS;
} else {
PALF_LOG(WARN, "OB_LOG_KV_CACHE init failed", KR(ret));
}
}
return ret;
}
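Both copies of the helper rely on the same idea: the KV cache is process-wide, so a second init from another test server must not fail the setup. A minimal self-contained sketch of the pattern, with a hypothetical Cache type standing in for OB_LOG_KV_CACHE:

#include <cstdio>

// Stand-ins for the real OB error codes; the exact values are irrelevant here.
constexpr int OB_SUCCESS = 0;
constexpr int OB_INIT_TWICE = -4005;

// Hypothetical cache type standing in for OB_LOG_KV_CACHE.
struct Cache {
  bool inited_ = false;
  int init() { return inited_ ? OB_INIT_TWICE : (inited_ = true, OB_SUCCESS); }
};

Cache g_cache;  // process-wide, shared by every test server

// Mirrors init_log_kv_cache_(): a second init is downgraded to success.
int init_cache_idempotent()
{
  int ret = g_cache.init();
  if (OB_INIT_TWICE == ret) {
    ret = OB_SUCCESS;  // another server in this process already initialized it
  }
  return ret;
}

int main()
{
  std::printf("%d %d\n", init_cache_idempotent(), init_cache_idempotent());  // prints "0 0"
  return 0;
}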
int ObMittestBlacklist::init(const common::ObAddr &self)
{
int ret = OB_SUCCESS;

View File

@@ -391,6 +391,8 @@ protected:
const int64_t new_log_disk_size,
int64_t &allowed_log_disk_size);
int update_disk_opts_no_lock_(const PalfDiskOptions &opts);
int init_log_kv_cache_();
private:
int64_t node_id_;

View File

@@ -185,6 +185,39 @@ TEST_F(TestObSimpleLogCache, concurrent_read)
OB_LOG_KV_CACHE.destroy();
}
TEST_F(TestObSimpleLogCache, raw_read)
{
disable_hot_cache_ = true;
SET_CASE_LOG_FILE(TEST_NAME, "raw_read");
OB_LOGGER.set_log_level("TRACE");
int server_idx = 0;
int64_t leader_idx = 0;
int64_t id = ATOMIC_AAF(&palf_id_, 1);
PALF_LOG(INFO, "start to test log cache", K(id));
EXPECT_EQ(OB_SUCCESS, OB_LOG_KV_CACHE.init(OB_LOG_KV_CACHE_NAME, 1));
PalfHandleImplGuard leader;
EXPECT_EQ(OB_SUCCESS, create_paxos_group(id, leader_idx, leader));
ObTenantEnv::set_tenant(get_cluster()[leader_idx]->get_tenant_base());
std::vector<LSN> lsn_array;
std::vector<SCN> scn_array;
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 100, MAX_LOG_BODY_SIZE, id, lsn_array, scn_array));
const LSN max_lsn = leader.get_palf_handle_impl()->get_max_lsn();
EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(leader, max_lsn));
int64_t aligned_offset = common::upper_align(lsn_array[35].val_, LOG_DIO_ALIGN_SIZE);
LSN aligned_lsn(aligned_offset);
int64_t in_read_size = 2 * 1024 * 1024;
int64_t out_read_size = 0;
char *read_buf = reinterpret_cast<char*>(mtl_malloc_align(
LOG_DIO_ALIGN_SIZE, in_read_size, "mittest"));
EXPECT_EQ(OB_SUCCESS, leader.get_palf_handle_impl()->raw_read(aligned_lsn, read_buf, in_read_size, out_read_size));
if (OB_NOT_NULL(read_buf)) {
mtl_free_align(read_buf);
}
OB_LOG_KV_CACHE.destroy();
}
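The new test exercises the DIO path, so both the start offset and the buffer must be aligned. A sketch of that alignment contract, assuming LOG_DIO_ALIGN_SIZE is 4 KiB and using plain aligned_alloc in place of mtl_malloc_align:

#include <cassert>
#include <cstdint>
#include <cstdlib>

// Assumed 4 KiB DIO alignment; the test uses palf's LOG_DIO_ALIGN_SIZE.
constexpr int64_t DIO_ALIGN = 4 * 1024;

// Round an offset up to the next alignment boundary, like common::upper_align.
int64_t upper_align(int64_t v, int64_t align) { return (v + align - 1) / align * align; }

int main()
{
  const int64_t raw_offset = 123457;  // some unaligned LSN value
  const int64_t aligned_offset = upper_align(raw_offset, DIO_ALIGN);
  assert(0 == aligned_offset % DIO_ALIGN);

  // The buffer must be DIO-aligned too (the test uses mtl_malloc_align);
  // 2 MiB is a multiple of the alignment, as aligned_alloc requires.
  const int64_t in_read_size = 2 * 1024 * 1024;
  char *read_buf = static_cast<char *>(std::aligned_alloc(DIO_ALIGN, in_read_size));
  // ... raw_read(LSN(aligned_offset), read_buf, in_read_size, out_read_size) ...
  std::free(read_buf);
  return 0;
}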
// enable in 4.4
TEST_F(TestObSimpleLogCache, DISABLED_fill_cache_when_slide)
{

View File

@@ -450,7 +450,7 @@ TEST_F(TestObSimpleLogClusterSingleReplica, single_replica_flashback_restart)
LogStorage *log_storage = &new_leader.palf_handle_impl_->log_engine_.log_storage_;
SCN block_end_scn;
{
PalfGroupBufferIterator iterator;
PalfGroupBufferIterator iterator(palf_id_);
auto get_file_end_lsn = [](){
return LSN(PALF_BLOCK_SIZE);
};

View File

@@ -547,7 +547,7 @@ int LogColdCache::read(const int64_t flashback_version,
K(in_read_size), K(cache_lines_read_size), PRINT_INFO);
} else if (OB_FAIL(allow_filling_cache_(iterator_info, enable_fill_cache))) {
PALF_LOG(WARN, "allow_filling_cache failed", K(ret), K(enable_fill_cache), PRINT_INFO);
} else if (OB_FAIL(deal_with_miss_(enable_fill_cache, cache_lines_read_size, read_lsn,
} else if (OB_FAIL(deal_with_miss_(enable_fill_cache, cache_lines_read_size, read_buf.buf_len_, read_lsn,
real_read_size, cache_out_read_size, iterator_info))) {
PALF_LOG(WARN, "fail to deal with miss", K(ret), K(cache_lines_read_size), K(enable_fill_cache),
K(read_lsn), K(real_read_size), K(out_read_size), K(cache_out_read_size), PRINT_INFO);
@@ -564,7 +564,8 @@ int LogColdCache::read(const int64_t flashback_version,
} else if (OB_FAIL(fill_cache_lines_(flashback_version, read_lsn, disk_out_read_size, read_buf.buf_ + cache_out_read_size))) {
PALF_LOG(WARN, "fail to fill cache", K(ret), K(read_lsn), K(flashback_version), K(out_read_size), PRINT_INFO);
} else if (0 == cache_out_read_size) {
// after cache miss, adjust read_lsn to new_read_lsn to read more log (up to 'diff') for filling first missing cache line
// if read_buf isn't large enough, read_lsn is equal to lsn, which means 'diff' is 0;
// if read_buf is large enough, adjust read_lsn to new_read_lsn to read more log (up to 'diff') for filling first missing cache line
// so, adjust buf_ to ignore 'diff' part before return
offset_t diff = lsn - read_lsn;
out_read_size = out_read_size - diff;
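When the request fully missed the cache and read_lsn was backed off to a cache-line start (see deal_with_miss_ below), the leading bytes exist only to fill the cache; the caller's result is trimmed afterwards. A worked sketch with illustrative numbers, assuming 64 KiB cache lines and ignoring the block-header offset:

#include <cassert>
#include <cstdint>

int main()
{
  const int64_t lsn = 100 * 1024;                // offset the caller asked for
  const int64_t read_lsn = 64 * 1024;            // backed off to the cache-line start
  const int64_t disk_out_read_size = 96 * 1024;  // bytes returned by the disk read

  // The first 'diff' bytes were read only to fill the first missing cache
  // line; they are cut from the caller-visible result, and the returned
  // buffer pointer is advanced by the same amount.
  const int64_t diff = lsn - read_lsn;                      // 36 KiB
  const int64_t out_read_size = disk_out_read_size - diff;  // caller sees 60 KiB
  assert(36 * 1024 == diff && 60 * 1024 == out_read_size);
  return 0;
}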
@@ -609,6 +610,7 @@ int LogColdCache::allow_filling_cache_(LogIteratorInfo *iterator_info, bool &ena
int LogColdCache::deal_with_miss_(const bool enable_fill_cache,
const int64_t has_read_size,
const int64_t buf_len,
LSN &lsn,
int64_t &in_read_size,
int64_t &out_read_size,
@@ -624,11 +626,18 @@
lsn.val_ += real_read_size;
in_read_size -= real_read_size;
out_read_size = real_read_size;
} else if (enable_fill_cache) {
// adjust lsn to lower_aligned_lsn to fill first miss cache line
} else {
// try to fill the first missing cache line
LSN new_read_start_lsn = LogCacheUtils::lower_align_with_start(lsn, CACHE_LINE_SIZE);
in_read_size += (lsn - new_read_start_lsn);
lsn = new_read_start_lsn;
offset_t backoff = lsn - new_read_start_lsn;
if (buf_len < in_read_size + backoff) {
// buf isn't large enough to fill the first missing cache line
} else if (enable_fill_cache) {
// adjust lsn to lower_aligned_lsn to fill the first missing cache line
in_read_size += backoff;
lsn = new_read_start_lsn;
}
}
iterator_info->inc_miss_cnt();
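A simplified, self-contained sketch of the new guard: the read range is only extended when the caller's buffer can absorb the backoff (plain power-of-two alignment here; the real code uses LogCacheUtils::lower_align_with_start, which also accounts for the block header):

#include <cassert>
#include <cstdint>

constexpr int64_t CACHE_LINE = 64 * 1024;  // assumed cache line size

// Mirrors the reworked miss branch; names are illustrative.
void on_full_miss(bool enable_fill_cache, int64_t buf_len,
                  int64_t &lsn, int64_t &in_read_size)
{
  const int64_t line_start = lsn / CACHE_LINE * CACHE_LINE;
  const int64_t backoff = lsn - line_start;
  if (buf_len < in_read_size + backoff) {
    // buffer too small: keep the original range, skip filling the first line
  } else if (enable_fill_cache) {
    in_read_size += backoff;  // read extra leading bytes to cover the whole line
    lsn = line_start;
  }
}

int main()
{
  int64_t lsn = 100 * 1024, in_read_size = 32 * 1024;
  on_full_miss(true, /*buf_len=*/32 * 1024, lsn, in_read_size);
  assert(100 * 1024 == lsn && 32 * 1024 == in_read_size);  // small buf: unchanged
  on_full_miss(true, /*buf_len=*/96 * 1024, lsn, in_read_size);
  assert(64 * 1024 == lsn && 68 * 1024 == in_read_size);   // backed off by 36 KiB
  return 0;
}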
@@ -702,6 +711,7 @@ int LogColdCache::get_cache_line_(const LSN &cache_read_lsn,
ret = OB_ERR_UNEXPECTED;
PALF_LOG(WARN, "get null buf from log kv cache", K(ret), K(key));
} else {
// buf_size is either CACHE_LINE_SIZE or LAST_CACHE_LINE_SIZE
int64_t buf_size = val_handle.value_->get_buf_size();
if (0 == diff) {
// start with aligned_lsn
@@ -715,7 +725,7 @@
MEMCPY(buf + read_pos, cache_log_buf + diff, curr_round_read_size);
PALF_LOG(TRACE, "cache hit, read from log cold cache", K(key), K(read_pos), K(cache_read_lsn),
PALF_LOG(TRACE, "cache hit, read from log cold cache", K(key), K(read_pos), K(cache_read_lsn), K(flashback_version),
K(aligned_lsn), K(in_read_size), K(curr_round_read_size), PRINT_INFO);
}
@@ -736,7 +746,9 @@ int LogColdCache::fill_cache_lines_(const int64_t flashback_version,
PALF_LOG(WARN, "invalid argument", K(ret), K(lsn), K(fill_size), K(flashback_version));
} else {
// lsn is always expected to be aligned to CACHE_LINE_SIZE
// if not, it means lsn is aligned to 4KB in deal_with_miss() for DIO. This part is already in kvcache, so need to ignore
// In the following situations, lsn is not aligned to CACHE_LINE_SIZE:
// 1. lsn was aligned to 4KB in deal_with_miss_() for DIO; that part is already in the kv cache, so it needs to be skipped
// 2. the buf isn't large enough to fill the first missing cache line, so filling starts from the second cache line
LSN fill_lsn = LogCacheUtils::is_lower_align_with_start(lsn, CACHE_LINE_SIZE) ?
lsn : LogCacheUtils::upper_align_with_start(lsn, CACHE_LINE_SIZE);
int64_t diff = fill_lsn - lsn;
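This is the complementary adjustment to deal_with_miss_: when lsn does not sit on a line start, filling begins at the next boundary and the leading 'diff' bytes are skipped. A tiny sketch of that arithmetic, again with plain alignment in place of upper_align_with_start:

#include <cassert>
#include <cstdint>

int main()
{
  const int64_t CACHE_LINE = 64 * 1024;  // assumed cache line size
  const int64_t lsn = 100 * 1024;        // not on a line boundary
  const int64_t fill_lsn = (lsn + CACHE_LINE - 1) / CACHE_LINE * CACHE_LINE;
  const int64_t diff = fill_lsn - lsn;   // leading bytes that are not cached this round
  assert(128 * 1024 == fill_lsn && 28 * 1024 == diff);
  return 0;
}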
@@ -883,7 +895,7 @@ void LogColdCache::LogCacheStat::print_stat_info(int64_t cache_store_size, int64
int64_t total_cnt = (interval_hit_cnt + interval_miss_cnt == 0) ? 1 : interval_hit_cnt + interval_miss_cnt;
PALF_LOG(INFO, "[PALF STAT LOG COLD CACHE HIT RATE]", "hit_cnt", interval_hit_cnt,
"miss_cnt", interval_miss_cnt, "hit_rate",
interval_hit_cnt * 1.0 / (interval_hit_cnt + interval_miss_cnt),
interval_hit_cnt * 1.0 / total_cnt,
"cache_read_size", interval_cache_read_size, K(cache_store_size),
K(cache_fill_amplification_), K(palf_id), K(MTL_ID()));
last_record_hit_cnt_ = hit_cnt_;
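The stat fix simply reuses the guarded denominator computed one line above, so an interval with no reads logs a rate of 0 instead of dividing by zero; a minimal sketch:

#include <cstdio>

int main()
{
  const long hit_cnt = 0, miss_cnt = 0;
  // Same guard as total_cnt above: fall back to 1 when there were no reads.
  const long total_cnt = (0 == hit_cnt + miss_cnt) ? 1 : hit_cnt + miss_cnt;
  std::printf("hit_rate=%.2f\n", hit_cnt * 1.0 / total_cnt);  // 0.00, not NaN
  return 0;
}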

View File

@@ -222,6 +222,7 @@ private:
*/
int deal_with_miss_(const bool enable_fill_cache,
const int64_t has_read_size,
const int64_t buf_len,
LSN &lsn,
int64_t &in_read_size,
int64_t &out_read_size,

View File

@@ -278,6 +278,7 @@ int PalfEnvImpl::init(
tenant_id_ = tenant_id;
is_inited_ = true;
is_running_ = true;
enable_log_cache_ = options.enable_log_cache_;
PALF_LOG(INFO, "PalfEnvImpl init success", K(ret), K(self_), KPC(this));
}
if (OB_FAIL(ret) && OB_INIT_TWICE != ret) {

View File

@@ -865,6 +865,7 @@ int ObTenant::construct_mtl_init_ctx(const ObTenantMeta &meta, share::ObTenantMo
ret = is_virtual_tenant_id(id_) ? OB_SUCCESS : OB_ENTRY_NOT_EXIST;
} else {
mtl_init_ctx_->palf_options_.disk_options_.log_writer_parallelism_ = tenant_config->_log_writer_parallelism;
mtl_init_ctx_->palf_options_.enable_log_cache_ = tenant_config->_enable_log_cache;
}
LOG_INFO("construct_mtl_init_ctx success", "palf_options", mtl_init_ctx_->palf_options_.disk_options_);
}

View File

@@ -140,7 +140,7 @@ TEST_F(TestLogCache, test_miss)
int64_t out_read_size = 0;
int64_t has_read_size = 5000;
int64_t real_read_size = lower_align(has_read_size, LOG_DIO_ALIGN_SIZE);
EXPECT_EQ(OB_SUCCESS, cold_cache.deal_with_miss_(true, has_read_size,lsn, in_read_size , out_read_size, &iterator_info));
EXPECT_EQ(OB_SUCCESS, cold_cache.deal_with_miss_(true, has_read_size, in_read_size + CACHE_LINE_SIZE, lsn, in_read_size, out_read_size, &iterator_info));
EXPECT_EQ(old_lsn.val_ + real_read_size, lsn.val_);
EXPECT_EQ(MAX_LOG_BODY_SIZE - real_read_size, in_read_size);
EXPECT_EQ(real_read_size, out_read_size);
@@ -154,10 +154,38 @@ TEST_F(TestLogCache, test_miss)
int64_t in_read_size = MAX_LOG_BODY_SIZE;
int64_t has_read_size = 0;
int64_t out_read_size = 0;
EXPECT_EQ(OB_SUCCESS, cold_cache.deal_with_miss_(true, has_read_size, lsn, in_read_size , out_read_size, &iterator_info));
EXPECT_EQ(OB_SUCCESS, cold_cache.deal_with_miss_(true, has_read_size, in_read_size + CACHE_LINE_SIZE, lsn, in_read_size, out_read_size, &iterator_info));
EXPECT_EQ(PALF_BLOCK_SIZE, lsn.val_);
EXPECT_EQ(MAX_LOG_BODY_SIZE + (old_lsn.val_ - lsn.val_), in_read_size);
}
// test miss for small buf
{
iterator_info.reset();
LSN lsn(PALF_INITIAL_LSN_VAL);
int64_t in_read_size = MAX_LOG_BODY_SIZE;
int64_t has_read_size = 0;
int64_t out_read_size = 0;
int64_t buf_len = in_read_size;
EXPECT_EQ(OB_SUCCESS, cold_cache.deal_with_miss_(true, has_read_size, buf_len, lsn, in_read_size, out_read_size, &iterator_info));
// shouldn't read more log because of small buf size
EXPECT_EQ(PALF_INITIAL_LSN_VAL, lsn.val_);
EXPECT_EQ(MAX_LOG_BODY_SIZE, in_read_size);
}
// test miss for last cache line in log block
{
iterator_info.reset();
// read lsn is in the last cache line of the second log block
LSN lsn(PALF_BLOCK_SIZE * 2 - 32 * 1024);
int64_t in_read_size = 32 * 1024;
int64_t has_read_size = 0;
int64_t out_read_size = 0;
int64_t buf_len = in_read_size + CACHE_LINE_SIZE;
EXPECT_EQ(OB_SUCCESS, cold_cache.deal_with_miss_(true, has_read_size, buf_len, lsn, in_read_size, out_read_size, &iterator_info));
EXPECT_EQ(PALF_BLOCK_SIZE * 2 - 60 * 1024, lsn.val_);
EXPECT_EQ(LAST_CACHE_LINE_SIZE, in_read_size);
}
}
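The expected values in the last block imply 64 KiB cache lines and a 4 KiB block header, which makes the last line of each block 60 KiB; under those assumptions the arithmetic checks out at compile time:

#include <cstdint>

int main()
{
  constexpr int64_t KB = 1024;
  constexpr int64_t PALF_BLOCK_SIZE = 64 * KB * KB;  // assumed 64 MiB blocks
  constexpr int64_t CACHE_LINE_SIZE = 64 * KB;       // assumed line size
  constexpr int64_t BLOCK_HEADER_SIZE = 4 * KB;      // assumed header size
  constexpr int64_t LAST_CACHE_LINE_SIZE = CACHE_LINE_SIZE - BLOCK_HEADER_SIZE;  // 60 KiB

  // Lines start at block_start + 4 KiB, so the last (short) line of the
  // second block starts 60 KiB before the block end.
  constexpr int64_t last_line_start = 2 * PALF_BLOCK_SIZE - LAST_CACHE_LINE_SIZE;
  constexpr int64_t read_lsn = 2 * PALF_BLOCK_SIZE - 32 * KB;
  static_assert(28 * KB == read_lsn - last_line_start, "backoff to line start");
  static_assert(LAST_CACHE_LINE_SIZE == 32 * KB + 28 * KB, "extended in_read_size");
  return 0;
}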
TEST_F(TestLogCache, test_flashback)