optimize gc sstable.

This commit is contained in:
Tyshawn 2021-07-22 11:30:47 +08:00 committed by wangzelin.wzl
parent 7b201b9253
commit 2cd51447d8
14 changed files with 312 additions and 28 deletions

View File

@ -513,6 +513,7 @@ public:
virtual int get_meta_block_list(common::ObIArray<blocksstable::MacroBlockId>& meta_block_list) const = 0;
virtual int get_all_tables(ObTablesHandle& tables_handle) = 0;
virtual int recycle_unused_sstables(const int64_t max_recycle_cnt, int64_t& recycled_cnt) = 0;
virtual int recycle_sstable(const ObITable::TableKey &table_key) = 0;
virtual int check_can_free(bool& can_free) = 0;
virtual int get_merge_log_ts(int64_t& freeze_ts) = 0;
virtual int get_table_store_cnt(int64_t& table_cnt) const = 0;

View File

@ -243,10 +243,12 @@ public:
{
ATOMIC_INC(&ref_cnt_);
}
inline int64_t dec_ref()
{
return ATOMIC_SAF(&ref_cnt_, 1 /* just sub 1 */);
virtual int64_t dec_ref()
{
return ATOMIC_SAF(&ref_cnt_, 1 /* just sub 1 */);
}
inline int64_t get_ref() const
{
return ATOMIC_LOAD(&ref_cnt_);

View File

@ -5820,7 +5820,16 @@ int ObPartitionGroup::recycle_unused_sstables(const int64_t max_recycle_cnt, int
return ret;
}
int ObPartitionGroup::set_meta_block_list(const common::ObIArray<blocksstable::MacroBlockId>& meta_block_list)
int ObPartitionGroup::recycle_sstable(const ObITable::TableKey &table_key)
{
int ret = OB_SUCCESS;
if (OB_FAIL(pg_storage_.recycle_sstable(table_key))) {
STORAGE_LOG(WARN, "fail to recycle sstable", K(ret), K(table_key));
}
return ret;
}
int ObPartitionGroup::set_meta_block_list(const common::ObIArray<blocksstable::MacroBlockId> &meta_block_list)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(!is_inited_)) {

View File

@ -391,6 +391,7 @@ public:
virtual int get_checkpoint_info(common::ObArenaAllocator& allocator, ObPGCheckpointInfo& pg_checkpoint_info) override;
virtual int acquire_sstable(const ObITable::TableKey& table_key, ObTableHandle& table_handle) override;
virtual int recycle_unused_sstables(const int64_t max_recycle_cnt, int64_t& recycled_cnt) override;
virtual int recycle_sstable(const ObITable::TableKey &table_key) override;
virtual int check_can_free(bool& can_free) override;
virtual bool need_replay_redo() const;

View File

@ -28,8 +28,58 @@ void ObPGSSTableGCTask::runTimerTask()
{
int ret = OB_SUCCESS;
disable_timeout_check();
storage::ObIPartitionGroupIterator* partition_iter = nullptr;
ObIPartitionGroup* partition_group = nullptr;
if (OB_FAIL(ObPGSSTableGarbageCollector::get_instance().gc_free_sstable())) {
LOG_WARN("fail to gc free sstable", K(ret));
}
}
ObPGSSTableGarbageCollector::ObPGSSTableGarbageCollector()
: is_inited_(false), timer_(), gc_task_(), free_sstables_queue_(), do_gc_cnt_by_queue_(0)
{
}
ObPGSSTableGarbageCollector::~ObPGSSTableGarbageCollector()
{
}
int ObPGSSTableGarbageCollector::init()
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(is_inited_)) {
ret = OB_INIT_TWICE;
LOG_WARN("ObPGSSTableGarbageCollector has already been inited", K(ret));
} else if (OB_FAIL(timer_.init())) {
LOG_WARN("fail to init sstable gc timer", K(ret));
} else if (OB_FAIL(schedule_gc_task())) {
LOG_WARN("fail to schedule gc task", K(ret));
} else {
is_inited_ = true;
}
return ret;
}
int ObPGSSTableGarbageCollector::gc_free_sstable()
{
int ret = OB_SUCCESS;
if (DO_ONE_ROUND_PG_ITER_RECYCLE_THRESHOLD == do_gc_cnt_by_queue_) {
if (OB_FAIL(gc_free_sstable_by_pg_iter())) {
LOG_WARN("fail to gc free sstable by pg iter", K(ret));
} else {
do_gc_cnt_by_queue_ = 0;
}
} else if (OB_FAIL(gc_free_sstable_by_queue())) {
LOG_WARN("fail to gc free sstable by queue", K(ret));
} else {
do_gc_cnt_by_queue_++;
}
return ret;
}
int ObPGSSTableGarbageCollector::gc_free_sstable_by_pg_iter()
{
int ret = OB_SUCCESS;
storage::ObIPartitionGroupIterator *partition_iter = nullptr;
ObIPartitionGroup *partition_group = nullptr;
int64_t left_recycle_cnt = ONE_ROUND_RECYCLE_COUNT_THRESHOLD;
if (OB_ISNULL(partition_iter = ObPartitionService::get_instance().alloc_pg_iter())) {
ret = OB_ALLOCATE_MEMORY_FAILED;
@ -58,30 +108,136 @@ void ObPGSSTableGCTask::runTimerTask()
ObPartitionService::get_instance().revert_pg_iter(partition_iter);
partition_iter = nullptr;
}
LOG_INFO("do one gc free sstable by pg iter", K(ret), "free sstable cnt",
ONE_ROUND_RECYCLE_COUNT_THRESHOLD - left_recycle_cnt);
return ret;
}
ObPGSSTableGarbageCollector::ObPGSSTableGarbageCollector() : is_inited_(false), timer_(), gc_task_()
{}
ObPGSSTableGarbageCollector::~ObPGSSTableGarbageCollector()
{}
int ObPGSSTableGarbageCollector::init()
int ObPGSSTableGarbageCollector::gc_free_sstable_by_queue()
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(is_inited_)) {
ret = OB_INIT_TWICE;
LOG_WARN("ObPGSSTableGarbageCollector has already been inited", K(ret));
} else if (OB_FAIL(timer_.init())) {
LOG_WARN("fail to init sstable gc timer", K(ret));
} else if (OB_FAIL(schedule_gc_task())) {
LOG_WARN("fail to schedule gc task", K(ret));
int64_t left_recycle_cnt = ONE_ROUND_RECYCLE_COUNT_THRESHOLD;
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
LOG_WARN("ObPGSSTableGarbageCollector not init", K(ret));
} else {
is_inited_ = true;
while (OB_SUCC(ret) && left_recycle_cnt > 0 && free_sstables_queue_.size() > 0) {
ObLink *ptr = NULL;
if (OB_FAIL(free_sstables_queue_.pop(ptr))) {
LOG_WARN("fail to pop item", K(ret));
} else if (OB_ISNULL(ptr)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected error, ptr is nullptr", K(ret), KP(ptr));
} else {
ObSSTableGCItem *item = static_cast<ObSSTableGCItem *>(ptr);
ObIPartitionGroupGuard pg_guard;
ObIPartitionGroup *pg = NULL;
if (OB_UNLIKELY(!item->is_valid())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected error, ptr is nullptr", K(ret), K(*item));
} else if (OB_FAIL(ObPartitionService::get_instance().get_partition(item->key_.pkey_,
pg_guard))) {
if (OB_PARTITION_NOT_EXIST == ret) {
// skip not exist partition in pg mgr, because the sstables of this partition will be
// gc by ObPGMemoryGarbageCollector.
ret = OB_SUCCESS;
} else {
LOG_WARN("fail to get partition", K(ret), "pkey", item->key_.pkey_);
}
} else if (OB_ISNULL(pg = pg_guard.get_partition_group())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected error, pg is nullptr", K(ret), KP(pg), "pkey", item->key_.pkey_);
} else if (OB_FAIL(pg->recycle_sstable(item->key_))) {
LOG_WARN("fail to recycle sstable", K(ret), "table key", item->key_);
} else {
left_recycle_cnt--;
}
if (OB_FAIL(ret) && OB_NOT_NULL(item)) {
int tmp_ret = OB_SUCCESS;
if (OB_SUCCESS != (tmp_ret = free_sstables_queue_.push(item))) {
LOG_WARN("fail to push into free sstables queue", K(tmp_ret), K(ret), K(*item));
} else {
item = NULL;
}
}
if (OB_NOT_NULL(item)) {
free_sstable_gc_item(item);
}
}
}
}
LOG_INFO("do one gc free sstable by queue", K(ret), "free sstable cnt",
ONE_ROUND_RECYCLE_COUNT_THRESHOLD - left_recycle_cnt);
return ret;
}
int ObPGSSTableGarbageCollector::push_sstable_into_gc_queue(ObITable::TableKey &key)
{
int ret = OB_SUCCESS;
ObSSTableGCItem *item = NULL;
if (OB_FAIL(alloc_sstable_gc_item(item))) {
LOG_WARN("fail to allocate sstable gc item", K(ret));
} else if (OB_ISNULL(item)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected error, item is nullptr", K(ret), KP(item));
} else if (FALSE_IT(item->key_ = key)) {
} else if (OB_FAIL(push_sstable_gc_item(item))) {
LOG_WARN("fail to push sstable gc item", K(ret), K(*item));
}
if (OB_FAIL(ret)) {
if (OB_NOT_NULL(item)) {
free_sstable_gc_item(item);
}
}
return ret;
}
int ObPGSSTableGarbageCollector::alloc_sstable_gc_item(ObSSTableGCItem *&item)
{
int ret = OB_SUCCESS;
int64_t size = sizeof(ObSSTableGCItem);
ObMemAttr attr(common::OB_SERVER_TENANT_ID, "SSTableGCItem");
if (OB_ISNULL(item = (ObSSTableGCItem *)ob_malloc(size, attr))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
if (REACH_TIME_INTERVAL(10 * 1000 * 1000)) {
LOG_WARN("no memory, failed to allocate sstable gc item", K(ret), K(size));
}
}
return ret;
}
int ObPGSSTableGarbageCollector::push_sstable_gc_item(const ObSSTableGCItem *item)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(item)) {
ret = OB_INVALID_ARGUMENT;
LOG_ERROR("invalid argument", K(ret), KP(item));
} else if (OB_FAIL(free_sstables_queue_.push((ObLink *)item))) {
LOG_WARN("fail to push back into free sstable queue", K(ret), K(*item));
}
return ret;
}
void ObPGSSTableGarbageCollector::free_sstable_gc_item(const ObSSTableGCItem *item)
{
int tmp_ret = OB_SUCCESS;
if (OB_UNLIKELY(!is_inited_)) {
tmp_ret = OB_NOT_INIT;
LOG_ERROR("ObPGSSTableGarbageCollector not init", K(tmp_ret));
} else if (OB_ISNULL(item)) {
tmp_ret = OB_INVALID_ARGUMENT;
LOG_ERROR("invalid argument", K(tmp_ret), KP(item));
} else {
ob_free((void *)item);
}
}
int ObPGSSTableGarbageCollector::schedule_gc_task()
{
int ret = OB_SUCCESS;

View File

@ -14,18 +14,36 @@
#define OB_PG_SSTABLE_GARBAGE_COLLECTOR_H_
#include "lib/task/ob_timer.h"
#include "lib/queue/ob_link_queue.h"
#include "ob_pg_mgr.h"
#include "ob_sstable.h"
namespace oceanbase {
namespace storage {
class ObPGSSTableGCTask : public common::ObTimerTask {
struct ObSSTableGCItem : common::ObLink
{
public:
ObSSTableGCItem() : key_() {}
virtual ~ObSSTableGCItem() {}
bool is_valid()
{
return key_.is_valid();
}
TO_STRING_KV(K_(key));
public:
ObITable::TableKey key_;
};
class ObPGSSTableGCTask : public common::ObTimerTask
{
public:
ObPGSSTableGCTask();
virtual ~ObPGSSTableGCTask();
virtual void runTimerTask() override;
private:
static const int64_t ONE_ROUND_RECYCLE_COUNT_THRESHOLD = 100000L;
};
class ObPGSSTableGarbageCollector {
@ -38,14 +56,32 @@ public:
void wait();
void destroy();
int gc_free_sstable();
int push_sstable_into_gc_queue(ObITable::TableKey &key);
private:
int schedule_gc_task();
int gc_free_sstable_by_queue();
int gc_free_sstable_by_pg_iter();
int alloc_sstable_gc_item(ObSSTableGCItem *&item);
int push_sstable_gc_item(const ObSSTableGCItem *item);
void free_sstable_gc_item(const ObSSTableGCItem *item);
private:
static const int64_t GC_INTERVAL_US = 1 * 1000 * 1000L;
static const int64_t GC_INTERVAL_US = 20 * 1000L; // 20ms
static const int64_t DO_ONE_ROUND_PG_ITER_RECYCLE_THRESHOLD = 50L;
static const int64_t ONE_ROUND_RECYCLE_COUNT_THRESHOLD = 20000L;
bool is_inited_;
common::ObTimer timer_;
ObPGSSTableGCTask gc_task_;
common::ObLinkQueue free_sstables_queue_;
int64_t do_gc_cnt_by_queue_;
};
} // end namespace storage

View File

@ -655,6 +655,57 @@ int ObPGSSTableMgr::recycle_unused_sstables(const int64_t max_recycle_cnt, int64
return ret;
}
int ObPGSSTableMgr::recycle_sstable(const ObITable::TableKey &table_key)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
LOG_WARN("ObPGSSTableMgr has not been inited", K(ret));
} else if (!enable_write_log_) {
LOG_INFO("do not recycle unused sstable now");
} else if (OB_UNLIKELY(!table_key.is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(ret), K(table_key));
} else {
ObBucketHashWLockGuard lock_guard(bucket_lock_, table_key.hash());
ObITable *sstable = NULL;
if (OB_FAIL(table_map_.get(table_key, sstable))) {
if (OB_HASH_NOT_EXIST == ret) {
ret = OB_SUCCESS; // maybe this sstable has be removed.
} else {
LOG_WARN("fail to get sstable", K(ret), K(table_key));
}
} else if (OB_ISNULL(sstable)) {
ret = OB_ERR_SYS;
LOG_WARN("error sys, table must not be null", K(ret), KP(sstable));
} else if (OB_UNLIKELY(sstable->get_ref() > 0)) {
// do nothing.
} else {
int64_t lsn = 0;
if (OB_FAIL(SLOGGER.begin(OB_LOG_TABLE_MGR))) {
LOG_WARN("fail to begin transaction", K(ret));
} else if (OB_FAIL(write_remove_sstable_log(sstable->get_key()))) {
LOG_WARN("fail to write remove sstable log", K(ret), K(sstable->get_key()));
} else if (OB_FAIL(SLOGGER.commit(lsn))) {
LOG_WARN("fail to commit slog", K(ret));
} else if (OB_FAIL(table_map_.erase(sstable->get_key()))) {
LOG_WARN("fail to erase from table map", K(ret), K(sstable->get_key()));
abort();
} else {
free_sstable(static_cast<ObSSTable *>(sstable));
}
if (OB_FAIL(ret)) {
int tmp_ret = OB_SUCCESS;
if (OB_SUCCESS != (tmp_ret = SLOGGER.abort())) {
LOG_WARN("fail to abort slog", K(ret));
}
}
}
}
return ret;
}
int64_t ObPGSSTableMgr::get_serialize_size()
{
int ret = OB_SUCCESS;

View File

@ -41,6 +41,7 @@ public:
int add_sstables(const bool in_slog_trans, ObTablesHandle& tables_handle);
int add_sstable(const bool in_slog_trans, ObTableHandle& table_handle);
int recycle_unused_sstables(const int64_t max_recycle_cnt, int64_t& recycled_cnt);
int recycle_sstable(const ObITable::TableKey &table_key);
int serialize(common::ObIAllocator& allocator, char*& buf, int64_t& serialize_size, ObTablesHandle* tables_handle);
int deserialize(const char* buf, const int64_t buf_len, int64_t& pos);
int replay_add_sstable(ObSSTable& replay_sstable);

View File

@ -6368,6 +6368,15 @@ int ObPGStorage::recycle_unused_sstables(const int64_t max_recycle_cnt, int64_t&
return ret;
}
int ObPGStorage::recycle_sstable(const ObITable::TableKey &table_key)
{
int ret = OB_SUCCESS;
if (OB_FAIL(sstable_mgr_.recycle_sstable(table_key))) {
STORAGE_LOG(WARN, "fail to recycle sstable", K(ret), K(table_key));
}
return ret;
}
int ObPGStorage::alloc_file_for_old_replay()
{
int ret = OB_SUCCESS;

View File

@ -330,6 +330,7 @@ public:
int replay_remove_sstable(const ObITable::TableKey& table_key);
int acquire_sstable(const ObITable::TableKey& table_key, ObTableHandle& table_handle);
int recycle_unused_sstables(const int64_t max_recycle_cnt, int64_t& recycled_cnt);
int recycle_sstable(const ObITable::TableKey &table_key);
int check_can_free(bool& can_free);
int get_min_frozen_memtable_base_version(int64_t& min_base_version);

View File

@ -26,6 +26,7 @@
#include "ob_partition_scheduler.h"
#include "ob_warm_up_request.h"
#include "ob_multiple_merge.h"
#include "ob_pg_sstable_garbage_collector.h"
#include "storage/ob_sstable_row_exister.h"
#include "storage/ob_sstable_row_multi_exister.h"
#include "storage/ob_sstable_row_getter.h"
@ -3314,7 +3315,21 @@ int ObSSTable::convert_add_macro_block_meta(
return ret;
}
int ObSSTable::serialize_schema_map(char* buf, int64_t data_len, int64_t& pos) const
int64_t ObSSTable::dec_ref()
{
int64_t ref_cnt = ATOMIC_SAF(&ref_cnt_, 1 /* just sub 1 */);
if (0 == ref_cnt) {
int ret = OB_SUCCESS;
if (OB_FAIL(ObPGSSTableGarbageCollector::get_instance().push_sstable_into_gc_queue(key_))) {
LOG_WARN("fail to push sstable into gc queue", K(ret), K(key_));
}
}
return ref_cnt;
}
int ObSSTable::serialize_schema_map(char *buf, int64_t data_len, int64_t &pos) const
{
int ret = OB_SUCCESS;
if (OB_FAIL(serialization::encode_i64(buf, data_len, pos, schema_map_.size()))) {

View File

@ -350,6 +350,7 @@ public:
{
return meta_.has_compact_row_;
}
int64_t dec_ref() override;
VIRTUAL_NEED_SERIALIZE_AND_DESERIALIZE;
INHERIT_TO_STRING_KV(
"ObITable", ObITable, KP(this), K_(status), K_(meta), K_(file_handle), K_(dump_memtable_timestamp));

View File

@ -368,7 +368,7 @@ int ObSSTableRowWholeScanner::open_macro_block()
const ObMicroBlockIndexMgr* index_mgr = NULL;
const char* buf = NULL;
if (OB_FAIL(scan_handle->macro_io_handle_.wait(io_timeout_ms))) {
LOG_ERROR("failed to read macro block from io", K(ret), K(io_timeout_ms));
LOG_WARN("failed to read macro block from io", K(ret), K(io_timeout_ms));
} else if (OB_ISNULL(buf = scan_handle->macro_io_handle_.get_buffer())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected error, the buffer must not be NULL", K(ret));

View File

@ -375,6 +375,7 @@ public:
MOCK_CONST_METHOD1(get_meta_block_list, int(common::ObIArray<blocksstable::MacroBlockId>& meta_block_list));
MOCK_METHOD1(get_all_tables, int(ObTablesHandle& tables_handle));
MOCK_METHOD2(recycle_unused_sstables, int(const int64_t max_recycle_cnt, int64_t& recycled_cnt));
MOCK_METHOD1(recycle_sstable, int(const ObITable::TableKey &table_key));
MOCK_METHOD1(check_can_free, int(bool& can_free));
MOCK_METHOD0(clear, void());
MOCK_METHOD1(get_merge_log_ts, int(int64_t& merge_log_ts));