Fix direct load error row handler

This commit is contained in:
suz-yang
2023-05-19 10:17:26 +00:00
committed by ob-robot
parent 69f997d14c
commit a5f1041b4e
33 changed files with 333 additions and 546 deletions

View File

@ -112,7 +112,7 @@ int ObTableLoadBeginP::process()
param.px_mode_ = false;
param.online_opt_stat_gather_ = false;
param.data_type_ = static_cast<ObTableLoadDataType>(arg_.config_.flag_.data_type_);
param.dup_action_ = ObLoadDupActionType::LOAD_STOP_ON_DUP;
param.dup_action_ = static_cast<ObLoadDupActionType>(arg_.config_.flag_.dup_action_);
if (OB_FAIL(param.normalize())) {
LOG_WARN("fail to normalize param", KR(ret));
}

View File

@ -5,49 +5,23 @@
#define USING_LOG_PREFIX SERVER
#include "observer/table_load/ob_table_load_error_row_handler.h"
#include "observer/table_load/ob_table_load_schema.h"
#include "observer/table_load/ob_table_load_stat.h"
#include "observer/table_load/ob_table_load_store_ctx.h"
#include "observer/table_load/ob_table_load_table_ctx.h"
#include "observer/table_load/ob_table_load_utils.h"
#include "sql/engine/cmd/ob_load_data_utils.h"
#include "share/rc/ob_tenant_base.h"
namespace oceanbase
{
using namespace common;
using namespace common::hash;
using namespace table;
using namespace lib;
using namespace share;
using namespace share::schema;
using namespace blocksstable;
using namespace sql;
namespace observer
{
ObTableLoadErrorRowHandler::PartitionRowkey::~PartitionRowkey()
{
if (OB_NOT_NULL(last_rowkey_.get_datum_ptr())) {
allocator_.free((void *)(last_rowkey_.get_datum_ptr()));
}
}
ObTableLoadErrorRowHandler::PartitionRowkeyMap::~PartitionRowkeyMap()
{
auto release_map_entry = [this](HashMapPair<ObTabletID, PartitionRowkey *> &entry) {
ObTableLoadErrorRowHandler::PartitionRowkey *part_rowkey = entry.second;
part_rowkey->~PartitionRowkey();
allocator_.free((void *)part_rowkey);
return 0;
};
map_.foreach_refactored(release_map_entry);
}
using namespace blocksstable;
using namespace common;
using namespace sql;
ObTableLoadErrorRowHandler::ObTableLoadErrorRowHandler()
: capacity_(0),
session_cnt_(0),
safe_allocator_(row_allocator_),
error_row_cnt_(0),
repeated_row_cnt_(0),
: dup_action_(ObLoadDupActionType::LOAD_INVALID_MODE),
max_error_row_count_(0),
result_info_(nullptr),
job_stat_(nullptr),
error_row_count_(0),
is_inited_(false)
{
}
@ -56,249 +30,162 @@ ObTableLoadErrorRowHandler::~ObTableLoadErrorRowHandler()
{
}
int ObTableLoadErrorRowHandler::init(ObTableLoadTableCtx *const ctx)
int ObTableLoadErrorRowHandler::init(ObTableLoadStoreCtx *store_ctx)
{
int ret = OB_SUCCESS;
if (IS_INIT) {
ret = OB_INIT_TWICE;
LOG_WARN("ObTableLoadErrorRowHandler init twice", KR(ret), KP(this));
} else {
param_ = ctx->param_;
job_stat_ = ctx->job_stat_;
datum_utils_ = &(ctx->schema_.datum_utils_);
col_descs_ = &(ctx->schema_.column_descs_);
capacity_ = ctx->param_.max_error_row_count_;
rowkey_column_num_ = ctx->schema_.rowkey_column_count_;
session_cnt_ = ctx->param_.session_count_;
if (OB_FAIL(session_maps_.prepare_allocate(session_cnt_))) {
LOG_WARN("failed to pre allocate session maps", K(ret), K(session_cnt_));
} else {
for (int64_t i = 0; OB_SUCC(ret) && (i < session_maps_.count()); ++i) {
if (OB_FAIL(session_maps_.at(i).map_.create(1024, "TLD_err_chk_map", "TLD_err_chk_map",
ctx->param_.tenant_id_))) {
LOG_WARN("fail to create map", KR(ret), K(ctx->param_.tenant_id_));
} else {
ObArenaAllocator &allocator = session_maps_.at(i).allocator_;
allocator.set_label("TLD_err_chk");
allocator.set_tenant_id(MTL_ID());
}
}
}
}
if (OB_SUCC(ret)) {
dup_action_ = store_ctx->ctx_->param_.dup_action_;
max_error_row_count_ = store_ctx->ctx_->param_.max_error_row_count_;
result_info_ = &store_ctx->result_info_;
job_stat_ = store_ctx->ctx_->job_stat_;
is_inited_ = true;
}
return ret;
}
int ObTableLoadErrorRowHandler::inner_append_error_row(const ObNewRow &row,
ObIArray<ObNewRow> &error_row_array)
int ObTableLoadErrorRowHandler::handle_insert_row(const blocksstable::ObDatumRow &row)
{
UNUSED(row);
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("not init", K(ret));
} else {
ATOMIC_INC(&result_info_->rows_affected_);
}
return ret;
}
int ObTableLoadErrorRowHandler::handle_update_row(const ObDatumRow &row)
{
UNUSED(row);
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("not init", K(ret));
} else {
if (ObLoadDupActionType::LOAD_STOP_ON_DUP == dup_action_) {
if (0 == max_error_row_count_) {
ret = OB_ERR_PRIMARY_KEY_DUPLICATE;
} else {
ObMutexGuard guard(mutex_);
if (error_row_count_ >= max_error_row_count_) {
ret = OB_ERR_TOO_MANY_ROWS;
LOG_WARN("error row count reaches its maximum value", KR(ret), K_(max_error_row_count),
K_(error_row_count));
} else {
++error_row_count_;
}
}
ATOMIC_INC(&job_stat_->detected_error_rows_);
} else if (ObLoadDupActionType::LOAD_REPLACE == dup_action_) {
ATOMIC_AAF(&result_info_->rows_affected_, 2);
ATOMIC_INC(&result_info_->deleted_);
} else if (ObLoadDupActionType::LOAD_IGNORE == dup_action_) {
ATOMIC_INC(&result_info_->skipped_);
} else {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected dup action", KR(ret), K_(dup_action));
}
}
return ret;
}
int ObTableLoadErrorRowHandler::handle_update_row(const ObDatumRow &old_row,
const ObDatumRow &new_row,
const ObDatumRow *&result_row)
{
int ret = OB_SUCCESS;
ObNewRow new_row;
ObMutexGuard guard(append_row_mutex_);
if (error_row_cnt_ >= capacity_) {
ret = OB_ERR_TOO_MANY_ROWS;
LOG_WARN("error row count reaches its maximum value", K(ret), K(capacity_), K(error_row_cnt_));
} else if (OB_FAIL(ObTableLoadUtils::deep_copy(row, new_row, safe_allocator_))) {
LOG_WARN("failed to deep copy new row", K(ret), K(row));
} else if (OB_FAIL(error_row_array.push_back(new_row))) {
LOG_WARN("failed to push back error row", K(ret), K(new_row));
result_row = nullptr;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("not init", K(ret));
} else {
error_row_cnt_++;
if (ObLoadDupActionType::LOAD_STOP_ON_DUP == dup_action_) {
if (0 == max_error_row_count_) {
ret = OB_ERR_PRIMARY_KEY_DUPLICATE;
} else {
ObMutexGuard guard(mutex_);
if (error_row_count_ >= max_error_row_count_) {
ret = OB_ERR_TOO_MANY_ROWS;
LOG_WARN("error row count reaches its maximum value", KR(ret), K_(max_error_row_count),
K_(error_row_count));
} else {
++error_row_count_;
}
}
if (OB_SUCC(ret)) {
result_row = &old_row;
}
ATOMIC_INC(&job_stat_->detected_error_rows_);
} else if (ObLoadDupActionType::LOAD_IGNORE == dup_action_) {
result_row = &old_row;
ATOMIC_INC(&result_info_->skipped_);
} else if (ObLoadDupActionType::LOAD_REPLACE == dup_action_) {
result_row = &new_row;
ATOMIC_INC(&result_info_->deleted_);
ATOMIC_AAF(&result_info_->rows_affected_, 2);
} else {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected dup action", KR(ret), K_(dup_action));
}
}
return ret;
}
int ObTableLoadErrorRowHandler::handle_error_row(int error_code, const ObNewRow &row)
{
UNUSED(row);
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("not init", K(ret));
} else if (max_error_row_count_ == 0) {
ret = error_code;
} else {
ObMutexGuard guard(mutex_);
if (error_row_count_ >= max_error_row_count_) {
ret = OB_ERR_TOO_MANY_ROWS;
LOG_WARN("error row count reaches its maximum value", KR(ret), K_(max_error_row_count),
K_(error_row_count));
} else {
++error_row_count_;
}
ATOMIC_INC(&job_stat_->detected_error_rows_);
}
return ret;
}
int ObTableLoadErrorRowHandler::inner_append_repeated_row(const ObNewRow &row,
ObIArray<ObNewRow> &repeated_row_array)
int ObTableLoadErrorRowHandler::handle_error_row(int error_code, const ObDatumRow &row)
{
UNUSED(row);
int ret = OB_SUCCESS;
ObNewRow new_row;
ObMutexGuard guard(append_row_mutex_);
if (repeated_row_cnt_ >= DEFAULT_REPEATED_ERROR_ROW_COUNT) {
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("not init", K(ret));
} else if (max_error_row_count_ == 0) {
ret = error_code;
} else {
ObMutexGuard guard(mutex_);
if (error_row_count_ >= max_error_row_count_) {
ret = OB_ERR_TOO_MANY_ROWS;
LOG_WARN("repeated row count reaches its maximum value", K(ret), K(capacity_),
K(repeated_row_cnt_));
} else if (OB_FAIL(ObTableLoadUtils::deep_copy(row, new_row, safe_allocator_))) {
LOG_WARN("failed to deep copy new row", K(ret), K(row));
} else if (OB_FAIL(repeated_row_array.push_back(new_row))) {
LOG_WARN("failed to push back error row", K(ret), K(new_row));
LOG_WARN("error row count reaches its maximum value", KR(ret), K_(max_error_row_count),
K_(error_row_count));
} else {
repeated_row_cnt_ ++;
++error_row_count_;
}
ATOMIC_INC(&job_stat_->detected_error_rows_);
}
return ret;
}
int ObTableLoadErrorRowHandler::append_error_row(const ObNewRow &row)
uint64_t ObTableLoadErrorRowHandler::get_error_row_count() const
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("not init", K(ret));
} else if (OB_FAIL(inner_append_error_row(row, error_row_array_))) {
LOG_WARN("failed to append row to str error row array", K(ret), K(row));
}
return ret;
}
int ObTableLoadErrorRowHandler::append_error_row(const ObDatumRow &row)
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("not init", K(ret));
} else {
ObNewRow new_row;
ObObjBufArray obj_buf;
ObArenaAllocator allocator;
if (OB_FAIL(obj_buf.init(&allocator))) {
LOG_WARN("fail to init obj buf", KR(ret));
} else if (OB_FAIL(obj_buf.reserve(row.count_))) {
LOG_WARN("Failed to reserve buf for obj buf", K(ret), K(row.count_));
} else {
new_row.cells_ = obj_buf.get_data();
new_row.count_ = row.count_;
for (int64_t i = 0; OB_SUCC(ret) && i < (*col_descs_).count(); i++) {
if (OB_FAIL(row.storage_datums_[i].to_obj_enhance(new_row.cells_[i], (*col_descs_).at(i).col_type_))) {
LOG_WARN("Failed to transform datum to obj", K(ret), K(i), K(row.storage_datums_[i]));
}
}
if (OB_SUCC(ret)) {
if (OB_FAIL(inner_append_error_row(new_row, error_new_row_array_))) {
LOG_WARN("failed to append row to error row array", K(ret), K(new_row));
}
}
}
}
return ret;
}
int ObTableLoadErrorRowHandler::append_repeated_row(const common::ObNewRow &row)
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("not init", K(ret));
} else {
if (param_.dup_action_ == ObLoadDupActionType::LOAD_STOP_ON_DUP) {
if (OB_FAIL(inner_append_error_row(row, error_row_array_))) {
LOG_WARN("failed to append row to error row array", K(ret), K(row));
}
} else {
if (OB_FAIL(inner_append_repeated_row(row, repeated_row_array_))) {
LOG_WARN("failed to append row to error row array", K(ret), K(row));
}
}
}
return ret;
}
int ObTableLoadErrorRowHandler::append_repeated_row(const blocksstable::ObDatumRow &row)
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("not init", K(ret));
} else {
ObNewRow new_row;
ObObjBufArray obj_buf;
ObArenaAllocator allocator;
if (OB_FAIL(obj_buf.init(&allocator))) {
LOG_WARN("fail to init obj buf", KR(ret));
} else if (OB_FAIL(obj_buf.reserve(row.count_))) {
LOG_WARN("Failed to reserve buf for obj buf", K(ret), K(row.count_));
} else {
new_row.cells_ = obj_buf.get_data();
new_row.count_ = row.count_;
for (int64_t i = 0; OB_SUCC(ret) && i < (*col_descs_).count(); i++) {
if (OB_FAIL(row.storage_datums_[i].to_obj_enhance(new_row.cells_[i], (*col_descs_).at(i).col_type_))) {
LOG_WARN("Failed to transform datum to obj", K(ret), K(i), K(row.storage_datums_[i]));
}
}
if (OB_SUCC(ret)) {
if (param_.dup_action_ == ObLoadDupActionType::LOAD_STOP_ON_DUP) {
if (OB_FAIL(inner_append_error_row(new_row, error_new_row_array_))) {
LOG_WARN("failed to append row to error row array", K(ret), K(new_row));
}
} else {
if (OB_FAIL(inner_append_repeated_row(new_row, repeated_new_row_array_))) {
LOG_WARN("failed to append row to error row array", K(ret), K(new_row));
}
}
}
}
}
return ret;
}
// TODO: convert each obj to string
int ObTableLoadErrorRowHandler::get_all_error_rows(ObTableLoadArray<ObObj> &obj_array)
{
int ret = OB_NOT_IMPLEMENT;
return ret;
}
int ObTableLoadErrorRowHandler::check_rowkey_order(int32_t session_id, const ObTabletID &tablet_id,
const ObDatumRow &datum_row)
{
OB_TABLE_LOAD_STATISTICS_TIME_COST(check_rowkey_order_time_us);
int ret = OB_SUCCESS;
int cmp_ret = 1;
ObTableLoadErrorRowHandler::PartitionRowkey *last_part_rowkey = nullptr;
ObDatumRowkey rowkey;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("not init", K(ret));
} else if ((session_id < 1) || (session_id > session_cnt_)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid session id", K(session_id), K(session_cnt_));
} else if (OB_FAIL(rowkey.assign(datum_row.storage_datums_, rowkey_column_num_))) {
LOG_WARN("failed to assign to rowkey", K(ret), KPC(datum_row.storage_datums_),
K(rowkey_column_num_));
} else {
ObTableLoadErrorRowHandler::PartitionRowkeyMap &partition_map =
session_maps_.at(session_id - 1);
if (OB_FAIL(partition_map.map_.get_refactored(tablet_id, last_part_rowkey))) {
if (OB_UNLIKELY(OB_HASH_NOT_EXIST != ret)) {
LOG_WARN("fail to get refactored", KR(ret), K(tablet_id));
} else {
// allocate a new last_part_rowkey for the new partition
if (OB_ISNULL(last_part_rowkey = OB_NEWx(ObTableLoadErrorRowHandler::PartitionRowkey,
(&(partition_map.allocator_))))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("failed to new ObDatumRowkey", KR(ret));
} else if (OB_FAIL(partition_map.map_.set_refactored(tablet_id, last_part_rowkey))) {
LOG_WARN("fail to add last rowkey to map", KR(ret), K(session_id), K(tablet_id));
}
}
}
if (OB_SUCC(ret)) {
ObDatumRowkey &last_rowkey = last_part_rowkey->last_rowkey_;
if (OB_FAIL(rowkey.compare(last_rowkey, *datum_utils_, cmp_ret))) {
LOG_WARN("fail to compare rowkey to last rowkey", KR(ret), K(rowkey), K(last_rowkey));
} else if (cmp_ret > 0) {
// free last rowkey
last_part_rowkey->allocator_.reuse();
// overwrite last rowkey
// TODO: deep copy one row each batch instead of each row
if (OB_FAIL(
ObTableLoadUtils::deep_copy(rowkey, last_rowkey, last_part_rowkey->allocator_))) {
LOG_WARN("failed to deep copy rowkey to last rowkey", K(ret), K(rowkey), K(last_rowkey));
}
} else if (cmp_ret == 0) {
ret = OB_ERR_PRIMARY_KEY_DUPLICATE;
LOG_WARN("rowkey == last rowkey", K(ret), K(cmp_ret), K(session_id), K(tablet_id),
K(last_rowkey), K(rowkey));
} else {
ret = OB_ROWKEY_ORDER_ERROR;
LOG_WARN("rowkey < last rowkey", K(ret), K(cmp_ret), K(session_id), K(tablet_id),
K(last_rowkey), K(rowkey));
}
}
}
return ret;
ObMutexGuard guard(mutex_);
return error_row_count_;
}
} // namespace observer

View File

@ -5,91 +5,42 @@
#pragma once
#include "common/row/ob_row.h"
#include "observer/table_load/ob_table_load_struct.h"
#include "share/table/ob_table_load_array.h"
#include "share/table/ob_table_load_define.h"
#include "share/table/ob_table_load_row.h"
#include "sql/engine/cmd/ob_load_data_utils.h"
#include "storage/blocksstable/ob_datum_row.h"
#include "storage/blocksstable/ob_datum_rowkey.h"
#include "sql/resolver/cmd/ob_load_data_stmt.h"
#include "share/rc/ob_tenant_base.h"
#include "storage/direct_load/ob_direct_load_dml_row_handler.h"
namespace oceanbase
{
namespace table
{
class ObTableLoadResultInfo;
} // namespace table
namespace observer
{
class ObTableLoadStoreCtx;
class ObTableLoadTableCtx;
class ObTableLoadErrorRowHandler
class ObTableLoadErrorRowHandler : public ObDirectLoadDMLRowHandler
{
public:
static const int64_t DEFAULT_REPEATED_ERROR_ROW_COUNT = 100;
ObTableLoadErrorRowHandler();
~ObTableLoadErrorRowHandler();
int init(ObTableLoadTableCtx *const ctx);
int append_error_row(const common::ObNewRow &row);
int append_error_row(const blocksstable::ObDatumRow &row);
int append_repeated_row(const common::ObNewRow &row);
int append_repeated_row(const blocksstable::ObDatumRow &row);
int get_all_error_rows(table::ObTableLoadArray<common::ObObj> &obj_array);
uint64_t get_error_row_cnt() const { return error_row_cnt_; }
int check_rowkey_order(int32_t session_id, const common::ObTabletID &tablet_id,
const blocksstable::ObDatumRow &datum_row);
sql::ObLoadDupActionType get_action() const {return param_.dup_action_;}
uint64_t get_capacity() const { return capacity_; }
TO_STRING_KV(K_(capacity), K_(error_row_cnt), K_(repeated_row_cnt), K_(session_cnt), K_(is_inited));
virtual ~ObTableLoadErrorRowHandler();
int init(ObTableLoadStoreCtx *store_ctx);
int handle_insert_row(const blocksstable::ObDatumRow &row) override;
int handle_update_row(const blocksstable::ObDatumRow &row) override;
int handle_update_row(const blocksstable::ObDatumRow &old_row,
const blocksstable::ObDatumRow &new_row,
const blocksstable::ObDatumRow *&result_row) override;
int handle_error_row(int error_code, const common::ObNewRow &row);
int handle_error_row(int error_code, const blocksstable::ObDatumRow &row);
uint64_t get_error_row_count() const;
TO_STRING_KV(K_(dup_action), K_(max_error_row_count), K_(error_row_count));
private:
int inner_append_error_row(const common::ObNewRow &row,
common::ObIArray<ObNewRow> &error_row_array);
int inner_append_repeated_row(const common::ObNewRow &row,
common::ObIArray<ObNewRow> &repeated_row_array);
class PartitionRowkey
{
public:
PartitionRowkey() : allocator_("TLD_err_chk")
{
allocator_.set_tenant_id(MTL_ID());
last_rowkey_.set_min_rowkey();
}
~PartitionRowkey();
TO_STRING_KV(K(last_rowkey_));
public:
common::ObArenaAllocator allocator_;
blocksstable::ObDatumRowkey last_rowkey_;
};
class PartitionRowkeyMap
{
public:
PartitionRowkeyMap() : map_() {}
~PartitionRowkeyMap();
PartitionRowkeyMap(const PartitionRowkeyMap &other) {}
PartitionRowkeyMap &operator=(const PartitionRowkeyMap &other) { return *this; }
TO_STRING_KV(K(map_.size()));
// all partitions are written sequentially within one session
// thus they can share the same allocator
common::ObArenaAllocator allocator_;
common::hash::ObHashMap<common::ObTabletID, PartitionRowkey *> map_;
};
private:
observer::ObTableLoadParam param_;
sql::ObLoadDupActionType dup_action_;
uint64_t max_error_row_count_;
table::ObTableLoadResultInfo *result_info_;
sql::ObLoadDataStat *job_stat_;
const oceanbase::blocksstable::ObStorageDatumUtils *datum_utils_;
const common::ObIArray<share::schema::ObColDesc> *col_descs_;
uint64_t capacity_; // maximum allowed error row count
int64_t rowkey_column_num_;
int32_t session_cnt_;
common::ObArray<PartitionRowkeyMap> session_maps_;
common::ObArenaAllocator row_allocator_; //just for safe allocator
common::ObSafeArenaAllocator safe_allocator_; //该分配器是线程安全的
mutable lib::ObMutex append_row_mutex_;
uint64_t error_row_cnt_;
uint64_t repeated_row_cnt_;
common::ObArray<ObNewRow> error_row_array_;
common::ObArray<ObNewRow> error_new_row_array_;
common::ObArray<ObNewRow> repeated_row_array_;
common::ObArray<ObNewRow> repeated_new_row_array_;
mutable lib::ObMutex mutex_;
uint64_t error_row_count_;
bool is_inited_;
};

View File

@ -5,6 +5,7 @@
#define USING_LOG_PREFIX SERVER
#include "observer/table_load/ob_table_load_mem_compactor.h"
#include "observer/table_load/ob_table_load_error_row_handler.h"
#include "observer/table_load/ob_table_load_service.h"
#include "observer/table_load/ob_table_load_stat.h"
#include "observer/table_load/ob_table_load_store_ctx.h"
@ -273,9 +274,8 @@ int ObTableLoadMemCompactor::inner_init()
mem_ctx_.need_sort_ = param_->need_sort_;
mem_ctx_.mem_load_task_count_ = param_->session_count_;
mem_ctx_.column_count_ = param_->column_count_;
mem_ctx_.error_row_handler_ = store_ctx_->error_row_handler_;
mem_ctx_.dml_row_handler_ = store_ctx_->error_row_handler_;
mem_ctx_.file_mgr_ = store_ctx_->tmp_file_mgr_;
mem_ctx_.result_info_ = &(store_ctx_->result_info_);
}
if (OB_SUCC(ret)) {
if (OB_FAIL(mem_ctx_.init())) {

View File

@ -5,6 +5,7 @@
#define USING_LOG_PREFIX SERVER
#include "observer/table_load/ob_table_load_merger.h"
#include "observer/table_load/ob_table_load_error_row_handler.h"
#include "observer/table_load/ob_table_load_service.h"
#include "observer/table_load/ob_table_load_stat.h"
#include "observer/table_load/ob_table_load_store_ctx.h"
@ -198,8 +199,7 @@ int ObTableLoadMerger::build_merge_ctx()
merge_param.is_fast_heap_table_ = store_ctx_->is_fast_heap_table_;
merge_param.online_opt_stat_gather_ = param_.online_opt_stat_gather_;
merge_param.insert_table_ctx_ = store_ctx_->insert_table_ctx_;
merge_param.error_row_handler_ = store_ctx_->error_row_handler_;
merge_param.result_info_ = &(store_ctx_->result_info_);
merge_param.dml_row_handler_ = store_ctx_->error_row_handler_;
if (OB_FAIL(merge_ctx_.init(merge_param, store_ctx_->ls_partition_ids_,
store_ctx_->target_ls_partition_ids_))) {
LOG_WARN("fail to init merge ctx", KR(ret));

View File

@ -5,6 +5,7 @@
#define USING_LOG_PREFIX SERVER
#include "observer/table_load/ob_table_load_multiple_heap_table_compactor.h"
#include "observer/table_load/ob_table_load_error_row_handler.h"
#include "observer/table_load/ob_table_load_service.h"
#include "observer/table_load/ob_table_load_stat.h"
#include "observer/table_load/ob_table_load_store_ctx.h"
@ -316,9 +317,8 @@ int ObTableLoadMultipleHeapTableCompactor::inner_init()
mem_ctx_.need_sort_ = param_->need_sort_;
mem_ctx_.mem_load_task_count_ = param_->session_count_;
mem_ctx_.column_count_ = param_->column_count_;
mem_ctx_.error_row_handler_ = store_ctx_->error_row_handler_;
mem_ctx_.dml_row_handler_ = store_ctx_->error_row_handler_;
mem_ctx_.file_mgr_ = store_ctx_->tmp_file_mgr_;
mem_ctx_.result_info_ = &(store_ctx_->result_info_);
if (OB_SUCC(ret)) {
if (OB_FAIL(mem_ctx_.init())) {

View File

@ -5,6 +5,7 @@
#define USING_LOG_PREFIX SERVER
#include "observer/table_load/ob_table_load_parallel_merge_ctx.h"
#include "observer/table_load/ob_table_load_error_row_handler.h"
#include "observer/table_load/ob_table_load_service.h"
#include "observer/table_load/ob_table_load_stat.h"
#include "observer/table_load/ob_table_load_store_ctx.h"
@ -355,8 +356,7 @@ public:
ObDirectLoadMultipleSSTableScanMergeParam scan_merge_param;
scan_merge_param.table_data_desc_ = parallel_merge_ctx_->store_ctx_->table_data_desc_;
scan_merge_param.datum_utils_ = &(ctx_->schema_.datum_utils_);
scan_merge_param.error_row_handler_ = parallel_merge_ctx_->store_ctx_->error_row_handler_;
scan_merge_param.result_info_ = &(parallel_merge_ctx_->store_ctx_->result_info_);
scan_merge_param.dml_row_handler_ = parallel_merge_ctx_->store_ctx_->error_row_handler_;
for (int64_t i = 0; OB_SUCC(ret) && i < tablet_ctx_->merge_sstable_count_; ++i) {
ObDirectLoadMultipleSSTable *sstable = tablet_ctx_->sstables_.at(i);
if (OB_FAIL(sstable_array_.push_back(sstable))) {

View File

@ -201,10 +201,8 @@ int ObTableLoadStoreCtx::init(
OB_NEWx(ObTableLoadErrorRowHandler, (&allocator_)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("fail to new ObTableLoadErrorRowHandler", KR(ret));
} else if (OB_FAIL(error_row_handler_->init(ctx_))) {
LOG_WARN("fail to init error row handler", KR(ret), K(ctx_->param_.tenant_id_),
K(ctx_->param_.table_id_), K(ctx_->param_.max_error_row_count_),
K(ctx_->param_.session_count_));
} else if (OB_FAIL(error_row_handler_->init(this))) {
LOG_WARN("fail to init error row handler", KR(ret));
}
// init session_ctx_array_
else if (OB_FAIL(init_session_ctx_array())) {

View File

@ -208,7 +208,7 @@ int ObTableLoadTransStoreWriter::init_session_ctx_array()
param.online_opt_stat_gather_ = trans_ctx_->ctx_->param_.online_opt_stat_gather_;
param.insert_table_ctx_ = trans_ctx_->ctx_->store_ctx_->insert_table_ctx_;
param.fast_heap_table_ctx_ = trans_ctx_->ctx_->store_ctx_->fast_heap_table_ctx_;
param.result_info_ = &(trans_ctx_->ctx_->store_ctx_->result_info_);
param.dml_row_handler_ = trans_ctx_->ctx_->store_ctx_->error_row_handler_;
for (int64_t i = 0; OB_SUCC(ret) && i < session_count; ++i) {
SessionContext *session_ctx = session_ctx_array_ + i;
if (param_.px_mode_) {
@ -413,11 +413,10 @@ int ObTableLoadTransStoreWriter::cast_row(ObArenaAllocator &cast_allocator,
}
}
if (OB_FAIL(ret)) {
int tmp_ret = OB_SUCCESS;
ObTableLoadErrorRowHandler *error_row_handler =
trans_ctx_->ctx_->store_ctx_->error_row_handler_;
if (OB_TMP_FAIL(error_row_handler->append_error_row(row))) {
LOG_WARN("failed to append error row", K(ret), K(row));
if (OB_FAIL(error_row_handler->handle_error_row(ret, row))) {
LOG_WARN("failed to handle error row", K(ret), K(row));
} else {
ret = OB_EAGAIN;
}
@ -470,28 +469,15 @@ int ObTableLoadTransStoreWriter::write_row_to_table_store(ObDirectLoadTableStore
LOG_WARN("fail to append row", KR(ret), K(datum_row));
}
if (OB_FAIL(ret)) {
ObTableLoadErrorRowHandler *error_row_handler = trans_ctx_->ctx_->store_ctx_->error_row_handler_;
ObTableLoadErrorRowHandler *error_row_handler =
trans_ctx_->ctx_->store_ctx_->error_row_handler_;
if (OB_LIKELY(OB_ERR_PRIMARY_KEY_DUPLICATE == ret)) {
int tmp_ret = OB_SUCCESS;
if (trans_ctx_->ctx_->param_.dup_action_ == ObLoadDupActionType::LOAD_REPLACE) {
ATOMIC_AAF(&trans_ctx_->ctx_->store_ctx_->result_info_.rows_affected_, 2);
ATOMIC_INC(&trans_ctx_->ctx_->store_ctx_->result_info_.deleted_);
} else if (trans_ctx_->ctx_->param_.dup_action_ == ObLoadDupActionType::LOAD_IGNORE) {
ATOMIC_INC(&trans_ctx_->ctx_->store_ctx_->result_info_.skipped_);
} else if (trans_ctx_->ctx_->param_.dup_action_ == ObLoadDupActionType::LOAD_STOP_ON_DUP) {
if (OB_TMP_FAIL(error_row_handler->append_error_row(datum_row))) {
LOG_WARN("failed to append repeated row", K(ret), K(tablet_id), K(datum_row));
}
}
if (OB_LIKELY(OB_SUCCESS == tmp_ret)) {
ret = OB_SUCCESS;
if (OB_FAIL(error_row_handler->handle_update_row(datum_row))) {
LOG_WARN("fail to handle update row", KR(ret), K(datum_row));
}
} else if (OB_LIKELY(OB_ROWKEY_ORDER_ERROR == ret)) {
int tmp_ret = OB_SUCCESS;
if (OB_TMP_FAIL(error_row_handler->append_error_row(datum_row))) {
LOG_WARN("failed to append error row", K(ret), K(tablet_id), K(datum_row));
} else {
ret = OB_SUCCESS;
if (OB_FAIL(error_row_handler->handle_error_row(ret, datum_row))) {
LOG_WARN("fail to handle error row", KR(ret), K(tablet_id), K(datum_row));
}
}
}

View File

@ -27,20 +27,22 @@ struct ObTableLoadFlag
public:
static const uint64_t BIT_IS_NEED_SORT = 1;
static const uint64_t BIT_DATA_TYPE = 2;
static const uint64_t BIT_RESERVED = 61;
static const uint64_t BIT_DUP_ACTION_TYPE = 2;
static const uint64_t BIT_RESERVED = 59;
union {
uint64_t flag_;
struct {
uint64_t is_need_sort_ : BIT_IS_NEED_SORT;
uint64_t data_type_ : BIT_DATA_TYPE;
uint64_t dup_action_ : BIT_DUP_ACTION_TYPE;
uint64_t reserved_ : BIT_RESERVED;
};
};
ObTableLoadFlag() : flag_(0) {}
void reset() { flag_ = 0; }
TO_STRING_KV(K_(is_need_sort), K_(data_type));
TO_STRING_KV(K_(is_need_sort), K_(data_type), K_(dup_action));
};
struct ObTableLoadConfig final

View File

@ -12,7 +12,6 @@ namespace storage
{
using namespace common;
using namespace blocksstable;
using namespace observer;
using namespace share;
using namespace sql;
@ -23,8 +22,7 @@ using namespace sql;
ObDirectLoadDataFuseParam::ObDirectLoadDataFuseParam()
: store_column_count_(0),
datum_utils_(nullptr),
error_row_handler_(nullptr),
result_info_(nullptr)
dml_row_handler_(nullptr)
{
}
@ -35,7 +33,7 @@ ObDirectLoadDataFuseParam::~ObDirectLoadDataFuseParam()
bool ObDirectLoadDataFuseParam::is_valid() const
{
return tablet_id_.is_valid() && store_column_count_ > 0 && table_data_desc_.is_valid() &&
nullptr != datum_utils_ && nullptr != error_row_handler_ && nullptr != result_info_;
nullptr != datum_utils_ && nullptr != dml_row_handler_;
}
/**
@ -223,49 +221,34 @@ int ObDirectLoadDataFuse::inner_get_next_row(const ObDatumRow *&datum_row)
if (OB_FAIL(rows_merger_.pop())) {
LOG_WARN("fail to pop item", KR(ret));
} else if (item->iter_idx_ == LOAD_IDX) {
ATOMIC_INC(&param_.result_info_->rows_affected_);
if (OB_FAIL(param_.dml_row_handler_->handle_insert_row(*datum_row))) {
LOG_WARN("fail to handle insert row", KR(ret), KPC(datum_row));
}
}
}
} else {
const Item *item = nullptr;
const ObDatumRow *old_row = nullptr;
const ObDatumRow *new_row = nullptr;
while (OB_SUCC(ret) && !rows_merger_.empty()) {
if (OB_FAIL(rows_merger_.top(item))) {
LOG_WARN("fail to rebuild", KR(ret));
} else {
if (ObLoadDupActionType::LOAD_STOP_ON_DUP == param_.dup_action_) {
if (item->iter_idx_ == ORIGIN_IDX) {
datum_row = item->datum_row_;
old_row = item->datum_row_;
} else {
ObTableLoadErrorRowHandler *error_row_handler = param_.error_row_handler_;
if (OB_FAIL(error_row_handler->append_error_row(*item->datum_row_))) {
if ((OB_ERR_TOO_MANY_ROWS == ret)
&& (0 == param_.error_row_handler_->get_capacity())){
ret = OB_ERR_PRIMARY_KEY_DUPLICATE;
new_row = item->datum_row_;
}
LOG_WARN("fail to append row to error row handler", KR(ret), KPC(item->datum_row_));
}
}
} else if (ObLoadDupActionType::LOAD_IGNORE == param_.dup_action_) {
if (item->iter_idx_ == ORIGIN_IDX) {
datum_row = item->datum_row_;
} else {
ATOMIC_INC(&param_.result_info_->skipped_);
}
} else if (ObLoadDupActionType::LOAD_REPLACE == param_.dup_action_) {
if (item->iter_idx_ == ORIGIN_IDX) {
ATOMIC_INC(&param_.result_info_->deleted_);
ATOMIC_AAF(&param_.result_info_->rows_affected_, 2);
} else {
datum_row = item->datum_row_;
}
}
if (OB_SUCC(ret)) {
consumers_[consumer_cnt_++] = item->iter_idx_;
if (OB_FAIL(rows_merger_.pop())) {
LOG_WARN("fail to pop item", KR(ret));
}
}
}
if (OB_SUCC(ret)) {
if (OB_FAIL(param_.dml_row_handler_->handle_update_row(*old_row, *new_row, datum_row))) {
LOG_WARN("fail to handle update row", KR(ret), KPC(old_row), KPC(new_row));
}
}
}
return ret;
@ -329,8 +312,7 @@ int ObDirectLoadSSTableDataFuse::init(const ObDirectLoadDataFuseParam &param,
scan_merge_param.tablet_id_ = param.tablet_id_;
scan_merge_param.table_data_desc_ = param.table_data_desc_;
scan_merge_param.datum_utils_ = param.datum_utils_;
scan_merge_param.error_row_handler_ = param.error_row_handler_;
scan_merge_param.result_info_ = param.result_info_;
scan_merge_param.dml_row_handler_ = param.dml_row_handler_;
if (OB_FAIL(scan_merge_.init(scan_merge_param, sstable_array, range))) {
LOG_WARN("fail to init scan merge", KR(ret));
}
@ -401,8 +383,7 @@ int ObDirectLoadMultipleSSTableDataFuse::init(
ObDirectLoadMultipleSSTableScanMergeParam scan_merge_param;
scan_merge_param.table_data_desc_ = param.table_data_desc_;
scan_merge_param.datum_utils_ = param.datum_utils_;
scan_merge_param.error_row_handler_ = param.error_row_handler_;
scan_merge_param.result_info_ = param.result_info_;
scan_merge_param.dml_row_handler_ = param.dml_row_handler_;
if (OB_FAIL(scan_merge_.init(scan_merge_param, sstable_array, range_))) {
LOG_WARN("fail to init scan merge", KR(ret));
}

View File

@ -4,9 +4,9 @@
#pragma once
#include "observer/table_load/ob_table_load_error_row_handler.h"
#include "share/table/ob_table_load_define.h"
#include "sql/resolver/cmd/ob_load_data_stmt.h"
#include "storage/direct_load/ob_direct_load_dml_row_handler.h"
#include "storage/direct_load/ob_direct_load_multiple_datum_range.h"
#include "storage/direct_load/ob_direct_load_multiple_sstable_scan_merge.h"
#include "storage/direct_load/ob_direct_load_origin_table.h"
@ -26,15 +26,13 @@ public:
~ObDirectLoadDataFuseParam();
bool is_valid() const;
TO_STRING_KV(K_(tablet_id), K_(store_column_count), K_(table_data_desc), KP_(datum_utils),
KP_(error_row_handler), KP_(result_info));
KP_(dml_row_handler));
public:
common::ObTabletID tablet_id_;
int64_t store_column_count_;
ObDirectLoadTableDataDesc table_data_desc_;
const blocksstable::ObStorageDatumUtils *datum_utils_;
observer::ObTableLoadErrorRowHandler *error_row_handler_;
sql::ObLoadDupActionType dup_action_;
table::ObTableLoadResultInfo *result_info_;
ObDirectLoadDMLRowHandler *dml_row_handler_;
};
class ObDirectLoadDataFuse

View File

@ -0,0 +1,31 @@
// Copyright (c) 2022-present Oceanbase Inc. All Rights Reserved.
// Author:
// suzhi.yt <>
#pragma once
#include "storage/blocksstable/ob_datum_row.h"
namespace oceanbase
{
namespace storage
{
class ObDirectLoadDMLRowHandler
{
public:
ObDirectLoadDMLRowHandler() = default;
virtual ~ObDirectLoadDMLRowHandler() = default;
// handle rows direct insert into sstable
virtual int handle_insert_row(const blocksstable::ObDatumRow &row) = 0;
// handle rows with the same primary key in the imported data
virtual int handle_update_row(const blocksstable::ObDatumRow &row) = 0;
// handle rows with the same primary key between the imported data and the original data
virtual int handle_update_row(const blocksstable::ObDatumRow &old_row,
const blocksstable::ObDatumRow &new_row,
const blocksstable::ObDatumRow *&result_row) = 0;
DECLARE_PURE_VIRTUAL_TO_STRING;
};
} // namespace storage
} // namespace oceanbase

View File

@ -60,5 +60,5 @@ private:
};
} // namespace observer
} // namespace storage
} // namespace oceanbase

View File

@ -14,7 +14,6 @@ namespace storage
{
using namespace common;
using namespace blocksstable;
using namespace observer;
/**
* ObDirectLoadExternalMultiPartitionTableBuildParam

View File

@ -9,6 +9,7 @@
#include "share/stat/ob_stat_define.h"
#include "share/table/ob_table_load_define.h"
#include "storage/ddl/ob_direct_insert_sstable_ctx.h"
#include "storage/direct_load/ob_direct_load_dml_row_handler.h"
#include "storage/direct_load/ob_direct_load_fast_heap_table.h"
#include "storage/direct_load/ob_direct_load_insert_table_ctx.h"
@ -30,7 +31,7 @@ ObDirectLoadFastHeapTableBuildParam::ObDirectLoadFastHeapTableBuildParam()
col_descs_(nullptr),
insert_table_ctx_(nullptr),
fast_heap_table_ctx_(nullptr),
result_info_(nullptr),
dml_row_handler_(nullptr),
online_opt_stat_gather_(false)
{
}
@ -43,7 +44,7 @@ bool ObDirectLoadFastHeapTableBuildParam::is_valid() const
{
return tablet_id_.is_valid() && snapshot_version_ > 0 && table_data_desc_.is_valid() &&
nullptr != col_descs_ && nullptr != insert_table_ctx_ && nullptr != fast_heap_table_ctx_ &&
nullptr != result_info_ && nullptr != datum_utils_;
nullptr != dml_row_handler_ && nullptr != datum_utils_;
}
/**
@ -225,7 +226,11 @@ int ObDirectLoadFastHeapTableBuilder::append_row(const ObTabletID &tablet_id,
LOG_WARN("fail to collect", KR(ret));
} else {
++row_count_;
ATOMIC_INC(&param_.result_info_->rows_affected_);
}
}
if (OB_SUCC(ret)) {
if (OB_FAIL(param_.dml_row_handler_->handle_insert_row(datum_row_))) {
LOG_WARN("fail to handle insert row", KR(ret), K_(datum_row));
}
}
}

View File

@ -24,6 +24,7 @@ namespace storage
{
class ObDirectLoadInsertTableContext;
class ObSSTableInsertSliceWriter;
class ObDirectLoadDMLRowHandler;
struct ObDirectLoadFastHeapTableBuildParam
{
@ -32,7 +33,7 @@ public:
~ObDirectLoadFastHeapTableBuildParam();
bool is_valid() const;
TO_STRING_KV(K_(tablet_id), K_(snapshot_version), K_(table_data_desc), KP_(insert_table_ctx),
KP_(fast_heap_table_ctx), KP_(result_info), K_(online_opt_stat_gather));
KP_(fast_heap_table_ctx), KP_(dml_row_handler), K_(online_opt_stat_gather));
public:
common::ObTabletID tablet_id_;
int64_t snapshot_version_;
@ -41,7 +42,7 @@ public:
const common::ObIArray<share::schema::ObColDesc> *col_descs_;
ObDirectLoadInsertTableContext *insert_table_ctx_;
ObDirectLoadFastHeapTableContext *fast_heap_table_ctx_;
table::ObTableLoadResultInfo *result_info_;
ObDirectLoadDMLRowHandler *dml_row_handler_;
bool online_opt_stat_gather_;
};

View File

@ -5,12 +5,12 @@
#ifndef OB_DIRECT_LOAD_MEM_CONTEXT_H_
#define OB_DIRECT_LOAD_MEM_CONTEXT_H_
#include "storage/direct_load/ob_direct_load_easy_queue.h"
#include "storage/direct_load/ob_direct_load_mem_define.h"
#include "storage/direct_load/ob_direct_load_i_table.h"
#include "storage/direct_load/ob_direct_load_table_data_desc.h"
#include "observer/table_load/ob_table_load_error_row_handler.h"
#include "share/table/ob_table_load_define.h"
#include "storage/direct_load/ob_direct_load_easy_queue.h"
#include "storage/direct_load/ob_direct_load_dml_row_handler.h"
#include "storage/direct_load/ob_direct_load_i_table.h"
#include "storage/direct_load/ob_direct_load_mem_define.h"
#include "storage/direct_load/ob_direct_load_table_data_desc.h"
namespace oceanbase
{
@ -49,8 +49,8 @@ public:
need_sort_(false),
mem_load_task_count_(0),
column_count_(0),
dml_row_handler_(nullptr),
file_mgr_(nullptr),
result_info_(nullptr),
fly_mem_chunk_count_(0),
finish_compact_count_(0),
mem_dump_task_count_(0),
@ -76,9 +76,8 @@ public:
bool need_sort_; // false: sstable, true: external_table
int32_t mem_load_task_count_;
int32_t column_count_;
observer::ObTableLoadErrorRowHandler *error_row_handler_;
ObDirectLoadDMLRowHandler *dml_row_handler_;
ObDirectLoadTmpFileManager *file_mgr_;
table::ObTableLoadResultInfo *result_info_;
ObDirectLoadEasyQueue<storage::ObDirectLoadExternalMultiPartitionRowChunk *> mem_chunk_queue_;
int64_t fly_mem_chunk_count_;

View File

@ -5,9 +5,6 @@
#define USING_LOG_PREFIX STORAGE
#include "storage/direct_load/ob_direct_load_mem_dump.h"
#include "observer/table_load/ob_table_load_mem_compactor.h"
#include "observer/table_load/ob_table_load_store_ctx.h"
#include "observer/table_load/ob_table_load_table_ctx.h"
#include "storage/direct_load/ob_direct_load_external_table.h"
#include "storage/direct_load/ob_direct_load_external_table_builder.h"
#include "storage/direct_load/ob_direct_load_external_table_compactor.h"
@ -20,7 +17,6 @@ namespace storage
{
using namespace common;
using namespace blocksstable;
using namespace observer;
using namespace table;
using namespace sql;
@ -268,22 +264,11 @@ int ObDirectLoadMemDump::dump_tables()
LOG_WARN("fail to transfer dataum row", KR(ret));
} else if (OB_FAIL(table_builder->append_row(external_row->tablet_id_, datum_row))) {
if (OB_LIKELY(OB_ERR_PRIMARY_KEY_DUPLICATE == ret)) {
int tmp_ret = OB_SUCCESS;
if (mem_ctx_->error_row_handler_->get_action() == ObLoadDupActionType::LOAD_REPLACE) {
ATOMIC_AAF(&mem_ctx_->result_info_->rows_affected_, 2);
ATOMIC_INC(&mem_ctx_->result_info_->deleted_);
} else if (mem_ctx_->error_row_handler_->get_action() == ObLoadDupActionType::LOAD_IGNORE) {
ATOMIC_INC(&mem_ctx_->result_info_->skipped_);
} else if (mem_ctx_->error_row_handler_->get_action() == ObLoadDupActionType::LOAD_STOP_ON_DUP) {
if (OB_TMP_FAIL(mem_ctx_->error_row_handler_->append_error_row(datum_row))) {
LOG_WARN("failed to append row to error row handler", K(tmp_ret), K(datum_row));
}
}
if (OB_LIKELY(OB_SUCCESS == tmp_ret)) {
ret = OB_SUCCESS;
if (OB_FAIL(mem_ctx_->dml_row_handler_->handle_update_row(datum_row))) {
LOG_WARN("fail to handle update row", KR(ret), K(datum_row));
}
} else {
LOG_WARN("fail to append row", K(ret), K(datum_row));
LOG_WARN("fail to append row", KR(ret), K(datum_row));
}
}
}

View File

@ -4,16 +4,11 @@
#define USING_LOG_PREFIX STORAGE
#include "observer/table_load/ob_table_load_partition_location.h"
#include "observer/table_load/ob_table_load_stat.h"
#include "storage/direct_load/ob_direct_load_mem_sample.h"
#include "observer/table_load/ob_table_load_store_ctx.h"
#include "observer/table_load/ob_table_load_table_ctx.h"
#include "observer/table_load/ob_table_load_task.h"
#include "observer/table_load/ob_table_load_service.h"
#include "observer/table_load/ob_table_load_task_scheduler.h"
#include "share/table/ob_table_load_handle.h"
#include "observer/table_load/ob_table_load_mem_compactor.h"
namespace oceanbase
{

View File

@ -46,8 +46,7 @@ ObDirectLoadMergeParam::ObDirectLoadMergeParam()
is_fast_heap_table_(false),
online_opt_stat_gather_(false),
insert_table_ctx_(nullptr),
error_row_handler_(nullptr),
result_info_(nullptr)
dml_row_handler_(nullptr)
{
}
@ -59,7 +58,7 @@ bool ObDirectLoadMergeParam::is_valid() const
{
return OB_INVALID_ID != table_id_ && 0 < rowkey_column_num_ && 0 < store_column_count_ &&
snapshot_version_ > 0 && table_data_desc_.is_valid() && nullptr != datum_utils_ &&
nullptr != col_descs_ && nullptr != insert_table_ctx_ && nullptr != error_row_handler_;
nullptr != col_descs_ && nullptr != insert_table_ctx_ && nullptr != dml_row_handler_;
}
/**

View File

@ -18,11 +18,6 @@
namespace oceanbase
{
namespace observer
{
class ObTableLoadErrorRowHandler;
class ObTableLoadSchema;
} // namespace observer
namespace common {
class ObOptOSGColumnStat;
class ObOptTableStat;
@ -37,6 +32,7 @@ class ObDirectLoadSSTable;
class ObDirectLoadMultipleSSTable;
class ObDirectLoadMultipleHeapTable;
class ObDirectLoadMultipleMergeRangeSplitter;
class ObDirectLoadDMLRowHandler;
struct ObDirectLoadMergeParam
{
@ -47,7 +43,7 @@ public:
TO_STRING_KV(K_(table_id), K_(target_table_id), K_(rowkey_column_num), K_(store_column_count),
K_(snapshot_version), K_(table_data_desc), KP_(datum_utils), K_(is_heap_table),
K_(is_fast_heap_table), K_(online_opt_stat_gather), KP_(insert_table_ctx),
KP_(error_row_handler), KP_(result_info));
KP_(dml_row_handler));
public:
uint64_t table_id_;
uint64_t target_table_id_;
@ -61,8 +57,7 @@ public:
bool is_fast_heap_table_;
bool online_opt_stat_gather_;
ObDirectLoadInsertTableContext *insert_table_ctx_;
observer::ObTableLoadErrorRowHandler *error_row_handler_;
table::ObTableLoadResultInfo *result_info_;
ObDirectLoadDMLRowHandler *dml_row_handler_;
};
class ObDirectLoadMergeCtx

View File

@ -5,7 +5,7 @@
#define USING_LOG_PREFIX STORAGE
#include "storage/direct_load/ob_direct_load_multiple_sstable_scan_merge.h"
#include "observer/table_load/ob_table_load_error_row_handler.h"
#include "storage/direct_load/ob_direct_load_dml_row_handler.h"
#include "storage/blocksstable/ob_datum_range.h"
#include "storage/direct_load/ob_direct_load_multiple_datum_range.h"
#include "storage/direct_load/ob_direct_load_multiple_sstable_scanner.h"
@ -16,7 +16,6 @@ namespace storage
{
using namespace common;
using namespace blocksstable;
using namespace observer;
using namespace sql;
/**
@ -24,7 +23,7 @@ using namespace sql;
*/
ObDirectLoadMultipleSSTableScanMergeParam::ObDirectLoadMultipleSSTableScanMergeParam()
: datum_utils_(nullptr), error_row_handler_(nullptr), result_info_(nullptr)
: datum_utils_(nullptr), dml_row_handler_(nullptr)
{
}
@ -32,8 +31,7 @@ ObDirectLoadMultipleSSTableScanMergeParam::~ObDirectLoadMultipleSSTableScanMerge
bool ObDirectLoadMultipleSSTableScanMergeParam::is_valid() const
{
return table_data_desc_.is_valid() && nullptr != datum_utils_ && nullptr != error_row_handler_ &&
nullptr != result_info_;
return table_data_desc_.is_valid() && nullptr != datum_utils_ && nullptr != dml_row_handler_;
}
/**
@ -42,7 +40,7 @@ bool ObDirectLoadMultipleSSTableScanMergeParam::is_valid() const
ObDirectLoadMultipleSSTableScanMerge::ObDirectLoadMultipleSSTableScanMerge()
: datum_utils_(nullptr),
error_row_handler_(nullptr),
dml_row_handler_(nullptr),
range_(nullptr),
consumers_(nullptr),
consumer_cnt_(0),
@ -59,7 +57,7 @@ void ObDirectLoadMultipleSSTableScanMerge::reset()
{
table_data_desc_.reset();
datum_utils_ = nullptr;
error_row_handler_ = nullptr;
dml_row_handler_ = nullptr;
range_ = nullptr;
for (int64_t i = 0; i < scanners_.count(); ++i) {
scanners_[i]->~ObDirectLoadMultipleSSTableScanner();
@ -131,8 +129,7 @@ int ObDirectLoadMultipleSSTableScanMerge::init(
if (OB_SUCC(ret)) {
table_data_desc_ = param.table_data_desc_;
datum_utils_ = param.datum_utils_;
error_row_handler_ = param.error_row_handler_;
result_info_ = param.result_info_;
dml_row_handler_ = param.dml_row_handler_;
range_ = &range;
is_inited_ = true;
}
@ -215,20 +212,11 @@ int ObDirectLoadMultipleSSTableScanMerge::inner_get_next_row(
} else if (OB_LIKELY(rows_merger_->is_unique_champion())) {
datum_row = top_item->row_;
} else {
// record same rowkey row
if (error_row_handler_->get_action() == ObLoadDupActionType::LOAD_REPLACE) {
ATOMIC_AAF(&result_info_->rows_affected_, 2);
ATOMIC_INC(&result_info_->deleted_);
} else if (error_row_handler_->get_action() == ObLoadDupActionType::LOAD_IGNORE) {
ATOMIC_INC(&result_info_->skipped_);
} else if (error_row_handler_->get_action() == ObLoadDupActionType::LOAD_STOP_ON_DUP) {
int tmp_ret = OB_SUCCESS;
if (OB_TMP_FAIL(top_item->row_->to_datums(datum_row_.storage_datums_,
datum_row_.count_))) {
LOG_WARN("fail to transfer external row to datums", KR(tmp_ret));
} else if (OB_TMP_FAIL(error_row_handler_->append_error_row(datum_row_))) {
LOG_WARN("fail to append row to error row handler", KR(tmp_ret), K(datum_row_));
}
// handle same rowkey row
if (OB_FAIL(top_item->row_->to_datums(datum_row_.storage_datums_, datum_row_.count_))) {
LOG_WARN("fail to transfer external row to datums", KR(ret));
} else if (OB_FAIL(dml_row_handler_->handle_update_row(datum_row_))) {
LOG_WARN("fail to handle update row", KR(ret), K(datum_row_));
}
}
if (OB_SUCC(ret)) {

View File

@ -19,12 +19,9 @@ namespace blocksstable
{
class ObStorageDatumUtils;
} // namespace blocksstable
namespace observer
{
class ObTableLoadErrorRowHandler;
} // namespace observer
namespace storage
{
class ObDirectLoadDMLRowHandler;
struct ObDirectLoadMultipleSSTableScanMergeParam
{
@ -32,12 +29,11 @@ public:
ObDirectLoadMultipleSSTableScanMergeParam();
~ObDirectLoadMultipleSSTableScanMergeParam();
bool is_valid() const;
TO_STRING_KV(K_(table_data_desc), KP_(datum_utils), KP_(error_row_handler), KP_(result_info));
TO_STRING_KV(K_(table_data_desc), KP_(datum_utils), KP_(dml_row_handler));
public:
ObDirectLoadTableDataDesc table_data_desc_;
const blocksstable::ObStorageDatumUtils *datum_utils_;
observer::ObTableLoadErrorRowHandler *error_row_handler_;
table::ObTableLoadResultInfo *result_info_;
ObDirectLoadDMLRowHandler *dml_row_handler_;
};
class ObDirectLoadMultipleSSTableScanMerge : public ObIStoreRowIterator
@ -66,8 +62,7 @@ private:
common::ObArenaAllocator allocator_;
ObDirectLoadTableDataDesc table_data_desc_;
const blocksstable::ObStorageDatumUtils *datum_utils_;
observer::ObTableLoadErrorRowHandler *error_row_handler_;
table::ObTableLoadResultInfo *result_info_;
ObDirectLoadDMLRowHandler *dml_row_handler_;
const ObDirectLoadMultipleDatumRange *range_;
common::ObSEArray<ObDirectLoadMultipleSSTableScanner *, 64> scanners_;
int64_t *consumers_;

View File

@ -16,7 +16,6 @@ namespace storage
{
using namespace common;
using namespace blocksstable;
using namespace observer;
using namespace share;
using namespace share::schema;

View File

@ -21,7 +21,6 @@ namespace storage
{
using namespace common;
using namespace blocksstable;
using namespace observer;
using namespace share;
/**
@ -230,9 +229,7 @@ int ObDirectLoadPartitionRangeMergeTask::RowIterator::init(
data_fuse_param.store_column_count_ = merge_param.store_column_count_;
data_fuse_param.table_data_desc_ = merge_param.table_data_desc_;
data_fuse_param.datum_utils_ = merge_param.datum_utils_;
data_fuse_param.error_row_handler_ = merge_param.error_row_handler_;
data_fuse_param.dup_action_ = merge_param.error_row_handler_->get_action();
data_fuse_param.result_info_ = merge_param.result_info_;
data_fuse_param.dml_row_handler_ = merge_param.dml_row_handler_;
if (OB_FAIL(data_fuse_.init(data_fuse_param, origin_table, sstable_array, range))) {
LOG_WARN("fail to init data fuse", KR(ret));
}
@ -386,9 +383,7 @@ int ObDirectLoadPartitionRangeMultipleMergeTask::RowIterator::init(
data_fuse_param.store_column_count_ = merge_param.store_column_count_;
data_fuse_param.table_data_desc_ = merge_param.table_data_desc_;
data_fuse_param.datum_utils_ = merge_param.datum_utils_;
data_fuse_param.error_row_handler_ = merge_param.error_row_handler_;
data_fuse_param.dup_action_ = merge_param.error_row_handler_->get_action();
data_fuse_param.result_info_ = merge_param.result_info_;
data_fuse_param.dml_row_handler_ = merge_param.dml_row_handler_;
if (OB_FAIL(data_fuse_.init(data_fuse_param, origin_table, sstable_array, range))) {
LOG_WARN("fail to init data fuse", KR(ret));
}
@ -516,7 +511,7 @@ int ObDirectLoadPartitionRangeMultipleMergeTask::construct_row_iter(
ObDirectLoadPartitionHeapTableMergeTask::RowIterator::RowIterator()
: deserialize_datums_(nullptr),
deserialize_datum_cnt_(0),
result_info_(nullptr),
dml_row_handler_(nullptr),
is_inited_(false)
{
}
@ -562,7 +557,7 @@ int ObDirectLoadPartitionHeapTableMergeTask::RowIterator::init(
ObMultiVersionRowkeyHelpper::get_extra_rowkey_col_cnt();
deserialize_datum_cnt_ = merge_param.store_column_count_ - merge_param.rowkey_column_num_;
pk_interval_ = pk_interval;
result_info_ = merge_param.result_info_;
dml_row_handler_ = merge_param.dml_row_handler_;
is_inited_ = true;
}
}
@ -592,7 +587,11 @@ int ObDirectLoadPartitionHeapTableMergeTask::RowIterator::get_next_row(
// fill hide pk
datum_row_.storage_datums_[0].set_int(pk_seq);
result_row = &datum_row_;
ATOMIC_INC(&result_info_->rows_affected_);
}
if (OB_SUCC(ret)) {
if (OB_FAIL(dml_row_handler_->handle_insert_row(*result_row))) {
LOG_WARN("fail to handle insert row", KR(ret), KPC(result_row));
}
}
}
return ret;
@ -671,7 +670,7 @@ int ObDirectLoadPartitionHeapTableMergeTask::construct_row_iter(
ObDirectLoadPartitionHeapTableMultipleMergeTask::RowIterator::RowIterator()
: deserialize_datums_(nullptr),
deserialize_datum_cnt_(0),
result_info_(nullptr),
dml_row_handler_(nullptr),
is_inited_(false)
{
}
@ -715,7 +714,7 @@ int ObDirectLoadPartitionHeapTableMultipleMergeTask::RowIterator::init(
ObMultiVersionRowkeyHelpper::get_extra_rowkey_col_cnt();
deserialize_datum_cnt_ = merge_param.store_column_count_ - merge_param.rowkey_column_num_;
pk_interval_ = pk_interval;
result_info_ = merge_param.result_info_;
dml_row_handler_ = merge_param.dml_row_handler_;
is_inited_ = true;
}
}
@ -746,7 +745,11 @@ int ObDirectLoadPartitionHeapTableMultipleMergeTask::RowIterator::get_next_row(
// fill hide pk
datum_row_.storage_datums_[0].set_int(pk_seq);
result_row = &datum_row_;
ATOMIC_INC(&result_info_->rows_affected_);
}
if (OB_SUCC(ret)) {
if (OB_FAIL(dml_row_handler_->handle_insert_row(*result_row))) {
LOG_WARN("fail to handle insert row", KR(ret), KPC(result_row));
}
}
}
return ret;
@ -829,7 +832,7 @@ ObDirectLoadPartitionHeapTableMultipleAggregateMergeTask::RowIterator::RowIterat
pos_(0),
deserialize_datums_(nullptr),
deserialize_datum_cnt_(0),
result_info_(nullptr),
dml_row_handler_(nullptr),
is_inited_(false)
{
}
@ -882,7 +885,7 @@ int ObDirectLoadPartitionHeapTableMultipleAggregateMergeTask::RowIterator::init(
table_data_desc_ = merge_param.table_data_desc_;
heap_table_array_ = heap_table_array;
pk_interval_ = pk_interval;
result_info_ = merge_param.result_info_;
dml_row_handler_ = merge_param.dml_row_handler_;
is_inited_ = true;
}
}
@ -899,6 +902,7 @@ int ObDirectLoadPartitionHeapTableMultipleAggregateMergeTask::RowIterator::get_n
LOG_WARN("ObDirectLoadPartitionHeapTableMultipleAggregateMergeTask::RowIterator not init",
KR(ret), KP(this));
} else {
// get row from origin table
if (pos_ == 0) {
const ObDatumRow *datum_row = nullptr;
if (OB_FAIL(origin_iter_->get_next_row(datum_row))) {
@ -931,6 +935,7 @@ int ObDirectLoadPartitionHeapTableMultipleAggregateMergeTask::RowIterator::get_n
result_row = &datum_row_;
}
}
// get row from load data
while (OB_SUCC(ret) && result_row == nullptr) {
const ObDirectLoadMultipleExternalRow *external_row = nullptr;
uint64_t pk_seq = OB_INVALID_ID;
@ -954,7 +959,11 @@ int ObDirectLoadPartitionHeapTableMultipleAggregateMergeTask::RowIterator::get_n
// fill hide pk
datum_row_.storage_datums_[0].set_int(pk_seq);
result_row = &datum_row_;
ATOMIC_INC(&result_info_->rows_affected_);
}
if (OB_SUCC(ret) && nullptr != result_row) {
if (OB_FAIL(dml_row_handler_->handle_insert_row(*result_row))) {
LOG_WARN("fail to handle insert row", KR(ret), KPC(result_row));
}
}
}
}

View File

@ -162,7 +162,7 @@ private:
blocksstable::ObStorageDatum *deserialize_datums_;
int64_t deserialize_datum_cnt_;
share::ObTabletCacheInterval pk_interval_;
table::ObTableLoadResultInfo *result_info_;
ObDirectLoadDMLRowHandler *dml_row_handler_;
bool is_inited_;
};
private:
@ -199,7 +199,7 @@ private:
blocksstable::ObStorageDatum *deserialize_datums_;
int64_t deserialize_datum_cnt_;
share::ObTabletCacheInterval pk_interval_;
table::ObTableLoadResultInfo *result_info_;
ObDirectLoadDMLRowHandler *dml_row_handler_;
bool is_inited_;
};
private:
@ -249,7 +249,7 @@ private:
blocksstable::ObStorageDatum *deserialize_datums_;
int64_t deserialize_datum_cnt_;
share::ObTabletCacheInterval pk_interval_;
table::ObTableLoadResultInfo *result_info_;
ObDirectLoadDMLRowHandler *dml_row_handler_;
bool is_inited_;
};
private:

View File

@ -13,7 +13,6 @@ namespace storage
{
using namespace common;
using namespace blocksstable;
using namespace observer;
/**
* ObDirectLoadIndexBlock

View File

@ -5,8 +5,8 @@
#define USING_LOG_PREFIX STORAGE
#include "storage/direct_load/ob_direct_load_sstable_scan_merge.h"
#include "observer/table_load/ob_table_load_error_row_handler.h"
#include "storage/blocksstable/ob_datum_range.h"
#include "storage/direct_load/ob_direct_load_dml_row_handler.h"
#include "storage/direct_load/ob_direct_load_sstable_scanner.h"
namespace oceanbase
@ -15,7 +15,6 @@ namespace storage
{
using namespace common;
using namespace blocksstable;
using namespace observer;
using namespace sql;
/**
@ -23,7 +22,7 @@ using namespace sql;
*/
ObDirectLoadSSTableScanMergeParam::ObDirectLoadSSTableScanMergeParam()
: datum_utils_(nullptr), error_row_handler_(nullptr), result_info_(nullptr)
: datum_utils_(nullptr), dml_row_handler_(nullptr)
{
}
@ -34,7 +33,7 @@ ObDirectLoadSSTableScanMergeParam::~ObDirectLoadSSTableScanMergeParam()
bool ObDirectLoadSSTableScanMergeParam::is_valid() const
{
return tablet_id_.is_valid() && table_data_desc_.is_valid() && nullptr != datum_utils_ &&
nullptr != error_row_handler_ && nullptr != result_info_;
nullptr != dml_row_handler_;
}
/**
@ -43,7 +42,7 @@ bool ObDirectLoadSSTableScanMergeParam::is_valid() const
ObDirectLoadSSTableScanMerge::ObDirectLoadSSTableScanMerge()
: datum_utils_(nullptr),
error_row_handler_(nullptr),
dml_row_handler_(nullptr),
range_(nullptr),
consumers_(nullptr),
consumer_cnt_(0),
@ -64,7 +63,7 @@ void ObDirectLoadSSTableScanMerge::reset()
tablet_id_.reset();
table_data_desc_.reset();
datum_utils_ = nullptr;
error_row_handler_ = nullptr;
dml_row_handler_ = nullptr;
range_ = nullptr;
for (int64_t i = 0; i < scanners_.count(); ++i) {
scanners_[i]->~ObDirectLoadSSTableScanner();
@ -136,8 +135,7 @@ int ObDirectLoadSSTableScanMerge::init(const ObDirectLoadSSTableScanMergeParam &
tablet_id_ = param.tablet_id_;
table_data_desc_ = param.table_data_desc_;
datum_utils_ = param.datum_utils_;
error_row_handler_ = param.error_row_handler_;
result_info_ = param.result_info_;
dml_row_handler_ = param.dml_row_handler_;
range_ = &range;
is_inited_ = true;
}
@ -219,20 +217,11 @@ int ObDirectLoadSSTableScanMerge::inner_get_next_row(const ObDirectLoadExternalR
} else if (OB_LIKELY(rows_merger_->is_unique_champion())) {
external_row = top_item->external_row_;
} else {
// record same rowkey row
if (error_row_handler_->get_action() == ObLoadDupActionType::LOAD_REPLACE) {
ATOMIC_AAF(&result_info_->rows_affected_, 2);
ATOMIC_INC(&result_info_->deleted_);
} else if (error_row_handler_->get_action() == ObLoadDupActionType::LOAD_IGNORE) {
ATOMIC_INC(&result_info_->skipped_);
} else if (error_row_handler_->get_action() == ObLoadDupActionType::LOAD_STOP_ON_DUP) {
int tmp_ret = OB_SUCCESS;
if (OB_TMP_FAIL(top_item->external_row_->to_datums(datum_row_.storage_datums_,
datum_row_.count_))) {
LOG_WARN("fail to transfer external row to datums", KR(tmp_ret));
} else if (OB_TMP_FAIL(error_row_handler_->append_error_row(datum_row_))) {
LOG_WARN("fail to append row to error row handler", KR(tmp_ret), K(datum_row_));
}
// handle same rowkey row
if (OB_FAIL(top_item->external_row_->to_datums(datum_row_.storage_datums_, datum_row_.count_))) {
LOG_WARN("fail to transfer external row to datums", KR(ret));
} else if (OB_FAIL(dml_row_handler_->handle_update_row(datum_row_))) {
LOG_WARN("fail to handle update row", KR(ret), K(datum_row_));
}
}
if (OB_SUCC(ret)) {

View File

@ -20,26 +20,23 @@ namespace blocksstable
class ObDatumRange;
class ObStorageDatumUtils;
} // namespace blocksstable
namespace observer
{
class ObTableLoadErrorRowHandler;
} // namespace observer
namespace storage
{
class ObDirectLoadExternalRow;
class ObDirectLoadDMLRowHandler;
struct ObDirectLoadSSTableScanMergeParam
{
public:
ObDirectLoadSSTableScanMergeParam();
~ObDirectLoadSSTableScanMergeParam();
bool is_valid() const;
TO_STRING_KV(K_(tablet_id), K_(table_data_desc), KP_(datum_utils), KP_(error_row_handler), KP_(result_info));
TO_STRING_KV(K_(tablet_id), K_(table_data_desc), KP_(datum_utils), KP_(dml_row_handler));
public:
common::ObTabletID tablet_id_;
ObDirectLoadTableDataDesc table_data_desc_;
const blocksstable::ObStorageDatumUtils *datum_utils_;
observer::ObTableLoadErrorRowHandler *error_row_handler_;
table::ObTableLoadResultInfo *result_info_;
ObDirectLoadDMLRowHandler *dml_row_handler_;
};
class ObDirectLoadSSTableScanMerge : public ObIStoreRowIterator
@ -69,8 +66,7 @@ private:
common::ObTabletID tablet_id_;
ObDirectLoadTableDataDesc table_data_desc_;
const blocksstable::ObStorageDatumUtils *datum_utils_;
observer::ObTableLoadErrorRowHandler *error_row_handler_;
table::ObTableLoadResultInfo *result_info_;
ObDirectLoadDMLRowHandler *dml_row_handler_;
const blocksstable::ObDatumRange *range_;
common::ObSEArray<ObDirectLoadSSTableScanner *, 64> scanners_;
int64_t *consumers_;

View File

@ -13,7 +13,6 @@ namespace storage
{
using namespace common;
using namespace blocksstable;
using namespace observer;
/**
* ObDirectLoadSSTableScanner

View File

@ -32,9 +32,9 @@ ObDirectLoadTableStoreParam::ObDirectLoadTableStoreParam()
is_fast_heap_table_(false),
insert_table_ctx_(nullptr),
fast_heap_table_ctx_(nullptr),
dml_row_handler_(nullptr),
extra_buf_(nullptr),
extra_buf_size_(0),
result_info_(nullptr)
extra_buf_size_(0)
{
}
@ -48,7 +48,7 @@ bool ObDirectLoadTableStoreParam::is_valid() const
nullptr != file_mgr_ &&
(!is_fast_heap_table_ ||
(nullptr != insert_table_ctx_ && nullptr != fast_heap_table_ctx_)) &&
nullptr != result_info_;
nullptr != dml_row_handler_;
}
/**
@ -105,7 +105,7 @@ int ObDirectLoadTableStoreBucket::init(const ObDirectLoadTableStoreParam &param,
fast_heap_table_build_param.col_descs_ = param.col_descs_;
fast_heap_table_build_param.insert_table_ctx_ = param.insert_table_ctx_;
fast_heap_table_build_param.fast_heap_table_ctx_ = param.fast_heap_table_ctx_;
fast_heap_table_build_param.result_info_ = param.result_info_;
fast_heap_table_build_param.dml_row_handler_ = param.dml_row_handler_;
fast_heap_table_build_param.online_opt_stat_gather_ = param.online_opt_stat_gather_;
ObDirectLoadFastHeapTableBuilder *fast_heap_table_builder = nullptr;
if (OB_ISNULL(fast_heap_table_builder =

View File

@ -19,6 +19,7 @@ class ObDirectLoadTmpFileManager;
class ObDirectLoadTableBuilderAllocator;
class ObDirectLoadInsertTableContext;
class ObDirectLoadFastHeapTableContext;
class ObDirectLoadDMLRowHandler;
struct ObDirectLoadTableStoreParam
{
@ -28,7 +29,7 @@ public:
bool is_valid() const;
TO_STRING_KV(K_(snapshot_version), K_(table_data_desc), KP_(datum_utils), KP_(col_descs),
KP_(file_mgr), K_(is_multiple_mode), K_(is_fast_heap_table), KP_(insert_table_ctx),
KP_(fast_heap_table_ctx), KP_(extra_buf), K_(extra_buf_size), KP_(result_info));
KP_(fast_heap_table_ctx), KP_(dml_row_handler), KP_(extra_buf), K_(extra_buf_size));
public:
int64_t snapshot_version_;
ObDirectLoadTableDataDesc table_data_desc_;
@ -40,9 +41,9 @@ public:
bool online_opt_stat_gather_;
ObDirectLoadInsertTableContext *insert_table_ctx_;
ObDirectLoadFastHeapTableContext *fast_heap_table_ctx_;
ObDirectLoadDMLRowHandler *dml_row_handler_;
char *extra_buf_;
int64_t extra_buf_size_;
table::ObTableLoadResultInfo *result_info_;
};
class ObDirectLoadTableStoreBucket