[DATA_DICT] Fix usage of log_handler and log_callback

This commit is contained in:
SanmuWangZJU
2023-02-06 22:59:01 +08:00
committed by ob-robot
parent 550fd8e950
commit 6fb94fc439
10 changed files with 257 additions and 181 deletions

View File

@ -171,7 +171,35 @@ int ObDataDictIterator::next_dict_header(ObDictMetaHeader &header)
return ret;
}
template<class DICT_ENTRY>
int ObDataDictIterator::next_dict_entry(DICT_ENTRY &dict_entry)
{
int ret = OB_SUCCESS;
if (dict_pos_ > 0) {
// deserialize from dict_buf_
int64_t deserialize_pos = 0;
if (OB_FAIL(dict_entry.deserialize(dict_buf_, dict_pos_, deserialize_pos))) {
DDLOG(WARN, "deserialize DICT_ENTRY from dict_buf failed", KR(ret),
K_(dict_pos), K(deserialize_pos));
}
} else if (palf_pos_ > 0) {
// deserialize from dict_buf_
if (OB_FAIL(dict_entry.deserialize(palf_buf_, palf_buf_len_, palf_pos_))) {
DDLOG(WARN, "deserialize DICT_ENTRY from palf_buf failed", KR(ret),
K_(palf_buf_len), K_(palf_pos));
}
} else {
ret = OB_ERR_UNEXPECTED;
DDLOG(WARN, "expect any of dict_pos/palf_pos is valid", KR(ret), K_(palf_pos), K_(dict_pos));
}
return ret;
}
template int ObDataDictIterator::next_dict_entry(ObDictTenantMeta &dict_entry);
template int ObDataDictIterator::next_dict_entry(ObDictDatabaseMeta &dict_entry);
template int ObDataDictIterator::next_dict_entry(ObDictTableMeta &dict_entry);
int ObDataDictIterator::append_log_buf_with_base_header_(const char *buf, const int64_t buf_len)
{

View File

@ -34,30 +34,7 @@ public:
int append_log_buf(const char *buf, const int64_t buf_len, const int64_t pos); // without log_base_header
int next_dict_header(ObDictMetaHeader &meta_header);
template<class DICT_ENTRY>
int next_dict_entry(DICT_ENTRY &dict_entry)
{
int ret = OB_SUCCESS;
if (dict_pos_ > 0) {
// deserialize from dict_buf_
int64_t deserialize_pos = 0;
if (OB_FAIL(dict_entry.deserialize(dict_buf_, dict_pos_, deserialize_pos))) {
DDLOG(WARN, "deserialize DICT_ENTRY from dict_buf failed", KR(ret),
K_(dict_pos), K(deserialize_pos));
}
} else if (palf_pos_ > 0) {
// deserialize from dict_buf_
if (OB_FAIL(dict_entry.deserialize(palf_buf_, palf_buf_len_, palf_pos_))) {
DDLOG(WARN, "deserialize DICT_ENTRY from palf_buf failed", KR(ret),
K_(palf_buf_len), K_(palf_pos));
}
} else {
ret = OB_ERR_UNEXPECTED;
DDLOG(WARN, "expect any of dict_pos/palf_pos is valid", KR(ret), K_(palf_pos), K_(dict_pos));
}
return ret;
}
int next_dict_entry(DICT_ENTRY &dict_entry);
private:
OB_INLINE void release_palf_buf_()
{

View File

@ -35,8 +35,9 @@ public:
public:
virtual int on_success() override
{
is_callback_invoked_ = true;
is_success_ = true;
MEM_BARRIER();
is_callback_invoked_ = true;
return OB_SUCCESS;
}
virtual int on_failure() override

View File

@ -245,6 +245,8 @@ int ObDataDictService::do_dump_data_dict_()
{
int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS;
ObLSHandle ls_handle; // NOTICE: ls_handle is a guard for usage of log_handler.
ObLS *ls = NULL;
ObLogHandler *log_handler = NULL;
bool is_leader = false;
share::SCN snapshot_scn;
@ -252,12 +254,16 @@ int ObDataDictService::do_dump_data_dict_()
palf::LSN end_lsn;
bool is_cluster_status_normal = false;
bool is_data_dict_dump_success = false;
bool is_any_log_callback_fail = false;
storage_.reuse();
allocator_.reset();
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
DDLOG(WARN, "data_dict_service not inited", KR(ret), K_(tenant_id), K_(is_inited));
} else if (OB_ISNULL(ls_service_)) {
ret = OB_ERR_UNEXPECTED;
DDLOG(WARN, "invalid ls_service", KR(ret), K_(tenant_id));
} else if (OB_UNLIKELY(stop_flag_)) {
ret = OB_NOT_RUNNING;
DDLOG(WARN, "data_dict_service not running", KR(ret), K_(tenant_id), K_(stop_flag));
@ -265,10 +271,16 @@ int ObDataDictService::do_dump_data_dict_()
DDLOG(TRACE, "check_cluster_status_normal_ failed", KR(ret), K(is_cluster_status_normal));
} else if (OB_UNLIKELY(! is_cluster_status_normal)) {
DDLOG(TRACE, "cluster_status not normal, won't dump_data_dict", K(is_cluster_status_normal));
} else if (OB_FAIL(get_sys_ls_log_handle_(log_handler))) {
} else if (OB_FAIL(ls_service_->get_ls(share::SYS_LS, ls_handle, ObLSGetMod::DATA_DICT_MOD))) {
if (OB_LS_NOT_EXIST != ret || REACH_TIME_INTERVAL_THREAD_LOCAL(PRINT_DETAIL_INTERVAL)) {
DDLOG(WARN, "get_sys_ls_log_handle_ failed", KR(ret));
DDLOG(WARN, "get_ls for data_dict_service from ls_service failed", KR(ret), K_(tenant_id));
}
} else if (OB_ISNULL(ls = ls_handle.get_ls())) {
ret = OB_ERR_UNEXPECTED;
DDLOG(WARN, "invalid ls get from ls_handle", KR(ret), K_(tenant_id));
} else if (OB_ISNULL(log_handler = ls->get_log_handler())) {
ret = OB_ERR_UNEXPECTED;
DDLOG(WARN, "invalid log_handler_ get from OBLS", KR(ret), K_(tenant_id));
} else if (check_ls_leader(log_handler, is_leader)) {
DDLOG(WARN, "check_is_sys_ls_leader failed", KR(ret));
} else if (! is_leader) {
@ -288,13 +300,15 @@ int ObDataDictService::do_dump_data_dict_()
start_lsn,
end_lsn,
is_data_dict_dump_success,
is_any_log_callback_fail,
stop_flag_))) {
if (OB_IN_STOP_STATE != tmp_ret && OB_STATE_NOT_MATCH != tmp_ret) {
DDLOG(WARN, "finish storage for data_dict_service failed", KR(ret), KR(tmp_ret),
K(snapshot_scn), K(start_lsn), K(end_lsn), K_(stop_flag), K_(is_inited));
}
ret = tmp_ret;
} else if (is_data_dict_dump_success) {
} else if (is_data_dict_dump_success && ! is_any_log_callback_fail) {
// only report when dict dump success and all log_callback success.
const int64_t half_dump_interval = ATOMIC_LOAD(&dump_interval_) / 2;
const int64_t report_timeout = DEFAULT_REPORT_TIMEOUT > half_dump_interval ? half_dump_interval : DEFAULT_REPORT_TIMEOUT;
const int64_t current_time = get_timestamp_us();
@ -347,29 +361,6 @@ int ObDataDictService::check_cluster_status_normal_(bool &is_normal)
return ret;
}
int ObDataDictService::get_sys_ls_log_handle_(ObLogHandler *&log_handler)
{
int ret = OB_SUCCESS;
ObLSHandle ls_handle;
ObLS *ls = NULL;
if (OB_ISNULL(ls_service_)) {
ret = OB_ERR_UNEXPECTED;
DDLOG(WARN, "invalid ls_service", KR(ret), K_(tenant_id));
} else if (OB_FAIL(ls_service_->get_ls(share::SYS_LS, ls_handle, ObLSGetMod::DATA_DICT_MOD))) {
DDLOG(WARN, "get_ls for data_dict_service from ls_service failed", KR(ret), K_(tenant_id));
} else if (OB_ISNULL(ls = ls_handle.get_ls())) {
ret = OB_ERR_UNEXPECTED;
DDLOG(WARN, "invalid ls get from ls_handle", KR(ret), K_(tenant_id));
} else if (OB_ISNULL(log_handler = ls->get_log_handler())) {
ret = OB_ERR_UNEXPECTED;
DDLOG(WARN, "invalid log_handler_ get from OBLS", KR(ret), K_(tenant_id));
} else {
}
return ret;
}
int ObDataDictService::get_snapshot_scn_(share::SCN &snapshot_scn)
{
int ret = OB_SUCCESS;

View File

@ -77,7 +77,6 @@ private:
void switch_role_to_(bool is_leader);
int do_dump_data_dict_();
int check_cluster_status_normal_(bool &is_normal);
int get_sys_ls_log_handle_(logservice::ObLogHandler *&log_handler);
int get_snapshot_scn_(share::SCN &snapshot_scn);
int generate_dict_and_dump_(const share::SCN &snapshot_scn);
int get_tenant_schema_guard_(

View File

@ -83,17 +83,8 @@ ObDataDictStorage::ObDataDictStorage(ObIAllocator &allocator)
void ObDataDictStorage::reset()
{
reuse();
reset_buf_();
tenant_id_ = OB_INVALID_TENANT_ID;
if (OB_NOT_NULL(palf_buf_)) {
ob_dict_free(palf_buf_);
palf_buf_len_ = 0;
palf_buf_ = NULL;
}
if (OB_NOT_NULL(dict_buf_)) {
ob_dict_free(dict_buf_);
dict_buf_len_ = 0;
dict_buf_ = NULL;
}
}
void ObDataDictStorage::reuse()
@ -121,10 +112,6 @@ int ObDataDictStorage::init(const uint64_t tenant_id)
DDLOG(WARN, "expect palf_buf and dict_buf NULL", KR(ret), KP_(palf_buf), KP_(dict_buf));
} else {
tenant_id_ = tenant_id;
if (is_user_tenant(tenant_id)) {
palf_buf_ = static_cast<char*>(ob_dict_malloc(DEFAULT_PALF_BUF_SIZE, tenant_id_));
dict_buf_ = static_cast<char*>(ob_dict_malloc(DEFAULT_DICT_BUF_SIZE, tenant_id_));
}
DDLOG(INFO, "data_dict_storager init success", K_(tenant_id));
}
@ -139,6 +126,8 @@ int ObDataDictStorage::prepare(const share::SCN &snapshot_scn, ObLogHandler *log
|| OB_ISNULL(log_handler)) {
ret = OB_INVALID_ARGUMENT;
DDLOG(WARN, "invalid log_handler", KR(ret), K_(tenant_id), K(snapshot_scn_));
} else if (OB_FAIL(prepare_buf_())) {
DDLOG(WARN, "prepare_buf_ failed", KR(ret));
} else {
reuse();
snapshot_scn_ = snapshot_scn;
@ -149,24 +138,71 @@ int ObDataDictStorage::prepare(const share::SCN &snapshot_scn, ObLogHandler *log
return ret;
}
template<class DATA_DICT_META>
int ObDataDictStorage::handle_dict_meta(
const DATA_DICT_META &data_dict_meta,
ObDictMetaHeader &header)
{
int ret = OB_SUCCESS;
const int64_t dict_serialize_size = data_dict_meta.get_serialize_size();
// serialize_header
header.set_snapshot_scn(snapshot_scn_);
header.set_dict_serialize_length(dict_serialize_size);
header.set_storage_type(ObDictMetaStorageType::FULL);
const int64_t header_serialize_size = header.get_serialize_size();
const int64_t total_serialize_size = dict_serialize_size
+ header_serialize_size
+ log_base_header_.get_serialize_size();
if (! need_new_palf_buf_(total_serialize_size)) {
if (OB_FAIL(serialize_to_palf_buf_(header, data_dict_meta))) {
DDLOG(WARN, "serialize header_and_dict to palf_buf_ failed", KR(ret), K(header), K(data_dict_meta));
}
} else if (OB_FAIL(submit_to_palf_())) {
DDLOG(WARN, "submit_data_dict_to_palf_ failed", KR(ret), K_(palf_buf_len), K_(palf_pos));
} else if (! need_new_palf_buf_(total_serialize_size)) {
// check if palf_buf_len is enough for header + data_dict.
if (OB_FAIL(serialize_to_palf_buf_(header, data_dict_meta))) {
DDLOG(WARN, "serialize header_and_dict to palf_buf_ failed", KR(ret), K(header), K(data_dict_meta));
}
} else if (OB_FAIL(prepare_dict_buf_(dict_serialize_size))) {
DDLOG(WARN, "prepare_dict_buf_ failed", KR(ret), K(dict_serialize_size), K_(dict_buf_len), K_(dict_pos));
} else if (OB_FAIL(data_dict_meta.serialize(dict_buf_, dict_buf_len_, dict_pos_))) {
DDLOG(WARN, "serialize data_dict_meta to dict_buf failed", KR(ret),
K(dict_serialize_size), K_(dict_buf_len), K_(dict_pos));
} else if (OB_FAIL(segment_dict_buf_to_palf_(header))) {
DDLOG(WARN, "segment_dict_buf_to_palf_ failed", KR(ret), K(header), K_(dict_buf_len), K_(dict_pos), K_(palf_pos));
}
if (OB_SUCC(ret)) {
DDLOG(TRACE, "handle data_dict success", K(header), K(data_dict_meta));
}
return ret;
}
template int ObDataDictStorage::handle_dict_meta(const ObDictTenantMeta &data_dict_meta, ObDictMetaHeader &header);
template int ObDataDictStorage::handle_dict_meta(const ObDictDatabaseMeta &data_dict_meta, ObDictMetaHeader &header);
template int ObDataDictStorage::handle_dict_meta(const ObDictTableMeta &data_dict_meta, ObDictMetaHeader &header);
int ObDataDictStorage::finish(
palf::LSN &start_lsn,
palf::LSN &end_lsn,
bool is_dump_success,
bool &is_any_log_callback_fail,
volatile bool &stop_flag)
{
int ret = OB_SUCCESS;
const static int64_t WAIT_TIMEOUT_MS = 10;
bool is_any_cb_fail = false;
// try submit remian palf_buf if exist data not submit to palf.
if (OB_FAIL(submit_to_palf_())) {
DDLOG(WARN, "try submit remain palf_buf to palf failed", KR(ret));
}
RETRY_FUNC_ON_ERROR(OB_TIMEOUT, stop_flag, *this, wait_palf_callback_, WAIT_TIMEOUT_MS, is_any_cb_fail, stop_flag);
if (OB_SUCC(ret) && is_dump_success) {
if (OB_FAIL(wait_palf_callback_(is_any_log_callback_fail, stop_flag))) {
DDLOG(WARN, "wait palf_callback failed", KR(ret), K(is_dump_success), K(is_any_log_callback_fail), K(stop_flag));
} else if (is_dump_success && ! is_any_log_callback_fail) {
if (OB_UNLIKELY(! start_lsn_.is_valid())
|| OB_UNLIKELY(! end_lsn_.is_valid())) {
ret = OB_STATE_NOT_MATCH;
@ -179,6 +215,8 @@ int ObDataDictStorage::finish(
}
}
reset_buf_(); // reset palf_buf and dict_buf anyway.
return ret;
}
@ -337,6 +375,41 @@ int ObDataDictStorage::parse_dict_metas(
return ret;
}
int ObDataDictStorage::prepare_buf_()
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id_ || ! is_user_tenant(tenant_id_))) {
ret = OB_STATE_NOT_MATCH;
DDLOG(WARN, "data_dict_service only work for user_tenant", KR(ret), K_(tenant_id));
} else if (OB_NOT_NULL(palf_buf_) || OB_NOT_NULL(dict_buf_)) {
ret = OB_ERR_UNEXPECTED;
DDLOG(WARN, "expect invalid palf_buf and dict_buf before prepare dump data_dict", KR(ret),
KP_(palf_buf), KP_(dict_buf));
} else {
palf_buf_len_ = DEFAULT_PALF_BUF_SIZE;
dict_buf_len_ = DEFAULT_DICT_BUF_SIZE;
palf_buf_ = static_cast<char*>(ob_dict_malloc(palf_buf_len_, tenant_id_));
dict_buf_ = static_cast<char*>(ob_dict_malloc(dict_buf_len_ , tenant_id_));
}
return ret;
}
void ObDataDictStorage::reset_buf_()
{
if (OB_NOT_NULL(palf_buf_)) {
ob_dict_free(palf_buf_);
palf_buf_len_ = 0;
palf_buf_ = NULL;
}
if (OB_NOT_NULL(dict_buf_)) {
ob_dict_free(dict_buf_);
dict_buf_len_ = 0;
dict_buf_ = NULL;
}
}
int ObDataDictStorage::serialize_log_base_header_()
{
int ret = OB_SUCCESS;
@ -383,6 +456,43 @@ int ObDataDictStorage::prepare_dict_buf_(const int64_t required_size)
return ret;
}
template<class DATA_DICT_META>
int ObDataDictStorage::serialize_to_palf_buf_(
const ObDictMetaHeader &header,
const DATA_DICT_META &data_dict)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(palf_buf_)) {
ret = OB_STATE_NOT_MATCH;
DDLOG(WARN, "palf_buf shoule be valid", KR(ret));
} else if (palf_pos_ == 0) {
if (OB_FAIL(serialize_log_base_header_())) {
DDLOG(WARN, "serialize_log_base_header_ failed", KR(ret), K_(palf_pos));
}
}
if (OB_FAIL(ret)) {
} else if (OB_FAIL(header.serialize(palf_buf_, palf_buf_len_, palf_pos_))) {
DDLOG(WARN, "serialize header to palf_buf failed", KR(ret), K(header),
K_(palf_buf_len), K_(palf_pos), "header_serialize_size", header.get_serialize_size());
} else if (OB_FAIL(data_dict.serialize(palf_buf_, palf_buf_len_, palf_pos_))) {
DDLOG(WARN, "serialize data_dict to palf_buf failed", KR(ret), K(header), K(data_dict),
K_(palf_buf_len), K_(palf_pos), "dict_serialize_size", data_dict.get_serialize_size());
} else {
DDLOG(DEBUG, "serialize data_dict to palf_buf success", KR(ret), K(header), K(data_dict),
K_(palf_buf_len), K_(palf_pos),
"header_size", header.get_serialize_size(),
"data_dict_size", data_dict.get_serialize_size());
}
return ret;
}
template int ObDataDictStorage::serialize_to_palf_buf_(const ObDictMetaHeader &header, const ObDictTenantMeta &data_dict);
template int ObDataDictStorage::serialize_to_palf_buf_(const ObDictMetaHeader &header, const ObDictDatabaseMeta &data_dict);
template int ObDataDictStorage::serialize_to_palf_buf_(const ObDictMetaHeader &header, const ObDictTableMeta &data_dict);
int ObDataDictStorage::segment_dict_buf_to_palf_(ObDictMetaHeader &header)
{
int ret = OB_SUCCESS;
@ -477,9 +587,8 @@ int ObDataDictStorage::submit_to_palf_()
K_(palf_buf), K_(palf_pos), K(ref_scn), K(need_nonblock), K(lsn), K(submit_scn));
} else if (OB_FAIL(update_palf_lsn_(lsn))) {
DDLOG(WARN, "update_palf_lsn_ failed", KR(ret), K(lsn), K_(start_lsn), K_(end_lsn));
} else if (OB_FAIL(cb_queue_.push(callback))) {
DDLOG(WARN, "push callback to callback_queue failed", KR(ret));
} else {
cb_queue_.push(callback);
// submit to palf success
DDLOG(DEBUG, "submit palf_buf to palf succ", K(lsn), K(submit_scn), K_(palf_pos));
total_log_cnt_++;
@ -523,56 +632,82 @@ int ObDataDictStorage::update_palf_lsn_(const palf::LSN &lsn)
return ret;
}
int ObDataDictStorage::wait_palf_callback_(const int64_t timeout_msec, bool &is_any_cb_fail, volatile bool &stop_flag)
// invoke after all data_dict dumped into palf.
int ObDataDictStorage::wait_palf_callback_(bool &is_any_cb_fail, volatile bool &stop_flag)
{
int ret = OB_SUCCESS;
static int64_t SLEEP_MS = 1;
static const int64_t CHECK_CB_INTERVAL = 10 * _MSEC_;
static const int64_t PRINT_CB_STATUS_INTERVAL = 5 * _SEC_; // print callback status interval when callback not all invoked
is_any_cb_fail = false;
bool is_all_invoked = true;
bool has_cb_on_fail = false;
bool is_all_cb_invoked = true;
bool print_cb_status = false;
// exit loop if all callback invoked or any callback fail
while (OB_SUCC(ret) && (! is_all_invoked && ! is_any_cb_fail) && ! stop_flag) {
if (REACH_TIME_INTERVAL_THREAD_LOCAL(timeout_msec)) {
ret = OB_TIMEOUT;
} else if (OB_FAIL(check_callback_list_(is_all_invoked, has_cb_on_fail))) {
DDLOG(WARN, "check_callback_list_ failed", KR(ret), K(is_all_invoked), K(has_cb_on_fail));
} else {
is_any_cb_fail = has_cb_on_fail;
if (! is_any_cb_fail && ! is_all_invoked) {
usleep(SLEEP_MS);
// exit loop if any of below cases occur: (1) check_callback_list_ failed; (2) all log_callback invoked.
do {
if (OB_FAIL(check_callback_list_(is_all_cb_invoked, is_any_cb_fail, print_cb_status, stop_flag))) {
DDLOG(WARN, "check_callback_list_ failed", KR(ret));
} else if (! is_all_cb_invoked) {
if (REACH_TIME_INTERVAL_THREAD_LOCAL(PRINT_CB_STATUS_INTERVAL)) {
print_cb_status = true;
}
usleep(CHECK_CB_INTERVAL);
}
}
} while (OB_SUCC(ret) && ! is_all_cb_invoked);
return ret;
}
int ObDataDictStorage::check_callback_list_(bool &is_all_invoked, bool &has_cb_on_fail)
// invoke after all data_dict dumped into palf.
int ObDataDictStorage::check_callback_list_(
bool &is_all_invoked,
bool &has_cb_on_fail,
bool &need_print_cb_status,
volatile bool &stop_flag)
{
int ret = OB_SUCCESS;
is_all_invoked = true;
has_cb_on_fail = false;
QLink *item = NULL;
QLink *item = cb_queue_.top();
// for stat
int64_t total_cb_count = 0;
int64_t not_invoked_cb_count = 0;
int64_t failed_cb_count = 0;
if (OB_FAIL(cb_queue_.top(item))) {
DDLOG(WARN, "get head of palf_cb_queue failed", KR(ret));
} else {
while (OB_SUCC(ret) && is_all_invoked && !has_cb_on_fail && OB_NOT_NULL(item)) {
QLink *next = item->next_;
ObDataDictPersistCallback *cb = static_cast<ObDataDictPersistCallback*> (item);
if (OB_ISNULL(cb)) {
ret = OB_ERR_UNEXPECTED;
DDLOG(WARN, "convert ObLink to ObDataDictPersistCallback failed", KR(ret), K(item));
} else {
if (! cb->is_invoked()) {
is_all_invoked = false;
} else if (! cb->is_success()) {
has_cb_on_fail = true;
}
while (OB_SUCC(ret) && OB_NOT_NULL(item)) {
total_cb_count++;
QLink *next = item->next_;
ObDataDictPersistCallback *cb = static_cast<ObDataDictPersistCallback*>(item);
if (OB_ISNULL(cb)) {
ret = OB_ERR_UNEXPECTED;
DDLOG(WARN, "convert ObLink to ObDataDictPersistCallback failed", KR(ret), K(item));
} else {
if (! cb->is_invoked()) {
not_invoked_cb_count++;
} else if (! cb->is_success()) {
failed_cb_count++;
}
item = next;
}
item = next;
}
if (not_invoked_cb_count > 0) {
is_all_invoked = false;
}
if (failed_cb_count > 0) {
has_cb_on_fail = true;
}
if (is_all_invoked || need_print_cb_status) {
// log callback status, NOTICE: stop_flag may set if ls role change or tenant stop.
DDLOG(INFO, "[STAT] callbacks_status", KR(ret), K(total_cb_count), K(not_invoked_cb_count), K(failed_cb_count),
K(is_all_invoked), K(need_print_cb_status), K(stop_flag));
}
if (need_print_cb_status) {
need_print_cb_status = false;
}
return ret;
@ -581,9 +716,9 @@ int ObDataDictStorage::check_callback_list_(bool &is_all_invoked, bool &has_cb_o
void ObDataDictStorage::reset_cb_queue_()
{
int ret = OB_SUCCESS;
while (! cb_queue_.is_empty()) {
while (! cb_queue_.empty()) {
QLink *item = NULL;
if (OB_FAIL(cb_queue_.pop(item))) {
if (OB_ISNULL(item = cb_queue_.pop())) {
DDLOG(WARN, "pop item from data_dict_meta persist_callback_queue failed", KR(ret));
} else {
allocator_.free(item);

View File

@ -44,49 +44,12 @@ public:
template<class DATA_DICT_META>
int handle_dict_meta(
const DATA_DICT_META &data_dict_meta,
ObDictMetaHeader &header)
{
int ret = OB_SUCCESS;
const int64_t dict_serialize_size = data_dict_meta.get_serialize_size();
// serialize_header
header.set_snapshot_scn(snapshot_scn_);
header.set_dict_serialize_length(dict_serialize_size);
header.set_storage_type(ObDictMetaStorageType::FULL);
const int64_t header_serialize_size = header.get_serialize_size();
const int64_t total_serialize_size = dict_serialize_size
+ header_serialize_size
+ log_base_header_.get_serialize_size();
if (! need_new_palf_buf_(total_serialize_size)) {
if (OB_FAIL(serialize_to_palf_buf_(header, data_dict_meta))) {
DDLOG(WARN, "serialize header_and_dict to palf_buf_ failed", KR(ret), K(header), K(data_dict_meta));
}
} else if (OB_FAIL(submit_to_palf_())) {
DDLOG(WARN, "submit_data_dict_to_palf_ failed", KR(ret), K_(palf_buf_len), K_(palf_pos));
} else if (! need_new_palf_buf_(total_serialize_size)) {
// check if palf_buf_len is enough for header + data_dict.
if (OB_FAIL(serialize_to_palf_buf_(header, data_dict_meta))) {
DDLOG(WARN, "serialize header_and_dict to palf_buf_ failed", KR(ret), K(header), K(data_dict_meta));
}
} else if (OB_FAIL(prepare_dict_buf_(dict_serialize_size))) {
DDLOG(WARN, "prepare_dict_buf_ failed", KR(ret), K(dict_serialize_size), K_(dict_buf_len), K_(dict_pos));
} else if (OB_FAIL(data_dict_meta.serialize(dict_buf_, dict_buf_len_, dict_pos_))) {
DDLOG(WARN, "serialize data_dict_meta to dict_buf failed", KR(ret),
K(dict_serialize_size), K_(dict_buf_len), K_(dict_pos));
} else if (OB_FAIL(segment_dict_buf_to_palf_(header))) {
DDLOG(WARN, "segment_dict_buf_to_palf_ failed", KR(ret), K(header), K_(dict_buf_len), K_(dict_pos), K_(palf_pos));
}
if (OB_SUCC(ret)) {
DDLOG(TRACE, "handle data_dict success", K(header), K(data_dict_meta));
}
return ret;
}
ObDictMetaHeader &header);
int finish(
palf::LSN &start_lsn,
palf::LSN &end_lsn,
bool is_dump_success,
bool &is_any_log_callback_fail,
volatile bool &stop_flag);
public:
// generate data_dict_meta for specified schemas, and serialize metas into buf, which is allocated
@ -121,6 +84,8 @@ protected:
// protected only for unittest.
virtual int submit_to_palf_();
private:
int prepare_buf_();
void reset_buf_();
OB_INLINE bool need_new_palf_buf_(const int64_t required_size) const
{ return palf_buf_len_ - palf_pos_ < required_size; }
int serialize_log_base_header_();
@ -128,35 +93,7 @@ private:
template<class DATA_DICT_META>
int serialize_to_palf_buf_(
const ObDictMetaHeader &header,
const DATA_DICT_META &data_dict)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(palf_buf_)) {
ret = OB_STATE_NOT_MATCH;
DDLOG(WARN, "palf_buf shoule be valid", KR(ret));
} else if (palf_pos_ == 0) {
if (OB_FAIL(serialize_log_base_header_())) {
DDLOG(WARN, "serialize_log_base_header_ failed", KR(ret), K_(palf_pos));
}
}
if (OB_FAIL(ret)) {
} else if (OB_FAIL(header.serialize(palf_buf_, palf_buf_len_, palf_pos_))) {
DDLOG(WARN, "serialize header to palf_buf failed", KR(ret), K(header),
K_(palf_buf_len), K_(palf_pos), "header_serialize_size", header.get_serialize_size());
} else if (OB_FAIL(data_dict.serialize(palf_buf_, palf_buf_len_, palf_pos_))) {
DDLOG(WARN, "serialize data_dict to palf_buf failed", KR(ret), K(header), K(data_dict),
K_(palf_buf_len), K_(palf_pos), "dict_serialize_size", data_dict.get_serialize_size());
} else {
DDLOG(DEBUG, "serialize data_dict to palf_buf success", KR(ret), K(header), K(data_dict),
K_(palf_buf_len), K_(palf_pos),
"header_size", header.get_serialize_size(),
"data_dict_size", data_dict.get_serialize_size());
}
return ret;
}
const DATA_DICT_META &data_dict);
int segment_dict_buf_to_palf_(ObDictMetaHeader &header);
int alloc_palf_cb_(ObDataDictPersistCallback *&callback);
@ -164,8 +101,12 @@ private:
// @param bool is_any_cb_fail true if any callback failed.
// @retval OB_SUCCESS all callback invoked or has any callback failed.
// @revval OB_TIMEOUT timeout while waiting callback invoke.
int wait_palf_callback_(const int64_t timeout_msec, bool &is_any_cb_fail, volatile bool &stop_flag);
int check_callback_list_(bool &is_all_invoked, bool &has_cb_on_fail);
int wait_palf_callback_(bool &is_any_cb_fail, volatile bool &stop_flag);
int check_callback_list_(
bool &is_all_invoked,
bool &has_cb_on_fail,
bool &need_print_cb_status,
volatile bool &stop_flag);
void reset_cb_queue_();
private:
static const int64_t DEFAULT_PALF_BUF_SIZE;
@ -180,7 +121,7 @@ private:
palf::LSN end_lsn_;
logservice::ObLogHandler *log_handler_;
logservice::ObLogBaseHeader log_base_header_;
ObSpLinkQueue cb_queue_;
ObSpScLinkQueue cb_queue_;
char *palf_buf_; // tmp buf for serialize and deserialize with palf
char *dict_buf_; // dict_buf
int64_t palf_buf_len_; // palf_buf_len

View File

@ -92,8 +92,6 @@ int ObLogTenant::init(const uint64_t tenant_id,
LOG_ERROR("invalid argument", K(tenant_id), K(tenant_name), K(start_tstamp_ns), K(start_seq),
K(start_schema_version), K(cf_handle));
ret = OB_INVALID_ARGUMENT;
} else if (OB_FAIL(ObMallocAllocator::get_instance()->create_and_add_tenant_allocator(tenant_id))) {
LOG_ERROR("create and add tenant allocator failed", K(ret), K(tenant_id));
} else if (OB_ISNULL(task_queue_ = OB_NEW(ObLogTenantTaskQueue, ObModIds::OB_LOG_TENANT_TASK_QUEUE, *this))) {
LOG_ERROR("create task queue fail", K(task_queue_));
ret = OB_ALLOCATE_MEMORY_FAILED;

View File

@ -525,6 +525,8 @@ int ObLogTenantMgr::add_tenant(
LOG_ERROR("invalid arguments", KR(ret), K(tenant_id), K(sys_schema_version));
} else if (is_meta_tenant(tenant_id)) {
LOG_INFO("won't add meta tenant", K(tenant_id), K(sys_schema_version));
} else if (OB_FAIL(ObMallocAllocator::get_instance()->create_and_add_tenant_allocator(tenant_id))) {
LOG_ERROR("create and add tenant allocator failed", K(ret), K(tenant_id));
} else {
if (is_online_refresh_mode(refresh_mode_)) {
TenantSchemaInfo tenant_schema_info;
@ -645,6 +647,10 @@ int ObLogTenantMgr::add_tenant(
// and other unexpected case, otherwise global_heartbeat will be stucked.)
try_del_tenant_start_ddl_info_(tenant_id);
if (! add_tenant_succ) {
ObMallocAllocator::get_instance()->recycle_tenant_allocator(tenant_id);
}
return ret;
}

View File

@ -1310,7 +1310,7 @@ DEF_BOOL(_enable_tenant_leak_memory_protection, OB_CLUSTER_PARAMETER, "True", "p
DEF_TIME(_advance_checkpoint_timeout, OB_CLUSTER_PARAMETER, "30m", "[10s,180m]",
"the timeout for backup/migrate advance checkpoint Range: [10s,180m]",
ObParameterAttr(Section::ROOT_SERVICE, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
DEF_TIME(dump_data_dictionary_to_log_interval, OB_TENANT_PARAMETER, "24h", "(0s,]",
DEF_TIME(dump_data_dictionary_to_log_interval, OB_TENANT_PARAMETER, "5m", "(0s,]",
"data dictionary dump to log(SYS LS) interval"
"Range: (0s,+∞)",
ObParameterAttr(Section::TENANT, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));