diff --git a/src/storage/ddl/ob_direct_insert_sstable_ctx_new.cpp b/src/storage/ddl/ob_direct_insert_sstable_ctx_new.cpp index f99c433c3f..027c08f58d 100644 --- a/src/storage/ddl/ob_direct_insert_sstable_ctx_new.cpp +++ b/src/storage/ddl/ob_direct_insert_sstable_ctx_new.cpp @@ -95,7 +95,7 @@ int ObTenantDirectLoadMgr::init() { int ret = OB_SUCCESS; const uint64_t tenant_id = MTL_ID(); - const int64_t bucket_num = 1000L * 100L; // 10w + const int64_t bucket_num = common::hash::cal_next_prime(1000L * 100L); const int64_t memory_limit = 1024L * 1024L * 1024L * 10L; // 10GB lib::ObMemAttr attr(tenant_id, "TenantDLMgr"); if (OB_UNLIKELY(is_inited_)) { @@ -245,15 +245,27 @@ int ObTenantDirectLoadMgr::try_create_tablet_direct_load_mgr( { int ret = OB_SUCCESS; direct_load_mgr_handle.reset(); + ObTabletDirectLoadExecContextId exec_id; + ObTabletDirectLoadExecContext exec_context; + exec_id.tablet_id_ = mgr_key.tablet_id_; + exec_id.context_id_ = context_id; + ObSArray all_hash_array; + ObMultiBucketLockGuard lock_guard(bucket_lock_, true/*is_write_lock*/); + const bool need_set_exec_ctx = !is_lob_tablet_mgr && context_id >= 0; if (OB_UNLIKELY(!is_inited_)) { ret = OB_NOT_INIT; LOG_WARN("not init", K(ret)); } else if (OB_UNLIKELY(!mgr_key.is_valid()) || execution_id < 0) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid arg", K(ret), K(mgr_key), K(execution_id)); + } else if (OB_FAIL(all_hash_array.push_back(mgr_key.hash()))) { + LOG_WARN("push back failed", K(ret), K(mgr_key)); + } else if (need_set_exec_ctx && OB_FAIL(all_hash_array.push_back(exec_id.hash()))) { + LOG_WARN("push back failed", K(ret), K(exec_id)); + } else if (OB_FAIL(lock_guard.lock_multi_buckets(all_hash_array))) { + LOG_WARN("lock mult buckets failed", K(ret)); } else { ObTabletDirectLoadMgr *direct_load_mgr = nullptr; - ObBucketHashWLockGuard guard(bucket_lock_, mgr_key.hash()); if (OB_FAIL(get_tablet_mgr_no_lock(mgr_key, direct_load_mgr_handle))) { if (OB_ENTRY_NOT_EXIST == ret) { ret = OB_SUCCESS; @@ -299,11 +311,7 @@ int ObTenantDirectLoadMgr::try_create_tablet_direct_load_mgr( } } } - if (OB_SUCC(ret) && context_id >= 0 && !is_lob_tablet_mgr) { // only build execution context map for data tablet - ObTabletDirectLoadExecContextId exec_id; - ObTabletDirectLoadExecContext exec_context; - exec_id.tablet_id_ = mgr_key.tablet_id_; - exec_id.context_id_ = context_id; + if (OB_SUCC(ret) && need_set_exec_ctx) { // only build execution context map for data tablet exec_context.execution_id_ = execution_id; exec_context.start_scn_.reset(); if (OB_FAIL(tablet_exec_context_map_.set_refactored(exec_id, exec_context, true /*overwrite*/))) { @@ -356,8 +364,7 @@ int ObTenantDirectLoadMgr::open_tablet_direct_load( } if (OB_SUCC(ret)) { - ObBucketHashRLockGuard guard(bucket_lock_, mgr_key.hash()); - if (OB_FAIL(tablet_exec_context_map_.get_refactored(exec_id, exec_context))) { + if (OB_FAIL(get_tablet_exec_context_with_rlock(exec_id, exec_context))) { LOG_WARN("get table execution context failed", K(ret), K(exec_id)); } } @@ -369,7 +376,7 @@ int ObTenantDirectLoadMgr::open_tablet_direct_load( } if (OB_SUCC(ret)) { - ObBucketHashWLockGuard guard(bucket_lock_, mgr_key.hash()); + ObBucketHashWLockGuard guard(bucket_lock_, exec_id.hash()); exec_context.start_scn_ = start_scn; if (OB_FAIL(tablet_exec_context_map_.set_refactored(exec_id, exec_context, true/*overwrite*/))) { LOG_WARN("get table execution context failed", K(ret), K(exec_id)); @@ -437,7 +444,7 @@ int ObTenantDirectLoadMgr::fill_sstable_slice( } else { LOG_WARN("get table mgr failed", K(ret), K(slice_info)); } - } else if (OB_FAIL(tablet_exec_context_map_.get_refactored(exec_id, exec_context))) { + } else if (OB_FAIL(get_tablet_exec_context_with_rlock(exec_id, exec_context))) { LOG_WARN("get tablet execution context failed", K(ret)); } else if (OB_FAIL(handle.get_obj()->fill_sstable_slice(slice_info, exec_context.start_scn_, iter, affected_rows, insert_monitor))) { if (OB_TRANS_COMMITED == ret && slice_info.is_full_direct_load_) { @@ -482,7 +489,7 @@ int ObTenantDirectLoadMgr::fill_lob_sstable_slice( } else { LOG_WARN("get table mgr failed", K(ret), K(slice_info)); } - } else if (OB_FAIL(tablet_exec_context_map_.get_refactored(exec_id, exec_context))) { + } else if (OB_FAIL(get_tablet_exec_context_with_rlock(exec_id, exec_context))) { LOG_WARN("get tablet execution context failed", K(ret)); } else if (OB_FAIL(handle.get_obj()->fill_lob_sstable_slice(allocator, slice_info, exec_context.start_scn_, pk_interval, lob_column_idxs, col_types, datum_row))) { LOG_WARN("fail to fill batch sstable slice", KR(ret), K(slice_info), K(datum_row)); @@ -624,7 +631,7 @@ int ObTenantDirectLoadMgr::close_sstable_slice( } else { LOG_WARN("get table mgr failed", K(ret), K(slice_info)); } - } else if (OB_FAIL(tablet_exec_context_map_.get_refactored(exec_id, exec_context))) { + } else if (OB_FAIL(get_tablet_exec_context_with_rlock(exec_id, exec_context))) { LOG_WARN("get tablet execution context failed", K(ret)); } else if (OB_FAIL(handle.get_obj()->close_sstable_slice( slice_info.is_lob_slice_/*is_data_tablet_process_for_lob*/, @@ -673,13 +680,8 @@ int ObTenantDirectLoadMgr::close_tablet_direct_load( } else { if (need_commit) { ObTabletDirectLoadExecContext exec_context; - { - ObBucketHashRLockGuard guard(bucket_lock_, mgr_key.hash()); - if (OB_FAIL(tablet_exec_context_map_.get_refactored(exec_id, exec_context))) { - LOG_WARN("get tablet execution context failed", K(ret)); - } - } - if (OB_FAIL(ret)) { + if (OB_FAIL(get_tablet_exec_context_with_rlock(exec_id, exec_context))) { + LOG_WARN("get exec context failed", K(ret), K(exec_id)); } else if (OB_FAIL(handle.get_obj()->close(exec_context.execution_id_, exec_context.start_scn_))) { LOG_WARN("close failed", K(ret)); } @@ -692,7 +694,7 @@ int ObTenantDirectLoadMgr::close_tablet_direct_load( // But how to notify the follower to remove it, with write commit failed log or tablet gc task ?? } if (OB_SUCC(ret)) { - ObBucketHashWLockGuard guard(bucket_lock_, mgr_key.hash()); + ObBucketHashWLockGuard guard(bucket_lock_, exec_id.hash()); if (OB_FAIL(tablet_exec_context_map_.erase_refactored(exec_id))) { LOG_WARN("erase tablet execution context failed", K(ret), K(exec_id)); } else { @@ -735,12 +737,11 @@ int ObTenantDirectLoadMgr::get_tablet_cache_interval( } else if (OB_FAIL(autoinc_service.get_tablet_cache_interval(MTL_ID(), interval))) { LOG_WARN("failed to get tablet cache intervals", K(ret)); } else { - ObTabletDirectLoadMgrKey mgr_key(tablet_id, true/*full direct load*/); // only support in ddl, which is full direct load - ObBucketHashWLockGuard guard(bucket_lock_, mgr_key.hash()); ObTabletDirectLoadExecContext exec_context; ObTabletDirectLoadExecContextId exec_id; exec_id.tablet_id_ = tablet_id; exec_id.context_id_ = context_id; + ObBucketHashWLockGuard guard(bucket_lock_, exec_id.hash()); if (OB_FAIL(tablet_exec_context_map_.get_refactored(exec_id, exec_context))) { LOG_WARN("get tablet execution context failed", K(ret)); } else { @@ -963,6 +964,27 @@ int ObTenantDirectLoadMgr::get_tablet_mgr_no_lock( return ret; } +int ObTenantDirectLoadMgr::get_tablet_exec_context_with_rlock( + const ObTabletDirectLoadExecContextId &exec_id, + ObTabletDirectLoadExecContext &exec_context) +{ + int ret = OB_SUCCESS; + exec_context.reset(); + if (OB_UNLIKELY(!is_inited_)) { + ret = OB_NOT_INIT; + LOG_WARN("not init", K(ret)); + } else if (OB_UNLIKELY(!exec_id.is_valid())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", K(ret), K(exec_id)); + } else { + ObBucketHashRLockGuard guard(bucket_lock_, exec_id.hash()); + if (OB_FAIL(tablet_exec_context_map_.get_refactored(exec_id, exec_context))) { + LOG_WARN("get refactored failed", K(ret), K(exec_id)); + } + } + return ret; +} + int ObTenantDirectLoadMgr::GetGcCandidateOp::operator() (common::hash::HashMapPair &kv) { int ret = OB_SUCCESS; diff --git a/src/storage/ddl/ob_direct_insert_sstable_ctx_new.h b/src/storage/ddl/ob_direct_insert_sstable_ctx_new.h index ad0dfcce05..bb2037c5f9 100644 --- a/src/storage/ddl/ob_direct_insert_sstable_ctx_new.h +++ b/src/storage/ddl/ob_direct_insert_sstable_ctx_new.h @@ -230,6 +230,9 @@ private: const int64_t task_id = 0, const int64_t table_id = common::OB_INVALID_ID, const int64_t execution_id = -1); + int get_tablet_exec_context_with_rlock( + const ObTabletDirectLoadExecContextId &exec_id, + ObTabletDirectLoadExecContext &exec_context); int remove_tablet_direct_load_nolock(const ObTabletDirectLoadMgrKey &mgr_key); // to generate unique slice id for slice writer, putting here is just to // simplify the logic of the tablet_direct_load_mgr. diff --git a/src/storage/ddl/ob_direct_load_struct.h b/src/storage/ddl/ob_direct_load_struct.h index 3fac4d474a..ff0862d9a8 100644 --- a/src/storage/ddl/ob_direct_load_struct.h +++ b/src/storage/ddl/ob_direct_load_struct.h @@ -626,7 +626,7 @@ public: return tablet_id_.hash() + murmurhash(&context_id_, sizeof(context_id_), 0); } int hash(uint64_t &hash_val) const {hash_val = hash(); return OB_SUCCESS;} - bool is_valid() const { return context_id_ >= 0; } + bool is_valid() const { return tablet_id_.is_valid() && context_id_ >= 0; } bool operator == (const ObTabletDirectLoadExecContextId &other) const { return tablet_id_ == other.tablet_id_ && context_id_ == other.context_id_; } TO_STRING_KV(K_(tablet_id), K_(context_id)); @@ -641,7 +641,12 @@ public: ObTabletDirectLoadExecContext() : start_scn_(), execution_id_(0), seq_interval_task_id_(0) {} - ~ObTabletDirectLoadExecContext() = default; + ~ObTabletDirectLoadExecContext() { reset(); } + void reset() { + start_scn_.reset(); + execution_id_ = 0; + seq_interval_task_id_ = 0; + } TO_STRING_KV(K_(start_scn), K_(execution_id), K_(seq_interval_task_id)); public: share::SCN start_scn_;