Fix direct load abort ctx in background core

This commit is contained in:
suz-yang
2023-06-09 06:48:15 +00:00
committed by ob-robot
parent ba47b23683
commit a11f594a34
8 changed files with 101 additions and 27 deletions

View File

@ -40,7 +40,9 @@ void ObTableLoadInstance::destroy()
if (nullptr != table_ctx_) {
if (OB_FAIL(ObTableLoadService::remove_ctx(table_ctx_))) {
LOG_WARN("table ctx may remove by service", KR(ret), KP(table_ctx_));
} else if (!is_committed_) {
}
if (!is_committed_) {
// must abort here, abort redef table need exec_ctx session_info
ObTableLoadCoordinator::abort_ctx(table_ctx_);
}
ObTableLoadService::put_ctx(table_ctx_);

View File

@ -130,7 +130,7 @@ int ObTableLoadManager::remove_table_ctx(const ObTableLoadUniqueKey &key)
return ret;
}
int ObTableLoadManager::remove_all_table_ctx(ObIArray<ObTableLoadTableCtx *> &table_ctx_array)
int ObTableLoadManager::get_all_table_ctx(ObIArray<ObTableLoadTableCtx *> &table_ctx_array)
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
@ -151,10 +151,6 @@ int ObTableLoadManager::remove_all_table_ctx(ObIArray<ObTableLoadTableCtx *> &ta
table_ctx->inc_ref_count();
}
}
if (OB_SUCC(ret)) {
table_ctx_map_.clear();
table_handle_map_.clear();
}
if (OB_FAIL(ret)) {
for (int64_t i = 0; i < table_ctx_array.count(); ++i) {
ObTableLoadTableCtx *table_ctx = table_ctx_array.at(i);
@ -264,6 +260,24 @@ void ObTableLoadManager::put_table_ctx(ObTableLoadTableCtx *table_ctx)
}
}
int64_t ObTableLoadManager::get_table_ctx_count() const
{
obsys::ObRLockGuard guard(rwlock_);
return table_ctx_map_.size();
}
int64_t ObTableLoadManager::get_dirty_list_count() const
{
ObMutexGuard guard(mutex_);
return dirty_list_.get_size();
}
bool ObTableLoadManager::is_table_ctx_empty() const
{
obsys::ObRLockGuard guard(rwlock_);
return table_ctx_map_.empty();
}
bool ObTableLoadManager::is_dirty_list_empty() const
{
ObMutexGuard guard(mutex_);

View File

@ -25,7 +25,7 @@ public:
int add_table_ctx(const ObTableLoadUniqueKey &key, ObTableLoadTableCtx *table_ctx);
int remove_table_ctx(const ObTableLoadUniqueKey &key);
// table ctx holds a reference count
int remove_all_table_ctx(common::ObIArray<ObTableLoadTableCtx *> &table_ctx_array);
int get_all_table_ctx(common::ObIArray<ObTableLoadTableCtx *> &table_ctx_array);
// table ctx holds a reference count
int get_table_ctx(const ObTableLoadUniqueKey &key, ObTableLoadTableCtx *&table_ctx);
// table ctx holds a reference count
@ -33,6 +33,9 @@ public:
// all table ctx hold a reference count
int get_inactive_table_ctx_list(common::ObIArray<ObTableLoadTableCtx *> &table_ctx_array);
void put_table_ctx(ObTableLoadTableCtx *table_ctx);
int64_t get_table_ctx_count() const;
int64_t get_dirty_list_count() const;
bool is_table_ctx_empty() const;
bool is_dirty_list_empty() const;
// table ctx no reference counting
int get_releasable_table_ctx_list(common::ObIArray<ObTableLoadTableCtx *> &table_ctx_array);

View File

@ -6,9 +6,9 @@
#include "observer/table_load/ob_table_load_service.h"
#include "observer/omt/ob_tenant.h"
#include "observer/table_load/ob_table_load_coordinator.h"
#include "observer/table_load/ob_table_load_coordinator_ctx.h"
#include "observer/table_load/ob_table_load_schema.h"
#include "observer/table_load/ob_table_load_store.h"
#include "observer/table_load/ob_table_load_store_ctx.h"
#include "observer/table_load/ob_table_load_table_ctx.h"
#include "observer/table_load/ob_table_load_utils.h"
#include "share/rc/ob_tenant_base.h"
@ -55,10 +55,8 @@ void ObTableLoadService::ObCheckTenantTask::runTimerTask()
} else if (OB_UNLIKELY(ObUnitInfoGetter::ObUnitStatus::UNIT_NORMAL !=
tenant->get_unit_status())) {
LOG_DEBUG("tenant unit status not normal, clear", K(tenant_id_), KPC(tenant));
// stop all current tasks, release session
service_.abort_all_ctx();
// clear all current tasks, release handle
service_.release_all_ctx();
// fail all current tasks
service_.fail_all_ctx(OB_ERR_UNEXPECTED_UNIT_STATUS);
}
}
}
@ -198,7 +196,7 @@ int ObTableLoadService::check_tenant()
LOG_WARN("fail to get tenant", KR(ret), K(tenant_id));
} else if (OB_UNLIKELY(ObUnitInfoGetter::ObUnitStatus::UNIT_NORMAL !=
tenant->get_unit_status())) {
ret = OB_UNIT_IS_MIGRATING;
ret = OB_ERR_UNEXPECTED_UNIT_STATUS;
LOG_WARN("unit status not normal", KR(ret), K(tenant->get_unit_status()));
}
return ret;
@ -388,7 +386,6 @@ int ObTableLoadService::stop()
void ObTableLoadService::wait()
{
gc_timer_.wait();
abort_all_ctx();
release_all_ctx();
}
@ -398,22 +395,22 @@ void ObTableLoadService::destroy()
gc_timer_.destroy();
}
void ObTableLoadService::abort_all_ctx()
void ObTableLoadService::fail_all_ctx(int error_code)
{
int ret = OB_SUCCESS;
ObArray<ObTableLoadTableCtx *> table_ctx_array;
if (OB_FAIL(manager_.remove_all_table_ctx(table_ctx_array))) {
LOG_WARN("fail to remove all table ctx list", KR(ret));
if (OB_FAIL(manager_.get_all_table_ctx(table_ctx_array))) {
LOG_WARN("fail to get all table ctx list", KR(ret));
} else {
for (int i = 0; i < table_ctx_array.count(); ++i) {
ObTableLoadTableCtx *table_ctx = table_ctx_array.at(i);
// abort coordinator
// fail coordinator
if (nullptr != table_ctx->coordinator_ctx_) {
ObTableLoadCoordinator::abort_ctx(table_ctx);
table_ctx->coordinator_ctx_->set_status_error(error_code);
}
// abort store
else if (nullptr != table_ctx->store_ctx_) {
ObTableLoadStore::abort_ctx(table_ctx);
// fail store
if (nullptr != table_ctx->store_ctx_) {
table_ctx->store_ctx_->set_status_error(error_code);
}
manager_.put_table_ctx(table_ctx);
}
@ -424,7 +421,48 @@ void ObTableLoadService::release_all_ctx()
{
int ret = OB_SUCCESS;
const uint64_t tenant_id = MTL_ID();
// 1. check all ctx are removed
while (OB_SUCC(ret)) {
if (REACH_TIME_INTERVAL(30 * 1000 * 1000)) {
LOG_INFO("[DIRECT LOAD TABLE CTX]", "count", manager_.get_table_ctx_count());
}
fail_all_ctx(OB_ERR_UNEXPECTED_UNIT_STATUS);
ObArray<ObTableLoadTableCtx *> table_ctx_array;
if (OB_FAIL(manager_.get_inactive_table_ctx_list(table_ctx_array))) {
LOG_WARN("fail to get inactive table ctx list", KR(ret), K(tenant_id));
} else {
for (int i = 0; i < table_ctx_array.count(); ++i) {
ObTableLoadTableCtx *table_ctx = table_ctx_array.at(i);
const uint64_t table_id = table_ctx->param_.table_id_;
const uint64_t hidden_table_id = table_ctx->ddl_param_.dest_table_id_;
// check if table ctx is removed
if (table_ctx->is_dirty()) {
LOG_DEBUG("table load ctx is dirty", K(tenant_id), K(table_id), "ref_count",
table_ctx->get_ref_count());
}
// check if table ctx is activated
else if (table_ctx->get_ref_count() > 1) {
LOG_DEBUG("table load ctx is active", K(tenant_id), K(table_id), "ref_count",
table_ctx->get_ref_count());
} else {
LOG_INFO("tenant exit, remove table load ctx", K(tenant_id), K(table_id),
K(hidden_table_id));
remove_ctx(table_ctx);
}
manager_.put_table_ctx(table_ctx);
}
}
if (manager_.is_table_ctx_empty()) {
break;
} else {
ob_usleep(1 * 1000 * 1000);
}
}
// 2. release all ctx
while (OB_SUCC(ret)) {
if (REACH_TIME_INTERVAL(30 * 1000 * 1000)) {
LOG_INFO("[DIRECT LOAD DIRTY LIST]", "count", manager_.get_dirty_list_count());
}
ObArray<ObTableLoadTableCtx *> table_ctx_array;
if (OB_FAIL(manager_.get_releasable_table_ctx_list(table_ctx_array))) {
LOG_WARN("fail to get releasable table ctx list", KR(ret));
@ -439,7 +477,7 @@ void ObTableLoadService::release_all_ctx()
if (manager_.is_dirty_list_empty()) {
break;
} else {
ob_usleep(10 * 1000 * 1000);
ob_usleep(1 * 1000 * 1000);
}
}
}

View File

@ -38,7 +38,7 @@ public:
void destroy();
ObTableLoadManager &get_manager() { return manager_; }
private:
void abort_all_ctx();
void fail_all_ctx(int error_code);
void release_all_ctx();
private:
static const int64_t CHECK_TENANT_INTERVAL = 1LL * 1000 * 1000; // 1s

File diff suppressed because one or more lines are too long

View File

@ -450,6 +450,7 @@ DEFINE_ERROR(OB_ERR_OBSERVICE_START, -4395, -1, "HY000", "observice start proces
DEFINE_ERROR_DEP(OB_ERR_THREAD_PANIC, -4396, -1, "HY000", "Worker thread pannic, thread may be terminated or hung");
DEFINE_ERROR(OB_ENCODING_EST_SIZE_OVERFLOW, -4397, -1, "HY000", "Encoding estimated size overflow");
DEFINE_ORACLE_ERROR(OB_INVALID_SUB_PARTITION_TYPE, -4398, 1500, "HY000", "It is only possible to mix RANGE/LIST partitioning with HASH/KEY partitioning for subpartitioning", 14020, "this physical attribute may not be specified for a table partition");
DEFINE_ERROR(OB_ERR_UNEXPECTED_UNIT_STATUS, -4399, -1, "HY000", "Unit status is not expected");
////////////////////////////////////////////////////////////////
//error code for root server & server management -4500 ---- -5000

View File

@ -264,6 +264,7 @@ constexpr int OB_ERR_OBSERVER_STOP = -4394;
constexpr int OB_ERR_OBSERVICE_START = -4395;
constexpr int OB_ENCODING_EST_SIZE_OVERFLOW = -4397;
constexpr int OB_INVALID_SUB_PARTITION_TYPE = -4398;
constexpr int OB_ERR_UNEXPECTED_UNIT_STATUS = -4399;
constexpr int OB_IMPORT_NOT_IN_SERVER = -4505;
constexpr int OB_CONVERT_ERROR = -4507;
constexpr int OB_BYPASS_TIMEOUT = -4510;
@ -2109,6 +2110,7 @@ constexpr int OB_ERR_INVALID_DATE_MSG_FMT_V2 = -4219;
#define OB_ERR_THREAD_PANIC__USER_ERROR_MSG "Worker thread pannic, thread may be terminated or hung"
#define OB_ENCODING_EST_SIZE_OVERFLOW__USER_ERROR_MSG "Encoding estimated size overflow"
#define OB_INVALID_SUB_PARTITION_TYPE__USER_ERROR_MSG "It is only possible to mix RANGE/LIST partitioning with HASH/KEY partitioning for subpartitioning"
#define OB_ERR_UNEXPECTED_UNIT_STATUS__USER_ERROR_MSG "Unit status is not expected"
#define OB_IMPORT_NOT_IN_SERVER__USER_ERROR_MSG "Import not in service"
#define OB_CONVERT_ERROR__USER_ERROR_MSG "Convert error"
#define OB_BYPASS_TIMEOUT__USER_ERROR_MSG "Bypass timeout"
@ -4193,6 +4195,7 @@ constexpr int OB_ERR_INVALID_DATE_MSG_FMT_V2 = -4219;
#define OB_ERR_THREAD_PANIC__ORA_USER_ERROR_MSG "ORA-00600: internal error code, arguments: -4396, Worker thread pannic, thread may be terminated or hung"
#define OB_ENCODING_EST_SIZE_OVERFLOW__ORA_USER_ERROR_MSG "ORA-00600: internal error code, arguments: -4397, Encoding estimated size overflow"
#define OB_INVALID_SUB_PARTITION_TYPE__ORA_USER_ERROR_MSG "ORA-14020: this physical attribute may not be specified for a table partition"
#define OB_ERR_UNEXPECTED_UNIT_STATUS__ORA_USER_ERROR_MSG "ORA-00600: internal error code, arguments: -4399, Unit status is not expected"
#define OB_IMPORT_NOT_IN_SERVER__ORA_USER_ERROR_MSG "ORA-00600: internal error code, arguments: -4505, Import not in service"
#define OB_CONVERT_ERROR__ORA_USER_ERROR_MSG "ORA-00600: internal error code, arguments: -4507, Convert error"
#define OB_BYPASS_TIMEOUT__ORA_USER_ERROR_MSG "ORA-00600: internal error code, arguments: -4510, Bypass timeout"
@ -5917,7 +5920,7 @@ constexpr int OB_ERR_INVALID_DATE_MSG_FMT_V2 = -4219;
#define OB_ERR_DATA_TOO_LONG_MSG_FMT_V2__ORA_USER_ERROR_MSG "ORA-12899: value too large for column %.*s (actual: %ld, maximum: %ld)"
#define OB_ERR_INVALID_DATE_MSG_FMT_V2__ORA_USER_ERROR_MSG "ORA-01861: Incorrect datetime value for column '%.*s' at row %ld"
extern int g_all_ob_errnos[2080];
extern int g_all_ob_errnos[2081];
const char *ob_error_name(const int oberr);
const char* ob_error_cause(const int oberr);