[OBCDC] lob storage add stat info

This commit is contained in:
shadowao
2023-09-18 11:47:40 +00:00
committed by ob-robot
parent e80c248fb5
commit 49700a7ec3
8 changed files with 296 additions and 18 deletions

View File

@ -14,6 +14,8 @@
#include "ob_cdc_lob_aux_meta_storager.h"
#include "ob_log_store_service.h" // IObStoreService
#include "ob_log_utils.h" // get_timestamp
#include "ob_log_config.h" // ObLogConfig
using namespace oceanbase::common;
@ -86,13 +88,36 @@ void ObCDCLobAuxDataCleanTask::reset()
tenant_id_ = OB_INVALID_TENANT_ID;
}
double ObCDCLobAuxMetaDataStatInfo::calc_rps(const int64_t delta_time, const int64_t req_cnt, const int64_t last_req_cnt)
{
double rps = 0.0;
const int64_t delta_req_count = req_cnt - last_req_cnt;
if (delta_time > 0) {
rps = (double)(delta_req_count) * 1000000.0 / (double)delta_time;
}
return rps;
}
double ObCDCLobAuxMetaDataStatInfo::calc_rate(const int64_t delta_time, const int64_t total_size, const int64_t last_total_size)
{
double rate = 0.0;
const int64_t delta_data_size = total_size - last_total_size;
double delta_data_size_formatted = (double)delta_data_size / (double)_M_;
if (delta_time > 0) {
rate = (double)(delta_data_size_formatted) * 1000000.0 / (double)delta_time;
}
return rate;
}
ObCDCLobAuxMetaStorager::ObCDCLobAuxMetaStorager() :
is_inited_(false),
lob_aux_meta_map_(),
lob_aux_meta_allocator_(),
store_service_(nullptr),
memory_alloced_(0),
memory_limit_(0),
enable_memory_(true)
enable_memory_(true),
clean_task_interval_(0)
{
}
@ -116,19 +141,22 @@ int ObCDCLobAuxMetaStorager::init(IObStoreService *store_service)
ret = OB_INVALID_ARGUMENT;
} else if (OB_FAIL(lob_aux_meta_map_.init("LobAuxMetaMap"))) {
LOG_ERROR("lob_aux_meta_map_ init failed", KR(ret));
} else if (OB_FAIL(lob_aux_meta_allocator_.init(memory_limit_,
LOB_AUX_META_ALLOCATOR_HOLD_LIMIT,
LOB_AUX_META_ALLOCATOR_PAGE_SIZE))) {
} else if (OB_FAIL(lob_aux_meta_allocator_.init(
LOB_AUX_META_ALLOCATOR_PAGE_SIZE,
"LobAuxMetaData",
OB_SERVER_TENANT_ID,
INT64_MAX))) {
LOG_ERROR("init lob_aux_meta_allocator_ fail", KR(ret));
} else {
lob_aux_meta_allocator_.set_label("LobAuxMetaData");
memory_alloced_ = 0;
}
}
if (OB_SUCC(ret)) {
store_service_ = store_service;
is_inited_ = true;
clean_task_interval_ = TCONF.lob_data_storage_clean_interval_sec * _SEC_;
LOG_INFO("ObCDCLobAuxMetaStorager init succ", K(enable_memory_), K(memory_limit_));
LOG_INFO("ObCDCLobAuxMetaStorager init succ", K(enable_memory_), K(memory_limit_), K(clean_task_interval_));
}
return ret;
@ -140,17 +168,28 @@ void ObCDCLobAuxMetaStorager::destroy()
if (enable_memory_) {
lob_aux_meta_map_.destroy();
lob_aux_meta_allocator_.destroy();
memory_alloced_ = 0;
memory_limit_ = 0;
enable_memory_ = false;
}
clean_task_interval_ = 0;
store_service_ = nullptr;
is_inited_ = false;
}
}
void ObCDCLobAuxMetaStorager::configure(const ObLogConfig &cfg)
{
int64_t memory_limit = cfg.lob_data_storage_memory_limit.get();
int64_t clean_task_interval = cfg.lob_data_storage_clean_interval_sec * _SEC_;
ATOMIC_STORE(&memory_limit_, memory_limit);
ATOMIC_STORE(&clean_task_interval_, clean_task_interval);
LOG_INFO("[LOB_STORAGER] [CONFIG]", K(memory_limit), K(clean_task_interval));
}
bool ObCDCLobAuxMetaStorager::is_need_to_disk_(int64_t alloc_size)
{
return lob_aux_meta_allocator_.allocated() + alloc_size > memory_limit_;
return ATOMIC_LOAD(&memory_alloced_) + alloc_size > memory_limit_;
}
int ObCDCLobAuxMetaStorager::put(
@ -179,6 +218,10 @@ int ObCDCLobAuxMetaStorager::put(
LOG_ERROR("disk_put_ failed", KR(ret), K(key), K(lob_data), K(lob_data_len));
}
}
if (OB_SUCC(ret)) {
ATOMIC_INC(&stat_info_.put_req_cnt_);
ATOMIC_AAF(&stat_info_.put_data_total_size_, lob_data_len);
}
return ret;
}
@ -192,13 +235,17 @@ int ObCDCLobAuxMetaStorager::memory_put_(
void *alloc_lob_data = lob_aux_meta_allocator_.alloc(lob_data_len);
if (OB_ISNULL(alloc_lob_data)) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_ERROR("alloc memory failed", KR(ret), K(key), K(lob_data), K(lob_data_len));
LOG_WARN("alloc memory failed", KR(ret), K(key),
"md5", calc_md5_cstr(lob_data, lob_data_len), K(lob_data_len));
} else {
memcpy(alloc_lob_data, lob_data, lob_data_len);
LobAuxMetaValue value(static_cast<char *>(alloc_lob_data), lob_data_len);
if (OB_FAIL(lob_aux_meta_map_.insert(key, value))) {
LOG_ERROR("[OBCDC][LOB_AUX][PUT][MEM] lob_aux_meta_map_ insert failed", KR(ret), KCSTRING(key_type), K(key), K(lob_data), K(lob_data_len));
} else {
ATOMIC_AAF(&memory_alloced_, lob_data_len);
ATOMIC_INC(&stat_info_.mem_put_req_cnt_);
ATOMIC_AAF(&stat_info_.mem_put_data_total_size_, lob_data_len);
LOG_DEBUG("[OBCDC][LOB_AUX][PUT][MEM] lob_aux_meta_map_ insert succ", KCSTRING(key_type), K(key), K(value));
}
}
@ -253,6 +300,10 @@ int ObCDCLobAuxMetaStorager::get(
}
}
}
if (OB_SUCC(ret)) {
ATOMIC_INC(&stat_info_.get_req_cnt_);
ATOMIC_AAF(&stat_info_.get_data_total_size_, lob_data_len);
}
return ret;
}
@ -274,6 +325,9 @@ int ObCDCLobAuxMetaStorager::memory_get_(
} else {
lob_data = value.lob_data_;
lob_data_len = value.lob_data_len_;
ATOMIC_INC(&stat_info_.mem_get_req_cnt_);
ATOMIC_AAF(&stat_info_.mem_get_data_total_size_, lob_data_len);
}
return ret;
}
@ -417,6 +471,7 @@ int ObCDCLobAuxMetaStorager::memory_del_(const LobAuxMetaKey &key)
LOG_ERROR("lob_data_ptr is nullptr", KR(ret), K(key));
} else {
lob_aux_meta_allocator_.free(lob_data_ptr);
ATOMIC_AAF(&memory_alloced_, -value.lob_data_len_);
}
}
return ret;
@ -477,7 +532,8 @@ int ObCDCLobAuxMetaStorager::memory_del_(const uint64_t tenant_id, const int64_t
} else {
LOG_INFO("[OBCDC][LOB_AUX][CLEAN_TASK][MEM] ObCDCLobAuxMetaStorager del", K(tenant_id), K(commit_version), "purge_count",
lob_aux_meata_purger.purge_count_, "map_count", lob_aux_meta_map_.count(),
"memory_used", SIZE_TO_STR(lob_aux_meta_allocator_.allocated()));
"memory_alloced", SIZE_TO_STR(memory_alloced_),
"memory_hold", SIZE_TO_STR(lob_aux_meta_allocator_.allocated()));
}
return ret;
}
@ -485,6 +541,8 @@ int ObCDCLobAuxMetaStorager::memory_del_(const uint64_t tenant_id, const int64_t
int ObCDCLobAuxMetaStorager::disk_del_(void* cf_family, const int64_t commit_version)
{
int ret = OB_SUCCESS;
int64_t estimate_live_data_size = 0;
int64_t estimate_num_keys = 0;
std::string begin_key;
std::string end_key;
// some trans may have same commit_version,
@ -497,8 +555,20 @@ int ObCDCLobAuxMetaStorager::disk_del_(void* cf_family, const int64_t commit_ver
LOG_ERROR("[OBCDC][LOB_AUX][CLEAN_TASK][DISK] store_service_ del fail", KR(ret), "begin_key", begin_key.c_str(),
"end_key", end_key.c_str(), K(commit_version));
}
} else if (OB_FAIL(store_service_->compact_range(cf_family, begin_key, end_key))) {
if (OB_IN_STOP_STATE != ret) {
LOG_ERROR("store_service_ compact_range fail", KR(ret), "begin_key", begin_key.c_str(),
"end_key", end_key.c_str(), K(commit_version));
}
} else if (OB_FAIL(store_service_->get_mem_usage(cf_family, estimate_live_data_size, estimate_num_keys))) {
if (OB_IN_STOP_STATE != ret) {
LOG_ERROR("store_service_ get_mem_usage fail", KR(ret), "begin_key", begin_key.c_str(),
"end_key", end_key.c_str(), K(commit_version));
}
} else {
LOG_INFO("[OBCDC][LOB_AUX][CLEAN_TASK][DISK] store_service_ del_range succ", K(commit_version), KCSTRING(end_key.c_str()));
LOG_INFO("[OBCDC][LOB_AUX][CLEAN_TASK][DISK] store_service_ del_range succ", K(commit_version), "end_key", end_key.c_str(),
"estimate_live_data_size", SIZE_TO_STR(estimate_live_data_size),
K(estimate_num_keys));
}
return ret;
}
@ -531,6 +601,80 @@ int ObCDCLobAuxMetaStorager::get_clean_task(const uint64_t tenant_id, ObCDCLobAu
return ret;
}
void ObCDCLobAuxMetaStorager::print_stat_info()
{
int64_t current_timestamp = get_timestamp();
int64_t local_last_stat_time = stat_info_.last_stat_time_;
int64_t delta_time = current_timestamp - local_last_stat_time;
// Update last statistic value
stat_info_.last_stat_time_ = current_timestamp;
// get req rps
int64_t get_req_cnt = ATOMIC_LOAD(&stat_info_.get_req_cnt_);
int64_t last_get_req_cnt = ATOMIC_LOAD(&stat_info_.last_get_req_cnt_);
double get_req_rps = stat_info_.calc_rps(delta_time, get_req_cnt, last_get_req_cnt);
stat_info_.last_get_req_cnt_ = get_req_cnt;
// mem get req rps
int64_t mem_get_req_cnt = ATOMIC_LOAD(&stat_info_.mem_get_req_cnt_);
int64_t last_mem_get_req_cnt = ATOMIC_LOAD(&stat_info_.last_mem_get_req_cnt_);
double mem_get_req_rps = stat_info_.calc_rps(delta_time, mem_get_req_cnt, last_mem_get_req_cnt);
double mem_get_req_rps_percentage = (get_req_rps != 0 ? mem_get_req_rps/get_req_rps : 0);
stat_info_.last_mem_get_req_cnt_ = mem_get_req_cnt;
// get rate
int64_t get_data_total_size = ATOMIC_LOAD(&stat_info_.get_data_total_size_);
int64_t last_get_data_total_size = ATOMIC_LOAD(&stat_info_.last_get_data_total_size_);
double get_rate = stat_info_.calc_rate(delta_time, get_data_total_size, last_get_data_total_size);
stat_info_.last_get_data_total_size_ = get_data_total_size;
// mem get rate
int64_t mem_get_data_total_size = ATOMIC_LOAD(&stat_info_.mem_get_data_total_size_);
int64_t last_mem_get_data_total_size = ATOMIC_LOAD(&stat_info_.last_mem_get_data_total_size_);
double mem_get_rate = stat_info_.calc_rate(delta_time, mem_get_data_total_size, last_mem_get_data_total_size);
double mem_get_rate_percentage = (get_rate != 0 ? mem_get_rate/get_rate : 0);
stat_info_.last_mem_get_data_total_size_ = mem_get_data_total_size;
// put req rps
int64_t put_req_cnt = ATOMIC_LOAD(&stat_info_.put_req_cnt_);
int64_t last_put_req_cnt = ATOMIC_LOAD(&stat_info_.last_put_req_cnt_);
double put_req_rps = stat_info_.calc_rps(delta_time, put_req_cnt, last_put_req_cnt);
stat_info_.last_put_req_cnt_ = put_req_cnt;
// mem put req rps
int64_t mem_put_req_cnt = ATOMIC_LOAD(&stat_info_.mem_put_req_cnt_);
int64_t last_mem_put_req_cnt = ATOMIC_LOAD(&stat_info_.last_mem_put_req_cnt_);
double mem_put_req_rps = stat_info_.calc_rps(delta_time, mem_put_req_cnt, last_mem_put_req_cnt);
double mem_put_req_rps_percentage = (put_req_rps != 0 ? mem_put_req_rps/put_req_rps : 0);
stat_info_.last_mem_put_req_cnt_ = mem_put_req_cnt;
// put rate
int64_t put_data_total_size = ATOMIC_LOAD(&stat_info_.put_data_total_size_);
int64_t last_put_data_total_size = ATOMIC_LOAD(&stat_info_.last_put_data_total_size_);
double put_rate = stat_info_.calc_rate(delta_time, put_data_total_size, last_put_data_total_size);
stat_info_.last_put_data_total_size_ = put_data_total_size;
// mem put rate
int64_t mem_put_data_total_size = ATOMIC_LOAD(&stat_info_.mem_put_data_total_size_);
int64_t last_mem_put_data_total_size = ATOMIC_LOAD(&stat_info_.last_mem_put_data_total_size_);
double mem_put_rate = stat_info_.calc_rate(delta_time, mem_put_data_total_size, last_mem_put_data_total_size);
double mem_put_rate_percentage = (put_rate != 0 ? mem_put_rate/put_rate : 0);
stat_info_.last_mem_put_data_total_size_ = mem_put_data_total_size;
_LOG_INFO("[LOB_STORAGER] [STAT] "
"GET_RPS=%.3lf MEM_GET_RPS=%.3lf MEM_GET_RPS_PERCENTAGE=%.3lf "
"GET_RATE=%.5lfM/s MEM_GET_RATE=%.5lfM/s MEM_GET_RATE_PERCENTAGE=%.3lf "
"PUT_RPS=%.3lf MEM_PUT_RPS=%.3lf MEM_PUT_RPS_PERCENTAGE=%.3lf "
"PUT_RATE=%.5lfM/s MEM_PUT_RATE=%.5lfM/s MEM_PUT_RATE_PERCENTAGE=%.3lf "
"MAP_COUNT=%lu MEMORY_ALLOCED=%s MEMORY_HOLD=%s ",
get_req_rps, mem_get_req_rps, mem_get_req_rps_percentage,
get_rate, mem_get_rate, mem_get_rate_percentage,
put_req_rps, mem_put_req_rps, mem_put_req_rps_percentage,
put_rate, mem_put_rate, mem_put_rate_percentage,
lob_aux_meta_map_.count(), SIZE_TO_STR(memory_alloced_), SIZE_TO_STR(lob_aux_meta_allocator_.allocated()));
}
bool ObCDCLobAuxMetaStorager::LobAuxMetaDataPurger::operator()(
const LobAuxMetaKey &lob_aux_meta_key,
LobAuxMetaValue &lob_aux_meta_value)
@ -548,7 +692,10 @@ bool ObCDCLobAuxMetaStorager::LobAuxMetaDataPurger::operator()(
} else {
host_.lob_aux_meta_allocator_.free(lob_data_ptr);
++purge_count_;
LOG_DEBUG("purge succ", K(lob_aux_meta_key), K(purge_count_));
ATOMIC_AAF(&(host_.memory_alloced_), -lob_aux_meta_value.lob_data_len_);
LOG_DEBUG("purge succ", K(lob_aux_meta_key), K(purge_count_),
"lob_data_len", lob_aux_meta_value.lob_data_len_,
"memory_alloced", SIZE_TO_STR(host_.memory_alloced_));
}
}

View File

@ -140,7 +140,53 @@ public:
bool is_task_push_;
};
struct ObCDCLobAuxMetaDataStatInfo
{
ObCDCLobAuxMetaDataStatInfo() { reset(); }
void reset()
{
last_stat_time_ = 0;
get_req_cnt_ = 0;
last_get_req_cnt_ = 0;
mem_get_req_cnt_ = 0;
last_mem_get_req_cnt_ = 0;
put_req_cnt_ = 0;
last_put_req_cnt_ = 0;
mem_put_req_cnt_ = 0;
last_mem_put_req_cnt_ = 0;
get_data_total_size_ = 0;
mem_get_data_total_size_ = 0;
put_data_total_size_ = 0;
mem_put_data_total_size_ = 0;
}
double calc_rps(const int64_t delta_time, const int64_t req_cnt, const int64_t last_req_cnt);
double calc_rate(const int64_t delta_time, const int64_t total_size, const int64_t last_total_size);
int64_t last_stat_time_;
int64_t get_req_cnt_;
int64_t last_get_req_cnt_;
int64_t mem_get_req_cnt_;
int64_t last_mem_get_req_cnt_;
int64_t put_req_cnt_;
int64_t last_put_req_cnt_;
int64_t mem_put_req_cnt_;
int64_t last_mem_put_req_cnt_;
int64_t get_data_total_size_;
int64_t last_get_data_total_size_;
int64_t mem_get_data_total_size_;
int64_t last_mem_get_data_total_size_;
int64_t put_data_total_size_;
int64_t last_put_data_total_size_;
int64_t mem_put_data_total_size_;
int64_t last_mem_put_data_total_size_;
};
class IObStoreService;
class ObLogConfig;
class ObCDCLobAuxMetaStorager
{
public:
@ -169,6 +215,9 @@ public:
int clean_unused_data(ObCDCLobAuxDataCleanTask *task);
int get_cf_handle(const uint64_t tenant_id, void *&cf_handle);
int64_t get_clean_task_interval() const { return clean_task_interval_; }
void configure(const ObLogConfig &cfg);
void print_stat_info();
private:
int del_lob_col_value_(
const int64_t commit_version,
@ -238,14 +287,27 @@ private:
typedef common::ObConcurrentFIFOAllocator LobAuxMetaAllocator;
static const int64_t LOB_AUX_META_ALLOCATOR_TOTAL_LIMIT = 1 * _G_;
static const int64_t LOB_AUX_META_ALLOCATOR_HOLD_LIMIT = 32 * _M_;
static const int64_t LOB_AUX_META_ALLOCATOR_PAGE_SIZE = 2 * _M_;
// The page size affects the memory hold by allocator.
// If page size is OB_MALLOC_BIG_BLOCK_SIZE(2M), the memory hold by allocator is
// eight times as much memory as it is actually used.
// If page size is OB_MALLOC_NORMAL_BLOCK_SIZE(8K), the memory hold by allocator is
// twice as much memory as it is actually used.
// And the memory hold almost equal to what is actually used
// when page size is OB_MALLOC_NORMAL_BLOCK_SIZE and one piece lob data is 256K.
// So we use OB_MALLOC_NORMAL_BLOCK_SIZE as the page size
static const int64_t LOB_AUX_META_ALLOCATOR_PAGE_SIZE = OB_MALLOC_NORMAL_BLOCK_SIZE;
bool is_inited_;
LobAuxMetaMap lob_aux_meta_map_;
LobAuxMetaAllocator lob_aux_meta_allocator_;
IObStoreService *store_service_;
int64_t memory_alloced_;
int64_t memory_limit_;
bool enable_memory_;
int64_t clean_task_interval_;
ObCDCLobAuxMetaDataStatInfo stat_info_;
DISALLOW_COPY_AND_ASSIGN(ObCDCLobAuxMetaStorager);
};

View File

@ -575,6 +575,8 @@ public:
T_DEF_BOOL(enable_lob_data_storage_memory, OB_CLUSTER_PARAMETER, 1, "0:disabled, 1:enabled");
// lob_data_storage_memory_limit takes effect only when enable_lob_data_storage_memory is true.
DEF_CAP(lob_data_storage_memory_limit, OB_CLUSTER_PARAMETER, "1G", "[128M,]", "lob data storage memory limit");
T_DEF_INT_INFT(lob_data_storage_clean_interval_sec, OB_CLUSTER_PARAMETER, 5, 1,
"lob_data_storage clean task nterval in seconds");
#undef OB_CLUSTER_PARAMETER

View File

@ -2161,6 +2161,7 @@ void ObLogInstance::timer_routine()
print_trans_stat_();
resource_collector_->print_stat_info();
reader_->print_stat_info();
lob_aux_meta_storager_.print_stat_info();
}
// Periodic memory recycling
@ -2310,6 +2311,11 @@ void ObLogInstance::reload_config_()
committer_->configure(config);
}
// cofig lob storager
if (OB_SUCC(ret)) {
lob_aux_meta_storager_.configure(config);
}
// config rs_server_provider_
if (OB_NOT_NULL(rs_server_provider_)) {
if (is_tenant_sync_mode()) {

View File

@ -627,19 +627,20 @@ int ObLogResourceCollector::push_lob_data_clean_task_(const uint64_t tenant_id,
if (OB_FAIL(lob_aux_meta_storager.get_clean_task(tenant_id, clean_task))){
LOG_ERROR("lob_aux_meta_storager get_clean_task failed", KR(ret), K(tenant_id));
} else {
bool is_task_push = ATOMIC_LOAD(&clean_task->is_task_push_);
if (is_task_push || ! REACH_TIME_INTERVAL(5 * _SEC_)) {
LOG_DEBUG("no need push clean task", K(is_task_push), K(tenant_id), K(commit_version));
const bool is_task_push = ATOMIC_LOAD(&clean_task->is_task_push_);
const int64_t clean_task_interval = lob_aux_meta_storager.get_clean_task_interval();
if (is_task_push || ! REACH_TIME_INTERVAL(clean_task_interval)) {
LOG_DEBUG("no need push clean task", K(is_task_push), K(tenant_id), K(commit_version), K(clean_task_interval));
// try set flag by cas, oldv is false, newv is ture
// expect return oldv (false). if return true, means cas fail, just skip
} else if (ATOMIC_CAS(&clean_task->is_task_push_, false, true)) {
LOG_DEBUG("no need push clean task", K(is_task_push), K(tenant_id), K(commit_version));
LOG_DEBUG("no need push clean task", K(is_task_push), K(tenant_id), K(commit_version), K(clean_task_interval));
} else {
ATOMIC_STORE(&clean_task->commit_version_, commit_version);
if (OB_FAIL(push_task_into_queue_(*clean_task))) {
LOG_ERROR("push_task_into_queue_ failed", KR(ret), KPC(clean_task));
LOG_ERROR("push_task_into_queue_ failed", KR(ret), KPC(clean_task), K(clean_task_interval));
} else {
LOG_INFO("push lob data clean task succ", KPC(clean_task));
LOG_INFO("push lob data clean task succ", KPC(clean_task), K(clean_task_interval));
}
}
}

View File

@ -327,6 +327,7 @@ int RocksDbStoreService::del(void *cf_handle, const std::string &key)
int RocksDbStoreService::del_range(void *cf_handle, const std::string &begin_key, const std::string &end_key)
{
int ret = OB_SUCCESS;
int64_t start_ts = get_timestamp();
rocksdb::ColumnFamilyHandle *column_family_handle = static_cast<rocksdb::ColumnFamilyHandle *>(cf_handle);
if (OB_ISNULL(column_family_handle)) {
@ -341,12 +342,45 @@ int RocksDbStoreService::del_range(void *cf_handle, const std::string &begin_key
if (!s.ok()) {
LOG_ERROR("DeleteRange %s from rocksdb failed, error %s", begin_key.c_str(), s.ToString().c_str());
ret = OB_ERR_UNEXPECTED;
} else {
// NOTICE invoke this interface lob data clean task interval
double time_cost = (get_timestamp() - start_ts)/1000.0;
_LOG_INFO("DEL_RANGE time_cost=%.3lfms start_key=%s end_key=%s", time_cost, begin_key.c_str(), end_key.c_str());
}
}
return ret;
}
int RocksDbStoreService::compact_range(void *cf_handle, const std::string &begin_key, const std::string &end_key)
{
int ret = OB_SUCCESS;
int64_t start_ts = get_timestamp();
rocksdb::ColumnFamilyHandle *column_family_handle = static_cast<rocksdb::ColumnFamilyHandle *>(cf_handle);
if (OB_ISNULL(column_family_handle)) {
LOG_ERROR("column_family_handle is NULL");
ret = OB_ERR_UNEXPECTED;
} else if (is_stopped()) {
ret = OB_IN_STOP_STATE;
} else {
rocksdb::Slice begin(begin_key);
rocksdb::Slice end(end_key);
rocksdb::Status s = m_db_->CompactRange(rocksdb::CompactRangeOptions(), column_family_handle,
&begin, &end);
if (!s.ok()) {
LOG_ERROR("CompactRange %s from rocksdb failed, error %s", begin_key.c_str(), s.ToString().c_str());
ret = OB_ERR_UNEXPECTED;
} else {
// NOTICE invoke this interface lob data clean task interval
double time_cost = (get_timestamp() - start_ts)/1000.0;
_LOG_INFO("COMPACT_RANGE time_cost=%.3lfms start_key=%s end_key=%s", time_cost, begin_key.c_str(), end_key.c_str());
}
}
return ret;
}
int RocksDbStoreService::create_column_family(const std::string& column_family_name,
void *&cf_handle)
{
@ -496,5 +530,26 @@ void RocksDbStoreService::get_mem_usage(const std::vector<uint64_t> ids,
"block_cache_pinned", SIZE_TO_STR(total_block_cache_pinned_usage));
}
int RocksDbStoreService::get_mem_usage(void * cf_handle, int64_t &estimate_live_data_size, int64_t &estimate_num_keys)
{
int ret = OB_SUCCESS;
rocksdb::ColumnFamilyHandle *column_family_handle = static_cast<rocksdb::ColumnFamilyHandle *>(cf_handle);
if (OB_ISNULL(column_family_handle)) {
LOG_ERROR("column_family_handle is NULL");
ret = OB_INVALID_ARGUMENT;
} else {
std::string estimate_live_data_size_str;
std::string estimate_num_keys_str;
m_db_->GetProperty(column_family_handle, "rocksdb.estimate-live-data-size", &estimate_live_data_size_str);
m_db_->GetProperty(column_family_handle, "rocksdb.estimate-num-keys", &estimate_num_keys_str);
c_str_to_int(estimate_live_data_size_str.c_str(), estimate_live_data_size);
c_str_to_int(estimate_num_keys_str.c_str(), estimate_num_keys);
}
return ret;
}
}
}

View File

@ -49,6 +49,7 @@ public:
virtual int del(const std::string &key);
virtual int del(void *cf_handle, const std::string &key);
virtual int del_range(void *cf_handle, const std::string &begin_key, const std::string &end_key);
virtual int compact_range(void *cf_handle, const std::string &begin_key, const std::string &end_key);
virtual int create_column_family(const std::string& column_family_name,
void *&cf_handle);
@ -59,6 +60,7 @@ public:
virtual int close() override;
virtual void get_mem_usage(const std::vector<uint64_t> ids,
const std::vector<void *> cf_handles);
virtual int get_mem_usage(void * cf_handle, int64_t &estimate_live_data_size, int64_t &estimate_num_keys);
OB_INLINE bool is_stopped() const { return ATOMIC_LOAD(&is_stopped_); }
private:

View File

@ -62,6 +62,7 @@ public:
virtual int del(const std::string &key) = 0;
virtual int del(void *cf_handle, const std::string &key) = 0;
virtual int del_range(void *cf_handle, const std::string &begin_key, const std::string &end_key) = 0;
virtual int compact_range(void *cf_handle, const std::string &begin_key, const std::string &end_key) = 0;
virtual int create_column_family(const std::string& column_family_name,
void *&cf_handle) = 0;
@ -70,6 +71,8 @@ public:
virtual void get_mem_usage(const std::vector<uint64_t> ids,
const std::vector<void *> cf_handles) = 0;
virtual int get_mem_usage(void * cf_handle, int64_t &estimate_live_data_size, int64_t &estimate_num_keys) = 0;
};
}