diff --git a/src/sql/engine/pdml/static/ob_px_sstable_insert_op.cpp b/src/sql/engine/pdml/static/ob_px_sstable_insert_op.cpp index 1304aa1d49..a9188ef20a 100644 --- a/src/sql/engine/pdml/static/ob_px_sstable_insert_op.cpp +++ b/src/sql/engine/pdml/static/ob_px_sstable_insert_op.cpp @@ -212,7 +212,7 @@ int ObPxMultiPartSSTableInsertOp::inner_get_next_row() LOG_WARN("create sstable slice writer failed", K(ret), K(block_start_seq), K(slice_info)); } else { ObDDLInsertRowIterator row_iter(this, is_current_slice_empty /*is_slice_empty*/, - notify_ls_id, notify_tablet_id, table_schema->get_rowkey_column_num(), snapshot_version_, slice_info.context_id_); + notify_ls_id, notify_tablet_id, table_schema->get_rowkey_column_num(), snapshot_version_, slice_info.context_id_, parallel_idx); if (OB_FAIL(tenant_direct_load_mgr->fill_sstable_slice(slice_info, &row_iter, affected_rows, diff --git a/src/storage/ddl/ob_complement_data_task.cpp b/src/storage/ddl/ob_complement_data_task.cpp index 76dcad66d6..07ccaee10b 100644 --- a/src/storage/ddl/ob_complement_data_task.cpp +++ b/src/storage/ddl/ob_complement_data_task.cpp @@ -1274,7 +1274,7 @@ int ObComplementWriteTask::append_row(ObScan *scan) ObTabletDirectLoadMgrHandle direct_load_hdl; bool is_major_sstable_exist = false; ObDDLInsertRowIterator row_iter(nullptr/*ObPxMultiPartSSTableInsertOp*/, false/*is_slice_empty*/, - param_->dest_ls_id_, param_->dest_tablet_id_, 0/*unused_rowkey_num*/, param_->snapshot_version_, context_->context_id_); + param_->dest_ls_id_, param_->dest_tablet_id_, 0/*unused_rowkey_num*/, param_->snapshot_version_, context_->context_id_, task_id_); blocksstable::ObNewRowBuilder new_row_builder; int64_t lob_inrow_threshold = OB_DEFAULT_LOB_INROW_THRESHOLD; if (OB_ISNULL(tenant_direct_load_mgr)) { diff --git a/src/storage/ddl/ob_direct_load_struct.cpp b/src/storage/ddl/ob_direct_load_struct.cpp index f4ee15a583..0ef25d5c30 100644 --- a/src/storage/ddl/ob_direct_load_struct.cpp +++ b/src/storage/ddl/ob_direct_load_struct.cpp @@ -51,9 +51,9 @@ int ObTabletDirectLoadInsertParam::assign(const ObTabletDirectLoadInsertParam &o ObDDLInsertRowIterator::ObDDLInsertRowIterator( sql::ObPxMultiPartSSTableInsertOp *op, const bool is_slice_empty, const share::ObLSID &ls_id, const common::ObTabletID &tablet_id, - const int64_t rowkey_cnt, const int64_t snapshot_version, const int64_t context_id) + const int64_t rowkey_cnt, const int64_t snapshot_version, const int64_t context_id, const int64_t parallel_idx) : lob_allocator_(ObModIds::OB_LOB_ACCESS_BUFFER, OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID()), op_(op), ls_id_(ls_id), current_tablet_id_(tablet_id), current_row_(), is_next_row_cached_(true), - is_slice_empty_(is_slice_empty), rowkey_count_(rowkey_cnt), snapshot_version_(snapshot_version), lob_slice_id_(0), context_id_(context_id) + is_slice_empty_(is_slice_empty), rowkey_count_(rowkey_cnt), snapshot_version_(snapshot_version), lob_slice_id_(0), context_id_(context_id), parallel_idx_(parallel_idx) { lob_id_cache_.set(1/*start*/, 0/*end*/); } @@ -246,10 +246,8 @@ int ObDDLInsertRowIterator::switch_to_new_lob_slice() } else if (OB_UNLIKELY(AUTO_INC_CACHE_SIZE > lob_id_cache_.count())) { ret = OB_ERR_UNEXPECTED; LOG_WARN("unexpected autoincrement value count", K(ret), K(lob_id_cache_)); - } else if (OB_FAIL(lob_id_cache_.get_value(lob_id))) { - LOG_WARN("get value failed", K(ret), K(lob_id)); - } else if (OB_FAIL(block_start_seq.set_parallel_degree(lob_id / AUTO_INC_CACHE_SIZE))) { - LOG_WARN("set parall degree failed", K(ret), K(lob_id)); + } else if (OB_FAIL(block_start_seq.set_parallel_degree(parallel_idx_))) { + LOG_WARN("set parall degree failed", K(ret), K(parallel_idx_)); } else { // new slice info to open. slice_info.slice_id_ = 0; diff --git a/src/storage/ddl/ob_direct_load_struct.h b/src/storage/ddl/ob_direct_load_struct.h index cba5d25c54..17e9bcaeda 100644 --- a/src/storage/ddl/ob_direct_load_struct.h +++ b/src/storage/ddl/ob_direct_load_struct.h @@ -265,7 +265,8 @@ public: const common::ObTabletID &tablet_id, const int64_t rowkey_cnt, const int64_t snapshot_version, - const int64_t context_id); + const int64_t context_id, + const int64_t parallel_idx); virtual ~ObDDLInsertRowIterator(); virtual int get_next_row(const blocksstable::ObDatumRow *&row) override; TO_STRING_KV(K_(ls_id), K_(current_tablet_id), K_(current_row), K_(is_slice_empty), K_(is_next_row_cached), K_(rowkey_count), K_(snapshot_version), @@ -289,6 +290,7 @@ private: int64_t lob_slice_id_; share::ObTabletCacheInterval lob_id_cache_; int64_t context_id_; + int64_t parallel_idx_; }; class ObLobMetaRowIterator : public ObIStoreRowIterator