[LogCache]fix:ignore error ret code from internal kvcache

This commit is contained in:
obdev
2024-07-31 08:48:33 +00:00
committed by ob-robot
parent c49d45e82e
commit c6b44db79b
3 changed files with 54 additions and 34 deletions

View File

@ -456,22 +456,23 @@ LogColdCache::LogColdCache()
: palf_id_(INVALID_PALF_ID), palf_env_impl_(NULL), log_reader_(NULL),
kv_cache_(NULL), logical_block_size_(0), log_cache_stat_(), is_inited_(false) {}
int LogColdCache::init(int64_t palf_id,
int LogColdCache::init(const int64_t palf_id,
IPalfEnvImpl *palf_env_impl,
LogStorage *log_storage) {
int ret = OB_SUCCESS;
if (INVALID_PALF_ID == palf_id || OB_ISNULL(log_storage)) {
if (INVALID_PALF_ID == palf_id || OB_ISNULL(palf_env_impl) || OB_ISNULL(log_storage)) {
ret = OB_INVALID_ARGUMENT;
PALF_LOG(WARN, "LogColdCache init failed", K(ret), K(palf_id));
PALF_LOG(WARN, "LogColdCache init failed", K(ret), K(palf_id), KP(palf_env_impl), KP(log_storage));
} else if (OB_FAIL(log_storage->get_logical_block_size(logical_block_size_))) {
PALF_LOG(WARN, "get_logical_block_size failed", K(ret), K(palf_id));
} else {
tenant_id_ = palf_env_impl->get_tenant_id();
palf_id_ = palf_id;
palf_env_impl_ = palf_env_impl;
log_reader_ = log_storage->get_log_reader();
kv_cache_ = &OB_LOG_KV_CACHE.get_instance();
is_inited_ = true;
PALF_LOG(INFO, "LogColdCache init successfully", K(is_inited_), K(palf_id), K(log_storage));
PALF_LOG(INFO, "LogColdCache init successfully", K_(is_inited), K_(tenant_id), K(palf_id), K(log_storage));
}
return ret;
@ -484,6 +485,7 @@ LogColdCache::~LogColdCache()
void LogColdCache::destroy()
{
tenant_id_ = OB_INVALID_TENANT_ID;
palf_id_ = INVALID_PALF_ID;
palf_env_impl_ = NULL;
log_reader_ = NULL;
@ -500,13 +502,13 @@ int LogColdCache::alloc_kv_pair(const int64_t flashback_version,
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
PALF_LOG(WARN, "LogColdCache is not inited", K(ret));
} else if (OB_FAIL(kv_cache_->alloc(MTL_ID(), sizeof(LogKVCacheKey),
} else if (OB_FAIL(kv_cache_->alloc(tenant_id_, sizeof(LogKVCacheKey),
sizeof(LogKVCacheValue) + CACHE_LINE_SIZE,
fill_buf.kvpair_, fill_buf.handle_,
fill_buf.inst_handle_))) {
PALF_LOG(WARN, "alloc kvpair failed", K(ret), K(palf_id_));
} else {
fill_buf.kvpair_->key_ = new (fill_buf.kvpair_->key_) LogKVCacheKey(MTL_ID(), palf_id_, aligned_lsn, flashback_version);
fill_buf.kvpair_->key_ = new (fill_buf.kvpair_->key_) LogKVCacheKey(tenant_id_, palf_id_, aligned_lsn, flashback_version);
fill_buf.buf_ = reinterpret_cast<char *>(fill_buf.kvpair_->value_) + sizeof(LogKVCacheValue);
fill_buf.kvpair_->value_ = new (fill_buf.kvpair_->value_) LogKVCacheValue(fill_buf.buf_, CACHE_LINE_SIZE);
@ -523,7 +525,7 @@ int LogColdCache::read(const int64_t flashback_version,
int64_t &out_read_size,
LogIteratorInfo *iterator_info)
{
#define PRINT_INFO K(palf_id_), K(MTL_ID())
#define PRINT_INFO K_(palf_id), K_(tenant_id)
int ret = OB_SUCCESS;
bool enable_fill_cache = false;
@ -532,7 +534,7 @@ int LogColdCache::read(const int64_t flashback_version,
LSN read_lsn = lsn;
int64_t real_read_size = in_read_size;
int64_t disk_out_read_size = 0;
ObTenantStatEstGuard tenant_stat_guard(MTL_ID());
ObTenantStatEstGuard tenant_stat_guard(tenant_id_);
// read process:
// 1. read from kv cache
// 2. deal with miss if miss happens
@ -543,8 +545,11 @@ int LogColdCache::read(const int64_t flashback_version,
// read all logs from kv cache successfully
out_read_size += cache_lines_read_size;
} else if (OB_ENTRY_NOT_EXIST != ret) {
PALF_LOG(WARN, "fail to get cache lines", K(ret), K(lsn), K(flashback_version),
K(in_read_size), K(cache_lines_read_size), PRINT_INFO);
// reading from disk directly if other error codes happen
int original_ret = ret;
if (OB_FAIL(read_from_disk_(lsn, in_read_size, read_buf, out_read_size, iterator_info))) {
PALF_LOG(WARN, "read_from_disk_ failed", K(ret), K(original_ret), K(lsn), K(in_read_size), K(out_read_size));
}
} else if (OB_FAIL(allow_filling_cache_(iterator_info, enable_fill_cache))) {
PALF_LOG(WARN, "allow_filling_cache failed", K(ret), K(enable_fill_cache), PRINT_INFO);
} else if (OB_FAIL(deal_with_miss_(enable_fill_cache, cache_lines_read_size, read_buf.buf_len_, read_lsn,
@ -653,7 +658,7 @@ int LogColdCache::get_cache_lines_(const LSN &lsn,
int64_t &out_read_size,
LogIteratorInfo *iterator_info)
{
#define PRINT_INFO K(palf_id_), K(MTL_ID())
#define PRINT_INFO K_(palf_id), K_(tenant_id)
int ret = OB_SUCCESS;
int64_t read_pos = 0;
@ -682,7 +687,7 @@ int LogColdCache::get_cache_lines_(const LSN &lsn,
iterator_info->inc_cache_read_size(out_read_size);
}
log_cache_stat_.print_stat_info(kv_cache_->store_size(MTL_ID()), palf_id_);
log_cache_stat_.print_stat_info(kv_cache_->store_size(tenant_id_), palf_id_);
#undef PRINT_INFO
return ret;
@ -695,13 +700,13 @@ int LogColdCache::get_cache_line_(const LSN &cache_read_lsn,
char *buf,
int64_t &curr_round_read_size)
{
#define PRINT_INFO K(palf_id_), K(MTL_ID())
#define PRINT_INFO K_(palf_id), K_(tenant_id)
int ret = OB_SUCCESS;
curr_round_read_size = 0;
LSN aligned_lsn = LogCacheUtils::lower_align_with_start(cache_read_lsn, CACHE_LINE_SIZE);
offset_t diff = cache_read_lsn - aligned_lsn;
LogKVCacheKey key(MTL_ID(), palf_id_, aligned_lsn, flashback_version);
LogKVCacheKey key(tenant_id_, palf_id_, aligned_lsn, flashback_version);
LogKVCacheValueHandle val_handle;
int tmp_ret = OB_SUCCESS;
char *cache_log_buf = NULL;
@ -739,7 +744,7 @@ int LogColdCache::fill_cache_lines_(const int64_t flashback_version,
char *buf)
{
int ret = OB_SUCCESS;
#define PRINT_INFO K(palf_id_), K(MTL_ID())
#define PRINT_INFO K_(palf_id), K_(tenant_id)
if (!is_valid_flashback_version(flashback_version) || !lsn.is_valid() || 0 >= fill_size) {
ret = OB_INVALID_ARGUMENT;
@ -781,9 +786,15 @@ int LogColdCache::fill_cache_lines_(const int64_t flashback_version,
}
}
if (OB_SIZE_OVERFLOW == ret || OB_ALLOCATE_MEMORY_FAILED == ret) {
// required logs has been read from disk and cache, so it doesn't matter if filling cache fails
// known error code:
// 1. OB_SIZE_OVERFLOW and OB_ALLOCATE_MEMORY_FAILED: unable to fill cache because of memory limit
// 2. OB_ENTRY_NOT_EXIST: unable fill cache because of failling to acuqire hazard version
if (OB_FAIL(ret)) {
if (REACH_TIME_INTERVAL(5 * 1000 * 1000)) {
PALF_LOG(WARN, "can't fill logs into kv cache", K(ret), K(flashback_version), K(fill_lsn), PRINT_INFO);
}
ret = OB_SUCCESS;
PALF_LOG(TRACE, "can't fill logs because of memory limit", K(flashback_version), K(fill_lsn), PRINT_INFO);
}
}
@ -799,19 +810,19 @@ int LogColdCache::fill_cache_line_(const int64_t flashback_version,
char *buf)
{
int ret = OB_SUCCESS;
LogKVCacheKey new_key(MTL_ID(), palf_id_, fill_lsn, flashback_version);
LogKVCacheKey new_key(tenant_id_, palf_id_, fill_lsn, flashback_version);
LogKVCacheValue new_value;
if (OB_FAIL(new_value.init(buf + fill_pos, fill_size))) {
PALF_LOG(WARN, "new value init failed", K(ret), K(new_key), K(new_value));
} else if (OB_SUCC(kv_cache_->put_log(new_key, new_value))) {
// fill successfully
PALF_LOG(TRACE, "fill cache successfully", K(fill_lsn), K(fill_pos), K(fill_size), K(palf_id_), K(MTL_ID()));
PALF_LOG(TRACE, "fill cache successfully", K(fill_lsn), K(fill_pos), K(fill_size), K(palf_id_));
} else if (OB_ENTRY_EXIST != ret){
PALF_LOG(WARN, "put log into kv cache failed", K(ret), K(new_key), K(new_value));
} else {
ret = OB_SUCCESS;
log_cache_stat_.inc_cache_fill_amplification(fill_size);
PALF_LOG(TRACE, "LogKVCacheKey has existed", K(new_key), K(fill_pos), K(fill_size), K(palf_id_), K(MTL_ID()));
PALF_LOG(TRACE, "LogKVCacheKey has existed", K(new_key), K(fill_pos), K(fill_size), K(palf_id_));
}
return ret;
}
@ -897,7 +908,7 @@ void LogColdCache::LogCacheStat::print_stat_info(int64_t cache_store_size, int64
"miss_cnt", interval_miss_cnt, "hit_rate",
interval_hit_cnt * 1.0 / total_cnt,
"cache_read_size", interval_cache_read_size, K(cache_store_size),
K(cache_fill_amplification_), K(palf_id), K(MTL_ID()));
K(cache_fill_amplification_), K(palf_id));
last_record_hit_cnt_ = hit_cnt_;
last_record_miss_cnt_ = miss_cnt_;
last_record_cache_read_size_ = cache_read_size_;

View File

@ -186,7 +186,7 @@ class LogColdCache
public:
LogColdCache();
~LogColdCache();
int init(int64_t palf_id,
int init(const int64_t palf_id,
IPalfEnvImpl *palf_env_impl,
LogStorage *log_storage);
void destroy();
@ -210,7 +210,7 @@ public:
LogIteratorInfo *iterator_info);
int fill_cache_line(FillBuf &fill_buf);
int alloc_kv_pair(const int64_t flashback_version, const LSN &aligned_lsn, FillBuf &fill_buf);
TO_STRING_KV(K(is_inited_), K(palf_id_), K(log_cache_stat_));
TO_STRING_KV(K_(is_inited), K_(tenant_id), K_(palf_id), K_(log_cache_stat));
private:
int allow_filling_cache_(LogIteratorInfo *iterator_info, bool &enable_fill_cache);
/*
@ -280,6 +280,7 @@ private:
int64_t last_record_cache_read_size_;
};
private:
int64_t tenant_id_;
int64_t palf_id_;
IPalfEnvImpl *palf_env_impl_;
LogReader *log_reader_;

View File

@ -19,6 +19,7 @@
#include "share/ob_tenant_mem_limit_getter.h"
#include "share/rc/ob_tenant_base.h"
#include "logservice/palf/log_reader_utils.h"
#include "logservice/palf/palf_env_impl.h"
namespace oceanbase
{
@ -40,6 +41,16 @@ public:
~TestLogCache();
virtual void SetUp();
virtual void TearDown();
static void SetUpTestCase() {
EXPECT_EQ(OB_SUCCESS, ObKVGlobalCache::get_instance().init(&ObTenantMemLimitGetter::get_instance(),
DEFAULT_BUCKET_NUM,
DEFAULT_MAX_CACHE_SIZE,
lib::ACHUNK_SIZE,
KV_CACHE_WASH_TIMER_INTERVAL_US));
}
static void TearDownTestCase() {
ObKVGlobalCache::get_instance().destroy();
}
};
TestLogCache::TestLogCache()
@ -53,12 +64,7 @@ TestLogCache::~TestLogCache()
void TestLogCache::SetUp()
{
// init cache
ObKVGlobalCache::get_instance().init(&ObTenantMemLimitGetter::get_instance(),
DEFAULT_BUCKET_NUM,
DEFAULT_MAX_CACHE_SIZE,
lib::ACHUNK_SIZE,
KV_CACHE_WASH_TIMER_INTERVAL_US);
OB_LOG_KV_CACHE.init(OB_LOG_KV_CACHE_NAME, 1);
EXPECT_EQ(OB_SUCCESS, OB_LOG_KV_CACHE.init(OB_LOG_KV_CACHE_NAME, 1));
// init MTL
ObMallocAllocator::get_instance()->create_and_add_tenant_allocator(1001);
@ -74,7 +80,7 @@ void TestLogCache::TearDown()
}
LogStorage log_storage;
IPalfEnvImpl *palf_env_impl;
PalfEnvImpl palf_env_impl;
TEST_F(TestLogCache, test_basic_func)
{
@ -83,13 +89,14 @@ TEST_F(TestLogCache, test_basic_func)
const int64_t flashback_version = 0;
int64_t palf_id = 1;
LogColdCache cold_cache;
cold_cache.init(palf_id, palf_env_impl, &log_storage);
cold_cache.init(palf_id, &palf_env_impl, &log_storage);
cold_cache.tenant_id_ = 1001;
cold_cache.is_inited_ = true;
LSN lsn(0);
int64_t in_read_size = MAX_LOG_BODY_SIZE;
char *buf = reinterpret_cast<char *>(ob_malloc(MAX_LOG_BUFFER_SIZE, "LOG_KV_CACHE"));
LogIteratorInfo iterator_info;
{
int64_t out_read_size = 0;
EXPECT_EQ(OB_ENTRY_NOT_EXIST, cold_cache.get_cache_lines_(lsn, flashback_version, in_read_size, buf, out_read_size, &iterator_info));
ReadBuf read_buf(buf, MAX_LOG_BUFFER_SIZE);
@ -130,7 +137,7 @@ TEST_F(TestLogCache, test_miss)
const int64_t flashback_version = 0;
int64_t palf_id = 1;
LogColdCache cold_cache;
cold_cache.init(palf_id, palf_env_impl, &log_storage);
cold_cache.init(palf_id, &palf_env_impl, &log_storage);
LogIteratorInfo iterator_info;
// test miss when has_read_size != 0
{
@ -196,7 +203,8 @@ TEST_F(TestLogCache, test_flashback)
int64_t flashback_version = 0;
int64_t palf_id = 1;
LogColdCache cold_cache;
cold_cache.init(palf_id, palf_env_impl, &log_storage);
cold_cache.init(palf_id, &palf_env_impl, &log_storage);
cold_cache.tenant_id_ = 1001;
LSN lsn(0);
int64_t in_read_size = MAX_LOG_BODY_SIZE;
char *buf = reinterpret_cast<char *>(ob_malloc(MAX_LOG_BUFFER_SIZE, "LOG_KV_CACHE"));