use the latest start scn to allocate ddl kv.
This commit is contained in:
@ -378,7 +378,6 @@ int ObComplementDataContext::write_start_log(const ObComplementDataParam ¶m)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObITable::TableKey hidden_table_key;
|
||||
SCN start_scn;
|
||||
if (OB_UNLIKELY(!is_inited_)) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("ObComplementDataContext not init", K(ret));
|
||||
@ -396,8 +395,11 @@ int ObComplementDataContext::write_start_log(const ObComplementDataParam ¶m)
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unexpected err", K(ret), K(MTL_ID()));
|
||||
} else if (OB_FAIL(tenant_direct_load_mgr->open_tablet_direct_load(true, /*is_full_direct_load*/
|
||||
param.dest_ls_id_, param.dest_tablet_id_, context_id_, start_scn, tablet_direct_load_mgr_handle_))) {
|
||||
param.dest_ls_id_, param.dest_tablet_id_, context_id_, start_scn_, tablet_direct_load_mgr_handle_))) {
|
||||
LOG_WARN("write ddl start log failed", K(ret));
|
||||
} else if (OB_UNLIKELY(!start_scn_.is_valid_and_not_min())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("invalid start scn", K(ret), K(start_scn_));
|
||||
}
|
||||
LOG_INFO("complement task start ddl redo success", K(ret), K(param));
|
||||
}
|
||||
@ -1367,10 +1369,13 @@ int ObComplementWriteTask::append_row(ObScan *scan)
|
||||
} else if (OB_UNLIKELY(!direct_load_hdl.get_full_obj()->get_start_scn().is_valid_and_not_min())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unexpected err", K(ret), K(direct_load_hdl.get_full_obj()->get_start_scn()));
|
||||
} else if (OB_UNLIKELY(context_->start_scn_ != direct_load_hdl.get_full_obj()->get_start_scn())) {
|
||||
ret = OB_TASK_EXPIRED;
|
||||
LOG_WARN("task expired", K(ret), K(context_->start_scn_), "start_scn", direct_load_hdl.get_full_obj()->get_start_scn());
|
||||
} else if (OB_FAIL(callback.init(DDL_MB_DATA_TYPE,
|
||||
hidden_table_key,
|
||||
param_->task_id_,
|
||||
direct_load_hdl.get_full_obj()->get_start_scn(),
|
||||
context_->start_scn_,
|
||||
param_->data_format_version_,
|
||||
&sstable_redo_writer))) {
|
||||
LOG_WARN("fail to init data callback", K(ret), K(hidden_table_key));
|
||||
|
||||
@ -121,7 +121,7 @@ public:
|
||||
ObComplementDataContext():
|
||||
is_inited_(false), is_major_sstable_exist_(false), complement_data_ret_(common::OB_SUCCESS),
|
||||
allocator_("CompleteDataCtx", OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID()), lock_(ObLatchIds::COMPLEMENT_DATA_CONTEXT_LOCK), concurrent_cnt_(0),
|
||||
data_sstable_redo_writer_(), index_builder_(nullptr), tablet_direct_load_mgr_handle_(), row_scanned_(0), row_inserted_(0), context_id_(0)
|
||||
data_sstable_redo_writer_(), index_builder_(nullptr), start_scn_(share::SCN::min_scn()), tablet_direct_load_mgr_handle_(), row_scanned_(0), row_inserted_(0), context_id_(0)
|
||||
{}
|
||||
~ObComplementDataContext() { destroy(); }
|
||||
int init(const ObComplementDataParam ¶m, const blocksstable::ObDataStoreDesc &desc);
|
||||
@ -133,7 +133,8 @@ public:
|
||||
const share::ObLSID &ls_id,
|
||||
const common::ObTabletID &tablet_id,
|
||||
bool &is_commited);
|
||||
TO_STRING_KV(K_(is_inited), K_(complement_data_ret), K_(concurrent_cnt), KP_(index_builder), K_(tablet_direct_load_mgr_handle), K_(row_scanned), K_(row_inserted));
|
||||
TO_STRING_KV(K_(is_inited), K_(complement_data_ret), K_(concurrent_cnt), KP_(index_builder),
|
||||
K_(start_scn), K_(tablet_direct_load_mgr_handle), K_(row_scanned), K_(row_inserted));
|
||||
public:
|
||||
bool is_inited_;
|
||||
bool is_major_sstable_exist_;
|
||||
@ -143,6 +144,7 @@ public:
|
||||
int64_t concurrent_cnt_;
|
||||
ObDDLRedoLogWriter data_sstable_redo_writer_;
|
||||
blocksstable::ObSSTableIndexBuilder *index_builder_;
|
||||
share::SCN start_scn_;
|
||||
ObTabletDirectLoadMgrHandle tablet_direct_load_mgr_handle_;
|
||||
int64_t row_scanned_;
|
||||
int64_t row_inserted_;
|
||||
|
||||
@ -244,7 +244,7 @@ int ObDDLMacroBlockClogCb::on_success()
|
||||
const int64_t snapshot_version = redo_info_.table_key_.get_snapshot_version();
|
||||
const uint64_t data_format_version = redo_info_.data_format_version_;
|
||||
if (OB_FAIL(ObDDLKVPendingGuard::set_macro_block(tablet_handle_.get_obj(), macro_block,
|
||||
snapshot_version, data_format_version))) {
|
||||
snapshot_version, data_format_version, direct_load_mgr_handle_))) {
|
||||
LOG_WARN("set macro block into ddl kv failed", K(ret), K(tablet_handle_), K(macro_block),
|
||||
K(snapshot_version), K(data_format_version));
|
||||
}
|
||||
|
||||
@ -299,11 +299,11 @@ int ObDDLRedoReplayExecutor::do_replay_(ObTabletHandle &tablet_handle)
|
||||
const ObITable::TableKey &table_key = redo_info.table_key_;
|
||||
bool is_major_sstable_exist = false;
|
||||
uint64_t data_format_version = redo_info.data_format_version_;
|
||||
if (data_format_version <= 0) {
|
||||
|
||||
// to upgrade from lower version without `data_format_version` in redo log,
|
||||
// use data_format_version in start log instead.
|
||||
ObTenantDirectLoadMgr *tenant_direct_load_mgr = MTL(ObTenantDirectLoadMgr *);
|
||||
ObTabletDirectLoadMgrHandle direct_load_mgr_handle;
|
||||
ObTenantDirectLoadMgr *tenant_direct_load_mgr = MTL(ObTenantDirectLoadMgr *);
|
||||
if (OB_ISNULL(tenant_direct_load_mgr)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unexpected err", K(ret));
|
||||
@ -314,23 +314,24 @@ int ObDDLRedoReplayExecutor::do_replay_(ObTabletHandle &tablet_handle)
|
||||
direct_load_mgr_handle,
|
||||
is_major_sstable_exist))) {
|
||||
if (OB_ENTRY_NOT_EXIST == ret && is_major_sstable_exist) {
|
||||
need_replay = false;
|
||||
ret = OB_SUCCESS;
|
||||
LOG_INFO("major sstable already exist", K(ret), K(scn_), K(table_key));
|
||||
LOG_INFO("major sstable already exist, ship replay", K(ret), K(scn_), K(table_key));
|
||||
} else {
|
||||
LOG_WARN("get tablet mgr failed", K(ret), K(table_key));
|
||||
}
|
||||
} else {
|
||||
} else if (data_format_version <= 0) {
|
||||
data_format_version = direct_load_mgr_handle.get_obj()->get_data_format_version();
|
||||
}
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (OB_FAIL(ObDDLKVPendingGuard::set_macro_block(tablet_handle.get_obj(), macro_block,
|
||||
snapshot_version, data_format_version))) {
|
||||
if (OB_SUCC(ret) && need_replay) {
|
||||
if (OB_FAIL(ObDDLKVPendingGuard::set_macro_block(tablet_handle.get_obj(), macro_block,
|
||||
snapshot_version, data_format_version, direct_load_mgr_handle))) {
|
||||
LOG_WARN("set macro block into ddl kv failed", K(ret), K(tablet_handle), K(macro_block),
|
||||
K(snapshot_version), K(data_format_version));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
FLOG_INFO("finish replay ddl redo log", K(ret), K(need_replay), KPC_(log), K(macro_block), "ddl_event_info", ObDDLEventInfo());
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -162,20 +162,20 @@ void ObDDLKVHandle::reset()
|
||||
}
|
||||
}
|
||||
|
||||
ObDDLKVPendingGuard::ObDDLKVPendingGuard(ObTablet *tablet, const SCN &start_scn, const SCN &scn,
|
||||
const int64_t snapshot_version, const uint64_t data_format_version)
|
||||
ObDDLKVPendingGuard::ObDDLKVPendingGuard(ObTablet *tablet, const SCN &scn,
|
||||
ObTabletDirectLoadMgrHandle &direct_load_mgr_handle)
|
||||
: tablet_(tablet), scn_(scn), kv_handle_(), ret_(OB_SUCCESS)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObDDLKV *curr_kv = nullptr;
|
||||
ObDDLKvMgrHandle ddl_kv_mgr_handle;
|
||||
if (OB_UNLIKELY(nullptr == tablet || !scn.is_valid_and_not_min())) {
|
||||
if (OB_UNLIKELY(nullptr == tablet || !scn.is_valid_and_not_min() || !direct_load_mgr_handle.is_valid())) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid arguments", K(ret), KP(tablet), K(scn));
|
||||
LOG_WARN("invalid arguments", K(ret), KP(tablet), K(scn), KPC(direct_load_mgr_handle.get_obj()));
|
||||
} else if (OB_FAIL(tablet->get_ddl_kv_mgr(ddl_kv_mgr_handle))) {
|
||||
LOG_WARN("get ddl kv mgr failed", K(ret));
|
||||
} else if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->get_or_create_ddl_kv(start_scn, scn,
|
||||
snapshot_version, data_format_version, kv_handle_))) {
|
||||
} else if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->get_or_create_ddl_kv(
|
||||
scn, direct_load_mgr_handle, kv_handle_))) {
|
||||
LOG_WARN("acquire ddl kv failed", K(ret));
|
||||
} else if (OB_ISNULL(curr_kv = kv_handle_.get_obj())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
@ -219,7 +219,8 @@ int ObDDLKVPendingGuard::set_macro_block(
|
||||
ObTablet *tablet,
|
||||
const ObDDLMacroBlock ¯o_block,
|
||||
const int64_t snapshot_version,
|
||||
const uint64_t data_format_version)
|
||||
const uint64_t data_format_version,
|
||||
ObTabletDirectLoadMgrHandle &direct_load_mgr_handle)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
static const int64_t MAX_RETRY_COUNT = 10;
|
||||
@ -230,8 +231,7 @@ int ObDDLKVPendingGuard::set_macro_block(
|
||||
int64_t try_count = 0;
|
||||
while ((OB_SUCCESS == ret || OB_EAGAIN == ret) && try_count < MAX_RETRY_COUNT) {
|
||||
ObDDLKV *ddl_kv = nullptr;
|
||||
ObDDLKVPendingGuard guard(tablet, macro_block.ddl_start_scn_, macro_block.scn_,
|
||||
snapshot_version, data_format_version);
|
||||
ObDDLKVPendingGuard guard(tablet, macro_block.scn_, direct_load_mgr_handle);
|
||||
if (OB_FAIL(guard.get_ddl_kv(ddl_kv))) {
|
||||
LOG_WARN("get ddl kv failed", K(ret));
|
||||
} else if (OB_ISNULL(ddl_kv)) {
|
||||
|
||||
@ -82,7 +82,7 @@ private:
|
||||
|
||||
|
||||
class ObTablet;
|
||||
|
||||
class ObTabletDirectLoadMgrHandle;
|
||||
class ObDDLKVPendingGuard final
|
||||
{
|
||||
public:
|
||||
@ -90,10 +90,13 @@ public:
|
||||
ObTablet *tablet,
|
||||
const ObDDLMacroBlock ¯o_block,
|
||||
const int64_t snapshot_version,
|
||||
const uint64_t data_format_version);
|
||||
const uint64_t data_format_version,
|
||||
ObTabletDirectLoadMgrHandle &direct_load_mgr_handle);
|
||||
public:
|
||||
ObDDLKVPendingGuard(ObTablet *tablet, const share::SCN &start_scn, const share::SCN &scn,
|
||||
const int64_t snapshot_version, const uint64_t data_format_version);
|
||||
ObDDLKVPendingGuard(
|
||||
ObTablet *tablet,
|
||||
const share::SCN &scn,
|
||||
ObTabletDirectLoadMgrHandle &direct_load_mgr_handle);
|
||||
~ObDDLKVPendingGuard();
|
||||
int get_ret() const { return ret_; }
|
||||
int get_ddl_kv(ObDDLKV *&kv);
|
||||
@ -101,7 +104,6 @@ public:
|
||||
TO_STRING_KV(KP(tablet_), K(scn_), K(kv_handle_), K(ret_));
|
||||
private:
|
||||
ObTablet *tablet_;
|
||||
share::SCN start_scn_;
|
||||
share::SCN scn_;
|
||||
ObDDLKVHandle kv_handle_;
|
||||
int ret_;
|
||||
|
||||
@ -1887,6 +1887,19 @@ int ObTabletDirectLoadMgr::wrlock(const int64_t timeout_us, uint32_t &tid)
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTabletDirectLoadMgr::rdlock(const int64_t timeout_us, uint32_t &tid)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
const int64_t abs_timeout_us = timeout_us + ObTimeUtility::current_time();
|
||||
if (OB_SUCC(lock_.rdlock(ObLatchIds::TABLET_DIRECT_LOAD_MGR_LOCK, abs_timeout_us))) {
|
||||
tid = static_cast<uint32_t>(GETTID());
|
||||
}
|
||||
if (OB_TIMEOUT == ret) {
|
||||
ret = OB_EAGAIN;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
void ObTabletDirectLoadMgr::unlock(const uint32_t tid)
|
||||
{
|
||||
if (OB_SUCCESS != lock_.unlock(&tid)) {
|
||||
|
||||
@ -340,6 +340,7 @@ public:
|
||||
// virtual int get_online_stat_collect_result();
|
||||
|
||||
virtual int wrlock(const int64_t timeout_us, uint32_t &lock_tid);
|
||||
virtual int rdlock(const int64_t timeout_us, uint32_t &lock_tid);
|
||||
virtual void unlock(const uint32_t lock_tid);
|
||||
int prepare_index_builder_if_need(const ObTableSchema &table_schema);
|
||||
virtual int wait_notify(const ObDirectLoadSliceWriter *slice_writer, const share::SCN &start_scn);
|
||||
|
||||
@ -1107,6 +1107,9 @@ int ObDDLKV::set_macro_block(
|
||||
} else if (macro_block.scn_ > freeze_scn_) {
|
||||
ret = OB_EAGAIN;
|
||||
LOG_INFO("this ddl kv is freezed, retry other ddl kv", K(ret), K(ls_id_), K(tablet_id_), K(macro_block), K(freeze_scn_));
|
||||
} else if (OB_UNLIKELY(snapshot_version != snapshot_version_ || data_format_version != data_format_version_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unexpected error", K(ret), K(snapshot_version), K(data_format_version), KPC(this));
|
||||
} else {
|
||||
ObDDLMemtable *ddl_memtable = nullptr;
|
||||
// 1. try find the ddl memtable
|
||||
|
||||
@ -336,24 +336,26 @@ int ObTabletDDLKvMgr::get_active_ddl_kv_impl(ObDDLKVHandle &kv_handle)
|
||||
}
|
||||
|
||||
int ObTabletDDLKvMgr::get_or_create_ddl_kv(
|
||||
const share::SCN &start_scn,
|
||||
const share::SCN &scn,
|
||||
const int64_t snapshot_version,
|
||||
const uint64_t data_format_version,
|
||||
ObTabletDirectLoadMgrHandle &direct_load_mgr_handle,
|
||||
ObDDLKVHandle &kv_handle)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
kv_handle.reset();
|
||||
uint32_t direct_load_lock_tid = 0;
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("ObTabletDDLKvMgr is not inited", K(ret));
|
||||
} else if (!scn.is_valid_and_not_min()) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid argument", K(ret), K(scn));
|
||||
} else if (OB_FAIL(direct_load_mgr_handle.get_obj()->rdlock(TRY_LOCK_TIMEOUT/*10s*/, direct_load_lock_tid))) {
|
||||
// usually use the latest start scn to allocate kv.
|
||||
LOG_WARN("lock failed", K(ret));
|
||||
} else {
|
||||
uint32_t lock_tid = 0; // try lock to avoid hang in clog callback
|
||||
if (OB_FAIL(rdlock(TRY_LOCK_TIMEOUT, lock_tid))) {
|
||||
LOG_WARN("failed to rdlock", K(ret), K(start_scn), KPC(this));
|
||||
LOG_WARN("failed to rdlock", K(ret), KPC(this));
|
||||
} else {
|
||||
try_get_ddl_kv_unlock(scn, kv_handle);
|
||||
}
|
||||
@ -364,20 +366,25 @@ int ObTabletDDLKvMgr::get_or_create_ddl_kv(
|
||||
if (OB_SUCC(ret) && !kv_handle.is_valid()) {
|
||||
uint32_t lock_tid = 0; // try lock to avoid hang in clog callback
|
||||
if (OB_FAIL(wrlock(TRY_LOCK_TIMEOUT, lock_tid))) {
|
||||
LOG_WARN("failed to wrlock", K(ret), K(start_scn), KPC(this));
|
||||
LOG_WARN("failed to wrlock", K(ret), KPC(this));
|
||||
} else {
|
||||
try_get_ddl_kv_unlock(scn, kv_handle);
|
||||
if (kv_handle.is_valid()) {
|
||||
// do nothing
|
||||
} else if (OB_FAIL(alloc_ddl_kv(start_scn,
|
||||
snapshot_version, data_format_version, kv_handle))) {
|
||||
LOG_WARN("create ddl kv failed", K(ret));
|
||||
} else if (OB_FAIL(alloc_ddl_kv(direct_load_mgr_handle.get_obj()->get_start_scn(),
|
||||
direct_load_mgr_handle.get_obj()->get_table_key().get_snapshot_version(),
|
||||
direct_load_mgr_handle.get_obj()->get_data_format_version(),
|
||||
kv_handle))) {
|
||||
LOG_WARN("create ddl kv failed", K(ret), KPC(direct_load_mgr_handle.get_obj()));
|
||||
}
|
||||
}
|
||||
if (lock_tid != 0) {
|
||||
unlock(lock_tid);
|
||||
}
|
||||
}
|
||||
if (direct_load_lock_tid != 0) {
|
||||
direct_load_mgr_handle.get_obj()->unlock(direct_load_lock_tid);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
@ -40,10 +40,8 @@ public:
|
||||
int init(const share::ObLSID &ls_id, const common::ObTabletID &tablet_id); // init before memtable mgr
|
||||
int set_max_freeze_scn(const share::SCN &checkpoint_scn);
|
||||
int get_or_create_ddl_kv(
|
||||
const share::SCN &start_scn,
|
||||
const share::SCN &scn,
|
||||
const int64_t snapshot_version,
|
||||
const uint64_t data_format_version,
|
||||
ObTabletDirectLoadMgrHandle &direct_load_mgr_handle,
|
||||
ObDDLKVHandle &kv_handle); // used in active ddl kv guard
|
||||
int get_freezed_ddl_kv(const share::SCN &freeze_scn, ObDDLKVHandle &kv_handle); // locate ddl kv with exeact freeze log ts
|
||||
int get_ddl_kvs(const bool frozen_only, ObIArray<ObDDLKVHandle> &kv_handle_array); // get all freeze ddl kvs
|
||||
|
||||
Reference in New Issue
Block a user