diff --git a/mittest/logservice/env/ob_simple_log_cluster_testbase.cpp b/mittest/logservice/env/ob_simple_log_cluster_testbase.cpp index 681993a407..8f9138fb2e 100644 --- a/mittest/logservice/env/ob_simple_log_cluster_testbase.cpp +++ b/mittest/logservice/env/ob_simple_log_cluster_testbase.cpp @@ -80,7 +80,7 @@ int ObSimpleLogClusterTestBase::start() } else if (OB_FAIL(member_region_map_.create(OB_MAX_MEMBER_NUMBER, ObMemAttr(MTL_ID(), ObModIds::OB_HASH_NODE, ObCtxIds::DEFAULT_CTX_ID)))) { } else if (OB_FAIL(generate_sorted_server_list_(node_cnt_))) { - } else if (OB_FAIL(init_log_kv_cache_())) { + } else if (OB_FAIL(init_global_kv_cache_())) { } else { // 如果需要新增arb server,将其作为memberlist最后一项 // TODO by runlin, 这个是暂时的解决方法,以后可以走加减成员的流程 @@ -131,8 +131,6 @@ int ObSimpleLogClusterTestBase::close() break; } } - - OB_LOG_KV_CACHE.destroy(); ObKVGlobalCache::get_instance().destroy(); return ret; } @@ -170,7 +168,7 @@ int ObSimpleLogClusterTestBase::generate_sorted_server_list_(const int64_t node_ return ret; } -int ObSimpleLogClusterTestBase::init_log_kv_cache_() +int ObSimpleLogClusterTestBase::init_global_kv_cache_() { int ret = OB_SUCCESS; const int64_t KV_CACHE_WASH_TIMER_INTERVAL_US = 60 * 1000L * 1000L; @@ -180,9 +178,11 @@ int ObSimpleLogClusterTestBase::init_log_kv_cache_() &ObTenantMemLimitGetter::get_instance(), DEFAULT_BUCKET_NUM, DEFAULT_MAX_CACHE_SIZE, lib::ACHUNK_SIZE, KV_CACHE_WASH_TIMER_INTERVAL_US))) { - PALF_LOG(WARN, "ObKVGlobalCache init failed"); - } else if (OB_FAIL(OB_LOG_KV_CACHE.init(OB_LOG_KV_CACHE_NAME, 1))) { - PALF_LOG(WARN, "OB_LOG_KV_CACHE init failed"); + if (OB_INIT_TWICE == ret) { + ret = OB_SUCCESS; + } else { + PALF_LOG(WARN, "ObKVGlobalCache init failed", KR(ret)); + } } return ret; } diff --git a/mittest/logservice/env/ob_simple_log_cluster_testbase.h b/mittest/logservice/env/ob_simple_log_cluster_testbase.h index e6245e657e..987e0f1ee9 100644 --- a/mittest/logservice/env/ob_simple_log_cluster_testbase.h +++ b/mittest/logservice/env/ob_simple_log_cluster_testbase.h @@ -77,8 +77,7 @@ public: private: static int generate_sorted_server_list_(const int64_t node_cnt); - static int init_log_kv_cache_(); - + static int init_global_kv_cache_(); protected: static void SetUpTestCase(); static void TearDownTestCase(); diff --git a/mittest/logservice/env/ob_simple_log_server.cpp b/mittest/logservice/env/ob_simple_log_server.cpp index 2833dc0921..cbda59101f 100644 --- a/mittest/logservice/env/ob_simple_log_server.cpp +++ b/mittest/logservice/env/ob_simple_log_server.cpp @@ -396,6 +396,7 @@ int ObSimpleLogServer::init_log_service_() palf::PalfOptions opts; if (disk_opts_.is_valid()) { opts.disk_options_ = disk_opts_; + opts.enable_log_cache_ = true; } else { opts.disk_options_.log_disk_usage_limit_size_ = 2 * 1024 * 1024 * 1024ul; opts.disk_options_.log_disk_utilization_threshold_ = 80; @@ -405,6 +406,7 @@ int ObSimpleLogServer::init_log_service_() opts.disk_options_.log_writer_parallelism_ = 2; disk_opts_ = opts.disk_options_; inner_table_disk_opts_ = disk_opts_; + opts.enable_log_cache_ = true; } std::string clog_dir = clog_dir_ + "/tenant_1"; allocator_ = OB_NEW(ObTenantMutilAllocator, "TestBase", node_id_); @@ -413,6 +415,7 @@ int ObSimpleLogServer::init_log_service_() net_keepalive_ = MTL_NEW(MockNetKeepAliveAdapter, "SimpleLog"); if (OB_FAIL(net_keepalive_->init(&deliver_))) { + } else if (OB_FAIL(init_log_kv_cache_())) { } else if (OB_FAIL(log_service_.init(opts, clog_dir.c_str(), addr_, allocator_, transport_, &batch_rpc_, &ls_service_, &location_service_, &reporter_, &log_block_pool_, &sql_proxy_, net_keepalive_, &mock_locality_manager_))) { SERVER_LOG(ERROR, "init_log_service_ fail", K(ret)); @@ -458,6 +461,10 @@ int ObSimpleLogServer::simple_close(const bool is_shutdown = false) log_block_pool_.destroy(); guard.click("destroy_palf_env"); + if (OB_LOG_KV_CACHE.inited_) { + OB_LOG_KV_CACHE.destroy(); + } + if (is_shutdown) { TG_STOP(batch_rpc_tg_id_); @@ -496,6 +503,19 @@ int ObSimpleLogServer::simple_restart(const std::string &cluster_name, const int return ret; } +int ObSimpleLogServer::init_log_kv_cache_() +{ + int ret = OB_SUCCESS; + if (OB_FAIL(OB_LOG_KV_CACHE.init(OB_LOG_KV_CACHE_NAME, 1))) { + if (OB_INIT_TWICE == ret) { + ret = OB_SUCCESS; + } else { + PALF_LOG(WARN, "OB_LOG_KV_CACHE init failed", KR(ret)); + } + } + return ret; +} + int ObMittestBlacklist::init(const common::ObAddr &self) { int ret = OB_SUCCESS; diff --git a/mittest/logservice/env/ob_simple_log_server.h b/mittest/logservice/env/ob_simple_log_server.h index 096b58b91b..f29626d208 100644 --- a/mittest/logservice/env/ob_simple_log_server.h +++ b/mittest/logservice/env/ob_simple_log_server.h @@ -391,6 +391,8 @@ protected: const int64_t new_log_disk_size, int64_t &allowed_log_disk_size); int update_disk_opts_no_lock_(const PalfDiskOptions &opts); + int init_log_kv_cache_(); + private: int64_t node_id_; diff --git a/mittest/logservice/test_ob_simple_log_cache.cpp b/mittest/logservice/test_ob_simple_log_cache.cpp index 4fc6988a97..5de0dd44e3 100644 --- a/mittest/logservice/test_ob_simple_log_cache.cpp +++ b/mittest/logservice/test_ob_simple_log_cache.cpp @@ -185,6 +185,39 @@ TEST_F(TestObSimpleLogCache, concurrent_read) OB_LOG_KV_CACHE.destroy(); } +TEST_F(TestObSimpleLogCache, raw_read) +{ + disable_hot_cache_ = true; + SET_CASE_LOG_FILE(TEST_NAME, "raw_read"); + OB_LOGGER.set_log_level("TRACE"); + int server_idx = 0; + int64_t leader_idx = 0; + int64_t id = ATOMIC_AAF(&palf_id_, 1); + PALF_LOG(INFO, "start to test log cache", K(id)); + + EXPECT_EQ(OB_SUCCESS, OB_LOG_KV_CACHE.init(OB_LOG_KV_CACHE_NAME, 1)); + PalfHandleImplGuard leader; + EXPECT_EQ(OB_SUCCESS, create_paxos_group(id, leader_idx, leader)); + ObTenantEnv::set_tenant(get_cluster()[leader_idx]->get_tenant_base()); + std::vector lsn_array; + std::vector scn_array; + EXPECT_EQ(OB_SUCCESS, submit_log(leader, 100, MAX_LOG_BODY_SIZE, id, lsn_array, scn_array)); + const LSN max_lsn = leader.get_palf_handle_impl()->get_max_lsn(); + EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(leader, max_lsn)); + int64_t aligned_offset = common::upper_align(lsn_array[35].val_, LOG_DIO_ALIGN_SIZE); + LSN aligned_lsn(aligned_offset); + int64_t in_read_size = 2 * 1024 * 1024; + int64_t out_read_size = 0; + char *read_buf = reinterpret_cast(mtl_malloc_align( + LOG_DIO_ALIGN_SIZE, in_read_size, "mittest")); + EXPECT_EQ(OB_SUCCESS, leader.get_palf_handle_impl()->raw_read(aligned_lsn, read_buf, in_read_size, out_read_size)); + + if (OB_NOT_NULL(read_buf)) { + mtl_free_align(read_buf); + } + OB_LOG_KV_CACHE.destroy(); +} + // enable in 4.4 TEST_F(TestObSimpleLogCache, DISABLED_fill_cache_when_slide) { diff --git a/mittest/logservice/test_ob_simple_log_single_replica_func.cpp b/mittest/logservice/test_ob_simple_log_single_replica_func.cpp index 568de143bc..7c4b0fea10 100644 --- a/mittest/logservice/test_ob_simple_log_single_replica_func.cpp +++ b/mittest/logservice/test_ob_simple_log_single_replica_func.cpp @@ -450,7 +450,7 @@ TEST_F(TestObSimpleLogClusterSingleReplica, single_replica_flashback_restart) LogStorage *log_storage = &new_leader.palf_handle_impl_->log_engine_.log_storage_; SCN block_end_scn; { - PalfGroupBufferIterator iterator; + PalfGroupBufferIterator iterator(palf_id_); auto get_file_end_lsn = [](){ return LSN(PALF_BLOCK_SIZE); }; diff --git a/src/logservice/palf/log_cache.cpp b/src/logservice/palf/log_cache.cpp index 80f91187d8..005fcba4ed 100644 --- a/src/logservice/palf/log_cache.cpp +++ b/src/logservice/palf/log_cache.cpp @@ -547,7 +547,7 @@ int LogColdCache::read(const int64_t flashback_version, K(in_read_size), K(cache_lines_read_size), PRINT_INFO); } else if (OB_FAIL(allow_filling_cache_(iterator_info, enable_fill_cache))) { PALF_LOG(WARN, "allow_filling_cache failed", K(ret), K(enable_fill_cache), PRINT_INFO); - } else if (OB_FAIL(deal_with_miss_(enable_fill_cache, cache_lines_read_size, read_lsn, + } else if (OB_FAIL(deal_with_miss_(enable_fill_cache, cache_lines_read_size, read_buf.buf_len_, read_lsn, real_read_size, cache_out_read_size, iterator_info))) { PALF_LOG(WARN, "fail to deal with miss", K(ret), K(cache_lines_read_size), K(enable_fill_cache), K(read_lsn), K(real_read_size), K(out_read_size), K(cache_out_read_size), PRINT_INFO); @@ -564,7 +564,8 @@ int LogColdCache::read(const int64_t flashback_version, } else if (OB_FAIL(fill_cache_lines_(flashback_version, read_lsn, disk_out_read_size, read_buf.buf_ + cache_out_read_size))) { PALF_LOG(WARN, "fail to fill cache", K(ret), K(read_lsn), K(flashback_version), K(out_read_size), PRINT_INFO); } else if (0 == cache_out_read_size) { - // after cache miss, adjust read_lsn to new_read_lsn to read more log (up to 'diff') for filling first missing cache line + // if read_buf isn't large enough, read_lsn is equal to lsn, which means 'diff' is 0; + // if read_buf is large enough, adjust read_lsn to new_read_lsn to read more log (up to 'diff') for filling first missing cache line // so, adjust buf_ to ignore 'diff' part before return offset_t diff = lsn - read_lsn; out_read_size = out_read_size - diff; @@ -609,6 +610,7 @@ int LogColdCache::allow_filling_cache_(LogIteratorInfo *iterator_info, bool &ena int LogColdCache::deal_with_miss_(const bool enable_fill_cache, const int64_t has_read_size, + const int64_t buf_len, LSN &lsn, int64_t &in_read_size, int64_t &out_read_size, @@ -624,11 +626,18 @@ int LogColdCache::deal_with_miss_(const bool enable_fill_cache, lsn.val_ += real_read_size; in_read_size -= real_read_size; out_read_size = real_read_size; - } else if (enable_fill_cache) { - // adjust lsn to lower_aligned_lsn to fill first miss cache line + } else { + // try to fill the first missing cache line LSN new_read_start_lsn = LogCacheUtils::lower_align_with_start(lsn, CACHE_LINE_SIZE); - in_read_size += (lsn - new_read_start_lsn); - lsn = new_read_start_lsn; + offset_t backoff = lsn - new_read_start_lsn; + + if (buf_len < in_read_size + backoff) { + // buf isn't large enough to fill the first missing cache line + } else if (enable_fill_cache) { + // adjust lsn to lower_aligned_lsn to fill first miss cache line + in_read_size += backoff; + lsn = new_read_start_lsn; + } } iterator_info->inc_miss_cnt(); @@ -702,6 +711,7 @@ int LogColdCache::get_cache_line_(const LSN &cache_read_lsn, ret = OB_ERR_UNEXPECTED; PALF_LOG(WARN, "get null buf from log kv cache", K(ret), K(key)); } else { + // buf_size is either CACHE_LINE_SIZE or LAST_CACHE_LINE_SIZE int64_t buf_size = val_handle.value_->get_buf_size(); if (0 == diff) { // start with aligned_lsn @@ -715,7 +725,7 @@ int LogColdCache::get_cache_line_(const LSN &cache_read_lsn, MEMCPY(buf + read_pos, cache_log_buf + diff, curr_round_read_size); - PALF_LOG(TRACE, "cache hit, read from log cold cache", K(key), K(read_pos), K(cache_read_lsn), + PALF_LOG(TRACE, "cache hit, read from log cold cache", K(key), K(read_pos), K(cache_read_lsn), K(flashback_version), K(aligned_lsn), K(in_read_size), K(curr_round_read_size), PRINT_INFO); } @@ -736,7 +746,9 @@ int LogColdCache::fill_cache_lines_(const int64_t flashback_version, PALF_LOG(WARN, "invalid argument", K(ret), K(lsn), K(fill_size), K(flashback_version)); } else { // lsn is always expected to be aligned to CACHE_LINE_SIZE - // if not, it means lsn is aligned to 4KB in deal_with_miss() for DIO. This part is already in kvcache, so need to ignore + // In the following situations, it's not align to CACHE_LINE_SIZE: + // 1. lsn is aligned to 4KB in deal_with_miss_() for DIO. This part is already in kvcache, so need to ignore + // 2. the buf isn't large enough to fill first missing cache line. Filling log cache from the second cache line LSN fill_lsn = LogCacheUtils::is_lower_align_with_start(lsn, CACHE_LINE_SIZE) ? lsn : LogCacheUtils::upper_align_with_start(lsn, CACHE_LINE_SIZE); int64_t diff = fill_lsn - lsn; @@ -883,7 +895,7 @@ void LogColdCache::LogCacheStat::print_stat_info(int64_t cache_store_size, int64 int64_t total_cnt = (interval_hit_cnt + interval_miss_cnt == 0) ? 1 : interval_hit_cnt + interval_miss_cnt; PALF_LOG(INFO, "[PALF STAT LOG COLD CACHE HIT RATE]", "hit_cnt", interval_hit_cnt, "miss_cnt", interval_miss_cnt, "hit_rate", - interval_hit_cnt * 1.0 / (interval_hit_cnt + interval_miss_cnt), + interval_hit_cnt * 1.0 / total_cnt, "cache_read_size", interval_cache_read_size, K(cache_store_size), K(cache_fill_amplification_), K(palf_id), K(MTL_ID())); last_record_hit_cnt_ = hit_cnt_; diff --git a/src/logservice/palf/log_cache.h b/src/logservice/palf/log_cache.h index 9de060ad70..3195c8a6e9 100644 --- a/src/logservice/palf/log_cache.h +++ b/src/logservice/palf/log_cache.h @@ -222,6 +222,7 @@ private: */ int deal_with_miss_(const bool enable_fill_cache, const int64_t has_read_size, + const int64_t buf_len, LSN &lsn, int64_t &in_read_size, int64_t &out_read_size, diff --git a/src/logservice/palf/palf_env_impl.cpp b/src/logservice/palf/palf_env_impl.cpp index e5d5749cff..67a6f55e05 100644 --- a/src/logservice/palf/palf_env_impl.cpp +++ b/src/logservice/palf/palf_env_impl.cpp @@ -278,6 +278,7 @@ int PalfEnvImpl::init( tenant_id_ = tenant_id; is_inited_ = true; is_running_ = true; + enable_log_cache_ = options.enable_log_cache_; PALF_LOG(INFO, "PalfEnvImpl init success", K(ret), K(self_), KPC(this)); } if (OB_FAIL(ret) && OB_INIT_TWICE != ret) { diff --git a/src/observer/omt/ob_tenant.cpp b/src/observer/omt/ob_tenant.cpp index b9c3cbd356..e18d921bec 100644 --- a/src/observer/omt/ob_tenant.cpp +++ b/src/observer/omt/ob_tenant.cpp @@ -865,6 +865,7 @@ int ObTenant::construct_mtl_init_ctx(const ObTenantMeta &meta, share::ObTenantMo ret = is_virtual_tenant_id(id_) ? OB_SUCCESS : OB_ENTRY_NOT_EXIST; } else { mtl_init_ctx_->palf_options_.disk_options_.log_writer_parallelism_ = tenant_config->_log_writer_parallelism; + mtl_init_ctx_->palf_options_.enable_log_cache_ = tenant_config->_enable_log_cache; } LOG_INFO("construct_mtl_init_ctx success", "palf_options", mtl_init_ctx_->palf_options_.disk_options_); } diff --git a/unittest/logservice/test_log_cache.cpp b/unittest/logservice/test_log_cache.cpp index d940626fd0..686b94e167 100644 --- a/unittest/logservice/test_log_cache.cpp +++ b/unittest/logservice/test_log_cache.cpp @@ -140,7 +140,7 @@ TEST_F(TestLogCache, test_miss) int64_t out_read_size = 0; int64_t has_read_size = 5000; int64_t real_read_size = lower_align(has_read_size, LOG_DIO_ALIGN_SIZE); - EXPECT_EQ(OB_SUCCESS, cold_cache.deal_with_miss_(true, has_read_size,lsn, in_read_size , out_read_size, &iterator_info)); + EXPECT_EQ(OB_SUCCESS, cold_cache.deal_with_miss_(true, has_read_size, in_read_size + CACHE_LINE_SIZE, lsn, in_read_size, out_read_size, &iterator_info)); EXPECT_EQ(old_lsn.val_ + real_read_size, lsn.val_); EXPECT_EQ(MAX_LOG_BODY_SIZE - real_read_size, in_read_size); EXPECT_EQ(real_read_size, out_read_size); @@ -154,10 +154,38 @@ TEST_F(TestLogCache, test_miss) int64_t in_read_size = MAX_LOG_BODY_SIZE; int64_t has_read_size = 0; int64_t out_read_size = 0; - EXPECT_EQ(OB_SUCCESS, cold_cache.deal_with_miss_(true, has_read_size, lsn, in_read_size , out_read_size, &iterator_info)); + EXPECT_EQ(OB_SUCCESS, cold_cache.deal_with_miss_(true, has_read_size, in_read_size + CACHE_LINE_SIZE, lsn, in_read_size, out_read_size, &iterator_info)); EXPECT_EQ(PALF_BLOCK_SIZE, lsn.val_); EXPECT_EQ(MAX_LOG_BODY_SIZE + (old_lsn.val_ - lsn.val_), in_read_size); } + + // test miss for small buf + { + iterator_info.reset(); + LSN lsn(PALF_INITIAL_LSN_VAL); + int64_t in_read_size = MAX_LOG_BODY_SIZE; + int64_t has_read_size = 0; + int64_t out_read_size = 0; + int64_t buf_len = in_read_size; + EXPECT_EQ(OB_SUCCESS, cold_cache.deal_with_miss_(true, has_read_size, buf_len, lsn, in_read_size, out_read_size, &iterator_info)); + // shouldn't read more log because of small buf size + EXPECT_EQ(PALF_INITIAL_LSN_VAL, lsn.val_); + EXPECT_EQ(MAX_LOG_BODY_SIZE, in_read_size); + } + + // test miss for last cache line in log block + { + iterator_info.reset(); + // read lsn is in the last cache line of the second log block + LSN lsn(PALF_BLOCK_SIZE * 2 - 32 * 1024); + int64_t in_read_size = 32 * 1024; + int64_t has_read_size = 0; + int64_t out_read_size = 0; + int64_t buf_len = in_read_size + CACHE_LINE_SIZE; + EXPECT_EQ(OB_SUCCESS, cold_cache.deal_with_miss_(true, has_read_size, buf_len, lsn, in_read_size, out_read_size, &iterator_info)); + EXPECT_EQ(PALF_BLOCK_SIZE * 2 - 60 * 1024, lsn); + EXPECT_EQ(LAST_CACHE_LINE_SIZE, in_read_size); + } } TEST_F(TestLogCache, test_flashback)