[FEAT MERGE]direct load data support partition level load

This commit is contained in:
medcll 2024-11-22 07:15:03 +00:00 committed by ob-robot
parent e175d96d4c
commit 0ef02930d0
44 changed files with 1095 additions and 207 deletions

View File

@ -282,6 +282,7 @@ ob_set_subtarget(ob_server table_load
table_load/ob_table_load_pre_sorter.cpp
table_load/ob_table_load_mem_chunk_manager.cpp
table_load/ob_table_load_pre_sort_writer.cpp
table_load/ob_table_load_empty_insert_tablet_ctx_manager.cpp
)
ob_set_subtarget(ob_server virtual_table

View File

@ -134,6 +134,7 @@ int ObTableDirectLoadBeginExecutor::init_param(ObTableLoadClientTaskParam &param
OX(param.set_heartbeat_timeout_us(arg_.heartbeat_timeout_));
OZ(param.set_load_method(arg_.load_method_));
OZ(param.set_column_names(arg_.column_names_));
OZ(param.set_part_names(arg_.part_names_));
return ret;
}

View File

@ -17,6 +17,7 @@
#include "observer/table_load/ob_table_load_store.h"
#include "observer/table_load/ob_table_load_table_ctx.h"
#include "sql/engine/ob_des_exec_context.h"
#include "observer/table_load/ob_table_load_empty_insert_tablet_ctx_manager.h"
namespace oceanbase
{
@ -687,5 +688,34 @@ int ObDirectLoadControlInsertTransExecutor::process()
return ret;
}
int ObDirectLoadControlInitEmptyTabletsExecutor::check_args()
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(OB_INVALID_ID == arg_.table_id_
|| 0 == arg_.ddl_param_.task_id_
|| arg_.partition_id_array_.empty()
|| arg_.target_partition_id_array_.empty()
|| arg_.partition_id_array_.count() != arg_.target_partition_id_array_.count())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid args", KR(ret), K(arg_));
}
return OB_SUCCESS;
}
int ObDirectLoadControlInitEmptyTabletsExecutor::process()
{
int ret = OB_SUCCESS;
if (OB_FAIL(ObTableLoadService::check_tenant())) {
LOG_WARN("fail to check tenant", KR(ret));
} else if (OB_FAIL(ObTableLoadEmptyInsertTabletCtxManager::execute(
arg_.table_id_,
arg_.ddl_param_,
arg_.partition_id_array_,
arg_.target_partition_id_array_))) {
LOG_WARN("fail to execute init empty tablet", KR(ret));
}
return ret;
}
} // namespace observer
} // namespace oceanbase

View File

@ -362,5 +362,22 @@ protected:
int process() override;
};
class ObDirectLoadControlInitEmptyTabletsExecutor
: public ObTableLoadControlRpcExecutor<ObDirectLoadControlCommandType::INIT_EMPTY_TABLETS>
{
typedef ObTableLoadControlRpcExecutor<ObDirectLoadControlCommandType::INIT_EMPTY_TABLETS> ParentType;
public:
ObDirectLoadControlInitEmptyTabletsExecutor(common::ObIAllocator &allocator,
const ObDirectLoadControlRequest &request,
ObDirectLoadControlResult &result)
: ParentType(allocator, request, result)
{
}
virtual ~ObDirectLoadControlInitEmptyTabletsExecutor() = default;
protected:
int check_args() override;
int process() override;
};
} // namespace observer
} // namespace oceanbase

View File

@ -47,6 +47,8 @@ int ObTableLoadControlRpcProxy::dispatch(const ObDirectLoadControlRequest &reque
OB_TABLE_LOAD_CONTROL_RPC_DISPATCH(ObDirectLoadControlCommandType::ABANDON_TRANS);
OB_TABLE_LOAD_CONTROL_RPC_DISPATCH(ObDirectLoadControlCommandType::GET_TRANS_STATUS);
OB_TABLE_LOAD_CONTROL_RPC_DISPATCH(ObDirectLoadControlCommandType::INSERT_TRANS);
// init empty tablets
OB_TABLE_LOAD_CONTROL_RPC_DISPATCH(ObDirectLoadControlCommandType::INIT_EMPTY_TABLETS);
default:
ret = OB_ERR_UNEXPECTED;
SERVER_LOG(WARN, "unexpected command type", K(ret), K(request));

View File

@ -36,6 +36,7 @@ class ObDirectLoadControlConfirmFinishTransExecutor;
class ObDirectLoadControlAbandonTransExecutor;
class ObDirectLoadControlGetTransStatusExecutor;
class ObDirectLoadControlInsertTransExecutor;
class ObDirectLoadControlInitEmptyTabletsExecutor;
class ObTableLoadControlRpcProxy
{
@ -216,6 +217,12 @@ public:
ObDirectLoadControlInsertTransExecutor,
ObDirectLoadControlInsertTransArg);
// init empty tablet
OB_DEFINE_TABLE_LOAD_CONTROL_RPC(NORMAL_PRIO, init_empty_tablets,
ObDirectLoadControlCommandType::INIT_EMPTY_TABLETS,
ObDirectLoadControlInitEmptyTabletsExecutor,
ObDirectLoadControlInitEmptyTabletsArg);
private:
obrpc::ObSrvRpcProxy &rpc_proxy_;
ObArenaAllocator allocator_;

View File

@ -312,5 +312,12 @@ OB_SERIALIZE_MEMBER(ObDirectLoadControlInsertTransArg,
sequence_no_,
payload_);
// init empty tablets
OB_SERIALIZE_MEMBER(ObDirectLoadControlInitEmptyTabletsArg,
table_id_,
ddl_param_,
partition_id_array_,
target_partition_id_array_);
} // namespace observer
} // namespace oceanbase

View File

@ -467,5 +467,25 @@ public:
ObString payload_; //里面包的是ObTableLoadObjArray
};
class ObDirectLoadControlInitEmptyTabletsArg final
{
OB_UNIS_VERSION(1);
public:
ObDirectLoadControlInitEmptyTabletsArg()
: table_id_(common::OB_INVALID_ID)
{
}
~ObDirectLoadControlInitEmptyTabletsArg() {}
TO_STRING_KV(K_(table_id),
K_(ddl_param),
K_(partition_id_array),
K_(target_partition_id_array));
public:
uint64_t table_id_;
ObTableLoadDDLParam ddl_param_;
ObSArray<table::ObTableLoadLSIdAndPartitionId> partition_id_array_; // origin table
ObSArray<table::ObTableLoadLSIdAndPartitionId> target_partition_id_array_; // target table
};
} // namespace observer
} // namespace oceanbase

View File

@ -22,6 +22,7 @@
#include "observer/table_load/ob_table_load_task_scheduler.h"
#include "observer/table_load/ob_table_load_utils.h"
#include "share/stat/ob_dbms_stats_utils.h"
#include "share/schema/ob_part_mgr_util.h"
namespace oceanbase
{
@ -50,10 +51,12 @@ ObTableLoadClientTaskParam::ObTableLoadClientTaskParam()
timeout_us_(0),
heartbeat_timeout_us_(0),
load_method_(),
column_names_()
column_names_(),
part_names_()
{
allocator_.set_tenant_id(MTL_ID());
column_names_.set_block_allocator(ModulePageAllocator(allocator_));
part_names_.set_block_allocator(ModulePageAllocator(allocator_));
}
ObTableLoadClientTaskParam::~ObTableLoadClientTaskParam() {}
@ -73,6 +76,7 @@ void ObTableLoadClientTaskParam::reset()
heartbeat_timeout_us_ = 0;
load_method_.reset();
column_names_.reset();
part_names_.reset();
allocator_.reset();
}
@ -97,6 +101,8 @@ int ObTableLoadClientTaskParam::assign(const ObTableLoadClientTaskParam &other)
LOG_WARN("fail to set load method", KR(ret));
} else if (OB_FAIL(set_column_names(other.column_names_))) {
LOG_WARN("fail to deep copy column names", KR(ret));
} else if (OB_FAIL(set_part_names(other.part_names_))) {
LOG_WARN("fail to deep copy part names", KR(ret));
}
}
return ret;
@ -159,6 +165,7 @@ public:
ObSchemaGetterGuard &schema_guard = client_task_->schema_guard_;
ObTableLoadParam load_param;
ObArray<uint64_t> column_ids;
ObArray<ObTabletID> tablet_ids;
THIS_WORKER.set_session(session_info);
if (OB_ISNULL(session_info)) {
ret = OB_ERR_UNEXPECTED;
@ -172,7 +179,7 @@ public:
}
// resolve
if (OB_FAIL(
resolve(client_task_->client_exec_ctx_, client_task_->param_, load_param, column_ids))) {
resolve(client_task_->client_exec_ctx_, client_task_->param_, load_param, column_ids, tablet_ids))) {
LOG_WARN("fail to resolve", KR(ret), K(client_task_->param_));
}
// check support
@ -187,7 +194,7 @@ public:
}
// begin
if (OB_SUCC(ret)) {
if (OB_FAIL(client_task_->init_instance(load_param, column_ids))) {
if (OB_FAIL(client_task_->init_instance(load_param, column_ids, tablet_ids))) {
LOG_WARN("fail to init instance", KR(ret));
} else if (OB_FAIL(client_task_->set_status_waitting())) {
LOG_WARN("fail to set status waitting", KR(ret));
@ -228,7 +235,7 @@ public:
static int resolve(ObTableLoadClientExecCtx &client_exec_ctx,
const ObTableLoadClientTaskParam &task_param, ObTableLoadParam &load_param,
ObIArray<uint64_t> &column_ids)
ObIArray<uint64_t> &column_ids, ObIArray<ObTabletID> &tablet_ids)
{
int ret = OB_SUCCESS;
const uint64_t tenant_id = task_param.get_tenant_id();
@ -276,6 +283,8 @@ public:
*(client_exec_ctx.exec_ctx_), tenant_id,
table_schema->get_table_id(), online_sample_percent))) {
LOG_WARN("failed to get sys online sample percent", K(ret));
} else if (OB_FAIL(resolve_part_names(table_schema, task_param.get_part_names(), tablet_ids))) {
LOG_WARN("fail to resolve part name", KR(ret));
}
if (OB_SUCC(ret)) {
load_param.tenant_id_ = tenant_id;
@ -299,7 +308,8 @@ public:
load_param.load_mode_ = ObDirectLoadMode::TABLE_LOAD;
load_param.compressor_type_ = compressor_type;
load_param.online_sample_percent_ = online_sample_percent;
load_param.load_level_ = ObDirectLoadLevel::TABLE;
load_param.load_level_ = tablet_ids.empty() ? ObDirectLoadLevel::TABLE
: ObDirectLoadLevel::PARTITION;
}
return ret;
}
@ -359,6 +369,42 @@ public:
return ret;
}
static int resolve_part_names(const ObTableSchema *table_schema,
const ObIArray<ObString> &part_names,
ObIArray<ObTabletID> &tablet_ids)
{
int ret = OB_SUCCESS;
ObArray<ObPartID> part_ids;
uint64_t table_id = OB_INVALID_ID;
if (OB_ISNULL(table_schema)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("table schems is nullptr", KR(ret));
} else {
table_id = table_schema->get_table_id();
for (int i = 0; OB_SUCC(ret) && i < part_names.count(); i++) {
ObArray<ObObjectID> partition_ids;
ObString &partition_name = const_cast<ObString &>(part_names.at(i));
//here just conver partition_name to its lowercase
ObCharset::casedn(CS_TYPE_UTF8MB4_GENERAL_CI, partition_name);
ObPartGetter part_getter(*table_schema);
if (OB_FAIL(part_getter.get_part_ids(partition_name, partition_ids))) {
LOG_WARN("fail to get part ids", K(ret), K(partition_name));
if (OB_UNKNOWN_PARTITION == ret && lib::is_mysql_mode()) {
LOG_USER_ERROR(OB_UNKNOWN_PARTITION, partition_name.length(), partition_name.ptr(),
table_schema->get_table_name_str().length(),
table_schema->get_table_name_str().ptr());
}
} else if (OB_FAIL(append_array_no_dup(part_ids, partition_ids))) {
LOG_WARN("Push partition id error", K(ret));
}
} // end of for
if (OB_SUCC(ret) && OB_FAIL(ObTableLoadSchema::get_tablet_ids_by_part_ids(table_schema, part_ids, tablet_ids))) {
LOG_WARN("fail to get tablet ids", KR(ret));
}
}
return ret;
}
static int resolve_load_method(const ObString &load_method_str, ObDirectLoadMethod::Type &method,
ObDirectLoadInsertMode::Type &insert_mode)
{
@ -784,11 +830,12 @@ void ObTableLoadClientTask::get_status(ObTableLoadClientStatus &client_status,
}
int ObTableLoadClientTask::init_instance(ObTableLoadParam &load_param,
const ObIArray<uint64_t> &column_ids)
const ObIArray<uint64_t> &column_ids,
const ObIArray<ObTabletID> &tablet_ids)
{
int ret = OB_SUCCESS;
const ObTableLoadTableCtx *tmp_ctx = nullptr;
if (OB_FAIL(instance_.init(load_param, column_ids, &client_exec_ctx_))) {
if (OB_FAIL(instance_.init(load_param, column_ids, tablet_ids, &client_exec_ctx_))) {
LOG_WARN("fail to init instance", KR(ret));
} else if (OB_FAIL(instance_.start_trans(trans_ctx_, ObTableLoadInstance::DEFAULT_SEGMENT_ID,
allocator_))) {

View File

@ -64,6 +64,7 @@ public:
DEFINE_VAR_GETTER_AND_SETTER(uint64_t, heartbeat_timeout_us);
DEFINE_STR_GETTER_AND_SETTER(ObString, load_method);
DEFINE_STR_ARRAY_GETTER_AND_SETTER(ObString, column_names);
DEFINE_STR_ARRAY_GETTER_AND_SETTER(ObString, part_names);
#undef DEFINE_VAR_GETTER_AND_SETTER
#undef DEFINE_STR_GETTER_AND_SETTER
@ -80,7 +81,8 @@ public:
K_(timeout_us),
K_(heartbeat_timeout_us),
K_(load_method),
K_(column_names));
K_(column_names),
K_(part_names));
private:
int set_string(const ObString &src, ObString &dest);
@ -101,6 +103,7 @@ private:
int64_t heartbeat_timeout_us_;
ObString load_method_;
common::ObArray<ObString> column_names_;
common::ObArray<ObString> part_names_;
};
class ObTableLoadClientTask : public common::ObDLinkBase<ObTableLoadClientTask>
@ -156,7 +159,9 @@ private:
int advance_status(const table::ObTableLoadClientStatus expected,
const table::ObTableLoadClientStatus updated);
int init_instance(ObTableLoadParam &load_param, const ObIArray<uint64_t> &column_ids);
int init_instance(ObTableLoadParam &load_param,
const ObIArray<uint64_t> &column_ids,
const ObIArray<ObTabletID> &tablet_ids);
int commit_instance();
void destroy_instance();

View File

@ -34,6 +34,7 @@
#include "observer/table_load/ob_table_load_index_long_wait.h"
#include "observer/omt/ob_tenant.h"
#include "storage/direct_load/ob_direct_load_mem_define.h"
#include "observer/table_load/ob_table_load_empty_insert_tablet_ctx_manager.h"
namespace oceanbase
{
@ -82,13 +83,14 @@ bool ObTableLoadCoordinator::is_ctx_inited(ObTableLoadTableCtx *ctx)
int ObTableLoadCoordinator::init_ctx(ObTableLoadTableCtx *ctx,
const ObIArray<uint64_t> &column_ids,
const ObIArray<ObTabletID> &tablet_ids,
ObTableLoadExecCtx *exec_ctx)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(ctx)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid agrs", KR(ret));
} else if (OB_FAIL(ctx->init_coordinator_ctx(column_ids, exec_ctx))) {
} else if (OB_FAIL(ctx->init_coordinator_ctx(column_ids, tablet_ids, exec_ctx))) {
LOG_WARN("fail to init coordinator ctx", KR(ret));
}
return ret;
@ -288,8 +290,11 @@ int ObTableLoadCoordinator::gen_apply_arg(ObDirectLoadResourceApplyArg &apply_ar
} else if (cluster_version < CLUSTER_VERSION_4_2_2_0 ||
(cluster_version >= CLUSTER_VERSION_4_3_0_0 && cluster_version < CLUSTER_VERSION_4_3_1_0)) {
// not support resource manage
if (OB_FAIL(coordinator_ctx_->init_partition_location())) {
LOG_WARN("fail to init partition location", KR(ret));
if (OB_FAIL(ObTableLoadPartitionLocation::init_partition_location(coordinator_ctx_->partition_ids_,
coordinator_ctx_->target_partition_ids_,
coordinator_ctx_->partition_location_,
coordinator_ctx_->target_partition_location_))) {
LOG_WARN("fail to inner init partition location", KR(ret));
} else {
ctx_->param_.session_count_ = MAX(MIN(ctx_->param_.parallel_, (int64_t)tenant->unit_max_cpu() * 2), MIN_THREAD_COUNT);
ctx_->param_.write_session_count_ = ctx_->param_.session_count_;
@ -310,8 +315,11 @@ int ObTableLoadCoordinator::gen_apply_arg(ObDirectLoadResourceApplyArg &apply_ar
LOG_WARN("fail to check status", KR(ret));
} else if (OB_FAIL(coordinator_ctx_->exec_ctx_->check_status())) {
LOG_WARN("fail to check status", KR(ret));
} else if (OB_FAIL(coordinator_ctx_->init_partition_location())) {
LOG_WARN("fail to init partition location", KR(ret));
} else if (OB_FAIL(ObTableLoadPartitionLocation::init_partition_location(coordinator_ctx_->partition_ids_,
coordinator_ctx_->target_partition_ids_,
coordinator_ctx_->partition_location_,
coordinator_ctx_->target_partition_location_))) {
LOG_WARN("fail to inner init partition location", KR(ret));
} else if (OB_FAIL(coordinator_ctx_->partition_location_.get_all_leader_info(all_leader_info_array))) {
LOG_WARN("fail to get all leader info", KR(ret));
} else if (OB_FAIL(ObTableLoadService::get_memory_limit(memory_limit))) {
@ -582,6 +590,118 @@ int ObTableLoadCoordinator::pre_begin_peers(ObDirectLoadResourceApplyArg &apply_
return ret;
}
int ObTableLoadCoordinator::init_empty_tablets()
{
int ret = OB_SUCCESS;
if (OB_FAIL(coordinator_ctx_->empty_insert_tablet_ctx_manager_
->set_thread_count(param_.write_session_count_))) {
LOG_WARN("fail to set thread count", KR(ret), K(param_.write_session_count_));
}
for(int64_t i = 0; OB_SUCC(ret) && i < param_.write_session_count_; ++i) {
ObTableLoadTask *task = nullptr;
if (OB_FAIL(ctx_->alloc_task(task))) {
LOG_WARN("fail to alloc task", KR(ret));
} else if (OB_FAIL(task->set_processor<InitEmptyTabletTaskProcessor>(ctx_))) {
LOG_WARN("fail to set check begin result task processor", KR(ret));
} else if (OB_FAIL(task->set_callback<InitEmptyTabletTaskCallback>(ctx_))) {
LOG_WARN("fail to set check begin result task callback", KR(ret));
} else if (OB_FAIL(coordinator_ctx_->task_scheduler_->add_task(i, task))) {
LOG_WARN("fail to add task", KR(ret), KPC(task));
}
if (OB_FAIL(ret)) {
if (nullptr != task) {
ctx_->free_task(task);
}
}
}
return ret;
}
class ObTableLoadCoordinator::InitEmptyTabletTaskProcessor : public ObITableLoadTaskProcessor
{
public:
InitEmptyTabletTaskProcessor(ObTableLoadTask &task,
ObTableLoadTableCtx *ctx)
: ObITableLoadTaskProcessor(task),
ctx_(ctx),
empty_tablet_manager_(ctx->coordinator_ctx_->empty_insert_tablet_ctx_manager_)
{
ctx_->inc_ref_count();
}
virtual ~InitEmptyTabletTaskProcessor()
{
ObTableLoadService::put_ctx(ctx_);
}
int process() override {
int ret = OB_SUCCESS;
bool is_finish = false;
if (OB_ISNULL(ctx_) || OB_ISNULL(empty_tablet_manager_)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("ctx or empty_tablet_manager is nullptr", KR(ret));
}
while (OB_SUCC(ret)) {
ObDirectLoadControlInitEmptyTabletsArg arg;
arg.table_id_ = ctx_->param_.table_id_;
arg.ddl_param_ = ctx_->ddl_param_;
ObAddr addr;
if (OB_FAIL(ctx_->coordinator_ctx_->check_status(ObTableLoadStatusType::INITED))) {
LOG_WARN("fail to check status", KR(ret));
} else if (OB_FAIL(empty_tablet_manager_->get_next_task(addr,
arg.partition_id_array_,
arg.target_partition_id_array_))) {
if (OB_ITER_END == ret) {
ret = OB_SUCCESS;
break;
} else {
LOG_WARN("fail to get next init empty partition task", KR(ret));
}
} else {
TABLE_LOAD_CONTROL_RPC_CALL(init_empty_tablets, addr, arg);
}
}
if (OB_SUCC(ret)) {
if (OB_FAIL(empty_tablet_manager_->handle_thread_finish(is_finish))) {
LOG_WARN("fail to handle thread finish", KR(ret));
} else if (is_finish) {
ObTableLoadCoordinator coordinator(ctx_);
if (OB_FAIL(coordinator.init())) {
LOG_WARN("fail to init coordinator", KR(ret));
} else if (OB_FAIL(coordinator.add_check_begin_result_task())) {
LOG_WARN("fail to add check begin result task", KR(ret));
}
}
}
return ret;
}
private:
ObTableLoadTableCtx * const ctx_;
ObTableLoadEmptyInsertTabletCtxManager * const empty_tablet_manager_;
};
class ObTableLoadCoordinator::InitEmptyTabletTaskCallback : public ObITableLoadTaskCallback
{
public:
InitEmptyTabletTaskCallback(ObTableLoadTableCtx *ctx)
: ctx_(ctx)
{
ctx_->inc_ref_count();
}
virtual ~InitEmptyTabletTaskCallback()
{
ObTableLoadService::put_ctx(ctx_);
}
void callback(int ret_code, ObTableLoadTask *task) override
{
int ret = OB_SUCCESS;
if (OB_FAIL(ret_code)) {
ctx_->coordinator_ctx_->set_status_error(ret);
}
ctx_->free_task(task);
}
private:
ObTableLoadTableCtx * const ctx_;
};
int ObTableLoadCoordinator::confirm_begin_peers()
{
int ret = OB_SUCCESS;
@ -632,8 +752,14 @@ int ObTableLoadCoordinator::begin()
LOG_WARN("fail to confirm begin peers", KR(ret));
} else {
coordinator_ctx_->set_enable_heart_beat(true);
if (OB_FAIL(add_check_begin_result_task())) {
LOG_WARN("fail to add check begin result task", KR(ret));
if (OB_NOT_NULL(coordinator_ctx_->empty_insert_tablet_ctx_manager_)) {
if (OB_FAIL(init_empty_tablets())) {
LOG_WARN("fail to init empty partition", KR(ret));
}
} else {
if (OB_FAIL(add_check_begin_result_task())) {
LOG_WARN("fail to add check begin result task", KR(ret));
}
}
}
}

View File

@ -49,6 +49,7 @@ public:
static bool is_ctx_inited(ObTableLoadTableCtx *ctx);
static int init_ctx(ObTableLoadTableCtx *ctx,
const common::ObIArray<uint64_t> &column_ids,
const common::ObIArray<ObTabletID> &tablet_ids,
ObTableLoadExecCtx *exec_ctx);
static void abort_ctx(ObTableLoadTableCtx *ctx);
int init();
@ -76,6 +77,10 @@ private:
int write_sql_stat(table::ObTableLoadSqlStatistics &sql_statistics,
table::ObTableLoadDmlStat &dml_stats);
int heart_beat_peer();
private:
int init_empty_tablets();
class InitEmptyTabletTaskProcessor;
class InitEmptyTabletTaskCallback;
private:
int add_check_begin_result_task();
int check_peers_begin_result(bool &is_finish);

View File

@ -20,6 +20,7 @@
#include "observer/ob_server_event_history_table_operator.h"
#include "share/ob_autoincrement_service.h"
#include "share/sequence/ob_sequence_cache.h"
#include "observer/table_load/ob_table_load_empty_insert_tablet_ctx_manager.h"
namespace oceanbase
{
@ -39,6 +40,7 @@ ObTableLoadCoordinatorCtx::ObTableLoadCoordinatorCtx(ObTableLoadTableCtx *ctx)
task_scheduler_(nullptr),
exec_ctx_(nullptr),
error_row_handler_(nullptr),
empty_insert_tablet_ctx_manager_(nullptr),
sequence_schema_(&allocator_),
last_trans_gid_(1024),
next_session_id_(0),
@ -57,43 +59,8 @@ ObTableLoadCoordinatorCtx::~ObTableLoadCoordinatorCtx()
destroy();
}
int ObTableLoadCoordinatorCtx::init_partition_location()
{
int ret = OB_SUCCESS;
int retry = 0;
bool flag = false;
while (retry < 3 && OB_SUCC(ret)) {
partition_location_.reset();
target_partition_location_.reset();
// init partition_location_
if (OB_FAIL(partition_location_.init(ctx_->param_.tenant_id_, ctx_->schema_.partition_ids_))) {
LOG_WARN("fail to init partition location", KR(ret));
} else if (OB_FAIL(target_partition_location_.init(ctx_->param_.tenant_id_, target_schema_.partition_ids_))) {
LOG_WARN("fail to init origin partition location", KR(ret));
} else if (OB_FAIL(partition_location_.check_tablet_has_same_leader(target_partition_location_, flag))) {
LOG_WARN("fail to check_tablet_has_same_leader", KR(ret));
}
if (OB_SUCC(ret)) {
if (flag) {
break;
} else {
LOG_WARN("invalid leader info, maybe change master");
}
}
retry ++;
}
if (OB_SUCC(ret)) {
if (!flag) {
ret = OB_EAGAIN;
LOG_WARN("invalid leader info", KR(ret));
}
}
return ret;
}
int ObTableLoadCoordinatorCtx::init(const ObIArray<uint64_t> &column_ids,
const ObIArray<ObTabletID> &tablet_ids,
ObTableLoadExecCtx *exec_ctx)
{
int ret = OB_SUCCESS;
@ -116,7 +83,7 @@ int ObTableLoadCoordinatorCtx::init(const ObIArray<uint64_t> &column_ids,
}
// init partition_calc_
else if (OB_FAIL(
partition_calc_.init(ctx_->param_, ctx_->session_info_))) {
partition_calc_.init(ctx_->param_, ctx_->session_info_, tablet_ids))) {
LOG_WARN("fail to init partition calc", KR(ret));
}
// init trans_allocator_
@ -149,6 +116,16 @@ int ObTableLoadCoordinatorCtx::init(const ObIArray<uint64_t> &column_ids,
else if (ctx_->schema_.has_identity_column_ && OB_FAIL(init_sequence())) {
LOG_WARN("fail to init sequence", KR(ret));
}
// init partition ids
else if (OB_FAIL(init_partition_ids(tablet_ids))) {
LOG_WARN("fail to init partition ids", KR(ret));
}
// init empty_insert_tablet_ctx_manager_
else if (ObDirectLoadMethod::is_full(ctx_->param_.method_)
&& !empty_partition_ids_.empty()
&& OB_FAIL(init_empty_insert_tablet_ctx_manager())) {
LOG_WARN("fail to init empty insert tablet ctx manager", KR(ret));
}
if (OB_SUCC(ret)) {
exec_ctx_ = exec_ctx;
is_inited_ = true;
@ -204,6 +181,10 @@ void ObTableLoadCoordinatorCtx::destroy()
trans_ctx_map_.reuse();
segment_ctx_map_.reset();
commited_trans_ctx_array_.reset();
if (nullptr != empty_insert_tablet_ctx_manager_) {
allocator_.free(empty_insert_tablet_ctx_manager_);
empty_insert_tablet_ctx_manager_ = nullptr;
}
}
int ObTableLoadCoordinatorCtx::advance_status(ObTableLoadStatusType status)
@ -526,6 +507,110 @@ int ObTableLoadCoordinatorCtx::init_session_ctx_array()
return ret;
}
int ObTableLoadCoordinatorCtx::init_partition_ids(const ObIArray<ObTabletID> &tablet_ids)
{
int ret = OB_SUCCESS;
ObSchemaGetterGuard schema_guard;
const ObTableSchema *origin_table_schema = nullptr;
const ObTableSchema *target_table_schema = nullptr;
ObArray<ObTabletID> all_origin_tablet_ids, all_target_tablet_ids;
ObArray<ObObjectID> all_origin_part_ids, all_target_part_ids;
ObTableLoadPartitionId origin_id, target_id;
ObHashSet<ObTabletID> tablet_ids_set;
if (OB_FAIL(ObTableLoadSchema::get_schema_guard(ctx_->param_.tenant_id_, schema_guard))) {
LOG_WARN("fail to get schema guard", KR(ret));
} else if (OB_FAIL(ObTableLoadSchema::get_table_schema(schema_guard,
ctx_->param_.tenant_id_,
ctx_->param_.table_id_,
origin_table_schema))) {
LOG_WARN("fail to get table schema", KR(ret), K(ctx_->param_.tenant_id_), K(ctx_->param_.table_id_));
} else if (OB_ISNULL(origin_table_schema)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("table schema is nullptr", KR(ret));
} else if (OB_FAIL(ObTableLoadSchema::get_table_schema(schema_guard,
ctx_->param_.tenant_id_,
ctx_->ddl_param_.dest_table_id_,
target_table_schema))) {
LOG_WARN("fail to get target schema", KR(ret),
K(ctx_->param_.tenant_id_),
K(ctx_->ddl_param_.dest_table_id_));
} else if (OB_ISNULL(target_table_schema)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("target schema is nullptr", KR(ret));
} else if (OB_FAIL(origin_table_schema->get_all_tablet_and_object_ids(all_origin_tablet_ids,
all_origin_part_ids))) {
LOG_WARN("fail to get all origin tablet ids and part ids", KR(ret));
} else if (OB_FAIL(target_table_schema->get_all_tablet_and_object_ids(all_target_tablet_ids,
all_target_part_ids))) {
LOG_WARN("fail to get all target tablet ids and part ids", KR(ret));
} else if (tablet_ids.empty()) {
for (int64_t i = 0; OB_SUCC(ret) && i < all_origin_tablet_ids.count(); ++i) {
origin_id.partition_id_ = all_origin_part_ids.at(i);
origin_id.tablet_id_ = all_origin_tablet_ids.at(i);
target_id.partition_id_ = all_target_part_ids.at(i);
target_id.tablet_id_ = all_target_tablet_ids.at(i);
if (OB_FAIL(partition_ids_.push_back(origin_id))) {
LOG_WARN("fail to push back origin id", KR(ret));
} else if (OB_FAIL(target_partition_ids_.push_back(target_id))) {
LOG_WARN("fail to push back target id", KR(ret));
}
}
} else if (OB_FAIL(tablet_ids_set.create(tablet_ids.count(),
ObMemAttr(MTL_ID(), "TLD_TABLETID")))) {
LOG_WARN("fail to create tablet ids set", KR(ret));
} else {
for (int64_t i = 0; OB_SUCC(ret) && i < tablet_ids.count(); ++i) {
if (OB_FAIL(tablet_ids_set.set_refactored(tablet_ids.at(i)))) {
LOG_WARN("fail to set refactored", KR(ret), K(tablet_ids.at(i)));
}
}
for (int64_t i = 0; OB_SUCC(ret) && i < all_origin_tablet_ids.count(); ++i) {
origin_id.partition_id_ = all_origin_part_ids.at(i);
origin_id.tablet_id_ = all_origin_tablet_ids.at(i);
target_id.partition_id_ = all_target_part_ids.at(i);
target_id.tablet_id_ = all_target_tablet_ids.at(i);
ret = tablet_ids_set.exist_refactored(all_origin_tablet_ids.at(i));
// non_empty partition
if (OB_HASH_EXIST == ret) {
if (OB_FAIL(partition_ids_.push_back(origin_id))) {
LOG_WARN("fail to push origin id", KR(ret));
} else if (OB_FAIL(target_partition_ids_.push_back(target_id))) {
LOG_WARN("fail to push target id", KR(ret));
}
}
// empty partition
else if (OB_HASH_NOT_EXIST == ret) {
if (OB_FAIL(empty_partition_ids_.push_back(origin_id))) {
LOG_WARN("fail to push empty origin id", KR(ret));
} else if (OB_FAIL(empty_target_partition_ids_.push_back(target_id))) {
LOG_WARN("fail to push empty target id", KR(ret));
}
} else {
LOG_WARN("fail to search tablet ids set", KR(ret));
}
}
}
return ret;
}
int ObTableLoadCoordinatorCtx::init_empty_insert_tablet_ctx_manager()
{
int ret = OB_SUCCESS;
if (OB_NOT_NULL(empty_insert_tablet_ctx_manager_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("empty_insert_tablet_ctx_manager is not nullptr", KR(ret));
} else if (OB_ISNULL(empty_insert_tablet_ctx_manager_
= OB_NEWx(ObTableLoadEmptyInsertTabletCtxManager,
(&allocator_)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("fail to new empty_insert_tablet_ctx_manager", KR(ret));
} else if (OB_FAIL(empty_insert_tablet_ctx_manager_->init(empty_partition_ids_,
empty_target_partition_ids_))) {
LOG_WARN("fail to init empty_insert_tablet_ctx_manager", KR(ret));
}
return ret;
}
int ObTableLoadCoordinatorCtx::start_trans(const ObTableLoadSegmentID &segment_id,
ObTableLoadCoordinatorTrans *&trans)
{

View File

@ -37,13 +37,16 @@ class ObTableLoadTransCtx;
class ObTableLoadCoordinatorTrans;
class ObITableLoadTaskScheduler;
class ObTableLoadErrorRowHandler;
class ObTableLoadEmptyInsertTabletCtxManager;
class ObTableLoadCoordinatorCtx
{
public:
ObTableLoadCoordinatorCtx(ObTableLoadTableCtx *ctx);
~ObTableLoadCoordinatorCtx();
int init(const common::ObIArray<uint64_t> &column_ids, ObTableLoadExecCtx *exec_ctx);
int init(const common::ObIArray<uint64_t> &column_ids,
const common::ObIArray<ObTabletID> &tablet_ids,
ObTableLoadExecCtx *exec_ctx);
void stop();
void destroy();
bool is_valid() const { return is_inited_; }
@ -114,7 +117,6 @@ public:
common::ObIAllocator &allocator) const;
int check_exist_trans(bool &is_exist) const;
int check_exist_committed_trans(bool &is_exist) const;
int init_partition_location();
int init_complete();
private:
int alloc_trans_ctx(const table::ObTableLoadTransId &trans_id, ObTableLoadTransCtx *&trans_ctx);
@ -125,10 +127,16 @@ private:
int generate_autoinc_params(share::AutoincParam &autoinc_param);
int init_sequence();
void add_to_all_server_event();
int init_partition_ids(const ObIArray<ObTabletID> &tablet_ids);
int init_empty_insert_tablet_ctx_manager();
public:
ObTableLoadTableCtx * const ctx_;
common::ObArenaAllocator allocator_;
ObTableLoadSchema target_schema_;
common::ObArray<table::ObTableLoadPartitionId> partition_ids_;
common::ObArray<table::ObTableLoadPartitionId> target_partition_ids_;
common::ObArray<table::ObTableLoadPartitionId> empty_partition_ids_;
common::ObArray<table::ObTableLoadPartitionId> empty_target_partition_ids_;
ObTableLoadPartitionLocation partition_location_;
ObTableLoadPartitionLocation target_partition_location_;
ObTableLoadPartitionCalc partition_calc_;
@ -137,6 +145,7 @@ public:
ObTableLoadExecCtx *exec_ctx_;
table::ObTableLoadResultInfo result_info_;
ObTableLoadErrorRowHandler *error_row_handler_;
ObTableLoadEmptyInsertTabletCtxManager *empty_insert_tablet_ctx_manager_;
share::schema::ObSequenceSchema sequence_schema_;
struct SessionContext
{

View File

@ -0,0 +1,179 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#define USING_LOG_PREFIX SERVER
#include "observer/table_load/ob_table_load_empty_insert_tablet_ctx_manager.h"
#include "observer/table_load/ob_table_load_coordinator_ctx.h"
#include "observer/table_load/ob_table_load_coordinator.h"
#include "share/table/ob_table_load_define.h"
#include "storage/direct_load/ob_direct_load_insert_table_ctx.h"
namespace oceanbase
{
namespace observer
{
using namespace table;
ObTableLoadEmptyInsertTabletCtxManager::ObTableLoadEmptyInsertTabletCtxManager()
: thread_count_(0),
idx_(0),
start_(0),
is_inited_(false)
{
}
ObTableLoadEmptyInsertTabletCtxManager::~ObTableLoadEmptyInsertTabletCtxManager()
{
}
int ObTableLoadEmptyInsertTabletCtxManager::init(
const ObIArray<ObTableLoadPartitionId> &partition_ids,
const ObIArray<ObTableLoadPartitionId> &target_partition_ids)
{
int ret = OB_SUCCESS;
if (IS_INIT) {
ret = OB_INIT_TWICE;
LOG_WARN("empty insert tablet ctx manager init twice", KR(ret));
} else if (!target_partition_ids.empty()) {
if (OB_FAIL(ObTableLoadPartitionLocation::init_partition_location(partition_ids,
target_partition_ids,
partition_location_,
target_partition_location_))) {
LOG_WARN("fail to init partition location", KR(ret));
} else if (OB_FAIL(partition_location_.get_all_leader_info(all_leader_info_array_))) {
LOG_WARN("fail to get all origin leader info", KR(ret));
} else if (OB_FAIL(target_partition_location_.get_all_leader_info(target_all_leader_info_array_))) {
LOG_WARN("fail to get all target leader info", KR(ret));
}
}
if (OB_SUCC(ret)) {
is_inited_ = true;
}
return ret;
}
int ObTableLoadEmptyInsertTabletCtxManager::get_next_task(
ObAddr &addr,
ObIArray<table::ObTableLoadLSIdAndPartitionId> &partition_ids,
ObIArray<table::ObTableLoadLSIdAndPartitionId> &target_partition_ids)
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("empty insert tablet ctx manager is not init", KR(ret));
} else {
ObMutexGuard guard(op_lock_);
if (all_leader_info_array_.count() == idx_) {
ret = OB_ITER_END;
} else {
const LeaderInfo &leader_info = all_leader_info_array_.at(idx_);
const LeaderInfo &target_leader_info = target_all_leader_info_array_.at(idx_);
addr = target_leader_info.addr_;
for (; OB_SUCC(ret) && start_ < target_leader_info.partition_id_array_.count()
&& partition_ids.count() < TABLET_COUNT_PER_TASK; ++start_) {
if (OB_FAIL(partition_ids.push_back(leader_info.partition_id_array_.at(start_)))) {
LOG_WARN("fail to push back partition ids", KR(ret));
} else if (OB_FAIL(target_partition_ids.push_back(target_leader_info.partition_id_array_.at(start_)))) {
LOG_WARN("fail to push back target partition ids", KR(ret));
}
}
if (target_leader_info.partition_id_array_.count() == start_) {
start_ = 0;
++idx_;
}
}
}
return ret;
}
int ObTableLoadEmptyInsertTabletCtxManager::set_thread_count(const int64_t thread_count)
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("empty insert tablet ctx manager is not init", KR(ret));
} else if (thread_count <= 0) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("thread_count is invalid argument", KR(ret), K(thread_count));
} else {
thread_count_ = thread_count;
}
return ret;
}
int ObTableLoadEmptyInsertTabletCtxManager::handle_thread_finish(bool &is_finish)
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("empty insert tablet ctx is not init", KR(ret));
} else if (thread_count_ <= 0) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("thread count is invalid", KR(ret), K(thread_count_));
} else {
is_finish = 0 == ATOMIC_AAF(&thread_count_, -1);
}
return ret;
}
int ObTableLoadEmptyInsertTabletCtxManager::execute(
const uint64_t &table_id,
const ObTableLoadDDLParam &ddl_param,
const ObIArray<table::ObTableLoadLSIdAndPartitionId> &ls_part_ids,
const ObIArray<table::ObTableLoadLSIdAndPartitionId> &target_ls_part_ids)
{
int ret = OB_SUCCESS;
ObTableLoadSchema table_load_schema;
ObDirectLoadInsertTableParam insert_table_param;
ObDirectLoadInsertTableContext tmp_insert_table_ctx;
if (OB_FAIL(table_load_schema.init(MTL_ID(), table_id))) {
LOG_WARN("fail to init table load schema", KR(ret));
}
insert_table_param.table_id_ = table_id;
insert_table_param.schema_version_ = ddl_param.schema_version_;
insert_table_param.snapshot_version_ = ddl_param.snapshot_version_;
insert_table_param.ddl_task_id_ = ddl_param.task_id_;
insert_table_param.data_version_ = ddl_param.data_version_;
insert_table_param.parallel_ = 1;
insert_table_param.reserved_parallel_ = 0;
insert_table_param.rowkey_column_count_ = table_load_schema.rowkey_column_count_;
insert_table_param.column_count_ = table_load_schema.store_column_count_;
insert_table_param.lob_column_count_ = table_load_schema.lob_column_idxs_.count();
insert_table_param.is_partitioned_table_ = table_load_schema.is_partitioned_table_;
insert_table_param.is_heap_table_ = table_load_schema.is_heap_table_;
insert_table_param.is_column_store_ = table_load_schema.is_column_store_;
insert_table_param.online_opt_stat_gather_ = false;
insert_table_param.is_incremental_ = false;
insert_table_param.datum_utils_ = &(table_load_schema.datum_utils_);
insert_table_param.col_descs_ = &(table_load_schema.column_descs_);
insert_table_param.cmp_funcs_ = &(table_load_schema.cmp_funcs_);
insert_table_param.online_sample_percent_ = 1.0;
if (OB_FAIL(ret)) {
// do nothing
} else if (OB_FAIL(tmp_insert_table_ctx.init(insert_table_param,
ls_part_ids,
target_ls_part_ids))) {
LOG_WARN("fail to init tmp insert table ctx", KR(ret));
}
FOREACH_X(it, tmp_insert_table_ctx.get_tablet_ctx_map(), OB_SUCC(ret)) {
if (OB_FAIL(it->second->open())) {
LOG_WARN("fail to open tablet ctx", KR(ret));
} else if (OB_FAIL(it->second->close())) {
LOG_WARN("fail to close tablet ctx", KR(ret));
}
}
return ret;
}
} // namespace observer
} // namespace oceanbase

View File

@ -0,0 +1,66 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#ifndef _OB_TABLE_LOAD_EMPTY_INSERT_TABLET_CTX_MANAGER_H_
#define _OB_TABLE_LOAD_EMPTY_INSERT_TABLET_CTX_MANAGER_H_
#include "lib/container/ob_iarray.h"
#include "observer/table_load/ob_table_load_partition_location.h"
#include "observer/table_load/ob_table_load_table_ctx.h"
namespace oceanbase
{
namespace table
{
struct ObTableLoadPartitionId;
} // namespace table
namespace storage
{
class ObDirectLoadInsertTableParam;
} // namespace storage
namespace observer
{
class ObTableLoadEmptyInsertTabletCtxManager
{
using LeaderInfo = ObTableLoadPartitionLocation::LeaderInfo;
static const int64_t TABLET_COUNT_PER_TASK = 20;
public:
ObTableLoadEmptyInsertTabletCtxManager();
~ObTableLoadEmptyInsertTabletCtxManager();
int init(
const common::ObIArray<table::ObTableLoadPartitionId> &partition_ids,
const common::ObIArray<table::ObTableLoadPartitionId> &target_partition_ids);
int get_next_task(ObAddr &addr,
ObIArray<table::ObTableLoadLSIdAndPartitionId> &partition_ids,
ObIArray<table::ObTableLoadLSIdAndPartitionId> &target_partition_ids);
int set_thread_count(const int64_t thread_count);
int handle_thread_finish(bool &is_finish);
static int execute(const uint64_t &table_id,
const ObTableLoadDDLParam &ddl_param,
const ObIArray<table::ObTableLoadLSIdAndPartitionId> &ls_part_ids,
const ObIArray<table::ObTableLoadLSIdAndPartitionId> &target_ls_part_ids);
private:
ObTableLoadPartitionLocation partition_location_;
ObTableLoadPartitionLocation target_partition_location_;
table::ObTableLoadArray<LeaderInfo> all_leader_info_array_;
table::ObTableLoadArray<LeaderInfo> target_all_leader_info_array_;
int64_t thread_count_ CACHE_ALIGNED;
lib::ObMutex op_lock_;
int64_t idx_;
int64_t start_;
bool is_inited_;
};
} // namespace observer
} // namespace oceanbase
#endif // _OB_TABLE_LOAD_EMPTY_INSERT_TABLET_CTX_MANAGER_H_

View File

@ -115,7 +115,7 @@ int ObTableLoadInstance::init(ObTableLoadParam &param,
LOG_WARN("fail to check support direct load", KR(ret), K(param));
}
// start direct load
else if (OB_FAIL(start_direct_load(param, column_ids))) {
else if (OB_FAIL(start_direct_load(param, column_ids, tablet_ids))) {
LOG_WARN("fail to start direct load", KR(ret));
}
// init succ
@ -490,7 +490,8 @@ int ObTableLoadInstance::abort_redef_table()
}
int ObTableLoadInstance::start_direct_load(const ObTableLoadParam &param,
const ObIArray<uint64_t> &column_ids)
const ObIArray<uint64_t> &column_ids,
const ObIArray<ObTabletID> &tablet_ids)
{
int ret = OB_SUCCESS;
ObTableLoadTableCtx *table_ctx = nullptr;
@ -502,7 +503,7 @@ int ObTableLoadInstance::start_direct_load(const ObTableLoadParam &param,
LOG_WARN("fail to alloc table ctx", KR(ret), K(param));
} else if (OB_FAIL(table_ctx->init(param, stmt_ctx_.ddl_param_, session_info, ObString::make_string(""), execute_ctx_->exec_ctx_))) {
LOG_WARN("fail to init table ctx", KR(ret));
} else if (OB_FAIL(ObTableLoadCoordinator::init_ctx(table_ctx, column_ids, execute_ctx_))) {
} else if (OB_FAIL(ObTableLoadCoordinator::init_ctx(table_ctx, column_ids, tablet_ids, execute_ctx_))) {
LOG_WARN("fail to coordinator init ctx", KR(ret));
} else if (OB_FAIL(ObTableLoadService::add_ctx(table_ctx))) {
LOG_WARN("fail to add ctx", KR(ret));

View File

@ -70,7 +70,9 @@ private:
int abort_redef_table();
private:
// direct load
int start_direct_load(const ObTableLoadParam &param, const common::ObIArray<uint64_t> &column_ids);
int start_direct_load(const ObTableLoadParam &param,
const common::ObIArray<uint64_t> &column_ids,
const common::ObIArray<ObTabletID> &tablet_ids);
int wait_begin_finish();
int end_direct_load(const bool commit);
int add_tx_result_to_user_session();

View File

@ -45,7 +45,9 @@ ObTableLoadPartitionCalc::ObTableLoadPartitionCalc()
allocator_.set_tenant_id(MTL_ID());
}
int ObTableLoadPartitionCalc::init(const ObTableLoadParam &param, sql::ObSQLSessionInfo *session_info)
int ObTableLoadPartitionCalc::init(const ObTableLoadParam &param,
sql::ObSQLSessionInfo *session_info,
const ObIArray<ObTabletID> &tablet_ids)
{
int ret = OB_SUCCESS;
if (IS_INIT) {
@ -81,6 +83,17 @@ int ObTableLoadPartitionCalc::init(const ObTableLoadParam &param, sql::ObSQLSess
// 获取part_key_obj_index_
else if (OB_FAIL(init_part_key_index(table_schema, allocator_))) {
LOG_WARN("fail to get rowkey index", KR(ret));
} else if (ObDirectLoadLevel::PARTITION == param.load_level_) {
ObMemAttr attr(MTL_ID(), "TLD_TABLETID");
if (OB_FAIL(tablet_ids_set_.create(1024, attr))) {
LOG_WARN("fail to init tablet ids set", KR(ret));
} else {
for (uint64_t i = 0; OB_SUCC(ret) && i < tablet_ids.count(); ++i) {
if (OB_FAIL(tablet_ids_set_.set_refactored(tablet_ids.at(i)))) {
LOG_WARN("fail to set tablet id", KR(ret));
}
}
}
}
}
if (OB_SUCC(ret)) {
@ -213,9 +226,24 @@ int ObTableLoadPartitionCalc::get_partition_by_row(
LOG_WARN("invalid args", K(part_ids.count()), K(tablet_ids.count()));
}
for (int i = 0; OB_SUCC(ret) && i < part_rows.count(); i++) {
if (OB_FAIL(
partition_ids.push_back(ObTableLoadPartitionId(part_ids.at(i), tablet_ids.at(i))))) {
LOG_WARN("fail to push partition id", KR(ret));
if (ObDirectLoadLevel::PARTITION == param_->load_level_) {
ret = tablet_ids_set_.exist_refactored(tablet_ids.at(i));
if (OB_LIKELY(OB_HASH_EXIST == ret)) {
if (OB_FAIL(partition_ids.push_back(ObTableLoadPartitionId(part_ids.at(i), tablet_ids.at(i))))) {
LOG_WARN("fail to push partition id", KR(ret), K(part_ids.at(i)), K(tablet_ids.at(i)));
}
} else if (OB_HASH_NOT_EXIST == ret) {
if (OB_FAIL(partition_ids.push_back(ObTableLoadPartitionId()))) {
LOG_WARN("fail to push empty partition id", KR(ret));
}
} else {
LOG_WARN("fail to search tablet ids set", KR(ret));
}
} else {
if (OB_FAIL(
partition_ids.push_back(ObTableLoadPartitionId(part_ids.at(i), tablet_ids.at(i))))) {
LOG_WARN("fail to push partition id", KR(ret));
}
}
}
return ret;

View File

@ -34,7 +34,9 @@ class ObTableLoadPartitionCalc
{
public:
ObTableLoadPartitionCalc();
int init(const ObTableLoadParam &param, sql::ObSQLSessionInfo *session_info);
int init(const ObTableLoadParam &param,
sql::ObSQLSessionInfo *session_info,
const ObIArray<ObTabletID> &tablet_ids);
int get_part_key(const table::ObTableLoadObjRow &row, common::ObNewRow &part_key) const;
int cast_part_key(common::ObNewRow &part_key, common::ObIAllocator &allocator) const;
int get_partition_by_row(common::ObIArray<common::ObNewRow> &part_rows,
@ -70,6 +72,7 @@ private:
sql::ObExecContext exec_ctx_;
sql::ObTableLocation table_location_;
ObSchemaGetterGuard schema_guard_;
common::hash::ObHashSet<ObTabletID> tablet_ids_set_; // only for load_level == PARTITION
bool is_inited_;
DISALLOW_COPY_AND_ASSIGN(ObTableLoadPartitionCalc);
};

View File

@ -51,6 +51,46 @@ int ObTableLoadPartitionLocation::check_tablet_has_same_leader(const ObTableLoad
return ret;
}
int ObTableLoadPartitionLocation::init_partition_location(
const ObIArray<ObTableLoadPartitionId> &partition_ids,
const ObIArray<ObTableLoadPartitionId> &target_partition_ids,
ObTableLoadPartitionLocation &partition_location,
ObTableLoadPartitionLocation &target_partition_location)
{
int ret = OB_SUCCESS;
int retry = 0;
bool flag = false;
while (retry < 3 && OB_SUCC(ret)) {
partition_location.reset();
target_partition_location.reset();
// init partition_location_
if (OB_FAIL(partition_location.init(MTL_ID(), partition_ids))) {
LOG_WARN("fail to init partition location", KR(ret));
} else if (OB_FAIL(target_partition_location.init(MTL_ID(), target_partition_ids))) {
LOG_WARN("fail to init origin partition location", KR(ret));
} else if (OB_FAIL(partition_location.check_tablet_has_same_leader(target_partition_location, flag))) {
LOG_WARN("fail to check_tablet_has_same_leader", KR(ret));
}
if (OB_SUCC(ret)) {
if (flag) {
break;
} else {
LOG_WARN("invalid leader info, maybe change master");
}
}
retry ++;
}
if (OB_SUCC(ret)) {
if (!flag) {
ret = OB_EAGAIN;
LOG_WARN("invalid leader info", KR(ret));
}
}
return ret;
}
int ObTableLoadPartitionLocation::fetch_ls_id(uint64_t tenant_id, const ObTabletID &tablet_id,
ObLSID &ls_id)
{
@ -87,14 +127,14 @@ int ObTableLoadPartitionLocation::fetch_ls_location(uint64_t tenant_id, const Ob
}
int ObTableLoadPartitionLocation::fetch_ls_locations(uint64_t tenant_id,
const ObTableLoadArray<ObTableLoadPartitionId> &partition_ids)
const ObIArray<ObTableLoadPartitionId> &partition_ids)
{
int ret = OB_SUCCESS;
ObArray<ObLSID> ls_ids;
ls_ids.set_tenant_id(MTL_ID());
for (int64_t i = 0; OB_SUCC(ret) && (i < partition_ids.count()); ++i) {
const ObTabletID &tablet_id = partition_ids[i].tablet_id_;
const ObTabletID &tablet_id = partition_ids.at(i).tablet_id_;
if (OB_FAIL(tablet_ids_.push_back(tablet_id))) {
LOG_WARN("failed to push back tablet_id", K(tablet_id), K(i));
}
@ -207,7 +247,7 @@ int ObTableLoadPartitionLocation::fetch_tablet_handle(uint64_t tenant_id,
}
int ObTableLoadPartitionLocation::init(
uint64_t tenant_id, const ObTableLoadArray<ObTableLoadPartitionId> &partition_ids)
uint64_t tenant_id, const ObIArray<ObTableLoadPartitionId> &partition_ids)
{
int ret = OB_SUCCESS;
if (IS_INIT) {
@ -231,7 +271,7 @@ int ObTableLoadPartitionLocation::init(
}
int ObTableLoadPartitionLocation::init_all_partition_location(
uint64_t tenant_id, const ObTableLoadArray<ObTableLoadPartitionId> &partition_ids)
uint64_t tenant_id, const ObIArray<ObTableLoadPartitionId> &partition_ids)
{
int ret = OB_SUCCESS;
if (OB_FAIL(fetch_ls_locations(tenant_id, partition_ids))) {

View File

@ -62,7 +62,7 @@ public:
tablet_ids_.set_tenant_id(MTL_ID());
}
int init(uint64_t tenant_id,
const table::ObTableLoadArray<table::ObTableLoadPartitionId> &partition_ids);
const common::ObIArray<table::ObTableLoadPartitionId> &partition_ids);
int get_leader(common::ObTabletID tablet_id, PartitionLocationInfo &info) const;
int get_all_leader(table::ObTableLoadArray<common::ObAddr> &addr_array) const;
int get_all_leader_info(table::ObTableLoadArray<LeaderInfo> &info_array) const;
@ -77,6 +77,10 @@ public:
int check_tablet_has_same_leader(const ObTableLoadPartitionLocation &other, bool &result);
public:
static int init_partition_location(const ObIArray<table::ObTableLoadPartitionId> &partition_ids,
const ObIArray<table::ObTableLoadPartitionId> &target_partition_ids,
ObTableLoadPartitionLocation &partition_location,
ObTableLoadPartitionLocation &target_partition_location);
// 通过tablet_id获取
static int fetch_ls_id(uint64_t tenant_id, const common::ObTabletID &tablet_id,
share::ObLSID &ls_id);
@ -91,11 +95,11 @@ public:
storage::ObTabletHandle &tablet_handle);
private:
int init_all_partition_location(
uint64_t tenant_id, const table::ObTableLoadArray<table::ObTableLoadPartitionId> &partition_ids);
uint64_t tenant_id, const common::ObIArray<table::ObTableLoadPartitionId> &partition_ids);
int init_all_leader_info();
int fetch_ls_locations(
uint64_t tenant_id,
const table::ObTableLoadArray<table::ObTableLoadPartitionId> &partition_ids);
const common::ObIArray<table::ObTableLoadPartitionId> &partition_ids);
private:
common::ObArenaAllocator allocator_;
common::ObArray<common::ObTabletID> tablet_ids_; //保证遍历partition_map_的时候顺序不变

View File

@ -453,6 +453,124 @@ int ObTableLoadSchema::get_tenant_optimizer_gather_stats_on_load(const uint64_t
return ret;
}
int ObTableLoadSchema::get_tablet_ids_by_part_ids(const ObTableSchema *table_schema,
const ObIArray<ObObjectID> &part_ids,
ObIArray<ObTabletID> &tablet_ids)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(table_schema)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("table schema is nullptr", KR(ret));
} else {
for (uint64_t i = 0; OB_SUCC(ret) && i < part_ids.count(); ++i) {
ObObjectID part_id = part_ids.at(i);
ObTabletID tmp_tablet_id;
if (OB_FAIL(table_schema->get_tablet_id_by_object_id(part_id, tmp_tablet_id))) {
LOG_WARN("fail to get tablet ids by part object id", KR(ret), K(part_id));
} else if (OB_FAIL(tablet_ids.push_back(tmp_tablet_id))) {
LOG_WARN("fail to push tablet id", KR(ret), K(tmp_tablet_id));
}
}
}
return ret;
}
int ObTableLoadSchema::check_has_identity_column(const ObTableSchema *table_schema,
bool &has_identity_column)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(table_schema)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("table schema is nullptr", KR(ret));
} else if (has_identity_column) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("has_identity_column is invalid", KR(ret));
} else {
ObArray<ObObjectID> column_ids;
if (OB_FAIL(table_schema->get_column_ids(column_ids))) {
LOG_WARN("failed to get column ids", KR(ret));
} else {
for (int64_t i = 0; OB_SUCC(ret) && !has_identity_column && (i < column_ids.count()); ++i) {
const ObColumnSchemaV2 *col_schema = table_schema->get_column_schema(column_ids.at(i));
if (OB_ISNULL(col_schema)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected null column schema", KR(ret));
} else if (col_schema->is_identity_column()) {
has_identity_column = true;
}
}
}
}
return ret;
}
int ObTableLoadSchema::check_support_partition_exchange(const ObTableSchema *table_schema,
bool &is_support)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(table_schema)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("table schema is nullptr", KR(ret));
} else if (!is_support) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("is_support is not invalid", KR(ret), K(is_support));
} else {
ObPartitionFuncType part_type = ObPartitionFuncType::PARTITION_FUNC_TYPE_MAX;
switch (table_schema->get_part_level()) {
case ObPartitionLevel::PARTITION_LEVEL_ONE:
part_type = table_schema->get_part_option().get_part_func_type();
break;
case ObPartitionLevel::PARTITION_LEVEL_TWO:
part_type = table_schema->get_sub_part_option().get_part_func_type();
break;
default:
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected partition level", KR(ret), K(table_schema->get_part_level()));
}
if (OB_FAIL(ret)) {
} else if (OB_UNLIKELY(!((PARTITION_FUNC_TYPE_RANGE == part_type)
|| (PARTITION_FUNC_TYPE_RANGE_COLUMNS == part_type)
|| (PARTITION_FUNC_TYPE_LIST == part_type)
|| (PARTITION_FUNC_TYPE_LIST_COLUMNS == part_type)))) {
is_support = false;
}
}
return ret;
}
int ObTableLoadSchema::check_has_global_index(ObSchemaGetterGuard &schema_guard,
const ObTableSchema *table_schema,
bool &is_have)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(table_schema)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("table schema is nullptr", KR(ret));
} else if (is_have) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("is_have is invalid", KR(ret), K(is_have));
} else {
ObSEArray<ObAuxTableMetaInfo, 16> simple_index_infos;
if (OB_FAIL(table_schema->get_simple_index_infos(simple_index_infos))) {
LOG_WARN("failed to get simple_index_infos", KR(ret));
} else {
for (int64_t i = 0; OB_SUCC(ret) && !is_have && (i < simple_index_infos.count()); ++i) {
const ObTableSchema *index_schema = NULL;
if (OB_FAIL(schema_guard.get_table_schema(
table_schema->get_tenant_id(), simple_index_infos.at(i).table_id_, index_schema))) {
LOG_WARN("failed to get table schema", KR(ret));
} else if (OB_ISNULL(index_schema)) {
ret = OB_TABLE_NOT_EXIST;
LOG_WARN("index schema from schema guard is NULL", KR(ret), KP(index_schema));
} else if (index_schema->is_global_index_table()) {
is_have = true;
}
}
}
}
return ret;
}
ObTableLoadSchema::ObTableLoadSchema()
: allocator_("TLD_Schema"),
is_partitioned_table_(false),
@ -503,7 +621,6 @@ void ObTableLoadSchema::reset()
lob_meta_column_descs_.reset();
lob_meta_datum_utils_.reset();
cmp_funcs_.reset();
partition_ids_.reset();
allocator_.reset();
is_inited_ = false;
}
@ -577,13 +694,6 @@ int ObTableLoadSchema::init_table_schema(const ObTableSchema *table_schema)
part_ids.set_tenant_id(MTL_ID());
if (OB_FAIL(table_schema->get_all_tablet_and_object_ids(tablet_ids, part_ids))) {
LOG_WARN("fail to get all tablet ids", KR(ret));
} else if (OB_FAIL(partition_ids_.create(part_ids.count(), allocator_))) {
LOG_WARN("fail to create array", KR(ret));
} else {
for (int64_t i = 0; i < part_ids.count(); ++i) {
partition_ids_[i].partition_id_ = part_ids.at(i);
partition_ids_[i].tablet_id_ = tablet_ids.at(i);
}
}
for (ObTableSchema::const_column_iterator iter = table_schema->column_begin();
OB_SUCC(ret) && iter != table_schema->column_end(); ++iter) {

View File

@ -79,7 +79,14 @@ public:
const share::schema::ObTableSchema *table_schema,
bool &bret);
static int get_tenant_optimizer_gather_stats_on_load(const uint64_t tenant_id, bool &value);
static int get_tablet_ids_by_part_ids(const ObTableSchema *table_schema,
const ObIArray<ObObjectID> &part_ids,
ObIArray<ObTabletID> &tablet_ids);
static int check_has_identity_column(const ObTableSchema *table_schema, bool &has_identity_column);
static int check_support_partition_exchange(const ObTableSchema *table_schema, bool &is_support);
static int check_has_global_index(ObSchemaGetterGuard &schema_guard,
const ObTableSchema *table_schema,
bool &is_have);
public:
ObTableLoadSchema();
~ObTableLoadSchema();
@ -124,7 +131,6 @@ public:
common::ObArray<share::schema::ObColDesc> lob_meta_column_descs_;
blocksstable::ObStorageDatumUtils lob_meta_datum_utils_;
blocksstable::ObStoreCmpFuncs cmp_funcs_; // for sql statistics
table::ObTableLoadArray<table::ObTableLoadPartitionId> partition_ids_;
bool is_inited_;
};

View File

@ -556,7 +556,24 @@ int ObTableLoadService::check_support_direct_load(ObSchemaGetterGuard &schema_gu
ret = OB_NOT_SUPPORTED;
LOG_WARN("direct-load does not support table required by materialized view refresh", KR(ret));
FORWARD_USER_ERROR_MSG(ret, "%sdirect-load does not support table required by materialized view refresh", tmp_prefix);
} else if (ObDirectLoadMethod::is_incremental(method)) { // incremental direct-load
}
// check for partition level
else if (ObDirectLoadLevel::PARTITION == load_level
&& OB_FAIL(check_support_direct_load_for_partition_level(schema_guard,
table_schema,
method,
compat_version))) {
LOG_WARN("fail to check support direct load for partition level", KR(ret));
}
// check insert overwrite
else if (ObDirectLoadMode::is_insert_overwrite(load_mode)
&& compat_version < DATA_VERSION_4_3_2_0) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("version lower than 4.3.2.0 does not support insert overwrite", KR(ret));
FORWARD_USER_ERROR_MSG(ret, "version lower than 4.3.2.0 does not support insert overwrite");
}
// incremental direct-load
else if (ObDirectLoadMethod::is_incremental(method)) {
if (GCTX.is_shared_storage_mode()) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("in share storage mode, using incremental direct-load is not supported", KR(ret));
@ -596,15 +613,17 @@ int ObTableLoadService::check_support_direct_load(ObSchemaGetterGuard &schema_gu
LOG_WARN("incremental direct-load does not support table with check constraints", KR(ret));
FORWARD_USER_ERROR_MSG(ret, "incremental direct-load does not support table with check constraints");
}
} else if (ObDirectLoadMethod::is_full(method)) { // full direct-load
}
// full direct-load
else if (ObDirectLoadMethod::is_full(method)) {
if (OB_UNLIKELY(!ObDirectLoadInsertMode::is_valid_for_full_method(insert_mode))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected insert mode for full direct-load", KR(ret), K(method), K(insert_mode));
} else if (ObDirectLoadMode::is_insert_overwrite(load_mode)) {
if (OB_FAIL(check_support_direct_load_for_insert_overwrite(
schema_guard, *table_schema, load_level, compat_version))) {
LOG_WARN("failed to check support direct load for insert overwrite", KR(ret));
}
} else if (ObDirectLoadInsertMode::OVERWRITE == insert_mode
&& compat_version < DATA_VERSION_4_3_1_0) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("version lower than 4.3.1.0 does not support insert overwrite mode", KR(ret));
FORWARD_USER_ERROR_MSG(ret, "version lower than 4.3.1.0 does not support insert overwrite mode");
}
}
// check default column
@ -661,102 +680,61 @@ int ObTableLoadService::check_support_direct_load(ObSchemaGetterGuard &schema_gu
return ret;
}
int ObTableLoadService::check_support_direct_load_for_insert_overwrite(
int ObTableLoadService::check_support_direct_load_for_partition_level(
ObSchemaGetterGuard &schema_guard,
const ObTableSchema &table_schema,
const ObDirectLoadLevel::Type load_level,
const ObTableSchema *table_schema,
const ObDirectLoadMethod::Type method,
const uint64_t compat_version)
{
int ret = OB_SUCCESS;
if (compat_version < DATA_VERSION_4_3_2_0) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("version lower than 4.3.2.0 does not support insert overwrite", KR(ret));
FORWARD_USER_ERROR_MSG(ret, "version lower than 4.3.2.0 does not support insert overwrite");
} else if (table_schema.get_foreign_key_infos().count() > 0) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("insert overwrite with incremental direct-load does not support table with foreign keys", KR(ret));
FORWARD_USER_ERROR_MSG(ret, "insert overwrite with direct-load does not support table with foreign keys");
} else if (ObDirectLoadLevel::PARTITION == load_level) {
if (compat_version < DATA_VERSION_4_3_3_0) {
if (ObDirectLoadMethod::is_incremental(method)) {
// do nothing
} else if (ObDirectLoadMethod::is_full(method)) {
bool has_global_index = false;
bool has_identity_column = false;
bool is_support_partition_exchange = true;
if (compat_version < DATA_VERSION_4_3_5_0) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("version lower than 4.3.3.0 does not support insert overwrite partition", KR(ret));
FORWARD_USER_ERROR_MSG(ret, "version lower than 4.3.3.0 does not support insert overwrite partition");
} else if (table_schema.is_duplicate_table()) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("insert overwrite partition does not support duplicate table", KR(ret));
FORWARD_USER_ERROR_MSG(ret, "insert overwrite partition does not support duplicate table");
} else if (table_schema.get_index_tid_count() > 0) {
bool has_global_indexes = false;
ObSEArray<ObAuxTableMetaInfo, 16> simple_index_infos;
if (OB_FAIL(table_schema.get_simple_index_infos(simple_index_infos))) {
LOG_WARN("failed to get simple_index_infos", KR(ret));
} else {
for (int64_t i = 0; OB_SUCC(ret) && !has_global_indexes && (i < simple_index_infos.count()); ++i) {
const ObTableSchema *index_schema = NULL;
if (OB_FAIL(schema_guard.get_table_schema(
table_schema.get_tenant_id(), simple_index_infos.at(i).table_id_, index_schema))) {
LOG_WARN("failed to get table schema", KR(ret));
} else if (OB_ISNULL(index_schema)) {
ret = OB_TABLE_NOT_EXIST;
LOG_WARN("index schema from schema guard is NULL", KR(ret), KP(index_schema));
} else if (index_schema->is_global_index_table()) {
has_global_indexes = true;
}
}
}
if (OB_SUCC(ret) && (has_global_indexes)) {
LOG_WARN("version lower than 4.3.5.0 does not support direct load partition", KR(ret));
FORWARD_USER_ERROR_MSG(ret, "version lower than 4.3.5.0 does not support direct load partition");
} else if (OB_ISNULL(table_schema)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("table schema is nullptr", KR(ret));
} else {
if (table_schema->is_duplicate_table()) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("insert overwrite partition does not support table with global indexes", KR(ret));
FORWARD_USER_ERROR_MSG(ret, "insert overwrite partition does not support table with global indexes");
}
} else if (0 != table_schema.get_autoinc_column_id()) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("insert overwrite partition does not support table with auto_increment columns", KR(ret));
FORWARD_USER_ERROR_MSG(ret, "insert overwrite partition does not support table with auto_increment columns");
} else if (lib::is_oracle_mode()) {
ObArray<ObObjectID> column_ids;
if (OB_FAIL(table_schema.get_column_ids(column_ids))) {
LOG_WARN("failed to get column ids", KR(ret));
} else {
for (int64_t i = 0; OB_SUCC(ret) && (i < column_ids.count()); ++i) {
const ObColumnSchemaV2 *col_schema = table_schema.get_column_schema(column_ids.at(i));
if (OB_ISNULL(col_schema)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected null column schema", KR(ret));
} else if (col_schema->is_identity_column()) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("insert overwrite partition does not support table with identity columns", KR(ret));
FORWARD_USER_ERROR_MSG(ret, "insert overwrite partition does not support table with identity columns");
}
}
LOG_WARN("partition level direct load not support duplicate table", KR(ret));
FORWARD_USER_ERROR_MSG(ret, "partition level direct-load not support duplicate table");
} else if (0 != table_schema->get_autoinc_column_id()) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("partition level direct load not support table with auto_increment columns", KR(ret));
FORWARD_USER_ERROR_MSG(ret, "partition level direct-load not support table with auto_increment columns");
} else if (table_schema->get_index_tid_count() > 0
&& OB_FAIL(ObTableLoadSchema::check_has_global_index(schema_guard,
table_schema,
has_global_index))) {
LOG_WARN("fail to check has global index", KR(ret));
} else if (has_global_index) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("partition level direct load not support table with global indexes", KR(ret));
FORWARD_USER_ERROR_MSG(ret, "partition level direct-load not support table with global indexes");
} else if (lib::is_oracle_mode() && OB_FAIL(ObTableLoadSchema::check_has_identity_column(
table_schema, has_identity_column))) {
LOG_WARN("fail to check has identity column", KR(ret));
} else if (has_identity_column) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("partition level direct load not support table with identity columns", KR(ret));
FORWARD_USER_ERROR_MSG(ret, "partition level direct-load not support table with identity columns");
} else if (OB_FAIL(ObTableLoadSchema::check_support_partition_exchange(table_schema,
is_support_partition_exchange))) {
LOG_WARN("fail to check support partition exchange", KR(ret));
} else if (!is_support_partition_exchange) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("partition level direct load not support hash/key partitions", KR(ret));
FORWARD_USER_ERROR_MSG(ret, "partition level direct-load not support hash/key partitions");
}
}
if (OB_SUCC(ret)) { // check partition type
ObPartitionFuncType part_type = PARTITION_FUNC_TYPE_MAX;
switch (table_schema.get_part_level()) {
case ObPartitionLevel::PARTITION_LEVEL_ONE:
part_type = table_schema.get_part_option().get_part_func_type();
break;
case ObPartitionLevel::PARTITION_LEVEL_TWO:
part_type = table_schema.get_sub_part_option().get_part_func_type();
break;
default:
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected partition level", KR(ret), K(table_schema.get_part_level()));
}
if (OB_FAIL(ret)) {
} else if (OB_UNLIKELY(!((PARTITION_FUNC_TYPE_RANGE == part_type)
|| (PARTITION_FUNC_TYPE_RANGE_COLUMNS == part_type)
|| (PARTITION_FUNC_TYPE_LIST == part_type)
|| (PARTITION_FUNC_TYPE_LIST_COLUMNS == part_type)))) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("insert overwrite partition does not support hash/key partitions", KR(ret), K(part_type));
FORWARD_USER_ERROR_MSG(ret, "insert overwrite partition does not support hash/key partition type");
}
}
} // end if
}
return ret;
}
@ -775,7 +753,6 @@ int ObTableLoadService::alloc_ctx(ObTableLoadTableCtx *&table_ctx)
}
return ret;
}
void ObTableLoadService::free_ctx(ObTableLoadTableCtx *table_ctx)
{
int ret = OB_SUCCESS;

View File

@ -59,11 +59,10 @@ public:
const storage::ObDirectLoadMode::Type load_mode,
const storage::ObDirectLoadLevel::Type load_level,
const common::ObIArray<uint64_t> &column_ids);
static int check_support_direct_load_for_insert_overwrite(
share::schema::ObSchemaGetterGuard &schema_guard,
const share::schema::ObTableSchema &table_schema,
const storage::ObDirectLoadLevel::Type load_level,
const uint64_t compat_version);
static int check_support_direct_load_for_partition_level(ObSchemaGetterGuard &schema_guard,
const ObTableSchema *table_schema,
const ObDirectLoadMethod::Type method,
const uint64_t compat_version);
static int alloc_ctx(ObTableLoadTableCtx *&table_ctx);
static void free_ctx(ObTableLoadTableCtx *table_ctx);

View File

@ -200,6 +200,7 @@ void ObTableLoadTableCtx::unregister_job_stat()
}
int ObTableLoadTableCtx::init_coordinator_ctx(const ObIArray<uint64_t> &column_ids,
const ObIArray<ObTabletID> &tablet_ids,
ObTableLoadExecCtx *exec_ctx)
{
int ret = OB_SUCCESS;
@ -214,7 +215,7 @@ int ObTableLoadTableCtx::init_coordinator_ctx(const ObIArray<uint64_t> &column_i
if (OB_ISNULL(coordinator_ctx = OB_NEWx(ObTableLoadCoordinatorCtx, (&allocator_), this))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("fail to new ObTableLoadCoordinatorCtx", KR(ret));
} else if (OB_FAIL(coordinator_ctx->init(column_ids, exec_ctx))) {
} else if (OB_FAIL(coordinator_ctx->init(column_ids, tablet_ids, exec_ctx))) {
LOG_WARN("fail to init coordinator ctx", KR(ret));
} else if (OB_FAIL(coordinator_ctx->set_status_inited())) {
LOG_WARN("fail to set coordinator status inited", KR(ret));

View File

@ -71,6 +71,7 @@ public:
K_(is_inited));
public:
int init_coordinator_ctx(const common::ObIArray<uint64_t> &column_ids,
const common::ObIArray<ObTabletID> &tablet_ids,
ObTableLoadExecCtx *exec_ctx);
int init_store_ctx(
const table::ObTableLoadArray<table::ObTableLoadLSIdAndPartitionId> &partition_id_array,

View File

@ -130,12 +130,12 @@ int ObTableLoadTransBucketWriter::init_session_ctx_array()
session_ctx->session_id_ = i + 1;
if (!is_partitioned_) {
ObTableLoadPartitionLocation::PartitionLocationInfo info;
if (OB_UNLIKELY(1 != coordinator_ctx_->ctx_->schema_.partition_ids_.count())) {
if (OB_UNLIKELY(1 != coordinator_ctx_->partition_ids_.count())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected partition id num in non partitioned table", KR(ret), "count",
coordinator_ctx_->ctx_->schema_.partition_ids_.count());
coordinator_ctx_->partition_ids_.count());
} else if (FALSE_IT(session_ctx->partition_id_ =
coordinator_ctx_->ctx_->schema_.partition_ids_[0])) {
coordinator_ctx_->partition_ids_[0])) {
} else if (OB_FAIL(coordinator_ctx_->partition_location_.get_leader(
session_ctx->partition_id_.tablet_id_, info))) {
LOG_WARN("failed to get leader addr", K(ret));

View File

@ -71,6 +71,7 @@ ObLoadDataDirectImpl::LoadExecuteParam::LoadExecuteParam()
dup_action_(ObLoadDupActionType::LOAD_INVALID_MODE),
method_(ObDirectLoadMethod::INVALID_METHOD),
insert_mode_(ObDirectLoadInsertMode::INVALID_INSERT_MODE),
load_level_(ObDirectLoadLevel::INVALID_LEVEL),
compressor_type_(ObCompressorType::INVALID_COMPRESSOR),
online_sample_percent_(100.)
{
@ -86,6 +87,7 @@ bool ObLoadDataDirectImpl::LoadExecuteParam::is_valid() const
ObLoadDupActionType::LOAD_INVALID_MODE != dup_action_ &&
ObDirectLoadMethod::is_type_valid(method_) &&
ObDirectLoadInsertMode::is_type_valid(insert_mode_) &&
ObDirectLoadLevel::is_type_valid(load_level_) &&
(storage::ObDirectLoadMethod::is_full(method_)
? storage::ObDirectLoadInsertMode::is_valid_for_full_method(insert_mode_)
: true) &&
@ -2347,9 +2349,10 @@ int ObLoadDataDirectImpl::init_execute_param()
} else {
execute_param_.need_sort_ = optimizer_ctx->need_sort_;
execute_param_.max_error_rows_ = optimizer_ctx->max_error_row_count_;
execute_param_.dup_action_ = optimizer_ctx->dup_action_;
execute_param_.method_ = optimizer_ctx->load_method_;
execute_param_.insert_mode_ = optimizer_ctx->insert_mode_;
execute_param_.dup_action_ = optimizer_ctx->dup_action_;
execute_param_.load_level_ = optimizer_ctx->load_level_;
}
}
// parallel_
@ -2466,6 +2469,12 @@ int ObLoadDataDirectImpl::init_execute_param()
LOG_WARN("failed to get sys online sample percent", K(ret));
}
}
// tablet_ids_
if (OB_SUCC(ret) && OB_FAIL(ObTableLoadSchema::get_tablet_ids_by_part_ids(table_schema,
load_stmt_->get_part_ids(),
execute_param_.tablet_ids_))) {
LOG_WARN("fail to get tablet ids", KR(ret));
}
return ret;
}
@ -2493,9 +2502,11 @@ int ObLoadDataDirectImpl::init_execute_context()
load_param.load_mode_ = ObDirectLoadMode::LOAD_DATA;
load_param.compressor_type_ = execute_param_.compressor_type_;
load_param.online_sample_percent_ = execute_param_.online_sample_percent_;
load_param.load_level_ = ObDirectLoadLevel::TABLE;
if (OB_FAIL(
direct_loader_.init(load_param, execute_param_.column_ids_, &execute_ctx_.exec_ctx_))) {
load_param.load_level_ = execute_param_.load_level_;
if (OB_FAIL(direct_loader_.init(load_param,
execute_param_.column_ids_,
execute_param_.tablet_ids_,
&execute_ctx_.exec_ctx_))) {
LOG_WARN("fail to init direct loader", KR(ret));
} else if (OB_FAIL(init_logger())) {
LOG_WARN("fail to init logger", KR(ret));

View File

@ -94,10 +94,12 @@ private:
K_(dup_action),
"method", storage::ObDirectLoadMethod::get_type_string(method_),
"insert_mode", storage::ObDirectLoadInsertMode::get_type_string(insert_mode_),
K_(load_level),
K_(data_access_param),
K_(column_ids),
K_(compressor_type),
K_(online_sample_percent));
K_(online_sample_percent),
K_(tablet_ids));
public:
uint64_t tenant_id_;
uint64_t database_id_;
@ -116,10 +118,12 @@ private:
sql::ObLoadDupActionType dup_action_;
storage::ObDirectLoadMethod::Type method_;
storage::ObDirectLoadInsertMode::Type insert_mode_;
storage::ObDirectLoadLevel::Type load_level_;
DataAccessParam data_access_param_;
ObArray<uint64_t> column_ids_;
ObCompressorType compressor_type_;
double online_sample_percent_;
ObArray<ObTabletID> tablet_ids_;
};
struct LoadExecuteContext

View File

@ -36,6 +36,7 @@ ObDirectLoadOptimizerCtx::ObDirectLoadOptimizerCtx()
load_method_(ObDirectLoadMethod::INVALID_METHOD),
insert_mode_(ObDirectLoadInsertMode::INVALID_INSERT_MODE),
load_mode_(ObDirectLoadMode::INVALID_MODE),
load_level_(ObDirectLoadLevel::INVALID_LEVEL),
dup_action_(ObLoadDupActionType::LOAD_INVALID_MODE),
max_error_row_count_(0),
need_sort_(false),
@ -51,6 +52,7 @@ void ObDirectLoadOptimizerCtx::reset()
load_method_ = ObDirectLoadMethod::INVALID_METHOD;
insert_mode_ = ObDirectLoadInsertMode::INVALID_INSERT_MODE;
load_mode_ = ObDirectLoadMode::INVALID_MODE;
load_level_ = ObDirectLoadLevel::INVALID_LEVEL;
dup_action_ = ObLoadDupActionType::LOAD_INVALID_MODE;
max_error_row_count_ = 0;
need_sort_ = false;
@ -91,9 +93,11 @@ int ObDirectLoadOptimizer::optimize(ObExecContext *exec_ctx, ObLoadDataStmt &stm
direct_load_optimizer_ctx_.table_id_ = stmt.get_load_arguments().table_id_;
direct_load_optimizer_ctx_.load_mode_ = ObDirectLoadMode::LOAD_DATA;
direct_load_optimizer_ctx_.dup_action_ = stmt.get_load_arguments().dupl_action_;
direct_load_optimizer_ctx_.load_level_ = stmt.get_part_ids().empty() ? ObDirectLoadLevel::TABLE
: ObDirectLoadLevel::PARTITION;
if (OB_FAIL(check_semantics())) {
LOG_WARN("fail to check semantics", K(ret));
} else if (OB_FAIL(check_support_direct_load(exec_ctx, ObDirectLoadLevel::TABLE))) {
} else if (OB_FAIL(check_support_direct_load(exec_ctx))) {
LOG_WARN("fail to check support direct load", K(ret));
} else {
direct_load_optimizer_ctx_.dup_action_ = direct_load_optimizer_ctx_.insert_mode_ == ObDirectLoadInsertMode::INC_REPLACE ?
@ -168,7 +172,6 @@ int ObDirectLoadOptimizer::optimize(
// 通过hint而不是默认配置项的方式,不会修改并行度,当并行度小于2时不满足pdml条件,无需走旁路导入检查
// do nothing
} else if (direct_load_optimizer_ctx_.load_method_ != ObDirectLoadMethod::INVALID_METHOD) {
ObDirectLoadLevel::Type load_level = ObDirectLoadLevel::TABLE;
if (session_info->get_ddl_info().is_mview_complete_refresh()) {
if (direct_load_optimizer_ctx_.insert_mode_ == ObDirectLoadInsertMode::INC_REPLACE) {
ret = OB_ERR_UNEXPECTED;
@ -188,15 +191,15 @@ int ObDirectLoadOptimizer::optimize(
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpect null table partition info", K(ret));
} else if (info->get_ref_table_id() == table_id) {
if (info->get_table_location().get_part_hint_ids().count() > 0) {
load_level = ObDirectLoadLevel::PARTITION;
}
direct_load_optimizer_ctx_.load_level_ = info->get_table_location().get_part_hint_ids().empty()
? ObDirectLoadLevel::TABLE
: ObDirectLoadLevel::PARTITION;
break;
}
}
}
if (OB_SUCC(ret)) {
if (OB_FAIL(check_support_direct_load(exec_ctx, load_level))) {
if (OB_FAIL(check_support_direct_load(exec_ctx))) {
LOG_WARN("fail to check support direct load", K(ret));
bool allow_fallback = false;
if (ret == OB_NOT_SUPPORTED && stmt.get_query_ctx()->optimizer_features_enable_version_ >= COMPAT_VERSION_4_3_4) {
@ -326,9 +329,7 @@ int ObDirectLoadOptimizer::check_support_insert_overwrite(const ObGlobalHint &gl
return ret;
}
int ObDirectLoadOptimizer::check_support_direct_load(
ObExecContext *exec_ctx,
storage::ObDirectLoadLevel::Type load_level)
int ObDirectLoadOptimizer::check_support_direct_load(ObExecContext *exec_ctx)
{
int ret = OB_SUCCESS;
ObSqlCtx *sql_ctx = nullptr;
@ -375,7 +376,7 @@ int ObDirectLoadOptimizer::check_support_direct_load(
direct_load_optimizer_ctx_.load_method_,
direct_load_optimizer_ctx_.insert_mode_,
direct_load_optimizer_ctx_.load_mode_,
load_level,
direct_load_optimizer_ctx_.load_level_,
column_ids))) {
LOG_WARN("fail to check support direct load", K(ret), K(direct_load_optimizer_ctx_));
}

View File

@ -47,13 +47,14 @@ public:
bool is_insert_overwrite() const { return ObDirectLoadMode::is_insert_overwrite(load_mode_); }
bool is_insert_into() const { return load_mode_ == ObDirectLoadMode::INSERT_INTO; }
void reset();
TO_STRING_KV(K_(table_id), K_(load_method), K_(insert_mode), K_(load_mode), K_(dup_action),
TO_STRING_KV(K_(table_id), K_(load_method), K_(insert_mode), K_(load_mode), K_(load_level), K_(dup_action),
K_(max_error_row_count), K_(need_sort), K_(can_use_direct_load), K_(use_direct_load), K_(is_optimized));
public:
uint64_t table_id_;
storage::ObDirectLoadMethod::Type load_method_;
storage::ObDirectLoadInsertMode::Type insert_mode_;
storage::ObDirectLoadMode::Type load_mode_;
storage::ObDirectLoadLevel::Type load_level_;
sql::ObLoadDupActionType dup_action_;
int64_t max_error_row_count_;
bool need_sort_;
@ -78,7 +79,7 @@ private:
void enable_by_overwrite();
int check_semantics();
int check_support_insert_overwrite(const ObGlobalHint &global_hint);
int check_support_direct_load(ObExecContext *exec_ctx, storage::ObDirectLoadLevel::Type load_level);
int check_support_direct_load(ObExecContext *exec_ctx);
int check_direct_load_allow_fallback(ObExecContext *exec_ctx, bool &allow_fallback);
private:
ObDirectLoadOptimizerCtx &direct_load_optimizer_ctx_;

View File

@ -4907,8 +4907,7 @@ load_data_with_opt_hint opt_load_local INFILE infile_string opt_duplicate INTO T
relation_factor opt_use_partition opt_compression opt_load_charset field_opt line_opt opt_load_ignore_rows
opt_field_or_var_spec opt_load_set_spec opt_load_data_extended_option_list
{
(void) $9;
malloc_non_terminal_node($$, result->malloc_pool_, T_LOAD_DATA, 13,
malloc_non_terminal_node($$, result->malloc_pool_, T_LOAD_DATA, 14,
$2, /* 0. local */
$4, /* 1. filename */
$5, /* 2. duplicate */
@ -4921,7 +4920,8 @@ opt_field_or_var_spec opt_load_set_spec opt_load_data_extended_option_list
$16, /* 9. set field */
$1, /* 10. hint */
$17, /* 11. extended option list */
$10 /* 12. compression format */
$10, /* 12. compression format */
$9 /* 13. partition */
);
}
;

View File

@ -30,6 +30,7 @@
#include "lib/restore/ob_storage_info.h"
#include "sql/engine/cmd/ob_load_data_file_reader.h"
#include <glob.h>
#include "share/schema/ob_part_mgr_util.h"
namespace oceanbase
{
@ -376,6 +377,16 @@ int ObLoadDataResolver::resolve(const ParseNode &parse_tree)
}
}
if (OB_SUCC(ret)) {
/*13. partition */
const ParseNode *child_node = node->children_[ENUM_OPT_USE_PARTITION];
if (OB_NOT_NULL(child_node)) {
if (OB_FAIL(resolve_partitions(*child_node, *load_stmt))) {
LOG_WARN("fail to resolve partition");
}
}
}
if (OB_SUCC(ret)) {
ObLoadArgument &load_args = load_stmt->get_load_arguments();
const ObDirectLoadHint &direct_load_hint = load_stmt->get_hints().get_direct_load_hint();
@ -1649,6 +1660,56 @@ int ObLoadDataResolver::check_trigger_constraint(const ObTableSchema *table_sche
return ret;
}
int ObLoadDataResolver::resolve_partitions(const ParseNode &node, ObLoadDataStmt &load_stmt)
{
int ret = OB_SUCCESS;
uint64_t table_id = load_stmt.get_load_arguments().table_id_;
const ObTableSchema *table_schema = nullptr;
if (OB_ISNULL(session_info_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("session info is nullptr", KR(ret));
} else if (OB_FAIL(schema_checker_->get_table_schema(session_info_->get_effective_tenant_id(), table_id, table_schema))) {
LOG_WARN("fail to get table schema", KR(ret));
}
OB_ASSERT(1 == node.num_child_ && node.children_[0]->num_child_ > 0);
if (OB_SUCC(ret) && OB_NOT_NULL(node.children_[0]) && T_NAME_LIST == node.children_[0]->type_) {
const ParseNode *name_list = node.children_[0];
ObString partition_name;
ObArray<ObObjectID> part_ids;
for (int i = 0; OB_SUCC(ret) && i < name_list->num_child_; i++) {
ObArray<ObObjectID> partition_ids;
partition_name.assign_ptr(name_list->children_[i]->str_value_,
static_cast<int32_t>(name_list->children_[i]->str_len_));
//here just conver partition_name to its lowercase
ObCharset::casedn(CS_TYPE_UTF8MB4_GENERAL_CI, partition_name);
ObPartGetter part_getter(*table_schema);
if (T_USE_PARTITION == node.type_) {
if (OB_FAIL(part_getter.get_part_ids(partition_name, partition_ids))) {
LOG_WARN("fail to get part ids", K(ret), K(partition_name));
if (OB_UNKNOWN_PARTITION == ret && lib::is_mysql_mode()) {
LOG_USER_ERROR(OB_UNKNOWN_PARTITION, partition_name.length(), partition_name.ptr(),
table_schema->get_table_name_str().length(),
table_schema->get_table_name_str().ptr());
}
}
} else if (OB_FAIL(part_getter.get_subpart_ids(partition_name, partition_ids))) {
LOG_WARN("fail to get subpart ids", K(ret), K(partition_name));
}
if (OB_SUCC(ret)) {
if (OB_FAIL(append_array_no_dup(part_ids, partition_ids))) {
LOG_WARN("Push partition id error", K(ret));
}
}
} // end of for
if (OB_SUCC(ret)) {
if (OB_FAIL(load_stmt.set_part_ids(part_ids))) {
LOG_WARN("fail to set partition ids", KR(ret));
}
}
}
return ret;
}
int ObLoadDataResolver::check_collection_sql_type(const ObTableSchema *table_schema)
{
int ret = OB_SUCCESS;

View File

@ -60,6 +60,7 @@ public:
int resolve_filename(ObLoadDataStmt *load_stmt, ParseNode *node);
int local_infile_enabled(bool &enabled) const;
int resolve_partitions(const ParseNode &node, ObLoadDataStmt &load_stmt);
int check_trigger_constraint(const ObTableSchema *table_schema);
int check_collection_sql_type(const ObTableSchema *table_schema);
@ -81,6 +82,7 @@ private:
ENUM_OPT_HINT,
ENUM_OPT_EXTENDED_OPTIONS,
ENUM_OPT_COMPRESSION,
ENUM_OPT_USE_PARTITION,
ENUM_TOTAL_COUNT
};
ObStmtScope current_scope_;

View File

@ -92,6 +92,11 @@ ColumnItem* ObLoadDataStmt::get_column_item_by_idx(uint64_t column_id) {
return tar_item;
}
int ObLoadDataStmt::set_part_ids(common::ObIArray<ObObjectID> &part_ids)
{
return part_ids_.assign(part_ids);
}
int ObLoadDataHint::get_value(IntHintItem item, int64_t &value) const
{
int ret = OB_SUCCESS;

View File

@ -268,6 +268,8 @@ public:
ObLoadDataHint &get_hints() { return hints_; }
void set_default_table_columns() { is_default_table_columns_ = true; }
bool get_default_table_columns() { return is_default_table_columns_; }
int set_part_ids(common::ObIArray<ObObjectID> &part_ids);
const common::ObIArray<ObObjectID> &get_part_ids() const { return part_ids_; }
void set_optimizer_ctx(ObDirectLoadOptimizerCtx *optimizer_ctx) { optimizer_ctx_ = optimizer_ctx; }
ObDirectLoadOptimizerCtx *get_optimizer_ctx() { return optimizer_ctx_; }
TO_STRING_KV(N_STMT_TYPE, ((int)stmt_type_),
@ -277,7 +279,8 @@ public:
K_(field_or_var_list),
K_(assignments),
K_(hints),
K_(is_default_table_columns));
K_(is_default_table_columns),
K_(part_ids));
private:
ObDirectLoadOptimizerCtx *optimizer_ctx_;
@ -288,6 +291,7 @@ private:
ObAssignments assignments_;
ObLoadDataHint hints_;
bool is_default_table_columns_;
common::ObArray<ObObjectID> part_ids_;
DISALLOW_COPY_AND_ASSIGN(ObLoadDataStmt);
};

View File

@ -35,6 +35,9 @@ struct ObDirectLoadMode
static bool is_type_valid(const Type type);
static bool is_insert_overwrite(const Type type) { return INSERT_OVERWRITE == type; }
static bool is_load_data(const Type type) { return LOAD_DATA == type; }
static bool is_insert_into(const Type type) { return INSERT_INTO == type; }
static bool is_table_load(const Type type) { return TABLE_LOAD == type; }
};
struct ObDirectLoadMethod

View File

@ -0,0 +1,4 @@
1|1|1
11|11|11
21|21|21
31|31|31
1 1 1 1
2 11 11 11
3 21 21 21
4 31 31 31

View File

@ -0,0 +1,4 @@
2|2|2
12|12|12
22|22|22
32|32|32
1 2 2 2
2 12 12 12
3 22 22 22
4 32 32 32

View File

@ -0,0 +1,3 @@
3|3|3
13|13|13
23|23|23
1 3 3 3
2 13 13 13
3 23 23 23

View File

@ -0,0 +1,2 @@
4|4|4
34|34|34
1 4 4 4
2 34 34 34

View File

@ -0,0 +1,4 @@
1|1|1
12|12|12
23|23|23
34|34|34
1 1 1 1
2 12 12 12
3 23 23 23
4 34 34 34