table direct load support incremental direct load

This commit is contained in:
suz-yang
2024-06-17 14:49:17 +00:00
committed by ob-robot
parent dc46617135
commit c03b71b647
6 changed files with 128 additions and 37 deletions

View File

@ -20,6 +20,7 @@
#include "observer/table_load/ob_table_load_exec_ctx.h"
#include "observer/table_load/ob_table_load_schema.h"
#include "observer/table_load/ob_table_load_service.h"
#include "sql/resolver/dml/ob_hint.h"
namespace oceanbase
{
@ -59,28 +60,13 @@ int ObTableDirectLoadBeginExecutor::process()
{
int ret = OB_SUCCESS;
LOG_INFO("table direct load begin", K_(arg));
const uint64_t tenant_id = ctx_.get_tenant_id();
const uint64_t database_id = ctx_.get_database_id();
uint64_t table_id = 0;
ObTableLoadClientTaskParam param;
ObTableLoadClientTask *client_task = nullptr;
if (OB_FAIL(ObTableLoadService::check_tenant())) {
LOG_WARN("fail to check tenant", KR(ret));
} else if (OB_FAIL(ObTableLoadSchema::get_table_id(tenant_id, database_id, arg_.table_name_,
table_id))) {
LOG_WARN("fail to get table id", KR(ret), K(tenant_id), K(database_id), K_(arg));
} else {
ObTableLoadClientTaskParam param;
param.set_client_addr(ctx_.get_user_client_addr());
param.set_tenant_id(tenant_id);
param.set_user_id(ctx_.get_user_id());
param.set_database_id(database_id);
param.set_table_id(table_id);
param.set_parallel(arg_.parallel_);
param.set_max_error_row_count(arg_.max_error_row_count_);
param.set_dup_action(arg_.dup_action_);
param.set_timeout_us(arg_.timeout_);
param.set_heartbeat_timeout_us(arg_.heartbeat_timeout_);
if (OB_FAIL(ObTableLoadClientService::alloc_task(client_task))) {
} else if (OB_FAIL(resolve_param(param))) {
LOG_WARN("fail to resolve param", KR(ret));
} else if (OB_FAIL(ObTableLoadClientService::alloc_task(client_task))) {
LOG_WARN("fail to alloc client task", KR(ret));
} else if (OB_FAIL(client_task->init(param))) {
LOG_WARN("fail to init client task", KR(ret), K(param));
@ -89,7 +75,6 @@ int ObTableDirectLoadBeginExecutor::process()
} else if (OB_FAIL(ObTableLoadClientService::add_task(client_task))) {
LOG_WARN("fail to add client task", KR(ret));
}
}
if (OB_SUCC(ret) && !arg_.is_async_) {
ObTableLoadClientStatus client_status = ObTableLoadClientStatus::MAX_STATUS;
@ -133,6 +118,67 @@ int ObTableDirectLoadBeginExecutor::process()
return ret;
}
/**
 * Build the client task parameters for a direct load begin request from the
 * request arguments (arg_) and the execution context (ctx_).
 *
 * Mapping of arg_.load_method_:
 *   empty / FULL  -> method FULL,        insert mode NORMAL
 *   INC           -> method INCREMENTAL, insert mode NORMAL
 *   INC_REPLACE   -> method INCREMENTAL, insert mode INC_REPLACE; accepted only when the
 *                    requested dup action is LOAD_STOP_ON_DUP, which is then rewritten to
 *                    LOAD_REPLACE
 *   other values  -> OB_INVALID_ARGUMENT
 *
 * @param param [out] always reset on entry; filled in only when OB_SUCCESS is returned.
 * @return OB_SUCCESS on success, the error from the table id lookup, OB_NOT_SUPPORTED for
 *         INC_REPLACE combined with a non-default dup action, or OB_INVALID_ARGUMENT for an
 *         unrecognized load method.
 */
int ObTableDirectLoadBeginExecutor::resolve_param(ObTableLoadClientTaskParam &param)
{
  int ret = OB_SUCCESS;
  // NOTE: tenant_id / database_id keep these exact names because K(...) logs the identifier.
  const uint64_t tenant_id = ctx_.get_tenant_id();
  const uint64_t database_id = ctx_.get_database_id();
  uint64_t resolved_table_id = 0;
  ObLoadDupActionType effective_dup_action = arg_.dup_action_;
  ObDirectLoadMethod::Type resolved_method = ObDirectLoadMethod::INVALID_METHOD;
  ObDirectLoadInsertMode::Type resolved_insert_mode = ObDirectLoadInsertMode::INVALID_INSERT_MODE;

  param.reset();

  if (OB_FAIL(ObTableLoadSchema::get_table_id(tenant_id, database_id, arg_.table_name_,
                                              resolved_table_id))) {
    LOG_WARN("fail to get table id", KR(ret), K(tenant_id), K(database_id), K_(arg));
  } else {
    // An absent load method is handled exactly like an explicit FULL request.
    const ObDirectLoadHint::LoadMethod hint_method =
      arg_.load_method_.empty() ? ObDirectLoadHint::FULL
                                : ObDirectLoadHint::get_load_method_value(arg_.load_method_);
    if (ObDirectLoadHint::FULL == hint_method) {
      resolved_method = ObDirectLoadMethod::FULL;
      resolved_insert_mode = ObDirectLoadInsertMode::NORMAL;
    } else if (ObDirectLoadHint::INC == hint_method) {
      resolved_method = ObDirectLoadMethod::INCREMENTAL;
      resolved_insert_mode = ObDirectLoadInsertMode::NORMAL;
    } else if (ObDirectLoadHint::INC_REPLACE == hint_method) {
      // inc_replace implies replace semantics on its own, so an explicitly requested
      // dup action other than the default stop-on-dup is rejected.
      if (OB_UNLIKELY(ObLoadDupActionType::LOAD_STOP_ON_DUP != effective_dup_action)) {
        ret = OB_NOT_SUPPORTED;
        LOG_WARN("replace or ignore for inc_replace load method not supported", KR(ret),
                 K(arg_.load_method_), K(arg_.dup_action_));
      } else {
        effective_dup_action = ObLoadDupActionType::LOAD_REPLACE; // rewrite dup action
        resolved_method = ObDirectLoadMethod::INCREMENTAL;
        resolved_insert_mode = ObDirectLoadInsertMode::INC_REPLACE;
      }
    } else {
      ret = OB_INVALID_ARGUMENT;
      LOG_WARN("invalid load method", KR(ret), K(arg_.load_method_));
    }
  }

  if (OB_SUCC(ret)) {
    // identity of the requester and the target table
    param.set_client_addr(ctx_.get_user_client_addr());
    param.set_tenant_id(tenant_id);
    param.set_user_id(ctx_.get_user_id());
    param.set_database_id(database_id);
    param.set_table_id(resolved_table_id);
    // load behaviour resolved above
    param.set_method(resolved_method);
    param.set_insert_mode(resolved_insert_mode);
    param.set_dup_action(effective_dup_action);
    // values passed through unchanged from the request
    param.set_parallel(arg_.parallel_);
    param.set_max_error_row_count(arg_.max_error_row_count_);
    param.set_timeout_us(arg_.timeout_);
    param.set_heartbeat_timeout_us(arg_.heartbeat_timeout_);
  }
  return ret;
}
// commit
int ObTableDirectLoadCommitExecutor::check_args()

View File

@ -20,6 +20,7 @@ namespace oceanbase
{
namespace observer
{
class ObTableLoadClientTaskParam;
class ObTableLoadClientTask;
class ObTableLoadTableCtx;
@ -70,6 +71,9 @@ protected:
int check_args() override;
int set_result_header() override;
int process() override;
private:
int resolve_param(ObTableLoadClientTaskParam &param);
};
// commit

View File

@ -28,7 +28,8 @@ OB_SERIALIZE_MEMBER_SIMPLE(ObTableDirectLoadBeginArg,
timeout_,
heartbeat_timeout_,
force_create_,
is_async_);
is_async_,
load_method_);
OB_SERIALIZE_MEMBER_SIMPLE(ObTableDirectLoadBeginRes,
table_id_,

View File

@ -36,8 +36,15 @@ public:
is_async_(false)
{
}
TO_STRING_KV(K_(table_name), K_(parallel), K_(max_error_row_count), K_(dup_action), K_(timeout),
K_(heartbeat_timeout), K_(force_create), K_(is_async));
TO_STRING_KV(K_(table_name),
K_(parallel),
K_(max_error_row_count),
K_(dup_action),
K_(timeout),
K_(heartbeat_timeout),
K_(force_create),
K_(is_async),
K_(load_method));
public:
ObString table_name_;
int64_t parallel_;
@ -47,6 +54,7 @@ public:
int64_t heartbeat_timeout_;
bool force_create_; // unused
bool is_async_;
ObString load_method_;
};
struct ObTableDirectLoadBeginRes

View File

@ -46,7 +46,9 @@ ObTableLoadClientTaskParam::ObTableLoadClientTaskParam()
max_error_row_count_(0),
dup_action_(ObLoadDupActionType::LOAD_INVALID_MODE),
timeout_us_(0),
heartbeat_timeout_us_(0)
heartbeat_timeout_us_(0),
method_(ObDirectLoadMethod::INVALID_METHOD),
insert_mode_(ObDirectLoadInsertMode::INVALID_INSERT_MODE)
{
}
@ -64,6 +66,8 @@ void ObTableLoadClientTaskParam::reset()
dup_action_ = ObLoadDupActionType::LOAD_INVALID_MODE;
timeout_us_ = 0;
heartbeat_timeout_us_ = 0;
method_ = ObDirectLoadMethod::INVALID_METHOD;
insert_mode_ = ObDirectLoadInsertMode::INVALID_INSERT_MODE;
}
int ObTableLoadClientTaskParam::assign(const ObTableLoadClientTaskParam &other)
@ -81,6 +85,8 @@ int ObTableLoadClientTaskParam::assign(const ObTableLoadClientTaskParam &other)
dup_action_ = other.dup_action_;
timeout_us_ = other.timeout_us_;
heartbeat_timeout_us_ = other.heartbeat_timeout_us_;
method_ = other.method_;
insert_mode_ = other.insert_mode_;
}
return ret;
}
@ -90,7 +96,19 @@ bool ObTableLoadClientTaskParam::is_valid() const
return client_addr_.is_valid() && OB_INVALID_TENANT_ID != tenant_id_ &&
OB_INVALID_ID != user_id_ && OB_INVALID_ID != database_id_ && OB_INVALID_ID != table_id_ &&
parallel_ > 0 && ObLoadDupActionType::LOAD_INVALID_MODE != dup_action_ &&
timeout_us_ > 0 && heartbeat_timeout_us_ > 0;
timeout_us_ > 0 && heartbeat_timeout_us_ > 0 &&
ObDirectLoadMethod::is_type_valid(method_) &&
ObDirectLoadInsertMode::is_type_valid(insert_mode_) &&
(storage::ObDirectLoadMethod::is_full(method_)
? storage::ObDirectLoadInsertMode::is_valid_for_full_method(insert_mode_)
: true) &&
(storage::ObDirectLoadMethod::is_incremental(method_)
? storage::ObDirectLoadInsertMode::is_valid_for_incremental_method(insert_mode_)
: true) &&
(storage::ObDirectLoadInsertMode::INC_REPLACE == insert_mode_
? sql::ObLoadDupActionType::LOAD_REPLACE == dup_action_
: true);
}
/**
@ -569,8 +587,8 @@ int ObTableLoadClientTask::init_instance()
} else {
const uint64_t tenant_id = param_.get_tenant_id();
const uint64_t table_id = param_.get_table_id();
ObDirectLoadMethod::Type method = ObDirectLoadMethod::FULL;
ObDirectLoadInsertMode::Type insert_mode = ObDirectLoadInsertMode::NORMAL;
const ObDirectLoadMethod::Type method = param_.get_method();
const ObDirectLoadInsertMode::Type insert_mode = param_.get_insert_mode();
omt::ObTenant *tenant = nullptr;
ObSchemaGetterGuard schema_guard;
ObArray<uint64_t> column_ids;

View File

@ -18,6 +18,7 @@
#include "share/table/ob_table_load_define.h"
#include "share/table/ob_table_load_row_array.h"
#include "sql/session/ob_sql_session_mgr.h"
#include "storage/direct_load/ob_direct_load_struct.h"
namespace oceanbase
{
@ -52,12 +53,23 @@ public:
DEFINE_GETTER_AND_SETTER(sql::ObLoadDupActionType, dup_action);
DEFINE_GETTER_AND_SETTER(uint64_t, timeout_us);
DEFINE_GETTER_AND_SETTER(uint64_t, heartbeat_timeout_us);
DEFINE_GETTER_AND_SETTER(storage::ObDirectLoadMethod::Type, method);
DEFINE_GETTER_AND_SETTER(storage::ObDirectLoadInsertMode::Type, insert_mode);
#undef DEFINE_GETTER_AND_SETTER
TO_STRING_KV(K_(client_addr), K_(tenant_id), K_(user_id), K_(database_id), K_(table_id),
K_(parallel), K_(max_error_row_count), K_(dup_action), K_(timeout_us),
K_(heartbeat_timeout_us));
TO_STRING_KV(K_(client_addr),
K_(tenant_id),
K_(user_id),
K_(database_id),
K_(table_id),
K_(parallel),
K_(max_error_row_count),
K_(dup_action),
K_(timeout_us),
K_(heartbeat_timeout_us),
"method", storage::ObDirectLoadMethod::get_type_string(method_),
"insert_mode", storage::ObDirectLoadInsertMode::get_type_string(insert_mode_));
private:
ObAddr client_addr_;
@ -70,6 +82,8 @@ private:
sql::ObLoadDupActionType dup_action_;
int64_t timeout_us_;
int64_t heartbeat_timeout_us_;
storage::ObDirectLoadMethod::Type method_;
storage::ObDirectLoadInsertMode::Type insert_mode_;
};
class ObTableLoadClientTask