Resolving memory-leak by adding px clean up

This commit is contained in:
obdev
2023-02-07 16:07:07 +08:00
committed by ob-robot
parent cde81bf4b1
commit 2a1917b23b
18 changed files with 373 additions and 178 deletions

View File

@ -616,6 +616,8 @@ int ObTableLoadCoordinator::commit_peers(ObTableLoadSqlStatistics &sql_statistic
return ret; return ret;
} }
// commit() = px_commit_data() + px_commit_ddl()
// used in non px_mode
int ObTableLoadCoordinator::commit(ObExecContext &ctx, ObTableLoadResultInfo &result_info) int ObTableLoadCoordinator::commit(ObExecContext &ctx, ObTableLoadResultInfo &result_info)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
@ -642,6 +644,46 @@ int ObTableLoadCoordinator::commit(ObExecContext &ctx, ObTableLoadResultInfo &re
return ret; return ret;
} }
// used in insert /*+ append */ into select clause
// commit data loaded
int ObTableLoadCoordinator::px_commit_data(ObExecContext &ctx)
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("ObTableLoadCoordinator not init", KR(ret), KP(this));
} else {
LOG_INFO("coordinator px_commit_data");
ObTableLoadSqlStatistics sql_statistics;
if (OB_FAIL(coordinator_ctx_->check_status(ObTableLoadStatusType::MERGED))) {
LOG_WARN("fail to check coordinator status", KR(ret));
} else if (OB_FAIL(commit_peers(sql_statistics))) {
LOG_WARN("fail to commit peers", KR(ret));
} else if (param_.online_opt_stat_gather_ && OB_FAIL(drive_sql_stat(ctx, sql_statistics))) {
LOG_WARN("fail to drive sql stat", KR(ret));
}
}
return ret;
}
// commit ddl procedure
int ObTableLoadCoordinator::px_commit_ddl()
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("ObTableLoadCoordinator not init", KR(ret), KP(this));
} else {
LOG_INFO("coordinator px_commit_ddl");
if (OB_FAIL(coordinator_ctx_->commit())) {
LOG_WARN("fail to commit coordinator", KR(ret));
} else if (OB_FAIL(coordinator_ctx_->set_status_commit())) {
LOG_WARN("fail to set coordinator status commit", KR(ret));
}
}
return ret;
}
int ObTableLoadCoordinator::drive_sql_stat(ObExecContext &ctx, ObTableLoadSqlStatistics &sql_statistics) { int ObTableLoadCoordinator::drive_sql_stat(ObExecContext &ctx, ObTableLoadSqlStatistics &sql_statistics) {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
const uint64_t tenant_id = MTL_ID(); const uint64_t tenant_id = MTL_ID();

View File

@ -40,6 +40,8 @@ public:
int begin(); int begin();
int finish(); int finish();
int commit(sql::ObExecContext &ctx, table::ObTableLoadResultInfo &result_info); int commit(sql::ObExecContext &ctx, table::ObTableLoadResultInfo &result_info);
int px_commit_data(sql::ObExecContext &ctx);
int px_commit_ddl();
int get_status(table::ObTableLoadStatusType &status, int &error_code); int get_status(table::ObTableLoadStatusType &status, int &error_code);
private: private:
int pre_begin_peers(); int pre_begin_peers();

View File

@ -176,13 +176,15 @@ int ObTableLoadInstance::check_merged()
return ret; return ret;
} }
// commit() = px_commit_data() + px_commit_ddl()
// used in non px_mode
int ObTableLoadInstance::commit(ObTableLoadResultInfo &result_info) int ObTableLoadInstance::commit(ObTableLoadResultInfo &result_info)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
if (IS_NOT_INIT) { if (IS_NOT_INIT) {
ret = OB_NOT_INIT; ret = OB_NOT_INIT;
LOG_WARN("ObTableLoadInstance not init", KR(ret), KP(this)); LOG_WARN("ObTableLoadInstance not init", KR(ret), KP(this));
} else if (!px_mode_) { } else {
// finish trans // finish trans
if (OB_FAIL(coordinator_->finish_trans(trans_id_))) { if (OB_FAIL(coordinator_->finish_trans(trans_id_))) {
LOG_WARN("fail to finish trans", KR(ret)); LOG_WARN("fail to finish trans", KR(ret));
@ -191,10 +193,8 @@ int ObTableLoadInstance::commit(ObTableLoadResultInfo &result_info)
else if (OB_FAIL(check_trans_committed())) { else if (OB_FAIL(check_trans_committed())) {
LOG_WARN("fail to check trans committed", KR(ret)); LOG_WARN("fail to check trans committed", KR(ret));
} }
}
if (OB_SUCC(ret)) {
// finish // finish
if (OB_FAIL(coordinator_->finish())) { else if (OB_FAIL(coordinator_->finish())) {
LOG_WARN("fail to finish", KR(ret)); LOG_WARN("fail to finish", KR(ret));
} }
// wait merge // wait merge
@ -205,7 +205,6 @@ int ObTableLoadInstance::commit(ObTableLoadResultInfo &result_info)
else if (OB_FAIL(coordinator_->commit(*execute_ctx_->exec_ctx_, result_info))) { else if (OB_FAIL(coordinator_->commit(*execute_ctx_->exec_ctx_, result_info))) {
LOG_WARN("fail to commit", KR(ret)); LOG_WARN("fail to commit", KR(ret));
} }
// sql statistics
else { else {
// Setting coordinator_ to NULL to mark a normal termination // Setting coordinator_ to NULL to mark a normal termination
coordinator_->~ObTableLoadCoordinator(); coordinator_->~ObTableLoadCoordinator();
@ -216,6 +215,49 @@ int ObTableLoadInstance::commit(ObTableLoadResultInfo &result_info)
return ret; return ret;
} }
// used in insert /*+ append */ into select clause
int ObTableLoadInstance::px_commit_data()
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("ObTableLoadInstance not init", KR(ret), KP(this));
} else {
// finish
if (OB_FAIL(coordinator_->finish())) {
LOG_WARN("fail to finish", KR(ret));
}
// wait merge
else if (OB_FAIL(check_merged())) {
LOG_WARN("fail to check merged", KR(ret));
}
// commit
else if (OB_FAIL(coordinator_->px_commit_data(*execute_ctx_->exec_ctx_))) {
LOG_WARN("fail to do px_commit_data", KR(ret));
}
}
return ret;
}
int ObTableLoadInstance::px_commit_ddl()
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("ObTableLoadInstance not init", KR(ret), KP(this));
} else {
if (OB_FAIL(coordinator_->px_commit_ddl())) {
LOG_WARN("fail to do px_commit_ddl", KR(ret));
} else {
// Setting coordinator_ to NULL to mark a normal termination
coordinator_->~ObTableLoadCoordinator();
allocator_->free(coordinator_);
coordinator_ = nullptr;
}
}
return ret;
}
int ObTableLoadInstance::write(int32_t session_id, int ObTableLoadInstance::write(int32_t session_id,
const table::ObTableLoadObjRowArray &obj_rows) const table::ObTableLoadObjRowArray &obj_rows)
{ {

View File

@ -28,6 +28,8 @@ public:
observer::ObTableLoadExecCtx *execute_ctx); observer::ObTableLoadExecCtx *execute_ctx);
void destroy(); void destroy();
int commit(table::ObTableLoadResultInfo &result_info); int commit(table::ObTableLoadResultInfo &result_info);
int px_commit_data();
int px_commit_ddl();
int write(int32_t session_id, const table::ObTableLoadObjRowArray &obj_rows); int write(int32_t session_id, const table::ObTableLoadObjRowArray &obj_rows);
sql::ObLoadDataStat *get_job_stat() const { return job_stat_; } sql::ObLoadDataStat *get_job_stat() const { return job_stat_; }
void update_job_stat_parsed_rows(int64_t parsed_rows) void update_job_stat_parsed_rows(int64_t parsed_rows)

View File

@ -889,7 +889,6 @@ int ObTableLoadStore::px_write(const ObTableLoadTransId &trans_id,
} else { } else {
ObTableLoadStoreTrans *trans = nullptr; ObTableLoadStoreTrans *trans = nullptr;
ObTableLoadTransStoreWriter *store_writer = nullptr; ObTableLoadTransStoreWriter *store_writer = nullptr;
ObTableLoadMutexGuard guard;
if (OB_FAIL(store_ctx_->get_trans(trans_id, trans))) { if (OB_FAIL(store_ctx_->get_trans(trans_id, trans))) {
LOG_WARN("fail to get trans", KR(ret)); LOG_WARN("fail to get trans", KR(ret));
} else if (OB_FAIL(trans->get_store_writer_for_write(store_writer))) { } else if (OB_FAIL(trans->get_store_writer_for_write(store_writer))) {
@ -916,6 +915,38 @@ int ObTableLoadStore::px_write(const ObTableLoadTransId &trans_id,
return ret; return ret;
} }
int ObTableLoadStore::px_clean_up(const ObTableLoadTransId &trans_id)
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("ObTableLoadStore not init", KR(ret), KP(this));
} else {
ObTableLoadStoreTrans *trans = nullptr;
ObTableLoadTransStoreWriter *store_writer = nullptr;
if (OB_FAIL(store_ctx_->get_trans(trans_id, trans))) {
if (OB_UNLIKELY(OB_ENTRY_NOT_EXIST != ret)) {
LOG_WARN("fail to get trans", KR(ret));
} else {
ret = OB_SUCCESS;
}
} else if (OB_FAIL(trans->get_store_writer_for_clean_up(store_writer))) {
LOG_WARN("fail to get store writer for clean up", KR(ret));
} else if (OB_FAIL(store_writer->clean_up(PX_DEFAULT_SESSION_ID))) {
LOG_WARN("fail to clean up store writer", KR(ret));
}
if (OB_NOT_NULL(trans)) {
if (OB_NOT_NULL(store_writer)) {
trans->put_store_writer(store_writer);
store_writer = nullptr;
}
store_ctx_->put_trans(trans);
trans = nullptr;
}
}
return ret;
}
int ObTableLoadStore::px_flush(ObTableLoadStoreTrans *trans) int ObTableLoadStore::px_flush(ObTableLoadStoreTrans *trans)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;

View File

@ -75,10 +75,10 @@ public:
static const int32_t PX_DEFAULT_SESSION_ID = 1; static const int32_t PX_DEFAULT_SESSION_ID = 1;
int px_start_trans(const table::ObTableLoadTransId &trans_id); int px_start_trans(const table::ObTableLoadTransId &trans_id);
int px_finish_trans(const table::ObTableLoadTransId &trans_id); int px_finish_trans(const table::ObTableLoadTransId &trans_id);
int px_abandon_trans(const table::ObTableLoadTransId &trans_id);
int px_write(const table::ObTableLoadTransId &trans_id, int px_write(const table::ObTableLoadTransId &trans_id,
const ObTabletID &tablet_id, const ObTabletID &tablet_id,
const common::ObIArray<common::ObNewRow> &row_array); const common::ObIArray<common::ObNewRow> &row_array);
int px_clean_up(const table::ObTableLoadTransId &trans_id);
private: private:
int px_flush(ObTableLoadStoreTrans *trans); int px_flush(ObTableLoadStoreTrans *trans);

View File

@ -208,7 +208,7 @@ ob_set_subtarget(ob_sql engine_cmd
engine/cmd/ob_xa_executor.cpp engine/cmd/ob_xa_executor.cpp
engine/cmd/ob_context_executor.cpp engine/cmd/ob_context_executor.cpp
engine/cmd/ob_table_direct_insert_ctx.cpp engine/cmd/ob_table_direct_insert_ctx.cpp
engine/cmd/ob_table_direct_insert_trans.cpp engine/cmd/ob_table_direct_insert_service.cpp
) )
ob_set_subtarget(ob_sql engine_dml ob_set_subtarget(ob_sql engine_dml

View File

@ -81,18 +81,31 @@ int ObTableDirectInsertCtx::init(ObExecContext *exec_ctx,
return ret; return ret;
} }
int ObTableDirectInsertCtx::finish() // commit() should be called before finish()
int ObTableDirectInsertCtx::commit()
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
table::ObTableLoadResultInfo result_info;
if (IS_NOT_INIT) { if (IS_NOT_INIT) {
ret = OB_NOT_INIT; ret = OB_NOT_INIT;
LOG_WARN("ObTableDirectInsertCtx is not init", KR(ret)); LOG_WARN("ObTableDirectInsertCtx is not init", KR(ret));
} else if (OB_FAIL(table_load_instance_->commit(result_info))) { } else if (OB_FAIL(table_load_instance_->px_commit_data())) {
LOG_WARN("failed to commit direct loader", KR(ret)); LOG_WARN("failed to do px_commit_data", KR(ret));
}
return ret;
}
// finish() should be called after commit()
int ObTableDirectInsertCtx::finish()
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("ObTableDirectInsertCtx is not init", KR(ret));
} else if (OB_FAIL(table_load_instance_->px_commit_ddl())) {
LOG_WARN("failed to do px_commit_ddl", KR(ret));
} else { } else {
table_load_instance_->destroy(); table_load_instance_->destroy();
LOG_DEBUG("succeeded to commit direct loader", K(result_info)); LOG_DEBUG("succeeded to finish direct loader");
} }
return ret; return ret;
} }

View File

@ -29,6 +29,7 @@ public:
TO_STRING_KV(K_(is_inited)); TO_STRING_KV(K_(is_inited));
public: public:
int init(sql::ObExecContext *exec_ctx, const uint64_t table_id, const int64_t parallel); int init(sql::ObExecContext *exec_ctx, const uint64_t table_id, const int64_t parallel);
int commit();
int finish(); int finish();
void destroy(); void destroy();
private: private:

View File

@ -0,0 +1,136 @@
// Copyright (c) 2022-present Oceanbase Inc. All Rights Reserved.
// Author:
// yuya.yu <yuya.yu@oceanbase.com>
#define USING_LOG_PREFIX SQL_ENG
#include "sql/engine/cmd/ob_table_direct_insert_service.h"
#include "observer/table_load/ob_table_load_table_ctx.h"
#include "observer/table_load/ob_table_load_service.h"
#include "observer/table_load/ob_table_load_store.h"
#include "sql/engine/ob_exec_context.h"
#include "sql/engine/ob_physical_plan.h"
namespace oceanbase
{
using namespace observer;
namespace sql
{
bool ObTableDirectInsertService::is_direct_insert(const ObPhysicalPlan &phy_plan)
{
return (phy_plan.get_enable_append() && (0 != phy_plan.get_append_table_id()));
}
int ObTableDirectInsertService::start_direct_insert(ObExecContext &ctx,
ObPhysicalPlan &phy_plan)
{
int ret = OB_SUCCESS;
if (!GCONF._ob_enable_direct_load) { // recheck
phy_plan.set_enable_append(false);
phy_plan.set_append_table_id(0);
} else {
ObSQLSessionInfo *session = GET_MY_SESSION(ctx);
CK (OB_NOT_NULL(session));
bool auto_commit = false;
if (OB_FAIL(session->get_autocommit(auto_commit))) {
LOG_WARN("failed to get auto commit", KR(ret));
} else if (!auto_commit || session->is_in_transaction()) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("using direct-insert within a transaction is not supported",
KR(ret), K(auto_commit), K(session->is_in_transaction()));
} else {
ObTableDirectInsertCtx &table_direct_insert_ctx = ctx.get_table_direct_insert_ctx();
uint64_t table_id = phy_plan.get_append_table_id();
int64_t parallel = phy_plan.get_px_dop();
if (OB_FAIL(table_direct_insert_ctx.init(&ctx, table_id, parallel))) {
LOG_WARN("failed to init table direct insert ctx", KR(ret), K(table_id), K(parallel));
}
}
}
return ret;
}
int ObTableDirectInsertService::commit_direct_insert(ObExecContext &ctx,
ObPhysicalPlan &phy_plan)
{
int ret = OB_SUCCESS;
ObTableDirectInsertCtx &table_direct_insert_ctx = ctx.get_table_direct_insert_ctx();
if (OB_FAIL(table_direct_insert_ctx.commit())) {
LOG_WARN("failed to commit table direct insert ctx", KR(ret));
}
return ret;
}
int ObTableDirectInsertService::finish_direct_insert(ObExecContext &ctx,
ObPhysicalPlan &phy_plan)
{
int ret = OB_SUCCESS;
ObTableDirectInsertCtx &table_direct_insert_ctx = ctx.get_table_direct_insert_ctx();
if (OB_FAIL(table_direct_insert_ctx.finish())) {
LOG_WARN("failed to finish table direct insert ctx", KR(ret));
}
return ret;
}
int ObTableDirectInsertService::open_task(const uint64_t table_id, const int64_t task_id)
{
int ret = OB_SUCCESS;
ObTableLoadTableCtx *table_ctx = nullptr;
ObTableLoadKey key(MTL_ID(), table_id);
if (OB_FAIL(ObTableLoadService::get_ctx(key, table_ctx))) {
LOG_WARN("fail to get table ctx", KR(ret), K(key), K(table_id));
} else {
table::ObTableLoadTransId trans_id;
trans_id.segment_id_ = task_id;
trans_id.trans_gid_ = 1;
ObTableLoadStore store(table_ctx);
if (OB_FAIL(store.init())) {
LOG_WARN("fail to init store", KR(ret));
} else if (OB_FAIL(store.px_start_trans(trans_id))) {
LOG_WARN("fail to start direct load trans", KR(ret), K(trans_id));
}
}
if (OB_NOT_NULL(table_ctx)) {
ObTableLoadService::put_ctx(table_ctx);
table_ctx = nullptr;
}
return ret;
}
int ObTableDirectInsertService::close_task(const uint64_t table_id,
const int64_t task_id,
const int error_code)
{
int ret = OB_SUCCESS;
ObTableLoadTableCtx *table_ctx = nullptr;
ObTableLoadKey key(MTL_ID(), table_id);
if (OB_FAIL(ObTableLoadService::get_ctx(key, table_ctx))) {
LOG_WARN("fail to get table ctx", KR(ret), K(key), K(table_id));
} else {
table::ObTableLoadTransId trans_id;
trans_id.segment_id_ = task_id;
trans_id.trans_gid_ = 1;
ObTableLoadStore store(table_ctx);
if (OB_FAIL(store.init())) {
LOG_WARN("fail to init store", KR(ret));
} else {
if (OB_SUCC(error_code)) {
if (OB_FAIL(store.px_finish_trans(trans_id))) {
LOG_WARN("fail to finish direct load trans", KR(ret), K(trans_id));
}
} else {
if (OB_FAIL(store.px_clean_up(trans_id))) {
LOG_WARN("failed to do px clean up", KR(ret));
}
}
}
}
if (OB_NOT_NULL(table_ctx)) {
ObTableLoadService::put_ctx(table_ctx);
table_ctx = nullptr;
}
return ret;
}
} // namespace sql
} // namespace oceanbase

View File

@ -0,0 +1,31 @@
// Copyright (c) 2022-present Oceanbase Inc. All Rights Reserved.
// Author:
// yuya.yu <yuya.yu@oceanbase.com>
#pragma once
#include "lib/ob_define.h"
namespace oceanbase
{
namespace sql
{
class ObExecContext;
class ObPhysicalPlan;
class ObTableDirectInsertService
{
public:
static bool is_direct_insert(const ObPhysicalPlan &phy_plan);
// all insert-tasks within an insert into select clause are wrapped by a single direct insert instance
static int start_direct_insert(ObExecContext &ctx, ObPhysicalPlan &plan);
static int commit_direct_insert(ObExecContext &ctx, ObPhysicalPlan &plan);
static int finish_direct_insert(ObExecContext &ctx, ObPhysicalPlan &plan);
// each insert-task is processed in a single thread and is wrapped by a table load trans
static int open_task(const uint64_t table_id, const int64_t task_id);
static int close_task(const uint64_t table_id,
const int64_t task_id,
const int error_code = OB_SUCCESS);
};
} // namespace sql
} // namespace oceanbase

View File

@ -1,116 +0,0 @@
// Copyright (c) 2022-present Oceanbase Inc. All Rights Reserved.
// Author:
// yuya.yu <yuya.yu@oceanbase.com>
#define USING_LOG_PREFIX SQL_ENG
#include "sql/engine/cmd/ob_table_direct_insert_trans.h"
#include "observer/table_load/ob_table_load_table_ctx.h"
#include "observer/table_load/ob_table_load_service.h"
#include "observer/table_load/ob_table_load_store.h"
#include "sql/engine/ob_exec_context.h"
#include "sql/engine/ob_physical_plan.h"
namespace oceanbase
{
using namespace observer;
namespace sql
{
int ObTableDirectInsertTrans::try_start_direct_insert(ObExecContext &ctx,
ObPhysicalPlan &phy_plan)
{
int ret = OB_SUCCESS;
if (phy_plan.get_enable_append()
&& (0 != phy_plan.get_append_table_id())) {
if (!GCONF._ob_enable_direct_load) { // recheck
phy_plan.set_enable_append(false);
phy_plan.set_append_table_id(0);
} else {
ObSQLSessionInfo *session = GET_MY_SESSION(ctx);
CK (OB_NOT_NULL(session));
bool auto_commit = false;
if (OB_FAIL(session->get_autocommit(auto_commit))) {
LOG_WARN("failed to get auto commit", KR(ret));
} else if (!auto_commit || session->is_in_transaction()) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("using direct-insert within a transaction is not supported",
KR(ret), K(auto_commit), K(session->is_in_transaction()));
} else {
ObTableDirectInsertCtx &table_direct_insert_ctx = ctx.get_table_direct_insert_ctx();
uint64_t table_id = phy_plan.get_append_table_id();
int64_t parallel = phy_plan.get_px_dop();
if (OB_FAIL(table_direct_insert_ctx.init(&ctx, table_id, parallel))) {
LOG_WARN("failed to init table direct insert ctx", KR(ret), K(table_id), K(parallel));
}
}
}
}
return ret;
}
int ObTableDirectInsertTrans::try_finish_direct_insert(ObExecContext &ctx,
ObPhysicalPlan &phy_plan)
{
int ret = OB_SUCCESS;
if (phy_plan.get_enable_append()
&& (0 != phy_plan.get_append_table_id())) {
ObTableDirectInsertCtx &table_direct_insert_ctx = ctx.get_table_direct_insert_ctx();
if (OB_FAIL(table_direct_insert_ctx.finish())) {
LOG_WARN("failed to finish table direct insert ctx", KR(ret));
}
}
return ret;
}
int ObTableDirectInsertTrans::start_trans(const uint64_t table_id, const int64_t task_id)
{
int ret = OB_SUCCESS;
ObTableLoadTableCtx *table_ctx = nullptr;
ObTableLoadKey key(MTL_ID(), table_id);
if (OB_FAIL(ObTableLoadService::get_ctx(key, table_ctx))) {
LOG_WARN("fail to get table ctx", KR(ret), K(key), K(table_id));
} else {
table::ObTableLoadTransId trans_id;
trans_id.segment_id_ = task_id;
trans_id.trans_gid_ = 1;
ObTableLoadStore store(table_ctx);
if (OB_FAIL(store.init())) {
LOG_WARN("fail to init store", KR(ret));
} else if (OB_FAIL(store.px_start_trans(trans_id))) {
LOG_WARN("fail to start direct load trans", KR(ret), K(trans_id));
}
}
if (OB_NOT_NULL(table_ctx)) {
ObTableLoadService::put_ctx(table_ctx);
table_ctx = nullptr;
}
return ret;
}
int ObTableDirectInsertTrans::finish_trans(const uint64_t table_id, const int64_t task_id)
{
int ret = OB_SUCCESS;
ObTableLoadTableCtx *table_ctx = nullptr;
ObTableLoadKey key(MTL_ID(), table_id);
if (OB_FAIL(ObTableLoadService::get_ctx(key, table_ctx))) {
LOG_WARN("fail to get table ctx", KR(ret), K(key), K(table_id));
} else {
table::ObTableLoadTransId trans_id;
trans_id.segment_id_ = task_id;
trans_id.trans_gid_ = 1;
ObTableLoadStore store(table_ctx);
if (OB_FAIL(store.init())) {
LOG_WARN("fail to init store", KR(ret));
} else if (OB_FAIL(store.px_finish_trans(trans_id))) {
LOG_WARN("fail to finish direct load trans", KR(ret), K(trans_id));
}
}
if (OB_NOT_NULL(table_ctx)) {
ObTableLoadService::put_ctx(table_ctx);
table_ctx = nullptr;
}
return ret;
}
} // namespace sql
} // namespace oceanbase

View File

@ -1,27 +0,0 @@
// Copyright (c) 2022-present Oceanbase Inc. All Rights Reserved.
// Author:
// yuya.yu <yuya.yu@oceanbase.com>
#pragma once
#include "lib/ob_define.h"
namespace oceanbase
{
namespace sql
{
class ObExecContext;
class ObPhysicalPlan;
class ObTableDirectInsertTrans
{
public:
// all insert-tasks within an insert into select clause are wrapped by a single direct insert instance
static int try_start_direct_insert(ObExecContext &ctx, ObPhysicalPlan &plan);
static int try_finish_direct_insert(ObExecContext &ctx, ObPhysicalPlan &plan);
// each insert-task is processed in a single thread and is wrapped by a table load trans
static int start_trans(const uint64_t table_id, const int64_t task_id);
static int finish_trans(const uint64_t table_id, const int64_t task_id);
};
} // namespace sql
} // namespace oceanbase

View File

@ -26,7 +26,7 @@
#include "lib/profile/ob_perf_event.h" #include "lib/profile/ob_perf_event.h"
#include "share/schema/ob_table_dml_param.h" #include "share/schema/ob_table_dml_param.h"
#include "share/ob_tablet_autoincrement_service.h" #include "share/ob_tablet_autoincrement_service.h"
#include "sql/engine/cmd/ob_table_direct_insert_trans.h" #include "sql/engine/cmd/ob_table_direct_insert_service.h"
namespace oceanbase namespace oceanbase
@ -397,10 +397,10 @@ int ObTableInsertOp::inner_open()
} }
if (OB_SUCC(ret)) { if (OB_SUCC(ret)) {
const ObPhysicalPlan *plan = GET_PHY_PLAN_CTX(ctx_)->get_phy_plan(); const ObPhysicalPlan *plan = GET_PHY_PLAN_CTX(ctx_)->get_phy_plan();
if (plan->get_enable_append() && (0 != plan->get_append_table_id())) { if (ObTableDirectInsertService::is_direct_insert(*plan)) {
int64_t task_id = 1; int64_t task_id = 1;
if (OB_FAIL(ObTableDirectInsertTrans::start_trans(plan->get_append_table_id(), task_id))) { if (OB_FAIL(ObTableDirectInsertService::open_task(plan->get_append_table_id(), task_id))) {
LOG_WARN("failed to start table direct insert trans", KR(ret), LOG_WARN("failed to open table direct insert task", KR(ret),
K(plan->get_append_table_id()), K(task_id)); K(plan->get_append_table_id()), K(task_id));
} }
} }
@ -428,10 +428,10 @@ int ObTableInsertOp::inner_close()
NG_TRACE(insert_close); NG_TRACE(insert_close);
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
const ObPhysicalPlan *plan = GET_PHY_PLAN_CTX(ctx_)->get_phy_plan(); const ObPhysicalPlan *plan = GET_PHY_PLAN_CTX(ctx_)->get_phy_plan();
if (plan->get_enable_append() && (0 != plan->get_append_table_id())) { if (ObTableDirectInsertService::is_direct_insert(*plan)) {
int64_t task_id = 1; int64_t task_id = 1;
if (OB_FAIL(ObTableDirectInsertTrans::finish_trans(plan->get_append_table_id(), task_id))) { if (OB_FAIL(ObTableDirectInsertService::close_task(plan->get_append_table_id(), task_id))) {
LOG_WARN("failed to finish table direct insert trans", KR(ret), LOG_WARN("failed to close table direct insert task", KR(ret),
K(plan->get_append_table_id()), K(task_id)); K(plan->get_append_table_id()), K(task_id));
} }
} }

View File

@ -15,7 +15,7 @@
#include "storage/access/ob_dml_param.h" #include "storage/access/ob_dml_param.h"
#include "storage/tx_storage/ob_access_service.h" #include "storage/tx_storage/ob_access_service.h"
#include "sql/engine/dml/ob_dml_service.h" #include "sql/engine/dml/ob_dml_service.h"
#include "sql/engine/cmd/ob_table_direct_insert_trans.h" #include "sql/engine/cmd/ob_table_direct_insert_service.h"
using namespace oceanbase::common; using namespace oceanbase::common;
using namespace oceanbase::sql; using namespace oceanbase::sql;
@ -50,10 +50,10 @@ int ObPxMultiPartInsertOp::inner_open()
} }
if (OB_SUCC(ret)) { if (OB_SUCC(ret)) {
const ObPhysicalPlan *plan = GET_PHY_PLAN_CTX(ctx_)->get_phy_plan(); const ObPhysicalPlan *plan = GET_PHY_PLAN_CTX(ctx_)->get_phy_plan();
if (plan->get_enable_append() && (0 != plan->get_append_table_id())) { if (ObTableDirectInsertService::is_direct_insert(*plan)) {
int64_t task_id = ctx_.get_px_task_id() + 1; int64_t task_id = ctx_.get_px_task_id() + 1;
if (OB_FAIL(ObTableDirectInsertTrans::start_trans(plan->get_append_table_id(), task_id))) { if (OB_FAIL(ObTableDirectInsertService::open_task(plan->get_append_table_id(), task_id))) {
LOG_WARN("failed to start table direct insert trans", KR(ret), LOG_WARN("failed to open table direct insert task", KR(ret),
K(plan->get_append_table_id()), K(task_id)); K(plan->get_append_table_id()), K(task_id));
} else { } else {
ins_rtdef_.das_rtdef_.direct_insert_task_id_ = task_id; ins_rtdef_.das_rtdef_.direct_insert_task_id_ = task_id;
@ -104,11 +104,14 @@ int ObPxMultiPartInsertOp::inner_close()
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
const ObPhysicalPlan *plan = GET_PHY_PLAN_CTX(ctx_)->get_phy_plan(); const ObPhysicalPlan *plan = GET_PHY_PLAN_CTX(ctx_)->get_phy_plan();
if (plan->get_enable_append() && (0 != plan->get_append_table_id())) { if (ObTableDirectInsertService::is_direct_insert(*plan)) {
int64_t task_id = ctx_.get_px_task_id() + 1; int64_t task_id = ctx_.get_px_task_id() + 1;
if (OB_FAIL(ObTableDirectInsertTrans::finish_trans(plan->get_append_table_id(), task_id))) { int error_code = (static_cast<const ObPxMultiPartInsertOpInput *>(input_))->get_error_code();
LOG_WARN("failed to finish table direct insert trans", KR(ret), if (OB_FAIL(ObTableDirectInsertService::close_task(plan->get_append_table_id(),
K(plan->get_append_table_id()), K(task_id)); task_id,
error_code))) {
LOG_WARN("failed to close table direct insert task", KR(ret),
K(plan->get_append_table_id()), K(task_id), K(error_code));
} }
} }
if (OB_FAIL(ObTableModifyOp::inner_close())) { if (OB_FAIL(ObTableModifyOp::inner_close())) {

View File

@ -26,13 +26,23 @@ class ObPxMultiPartInsertOpInput : public ObPxMultiPartModifyOpInput
OB_UNIS_VERSION_V(1); OB_UNIS_VERSION_V(1);
public: public:
ObPxMultiPartInsertOpInput(ObExecContext &ctx, const ObOpSpec &spec) ObPxMultiPartInsertOpInput(ObExecContext &ctx, const ObOpSpec &spec)
: ObPxMultiPartModifyOpInput(ctx, spec) : ObPxMultiPartModifyOpInput(ctx, spec),
error_code_(OB_SUCCESS)
{} {}
int init(ObTaskInfo &task_info) override int init(ObTaskInfo &task_info) override
{ {
return ObPxMultiPartModifyOpInput::init(task_info); return ObPxMultiPartModifyOpInput::init(task_info);
} }
void reset() override
{
ObPxMultiPartModifyOpInput::reset();
error_code_ = OB_SUCCESS;
}
inline void set_error_code(const int error_code) { error_code_ = error_code; }
inline int get_error_code() const { return error_code_; }
TO_STRING_KV(K_(error_code));
private: private:
int error_code_;
DISALLOW_COPY_AND_ASSIGN(ObPxMultiPartInsertOpInput); DISALLOW_COPY_AND_ASSIGN(ObPxMultiPartInsertOpInput);
}; };

View File

@ -32,7 +32,7 @@
#include "sql/engine/join/ob_hash_join_op.h" #include "sql/engine/join/ob_hash_join_op.h"
#include "sql/engine/window_function/ob_window_function_op.h" #include "sql/engine/window_function/ob_window_function_op.h"
#include "sql/engine/px/ob_px_basic_info.h" #include "sql/engine/px/ob_px_basic_info.h"
#include "sql/engine/pdml/static/ob_px_multi_part_modify_op.h" #include "sql/engine/pdml/static/ob_px_multi_part_insert_op.h"
#include "sql/engine/join/ob_join_filter_op.h" #include "sql/engine/join/ob_join_filter_op.h"
#include "sql/engine/px/ob_granule_pump.h" #include "sql/engine/px/ob_granule_pump.h"
#include "observer/mysql/obmp_base.h" #include "observer/mysql/obmp_base.h"
@ -706,6 +706,22 @@ int ObPxTaskProcess::OpPostparation::apply(ObExecContext &ctx, ObOpSpec &op)
LOG_TRACE("debug post apply info", K(ret_)); LOG_TRACE("debug post apply info", K(ret_));
} }
} }
} else if (PHY_PX_MULTI_PART_INSERT == op.get_type()) {
if (OB_ISNULL(kit->input_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("operator is NULL", K(ret), KP(kit));
} else {
ObPxMultiPartInsertOpInput *input = static_cast<ObPxMultiPartInsertOpInput *>(kit->input_);
if (OB_ISNULL(input)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("input not found for op", "op_id", op.id_, K(ret));
} else if (OB_SUCCESS != ret_) {
input->set_error_code(ret_);
LOG_TRACE("debug post apply info", K(ret_));
} else {
LOG_TRACE("debug post apply info", K(ret_));
}
}
} }
return ret; return ret;
} }

View File

@ -25,7 +25,7 @@
#include "sql/session/ob_sql_session_info.h" #include "sql/session/ob_sql_session_info.h"
#include "sql/resolver/ob_cmd.h" #include "sql/resolver/ob_cmd.h"
#include "sql/engine/px/ob_px_admission.h" #include "sql/engine/px/ob_px_admission.h"
#include "sql/engine/cmd/ob_table_direct_insert_trans.h" #include "sql/engine/cmd/ob_table_direct_insert_service.h"
#include "sql/executor/ob_executor.h" #include "sql/executor/ob_executor.h"
#include "sql/executor/ob_cmd_executor.h" #include "sql/executor/ob_cmd_executor.h"
#include "sql/resolver/dml/ob_select_stmt.h" #include "sql/resolver/dml/ob_select_stmt.h"
@ -209,6 +209,12 @@ int ObResultSet::open_result()
} }
} else if (OB_FAIL(drive_dml_query())) { } else if (OB_FAIL(drive_dml_query())) {
LOG_WARN("fail to drive dml query", K(ret)); LOG_WARN("fail to drive dml query", K(ret));
} else if ((stmt::T_INSERT == get_stmt_type())
&& (ObTableDirectInsertService::is_direct_insert(*physical_plan_))) {
// for insert /*+ append */ into select clause
if (OB_FAIL(ObTableDirectInsertService::commit_direct_insert(get_exec_context(), *physical_plan_))) {
LOG_WARN("fail to commit direct insert", KR(ret));
}
} }
} }
if (OB_SUCC(ret)) { if (OB_SUCC(ret)) {
@ -475,8 +481,10 @@ OB_INLINE int ObResultSet::do_open_plan(ObExecContext &ctx)
} }
// for insert /*+ append */ into select clause // for insert /*+ append */ into select clause
if (OB_SUCC(ret) && (stmt::T_INSERT == get_stmt_type())) { if (OB_SUCC(ret)
if (OB_FAIL(ObTableDirectInsertTrans::try_start_direct_insert(ctx, *physical_plan_))) { && (stmt::T_INSERT == get_stmt_type())
&& (ObTableDirectInsertService::is_direct_insert(*physical_plan_))) {
if (OB_FAIL(ObTableDirectInsertService::start_direct_insert(ctx, *physical_plan_))) {
LOG_WARN("fail to start direct insert", KR(ret)); LOG_WARN("fail to start direct insert", KR(ret));
} }
} }
@ -713,10 +721,11 @@ OB_INLINE int ObResultSet::do_close_plan(int errcode, ObExecContext &ctx)
// Finishing direct-insert must be executed after ObPxTargetMgr::release_target() // Finishing direct-insert must be executed after ObPxTargetMgr::release_target()
if ((OB_SUCCESS == close_ret) if ((OB_SUCCESS == close_ret)
&& (OB_SUCCESS == errcode || OB_ITER_END == errcode) && (OB_SUCCESS == errcode || OB_ITER_END == errcode)
&& (stmt::T_INSERT == get_stmt_type())) { && (stmt::T_INSERT == get_stmt_type())
&& (ObTableDirectInsertService::is_direct_insert(*physical_plan_))) {
// for insert /*+ append */ into select clause // for insert /*+ append */ into select clause
int tmp_ret = OB_SUCCESS; int tmp_ret = OB_SUCCESS;
if (OB_TMP_FAIL(ObTableDirectInsertTrans::try_finish_direct_insert(ctx, *physical_plan_))) { if (OB_TMP_FAIL(ObTableDirectInsertService::finish_direct_insert(ctx, *physical_plan_))) {
errcode_ = tmp_ret; // record error code errcode_ = tmp_ret; // record error code
errcode = tmp_ret; errcode = tmp_ret;
LOG_WARN("fail to finish direct insert", KR(tmp_ret)); LOG_WARN("fail to finish direct insert", KR(tmp_ret));