From 995d01af9b61182abaa8d3dcc64b18830d49d00d Mon Sep 17 00:00:00 2001 From: suz-yang Date: Wed, 14 Aug 2024 09:18:38 +0000 Subject: [PATCH] direct load support specifying any number of columns and column order --- .../ob_table_direct_load_rpc_executor.cpp | 108 ++- .../ob_table_direct_load_rpc_executor.h | 4 +- .../ob_table_direct_load_rpc_struct.cpp | 3 +- .../client/ob_table_direct_load_rpc_struct.h | 4 +- .../ob_table_load_autoinc_nextval.cpp | 2 +- .../ob_table_load_client_service.cpp | 30 +- .../table_load/ob_table_load_client_service.h | 5 +- .../table_load/ob_table_load_client_task.cpp | 663 ++++++++++-------- .../table_load/ob_table_load_client_task.h | 110 +-- .../table_load/ob_table_load_coordinator.cpp | 15 +- .../ob_table_load_coordinator_ctx.cpp | 17 +- .../table_load/ob_table_load_exec_ctx.cpp | 23 +- .../table_load/ob_table_load_exec_ctx.h | 12 +- .../table_load/ob_table_load_instance.cpp | 8 +- .../ob_table_load_mem_compactor.cpp | 4 +- ...ble_load_multiple_heap_table_compactor.cpp | 4 +- .../table_load/ob_table_load_obj_cast.cpp | 59 +- .../ob_table_load_partition_calc.cpp | 10 +- .../table_load/ob_table_load_schema.cpp | 171 +++-- .../table_load/ob_table_load_schema.h | 23 +- .../table_load/ob_table_load_service.cpp | 81 ++- .../table_load/ob_table_load_service.h | 9 +- .../table_load/ob_table_load_store_ctx.cpp | 4 +- .../table_load/ob_table_load_table_ctx.cpp | 5 - .../ob_table_load_trans_bucket_writer.cpp | 61 +- .../ob_table_load_trans_bucket_writer.h | 1 + .../table_load/ob_table_load_trans_store.cpp | 40 +- src/share/table/ob_table_load_row.h | 19 +- .../engine/cmd/ob_load_data_direct_impl.cpp | 63 +- .../engine/cmd/ob_table_direct_insert_ctx.cpp | 69 +- .../engine/cmd/ob_table_direct_insert_ctx.h | 7 - .../data/misc/specify_column_0.csv | 3 + 32 files changed, 982 insertions(+), 655 deletions(-) create mode 100644 tools/deploy/mysql_test/test_suite/direct_load_data/data/misc/specify_column_0.csv diff --git a/src/observer/table_load/client/ob_table_direct_load_rpc_executor.cpp b/src/observer/table_load/client/ob_table_direct_load_rpc_executor.cpp index a9d17cd3e2..7562d3a28a 100644 --- a/src/observer/table_load/client/ob_table_direct_load_rpc_executor.cpp +++ b/src/observer/table_load/client/ob_table_direct_load_rpc_executor.cpp @@ -17,10 +17,7 @@ #include "observer/omt/ob_multi_tenant.h" #include "observer/table_load/ob_table_load_client_service.h" #include "observer/table_load/ob_table_load_client_task.h" -#include "observer/table_load/ob_table_load_exec_ctx.h" -#include "observer/table_load/ob_table_load_schema.h" #include "observer/table_load/ob_table_load_service.h" -#include "sql/resolver/dml/ob_hint.h" namespace oceanbase { @@ -37,7 +34,7 @@ using namespace table; int ObTableDirectLoadBeginExecutor::check_args() { int ret = OB_SUCCESS; - if (OB_UNLIKELY(arg_.table_name_.empty() || arg_.parallel_ <= 0 || + if (OB_UNLIKELY(arg_.table_name_.empty() || arg_.parallel_ <= 0 || arg_.max_error_row_count_ < 0 || arg_.dup_action_ == ObLoadDupActionType::LOAD_INVALID_MODE || arg_.timeout_ <= 0)) { ret = OB_INVALID_ARGUMENT; @@ -64,8 +61,8 @@ int ObTableDirectLoadBeginExecutor::process() ObTableLoadClientTask *client_task = nullptr; if (OB_FAIL(ObTableLoadService::check_tenant())) { LOG_WARN("fail to check tenant", KR(ret)); - } else if (OB_FAIL(resolve_param(param))) { - LOG_WARN("fail to resolve param", KR(ret)); + } else if (OB_FAIL(init_param(param))) { + LOG_WARN("fail to init param", KR(ret)); } else if (OB_FAIL(ObTableLoadClientService::alloc_task(client_task))) { LOG_WARN("fail to alloc client task", KR(ret)); } else if (OB_FAIL(client_task->init(param))) { @@ -80,6 +77,7 @@ int ObTableDirectLoadBeginExecutor::process() ObTableLoadClientStatus client_status = ObTableLoadClientStatus::MAX_STATUS; int client_error_code = OB_SUCCESS; while (OB_SUCC(ret) && ObTableLoadClientStatus::RUNNING != client_status) { + client_task->heart_beat(); // 保持心跳 if (OB_UNLIKELY(THIS_WORKER.is_timeout())) { ret = OB_TIMEOUT; LOG_WARN("worker timeout", KR(ret)); @@ -94,7 +92,8 @@ int ObTableDirectLoadBeginExecutor::process() break; case ObTableLoadClientStatus::ERROR: case ObTableLoadClientStatus::ABORT: - ret = OB_SUCCESS == client_error_code ? OB_CANCELED : client_error_code; + ret = client_error_code; + LOG_WARN("client status error", KR(ret), K(client_status), K(client_error_code)); break; default: ret = OB_ERR_UNEXPECTED; @@ -107,8 +106,8 @@ int ObTableDirectLoadBeginExecutor::process() // fill res if (OB_SUCC(ret)) { - res_.table_id_ = client_task->param_.get_table_id(); - res_.task_id_ = client_task->task_id_; + res_.table_id_ = client_task->get_table_id(); + res_.task_id_ = client_task->get_task_id(); client_task->get_status(res_.status_, res_.error_code_); } if (nullptr != client_task) { @@ -118,64 +117,23 @@ int ObTableDirectLoadBeginExecutor::process() return ret; } -int ObTableDirectLoadBeginExecutor::resolve_param(ObTableLoadClientTaskParam ¶m) +int ObTableDirectLoadBeginExecutor::init_param(ObTableLoadClientTaskParam ¶m) { int ret = OB_SUCCESS; - const uint64_t tenant_id = ctx_.get_tenant_id(); - const uint64_t database_id = ctx_.get_database_id(); - uint64_t table_id = 0; - ObLoadDupActionType dup_action = arg_.dup_action_; - ObDirectLoadMethod::Type method = ObDirectLoadMethod::INVALID_METHOD; - ObDirectLoadInsertMode::Type insert_mode = ObDirectLoadInsertMode::INVALID_INSERT_MODE; param.reset(); - if (OB_FAIL(ObTableLoadSchema::get_table_id(tenant_id, database_id, arg_.table_name_, - table_id))) { - LOG_WARN("fail to get table id", KR(ret), K(tenant_id), K(database_id), K_(arg)); - } else if (arg_.load_method_.empty()) { - method = ObDirectLoadMethod::FULL; - insert_mode = ObDirectLoadInsertMode::NORMAL; - } else { - ObDirectLoadHint::LoadMethod load_method = ObDirectLoadHint::get_load_method_value(arg_.load_method_); - switch (load_method) { - case ObDirectLoadHint::FULL: - method = ObDirectLoadMethod::FULL; - insert_mode = ObDirectLoadInsertMode::NORMAL; - break; - case ObDirectLoadHint::INC: - method = ObDirectLoadMethod::INCREMENTAL; - insert_mode = ObDirectLoadInsertMode::NORMAL; - break; - case ObDirectLoadHint::INC_REPLACE: - if (OB_UNLIKELY(ObLoadDupActionType::LOAD_STOP_ON_DUP != dup_action)) { - ret = OB_NOT_SUPPORTED; - LOG_WARN("replace or ignore for inc_replace load method not supported", KR(ret), - K(arg_.load_method_), K(arg_.dup_action_)); - } else { - dup_action = ObLoadDupActionType::LOAD_REPLACE; //rewrite dup action - method = ObDirectLoadMethod::INCREMENTAL; - insert_mode = ObDirectLoadInsertMode::INC_REPLACE; - } - break; - default: - ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid load method", KR(ret), K(arg_.load_method_)); - break; - } - } - if (OB_SUCC(ret)) { - param.set_client_addr(ctx_.get_user_client_addr()); - param.set_tenant_id(tenant_id); - param.set_user_id(ctx_.get_user_id()); - param.set_database_id(database_id); - param.set_table_id(table_id); - param.set_parallel(arg_.parallel_); - param.set_max_error_row_count(arg_.max_error_row_count_); - param.set_dup_action(dup_action); - param.set_timeout_us(arg_.timeout_); - param.set_heartbeat_timeout_us(arg_.heartbeat_timeout_); - param.set_method(method); - param.set_insert_mode(insert_mode); - } + OX(param.set_task_id(ObTableLoadClientService::generate_task_id())); + OX(param.set_client_addr(ctx_.get_user_client_addr())); + OX(param.set_tenant_id(ctx_.get_tenant_id())); + OX(param.set_user_id(ctx_.get_user_id())); + OX(param.set_database_id(ctx_.get_database_id())); + OZ(param.set_table_name(arg_.table_name_)); + OX(param.set_parallel(arg_.parallel_)); + OX(param.set_max_error_row_count(arg_.max_error_row_count_)); + OX(param.set_dup_action(arg_.dup_action_)); + OX(param.set_timeout_us(arg_.timeout_)); + OX(param.set_heartbeat_timeout_us(arg_.heartbeat_timeout_)); + OZ(param.set_load_method(arg_.load_method_)); + OZ(param.set_column_names(arg_.column_names_)); return ret; } @@ -196,9 +154,23 @@ int ObTableDirectLoadCommitExecutor::process() int ret = OB_SUCCESS; LOG_INFO("table direct load commit", K_(arg)); ObTableLoadClientTask *client_task = nullptr; + ObTableLoadClientTaskBrief *client_task_brief = nullptr; ObTableLoadUniqueKey key(arg_.table_id_, arg_.task_id_); if (OB_FAIL(ObTableLoadClientService::get_task(key, client_task))) { - LOG_WARN("fail to get client task", KR(ret), K(key)); + if (OB_UNLIKELY(OB_ENTRY_NOT_EXIST != ret)) { + LOG_WARN("fail to get client task", KR(ret), K(key)); + } else { + // 处理重试场景 + ret = OB_SUCCESS; + if (OB_FAIL(ObTableLoadClientService::get_task_brief(key, client_task_brief))) { + LOG_WARN("fail to get client task brief", KR(ret), K(key)); + } else if (ObTableLoadClientStatus::COMMIT == client_task_brief->client_status_) { + LOG_INFO("client task is commit", KR(ret)); + } else { + ret = client_task_brief->error_code_; + LOG_WARN("client task is failed", KR(ret), KPC(client_task_brief)); + } + } } else if (OB_FAIL(client_task->commit())) { LOG_WARN("fail to commit client task", KR(ret), K(key)); } @@ -264,9 +236,7 @@ int ObTableDirectLoadGetStatusExecutor::process() } else { ret = OB_SUCCESS; if (OB_FAIL(ObTableLoadClientService::get_task_brief(key, client_task_brief))) { - if (OB_UNLIKELY(OB_ENTRY_NOT_EXIST != ret)) { - LOG_WARN("fail to get client task brief", KR(ret), K(key)); - } + LOG_WARN("fail to get client task brief", KR(ret), K(key)); } else { res_.status_ = client_task_brief->client_status_; res_.error_code_ = client_task_brief->error_code_; @@ -374,7 +344,7 @@ int ObTableDirectLoadHeartBeatExecutor::process() if (OB_FAIL(ObTableLoadClientService::get_task(key, client_task))) { LOG_WARN("fail to get client task", KR(ret), K(key)); } else { - client_task->get_exec_ctx()->last_heartbeat_time_ = ObTimeUtil::current_time(); + client_task->heart_beat(); client_task->get_status(res_.status_, res_.error_code_); } if (nullptr != client_task) { diff --git a/src/observer/table_load/client/ob_table_direct_load_rpc_executor.h b/src/observer/table_load/client/ob_table_direct_load_rpc_executor.h index a16f1d226d..c8c5b7f226 100644 --- a/src/observer/table_load/client/ob_table_direct_load_rpc_executor.h +++ b/src/observer/table_load/client/ob_table_direct_load_rpc_executor.h @@ -21,8 +21,6 @@ namespace oceanbase namespace observer { class ObTableLoadClientTaskParam; -class ObTableLoadClientTask; -class ObTableLoadTableCtx; template class ObTableDirectLoadRpcExecutor @@ -73,7 +71,7 @@ protected: int process() override; private: - int resolve_param(ObTableLoadClientTaskParam ¶m); + int init_param(ObTableLoadClientTaskParam ¶m); }; // commit diff --git a/src/observer/table_load/client/ob_table_direct_load_rpc_struct.cpp b/src/observer/table_load/client/ob_table_direct_load_rpc_struct.cpp index b49179f48f..16cd609b1b 100644 --- a/src/observer/table_load/client/ob_table_direct_load_rpc_struct.cpp +++ b/src/observer/table_load/client/ob_table_direct_load_rpc_struct.cpp @@ -29,7 +29,8 @@ OB_SERIALIZE_MEMBER_SIMPLE(ObTableDirectLoadBeginArg, heartbeat_timeout_, force_create_, is_async_, - load_method_); + load_method_, + column_names_); OB_SERIALIZE_MEMBER_SIMPLE(ObTableDirectLoadBeginRes, table_id_, diff --git a/src/observer/table_load/client/ob_table_direct_load_rpc_struct.h b/src/observer/table_load/client/ob_table_direct_load_rpc_struct.h index fb232ce763..9ac4539e5c 100644 --- a/src/observer/table_load/client/ob_table_direct_load_rpc_struct.h +++ b/src/observer/table_load/client/ob_table_direct_load_rpc_struct.h @@ -44,7 +44,8 @@ public: K_(heartbeat_timeout), K_(force_create), K_(is_async), - K_(load_method)); + K_(load_method), + K_(column_names)); public: ObString table_name_; int64_t parallel_; @@ -55,6 +56,7 @@ public: bool force_create_; // unused bool is_async_; ObString load_method_; + common::ObSArray column_names_; }; struct ObTableDirectLoadBeginRes diff --git a/src/observer/table_load/ob_table_load_autoinc_nextval.cpp b/src/observer/table_load/ob_table_load_autoinc_nextval.cpp index 2d5754dd75..ef5052babc 100644 --- a/src/observer/table_load/ob_table_load_autoinc_nextval.cpp +++ b/src/observer/table_load/ob_table_load_autoinc_nextval.cpp @@ -78,7 +78,7 @@ int ObTableLoadAutoincNextval::get_input_value(ObStorageDatum &datum, const uint64_t &sql_mode) { int ret = OB_SUCCESS; - if (datum.is_null()) { + if (datum.is_nop() || datum.is_null()) { is_to_generate = true; } else { bool is_zero = false; diff --git a/src/observer/table_load/ob_table_load_client_service.cpp b/src/observer/table_load/ob_table_load_client_service.cpp index 4d2f09dcf5..5f06f27fda 100644 --- a/src/observer/table_load/ob_table_load_client_service.cpp +++ b/src/observer/table_load/ob_table_load_client_service.cpp @@ -40,7 +40,14 @@ bool ObTableLoadClientService::ClientTaskBriefEraseIfExpired::operator()( * ObTableLoadClientService */ -ObTableLoadClientService::ObTableLoadClientService() : next_task_id_(1), is_inited_(false) {} +int64_t ObTableLoadClientService::next_task_sequence_ = 0; + +int64_t ObTableLoadClientService::generate_task_id() +{ + return ObTimeUtil::current_time() * 1000 + ATOMIC_FAA(&next_task_sequence_, 1) % 1000; +} + +ObTableLoadClientService::ObTableLoadClientService() : is_inited_(false) {} ObTableLoadClientService::~ObTableLoadClientService() {} @@ -92,7 +99,6 @@ int ObTableLoadClientService::alloc_task(ObTableLoadClientTask *&client_task) ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("fail to new ObTableLoadClientTask", KR(ret)); } else { - new_client_task->task_id_ = service->get_client_service().generate_task_id(); client_task = new_client_task; client_task->inc_ref_count(); } @@ -125,9 +131,7 @@ void ObTableLoadClientService::revert_task(ObTableLoadClientTask *client_task) const int64_t ref_count = client_task->dec_ref_count(); OB_ASSERT(ref_count >= 0); if (0 == ref_count) { - const int64_t task_id = client_task->task_id_; - const uint64_t table_id = client_task->param_.get_table_id(); - LOG_INFO("free client task", K(task_id), K(table_id), KP(client_task)); + LOG_INFO("free client task", KPC(client_task)); free_task(client_task); client_task = nullptr; } @@ -141,8 +145,11 @@ int ObTableLoadClientService::add_task(ObTableLoadClientTask *client_task) if (OB_ISNULL(service = MTL(ObTableLoadService *))) { ret = OB_ERR_SYS; LOG_WARN("null table load service", KR(ret)); + } else if (OB_ISNULL(client_task)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid args", KR(ret), KP(client_task)); } else { - ObTableLoadUniqueKey key(client_task->param_.get_table_id(), client_task->task_id_); + ObTableLoadUniqueKey key(client_task->get_table_id(), client_task->get_task_id()); ret = service->get_client_service().add_client_task(key, client_task); } return ret; @@ -155,8 +162,11 @@ int ObTableLoadClientService::remove_task(ObTableLoadClientTask *client_task) if (OB_ISNULL(service = MTL(ObTableLoadService *))) { ret = OB_ERR_SYS; LOG_WARN("null table load service", KR(ret)); + } else if (OB_ISNULL(client_task)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid args", KR(ret), KP(client_task)); } else { - ObTableLoadUniqueKey key(client_task->param_.get_table_id(), client_task->task_id_); + ObTableLoadUniqueKey key(client_task->get_table_id(), client_task->get_task_id()); ret = service->get_client_service().remove_client_task(key, client_task); } return ret; @@ -236,10 +246,10 @@ int ObTableLoadClientService::remove_client_task(const ObTableLoadUniqueKey &key if (OB_FAIL(client_task_brief_map_.create(key, client_task_brief))) { LOG_WARN("fail to create client task brief", KR(ret), K(key)); } else { - client_task_brief->task_id_ = client_task->task_id_; - client_task_brief->table_id_ = client_task->param_.get_table_id(); + client_task_brief->task_id_ = client_task->get_task_id(); + client_task_brief->table_id_ = client_task->get_table_id(); client_task->get_status(client_task_brief->client_status_, client_task_brief->error_code_); - client_task_brief->result_info_ = client_task->result_info_; + client_task_brief->result_info_ = client_task->get_result_info(); client_task_brief->active_time_ = ObTimeUtil::current_time(); } if (nullptr != client_task_brief) { diff --git a/src/observer/table_load/ob_table_load_client_service.h b/src/observer/table_load/ob_table_load_client_service.h index 65baf99a16..b26a3af6a4 100644 --- a/src/observer/table_load/ob_table_load_client_service.h +++ b/src/observer/table_load/ob_table_load_client_service.h @@ -70,11 +70,11 @@ public: return ObTableDirectLoadRpcProxy::dispatch(ctx, request, result); } -private: - OB_INLINE int64_t generate_task_id() { return ATOMIC_FAA(&next_task_id_, 1); } + static int64_t generate_task_id(); private: static const int64_t CLIENT_TASK_RETENTION_PERIOD = 24LL * 60 * 60 * 1000 * 1000; // 1day + static int64_t next_task_sequence_; // key => client_task typedef common::hash::ObHashMap @@ -116,7 +116,6 @@ private: mutable obsys::ObRWLock rwlock_; ClientTaskMap client_task_map_; ClientTaskBriefMap client_task_brief_map_; // thread safety - int64_t next_task_id_; bool is_inited_; }; diff --git a/src/observer/table_load/ob_table_load_client_task.cpp b/src/observer/table_load/ob_table_load_client_task.cpp index acbff18419..b64c80bf74 100644 --- a/src/observer/table_load/ob_table_load_client_task.cpp +++ b/src/observer/table_load/ob_table_load_client_task.cpp @@ -14,7 +14,6 @@ #include "observer/table_load/ob_table_load_client_task.h" #include "observer/ob_server.h" -#include "observer/omt/ob_tenant.h" #include "observer/table_load/ob_table_load_exec_ctx.h" #include "observer/table_load/ob_table_load_schema.h" #include "observer/table_load/ob_table_load_service.h" @@ -39,18 +38,22 @@ using namespace table; */ ObTableLoadClientTaskParam::ObTableLoadClientTaskParam() - : tenant_id_(OB_INVALID_TENANT_ID), + : allocator_("TLD_CTask"), + task_id_(0), + tenant_id_(OB_INVALID_TENANT_ID), user_id_(OB_INVALID_ID), database_id_(OB_INVALID_ID), - table_id_(OB_INVALID_ID), + table_name_(), parallel_(0), max_error_row_count_(0), dup_action_(ObLoadDupActionType::LOAD_INVALID_MODE), timeout_us_(0), heartbeat_timeout_us_(0), - method_(ObDirectLoadMethod::INVALID_METHOD), - insert_mode_(ObDirectLoadInsertMode::INVALID_INSERT_MODE) + load_method_(), + column_names_() { + allocator_.set_tenant_id(MTL_ID()); + column_names_.set_block_allocator(ModulePageAllocator(allocator_)); } ObTableLoadClientTaskParam::~ObTableLoadClientTaskParam() {} @@ -58,17 +61,19 @@ ObTableLoadClientTaskParam::~ObTableLoadClientTaskParam() {} void ObTableLoadClientTaskParam::reset() { client_addr_.reset(); + task_id_ = 0; tenant_id_ = OB_INVALID_TENANT_ID; user_id_ = OB_INVALID_ID; database_id_ = OB_INVALID_ID; - table_id_ = OB_INVALID_ID; + table_name_.reset(); parallel_ = 0; max_error_row_count_ = 0; dup_action_ = ObLoadDupActionType::LOAD_INVALID_MODE; timeout_us_ = 0; heartbeat_timeout_us_ = 0; - method_ = ObDirectLoadMethod::INVALID_METHOD; - insert_mode_ = ObDirectLoadInsertMode::INVALID_INSERT_MODE; + load_method_.reset(); + column_names_.reset(); + allocator_.reset(); } int ObTableLoadClientTaskParam::assign(const ObTableLoadClientTaskParam &other) @@ -77,39 +82,59 @@ int ObTableLoadClientTaskParam::assign(const ObTableLoadClientTaskParam &other) if (this != &other) { reset(); client_addr_ = other.client_addr_; + task_id_ = other.task_id_; tenant_id_ = other.tenant_id_; user_id_ = other.user_id_; database_id_ = other.database_id_; - table_id_ = other.table_id_; parallel_ = other.parallel_; max_error_row_count_ = other.max_error_row_count_; dup_action_ = other.dup_action_; timeout_us_ = other.timeout_us_; heartbeat_timeout_us_ = other.heartbeat_timeout_us_; - method_ = other.method_; - insert_mode_ = other.insert_mode_; + if (OB_FAIL(set_table_name(other.table_name_))) { + LOG_WARN("fail to set table name", KR(ret)); + } else if (OB_FAIL(set_load_method(other.load_method_))) { + LOG_WARN("fail to set load method", KR(ret)); + } else if (OB_FAIL(set_column_names(other.column_names_))) { + LOG_WARN("fail to deep copy column names", KR(ret)); + } } return ret; } bool ObTableLoadClientTaskParam::is_valid() const { - return client_addr_.is_valid() && OB_INVALID_TENANT_ID != tenant_id_ && - OB_INVALID_ID != user_id_ && OB_INVALID_ID != database_id_ && OB_INVALID_ID != table_id_ && - parallel_ > 0 && ObLoadDupActionType::LOAD_INVALID_MODE != dup_action_ && - timeout_us_ > 0 && heartbeat_timeout_us_ > 0 && - ObDirectLoadMethod::is_type_valid(method_) && - ObDirectLoadInsertMode::is_type_valid(insert_mode_) && - (storage::ObDirectLoadMethod::is_full(method_) - ? storage::ObDirectLoadInsertMode::is_valid_for_full_method(insert_mode_) - : true) && - (storage::ObDirectLoadMethod::is_incremental(method_) - ? storage::ObDirectLoadInsertMode::is_valid_for_incremental_method(insert_mode_) - : true) && - (storage::ObDirectLoadInsertMode::INC_REPLACE == insert_mode_ - ? sql::ObLoadDupActionType::LOAD_REPLACE == dup_action_ - : true); + return client_addr_.is_valid() && task_id_ > 0 && OB_INVALID_TENANT_ID != tenant_id_ && + OB_INVALID_ID != user_id_ && OB_INVALID_ID != database_id_ && !table_name_.empty() && + parallel_ > 0 && max_error_row_count_ >= 0 && + ObLoadDupActionType::LOAD_INVALID_MODE != dup_action_ && timeout_us_ > 0 && + heartbeat_timeout_us_ > 0; +} +int ObTableLoadClientTaskParam::set_string(const ObString &src, ObString &dest) +{ + int ret = OB_SUCCESS; + if (OB_FAIL(ob_write_string(allocator_, src, dest))) { + LOG_WARN("fail to write string", KR(ret)); + } + return ret; +} + +int ObTableLoadClientTaskParam::set_string_array(const ObIArray &src, + ObIArray &dest) +{ + int ret = OB_SUCCESS; + dest.reset(); + for (int64_t i = 0; OB_SUCC(ret) && i < src.count(); ++i) { + const ObString &src_str = src.at(i); + ObString dest_str; + if (OB_FAIL(ob_write_string(allocator_, src_str, dest_str))) { + LOG_WARN("fail to write string", KR(ret)); + } else if (OB_FAIL(dest.push_back(dest_str))) { + LOG_WARN("fail to push back", KR(ret)); + } + } + return ret; } /** @@ -128,22 +153,40 @@ public: int process() override { int ret = OB_SUCCESS; + int tmp_ret = OB_SUCCESS; ObSQLSessionInfo *origin_session_info = THIS_WORKER.get_session(); - ObSQLSessionInfo *session_info = client_task_->get_session_info(); + ObSQLSessionInfo *session_info = client_task_->session_info_; + ObSchemaGetterGuard &schema_guard = client_task_->schema_guard_; + ObTableLoadParam load_param; + ObArray column_ids; THIS_WORKER.set_session(session_info); if (OB_ISNULL(session_info)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("unexpected null session info", KR(ret)); } else { - session_info->set_thread_id(GETTID()); + session_info->set_thread_id(get_tid_cache()); session_info->update_last_active_time(); - if (OB_FAIL(session_info->set_session_state(QUERY_ACTIVE))) { - LOG_WARN("fail to set session state", K(ret)); + if (OB_TMP_FAIL(session_info->set_session_state(QUERY_ACTIVE))) { + LOG_WARN("fail to set session state", KR(tmp_ret)); } } + // resolve + if (OB_FAIL( + resolve(client_task_->client_exec_ctx_, client_task_->param_, load_param, column_ids))) { + LOG_WARN("fail to resolve", KR(ret), K(client_task_->param_)); + } + // check support + else if (OB_FAIL(ObTableLoadService::check_support_direct_load(schema_guard, + load_param.table_id_, + load_param.method_, + load_param.insert_mode_, + load_param.load_mode_, + column_ids))) { + LOG_WARN("fail to check support direct load", KR(ret)); + } // begin if (OB_SUCC(ret)) { - if (OB_FAIL(client_task_->init_instance())) { + if (OB_FAIL(client_task_->init_instance(load_param, column_ids))) { LOG_WARN("fail to init instance", KR(ret)); } else if (OB_FAIL(client_task_->set_status_waitting())) { LOG_WARN("fail to set status waitting", KR(ret)); @@ -173,6 +216,168 @@ public: } client_task_->destroy_instance(); THIS_WORKER.set_session(origin_session_info); + if (session_info != nullptr && OB_TMP_FAIL(session_info->set_session_state(SESSION_SLEEP))) { + LOG_WARN("fail to set session state", KR(tmp_ret)); + } + return ret; + } + + static int resolve(ObTableLoadClientExecCtx &client_exec_ctx, + const ObTableLoadClientTaskParam &task_param, ObTableLoadParam &load_param, + ObIArray &column_ids) + { + int ret = OB_SUCCESS; + const uint64_t tenant_id = task_param.get_tenant_id(); + const uint64_t database_id = task_param.get_database_id(); + ObSchemaGetterGuard *schema_guard = nullptr; + const ObTableSchema *table_schema = nullptr; + ObDirectLoadMethod::Type method = ObDirectLoadMethod::INVALID_METHOD; + ObDirectLoadInsertMode::Type insert_mode = ObDirectLoadInsertMode::INVALID_INSERT_MODE; + ObCompressorType compressor_type = INVALID_COMPRESSOR; + bool online_opt_stat_gather = false; + double online_sample_percent = 100.; + if (OB_ISNULL(schema_guard = client_exec_ctx.get_schema_guard())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected schema guard is null", KR(ret)); + } + // resolve table_name_ + else if (OB_FAIL(ObTableLoadSchema::get_table_schema( + *schema_guard, tenant_id, database_id, task_param.get_table_name(), table_schema))) { + LOG_WARN("fail to get table schema", KR(ret), K(tenant_id), K(database_id), + K(task_param.get_table_name())); + } + // resolve column_names_ + else if (OB_FAIL(resolve_columns(table_schema, task_param.get_column_names(), column_ids))) { + LOG_WARN("fail to resolve columns", KR(ret), K(task_param.get_column_names())); + } + // resolve load_method_ + else if (OB_FAIL(resolve_load_method(task_param.get_load_method(), method, insert_mode))) { + LOG_WARN("fail to resolve load method", KR(ret), K(task_param.get_load_method())); + } + // compress type + else if (OB_FAIL(ObDDLUtil::get_temp_store_compress_type( + table_schema->get_compressor_type(), task_param.get_parallel(), compressor_type))) { + LOG_WARN("fail to get tmp store compressor type", KR(ret)); + } + // opt stat gather + else if (OB_FAIL(ObTableLoadSchema::get_tenant_optimizer_gather_stats_on_load( + tenant_id, online_opt_stat_gather))) { + LOG_WARN("fail to get tenant optimizer gather stats on load", KR(ret), K(tenant_id)); + } else if (online_opt_stat_gather && OB_FAIL(ObDbmsStatsUtils::get_sys_online_estimate_percent( + *(client_exec_ctx.exec_ctx_), tenant_id, + table_schema->get_table_id(), online_sample_percent))) { + LOG_WARN("failed to get sys online sample percent", K(ret)); + } + if (OB_SUCC(ret)) { + load_param.tenant_id_ = tenant_id; + load_param.table_id_ = table_schema->get_table_id(); + load_param.parallel_ = task_param.get_parallel(); + load_param.session_count_ = task_param.get_parallel(); + load_param.batch_size_ = 100; + load_param.max_error_row_count_ = task_param.get_max_error_row_count(); + load_param.column_count_ = column_ids.count(); + load_param.need_sort_ = true; + load_param.px_mode_ = false; + load_param.online_opt_stat_gather_ = online_opt_stat_gather; + load_param.dup_action_ = + // rewrite dup action to replace in inc_replace + (method == ObDirectLoadMethod::INCREMENTAL && + insert_mode == ObDirectLoadInsertMode::INC_REPLACE) + ? ObLoadDupActionType::LOAD_REPLACE + : task_param.get_dup_action(); + load_param.method_ = method; + load_param.insert_mode_ = insert_mode; + load_param.load_mode_ = ObDirectLoadMode::TABLE_LOAD; + load_param.compressor_type_ = compressor_type; + load_param.online_sample_percent_ = online_sample_percent; + } + return ret; + } + + static int resolve_columns(const ObTableSchema *table_schema, + const ObIArray &column_names, ObIArray &column_ids) + { + int ret = OB_SUCCESS; + column_ids.reset(); + if (OB_ISNULL(table_schema)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected table schema is null", KR(ret), KP(table_schema)); + } else if (column_names.empty()) { + if (OB_FAIL(ObTableLoadSchema::get_user_column_ids(table_schema, column_ids))) { + LOG_WARN("fail to get user column ids", KR(ret)); + } + } else { + const static uint64_t INVALID_COLUMN_ID = UINT64_MAX; + ObArray user_column_ids; + ObArray user_column_names; + if (OB_FAIL(ObTableLoadSchema::get_user_column_id_and_names(table_schema, + user_column_ids, + user_column_names))) { + LOG_WARN("fail to get user column id and names", KR(ret)); + } + for (int64_t i = 0; OB_SUCC(ret) && i < column_names.count(); ++i) { + const ObString &column_name = column_names.at(i); + if (OB_UNLIKELY(column_name.empty())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("empty column name is invalid", KR(ret), K(i), K(column_names)); + } else { + int64_t found_column_idx = -1; + for (int64_t j = 0; found_column_idx == -1 && j < user_column_names.count(); ++j) { + const ObString &user_column_name = user_column_names.at(j); + if (column_name.length() != user_column_name.length()) { + } else if (column_name.case_compare(user_column_name) == 0) { + found_column_idx = j; + } + } + if (OB_UNLIKELY(found_column_idx == -1)) { + ret = OB_ERR_BAD_FIELD_ERROR; + LOG_WARN("unknow column", KR(ret), K(column_name), K(user_column_names)); + } else { + const uint64_t user_column_id = user_column_ids.at(found_column_idx); + if (OB_UNLIKELY(user_column_id == INVALID_COLUMN_ID)) { + ret = OB_ERR_FIELD_SPECIFIED_TWICE; + LOG_WARN("column specified twice", KR(ret), K(i), K(column_name), K(column_names)); + } else if (OB_FAIL(column_ids.push_back(user_column_id))) { + LOG_WARN("fail to push back column id", KR(ret)); + } else { + user_column_ids.at(found_column_idx) = INVALID_COLUMN_ID; + } + } + } + } + } + return ret; + } + + static int resolve_load_method(const ObString &load_method_str, ObDirectLoadMethod::Type &method, + ObDirectLoadInsertMode::Type &insert_mode) + { + int ret = OB_SUCCESS; + if (load_method_str.empty()) { + method = ObDirectLoadMethod::FULL; + insert_mode = ObDirectLoadInsertMode::NORMAL; + } else { + const ObDirectLoadHint::LoadMethod load_method = + ObDirectLoadHint::get_load_method_value(load_method_str); + switch (load_method) { + case ObDirectLoadHint::FULL: + method = ObDirectLoadMethod::FULL; + insert_mode = ObDirectLoadInsertMode::NORMAL; + break; + case ObDirectLoadHint::INC: + method = ObDirectLoadMethod::INCREMENTAL; + insert_mode = ObDirectLoadInsertMode::NORMAL; + break; + case ObDirectLoadHint::INC_REPLACE: + method = ObDirectLoadMethod::INCREMENTAL; + insert_mode = ObDirectLoadInsertMode::INC_REPLACE; + break; + default: + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid load method", KR(ret), K(load_method_str)); + break; + } + } return ret; } @@ -195,7 +400,7 @@ public: void callback(int ret_code, ObTableLoadTask *task) override { if (OB_UNLIKELY(OB_SUCCESS != ret_code)) { - client_task_->set_status_abort(ret_code); + client_task_->set_status_error(ret_code); } task->~ObTableLoadTask(); } @@ -209,12 +414,11 @@ private: */ ObTableLoadClientTask::ObTableLoadClientTask() - : task_id_(OB_INVALID_ID), - allocator_("TLD_ClientTask"), - task_scheduler_(nullptr), + : allocator_("TLD_ClientTask"), session_info_(nullptr), plan_ctx_(allocator_), exec_ctx_(allocator_), + task_scheduler_(nullptr), session_count_(0), next_batch_id_(0), client_status_(ObTableLoadClientStatus::MAX_STATUS), @@ -251,41 +455,33 @@ int ObTableLoadClientTask::init(const ObTableLoadClientTaskParam ¶m) } else if (OB_UNLIKELY(!param.is_valid())) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid args", KR(ret), K(param)); - } else if (OB_FAIL(param_.assign(param))) { - LOG_WARN("fail to assign param", KR(ret)); } else { - const int64_t origin_timeout_ts = THIS_WORKER.get_timeout_ts(); - THIS_WORKER.set_timeout_ts(ObTimeUtil::current_time() + param_.get_timeout_us()); - if (OB_FAIL(init_exec_ctx())) { - LOG_WARN("fail to init client exec ctx", KR(ret)); - } else if (OB_ISNULL(task_scheduler_ = - OB_NEWx(ObTableLoadTaskThreadPoolScheduler, (&allocator_), 1, - param_.get_table_id(), "Client"))) { - ret = OB_ALLOCATE_MEMORY_FAILED; - LOG_WARN("fail to new ObTableLoadTaskThreadPoolScheduler", KR(ret)); - } else if (OB_FAIL(task_scheduler_->init())) { + if (OB_FAIL(param_.assign(param))) { + LOG_WARN("fail to assign param", KR(ret), K(param)); + } else if (OB_FAIL(init_exec_ctx())) { + LOG_WARN("fail to init exec ctx", KR(ret)); + } else if (OB_FAIL(init_task_scheduler())) { LOG_WARN("fail to init task scheduler", KR(ret)); - } else if (OB_FAIL(task_scheduler_->start())) { - LOG_WARN("fail to start task scheduler", KR(ret)); } else { is_inited_ = true; } - THIS_WORKER.set_timeout_ts(origin_timeout_ts); } return ret; } -int ObTableLoadClientTask::create_session_info(uint64_t tenant_id, uint64_t user_id, - uint64_t database_id, uint64_t table_id, - ObSQLSessionInfo *&session_info, - ObFreeSessionCtx &free_session_ctx) +int ObTableLoadClientTask::create_session_info() { int ret = OB_SUCCESS; - const schema::ObTenantSchema *tenant_info = nullptr; + const uint64_t tenant_id = param_.get_tenant_id(); + const uint64_t user_id = param_.get_user_id(); + const uint64_t database_id = param_.get_database_id(); + const ObTenantSchema *tenant_info = nullptr; const ObUserInfo *user_info = nullptr; const ObDatabaseSchema *database_schema = nullptr; - const ObTableSchema *table_schema = nullptr; - if (OB_FAIL(schema_guard_.get_tenant_info(tenant_id, tenant_info))) { + if (OB_NOT_NULL(session_info_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected session info not null", KR(ret)); + } else if (OB_FAIL(schema_guard_.get_tenant_info(tenant_id, tenant_info))) { LOG_WARN("get tenant info failed", K(ret)); } else if (OB_ISNULL(tenant_info)) { ret = OB_ERR_UNEXPECTED; @@ -300,43 +496,31 @@ int ObTableLoadClientTask::create_session_info(uint64_t tenant_id, uint64_t user } else if (OB_ISNULL(database_schema)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("invalid database schema", K(ret), K(tenant_id), K(database_id)); - } else if (OB_FAIL(schema_guard_.get_table_schema(tenant_id, table_id, table_schema))) { - LOG_WARN("fail to get table schema", KR(ret)); - } else if (OB_ISNULL(table_schema)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("invalid table schema", K(ret), K(tenant_id), K(table_id)); - } else if (OB_FAIL(ObTableLoadUtils::create_session_info(session_info, free_session_ctx))) { + } else if (OB_FAIL(ObTableLoadUtils::create_session_info(session_info_, free_session_ctx_))) { LOG_WARN("create session id failed", KR(ret)); } else { - ObArenaAllocator allocator("TLD_Tmp"); - allocator.set_tenant_id(MTL_ID()); - ObStringBuffer buffer(&allocator); - buffer.append("DIRECT LOAD: "); - buffer.append(table_schema->get_table_name()); + ObSqlString query_str; ObObj timeout_val; timeout_val.set_int(param_.get_timeout_us()); - OZ(session_info->load_default_sys_variable(false, false)); //加载默认的session参数 - OZ(session_info->load_default_configs_in_pc()); - OX(session_info->init_tenant(tenant_info->get_tenant_name(), tenant_id)); - OX(session_info->set_priv_user_id(user_id)); - OX(session_info->store_query_string(ObString(buffer.length(), buffer.ptr()))); - OX(session_info->set_user(user_info->get_user_name(), user_info->get_host_name_str(), - user_info->get_user_id())); - OX(session_info->set_user_priv_set(OB_PRIV_ALL | OB_PRIV_GRANT)); - OX(session_info->set_default_database(database_schema->get_database_name(), - CS_TYPE_UTF8MB4_GENERAL_CI)); - OX(session_info->set_mysql_cmd(COM_QUERY)); - OX(session_info->set_current_trace_id(ObCurTraceId::get_trace_id())); - OX(session_info->set_client_addr(param_.get_client_addr())); - OX(session_info->set_peer_addr(ObServer::get_instance().get_self())); - OX(session_info->set_query_start_time(ObTimeUtil::current_time())); - OZ(session_info->update_sys_variable(SYS_VAR_OB_QUERY_TIMEOUT, timeout_val)); - } - if (OB_FAIL(ret)) { - if (session_info != nullptr) { - observer::ObTableLoadUtils::free_session_info(session_info, free_session_ctx); - session_info = nullptr; - } + OX(query_str.assign_fmt("DIRECT LOAD: %.*s, task_id:%ld", + static_cast(param_.get_table_name().length()), + param_.get_table_name().ptr(), param_.get_task_id())); + OZ(session_info_->load_default_sys_variable(false /*print_info_log*/, false /*is_sys_tenant*/)); //加载默认的session参数 + OZ(session_info_->load_default_configs_in_pc()); + OX(session_info_->init_tenant(tenant_info->get_tenant_name(), tenant_id)); + OX(session_info_->set_priv_user_id(user_id)); + OX(session_info_->store_query_string(query_str.string())); + OX(session_info_->set_user(user_info->get_user_name(), user_info->get_host_name_str(), + user_info->get_user_id())); + OX(session_info_->set_user_priv_set(OB_PRIV_ALL | OB_PRIV_GRANT)); + OX(session_info_->set_default_database(database_schema->get_database_name(), + CS_TYPE_UTF8MB4_GENERAL_CI)); + OX(session_info_->set_mysql_cmd(COM_QUERY)); + OX(session_info_->set_current_trace_id(ObCurTraceId::get_trace_id())); + OX(session_info_->set_client_addr(param_.get_client_addr())); + OX(session_info_->set_peer_addr(ObServer::get_instance().get_self())); + OX(session_info_->set_query_start_time(ObTimeUtil::current_time())); + OZ(session_info_->update_sys_variable(SYS_VAR_OB_QUERY_TIMEOUT, timeout_val)); } return ret; } @@ -344,13 +528,9 @@ int ObTableLoadClientTask::create_session_info(uint64_t tenant_id, uint64_t user int ObTableLoadClientTask::init_exec_ctx() { int ret = OB_SUCCESS; - if (OB_ISNULL(GCTX.schema_service_)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("invalid argument", K(ret), K(GCTX.schema_service_)); - } else if (OB_FAIL(GCTX.schema_service_->get_tenant_schema_guard(param_.get_tenant_id(), schema_guard_))) { + if (OB_FAIL(ObTableLoadSchema::get_schema_guard(param_.get_tenant_id(), schema_guard_))) { LOG_WARN("get_schema_guard failed", K(ret)); - } else if (OB_FAIL(create_session_info(param_.get_tenant_id(), param_.get_user_id(), - param_.get_database_id(), param_.get_table_id(), session_info_, free_session_ctx_))) { + } else if (OB_FAIL(create_session_info())) { LOG_WARN("fail to create session info", KR(ret)); } else { sql_ctx_.schema_guard_ = &schema_guard_; @@ -359,44 +539,55 @@ int ObTableLoadClientTask::init_exec_ctx() exec_ctx_.set_physical_plan_ctx(&plan_ctx_); exec_ctx_.set_my_session(session_info_); client_exec_ctx_.exec_ctx_ = &exec_ctx_; - client_exec_ctx_.last_heartbeat_time_ = ObTimeUtil::current_time(); - client_exec_ctx_.heartbeat_timeout_us_ = param_.get_heartbeat_timeout_us(); + client_exec_ctx_.init_heart_beat(param_.get_heartbeat_timeout_us()); } return ret; } +int ObTableLoadClientTask::init_task_scheduler() +{ + int ret = OB_SUCCESS; + const int64_t origin_timeout_ts = THIS_WORKER.get_timeout_ts(); + THIS_WORKER.set_timeout_ts(ObTimeUtil::current_time() + param_.get_timeout_us()); + if (OB_ISNULL(task_scheduler_ = OB_NEWx(ObTableLoadTaskThreadPoolScheduler, (&allocator_), 1, + param_.get_task_id(), "Executor"))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + LOG_WARN("fail to new ObTableLoadTaskThreadPoolScheduler", KR(ret)); + } else if (OB_FAIL(task_scheduler_->init())) { + LOG_WARN("fail to init task scheduler", KR(ret)); + } else if (OB_FAIL(task_scheduler_->start())) { + LOG_WARN("fail to start task scheduler", KR(ret)); + } + THIS_WORKER.set_timeout_ts(origin_timeout_ts); + return ret; +} + int ObTableLoadClientTask::start() { int ret = OB_SUCCESS; if (IS_NOT_INIT) { ret = OB_NOT_INIT; LOG_WARN("ObTableLoadClientTask not init", KR(ret)); + } else if (OB_FAIL(set_status_initializing())) { + LOG_WARN("fail to set status initializing", KR(ret)); } else { - obsys::ObWLockGuard guard(rw_lock_); - if (OB_UNLIKELY(ObTableLoadClientStatus::MAX_STATUS != client_status_)) { - ret = OB_STATE_NOT_MATCH; - LOG_WARN("unexpected status", KR(ret), K(client_status_)); - } else { - client_status_ = ObTableLoadClientStatus::INITIALIZING; - ObTableLoadTask *task = nullptr; - if (OB_ISNULL(task = OB_NEWx(ObTableLoadTask, &allocator_, param_.get_tenant_id()))) { - ret = OB_ALLOCATE_MEMORY_FAILED; - LOG_WARN("fail to new ObTableLoadTask", KR(ret)); - } else if (OB_FAIL(task->set_processor(this))) { - LOG_WARN("fail to set client task processor", KR(ret)); - } else if (OB_FAIL(task->set_callback(this))) { - LOG_WARN("fail to set common task callback", KR(ret)); - } else if (OB_FAIL(task_scheduler_->add_task(0, task))) { - LOG_WARN("fail to add task", KR(ret)); - } - if (OB_FAIL(ret)) { - client_status_ = ObTableLoadClientStatus::ERROR; - error_code_ = ret; - if (nullptr != task) { - task->~ObTableLoadTask(); - allocator_.free(task); - task = nullptr; - } + ObTableLoadTask *task = nullptr; + if (OB_ISNULL(task = OB_NEWx(ObTableLoadTask, &allocator_, param_.get_tenant_id()))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + LOG_WARN("fail to new ObTableLoadTask", KR(ret)); + } else if (OB_FAIL(task->set_processor(this))) { + LOG_WARN("fail to set client task processor", KR(ret)); + } else if (OB_FAIL(task->set_callback(this))) { + LOG_WARN("fail to set common task callback", KR(ret)); + } else if (OB_FAIL(task_scheduler_->add_task(0, task))) { + LOG_WARN("fail to add task", KR(ret)); + } + if (OB_FAIL(ret)) { + set_status_error(ret); + if (nullptr != task) { + task->~ObTableLoadTask(); + allocator_.free(task); + task = nullptr; } } } @@ -417,7 +608,6 @@ int ObTableLoadClientTask::write(ObTableLoadObjRowArray &obj_rows) LOG_WARN("unexpected session count", KR(ret), K(session_count_)); } else { const int64_t batch_id = ATOMIC_FAA(&next_batch_id_, 1); - ; const int32_t session_id = batch_id % session_count_ + 1; ObTableLoadSequenceNo start_seq_no(batch_id << ObTableLoadSequenceNo::BATCH_ID_SHIFT); for (int64_t i = 0; OB_SUCC(ret) && i < obj_rows.count(); ++i) { @@ -439,10 +629,15 @@ int ObTableLoadClientTask::commit() if (IS_NOT_INIT) { ret = OB_NOT_INIT; LOG_WARN("ObTableLoadClientTask not init", KR(ret)); - } else if (OB_FAIL(check_status(ObTableLoadClientStatus::RUNNING))) { - LOG_WARN("fail to check status", KR(ret)); - } else if (OB_FAIL(set_status_committing())) { - LOG_WARN("fail to set status committing", KR(ret)); + } else { + obsys::ObWLockGuard guard(rw_lock_); + if (ObTableLoadClientStatus::COMMITTING == client_status_ || + ObTableLoadClientStatus::COMMIT == client_status_) { + LOG_INFO("client task already commit", K(client_status_)); + } else { + ret = advance_status_nolock(ObTableLoadClientStatus::RUNNING, + ObTableLoadClientStatus::COMMITTING); + } } return ret; } @@ -460,57 +655,54 @@ void ObTableLoadClientTask::abort() } } } +void ObTableLoadClientTask::heart_beat() { client_exec_ctx_.heart_beat(); } + +int ObTableLoadClientTask::check_status() { return client_exec_ctx_.check_status(); } + +int ObTableLoadClientTask::advance_status_nolock(const ObTableLoadClientStatus expected, + const ObTableLoadClientStatus updated) +{ + int ret = OB_SUCCESS; + if (OB_LIKELY(client_status_ == expected)) { + client_status_ = updated; + LOG_INFO("LOAD DATA client status advance", K(client_status_)); + } else if (ObTableLoadClientStatus::ERROR == client_status_ || + ObTableLoadClientStatus::ABORT == client_status_) { + ret = error_code_; + LOG_WARN("client status error", KR(ret), K(client_status_), K(error_code_)); + } else { + ret = OB_STATE_NOT_MATCH; + LOG_WARN("unexpected status", KR(ret), K(client_status_)); + } + return ret; +} + +int ObTableLoadClientTask::advance_status(const ObTableLoadClientStatus expected, + const ObTableLoadClientStatus updated) +{ + obsys::ObWLockGuard guard(rw_lock_); + + return advance_status_nolock(expected, updated); +} + +int ObTableLoadClientTask::set_status_initializing() +{ + return advance_status(ObTableLoadClientStatus::MAX_STATUS, ObTableLoadClientStatus::INITIALIZING); +} int ObTableLoadClientTask::set_status_waitting() { - int ret = OB_SUCCESS; - obsys::ObWLockGuard guard(rw_lock_); - if (OB_UNLIKELY(ObTableLoadClientStatus::INITIALIZING != client_status_)) { - ret = OB_STATE_NOT_MATCH; - LOG_WARN("unexpected status", KR(ret), K(client_status_)); - } else { - client_status_ = ObTableLoadClientStatus::WAITTING; - } - return ret; + return advance_status(ObTableLoadClientStatus::INITIALIZING, ObTableLoadClientStatus::WAITTING); } int ObTableLoadClientTask::set_status_running() { - int ret = OB_SUCCESS; - obsys::ObWLockGuard guard(rw_lock_); - if (OB_UNLIKELY(ObTableLoadClientStatus::WAITTING != client_status_)) { - ret = OB_STATE_NOT_MATCH; - LOG_WARN("unexpected status", KR(ret), K(client_status_)); - } else { - client_status_ = ObTableLoadClientStatus::RUNNING; - } - return ret; -} - -int ObTableLoadClientTask::set_status_committing() -{ - int ret = OB_SUCCESS; - obsys::ObWLockGuard guard(rw_lock_); - if (OB_UNLIKELY(ObTableLoadClientStatus::RUNNING != client_status_)) { - ret = OB_STATE_NOT_MATCH; - LOG_WARN("unexpected status", KR(ret), K(client_status_)); - } else { - client_status_ = ObTableLoadClientStatus::COMMITTING; - } - return ret; + return advance_status(ObTableLoadClientStatus::WAITTING, ObTableLoadClientStatus::RUNNING); } int ObTableLoadClientTask::set_status_commit() { - int ret = OB_SUCCESS; - obsys::ObWLockGuard guard(rw_lock_); - if (OB_UNLIKELY(ObTableLoadClientStatus::COMMITTING != client_status_)) { - ret = OB_STATE_NOT_MATCH; - LOG_WARN("unexpected status", KR(ret), K(client_status_)); - } else { - client_status_ = ObTableLoadClientStatus::COMMIT; - } - return ret; + return advance_status(ObTableLoadClientStatus::COMMITTING, ObTableLoadClientStatus::COMMIT); } int ObTableLoadClientTask::set_status_error(int error_code) @@ -574,105 +766,22 @@ void ObTableLoadClientTask::get_status(ObTableLoadClientStatus &client_status, error_code = error_code_; } -int ObTableLoadClientTask::get_compressor_type(const uint64_t tenant_id, - const uint64_t table_id, - const int64_t parallel, - ObCompressorType &compressor_type) +int ObTableLoadClientTask::init_instance(ObTableLoadParam &load_param, + const ObIArray &column_ids) { int ret = OB_SUCCESS; - ObCompressorType table_compressor_type = ObCompressorType::NONE_COMPRESSOR; - if (OB_FAIL( - ObTableLoadSchema::get_table_compressor_type(tenant_id, table_id, table_compressor_type))) { - LOG_WARN("fail to get table compressor type", KR(ret)); - } else if (OB_FAIL(ObDDLUtil::get_temp_store_compress_type(table_compressor_type, parallel, - compressor_type))) { - LOG_WARN("fail to get tmp store compressor type", KR(ret)); - } - return ret; -} - - -int ObTableLoadClientTask::init_instance() -{ - int ret = OB_SUCCESS; - if (IS_NOT_INIT) { - ret = OB_NOT_INIT; - LOG_WARN("ObTableLoadClientTask not init", KR(ret)); + const ObTableLoadTableCtx *tmp_ctx = nullptr; + if (OB_FAIL(instance_.init(load_param, column_ids, &client_exec_ctx_))) { + LOG_WARN("fail to init instance", KR(ret)); + } else if (OB_FAIL(instance_.start_trans(trans_ctx_, ObTableLoadInstance::DEFAULT_SEGMENT_ID, + allocator_))) { + LOG_WARN("fail to start trans", KR(ret)); + } else if (OB_ISNULL(tmp_ctx = instance_.get_table_ctx())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("fail to get table ctx", KR(ret)); } else { - const uint64_t tenant_id = param_.get_tenant_id(); - const uint64_t table_id = param_.get_table_id(); - const ObDirectLoadMethod::Type method = param_.get_method(); - const ObDirectLoadInsertMode::Type insert_mode = param_.get_insert_mode(); - omt::ObTenant *tenant = nullptr; - ObSchemaGetterGuard schema_guard; - ObArray column_ids; - ObCompressorType compressor_type = INVALID_COMPRESSOR; - bool online_opt_stat_gather = false; - if (OB_FAIL(GCTX.omt_->get_tenant(tenant_id, tenant))) { - LOG_WARN("fail to get tenant handle", KR(ret), K(tenant_id)); - } else if (OB_FAIL(ObTableLoadSchema::get_schema_guard(tenant_id, schema_guard))) { - LOG_WARN("fail to get schema guard", KR(ret), K(tenant_id), K(table_id)); - } else if (OB_FAIL(ObTableLoadService::check_support_direct_load(schema_guard, - table_id, - method, - insert_mode, - ObDirectLoadMode::TABLE_LOAD))) { - LOG_WARN("fail to check support direct load", KR(ret)); - } else if (OB_FAIL(ObTableLoadSchema::get_user_column_ids(schema_guard, - tenant_id, - table_id, - column_ids))) { - LOG_WARN("fail to get user column ids", KR(ret)); - } else if (OB_FAIL(get_compressor_type(tenant_id, table_id, session_count_, compressor_type))) { - LOG_WARN("fail to get compressor type", KR(ret)); - } else if (OB_FAIL(ObTableLoadSchema::get_tenant_optimizer_gather_stats_on_load( - tenant_id, online_opt_stat_gather))) { - LOG_WARN("fail to get tenant optimizer gather stats on load", KR(ret), K(tenant_id)); - } - - ObTableLoadParam load_param; - double online_sample_percent = 100.; - if (OB_SUCC(ret)) { - if (online_opt_stat_gather && - OB_FAIL(ObDbmsStatsUtils::get_sys_online_estimate_percent(exec_ctx_, - tenant_id, - table_id, - online_sample_percent))) { - LOG_WARN("failed to get sys online sample percent", K(ret)); - } else { - load_param.online_sample_percent_ = online_sample_percent; - } - } - - if (OB_SUCC(ret)) { - load_param.tenant_id_ = tenant_id; - load_param.table_id_ = table_id; - load_param.parallel_ = param_.get_parallel(); - load_param.session_count_ = load_param.parallel_; - load_param.batch_size_ = 100; - load_param.max_error_row_count_ = param_.get_max_error_row_count(); - load_param.column_count_ = column_ids.count(); - load_param.need_sort_ = true; - load_param.px_mode_ = false; - load_param.online_opt_stat_gather_ = online_opt_stat_gather; - load_param.dup_action_ = param_.get_dup_action(); - load_param.method_ = method; - load_param.insert_mode_ = insert_mode; - load_param.load_mode_ = ObDirectLoadMode::TABLE_LOAD; - load_param.compressor_type_ = compressor_type; - const ObTableLoadTableCtx *tmp_ctx = nullptr; - if (OB_FAIL(instance_.init(load_param, column_ids, &client_exec_ctx_))) { - LOG_WARN("fail to init instance", KR(ret)); - } else if (OB_FAIL(instance_.start_trans(trans_ctx_, ObTableLoadInstance::DEFAULT_SEGMENT_ID, allocator_))) { - LOG_WARN("fail to start trans", KR(ret)); - } else if (OB_ISNULL(tmp_ctx = instance_.get_table_ctx())) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("fail to get table ctx", KR(ret)); - } else { - session_count_ = tmp_ctx->param_.write_session_count_; - tmp_ctx = nullptr; - } - } + session_count_ = tmp_ctx->param_.write_session_count_; + tmp_ctx = nullptr; } return ret; } @@ -680,31 +789,17 @@ int ObTableLoadClientTask::init_instance() int ObTableLoadClientTask::commit_instance() { int ret = OB_SUCCESS; - if (IS_NOT_INIT) { - ret = OB_NOT_INIT; - LOG_WARN("ObTableLoadClientTask not init", KR(ret)); + if (OB_FAIL(instance_.commit_trans(trans_ctx_))) { + LOG_WARN("fail to commit trans", KR(ret)); + } else if (OB_FAIL(instance_.commit())) { + LOG_WARN("fail to commit instance", KR(ret)); } else { - if (OB_FAIL(instance_.commit_trans(trans_ctx_))) { - LOG_WARN("fail to commit trans", KR(ret)); - } else if (OB_FAIL(instance_.commit())) { - LOG_WARN("fail to commit instance", KR(ret)); - } else { - result_info_ = instance_.get_result_info(); - } + result_info_ = instance_.get_result_info(); } return ret; } -void ObTableLoadClientTask::destroy_instance() -{ - int ret = OB_SUCCESS; - if (IS_NOT_INIT) { - ret = OB_NOT_INIT; - LOG_WARN("ObTableLoadClientTask not init", KR(ret)); - } else { - instance_.destroy(); - } -} +void ObTableLoadClientTask::destroy_instance() { instance_.destroy(); } } // namespace observer } // namespace oceanbase diff --git a/src/observer/table_load/ob_table_load_client_task.h b/src/observer/table_load/ob_table_load_client_task.h index cce401e314..e3ad693364 100644 --- a/src/observer/table_load/ob_table_load_client_task.h +++ b/src/observer/table_load/ob_table_load_client_task.h @@ -13,9 +13,9 @@ #pragma once #include "lib/hash/ob_link_hashmap.h" +#include "observer/table_load/ob_table_load_exec_ctx.h" #include "observer/table_load/ob_table_load_instance.h" #include "observer/table_load/ob_table_load_struct.h" -#include "observer/table_load/ob_table_load_exec_ctx.h" #include "share/table/ob_table_load_define.h" #include "share/table/ob_table_load_row_array.h" #include "sql/session/ob_sql_session_mgr.h" @@ -39,51 +39,68 @@ public: int assign(const ObTableLoadClientTaskParam &other); bool is_valid() const; -#define DEFINE_GETTER_AND_SETTER(type, name) \ +#define DEFINE_VAR_GETTER_AND_SETTER(type, name) \ OB_INLINE type get_##name() const { return name##_; } \ OB_INLINE void set_##name(type name) { name##_ = name; } - DEFINE_GETTER_AND_SETTER(ObAddr, client_addr); - DEFINE_GETTER_AND_SETTER(uint64_t, tenant_id); - DEFINE_GETTER_AND_SETTER(uint64_t, user_id); - DEFINE_GETTER_AND_SETTER(uint64_t, database_id); - DEFINE_GETTER_AND_SETTER(uint64_t, table_id); - DEFINE_GETTER_AND_SETTER(int64_t, parallel); - DEFINE_GETTER_AND_SETTER(uint64_t, max_error_row_count); - DEFINE_GETTER_AND_SETTER(sql::ObLoadDupActionType, dup_action); - DEFINE_GETTER_AND_SETTER(uint64_t, timeout_us); - DEFINE_GETTER_AND_SETTER(uint64_t, heartbeat_timeout_us); - DEFINE_GETTER_AND_SETTER(storage::ObDirectLoadMethod::Type, method); - DEFINE_GETTER_AND_SETTER(storage::ObDirectLoadInsertMode::Type, insert_mode); +#define DEFINE_STR_GETTER_AND_SETTER(type, name) \ + OB_INLINE const type &get_##name() const { return name##_; } \ + OB_INLINE int set_##name(const type &name) { return set_string(name, name##_); } -#undef DEFINE_GETTER_AND_SETTER +#define DEFINE_STR_ARRAY_GETTER_AND_SETTER(type, name) \ + OB_INLINE const ObIArray &get_##name() const { return name##_; } \ + OB_INLINE int set_##name(const ObIArray &name) { return set_string_array(name, name##_); } + + DEFINE_VAR_GETTER_AND_SETTER(ObAddr, client_addr); + DEFINE_VAR_GETTER_AND_SETTER(uint64_t, task_id); + DEFINE_VAR_GETTER_AND_SETTER(uint64_t, tenant_id); + DEFINE_VAR_GETTER_AND_SETTER(uint64_t, user_id); + DEFINE_VAR_GETTER_AND_SETTER(uint64_t, database_id); + DEFINE_STR_GETTER_AND_SETTER(ObString, table_name); + DEFINE_VAR_GETTER_AND_SETTER(int64_t, parallel); + DEFINE_VAR_GETTER_AND_SETTER(uint64_t, max_error_row_count); + DEFINE_VAR_GETTER_AND_SETTER(sql::ObLoadDupActionType, dup_action); + DEFINE_VAR_GETTER_AND_SETTER(uint64_t, timeout_us); + DEFINE_VAR_GETTER_AND_SETTER(uint64_t, heartbeat_timeout_us); + DEFINE_STR_GETTER_AND_SETTER(ObString, load_method); + DEFINE_STR_ARRAY_GETTER_AND_SETTER(ObString, column_names); + +#undef DEFINE_VAR_GETTER_AND_SETTER +#undef DEFINE_STR_GETTER_AND_SETTER TO_STRING_KV(K_(client_addr), + K_(task_id), K_(tenant_id), K_(user_id), K_(database_id), - K_(table_id), + K_(table_name), K_(parallel), K_(max_error_row_count), K_(dup_action), K_(timeout_us), K_(heartbeat_timeout_us), - "method", storage::ObDirectLoadMethod::get_type_string(method_), - "insert_mode", storage::ObDirectLoadInsertMode::get_type_string(insert_mode_)); + K_(load_method), + K_(column_names)); private: + int set_string(const ObString &src, ObString &dest); + int set_string_array(const ObIArray &src, ObIArray &dest); + +private: + ObArenaAllocator allocator_; + int64_t task_id_; ObAddr client_addr_; uint64_t tenant_id_; uint64_t user_id_; uint64_t database_id_; - uint64_t table_id_; + ObString table_name_; int64_t parallel_; uint64_t max_error_row_count_; sql::ObLoadDupActionType dup_action_; int64_t timeout_us_; int64_t heartbeat_timeout_us_; - storage::ObDirectLoadMethod::Type method_; - storage::ObDirectLoadInsertMode::Type insert_mode_; + ObString load_method_; + common::ObArray column_names_; }; class ObTableLoadClientTask @@ -99,55 +116,61 @@ public: OB_INLINE int64_t get_ref_count() const { return ATOMIC_LOAD(&ref_count_); } OB_INLINE int64_t inc_ref_count() { return ATOMIC_AAF(&ref_count_, 1); } OB_INLINE int64_t dec_ref_count() { return ATOMIC_SAF(&ref_count_, 1); } - OB_INLINE sql::ObSQLSessionInfo *get_session_info() { return session_info_; } - OB_INLINE ObTableLoadClientExecCtx *get_exec_ctx() { return &client_exec_ctx_; } + OB_INLINE int64_t get_task_id() const { return param_.get_task_id(); } + OB_INLINE uint64_t get_table_id() const { return 0; } + void heart_beat(); + int check_status(); + + int set_status_initializing(); int set_status_waitting(); int set_status_running(); - int set_status_committing(); int set_status_commit(); int set_status_error(int error_code); void set_status_abort(int error_code = OB_CANCELED); table::ObTableLoadClientStatus get_status() const; void get_status(table::ObTableLoadClientStatus &client_status, int &error_code) const; int check_status(table::ObTableLoadClientStatus client_status); - TO_STRING_KV(K_(task_id), K_(param), K_(result_info), KP_(session_info), K_(free_session_ctx), - K_(client_exec_ctx), KP_(task_scheduler), K_(client_status), K_(error_code), + + OB_INLINE const table::ObTableLoadResultInfo &get_result_info() const { return result_info_; } + TO_STRING_KV(K_(param), + KP_(session_info), + K_(free_session_ctx), + K_(client_exec_ctx), + KP_(task_scheduler), + K_(client_status), + K_(error_code), + K_(result_info), K_(ref_count)); private: - int init_task_scheduler(); - int create_session_info(uint64_t tenant_id, uint64_t user_id, uint64_t database_id, - uint64_t table_id, sql::ObSQLSessionInfo *&session_info, - sql::ObFreeSessionCtx &free_session_ctx); + int create_session_info(); int init_exec_ctx(); + int init_task_scheduler(); - int init_instance(); + int advance_status_nolock(const table::ObTableLoadClientStatus expected, + const table::ObTableLoadClientStatus updated); + int advance_status(const table::ObTableLoadClientStatus expected, + const table::ObTableLoadClientStatus updated); + + int init_instance(ObTableLoadParam &load_param, const ObIArray &column_ids); int commit_instance(); void destroy_instance(); - int get_compressor_type(const uint64_t tenant_id, - const uint64_t table_id, - const int64_t parallel, - ObCompressorType &compressor_type); private: class ClientTaskExectueProcessor; class ClientTaskExectueCallback; -public: - uint64_t task_id_; - ObTableLoadClientTaskParam param_; - table::ObTableLoadResultInfo result_info_; - private: ObArenaAllocator allocator_; - ObITableLoadTaskScheduler *task_scheduler_; + ObTableLoadClientTaskParam param_; + share::schema::ObSchemaGetterGuard schema_guard_; sql::ObSQLSessionInfo *session_info_; sql::ObFreeSessionCtx free_session_ctx_; - ObTableLoadClientExecCtx client_exec_ctx_; - ObSchemaGetterGuard schema_guard_; sql::ObSqlCtx sql_ctx_; sql::ObPhysicalPlanCtx plan_ctx_; ObExecContext exec_ctx_; + ObTableLoadClientExecCtx client_exec_ctx_; + ObITableLoadTaskScheduler *task_scheduler_; int64_t session_count_; ObTableLoadInstance instance_; ObTableLoadInstance::TransCtx trans_ctx_; @@ -155,6 +178,7 @@ private: mutable obsys::ObRWLock rw_lock_; table::ObTableLoadClientStatus client_status_; int error_code_; + table::ObTableLoadResultInfo result_info_; int64_t ref_count_ CACHE_ALIGNED; bool is_inited_; }; diff --git a/src/observer/table_load/ob_table_load_coordinator.cpp b/src/observer/table_load/ob_table_load_coordinator.cpp index 816b1febd1..b1b16f0902 100644 --- a/src/observer/table_load/ob_table_load_coordinator.cpp +++ b/src/observer/table_load/ob_table_load_coordinator.cpp @@ -17,6 +17,7 @@ #include "observer/table_load/control/ob_table_load_control_rpc_proxy.h" #include "observer/table_load/ob_table_load_coordinator_ctx.h" #include "observer/table_load/ob_table_load_coordinator_trans.h" +#include "observer/table_load/ob_table_load_error_row_handler.h" #include "observer/table_load/ob_table_load_redef_table.h" #include "observer/table_load/ob_table_load_service.h" #include "observer/table_load/ob_table_load_stat.h" @@ -1117,7 +1118,7 @@ int ObTableLoadCoordinator::write_sql_stat(ObTableLoadSqlStatistics &sql_statist "TLD_TabStatNode", tenant_id))) { LOG_WARN("fail to create table stats map", KR(ret)); - } else if (OB_FAIL(inc_column_stats.create(ctx_->param_.column_count_, + } else if (OB_FAIL(inc_column_stats.create(ctx_->schema_.store_column_count_, "TLD_ColStatBkt", "TLD_ColStatNode", tenant_id))) { @@ -1659,10 +1660,20 @@ public: int set_objs(const ObTableLoadObjRowArray &obj_rows, const ObIArray &idx_array) { int ret = OB_SUCCESS; + ObTableLoadErrorRowHandler *error_row_handler = ctx_->coordinator_ctx_->error_row_handler_; for (int64_t i = 0; OB_SUCC(ret) && (i < obj_rows.count()); ++i) { const ObTableLoadObjRow &src_obj_row = obj_rows.at(i); ObTableLoadObjRow out_obj_row; - if (OB_FAIL(src_obj_row.project(idx_array, out_obj_row))) { + // 对于客户端导入场景, 需要处理多列或者少列 + if (OB_UNLIKELY(src_obj_row.count_ != ctx_->param_.column_count_)) { + ret = OB_ERR_COULUMN_VALUE_NOT_MATCH; + LOG_WARN("column count doesn't match value count", KR(ret), K(src_obj_row), + K(ctx_->param_.column_count_)); + ObNewRow new_row(src_obj_row.cells_, src_obj_row.count_); + if (OB_FAIL(error_row_handler->handle_error_row(ret, new_row))) { + LOG_WARN("fail to handle error row", KR(ret)); + } + } else if (OB_FAIL(src_obj_row.project(idx_array, out_obj_row))) { LOG_WARN("failed to projecte out_obj_row", KR(ret), K(src_obj_row.count_)); } else if (OB_FAIL(obj_rows_.push_back(out_obj_row))) { LOG_WARN("failed to add row to obj_rows_", KR(ret), K(out_obj_row)); diff --git a/src/observer/table_load/ob_table_load_coordinator_ctx.cpp b/src/observer/table_load/ob_table_load_coordinator_ctx.cpp index f7bb554161..eaaed98a97 100644 --- a/src/observer/table_load/ob_table_load_coordinator_ctx.cpp +++ b/src/observer/table_load/ob_table_load_coordinator_ctx.cpp @@ -353,10 +353,9 @@ int ObTableLoadCoordinatorCtx::init_column_idxs(const ObIArray &column int ret = OB_SUCCESS; idx_array_.reset(); const ObIArray &column_descs = ctx_->schema_.column_descs_; - bool found_column = true; - for (int64_t i = 0; OB_SUCC(ret) && OB_LIKELY(found_column) && i < column_descs.count(); ++i) { + for (int64_t i = 0; OB_SUCC(ret) && i < column_descs.count(); ++i) { const ObColDesc &col_desc = column_descs.at(i); - found_column = (ctx_->schema_.is_heap_table_ && i == 0); // skip hidden pk in heap table + bool found_column = (ctx_->schema_.is_heap_table_ && i == 0); // skip hidden pk in heap table // 在源数据的列数组中找到对应的列 for (int64_t j = 0; OB_SUCC(ret) && OB_LIKELY(!found_column) && j < column_ids.count(); ++j) { const uint64_t column_id = column_ids.at(j); @@ -368,10 +367,14 @@ int ObTableLoadCoordinatorCtx::init_column_idxs(const ObIArray &column } } } - } - if (OB_SUCC(ret) && OB_UNLIKELY(!found_column)) { - ret = OB_SCHEMA_NOT_UPTODATE; - LOG_WARN("column not found", KR(ret), K(idx_array_), K(column_descs), K(column_ids)); + if (OB_SUCC(ret) && !found_column) { + if (OB_UNLIKELY(ctx_->param_.px_mode_)) { + ret = OB_SCHEMA_NOT_UPTODATE; + LOG_WARN("column not found", KR(ret), K(idx_array_), K(column_descs), K(column_ids)); + } else if (OB_FAIL(idx_array_.push_back(-1))) { + LOG_WARN("fail to push back column idx", KR(ret), K(idx_array_), K(i), K(col_desc)); + } + } } return ret; } diff --git a/src/observer/table_load/ob_table_load_exec_ctx.cpp b/src/observer/table_load/ob_table_load_exec_ctx.cpp index 2578d5ad79..33a7c09f8a 100644 --- a/src/observer/table_load/ob_table_load_exec_ctx.cpp +++ b/src/observer/table_load/ob_table_load_exec_ctx.cpp @@ -45,6 +45,15 @@ ObSQLSessionInfo *ObTableLoadExecCtx::get_session_info() return session_info; } +ObSchemaGetterGuard *ObTableLoadExecCtx::get_schema_guard() +{ + ObSchemaGetterGuard *schema_guard = nullptr; + if (nullptr != exec_ctx_ && nullptr != exec_ctx_->get_sql_ctx()) { + schema_guard = exec_ctx_->get_sql_ctx()->schema_guard_; + } + return schema_guard; +} + int ObTableLoadExecCtx::check_status() { int ret = OB_SUCCESS; @@ -74,12 +83,24 @@ int ObTableLoadClientExecCtx::check_status() int ret = OB_SUCCESS; if (OB_FAIL(ObTableLoadExecCtx::check_status())) { LOG_WARN("fail to check status", KR(ret)); - } else if (OB_UNLIKELY(ObTimeUtil::current_time() - last_heartbeat_time_ > heartbeat_timeout_us_)) { + } else if (OB_UNLIKELY(last_heartbeat_time_ + heartbeat_timeout_us_ < + ObTimeUtil::current_time())) { ret = OB_TIMEOUT; LOG_WARN("heartbeat is timeout", KR(ret), K(last_heartbeat_time_), K(heartbeat_timeout_us_)); } return ret; } +void ObTableLoadClientExecCtx::init_heart_beat(const int64_t heartbeat_timeout_us) +{ + heartbeat_timeout_us_ = heartbeat_timeout_us; + last_heartbeat_time_ = ObTimeUtil::current_time(); +} + +void ObTableLoadClientExecCtx::heart_beat() +{ + last_heartbeat_time_ = ObTimeUtil::current_time(); +} + } // namespace observer } // namespace oceanbase \ No newline at end of file diff --git a/src/observer/table_load/ob_table_load_exec_ctx.h b/src/observer/table_load/ob_table_load_exec_ctx.h index 1ec33edfad..bfb94e8a71 100644 --- a/src/observer/table_load/ob_table_load_exec_ctx.h +++ b/src/observer/table_load/ob_table_load_exec_ctx.h @@ -17,6 +17,13 @@ namespace oceanbase { +namespace share +{ +namespace schema +{ +class ObSchemaGetterGuard; +} // namespace schema +} // namespace share namespace sql { class ObExecContext; @@ -36,6 +43,7 @@ public: virtual ~ObTableLoadExecCtx() = default; common::ObIAllocator *get_allocator(); sql::ObSQLSessionInfo *get_session_info(); + share::schema::ObSchemaGetterGuard *get_schema_guard(); virtual int check_status(); bool is_valid() const { return nullptr != exec_ctx_; } TO_STRING_KV(KP_(exec_ctx), KP_(tx_desc)); @@ -54,8 +62,10 @@ public: } virtual ~ObTableLoadClientExecCtx() = default; virtual int check_status(); + void init_heart_beat(const int64_t heartbeat_timeout_us); + void heart_beat(); TO_STRING_KV(KP_(exec_ctx), KP_(tx_desc), K_(heartbeat_timeout_us), K_(last_heartbeat_time)); -public: +private: int64_t heartbeat_timeout_us_; int64_t last_heartbeat_time_; }; diff --git a/src/observer/table_load/ob_table_load_instance.cpp b/src/observer/table_load/ob_table_load_instance.cpp index 94c08b4adb..b37a820b23 100644 --- a/src/observer/table_load/ob_table_load_instance.cpp +++ b/src/observer/table_load/ob_table_load_instance.cpp @@ -70,9 +70,10 @@ int ObTableLoadInstance::init(ObTableLoadParam ¶m, if (IS_INIT) { ret = OB_INIT_TWICE; LOG_WARN("ObTableLoadInstance init twice", KR(ret), KP(this)); - } else if (OB_UNLIKELY(!param.is_valid() || !execute_ctx->is_valid())) { + } else if (OB_UNLIKELY(!param.is_valid() || column_ids.empty() || + column_ids.count() != param.column_count_ || !execute_ctx->is_valid())) { ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid args", KR(ret), K(param), KPC(execute_ctx)); + LOG_WARN("invalid args", KR(ret), K(param), K(column_ids), KPC(execute_ctx)); } else { DISABLE_SQL_MEMLEAK_GUARD; execute_ctx_ = execute_ctx; @@ -92,7 +93,8 @@ int ObTableLoadInstance::init(ObTableLoadParam ¶m, else if (OB_FAIL(ObTableLoadService::check_support_direct_load(param.table_id_, param.method_, param.insert_mode_, - param.load_mode_))) { + param.load_mode_, + column_ids))) { LOG_WARN("fail to check support direct load", KR(ret), K(param)); } // start direct load diff --git a/src/observer/table_load/ob_table_load_mem_compactor.cpp b/src/observer/table_load/ob_table_load_mem_compactor.cpp index d426ab12c5..6337a03b7a 100644 --- a/src/observer/table_load/ob_table_load_mem_compactor.cpp +++ b/src/observer/table_load/ob_table_load_mem_compactor.cpp @@ -290,7 +290,9 @@ int ObTableLoadMemCompactor::inner_init() mem_ctx_.table_data_desc_ = store_ctx_->table_data_desc_; mem_ctx_.datum_utils_ = &(store_ctx_->ctx_->schema_.datum_utils_); mem_ctx_.need_sort_ = param_->need_sort_; - mem_ctx_.column_count_ = param_->column_count_; + mem_ctx_.column_count_ = (store_ctx_->ctx_->schema_.is_heap_table_ + ? store_ctx_->ctx_->schema_.store_column_count_ - 1 + : store_ctx_->ctx_->schema_.store_column_count_); } mem_ctx_.mem_load_task_count_ = param_->session_count_; diff --git a/src/observer/table_load/ob_table_load_multiple_heap_table_compactor.cpp b/src/observer/table_load/ob_table_load_multiple_heap_table_compactor.cpp index 14f356afa3..8d5b8233c3 100644 --- a/src/observer/table_load/ob_table_load_multiple_heap_table_compactor.cpp +++ b/src/observer/table_load/ob_table_load_multiple_heap_table_compactor.cpp @@ -331,7 +331,9 @@ int ObTableLoadMultipleHeapTableCompactor::inner_init() mem_ctx_.datum_utils_ = &(store_ctx_->ctx_->schema_.datum_utils_); mem_ctx_.need_sort_ = param_->need_sort_; mem_ctx_.mem_load_task_count_ = param_->session_count_; - mem_ctx_.column_count_ = param_->column_count_; + mem_ctx_.column_count_ = + (store_ctx_->ctx_->schema_.is_heap_table_ ? store_ctx_->ctx_->schema_.store_column_count_ - 1 + : store_ctx_->ctx_->schema_.store_column_count_); mem_ctx_.dml_row_handler_ = store_ctx_->error_row_handler_; mem_ctx_.file_mgr_ = store_ctx_->tmp_file_mgr_; mem_ctx_.dup_action_ = param_->dup_action_; diff --git a/src/observer/table_load/ob_table_load_obj_cast.cpp b/src/observer/table_load/ob_table_load_obj_cast.cpp index b32936dc07..834e9132a2 100644 --- a/src/observer/table_load/ob_table_load_obj_cast.cpp +++ b/src/observer/table_load/ob_table_load_obj_cast.cpp @@ -96,18 +96,49 @@ static int pad_obj(ObTableLoadCastObjCtx &cast_obj_ctx, const ObColumnSchemaV2 * } int ObTableLoadObjCaster::cast_obj(ObTableLoadCastObjCtx &cast_obj_ctx, - const ObColumnSchemaV2 *column_schema, const ObObj &src, + const ObColumnSchemaV2 *column_schema, + const ObObj &src, ObObj &dst) { int ret = OB_SUCCESS; const ObObj *convert_src_obj = nullptr; const ObObjType expect_type = column_schema->get_meta_type().get_type(); const ObAccuracy &accuracy = column_schema->get_accuracy(); - if (OB_FAIL(convert_obj(expect_type, src, convert_src_obj))) { - LOG_WARN("fail to convert obj", KR(ret)); + if (src.is_nop_value()) { + // 默认值是表达式 + if (lib::is_mysql_mode() && column_schema->get_cur_default_value().is_ext()) { + ret = OB_NOT_SUPPORTED; + LOG_WARN("column default value is ext", KR(ret), KPC(column_schema)); + } else if (lib::is_oracle_mode() && column_schema->is_default_expr_v2_column()) { + ret = OB_NOT_SUPPORTED; + LOG_WARN("column default value is expr", KR(ret), KPC(column_schema)); + } + // 没有默认值, 且为NOT NULL + // 例外:枚举类型默认为第一个 + else if (column_schema->is_not_null_for_write() && + column_schema->get_cur_default_value().is_null()) { + if (column_schema->get_meta_type().is_enum()) { + const uint64_t ENUM_FIRST_VAL = 1; + dst.set_enum(ENUM_FIRST_VAL); + } else { + ret = OB_ERR_NO_DEFAULT_FOR_FIELD; + LOG_WARN("column can not be null", KR(ret), KPC(column_schema)); + } + } + // mysql模式可以直接用default value + else if (lib::is_mysql_mode()) { + dst = column_schema->get_cur_default_value(); + } + // oracle模式需要转换 + else { + convert_src_obj = &(column_schema->get_cur_default_value()); + } + } else { + if (OB_FAIL(convert_obj(expect_type, src, convert_src_obj))) { + LOG_WARN("fail to convert obj", KR(ret)); + } } - - if (OB_SUCC(ret)) { + if (OB_SUCC(ret) && convert_src_obj != nullptr) { if (column_schema->is_enum_or_set()) { if (OB_FAIL(handle_string_to_enum_set(cast_obj_ctx, column_schema, src, dst))) { LOG_WARN("fail to convert string to enum or set", KR(ret), K(src), K(dst)); @@ -117,18 +148,18 @@ int ObTableLoadObjCaster::cast_obj(ObTableLoadCastObjCtx &cast_obj_ctx, LOG_WARN("fail to do to type", KR(ret)); } } - } - if (OB_SUCC(ret)) { - if (OB_FAIL(pad_obj(cast_obj_ctx, column_schema, dst))) { - LOG_WARN("fail to pad obj", KR(ret)); + if (OB_SUCC(ret)) { + if (OB_FAIL(pad_obj(cast_obj_ctx, column_schema, dst))) { + LOG_WARN("fail to pad obj", KR(ret)); + } } - } - if (OB_SUCC(ret)) { - if (cast_obj_ctx.is_need_check_ && - OB_FAIL(cast_obj_check(cast_obj_ctx, column_schema, dst))) { - LOG_WARN("fail to check cast obj result", KR(ret), K(dst)); + if (OB_SUCC(ret)) { + if (cast_obj_ctx.is_need_check_ && + OB_FAIL(cast_obj_check(cast_obj_ctx, column_schema, dst))) { + LOG_WARN("fail to check cast obj result", KR(ret), K(dst)); + } } } return ret; diff --git a/src/observer/table_load/ob_table_load_partition_calc.cpp b/src/observer/table_load/ob_table_load_partition_calc.cpp index 83573faa5e..5ce24bad80 100644 --- a/src/observer/table_load/ob_table_load_partition_calc.cpp +++ b/src/observer/table_load/ob_table_load_partition_calc.cpp @@ -181,11 +181,15 @@ int ObTableLoadPartitionCalc::cast_part_key(common::ObNewRow &part_key, common:: ObObj obj; for (int64_t i = 0; OB_SUCC(ret) && i < part_key_obj_index_.count(); ++i) { const IndexAndType &index_and_type = part_key_obj_index_.at(i); - if (OB_FAIL(ObTableLoadObjCaster::cast_obj(cast_obj_ctx, index_and_type.column_schema_, - part_key.cells_[i], obj))) { + const ObColumnSchemaV2 *column_schema = index_and_type.column_schema_; + ObObj &part_obj = part_key.cells_[i]; + if (OB_FAIL(ObTableLoadObjCaster::cast_obj(cast_obj_ctx, + column_schema, + part_obj, + obj))) { LOG_WARN("fail to cast obj", KR(ret)); } else { - part_key.cells_[i] = obj; + part_obj = obj; } } } diff --git a/src/observer/table_load/ob_table_load_schema.cpp b/src/observer/table_load/ob_table_load_schema.cpp index 675f99b7d1..21b3bb57a2 100644 --- a/src/observer/table_load/ob_table_load_schema.cpp +++ b/src/observer/table_load/ob_table_load_schema.cpp @@ -40,17 +40,16 @@ int ObTableLoadSchema::get_schema_guard(uint64_t tenant_id, ObSchemaGetterGuard return ret; } -int ObTableLoadSchema::get_table_schema(uint64_t tenant_id, uint64_t database_id, +int ObTableLoadSchema::get_table_schema(ObSchemaGetterGuard &schema_guard, + uint64_t tenant_id, + uint64_t database_id, const ObString &table_name, - ObSchemaGetterGuard &schema_guard, const ObTableSchema *&table_schema) { int ret = OB_SUCCESS; table_schema = nullptr; - if (OB_FAIL(get_schema_guard(tenant_id, schema_guard))) { - LOG_WARN("fail to get schema guard", KR(ret), K(tenant_id)); - } else if (OB_FAIL(schema_guard.get_table_schema(tenant_id, database_id, table_name, false, - table_schema))) { + if (OB_FAIL(schema_guard.get_table_schema(tenant_id, database_id, table_name, false /*is_index*/, + table_schema))) { LOG_WARN("fail to get table schema", KR(ret), K(tenant_id), K(database_id), K(table_name)); } else if (OB_ISNULL(table_schema)) { ret = OB_TABLE_NOT_EXIST; @@ -59,20 +58,6 @@ int ObTableLoadSchema::get_table_schema(uint64_t tenant_id, uint64_t database_id return ret; } -int ObTableLoadSchema::get_table_id(uint64_t tenant_id, uint64_t database_id, - const ObString &table_name, uint64_t &table_id) -{ - int ret = OB_SUCCESS; - ObSchemaGetterGuard schema_guard; - const ObTableSchema *table_schema = nullptr; - if (OB_FAIL(get_table_schema(tenant_id, database_id, table_name, schema_guard, table_schema))) { - LOG_WARN("fail to get table schema", KR(ret), K(tenant_id), K(database_id), K(table_name)); - } else { - table_id = table_schema->get_table_id(); - } - return ret; -} - int ObTableLoadSchema::get_table_schema(uint64_t tenant_id, uint64_t table_id, ObSchemaGetterGuard &schema_guard, const ObTableSchema *&table_schema) @@ -112,6 +97,25 @@ int ObTableLoadSchema::get_table_schema(uint64_t tenant_id, uint64_t table_id, return ret; } +int ObTableLoadSchema::get_table_schema(ObSchemaGetterGuard &schema_guard, + uint64_t tenant_id, + uint64_t table_id, + const ObTableSchema *&table_schema) +{ + int ret = OB_SUCCESS; + table_schema = nullptr; + if (OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id || OB_INVALID_ID == table_id)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid args", KR(ret), K(tenant_id), K(table_id)); + } else if (OB_FAIL(schema_guard.get_table_schema(tenant_id, table_id, table_schema))) { + LOG_WARN("fail to get table schema", KR(ret), K(tenant_id), K(table_id)); + } else if (OB_ISNULL(table_schema)) { + ret = OB_TABLE_NOT_EXIST; + LOG_WARN("table not exist", KR(ret), K(tenant_id), K(table_id)); + } + return ret; +} + int ObTableLoadSchema::get_user_column_schemas(const ObTableSchema *table_schema, ObIArray &column_schemas) { @@ -155,35 +159,82 @@ int ObTableLoadSchema::get_user_column_schemas(ObSchemaGetterGuard &schema_guard { int ret = OB_SUCCESS; const ObTableSchema *table_schema = nullptr; - if (OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id || OB_INVALID_ID == table_id)) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid args", KR(ret), K(tenant_id), K(table_id)); - } else if (OB_FAIL(schema_guard.get_table_schema(tenant_id, table_id, table_schema))) { - LOG_WARN("fail to get table schema", KR(ret), K(tenant_id), K(table_id)); - } else if (OB_ISNULL(table_schema)) { - ret = OB_TABLE_NOT_EXIST; - LOG_WARN("table not exist", KR(ret), K(tenant_id), K(table_id)); + if (OB_FAIL(get_table_schema(schema_guard, tenant_id, table_id, table_schema))) { + LOG_WARN("fail to get table schema", KR(ret)); } else { ret = get_user_column_schemas(table_schema, column_schemas); } return ret; } +int ObTableLoadSchema::get_user_column_ids(const ObTableSchema *table_schema, + ObIArray &column_ids) +{ + int ret = OB_SUCCESS; + column_ids.reset(); + ObArray column_schemas; + if (OB_FAIL(get_user_column_schemas(table_schema, column_schemas))) { + LOG_WARN("fail to get user column schemas", KR(ret)); + } + for (int64_t i = 0; OB_SUCC(ret) && i < column_schemas.count(); ++i) { + const ObColumnSchemaV2 *column_schema = column_schemas.at(i); + if (OB_FAIL(column_ids.push_back(column_schema->get_column_id()))) { + LOG_WARN("fail to push back column id", KR(ret)); + } + } + return ret; +} + int ObTableLoadSchema::get_user_column_ids(ObSchemaGetterGuard &schema_guard, uint64_t tenant_id, uint64_t table_id, ObIArray &column_ids) { int ret = OB_SUCCESS; - column_ids.reset(); + const ObTableSchema *table_schema = nullptr; + if (OB_FAIL(get_table_schema(schema_guard, tenant_id, table_id, table_schema))) { + LOG_WARN("fail to get table schema", KR(ret)); + } else { + ret = get_user_column_ids(table_schema, column_ids); + } + return ret; +} + +int ObTableLoadSchema::get_user_column_names(const ObTableSchema *table_schema, + ObIArray &column_names) +{ + int ret = OB_SUCCESS; + column_names.reset(); ObArray column_schemas; - if (OB_FAIL(get_user_column_schemas(schema_guard, tenant_id, table_id, column_schemas))) { + if (OB_FAIL(get_user_column_schemas(table_schema, column_schemas))) { + LOG_WARN("fail to get user column schemas", KR(ret)); + } + for (int64_t i = 0; OB_SUCC(ret) && i < column_schemas.count(); ++i) { + const ObColumnSchemaV2 *column_schema = column_schemas.at(i); + if (OB_FAIL(column_names.push_back(column_schema->get_column_name_str()))) { + LOG_WARN("fail to push back column name", KR(ret)); + } + } + return ret; +} + +int ObTableLoadSchema::get_user_column_id_and_names(const ObTableSchema *table_schema, + ObIArray &column_ids, + ObIArray &column_names) +{ + int ret = OB_SUCCESS; + column_ids.reset(); + column_names.reset(); + ObArray column_schemas; + if (OB_FAIL(get_user_column_schemas(table_schema, column_schemas))) { LOG_WARN("fail to get user column schemas", KR(ret)); } for (int64_t i = 0; OB_SUCC(ret) && i < column_schemas.count(); ++i) { const ObColumnSchemaV2 *column_schema = column_schemas.at(i); if (OB_FAIL(column_ids.push_back(column_schema->get_column_id()))) { LOG_WARN("fail to push back column id", KR(ret)); + } else if (OB_FAIL(column_names.push_back(column_schema->get_column_name_str()))) { + LOG_WARN("fail to push back column name", KR(ret)); } } return ret; @@ -205,6 +256,31 @@ int ObTableLoadSchema::get_user_column_count(ObSchemaGetterGuard &schema_guard, return ret; } +int ObTableLoadSchema::get_column_ids(const ObTableSchema *table_schema, + ObIArray &column_ids, + bool contain_hidden_pk_column) +{ + int ret = OB_SUCCESS; + column_ids.reset(); + if (OB_ISNULL(table_schema)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid args", KR(ret), KP(table_schema)); + } else { + ObArray column_descs; + if (OB_FAIL(table_schema->get_column_ids(column_descs))) { + STORAGE_LOG(WARN, "fail to get column descs", KR(ret), KPC(table_schema)); + } + for (int64_t i = 0; OB_SUCC(ret) && i < column_descs.count(); ++i) { + const ObColDesc &col_desc = column_descs.at(i); + if (ObColumnSchemaV2::is_hidden_pk_column_id(col_desc.col_id_) && !contain_hidden_pk_column) { + } else if (OB_FAIL(column_ids.push_back(col_desc.col_id_))) { + LOG_WARN("failed to push back column id", KR(ret), K(i)); + } + } + } + return ret; +} + int ObTableLoadSchema::get_column_ids(ObSchemaGetterGuard &schema_guard, uint64_t tenant_id, uint64_t table_id, @@ -218,26 +294,13 @@ int ObTableLoadSchema::get_column_ids(ObSchemaGetterGuard &schema_guard, LOG_WARN("invalid args", KR(ret), K(tenant_id), K(table_id)); } else { const ObTableSchema *table_schema = nullptr; - ObArray column_descs; if (OB_FAIL(schema_guard.get_table_schema(tenant_id, table_id, table_schema))) { LOG_WARN("fail to get table schema", KR(ret), K(tenant_id), K(table_id)); } else if (OB_ISNULL(table_schema)) { ret = OB_TABLE_NOT_EXIST; LOG_WARN("table not exist", KR(ret), K(tenant_id), K(table_id)); - } else if (OB_FAIL(table_schema->get_column_ids(column_descs))) { - STORAGE_LOG(WARN, "fail to get column descs", KR(ret), KPC(table_schema)); - } - for (int64_t i = 0; OB_SUCC(ret) && i < column_descs.count(); ++i) { - const ObColDesc &col_desc = column_descs.at(i); - const ObColumnSchemaV2 *col_schema = table_schema->get_column_schema(col_desc.col_id_); - if (OB_ISNULL(col_schema)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("unexpected null column schema", KR(ret), K(col_desc)); - } else if (ObColumnSchemaV2::is_hidden_pk_column_id(col_schema->get_column_id()) && - !contain_hidden_pk_column) { - } else if (OB_FAIL(column_ids.push_back(col_schema->get_column_id()))) { - LOG_WARN("failed to push back column id", KR(ret), K(i)); - } + } else { + ret = get_column_ids(table_schema, column_ids, contain_hidden_pk_column); } } return ret; @@ -341,6 +404,24 @@ int ObTableLoadSchema::check_has_unused_column(const ObTableSchema *table_schema return ret; } +int ObTableLoadSchema::check_has_roaringbitmap_column(const ObTableSchema *table_schema, bool &bret) +{ + int ret = OB_SUCCESS; + bret = false; + for (ObTableSchema::const_column_iterator iter = table_schema->column_begin(); + OB_SUCC(ret) && iter != table_schema->column_end(); ++iter) { + ObColumnSchemaV2 *column_schema = *iter; + if (OB_ISNULL(column_schema)) { + ret = OB_ERR_UNEXPECTED; + LOG_ERROR("invalid column schema", K(column_schema)); + } else if (column_schema->is_roaringbitmap()) { + bret = true; + break; + } + } + return ret; +} + int ObTableLoadSchema::check_has_lob_column(const ObTableSchema *table_schema, bool &bret) { int ret = OB_SUCCESS; diff --git a/src/observer/table_load/ob_table_load_schema.h b/src/observer/table_load/ob_table_load_schema.h index 86cd7ab105..6a06360791 100644 --- a/src/observer/table_load/ob_table_load_schema.h +++ b/src/observer/table_load/ob_table_load_schema.h @@ -29,29 +29,43 @@ class ObTableLoadSchema { public: static int get_schema_guard(uint64_t tenant_id, share::schema::ObSchemaGetterGuard &schema_guard); - static int get_table_schema(uint64_t tenant_id, uint64_t database_id, + static int get_table_schema(share::schema::ObSchemaGetterGuard &schema_guard, + uint64_t tenant_id, + uint64_t database_id, const common::ObString &table_name, - share::schema::ObSchemaGetterGuard &schema_guard, const share::schema::ObTableSchema *&table_schema); - static int get_table_id(uint64_t tenant_id, uint64_t database_id, - const common::ObString &table_name, uint64_t &table_id); + // 获取最新schema_guard和table_schema static int get_table_schema(uint64_t tenant_id, uint64_t table_id, share::schema::ObSchemaGetterGuard &schema_guard, const share::schema::ObTableSchema *&table_schema); + // 指定schema_guard获取table_schema + static int get_table_schema(share::schema::ObSchemaGetterGuard &schema_guard, + uint64_t tenant_id, uint64_t table_id, + const share::schema::ObTableSchema *&table_schema); static int get_user_column_schemas(const share::schema::ObTableSchema *table_schema, ObIArray &column_schemas); static int get_user_column_schemas(share::schema::ObSchemaGetterGuard &schema_guard, uint64_t tenant_id, uint64_t table_id, ObIArray &column_schemas); + static int get_user_column_ids(const share::schema::ObTableSchema *table_schema, + common::ObIArray &column_ids); static int get_user_column_ids(share::schema::ObSchemaGetterGuard &schema_guard, uint64_t tenant_id, uint64_t table_id, common::ObIArray &column_ids); + static int get_user_column_names(const share::schema::ObTableSchema *table_schema, + common::ObIArray &column_names); + static int get_user_column_id_and_names(const share::schema::ObTableSchema *table_schema, + common::ObIArray &column_ids, + common::ObIArray &column_names); static int get_user_column_count(share::schema::ObSchemaGetterGuard &schema_guard, uint64_t tenant_id, uint64_t table_id, int64_t &column_count); + static int get_column_ids(const share::schema::ObTableSchema *table_schema, + common::ObIArray &column_ids, + bool contain_hidden_pk_column = false); static int get_column_ids(share::schema::ObSchemaGetterGuard &schema_guard, uint64_t tenant_id, uint64_t table_id, @@ -61,6 +75,7 @@ public: static int get_tenant_optimizer_gather_stats_on_load(const uint64_t tenant_id, bool &value); static int check_has_invisible_column(const share::schema::ObTableSchema *table_schema, bool &bret); static int check_has_unused_column(const share::schema::ObTableSchema *table_schema, bool &bret); + static int check_has_roaringbitmap_column(const share::schema::ObTableSchema *table_schema, bool &bret); static int check_has_lob_column(const share::schema::ObTableSchema *table_schema, bool &bret); static int get_table_compressor_type(uint64_t tenant_id, uint64_t table_id, ObCompressorType &compressor_type); diff --git a/src/observer/table_load/ob_table_load_service.cpp b/src/observer/table_load/ob_table_load_service.cpp index e5542401ef..73f7b940c7 100644 --- a/src/observer/table_load/ob_table_load_service.cpp +++ b/src/observer/table_load/ob_table_load_service.cpp @@ -350,7 +350,7 @@ void ObTableLoadService::ObClientTaskAutoAbortTask::runTimerTask() for (int64_t i = 0; i < client_task_array.count(); ++i) { ObTableLoadClientTask *client_task = client_task_array.at(i); if (OB_UNLIKELY(ObTableLoadClientStatus::ERROR == client_task->get_status() || - client_task->get_exec_ctx()->check_status() != OB_SUCCESS)) { + client_task->check_status() != OB_SUCCESS)) { client_task->abort(); } ObTableLoadClientService::revert_task(client_task); @@ -433,7 +433,8 @@ int ObTableLoadService::check_support_direct_load( const uint64_t table_id, const ObDirectLoadMethod::Type method, const ObDirectLoadInsertMode::Type insert_mode, - const storage::ObDirectLoadMode::Type load_mode) + const ObDirectLoadMode::Type load_mode, + const ObIArray &column_ids) { int ret = OB_SUCCESS; if (OB_UNLIKELY(OB_INVALID_ID == table_id)) { @@ -447,7 +448,7 @@ int ObTableLoadService::check_support_direct_load( ObTableLoadSchema::get_table_schema(tenant_id, table_id, schema_guard, table_schema))) { LOG_WARN("fail to get table schema", KR(ret), K(tenant_id), K(table_id)); } else { - ret = check_support_direct_load(schema_guard, table_schema, method, insert_mode, load_mode); + ret = check_support_direct_load(schema_guard, table_schema, method, insert_mode, load_mode, column_ids); } } return ret; @@ -457,7 +458,8 @@ int ObTableLoadService::check_support_direct_load(ObSchemaGetterGuard &schema_gu uint64_t table_id, const ObDirectLoadMethod::Type method, const ObDirectLoadInsertMode::Type insert_mode, - const storage::ObDirectLoadMode::Type load_mode) + const ObDirectLoadMode::Type load_mode, + const ObIArray &column_ids) { int ret = OB_SUCCESS; if (OB_UNLIKELY(OB_INVALID_ID == table_id)) { @@ -472,7 +474,7 @@ int ObTableLoadService::check_support_direct_load(ObSchemaGetterGuard &schema_gu ret = OB_TABLE_NOT_EXIST; LOG_WARN("table schema is null", KR(ret)); } else { - ret = check_support_direct_load(schema_guard, table_schema, method, insert_mode, load_mode); + ret = check_support_direct_load(schema_guard, table_schema, method, insert_mode, load_mode, column_ids); } } return ret; @@ -484,15 +486,17 @@ int ObTableLoadService::check_support_direct_load(ObSchemaGetterGuard &schema_gu const ObTableSchema *table_schema, const ObDirectLoadMethod::Type method, const ObDirectLoadInsertMode::Type insert_mode, - const storage::ObDirectLoadMode::Type load_mode) + const ObDirectLoadMode::Type load_mode, + const ObIArray &column_ids) { int ret = OB_SUCCESS; if (OB_UNLIKELY(nullptr == table_schema || !ObDirectLoadMethod::is_type_valid(method) || - !ObDirectLoadInsertMode::is_type_valid(insert_mode)) || - !ObDirectLoadMode::is_type_valid(load_mode)) { + !ObDirectLoadInsertMode::is_type_valid(insert_mode) || + !ObDirectLoadMode::is_type_valid(load_mode) || + column_ids.empty())) { ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid args", KR(ret), KP(table_schema), K(method), K(insert_mode)); + LOG_WARN("invalid args", KR(ret), KP(table_schema), K(method), K(insert_mode), K(column_ids)); } else { const uint64_t tenant_id = MTL_ID(); bool trigger_enabled = false; @@ -501,6 +505,7 @@ int ObTableLoadService::check_support_direct_load(ObSchemaGetterGuard &schema_gu bool has_multivalue_index = false; bool has_invisible_column = false; bool has_unused_column = false; + bool has_roaringbitmap_column = false; // check if it is a user table const char *tmp_prefix = ObDirectLoadMode::is_insert_overwrite(load_mode) ? InsertOverwritePrefix : EmptyPrefix; @@ -574,6 +579,14 @@ int ObTableLoadService::check_support_direct_load(ObSchemaGetterGuard &schema_gu LOG_WARN("direct-load does not support table has unused column", KR(ret)); FORWARD_USER_ERROR_MSG(ret, "%sdirect-load does not support table has unused column", tmp_prefix); } + // check has roaringbitmap column + else if (OB_FAIL(ObTableLoadSchema::check_has_roaringbitmap_column(table_schema, has_roaringbitmap_column))) { + LOG_WARN("fail to check has roaringbitmap column", KR(ret)); + } else if (has_roaringbitmap_column) { + ret = OB_NOT_SUPPORTED; + LOG_WARN("direct-load does not support table has roaringbitmap column", KR(ret)); + FORWARD_USER_ERROR_MSG(ret, "%sdirect-load does not support table has roaringbitmap column", tmp_prefix); + } // check if table has mlog else if (table_schema->has_mlog_table()) { ret = OB_NOT_SUPPORTED; @@ -623,6 +636,56 @@ int ObTableLoadService::check_support_direct_load(ObSchemaGetterGuard &schema_gu } } } + // check default column + if (OB_SUCC(ret)) { + ObArray column_descs; + if (OB_FAIL(table_schema->get_column_ids(column_descs))) { + STORAGE_LOG(WARN, "fail to get column descs", KR(ret), KPC(table_schema)); + } else if (column_ids.count() == (table_schema->is_heap_table() ? column_descs.count() - 1 + : column_descs.count())) { + // non default column + } else { + for (int64_t i = 0; OB_SUCC(ret) && i < column_descs.count(); ++i) { + const ObColDesc &col_desc = column_descs.at(i); + bool found_column = ObColumnSchemaV2::is_hidden_pk_column_id(col_desc.col_id_); + for (int64_t j = 0; !found_column && j < column_ids.count(); ++j) { + if (col_desc.col_id_ == column_ids.at(j)) { + found_column = true; + } + } + if (!found_column) { + const ObColumnSchemaV2 *column_schema = table_schema->get_column_schema(col_desc.col_id_); + if (OB_ISNULL(column_schema)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected null column schema", KR(ret), K(col_desc)); + } + // 自增列 + else if (column_schema->is_autoincrement() || column_schema->is_identity_column()) { + } + // 默认值是表达式 + else if (OB_UNLIKELY(lib::is_mysql_mode() && column_schema->get_cur_default_value().is_ext())) { + ret = OB_NOT_SUPPORTED; + LOG_WARN("direct-load does not support column default value is ext", KR(ret), + KPC(column_schema), K(column_schema->get_cur_default_value())); + FORWARD_USER_ERROR_MSG(ret, "direct-load does not support column default value is ext"); + } else if (OB_UNLIKELY(lib::is_oracle_mode() && column_schema->is_default_expr_v2_column())) { + ret = OB_NOT_SUPPORTED; + LOG_WARN("direct-load does not support column default value is expr", KR(ret), + KPC(column_schema), K(column_schema->get_cur_default_value())); + FORWARD_USER_ERROR_MSG(ret, "direct-load does not support column default value is expr"); + } + // 没有默认值, 且为NOT NULL + // 例外:枚举类型默认为第一个 + else if (OB_UNLIKELY(column_schema->is_not_null_for_write() && + column_schema->get_cur_default_value().is_null() && + !column_schema->get_meta_type().is_enum())) { + ret = OB_ERR_NO_DEFAULT_FOR_FIELD; + LOG_WARN("column doesn't have a default value", KR(ret), KPC(column_schema)); + } + } + } + } + } } return ret; } diff --git a/src/observer/table_load/ob_table_load_service.h b/src/observer/table_load/ob_table_load_service.h index 2e2c8655d9..aacc14db58 100644 --- a/src/observer/table_load/ob_table_load_service.h +++ b/src/observer/table_load/ob_table_load_service.h @@ -39,18 +39,21 @@ public: static int check_support_direct_load(uint64_t table_id, const storage::ObDirectLoadMethod::Type method, const storage::ObDirectLoadInsertMode::Type insert_mode, - const storage::ObDirectLoadMode::Type load_mode); + const storage::ObDirectLoadMode::Type load_mode, + const common::ObIArray &column_ids); // 业务层指定schema_guard进行检查 static int check_support_direct_load(share::schema::ObSchemaGetterGuard &schema_guard, uint64_t table_id, const storage::ObDirectLoadMethod::Type method, const storage::ObDirectLoadInsertMode::Type insert_mode, - const storage::ObDirectLoadMode::Type load_mode); + const storage::ObDirectLoadMode::Type load_mode, + const common::ObIArray &column_ids); static int check_support_direct_load(share::schema::ObSchemaGetterGuard &schema_guard, const share::schema::ObTableSchema *table_schema, const storage::ObDirectLoadMethod::Type method, const storage::ObDirectLoadInsertMode::Type insert_mode, - const storage::ObDirectLoadMode::Type load_mode); + const storage::ObDirectLoadMode::Type load_mode, + const common::ObIArray &column_ids); static ObTableLoadTableCtx *alloc_ctx(); static void free_ctx(ObTableLoadTableCtx *table_ctx); static int add_ctx(ObTableLoadTableCtx *table_ctx); diff --git a/src/observer/table_load/ob_table_load_store_ctx.cpp b/src/observer/table_load/ob_table_load_store_ctx.cpp index 7ae20591e6..0a983b7869 100644 --- a/src/observer/table_load/ob_table_load_store_ctx.cpp +++ b/src/observer/table_load/ob_table_load_store_ctx.cpp @@ -103,7 +103,9 @@ int ObTableLoadStoreCtx::init( if (OB_SUCC(ret)) { table_data_desc_.rowkey_column_num_ = (!ctx_->schema_.is_heap_table_ ? ctx_->schema_.rowkey_column_count_ : 0); - table_data_desc_.column_count_ = ctx_->param_.column_count_; + table_data_desc_.column_count_ = + (!ctx_->schema_.is_heap_table_ ? ctx_->schema_.store_column_count_ + : ctx_->schema_.store_column_count_ - 1); table_data_desc_.external_data_block_size_ = ObDirectLoadDataBlock::DEFAULT_DATA_BLOCK_SIZE; table_data_desc_.sstable_index_block_size_ = ObDirectLoadSSTableIndexBlock::DEFAULT_INDEX_BLOCK_SIZE; diff --git a/src/observer/table_load/ob_table_load_table_ctx.cpp b/src/observer/table_load/ob_table_load_table_ctx.cpp index 96a90540f5..3e677ea046 100644 --- a/src/observer/table_load/ob_table_load_table_ctx.cpp +++ b/src/observer/table_load/ob_table_load_table_ctx.cpp @@ -69,11 +69,6 @@ int ObTableLoadTableCtx::init(const ObTableLoadParam ¶m, const ObTableLoadDD if (OB_FAIL(schema_.init(param_.tenant_id_, param_.table_id_))) { LOG_WARN("fail to init table load schema", KR(ret), K(param_.tenant_id_), K(param_.table_id_)); - } else if (OB_UNLIKELY(param.column_count_ != (schema_.is_heap_table_ - ? (schema_.store_column_count_ - 1) - : schema_.store_column_count_))) { - ret = OB_SCHEMA_NOT_UPTODATE; - LOG_WARN("unexpected column count", KR(ret), K(param.column_count_), K(schema_.store_column_count_), K(schema_.is_heap_table_)); } else if (OB_FAIL(task_allocator_.init("TLD_TaskPool", param_.tenant_id_))) { LOG_WARN("fail to init allocator", KR(ret)); } else if (OB_FAIL(trans_ctx_allocator_.init("TLD_TCtxPool", param_.tenant_id_))) { diff --git a/src/observer/table_load/ob_table_load_trans_bucket_writer.cpp b/src/observer/table_load/ob_table_load_trans_bucket_writer.cpp index 4960a58734..076b578799 100644 --- a/src/observer/table_load/ob_table_load_trans_bucket_writer.cpp +++ b/src/observer/table_load/ob_table_load_trans_bucket_writer.cpp @@ -67,6 +67,7 @@ ObTableLoadTransBucketWriter::ObTableLoadTransBucketWriter(ObTableLoadTransCtx * param_(trans_ctx_->ctx_->param_), allocator_("TLD_TBWriter"), is_partitioned_(false), + column_count_(0), cast_mode_(CM_NONE), session_ctx_array_(nullptr), ref_count_(0), @@ -98,7 +99,10 @@ int ObTableLoadTransBucketWriter::init() ret = OB_ERR_UNEXPECTED; LOG_WARN("unexpected null coordinator ctx", KR(ret)); } else { - is_partitioned_ = coordinator_ctx_->ctx_->schema_.is_partitioned_table_; + const ObTableLoadSchema &schema = coordinator_ctx_->ctx_->schema_; + is_partitioned_ = schema.is_partitioned_table_; + column_count_ = + (!schema.is_heap_table_ ? schema.store_column_count_ : schema.store_column_count_ - 1); if (OB_FAIL(ObSQLUtils::get_default_cast_mode(coordinator_ctx_->ctx_->session_info_, cast_mode_))) { LOG_WARN("fail to get_default_cast_mode", KR(ret)); } else if (OB_FAIL(init_session_ctx_array())) { @@ -229,19 +233,27 @@ int ObTableLoadTransBucketWriter::handle_partition_with_autoinc_identity( for (int64_t j = 0; OB_SUCC(ret) && j < row_count; ++j) { ObStorageDatum storage_datum; ObTableLoadObjRow &obj_row = obj_rows.at(j); - out_obj.set_null(); const ObTableLoadPartitionCalc::IndexAndType &index_and_type = coordinator_ctx_->partition_calc_.part_key_obj_index_.at( coordinator_ctx_->partition_calc_.partition_with_autoinc_idx_); const ObColumnSchemaV2 *column_schema = index_and_type.column_schema_; const int64_t obj_index = index_and_type.index_; - if (OB_UNLIKELY(obj_index >= param_.column_count_)) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid length", KR(ret), K(obj_index), K(param_.column_count_)); - } else if (!obj_row.cells_[obj_index].is_null() && - OB_FAIL(ObTableLoadObjCaster::cast_obj(cast_obj_ctx, index_and_type.column_schema_, - obj_row.cells_[obj_index], out_obj))) { + const ObObj &obj = obj_row.cells_[obj_index]; + if (OB_UNLIKELY(obj_row.count_ != column_count_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected column count not match", KR(ret), K(obj_row), K(column_count_)); + } else if (OB_UNLIKELY(obj_index < 0 || obj_index >= column_count_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected obj index", KR(ret), K(index_and_type), K(column_count_)); + } else if (obj.is_null() || obj.is_nop_value()) { + out_obj = obj; + } else if (OB_FAIL(ObTableLoadObjCaster::cast_obj(cast_obj_ctx, + column_schema, + obj, + out_obj))) { LOG_WARN("fail to cast obj", KR(ret)); + } + if (OB_FAIL(ret)) { } else if (OB_FAIL(storage_datum.from_obj_enhance(out_obj))) { LOG_WARN("fail to from obj enhance", KR(ret), K(out_obj)); } else if (column_schema->is_autoincrement() && @@ -283,19 +295,22 @@ int ObTableLoadTransBucketWriter::handle_identity_column(const ObColumnSchemaV2 ObArenaAllocator &cast_allocator) { int ret = OB_SUCCESS; - if (column_schema->is_always_identity_column()) { - ret = OB_NOT_SUPPORTED; - LOG_WARN("direct-load does not support always identity column", KR(ret)); - FORWARD_USER_ERROR_MSG(ret, "direct-load does not support always identity column"); - } else if (column_schema->is_default_identity_column() && datum.is_null()) { - ret = OB_ERR_INVALID_NOT_NULL_CONSTRAINT_ON_IDENTITY_COLUMN; - LOG_WARN("default identity column has null value", KR(ret)); - } else if (column_schema->is_default_on_null_identity_column()) { + // 1. generated always as identity : 不能指定此列导入 + // 2. generated by default as identity : 不指定时自动生成, 不能导入null + // 3. generated by default on null as identity : 不指定或者指定null会自动生成 + if (OB_UNLIKELY(column_schema->is_always_identity_column() && !datum.is_nop())) { + ret = OB_ERR_INSERT_INTO_GENERATED_ALWAYS_IDENTITY_COLUMN; + LOG_USER_ERROR(OB_ERR_INSERT_INTO_GENERATED_ALWAYS_IDENTITY_COLUMN); + } else if (OB_UNLIKELY(column_schema->is_default_identity_column() && datum.is_null())) { + ret = OB_BAD_NULL_ERROR; + LOG_WARN("default identity column cannot insert null", KR(ret)); + } else if (datum.is_nop() || datum.is_null()) { ObSequenceValue seq_value; - if (OB_FAIL(share::ObSequenceCache::get_instance().nextval(coordinator_ctx_->sequence_schema_, - cast_allocator, seq_value))) { + if (OB_FAIL(ObSequenceCache::get_instance().nextval(coordinator_ctx_->sequence_schema_, + cast_allocator, + seq_value))) { LOG_WARN("fail get nextval for seq", KR(ret)); - } else if (datum.is_null()) { + } else { datum.set_number(seq_value.val()); } } @@ -311,9 +326,9 @@ int ObTableLoadTransBucketWriter::write_for_non_partitioned(SessionContext &sess for (int64_t i = 0; OB_SUCC(ret) && i < row_count; ++i) { const ObTableLoadObjRow &row = obj_rows.at(i); bool need_write = false; - if (OB_UNLIKELY(row.count_ != param_.column_count_)) { + if (OB_UNLIKELY(row.count_ != column_count_)) { ret = OB_ERR_UNEXPECTED; - LOG_WARN("unexpected column count not match", KR(ret), K(row.count_), K(param_.column_count_)); + LOG_WARN("unexpected column count not match", KR(ret), K(row), K(column_count_)); } else if (OB_FAIL(load_bucket->add_row(session_ctx.partition_id_.tablet_id_, row, param_.batch_size_, @@ -346,9 +361,9 @@ int ObTableLoadTransBucketWriter::write_for_partitioned(SessionContext &session_ ObNewRow part_key; part_key.count_ = part_key_obj_count; part_key.cells_ = static_cast(allocator.alloc(sizeof(ObObj) * part_key_obj_count)); - if (OB_UNLIKELY(row.count_ != param_.column_count_)) { + if (OB_UNLIKELY(row.count_ != column_count_)) { ret = OB_ERR_UNEXPECTED; - LOG_WARN("unexpected column count not match", KR(ret), K(row.count_), K(param_.column_count_)); + LOG_WARN("unexpected column count not match", KR(ret), K(row), K(column_count_)); } else if (OB_ISNULL(part_key.cells_)) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("fail to alloc memory", KR(ret)); diff --git a/src/observer/table_load/ob_table_load_trans_bucket_writer.h b/src/observer/table_load/ob_table_load_trans_bucket_writer.h index 52fb990214..86123cc14d 100644 --- a/src/observer/table_load/ob_table_load_trans_bucket_writer.h +++ b/src/observer/table_load/ob_table_load_trans_bucket_writer.h @@ -70,6 +70,7 @@ private: const ObTableLoadParam ¶m_; common::ObArenaAllocator allocator_; bool is_partitioned_; + int64_t column_count_; common::ObCastMode cast_mode_; struct SessionContext { diff --git a/src/observer/table_load/ob_table_load_trans_store.cpp b/src/observer/table_load/ob_table_load_trans_store.cpp index c378854e3f..80ca5f24e9 100644 --- a/src/observer/table_load/ob_table_load_trans_store.cpp +++ b/src/observer/table_load/ob_table_load_trans_store.cpp @@ -439,20 +439,7 @@ int ObTableLoadTransStoreWriter::cast_column( (obj.is_null() || obj.is_nop_value()); ObObj out_obj; if (is_null_autoinc) { - out_obj.set_null(); - } else if (obj.is_nop_value()) { - if (column_schema->is_not_null_for_write() && - column_schema->get_cur_default_value().is_null()) { - if (column_schema->get_meta_type().is_enum()) { - const uint64_t ENUM_FIRST_VAL = 1; - out_obj.set_enum(ENUM_FIRST_VAL); - } else { - ret = OB_ERR_NO_DEFAULT_FOR_FIELD; - LOG_WARN("column can not be null", KR(ret), KPC(column_schema)); - } - } else { - out_obj = column_schema->get_cur_default_value(); - } + out_obj = obj; } else if (OB_FAIL(ObTableLoadObjCaster::cast_obj(cast_obj_ctx, column_schema, obj, out_obj))) { LOG_WARN("fail to cast obj and check", KR(ret), K(obj)); } @@ -491,19 +478,22 @@ int ObTableLoadTransStoreWriter::handle_identity_column(const ObColumnSchemaV2 * ObArenaAllocator &cast_allocator) { int ret = OB_SUCCESS; - if (column_schema->is_always_identity_column()) { - ret = OB_NOT_SUPPORTED; - LOG_WARN("direct-load does not support always identity column", KR(ret)); - FORWARD_USER_ERROR_MSG(ret, "direct-load does not support always identity column"); - } else if (column_schema->is_default_identity_column() && datum.is_null()) { - ret = OB_ERR_INVALID_NOT_NULL_CONSTRAINT_ON_IDENTITY_COLUMN; - LOG_WARN("default identity column has null value", KR(ret)); - } else if (column_schema->is_default_on_null_identity_column()) { + // 1. generated always as identity : 不能指定此列导入 + // 2. generated by default as identity : 不指定时自动生成, 不能导入null + // 3. generated by default on null as identity : 不指定或者指定null会自动生成 + if (OB_UNLIKELY(column_schema->is_always_identity_column() && !datum.is_nop())) { + ret = OB_ERR_INSERT_INTO_GENERATED_ALWAYS_IDENTITY_COLUMN; + LOG_USER_ERROR(OB_ERR_INSERT_INTO_GENERATED_ALWAYS_IDENTITY_COLUMN); + } else if (OB_UNLIKELY(column_schema->is_default_identity_column() && datum.is_null())) { + ret = OB_BAD_NULL_ERROR; + LOG_WARN("default identity column cannot insert null", KR(ret)); + } else if (datum.is_nop() || datum.is_null()) { ObSequenceValue seq_value; - if (OB_FAIL(share::ObSequenceCache::get_instance().nextval( - trans_ctx_->ctx_->store_ctx_->sequence_schema_, cast_allocator, seq_value))) { + if (OB_FAIL(ObSequenceCache::get_instance().nextval(trans_ctx_->ctx_->store_ctx_->sequence_schema_, + cast_allocator, + seq_value))) { LOG_WARN("fail get nextval for seq", KR(ret)); - } else if (datum.is_null()) { + } else { datum.set_number(seq_value.val()); } } diff --git a/src/share/table/ob_table_load_row.h b/src/share/table/ob_table_load_row.h index ddca73e1d9..b5b78a862c 100644 --- a/src/share/table/ob_table_load_row.h +++ b/src/share/table/ob_table_load_row.h @@ -152,19 +152,18 @@ template int ObTableLoadRow::project(const ObIArray &idx_projector, ObTableLoadRow &projected_row) const { int ret = OB_SUCCESS; - if (OB_UNLIKELY(idx_projector.count() != count_)) { - ret = OB_ERR_UNEXPECTED; - OB_LOG(WARN, "unexpected count", KR(ret), K(idx_projector), K(count_)); - } else if (OB_FAIL(projected_row.init(count_, allocator_handle_))) { - OB_LOG(WARN, "failed to alloate cells", KR(ret), K(projected_row.count_)); + if (OB_FAIL(projected_row.init(idx_projector.count(), allocator_handle_))) { + OB_LOG(WARN, "failed to alloate cells", KR(ret), K(idx_projector.count())); } else { - for (int64_t j = 0; j < count_; ++j) { - const int64_t idx = idx_projector.at(j); - if (OB_UNLIKELY(idx < 0 || idx >= count_)) { + for (int64_t i = 0; i < idx_projector.count(); ++i) { + const int64_t idx = idx_projector.at(i); + if (idx < 0) { + projected_row.cells_[i].set_nop_value(); + } else if (OB_UNLIKELY(idx >= count_)) { ret = OB_ERR_UNEXPECTED; - OB_LOG(WARN, "unexpected idx", KR(ret), K(j), K(idx), K(idx_projector)); + OB_LOG(WARN, "unexpected idx", KR(ret), K(i), K(idx), K(idx_projector), K(count_)); } else { - projected_row.cells_[j] = cells_[idx]; + projected_row.cells_[i] = cells_[idx]; } } projected_row.seq_no_ = seq_no_; diff --git a/src/sql/engine/cmd/ob_load_data_direct_impl.cpp b/src/sql/engine/cmd/ob_load_data_direct_impl.cpp index 751c56999f..2b91955349 100644 --- a/src/sql/engine/cmd/ob_load_data_direct_impl.cpp +++ b/src/sql/engine/cmd/ob_load_data_direct_impl.cpp @@ -2218,7 +2218,8 @@ int ObLoadDataDirectImpl::execute(ObExecContext &ctx, ObLoadDataStmt &load_stmt) execute_param_.table_id_, execute_param_.method_, execute_param_.insert_mode_, - ObDirectLoadMode::LOAD_DATA))) { + ObDirectLoadMode::LOAD_DATA, + execute_param_.column_ids_))) { LOG_WARN("fail to check support direct load", KR(ret)); } else if (OB_FAIL(init_execute_context())) { LOG_WARN("fail to init execute context", KR(ret), K(ctx), K(load_stmt)); @@ -2425,31 +2426,55 @@ int ObLoadDataDirectImpl::init_execute_param() ObSchemaGetterGuard *schema_guard = ctx_->get_sql_ctx()->schema_guard_; int64_t column_count = 0; execute_param_.column_ids_.reset(); - if (is_backup) { + if (is_backup) { // 备份数据导入 if (OB_FAIL(ObTableLoadSchema::get_column_ids(*schema_guard, - execute_param_.tenant_id_, - execute_param_.table_id_, - execute_param_.column_ids_))) { - LOG_WARN("fail to get column ids for backup", KR(ret)); + execute_param_.tenant_id_, + execute_param_.table_id_, + execute_param_.column_ids_))) { + LOG_WARN("fail to get column ids for backup", KR(ret)); } - } else { - if (OB_FAIL(ObTableLoadSchema::get_user_column_count(*schema_guard, - execute_param_.tenant_id_, - execute_param_.table_id_, - column_count))) { - LOG_WARN("fail to get user column count", KR(ret)); - } else if (OB_UNLIKELY(column_count != field_or_var_list.count())) { - ret = OB_NOT_SUPPORTED; - LOG_WARN("not contain all columns is not supported", KR(ret), K(column_count), - K(field_or_var_list)); + } else if (load_stmt_->get_default_table_columns()) { // 默认列导入 + if (OB_FAIL(ObTableLoadSchema::get_user_column_ids(*schema_guard, + execute_param_.tenant_id_, + execute_param_.table_id_, + execute_param_.column_ids_))) { + LOG_WARN("fail to get user column ids", KR(ret)); + } + } else { // 指定列导入 + const static uint64_t INVALID_COLUMN_ID = UINT64_MAX; + const ObTableSchema *table_schema = nullptr; + ObArray user_column_ids; + if (OB_FAIL(ObTableLoadSchema::get_table_schema(*schema_guard, + execute_param_.tenant_id_, + execute_param_.table_id_, + table_schema))) { + LOG_WARN("fail to get table schema", KR(ret), K(execute_param_)); + } else if (OB_FAIL(ObTableLoadSchema::get_user_column_ids(table_schema, user_column_ids))) { + LOG_WARN("fail to get user column ids", KR(ret)); } for (int64_t i = 0; OB_SUCC(ret) && i < field_or_var_list.count(); ++i) { const ObLoadDataStmt::FieldOrVarStruct &field_or_var_struct = field_or_var_list.at(i); if (OB_UNLIKELY(!field_or_var_struct.is_table_column_)) { ret = OB_NOT_SUPPORTED; - LOG_WARN("var is not supported", KR(ret), K(field_or_var_struct), K(i), K(field_or_var_list)); - } else if (OB_FAIL(execute_param_.column_ids_.push_back(field_or_var_struct.column_id_))) { - LOG_WARN("fail to push back column id", KR(ret)); + LOG_WARN("var is not supported", KR(ret), K(field_or_var_struct), K(i), + K(field_or_var_list)); + } else { + const uint64_t column_id = field_or_var_struct.column_id_; + int64_t found_column_idx = -1; + for (int64_t j = 0; found_column_idx == -1 && j < user_column_ids.count(); ++j) { + const uint64_t user_column_id = user_column_ids.at(j); + if (column_id == user_column_id) { + found_column_idx = j; + } + } + if (OB_UNLIKELY(found_column_idx == -1)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unknow column", KR(ret), K(user_column_ids), K(field_or_var_struct)); + } else if (OB_FAIL(execute_param_.column_ids_.push_back(column_id))) { + LOG_WARN("fail to push back column id", KR(ret)); + } else { + user_column_ids.at(found_column_idx) = INVALID_COLUMN_ID; + } } } } diff --git a/src/sql/engine/cmd/ob_table_direct_insert_ctx.cpp b/src/sql/engine/cmd/ob_table_direct_insert_ctx.cpp index a32fcb0c02..0d10ceeb00 100644 --- a/src/sql/engine/cmd/ob_table_direct_insert_ctx.cpp +++ b/src/sql/engine/cmd/ob_table_direct_insert_ctx.cpp @@ -79,8 +79,8 @@ int ObTableDirectInsertCtx::init( LOG_WARN("fail to new ObTableLoadInstance", KR(ret)); } else { load_exec_ctx_->exec_ctx_ = exec_ctx; + const ObTableSchema *table_schema = nullptr; ObArray column_ids; - omt::ObTenant *tenant = nullptr; ObCompressorType compressor_type = ObCompressorType::NONE_COMPRESSOR; ObDirectLoadMethod::Type method = (is_incremental ? ObDirectLoadMethod::INCREMENTAL : ObDirectLoadMethod::FULL); ObDirectLoadInsertMode::Type insert_mode = ObDirectLoadInsertMode::INVALID_INSERT_MODE; @@ -93,23 +93,21 @@ int ObTableDirectInsertCtx::init( } ObDirectLoadMode::Type load_mode = is_insert_overwrite ? ObDirectLoadMode::INSERT_OVERWRITE : ObDirectLoadMode::INSERT_INTO; bool is_heap_table = false; - if (OB_FAIL(GCTX.omt_->get_tenant(MTL_ID(), tenant))) { - LOG_WARN("fail to get tenant handle", KR(ret), K(MTL_ID())); + if (OB_FAIL(ObTableLoadSchema::get_table_schema(*schema_guard, tenant_id, table_id, table_schema))) { + LOG_WARN("fail to get table schema", KR(ret)); + } else if (OB_FAIL(ObDDLUtil::get_temp_store_compress_type(table_schema->get_compressor_type(), + parallel, + compressor_type))) { + LOG_WARN("fail to get tmp store compressor type", KR(ret)); + } else if (OB_FAIL(ObTableLoadSchema::get_column_ids(table_schema, column_ids))) { + LOG_WARN("failed to init store column idxs", KR(ret)); } else if (OB_FAIL(ObTableLoadService::check_support_direct_load(*schema_guard, - table_id, + table_schema, method, insert_mode, - load_mode))) { + load_mode, + column_ids))) { LOG_WARN("fail to check support direct load", KR(ret)); - } else if (OB_FAIL(get_compressor_type(MTL_ID(), table_id, parallel, compressor_type))) { - LOG_WARN("fail to get compressor type", KR(ret)); - } else if (OB_FAIL(ObTableLoadSchema::get_column_ids(*schema_guard, - tenant_id, - table_id, - column_ids))) { - LOG_WARN("failed to init store column idxs", KR(ret)); - } else if(OB_FAIL(get_is_heap_table(*schema_guard, tenant_id, table_id, is_heap_table))) { - LOG_WARN("failed to get is heap table", KR(ret), K(tenant_id), K(table_id)); } else { ObTableLoadParam param; param.tenant_id_ = MTL_ID(); @@ -120,7 +118,7 @@ int ObTableDirectInsertCtx::init( param.column_count_ = column_ids.count(); param.px_mode_ = true; param.online_opt_stat_gather_ = is_online_gather_statistics_; - param.need_sort_ = is_heap_table ? phy_plan.get_direct_load_need_sort() : true; + param.need_sort_ = table_schema->is_heap_table() ? phy_plan.get_direct_load_need_sort() : true; param.max_error_row_count_ = 0; param.dup_action_ = (enable_inc_replace ? sql::ObLoadDupActionType::LOAD_REPLACE : sql::ObLoadDupActionType::LOAD_STOP_ON_DUP); @@ -185,46 +183,5 @@ void ObTableDirectInsertCtx::destroy() is_online_gather_statistics_ = false; } -int ObTableDirectInsertCtx::get_compressor_type(const uint64_t tenant_id, - const uint64_t table_id, - const int64_t parallel, - ObCompressorType &compressor_type) -{ - int ret = OB_SUCCESS; - ObSchemaGetterGuard schema_guard; - const ObTableSchema *table_schema = nullptr; - if (OB_FAIL(ObMultiVersionSchemaService::get_instance().get_tenant_schema_guard(tenant_id, - schema_guard))) { - LOG_WARN("fail to get tenant schema guard", KR(ret), K(tenant_id)); - } else if (OB_FAIL(schema_guard.get_table_schema(tenant_id, table_id, table_schema))) { - LOG_WARN("fail to get table schema", KR(ret), K(tenant_id), K(table_id)); - } else if (OB_ISNULL(table_schema)) { - ret = OB_TABLE_NOT_EXIST; - LOG_WARN("table not exist", KR(ret), K(tenant_id), K(table_id)); - } else if (OB_FAIL(ObDDLUtil::get_temp_store_compress_type(table_schema->get_compressor_type(), - parallel, compressor_type))) { - LOG_WARN("fail to get tmp store compressor type", KR(ret)); - } - return ret; -} - -int ObTableDirectInsertCtx::get_is_heap_table( - ObSchemaGetterGuard &schema_guard, - const uint64_t tenant_id, - const uint64_t table_id, - bool &is_heap_table) -{ - int ret = OB_SUCCESS; - const ObTableSchema *table_schema = nullptr; - if (OB_FAIL(schema_guard.get_table_schema(tenant_id, table_id, table_schema))) { - LOG_WARN("fail to get table schema", KR(ret), K(tenant_id), K(table_id)); - } else if (OB_ISNULL(table_schema)) { - ret = OB_TABLE_NOT_EXIST; - LOG_WARN("table schema is null", KR(ret)); - } else { - is_heap_table = table_schema->is_heap_table(); - } - return ret; -} } // namespace sql } // namespace oceanbase diff --git a/src/sql/engine/cmd/ob_table_direct_insert_ctx.h b/src/sql/engine/cmd/ob_table_direct_insert_ctx.h index d1f24912fd..269ff3628e 100644 --- a/src/sql/engine/cmd/ob_table_direct_insert_ctx.h +++ b/src/sql/engine/cmd/ob_table_direct_insert_ctx.h @@ -70,13 +70,6 @@ public: return online_sample_percent_; } -private: - int get_compressor_type(const uint64_t tenant_id, const uint64_t table_id, const int64_t parallel, - ObCompressorType &compressor_type); - int get_is_heap_table(share::schema::ObSchemaGetterGuard &schema_guard, - const uint64_t tenant_id, - const uint64_t table_id, - bool &is_heap_table); private: observer::ObTableLoadExecCtx *load_exec_ctx_; observer::ObTableLoadInstance *table_load_instance_; diff --git a/tools/deploy/mysql_test/test_suite/direct_load_data/data/misc/specify_column_0.csv b/tools/deploy/mysql_test/test_suite/direct_load_data/data/misc/specify_column_0.csv new file mode 100644 index 0000000000..f72941a1ef --- /dev/null +++ b/tools/deploy/mysql_test/test_suite/direct_load_data/data/misc/specify_column_0.csv @@ -0,0 +1,3 @@ +3,1,2 +33,11,22 +333,111,222 \ No newline at end of file