Fix direct load hang unit delete
This commit is contained in:
@ -5,6 +5,7 @@
|
|||||||
#define USING_LOG_PREFIX SERVER
|
#define USING_LOG_PREFIX SERVER
|
||||||
|
|
||||||
#include "observer/table_load/ob_table_load_service.h"
|
#include "observer/table_load/ob_table_load_service.h"
|
||||||
|
#include "observer/omt/ob_tenant.h"
|
||||||
#include "observer/table_load/ob_table_load_coordinator.h"
|
#include "observer/table_load/ob_table_load_coordinator.h"
|
||||||
#include "observer/table_load/ob_table_load_schema.h"
|
#include "observer/table_load/ob_table_load_schema.h"
|
||||||
#include "observer/table_load/ob_table_load_store.h"
|
#include "observer/table_load/ob_table_load_store.h"
|
||||||
@ -21,6 +22,46 @@ using namespace common;
|
|||||||
using namespace lib;
|
using namespace lib;
|
||||||
using namespace share::schema;
|
using namespace share::schema;
|
||||||
using namespace table;
|
using namespace table;
|
||||||
|
using namespace omt;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* ObCheckTenantTask
|
||||||
|
*/
|
||||||
|
|
||||||
|
int ObTableLoadService::ObCheckTenantTask::init(uint64_t tenant_id)
|
||||||
|
{
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
if (IS_INIT) {
|
||||||
|
ret = OB_INIT_TWICE;
|
||||||
|
LOG_WARN("ObTableLoadService::ObCheckTenantTask init twice", KR(ret), KP(this));
|
||||||
|
} else {
|
||||||
|
tenant_id_ = tenant_id;
|
||||||
|
is_inited_ = true;
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
void ObTableLoadService::ObCheckTenantTask::runTimerTask()
|
||||||
|
{
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
if (IS_NOT_INIT) {
|
||||||
|
ret = OB_NOT_INIT;
|
||||||
|
LOG_WARN("ObTableLoadService::ObCheckTenantTask not init", KR(ret), KP(this));
|
||||||
|
} else {
|
||||||
|
LOG_DEBUG("table load check tenant", K(tenant_id_));
|
||||||
|
ObTenant *tenant = nullptr;
|
||||||
|
if (OB_FAIL(GCTX.omt_->get_tenant(tenant_id_, tenant))) {
|
||||||
|
LOG_WARN("fail to get tenant", KR(ret), K(tenant_id_));
|
||||||
|
} else if (OB_UNLIKELY(ObUnitInfoGetter::ObUnitStatus::UNIT_NORMAL !=
|
||||||
|
tenant->get_unit_status())) {
|
||||||
|
LOG_INFO("tenant unit status not normal, exit", K(tenant_id_), KPC(tenant));
|
||||||
|
// stop all current tasks, release session
|
||||||
|
service_.abort_all_ctx();
|
||||||
|
// clear all current tasks, release handle
|
||||||
|
service_.release_all_ctx();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* ObGCTask
|
* ObGCTask
|
||||||
@ -58,24 +99,20 @@ void ObTableLoadService::ObGCTask::runTimerTask()
|
|||||||
const uint64_t hidden_table_id = table_ctx->ddl_param_.dest_table_id_;
|
const uint64_t hidden_table_id = table_ctx->ddl_param_.dest_table_id_;
|
||||||
// check if table ctx is removed
|
// check if table ctx is removed
|
||||||
if (table_ctx->is_dirty()) {
|
if (table_ctx->is_dirty()) {
|
||||||
LOG_DEBUG("table load ctx is dirty", K(tenant_id_), "table_id", table_ctx->param_.table_id_,
|
LOG_DEBUG("table load ctx is dirty", K(tenant_id_), K(table_id), "ref_count",
|
||||||
"ref_count", table_ctx->get_ref_count());
|
table_ctx->get_ref_count());
|
||||||
}
|
}
|
||||||
// check if table ctx is activated
|
// check if table ctx is activated
|
||||||
else if (table_ctx->get_ref_count() > 1) {
|
else if (table_ctx->get_ref_count() > 1) {
|
||||||
LOG_DEBUG("table load ctx is active", K(tenant_id_), "table_id",
|
LOG_DEBUG("table load ctx is active", K(tenant_id_), K(table_id), "ref_count",
|
||||||
table_ctx->param_.table_id_, "ref_count", table_ctx->get_ref_count());
|
table_ctx->get_ref_count());
|
||||||
}
|
}
|
||||||
// check if table ctx can be recycled
|
// check if table ctx can be recycled
|
||||||
else {
|
else {
|
||||||
ObSchemaGetterGuard schema_guard;
|
ObSchemaGetterGuard schema_guard;
|
||||||
const ObTableSchema *table_schema = nullptr;
|
const ObTableSchema *table_schema = nullptr;
|
||||||
if (hidden_table_id == OB_INVALID_ID) {
|
if (OB_FAIL(ObTableLoadSchema::get_table_schema(tenant_id_, hidden_table_id, schema_guard,
|
||||||
LOG_INFO("hidden table has not been created, gc table load ctx", K(tenant_id_),
|
table_schema))) {
|
||||||
K(table_id), K(hidden_table_id));
|
|
||||||
ObTableLoadService::remove_ctx(table_ctx);
|
|
||||||
} else if (OB_FAIL(ObTableLoadSchema::get_table_schema(tenant_id_, hidden_table_id,
|
|
||||||
schema_guard, table_schema))) {
|
|
||||||
if (OB_UNLIKELY(OB_TABLE_NOT_EXIST != ret)) {
|
if (OB_UNLIKELY(OB_TABLE_NOT_EXIST != ret)) {
|
||||||
LOG_WARN("fail to get table schema", KR(ret), K(tenant_id_), K(hidden_table_id));
|
LOG_WARN("fail to get table schema", KR(ret), K(tenant_id_), K(hidden_table_id));
|
||||||
} else {
|
} else {
|
||||||
@ -276,7 +313,11 @@ void ObTableLoadService::put_ctx(ObTableLoadTableCtx *table_ctx)
|
|||||||
}
|
}
|
||||||
|
|
||||||
ObTableLoadService::ObTableLoadService()
|
ObTableLoadService::ObTableLoadService()
|
||||||
: gc_task_(*this), release_task_(*this), is_stop_(false), is_inited_(false)
|
: check_tenant_task_(*this),
|
||||||
|
gc_task_(*this),
|
||||||
|
release_task_(*this),
|
||||||
|
is_stop_(false),
|
||||||
|
is_inited_(false)
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -288,6 +329,8 @@ int ObTableLoadService::init(uint64_t tenant_id)
|
|||||||
LOG_WARN("ObTableLoadService init twice", KR(ret), KP(this));
|
LOG_WARN("ObTableLoadService init twice", KR(ret), KP(this));
|
||||||
} else if (OB_FAIL(manager_.init())) {
|
} else if (OB_FAIL(manager_.init())) {
|
||||||
LOG_WARN("fail to init table ctx manager", KR(ret));
|
LOG_WARN("fail to init table ctx manager", KR(ret));
|
||||||
|
} else if (OB_FAIL(check_tenant_task_.init(tenant_id))) {
|
||||||
|
LOG_WARN("fail to init check tenant task", KR(ret));
|
||||||
} else if (OB_FAIL(gc_task_.init(tenant_id))) {
|
} else if (OB_FAIL(gc_task_.init(tenant_id))) {
|
||||||
LOG_WARN("fail to init gc task", KR(ret));
|
LOG_WARN("fail to init gc task", KR(ret));
|
||||||
} else if (OB_FAIL(release_task_.init(tenant_id))) {
|
} else if (OB_FAIL(release_task_.init(tenant_id))) {
|
||||||
@ -308,6 +351,8 @@ int ObTableLoadService::start()
|
|||||||
gc_timer_.set_run_wrapper(MTL_CTX());
|
gc_timer_.set_run_wrapper(MTL_CTX());
|
||||||
if (OB_FAIL(gc_timer_.init("TLD_GC", ObMemAttr(MTL_ID(), "GC_TIMER")))) {
|
if (OB_FAIL(gc_timer_.init("TLD_GC", ObMemAttr(MTL_ID(), "GC_TIMER")))) {
|
||||||
LOG_WARN("fail to init gc timer", KR(ret));
|
LOG_WARN("fail to init gc timer", KR(ret));
|
||||||
|
} else if (OB_FAIL(gc_timer_.schedule(check_tenant_task_, CHECK_TENANT_INTERVAL, true))) {
|
||||||
|
LOG_WARN("fail to schedule check tenant task", KR(ret));
|
||||||
} else if (OB_FAIL(gc_timer_.schedule(gc_task_, GC_INTERVAL, true))) {
|
} else if (OB_FAIL(gc_timer_.schedule(gc_task_, GC_INTERVAL, true))) {
|
||||||
LOG_WARN("fail to schedule gc task", KR(ret));
|
LOG_WARN("fail to schedule gc task", KR(ret));
|
||||||
} else if (OB_FAIL(gc_timer_.schedule(release_task_, RELEASE_INTERVAL, true))) {
|
} else if (OB_FAIL(gc_timer_.schedule(release_task_, RELEASE_INTERVAL, true))) {
|
||||||
|
|||||||
@ -40,8 +40,22 @@ private:
|
|||||||
void abort_all_ctx();
|
void abort_all_ctx();
|
||||||
void release_all_ctx();
|
void release_all_ctx();
|
||||||
private:
|
private:
|
||||||
|
static const int64_t CHECK_TENANT_INTERVAL = 1LL * 1000 * 1000; // 1s
|
||||||
static const int64_t GC_INTERVAL = 30LL * 1000 * 1000; // 30s
|
static const int64_t GC_INTERVAL = 30LL * 1000 * 1000; // 30s
|
||||||
static const int64_t RELEASE_INTERVAL = 1LL * 1000 * 1000; // 1s
|
static const int64_t RELEASE_INTERVAL = 1LL * 1000 * 1000; // 1s
|
||||||
|
class ObCheckTenantTask : public common::ObTimerTask
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
ObCheckTenantTask(ObTableLoadService &service)
|
||||||
|
: service_(service), tenant_id_(common::OB_INVALID_ID), is_inited_(false) {}
|
||||||
|
virtual ~ObCheckTenantTask() = default;
|
||||||
|
int init(uint64_t tenant_id);
|
||||||
|
void runTimerTask() override;
|
||||||
|
private:
|
||||||
|
ObTableLoadService &service_;
|
||||||
|
uint64_t tenant_id_;
|
||||||
|
bool is_inited_;
|
||||||
|
};
|
||||||
class ObGCTask : public common::ObTimerTask
|
class ObGCTask : public common::ObTimerTask
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
@ -71,6 +85,7 @@ private:
|
|||||||
private:
|
private:
|
||||||
ObTableLoadManager manager_;
|
ObTableLoadManager manager_;
|
||||||
common::ObTimer gc_timer_;
|
common::ObTimer gc_timer_;
|
||||||
|
ObCheckTenantTask check_tenant_task_;
|
||||||
ObGCTask gc_task_;
|
ObGCTask gc_task_;
|
||||||
ObReleaseTask release_task_;
|
ObReleaseTask release_task_;
|
||||||
volatile bool is_stop_;
|
volatile bool is_stop_;
|
||||||
|
|||||||
Reference in New Issue
Block a user