[CP] [OBCDC] Fix SST file not recycled after trans output

This commit is contained in:
SanmuWangZJU
2024-02-06 17:06:06 +00:00
committed by ob-robot
parent 76593e3ae6
commit 600495620c
16 changed files with 311 additions and 41 deletions

View File

@ -1177,6 +1177,7 @@ int ObLogCommitter::after_trans_handled_(PartTransTask *participants)
// Update Commit information
// NOTE: Since the above guarantees that the reference count is greater than the number of Binlog Records, the list of participants here must be valid
PartTransTask *part_trans_task = participants;
const bool is_ddl_trans = part_trans_task->is_ddl_trans();
while (OB_SUCC(ret) && ! stop_flag_ && OB_NOT_NULL(part_trans_task)) {
PartTransTask *next = part_trans_task->next_task();
@ -1188,6 +1189,10 @@ int ObLogCommitter::after_trans_handled_(PartTransTask *participants)
// Decrement the reference count after the Commit message is updated
// If the reference count is 0, the partition transaction is recycled
else if (0 == part_trans_task->dec_ref_cnt()) {
if (is_ddl_trans && ! part_trans_task->is_sys_ls_part_trans()) {
// mark user_ls part_trans_task in ddl_trans not served, and will recycle redo of the part_trans_task in resource_collector.
part_trans_task->set_unserved();
}
if (OB_FAIL(resource_collector_->revert(part_trans_task))) {
if (OB_IN_STOP_STATE != ret) {
LOG_ERROR("revert PartTransTask fail", KR(ret), K(part_trans_task));

View File

@ -252,6 +252,8 @@ public:
// the destination of archive log.
DEF_STR(archive_dest, OB_CLUSTER_PARAMETER, "|", "the location of archive log");
T_DEF_INT_INFT(rocksdb_write_buffer_size, OB_CLUSTER_PARAMETER, 64, 16, "write buffer size[M]");
DEF_TIME(rocksdb_flush_interval, OB_CLUSTER_PARAMETER, "10m", "[0s,1d]", "rocksdb flush interval for redo_storage, 0s means never");
DEF_TIME(rocksdb_compact_interval, OB_CLUSTER_PARAMETER, "6h", "[0s,7d]", "rocksdb compact interval for redo_storage, 0s means never");
T_DEF_INT_INFT(io_thread_num, OB_CLUSTER_PARAMETER, 4, 1, "io thread number");
T_DEF_INT(idle_pool_thread_num, OB_CLUSTER_PARAMETER, 4, 1, 32, "idle pool thread num");

View File

@ -101,6 +101,8 @@ namespace libobcdc
ObLogInstance *ObLogInstance::instance_ = NULL;
static ObSimpleMemLimitGetter mem_limit_getter;
const int64_t ObLogInstance::DAEMON_THREAD_COUNT = 1;
ObLogInstance *ObLogInstance::get_instance()
{
if (NULL == instance_) {
@ -560,6 +562,8 @@ int ObLogInstance::init_common_(uint64_t start_tstamp_ns, ERROR_CALLBACK err_cb)
LOG_ERROR("check config fail", KR(ret));
} else if (OB_FAIL(dump_config_())) {
LOG_ERROR("dump_config_ fail", KR(ret));
} else if (OB_FAIL(lib::ThreadPool::set_thread_count(DAEMON_THREAD_COUNT))) {
LOG_ERROR("set ObLogInstance daemon thread count failed", KR(ret), K(DAEMON_THREAD_COUNT));
} else if (OB_FAIL(trans_task_pool_alloc_.init(
TASK_POOL_ALLOCATOR_TOTAL_LIMIT,
TASK_POOL_ALLOCATOR_HOLD_LIMIT,
@ -1384,6 +1388,7 @@ void ObLogInstance::do_destroy_(const bool force_destroy)
timer_tid_ = 0;
sql_tid_ = 0;
flow_control_tid_ = 0;
lib::ThreadPool::destroy();
(void)trans_task_pool_.destroy();
(void)trans_task_pool_alloc_.destroy();
@ -1552,6 +1557,7 @@ void ObLogInstance::mark_stop_flag(const char *stop_reason)
committer_->mark_stop_flag();
resource_collector_->mark_stop_flag();
timezone_info_getter_->mark_stop_flag();
lib::ThreadPool::stop();
LOG_INFO("mark_stop_flag end", K(global_errno_), KCSTRING(stop_reason));
}
@ -2229,6 +2235,8 @@ int ObLogInstance::start_threads_()
} else if (0 != (pthread_ret = pthread_create(&flow_control_tid_, NULL, flow_control_thread_func_, this))) {
LOG_ERROR("start flow control thread fail", K(pthread_ret), KERRNOMSG(pthread_ret));
ret = OB_ERR_UNEXPECTED;
} else if (OB_FAIL(lib::ThreadPool::start())) {
LOG_ERROR("start daemon threads failed", KR(ret), K(DAEMON_THREAD_COUNT));
} else {
LOG_INFO("start instance threads succ", K(timer_tid_), K(sql_tid_), K(flow_control_tid_));
}
@ -2273,6 +2281,65 @@ void ObLogInstance::wait_threads_stop_()
flow_control_tid_ = 0;
}
LOG_INFO("wait daemon threads stop");
lib::ThreadPool::wait();
LOG_INFO("wait daemon threads stop done");
}
void ObLogInstance::run1()
{
int ret = OB_SUCCESS;
const int64_t thread_idx = lib::ThreadPool::get_thread_idx();
const int64_t thread_count = lib::ThreadPool::get_thread_count();
if (OB_UNLIKELY(thread_count != DAEMON_THREAD_COUNT || thread_idx >= thread_count)) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("invalid thread count or thread idx", KR(ret), K(thread_idx), K(thread_count), K(DAEMON_THREAD_COUNT));
} else if (0 == thread_idx) {
// handle storage_operation
lib::set_thread_name("CDC-BGD-STORAGE_OP");
if (OB_FAIL(daemon_handle_storage_op_thd_())) {
LOG_ERROR("handle_storage_op in background failed", KR(ret), K(thread_idx), K(thread_count));
}
} else {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("unexpect daemon thread", KR(ret), K(thread_count), K(thread_idx));
}
if (OB_SUCCESS != ret && OB_IN_STOP_STATE != ret) {
handle_error(ret, "obcdc daemon thread[idx=%ld] exits, err=%d", lib::ThreadPool::get_thread_idx(), ret);
mark_stop_flag("DAEMON THEAD EXIST");
}
}
int ObLogInstance::daemon_handle_storage_op_thd_()
{
int ret = OB_SUCCESS;
const int64_t TIMER_INTERVAL = 5 * _SEC_;
while (OB_SUCC(ret) && ! lib::ThreadPool::has_set_stop()) {
ob_usleep(TIMER_INTERVAL);
ObLogTraceIdGuard trace_guard;
if (! is_memory_working_mode(working_mode_)) {
if (OB_ISNULL(tenant_mgr_)) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("expect valid tenant_mgr", KR(ret));
} else {
const int64_t redo_flush_interval = TCONF.rocksdb_flush_interval.get();
const int64_t redo_compact_interval = TCONF.rocksdb_compact_interval.get();
if (redo_flush_interval > 0 && REACH_TIME_INTERVAL(redo_flush_interval)) {
tenant_mgr_->flush_storaged_redo();
}
if (redo_compact_interval > 0 && REACH_TIME_INTERVAL(redo_compact_interval)) {
tenant_mgr_->compact_storaged_redo();
}
}
}
}
return ret;
}
void ObLogInstance::reload_config_()

View File

@ -92,7 +92,7 @@ public:
typedef common::sqlclient::ObMySQLServerProvider ServerProviderType;
class ObLogInstance : public IObCDCInstance, public IObLogErrHandler
class ObLogInstance : public IObCDCInstance, public IObLogErrHandler, public lib::ThreadPool
{
public:
virtual ~ObLogInstance();
@ -218,6 +218,8 @@ private:
static void *flow_control_thread_func_(void *args);
int start_threads_();
void wait_threads_stop_();
void run1() override;
int daemon_handle_storage_op_thd_();
void reload_config_();
void print_tenant_memory_usage_();
void global_flow_control_();
@ -288,7 +290,10 @@ private:
private:
static ObLogInstance *instance_;
// Threads that runs with instance
// thread count = 1, start from idx 0;
// thread 0: use to operate storage(manul flush and compact)
static const int64_t DAEMON_THREAD_COUNT;
private:
bool inited_;
bool is_running_;

View File

@ -1140,6 +1140,7 @@ public:
bool &is_not_barrier,
ObSchemaOperationType &op_type) const;
ObIAllocator &get_log_entry_task_base_allocator() { return log_entry_task_base_allocator_; };
void set_unserved() { set_unserved_(); }
TO_STRING_KV(
"state", serve_state_,

View File

@ -220,7 +220,7 @@ int ObLogReader::handle_task_(ObLogEntryTask &log_entry_task,
LOG_ERROR("get_tenant_guard fail", KR(ret));
} else {
tenant = guard.get_tenant();
column_family_handle = tenant->get_cf();
column_family_handle = tenant->get_redo_storage_cf_handle();
}
if (OB_FAIL(ret)) {

View File

@ -299,7 +299,7 @@ int ObLogResourceCollector::revert_log_entry_task_(ObLogEntryTask *log_entry_tas
const bool is_test_mode_on = TCONF.test_mode_on != 0;
if (is_test_mode_on) {
LOG_INFO("LogEntryTask-free", "LogEntryTask", *log_entry_task, "addr", log_entry_task, K(data_len));
LOG_INFO("LogEntryTask-free", "LogEntryTask", *log_entry_task, "addr", log_entry_task, K(data_len), K(is_log_entry_stored));
}
if (is_log_entry_stored) {
@ -336,7 +336,7 @@ int ObLogResourceCollector::del_store_service_data_(const uint64_t tenant_id,
LOG_ERROR("get_tenant_guard fail", KR(ret), K(tenant_id));
} else {
tenant = guard.get_tenant();
column_family_handle = tenant->get_cf();
column_family_handle = tenant->get_redo_storage_cf_handle();
}
if (OB_SUCC(ret) && ! RCThread::is_stoped()) {
@ -698,6 +698,7 @@ int ObLogResourceCollector::revert_dml_binlog_record_(ObLogBR &br, volatile bool
return ret;
}
// @deperate: should not use it case redo_storage_key don't contain trans_id anymore
int ObLogResourceCollector::del_trans_(const uint64_t tenant_id,
const ObString &trans_id_str)
{
@ -718,7 +719,7 @@ int ObLogResourceCollector::del_trans_(const uint64_t tenant_id,
LOG_ERROR("get_tenant_guard fail", KR(ret), K(tenant_id));
} else {
tenant = guard.get_tenant();
column_family_handle = tenant->get_cf();
column_family_handle = tenant->get_redo_storage_cf_handle();
}
if (OB_SUCC(ret)) {
@ -748,10 +749,13 @@ int ObLogResourceCollector::dec_ref_cnt_and_try_to_recycle_log_entry_task_(ObLog
LOG_ERROR("part_trans_task is NULL", KPC(log_entry_task));
ret = OB_ERR_UNEXPECTED;
} else {
const int64_t row_ref_cnt = log_entry_task->dec_row_ref_cnt();
const bool need_revert_log_entry_task = (row_ref_cnt == 0);
if (TCONF.test_mode_on) {
LOG_INFO("revert_dml_binlog_record", KP(&br), K(br), KP(log_entry_task), KPC(log_entry_task));
// print while revert each row
LOG_INFO("revert_dml_binlog_record", KP(&br), K(br), KP(log_entry_task), K(need_revert_log_entry_task), KPC(log_entry_task));
}
const bool need_revert_log_entry_task = (log_entry_task->dec_row_ref_cnt() == 0);
if (need_revert_log_entry_task) {
if (OB_FAIL(revert_log_entry_task_(log_entry_task))) {

View File

@ -23,6 +23,9 @@
#include "lib/oblog/ob_log_module.h" // LOG_*
#include "lib/ob_errno.h"
#include "rocksdb/table_properties.h"
#include "rocksdb/utilities/table_properties_collectors.h"
namespace oceanbase
{
namespace libobcdc
@ -51,9 +54,12 @@ int RocksDbStoreService::init(const std::string &path)
} else if (OB_FAIL(init_dir_(path.c_str()))) {
LOG_ERROR("init_dir_ fail", K(ret));
} else {
const int total_threads = 32;
const int64_t sliding_window_size = 10000;
const int64_t deletion_trigger = 1000;
const double deletion_ratio = 0.1;
m_db_path_ = path;
m_options_.create_if_missing = true;
const int total_threads = 32;
// By default, RocksDB uses only one background thread for flush and
// compaction. Calling this function will set it up such that total of
// `total_threads` is used. Good value for `total_threads` is the number of
@ -66,6 +72,21 @@ int RocksDbStoreService::init(const std::string &path)
// 2G
m_options_.db_write_buffer_size = 2 << 30;
m_options_.max_open_files = 100;
// Creates a factory of a table property collector that marks a SST
// file as need-compaction when it observe at least "D" deletion
// entries in any "N" consecutive entries, or the ratio of tombstone
// entries >= deletion_ratio.
//
// @param sliding_window_size "N". Note that this number will be
// round up to the smallest multiple of 128 that is no less
// than the specified size.
// @param deletion_trigger "D". Note that even when "N" is changed,
// the specified number for "D" will not be changed.
// @param deletion_ratio, if <= 0 or > 1, disable triggering compaction
// based on deletion ratio. Disabled by default.
m_options_.table_properties_collector_factories.emplace_back(
rocksdb::NewCompactOnDeletionCollectorFactory(sliding_window_size, deletion_trigger, deletion_ratio));
m_options_.statistics = rocksdb::CreateDBStatistics();
rocksdb::Status s = rocksdb::DB::Open(m_options_, m_db_path_, &m_db_);
if (!s.ok()) {
@ -290,7 +311,9 @@ int RocksDbStoreService::del(const std::string &key)
ret = OB_IN_STOP_STATE;
} else {
// find column family handle for cf
rocksdb::Status s = m_db_->Delete(rocksdb::WriteOptions(), key);
rocksdb::WriteOptions writer_options;
writer_options.disableWAL = true;
rocksdb::Status s = m_db_->Delete(writer_options, key);
if (!s.ok()) {
ret = OB_IO_ERROR;
_LOG_ERROR("delete %s from rocksdb failed, error %s", key.c_str(), s.ToString().c_str());
@ -336,7 +359,9 @@ int RocksDbStoreService::del_range(void *cf_handle, const std::string &begin_key
} else if (is_stopped()) {
ret = OB_IN_STOP_STATE;
} else {
rocksdb::Status s = m_db_->DeleteRange(rocksdb::WriteOptions(), column_family_handle,
rocksdb::WriteOptions writer_options;
writer_options.disableWAL = true;
rocksdb::Status s = m_db_->DeleteRange(writer_options, column_family_handle,
begin_key, end_key);
if (!s.ok()) {
@ -352,35 +377,70 @@ int RocksDbStoreService::del_range(void *cf_handle, const std::string &begin_key
return ret;
}
int RocksDbStoreService::compact_range(void *cf_handle, const std::string &begin_key, const std::string &end_key)
int RocksDbStoreService::compact_range(
void *cf_handle,
const std::string &begin_key,
const std::string &end_key,
const bool op_entire_cf)
{
int ret = OB_SUCCESS;
int64_t start_ts = get_timestamp();
rocksdb::ColumnFamilyHandle *column_family_handle = static_cast<rocksdb::ColumnFamilyHandle *>(cf_handle);
rocksdb::CompactRangeOptions compact_option;
compact_option.change_level = true;
if (OB_ISNULL(column_family_handle)) {
LOG_ERROR("column_family_handle is NULL");
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("column_family_handle is NULL");
} else if (is_stopped()) {
ret = OB_IN_STOP_STATE;
} else {
rocksdb::Slice begin(begin_key);
rocksdb::Slice end(end_key);
rocksdb::Status s = m_db_->CompactRange(rocksdb::CompactRangeOptions(), column_family_handle,
&begin, &end);
rocksdb::Status s;
if (op_entire_cf) {
// compact from nullptr to nullptr means compact all data in the column_family
s = m_db_->CompactRange(compact_option, column_family_handle, nullptr, nullptr);
} else {
rocksdb::Slice begin(begin_key);
rocksdb::Slice end(end_key);
s = m_db_->CompactRange(compact_option, column_family_handle, &begin, &end);
}
if (!s.ok()) {
LOG_ERROR("CompactRange %s from rocksdb failed, error %s", begin_key.c_str(), s.ToString().c_str());
ret = OB_ERR_UNEXPECTED;
_LOG_WARN("COMPACT_RANGE [%s - %s] failed, reason: [%s]", begin_key.c_str(), end_key.c_str(), s.ToString().c_str());
} else {
// NOTICE invoke this interface lob data clean task interval
double time_cost = (get_timestamp() - start_ts)/1000.0;
_LOG_INFO("COMPACT_RANGE time_cost=%.3lfms start_key=%s end_key=%s", time_cost, begin_key.c_str(), end_key.c_str());
int64_t time_cost = get_timestamp() - start_ts;
_LOG_INFO("COMPACT_RANGE [%s - %s] time_cost=%s", begin_key.c_str(), end_key.c_str(), TVAL_TO_STR(time_cost));
}
}
return ret;
}
int RocksDbStoreService::flush(void *cf_handle)
{
int ret = OB_SUCCESS;
int64_t start_ts = get_timestamp();
rocksdb::ColumnFamilyHandle *column_family_handle = static_cast<rocksdb::ColumnFamilyHandle *>(cf_handle);
rocksdb::FlushOptions flush_option; // default wait=true, allow_wait_stall=false
if (OB_ISNULL(column_family_handle)) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("column_family_handle is NULL");
} else if (is_stopped()) {
ret = OB_IN_STOP_STATE;
} else {
rocksdb::Status s = m_db_->Flush(flush_option, column_family_handle);
if (! s.ok()) {
_LOG_WARN("ROCKSDB FLUSH failed, reason: [%s]", s.ToString().c_str());
} else {
int64_t time_cost = get_timestamp() - start_ts;
_LOG_INFO("ROCKSDB FLUSH SUCC, time_cost=%s", TVAL_TO_STR(time_cost));
}
}
return ret;
}
int RocksDbStoreService::create_column_family(const std::string& column_family_name,
void *&cf_handle)
{
@ -403,6 +463,7 @@ int RocksDbStoreService::create_column_family(const std::string& column_family_n
cf_options.max_write_buffer_number = 9;
// Column Family's default memtable size is 64M, when the maximum limit is exceeded, memtable -> immutable memtable, increase write_buffer_size, can reduce write amplification
cf_options.write_buffer_size = rocksdb_write_buffer_size << 20;
cf_options.level_compaction_dynamic_level_bytes = true;
// config rocksdb compression
// supported compress algorithms will print in LOG file
// cf_options.compression = rocksdb::CompressionType::kLZ4Compression;
@ -435,8 +496,6 @@ int RocksDbStoreService::drop_column_family(void *cf_handle)
if (OB_ISNULL(column_family_handle)) {
LOG_ERROR("column_family_handle is NULL");
ret = OB_INVALID_ARGUMENT;
} else if (is_stopped()) {
ret = OB_IN_STOP_STATE;
} else {
rocksdb::Status status = m_db_->DropColumnFamily(column_family_handle);
@ -459,8 +518,6 @@ int RocksDbStoreService::destory_column_family(void *cf_handle)
if (OB_ISNULL(column_family_handle)) {
LOG_ERROR("column_family_handle is NULL");
ret = OB_INVALID_ARGUMENT;
} else if (is_stopped()) {
ret = OB_IN_STOP_STATE;
} else {
rocksdb::Status status = m_db_->DestroyColumnFamilyHandle(column_family_handle);

View File

@ -49,7 +49,12 @@ public:
virtual int del(const std::string &key);
virtual int del(void *cf_handle, const std::string &key);
virtual int del_range(void *cf_handle, const std::string &begin_key, const std::string &end_key);
virtual int compact_range(void *cf_handle, const std::string &begin_key, const std::string &end_key);
virtual int compact_range(
void *cf_handle,
const std::string &begin_key,
const std::string &end_key,
const bool op_entire_cf = false);
virtual int flush(void *cf_handle);
virtual int create_column_family(const std::string& column_family_name,
void *&cf_handle);

View File

@ -274,7 +274,7 @@ int ObLogStorager::handle_task_(IObLogBatchBufTask &batch_task,
LOG_ERROR("get_tenant_guard fail", KR(ret), K(tenant_id));
} else {
tenant = guard.get_tenant();
void *column_family_handle = tenant->get_cf();
void *column_family_handle = tenant->get_redo_storage_cf_handle();
if (OB_FAIL(store_key.get_key(key))) {
LOG_ERROR("store_key get_key fail", KR(ret));

View File

@ -62,7 +62,8 @@ public:
virtual int del(const std::string &key) = 0;
virtual int del(void *cf_handle, const std::string &key) = 0;
virtual int del_range(void *cf_handle, const std::string &begin_key, const std::string &end_key) = 0;
virtual int compact_range(void *cf_handle, const std::string &begin_key, const std::string &end_key) = 0;
virtual int compact_range(void *cf_handle, const std::string &begin_key, const std::string &end_key, const bool op_entire_cf = false) = 0;
virtual int flush(void *cf_handle) = 0;
virtual int create_column_family(const std::string& column_family_name,
void *&cf_handle) = 0;

View File

@ -25,6 +25,7 @@
#include "ob_log_timezone_info_getter.h" // ObCDCTimeZoneInfoGetter
#include "ob_log_start_schema_matcher.h" // ObLogStartSchemaMatcher
#include "ob_log_store_service.h" // IObLogStoreService
#define STAT(level, tag_str, args...) OBLOG_LOG(level, "[STAT] [TENANT] " tag_str, ##args)
#define ISTAT(tag_str, args...) STAT(INFO, tag_str, ##args)
@ -54,7 +55,7 @@ ObLogTenant::ObLogTenant() :
committer_global_heartbeat_(OB_INVALID_VERSION),
committer_cur_schema_version_(OB_INVALID_VERSION),
committer_next_trans_schema_version_(OB_INVALID_VERSION),
cf_handle_(NULL),
redo_cf_handle_(NULL),
lob_storage_cf_handle_(nullptr),
lob_storage_clean_task_()
{
@ -74,7 +75,7 @@ int ObLogTenant::init(
const int64_t start_tstamp_ns,
const int64_t start_seq,
const int64_t start_schema_version,
void *cf_handle,
void *redo_cf_handle,
void *lob_storage_cf_handle,
ObLogTenantMgr &tenant_mgr)
{
@ -89,9 +90,9 @@ int ObLogTenant::init(
|| OB_UNLIKELY(start_seq < 0)
|| OB_UNLIKELY(start_schema_version <= 0)
|| OB_ISNULL(tenant_name)
|| OB_ISNULL(cf_handle)) {
|| OB_ISNULL(redo_cf_handle)) {
LOG_ERROR("invalid argument", K(tenant_id), K(tenant_name), K(start_tstamp_ns), K(start_seq),
K(start_schema_version), K(cf_handle));
K(start_schema_version), K(redo_cf_handle));
ret = OB_INVALID_ARGUMENT;
} else if (OB_ISNULL(task_queue_ = OB_NEW(ObLogTenantTaskQueue, ObModIds::OB_LOG_TENANT_TASK_QUEUE, *this))) {
LOG_ERROR("create task queue fail", K(task_queue_));
@ -136,7 +137,7 @@ int ObLogTenant::init(
committer_global_heartbeat_ = OB_INVALID_VERSION;
committer_cur_schema_version_ = start_schema_version;
committer_next_trans_schema_version_ = start_schema_version;
cf_handle_ = cf_handle;
redo_cf_handle_ = redo_cf_handle;
lob_storage_cf_handle_ = lob_storage_cf_handle;
lob_storage_clean_task_.tenant_id_ = tenant_id;
@ -162,6 +163,8 @@ int ObLogTenant::init_all_ddl_operation_table_schema_info_()
void ObLogTenant::reset()
{
int ret = OB_SUCCESS;
if (inited_) {
LOG_INFO("destroy tenant", K_(tenant_id), K_(tenant_name), K_(start_schema_version));
}
@ -195,9 +198,28 @@ void ObLogTenant::reset()
committer_global_heartbeat_ = OB_INVALID_VERSION;
committer_cur_schema_version_ = OB_INVALID_VERSION;
committer_next_trans_schema_version_ = OB_INVALID_VERSION;
cf_handle_ = NULL;
lob_storage_cf_handle_ = nullptr;
lob_storage_clean_task_.reset();
IObStoreService *store_service = TCTX.store_service_;
if (OB_NOT_NULL(store_service)) {
LOG_INFO("prepare drop tenant column family", K(tenant_id));
if (OB_NOT_NULL(lob_storage_cf_handle_) && OB_FAIL(store_service->drop_column_family(lob_storage_cf_handle_))) {
LOG_WARN("drop_lob_column_family failed", K(tenant_id));
}
if (OB_NOT_NULL(redo_cf_handle_) && OB_FAIL(store_service->drop_column_family(redo_cf_handle_))) {
LOG_WARN("drop_redo_column_family failed", K(tenant_id));
}
LOG_INFO("prepare destroy tenant column family", K(tenant_id));
if (OB_NOT_NULL(lob_storage_cf_handle_) && OB_FAIL(store_service->destory_column_family(lob_storage_cf_handle_))) {
LOG_WARN("destroy_lob_column_family failed", K(tenant_id));
}
if (OB_NOT_NULL(redo_cf_handle_) && OB_FAIL(store_service->destory_column_family(redo_cf_handle_))) {
LOG_WARN("destroy_redo_column_family failed", K(tenant_id));
}
redo_cf_handle_ = nullptr;
lob_storage_cf_handle_ = nullptr;
LOG_INFO("handle tenant column family done", K(tenant_id));
}
ObMallocAllocator::get_instance()->recycle_tenant_allocator(tenant_id);
}
@ -870,6 +892,49 @@ int ObLogTenant::update_data_start_schema_version_on_split_mode()
return ret;
}
void ObLogTenant::flush_storage()
{
int ret = OB_SUCCESS;
IObStoreService *store_service = TCTX.store_service_;
if (OB_ISNULL(store_service)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("store_service should be valid", KR(ret), K_(tenant_id));
} else {
if (OB_FAIL(store_service->flush(lob_storage_cf_handle_))) {
LOG_WARN("flush tenant lob_column_family failed", KR(ret), K_(tenant_id));
} else {
LOG_INFO("flush tenant lob column_family succ", K_(tenant_id));
}
if (OB_FAIL(store_service->flush(redo_cf_handle_))) {
LOG_WARN("flush tenant redo_column_family failed", KR(ret), K_(tenant_id));
} else {
LOG_INFO("flush tenant redo column_family succ", K_(tenant_id));
}
}
}
// DEL_RANGE and COMPACT_RANGE of lob data will invoke by resource_collector and implied in lob_aux_storager,
// here won't compact lob_column_family
void ObLogTenant::compact_storage()
{
int ret = OB_SUCCESS;
IObStoreService *store_service = TCTX.store_service_;
const bool compact_entire_column_family = true;
if (OB_ISNULL(store_service)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("store_service should be valid", KR(ret), K_(tenant_id));
} else {
if (OB_FAIL(store_service->compact_range(redo_cf_handle_, std::string(), std::string(), compact_entire_column_family))) {
LOG_WARN("compact tenant redo column_family failed", KR(ret), K_(tenant_id));
} else {
LOG_INFO("compact tenant redo column_family succ", K_(tenant_id));
}
}
}
} // namespace libobcdc
} // namespace oceanbase

View File

@ -132,7 +132,7 @@ public:
IObLogPartMgr &get_part_mgr() { return part_mgr_; }
int64_t get_global_schema_version() const { return global_seq_and_schema_version_.hi; }
int64_t get_global_seq() const { return global_seq_and_schema_version_.lo; }
void *get_cf() { return cf_handle_; }
void *get_redo_storage_cf_handle() { return redo_cf_handle_; }
void *get_lob_storage_cf_handle() { return lob_storage_cf_handle_; }
ObCDCLobAuxDataCleanTask& get_lob_storage_clean_task() { return lob_storage_clean_task_; }
@ -239,6 +239,13 @@ public:
return part_mgr_.get_table_info_of_tablet_id(tablet_id, table_info);
}
// flush memory data in local storage(e.g. memtable for rocksdb)
void flush_storage();
// compact data in local storage
// NOTE: LOB data will compact by resource_collector with lob_aux_meta_storager
void compact_storage();
public:
enum
{
@ -327,7 +334,7 @@ private:
// Transaction data and DDL data need to be matched for consumption, where the global_schema_version of the current transaction is recorded, which is used by the DDL to determine if it needs to be consumed.
int64_t committer_next_trans_schema_version_ CACHE_ALIGNED;
void *cf_handle_;
void *redo_cf_handle_;
void *lob_storage_cf_handle_;
ObCDCLobAuxDataCleanTask lob_storage_clean_task_;

View File

@ -998,7 +998,7 @@ int ObLogTenantMgr::remove_tenant_(const uint64_t tenant_id, ObLogTenant *tenant
} else if (OB_ISNULL(tenant)) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("tenant is NULL", KR(ret), K(tenant_id), K(tenant));
} else if (OB_ISNULL(cf = tenant->get_cf())) {
} else if (OB_ISNULL(cf = tenant->get_redo_storage_cf_handle())) {
ret= OB_ERR_UNEXPECTED;
LOG_ERROR("cf is NULL", KR(ret), K(tid), KPC(tenant));
} else if (OB_FAIL(store_service->drop_column_family(cf))) {
@ -1434,7 +1434,7 @@ bool ObLogTenantMgr::TenantPrinter::operator()(const TenantID &tid, ObLogTenant
serving_tenant_count_++;
}
(void)tenant_ids_.push_back(tid.tenant_id_);
(void)cf_handles_.push_back(tenant->get_cf());
(void)cf_handles_.push_back(tenant->get_redo_storage_cf_handle());
(void)lob_storage_cf_handles_.push_back(tenant->get_lob_storage_cf_handle());
}
return true;
@ -1458,6 +1458,42 @@ void ObLogTenantMgr::print_stat_info()
}
}
bool ObLogTenantMgr::TenantRedoStorageOperator::operator()(const TenantID &tid, ObLogTenant *tenant)
{
if (OB_NOT_NULL(tenant)) {
if (TenantStorageOp::FLUSH == op_) {
tenant->flush_storage();
} else if (TenantStorageOp::COMPACT == op_) {
tenant->compact_storage();
} else {
LOG_INFO("UNKNOWN_TENANT_STORAGE_OPERATION, IGNORE", K(tid), K_(op));
}
}
return true;
}
void ObLogTenantMgr::flush_storaged_redo()
{
IObStoreService *store_service = TCTX.store_service_;
if (OB_NOT_NULL(store_service)) {
TenantStorageOp op = TenantStorageOp::FLUSH;
TenantRedoStorageOperator redo_compactor(*store_service, op);
(void)tenant_hash_map_.for_each(redo_compactor);
}
}
void ObLogTenantMgr::compact_storaged_redo()
{
IObStoreService *store_service = TCTX.store_service_;
if (OB_NOT_NULL(store_service)) {
TenantStorageOp op = TenantStorageOp::COMPACT;
TenantRedoStorageOperator redo_compactor(*store_service, op);
(void)tenant_hash_map_.for_each(redo_compactor);
}
}
int ObLogTenantMgr::recycle_ls(const logservice::TenantLSID &tls_id)
{
int ret = OB_SUCCESS;

View File

@ -139,6 +139,9 @@ public:
palf::LSN &sys_ls_min_handle_log_lsn) = 0;
virtual void print_stat_info() = 0;
// compact redo storaged in storage in case of occupy too much resource
virtual void flush_storaged_redo() = 0;
virtual void compact_storaged_redo() = 0;
virtual int register_ls_add_callback(LSAddCallback *callback) = 0;
virtual int register_ls_recycle_callback(LSRecycleCallback *callback) = 0;
@ -229,6 +232,8 @@ public:
palf::LSN &sys_ls_min_handle_log_lsn);
virtual bool is_inited() { return inited_; }
void print_stat_info();
void flush_storaged_redo();
void compact_storaged_redo();
template <typename Func> int for_each_tenant(Func &func)
{
@ -338,6 +343,16 @@ private:
bool operator()(const TenantID &tid, ObLogTenant *tenant);
};
enum TenantStorageOp {FLUSH = 0, COMPACT=1};
struct TenantRedoStorageOperator
{
IObStoreService &store_service_;
TenantStorageOp op_;
TenantRedoStorageOperator(IObStoreService &store_service, TenantStorageOp &op) : store_service_(store_service), op_(op) {}
bool operator()(const TenantID &tid, ObLogTenant *tenant);
};
struct SetDataStartSchemaVersionFunc
{
int64_t data_start_schema_version_;

View File

@ -190,7 +190,7 @@ void DmlRedoLogNode::reset()
ATOMIC_SET(&is_readed_, false);
row_head_ = NULL;
row_tail_ = NULL;
valid_row_num_ = 0;
ATOMIC_SET(&valid_row_num_, 0);
ATOMIC_SET(&is_parsed_, false);
ATOMIC_SET(&is_formatted_, false);
reserve_field_ = 0;