diff --git a/src/logservice/data_dictionary/ob_data_dict_iterator.cpp b/src/logservice/data_dictionary/ob_data_dict_iterator.cpp index 7bff8a91a6..3865ed0e51 100644 --- a/src/logservice/data_dictionary/ob_data_dict_iterator.cpp +++ b/src/logservice/data_dictionary/ob_data_dict_iterator.cpp @@ -171,7 +171,35 @@ int ObDataDictIterator::next_dict_header(ObDictMetaHeader &header) return ret; } +template +int ObDataDictIterator::next_dict_entry(DICT_ENTRY &dict_entry) +{ + int ret = OB_SUCCESS; + if (dict_pos_ > 0) { + // deserialize from dict_buf_ + int64_t deserialize_pos = 0; + if (OB_FAIL(dict_entry.deserialize(dict_buf_, dict_pos_, deserialize_pos))) { + DDLOG(WARN, "deserialize DICT_ENTRY from dict_buf failed", KR(ret), + K_(dict_pos), K(deserialize_pos)); + } + } else if (palf_pos_ > 0) { + // deserialize from dict_buf_ + if (OB_FAIL(dict_entry.deserialize(palf_buf_, palf_buf_len_, palf_pos_))) { + DDLOG(WARN, "deserialize DICT_ENTRY from palf_buf failed", KR(ret), + K_(palf_buf_len), K_(palf_pos)); + } + } else { + ret = OB_ERR_UNEXPECTED; + DDLOG(WARN, "expect any of dict_pos/palf_pos is valid", KR(ret), K_(palf_pos), K_(dict_pos)); + } + + return ret; +} + +template int ObDataDictIterator::next_dict_entry(ObDictTenantMeta &dict_entry); +template int ObDataDictIterator::next_dict_entry(ObDictDatabaseMeta &dict_entry); +template int ObDataDictIterator::next_dict_entry(ObDictTableMeta &dict_entry); int ObDataDictIterator::append_log_buf_with_base_header_(const char *buf, const int64_t buf_len) { diff --git a/src/logservice/data_dictionary/ob_data_dict_iterator.h b/src/logservice/data_dictionary/ob_data_dict_iterator.h index a072d28b5a..d8d39219e1 100644 --- a/src/logservice/data_dictionary/ob_data_dict_iterator.h +++ b/src/logservice/data_dictionary/ob_data_dict_iterator.h @@ -34,30 +34,7 @@ public: int append_log_buf(const char *buf, const int64_t buf_len, const int64_t pos); // without log_base_header int next_dict_header(ObDictMetaHeader &meta_header); template - int next_dict_entry(DICT_ENTRY &dict_entry) - { - int ret = OB_SUCCESS; - - if (dict_pos_ > 0) { - // deserialize from dict_buf_ - int64_t deserialize_pos = 0; - if (OB_FAIL(dict_entry.deserialize(dict_buf_, dict_pos_, deserialize_pos))) { - DDLOG(WARN, "deserialize DICT_ENTRY from dict_buf failed", KR(ret), - K_(dict_pos), K(deserialize_pos)); - } - } else if (palf_pos_ > 0) { - // deserialize from dict_buf_ - if (OB_FAIL(dict_entry.deserialize(palf_buf_, palf_buf_len_, palf_pos_))) { - DDLOG(WARN, "deserialize DICT_ENTRY from palf_buf failed", KR(ret), - K_(palf_buf_len), K_(palf_pos)); - } - } else { - ret = OB_ERR_UNEXPECTED; - DDLOG(WARN, "expect any of dict_pos/palf_pos is valid", KR(ret), K_(palf_pos), K_(dict_pos)); - } - - return ret; - } + int next_dict_entry(DICT_ENTRY &dict_entry); private: OB_INLINE void release_palf_buf_() { diff --git a/src/logservice/data_dictionary/ob_data_dict_persist_callback.h b/src/logservice/data_dictionary/ob_data_dict_persist_callback.h index b3198e416d..5127ebe666 100644 --- a/src/logservice/data_dictionary/ob_data_dict_persist_callback.h +++ b/src/logservice/data_dictionary/ob_data_dict_persist_callback.h @@ -35,8 +35,9 @@ public: public: virtual int on_success() override { - is_callback_invoked_ = true; is_success_ = true; + MEM_BARRIER(); + is_callback_invoked_ = true; return OB_SUCCESS; } virtual int on_failure() override diff --git a/src/logservice/data_dictionary/ob_data_dict_service.cpp b/src/logservice/data_dictionary/ob_data_dict_service.cpp index 47733f5d33..0ace04209e 100644 --- a/src/logservice/data_dictionary/ob_data_dict_service.cpp +++ b/src/logservice/data_dictionary/ob_data_dict_service.cpp @@ -245,6 +245,8 @@ int ObDataDictService::do_dump_data_dict_() { int ret = OB_SUCCESS; int tmp_ret = OB_SUCCESS; + ObLSHandle ls_handle; // NOTICE: ls_handle is a guard for usage of log_handler. + ObLS *ls = NULL; ObLogHandler *log_handler = NULL; bool is_leader = false; share::SCN snapshot_scn; @@ -252,12 +254,16 @@ int ObDataDictService::do_dump_data_dict_() palf::LSN end_lsn; bool is_cluster_status_normal = false; bool is_data_dict_dump_success = false; + bool is_any_log_callback_fail = false; storage_.reuse(); allocator_.reset(); if (IS_NOT_INIT) { ret = OB_NOT_INIT; DDLOG(WARN, "data_dict_service not inited", KR(ret), K_(tenant_id), K_(is_inited)); + } else if (OB_ISNULL(ls_service_)) { + ret = OB_ERR_UNEXPECTED; + DDLOG(WARN, "invalid ls_service", KR(ret), K_(tenant_id)); } else if (OB_UNLIKELY(stop_flag_)) { ret = OB_NOT_RUNNING; DDLOG(WARN, "data_dict_service not running", KR(ret), K_(tenant_id), K_(stop_flag)); @@ -265,10 +271,16 @@ int ObDataDictService::do_dump_data_dict_() DDLOG(TRACE, "check_cluster_status_normal_ failed", KR(ret), K(is_cluster_status_normal)); } else if (OB_UNLIKELY(! is_cluster_status_normal)) { DDLOG(TRACE, "cluster_status not normal, won't dump_data_dict", K(is_cluster_status_normal)); - } else if (OB_FAIL(get_sys_ls_log_handle_(log_handler))) { + } else if (OB_FAIL(ls_service_->get_ls(share::SYS_LS, ls_handle, ObLSGetMod::DATA_DICT_MOD))) { if (OB_LS_NOT_EXIST != ret || REACH_TIME_INTERVAL_THREAD_LOCAL(PRINT_DETAIL_INTERVAL)) { - DDLOG(WARN, "get_sys_ls_log_handle_ failed", KR(ret)); + DDLOG(WARN, "get_ls for data_dict_service from ls_service failed", KR(ret), K_(tenant_id)); } + } else if (OB_ISNULL(ls = ls_handle.get_ls())) { + ret = OB_ERR_UNEXPECTED; + DDLOG(WARN, "invalid ls get from ls_handle", KR(ret), K_(tenant_id)); + } else if (OB_ISNULL(log_handler = ls->get_log_handler())) { + ret = OB_ERR_UNEXPECTED; + DDLOG(WARN, "invalid log_handler_ get from OBLS", KR(ret), K_(tenant_id)); } else if (check_ls_leader(log_handler, is_leader)) { DDLOG(WARN, "check_is_sys_ls_leader failed", KR(ret)); } else if (! is_leader) { @@ -288,13 +300,15 @@ int ObDataDictService::do_dump_data_dict_() start_lsn, end_lsn, is_data_dict_dump_success, + is_any_log_callback_fail, stop_flag_))) { if (OB_IN_STOP_STATE != tmp_ret && OB_STATE_NOT_MATCH != tmp_ret) { DDLOG(WARN, "finish storage for data_dict_service failed", KR(ret), KR(tmp_ret), K(snapshot_scn), K(start_lsn), K(end_lsn), K_(stop_flag), K_(is_inited)); } ret = tmp_ret; - } else if (is_data_dict_dump_success) { + } else if (is_data_dict_dump_success && ! is_any_log_callback_fail) { + // only report when dict dump success and all log_callback success. const int64_t half_dump_interval = ATOMIC_LOAD(&dump_interval_) / 2; const int64_t report_timeout = DEFAULT_REPORT_TIMEOUT > half_dump_interval ? half_dump_interval : DEFAULT_REPORT_TIMEOUT; const int64_t current_time = get_timestamp_us(); @@ -347,29 +361,6 @@ int ObDataDictService::check_cluster_status_normal_(bool &is_normal) return ret; } -int ObDataDictService::get_sys_ls_log_handle_(ObLogHandler *&log_handler) -{ - int ret = OB_SUCCESS; - ObLSHandle ls_handle; - ObLS *ls = NULL; - - if (OB_ISNULL(ls_service_)) { - ret = OB_ERR_UNEXPECTED; - DDLOG(WARN, "invalid ls_service", KR(ret), K_(tenant_id)); - } else if (OB_FAIL(ls_service_->get_ls(share::SYS_LS, ls_handle, ObLSGetMod::DATA_DICT_MOD))) { - DDLOG(WARN, "get_ls for data_dict_service from ls_service failed", KR(ret), K_(tenant_id)); - } else if (OB_ISNULL(ls = ls_handle.get_ls())) { - ret = OB_ERR_UNEXPECTED; - DDLOG(WARN, "invalid ls get from ls_handle", KR(ret), K_(tenant_id)); - } else if (OB_ISNULL(log_handler = ls->get_log_handler())) { - ret = OB_ERR_UNEXPECTED; - DDLOG(WARN, "invalid log_handler_ get from OBLS", KR(ret), K_(tenant_id)); - } else { - } - - return ret; -} - int ObDataDictService::get_snapshot_scn_(share::SCN &snapshot_scn) { int ret = OB_SUCCESS; diff --git a/src/logservice/data_dictionary/ob_data_dict_service.h b/src/logservice/data_dictionary/ob_data_dict_service.h index 53541f2b88..c1b179c1b3 100644 --- a/src/logservice/data_dictionary/ob_data_dict_service.h +++ b/src/logservice/data_dictionary/ob_data_dict_service.h @@ -77,7 +77,6 @@ private: void switch_role_to_(bool is_leader); int do_dump_data_dict_(); int check_cluster_status_normal_(bool &is_normal); - int get_sys_ls_log_handle_(logservice::ObLogHandler *&log_handler); int get_snapshot_scn_(share::SCN &snapshot_scn); int generate_dict_and_dump_(const share::SCN &snapshot_scn); int get_tenant_schema_guard_( diff --git a/src/logservice/data_dictionary/ob_data_dict_storager.cpp b/src/logservice/data_dictionary/ob_data_dict_storager.cpp index bfef2491c3..0db4b6cd3e 100644 --- a/src/logservice/data_dictionary/ob_data_dict_storager.cpp +++ b/src/logservice/data_dictionary/ob_data_dict_storager.cpp @@ -83,17 +83,8 @@ ObDataDictStorage::ObDataDictStorage(ObIAllocator &allocator) void ObDataDictStorage::reset() { reuse(); + reset_buf_(); tenant_id_ = OB_INVALID_TENANT_ID; - if (OB_NOT_NULL(palf_buf_)) { - ob_dict_free(palf_buf_); - palf_buf_len_ = 0; - palf_buf_ = NULL; - } - if (OB_NOT_NULL(dict_buf_)) { - ob_dict_free(dict_buf_); - dict_buf_len_ = 0; - dict_buf_ = NULL; - } } void ObDataDictStorage::reuse() @@ -121,10 +112,6 @@ int ObDataDictStorage::init(const uint64_t tenant_id) DDLOG(WARN, "expect palf_buf and dict_buf NULL", KR(ret), KP_(palf_buf), KP_(dict_buf)); } else { tenant_id_ = tenant_id; - if (is_user_tenant(tenant_id)) { - palf_buf_ = static_cast(ob_dict_malloc(DEFAULT_PALF_BUF_SIZE, tenant_id_)); - dict_buf_ = static_cast(ob_dict_malloc(DEFAULT_DICT_BUF_SIZE, tenant_id_)); - } DDLOG(INFO, "data_dict_storager init success", K_(tenant_id)); } @@ -139,6 +126,8 @@ int ObDataDictStorage::prepare(const share::SCN &snapshot_scn, ObLogHandler *log || OB_ISNULL(log_handler)) { ret = OB_INVALID_ARGUMENT; DDLOG(WARN, "invalid log_handler", KR(ret), K_(tenant_id), K(snapshot_scn_)); + } else if (OB_FAIL(prepare_buf_())) { + DDLOG(WARN, "prepare_buf_ failed", KR(ret)); } else { reuse(); snapshot_scn_ = snapshot_scn; @@ -149,24 +138,71 @@ int ObDataDictStorage::prepare(const share::SCN &snapshot_scn, ObLogHandler *log return ret; } +template +int ObDataDictStorage::handle_dict_meta( + const DATA_DICT_META &data_dict_meta, + ObDictMetaHeader &header) +{ + int ret = OB_SUCCESS; + const int64_t dict_serialize_size = data_dict_meta.get_serialize_size(); + // serialize_header + header.set_snapshot_scn(snapshot_scn_); + header.set_dict_serialize_length(dict_serialize_size); + header.set_storage_type(ObDictMetaStorageType::FULL); + const int64_t header_serialize_size = header.get_serialize_size(); + const int64_t total_serialize_size = dict_serialize_size + + header_serialize_size + + log_base_header_.get_serialize_size(); + + if (! need_new_palf_buf_(total_serialize_size)) { + if (OB_FAIL(serialize_to_palf_buf_(header, data_dict_meta))) { + DDLOG(WARN, "serialize header_and_dict to palf_buf_ failed", KR(ret), K(header), K(data_dict_meta)); + } + } else if (OB_FAIL(submit_to_palf_())) { + DDLOG(WARN, "submit_data_dict_to_palf_ failed", KR(ret), K_(palf_buf_len), K_(palf_pos)); + } else if (! need_new_palf_buf_(total_serialize_size)) { + // check if palf_buf_len is enough for header + data_dict. + if (OB_FAIL(serialize_to_palf_buf_(header, data_dict_meta))) { + DDLOG(WARN, "serialize header_and_dict to palf_buf_ failed", KR(ret), K(header), K(data_dict_meta)); + } + } else if (OB_FAIL(prepare_dict_buf_(dict_serialize_size))) { + DDLOG(WARN, "prepare_dict_buf_ failed", KR(ret), K(dict_serialize_size), K_(dict_buf_len), K_(dict_pos)); + } else if (OB_FAIL(data_dict_meta.serialize(dict_buf_, dict_buf_len_, dict_pos_))) { + DDLOG(WARN, "serialize data_dict_meta to dict_buf failed", KR(ret), + K(dict_serialize_size), K_(dict_buf_len), K_(dict_pos)); + } else if (OB_FAIL(segment_dict_buf_to_palf_(header))) { + DDLOG(WARN, "segment_dict_buf_to_palf_ failed", KR(ret), K(header), K_(dict_buf_len), K_(dict_pos), K_(palf_pos)); + } + + if (OB_SUCC(ret)) { + DDLOG(TRACE, "handle data_dict success", K(header), K(data_dict_meta)); + } + + return ret; +} + +template int ObDataDictStorage::handle_dict_meta(const ObDictTenantMeta &data_dict_meta, ObDictMetaHeader &header); +template int ObDataDictStorage::handle_dict_meta(const ObDictDatabaseMeta &data_dict_meta, ObDictMetaHeader &header); +template int ObDataDictStorage::handle_dict_meta(const ObDictTableMeta &data_dict_meta, ObDictMetaHeader &header); + int ObDataDictStorage::finish( palf::LSN &start_lsn, palf::LSN &end_lsn, bool is_dump_success, + bool &is_any_log_callback_fail, volatile bool &stop_flag) { int ret = OB_SUCCESS; const static int64_t WAIT_TIMEOUT_MS = 10; - bool is_any_cb_fail = false; // try submit remian palf_buf if exist data not submit to palf. if (OB_FAIL(submit_to_palf_())) { DDLOG(WARN, "try submit remain palf_buf to palf failed", KR(ret)); } - RETRY_FUNC_ON_ERROR(OB_TIMEOUT, stop_flag, *this, wait_palf_callback_, WAIT_TIMEOUT_MS, is_any_cb_fail, stop_flag); - - if (OB_SUCC(ret) && is_dump_success) { + if (OB_FAIL(wait_palf_callback_(is_any_log_callback_fail, stop_flag))) { + DDLOG(WARN, "wait palf_callback failed", KR(ret), K(is_dump_success), K(is_any_log_callback_fail), K(stop_flag)); + } else if (is_dump_success && ! is_any_log_callback_fail) { if (OB_UNLIKELY(! start_lsn_.is_valid()) || OB_UNLIKELY(! end_lsn_.is_valid())) { ret = OB_STATE_NOT_MATCH; @@ -179,6 +215,8 @@ int ObDataDictStorage::finish( } } + reset_buf_(); // reset palf_buf and dict_buf anyway. + return ret; } @@ -337,6 +375,41 @@ int ObDataDictStorage::parse_dict_metas( return ret; } +int ObDataDictStorage::prepare_buf_() +{ + int ret = OB_SUCCESS; + + if (OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id_ || ! is_user_tenant(tenant_id_))) { + ret = OB_STATE_NOT_MATCH; + DDLOG(WARN, "data_dict_service only work for user_tenant", KR(ret), K_(tenant_id)); + } else if (OB_NOT_NULL(palf_buf_) || OB_NOT_NULL(dict_buf_)) { + ret = OB_ERR_UNEXPECTED; + DDLOG(WARN, "expect invalid palf_buf and dict_buf before prepare dump data_dict", KR(ret), + KP_(palf_buf), KP_(dict_buf)); + } else { + palf_buf_len_ = DEFAULT_PALF_BUF_SIZE; + dict_buf_len_ = DEFAULT_DICT_BUF_SIZE; + palf_buf_ = static_cast(ob_dict_malloc(palf_buf_len_, tenant_id_)); + dict_buf_ = static_cast(ob_dict_malloc(dict_buf_len_ , tenant_id_)); + } + + return ret; +} + +void ObDataDictStorage::reset_buf_() +{ + if (OB_NOT_NULL(palf_buf_)) { + ob_dict_free(palf_buf_); + palf_buf_len_ = 0; + palf_buf_ = NULL; + } + if (OB_NOT_NULL(dict_buf_)) { + ob_dict_free(dict_buf_); + dict_buf_len_ = 0; + dict_buf_ = NULL; + } +} + int ObDataDictStorage::serialize_log_base_header_() { int ret = OB_SUCCESS; @@ -383,6 +456,43 @@ int ObDataDictStorage::prepare_dict_buf_(const int64_t required_size) return ret; } +template +int ObDataDictStorage::serialize_to_palf_buf_( + const ObDictMetaHeader &header, + const DATA_DICT_META &data_dict) +{ + int ret = OB_SUCCESS; + + if (OB_ISNULL(palf_buf_)) { + ret = OB_STATE_NOT_MATCH; + DDLOG(WARN, "palf_buf shoule be valid", KR(ret)); + } else if (palf_pos_ == 0) { + if (OB_FAIL(serialize_log_base_header_())) { + DDLOG(WARN, "serialize_log_base_header_ failed", KR(ret), K_(palf_pos)); + } + } + + if (OB_FAIL(ret)) { + } else if (OB_FAIL(header.serialize(palf_buf_, palf_buf_len_, palf_pos_))) { + DDLOG(WARN, "serialize header to palf_buf failed", KR(ret), K(header), + K_(palf_buf_len), K_(palf_pos), "header_serialize_size", header.get_serialize_size()); + } else if (OB_FAIL(data_dict.serialize(palf_buf_, palf_buf_len_, palf_pos_))) { + DDLOG(WARN, "serialize data_dict to palf_buf failed", KR(ret), K(header), K(data_dict), + K_(palf_buf_len), K_(palf_pos), "dict_serialize_size", data_dict.get_serialize_size()); + } else { + DDLOG(DEBUG, "serialize data_dict to palf_buf success", KR(ret), K(header), K(data_dict), + K_(palf_buf_len), K_(palf_pos), + "header_size", header.get_serialize_size(), + "data_dict_size", data_dict.get_serialize_size()); + } + + return ret; +} + +template int ObDataDictStorage::serialize_to_palf_buf_(const ObDictMetaHeader &header, const ObDictTenantMeta &data_dict); +template int ObDataDictStorage::serialize_to_palf_buf_(const ObDictMetaHeader &header, const ObDictDatabaseMeta &data_dict); +template int ObDataDictStorage::serialize_to_palf_buf_(const ObDictMetaHeader &header, const ObDictTableMeta &data_dict); + int ObDataDictStorage::segment_dict_buf_to_palf_(ObDictMetaHeader &header) { int ret = OB_SUCCESS; @@ -477,9 +587,8 @@ int ObDataDictStorage::submit_to_palf_() K_(palf_buf), K_(palf_pos), K(ref_scn), K(need_nonblock), K(lsn), K(submit_scn)); } else if (OB_FAIL(update_palf_lsn_(lsn))) { DDLOG(WARN, "update_palf_lsn_ failed", KR(ret), K(lsn), K_(start_lsn), K_(end_lsn)); - } else if (OB_FAIL(cb_queue_.push(callback))) { - DDLOG(WARN, "push callback to callback_queue failed", KR(ret)); } else { + cb_queue_.push(callback); // submit to palf success DDLOG(DEBUG, "submit palf_buf to palf succ", K(lsn), K(submit_scn), K_(palf_pos)); total_log_cnt_++; @@ -523,56 +632,82 @@ int ObDataDictStorage::update_palf_lsn_(const palf::LSN &lsn) return ret; } -int ObDataDictStorage::wait_palf_callback_(const int64_t timeout_msec, bool &is_any_cb_fail, volatile bool &stop_flag) +// invoke after all data_dict dumped into palf. +int ObDataDictStorage::wait_palf_callback_(bool &is_any_cb_fail, volatile bool &stop_flag) { int ret = OB_SUCCESS; - static int64_t SLEEP_MS = 1; + static const int64_t CHECK_CB_INTERVAL = 10 * _MSEC_; + static const int64_t PRINT_CB_STATUS_INTERVAL = 5 * _SEC_; // print callback status interval when callback not all invoked is_any_cb_fail = false; - bool is_all_invoked = true; - bool has_cb_on_fail = false; + bool is_all_cb_invoked = true; + bool print_cb_status = false; - // exit loop if all callback invoked or any callback fail - while (OB_SUCC(ret) && (! is_all_invoked && ! is_any_cb_fail) && ! stop_flag) { - if (REACH_TIME_INTERVAL_THREAD_LOCAL(timeout_msec)) { - ret = OB_TIMEOUT; - } else if (OB_FAIL(check_callback_list_(is_all_invoked, has_cb_on_fail))) { - DDLOG(WARN, "check_callback_list_ failed", KR(ret), K(is_all_invoked), K(has_cb_on_fail)); - } else { - is_any_cb_fail = has_cb_on_fail; - if (! is_any_cb_fail && ! is_all_invoked) { - usleep(SLEEP_MS); + // exit loop if any of below cases occur: (1) check_callback_list_ failed; (2) all log_callback invoked. + do { + if (OB_FAIL(check_callback_list_(is_all_cb_invoked, is_any_cb_fail, print_cb_status, stop_flag))) { + DDLOG(WARN, "check_callback_list_ failed", KR(ret)); + } else if (! is_all_cb_invoked) { + if (REACH_TIME_INTERVAL_THREAD_LOCAL(PRINT_CB_STATUS_INTERVAL)) { + print_cb_status = true; } + usleep(CHECK_CB_INTERVAL); } - } + } while (OB_SUCC(ret) && ! is_all_cb_invoked); return ret; } -int ObDataDictStorage::check_callback_list_(bool &is_all_invoked, bool &has_cb_on_fail) +// invoke after all data_dict dumped into palf. +int ObDataDictStorage::check_callback_list_( + bool &is_all_invoked, + bool &has_cb_on_fail, + bool &need_print_cb_status, + volatile bool &stop_flag) { int ret = OB_SUCCESS; is_all_invoked = true; has_cb_on_fail = false; - QLink *item = NULL; + QLink *item = cb_queue_.top(); + // for stat + int64_t total_cb_count = 0; + int64_t not_invoked_cb_count = 0; + int64_t failed_cb_count = 0; - if (OB_FAIL(cb_queue_.top(item))) { - DDLOG(WARN, "get head of palf_cb_queue failed", KR(ret)); - } else { - while (OB_SUCC(ret) && is_all_invoked && !has_cb_on_fail && OB_NOT_NULL(item)) { - QLink *next = item->next_; - ObDataDictPersistCallback *cb = static_cast (item); - if (OB_ISNULL(cb)) { - ret = OB_ERR_UNEXPECTED; - DDLOG(WARN, "convert ObLink to ObDataDictPersistCallback failed", KR(ret), K(item)); - } else { - if (! cb->is_invoked()) { - is_all_invoked = false; - } else if (! cb->is_success()) { - has_cb_on_fail = true; - } + while (OB_SUCC(ret) && OB_NOT_NULL(item)) { + total_cb_count++; + QLink *next = item->next_; + ObDataDictPersistCallback *cb = static_cast(item); + + if (OB_ISNULL(cb)) { + ret = OB_ERR_UNEXPECTED; + DDLOG(WARN, "convert ObLink to ObDataDictPersistCallback failed", KR(ret), K(item)); + } else { + if (! cb->is_invoked()) { + not_invoked_cb_count++; + } else if (! cb->is_success()) { + failed_cb_count++; } - item = next; } + + item = next; + } + + if (not_invoked_cb_count > 0) { + is_all_invoked = false; + } + + if (failed_cb_count > 0) { + has_cb_on_fail = true; + } + + if (is_all_invoked || need_print_cb_status) { + // log callback status, NOTICE: stop_flag may set if ls role change or tenant stop. + DDLOG(INFO, "[STAT] callbacks_status", KR(ret), K(total_cb_count), K(not_invoked_cb_count), K(failed_cb_count), + K(is_all_invoked), K(need_print_cb_status), K(stop_flag)); + } + + if (need_print_cb_status) { + need_print_cb_status = false; } return ret; @@ -581,9 +716,9 @@ int ObDataDictStorage::check_callback_list_(bool &is_all_invoked, bool &has_cb_o void ObDataDictStorage::reset_cb_queue_() { int ret = OB_SUCCESS; - while (! cb_queue_.is_empty()) { + while (! cb_queue_.empty()) { QLink *item = NULL; - if (OB_FAIL(cb_queue_.pop(item))) { + if (OB_ISNULL(item = cb_queue_.pop())) { DDLOG(WARN, "pop item from data_dict_meta persist_callback_queue failed", KR(ret)); } else { allocator_.free(item); diff --git a/src/logservice/data_dictionary/ob_data_dict_storager.h b/src/logservice/data_dictionary/ob_data_dict_storager.h index 705d2366b4..070debf701 100644 --- a/src/logservice/data_dictionary/ob_data_dict_storager.h +++ b/src/logservice/data_dictionary/ob_data_dict_storager.h @@ -44,49 +44,12 @@ public: template int handle_dict_meta( const DATA_DICT_META &data_dict_meta, - ObDictMetaHeader &header) - { - int ret = OB_SUCCESS; - const int64_t dict_serialize_size = data_dict_meta.get_serialize_size(); - // serialize_header - header.set_snapshot_scn(snapshot_scn_); - header.set_dict_serialize_length(dict_serialize_size); - header.set_storage_type(ObDictMetaStorageType::FULL); - const int64_t header_serialize_size = header.get_serialize_size(); - const int64_t total_serialize_size = dict_serialize_size - + header_serialize_size - + log_base_header_.get_serialize_size(); - - if (! need_new_palf_buf_(total_serialize_size)) { - if (OB_FAIL(serialize_to_palf_buf_(header, data_dict_meta))) { - DDLOG(WARN, "serialize header_and_dict to palf_buf_ failed", KR(ret), K(header), K(data_dict_meta)); - } - } else if (OB_FAIL(submit_to_palf_())) { - DDLOG(WARN, "submit_data_dict_to_palf_ failed", KR(ret), K_(palf_buf_len), K_(palf_pos)); - } else if (! need_new_palf_buf_(total_serialize_size)) { - // check if palf_buf_len is enough for header + data_dict. - if (OB_FAIL(serialize_to_palf_buf_(header, data_dict_meta))) { - DDLOG(WARN, "serialize header_and_dict to palf_buf_ failed", KR(ret), K(header), K(data_dict_meta)); - } - } else if (OB_FAIL(prepare_dict_buf_(dict_serialize_size))) { - DDLOG(WARN, "prepare_dict_buf_ failed", KR(ret), K(dict_serialize_size), K_(dict_buf_len), K_(dict_pos)); - } else if (OB_FAIL(data_dict_meta.serialize(dict_buf_, dict_buf_len_, dict_pos_))) { - DDLOG(WARN, "serialize data_dict_meta to dict_buf failed", KR(ret), - K(dict_serialize_size), K_(dict_buf_len), K_(dict_pos)); - } else if (OB_FAIL(segment_dict_buf_to_palf_(header))) { - DDLOG(WARN, "segment_dict_buf_to_palf_ failed", KR(ret), K(header), K_(dict_buf_len), K_(dict_pos), K_(palf_pos)); - } - - if (OB_SUCC(ret)) { - DDLOG(TRACE, "handle data_dict success", K(header), K(data_dict_meta)); - } - - return ret; - } + ObDictMetaHeader &header); int finish( palf::LSN &start_lsn, palf::LSN &end_lsn, bool is_dump_success, + bool &is_any_log_callback_fail, volatile bool &stop_flag); public: // generate data_dict_meta for specified schemas, and serialize metas into buf, which is allocated @@ -121,6 +84,8 @@ protected: // protected only for unittest. virtual int submit_to_palf_(); private: + int prepare_buf_(); + void reset_buf_(); OB_INLINE bool need_new_palf_buf_(const int64_t required_size) const { return palf_buf_len_ - palf_pos_ < required_size; } int serialize_log_base_header_(); @@ -128,35 +93,7 @@ private: template int serialize_to_palf_buf_( const ObDictMetaHeader &header, - const DATA_DICT_META &data_dict) - { - int ret = OB_SUCCESS; - - if (OB_ISNULL(palf_buf_)) { - ret = OB_STATE_NOT_MATCH; - DDLOG(WARN, "palf_buf shoule be valid", KR(ret)); - } else if (palf_pos_ == 0) { - if (OB_FAIL(serialize_log_base_header_())) { - DDLOG(WARN, "serialize_log_base_header_ failed", KR(ret), K_(palf_pos)); - } - } - - if (OB_FAIL(ret)) { - } else if (OB_FAIL(header.serialize(palf_buf_, palf_buf_len_, palf_pos_))) { - DDLOG(WARN, "serialize header to palf_buf failed", KR(ret), K(header), - K_(palf_buf_len), K_(palf_pos), "header_serialize_size", header.get_serialize_size()); - } else if (OB_FAIL(data_dict.serialize(palf_buf_, palf_buf_len_, palf_pos_))) { - DDLOG(WARN, "serialize data_dict to palf_buf failed", KR(ret), K(header), K(data_dict), - K_(palf_buf_len), K_(palf_pos), "dict_serialize_size", data_dict.get_serialize_size()); - } else { - DDLOG(DEBUG, "serialize data_dict to palf_buf success", KR(ret), K(header), K(data_dict), - K_(palf_buf_len), K_(palf_pos), - "header_size", header.get_serialize_size(), - "data_dict_size", data_dict.get_serialize_size()); - } - - return ret; - } + const DATA_DICT_META &data_dict); int segment_dict_buf_to_palf_(ObDictMetaHeader &header); int alloc_palf_cb_(ObDataDictPersistCallback *&callback); @@ -164,8 +101,12 @@ private: // @param bool is_any_cb_fail true if any callback failed. // @retval OB_SUCCESS all callback invoked or has any callback failed. // @revval OB_TIMEOUT timeout while waiting callback invoke. - int wait_palf_callback_(const int64_t timeout_msec, bool &is_any_cb_fail, volatile bool &stop_flag); - int check_callback_list_(bool &is_all_invoked, bool &has_cb_on_fail); + int wait_palf_callback_(bool &is_any_cb_fail, volatile bool &stop_flag); + int check_callback_list_( + bool &is_all_invoked, + bool &has_cb_on_fail, + bool &need_print_cb_status, + volatile bool &stop_flag); void reset_cb_queue_(); private: static const int64_t DEFAULT_PALF_BUF_SIZE; @@ -180,7 +121,7 @@ private: palf::LSN end_lsn_; logservice::ObLogHandler *log_handler_; logservice::ObLogBaseHeader log_base_header_; - ObSpLinkQueue cb_queue_; + ObSpScLinkQueue cb_queue_; char *palf_buf_; // tmp buf for serialize and deserialize with palf char *dict_buf_; // dict_buf int64_t palf_buf_len_; // palf_buf_len diff --git a/src/logservice/libobcdc/src/ob_log_tenant.cpp b/src/logservice/libobcdc/src/ob_log_tenant.cpp index 8c4a4381eb..b153bad98a 100644 --- a/src/logservice/libobcdc/src/ob_log_tenant.cpp +++ b/src/logservice/libobcdc/src/ob_log_tenant.cpp @@ -92,8 +92,6 @@ int ObLogTenant::init(const uint64_t tenant_id, LOG_ERROR("invalid argument", K(tenant_id), K(tenant_name), K(start_tstamp_ns), K(start_seq), K(start_schema_version), K(cf_handle)); ret = OB_INVALID_ARGUMENT; - } else if (OB_FAIL(ObMallocAllocator::get_instance()->create_and_add_tenant_allocator(tenant_id))) { - LOG_ERROR("create and add tenant allocator failed", K(ret), K(tenant_id)); } else if (OB_ISNULL(task_queue_ = OB_NEW(ObLogTenantTaskQueue, ObModIds::OB_LOG_TENANT_TASK_QUEUE, *this))) { LOG_ERROR("create task queue fail", K(task_queue_)); ret = OB_ALLOCATE_MEMORY_FAILED; diff --git a/src/logservice/libobcdc/src/ob_log_tenant_mgr.cpp b/src/logservice/libobcdc/src/ob_log_tenant_mgr.cpp index 5a2bfa5dc2..47729423f8 100644 --- a/src/logservice/libobcdc/src/ob_log_tenant_mgr.cpp +++ b/src/logservice/libobcdc/src/ob_log_tenant_mgr.cpp @@ -525,6 +525,8 @@ int ObLogTenantMgr::add_tenant( LOG_ERROR("invalid arguments", KR(ret), K(tenant_id), K(sys_schema_version)); } else if (is_meta_tenant(tenant_id)) { LOG_INFO("won't add meta tenant", K(tenant_id), K(sys_schema_version)); + } else if (OB_FAIL(ObMallocAllocator::get_instance()->create_and_add_tenant_allocator(tenant_id))) { + LOG_ERROR("create and add tenant allocator failed", K(ret), K(tenant_id)); } else { if (is_online_refresh_mode(refresh_mode_)) { TenantSchemaInfo tenant_schema_info; @@ -645,6 +647,10 @@ int ObLogTenantMgr::add_tenant( // and other unexpected case, otherwise global_heartbeat will be stucked.) try_del_tenant_start_ddl_info_(tenant_id); + if (! add_tenant_succ) { + ObMallocAllocator::get_instance()->recycle_tenant_allocator(tenant_id); + } + return ret; } diff --git a/src/share/parameter/ob_parameter_seed.ipp b/src/share/parameter/ob_parameter_seed.ipp index 8113a19c96..acd2430408 100644 --- a/src/share/parameter/ob_parameter_seed.ipp +++ b/src/share/parameter/ob_parameter_seed.ipp @@ -1310,7 +1310,7 @@ DEF_BOOL(_enable_tenant_leak_memory_protection, OB_CLUSTER_PARAMETER, "True", "p DEF_TIME(_advance_checkpoint_timeout, OB_CLUSTER_PARAMETER, "30m", "[10s,180m]", "the timeout for backup/migrate advance checkpoint Range: [10s,180m]", ObParameterAttr(Section::ROOT_SERVICE, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE)); -DEF_TIME(dump_data_dictionary_to_log_interval, OB_TENANT_PARAMETER, "24h", "(0s,]", +DEF_TIME(dump_data_dictionary_to_log_interval, OB_TENANT_PARAMETER, "5m", "(0s,]", "data dictionary dump to log(SYS LS) interval" "Range: (0s,+∞)", ObParameterAttr(Section::TENANT, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));