ObTablet Compression

This commit is contained in:
obdev
2023-08-03 03:54:03 +00:00
committed by ob-robot
parent 45f0c463a3
commit cb81c7d64c
16 changed files with 251 additions and 129 deletions

View File

@ -115,6 +115,7 @@ ObTablet::ObTablet()
: version_(TABLET_VERSION_V2),
length_(0),
wash_score_(INT64_MIN),
mds_data_(),
ref_cnt_(0),
next_tablet_guard_(),
tablet_meta_(),
@ -122,25 +123,24 @@ ObTablet::ObTablet()
table_store_addr_(),
storage_schema_addr_(),
memtable_count_(0),
ddl_kvs_(),
ddl_kvs_(nullptr),
ddl_kv_count_(0),
pointer_hdl_(),
next_full_tablet_guard_(),
allocator_(nullptr),
tablet_addr_(),
allocator_(nullptr),
memtables_lock_(),
memtable_mgr_(nullptr),
log_handler_(nullptr),
mds_data_(),
memtables_lock_(),
mds_cache_lock_(),
tablet_status_cache_(),
ddl_data_cache_(),
next_tablet_(nullptr),
hold_ref_cnt_(false),
is_inited_(false)
is_inited_(false),
mds_cache_lock_(),
tablet_status_cache_(),
ddl_data_cache_()
{
#if defined(__x86_64__)
static_assert(sizeof(ObTablet) + sizeof(ObRowkeyReadInfo) <= 6000, "The size of ObTablet will affect the meta memory manager, and the necessity of adding new fields needs to be considered.");
static_assert(sizeof(ObTablet) + sizeof(ObRowkeyReadInfo) == 1632, "The size of ObTablet will affect the meta memory manager, and the necessity of adding new fields needs to be considered.");
#endif
MEMSET(memtables_, 0x0, sizeof(memtables_));
}
@ -197,6 +197,8 @@ int ObTablet::init(
{
int ret = OB_SUCCESS;
const int64_t default_max_sync_medium_scn = 0;
ObITable **ddl_kvs_addr = nullptr;
int64_t ddl_kv_count = 0;
if (OB_UNLIKELY(is_inited_)) {
ret = OB_INIT_TWICE;
@ -226,9 +228,11 @@ int ObTablet::init(
K(create_scn), K(snapshot_version), K(compat_mode), K(store_flag));
} else if (is_ls_inner_tablet() && OB_FAIL(inner_create_memtable())) {
LOG_WARN("failed to create first memtable", K(ret), K(tablet_id));
} else if (OB_FAIL(pull_memtables())) {
} else if (OB_FAIL(pull_memtables(allocator, ddl_kvs_addr, ddl_kv_count))) {
LOG_WARN("fail to pull memtable", K(ret));
} else {
ddl_kvs_ = ddl_kvs_addr;
ddl_kv_count_ = ddl_kv_count;
ALLOC_AND_INIT(allocator, table_store_addr_, (*this), sstable);
ALLOC_AND_INIT(allocator, storage_schema_addr_, table_schema, compat_mode,
true/*skip_column_info*/, ObStorageSchema::STORAGE_SCHEMA_VERSION_V2);
@ -273,6 +277,8 @@ int ObTablet::init(
const ObTabletMdsData &old_mds_data = old_tablet.mds_data_;
const bool update_in_major_type_merge = param.need_report_ && param.sstable_->is_major_sstable();
int64_t finish_medium_scn = 0;
ObITable **ddl_kvs_addr = nullptr;
int64_t ddl_kv_count = 0;
if (OB_UNLIKELY(is_inited_)) {
ret = OB_INIT_TWICE;
@ -305,9 +311,11 @@ int ObTablet::init(
LOG_WARN("failed to fetch old table store", K(ret), K(old_tablet));
} else if (OB_FAIL(old_table_store_wrapper.get_member(old_table_store))) {
LOG_WARN("failed to get old table store", K(ret));
} else if (OB_FAIL(pull_memtables())) {
} else if (OB_FAIL(pull_memtables(allocator, ddl_kvs_addr, ddl_kv_count))) {
LOG_WARN("failed to pull memtable", K(ret));
} else {
ddl_kvs_ = ddl_kvs_addr;
ddl_kv_count_ = ddl_kv_count;
ALLOC_AND_INIT(allocator, table_store_addr_, (*this), param, (*old_table_store));
}
@ -373,6 +381,8 @@ int ObTablet::init(
const ObTabletTableStore *old_table_store = nullptr;
const ObStorageSchema *old_storage_schema = nullptr;
int64_t finish_medium_scn = 0;
ObITable **ddl_kvs_addr = nullptr;
int64_t ddl_kv_count = 0;
if (OB_UNLIKELY(is_inited_)) {
ret = OB_INIT_TWICE;
@ -390,7 +400,7 @@ int ObTablet::init(
LOG_WARN("failed to load storage schema", K(ret), K(old_tablet));
} else if (CLICK_FAIL(tablet_meta_.init(old_tablet.tablet_meta_, flush_scn))) {
LOG_WARN("failed to init tablet meta", K(ret), K(old_tablet), K(flush_scn));
} else if (CLICK_FAIL(pull_memtables())) {
} else if (CLICK_FAIL(pull_memtables(allocator, ddl_kvs_addr, ddl_kv_count))) {
LOG_WARN("fail to pull memtable", K(ret));
} else if (CLICK_FAIL(ObTabletObjLoadHelper::alloc_and_new(allocator, table_store_addr_.ptr_))) {
LOG_WARN("fail to alloc and new table store object", K(ret), K_(table_store_addr));
@ -401,6 +411,8 @@ int ObTablet::init(
} else if (CLICK_FAIL(mds_data_.init(allocator, mds_table_data, base_data, finish_medium_scn))) {
LOG_WARN("failed to init mds data", K(ret), K(finish_medium_scn));
} else {
ddl_kvs_ = ddl_kvs_addr;
ddl_kv_count_ = ddl_kv_count;
ALLOC_AND_INIT(allocator, storage_schema_addr_, *old_storage_schema);
}
@ -435,6 +447,9 @@ int ObTablet::init(
const share::ObLSID &ls_id = param.ls_id_;
const common::ObTabletID &tablet_id = param.tablet_id_;
allocator_ = &allocator;
ObITable **ddl_kvs_addr = nullptr;
int64_t ddl_kv_count = 0;
if (OB_UNLIKELY(is_inited_)) {
ret = OB_INIT_TWICE;
LOG_WARN("init twice", K(ret), K(is_inited_));
@ -479,9 +494,11 @@ int ObTablet::init(
} else {
if (OB_FAIL(mds_data_.init(*allocator_, param.mds_data_))) {
LOG_WARN("failed to assign mds data", K(ret), K(param));
} else if (OB_FAIL(pull_memtables())) {
} else if (OB_FAIL(pull_memtables(allocator, ddl_kvs_addr, ddl_kv_count))) {
LOG_WARN("fail to pull memtable", K(ret));
} else {
ddl_kvs_ = ddl_kvs_addr;
ddl_kv_count_ = ddl_kv_count;
ALLOC_AND_INIT(allocator, table_store_addr_, (*this), nullptr/*ObTableHandleV2*/);
ALLOC_AND_INIT(allocator, storage_schema_addr_, param.storage_schema_);
}
@ -521,6 +538,8 @@ int ObTablet::init(
const ObStorageSchema *old_storage_schema = nullptr;
const ObTabletMdsData &old_mds_data = old_tablet.mds_data_;
allocator_ = &allocator;
ObITable **ddl_kvs_addr = nullptr;
int64_t ddl_kv_count = 0;
if (OB_UNLIKELY(is_inited_)) {
ret = OB_INIT_TWICE;
@ -544,13 +563,15 @@ int ObTablet::init(
old_tablet.get_multi_version_start(),
old_tablet.tablet_meta_.max_sync_storage_schema_version_))) {
LOG_WARN("fail to init tablet_meta", K(ret), K(old_tablet.tablet_meta_));
} else if (OB_FAIL(pull_memtables())) {
} else if (OB_FAIL(pull_memtables(allocator, ddl_kvs_addr, ddl_kv_count))) {
LOG_WARN("fail to pull memtable", K(ret));
} else if (OB_FAIL(ObTabletObjLoadHelper::alloc_and_new(allocator, table_store_addr_.ptr_))) {
LOG_WARN("fail to alloc and new table store object", K(ret), K_(table_store_addr));
} else if (OB_FAIL(table_store_addr_.get_ptr()->init(*allocator_, *this, tables, *old_table_store))) {
LOG_WARN("fail to init table store", K(ret), K(old_tablet), K(tables));
} else {
ddl_kvs_ = ddl_kvs_addr;
ddl_kv_count_ = ddl_kv_count;
ALLOC_AND_INIT(allocator, storage_schema_addr_, *old_storage_schema);
}
@ -597,6 +618,8 @@ int ObTablet::init(
const ObStorageSchema *old_storage_schema = nullptr;
const ObStorageSchema *storage_schema = nullptr;
int64_t finish_medium_scn = 0;
ObITable **ddl_kvs_addr = nullptr;
int64_t ddl_kv_count = 0;
if (OB_UNLIKELY(is_inited_)) {
ret = OB_INIT_TWICE;
@ -623,7 +646,7 @@ int ObTablet::init(
// use max schema to make sure sstable and schema match
))) {
LOG_WARN("failed to init tablet meta", K(ret), K(old_tablet), K(param));
} else if (OB_FAIL(pull_memtables())){
} else if (OB_FAIL(pull_memtables(allocator, ddl_kvs_addr, ddl_kv_count))){
LOG_WARN("fail to pull memtable", K(ret));
} else if (OB_FAIL(ObTabletObjLoadHelper::alloc_and_new(allocator, table_store_addr_.ptr_))) {
LOG_WARN("fail to alloc and new table store object", K(ret), K_(table_store_addr));
@ -655,6 +678,8 @@ int ObTablet::init(
} else if (OB_FAIL(inner_inc_macro_ref_cnt())) {
LOG_WARN("failed to increase macro ref cnt", K(ret));
} else {
ddl_kvs_ = ddl_kvs_addr;
ddl_kv_count_ = ddl_kv_count;
if (old_tablet.get_tablet_meta().has_next_tablet_) {
set_next_tablet_guard(old_tablet.next_tablet_guard_);
}
@ -856,8 +881,13 @@ int ObTablet::init_with_update_medium_info(
|| OB_ISNULL(log_handler_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("tablet pointer handle is invalid", K(ret), K_(pointer_hdl), K_(pointer_hdl), K_(memtable_mgr), K_(log_handler));
} else if (OB_FAIL(assign_memtables(old_tablet))) {
} else if (OB_FAIL(assign_memtables(old_tablet.memtables_, old_tablet.memtable_count_))) {
LOG_WARN("fail to assign memtables", K(ret));
} else if (OB_ISNULL(ddl_kvs_ = static_cast<ObITable**>(allocator.alloc(sizeof(ObITable*) * DDL_KV_ARRAY_SIZE)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("failed to allocate memory for ddl_kvs_", K(ret), KP(ddl_kvs_));
} else if (OB_FAIL(assign_ddl_kvs(old_tablet.ddl_kvs_, old_tablet.ddl_kv_count_))) {
LOG_WARN("fail to assign ddl kvs", K(ret), KP(old_tablet.ddl_kvs_), K(old_tablet.ddl_kv_count_));
} else if (OB_FAIL(old_tablet.fetch_table_store(table_store_wrapper))) {
LOG_WARN("fail to fetch table store", K(ret), K(old_tablet));
} else if (OB_FAIL(table_store_wrapper.get_member(old_table_store))) {
@ -1175,6 +1205,8 @@ int ObTablet::load_deserialize_v1(
ObTabletTxMultiSourceDataUnit tx_data;
ObTabletBindingInfo ddl_data;
ObMediumCompactionInfoList info_list;
ObITable **ddl_kvs_addr = nullptr;
int64_t ddl_kv_count = 0;
if (OB_FAIL(ObTabletObjLoadHelper::alloc_and_new(allocator, table_store_addr_.ptr_))) {
LOG_WARN("fail to allocate and new table store", K(ret));
@ -1207,9 +1239,11 @@ int ObTablet::load_deserialize_v1(
if (FAILEDx(build_read_info(allocator))) {
LOG_WARN("failed to build read info", K(ret));
} else if (OB_FAIL(pull_memtables())) {
} else if (OB_FAIL(pull_memtables(allocator, ddl_kvs_addr, ddl_kv_count))) {
LOG_WARN("fail to pull memtable", K(ret), K(len), K(new_pos));
} else {
ddl_kvs_ = ddl_kvs_addr;
ddl_kv_count_ = ddl_kv_count;
set_mem_addr();
mds_data_.set_mem_addr();
version_ = TABLET_VERSION_V2;
@ -1323,6 +1357,8 @@ int ObTablet::load_deserialize_v2(
const bool prepare_memtable)
{
int ret = OB_SUCCESS;
ObITable **ddl_kvs_addr = nullptr;
int64_t ddl_kv_count = 0;
if (new_pos - pos < length_ && OB_FAIL(tablet_meta_.deserialize(buf, len, new_pos))) {
LOG_WARN("failed to deserialize tablet meta", K(ret), K(len), K(new_pos));
} else if (new_pos - pos < length_ && OB_FAIL(table_store_addr_.addr_.deserialize(buf, len, new_pos))) {
@ -1336,9 +1372,11 @@ int ObTablet::load_deserialize_v2(
LOG_WARN("fail to deserialize rowkey read info", K(ret), K(len), K(new_pos));
} else if (new_pos - pos < length_ && OB_FAIL(mds_data_.deserialize(buf, len, new_pos))) {
LOG_WARN("failed to deserialize mds data", K(ret), K(len), K(new_pos));
} else if (prepare_memtable && OB_FAIL(pull_memtables())) {
} else if (prepare_memtable && OB_FAIL(pull_memtables(allocator, ddl_kvs_addr, ddl_kv_count))) {
LOG_WARN("fail to pull memtable", K(ret), K(len), K(new_pos));
} else {
ddl_kvs_ = ddl_kvs_addr;
ddl_kv_count_ = ddl_kv_count;
if (!table_store_addr_.addr_.is_none()) {
IO_AND_DESERIALIZE(allocator, table_store_addr_.addr_, table_store_addr_.ptr_, *this);
if (FAILEDx(table_store_addr_.ptr_->batch_cache_sstable_meta(allocator, INT64_MAX))) {// cache all
@ -1363,6 +1401,8 @@ int ObTablet::deserialize(
int64_t remain = buf_header.buf_len_ - sizeof(ObTablet);
int64_t start_pos = sizeof(ObTablet);
ObArenaAllocator allocator;
ObITable **ddl_kvs_addr = nullptr;
int64_t ddl_kv_count = 0;
if (OB_UNLIKELY(is_inited_)) {
ret = OB_INIT_TWICE;
LOG_WARN("cannot deserialize inited tablet meta", K(ret), K_(is_inited));
@ -1385,7 +1425,7 @@ int ObTablet::deserialize(
LOG_WARN("buffer's length is not enough", K(ret), K(length_), K(len - new_pos));
} else if (new_pos - pos < length_ && OB_FAIL(tablet_meta_.deserialize(buf, len, new_pos))) {
LOG_WARN("failed to deserialize tablet meta", K(ret), K(len), K(new_pos));
} else if (OB_FAIL(pull_memtables())) {
} else if (OB_FAIL(pull_memtables(allocator, ddl_kvs_addr, ddl_kv_count))) {
LOG_WARN("fail to pull memtable", K(ret), K(len), K(new_pos));
} else if (new_pos - pos < length_ && OB_FAIL(table_store_addr_.addr_.deserialize(buf, len, new_pos))) {
LOG_WARN("failed to deserialize table read info addr", K(ret), K(len), K(new_pos));
@ -1411,6 +1451,22 @@ int ObTablet::deserialize(
remain -= rowkey_read_info_->get_deep_copy_size();
start_pos += rowkey_read_info_->get_deep_copy_size();
}
} else {
if (OB_NOT_NULL(ddl_kvs_addr)) {
const int64_t ddl_kv_size = sizeof(ObITable*) * DDL_KV_ARRAY_SIZE;
if (remain < ddl_kv_size) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("fail to deep copy ddl kv to tablet", K(ret), K(remain), K(ddl_kv_size), K(ddl_kv_count));
} else {
ddl_kvs_ = reinterpret_cast<ObITable**>(tablet_buf + start_pos);
if (OB_FAIL(assign_ddl_kvs(ddl_kvs_addr, ddl_kv_count))) {
LOG_WARN("fail to assign ddl_kvs", K(ret), KP(ddl_kvs_addr), K(ddl_kv_count), KP(tablet_buf), K(start_pos));
} else {
start_pos += ddl_kv_size;
remain -= ddl_kv_size;
}
}
}
}
if (OB_SUCC(ret)) {
@ -2485,7 +2541,7 @@ int ObTablet::get_all_sstables(ObTableStoreIterator &iter) const
int ObTablet::get_all_tables(ObTableStoreIterator &iter) const
{
int ret = OB_SUCCESS;
ObSEArray<storage::ObITable *, MEMTABLE_ARRAY_SIZE> memtables;
ObSEArray<storage::ObITable *, MAX_MEMSTORE_CNT> memtables;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("not inited", K(ret), K_(is_inited));
@ -2565,7 +2621,7 @@ int ObTablet::get_sstables_size(int64_t &used_size, const bool ignore_shared_blo
int ObTablet::get_memtables(common::ObIArray<storage::ObITable *> &memtables, const bool need_active) const
{
TCRLockGuard guard(memtables_lock_);
common::SpinRLockGuard guard(memtables_lock_);
return inner_get_memtables(memtables, need_active);
}
@ -3233,7 +3289,7 @@ int ObTablet::create_memtable(
const share::ObLSID &ls_id = tablet_meta_.ls_id_;
const common::ObTabletID &tablet_id = tablet_meta_.tablet_id_;
ObTimeGuard time_guard("ObTablet::create_memtable", 10 * 1000);
TCWLockGuard guard(memtables_lock_);
common::SpinWLockGuard guard(memtables_lock_);
time_guard.click("lock");
const SCN new_clog_checkpoint_scn = clog_checkpoint_scn.is_min() ? tablet_meta_.clog_checkpoint_scn_ : clog_checkpoint_scn;
@ -3292,7 +3348,7 @@ int ObTablet::inner_create_memtable(
if (OB_UNLIKELY(!clog_checkpoint_scn.is_valid_and_not_min()) || OB_UNLIKELY(schema_version < 0)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid args", K(ret), K(clog_checkpoint_scn), K(schema_version));
} else if (OB_UNLIKELY(ObTablet::MEMTABLE_ARRAY_SIZE == memtable_count_)) {
} else if (OB_UNLIKELY(MAX_MEMSTORE_CNT == memtable_count_)) {
ret = OB_MINOR_FREEZE_NOT_ALLOW;
if (TC_REACH_TIME_INTERVAL(1000 * 1000)) {
LOG_WARN("The memtable array in the tablet reaches the upper limit, and no more memtable can "
@ -4808,7 +4864,7 @@ int64_t ObTablet::to_string(char *buf, const int64_t buf_len) const
BUF_PRINTF("memtables");
J_COLON();
J_ARRAY_START();
for (int64_t i = 0; i < MEMTABLE_ARRAY_SIZE; ++i) {
for (int64_t i = 0; i < MAX_MEMSTORE_CNT; ++i) {
if (i > 0) {
J_COMMA();
}
@ -4846,12 +4902,12 @@ int ObTablet::refresh_memtable_and_update_seq(const uint64_t seq)
return ret;
}
int ObTablet::pull_memtables()
int ObTablet::pull_memtables(ObArenaAllocator &allocator, ObITable **&ddl_kvs_addr, int64_t &ddl_kv_count)
{
int ret = OB_SUCCESS;
if (OB_FAIL(pull_memtables_without_ddl())) {
LOG_WARN("fail to pull memtables without ddl", K(ret));
} else if (OB_FAIL(pull_ddl_memtables())) {
} else if (OB_FAIL(pull_ddl_memtables(allocator, ddl_kvs_addr, ddl_kv_count))) {
LOG_WARN("failed to pull ddl memtables", K(ret));
}
return ret;
@ -4973,27 +5029,6 @@ int ObTablet::update_memtables()
return ret;
}
int ObTablet::build_memtable()
{
int ret = OB_SUCCESS;
if (OB_ISNULL(memtable_mgr_)) {
ret = OB_ERR_NULL_VALUE;
LOG_ERROR("failed to get all memtables from memtable_mgr", K(ret));
} else if (OB_SUCC(ret)) { // init sstables when first creating tablet
ObSEArray<ObTableHandleV2, MAX_MEMSTORE_CNT> memtable_handles;
if (memtable_mgr_->has_memtable()) {
if (OB_FAIL(memtable_mgr_->get_all_memtables(memtable_handles))) {
LOG_WARN("failed to get all memtables from memtable_mgr", K(ret));
} else if (OB_FAIL(build_memtable(memtable_handles))) {
LOG_WARN("failed to pull memtable array", K(ret));
} else {
LOG_INFO("success to pull memtables when first creating tablet", K(ret), K(*this));
}
}
}
return ret;
}
int ObTablet::inner_get_mds_table(mds::MdsTableHandle &mds_table, bool not_exist_create) const
{
int ret = OB_SUCCESS;
@ -5013,7 +5048,7 @@ int ObTablet::build_memtable(common::ObIArray<ObTableHandleV2> &handle_array, co
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(start_pos < 0 || start_pos >= handle_array.count() || handle_array.count() > MEMTABLE_ARRAY_SIZE)) {
if (OB_UNLIKELY(start_pos < 0 || start_pos >= handle_array.count() || handle_array.count() > MAX_MEMSTORE_CNT)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("get invalid arguments", K(ret), K(start_pos), K(handle_array));
}
@ -5204,7 +5239,7 @@ int ObTablet::add_memtable(memtable::ObMemtable* const table)
if (OB_ISNULL(table)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("get invalid argument", K(ret), KPC(table));
} else if (MEMTABLE_ARRAY_SIZE == memtable_count_) {
} else if (MAX_MEMSTORE_CNT == memtable_count_) {
ret = OB_SIZE_OVERFLOW;
LOG_DEBUG("memtable is full", K(ret), KPC(table), K(memtable_count_));
} else {
@ -5234,32 +5269,48 @@ bool ObTablet::exist_memtable_with_end_scn(const ObITable *table, const SCN &end
return is_exist;
}
int ObTablet::assign_memtables(const ObTablet &other_tablet)
int ObTablet::assign_memtables(memtable::ObIMemtable * const * memtables, const int64_t memtable_count)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(other_tablet.memtables_)
|| OB_ISNULL(other_tablet.ddl_kvs_)
|| OB_UNLIKELY(other_tablet.memtable_count_ < 0 || 0 != memtable_count_)
|| OB_UNLIKELY(other_tablet.ddl_kv_count_ < 0 || 0 != ddl_kv_count_)) {
if (OB_ISNULL(memtables) || OB_UNLIKELY(memtable_count < 0 || 0 != memtable_count_)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid other tablet memtable array", K(ret), K_(memtable_count), K_(ddl_kv_count), K(other_tablet));
LOG_WARN("invalid memtable argument", K(ret), KP(memtables), K(memtable_count));
} else {
for (int64_t i = 0; OB_SUCC(ret) && i < other_tablet.memtable_count_; ++i) {
memtable::ObIMemtable * memtable = other_tablet.memtables_[i];
MEMSET(memtables_, 0, sizeof(memtable::ObIMemtable*) * MAX_MEMSTORE_CNT);
// deep copy memtables to tablet.memtables_ and inc ref
for (int64_t i = 0; OB_SUCC(ret) && i < memtable_count; ++i) {
memtable::ObIMemtable * memtable = memtables[i];
if (OB_ISNULL(memtable)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected null memtable ptr", K(ret), K(i), K(other_tablet));
LOG_WARN("unexpected null memtable ptr", K(ret), K(i), KP(memtables));
} else {
memtables_[i] = memtable;
memtable->inc_ref();
++memtable_count_;
}
}
for (int64_t i = 0; OB_SUCC(ret) && i < other_tablet.ddl_kv_count_; ++i) {
ObITable *ddl_kv = other_tablet.ddl_kvs_[i];
}
return ret;
}
int ObTablet::assign_ddl_kvs(ObITable * const *ddl_kvs, const int64_t ddl_kv_count)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(OB_NOT_NULL(ddl_kvs) && ddl_kv_count == 0) ||
OB_UNLIKELY(OB_ISNULL(ddl_kvs) && ddl_kv_count > 0) ||
OB_UNLIKELY(ddl_kv_count < 0 || 0 != ddl_kv_count_)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid ddl_kv argument", K(ret), KP(ddl_kvs), K(ddl_kv_count));
} else {
// deep copy ddl_kvs to tablet.ddl_kvs_ and inc ref
for (int64_t i = 0; OB_SUCC(ret) && i < ddl_kv_count; ++i) {
ObITable *ddl_kv = ddl_kvs[i];
if (OB_ISNULL(ddl_kv)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected ddl kv", K(ret), K(i), K(other_tablet));
LOG_WARN("unexpected ddl_kvs", K(ret), K(i), KP(ddl_kvs));
} else {
ddl_kvs_[i] = ddl_kv;
ddl_kv->inc_ref();
@ -5267,13 +5318,14 @@ int ObTablet::assign_memtables(const ObTablet &other_tablet)
}
}
}
return ret;
}
void ObTablet::reset_memtable()
{
ObTenantMetaMemMgr *t3m = MTL(ObTenantMetaMemMgr*);
for(int i = 0; i < MEMTABLE_ARRAY_SIZE; ++i) {
for(int i = 0; i < MAX_MEMSTORE_CNT; ++i) {
if (OB_NOT_NULL(memtables_[i])) {
const ObITable::TableType table_type = memtables_[i]->get_key().table_type_;
const int64_t ref_cnt = memtables_[i]->dec_ref();
@ -5335,16 +5387,16 @@ int ObTablet::get_mds_table_handle_(mds::MdsTableHandle &handle,
return ret;
}
int ObTablet::pull_ddl_memtables()
int ObTablet::pull_ddl_memtables(ObArenaAllocator &allocator, ObITable **&ddl_kvs_addr, int64_t &ddl_kv_count)
{
int ret = OB_SUCCESS;
ObArray<ObITable *> ddl_memtables;
ObDDLKvMgrHandle kv_mgr_handle;
bool has_ddl_kv = false;
ObTablesHandleArray ddl_kvs_handle;
if (OB_UNLIKELY(0 != ddl_kv_count_)) {
if (OB_UNLIKELY(0 != ddl_kv_count)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected ddl kv count when pull ddl memtables", K(ret), K_(ddl_kv_count), KPC(this));
LOG_WARN("unexpected ddl kv count when pull ddl memtables", K(ret), K(ddl_kv_count), KPC(this));
} else if (OB_FAIL(get_ddl_kv_mgr(kv_mgr_handle))) {
if (OB_ENTRY_NOT_EXIST != ret) {
LOG_WARN("get ddl kv mgr failed", K(ret), KPC(this));
@ -5355,6 +5407,14 @@ int ObTablet::pull_ddl_memtables()
} else if (OB_FAIL(kv_mgr_handle.get_obj()->get_ddl_kvs_for_query(*this, ddl_kvs_handle))) {
LOG_WARN("failed to get all ddl freeze kvs", K(ret));
} else {
ObITable *temp_ddl_kvs;
if (ddl_kvs_handle.get_count() > 0) {
ddl_kvs_addr = static_cast<ObITable**>(allocator.alloc(sizeof(ObITable*) * DDL_KV_ARRAY_SIZE));
if (OB_ISNULL(ddl_kvs_addr)) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("failed to allocate memory for ddl_kvs_addr", K(ret), K(ddl_kvs_handle.get_count()));
}
}
SCN ddl_checkpoint_scn = get_tablet_meta().ddl_checkpoint_scn_;
for (int64_t i = 0; OB_SUCC(ret) && i < ddl_kvs_handle.get_count(); ++i) {
ObDDLKV *ddl_kv = static_cast<ObDDLKV *>(ddl_kvs_handle.get_table(i));
@ -5366,17 +5426,25 @@ int ObTablet::pull_ddl_memtables()
} else if (ddl_kv->get_freeze_scn() > ddl_checkpoint_scn) {
if (OB_FAIL(ddl_kv->prepare_sstable(false/*need_check*/))) {
LOG_WARN("prepare sstable failed", K(ret));
} else if (OB_UNLIKELY(ddl_kv_count_ >= DDL_KV_ARRAY_SIZE)) {
} else if (OB_UNLIKELY(ddl_kv_count >= DDL_KV_ARRAY_SIZE)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("ddl kv count overflow", K(ret), K(i), K_(ddl_kv_count), K(ddl_kvs_handle));
LOG_WARN("ddl kv count overflow", K(ret), K(i), K(ddl_kv_count), K(ddl_kvs_handle));
} else {
ddl_kvs_[ddl_kv_count_] = ddl_kv;
ddl_kvs_addr[ddl_kv_count] = ddl_kv;
ddl_kv->inc_ref();
++ddl_kv_count_;
++ddl_kv_count;
}
}
}
}
if (ddl_kv_count == 0) {
// In the above for loop, ddl_kvs_addr's assignment can be skipped (e.g. ddl_kv->is_closed()).
ddl_kvs_addr = nullptr;
}
if (OB_SUCC(ret) && OB_ISNULL(ddl_kvs_addr) && ddl_kv_count > 0) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected value on ddl_kvs_addr and ddl_kv_count", KP(ddl_kvs_addr), K(ddl_kv_count));
}
return ret;
}