fix ddl mgr key hash conflict detected invalid.
This commit is contained in:
@ -95,7 +95,7 @@ int ObTenantDirectLoadMgr::init()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
const uint64_t tenant_id = MTL_ID();
|
||||
const int64_t bucket_num = 1000L * 100L; // 10w
|
||||
const int64_t bucket_num = common::hash::cal_next_prime(1000L * 100L);
|
||||
const int64_t memory_limit = 1024L * 1024L * 1024L * 10L; // 10GB
|
||||
lib::ObMemAttr attr(tenant_id, "TenantDLMgr");
|
||||
if (OB_UNLIKELY(is_inited_)) {
|
||||
@ -245,15 +245,27 @@ int ObTenantDirectLoadMgr::try_create_tablet_direct_load_mgr(
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
direct_load_mgr_handle.reset();
|
||||
ObTabletDirectLoadExecContextId exec_id;
|
||||
ObTabletDirectLoadExecContext exec_context;
|
||||
exec_id.tablet_id_ = mgr_key.tablet_id_;
|
||||
exec_id.context_id_ = context_id;
|
||||
ObSArray<uint64_t> all_hash_array;
|
||||
ObMultiBucketLockGuard lock_guard(bucket_lock_, true/*is_write_lock*/);
|
||||
const bool need_set_exec_ctx = !is_lob_tablet_mgr && context_id >= 0;
|
||||
if (OB_UNLIKELY(!is_inited_)) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("not init", K(ret));
|
||||
} else if (OB_UNLIKELY(!mgr_key.is_valid()) || execution_id < 0) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid arg", K(ret), K(mgr_key), K(execution_id));
|
||||
} else if (OB_FAIL(all_hash_array.push_back(mgr_key.hash()))) {
|
||||
LOG_WARN("push back failed", K(ret), K(mgr_key));
|
||||
} else if (need_set_exec_ctx && OB_FAIL(all_hash_array.push_back(exec_id.hash()))) {
|
||||
LOG_WARN("push back failed", K(ret), K(exec_id));
|
||||
} else if (OB_FAIL(lock_guard.lock_multi_buckets(all_hash_array))) {
|
||||
LOG_WARN("lock mult buckets failed", K(ret));
|
||||
} else {
|
||||
ObTabletDirectLoadMgr *direct_load_mgr = nullptr;
|
||||
ObBucketHashWLockGuard guard(bucket_lock_, mgr_key.hash());
|
||||
if (OB_FAIL(get_tablet_mgr_no_lock(mgr_key, direct_load_mgr_handle))) {
|
||||
if (OB_ENTRY_NOT_EXIST == ret) {
|
||||
ret = OB_SUCCESS;
|
||||
@ -299,11 +311,7 @@ int ObTenantDirectLoadMgr::try_create_tablet_direct_load_mgr(
|
||||
}
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret) && context_id >= 0 && !is_lob_tablet_mgr) { // only build execution context map for data tablet
|
||||
ObTabletDirectLoadExecContextId exec_id;
|
||||
ObTabletDirectLoadExecContext exec_context;
|
||||
exec_id.tablet_id_ = mgr_key.tablet_id_;
|
||||
exec_id.context_id_ = context_id;
|
||||
if (OB_SUCC(ret) && need_set_exec_ctx) { // only build execution context map for data tablet
|
||||
exec_context.execution_id_ = execution_id;
|
||||
exec_context.start_scn_.reset();
|
||||
if (OB_FAIL(tablet_exec_context_map_.set_refactored(exec_id, exec_context, true /*overwrite*/))) {
|
||||
@ -356,8 +364,7 @@ int ObTenantDirectLoadMgr::open_tablet_direct_load(
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret)) {
|
||||
ObBucketHashRLockGuard guard(bucket_lock_, mgr_key.hash());
|
||||
if (OB_FAIL(tablet_exec_context_map_.get_refactored(exec_id, exec_context))) {
|
||||
if (OB_FAIL(get_tablet_exec_context_with_rlock(exec_id, exec_context))) {
|
||||
LOG_WARN("get table execution context failed", K(ret), K(exec_id));
|
||||
}
|
||||
}
|
||||
@ -369,7 +376,7 @@ int ObTenantDirectLoadMgr::open_tablet_direct_load(
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret)) {
|
||||
ObBucketHashWLockGuard guard(bucket_lock_, mgr_key.hash());
|
||||
ObBucketHashWLockGuard guard(bucket_lock_, exec_id.hash());
|
||||
exec_context.start_scn_ = start_scn;
|
||||
if (OB_FAIL(tablet_exec_context_map_.set_refactored(exec_id, exec_context, true/*overwrite*/))) {
|
||||
LOG_WARN("get table execution context failed", K(ret), K(exec_id));
|
||||
@ -437,7 +444,7 @@ int ObTenantDirectLoadMgr::fill_sstable_slice(
|
||||
} else {
|
||||
LOG_WARN("get table mgr failed", K(ret), K(slice_info));
|
||||
}
|
||||
} else if (OB_FAIL(tablet_exec_context_map_.get_refactored(exec_id, exec_context))) {
|
||||
} else if (OB_FAIL(get_tablet_exec_context_with_rlock(exec_id, exec_context))) {
|
||||
LOG_WARN("get tablet execution context failed", K(ret));
|
||||
} else if (OB_FAIL(handle.get_obj()->fill_sstable_slice(slice_info, exec_context.start_scn_, iter, affected_rows, insert_monitor))) {
|
||||
if (OB_TRANS_COMMITED == ret && slice_info.is_full_direct_load_) {
|
||||
@ -482,7 +489,7 @@ int ObTenantDirectLoadMgr::fill_lob_sstable_slice(
|
||||
} else {
|
||||
LOG_WARN("get table mgr failed", K(ret), K(slice_info));
|
||||
}
|
||||
} else if (OB_FAIL(tablet_exec_context_map_.get_refactored(exec_id, exec_context))) {
|
||||
} else if (OB_FAIL(get_tablet_exec_context_with_rlock(exec_id, exec_context))) {
|
||||
LOG_WARN("get tablet execution context failed", K(ret));
|
||||
} else if (OB_FAIL(handle.get_obj()->fill_lob_sstable_slice(allocator, slice_info, exec_context.start_scn_, pk_interval, lob_column_idxs, col_types, datum_row))) {
|
||||
LOG_WARN("fail to fill batch sstable slice", KR(ret), K(slice_info), K(datum_row));
|
||||
@ -624,7 +631,7 @@ int ObTenantDirectLoadMgr::close_sstable_slice(
|
||||
} else {
|
||||
LOG_WARN("get table mgr failed", K(ret), K(slice_info));
|
||||
}
|
||||
} else if (OB_FAIL(tablet_exec_context_map_.get_refactored(exec_id, exec_context))) {
|
||||
} else if (OB_FAIL(get_tablet_exec_context_with_rlock(exec_id, exec_context))) {
|
||||
LOG_WARN("get tablet execution context failed", K(ret));
|
||||
} else if (OB_FAIL(handle.get_obj()->close_sstable_slice(
|
||||
slice_info.is_lob_slice_/*is_data_tablet_process_for_lob*/,
|
||||
@ -673,13 +680,8 @@ int ObTenantDirectLoadMgr::close_tablet_direct_load(
|
||||
} else {
|
||||
if (need_commit) {
|
||||
ObTabletDirectLoadExecContext exec_context;
|
||||
{
|
||||
ObBucketHashRLockGuard guard(bucket_lock_, mgr_key.hash());
|
||||
if (OB_FAIL(tablet_exec_context_map_.get_refactored(exec_id, exec_context))) {
|
||||
LOG_WARN("get tablet execution context failed", K(ret));
|
||||
}
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
if (OB_FAIL(get_tablet_exec_context_with_rlock(exec_id, exec_context))) {
|
||||
LOG_WARN("get exec context failed", K(ret), K(exec_id));
|
||||
} else if (OB_FAIL(handle.get_obj()->close(exec_context.execution_id_, exec_context.start_scn_))) {
|
||||
LOG_WARN("close failed", K(ret));
|
||||
}
|
||||
@ -692,7 +694,7 @@ int ObTenantDirectLoadMgr::close_tablet_direct_load(
|
||||
// But how to notify the follower to remove it, with write commit failed log or tablet gc task ??
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
ObBucketHashWLockGuard guard(bucket_lock_, mgr_key.hash());
|
||||
ObBucketHashWLockGuard guard(bucket_lock_, exec_id.hash());
|
||||
if (OB_FAIL(tablet_exec_context_map_.erase_refactored(exec_id))) {
|
||||
LOG_WARN("erase tablet execution context failed", K(ret), K(exec_id));
|
||||
} else {
|
||||
@ -735,12 +737,11 @@ int ObTenantDirectLoadMgr::get_tablet_cache_interval(
|
||||
} else if (OB_FAIL(autoinc_service.get_tablet_cache_interval(MTL_ID(), interval))) {
|
||||
LOG_WARN("failed to get tablet cache intervals", K(ret));
|
||||
} else {
|
||||
ObTabletDirectLoadMgrKey mgr_key(tablet_id, true/*full direct load*/); // only support in ddl, which is full direct load
|
||||
ObBucketHashWLockGuard guard(bucket_lock_, mgr_key.hash());
|
||||
ObTabletDirectLoadExecContext exec_context;
|
||||
ObTabletDirectLoadExecContextId exec_id;
|
||||
exec_id.tablet_id_ = tablet_id;
|
||||
exec_id.context_id_ = context_id;
|
||||
ObBucketHashWLockGuard guard(bucket_lock_, exec_id.hash());
|
||||
if (OB_FAIL(tablet_exec_context_map_.get_refactored(exec_id, exec_context))) {
|
||||
LOG_WARN("get tablet execution context failed", K(ret));
|
||||
} else {
|
||||
@ -963,6 +964,27 @@ int ObTenantDirectLoadMgr::get_tablet_mgr_no_lock(
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTenantDirectLoadMgr::get_tablet_exec_context_with_rlock(
|
||||
const ObTabletDirectLoadExecContextId &exec_id,
|
||||
ObTabletDirectLoadExecContext &exec_context)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
exec_context.reset();
|
||||
if (OB_UNLIKELY(!is_inited_)) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("not init", K(ret));
|
||||
} else if (OB_UNLIKELY(!exec_id.is_valid())) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid argument", K(ret), K(exec_id));
|
||||
} else {
|
||||
ObBucketHashRLockGuard guard(bucket_lock_, exec_id.hash());
|
||||
if (OB_FAIL(tablet_exec_context_map_.get_refactored(exec_id, exec_context))) {
|
||||
LOG_WARN("get refactored failed", K(ret), K(exec_id));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTenantDirectLoadMgr::GetGcCandidateOp::operator() (common::hash::HashMapPair<ObTabletDirectLoadMgrKey, ObTabletDirectLoadMgr *> &kv)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
@ -230,6 +230,9 @@ private:
|
||||
const int64_t task_id = 0,
|
||||
const int64_t table_id = common::OB_INVALID_ID,
|
||||
const int64_t execution_id = -1);
|
||||
int get_tablet_exec_context_with_rlock(
|
||||
const ObTabletDirectLoadExecContextId &exec_id,
|
||||
ObTabletDirectLoadExecContext &exec_context);
|
||||
int remove_tablet_direct_load_nolock(const ObTabletDirectLoadMgrKey &mgr_key);
|
||||
// to generate unique slice id for slice writer, putting here is just to
|
||||
// simplify the logic of the tablet_direct_load_mgr.
|
||||
|
||||
@ -626,7 +626,7 @@ public:
|
||||
return tablet_id_.hash() + murmurhash(&context_id_, sizeof(context_id_), 0);
|
||||
}
|
||||
int hash(uint64_t &hash_val) const {hash_val = hash(); return OB_SUCCESS;}
|
||||
bool is_valid() const { return context_id_ >= 0; }
|
||||
bool is_valid() const { return tablet_id_.is_valid() && context_id_ >= 0; }
|
||||
bool operator == (const ObTabletDirectLoadExecContextId &other) const {
|
||||
return tablet_id_ == other.tablet_id_ && context_id_ == other.context_id_; }
|
||||
TO_STRING_KV(K_(tablet_id), K_(context_id));
|
||||
@ -641,7 +641,12 @@ public:
|
||||
ObTabletDirectLoadExecContext()
|
||||
: start_scn_(), execution_id_(0), seq_interval_task_id_(0)
|
||||
{}
|
||||
~ObTabletDirectLoadExecContext() = default;
|
||||
~ObTabletDirectLoadExecContext() { reset(); }
|
||||
void reset() {
|
||||
start_scn_.reset();
|
||||
execution_id_ = 0;
|
||||
seq_interval_task_id_ = 0;
|
||||
}
|
||||
TO_STRING_KV(K_(start_scn), K_(execution_id), K_(seq_interval_task_id));
|
||||
public:
|
||||
share::SCN start_scn_;
|
||||
|
||||
Reference in New Issue
Block a user