[FEAT MERGE] patch feature 'direct load from backup data' to 432
This commit is contained in:
@ -23,6 +23,7 @@
|
||||
#include "share/schema/ob_schema_getter_guard.h"
|
||||
#include "share/ob_device_manager.h"
|
||||
#include "share/backup/ob_backup_io_adapter.h"
|
||||
#include "observer/table_load/backup/ob_table_load_backup_table.h"
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
@ -909,7 +910,8 @@ int ObLoadDataDirectImpl::SimpleDataSplitUtils::split(const DataAccessParam &dat
|
||||
*/
|
||||
|
||||
ObLoadDataDirectImpl::FileLoadExecutor::FileLoadExecutor()
|
||||
: execute_param_(nullptr),
|
||||
: allocator_("TLD_FileLoad"),
|
||||
execute_param_(nullptr),
|
||||
execute_ctx_(nullptr),
|
||||
task_scheduler_(nullptr),
|
||||
worker_count_(0),
|
||||
@ -917,6 +919,7 @@ ObLoadDataDirectImpl::FileLoadExecutor::FileLoadExecutor()
|
||||
total_line_count_(0),
|
||||
is_inited_(false)
|
||||
{
|
||||
allocator_.set_tenant_id(MTL_ID());
|
||||
handle_resource_.set_tenant_id(MTL_ID());
|
||||
}
|
||||
|
||||
@ -943,6 +946,7 @@ ObLoadDataDirectImpl::FileLoadExecutor::~FileLoadExecutor()
|
||||
execute_ctx_->allocator_->free(handle);
|
||||
}
|
||||
handle_resource_.reset();
|
||||
trans_ctx_.reset();
|
||||
}
|
||||
|
||||
int ObLoadDataDirectImpl::FileLoadExecutor::inner_init(const LoadExecuteParam &execute_param,
|
||||
@ -1034,6 +1038,10 @@ int ObLoadDataDirectImpl::FileLoadExecutor::execute()
|
||||
} else {
|
||||
if (OB_FAIL(prepare_execute())) {
|
||||
LOG_WARN("fail to prepare execute", KR(ret));
|
||||
} else if (OB_FAIL(execute_ctx_->direct_loader_->start_trans(trans_ctx_,
|
||||
ObTableLoadInstance::DEFAULT_SEGMENT_ID,
|
||||
allocator_))) {
|
||||
LOG_WARN("fail to start trans", KR(ret));
|
||||
}
|
||||
|
||||
LOG_TRACE("file load executor prepare execute done", K(ret));
|
||||
@ -1075,6 +1083,12 @@ int ObLoadDataDirectImpl::FileLoadExecutor::execute()
|
||||
LOG_WARN("fail to handle all task result", KR(ret));
|
||||
}
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret)) {
|
||||
if (OB_FAIL(execute_ctx_->direct_loader_->commit_trans(trans_ctx_))) {
|
||||
LOG_WARN("fail to commit trans", KR(ret));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
LOG_TRACE("large file load executor init done", K(ret));
|
||||
@ -1266,7 +1280,8 @@ int ObLoadDataDirectImpl::FileLoadExecutor::process_task_handle(TaskHandle *hand
|
||||
} // end while()
|
||||
|
||||
if (OB_SUCC(ret) && (processed_line_count > 0)) {
|
||||
if (OB_FAIL(execute_ctx_->direct_loader_->write(handle->session_id_, obj_rows))) {
|
||||
if (OB_FAIL(execute_ctx_->direct_loader_->write_trans(trans_ctx_, handle->session_id_,
|
||||
obj_rows))) {
|
||||
LOG_WARN("fail to write objs", KR(ret));
|
||||
} else {
|
||||
total_processed_line_count += processed_line_count;
|
||||
@ -1718,6 +1733,413 @@ int ObLoadDataDirectImpl::MultiFilesLoadExecutor::fill_task(TaskHandle *handle,
|
||||
return ret;
|
||||
}
|
||||
|
||||
/**
|
||||
* BackupLoadTaskProcessor
|
||||
*/
|
||||
|
||||
class ObLoadDataDirectImpl::BackupLoadTaskProcessor : public ObITableLoadTaskProcessor
|
||||
{
|
||||
public:
|
||||
BackupLoadTaskProcessor(ObTableLoadTask &task, BackupLoadExecutor *load_executor,
|
||||
int32_t session_id)
|
||||
: ObITableLoadTaskProcessor(task), load_executor_(load_executor), session_id_(session_id)
|
||||
{
|
||||
}
|
||||
virtual ~BackupLoadTaskProcessor() = default;
|
||||
int process() override;
|
||||
INHERIT_TO_STRING_KV("task_processor", ObITableLoadTaskProcessor, KP_(load_executor));
|
||||
private:
|
||||
BackupLoadExecutor *load_executor_;
|
||||
int32_t session_id_;
|
||||
};
|
||||
|
||||
int ObLoadDataDirectImpl::BackupLoadTaskProcessor::process()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int64_t partition_idx = -1;
|
||||
int64_t subpart_count = 0;
|
||||
int64_t subpart_idx = -1;
|
||||
while (OB_SUCC(ret)) {
|
||||
if (OB_FAIL(load_executor_->check_status())) {
|
||||
LOG_WARN("fail to check status", KR(ret), K(session_id_));
|
||||
} else if (OB_FAIL(load_executor_->get_next_partition_task(partition_idx, subpart_count,
|
||||
subpart_idx))) {
|
||||
if (OB_UNLIKELY(OB_ITER_END != ret)) {
|
||||
LOG_WARN("fail to get next partition idx", KR(ret));
|
||||
} else {
|
||||
ret = OB_SUCCESS;
|
||||
break;
|
||||
}
|
||||
} else if (OB_FAIL(load_executor_->process_partition(session_id_, partition_idx, subpart_count,
|
||||
subpart_idx))) {
|
||||
LOG_WARN("fail to process partition", KR(ret), K(session_id_), K(partition_idx),
|
||||
K(subpart_count), K(subpart_idx));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
/**
|
||||
* BackupLoadTaskCallback
|
||||
*/
|
||||
|
||||
class ObLoadDataDirectImpl::BackupLoadTaskCallback : public ObITableLoadTaskCallback
|
||||
{
|
||||
public:
|
||||
BackupLoadTaskCallback(BackupLoadExecutor *load_executor) : load_executor_(load_executor) {}
|
||||
virtual ~BackupLoadTaskCallback() = default;
|
||||
void callback(int ret_code, ObTableLoadTask *task) override
|
||||
{
|
||||
load_executor_->task_finished(task, ret_code);
|
||||
}
|
||||
private:
|
||||
BackupLoadExecutor *load_executor_;
|
||||
};
|
||||
|
||||
/**
|
||||
* BackupLoadExecutor
|
||||
*/
|
||||
|
||||
ObLoadDataDirectImpl::BackupLoadExecutor::BackupLoadExecutor()
|
||||
: allocator_("TLD_BackupLoad"),
|
||||
execute_param_(nullptr),
|
||||
execute_ctx_(nullptr),
|
||||
backup_table_(nullptr),
|
||||
task_scheduler_(nullptr),
|
||||
worker_count_(0),
|
||||
partition_count_(0),
|
||||
subpart_count_(0),
|
||||
next_partition_idx_(0),
|
||||
next_subpart_idx_(0),
|
||||
total_line_count_(0),
|
||||
task_error_code_(OB_SUCCESS),
|
||||
is_inited_(false)
|
||||
{
|
||||
allocator_.set_tenant_id(MTL_ID());
|
||||
}
|
||||
|
||||
ObLoadDataDirectImpl::BackupLoadExecutor::~BackupLoadExecutor()
|
||||
{
|
||||
if (nullptr != task_scheduler_) {
|
||||
task_scheduler_->stop();
|
||||
task_scheduler_->wait();
|
||||
task_scheduler_->~ObITableLoadTaskScheduler();
|
||||
allocator_.free(task_scheduler_);
|
||||
task_scheduler_ = nullptr;
|
||||
}
|
||||
if (nullptr != backup_table_) {
|
||||
backup_table_->~ObTableLoadBackupTable();
|
||||
allocator_.free(backup_table_);
|
||||
backup_table_ = nullptr;
|
||||
}
|
||||
}
|
||||
|
||||
int ObLoadDataDirectImpl::BackupLoadExecutor::init(const LoadExecuteParam &execute_param,
|
||||
LoadExecuteContext &execute_ctx,
|
||||
const ObString &path)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (IS_INIT) {
|
||||
ret = OB_INIT_TWICE;
|
||||
LOG_WARN("ObLoadDataDirectImpl::BackupLoadExecutor init twice", KR(ret), KP(this));
|
||||
} else if (OB_UNLIKELY(!execute_param.is_valid() || !execute_ctx.is_valid() || path.empty())) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid args", KR(ret), K(execute_param), K(execute_ctx), K(path));
|
||||
} else {
|
||||
execute_param_ = &execute_param;
|
||||
execute_ctx_ = &execute_ctx;
|
||||
const DataAccessParam &data_access_param = execute_param.data_access_param_;
|
||||
if (OB_FAIL(check_support_direct_load())) {
|
||||
LOG_WARN("fail to check support direct load", KR(ret));
|
||||
} else if (OB_FAIL(ObTableLoadBackupTable::get_table(ObTableLoadBackupVersion::V_1_4,
|
||||
backup_table_, allocator_))) {
|
||||
LOG_WARN("fail to get backup table", KR(ret));
|
||||
} else if (OB_FAIL(backup_table_->init(&data_access_param.access_info_, path))) {
|
||||
LOG_WARN("fail to init backup table", KR(ret));
|
||||
} else {
|
||||
const int64_t max_partition_count = ObTableLoadSequenceNo::MAX_BACKUP_PARTITION_IDX + 1;
|
||||
const int64_t max_subpart_count = ObTableLoadSequenceNo::MAX_BACKUP_SUBPART_IDX + 1;
|
||||
worker_count_ = execute_param.thread_count_;
|
||||
partition_count_ = backup_table_->get_partition_count();
|
||||
if (OB_UNLIKELY(worker_count_ <= 0)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unexpected worker count", KR(ret), K(worker_count_));
|
||||
} else if (OB_UNLIKELY(partition_count_ <= 0)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unexpected partition count", KR(ret), K(partition_count_));
|
||||
} else if (OB_UNLIKELY(partition_count_ > max_partition_count)) {
|
||||
ret = OB_NOT_SUPPORTED;
|
||||
LOG_WARN("table partition too much", KR(ret), K(partition_count_), K(max_partition_count));
|
||||
} else {
|
||||
// 计算subpart_count_
|
||||
if (partition_count_ >= worker_count_ * MIN_TASK_PER_WORKER) {
|
||||
// 分区数目足够, 不对分区进行拆分
|
||||
subpart_count_ = 1;
|
||||
} else {
|
||||
subpart_count_ =
|
||||
(worker_count_ * MIN_TASK_PER_WORKER + partition_count_ - 1) / partition_count_;
|
||||
subpart_count_ = MIN(subpart_count_, max_subpart_count);
|
||||
}
|
||||
if (OB_FAIL(task_controller_.init(worker_count_))) {
|
||||
LOG_WARN("fail to init task controller", KR(ret), K(worker_count_));
|
||||
} else if (OB_FAIL(task_allocator_.init("TLD_TaskPool", execute_param_->tenant_id_))) {
|
||||
LOG_WARN("fail to init allocator", KR(ret));
|
||||
} else if (OB_ISNULL(task_scheduler_ =
|
||||
OB_NEWx(ObTableLoadTaskThreadPoolScheduler, &allocator_,
|
||||
worker_count_, execute_param_->table_id_, "Parse"))) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
LOG_WARN("fail to new ObTableLoadTaskThreadPoolScheduler", KR(ret));
|
||||
} else if (OB_FAIL(task_scheduler_->init())) {
|
||||
LOG_WARN("fail to init task scheduler", KR(ret));
|
||||
} else if (OB_FAIL(task_scheduler_->start())) {
|
||||
LOG_WARN("fail to start task scheduler", KR(ret));
|
||||
}
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
is_inited_ = true;
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObLoadDataDirectImpl::BackupLoadExecutor::check_support_direct_load()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
const uint64_t tenant_id = MTL_ID();
|
||||
const uint64_t table_id = execute_param_->table_id_;
|
||||
ObSchemaGetterGuard schema_guard;
|
||||
const ObTableSchema *table_schema = nullptr;
|
||||
bool has_lob_column = false;
|
||||
if (OB_FAIL(
|
||||
ObTableLoadSchema::get_table_schema(tenant_id, table_id, schema_guard, table_schema))) {
|
||||
LOG_WARN("fail to get table schema", KR(ret), K(tenant_id), K(table_id));
|
||||
}
|
||||
// check has lob column
|
||||
else if (OB_FAIL(ObTableLoadSchema::check_has_lob_column(table_schema, has_lob_column))) {
|
||||
LOG_WARN("fail to check has lob column", KR(ret));
|
||||
} else if (has_lob_column) {
|
||||
ret = OB_NOT_SUPPORTED;
|
||||
LOG_WARN("direct-load backup does not support table has lob column", KR(ret));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObLoadDataDirectImpl::BackupLoadExecutor::execute()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("ObLoadDataDirectImpl::BackupLoadExecutor not init", KR(ret), KP(this));
|
||||
} else {
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < worker_count_; ++i) {
|
||||
if (OB_FAIL(task_controller_.on_next_task())) {
|
||||
LOG_WARN("fail to on next task", KR(ret));
|
||||
} else {
|
||||
ObTableLoadTask *task = nullptr;
|
||||
// 1. 分配task
|
||||
if (OB_ISNULL(task = task_allocator_.alloc(execute_param_->tenant_id_))) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
LOG_WARN("fail to alloc task", KR(ret));
|
||||
}
|
||||
// 2. 设置processor
|
||||
else if (OB_FAIL(task->set_processor<BackupLoadTaskProcessor>(this, i + 1))) {
|
||||
LOG_WARN("fail to set backup load task processor", KR(ret));
|
||||
}
|
||||
// 3. 设置callback
|
||||
else if (OB_FAIL(task->set_callback<BackupLoadTaskCallback>(this))) {
|
||||
LOG_WARN("fail to set backup load task callback", KR(ret));
|
||||
}
|
||||
// 4. 把task放入调度器
|
||||
else if (OB_FAIL(task_scheduler_->add_task(i, task))) {
|
||||
LOG_WARN("fail to add task", KR(ret), K(i), KPC(task));
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
task_controller_.on_task_finished();
|
||||
if (nullptr != task) {
|
||||
task_allocator_.free(task);
|
||||
task = nullptr;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
task_controller_.wait_all_task_finish(execute_param_->combined_name_.ptr(), THIS_WORKER.get_timeout_ts());
|
||||
|
||||
if (OB_SUCC(ret)) {
|
||||
if (OB_FAIL(ATOMIC_LOAD(&task_error_code_))) {
|
||||
LOG_WARN("task error", KR(ret));
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObLoadDataDirectImpl::BackupLoadExecutor::get_next_partition_task(int64_t &partition_idx,
|
||||
int64_t &subpart_count,
|
||||
int64_t &subpart_idx)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObMutexGuard guard(mutex_);
|
||||
if (next_partition_idx_ >= partition_count_) {
|
||||
ret = OB_ITER_END;
|
||||
} else {
|
||||
partition_idx = next_partition_idx_;
|
||||
subpart_count = subpart_count_;
|
||||
subpart_idx = next_subpart_idx_++;
|
||||
if (next_subpart_idx_ >= subpart_count_) {
|
||||
// switch next partition
|
||||
next_partition_idx_++;
|
||||
next_subpart_idx_ = 0;
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObLoadDataDirectImpl::BackupLoadExecutor::process_partition(int32_t session_id,
|
||||
int64_t partition_idx,
|
||||
int64_t subpart_count,
|
||||
int64_t subpart_idx)
|
||||
{
|
||||
LOG_INFO("start process partition", K(partition_idx), K(subpart_count), K(subpart_idx));
|
||||
int ret = OB_SUCCESS;
|
||||
int64_t total_processed_line_count = 0;
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("ObLoadDataDirectImpl::BackupLoadExecutor not init", KR(ret), KP(this));
|
||||
} else if (OB_UNLIKELY(partition_idx < 0 || partition_idx >= partition_count_ ||
|
||||
subpart_count <= 0 || subpart_idx < 0 || subpart_idx >= subpart_count)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid args", KR(ret), K(partition_idx), K(partition_count_), K(subpart_idx),
|
||||
K(subpart_count));
|
||||
} else {
|
||||
ObArenaAllocator allocator("TLD_BackupLoad");
|
||||
allocator.set_tenant_id(MTL_ID());
|
||||
const int64_t column_count = execute_param_->data_access_param_.file_column_num_;
|
||||
ObTableLoadInstance *direct_loader = execute_ctx_->direct_loader_;
|
||||
ObTableLoadInstance::TransCtx trans_ctx;
|
||||
ObNewRowIterator *row_iter = nullptr;
|
||||
ObNewRow *new_row = nullptr;
|
||||
bool is_iter_end = false;
|
||||
int64_t processed_line_count = 0;
|
||||
const bool is_heap_table = direct_loader->get_table_ctx()->schema_.is_heap_table_;
|
||||
ObTableLoadSequenceNo sequence_no(
|
||||
(partition_idx << ObTableLoadSequenceNo::BACKUP_PARTITION_IDX_SHIFT) +
|
||||
(subpart_idx << ObTableLoadSequenceNo::BACKUP_SUBPART_IDX_SHIFT));
|
||||
const int64_t segment_id = partition_idx * subpart_count + subpart_idx + 1;
|
||||
if (OB_FAIL(direct_loader->start_trans(trans_ctx, segment_id, allocator))) {
|
||||
LOG_WARN("fail to start trans", KR(ret), K(segment_id), K(partition_idx), K(subpart_count),
|
||||
K(subpart_idx));
|
||||
} else if (OB_FAIL(backup_table_->scan(partition_idx, row_iter, allocator, subpart_count,
|
||||
subpart_idx))) {
|
||||
LOG_WARN("fail to scan backup table partition", KR(ret), KP_(backup_table), K(partition_idx),
|
||||
K(subpart_count), K(subpart_idx));
|
||||
}
|
||||
while (OB_SUCC(ret) && !is_iter_end) {
|
||||
// 每个新的batch需要分配一个新的shared_allocator
|
||||
ObTableLoadSharedAllocatorHandle allocator_handle;
|
||||
ObTableLoadObjRowArray obj_rows;
|
||||
processed_line_count = 0;
|
||||
if (OB_FAIL(check_status())) {
|
||||
LOG_WARN("fail to check status", KR(ret), K(partition_idx), K(session_id));
|
||||
} else {
|
||||
allocator_handle = ObTableLoadSharedAllocatorHandle::make_handle(
|
||||
"TLD_share_alloc", OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID());
|
||||
if (OB_UNLIKELY(!allocator_handle)) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
LOG_WARN("failed to make allocator handle", KR(ret));
|
||||
} else {
|
||||
obj_rows.set_allocator(allocator_handle);
|
||||
}
|
||||
}
|
||||
|
||||
while (OB_SUCC(ret) && processed_line_count < execute_param_->batch_row_count_) {
|
||||
if (OB_FAIL(row_iter->get_next_row(new_row))) {
|
||||
if (OB_UNLIKELY(OB_ITER_END != ret)) {
|
||||
LOG_WARN("fail to get next row", KR(ret));
|
||||
} else {
|
||||
is_iter_end = true;
|
||||
ret = OB_SUCCESS;
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
//此时row中的每个obj的内容指向的是data parser中的内存
|
||||
//因此得把它们深拷贝一遍
|
||||
ObTableLoadObjRow tmp_obj_row;
|
||||
tmp_obj_row.seq_no_= sequence_no++;
|
||||
tmp_obj_row.cells_ = (!is_heap_table ? new_row->cells_ : new_row->cells_ + 1);
|
||||
tmp_obj_row.count_ = (!is_heap_table ? new_row->count_ : new_row->count_ - 1);
|
||||
ObTableLoadObjRow obj_row;
|
||||
if (OB_FAIL(obj_row.deep_copy(tmp_obj_row, allocator_handle))) {
|
||||
LOG_WARN("failed to deep copy add assign to tmp_obj_row", KR(ret));
|
||||
} else if (OB_FAIL(obj_rows.push_back(obj_row))) {
|
||||
LOG_WARN("failed to push back", KR(ret));
|
||||
} else {
|
||||
++processed_line_count;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret) && processed_line_count > 0) {
|
||||
ATOMIC_AAF(&execute_ctx_->job_stat_->parsed_rows_, processed_line_count);
|
||||
if (OB_FAIL(direct_loader->write_trans(trans_ctx, session_id, obj_rows))) {
|
||||
LOG_WARN("fail to write objs", KR(ret), K(session_id));
|
||||
} else {
|
||||
total_processed_line_count += processed_line_count;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
if (OB_FAIL(direct_loader->commit_trans(trans_ctx))) {
|
||||
LOG_WARN("fail to commit trans", KR(ret), K(partition_idx));
|
||||
}
|
||||
}
|
||||
ATOMIC_AAF(&total_line_count_, total_processed_line_count);
|
||||
if (nullptr != row_iter) {
|
||||
row_iter->~ObNewRowIterator();
|
||||
allocator.free(row_iter);
|
||||
row_iter = nullptr;
|
||||
}
|
||||
}
|
||||
LOG_INFO("finish process partition", K(partition_idx), K(subpart_count), K(subpart_idx), KR(ret),
|
||||
K(ATOMIC_LOAD(&total_line_count_)), K(total_processed_line_count));
|
||||
return ret;
|
||||
}
|
||||
|
||||
void ObLoadDataDirectImpl::BackupLoadExecutor::task_finished(ObTableLoadTask *task, int ret_code)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("ObLoadDataDirectImpl::BackupLoadExecutor not init", KR(ret), KP(this));
|
||||
} else if (OB_UNLIKELY(nullptr == task)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid args", KR(ret), KP(task), K(worker_count_));
|
||||
} else {
|
||||
if (OB_UNLIKELY(OB_SUCCESS != ret_code)) {
|
||||
ATOMIC_VCAS(&task_error_code_, common::OB_SUCCESS, ret_code);
|
||||
}
|
||||
task_allocator_.free(task);
|
||||
task_controller_.on_task_finished();
|
||||
}
|
||||
}
|
||||
|
||||
int ObLoadDataDirectImpl::BackupLoadExecutor::check_status()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("ObLoadDataDirectImpl::BackupLoadExecutor not init", KR(ret), KP(this));
|
||||
} else if (OB_UNLIKELY(OB_SUCCESS != task_error_code_)) {
|
||||
ret = OB_CANCELED;
|
||||
LOG_WARN("load is error, canceled", KR(ret), K(task_error_code_));
|
||||
} else if (OB_FAIL(execute_ctx_->exec_ctx_.check_status())) {
|
||||
LOG_WARN("fail to check exec ctx status", KR(ret));
|
||||
} else if (OB_FAIL(execute_ctx_->direct_loader_->check_status())) {
|
||||
LOG_WARN("fail to check direct loader", KR(ret));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
/**
|
||||
* ObLoadDataDirectImpl
|
||||
*/
|
||||
@ -1740,6 +2162,7 @@ int ObLoadDataDirectImpl::execute(ObExecContext &ctx, ObLoadDataStmt &load_stmt)
|
||||
ObSQLSessionInfo *session = nullptr;
|
||||
ObSchemaGetterGuard *schema_guard = nullptr;
|
||||
int64_t total_line_count = 0;
|
||||
const bool is_backup = ObLoadDataFormat::OB_BACKUP_1_4 == load_args.access_info_.get_load_data_format();
|
||||
|
||||
if (OB_UNLIKELY(load_args.file_iter_.count() > ObTableLoadSequenceNo::MAX_DATA_ID)) {
|
||||
ret = OB_NOT_SUPPORTED;
|
||||
@ -1801,7 +2224,7 @@ int ObLoadDataDirectImpl::execute(ObExecContext &ctx, ObLoadDataStmt &load_stmt)
|
||||
}
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret)) {
|
||||
if (OB_SUCC(ret) && !is_backup) {
|
||||
FileLoadExecutor *file_load_executor = nullptr;
|
||||
DataDescIterator data_desc_iter;
|
||||
if (1 == load_args.file_iter_.count() && 0 == execute_param_.ignore_row_num_ &&
|
||||
@ -1849,6 +2272,23 @@ int ObLoadDataDirectImpl::execute(ObExecContext &ctx, ObLoadDataStmt &load_stmt)
|
||||
file_load_executor = nullptr;
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret) && is_backup) {
|
||||
BackupLoadExecutor *backup_load_executor = nullptr;
|
||||
if (OB_ISNULL(backup_load_executor = OB_NEWx(BackupLoadExecutor, execute_ctx_.allocator_))) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
LOG_WARN("fail to new BackupLoadExecutor", KR(ret));
|
||||
} else if (OB_FAIL(backup_load_executor->init(execute_param_, execute_ctx_, load_args.file_name_))) {
|
||||
LOG_WARN("fail to init backup load executor", KR(ret));
|
||||
} else if (OB_FAIL(backup_load_executor->execute())) {
|
||||
LOG_WARN("fail to execute backup load executor", KR(ret));
|
||||
} else {
|
||||
total_line_count = backup_load_executor->get_total_line_count();
|
||||
}
|
||||
if (nullptr != backup_load_executor) {
|
||||
backup_load_executor->~BackupLoadExecutor();
|
||||
backup_load_executor = nullptr;
|
||||
}
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret)) {
|
||||
if (OB_FAIL(direct_loader_.commit())) {
|
||||
@ -1878,6 +2318,7 @@ int ObLoadDataDirectImpl::init_execute_param()
|
||||
const ObLoadDataHint &hint = load_stmt_->get_hints();
|
||||
const ObIArray<ObLoadDataStmt::FieldOrVarStruct> &field_or_var_list =
|
||||
load_stmt_->get_field_or_var_list();
|
||||
const bool is_backup = ObLoadDataFormat::OB_BACKUP_1_4 == load_args.access_info_.get_load_data_format();
|
||||
execute_param_.tenant_id_ = load_args.tenant_id_;
|
||||
execute_param_.database_id_ = load_args.database_id_;
|
||||
execute_param_.table_id_ = load_args.table_id_;
|
||||
@ -1976,23 +2417,32 @@ int ObLoadDataDirectImpl::init_execute_param()
|
||||
ObSchemaGetterGuard *schema_guard = ctx_->get_sql_ctx()->schema_guard_;
|
||||
int64_t column_count = 0;
|
||||
execute_param_.column_ids_.reset();
|
||||
if (OB_FAIL(ObTableLoadSchema::get_user_column_count(*schema_guard,
|
||||
execute_param_.tenant_id_,
|
||||
execute_param_.table_id_,
|
||||
column_count))) {
|
||||
LOG_WARN("fail to get user column count", KR(ret));
|
||||
} else if (OB_UNLIKELY(column_count != field_or_var_list.count())) {
|
||||
ret = OB_NOT_SUPPORTED;
|
||||
LOG_WARN("not contain all columns is not supported", KR(ret), K(column_count),
|
||||
K(field_or_var_list));
|
||||
}
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < field_or_var_list.count(); ++i) {
|
||||
const ObLoadDataStmt::FieldOrVarStruct &field_or_var_struct = field_or_var_list.at(i);
|
||||
if (OB_UNLIKELY(!field_or_var_struct.is_table_column_)) {
|
||||
if (is_backup) {
|
||||
if (OB_FAIL(ObTableLoadSchema::get_column_ids(*schema_guard,
|
||||
execute_param_.tenant_id_,
|
||||
execute_param_.table_id_,
|
||||
execute_param_.column_ids_))) {
|
||||
LOG_WARN("fail to get column ids for backup", KR(ret));
|
||||
}
|
||||
} else {
|
||||
if (OB_FAIL(ObTableLoadSchema::get_user_column_count(*schema_guard,
|
||||
execute_param_.tenant_id_,
|
||||
execute_param_.table_id_,
|
||||
column_count))) {
|
||||
LOG_WARN("fail to get user column count", KR(ret));
|
||||
} else if (OB_UNLIKELY(column_count != field_or_var_list.count())) {
|
||||
ret = OB_NOT_SUPPORTED;
|
||||
LOG_WARN("var is not supported", KR(ret), K(field_or_var_struct), K(i), K(field_or_var_list));
|
||||
} else if (OB_FAIL(execute_param_.column_ids_.push_back(field_or_var_struct.column_id_))) {
|
||||
LOG_WARN("fail to push back column id", KR(ret));
|
||||
LOG_WARN("not contain all columns is not supported", KR(ret), K(column_count),
|
||||
K(field_or_var_list));
|
||||
}
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < field_or_var_list.count(); ++i) {
|
||||
const ObLoadDataStmt::FieldOrVarStruct &field_or_var_struct = field_or_var_list.at(i);
|
||||
if (OB_UNLIKELY(!field_or_var_struct.is_table_column_)) {
|
||||
ret = OB_NOT_SUPPORTED;
|
||||
LOG_WARN("var is not supported", KR(ret), K(field_or_var_struct), K(i), K(field_or_var_list));
|
||||
} else if (OB_FAIL(execute_param_.column_ids_.push_back(field_or_var_struct.column_id_))) {
|
||||
LOG_WARN("fail to push back column id", KR(ret));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -2002,6 +2452,8 @@ int ObLoadDataDirectImpl::init_execute_param()
|
||||
int ObLoadDataDirectImpl::init_execute_context()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
const ObLoadArgument &load_args = load_stmt_->get_load_arguments();
|
||||
const bool is_backup = ObLoadDataFormat::OB_BACKUP_1_4 == load_args.access_info_.get_load_data_format();
|
||||
execute_ctx_.exec_ctx_.exec_ctx_ = ctx_;
|
||||
execute_ctx_.allocator_ = &ctx_->get_allocator();
|
||||
ObCompressorType table_compressor_type = ObCompressorType::NONE_COMPRESSOR;
|
||||
@ -2013,7 +2465,7 @@ int ObLoadDataDirectImpl::init_execute_context()
|
||||
load_param.batch_size_ = execute_param_.batch_row_count_;
|
||||
load_param.max_error_row_count_ = execute_param_.max_error_rows_;
|
||||
load_param.column_count_ = execute_param_.column_ids_.count();
|
||||
load_param.need_sort_ = execute_param_.need_sort_;
|
||||
load_param.need_sort_ = is_backup ? false : execute_param_.need_sort_;
|
||||
load_param.dup_action_ = execute_param_.dup_action_;
|
||||
load_param.px_mode_ = false;
|
||||
load_param.online_opt_stat_gather_ = execute_param_.online_opt_stat_gather_;
|
||||
|
||||
Reference in New Issue
Block a user