Direct load support async begin for java client
This commit is contained in:
@ -45,6 +45,7 @@ int ObTableDirectLoadP::try_process()
|
||||
exec_ctx_.set_tenant_id(credential_.tenant_id_);
|
||||
exec_ctx_.set_user_id(credential_.user_id_);
|
||||
exec_ctx_.set_database_id(credential_.database_id_);
|
||||
exec_ctx_.set_user_client_addr(user_client_addr_);
|
||||
if (OB_FAIL(ObTableLoadClientService::direct_load_operate(exec_ctx_, arg_, result_))) {
|
||||
LOG_WARN("fail to do direct load operate", KR(ret), K(arg_));
|
||||
}
|
||||
|
||||
@ -13,6 +13,7 @@
|
||||
#pragma once
|
||||
|
||||
#include "lib/allocator/ob_allocator.h"
|
||||
#include "lib/net/ob_addr.h"
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
@ -34,6 +35,11 @@ public:
|
||||
uint64_t get_user_id() const { return user_id_; }
|
||||
void set_database_id(uint64_t database_id) { database_id_ = database_id; }
|
||||
uint64_t get_database_id() const { return database_id_; }
|
||||
void set_user_client_addr(const ObAddr &user_client_addr)
|
||||
{
|
||||
user_client_addr_ = user_client_addr;
|
||||
}
|
||||
const ObAddr get_user_client_addr() const { return user_client_addr_; }
|
||||
private:
|
||||
DISALLOW_COPY_AND_ASSIGN(ObTableDirectLoadExecContext);
|
||||
private:
|
||||
@ -41,6 +47,7 @@ private:
|
||||
uint64_t tenant_id_;
|
||||
uint64_t user_id_;
|
||||
uint64_t database_id_;
|
||||
ObAddr user_client_addr_;
|
||||
};
|
||||
|
||||
} // namespace observer
|
||||
|
||||
@ -15,14 +15,11 @@
|
||||
#include "ob_table_direct_load_rpc_executor.h"
|
||||
#include "observer/ob_server.h"
|
||||
#include "observer/omt/ob_multi_tenant.h"
|
||||
#include "observer/omt/ob_tenant.h"
|
||||
#include "observer/table_load/ob_table_load_client_service.h"
|
||||
#include "observer/table_load/ob_table_load_client_task.h"
|
||||
#include "observer/table_load/ob_table_load_coordinator.h"
|
||||
#include "observer/table_load/ob_table_load_redef_table.h"
|
||||
#include "observer/table_load/ob_table_load_exec_ctx.h"
|
||||
#include "observer/table_load/ob_table_load_schema.h"
|
||||
#include "observer/table_load/ob_table_load_service.h"
|
||||
#include "observer/table_load/ob_table_load_table_ctx.h"
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
@ -35,24 +32,6 @@ using namespace sql;
|
||||
using namespace table;
|
||||
|
||||
// begin
|
||||
ObTableDirectLoadBeginExecutor::ObTableDirectLoadBeginExecutor(
|
||||
ObTableDirectLoadExecContext &ctx, const ObTableDirectLoadRequest &request,
|
||||
ObTableDirectLoadResult &result)
|
||||
: ParentType(ctx, request, result), client_task_(nullptr), table_ctx_(nullptr)
|
||||
{
|
||||
}
|
||||
|
||||
ObTableDirectLoadBeginExecutor::~ObTableDirectLoadBeginExecutor()
|
||||
{
|
||||
if (nullptr != client_task_) {
|
||||
ObTableLoadClientService::revert_task(client_task_);
|
||||
client_task_ = nullptr;
|
||||
}
|
||||
if (nullptr != table_ctx_) {
|
||||
ObTableLoadService::put_ctx(table_ctx_);
|
||||
table_ctx_ = nullptr;
|
||||
}
|
||||
}
|
||||
|
||||
int ObTableDirectLoadBeginExecutor::check_args()
|
||||
{
|
||||
@ -81,231 +60,75 @@ int ObTableDirectLoadBeginExecutor::process()
|
||||
int ret = OB_SUCCESS;
|
||||
LOG_INFO("table direct load begin", K_(arg));
|
||||
const uint64_t tenant_id = ctx_.get_tenant_id();
|
||||
const uint64_t user_id = ctx_.get_user_id();
|
||||
const uint64_t database_id = ctx_.get_database_id();
|
||||
uint64_t table_id = 0;
|
||||
|
||||
THIS_WORKER.set_timeout_ts(ObTimeUtil::current_time() + arg_.timeout_);
|
||||
ObTableLoadClientTask *client_task = nullptr;
|
||||
if (OB_FAIL(ObTableLoadService::check_tenant())) {
|
||||
LOG_WARN("fail to check tenant", KR(ret));
|
||||
} else if (OB_FAIL(ObTableLoadSchema::get_table_id(tenant_id, database_id, arg_.table_name_,
|
||||
table_id))) {
|
||||
LOG_WARN("fail to get table id", KR(ret), K(tenant_id), K(database_id), K_(arg));
|
||||
}
|
||||
|
||||
// get the existing client task if it exists
|
||||
while (OB_SUCC(ret)) {
|
||||
ObTableLoadKey key(tenant_id, table_id);
|
||||
if (OB_FAIL(ObTableLoadClientService::get_task(key, client_task_))) {
|
||||
if (OB_UNLIKELY(OB_ENTRY_NOT_EXIST != ret)) {
|
||||
LOG_WARN("fail to get client task", KR(ret), K(key));
|
||||
} else {
|
||||
ret = OB_SUCCESS;
|
||||
client_task_ = nullptr;
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
bool need_wait_finish = false;
|
||||
ObTableLoadClientStatus wait_client_status;
|
||||
ObTableLoadClientStatus client_status = client_task_->get_status();
|
||||
switch (client_status) {
|
||||
case ObTableLoadClientStatus::RUNNING:
|
||||
case ObTableLoadClientStatus::COMMITTING:
|
||||
if (arg_.force_create_) {
|
||||
if (OB_FAIL(ObTableLoadClientService::abort_task(client_task_))) {
|
||||
LOG_WARN("fail to abort client task", KR(ret));
|
||||
} else {
|
||||
need_wait_finish = true;
|
||||
wait_client_status = ObTableLoadClientStatus::ABORT;
|
||||
}
|
||||
}
|
||||
break;
|
||||
case ObTableLoadClientStatus::COMMIT:
|
||||
case ObTableLoadClientStatus::ABORT:
|
||||
need_wait_finish = true;
|
||||
wait_client_status = client_status;
|
||||
break;
|
||||
default:
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unexpected client status", KR(ret), KPC(client_task_), K(client_status));
|
||||
break;
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (!need_wait_finish) {
|
||||
break;
|
||||
} else {
|
||||
ObTableLoadUniqueKey task_key(table_id, client_task_->ddl_param_.task_id_);
|
||||
ObTableLoadClientService::revert_task(client_task_);
|
||||
client_task_ = nullptr;
|
||||
if (OB_FAIL(ObTableLoadClientService::wait_task_finish(task_key))) {
|
||||
LOG_WARN("fail to wait client task finish", KR(ret), K(task_key), K(wait_client_status));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// create new client task if it does not exist
|
||||
if (OB_SUCC(ret) && nullptr == client_task_) {
|
||||
if (OB_FAIL(ObTableLoadService::check_support_direct_load(table_id))) {
|
||||
LOG_WARN("fail to check support direct load", KR(ret), K(table_id));
|
||||
}
|
||||
// create client task
|
||||
if (OB_SUCC(ret)) {
|
||||
if (OB_ISNULL(client_task_ = ObTableLoadClientService::alloc_task())) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
ObTableLoadClientTaskParam param;
|
||||
param.set_client_addr(ctx_.get_user_client_addr());
|
||||
param.set_tenant_id(tenant_id);
|
||||
param.set_user_id(ctx_.get_user_id());
|
||||
param.set_database_id(database_id);
|
||||
param.set_table_id(table_id);
|
||||
param.set_parallel(arg_.parallel_);
|
||||
param.set_max_error_row_count(arg_.max_error_row_count_);
|
||||
param.set_dup_action(arg_.dup_action_);
|
||||
param.set_timeout_us(arg_.timeout_);
|
||||
param.set_heartbeat_timeout_us(arg_.heartbeat_timeout_);
|
||||
if (OB_FAIL(ObTableLoadClientService::alloc_task(client_task))) {
|
||||
LOG_WARN("fail to alloc client task", KR(ret));
|
||||
} else if (OB_FAIL(client_task_->init(tenant_id, user_id, database_id, table_id,
|
||||
arg_.timeout_, arg_.heartbeat_timeout_))) {
|
||||
LOG_WARN("fail to init client task", KR(ret));
|
||||
} else {
|
||||
// create table ctx
|
||||
if (OB_FAIL(create_table_ctx())) {
|
||||
LOG_WARN("fail to create table ctx", KR(ret));
|
||||
} else {
|
||||
client_task_->ddl_param_ = table_ctx_->ddl_param_;
|
||||
if (OB_FAIL(client_task_->set_table_ctx(table_ctx_))) {
|
||||
LOG_WARN("fail to set table ctx", KR(ret));
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
if (OB_TMP_FAIL(ObTableLoadService::remove_ctx(table_ctx_))) {
|
||||
LOG_WARN("fail to remove ctx", KR(tmp_ret));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// begin
|
||||
if (OB_SUCC(ret)) {
|
||||
if (OB_FAIL(do_begin())) {
|
||||
LOG_WARN("fail to do begin", KR(ret));
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
if (OB_FAIL(ObTableLoadClientService::add_task(client_task_))) {
|
||||
} else if (OB_FAIL(client_task->init(param))) {
|
||||
LOG_WARN("fail to init client task", KR(ret), K(param));
|
||||
} else if (OB_FAIL(client_task->start())) {
|
||||
LOG_WARN("fail to start client task", KR(ret));
|
||||
} else if (OB_FAIL(ObTableLoadClientService::add_task(client_task))) {
|
||||
LOG_WARN("fail to add client task", KR(ret));
|
||||
}
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
if (nullptr != table_ctx_) {
|
||||
ObTableLoadCoordinator::abort_ctx(table_ctx_);
|
||||
|
||||
if (OB_SUCC(ret) && !arg_.is_async_) {
|
||||
ObTableLoadClientStatus client_status = ObTableLoadClientStatus::MAX_STATUS;
|
||||
int client_error_code = OB_SUCCESS;
|
||||
while (OB_SUCC(ret) && ObTableLoadClientStatus::RUNNING != client_status) {
|
||||
if (OB_UNLIKELY(THIS_WORKER.is_timeout())) {
|
||||
ret = OB_TIMEOUT;
|
||||
LOG_WARN("worker timeout", KR(ret));
|
||||
} else {
|
||||
client_task->get_status(client_status, client_error_code);
|
||||
switch (client_status) {
|
||||
case ObTableLoadClientStatus::RUNNING:
|
||||
break;
|
||||
case ObTableLoadClientStatus::INITIALIZING:
|
||||
case ObTableLoadClientStatus::WAITTING:
|
||||
ob_usleep(200LL * 1000); // sleep 200ms
|
||||
break;
|
||||
case ObTableLoadClientStatus::ERROR:
|
||||
case ObTableLoadClientStatus::ABORT:
|
||||
ret = OB_SUCCESS == client_error_code ? OB_CANCELED : client_error_code;
|
||||
break;
|
||||
default:
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unexpected client status", KR(ret), K(client_status));
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// fill res
|
||||
if (OB_SUCC(ret)) {
|
||||
res_.table_id_ = client_task_->table_id_;
|
||||
res_.task_id_ = client_task_->ddl_param_.task_id_;
|
||||
if (OB_FAIL(res_.column_names_.assign(client_task_->column_names_))) {
|
||||
LOG_WARN("fail to assign column names", KR(ret));
|
||||
} else {
|
||||
client_task_->get_status(res_.status_, res_.error_code_);
|
||||
res_.table_id_ = client_task->param_.get_table_id();
|
||||
res_.task_id_ = client_task->task_id_;
|
||||
client_task->get_status(res_.status_, res_.error_code_);
|
||||
}
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTableDirectLoadBeginExecutor::create_table_ctx()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
const uint64_t tenant_id = client_task_->tenant_id_;
|
||||
const uint64_t table_id = client_task_->table_id_;
|
||||
ObTableLoadDDLParam ddl_param;
|
||||
ObTableLoadParam param;
|
||||
// start redef table
|
||||
if (OB_SUCC(ret)) {
|
||||
ObTableLoadRedefTableStartArg start_arg;
|
||||
ObTableLoadRedefTableStartRes start_res;
|
||||
start_arg.tenant_id_ = tenant_id;
|
||||
start_arg.table_id_ = table_id;
|
||||
start_arg.parallelism_ = arg_.parallel_;
|
||||
start_arg.is_load_data_ = true;
|
||||
if (OB_FAIL(ObTableLoadRedefTable::start(start_arg, start_res,
|
||||
*client_task_->get_session_info()))) {
|
||||
LOG_WARN("fail to start redef table", KR(ret), K(start_arg));
|
||||
} else {
|
||||
ddl_param.dest_table_id_ = start_res.dest_table_id_;
|
||||
ddl_param.task_id_ = start_res.task_id_;
|
||||
ddl_param.schema_version_ = start_res.schema_version_;
|
||||
ddl_param.snapshot_version_ = start_res.snapshot_version_;
|
||||
ddl_param.data_version_ = start_res.data_format_version_;
|
||||
}
|
||||
}
|
||||
// init param
|
||||
if (OB_SUCC(ret)) {
|
||||
ObTenant *tenant = nullptr;
|
||||
if (OB_FAIL(GCTX.omt_->get_tenant(tenant_id, tenant))) {
|
||||
LOG_WARN("fail to get tenant", KR(ret), K(tenant_id));
|
||||
} else {
|
||||
param.tenant_id_ = tenant_id;
|
||||
param.table_id_ = table_id;
|
||||
param.batch_size_ = 100;
|
||||
param.parallel_ = arg_.parallel_;
|
||||
param.session_count_ = MIN(arg_.parallel_, (int32_t)tenant->unit_max_cpu() * 2);
|
||||
param.max_error_row_count_ = arg_.max_error_row_count_;
|
||||
param.column_count_ = client_task_->column_names_.count();
|
||||
param.need_sort_ = true;
|
||||
param.px_mode_ = false;
|
||||
param.online_opt_stat_gather_ = false;
|
||||
param.dup_action_ = arg_.dup_action_;
|
||||
if (OB_FAIL(param.normalize())) {
|
||||
LOG_WARN("fail to normalize param", KR(ret));
|
||||
}
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
if (OB_ISNULL(table_ctx_ = ObTableLoadService::alloc_ctx())) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
LOG_WARN("fail to alloc table ctx", KR(ret), K(param));
|
||||
} else if (OB_FAIL(table_ctx_->init(param, ddl_param, client_task_->get_session_info()))) {
|
||||
LOG_WARN("fail to init table ctx", KR(ret));
|
||||
} else if (OB_FAIL(ObTableLoadCoordinator::init_ctx(table_ctx_, client_task_->column_idxs_,
|
||||
client_task_->get_exec_ctx()))) {
|
||||
LOG_WARN("fail to coordinator init ctx", KR(ret));
|
||||
} else if (OB_FAIL(ObTableLoadService::add_ctx(table_ctx_))) {
|
||||
LOG_WARN("fail to add ctx", KR(ret));
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
if (ddl_param.is_valid()) {
|
||||
ObTableLoadRedefTableAbortArg abort_arg;
|
||||
abort_arg.tenant_id_ = param.tenant_id_;
|
||||
abort_arg.task_id_ = ddl_param.task_id_;
|
||||
if (OB_TMP_FAIL(
|
||||
ObTableLoadRedefTable::abort(abort_arg, *client_task_->get_session_info()))) {
|
||||
LOG_WARN("fail to abort redef table", KR(tmp_ret), K(abort_arg));
|
||||
}
|
||||
}
|
||||
if (nullptr != table_ctx_) {
|
||||
ObTableLoadService::free_ctx(table_ctx_);
|
||||
table_ctx_ = nullptr;
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTableDirectLoadBeginExecutor::do_begin()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObTableLoadCoordinator coordinator(table_ctx_);
|
||||
ObTableLoadTransId trans_id;
|
||||
if (OB_FAIL(coordinator.init())) {
|
||||
LOG_WARN("fail to init coordinator", KR(ret));
|
||||
} else if (OB_FAIL(coordinator.begin())) {
|
||||
LOG_WARN("fail to coordinator begin", KR(ret));
|
||||
} else if (OB_FAIL(coordinator.start_trans(ObTableLoadSegmentID(1), trans_id))) {
|
||||
LOG_WARN("fail to start trans", KR(ret));
|
||||
} else {
|
||||
client_task_->set_trans_id(trans_id);
|
||||
if (OB_FAIL(client_task_->set_status_running())) {
|
||||
LOG_WARN("fail to set status running", KR(ret));
|
||||
}
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
client_task_->set_status_error(ret);
|
||||
if (nullptr != client_task) {
|
||||
ObTableLoadClientService::revert_task(client_task);
|
||||
client_task = nullptr;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
@ -330,10 +153,8 @@ int ObTableDirectLoadCommitExecutor::process()
|
||||
ObTableLoadUniqueKey key(arg_.table_id_, arg_.task_id_);
|
||||
if (OB_FAIL(ObTableLoadClientService::get_task(key, client_task))) {
|
||||
LOG_WARN("fail to get client task", KR(ret), K(key));
|
||||
} else if (OB_FAIL(client_task->check_status(ObTableLoadClientStatus::RUNNING))) {
|
||||
LOG_WARN("fail to check status", KR(ret));
|
||||
} else if (OB_FAIL(ObTableLoadClientService::commit_task(client_task))) {
|
||||
LOG_WARN("fail to commit client task", KR(ret));
|
||||
} else if (OB_FAIL(client_task->commit())) {
|
||||
LOG_WARN("fail to commit client task", KR(ret), K(key));
|
||||
}
|
||||
if (nullptr != client_task) {
|
||||
ObTableLoadClientService::revert_task(client_task);
|
||||
@ -362,16 +183,13 @@ int ObTableDirectLoadAbortExecutor::process()
|
||||
ObTableLoadUniqueKey key(arg_.table_id_, arg_.task_id_);
|
||||
if (OB_FAIL(ObTableLoadClientService::get_task(key, client_task))) {
|
||||
LOG_WARN("fail to get client task", KR(ret), K(key));
|
||||
} else if (OB_FAIL(ObTableLoadClientService::abort_task(client_task))) {
|
||||
LOG_WARN("fail to abort client task", KR(ret));
|
||||
} else {
|
||||
client_task->abort();
|
||||
}
|
||||
if (nullptr != client_task) {
|
||||
ObTableLoadClientService::revert_task(client_task);
|
||||
client_task = nullptr;
|
||||
}
|
||||
if (OB_SUCC(ret) && OB_FAIL(ObTableLoadClientService::wait_task_finish(key))) {
|
||||
LOG_WARN("fail to wait client task finish", KR(ret), K(key));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -438,35 +256,18 @@ int ObTableDirectLoadInsertExecutor::process()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
LOG_DEBUG("table direct load insert", K_(arg));
|
||||
ObTableLoadObjRowArray obj_rows;
|
||||
ObTableLoadUniqueKey key(arg_.table_id_, arg_.task_id_);
|
||||
ObTableLoadClientTask *client_task = nullptr;
|
||||
if (OB_FAIL(decode_payload(arg_.payload_, obj_rows))) {
|
||||
LOG_WARN("fail to decode payload", KR(ret), K_(arg));
|
||||
} else if (OB_FAIL(ObTableLoadClientService::get_task(key, client_task))) {
|
||||
if (OB_FAIL(ObTableLoadClientService::get_task(key, client_task))) {
|
||||
LOG_WARN("fail to get client task", KR(ret), K(key));
|
||||
} else if (OB_FAIL(client_task->check_status(ObTableLoadClientStatus::RUNNING))) {
|
||||
LOG_WARN("fail to check status", KR(ret));
|
||||
} else {
|
||||
ObTableLoadTableCtx *table_ctx = nullptr;
|
||||
if (OB_FAIL(client_task->get_table_ctx(table_ctx))) {
|
||||
LOG_WARN("fail to get table ctx", KR(ret));
|
||||
} else {
|
||||
ObTableLoadCoordinator coordinator(table_ctx);
|
||||
const ObTableLoadTransId &trans_id = client_task->get_trans_id();
|
||||
const int64_t batch_id = client_task->get_next_batch_id();
|
||||
const int32_t session_id = batch_id % table_ctx->param_.session_count_ + 1;
|
||||
if (OB_FAIL(set_batch_seq_no(batch_id, obj_rows))) {
|
||||
LOG_WARN("fail to set batch seq no", KR(ret));
|
||||
} else if (OB_FAIL(coordinator.init())) {
|
||||
LOG_WARN("fail to init coordinator", KR(ret));
|
||||
} else if (OB_FAIL(coordinator.write(trans_id, session_id, 0 /*seq_no*/, obj_rows))) {
|
||||
LOG_WARN("fail to coordinator write", KR(ret));
|
||||
}
|
||||
}
|
||||
if (nullptr != table_ctx) {
|
||||
ObTableLoadService::put_ctx(table_ctx);
|
||||
table_ctx = nullptr;
|
||||
ObTableLoadObjRowArray obj_rows;
|
||||
if (OB_FAIL(decode_payload(arg_.payload_, obj_rows))) {
|
||||
LOG_WARN("fail to decode payload", KR(ret), K_(arg));
|
||||
} else if (OB_FAIL(client_task->write(obj_rows))) {
|
||||
LOG_WARN("fail to write", KR(ret));
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
client_task->set_status_error(ret);
|
||||
@ -507,28 +308,6 @@ int ObTableDirectLoadInsertExecutor::decode_payload(const ObString &payload,
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTableDirectLoadInsertExecutor::set_batch_seq_no(int64_t batch_id,
|
||||
ObTableLoadObjRowArray &obj_row_array)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_UNLIKELY(obj_row_array.empty())) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid args", KR(ret), K(obj_row_array));
|
||||
} else if (OB_UNLIKELY(batch_id > ObTableLoadSequenceNo::MAX_BATCH_ID ||
|
||||
obj_row_array.count() > ObTableLoadSequenceNo::MAX_BATCH_SEQ_NO)) {
|
||||
ret = OB_SIZE_OVERFLOW;
|
||||
LOG_WARN("size is overflow", KR(ret), K(batch_id), K(obj_row_array.count()));
|
||||
} else {
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < obj_row_array.count(); ++i) {
|
||||
ObTableLoadObjRow &row = obj_row_array.at(i);
|
||||
row.seq_no_.sequence_no_ = batch_id;
|
||||
row.seq_no_.sequence_no_ <<= ObTableLoadSequenceNo::BATCH_ID_SHIFT;
|
||||
row.seq_no_.sequence_no_ |= i;
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
// heart_beat
|
||||
int ObTableDirectLoadHeartBeatExecutor::check_args()
|
||||
{
|
||||
|
||||
@ -60,21 +60,16 @@ class ObTableDirectLoadBeginExecutor
|
||||
public:
|
||||
ObTableDirectLoadBeginExecutor(ObTableDirectLoadExecContext &ctx,
|
||||
const table::ObTableDirectLoadRequest &request,
|
||||
table::ObTableDirectLoadResult &result);
|
||||
virtual ~ObTableDirectLoadBeginExecutor();
|
||||
table::ObTableDirectLoadResult &result)
|
||||
: ParentType(ctx, request, result)
|
||||
{
|
||||
}
|
||||
virtual ~ObTableDirectLoadBeginExecutor() = default;
|
||||
|
||||
protected:
|
||||
int check_args() override;
|
||||
int set_result_header() override;
|
||||
int process() override;
|
||||
|
||||
private:
|
||||
int create_table_ctx();
|
||||
int do_begin();
|
||||
|
||||
private:
|
||||
ObTableLoadClientTask *client_task_;
|
||||
ObTableLoadTableCtx *table_ctx_;
|
||||
};
|
||||
|
||||
// commit
|
||||
@ -160,7 +155,6 @@ protected:
|
||||
private:
|
||||
static int decode_payload(const common::ObString &payload,
|
||||
table::ObTableLoadObjRowArray &obj_row_array);
|
||||
int set_batch_seq_no(int64_t batch_id, table::ObTableLoadObjRowArray &obj_row_array);
|
||||
};
|
||||
|
||||
// heart_beat
|
||||
|
||||
@ -27,7 +27,8 @@ OB_SERIALIZE_MEMBER_SIMPLE(ObTableDirectLoadBeginArg,
|
||||
dup_action_,
|
||||
timeout_,
|
||||
heartbeat_timeout_,
|
||||
force_create_);
|
||||
force_create_,
|
||||
is_async_);
|
||||
|
||||
OB_SERIALIZE_MEMBER_SIMPLE(ObTableDirectLoadBeginRes,
|
||||
table_id_,
|
||||
|
||||
@ -32,11 +32,12 @@ public:
|
||||
dup_action_(sql::ObLoadDupActionType::LOAD_INVALID_MODE),
|
||||
timeout_(0),
|
||||
heartbeat_timeout_(0),
|
||||
force_create_(false)
|
||||
force_create_(false),
|
||||
is_async_(false)
|
||||
{
|
||||
}
|
||||
TO_STRING_KV(K_(table_name), K_(parallel), K_(max_error_row_count), K_(dup_action), K_(timeout),
|
||||
K_(heartbeat_timeout), K_(force_create));
|
||||
K_(heartbeat_timeout), K_(force_create), K_(is_async));
|
||||
public:
|
||||
ObString table_name_;
|
||||
int64_t parallel_;
|
||||
@ -44,7 +45,8 @@ public:
|
||||
sql::ObLoadDupActionType dup_action_;
|
||||
int64_t timeout_;
|
||||
int64_t heartbeat_timeout_;
|
||||
bool force_create_;
|
||||
bool force_create_; // unused
|
||||
bool is_async_;
|
||||
};
|
||||
|
||||
struct ObTableDirectLoadBeginRes
|
||||
@ -62,7 +64,7 @@ public:
|
||||
public:
|
||||
uint64_t table_id_;
|
||||
int64_t task_id_;
|
||||
common::ObSArray<ObString> column_names_;
|
||||
common::ObSArray<ObString> column_names_; // unused
|
||||
table::ObTableLoadClientStatus status_;
|
||||
int32_t error_code_;
|
||||
};
|
||||
|
||||
@ -26,305 +26,6 @@ namespace observer
|
||||
using namespace common;
|
||||
using namespace table;
|
||||
|
||||
/**
|
||||
* CommitTaskProcessor
|
||||
*/
|
||||
|
||||
class ObTableLoadClientService::CommitTaskProcessor : public ObITableLoadTaskProcessor
|
||||
{
|
||||
public:
|
||||
CommitTaskProcessor(ObTableLoadTask &task, ObTableLoadClientTask *client_task)
|
||||
: ObITableLoadTaskProcessor(task), client_task_(client_task), table_ctx_(nullptr)
|
||||
{
|
||||
client_task_->inc_ref_count();
|
||||
}
|
||||
virtual ~CommitTaskProcessor()
|
||||
{
|
||||
ObTableLoadClientService::revert_task(client_task_);
|
||||
if (nullptr != table_ctx_) {
|
||||
ObTableLoadService::put_ctx(table_ctx_);
|
||||
}
|
||||
}
|
||||
int process() override
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_FAIL(init())) {
|
||||
LOG_WARN("fail to init", KR(ret));
|
||||
}
|
||||
// 1. finish all trans
|
||||
else if (OB_FAIL(finish_all_trans())) {
|
||||
LOG_WARN("fail to finish all trans", KR(ret));
|
||||
}
|
||||
// 2. check all trans commit
|
||||
else if (OB_FAIL(check_all_trans_commit())) {
|
||||
LOG_WARN("fail to check all trans commit", KR(ret));
|
||||
}
|
||||
// 3. finish
|
||||
else if (OB_FAIL(finish())) {
|
||||
LOG_WARN("fail to finish table load", KR(ret));
|
||||
}
|
||||
// 4. check merged
|
||||
else if (OB_FAIL(check_merged())) {
|
||||
LOG_WARN("fail to check merged", KR(ret));
|
||||
}
|
||||
// 5. commit
|
||||
else if (OB_FAIL(commit())) {
|
||||
LOG_WARN("fail to commit table load", KR(ret));
|
||||
}
|
||||
// end
|
||||
else if (OB_FAIL(client_task_->set_status_commit())) {
|
||||
LOG_WARN("fail to set status commit", KR(ret));
|
||||
}
|
||||
// auto abort
|
||||
if (OB_FAIL(ret)) {
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
if (OB_FAIL(OB_TMP_FAIL(ObTableLoadClientService::abort_task(client_task_)))) {
|
||||
LOG_WARN("fail to abort client task", KR(tmp_ret));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
private:
|
||||
int init()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_FAIL(client_task_->check_status(ObTableLoadClientStatus::COMMITTING))) {
|
||||
LOG_WARN("fail to check status", KR(ret));
|
||||
} else {
|
||||
if (OB_FAIL(client_task_->get_table_ctx(table_ctx_))) {
|
||||
LOG_WARN("fail to get table ctx", KR(ret));
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
client_task_->set_status_error(ret);
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
int finish_all_trans()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_FAIL(client_task_->check_status(ObTableLoadClientStatus::COMMITTING))) {
|
||||
LOG_WARN("fail to check status", KR(ret));
|
||||
} else {
|
||||
ObTableLoadCoordinator coordinator(table_ctx_);
|
||||
const ObTableLoadTransId &trans_id = client_task_->get_trans_id();
|
||||
if (OB_FAIL(coordinator.init())) {
|
||||
LOG_WARN("fail to init coordinator", KR(ret));
|
||||
} else if (OB_FAIL(coordinator.finish_trans(trans_id))) {
|
||||
LOG_WARN("fail to coordinator finish trans", KR(ret), K(trans_id));
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
client_task_->set_status_error(ret);
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
int check_all_trans_commit()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
const ObTableLoadTransId &trans_id = client_task_->get_trans_id();
|
||||
while (OB_SUCC(ret)) {
|
||||
if (OB_FAIL(client_task_->check_status(ObTableLoadClientStatus::COMMITTING))) {
|
||||
LOG_WARN("fail to check status", KR(ret));
|
||||
} else {
|
||||
if (OB_FAIL(client_task_->get_exec_ctx()->check_status())) {
|
||||
LOG_WARN("fail to check exec status", KR(ret));
|
||||
} else {
|
||||
ObTableLoadCoordinator coordinator(table_ctx_);
|
||||
ObTableLoadTransStatusType trans_status = ObTableLoadTransStatusType::NONE;
|
||||
int error_code = OB_SUCCESS;
|
||||
bool try_again = false;
|
||||
if (OB_FAIL(coordinator.init())) {
|
||||
LOG_WARN("fail to init coordinator", KR(ret));
|
||||
} else if (OB_FAIL(coordinator.get_trans_status(trans_id, trans_status, error_code))) {
|
||||
LOG_WARN("fail to coordinator get status", KR(ret), K(trans_id));
|
||||
} else {
|
||||
switch (trans_status) {
|
||||
case ObTableLoadTransStatusType::FROZEN:
|
||||
try_again = true;
|
||||
break;
|
||||
case ObTableLoadTransStatusType::COMMIT:
|
||||
break;
|
||||
case ObTableLoadTransStatusType::ERROR:
|
||||
ret = error_code;
|
||||
LOG_WARN("trans has error", KR(ret));
|
||||
break;
|
||||
default:
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unexpected trans status", KR(ret), K(trans_status));
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
if (!try_again) {
|
||||
break;
|
||||
} else {
|
||||
ob_usleep(1000 * 1000);
|
||||
}
|
||||
}
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
client_task_->set_status_error(ret);
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
int finish()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_FAIL(client_task_->check_status(ObTableLoadClientStatus::COMMITTING))) {
|
||||
LOG_WARN("fail to check status", KR(ret));
|
||||
} else {
|
||||
ObTableLoadCoordinator coordinator(table_ctx_);
|
||||
if (OB_FAIL(coordinator.init())) {
|
||||
LOG_WARN("fail to init coordinator", KR(ret));
|
||||
} else if (OB_FAIL(coordinator.finish())) {
|
||||
LOG_WARN("fail to coordinator finish", KR(ret));
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
client_task_->set_status_error(ret);
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
int check_merged()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
while (OB_SUCC(ret)) {
|
||||
if (OB_FAIL(client_task_->check_status(ObTableLoadClientStatus::COMMITTING))) {
|
||||
LOG_WARN("fail to check status", KR(ret));
|
||||
} else {
|
||||
if (OB_FAIL(client_task_->get_exec_ctx()->check_status())) {
|
||||
LOG_WARN("fail to check exec status", KR(ret));
|
||||
} else {
|
||||
bool is_merged = false;
|
||||
ObTableLoadStatusType status = ObTableLoadStatusType::NONE;
|
||||
int error_code = OB_SUCCESS;
|
||||
ObTableLoadCoordinator coordinator(table_ctx_);
|
||||
if (OB_FAIL(coordinator.init())) {
|
||||
LOG_WARN("fail to init coordinator", KR(ret));
|
||||
} else if (OB_FAIL(coordinator.get_status(status, error_code))) {
|
||||
LOG_WARN("fail to coordinator get status", KR(ret));
|
||||
} else {
|
||||
switch (status) {
|
||||
case ObTableLoadStatusType::FROZEN:
|
||||
case ObTableLoadStatusType::MERGING:
|
||||
is_merged = false;
|
||||
break;
|
||||
case ObTableLoadStatusType::MERGED:
|
||||
is_merged = true;
|
||||
break;
|
||||
case ObTableLoadStatusType::ERROR:
|
||||
ret = error_code;
|
||||
LOG_WARN("table load has error", KR(ret));
|
||||
break;
|
||||
default:
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unexpected status", KR(ret), K(status));
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
if (is_merged) {
|
||||
break;
|
||||
} else {
|
||||
ob_usleep(1000 * 1000);
|
||||
}
|
||||
}
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
client_task_->set_status_error(ret);
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
int commit()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_FAIL(client_task_->check_status(ObTableLoadClientStatus::COMMITTING))) {
|
||||
LOG_WARN("fail to check status", KR(ret));
|
||||
} else {
|
||||
ObTableLoadCoordinator coordinator(table_ctx_);
|
||||
if (OB_FAIL(coordinator.init())) {
|
||||
LOG_WARN("fail to init coordinator", KR(ret));
|
||||
} else if (OB_FAIL(coordinator.commit(client_task_->result_info_))) {
|
||||
LOG_WARN("fail to coordinator commit", KR(ret));
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
client_task_->set_status_error(ret);
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
private:
|
||||
ObTableLoadClientTask *client_task_;
|
||||
ObTableLoadTableCtx *table_ctx_;
|
||||
};
|
||||
|
||||
/**
|
||||
* AbortTaskProcessor
|
||||
*/
|
||||
|
||||
class ObTableLoadClientService::AbortTaskProcessor : public ObITableLoadTaskProcessor
|
||||
{
|
||||
public:
|
||||
AbortTaskProcessor(ObTableLoadTask &task, ObTableLoadClientTask *client_task)
|
||||
: ObITableLoadTaskProcessor(task), client_task_(client_task)
|
||||
{
|
||||
client_task_->inc_ref_count();
|
||||
}
|
||||
virtual ~AbortTaskProcessor()
|
||||
{
|
||||
ObTableLoadClientService::revert_task(client_task_);
|
||||
}
|
||||
int process() override
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_FAIL(client_task_->check_status(ObTableLoadClientStatus::ABORT))) {
|
||||
LOG_WARN("fail to check status", KR(ret));
|
||||
} else {
|
||||
ObTableLoadTableCtx *table_ctx = nullptr;
|
||||
if (OB_FAIL(client_task_->get_table_ctx(table_ctx))) {
|
||||
LOG_WARN("fail to get table ctx", KR(ret));
|
||||
} else {
|
||||
ObTableLoadCoordinator::abort_ctx(table_ctx);
|
||||
}
|
||||
if (nullptr != table_ctx) {
|
||||
ObTableLoadService::put_ctx(table_ctx);
|
||||
table_ctx = nullptr;
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
private:
|
||||
ObTableLoadClientTask *client_task_;
|
||||
};
|
||||
|
||||
/**
|
||||
* CommonTaskCallback
|
||||
*/
|
||||
|
||||
class ObTableLoadClientService::CommonTaskCallback : public ObITableLoadTaskCallback
|
||||
{
|
||||
public:
|
||||
CommonTaskCallback(ObTableLoadClientTask *client_task) : client_task_(client_task)
|
||||
{
|
||||
client_task_->inc_ref_count();
|
||||
}
|
||||
virtual ~CommonTaskCallback()
|
||||
{
|
||||
ObTableLoadClientService::revert_task(client_task_);
|
||||
}
|
||||
void callback(int ret_code, ObTableLoadTask *task) override
|
||||
{
|
||||
client_task_->free_task(task);
|
||||
}
|
||||
private:
|
||||
ObTableLoadClientTask *client_task_;
|
||||
};
|
||||
|
||||
/**
|
||||
* ClientTaskBriefEraseIfExpired
|
||||
*/
|
||||
@ -339,7 +40,7 @@ bool ObTableLoadClientService::ClientTaskBriefEraseIfExpired::operator()(
|
||||
* ObTableLoadClientService
|
||||
*/
|
||||
|
||||
ObTableLoadClientService::ObTableLoadClientService() : is_inited_(false) {}
|
||||
ObTableLoadClientService::ObTableLoadClientService() : next_task_id_(1), is_inited_(false) {}
|
||||
|
||||
ObTableLoadClientService::~ObTableLoadClientService() {}
|
||||
|
||||
@ -380,14 +81,32 @@ ObTableLoadClientService *ObTableLoadClientService::get_client_service()
|
||||
return client_service;
|
||||
}
|
||||
|
||||
ObTableLoadClientTask *ObTableLoadClientService::alloc_task()
|
||||
int ObTableLoadClientService::alloc_task(ObTableLoadClientTask *&client_task)
|
||||
{
|
||||
ObTableLoadClientTask *client_task =
|
||||
OB_NEW(ObTableLoadClientTask, ObMemAttr(MTL_ID(), "TLD_ClientTask"));
|
||||
if (nullptr != client_task) {
|
||||
int ret = OB_SUCCESS;
|
||||
ObTableLoadService *service = nullptr;
|
||||
if (OB_ISNULL(service = MTL(ObTableLoadService *))) {
|
||||
ret = OB_ERR_SYS;
|
||||
LOG_WARN("null table load service", KR(ret));
|
||||
} else {
|
||||
ObTableLoadClientTask *new_client_task = nullptr;
|
||||
if (OB_ISNULL(new_client_task =
|
||||
OB_NEW(ObTableLoadClientTask, ObMemAttr(MTL_ID(), "TLD_ClientTask")))) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
LOG_WARN("fail to new ObTableLoadClientTask", KR(ret));
|
||||
} else {
|
||||
new_client_task->task_id_ = service->get_client_service().generate_task_id();
|
||||
client_task = new_client_task;
|
||||
client_task->inc_ref_count();
|
||||
}
|
||||
return client_task;
|
||||
if (OB_FAIL(ret)) {
|
||||
if (nullptr != new_client_task) {
|
||||
free_task(new_client_task);
|
||||
new_client_task = nullptr;
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
void ObTableLoadClientService::free_task(ObTableLoadClientTask *client_task)
|
||||
@ -409,12 +128,9 @@ void ObTableLoadClientService::revert_task(ObTableLoadClientTask *client_task)
|
||||
const int64_t ref_count = client_task->dec_ref_count();
|
||||
OB_ASSERT(ref_count >= 0);
|
||||
if (0 == ref_count) {
|
||||
const uint64_t tenant_id = client_task->tenant_id_;
|
||||
const uint64_t table_id = client_task->table_id_;
|
||||
const uint64_t hidden_table_id = client_task->ddl_param_.dest_table_id_;
|
||||
const int64_t task_id = client_task->ddl_param_.task_id_;
|
||||
LOG_INFO("free client task", K(tenant_id), K(table_id), K(hidden_table_id), K(task_id),
|
||||
KP(client_task));
|
||||
const int64_t task_id = client_task->task_id_;
|
||||
const uint64_t table_id = client_task->param_.get_table_id();
|
||||
LOG_INFO("free client task", K(task_id), K(table_id), KP(client_task));
|
||||
free_task(client_task);
|
||||
client_task = nullptr;
|
||||
}
|
||||
@ -429,7 +145,7 @@ int ObTableLoadClientService::add_task(ObTableLoadClientTask *client_task)
|
||||
ret = OB_ERR_SYS;
|
||||
LOG_WARN("null table load service", KR(ret));
|
||||
} else {
|
||||
ObTableLoadUniqueKey key(client_task->table_id_, client_task->ddl_param_.task_id_);
|
||||
ObTableLoadUniqueKey key(client_task->param_.get_table_id(), client_task->task_id_);
|
||||
ret = service->get_client_service().add_client_task(key, client_task);
|
||||
}
|
||||
return ret;
|
||||
@ -443,7 +159,7 @@ int ObTableLoadClientService::remove_task(ObTableLoadClientTask *client_task)
|
||||
ret = OB_ERR_SYS;
|
||||
LOG_WARN("null table load service", KR(ret));
|
||||
} else {
|
||||
ObTableLoadUniqueKey key(client_task->table_id_, client_task->ddl_param_.task_id_);
|
||||
ObTableLoadUniqueKey key(client_task->param_.get_table_id(), client_task->task_id_);
|
||||
ret = service->get_client_service().remove_client_task(key, client_task);
|
||||
}
|
||||
return ret;
|
||||
@ -482,86 +198,6 @@ int ObTableLoadClientService::get_task(const ObTableLoadKey &key,
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTableLoadClientService::exist_task(const ObTableLoadUniqueKey &key, bool &is_exist)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObTableLoadService *service = nullptr;
|
||||
if (OB_ISNULL(service = MTL(ObTableLoadService *))) {
|
||||
ret = OB_ERR_SYS;
|
||||
LOG_WARN("null table load service", KR(ret));
|
||||
} else {
|
||||
if (OB_FAIL(service->get_client_service().exist_client_task(key, is_exist))) {
|
||||
LOG_WARN("fail to check exist client task", KR(ret), K(key));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTableLoadClientService::commit_task(ObTableLoadClientTask *client_task)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_UNLIKELY(nullptr == client_task)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid args", KR(ret), KPC(client_task));
|
||||
} else if (OB_FAIL(client_task->set_status_committing())) {
|
||||
LOG_WARN("fail to set status committing", KR(ret));
|
||||
} else {
|
||||
LOG_INFO("client task commit");
|
||||
if (OB_FAIL(construct_commit_task(client_task))) {
|
||||
LOG_WARN("fail to construct commit task", KR(ret));
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
client_task->set_status_error(ret);
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTableLoadClientService::abort_task(ObTableLoadClientTask *client_task)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_UNLIKELY(nullptr == client_task)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid args", KR(ret), KPC(client_task));
|
||||
} else if (ObTableLoadClientStatus::ABORT == client_task->get_status()) {
|
||||
// already abort
|
||||
} else {
|
||||
LOG_INFO("client task abort");
|
||||
client_task->set_status_abort();
|
||||
if (OB_FAIL(construct_abort_task(client_task))) {
|
||||
LOG_WARN("fail to construct abort task", KR(ret));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTableLoadClientService::wait_task_finish(const ObTableLoadUniqueKey &key)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_UNLIKELY(!key.is_valid())) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid args", KR(ret), K(key));
|
||||
} else {
|
||||
bool is_exist = true;
|
||||
ObTimeoutCtx ctx;
|
||||
if (OB_FAIL(ObShareUtil::set_default_timeout_ctx(ctx, 10LL * 1000 * 1000))) {
|
||||
LOG_WARN("fail to set default timeout ctx", KR(ret));
|
||||
}
|
||||
while (OB_SUCC(ret) && is_exist) {
|
||||
if (ctx.is_timeouted()) {
|
||||
ret = OB_TIMEOUT;
|
||||
LOG_WARN("timeouted", KR(ret), K(ctx));
|
||||
} else if (OB_FAIL(exist_task(key, is_exist))) {
|
||||
LOG_WARN("fail to check exist client task", KR(ret), K(key));
|
||||
} else if (is_exist) {
|
||||
// wait
|
||||
ob_usleep(100LL * 1000);
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTableLoadClientService::add_client_task(const ObTableLoadUniqueKey &key,
|
||||
ObTableLoadClientTask *client_task)
|
||||
{
|
||||
@ -639,10 +275,10 @@ int ObTableLoadClientService::remove_client_task(const ObTableLoadUniqueKey &key
|
||||
if (OB_FAIL(client_task_brief_map_.create(key, client_task_brief))) {
|
||||
LOG_WARN("fail to create client task brief", KR(ret), K(key));
|
||||
} else {
|
||||
client_task_brief->table_id_ = client_task->table_id_;
|
||||
client_task_brief->dest_table_id_ = client_task->ddl_param_.dest_table_id_;
|
||||
client_task_brief->task_id_ = client_task->ddl_param_.task_id_;
|
||||
client_task_brief->task_id_ = client_task->task_id_;
|
||||
client_task_brief->table_id_ = client_task->param_.get_table_id();
|
||||
client_task->get_status(client_task_brief->client_status_, client_task_brief->error_code_);
|
||||
client_task_brief->result_info_ = client_task->result_info_;
|
||||
client_task_brief->active_time_ = ObTimeUtil::current_time();
|
||||
}
|
||||
if (nullptr != client_task_brief) {
|
||||
@ -731,30 +367,6 @@ int ObTableLoadClientService::get_client_task_by_table_id(uint64_t table_id,
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTableLoadClientService::exist_client_task(const ObTableLoadUniqueKey &key, bool &is_exist)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
is_exist = false;
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("ObTableLoadClientService not init", KR(ret), KP(this));
|
||||
} else {
|
||||
obsys::ObRLockGuard guard(rwlock_);
|
||||
ObTableLoadClientTask *client_task = nullptr;
|
||||
if (OB_FAIL(client_task_map_.get_refactored(key, client_task))) {
|
||||
if (OB_UNLIKELY(OB_HASH_NOT_EXIST != ret)) {
|
||||
LOG_WARN("fail to get refactored", KR(ret), K(key));
|
||||
} else {
|
||||
ret = OB_SUCCESS;
|
||||
is_exist = false;
|
||||
}
|
||||
} else {
|
||||
is_exist = true;
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int64_t ObTableLoadClientService::get_client_task_count() const
|
||||
{
|
||||
obsys::ObRLockGuard guard(rwlock_);
|
||||
@ -873,49 +485,5 @@ void ObTableLoadClientService::purge_client_task_brief()
|
||||
}
|
||||
}
|
||||
|
||||
int ObTableLoadClientService::construct_commit_task(ObTableLoadClientTask *client_task)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObTableLoadTask *task = nullptr;
|
||||
if (OB_FAIL(client_task->alloc_task(task))) {
|
||||
LOG_WARN("fail to alloc task", KR(ret));
|
||||
} else if (OB_FAIL(task->set_processor<CommitTaskProcessor>(client_task))) {
|
||||
LOG_WARN("fail to set commit task processor", KR(ret));
|
||||
} else if (OB_FAIL(task->set_callback<CommonTaskCallback>(client_task))) {
|
||||
LOG_WARN("fail to set common task callback", KR(ret));
|
||||
} else if (OB_FAIL(client_task->add_task(task))) {
|
||||
LOG_WARN("fail to add task", KR(ret));
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
if (nullptr != task) {
|
||||
client_task->free_task(task);
|
||||
task = nullptr;
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTableLoadClientService::construct_abort_task(ObTableLoadClientTask *client_task)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObTableLoadTask *task = nullptr;
|
||||
if (OB_FAIL(client_task->alloc_task(task))) {
|
||||
LOG_WARN("fail to alloc task", KR(ret));
|
||||
} else if (OB_FAIL(task->set_processor<AbortTaskProcessor>(client_task))) {
|
||||
LOG_WARN("fail to set abort task processor", KR(ret));
|
||||
} else if (OB_FAIL(task->set_callback<CommonTaskCallback>(client_task))) {
|
||||
LOG_WARN("fail to set common task callback", KR(ret));
|
||||
} else if (OB_FAIL(client_task->add_task(task))) {
|
||||
LOG_WARN("fail to add task", KR(ret));
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
if (nullptr != task) {
|
||||
client_task->free_task(task);
|
||||
task = nullptr;
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
} // namespace observer
|
||||
} // namespace oceanbase
|
||||
|
||||
@ -39,24 +39,19 @@ public:
|
||||
static ObTableLoadClientService *get_client_service();
|
||||
|
||||
// client task api
|
||||
static ObTableLoadClientTask *alloc_task();
|
||||
static int alloc_task(ObTableLoadClientTask *&client_task);
|
||||
static void free_task(ObTableLoadClientTask *client_task);
|
||||
static void revert_task(ObTableLoadClientTask *client_task);
|
||||
static int add_task(ObTableLoadClientTask *client_task);
|
||||
static int remove_task(ObTableLoadClientTask *client_task);
|
||||
static int get_task(const ObTableLoadUniqueKey &key, ObTableLoadClientTask *&client_task);
|
||||
static int get_task(const ObTableLoadKey &key, ObTableLoadClientTask *&client_task);
|
||||
static int exist_task(const ObTableLoadUniqueKey &key, bool &is_exist);
|
||||
static int commit_task(ObTableLoadClientTask *client_task);
|
||||
static int abort_task(ObTableLoadClientTask *client_task);
|
||||
static int wait_task_finish(const ObTableLoadUniqueKey &key);
|
||||
|
||||
int add_client_task(const ObTableLoadUniqueKey &key, ObTableLoadClientTask *client_task);
|
||||
int remove_client_task(const ObTableLoadUniqueKey &key, ObTableLoadClientTask *client_task);
|
||||
int get_all_client_task(common::ObIArray<ObTableLoadClientTask *> &client_task_array);
|
||||
int get_client_task(const ObTableLoadUniqueKey &key, ObTableLoadClientTask *&client_task);
|
||||
int get_client_task_by_table_id(uint64_t table_id, ObTableLoadClientTask *&client_task);
|
||||
int exist_client_task(const ObTableLoadUniqueKey &key, bool &is_exist);
|
||||
int64_t get_client_task_count() const;
|
||||
void purge_client_task();
|
||||
|
||||
@ -78,12 +73,7 @@ public:
|
||||
}
|
||||
|
||||
private:
|
||||
static int construct_commit_task(ObTableLoadClientTask *client_task);
|
||||
static int construct_abort_task(ObTableLoadClientTask *client_task);
|
||||
private:
|
||||
class CommitTaskProcessor;
|
||||
class AbortTaskProcessor;
|
||||
class CommonTaskCallback;
|
||||
OB_INLINE int64_t generate_task_id() { return ATOMIC_FAA(&next_task_id_, 1); }
|
||||
|
||||
private:
|
||||
static const int64_t CLIENT_TASK_RETENTION_PERIOD = 24LL * 60 * 60 * 1000 * 1000; // 1day
|
||||
@ -112,6 +102,7 @@ private:
|
||||
{
|
||||
return client_task_ == entry.second;
|
||||
}
|
||||
|
||||
private:
|
||||
ObTableLoadClientTask *client_task_;
|
||||
};
|
||||
@ -122,6 +113,7 @@ private:
|
||||
ClientTaskBriefEraseIfExpired(int64_t expired_ts) : expired_ts_(expired_ts) {}
|
||||
bool operator()(const ObTableLoadUniqueKey &key,
|
||||
ObTableLoadClientTaskBrief *client_task_brief) const;
|
||||
|
||||
private:
|
||||
int64_t expired_ts_;
|
||||
};
|
||||
@ -131,6 +123,7 @@ private:
|
||||
ClientTaskMap client_task_map_;
|
||||
ClientTaskIndexMap client_task_index_map_;
|
||||
ClientTaskBriefMap client_task_brief_map_; // thread safety
|
||||
int64_t next_task_id_;
|
||||
bool is_inited_;
|
||||
};
|
||||
|
||||
|
||||
@ -13,40 +13,194 @@
|
||||
#define USING_LOG_PREFIX SERVER
|
||||
|
||||
#include "observer/table_load/ob_table_load_client_task.h"
|
||||
#include "observer/ob_server.h"
|
||||
#include "observer/omt/ob_tenant.h"
|
||||
#include "observer/table_load/ob_table_load_exec_ctx.h"
|
||||
#include "observer/table_load/ob_table_load_schema.h"
|
||||
#include "observer/table_load/ob_table_load_service.h"
|
||||
#include "observer/table_load/ob_table_load_table_ctx.h"
|
||||
#include "observer/table_load/ob_table_load_task.h"
|
||||
#include "observer/table_load/ob_table_load_task_scheduler.h"
|
||||
#include "observer/table_load/ob_table_load_utils.h"
|
||||
#include "observer/table_load/ob_table_load_task.h"
|
||||
#include "observer/ob_server.h"
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
namespace observer
|
||||
{
|
||||
using namespace common;
|
||||
using namespace sql;
|
||||
using namespace table;
|
||||
|
||||
ObTableLoadClientTask::ObTableLoadClientTask()
|
||||
: tenant_id_(OB_INVALID_ID),
|
||||
/**
|
||||
* ObTableLoadClientTaskParam
|
||||
*/
|
||||
|
||||
ObTableLoadClientTaskParam::ObTableLoadClientTaskParam()
|
||||
: tenant_id_(OB_INVALID_TENANT_ID),
|
||||
user_id_(OB_INVALID_ID),
|
||||
database_id_(OB_INVALID_ID),
|
||||
table_id_(OB_INVALID_ID),
|
||||
parallel_(0),
|
||||
max_error_row_count_(0),
|
||||
dup_action_(ObLoadDupActionType::LOAD_INVALID_MODE),
|
||||
timeout_us_(0),
|
||||
heartbeat_timeout_us_(0)
|
||||
{
|
||||
}
|
||||
|
||||
ObTableLoadClientTaskParam::~ObTableLoadClientTaskParam() {}
|
||||
|
||||
void ObTableLoadClientTaskParam::reset()
|
||||
{
|
||||
client_addr_.reset();
|
||||
tenant_id_ = OB_INVALID_TENANT_ID;
|
||||
user_id_ = OB_INVALID_ID;
|
||||
database_id_ = OB_INVALID_ID;
|
||||
table_id_ = OB_INVALID_ID;
|
||||
parallel_ = 0;
|
||||
max_error_row_count_ = 0;
|
||||
dup_action_ = ObLoadDupActionType::LOAD_INVALID_MODE;
|
||||
timeout_us_ = 0;
|
||||
heartbeat_timeout_us_ = 0;
|
||||
}
|
||||
|
||||
int ObTableLoadClientTaskParam::assign(const ObTableLoadClientTaskParam &other)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (this != &other) {
|
||||
reset();
|
||||
client_addr_ = other.client_addr_;
|
||||
tenant_id_ = other.tenant_id_;
|
||||
user_id_ = other.user_id_;
|
||||
database_id_ = other.database_id_;
|
||||
table_id_ = other.table_id_;
|
||||
parallel_ = other.parallel_;
|
||||
max_error_row_count_ = other.max_error_row_count_;
|
||||
dup_action_ = other.dup_action_;
|
||||
timeout_us_ = other.timeout_us_;
|
||||
heartbeat_timeout_us_ = other.heartbeat_timeout_us_;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
bool ObTableLoadClientTaskParam::is_valid() const
|
||||
{
|
||||
return client_addr_.is_valid() && OB_INVALID_TENANT_ID != tenant_id_ &&
|
||||
OB_INVALID_ID != user_id_ && OB_INVALID_ID != database_id_ && OB_INVALID_ID != table_id_ &&
|
||||
parallel_ > 0 && ObLoadDupActionType::LOAD_INVALID_MODE != dup_action_ &&
|
||||
timeout_us_ > 0 && heartbeat_timeout_us_ > 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* ClientTaskExecuteProcessor
|
||||
*/
|
||||
|
||||
class ObTableLoadClientTask::ClientTaskExectueProcessor : public ObITableLoadTaskProcessor
|
||||
{
|
||||
public:
|
||||
ClientTaskExectueProcessor(ObTableLoadTask &task, ObTableLoadClientTask *client_task)
|
||||
: ObITableLoadTaskProcessor(task), client_task_(client_task)
|
||||
{
|
||||
client_task_->inc_ref_count();
|
||||
}
|
||||
virtual ~ClientTaskExectueProcessor() { ObTableLoadClientService::revert_task(client_task_); }
|
||||
int process() override
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObSQLSessionInfo *origin_session_info = THIS_WORKER.get_session();
|
||||
ObSQLSessionInfo *session_info = client_task_->get_session_info();
|
||||
THIS_WORKER.set_session(session_info);
|
||||
if (OB_ISNULL(session_info)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unexpected null session info", KR(ret));
|
||||
} else {
|
||||
session_info->set_thread_id(GETTID());
|
||||
session_info->update_last_active_time();
|
||||
if (OB_FAIL(session_info->set_session_state(QUERY_ACTIVE))) {
|
||||
LOG_WARN("fail to set session state", K(ret));
|
||||
}
|
||||
}
|
||||
// begin
|
||||
if (OB_SUCC(ret)) {
|
||||
if (OB_FAIL(client_task_->init_instance())) {
|
||||
LOG_WARN("fail to init instance", KR(ret));
|
||||
} else if (OB_FAIL(client_task_->set_status_waitting())) {
|
||||
LOG_WARN("fail to set status waitting", KR(ret));
|
||||
} else if (OB_FAIL(client_task_->set_status_running())) {
|
||||
LOG_WARN("fail to set status running", KR(ret));
|
||||
}
|
||||
}
|
||||
// wait client commit
|
||||
while (OB_SUCC(ret)) {
|
||||
ObTableLoadClientStatus status = client_task_->get_status();
|
||||
if (ObTableLoadClientStatus::RUNNING == status) {
|
||||
ob_usleep(100LL * 1000); // sleep 100ms
|
||||
} else if (ObTableLoadClientStatus::COMMITTING == status) {
|
||||
break;
|
||||
} else {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unexpected client status", KR(ret), K(status));
|
||||
}
|
||||
}
|
||||
// commit
|
||||
if (OB_SUCC(ret)) {
|
||||
if (OB_FAIL(client_task_->commit_instance())) {
|
||||
LOG_WARN("fail to commit instance", KR(ret));
|
||||
} else if (OB_FAIL(client_task_->set_status_commit())) {
|
||||
LOG_WARN("fail to set status running", KR(ret));
|
||||
}
|
||||
}
|
||||
client_task_->destroy_instance();
|
||||
THIS_WORKER.set_session(origin_session_info);
|
||||
return ret;
|
||||
}
|
||||
|
||||
private:
|
||||
ObTableLoadClientTask *client_task_;
|
||||
};
|
||||
|
||||
/**
|
||||
* ClientTaskExectueCallback
|
||||
*/
|
||||
|
||||
class ObTableLoadClientTask::ClientTaskExectueCallback : public ObITableLoadTaskCallback
|
||||
{
|
||||
public:
|
||||
ClientTaskExectueCallback(ObTableLoadClientTask *client_task) : client_task_(client_task)
|
||||
{
|
||||
client_task_->inc_ref_count();
|
||||
}
|
||||
virtual ~ClientTaskExectueCallback() { ObTableLoadClientService::revert_task(client_task_); }
|
||||
void callback(int ret_code, ObTableLoadTask *task) override
|
||||
{
|
||||
if (OB_UNLIKELY(OB_SUCCESS != ret_code)) {
|
||||
client_task_->set_status_abort(ret_code);
|
||||
}
|
||||
task->~ObTableLoadTask();
|
||||
}
|
||||
|
||||
private:
|
||||
ObTableLoadClientTask *client_task_;
|
||||
};
|
||||
|
||||
/**
|
||||
* ObTableLoadClientTask
|
||||
*/
|
||||
|
||||
ObTableLoadClientTask::ObTableLoadClientTask()
|
||||
: task_id_(OB_INVALID_ID),
|
||||
allocator_("TLD_ClientTask"),
|
||||
task_scheduler_(nullptr),
|
||||
session_info_(nullptr),
|
||||
exec_ctx_(nullptr),
|
||||
task_scheduler_(nullptr),
|
||||
session_count_(0),
|
||||
next_batch_id_(0),
|
||||
table_ctx_(nullptr),
|
||||
client_status_(ObTableLoadClientStatus::MAX_STATUS),
|
||||
error_code_(OB_SUCCESS),
|
||||
ref_count_(0),
|
||||
is_inited_(false)
|
||||
{
|
||||
allocator_.set_tenant_id(MTL_ID());
|
||||
column_names_.set_tenant_id(MTL_ID());
|
||||
column_idxs_.set_tenant_id(MTL_ID());
|
||||
free_session_ctx_.sessid_ = sql::ObSQLSessionInfo::INVALID_SESSID;
|
||||
}
|
||||
|
||||
@ -59,52 +213,40 @@ ObTableLoadClientTask::~ObTableLoadClientTask()
|
||||
allocator_.free(task_scheduler_);
|
||||
task_scheduler_ = nullptr;
|
||||
}
|
||||
if (nullptr != session_info_) {
|
||||
ObTableLoadUtils::free_session_info(session_info_, free_session_ctx_);
|
||||
session_info_ = nullptr;
|
||||
}
|
||||
if (nullptr != exec_ctx_) {
|
||||
exec_ctx_->~ObTableLoadClientExecCtx();
|
||||
allocator_.free(exec_ctx_);
|
||||
exec_ctx_ = nullptr;
|
||||
}
|
||||
if (nullptr != table_ctx_) {
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_FAIL(ObTableLoadService::remove_ctx(table_ctx_))) {
|
||||
LOG_WARN("fail to remove table ctx", KR(ret), KP(table_ctx_));
|
||||
}
|
||||
ObTableLoadService::put_ctx(table_ctx_);
|
||||
table_ctx_ = nullptr;
|
||||
if (nullptr != session_info_) {
|
||||
ObTableLoadUtils::free_session_info(session_info_, free_session_ctx_);
|
||||
session_info_ = nullptr;
|
||||
}
|
||||
}
|
||||
|
||||
int ObTableLoadClientTask::init(uint64_t tenant_id, uint64_t user_id, uint64_t database_id,
|
||||
uint64_t table_id, int64_t timeout_us, int64_t heartbeat_timeout_us)
|
||||
int ObTableLoadClientTask::init(const ObTableLoadClientTaskParam ¶m)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (IS_INIT) {
|
||||
ret = OB_INIT_TWICE;
|
||||
LOG_WARN("ObTableLoadClientTask init twice", KR(ret), KP(this));
|
||||
} else if (OB_UNLIKELY(OB_INVALID_ID == tenant_id || OB_INVALID_ID == table_id ||
|
||||
OB_INVALID_ID == user_id || 0 == timeout_us)) {
|
||||
} else if (OB_UNLIKELY(!param.is_valid())) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid args", KR(ret), K(tenant_id), K(user_id), K(table_id), K(timeout_us));
|
||||
LOG_WARN("invalid args", KR(ret), K(param));
|
||||
} else if (OB_FAIL(param_.assign(param))) {
|
||||
LOG_WARN("fail to assign param", KR(ret));
|
||||
} else {
|
||||
tenant_id_ = tenant_id;
|
||||
user_id_ = user_id;
|
||||
database_id_ = database_id;
|
||||
table_id_ = table_id;
|
||||
if (OB_FAIL(create_session_info(user_id_, database_id_, table_id_, session_info_,
|
||||
const int64_t origin_timeout_ts = THIS_WORKER.get_timeout_ts();
|
||||
THIS_WORKER.set_timeout_ts(ObTimeUtil::current_time() + param_.get_timeout_us());
|
||||
if (OB_FAIL(create_session_info(param_.get_tenant_id(), param_.get_user_id(),
|
||||
param_.get_database_id(), param_.get_table_id(), session_info_,
|
||||
free_session_ctx_))) {
|
||||
LOG_WARN("fail to create session info", KR(ret));
|
||||
} else if (OB_FAIL(init_column_names_and_idxs())) {
|
||||
LOG_WARN("fail to init column names and idxs", KR(ret));
|
||||
} else if (OB_FAIL(init_exec_ctx(timeout_us, heartbeat_timeout_us))) {
|
||||
} else if (OB_FAIL(init_exec_ctx(param_.get_timeout_us(), param_.get_heartbeat_timeout_us()))) {
|
||||
LOG_WARN("fail to init client exec ctx", KR(ret));
|
||||
} else if (OB_FAIL(task_allocator_.init("TLD_TaskPool", MTL_ID()))) {
|
||||
LOG_WARN("fail to init task allocator", KR(ret));
|
||||
} else if (OB_ISNULL(task_scheduler_ = OB_NEWx(ObTableLoadTaskThreadPoolScheduler,
|
||||
(&allocator_), 1, table_id, "Client"))) {
|
||||
} else if (OB_ISNULL(task_scheduler_ =
|
||||
OB_NEWx(ObTableLoadTaskThreadPoolScheduler, (&allocator_), 1,
|
||||
param_.get_table_id(), "Client"))) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
LOG_WARN("fail to new ObTableLoadTaskThreadPoolScheduler", KR(ret));
|
||||
} else if (OB_FAIL(task_scheduler_->init())) {
|
||||
@ -114,13 +256,15 @@ int ObTableLoadClientTask::init(uint64_t tenant_id, uint64_t user_id, uint64_t d
|
||||
} else {
|
||||
is_inited_ = true;
|
||||
}
|
||||
THIS_WORKER.set_timeout_ts(origin_timeout_ts);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTableLoadClientTask::create_session_info(uint64_t user_id, uint64_t database_id, uint64_t table_id,
|
||||
sql::ObSQLSessionInfo *&session_info,
|
||||
sql::ObFreeSessionCtx &free_session_ctx)
|
||||
int ObTableLoadClientTask::create_session_info(uint64_t tenant_id, uint64_t user_id,
|
||||
uint64_t database_id, uint64_t table_id,
|
||||
ObSQLSessionInfo *&session_info,
|
||||
ObFreeSessionCtx &free_session_ctx)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
schema::ObSchemaGetterGuard schema_guard;
|
||||
@ -128,7 +272,6 @@ int ObTableLoadClientTask::create_session_info(uint64_t user_id, uint64_t databa
|
||||
const ObUserInfo *user_info = nullptr;
|
||||
const ObDatabaseSchema *database_schema = nullptr;
|
||||
const ObTableSchema *table_schema = nullptr;
|
||||
uint64_t tenant_id = MTL_ID();
|
||||
if (OB_ISNULL(GCTX.schema_service_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("invalid argument", K(ret), K(GCTX.schema_service_));
|
||||
@ -149,9 +292,8 @@ int ObTableLoadClientTask::create_session_info(uint64_t user_id, uint64_t databa
|
||||
} else if (OB_ISNULL(database_schema)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("invalid database schema", K(ret), K(tenant_id), K(database_id));
|
||||
} else if (OB_FAIL(ObTableLoadSchema::get_table_schema(tenant_id, table_id, schema_guard,
|
||||
table_schema))) {
|
||||
LOG_WARN("fail to get database and table schema", KR(ret));
|
||||
} else if (OB_FAIL(schema_guard.get_table_schema(tenant_id, table_id, table_schema))) {
|
||||
LOG_WARN("fail to get table schema", KR(ret));
|
||||
} else if (OB_ISNULL(table_schema)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("invalid table schema", K(ret), K(tenant_id), K(table_id));
|
||||
@ -161,8 +303,10 @@ int ObTableLoadClientTask::create_session_info(uint64_t user_id, uint64_t databa
|
||||
ObArenaAllocator allocator("TLD_Tmp");
|
||||
allocator.set_tenant_id(MTL_ID());
|
||||
ObStringBuffer buffer(&allocator);
|
||||
buffer.append("DIRECT LOAD_");
|
||||
buffer.append("DIRECT LOAD: ");
|
||||
buffer.append(table_schema->get_table_name());
|
||||
ObObj timeout_val;
|
||||
timeout_val.set_int(param_.get_timeout_us());
|
||||
OZ(session_info->load_default_sys_variable(false, false)); //加载默认的session参数
|
||||
OZ(session_info->load_default_configs_in_pc());
|
||||
OX(session_info->init_tenant(tenant_info->get_tenant_name(), tenant_id));
|
||||
@ -173,11 +317,12 @@ int ObTableLoadClientTask::create_session_info(uint64_t user_id, uint64_t databa
|
||||
OX(session_info->set_user_priv_set(OB_PRIV_ALL | OB_PRIV_GRANT));
|
||||
OX(session_info->set_default_database(database_schema->get_database_name(),
|
||||
CS_TYPE_UTF8MB4_GENERAL_CI));
|
||||
OX(session_info->set_mysql_cmd(obmysql::COM_QUERY));
|
||||
OX(session_info->set_mysql_cmd(COM_QUERY));
|
||||
OX(session_info->set_current_trace_id(ObCurTraceId::get_trace_id()));
|
||||
OX(session_info->set_client_addr(ObServer::get_instance().get_self()));
|
||||
OX(session_info->set_client_addr(param_.get_client_addr()));
|
||||
OX(session_info->set_peer_addr(ObServer::get_instance().get_self()));
|
||||
OX(session_info->set_thread_id(GETTID()));
|
||||
OX(session_info->set_query_start_time(ObTimeUtil::current_time()));
|
||||
OZ(session_info->update_sys_variable(SYS_VAR_OB_QUERY_TIMEOUT, timeout_val));
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
if (session_info != nullptr) {
|
||||
@ -188,32 +333,6 @@ int ObTableLoadClientTask::create_session_info(uint64_t user_id, uint64_t databa
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTableLoadClientTask::init_column_names_and_idxs()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObSchemaGetterGuard schema_guard;
|
||||
const ObTableSchema *table_schema = nullptr;
|
||||
if (OB_FAIL(
|
||||
ObTableLoadSchema::get_table_schema(tenant_id_, table_id_, schema_guard, table_schema))) {
|
||||
LOG_WARN("fail to get table schema", KR(ret), K_(tenant_id), K_(table_id));
|
||||
} else if (OB_FAIL(
|
||||
ObTableLoadSchema::get_column_names(table_schema, allocator_, column_names_))) {
|
||||
LOG_WARN("fail to get all column name", KR(ret));
|
||||
} else if (OB_UNLIKELY(column_names_.empty())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unexpected empty column names", KR(ret));
|
||||
} else if (OB_FAIL(ObTableLoadSchema::get_column_idxs(table_schema, column_idxs_))) {
|
||||
LOG_WARN("failed to get all column idx", K(ret));
|
||||
} else if (OB_UNLIKELY(column_idxs_.empty())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unexpected empty column idxs", KR(ret));
|
||||
} else if (OB_UNLIKELY(column_names_.count() != column_idxs_.count())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unexpected column names and idxs", KR(ret), K(column_names_), K(column_idxs_));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTableLoadClientTask::init_exec_ctx(int64_t timeout_us, int64_t heartbeat_timeout_us)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
@ -230,36 +349,111 @@ int ObTableLoadClientTask::init_exec_ctx(int64_t timeout_us, int64_t heartbeat_t
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTableLoadClientTask::set_table_ctx(ObTableLoadTableCtx *table_ctx)
|
||||
int ObTableLoadClientTask::start()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_UNLIKELY(nullptr == table_ctx)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid args", KR(ret), KP(table_ctx));
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("ObTableLoadClientTask not init", KR(ret));
|
||||
} else {
|
||||
obsys::ObWLockGuard guard(rw_lock_);
|
||||
if (OB_UNLIKELY(nullptr != table_ctx_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unexpected set table ctx twice", KR(ret), KP(table_ctx_), KP(table_ctx));
|
||||
if (OB_UNLIKELY(ObTableLoadClientStatus::MAX_STATUS != client_status_)) {
|
||||
ret = OB_STATE_NOT_MATCH;
|
||||
LOG_WARN("unexpected status", KR(ret), K(client_status_));
|
||||
} else {
|
||||
table_ctx->inc_ref_count();
|
||||
table_ctx_ = table_ctx;
|
||||
client_status_ = ObTableLoadClientStatus::INITIALIZING;
|
||||
ObTableLoadTask *task = nullptr;
|
||||
if (OB_ISNULL(task = OB_NEWx(ObTableLoadTask, &allocator_, param_.get_tenant_id()))) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
LOG_WARN("fail to new ObTableLoadTask", KR(ret));
|
||||
} else if (OB_FAIL(task->set_processor<ClientTaskExectueProcessor>(this))) {
|
||||
LOG_WARN("fail to set client task processor", KR(ret));
|
||||
} else if (OB_FAIL(task->set_callback<ClientTaskExectueCallback>(this))) {
|
||||
LOG_WARN("fail to set common task callback", KR(ret));
|
||||
} else if (OB_FAIL(task_scheduler_->add_task(0, task))) {
|
||||
LOG_WARN("fail to add task", KR(ret));
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
client_status_ = ObTableLoadClientStatus::ERROR;
|
||||
error_code_ = ret;
|
||||
if (nullptr != task) {
|
||||
task->~ObTableLoadTask();
|
||||
allocator_.free(task);
|
||||
task = nullptr;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTableLoadClientTask::get_table_ctx(ObTableLoadTableCtx *&table_ctx)
|
||||
int ObTableLoadClientTask::write(ObTableLoadObjRowArray &obj_rows)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
table_ctx = nullptr;
|
||||
obsys::ObRLockGuard guard(rw_lock_);
|
||||
if (OB_UNLIKELY(nullptr == table_ctx_)) {
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("ObTableLoadClientTask not init", KR(ret));
|
||||
} else if (OB_UNLIKELY(obj_rows.empty())) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid args", KR(ret), K(obj_rows));
|
||||
} else if (OB_UNLIKELY(session_count_ <= 0)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unexpected null table ctx", KR(ret));
|
||||
LOG_WARN("unexpected session count", KR(ret), K(session_count_));
|
||||
} else {
|
||||
table_ctx = table_ctx_;
|
||||
table_ctx->inc_ref_count();
|
||||
const int64_t batch_id = ATOMIC_FAA(&next_batch_id_, 1);
|
||||
;
|
||||
const int32_t session_id = batch_id % session_count_ + 1;
|
||||
ObTableLoadSequenceNo start_seq_no(batch_id << ObTableLoadSequenceNo::BATCH_ID_SHIFT);
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < obj_rows.count(); ++i) {
|
||||
ObTableLoadObjRow &row = obj_rows.at(i);
|
||||
row.seq_no_ = start_seq_no++;
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
if (OB_FAIL(instance_.write(session_id, obj_rows))) {
|
||||
LOG_WARN("fail to write", KR(ret));
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTableLoadClientTask::commit()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("ObTableLoadClientTask not init", KR(ret));
|
||||
} else if (OB_FAIL(check_status(ObTableLoadClientStatus::RUNNING))) {
|
||||
LOG_WARN("fail to check status", KR(ret));
|
||||
} else if (OB_FAIL(set_status_committing())) {
|
||||
LOG_WARN("fail to set status committing", KR(ret));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
void ObTableLoadClientTask::abort()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("ObTableLoadClientTask not init", KR(ret));
|
||||
} else {
|
||||
set_status_abort();
|
||||
if (nullptr != session_info_ && OB_FAIL(session_info_->kill_query())) {
|
||||
LOG_WARN("fail to kill query", KR(ret));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
int ObTableLoadClientTask::set_status_waitting()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
obsys::ObWLockGuard guard(rw_lock_);
|
||||
if (OB_UNLIKELY(ObTableLoadClientStatus::INITIALIZING != client_status_)) {
|
||||
ret = OB_STATE_NOT_MATCH;
|
||||
LOG_WARN("unexpected status", KR(ret), K(client_status_));
|
||||
} else {
|
||||
client_status_ = ObTableLoadClientStatus::WAITTING;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
@ -268,7 +462,7 @@ int ObTableLoadClientTask::set_status_running()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
obsys::ObWLockGuard guard(rw_lock_);
|
||||
if (OB_UNLIKELY(ObTableLoadClientStatus::MAX_STATUS != client_status_)) {
|
||||
if (OB_UNLIKELY(ObTableLoadClientStatus::WAITTING != client_status_)) {
|
||||
ret = OB_STATE_NOT_MATCH;
|
||||
LOG_WARN("unexpected status", KR(ret), K(client_status_));
|
||||
} else {
|
||||
@ -322,13 +516,16 @@ int ObTableLoadClientTask::set_status_error(int error_code)
|
||||
return ret;
|
||||
}
|
||||
|
||||
void ObTableLoadClientTask::set_status_abort()
|
||||
void ObTableLoadClientTask::set_status_abort(int error_code)
|
||||
{
|
||||
obsys::ObWLockGuard guard(rw_lock_);
|
||||
if (ObTableLoadClientStatus::ABORT == client_status_) {
|
||||
// ignore
|
||||
} else {
|
||||
client_status_ = ObTableLoadClientStatus::ABORT;
|
||||
if (OB_SUCCESS == error_code_) {
|
||||
error_code_ = error_code;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -337,10 +534,9 @@ int ObTableLoadClientTask::check_status(ObTableLoadClientStatus client_status)
|
||||
int ret = OB_SUCCESS;
|
||||
obsys::ObRLockGuard guard(rw_lock_);
|
||||
if (OB_UNLIKELY(client_status != client_status_)) {
|
||||
if (ObTableLoadClientStatus::ERROR == client_status_) {
|
||||
if (ObTableLoadClientStatus::ERROR == client_status_ ||
|
||||
ObTableLoadClientStatus::ABORT == client_status_) {
|
||||
ret = error_code_;
|
||||
} else if (ObTableLoadClientStatus::ABORT == client_status_) {
|
||||
ret = OB_CANCELED;
|
||||
} else {
|
||||
ret = OB_STATE_NOT_MATCH;
|
||||
}
|
||||
@ -362,51 +558,70 @@ void ObTableLoadClientTask::get_status(ObTableLoadClientStatus &client_status,
|
||||
error_code = error_code_;
|
||||
}
|
||||
|
||||
int ObTableLoadClientTask::alloc_task(ObTableLoadTask *&task)
|
||||
int ObTableLoadClientTask::init_instance()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("ObTableLoadClientTask not init", KR(ret));
|
||||
} else {
|
||||
if (OB_ISNULL(task = task_allocator_.alloc(MTL_ID()))) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
LOG_WARN("fail to alloc task", KR(ret));
|
||||
omt::ObTenant *tenant = nullptr;
|
||||
ObArray<int64_t> column_idxs;
|
||||
if (OB_FAIL(GCTX.omt_->get_tenant(param_.get_tenant_id(), tenant))) {
|
||||
LOG_WARN("fail to get tenant handle", KR(ret), K(param_.get_tenant_id()));
|
||||
} else if (OB_FAIL(ObTableLoadSchema::get_column_idxs(param_.get_tenant_id(),
|
||||
param_.get_table_id(), column_idxs))) {
|
||||
LOG_WARN("failed to get column idx", K(ret));
|
||||
} else if (OB_UNLIKELY(column_idxs.empty())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unexpected empty column idxs", KR(ret));
|
||||
} else {
|
||||
session_count_ = MIN(param_.get_parallel(), (int64_t)tenant->unit_max_cpu() * 2);
|
||||
ObTableLoadParam load_param;
|
||||
load_param.tenant_id_ = param_.get_tenant_id();
|
||||
load_param.table_id_ = param_.get_table_id();
|
||||
load_param.parallel_ = param_.get_parallel();
|
||||
load_param.session_count_ = session_count_;
|
||||
load_param.batch_size_ = 100;
|
||||
load_param.max_error_row_count_ = param_.get_max_error_row_count();
|
||||
// load_param.sql_mode_ = 0; // TODO(suzhi.yt) 自增列会用到这个参数
|
||||
load_param.column_count_ = column_idxs.count();
|
||||
load_param.need_sort_ = true;
|
||||
load_param.px_mode_ = false;
|
||||
load_param.online_opt_stat_gather_ = false; // 支持统计信息收集需要构造ObExecContext
|
||||
load_param.dup_action_ = param_.get_dup_action();
|
||||
if (OB_FAIL(instance_.init(load_param, column_idxs, exec_ctx_))) {
|
||||
LOG_WARN("fail to init instance", KR(ret));
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
void ObTableLoadClientTask::free_task(ObTableLoadTask *task)
|
||||
int ObTableLoadClientTask::commit_instance()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("ObTableLoadClientTask not init", KR(ret));
|
||||
} else if (OB_ISNULL(task)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid null task", KR(ret));
|
||||
} else {
|
||||
task_allocator_.free(task);
|
||||
}
|
||||
}
|
||||
|
||||
int ObTableLoadClientTask::add_task(ObTableLoadTask *task)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("ObTableLoadClientTask not init", KR(ret));
|
||||
} else if (OB_ISNULL(task)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid null task", KR(ret));
|
||||
} else {
|
||||
if (OB_FAIL(task_scheduler_->add_task(0, task))) {
|
||||
LOG_WARN("fail to add task", KR(ret));
|
||||
if (OB_FAIL(instance_.commit(result_info_))) {
|
||||
LOG_WARN("fail to commit instance", KR(ret));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
void ObTableLoadClientTask::destroy_instance()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("ObTableLoadClientTask not init", KR(ret));
|
||||
} else {
|
||||
instance_.destroy();
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace observer
|
||||
} // namespace oceanbase
|
||||
|
||||
@ -13,9 +13,10 @@
|
||||
#pragma once
|
||||
|
||||
#include "lib/hash/ob_link_hashmap.h"
|
||||
#include "observer/table_load/ob_table_load_object_allocator.h"
|
||||
#include "observer/table_load/ob_table_load_instance.h"
|
||||
#include "observer/table_load/ob_table_load_struct.h"
|
||||
#include "share/table/ob_table_load_define.h"
|
||||
#include "share/table/ob_table_load_row_array.h"
|
||||
#include "sql/session/ob_sql_session_mgr.h"
|
||||
|
||||
namespace oceanbase
|
||||
@ -26,66 +27,109 @@ class ObTableLoadClientExecCtx;
|
||||
class ObTableLoadTableCtx;
|
||||
class ObTableLoadTask;
|
||||
class ObITableLoadTaskScheduler;
|
||||
class ObTableLoadInstance;
|
||||
|
||||
struct ObTableLoadClientTaskParam
|
||||
{
|
||||
public:
|
||||
ObTableLoadClientTaskParam();
|
||||
~ObTableLoadClientTaskParam();
|
||||
void reset();
|
||||
int assign(const ObTableLoadClientTaskParam &other);
|
||||
bool is_valid() const;
|
||||
|
||||
#define DEFINE_GETTER_AND_SETTER(type, name) \
|
||||
OB_INLINE type get_##name() const { return name##_; } \
|
||||
OB_INLINE void set_##name(type name) { name##_ = name; }
|
||||
|
||||
DEFINE_GETTER_AND_SETTER(ObAddr, client_addr);
|
||||
DEFINE_GETTER_AND_SETTER(uint64_t, tenant_id);
|
||||
DEFINE_GETTER_AND_SETTER(uint64_t, user_id);
|
||||
DEFINE_GETTER_AND_SETTER(uint64_t, database_id);
|
||||
DEFINE_GETTER_AND_SETTER(uint64_t, table_id);
|
||||
DEFINE_GETTER_AND_SETTER(int64_t, parallel);
|
||||
DEFINE_GETTER_AND_SETTER(uint64_t, max_error_row_count);
|
||||
DEFINE_GETTER_AND_SETTER(sql::ObLoadDupActionType, dup_action);
|
||||
DEFINE_GETTER_AND_SETTER(uint64_t, timeout_us);
|
||||
DEFINE_GETTER_AND_SETTER(uint64_t, heartbeat_timeout_us);
|
||||
|
||||
#undef DEFINE_GETTER_AND_SETTER
|
||||
|
||||
TO_STRING_KV(K_(client_addr), K_(tenant_id), K_(user_id), K_(database_id), K_(table_id),
|
||||
K_(parallel), K_(max_error_row_count), K_(dup_action), K_(timeout_us),
|
||||
K_(heartbeat_timeout_us));
|
||||
|
||||
private:
|
||||
ObAddr client_addr_;
|
||||
uint64_t tenant_id_;
|
||||
uint64_t user_id_;
|
||||
uint64_t database_id_;
|
||||
uint64_t table_id_;
|
||||
int64_t parallel_;
|
||||
uint64_t max_error_row_count_;
|
||||
sql::ObLoadDupActionType dup_action_;
|
||||
int64_t timeout_us_;
|
||||
int64_t heartbeat_timeout_us_;
|
||||
};
|
||||
|
||||
class ObTableLoadClientTask
|
||||
{
|
||||
public:
|
||||
ObTableLoadClientTask();
|
||||
~ObTableLoadClientTask();
|
||||
int init(uint64_t tenant_id, uint64_t user_id, uint64_t database_id, uint64_t table_id,
|
||||
int64_t timeout_us, int64_t heartbeat_timeout_us);
|
||||
bool is_inited() const { return is_inited_; }
|
||||
int64_t get_ref_count() const { return ATOMIC_LOAD(&ref_count_); }
|
||||
int64_t inc_ref_count() { return ATOMIC_AAF(&ref_count_, 1); }
|
||||
int64_t dec_ref_count() { return ATOMIC_SAF(&ref_count_, 1); }
|
||||
int set_table_ctx(ObTableLoadTableCtx *table_ctx);
|
||||
int get_table_ctx(ObTableLoadTableCtx *&table_ctx);
|
||||
int init(const ObTableLoadClientTaskParam ¶m);
|
||||
int start();
|
||||
int write(table::ObTableLoadObjRowArray &obj_rows);
|
||||
int commit();
|
||||
void abort();
|
||||
OB_INLINE int64_t get_ref_count() const { return ATOMIC_LOAD(&ref_count_); }
|
||||
OB_INLINE int64_t inc_ref_count() { return ATOMIC_AAF(&ref_count_, 1); }
|
||||
OB_INLINE int64_t dec_ref_count() { return ATOMIC_SAF(&ref_count_, 1); }
|
||||
OB_INLINE sql::ObSQLSessionInfo *get_session_info() { return session_info_; }
|
||||
OB_INLINE ObTableLoadClientExecCtx *get_exec_ctx() { return exec_ctx_; }
|
||||
OB_INLINE void set_trans_id(const table::ObTableLoadTransId &trans_id) { trans_id_ = trans_id; }
|
||||
OB_INLINE const table::ObTableLoadTransId &get_trans_id() const { return trans_id_; }
|
||||
int64_t get_next_batch_id() { return ATOMIC_FAA(&next_batch_id_, 1); }
|
||||
int set_status_waitting();
|
||||
int set_status_running();
|
||||
int set_status_committing();
|
||||
int set_status_commit();
|
||||
int set_status_error(int error_code);
|
||||
void set_status_abort();
|
||||
void set_status_abort(int error_code = OB_CANCELED);
|
||||
table::ObTableLoadClientStatus get_status() const;
|
||||
void get_status(table::ObTableLoadClientStatus &client_status, int &error_code) const;
|
||||
int check_status(table::ObTableLoadClientStatus client_status);
|
||||
int alloc_task(ObTableLoadTask *&task);
|
||||
void free_task(ObTableLoadTask *task);
|
||||
int add_task(ObTableLoadTask *task);
|
||||
TO_STRING_KV(K_(tenant_id), K_(user_id), K_(database_id), K_(table_id), K_(ddl_param),
|
||||
K_(column_names), K_(column_idxs), K_(result_info), KP_(session_info),
|
||||
K_(free_session_ctx), KP_(exec_ctx), KP_(task_scheduler), K_(trans_id),
|
||||
KP_(table_ctx), K_(client_status), K_(error_code), K_(ref_count));
|
||||
TO_STRING_KV(K_(task_id), K_(param), K_(result_info), KP_(session_info), K_(free_session_ctx),
|
||||
KP_(exec_ctx), KP_(task_scheduler), K_(client_status), K_(error_code),
|
||||
K_(ref_count));
|
||||
|
||||
private:
|
||||
int create_session_info(uint64_t user_id, uint64_t database_id, uint64_t table_id,
|
||||
sql::ObSQLSessionInfo *&session_info,
|
||||
int init_task_scheduler();
|
||||
int create_session_info(uint64_t tenant_id, uint64_t user_id, uint64_t database_id,
|
||||
uint64_t table_id, sql::ObSQLSessionInfo *&session_info,
|
||||
sql::ObFreeSessionCtx &free_session_ctx);
|
||||
int init_column_names_and_idxs();
|
||||
int init_exec_ctx(int64_t timeout_us, int64_t heartbeat_timeout_us);
|
||||
|
||||
int init_instance();
|
||||
int commit_instance();
|
||||
void destroy_instance();
|
||||
|
||||
private:
|
||||
class ClientTaskExectueProcessor;
|
||||
class ClientTaskExectueCallback;
|
||||
|
||||
public:
|
||||
uint64_t tenant_id_;
|
||||
uint64_t user_id_;
|
||||
uint64_t database_id_;
|
||||
uint64_t table_id_;
|
||||
ObTableLoadDDLParam ddl_param_;
|
||||
common::ObArray<ObString> column_names_;
|
||||
common::ObArray<int64_t> column_idxs_;
|
||||
uint64_t task_id_;
|
||||
ObTableLoadClientTaskParam param_;
|
||||
table::ObTableLoadResultInfo result_info_;
|
||||
|
||||
private:
|
||||
ObArenaAllocator allocator_;
|
||||
ObITableLoadTaskScheduler *task_scheduler_;
|
||||
sql::ObSQLSessionInfo *session_info_;
|
||||
sql::ObFreeSessionCtx free_session_ctx_;
|
||||
ObTableLoadClientExecCtx *exec_ctx_;
|
||||
ObTableLoadObjectAllocator<ObTableLoadTask> task_allocator_;
|
||||
ObITableLoadTaskScheduler *task_scheduler_;
|
||||
table::ObTableLoadTransId trans_id_;
|
||||
int64_t session_count_;
|
||||
ObTableLoadInstance instance_;
|
||||
int64_t next_batch_id_ CACHE_ALIGNED;
|
||||
mutable obsys::ObRWLock rw_lock_;
|
||||
ObTableLoadTableCtx *table_ctx_;
|
||||
table::ObTableLoadClientStatus client_status_;
|
||||
int error_code_;
|
||||
int64_t ref_count_ CACHE_ALIGNED;
|
||||
@ -96,22 +140,22 @@ struct ObTableLoadClientTaskBrief : public common::LinkHashValue<ObTableLoadUniq
|
||||
{
|
||||
public:
|
||||
ObTableLoadClientTaskBrief()
|
||||
: table_id_(common::OB_INVALID_ID),
|
||||
dest_table_id_(common::OB_INVALID_ID),
|
||||
task_id_(0),
|
||||
: task_id_(common::OB_INVALID_ID),
|
||||
table_id_(common::OB_INVALID_ID),
|
||||
client_status_(table::ObTableLoadClientStatus::MAX_STATUS),
|
||||
error_code_(common::OB_SUCCESS),
|
||||
active_time_(0)
|
||||
{
|
||||
}
|
||||
TO_STRING_KV(K_(table_id), K_(dest_table_id), K_(task_id), K_(client_status), K_(error_code),
|
||||
TO_STRING_KV(K_(task_id), K_(table_id), K_(client_status), K_(error_code), K_(result_info),
|
||||
K_(active_time));
|
||||
|
||||
public:
|
||||
uint64_t table_id_;
|
||||
uint64_t dest_table_id_;
|
||||
int64_t task_id_;
|
||||
uint64_t table_id_;
|
||||
table::ObTableLoadClientStatus client_status_;
|
||||
int error_code_;
|
||||
table::ObTableLoadResultInfo result_info_;
|
||||
int64_t active_time_;
|
||||
};
|
||||
|
||||
|
||||
@ -76,12 +76,12 @@ int ObTableLoadClientExecCtx::check_status()
|
||||
LOG_WARN("table load is timeout", KR(ret), K_(timeout_ts));
|
||||
} else if (OB_UNLIKELY(ObTimeUtil::current_time() - last_heartbeat_time_ > heartbeat_timeout_us_)) {
|
||||
ret = OB_TIMEOUT;
|
||||
LOG_WARN("heart beat is timeout", KR(ret), K(last_heartbeat_time_), K(heartbeat_timeout_us_));
|
||||
LOG_WARN("heartbeat is timeout", KR(ret), K(last_heartbeat_time_), K(heartbeat_timeout_us_));
|
||||
} else if (OB_ISNULL(session_info_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("session info is null");
|
||||
} else if (session_info_->is_terminate(ret)){
|
||||
LOG_WARN("execution was terminated", K(ret));
|
||||
LOG_WARN("session info is null", KR(ret));
|
||||
} else if (OB_UNLIKELY(session_info_->is_terminate(ret))) {
|
||||
LOG_WARN("execution was terminated", KR(ret));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -202,6 +202,7 @@ int ObTableLoadInstance::write(int32_t session_id, const table::ObTableLoadObjRo
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid args", KR(ret), K(session_id), K(obj_rows.count()));
|
||||
} else {
|
||||
// TODO(suzhi.yt): java客户端调用的时候, 对于相同session_id可能会并发
|
||||
uint64_t &next_sequence_no = trans_ctx_.next_sequence_no_array_[session_id - 1];
|
||||
ObTableLoadCoordinator coordinator(table_ctx_);
|
||||
if (OB_FAIL(coordinator.init())) {
|
||||
|
||||
@ -22,6 +22,7 @@ namespace oceanbase
|
||||
namespace observer
|
||||
{
|
||||
using namespace common;
|
||||
using namespace share;
|
||||
using namespace share::schema;
|
||||
using namespace table;
|
||||
using namespace blocksstable;
|
||||
@ -144,10 +145,26 @@ int ObTableLoadSchema::get_column_names(const ObTableSchema *table_schema, ObIAl
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTableLoadSchema::get_column_idxs(uint64_t tenant_id, uint64_t table_id,
|
||||
ObIArray<int64_t> &column_idxs)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
column_idxs.reset();
|
||||
ObSchemaGetterGuard schema_guard;
|
||||
const ObTableSchema *table_schema = nullptr;
|
||||
if (OB_FAIL(get_table_schema(tenant_id, table_id, schema_guard, table_schema))) {
|
||||
LOG_WARN("fail to get table schema", KR(ret), K(tenant_id), K(table_id));
|
||||
} else {
|
||||
ret = get_column_idxs(table_schema, column_idxs);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTableLoadSchema::get_column_idxs(const ObTableSchema *table_schema,
|
||||
ObIArray<int64_t> &column_idxs)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
column_idxs.reset();
|
||||
if (OB_ISNULL(table_schema)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid args", KR(ret), KP(table_schema));
|
||||
@ -192,6 +209,50 @@ int ObTableLoadSchema::check_has_udt_column(const ObTableSchema *table_schema, b
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTableLoadSchema::get_tenant_optimizer_gather_stats_on_load(const uint64_t tenant_id,
|
||||
bool &value)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
value = false;
|
||||
ObSqlString sql;
|
||||
SMART_VAR(ObMySQLProxy::MySQLResult, res)
|
||||
{
|
||||
sqlclient::ObMySQLResult *result = nullptr;
|
||||
// TODO(suzhi.yt) 这里为啥是带zone纬度的? 如果查询结果中有多个zone的, 选哪个作为返回值呢?
|
||||
if (OB_FAIL(sql.assign_fmt(
|
||||
"SELECT value FROM %s WHERE tenant_id = %ld and (zone, name, schema_version) in (select "
|
||||
"zone, name, max(schema_version) FROM %s group by zone, name) and name = '%s'",
|
||||
OB_ALL_SYS_VARIABLE_HISTORY_TNAME,
|
||||
ObSchemaUtils::get_extract_tenant_id(tenant_id, tenant_id),
|
||||
OB_ALL_SYS_VARIABLE_HISTORY_TNAME, OB_SV__OPTIMIZER_GATHER_STATS_ON_LOAD))) {
|
||||
LOG_WARN("fail to append sql", KR(ret), K(tenant_id));
|
||||
} else if (OB_FAIL(GCTX.sql_proxy_->read(res, tenant_id, sql.ptr()))) {
|
||||
LOG_WARN("fail to execute sql", KR(ret), K(sql), K(tenant_id));
|
||||
} else if (OB_ISNULL(result = res.get_result())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("fail to get sql result", KR(ret), K(sql), K(tenant_id));
|
||||
} else {
|
||||
while (OB_SUCC(ret)) {
|
||||
if (OB_FAIL(result->next())) {
|
||||
if (OB_ITER_END != ret) {
|
||||
LOG_WARN("fail to get next row", KR(ret), K(tenant_id));
|
||||
} else {
|
||||
ret = OB_SUCCESS;
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
ObString data;
|
||||
EXTRACT_VARCHAR_FIELD_MYSQL(*result, "value", data);
|
||||
if (0 == strcmp(data.ptr(), "1")) {
|
||||
value = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
ObTableLoadSchema::ObTableLoadSchema()
|
||||
: allocator_("TLD_Schema"),
|
||||
is_partitioned_table_(false),
|
||||
|
||||
@ -41,9 +41,12 @@ public:
|
||||
static int get_column_names(const share::schema::ObTableSchema *table_schema,
|
||||
common::ObIAllocator &allocator,
|
||||
common::ObIArray<common::ObString> &column_names);
|
||||
static int get_column_idxs(uint64_t tenant_id, uint64_t table_id,
|
||||
common::ObIArray<int64_t> &column_idxs);
|
||||
static int get_column_idxs(const share::schema::ObTableSchema *table_schema,
|
||||
common::ObIArray<int64_t> &column_idxs);
|
||||
static int check_has_udt_column(const share::schema::ObTableSchema *table_schema, bool &bret);
|
||||
static int get_tenant_optimizer_gather_stats_on_load(const uint64_t tenant_id, bool &value);
|
||||
public:
|
||||
ObTableLoadSchema();
|
||||
~ObTableLoadSchema();
|
||||
|
||||
@ -315,9 +315,7 @@ void ObTableLoadService::ObClientTaskAutoAbortTask::runTimerTask()
|
||||
ObTableLoadClientTask *client_task = client_task_array.at(i);
|
||||
if (OB_UNLIKELY(ObTableLoadClientStatus::ERROR == client_task->get_status() ||
|
||||
client_task->get_exec_ctx()->check_status() != OB_SUCCESS)) {
|
||||
if (OB_FAIL(ObTableLoadClientService::abort_task(client_task))) {
|
||||
LOG_WARN("fail to abort client task", KR(ret), KPC(client_task));
|
||||
}
|
||||
client_task->abort();
|
||||
}
|
||||
ObTableLoadClientService::revert_task(client_task);
|
||||
}
|
||||
@ -641,9 +639,7 @@ void ObTableLoadService::abort_all_client_task()
|
||||
} else {
|
||||
for (int i = 0; i < client_task_array.count(); ++i) {
|
||||
ObTableLoadClientTask *client_task = client_task_array.at(i);
|
||||
if (OB_FAIL(ObTableLoadClientService::abort_task(client_task))) {
|
||||
LOG_WARN("fail to abort client task", KR(ret), KPC(client_task));
|
||||
}
|
||||
client_task->abort();
|
||||
ObTableLoadClientService::revert_task(client_task);
|
||||
}
|
||||
}
|
||||
|
||||
@ -390,6 +390,8 @@ enum class ObTableLoadClientStatus : int64_t
|
||||
COMMIT = 2,
|
||||
ERROR = 3,
|
||||
ABORT = 4,
|
||||
INITIALIZING = 5, // 初始化中
|
||||
WAITTING = 6, // 排队等待资源中
|
||||
MAX_STATUS
|
||||
};
|
||||
|
||||
@ -413,6 +415,12 @@ static int table_load_client_status_to_string(ObTableLoadClientStatus client_sta
|
||||
case ObTableLoadClientStatus::ABORT:
|
||||
status_str = "ABORT";
|
||||
break;
|
||||
case ObTableLoadClientStatus::INITIALIZING:
|
||||
status_str = "INITIALIZING";
|
||||
break;
|
||||
case ObTableLoadClientStatus::WAITTING:
|
||||
status_str = "WAITTING";
|
||||
break;
|
||||
default:
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
OB_LOG(WARN, "unexpected client status", KR(ret), K(client_status));
|
||||
|
||||
Reference in New Issue
Block a user