[BUGFIX] fix table restore import lob over diff tenant
This commit is contained in:
@ -1265,6 +1265,7 @@ int ObComplementWriteTask::append_lob(
|
||||
slice_info.data_tablet_id_ = param_->dest_tablet_id_;
|
||||
slice_info.slice_id_ = iterator.get_lob_slice_id();
|
||||
slice_info.context_id_ = context_->context_id_;
|
||||
slice_info.src_tenant_id_ = param_->orig_tenant_id_;
|
||||
if (OB_FAIL(MTL(ObTenantDirectLoadMgr *)->fill_lob_sstable_slice(lob_allocator, slice_info,
|
||||
iterator.get_lob_id_cache(), lob_column_idxs, col_types, write_row_))) {
|
||||
LOG_WARN("fill batch lob sstable slice failed", K(ret), K(slice_info), K(write_row_));
|
||||
|
||||
@ -1329,7 +1329,7 @@ int ObTabletDirectLoadMgr::fill_lob_sstable_slice(
|
||||
ObDirectLoadSliceWriter *slice_writer = nullptr;
|
||||
const int64_t trans_version = is_full_direct_load(direct_load_type_) ? table_key_.get_snapshot_version() : INT64_MAX;
|
||||
ObBatchSliceWriteInfo info(tablet_id_, ls_id_, trans_version, direct_load_type_, sqc_build_ctx_.build_param_.runtime_only_param_.trans_id_,
|
||||
sqc_build_ctx_.build_param_.runtime_only_param_.seq_no_);
|
||||
sqc_build_ctx_.build_param_.runtime_only_param_.seq_no_, slice_info.src_tenant_id_);
|
||||
|
||||
if (OB_FAIL(lob_mgr_handle_.get_obj()->get_sqc_build_ctx().slice_mgr_map_.get_refactored(slice_info.slice_id_, slice_writer))) {
|
||||
LOG_WARN("get refactored failed", K(ret), K(slice_info), K(sqc_build_ctx_.slice_mgr_map_.size()));
|
||||
@ -1390,7 +1390,7 @@ int ObTabletDirectLoadMgr::fill_lob_sstable_slice(
|
||||
ObDirectLoadSliceWriter *slice_writer = nullptr;
|
||||
const int64_t trans_version = is_full_direct_load(direct_load_type_) ? table_key_.get_snapshot_version() : INT64_MAX;
|
||||
ObBatchSliceWriteInfo info(tablet_id_, ls_id_, trans_version, direct_load_type_, sqc_build_ctx_.build_param_.runtime_only_param_.trans_id_,
|
||||
sqc_build_ctx_.build_param_.runtime_only_param_.seq_no_);
|
||||
sqc_build_ctx_.build_param_.runtime_only_param_.seq_no_, slice_info.src_tenant_id_);
|
||||
|
||||
if (OB_FAIL(lob_mgr_handle_.get_obj()->get_sqc_build_ctx().slice_mgr_map_.get_refactored(slice_info.slice_id_, slice_writer))) {
|
||||
LOG_WARN("get refactored failed", K(ret), K(slice_info), K(sqc_build_ctx_.slice_mgr_map_.size()));
|
||||
|
||||
@ -619,6 +619,7 @@ int ObDirectLoadSliceWriter::prepare_iters(
|
||||
const int64_t seq_no,
|
||||
const int64_t timeout_ts,
|
||||
const int64_t lob_inrow_threshold,
|
||||
const uint64_t src_tenant_id,
|
||||
ObLobMetaRowIterator *&row_iter)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
@ -652,8 +653,9 @@ int ObDirectLoadSliceWriter::prepare_iters(
|
||||
lob_storage_param.inrow_threshold_ = lob_inrow_threshold;
|
||||
int64_t unused_affected_rows = 0;
|
||||
if (OB_FAIL(ObInsertLobColumnHelper::insert_lob_column(
|
||||
allocator, nullptr, ls_id, tablet_id, lob_id, cs_type, lob_storage_param, datum, timeout_ts, true/*has_lob_header*/, *meta_write_iter_))) {
|
||||
LOG_WARN("fail to insert_lob_col", K(ret), K(ls_id), K(tablet_id), K(lob_id));
|
||||
allocator, nullptr, ls_id, tablet_id, lob_id, cs_type, lob_storage_param, datum,
|
||||
timeout_ts, true/*has_lob_header*/, src_tenant_id, *meta_write_iter_))) {
|
||||
LOG_WARN("fail to insert_lob_col", K(ret), K(ls_id), K(tablet_id), K(lob_id), K(src_tenant_id));
|
||||
} else if (OB_FAIL(row_iterator_->init(meta_write_iter_, trans_id,
|
||||
trans_version, seq_no))) {
|
||||
LOG_WARN("fail to lob meta row iterator", K(ret), K(trans_id), K(trans_version), K(seq_no));
|
||||
@ -711,7 +713,7 @@ int ObDirectLoadSliceWriter::fill_lob_into_memtable(
|
||||
lob_storage_param.inrow_threshold_ = lob_inrow_threshold;
|
||||
if (OB_FAIL(ObInsertLobColumnHelper::insert_lob_column(
|
||||
allocator, info.ls_id_, info.data_tablet_id_, col_types.at(i).get_collation_type(),
|
||||
lob_storage_param, datum, timeout_ts, true/*has_lob_header*/, MTL_ID()))) {
|
||||
lob_storage_param, datum, timeout_ts, true/*has_lob_header*/, info.src_tenant_id_))) {
|
||||
LOG_WARN("fail to insert_lob_col", K(ret), K(datum));
|
||||
}
|
||||
}
|
||||
@ -748,7 +750,7 @@ int ObDirectLoadSliceWriter::fill_lob_into_macro_block(
|
||||
ObLobMetaRowIterator *row_iter = nullptr;
|
||||
if (OB_FAIL(prepare_iters(allocator, iter_allocator, datum, info.ls_id_,
|
||||
info.data_tablet_id_, info.trans_version_, col_types.at(i).get_collation_type(), lob_id,
|
||||
info.trans_id_, info.seq_no_, timeout_ts, lob_inrow_threshold, row_iter))) {
|
||||
info.trans_id_, info.seq_no_, timeout_ts, lob_inrow_threshold, info.src_tenant_id_, row_iter))) {
|
||||
LOG_WARN("fail to prepare iters", K(ret), KP(row_iter), K(datum));
|
||||
} else {
|
||||
while (OB_SUCC(ret)) {
|
||||
|
||||
@ -52,20 +52,23 @@ public:
|
||||
trans_version_(0),
|
||||
direct_load_type_(),
|
||||
trans_id_(),
|
||||
seq_no_(0)
|
||||
seq_no_(0),
|
||||
src_tenant_id_(0)
|
||||
{ }
|
||||
ObBatchSliceWriteInfo(const common::ObTabletID &tablet_id, const share::ObLSID &ls_id, const int64_t &trans_version,
|
||||
const ObDirectLoadType &direct_load_type, const transaction::ObTransID &trans_id, const int64_t &seq_no)
|
||||
const ObDirectLoadType &direct_load_type, const transaction::ObTransID &trans_id, const int64_t &seq_no,
|
||||
const uint64_t src_tenant_id)
|
||||
: data_tablet_id_(tablet_id),
|
||||
ls_id_(ls_id),
|
||||
trans_version_(trans_version),
|
||||
direct_load_type_(direct_load_type),
|
||||
trans_id_(trans_id),
|
||||
seq_no_(seq_no)
|
||||
seq_no_(seq_no),
|
||||
src_tenant_id_(src_tenant_id)
|
||||
|
||||
{ }
|
||||
~ObBatchSliceWriteInfo() = default;
|
||||
TO_STRING_KV(K(ls_id_), K(data_tablet_id_), K(trans_version_), K(direct_load_type_));
|
||||
TO_STRING_KV(K(ls_id_), K(data_tablet_id_), K(trans_version_), K(direct_load_type_), K(src_tenant_id_));
|
||||
public:
|
||||
common::ObTabletID data_tablet_id_;
|
||||
share::ObLSID ls_id_;
|
||||
@ -73,6 +76,7 @@ public:
|
||||
ObDirectLoadType direct_load_type_;
|
||||
transaction::ObTransID trans_id_;
|
||||
int64_t seq_no_; //
|
||||
uint64_t src_tenant_id_;
|
||||
};
|
||||
|
||||
struct ObTabletDirectLoadMgrKey final
|
||||
@ -103,11 +107,11 @@ struct ObDirectLoadSliceInfo final
|
||||
public:
|
||||
ObDirectLoadSliceInfo()
|
||||
: is_full_direct_load_(false), is_lob_slice_(false), ls_id_(), data_tablet_id_(), slice_id_(-1),
|
||||
context_id_(0)
|
||||
context_id_(0), src_tenant_id_(MTL_ID())
|
||||
{ }
|
||||
~ObDirectLoadSliceInfo() = default;
|
||||
bool is_valid() const { return ls_id_.is_valid() && data_tablet_id_.is_valid() && slice_id_ >= 0 && context_id_ >= 0; }
|
||||
TO_STRING_KV(K_(is_full_direct_load), K_(is_lob_slice), K_(ls_id), K_(data_tablet_id), K_(slice_id), K_(context_id));
|
||||
bool is_valid() const { return ls_id_.is_valid() && data_tablet_id_.is_valid() && slice_id_ >= 0 && context_id_ >= 0 && src_tenant_id_ > 0; }
|
||||
TO_STRING_KV(K_(is_full_direct_load), K_(is_lob_slice), K_(ls_id), K_(data_tablet_id), K_(slice_id), K_(context_id), K_(src_tenant_id));
|
||||
public:
|
||||
bool is_full_direct_load_;
|
||||
bool is_lob_slice_;
|
||||
@ -115,6 +119,7 @@ public:
|
||||
common::ObTabletID data_tablet_id_;
|
||||
int64_t slice_id_;
|
||||
int64_t context_id_;
|
||||
uint64_t src_tenant_id_;
|
||||
DISALLOW_COPY_AND_ASSIGN(ObDirectLoadSliceInfo);
|
||||
};
|
||||
|
||||
@ -527,6 +532,7 @@ private:
|
||||
const int64_t seq_no,
|
||||
const int64_t timeout_ts,
|
||||
const int64_t lob_inrow_threshold,
|
||||
const uint64_t src_tenant_id,
|
||||
ObLobMetaRowIterator *&row_iter);
|
||||
private:
|
||||
bool is_inited_;
|
||||
|
||||
@ -221,6 +221,7 @@ int ObInsertLobColumnHelper::insert_lob_column(ObIAllocator &allocator,
|
||||
blocksstable::ObStorageDatum &datum,
|
||||
const int64_t timeout_ts,
|
||||
const bool has_lob_header,
|
||||
const uint64_t src_tenant_id,
|
||||
ObLobMetaWriteIter &iter)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
@ -272,6 +273,7 @@ int ObInsertLobColumnHelper::insert_lob_column(ObIAllocator &allocator,
|
||||
lob_param.offset_ = 0;
|
||||
lob_param.spec_lob_id_ = lob_id;
|
||||
lob_param.inrow_threshold_ = lob_storage_param.inrow_threshold_;
|
||||
lob_param.src_tenant_id_ = src_tenant_id;
|
||||
if (!src.is_valid()) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("invalid src lob locator.", K(ret));
|
||||
|
||||
@ -249,6 +249,7 @@ public:
|
||||
blocksstable::ObStorageDatum &datum,
|
||||
const int64_t timeout_ts,
|
||||
const bool has_lob_header,
|
||||
const uint64_t src_tenant_id,
|
||||
ObLobMetaWriteIter &iter);
|
||||
};
|
||||
|
||||
|
||||
Reference in New Issue
Block a user