From 0ef02930d02f8e914d8fc25df17e31e88d2af3b2 Mon Sep 17 00:00:00 2001 From: medcll <527998250@qq.com> Date: Fri, 22 Nov 2024 07:15:03 +0000 Subject: [PATCH] [FEAT MERGE]direct load data support partition level load --- src/observer/CMakeLists.txt | 1 + .../ob_table_direct_load_rpc_executor.cpp | 1 + .../ob_table_load_control_rpc_executor.cpp | 30 +++ .../ob_table_load_control_rpc_executor.h | 17 ++ .../ob_table_load_control_rpc_proxy.cpp | 2 + .../control/ob_table_load_control_rpc_proxy.h | 7 + .../ob_table_load_control_rpc_struct.cpp | 7 + .../ob_table_load_control_rpc_struct.h | 20 ++ .../table_load/ob_table_load_client_task.cpp | 61 +++++- .../table_load/ob_table_load_client_task.h | 9 +- .../table_load/ob_table_load_coordinator.cpp | 140 +++++++++++++- .../table_load/ob_table_load_coordinator.h | 5 + .../ob_table_load_coordinator_ctx.cpp | 159 ++++++++++++---- .../ob_table_load_coordinator_ctx.h | 13 +- ...e_load_empty_insert_tablet_ctx_manager.cpp | 179 ++++++++++++++++++ ...ble_load_empty_insert_tablet_ctx_manager.h | 66 +++++++ .../table_load/ob_table_load_instance.cpp | 7 +- .../table_load/ob_table_load_instance.h | 4 +- .../ob_table_load_partition_calc.cpp | 36 +++- .../table_load/ob_table_load_partition_calc.h | 5 +- .../ob_table_load_partition_location.cpp | 48 ++++- .../ob_table_load_partition_location.h | 10 +- .../table_load/ob_table_load_schema.cpp | 126 +++++++++++- .../table_load/ob_table_load_schema.h | 10 +- .../table_load/ob_table_load_service.cpp | 169 +++++++---------- .../table_load/ob_table_load_service.h | 9 +- .../table_load/ob_table_load_table_ctx.cpp | 3 +- .../table_load/ob_table_load_table_ctx.h | 1 + .../ob_table_load_trans_bucket_writer.cpp | 6 +- .../engine/cmd/ob_load_data_direct_impl.cpp | 19 +- src/sql/engine/cmd/ob_load_data_direct_impl.h | 6 +- .../optimizer/ob_direct_load_optimizer.cpp | 21 +- src/sql/optimizer/ob_direct_load_optimizer.h | 5 +- src/sql/parser/sql_parser_mysql_mode.y | 6 +- .../resolver/cmd/ob_load_data_resolver.cpp | 61 ++++++ src/sql/resolver/cmd/ob_load_data_resolver.h | 2 + src/sql/resolver/cmd/ob_load_data_stmt.cpp | 5 + src/sql/resolver/cmd/ob_load_data_stmt.h | 6 +- .../direct_load/ob_direct_load_struct.h | 3 + .../data/partition/partition_data1.csv | 4 + .../data/partition/partition_data2.csv | 4 + .../data/partition/partition_data3.csv | 3 + .../data/partition/partition_data4.csv | 2 + .../data/partition/partition_data5.csv | 4 + 44 files changed, 1095 insertions(+), 207 deletions(-) create mode 100644 src/observer/table_load/ob_table_load_empty_insert_tablet_ctx_manager.cpp create mode 100644 src/observer/table_load/ob_table_load_empty_insert_tablet_ctx_manager.h create mode 100644 tools/deploy/mysql_test/test_suite/direct_load_data/data/partition/partition_data1.csv create mode 100644 tools/deploy/mysql_test/test_suite/direct_load_data/data/partition/partition_data2.csv create mode 100644 tools/deploy/mysql_test/test_suite/direct_load_data/data/partition/partition_data3.csv create mode 100644 tools/deploy/mysql_test/test_suite/direct_load_data/data/partition/partition_data4.csv create mode 100644 tools/deploy/mysql_test/test_suite/direct_load_data/data/partition/partition_data5.csv diff --git a/src/observer/CMakeLists.txt b/src/observer/CMakeLists.txt index 615fd66a7..cf44e1a78 100644 --- a/src/observer/CMakeLists.txt +++ b/src/observer/CMakeLists.txt @@ -282,6 +282,7 @@ ob_set_subtarget(ob_server table_load table_load/ob_table_load_pre_sorter.cpp table_load/ob_table_load_mem_chunk_manager.cpp table_load/ob_table_load_pre_sort_writer.cpp + table_load/ob_table_load_empty_insert_tablet_ctx_manager.cpp ) ob_set_subtarget(ob_server virtual_table diff --git a/src/observer/table_load/client/ob_table_direct_load_rpc_executor.cpp b/src/observer/table_load/client/ob_table_direct_load_rpc_executor.cpp index cb8689647..076e14307 100644 --- a/src/observer/table_load/client/ob_table_direct_load_rpc_executor.cpp +++ b/src/observer/table_load/client/ob_table_direct_load_rpc_executor.cpp @@ -134,6 +134,7 @@ int ObTableDirectLoadBeginExecutor::init_param(ObTableLoadClientTaskParam ¶m OX(param.set_heartbeat_timeout_us(arg_.heartbeat_timeout_)); OZ(param.set_load_method(arg_.load_method_)); OZ(param.set_column_names(arg_.column_names_)); + OZ(param.set_part_names(arg_.part_names_)); return ret; } diff --git a/src/observer/table_load/control/ob_table_load_control_rpc_executor.cpp b/src/observer/table_load/control/ob_table_load_control_rpc_executor.cpp index 7ec928a45..31b1e3421 100644 --- a/src/observer/table_load/control/ob_table_load_control_rpc_executor.cpp +++ b/src/observer/table_load/control/ob_table_load_control_rpc_executor.cpp @@ -17,6 +17,7 @@ #include "observer/table_load/ob_table_load_store.h" #include "observer/table_load/ob_table_load_table_ctx.h" #include "sql/engine/ob_des_exec_context.h" +#include "observer/table_load/ob_table_load_empty_insert_tablet_ctx_manager.h" namespace oceanbase { @@ -687,5 +688,34 @@ int ObDirectLoadControlInsertTransExecutor::process() return ret; } +int ObDirectLoadControlInitEmptyTabletsExecutor::check_args() +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(OB_INVALID_ID == arg_.table_id_ + || 0 == arg_.ddl_param_.task_id_ + || arg_.partition_id_array_.empty() + || arg_.target_partition_id_array_.empty() + || arg_.partition_id_array_.count() != arg_.target_partition_id_array_.count())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid args", KR(ret), K(arg_)); + } + return OB_SUCCESS; +} + +int ObDirectLoadControlInitEmptyTabletsExecutor::process() +{ + int ret = OB_SUCCESS; + if (OB_FAIL(ObTableLoadService::check_tenant())) { + LOG_WARN("fail to check tenant", KR(ret)); + } else if (OB_FAIL(ObTableLoadEmptyInsertTabletCtxManager::execute( + arg_.table_id_, + arg_.ddl_param_, + arg_.partition_id_array_, + arg_.target_partition_id_array_))) { + LOG_WARN("fail to execute init empty tablet", KR(ret)); + } + return ret; +} + } // namespace observer } // namespace oceanbase diff --git a/src/observer/table_load/control/ob_table_load_control_rpc_executor.h b/src/observer/table_load/control/ob_table_load_control_rpc_executor.h index f443410e1..b27ced44b 100644 --- a/src/observer/table_load/control/ob_table_load_control_rpc_executor.h +++ b/src/observer/table_load/control/ob_table_load_control_rpc_executor.h @@ -362,5 +362,22 @@ protected: int process() override; }; +class ObDirectLoadControlInitEmptyTabletsExecutor + : public ObTableLoadControlRpcExecutor +{ + typedef ObTableLoadControlRpcExecutor ParentType; +public: + ObDirectLoadControlInitEmptyTabletsExecutor(common::ObIAllocator &allocator, + const ObDirectLoadControlRequest &request, + ObDirectLoadControlResult &result) + : ParentType(allocator, request, result) + { + } + virtual ~ObDirectLoadControlInitEmptyTabletsExecutor() = default; +protected: + int check_args() override; + int process() override; +}; + } // namespace observer } // namespace oceanbase diff --git a/src/observer/table_load/control/ob_table_load_control_rpc_proxy.cpp b/src/observer/table_load/control/ob_table_load_control_rpc_proxy.cpp index d03608fb7..caa6f8e1c 100644 --- a/src/observer/table_load/control/ob_table_load_control_rpc_proxy.cpp +++ b/src/observer/table_load/control/ob_table_load_control_rpc_proxy.cpp @@ -47,6 +47,8 @@ int ObTableLoadControlRpcProxy::dispatch(const ObDirectLoadControlRequest &reque OB_TABLE_LOAD_CONTROL_RPC_DISPATCH(ObDirectLoadControlCommandType::ABANDON_TRANS); OB_TABLE_LOAD_CONTROL_RPC_DISPATCH(ObDirectLoadControlCommandType::GET_TRANS_STATUS); OB_TABLE_LOAD_CONTROL_RPC_DISPATCH(ObDirectLoadControlCommandType::INSERT_TRANS); + // init empty tablets + OB_TABLE_LOAD_CONTROL_RPC_DISPATCH(ObDirectLoadControlCommandType::INIT_EMPTY_TABLETS); default: ret = OB_ERR_UNEXPECTED; SERVER_LOG(WARN, "unexpected command type", K(ret), K(request)); diff --git a/src/observer/table_load/control/ob_table_load_control_rpc_proxy.h b/src/observer/table_load/control/ob_table_load_control_rpc_proxy.h index 32aefb830..987c7670d 100644 --- a/src/observer/table_load/control/ob_table_load_control_rpc_proxy.h +++ b/src/observer/table_load/control/ob_table_load_control_rpc_proxy.h @@ -36,6 +36,7 @@ class ObDirectLoadControlConfirmFinishTransExecutor; class ObDirectLoadControlAbandonTransExecutor; class ObDirectLoadControlGetTransStatusExecutor; class ObDirectLoadControlInsertTransExecutor; +class ObDirectLoadControlInitEmptyTabletsExecutor; class ObTableLoadControlRpcProxy { @@ -216,6 +217,12 @@ public: ObDirectLoadControlInsertTransExecutor, ObDirectLoadControlInsertTransArg); + // init empty tablet + OB_DEFINE_TABLE_LOAD_CONTROL_RPC(NORMAL_PRIO, init_empty_tablets, + ObDirectLoadControlCommandType::INIT_EMPTY_TABLETS, + ObDirectLoadControlInitEmptyTabletsExecutor, + ObDirectLoadControlInitEmptyTabletsArg); + private: obrpc::ObSrvRpcProxy &rpc_proxy_; ObArenaAllocator allocator_; diff --git a/src/observer/table_load/control/ob_table_load_control_rpc_struct.cpp b/src/observer/table_load/control/ob_table_load_control_rpc_struct.cpp index 6f12a8584..cc85ebf42 100644 --- a/src/observer/table_load/control/ob_table_load_control_rpc_struct.cpp +++ b/src/observer/table_load/control/ob_table_load_control_rpc_struct.cpp @@ -312,5 +312,12 @@ OB_SERIALIZE_MEMBER(ObDirectLoadControlInsertTransArg, sequence_no_, payload_); +// init empty tablets +OB_SERIALIZE_MEMBER(ObDirectLoadControlInitEmptyTabletsArg, + table_id_, + ddl_param_, + partition_id_array_, + target_partition_id_array_); + } // namespace observer } // namespace oceanbase diff --git a/src/observer/table_load/control/ob_table_load_control_rpc_struct.h b/src/observer/table_load/control/ob_table_load_control_rpc_struct.h index 4fd828913..fbdedab20 100644 --- a/src/observer/table_load/control/ob_table_load_control_rpc_struct.h +++ b/src/observer/table_load/control/ob_table_load_control_rpc_struct.h @@ -467,5 +467,25 @@ public: ObString payload_; //里面包的是ObTableLoadObjArray }; +class ObDirectLoadControlInitEmptyTabletsArg final +{ + OB_UNIS_VERSION(1); +public: + ObDirectLoadControlInitEmptyTabletsArg() + : table_id_(common::OB_INVALID_ID) + { + } + ~ObDirectLoadControlInitEmptyTabletsArg() {} + TO_STRING_KV(K_(table_id), + K_(ddl_param), + K_(partition_id_array), + K_(target_partition_id_array)); +public: + uint64_t table_id_; + ObTableLoadDDLParam ddl_param_; + ObSArray partition_id_array_; // origin table + ObSArray target_partition_id_array_; // target table +}; + } // namespace observer } // namespace oceanbase diff --git a/src/observer/table_load/ob_table_load_client_task.cpp b/src/observer/table_load/ob_table_load_client_task.cpp index 55da9f252..3bb4284f1 100644 --- a/src/observer/table_load/ob_table_load_client_task.cpp +++ b/src/observer/table_load/ob_table_load_client_task.cpp @@ -22,6 +22,7 @@ #include "observer/table_load/ob_table_load_task_scheduler.h" #include "observer/table_load/ob_table_load_utils.h" #include "share/stat/ob_dbms_stats_utils.h" +#include "share/schema/ob_part_mgr_util.h" namespace oceanbase { @@ -50,10 +51,12 @@ ObTableLoadClientTaskParam::ObTableLoadClientTaskParam() timeout_us_(0), heartbeat_timeout_us_(0), load_method_(), - column_names_() + column_names_(), + part_names_() { allocator_.set_tenant_id(MTL_ID()); column_names_.set_block_allocator(ModulePageAllocator(allocator_)); + part_names_.set_block_allocator(ModulePageAllocator(allocator_)); } ObTableLoadClientTaskParam::~ObTableLoadClientTaskParam() {} @@ -73,6 +76,7 @@ void ObTableLoadClientTaskParam::reset() heartbeat_timeout_us_ = 0; load_method_.reset(); column_names_.reset(); + part_names_.reset(); allocator_.reset(); } @@ -97,6 +101,8 @@ int ObTableLoadClientTaskParam::assign(const ObTableLoadClientTaskParam &other) LOG_WARN("fail to set load method", KR(ret)); } else if (OB_FAIL(set_column_names(other.column_names_))) { LOG_WARN("fail to deep copy column names", KR(ret)); + } else if (OB_FAIL(set_part_names(other.part_names_))) { + LOG_WARN("fail to deep copy part names", KR(ret)); } } return ret; @@ -159,6 +165,7 @@ public: ObSchemaGetterGuard &schema_guard = client_task_->schema_guard_; ObTableLoadParam load_param; ObArray column_ids; + ObArray tablet_ids; THIS_WORKER.set_session(session_info); if (OB_ISNULL(session_info)) { ret = OB_ERR_UNEXPECTED; @@ -172,7 +179,7 @@ public: } // resolve if (OB_FAIL( - resolve(client_task_->client_exec_ctx_, client_task_->param_, load_param, column_ids))) { + resolve(client_task_->client_exec_ctx_, client_task_->param_, load_param, column_ids, tablet_ids))) { LOG_WARN("fail to resolve", KR(ret), K(client_task_->param_)); } // check support @@ -187,7 +194,7 @@ public: } // begin if (OB_SUCC(ret)) { - if (OB_FAIL(client_task_->init_instance(load_param, column_ids))) { + if (OB_FAIL(client_task_->init_instance(load_param, column_ids, tablet_ids))) { LOG_WARN("fail to init instance", KR(ret)); } else if (OB_FAIL(client_task_->set_status_waitting())) { LOG_WARN("fail to set status waitting", KR(ret)); @@ -228,7 +235,7 @@ public: static int resolve(ObTableLoadClientExecCtx &client_exec_ctx, const ObTableLoadClientTaskParam &task_param, ObTableLoadParam &load_param, - ObIArray &column_ids) + ObIArray &column_ids, ObIArray &tablet_ids) { int ret = OB_SUCCESS; const uint64_t tenant_id = task_param.get_tenant_id(); @@ -276,6 +283,8 @@ public: *(client_exec_ctx.exec_ctx_), tenant_id, table_schema->get_table_id(), online_sample_percent))) { LOG_WARN("failed to get sys online sample percent", K(ret)); + } else if (OB_FAIL(resolve_part_names(table_schema, task_param.get_part_names(), tablet_ids))) { + LOG_WARN("fail to resolve part name", KR(ret)); } if (OB_SUCC(ret)) { load_param.tenant_id_ = tenant_id; @@ -299,7 +308,8 @@ public: load_param.load_mode_ = ObDirectLoadMode::TABLE_LOAD; load_param.compressor_type_ = compressor_type; load_param.online_sample_percent_ = online_sample_percent; - load_param.load_level_ = ObDirectLoadLevel::TABLE; + load_param.load_level_ = tablet_ids.empty() ? ObDirectLoadLevel::TABLE + : ObDirectLoadLevel::PARTITION; } return ret; } @@ -359,6 +369,42 @@ public: return ret; } + static int resolve_part_names(const ObTableSchema *table_schema, + const ObIArray &part_names, + ObIArray &tablet_ids) + { + int ret = OB_SUCCESS; + ObArray part_ids; + uint64_t table_id = OB_INVALID_ID; + if (OB_ISNULL(table_schema)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("table schems is nullptr", KR(ret)); + } else { + table_id = table_schema->get_table_id(); + for (int i = 0; OB_SUCC(ret) && i < part_names.count(); i++) { + ObArray partition_ids; + ObString &partition_name = const_cast(part_names.at(i)); + //here just conver partition_name to its lowercase + ObCharset::casedn(CS_TYPE_UTF8MB4_GENERAL_CI, partition_name); + ObPartGetter part_getter(*table_schema); + if (OB_FAIL(part_getter.get_part_ids(partition_name, partition_ids))) { + LOG_WARN("fail to get part ids", K(ret), K(partition_name)); + if (OB_UNKNOWN_PARTITION == ret && lib::is_mysql_mode()) { + LOG_USER_ERROR(OB_UNKNOWN_PARTITION, partition_name.length(), partition_name.ptr(), + table_schema->get_table_name_str().length(), + table_schema->get_table_name_str().ptr()); + } + } else if (OB_FAIL(append_array_no_dup(part_ids, partition_ids))) { + LOG_WARN("Push partition id error", K(ret)); + } + } // end of for + if (OB_SUCC(ret) && OB_FAIL(ObTableLoadSchema::get_tablet_ids_by_part_ids(table_schema, part_ids, tablet_ids))) { + LOG_WARN("fail to get tablet ids", KR(ret)); + } + } + return ret; + } + static int resolve_load_method(const ObString &load_method_str, ObDirectLoadMethod::Type &method, ObDirectLoadInsertMode::Type &insert_mode) { @@ -784,11 +830,12 @@ void ObTableLoadClientTask::get_status(ObTableLoadClientStatus &client_status, } int ObTableLoadClientTask::init_instance(ObTableLoadParam &load_param, - const ObIArray &column_ids) + const ObIArray &column_ids, + const ObIArray &tablet_ids) { int ret = OB_SUCCESS; const ObTableLoadTableCtx *tmp_ctx = nullptr; - if (OB_FAIL(instance_.init(load_param, column_ids, &client_exec_ctx_))) { + if (OB_FAIL(instance_.init(load_param, column_ids, tablet_ids, &client_exec_ctx_))) { LOG_WARN("fail to init instance", KR(ret)); } else if (OB_FAIL(instance_.start_trans(trans_ctx_, ObTableLoadInstance::DEFAULT_SEGMENT_ID, allocator_))) { diff --git a/src/observer/table_load/ob_table_load_client_task.h b/src/observer/table_load/ob_table_load_client_task.h index 2470b29ab..f11427424 100644 --- a/src/observer/table_load/ob_table_load_client_task.h +++ b/src/observer/table_load/ob_table_load_client_task.h @@ -64,6 +64,7 @@ public: DEFINE_VAR_GETTER_AND_SETTER(uint64_t, heartbeat_timeout_us); DEFINE_STR_GETTER_AND_SETTER(ObString, load_method); DEFINE_STR_ARRAY_GETTER_AND_SETTER(ObString, column_names); + DEFINE_STR_ARRAY_GETTER_AND_SETTER(ObString, part_names); #undef DEFINE_VAR_GETTER_AND_SETTER #undef DEFINE_STR_GETTER_AND_SETTER @@ -80,7 +81,8 @@ public: K_(timeout_us), K_(heartbeat_timeout_us), K_(load_method), - K_(column_names)); + K_(column_names), + K_(part_names)); private: int set_string(const ObString &src, ObString &dest); @@ -101,6 +103,7 @@ private: int64_t heartbeat_timeout_us_; ObString load_method_; common::ObArray column_names_; + common::ObArray part_names_; }; class ObTableLoadClientTask : public common::ObDLinkBase @@ -156,7 +159,9 @@ private: int advance_status(const table::ObTableLoadClientStatus expected, const table::ObTableLoadClientStatus updated); - int init_instance(ObTableLoadParam &load_param, const ObIArray &column_ids); + int init_instance(ObTableLoadParam &load_param, + const ObIArray &column_ids, + const ObIArray &tablet_ids); int commit_instance(); void destroy_instance(); diff --git a/src/observer/table_load/ob_table_load_coordinator.cpp b/src/observer/table_load/ob_table_load_coordinator.cpp index fc16156b4..ffb7630b0 100644 --- a/src/observer/table_load/ob_table_load_coordinator.cpp +++ b/src/observer/table_load/ob_table_load_coordinator.cpp @@ -34,6 +34,7 @@ #include "observer/table_load/ob_table_load_index_long_wait.h" #include "observer/omt/ob_tenant.h" #include "storage/direct_load/ob_direct_load_mem_define.h" +#include "observer/table_load/ob_table_load_empty_insert_tablet_ctx_manager.h" namespace oceanbase { @@ -82,13 +83,14 @@ bool ObTableLoadCoordinator::is_ctx_inited(ObTableLoadTableCtx *ctx) int ObTableLoadCoordinator::init_ctx(ObTableLoadTableCtx *ctx, const ObIArray &column_ids, + const ObIArray &tablet_ids, ObTableLoadExecCtx *exec_ctx) { int ret = OB_SUCCESS; if (OB_ISNULL(ctx)) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid agrs", KR(ret)); - } else if (OB_FAIL(ctx->init_coordinator_ctx(column_ids, exec_ctx))) { + } else if (OB_FAIL(ctx->init_coordinator_ctx(column_ids, tablet_ids, exec_ctx))) { LOG_WARN("fail to init coordinator ctx", KR(ret)); } return ret; @@ -288,8 +290,11 @@ int ObTableLoadCoordinator::gen_apply_arg(ObDirectLoadResourceApplyArg &apply_ar } else if (cluster_version < CLUSTER_VERSION_4_2_2_0 || (cluster_version >= CLUSTER_VERSION_4_3_0_0 && cluster_version < CLUSTER_VERSION_4_3_1_0)) { // not support resource manage - if (OB_FAIL(coordinator_ctx_->init_partition_location())) { - LOG_WARN("fail to init partition location", KR(ret)); + if (OB_FAIL(ObTableLoadPartitionLocation::init_partition_location(coordinator_ctx_->partition_ids_, + coordinator_ctx_->target_partition_ids_, + coordinator_ctx_->partition_location_, + coordinator_ctx_->target_partition_location_))) { + LOG_WARN("fail to inner init partition location", KR(ret)); } else { ctx_->param_.session_count_ = MAX(MIN(ctx_->param_.parallel_, (int64_t)tenant->unit_max_cpu() * 2), MIN_THREAD_COUNT); ctx_->param_.write_session_count_ = ctx_->param_.session_count_; @@ -310,8 +315,11 @@ int ObTableLoadCoordinator::gen_apply_arg(ObDirectLoadResourceApplyArg &apply_ar LOG_WARN("fail to check status", KR(ret)); } else if (OB_FAIL(coordinator_ctx_->exec_ctx_->check_status())) { LOG_WARN("fail to check status", KR(ret)); - } else if (OB_FAIL(coordinator_ctx_->init_partition_location())) { - LOG_WARN("fail to init partition location", KR(ret)); + } else if (OB_FAIL(ObTableLoadPartitionLocation::init_partition_location(coordinator_ctx_->partition_ids_, + coordinator_ctx_->target_partition_ids_, + coordinator_ctx_->partition_location_, + coordinator_ctx_->target_partition_location_))) { + LOG_WARN("fail to inner init partition location", KR(ret)); } else if (OB_FAIL(coordinator_ctx_->partition_location_.get_all_leader_info(all_leader_info_array))) { LOG_WARN("fail to get all leader info", KR(ret)); } else if (OB_FAIL(ObTableLoadService::get_memory_limit(memory_limit))) { @@ -582,6 +590,118 @@ int ObTableLoadCoordinator::pre_begin_peers(ObDirectLoadResourceApplyArg &apply_ return ret; } +int ObTableLoadCoordinator::init_empty_tablets() +{ + int ret = OB_SUCCESS; + if (OB_FAIL(coordinator_ctx_->empty_insert_tablet_ctx_manager_ + ->set_thread_count(param_.write_session_count_))) { + LOG_WARN("fail to set thread count", KR(ret), K(param_.write_session_count_)); + } + for(int64_t i = 0; OB_SUCC(ret) && i < param_.write_session_count_; ++i) { + ObTableLoadTask *task = nullptr; + if (OB_FAIL(ctx_->alloc_task(task))) { + LOG_WARN("fail to alloc task", KR(ret)); + } else if (OB_FAIL(task->set_processor(ctx_))) { + LOG_WARN("fail to set check begin result task processor", KR(ret)); + } else if (OB_FAIL(task->set_callback(ctx_))) { + LOG_WARN("fail to set check begin result task callback", KR(ret)); + } else if (OB_FAIL(coordinator_ctx_->task_scheduler_->add_task(i, task))) { + LOG_WARN("fail to add task", KR(ret), KPC(task)); + } + if (OB_FAIL(ret)) { + if (nullptr != task) { + ctx_->free_task(task); + } + } + } + return ret; +} + +class ObTableLoadCoordinator::InitEmptyTabletTaskProcessor : public ObITableLoadTaskProcessor +{ +public: + InitEmptyTabletTaskProcessor(ObTableLoadTask &task, + ObTableLoadTableCtx *ctx) + : ObITableLoadTaskProcessor(task), + ctx_(ctx), + empty_tablet_manager_(ctx->coordinator_ctx_->empty_insert_tablet_ctx_manager_) + { + ctx_->inc_ref_count(); + } + virtual ~InitEmptyTabletTaskProcessor() + { + ObTableLoadService::put_ctx(ctx_); + } + int process() override { + int ret = OB_SUCCESS; + bool is_finish = false; + if (OB_ISNULL(ctx_) || OB_ISNULL(empty_tablet_manager_)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("ctx or empty_tablet_manager is nullptr", KR(ret)); + } + while (OB_SUCC(ret)) { + ObDirectLoadControlInitEmptyTabletsArg arg; + arg.table_id_ = ctx_->param_.table_id_; + arg.ddl_param_ = ctx_->ddl_param_; + ObAddr addr; + if (OB_FAIL(ctx_->coordinator_ctx_->check_status(ObTableLoadStatusType::INITED))) { + LOG_WARN("fail to check status", KR(ret)); + } else if (OB_FAIL(empty_tablet_manager_->get_next_task(addr, + arg.partition_id_array_, + arg.target_partition_id_array_))) { + if (OB_ITER_END == ret) { + ret = OB_SUCCESS; + break; + } else { + LOG_WARN("fail to get next init empty partition task", KR(ret)); + } + } else { + TABLE_LOAD_CONTROL_RPC_CALL(init_empty_tablets, addr, arg); + } + } + if (OB_SUCC(ret)) { + if (OB_FAIL(empty_tablet_manager_->handle_thread_finish(is_finish))) { + LOG_WARN("fail to handle thread finish", KR(ret)); + } else if (is_finish) { + ObTableLoadCoordinator coordinator(ctx_); + if (OB_FAIL(coordinator.init())) { + LOG_WARN("fail to init coordinator", KR(ret)); + } else if (OB_FAIL(coordinator.add_check_begin_result_task())) { + LOG_WARN("fail to add check begin result task", KR(ret)); + } + } + } + return ret; + } +private: + ObTableLoadTableCtx * const ctx_; + ObTableLoadEmptyInsertTabletCtxManager * const empty_tablet_manager_; +}; + +class ObTableLoadCoordinator::InitEmptyTabletTaskCallback : public ObITableLoadTaskCallback +{ +public: + InitEmptyTabletTaskCallback(ObTableLoadTableCtx *ctx) + : ctx_(ctx) + { + ctx_->inc_ref_count(); + } + virtual ~InitEmptyTabletTaskCallback() + { + ObTableLoadService::put_ctx(ctx_); + } + void callback(int ret_code, ObTableLoadTask *task) override + { + int ret = OB_SUCCESS; + if (OB_FAIL(ret_code)) { + ctx_->coordinator_ctx_->set_status_error(ret); + } + ctx_->free_task(task); + } +private: + ObTableLoadTableCtx * const ctx_; +}; + int ObTableLoadCoordinator::confirm_begin_peers() { int ret = OB_SUCCESS; @@ -632,8 +752,14 @@ int ObTableLoadCoordinator::begin() LOG_WARN("fail to confirm begin peers", KR(ret)); } else { coordinator_ctx_->set_enable_heart_beat(true); - if (OB_FAIL(add_check_begin_result_task())) { - LOG_WARN("fail to add check begin result task", KR(ret)); + if (OB_NOT_NULL(coordinator_ctx_->empty_insert_tablet_ctx_manager_)) { + if (OB_FAIL(init_empty_tablets())) { + LOG_WARN("fail to init empty partition", KR(ret)); + } + } else { + if (OB_FAIL(add_check_begin_result_task())) { + LOG_WARN("fail to add check begin result task", KR(ret)); + } } } } diff --git a/src/observer/table_load/ob_table_load_coordinator.h b/src/observer/table_load/ob_table_load_coordinator.h index 2a04e929a..b7ae35a8f 100644 --- a/src/observer/table_load/ob_table_load_coordinator.h +++ b/src/observer/table_load/ob_table_load_coordinator.h @@ -49,6 +49,7 @@ public: static bool is_ctx_inited(ObTableLoadTableCtx *ctx); static int init_ctx(ObTableLoadTableCtx *ctx, const common::ObIArray &column_ids, + const common::ObIArray &tablet_ids, ObTableLoadExecCtx *exec_ctx); static void abort_ctx(ObTableLoadTableCtx *ctx); int init(); @@ -76,6 +77,10 @@ private: int write_sql_stat(table::ObTableLoadSqlStatistics &sql_statistics, table::ObTableLoadDmlStat &dml_stats); int heart_beat_peer(); +private: + int init_empty_tablets(); + class InitEmptyTabletTaskProcessor; + class InitEmptyTabletTaskCallback; private: int add_check_begin_result_task(); int check_peers_begin_result(bool &is_finish); diff --git a/src/observer/table_load/ob_table_load_coordinator_ctx.cpp b/src/observer/table_load/ob_table_load_coordinator_ctx.cpp index 6cfd97d18..384537a25 100644 --- a/src/observer/table_load/ob_table_load_coordinator_ctx.cpp +++ b/src/observer/table_load/ob_table_load_coordinator_ctx.cpp @@ -20,6 +20,7 @@ #include "observer/ob_server_event_history_table_operator.h" #include "share/ob_autoincrement_service.h" #include "share/sequence/ob_sequence_cache.h" +#include "observer/table_load/ob_table_load_empty_insert_tablet_ctx_manager.h" namespace oceanbase { @@ -39,6 +40,7 @@ ObTableLoadCoordinatorCtx::ObTableLoadCoordinatorCtx(ObTableLoadTableCtx *ctx) task_scheduler_(nullptr), exec_ctx_(nullptr), error_row_handler_(nullptr), + empty_insert_tablet_ctx_manager_(nullptr), sequence_schema_(&allocator_), last_trans_gid_(1024), next_session_id_(0), @@ -57,43 +59,8 @@ ObTableLoadCoordinatorCtx::~ObTableLoadCoordinatorCtx() destroy(); } -int ObTableLoadCoordinatorCtx::init_partition_location() -{ - int ret = OB_SUCCESS; - int retry = 0; - bool flag = false; - while (retry < 3 && OB_SUCC(ret)) { - partition_location_.reset(); - target_partition_location_.reset(); - // init partition_location_ - if (OB_FAIL(partition_location_.init(ctx_->param_.tenant_id_, ctx_->schema_.partition_ids_))) { - LOG_WARN("fail to init partition location", KR(ret)); - } else if (OB_FAIL(target_partition_location_.init(ctx_->param_.tenant_id_, target_schema_.partition_ids_))) { - LOG_WARN("fail to init origin partition location", KR(ret)); - } else if (OB_FAIL(partition_location_.check_tablet_has_same_leader(target_partition_location_, flag))) { - LOG_WARN("fail to check_tablet_has_same_leader", KR(ret)); - } - if (OB_SUCC(ret)) { - if (flag) { - break; - } else { - LOG_WARN("invalid leader info, maybe change master"); - } - } - retry ++; - } - - if (OB_SUCC(ret)) { - if (!flag) { - ret = OB_EAGAIN; - LOG_WARN("invalid leader info", KR(ret)); - } - } - - return ret; -} - int ObTableLoadCoordinatorCtx::init(const ObIArray &column_ids, + const ObIArray &tablet_ids, ObTableLoadExecCtx *exec_ctx) { int ret = OB_SUCCESS; @@ -116,7 +83,7 @@ int ObTableLoadCoordinatorCtx::init(const ObIArray &column_ids, } // init partition_calc_ else if (OB_FAIL( - partition_calc_.init(ctx_->param_, ctx_->session_info_))) { + partition_calc_.init(ctx_->param_, ctx_->session_info_, tablet_ids))) { LOG_WARN("fail to init partition calc", KR(ret)); } // init trans_allocator_ @@ -149,6 +116,16 @@ int ObTableLoadCoordinatorCtx::init(const ObIArray &column_ids, else if (ctx_->schema_.has_identity_column_ && OB_FAIL(init_sequence())) { LOG_WARN("fail to init sequence", KR(ret)); } + // init partition ids + else if (OB_FAIL(init_partition_ids(tablet_ids))) { + LOG_WARN("fail to init partition ids", KR(ret)); + } + // init empty_insert_tablet_ctx_manager_ + else if (ObDirectLoadMethod::is_full(ctx_->param_.method_) + && !empty_partition_ids_.empty() + && OB_FAIL(init_empty_insert_tablet_ctx_manager())) { + LOG_WARN("fail to init empty insert tablet ctx manager", KR(ret)); + } if (OB_SUCC(ret)) { exec_ctx_ = exec_ctx; is_inited_ = true; @@ -204,6 +181,10 @@ void ObTableLoadCoordinatorCtx::destroy() trans_ctx_map_.reuse(); segment_ctx_map_.reset(); commited_trans_ctx_array_.reset(); + if (nullptr != empty_insert_tablet_ctx_manager_) { + allocator_.free(empty_insert_tablet_ctx_manager_); + empty_insert_tablet_ctx_manager_ = nullptr; + } } int ObTableLoadCoordinatorCtx::advance_status(ObTableLoadStatusType status) @@ -526,6 +507,110 @@ int ObTableLoadCoordinatorCtx::init_session_ctx_array() return ret; } +int ObTableLoadCoordinatorCtx::init_partition_ids(const ObIArray &tablet_ids) +{ + int ret = OB_SUCCESS; + ObSchemaGetterGuard schema_guard; + const ObTableSchema *origin_table_schema = nullptr; + const ObTableSchema *target_table_schema = nullptr; + ObArray all_origin_tablet_ids, all_target_tablet_ids; + ObArray all_origin_part_ids, all_target_part_ids; + ObTableLoadPartitionId origin_id, target_id; + ObHashSet tablet_ids_set; + if (OB_FAIL(ObTableLoadSchema::get_schema_guard(ctx_->param_.tenant_id_, schema_guard))) { + LOG_WARN("fail to get schema guard", KR(ret)); + } else if (OB_FAIL(ObTableLoadSchema::get_table_schema(schema_guard, + ctx_->param_.tenant_id_, + ctx_->param_.table_id_, + origin_table_schema))) { + LOG_WARN("fail to get table schema", KR(ret), K(ctx_->param_.tenant_id_), K(ctx_->param_.table_id_)); + } else if (OB_ISNULL(origin_table_schema)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("table schema is nullptr", KR(ret)); + } else if (OB_FAIL(ObTableLoadSchema::get_table_schema(schema_guard, + ctx_->param_.tenant_id_, + ctx_->ddl_param_.dest_table_id_, + target_table_schema))) { + LOG_WARN("fail to get target schema", KR(ret), + K(ctx_->param_.tenant_id_), + K(ctx_->ddl_param_.dest_table_id_)); + } else if (OB_ISNULL(target_table_schema)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("target schema is nullptr", KR(ret)); + } else if (OB_FAIL(origin_table_schema->get_all_tablet_and_object_ids(all_origin_tablet_ids, + all_origin_part_ids))) { + LOG_WARN("fail to get all origin tablet ids and part ids", KR(ret)); + } else if (OB_FAIL(target_table_schema->get_all_tablet_and_object_ids(all_target_tablet_ids, + all_target_part_ids))) { + LOG_WARN("fail to get all target tablet ids and part ids", KR(ret)); + } else if (tablet_ids.empty()) { + for (int64_t i = 0; OB_SUCC(ret) && i < all_origin_tablet_ids.count(); ++i) { + origin_id.partition_id_ = all_origin_part_ids.at(i); + origin_id.tablet_id_ = all_origin_tablet_ids.at(i); + target_id.partition_id_ = all_target_part_ids.at(i); + target_id.tablet_id_ = all_target_tablet_ids.at(i); + if (OB_FAIL(partition_ids_.push_back(origin_id))) { + LOG_WARN("fail to push back origin id", KR(ret)); + } else if (OB_FAIL(target_partition_ids_.push_back(target_id))) { + LOG_WARN("fail to push back target id", KR(ret)); + } + } + } else if (OB_FAIL(tablet_ids_set.create(tablet_ids.count(), + ObMemAttr(MTL_ID(), "TLD_TABLETID")))) { + LOG_WARN("fail to create tablet ids set", KR(ret)); + } else { + for (int64_t i = 0; OB_SUCC(ret) && i < tablet_ids.count(); ++i) { + if (OB_FAIL(tablet_ids_set.set_refactored(tablet_ids.at(i)))) { + LOG_WARN("fail to set refactored", KR(ret), K(tablet_ids.at(i))); + } + } + for (int64_t i = 0; OB_SUCC(ret) && i < all_origin_tablet_ids.count(); ++i) { + origin_id.partition_id_ = all_origin_part_ids.at(i); + origin_id.tablet_id_ = all_origin_tablet_ids.at(i); + target_id.partition_id_ = all_target_part_ids.at(i); + target_id.tablet_id_ = all_target_tablet_ids.at(i); + ret = tablet_ids_set.exist_refactored(all_origin_tablet_ids.at(i)); + // non_empty partition + if (OB_HASH_EXIST == ret) { + if (OB_FAIL(partition_ids_.push_back(origin_id))) { + LOG_WARN("fail to push origin id", KR(ret)); + } else if (OB_FAIL(target_partition_ids_.push_back(target_id))) { + LOG_WARN("fail to push target id", KR(ret)); + } + } + // empty partition + else if (OB_HASH_NOT_EXIST == ret) { + if (OB_FAIL(empty_partition_ids_.push_back(origin_id))) { + LOG_WARN("fail to push empty origin id", KR(ret)); + } else if (OB_FAIL(empty_target_partition_ids_.push_back(target_id))) { + LOG_WARN("fail to push empty target id", KR(ret)); + } + } else { + LOG_WARN("fail to search tablet ids set", KR(ret)); + } + } + } + return ret; +} + +int ObTableLoadCoordinatorCtx::init_empty_insert_tablet_ctx_manager() +{ + int ret = OB_SUCCESS; + if (OB_NOT_NULL(empty_insert_tablet_ctx_manager_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("empty_insert_tablet_ctx_manager is not nullptr", KR(ret)); + } else if (OB_ISNULL(empty_insert_tablet_ctx_manager_ + = OB_NEWx(ObTableLoadEmptyInsertTabletCtxManager, + (&allocator_)))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + LOG_WARN("fail to new empty_insert_tablet_ctx_manager", KR(ret)); + } else if (OB_FAIL(empty_insert_tablet_ctx_manager_->init(empty_partition_ids_, + empty_target_partition_ids_))) { + LOG_WARN("fail to init empty_insert_tablet_ctx_manager", KR(ret)); + } + return ret; +} + int ObTableLoadCoordinatorCtx::start_trans(const ObTableLoadSegmentID &segment_id, ObTableLoadCoordinatorTrans *&trans) { diff --git a/src/observer/table_load/ob_table_load_coordinator_ctx.h b/src/observer/table_load/ob_table_load_coordinator_ctx.h index 35575f802..66c056557 100644 --- a/src/observer/table_load/ob_table_load_coordinator_ctx.h +++ b/src/observer/table_load/ob_table_load_coordinator_ctx.h @@ -37,13 +37,16 @@ class ObTableLoadTransCtx; class ObTableLoadCoordinatorTrans; class ObITableLoadTaskScheduler; class ObTableLoadErrorRowHandler; +class ObTableLoadEmptyInsertTabletCtxManager; class ObTableLoadCoordinatorCtx { public: ObTableLoadCoordinatorCtx(ObTableLoadTableCtx *ctx); ~ObTableLoadCoordinatorCtx(); - int init(const common::ObIArray &column_ids, ObTableLoadExecCtx *exec_ctx); + int init(const common::ObIArray &column_ids, + const common::ObIArray &tablet_ids, + ObTableLoadExecCtx *exec_ctx); void stop(); void destroy(); bool is_valid() const { return is_inited_; } @@ -114,7 +117,6 @@ public: common::ObIAllocator &allocator) const; int check_exist_trans(bool &is_exist) const; int check_exist_committed_trans(bool &is_exist) const; - int init_partition_location(); int init_complete(); private: int alloc_trans_ctx(const table::ObTableLoadTransId &trans_id, ObTableLoadTransCtx *&trans_ctx); @@ -125,10 +127,16 @@ private: int generate_autoinc_params(share::AutoincParam &autoinc_param); int init_sequence(); void add_to_all_server_event(); + int init_partition_ids(const ObIArray &tablet_ids); + int init_empty_insert_tablet_ctx_manager(); public: ObTableLoadTableCtx * const ctx_; common::ObArenaAllocator allocator_; ObTableLoadSchema target_schema_; + common::ObArray partition_ids_; + common::ObArray target_partition_ids_; + common::ObArray empty_partition_ids_; + common::ObArray empty_target_partition_ids_; ObTableLoadPartitionLocation partition_location_; ObTableLoadPartitionLocation target_partition_location_; ObTableLoadPartitionCalc partition_calc_; @@ -137,6 +145,7 @@ public: ObTableLoadExecCtx *exec_ctx_; table::ObTableLoadResultInfo result_info_; ObTableLoadErrorRowHandler *error_row_handler_; + ObTableLoadEmptyInsertTabletCtxManager *empty_insert_tablet_ctx_manager_; share::schema::ObSequenceSchema sequence_schema_; struct SessionContext { diff --git a/src/observer/table_load/ob_table_load_empty_insert_tablet_ctx_manager.cpp b/src/observer/table_load/ob_table_load_empty_insert_tablet_ctx_manager.cpp new file mode 100644 index 000000000..0d65d7a3e --- /dev/null +++ b/src/observer/table_load/ob_table_load_empty_insert_tablet_ctx_manager.cpp @@ -0,0 +1,179 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#define USING_LOG_PREFIX SERVER + +#include "observer/table_load/ob_table_load_empty_insert_tablet_ctx_manager.h" +#include "observer/table_load/ob_table_load_coordinator_ctx.h" +#include "observer/table_load/ob_table_load_coordinator.h" +#include "share/table/ob_table_load_define.h" +#include "storage/direct_load/ob_direct_load_insert_table_ctx.h" + +namespace oceanbase +{ +namespace observer +{ +using namespace table; + +ObTableLoadEmptyInsertTabletCtxManager::ObTableLoadEmptyInsertTabletCtxManager() + : thread_count_(0), + idx_(0), + start_(0), + is_inited_(false) +{ +} + +ObTableLoadEmptyInsertTabletCtxManager::~ObTableLoadEmptyInsertTabletCtxManager() +{ +} + +int ObTableLoadEmptyInsertTabletCtxManager::init( + const ObIArray &partition_ids, + const ObIArray &target_partition_ids) +{ + int ret = OB_SUCCESS; + if (IS_INIT) { + ret = OB_INIT_TWICE; + LOG_WARN("empty insert tablet ctx manager init twice", KR(ret)); + } else if (!target_partition_ids.empty()) { + if (OB_FAIL(ObTableLoadPartitionLocation::init_partition_location(partition_ids, + target_partition_ids, + partition_location_, + target_partition_location_))) { + LOG_WARN("fail to init partition location", KR(ret)); + } else if (OB_FAIL(partition_location_.get_all_leader_info(all_leader_info_array_))) { + LOG_WARN("fail to get all origin leader info", KR(ret)); + } else if (OB_FAIL(target_partition_location_.get_all_leader_info(target_all_leader_info_array_))) { + LOG_WARN("fail to get all target leader info", KR(ret)); + } + } + if (OB_SUCC(ret)) { + is_inited_ = true; + } + return ret; +} + +int ObTableLoadEmptyInsertTabletCtxManager::get_next_task( + ObAddr &addr, + ObIArray &partition_ids, + ObIArray &target_partition_ids) +{ + int ret = OB_SUCCESS; + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + LOG_WARN("empty insert tablet ctx manager is not init", KR(ret)); + } else { + ObMutexGuard guard(op_lock_); + if (all_leader_info_array_.count() == idx_) { + ret = OB_ITER_END; + } else { + const LeaderInfo &leader_info = all_leader_info_array_.at(idx_); + const LeaderInfo &target_leader_info = target_all_leader_info_array_.at(idx_); + addr = target_leader_info.addr_; + for (; OB_SUCC(ret) && start_ < target_leader_info.partition_id_array_.count() + && partition_ids.count() < TABLET_COUNT_PER_TASK; ++start_) { + if (OB_FAIL(partition_ids.push_back(leader_info.partition_id_array_.at(start_)))) { + LOG_WARN("fail to push back partition ids", KR(ret)); + } else if (OB_FAIL(target_partition_ids.push_back(target_leader_info.partition_id_array_.at(start_)))) { + LOG_WARN("fail to push back target partition ids", KR(ret)); + } + } + if (target_leader_info.partition_id_array_.count() == start_) { + start_ = 0; + ++idx_; + } + } + } + return ret; +} + +int ObTableLoadEmptyInsertTabletCtxManager::set_thread_count(const int64_t thread_count) +{ + int ret = OB_SUCCESS; + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + LOG_WARN("empty insert tablet ctx manager is not init", KR(ret)); + } else if (thread_count <= 0) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("thread_count is invalid argument", KR(ret), K(thread_count)); + } else { + thread_count_ = thread_count; + } + return ret; +} + +int ObTableLoadEmptyInsertTabletCtxManager::handle_thread_finish(bool &is_finish) +{ + int ret = OB_SUCCESS; + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + LOG_WARN("empty insert tablet ctx is not init", KR(ret)); + } else if (thread_count_ <= 0) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("thread count is invalid", KR(ret), K(thread_count_)); + } else { + is_finish = 0 == ATOMIC_AAF(&thread_count_, -1); + } + return ret; +} + +int ObTableLoadEmptyInsertTabletCtxManager::execute( + const uint64_t &table_id, + const ObTableLoadDDLParam &ddl_param, + const ObIArray &ls_part_ids, + const ObIArray &target_ls_part_ids) +{ + int ret = OB_SUCCESS; + ObTableLoadSchema table_load_schema; + ObDirectLoadInsertTableParam insert_table_param; + ObDirectLoadInsertTableContext tmp_insert_table_ctx; + if (OB_FAIL(table_load_schema.init(MTL_ID(), table_id))) { + LOG_WARN("fail to init table load schema", KR(ret)); + } + insert_table_param.table_id_ = table_id; + insert_table_param.schema_version_ = ddl_param.schema_version_; + insert_table_param.snapshot_version_ = ddl_param.snapshot_version_; + insert_table_param.ddl_task_id_ = ddl_param.task_id_; + insert_table_param.data_version_ = ddl_param.data_version_; + insert_table_param.parallel_ = 1; + insert_table_param.reserved_parallel_ = 0; + insert_table_param.rowkey_column_count_ = table_load_schema.rowkey_column_count_; + insert_table_param.column_count_ = table_load_schema.store_column_count_; + insert_table_param.lob_column_count_ = table_load_schema.lob_column_idxs_.count(); + insert_table_param.is_partitioned_table_ = table_load_schema.is_partitioned_table_; + insert_table_param.is_heap_table_ = table_load_schema.is_heap_table_; + insert_table_param.is_column_store_ = table_load_schema.is_column_store_; + insert_table_param.online_opt_stat_gather_ = false; + insert_table_param.is_incremental_ = false; + insert_table_param.datum_utils_ = &(table_load_schema.datum_utils_); + insert_table_param.col_descs_ = &(table_load_schema.column_descs_); + insert_table_param.cmp_funcs_ = &(table_load_schema.cmp_funcs_); + insert_table_param.online_sample_percent_ = 1.0; + if (OB_FAIL(ret)) { + // do nothing + } else if (OB_FAIL(tmp_insert_table_ctx.init(insert_table_param, + ls_part_ids, + target_ls_part_ids))) { + LOG_WARN("fail to init tmp insert table ctx", KR(ret)); + } + FOREACH_X(it, tmp_insert_table_ctx.get_tablet_ctx_map(), OB_SUCC(ret)) { + if (OB_FAIL(it->second->open())) { + LOG_WARN("fail to open tablet ctx", KR(ret)); + } else if (OB_FAIL(it->second->close())) { + LOG_WARN("fail to close tablet ctx", KR(ret)); + } + } + return ret; +} + +} // namespace observer +} // namespace oceanbase \ No newline at end of file diff --git a/src/observer/table_load/ob_table_load_empty_insert_tablet_ctx_manager.h b/src/observer/table_load/ob_table_load_empty_insert_tablet_ctx_manager.h new file mode 100644 index 000000000..517dd0be8 --- /dev/null +++ b/src/observer/table_load/ob_table_load_empty_insert_tablet_ctx_manager.h @@ -0,0 +1,66 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#ifndef _OB_TABLE_LOAD_EMPTY_INSERT_TABLET_CTX_MANAGER_H_ +#define _OB_TABLE_LOAD_EMPTY_INSERT_TABLET_CTX_MANAGER_H_ + +#include "lib/container/ob_iarray.h" +#include "observer/table_load/ob_table_load_partition_location.h" +#include "observer/table_load/ob_table_load_table_ctx.h" + +namespace oceanbase +{ +namespace table +{ +struct ObTableLoadPartitionId; +} // namespace table +namespace storage +{ +class ObDirectLoadInsertTableParam; +} // namespace storage +namespace observer +{ + +class ObTableLoadEmptyInsertTabletCtxManager +{ + using LeaderInfo = ObTableLoadPartitionLocation::LeaderInfo; + static const int64_t TABLET_COUNT_PER_TASK = 20; +public: + ObTableLoadEmptyInsertTabletCtxManager(); + ~ObTableLoadEmptyInsertTabletCtxManager(); + int init( + const common::ObIArray &partition_ids, + const common::ObIArray &target_partition_ids); + int get_next_task(ObAddr &addr, + ObIArray &partition_ids, + ObIArray &target_partition_ids); + int set_thread_count(const int64_t thread_count); + int handle_thread_finish(bool &is_finish); + static int execute(const uint64_t &table_id, + const ObTableLoadDDLParam &ddl_param, + const ObIArray &ls_part_ids, + const ObIArray &target_ls_part_ids); +private: + ObTableLoadPartitionLocation partition_location_; + ObTableLoadPartitionLocation target_partition_location_; + table::ObTableLoadArray all_leader_info_array_; + table::ObTableLoadArray target_all_leader_info_array_; + int64_t thread_count_ CACHE_ALIGNED; + lib::ObMutex op_lock_; + int64_t idx_; + int64_t start_; + bool is_inited_; +}; + +} // namespace observer +} // namespace oceanbase +#endif // _OB_TABLE_LOAD_EMPTY_INSERT_TABLET_CTX_MANAGER_H_ \ No newline at end of file diff --git a/src/observer/table_load/ob_table_load_instance.cpp b/src/observer/table_load/ob_table_load_instance.cpp index 91257dc51..8a0e8a836 100644 --- a/src/observer/table_load/ob_table_load_instance.cpp +++ b/src/observer/table_load/ob_table_load_instance.cpp @@ -115,7 +115,7 @@ int ObTableLoadInstance::init(ObTableLoadParam ¶m, LOG_WARN("fail to check support direct load", KR(ret), K(param)); } // start direct load - else if (OB_FAIL(start_direct_load(param, column_ids))) { + else if (OB_FAIL(start_direct_load(param, column_ids, tablet_ids))) { LOG_WARN("fail to start direct load", KR(ret)); } // init succ @@ -490,7 +490,8 @@ int ObTableLoadInstance::abort_redef_table() } int ObTableLoadInstance::start_direct_load(const ObTableLoadParam ¶m, - const ObIArray &column_ids) + const ObIArray &column_ids, + const ObIArray &tablet_ids) { int ret = OB_SUCCESS; ObTableLoadTableCtx *table_ctx = nullptr; @@ -502,7 +503,7 @@ int ObTableLoadInstance::start_direct_load(const ObTableLoadParam ¶m, LOG_WARN("fail to alloc table ctx", KR(ret), K(param)); } else if (OB_FAIL(table_ctx->init(param, stmt_ctx_.ddl_param_, session_info, ObString::make_string(""), execute_ctx_->exec_ctx_))) { LOG_WARN("fail to init table ctx", KR(ret)); - } else if (OB_FAIL(ObTableLoadCoordinator::init_ctx(table_ctx, column_ids, execute_ctx_))) { + } else if (OB_FAIL(ObTableLoadCoordinator::init_ctx(table_ctx, column_ids, tablet_ids, execute_ctx_))) { LOG_WARN("fail to coordinator init ctx", KR(ret)); } else if (OB_FAIL(ObTableLoadService::add_ctx(table_ctx))) { LOG_WARN("fail to add ctx", KR(ret)); diff --git a/src/observer/table_load/ob_table_load_instance.h b/src/observer/table_load/ob_table_load_instance.h index 70fd70af6..ce02b4e76 100644 --- a/src/observer/table_load/ob_table_load_instance.h +++ b/src/observer/table_load/ob_table_load_instance.h @@ -70,7 +70,9 @@ private: int abort_redef_table(); private: // direct load - int start_direct_load(const ObTableLoadParam ¶m, const common::ObIArray &column_ids); + int start_direct_load(const ObTableLoadParam ¶m, + const common::ObIArray &column_ids, + const common::ObIArray &tablet_ids); int wait_begin_finish(); int end_direct_load(const bool commit); int add_tx_result_to_user_session(); diff --git a/src/observer/table_load/ob_table_load_partition_calc.cpp b/src/observer/table_load/ob_table_load_partition_calc.cpp index 5ce24bad8..58583ed47 100644 --- a/src/observer/table_load/ob_table_load_partition_calc.cpp +++ b/src/observer/table_load/ob_table_load_partition_calc.cpp @@ -45,7 +45,9 @@ ObTableLoadPartitionCalc::ObTableLoadPartitionCalc() allocator_.set_tenant_id(MTL_ID()); } -int ObTableLoadPartitionCalc::init(const ObTableLoadParam ¶m, sql::ObSQLSessionInfo *session_info) +int ObTableLoadPartitionCalc::init(const ObTableLoadParam ¶m, + sql::ObSQLSessionInfo *session_info, + const ObIArray &tablet_ids) { int ret = OB_SUCCESS; if (IS_INIT) { @@ -81,6 +83,17 @@ int ObTableLoadPartitionCalc::init(const ObTableLoadParam ¶m, sql::ObSQLSess // 获取part_key_obj_index_ else if (OB_FAIL(init_part_key_index(table_schema, allocator_))) { LOG_WARN("fail to get rowkey index", KR(ret)); + } else if (ObDirectLoadLevel::PARTITION == param.load_level_) { + ObMemAttr attr(MTL_ID(), "TLD_TABLETID"); + if (OB_FAIL(tablet_ids_set_.create(1024, attr))) { + LOG_WARN("fail to init tablet ids set", KR(ret)); + } else { + for (uint64_t i = 0; OB_SUCC(ret) && i < tablet_ids.count(); ++i) { + if (OB_FAIL(tablet_ids_set_.set_refactored(tablet_ids.at(i)))) { + LOG_WARN("fail to set tablet id", KR(ret)); + } + } + } } } if (OB_SUCC(ret)) { @@ -213,9 +226,24 @@ int ObTableLoadPartitionCalc::get_partition_by_row( LOG_WARN("invalid args", K(part_ids.count()), K(tablet_ids.count())); } for (int i = 0; OB_SUCC(ret) && i < part_rows.count(); i++) { - if (OB_FAIL( - partition_ids.push_back(ObTableLoadPartitionId(part_ids.at(i), tablet_ids.at(i))))) { - LOG_WARN("fail to push partition id", KR(ret)); + if (ObDirectLoadLevel::PARTITION == param_->load_level_) { + ret = tablet_ids_set_.exist_refactored(tablet_ids.at(i)); + if (OB_LIKELY(OB_HASH_EXIST == ret)) { + if (OB_FAIL(partition_ids.push_back(ObTableLoadPartitionId(part_ids.at(i), tablet_ids.at(i))))) { + LOG_WARN("fail to push partition id", KR(ret), K(part_ids.at(i)), K(tablet_ids.at(i))); + } + } else if (OB_HASH_NOT_EXIST == ret) { + if (OB_FAIL(partition_ids.push_back(ObTableLoadPartitionId()))) { + LOG_WARN("fail to push empty partition id", KR(ret)); + } + } else { + LOG_WARN("fail to search tablet ids set", KR(ret)); + } + } else { + if (OB_FAIL( + partition_ids.push_back(ObTableLoadPartitionId(part_ids.at(i), tablet_ids.at(i))))) { + LOG_WARN("fail to push partition id", KR(ret)); + } } } return ret; diff --git a/src/observer/table_load/ob_table_load_partition_calc.h b/src/observer/table_load/ob_table_load_partition_calc.h index 2d37da3a7..d77f3a9ea 100644 --- a/src/observer/table_load/ob_table_load_partition_calc.h +++ b/src/observer/table_load/ob_table_load_partition_calc.h @@ -34,7 +34,9 @@ class ObTableLoadPartitionCalc { public: ObTableLoadPartitionCalc(); - int init(const ObTableLoadParam ¶m, sql::ObSQLSessionInfo *session_info); + int init(const ObTableLoadParam ¶m, + sql::ObSQLSessionInfo *session_info, + const ObIArray &tablet_ids); int get_part_key(const table::ObTableLoadObjRow &row, common::ObNewRow &part_key) const; int cast_part_key(common::ObNewRow &part_key, common::ObIAllocator &allocator) const; int get_partition_by_row(common::ObIArray &part_rows, @@ -70,6 +72,7 @@ private: sql::ObExecContext exec_ctx_; sql::ObTableLocation table_location_; ObSchemaGetterGuard schema_guard_; + common::hash::ObHashSet tablet_ids_set_; // only for load_level == PARTITION bool is_inited_; DISALLOW_COPY_AND_ASSIGN(ObTableLoadPartitionCalc); }; diff --git a/src/observer/table_load/ob_table_load_partition_location.cpp b/src/observer/table_load/ob_table_load_partition_location.cpp index 2f5eb7773..1e6310fbc 100644 --- a/src/observer/table_load/ob_table_load_partition_location.cpp +++ b/src/observer/table_load/ob_table_load_partition_location.cpp @@ -51,6 +51,46 @@ int ObTableLoadPartitionLocation::check_tablet_has_same_leader(const ObTableLoad return ret; } +int ObTableLoadPartitionLocation::init_partition_location( + const ObIArray &partition_ids, + const ObIArray &target_partition_ids, + ObTableLoadPartitionLocation &partition_location, + ObTableLoadPartitionLocation &target_partition_location) +{ + int ret = OB_SUCCESS; + int retry = 0; + bool flag = false; + while (retry < 3 && OB_SUCC(ret)) { + partition_location.reset(); + target_partition_location.reset(); + // init partition_location_ + if (OB_FAIL(partition_location.init(MTL_ID(), partition_ids))) { + LOG_WARN("fail to init partition location", KR(ret)); + } else if (OB_FAIL(target_partition_location.init(MTL_ID(), target_partition_ids))) { + LOG_WARN("fail to init origin partition location", KR(ret)); + } else if (OB_FAIL(partition_location.check_tablet_has_same_leader(target_partition_location, flag))) { + LOG_WARN("fail to check_tablet_has_same_leader", KR(ret)); + } + if (OB_SUCC(ret)) { + if (flag) { + break; + } else { + LOG_WARN("invalid leader info, maybe change master"); + } + } + retry ++; + } + + if (OB_SUCC(ret)) { + if (!flag) { + ret = OB_EAGAIN; + LOG_WARN("invalid leader info", KR(ret)); + } + } + + return ret; +} + int ObTableLoadPartitionLocation::fetch_ls_id(uint64_t tenant_id, const ObTabletID &tablet_id, ObLSID &ls_id) { @@ -87,14 +127,14 @@ int ObTableLoadPartitionLocation::fetch_ls_location(uint64_t tenant_id, const Ob } int ObTableLoadPartitionLocation::fetch_ls_locations(uint64_t tenant_id, - const ObTableLoadArray &partition_ids) + const ObIArray &partition_ids) { int ret = OB_SUCCESS; ObArray ls_ids; ls_ids.set_tenant_id(MTL_ID()); for (int64_t i = 0; OB_SUCC(ret) && (i < partition_ids.count()); ++i) { - const ObTabletID &tablet_id = partition_ids[i].tablet_id_; + const ObTabletID &tablet_id = partition_ids.at(i).tablet_id_; if (OB_FAIL(tablet_ids_.push_back(tablet_id))) { LOG_WARN("failed to push back tablet_id", K(tablet_id), K(i)); } @@ -207,7 +247,7 @@ int ObTableLoadPartitionLocation::fetch_tablet_handle(uint64_t tenant_id, } int ObTableLoadPartitionLocation::init( - uint64_t tenant_id, const ObTableLoadArray &partition_ids) + uint64_t tenant_id, const ObIArray &partition_ids) { int ret = OB_SUCCESS; if (IS_INIT) { @@ -231,7 +271,7 @@ int ObTableLoadPartitionLocation::init( } int ObTableLoadPartitionLocation::init_all_partition_location( - uint64_t tenant_id, const ObTableLoadArray &partition_ids) + uint64_t tenant_id, const ObIArray &partition_ids) { int ret = OB_SUCCESS; if (OB_FAIL(fetch_ls_locations(tenant_id, partition_ids))) { diff --git a/src/observer/table_load/ob_table_load_partition_location.h b/src/observer/table_load/ob_table_load_partition_location.h index 847e31fdd..7746ebbde 100644 --- a/src/observer/table_load/ob_table_load_partition_location.h +++ b/src/observer/table_load/ob_table_load_partition_location.h @@ -62,7 +62,7 @@ public: tablet_ids_.set_tenant_id(MTL_ID()); } int init(uint64_t tenant_id, - const table::ObTableLoadArray &partition_ids); + const common::ObIArray &partition_ids); int get_leader(common::ObTabletID tablet_id, PartitionLocationInfo &info) const; int get_all_leader(table::ObTableLoadArray &addr_array) const; int get_all_leader_info(table::ObTableLoadArray &info_array) const; @@ -77,6 +77,10 @@ public: int check_tablet_has_same_leader(const ObTableLoadPartitionLocation &other, bool &result); public: + static int init_partition_location(const ObIArray &partition_ids, + const ObIArray &target_partition_ids, + ObTableLoadPartitionLocation &partition_location, + ObTableLoadPartitionLocation &target_partition_location); // 通过tablet_id获取 static int fetch_ls_id(uint64_t tenant_id, const common::ObTabletID &tablet_id, share::ObLSID &ls_id); @@ -91,11 +95,11 @@ public: storage::ObTabletHandle &tablet_handle); private: int init_all_partition_location( - uint64_t tenant_id, const table::ObTableLoadArray &partition_ids); + uint64_t tenant_id, const common::ObIArray &partition_ids); int init_all_leader_info(); int fetch_ls_locations( uint64_t tenant_id, - const table::ObTableLoadArray &partition_ids); + const common::ObIArray &partition_ids); private: common::ObArenaAllocator allocator_; common::ObArray tablet_ids_; //保证遍历partition_map_的时候顺序不变 diff --git a/src/observer/table_load/ob_table_load_schema.cpp b/src/observer/table_load/ob_table_load_schema.cpp index d23de3aed..27d6670f5 100644 --- a/src/observer/table_load/ob_table_load_schema.cpp +++ b/src/observer/table_load/ob_table_load_schema.cpp @@ -453,6 +453,124 @@ int ObTableLoadSchema::get_tenant_optimizer_gather_stats_on_load(const uint64_t return ret; } +int ObTableLoadSchema::get_tablet_ids_by_part_ids(const ObTableSchema *table_schema, + const ObIArray &part_ids, + ObIArray &tablet_ids) +{ + int ret = OB_SUCCESS; + if (OB_ISNULL(table_schema)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("table schema is nullptr", KR(ret)); + } else { + for (uint64_t i = 0; OB_SUCC(ret) && i < part_ids.count(); ++i) { + ObObjectID part_id = part_ids.at(i); + ObTabletID tmp_tablet_id; + if (OB_FAIL(table_schema->get_tablet_id_by_object_id(part_id, tmp_tablet_id))) { + LOG_WARN("fail to get tablet ids by part object id", KR(ret), K(part_id)); + } else if (OB_FAIL(tablet_ids.push_back(tmp_tablet_id))) { + LOG_WARN("fail to push tablet id", KR(ret), K(tmp_tablet_id)); + } + } + } + return ret; +} + +int ObTableLoadSchema::check_has_identity_column(const ObTableSchema *table_schema, + bool &has_identity_column) +{ + int ret = OB_SUCCESS; + if (OB_ISNULL(table_schema)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("table schema is nullptr", KR(ret)); + } else if (has_identity_column) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("has_identity_column is invalid", KR(ret)); + } else { + ObArray column_ids; + if (OB_FAIL(table_schema->get_column_ids(column_ids))) { + LOG_WARN("failed to get column ids", KR(ret)); + } else { + for (int64_t i = 0; OB_SUCC(ret) && !has_identity_column && (i < column_ids.count()); ++i) { + const ObColumnSchemaV2 *col_schema = table_schema->get_column_schema(column_ids.at(i)); + if (OB_ISNULL(col_schema)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected null column schema", KR(ret)); + } else if (col_schema->is_identity_column()) { + has_identity_column = true; + } + } + } + } + return ret; +} + +int ObTableLoadSchema::check_support_partition_exchange(const ObTableSchema *table_schema, + bool &is_support) +{ + int ret = OB_SUCCESS; + if (OB_ISNULL(table_schema)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("table schema is nullptr", KR(ret)); + } else if (!is_support) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("is_support is not invalid", KR(ret), K(is_support)); + } else { + ObPartitionFuncType part_type = ObPartitionFuncType::PARTITION_FUNC_TYPE_MAX; + switch (table_schema->get_part_level()) { + case ObPartitionLevel::PARTITION_LEVEL_ONE: + part_type = table_schema->get_part_option().get_part_func_type(); + break; + case ObPartitionLevel::PARTITION_LEVEL_TWO: + part_type = table_schema->get_sub_part_option().get_part_func_type(); + break; + default: + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected partition level", KR(ret), K(table_schema->get_part_level())); + } + if (OB_FAIL(ret)) { + } else if (OB_UNLIKELY(!((PARTITION_FUNC_TYPE_RANGE == part_type) + || (PARTITION_FUNC_TYPE_RANGE_COLUMNS == part_type) + || (PARTITION_FUNC_TYPE_LIST == part_type) + || (PARTITION_FUNC_TYPE_LIST_COLUMNS == part_type)))) { + is_support = false; + } + } + return ret; +} + +int ObTableLoadSchema::check_has_global_index(ObSchemaGetterGuard &schema_guard, + const ObTableSchema *table_schema, + bool &is_have) +{ + int ret = OB_SUCCESS; + if (OB_ISNULL(table_schema)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("table schema is nullptr", KR(ret)); + } else if (is_have) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("is_have is invalid", KR(ret), K(is_have)); + } else { + ObSEArray simple_index_infos; + if (OB_FAIL(table_schema->get_simple_index_infos(simple_index_infos))) { + LOG_WARN("failed to get simple_index_infos", KR(ret)); + } else { + for (int64_t i = 0; OB_SUCC(ret) && !is_have && (i < simple_index_infos.count()); ++i) { + const ObTableSchema *index_schema = NULL; + if (OB_FAIL(schema_guard.get_table_schema( + table_schema->get_tenant_id(), simple_index_infos.at(i).table_id_, index_schema))) { + LOG_WARN("failed to get table schema", KR(ret)); + } else if (OB_ISNULL(index_schema)) { + ret = OB_TABLE_NOT_EXIST; + LOG_WARN("index schema from schema guard is NULL", KR(ret), KP(index_schema)); + } else if (index_schema->is_global_index_table()) { + is_have = true; + } + } + } + } + return ret; +} + ObTableLoadSchema::ObTableLoadSchema() : allocator_("TLD_Schema"), is_partitioned_table_(false), @@ -503,7 +621,6 @@ void ObTableLoadSchema::reset() lob_meta_column_descs_.reset(); lob_meta_datum_utils_.reset(); cmp_funcs_.reset(); - partition_ids_.reset(); allocator_.reset(); is_inited_ = false; } @@ -577,13 +694,6 @@ int ObTableLoadSchema::init_table_schema(const ObTableSchema *table_schema) part_ids.set_tenant_id(MTL_ID()); if (OB_FAIL(table_schema->get_all_tablet_and_object_ids(tablet_ids, part_ids))) { LOG_WARN("fail to get all tablet ids", KR(ret)); - } else if (OB_FAIL(partition_ids_.create(part_ids.count(), allocator_))) { - LOG_WARN("fail to create array", KR(ret)); - } else { - for (int64_t i = 0; i < part_ids.count(); ++i) { - partition_ids_[i].partition_id_ = part_ids.at(i); - partition_ids_[i].tablet_id_ = tablet_ids.at(i); - } } for (ObTableSchema::const_column_iterator iter = table_schema->column_begin(); OB_SUCC(ret) && iter != table_schema->column_end(); ++iter) { diff --git a/src/observer/table_load/ob_table_load_schema.h b/src/observer/table_load/ob_table_load_schema.h index f9237047e..cfaa27ff3 100644 --- a/src/observer/table_load/ob_table_load_schema.h +++ b/src/observer/table_load/ob_table_load_schema.h @@ -79,7 +79,14 @@ public: const share::schema::ObTableSchema *table_schema, bool &bret); static int get_tenant_optimizer_gather_stats_on_load(const uint64_t tenant_id, bool &value); - + static int get_tablet_ids_by_part_ids(const ObTableSchema *table_schema, + const ObIArray &part_ids, + ObIArray &tablet_ids); + static int check_has_identity_column(const ObTableSchema *table_schema, bool &has_identity_column); + static int check_support_partition_exchange(const ObTableSchema *table_schema, bool &is_support); + static int check_has_global_index(ObSchemaGetterGuard &schema_guard, + const ObTableSchema *table_schema, + bool &is_have); public: ObTableLoadSchema(); ~ObTableLoadSchema(); @@ -124,7 +131,6 @@ public: common::ObArray lob_meta_column_descs_; blocksstable::ObStorageDatumUtils lob_meta_datum_utils_; blocksstable::ObStoreCmpFuncs cmp_funcs_; // for sql statistics - table::ObTableLoadArray partition_ids_; bool is_inited_; }; diff --git a/src/observer/table_load/ob_table_load_service.cpp b/src/observer/table_load/ob_table_load_service.cpp index 2ad92e72d..fd12fd761 100644 --- a/src/observer/table_load/ob_table_load_service.cpp +++ b/src/observer/table_load/ob_table_load_service.cpp @@ -556,7 +556,24 @@ int ObTableLoadService::check_support_direct_load(ObSchemaGetterGuard &schema_gu ret = OB_NOT_SUPPORTED; LOG_WARN("direct-load does not support table required by materialized view refresh", KR(ret)); FORWARD_USER_ERROR_MSG(ret, "%sdirect-load does not support table required by materialized view refresh", tmp_prefix); - } else if (ObDirectLoadMethod::is_incremental(method)) { // incremental direct-load + } + // check for partition level + else if (ObDirectLoadLevel::PARTITION == load_level + && OB_FAIL(check_support_direct_load_for_partition_level(schema_guard, + table_schema, + method, + compat_version))) { + LOG_WARN("fail to check support direct load for partition level", KR(ret)); + } + // check insert overwrite + else if (ObDirectLoadMode::is_insert_overwrite(load_mode) + && compat_version < DATA_VERSION_4_3_2_0) { + ret = OB_NOT_SUPPORTED; + LOG_WARN("version lower than 4.3.2.0 does not support insert overwrite", KR(ret)); + FORWARD_USER_ERROR_MSG(ret, "version lower than 4.3.2.0 does not support insert overwrite"); + } + // incremental direct-load + else if (ObDirectLoadMethod::is_incremental(method)) { if (GCTX.is_shared_storage_mode()) { ret = OB_NOT_SUPPORTED; LOG_WARN("in share storage mode, using incremental direct-load is not supported", KR(ret)); @@ -596,15 +613,17 @@ int ObTableLoadService::check_support_direct_load(ObSchemaGetterGuard &schema_gu LOG_WARN("incremental direct-load does not support table with check constraints", KR(ret)); FORWARD_USER_ERROR_MSG(ret, "incremental direct-load does not support table with check constraints"); } - } else if (ObDirectLoadMethod::is_full(method)) { // full direct-load + } + // full direct-load + else if (ObDirectLoadMethod::is_full(method)) { if (OB_UNLIKELY(!ObDirectLoadInsertMode::is_valid_for_full_method(insert_mode))) { ret = OB_ERR_UNEXPECTED; LOG_WARN("unexpected insert mode for full direct-load", KR(ret), K(method), K(insert_mode)); - } else if (ObDirectLoadMode::is_insert_overwrite(load_mode)) { - if (OB_FAIL(check_support_direct_load_for_insert_overwrite( - schema_guard, *table_schema, load_level, compat_version))) { - LOG_WARN("failed to check support direct load for insert overwrite", KR(ret)); - } + } else if (ObDirectLoadInsertMode::OVERWRITE == insert_mode + && compat_version < DATA_VERSION_4_3_1_0) { + ret = OB_NOT_SUPPORTED; + LOG_WARN("version lower than 4.3.1.0 does not support insert overwrite mode", KR(ret)); + FORWARD_USER_ERROR_MSG(ret, "version lower than 4.3.1.0 does not support insert overwrite mode"); } } // check default column @@ -661,102 +680,61 @@ int ObTableLoadService::check_support_direct_load(ObSchemaGetterGuard &schema_gu return ret; } -int ObTableLoadService::check_support_direct_load_for_insert_overwrite( +int ObTableLoadService::check_support_direct_load_for_partition_level( ObSchemaGetterGuard &schema_guard, - const ObTableSchema &table_schema, - const ObDirectLoadLevel::Type load_level, + const ObTableSchema *table_schema, + const ObDirectLoadMethod::Type method, const uint64_t compat_version) { int ret = OB_SUCCESS; - if (compat_version < DATA_VERSION_4_3_2_0) { - ret = OB_NOT_SUPPORTED; - LOG_WARN("version lower than 4.3.2.0 does not support insert overwrite", KR(ret)); - FORWARD_USER_ERROR_MSG(ret, "version lower than 4.3.2.0 does not support insert overwrite"); - } else if (table_schema.get_foreign_key_infos().count() > 0) { - ret = OB_NOT_SUPPORTED; - LOG_WARN("insert overwrite with incremental direct-load does not support table with foreign keys", KR(ret)); - FORWARD_USER_ERROR_MSG(ret, "insert overwrite with direct-load does not support table with foreign keys"); - } else if (ObDirectLoadLevel::PARTITION == load_level) { - if (compat_version < DATA_VERSION_4_3_3_0) { + if (ObDirectLoadMethod::is_incremental(method)) { + // do nothing + } else if (ObDirectLoadMethod::is_full(method)) { + bool has_global_index = false; + bool has_identity_column = false; + bool is_support_partition_exchange = true; + if (compat_version < DATA_VERSION_4_3_5_0) { ret = OB_NOT_SUPPORTED; - LOG_WARN("version lower than 4.3.3.0 does not support insert overwrite partition", KR(ret)); - FORWARD_USER_ERROR_MSG(ret, "version lower than 4.3.3.0 does not support insert overwrite partition"); - } else if (table_schema.is_duplicate_table()) { - ret = OB_NOT_SUPPORTED; - LOG_WARN("insert overwrite partition does not support duplicate table", KR(ret)); - FORWARD_USER_ERROR_MSG(ret, "insert overwrite partition does not support duplicate table"); - } else if (table_schema.get_index_tid_count() > 0) { - bool has_global_indexes = false; - ObSEArray simple_index_infos; - if (OB_FAIL(table_schema.get_simple_index_infos(simple_index_infos))) { - LOG_WARN("failed to get simple_index_infos", KR(ret)); - } else { - for (int64_t i = 0; OB_SUCC(ret) && !has_global_indexes && (i < simple_index_infos.count()); ++i) { - const ObTableSchema *index_schema = NULL; - if (OB_FAIL(schema_guard.get_table_schema( - table_schema.get_tenant_id(), simple_index_infos.at(i).table_id_, index_schema))) { - LOG_WARN("failed to get table schema", KR(ret)); - } else if (OB_ISNULL(index_schema)) { - ret = OB_TABLE_NOT_EXIST; - LOG_WARN("index schema from schema guard is NULL", KR(ret), KP(index_schema)); - } else if (index_schema->is_global_index_table()) { - has_global_indexes = true; - } - } - } - if (OB_SUCC(ret) && (has_global_indexes)) { + LOG_WARN("version lower than 4.3.5.0 does not support direct load partition", KR(ret)); + FORWARD_USER_ERROR_MSG(ret, "version lower than 4.3.5.0 does not support direct load partition"); + } else if (OB_ISNULL(table_schema)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("table schema is nullptr", KR(ret)); + } else { + if (table_schema->is_duplicate_table()) { ret = OB_NOT_SUPPORTED; - LOG_WARN("insert overwrite partition does not support table with global indexes", KR(ret)); - FORWARD_USER_ERROR_MSG(ret, "insert overwrite partition does not support table with global indexes"); - } - } else if (0 != table_schema.get_autoinc_column_id()) { - ret = OB_NOT_SUPPORTED; - LOG_WARN("insert overwrite partition does not support table with auto_increment columns", KR(ret)); - FORWARD_USER_ERROR_MSG(ret, "insert overwrite partition does not support table with auto_increment columns"); - } else if (lib::is_oracle_mode()) { - ObArray column_ids; - if (OB_FAIL(table_schema.get_column_ids(column_ids))) { - LOG_WARN("failed to get column ids", KR(ret)); - } else { - for (int64_t i = 0; OB_SUCC(ret) && (i < column_ids.count()); ++i) { - const ObColumnSchemaV2 *col_schema = table_schema.get_column_schema(column_ids.at(i)); - if (OB_ISNULL(col_schema)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("unexpected null column schema", KR(ret)); - } else if (col_schema->is_identity_column()) { - ret = OB_NOT_SUPPORTED; - LOG_WARN("insert overwrite partition does not support table with identity columns", KR(ret)); - FORWARD_USER_ERROR_MSG(ret, "insert overwrite partition does not support table with identity columns"); - } - } + LOG_WARN("partition level direct load not support duplicate table", KR(ret)); + FORWARD_USER_ERROR_MSG(ret, "partition level direct-load not support duplicate table"); + } else if (0 != table_schema->get_autoinc_column_id()) { + ret = OB_NOT_SUPPORTED; + LOG_WARN("partition level direct load not support table with auto_increment columns", KR(ret)); + FORWARD_USER_ERROR_MSG(ret, "partition level direct-load not support table with auto_increment columns"); + } else if (table_schema->get_index_tid_count() > 0 + && OB_FAIL(ObTableLoadSchema::check_has_global_index(schema_guard, + table_schema, + has_global_index))) { + LOG_WARN("fail to check has global index", KR(ret)); + } else if (has_global_index) { + ret = OB_NOT_SUPPORTED; + LOG_WARN("partition level direct load not support table with global indexes", KR(ret)); + FORWARD_USER_ERROR_MSG(ret, "partition level direct-load not support table with global indexes"); + } else if (lib::is_oracle_mode() && OB_FAIL(ObTableLoadSchema::check_has_identity_column( + table_schema, has_identity_column))) { + LOG_WARN("fail to check has identity column", KR(ret)); + } else if (has_identity_column) { + ret = OB_NOT_SUPPORTED; + LOG_WARN("partition level direct load not support table with identity columns", KR(ret)); + FORWARD_USER_ERROR_MSG(ret, "partition level direct-load not support table with identity columns"); + } else if (OB_FAIL(ObTableLoadSchema::check_support_partition_exchange(table_schema, + is_support_partition_exchange))) { + LOG_WARN("fail to check support partition exchange", KR(ret)); + } else if (!is_support_partition_exchange) { + ret = OB_NOT_SUPPORTED; + LOG_WARN("partition level direct load not support hash/key partitions", KR(ret)); + FORWARD_USER_ERROR_MSG(ret, "partition level direct-load not support hash/key partitions"); } } - - if (OB_SUCC(ret)) { // check partition type - ObPartitionFuncType part_type = PARTITION_FUNC_TYPE_MAX; - switch (table_schema.get_part_level()) { - case ObPartitionLevel::PARTITION_LEVEL_ONE: - part_type = table_schema.get_part_option().get_part_func_type(); - break; - case ObPartitionLevel::PARTITION_LEVEL_TWO: - part_type = table_schema.get_sub_part_option().get_part_func_type(); - break; - default: - ret = OB_ERR_UNEXPECTED; - LOG_WARN("unexpected partition level", KR(ret), K(table_schema.get_part_level())); - } - - if (OB_FAIL(ret)) { - } else if (OB_UNLIKELY(!((PARTITION_FUNC_TYPE_RANGE == part_type) - || (PARTITION_FUNC_TYPE_RANGE_COLUMNS == part_type) - || (PARTITION_FUNC_TYPE_LIST == part_type) - || (PARTITION_FUNC_TYPE_LIST_COLUMNS == part_type)))) { - ret = OB_NOT_SUPPORTED; - LOG_WARN("insert overwrite partition does not support hash/key partitions", KR(ret), K(part_type)); - FORWARD_USER_ERROR_MSG(ret, "insert overwrite partition does not support hash/key partition type"); - } - } - } // end if + } return ret; } @@ -775,7 +753,6 @@ int ObTableLoadService::alloc_ctx(ObTableLoadTableCtx *&table_ctx) } return ret; } - void ObTableLoadService::free_ctx(ObTableLoadTableCtx *table_ctx) { int ret = OB_SUCCESS; diff --git a/src/observer/table_load/ob_table_load_service.h b/src/observer/table_load/ob_table_load_service.h index 6042ef4af..62253c378 100644 --- a/src/observer/table_load/ob_table_load_service.h +++ b/src/observer/table_load/ob_table_load_service.h @@ -59,11 +59,10 @@ public: const storage::ObDirectLoadMode::Type load_mode, const storage::ObDirectLoadLevel::Type load_level, const common::ObIArray &column_ids); - static int check_support_direct_load_for_insert_overwrite( - share::schema::ObSchemaGetterGuard &schema_guard, - const share::schema::ObTableSchema &table_schema, - const storage::ObDirectLoadLevel::Type load_level, - const uint64_t compat_version); + static int check_support_direct_load_for_partition_level(ObSchemaGetterGuard &schema_guard, + const ObTableSchema *table_schema, + const ObDirectLoadMethod::Type method, + const uint64_t compat_version); static int alloc_ctx(ObTableLoadTableCtx *&table_ctx); static void free_ctx(ObTableLoadTableCtx *table_ctx); diff --git a/src/observer/table_load/ob_table_load_table_ctx.cpp b/src/observer/table_load/ob_table_load_table_ctx.cpp index c83bc7d86..4d1be7e62 100644 --- a/src/observer/table_load/ob_table_load_table_ctx.cpp +++ b/src/observer/table_load/ob_table_load_table_ctx.cpp @@ -200,6 +200,7 @@ void ObTableLoadTableCtx::unregister_job_stat() } int ObTableLoadTableCtx::init_coordinator_ctx(const ObIArray &column_ids, + const ObIArray &tablet_ids, ObTableLoadExecCtx *exec_ctx) { int ret = OB_SUCCESS; @@ -214,7 +215,7 @@ int ObTableLoadTableCtx::init_coordinator_ctx(const ObIArray &column_i if (OB_ISNULL(coordinator_ctx = OB_NEWx(ObTableLoadCoordinatorCtx, (&allocator_), this))) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("fail to new ObTableLoadCoordinatorCtx", KR(ret)); - } else if (OB_FAIL(coordinator_ctx->init(column_ids, exec_ctx))) { + } else if (OB_FAIL(coordinator_ctx->init(column_ids, tablet_ids, exec_ctx))) { LOG_WARN("fail to init coordinator ctx", KR(ret)); } else if (OB_FAIL(coordinator_ctx->set_status_inited())) { LOG_WARN("fail to set coordinator status inited", KR(ret)); diff --git a/src/observer/table_load/ob_table_load_table_ctx.h b/src/observer/table_load/ob_table_load_table_ctx.h index ed262a52e..cb8c87654 100644 --- a/src/observer/table_load/ob_table_load_table_ctx.h +++ b/src/observer/table_load/ob_table_load_table_ctx.h @@ -71,6 +71,7 @@ public: K_(is_inited)); public: int init_coordinator_ctx(const common::ObIArray &column_ids, + const common::ObIArray &tablet_ids, ObTableLoadExecCtx *exec_ctx); int init_store_ctx( const table::ObTableLoadArray &partition_id_array, diff --git a/src/observer/table_load/ob_table_load_trans_bucket_writer.cpp b/src/observer/table_load/ob_table_load_trans_bucket_writer.cpp index 5c5d914e2..0f689e158 100644 --- a/src/observer/table_load/ob_table_load_trans_bucket_writer.cpp +++ b/src/observer/table_load/ob_table_load_trans_bucket_writer.cpp @@ -130,12 +130,12 @@ int ObTableLoadTransBucketWriter::init_session_ctx_array() session_ctx->session_id_ = i + 1; if (!is_partitioned_) { ObTableLoadPartitionLocation::PartitionLocationInfo info; - if (OB_UNLIKELY(1 != coordinator_ctx_->ctx_->schema_.partition_ids_.count())) { + if (OB_UNLIKELY(1 != coordinator_ctx_->partition_ids_.count())) { ret = OB_ERR_UNEXPECTED; LOG_WARN("unexpected partition id num in non partitioned table", KR(ret), "count", - coordinator_ctx_->ctx_->schema_.partition_ids_.count()); + coordinator_ctx_->partition_ids_.count()); } else if (FALSE_IT(session_ctx->partition_id_ = - coordinator_ctx_->ctx_->schema_.partition_ids_[0])) { + coordinator_ctx_->partition_ids_[0])) { } else if (OB_FAIL(coordinator_ctx_->partition_location_.get_leader( session_ctx->partition_id_.tablet_id_, info))) { LOG_WARN("failed to get leader addr", K(ret)); diff --git a/src/sql/engine/cmd/ob_load_data_direct_impl.cpp b/src/sql/engine/cmd/ob_load_data_direct_impl.cpp index d38c42352..db0b71d2a 100644 --- a/src/sql/engine/cmd/ob_load_data_direct_impl.cpp +++ b/src/sql/engine/cmd/ob_load_data_direct_impl.cpp @@ -71,6 +71,7 @@ ObLoadDataDirectImpl::LoadExecuteParam::LoadExecuteParam() dup_action_(ObLoadDupActionType::LOAD_INVALID_MODE), method_(ObDirectLoadMethod::INVALID_METHOD), insert_mode_(ObDirectLoadInsertMode::INVALID_INSERT_MODE), + load_level_(ObDirectLoadLevel::INVALID_LEVEL), compressor_type_(ObCompressorType::INVALID_COMPRESSOR), online_sample_percent_(100.) { @@ -86,6 +87,7 @@ bool ObLoadDataDirectImpl::LoadExecuteParam::is_valid() const ObLoadDupActionType::LOAD_INVALID_MODE != dup_action_ && ObDirectLoadMethod::is_type_valid(method_) && ObDirectLoadInsertMode::is_type_valid(insert_mode_) && + ObDirectLoadLevel::is_type_valid(load_level_) && (storage::ObDirectLoadMethod::is_full(method_) ? storage::ObDirectLoadInsertMode::is_valid_for_full_method(insert_mode_) : true) && @@ -2347,9 +2349,10 @@ int ObLoadDataDirectImpl::init_execute_param() } else { execute_param_.need_sort_ = optimizer_ctx->need_sort_; execute_param_.max_error_rows_ = optimizer_ctx->max_error_row_count_; + execute_param_.dup_action_ = optimizer_ctx->dup_action_; execute_param_.method_ = optimizer_ctx->load_method_; execute_param_.insert_mode_ = optimizer_ctx->insert_mode_; - execute_param_.dup_action_ = optimizer_ctx->dup_action_; + execute_param_.load_level_ = optimizer_ctx->load_level_; } } // parallel_ @@ -2466,6 +2469,12 @@ int ObLoadDataDirectImpl::init_execute_param() LOG_WARN("failed to get sys online sample percent", K(ret)); } } + // tablet_ids_ + if (OB_SUCC(ret) && OB_FAIL(ObTableLoadSchema::get_tablet_ids_by_part_ids(table_schema, + load_stmt_->get_part_ids(), + execute_param_.tablet_ids_))) { + LOG_WARN("fail to get tablet ids", KR(ret)); + } return ret; } @@ -2493,9 +2502,11 @@ int ObLoadDataDirectImpl::init_execute_context() load_param.load_mode_ = ObDirectLoadMode::LOAD_DATA; load_param.compressor_type_ = execute_param_.compressor_type_; load_param.online_sample_percent_ = execute_param_.online_sample_percent_; - load_param.load_level_ = ObDirectLoadLevel::TABLE; - if (OB_FAIL( - direct_loader_.init(load_param, execute_param_.column_ids_, &execute_ctx_.exec_ctx_))) { + load_param.load_level_ = execute_param_.load_level_; + if (OB_FAIL(direct_loader_.init(load_param, + execute_param_.column_ids_, + execute_param_.tablet_ids_, + &execute_ctx_.exec_ctx_))) { LOG_WARN("fail to init direct loader", KR(ret)); } else if (OB_FAIL(init_logger())) { LOG_WARN("fail to init logger", KR(ret)); diff --git a/src/sql/engine/cmd/ob_load_data_direct_impl.h b/src/sql/engine/cmd/ob_load_data_direct_impl.h index 29b6a3542..811f8e401 100644 --- a/src/sql/engine/cmd/ob_load_data_direct_impl.h +++ b/src/sql/engine/cmd/ob_load_data_direct_impl.h @@ -94,10 +94,12 @@ private: K_(dup_action), "method", storage::ObDirectLoadMethod::get_type_string(method_), "insert_mode", storage::ObDirectLoadInsertMode::get_type_string(insert_mode_), + K_(load_level), K_(data_access_param), K_(column_ids), K_(compressor_type), - K_(online_sample_percent)); + K_(online_sample_percent), + K_(tablet_ids)); public: uint64_t tenant_id_; uint64_t database_id_; @@ -116,10 +118,12 @@ private: sql::ObLoadDupActionType dup_action_; storage::ObDirectLoadMethod::Type method_; storage::ObDirectLoadInsertMode::Type insert_mode_; + storage::ObDirectLoadLevel::Type load_level_; DataAccessParam data_access_param_; ObArray column_ids_; ObCompressorType compressor_type_; double online_sample_percent_; + ObArray tablet_ids_; }; struct LoadExecuteContext diff --git a/src/sql/optimizer/ob_direct_load_optimizer.cpp b/src/sql/optimizer/ob_direct_load_optimizer.cpp index 0ae370bcf..ca0926450 100644 --- a/src/sql/optimizer/ob_direct_load_optimizer.cpp +++ b/src/sql/optimizer/ob_direct_load_optimizer.cpp @@ -36,6 +36,7 @@ ObDirectLoadOptimizerCtx::ObDirectLoadOptimizerCtx() load_method_(ObDirectLoadMethod::INVALID_METHOD), insert_mode_(ObDirectLoadInsertMode::INVALID_INSERT_MODE), load_mode_(ObDirectLoadMode::INVALID_MODE), + load_level_(ObDirectLoadLevel::INVALID_LEVEL), dup_action_(ObLoadDupActionType::LOAD_INVALID_MODE), max_error_row_count_(0), need_sort_(false), @@ -51,6 +52,7 @@ void ObDirectLoadOptimizerCtx::reset() load_method_ = ObDirectLoadMethod::INVALID_METHOD; insert_mode_ = ObDirectLoadInsertMode::INVALID_INSERT_MODE; load_mode_ = ObDirectLoadMode::INVALID_MODE; + load_level_ = ObDirectLoadLevel::INVALID_LEVEL; dup_action_ = ObLoadDupActionType::LOAD_INVALID_MODE; max_error_row_count_ = 0; need_sort_ = false; @@ -91,9 +93,11 @@ int ObDirectLoadOptimizer::optimize(ObExecContext *exec_ctx, ObLoadDataStmt &stm direct_load_optimizer_ctx_.table_id_ = stmt.get_load_arguments().table_id_; direct_load_optimizer_ctx_.load_mode_ = ObDirectLoadMode::LOAD_DATA; direct_load_optimizer_ctx_.dup_action_ = stmt.get_load_arguments().dupl_action_; + direct_load_optimizer_ctx_.load_level_ = stmt.get_part_ids().empty() ? ObDirectLoadLevel::TABLE + : ObDirectLoadLevel::PARTITION; if (OB_FAIL(check_semantics())) { LOG_WARN("fail to check semantics", K(ret)); - } else if (OB_FAIL(check_support_direct_load(exec_ctx, ObDirectLoadLevel::TABLE))) { + } else if (OB_FAIL(check_support_direct_load(exec_ctx))) { LOG_WARN("fail to check support direct load", K(ret)); } else { direct_load_optimizer_ctx_.dup_action_ = direct_load_optimizer_ctx_.insert_mode_ == ObDirectLoadInsertMode::INC_REPLACE ? @@ -168,7 +172,6 @@ int ObDirectLoadOptimizer::optimize( // 通过hint而不是默认配置项的方式,不会修改并行度,当并行度小于2时不满足pdml条件,无需走旁路导入检查 // do nothing } else if (direct_load_optimizer_ctx_.load_method_ != ObDirectLoadMethod::INVALID_METHOD) { - ObDirectLoadLevel::Type load_level = ObDirectLoadLevel::TABLE; if (session_info->get_ddl_info().is_mview_complete_refresh()) { if (direct_load_optimizer_ctx_.insert_mode_ == ObDirectLoadInsertMode::INC_REPLACE) { ret = OB_ERR_UNEXPECTED; @@ -188,15 +191,15 @@ int ObDirectLoadOptimizer::optimize( ret = OB_ERR_UNEXPECTED; LOG_WARN("unexpect null table partition info", K(ret)); } else if (info->get_ref_table_id() == table_id) { - if (info->get_table_location().get_part_hint_ids().count() > 0) { - load_level = ObDirectLoadLevel::PARTITION; - } + direct_load_optimizer_ctx_.load_level_ = info->get_table_location().get_part_hint_ids().empty() + ? ObDirectLoadLevel::TABLE + : ObDirectLoadLevel::PARTITION; break; } } } if (OB_SUCC(ret)) { - if (OB_FAIL(check_support_direct_load(exec_ctx, load_level))) { + if (OB_FAIL(check_support_direct_load(exec_ctx))) { LOG_WARN("fail to check support direct load", K(ret)); bool allow_fallback = false; if (ret == OB_NOT_SUPPORTED && stmt.get_query_ctx()->optimizer_features_enable_version_ >= COMPAT_VERSION_4_3_4) { @@ -326,9 +329,7 @@ int ObDirectLoadOptimizer::check_support_insert_overwrite(const ObGlobalHint &gl return ret; } -int ObDirectLoadOptimizer::check_support_direct_load( - ObExecContext *exec_ctx, - storage::ObDirectLoadLevel::Type load_level) +int ObDirectLoadOptimizer::check_support_direct_load(ObExecContext *exec_ctx) { int ret = OB_SUCCESS; ObSqlCtx *sql_ctx = nullptr; @@ -375,7 +376,7 @@ int ObDirectLoadOptimizer::check_support_direct_load( direct_load_optimizer_ctx_.load_method_, direct_load_optimizer_ctx_.insert_mode_, direct_load_optimizer_ctx_.load_mode_, - load_level, + direct_load_optimizer_ctx_.load_level_, column_ids))) { LOG_WARN("fail to check support direct load", K(ret), K(direct_load_optimizer_ctx_)); } diff --git a/src/sql/optimizer/ob_direct_load_optimizer.h b/src/sql/optimizer/ob_direct_load_optimizer.h index e21a9e817..5019e57b2 100644 --- a/src/sql/optimizer/ob_direct_load_optimizer.h +++ b/src/sql/optimizer/ob_direct_load_optimizer.h @@ -47,13 +47,14 @@ public: bool is_insert_overwrite() const { return ObDirectLoadMode::is_insert_overwrite(load_mode_); } bool is_insert_into() const { return load_mode_ == ObDirectLoadMode::INSERT_INTO; } void reset(); - TO_STRING_KV(K_(table_id), K_(load_method), K_(insert_mode), K_(load_mode), K_(dup_action), + TO_STRING_KV(K_(table_id), K_(load_method), K_(insert_mode), K_(load_mode), K_(load_level), K_(dup_action), K_(max_error_row_count), K_(need_sort), K_(can_use_direct_load), K_(use_direct_load), K_(is_optimized)); public: uint64_t table_id_; storage::ObDirectLoadMethod::Type load_method_; storage::ObDirectLoadInsertMode::Type insert_mode_; storage::ObDirectLoadMode::Type load_mode_; + storage::ObDirectLoadLevel::Type load_level_; sql::ObLoadDupActionType dup_action_; int64_t max_error_row_count_; bool need_sort_; @@ -78,7 +79,7 @@ private: void enable_by_overwrite(); int check_semantics(); int check_support_insert_overwrite(const ObGlobalHint &global_hint); - int check_support_direct_load(ObExecContext *exec_ctx, storage::ObDirectLoadLevel::Type load_level); + int check_support_direct_load(ObExecContext *exec_ctx); int check_direct_load_allow_fallback(ObExecContext *exec_ctx, bool &allow_fallback); private: ObDirectLoadOptimizerCtx &direct_load_optimizer_ctx_; diff --git a/src/sql/parser/sql_parser_mysql_mode.y b/src/sql/parser/sql_parser_mysql_mode.y index 0b9aa2f64..b40772e5b 100644 --- a/src/sql/parser/sql_parser_mysql_mode.y +++ b/src/sql/parser/sql_parser_mysql_mode.y @@ -4907,8 +4907,7 @@ load_data_with_opt_hint opt_load_local INFILE infile_string opt_duplicate INTO T relation_factor opt_use_partition opt_compression opt_load_charset field_opt line_opt opt_load_ignore_rows opt_field_or_var_spec opt_load_set_spec opt_load_data_extended_option_list { - (void) $9; - malloc_non_terminal_node($$, result->malloc_pool_, T_LOAD_DATA, 13, + malloc_non_terminal_node($$, result->malloc_pool_, T_LOAD_DATA, 14, $2, /* 0. local */ $4, /* 1. filename */ $5, /* 2. duplicate */ @@ -4921,7 +4920,8 @@ opt_field_or_var_spec opt_load_set_spec opt_load_data_extended_option_list $16, /* 9. set field */ $1, /* 10. hint */ $17, /* 11. extended option list */ - $10 /* 12. compression format */ + $10, /* 12. compression format */ + $9 /* 13. partition */ ); } ; diff --git a/src/sql/resolver/cmd/ob_load_data_resolver.cpp b/src/sql/resolver/cmd/ob_load_data_resolver.cpp index f5826137a..a85c118a1 100644 --- a/src/sql/resolver/cmd/ob_load_data_resolver.cpp +++ b/src/sql/resolver/cmd/ob_load_data_resolver.cpp @@ -30,6 +30,7 @@ #include "lib/restore/ob_storage_info.h" #include "sql/engine/cmd/ob_load_data_file_reader.h" #include +#include "share/schema/ob_part_mgr_util.h" namespace oceanbase { @@ -376,6 +377,16 @@ int ObLoadDataResolver::resolve(const ParseNode &parse_tree) } } + if (OB_SUCC(ret)) { + /*13. partition */ + const ParseNode *child_node = node->children_[ENUM_OPT_USE_PARTITION]; + if (OB_NOT_NULL(child_node)) { + if (OB_FAIL(resolve_partitions(*child_node, *load_stmt))) { + LOG_WARN("fail to resolve partition"); + } + } + } + if (OB_SUCC(ret)) { ObLoadArgument &load_args = load_stmt->get_load_arguments(); const ObDirectLoadHint &direct_load_hint = load_stmt->get_hints().get_direct_load_hint(); @@ -1649,6 +1660,56 @@ int ObLoadDataResolver::check_trigger_constraint(const ObTableSchema *table_sche return ret; } +int ObLoadDataResolver::resolve_partitions(const ParseNode &node, ObLoadDataStmt &load_stmt) +{ + int ret = OB_SUCCESS; + uint64_t table_id = load_stmt.get_load_arguments().table_id_; + const ObTableSchema *table_schema = nullptr; + if (OB_ISNULL(session_info_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("session info is nullptr", KR(ret)); + } else if (OB_FAIL(schema_checker_->get_table_schema(session_info_->get_effective_tenant_id(), table_id, table_schema))) { + LOG_WARN("fail to get table schema", KR(ret)); + } + OB_ASSERT(1 == node.num_child_ && node.children_[0]->num_child_ > 0); + if (OB_SUCC(ret) && OB_NOT_NULL(node.children_[0]) && T_NAME_LIST == node.children_[0]->type_) { + const ParseNode *name_list = node.children_[0]; + ObString partition_name; + ObArray part_ids; + for (int i = 0; OB_SUCC(ret) && i < name_list->num_child_; i++) { + ObArray partition_ids; + partition_name.assign_ptr(name_list->children_[i]->str_value_, + static_cast(name_list->children_[i]->str_len_)); + //here just conver partition_name to its lowercase + ObCharset::casedn(CS_TYPE_UTF8MB4_GENERAL_CI, partition_name); + ObPartGetter part_getter(*table_schema); + if (T_USE_PARTITION == node.type_) { + if (OB_FAIL(part_getter.get_part_ids(partition_name, partition_ids))) { + LOG_WARN("fail to get part ids", K(ret), K(partition_name)); + if (OB_UNKNOWN_PARTITION == ret && lib::is_mysql_mode()) { + LOG_USER_ERROR(OB_UNKNOWN_PARTITION, partition_name.length(), partition_name.ptr(), + table_schema->get_table_name_str().length(), + table_schema->get_table_name_str().ptr()); + } + } + } else if (OB_FAIL(part_getter.get_subpart_ids(partition_name, partition_ids))) { + LOG_WARN("fail to get subpart ids", K(ret), K(partition_name)); + } + if (OB_SUCC(ret)) { + if (OB_FAIL(append_array_no_dup(part_ids, partition_ids))) { + LOG_WARN("Push partition id error", K(ret)); + } + } + } // end of for + if (OB_SUCC(ret)) { + if (OB_FAIL(load_stmt.set_part_ids(part_ids))) { + LOG_WARN("fail to set partition ids", KR(ret)); + } + } + } + return ret; +} + int ObLoadDataResolver::check_collection_sql_type(const ObTableSchema *table_schema) { int ret = OB_SUCCESS; diff --git a/src/sql/resolver/cmd/ob_load_data_resolver.h b/src/sql/resolver/cmd/ob_load_data_resolver.h index d3a5f9bb6..471e049d9 100644 --- a/src/sql/resolver/cmd/ob_load_data_resolver.h +++ b/src/sql/resolver/cmd/ob_load_data_resolver.h @@ -60,6 +60,7 @@ public: int resolve_filename(ObLoadDataStmt *load_stmt, ParseNode *node); int local_infile_enabled(bool &enabled) const; + int resolve_partitions(const ParseNode &node, ObLoadDataStmt &load_stmt); int check_trigger_constraint(const ObTableSchema *table_schema); int check_collection_sql_type(const ObTableSchema *table_schema); @@ -81,6 +82,7 @@ private: ENUM_OPT_HINT, ENUM_OPT_EXTENDED_OPTIONS, ENUM_OPT_COMPRESSION, + ENUM_OPT_USE_PARTITION, ENUM_TOTAL_COUNT }; ObStmtScope current_scope_; diff --git a/src/sql/resolver/cmd/ob_load_data_stmt.cpp b/src/sql/resolver/cmd/ob_load_data_stmt.cpp index c273b5c92..52b7a7e94 100644 --- a/src/sql/resolver/cmd/ob_load_data_stmt.cpp +++ b/src/sql/resolver/cmd/ob_load_data_stmt.cpp @@ -92,6 +92,11 @@ ColumnItem* ObLoadDataStmt::get_column_item_by_idx(uint64_t column_id) { return tar_item; } +int ObLoadDataStmt::set_part_ids(common::ObIArray &part_ids) +{ + return part_ids_.assign(part_ids); +} + int ObLoadDataHint::get_value(IntHintItem item, int64_t &value) const { int ret = OB_SUCCESS; diff --git a/src/sql/resolver/cmd/ob_load_data_stmt.h b/src/sql/resolver/cmd/ob_load_data_stmt.h index 6dd5b8ab3..3e00ef84a 100644 --- a/src/sql/resolver/cmd/ob_load_data_stmt.h +++ b/src/sql/resolver/cmd/ob_load_data_stmt.h @@ -268,6 +268,8 @@ public: ObLoadDataHint &get_hints() { return hints_; } void set_default_table_columns() { is_default_table_columns_ = true; } bool get_default_table_columns() { return is_default_table_columns_; } + int set_part_ids(common::ObIArray &part_ids); + const common::ObIArray &get_part_ids() const { return part_ids_; } void set_optimizer_ctx(ObDirectLoadOptimizerCtx *optimizer_ctx) { optimizer_ctx_ = optimizer_ctx; } ObDirectLoadOptimizerCtx *get_optimizer_ctx() { return optimizer_ctx_; } TO_STRING_KV(N_STMT_TYPE, ((int)stmt_type_), @@ -277,7 +279,8 @@ public: K_(field_or_var_list), K_(assignments), K_(hints), - K_(is_default_table_columns)); + K_(is_default_table_columns), + K_(part_ids)); private: ObDirectLoadOptimizerCtx *optimizer_ctx_; @@ -288,6 +291,7 @@ private: ObAssignments assignments_; ObLoadDataHint hints_; bool is_default_table_columns_; + common::ObArray part_ids_; DISALLOW_COPY_AND_ASSIGN(ObLoadDataStmt); }; diff --git a/src/storage/direct_load/ob_direct_load_struct.h b/src/storage/direct_load/ob_direct_load_struct.h index 7404d68d6..5e25bd6ec 100644 --- a/src/storage/direct_load/ob_direct_load_struct.h +++ b/src/storage/direct_load/ob_direct_load_struct.h @@ -35,6 +35,9 @@ struct ObDirectLoadMode static bool is_type_valid(const Type type); static bool is_insert_overwrite(const Type type) { return INSERT_OVERWRITE == type; } + static bool is_load_data(const Type type) { return LOAD_DATA == type; } + static bool is_insert_into(const Type type) { return INSERT_INTO == type; } + static bool is_table_load(const Type type) { return TABLE_LOAD == type; } }; struct ObDirectLoadMethod diff --git a/tools/deploy/mysql_test/test_suite/direct_load_data/data/partition/partition_data1.csv b/tools/deploy/mysql_test/test_suite/direct_load_data/data/partition/partition_data1.csv new file mode 100644 index 000000000..4164fe27d --- /dev/null +++ b/tools/deploy/mysql_test/test_suite/direct_load_data/data/partition/partition_data1.csv @@ -0,0 +1,4 @@ +1|1|1 +11|11|11 +21|21|21 +31|31|31 \ No newline at end of file diff --git a/tools/deploy/mysql_test/test_suite/direct_load_data/data/partition/partition_data2.csv b/tools/deploy/mysql_test/test_suite/direct_load_data/data/partition/partition_data2.csv new file mode 100644 index 000000000..350645de5 --- /dev/null +++ b/tools/deploy/mysql_test/test_suite/direct_load_data/data/partition/partition_data2.csv @@ -0,0 +1,4 @@ +2|2|2 +12|12|12 +22|22|22 +32|32|32 \ No newline at end of file diff --git a/tools/deploy/mysql_test/test_suite/direct_load_data/data/partition/partition_data3.csv b/tools/deploy/mysql_test/test_suite/direct_load_data/data/partition/partition_data3.csv new file mode 100644 index 000000000..7ccbc61fd --- /dev/null +++ b/tools/deploy/mysql_test/test_suite/direct_load_data/data/partition/partition_data3.csv @@ -0,0 +1,3 @@ +3|3|3 +13|13|13 +23|23|23 \ No newline at end of file diff --git a/tools/deploy/mysql_test/test_suite/direct_load_data/data/partition/partition_data4.csv b/tools/deploy/mysql_test/test_suite/direct_load_data/data/partition/partition_data4.csv new file mode 100644 index 000000000..6712c068e --- /dev/null +++ b/tools/deploy/mysql_test/test_suite/direct_load_data/data/partition/partition_data4.csv @@ -0,0 +1,2 @@ +4|4|4 +34|34|34 \ No newline at end of file diff --git a/tools/deploy/mysql_test/test_suite/direct_load_data/data/partition/partition_data5.csv b/tools/deploy/mysql_test/test_suite/direct_load_data/data/partition/partition_data5.csv new file mode 100644 index 000000000..69841cf90 --- /dev/null +++ b/tools/deploy/mysql_test/test_suite/direct_load_data/data/partition/partition_data5.csv @@ -0,0 +1,4 @@ +1|1|1 +12|12|12 +23|23|23 +34|34|34 \ No newline at end of file