direct load support specifying any number of columns and column order

This commit is contained in:
suz-yang
2024-08-14 09:18:38 +00:00
committed by ob-robot
parent c4dea4a3d8
commit 995d01af9b
32 changed files with 982 additions and 655 deletions

View File

@ -17,10 +17,7 @@
#include "observer/omt/ob_multi_tenant.h"
#include "observer/table_load/ob_table_load_client_service.h"
#include "observer/table_load/ob_table_load_client_task.h"
#include "observer/table_load/ob_table_load_exec_ctx.h"
#include "observer/table_load/ob_table_load_schema.h"
#include "observer/table_load/ob_table_load_service.h"
#include "sql/resolver/dml/ob_hint.h"
namespace oceanbase
{
@ -37,7 +34,7 @@ using namespace table;
int ObTableDirectLoadBeginExecutor::check_args()
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(arg_.table_name_.empty() || arg_.parallel_ <= 0 ||
if (OB_UNLIKELY(arg_.table_name_.empty() || arg_.parallel_ <= 0 || arg_.max_error_row_count_ < 0 ||
arg_.dup_action_ == ObLoadDupActionType::LOAD_INVALID_MODE ||
arg_.timeout_ <= 0)) {
ret = OB_INVALID_ARGUMENT;
@ -64,8 +61,8 @@ int ObTableDirectLoadBeginExecutor::process()
ObTableLoadClientTask *client_task = nullptr;
if (OB_FAIL(ObTableLoadService::check_tenant())) {
LOG_WARN("fail to check tenant", KR(ret));
} else if (OB_FAIL(resolve_param(param))) {
LOG_WARN("fail to resolve param", KR(ret));
} else if (OB_FAIL(init_param(param))) {
LOG_WARN("fail to init param", KR(ret));
} else if (OB_FAIL(ObTableLoadClientService::alloc_task(client_task))) {
LOG_WARN("fail to alloc client task", KR(ret));
} else if (OB_FAIL(client_task->init(param))) {
@ -80,6 +77,7 @@ int ObTableDirectLoadBeginExecutor::process()
ObTableLoadClientStatus client_status = ObTableLoadClientStatus::MAX_STATUS;
int client_error_code = OB_SUCCESS;
while (OB_SUCC(ret) && ObTableLoadClientStatus::RUNNING != client_status) {
client_task->heart_beat(); // 保持心跳
if (OB_UNLIKELY(THIS_WORKER.is_timeout())) {
ret = OB_TIMEOUT;
LOG_WARN("worker timeout", KR(ret));
@ -94,7 +92,8 @@ int ObTableDirectLoadBeginExecutor::process()
break;
case ObTableLoadClientStatus::ERROR:
case ObTableLoadClientStatus::ABORT:
ret = OB_SUCCESS == client_error_code ? OB_CANCELED : client_error_code;
ret = client_error_code;
LOG_WARN("client status error", KR(ret), K(client_status), K(client_error_code));
break;
default:
ret = OB_ERR_UNEXPECTED;
@ -107,8 +106,8 @@ int ObTableDirectLoadBeginExecutor::process()
// fill res
if (OB_SUCC(ret)) {
res_.table_id_ = client_task->param_.get_table_id();
res_.task_id_ = client_task->task_id_;
res_.table_id_ = client_task->get_table_id();
res_.task_id_ = client_task->get_task_id();
client_task->get_status(res_.status_, res_.error_code_);
}
if (nullptr != client_task) {
@ -118,64 +117,23 @@ int ObTableDirectLoadBeginExecutor::process()
return ret;
}
int ObTableDirectLoadBeginExecutor::resolve_param(ObTableLoadClientTaskParam &param)
int ObTableDirectLoadBeginExecutor::init_param(ObTableLoadClientTaskParam &param)
{
int ret = OB_SUCCESS;
const uint64_t tenant_id = ctx_.get_tenant_id();
const uint64_t database_id = ctx_.get_database_id();
uint64_t table_id = 0;
ObLoadDupActionType dup_action = arg_.dup_action_;
ObDirectLoadMethod::Type method = ObDirectLoadMethod::INVALID_METHOD;
ObDirectLoadInsertMode::Type insert_mode = ObDirectLoadInsertMode::INVALID_INSERT_MODE;
param.reset();
if (OB_FAIL(ObTableLoadSchema::get_table_id(tenant_id, database_id, arg_.table_name_,
table_id))) {
LOG_WARN("fail to get table id", KR(ret), K(tenant_id), K(database_id), K_(arg));
} else if (arg_.load_method_.empty()) {
method = ObDirectLoadMethod::FULL;
insert_mode = ObDirectLoadInsertMode::NORMAL;
} else {
ObDirectLoadHint::LoadMethod load_method = ObDirectLoadHint::get_load_method_value(arg_.load_method_);
switch (load_method) {
case ObDirectLoadHint::FULL:
method = ObDirectLoadMethod::FULL;
insert_mode = ObDirectLoadInsertMode::NORMAL;
break;
case ObDirectLoadHint::INC:
method = ObDirectLoadMethod::INCREMENTAL;
insert_mode = ObDirectLoadInsertMode::NORMAL;
break;
case ObDirectLoadHint::INC_REPLACE:
if (OB_UNLIKELY(ObLoadDupActionType::LOAD_STOP_ON_DUP != dup_action)) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("replace or ignore for inc_replace load method not supported", KR(ret),
K(arg_.load_method_), K(arg_.dup_action_));
} else {
dup_action = ObLoadDupActionType::LOAD_REPLACE; //rewrite dup action
method = ObDirectLoadMethod::INCREMENTAL;
insert_mode = ObDirectLoadInsertMode::INC_REPLACE;
}
break;
default:
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid load method", KR(ret), K(arg_.load_method_));
break;
}
}
if (OB_SUCC(ret)) {
param.set_client_addr(ctx_.get_user_client_addr());
param.set_tenant_id(tenant_id);
param.set_user_id(ctx_.get_user_id());
param.set_database_id(database_id);
param.set_table_id(table_id);
param.set_parallel(arg_.parallel_);
param.set_max_error_row_count(arg_.max_error_row_count_);
param.set_dup_action(dup_action);
param.set_timeout_us(arg_.timeout_);
param.set_heartbeat_timeout_us(arg_.heartbeat_timeout_);
param.set_method(method);
param.set_insert_mode(insert_mode);
}
OX(param.set_task_id(ObTableLoadClientService::generate_task_id()));
OX(param.set_client_addr(ctx_.get_user_client_addr()));
OX(param.set_tenant_id(ctx_.get_tenant_id()));
OX(param.set_user_id(ctx_.get_user_id()));
OX(param.set_database_id(ctx_.get_database_id()));
OZ(param.set_table_name(arg_.table_name_));
OX(param.set_parallel(arg_.parallel_));
OX(param.set_max_error_row_count(arg_.max_error_row_count_));
OX(param.set_dup_action(arg_.dup_action_));
OX(param.set_timeout_us(arg_.timeout_));
OX(param.set_heartbeat_timeout_us(arg_.heartbeat_timeout_));
OZ(param.set_load_method(arg_.load_method_));
OZ(param.set_column_names(arg_.column_names_));
return ret;
}
@ -196,9 +154,23 @@ int ObTableDirectLoadCommitExecutor::process()
int ret = OB_SUCCESS;
LOG_INFO("table direct load commit", K_(arg));
ObTableLoadClientTask *client_task = nullptr;
ObTableLoadClientTaskBrief *client_task_brief = nullptr;
ObTableLoadUniqueKey key(arg_.table_id_, arg_.task_id_);
if (OB_FAIL(ObTableLoadClientService::get_task(key, client_task))) {
LOG_WARN("fail to get client task", KR(ret), K(key));
if (OB_UNLIKELY(OB_ENTRY_NOT_EXIST != ret)) {
LOG_WARN("fail to get client task", KR(ret), K(key));
} else {
// 处理重试场景
ret = OB_SUCCESS;
if (OB_FAIL(ObTableLoadClientService::get_task_brief(key, client_task_brief))) {
LOG_WARN("fail to get client task brief", KR(ret), K(key));
} else if (ObTableLoadClientStatus::COMMIT == client_task_brief->client_status_) {
LOG_INFO("client task is commit", KR(ret));
} else {
ret = client_task_brief->error_code_;
LOG_WARN("client task is failed", KR(ret), KPC(client_task_brief));
}
}
} else if (OB_FAIL(client_task->commit())) {
LOG_WARN("fail to commit client task", KR(ret), K(key));
}
@ -264,9 +236,7 @@ int ObTableDirectLoadGetStatusExecutor::process()
} else {
ret = OB_SUCCESS;
if (OB_FAIL(ObTableLoadClientService::get_task_brief(key, client_task_brief))) {
if (OB_UNLIKELY(OB_ENTRY_NOT_EXIST != ret)) {
LOG_WARN("fail to get client task brief", KR(ret), K(key));
}
LOG_WARN("fail to get client task brief", KR(ret), K(key));
} else {
res_.status_ = client_task_brief->client_status_;
res_.error_code_ = client_task_brief->error_code_;
@ -374,7 +344,7 @@ int ObTableDirectLoadHeartBeatExecutor::process()
if (OB_FAIL(ObTableLoadClientService::get_task(key, client_task))) {
LOG_WARN("fail to get client task", KR(ret), K(key));
} else {
client_task->get_exec_ctx()->last_heartbeat_time_ = ObTimeUtil::current_time();
client_task->heart_beat();
client_task->get_status(res_.status_, res_.error_code_);
}
if (nullptr != client_task) {

View File

@ -21,8 +21,6 @@ namespace oceanbase
namespace observer
{
class ObTableLoadClientTaskParam;
class ObTableLoadClientTask;
class ObTableLoadTableCtx;
template <table::ObTableDirectLoadOperationType pcode>
class ObTableDirectLoadRpcExecutor
@ -73,7 +71,7 @@ protected:
int process() override;
private:
int resolve_param(ObTableLoadClientTaskParam &param);
int init_param(ObTableLoadClientTaskParam &param);
};
// commit

View File

@ -29,7 +29,8 @@ OB_SERIALIZE_MEMBER_SIMPLE(ObTableDirectLoadBeginArg,
heartbeat_timeout_,
force_create_,
is_async_,
load_method_);
load_method_,
column_names_);
OB_SERIALIZE_MEMBER_SIMPLE(ObTableDirectLoadBeginRes,
table_id_,

View File

@ -44,7 +44,8 @@ public:
K_(heartbeat_timeout),
K_(force_create),
K_(is_async),
K_(load_method));
K_(load_method),
K_(column_names));
public:
ObString table_name_;
int64_t parallel_;
@ -55,6 +56,7 @@ public:
bool force_create_; // unused
bool is_async_;
ObString load_method_;
common::ObSArray<ObString> column_names_;
};
struct ObTableDirectLoadBeginRes

View File

@ -78,7 +78,7 @@ int ObTableLoadAutoincNextval::get_input_value(ObStorageDatum &datum,
const uint64_t &sql_mode)
{
int ret = OB_SUCCESS;
if (datum.is_null()) {
if (datum.is_nop() || datum.is_null()) {
is_to_generate = true;
} else {
bool is_zero = false;

View File

@ -40,7 +40,14 @@ bool ObTableLoadClientService::ClientTaskBriefEraseIfExpired::operator()(
* ObTableLoadClientService
*/
ObTableLoadClientService::ObTableLoadClientService() : next_task_id_(1), is_inited_(false) {}
int64_t ObTableLoadClientService::next_task_sequence_ = 0;
int64_t ObTableLoadClientService::generate_task_id()
{
return ObTimeUtil::current_time() * 1000 + ATOMIC_FAA(&next_task_sequence_, 1) % 1000;
}
ObTableLoadClientService::ObTableLoadClientService() : is_inited_(false) {}
ObTableLoadClientService::~ObTableLoadClientService() {}
@ -92,7 +99,6 @@ int ObTableLoadClientService::alloc_task(ObTableLoadClientTask *&client_task)
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("fail to new ObTableLoadClientTask", KR(ret));
} else {
new_client_task->task_id_ = service->get_client_service().generate_task_id();
client_task = new_client_task;
client_task->inc_ref_count();
}
@ -125,9 +131,7 @@ void ObTableLoadClientService::revert_task(ObTableLoadClientTask *client_task)
const int64_t ref_count = client_task->dec_ref_count();
OB_ASSERT(ref_count >= 0);
if (0 == ref_count) {
const int64_t task_id = client_task->task_id_;
const uint64_t table_id = client_task->param_.get_table_id();
LOG_INFO("free client task", K(task_id), K(table_id), KP(client_task));
LOG_INFO("free client task", KPC(client_task));
free_task(client_task);
client_task = nullptr;
}
@ -141,8 +145,11 @@ int ObTableLoadClientService::add_task(ObTableLoadClientTask *client_task)
if (OB_ISNULL(service = MTL(ObTableLoadService *))) {
ret = OB_ERR_SYS;
LOG_WARN("null table load service", KR(ret));
} else if (OB_ISNULL(client_task)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid args", KR(ret), KP(client_task));
} else {
ObTableLoadUniqueKey key(client_task->param_.get_table_id(), client_task->task_id_);
ObTableLoadUniqueKey key(client_task->get_table_id(), client_task->get_task_id());
ret = service->get_client_service().add_client_task(key, client_task);
}
return ret;
@ -155,8 +162,11 @@ int ObTableLoadClientService::remove_task(ObTableLoadClientTask *client_task)
if (OB_ISNULL(service = MTL(ObTableLoadService *))) {
ret = OB_ERR_SYS;
LOG_WARN("null table load service", KR(ret));
} else if (OB_ISNULL(client_task)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid args", KR(ret), KP(client_task));
} else {
ObTableLoadUniqueKey key(client_task->param_.get_table_id(), client_task->task_id_);
ObTableLoadUniqueKey key(client_task->get_table_id(), client_task->get_task_id());
ret = service->get_client_service().remove_client_task(key, client_task);
}
return ret;
@ -236,10 +246,10 @@ int ObTableLoadClientService::remove_client_task(const ObTableLoadUniqueKey &key
if (OB_FAIL(client_task_brief_map_.create(key, client_task_brief))) {
LOG_WARN("fail to create client task brief", KR(ret), K(key));
} else {
client_task_brief->task_id_ = client_task->task_id_;
client_task_brief->table_id_ = client_task->param_.get_table_id();
client_task_brief->task_id_ = client_task->get_task_id();
client_task_brief->table_id_ = client_task->get_table_id();
client_task->get_status(client_task_brief->client_status_, client_task_brief->error_code_);
client_task_brief->result_info_ = client_task->result_info_;
client_task_brief->result_info_ = client_task->get_result_info();
client_task_brief->active_time_ = ObTimeUtil::current_time();
}
if (nullptr != client_task_brief) {

View File

@ -70,11 +70,11 @@ public:
return ObTableDirectLoadRpcProxy::dispatch(ctx, request, result);
}
private:
OB_INLINE int64_t generate_task_id() { return ATOMIC_FAA(&next_task_id_, 1); }
static int64_t generate_task_id();
private:
static const int64_t CLIENT_TASK_RETENTION_PERIOD = 24LL * 60 * 60 * 1000 * 1000; // 1day
static int64_t next_task_sequence_;
// key => client_task
typedef common::hash::ObHashMap<ObTableLoadUniqueKey, ObTableLoadClientTask *,
common::hash::NoPthreadDefendMode>
@ -116,7 +116,6 @@ private:
mutable obsys::ObRWLock rwlock_;
ClientTaskMap client_task_map_;
ClientTaskBriefMap client_task_brief_map_; // thread safety
int64_t next_task_id_;
bool is_inited_;
};

View File

@ -14,7 +14,6 @@
#include "observer/table_load/ob_table_load_client_task.h"
#include "observer/ob_server.h"
#include "observer/omt/ob_tenant.h"
#include "observer/table_load/ob_table_load_exec_ctx.h"
#include "observer/table_load/ob_table_load_schema.h"
#include "observer/table_load/ob_table_load_service.h"
@ -39,18 +38,22 @@ using namespace table;
*/
ObTableLoadClientTaskParam::ObTableLoadClientTaskParam()
: tenant_id_(OB_INVALID_TENANT_ID),
: allocator_("TLD_CTask"),
task_id_(0),
tenant_id_(OB_INVALID_TENANT_ID),
user_id_(OB_INVALID_ID),
database_id_(OB_INVALID_ID),
table_id_(OB_INVALID_ID),
table_name_(),
parallel_(0),
max_error_row_count_(0),
dup_action_(ObLoadDupActionType::LOAD_INVALID_MODE),
timeout_us_(0),
heartbeat_timeout_us_(0),
method_(ObDirectLoadMethod::INVALID_METHOD),
insert_mode_(ObDirectLoadInsertMode::INVALID_INSERT_MODE)
load_method_(),
column_names_()
{
allocator_.set_tenant_id(MTL_ID());
column_names_.set_block_allocator(ModulePageAllocator(allocator_));
}
ObTableLoadClientTaskParam::~ObTableLoadClientTaskParam() {}
@ -58,17 +61,19 @@ ObTableLoadClientTaskParam::~ObTableLoadClientTaskParam() {}
void ObTableLoadClientTaskParam::reset()
{
client_addr_.reset();
task_id_ = 0;
tenant_id_ = OB_INVALID_TENANT_ID;
user_id_ = OB_INVALID_ID;
database_id_ = OB_INVALID_ID;
table_id_ = OB_INVALID_ID;
table_name_.reset();
parallel_ = 0;
max_error_row_count_ = 0;
dup_action_ = ObLoadDupActionType::LOAD_INVALID_MODE;
timeout_us_ = 0;
heartbeat_timeout_us_ = 0;
method_ = ObDirectLoadMethod::INVALID_METHOD;
insert_mode_ = ObDirectLoadInsertMode::INVALID_INSERT_MODE;
load_method_.reset();
column_names_.reset();
allocator_.reset();
}
int ObTableLoadClientTaskParam::assign(const ObTableLoadClientTaskParam &other)
@ -77,39 +82,59 @@ int ObTableLoadClientTaskParam::assign(const ObTableLoadClientTaskParam &other)
if (this != &other) {
reset();
client_addr_ = other.client_addr_;
task_id_ = other.task_id_;
tenant_id_ = other.tenant_id_;
user_id_ = other.user_id_;
database_id_ = other.database_id_;
table_id_ = other.table_id_;
parallel_ = other.parallel_;
max_error_row_count_ = other.max_error_row_count_;
dup_action_ = other.dup_action_;
timeout_us_ = other.timeout_us_;
heartbeat_timeout_us_ = other.heartbeat_timeout_us_;
method_ = other.method_;
insert_mode_ = other.insert_mode_;
if (OB_FAIL(set_table_name(other.table_name_))) {
LOG_WARN("fail to set table name", KR(ret));
} else if (OB_FAIL(set_load_method(other.load_method_))) {
LOG_WARN("fail to set load method", KR(ret));
} else if (OB_FAIL(set_column_names(other.column_names_))) {
LOG_WARN("fail to deep copy column names", KR(ret));
}
}
return ret;
}
bool ObTableLoadClientTaskParam::is_valid() const
{
return client_addr_.is_valid() && OB_INVALID_TENANT_ID != tenant_id_ &&
OB_INVALID_ID != user_id_ && OB_INVALID_ID != database_id_ && OB_INVALID_ID != table_id_ &&
parallel_ > 0 && ObLoadDupActionType::LOAD_INVALID_MODE != dup_action_ &&
timeout_us_ > 0 && heartbeat_timeout_us_ > 0 &&
ObDirectLoadMethod::is_type_valid(method_) &&
ObDirectLoadInsertMode::is_type_valid(insert_mode_) &&
(storage::ObDirectLoadMethod::is_full(method_)
? storage::ObDirectLoadInsertMode::is_valid_for_full_method(insert_mode_)
: true) &&
(storage::ObDirectLoadMethod::is_incremental(method_)
? storage::ObDirectLoadInsertMode::is_valid_for_incremental_method(insert_mode_)
: true) &&
(storage::ObDirectLoadInsertMode::INC_REPLACE == insert_mode_
? sql::ObLoadDupActionType::LOAD_REPLACE == dup_action_
: true);
return client_addr_.is_valid() && task_id_ > 0 && OB_INVALID_TENANT_ID != tenant_id_ &&
OB_INVALID_ID != user_id_ && OB_INVALID_ID != database_id_ && !table_name_.empty() &&
parallel_ > 0 && max_error_row_count_ >= 0 &&
ObLoadDupActionType::LOAD_INVALID_MODE != dup_action_ && timeout_us_ > 0 &&
heartbeat_timeout_us_ > 0;
}
int ObTableLoadClientTaskParam::set_string(const ObString &src, ObString &dest)
{
int ret = OB_SUCCESS;
if (OB_FAIL(ob_write_string(allocator_, src, dest))) {
LOG_WARN("fail to write string", KR(ret));
}
return ret;
}
int ObTableLoadClientTaskParam::set_string_array(const ObIArray<ObString> &src,
ObIArray<ObString> &dest)
{
int ret = OB_SUCCESS;
dest.reset();
for (int64_t i = 0; OB_SUCC(ret) && i < src.count(); ++i) {
const ObString &src_str = src.at(i);
ObString dest_str;
if (OB_FAIL(ob_write_string(allocator_, src_str, dest_str))) {
LOG_WARN("fail to write string", KR(ret));
} else if (OB_FAIL(dest.push_back(dest_str))) {
LOG_WARN("fail to push back", KR(ret));
}
}
return ret;
}
/**
@ -128,22 +153,40 @@ public:
int process() override
{
int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS;
ObSQLSessionInfo *origin_session_info = THIS_WORKER.get_session();
ObSQLSessionInfo *session_info = client_task_->get_session_info();
ObSQLSessionInfo *session_info = client_task_->session_info_;
ObSchemaGetterGuard &schema_guard = client_task_->schema_guard_;
ObTableLoadParam load_param;
ObArray<uint64_t> column_ids;
THIS_WORKER.set_session(session_info);
if (OB_ISNULL(session_info)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected null session info", KR(ret));
} else {
session_info->set_thread_id(GETTID());
session_info->set_thread_id(get_tid_cache());
session_info->update_last_active_time();
if (OB_FAIL(session_info->set_session_state(QUERY_ACTIVE))) {
LOG_WARN("fail to set session state", K(ret));
if (OB_TMP_FAIL(session_info->set_session_state(QUERY_ACTIVE))) {
LOG_WARN("fail to set session state", KR(tmp_ret));
}
}
// resolve
if (OB_FAIL(
resolve(client_task_->client_exec_ctx_, client_task_->param_, load_param, column_ids))) {
LOG_WARN("fail to resolve", KR(ret), K(client_task_->param_));
}
// check support
else if (OB_FAIL(ObTableLoadService::check_support_direct_load(schema_guard,
load_param.table_id_,
load_param.method_,
load_param.insert_mode_,
load_param.load_mode_,
column_ids))) {
LOG_WARN("fail to check support direct load", KR(ret));
}
// begin
if (OB_SUCC(ret)) {
if (OB_FAIL(client_task_->init_instance())) {
if (OB_FAIL(client_task_->init_instance(load_param, column_ids))) {
LOG_WARN("fail to init instance", KR(ret));
} else if (OB_FAIL(client_task_->set_status_waitting())) {
LOG_WARN("fail to set status waitting", KR(ret));
@ -173,6 +216,168 @@ public:
}
client_task_->destroy_instance();
THIS_WORKER.set_session(origin_session_info);
if (session_info != nullptr && OB_TMP_FAIL(session_info->set_session_state(SESSION_SLEEP))) {
LOG_WARN("fail to set session state", KR(tmp_ret));
}
return ret;
}
static int resolve(ObTableLoadClientExecCtx &client_exec_ctx,
const ObTableLoadClientTaskParam &task_param, ObTableLoadParam &load_param,
ObIArray<uint64_t> &column_ids)
{
int ret = OB_SUCCESS;
const uint64_t tenant_id = task_param.get_tenant_id();
const uint64_t database_id = task_param.get_database_id();
ObSchemaGetterGuard *schema_guard = nullptr;
const ObTableSchema *table_schema = nullptr;
ObDirectLoadMethod::Type method = ObDirectLoadMethod::INVALID_METHOD;
ObDirectLoadInsertMode::Type insert_mode = ObDirectLoadInsertMode::INVALID_INSERT_MODE;
ObCompressorType compressor_type = INVALID_COMPRESSOR;
bool online_opt_stat_gather = false;
double online_sample_percent = 100.;
if (OB_ISNULL(schema_guard = client_exec_ctx.get_schema_guard())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected schema guard is null", KR(ret));
}
// resolve table_name_
else if (OB_FAIL(ObTableLoadSchema::get_table_schema(
*schema_guard, tenant_id, database_id, task_param.get_table_name(), table_schema))) {
LOG_WARN("fail to get table schema", KR(ret), K(tenant_id), K(database_id),
K(task_param.get_table_name()));
}
// resolve column_names_
else if (OB_FAIL(resolve_columns(table_schema, task_param.get_column_names(), column_ids))) {
LOG_WARN("fail to resolve columns", KR(ret), K(task_param.get_column_names()));
}
// resolve load_method_
else if (OB_FAIL(resolve_load_method(task_param.get_load_method(), method, insert_mode))) {
LOG_WARN("fail to resolve load method", KR(ret), K(task_param.get_load_method()));
}
// compress type
else if (OB_FAIL(ObDDLUtil::get_temp_store_compress_type(
table_schema->get_compressor_type(), task_param.get_parallel(), compressor_type))) {
LOG_WARN("fail to get tmp store compressor type", KR(ret));
}
// opt stat gather
else if (OB_FAIL(ObTableLoadSchema::get_tenant_optimizer_gather_stats_on_load(
tenant_id, online_opt_stat_gather))) {
LOG_WARN("fail to get tenant optimizer gather stats on load", KR(ret), K(tenant_id));
} else if (online_opt_stat_gather && OB_FAIL(ObDbmsStatsUtils::get_sys_online_estimate_percent(
*(client_exec_ctx.exec_ctx_), tenant_id,
table_schema->get_table_id(), online_sample_percent))) {
LOG_WARN("failed to get sys online sample percent", K(ret));
}
if (OB_SUCC(ret)) {
load_param.tenant_id_ = tenant_id;
load_param.table_id_ = table_schema->get_table_id();
load_param.parallel_ = task_param.get_parallel();
load_param.session_count_ = task_param.get_parallel();
load_param.batch_size_ = 100;
load_param.max_error_row_count_ = task_param.get_max_error_row_count();
load_param.column_count_ = column_ids.count();
load_param.need_sort_ = true;
load_param.px_mode_ = false;
load_param.online_opt_stat_gather_ = online_opt_stat_gather;
load_param.dup_action_ =
// rewrite dup action to replace in inc_replace
(method == ObDirectLoadMethod::INCREMENTAL &&
insert_mode == ObDirectLoadInsertMode::INC_REPLACE)
? ObLoadDupActionType::LOAD_REPLACE
: task_param.get_dup_action();
load_param.method_ = method;
load_param.insert_mode_ = insert_mode;
load_param.load_mode_ = ObDirectLoadMode::TABLE_LOAD;
load_param.compressor_type_ = compressor_type;
load_param.online_sample_percent_ = online_sample_percent;
}
return ret;
}
static int resolve_columns(const ObTableSchema *table_schema,
const ObIArray<ObString> &column_names, ObIArray<uint64_t> &column_ids)
{
int ret = OB_SUCCESS;
column_ids.reset();
if (OB_ISNULL(table_schema)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected table schema is null", KR(ret), KP(table_schema));
} else if (column_names.empty()) {
if (OB_FAIL(ObTableLoadSchema::get_user_column_ids(table_schema, column_ids))) {
LOG_WARN("fail to get user column ids", KR(ret));
}
} else {
const static uint64_t INVALID_COLUMN_ID = UINT64_MAX;
ObArray<uint64_t> user_column_ids;
ObArray<ObString> user_column_names;
if (OB_FAIL(ObTableLoadSchema::get_user_column_id_and_names(table_schema,
user_column_ids,
user_column_names))) {
LOG_WARN("fail to get user column id and names", KR(ret));
}
for (int64_t i = 0; OB_SUCC(ret) && i < column_names.count(); ++i) {
const ObString &column_name = column_names.at(i);
if (OB_UNLIKELY(column_name.empty())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("empty column name is invalid", KR(ret), K(i), K(column_names));
} else {
int64_t found_column_idx = -1;
for (int64_t j = 0; found_column_idx == -1 && j < user_column_names.count(); ++j) {
const ObString &user_column_name = user_column_names.at(j);
if (column_name.length() != user_column_name.length()) {
} else if (column_name.case_compare(user_column_name) == 0) {
found_column_idx = j;
}
}
if (OB_UNLIKELY(found_column_idx == -1)) {
ret = OB_ERR_BAD_FIELD_ERROR;
LOG_WARN("unknow column", KR(ret), K(column_name), K(user_column_names));
} else {
const uint64_t user_column_id = user_column_ids.at(found_column_idx);
if (OB_UNLIKELY(user_column_id == INVALID_COLUMN_ID)) {
ret = OB_ERR_FIELD_SPECIFIED_TWICE;
LOG_WARN("column specified twice", KR(ret), K(i), K(column_name), K(column_names));
} else if (OB_FAIL(column_ids.push_back(user_column_id))) {
LOG_WARN("fail to push back column id", KR(ret));
} else {
user_column_ids.at(found_column_idx) = INVALID_COLUMN_ID;
}
}
}
}
}
return ret;
}
static int resolve_load_method(const ObString &load_method_str, ObDirectLoadMethod::Type &method,
ObDirectLoadInsertMode::Type &insert_mode)
{
int ret = OB_SUCCESS;
if (load_method_str.empty()) {
method = ObDirectLoadMethod::FULL;
insert_mode = ObDirectLoadInsertMode::NORMAL;
} else {
const ObDirectLoadHint::LoadMethod load_method =
ObDirectLoadHint::get_load_method_value(load_method_str);
switch (load_method) {
case ObDirectLoadHint::FULL:
method = ObDirectLoadMethod::FULL;
insert_mode = ObDirectLoadInsertMode::NORMAL;
break;
case ObDirectLoadHint::INC:
method = ObDirectLoadMethod::INCREMENTAL;
insert_mode = ObDirectLoadInsertMode::NORMAL;
break;
case ObDirectLoadHint::INC_REPLACE:
method = ObDirectLoadMethod::INCREMENTAL;
insert_mode = ObDirectLoadInsertMode::INC_REPLACE;
break;
default:
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid load method", KR(ret), K(load_method_str));
break;
}
}
return ret;
}
@ -195,7 +400,7 @@ public:
void callback(int ret_code, ObTableLoadTask *task) override
{
if (OB_UNLIKELY(OB_SUCCESS != ret_code)) {
client_task_->set_status_abort(ret_code);
client_task_->set_status_error(ret_code);
}
task->~ObTableLoadTask();
}
@ -209,12 +414,11 @@ private:
*/
ObTableLoadClientTask::ObTableLoadClientTask()
: task_id_(OB_INVALID_ID),
allocator_("TLD_ClientTask"),
task_scheduler_(nullptr),
: allocator_("TLD_ClientTask"),
session_info_(nullptr),
plan_ctx_(allocator_),
exec_ctx_(allocator_),
task_scheduler_(nullptr),
session_count_(0),
next_batch_id_(0),
client_status_(ObTableLoadClientStatus::MAX_STATUS),
@ -251,41 +455,33 @@ int ObTableLoadClientTask::init(const ObTableLoadClientTaskParam &param)
} else if (OB_UNLIKELY(!param.is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid args", KR(ret), K(param));
} else if (OB_FAIL(param_.assign(param))) {
LOG_WARN("fail to assign param", KR(ret));
} else {
const int64_t origin_timeout_ts = THIS_WORKER.get_timeout_ts();
THIS_WORKER.set_timeout_ts(ObTimeUtil::current_time() + param_.get_timeout_us());
if (OB_FAIL(init_exec_ctx())) {
LOG_WARN("fail to init client exec ctx", KR(ret));
} else if (OB_ISNULL(task_scheduler_ =
OB_NEWx(ObTableLoadTaskThreadPoolScheduler, (&allocator_), 1,
param_.get_table_id(), "Client"))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("fail to new ObTableLoadTaskThreadPoolScheduler", KR(ret));
} else if (OB_FAIL(task_scheduler_->init())) {
if (OB_FAIL(param_.assign(param))) {
LOG_WARN("fail to assign param", KR(ret), K(param));
} else if (OB_FAIL(init_exec_ctx())) {
LOG_WARN("fail to init exec ctx", KR(ret));
} else if (OB_FAIL(init_task_scheduler())) {
LOG_WARN("fail to init task scheduler", KR(ret));
} else if (OB_FAIL(task_scheduler_->start())) {
LOG_WARN("fail to start task scheduler", KR(ret));
} else {
is_inited_ = true;
}
THIS_WORKER.set_timeout_ts(origin_timeout_ts);
}
return ret;
}
int ObTableLoadClientTask::create_session_info(uint64_t tenant_id, uint64_t user_id,
uint64_t database_id, uint64_t table_id,
ObSQLSessionInfo *&session_info,
ObFreeSessionCtx &free_session_ctx)
int ObTableLoadClientTask::create_session_info()
{
int ret = OB_SUCCESS;
const schema::ObTenantSchema *tenant_info = nullptr;
const uint64_t tenant_id = param_.get_tenant_id();
const uint64_t user_id = param_.get_user_id();
const uint64_t database_id = param_.get_database_id();
const ObTenantSchema *tenant_info = nullptr;
const ObUserInfo *user_info = nullptr;
const ObDatabaseSchema *database_schema = nullptr;
const ObTableSchema *table_schema = nullptr;
if (OB_FAIL(schema_guard_.get_tenant_info(tenant_id, tenant_info))) {
if (OB_NOT_NULL(session_info_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected session info not null", KR(ret));
} else if (OB_FAIL(schema_guard_.get_tenant_info(tenant_id, tenant_info))) {
LOG_WARN("get tenant info failed", K(ret));
} else if (OB_ISNULL(tenant_info)) {
ret = OB_ERR_UNEXPECTED;
@ -300,43 +496,31 @@ int ObTableLoadClientTask::create_session_info(uint64_t tenant_id, uint64_t user
} else if (OB_ISNULL(database_schema)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("invalid database schema", K(ret), K(tenant_id), K(database_id));
} else if (OB_FAIL(schema_guard_.get_table_schema(tenant_id, table_id, table_schema))) {
LOG_WARN("fail to get table schema", KR(ret));
} else if (OB_ISNULL(table_schema)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("invalid table schema", K(ret), K(tenant_id), K(table_id));
} else if (OB_FAIL(ObTableLoadUtils::create_session_info(session_info, free_session_ctx))) {
} else if (OB_FAIL(ObTableLoadUtils::create_session_info(session_info_, free_session_ctx_))) {
LOG_WARN("create session id failed", KR(ret));
} else {
ObArenaAllocator allocator("TLD_Tmp");
allocator.set_tenant_id(MTL_ID());
ObStringBuffer buffer(&allocator);
buffer.append("DIRECT LOAD: ");
buffer.append(table_schema->get_table_name());
ObSqlString query_str;
ObObj timeout_val;
timeout_val.set_int(param_.get_timeout_us());
OZ(session_info->load_default_sys_variable(false, false)); //加载默认的session参数
OZ(session_info->load_default_configs_in_pc());
OX(session_info->init_tenant(tenant_info->get_tenant_name(), tenant_id));
OX(session_info->set_priv_user_id(user_id));
OX(session_info->store_query_string(ObString(buffer.length(), buffer.ptr())));
OX(session_info->set_user(user_info->get_user_name(), user_info->get_host_name_str(),
user_info->get_user_id()));
OX(session_info->set_user_priv_set(OB_PRIV_ALL | OB_PRIV_GRANT));
OX(session_info->set_default_database(database_schema->get_database_name(),
CS_TYPE_UTF8MB4_GENERAL_CI));
OX(session_info->set_mysql_cmd(COM_QUERY));
OX(session_info->set_current_trace_id(ObCurTraceId::get_trace_id()));
OX(session_info->set_client_addr(param_.get_client_addr()));
OX(session_info->set_peer_addr(ObServer::get_instance().get_self()));
OX(session_info->set_query_start_time(ObTimeUtil::current_time()));
OZ(session_info->update_sys_variable(SYS_VAR_OB_QUERY_TIMEOUT, timeout_val));
}
if (OB_FAIL(ret)) {
if (session_info != nullptr) {
observer::ObTableLoadUtils::free_session_info(session_info, free_session_ctx);
session_info = nullptr;
}
OX(query_str.assign_fmt("DIRECT LOAD: %.*s, task_id:%ld",
static_cast<int>(param_.get_table_name().length()),
param_.get_table_name().ptr(), param_.get_task_id()));
OZ(session_info_->load_default_sys_variable(false /*print_info_log*/, false /*is_sys_tenant*/)); //加载默认的session参数
OZ(session_info_->load_default_configs_in_pc());
OX(session_info_->init_tenant(tenant_info->get_tenant_name(), tenant_id));
OX(session_info_->set_priv_user_id(user_id));
OX(session_info_->store_query_string(query_str.string()));
OX(session_info_->set_user(user_info->get_user_name(), user_info->get_host_name_str(),
user_info->get_user_id()));
OX(session_info_->set_user_priv_set(OB_PRIV_ALL | OB_PRIV_GRANT));
OX(session_info_->set_default_database(database_schema->get_database_name(),
CS_TYPE_UTF8MB4_GENERAL_CI));
OX(session_info_->set_mysql_cmd(COM_QUERY));
OX(session_info_->set_current_trace_id(ObCurTraceId::get_trace_id()));
OX(session_info_->set_client_addr(param_.get_client_addr()));
OX(session_info_->set_peer_addr(ObServer::get_instance().get_self()));
OX(session_info_->set_query_start_time(ObTimeUtil::current_time()));
OZ(session_info_->update_sys_variable(SYS_VAR_OB_QUERY_TIMEOUT, timeout_val));
}
return ret;
}
@ -344,13 +528,9 @@ int ObTableLoadClientTask::create_session_info(uint64_t tenant_id, uint64_t user
int ObTableLoadClientTask::init_exec_ctx()
{
int ret = OB_SUCCESS;
if (OB_ISNULL(GCTX.schema_service_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("invalid argument", K(ret), K(GCTX.schema_service_));
} else if (OB_FAIL(GCTX.schema_service_->get_tenant_schema_guard(param_.get_tenant_id(), schema_guard_))) {
if (OB_FAIL(ObTableLoadSchema::get_schema_guard(param_.get_tenant_id(), schema_guard_))) {
LOG_WARN("get_schema_guard failed", K(ret));
} else if (OB_FAIL(create_session_info(param_.get_tenant_id(), param_.get_user_id(),
param_.get_database_id(), param_.get_table_id(), session_info_, free_session_ctx_))) {
} else if (OB_FAIL(create_session_info())) {
LOG_WARN("fail to create session info", KR(ret));
} else {
sql_ctx_.schema_guard_ = &schema_guard_;
@ -359,44 +539,55 @@ int ObTableLoadClientTask::init_exec_ctx()
exec_ctx_.set_physical_plan_ctx(&plan_ctx_);
exec_ctx_.set_my_session(session_info_);
client_exec_ctx_.exec_ctx_ = &exec_ctx_;
client_exec_ctx_.last_heartbeat_time_ = ObTimeUtil::current_time();
client_exec_ctx_.heartbeat_timeout_us_ = param_.get_heartbeat_timeout_us();
client_exec_ctx_.init_heart_beat(param_.get_heartbeat_timeout_us());
}
return ret;
}
int ObTableLoadClientTask::init_task_scheduler()
{
int ret = OB_SUCCESS;
const int64_t origin_timeout_ts = THIS_WORKER.get_timeout_ts();
THIS_WORKER.set_timeout_ts(ObTimeUtil::current_time() + param_.get_timeout_us());
if (OB_ISNULL(task_scheduler_ = OB_NEWx(ObTableLoadTaskThreadPoolScheduler, (&allocator_), 1,
param_.get_task_id(), "Executor"))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("fail to new ObTableLoadTaskThreadPoolScheduler", KR(ret));
} else if (OB_FAIL(task_scheduler_->init())) {
LOG_WARN("fail to init task scheduler", KR(ret));
} else if (OB_FAIL(task_scheduler_->start())) {
LOG_WARN("fail to start task scheduler", KR(ret));
}
THIS_WORKER.set_timeout_ts(origin_timeout_ts);
return ret;
}
int ObTableLoadClientTask::start()
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("ObTableLoadClientTask not init", KR(ret));
} else if (OB_FAIL(set_status_initializing())) {
LOG_WARN("fail to set status initializing", KR(ret));
} else {
obsys::ObWLockGuard guard(rw_lock_);
if (OB_UNLIKELY(ObTableLoadClientStatus::MAX_STATUS != client_status_)) {
ret = OB_STATE_NOT_MATCH;
LOG_WARN("unexpected status", KR(ret), K(client_status_));
} else {
client_status_ = ObTableLoadClientStatus::INITIALIZING;
ObTableLoadTask *task = nullptr;
if (OB_ISNULL(task = OB_NEWx(ObTableLoadTask, &allocator_, param_.get_tenant_id()))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("fail to new ObTableLoadTask", KR(ret));
} else if (OB_FAIL(task->set_processor<ClientTaskExectueProcessor>(this))) {
LOG_WARN("fail to set client task processor", KR(ret));
} else if (OB_FAIL(task->set_callback<ClientTaskExectueCallback>(this))) {
LOG_WARN("fail to set common task callback", KR(ret));
} else if (OB_FAIL(task_scheduler_->add_task(0, task))) {
LOG_WARN("fail to add task", KR(ret));
}
if (OB_FAIL(ret)) {
client_status_ = ObTableLoadClientStatus::ERROR;
error_code_ = ret;
if (nullptr != task) {
task->~ObTableLoadTask();
allocator_.free(task);
task = nullptr;
}
ObTableLoadTask *task = nullptr;
if (OB_ISNULL(task = OB_NEWx(ObTableLoadTask, &allocator_, param_.get_tenant_id()))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("fail to new ObTableLoadTask", KR(ret));
} else if (OB_FAIL(task->set_processor<ClientTaskExectueProcessor>(this))) {
LOG_WARN("fail to set client task processor", KR(ret));
} else if (OB_FAIL(task->set_callback<ClientTaskExectueCallback>(this))) {
LOG_WARN("fail to set common task callback", KR(ret));
} else if (OB_FAIL(task_scheduler_->add_task(0, task))) {
LOG_WARN("fail to add task", KR(ret));
}
if (OB_FAIL(ret)) {
set_status_error(ret);
if (nullptr != task) {
task->~ObTableLoadTask();
allocator_.free(task);
task = nullptr;
}
}
}
@ -417,7 +608,6 @@ int ObTableLoadClientTask::write(ObTableLoadObjRowArray &obj_rows)
LOG_WARN("unexpected session count", KR(ret), K(session_count_));
} else {
const int64_t batch_id = ATOMIC_FAA(&next_batch_id_, 1);
;
const int32_t session_id = batch_id % session_count_ + 1;
ObTableLoadSequenceNo start_seq_no(batch_id << ObTableLoadSequenceNo::BATCH_ID_SHIFT);
for (int64_t i = 0; OB_SUCC(ret) && i < obj_rows.count(); ++i) {
@ -439,10 +629,15 @@ int ObTableLoadClientTask::commit()
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("ObTableLoadClientTask not init", KR(ret));
} else if (OB_FAIL(check_status(ObTableLoadClientStatus::RUNNING))) {
LOG_WARN("fail to check status", KR(ret));
} else if (OB_FAIL(set_status_committing())) {
LOG_WARN("fail to set status committing", KR(ret));
} else {
obsys::ObWLockGuard guard(rw_lock_);
if (ObTableLoadClientStatus::COMMITTING == client_status_ ||
ObTableLoadClientStatus::COMMIT == client_status_) {
LOG_INFO("client task already commit", K(client_status_));
} else {
ret = advance_status_nolock(ObTableLoadClientStatus::RUNNING,
ObTableLoadClientStatus::COMMITTING);
}
}
return ret;
}
@ -460,57 +655,54 @@ void ObTableLoadClientTask::abort()
}
}
}
void ObTableLoadClientTask::heart_beat() { client_exec_ctx_.heart_beat(); }
int ObTableLoadClientTask::check_status() { return client_exec_ctx_.check_status(); }
int ObTableLoadClientTask::advance_status_nolock(const ObTableLoadClientStatus expected,
const ObTableLoadClientStatus updated)
{
int ret = OB_SUCCESS;
if (OB_LIKELY(client_status_ == expected)) {
client_status_ = updated;
LOG_INFO("LOAD DATA client status advance", K(client_status_));
} else if (ObTableLoadClientStatus::ERROR == client_status_ ||
ObTableLoadClientStatus::ABORT == client_status_) {
ret = error_code_;
LOG_WARN("client status error", KR(ret), K(client_status_), K(error_code_));
} else {
ret = OB_STATE_NOT_MATCH;
LOG_WARN("unexpected status", KR(ret), K(client_status_));
}
return ret;
}
int ObTableLoadClientTask::advance_status(const ObTableLoadClientStatus expected,
const ObTableLoadClientStatus updated)
{
obsys::ObWLockGuard guard(rw_lock_);
return advance_status_nolock(expected, updated);
}
int ObTableLoadClientTask::set_status_initializing()
{
return advance_status(ObTableLoadClientStatus::MAX_STATUS, ObTableLoadClientStatus::INITIALIZING);
}
int ObTableLoadClientTask::set_status_waitting()
{
int ret = OB_SUCCESS;
obsys::ObWLockGuard guard(rw_lock_);
if (OB_UNLIKELY(ObTableLoadClientStatus::INITIALIZING != client_status_)) {
ret = OB_STATE_NOT_MATCH;
LOG_WARN("unexpected status", KR(ret), K(client_status_));
} else {
client_status_ = ObTableLoadClientStatus::WAITTING;
}
return ret;
return advance_status(ObTableLoadClientStatus::INITIALIZING, ObTableLoadClientStatus::WAITTING);
}
int ObTableLoadClientTask::set_status_running()
{
int ret = OB_SUCCESS;
obsys::ObWLockGuard guard(rw_lock_);
if (OB_UNLIKELY(ObTableLoadClientStatus::WAITTING != client_status_)) {
ret = OB_STATE_NOT_MATCH;
LOG_WARN("unexpected status", KR(ret), K(client_status_));
} else {
client_status_ = ObTableLoadClientStatus::RUNNING;
}
return ret;
}
int ObTableLoadClientTask::set_status_committing()
{
int ret = OB_SUCCESS;
obsys::ObWLockGuard guard(rw_lock_);
if (OB_UNLIKELY(ObTableLoadClientStatus::RUNNING != client_status_)) {
ret = OB_STATE_NOT_MATCH;
LOG_WARN("unexpected status", KR(ret), K(client_status_));
} else {
client_status_ = ObTableLoadClientStatus::COMMITTING;
}
return ret;
return advance_status(ObTableLoadClientStatus::WAITTING, ObTableLoadClientStatus::RUNNING);
}
int ObTableLoadClientTask::set_status_commit()
{
int ret = OB_SUCCESS;
obsys::ObWLockGuard guard(rw_lock_);
if (OB_UNLIKELY(ObTableLoadClientStatus::COMMITTING != client_status_)) {
ret = OB_STATE_NOT_MATCH;
LOG_WARN("unexpected status", KR(ret), K(client_status_));
} else {
client_status_ = ObTableLoadClientStatus::COMMIT;
}
return ret;
return advance_status(ObTableLoadClientStatus::COMMITTING, ObTableLoadClientStatus::COMMIT);
}
int ObTableLoadClientTask::set_status_error(int error_code)
@ -574,105 +766,22 @@ void ObTableLoadClientTask::get_status(ObTableLoadClientStatus &client_status,
error_code = error_code_;
}
int ObTableLoadClientTask::get_compressor_type(const uint64_t tenant_id,
const uint64_t table_id,
const int64_t parallel,
ObCompressorType &compressor_type)
int ObTableLoadClientTask::init_instance(ObTableLoadParam &load_param,
const ObIArray<uint64_t> &column_ids)
{
int ret = OB_SUCCESS;
ObCompressorType table_compressor_type = ObCompressorType::NONE_COMPRESSOR;
if (OB_FAIL(
ObTableLoadSchema::get_table_compressor_type(tenant_id, table_id, table_compressor_type))) {
LOG_WARN("fail to get table compressor type", KR(ret));
} else if (OB_FAIL(ObDDLUtil::get_temp_store_compress_type(table_compressor_type, parallel,
compressor_type))) {
LOG_WARN("fail to get tmp store compressor type", KR(ret));
}
return ret;
}
int ObTableLoadClientTask::init_instance()
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("ObTableLoadClientTask not init", KR(ret));
const ObTableLoadTableCtx *tmp_ctx = nullptr;
if (OB_FAIL(instance_.init(load_param, column_ids, &client_exec_ctx_))) {
LOG_WARN("fail to init instance", KR(ret));
} else if (OB_FAIL(instance_.start_trans(trans_ctx_, ObTableLoadInstance::DEFAULT_SEGMENT_ID,
allocator_))) {
LOG_WARN("fail to start trans", KR(ret));
} else if (OB_ISNULL(tmp_ctx = instance_.get_table_ctx())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("fail to get table ctx", KR(ret));
} else {
const uint64_t tenant_id = param_.get_tenant_id();
const uint64_t table_id = param_.get_table_id();
const ObDirectLoadMethod::Type method = param_.get_method();
const ObDirectLoadInsertMode::Type insert_mode = param_.get_insert_mode();
omt::ObTenant *tenant = nullptr;
ObSchemaGetterGuard schema_guard;
ObArray<uint64_t> column_ids;
ObCompressorType compressor_type = INVALID_COMPRESSOR;
bool online_opt_stat_gather = false;
if (OB_FAIL(GCTX.omt_->get_tenant(tenant_id, tenant))) {
LOG_WARN("fail to get tenant handle", KR(ret), K(tenant_id));
} else if (OB_FAIL(ObTableLoadSchema::get_schema_guard(tenant_id, schema_guard))) {
LOG_WARN("fail to get schema guard", KR(ret), K(tenant_id), K(table_id));
} else if (OB_FAIL(ObTableLoadService::check_support_direct_load(schema_guard,
table_id,
method,
insert_mode,
ObDirectLoadMode::TABLE_LOAD))) {
LOG_WARN("fail to check support direct load", KR(ret));
} else if (OB_FAIL(ObTableLoadSchema::get_user_column_ids(schema_guard,
tenant_id,
table_id,
column_ids))) {
LOG_WARN("fail to get user column ids", KR(ret));
} else if (OB_FAIL(get_compressor_type(tenant_id, table_id, session_count_, compressor_type))) {
LOG_WARN("fail to get compressor type", KR(ret));
} else if (OB_FAIL(ObTableLoadSchema::get_tenant_optimizer_gather_stats_on_load(
tenant_id, online_opt_stat_gather))) {
LOG_WARN("fail to get tenant optimizer gather stats on load", KR(ret), K(tenant_id));
}
ObTableLoadParam load_param;
double online_sample_percent = 100.;
if (OB_SUCC(ret)) {
if (online_opt_stat_gather &&
OB_FAIL(ObDbmsStatsUtils::get_sys_online_estimate_percent(exec_ctx_,
tenant_id,
table_id,
online_sample_percent))) {
LOG_WARN("failed to get sys online sample percent", K(ret));
} else {
load_param.online_sample_percent_ = online_sample_percent;
}
}
if (OB_SUCC(ret)) {
load_param.tenant_id_ = tenant_id;
load_param.table_id_ = table_id;
load_param.parallel_ = param_.get_parallel();
load_param.session_count_ = load_param.parallel_;
load_param.batch_size_ = 100;
load_param.max_error_row_count_ = param_.get_max_error_row_count();
load_param.column_count_ = column_ids.count();
load_param.need_sort_ = true;
load_param.px_mode_ = false;
load_param.online_opt_stat_gather_ = online_opt_stat_gather;
load_param.dup_action_ = param_.get_dup_action();
load_param.method_ = method;
load_param.insert_mode_ = insert_mode;
load_param.load_mode_ = ObDirectLoadMode::TABLE_LOAD;
load_param.compressor_type_ = compressor_type;
const ObTableLoadTableCtx *tmp_ctx = nullptr;
if (OB_FAIL(instance_.init(load_param, column_ids, &client_exec_ctx_))) {
LOG_WARN("fail to init instance", KR(ret));
} else if (OB_FAIL(instance_.start_trans(trans_ctx_, ObTableLoadInstance::DEFAULT_SEGMENT_ID, allocator_))) {
LOG_WARN("fail to start trans", KR(ret));
} else if (OB_ISNULL(tmp_ctx = instance_.get_table_ctx())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("fail to get table ctx", KR(ret));
} else {
session_count_ = tmp_ctx->param_.write_session_count_;
tmp_ctx = nullptr;
}
}
session_count_ = tmp_ctx->param_.write_session_count_;
tmp_ctx = nullptr;
}
return ret;
}
@ -680,31 +789,17 @@ int ObTableLoadClientTask::init_instance()
int ObTableLoadClientTask::commit_instance()
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("ObTableLoadClientTask not init", KR(ret));
if (OB_FAIL(instance_.commit_trans(trans_ctx_))) {
LOG_WARN("fail to commit trans", KR(ret));
} else if (OB_FAIL(instance_.commit())) {
LOG_WARN("fail to commit instance", KR(ret));
} else {
if (OB_FAIL(instance_.commit_trans(trans_ctx_))) {
LOG_WARN("fail to commit trans", KR(ret));
} else if (OB_FAIL(instance_.commit())) {
LOG_WARN("fail to commit instance", KR(ret));
} else {
result_info_ = instance_.get_result_info();
}
result_info_ = instance_.get_result_info();
}
return ret;
}
void ObTableLoadClientTask::destroy_instance()
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("ObTableLoadClientTask not init", KR(ret));
} else {
instance_.destroy();
}
}
void ObTableLoadClientTask::destroy_instance() { instance_.destroy(); }
} // namespace observer
} // namespace oceanbase

View File

@ -13,9 +13,9 @@
#pragma once
#include "lib/hash/ob_link_hashmap.h"
#include "observer/table_load/ob_table_load_exec_ctx.h"
#include "observer/table_load/ob_table_load_instance.h"
#include "observer/table_load/ob_table_load_struct.h"
#include "observer/table_load/ob_table_load_exec_ctx.h"
#include "share/table/ob_table_load_define.h"
#include "share/table/ob_table_load_row_array.h"
#include "sql/session/ob_sql_session_mgr.h"
@ -39,51 +39,68 @@ public:
int assign(const ObTableLoadClientTaskParam &other);
bool is_valid() const;
#define DEFINE_GETTER_AND_SETTER(type, name) \
#define DEFINE_VAR_GETTER_AND_SETTER(type, name) \
OB_INLINE type get_##name() const { return name##_; } \
OB_INLINE void set_##name(type name) { name##_ = name; }
DEFINE_GETTER_AND_SETTER(ObAddr, client_addr);
DEFINE_GETTER_AND_SETTER(uint64_t, tenant_id);
DEFINE_GETTER_AND_SETTER(uint64_t, user_id);
DEFINE_GETTER_AND_SETTER(uint64_t, database_id);
DEFINE_GETTER_AND_SETTER(uint64_t, table_id);
DEFINE_GETTER_AND_SETTER(int64_t, parallel);
DEFINE_GETTER_AND_SETTER(uint64_t, max_error_row_count);
DEFINE_GETTER_AND_SETTER(sql::ObLoadDupActionType, dup_action);
DEFINE_GETTER_AND_SETTER(uint64_t, timeout_us);
DEFINE_GETTER_AND_SETTER(uint64_t, heartbeat_timeout_us);
DEFINE_GETTER_AND_SETTER(storage::ObDirectLoadMethod::Type, method);
DEFINE_GETTER_AND_SETTER(storage::ObDirectLoadInsertMode::Type, insert_mode);
#define DEFINE_STR_GETTER_AND_SETTER(type, name) \
OB_INLINE const type &get_##name() const { return name##_; } \
OB_INLINE int set_##name(const type &name) { return set_string(name, name##_); }
#undef DEFINE_GETTER_AND_SETTER
#define DEFINE_STR_ARRAY_GETTER_AND_SETTER(type, name) \
OB_INLINE const ObIArray<type> &get_##name() const { return name##_; } \
OB_INLINE int set_##name(const ObIArray<type> &name) { return set_string_array(name, name##_); }
DEFINE_VAR_GETTER_AND_SETTER(ObAddr, client_addr);
DEFINE_VAR_GETTER_AND_SETTER(uint64_t, task_id);
DEFINE_VAR_GETTER_AND_SETTER(uint64_t, tenant_id);
DEFINE_VAR_GETTER_AND_SETTER(uint64_t, user_id);
DEFINE_VAR_GETTER_AND_SETTER(uint64_t, database_id);
DEFINE_STR_GETTER_AND_SETTER(ObString, table_name);
DEFINE_VAR_GETTER_AND_SETTER(int64_t, parallel);
DEFINE_VAR_GETTER_AND_SETTER(uint64_t, max_error_row_count);
DEFINE_VAR_GETTER_AND_SETTER(sql::ObLoadDupActionType, dup_action);
DEFINE_VAR_GETTER_AND_SETTER(uint64_t, timeout_us);
DEFINE_VAR_GETTER_AND_SETTER(uint64_t, heartbeat_timeout_us);
DEFINE_STR_GETTER_AND_SETTER(ObString, load_method);
DEFINE_STR_ARRAY_GETTER_AND_SETTER(ObString, column_names);
#undef DEFINE_VAR_GETTER_AND_SETTER
#undef DEFINE_STR_GETTER_AND_SETTER
TO_STRING_KV(K_(client_addr),
K_(task_id),
K_(tenant_id),
K_(user_id),
K_(database_id),
K_(table_id),
K_(table_name),
K_(parallel),
K_(max_error_row_count),
K_(dup_action),
K_(timeout_us),
K_(heartbeat_timeout_us),
"method", storage::ObDirectLoadMethod::get_type_string(method_),
"insert_mode", storage::ObDirectLoadInsertMode::get_type_string(insert_mode_));
K_(load_method),
K_(column_names));
private:
int set_string(const ObString &src, ObString &dest);
int set_string_array(const ObIArray<ObString> &src, ObIArray<ObString> &dest);
private:
ObArenaAllocator allocator_;
int64_t task_id_;
ObAddr client_addr_;
uint64_t tenant_id_;
uint64_t user_id_;
uint64_t database_id_;
uint64_t table_id_;
ObString table_name_;
int64_t parallel_;
uint64_t max_error_row_count_;
sql::ObLoadDupActionType dup_action_;
int64_t timeout_us_;
int64_t heartbeat_timeout_us_;
storage::ObDirectLoadMethod::Type method_;
storage::ObDirectLoadInsertMode::Type insert_mode_;
ObString load_method_;
common::ObArray<ObString> column_names_;
};
class ObTableLoadClientTask
@ -99,55 +116,61 @@ public:
OB_INLINE int64_t get_ref_count() const { return ATOMIC_LOAD(&ref_count_); }
OB_INLINE int64_t inc_ref_count() { return ATOMIC_AAF(&ref_count_, 1); }
OB_INLINE int64_t dec_ref_count() { return ATOMIC_SAF(&ref_count_, 1); }
OB_INLINE sql::ObSQLSessionInfo *get_session_info() { return session_info_; }
OB_INLINE ObTableLoadClientExecCtx *get_exec_ctx() { return &client_exec_ctx_; }
OB_INLINE int64_t get_task_id() const { return param_.get_task_id(); }
OB_INLINE uint64_t get_table_id() const { return 0; }
void heart_beat();
int check_status();
int set_status_initializing();
int set_status_waitting();
int set_status_running();
int set_status_committing();
int set_status_commit();
int set_status_error(int error_code);
void set_status_abort(int error_code = OB_CANCELED);
table::ObTableLoadClientStatus get_status() const;
void get_status(table::ObTableLoadClientStatus &client_status, int &error_code) const;
int check_status(table::ObTableLoadClientStatus client_status);
TO_STRING_KV(K_(task_id), K_(param), K_(result_info), KP_(session_info), K_(free_session_ctx),
K_(client_exec_ctx), KP_(task_scheduler), K_(client_status), K_(error_code),
OB_INLINE const table::ObTableLoadResultInfo &get_result_info() const { return result_info_; }
TO_STRING_KV(K_(param),
KP_(session_info),
K_(free_session_ctx),
K_(client_exec_ctx),
KP_(task_scheduler),
K_(client_status),
K_(error_code),
K_(result_info),
K_(ref_count));
private:
int init_task_scheduler();
int create_session_info(uint64_t tenant_id, uint64_t user_id, uint64_t database_id,
uint64_t table_id, sql::ObSQLSessionInfo *&session_info,
sql::ObFreeSessionCtx &free_session_ctx);
int create_session_info();
int init_exec_ctx();
int init_task_scheduler();
int init_instance();
int advance_status_nolock(const table::ObTableLoadClientStatus expected,
const table::ObTableLoadClientStatus updated);
int advance_status(const table::ObTableLoadClientStatus expected,
const table::ObTableLoadClientStatus updated);
int init_instance(ObTableLoadParam &load_param, const ObIArray<uint64_t> &column_ids);
int commit_instance();
void destroy_instance();
int get_compressor_type(const uint64_t tenant_id,
const uint64_t table_id,
const int64_t parallel,
ObCompressorType &compressor_type);
private:
class ClientTaskExectueProcessor;
class ClientTaskExectueCallback;
public:
uint64_t task_id_;
ObTableLoadClientTaskParam param_;
table::ObTableLoadResultInfo result_info_;
private:
ObArenaAllocator allocator_;
ObITableLoadTaskScheduler *task_scheduler_;
ObTableLoadClientTaskParam param_;
share::schema::ObSchemaGetterGuard schema_guard_;
sql::ObSQLSessionInfo *session_info_;
sql::ObFreeSessionCtx free_session_ctx_;
ObTableLoadClientExecCtx client_exec_ctx_;
ObSchemaGetterGuard schema_guard_;
sql::ObSqlCtx sql_ctx_;
sql::ObPhysicalPlanCtx plan_ctx_;
ObExecContext exec_ctx_;
ObTableLoadClientExecCtx client_exec_ctx_;
ObITableLoadTaskScheduler *task_scheduler_;
int64_t session_count_;
ObTableLoadInstance instance_;
ObTableLoadInstance::TransCtx trans_ctx_;
@ -155,6 +178,7 @@ private:
mutable obsys::ObRWLock rw_lock_;
table::ObTableLoadClientStatus client_status_;
int error_code_;
table::ObTableLoadResultInfo result_info_;
int64_t ref_count_ CACHE_ALIGNED;
bool is_inited_;
};

View File

@ -17,6 +17,7 @@
#include "observer/table_load/control/ob_table_load_control_rpc_proxy.h"
#include "observer/table_load/ob_table_load_coordinator_ctx.h"
#include "observer/table_load/ob_table_load_coordinator_trans.h"
#include "observer/table_load/ob_table_load_error_row_handler.h"
#include "observer/table_load/ob_table_load_redef_table.h"
#include "observer/table_load/ob_table_load_service.h"
#include "observer/table_load/ob_table_load_stat.h"
@ -1117,7 +1118,7 @@ int ObTableLoadCoordinator::write_sql_stat(ObTableLoadSqlStatistics &sql_statist
"TLD_TabStatNode",
tenant_id))) {
LOG_WARN("fail to create table stats map", KR(ret));
} else if (OB_FAIL(inc_column_stats.create(ctx_->param_.column_count_,
} else if (OB_FAIL(inc_column_stats.create(ctx_->schema_.store_column_count_,
"TLD_ColStatBkt",
"TLD_ColStatNode",
tenant_id))) {
@ -1659,10 +1660,20 @@ public:
int set_objs(const ObTableLoadObjRowArray &obj_rows, const ObIArray<int64_t> &idx_array)
{
int ret = OB_SUCCESS;
ObTableLoadErrorRowHandler *error_row_handler = ctx_->coordinator_ctx_->error_row_handler_;
for (int64_t i = 0; OB_SUCC(ret) && (i < obj_rows.count()); ++i) {
const ObTableLoadObjRow &src_obj_row = obj_rows.at(i);
ObTableLoadObjRow out_obj_row;
if (OB_FAIL(src_obj_row.project(idx_array, out_obj_row))) {
// 对于客户端导入场景, 需要处理多列或者少列
if (OB_UNLIKELY(src_obj_row.count_ != ctx_->param_.column_count_)) {
ret = OB_ERR_COULUMN_VALUE_NOT_MATCH;
LOG_WARN("column count doesn't match value count", KR(ret), K(src_obj_row),
K(ctx_->param_.column_count_));
ObNewRow new_row(src_obj_row.cells_, src_obj_row.count_);
if (OB_FAIL(error_row_handler->handle_error_row(ret, new_row))) {
LOG_WARN("fail to handle error row", KR(ret));
}
} else if (OB_FAIL(src_obj_row.project(idx_array, out_obj_row))) {
LOG_WARN("failed to projecte out_obj_row", KR(ret), K(src_obj_row.count_));
} else if (OB_FAIL(obj_rows_.push_back(out_obj_row))) {
LOG_WARN("failed to add row to obj_rows_", KR(ret), K(out_obj_row));

View File

@ -353,10 +353,9 @@ int ObTableLoadCoordinatorCtx::init_column_idxs(const ObIArray<uint64_t> &column
int ret = OB_SUCCESS;
idx_array_.reset();
const ObIArray<ObColDesc> &column_descs = ctx_->schema_.column_descs_;
bool found_column = true;
for (int64_t i = 0; OB_SUCC(ret) && OB_LIKELY(found_column) && i < column_descs.count(); ++i) {
for (int64_t i = 0; OB_SUCC(ret) && i < column_descs.count(); ++i) {
const ObColDesc &col_desc = column_descs.at(i);
found_column = (ctx_->schema_.is_heap_table_ && i == 0); // skip hidden pk in heap table
bool found_column = (ctx_->schema_.is_heap_table_ && i == 0); // skip hidden pk in heap table
// 在源数据的列数组中找到对应的列
for (int64_t j = 0; OB_SUCC(ret) && OB_LIKELY(!found_column) && j < column_ids.count(); ++j) {
const uint64_t column_id = column_ids.at(j);
@ -368,10 +367,14 @@ int ObTableLoadCoordinatorCtx::init_column_idxs(const ObIArray<uint64_t> &column
}
}
}
}
if (OB_SUCC(ret) && OB_UNLIKELY(!found_column)) {
ret = OB_SCHEMA_NOT_UPTODATE;
LOG_WARN("column not found", KR(ret), K(idx_array_), K(column_descs), K(column_ids));
if (OB_SUCC(ret) && !found_column) {
if (OB_UNLIKELY(ctx_->param_.px_mode_)) {
ret = OB_SCHEMA_NOT_UPTODATE;
LOG_WARN("column not found", KR(ret), K(idx_array_), K(column_descs), K(column_ids));
} else if (OB_FAIL(idx_array_.push_back(-1))) {
LOG_WARN("fail to push back column idx", KR(ret), K(idx_array_), K(i), K(col_desc));
}
}
}
return ret;
}

View File

@ -45,6 +45,15 @@ ObSQLSessionInfo *ObTableLoadExecCtx::get_session_info()
return session_info;
}
ObSchemaGetterGuard *ObTableLoadExecCtx::get_schema_guard()
{
ObSchemaGetterGuard *schema_guard = nullptr;
if (nullptr != exec_ctx_ && nullptr != exec_ctx_->get_sql_ctx()) {
schema_guard = exec_ctx_->get_sql_ctx()->schema_guard_;
}
return schema_guard;
}
int ObTableLoadExecCtx::check_status()
{
int ret = OB_SUCCESS;
@ -74,12 +83,24 @@ int ObTableLoadClientExecCtx::check_status()
int ret = OB_SUCCESS;
if (OB_FAIL(ObTableLoadExecCtx::check_status())) {
LOG_WARN("fail to check status", KR(ret));
} else if (OB_UNLIKELY(ObTimeUtil::current_time() - last_heartbeat_time_ > heartbeat_timeout_us_)) {
} else if (OB_UNLIKELY(last_heartbeat_time_ + heartbeat_timeout_us_ <
ObTimeUtil::current_time())) {
ret = OB_TIMEOUT;
LOG_WARN("heartbeat is timeout", KR(ret), K(last_heartbeat_time_), K(heartbeat_timeout_us_));
}
return ret;
}
void ObTableLoadClientExecCtx::init_heart_beat(const int64_t heartbeat_timeout_us)
{
heartbeat_timeout_us_ = heartbeat_timeout_us;
last_heartbeat_time_ = ObTimeUtil::current_time();
}
void ObTableLoadClientExecCtx::heart_beat()
{
last_heartbeat_time_ = ObTimeUtil::current_time();
}
} // namespace observer
} // namespace oceanbase

View File

@ -17,6 +17,13 @@
namespace oceanbase
{
namespace share
{
namespace schema
{
class ObSchemaGetterGuard;
} // namespace schema
} // namespace share
namespace sql
{
class ObExecContext;
@ -36,6 +43,7 @@ public:
virtual ~ObTableLoadExecCtx() = default;
common::ObIAllocator *get_allocator();
sql::ObSQLSessionInfo *get_session_info();
share::schema::ObSchemaGetterGuard *get_schema_guard();
virtual int check_status();
bool is_valid() const { return nullptr != exec_ctx_; }
TO_STRING_KV(KP_(exec_ctx), KP_(tx_desc));
@ -54,8 +62,10 @@ public:
}
virtual ~ObTableLoadClientExecCtx() = default;
virtual int check_status();
void init_heart_beat(const int64_t heartbeat_timeout_us);
void heart_beat();
TO_STRING_KV(KP_(exec_ctx), KP_(tx_desc), K_(heartbeat_timeout_us), K_(last_heartbeat_time));
public:
private:
int64_t heartbeat_timeout_us_;
int64_t last_heartbeat_time_;
};

View File

@ -70,9 +70,10 @@ int ObTableLoadInstance::init(ObTableLoadParam &param,
if (IS_INIT) {
ret = OB_INIT_TWICE;
LOG_WARN("ObTableLoadInstance init twice", KR(ret), KP(this));
} else if (OB_UNLIKELY(!param.is_valid() || !execute_ctx->is_valid())) {
} else if (OB_UNLIKELY(!param.is_valid() || column_ids.empty() ||
column_ids.count() != param.column_count_ || !execute_ctx->is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid args", KR(ret), K(param), KPC(execute_ctx));
LOG_WARN("invalid args", KR(ret), K(param), K(column_ids), KPC(execute_ctx));
} else {
DISABLE_SQL_MEMLEAK_GUARD;
execute_ctx_ = execute_ctx;
@ -92,7 +93,8 @@ int ObTableLoadInstance::init(ObTableLoadParam &param,
else if (OB_FAIL(ObTableLoadService::check_support_direct_load(param.table_id_,
param.method_,
param.insert_mode_,
param.load_mode_))) {
param.load_mode_,
column_ids))) {
LOG_WARN("fail to check support direct load", KR(ret), K(param));
}
// start direct load

View File

@ -290,7 +290,9 @@ int ObTableLoadMemCompactor::inner_init()
mem_ctx_.table_data_desc_ = store_ctx_->table_data_desc_;
mem_ctx_.datum_utils_ = &(store_ctx_->ctx_->schema_.datum_utils_);
mem_ctx_.need_sort_ = param_->need_sort_;
mem_ctx_.column_count_ = param_->column_count_;
mem_ctx_.column_count_ = (store_ctx_->ctx_->schema_.is_heap_table_
? store_ctx_->ctx_->schema_.store_column_count_ - 1
: store_ctx_->ctx_->schema_.store_column_count_);
}
mem_ctx_.mem_load_task_count_ = param_->session_count_;

View File

@ -331,7 +331,9 @@ int ObTableLoadMultipleHeapTableCompactor::inner_init()
mem_ctx_.datum_utils_ = &(store_ctx_->ctx_->schema_.datum_utils_);
mem_ctx_.need_sort_ = param_->need_sort_;
mem_ctx_.mem_load_task_count_ = param_->session_count_;
mem_ctx_.column_count_ = param_->column_count_;
mem_ctx_.column_count_ =
(store_ctx_->ctx_->schema_.is_heap_table_ ? store_ctx_->ctx_->schema_.store_column_count_ - 1
: store_ctx_->ctx_->schema_.store_column_count_);
mem_ctx_.dml_row_handler_ = store_ctx_->error_row_handler_;
mem_ctx_.file_mgr_ = store_ctx_->tmp_file_mgr_;
mem_ctx_.dup_action_ = param_->dup_action_;

View File

@ -96,18 +96,49 @@ static int pad_obj(ObTableLoadCastObjCtx &cast_obj_ctx, const ObColumnSchemaV2 *
}
int ObTableLoadObjCaster::cast_obj(ObTableLoadCastObjCtx &cast_obj_ctx,
const ObColumnSchemaV2 *column_schema, const ObObj &src,
const ObColumnSchemaV2 *column_schema,
const ObObj &src,
ObObj &dst)
{
int ret = OB_SUCCESS;
const ObObj *convert_src_obj = nullptr;
const ObObjType expect_type = column_schema->get_meta_type().get_type();
const ObAccuracy &accuracy = column_schema->get_accuracy();
if (OB_FAIL(convert_obj(expect_type, src, convert_src_obj))) {
LOG_WARN("fail to convert obj", KR(ret));
if (src.is_nop_value()) {
// 默认值是表达式
if (lib::is_mysql_mode() && column_schema->get_cur_default_value().is_ext()) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("column default value is ext", KR(ret), KPC(column_schema));
} else if (lib::is_oracle_mode() && column_schema->is_default_expr_v2_column()) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("column default value is expr", KR(ret), KPC(column_schema));
}
// 没有默认值, 且为NOT NULL
// 例外:枚举类型默认为第一个
else if (column_schema->is_not_null_for_write() &&
column_schema->get_cur_default_value().is_null()) {
if (column_schema->get_meta_type().is_enum()) {
const uint64_t ENUM_FIRST_VAL = 1;
dst.set_enum(ENUM_FIRST_VAL);
} else {
ret = OB_ERR_NO_DEFAULT_FOR_FIELD;
LOG_WARN("column can not be null", KR(ret), KPC(column_schema));
}
}
// mysql模式可以直接用default value
else if (lib::is_mysql_mode()) {
dst = column_schema->get_cur_default_value();
}
// oracle模式需要转换
else {
convert_src_obj = &(column_schema->get_cur_default_value());
}
} else {
if (OB_FAIL(convert_obj(expect_type, src, convert_src_obj))) {
LOG_WARN("fail to convert obj", KR(ret));
}
}
if (OB_SUCC(ret)) {
if (OB_SUCC(ret) && convert_src_obj != nullptr) {
if (column_schema->is_enum_or_set()) {
if (OB_FAIL(handle_string_to_enum_set(cast_obj_ctx, column_schema, src, dst))) {
LOG_WARN("fail to convert string to enum or set", KR(ret), K(src), K(dst));
@ -117,18 +148,18 @@ int ObTableLoadObjCaster::cast_obj(ObTableLoadCastObjCtx &cast_obj_ctx,
LOG_WARN("fail to do to type", KR(ret));
}
}
}
if (OB_SUCC(ret)) {
if (OB_FAIL(pad_obj(cast_obj_ctx, column_schema, dst))) {
LOG_WARN("fail to pad obj", KR(ret));
if (OB_SUCC(ret)) {
if (OB_FAIL(pad_obj(cast_obj_ctx, column_schema, dst))) {
LOG_WARN("fail to pad obj", KR(ret));
}
}
}
if (OB_SUCC(ret)) {
if (cast_obj_ctx.is_need_check_ &&
OB_FAIL(cast_obj_check(cast_obj_ctx, column_schema, dst))) {
LOG_WARN("fail to check cast obj result", KR(ret), K(dst));
if (OB_SUCC(ret)) {
if (cast_obj_ctx.is_need_check_ &&
OB_FAIL(cast_obj_check(cast_obj_ctx, column_schema, dst))) {
LOG_WARN("fail to check cast obj result", KR(ret), K(dst));
}
}
}
return ret;

View File

@ -181,11 +181,15 @@ int ObTableLoadPartitionCalc::cast_part_key(common::ObNewRow &part_key, common::
ObObj obj;
for (int64_t i = 0; OB_SUCC(ret) && i < part_key_obj_index_.count(); ++i) {
const IndexAndType &index_and_type = part_key_obj_index_.at(i);
if (OB_FAIL(ObTableLoadObjCaster::cast_obj(cast_obj_ctx, index_and_type.column_schema_,
part_key.cells_[i], obj))) {
const ObColumnSchemaV2 *column_schema = index_and_type.column_schema_;
ObObj &part_obj = part_key.cells_[i];
if (OB_FAIL(ObTableLoadObjCaster::cast_obj(cast_obj_ctx,
column_schema,
part_obj,
obj))) {
LOG_WARN("fail to cast obj", KR(ret));
} else {
part_key.cells_[i] = obj;
part_obj = obj;
}
}
}

View File

@ -40,17 +40,16 @@ int ObTableLoadSchema::get_schema_guard(uint64_t tenant_id, ObSchemaGetterGuard
return ret;
}
int ObTableLoadSchema::get_table_schema(uint64_t tenant_id, uint64_t database_id,
int ObTableLoadSchema::get_table_schema(ObSchemaGetterGuard &schema_guard,
uint64_t tenant_id,
uint64_t database_id,
const ObString &table_name,
ObSchemaGetterGuard &schema_guard,
const ObTableSchema *&table_schema)
{
int ret = OB_SUCCESS;
table_schema = nullptr;
if (OB_FAIL(get_schema_guard(tenant_id, schema_guard))) {
LOG_WARN("fail to get schema guard", KR(ret), K(tenant_id));
} else if (OB_FAIL(schema_guard.get_table_schema(tenant_id, database_id, table_name, false,
table_schema))) {
if (OB_FAIL(schema_guard.get_table_schema(tenant_id, database_id, table_name, false /*is_index*/,
table_schema))) {
LOG_WARN("fail to get table schema", KR(ret), K(tenant_id), K(database_id), K(table_name));
} else if (OB_ISNULL(table_schema)) {
ret = OB_TABLE_NOT_EXIST;
@ -59,20 +58,6 @@ int ObTableLoadSchema::get_table_schema(uint64_t tenant_id, uint64_t database_id
return ret;
}
int ObTableLoadSchema::get_table_id(uint64_t tenant_id, uint64_t database_id,
const ObString &table_name, uint64_t &table_id)
{
int ret = OB_SUCCESS;
ObSchemaGetterGuard schema_guard;
const ObTableSchema *table_schema = nullptr;
if (OB_FAIL(get_table_schema(tenant_id, database_id, table_name, schema_guard, table_schema))) {
LOG_WARN("fail to get table schema", KR(ret), K(tenant_id), K(database_id), K(table_name));
} else {
table_id = table_schema->get_table_id();
}
return ret;
}
int ObTableLoadSchema::get_table_schema(uint64_t tenant_id, uint64_t table_id,
ObSchemaGetterGuard &schema_guard,
const ObTableSchema *&table_schema)
@ -112,6 +97,25 @@ int ObTableLoadSchema::get_table_schema(uint64_t tenant_id, uint64_t table_id,
return ret;
}
int ObTableLoadSchema::get_table_schema(ObSchemaGetterGuard &schema_guard,
uint64_t tenant_id,
uint64_t table_id,
const ObTableSchema *&table_schema)
{
int ret = OB_SUCCESS;
table_schema = nullptr;
if (OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id || OB_INVALID_ID == table_id)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid args", KR(ret), K(tenant_id), K(table_id));
} else if (OB_FAIL(schema_guard.get_table_schema(tenant_id, table_id, table_schema))) {
LOG_WARN("fail to get table schema", KR(ret), K(tenant_id), K(table_id));
} else if (OB_ISNULL(table_schema)) {
ret = OB_TABLE_NOT_EXIST;
LOG_WARN("table not exist", KR(ret), K(tenant_id), K(table_id));
}
return ret;
}
int ObTableLoadSchema::get_user_column_schemas(const ObTableSchema *table_schema,
ObIArray<const ObColumnSchemaV2 *> &column_schemas)
{
@ -155,35 +159,82 @@ int ObTableLoadSchema::get_user_column_schemas(ObSchemaGetterGuard &schema_guard
{
int ret = OB_SUCCESS;
const ObTableSchema *table_schema = nullptr;
if (OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id || OB_INVALID_ID == table_id)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid args", KR(ret), K(tenant_id), K(table_id));
} else if (OB_FAIL(schema_guard.get_table_schema(tenant_id, table_id, table_schema))) {
LOG_WARN("fail to get table schema", KR(ret), K(tenant_id), K(table_id));
} else if (OB_ISNULL(table_schema)) {
ret = OB_TABLE_NOT_EXIST;
LOG_WARN("table not exist", KR(ret), K(tenant_id), K(table_id));
if (OB_FAIL(get_table_schema(schema_guard, tenant_id, table_id, table_schema))) {
LOG_WARN("fail to get table schema", KR(ret));
} else {
ret = get_user_column_schemas(table_schema, column_schemas);
}
return ret;
}
int ObTableLoadSchema::get_user_column_ids(const ObTableSchema *table_schema,
ObIArray<uint64_t> &column_ids)
{
int ret = OB_SUCCESS;
column_ids.reset();
ObArray<const ObColumnSchemaV2 *> column_schemas;
if (OB_FAIL(get_user_column_schemas(table_schema, column_schemas))) {
LOG_WARN("fail to get user column schemas", KR(ret));
}
for (int64_t i = 0; OB_SUCC(ret) && i < column_schemas.count(); ++i) {
const ObColumnSchemaV2 *column_schema = column_schemas.at(i);
if (OB_FAIL(column_ids.push_back(column_schema->get_column_id()))) {
LOG_WARN("fail to push back column id", KR(ret));
}
}
return ret;
}
int ObTableLoadSchema::get_user_column_ids(ObSchemaGetterGuard &schema_guard,
uint64_t tenant_id,
uint64_t table_id,
ObIArray<uint64_t> &column_ids)
{
int ret = OB_SUCCESS;
column_ids.reset();
const ObTableSchema *table_schema = nullptr;
if (OB_FAIL(get_table_schema(schema_guard, tenant_id, table_id, table_schema))) {
LOG_WARN("fail to get table schema", KR(ret));
} else {
ret = get_user_column_ids(table_schema, column_ids);
}
return ret;
}
int ObTableLoadSchema::get_user_column_names(const ObTableSchema *table_schema,
ObIArray<ObString> &column_names)
{
int ret = OB_SUCCESS;
column_names.reset();
ObArray<const ObColumnSchemaV2 *> column_schemas;
if (OB_FAIL(get_user_column_schemas(schema_guard, tenant_id, table_id, column_schemas))) {
if (OB_FAIL(get_user_column_schemas(table_schema, column_schemas))) {
LOG_WARN("fail to get user column schemas", KR(ret));
}
for (int64_t i = 0; OB_SUCC(ret) && i < column_schemas.count(); ++i) {
const ObColumnSchemaV2 *column_schema = column_schemas.at(i);
if (OB_FAIL(column_names.push_back(column_schema->get_column_name_str()))) {
LOG_WARN("fail to push back column name", KR(ret));
}
}
return ret;
}
int ObTableLoadSchema::get_user_column_id_and_names(const ObTableSchema *table_schema,
ObIArray<uint64_t> &column_ids,
ObIArray<ObString> &column_names)
{
int ret = OB_SUCCESS;
column_ids.reset();
column_names.reset();
ObArray<const ObColumnSchemaV2 *> column_schemas;
if (OB_FAIL(get_user_column_schemas(table_schema, column_schemas))) {
LOG_WARN("fail to get user column schemas", KR(ret));
}
for (int64_t i = 0; OB_SUCC(ret) && i < column_schemas.count(); ++i) {
const ObColumnSchemaV2 *column_schema = column_schemas.at(i);
if (OB_FAIL(column_ids.push_back(column_schema->get_column_id()))) {
LOG_WARN("fail to push back column id", KR(ret));
} else if (OB_FAIL(column_names.push_back(column_schema->get_column_name_str()))) {
LOG_WARN("fail to push back column name", KR(ret));
}
}
return ret;
@ -205,6 +256,31 @@ int ObTableLoadSchema::get_user_column_count(ObSchemaGetterGuard &schema_guard,
return ret;
}
int ObTableLoadSchema::get_column_ids(const ObTableSchema *table_schema,
ObIArray<uint64_t> &column_ids,
bool contain_hidden_pk_column)
{
int ret = OB_SUCCESS;
column_ids.reset();
if (OB_ISNULL(table_schema)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid args", KR(ret), KP(table_schema));
} else {
ObArray<ObColDesc> column_descs;
if (OB_FAIL(table_schema->get_column_ids(column_descs))) {
STORAGE_LOG(WARN, "fail to get column descs", KR(ret), KPC(table_schema));
}
for (int64_t i = 0; OB_SUCC(ret) && i < column_descs.count(); ++i) {
const ObColDesc &col_desc = column_descs.at(i);
if (ObColumnSchemaV2::is_hidden_pk_column_id(col_desc.col_id_) && !contain_hidden_pk_column) {
} else if (OB_FAIL(column_ids.push_back(col_desc.col_id_))) {
LOG_WARN("failed to push back column id", KR(ret), K(i));
}
}
}
return ret;
}
int ObTableLoadSchema::get_column_ids(ObSchemaGetterGuard &schema_guard,
uint64_t tenant_id,
uint64_t table_id,
@ -218,26 +294,13 @@ int ObTableLoadSchema::get_column_ids(ObSchemaGetterGuard &schema_guard,
LOG_WARN("invalid args", KR(ret), K(tenant_id), K(table_id));
} else {
const ObTableSchema *table_schema = nullptr;
ObArray<ObColDesc> column_descs;
if (OB_FAIL(schema_guard.get_table_schema(tenant_id, table_id, table_schema))) {
LOG_WARN("fail to get table schema", KR(ret), K(tenant_id), K(table_id));
} else if (OB_ISNULL(table_schema)) {
ret = OB_TABLE_NOT_EXIST;
LOG_WARN("table not exist", KR(ret), K(tenant_id), K(table_id));
} else if (OB_FAIL(table_schema->get_column_ids(column_descs))) {
STORAGE_LOG(WARN, "fail to get column descs", KR(ret), KPC(table_schema));
}
for (int64_t i = 0; OB_SUCC(ret) && i < column_descs.count(); ++i) {
const ObColDesc &col_desc = column_descs.at(i);
const ObColumnSchemaV2 *col_schema = table_schema->get_column_schema(col_desc.col_id_);
if (OB_ISNULL(col_schema)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected null column schema", KR(ret), K(col_desc));
} else if (ObColumnSchemaV2::is_hidden_pk_column_id(col_schema->get_column_id()) &&
!contain_hidden_pk_column) {
} else if (OB_FAIL(column_ids.push_back(col_schema->get_column_id()))) {
LOG_WARN("failed to push back column id", KR(ret), K(i));
}
} else {
ret = get_column_ids(table_schema, column_ids, contain_hidden_pk_column);
}
}
return ret;
@ -341,6 +404,24 @@ int ObTableLoadSchema::check_has_unused_column(const ObTableSchema *table_schema
return ret;
}
int ObTableLoadSchema::check_has_roaringbitmap_column(const ObTableSchema *table_schema, bool &bret)
{
int ret = OB_SUCCESS;
bret = false;
for (ObTableSchema::const_column_iterator iter = table_schema->column_begin();
OB_SUCC(ret) && iter != table_schema->column_end(); ++iter) {
ObColumnSchemaV2 *column_schema = *iter;
if (OB_ISNULL(column_schema)) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("invalid column schema", K(column_schema));
} else if (column_schema->is_roaringbitmap()) {
bret = true;
break;
}
}
return ret;
}
int ObTableLoadSchema::check_has_lob_column(const ObTableSchema *table_schema, bool &bret)
{
int ret = OB_SUCCESS;

View File

@ -29,29 +29,43 @@ class ObTableLoadSchema
{
public:
static int get_schema_guard(uint64_t tenant_id, share::schema::ObSchemaGetterGuard &schema_guard);
static int get_table_schema(uint64_t tenant_id, uint64_t database_id,
static int get_table_schema(share::schema::ObSchemaGetterGuard &schema_guard,
uint64_t tenant_id,
uint64_t database_id,
const common::ObString &table_name,
share::schema::ObSchemaGetterGuard &schema_guard,
const share::schema::ObTableSchema *&table_schema);
static int get_table_id(uint64_t tenant_id, uint64_t database_id,
const common::ObString &table_name, uint64_t &table_id);
// 获取最新schema_guard和table_schema
static int get_table_schema(uint64_t tenant_id, uint64_t table_id,
share::schema::ObSchemaGetterGuard &schema_guard,
const share::schema::ObTableSchema *&table_schema);
// 指定schema_guard获取table_schema
static int get_table_schema(share::schema::ObSchemaGetterGuard &schema_guard,
uint64_t tenant_id, uint64_t table_id,
const share::schema::ObTableSchema *&table_schema);
static int get_user_column_schemas(const share::schema::ObTableSchema *table_schema,
ObIArray<const share::schema::ObColumnSchemaV2 *> &column_schemas);
static int get_user_column_schemas(share::schema::ObSchemaGetterGuard &schema_guard,
uint64_t tenant_id,
uint64_t table_id,
ObIArray<const share::schema::ObColumnSchemaV2 *> &column_schemas);
static int get_user_column_ids(const share::schema::ObTableSchema *table_schema,
common::ObIArray<uint64_t> &column_ids);
static int get_user_column_ids(share::schema::ObSchemaGetterGuard &schema_guard,
uint64_t tenant_id,
uint64_t table_id,
common::ObIArray<uint64_t> &column_ids);
static int get_user_column_names(const share::schema::ObTableSchema *table_schema,
common::ObIArray<common::ObString> &column_names);
static int get_user_column_id_and_names(const share::schema::ObTableSchema *table_schema,
common::ObIArray<uint64_t> &column_ids,
common::ObIArray<common::ObString> &column_names);
static int get_user_column_count(share::schema::ObSchemaGetterGuard &schema_guard,
uint64_t tenant_id,
uint64_t table_id,
int64_t &column_count);
static int get_column_ids(const share::schema::ObTableSchema *table_schema,
common::ObIArray<uint64_t> &column_ids,
bool contain_hidden_pk_column = false);
static int get_column_ids(share::schema::ObSchemaGetterGuard &schema_guard,
uint64_t tenant_id,
uint64_t table_id,
@ -61,6 +75,7 @@ public:
static int get_tenant_optimizer_gather_stats_on_load(const uint64_t tenant_id, bool &value);
static int check_has_invisible_column(const share::schema::ObTableSchema *table_schema, bool &bret);
static int check_has_unused_column(const share::schema::ObTableSchema *table_schema, bool &bret);
static int check_has_roaringbitmap_column(const share::schema::ObTableSchema *table_schema, bool &bret);
static int check_has_lob_column(const share::schema::ObTableSchema *table_schema, bool &bret);
static int get_table_compressor_type(uint64_t tenant_id, uint64_t table_id,
ObCompressorType &compressor_type);

View File

@ -350,7 +350,7 @@ void ObTableLoadService::ObClientTaskAutoAbortTask::runTimerTask()
for (int64_t i = 0; i < client_task_array.count(); ++i) {
ObTableLoadClientTask *client_task = client_task_array.at(i);
if (OB_UNLIKELY(ObTableLoadClientStatus::ERROR == client_task->get_status() ||
client_task->get_exec_ctx()->check_status() != OB_SUCCESS)) {
client_task->check_status() != OB_SUCCESS)) {
client_task->abort();
}
ObTableLoadClientService::revert_task(client_task);
@ -433,7 +433,8 @@ int ObTableLoadService::check_support_direct_load(
const uint64_t table_id,
const ObDirectLoadMethod::Type method,
const ObDirectLoadInsertMode::Type insert_mode,
const storage::ObDirectLoadMode::Type load_mode)
const ObDirectLoadMode::Type load_mode,
const ObIArray<uint64_t> &column_ids)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(OB_INVALID_ID == table_id)) {
@ -447,7 +448,7 @@ int ObTableLoadService::check_support_direct_load(
ObTableLoadSchema::get_table_schema(tenant_id, table_id, schema_guard, table_schema))) {
LOG_WARN("fail to get table schema", KR(ret), K(tenant_id), K(table_id));
} else {
ret = check_support_direct_load(schema_guard, table_schema, method, insert_mode, load_mode);
ret = check_support_direct_load(schema_guard, table_schema, method, insert_mode, load_mode, column_ids);
}
}
return ret;
@ -457,7 +458,8 @@ int ObTableLoadService::check_support_direct_load(ObSchemaGetterGuard &schema_gu
uint64_t table_id,
const ObDirectLoadMethod::Type method,
const ObDirectLoadInsertMode::Type insert_mode,
const storage::ObDirectLoadMode::Type load_mode)
const ObDirectLoadMode::Type load_mode,
const ObIArray<uint64_t> &column_ids)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(OB_INVALID_ID == table_id)) {
@ -472,7 +474,7 @@ int ObTableLoadService::check_support_direct_load(ObSchemaGetterGuard &schema_gu
ret = OB_TABLE_NOT_EXIST;
LOG_WARN("table schema is null", KR(ret));
} else {
ret = check_support_direct_load(schema_guard, table_schema, method, insert_mode, load_mode);
ret = check_support_direct_load(schema_guard, table_schema, method, insert_mode, load_mode, column_ids);
}
}
return ret;
@ -484,15 +486,17 @@ int ObTableLoadService::check_support_direct_load(ObSchemaGetterGuard &schema_gu
const ObTableSchema *table_schema,
const ObDirectLoadMethod::Type method,
const ObDirectLoadInsertMode::Type insert_mode,
const storage::ObDirectLoadMode::Type load_mode)
const ObDirectLoadMode::Type load_mode,
const ObIArray<uint64_t> &column_ids)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(nullptr == table_schema ||
!ObDirectLoadMethod::is_type_valid(method) ||
!ObDirectLoadInsertMode::is_type_valid(insert_mode)) ||
!ObDirectLoadMode::is_type_valid(load_mode)) {
!ObDirectLoadInsertMode::is_type_valid(insert_mode) ||
!ObDirectLoadMode::is_type_valid(load_mode) ||
column_ids.empty())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid args", KR(ret), KP(table_schema), K(method), K(insert_mode));
LOG_WARN("invalid args", KR(ret), KP(table_schema), K(method), K(insert_mode), K(column_ids));
} else {
const uint64_t tenant_id = MTL_ID();
bool trigger_enabled = false;
@ -501,6 +505,7 @@ int ObTableLoadService::check_support_direct_load(ObSchemaGetterGuard &schema_gu
bool has_multivalue_index = false;
bool has_invisible_column = false;
bool has_unused_column = false;
bool has_roaringbitmap_column = false;
// check if it is a user table
const char *tmp_prefix = ObDirectLoadMode::is_insert_overwrite(load_mode) ? InsertOverwritePrefix : EmptyPrefix;
@ -574,6 +579,14 @@ int ObTableLoadService::check_support_direct_load(ObSchemaGetterGuard &schema_gu
LOG_WARN("direct-load does not support table has unused column", KR(ret));
FORWARD_USER_ERROR_MSG(ret, "%sdirect-load does not support table has unused column", tmp_prefix);
}
// check has roaringbitmap column
else if (OB_FAIL(ObTableLoadSchema::check_has_roaringbitmap_column(table_schema, has_roaringbitmap_column))) {
LOG_WARN("fail to check has roaringbitmap column", KR(ret));
} else if (has_roaringbitmap_column) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("direct-load does not support table has roaringbitmap column", KR(ret));
FORWARD_USER_ERROR_MSG(ret, "%sdirect-load does not support table has roaringbitmap column", tmp_prefix);
}
// check if table has mlog
else if (table_schema->has_mlog_table()) {
ret = OB_NOT_SUPPORTED;
@ -623,6 +636,56 @@ int ObTableLoadService::check_support_direct_load(ObSchemaGetterGuard &schema_gu
}
}
}
// check default column
if (OB_SUCC(ret)) {
ObArray<ObColDesc> column_descs;
if (OB_FAIL(table_schema->get_column_ids(column_descs))) {
STORAGE_LOG(WARN, "fail to get column descs", KR(ret), KPC(table_schema));
} else if (column_ids.count() == (table_schema->is_heap_table() ? column_descs.count() - 1
: column_descs.count())) {
// non default column
} else {
for (int64_t i = 0; OB_SUCC(ret) && i < column_descs.count(); ++i) {
const ObColDesc &col_desc = column_descs.at(i);
bool found_column = ObColumnSchemaV2::is_hidden_pk_column_id(col_desc.col_id_);
for (int64_t j = 0; !found_column && j < column_ids.count(); ++j) {
if (col_desc.col_id_ == column_ids.at(j)) {
found_column = true;
}
}
if (!found_column) {
const ObColumnSchemaV2 *column_schema = table_schema->get_column_schema(col_desc.col_id_);
if (OB_ISNULL(column_schema)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected null column schema", KR(ret), K(col_desc));
}
// 自增列
else if (column_schema->is_autoincrement() || column_schema->is_identity_column()) {
}
// 默认值是表达式
else if (OB_UNLIKELY(lib::is_mysql_mode() && column_schema->get_cur_default_value().is_ext())) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("direct-load does not support column default value is ext", KR(ret),
KPC(column_schema), K(column_schema->get_cur_default_value()));
FORWARD_USER_ERROR_MSG(ret, "direct-load does not support column default value is ext");
} else if (OB_UNLIKELY(lib::is_oracle_mode() && column_schema->is_default_expr_v2_column())) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("direct-load does not support column default value is expr", KR(ret),
KPC(column_schema), K(column_schema->get_cur_default_value()));
FORWARD_USER_ERROR_MSG(ret, "direct-load does not support column default value is expr");
}
// 没有默认值, 且为NOT NULL
// 例外:枚举类型默认为第一个
else if (OB_UNLIKELY(column_schema->is_not_null_for_write() &&
column_schema->get_cur_default_value().is_null() &&
!column_schema->get_meta_type().is_enum())) {
ret = OB_ERR_NO_DEFAULT_FOR_FIELD;
LOG_WARN("column doesn't have a default value", KR(ret), KPC(column_schema));
}
}
}
}
}
}
return ret;
}

View File

@ -39,18 +39,21 @@ public:
static int check_support_direct_load(uint64_t table_id,
const storage::ObDirectLoadMethod::Type method,
const storage::ObDirectLoadInsertMode::Type insert_mode,
const storage::ObDirectLoadMode::Type load_mode);
const storage::ObDirectLoadMode::Type load_mode,
const common::ObIArray<uint64_t> &column_ids);
// 业务层指定schema_guard进行检查
static int check_support_direct_load(share::schema::ObSchemaGetterGuard &schema_guard,
uint64_t table_id,
const storage::ObDirectLoadMethod::Type method,
const storage::ObDirectLoadInsertMode::Type insert_mode,
const storage::ObDirectLoadMode::Type load_mode);
const storage::ObDirectLoadMode::Type load_mode,
const common::ObIArray<uint64_t> &column_ids);
static int check_support_direct_load(share::schema::ObSchemaGetterGuard &schema_guard,
const share::schema::ObTableSchema *table_schema,
const storage::ObDirectLoadMethod::Type method,
const storage::ObDirectLoadInsertMode::Type insert_mode,
const storage::ObDirectLoadMode::Type load_mode);
const storage::ObDirectLoadMode::Type load_mode,
const common::ObIArray<uint64_t> &column_ids);
static ObTableLoadTableCtx *alloc_ctx();
static void free_ctx(ObTableLoadTableCtx *table_ctx);
static int add_ctx(ObTableLoadTableCtx *table_ctx);

View File

@ -103,7 +103,9 @@ int ObTableLoadStoreCtx::init(
if (OB_SUCC(ret)) {
table_data_desc_.rowkey_column_num_ =
(!ctx_->schema_.is_heap_table_ ? ctx_->schema_.rowkey_column_count_ : 0);
table_data_desc_.column_count_ = ctx_->param_.column_count_;
table_data_desc_.column_count_ =
(!ctx_->schema_.is_heap_table_ ? ctx_->schema_.store_column_count_
: ctx_->schema_.store_column_count_ - 1);
table_data_desc_.external_data_block_size_ = ObDirectLoadDataBlock::DEFAULT_DATA_BLOCK_SIZE;
table_data_desc_.sstable_index_block_size_ =
ObDirectLoadSSTableIndexBlock::DEFAULT_INDEX_BLOCK_SIZE;

View File

@ -69,11 +69,6 @@ int ObTableLoadTableCtx::init(const ObTableLoadParam &param, const ObTableLoadDD
if (OB_FAIL(schema_.init(param_.tenant_id_, param_.table_id_))) {
LOG_WARN("fail to init table load schema", KR(ret), K(param_.tenant_id_),
K(param_.table_id_));
} else if (OB_UNLIKELY(param.column_count_ != (schema_.is_heap_table_
? (schema_.store_column_count_ - 1)
: schema_.store_column_count_))) {
ret = OB_SCHEMA_NOT_UPTODATE;
LOG_WARN("unexpected column count", KR(ret), K(param.column_count_), K(schema_.store_column_count_), K(schema_.is_heap_table_));
} else if (OB_FAIL(task_allocator_.init("TLD_TaskPool", param_.tenant_id_))) {
LOG_WARN("fail to init allocator", KR(ret));
} else if (OB_FAIL(trans_ctx_allocator_.init("TLD_TCtxPool", param_.tenant_id_))) {

View File

@ -67,6 +67,7 @@ ObTableLoadTransBucketWriter::ObTableLoadTransBucketWriter(ObTableLoadTransCtx *
param_(trans_ctx_->ctx_->param_),
allocator_("TLD_TBWriter"),
is_partitioned_(false),
column_count_(0),
cast_mode_(CM_NONE),
session_ctx_array_(nullptr),
ref_count_(0),
@ -98,7 +99,10 @@ int ObTableLoadTransBucketWriter::init()
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected null coordinator ctx", KR(ret));
} else {
is_partitioned_ = coordinator_ctx_->ctx_->schema_.is_partitioned_table_;
const ObTableLoadSchema &schema = coordinator_ctx_->ctx_->schema_;
is_partitioned_ = schema.is_partitioned_table_;
column_count_ =
(!schema.is_heap_table_ ? schema.store_column_count_ : schema.store_column_count_ - 1);
if (OB_FAIL(ObSQLUtils::get_default_cast_mode(coordinator_ctx_->ctx_->session_info_, cast_mode_))) {
LOG_WARN("fail to get_default_cast_mode", KR(ret));
} else if (OB_FAIL(init_session_ctx_array())) {
@ -229,19 +233,27 @@ int ObTableLoadTransBucketWriter::handle_partition_with_autoinc_identity(
for (int64_t j = 0; OB_SUCC(ret) && j < row_count; ++j) {
ObStorageDatum storage_datum;
ObTableLoadObjRow &obj_row = obj_rows.at(j);
out_obj.set_null();
const ObTableLoadPartitionCalc::IndexAndType &index_and_type =
coordinator_ctx_->partition_calc_.part_key_obj_index_.at(
coordinator_ctx_->partition_calc_.partition_with_autoinc_idx_);
const ObColumnSchemaV2 *column_schema = index_and_type.column_schema_;
const int64_t obj_index = index_and_type.index_;
if (OB_UNLIKELY(obj_index >= param_.column_count_)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid length", KR(ret), K(obj_index), K(param_.column_count_));
} else if (!obj_row.cells_[obj_index].is_null() &&
OB_FAIL(ObTableLoadObjCaster::cast_obj(cast_obj_ctx, index_and_type.column_schema_,
obj_row.cells_[obj_index], out_obj))) {
const ObObj &obj = obj_row.cells_[obj_index];
if (OB_UNLIKELY(obj_row.count_ != column_count_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected column count not match", KR(ret), K(obj_row), K(column_count_));
} else if (OB_UNLIKELY(obj_index < 0 || obj_index >= column_count_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected obj index", KR(ret), K(index_and_type), K(column_count_));
} else if (obj.is_null() || obj.is_nop_value()) {
out_obj = obj;
} else if (OB_FAIL(ObTableLoadObjCaster::cast_obj(cast_obj_ctx,
column_schema,
obj,
out_obj))) {
LOG_WARN("fail to cast obj", KR(ret));
}
if (OB_FAIL(ret)) {
} else if (OB_FAIL(storage_datum.from_obj_enhance(out_obj))) {
LOG_WARN("fail to from obj enhance", KR(ret), K(out_obj));
} else if (column_schema->is_autoincrement() &&
@ -283,19 +295,22 @@ int ObTableLoadTransBucketWriter::handle_identity_column(const ObColumnSchemaV2
ObArenaAllocator &cast_allocator)
{
int ret = OB_SUCCESS;
if (column_schema->is_always_identity_column()) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("direct-load does not support always identity column", KR(ret));
FORWARD_USER_ERROR_MSG(ret, "direct-load does not support always identity column");
} else if (column_schema->is_default_identity_column() && datum.is_null()) {
ret = OB_ERR_INVALID_NOT_NULL_CONSTRAINT_ON_IDENTITY_COLUMN;
LOG_WARN("default identity column has null value", KR(ret));
} else if (column_schema->is_default_on_null_identity_column()) {
// 1. generated always as identity : 不能指定此列导入
// 2. generated by default as identity : 不指定时自动生成, 不能导入null
// 3. generated by default on null as identity : 不指定或者指定null会自动生成
if (OB_UNLIKELY(column_schema->is_always_identity_column() && !datum.is_nop())) {
ret = OB_ERR_INSERT_INTO_GENERATED_ALWAYS_IDENTITY_COLUMN;
LOG_USER_ERROR(OB_ERR_INSERT_INTO_GENERATED_ALWAYS_IDENTITY_COLUMN);
} else if (OB_UNLIKELY(column_schema->is_default_identity_column() && datum.is_null())) {
ret = OB_BAD_NULL_ERROR;
LOG_WARN("default identity column cannot insert null", KR(ret));
} else if (datum.is_nop() || datum.is_null()) {
ObSequenceValue seq_value;
if (OB_FAIL(share::ObSequenceCache::get_instance().nextval(coordinator_ctx_->sequence_schema_,
cast_allocator, seq_value))) {
if (OB_FAIL(ObSequenceCache::get_instance().nextval(coordinator_ctx_->sequence_schema_,
cast_allocator,
seq_value))) {
LOG_WARN("fail get nextval for seq", KR(ret));
} else if (datum.is_null()) {
} else {
datum.set_number(seq_value.val());
}
}
@ -311,9 +326,9 @@ int ObTableLoadTransBucketWriter::write_for_non_partitioned(SessionContext &sess
for (int64_t i = 0; OB_SUCC(ret) && i < row_count; ++i) {
const ObTableLoadObjRow &row = obj_rows.at(i);
bool need_write = false;
if (OB_UNLIKELY(row.count_ != param_.column_count_)) {
if (OB_UNLIKELY(row.count_ != column_count_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected column count not match", KR(ret), K(row.count_), K(param_.column_count_));
LOG_WARN("unexpected column count not match", KR(ret), K(row), K(column_count_));
} else if (OB_FAIL(load_bucket->add_row(session_ctx.partition_id_.tablet_id_,
row,
param_.batch_size_,
@ -346,9 +361,9 @@ int ObTableLoadTransBucketWriter::write_for_partitioned(SessionContext &session_
ObNewRow part_key;
part_key.count_ = part_key_obj_count;
part_key.cells_ = static_cast<ObObj *>(allocator.alloc(sizeof(ObObj) * part_key_obj_count));
if (OB_UNLIKELY(row.count_ != param_.column_count_)) {
if (OB_UNLIKELY(row.count_ != column_count_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected column count not match", KR(ret), K(row.count_), K(param_.column_count_));
LOG_WARN("unexpected column count not match", KR(ret), K(row), K(column_count_));
} else if (OB_ISNULL(part_key.cells_)) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("fail to alloc memory", KR(ret));

View File

@ -70,6 +70,7 @@ private:
const ObTableLoadParam &param_;
common::ObArenaAllocator allocator_;
bool is_partitioned_;
int64_t column_count_;
common::ObCastMode cast_mode_;
struct SessionContext
{

View File

@ -439,20 +439,7 @@ int ObTableLoadTransStoreWriter::cast_column(
(obj.is_null() || obj.is_nop_value());
ObObj out_obj;
if (is_null_autoinc) {
out_obj.set_null();
} else if (obj.is_nop_value()) {
if (column_schema->is_not_null_for_write() &&
column_schema->get_cur_default_value().is_null()) {
if (column_schema->get_meta_type().is_enum()) {
const uint64_t ENUM_FIRST_VAL = 1;
out_obj.set_enum(ENUM_FIRST_VAL);
} else {
ret = OB_ERR_NO_DEFAULT_FOR_FIELD;
LOG_WARN("column can not be null", KR(ret), KPC(column_schema));
}
} else {
out_obj = column_schema->get_cur_default_value();
}
out_obj = obj;
} else if (OB_FAIL(ObTableLoadObjCaster::cast_obj(cast_obj_ctx, column_schema, obj, out_obj))) {
LOG_WARN("fail to cast obj and check", KR(ret), K(obj));
}
@ -491,19 +478,22 @@ int ObTableLoadTransStoreWriter::handle_identity_column(const ObColumnSchemaV2 *
ObArenaAllocator &cast_allocator)
{
int ret = OB_SUCCESS;
if (column_schema->is_always_identity_column()) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("direct-load does not support always identity column", KR(ret));
FORWARD_USER_ERROR_MSG(ret, "direct-load does not support always identity column");
} else if (column_schema->is_default_identity_column() && datum.is_null()) {
ret = OB_ERR_INVALID_NOT_NULL_CONSTRAINT_ON_IDENTITY_COLUMN;
LOG_WARN("default identity column has null value", KR(ret));
} else if (column_schema->is_default_on_null_identity_column()) {
// 1. generated always as identity : 不能指定此列导入
// 2. generated by default as identity : 不指定时自动生成, 不能导入null
// 3. generated by default on null as identity : 不指定或者指定null会自动生成
if (OB_UNLIKELY(column_schema->is_always_identity_column() && !datum.is_nop())) {
ret = OB_ERR_INSERT_INTO_GENERATED_ALWAYS_IDENTITY_COLUMN;
LOG_USER_ERROR(OB_ERR_INSERT_INTO_GENERATED_ALWAYS_IDENTITY_COLUMN);
} else if (OB_UNLIKELY(column_schema->is_default_identity_column() && datum.is_null())) {
ret = OB_BAD_NULL_ERROR;
LOG_WARN("default identity column cannot insert null", KR(ret));
} else if (datum.is_nop() || datum.is_null()) {
ObSequenceValue seq_value;
if (OB_FAIL(share::ObSequenceCache::get_instance().nextval(
trans_ctx_->ctx_->store_ctx_->sequence_schema_, cast_allocator, seq_value))) {
if (OB_FAIL(ObSequenceCache::get_instance().nextval(trans_ctx_->ctx_->store_ctx_->sequence_schema_,
cast_allocator,
seq_value))) {
LOG_WARN("fail get nextval for seq", KR(ret));
} else if (datum.is_null()) {
} else {
datum.set_number(seq_value.val());
}
}

View File

@ -152,19 +152,18 @@ template<class T>
int ObTableLoadRow<T>::project(const ObIArray<int64_t> &idx_projector, ObTableLoadRow<T> &projected_row) const
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(idx_projector.count() != count_)) {
ret = OB_ERR_UNEXPECTED;
OB_LOG(WARN, "unexpected count", KR(ret), K(idx_projector), K(count_));
} else if (OB_FAIL(projected_row.init(count_, allocator_handle_))) {
OB_LOG(WARN, "failed to alloate cells", KR(ret), K(projected_row.count_));
if (OB_FAIL(projected_row.init(idx_projector.count(), allocator_handle_))) {
OB_LOG(WARN, "failed to alloate cells", KR(ret), K(idx_projector.count()));
} else {
for (int64_t j = 0; j < count_; ++j) {
const int64_t idx = idx_projector.at(j);
if (OB_UNLIKELY(idx < 0 || idx >= count_)) {
for (int64_t i = 0; i < idx_projector.count(); ++i) {
const int64_t idx = idx_projector.at(i);
if (idx < 0) {
projected_row.cells_[i].set_nop_value();
} else if (OB_UNLIKELY(idx >= count_)) {
ret = OB_ERR_UNEXPECTED;
OB_LOG(WARN, "unexpected idx", KR(ret), K(j), K(idx), K(idx_projector));
OB_LOG(WARN, "unexpected idx", KR(ret), K(i), K(idx), K(idx_projector), K(count_));
} else {
projected_row.cells_[j] = cells_[idx];
projected_row.cells_[i] = cells_[idx];
}
}
projected_row.seq_no_ = seq_no_;

View File

@ -2218,7 +2218,8 @@ int ObLoadDataDirectImpl::execute(ObExecContext &ctx, ObLoadDataStmt &load_stmt)
execute_param_.table_id_,
execute_param_.method_,
execute_param_.insert_mode_,
ObDirectLoadMode::LOAD_DATA))) {
ObDirectLoadMode::LOAD_DATA,
execute_param_.column_ids_))) {
LOG_WARN("fail to check support direct load", KR(ret));
} else if (OB_FAIL(init_execute_context())) {
LOG_WARN("fail to init execute context", KR(ret), K(ctx), K(load_stmt));
@ -2425,31 +2426,55 @@ int ObLoadDataDirectImpl::init_execute_param()
ObSchemaGetterGuard *schema_guard = ctx_->get_sql_ctx()->schema_guard_;
int64_t column_count = 0;
execute_param_.column_ids_.reset();
if (is_backup) {
if (is_backup) { // 备份数据导入
if (OB_FAIL(ObTableLoadSchema::get_column_ids(*schema_guard,
execute_param_.tenant_id_,
execute_param_.table_id_,
execute_param_.column_ids_))) {
LOG_WARN("fail to get column ids for backup", KR(ret));
execute_param_.tenant_id_,
execute_param_.table_id_,
execute_param_.column_ids_))) {
LOG_WARN("fail to get column ids for backup", KR(ret));
}
} else {
if (OB_FAIL(ObTableLoadSchema::get_user_column_count(*schema_guard,
execute_param_.tenant_id_,
execute_param_.table_id_,
column_count))) {
LOG_WARN("fail to get user column count", KR(ret));
} else if (OB_UNLIKELY(column_count != field_or_var_list.count())) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("not contain all columns is not supported", KR(ret), K(column_count),
K(field_or_var_list));
} else if (load_stmt_->get_default_table_columns()) { // 默认列导入
if (OB_FAIL(ObTableLoadSchema::get_user_column_ids(*schema_guard,
execute_param_.tenant_id_,
execute_param_.table_id_,
execute_param_.column_ids_))) {
LOG_WARN("fail to get user column ids", KR(ret));
}
} else { // 指定列导入
const static uint64_t INVALID_COLUMN_ID = UINT64_MAX;
const ObTableSchema *table_schema = nullptr;
ObArray<uint64_t> user_column_ids;
if (OB_FAIL(ObTableLoadSchema::get_table_schema(*schema_guard,
execute_param_.tenant_id_,
execute_param_.table_id_,
table_schema))) {
LOG_WARN("fail to get table schema", KR(ret), K(execute_param_));
} else if (OB_FAIL(ObTableLoadSchema::get_user_column_ids(table_schema, user_column_ids))) {
LOG_WARN("fail to get user column ids", KR(ret));
}
for (int64_t i = 0; OB_SUCC(ret) && i < field_or_var_list.count(); ++i) {
const ObLoadDataStmt::FieldOrVarStruct &field_or_var_struct = field_or_var_list.at(i);
if (OB_UNLIKELY(!field_or_var_struct.is_table_column_)) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("var is not supported", KR(ret), K(field_or_var_struct), K(i), K(field_or_var_list));
} else if (OB_FAIL(execute_param_.column_ids_.push_back(field_or_var_struct.column_id_))) {
LOG_WARN("fail to push back column id", KR(ret));
LOG_WARN("var is not supported", KR(ret), K(field_or_var_struct), K(i),
K(field_or_var_list));
} else {
const uint64_t column_id = field_or_var_struct.column_id_;
int64_t found_column_idx = -1;
for (int64_t j = 0; found_column_idx == -1 && j < user_column_ids.count(); ++j) {
const uint64_t user_column_id = user_column_ids.at(j);
if (column_id == user_column_id) {
found_column_idx = j;
}
}
if (OB_UNLIKELY(found_column_idx == -1)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unknow column", KR(ret), K(user_column_ids), K(field_or_var_struct));
} else if (OB_FAIL(execute_param_.column_ids_.push_back(column_id))) {
LOG_WARN("fail to push back column id", KR(ret));
} else {
user_column_ids.at(found_column_idx) = INVALID_COLUMN_ID;
}
}
}
}

View File

@ -79,8 +79,8 @@ int ObTableDirectInsertCtx::init(
LOG_WARN("fail to new ObTableLoadInstance", KR(ret));
} else {
load_exec_ctx_->exec_ctx_ = exec_ctx;
const ObTableSchema *table_schema = nullptr;
ObArray<uint64_t> column_ids;
omt::ObTenant *tenant = nullptr;
ObCompressorType compressor_type = ObCompressorType::NONE_COMPRESSOR;
ObDirectLoadMethod::Type method = (is_incremental ? ObDirectLoadMethod::INCREMENTAL : ObDirectLoadMethod::FULL);
ObDirectLoadInsertMode::Type insert_mode = ObDirectLoadInsertMode::INVALID_INSERT_MODE;
@ -93,23 +93,21 @@ int ObTableDirectInsertCtx::init(
}
ObDirectLoadMode::Type load_mode = is_insert_overwrite ? ObDirectLoadMode::INSERT_OVERWRITE : ObDirectLoadMode::INSERT_INTO;
bool is_heap_table = false;
if (OB_FAIL(GCTX.omt_->get_tenant(MTL_ID(), tenant))) {
LOG_WARN("fail to get tenant handle", KR(ret), K(MTL_ID()));
if (OB_FAIL(ObTableLoadSchema::get_table_schema(*schema_guard, tenant_id, table_id, table_schema))) {
LOG_WARN("fail to get table schema", KR(ret));
} else if (OB_FAIL(ObDDLUtil::get_temp_store_compress_type(table_schema->get_compressor_type(),
parallel,
compressor_type))) {
LOG_WARN("fail to get tmp store compressor type", KR(ret));
} else if (OB_FAIL(ObTableLoadSchema::get_column_ids(table_schema, column_ids))) {
LOG_WARN("failed to init store column idxs", KR(ret));
} else if (OB_FAIL(ObTableLoadService::check_support_direct_load(*schema_guard,
table_id,
table_schema,
method,
insert_mode,
load_mode))) {
load_mode,
column_ids))) {
LOG_WARN("fail to check support direct load", KR(ret));
} else if (OB_FAIL(get_compressor_type(MTL_ID(), table_id, parallel, compressor_type))) {
LOG_WARN("fail to get compressor type", KR(ret));
} else if (OB_FAIL(ObTableLoadSchema::get_column_ids(*schema_guard,
tenant_id,
table_id,
column_ids))) {
LOG_WARN("failed to init store column idxs", KR(ret));
} else if(OB_FAIL(get_is_heap_table(*schema_guard, tenant_id, table_id, is_heap_table))) {
LOG_WARN("failed to get is heap table", KR(ret), K(tenant_id), K(table_id));
} else {
ObTableLoadParam param;
param.tenant_id_ = MTL_ID();
@ -120,7 +118,7 @@ int ObTableDirectInsertCtx::init(
param.column_count_ = column_ids.count();
param.px_mode_ = true;
param.online_opt_stat_gather_ = is_online_gather_statistics_;
param.need_sort_ = is_heap_table ? phy_plan.get_direct_load_need_sort() : true;
param.need_sort_ = table_schema->is_heap_table() ? phy_plan.get_direct_load_need_sort() : true;
param.max_error_row_count_ = 0;
param.dup_action_ = (enable_inc_replace ? sql::ObLoadDupActionType::LOAD_REPLACE
: sql::ObLoadDupActionType::LOAD_STOP_ON_DUP);
@ -185,46 +183,5 @@ void ObTableDirectInsertCtx::destroy()
is_online_gather_statistics_ = false;
}
int ObTableDirectInsertCtx::get_compressor_type(const uint64_t tenant_id,
const uint64_t table_id,
const int64_t parallel,
ObCompressorType &compressor_type)
{
int ret = OB_SUCCESS;
ObSchemaGetterGuard schema_guard;
const ObTableSchema *table_schema = nullptr;
if (OB_FAIL(ObMultiVersionSchemaService::get_instance().get_tenant_schema_guard(tenant_id,
schema_guard))) {
LOG_WARN("fail to get tenant schema guard", KR(ret), K(tenant_id));
} else if (OB_FAIL(schema_guard.get_table_schema(tenant_id, table_id, table_schema))) {
LOG_WARN("fail to get table schema", KR(ret), K(tenant_id), K(table_id));
} else if (OB_ISNULL(table_schema)) {
ret = OB_TABLE_NOT_EXIST;
LOG_WARN("table not exist", KR(ret), K(tenant_id), K(table_id));
} else if (OB_FAIL(ObDDLUtil::get_temp_store_compress_type(table_schema->get_compressor_type(),
parallel, compressor_type))) {
LOG_WARN("fail to get tmp store compressor type", KR(ret));
}
return ret;
}
int ObTableDirectInsertCtx::get_is_heap_table(
ObSchemaGetterGuard &schema_guard,
const uint64_t tenant_id,
const uint64_t table_id,
bool &is_heap_table)
{
int ret = OB_SUCCESS;
const ObTableSchema *table_schema = nullptr;
if (OB_FAIL(schema_guard.get_table_schema(tenant_id, table_id, table_schema))) {
LOG_WARN("fail to get table schema", KR(ret), K(tenant_id), K(table_id));
} else if (OB_ISNULL(table_schema)) {
ret = OB_TABLE_NOT_EXIST;
LOG_WARN("table schema is null", KR(ret));
} else {
is_heap_table = table_schema->is_heap_table();
}
return ret;
}
} // namespace sql
} // namespace oceanbase

View File

@ -70,13 +70,6 @@ public:
return online_sample_percent_;
}
private:
int get_compressor_type(const uint64_t tenant_id, const uint64_t table_id, const int64_t parallel,
ObCompressorType &compressor_type);
int get_is_heap_table(share::schema::ObSchemaGetterGuard &schema_guard,
const uint64_t tenant_id,
const uint64_t table_id,
bool &is_heap_table);
private:
observer::ObTableLoadExecCtx *load_exec_ctx_;
observer::ObTableLoadInstance *table_load_instance_;

View File

@ -0,0 +1,3 @@
3,1,2
33,11,22
333,111,222
1 3 1 2
2 33 11 22
3 333 111 222