read msd from frozen memtables in mini merge procedure
This commit is contained in:
committed by
wangzelin.wzl
parent
b3d436156c
commit
6ff3c39b49
@ -82,7 +82,14 @@ void ObTabletAutoincSeq::reset()
|
||||
|
||||
bool ObTabletAutoincSeq::is_valid() const
|
||||
{
|
||||
return true;
|
||||
bool valid = true;
|
||||
|
||||
if (intervals_.empty()) {
|
||||
valid = false;
|
||||
}
|
||||
// TODO(shuangcan.yjw): verify elemetns in array
|
||||
|
||||
return valid;
|
||||
}
|
||||
|
||||
int ObTabletAutoincSeq::get_autoinc_seq_value(uint64_t &autoinc_seq)
|
||||
|
||||
@ -76,7 +76,7 @@ struct ObTabletAutoincInterval final
|
||||
public:
|
||||
ObTabletAutoincInterval()
|
||||
: tablet_id_(), start_(0), end_(0) {}
|
||||
bool is_valid() const { return tablet_id_.is_valid(); }
|
||||
bool is_valid() const { return tablet_id_.is_valid() && end_ >= start_; }
|
||||
void reset()
|
||||
{
|
||||
tablet_id_.reset();
|
||||
|
||||
@ -32,8 +32,10 @@
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
using namespace blocksstable;
|
||||
using namespace common;
|
||||
using namespace storage;
|
||||
using namespace memtable;
|
||||
using namespace blocksstable;
|
||||
namespace compaction
|
||||
{
|
||||
|
||||
@ -977,7 +979,9 @@ int ObTabletMergeFinishTask::add_sstable_for_merge(ObTabletMergeCtx &ctx)
|
||||
if (ctx.param_.tablet_id_.is_special_merge_tablet()) {
|
||||
param.multi_version_start_ = 1;
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
// for mini merge, read all msd from frozen memtable
|
||||
if (ctx.param_.is_mini_merge() && OB_FAIL(read_msd_from_memtable(ctx, param))) {
|
||||
LOG_WARN("failed to read msd from memtable", K(ret), K(ctx));
|
||||
} else if (OB_FAIL(ctx.ls_handle_.get_ls()->update_tablet_table_store(
|
||||
ctx.param_.tablet_id_, param, new_tablet_handle))) {
|
||||
LOG_WARN("failed to update tablet table store", K(ret), K(param));
|
||||
@ -1011,6 +1015,69 @@ int ObTabletMergeFinishTask::add_sstable_for_merge(ObTabletMergeCtx &ctx)
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTabletMergeFinishTask::read_msd_from_memtable(ObTabletMergeCtx &ctx, ObUpdateTableStoreParam ¶m)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
if (OB_FAIL(traverse_all_memtables(ctx, ¶m.tx_data_, MultiSourceDataUnitType::TABLET_TX_DATA))) {
|
||||
LOG_WARN("failed to read tx data from memtables", K(ret));
|
||||
} else if (OB_FAIL(traverse_all_memtables(ctx, ¶m.binding_info_, MultiSourceDataUnitType::TABLET_BINDING_INFO))) {
|
||||
LOG_WARN("failed to read tx data from memtables", K(ret));
|
||||
} else if (OB_FAIL(traverse_all_memtables(ctx, ¶m.auto_inc_seq_, MultiSourceDataUnitType::TABLET_SEQ))) {
|
||||
LOG_WARN("failed to read tx data from memtables", K(ret));
|
||||
} else {
|
||||
LOG_INFO("succeeded to read msd from memtable", K(ret),
|
||||
"ls_id", ctx.param_.ls_id_,
|
||||
"tablet_id", ctx.param_.tablet_id_,
|
||||
"tx_data", param.tx_data_,
|
||||
"binding_info", param.binding_info_,
|
||||
"auto_inc_seq", param.auto_inc_seq_);
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTabletMergeFinishTask::traverse_all_memtables(
|
||||
ObTabletMergeCtx &ctx,
|
||||
ObIMultiSourceDataUnit *msd,
|
||||
const MultiSourceDataUnitType &type)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObIArray<ObITable*> &tables = ctx.tables_handle_.get_tables();
|
||||
ObITable *table = nullptr;
|
||||
ObMemtable *memtable = nullptr;
|
||||
|
||||
if (OB_ISNULL(msd)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid args", K(ret));
|
||||
}
|
||||
|
||||
for (int64_t i = tables.count() - 1; OB_SUCC(ret) && i >= 0; --i) {
|
||||
if (OB_ISNULL(table = tables.at(i))) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("table is null", K(ret), K(tables), KP(table));
|
||||
} else if (OB_UNLIKELY(!table->is_memtable())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("table is not memtable", K(ret), K(tables), KPC(table));
|
||||
} else if (OB_UNLIKELY(!table->is_frozen_memtable())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("table is not frozen memtable", K(ret), K(tables), KPC(table));
|
||||
} else if (table->is_data_memtable()) {
|
||||
memtable = static_cast<ObMemtable*>(table);
|
||||
if (memtable->has_multi_source_data_unit(type)) {
|
||||
if (OB_FAIL(memtable->get_multi_source_data_unit(msd, nullptr/*allocator*/))) {
|
||||
LOG_WARN("failed to get msd from memtable", K(ret), K(type));
|
||||
} else {
|
||||
// succeeded to get msd, just break
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTabletMergeFinishTask::try_schedule_compaction_after_mini(
|
||||
ObTabletMergeCtx &ctx,
|
||||
ObTabletHandle &tablet_handle)
|
||||
|
||||
@ -35,6 +35,13 @@ class ObITable;
|
||||
struct ObGetMergeTablesResult;
|
||||
class ObTablet;
|
||||
class ObTabletHandle;
|
||||
struct ObUpdateTableStoreParam;
|
||||
}
|
||||
|
||||
namespace memtable
|
||||
{
|
||||
enum class MultiSourceDataUnitType;
|
||||
class ObIMultiSourceDataUnit;
|
||||
}
|
||||
|
||||
namespace blocksstable
|
||||
@ -145,6 +152,8 @@ private:
|
||||
int get_merged_sstable(ObTabletMergeCtx &ctx, blocksstable::ObSSTable *&sstable);
|
||||
int add_sstable_for_merge(ObTabletMergeCtx &ctx);
|
||||
int try_schedule_compaction_after_mini(ObTabletMergeCtx &ctx, storage::ObTabletHandle &tablet_handle);
|
||||
int read_msd_from_memtable(ObTabletMergeCtx &ctx, storage::ObUpdateTableStoreParam ¶m);
|
||||
int traverse_all_memtables(ObTabletMergeCtx &ctx, memtable::ObIMultiSourceDataUnit *msd, const memtable::MultiSourceDataUnitType &type);
|
||||
private:
|
||||
bool is_inited_;
|
||||
ObBasicTabletMergeDag *merge_dag_;
|
||||
|
||||
@ -1022,19 +1022,15 @@ int ObLSTabletService::update_tablet_table_store(
|
||||
time_guard.click("GetOld");
|
||||
ObTenantMetaMemMgr *t3m = MTL(ObTenantMetaMemMgr*);
|
||||
ObTablet *old_tablet = old_tablet_handle.get_obj();
|
||||
ObTabletTxMultiSourceDataUnit tx_data;
|
||||
ObTabletBindingInfo ddl_data;
|
||||
const ObTabletTxMultiSourceDataUnit *tx_data = nullptr;
|
||||
const ObTabletBindingInfo *binding_info = nullptr;
|
||||
const ObTabletAutoincSeq *auto_inc_seq = nullptr;
|
||||
ObMetaDiskAddr disk_addr;
|
||||
ObTabletAutoincSeq autoinc_seq;
|
||||
|
||||
if (OB_FAIL(old_tablet->get_tx_data(tx_data))) {
|
||||
LOG_WARN("failed to get tx data from old tablet", K(ret), KPC(old_tablet));
|
||||
} else if (OB_FAIL(old_tablet->get_ddl_data(ddl_data))) {
|
||||
LOG_WARN("failed to get ddl data from old tablet", K(ret), KPC(old_tablet));
|
||||
} else if (OB_FAIL(old_tablet->get_latest_autoinc_seq(autoinc_seq))) {
|
||||
LOG_WARN("failed to get autoinc seq from old tablet", K(ret));
|
||||
} else if (OB_FAIL(new_tablet->init(param, *old_tablet, tx_data, ddl_data, autoinc_seq))) {
|
||||
LOG_WARN("failed to init tablet", K(ret), K(param), KPC(old_tablet), K(tx_data), K(ddl_data), K(autoinc_seq));
|
||||
if (OB_FAIL(choose_msd(param, *old_tablet, tx_data, binding_info, auto_inc_seq))) {
|
||||
LOG_WARN("failed to choose msd", K(ret), K(param), KPC(old_tablet));
|
||||
} else if (OB_FAIL(new_tablet->init(param, *old_tablet, *tx_data, *binding_info, *auto_inc_seq))) {
|
||||
LOG_WARN("failed to init tablet", K(ret), K(param), KPC(old_tablet), KPC(tx_data), KPC(binding_info), KPC(auto_inc_seq));
|
||||
} else if (FALSE_IT(time_guard.click("InitNew"))) {
|
||||
} else if (OB_FAIL(ObTabletSlogHelper::write_create_tablet_slog(new_tablet_handle, disk_addr))) {
|
||||
LOG_WARN("fail to write update tablet slog", K(ret), K(new_tablet_handle), K(disk_addr));
|
||||
@ -1053,6 +1049,32 @@ int ObLSTabletService::update_tablet_table_store(
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObLSTabletService::choose_msd(
|
||||
const ObUpdateTableStoreParam ¶m,
|
||||
const ObTablet &old_tablet,
|
||||
const ObTabletTxMultiSourceDataUnit *&tx_data,
|
||||
const ObTabletBindingInfo *&binding_info,
|
||||
const share::ObTabletAutoincSeq *&auto_inc_seq)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
const ObTabletMeta &old_tablet_meta = old_tablet.get_tablet_meta();
|
||||
tx_data = ¶m.tx_data_;
|
||||
binding_info = ¶m.binding_info_;
|
||||
auto_inc_seq = ¶m.auto_inc_seq_;
|
||||
|
||||
if (!tx_data->is_valid()) {
|
||||
tx_data = &old_tablet_meta.tx_data_;
|
||||
}
|
||||
if (!binding_info->is_valid()) {
|
||||
binding_info = &old_tablet_meta.ddl_data_;
|
||||
}
|
||||
if (!auto_inc_seq->is_valid()) {
|
||||
auto_inc_seq = &old_tablet_meta.autoinc_seq_;
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObLSTabletService::update_tablet_report_status(const common::ObTabletID &tablet_id)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
@ -1356,12 +1378,15 @@ int ObLSTabletService::try_pin_tablet_if_needed(const ObTabletHandle &tablet_han
|
||||
ObTenantMetaMemMgr *t3m = MTL(ObTenantMetaMemMgr*);
|
||||
ObTablet *tablet = tablet_handle.get_obj();
|
||||
ObTabletTxMultiSourceDataUnit tx_data;
|
||||
bool exist_on_memtable = false;
|
||||
|
||||
if (OB_ISNULL(tablet)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unexpected error, tablet is null", K(ret), K(tablet_handle));
|
||||
} else if (OB_FAIL(tablet->get_tx_data(tx_data))) {
|
||||
} else if (OB_FAIL(tablet->inner_get_tx_data(tx_data, exist_on_memtable))) {
|
||||
LOG_WARN("failed to get tx data", K(ret), KPC(tablet));
|
||||
} else if (!tx_data.is_valid()) {
|
||||
// tablet is not valid, do nothing
|
||||
} else if (!tx_data.is_in_tx()) {
|
||||
// tablet not in tx, do nothing
|
||||
} else {
|
||||
|
||||
@ -497,6 +497,12 @@ private:
|
||||
const ObMigrationTabletParam &mig_tablet_param,
|
||||
ObTabletHandle &handle);
|
||||
int delete_all_tablets();
|
||||
int choose_msd(
|
||||
const ObUpdateTableStoreParam ¶m,
|
||||
const ObTablet &old_tablet,
|
||||
const ObTabletTxMultiSourceDataUnit *&tx_data,
|
||||
const ObTabletBindingInfo *&binding_info,
|
||||
const share::ObTabletAutoincSeq *&auto_inc_seq);
|
||||
static int build_create_sstable_param_for_migration(
|
||||
const blocksstable::ObMigrationSSTableParam &migrate_sstable_param,
|
||||
ObTabletCreateSSTableParam &create_sstable_param);
|
||||
|
||||
@ -239,18 +239,18 @@ int ObMetaPointerMap<Key, T>::erase(const Key &key)
|
||||
ret = common::OB_INVALID_ARGUMENT;
|
||||
STORAGE_LOG(WARN, "invalid argument", K(ret), K(key));
|
||||
} else {
|
||||
common::ObBucketHashWLockGuard lock_guard(ResourceMap::bucket_lock_, ResourceMap::hash_func_(key));
|
||||
if (OB_FAIL(ResourceMap::map_.get_refactored(key, ptr))) {
|
||||
STORAGE_LOG(WARN, "fail to get from map", K(ret));
|
||||
} else if (OB_FAIL(ResourceMap::map_.erase_refactored(key))) {
|
||||
STORAGE_LOG(WARN, "fail to erase from map", K(ret));
|
||||
} else {
|
||||
ObMetaPointer<T> *value = ptr->get_value_ptr();
|
||||
value->reset_obj();
|
||||
if (OB_FAIL(ResourceMap::dec_handle_ref(ptr))) {
|
||||
STORAGE_LOG(WARN, "fail to dec handle ref", K(ret));
|
||||
}
|
||||
}
|
||||
common::ObBucketHashWLockGuard lock_guard(ResourceMap::bucket_lock_, ResourceMap::hash_func_(key));
|
||||
if (OB_FAIL(ResourceMap::map_.get_refactored(key, ptr))) {
|
||||
STORAGE_LOG(WARN, "fail to get from map", K(ret));
|
||||
} else if (OB_FAIL(ResourceMap::map_.erase_refactored(key))) {
|
||||
STORAGE_LOG(WARN, "fail to erase from map", K(ret));
|
||||
} else {
|
||||
ObMetaPointer<T> *value = ptr->get_value_ptr();
|
||||
value->reset_obj();
|
||||
if (OB_FAIL(ResourceMap::dec_handle_ref(ptr))) {
|
||||
STORAGE_LOG(WARN, "fail to dec handle ref", K(ret));
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -221,7 +221,10 @@ ObUpdateTableStoreParam::ObUpdateTableStoreParam(
|
||||
need_check_sstable_(false),
|
||||
ddl_checkpoint_ts_(0),
|
||||
ddl_start_log_ts_(0),
|
||||
ddl_snapshot_version_(0)
|
||||
ddl_snapshot_version_(0),
|
||||
tx_data_(),
|
||||
binding_info_(),
|
||||
auto_inc_seq_()
|
||||
{
|
||||
}
|
||||
|
||||
@ -246,7 +249,10 @@ ObUpdateTableStoreParam::ObUpdateTableStoreParam(
|
||||
need_check_sstable_(need_check_sstable),
|
||||
ddl_checkpoint_ts_(0),
|
||||
ddl_start_log_ts_(0),
|
||||
ddl_snapshot_version_(0)
|
||||
ddl_snapshot_version_(0),
|
||||
tx_data_(),
|
||||
binding_info_(),
|
||||
auto_inc_seq_()
|
||||
{
|
||||
}
|
||||
|
||||
@ -270,7 +276,10 @@ ObUpdateTableStoreParam::ObUpdateTableStoreParam(
|
||||
need_check_sstable_(false),
|
||||
ddl_checkpoint_ts_(0),
|
||||
ddl_start_log_ts_(0),
|
||||
ddl_snapshot_version_(0)
|
||||
ddl_snapshot_version_(0),
|
||||
tx_data_(),
|
||||
binding_info_(),
|
||||
auto_inc_seq_()
|
||||
{
|
||||
}
|
||||
|
||||
|
||||
@ -17,11 +17,14 @@
|
||||
#include "lib/ob_replica_define.h"
|
||||
#include "common/ob_store_range.h"
|
||||
#include "common/ob_member_list.h"
|
||||
#include "share/ob_tablet_autoincrement_param.h"
|
||||
#include "share/schema/ob_schema_struct.h"
|
||||
#include "share/schema/ob_table_schema.h"
|
||||
#include "storage/ob_i_table.h"
|
||||
#include "storage/ob_storage_schema.h"
|
||||
#include "storage/tablet/ob_tablet_table_store_flag.h"
|
||||
#include "storage/tablet/ob_tablet_multi_source_data.h"
|
||||
#include "storage/tablet/ob_tablet_binding_helper.h"
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
@ -315,7 +318,8 @@ struct ObUpdateTableStoreParam
|
||||
bool is_valid() const;
|
||||
TO_STRING_KV(K_(table_handle), K_(snapshot_version), K_(clog_checkpoint_ts), K_(multi_version_start),
|
||||
K_(keep_old_ddl_sstable), K_(need_report), KPC_(storage_schema), K_(rebuild_seq), K_(update_with_major_flag),
|
||||
K_(need_check_sstable), K_(ddl_checkpoint_ts), K_(ddl_start_log_ts), K_(ddl_snapshot_version));
|
||||
K_(need_check_sstable), K_(ddl_checkpoint_ts), K_(ddl_start_log_ts), K_(ddl_snapshot_version),
|
||||
K_(tx_data), K_(binding_info), K_(auto_inc_seq));
|
||||
|
||||
ObTableHandleV2 table_handle_;
|
||||
int64_t snapshot_version_;
|
||||
@ -330,6 +334,11 @@ struct ObUpdateTableStoreParam
|
||||
int64_t ddl_checkpoint_ts_;
|
||||
int64_t ddl_start_log_ts_;
|
||||
int64_t ddl_snapshot_version_;
|
||||
|
||||
// msd
|
||||
ObTabletTxMultiSourceDataUnit tx_data_;
|
||||
ObTabletBindingInfo binding_info_;
|
||||
share::ObTabletAutoincSeq auto_inc_seq_;
|
||||
};
|
||||
|
||||
struct ObBatchUpdateTableStoreParam final
|
||||
|
||||
@ -239,7 +239,7 @@ int ObTablet::init(
|
||||
set_next_tablet_guard(old_tablet.next_tablet_guard_);
|
||||
}
|
||||
is_inited_ = true;
|
||||
LOG_INFO("succeeded to init tablet", K(ret), K(param), K(old_tablet), K(tx_data), K(autoinc_seq));
|
||||
LOG_INFO("succeeded to init tablet", K(ret), K(param), K(old_tablet), K(tx_data), K(ddl_data), K(autoinc_seq), KPC(this));
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret) && param.need_report_ && param.table_handle_.get_table()->is_major_sstable()) {
|
||||
@ -257,7 +257,6 @@ int ObTablet::init(
|
||||
reset();
|
||||
}
|
||||
|
||||
LOG_INFO("Update tablet info", K(ret), K(param), K(old_tablet), KPC(this));
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
@ -39,21 +39,40 @@ namespace storage
|
||||
{
|
||||
|
||||
ObTabletBindingInfo::ObTabletBindingInfo()
|
||||
: redefined_(false),
|
||||
snapshot_version_(INT64_MAX),
|
||||
schema_version_(INT64_MAX),
|
||||
data_tablet_id_(),
|
||||
hidden_tablet_ids_(),
|
||||
lob_meta_tablet_id_(),
|
||||
lob_piece_tablet_id_()
|
||||
{
|
||||
reset();
|
||||
}
|
||||
|
||||
void ObTabletBindingInfo::reset()
|
||||
{
|
||||
redefined_ = false;
|
||||
snapshot_version_ = OB_INVALID_VERSION;
|
||||
schema_version_ = OB_INVALID_VERSION;
|
||||
snapshot_version_ = INT64_MAX;
|
||||
schema_version_ = INT64_MAX;
|
||||
data_tablet_id_.reset();
|
||||
hidden_tablet_ids_.reset();
|
||||
lob_meta_tablet_id_.reset();
|
||||
lob_piece_tablet_id_.reset();
|
||||
}
|
||||
|
||||
bool ObTabletBindingInfo::is_valid() const
|
||||
{
|
||||
bool valid = true;
|
||||
|
||||
if (INT64_MAX == snapshot_version_) {
|
||||
valid = false;
|
||||
} else if (INT64_MAX == schema_version_) {
|
||||
valid = false;
|
||||
}
|
||||
|
||||
return valid;
|
||||
}
|
||||
|
||||
int ObTabletBindingInfo::assign(const ObTabletBindingInfo &other)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
@ -61,7 +61,7 @@ public:
|
||||
|
||||
virtual int deep_copy(const memtable::ObIMultiSourceDataUnit *src, ObIAllocator *allocator = nullptr) override;
|
||||
virtual void reset() override;
|
||||
virtual bool is_valid() const override { return true; }
|
||||
virtual bool is_valid() const override;
|
||||
virtual int64_t get_data_size() const override { return sizeof(ObTabletBindingInfo); }
|
||||
virtual memtable::MultiSourceDataUnitType type() const override { return memtable::MultiSourceDataUnitType::TABLET_BINDING_INFO; }
|
||||
|
||||
|
||||
@ -997,11 +997,6 @@ int ObTabletCreateDeleteHelper::prepare_remove_tablets(
|
||||
|
||||
if (OB_FAIL(ObTabletBindingHelper::lock_and_set_tx_data(tablet_handle, tx_data, trans_flags.for_replay_))) {
|
||||
LOG_WARN("failed to lock tablet binding", K(ret), K(key), K(tx_data));
|
||||
// TODO(bowen.gbw): temproarily swallow 4200 error while prepare remove tablets
|
||||
if (trans_flags.for_replay_ && OB_HASH_EXIST == ret) {
|
||||
ret = OB_SUCCESS;
|
||||
LOG_INFO("swallow 4200 error", K(ret), K(key), K(tx_data), K(trans_flags));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -110,6 +110,8 @@ int ObTabletMeta::init(
|
||||
report_status_.data_checksum_ = 0;
|
||||
report_status_.row_count_ = 0;
|
||||
|
||||
ddl_data_.snapshot_version_ = OB_INVALID_VERSION;
|
||||
ddl_data_.schema_version_ = OB_INVALID_VERSION;
|
||||
ddl_data_.lob_meta_tablet_id_ = lob_meta_tablet_id;
|
||||
ddl_data_.lob_piece_tablet_id_ = lob_piece_tablet_id;
|
||||
|
||||
|
||||
Reference in New Issue
Block a user