modify wbp index cache sparsify logic

This commit is contained in:
wanyue-wy 2024-11-14 03:14:39 +00:00 committed by ob-robot
parent a8a00b05fc
commit 37e34761f0
3 changed files with 58 additions and 5 deletions

View File

@ -35,7 +35,7 @@ static const double TMP_FILE_WBP_MEM_LIMIT_PROP = 50; //[0, 100]
class MockWBPIndexCache
{
public:
MockWBPIndexCache(ObTmpWriteBufferPool &wbp) : wbp_(wbp) {}
MockWBPIndexCache(ObTmpWriteBufferPool &wbp) : wbp_(wbp), sparsify_count_(0), ignored_push_count_(0) {}
int push(uint32_t page_index);
int truncate(const int64_t truncate_page_virtual_id);
int compare(const ObTmpFileWBPIndexCache &index_cache);
@ -44,6 +44,8 @@ private:
public:
ObArray<uint32_t> mock_index_cache_;
ObTmpWriteBufferPool &wbp_;
int64_t sparsify_count_;
int64_t ignored_push_count_;
};
int MockWBPIndexCache::push(uint32_t page_index)
@ -54,7 +56,12 @@ int MockWBPIndexCache::push(uint32_t page_index)
LOG_WARN("failed to sparsify", KR(ret));
}
}
if (FAILEDx(mock_index_cache_.push_back(page_index))) {
if (OB_FAIL(ret)) {
} else if (ignored_push_count_ < ((1 << sparsify_count_) - 1)) {
ignored_push_count_ += 1;
} else if (FALSE_IT(ignored_push_count_ = 0)) {
} else if (OB_FAIL(mock_index_cache_.push_back(page_index))) {
LOG_WARN("failed to push back page index", KR(ret), K(page_index));
}
return ret;
@ -106,6 +113,8 @@ int MockWBPIndexCache::sparsify_(const int64_t sparsify_modulus)
}
}
mock_index_cache_ = new_mock_index_cache;
sparsify_count_ += 1;
ignored_push_count_ = 0;
}
return ret;
}
@ -613,16 +622,47 @@ TEST_F(TestTmpFileWBPIndexCache, test_expand_and_sparsify)
ASSERT_EQ(true, wbp_index_cache_.is_full());
ASSERT_EQ(MAX_BUCKET_ARRAY_CAPACITY, wbp_index_cache_.capacity_);
int64_t actual_write_page_num = 0;
int64_t bucket_num = 0;
// 4. push indexes to trigger sparsify
page_num = BUCKET_CAPACITY / 2;
ret = write_and_push_pages(end_page_id, page_num, mock_cache, end_page_id);
ASSERT_EQ(OB_SUCCESS, ret);
ret = mock_cache.compare(wbp_index_cache_);
ASSERT_EQ(OB_SUCCESS, ret);
ASSERT_EQ(1, wbp_index_cache_.sparsify_count_);
ASSERT_EQ(false, wbp_index_cache_.is_full());
ASSERT_EQ(wbp_index_cache_.capacity_ / 2 + 1, wbp_index_cache_.size_);
ASSERT_EQ(page_num, wbp_index_cache_.page_buckets_->at(wbp_index_cache_.right_)->size_);
ASSERT_EQ(MAX_BUCKET_ARRAY_CAPACITY, wbp_index_cache_.capacity_);
actual_write_page_num = page_num / (1 << wbp_index_cache_.sparsify_count_); // in order to keep same sparse interval,
// the other push callings are ignored
bucket_num = wbp_index_cache_.size_;
ASSERT_EQ(wbp_index_cache_.capacity_ / 2 + 1, bucket_num);
ASSERT_EQ(actual_write_page_num % BUCKET_CAPACITY, wbp_index_cache_.page_buckets_->at(wbp_index_cache_.right_)->size_);
// 5. push indexes to trigger sparsify again
page_num = ((1 << wbp_index_cache_.sparsify_count_) - wbp_index_cache_.ignored_push_count_ - 1) + 1 +
(1 << wbp_index_cache_.sparsify_count_) *
(BUCKET_CAPACITY - wbp_index_cache_.page_buckets_->at(wbp_index_cache_.right_)->size_ +
BUCKET_CAPACITY * (MAX_BUCKET_ARRAY_CAPACITY - wbp_index_cache_.size_) - 1);
ret = write_and_push_pages(end_page_id, page_num, mock_cache, end_page_id);
ASSERT_EQ(OB_SUCCESS, ret);
ret = mock_cache.compare(wbp_index_cache_);
ASSERT_EQ(OB_SUCCESS, ret);
ASSERT_EQ(1, wbp_index_cache_.sparsify_count_);
ASSERT_EQ(true, wbp_index_cache_.is_full());
page_num = BUCKET_CAPACITY / 2;
ret = write_and_push_pages(end_page_id, page_num, mock_cache, end_page_id);
ASSERT_EQ(OB_SUCCESS, ret);
ret = mock_cache.compare(wbp_index_cache_);
ASSERT_EQ(OB_SUCCESS, ret);
ASSERT_EQ(2, wbp_index_cache_.sparsify_count_);
ASSERT_EQ(false, wbp_index_cache_.is_full());
ASSERT_EQ(MAX_BUCKET_ARRAY_CAPACITY, wbp_index_cache_.capacity_);
actual_write_page_num = page_num / (1 << wbp_index_cache_.sparsify_count_);
bucket_num = wbp_index_cache_.size_;
ASSERT_EQ(wbp_index_cache_.capacity_ / 2 + 1, bucket_num);
ASSERT_EQ(actual_write_page_num % BUCKET_CAPACITY, wbp_index_cache_.page_buckets_->at(wbp_index_cache_.right_)->size_);
LOG_INFO("test_expand_and_sparsify");
}

View File

@ -26,6 +26,8 @@ ObTmpFileWBPIndexCache::ObTmpFileWBPIndexCache() :
ObTmpFileCircleArray(), bucket_array_allocator_(nullptr), bucket_allocator_(nullptr),
page_buckets_(nullptr) ,
fd_(ObTmpFileGlobal::INVALID_TMP_FILE_FD), wbp_(nullptr),
sparsify_count_(0),
ignored_push_count_(0),
max_bucket_array_capacity_(MAX_BUCKET_ARRAY_CAPACITY) {}
ObTmpFileWBPIndexCache::~ObTmpFileWBPIndexCache()
@ -85,6 +87,8 @@ void ObTmpFileWBPIndexCache::reset()
right_ = -1;
size_ = 0;
capacity_ = 0;
sparsify_count_ = 0;
ignored_push_count_ = 0;
if (OB_ISNULL(page_buckets_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("page buckets is null", KR(ret), K(fd_));
@ -146,7 +150,10 @@ int ObTmpFileWBPIndexCache::push(const uint32_t page_index)
}
if (OB_SUCC(ret)) {
if (is_empty() || (OB_NOT_NULL(page_buckets_->at(right_)) && page_buckets_->at(right_)->is_full())) {
if (ignored_push_count_ < ((1 << sparsify_count_) - 1)) {
ignored_push_count_ += 1;
} else if (FALSE_IT(ignored_push_count_ = 0)) {
} else if (is_empty() || (OB_NOT_NULL(page_buckets_->at(right_)) && page_buckets_->at(right_)->is_full())) {
// alloc a new bucket
inc_pos_(right_);
uint64_t tenant_id = MTL_ID();
@ -471,6 +478,8 @@ int ObTmpFileWBPIndexCache::sparsify_()
if (OB_SUCC(ret)) {
right_ = (cur_bucket_pos - 1) % capacity_;
size_ /= 2;
sparsify_count_ += 1;
ignored_push_count_ = 0;
}
}
LOG_INFO("sparsify tmp file wbp index cache over", KR(ret), KPC(this));

View File

@ -85,6 +85,7 @@ public:
}
INHERIT_TO_STRING_KV("ObTmpFileCircleArray", ObTmpFileCircleArray,
K(fd_), KP(wbp_), KP(page_buckets_),
K(sparsify_count_), K(ignored_push_count_),
KP(bucket_array_allocator_), KP(bucket_allocator_));
private:
@ -137,6 +138,9 @@ private:
common::ObArray<ObTmpFilePageIndexBucket*> *page_buckets_;
int64_t fd_;
ObTmpWriteBufferPool* wbp_;
int64_t sparsify_count_; // to keep each page indexes in cache have same sparse interval,
// we need to ignore 2^(sparsify_count_) - 1 push() calling
int64_t ignored_push_count_;
int64_t max_bucket_array_capacity_; // only allowed to modify this var in unit test!!!
};