diff --git a/src/observer/table_load/ob_table_load_coordinator_ctx.cpp b/src/observer/table_load/ob_table_load_coordinator_ctx.cpp index 6db9c84cae..8d1b9025e2 100644 --- a/src/observer/table_load/ob_table_load_coordinator_ctx.cpp +++ b/src/observer/table_load/ob_table_load_coordinator_ctx.cpp @@ -249,7 +249,7 @@ int ObTableLoadCoordinatorCtx::advance_status(ObTableLoadStatusType status) // advance status else { status_ = status; - table_load_status_to_string(status_, ctx_->job_stat_->coordinator.status_); + table_load_status_to_string(status_, ctx_->job_stat_->coordinator_.status_); LOG_INFO("LOAD DATA COORDINATOR advance status", K(status)); } } @@ -269,7 +269,7 @@ int ObTableLoadCoordinatorCtx::set_status_error(int error_code) } else { status_ = ObTableLoadStatusType::ERROR; error_code_ = error_code; - table_load_status_to_string(status_, ctx_->job_stat_->coordinator.status_); + table_load_status_to_string(status_, ctx_->job_stat_->coordinator_.status_); LOG_INFO("LOAD DATA COORDINATOR status error", KR(error_code)); } } @@ -284,7 +284,7 @@ int ObTableLoadCoordinatorCtx::set_status_abort() LOG_INFO("LOAD DATA COORDINATOR already abort"); } else { status_ = ObTableLoadStatusType::ABORT; - table_load_status_to_string(status_, ctx_->job_stat_->coordinator.status_); + table_load_status_to_string(status_, ctx_->job_stat_->coordinator_.status_); LOG_INFO("LOAD DATA COORDINATOR status abort"); } return ret; diff --git a/src/observer/table_load/ob_table_load_coordinator_trans.cpp b/src/observer/table_load/ob_table_load_coordinator_trans.cpp index f95e9722e6..1a1e89fc6d 100644 --- a/src/observer/table_load/ob_table_load_coordinator_trans.cpp +++ b/src/observer/table_load/ob_table_load_coordinator_trans.cpp @@ -74,7 +74,7 @@ int ObTableLoadCoordinatorTrans::advance_trans_status(ObTableLoadTransStatusType LOG_WARN("fail to advance trans status", KR(ret), K(trans_status)); } else { table_load_trans_status_to_string(trans_status, - trans_ctx_->ctx_->job_stat_->coordinator.trans_status_); + trans_ctx_->ctx_->job_stat_->coordinator_.trans_status_); } return ret; } @@ -86,7 +86,7 @@ int ObTableLoadCoordinatorTrans::set_trans_status_error(int error_code) LOG_WARN("fail to set trans status error", KR(ret)); } else { table_load_trans_status_to_string(ObTableLoadTransStatusType::ERROR, - trans_ctx_->ctx_->job_stat_->coordinator.trans_status_); + trans_ctx_->ctx_->job_stat_->coordinator_.trans_status_); } return ret; } @@ -98,7 +98,7 @@ int ObTableLoadCoordinatorTrans::set_trans_status_abort() LOG_WARN("fail to set trans status abort", KR(ret)); } else { table_load_trans_status_to_string(ObTableLoadTransStatusType::ABORT, - trans_ctx_->ctx_->job_stat_->coordinator.trans_status_); + trans_ctx_->ctx_->job_stat_->coordinator_.trans_status_); } return ret; } diff --git a/src/observer/table_load/ob_table_load_mem_compactor.cpp b/src/observer/table_load/ob_table_load_mem_compactor.cpp index c740a5c3a3..a073655929 100644 --- a/src/observer/table_load/ob_table_load_mem_compactor.cpp +++ b/src/observer/table_load/ob_table_load_mem_compactor.cpp @@ -51,7 +51,7 @@ public: int process() override { int ret = OB_SUCCESS; - storage::ObDirectLoadMemSample sample(mem_ctx_); + storage::ObDirectLoadMemSample sample(ctx_, mem_ctx_); if (OB_FAIL(sample.do_sample())) { LOG_WARN("fail to do sample", KR(ret)); } @@ -378,7 +378,7 @@ int ObTableLoadMemCompactor::create_mem_loader(ObDirectLoadMemLoader *&mem_loade int ret = OB_SUCCESS; mem_loader = nullptr; if (OB_ISNULL(mem_loader = - OB_NEWx(ObDirectLoadMemLoader, (&allocator_), &mem_ctx_))) { + OB_NEWx(ObDirectLoadMemLoader, (&allocator_), store_ctx_->ctx_, &mem_ctx_))) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("fail to new ObDirectLoadMemLoader", KR(ret)); } diff --git a/src/observer/table_load/ob_table_load_merger.cpp b/src/observer/table_load/ob_table_load_merger.cpp index 384f016ae4..d40ccb24d7 100644 --- a/src/observer/table_load/ob_table_load_merger.cpp +++ b/src/observer/table_load/ob_table_load_merger.cpp @@ -286,7 +286,7 @@ int ObTableLoadMerger::build_merge_ctx() merge_param.px_mode_ = param_.px_mode_; merge_param.insert_table_ctx_ = store_ctx_->insert_table_ctx_; merge_param.dml_row_handler_ = store_ctx_->error_row_handler_; - if (OB_FAIL(merge_ctx_.init(merge_param, store_ctx_->ls_partition_ids_, + if (OB_FAIL(merge_ctx_.init(store_ctx_->ctx_, merge_param, store_ctx_->ls_partition_ids_, store_ctx_->target_ls_partition_ids_))) { LOG_WARN("fail to init merge ctx", KR(ret)); } else if (store_ctx_->is_multiple_mode_) { diff --git a/src/observer/table_load/ob_table_load_multiple_heap_table_compactor.cpp b/src/observer/table_load/ob_table_load_multiple_heap_table_compactor.cpp index 2daa6527c5..44b2545d68 100644 --- a/src/observer/table_load/ob_table_load_multiple_heap_table_compactor.cpp +++ b/src/observer/table_load/ob_table_load_multiple_heap_table_compactor.cpp @@ -118,7 +118,7 @@ public: if (OB_ISNULL(sorter)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("unexpected worker", KR(ret), KPC(worker)); - } else if (FALSE_IT(sorter->set_work_param(index_dir_id_, data_dir_id_, heap_table_array_, + } else if (FALSE_IT(sorter->set_work_param(ctx_, index_dir_id_, data_dir_id_, heap_table_array_, heap_table_allocator_))) { } else if (OB_FAIL(sorter->work())) { LOG_WARN("fail to compact", KR(ret)); @@ -157,6 +157,7 @@ private: } else { heap_table_compactor.reuse(); std::swap(curr_round, next_round); + ATOMIC_AAF(&ctx_->job_stat_->store_.compact_stage_consume_tmp_files_, mem_ctx_->table_data_desc_.merge_count_per_round_ - 1); } } if (OB_SUCC(ret)) { @@ -173,6 +174,8 @@ private: LOG_WARN("fail to do compact", KR(ret)); } else if (OB_FAIL(mem_ctx_->add_tables_from_table_compactor(heap_table_compactor))) { LOG_WARN("fail to add table from table compactor", KR(ret)); + } else { + ATOMIC_AAF(&ctx_->job_stat_->store_.compact_stage_consume_tmp_files_, curr_round->count()); } } } diff --git a/src/observer/table_load/ob_table_load_parallel_merge_ctx.cpp b/src/observer/table_load/ob_table_load_parallel_merge_ctx.cpp index d031d32c95..43383c4f16 100644 --- a/src/observer/table_load/ob_table_load_parallel_merge_ctx.cpp +++ b/src/observer/table_load/ob_table_load_parallel_merge_ctx.cpp @@ -324,6 +324,8 @@ public: } } else if (OB_FAIL(sstable_builder_.append_row(*datum_row))) { LOG_WARN("fail to append row", KR(ret)); + } else { + ATOMIC_AAF(&ctx_->job_stat_->store_.compact_stage_merge_write_rows_, 1); } } if (OB_SUCC(ret)) { @@ -704,6 +706,8 @@ int ObTableLoadParallelMergeCtx::start(ObTableLoadParallelMergeCb *cb) if (OB_FAIL(construct_split_range_task(tablet_ctx))) { LOG_WARN("fail to construct split range task", KR(ret)); } + } else { + ATOMIC_AAF(&store_ctx_->ctx_->job_stat_->store_.compact_stage_consume_tmp_files_, tablet_ctx->sstables_.size()); } } if (OB_SUCC(ret)) { @@ -898,9 +902,12 @@ int ObTableLoadParallelMergeCtx::handle_tablet_compact_sstable_finish( LOG_WARN("invalid args", KR(ret), KP(tablet_ctx)); } else if (tablet_ctx->sstables_.size() > store_ctx_->table_data_desc_.merge_count_per_round_) { // still need merge + ATOMIC_AAF(&store_ctx_->ctx_->job_stat_->store_.compact_stage_consume_tmp_files_, tablet_ctx->merge_sstable_count_ - 1); if (OB_FAIL(construct_split_range_task(tablet_ctx))) { LOG_WARN("fail to construct split range task", KR(ret)); } + } else { + ATOMIC_AAF(&store_ctx_->ctx_->job_stat_->store_.compact_stage_consume_tmp_files_, tablet_ctx->merge_sstable_count_); } return ret; } diff --git a/src/observer/table_load/ob_table_load_store_ctx.cpp b/src/observer/table_load/ob_table_load_store_ctx.cpp index 52666a218e..ef561ed3c0 100644 --- a/src/observer/table_load/ob_table_load_store_ctx.cpp +++ b/src/observer/table_load/ob_table_load_store_ctx.cpp @@ -328,7 +328,7 @@ int ObTableLoadStoreCtx::advance_status(ObTableLoadStatusType status) // advance status else { status_ = status; - table_load_status_to_string(status_, ctx_->job_stat_->store.status_); + table_load_status_to_string(status_, ctx_->job_stat_->store_.status_); LOG_INFO("LOAD DATA STORE advance status", K(status)); } } @@ -348,7 +348,7 @@ int ObTableLoadStoreCtx::set_status_error(int error_code) } else { status_ = ObTableLoadStatusType::ERROR; error_code_ = error_code; - table_load_status_to_string(status_, ctx_->job_stat_->store.status_); + table_load_status_to_string(status_, ctx_->job_stat_->store_.status_); LOG_INFO("LOAD DATA STORE status error", KR(error_code)); } } @@ -363,7 +363,7 @@ int ObTableLoadStoreCtx::set_status_abort() LOG_INFO("LOAD DATA STORE already abort"); } else { status_ = ObTableLoadStatusType::ABORT; - table_load_status_to_string(status_, ctx_->job_stat_->store.status_); + table_load_status_to_string(status_, ctx_->job_stat_->store_.status_); LOG_INFO("LOAD DATA STORE status abort"); } return ret; diff --git a/src/observer/table_load/ob_table_load_store_trans.cpp b/src/observer/table_load/ob_table_load_store_trans.cpp index 5b3bd112b8..1b02cd6217 100644 --- a/src/observer/table_load/ob_table_load_store_trans.cpp +++ b/src/observer/table_load/ob_table_load_store_trans.cpp @@ -85,7 +85,7 @@ int ObTableLoadStoreTrans::advance_trans_status(ObTableLoadTransStatusType trans LOG_WARN("fail to advance trans status", KR(ret), K(trans_status)); } else { table_load_trans_status_to_string(trans_status, - trans_ctx_->ctx_->job_stat_->store.trans_status_); + trans_ctx_->ctx_->job_stat_->store_.trans_status_); } return ret; } @@ -97,7 +97,7 @@ int ObTableLoadStoreTrans::set_trans_status_error(int error_code) LOG_WARN("fail to set trans status error", KR(ret)); } else { table_load_trans_status_to_string(ObTableLoadTransStatusType::ERROR, - trans_ctx_->ctx_->job_stat_->store.trans_status_); + trans_ctx_->ctx_->job_stat_->store_.trans_status_); } return ret; } @@ -109,7 +109,7 @@ int ObTableLoadStoreTrans::set_trans_status_abort() LOG_WARN("fail to set trans status abort", KR(ret)); } else { table_load_trans_status_to_string(ObTableLoadTransStatusType::ABORT, - trans_ctx_->ctx_->job_stat_->store.trans_status_); + trans_ctx_->ctx_->job_stat_->store_.trans_status_); } return ret; } diff --git a/src/observer/table_load/ob_table_load_table_ctx.cpp b/src/observer/table_load/ob_table_load_table_ctx.cpp index fd86915d62..769de2754f 100644 --- a/src/observer/table_load/ob_table_load_table_ctx.cpp +++ b/src/observer/table_load/ob_table_load_table_ctx.cpp @@ -107,14 +107,6 @@ int ObTableLoadTableCtx::register_job_stat() job_stat->start_time_ = ObTimeUtil::current_time(); job_stat->max_allowed_error_rows_ = param_.max_error_row_count_; job_stat->detected_error_rows_ = 0; - job_stat->coordinator.received_rows_ = 0; - job_stat->coordinator.last_commit_segment_id_ = 0; - job_stat->coordinator.status_ = "none"; - job_stat->coordinator.trans_status_ = "none"; - job_stat->store.processed_rows_ = 0; - job_stat->store.last_commit_segment_id_ = 0; - job_stat->store.status_ = "none"; - job_stat->store.trans_status_ = "none"; job_stat->allocator_.set_tenant_id(param_.tenant_id_); if (OB_FAIL(ObTableLoadUtils::deep_copy(schema_.table_name_, job_stat->table_name_, job_stat->allocator_))) { diff --git a/src/observer/table_load/ob_table_load_trans_bucket_writer.cpp b/src/observer/table_load/ob_table_load_trans_bucket_writer.cpp index 5417594299..8acaba0f23 100644 --- a/src/observer/table_load/ob_table_load_trans_bucket_writer.cpp +++ b/src/observer/table_load/ob_table_load_trans_bucket_writer.cpp @@ -201,7 +201,7 @@ int ObTableLoadTransBucketWriter::write(int32_t session_id, ObTableLoadObjRowArr if (OB_SUCC(ret)) { int64_t row_cnt = obj_rows.count(); - ATOMIC_AAF(&trans_ctx_->ctx_->job_stat_->coordinator.received_rows_, row_cnt); + ATOMIC_AAF(&trans_ctx_->ctx_->job_stat_->coordinator_.received_rows_, row_cnt); ATOMIC_AAF(&trans_ctx_->ctx_->coordinator_ctx_->result_info_.records_, row_cnt); } } diff --git a/src/observer/table_load/ob_table_load_trans_store.cpp b/src/observer/table_load/ob_table_load_trans_store.cpp index bb291986ce..6e68179616 100644 --- a/src/observer/table_load/ob_table_load_trans_store.cpp +++ b/src/observer/table_load/ob_table_load_trans_store.cpp @@ -308,7 +308,7 @@ int ObTableLoadTransStoreWriter::write(int32_t session_id, } } if (OB_SUCC(ret)) { - ATOMIC_AAF(&trans_ctx_->ctx_->job_stat_->store.processed_rows_, row_array.count()); + ATOMIC_AAF(&trans_ctx_->ctx_->job_stat_->store_.processed_rows_, row_array.count()); } session_ctx.cast_allocator_.reuse(); } @@ -345,7 +345,7 @@ int ObTableLoadTransStoreWriter::write(int32_t session_id, } } if (OB_SUCC(ret)) { - ATOMIC_AAF(&trans_ctx_->ctx_->job_stat_->store.processed_rows_, row_array.count()); + ATOMIC_AAF(&trans_ctx_->ctx_->job_stat_->store_.processed_rows_, row_array.count()); } } return ret; diff --git a/src/observer/virtual_table/ob_all_virtual_load_data_stat.cpp b/src/observer/virtual_table/ob_all_virtual_load_data_stat.cpp index 0f3aeef86d..2ee83c7c1b 100644 --- a/src/observer/virtual_table/ob_all_virtual_load_data_stat.cpp +++ b/src/observer/virtual_table/ob_all_virtual_load_data_stat.cpp @@ -183,44 +183,58 @@ int ObAllVirtualLoadDataStat::inner_get_next_row(ObNewRow *&row) break; } case COORDINATOR_RECEIVED_ROWS: { - cells[i].set_int(job_status->coordinator.received_rows_); + cells[i].set_int(job_status->coordinator_.received_rows_); break; } case COORDINATOR_LAST_COMMIT_SEGMENT_ID: { - cells[i].set_int(job_status->coordinator.last_commit_segment_id_); + cells[i].set_int(job_status->coordinator_.last_commit_segment_id_); break; } case COORDINATOR_STATUS: { - cells[i].set_varchar(job_status->coordinator.status_); + cells[i].set_varchar(job_status->coordinator_.status_); cells[i].set_collation_type(ObCharset::get_default_collation(ObCharset::get_default_charset())); break; } case COORDINATOR_TRANS_STATUS: { - cells[i].set_varchar(job_status->coordinator.trans_status_); + cells[i].set_varchar(job_status->coordinator_.trans_status_); cells[i].set_collation_type(ObCharset::get_default_collation(ObCharset::get_default_charset())); break; } case STORE_PROCESSED_ROWS: { - cells[i].set_int(job_status->store.processed_rows_); + cells[i].set_int(job_status->store_.processed_rows_); break; } case STORE_LAST_COMMIT_SEGMENT_ID: { - cells[i].set_int(job_status->store.last_commit_segment_id_); + cells[i].set_int(job_status->store_.last_commit_segment_id_); break; } case STORE_STATUS: { - cells[i].set_varchar(job_status->store.status_); + cells[i].set_varchar(job_status->store_.status_); cells[i].set_collation_type(ObCharset::get_default_collation(ObCharset::get_default_charset())); break; } case STORE_TRANS_STATUS: { - cells[i].set_varchar(job_status->store.trans_status_); + cells[i].set_varchar(job_status->store_.trans_status_); cells[i].set_collation_type(ObCharset::get_default_collation(ObCharset::get_default_charset())); break; } case MESSAGE: { - cells[i].set_varchar(job_status->message_); - cells[i].set_collation_type(ObCharset::get_default_collation(ObCharset::get_default_charset())); + int64_t pos = 0; + if (OB_FAIL(databuff_printf(job_status->message_, common::MAX_LOAD_DATA_MESSAGE_LENGTH, pos, + "COMPACT_STAGE_LOAD_ROWS: %ld, COMPACT_STAGE_DUMP_ROWS: %ld, " + "COMPACT_STAGE_PRODUCT_TMP_FILES: %ld, COMPACT_STAGE_CONSUME_TMP_FILES: %ld, " + "COMPACT_STAGE_MERGE_WRITE_ROWS: %ld, MERGE_STAGE_WRITE_ROWS: %ld", + job_status->store_.compact_stage_load_rows_, + job_status->store_.compact_stage_dump_rows_, + job_status->store_.compact_stage_product_tmp_files_, + job_status->store_.compact_stage_consume_tmp_files_, + job_status->store_.compact_stage_merge_write_rows_, + job_status->store_.merge_stage_write_rows_))) { + SERVER_LOG(WARN, "fail to fill message_", K(ret)); + } else { + cells[i].set_varchar(job_status->message_); + cells[i].set_collation_type(ObCharset::get_default_collation(ObCharset::get_default_charset())); + } break; } default: { diff --git a/src/rootserver/ddl_task/ob_table_redefinition_task.cpp b/src/rootserver/ddl_task/ob_table_redefinition_task.cpp index 1182989fc7..770b441204 100755 --- a/src/rootserver/ddl_task/ob_table_redefinition_task.cpp +++ b/src/rootserver/ddl_task/ob_table_redefinition_task.cpp @@ -1359,9 +1359,9 @@ int ObTableRedefinitionTask::collect_longops_stat(ObLongopsValue &value) job_stat.parallel_, job_stat.max_allowed_error_rows_, job_stat.detected_error_rows_, - job_stat.coordinator.received_rows_, - job_stat.coordinator.status_.length(), - job_stat.coordinator.status_.ptr()); + job_stat.coordinator_.received_rows_, + job_stat.coordinator_.status_.length(), + job_stat.coordinator_.status_.ptr()); } } @@ -1469,10 +1469,10 @@ int ObTableRedefinitionTask::get_direct_load_job_stat(common::ObArenaAllocator & EXTRACT_INT_FIELD_MYSQL(*select_result, "PARALLEL", job_stat.parallel_, int64_t); EXTRACT_INT_FIELD_MYSQL(*select_result, "MAX_ALLOWED_ERROR_ROWS", job_stat.max_allowed_error_rows_, int64_t); EXTRACT_INT_FIELD_MYSQL(*select_result, "DETECTED_ERROR_ROWS", job_stat.detected_error_rows_, int64_t); - EXTRACT_INT_FIELD_MYSQL(*select_result, "COORDINATOR_RECEIVED_ROWS", job_stat.coordinator.received_rows_, int64_t); + EXTRACT_INT_FIELD_MYSQL(*select_result, "COORDINATOR_RECEIVED_ROWS", job_stat.coordinator_.received_rows_, int64_t); EXTRACT_VARCHAR_FIELD_MYSQL(*select_result, "COORDINATOR_STATUS", load_status); if (OB_SUCC(ret) - && OB_FAIL(ob_write_string(allocator, load_status, job_stat.coordinator.status_))) { + && OB_FAIL(ob_write_string(allocator, load_status, job_stat.coordinator_.status_))) { LOG_WARN("failed to write string", KR(ret)); } } diff --git a/src/sql/engine/cmd/ob_load_data_utils.h b/src/sql/engine/cmd/ob_load_data_utils.h index b7ecf66c79..410483fef2 100644 --- a/src/sql/engine/cmd/ob_load_data_utils.h +++ b/src/sql/engine/cmd/ob_load_data_utils.h @@ -331,6 +331,8 @@ struct ObLoadDataStat total_wait_secs_(0), max_allowed_error_rows_(0), detected_error_rows_(0), + coordinator_(), + store_(), message_() {} int64_t aquire() { return ATOMIC_AAF(&ref_cnt_, 1); @@ -365,19 +367,47 @@ struct ObLoadDataStat int64_t total_wait_secs_; int64_t max_allowed_error_rows_; int64_t detected_error_rows_; - struct { - volatile int64_t received_rows_ = 0; // received from client - int64_t last_commit_segment_id_ = 0; - common::ObString status_ = "none"; // none / inited / loading / frozen / merging / commit / error / abort - common::ObString trans_status_ = "none"; // none / inited / running / frozen / commit / error / abort - } coordinator; - - struct { - volatile int64_t processed_rows_ = 0; - int64_t last_commit_segment_id_ = 0; - common::ObString status_ = "none"; - common::ObString trans_status_ = "none"; - } store; + struct coordinator { + coordinator() + : received_rows_(0), + last_commit_segment_id_(0), + status_("none"), + trans_status_("none") + {} + volatile int64_t received_rows_; // received from client + int64_t last_commit_segment_id_; + common::ObString status_; // none / inited / loading / frozen / merging / commit / error / abort + common::ObString trans_status_; // none / inited / running / frozen / commit / error / abort + TO_STRING_KV(K(received_rows_), K(last_commit_segment_id_), K(status_), K(trans_status_)); + } coordinator_; + struct store { + store() + : processed_rows_(0), + last_commit_segment_id_(0), + status_("none"), + trans_status_("none"), + compact_stage_load_rows_(0), + compact_stage_dump_rows_(0), + compact_stage_product_tmp_files_(0), + compact_stage_consume_tmp_files_(0), + compact_stage_merge_write_rows_(0), + merge_stage_write_rows_(0) + {} + volatile int64_t processed_rows_; + int64_t last_commit_segment_id_; + common::ObString status_; + common::ObString trans_status_; + int64_t compact_stage_load_rows_ CACHE_ALIGNED; + int64_t compact_stage_dump_rows_ CACHE_ALIGNED; + int64_t compact_stage_product_tmp_files_ CACHE_ALIGNED; + int64_t compact_stage_consume_tmp_files_ CACHE_ALIGNED; + int64_t compact_stage_merge_write_rows_ CACHE_ALIGNED; + int64_t merge_stage_write_rows_ CACHE_ALIGNED; + TO_STRING_KV(K(processed_rows_), K(last_commit_segment_id_), K(status_), K(trans_status_), + K(compact_stage_load_rows_), K(compact_stage_dump_rows_), + K(compact_stage_product_tmp_files_), K(compact_stage_consume_tmp_files_), + K(compact_stage_merge_write_rows_), K(merge_stage_write_rows_)); + } store_; char message_[common::MAX_LOAD_DATA_MESSAGE_LENGTH]; TO_STRING_KV(K(tenant_id_), K(job_id_), K(job_type_), @@ -388,10 +418,7 @@ struct ObLoadDataStat K(parsed_rows_), K(total_shuffle_task_), K(total_insert_task_), K(shuffle_rt_sum_), K(insert_rt_sum_), K(total_wait_secs_), K(max_allowed_error_rows_), K(detected_error_rows_), - K(coordinator.received_rows_), K(coordinator.last_commit_segment_id_), - K(coordinator.status_), K(coordinator.trans_status_), - K(store.processed_rows_), K(store.last_commit_segment_id_), - K(store.status_), K(store.trans_status_), K(message_)); + K(coordinator_), K(store_), K(message_)); }; class ObGetAllJobStatusOp diff --git a/src/storage/direct_load/ob_direct_load_mem_dump.cpp b/src/storage/direct_load/ob_direct_load_mem_dump.cpp index d31797f490..32c12ea864 100644 --- a/src/storage/direct_load/ob_direct_load_mem_dump.cpp +++ b/src/storage/direct_load/ob_direct_load_mem_dump.cpp @@ -26,6 +26,7 @@ using namespace common; using namespace blocksstable; using namespace table; using namespace sql; +using namespace observer; /** * Context @@ -76,10 +77,12 @@ int ObDirectLoadMemDump::Context::add_table(const ObTabletID &tablet_id, int64_t * ObDirectLoadMemDump */ -ObDirectLoadMemDump::ObDirectLoadMemDump(ObDirectLoadMemContext *mem_ctx, +ObDirectLoadMemDump::ObDirectLoadMemDump(ObTableLoadTableCtx *ctx, + ObDirectLoadMemContext *mem_ctx, const RangeType &range, ObTableLoadHandle context_ptr, int64_t range_idx) : allocator_("TLD_MemDump"), + ctx_(ctx), mem_ctx_(mem_ctx), range_(range), context_ptr_(context_ptr), @@ -285,6 +288,8 @@ int ObDirectLoadMemDump::dump_tables() } else { LOG_WARN("fail to append row", KR(ret), K(datum_row)); } + } else { + ATOMIC_AAF(&ctx_->job_stat_->store_.compact_stage_dump_rows_, 1); } } } @@ -382,6 +387,9 @@ int ObDirectLoadMemDump::compact_tables() LOG_WARN("fail to compact tablet tables", KR(ret)); } } + if (OB_SUCC(ret)) { + ATOMIC_AAF(&ctx_->job_stat_->store_.compact_stage_product_tmp_files_, keys.count()); + } return ret; } diff --git a/src/storage/direct_load/ob_direct_load_mem_dump.h b/src/storage/direct_load/ob_direct_load_mem_dump.h index 0da9c4599d..28fcecefb0 100644 --- a/src/storage/direct_load/ob_direct_load_mem_dump.h +++ b/src/storage/direct_load/ob_direct_load_mem_dump.h @@ -17,6 +17,7 @@ #include "storage/direct_load/ob_direct_load_mem_define.h" #include "storage/direct_load/ob_direct_load_multi_map.h" #include "storage/direct_load/ob_direct_load_sstable_builder.h" +#include "observer/table_load/ob_table_load_table_ctx.h" namespace oceanbase { @@ -61,7 +62,8 @@ public: }; public: - ObDirectLoadMemDump(ObDirectLoadMemContext *mem_ctx, + ObDirectLoadMemDump(observer::ObTableLoadTableCtx *ctx, + ObDirectLoadMemContext *mem_ctx, const RangeType &range, table::ObTableLoadHandle context_ptr, int64_t range_idx); ~ObDirectLoadMemDump(); @@ -91,6 +93,7 @@ private: private: // data members ObArenaAllocator allocator_; + observer::ObTableLoadTableCtx *ctx_; ObDirectLoadMemContext *mem_ctx_; RangeType range_; table::ObTableLoadHandle context_ptr_; diff --git a/src/storage/direct_load/ob_direct_load_mem_loader.cpp b/src/storage/direct_load/ob_direct_load_mem_loader.cpp index d258099310..6d994d0e4b 100644 --- a/src/storage/direct_load/ob_direct_load_mem_loader.cpp +++ b/src/storage/direct_load/ob_direct_load_mem_loader.cpp @@ -27,8 +27,8 @@ using namespace blocksstable; * ObDirectLoadMemLoader */ -ObDirectLoadMemLoader::ObDirectLoadMemLoader(ObDirectLoadMemContext *mem_ctx) - : mem_ctx_(mem_ctx) +ObDirectLoadMemLoader::ObDirectLoadMemLoader(observer::ObTableLoadTableCtx *ctx, ObDirectLoadMemContext *mem_ctx) + : ctx_(ctx), mem_ctx_(mem_ctx) { } @@ -121,6 +121,7 @@ int ObDirectLoadMemLoader::work() LOG_WARN("fail to add item", KR(ret)); } else { external_row = nullptr; + ATOMIC_AAF(&ctx_->job_stat_->store_.compact_stage_load_rows_, 1); } } } diff --git a/src/storage/direct_load/ob_direct_load_mem_loader.h b/src/storage/direct_load/ob_direct_load_mem_loader.h index fa4f8055e1..fe69a0a513 100644 --- a/src/storage/direct_load/ob_direct_load_mem_loader.h +++ b/src/storage/direct_load/ob_direct_load_mem_loader.h @@ -15,6 +15,8 @@ #include "storage/direct_load/ob_direct_load_i_table.h" #include "storage/direct_load/ob_direct_load_mem_context.h" #include "storage/direct_load/ob_direct_load_mem_worker.h" +#include "observer/table_load/ob_table_load_service.h" +#include "observer/table_load/ob_table_load_table_ctx.h" namespace oceanbase { @@ -27,7 +29,7 @@ class ObDirectLoadMemLoader : public ObDirectLoadMemWorker typedef ObDirectLoadExternalMultiPartitionRowChunk ChunkType; typedef ObDirectLoadExternalMultiPartitionRowCompare CompareType; public: - ObDirectLoadMemLoader(ObDirectLoadMemContext *mem_ctx); + ObDirectLoadMemLoader(observer::ObTableLoadTableCtx *ctx, ObDirectLoadMemContext *mem_ctx); virtual ~ObDirectLoadMemLoader(); int add_table(ObIDirectLoadPartitionTable *table) override; int work() override; @@ -35,6 +37,7 @@ public: private: int close_chunk(ChunkType *&chunk); private: + observer::ObTableLoadTableCtx *ctx_; ObDirectLoadMemContext *mem_ctx_; ObDirectLoadExternalFragmentArray fragments_; }; diff --git a/src/storage/direct_load/ob_direct_load_mem_sample.cpp b/src/storage/direct_load/ob_direct_load_mem_sample.cpp index 961987e804..8500dc0287 100644 --- a/src/storage/direct_load/ob_direct_load_mem_sample.cpp +++ b/src/storage/direct_load/ob_direct_load_mem_sample.cpp @@ -26,8 +26,8 @@ using namespace blocksstable; using namespace observer; using namespace table; -ObDirectLoadMemSample::ObDirectLoadMemSample(ObDirectLoadMemContext *mem_ctx) - : mem_ctx_(mem_ctx), range_count_(mem_ctx_->mem_dump_task_count_) {} +ObDirectLoadMemSample::ObDirectLoadMemSample(observer::ObTableLoadTableCtx *ctx, ObDirectLoadMemContext *mem_ctx) + : ctx_(ctx), mem_ctx_(mem_ctx), range_count_(mem_ctx_->mem_dump_task_count_) {} int ObDirectLoadMemSample::gen_ranges(ObIArray &chunks, ObIArray &ranges) @@ -125,7 +125,7 @@ int ObDirectLoadMemSample::add_dump(int64_t idx, { int ret = OB_SUCCESS; storage::ObDirectLoadMemDump *mem_dump = OB_NEW( - ObDirectLoadMemDump, ObMemAttr(MTL_ID(), "TLD_mem_dump"), mem_ctx_, range, context_ptr, idx); + ObDirectLoadMemDump, ObMemAttr(MTL_ID(), "TLD_mem_dump"), ctx_, mem_ctx_, range, context_ptr, idx); if (mem_dump == nullptr) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("fail to allocate mem dump", KR(ret)); diff --git a/src/storage/direct_load/ob_direct_load_mem_sample.h b/src/storage/direct_load/ob_direct_load_mem_sample.h index 01cb1a4516..267aafa203 100644 --- a/src/storage/direct_load/ob_direct_load_mem_sample.h +++ b/src/storage/direct_load/ob_direct_load_mem_sample.h @@ -17,6 +17,7 @@ #include #include "storage/direct_load/ob_direct_load_mem_dump.h" #include "storage/direct_load/ob_direct_load_mem_context.h" +#include "observer/table_load/ob_table_load_table_ctx.h" namespace oceanbase { @@ -31,7 +32,7 @@ class ObDirectLoadMemSample typedef ObDirectLoadExternalMultiPartitionRowRange RangeType; typedef ObDirectLoadExternalMultiPartitionRowCompare CompareType; public: - ObDirectLoadMemSample(ObDirectLoadMemContext *mem_ctx); + ObDirectLoadMemSample(observer::ObTableLoadTableCtx *ctx, ObDirectLoadMemContext *mem_ctx); virtual ~ObDirectLoadMemSample() {} int do_sample(); @@ -47,6 +48,7 @@ private: private: // data members + observer::ObTableLoadTableCtx *ctx_; ObDirectLoadMemContext *mem_ctx_; int64_t range_count_; }; diff --git a/src/storage/direct_load/ob_direct_load_merge_ctx.cpp b/src/storage/direct_load/ob_direct_load_merge_ctx.cpp index e2ddff8491..b45e06d885 100644 --- a/src/storage/direct_load/ob_direct_load_merge_ctx.cpp +++ b/src/storage/direct_load/ob_direct_load_merge_ctx.cpp @@ -81,7 +81,7 @@ bool ObDirectLoadMergeParam::is_valid() const */ ObDirectLoadMergeCtx::ObDirectLoadMergeCtx() - : allocator_("TLD_MergeCtx"), is_inited_(false) + : allocator_("TLD_MergeCtx"), ctx_(nullptr), is_inited_(false) { allocator_.set_tenant_id(MTL_ID()); tablet_merge_ctx_array_.set_tenant_id(MTL_ID()); @@ -97,7 +97,8 @@ ObDirectLoadMergeCtx::~ObDirectLoadMergeCtx() tablet_merge_ctx_array_.reset(); } -int ObDirectLoadMergeCtx::init(const ObDirectLoadMergeParam ¶m, +int ObDirectLoadMergeCtx::init(ObTableLoadTableCtx *ctx, + const ObDirectLoadMergeParam ¶m, const ObIArray &ls_partition_ids, const ObIArray &target_ls_partition_ids) { @@ -105,13 +106,15 @@ int ObDirectLoadMergeCtx::init(const ObDirectLoadMergeParam ¶m, if (IS_INIT) { ret = OB_INIT_TWICE; LOG_WARN("ObDirectLoadMerger init twice", KR(ret), KP(this)); - } else if (OB_UNLIKELY(!param.is_valid() + } else if (OB_UNLIKELY(nullptr == ctx + || !param.is_valid() || ls_partition_ids.empty() || target_ls_partition_ids.empty() || (ls_partition_ids.count() != target_ls_partition_ids.count()))) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid args", KR(ret), K(param), K(ls_partition_ids), K(target_ls_partition_ids)); } else { + ctx_ = ctx; param_ = param; if (OB_FAIL(create_all_tablet_ctxs(ls_partition_ids, target_ls_partition_ids))) { LOG_WARN("fail to create all tablet ctxs", KR(ret)); @@ -138,7 +141,7 @@ int ObDirectLoadMergeCtx::create_all_tablet_ctxs( if (OB_ISNULL(partition_ctx = OB_NEWx(ObDirectLoadTabletMergeCtx, (&allocator_)))) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("fail to new ObDirectLoadTabletMergeCtx", KR(ret)); - } else if (OB_FAIL(partition_ctx->init(param_, ls_partition_id, target_ls_partition_id))) { + } else if (OB_FAIL(partition_ctx->init(ctx_, param_, ls_partition_id, target_ls_partition_id))) { LOG_WARN("fail to init tablet ctx", KR(ret), K(param_), K(ls_partition_id), K(target_ls_partition_id)); } else if (OB_FAIL(tablet_merge_ctx_array_.push_back(partition_ctx))) { LOG_WARN("fail to push back", KR(ret)); @@ -186,7 +189,8 @@ ObDirectLoadTabletMergeCtx::~ObDirectLoadTabletMergeCtx() rescan_task_array_.reset(); } -int ObDirectLoadTabletMergeCtx::init(const ObDirectLoadMergeParam ¶m, +int ObDirectLoadTabletMergeCtx::init(ObTableLoadTableCtx *ctx, + const ObDirectLoadMergeParam ¶m, const ObTableLoadLSIdAndPartitionId &ls_partition_id, const ObTableLoadLSIdAndPartitionId &target_ls_partition_id) @@ -195,7 +199,8 @@ int ObDirectLoadTabletMergeCtx::init(const ObDirectLoadMergeParam ¶m, if (IS_INIT) { ret = OB_INIT_TWICE; LOG_WARN("ObDirectLoadTabletMergeCtx init twice", KR(ret), KP(this)); - } else if (OB_UNLIKELY(!param.is_valid() + } else if (OB_UNLIKELY(nullptr == ctx + || !param.is_valid() || !ls_partition_id.is_valid() || !target_ls_partition_id.is_valid())) { ret = OB_INVALID_ARGUMENT; @@ -208,6 +213,7 @@ int ObDirectLoadTabletMergeCtx::init(const ObDirectLoadMergeParam ¶m, if (OB_FAIL(origin_table_.init(origin_table_param))) { LOG_WARN("fail to init origin sstable", KR(ret)); } else { + ctx_ = ctx; param_ = param; target_partition_id_ = target_ls_partition_id.part_tablet_id_.partition_id_; tablet_id_ = ls_partition_id.part_tablet_id_.tablet_id_; @@ -449,7 +455,7 @@ int ObDirectLoadTabletMergeCtx::build_empty_data_merge_task(const ObIArrayinit(param_, this, &origin_table_, sstable_array_, range, i))) { + } else if (OB_FAIL(merge_task->init(ctx_, param_, this, &origin_table_, sstable_array_, range, i))) { LOG_WARN("fail to init merge task", KR(ret)); } else if (OB_FAIL(task_array_.push_back(merge_task))) { LOG_WARN("fail to push back merge task", KR(ret)); @@ -494,7 +500,7 @@ int ObDirectLoadTabletMergeCtx::build_pk_table_merge_task( if (OB_ISNULL(merge_task = OB_NEWx(ObDirectLoadPartitionRangeMergeTask, (&allocator_)))) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("fail to new ObDirectLoadPartitionRangeMergeTask", KR(ret)); - } else if (OB_FAIL(merge_task->init(param_, this, &origin_table_, sstable_array_, range, i))) { + } else if (OB_FAIL(merge_task->init(ctx_, param_, this, &origin_table_, sstable_array_, range, i))) { LOG_WARN("fail to init merge task", KR(ret)); } else if (OB_FAIL(task_array_.push_back(merge_task))) { LOG_WARN("fail to push back merge task", KR(ret)); @@ -540,7 +546,7 @@ int ObDirectLoadTabletMergeCtx::build_pk_table_multiple_merge_task( OB_NEWx(ObDirectLoadPartitionRangeMultipleMergeTask, (&allocator_)))) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("fail to new ObDirectLoadPartitionRangeMultipleMergeTask", KR(ret)); - } else if (OB_FAIL(merge_task->init(param_, this, &origin_table_, multiple_sstable_array_, + } else if (OB_FAIL(merge_task->init(ctx_, param_, this, &origin_table_, multiple_sstable_array_, range, i))) { LOG_WARN("fail to init merge task", KR(ret)); } else if (OB_FAIL(task_array_.push_back(merge_task))) { @@ -587,7 +593,7 @@ int ObDirectLoadTabletMergeCtx::build_merge_task_for_multiple_pk_table( OB_NEWx(ObDirectLoadPartitionRangeMultipleMergeTask, (&allocator_)))) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("fail to new ObDirectLoadPartitionRangeMultipleMergeTask", KR(ret)); - } else if (OB_FAIL(merge_task->init(param_, this, &origin_table_, multiple_sstable_array_, + } else if (OB_FAIL(merge_task->init(ctx_, param_, this, &origin_table_, multiple_sstable_array_, range, i))) { LOG_WARN("fail to init merge task", KR(ret)); } else if (OB_FAIL(task_array_.push_back(merge_task))) { @@ -631,7 +637,7 @@ int ObDirectLoadTabletMergeCtx::build_heap_table_merge_task( if (OB_ISNULL(merge_task = OB_NEWx(ObDirectLoadPartitionRangeMergeTask, (&allocator_)))) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("fail to new ObDirectLoadPartitionRangeMergeTask", KR(ret)); - } else if (OB_FAIL(merge_task->init(param_, this, &origin_table_, sstable_array_, range, + } else if (OB_FAIL(merge_task->init(ctx_, param_, this, &origin_table_, sstable_array_, range, parallel_idx++))) { LOG_WARN("fail to init merge task", KR(ret)); } else if (OB_FAIL(task_array_.push_back(merge_task))) { @@ -659,7 +665,7 @@ int ObDirectLoadTabletMergeCtx::build_heap_table_merge_task( OB_NEWx(ObDirectLoadPartitionHeapTableMergeTask, (&allocator_)))) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("fail to new ObDirectLoadPartitionHeapTableMergeTask", KR(ret)); - } else if (OB_FAIL(merge_task->init(param_, this, external_table, pk_interval, + } else if (OB_FAIL(merge_task->init(ctx_, param_, this, external_table, pk_interval, parallel_idx++))) { LOG_WARN("fail to init merge task", KR(ret)); } else if (OB_FAIL(task_array_.push_back(merge_task))) { @@ -705,7 +711,7 @@ int ObDirectLoadTabletMergeCtx::build_heap_table_multiple_merge_task( if (OB_ISNULL(merge_task = OB_NEWx(ObDirectLoadPartitionRangeMergeTask, (&allocator_)))) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("fail to new ObDirectLoadPartitionRangeMergeTask", KR(ret)); - } else if (OB_FAIL(merge_task->init(param_, this, &origin_table_, sstable_array_, range, + } else if (OB_FAIL(merge_task->init(ctx_, param_, this, &origin_table_, sstable_array_, range, parallel_idx++))) { LOG_WARN("fail to init merge task", KR(ret)); } else if (OB_FAIL(task_array_.push_back(merge_task))) { @@ -735,7 +741,7 @@ int ObDirectLoadTabletMergeCtx::build_heap_table_multiple_merge_task( (&allocator_)))) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("fail to new ObDirectLoadPartitionHeapTableMultipleMergeTask", KR(ret)); - } else if (OB_FAIL(merge_task->init(param_, this, heap_table, pk_interval, parallel_idx++))) { + } else if (OB_FAIL(merge_task->init(ctx_, param_, this, heap_table, pk_interval, parallel_idx++))) { LOG_WARN("fail to init merge task", KR(ret)); } else if (OB_FAIL(task_array_.push_back(merge_task))) { LOG_WARN("fail to push back merge task", KR(ret)); @@ -778,7 +784,7 @@ int ObDirectLoadTabletMergeCtx::build_aggregate_merge_task_for_multiple_heap_tab (&allocator_)))) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("fail to new ObDirectLoadPartitionHeapTableMultipleAggregateMergeTask", KR(ret)); - } else if (OB_FAIL(merge_task->init(param_, this, &origin_table_, multiple_heap_table_array_, + } else if (OB_FAIL(merge_task->init(ctx_, param_, this, &origin_table_, multiple_heap_table_array_, pk_interval))) { LOG_WARN("fail to init merge task", KR(ret)); } else if (OB_FAIL(task_array_.push_back(merge_task))) { diff --git a/src/storage/direct_load/ob_direct_load_merge_ctx.h b/src/storage/direct_load/ob_direct_load_merge_ctx.h index 6249c3d44a..9fde5b8a2b 100644 --- a/src/storage/direct_load/ob_direct_load_merge_ctx.h +++ b/src/storage/direct_load/ob_direct_load_merge_ctx.h @@ -24,6 +24,7 @@ #include "storage/direct_load/ob_direct_load_origin_table.h" #include "storage/direct_load/ob_direct_load_table_data_desc.h" #include "storage/direct_load/ob_direct_load_fast_heap_table.h" +#include "observer/table_load/ob_table_load_table_ctx.h" namespace oceanbase { @@ -81,7 +82,8 @@ class ObDirectLoadMergeCtx public: ObDirectLoadMergeCtx(); ~ObDirectLoadMergeCtx(); - int init(const ObDirectLoadMergeParam ¶m, + int init(observer::ObTableLoadTableCtx *ctx, + const ObDirectLoadMergeParam ¶m, const common::ObIArray &ls_partition_ids, const common::ObIArray &target_ls_partition_ids); const common::ObIArray &get_tablet_merge_ctxs() const @@ -93,6 +95,7 @@ private: const common::ObIArray &target_ls_partition_ids); private: common::ObArenaAllocator allocator_; + observer::ObTableLoadTableCtx *ctx_; ObDirectLoadMergeParam param_; common::ObArray tablet_merge_ctx_array_; bool is_inited_; @@ -103,8 +106,10 @@ class ObDirectLoadTabletMergeCtx public: ObDirectLoadTabletMergeCtx(); ~ObDirectLoadTabletMergeCtx(); - int init(const ObDirectLoadMergeParam ¶m, const table::ObTableLoadLSIdAndPartitionId &ls_partition_id, - const table::ObTableLoadLSIdAndPartitionId &target_ls_partition_id); + int init(observer::ObTableLoadTableCtx *ctx, + const ObDirectLoadMergeParam ¶m, + const table::ObTableLoadLSIdAndPartitionId &ls_partition_id, + const table::ObTableLoadLSIdAndPartitionId &target_ls_partition_id); int build_rescan_task(int64_t thread_count); int build_merge_task(const common::ObIArray &table_array, const common::ObIArray &col_descs, @@ -159,6 +164,7 @@ private: int get_autoincrement_value(uint64_t count, share::ObTabletCacheInterval &interval); private: common::ObArenaAllocator allocator_; + observer::ObTableLoadTableCtx *ctx_; ObDirectLoadMergeParam param_; uint64_t target_partition_id_; common::ObTabletID tablet_id_; diff --git a/src/storage/direct_load/ob_direct_load_multiple_heap_table_sorter.cpp b/src/storage/direct_load/ob_direct_load_multiple_heap_table_sorter.cpp index 422bf24f03..aea2b1ca78 100644 --- a/src/storage/direct_load/ob_direct_load_multiple_heap_table_sorter.cpp +++ b/src/storage/direct_load/ob_direct_load_multiple_heap_table_sorter.cpp @@ -27,7 +27,8 @@ using namespace blocksstable; ObDirectLoadMultipleHeapTableSorter::ObDirectLoadMultipleHeapTableSorter( ObDirectLoadMemContext *mem_ctx) - : mem_ctx_(mem_ctx), + : ctx_(nullptr), + mem_ctx_(mem_ctx), allocator_("TLD_Sorter"), extra_buf_(nullptr), index_dir_id_(-1), @@ -149,6 +150,8 @@ int ObDirectLoadMultipleHeapTableSorter::get_tables( LOG_WARN("unexpected table", KR(ret), KPC(table)); } else if (OB_FAIL(heap_table_array_->push_back(heap_table))) { LOG_WARN("fail to push back heap table", KR(ret)); + } else { + ATOMIC_AAF(&ctx_->job_stat_->store_.compact_stage_product_tmp_files_, 1); } } return ret; @@ -218,6 +221,7 @@ int ObDirectLoadMultipleHeapTableSorter::work() LOG_WARN("fail to add item", KR(ret)); } else { row = nullptr; + ATOMIC_AAF(&ctx_->job_stat_->store_.compact_stage_load_rows_, 1); } } } diff --git a/src/storage/direct_load/ob_direct_load_multiple_heap_table_sorter.h b/src/storage/direct_load/ob_direct_load_multiple_heap_table_sorter.h index 5ca7d8e60f..cf3d700f0a 100644 --- a/src/storage/direct_load/ob_direct_load_multiple_heap_table_sorter.h +++ b/src/storage/direct_load/ob_direct_load_multiple_heap_table_sorter.h @@ -16,6 +16,7 @@ #include "storage/direct_load/ob_direct_load_i_table.h" #include "storage/direct_load/ob_direct_load_mem_context.h" #include "storage/direct_load/ob_direct_load_mem_worker.h" +#include "observer/table_load/ob_table_load_table_ctx.h" namespace oceanbase { @@ -32,10 +33,12 @@ public: int init(); int add_table(ObIDirectLoadPartitionTable *table) override; - void set_work_param(int64_t index_dir_id, int64_t data_dir_id, + void set_work_param(observer::ObTableLoadTableCtx *ctx, + int64_t index_dir_id, int64_t data_dir_id, common::ObIArray &heap_table_array, common::ObIAllocator &heap_table_allocator) { + ctx_ = ctx; index_dir_id_ = index_dir_id; data_dir_id_ = data_dir_id; heap_table_array_ = &heap_table_array; @@ -52,6 +55,7 @@ private: private: // data members + observer::ObTableLoadTableCtx *ctx_; ObDirectLoadMemContext *mem_ctx_; ObDirectLoadExternalFragmentArray fragments_; ObArenaAllocator allocator_; diff --git a/src/storage/direct_load/ob_direct_load_partition_merge_task.cpp b/src/storage/direct_load/ob_direct_load_partition_merge_task.cpp index 7d395b84c3..80814dea51 100644 --- a/src/storage/direct_load/ob_direct_load_partition_merge_task.cpp +++ b/src/storage/direct_load/ob_direct_load_partition_merge_task.cpp @@ -28,13 +28,15 @@ namespace storage using namespace common; using namespace blocksstable; using namespace share; +using namespace observer; /** * ObDirectLoadPartitionMergeTask */ ObDirectLoadPartitionMergeTask::ObDirectLoadPartitionMergeTask() - : merge_param_(nullptr), + : ctx_(nullptr), + merge_param_(nullptr), merge_ctx_(nullptr), parallel_idx_(-1), affected_rows_(0), @@ -57,6 +59,27 @@ ObDirectLoadPartitionMergeTask::~ObDirectLoadPartitionMergeTask() } } +class ObStoreRowIteratorWrapper : public ObIStoreRowIterator +{ +public: + ObStoreRowIteratorWrapper(observer::ObTableLoadTableCtx *ctx, ObIStoreRowIterator *inner_iter) : + ctx_(ctx), inner_iter_(inner_iter) { + } + + int get_next_row(const blocksstable::ObDatumRow *&row) + { + int ret = inner_iter_->get_next_row(row); + if (ret == OB_SUCCESS) { + ATOMIC_AAF(&ctx_->job_stat_->store_.merge_stage_write_rows_, 1); + } + return ret; + } + +private: + observer::ObTableLoadTableCtx *ctx_; + ObIStoreRowIterator *inner_iter_; +}; + int ObDirectLoadPartitionMergeTask::process() { int ret = OB_SUCCESS; @@ -87,10 +110,11 @@ int ObDirectLoadPartitionMergeTask::process() K(block_start_seq)); } else { LOG_INFO("add sstable slice begin", KP(tablet_ctx), K(slice_id)); + ObStoreRowIteratorWrapper row_iter_wrapper(ctx_, row_iter); if (OB_UNLIKELY(is_stop_)) { ret = OB_CANCELED; LOG_WARN("merge task canceled", KR(ret)); - } else if (OB_FAIL(tablet_ctx->fill_sstable_slice(slice_id, *row_iter, affected_rows_))) { + } else if (OB_FAIL(tablet_ctx->fill_sstable_slice(slice_id, row_iter_wrapper, affected_rows_))) { LOG_WARN("fail to fill sstable slice", KR(ret)); } else if (OB_FAIL(tablet_ctx->close_sstable_slice(slice_id))) { LOG_WARN("fail to close writer", KR(ret)); @@ -290,7 +314,8 @@ ObDirectLoadPartitionRangeMergeTask::~ObDirectLoadPartitionRangeMergeTask() { } -int ObDirectLoadPartitionRangeMergeTask::init(const ObDirectLoadMergeParam &merge_param, +int ObDirectLoadPartitionRangeMergeTask::init(ObTableLoadTableCtx *ctx, + const ObDirectLoadMergeParam &merge_param, ObDirectLoadTabletMergeCtx *merge_ctx, ObDirectLoadOriginTable *origin_table, const ObIArray &sstable_array, @@ -301,12 +326,13 @@ int ObDirectLoadPartitionRangeMergeTask::init(const ObDirectLoadMergeParam &merg if (IS_INIT) { ret = OB_INIT_TWICE; LOG_WARN("ObDirectLoadPartitionRangeMergeTask init twice", KR(ret), KP(this)); - } else if (OB_UNLIKELY(!merge_param.is_valid() || nullptr == merge_ctx || + } else if (OB_UNLIKELY(nullptr == ctx || !merge_param.is_valid() || nullptr == merge_ctx || nullptr == origin_table || !range.is_valid() || parallel_idx < 0)) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid args", KR(ret), K(merge_param), KP(merge_ctx), K(sstable_array), K(range), K(parallel_idx)); } else { + ctx_ = ctx; merge_param_ = &merge_param; merge_ctx_ = merge_ctx; parallel_idx_ = parallel_idx; @@ -464,6 +490,7 @@ ObDirectLoadPartitionRangeMultipleMergeTask::~ObDirectLoadPartitionRangeMultiple } int ObDirectLoadPartitionRangeMultipleMergeTask::init( + ObTableLoadTableCtx *ctx, const ObDirectLoadMergeParam &merge_param, ObDirectLoadTabletMergeCtx *merge_ctx, ObDirectLoadOriginTable *origin_table, @@ -475,12 +502,13 @@ int ObDirectLoadPartitionRangeMultipleMergeTask::init( if (IS_INIT) { ret = OB_INIT_TWICE; LOG_WARN("ObDirectLoadPartitionRangeMultipleMergeTask init twice", KR(ret), KP(this)); - } else if (OB_UNLIKELY(!merge_param.is_valid() || nullptr == merge_ctx || + } else if (OB_UNLIKELY(nullptr == ctx || !merge_param.is_valid() || nullptr == merge_ctx || nullptr == origin_table || !range.is_valid() || parallel_idx < 0)) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid args", KR(ret), K(merge_param), KP(merge_ctx), K(sstable_array), K(range), K(parallel_idx)); } else { + ctx_ = ctx; merge_param_ = &merge_param; merge_ctx_ = merge_ctx; parallel_idx_ = parallel_idx; @@ -643,7 +671,8 @@ ObDirectLoadPartitionHeapTableMergeTask::~ObDirectLoadPartitionHeapTableMergeTas { } -int ObDirectLoadPartitionHeapTableMergeTask::init(const ObDirectLoadMergeParam &merge_param, +int ObDirectLoadPartitionHeapTableMergeTask::init(ObTableLoadTableCtx *ctx, + const ObDirectLoadMergeParam &merge_param, ObDirectLoadTabletMergeCtx *merge_ctx, ObDirectLoadExternalTable *external_table, const ObTabletCacheInterval &pk_interval, @@ -653,13 +682,14 @@ int ObDirectLoadPartitionHeapTableMergeTask::init(const ObDirectLoadMergeParam & if (IS_INIT) { ret = OB_INIT_TWICE; LOG_WARN("ObDirectLoadPartitionHeapTableMergeTask init twice", KR(ret), KP(this)); - } else if (OB_UNLIKELY(!merge_param.is_valid() || nullptr == merge_ctx || + } else if (OB_UNLIKELY(nullptr == ctx || !merge_param.is_valid() || nullptr == merge_ctx || nullptr == external_table || parallel_idx < 0 || 0 == pk_interval.count())) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid args", KR(ret), K(merge_param), KP(merge_ctx), KP(external_table), K(parallel_idx), K(pk_interval)); } else { + ctx_ = ctx; merge_param_ = &merge_param; merge_ctx_ = merge_ctx; parallel_idx_ = parallel_idx; @@ -820,6 +850,7 @@ ObDirectLoadPartitionHeapTableMultipleMergeTask::~ObDirectLoadPartitionHeapTable } int ObDirectLoadPartitionHeapTableMultipleMergeTask::init( + ObTableLoadTableCtx *ctx, const ObDirectLoadMergeParam &merge_param, ObDirectLoadTabletMergeCtx *merge_ctx, ObDirectLoadMultipleHeapTable *heap_table, @@ -830,12 +861,14 @@ int ObDirectLoadPartitionHeapTableMultipleMergeTask::init( if (IS_INIT) { ret = OB_INIT_TWICE; LOG_WARN("ObDirectLoadPartitionHeapTableMultipleMergeTask init twice", KR(ret), KP(this)); - } else if (OB_UNLIKELY(!merge_param.is_valid() || nullptr == merge_ctx || nullptr == heap_table || + } else if (OB_UNLIKELY(nullptr == ctx || !merge_param.is_valid() || + nullptr == merge_ctx || nullptr == heap_table || parallel_idx < 0 || 0 == pk_interval.count())) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid args", KR(ret), K(merge_param), KP(merge_ctx), KPC(heap_table), K(parallel_idx), K(pk_interval)); } else { + ctx_ = ctx; merge_param_ = &merge_param; merge_ctx_ = merge_ctx; parallel_idx_ = parallel_idx; @@ -1076,7 +1109,9 @@ ObDirectLoadPartitionHeapTableMultipleAggregateMergeTask:: } int ObDirectLoadPartitionHeapTableMultipleAggregateMergeTask::init( - const ObDirectLoadMergeParam &merge_param, ObDirectLoadTabletMergeCtx *merge_ctx, + ObTableLoadTableCtx *ctx, + const ObDirectLoadMergeParam &merge_param, + ObDirectLoadTabletMergeCtx *merge_ctx, ObDirectLoadOriginTable *origin_table, const ObIArray &heap_table_array, const ObTabletCacheInterval &pk_interval) @@ -1085,12 +1120,13 @@ int ObDirectLoadPartitionHeapTableMultipleAggregateMergeTask::init( if (IS_INIT) { ret = OB_INIT_TWICE; LOG_WARN("ObDirectLoadPartitionHeapTableMultipleMergeTask init twice", KR(ret), KP(this)); - } else if (OB_UNLIKELY(!merge_param.is_valid() || nullptr == merge_ctx || + } else if (OB_UNLIKELY(nullptr == ctx || !merge_param.is_valid() || nullptr == merge_ctx || nullptr == origin_table || heap_table_array.empty())) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid args", KR(ret), K(merge_param), KP(merge_ctx), KP(origin_table), K(heap_table_array)); } else { + ctx_ = ctx; merge_param_ = &merge_param; merge_ctx_ = merge_ctx; parallel_idx_ = 0; diff --git a/src/storage/direct_load/ob_direct_load_partition_merge_task.h b/src/storage/direct_load/ob_direct_load_partition_merge_task.h index b3e5118bee..35b01c1e41 100644 --- a/src/storage/direct_load/ob_direct_load_partition_merge_task.h +++ b/src/storage/direct_load/ob_direct_load_partition_merge_task.h @@ -19,6 +19,7 @@ #include "storage/direct_load/ob_direct_load_multiple_heap_table_scanner.h" #include "storage/direct_load/ob_direct_load_insert_table_row_iterator.h" #include "sql/engine/expr/ob_expr_sys_op_opnsize.h" +#include "observer/table_load/ob_table_load_table_ctx.h" namespace oceanbase { @@ -57,6 +58,7 @@ private: int init_sql_statistics(); int init_lob_builder(); protected: + observer::ObTableLoadTableCtx *ctx_; const ObDirectLoadMergeParam *merge_param_; ObDirectLoadTabletMergeCtx *merge_ctx_; int64_t parallel_idx_; @@ -73,7 +75,8 @@ class ObDirectLoadPartitionRangeMergeTask : public ObDirectLoadPartitionMergeTas public: ObDirectLoadPartitionRangeMergeTask(); virtual ~ObDirectLoadPartitionRangeMergeTask(); - int init(const ObDirectLoadMergeParam &merge_param, + int init(observer::ObTableLoadTableCtx *ctx, + const ObDirectLoadMergeParam &merge_param, ObDirectLoadTabletMergeCtx *merge_ctx, ObDirectLoadOriginTable *origin_table, const common::ObIArray &sstable_array, @@ -112,7 +115,8 @@ class ObDirectLoadPartitionRangeMultipleMergeTask : public ObDirectLoadPartition public: ObDirectLoadPartitionRangeMultipleMergeTask(); virtual ~ObDirectLoadPartitionRangeMultipleMergeTask(); - int init(const ObDirectLoadMergeParam &merge_param, + int init(observer::ObTableLoadTableCtx *ctx, + const ObDirectLoadMergeParam &merge_param, ObDirectLoadTabletMergeCtx *merge_ctx, ObDirectLoadOriginTable *origin_table, const common::ObIArray &sstable_array, @@ -151,7 +155,8 @@ class ObDirectLoadPartitionHeapTableMergeTask : public ObDirectLoadPartitionMerg public: ObDirectLoadPartitionHeapTableMergeTask(); virtual ~ObDirectLoadPartitionHeapTableMergeTask(); - int init(const ObDirectLoadMergeParam &merge_param, + int init(observer::ObTableLoadTableCtx *ctx, + const ObDirectLoadMergeParam &merge_param, ObDirectLoadTabletMergeCtx *merge_ctx, ObDirectLoadExternalTable *external_table, const share::ObTabletCacheInterval &pk_interval, @@ -190,7 +195,8 @@ class ObDirectLoadPartitionHeapTableMultipleMergeTask : public ObDirectLoadParti public: ObDirectLoadPartitionHeapTableMultipleMergeTask(); virtual ~ObDirectLoadPartitionHeapTableMultipleMergeTask(); - int init(const ObDirectLoadMergeParam &merge_param, + int init(observer::ObTableLoadTableCtx *ctx, + const ObDirectLoadMergeParam &merge_param, ObDirectLoadTabletMergeCtx *merge_ctx, ObDirectLoadMultipleHeapTable *heap_table, const share::ObTabletCacheInterval &pk_interval, @@ -230,7 +236,8 @@ class ObDirectLoadPartitionHeapTableMultipleAggregateMergeTask public: ObDirectLoadPartitionHeapTableMultipleAggregateMergeTask(); virtual ~ObDirectLoadPartitionHeapTableMultipleAggregateMergeTask(); - int init(const ObDirectLoadMergeParam &merge_param, ObDirectLoadTabletMergeCtx *merge_ctx, + int init(observer::ObTableLoadTableCtx *ctx, + const ObDirectLoadMergeParam &merge_param, ObDirectLoadTabletMergeCtx *merge_ctx, ObDirectLoadOriginTable *origin_table, const common::ObIArray &heap_table_array, const share::ObTabletCacheInterval &pk_interval);