fix parallel_idx oversize when switch lob slice
This commit is contained in:
@ -212,7 +212,7 @@ int ObPxMultiPartSSTableInsertOp::inner_get_next_row()
|
|||||||
LOG_WARN("create sstable slice writer failed", K(ret), K(block_start_seq), K(slice_info));
|
LOG_WARN("create sstable slice writer failed", K(ret), K(block_start_seq), K(slice_info));
|
||||||
} else {
|
} else {
|
||||||
ObDDLInsertRowIterator row_iter(this, is_current_slice_empty /*is_slice_empty*/,
|
ObDDLInsertRowIterator row_iter(this, is_current_slice_empty /*is_slice_empty*/,
|
||||||
notify_ls_id, notify_tablet_id, table_schema->get_rowkey_column_num(), snapshot_version_, slice_info.context_id_);
|
notify_ls_id, notify_tablet_id, table_schema->get_rowkey_column_num(), snapshot_version_, slice_info.context_id_, parallel_idx);
|
||||||
if (OB_FAIL(tenant_direct_load_mgr->fill_sstable_slice(slice_info,
|
if (OB_FAIL(tenant_direct_load_mgr->fill_sstable_slice(slice_info,
|
||||||
&row_iter,
|
&row_iter,
|
||||||
affected_rows,
|
affected_rows,
|
||||||
|
|||||||
@ -1274,7 +1274,7 @@ int ObComplementWriteTask::append_row(ObScan *scan)
|
|||||||
ObTabletDirectLoadMgrHandle direct_load_hdl;
|
ObTabletDirectLoadMgrHandle direct_load_hdl;
|
||||||
bool is_major_sstable_exist = false;
|
bool is_major_sstable_exist = false;
|
||||||
ObDDLInsertRowIterator row_iter(nullptr/*ObPxMultiPartSSTableInsertOp*/, false/*is_slice_empty*/,
|
ObDDLInsertRowIterator row_iter(nullptr/*ObPxMultiPartSSTableInsertOp*/, false/*is_slice_empty*/,
|
||||||
param_->dest_ls_id_, param_->dest_tablet_id_, 0/*unused_rowkey_num*/, param_->snapshot_version_, context_->context_id_);
|
param_->dest_ls_id_, param_->dest_tablet_id_, 0/*unused_rowkey_num*/, param_->snapshot_version_, context_->context_id_, task_id_);
|
||||||
blocksstable::ObNewRowBuilder new_row_builder;
|
blocksstable::ObNewRowBuilder new_row_builder;
|
||||||
int64_t lob_inrow_threshold = OB_DEFAULT_LOB_INROW_THRESHOLD;
|
int64_t lob_inrow_threshold = OB_DEFAULT_LOB_INROW_THRESHOLD;
|
||||||
if (OB_ISNULL(tenant_direct_load_mgr)) {
|
if (OB_ISNULL(tenant_direct_load_mgr)) {
|
||||||
|
|||||||
@ -51,9 +51,9 @@ int ObTabletDirectLoadInsertParam::assign(const ObTabletDirectLoadInsertParam &o
|
|||||||
ObDDLInsertRowIterator::ObDDLInsertRowIterator(
|
ObDDLInsertRowIterator::ObDDLInsertRowIterator(
|
||||||
sql::ObPxMultiPartSSTableInsertOp *op,
|
sql::ObPxMultiPartSSTableInsertOp *op,
|
||||||
const bool is_slice_empty, const share::ObLSID &ls_id, const common::ObTabletID &tablet_id,
|
const bool is_slice_empty, const share::ObLSID &ls_id, const common::ObTabletID &tablet_id,
|
||||||
const int64_t rowkey_cnt, const int64_t snapshot_version, const int64_t context_id)
|
const int64_t rowkey_cnt, const int64_t snapshot_version, const int64_t context_id, const int64_t parallel_idx)
|
||||||
: lob_allocator_(ObModIds::OB_LOB_ACCESS_BUFFER, OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID()), op_(op), ls_id_(ls_id), current_tablet_id_(tablet_id), current_row_(), is_next_row_cached_(true),
|
: lob_allocator_(ObModIds::OB_LOB_ACCESS_BUFFER, OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID()), op_(op), ls_id_(ls_id), current_tablet_id_(tablet_id), current_row_(), is_next_row_cached_(true),
|
||||||
is_slice_empty_(is_slice_empty), rowkey_count_(rowkey_cnt), snapshot_version_(snapshot_version), lob_slice_id_(0), context_id_(context_id)
|
is_slice_empty_(is_slice_empty), rowkey_count_(rowkey_cnt), snapshot_version_(snapshot_version), lob_slice_id_(0), context_id_(context_id), parallel_idx_(parallel_idx)
|
||||||
{
|
{
|
||||||
lob_id_cache_.set(1/*start*/, 0/*end*/);
|
lob_id_cache_.set(1/*start*/, 0/*end*/);
|
||||||
}
|
}
|
||||||
@ -246,10 +246,8 @@ int ObDDLInsertRowIterator::switch_to_new_lob_slice()
|
|||||||
} else if (OB_UNLIKELY(AUTO_INC_CACHE_SIZE > lob_id_cache_.count())) {
|
} else if (OB_UNLIKELY(AUTO_INC_CACHE_SIZE > lob_id_cache_.count())) {
|
||||||
ret = OB_ERR_UNEXPECTED;
|
ret = OB_ERR_UNEXPECTED;
|
||||||
LOG_WARN("unexpected autoincrement value count", K(ret), K(lob_id_cache_));
|
LOG_WARN("unexpected autoincrement value count", K(ret), K(lob_id_cache_));
|
||||||
} else if (OB_FAIL(lob_id_cache_.get_value(lob_id))) {
|
} else if (OB_FAIL(block_start_seq.set_parallel_degree(parallel_idx_))) {
|
||||||
LOG_WARN("get value failed", K(ret), K(lob_id));
|
LOG_WARN("set parall degree failed", K(ret), K(parallel_idx_));
|
||||||
} else if (OB_FAIL(block_start_seq.set_parallel_degree(lob_id / AUTO_INC_CACHE_SIZE))) {
|
|
||||||
LOG_WARN("set parall degree failed", K(ret), K(lob_id));
|
|
||||||
} else {
|
} else {
|
||||||
// new slice info to open.
|
// new slice info to open.
|
||||||
slice_info.slice_id_ = 0;
|
slice_info.slice_id_ = 0;
|
||||||
|
|||||||
@ -265,7 +265,8 @@ public:
|
|||||||
const common::ObTabletID &tablet_id,
|
const common::ObTabletID &tablet_id,
|
||||||
const int64_t rowkey_cnt,
|
const int64_t rowkey_cnt,
|
||||||
const int64_t snapshot_version,
|
const int64_t snapshot_version,
|
||||||
const int64_t context_id);
|
const int64_t context_id,
|
||||||
|
const int64_t parallel_idx);
|
||||||
virtual ~ObDDLInsertRowIterator();
|
virtual ~ObDDLInsertRowIterator();
|
||||||
virtual int get_next_row(const blocksstable::ObDatumRow *&row) override;
|
virtual int get_next_row(const blocksstable::ObDatumRow *&row) override;
|
||||||
TO_STRING_KV(K_(ls_id), K_(current_tablet_id), K_(current_row), K_(is_slice_empty), K_(is_next_row_cached), K_(rowkey_count), K_(snapshot_version),
|
TO_STRING_KV(K_(ls_id), K_(current_tablet_id), K_(current_row), K_(is_slice_empty), K_(is_next_row_cached), K_(rowkey_count), K_(snapshot_version),
|
||||||
@ -289,6 +290,7 @@ private:
|
|||||||
int64_t lob_slice_id_;
|
int64_t lob_slice_id_;
|
||||||
share::ObTabletCacheInterval lob_id_cache_;
|
share::ObTabletCacheInterval lob_id_cache_;
|
||||||
int64_t context_id_;
|
int64_t context_id_;
|
||||||
|
int64_t parallel_idx_;
|
||||||
};
|
};
|
||||||
|
|
||||||
class ObLobMetaRowIterator : public ObIStoreRowIterator
|
class ObLobMetaRowIterator : public ObIStoreRowIterator
|
||||||
|
|||||||
Reference in New Issue
Block a user