the callback of ddl commit log set commit scn of both data and lob tablet direct load mgr
This commit is contained in:
@ -263,7 +263,7 @@ int ObDDLMacroBlockClogCb::on_failure()
|
||||
}
|
||||
|
||||
ObDDLCommitClogCb::ObDDLCommitClogCb()
|
||||
: is_inited_(false), status_(), ls_id_(), tablet_id_(), start_scn_(SCN::min_scn()), lock_tid_(0), direct_load_mgr_handle_()
|
||||
: is_inited_(false), status_(), ls_id_(), tablet_id_(), start_scn_(SCN::min_scn()), lock_tid_(0), direct_load_mgr_handle_(), lob_direct_load_mgr_handle_()
|
||||
{
|
||||
|
||||
}
|
||||
@ -272,18 +272,21 @@ int ObDDLCommitClogCb::init(const share::ObLSID &ls_id,
|
||||
const common::ObTabletID &tablet_id,
|
||||
const share::SCN &start_scn,
|
||||
const uint32_t lock_tid,
|
||||
ObTabletDirectLoadMgrHandle &direct_load_mgr_handle)
|
||||
ObTabletDirectLoadMgrHandle &direct_load_mgr_handle,
|
||||
ObTabletDirectLoadMgrHandle &lob_direct_load_mgr_handle)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_UNLIKELY(is_inited_)) {
|
||||
ret = OB_INIT_TWICE;
|
||||
LOG_WARN("init twice", K(ret));
|
||||
} else if (OB_FAIL(direct_load_mgr_handle_.assign(direct_load_mgr_handle))) {
|
||||
LOG_WARN("assign handle failed", K(ret));
|
||||
} else if (OB_UNLIKELY(!ls_id.is_valid() || !tablet_id.is_valid() || !start_scn.is_valid_and_not_min()
|
||||
|| 0 == lock_tid)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid argument", K(ret), K(ls_id), K(tablet_id), K(start_scn), K(lock_tid));
|
||||
} else if (OB_FAIL(direct_load_mgr_handle_.assign(direct_load_mgr_handle))) {
|
||||
LOG_WARN("assign handle failed", K(ret));
|
||||
} else if (OB_FAIL(lob_direct_load_mgr_handle_.assign(lob_direct_load_mgr_handle))) {
|
||||
LOG_WARN("assign handle failed", K(ret));
|
||||
} else {
|
||||
ls_id_ = ls_id;
|
||||
tablet_id_ = tablet_id;
|
||||
@ -306,7 +309,11 @@ int ObDDLCommitClogCb::on_success()
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unexpected error", K(ret), K(tablet_id_));
|
||||
} else {
|
||||
data_direct_load_mgr->set_commit_scn_nolock(__get_scn());
|
||||
const SCN commit_scn = __get_scn();
|
||||
data_direct_load_mgr->set_commit_scn_nolock(commit_scn);
|
||||
if (lob_direct_load_mgr_handle_.is_valid()) {
|
||||
lob_direct_load_mgr_handle_.get_full_obj()->set_commit_scn_nolock(commit_scn);
|
||||
}
|
||||
data_direct_load_mgr->unlock(lock_tid_);
|
||||
}
|
||||
status_.set_ret_code(ret);
|
||||
|
||||
@ -144,7 +144,8 @@ public:
|
||||
const common::ObTabletID &tablet_id,
|
||||
const share::SCN &start_scn,
|
||||
const uint32_t lock_tid,
|
||||
ObTabletDirectLoadMgrHandle &direct_load_mgr_handle);
|
||||
ObTabletDirectLoadMgrHandle &direct_load_mgr_handle,
|
||||
ObTabletDirectLoadMgrHandle &lob_direct_load_mgr_handle);
|
||||
virtual int on_success() override;
|
||||
virtual int on_failure() override;
|
||||
inline bool is_success() const { return status_.is_success(); }
|
||||
@ -161,6 +162,7 @@ private:
|
||||
share::SCN start_scn_;
|
||||
uint32_t lock_tid_;
|
||||
ObTabletDirectLoadMgrHandle direct_load_mgr_handle_;
|
||||
ObTabletDirectLoadMgrHandle lob_direct_load_mgr_handle_;
|
||||
};
|
||||
|
||||
class ObDDLClogHeader final
|
||||
|
||||
@ -863,6 +863,7 @@ int ObDDLRedoLogWriter::local_write_ddl_commit_log(
|
||||
const share::ObLSID &ls_id,
|
||||
ObLogHandler *log_handler,
|
||||
ObTabletDirectLoadMgrHandle &direct_load_mgr_handle,
|
||||
ObTabletDirectLoadMgrHandle &lob_direct_load_mgr_handle,
|
||||
ObDDLCommitLogHandle &handle,
|
||||
uint32_t &lock_tid)
|
||||
{
|
||||
@ -890,7 +891,7 @@ if (OB_ISNULL(buffer = static_cast<char *>(ob_malloc(buffer_size, ObMemAttr(MTL_
|
||||
} else if (OB_ISNULL(cb = op_alloc(ObDDLCommitClogCb))) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
LOG_WARN("fail to alloc memory", K(ret));
|
||||
} else if (OB_FAIL(cb->init(ls_id, log.get_table_key().tablet_id_, log.get_start_scn(), lock_tid, direct_load_mgr_handle))) {
|
||||
} else if (OB_FAIL(cb->init(ls_id, log.get_table_key().tablet_id_, log.get_start_scn(), lock_tid, direct_load_mgr_handle, lob_direct_load_mgr_handle))) {
|
||||
LOG_WARN("init ddl commit log callback failed", K(ret), K(ls_id), K(log));
|
||||
} else if (OB_FAIL(base_header.serialize(buffer, buffer_size, pos))) {
|
||||
LOG_WARN("failed to serialize log base header", K(ret));
|
||||
@ -1301,8 +1302,25 @@ int ObDDLRedoLogWriter::write_commit_log(
|
||||
commit_scn = direct_load_mgr_handle.get_obj()->get_commit_scn(tablet_handle.get_obj()->get_tablet_meta());
|
||||
LOG_WARN("already committed", K(ret), K(start_scn), K(commit_scn), K(direct_load_mgr_handle.get_obj()->get_start_scn()), K(log));
|
||||
} else if (!remote_write_) {
|
||||
if (OB_FAIL(local_write_ddl_commit_log(
|
||||
log, ObDDLClogType::DDL_COMMIT_LOG, ls_id_, ls->get_log_handler(), direct_load_mgr_handle, handle, lock_tid))) {
|
||||
ObTabletBindingMdsUserData ddl_data;
|
||||
ObTabletDirectLoadMgrHandle lob_direct_load_mgr_handle;
|
||||
if (OB_FAIL(tablet_handle.get_obj()->ObITabletMdsInterface::get_ddl_data(share::SCN::max_scn(), ddl_data))) {
|
||||
LOG_WARN("failed to get ddl data from tablet", K(ret), K(tablet_handle));
|
||||
} else if (ddl_data.lob_meta_tablet_id_.is_valid()) {
|
||||
bool is_lob_major_sstable_exist = false;
|
||||
if (OB_FAIL(MTL(ObTenantDirectLoadMgr *)->get_tablet_mgr_and_check_major(ls_id_, ddl_data.lob_meta_tablet_id_,
|
||||
true/* is_full_direct_load */, lob_direct_load_mgr_handle, is_lob_major_sstable_exist))) {
|
||||
if (OB_ENTRY_NOT_EXIST == ret && is_lob_major_sstable_exist) {
|
||||
ret = OB_SUCCESS;
|
||||
LOG_INFO("lob meta tablet exist major sstable, skip", K(ret), K(ddl_data.lob_meta_tablet_id_));
|
||||
} else {
|
||||
LOG_WARN("get tablet mgr failed", K(ret), K(ddl_data.lob_meta_tablet_id_));
|
||||
}
|
||||
}
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (OB_FAIL(local_write_ddl_commit_log(
|
||||
log, ObDDLClogType::DDL_COMMIT_LOG, ls_id_, ls->get_log_handler(), direct_load_mgr_handle, lob_direct_load_mgr_handle, handle, lock_tid))) {
|
||||
if (ObDDLUtil::need_remote_write(ret) && allow_remote_write) {
|
||||
if (OB_FAIL(switch_to_remote_write())) {
|
||||
LOG_WARN("fail to switch to remote write", K(ret), K(table_key));
|
||||
|
||||
@ -302,6 +302,7 @@ private:
|
||||
const share::ObLSID &ls_id,
|
||||
logservice::ObLogHandler *log_handler,
|
||||
ObTabletDirectLoadMgrHandle &direct_load_mgr_handle,
|
||||
ObTabletDirectLoadMgrHandle &lob_direct_load_mgr_handle,
|
||||
ObDDLCommitLogHandle &handle,
|
||||
uint32_t &lock_tid);
|
||||
int remote_write_ddl_commit_redo(
|
||||
|
||||
@ -315,11 +315,10 @@ int ObTabletDirectLoadMgrHandle::assign(const ObTabletDirectLoadMgrHandle &other
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
reset();
|
||||
if (OB_UNLIKELY(!other.is_valid())) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid arguments", K(ret), K(other));
|
||||
} else if (OB_FAIL(set_obj(other.tablet_mgr_))) {
|
||||
if (OB_LIKELY(other.is_valid())) {
|
||||
if (OB_FAIL(set_obj(other.tablet_mgr_))) {
|
||||
LOG_WARN("set obj failed", K(ret));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user