fix direct load memory leak when timeout
This commit is contained in:
@ -871,6 +871,7 @@ int ObTableLoadStore::px_finish_trans(const ObTableLoadTransId &trans_id)
|
|||||||
ret = OB_NOT_INIT;
|
ret = OB_NOT_INIT;
|
||||||
LOG_WARN("ObTableLoadStore not init", KR(ret), KP(this));
|
LOG_WARN("ObTableLoadStore not init", KR(ret), KP(this));
|
||||||
} else {
|
} else {
|
||||||
|
LOG_INFO("store px finish trans", K(trans_id));
|
||||||
ObTableLoadStoreTrans *trans = nullptr;
|
ObTableLoadStoreTrans *trans = nullptr;
|
||||||
if (OB_FAIL(store_ctx_->get_segment_trans(trans_id.segment_id_, trans))) {
|
if (OB_FAIL(store_ctx_->get_segment_trans(trans_id.segment_id_, trans))) {
|
||||||
LOG_WARN("fail to get segment trans", KR(ret));
|
LOG_WARN("fail to get segment trans", KR(ret));
|
||||||
@ -892,39 +893,6 @@ int ObTableLoadStore::px_finish_trans(const ObTableLoadTransId &trans_id)
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
int ObTableLoadStore::px_abandon_trans(const ObTableLoadTransId &trans_id)
|
|
||||||
{
|
|
||||||
int ret = OB_SUCCESS;
|
|
||||||
if (IS_NOT_INIT) {
|
|
||||||
ret = OB_NOT_INIT;
|
|
||||||
LOG_WARN("ObTableLoadStore not init", KR(ret), KP(this));
|
|
||||||
} else {
|
|
||||||
LOG_INFO("store px abandon trans", K(trans_id));
|
|
||||||
ObTableLoadStoreTrans *trans = nullptr;
|
|
||||||
if (OB_FAIL(store_ctx_->get_segment_trans(trans_id.segment_id_, trans))) {
|
|
||||||
if (OB_UNLIKELY(OB_ENTRY_NOT_EXIST != ret)) {
|
|
||||||
LOG_WARN("fail to get segment trans", KR(ret));
|
|
||||||
} else {
|
|
||||||
ret = OB_SUCCESS;
|
|
||||||
}
|
|
||||||
} else if (OB_UNLIKELY(trans_id != trans->get_trans_id())) {
|
|
||||||
ret = OB_ERR_UNEXPECTED;
|
|
||||||
LOG_WARN("unexpected trans id", KR(ret), K(trans_id), KPC(trans));
|
|
||||||
} else if (OB_FAIL(trans->set_trans_status_abort())) {
|
|
||||||
LOG_WARN("fail to set trans status abort", KR(ret));
|
|
||||||
} else if (OB_FAIL(store_ctx_->abort_trans(trans))) {
|
|
||||||
LOG_WARN("fail to abort trans", KR(ret));
|
|
||||||
} else if (OB_FAIL(px_clean_up_trans(trans))) {
|
|
||||||
LOG_WARN("fail to clean up trans", KR(ret));
|
|
||||||
}
|
|
||||||
if (OB_NOT_NULL(trans)) {
|
|
||||||
store_ctx_->put_trans(trans);
|
|
||||||
trans = nullptr;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return ret;
|
|
||||||
}
|
|
||||||
|
|
||||||
int ObTableLoadStore::px_write(const ObTableLoadTransId &trans_id,
|
int ObTableLoadStore::px_write(const ObTableLoadTransId &trans_id,
|
||||||
const ObTabletID &tablet_id, const ObIArray<ObNewRow> &row_array)
|
const ObTabletID &tablet_id, const ObIArray<ObNewRow> &row_array)
|
||||||
{
|
{
|
||||||
@ -964,27 +932,6 @@ int ObTableLoadStore::px_write(const ObTableLoadTransId &trans_id,
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
int ObTableLoadStore::px_clean_up_trans(ObTableLoadStoreTrans *trans)
|
|
||||||
{
|
|
||||||
int ret = OB_SUCCESS;
|
|
||||||
if (IS_NOT_INIT) {
|
|
||||||
ret = OB_NOT_INIT;
|
|
||||||
LOG_WARN("ObTableLoadStore not init", KR(ret), KP(this));
|
|
||||||
} else {
|
|
||||||
ObTableLoadTransStoreWriter *store_writer = nullptr;
|
|
||||||
if (OB_FAIL(trans->get_store_writer(store_writer))) {
|
|
||||||
LOG_WARN("fail to get store writer", KR(ret));
|
|
||||||
} else if (OB_FAIL(store_writer->clean_up(trans->get_trans_id().segment_id_.id_))) {
|
|
||||||
LOG_WARN("fail to clean up store writer", KR(ret));
|
|
||||||
}
|
|
||||||
if (OB_NOT_NULL(store_writer)) {
|
|
||||||
trans->put_store_writer(store_writer);
|
|
||||||
store_writer = nullptr;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return ret;
|
|
||||||
}
|
|
||||||
|
|
||||||
int ObTableLoadStore::px_flush(ObTableLoadStoreTrans *trans)
|
int ObTableLoadStore::px_flush(ObTableLoadStoreTrans *trans)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
@ -1012,5 +959,62 @@ int ObTableLoadStore::px_flush(ObTableLoadStoreTrans *trans)
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int ObTableLoadStore::px_abandon_trans(ObTableLoadTableCtx *ctx, const ObTableLoadTransId &trans_id)
|
||||||
|
{
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
if (OB_UNLIKELY(!ctx->is_valid())) {
|
||||||
|
ret = OB_INVALID_ARGUMENT;
|
||||||
|
LOG_WARN("invalid args", KR(ret), KPC(ctx));
|
||||||
|
} else if (OB_UNLIKELY(nullptr == ctx->store_ctx_ || !ctx->store_ctx_->is_valid())) {
|
||||||
|
// store ctx not init, do nothing
|
||||||
|
} else {
|
||||||
|
LOG_INFO("store px abandon trans", K(trans_id));
|
||||||
|
ObTableLoadStoreCtx *store_ctx = ctx->store_ctx_;
|
||||||
|
ObTableLoadStoreTrans *trans = nullptr;
|
||||||
|
if (OB_FAIL(store_ctx->get_segment_trans(trans_id.segment_id_, trans))) {
|
||||||
|
if (OB_UNLIKELY(OB_ENTRY_NOT_EXIST != ret)) {
|
||||||
|
LOG_WARN("fail to get segment trans", KR(ret));
|
||||||
|
} else {
|
||||||
|
ret = OB_SUCCESS;
|
||||||
|
}
|
||||||
|
} else if (OB_UNLIKELY(trans_id != trans->get_trans_id())) {
|
||||||
|
ret = OB_ERR_UNEXPECTED;
|
||||||
|
LOG_WARN("unexpected trans id", KR(ret), K(trans_id), KPC(trans));
|
||||||
|
} else if (OB_FAIL(trans->set_trans_status_abort())) {
|
||||||
|
LOG_WARN("fail to set trans status abort", KR(ret));
|
||||||
|
} else if (OB_FAIL(store_ctx->abort_trans(trans))) {
|
||||||
|
LOG_WARN("fail to abort trans", KR(ret));
|
||||||
|
} else if (OB_FAIL(px_clean_up_trans(trans))) {
|
||||||
|
LOG_WARN("fail to clean up trans", KR(ret));
|
||||||
|
}
|
||||||
|
if (OB_NOT_NULL(trans)) {
|
||||||
|
store_ctx->put_trans(trans);
|
||||||
|
trans = nullptr;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
int ObTableLoadStore::px_clean_up_trans(ObTableLoadStoreTrans *trans)
|
||||||
|
{
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
if (OB_ISNULL(trans)) {
|
||||||
|
ret = OB_INVALID_ARGUMENT;
|
||||||
|
LOG_WARN("invalid args", KR(ret), KP(trans));
|
||||||
|
} else {
|
||||||
|
ObTableLoadTransStoreWriter *store_writer = nullptr;
|
||||||
|
if (OB_FAIL(trans->get_store_writer(store_writer))) {
|
||||||
|
LOG_WARN("fail to get store writer", KR(ret));
|
||||||
|
} else if (OB_FAIL(store_writer->clean_up(trans->get_trans_id().segment_id_.id_))) {
|
||||||
|
LOG_WARN("fail to clean up store writer", KR(ret));
|
||||||
|
}
|
||||||
|
if (OB_NOT_NULL(store_writer)) {
|
||||||
|
trans->put_store_writer(store_writer);
|
||||||
|
store_writer = nullptr;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
} // namespace observer
|
} // namespace observer
|
||||||
} // namespace oceanbase
|
} // namespace oceanbase
|
||||||
|
|||||||
@ -73,13 +73,13 @@ private:
|
|||||||
public:
|
public:
|
||||||
int px_start_trans(const table::ObTableLoadTransId &trans_id);
|
int px_start_trans(const table::ObTableLoadTransId &trans_id);
|
||||||
int px_finish_trans(const table::ObTableLoadTransId &trans_id);
|
int px_finish_trans(const table::ObTableLoadTransId &trans_id);
|
||||||
int px_abandon_trans(const table::ObTableLoadTransId &trans_id);
|
|
||||||
int px_write(const table::ObTableLoadTransId &trans_id,
|
int px_write(const table::ObTableLoadTransId &trans_id,
|
||||||
const ObTabletID &tablet_id,
|
const ObTabletID &tablet_id,
|
||||||
const common::ObIArray<common::ObNewRow> &row_array);
|
const common::ObIArray<common::ObNewRow> &row_array);
|
||||||
|
static int px_abandon_trans(ObTableLoadTableCtx *ctx, const table::ObTableLoadTransId &trans_id);
|
||||||
private:
|
private:
|
||||||
int px_clean_up_trans(ObTableLoadStoreTrans *trans);
|
|
||||||
int px_flush(ObTableLoadStoreTrans *trans);
|
int px_flush(ObTableLoadStoreTrans *trans);
|
||||||
|
static int px_clean_up_trans(ObTableLoadStoreTrans *trans);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
ObTableLoadTableCtx * const ctx_;
|
ObTableLoadTableCtx * const ctx_;
|
||||||
|
|||||||
@ -112,6 +112,7 @@ int ObTableDirectInsertService::close_task(const uint64_t table_id,
|
|||||||
const int error_code)
|
const int error_code)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
|
if (OB_SUCC(error_code)) {
|
||||||
ObTableLoadTableCtx *table_ctx = nullptr;
|
ObTableLoadTableCtx *table_ctx = nullptr;
|
||||||
ObTableLoadKey key(MTL_ID(), table_id);
|
ObTableLoadKey key(MTL_ID(), table_id);
|
||||||
if (OB_FAIL(ObTableLoadService::get_ctx(key, table_ctx))) {
|
if (OB_FAIL(ObTableLoadService::get_ctx(key, table_ctx))) {
|
||||||
@ -123,16 +124,13 @@ int ObTableDirectInsertService::close_task(const uint64_t table_id,
|
|||||||
ObTableLoadStore store(table_ctx);
|
ObTableLoadStore store(table_ctx);
|
||||||
if (OB_FAIL(store.init())) {
|
if (OB_FAIL(store.init())) {
|
||||||
LOG_WARN("fail to init store", KR(ret));
|
LOG_WARN("fail to init store", KR(ret));
|
||||||
} else {
|
} else if (OB_FAIL(store.px_finish_trans(trans_id))) {
|
||||||
if (OB_SUCC(error_code)) {
|
|
||||||
if (OB_FAIL(store.px_finish_trans(trans_id))) {
|
|
||||||
LOG_WARN("fail to finish direct load trans", KR(ret), K(trans_id));
|
LOG_WARN("fail to finish direct load trans", KR(ret), K(trans_id));
|
||||||
}
|
}
|
||||||
}
|
if (OB_FAIL(ret)) {
|
||||||
if (OB_FAIL(ret)){
|
int tmp_ret = OB_SUCCESS;
|
||||||
if (OB_FAIL(store.px_abandon_trans(trans_id))) {
|
if (OB_TMP_FAIL(ObTableLoadStore::px_abandon_trans(table_ctx, trans_id))) {
|
||||||
LOG_WARN("fail to abandon direct load trans", KR(ret));
|
LOG_WARN("fail to abandon direct load trans", KR(tmp_ret), K(trans_id));
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -140,6 +138,7 @@ int ObTableDirectInsertService::close_task(const uint64_t table_id,
|
|||||||
ObTableLoadService::put_ctx(table_ctx);
|
ObTableLoadService::put_ctx(table_ctx);
|
||||||
table_ctx = nullptr;
|
table_ctx = nullptr;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
} // namespace sql
|
} // namespace sql
|
||||||
|
|||||||
Reference in New Issue
Block a user