diff --git a/src/observer/table/ob_table_direct_load_processor.cpp b/src/observer/table/ob_table_direct_load_processor.cpp index 5f6ad61a53..631030ab8e 100644 --- a/src/observer/table/ob_table_direct_load_processor.cpp +++ b/src/observer/table/ob_table_direct_load_processor.cpp @@ -45,6 +45,7 @@ int ObTableDirectLoadP::try_process() exec_ctx_.set_tenant_id(credential_.tenant_id_); exec_ctx_.set_user_id(credential_.user_id_); exec_ctx_.set_database_id(credential_.database_id_); + exec_ctx_.set_user_client_addr(user_client_addr_); if (OB_FAIL(ObTableLoadClientService::direct_load_operate(exec_ctx_, arg_, result_))) { LOG_WARN("fail to do direct load operate", KR(ret), K(arg_)); } diff --git a/src/observer/table_load/client/ob_table_direct_load_exec_context.h b/src/observer/table_load/client/ob_table_direct_load_exec_context.h index abaf3fd4c3..b7bdb107cd 100644 --- a/src/observer/table_load/client/ob_table_direct_load_exec_context.h +++ b/src/observer/table_load/client/ob_table_direct_load_exec_context.h @@ -13,6 +13,7 @@ #pragma once #include "lib/allocator/ob_allocator.h" +#include "lib/net/ob_addr.h" namespace oceanbase { @@ -34,6 +35,11 @@ public: uint64_t get_user_id() const { return user_id_; } void set_database_id(uint64_t database_id) { database_id_ = database_id; } uint64_t get_database_id() const { return database_id_; } + void set_user_client_addr(const ObAddr &user_client_addr) + { + user_client_addr_ = user_client_addr; + } + const ObAddr get_user_client_addr() const { return user_client_addr_; } private: DISALLOW_COPY_AND_ASSIGN(ObTableDirectLoadExecContext); private: @@ -41,6 +47,7 @@ private: uint64_t tenant_id_; uint64_t user_id_; uint64_t database_id_; + ObAddr user_client_addr_; }; } // namespace observer diff --git a/src/observer/table_load/client/ob_table_direct_load_rpc_executor.cpp b/src/observer/table_load/client/ob_table_direct_load_rpc_executor.cpp index cec0fcc986..8b2f3168ad 100644 --- a/src/observer/table_load/client/ob_table_direct_load_rpc_executor.cpp +++ b/src/observer/table_load/client/ob_table_direct_load_rpc_executor.cpp @@ -15,14 +15,11 @@ #include "ob_table_direct_load_rpc_executor.h" #include "observer/ob_server.h" #include "observer/omt/ob_multi_tenant.h" -#include "observer/omt/ob_tenant.h" #include "observer/table_load/ob_table_load_client_service.h" #include "observer/table_load/ob_table_load_client_task.h" -#include "observer/table_load/ob_table_load_coordinator.h" -#include "observer/table_load/ob_table_load_redef_table.h" +#include "observer/table_load/ob_table_load_exec_ctx.h" #include "observer/table_load/ob_table_load_schema.h" #include "observer/table_load/ob_table_load_service.h" -#include "observer/table_load/ob_table_load_table_ctx.h" namespace oceanbase { @@ -35,24 +32,6 @@ using namespace sql; using namespace table; // begin -ObTableDirectLoadBeginExecutor::ObTableDirectLoadBeginExecutor( - ObTableDirectLoadExecContext &ctx, const ObTableDirectLoadRequest &request, - ObTableDirectLoadResult &result) - : ParentType(ctx, request, result), client_task_(nullptr), table_ctx_(nullptr) -{ -} - -ObTableDirectLoadBeginExecutor::~ObTableDirectLoadBeginExecutor() -{ - if (nullptr != client_task_) { - ObTableLoadClientService::revert_task(client_task_); - client_task_ = nullptr; - } - if (nullptr != table_ctx_) { - ObTableLoadService::put_ctx(table_ctx_); - table_ctx_ = nullptr; - } -} int ObTableDirectLoadBeginExecutor::check_args() { @@ -81,231 +60,75 @@ int ObTableDirectLoadBeginExecutor::process() int ret = OB_SUCCESS; LOG_INFO("table direct load begin", K_(arg)); const uint64_t tenant_id = ctx_.get_tenant_id(); - const uint64_t user_id = ctx_.get_user_id(); const uint64_t database_id = ctx_.get_database_id(); uint64_t table_id = 0; - - THIS_WORKER.set_timeout_ts(ObTimeUtil::current_time() + arg_.timeout_); + ObTableLoadClientTask *client_task = nullptr; if (OB_FAIL(ObTableLoadService::check_tenant())) { LOG_WARN("fail to check tenant", KR(ret)); } else if (OB_FAIL(ObTableLoadSchema::get_table_id(tenant_id, database_id, arg_.table_name_, table_id))) { LOG_WARN("fail to get table id", KR(ret), K(tenant_id), K(database_id), K_(arg)); - } - - // get the existing client task if it exists - while (OB_SUCC(ret)) { - ObTableLoadKey key(tenant_id, table_id); - if (OB_FAIL(ObTableLoadClientService::get_task(key, client_task_))) { - if (OB_UNLIKELY(OB_ENTRY_NOT_EXIST != ret)) { - LOG_WARN("fail to get client task", KR(ret), K(key)); - } else { - ret = OB_SUCCESS; - client_task_ = nullptr; - break; - } - } else { - bool need_wait_finish = false; - ObTableLoadClientStatus wait_client_status; - ObTableLoadClientStatus client_status = client_task_->get_status(); - switch (client_status) { - case ObTableLoadClientStatus::RUNNING: - case ObTableLoadClientStatus::COMMITTING: - if (arg_.force_create_) { - if (OB_FAIL(ObTableLoadClientService::abort_task(client_task_))) { - LOG_WARN("fail to abort client task", KR(ret)); - } else { - need_wait_finish = true; - wait_client_status = ObTableLoadClientStatus::ABORT; - } - } - break; - case ObTableLoadClientStatus::COMMIT: - case ObTableLoadClientStatus::ABORT: - need_wait_finish = true; - wait_client_status = client_status; - break; - default: - ret = OB_ERR_UNEXPECTED; - LOG_WARN("unexpected client status", KR(ret), KPC(client_task_), K(client_status)); - break; - } - if (OB_FAIL(ret)) { - } else if (!need_wait_finish) { - break; - } else { - ObTableLoadUniqueKey task_key(table_id, client_task_->ddl_param_.task_id_); - ObTableLoadClientService::revert_task(client_task_); - client_task_ = nullptr; - if (OB_FAIL(ObTableLoadClientService::wait_task_finish(task_key))) { - LOG_WARN("fail to wait client task finish", KR(ret), K(task_key), K(wait_client_status)); - } - } + } else { + ObTableLoadClientTaskParam param; + param.set_client_addr(ctx_.get_user_client_addr()); + param.set_tenant_id(tenant_id); + param.set_user_id(ctx_.get_user_id()); + param.set_database_id(database_id); + param.set_table_id(table_id); + param.set_parallel(arg_.parallel_); + param.set_max_error_row_count(arg_.max_error_row_count_); + param.set_dup_action(arg_.dup_action_); + param.set_timeout_us(arg_.timeout_); + param.set_heartbeat_timeout_us(arg_.heartbeat_timeout_); + if (OB_FAIL(ObTableLoadClientService::alloc_task(client_task))) { + LOG_WARN("fail to alloc client task", KR(ret)); + } else if (OB_FAIL(client_task->init(param))) { + LOG_WARN("fail to init client task", KR(ret), K(param)); + } else if (OB_FAIL(client_task->start())) { + LOG_WARN("fail to start client task", KR(ret)); + } else if (OB_FAIL(ObTableLoadClientService::add_task(client_task))) { + LOG_WARN("fail to add client task", KR(ret)); } } - // create new client task if it does not exist - if (OB_SUCC(ret) && nullptr == client_task_) { - if (OB_FAIL(ObTableLoadService::check_support_direct_load(table_id))) { - LOG_WARN("fail to check support direct load", KR(ret), K(table_id)); - } - // create client task - if (OB_SUCC(ret)) { - if (OB_ISNULL(client_task_ = ObTableLoadClientService::alloc_task())) { - ret = OB_ALLOCATE_MEMORY_FAILED; - LOG_WARN("fail to alloc client task", KR(ret)); - } else if (OB_FAIL(client_task_->init(tenant_id, user_id, database_id, table_id, - arg_.timeout_, arg_.heartbeat_timeout_))) { - LOG_WARN("fail to init client task", KR(ret)); + if (OB_SUCC(ret) && !arg_.is_async_) { + ObTableLoadClientStatus client_status = ObTableLoadClientStatus::MAX_STATUS; + int client_error_code = OB_SUCCESS; + while (OB_SUCC(ret) && ObTableLoadClientStatus::RUNNING != client_status) { + if (OB_UNLIKELY(THIS_WORKER.is_timeout())) { + ret = OB_TIMEOUT; + LOG_WARN("worker timeout", KR(ret)); } else { - // create table ctx - if (OB_FAIL(create_table_ctx())) { - LOG_WARN("fail to create table ctx", KR(ret)); - } else { - client_task_->ddl_param_ = table_ctx_->ddl_param_; - if (OB_FAIL(client_task_->set_table_ctx(table_ctx_))) { - LOG_WARN("fail to set table ctx", KR(ret)); - } - if (OB_FAIL(ret)) { - int tmp_ret = OB_SUCCESS; - if (OB_TMP_FAIL(ObTableLoadService::remove_ctx(table_ctx_))) { - LOG_WARN("fail to remove ctx", KR(tmp_ret)); - } - } + client_task->get_status(client_status, client_error_code); + switch (client_status) { + case ObTableLoadClientStatus::RUNNING: + break; + case ObTableLoadClientStatus::INITIALIZING: + case ObTableLoadClientStatus::WAITTING: + ob_usleep(200LL * 1000); // sleep 200ms + break; + case ObTableLoadClientStatus::ERROR: + case ObTableLoadClientStatus::ABORT: + ret = OB_SUCCESS == client_error_code ? OB_CANCELED : client_error_code; + break; + default: + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected client status", KR(ret), K(client_status)); + break; } } } - // begin - if (OB_SUCC(ret)) { - if (OB_FAIL(do_begin())) { - LOG_WARN("fail to do begin", KR(ret)); - } - } - if (OB_SUCC(ret)) { - if (OB_FAIL(ObTableLoadClientService::add_task(client_task_))) { - LOG_WARN("fail to add client task", KR(ret)); - } - } - if (OB_FAIL(ret)) { - if (nullptr != table_ctx_) { - ObTableLoadCoordinator::abort_ctx(table_ctx_); - } - } } // fill res if (OB_SUCC(ret)) { - res_.table_id_ = client_task_->table_id_; - res_.task_id_ = client_task_->ddl_param_.task_id_; - if (OB_FAIL(res_.column_names_.assign(client_task_->column_names_))) { - LOG_WARN("fail to assign column names", KR(ret)); - } else { - client_task_->get_status(res_.status_, res_.error_code_); - } + res_.table_id_ = client_task->param_.get_table_id(); + res_.task_id_ = client_task->task_id_; + client_task->get_status(res_.status_, res_.error_code_); } - - return ret; -} - -int ObTableDirectLoadBeginExecutor::create_table_ctx() -{ - int ret = OB_SUCCESS; - const uint64_t tenant_id = client_task_->tenant_id_; - const uint64_t table_id = client_task_->table_id_; - ObTableLoadDDLParam ddl_param; - ObTableLoadParam param; - // start redef table - if (OB_SUCC(ret)) { - ObTableLoadRedefTableStartArg start_arg; - ObTableLoadRedefTableStartRes start_res; - start_arg.tenant_id_ = tenant_id; - start_arg.table_id_ = table_id; - start_arg.parallelism_ = arg_.parallel_; - start_arg.is_load_data_ = true; - if (OB_FAIL(ObTableLoadRedefTable::start(start_arg, start_res, - *client_task_->get_session_info()))) { - LOG_WARN("fail to start redef table", KR(ret), K(start_arg)); - } else { - ddl_param.dest_table_id_ = start_res.dest_table_id_; - ddl_param.task_id_ = start_res.task_id_; - ddl_param.schema_version_ = start_res.schema_version_; - ddl_param.snapshot_version_ = start_res.snapshot_version_; - ddl_param.data_version_ = start_res.data_format_version_; - } - } - // init param - if (OB_SUCC(ret)) { - ObTenant *tenant = nullptr; - if (OB_FAIL(GCTX.omt_->get_tenant(tenant_id, tenant))) { - LOG_WARN("fail to get tenant", KR(ret), K(tenant_id)); - } else { - param.tenant_id_ = tenant_id; - param.table_id_ = table_id; - param.batch_size_ = 100; - param.parallel_ = arg_.parallel_; - param.session_count_ = MIN(arg_.parallel_, (int32_t)tenant->unit_max_cpu() * 2); - param.max_error_row_count_ = arg_.max_error_row_count_; - param.column_count_ = client_task_->column_names_.count(); - param.need_sort_ = true; - param.px_mode_ = false; - param.online_opt_stat_gather_ = false; - param.dup_action_ = arg_.dup_action_; - if (OB_FAIL(param.normalize())) { - LOG_WARN("fail to normalize param", KR(ret)); - } - } - } - if (OB_SUCC(ret)) { - if (OB_ISNULL(table_ctx_ = ObTableLoadService::alloc_ctx())) { - ret = OB_ALLOCATE_MEMORY_FAILED; - LOG_WARN("fail to alloc table ctx", KR(ret), K(param)); - } else if (OB_FAIL(table_ctx_->init(param, ddl_param, client_task_->get_session_info()))) { - LOG_WARN("fail to init table ctx", KR(ret)); - } else if (OB_FAIL(ObTableLoadCoordinator::init_ctx(table_ctx_, client_task_->column_idxs_, - client_task_->get_exec_ctx()))) { - LOG_WARN("fail to coordinator init ctx", KR(ret)); - } else if (OB_FAIL(ObTableLoadService::add_ctx(table_ctx_))) { - LOG_WARN("fail to add ctx", KR(ret)); - } - if (OB_FAIL(ret)) { - int tmp_ret = OB_SUCCESS; - if (ddl_param.is_valid()) { - ObTableLoadRedefTableAbortArg abort_arg; - abort_arg.tenant_id_ = param.tenant_id_; - abort_arg.task_id_ = ddl_param.task_id_; - if (OB_TMP_FAIL( - ObTableLoadRedefTable::abort(abort_arg, *client_task_->get_session_info()))) { - LOG_WARN("fail to abort redef table", KR(tmp_ret), K(abort_arg)); - } - } - if (nullptr != table_ctx_) { - ObTableLoadService::free_ctx(table_ctx_); - table_ctx_ = nullptr; - } - } - } - return ret; -} - -int ObTableDirectLoadBeginExecutor::do_begin() -{ - int ret = OB_SUCCESS; - ObTableLoadCoordinator coordinator(table_ctx_); - ObTableLoadTransId trans_id; - if (OB_FAIL(coordinator.init())) { - LOG_WARN("fail to init coordinator", KR(ret)); - } else if (OB_FAIL(coordinator.begin())) { - LOG_WARN("fail to coordinator begin", KR(ret)); - } else if (OB_FAIL(coordinator.start_trans(ObTableLoadSegmentID(1), trans_id))) { - LOG_WARN("fail to start trans", KR(ret)); - } else { - client_task_->set_trans_id(trans_id); - if (OB_FAIL(client_task_->set_status_running())) { - LOG_WARN("fail to set status running", KR(ret)); - } - } - if (OB_FAIL(ret)) { - client_task_->set_status_error(ret); + if (nullptr != client_task) { + ObTableLoadClientService::revert_task(client_task); + client_task = nullptr; } return ret; } @@ -330,10 +153,8 @@ int ObTableDirectLoadCommitExecutor::process() ObTableLoadUniqueKey key(arg_.table_id_, arg_.task_id_); if (OB_FAIL(ObTableLoadClientService::get_task(key, client_task))) { LOG_WARN("fail to get client task", KR(ret), K(key)); - } else if (OB_FAIL(client_task->check_status(ObTableLoadClientStatus::RUNNING))) { - LOG_WARN("fail to check status", KR(ret)); - } else if (OB_FAIL(ObTableLoadClientService::commit_task(client_task))) { - LOG_WARN("fail to commit client task", KR(ret)); + } else if (OB_FAIL(client_task->commit())) { + LOG_WARN("fail to commit client task", KR(ret), K(key)); } if (nullptr != client_task) { ObTableLoadClientService::revert_task(client_task); @@ -362,16 +183,13 @@ int ObTableDirectLoadAbortExecutor::process() ObTableLoadUniqueKey key(arg_.table_id_, arg_.task_id_); if (OB_FAIL(ObTableLoadClientService::get_task(key, client_task))) { LOG_WARN("fail to get client task", KR(ret), K(key)); - } else if (OB_FAIL(ObTableLoadClientService::abort_task(client_task))) { - LOG_WARN("fail to abort client task", KR(ret)); + } else { + client_task->abort(); } if (nullptr != client_task) { ObTableLoadClientService::revert_task(client_task); client_task = nullptr; } - if (OB_SUCC(ret) && OB_FAIL(ObTableLoadClientService::wait_task_finish(key))) { - LOG_WARN("fail to wait client task finish", KR(ret), K(key)); - } return ret; } @@ -438,35 +256,18 @@ int ObTableDirectLoadInsertExecutor::process() { int ret = OB_SUCCESS; LOG_DEBUG("table direct load insert", K_(arg)); - ObTableLoadObjRowArray obj_rows; ObTableLoadUniqueKey key(arg_.table_id_, arg_.task_id_); ObTableLoadClientTask *client_task = nullptr; - if (OB_FAIL(decode_payload(arg_.payload_, obj_rows))) { - LOG_WARN("fail to decode payload", KR(ret), K_(arg)); - } else if (OB_FAIL(ObTableLoadClientService::get_task(key, client_task))) { + if (OB_FAIL(ObTableLoadClientService::get_task(key, client_task))) { LOG_WARN("fail to get client task", KR(ret), K(key)); } else if (OB_FAIL(client_task->check_status(ObTableLoadClientStatus::RUNNING))) { LOG_WARN("fail to check status", KR(ret)); } else { - ObTableLoadTableCtx *table_ctx = nullptr; - if (OB_FAIL(client_task->get_table_ctx(table_ctx))) { - LOG_WARN("fail to get table ctx", KR(ret)); - } else { - ObTableLoadCoordinator coordinator(table_ctx); - const ObTableLoadTransId &trans_id = client_task->get_trans_id(); - const int64_t batch_id = client_task->get_next_batch_id(); - const int32_t session_id = batch_id % table_ctx->param_.session_count_ + 1; - if (OB_FAIL(set_batch_seq_no(batch_id, obj_rows))) { - LOG_WARN("fail to set batch seq no", KR(ret)); - } else if (OB_FAIL(coordinator.init())) { - LOG_WARN("fail to init coordinator", KR(ret)); - } else if (OB_FAIL(coordinator.write(trans_id, session_id, 0 /*seq_no*/, obj_rows))) { - LOG_WARN("fail to coordinator write", KR(ret)); - } - } - if (nullptr != table_ctx) { - ObTableLoadService::put_ctx(table_ctx); - table_ctx = nullptr; + ObTableLoadObjRowArray obj_rows; + if (OB_FAIL(decode_payload(arg_.payload_, obj_rows))) { + LOG_WARN("fail to decode payload", KR(ret), K_(arg)); + } else if (OB_FAIL(client_task->write(obj_rows))) { + LOG_WARN("fail to write", KR(ret)); } if (OB_FAIL(ret)) { client_task->set_status_error(ret); @@ -507,28 +308,6 @@ int ObTableDirectLoadInsertExecutor::decode_payload(const ObString &payload, return ret; } -int ObTableDirectLoadInsertExecutor::set_batch_seq_no(int64_t batch_id, - ObTableLoadObjRowArray &obj_row_array) -{ - int ret = OB_SUCCESS; - if (OB_UNLIKELY(obj_row_array.empty())) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid args", KR(ret), K(obj_row_array)); - } else if (OB_UNLIKELY(batch_id > ObTableLoadSequenceNo::MAX_BATCH_ID || - obj_row_array.count() > ObTableLoadSequenceNo::MAX_BATCH_SEQ_NO)) { - ret = OB_SIZE_OVERFLOW; - LOG_WARN("size is overflow", KR(ret), K(batch_id), K(obj_row_array.count())); - } else { - for (int64_t i = 0; OB_SUCC(ret) && i < obj_row_array.count(); ++i) { - ObTableLoadObjRow &row = obj_row_array.at(i); - row.seq_no_.sequence_no_ = batch_id; - row.seq_no_.sequence_no_ <<= ObTableLoadSequenceNo::BATCH_ID_SHIFT; - row.seq_no_.sequence_no_ |= i; - } - } - return ret; -} - // heart_beat int ObTableDirectLoadHeartBeatExecutor::check_args() { diff --git a/src/observer/table_load/client/ob_table_direct_load_rpc_executor.h b/src/observer/table_load/client/ob_table_direct_load_rpc_executor.h index 743e0f0319..43c185f2a6 100644 --- a/src/observer/table_load/client/ob_table_direct_load_rpc_executor.h +++ b/src/observer/table_load/client/ob_table_direct_load_rpc_executor.h @@ -60,21 +60,16 @@ class ObTableDirectLoadBeginExecutor public: ObTableDirectLoadBeginExecutor(ObTableDirectLoadExecContext &ctx, const table::ObTableDirectLoadRequest &request, - table::ObTableDirectLoadResult &result); - virtual ~ObTableDirectLoadBeginExecutor(); + table::ObTableDirectLoadResult &result) + : ParentType(ctx, request, result) + { + } + virtual ~ObTableDirectLoadBeginExecutor() = default; protected: int check_args() override; int set_result_header() override; int process() override; - -private: - int create_table_ctx(); - int do_begin(); - -private: - ObTableLoadClientTask *client_task_; - ObTableLoadTableCtx *table_ctx_; }; // commit @@ -160,7 +155,6 @@ protected: private: static int decode_payload(const common::ObString &payload, table::ObTableLoadObjRowArray &obj_row_array); - int set_batch_seq_no(int64_t batch_id, table::ObTableLoadObjRowArray &obj_row_array); }; // heart_beat diff --git a/src/observer/table_load/client/ob_table_direct_load_rpc_struct.cpp b/src/observer/table_load/client/ob_table_direct_load_rpc_struct.cpp index f3c9515b42..ce2c2b0b40 100644 --- a/src/observer/table_load/client/ob_table_direct_load_rpc_struct.cpp +++ b/src/observer/table_load/client/ob_table_direct_load_rpc_struct.cpp @@ -27,7 +27,8 @@ OB_SERIALIZE_MEMBER_SIMPLE(ObTableDirectLoadBeginArg, dup_action_, timeout_, heartbeat_timeout_, - force_create_); + force_create_, + is_async_); OB_SERIALIZE_MEMBER_SIMPLE(ObTableDirectLoadBeginRes, table_id_, diff --git a/src/observer/table_load/client/ob_table_direct_load_rpc_struct.h b/src/observer/table_load/client/ob_table_direct_load_rpc_struct.h index c232942bdf..19e960c7aa 100644 --- a/src/observer/table_load/client/ob_table_direct_load_rpc_struct.h +++ b/src/observer/table_load/client/ob_table_direct_load_rpc_struct.h @@ -32,11 +32,12 @@ public: dup_action_(sql::ObLoadDupActionType::LOAD_INVALID_MODE), timeout_(0), heartbeat_timeout_(0), - force_create_(false) + force_create_(false), + is_async_(false) { } TO_STRING_KV(K_(table_name), K_(parallel), K_(max_error_row_count), K_(dup_action), K_(timeout), - K_(heartbeat_timeout), K_(force_create)); + K_(heartbeat_timeout), K_(force_create), K_(is_async)); public: ObString table_name_; int64_t parallel_; @@ -44,7 +45,8 @@ public: sql::ObLoadDupActionType dup_action_; int64_t timeout_; int64_t heartbeat_timeout_; - bool force_create_; + bool force_create_; // unused + bool is_async_; }; struct ObTableDirectLoadBeginRes @@ -62,7 +64,7 @@ public: public: uint64_t table_id_; int64_t task_id_; - common::ObSArray column_names_; + common::ObSArray column_names_; // unused table::ObTableLoadClientStatus status_; int32_t error_code_; }; diff --git a/src/observer/table_load/ob_table_load_client_service.cpp b/src/observer/table_load/ob_table_load_client_service.cpp index 1fcb5e9dee..2e1a55cf6e 100644 --- a/src/observer/table_load/ob_table_load_client_service.cpp +++ b/src/observer/table_load/ob_table_load_client_service.cpp @@ -26,305 +26,6 @@ namespace observer using namespace common; using namespace table; -/** - * CommitTaskProcessor - */ - -class ObTableLoadClientService::CommitTaskProcessor : public ObITableLoadTaskProcessor -{ -public: - CommitTaskProcessor(ObTableLoadTask &task, ObTableLoadClientTask *client_task) - : ObITableLoadTaskProcessor(task), client_task_(client_task), table_ctx_(nullptr) - { - client_task_->inc_ref_count(); - } - virtual ~CommitTaskProcessor() - { - ObTableLoadClientService::revert_task(client_task_); - if (nullptr != table_ctx_) { - ObTableLoadService::put_ctx(table_ctx_); - } - } - int process() override - { - int ret = OB_SUCCESS; - if (OB_FAIL(init())) { - LOG_WARN("fail to init", KR(ret)); - } - // 1. finish all trans - else if (OB_FAIL(finish_all_trans())) { - LOG_WARN("fail to finish all trans", KR(ret)); - } - // 2. check all trans commit - else if (OB_FAIL(check_all_trans_commit())) { - LOG_WARN("fail to check all trans commit", KR(ret)); - } - // 3. finish - else if (OB_FAIL(finish())) { - LOG_WARN("fail to finish table load", KR(ret)); - } - // 4. check merged - else if (OB_FAIL(check_merged())) { - LOG_WARN("fail to check merged", KR(ret)); - } - // 5. commit - else if (OB_FAIL(commit())) { - LOG_WARN("fail to commit table load", KR(ret)); - } - // end - else if (OB_FAIL(client_task_->set_status_commit())) { - LOG_WARN("fail to set status commit", KR(ret)); - } - // auto abort - if (OB_FAIL(ret)) { - int tmp_ret = OB_SUCCESS; - if (OB_FAIL(OB_TMP_FAIL(ObTableLoadClientService::abort_task(client_task_)))) { - LOG_WARN("fail to abort client task", KR(tmp_ret)); - } - } - return ret; - } -private: - int init() - { - int ret = OB_SUCCESS; - if (OB_FAIL(client_task_->check_status(ObTableLoadClientStatus::COMMITTING))) { - LOG_WARN("fail to check status", KR(ret)); - } else { - if (OB_FAIL(client_task_->get_table_ctx(table_ctx_))) { - LOG_WARN("fail to get table ctx", KR(ret)); - } - if (OB_FAIL(ret)) { - client_task_->set_status_error(ret); - } - } - return ret; - } - int finish_all_trans() - { - int ret = OB_SUCCESS; - if (OB_FAIL(client_task_->check_status(ObTableLoadClientStatus::COMMITTING))) { - LOG_WARN("fail to check status", KR(ret)); - } else { - ObTableLoadCoordinator coordinator(table_ctx_); - const ObTableLoadTransId &trans_id = client_task_->get_trans_id(); - if (OB_FAIL(coordinator.init())) { - LOG_WARN("fail to init coordinator", KR(ret)); - } else if (OB_FAIL(coordinator.finish_trans(trans_id))) { - LOG_WARN("fail to coordinator finish trans", KR(ret), K(trans_id)); - } - if (OB_FAIL(ret)) { - client_task_->set_status_error(ret); - } - } - return ret; - } - int check_all_trans_commit() - { - int ret = OB_SUCCESS; - const ObTableLoadTransId &trans_id = client_task_->get_trans_id(); - while (OB_SUCC(ret)) { - if (OB_FAIL(client_task_->check_status(ObTableLoadClientStatus::COMMITTING))) { - LOG_WARN("fail to check status", KR(ret)); - } else { - if (OB_FAIL(client_task_->get_exec_ctx()->check_status())) { - LOG_WARN("fail to check exec status", KR(ret)); - } else { - ObTableLoadCoordinator coordinator(table_ctx_); - ObTableLoadTransStatusType trans_status = ObTableLoadTransStatusType::NONE; - int error_code = OB_SUCCESS; - bool try_again = false; - if (OB_FAIL(coordinator.init())) { - LOG_WARN("fail to init coordinator", KR(ret)); - } else if (OB_FAIL(coordinator.get_trans_status(trans_id, trans_status, error_code))) { - LOG_WARN("fail to coordinator get status", KR(ret), K(trans_id)); - } else { - switch (trans_status) { - case ObTableLoadTransStatusType::FROZEN: - try_again = true; - break; - case ObTableLoadTransStatusType::COMMIT: - break; - case ObTableLoadTransStatusType::ERROR: - ret = error_code; - LOG_WARN("trans has error", KR(ret)); - break; - default: - ret = OB_ERR_UNEXPECTED; - LOG_WARN("unexpected trans status", KR(ret), K(trans_status)); - break; - } - } - if (OB_SUCC(ret)) { - if (!try_again) { - break; - } else { - ob_usleep(1000 * 1000); - } - } - } - if (OB_FAIL(ret)) { - client_task_->set_status_error(ret); - } - } - } - return ret; - } - int finish() - { - int ret = OB_SUCCESS; - if (OB_FAIL(client_task_->check_status(ObTableLoadClientStatus::COMMITTING))) { - LOG_WARN("fail to check status", KR(ret)); - } else { - ObTableLoadCoordinator coordinator(table_ctx_); - if (OB_FAIL(coordinator.init())) { - LOG_WARN("fail to init coordinator", KR(ret)); - } else if (OB_FAIL(coordinator.finish())) { - LOG_WARN("fail to coordinator finish", KR(ret)); - } - if (OB_FAIL(ret)) { - client_task_->set_status_error(ret); - } - } - return ret; - } - int check_merged() - { - int ret = OB_SUCCESS; - while (OB_SUCC(ret)) { - if (OB_FAIL(client_task_->check_status(ObTableLoadClientStatus::COMMITTING))) { - LOG_WARN("fail to check status", KR(ret)); - } else { - if (OB_FAIL(client_task_->get_exec_ctx()->check_status())) { - LOG_WARN("fail to check exec status", KR(ret)); - } else { - bool is_merged = false; - ObTableLoadStatusType status = ObTableLoadStatusType::NONE; - int error_code = OB_SUCCESS; - ObTableLoadCoordinator coordinator(table_ctx_); - if (OB_FAIL(coordinator.init())) { - LOG_WARN("fail to init coordinator", KR(ret)); - } else if (OB_FAIL(coordinator.get_status(status, error_code))) { - LOG_WARN("fail to coordinator get status", KR(ret)); - } else { - switch (status) { - case ObTableLoadStatusType::FROZEN: - case ObTableLoadStatusType::MERGING: - is_merged = false; - break; - case ObTableLoadStatusType::MERGED: - is_merged = true; - break; - case ObTableLoadStatusType::ERROR: - ret = error_code; - LOG_WARN("table load has error", KR(ret)); - break; - default: - ret = OB_ERR_UNEXPECTED; - LOG_WARN("unexpected status", KR(ret), K(status)); - break; - } - } - if (OB_SUCC(ret)) { - if (is_merged) { - break; - } else { - ob_usleep(1000 * 1000); - } - } - } - if (OB_FAIL(ret)) { - client_task_->set_status_error(ret); - } - } - } - return ret; - } - int commit() - { - int ret = OB_SUCCESS; - if (OB_FAIL(client_task_->check_status(ObTableLoadClientStatus::COMMITTING))) { - LOG_WARN("fail to check status", KR(ret)); - } else { - ObTableLoadCoordinator coordinator(table_ctx_); - if (OB_FAIL(coordinator.init())) { - LOG_WARN("fail to init coordinator", KR(ret)); - } else if (OB_FAIL(coordinator.commit(client_task_->result_info_))) { - LOG_WARN("fail to coordinator commit", KR(ret)); - } - if (OB_FAIL(ret)) { - client_task_->set_status_error(ret); - } - } - return ret; - } -private: - ObTableLoadClientTask *client_task_; - ObTableLoadTableCtx *table_ctx_; -}; - -/** - * AbortTaskProcessor - */ - -class ObTableLoadClientService::AbortTaskProcessor : public ObITableLoadTaskProcessor -{ -public: - AbortTaskProcessor(ObTableLoadTask &task, ObTableLoadClientTask *client_task) - : ObITableLoadTaskProcessor(task), client_task_(client_task) - { - client_task_->inc_ref_count(); - } - virtual ~AbortTaskProcessor() - { - ObTableLoadClientService::revert_task(client_task_); - } - int process() override - { - int ret = OB_SUCCESS; - if (OB_FAIL(client_task_->check_status(ObTableLoadClientStatus::ABORT))) { - LOG_WARN("fail to check status", KR(ret)); - } else { - ObTableLoadTableCtx *table_ctx = nullptr; - if (OB_FAIL(client_task_->get_table_ctx(table_ctx))) { - LOG_WARN("fail to get table ctx", KR(ret)); - } else { - ObTableLoadCoordinator::abort_ctx(table_ctx); - } - if (nullptr != table_ctx) { - ObTableLoadService::put_ctx(table_ctx); - table_ctx = nullptr; - } - } - return ret; - } -private: - ObTableLoadClientTask *client_task_; -}; - -/** - * CommonTaskCallback - */ - -class ObTableLoadClientService::CommonTaskCallback : public ObITableLoadTaskCallback -{ -public: - CommonTaskCallback(ObTableLoadClientTask *client_task) : client_task_(client_task) - { - client_task_->inc_ref_count(); - } - virtual ~CommonTaskCallback() - { - ObTableLoadClientService::revert_task(client_task_); - } - void callback(int ret_code, ObTableLoadTask *task) override - { - client_task_->free_task(task); - } -private: - ObTableLoadClientTask *client_task_; -}; - /** * ClientTaskBriefEraseIfExpired */ @@ -339,7 +40,7 @@ bool ObTableLoadClientService::ClientTaskBriefEraseIfExpired::operator()( * ObTableLoadClientService */ -ObTableLoadClientService::ObTableLoadClientService() : is_inited_(false) {} +ObTableLoadClientService::ObTableLoadClientService() : next_task_id_(1), is_inited_(false) {} ObTableLoadClientService::~ObTableLoadClientService() {} @@ -380,14 +81,32 @@ ObTableLoadClientService *ObTableLoadClientService::get_client_service() return client_service; } -ObTableLoadClientTask *ObTableLoadClientService::alloc_task() +int ObTableLoadClientService::alloc_task(ObTableLoadClientTask *&client_task) { - ObTableLoadClientTask *client_task = - OB_NEW(ObTableLoadClientTask, ObMemAttr(MTL_ID(), "TLD_ClientTask")); - if (nullptr != client_task) { - client_task->inc_ref_count(); + int ret = OB_SUCCESS; + ObTableLoadService *service = nullptr; + if (OB_ISNULL(service = MTL(ObTableLoadService *))) { + ret = OB_ERR_SYS; + LOG_WARN("null table load service", KR(ret)); + } else { + ObTableLoadClientTask *new_client_task = nullptr; + if (OB_ISNULL(new_client_task = + OB_NEW(ObTableLoadClientTask, ObMemAttr(MTL_ID(), "TLD_ClientTask")))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + LOG_WARN("fail to new ObTableLoadClientTask", KR(ret)); + } else { + new_client_task->task_id_ = service->get_client_service().generate_task_id(); + client_task = new_client_task; + client_task->inc_ref_count(); + } + if (OB_FAIL(ret)) { + if (nullptr != new_client_task) { + free_task(new_client_task); + new_client_task = nullptr; + } + } } - return client_task; + return ret; } void ObTableLoadClientService::free_task(ObTableLoadClientTask *client_task) @@ -409,12 +128,9 @@ void ObTableLoadClientService::revert_task(ObTableLoadClientTask *client_task) const int64_t ref_count = client_task->dec_ref_count(); OB_ASSERT(ref_count >= 0); if (0 == ref_count) { - const uint64_t tenant_id = client_task->tenant_id_; - const uint64_t table_id = client_task->table_id_; - const uint64_t hidden_table_id = client_task->ddl_param_.dest_table_id_; - const int64_t task_id = client_task->ddl_param_.task_id_; - LOG_INFO("free client task", K(tenant_id), K(table_id), K(hidden_table_id), K(task_id), - KP(client_task)); + const int64_t task_id = client_task->task_id_; + const uint64_t table_id = client_task->param_.get_table_id(); + LOG_INFO("free client task", K(task_id), K(table_id), KP(client_task)); free_task(client_task); client_task = nullptr; } @@ -429,7 +145,7 @@ int ObTableLoadClientService::add_task(ObTableLoadClientTask *client_task) ret = OB_ERR_SYS; LOG_WARN("null table load service", KR(ret)); } else { - ObTableLoadUniqueKey key(client_task->table_id_, client_task->ddl_param_.task_id_); + ObTableLoadUniqueKey key(client_task->param_.get_table_id(), client_task->task_id_); ret = service->get_client_service().add_client_task(key, client_task); } return ret; @@ -443,7 +159,7 @@ int ObTableLoadClientService::remove_task(ObTableLoadClientTask *client_task) ret = OB_ERR_SYS; LOG_WARN("null table load service", KR(ret)); } else { - ObTableLoadUniqueKey key(client_task->table_id_, client_task->ddl_param_.task_id_); + ObTableLoadUniqueKey key(client_task->param_.get_table_id(), client_task->task_id_); ret = service->get_client_service().remove_client_task(key, client_task); } return ret; @@ -482,86 +198,6 @@ int ObTableLoadClientService::get_task(const ObTableLoadKey &key, return ret; } -int ObTableLoadClientService::exist_task(const ObTableLoadUniqueKey &key, bool &is_exist) -{ - int ret = OB_SUCCESS; - ObTableLoadService *service = nullptr; - if (OB_ISNULL(service = MTL(ObTableLoadService *))) { - ret = OB_ERR_SYS; - LOG_WARN("null table load service", KR(ret)); - } else { - if (OB_FAIL(service->get_client_service().exist_client_task(key, is_exist))) { - LOG_WARN("fail to check exist client task", KR(ret), K(key)); - } - } - return ret; -} - -int ObTableLoadClientService::commit_task(ObTableLoadClientTask *client_task) -{ - int ret = OB_SUCCESS; - if (OB_UNLIKELY(nullptr == client_task)) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid args", KR(ret), KPC(client_task)); - } else if (OB_FAIL(client_task->set_status_committing())) { - LOG_WARN("fail to set status committing", KR(ret)); - } else { - LOG_INFO("client task commit"); - if (OB_FAIL(construct_commit_task(client_task))) { - LOG_WARN("fail to construct commit task", KR(ret)); - } - if (OB_FAIL(ret)) { - client_task->set_status_error(ret); - } - } - return ret; -} - -int ObTableLoadClientService::abort_task(ObTableLoadClientTask *client_task) -{ - int ret = OB_SUCCESS; - if (OB_UNLIKELY(nullptr == client_task)) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid args", KR(ret), KPC(client_task)); - } else if (ObTableLoadClientStatus::ABORT == client_task->get_status()) { - // already abort - } else { - LOG_INFO("client task abort"); - client_task->set_status_abort(); - if (OB_FAIL(construct_abort_task(client_task))) { - LOG_WARN("fail to construct abort task", KR(ret)); - } - } - return ret; -} - -int ObTableLoadClientService::wait_task_finish(const ObTableLoadUniqueKey &key) -{ - int ret = OB_SUCCESS; - if (OB_UNLIKELY(!key.is_valid())) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid args", KR(ret), K(key)); - } else { - bool is_exist = true; - ObTimeoutCtx ctx; - if (OB_FAIL(ObShareUtil::set_default_timeout_ctx(ctx, 10LL * 1000 * 1000))) { - LOG_WARN("fail to set default timeout ctx", KR(ret)); - } - while (OB_SUCC(ret) && is_exist) { - if (ctx.is_timeouted()) { - ret = OB_TIMEOUT; - LOG_WARN("timeouted", KR(ret), K(ctx)); - } else if (OB_FAIL(exist_task(key, is_exist))) { - LOG_WARN("fail to check exist client task", KR(ret), K(key)); - } else if (is_exist) { - // wait - ob_usleep(100LL * 1000); - } - } - } - return ret; -} - int ObTableLoadClientService::add_client_task(const ObTableLoadUniqueKey &key, ObTableLoadClientTask *client_task) { @@ -639,10 +275,10 @@ int ObTableLoadClientService::remove_client_task(const ObTableLoadUniqueKey &key if (OB_FAIL(client_task_brief_map_.create(key, client_task_brief))) { LOG_WARN("fail to create client task brief", KR(ret), K(key)); } else { - client_task_brief->table_id_ = client_task->table_id_; - client_task_brief->dest_table_id_ = client_task->ddl_param_.dest_table_id_; - client_task_brief->task_id_ = client_task->ddl_param_.task_id_; + client_task_brief->task_id_ = client_task->task_id_; + client_task_brief->table_id_ = client_task->param_.get_table_id(); client_task->get_status(client_task_brief->client_status_, client_task_brief->error_code_); + client_task_brief->result_info_ = client_task->result_info_; client_task_brief->active_time_ = ObTimeUtil::current_time(); } if (nullptr != client_task_brief) { @@ -731,30 +367,6 @@ int ObTableLoadClientService::get_client_task_by_table_id(uint64_t table_id, return ret; } -int ObTableLoadClientService::exist_client_task(const ObTableLoadUniqueKey &key, bool &is_exist) -{ - int ret = OB_SUCCESS; - is_exist = false; - if (IS_NOT_INIT) { - ret = OB_NOT_INIT; - LOG_WARN("ObTableLoadClientService not init", KR(ret), KP(this)); - } else { - obsys::ObRLockGuard guard(rwlock_); - ObTableLoadClientTask *client_task = nullptr; - if (OB_FAIL(client_task_map_.get_refactored(key, client_task))) { - if (OB_UNLIKELY(OB_HASH_NOT_EXIST != ret)) { - LOG_WARN("fail to get refactored", KR(ret), K(key)); - } else { - ret = OB_SUCCESS; - is_exist = false; - } - } else { - is_exist = true; - } - } - return ret; -} - int64_t ObTableLoadClientService::get_client_task_count() const { obsys::ObRLockGuard guard(rwlock_); @@ -873,49 +485,5 @@ void ObTableLoadClientService::purge_client_task_brief() } } -int ObTableLoadClientService::construct_commit_task(ObTableLoadClientTask *client_task) -{ - int ret = OB_SUCCESS; - ObTableLoadTask *task = nullptr; - if (OB_FAIL(client_task->alloc_task(task))) { - LOG_WARN("fail to alloc task", KR(ret)); - } else if (OB_FAIL(task->set_processor(client_task))) { - LOG_WARN("fail to set commit task processor", KR(ret)); - } else if (OB_FAIL(task->set_callback(client_task))) { - LOG_WARN("fail to set common task callback", KR(ret)); - } else if (OB_FAIL(client_task->add_task(task))) { - LOG_WARN("fail to add task", KR(ret)); - } - if (OB_FAIL(ret)) { - if (nullptr != task) { - client_task->free_task(task); - task = nullptr; - } - } - return ret; -} - -int ObTableLoadClientService::construct_abort_task(ObTableLoadClientTask *client_task) -{ - int ret = OB_SUCCESS; - ObTableLoadTask *task = nullptr; - if (OB_FAIL(client_task->alloc_task(task))) { - LOG_WARN("fail to alloc task", KR(ret)); - } else if (OB_FAIL(task->set_processor(client_task))) { - LOG_WARN("fail to set abort task processor", KR(ret)); - } else if (OB_FAIL(task->set_callback(client_task))) { - LOG_WARN("fail to set common task callback", KR(ret)); - } else if (OB_FAIL(client_task->add_task(task))) { - LOG_WARN("fail to add task", KR(ret)); - } - if (OB_FAIL(ret)) { - if (nullptr != task) { - client_task->free_task(task); - task = nullptr; - } - } - return ret; -} - } // namespace observer } // namespace oceanbase diff --git a/src/observer/table_load/ob_table_load_client_service.h b/src/observer/table_load/ob_table_load_client_service.h index 63034594e7..20b59bd15b 100644 --- a/src/observer/table_load/ob_table_load_client_service.h +++ b/src/observer/table_load/ob_table_load_client_service.h @@ -39,24 +39,19 @@ public: static ObTableLoadClientService *get_client_service(); // client task api - static ObTableLoadClientTask *alloc_task(); + static int alloc_task(ObTableLoadClientTask *&client_task); static void free_task(ObTableLoadClientTask *client_task); static void revert_task(ObTableLoadClientTask *client_task); static int add_task(ObTableLoadClientTask *client_task); static int remove_task(ObTableLoadClientTask *client_task); static int get_task(const ObTableLoadUniqueKey &key, ObTableLoadClientTask *&client_task); static int get_task(const ObTableLoadKey &key, ObTableLoadClientTask *&client_task); - static int exist_task(const ObTableLoadUniqueKey &key, bool &is_exist); - static int commit_task(ObTableLoadClientTask *client_task); - static int abort_task(ObTableLoadClientTask *client_task); - static int wait_task_finish(const ObTableLoadUniqueKey &key); int add_client_task(const ObTableLoadUniqueKey &key, ObTableLoadClientTask *client_task); int remove_client_task(const ObTableLoadUniqueKey &key, ObTableLoadClientTask *client_task); int get_all_client_task(common::ObIArray &client_task_array); int get_client_task(const ObTableLoadUniqueKey &key, ObTableLoadClientTask *&client_task); int get_client_task_by_table_id(uint64_t table_id, ObTableLoadClientTask *&client_task); - int exist_client_task(const ObTableLoadUniqueKey &key, bool &is_exist); int64_t get_client_task_count() const; void purge_client_task(); @@ -78,12 +73,7 @@ public: } private: - static int construct_commit_task(ObTableLoadClientTask *client_task); - static int construct_abort_task(ObTableLoadClientTask *client_task); -private: - class CommitTaskProcessor; - class AbortTaskProcessor; - class CommonTaskCallback; + OB_INLINE int64_t generate_task_id() { return ATOMIC_FAA(&next_task_id_, 1); } private: static const int64_t CLIENT_TASK_RETENTION_PERIOD = 24LL * 60 * 60 * 1000 * 1000; // 1day @@ -112,6 +102,7 @@ private: { return client_task_ == entry.second; } + private: ObTableLoadClientTask *client_task_; }; @@ -122,6 +113,7 @@ private: ClientTaskBriefEraseIfExpired(int64_t expired_ts) : expired_ts_(expired_ts) {} bool operator()(const ObTableLoadUniqueKey &key, ObTableLoadClientTaskBrief *client_task_brief) const; + private: int64_t expired_ts_; }; @@ -131,6 +123,7 @@ private: ClientTaskMap client_task_map_; ClientTaskIndexMap client_task_index_map_; ClientTaskBriefMap client_task_brief_map_; // thread safety + int64_t next_task_id_; bool is_inited_; }; diff --git a/src/observer/table_load/ob_table_load_client_task.cpp b/src/observer/table_load/ob_table_load_client_task.cpp index 77948fde91..1c21601682 100644 --- a/src/observer/table_load/ob_table_load_client_task.cpp +++ b/src/observer/table_load/ob_table_load_client_task.cpp @@ -13,40 +13,194 @@ #define USING_LOG_PREFIX SERVER #include "observer/table_load/ob_table_load_client_task.h" +#include "observer/ob_server.h" +#include "observer/omt/ob_tenant.h" #include "observer/table_load/ob_table_load_exec_ctx.h" #include "observer/table_load/ob_table_load_schema.h" #include "observer/table_load/ob_table_load_service.h" #include "observer/table_load/ob_table_load_table_ctx.h" +#include "observer/table_load/ob_table_load_task.h" #include "observer/table_load/ob_table_load_task_scheduler.h" #include "observer/table_load/ob_table_load_utils.h" -#include "observer/table_load/ob_table_load_task.h" -#include "observer/ob_server.h" namespace oceanbase { namespace observer { using namespace common; +using namespace sql; using namespace table; -ObTableLoadClientTask::ObTableLoadClientTask() - : tenant_id_(OB_INVALID_ID), +/** + * ObTableLoadClientTaskParam + */ + +ObTableLoadClientTaskParam::ObTableLoadClientTaskParam() + : tenant_id_(OB_INVALID_TENANT_ID), user_id_(OB_INVALID_ID), + database_id_(OB_INVALID_ID), table_id_(OB_INVALID_ID), + parallel_(0), + max_error_row_count_(0), + dup_action_(ObLoadDupActionType::LOAD_INVALID_MODE), + timeout_us_(0), + heartbeat_timeout_us_(0) +{ +} + +ObTableLoadClientTaskParam::~ObTableLoadClientTaskParam() {} + +void ObTableLoadClientTaskParam::reset() +{ + client_addr_.reset(); + tenant_id_ = OB_INVALID_TENANT_ID; + user_id_ = OB_INVALID_ID; + database_id_ = OB_INVALID_ID; + table_id_ = OB_INVALID_ID; + parallel_ = 0; + max_error_row_count_ = 0; + dup_action_ = ObLoadDupActionType::LOAD_INVALID_MODE; + timeout_us_ = 0; + heartbeat_timeout_us_ = 0; +} + +int ObTableLoadClientTaskParam::assign(const ObTableLoadClientTaskParam &other) +{ + int ret = OB_SUCCESS; + if (this != &other) { + reset(); + client_addr_ = other.client_addr_; + tenant_id_ = other.tenant_id_; + user_id_ = other.user_id_; + database_id_ = other.database_id_; + table_id_ = other.table_id_; + parallel_ = other.parallel_; + max_error_row_count_ = other.max_error_row_count_; + dup_action_ = other.dup_action_; + timeout_us_ = other.timeout_us_; + heartbeat_timeout_us_ = other.heartbeat_timeout_us_; + } + return ret; +} + +bool ObTableLoadClientTaskParam::is_valid() const +{ + return client_addr_.is_valid() && OB_INVALID_TENANT_ID != tenant_id_ && + OB_INVALID_ID != user_id_ && OB_INVALID_ID != database_id_ && OB_INVALID_ID != table_id_ && + parallel_ > 0 && ObLoadDupActionType::LOAD_INVALID_MODE != dup_action_ && + timeout_us_ > 0 && heartbeat_timeout_us_ > 0; +} + +/** + * ClientTaskExecuteProcessor + */ + +class ObTableLoadClientTask::ClientTaskExectueProcessor : public ObITableLoadTaskProcessor +{ +public: + ClientTaskExectueProcessor(ObTableLoadTask &task, ObTableLoadClientTask *client_task) + : ObITableLoadTaskProcessor(task), client_task_(client_task) + { + client_task_->inc_ref_count(); + } + virtual ~ClientTaskExectueProcessor() { ObTableLoadClientService::revert_task(client_task_); } + int process() override + { + int ret = OB_SUCCESS; + ObSQLSessionInfo *origin_session_info = THIS_WORKER.get_session(); + ObSQLSessionInfo *session_info = client_task_->get_session_info(); + THIS_WORKER.set_session(session_info); + if (OB_ISNULL(session_info)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected null session info", KR(ret)); + } else { + session_info->set_thread_id(GETTID()); + session_info->update_last_active_time(); + if (OB_FAIL(session_info->set_session_state(QUERY_ACTIVE))) { + LOG_WARN("fail to set session state", K(ret)); + } + } + // begin + if (OB_SUCC(ret)) { + if (OB_FAIL(client_task_->init_instance())) { + LOG_WARN("fail to init instance", KR(ret)); + } else if (OB_FAIL(client_task_->set_status_waitting())) { + LOG_WARN("fail to set status waitting", KR(ret)); + } else if (OB_FAIL(client_task_->set_status_running())) { + LOG_WARN("fail to set status running", KR(ret)); + } + } + // wait client commit + while (OB_SUCC(ret)) { + ObTableLoadClientStatus status = client_task_->get_status(); + if (ObTableLoadClientStatus::RUNNING == status) { + ob_usleep(100LL * 1000); // sleep 100ms + } else if (ObTableLoadClientStatus::COMMITTING == status) { + break; + } else { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected client status", KR(ret), K(status)); + } + } + // commit + if (OB_SUCC(ret)) { + if (OB_FAIL(client_task_->commit_instance())) { + LOG_WARN("fail to commit instance", KR(ret)); + } else if (OB_FAIL(client_task_->set_status_commit())) { + LOG_WARN("fail to set status running", KR(ret)); + } + } + client_task_->destroy_instance(); + THIS_WORKER.set_session(origin_session_info); + return ret; + } + +private: + ObTableLoadClientTask *client_task_; +}; + +/** + * ClientTaskExectueCallback + */ + +class ObTableLoadClientTask::ClientTaskExectueCallback : public ObITableLoadTaskCallback +{ +public: + ClientTaskExectueCallback(ObTableLoadClientTask *client_task) : client_task_(client_task) + { + client_task_->inc_ref_count(); + } + virtual ~ClientTaskExectueCallback() { ObTableLoadClientService::revert_task(client_task_); } + void callback(int ret_code, ObTableLoadTask *task) override + { + if (OB_UNLIKELY(OB_SUCCESS != ret_code)) { + client_task_->set_status_abort(ret_code); + } + task->~ObTableLoadTask(); + } + +private: + ObTableLoadClientTask *client_task_; +}; + +/** + * ObTableLoadClientTask + */ + +ObTableLoadClientTask::ObTableLoadClientTask() + : task_id_(OB_INVALID_ID), allocator_("TLD_ClientTask"), + task_scheduler_(nullptr), session_info_(nullptr), exec_ctx_(nullptr), - task_scheduler_(nullptr), + session_count_(0), next_batch_id_(0), - table_ctx_(nullptr), client_status_(ObTableLoadClientStatus::MAX_STATUS), error_code_(OB_SUCCESS), ref_count_(0), is_inited_(false) { allocator_.set_tenant_id(MTL_ID()); - column_names_.set_tenant_id(MTL_ID()); - column_idxs_.set_tenant_id(MTL_ID()); free_session_ctx_.sessid_ = sql::ObSQLSessionInfo::INVALID_SESSID; } @@ -59,52 +213,40 @@ ObTableLoadClientTask::~ObTableLoadClientTask() allocator_.free(task_scheduler_); task_scheduler_ = nullptr; } - if (nullptr != session_info_) { - ObTableLoadUtils::free_session_info(session_info_, free_session_ctx_); - session_info_ = nullptr; - } if (nullptr != exec_ctx_) { exec_ctx_->~ObTableLoadClientExecCtx(); allocator_.free(exec_ctx_); exec_ctx_ = nullptr; } - if (nullptr != table_ctx_) { - int ret = OB_SUCCESS; - if (OB_FAIL(ObTableLoadService::remove_ctx(table_ctx_))) { - LOG_WARN("fail to remove table ctx", KR(ret), KP(table_ctx_)); - } - ObTableLoadService::put_ctx(table_ctx_); - table_ctx_ = nullptr; + if (nullptr != session_info_) { + ObTableLoadUtils::free_session_info(session_info_, free_session_ctx_); + session_info_ = nullptr; } } -int ObTableLoadClientTask::init(uint64_t tenant_id, uint64_t user_id, uint64_t database_id, - uint64_t table_id, int64_t timeout_us, int64_t heartbeat_timeout_us) +int ObTableLoadClientTask::init(const ObTableLoadClientTaskParam ¶m) { int ret = OB_SUCCESS; if (IS_INIT) { ret = OB_INIT_TWICE; LOG_WARN("ObTableLoadClientTask init twice", KR(ret), KP(this)); - } else if (OB_UNLIKELY(OB_INVALID_ID == tenant_id || OB_INVALID_ID == table_id || - OB_INVALID_ID == user_id || 0 == timeout_us)) { + } else if (OB_UNLIKELY(!param.is_valid())) { ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid args", KR(ret), K(tenant_id), K(user_id), K(table_id), K(timeout_us)); + LOG_WARN("invalid args", KR(ret), K(param)); + } else if (OB_FAIL(param_.assign(param))) { + LOG_WARN("fail to assign param", KR(ret)); } else { - tenant_id_ = tenant_id; - user_id_ = user_id; - database_id_ = database_id; - table_id_ = table_id; - if (OB_FAIL(create_session_info(user_id_, database_id_, table_id_, session_info_, - free_session_ctx_))) { + const int64_t origin_timeout_ts = THIS_WORKER.get_timeout_ts(); + THIS_WORKER.set_timeout_ts(ObTimeUtil::current_time() + param_.get_timeout_us()); + if (OB_FAIL(create_session_info(param_.get_tenant_id(), param_.get_user_id(), + param_.get_database_id(), param_.get_table_id(), session_info_, + free_session_ctx_))) { LOG_WARN("fail to create session info", KR(ret)); - } else if (OB_FAIL(init_column_names_and_idxs())) { - LOG_WARN("fail to init column names and idxs", KR(ret)); - } else if (OB_FAIL(init_exec_ctx(timeout_us, heartbeat_timeout_us))) { + } else if (OB_FAIL(init_exec_ctx(param_.get_timeout_us(), param_.get_heartbeat_timeout_us()))) { LOG_WARN("fail to init client exec ctx", KR(ret)); - } else if (OB_FAIL(task_allocator_.init("TLD_TaskPool", MTL_ID()))) { - LOG_WARN("fail to init task allocator", KR(ret)); - } else if (OB_ISNULL(task_scheduler_ = OB_NEWx(ObTableLoadTaskThreadPoolScheduler, - (&allocator_), 1, table_id, "Client"))) { + } else if (OB_ISNULL(task_scheduler_ = + OB_NEWx(ObTableLoadTaskThreadPoolScheduler, (&allocator_), 1, + param_.get_table_id(), "Client"))) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("fail to new ObTableLoadTaskThreadPoolScheduler", KR(ret)); } else if (OB_FAIL(task_scheduler_->init())) { @@ -114,13 +256,15 @@ int ObTableLoadClientTask::init(uint64_t tenant_id, uint64_t user_id, uint64_t d } else { is_inited_ = true; } + THIS_WORKER.set_timeout_ts(origin_timeout_ts); } return ret; } -int ObTableLoadClientTask::create_session_info(uint64_t user_id, uint64_t database_id, uint64_t table_id, - sql::ObSQLSessionInfo *&session_info, - sql::ObFreeSessionCtx &free_session_ctx) +int ObTableLoadClientTask::create_session_info(uint64_t tenant_id, uint64_t user_id, + uint64_t database_id, uint64_t table_id, + ObSQLSessionInfo *&session_info, + ObFreeSessionCtx &free_session_ctx) { int ret = OB_SUCCESS; schema::ObSchemaGetterGuard schema_guard; @@ -128,7 +272,6 @@ int ObTableLoadClientTask::create_session_info(uint64_t user_id, uint64_t databa const ObUserInfo *user_info = nullptr; const ObDatabaseSchema *database_schema = nullptr; const ObTableSchema *table_schema = nullptr; - uint64_t tenant_id = MTL_ID(); if (OB_ISNULL(GCTX.schema_service_)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("invalid argument", K(ret), K(GCTX.schema_service_)); @@ -149,9 +292,8 @@ int ObTableLoadClientTask::create_session_info(uint64_t user_id, uint64_t databa } else if (OB_ISNULL(database_schema)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("invalid database schema", K(ret), K(tenant_id), K(database_id)); - } else if (OB_FAIL(ObTableLoadSchema::get_table_schema(tenant_id, table_id, schema_guard, - table_schema))) { - LOG_WARN("fail to get database and table schema", KR(ret)); + } else if (OB_FAIL(schema_guard.get_table_schema(tenant_id, table_id, table_schema))) { + LOG_WARN("fail to get table schema", KR(ret)); } else if (OB_ISNULL(table_schema)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("invalid table schema", K(ret), K(tenant_id), K(table_id)); @@ -161,8 +303,10 @@ int ObTableLoadClientTask::create_session_info(uint64_t user_id, uint64_t databa ObArenaAllocator allocator("TLD_Tmp"); allocator.set_tenant_id(MTL_ID()); ObStringBuffer buffer(&allocator); - buffer.append("DIRECT LOAD_"); + buffer.append("DIRECT LOAD: "); buffer.append(table_schema->get_table_name()); + ObObj timeout_val; + timeout_val.set_int(param_.get_timeout_us()); OZ(session_info->load_default_sys_variable(false, false)); //加载默认的session参数 OZ(session_info->load_default_configs_in_pc()); OX(session_info->init_tenant(tenant_info->get_tenant_name(), tenant_id)); @@ -173,11 +317,12 @@ int ObTableLoadClientTask::create_session_info(uint64_t user_id, uint64_t databa OX(session_info->set_user_priv_set(OB_PRIV_ALL | OB_PRIV_GRANT)); OX(session_info->set_default_database(database_schema->get_database_name(), CS_TYPE_UTF8MB4_GENERAL_CI)); - OX(session_info->set_mysql_cmd(obmysql::COM_QUERY)); + OX(session_info->set_mysql_cmd(COM_QUERY)); OX(session_info->set_current_trace_id(ObCurTraceId::get_trace_id())); - OX(session_info->set_client_addr(ObServer::get_instance().get_self())); + OX(session_info->set_client_addr(param_.get_client_addr())); OX(session_info->set_peer_addr(ObServer::get_instance().get_self())); - OX(session_info->set_thread_id(GETTID())); + OX(session_info->set_query_start_time(ObTimeUtil::current_time())); + OZ(session_info->update_sys_variable(SYS_VAR_OB_QUERY_TIMEOUT, timeout_val)); } if (OB_FAIL(ret)) { if (session_info != nullptr) { @@ -188,32 +333,6 @@ int ObTableLoadClientTask::create_session_info(uint64_t user_id, uint64_t databa return ret; } -int ObTableLoadClientTask::init_column_names_and_idxs() -{ - int ret = OB_SUCCESS; - ObSchemaGetterGuard schema_guard; - const ObTableSchema *table_schema = nullptr; - if (OB_FAIL( - ObTableLoadSchema::get_table_schema(tenant_id_, table_id_, schema_guard, table_schema))) { - LOG_WARN("fail to get table schema", KR(ret), K_(tenant_id), K_(table_id)); - } else if (OB_FAIL( - ObTableLoadSchema::get_column_names(table_schema, allocator_, column_names_))) { - LOG_WARN("fail to get all column name", KR(ret)); - } else if (OB_UNLIKELY(column_names_.empty())) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("unexpected empty column names", KR(ret)); - } else if (OB_FAIL(ObTableLoadSchema::get_column_idxs(table_schema, column_idxs_))) { - LOG_WARN("failed to get all column idx", K(ret)); - } else if (OB_UNLIKELY(column_idxs_.empty())) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("unexpected empty column idxs", KR(ret)); - } else if (OB_UNLIKELY(column_names_.count() != column_idxs_.count())) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("unexpected column names and idxs", KR(ret), K(column_names_), K(column_idxs_)); - } - return ret; -} - int ObTableLoadClientTask::init_exec_ctx(int64_t timeout_us, int64_t heartbeat_timeout_us) { int ret = OB_SUCCESS; @@ -230,36 +349,111 @@ int ObTableLoadClientTask::init_exec_ctx(int64_t timeout_us, int64_t heartbeat_t return ret; } -int ObTableLoadClientTask::set_table_ctx(ObTableLoadTableCtx *table_ctx) +int ObTableLoadClientTask::start() { int ret = OB_SUCCESS; - if (OB_UNLIKELY(nullptr == table_ctx)) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid args", KR(ret), KP(table_ctx)); + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + LOG_WARN("ObTableLoadClientTask not init", KR(ret)); } else { obsys::ObWLockGuard guard(rw_lock_); - if (OB_UNLIKELY(nullptr != table_ctx_)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("unexpected set table ctx twice", KR(ret), KP(table_ctx_), KP(table_ctx)); + if (OB_UNLIKELY(ObTableLoadClientStatus::MAX_STATUS != client_status_)) { + ret = OB_STATE_NOT_MATCH; + LOG_WARN("unexpected status", KR(ret), K(client_status_)); } else { - table_ctx->inc_ref_count(); - table_ctx_ = table_ctx; + client_status_ = ObTableLoadClientStatus::INITIALIZING; + ObTableLoadTask *task = nullptr; + if (OB_ISNULL(task = OB_NEWx(ObTableLoadTask, &allocator_, param_.get_tenant_id()))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + LOG_WARN("fail to new ObTableLoadTask", KR(ret)); + } else if (OB_FAIL(task->set_processor(this))) { + LOG_WARN("fail to set client task processor", KR(ret)); + } else if (OB_FAIL(task->set_callback(this))) { + LOG_WARN("fail to set common task callback", KR(ret)); + } else if (OB_FAIL(task_scheduler_->add_task(0, task))) { + LOG_WARN("fail to add task", KR(ret)); + } + if (OB_FAIL(ret)) { + client_status_ = ObTableLoadClientStatus::ERROR; + error_code_ = ret; + if (nullptr != task) { + task->~ObTableLoadTask(); + allocator_.free(task); + task = nullptr; + } + } } } return ret; } -int ObTableLoadClientTask::get_table_ctx(ObTableLoadTableCtx *&table_ctx) +int ObTableLoadClientTask::write(ObTableLoadObjRowArray &obj_rows) { int ret = OB_SUCCESS; - table_ctx = nullptr; - obsys::ObRLockGuard guard(rw_lock_); - if (OB_UNLIKELY(nullptr == table_ctx_)) { + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + LOG_WARN("ObTableLoadClientTask not init", KR(ret)); + } else if (OB_UNLIKELY(obj_rows.empty())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid args", KR(ret), K(obj_rows)); + } else if (OB_UNLIKELY(session_count_ <= 0)) { ret = OB_ERR_UNEXPECTED; - LOG_WARN("unexpected null table ctx", KR(ret)); + LOG_WARN("unexpected session count", KR(ret), K(session_count_)); } else { - table_ctx = table_ctx_; - table_ctx->inc_ref_count(); + const int64_t batch_id = ATOMIC_FAA(&next_batch_id_, 1); + ; + const int32_t session_id = batch_id % session_count_ + 1; + ObTableLoadSequenceNo start_seq_no(batch_id << ObTableLoadSequenceNo::BATCH_ID_SHIFT); + for (int64_t i = 0; OB_SUCC(ret) && i < obj_rows.count(); ++i) { + ObTableLoadObjRow &row = obj_rows.at(i); + row.seq_no_ = start_seq_no++; + } + if (OB_SUCC(ret)) { + if (OB_FAIL(instance_.write(session_id, obj_rows))) { + LOG_WARN("fail to write", KR(ret)); + } + } + } + return ret; +} + +int ObTableLoadClientTask::commit() +{ + int ret = OB_SUCCESS; + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + LOG_WARN("ObTableLoadClientTask not init", KR(ret)); + } else if (OB_FAIL(check_status(ObTableLoadClientStatus::RUNNING))) { + LOG_WARN("fail to check status", KR(ret)); + } else if (OB_FAIL(set_status_committing())) { + LOG_WARN("fail to set status committing", KR(ret)); + } + return ret; +} + +void ObTableLoadClientTask::abort() +{ + int ret = OB_SUCCESS; + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + LOG_WARN("ObTableLoadClientTask not init", KR(ret)); + } else { + set_status_abort(); + if (nullptr != session_info_ && OB_FAIL(session_info_->kill_query())) { + LOG_WARN("fail to kill query", KR(ret)); + } + } +} + +int ObTableLoadClientTask::set_status_waitting() +{ + int ret = OB_SUCCESS; + obsys::ObWLockGuard guard(rw_lock_); + if (OB_UNLIKELY(ObTableLoadClientStatus::INITIALIZING != client_status_)) { + ret = OB_STATE_NOT_MATCH; + LOG_WARN("unexpected status", KR(ret), K(client_status_)); + } else { + client_status_ = ObTableLoadClientStatus::WAITTING; } return ret; } @@ -268,7 +462,7 @@ int ObTableLoadClientTask::set_status_running() { int ret = OB_SUCCESS; obsys::ObWLockGuard guard(rw_lock_); - if (OB_UNLIKELY(ObTableLoadClientStatus::MAX_STATUS != client_status_)) { + if (OB_UNLIKELY(ObTableLoadClientStatus::WAITTING != client_status_)) { ret = OB_STATE_NOT_MATCH; LOG_WARN("unexpected status", KR(ret), K(client_status_)); } else { @@ -322,13 +516,16 @@ int ObTableLoadClientTask::set_status_error(int error_code) return ret; } -void ObTableLoadClientTask::set_status_abort() +void ObTableLoadClientTask::set_status_abort(int error_code) { obsys::ObWLockGuard guard(rw_lock_); if (ObTableLoadClientStatus::ABORT == client_status_) { // ignore } else { client_status_ = ObTableLoadClientStatus::ABORT; + if (OB_SUCCESS == error_code_) { + error_code_ = error_code; + } } } @@ -337,10 +534,9 @@ int ObTableLoadClientTask::check_status(ObTableLoadClientStatus client_status) int ret = OB_SUCCESS; obsys::ObRLockGuard guard(rw_lock_); if (OB_UNLIKELY(client_status != client_status_)) { - if (ObTableLoadClientStatus::ERROR == client_status_) { + if (ObTableLoadClientStatus::ERROR == client_status_ || + ObTableLoadClientStatus::ABORT == client_status_) { ret = error_code_; - } else if (ObTableLoadClientStatus::ABORT == client_status_) { - ret = OB_CANCELED; } else { ret = OB_STATE_NOT_MATCH; } @@ -362,51 +558,70 @@ void ObTableLoadClientTask::get_status(ObTableLoadClientStatus &client_status, error_code = error_code_; } -int ObTableLoadClientTask::alloc_task(ObTableLoadTask *&task) +int ObTableLoadClientTask::init_instance() { int ret = OB_SUCCESS; if (IS_NOT_INIT) { ret = OB_NOT_INIT; LOG_WARN("ObTableLoadClientTask not init", KR(ret)); } else { - if (OB_ISNULL(task = task_allocator_.alloc(MTL_ID()))) { - ret = OB_ALLOCATE_MEMORY_FAILED; - LOG_WARN("fail to alloc task", KR(ret)); + omt::ObTenant *tenant = nullptr; + ObArray column_idxs; + if (OB_FAIL(GCTX.omt_->get_tenant(param_.get_tenant_id(), tenant))) { + LOG_WARN("fail to get tenant handle", KR(ret), K(param_.get_tenant_id())); + } else if (OB_FAIL(ObTableLoadSchema::get_column_idxs(param_.get_tenant_id(), + param_.get_table_id(), column_idxs))) { + LOG_WARN("failed to get column idx", K(ret)); + } else if (OB_UNLIKELY(column_idxs.empty())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected empty column idxs", KR(ret)); + } else { + session_count_ = MIN(param_.get_parallel(), (int64_t)tenant->unit_max_cpu() * 2); + ObTableLoadParam load_param; + load_param.tenant_id_ = param_.get_tenant_id(); + load_param.table_id_ = param_.get_table_id(); + load_param.parallel_ = param_.get_parallel(); + load_param.session_count_ = session_count_; + load_param.batch_size_ = 100; + load_param.max_error_row_count_ = param_.get_max_error_row_count(); + // load_param.sql_mode_ = 0; // TODO(suzhi.yt) 自增列会用到这个参数 + load_param.column_count_ = column_idxs.count(); + load_param.need_sort_ = true; + load_param.px_mode_ = false; + load_param.online_opt_stat_gather_ = false; // 支持统计信息收集需要构造ObExecContext + load_param.dup_action_ = param_.get_dup_action(); + if (OB_FAIL(instance_.init(load_param, column_idxs, exec_ctx_))) { + LOG_WARN("fail to init instance", KR(ret)); + } } } return ret; } -void ObTableLoadClientTask::free_task(ObTableLoadTask *task) +int ObTableLoadClientTask::commit_instance() { int ret = OB_SUCCESS; if (IS_NOT_INIT) { ret = OB_NOT_INIT; LOG_WARN("ObTableLoadClientTask not init", KR(ret)); - } else if (OB_ISNULL(task)) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid null task", KR(ret)); } else { - task_allocator_.free(task); - } -} - -int ObTableLoadClientTask::add_task(ObTableLoadTask *task) -{ - int ret = OB_SUCCESS; - if (IS_NOT_INIT) { - ret = OB_NOT_INIT; - LOG_WARN("ObTableLoadClientTask not init", KR(ret)); - } else if (OB_ISNULL(task)) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid null task", KR(ret)); - } else { - if (OB_FAIL(task_scheduler_->add_task(0, task))) { - LOG_WARN("fail to add task", KR(ret)); + if (OB_FAIL(instance_.commit(result_info_))) { + LOG_WARN("fail to commit instance", KR(ret)); } } return ret; } +void ObTableLoadClientTask::destroy_instance() +{ + int ret = OB_SUCCESS; + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + LOG_WARN("ObTableLoadClientTask not init", KR(ret)); + } else { + instance_.destroy(); + } +} + } // namespace observer } // namespace oceanbase diff --git a/src/observer/table_load/ob_table_load_client_task.h b/src/observer/table_load/ob_table_load_client_task.h index d8f30247e6..8de4da928c 100644 --- a/src/observer/table_load/ob_table_load_client_task.h +++ b/src/observer/table_load/ob_table_load_client_task.h @@ -13,9 +13,10 @@ #pragma once #include "lib/hash/ob_link_hashmap.h" -#include "observer/table_load/ob_table_load_object_allocator.h" +#include "observer/table_load/ob_table_load_instance.h" #include "observer/table_load/ob_table_load_struct.h" #include "share/table/ob_table_load_define.h" +#include "share/table/ob_table_load_row_array.h" #include "sql/session/ob_sql_session_mgr.h" namespace oceanbase @@ -26,66 +27,109 @@ class ObTableLoadClientExecCtx; class ObTableLoadTableCtx; class ObTableLoadTask; class ObITableLoadTaskScheduler; +class ObTableLoadInstance; + +struct ObTableLoadClientTaskParam +{ +public: + ObTableLoadClientTaskParam(); + ~ObTableLoadClientTaskParam(); + void reset(); + int assign(const ObTableLoadClientTaskParam &other); + bool is_valid() const; + +#define DEFINE_GETTER_AND_SETTER(type, name) \ + OB_INLINE type get_##name() const { return name##_; } \ + OB_INLINE void set_##name(type name) { name##_ = name; } + + DEFINE_GETTER_AND_SETTER(ObAddr, client_addr); + DEFINE_GETTER_AND_SETTER(uint64_t, tenant_id); + DEFINE_GETTER_AND_SETTER(uint64_t, user_id); + DEFINE_GETTER_AND_SETTER(uint64_t, database_id); + DEFINE_GETTER_AND_SETTER(uint64_t, table_id); + DEFINE_GETTER_AND_SETTER(int64_t, parallel); + DEFINE_GETTER_AND_SETTER(uint64_t, max_error_row_count); + DEFINE_GETTER_AND_SETTER(sql::ObLoadDupActionType, dup_action); + DEFINE_GETTER_AND_SETTER(uint64_t, timeout_us); + DEFINE_GETTER_AND_SETTER(uint64_t, heartbeat_timeout_us); + +#undef DEFINE_GETTER_AND_SETTER + + TO_STRING_KV(K_(client_addr), K_(tenant_id), K_(user_id), K_(database_id), K_(table_id), + K_(parallel), K_(max_error_row_count), K_(dup_action), K_(timeout_us), + K_(heartbeat_timeout_us)); + +private: + ObAddr client_addr_; + uint64_t tenant_id_; + uint64_t user_id_; + uint64_t database_id_; + uint64_t table_id_; + int64_t parallel_; + uint64_t max_error_row_count_; + sql::ObLoadDupActionType dup_action_; + int64_t timeout_us_; + int64_t heartbeat_timeout_us_; +}; class ObTableLoadClientTask { public: ObTableLoadClientTask(); ~ObTableLoadClientTask(); - int init(uint64_t tenant_id, uint64_t user_id, uint64_t database_id, uint64_t table_id, - int64_t timeout_us, int64_t heartbeat_timeout_us); - bool is_inited() const { return is_inited_; } - int64_t get_ref_count() const { return ATOMIC_LOAD(&ref_count_); } - int64_t inc_ref_count() { return ATOMIC_AAF(&ref_count_, 1); } - int64_t dec_ref_count() { return ATOMIC_SAF(&ref_count_, 1); } - int set_table_ctx(ObTableLoadTableCtx *table_ctx); - int get_table_ctx(ObTableLoadTableCtx *&table_ctx); + int init(const ObTableLoadClientTaskParam ¶m); + int start(); + int write(table::ObTableLoadObjRowArray &obj_rows); + int commit(); + void abort(); + OB_INLINE int64_t get_ref_count() const { return ATOMIC_LOAD(&ref_count_); } + OB_INLINE int64_t inc_ref_count() { return ATOMIC_AAF(&ref_count_, 1); } + OB_INLINE int64_t dec_ref_count() { return ATOMIC_SAF(&ref_count_, 1); } OB_INLINE sql::ObSQLSessionInfo *get_session_info() { return session_info_; } OB_INLINE ObTableLoadClientExecCtx *get_exec_ctx() { return exec_ctx_; } - OB_INLINE void set_trans_id(const table::ObTableLoadTransId &trans_id) { trans_id_ = trans_id; } - OB_INLINE const table::ObTableLoadTransId &get_trans_id() const { return trans_id_; } - int64_t get_next_batch_id() { return ATOMIC_FAA(&next_batch_id_, 1); } + int set_status_waitting(); int set_status_running(); int set_status_committing(); int set_status_commit(); int set_status_error(int error_code); - void set_status_abort(); + void set_status_abort(int error_code = OB_CANCELED); table::ObTableLoadClientStatus get_status() const; void get_status(table::ObTableLoadClientStatus &client_status, int &error_code) const; int check_status(table::ObTableLoadClientStatus client_status); - int alloc_task(ObTableLoadTask *&task); - void free_task(ObTableLoadTask *task); - int add_task(ObTableLoadTask *task); - TO_STRING_KV(K_(tenant_id), K_(user_id), K_(database_id), K_(table_id), K_(ddl_param), - K_(column_names), K_(column_idxs), K_(result_info), KP_(session_info), - K_(free_session_ctx), KP_(exec_ctx), KP_(task_scheduler), K_(trans_id), - KP_(table_ctx), K_(client_status), K_(error_code), K_(ref_count)); + TO_STRING_KV(K_(task_id), K_(param), K_(result_info), KP_(session_info), K_(free_session_ctx), + KP_(exec_ctx), KP_(task_scheduler), K_(client_status), K_(error_code), + K_(ref_count)); + private: - int create_session_info(uint64_t user_id, uint64_t database_id, uint64_t table_id, - sql::ObSQLSessionInfo *&session_info, + int init_task_scheduler(); + int create_session_info(uint64_t tenant_id, uint64_t user_id, uint64_t database_id, + uint64_t table_id, sql::ObSQLSessionInfo *&session_info, sql::ObFreeSessionCtx &free_session_ctx); - int init_column_names_and_idxs(); int init_exec_ctx(int64_t timeout_us, int64_t heartbeat_timeout_us); + + int init_instance(); + int commit_instance(); + void destroy_instance(); + +private: + class ClientTaskExectueProcessor; + class ClientTaskExectueCallback; + public: - uint64_t tenant_id_; - uint64_t user_id_; - uint64_t database_id_; - uint64_t table_id_; - ObTableLoadDDLParam ddl_param_; - common::ObArray column_names_; - common::ObArray column_idxs_; + uint64_t task_id_; + ObTableLoadClientTaskParam param_; table::ObTableLoadResultInfo result_info_; + private: ObArenaAllocator allocator_; + ObITableLoadTaskScheduler *task_scheduler_; sql::ObSQLSessionInfo *session_info_; sql::ObFreeSessionCtx free_session_ctx_; ObTableLoadClientExecCtx *exec_ctx_; - ObTableLoadObjectAllocator task_allocator_; - ObITableLoadTaskScheduler *task_scheduler_; - table::ObTableLoadTransId trans_id_; + int64_t session_count_; + ObTableLoadInstance instance_; int64_t next_batch_id_ CACHE_ALIGNED; mutable obsys::ObRWLock rw_lock_; - ObTableLoadTableCtx *table_ctx_; table::ObTableLoadClientStatus client_status_; int error_code_; int64_t ref_count_ CACHE_ALIGNED; @@ -96,22 +140,22 @@ struct ObTableLoadClientTaskBrief : public common::LinkHashValue heartbeat_timeout_us_)) { ret = OB_TIMEOUT; - LOG_WARN("heart beat is timeout", KR(ret), K(last_heartbeat_time_), K(heartbeat_timeout_us_)); + LOG_WARN("heartbeat is timeout", KR(ret), K(last_heartbeat_time_), K(heartbeat_timeout_us_)); } else if (OB_ISNULL(session_info_)) { ret = OB_ERR_UNEXPECTED; - LOG_WARN("session info is null"); - } else if (session_info_->is_terminate(ret)){ - LOG_WARN("execution was terminated", K(ret)); + LOG_WARN("session info is null", KR(ret)); + } else if (OB_UNLIKELY(session_info_->is_terminate(ret))) { + LOG_WARN("execution was terminated", KR(ret)); } return ret; } diff --git a/src/observer/table_load/ob_table_load_instance.cpp b/src/observer/table_load/ob_table_load_instance.cpp index 31cb17e01e..e263d5964e 100644 --- a/src/observer/table_load/ob_table_load_instance.cpp +++ b/src/observer/table_load/ob_table_load_instance.cpp @@ -202,6 +202,7 @@ int ObTableLoadInstance::write(int32_t session_id, const table::ObTableLoadObjRo ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid args", KR(ret), K(session_id), K(obj_rows.count())); } else { + // TODO(suzhi.yt): java客户端调用的时候, 对于相同session_id可能会并发 uint64_t &next_sequence_no = trans_ctx_.next_sequence_no_array_[session_id - 1]; ObTableLoadCoordinator coordinator(table_ctx_); if (OB_FAIL(coordinator.init())) { diff --git a/src/observer/table_load/ob_table_load_schema.cpp b/src/observer/table_load/ob_table_load_schema.cpp index e305eea688..63443e403b 100644 --- a/src/observer/table_load/ob_table_load_schema.cpp +++ b/src/observer/table_load/ob_table_load_schema.cpp @@ -22,6 +22,7 @@ namespace oceanbase namespace observer { using namespace common; +using namespace share; using namespace share::schema; using namespace table; using namespace blocksstable; @@ -144,10 +145,26 @@ int ObTableLoadSchema::get_column_names(const ObTableSchema *table_schema, ObIAl return ret; } +int ObTableLoadSchema::get_column_idxs(uint64_t tenant_id, uint64_t table_id, + ObIArray &column_idxs) +{ + int ret = OB_SUCCESS; + column_idxs.reset(); + ObSchemaGetterGuard schema_guard; + const ObTableSchema *table_schema = nullptr; + if (OB_FAIL(get_table_schema(tenant_id, table_id, schema_guard, table_schema))) { + LOG_WARN("fail to get table schema", KR(ret), K(tenant_id), K(table_id)); + } else { + ret = get_column_idxs(table_schema, column_idxs); + } + return ret; +} + int ObTableLoadSchema::get_column_idxs(const ObTableSchema *table_schema, ObIArray &column_idxs) { int ret = OB_SUCCESS; + column_idxs.reset(); if (OB_ISNULL(table_schema)) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid args", KR(ret), KP(table_schema)); @@ -192,6 +209,50 @@ int ObTableLoadSchema::check_has_udt_column(const ObTableSchema *table_schema, b return ret; } +int ObTableLoadSchema::get_tenant_optimizer_gather_stats_on_load(const uint64_t tenant_id, + bool &value) +{ + int ret = OB_SUCCESS; + value = false; + ObSqlString sql; + SMART_VAR(ObMySQLProxy::MySQLResult, res) + { + sqlclient::ObMySQLResult *result = nullptr; + // TODO(suzhi.yt) 这里为啥是带zone纬度的? 如果查询结果中有多个zone的, 选哪个作为返回值呢? + if (OB_FAIL(sql.assign_fmt( + "SELECT value FROM %s WHERE tenant_id = %ld and (zone, name, schema_version) in (select " + "zone, name, max(schema_version) FROM %s group by zone, name) and name = '%s'", + OB_ALL_SYS_VARIABLE_HISTORY_TNAME, + ObSchemaUtils::get_extract_tenant_id(tenant_id, tenant_id), + OB_ALL_SYS_VARIABLE_HISTORY_TNAME, OB_SV__OPTIMIZER_GATHER_STATS_ON_LOAD))) { + LOG_WARN("fail to append sql", KR(ret), K(tenant_id)); + } else if (OB_FAIL(GCTX.sql_proxy_->read(res, tenant_id, sql.ptr()))) { + LOG_WARN("fail to execute sql", KR(ret), K(sql), K(tenant_id)); + } else if (OB_ISNULL(result = res.get_result())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("fail to get sql result", KR(ret), K(sql), K(tenant_id)); + } else { + while (OB_SUCC(ret)) { + if (OB_FAIL(result->next())) { + if (OB_ITER_END != ret) { + LOG_WARN("fail to get next row", KR(ret), K(tenant_id)); + } else { + ret = OB_SUCCESS; + break; + } + } else { + ObString data; + EXTRACT_VARCHAR_FIELD_MYSQL(*result, "value", data); + if (0 == strcmp(data.ptr(), "1")) { + value = true; + } + } + } + } + } + return ret; +} + ObTableLoadSchema::ObTableLoadSchema() : allocator_("TLD_Schema"), is_partitioned_table_(false), diff --git a/src/observer/table_load/ob_table_load_schema.h b/src/observer/table_load/ob_table_load_schema.h index 3ff849aacf..88bfdee268 100644 --- a/src/observer/table_load/ob_table_load_schema.h +++ b/src/observer/table_load/ob_table_load_schema.h @@ -41,9 +41,12 @@ public: static int get_column_names(const share::schema::ObTableSchema *table_schema, common::ObIAllocator &allocator, common::ObIArray &column_names); + static int get_column_idxs(uint64_t tenant_id, uint64_t table_id, + common::ObIArray &column_idxs); static int get_column_idxs(const share::schema::ObTableSchema *table_schema, common::ObIArray &column_idxs); static int check_has_udt_column(const share::schema::ObTableSchema *table_schema, bool &bret); + static int get_tenant_optimizer_gather_stats_on_load(const uint64_t tenant_id, bool &value); public: ObTableLoadSchema(); ~ObTableLoadSchema(); diff --git a/src/observer/table_load/ob_table_load_service.cpp b/src/observer/table_load/ob_table_load_service.cpp index ad47cd7b3a..6766162e8a 100644 --- a/src/observer/table_load/ob_table_load_service.cpp +++ b/src/observer/table_load/ob_table_load_service.cpp @@ -315,9 +315,7 @@ void ObTableLoadService::ObClientTaskAutoAbortTask::runTimerTask() ObTableLoadClientTask *client_task = client_task_array.at(i); if (OB_UNLIKELY(ObTableLoadClientStatus::ERROR == client_task->get_status() || client_task->get_exec_ctx()->check_status() != OB_SUCCESS)) { - if (OB_FAIL(ObTableLoadClientService::abort_task(client_task))) { - LOG_WARN("fail to abort client task", KR(ret), KPC(client_task)); - } + client_task->abort(); } ObTableLoadClientService::revert_task(client_task); } @@ -641,9 +639,7 @@ void ObTableLoadService::abort_all_client_task() } else { for (int i = 0; i < client_task_array.count(); ++i) { ObTableLoadClientTask *client_task = client_task_array.at(i); - if (OB_FAIL(ObTableLoadClientService::abort_task(client_task))) { - LOG_WARN("fail to abort client task", KR(ret), KPC(client_task)); - } + client_task->abort(); ObTableLoadClientService::revert_task(client_task); } } diff --git a/src/share/table/ob_table_load_define.h b/src/share/table/ob_table_load_define.h index 67c609ac15..58dd4af647 100644 --- a/src/share/table/ob_table_load_define.h +++ b/src/share/table/ob_table_load_define.h @@ -390,6 +390,8 @@ enum class ObTableLoadClientStatus : int64_t COMMIT = 2, ERROR = 3, ABORT = 4, + INITIALIZING = 5, // 初始化中 + WAITTING = 6, // 排队等待资源中 MAX_STATUS }; @@ -413,6 +415,12 @@ static int table_load_client_status_to_string(ObTableLoadClientStatus client_sta case ObTableLoadClientStatus::ABORT: status_str = "ABORT"; break; + case ObTableLoadClientStatus::INITIALIZING: + status_str = "INITIALIZING"; + break; + case ObTableLoadClientStatus::WAITTING: + status_str = "WAITTING"; + break; default: ret = OB_ERR_UNEXPECTED; OB_LOG(WARN, "unexpected client status", KR(ret), K(client_status));