From c4ea6912313560a70b3675382ba9a9c15d018ae9 Mon Sep 17 00:00:00 2001 From: AnimationFan <3067477338@qq.com> Date: Thu, 19 Dec 2024 10:48:35 +0000 Subject: [PATCH] fix bug, set ddl major sstable meta block start seq as slice_cnt * step size --- src/share/ob_ddl_common.cpp | 50 +++++++++++++++++++ src/share/ob_ddl_common.h | 2 + .../pdml/static/ob_px_sstable_insert_op.cpp | 30 ++++++++++- src/storage/ddl/ob_complement_data_task.cpp | 7 ++- src/storage/ddl/ob_complement_data_task.h | 5 +- .../ddl/ob_direct_insert_sstable_ctx_new.h | 1 + src/storage/ddl/ob_direct_load_mgr_agent.cpp | 2 + src/storage/ddl/ob_direct_load_struct.cpp | 15 ++++-- src/storage/ddl/ob_direct_load_struct.h | 9 ++-- .../ob_direct_load_insert_table_ctx.cpp | 2 + 10 files changed, 111 insertions(+), 12 deletions(-) diff --git a/src/share/ob_ddl_common.cpp b/src/share/ob_ddl_common.cpp index 10bd89138..63f2a8441 100644 --- a/src/share/ob_ddl_common.cpp +++ b/src/share/ob_ddl_common.cpp @@ -2528,6 +2528,56 @@ int64_t ObDDLUtil::get_default_ddl_tx_timeout() } +/* +* return the map between tablet id & slice cnt; +* note that pair <0, 0> may exist when result is not partition table +*/ + +int ObDDLUtil::get_task_tablet_slice_count(const int64_t tenant_id, const int64_t ddl_task_id, bool &is_partitioned_table, common::hash::ObHashMap &tablet_slice_cnt_map) +{ + int ret = OB_SUCCESS; + + bool use_idem_mode = false; + rootserver::ObDDLSliceInfo ddl_slice_info; + ObMySQLProxy *sql_proxy = GCTX.sql_proxy_; + ObArenaAllocator arena(ObMemAttr(tenant_id, "get_slice_info")); + bool is_use_idem_mode = false; + is_partitioned_table = true; + if (OB_ISNULL(sql_proxy)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("sql proxy is null", K(ret)); + } else if (OB_FAIL(rootserver::ObDDLTaskRecordOperator::get_schedule_info_for_update( + *sql_proxy, tenant_id, ddl_task_id, arena, ddl_slice_info, use_idem_mode))) { + LOG_WARN("fail to get schedule info", K(ret), K(tenant_id), K(ddl_task_id)); + } else { + for (int64_t i = 0; i < ddl_slice_info.part_ranges_.count() && OB_SUCC(ret); i++) { + int64_t tablet_slice_cnt = 0; + const ObPxTabletRange &cur_part_range = ddl_slice_info.part_ranges_.at(i); + const int64_t cur_tablet_id = cur_part_range.tablet_id_; + if (0 == cur_tablet_id && 1 == ddl_slice_info.part_ranges_.count()) { + is_partitioned_table = false; + } + + if (OB_FAIL(tablet_slice_cnt_map.get_refactored(cur_tablet_id, tablet_slice_cnt))) { + if (OB_HASH_NOT_EXIST == ret) { + ret = OB_SUCCESS; + if (OB_FAIL(tablet_slice_cnt_map.set_refactored(cur_tablet_id, 0))) { + LOG_WARN("failed to set refactor", K(ret)); + } + } else { + LOG_WARN("failed to get slice cnt", K(ret)); + } + } + + if (OB_FAIL(ret)) { + } else if (OB_FAIL(tablet_slice_cnt_map.set_refactored(cur_tablet_id, tablet_slice_cnt + cur_part_range.range_cut_.count(), 1 /* over write*/))) { + LOG_WARN("failed to set slice cnt", K(ret), K(tablet_slice_cnt), K( cur_part_range.range_cut_.count())); + } + } + } + return ret; +} + int ObDDLUtil::get_data_information( const uint64_t tenant_id, const uint64_t task_id, diff --git a/src/share/ob_ddl_common.h b/src/share/ob_ddl_common.h index 11728a11a..b82bfaaa5 100644 --- a/src/share/ob_ddl_common.h +++ b/src/share/ob_ddl_common.h @@ -762,6 +762,8 @@ public: static int64_t get_default_ddl_rpc_timeout(); static int64_t get_default_ddl_tx_timeout(); + static int get_task_tablet_slice_count(const int64_t tenant_id, const int64_t task_id, bool &is_partition_table, common::hash::ObHashMap &tablet_slice_count_map); + static int get_data_information( const uint64_t tenant_id, const uint64_t task_id, diff --git a/src/sql/engine/pdml/static/ob_px_sstable_insert_op.cpp b/src/sql/engine/pdml/static/ob_px_sstable_insert_op.cpp index 7455c9582..0e891b361 100644 --- a/src/sql/engine/pdml/static/ob_px_sstable_insert_op.cpp +++ b/src/sql/engine/pdml/static/ob_px_sstable_insert_op.cpp @@ -166,6 +166,7 @@ int ObPxMultiPartSSTableInsertOp::inner_get_next_row() ObSqlCtx *sql_ctx = NULL; int64_t notify_idx = 0; int64_t unused_row_scan_cnt = 0; + int64_t ddl_task_id = MY_SPEC.plan_->get_ddl_task_id(); ObInsertMonitor insert_monitor(unused_row_scan_cnt, op_monitor_info_.otherstat_2_value_, op_monitor_info_.otherstat_1_value_); #ifdef ERRSIM if (OB_SUCC(ret)) { @@ -228,6 +229,19 @@ int ObPxMultiPartSSTableInsertOp::inner_get_next_row() } else if (need_idempotent_autoinc_val && OB_FAIL(build_table_slice_info())) { LOG_WARN("fail to build table slice info", K(ret)); } + + /* get tablet slice info */ + bool is_partition_table = true; + common::hash::ObHashMap tablet_slice_cnt_map; + if (OB_FAIL(ret)) { + } else if (!is_shared_storage_dempotent_mode(ddl_ctrl.direct_load_type_)) { + } else if (OB_FAIL(tablet_slice_cnt_map.create(participants_.count(), "SliceInfoM", + ObModIds::OB_HASH_NODE, tenant_id))) { + LOG_WARN("fail to create hash table", K(ret)); + } else if (OB_FAIL(ObDDLUtil::get_task_tablet_slice_count(MTL_ID(), ddl_task_id, is_partition_table, tablet_slice_cnt_map))) { + LOG_WARN("failed to get tablet slice count", K(ret), K(ddl_task_id), K(MTL_ID())); + } + for (notify_idx = 0; OB_SUCC(ret) && notify_idx < participants_.count();) { clear_evaluated_flag(); bool is_current_slice_empty = false; @@ -247,7 +261,19 @@ int ObPxMultiPartSSTableInsertOp::inner_get_next_row() count_rows_finish_ && curr_tablet_idx_ < tablet_seq_caches_.count() && curr_tablet_idx_ >= 0 ? &tablet_seq_caches_.at(curr_tablet_idx_) : nullptr; ObDirectLoadMgrAgent ddl_agent; - if (OB_FAIL(ddl_agent.init(slice_info.context_id_, slice_info.ls_id_, slice_info.data_tablet_id_, ddl_ctrl.direct_load_type_))) { + + if (!is_shared_storage_dempotent_mode(ddl_ctrl.direct_load_type_)) { + } else if (MY_SPEC.regenerate_heap_table_pk_) { + slice_info.total_slice_cnt_ = ctx_.get_sqc_handler()->get_sqc_ctx().get_task_count(); + } else if (OB_FAIL(tablet_slice_cnt_map.get_refactored(is_partition_table ? slice_info.data_tablet_id_.id() : 0, slice_info.total_slice_cnt_))) { + LOG_WARN("failed to get tablet slice cnt", K(ret), K(is_partition_table), K(slice_info.data_tablet_id_)); + } else if (slice_info.total_slice_cnt_ < 0) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("slice cnt should not less than 0", K(ret), K(is_partition_table), K(slice_info.data_tablet_id_)); + } + + if (OB_FAIL(ret)) { + } else if (OB_FAIL(ddl_agent.init(slice_info.context_id_, slice_info.ls_id_, slice_info.data_tablet_id_, ddl_ctrl.direct_load_type_))) { LOG_WARN("init agent failed", K(ret), K(slice_info)); } else if (all_slices_empty || is_all_partition_finished_) { is_current_slice_empty = true; @@ -291,7 +317,7 @@ int ObPxMultiPartSSTableInsertOp::inner_get_next_row() ObDDLInsertRowIterator row_iter; if (OB_FAIL(row_iter.init(tenant_id, ddl_agent, &slice_row_iter, notify_ls_id, notify_tablet_id, slice_info.context_id_, tablet_slice_param, - table_schema->get_lob_columns_count(), is_vec_data_complement_))) { + table_schema->get_lob_columns_count(), slice_info.total_slice_cnt_, is_vec_data_complement_))) { LOG_WARN("init ddl insert rot iterator failed", K(ret)); } else if (OB_FAIL(ddl_agent.fill_sstable_slice(slice_info, &row_iter, affected_rows, &insert_monitor))) { diff --git a/src/storage/ddl/ob_complement_data_task.cpp b/src/storage/ddl/ob_complement_data_task.cpp index 2acb787df..ad84034eb 100644 --- a/src/storage/ddl/ob_complement_data_task.cpp +++ b/src/storage/ddl/ob_complement_data_task.cpp @@ -458,6 +458,8 @@ int ObComplementDataContext::init( direct_load_param.runtime_only_param_.table_id_ = param.dest_table_id_; direct_load_param.runtime_only_param_.schema_version_ = param.dest_schema_version_; direct_load_param.runtime_only_param_.task_cnt_ = param.concurrent_cnt_; // real slice count. + total_slice_cnt_ = param.ranges_.count(); + if (OB_ISNULL(tenant_direct_load_mgr)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("unexpected err", K(ret)); @@ -1339,12 +1341,13 @@ int ObComplementWriteTask::append_row(ObScan *scan) slice_info.ls_id_ = param_->dest_ls_id_; slice_info.data_tablet_id_ = param_->dest_tablet_id_; slice_info.context_id_ = context_->context_id_; + slice_info.total_slice_cnt_ = context_->total_slice_cnt_; ObInsertMonitor insert_monitor(context_->row_scanned_, context_->row_inserted_, context_->cg_row_inserted_); ObDDLInsertRowIterator row_iter; ObTabletSliceParam tablet_slice_param(context_->concurrent_cnt_, task_id_); if (OB_FAIL(row_iter.init(param_->orig_tenant_id_, context_->ddl_agent_, scan, - param_->dest_ls_id_, param_->dest_tablet_id_, context_->context_id_, tablet_slice_param, context_->lob_cols_cnt_))) { - LOG_WARN("init ddl insert row iterator failed", K(ret)); + param_->dest_ls_id_, param_->dest_tablet_id_, context_->context_id_, tablet_slice_param, context_->lob_cols_cnt_, context_->total_slice_cnt_))) { + LOG_WARN("init ddl insert row iterator failed", K(ret), K(context_->total_slice_cnt_)); } else if (OB_FAIL(context_->ddl_agent_.open_sstable_slice(macro_start_seq, slice_info))) { LOG_WARN("open slice failed", K(ret), K(macro_start_seq), K(slice_info)); } else if (OB_FAIL(context_->ddl_agent_.fill_sstable_slice(slice_info, &row_iter, affected_rows, &insert_monitor))) { diff --git a/src/storage/ddl/ob_complement_data_task.h b/src/storage/ddl/ob_complement_data_task.h index a17b5f6b3..39147e562 100644 --- a/src/storage/ddl/ob_complement_data_task.h +++ b/src/storage/ddl/ob_complement_data_task.h @@ -166,7 +166,7 @@ public: is_inited_(false), ddl_agent_(), direct_load_type_(ObDirectLoadType::DIRECT_LOAD_INVALID), is_major_sstable_exist_(false), complement_data_ret_(common::OB_SUCCESS), lock_(ObLatchIds::COMPLEMENT_DATA_CONTEXT_LOCK), concurrent_cnt_(0), - physical_row_count_(0), row_scanned_(0), row_inserted_(0), cg_row_inserted_(0), context_id_(0), lob_cols_cnt_(0) + physical_row_count_(0), row_scanned_(0), row_inserted_(0), cg_row_inserted_(0), context_id_(0), lob_cols_cnt_(0), total_slice_cnt_(-1) {} ~ObComplementDataContext() { destroy(); } int init( @@ -177,7 +177,7 @@ public: int add_column_checksum(const ObIArray &report_col_checksums, const ObIArray &report_col_ids); int get_column_checksum(ObIArray &report_col_checksums, ObIArray &report_col_ids); TO_STRING_KV(K_(is_inited), K_(ddl_agent), K_(direct_load_type), K_(is_major_sstable_exist), K_(complement_data_ret), K_(concurrent_cnt), - K_(physical_row_count), K_(row_scanned), K_(row_inserted), K_(cg_row_inserted), K_(context_id), K_(lob_cols_cnt)); + K_(physical_row_count), K_(row_scanned), K_(row_inserted), K_(cg_row_inserted), K_(context_id), K_(lob_cols_cnt), K_(total_slice_cnt)); public: bool is_inited_; ObDirectLoadMgrAgent ddl_agent_; @@ -194,6 +194,7 @@ public: int64_t lob_cols_cnt_; ObArray report_col_checksums_; ObArray report_col_ids_; + int64_t total_slice_cnt_; }; class ObComplementPrepareTask; diff --git a/src/storage/ddl/ob_direct_insert_sstable_ctx_new.h b/src/storage/ddl/ob_direct_insert_sstable_ctx_new.h index 7c04bd435..3f09042c2 100644 --- a/src/storage/ddl/ob_direct_insert_sstable_ctx_new.h +++ b/src/storage/ddl/ob_direct_insert_sstable_ctx_new.h @@ -372,6 +372,7 @@ public: blocksstable::ObMacroDataSeq &next_seq); virtual int update_max_lob_id(const int64_t lob_id) { UNUSED(lob_id); return common::OB_SUCCESS; } + virtual int set_total_slice_cnt(const int64_t slice_cnt) { UNUSED(slice_cnt); return OB_NOT_SUPPORTED;} // for ref_cnt void inc_ref() { ATOMIC_INC(&ref_cnt_); } diff --git a/src/storage/ddl/ob_direct_load_mgr_agent.cpp b/src/storage/ddl/ob_direct_load_mgr_agent.cpp index d1748fc49..3bfb57735 100644 --- a/src/storage/ddl/ob_direct_load_mgr_agent.cpp +++ b/src/storage/ddl/ob_direct_load_mgr_agent.cpp @@ -181,6 +181,8 @@ int ObDirectLoadMgrAgent::open_sstable_slice_for_ss( LOG_WARN("error sys", K(ret)); } else if (OB_FAIL(mgr_handle_.get_obj()->open_sstable_slice(slice_info.is_lob_slice_, start_seq, slice_info.context_id_, slice_info.slice_id_))) { LOG_WARN("open sstable slice failed", K(ret), K(start_seq), K(slice_info)); + } else if (OB_FAIL(mgr_handle_.get_obj()->set_total_slice_cnt(slice_info.total_slice_cnt_))) { + LOG_WARN("failed to set total slice cnt for direct load mgr", K(ret)); } return ret; } diff --git a/src/storage/ddl/ob_direct_load_struct.cpp b/src/storage/ddl/ob_direct_load_struct.cpp index cdc89797b..2f584fefc 100755 --- a/src/storage/ddl/ob_direct_load_struct.cpp +++ b/src/storage/ddl/ob_direct_load_struct.cpp @@ -182,7 +182,8 @@ ObDDLInsertRowIterator::ObDDLInsertRowIterator() lob_allocator_(ObModIds::OB_LOB_ACCESS_BUFFER, OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID()), lob_slice_id_(0), lob_cols_cnt_(0), - is_skip_lob_(false) + is_skip_lob_(false), + total_slice_cnt_(-1) { lob_id_cache_.set(1/*start*/, 0/*end*/); } @@ -200,6 +201,7 @@ int ObDDLInsertRowIterator::init( const int64_t context_id, const ObTabletSliceParam &tablet_slice_param, const int64_t lob_cols_cnt, + const int64_t total_slice_cnt, const bool is_skip_lob) { int ret = OB_SUCCESS; @@ -212,9 +214,10 @@ int ObDDLInsertRowIterator::init( || !tablet_id.is_valid() || context_id < 0 // no need check tablet slice param, invalid when slice empty - || lob_cols_cnt < 0)) { + || lob_cols_cnt < 0 + || (is_shared_storage_dempotent_mode(agent.get_direct_load_type()) && total_slice_cnt < 0))) { ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid argument", K(ret), K(source_tenant_id), KP(slice_row_iter), K(ls_id), K(tablet_id), K(context_id), K(tablet_slice_param), K(lob_cols_cnt)); + LOG_WARN("invalid argument", K(ret), K(source_tenant_id), KP(slice_row_iter), K(ls_id), K(tablet_id), K(context_id), K(tablet_slice_param), K(lob_cols_cnt), K(total_slice_cnt)); } else if (lob_cols_cnt > 0 && tablet_slice_param.is_valid() && OB_FAIL(lob_id_generator_.init(tablet_slice_param.slice_idx_ * ObTabletSliceParam::LOB_ID_SEQ_INTERVAL, // start ObTabletSliceParam::LOB_ID_SEQ_INTERVAL, // interval @@ -230,6 +233,9 @@ int ObDDLInsertRowIterator::init( const int64_t parallel_idx = tablet_slice_param.slice_idx_ >= 0 ? tablet_slice_param.slice_idx_ : 0; is_skip_lob_ = is_skip_lob; is_inited_ = true; + if ((is_shared_storage_dempotent_mode(ddl_agent_->get_direct_load_type()))) { + total_slice_cnt_ = total_slice_cnt; + } if (OB_FAIL(macro_seq_.set_parallel_degree(parallel_idx))) { LOG_WARN("set failed", K(ret), K(parallel_idx)); #ifdef OB_BUILD_SHARED_STORAGE @@ -343,6 +349,9 @@ int ObDDLInsertRowIterator::switch_to_new_lob_slice() slice_info.data_tablet_id_ = current_tablet_id_; slice_info.slice_id_ = lob_slice_id_; slice_info.context_id_ = context_id_; + if (is_shared_storage_dempotent_mode(ddl_agent_->get_direct_load_type())) { + slice_info.total_slice_cnt_ = total_slice_cnt_; + } ObTabletAutoincrementService &auto_inc = ObTabletAutoincrementService::get_instance(); ObTabletID lob_meta_tablet_id; int64_t CACHE_SIZE_REQUESTED = AUTO_INC_CACHE_SIZE; diff --git a/src/storage/ddl/ob_direct_load_struct.h b/src/storage/ddl/ob_direct_load_struct.h index 26c6cf456..a62713479 100755 --- a/src/storage/ddl/ob_direct_load_struct.h +++ b/src/storage/ddl/ob_direct_load_struct.h @@ -144,11 +144,11 @@ struct ObDirectLoadSliceInfo final public: ObDirectLoadSliceInfo() : is_full_direct_load_(false), is_lob_slice_(false), ls_id_(), data_tablet_id_(), slice_id_(-1), - context_id_(0), src_tenant_id_(MTL_ID()), is_task_finish_(false) + context_id_(0), src_tenant_id_(MTL_ID()), is_task_finish_(false), total_slice_cnt_(-1) { } ~ObDirectLoadSliceInfo() = default; bool is_valid() const { return ls_id_.is_valid() && data_tablet_id_.is_valid() && slice_id_ >= 0 && context_id_ >= 0 && src_tenant_id_ > 0; } - TO_STRING_KV(K_(is_full_direct_load), K_(is_lob_slice), K_(ls_id), K_(data_tablet_id), K_(slice_id), K_(context_id), K_(src_tenant_id), K_(is_task_finish)); + TO_STRING_KV(K_(is_full_direct_load), K_(is_lob_slice), K_(ls_id), K_(data_tablet_id), K_(slice_id), K_(context_id), K_(src_tenant_id), K_(is_task_finish), K_(total_slice_cnt)); public: bool is_full_direct_load_; bool is_lob_slice_; @@ -158,6 +158,7 @@ public: int64_t context_id_; uint64_t src_tenant_id_; bool is_task_finish_; + int64_t total_slice_cnt_; DISALLOW_COPY_AND_ASSIGN(ObDirectLoadSliceInfo); }; @@ -383,6 +384,7 @@ public: const int64_t context_id, const ObTabletSliceParam &tablet_slice_param, const int64_t lob_cols_cnt, + const int64_t total_slice_cnt, const bool is_skip_lob = false); virtual int get_next_row(const blocksstable::ObDatumRow *&row) override { @@ -391,7 +393,7 @@ public: } int get_next_row(const bool skip_lob, const blocksstable::ObDatumRow *&row) override; TO_STRING_KV(K_(is_inited), K_(ls_id), K_(current_tablet_id), K_(context_id), K_(macro_seq), - K_(lob_id_generator), K_(lob_id_cache), K_(lob_slice_id), K_(lob_cols_cnt), K_(is_skip_lob)); + K_(lob_id_generator), K_(lob_id_cache), K_(lob_slice_id), K_(lob_cols_cnt), K_(is_skip_lob), K_(total_slice_cnt)); public: int switch_to_new_lob_slice(); int close_lob_sstable_slice(); @@ -413,6 +415,7 @@ private: int64_t lob_slice_id_; int64_t lob_cols_cnt_; bool is_skip_lob_; + int64_t total_slice_cnt_; }; class ObLobMetaRowIterator : public ObIStoreRowIterator diff --git a/src/storage/direct_load/ob_direct_load_insert_table_ctx.cpp b/src/storage/direct_load/ob_direct_load_insert_table_ctx.cpp index 317f885f8..3efe895bc 100644 --- a/src/storage/direct_load/ob_direct_load_insert_table_ctx.cpp +++ b/src/storage/direct_load/ob_direct_load_insert_table_ctx.cpp @@ -658,6 +658,7 @@ int ObDirectLoadInsertTabletContext::open_sstable_slice(const ObMacroDataSeq &st slice_info.data_tablet_id_ = tablet_id_; slice_info.slice_id_ = slice_id; slice_info.context_id_ = context_id_; + slice_info.total_slice_cnt_ = param_->parallel_; //mock total slice cnt if (OB_FAIL(open())) { LOG_WARN("fail to open tablet direct load", KR(ret)); } else if (OB_FAIL(ddl_agent_.open_sstable_slice(start_seq, slice_info))) { @@ -687,6 +688,7 @@ int ObDirectLoadInsertTabletContext::open_lob_sstable_slice(const ObMacroDataSeq slice_info.data_tablet_id_ = tablet_id_; slice_info.slice_id_ = slice_id; slice_info.context_id_ = context_id_; + slice_info.total_slice_cnt_ = param_->parallel_; //mock total slice cnt if (OB_FAIL(open())) { LOG_WARN("fail to open tablet direct load", KR(ret)); } else if (OB_FAIL(ddl_agent_.open_sstable_slice(start_seq, slice_info))) {