[FEAT MERGE] add direct load function

Co-authored-by: Monk-Liu <1152761042@qq.com>
Co-authored-by: saltonz <saltonzh@gmail.com>
Co-authored-by: yongshige <598633031@qq.com>
This commit is contained in:
obdev
2023-01-28 18:08:50 +08:00
committed by ob-robot
parent f27d2efc83
commit 81d28c0295
384 changed files with 55860 additions and 1239 deletions

View File

@ -64,8 +64,8 @@ public:
obrpc::ObCommonRpcProxy *common_rpc_proxy,
int64_t &affected_rows);
static int wait_build_index_finish(const uint64_t tenant_id, const int64_t task_id, bool &is_finish);
private:
static int handle_session_exception(ObSQLSessionInfo &session);
private:
static int cancel_ddl_task(const int64_t tenant_id, obrpc::ObCommonRpcProxy *common_rpc_proxy);
private:
DISALLOW_COPY_AND_ASSIGN(ObDDLExecutorUtil);

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,465 @@
// Copyright (c) 2022-present Oceanbase Inc. All Rights Reserved.
// Author:
// suzhi.yt <suzhi.yt@oceanbase.com>
#pragma once
#include "lib/allocator/page_arena.h"
#include "observer/table_load/ob_table_load_object_allocator.h"
#include "observer/table_load/ob_table_load_task.h"
#include "share/table/ob_table_load_array.h"
#include "share/table/ob_table_load_row_array.h"
#include "share/table/ob_table_load_define.h"
#include "sql/engine/cmd/ob_load_data_impl.h"
#include "sql/engine/cmd/ob_load_data_parser.h"
#include "common/storage/ob_io_device.h"
#include "observer/table_load/ob_table_load_exec_ctx.h"
#include "observer/table_load/ob_table_load_instance.h"
namespace oceanbase
{
namespace observer
{
class ObTableLoadParam;
class ObTableLoadTableCtx;
class ObTableLoadExecCtx;
class ObTableLoadCoordinator;
class ObITableLoadTaskScheduler;
class ObTableLoadInstance;
} // namespace observer
namespace sql
{
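/**
 * Direct-load path for LOAD DATA.
 *
 * - Each input row must contain data for every column of the table, except the
 *   hidden primary key column of a heap table
 * - The SET clause is not supported
 * - Expressions are not supported
 */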
class ObLoadDataDirectImpl : public ObLoadDataBase
{
static const int64_t MAX_DATA_MEM_USAGE_LIMIT = 64;
public:
ObLoadDataDirectImpl();
virtual ~ObLoadDataDirectImpl();
int execute(ObExecContext &ctx, ObLoadDataStmt &load_stmt) override;
private:
class Logger;
// Bundles what is needed to access and interpret an input file: its location,
// storage access info, column count, file format and collation type.
// Note: TO_STRING_KV prints only file_location_, file_column_num_ and file_cs_type_;
// access_info_ and file_format_ are not part of the string form.
struct DataAccessParam
{
public:
DataAccessParam();
bool is_valid() const;
TO_STRING_KV(K_(file_location), K_(file_column_num), K_(file_cs_type));
public:
ObLoadFileLocation file_location_;
share::ObBackupStorageInfo access_info_;
int64_t file_column_num_; // number of columns in the file
ObDataInFileStruct file_format_;
common::ObCollationType file_cs_type_;
};
// Parameters of one direct-load execution: target table identity, parallelism,
// batching, memory limit, error tolerance and the data access description.
// Note: sql_mode_, database_name_, table_name_ and dup_action_ are not included
// in TO_STRING_KV.
struct LoadExecuteParam
{
public:
LoadExecuteParam();
bool is_valid() const;
TO_STRING_KV(K_(tenant_id), K_(database_id), K_(table_id), K_(combined_name), K_(parallel),
K_(batch_row_count), K_(data_mem_usage_limit), K_(need_sort), K_(online_opt_stat_gather),
K_(max_error_rows), K_(ignore_row_num), K_(data_access_param), K_(store_column_idxs));
public:
uint64_t tenant_id_;
uint64_t database_id_;
uint64_t table_id_;
uint64_t sql_mode_;
common::ObString database_name_;
common::ObString table_name_;
common::ObString combined_name_; // database name + table name
int64_t parallel_; // number of concurrent threads
int64_t batch_row_count_;
int64_t data_mem_usage_limit_; // limit = data_mem_usage_limit * MAX_BUFFER_SIZE
bool need_sort_;
bool online_opt_stat_gather_;
int64_t max_error_rows_; // max allowed error rows
int64_t ignore_row_num_; // number of rows to ignore per file
sql::ObLoadDupActionType dup_action_;
DataAccessParam data_access_param_;
common::ObSEArray<int64_t, 16>
store_column_idxs_; // Mapping of stored columns to source data columns
};
// Execution context of a direct load. Extends observer::ObTableLoadExecCtx with
// pointers to the table load instance, the job statistics and the error logger.
// exec_ctx_ and allocator_ printed by TO_STRING_KV are not declared here; they
// presumably come from the base class.
struct LoadExecuteContext : public observer::ObTableLoadExecCtx
{
public:
LoadExecuteContext();
bool is_valid() const;
TO_STRING_KV(KP_(exec_ctx), KP_(allocator), KP_(direct_loader), KP_(job_stat), KP_(logger));
public:
observer::ObTableLoadInstance *direct_loader_;
sql::ObLoadDataStat *job_stat_;
Logger *logger_;
};
private:
// Writes per-line load errors through an ObFileAppender.
// Holds a mutex member (mutex_); NOTE(review): presumably it serializes
// log_error_line() across worker threads -- confirm in the implementation.
// Non-copyable.
class Logger
{
static const char *log_file_column_names;
static const char *log_file_row_fmt;
public:
Logger();
~Logger();
// Initializes the logger; load_info is a caller-supplied description string.
int init(const common::ObString &load_info);
// Records one failed line identified by file name, line number and error code.
int log_error_line(const common::ObString &file_name, int64_t line_no, int err_code);
private:
// Builds the log file name into buf (of the given size) and exposes it via file_name.
static int generate_log_file_name(char *buf, int64_t size, common::ObString &file_name);
private:
lib::ObMutex mutex_;
ObFileAppender file_appender_;
bool is_oracle_mode_;
char *buf_;
bool is_inited_;
DISALLOW_COPY_AND_ASSIGN(Logger);
};
private:
// Describes one piece of input data: the index and name of a file plus a
// [start_, end_] position pair inside it.
// The default constructor sets file_idx_ = 0, start_ = 0 and end_ = -1.
// NOTE(review): end_ == -1 presumably means "until end of file" -- confirm with the readers.
struct DataDesc
{
public:
DataDesc() : file_idx_(0), start_(0), end_(-1) {}
~DataDesc() {}
TO_STRING_KV(K_(file_idx), K_(filename), K_(start), K_(end));
public:
int64_t file_idx_;
ObString filename_;
int64_t start_;
int64_t end_;
};
// Holds a list of DataDesc entries together with a read position (pos_).
class DataDescIterator
{
public:
DataDescIterator();
~DataDescIterator();
// Number of descriptors currently stored.
int64_t count() const { return data_descs_.count(); }
// Fills this iterator from a file iterator.
int copy(const ObLoadFileIterator &file_iter);
// Fills this iterator from another descriptor iterator.
int copy(const DataDescIterator &desc_iter);
int add_data_desc(const DataDesc &data_desc);
// Hands out the next descriptor through data_desc.
int get_next_data_desc(DataDesc &data_desc);
TO_STRING_KV(K_(data_descs), K_(pos));
private:
common::ObSEArray<DataDesc, 64> data_descs_;
int64_t pos_;
};
// Abstract interface for a data source that is opened by file name and read at
// explicit offsets. Implemented by RandomFileReader and RandomOSSReader.
class IRandomIODevice
{
public:
virtual ~IRandomIODevice() = default;
// Opens filename using the given access parameters.
virtual int open(const DataAccessParam &data_access_param, const ObString &filename) = 0;
// Reads up to count bytes at offset into buf; the number of bytes read is returned in read_size.
virtual int pread(char *buf, int64_t count, int64_t offset, int64_t &read_size) = 0;
// Returns the size of the opened file through file_size.
virtual int get_file_size(int64_t &file_size) = 0;
};
// IRandomIODevice implementation backed by an ObFileReader member.
class RandomFileReader : public IRandomIODevice
{
public:
RandomFileReader();
virtual ~RandomFileReader();
int open(const DataAccessParam &data_access_param, const ObString &filename) override;
int pread(char *buf, int64_t count, int64_t offset, int64_t &read_size) override;
int get_file_size(int64_t &file_size) override;
private:
ObString filename_;
ObFileReader file_reader_;
bool is_inited_;
};
// IRandomIODevice implementation backed by an ObIODevice handle and an ObIOFd.
// NOTE(review): the name suggests object storage (OSS) access -- confirm in the implementation.
class RandomOSSReader : public IRandomIODevice
{
public:
RandomOSSReader();
virtual ~RandomOSSReader();
int open(const DataAccessParam &data_access_param, const ObString &filename) override;
int pread(char *buf, int64_t count, int64_t offset, int64_t &read_size) override;
int get_file_size(int64_t &file_size) override;
private:
ObIODevice *device_handle_;
ObIOFd fd_;
bool is_inited_;
};
// Sequential reader on top of an IRandomIODevice: it keeps a current offset (offset_)
// and owns both a RandomFileReader and a RandomOSSReader, with random_io_device_
// pointing at an IRandomIODevice (presumably one of the two owned readers,
// selected during init()).
class SequentialDataAccessor
{
public:
SequentialDataAccessor();
~SequentialDataAccessor();
int init(const DataAccessParam &data_access_param, const ObString &filename);
// Reads up to count bytes into buf; the number of bytes read is returned in read_size.
int read(char *buf, int64_t count, int64_t &read_size);
int get_file_size(int64_t &file_size);
// Sets the current offset; no validation is performed here.
void seek(int64_t offset) { offset_ = offset; }
int64_t get_offset() const { return offset_; }
private:
RandomFileReader random_file_reader_;
RandomOSSReader random_oss_reader_;
IRandomIODevice *random_io_device_;
int64_t offset_;
bool is_inited_;
};
// Wrapper around an ObLoadFileBuffer with a left read position (pos_) and an
// end-of-file flag. Owns an arena allocator member. Non-copyable.
struct DataBuffer
{
public:
DataBuffer();
~DataBuffer();
void reuse();
void reset();
// Initializes the buffer; capacity defaults to ObLoadFileBuffer::MAX_BUFFER_SIZE.
int init(int64_t capacity = ObLoadFileBuffer::MAX_BUFFER_SIZE);
bool is_valid() const;
int64_t get_data_length() const;
int64_t get_remain_length() const;
bool empty() const;
char *data() const;
void advance(int64_t length);
void update_data_length(int64_t length);
int squash();
void swap(DataBuffer &other);
TO_STRING_KV(KPC_(file_buffer), K_(pos));
private:
common::ObArenaAllocator allocator_;
public:
ObLoadFileBuffer *file_buffer_;
int64_t pos_; // left pos
bool is_end_file_;
private:
DISALLOW_COPY_AND_ASSIGN(DataBuffer);
};
// Read the buffer and align it by row.
// Combines a SequentialDataAccessor (I/O), an ObCSVGeneralParser and an
// ObLoadFileDataTrimer. Non-copyable.
class DataReader
{
public:
DataReader();
// read_raw defaults to false.
int init(const DataAccessParam &data_access_param, LoadExecuteContext &execute_ctx,
const DataDesc &data_desc, bool read_raw = false);
// Fills file_buffer and reports the number of lines in line_count; limit defaults to INT64_MAX.
int get_next_buffer(ObLoadFileBuffer &file_buffer, int64_t &line_count,
int64_t limit = INT64_MAX);
int get_next_raw_buffer(DataBuffer &data_buffer);
int64_t get_lines_count() const { return data_trimer_.get_lines_count(); }
bool has_incomplate_data() const { return data_trimer_.has_incomplate_data(); }
// True once the accessor offset has reached or passed end_offset_.
bool is_end_file() const { return io_accessor_.get_offset() >= end_offset_; }
ObCSVGeneralParser &get_csv_parser() { return csv_parser_; }
private:
LoadExecuteContext *execute_ctx_;
ObCSVGeneralParser csv_parser_; // used to compute complete rows
ObLoadFileDataTrimer data_trimer_; // caches the data of incomplete rows
SequentialDataAccessor io_accessor_;
int64_t end_offset_;
bool read_raw_;
bool is_iter_end_;
bool is_inited_;
DISALLOW_COPY_AND_ASSIGN(DataReader);
};
// Parse the data in the buffer into a string array by column.
// Non-copyable.
class DataParser
{
public:
DataParser();
~DataParser();
int init(const DataAccessParam &data_access_param, Logger &logger);
// Binds a data buffer to parse, together with the file name and starting line
// number that the members below keep for error logging.
int parse(const common::ObString &file_name, int64_t start_line_no, DataBuffer &data_buffer);
// Produces the next parsed row.
int get_next_row(common::ObNewRow &row);
private:
void log_error_line(int err_ret, int64_t err_line_no);
private:
ObCSVGeneralParser csv_parser_;
DataBuffer escape_buffer_;
DataBuffer *data_buffer_;
// The following members are used for writing error logs
common::ObString file_name_;
int64_t start_line_no_;
int64_t pos_;
Logger *logger_;
bool is_inited_;
DISALLOW_COPY_AND_ASSIGN(DataParser);
};
// Static helpers for splitting one data descriptor into several.
class SimpleDataSplitUtils
{
public:
// Tells whether the given file format and collation count as a "simple" format.
static bool is_simple_format(const ObDataInFileStruct &file_format,
common::ObCollationType file_cs_type);
// Splits data_desc and stores the resulting descriptors in data_desc_iter.
// NOTE(review): count is presumably the desired number of pieces -- confirm in the implementation.
static int split(const DataAccessParam &data_access_param, const DataDesc &data_desc,
int64_t count, DataDescIterator &data_desc_iter);
};
// Outcome of one load task: return code, timestamps and processed/parsed counters.
struct TaskResult
{
  // A fresh result is identical to a reset one.
  TaskResult() { reset(); }
  // Puts every field back to its initial value (OB_SUCCESS / 0).
  void reset()
  {
    ret_ = OB_SUCCESS;
    created_ts_ = 0;
    start_process_ts_ = 0;
    finished_ts_ = 0;
    proccessed_row_count_ = 0;
    parsed_bytes_ = 0;
  }
  int ret_;
  int64_t created_ts_;
  int64_t start_process_ts_;
  int64_t finished_ts_;
  int64_t proccessed_row_count_;
  int64_t parsed_bytes_;
  TO_STRING_KV(K_(ret), K_(created_ts), K_(start_process_ts), K_(finished_ts),
               K_(proccessed_row_count), K_(parsed_bytes));
};
// Per-task state: task id, the data buffer to process, session id, the data
// descriptor it came from, the first line number and the task result. Non-copyable.
struct TaskHandle
{
TaskHandle() : task_id_(common::OB_INVALID_ID), session_id_(0), start_line_no_(0) {}
int64_t task_id_;
DataBuffer data_buffer_;
int32_t session_id_;
DataDesc data_desc_;
int64_t start_line_no_; // starts from 1
TaskResult result_;
TO_STRING_KV(K_(task_id), K_(data_buffer), K_(session_id), K_(data_desc), K_(start_line_no),
K_(result));
private:
DISALLOW_COPY_AND_ASSIGN(TaskHandle);
};
// Abstract base of the file load executors. Subclasses implement init(),
// prepare_execute(), get_next_task_handle() and fill_task(); this class holds the
// task allocator, scheduler pointer, per-worker contexts and task handle bookkeeping.
// Non-copyable.
class FileLoadExecutor
{
public:
FileLoadExecutor();
virtual ~FileLoadExecutor();
virtual int init(const LoadExecuteParam &execute_param, LoadExecuteContext &execute_ctx,
const DataDescIterator &data_desc_iter) = 0;
int execute();
int alloc_task(observer::ObTableLoadTask *&task);
void free_task(observer::ObTableLoadTask *task);
void task_finished(TaskHandle *handle);
// Processes one task handle on the worker identified by worker_idx; line_count is an output.
int process_task_handle(int64_t worker_idx, TaskHandle *handle, int64_t &line_count);
protected:
// Hooks implemented by concrete executors.
virtual int prepare_execute() = 0;
virtual int get_next_task_handle(TaskHandle *&handle) = 0;
virtual int fill_task(TaskHandle *handle, observer::ObTableLoadTask *task) = 0;
protected:
int inner_init(const LoadExecuteParam &execute_param, LoadExecuteContext &execute_ctx,
int64_t handle_count);
int init_worker_ctx_array();
int fetch_task_handle(TaskHandle *&handle);
int handle_task_result(int64_t task_id, TaskResult &result);
int handle_all_task_result();
void wait_all_task_finished();
protected:
// Per-worker state: a parser and an object array.
struct WorkerContext
{
public:
DataParser data_parser_;
table::ObTableLoadArray<ObObj> objs_;
};
protected:
const LoadExecuteParam *execute_param_;
LoadExecuteContext *execute_ctx_;
observer::ObTableLoadObjectAllocator<observer::ObTableLoadTask> task_allocator_;
observer::ObITableLoadTaskScheduler *task_scheduler_;
WorkerContext *worker_ctx_array_;
// task ctrl
ObParallelTaskController task_controller_;
ObConcurrentFixedCircularArray<TaskHandle *> handle_reserve_queue_;
common::ObSEArray<TaskHandle *, 64> handle_resource_; // used for releasing resources
bool is_inited_;
private:
DISALLOW_COPY_AND_ASSIGN(FileLoadExecutor);
};
class FileLoadTaskCallback;
private:
/**
* Large File
*
* FileLoadExecutor implementation that works from a single DataDesc through one
* DataReader. Non-copyable.
*/
class LargeFileLoadExecutor : public FileLoadExecutor
{
public:
LargeFileLoadExecutor();
virtual ~LargeFileLoadExecutor();
int init(const LoadExecuteParam &execute_param, LoadExecuteContext &execute_ctx,
const DataDescIterator &data_desc_iter) override;
protected:
int prepare_execute() override;
int get_next_task_handle(TaskHandle *&handle) override;
int fill_task(TaskHandle *handle, observer::ObTableLoadTask *task) override;
private:
int32_t get_session_id();
// NOTE(review): presumably skips the leading rows requested by ignore_row_num_ -- confirm.
int skip_ignore_rows();
private:
DataDesc data_desc_;
DataBuffer expr_buffer_;
DataReader data_reader_;
int32_t next_session_id_;
int64_t total_line_count_;
DISALLOW_COPY_AND_ASSIGN(LargeFileLoadExecutor);
};
class LargeFileLoadTaskProcessor;
private:
/**
* Multi Files
*
* FileLoadExecutor implementation that works from a DataDescIterator.
* Non-copyable.
*/
class MultiFilesLoadExecutor : public FileLoadExecutor
{
public:
MultiFilesLoadExecutor() = default;
virtual ~MultiFilesLoadExecutor() = default;
int init(const LoadExecuteParam &execute_param, LoadExecuteContext &execute_ctx,
const DataDescIterator &data_desc_iter) override;
protected:
int prepare_execute() override;
int get_next_task_handle(TaskHandle *&handle) override;
int fill_task(TaskHandle *handle, observer::ObTableLoadTask *task) override;
private:
DataDescIterator data_desc_iter_;
DISALLOW_COPY_AND_ASSIGN(MultiFilesLoadExecutor);
};
class MultiFilesLoadTaskProcessor;
private:
int init_file_iter();
// init execute param
int init_store_column_idxs(common::ObIArray<int64_t> &store_column_idxs);
int init_execute_param();
// init execute context
int init_logger();
int init_execute_context();
private:
ObExecContext *ctx_;
ObLoadDataStmt *load_stmt_;
LoadExecuteParam execute_param_;
LoadExecuteContext execute_ctx_;
observer::ObTableLoadInstance direct_loader_;
Logger logger_;
DISALLOW_COPY_AND_ASSIGN(ObLoadDataDirectImpl);
};
} // namespace sql
} // namespace oceanbase

View File

@ -16,23 +16,52 @@
#include "lib/oblog/ob_log_module.h"
#include "sql/engine/cmd/ob_load_data_impl.h"
#include "sql/engine/cmd/ob_load_data_direct_impl.h"
#include "sql/engine/ob_exec_context.h"
namespace oceanbase
{
namespace sql
{
/**
 * Decides whether a LOAD DATA statement takes the direct-load path.
 *
 * Direct load is selected only when the ENABLE_DIRECT hint value is non-zero and
 * the _ob_enable_direct_load configuration item is on.
 *
 * @param load_hint  hints attached to the LOAD DATA statement
 * @param check_ret  [out] true when direct load should be used; always false when
 *                   an error is returned
 * @return OB_SUCCESS, or the error returned while reading the hint value
 */
int ObLoadDataExecutor::check_is_direct_load(const ObLoadDataHint &load_hint, bool &check_ret)
{
  int ret = OB_SUCCESS;
  int64_t enable_direct = 0;
  // Give the out-parameter a defined value on every path, including hint lookup failure.
  check_ret = false;
  if (OB_FAIL(load_hint.get_value(ObLoadDataHint::ENABLE_DIRECT, enable_direct))) {
    LOG_WARN("fail to get value of ENABLE_DIRECT", K(ret));
  } else if ((enable_direct != 0) && GCONF._ob_enable_direct_load) {
    check_ret = true;
  }
  return ret;
}
int ObLoadDataExecutor::execute(ObExecContext &ctx, ObLoadDataStmt &stmt)
{
int ret = OB_SUCCESS;
ObLoadDataBase *load_impl = NULL;
bool is_direct_load = false;
if (!stmt.get_load_arguments().is_csv_format_) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("invalid resolver results", K(ret));
} else if (OB_ISNULL(load_impl = OB_NEWx(ObLoadDataSPImpl, (&ctx.get_allocator())))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("allocate memory failed", K(ret));
} else if (OB_FAIL(check_is_direct_load(stmt.get_hints(), is_direct_load))) {
LOG_WARN("fail to check is load mode", KR(ret));
} else {
if (!is_direct_load) {
if (OB_ISNULL(load_impl = OB_NEWx(ObLoadDataSPImpl, (&ctx.get_allocator())))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("allocate memory failed", K(ret));
}
} else {
if (OB_ISNULL(load_impl = OB_NEWx(ObLoadDataDirectImpl, (&ctx.get_allocator())))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("allocate memory failed", K(ret));
}
}
}
if (OB_SUCC(ret)) {
if (OB_FAIL(load_impl->execute(ctx, stmt))) {
LOG_WARN("failed to execute load data stmt", K(ret));
}

View File

@ -25,6 +25,8 @@ public:
virtual ~ObLoadDataExecutor() {}
int execute(ObExecContext &ctx, ObLoadDataStmt &stmt);
private:
int check_is_direct_load(const ObLoadDataHint &load_hint, bool &check_ret);
private:
// disallow copy
DISALLOW_COPY_AND_ASSIGN(ObLoadDataExecutor);

View File

@ -454,7 +454,7 @@ int ObLoadDataBase::pre_parse_lines(ObLoadFileBuffer &buffer,
}
}
}
if (is_last_buf && buffer.current_ptr() > cur_pos) {
if (is_last_buf && cur_lines < line_count && buffer.current_ptr() > cur_pos) {
cur_lines++;
cur_pos = buffer.current_ptr();
}
@ -901,7 +901,7 @@ void ObCSVFormats::init(const ObDataInFileStruct &file_formats)
if (!file_formats.field_term_str_.empty()
&& file_formats.line_term_str_.empty()) {
is_line_term_by_counting_field_ = true;
field_term_char_ = line_term_char_;
line_term_char_ = field_term_char_;
}
is_simple_format_ =
!is_line_term_by_counting_field_
@ -1670,8 +1670,8 @@ int ObLoadDataSPImpl::handle_returned_insert_task(ObExecContext &ctx,
}
*/
box.job_status->processed_rows_ = box.affected_rows;
box.job_status->processed_bytes_ += insert_task.data_size_;
box.job_status->parsed_rows_ = box.affected_rows;
box.job_status->parsed_bytes_ += insert_task.data_size_;
box.job_status->total_insert_task_ = box.insert_task_controller.get_total_task_cnt();
box.job_status->insert_rt_sum_ = box.insert_rt_sum;
box.job_status->total_wait_secs_ = box.wait_secs_for_mem_release;
@ -2991,8 +2991,10 @@ int ObLoadDataSPImpl::ToolBox::init(ObExecContext &ctx, ObLoadDataStmt &load_stm
ObLoadDataGID::generate_new_id(temp_gid);
job_status->tenant_id_ = tenant_id;
job_status->job_id_ = temp_gid.id;
job_status->table_name_ = load_args.combined_name_;
job_status->file_path_ = load_args.file_name_;
OZ(ob_write_string(job_status->allocator_,
load_args.combined_name_, job_status->table_name_));
OZ(ob_write_string(job_status->allocator_,
load_args.file_name_, job_status->file_path_));
job_status->file_column_ = num_of_file_column;
job_status->table_column_ = num_of_table_column;
job_status->batch_size_ = batch_row_count;

View File

@ -351,6 +351,7 @@ public:
int64_t get_buffer_size() { return buffer_size_; }
int64_t *get_pos() { return &pos_; }
void reset() { pos_ = 0; }
TO_STRING_KV(K_(pos), K_(buffer_size));
private:
int64_t pos_;
int64_t buffer_size_;
@ -392,8 +393,8 @@ public:
int backup_incomplate_data(ObLoadFileBuffer &buffer, int64_t valid_data_len);
int recover_incomplate_data(ObLoadFileBuffer &buffer);
bool has_incomplate_data() { return incomplate_data_len_ > 0; }
int64_t get_lines_count() { return lines_cnt_; }
bool has_incomplate_data() const { return incomplate_data_len_ > 0; }
int64_t get_lines_count() const { return lines_cnt_; }
void commit_line_cnt(int64_t line_cnt) { lines_cnt_ += line_cnt; }
private:
ObCSVFormats formats_;//TODO [load data] change to ObInverseParser(formats)
@ -632,7 +633,7 @@ struct ObFileReadCursor {
read_size_ = 0;
is_end_file_ = false;
}
bool inline is_end_file() { return is_end_file_; }
bool inline is_end_file() const { return is_end_file_; }
int64_t inline get_total_read_MBs() { return file_offset_ >> 20; }
int64_t inline get_total_read_GBs() { return file_offset_ >> 30; }
void commit_read() {

View File

@ -365,6 +365,52 @@ int ObLoadDataUtils::check_session_status(ObSQLSessionInfo &session, int64_t res
return ret;
}
/////////////////
// Starts with an empty job stat list and the read cursor at the first slot.
ObGetAllJobStatusOp::ObGetAllJobStatusOp()
  : job_status_array_(), current_job_index_(0)
{}
// Gives back every reference still held by this operator (see reset()).
ObGetAllJobStatusOp::~ObGetAllJobStatusOp()
{
  this->reset();
}
void ObGetAllJobStatusOp::reset()
{
ObLoadDataStat *job_status;
for (int64_t i = 0; i < job_status_array_.count(); ++i) {
job_status = job_status_array_.at(i);
job_status->release();
}
job_status_array_.reset();
current_job_index_ = 0;
}
/**
 * Map-iteration callback: takes a reference on the entry's job stat and appends it
 * to the collected list.
 *
 * @param entry  map entry whose value is the job stat to collect
 * @return OB_SUCCESS; OB_ERR_UNEXPECTED when the entry holds a null stat; or the
 *         push_back error (the reference taken here is released again in that case)
 */
int ObGetAllJobStatusOp::operator()(common::hash::HashMapPair<ObLoadDataGID, ObLoadDataStat *> &entry)
{
  int ret = OB_SUCCESS;
  ObLoadDataStat *job_status = entry.second;
  if (OB_ISNULL(job_status)) {
    // Report a null entry instead of dereferencing it; this also keeps nulls out of
    // job_status_array_, which reset() dereferences unconditionally.
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("unexpected null ObLoadDataStat", K(ret));
  } else {
    job_status->aquire();
    if (OB_FAIL(job_status_array_.push_back(job_status))) {
      // The stat was not stored, so nobody would release this reference later.
      job_status->release();
      LOG_WARN("push_back ObLoadDataStat failed", K(ret));
    }
  }
  return ret;
}
// Hands out the next collected job stat and advances the cursor.
// Returns OB_ITER_END (leaving job_status untouched) once every entry was returned.
int ObGetAllJobStatusOp::get_next_job_status(ObLoadDataStat *&job_status)
{
  int ret = OB_SUCCESS;
  if (current_job_index_ < job_status_array_.count()) {
    job_status = job_status_array_.at(current_job_index_);
    ++current_job_index_;
  } else {
    ret = OB_ITER_END;
  }
  return ret;
}
int ObGlobalLoadDataStatMap::init()
{
int ret = OB_SUCCESS;
@ -416,63 +462,21 @@ int ObGlobalLoadDataStatMap::get_job_status(const ObLoadDataGID &id, ObLoadDataS
// Runs job_status_op over the entries of map_ via foreach_refactored().
// ObGetAllJobStatusOp::operator() takes a reference on each stat it collects, so the
// caller gets the stats through job_status_op and they are released by its reset().
// Returns whatever foreach_refactored() returns (wrapped by the OZ macro).
int ObGlobalLoadDataStatMap::get_all_job_status(ObGetAllJobStatusOp &job_status_op)
{
int ret = OB_SUCCESS;
OZ (map_.foreach_refactored(job_status_op));
return ret;
}
ObGetAllJobStatusOp::ObGetAllJobStatusOp()
: job_status_array_(),
current_job_index_(0)
{
}
ObGetAllJobStatusOp::~ObGetAllJobStatusOp()
{
reset();
}
void ObGetAllJobStatusOp::reset()
{
ObLoadDataStat *job_status;
for (int i = 0; i < job_status_array_.count(); i++) {
job_status = job_status_array_.at(i);
job_status->release();
}
job_status_array_.reset();
current_job_index_ = 0;
}
int ObGetAllJobStatusOp::operator()(common::hash::HashMapPair<ObLoadDataGID, ObLoadDataStat *> &entry)
int ObGlobalLoadDataStatMap::get_job_stat_guard(const ObLoadDataGID &id, ObLoadDataStatGuard &guard)
{
int ret = OB_SUCCESS;
entry.second->aquire();
if (OB_FAIL(job_status_array_.push_back(entry.second))) {
entry.second->release();
LOG_WARN("push_back ObLoadDataStat failed", K(ret));
}
auto get_and_add_ref = [&](hash::HashMapPair<ObLoadDataGID, ObLoadDataStat*> &entry) -> void
{
guard.aquire(entry.second);
};
OZ (map_.read_atomic(id, get_and_add_ref));
return ret;
}
ObLoadDataStat* ObGetAllJobStatusOp::next_job_status()
{
ObLoadDataStat *job_status = nullptr;
if (current_job_index_ < job_status_array_.count()) {
job_status = job_status_array_.at(current_job_index_++);
}
return job_status;
}
bool ObGetAllJobStatusOp::end()
{
return (current_job_index_ >= job_status_array_.count());
}
ObGlobalLoadDataStatMap *ObGlobalLoadDataStatMap::getInstance()
{
return instance_;

View File

@ -284,6 +284,7 @@ struct ObLoadDataGID
gid.id = ATOMIC_AAF(&GlobalLoadDataID, 1);
}
ObLoadDataGID() : id(-1) {}
void reset() { id = -1; }
bool is_valid() const { return id > 0; }
uint64_t hash() const { return common::murmurhash(&id, sizeof(id), 0); }
bool operator==(const ObLoadDataGID &other) const { return id == other.id; }
@ -296,9 +297,11 @@ struct ObLoadDataGID
struct ObLoadDataStat
{
ObLoadDataStat() : ref_cnt_(0),
ObLoadDataStat() : allocator_(ObModIds::OB_SQL_LOAD_DATA),
ref_cnt_(0),
tenant_id_(0),
job_id_(0),
job_type_(),
table_name_(),
file_path_(),
table_column_(0),
@ -310,13 +313,15 @@ struct ObLoadDataStat
estimated_remaining_time_(0),
total_bytes_(0),
read_bytes_(0),
processed_bytes_(0),
processed_rows_(0),
parsed_bytes_(0),
parsed_rows_(0),
total_shuffle_task_(0),
total_insert_task_(0),
shuffle_rt_sum_(0),
insert_rt_sum_(0),
total_wait_secs_(0) {}
total_wait_secs_(0),
max_allowed_error_rows_(0),
detected_error_rows_(0) {}
int64_t aquire() {
return ATOMIC_AAF(&ref_cnt_, 1);
}
@ -325,9 +330,11 @@ struct ObLoadDataStat
}
int64_t get_ref_cnt() { return ATOMIC_LOAD(&ref_cnt_); }
common::ObArenaAllocator allocator_;
volatile int64_t ref_cnt_;
int64_t tenant_id_;
int64_t job_id_;
common::ObString job_type_; // normal / direct
common::ObString table_name_;
common::ObString file_path_;
int64_t table_column_;
@ -338,16 +345,42 @@ struct ObLoadDataStat
int64_t start_time_;
int64_t estimated_remaining_time_;
int64_t total_bytes_;
int64_t read_bytes_; //bytes read to memory
int64_t processed_bytes_;
int64_t processed_rows_;
volatile int64_t read_bytes_; //bytes read to memory
volatile int64_t parsed_bytes_;
volatile int64_t parsed_rows_;
int64_t total_shuffle_task_;
int64_t total_insert_task_;
int64_t shuffle_rt_sum_;
int64_t insert_rt_sum_;
int64_t total_wait_secs_;
int64_t max_allowed_error_rows_;
int64_t detected_error_rows_;
struct {
volatile int64_t received_rows_; // received from client
int64_t last_commit_segment_id_;
common::ObString status_; // none / inited / loading / frozen / merging / commit / error / abort
common::ObString trans_status_; // none / inited / running / frozen / commit / error / abort
} coordinator;
TO_STRING_KV(K(job_id_), K(table_name_), K(total_bytes_), K(processed_bytes_));
struct {
volatile int64_t processed_rows_;
int64_t last_commit_segment_id_;
common::ObString status_;
common::ObString trans_status_;
} store;
TO_STRING_KV(K(tenant_id_), K(job_id_), K(job_type_),
K(table_name_), K(file_path_), K(table_column_), K(file_column_),
K(batch_size_), K(parallel_), K(load_mode_),
K(start_time_), K(estimated_remaining_time_),
K(total_bytes_), K(read_bytes_), K(parsed_bytes_),
K(parsed_rows_), K(total_shuffle_task_), K(total_insert_task_),
K(shuffle_rt_sum_), K(insert_rt_sum_), K(total_wait_secs_),
K(max_allowed_error_rows_), K(detected_error_rows_),
K(coordinator.received_rows_), K(coordinator.last_commit_segment_id_),
K(coordinator.status_), K(coordinator.trans_status_),
K(store.processed_rows_), K(store.last_commit_segment_id_),
K(store.status_), K(store.trans_status_));
};
class ObGetAllJobStatusOp
@ -359,14 +392,60 @@ public:
public:
void reset();
int operator()(common::hash::HashMapPair<ObLoadDataGID, ObLoadDataStat*> &entry);
ObLoadDataStat* next_job_status(void);
bool end(void);
int get_next_job_status(ObLoadDataStat *&job_status);
private:
common::ObSEArray<ObLoadDataStat *, 10> job_status_array_;
int32_t current_job_index_;
};
class ObLoadDataStatGuard
{
public:
ObLoadDataStatGuard() : stat_(nullptr) {}
ObLoadDataStatGuard(const ObLoadDataStatGuard &rhs) : stat_(nullptr)
{
aquire(rhs.stat_);
}
~ObLoadDataStatGuard()
{
release();
}
void aquire(ObLoadDataStat *stat)
{
release();
stat_ = stat;
if (nullptr != stat_) {
stat_->aquire();
}
}
void release()
{
if (nullptr != stat_) {
stat_->release();
stat_ = nullptr;
}
}
ObLoadDataStat *get() const { return stat_; }
// ObLoadDataStat *operator->() { return stat_; }
// const ObLoadDataStat *operator->() const { return stat_; }
ObLoadDataStatGuard &operator=(const ObLoadDataStatGuard &rhs)
{
aquire(rhs.stat_);
return *this;
}
TO_STRING_KV(KPC_(stat));
private:
ObLoadDataStat *stat_;
};
class ObGlobalLoadDataStatMap
{
public:
@ -377,6 +456,7 @@ public:
int unregister_job(const ObLoadDataGID &id, ObLoadDataStat *&job_status);
int get_job_status(const ObLoadDataGID &id, ObLoadDataStat *&job_status);
int get_all_job_status(ObGetAllJobStatusOp &job_status_op);
int get_job_stat_guard(const ObLoadDataGID &id, ObLoadDataStatGuard &guard);
private:
typedef common::hash::ObHashMap<ObLoadDataGID, ObLoadDataStat*,
common::hash::SpinReadWriteDefendMode> HASH_MAP;

View File

@ -0,0 +1,148 @@
// Copyright (c) 2022-present Oceanbase Inc. All Rights Reserved.
// Author:
// yuya.yu <yuya.yu@oceanbase.com>
#define USING_LOG_PREFIX SQL_ENG
#include "sql/engine/cmd/ob_table_direct_insert_ctx.h"
#include "sql/engine/ob_exec_context.h"
#include "observer/table_load/ob_table_load_exec_ctx.h"
#include "observer/table_load/ob_table_load_struct.h"
#include "observer/table_load/ob_table_load_instance.h"
#include "share/schema/ob_schema_getter_guard.h"
namespace oceanbase
{
using namespace common;
using namespace observer;
using namespace storage;
using namespace share;
namespace sql
{
// Tears down whatever init() constructed (see destroy()).
ObTableDirectInsertCtx::~ObTableDirectInsertCtx()
{
  this->destroy();
}
/**
 * Creates the table-load execution context and table-load instance used by a
 * direct insert, and initializes the instance for the given table.
 *
 * Both objects are placement-constructed in memory taken from the allocator of
 * exec_ctx. The load parameters are fixed here (batch size 100, px mode, sorting
 * enabled, no tolerated error rows, stop on duplicate); sql_mode and
 * online_opt_stat_gather are read from the session's system variables.
 *
 * @param exec_ctx  execution context of the statement; must not be null and must
 *                  carry a session
 * @param table_id  id of the target table
 * @param parallel  used as the session count of the load
 * @return OB_SUCCESS; OB_INIT_TWICE; OB_INVALID_ARGUMENT; OB_ERR_UNEXPECTED when the
 *         session is missing; OB_ALLOCATE_MEMORY_FAILED; or the error of a failing step
 */
int ObTableDirectInsertCtx::init(ObExecContext *exec_ctx,
    const uint64_t table_id, const int64_t parallel)
{
  int ret = OB_SUCCESS;
  if (IS_INIT) {
    ret = OB_INIT_TWICE;
    LOG_WARN("ObTableDirectInsertCtx init twice", KR(ret));
  } else if (OB_ISNULL(exec_ctx)) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("exec_ctx cannot be null", KR(ret));
  } else if (OB_ISNULL(exec_ctx->get_my_session())) {
    // The session is dereferenced several times below.
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("session cannot be null", KR(ret));
  } else {
    // Keep the raw buffers in locals until both allocations succeeded: destroy()
    // runs a destructor on every non-null member, so a member must never point at
    // memory that holds no constructed object.
    void *exec_ctx_buf = exec_ctx->get_allocator().alloc(sizeof(ObTableLoadExecCtx));
    void *instance_buf = exec_ctx->get_allocator().alloc(sizeof(ObTableLoadInstance));
    if (OB_ISNULL(exec_ctx_buf) || OB_ISNULL(instance_buf)) {
      ret = OB_ALLOCATE_MEMORY_FAILED;
      LOG_WARN("failed to allocate memory", KR(ret));
    } else {
      load_exec_ctx_ = new (exec_ctx_buf) ObTableLoadExecCtx;
      table_load_instance_ = new (instance_buf) ObTableLoadInstance;
      load_exec_ctx_->exec_ctx_ = exec_ctx;
      load_exec_ctx_->allocator_ = &(exec_ctx->get_allocator());
      uint64_t sql_mode = 0;
      ObSEArray<int64_t, 16> store_column_idxs;
      ObObj obj;
      if (OB_FAIL(init_store_column_idxs(MTL_ID(), table_id, store_column_idxs))) {
        LOG_WARN("failed to init store column idxs", KR(ret));
      } else if (OB_FAIL(exec_ctx->get_my_session()->get_sys_variable(SYS_VAR_SQL_MODE, sql_mode))) {
        LOG_WARN("fail to get sys variable", KR(ret));
      } else if (OB_FAIL(exec_ctx->get_my_session()->get_sys_variable(SYS_VAR_ONLINE_OPT_STAT_GATHER, obj))) {
        LOG_WARN("fail to get sys variable", K(ret));
      } else {
        ObTableLoadParam param;
        param.column_count_ = store_column_idxs.count();
        param.tenant_id_ = MTL_ID();
        param.database_id_ = exec_ctx->get_my_session()->get_database_id();
        param.table_id_ = table_id;
        param.batch_size_ = 100;
        param.session_count_ = parallel;
        param.px_mode_ = true;
        param.online_opt_stat_gather_ = obj.get_bool();
        param.need_sort_ = true;
        param.max_error_row_count_ = 0;
        param.dup_action_ = sql::ObLoadDupActionType::LOAD_STOP_ON_DUP;
        param.sql_mode_ = sql_mode;
        if (OB_FAIL(table_load_instance_->init(param, store_column_idxs, load_exec_ctx_))) {
          LOG_WARN("failed to init direct loader", KR(ret));
        } else {
          is_inited_ = true;
          LOG_DEBUG("succeeded to init direct loader", K(param));
        }
      }
    }
  }
  return ret;
}
// Commits the table load and, on success, destroys the load instance's state.
// Returns OB_NOT_INIT when init() has not succeeded, or the commit error.
int ObTableDirectInsertCtx::finish()
{
  int ret = OB_SUCCESS;
  table::ObTableLoadResultInfo result_info;
  if (IS_NOT_INIT) {
    ret = OB_NOT_INIT;
    LOG_WARN("ObTableDirectInsertCtx is not init", KR(ret));
  } else {
    ret = table_load_instance_->commit(result_info);
    if (OB_SUCCESS != ret) {
      LOG_WARN("failed to commit direct loader", KR(ret));
    } else {
      table_load_instance_->destroy();
      LOG_DEBUG("succeeded to commit direct loader", K(result_info));
    }
  }
  return ret;
}
void ObTableDirectInsertCtx::destroy()
{
if (OB_NOT_NULL(table_load_instance_)) {
table_load_instance_->~ObTableLoadInstance();
table_load_instance_ = nullptr;
}
if (OB_NOT_NULL(load_exec_ctx_)) {
load_exec_ctx_->~ObTableLoadExecCtx();
load_exec_ctx_ = nullptr;
}
}
int ObTableDirectInsertCtx::init_store_column_idxs(const uint64_t tenant_id,
const uint64_t table_id, ObIArray<int64_t> &store_column_idxs)
{
int ret = OB_SUCCESS;
ObSchemaGetterGuard schema_guard;
const ObTableSchema *table_schema = nullptr;
ObSEArray<ObColDesc, 64> column_descs;
if (OB_FAIL(ObMultiVersionSchemaService::get_instance().get_tenant_schema_guard(tenant_id,
schema_guard))) {
LOG_WARN("fail to get tenant schema guard", KR(ret), K(tenant_id));
} else if (OB_FAIL(schema_guard.get_table_schema(tenant_id, table_id, table_schema))) {
LOG_WARN("fail to get table schema", KR(ret), K(tenant_id), K(table_id));
} else if (OB_ISNULL(table_schema)) {
ret = OB_TABLE_NOT_EXIST;
LOG_WARN("table not exist", KR(ret), K(tenant_id), K(table_id));
} else if (OB_FAIL(table_schema->get_column_ids(column_descs))) {
STORAGE_LOG(WARN, "fail to get column descs", KR(ret), KPC(table_schema));
} else {
for (int64_t i = 0; OB_SUCC(ret) && (i < column_descs.count()); ++i) {
const ObColDesc &col_desc = column_descs.at(i);
const ObColumnSchemaV2 *col_schema = table_schema->get_column_schema(col_desc.col_id_);
if (OB_ISNULL(col_schema)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected null column schema", KR(ret), K(col_desc));
} else if (col_schema->is_hidden()) {
} else if (OB_FAIL(store_column_idxs.push_back(i))) {
LOG_WARN("failed to push back store column idxs", KR(ret), K(i));
}
}
}
return ret;
}
} // namespace sql
} // namespace oceanbase

View File

@ -0,0 +1,43 @@
// Copyright (c) 2022-present Oceanbase Inc. All Rights Reserved.
// Author:
// yuya.yu <yuya.yu@oceanbase.com>
#pragma once
#include "lib/container/ob_iarray.h"
namespace oceanbase
{
namespace observer
{
class ObTableLoadExecCtx;
class ObTableLoadInstance;
}
namespace sql
{
class ObExecContext;
// Holds the table-load execution context and table-load instance used by a direct
// insert. init() creates both, finish() commits the load, and destroy() (also run
// by the destructor) destructs them.
// NOTE(review): the class has raw pointer members and a destructor that destructs
// the pointees, but copying is not disabled -- confirm that it is never copied.
class ObTableDirectInsertCtx
{
public:
ObTableDirectInsertCtx()
: load_exec_ctx_(nullptr),
table_load_instance_(nullptr),
is_inited_(false) {}
~ObTableDirectInsertCtx();
TO_STRING_KV(K_(is_inited));
public:
// Creates and initializes the load objects for table_id; parallel is used as the
// session count of the load.
int init(sql::ObExecContext *exec_ctx, const uint64_t table_id, const int64_t parallel);
// Commits the load; returns OB_NOT_INIT when init() has not succeeded.
int finish();
// Destructs the load objects and nulls the pointers.
void destroy();
private:
// Collects the positions of the table's non-hidden columns into store_column_idxs.
int init_store_column_idxs(const uint64_t tenant_id, const uint64_t table_id,
common::ObIArray<int64_t> &store_column_idxs);
private:
observer::ObTableLoadExecCtx *load_exec_ctx_;
observer::ObTableLoadInstance *table_load_instance_;
bool is_inited_;
};
} // namespace sql
} // namespace oceanbase

View File

@ -0,0 +1,116 @@
// Copyright (c) 2022-present Oceanbase Inc. All Rights Reserved.
// Author:
// yuya.yu <yuya.yu@oceanbase.com>
#define USING_LOG_PREFIX SQL_ENG
#include "sql/engine/cmd/ob_table_direct_insert_trans.h"
#include "observer/table_load/ob_table_load_table_ctx.h"
#include "observer/table_load/ob_table_load_service.h"
#include "observer/table_load/ob_table_load_store.h"
#include "sql/engine/ob_exec_context.h"
#include "sql/engine/ob_physical_plan.h"
namespace oceanbase
{
using namespace observer;
namespace sql
{
/**
 * Starts a direct insert for the statement if the plan requests one.
 *
 * Nothing is done unless the plan has append enabled and a non-zero append
 * table id. In that case:
 *  - if GCONF._ob_enable_direct_load is off, the append settings on the plan
 *    are cleared and OB_SUCCESS is returned;
 *  - otherwise the session must have autocommit on and must not be inside a
 *    transaction (else OB_NOT_SUPPORTED), and the table direct insert ctx of
 *    the exec context is inited with the append table id and the plan's px dop.
 *
 * @param ctx       execution context owning the session and the direct insert ctx
 * @param phy_plan  physical plan; its append settings may be reset
 * @return OB_SUCCESS on success, OB_ERR_UNEXPECTED when the session is null,
 *         OB_NOT_SUPPORTED inside a transaction, or the error of a failed step
 */
int ObTableDirectInsertTrans::try_start_direct_insert(ObExecContext &ctx,
                                                      ObPhysicalPlan &phy_plan)
{
  int ret = OB_SUCCESS;
  if (phy_plan.get_enable_append()
      && (0 != phy_plan.get_append_table_id())) {
    if (!GCONF._ob_enable_direct_load) { // recheck
      phy_plan.set_enable_append(false);
      phy_plan.set_append_table_id(0);
    } else {
      ObSQLSessionInfo *session = GET_MY_SESSION(ctx);
      bool auto_commit = false;
      // The null check is part of the if/else-if chain so that a null session
      // is never dereferenced by the calls below.
      if (OB_ISNULL(session)) {
        ret = OB_ERR_UNEXPECTED;
        LOG_WARN("unexpected null session", KR(ret));
      } else if (OB_FAIL(session->get_autocommit(auto_commit))) {
        LOG_WARN("failed to get auto commit", KR(ret));
      } else if (!auto_commit || session->is_in_transaction()) {
        ret = OB_NOT_SUPPORTED;
        LOG_WARN("using direct-insert within a transaction is not supported",
                 KR(ret), K(auto_commit), K(session->is_in_transaction()));
      } else {
        ObTableDirectInsertCtx &table_direct_insert_ctx = ctx.get_table_direct_insert_ctx();
        uint64_t table_id = phy_plan.get_append_table_id();
        int64_t parallel = phy_plan.get_px_dop();
        if (OB_FAIL(table_direct_insert_ctx.init(&ctx, table_id, parallel))) {
          LOG_WARN("failed to init table direct insert ctx", KR(ret), K(table_id), K(parallel));
        }
      }
    }
  }
  return ret;
}
/**
 * Finishes the direct insert of the statement if the plan requested one.
 *
 * When the plan has append enabled and a non-zero append table id, finish()
 * is called on the table direct insert ctx of the exec context; otherwise
 * the call is a no-op.
 *
 * @return OB_SUCCESS, or the error returned by finish()
 */
int ObTableDirectInsertTrans::try_finish_direct_insert(ObExecContext &ctx,
                                                       ObPhysicalPlan &phy_plan)
{
  int ret = OB_SUCCESS;
  if (!phy_plan.get_enable_append()
      || (0 == phy_plan.get_append_table_id())) {
    // the plan does not use direct insert, nothing to finish
  } else if (OB_FAIL(ctx.get_table_direct_insert_ctx().finish())) {
    LOG_WARN("failed to finish table direct insert ctx", KR(ret));
  }
  return ret;
}
int ObTableDirectInsertTrans::start_trans(const uint64_t table_id, const int64_t task_id)
{
int ret = OB_SUCCESS;
ObTableLoadTableCtx *table_ctx = nullptr;
ObTableLoadKey key(MTL_ID(), table_id);
if (OB_FAIL(ObTableLoadService::get_ctx(key, table_ctx))) {
LOG_WARN("fail to get table ctx", KR(ret), K(key), K(table_id));
} else {
table::ObTableLoadTransId trans_id;
trans_id.segment_id_ = task_id;
trans_id.trans_gid_ = 1;
ObTableLoadStore store(table_ctx);
if (OB_FAIL(store.init())) {
LOG_WARN("fail to init store", KR(ret));
} else if (OB_FAIL(store.px_start_trans(trans_id))) {
LOG_WARN("fail to start direct load trans", KR(ret), K(trans_id));
}
}
if (OB_NOT_NULL(table_ctx)) {
ObTableLoadService::put_ctx(table_ctx);
table_ctx = nullptr;
}
return ret;
}
int ObTableDirectInsertTrans::finish_trans(const uint64_t table_id, const int64_t task_id)
{
int ret = OB_SUCCESS;
ObTableLoadTableCtx *table_ctx = nullptr;
ObTableLoadKey key(MTL_ID(), table_id);
if (OB_FAIL(ObTableLoadService::get_ctx(key, table_ctx))) {
LOG_WARN("fail to get table ctx", KR(ret), K(key), K(table_id));
} else {
table::ObTableLoadTransId trans_id;
trans_id.segment_id_ = task_id;
trans_id.trans_gid_ = 1;
ObTableLoadStore store(table_ctx);
if (OB_FAIL(store.init())) {
LOG_WARN("fail to init store", KR(ret));
} else if (OB_FAIL(store.px_finish_trans(trans_id))) {
LOG_WARN("fail to finish direct load trans", KR(ret), K(trans_id));
}
}
if (OB_NOT_NULL(table_ctx)) {
ObTableLoadService::put_ctx(table_ctx);
table_ctx = nullptr;
}
return ret;
}
} // namespace sql
} // namespace oceanbase

View File

@ -0,0 +1,27 @@
// Copyright (c) 2022-present Oceanbase Inc. All Rights Reserved.
// Author:
// yuya.yu <yuya.yu@oceanbase.com>
#pragma once
#include "lib/ob_define.h"
namespace oceanbase
{
namespace sql
{
class ObExecContext;
class ObPhysicalPlan;
/**
 * Static entry points that drive a direct insert: two statement-level hooks
 * and two per-task hooks.
 */
class ObTableDirectInsertTrans
{
public:
  // all insert-tasks within an insert into select clause are wrapped by a single direct insert instance

  // Starts the direct insert if phy_plan has append enabled and a non-zero
  // append table id; otherwise does nothing.
  static int try_start_direct_insert(ObExecContext &ctx, ObPhysicalPlan &phy_plan);
  // Finishes the direct insert under the same plan condition as
  // try_start_direct_insert; otherwise does nothing.
  static int try_finish_direct_insert(ObExecContext &ctx, ObPhysicalPlan &phy_plan);

  // each insert-task is processed in a single thread and is wrapped by a table load trans

  // Starts the table load trans of insert task task_id on table table_id.
  static int start_trans(const uint64_t table_id, const int64_t task_id);
  // Finishes the table load trans of insert task task_id on table table_id.
  static int finish_trans(const uint64_t table_id, const int64_t task_id);
};
} // namespace sql
} // namespace oceanbase