fix lob macro block logic id duplicate after direct load.

This commit is contained in:
obdev
2024-04-02 04:24:02 +00:00
committed by ob-robot
parent 7485e7e275
commit 46511938dc
8 changed files with 139 additions and 44 deletions

View File

@ -49,14 +49,22 @@ int ObTabletDirectLoadInsertParam::assign(const ObTabletDirectLoadInsertParam &o
return ret;
}
ObDDLInsertRowIterator::ObDDLInsertRowIterator(
sql::ObPxMultiPartSSTableInsertOp *op,
const bool is_slice_empty, const share::ObLSID &ls_id, const common::ObTabletID &tablet_id,
const int64_t rowkey_cnt, const int64_t snapshot_version, const int64_t context_id, const int64_t parallel_idx)
: lob_allocator_(ObModIds::OB_LOB_ACCESS_BUFFER, OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID()), op_(op), ls_id_(ls_id), current_tablet_id_(tablet_id), current_row_(), is_next_row_cached_(true),
is_slice_empty_(is_slice_empty), rowkey_count_(rowkey_cnt), snapshot_version_(snapshot_version), lob_slice_id_(0), context_id_(context_id), parallel_idx_(parallel_idx)
ObDDLInsertRowIterator::ObDDLInsertRowIterator()
: is_inited_(false),
lob_allocator_(ObModIds::OB_LOB_ACCESS_BUFFER, OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID()),
op_(nullptr),
ls_id_(),
current_tablet_id_(),
current_row_(),
is_next_row_cached_(true),
is_slice_empty_(false),
rowkey_count_(-1),
snapshot_version_(-1),
lob_slice_id_(-1),
lob_id_cache_(),
context_id_(-1),
macro_seq_()
{
lob_id_cache_.set(1/*start*/, 0/*end*/);
}
ObDDLInsertRowIterator::~ObDDLInsertRowIterator()
@ -64,10 +72,48 @@ ObDDLInsertRowIterator::~ObDDLInsertRowIterator()
}
int ObDDLInsertRowIterator::init(
sql::ObPxMultiPartSSTableInsertOp *op,
const bool is_slice_empty,
const share::ObLSID &ls_id,
const common::ObTabletID &tablet_id,
const int64_t rowkey_cnt,
const int64_t snapshot_version,
const int64_t context_id,
const int64_t parallel_idx)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(is_inited_)) {
ret = OB_INIT_TWICE;
LOG_WARN("init twice", K(ret));
} else if (OB_UNLIKELY(!ls_id.is_valid() || !tablet_id.is_valid() || rowkey_cnt < 0
|| snapshot_version < 0 || context_id < 0 || parallel_idx < 0)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid arg", K(ret), K(ls_id), K(tablet_id), K(rowkey_cnt), K(snapshot_version), K(context_id), K(parallel_idx));
} else if (OB_FAIL(macro_seq_.set_parallel_degree(parallel_idx))) {
LOG_WARN("set parallel failed", K(ret), K(parallel_idx));
} else {
op_ = op;
is_slice_empty_ = is_slice_empty;
ls_id_ = ls_id;
current_tablet_id_ = tablet_id;
rowkey_count_ = rowkey_cnt;
snapshot_version_ = snapshot_version;
context_id_ = context_id;
is_next_row_cached_ = true;
lob_id_cache_.set(1/*start*/, 0/*end*/);
is_inited_ = true;
}
return ret;
}
int ObDDLInsertRowIterator::close_lob_sstable_slice()
{
int ret = OB_SUCCESS;
if (lob_slice_id_ > 0) {
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
LOG_WARN("not init", K(ret));
} else if (lob_slice_id_ > 0) {
ObDirectLoadSliceInfo slice_info;
slice_info.is_full_direct_load_ = true;
slice_info.is_lob_slice_ = true;
@ -76,7 +122,8 @@ int ObDDLInsertRowIterator::close_lob_sstable_slice()
slice_info.slice_id_ = lob_slice_id_;
slice_info.context_id_ = context_id_;
ObTenantDirectLoadMgr *tenant_direct_load_mgr = MTL(ObTenantDirectLoadMgr *);
if (OB_FAIL(tenant_direct_load_mgr->close_sstable_slice(slice_info))) {
ObMacroDataSeq unused_seq;
if (OB_FAIL(tenant_direct_load_mgr->close_sstable_slice(slice_info, nullptr/*insert_monitor*/, unused_seq))) {
LOG_WARN("close sstable slice failed", K(ret), K(slice_info));
} else {
lob_slice_id_ = 0;
@ -91,7 +138,10 @@ int ObDDLInsertRowIterator::get_next_row(
{
int ret = OB_SUCCESS;
ObTenantDirectLoadMgr *tenant_direct_load_mgr = MTL(ObTenantDirectLoadMgr *);
if (OB_UNLIKELY(nullptr == op_ || snapshot_version_ <= 0 || nullptr == tenant_direct_load_mgr)) {
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
LOG_WARN("not init", K(ret));
} else if (OB_UNLIKELY(nullptr == op_ || snapshot_version_ <= 0 || nullptr == tenant_direct_load_mgr)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("operator is null", K(ret), KP(op_), K(snapshot_version_), KP(tenant_direct_load_mgr), K(MTL_ID()));
} else {
@ -225,12 +275,19 @@ int ObDDLInsertRowIterator::switch_to_new_lob_slice()
slice_info.data_tablet_id_ = current_tablet_id_;
slice_info.slice_id_ = lob_slice_id_;
slice_info.context_id_ = context_id_;
ObMacroDataSeq block_start_seq;
ObTabletAutoincrementService &auto_inc = ObTabletAutoincrementService::get_instance();
ObTenantDirectLoadMgr *tenant_direct_load_mgr = MTL(ObTenantDirectLoadMgr *);
ObTabletDirectLoadMgrHandle direct_load_mgr_handle;
direct_load_mgr_handle.reset();
if (OB_ISNULL(tenant_direct_load_mgr)) {
int64_t CACHE_SIZE_REQUESTED = AUTO_INC_CACHE_SIZE;
#ifdef ERRSIM
int64_t negative_inject_num = OB_E(EventTable::EN_DDL_LOBID_CACHE_SIZE_INJECTED) OB_SUCCESS;
CACHE_SIZE_REQUESTED = negative_inject_num * -1;
#endif
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
LOG_WARN("not init", K(ret));
} else if (OB_ISNULL(tenant_direct_load_mgr)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected err", K(ret), K(MTL_ID()));
} else if (OB_FAIL(tenant_direct_load_mgr->get_tablet_mgr(current_tablet_id_,
@ -239,22 +296,20 @@ int ObDDLInsertRowIterator::switch_to_new_lob_slice()
} else if (OB_FALSE_IT(lob_id_cache_.tablet_id_ =
direct_load_mgr_handle.get_obj()->get_lob_meta_tablet_id())) {
// fetch cache via lob meta tablet id.
} else if (OB_FALSE_IT(lob_id_cache_.cache_size_ = AUTO_INC_CACHE_SIZE)) {
} else if (lob_slice_id_ > 0 &&
OB_FAIL(tenant_direct_load_mgr->close_sstable_slice(slice_info))) {
} else if (OB_FALSE_IT(lob_id_cache_.cache_size_ = CACHE_SIZE_REQUESTED)) {
} else if (lob_slice_id_ > 0
&& OB_FAIL(tenant_direct_load_mgr->close_sstable_slice(slice_info, nullptr/*insert_monitor*/, macro_seq_))) {
LOG_WARN("close old lob slice failed", K(ret), K(slice_info));
} else if (OB_FAIL(auto_inc.get_tablet_cache_interval(MTL_ID(), lob_id_cache_))) {
LOG_WARN("get_autoinc_seq fail", K(ret), K(MTL_ID()), K(slice_info));
} else if (OB_UNLIKELY(AUTO_INC_CACHE_SIZE > lob_id_cache_.count())) {
} else if (OB_UNLIKELY(CACHE_SIZE_REQUESTED > lob_id_cache_.count())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected autoincrement value count", K(ret), K(lob_id_cache_));
} else if (OB_FAIL(block_start_seq.set_parallel_degree(parallel_idx_))) {
LOG_WARN("set parall degree failed", K(ret), K(parallel_idx_));
} else {
// new slice info to open.
slice_info.slice_id_ = 0;
if (OB_FAIL(tenant_direct_load_mgr->open_sstable_slice(block_start_seq, slice_info))) {
LOG_WARN("open lob sstable slice failed", KR(ret), K(block_start_seq), K(slice_info));
if (OB_FAIL(tenant_direct_load_mgr->open_sstable_slice(macro_seq_, slice_info))) {
LOG_WARN("open lob sstable slice failed", KR(ret), K(macro_seq_), K(slice_info));
} else {
lob_slice_id_ = slice_info.slice_id_;
}