From 600495620c8526a7605a1170a1cf5ae0f87f7132 Mon Sep 17 00:00:00 2001 From: SanmuWangZJU Date: Tue, 6 Feb 2024 17:06:06 +0000 Subject: [PATCH] [CP] [OBCDC] Fix SST file not recycled after trans output --- .../libobcdc/src/ob_log_committer.cpp | 5 + src/logservice/libobcdc/src/ob_log_config.h | 2 + .../libobcdc/src/ob_log_instance.cpp | 67 +++++++++++++ src/logservice/libobcdc/src/ob_log_instance.h | 9 +- .../libobcdc/src/ob_log_part_trans_task.h | 1 + src/logservice/libobcdc/src/ob_log_reader.cpp | 2 +- .../src/ob_log_resource_collector.cpp | 14 ++- .../src/ob_log_rocksdb_store_service.cpp | 93 +++++++++++++++---- .../src/ob_log_rocksdb_store_service.h | 7 +- .../libobcdc/src/ob_log_storager.cpp | 2 +- .../libobcdc/src/ob_log_store_service.h | 3 +- src/logservice/libobcdc/src/ob_log_tenant.cpp | 79 ++++++++++++++-- src/logservice/libobcdc/src/ob_log_tenant.h | 11 ++- .../libobcdc/src/ob_log_tenant_mgr.cpp | 40 +++++++- .../libobcdc/src/ob_log_tenant_mgr.h | 15 +++ .../libobcdc/src/ob_log_trans_log.cpp | 2 +- 16 files changed, 311 insertions(+), 41 deletions(-) diff --git a/src/logservice/libobcdc/src/ob_log_committer.cpp b/src/logservice/libobcdc/src/ob_log_committer.cpp index 4c160520db..e042f5180e 100644 --- a/src/logservice/libobcdc/src/ob_log_committer.cpp +++ b/src/logservice/libobcdc/src/ob_log_committer.cpp @@ -1177,6 +1177,7 @@ int ObLogCommitter::after_trans_handled_(PartTransTask *participants) // Update Commit information // NOTE: Since the above guarantees that the reference count is greater than the number of Binlog Records, the list of participants here must be valid PartTransTask *part_trans_task = participants; + const bool is_ddl_trans = part_trans_task->is_ddl_trans(); while (OB_SUCC(ret) && ! 
stop_flag_ && OB_NOT_NULL(part_trans_task)) { PartTransTask *next = part_trans_task->next_task(); @@ -1188,6 +1189,10 @@ int ObLogCommitter::after_trans_handled_(PartTransTask *participants) // Decrement the reference count after the Commit message is updated // If the reference count is 0, the partition transaction is recycled else if (0 == part_trans_task->dec_ref_cnt()) { + if (is_ddl_trans && ! part_trans_task->is_sys_ls_part_trans()) { + // mark user_ls part_trans_task in ddl_trans not served, and will recycle redo of the part_trans_task in resource_collector. + part_trans_task->set_unserved(); + } if (OB_FAIL(resource_collector_->revert(part_trans_task))) { if (OB_IN_STOP_STATE != ret) { LOG_ERROR("revert PartTransTask fail", KR(ret), K(part_trans_task)); diff --git a/src/logservice/libobcdc/src/ob_log_config.h b/src/logservice/libobcdc/src/ob_log_config.h index bcb0eb27d1..569800899e 100644 --- a/src/logservice/libobcdc/src/ob_log_config.h +++ b/src/logservice/libobcdc/src/ob_log_config.h @@ -252,6 +252,8 @@ public: // the destination of archive log. 
DEF_STR(archive_dest, OB_CLUSTER_PARAMETER, "|", "the location of archive log"); T_DEF_INT_INFT(rocksdb_write_buffer_size, OB_CLUSTER_PARAMETER, 64, 16, "write buffer size[M]"); + DEF_TIME(rocksdb_flush_interval, OB_CLUSTER_PARAMETER, "10m", "[0s,1d]", "rocksdb flush interval for redo_storage, 0s means never"); + DEF_TIME(rocksdb_compact_interval, OB_CLUSTER_PARAMETER, "6h", "[0s,7d]", "rocksdb compact interval for redo_storage, 0s means never"); T_DEF_INT_INFT(io_thread_num, OB_CLUSTER_PARAMETER, 4, 1, "io thread number"); T_DEF_INT(idle_pool_thread_num, OB_CLUSTER_PARAMETER, 4, 1, 32, "idle pool thread num"); diff --git a/src/logservice/libobcdc/src/ob_log_instance.cpp b/src/logservice/libobcdc/src/ob_log_instance.cpp index 9d5b0f8bd9..f2128cda20 100644 --- a/src/logservice/libobcdc/src/ob_log_instance.cpp +++ b/src/logservice/libobcdc/src/ob_log_instance.cpp @@ -101,6 +101,8 @@ namespace libobcdc ObLogInstance *ObLogInstance::instance_ = NULL; static ObSimpleMemLimitGetter mem_limit_getter; +const int64_t ObLogInstance::DAEMON_THREAD_COUNT = 1; + ObLogInstance *ObLogInstance::get_instance() { if (NULL == instance_) { @@ -560,6 +562,8 @@ int ObLogInstance::init_common_(uint64_t start_tstamp_ns, ERROR_CALLBACK err_cb) LOG_ERROR("check config fail", KR(ret)); } else if (OB_FAIL(dump_config_())) { LOG_ERROR("dump_config_ fail", KR(ret)); + } else if (OB_FAIL(lib::ThreadPool::set_thread_count(DAEMON_THREAD_COUNT))) { + LOG_ERROR("set ObLogInstance daemon thread count failed", KR(ret), K(DAEMON_THREAD_COUNT)); } else if (OB_FAIL(trans_task_pool_alloc_.init( TASK_POOL_ALLOCATOR_TOTAL_LIMIT, TASK_POOL_ALLOCATOR_HOLD_LIMIT, @@ -1384,6 +1388,7 @@ void ObLogInstance::do_destroy_(const bool force_destroy) timer_tid_ = 0; sql_tid_ = 0; flow_control_tid_ = 0; + lib::ThreadPool::destroy(); (void)trans_task_pool_.destroy(); (void)trans_task_pool_alloc_.destroy(); @@ -1552,6 +1557,7 @@ void ObLogInstance::mark_stop_flag(const char *stop_reason) committer_->mark_stop_flag(); 
resource_collector_->mark_stop_flag(); timezone_info_getter_->mark_stop_flag(); + lib::ThreadPool::stop(); LOG_INFO("mark_stop_flag end", K(global_errno_), KCSTRING(stop_reason)); } @@ -2229,6 +2235,8 @@ int ObLogInstance::start_threads_() } else if (0 != (pthread_ret = pthread_create(&flow_control_tid_, NULL, flow_control_thread_func_, this))) { LOG_ERROR("start flow control thread fail", K(pthread_ret), KERRNOMSG(pthread_ret)); ret = OB_ERR_UNEXPECTED; + } else if (OB_FAIL(lib::ThreadPool::start())) { + LOG_ERROR("start daemon threads failed", KR(ret), K(DAEMON_THREAD_COUNT)); } else { LOG_INFO("start instance threads succ", K(timer_tid_), K(sql_tid_), K(flow_control_tid_)); } @@ -2273,6 +2281,65 @@ void ObLogInstance::wait_threads_stop_() flow_control_tid_ = 0; } + + LOG_INFO("wait daemon threads stop"); + lib::ThreadPool::wait(); + LOG_INFO("wait daemon threads stop done"); +} + +void ObLogInstance::run1() +{ + int ret = OB_SUCCESS; + const int64_t thread_idx = lib::ThreadPool::get_thread_idx(); + const int64_t thread_count = lib::ThreadPool::get_thread_count(); + + if (OB_UNLIKELY(thread_count != DAEMON_THREAD_COUNT || thread_idx >= thread_count)) { + ret = OB_ERR_UNEXPECTED; + LOG_ERROR("invalid thread count or thread idx", KR(ret), K(thread_idx), K(thread_count), K(DAEMON_THREAD_COUNT)); + } else if (0 == thread_idx) { + // handle storage_operation + lib::set_thread_name("CDC-BGD-STORAGE_OP"); + if (OB_FAIL(daemon_handle_storage_op_thd_())) { + LOG_ERROR("handle_storage_op in background failed", KR(ret), K(thread_idx), K(thread_count)); + } + } else { + ret = OB_ERR_UNEXPECTED; + LOG_ERROR("unexpect daemon thread", KR(ret), K(thread_count), K(thread_idx)); + } + + if (OB_SUCCESS != ret && OB_IN_STOP_STATE != ret) { + handle_error(ret, "obcdc daemon thread[idx=%ld] exits, err=%d", lib::ThreadPool::get_thread_idx(), ret); + mark_stop_flag("DAEMON THEAD EXIST"); + } +} + +int ObLogInstance::daemon_handle_storage_op_thd_() +{ + int ret = OB_SUCCESS; + const 
int64_t TIMER_INTERVAL = 5 * _SEC_; + + while (OB_SUCC(ret) && ! lib::ThreadPool::has_set_stop()) { + ob_usleep(TIMER_INTERVAL); + ObLogTraceIdGuard trace_guard; + + if (! is_memory_working_mode(working_mode_)) { + if (OB_ISNULL(tenant_mgr_)) { + ret = OB_ERR_UNEXPECTED; + LOG_ERROR("expect valid tenant_mgr", KR(ret)); + } else { + const int64_t redo_flush_interval = TCONF.rocksdb_flush_interval.get(); + const int64_t redo_compact_interval = TCONF.rocksdb_compact_interval.get(); + if (redo_flush_interval > 0 && REACH_TIME_INTERVAL(redo_flush_interval)) { + tenant_mgr_->flush_storaged_redo(); + } + if (redo_compact_interval > 0 && REACH_TIME_INTERVAL(redo_compact_interval)) { + tenant_mgr_->compact_storaged_redo(); + } + } + } + } + + return ret; } void ObLogInstance::reload_config_() diff --git a/src/logservice/libobcdc/src/ob_log_instance.h b/src/logservice/libobcdc/src/ob_log_instance.h index 1e31a363de..1017050f8f 100644 --- a/src/logservice/libobcdc/src/ob_log_instance.h +++ b/src/logservice/libobcdc/src/ob_log_instance.h @@ -92,7 +92,7 @@ public: typedef common::sqlclient::ObMySQLServerProvider ServerProviderType; -class ObLogInstance : public IObCDCInstance, public IObLogErrHandler +class ObLogInstance : public IObCDCInstance, public IObLogErrHandler, public lib::ThreadPool { public: virtual ~ObLogInstance(); @@ -218,6 +218,8 @@ private: static void *flow_control_thread_func_(void *args); int start_threads_(); void wait_threads_stop_(); + void run1() override; + int daemon_handle_storage_op_thd_(); void reload_config_(); void print_tenant_memory_usage_(); void global_flow_control_(); @@ -288,7 +290,10 @@ private: private: static ObLogInstance *instance_; - + // Threads that run with the instance + // thread count = 1, start from idx 0; + // thread 0: used to operate storage (manual flush and compact) + static const int64_t DAEMON_THREAD_COUNT; private: bool inited_; bool is_running_; diff --git a/src/logservice/libobcdc/src/ob_log_part_trans_task.h 
b/src/logservice/libobcdc/src/ob_log_part_trans_task.h index 6acb605343..70a14b06ca 100644 --- a/src/logservice/libobcdc/src/ob_log_part_trans_task.h +++ b/src/logservice/libobcdc/src/ob_log_part_trans_task.h @@ -1140,6 +1140,7 @@ public: bool &is_not_barrier, ObSchemaOperationType &op_type) const; ObIAllocator &get_log_entry_task_base_allocator() { return log_entry_task_base_allocator_; }; + void set_unserved() { set_unserved_(); } TO_STRING_KV( "state", serve_state_, diff --git a/src/logservice/libobcdc/src/ob_log_reader.cpp b/src/logservice/libobcdc/src/ob_log_reader.cpp index a30e9eecf0..aa88fc0774 100644 --- a/src/logservice/libobcdc/src/ob_log_reader.cpp +++ b/src/logservice/libobcdc/src/ob_log_reader.cpp @@ -220,7 +220,7 @@ int ObLogReader::handle_task_(ObLogEntryTask &log_entry_task, LOG_ERROR("get_tenant_guard fail", KR(ret)); } else { tenant = guard.get_tenant(); - column_family_handle = tenant->get_cf(); + column_family_handle = tenant->get_redo_storage_cf_handle(); } if (OB_FAIL(ret)) { diff --git a/src/logservice/libobcdc/src/ob_log_resource_collector.cpp b/src/logservice/libobcdc/src/ob_log_resource_collector.cpp index 6e480d61e7..e8b75459a4 100644 --- a/src/logservice/libobcdc/src/ob_log_resource_collector.cpp +++ b/src/logservice/libobcdc/src/ob_log_resource_collector.cpp @@ -299,7 +299,7 @@ int ObLogResourceCollector::revert_log_entry_task_(ObLogEntryTask *log_entry_tas const bool is_test_mode_on = TCONF.test_mode_on != 0; if (is_test_mode_on) { - LOG_INFO("LogEntryTask-free", "LogEntryTask", *log_entry_task, "addr", log_entry_task, K(data_len)); + LOG_INFO("LogEntryTask-free", "LogEntryTask", *log_entry_task, "addr", log_entry_task, K(data_len), K(is_log_entry_stored)); } if (is_log_entry_stored) { @@ -336,7 +336,7 @@ int ObLogResourceCollector::del_store_service_data_(const uint64_t tenant_id, LOG_ERROR("get_tenant_guard fail", KR(ret), K(tenant_id)); } else { tenant = guard.get_tenant(); - column_family_handle = tenant->get_cf(); + 
column_family_handle = tenant->get_redo_storage_cf_handle(); } if (OB_SUCC(ret) && ! RCThread::is_stoped()) { @@ -698,6 +698,7 @@ int ObLogResourceCollector::revert_dml_binlog_record_(ObLogBR &br, volatile bool return ret; } +// @deprecated: should not be used because redo_storage_key doesn't contain trans_id anymore int ObLogResourceCollector::del_trans_(const uint64_t tenant_id, const ObString &trans_id_str) { @@ -718,7 +719,7 @@ int ObLogResourceCollector::del_trans_(const uint64_t tenant_id, LOG_ERROR("get_tenant_guard fail", KR(ret), K(tenant_id)); } else { tenant = guard.get_tenant(); - column_family_handle = tenant->get_cf(); + column_family_handle = tenant->get_redo_storage_cf_handle(); } if (OB_SUCC(ret)) { @@ -748,10 +749,13 @@ int ObLogResourceCollector::dec_ref_cnt_and_try_to_recycle_log_entry_task_(ObLog LOG_ERROR("part_trans_task is NULL", KPC(log_entry_task)); ret = OB_ERR_UNEXPECTED; } else { + const int64_t row_ref_cnt = log_entry_task->dec_row_ref_cnt(); + const bool need_revert_log_entry_task = (row_ref_cnt == 0); + if (TCONF.test_mode_on) { - LOG_INFO("revert_dml_binlog_record", KP(&br), K(br), KP(log_entry_task), KPC(log_entry_task)); + // print each time a row is reverted + LOG_INFO("revert_dml_binlog_record", KP(&br), K(br), KP(log_entry_task), K(need_revert_log_entry_task), KPC(log_entry_task)); } - const bool need_revert_log_entry_task = (log_entry_task->dec_row_ref_cnt() == 0); if (need_revert_log_entry_task) { if (OB_FAIL(revert_log_entry_task_(log_entry_task))) { diff --git a/src/logservice/libobcdc/src/ob_log_rocksdb_store_service.cpp b/src/logservice/libobcdc/src/ob_log_rocksdb_store_service.cpp index 8bb7cb2e21..07e7a0b73d 100644 --- a/src/logservice/libobcdc/src/ob_log_rocksdb_store_service.cpp +++ b/src/logservice/libobcdc/src/ob_log_rocksdb_store_service.cpp @@ -23,6 +23,9 @@ #include "lib/oblog/ob_log_module.h" // LOG_* #include "lib/ob_errno.h" +#include "rocksdb/table_properties.h" +#include "rocksdb/utilities/table_properties_collectors.h" + 
namespace oceanbase { namespace libobcdc @@ -51,9 +54,12 @@ int RocksDbStoreService::init(const std::string &path) } else if (OB_FAIL(init_dir_(path.c_str()))) { LOG_ERROR("init_dir_ fail", K(ret)); } else { + const int total_threads = 32; + const int64_t sliding_window_size = 10000; + const int64_t deletion_trigger = 1000; + const double deletion_ratio = 0.1; m_db_path_ = path; m_options_.create_if_missing = true; - const int total_threads = 32; // By default, RocksDB uses only one background thread for flush and // compaction. Calling this function will set it up such that total of // `total_threads` is used. Good value for `total_threads` is the number of @@ -66,6 +72,21 @@ int RocksDbStoreService::init(const std::string &path) // 2G m_options_.db_write_buffer_size = 2 << 30; m_options_.max_open_files = 100; + // Creates a factory of a table property collector that marks a SST + // file as need-compaction when it observe at least "D" deletion + // entries in any "N" consecutive entries, or the ratio of tombstone + // entries >= deletion_ratio. + // + // @param sliding_window_size "N". Note that this number will be + // round up to the smallest multiple of 128 that is no less + // than the specified size. + // @param deletion_trigger "D". Note that even when "N" is changed, + // the specified number for "D" will not be changed. + // @param deletion_ratio, if <= 0 or > 1, disable triggering compaction + // based on deletion ratio. Disabled by default. 
+ m_options_.table_properties_collector_factories.emplace_back( + rocksdb::NewCompactOnDeletionCollectorFactory(sliding_window_size, deletion_trigger, deletion_ratio)); + m_options_.statistics = rocksdb::CreateDBStatistics(); rocksdb::Status s = rocksdb::DB::Open(m_options_, m_db_path_, &m_db_); if (!s.ok()) { @@ -290,7 +311,9 @@ int RocksDbStoreService::del(const std::string &key) ret = OB_IN_STOP_STATE; } else { // find column family handle for cf - rocksdb::Status s = m_db_->Delete(rocksdb::WriteOptions(), key); + rocksdb::WriteOptions writer_options; + writer_options.disableWAL = true; + rocksdb::Status s = m_db_->Delete(writer_options, key); if (!s.ok()) { ret = OB_IO_ERROR; _LOG_ERROR("delete %s from rocksdb failed, error %s", key.c_str(), s.ToString().c_str()); @@ -336,7 +359,9 @@ int RocksDbStoreService::del_range(void *cf_handle, const std::string &begin_key } else if (is_stopped()) { ret = OB_IN_STOP_STATE; } else { - rocksdb::Status s = m_db_->DeleteRange(rocksdb::WriteOptions(), column_family_handle, + rocksdb::WriteOptions writer_options; + writer_options.disableWAL = true; + rocksdb::Status s = m_db_->DeleteRange(writer_options, column_family_handle, begin_key, end_key); if (!s.ok()) { @@ -352,35 +377,70 @@ int RocksDbStoreService::del_range(void *cf_handle, const std::string &begin_key return ret; } -int RocksDbStoreService::compact_range(void *cf_handle, const std::string &begin_key, const std::string &end_key) +int RocksDbStoreService::compact_range( + void *cf_handle, + const std::string &begin_key, + const std::string &end_key, + const bool op_entire_cf) { int ret = OB_SUCCESS; int64_t start_ts = get_timestamp(); rocksdb::ColumnFamilyHandle *column_family_handle = static_cast(cf_handle); + rocksdb::CompactRangeOptions compact_option; + compact_option.change_level = true; if (OB_ISNULL(column_family_handle)) { - LOG_ERROR("column_family_handle is NULL"); ret = OB_ERR_UNEXPECTED; + LOG_ERROR("column_family_handle is NULL"); } else if (is_stopped()) 
{ ret = OB_IN_STOP_STATE; } else { - rocksdb::Slice begin(begin_key); - rocksdb::Slice end(end_key); - rocksdb::Status s = m_db_->CompactRange(rocksdb::CompactRangeOptions(), column_family_handle, - &begin, &end); + rocksdb::Status s; + if (op_entire_cf) { + // compact from nullptr to nullptr means compact all data in the column_family + s = m_db_->CompactRange(compact_option, column_family_handle, nullptr, nullptr); + } else { + rocksdb::Slice begin(begin_key); + rocksdb::Slice end(end_key); + s = m_db_->CompactRange(compact_option, column_family_handle, &begin, &end); + } if (!s.ok()) { - LOG_ERROR("CompactRange %s from rocksdb failed, error %s", begin_key.c_str(), s.ToString().c_str()); - ret = OB_ERR_UNEXPECTED; + _LOG_WARN("COMPACT_RANGE [%s - %s] failed, reason: [%s]", begin_key.c_str(), end_key.c_str(), s.ToString().c_str()); } else { - // NOTICE invoke this interface lob data clean task interval - double time_cost = (get_timestamp() - start_ts)/1000.0; - _LOG_INFO("COMPACT_RANGE time_cost=%.3lfms start_key=%s end_key=%s", time_cost, begin_key.c_str(), end_key.c_str()); + int64_t time_cost = get_timestamp() - start_ts; + _LOG_INFO("COMPACT_RANGE [%s - %s] time_cost=%s", begin_key.c_str(), end_key.c_str(), TVAL_TO_STR(time_cost)); } } return ret; } +int RocksDbStoreService::flush(void *cf_handle) +{ + int ret = OB_SUCCESS; + int64_t start_ts = get_timestamp(); + rocksdb::ColumnFamilyHandle *column_family_handle = static_cast(cf_handle); + rocksdb::FlushOptions flush_option; // default wait=true, allow_wait_stall=false + + if (OB_ISNULL(column_family_handle)) { + ret = OB_ERR_UNEXPECTED; + LOG_ERROR("column_family_handle is NULL"); + } else if (is_stopped()) { + ret = OB_IN_STOP_STATE; + } else { + rocksdb::Status s = m_db_->Flush(flush_option, column_family_handle); + + if (! 
s.ok()) { + _LOG_WARN("ROCKSDB FLUSH failed, reason: [%s]", s.ToString().c_str()); + } else { + int64_t time_cost = get_timestamp() - start_ts; + _LOG_INFO("ROCKSDB FLUSH SUCC, time_cost=%s", TVAL_TO_STR(time_cost)); + } + } + + return ret; +} + int RocksDbStoreService::create_column_family(const std::string& column_family_name, void *&cf_handle) { @@ -403,6 +463,7 @@ int RocksDbStoreService::create_column_family(const std::string& column_family_n cf_options.max_write_buffer_number = 9; // Column Family's default memtable size is 64M, when the maximum limit is exceeded, memtable -> immutable memtable, increase write_buffer_size, can reduce write amplification cf_options.write_buffer_size = rocksdb_write_buffer_size << 20; + cf_options.level_compaction_dynamic_level_bytes = true; // config rocksdb compression // supported compress algorithms will print in LOG file // cf_options.compression = rocksdb::CompressionType::kLZ4Compression; @@ -435,8 +496,6 @@ int RocksDbStoreService::drop_column_family(void *cf_handle) if (OB_ISNULL(column_family_handle)) { LOG_ERROR("column_family_handle is NULL"); ret = OB_INVALID_ARGUMENT; - } else if (is_stopped()) { - ret = OB_IN_STOP_STATE; } else { rocksdb::Status status = m_db_->DropColumnFamily(column_family_handle); @@ -459,8 +518,6 @@ int RocksDbStoreService::destory_column_family(void *cf_handle) if (OB_ISNULL(column_family_handle)) { LOG_ERROR("column_family_handle is NULL"); ret = OB_INVALID_ARGUMENT; - } else if (is_stopped()) { - ret = OB_IN_STOP_STATE; } else { rocksdb::Status status = m_db_->DestroyColumnFamilyHandle(column_family_handle); diff --git a/src/logservice/libobcdc/src/ob_log_rocksdb_store_service.h b/src/logservice/libobcdc/src/ob_log_rocksdb_store_service.h index fcb3ca6a35..49c2460f04 100644 --- a/src/logservice/libobcdc/src/ob_log_rocksdb_store_service.h +++ b/src/logservice/libobcdc/src/ob_log_rocksdb_store_service.h @@ -49,7 +49,12 @@ public: virtual int del(const std::string &key); virtual int del(void 
*cf_handle, const std::string &key); virtual int del_range(void *cf_handle, const std::string &begin_key, const std::string &end_key); - virtual int compact_range(void *cf_handle, const std::string &begin_key, const std::string &end_key); + virtual int compact_range( + void *cf_handle, + const std::string &begin_key, + const std::string &end_key, + const bool op_entire_cf = false); + virtual int flush(void *cf_handle); virtual int create_column_family(const std::string& column_family_name, void *&cf_handle); diff --git a/src/logservice/libobcdc/src/ob_log_storager.cpp b/src/logservice/libobcdc/src/ob_log_storager.cpp index 2ec8b425cb..e4ffe110f4 100644 --- a/src/logservice/libobcdc/src/ob_log_storager.cpp +++ b/src/logservice/libobcdc/src/ob_log_storager.cpp @@ -274,7 +274,7 @@ int ObLogStorager::handle_task_(IObLogBatchBufTask &batch_task, LOG_ERROR("get_tenant_guard fail", KR(ret), K(tenant_id)); } else { tenant = guard.get_tenant(); - void *column_family_handle = tenant->get_cf(); + void *column_family_handle = tenant->get_redo_storage_cf_handle(); if (OB_FAIL(store_key.get_key(key))) { LOG_ERROR("store_key get_key fail", KR(ret)); diff --git a/src/logservice/libobcdc/src/ob_log_store_service.h b/src/logservice/libobcdc/src/ob_log_store_service.h index 47ede0690f..0f37c064ff 100644 --- a/src/logservice/libobcdc/src/ob_log_store_service.h +++ b/src/logservice/libobcdc/src/ob_log_store_service.h @@ -62,7 +62,8 @@ public: virtual int del(const std::string &key) = 0; virtual int del(void *cf_handle, const std::string &key) = 0; virtual int del_range(void *cf_handle, const std::string &begin_key, const std::string &end_key) = 0; - virtual int compact_range(void *cf_handle, const std::string &begin_key, const std::string &end_key) = 0; + virtual int compact_range(void *cf_handle, const std::string &begin_key, const std::string &end_key, const bool op_entire_cf = false) = 0; + virtual int flush(void *cf_handle) = 0; virtual int create_column_family(const std::string& 
column_family_name, void *&cf_handle) = 0; diff --git a/src/logservice/libobcdc/src/ob_log_tenant.cpp b/src/logservice/libobcdc/src/ob_log_tenant.cpp index 60017ec52c..adcfa2540d 100644 --- a/src/logservice/libobcdc/src/ob_log_tenant.cpp +++ b/src/logservice/libobcdc/src/ob_log_tenant.cpp @@ -25,6 +25,7 @@ #include "ob_log_timezone_info_getter.h" // ObCDCTimeZoneInfoGetter #include "ob_log_start_schema_matcher.h" // ObLogStartSchemaMatcher +#include "ob_log_store_service.h" // IObLogStoreService #define STAT(level, tag_str, args...) OBLOG_LOG(level, "[STAT] [TENANT] " tag_str, ##args) #define ISTAT(tag_str, args...) STAT(INFO, tag_str, ##args) @@ -54,7 +55,7 @@ ObLogTenant::ObLogTenant() : committer_global_heartbeat_(OB_INVALID_VERSION), committer_cur_schema_version_(OB_INVALID_VERSION), committer_next_trans_schema_version_(OB_INVALID_VERSION), - cf_handle_(NULL), + redo_cf_handle_(NULL), lob_storage_cf_handle_(nullptr), lob_storage_clean_task_() { @@ -74,7 +75,7 @@ int ObLogTenant::init( const int64_t start_tstamp_ns, const int64_t start_seq, const int64_t start_schema_version, - void *cf_handle, + void *redo_cf_handle, void *lob_storage_cf_handle, ObLogTenantMgr &tenant_mgr) { @@ -89,9 +90,9 @@ int ObLogTenant::init( || OB_UNLIKELY(start_seq < 0) || OB_UNLIKELY(start_schema_version <= 0) || OB_ISNULL(tenant_name) - || OB_ISNULL(cf_handle)) { + || OB_ISNULL(redo_cf_handle)) { LOG_ERROR("invalid argument", K(tenant_id), K(tenant_name), K(start_tstamp_ns), K(start_seq), - K(start_schema_version), K(cf_handle)); + K(start_schema_version), K(redo_cf_handle)); ret = OB_INVALID_ARGUMENT; } else if (OB_ISNULL(task_queue_ = OB_NEW(ObLogTenantTaskQueue, ObModIds::OB_LOG_TENANT_TASK_QUEUE, *this))) { LOG_ERROR("create task queue fail", K(task_queue_)); @@ -136,7 +137,7 @@ int ObLogTenant::init( committer_global_heartbeat_ = OB_INVALID_VERSION; committer_cur_schema_version_ = start_schema_version; committer_next_trans_schema_version_ = start_schema_version; - cf_handle_ = 
cf_handle; + redo_cf_handle_ = redo_cf_handle; lob_storage_cf_handle_ = lob_storage_cf_handle; lob_storage_clean_task_.tenant_id_ = tenant_id; @@ -162,6 +163,8 @@ int ObLogTenant::init_all_ddl_operation_table_schema_info_() void ObLogTenant::reset() { + int ret = OB_SUCCESS; + if (inited_) { LOG_INFO("destroy tenant", K_(tenant_id), K_(tenant_name), K_(start_schema_version)); } @@ -195,9 +198,28 @@ void ObLogTenant::reset() committer_global_heartbeat_ = OB_INVALID_VERSION; committer_cur_schema_version_ = OB_INVALID_VERSION; committer_next_trans_schema_version_ = OB_INVALID_VERSION; - cf_handle_ = NULL; - lob_storage_cf_handle_ = nullptr; lob_storage_clean_task_.reset(); + IObStoreService *store_service = TCTX.store_service_; + if (OB_NOT_NULL(store_service)) { + LOG_INFO("prepare drop tenant column family", K(tenant_id)); + if (OB_NOT_NULL(lob_storage_cf_handle_) && OB_FAIL(store_service->drop_column_family(lob_storage_cf_handle_))) { + LOG_WARN("drop_lob_column_family failed", K(tenant_id)); + } + if (OB_NOT_NULL(redo_cf_handle_) && OB_FAIL(store_service->drop_column_family(redo_cf_handle_))) { + LOG_WARN("drop_redo_column_family failed", K(tenant_id)); + } + LOG_INFO("prepare destroy tenant column family", K(tenant_id)); + if (OB_NOT_NULL(lob_storage_cf_handle_) && OB_FAIL(store_service->destory_column_family(lob_storage_cf_handle_))) { + LOG_WARN("destroy_lob_column_family failed", K(tenant_id)); + } + if (OB_NOT_NULL(redo_cf_handle_) && OB_FAIL(store_service->destory_column_family(redo_cf_handle_))) { + LOG_WARN("destroy_redo_column_family failed", K(tenant_id)); + } + redo_cf_handle_ = nullptr; + lob_storage_cf_handle_ = nullptr; + LOG_INFO("handle tenant column family done", K(tenant_id)); + } + ObMallocAllocator::get_instance()->recycle_tenant_allocator(tenant_id); } @@ -870,6 +892,49 @@ int ObLogTenant::update_data_start_schema_version_on_split_mode() return ret; } +void ObLogTenant::flush_storage() +{ + int ret = OB_SUCCESS; + IObStoreService 
*store_service = TCTX.store_service_; + + if (OB_ISNULL(store_service)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("store_service should be valid", KR(ret), K_(tenant_id)); + } else { + if (OB_FAIL(store_service->flush(lob_storage_cf_handle_))) { + LOG_WARN("flush tenant lob_column_family failed", KR(ret), K_(tenant_id)); + } else { + LOG_INFO("flush tenant lob column_family succ", K_(tenant_id)); + } + + if (OB_FAIL(store_service->flush(redo_cf_handle_))) { + LOG_WARN("flush tenant redo_column_family failed", KR(ret), K_(tenant_id)); + } else { + LOG_INFO("flush tenant redo column_family succ", K_(tenant_id)); + } + } +} + +// DEL_RANGE and COMPACT_RANGE of lob data will invoke by resource_collector and implied in lob_aux_storager, +// here won't compact lob_column_family +void ObLogTenant::compact_storage() +{ + int ret = OB_SUCCESS; + IObStoreService *store_service = TCTX.store_service_; + const bool compact_entire_column_family = true; + + if (OB_ISNULL(store_service)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("store_service should be valid", KR(ret), K_(tenant_id)); + } else { + if (OB_FAIL(store_service->compact_range(redo_cf_handle_, std::string(), std::string(), compact_entire_column_family))) { + LOG_WARN("compact tenant redo column_family failed", KR(ret), K_(tenant_id)); + } else { + LOG_INFO("compact tenant redo column_family succ", K_(tenant_id)); + } + } +} + } // namespace libobcdc } // namespace oceanbase diff --git a/src/logservice/libobcdc/src/ob_log_tenant.h b/src/logservice/libobcdc/src/ob_log_tenant.h index 896f692f97..ad69a72cb8 100644 --- a/src/logservice/libobcdc/src/ob_log_tenant.h +++ b/src/logservice/libobcdc/src/ob_log_tenant.h @@ -132,7 +132,7 @@ public: IObLogPartMgr &get_part_mgr() { return part_mgr_; } int64_t get_global_schema_version() const { return global_seq_and_schema_version_.hi; } int64_t get_global_seq() const { return global_seq_and_schema_version_.lo; } - void *get_cf() { return cf_handle_; } + void 
*get_redo_storage_cf_handle() { return redo_cf_handle_; } void *get_lob_storage_cf_handle() { return lob_storage_cf_handle_; } ObCDCLobAuxDataCleanTask& get_lob_storage_clean_task() { return lob_storage_clean_task_; } @@ -239,6 +239,13 @@ public: return part_mgr_.get_table_info_of_tablet_id(tablet_id, table_info); } + // flush in-memory data of local storage (e.g. memtable for rocksdb) + void flush_storage(); + + // compact data in local storage + // NOTE: LOB data will be compacted by resource_collector with lob_aux_meta_storager + void compact_storage(); + public: enum { @@ -327,7 +334,7 @@ private: // Transaction data and DDL data need to be matched for consumption, where the global_schema_version of the current transaction is recorded, which is used by the DDL to determine if it needs to be consumed. int64_t committer_next_trans_schema_version_ CACHE_ALIGNED; - void *cf_handle_; + void *redo_cf_handle_; void *lob_storage_cf_handle_; ObCDCLobAuxDataCleanTask lob_storage_clean_task_; diff --git a/src/logservice/libobcdc/src/ob_log_tenant_mgr.cpp b/src/logservice/libobcdc/src/ob_log_tenant_mgr.cpp index 9c53887a16..7af240ad67 100644 --- a/src/logservice/libobcdc/src/ob_log_tenant_mgr.cpp +++ b/src/logservice/libobcdc/src/ob_log_tenant_mgr.cpp @@ -998,7 +998,7 @@ int ObLogTenantMgr::remove_tenant_(const uint64_t tenant_id, ObLogTenant *tenant } else if (OB_ISNULL(tenant)) { ret = OB_ERR_UNEXPECTED; LOG_ERROR("tenant is NULL", KR(ret), K(tenant_id), K(tenant)); - } else if (OB_ISNULL(cf = tenant->get_cf())) { + } else if (OB_ISNULL(cf = tenant->get_redo_storage_cf_handle())) { ret= OB_ERR_UNEXPECTED; LOG_ERROR("cf is NULL", KR(ret), K(tid), KPC(tenant)); } else if (OB_FAIL(store_service->drop_column_family(cf))) { @@ -1434,7 +1434,7 @@ bool ObLogTenantMgr::TenantPrinter::operator()(const TenantID &tid, ObLogTenant serving_tenant_count_++; } (void)tenant_ids_.push_back(tid.tenant_id_); - (void)cf_handles_.push_back(tenant->get_cf()); + 
(void)cf_handles_.push_back(tenant->get_redo_storage_cf_handle()); (void)lob_storage_cf_handles_.push_back(tenant->get_lob_storage_cf_handle()); } return true; @@ -1458,6 +1458,42 @@ void ObLogTenantMgr::print_stat_info() } } +bool ObLogTenantMgr::TenantRedoStorageOperator::operator()(const TenantID &tid, ObLogTenant *tenant) +{ + if (OB_NOT_NULL(tenant)) { + if (TenantStorageOp::FLUSH == op_) { + tenant->flush_storage(); + } else if (TenantStorageOp::COMPACT == op_) { + tenant->compact_storage(); + } else { + LOG_INFO("UNKNOWN_TENANT_STORAGE_OPERATION, IGNORE", K(tid), K_(op)); + } + } + + return true; +} + +void ObLogTenantMgr::flush_storaged_redo() +{ + IObStoreService *store_service = TCTX.store_service_; + + if (OB_NOT_NULL(store_service)) { + TenantStorageOp op = TenantStorageOp::FLUSH; + TenantRedoStorageOperator redo_compactor(*store_service, op); + (void)tenant_hash_map_.for_each(redo_compactor); + } +} +void ObLogTenantMgr::compact_storaged_redo() +{ + IObStoreService *store_service = TCTX.store_service_; + + if (OB_NOT_NULL(store_service)) { + TenantStorageOp op = TenantStorageOp::COMPACT; + TenantRedoStorageOperator redo_compactor(*store_service, op); + (void)tenant_hash_map_.for_each(redo_compactor); + } +} + int ObLogTenantMgr::recycle_ls(const logservice::TenantLSID &tls_id) { int ret = OB_SUCCESS; diff --git a/src/logservice/libobcdc/src/ob_log_tenant_mgr.h b/src/logservice/libobcdc/src/ob_log_tenant_mgr.h index b93fc52ae5..2729caf6c3 100644 --- a/src/logservice/libobcdc/src/ob_log_tenant_mgr.h +++ b/src/logservice/libobcdc/src/ob_log_tenant_mgr.h @@ -139,6 +139,9 @@ public: palf::LSN &sys_ls_min_handle_log_lsn) = 0; virtual void print_stat_info() = 0; + // flush/compact redo kept in local storage so that it does not occupy too many resources + virtual void flush_storaged_redo() = 0; + virtual void compact_storaged_redo() = 0; virtual int register_ls_add_callback(LSAddCallback *callback) = 0; virtual int register_ls_recycle_callback(LSRecycleCallback *callback) = 0; 
@@ -229,6 +232,8 @@ public: palf::LSN &sys_ls_min_handle_log_lsn); virtual bool is_inited() { return inited_; } void print_stat_info(); + void flush_storaged_redo(); + void compact_storaged_redo(); template int for_each_tenant(Func &func) { @@ -338,6 +343,16 @@ private: bool operator()(const TenantID &tid, ObLogTenant *tenant); }; + enum TenantStorageOp {FLUSH = 0, COMPACT=1}; + + struct TenantRedoStorageOperator + { + IObStoreService &store_service_; + TenantStorageOp op_; + TenantRedoStorageOperator(IObStoreService &store_service, TenantStorageOp &op) : store_service_(store_service), op_(op) {} + bool operator()(const TenantID &tid, ObLogTenant *tenant); + }; + struct SetDataStartSchemaVersionFunc { int64_t data_start_schema_version_; diff --git a/src/logservice/libobcdc/src/ob_log_trans_log.cpp b/src/logservice/libobcdc/src/ob_log_trans_log.cpp index 3889653439..32dabf1412 100644 --- a/src/logservice/libobcdc/src/ob_log_trans_log.cpp +++ b/src/logservice/libobcdc/src/ob_log_trans_log.cpp @@ -190,7 +190,7 @@ void DmlRedoLogNode::reset() ATOMIC_SET(&is_readed_, false); row_head_ = NULL; row_tail_ = NULL; - valid_row_num_ = 0; + ATOMIC_SET(&valid_row_num_, 0); ATOMIC_SET(&is_parsed_, false); ATOMIC_SET(&is_formatted_, false); reserve_field_ = 0;