[FEAT MERGE] inc direct load phase2

Co-authored-by: suz-yang <suz.yang@foxmail.com>
Co-authored-by: fkuner <784819644@qq.com>
Co-authored-by: shadowao <aozeliu@qq.com>
This commit is contained in:
coolfishchen
2024-06-19 09:19:57 +00:00
committed by ob-robot
parent 2a4fd5983e
commit 1f2f6669a5
56 changed files with 2978 additions and 772 deletions

View File

@ -320,7 +320,7 @@ int ObDDLInsertRowIterator::switch_to_new_lob_slice()
ObLobMetaRowIterator::ObLobMetaRowIterator()
: is_inited_(false), iter_(nullptr), trans_id_(0), trans_version_(0), sql_no_(0),
tmp_row_(), lob_meta_write_result_()
tmp_row_(), lob_meta_write_result_(), direct_load_type_(DIRECT_LOAD_INVALID)
{
}
@ -332,7 +332,8 @@ ObLobMetaRowIterator::~ObLobMetaRowIterator()
int ObLobMetaRowIterator::init(ObLobMetaWriteIter *iter,
const transaction::ObTransID &trans_id,
const int64_t trans_version,
const int64_t sql_no)
const int64_t sql_no,
const ObDirectLoadType direct_load_type)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(is_inited_)) {
@ -348,6 +349,7 @@ int ObLobMetaRowIterator::init(ObLobMetaWriteIter *iter,
trans_id_ = trans_id;
trans_version_ = trans_version;
sql_no_ = sql_no;
direct_load_type_ = direct_load_type;
is_inited_ = true;
}
return ret;
@ -360,6 +362,7 @@ void ObLobMetaRowIterator::reset()
trans_id_.reset();
trans_version_ = 0;
sql_no_ = 0;
direct_load_type_ = DIRECT_LOAD_INVALID;
tmp_row_.reset();
}
@ -392,9 +395,10 @@ int ObLobMetaRowIterator::get_next_row(const blocksstable::ObDatumRow *&row)
LOG_WARN("transform failed", K(ret), K(lob_meta_write_result_.info_));
} else {
tmp_row_.storage_datums_[ObLobMetaUtil::SEQ_ID_COL_ID + 1].set_int(-trans_version_);
tmp_row_.storage_datums_[ObLobMetaUtil::SEQ_ID_COL_ID + 2].set_int(-sql_no_);
tmp_row_.storage_datums_[ObLobMetaUtil::SEQ_ID_COL_ID + 2].set_int(-get_seq_no());
tmp_row_.set_trans_id(trans_id_);
tmp_row_.row_flag_.set_flag(ObDmlFlag::DF_INSERT);
tmp_row_.mvcc_row_flag_.set_last_multi_version_row(true);
tmp_row_.mvcc_row_flag_.set_uncommitted_row(trans_id_.is_valid());
row = &tmp_row_;
}
@ -402,6 +406,11 @@ int ObLobMetaRowIterator::get_next_row(const blocksstable::ObDatumRow *&row)
return ret;
}
int64_t ObLobMetaRowIterator::get_seq_no() const
{
return is_incremental_direct_load(direct_load_type_) ? lob_meta_write_result_.seq_no_ : sql_no_;
}
ObTabletDDLParam::ObTabletDDLParam()
: direct_load_type_(ObDirectLoadType::DIRECT_LOAD_INVALID),
ls_id_(),
@ -840,6 +849,8 @@ int ObDirectLoadSliceWriter::prepare_iters(
const int64_t timeout_ts,
const int64_t lob_inrow_threshold,
const uint64_t src_tenant_id,
const ObDirectLoadType direct_load_type,
transaction::ObTxDesc* tx_desc,
ObLobMetaRowIterator *&row_iter)
{
int ret = OB_SUCCESS;
@ -851,11 +862,11 @@ int ObDirectLoadSliceWriter::prepare_iters(
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("alloc lob meta write iter failed", K(ret));
} else {
meta_write_iter_ = new (buf) ObLobMetaWriteIter(datum.get_string(), &iter_allocator, ObLobMetaUtil::LOB_OPER_PIECE_DATA_SIZE);
// keep allocator is same as insert_lob_column
meta_write_iter_ = new (buf) ObLobMetaWriteIter(&allocator, ObLobMetaUtil::LOB_OPER_PIECE_DATA_SIZE);
}
} else {
meta_write_iter_->set_data(datum.get_string());
}
if (OB_SUCC(ret)) {
if (OB_ISNULL(row_iterator_)) {
void *buf = nullptr;
@ -872,13 +883,17 @@ int ObDirectLoadSliceWriter::prepare_iters(
ObLobStorageParam lob_storage_param;
lob_storage_param.inrow_threshold_ = lob_inrow_threshold;
int64_t unused_affected_rows = 0;
if (OB_FAIL(ObInsertLobColumnHelper::insert_lob_column(
allocator, nullptr, ls_id, tablet_id, lob_id, obj_type, cs_type, lob_storage_param, datum,
if (is_incremental_direct_load(direct_load_type) && OB_ISNULL(tx_desc)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("tx_desc should not be null if is incremental_direct_load", K(ret), K(direct_load_type),
K(ls_id), K(tablet_id), K(lob_id), K(trans_version), K(seq_no), K(obj_type), K(cs_type), K(trans_id));
} else if (OB_FAIL(ObInsertLobColumnHelper::insert_lob_column(
allocator, tx_desc, ls_id, tablet_id, lob_id, obj_type, cs_type, lob_storage_param, datum,
timeout_ts, true/*has_lob_header*/, src_tenant_id, *meta_write_iter_))) {
LOG_WARN("fail to insert_lob_col", K(ret), K(ls_id), K(tablet_id), K(lob_id), K(src_tenant_id));
} else if (OB_FAIL(row_iterator_->init(meta_write_iter_, trans_id,
trans_version, seq_no))) {
LOG_WARN("fail to lob meta row iterator", K(ret), K(trans_id), K(trans_version), K(seq_no));
trans_version, seq_no, direct_load_type))) {
LOG_WARN("fail to lob meta row iterator", K(ret), K(trans_id), K(trans_version), K(seq_no), K(direct_load_type));
} else {
row_iter = row_iterator_;
}
@ -970,7 +985,7 @@ int ObDirectLoadSliceWriter::fill_lob_into_macro_block(
ObLobMetaRowIterator *row_iter = nullptr;
if (OB_FAIL(prepare_iters(allocator, iter_allocator, datum, info.ls_id_,
info.data_tablet_id_, info.trans_version_, col_types.at(i).get_type(), col_types.at(i).get_collation_type(), lob_id,
info.trans_id_, info.seq_no_, timeout_ts, lob_inrow_threshold, info.src_tenant_id_, row_iter))) {
info.trans_id_, info.seq_no_, timeout_ts, lob_inrow_threshold, info.src_tenant_id_, info.direct_load_type_, info.tx_desc_, row_iter))) {
LOG_WARN("fail to prepare iters", K(ret), KP(row_iter), K(datum));
} else {
while (OB_SUCC(ret)) {
@ -1018,6 +1033,59 @@ int ObDirectLoadSliceWriter::fill_lob_into_macro_block(
return ret;
}
int ObDirectLoadSliceWriter::fill_lob_meta_sstable_slice(
const share::SCN &start_scn,
const uint64_t table_id,
const ObTabletID &curr_tablet_id,
ObIStoreRowIterator *row_iter,
int64_t &affected_rows)
{
int ret = OB_SUCCESS;
affected_rows = 0;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("ObDirectLoadSliceWriter not init", KR(ret), KP(this));
} else {
const int64_t rowkey_column_count = ObLobMetaUtil::LOB_META_SCHEMA_ROWKEY_COL_CNT;
const int64_t column_count = ObLobMetaUtil::LOB_META_COLUMN_CNT + ObMultiVersionRowkeyHelpper::get_extra_rowkey_col_cnt();
while (OB_SUCC(ret)) {
const blocksstable::ObDatumRow *cur_row = nullptr;
if (OB_FAIL(THIS_WORKER.check_status())) {
LOG_WARN("check status failed", K(ret));
} else if (ATOMIC_LOAD(&is_canceled_)) {
ret = OB_CANCELED;
LOG_WARN("fil sstable task canceled", K(ret), K(is_canceled_));
} else if (OB_FAIL(row_iter->get_next_row(cur_row))) {
if (OB_ITER_END == ret) {
ret = OB_SUCCESS;
break;
} else {
LOG_WARN("get next row failed", K(ret));
}
} else if (OB_ISNULL(cur_row) || !cur_row->is_valid() || cur_row->get_column_count() != column_count) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid args", KR(ret), KPC(cur_row), K(column_count));
} else if (OB_FAIL(check_null(false/*is_index_table*/, rowkey_column_count, *cur_row))) {
LOG_WARN("fail to check null value in row", KR(ret), KPC(cur_row));
} else if (OB_FAIL(prepare_slice_store_if_need(rowkey_column_count,
false/*is_column_store*/,
1L/*unsued*/,
1L/*unused*/,
nullptr /*storage_schema*/,
start_scn))) {
LOG_WARN("prepare macro block writer failed", K(ret));
} else if (OB_FAIL(slice_store_->append_row(*cur_row))) {
LOG_WARN("macro block writer append row failed", K(ret), KPC(cur_row));
}
if (OB_SUCC(ret)) {
++affected_rows;
LOG_DEBUG("sstable insert op append row", K(affected_rows), KPC(cur_row));
}
}
}
return ret;
}
int ObDirectLoadSliceWriter::fill_sstable_slice(
const SCN &start_scn,
const uint64_t table_id,