fix timestap bug

This commit is contained in:
yongshige
2023-04-17 04:44:53 +00:00
committed by ob-robot
parent 32ef62e2b0
commit 2ef15964cb
19 changed files with 218 additions and 100 deletions

View File

@ -34,12 +34,10 @@ int ObTableLoadAbortP::process()
ret = OB_SUCCESS;
}
} else {
if (OB_FAIL(ObTableLoadUtils::init_session_info(credential_.user_id_, session_info_))) {
LOG_WARN("fail to init session info", KR(ret));
} else if (OB_FAIL(ObTableLoadService::remove_ctx(table_ctx))) {
if (OB_FAIL(ObTableLoadService::remove_ctx(table_ctx))) {
LOG_WARN("fail to remove table ctx", KR(ret), K(key));
} else {
ObTableLoadCoordinator::abort_ctx(table_ctx, session_info_);
ObTableLoadCoordinator::abort_ctx(table_ctx);
}
}
if (OB_NOT_NULL(table_ctx)) {

View File

@ -30,7 +30,6 @@ private:
private:
const ObGlobalContext &gctx_;
table::ObTableApiCredential credential_;
sql::ObSQLSessionInfo session_info_;
};
class ObTableLoadAbortPeerP : public obrpc::ObRpcProcessor<obrpc::ObTableRpcProxy::ObRpc<obrpc::OB_TABLE_API_LOAD_ABORT_PEER> >

View File

@ -69,12 +69,6 @@ int ObTableLoadBeginP::process()
}
}
if (OB_SUCC(ret)) {
if (OB_FAIL(ObTableLoadUtils::init_session_info(credential_.user_id_, session_info_))) {
LOG_WARN("fail to init session info", KR(ret));
}
}
// get the existing table ctx if it exists
if (OB_SUCC(ret)) {
ObTableLoadKey key(credential_.tenant_id_, table_id);
@ -94,12 +88,17 @@ int ObTableLoadBeginP::process()
// create new table ctx if it does not exist
if (OB_SUCC(ret) && nullptr == table_ctx_) {
ObTenant *tenant = nullptr;
sql::ObSQLSessionInfo *session_info = nullptr;
sql::ObFreeSessionCtx free_session_ctx;
free_session_ctx.sessid_ = ObSQLSessionInfo::INVALID_SESSID;
if (arg_.config_.flag_.data_type_ >=
static_cast<uint64_t>(ObTableLoadDataType::MAX_DATA_TYPE)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("data type is error", KR(ret), K(arg_.config_.flag_.data_type_));
} else if (OB_FAIL(GCTX.omt_->get_tenant(credential_.tenant_id_, tenant))) {
LOG_WARN("fail to get tenant handle", KR(ret), K(credential_.tenant_id_));
} else if (OB_FAIL(ObTableLoadUtils::create_session_info(credential_.user_id_, session_info, free_session_ctx))) {
LOG_WARN("fail to init session info", KR(ret));
} else {
ObTableLoadParam param;
param.tenant_id_ = credential_.tenant_id_;
@ -121,12 +120,16 @@ int ObTableLoadBeginP::process()
LOG_WARN("fail to check support direct load", KR(ret), K(table_id));
}
// create table ctx
else if (OB_FAIL(create_table_ctx(param, idx_array_, session_info_, table_ctx_))) {
else if (OB_FAIL(create_table_ctx(param, idx_array_, *session_info, table_ctx_))) {
LOG_WARN("fail to create table ctx", KR(ret));
} else {
is_new = true;
}
}
if (session_info != nullptr) {
ObTableLoadUtils::free_session_info(session_info, free_session_ctx);
session_info = nullptr;
}
}
if (OB_SUCC(ret)) {
@ -149,7 +152,7 @@ int ObTableLoadBeginP::process()
if (OB_FAIL(ret)) {
if (nullptr != table_ctx_) {
ObTableLoadService::remove_ctx(table_ctx_);
ObTableLoadCoordinator::abort_ctx(table_ctx_, session_info_);
ObTableLoadCoordinator::abort_ctx(table_ctx_);
}
}
@ -215,7 +218,7 @@ int ObTableLoadBeginP::create_table_ctx(const ObTableLoadParam &param,
if (OB_ISNULL(table_ctx = ObTableLoadService::alloc_ctx())) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("fail to alloc table ctx", KR(ret), K(param));
} else if (OB_FAIL(table_ctx->init(param, ddl_param))) {
} else if (OB_FAIL(table_ctx->init(param, ddl_param, &session_info))) {
LOG_WARN("fail to init table ctx", KR(ret));
} else if (OB_FAIL(ObTableLoadCoordinator::init_ctx(table_ctx, idx_array_,
session_info.get_priv_user_id()))) {
@ -316,7 +319,7 @@ int ObTableLoadPreBeginPeerP::create_table_ctx(const ObTableLoadParam &param,
if (OB_ISNULL(table_ctx = ObTableLoadService::alloc_ctx())) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("fail to alloc table ctx", KR(ret), K(param));
} else if (OB_FAIL(table_ctx->init(param, ddl_param))) {
} else if (OB_FAIL(table_ctx->init(param, ddl_param, arg_.session_info_))) {
LOG_WARN("fail to init table ctx", KR(ret));
} else if (OB_FAIL(ObTableLoadStore::init_ctx(table_ctx,
arg_.partition_id_array_,

View File

@ -37,7 +37,6 @@ private:
const ObGlobalContext &gctx_;
common::ObArenaAllocator allocator_;
table::ObTableApiCredential credential_;
sql::ObSQLSessionInfo session_info_;
common::ObArray<int64_t> idx_array_;
ObTableLoadTableCtx *table_ctx_;
};

View File

@ -26,8 +26,6 @@ int ObTableLoadCommitP::process()
int ret = OB_SUCCESS;
if (OB_FAIL(check_user_access(arg_.credential_))) {
LOG_WARN("fail to check_user_access", KR(ret));
} else if (OB_FAIL(ObTableLoadUtils::init_session_info(credential_.user_id_, session_info_))) {
LOG_WARN("fail to init session info", KR(ret));
} else {
ObTableLoadTableCtx *table_ctx = nullptr;
ObExecContext *exec_ctx = nullptr;
@ -38,7 +36,7 @@ int ObTableLoadCommitP::process()
ObTableLoadCoordinator coordinator(table_ctx);
if (OB_FAIL(coordinator.init())) {
LOG_WARN("fail to init coordinator", KR(ret));
} else if (OB_FAIL(coordinator.commit(exec_ctx, session_info_, result_.result_info_))) {
} else if (OB_FAIL(coordinator.commit(exec_ctx, result_.result_info_))) {
LOG_WARN("fail to coordinator commit", KR(ret));
} else if (OB_FAIL(ObTableLoadService::remove_ctx(table_ctx))) {
LOG_WARN("fail to remove table ctx", KR(ret), K(key));

View File

@ -30,7 +30,6 @@ private:
private:
const ObGlobalContext &gctx_;
table::ObTableApiCredential credential_;
sql::ObSQLSessionInfo session_info_;
};
class ObTableLoadCommitPeerP : public obrpc::ObRpcProcessor<obrpc::ObTableRpcProxy::ObRpc<obrpc::OB_TABLE_API_LOAD_COMMIT_PEER> >

View File

@ -76,7 +76,7 @@ int ObTableLoadCoordinator::init_ctx(ObTableLoadTableCtx *ctx, const ObIArray<in
return ret;
}
void ObTableLoadCoordinator::abort_ctx(ObTableLoadTableCtx *ctx, ObSQLSessionInfo &session_info)
void ObTableLoadCoordinator::abort_ctx(ObTableLoadTableCtx *ctx)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(nullptr == ctx || !ctx->is_valid())) {
@ -100,7 +100,7 @@ void ObTableLoadCoordinator::abort_ctx(ObTableLoadTableCtx *ctx, ObSQLSessionInf
LOG_WARN("fail to abort peers ctx", KR(ret));
}
// 4. abort redef table, release table lock
if (OB_FAIL(abort_redef_table(ctx, session_info))) {
if (OB_FAIL(abort_redef_table(ctx))) {
LOG_WARN("fail to abort redef table", KR(ret));
}
}
@ -161,13 +161,13 @@ int ObTableLoadCoordinator::abort_peers_ctx(ObTableLoadTableCtx *ctx)
return ret;
}
int ObTableLoadCoordinator::abort_redef_table(ObTableLoadTableCtx *ctx, ObSQLSessionInfo &session_info)
int ObTableLoadCoordinator::abort_redef_table(ObTableLoadTableCtx *ctx)
{
int ret = OB_SUCCESS;
ObTableLoadRedefTableAbortArg arg;
arg.tenant_id_ = ctx->param_.tenant_id_;
arg.task_id_ = ctx->ddl_param_.task_id_;
if (OB_FAIL(ObTableLoadRedefTable::abort(arg, session_info))) {
if (OB_FAIL(ObTableLoadRedefTable::abort(arg, *ctx->session_info_))) {
LOG_WARN("fail to abort redef table", KR(ret), K(arg));
}
return ret;
@ -230,6 +230,7 @@ int ObTableLoadCoordinator::pre_begin_peers()
request.schema_version_ = ctx_->ddl_param_.schema_version_;
request.snapshot_version_ = ctx_->ddl_param_.snapshot_version_;
request.data_version_ = ctx_->ddl_param_.data_version_;
request.session_info_ = ctx_->session_info_;
for (int64_t i = 0; OB_SUCC(ret) && i < all_leader_info_array.count(); ++i) {
const ObTableLoadPartitionLocation::LeaderInfo &leader_info = all_leader_info_array.at(i);
const ObTableLoadPartitionLocation::LeaderInfo &target_leader_info = target_all_leader_info_array.at(i);
@ -630,7 +631,7 @@ int ObTableLoadCoordinator::commit_peers(ObTableLoadSqlStatistics &sql_statistic
return ret;
}
int ObTableLoadCoordinator::commit_redef_table(ObSQLSessionInfo &session_info)
int ObTableLoadCoordinator::commit_redef_table()
{
int ret = OB_SUCCESS;
ObTableLoadRedefTableFinishArg arg;
@ -639,7 +640,7 @@ int ObTableLoadCoordinator::commit_redef_table(ObSQLSessionInfo &session_info)
arg.dest_table_id_ = ctx_->ddl_param_.dest_table_id_;
arg.task_id_ = ctx_->ddl_param_.task_id_;
arg.schema_version_ = ctx_->ddl_param_.schema_version_;
if (OB_FAIL(ObTableLoadRedefTable::finish(arg, session_info))) {
if (OB_FAIL(ObTableLoadRedefTable::finish(arg, *ctx_->session_info_))) {
LOG_WARN("fail to finish redef table", KR(ret), K(arg));
}
return ret;
@ -647,7 +648,7 @@ int ObTableLoadCoordinator::commit_redef_table(ObSQLSessionInfo &session_info)
// commit() = px_commit_data() + px_commit_ddl()
// used in non px_mode
int ObTableLoadCoordinator::commit(ObExecContext *exec_ctx, ObSQLSessionInfo &session_info,
int ObTableLoadCoordinator::commit(ObExecContext *exec_ctx,
ObTableLoadResultInfo &result_info)
{
int ret = OB_SUCCESS;
@ -665,7 +666,7 @@ int ObTableLoadCoordinator::commit(ObExecContext *exec_ctx, ObSQLSessionInfo &se
} else if (param_.online_opt_stat_gather_ &&
OB_FAIL(drive_sql_stat(*exec_ctx, sql_statistics))) {
LOG_WARN("fail to drive sql stat", KR(ret));
} else if (OB_FAIL(commit_redef_table(session_info))) {
} else if (OB_FAIL(commit_redef_table())) {
LOG_WARN("fail to commit redef table", KR(ret));
} else if (OB_FAIL(coordinator_ctx_->set_status_commit_unlock())) {
LOG_WARN("fail to set coordinator status commit", KR(ret));
@ -701,7 +702,7 @@ int ObTableLoadCoordinator::px_commit_data(ObExecContext *exec_ctx)
}
// commit ddl procedure
int ObTableLoadCoordinator::px_commit_ddl(ObSQLSessionInfo &session_info)
int ObTableLoadCoordinator::px_commit_ddl()
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
@ -712,7 +713,7 @@ int ObTableLoadCoordinator::px_commit_ddl(ObSQLSessionInfo &session_info)
obsys::ObWLockGuard guard(coordinator_ctx_->get_status_lock());
if (OB_FAIL(coordinator_ctx_->check_status_unlock(ObTableLoadStatusType::MERGED))) {
LOG_WARN("fail to check coordinator status", KR(ret));
} else if (OB_FAIL(commit_redef_table(session_info))) {
} else if (OB_FAIL(commit_redef_table())) {
LOG_WARN("fail to commit redef table", KR(ret));
} else if (OB_FAIL(coordinator_ctx_->set_status_commit_unlock())) {
LOG_WARN("fail to set coordinator status commit", KR(ret));

View File

@ -27,22 +27,21 @@ public:
static bool is_ctx_inited(ObTableLoadTableCtx *ctx);
static int init_ctx(ObTableLoadTableCtx *ctx, const common::ObIArray<int64_t> &idx_array,
uint64_t user_id);
static void abort_ctx(ObTableLoadTableCtx *ctx, sql::ObSQLSessionInfo &session_info);
static void abort_ctx(ObTableLoadTableCtx *ctx);
int init();
bool is_valid() const { return is_inited_; }
private:
static int abort_active_trans(ObTableLoadTableCtx *ctx);
static int abort_peers_ctx(ObTableLoadTableCtx *ctx);
static int abort_redef_table(ObTableLoadTableCtx *ctx, sql::ObSQLSessionInfo &session_info);
static int abort_redef_table(ObTableLoadTableCtx *ctx);
// table load ctrl interface
public:
int begin();
int finish();
int commit(sql::ObExecContext *exec_ctx, sql::ObSQLSessionInfo &session_info,
table::ObTableLoadResultInfo &result_info);
int commit(sql::ObExecContext *exec_ctx, table::ObTableLoadResultInfo &result_info);
int px_commit_data(sql::ObExecContext *exec_ctx);
int px_commit_ddl(sql::ObSQLSessionInfo &session_info);
int px_commit_ddl();
int get_status(table::ObTableLoadStatusType &status, int &error_code);
private:
int pre_begin_peers();
@ -50,7 +49,7 @@ private:
int pre_merge_peers();
int start_merge_peers();
int commit_peers(table::ObTableLoadSqlStatistics &sql_statistics);
int commit_redef_table(sql::ObSQLSessionInfo &session_info);
int commit_redef_table();
int drive_sql_stat(sql::ObExecContext &ctx, table::ObTableLoadSqlStatistics &sql_statistics);
private:
int add_check_merge_result_task();

View File

@ -24,7 +24,6 @@ using namespace table;
ObTableLoadInstance::ObTableLoadInstance()
: execute_ctx_(nullptr),
allocator_(nullptr),
session_info_(nullptr),
table_ctx_(nullptr),
job_stat_(nullptr),
is_committed_(false),
@ -42,7 +41,7 @@ void ObTableLoadInstance::destroy()
if (OB_FAIL(ObTableLoadService::remove_ctx(table_ctx_))) {
LOG_WARN("table ctx may remove by service", KR(ret), KP(table_ctx_));
} else if (!is_committed_) {
ObTableLoadCoordinator::abort_ctx(table_ctx_, *session_info_);
ObTableLoadCoordinator::abort_ctx(table_ctx_);
}
ObTableLoadService::put_ctx(table_ctx_);
table_ctx_ = nullptr;
@ -63,7 +62,6 @@ int ObTableLoadInstance::init(ObTableLoadParam &param, const ObIArray<int64_t> &
} else {
execute_ctx_ = execute_ctx;
allocator_ = execute_ctx->allocator_;
session_info_ = execute_ctx_->exec_ctx_->get_my_session();
if (OB_FAIL(param.normalize())) {
LOG_WARN("fail to normalize param", KR(ret));
}
@ -72,7 +70,7 @@ int ObTableLoadInstance::init(ObTableLoadParam &param, const ObIArray<int64_t> &
LOG_WARN("fail to check support direct load", KR(ret), K(param.table_id_));
}
// create table ctx
else if (OB_FAIL(create_table_ctx(param, idx_array))) {
else if (OB_FAIL(create_table_ctx(param, idx_array, execute_ctx_->exec_ctx_->get_my_session()))) {
LOG_WARN("fail to create table ctx", KR(ret));
}
// begin
@ -93,7 +91,8 @@ int ObTableLoadInstance::init(ObTableLoadParam &param, const ObIArray<int64_t> &
}
int ObTableLoadInstance::create_table_ctx(ObTableLoadParam &param,
const ObIArray<int64_t> &idx_array)
const ObIArray<int64_t> &idx_array,
sql::ObSQLSessionInfo *session_info)
{
int ret = OB_SUCCESS;
ObTableLoadTableCtx *table_ctx = nullptr;
@ -108,7 +107,7 @@ int ObTableLoadInstance::create_table_ctx(ObTableLoadParam &param,
start_arg.is_load_data_ = !param.px_mode_;
if (OB_FAIL(GET_MIN_DATA_VERSION(param.tenant_id_, data_version))) {
LOG_WARN("fail to get tenant data version", KR(ret));
} else if (OB_FAIL(ObTableLoadRedefTable::start(start_arg, start_res, *session_info_))) {
} else if (OB_FAIL(ObTableLoadRedefTable::start(start_arg, start_res, *session_info))) {
LOG_WARN("fail to start redef table", KR(ret), K(start_arg));
} else {
ddl_param.dest_table_id_ = start_res.dest_table_id_;
@ -121,10 +120,10 @@ int ObTableLoadInstance::create_table_ctx(ObTableLoadParam &param,
if (OB_ISNULL(table_ctx = ObTableLoadService::alloc_ctx())) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("fail to alloc table ctx", KR(ret), K(param));
} else if (OB_FAIL(table_ctx->init(param, ddl_param))) {
} else if (OB_FAIL(table_ctx->init(param, ddl_param, session_info))) {
LOG_WARN("fail to init table ctx", KR(ret));
} else if (OB_FAIL(ObTableLoadCoordinator::init_ctx(table_ctx, idx_array,
session_info_->get_priv_user_id()))) {
session_info->get_priv_user_id()))) {
LOG_WARN("fail to coordinator init ctx", KR(ret));
} else if (OB_FAIL(ObTableLoadService::add_ctx(table_ctx))) {
LOG_WARN("fail to add ctx", KR(ret));
@ -138,7 +137,7 @@ int ObTableLoadInstance::create_table_ctx(ObTableLoadParam &param,
ObTableLoadRedefTableAbortArg abort_arg;
abort_arg.tenant_id_ = param.tenant_id_;
abort_arg.task_id_ = ddl_param.task_id_;
if (OB_TMP_FAIL(ObTableLoadRedefTable::abort(abort_arg, *session_info_))) {
if (OB_TMP_FAIL(ObTableLoadRedefTable::abort(abort_arg, *session_info))) {
LOG_WARN("fail to abort redef table", KR(tmp_ret), K(abort_arg));
}
}
@ -304,7 +303,7 @@ int ObTableLoadInstance::commit(ObTableLoadResultInfo &result_info)
LOG_WARN("fail to check merged", KR(ret));
}
// commit
else if (OB_FAIL(coordinator.commit(execute_ctx_->exec_ctx_, *session_info_, result_info))) {
else if (OB_FAIL(coordinator.commit(execute_ctx_->exec_ctx_, result_info))) {
LOG_WARN("fail to commit", KR(ret));
} else {
is_committed_ = true;
@ -351,7 +350,7 @@ int ObTableLoadInstance::px_commit_ddl()
ObTableLoadCoordinator coordinator(table_ctx_);
if (OB_FAIL(coordinator.init())) {
LOG_WARN("fail to init coordinator", KR(ret));
} else if (OB_FAIL(coordinator.px_commit_ddl(*session_info_))) {
} else if (OB_FAIL(coordinator.px_commit_ddl())) {
LOG_WARN("fail to do px_commit_ddl", KR(ret));
} else {
is_committed_ = true;

View File

@ -39,7 +39,8 @@ public:
ATOMIC_AAF(&job_stat_->parsed_bytes_, parsed_bytes);
}
private:
int create_table_ctx(ObTableLoadParam &param, const common::ObIArray<int64_t> &idx_array);
int create_table_ctx(ObTableLoadParam &param, const common::ObIArray<int64_t> &idx_array,
sql::ObSQLSessionInfo *session_info);
int begin();
int start_trans();
int check_trans_committed();
@ -61,7 +62,6 @@ private:
static const int64_t DEFAULT_SEGMENT_ID = 1;
ObTableLoadExecCtx *execute_ctx_;
common::ObIAllocator *allocator_;
sql::ObSQLSessionInfo *session_info_;
ObTableLoadTableCtx *table_ctx_;
sql::ObLoadDataStat *job_stat_;
TransCtx trans_ctx_;

View File

@ -335,24 +335,17 @@ void ObTableLoadService::abort_all_ctx()
if (OB_FAIL(manager_.remove_all_table_ctx(table_ctx_array))) {
LOG_WARN("fail to remove all table ctx list", KR(ret));
} else {
SMART_VAR(sql::ObSQLSessionInfo, session_info)
{
if (OB_FAIL(ObTableLoadUtils::init_session_info(OB_SERVER_USER_ID, session_info))) {
LOG_WARN("fail to init session info", KR(ret));
} else {
for (int i = 0; i < table_ctx_array.count(); ++i) {
ObTableLoadTableCtx *table_ctx = table_ctx_array.at(i);
// abort coordinator
if (nullptr != table_ctx->coordinator_ctx_) {
ObTableLoadCoordinator::abort_ctx(table_ctx, session_info);
}
// abort store
else if (nullptr != table_ctx->store_ctx_) {
ObTableLoadStore::abort_ctx(table_ctx);
}
manager_.put_table_ctx(table_ctx);
}
for (int i = 0; i < table_ctx_array.count(); ++i) {
ObTableLoadTableCtx *table_ctx = table_ctx_array.at(i);
// abort coordinator
if (nullptr != table_ctx->coordinator_ctx_) {
ObTableLoadCoordinator::abort_ctx(table_ctx);
}
// abort store
else if (nullptr != table_ctx->store_ctx_) {
ObTableLoadStore::abort_ctx(table_ctx);
}
manager_.put_table_ctx(table_ctx);
}
}
}

View File

@ -27,11 +27,13 @@ ObTableLoadTableCtx::ObTableLoadTableCtx()
: coordinator_ctx_(nullptr),
store_ctx_(nullptr),
job_stat_(nullptr),
session_info_(nullptr),
allocator_("TLD_TableCtx"),
ref_count_(0),
is_dirty_(false),
is_inited_(false)
{
free_session_ctx_.sessid_ = sql::ObSQLSessionInfo::INVALID_SESSID;
}
ObTableLoadTableCtx::~ObTableLoadTableCtx()
@ -39,13 +41,14 @@ ObTableLoadTableCtx::~ObTableLoadTableCtx()
destroy();
}
int ObTableLoadTableCtx::init(const ObTableLoadParam &param, const ObTableLoadDDLParam &ddl_param)
int ObTableLoadTableCtx::init(const ObTableLoadParam &param, const ObTableLoadDDLParam &ddl_param,
sql::ObSQLSessionInfo *session_info)
{
int ret = OB_SUCCESS;
if (IS_INIT) {
ret = OB_INIT_TWICE;
LOG_WARN("ObTableLoadTableCtx init twice", KR(ret));
} else if (OB_UNLIKELY(!param.is_valid() || !ddl_param.is_valid())) {
} else if (OB_UNLIKELY(!param.is_valid() || !ddl_param.is_valid() || nullptr == session_info)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid args", KR(ret), K(param), K(ddl_param));
} else {
@ -66,6 +69,10 @@ int ObTableLoadTableCtx::init(const ObTableLoadParam &param, const ObTableLoadDD
LOG_WARN("fail to init allocator", KR(ret));
} else if (OB_FAIL(register_job_stat())) {
LOG_WARN("fail to register job stat", KR(ret));
} else if (OB_FAIL(ObTableLoadUtils::create_session_info(session_info_, free_session_ctx_))) {
LOG_WARN("fail to create session info", KR(ret));
} else if (OB_FAIL(ObTableLoadUtils::deep_copy(session_info, session_info_, allocator_))) {
LOG_WARN("fail to deep copy", KR(ret));
} else {
is_inited_ = true;
}
@ -243,6 +250,10 @@ void ObTableLoadTableCtx::destroy()
allocator_.free(store_ctx_);
store_ctx_ = nullptr;
}
if (nullptr != session_info_) {
observer::ObTableLoadUtils::free_session_info(session_info_, free_session_ctx_);
session_info_ = nullptr;
}
unregister_job_stat();
is_inited_ = false;
}

View File

@ -12,6 +12,7 @@
#include "sql/engine/cmd/ob_load_data_utils.h"
#include "observer/table_load/ob_table_load_object_allocator.h"
#include "sql/session/ob_sql_session_info.h"
#include "sql/session/ob_sql_session_mgr.h"
namespace oceanbase
{
@ -28,7 +29,7 @@ class ObTableLoadTableCtx : public common::ObDLinkBase<ObTableLoadTableCtx>
public:
ObTableLoadTableCtx();
~ObTableLoadTableCtx();
int init(const ObTableLoadParam &param, const ObTableLoadDDLParam &ddl_param);
int init(const ObTableLoadParam &param, const ObTableLoadDDLParam &ddl_param, sql::ObSQLSessionInfo *session_info);
void stop();
void destroy();
bool is_valid() const { return is_inited_; }
@ -52,7 +53,6 @@ public:
private:
int register_job_stat();
void unregister_job_stat();
public:
ObTableLoadParam param_;
ObTableLoadDDLParam ddl_param_;
@ -61,6 +61,8 @@ public:
ObTableLoadStoreCtx *store_ctx_; // 只在数据节点构造
sql::ObLoadDataGID gid_;
sql::ObLoadDataStat *job_stat_;
sql::ObSQLSessionInfo *session_info_;
sql::ObFreeSessionCtx free_session_ctx_;
private:
// 只在初始化的时候使用, 线程不安全
common::ObArenaAllocator allocator_;

View File

@ -135,9 +135,7 @@ int ObTableLoadTransStoreWriter::init()
} else {
table_data_desc_ = &store_ctx_->table_data_desc_;
collation_type_ = trans_ctx_->ctx_->schema_.collation_type_;
if (OB_FAIL(OTTZ_MGR.get_tenant_tz(param_.tenant_id_, tz_info_.get_tz_map_wrap()))) {
LOG_WARN("fail to get tenant time zone", KR(ret), K(param_.tenant_id_));
} else if (OB_FAIL(init_session_ctx_array())) {
if (OB_FAIL(init_session_ctx_array())) {
LOG_WARN("fail to init session ctx array", KR(ret));
} else if (OB_FAIL(init_column_schemas())) {
LOG_WARN("fail to init column schemas", KR(ret));
@ -172,7 +170,7 @@ int ObTableLoadTransStoreWriter::init_session_ctx_array()
{
int ret = OB_SUCCESS;
void *buf = nullptr;
ObDataTypeCastParams cast_params(&tz_info_);
ObDataTypeCastParams cast_params(trans_ctx_->ctx_->session_info_->get_timezone_info());
if (OB_ISNULL(buf = allocator_.alloc(sizeof(SessionContext) * param_.session_count_))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("fail to allocate memory", KR(ret));

View File

@ -92,7 +92,6 @@ private:
common::ObArenaAllocator allocator_;
storage::ObDirectLoadTableDataDesc *table_data_desc_;
common::ObCollationType collation_type_;
common::ObTimeZoneInfo tz_info_;
ObTableLoadTimeConverter time_cvrt_;
// does not contain hidden primary key columns
// and does not contain virtual generated columns

View File

@ -180,23 +180,89 @@ int ObTableLoadUtils::deep_copy(const ObDatumRange &src, ObDatumRange &dest, ObI
return ret;
}
int ObTableLoadUtils::deep_copy(const sql::ObSQLSessionInfo &src, sql::ObSQLSessionInfo &dest, ObIAllocator &allocator)
{
int ret = OB_SUCCESS;
char *buf = nullptr;
int64_t buf_size = src.get_serialize_size();
int64_t data_len = 0;
int64_t pos = 0;
if (OB_ISNULL(buf = static_cast<char *>(allocator.alloc(buf_size)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("fail to allocate buffer", KR(ret), K(buf_size));
} else if (OB_FAIL(src.serialize(buf, buf_size, pos))) {
LOG_WARN("serialize session info failed", KR(ret));
} else {
data_len = pos;
pos = 0;
}
if (OB_SUCC(ret)) {
if (OB_FAIL(dest.deserialize(buf, data_len, pos))) {
LOG_WARN("deserialize session info failed", KR(ret));
}
}
return ret;
}
bool ObTableLoadUtils::is_local_addr(const ObAddr &addr)
{
return (ObServer::get_instance().get_self() == addr);
}
int ObTableLoadUtils::init_session_info(uint64_t user_id, ObSQLSessionInfo &session_info)
int ObTableLoadUtils::create_session_info(uint64_t user_id, sql::ObSQLSessionInfo *&session_info, sql::ObFreeSessionCtx &free_session_ctx)
{
int ret = OB_SUCCESS;
if (OB_FAIL(session_info.init(0, 0, nullptr, nullptr, ObTimeUtility::current_time(), MTL_ID()))) {
LOG_WARN("fail to init session info", KR(ret));
if (OB_FAIL(create_session_info(session_info, free_session_ctx))) {
LOG_WARN("create session id failed", KR(ret));
} else if (session_info != nullptr){
OZ(session_info->load_default_sys_variable(false, false)); //加载默认的session参数
OZ(session_info->load_default_configs_in_pc());
OX(session_info->set_priv_user_id(user_id));
}
if (OB_FAIL(ret)) {
if (session_info != nullptr) {
observer::ObTableLoadUtils::free_session_info(session_info, free_session_ctx);
session_info = nullptr;
}
}
OZ(session_info.load_default_sys_variable(false, false)); //加载默认的session参数
OZ(session_info.load_default_configs_in_pc());
OX(session_info.set_priv_user_id(user_id));
return ret;
}
int ObTableLoadUtils::create_session_info(sql::ObSQLSessionInfo *&session_info, sql::ObFreeSessionCtx &free_session_ctx)
{
int ret = OB_SUCCESS;
const uint64_t tenant_id = MTL_ID();
uint32_t sid = sql::ObSQLSessionInfo::INVALID_SESSID;
uint64_t proxy_sid = 0;
if (OB_FAIL(GCTX.session_mgr_->create_sessid(sid))) {
LOG_WARN("alloc session id failed", KR(ret));
} else if (OB_FAIL(GCTX.session_mgr_->create_session(
tenant_id, sid, proxy_sid, ObTimeUtility::current_time(), session_info))) {
GCTX.session_mgr_->mark_sessid_unused(sid);
session_info = nullptr;
LOG_WARN("create session failed", KR(ret), K(sid));
} else {
free_session_ctx.sessid_ = sid;
free_session_ctx.proxy_sessid_ = proxy_sid;
}
return ret;
}
void ObTableLoadUtils::free_session_info(sql::ObSQLSessionInfo *session_info, const sql::ObFreeSessionCtx &free_session_ctx)
{
int ret = OB_SUCCESS;
if (session_info == nullptr || free_session_ctx.sessid_ == sql::ObSQLSessionInfo::INVALID_SESSID) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid args", KR(ret), KP(session_info), K(free_session_ctx));
} else {
session_info->set_session_sleep();
GCTX.session_mgr_->revert_session(session_info);
GCTX.session_mgr_->free_session(free_session_ctx);
GCTX.session_mgr_->mark_sessid_unused(free_session_ctx.sessid_);
session_info = nullptr;
}
}
int ObTableLoadUtils::generate_credential(uint64_t tenant_id, uint64_t user_id,
uint64_t database_id, int64_t expire_ts,
uint64_t user_token, ObIAllocator &allocator,

View File

@ -6,6 +6,7 @@
#include "lib/container/ob_iarray.h"
#include "share/table/ob_table_load_array.h"
#include "sql/session/ob_sql_session_mgr.h"
namespace oceanbase
{
@ -51,6 +52,7 @@ public:
static int deep_copy(const blocksstable::ObStorageDatum &src, blocksstable::ObStorageDatum &dest, common::ObIAllocator &allocator);
static int deep_copy(const blocksstable::ObDatumRowkey &src, blocksstable::ObDatumRowkey &dest, common::ObIAllocator &allocator);
static int deep_copy(const blocksstable::ObDatumRange &src, blocksstable::ObDatumRange &dest, common::ObIAllocator &allocator);
static int deep_copy(const sql::ObSQLSessionInfo &src, sql::ObSQLSessionInfo &dest, common::ObIAllocator &allocator);
template<class T>
static int deep_copy(const table::ObTableLoadArray<T> &src, table::ObTableLoadArray<T> &dest, common::ObIAllocator &allocator);
@ -60,10 +62,10 @@ public:
template<class T>
static int deep_copy(const common::ObIArray<T> &src, table::ObTableLoadArray<T> &dest, common::ObIAllocator &allocator);
static bool is_local_addr(const common::ObAddr &addr);
static int init_session_info(uint64_t user_id, sql::ObSQLSessionInfo &session_info);
static int create_session_info(uint64_t user_id, sql::ObSQLSessionInfo *&session_info, sql::ObFreeSessionCtx &free_session_ctx);
static int create_session_info(sql::ObSQLSessionInfo *&session_info, sql::ObFreeSessionCtx &free_session_ctx);
static void free_session_info(sql::ObSQLSessionInfo *session_info, const sql::ObFreeSessionCtx &free_session_ctx);
static const int64_t CREDENTIAL_BUF_SIZE = 256;
static int generate_credential(uint64_t tenant_id, uint64_t user_id, uint64_t database_id,
int64_t expire_ts, uint64_t user_token,

View File

@ -5,6 +5,7 @@
#define USING_LOG_PREFIX CLIENT
#include "ob_table_load_rpc_struct.h"
#include "observer/table_load/ob_table_load_utils.h"
namespace oceanbase
{
@ -30,21 +31,57 @@ OB_SERIALIZE_MEMBER(ObTableLoadBeginResult,
status_,
error_code_);
OB_SERIALIZE_MEMBER(ObTableLoadPreBeginPeerRequest,
credential_,
table_id_,
config_,
column_count_,
dup_action_,
px_mode_,
online_opt_stat_gather_,
dest_table_id_,
task_id_,
schema_version_,
snapshot_version_,
data_version_,
partition_id_array_,
target_partition_id_array_);
OB_DEF_SERIALIZE(ObTableLoadPreBeginPeerRequest)
{
int ret = OB_SUCCESS;
LST_DO_CODE(OB_UNIS_ENCODE, credential_, table_id_, config_, column_count_, dup_action_, px_mode_,
online_opt_stat_gather_, snapshot_version_, dest_table_id_, task_id_, schema_version_,
snapshot_version_, data_version_, partition_id_array_, target_partition_id_array_);
if (OB_SUCC(ret)) {
if (OB_ISNULL(session_info_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("session info is null", K(ret));
} else {
OB_UNIS_ENCODE(*session_info_);
}
}
return ret;
}
OB_DEF_DESERIALIZE(ObTableLoadPreBeginPeerRequest)
{
int ret = OB_SUCCESS;
LST_DO_CODE(OB_UNIS_DECODE, credential_, table_id_, config_, column_count_, dup_action_, px_mode_,
online_opt_stat_gather_, snapshot_version_, dest_table_id_, task_id_, schema_version_,
snapshot_version_, data_version_, partition_id_array_, target_partition_id_array_);
if (OB_SUCC(ret)) {
if (OB_FAIL(observer::ObTableLoadUtils::create_session_info(session_info_, free_session_ctx_))) {
LOG_WARN("fail to init session info", KR(ret));
} else {
OB_UNIS_DECODE(*session_info_);
}
}
return ret;
}
OB_DEF_SERIALIZE_SIZE(ObTableLoadPreBeginPeerRequest)
{
int ret = OB_SUCCESS;
int64_t len = 0;
LST_DO_CODE(OB_UNIS_ADD_LEN, credential_, table_id_, config_, column_count_, dup_action_,
px_mode_, online_opt_stat_gather_, snapshot_version_, dest_table_id_, task_id_,
schema_version_, snapshot_version_, data_version_, partition_id_array_,
target_partition_id_array_);
if (OB_SUCC(ret)) {
if (OB_ISNULL(session_info_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("session info is null", K(ret), K(session_info_));
} else {
OB_UNIS_ADD_LEN(*session_info_);
}
}
return len;
}
OB_SERIALIZE_MEMBER(ObTableLoadPreBeginPeerResult,
ret_code_);

View File

@ -10,6 +10,8 @@
#include "ob_table_load_define.h"
#include "share/table/ob_table_load_row_array.h"
#include "sql/resolver/cmd/ob_load_data_stmt.h"
#include "sql/session/ob_sql_session_info.h"
#include "observer/table_load/ob_table_load_utils.h"
namespace oceanbase
{
@ -68,8 +70,19 @@ public:
task_id_(0),
schema_version_(0),
snapshot_version_(0),
data_version_(0)
data_version_(0),
session_info_(nullptr)
{
free_session_ctx_.sessid_ = sql::ObSQLSessionInfo::INVALID_SESSID;
}
~ObTableLoadPreBeginPeerRequest()
{
if (nullptr != session_info_) {
if (free_session_ctx_.sessid_ != sql::ObSQLSessionInfo::INVALID_SESSID) {
observer::ObTableLoadUtils::free_session_info(session_info_, free_session_ctx_);
}
session_info_ = nullptr;
}
}
TO_STRING_KV(K_(table_id),
K_(config),
@ -101,6 +114,8 @@ public:
// partition info
ObTableLoadArray<ObTableLoadLSIdAndPartitionId> partition_id_array_;//orig table
ObTableLoadArray<ObTableLoadLSIdAndPartitionId> target_partition_id_array_;//FIXME: target table
sql::ObSQLSessionInfo *session_info_;
sql::ObFreeSessionCtx free_session_ctx_;
};
class ObTableLoadPreBeginPeerResult final