[CP] fix direct load still write after task exit

This commit is contained in:
suz-yang 2023-10-17 09:39:50 +00:00 committed by ob-robot
parent c5be3d3110
commit 79dc2ef91c
21 changed files with 446 additions and 58 deletions

View File

@ -296,14 +296,16 @@ int ObDirectLoadControlAbortExecutor::process()
ObTableLoadTableCtx *table_ctx = nullptr;
ObTableLoadUniqueKey key(arg_.table_id_, arg_.task_id_);
if (OB_FAIL(ObTableLoadService::get_ctx(key, table_ctx))) {
if (OB_UNLIKELY(OB_ENTRY_NOT_EXIST == ret)) {
if (OB_UNLIKELY(OB_ENTRY_NOT_EXIST != ret)) {
LOG_WARN("fail to get table ctx", KR(ret), K(key));
} else {
ret = OB_SUCCESS;
res_.is_stopped_ = true;
}
} else {
if (OB_FAIL(ObTableLoadService::remove_ctx(table_ctx))) {
ObTableLoadStore::abort_ctx(table_ctx, res_.is_stopped_);
if (res_.is_stopped_ && OB_FAIL(ObTableLoadService::remove_ctx(table_ctx))) {
LOG_WARN("fail to remove table ctx", KR(ret), K(key));
} else {
ObTableLoadStore::abort_ctx(table_ctx);
}
}
if (OB_NOT_NULL(table_ctx)) {
@ -351,6 +353,44 @@ int ObDirectLoadControlGetStatusExecutor::process()
return ret;
}
// heart_beath
int ObDirectLoadControlHeartBeatExecutor::check_args()
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(OB_INVALID_ID == arg_.table_id_ || 0 == arg_.task_id_)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid args", KR(ret), K(arg_));
}
return ret;
}
int ObDirectLoadControlHeartBeatExecutor::process()
{
int ret = OB_SUCCESS;
if (OB_FAIL(ObTableLoadService::check_tenant())) {
LOG_WARN("fail to check tenant", KR(ret));
}
if (OB_SUCC(ret)) {
ObTableLoadTableCtx *table_ctx = nullptr;
ObTableLoadUniqueKey key(arg_.table_id_, arg_.task_id_);
if (OB_FAIL(ObTableLoadService::get_ctx(key, table_ctx))) {
LOG_WARN("fail to get table ctx", KR(ret), K(key));
} else {
ObTableLoadStore store(table_ctx);
if (OB_FAIL(store.init())) {
LOG_WARN("fail to init store", KR(ret));
} else if (OB_FAIL(store.heart_beat())) {
LOG_WARN("fail to heart beat store", KR(ret));
}
}
if (OB_NOT_NULL(table_ctx)) {
ObTableLoadService::put_ctx(table_ctx);
table_ctx = nullptr;
}
}
return ret;
}
/// trans
// pre_start_trans
int ObDirectLoadControlPreStartTransExecutor::check_args()

View File

@ -197,6 +197,26 @@ protected:
int process() override;
};
// heart_beat
class ObDirectLoadControlHeartBeatExecutor
: public ObTableLoadControlRpcExecutor<ObDirectLoadControlCommandType::HEART_BEAT>
{
typedef ObTableLoadControlRpcExecutor<ObDirectLoadControlCommandType::HEART_BEAT> ParentType;
public:
ObDirectLoadControlHeartBeatExecutor(common::ObIAllocator &allocator,
const ObDirectLoadControlRequest &request,
ObDirectLoadControlResult &result)
: ParentType(allocator, request, result)
{
}
virtual ~ObDirectLoadControlHeartBeatExecutor() = default;
protected:
int check_args() override;
int process() override;
};
/// trans
// pre_start_trans
class ObDirectLoadControlPreStartTransExecutor

View File

@ -38,6 +38,7 @@ int ObTableLoadControlRpcProxy::dispatch(const ObDirectLoadControlRequest &reque
OB_TABLE_LOAD_CONTROL_RPC_DISPATCH(ObDirectLoadControlCommandType::COMMIT);
OB_TABLE_LOAD_CONTROL_RPC_DISPATCH(ObDirectLoadControlCommandType::ABORT);
OB_TABLE_LOAD_CONTROL_RPC_DISPATCH(ObDirectLoadControlCommandType::GET_STATUS);
OB_TABLE_LOAD_CONTROL_RPC_DISPATCH(ObDirectLoadControlCommandType::HEART_BEAT);
/// trans
OB_TABLE_LOAD_CONTROL_RPC_DISPATCH(ObDirectLoadControlCommandType::PRE_START_TRANS);
OB_TABLE_LOAD_CONTROL_RPC_DISPATCH(ObDirectLoadControlCommandType::CONFIRM_START_TRANS);

View File

@ -27,6 +27,7 @@ class ObDirectLoadControlStartMergeExecutor;
class ObDirectLoadControlCommitExecutor;
class ObDirectLoadControlAbortExecutor;
class ObDirectLoadControlGetStatusExecutor;
class ObDirectLoadControlHeartBeatExecutor;
/// trans
class ObDirectLoadControlPreStartTransExecutor;
class ObDirectLoadControlConfirmStartTransExecutor;
@ -153,12 +154,17 @@ public:
ObDirectLoadControlCommitRes);
// abort
OB_DEFINE_TABLE_LOAD_CONTROL_RPC(abort, ObDirectLoadControlCommandType::ABORT,
ObDirectLoadControlAbortExecutor, ObDirectLoadControlAbortArg);
ObDirectLoadControlAbortExecutor, ObDirectLoadControlAbortArg,
ObDirectLoadControlAbortRes);
// get_status
OB_DEFINE_TABLE_LOAD_CONTROL_RPC(get_status, ObDirectLoadControlCommandType::GET_STATUS,
ObDirectLoadControlGetStatusExecutor,
ObDirectLoadControlGetStatusArg,
ObDirectLoadControlGetStatusRes);
// heart_beat
OB_DEFINE_TABLE_LOAD_CONTROL_RPC(heart_beat, ObDirectLoadControlCommandType::HEART_BEAT,
ObDirectLoadControlHeartBeatExecutor,
ObDirectLoadControlHeartBeatArg);
/// trans
// pre_start_trans

View File

@ -194,6 +194,9 @@ OB_SERIALIZE_MEMBER_SIMPLE(ObDirectLoadControlAbortArg,
table_id_,
task_id_);
OB_SERIALIZE_MEMBER_SIMPLE(ObDirectLoadControlAbortRes,
is_stopped_);
// get_status
OB_SERIALIZE_MEMBER_SIMPLE(ObDirectLoadControlGetStatusArg,
table_id_,
@ -203,6 +206,11 @@ OB_SERIALIZE_MEMBER_SIMPLE(ObDirectLoadControlGetStatusRes,
status_,
error_code_);
// heartbeat
OB_SERIALIZE_MEMBER_SIMPLE(ObDirectLoadControlHeartBeatArg,
table_id_,
task_id_);
// pre_start_trans
OB_SERIALIZE_MEMBER_SIMPLE(ObDirectLoadControlPreStartTransArg,
table_id_,

View File

@ -43,6 +43,8 @@ enum class ObDirectLoadControlCommandType
GET_TRANS_STATUS = 12,
INSERT_TRANS = 13,
HEART_BEAT = 14,
MAX_TYPE
};
@ -252,6 +254,18 @@ public:
int64_t task_id_;
};
class ObDirectLoadControlAbortRes final
{
OB_UNIS_VERSION(1);
public:
ObDirectLoadControlAbortRes() : is_stopped_(false) {}
TO_STRING_KV(K_(is_stopped));
public:
bool is_stopped_;
};
class ObDirectLoadControlGetStatusArg final
{
OB_UNIS_VERSION(1);
@ -280,6 +294,19 @@ public:
int32_t error_code_;
};
class ObDirectLoadControlHeartBeatArg final
{
OB_UNIS_VERSION(1);
public:
ObDirectLoadControlHeartBeatArg() : table_id_(common::OB_INVALID_ID), task_id_(0) {}
TO_STRING_KV(K_(table_id), K_(task_id));
public:
uint64_t table_id_;
int64_t task_id_;
};
class ObDirectLoadControlPreStartTransArg final
{
OB_UNIS_VERSION(1);

View File

@ -98,15 +98,17 @@ void ObTableLoadCoordinator::abort_ctx(ObTableLoadTableCtx *ctx)
if (OB_FAIL(ctx->coordinator_ctx_->set_status_abort())) {
LOG_WARN("fail to set coordinator status abort", KR(ret));
}
// 2. mark all active trans abort
// 2. disable heart beat
ctx->coordinator_ctx_->set_enable_heart_beat(false);
// 3. mark all active trans abort
if (OB_FAIL(abort_active_trans(ctx))) {
LOG_WARN("fail to abort active trans", KR(ret));
}
// 3. abort peers ctx
// 4. abort peers ctx
if (OB_FAIL(abort_peers_ctx(ctx))) {
LOG_WARN("fail to abort peers ctx", KR(ret));
}
// 4. abort redef table, release table lock
// 5. abort redef table, release table lock
if (OB_FAIL(abort_redef_table(ctx))) {
LOG_WARN("fail to abort redef table", KR(ret));
}
@ -148,20 +150,56 @@ int ObTableLoadCoordinator::abort_peers_ctx(ObTableLoadTableCtx *ctx)
LOG_WARN("fail to get all addr", KR(ret));
} else {
LOG_INFO("route_abort_peer_request begin", K(all_addr_array.count()));
static const int64_t max_retry_times = 100; // ensure store ctx detect heart beat timeout and abort
ObArray<ObAddr> addr_array1, addr_array2;
ObIArray<ObAddr> *curr_round = &addr_array1, *next_round = &addr_array2;
int64_t fail_cnt = 0;
int64_t tries = 0;
ObDirectLoadControlAbortArg arg;
ObDirectLoadControlAbortRes res;
arg.table_id_ = ctx->param_.table_id_;
arg.task_id_ = ctx->ddl_param_.task_id_;
for (int64_t i = 0; i < all_addr_array.count(); ++i) {
const ObAddr &addr = all_addr_array.at(i);
if (ObTableLoadUtils::is_local_addr(addr)) { // 本机
ObTableLoadStore::abort_ctx(ctx);
} else { // 远端, 发送rpc
const int64_t origin_timeout_ts = THIS_WORKER.get_timeout_ts();
THIS_WORKER.set_timeout_ts(INT64_MAX); // use default timeout value, avoid timeout now
TABLE_LOAD_CONTROL_RPC_CALL(abort, addr, arg);
THIS_WORKER.set_timeout_ts(origin_timeout_ts);
if (OB_FAIL(curr_round->push_back(addr))) {
LOG_WARN("fail to push back", KR(ret), K(addr));
}
}
while (!curr_round->empty() && tries < max_retry_times) {
ret = OB_SUCCESS;
fail_cnt = 0;
for (int64_t i = 0; i < curr_round->count(); ++i) {
const ObAddr &addr = curr_round->at(i);
if (ObTableLoadUtils::is_local_addr(addr)) { // 本机
ObTableLoadStore::abort_ctx(ctx, res.is_stopped_);
ret = OB_SUCCESS;
} else { // 远端, 发送rpc
// use default timeout value, avoid timeout immediately
const int64_t origin_timeout_ts = THIS_WORKER.get_timeout_ts();
THIS_WORKER.set_timeout_ts(ObTimeUtil::current_time() + DEFAULT_TIMEOUT_US);
TABLE_LOAD_CONTROL_RPC_CALL(abort, addr, arg, res);
THIS_WORKER.set_timeout_ts(origin_timeout_ts);
}
if (OB_SUCC(ret) && res.is_stopped_) {
// peer is stopped
} else {
if (OB_FAIL(ret)) {
++fail_cnt;
ret = OB_SUCCESS;
}
if (OB_FAIL(next_round->push_back(addr))) {
LOG_WARN("fail to push back", KR(ret));
}
}
}
++tries;
if (tries % 10 == 0) {
LOG_WARN("retry too many times", K(tries), K(fail_cnt), KPC(next_round));
}
std::swap(curr_round, next_round);
next_round->reuse();
ob_usleep(WAIT_INTERVAL_US);
}
}
return ret;
}
@ -311,6 +349,8 @@ int ObTableLoadCoordinator::begin()
LOG_WARN("fail to confirm begin peers", KR(ret));
} else if (OB_FAIL(coordinator_ctx_->set_status_loading())) {
LOG_WARN("fail to set coordinator status loading", KR(ret));
} else {
coordinator_ctx_->set_enable_heart_beat(true);
}
}
return ret;
@ -655,6 +695,7 @@ int ObTableLoadCoordinator::commit(ObTableLoadResultInfo &result_info)
LOG_WARN("fail to check coordinator status", KR(ret));
} else if (OB_FAIL(commit_peers())) {
LOG_WARN("fail to commit peers", KR(ret));
} else if (FALSE_IT(coordinator_ctx_->set_enable_heart_beat(false))) {
} else if (param_.online_opt_stat_gather_ &&
OB_FAIL(
drive_sql_stat(coordinator_ctx_->exec_ctx_->get_exec_ctx()))) {
@ -686,6 +727,7 @@ int ObTableLoadCoordinator::px_commit_data()
LOG_WARN("fail to check coordinator status", KR(ret));
} else if (OB_FAIL(commit_peers())) {
LOG_WARN("fail to commit peers", KR(ret));
} else if (FALSE_IT(coordinator_ctx_->set_enable_heart_beat(false))) {
} else if (param_.online_opt_stat_gather_ &&
OB_FAIL(
drive_sql_stat(coordinator_ctx_->exec_ctx_->get_exec_ctx()))) {
@ -763,6 +805,55 @@ int ObTableLoadCoordinator::get_status(ObTableLoadStatusType &status, int &error
return ret;
}
/**
* heart_beat
*/
int ObTableLoadCoordinator::heart_beat_peer()
{
int ret = OB_SUCCESS;
ObTableLoadArray<ObAddr> all_addr_array;
if (OB_FAIL(coordinator_ctx_->partition_location_.get_all_leader(all_addr_array))) {
LOG_WARN("fail to get all addr", KR(ret));
} else {
LOG_DEBUG("route_heart_beat_peer_request begin", K(all_addr_array.count()));
ObDirectLoadControlHeartBeatArg arg;
arg.table_id_ = param_.table_id_;
arg.task_id_ = ctx_->ddl_param_.task_id_;
for (int64_t i = 0; i < all_addr_array.count(); ++i) {
const ObAddr &addr = all_addr_array.at(i);
if (ObTableLoadUtils::is_local_addr(addr)) { // 本机
ObTableLoadStore store(ctx_);
if (OB_FAIL(store.init())) {
LOG_WARN("fail to init store", KR(ret));
} else if (OB_FAIL(store.heart_beat())) {
LOG_WARN("fail to heart beat store", KR(ret));
}
} else { // 远端, 发送rpc
const int64_t origin_timeout_ts = THIS_WORKER.get_timeout_ts();
THIS_WORKER.set_timeout_ts(ObTimeUtil::current_time() + HEART_BEAT_RPC_TIMEOUT_US);
TABLE_LOAD_CONTROL_RPC_CALL(heart_beat, addr, arg);
THIS_WORKER.set_timeout_ts(origin_timeout_ts);
}
}
}
return ret;
}
int ObTableLoadCoordinator::heart_beat()
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("ObTableLoadCoordinator not init", KR(ret), KP(this));
} else {
LOG_DEBUG("coordinator heart beat");
// 心跳是为了让数据节点感知控制节点存活, 控制节点不依赖心跳感知数据节点状态, 忽略失败
heart_beat_peer();
}
return ret;
}
/**
* start trans
*/

View File

@ -32,6 +32,7 @@ class ObTableLoadCoordinator
{
static const int64_t WAIT_INTERVAL_US = 1LL * 1000 * 1000; // 1s
static const int64_t DEFAULT_TIMEOUT_US = 10LL * 1000 * 1000; // 10s
static const int64_t HEART_BEAT_RPC_TIMEOUT_US = 1LL * 1000 * 1000; // 1s
public:
ObTableLoadCoordinator(ObTableLoadTableCtx *ctx);
static bool is_ctx_inited(ObTableLoadTableCtx *ctx);
@ -53,6 +54,7 @@ public:
int px_commit_data();
int px_commit_ddl();
int get_status(table::ObTableLoadStatusType &status, int &error_code);
int heart_beat();
private:
int pre_begin_peers();
int confirm_begin_peers();
@ -61,6 +63,7 @@ private:
int commit_peers();
int commit_redef_table();
int drive_sql_stat(sql::ObExecContext *ctx);
int heart_beat_peer();
private:
int add_check_merge_result_task();
int check_peers_merge_result(bool &is_finish);

View File

@ -43,6 +43,7 @@ ObTableLoadCoordinatorCtx::ObTableLoadCoordinatorCtx(ObTableLoadTableCtx *ctx)
next_session_id_(0),
status_(ObTableLoadStatusType::NONE),
error_code_(OB_SUCCESS),
enable_heart_beat_(false),
is_inited_(false)
{
}

View File

@ -90,6 +90,11 @@ public:
int set_status_error(int error_code);
int set_status_abort();
int check_status(table::ObTableLoadStatusType status) const;
OB_INLINE bool enable_heart_beat() const { return enable_heart_beat_; }
OB_INLINE void set_enable_heart_beat(bool enable_heart_beat)
{
enable_heart_beat_ = enable_heart_beat;
}
private:
int advance_status(table::ObTableLoadStatusType status);
public:
@ -169,6 +174,7 @@ private:
TransCtxMap trans_ctx_map_;
SegmentCtxMap segment_ctx_map_;
common::ObSEArray<ObTableLoadTransCtx *, 64> commited_trans_ctx_array_;
bool enable_heart_beat_;
bool is_inited_;
};

View File

@ -15,8 +15,10 @@
#include "observer/table_load/ob_table_load_service.h"
#include "observer/omt/ob_tenant.h"
#include "observer/table_load/ob_table_load_client_task.h"
#include "observer/table_load/ob_table_load_coordinator.h"
#include "observer/table_load/ob_table_load_coordinator_ctx.h"
#include "observer/table_load/ob_table_load_schema.h"
#include "observer/table_load/ob_table_load_store.h"
#include "observer/table_load/ob_table_load_store_ctx.h"
#include "observer/table_load/ob_table_load_table_ctx.h"
#include "observer/table_load/ob_table_load_utils.h"
@ -72,6 +74,51 @@ void ObTableLoadService::ObCheckTenantTask::runTimerTask()
}
}
/**
* ObHeartBeatTask
*/
int ObTableLoadService::ObHeartBeatTask::init(uint64_t tenant_id)
{
int ret = OB_SUCCESS;
if (IS_INIT) {
ret = OB_INIT_TWICE;
LOG_WARN("ObTableLoadService::ObHeartBeatTask init twice", KR(ret), KP(this));
} else {
tenant_id_ = tenant_id;
is_inited_ = true;
}
return ret;
}
void ObTableLoadService::ObHeartBeatTask::runTimerTask()
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("ObTableLoadService::ObHeartBeatTask not init", KR(ret), KP(this));
} else {
LOG_DEBUG("table load heart beat", K(tenant_id_));
ObTableLoadManager &manager = service_.get_manager();
ObArray<ObTableLoadTableCtx *> table_ctx_array;
if (OB_FAIL(manager.get_all_table_ctx(table_ctx_array))) {
LOG_WARN("fail to get all table ctx", KR(ret), K(tenant_id_));
}
for (int64_t i = 0; i < table_ctx_array.count(); ++i) {
ObTableLoadTableCtx *table_ctx = table_ctx_array.at(i);
if (nullptr != table_ctx->coordinator_ctx_ && table_ctx->coordinator_ctx_->enable_heart_beat()) {
ObTableLoadCoordinator coordinator(table_ctx);
if (OB_FAIL(coordinator.init())) {
LOG_WARN("fail to init coordinator", KR(ret));
} else if (OB_FAIL(coordinator.heart_beat())) {
LOG_WARN("fail to coordinator heart beat", KR(ret));
}
}
manager.put_table_ctx(table_ctx);
}
}
}
/**
* ObGCTask
*/
@ -98,50 +145,102 @@ void ObTableLoadService::ObGCTask::runTimerTask()
} else {
LOG_DEBUG("table load start gc", K(tenant_id_));
ObTableLoadManager &manager = service_.get_manager();
ObArray<ObTableLoadTableCtx *> inactive_table_ctx_array;
if (OB_FAIL(manager.get_inactive_table_ctx_list(inactive_table_ctx_array))) {
LOG_WARN("fail to get inactive table ctx list", KR(ret), K(tenant_id_));
ObArray<ObTableLoadTableCtx *> table_ctx_array;
if (OB_FAIL(manager.get_all_table_ctx(table_ctx_array))) {
LOG_WARN("fail to get all table ctx", KR(ret), K(tenant_id_));
}
for (int64_t i = 0; i < inactive_table_ctx_array.count(); ++i) {
ObTableLoadTableCtx *table_ctx = inactive_table_ctx_array.at(i);
const uint64_t table_id = table_ctx->param_.table_id_;
const uint64_t hidden_table_id = table_ctx->ddl_param_.dest_table_id_;
// check if table ctx is removed
if (table_ctx->is_dirty()) {
LOG_DEBUG("table load ctx is dirty", K(tenant_id_), K(table_id), "ref_count",
table_ctx->get_ref_count());
}
// check if table ctx is activated
else if (table_ctx->get_ref_count() > 1) {
LOG_DEBUG("table load ctx is active", K(tenant_id_), K(table_id), "ref_count",
table_ctx->get_ref_count());
}
// check if table ctx can be recycled
else {
ObSchemaGetterGuard schema_guard;
const ObTableSchema *table_schema = nullptr;
if (OB_FAIL(ObTableLoadSchema::get_table_schema(tenant_id_, hidden_table_id, schema_guard,
table_schema))) {
if (OB_UNLIKELY(OB_TABLE_NOT_EXIST != ret)) {
LOG_WARN("fail to get table schema", KR(ret), K(tenant_id_), K(hidden_table_id));
} else {
LOG_INFO("hidden table not exist, gc table load ctx", K(tenant_id_), K(table_id),
K(hidden_table_id));
ObTableLoadService::remove_ctx(table_ctx);
}
} else if (table_schema->is_in_recyclebin()) {
LOG_INFO("hidden table is in recyclebin, gc table load ctx", K(tenant_id_), K(table_id),
K(hidden_table_id));
ObTableLoadService::remove_ctx(table_ctx);
} else {
LOG_DEBUG("table load ctx is running", K(tenant_id_), K(table_id), K(hidden_table_id));
}
for (int64_t i = 0; i < table_ctx_array.count(); ++i) {
ObTableLoadTableCtx *table_ctx = table_ctx_array.at(i);
if (gc_heart_beat_expired_ctx(table_ctx)) {
} else if (gc_table_not_exist_ctx(table_ctx)) {
}
manager.put_table_ctx(table_ctx);
}
}
}
bool ObTableLoadService::ObGCTask::gc_heart_beat_expired_ctx(ObTableLoadTableCtx *table_ctx)
{
int ret = OB_SUCCESS;
bool is_removed = false;
if (OB_ISNULL(table_ctx)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected table ctx is null", KR(ret));
is_removed = true;
} else {
const uint64_t table_id = table_ctx->param_.table_id_;
const uint64_t hidden_table_id = table_ctx->ddl_param_.dest_table_id_;
// check if table ctx is removed
if (table_ctx->is_dirty()) {
LOG_DEBUG("table load ctx is dirty", K(tenant_id_), K(table_id), "ref_count",
table_ctx->get_ref_count());
is_removed = true;
}
// check if heart beat expired
else if (nullptr != table_ctx->store_ctx_ && table_ctx->store_ctx_->enable_heart_beat_check()) {
if (OB_UNLIKELY(
table_ctx->store_ctx_->check_heart_beat_expired(HEART_BEEAT_EXPIRED_TIME_US))) {
LOG_INFO("store heart beat expired, abort", K(tenant_id_), K(table_id), K(hidden_table_id));
bool is_stopped = false;
ObTableLoadStore::abort_ctx(table_ctx, is_stopped);
// 先不移除, 防止心跳超时后, 网络恢复, 控制节点查不到table_ctx, 直接认为已经停止
// 如果网络一直不恢复, 也可以通过table不存在来gc此table_ctx
is_removed = true; // skip other gc
}
}
}
return is_removed;
}
bool ObTableLoadService::ObGCTask::gc_table_not_exist_ctx(ObTableLoadTableCtx *table_ctx)
{
int ret = OB_SUCCESS;
bool is_removed = false;
if (OB_ISNULL(table_ctx)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected table ctx is null", KR(ret));
is_removed = true;
} else {
const uint64_t table_id = table_ctx->param_.table_id_;
const uint64_t hidden_table_id = table_ctx->ddl_param_.dest_table_id_;
// check if table ctx is removed
if (table_ctx->is_dirty()) {
LOG_DEBUG("table load ctx is dirty", K(tenant_id_), K(table_id), "ref_count",
table_ctx->get_ref_count());
is_removed = true;
}
// check if table ctx is activated
else if (table_ctx->get_ref_count() > 1) {
LOG_DEBUG("table load ctx is active", K(tenant_id_), K(table_id), "ref_count",
table_ctx->get_ref_count());
}
// check if table ctx can be recycled
else {
ObSchemaGetterGuard schema_guard;
const ObTableSchema *table_schema = nullptr;
if (OB_FAIL(ObTableLoadSchema::get_table_schema(tenant_id_, hidden_table_id, schema_guard,
table_schema))) {
if (OB_UNLIKELY(OB_TABLE_NOT_EXIST != ret)) {
LOG_WARN("fail to get table schema", KR(ret), K(tenant_id_), K(hidden_table_id));
} else {
LOG_INFO("hidden table not exist, gc table load ctx", K(tenant_id_), K(table_id),
K(hidden_table_id));
ObTableLoadService::remove_ctx(table_ctx);
is_removed = true;
}
} else if (table_schema->is_in_recyclebin()) {
LOG_INFO("hidden table is in recyclebin, gc table load ctx", K(tenant_id_), K(table_id),
K(hidden_table_id));
ObTableLoadService::remove_ctx(table_ctx);
is_removed = true;
} else {
LOG_DEBUG("table load ctx is running", K(tenant_id_), K(table_id), K(hidden_table_id));
}
}
}
return is_removed;
}
/**
* ObReleaseTask
*/
@ -426,6 +525,7 @@ void ObTableLoadService::put_ctx(ObTableLoadTableCtx *table_ctx)
ObTableLoadService::ObTableLoadService()
: check_tenant_task_(*this),
heart_beat_task_(*this),
gc_task_(*this),
release_task_(*this),
client_task_auto_abort_task_(*this),
@ -447,6 +547,8 @@ int ObTableLoadService::init(uint64_t tenant_id)
LOG_WARN("fail to init client service", KR(ret));
} else if (OB_FAIL(check_tenant_task_.init(tenant_id))) {
LOG_WARN("fail to init check tenant task", KR(ret));
} else if (OB_FAIL(heart_beat_task_.init(tenant_id))) {
LOG_WARN("fail to init heart beat task", KR(ret));
} else if (OB_FAIL(gc_task_.init(tenant_id))) {
LOG_WARN("fail to init gc task", KR(ret));
} else if (OB_FAIL(release_task_.init(tenant_id))) {
@ -473,6 +575,8 @@ int ObTableLoadService::start()
LOG_WARN("fail to init gc timer", KR(ret));
} else if (OB_FAIL(timer_.schedule(check_tenant_task_, CHECK_TENANT_INTERVAL, true))) {
LOG_WARN("fail to schedule check tenant task", KR(ret));
} else if (OB_FAIL(timer_.schedule(heart_beat_task_, HEART_BEEAT_INTERVAL, true))) {
LOG_WARN("fail to schedule heart beat task", KR(ret));
} else if (OB_FAIL(timer_.schedule(gc_task_, GC_INTERVAL, true))) {
LOG_WARN("fail to schedule gc task", KR(ret));
} else if (OB_FAIL(timer_.schedule(release_task_, RELEASE_INTERVAL, true))) {

View File

@ -61,6 +61,8 @@ private:
void release_all_ctx();
private:
static const int64_t CHECK_TENANT_INTERVAL = 1LL * 1000 * 1000; // 1s
static const int64_t HEART_BEEAT_INTERVAL = 10LL * 1000 * 1000; // 10s
static const int64_t HEART_BEEAT_EXPIRED_TIME_US = 30LL * 1000 * 1000; // 30s
static const int64_t GC_INTERVAL = 30LL * 1000 * 1000; // 30s
static const int64_t RELEASE_INTERVAL = 1LL * 1000 * 1000; // 1s
static const int64_t CLIENT_TASK_AUTO_ABORT_INTERVAL = 1LL * 1000 * 1000; // 1s
@ -78,6 +80,19 @@ private:
uint64_t tenant_id_;
bool is_inited_;
};
class ObHeartBeatTask : public common::ObTimerTask
{
public:
ObHeartBeatTask(ObTableLoadService &service)
: service_(service), tenant_id_(common::OB_INVALID_ID), is_inited_(false) {}
virtual ~ObHeartBeatTask() = default;
int init(uint64_t tenant_id);
void runTimerTask() override;
private:
ObTableLoadService &service_;
uint64_t tenant_id_;
bool is_inited_;
};
class ObGCTask : public common::ObTimerTask
{
public:
@ -86,6 +101,9 @@ private:
virtual ~ObGCTask() = default;
int init(uint64_t tenant_id);
void runTimerTask() override;
private:
bool gc_heart_beat_expired_ctx(ObTableLoadTableCtx *table_ctx);
bool gc_table_not_exist_ctx(ObTableLoadTableCtx *table_ctx);
private:
ObTableLoadService &service_;
uint64_t tenant_id_;
@ -139,6 +157,7 @@ private:
ObTableLoadClientService client_service_;
common::ObTimer timer_;
ObCheckTenantTask check_tenant_task_;
ObHeartBeatTask heart_beat_task_;
ObGCTask gc_task_;
ObReleaseTask release_task_;
ObClientTaskAutoAbortTask client_task_auto_abort_task_;

View File

@ -55,24 +55,33 @@ int ObTableLoadStore::init_ctx(
return ret;
}
void ObTableLoadStore::abort_ctx(ObTableLoadTableCtx *ctx)
void ObTableLoadStore::abort_ctx(ObTableLoadTableCtx *ctx, bool &is_stopped)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(!ctx->is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid args", KR(ret), KPC(ctx));
is_stopped = true;
} else if (OB_UNLIKELY(nullptr == ctx->store_ctx_ || !ctx->store_ctx_->is_valid())) {
// store ctx not init, do nothing
is_stopped = true;
} else {
LOG_INFO("store abort");
// 1. mark status abort, speed up background task exit
if (OB_FAIL(ctx->store_ctx_->set_status_abort())) {
LOG_WARN("fail to set store status abort", KR(ret));
}
// 2. mark all active trans abort
// 2. disable heart beat check
ctx->store_ctx_->set_enable_heart_beat_check(false);
// 3. mark all active trans abort
if (OB_FAIL(abort_active_trans(ctx))) {
LOG_WARN("fail to abort active trans", KR(ret));
}
// 4. stop merger
ctx->store_ctx_->merger_->stop();
// 5. stop task_scheduler
ctx->store_ctx_->task_scheduler_->stop();
is_stopped = ctx->store_ctx_->task_scheduler_->is_stopped();
}
}
@ -145,6 +154,9 @@ int ObTableLoadStore::confirm_begin()
LOG_INFO("store confirm begin");
if (OB_FAIL(store_ctx_->set_status_loading())) {
LOG_WARN("fail to set store status loading", KR(ret));
} else {
store_ctx_->heart_beat(); // init heart beat
store_ctx_->set_enable_heart_beat_check(true);
}
}
return ret;
@ -316,6 +328,7 @@ int ObTableLoadStore::commit(ObTableLoadResultInfo &result_info)
} else if (OB_FAIL(store_ctx_->set_status_commit())) {
LOG_WARN("fail to set store status commit", KR(ret));
} else {
store_ctx_->set_enable_heart_beat_check(false);
result_info = store_ctx_->result_info_;
}
}
@ -363,6 +376,19 @@ int ObTableLoadStore::get_status(ObTableLoadStatusType &status, int &error_code)
return ret;
}
int ObTableLoadStore::heart_beat()
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("ObTableLoadStore not init", KR(ret), KP(this));
} else {
LOG_DEBUG("store heart beat");
store_ctx_->heart_beat();
}
return ret;
}
int ObTableLoadStore::pre_start_trans(const ObTableLoadTransId &trans_id)
{
int ret = OB_SUCCESS;

View File

@ -35,7 +35,7 @@ public:
ObTableLoadTableCtx *ctx,
const table::ObTableLoadArray<table::ObTableLoadLSIdAndPartitionId> &partition_id_array,
const table::ObTableLoadArray<table::ObTableLoadLSIdAndPartitionId> &target_partition_id_array);
static void abort_ctx(ObTableLoadTableCtx *ctx);
static void abort_ctx(ObTableLoadTableCtx *ctx, bool &is_stopped);
int init();
private:
static int abort_active_trans(ObTableLoadTableCtx *ctx);
@ -48,6 +48,7 @@ public:
int start_merge();
int commit(table::ObTableLoadResultInfo &result_info);
int get_status(table::ObTableLoadStatusType &status, int &error_code);
int heart_beat();
private:
int commit_sql_statistics(const table::ObTableLoadSqlStatistics &sql_statistics);
private:

View File

@ -58,6 +58,8 @@ ObTableLoadStoreCtx::ObTableLoadStoreCtx(ObTableLoadTableCtx *ctx)
next_session_id_(0),
status_(ObTableLoadStatusType::NONE),
error_code_(OB_SUCCESS),
last_heart_beat_ts_(0),
enable_heart_beat_check_(false),
is_inited_(false)
{
}
@ -389,6 +391,16 @@ int ObTableLoadStoreCtx::check_status(ObTableLoadStatusType status) const
return ret;
}
void ObTableLoadStoreCtx::heart_beat()
{
last_heart_beat_ts_ = ObTimeUtil::current_time();
}
bool ObTableLoadStoreCtx::check_heart_beat_expired(const uint64_t expired_time_us)
{
return ObTimeUtil::current_time() > (last_heart_beat_ts_ + expired_time_us);
}
int ObTableLoadStoreCtx::get_wa_memory_limit(int64_t &wa_mem_limit)
{
int ret = OB_SUCCESS;

View File

@ -98,6 +98,13 @@ public:
int set_status_error(int error_code);
int set_status_abort();
int check_status(table::ObTableLoadStatusType status) const;
OB_INLINE bool enable_heart_beat_check() const { return enable_heart_beat_check_; }
OB_INLINE void set_enable_heart_beat_check(bool enable_heart_beat_check)
{
enable_heart_beat_check_ = enable_heart_beat_check;
}
void heart_beat();
bool check_heart_beat_expired(const uint64_t expired_time_us);
private:
int advance_status(table::ObTableLoadStatusType status);
public:
@ -183,6 +190,8 @@ private:
TransCtxMap trans_ctx_map_;
SegmentCtxMap segment_ctx_map_;
common::ObSEArray<ObTableLoadTransStore *, 64> committed_trans_store_array_;
uint64_t last_heart_beat_ts_;
bool enable_heart_beat_check_;
bool is_inited_;
};

View File

@ -37,6 +37,7 @@ public:
virtual void wait() = 0;
virtual int add_task(int64_t thread_idx, ObTableLoadTask *task) = 0;
virtual int64_t get_thread_count() const = 0;
virtual bool is_stopped() const = 0;
private:
DISALLOW_COPY_AND_ASSIGN(ObITableLoadTaskScheduler);
};
@ -61,6 +62,10 @@ public:
void wait() override;
int add_task(int64_t thread_idx, ObTableLoadTask *task) override;
int64_t get_thread_count() const override { return thread_count_; }
bool is_stopped() const override
{
return state_ == STATE_STOPPED || state_ == STATE_STOPPED_NO_WAIT;
}
private:
void run(uint64_t thread_idx);
int init_worker_ctx_array();

View File

@ -61,6 +61,7 @@ ObDirectLoadMultipleHeapTableCompactor::ObDirectLoadMultipleHeapTableCompactor()
index_entry_count_(0),
row_count_(0),
max_data_block_size_(0),
is_stop_(false),
is_inited_(false)
{
}
@ -242,7 +243,10 @@ int ObDirectLoadMultipleHeapTableCompactor::compact()
LOG_WARN("fail to init scan merge", KR(ret));
}
while (OB_SUCC(ret)) {
if (OB_FAIL(scan_merge.get_next_index(idx, tablet_index))) {
if (OB_UNLIKELY(is_stop_)) {
ret = OB_CANCELED;
LOG_WARN("compact canceled", KR(ret));
} else if (OB_FAIL(scan_merge.get_next_index(idx, tablet_index))) {
if (OB_UNLIKELY(OB_ITER_END != ret)) {
LOG_WARN("fail to get next index", KR(ret));
} else {
@ -338,6 +342,7 @@ int ObDirectLoadMultipleHeapTableCompactor::get_table(ObIDirectLoadPartitionTabl
void ObDirectLoadMultipleHeapTableCompactor::stop()
{
is_stop_ = true;
}
} // namespace storage

View File

@ -64,6 +64,7 @@ private:
common::ObArray<int64_t> base_data_fragment_idxs_;
common::ObArray<ObDirectLoadMultipleHeapTableDataFragment> data_fragments_;
ObDirectLoadTmpFileHandle compacted_index_file_handle_;
bool is_stop_;
bool is_inited_;
};

View File

@ -82,7 +82,10 @@ int ObDirectLoadPartitionMergeTask::process()
LOG_INFO("add sstable slice begin", K(target_tablet_id), K(parallel_idx_));
const ObDatumRow *datum_row = nullptr;
while (OB_SUCC(ret)) {
if (OB_FAIL(row_iter->get_next_row(datum_row))) {
if (OB_UNLIKELY(is_stop_)) {
ret = OB_CANCELED;
LOG_WARN("merge task canceled", KR(ret));
} else if (OB_FAIL(row_iter->get_next_row(datum_row))) {
if (OB_UNLIKELY(OB_ITER_END != ret)) {
LOG_WARN("fail to get next row", KR(ret));
} else {

View File

@ -62,7 +62,7 @@ protected:
int64_t affected_rows_;
common::ObArray<ObOptOSGColumnStat*> column_stat_array_;
common::ObArenaAllocator allocator_;
volatile bool is_stop_;
bool is_stop_;
bool is_inited_;
};