display of merge progress

This commit is contained in:
coolfishchen
2024-04-07 07:24:50 +00:00
committed by ob-robot
parent 784d2231f6
commit cac0a7d1e6
26 changed files with 226 additions and 103 deletions

View File

@ -249,7 +249,7 @@ int ObTableLoadCoordinatorCtx::advance_status(ObTableLoadStatusType status)
// advance status
else {
status_ = status;
table_load_status_to_string(status_, ctx_->job_stat_->coordinator.status_);
table_load_status_to_string(status_, ctx_->job_stat_->coordinator_.status_);
LOG_INFO("LOAD DATA COORDINATOR advance status", K(status));
}
}
@ -269,7 +269,7 @@ int ObTableLoadCoordinatorCtx::set_status_error(int error_code)
} else {
status_ = ObTableLoadStatusType::ERROR;
error_code_ = error_code;
table_load_status_to_string(status_, ctx_->job_stat_->coordinator.status_);
table_load_status_to_string(status_, ctx_->job_stat_->coordinator_.status_);
LOG_INFO("LOAD DATA COORDINATOR status error", KR(error_code));
}
}
@ -284,7 +284,7 @@ int ObTableLoadCoordinatorCtx::set_status_abort()
LOG_INFO("LOAD DATA COORDINATOR already abort");
} else {
status_ = ObTableLoadStatusType::ABORT;
table_load_status_to_string(status_, ctx_->job_stat_->coordinator.status_);
table_load_status_to_string(status_, ctx_->job_stat_->coordinator_.status_);
LOG_INFO("LOAD DATA COORDINATOR status abort");
}
return ret;

View File

@ -74,7 +74,7 @@ int ObTableLoadCoordinatorTrans::advance_trans_status(ObTableLoadTransStatusType
LOG_WARN("fail to advance trans status", KR(ret), K(trans_status));
} else {
table_load_trans_status_to_string(trans_status,
trans_ctx_->ctx_->job_stat_->coordinator.trans_status_);
trans_ctx_->ctx_->job_stat_->coordinator_.trans_status_);
}
return ret;
}
@ -86,7 +86,7 @@ int ObTableLoadCoordinatorTrans::set_trans_status_error(int error_code)
LOG_WARN("fail to set trans status error", KR(ret));
} else {
table_load_trans_status_to_string(ObTableLoadTransStatusType::ERROR,
trans_ctx_->ctx_->job_stat_->coordinator.trans_status_);
trans_ctx_->ctx_->job_stat_->coordinator_.trans_status_);
}
return ret;
}
@ -98,7 +98,7 @@ int ObTableLoadCoordinatorTrans::set_trans_status_abort()
LOG_WARN("fail to set trans status abort", KR(ret));
} else {
table_load_trans_status_to_string(ObTableLoadTransStatusType::ABORT,
trans_ctx_->ctx_->job_stat_->coordinator.trans_status_);
trans_ctx_->ctx_->job_stat_->coordinator_.trans_status_);
}
return ret;
}

View File

@ -51,7 +51,7 @@ public:
int process() override
{
int ret = OB_SUCCESS;
storage::ObDirectLoadMemSample sample(mem_ctx_);
storage::ObDirectLoadMemSample sample(ctx_, mem_ctx_);
if (OB_FAIL(sample.do_sample())) {
LOG_WARN("fail to do sample", KR(ret));
}
@ -378,7 +378,7 @@ int ObTableLoadMemCompactor::create_mem_loader(ObDirectLoadMemLoader *&mem_loade
int ret = OB_SUCCESS;
mem_loader = nullptr;
if (OB_ISNULL(mem_loader =
OB_NEWx(ObDirectLoadMemLoader, (&allocator_), &mem_ctx_))) {
OB_NEWx(ObDirectLoadMemLoader, (&allocator_), store_ctx_->ctx_, &mem_ctx_))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("fail to new ObDirectLoadMemLoader", KR(ret));
}

View File

@ -286,7 +286,7 @@ int ObTableLoadMerger::build_merge_ctx()
merge_param.px_mode_ = param_.px_mode_;
merge_param.insert_table_ctx_ = store_ctx_->insert_table_ctx_;
merge_param.dml_row_handler_ = store_ctx_->error_row_handler_;
if (OB_FAIL(merge_ctx_.init(merge_param, store_ctx_->ls_partition_ids_,
if (OB_FAIL(merge_ctx_.init(store_ctx_->ctx_, merge_param, store_ctx_->ls_partition_ids_,
store_ctx_->target_ls_partition_ids_))) {
LOG_WARN("fail to init merge ctx", KR(ret));
} else if (store_ctx_->is_multiple_mode_) {

View File

@ -118,7 +118,7 @@ public:
if (OB_ISNULL(sorter)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected worker", KR(ret), KPC(worker));
} else if (FALSE_IT(sorter->set_work_param(index_dir_id_, data_dir_id_, heap_table_array_,
} else if (FALSE_IT(sorter->set_work_param(ctx_, index_dir_id_, data_dir_id_, heap_table_array_,
heap_table_allocator_))) {
} else if (OB_FAIL(sorter->work())) {
LOG_WARN("fail to compact", KR(ret));
@ -157,6 +157,7 @@ private:
} else {
heap_table_compactor.reuse();
std::swap(curr_round, next_round);
ATOMIC_AAF(&ctx_->job_stat_->store_.compact_stage_consume_tmp_files_, mem_ctx_->table_data_desc_.merge_count_per_round_ - 1);
}
}
if (OB_SUCC(ret)) {
@ -173,6 +174,8 @@ private:
LOG_WARN("fail to do compact", KR(ret));
} else if (OB_FAIL(mem_ctx_->add_tables_from_table_compactor(heap_table_compactor))) {
LOG_WARN("fail to add table from table compactor", KR(ret));
} else {
ATOMIC_AAF(&ctx_->job_stat_->store_.compact_stage_consume_tmp_files_, curr_round->count());
}
}
}

View File

@ -324,6 +324,8 @@ public:
}
} else if (OB_FAIL(sstable_builder_.append_row(*datum_row))) {
LOG_WARN("fail to append row", KR(ret));
} else {
ATOMIC_AAF(&ctx_->job_stat_->store_.compact_stage_merge_write_rows_, 1);
}
}
if (OB_SUCC(ret)) {
@ -704,6 +706,8 @@ int ObTableLoadParallelMergeCtx::start(ObTableLoadParallelMergeCb *cb)
if (OB_FAIL(construct_split_range_task(tablet_ctx))) {
LOG_WARN("fail to construct split range task", KR(ret));
}
} else {
ATOMIC_AAF(&store_ctx_->ctx_->job_stat_->store_.compact_stage_consume_tmp_files_, tablet_ctx->sstables_.size());
}
}
if (OB_SUCC(ret)) {
@ -898,9 +902,12 @@ int ObTableLoadParallelMergeCtx::handle_tablet_compact_sstable_finish(
LOG_WARN("invalid args", KR(ret), KP(tablet_ctx));
} else if (tablet_ctx->sstables_.size() > store_ctx_->table_data_desc_.merge_count_per_round_) {
// still need merge
ATOMIC_AAF(&store_ctx_->ctx_->job_stat_->store_.compact_stage_consume_tmp_files_, tablet_ctx->merge_sstable_count_ - 1);
if (OB_FAIL(construct_split_range_task(tablet_ctx))) {
LOG_WARN("fail to construct split range task", KR(ret));
}
} else {
ATOMIC_AAF(&store_ctx_->ctx_->job_stat_->store_.compact_stage_consume_tmp_files_, tablet_ctx->merge_sstable_count_);
}
return ret;
}

View File

@ -328,7 +328,7 @@ int ObTableLoadStoreCtx::advance_status(ObTableLoadStatusType status)
// advance status
else {
status_ = status;
table_load_status_to_string(status_, ctx_->job_stat_->store.status_);
table_load_status_to_string(status_, ctx_->job_stat_->store_.status_);
LOG_INFO("LOAD DATA STORE advance status", K(status));
}
}
@ -348,7 +348,7 @@ int ObTableLoadStoreCtx::set_status_error(int error_code)
} else {
status_ = ObTableLoadStatusType::ERROR;
error_code_ = error_code;
table_load_status_to_string(status_, ctx_->job_stat_->store.status_);
table_load_status_to_string(status_, ctx_->job_stat_->store_.status_);
LOG_INFO("LOAD DATA STORE status error", KR(error_code));
}
}
@ -363,7 +363,7 @@ int ObTableLoadStoreCtx::set_status_abort()
LOG_INFO("LOAD DATA STORE already abort");
} else {
status_ = ObTableLoadStatusType::ABORT;
table_load_status_to_string(status_, ctx_->job_stat_->store.status_);
table_load_status_to_string(status_, ctx_->job_stat_->store_.status_);
LOG_INFO("LOAD DATA STORE status abort");
}
return ret;

View File

@ -85,7 +85,7 @@ int ObTableLoadStoreTrans::advance_trans_status(ObTableLoadTransStatusType trans
LOG_WARN("fail to advance trans status", KR(ret), K(trans_status));
} else {
table_load_trans_status_to_string(trans_status,
trans_ctx_->ctx_->job_stat_->store.trans_status_);
trans_ctx_->ctx_->job_stat_->store_.trans_status_);
}
return ret;
}
@ -97,7 +97,7 @@ int ObTableLoadStoreTrans::set_trans_status_error(int error_code)
LOG_WARN("fail to set trans status error", KR(ret));
} else {
table_load_trans_status_to_string(ObTableLoadTransStatusType::ERROR,
trans_ctx_->ctx_->job_stat_->store.trans_status_);
trans_ctx_->ctx_->job_stat_->store_.trans_status_);
}
return ret;
}
@ -109,7 +109,7 @@ int ObTableLoadStoreTrans::set_trans_status_abort()
LOG_WARN("fail to set trans status abort", KR(ret));
} else {
table_load_trans_status_to_string(ObTableLoadTransStatusType::ABORT,
trans_ctx_->ctx_->job_stat_->store.trans_status_);
trans_ctx_->ctx_->job_stat_->store_.trans_status_);
}
return ret;
}

View File

@ -107,14 +107,6 @@ int ObTableLoadTableCtx::register_job_stat()
job_stat->start_time_ = ObTimeUtil::current_time();
job_stat->max_allowed_error_rows_ = param_.max_error_row_count_;
job_stat->detected_error_rows_ = 0;
job_stat->coordinator.received_rows_ = 0;
job_stat->coordinator.last_commit_segment_id_ = 0;
job_stat->coordinator.status_ = "none";
job_stat->coordinator.trans_status_ = "none";
job_stat->store.processed_rows_ = 0;
job_stat->store.last_commit_segment_id_ = 0;
job_stat->store.status_ = "none";
job_stat->store.trans_status_ = "none";
job_stat->allocator_.set_tenant_id(param_.tenant_id_);
if (OB_FAIL(ObTableLoadUtils::deep_copy(schema_.table_name_, job_stat->table_name_,
job_stat->allocator_))) {

View File

@ -201,7 +201,7 @@ int ObTableLoadTransBucketWriter::write(int32_t session_id, ObTableLoadObjRowArr
if (OB_SUCC(ret)) {
int64_t row_cnt = obj_rows.count();
ATOMIC_AAF(&trans_ctx_->ctx_->job_stat_->coordinator.received_rows_, row_cnt);
ATOMIC_AAF(&trans_ctx_->ctx_->job_stat_->coordinator_.received_rows_, row_cnt);
ATOMIC_AAF(&trans_ctx_->ctx_->coordinator_ctx_->result_info_.records_, row_cnt);
}
}

View File

@ -308,7 +308,7 @@ int ObTableLoadTransStoreWriter::write(int32_t session_id,
}
}
if (OB_SUCC(ret)) {
ATOMIC_AAF(&trans_ctx_->ctx_->job_stat_->store.processed_rows_, row_array.count());
ATOMIC_AAF(&trans_ctx_->ctx_->job_stat_->store_.processed_rows_, row_array.count());
}
session_ctx.cast_allocator_.reuse();
}
@ -345,7 +345,7 @@ int ObTableLoadTransStoreWriter::write(int32_t session_id,
}
}
if (OB_SUCC(ret)) {
ATOMIC_AAF(&trans_ctx_->ctx_->job_stat_->store.processed_rows_, row_array.count());
ATOMIC_AAF(&trans_ctx_->ctx_->job_stat_->store_.processed_rows_, row_array.count());
}
}
return ret;

View File

@ -183,44 +183,58 @@ int ObAllVirtualLoadDataStat::inner_get_next_row(ObNewRow *&row)
break;
}
case COORDINATOR_RECEIVED_ROWS: {
cells[i].set_int(job_status->coordinator.received_rows_);
cells[i].set_int(job_status->coordinator_.received_rows_);
break;
}
case COORDINATOR_LAST_COMMIT_SEGMENT_ID: {
cells[i].set_int(job_status->coordinator.last_commit_segment_id_);
cells[i].set_int(job_status->coordinator_.last_commit_segment_id_);
break;
}
case COORDINATOR_STATUS: {
cells[i].set_varchar(job_status->coordinator.status_);
cells[i].set_varchar(job_status->coordinator_.status_);
cells[i].set_collation_type(ObCharset::get_default_collation(ObCharset::get_default_charset()));
break;
}
case COORDINATOR_TRANS_STATUS: {
cells[i].set_varchar(job_status->coordinator.trans_status_);
cells[i].set_varchar(job_status->coordinator_.trans_status_);
cells[i].set_collation_type(ObCharset::get_default_collation(ObCharset::get_default_charset()));
break;
}
case STORE_PROCESSED_ROWS: {
cells[i].set_int(job_status->store.processed_rows_);
cells[i].set_int(job_status->store_.processed_rows_);
break;
}
case STORE_LAST_COMMIT_SEGMENT_ID: {
cells[i].set_int(job_status->store.last_commit_segment_id_);
cells[i].set_int(job_status->store_.last_commit_segment_id_);
break;
}
case STORE_STATUS: {
cells[i].set_varchar(job_status->store.status_);
cells[i].set_varchar(job_status->store_.status_);
cells[i].set_collation_type(ObCharset::get_default_collation(ObCharset::get_default_charset()));
break;
}
case STORE_TRANS_STATUS: {
cells[i].set_varchar(job_status->store.trans_status_);
cells[i].set_varchar(job_status->store_.trans_status_);
cells[i].set_collation_type(ObCharset::get_default_collation(ObCharset::get_default_charset()));
break;
}
case MESSAGE: {
int64_t pos = 0;
if (OB_FAIL(databuff_printf(job_status->message_, common::MAX_LOAD_DATA_MESSAGE_LENGTH, pos,
"COMPACT_STAGE_LOAD_ROWS: %ld, COMPACT_STAGE_DUMP_ROWS: %ld, "
"COMPACT_STAGE_PRODUCT_TMP_FILES: %ld, COMPACT_STAGE_CONSUME_TMP_FILES: %ld, "
"COMPACT_STAGE_MERGE_WRITE_ROWS: %ld, MERGE_STAGE_WRITE_ROWS: %ld",
job_status->store_.compact_stage_load_rows_,
job_status->store_.compact_stage_dump_rows_,
job_status->store_.compact_stage_product_tmp_files_,
job_status->store_.compact_stage_consume_tmp_files_,
job_status->store_.compact_stage_merge_write_rows_,
job_status->store_.merge_stage_write_rows_))) {
SERVER_LOG(WARN, "fail to fill message_", K(ret));
} else {
cells[i].set_varchar(job_status->message_);
cells[i].set_collation_type(ObCharset::get_default_collation(ObCharset::get_default_charset()));
}
break;
}
default: {

View File

@ -1359,9 +1359,9 @@ int ObTableRedefinitionTask::collect_longops_stat(ObLongopsValue &value)
job_stat.parallel_,
job_stat.max_allowed_error_rows_,
job_stat.detected_error_rows_,
job_stat.coordinator.received_rows_,
job_stat.coordinator.status_.length(),
job_stat.coordinator.status_.ptr());
job_stat.coordinator_.received_rows_,
job_stat.coordinator_.status_.length(),
job_stat.coordinator_.status_.ptr());
}
}
@ -1469,10 +1469,10 @@ int ObTableRedefinitionTask::get_direct_load_job_stat(common::ObArenaAllocator &
EXTRACT_INT_FIELD_MYSQL(*select_result, "PARALLEL", job_stat.parallel_, int64_t);
EXTRACT_INT_FIELD_MYSQL(*select_result, "MAX_ALLOWED_ERROR_ROWS", job_stat.max_allowed_error_rows_, int64_t);
EXTRACT_INT_FIELD_MYSQL(*select_result, "DETECTED_ERROR_ROWS", job_stat.detected_error_rows_, int64_t);
EXTRACT_INT_FIELD_MYSQL(*select_result, "COORDINATOR_RECEIVED_ROWS", job_stat.coordinator.received_rows_, int64_t);
EXTRACT_INT_FIELD_MYSQL(*select_result, "COORDINATOR_RECEIVED_ROWS", job_stat.coordinator_.received_rows_, int64_t);
EXTRACT_VARCHAR_FIELD_MYSQL(*select_result, "COORDINATOR_STATUS", load_status);
if (OB_SUCC(ret)
&& OB_FAIL(ob_write_string(allocator, load_status, job_stat.coordinator.status_))) {
&& OB_FAIL(ob_write_string(allocator, load_status, job_stat.coordinator_.status_))) {
LOG_WARN("failed to write string", KR(ret));
}
}

View File

@ -331,6 +331,8 @@ struct ObLoadDataStat
total_wait_secs_(0),
max_allowed_error_rows_(0),
detected_error_rows_(0),
coordinator_(),
store_(),
message_() {}
int64_t aquire() {
return ATOMIC_AAF(&ref_cnt_, 1);
@ -365,19 +367,47 @@ struct ObLoadDataStat
int64_t total_wait_secs_;
int64_t max_allowed_error_rows_;
int64_t detected_error_rows_;
struct {
volatile int64_t received_rows_ = 0; // received from client
int64_t last_commit_segment_id_ = 0;
common::ObString status_ = "none"; // none / inited / loading / frozen / merging / commit / error / abort
common::ObString trans_status_ = "none"; // none / inited / running / frozen / commit / error / abort
} coordinator;
struct {
volatile int64_t processed_rows_ = 0;
int64_t last_commit_segment_id_ = 0;
common::ObString status_ = "none";
common::ObString trans_status_ = "none";
} store;
struct coordinator {
coordinator()
: received_rows_(0),
last_commit_segment_id_(0),
status_("none"),
trans_status_("none")
{}
volatile int64_t received_rows_; // received from client
int64_t last_commit_segment_id_;
common::ObString status_; // none / inited / loading / frozen / merging / commit / error / abort
common::ObString trans_status_; // none / inited / running / frozen / commit / error / abort
TO_STRING_KV(K(received_rows_), K(last_commit_segment_id_), K(status_), K(trans_status_));
} coordinator_;
struct store {
store()
: processed_rows_(0),
last_commit_segment_id_(0),
status_("none"),
trans_status_("none"),
compact_stage_load_rows_(0),
compact_stage_dump_rows_(0),
compact_stage_product_tmp_files_(0),
compact_stage_consume_tmp_files_(0),
compact_stage_merge_write_rows_(0),
merge_stage_write_rows_(0)
{}
volatile int64_t processed_rows_;
int64_t last_commit_segment_id_;
common::ObString status_;
common::ObString trans_status_;
int64_t compact_stage_load_rows_ CACHE_ALIGNED;
int64_t compact_stage_dump_rows_ CACHE_ALIGNED;
int64_t compact_stage_product_tmp_files_ CACHE_ALIGNED;
int64_t compact_stage_consume_tmp_files_ CACHE_ALIGNED;
int64_t compact_stage_merge_write_rows_ CACHE_ALIGNED;
int64_t merge_stage_write_rows_ CACHE_ALIGNED;
TO_STRING_KV(K(processed_rows_), K(last_commit_segment_id_), K(status_), K(trans_status_),
K(compact_stage_load_rows_), K(compact_stage_dump_rows_),
K(compact_stage_product_tmp_files_), K(compact_stage_consume_tmp_files_),
K(compact_stage_merge_write_rows_), K(merge_stage_write_rows_));
} store_;
char message_[common::MAX_LOAD_DATA_MESSAGE_LENGTH];
TO_STRING_KV(K(tenant_id_), K(job_id_), K(job_type_),
@ -388,10 +418,7 @@ struct ObLoadDataStat
K(parsed_rows_), K(total_shuffle_task_), K(total_insert_task_),
K(shuffle_rt_sum_), K(insert_rt_sum_), K(total_wait_secs_),
K(max_allowed_error_rows_), K(detected_error_rows_),
K(coordinator.received_rows_), K(coordinator.last_commit_segment_id_),
K(coordinator.status_), K(coordinator.trans_status_),
K(store.processed_rows_), K(store.last_commit_segment_id_),
K(store.status_), K(store.trans_status_), K(message_));
K(coordinator_), K(store_), K(message_));
};
class ObGetAllJobStatusOp

View File

@ -26,6 +26,7 @@ using namespace common;
using namespace blocksstable;
using namespace table;
using namespace sql;
using namespace observer;
/**
* Context
@ -76,10 +77,12 @@ int ObDirectLoadMemDump::Context::add_table(const ObTabletID &tablet_id, int64_t
* ObDirectLoadMemDump
*/
ObDirectLoadMemDump::ObDirectLoadMemDump(ObDirectLoadMemContext *mem_ctx,
ObDirectLoadMemDump::ObDirectLoadMemDump(ObTableLoadTableCtx *ctx,
ObDirectLoadMemContext *mem_ctx,
const RangeType &range,
ObTableLoadHandle<Context> context_ptr, int64_t range_idx)
: allocator_("TLD_MemDump"),
ctx_(ctx),
mem_ctx_(mem_ctx),
range_(range),
context_ptr_(context_ptr),
@ -285,6 +288,8 @@ int ObDirectLoadMemDump::dump_tables()
} else {
LOG_WARN("fail to append row", KR(ret), K(datum_row));
}
} else {
ATOMIC_AAF(&ctx_->job_stat_->store_.compact_stage_dump_rows_, 1);
}
}
}
@ -382,6 +387,9 @@ int ObDirectLoadMemDump::compact_tables()
LOG_WARN("fail to compact tablet tables", KR(ret));
}
}
if (OB_SUCC(ret)) {
ATOMIC_AAF(&ctx_->job_stat_->store_.compact_stage_product_tmp_files_, keys.count());
}
return ret;
}

View File

@ -17,6 +17,7 @@
#include "storage/direct_load/ob_direct_load_mem_define.h"
#include "storage/direct_load/ob_direct_load_multi_map.h"
#include "storage/direct_load/ob_direct_load_sstable_builder.h"
#include "observer/table_load/ob_table_load_table_ctx.h"
namespace oceanbase
{
@ -61,7 +62,8 @@ public:
};
public:
ObDirectLoadMemDump(ObDirectLoadMemContext *mem_ctx,
ObDirectLoadMemDump(observer::ObTableLoadTableCtx *ctx,
ObDirectLoadMemContext *mem_ctx,
const RangeType &range,
table::ObTableLoadHandle<Context> context_ptr, int64_t range_idx);
~ObDirectLoadMemDump();
@ -91,6 +93,7 @@ private:
private:
// data members
ObArenaAllocator allocator_;
observer::ObTableLoadTableCtx *ctx_;
ObDirectLoadMemContext *mem_ctx_;
RangeType range_;
table::ObTableLoadHandle<Context> context_ptr_;

View File

@ -27,8 +27,8 @@ using namespace blocksstable;
* ObDirectLoadMemLoader
*/
ObDirectLoadMemLoader::ObDirectLoadMemLoader(ObDirectLoadMemContext *mem_ctx)
: mem_ctx_(mem_ctx)
ObDirectLoadMemLoader::ObDirectLoadMemLoader(observer::ObTableLoadTableCtx *ctx, ObDirectLoadMemContext *mem_ctx)
: ctx_(ctx), mem_ctx_(mem_ctx)
{
}
@ -121,6 +121,7 @@ int ObDirectLoadMemLoader::work()
LOG_WARN("fail to add item", KR(ret));
} else {
external_row = nullptr;
ATOMIC_AAF(&ctx_->job_stat_->store_.compact_stage_load_rows_, 1);
}
}
}

View File

@ -15,6 +15,8 @@
#include "storage/direct_load/ob_direct_load_i_table.h"
#include "storage/direct_load/ob_direct_load_mem_context.h"
#include "storage/direct_load/ob_direct_load_mem_worker.h"
#include "observer/table_load/ob_table_load_service.h"
#include "observer/table_load/ob_table_load_table_ctx.h"
namespace oceanbase
{
@ -27,7 +29,7 @@ class ObDirectLoadMemLoader : public ObDirectLoadMemWorker
typedef ObDirectLoadExternalMultiPartitionRowChunk ChunkType;
typedef ObDirectLoadExternalMultiPartitionRowCompare CompareType;
public:
ObDirectLoadMemLoader(ObDirectLoadMemContext *mem_ctx);
ObDirectLoadMemLoader(observer::ObTableLoadTableCtx *ctx, ObDirectLoadMemContext *mem_ctx);
virtual ~ObDirectLoadMemLoader();
int add_table(ObIDirectLoadPartitionTable *table) override;
int work() override;
@ -35,6 +37,7 @@ public:
private:
int close_chunk(ChunkType *&chunk);
private:
observer::ObTableLoadTableCtx *ctx_;
ObDirectLoadMemContext *mem_ctx_;
ObDirectLoadExternalFragmentArray fragments_;
};

View File

@ -26,8 +26,8 @@ using namespace blocksstable;
using namespace observer;
using namespace table;
ObDirectLoadMemSample::ObDirectLoadMemSample(ObDirectLoadMemContext *mem_ctx)
: mem_ctx_(mem_ctx), range_count_(mem_ctx_->mem_dump_task_count_) {}
ObDirectLoadMemSample::ObDirectLoadMemSample(observer::ObTableLoadTableCtx *ctx, ObDirectLoadMemContext *mem_ctx)
: ctx_(ctx), mem_ctx_(mem_ctx), range_count_(mem_ctx_->mem_dump_task_count_) {}
int ObDirectLoadMemSample::gen_ranges(ObIArray<ChunkType *> &chunks, ObIArray<RangeType> &ranges)
@ -125,7 +125,7 @@ int ObDirectLoadMemSample::add_dump(int64_t idx,
{
int ret = OB_SUCCESS;
storage::ObDirectLoadMemDump *mem_dump = OB_NEW(
ObDirectLoadMemDump, ObMemAttr(MTL_ID(), "TLD_mem_dump"), mem_ctx_, range, context_ptr, idx);
ObDirectLoadMemDump, ObMemAttr(MTL_ID(), "TLD_mem_dump"), ctx_, mem_ctx_, range, context_ptr, idx);
if (mem_dump == nullptr) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("fail to allocate mem dump", KR(ret));

View File

@ -17,6 +17,7 @@
#include <memory>
#include "storage/direct_load/ob_direct_load_mem_dump.h"
#include "storage/direct_load/ob_direct_load_mem_context.h"
#include "observer/table_load/ob_table_load_table_ctx.h"
namespace oceanbase
{
@ -31,7 +32,7 @@ class ObDirectLoadMemSample
typedef ObDirectLoadExternalMultiPartitionRowRange RangeType;
typedef ObDirectLoadExternalMultiPartitionRowCompare CompareType;
public:
ObDirectLoadMemSample(ObDirectLoadMemContext *mem_ctx);
ObDirectLoadMemSample(observer::ObTableLoadTableCtx *ctx, ObDirectLoadMemContext *mem_ctx);
virtual ~ObDirectLoadMemSample() {}
int do_sample();
@ -47,6 +48,7 @@ private:
private:
// data members
observer::ObTableLoadTableCtx *ctx_;
ObDirectLoadMemContext *mem_ctx_;
int64_t range_count_;
};

View File

@ -81,7 +81,7 @@ bool ObDirectLoadMergeParam::is_valid() const
*/
ObDirectLoadMergeCtx::ObDirectLoadMergeCtx()
: allocator_("TLD_MergeCtx"), is_inited_(false)
: allocator_("TLD_MergeCtx"), ctx_(nullptr), is_inited_(false)
{
allocator_.set_tenant_id(MTL_ID());
tablet_merge_ctx_array_.set_tenant_id(MTL_ID());
@ -97,7 +97,8 @@ ObDirectLoadMergeCtx::~ObDirectLoadMergeCtx()
tablet_merge_ctx_array_.reset();
}
int ObDirectLoadMergeCtx::init(const ObDirectLoadMergeParam &param,
int ObDirectLoadMergeCtx::init(ObTableLoadTableCtx *ctx,
const ObDirectLoadMergeParam &param,
const ObIArray<ObTableLoadLSIdAndPartitionId> &ls_partition_ids,
const ObIArray<ObTableLoadLSIdAndPartitionId> &target_ls_partition_ids)
{
@ -105,13 +106,15 @@ int ObDirectLoadMergeCtx::init(const ObDirectLoadMergeParam &param,
if (IS_INIT) {
ret = OB_INIT_TWICE;
LOG_WARN("ObDirectLoadMerger init twice", KR(ret), KP(this));
} else if (OB_UNLIKELY(!param.is_valid()
} else if (OB_UNLIKELY(nullptr == ctx
|| !param.is_valid()
|| ls_partition_ids.empty()
|| target_ls_partition_ids.empty()
|| (ls_partition_ids.count() != target_ls_partition_ids.count()))) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid args", KR(ret), K(param), K(ls_partition_ids), K(target_ls_partition_ids));
} else {
ctx_ = ctx;
param_ = param;
if (OB_FAIL(create_all_tablet_ctxs(ls_partition_ids, target_ls_partition_ids))) {
LOG_WARN("fail to create all tablet ctxs", KR(ret));
@ -138,7 +141,7 @@ int ObDirectLoadMergeCtx::create_all_tablet_ctxs(
if (OB_ISNULL(partition_ctx = OB_NEWx(ObDirectLoadTabletMergeCtx, (&allocator_)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("fail to new ObDirectLoadTabletMergeCtx", KR(ret));
} else if (OB_FAIL(partition_ctx->init(param_, ls_partition_id, target_ls_partition_id))) {
} else if (OB_FAIL(partition_ctx->init(ctx_, param_, ls_partition_id, target_ls_partition_id))) {
LOG_WARN("fail to init tablet ctx", KR(ret), K(param_), K(ls_partition_id), K(target_ls_partition_id));
} else if (OB_FAIL(tablet_merge_ctx_array_.push_back(partition_ctx))) {
LOG_WARN("fail to push back", KR(ret));
@ -186,7 +189,8 @@ ObDirectLoadTabletMergeCtx::~ObDirectLoadTabletMergeCtx()
rescan_task_array_.reset();
}
int ObDirectLoadTabletMergeCtx::init(const ObDirectLoadMergeParam &param,
int ObDirectLoadTabletMergeCtx::init(ObTableLoadTableCtx *ctx,
const ObDirectLoadMergeParam &param,
const ObTableLoadLSIdAndPartitionId &ls_partition_id,
const ObTableLoadLSIdAndPartitionId &target_ls_partition_id)
@ -195,7 +199,8 @@ int ObDirectLoadTabletMergeCtx::init(const ObDirectLoadMergeParam &param,
if (IS_INIT) {
ret = OB_INIT_TWICE;
LOG_WARN("ObDirectLoadTabletMergeCtx init twice", KR(ret), KP(this));
} else if (OB_UNLIKELY(!param.is_valid()
} else if (OB_UNLIKELY(nullptr == ctx
|| !param.is_valid()
|| !ls_partition_id.is_valid()
|| !target_ls_partition_id.is_valid())) {
ret = OB_INVALID_ARGUMENT;
@ -208,6 +213,7 @@ int ObDirectLoadTabletMergeCtx::init(const ObDirectLoadMergeParam &param,
if (OB_FAIL(origin_table_.init(origin_table_param))) {
LOG_WARN("fail to init origin sstable", KR(ret));
} else {
ctx_ = ctx;
param_ = param;
target_partition_id_ = target_ls_partition_id.part_tablet_id_.partition_id_;
tablet_id_ = ls_partition_id.part_tablet_id_.tablet_id_;
@ -449,7 +455,7 @@ int ObDirectLoadTabletMergeCtx::build_empty_data_merge_task(const ObIArray<ObCol
if (OB_ISNULL(merge_task = OB_NEWx(ObDirectLoadPartitionRangeMergeTask, (&allocator_)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("fail to new ObDirectLoadPartitionRangeMergeTask", KR(ret));
} else if (OB_FAIL(merge_task->init(param_, this, &origin_table_, sstable_array_, range, i))) {
} else if (OB_FAIL(merge_task->init(ctx_, param_, this, &origin_table_, sstable_array_, range, i))) {
LOG_WARN("fail to init merge task", KR(ret));
} else if (OB_FAIL(task_array_.push_back(merge_task))) {
LOG_WARN("fail to push back merge task", KR(ret));
@ -494,7 +500,7 @@ int ObDirectLoadTabletMergeCtx::build_pk_table_merge_task(
if (OB_ISNULL(merge_task = OB_NEWx(ObDirectLoadPartitionRangeMergeTask, (&allocator_)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("fail to new ObDirectLoadPartitionRangeMergeTask", KR(ret));
} else if (OB_FAIL(merge_task->init(param_, this, &origin_table_, sstable_array_, range, i))) {
} else if (OB_FAIL(merge_task->init(ctx_, param_, this, &origin_table_, sstable_array_, range, i))) {
LOG_WARN("fail to init merge task", KR(ret));
} else if (OB_FAIL(task_array_.push_back(merge_task))) {
LOG_WARN("fail to push back merge task", KR(ret));
@ -540,7 +546,7 @@ int ObDirectLoadTabletMergeCtx::build_pk_table_multiple_merge_task(
OB_NEWx(ObDirectLoadPartitionRangeMultipleMergeTask, (&allocator_)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("fail to new ObDirectLoadPartitionRangeMultipleMergeTask", KR(ret));
} else if (OB_FAIL(merge_task->init(param_, this, &origin_table_, multiple_sstable_array_,
} else if (OB_FAIL(merge_task->init(ctx_, param_, this, &origin_table_, multiple_sstable_array_,
range, i))) {
LOG_WARN("fail to init merge task", KR(ret));
} else if (OB_FAIL(task_array_.push_back(merge_task))) {
@ -587,7 +593,7 @@ int ObDirectLoadTabletMergeCtx::build_merge_task_for_multiple_pk_table(
OB_NEWx(ObDirectLoadPartitionRangeMultipleMergeTask, (&allocator_)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("fail to new ObDirectLoadPartitionRangeMultipleMergeTask", KR(ret));
} else if (OB_FAIL(merge_task->init(param_, this, &origin_table_, multiple_sstable_array_,
} else if (OB_FAIL(merge_task->init(ctx_, param_, this, &origin_table_, multiple_sstable_array_,
range, i))) {
LOG_WARN("fail to init merge task", KR(ret));
} else if (OB_FAIL(task_array_.push_back(merge_task))) {
@ -631,7 +637,7 @@ int ObDirectLoadTabletMergeCtx::build_heap_table_merge_task(
if (OB_ISNULL(merge_task = OB_NEWx(ObDirectLoadPartitionRangeMergeTask, (&allocator_)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("fail to new ObDirectLoadPartitionRangeMergeTask", KR(ret));
} else if (OB_FAIL(merge_task->init(param_, this, &origin_table_, sstable_array_, range,
} else if (OB_FAIL(merge_task->init(ctx_, param_, this, &origin_table_, sstable_array_, range,
parallel_idx++))) {
LOG_WARN("fail to init merge task", KR(ret));
} else if (OB_FAIL(task_array_.push_back(merge_task))) {
@ -659,7 +665,7 @@ int ObDirectLoadTabletMergeCtx::build_heap_table_merge_task(
OB_NEWx(ObDirectLoadPartitionHeapTableMergeTask, (&allocator_)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("fail to new ObDirectLoadPartitionHeapTableMergeTask", KR(ret));
} else if (OB_FAIL(merge_task->init(param_, this, external_table, pk_interval,
} else if (OB_FAIL(merge_task->init(ctx_, param_, this, external_table, pk_interval,
parallel_idx++))) {
LOG_WARN("fail to init merge task", KR(ret));
} else if (OB_FAIL(task_array_.push_back(merge_task))) {
@ -705,7 +711,7 @@ int ObDirectLoadTabletMergeCtx::build_heap_table_multiple_merge_task(
if (OB_ISNULL(merge_task = OB_NEWx(ObDirectLoadPartitionRangeMergeTask, (&allocator_)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("fail to new ObDirectLoadPartitionRangeMergeTask", KR(ret));
} else if (OB_FAIL(merge_task->init(param_, this, &origin_table_, sstable_array_, range,
} else if (OB_FAIL(merge_task->init(ctx_, param_, this, &origin_table_, sstable_array_, range,
parallel_idx++))) {
LOG_WARN("fail to init merge task", KR(ret));
} else if (OB_FAIL(task_array_.push_back(merge_task))) {
@ -735,7 +741,7 @@ int ObDirectLoadTabletMergeCtx::build_heap_table_multiple_merge_task(
(&allocator_)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("fail to new ObDirectLoadPartitionHeapTableMultipleMergeTask", KR(ret));
} else if (OB_FAIL(merge_task->init(param_, this, heap_table, pk_interval, parallel_idx++))) {
} else if (OB_FAIL(merge_task->init(ctx_, param_, this, heap_table, pk_interval, parallel_idx++))) {
LOG_WARN("fail to init merge task", KR(ret));
} else if (OB_FAIL(task_array_.push_back(merge_task))) {
LOG_WARN("fail to push back merge task", KR(ret));
@ -778,7 +784,7 @@ int ObDirectLoadTabletMergeCtx::build_aggregate_merge_task_for_multiple_heap_tab
(&allocator_)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("fail to new ObDirectLoadPartitionHeapTableMultipleAggregateMergeTask", KR(ret));
} else if (OB_FAIL(merge_task->init(param_, this, &origin_table_, multiple_heap_table_array_,
} else if (OB_FAIL(merge_task->init(ctx_, param_, this, &origin_table_, multiple_heap_table_array_,
pk_interval))) {
LOG_WARN("fail to init merge task", KR(ret));
} else if (OB_FAIL(task_array_.push_back(merge_task))) {

View File

@ -24,6 +24,7 @@
#include "storage/direct_load/ob_direct_load_origin_table.h"
#include "storage/direct_load/ob_direct_load_table_data_desc.h"
#include "storage/direct_load/ob_direct_load_fast_heap_table.h"
#include "observer/table_load/ob_table_load_table_ctx.h"
namespace oceanbase
{
@ -81,7 +82,8 @@ class ObDirectLoadMergeCtx
public:
ObDirectLoadMergeCtx();
~ObDirectLoadMergeCtx();
int init(const ObDirectLoadMergeParam &param,
int init(observer::ObTableLoadTableCtx *ctx,
const ObDirectLoadMergeParam &param,
const common::ObIArray<table::ObTableLoadLSIdAndPartitionId> &ls_partition_ids,
const common::ObIArray<table::ObTableLoadLSIdAndPartitionId> &target_ls_partition_ids);
const common::ObIArray<ObDirectLoadTabletMergeCtx *> &get_tablet_merge_ctxs() const
@ -93,6 +95,7 @@ private:
const common::ObIArray<table::ObTableLoadLSIdAndPartitionId> &target_ls_partition_ids);
private:
common::ObArenaAllocator allocator_;
observer::ObTableLoadTableCtx *ctx_;
ObDirectLoadMergeParam param_;
common::ObArray<ObDirectLoadTabletMergeCtx *> tablet_merge_ctx_array_;
bool is_inited_;
@ -103,7 +106,9 @@ class ObDirectLoadTabletMergeCtx
public:
ObDirectLoadTabletMergeCtx();
~ObDirectLoadTabletMergeCtx();
int init(const ObDirectLoadMergeParam &param, const table::ObTableLoadLSIdAndPartitionId &ls_partition_id,
int init(observer::ObTableLoadTableCtx *ctx,
const ObDirectLoadMergeParam &param,
const table::ObTableLoadLSIdAndPartitionId &ls_partition_id,
const table::ObTableLoadLSIdAndPartitionId &target_ls_partition_id);
int build_rescan_task(int64_t thread_count);
int build_merge_task(const common::ObIArray<ObIDirectLoadPartitionTable *> &table_array,
@ -159,6 +164,7 @@ private:
int get_autoincrement_value(uint64_t count, share::ObTabletCacheInterval &interval);
private:
common::ObArenaAllocator allocator_;
observer::ObTableLoadTableCtx *ctx_;
ObDirectLoadMergeParam param_;
uint64_t target_partition_id_;
common::ObTabletID tablet_id_;

View File

@ -27,7 +27,8 @@ using namespace blocksstable;
ObDirectLoadMultipleHeapTableSorter::ObDirectLoadMultipleHeapTableSorter(
ObDirectLoadMemContext *mem_ctx)
: mem_ctx_(mem_ctx),
: ctx_(nullptr),
mem_ctx_(mem_ctx),
allocator_("TLD_Sorter"),
extra_buf_(nullptr),
index_dir_id_(-1),
@ -149,6 +150,8 @@ int ObDirectLoadMultipleHeapTableSorter::get_tables(
LOG_WARN("unexpected table", KR(ret), KPC(table));
} else if (OB_FAIL(heap_table_array_->push_back(heap_table))) {
LOG_WARN("fail to push back heap table", KR(ret));
} else {
ATOMIC_AAF(&ctx_->job_stat_->store_.compact_stage_product_tmp_files_, 1);
}
}
return ret;
@ -218,6 +221,7 @@ int ObDirectLoadMultipleHeapTableSorter::work()
LOG_WARN("fail to add item", KR(ret));
} else {
row = nullptr;
ATOMIC_AAF(&ctx_->job_stat_->store_.compact_stage_load_rows_, 1);
}
}
}

View File

@ -16,6 +16,7 @@
#include "storage/direct_load/ob_direct_load_i_table.h"
#include "storage/direct_load/ob_direct_load_mem_context.h"
#include "storage/direct_load/ob_direct_load_mem_worker.h"
#include "observer/table_load/ob_table_load_table_ctx.h"
namespace oceanbase
{
@ -32,10 +33,12 @@ public:
int init();
int add_table(ObIDirectLoadPartitionTable *table) override;
void set_work_param(int64_t index_dir_id, int64_t data_dir_id,
void set_work_param(observer::ObTableLoadTableCtx *ctx,
int64_t index_dir_id, int64_t data_dir_id,
common::ObIArray<ObDirectLoadMultipleHeapTable *> &heap_table_array,
common::ObIAllocator &heap_table_allocator)
{
ctx_ = ctx;
index_dir_id_ = index_dir_id;
data_dir_id_ = data_dir_id;
heap_table_array_ = &heap_table_array;
@ -52,6 +55,7 @@ private:
private:
// data members
observer::ObTableLoadTableCtx *ctx_;
ObDirectLoadMemContext *mem_ctx_;
ObDirectLoadExternalFragmentArray fragments_;
ObArenaAllocator allocator_;

View File

@ -28,13 +28,15 @@ namespace storage
using namespace common;
using namespace blocksstable;
using namespace share;
using namespace observer;
/**
* ObDirectLoadPartitionMergeTask
*/
ObDirectLoadPartitionMergeTask::ObDirectLoadPartitionMergeTask()
: merge_param_(nullptr),
: ctx_(nullptr),
merge_param_(nullptr),
merge_ctx_(nullptr),
parallel_idx_(-1),
affected_rows_(0),
@ -57,6 +59,27 @@ ObDirectLoadPartitionMergeTask::~ObDirectLoadPartitionMergeTask()
}
}
class ObStoreRowIteratorWrapper : public ObIStoreRowIterator
{
public:
ObStoreRowIteratorWrapper(observer::ObTableLoadTableCtx *ctx, ObIStoreRowIterator *inner_iter) :
ctx_(ctx), inner_iter_(inner_iter) {
}
int get_next_row(const blocksstable::ObDatumRow *&row)
{
int ret = inner_iter_->get_next_row(row);
if (ret == OB_SUCCESS) {
ATOMIC_AAF(&ctx_->job_stat_->store_.merge_stage_write_rows_, 1);
}
return ret;
}
private:
observer::ObTableLoadTableCtx *ctx_;
ObIStoreRowIterator *inner_iter_;
};
int ObDirectLoadPartitionMergeTask::process()
{
int ret = OB_SUCCESS;
@ -87,10 +110,11 @@ int ObDirectLoadPartitionMergeTask::process()
K(block_start_seq));
} else {
LOG_INFO("add sstable slice begin", KP(tablet_ctx), K(slice_id));
ObStoreRowIteratorWrapper row_iter_wrapper(ctx_, row_iter);
if (OB_UNLIKELY(is_stop_)) {
ret = OB_CANCELED;
LOG_WARN("merge task canceled", KR(ret));
} else if (OB_FAIL(tablet_ctx->fill_sstable_slice(slice_id, *row_iter, affected_rows_))) {
} else if (OB_FAIL(tablet_ctx->fill_sstable_slice(slice_id, row_iter_wrapper, affected_rows_))) {
LOG_WARN("fail to fill sstable slice", KR(ret));
} else if (OB_FAIL(tablet_ctx->close_sstable_slice(slice_id))) {
LOG_WARN("fail to close writer", KR(ret));
@ -290,7 +314,8 @@ ObDirectLoadPartitionRangeMergeTask::~ObDirectLoadPartitionRangeMergeTask()
{
}
int ObDirectLoadPartitionRangeMergeTask::init(const ObDirectLoadMergeParam &merge_param,
int ObDirectLoadPartitionRangeMergeTask::init(ObTableLoadTableCtx *ctx,
const ObDirectLoadMergeParam &merge_param,
ObDirectLoadTabletMergeCtx *merge_ctx,
ObDirectLoadOriginTable *origin_table,
const ObIArray<ObDirectLoadSSTable *> &sstable_array,
@ -301,12 +326,13 @@ int ObDirectLoadPartitionRangeMergeTask::init(const ObDirectLoadMergeParam &merg
if (IS_INIT) {
ret = OB_INIT_TWICE;
LOG_WARN("ObDirectLoadPartitionRangeMergeTask init twice", KR(ret), KP(this));
} else if (OB_UNLIKELY(!merge_param.is_valid() || nullptr == merge_ctx ||
} else if (OB_UNLIKELY(nullptr == ctx || !merge_param.is_valid() || nullptr == merge_ctx ||
nullptr == origin_table || !range.is_valid() || parallel_idx < 0)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid args", KR(ret), K(merge_param), KP(merge_ctx), K(sstable_array), K(range),
K(parallel_idx));
} else {
ctx_ = ctx;
merge_param_ = &merge_param;
merge_ctx_ = merge_ctx;
parallel_idx_ = parallel_idx;
@ -464,6 +490,7 @@ ObDirectLoadPartitionRangeMultipleMergeTask::~ObDirectLoadPartitionRangeMultiple
}
int ObDirectLoadPartitionRangeMultipleMergeTask::init(
ObTableLoadTableCtx *ctx,
const ObDirectLoadMergeParam &merge_param,
ObDirectLoadTabletMergeCtx *merge_ctx,
ObDirectLoadOriginTable *origin_table,
@ -475,12 +502,13 @@ int ObDirectLoadPartitionRangeMultipleMergeTask::init(
if (IS_INIT) {
ret = OB_INIT_TWICE;
LOG_WARN("ObDirectLoadPartitionRangeMultipleMergeTask init twice", KR(ret), KP(this));
} else if (OB_UNLIKELY(!merge_param.is_valid() || nullptr == merge_ctx ||
} else if (OB_UNLIKELY(nullptr == ctx || !merge_param.is_valid() || nullptr == merge_ctx ||
nullptr == origin_table || !range.is_valid() || parallel_idx < 0)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid args", KR(ret), K(merge_param), KP(merge_ctx), K(sstable_array), K(range),
K(parallel_idx));
} else {
ctx_ = ctx;
merge_param_ = &merge_param;
merge_ctx_ = merge_ctx;
parallel_idx_ = parallel_idx;
@ -643,7 +671,8 @@ ObDirectLoadPartitionHeapTableMergeTask::~ObDirectLoadPartitionHeapTableMergeTas
{
}
int ObDirectLoadPartitionHeapTableMergeTask::init(const ObDirectLoadMergeParam &merge_param,
int ObDirectLoadPartitionHeapTableMergeTask::init(ObTableLoadTableCtx *ctx,
const ObDirectLoadMergeParam &merge_param,
ObDirectLoadTabletMergeCtx *merge_ctx,
ObDirectLoadExternalTable *external_table,
const ObTabletCacheInterval &pk_interval,
@ -653,13 +682,14 @@ int ObDirectLoadPartitionHeapTableMergeTask::init(const ObDirectLoadMergeParam &
if (IS_INIT) {
ret = OB_INIT_TWICE;
LOG_WARN("ObDirectLoadPartitionHeapTableMergeTask init twice", KR(ret), KP(this));
} else if (OB_UNLIKELY(!merge_param.is_valid() || nullptr == merge_ctx ||
} else if (OB_UNLIKELY(nullptr == ctx || !merge_param.is_valid() || nullptr == merge_ctx ||
nullptr == external_table || parallel_idx < 0 ||
0 == pk_interval.count())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid args", KR(ret), K(merge_param), KP(merge_ctx), KP(external_table),
K(parallel_idx), K(pk_interval));
} else {
ctx_ = ctx;
merge_param_ = &merge_param;
merge_ctx_ = merge_ctx;
parallel_idx_ = parallel_idx;
@ -820,6 +850,7 @@ ObDirectLoadPartitionHeapTableMultipleMergeTask::~ObDirectLoadPartitionHeapTable
}
int ObDirectLoadPartitionHeapTableMultipleMergeTask::init(
ObTableLoadTableCtx *ctx,
const ObDirectLoadMergeParam &merge_param,
ObDirectLoadTabletMergeCtx *merge_ctx,
ObDirectLoadMultipleHeapTable *heap_table,
@ -830,12 +861,14 @@ int ObDirectLoadPartitionHeapTableMultipleMergeTask::init(
if (IS_INIT) {
ret = OB_INIT_TWICE;
LOG_WARN("ObDirectLoadPartitionHeapTableMultipleMergeTask init twice", KR(ret), KP(this));
} else if (OB_UNLIKELY(!merge_param.is_valid() || nullptr == merge_ctx || nullptr == heap_table ||
} else if (OB_UNLIKELY(nullptr == ctx || !merge_param.is_valid() ||
nullptr == merge_ctx || nullptr == heap_table ||
parallel_idx < 0 || 0 == pk_interval.count())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid args", KR(ret), K(merge_param), KP(merge_ctx), KPC(heap_table),
K(parallel_idx), K(pk_interval));
} else {
ctx_ = ctx;
merge_param_ = &merge_param;
merge_ctx_ = merge_ctx;
parallel_idx_ = parallel_idx;
@ -1076,7 +1109,9 @@ ObDirectLoadPartitionHeapTableMultipleAggregateMergeTask::
}
int ObDirectLoadPartitionHeapTableMultipleAggregateMergeTask::init(
const ObDirectLoadMergeParam &merge_param, ObDirectLoadTabletMergeCtx *merge_ctx,
ObTableLoadTableCtx *ctx,
const ObDirectLoadMergeParam &merge_param,
ObDirectLoadTabletMergeCtx *merge_ctx,
ObDirectLoadOriginTable *origin_table,
const ObIArray<ObDirectLoadMultipleHeapTable *> &heap_table_array,
const ObTabletCacheInterval &pk_interval)
@ -1085,12 +1120,13 @@ int ObDirectLoadPartitionHeapTableMultipleAggregateMergeTask::init(
if (IS_INIT) {
ret = OB_INIT_TWICE;
LOG_WARN("ObDirectLoadPartitionHeapTableMultipleMergeTask init twice", KR(ret), KP(this));
} else if (OB_UNLIKELY(!merge_param.is_valid() || nullptr == merge_ctx ||
} else if (OB_UNLIKELY(nullptr == ctx || !merge_param.is_valid() || nullptr == merge_ctx ||
nullptr == origin_table || heap_table_array.empty())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid args", KR(ret), K(merge_param), KP(merge_ctx), KP(origin_table),
K(heap_table_array));
} else {
ctx_ = ctx;
merge_param_ = &merge_param;
merge_ctx_ = merge_ctx;
parallel_idx_ = 0;

View File

@ -19,6 +19,7 @@
#include "storage/direct_load/ob_direct_load_multiple_heap_table_scanner.h"
#include "storage/direct_load/ob_direct_load_insert_table_row_iterator.h"
#include "sql/engine/expr/ob_expr_sys_op_opnsize.h"
#include "observer/table_load/ob_table_load_table_ctx.h"
namespace oceanbase
{
@ -57,6 +58,7 @@ private:
int init_sql_statistics();
int init_lob_builder();
protected:
observer::ObTableLoadTableCtx *ctx_;
const ObDirectLoadMergeParam *merge_param_;
ObDirectLoadTabletMergeCtx *merge_ctx_;
int64_t parallel_idx_;
@ -73,7 +75,8 @@ class ObDirectLoadPartitionRangeMergeTask : public ObDirectLoadPartitionMergeTas
public:
ObDirectLoadPartitionRangeMergeTask();
virtual ~ObDirectLoadPartitionRangeMergeTask();
int init(const ObDirectLoadMergeParam &merge_param,
int init(observer::ObTableLoadTableCtx *ctx,
const ObDirectLoadMergeParam &merge_param,
ObDirectLoadTabletMergeCtx *merge_ctx,
ObDirectLoadOriginTable *origin_table,
const common::ObIArray<ObDirectLoadSSTable *> &sstable_array,
@ -112,7 +115,8 @@ class ObDirectLoadPartitionRangeMultipleMergeTask : public ObDirectLoadPartition
public:
ObDirectLoadPartitionRangeMultipleMergeTask();
virtual ~ObDirectLoadPartitionRangeMultipleMergeTask();
int init(const ObDirectLoadMergeParam &merge_param,
int init(observer::ObTableLoadTableCtx *ctx,
const ObDirectLoadMergeParam &merge_param,
ObDirectLoadTabletMergeCtx *merge_ctx,
ObDirectLoadOriginTable *origin_table,
const common::ObIArray<ObDirectLoadMultipleSSTable *> &sstable_array,
@ -151,7 +155,8 @@ class ObDirectLoadPartitionHeapTableMergeTask : public ObDirectLoadPartitionMerg
public:
ObDirectLoadPartitionHeapTableMergeTask();
virtual ~ObDirectLoadPartitionHeapTableMergeTask();
int init(const ObDirectLoadMergeParam &merge_param,
int init(observer::ObTableLoadTableCtx *ctx,
const ObDirectLoadMergeParam &merge_param,
ObDirectLoadTabletMergeCtx *merge_ctx,
ObDirectLoadExternalTable *external_table,
const share::ObTabletCacheInterval &pk_interval,
@ -190,7 +195,8 @@ class ObDirectLoadPartitionHeapTableMultipleMergeTask : public ObDirectLoadParti
public:
ObDirectLoadPartitionHeapTableMultipleMergeTask();
virtual ~ObDirectLoadPartitionHeapTableMultipleMergeTask();
int init(const ObDirectLoadMergeParam &merge_param,
int init(observer::ObTableLoadTableCtx *ctx,
const ObDirectLoadMergeParam &merge_param,
ObDirectLoadTabletMergeCtx *merge_ctx,
ObDirectLoadMultipleHeapTable *heap_table,
const share::ObTabletCacheInterval &pk_interval,
@ -230,7 +236,8 @@ class ObDirectLoadPartitionHeapTableMultipleAggregateMergeTask
public:
ObDirectLoadPartitionHeapTableMultipleAggregateMergeTask();
virtual ~ObDirectLoadPartitionHeapTableMultipleAggregateMergeTask();
int init(const ObDirectLoadMergeParam &merge_param, ObDirectLoadTabletMergeCtx *merge_ctx,
int init(observer::ObTableLoadTableCtx *ctx,
const ObDirectLoadMergeParam &merge_param, ObDirectLoadTabletMergeCtx *merge_ctx,
ObDirectLoadOriginTable *origin_table,
const common::ObIArray<ObDirectLoadMultipleHeapTable *> &heap_table_array,
const share::ObTabletCacheInterval &pk_interval);