set tablet_pool for replay
This commit is contained in:
@ -1819,6 +1819,7 @@ int ObLSTabletService::replay_create_tablet(
|
|||||||
ObTablet *tablet = nullptr;
|
ObTablet *tablet = nullptr;
|
||||||
int64_t pos = 0;
|
int64_t pos = 0;
|
||||||
ObMetaDiskAddr old_addr;
|
ObMetaDiskAddr old_addr;
|
||||||
|
ObTabletPoolType pool_type;
|
||||||
ObBucketHashWLockGuard lock_guard(bucket_lock_, tablet_id.hash());
|
ObBucketHashWLockGuard lock_guard(bucket_lock_, tablet_id.hash());
|
||||||
time_guard.click("Lock");
|
time_guard.click("Lock");
|
||||||
if (OB_FAIL(ObTabletCreateDeleteHelper::create_tmp_tablet(key, allocator, tablet_hdl))) {
|
if (OB_FAIL(ObTabletCreateDeleteHelper::create_tmp_tablet(key, allocator, tablet_hdl))) {
|
||||||
@ -1839,7 +1840,23 @@ int ObLSTabletService::replay_create_tablet(
|
|||||||
LOG_WARN("failed to init shared params", K(ret), K(ls_id), K(tablet_id));
|
LOG_WARN("failed to init shared params", K(ret), K(ls_id), K(tablet_id));
|
||||||
} else if (OB_FAIL(tablet_id_set_.set(tablet_id))) {
|
} else if (OB_FAIL(tablet_id_set_.set(tablet_id))) {
|
||||||
LOG_WARN("fail to set tablet id set", K(ret), K(tablet_id));
|
LOG_WARN("fail to set tablet id set", K(ret), K(tablet_id));
|
||||||
} else if (OB_FAIL(t3m->compare_and_swap_tablet(key, old_addr, disk_addr))) {
|
} else {
|
||||||
|
if (tablet->is_empty_shell()) {
|
||||||
|
pool_type = ObTabletPoolType::TP_NORMAL;
|
||||||
|
} else {
|
||||||
|
const int64_t try_cache_size = sizeof(ObTablet)
|
||||||
|
+ tablet->rowkey_read_info_->get_deep_copy_size();
|
||||||
|
if (try_cache_size > ObTenantMetaMemMgr::NORMAL_TABLET_POOL_SIZE) {
|
||||||
|
pool_type = ObTabletPoolType::TP_LARGE;
|
||||||
|
} else {
|
||||||
|
pool_type = ObTabletPoolType::TP_NORMAL;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (OB_FAIL(ret)) {
|
||||||
|
// do nothing
|
||||||
|
} else if (OB_FAIL(t3m->compare_and_swap_tablet(key, old_addr, disk_addr, pool_type, true /* whether to set tablet pool */))) {
|
||||||
LOG_WARN("fail to compare and swap tablat in t3m", K(ret), K(key), K(old_addr), K(disk_addr));
|
LOG_WARN("fail to compare and swap tablat in t3m", K(ret), K(key), K(old_addr), K(disk_addr));
|
||||||
} else if (FALSE_IT(time_guard.click("CASwap"))) {
|
} else if (FALSE_IT(time_guard.click("CASwap"))) {
|
||||||
} else if (OB_FAIL(tablet->check_and_set_initial_state())) {
|
} else if (OB_FAIL(tablet->check_and_set_initial_state())) {
|
||||||
|
|||||||
@ -39,6 +39,7 @@ public:
|
|||||||
int get_in_memory_obj(ObMetaObjGuard<T> &guard);
|
int get_in_memory_obj(ObMetaObjGuard<T> &guard);
|
||||||
void get_obj(ObMetaObjGuard<T> &guard);
|
void get_obj(ObMetaObjGuard<T> &guard);
|
||||||
|
|
||||||
|
void set_obj_pool(ObITenantMetaObjPool &obj_pool);
|
||||||
void set_obj(const ObMetaObjGuard<T> &guard);
|
void set_obj(const ObMetaObjGuard<T> &guard);
|
||||||
void set_addr_without_reset_obj(const ObMetaDiskAddr &addr);
|
void set_addr_without_reset_obj(const ObMetaDiskAddr &addr);
|
||||||
void set_addr_with_reset_obj(const ObMetaDiskAddr &addr);
|
void set_addr_with_reset_obj(const ObMetaDiskAddr &addr);
|
||||||
@ -281,6 +282,12 @@ bool ObMetaPointer<T>::is_in_memory() const
|
|||||||
return nullptr != obj_.ptr_;
|
return nullptr != obj_.ptr_;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
template <typename T>
|
||||||
|
void ObMetaPointer<T>::set_obj_pool(ObITenantMetaObjPool &obj_pool)
|
||||||
|
{
|
||||||
|
obj_.pool_ = &obj_pool;
|
||||||
|
}
|
||||||
|
|
||||||
template <typename T>
|
template <typename T>
|
||||||
void ObMetaPointer<T>::set_addr_without_reset_obj(const ObMetaDiskAddr &addr)
|
void ObMetaPointer<T>::set_addr_without_reset_obj(const ObMetaDiskAddr &addr)
|
||||||
{
|
{
|
||||||
|
|||||||
@ -54,10 +54,13 @@ public:
|
|||||||
ObMetaObjGuard<T> &new_guard);
|
ObMetaObjGuard<T> &new_guard);
|
||||||
// TIPS:
|
// TIPS:
|
||||||
// - only compare and swap pure address, but no reset object.
|
// - only compare and swap pure address, but no reset object.
|
||||||
|
// only used for replay and compat, others mustn't call this func
|
||||||
int compare_and_swap_address_without_object(
|
int compare_and_swap_address_without_object(
|
||||||
const Key &key,
|
const Key &key,
|
||||||
const ObMetaDiskAddr &old_addr,
|
const ObMetaDiskAddr &old_addr,
|
||||||
const ObMetaDiskAddr &new_addr);
|
const ObMetaDiskAddr &new_addr,
|
||||||
|
const bool set_pool /* whether to set pool */,
|
||||||
|
ObITenantMetaObjPool *pool);
|
||||||
template <typename Operator> int for_each_value_store(Operator &op);
|
template <typename Operator> int for_each_value_store(Operator &op);
|
||||||
int wash_meta_obj(const Key &key, ObMetaObjGuard<ObTablet> &guard, void *&free_obj);
|
int wash_meta_obj(const Key &key, ObMetaObjGuard<ObTablet> &guard, void *&free_obj);
|
||||||
int64_t count() const { return ResourceMap::map_.size(); }
|
int64_t count() const { return ResourceMap::map_.size(); }
|
||||||
@ -807,7 +810,9 @@ template <typename Key, typename T>
|
|||||||
int ObMetaPointerMap<Key, T>::compare_and_swap_address_without_object(
|
int ObMetaPointerMap<Key, T>::compare_and_swap_address_without_object(
|
||||||
const Key &key,
|
const Key &key,
|
||||||
const ObMetaDiskAddr &old_addr,
|
const ObMetaDiskAddr &old_addr,
|
||||||
const ObMetaDiskAddr &new_addr)
|
const ObMetaDiskAddr &new_addr,
|
||||||
|
const bool set_pool /* whether to set pool */,
|
||||||
|
ObITenantMetaObjPool *pool)
|
||||||
{
|
{
|
||||||
int ret = common::OB_SUCCESS;
|
int ret = common::OB_SUCCESS;
|
||||||
uint64_t hash_val = 0;
|
uint64_t hash_val = 0;
|
||||||
@ -816,9 +821,10 @@ int ObMetaPointerMap<Key, T>::compare_and_swap_address_without_object(
|
|||||||
if (OB_UNLIKELY(!key.is_valid()
|
if (OB_UNLIKELY(!key.is_valid()
|
||||||
|| !old_addr.is_valid()
|
|| !old_addr.is_valid()
|
||||||
|| !new_addr.is_valid()
|
|| !new_addr.is_valid()
|
||||||
|| new_addr.is_none())) {
|
|| new_addr.is_none()
|
||||||
|
|| (set_pool && nullptr == pool))) {
|
||||||
ret = common::OB_INVALID_ARGUMENT;
|
ret = common::OB_INVALID_ARGUMENT;
|
||||||
STORAGE_LOG(WARN, "invalid argument", K(ret), K(key), K(old_addr), K(new_addr));
|
STORAGE_LOG(WARN, "invalid argument", K(ret), K(key), K(old_addr), K(new_addr), K(set_pool), KP(pool));
|
||||||
} else if (OB_FAIL(ResourceMap::hash_func_(key, hash_val))) {
|
} else if (OB_FAIL(ResourceMap::hash_func_(key, hash_val))) {
|
||||||
STORAGE_LOG(WARN, "fail to calc hash", K(ret), K(key));
|
STORAGE_LOG(WARN, "fail to calc hash", K(ret), K(key));
|
||||||
} else {
|
} else {
|
||||||
@ -833,6 +839,9 @@ int ObMetaPointerMap<Key, T>::compare_and_swap_address_without_object(
|
|||||||
STORAGE_LOG(WARN, "old address has changed, need to get again", K(ret), KPC(t_ptr), K(old_addr));
|
STORAGE_LOG(WARN, "old address has changed, need to get again", K(ret), KPC(t_ptr), K(old_addr));
|
||||||
} else {
|
} else {
|
||||||
t_ptr->set_addr_with_reset_obj(new_addr);
|
t_ptr->set_addr_with_reset_obj(new_addr);
|
||||||
|
if (set_pool) {
|
||||||
|
t_ptr->set_obj_pool(*pool);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
|
|||||||
@ -1786,28 +1786,44 @@ int ObTenantMetaMemMgr::compare_and_swap_tablet(
|
|||||||
int ObTenantMetaMemMgr::compare_and_swap_tablet(
|
int ObTenantMetaMemMgr::compare_and_swap_tablet(
|
||||||
const ObTabletMapKey &key,
|
const ObTabletMapKey &key,
|
||||||
const ObMetaDiskAddr &old_addr,
|
const ObMetaDiskAddr &old_addr,
|
||||||
const ObMetaDiskAddr &new_addr)
|
const ObMetaDiskAddr &new_addr,
|
||||||
|
const ObTabletPoolType &pool_type,
|
||||||
|
const bool set_pool /* whether to set tablet pool */)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
bool is_exist = false;
|
bool is_exist = false;
|
||||||
if (OB_UNLIKELY(!is_inited_)) {
|
if (OB_UNLIKELY(!is_inited_)) {
|
||||||
ret = OB_NOT_INIT;
|
ret = OB_NOT_INIT;
|
||||||
LOG_WARN("ObTenantMetaMemMgr hasn't been initialized", K(ret));
|
LOG_WARN("ObTenantMetaMemMgr hasn't been initialized", K(ret));
|
||||||
} else if (OB_UNLIKELY(!key.is_valid() || !old_addr.is_valid() || !(new_addr.is_disked() || new_addr.is_memory()))) {
|
} else if (OB_UNLIKELY(!key.is_valid() || !old_addr.is_valid() || !(new_addr.is_disked()
|
||||||
|
|| new_addr.is_memory()) || (set_pool && ObTabletPoolType::TP_MAX == pool_type))) {
|
||||||
ret = OB_INVALID_ARGUMENT;
|
ret = OB_INVALID_ARGUMENT;
|
||||||
FLOG_WARN("invalid argument", K(ret), K(key), K(old_addr), K(new_addr));
|
FLOG_WARN("invalid argument", K(ret), K(key), K(old_addr), K(new_addr), K(set_pool), K(pool_type));
|
||||||
} else {
|
} else {
|
||||||
|
ObITenantMetaObjPool *pool = nullptr;
|
||||||
ObBucketHashWLockGuard lock_guard(bucket_lock_, key.hash());
|
ObBucketHashWLockGuard lock_guard(bucket_lock_, key.hash());
|
||||||
if (OB_FAIL(has_tablet(key, is_exist))) {
|
if (OB_FAIL(has_tablet(key, is_exist))) {
|
||||||
LOG_WARN("fail to check tablet is exist", K(ret), K(key));
|
LOG_WARN("fail to check tablet is exist", K(ret), K(key));
|
||||||
} else if (OB_UNLIKELY(!is_exist)) {
|
} else if (OB_UNLIKELY(!is_exist)) {
|
||||||
ret = OB_ENTRY_NOT_EXIST;
|
ret = OB_ENTRY_NOT_EXIST;
|
||||||
LOG_WARN("this tablet isn't exist in map", K(ret), K(key), K(is_exist));
|
LOG_WARN("this tablet isn't exist in map", K(ret), K(key), K(is_exist));
|
||||||
|
} else if (set_pool) {
|
||||||
|
if (ObTabletPoolType::TP_NORMAL == pool_type) {
|
||||||
|
pool = &tablet_buffer_pool_;
|
||||||
|
} else if (ObTabletPoolType::TP_LARGE == pool_type) {
|
||||||
|
pool = &large_tablet_buffer_pool_;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (OB_FAIL(ret)) {
|
||||||
|
// do nothing
|
||||||
} else if (OB_FAIL(tablet_map_.compare_and_swap_address_without_object(key,
|
} else if (OB_FAIL(tablet_map_.compare_and_swap_address_without_object(key,
|
||||||
old_addr,
|
old_addr,
|
||||||
new_addr))) {
|
new_addr,
|
||||||
|
set_pool,
|
||||||
|
pool))) {
|
||||||
LOG_WARN("fail to compare and swap tablet address in map", K(ret), K(key), K(old_addr),
|
LOG_WARN("fail to compare and swap tablet address in map", K(ret), K(key), K(old_addr),
|
||||||
K(new_addr));
|
K(new_addr), K(set_pool), KP(pool));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
LOG_DEBUG("compare and swap object", K(ret), K(old_addr), K(new_addr), K(lbt()));
|
LOG_DEBUG("compare and swap object", K(ret), K(old_addr), K(new_addr), K(lbt()));
|
||||||
|
|||||||
@ -177,7 +177,6 @@ public:
|
|||||||
void stop();
|
void stop();
|
||||||
void wait();
|
void wait();
|
||||||
void destroy();
|
void destroy();
|
||||||
|
|
||||||
int print_old_chain(
|
int print_old_chain(
|
||||||
const ObTabletMapKey &key,
|
const ObTabletMapKey &key,
|
||||||
const ObTabletPointer &tablet_ptr,
|
const ObTabletPointer &tablet_ptr,
|
||||||
@ -254,10 +253,13 @@ public:
|
|||||||
ObLSService &ls_service,
|
ObLSService &ls_service,
|
||||||
bool &is_released,
|
bool &is_released,
|
||||||
const char *module);
|
const char *module);
|
||||||
|
// only used for replay and compat, others mustn't call this func
|
||||||
int compare_and_swap_tablet(
|
int compare_and_swap_tablet(
|
||||||
const ObTabletMapKey &key,
|
const ObTabletMapKey &key,
|
||||||
const ObMetaDiskAddr &old_addr,
|
const ObMetaDiskAddr &old_addr,
|
||||||
const ObMetaDiskAddr &new_addr);
|
const ObMetaDiskAddr &new_addr,
|
||||||
|
const ObTabletPoolType &pool_type = ObTabletPoolType::TP_MAX,
|
||||||
|
const bool set_pool = false /* whether to set tablet pool */);
|
||||||
int compare_and_swap_tablet(
|
int compare_and_swap_tablet(
|
||||||
const ObTabletMapKey &key,
|
const ObTabletMapKey &key,
|
||||||
const ObTabletHandle &old_handle,
|
const ObTabletHandle &old_handle,
|
||||||
|
|||||||
@ -1319,15 +1319,15 @@ int ObTablet::load_deserialize_v2(
|
|||||||
const bool prepare_memtable)
|
const bool prepare_memtable)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
if (OB_FAIL(ObTabletObjLoadHelper::alloc_and_new(allocator, rowkey_read_info_))) {
|
if (new_pos - pos < length_ && OB_FAIL(tablet_meta_.deserialize(buf, len, new_pos))) {
|
||||||
LOG_WARN("fail to allocate and new rowkey read info", K(ret));
|
|
||||||
} else if (new_pos - pos < length_ && OB_FAIL(tablet_meta_.deserialize(buf, len, new_pos))) {
|
|
||||||
LOG_WARN("failed to deserialize tablet meta", K(ret), K(len), K(new_pos));
|
LOG_WARN("failed to deserialize tablet meta", K(ret), K(len), K(new_pos));
|
||||||
} else if (new_pos - pos < length_ && OB_FAIL(table_store_addr_.addr_.deserialize(buf, len, new_pos))) {
|
} else if (new_pos - pos < length_ && OB_FAIL(table_store_addr_.addr_.deserialize(buf, len, new_pos))) {
|
||||||
LOG_WARN("failed to deserialize table store addr", K(ret), K(len), K(new_pos));
|
LOG_WARN("failed to deserialize table store addr", K(ret), K(len), K(new_pos));
|
||||||
} else if (FALSE_IT(table_store_addr_.addr_.set_seq(tablet_addr_.seq()))) {
|
} else if (FALSE_IT(table_store_addr_.addr_.set_seq(tablet_addr_.seq()))) {
|
||||||
} else if (new_pos - pos < length_ && OB_FAIL(storage_schema_addr_.addr_.deserialize(buf, len, new_pos))) {
|
} else if (new_pos - pos < length_ && OB_FAIL(storage_schema_addr_.addr_.deserialize(buf, len, new_pos))) {
|
||||||
LOG_WARN("failed to deserialize storage schema addr", K(ret), K(len), K(new_pos));
|
LOG_WARN("failed to deserialize storage schema addr", K(ret), K(len), K(new_pos));
|
||||||
|
} else if (!is_empty_shell() && OB_FAIL(ObTabletObjLoadHelper::alloc_and_new(allocator, rowkey_read_info_))) {
|
||||||
|
LOG_WARN("fail to allocate and new rowkey read info", K(ret));
|
||||||
} else if (!is_empty_shell() && new_pos - pos < length_ && OB_FAIL(rowkey_read_info_->deserialize(allocator, buf, len, new_pos))) {
|
} else if (!is_empty_shell() && new_pos - pos < length_ && OB_FAIL(rowkey_read_info_->deserialize(allocator, buf, len, new_pos))) {
|
||||||
LOG_WARN("fail to deserialize rowkey read info", K(ret), K(len), K(new_pos));
|
LOG_WARN("fail to deserialize rowkey read info", K(ret), K(len), K(new_pos));
|
||||||
} else if (new_pos - pos < length_ && OB_FAIL(mds_data_.deserialize(buf, len, new_pos))) {
|
} else if (new_pos - pos < length_ && OB_FAIL(mds_data_.deserialize(buf, len, new_pos))) {
|
||||||
@ -4771,7 +4771,7 @@ int64_t ObTablet::to_string(char *buf, const int64_t buf_len) const
|
|||||||
KP_(next_tablet),
|
KP_(next_tablet),
|
||||||
KP_(memtable_mgr),
|
KP_(memtable_mgr),
|
||||||
KP_(log_handler),
|
KP_(log_handler),
|
||||||
K_(rowkey_read_info),
|
KPC_(rowkey_read_info),
|
||||||
K_(mds_data),
|
K_(mds_data),
|
||||||
K_(hold_ref_cnt),
|
K_(hold_ref_cnt),
|
||||||
K_(is_inited),
|
K_(is_inited),
|
||||||
|
|||||||
@ -72,7 +72,6 @@ class ObTabletPersister final
|
|||||||
public:
|
public:
|
||||||
ObTabletPersister() = default;
|
ObTabletPersister() = default;
|
||||||
~ObTabletPersister() = default;
|
~ObTabletPersister() = default;
|
||||||
|
|
||||||
// Persist the old tablet itself and all internal members, and transform it into a new tablet
|
// Persist the old tablet itself and all internal members, and transform it into a new tablet
|
||||||
// from object pool. The old tablet can be allocated by allocator or from object pool.
|
// from object pool. The old tablet can be allocated by allocator or from object pool.
|
||||||
static int persist_and_transform_tablet(
|
static int persist_and_transform_tablet(
|
||||||
|
|||||||
Reference in New Issue
Block a user