[BUG.FIX] reduce lock scope between resize and mark sweep.
This commit is contained in:
@ -138,8 +138,7 @@ ObBlockManager::ObBlockManager()
|
|||||||
marker_status_(),
|
marker_status_(),
|
||||||
marker_lock_(),
|
marker_lock_(),
|
||||||
is_mark_sweep_enabled_(false),
|
is_mark_sweep_enabled_(false),
|
||||||
is_doing_mark_sweep_(false),
|
sweep_lock_(),
|
||||||
cond_(),
|
|
||||||
mark_block_task_(*this),
|
mark_block_task_(*this),
|
||||||
inspect_bad_block_task_(*this),
|
inspect_bad_block_task_(*this),
|
||||||
timer_(),
|
timer_(),
|
||||||
@ -172,8 +171,6 @@ int ObBlockManager::init(
|
|||||||
LOG_WARN("fail to init timer", K(ret));
|
LOG_WARN("fail to init timer", K(ret));
|
||||||
} else if (OB_FAIL(bucket_lock_.init(DEFAULT_LOCK_BUCKET_COUNT, ObLatchIds::BLOCK_MANAGER_LOCK))) {
|
} else if (OB_FAIL(bucket_lock_.init(DEFAULT_LOCK_BUCKET_COUNT, ObLatchIds::BLOCK_MANAGER_LOCK))) {
|
||||||
LOG_WARN("fail to init bucket lock", K(ret));
|
LOG_WARN("fail to init bucket lock", K(ret));
|
||||||
} else if (OB_FAIL(cond_.init(common::ObWaitEventIds::DEFAULT_COND_WAIT))) {
|
|
||||||
LOG_WARN("fail to init thread cond", K(ret));
|
|
||||||
} else if (OB_FAIL(block_map_.init("BlockMap", OB_SYS_TENANT_ID))) {
|
} else if (OB_FAIL(block_map_.init("BlockMap", OB_SYS_TENANT_ID))) {
|
||||||
LOG_WARN("fail to init block map", K(ret));
|
LOG_WARN("fail to init block map", K(ret));
|
||||||
} else if (OB_FAIL(super_block_buf_holder_.init(ObServerSuperBlockHeader::OB_MAX_SUPER_BLOCK_SIZE))) {
|
} else if (OB_FAIL(super_block_buf_holder_.init(ObServerSuperBlockHeader::OB_MAX_SUPER_BLOCK_SIZE))) {
|
||||||
@ -288,9 +285,7 @@ void ObBlockManager::destroy()
|
|||||||
super_block_buf_holder_.reset();
|
super_block_buf_holder_.reset();
|
||||||
default_block_size_ = 0;
|
default_block_size_ = 0;
|
||||||
is_mark_sweep_enabled_ = false;
|
is_mark_sweep_enabled_ = false;
|
||||||
is_doing_mark_sweep_ = false;
|
|
||||||
marker_status_.reset();
|
marker_status_.reset();
|
||||||
cond_.destroy();
|
|
||||||
blk_seq_generator_.reset();
|
blk_seq_generator_.reset();
|
||||||
is_inited_ = false;
|
is_inited_ = false;
|
||||||
}
|
}
|
||||||
@ -648,10 +643,8 @@ int ObBlockManager::resize_file(const int64_t new_data_file_size,
|
|||||||
} else if (!is_mark_sweep_enabled()) {
|
} else if (!is_mark_sweep_enabled()) {
|
||||||
LOG_INFO("mark and sweep is disabled, do not resize file at present");
|
LOG_INFO("mark and sweep is disabled, do not resize file at present");
|
||||||
} else {
|
} else {
|
||||||
disable_mark_sweep();
|
SpinWLockGuard sweep_guard(sweep_lock_);
|
||||||
if (OB_FAIL(wait_mark_sweep_finish())) {
|
if (OB_UNLIKELY(!super_block_.is_valid())) {
|
||||||
LOG_WARN("fail to wait mark and sweep finish", K(ret));
|
|
||||||
} else if (OB_UNLIKELY(!super_block_.is_valid())) {
|
|
||||||
LOG_INFO("observer may be starting", K(super_block_));
|
LOG_INFO("observer may be starting", K(super_block_));
|
||||||
} else {
|
} else {
|
||||||
SpinWLockGuard guard(lock_);
|
SpinWLockGuard guard(lock_);
|
||||||
@ -687,7 +680,6 @@ int ObBlockManager::resize_file(const int64_t new_data_file_size,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
enable_mark_sweep();
|
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
@ -956,7 +948,6 @@ void ObBlockManager::mark_and_sweep()
|
|||||||
LOG_WARN("slog replay hasn't finished, this task can't start", K(ret));
|
LOG_WARN("slog replay hasn't finished, this task can't start", K(ret));
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
set_mark_sweep_doing();
|
|
||||||
if (OB_FAIL(mark_info.init(ObModIds::OB_STORAGE_FILE_BLOCK_REF, OB_SERVER_TENANT_ID))) {
|
if (OB_FAIL(mark_info.init(ObModIds::OB_STORAGE_FILE_BLOCK_REF, OB_SERVER_TENANT_ID))) {
|
||||||
LOG_WARN("fail to init mark info, ", K(ret));
|
LOG_WARN("fail to init mark info, ", K(ret));
|
||||||
} else if (OB_FAIL(macro_id_set.create(MAX(2, block_map_.count())))) {
|
} else if (OB_FAIL(macro_id_set.create(MAX(2, block_map_.count())))) {
|
||||||
@ -978,6 +969,7 @@ void ObBlockManager::mark_and_sweep()
|
|||||||
pending_free_count_ += mark_info.count();
|
pending_free_count_ += mark_info.count();
|
||||||
mark_cost_time_ = ObTimeUtility::fast_current_time() - start_time_;
|
mark_cost_time_ = ObTimeUtility::fast_current_time() - start_time_;
|
||||||
//sweep
|
//sweep
|
||||||
|
SpinWLockGuard guard(sweep_lock_);
|
||||||
if (OB_FAIL(do_sweep(mark_info))) {
|
if (OB_FAIL(do_sweep(mark_info))) {
|
||||||
LOG_WARN("do sweep fail", K(ret));
|
LOG_WARN("do sweep fail", K(ret));
|
||||||
} else {
|
} else {
|
||||||
@ -995,7 +987,6 @@ void ObBlockManager::mark_and_sweep()
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
set_mark_sweep_done();
|
|
||||||
}
|
}
|
||||||
macro_id_set.destroy();
|
macro_id_set.destroy();
|
||||||
}
|
}
|
||||||
@ -1432,29 +1423,6 @@ int ObBlockManager::update_mark_info(const MacroBlockId ¯o_id,
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
int ObBlockManager::wait_mark_sweep_finish()
|
|
||||||
{
|
|
||||||
int ret = OB_SUCCESS;
|
|
||||||
ObThreadCondGuard guard(cond_);
|
|
||||||
while (is_doing_mark_sweep_) {
|
|
||||||
cond_.wait_us(100);
|
|
||||||
}
|
|
||||||
return ret;
|
|
||||||
}
|
|
||||||
|
|
||||||
void ObBlockManager::set_mark_sweep_doing()
|
|
||||||
{
|
|
||||||
ObThreadCondGuard guard(cond_);
|
|
||||||
is_doing_mark_sweep_ = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
void ObBlockManager::set_mark_sweep_done()
|
|
||||||
{
|
|
||||||
ObThreadCondGuard guard(cond_);
|
|
||||||
is_doing_mark_sweep_ = false;
|
|
||||||
cond_.broadcast();
|
|
||||||
}
|
|
||||||
|
|
||||||
int ObBlockManager::BlockMapIterator::get_next_block(common::ObIOFd &block_id)
|
int ObBlockManager::BlockMapIterator::get_next_block(common::ObIOFd &block_id)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
|
|||||||
@ -382,10 +382,6 @@ private:
|
|||||||
void enable_mark_sweep() { ATOMIC_SET(&is_mark_sweep_enabled_, true); }
|
void enable_mark_sweep() { ATOMIC_SET(&is_mark_sweep_enabled_, true); }
|
||||||
bool is_mark_sweep_enabled() { return ATOMIC_LOAD(&is_mark_sweep_enabled_); }
|
bool is_mark_sweep_enabled() { return ATOMIC_LOAD(&is_mark_sweep_enabled_); }
|
||||||
|
|
||||||
int wait_mark_sweep_finish();
|
|
||||||
void set_mark_sweep_doing();
|
|
||||||
void set_mark_sweep_done();
|
|
||||||
|
|
||||||
int extend_file_size_if_need();
|
int extend_file_size_if_need();
|
||||||
bool check_can_be_extend(
|
bool check_can_be_extend(
|
||||||
const int64_t reserved_size);
|
const int64_t reserved_size);
|
||||||
@ -471,8 +467,7 @@ private:
|
|||||||
common::SpinRWLock marker_lock_;
|
common::SpinRWLock marker_lock_;
|
||||||
|
|
||||||
bool is_mark_sweep_enabled_;
|
bool is_mark_sweep_enabled_;
|
||||||
bool is_doing_mark_sweep_;
|
common::SpinRWLock sweep_lock_;
|
||||||
ObThreadCond cond_; // for mark sweep
|
|
||||||
|
|
||||||
MarkBlockTask mark_block_task_;
|
MarkBlockTask mark_block_task_;
|
||||||
InspectBadBlockTask inspect_bad_block_task_;
|
InspectBadBlockTask inspect_bad_block_task_;
|
||||||
|
|||||||
Reference in New Issue
Block a user