modify direct load heart beat start logic

parent 145e8598e6
commit 68bfc3cfa2
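In short: this commit drops the single coordinator-wide enable_heart_beat_ flag (set in begin(), cleared in commit()/abort_ctx()) and replaces it with a per-peer flag inside the new ObTableLoadCoordinatorCtx::StoreInfo struct. Heart beat for a peer is switched on only after confirm_begin succeeds on that peer, and switched off per peer once that peer commits or is observed stopped during abort. Below is a minimal standalone sketch of that lifecycle; StoreInfo's fields mirror the patch, while the Addr alias and the rpc stubs are hypothetical stand-ins, not the real OceanBase API.

// Minimal sketch of the per-peer heart beat lifecycle introduced here.
// StoreInfo mirrors the struct added to ObTableLoadCoordinatorCtx; Addr and
// the rpc stubs below are illustrative stand-ins only.
#include <string>
#include <vector>

using Addr = std::string; // stand-in for common::ObAddr

struct StoreInfo {
  Addr addr_;
  bool enable_heart_beat_ = false; // off until confirm_begin succeeds
};

// Stand-ins for the local call / TABLE_LOAD_CONTROL_RPC_CALL in the patch.
bool send_confirm_begin(const Addr &) { return true; }
bool send_commit(const Addr &) { return true; }

// confirm_begin_peers(): the flag turns on per peer, only on success,
// instead of one ctx-wide set_enable_heart_beat(true) in begin().
bool confirm_begin_peers(std::vector<StoreInfo> &stores) {
  for (StoreInfo &info : stores) {
    if (!send_confirm_begin(info.addr_)) {
      return false; // this peer never gets heart-beaten
    }
    info.enable_heart_beat_ = true;
  }
  return true;
}

// commit_peers(): each peer's flag is cleared as soon as that peer has
// committed, rather than clearing one global flag after all peers finish.
bool commit_peers(std::vector<StoreInfo> &stores) {
  for (StoreInfo &info : stores) {
    if (!send_commit(info.addr_)) {
      return false; // peers not yet committed keep their flag set
    }
    info.enable_heart_beat_ = false;
  }
  return true;
}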
@@ -112,13 +112,11 @@ void ObTableLoadCoordinator::abort_ctx(ObTableLoadTableCtx *ctx)
   if (OB_SUCCESS != (tmp_ret = ctx->coordinator_ctx_->set_status_abort())) {
     LOG_WARN("fail to set coordinator status abort", KR(tmp_ret));
   }
-  // 2. disable heart beat
-  ctx->coordinator_ctx_->set_enable_heart_beat(false);
-  // 3. mark all active trans abort
+  // 2. mark all active trans abort
   if (OB_SUCCESS != (tmp_ret = abort_active_trans(ctx))) {
     LOG_WARN("fail to abort active trans", KR(tmp_ret));
   }
-  // 4. abort peers ctx
+  // 3. abort peers ctx
   if (OB_SUCCESS != (tmp_ret = abort_peers_ctx(ctx))) {
     LOG_WARN("fail to abort peers ctx", KR(tmp_ret));
   }
@@ -156,14 +154,12 @@ int ObTableLoadCoordinator::abort_active_trans(ObTableLoadTableCtx *ctx)
 int ObTableLoadCoordinator::abort_peers_ctx(ObTableLoadTableCtx *ctx)
 {
   int ret = OB_SUCCESS;
-  ObTableLoadArray<ObAddr> all_addr_array;
-  if (OB_FAIL(ctx->coordinator_ctx_->partition_location_.get_all_leader(all_addr_array))) {
-    LOG_WARN("fail to get all addr", KR(ret));
-  } else {
-    LOG_INFO("route_abort_peer_request begin", K(all_addr_array.count()));
+  {
+    using StoreInfo = ObTableLoadCoordinatorCtx::StoreInfo;
+    LOG_INFO("route_abort_peer_request begin", K(ctx->coordinator_ctx_->store_infos_));
     static const int64_t max_retry_times = 100; // ensure store ctx detect heart beat timeout and abort
-    ObArray<ObAddr> addr_array1, addr_array2;
-    ObIArray<ObAddr> *curr_round = &addr_array1, *next_round = &addr_array2;
+    ObArray<StoreInfo *> addr_array1, addr_array2;
+    ObIArray<StoreInfo *> *curr_round = &addr_array1, *next_round = &addr_array2;
     int64_t running_cnt = 0;
     int64_t fail_cnt = 0;
     int64_t round = 0;
@@ -174,10 +170,10 @@ int ObTableLoadCoordinator::abort_peers_ctx(ObTableLoadTableCtx *ctx)
     addr_array2.set_tenant_id(MTL_ID());
     arg.table_id_ = ctx->param_.table_id_;
     arg.task_id_ = ctx->ddl_param_.task_id_;
-    for (int64_t i = 0; i < all_addr_array.count(); ++i) {
-      const ObAddr &addr = all_addr_array.at(i);
-      if (OB_FAIL(curr_round->push_back(addr))) {
-        LOG_WARN("fail to push back", KR(ret), K(addr));
+    for (int64_t i = 0; i < ctx->coordinator_ctx_->store_infos_.count(); ++i) {
+      StoreInfo *store_info = &(ctx->coordinator_ctx_->store_infos_.at(i));
+      if (OB_FAIL(curr_round->push_back(store_info))) {
+        LOG_WARN("fail to push back", KR(ret), K(*store_info));
       }
     }
     ObTableLoadIndexLongWait wait_obj(10 * 1000, WAIT_INTERVAL_US);
@@ -187,7 +183,8 @@ int ObTableLoadCoordinator::abort_peers_ctx(ObTableLoadTableCtx *ctx)
       running_cnt = 0;
       fail_cnt = 0;
       for (int64_t i = 0; i < curr_round->count(); ++i) {
-        const ObAddr &addr = curr_round->at(i);
+        StoreInfo *store_info = curr_round->at(i);
+        const ObAddr &addr = store_info->addr_;
         if (ObTableLoadUtils::is_local_addr(addr)) { // local
           ObTableLoadStore::abort_ctx(ctx, res.is_stopped_);
           ret = OB_SUCCESS;
@@ -200,6 +197,7 @@ int ObTableLoadCoordinator::abort_peers_ctx(ObTableLoadTableCtx *ctx)
         }
         if (OB_SUCC(ret) && res.is_stopped_) {
           // peer is stopped
+          store_info->enable_heart_beat_ = false;
         } else {
           if (OB_FAIL(ret)) {
             ++fail_cnt;
@@ -207,8 +205,8 @@ int ObTableLoadCoordinator::abort_peers_ctx(ObTableLoadTableCtx *ctx)
           } else {
             ++running_cnt;
           }
-          if (OB_FAIL(next_round->push_back(addr))) {
-            LOG_WARN("fail to push back", KR(ret));
+          if (OB_FAIL(next_round->push_back(store_info))) {
+            LOG_WARN("fail to push back", KR(ret), K(*store_info));
           }
         }
       }
@@ -290,11 +288,8 @@ int ObTableLoadCoordinator::gen_apply_arg(ObDirectLoadResourceApplyArg &apply_ar
   } else if (cluster_version < CLUSTER_VERSION_4_2_2_0 ||
              (cluster_version >= CLUSTER_VERSION_4_3_0_0 && cluster_version < CLUSTER_VERSION_4_3_1_0)) {
     // not support resource manage
-    if (OB_FAIL(ObTableLoadPartitionLocation::init_partition_location(coordinator_ctx_->partition_ids_,
-                                                                      coordinator_ctx_->target_partition_ids_,
-                                                                      coordinator_ctx_->partition_location_,
-                                                                      coordinator_ctx_->target_partition_location_))) {
-      LOG_WARN("fail to inner init partition location", KR(ret));
+    if (OB_FAIL(coordinator_ctx_->init_partition_location_and_store_infos())) {
+      LOG_WARN("fail to init partition location and store infos", KR(ret));
     } else {
       ctx_->param_.session_count_ = MAX(MIN(ctx_->param_.parallel_, (int64_t)tenant->unit_max_cpu() * 2), MIN_THREAD_COUNT);
       ctx_->param_.write_session_count_ = ctx_->param_.session_count_;
@@ -315,11 +310,8 @@ int ObTableLoadCoordinator::gen_apply_arg(ObDirectLoadResourceApplyArg &apply_ar
     LOG_WARN("fail to check status", KR(ret));
   } else if (OB_FAIL(coordinator_ctx_->exec_ctx_->check_status())) {
     LOG_WARN("fail to check status", KR(ret));
-  } else if (OB_FAIL(ObTableLoadPartitionLocation::init_partition_location(coordinator_ctx_->partition_ids_,
-                                                                           coordinator_ctx_->target_partition_ids_,
-                                                                           coordinator_ctx_->partition_location_,
-                                                                           coordinator_ctx_->target_partition_location_))) {
-    LOG_WARN("fail to inner init partition location", KR(ret));
+  } else if (OB_FAIL(coordinator_ctx_->init_partition_location_and_store_infos())) {
+    LOG_WARN("fail to init partition location and store infos", KR(ret));
   } else if (OB_FAIL(coordinator_ctx_->partition_location_.get_all_leader_info(all_leader_info_array))) {
     LOG_WARN("fail to get all leader info", KR(ret));
   } else if (OB_FAIL(ObTableLoadService::get_memory_limit(memory_limit))) {
@@ -705,16 +697,14 @@ private:
 int ObTableLoadCoordinator::confirm_begin_peers()
 {
   int ret = OB_SUCCESS;
-  ObTableLoadArray<ObAddr> all_addr_array;
-  if (OB_FAIL(coordinator_ctx_->partition_location_.get_all_leader(all_addr_array))) {
-    LOG_WARN("fail to get all addr", KR(ret));
-  } else {
-    LOG_INFO("route_confirm_begin_peer_request begin", K(all_addr_array.count()));
+  {
+    LOG_INFO("route_confirm_begin_peer_request begin", K(coordinator_ctx_->store_infos_));
     ObDirectLoadControlConfirmBeginArg arg;
     arg.table_id_ = param_.table_id_;
     arg.task_id_ = ctx_->ddl_param_.task_id_;
-    for (int64_t i = 0; OB_SUCC(ret) && i < all_addr_array.count(); ++i) {
-      const ObAddr &addr = all_addr_array.at(i);
+    for (int64_t i = 0; OB_SUCC(ret) && i < coordinator_ctx_->store_infos_.count(); ++i) {
+      ObTableLoadCoordinatorCtx::StoreInfo &store_info = coordinator_ctx_->store_infos_.at(i);
+      const ObAddr &addr = store_info.addr_;
       if (ObTableLoadUtils::is_local_addr(addr)) { // local
         ObTableLoadStore store(ctx_);
         if (OB_FAIL(store.init())) {
@@ -725,6 +715,9 @@ int ObTableLoadCoordinator::confirm_begin_peers()
       } else { // remote peer, send rpc
         TABLE_LOAD_CONTROL_RPC_CALL(confirm_begin, addr, arg);
       }
+      if (OB_SUCC(ret)) {
+        store_info.enable_heart_beat_ = true;
+      }
     }
   }
   return ret;
@@ -751,7 +744,6 @@ int ObTableLoadCoordinator::begin()
   } else if (OB_FAIL(confirm_begin_peers())) {
     LOG_WARN("fail to confirm begin peers", KR(ret));
   } else {
-    coordinator_ctx_->set_enable_heart_beat(true);
     if (OB_NOT_NULL(coordinator_ctx_->empty_insert_tablet_ctx_manager_)) {
       if (OB_FAIL(init_empty_tablets())) {
         LOG_WARN("fail to init empty partition", KR(ret));
@@ -773,18 +765,15 @@ int ObTableLoadCoordinator::begin()
 int ObTableLoadCoordinator::check_peers_begin_result(bool &is_finish)
 {
   int ret = OB_SUCCESS;
-  ObTableLoadArray<ObAddr> all_addr_array;
-  if (OB_FAIL(coordinator_ctx_->partition_location_.get_all_leader(all_addr_array))) {
-    LOG_WARN("fail to get all addr", KR(ret));
-  } else {
-    LOG_INFO("check_peers_begin_result begin", K(all_addr_array.count()));
+  {
+    LOG_INFO("check_peers_begin_result begin", K(coordinator_ctx_->store_infos_));
     ObDirectLoadControlGetStatusArg arg;
     ObDirectLoadControlGetStatusRes res;
     arg.table_id_ = param_.table_id_;
     arg.task_id_ = ctx_->ddl_param_.task_id_;
     is_finish = true;
-    for (int64_t i = 0; OB_SUCC(ret) && i < all_addr_array.count(); ++i) {
-      const ObAddr &addr = all_addr_array.at(i);
+    for (int64_t i = 0; OB_SUCC(ret) && i < coordinator_ctx_->store_infos_.count(); ++i) {
+      const ObAddr &addr = coordinator_ctx_->store_infos_.at(i).addr_;
       if (ObTableLoadUtils::is_local_addr(addr)) { // local
         ObTableLoadStore store(ctx_);
         if (OB_FAIL(store.init())) {
@@ -925,11 +914,8 @@ int ObTableLoadCoordinator::add_check_begin_result_task()
 int ObTableLoadCoordinator::pre_merge_peers()
 {
   int ret = OB_SUCCESS;
-  ObTableLoadArray<ObAddr> all_addr_array;
-  if (OB_FAIL(coordinator_ctx_->partition_location_.get_all_leader(all_addr_array))) {
-    LOG_WARN("fail to get all addr", KR(ret));
-  } else {
-    LOG_INFO("route_pre_merge_peer_request begin", K(all_addr_array.count()));
+  {
+    LOG_INFO("route_pre_merge_peer_request begin", K(coordinator_ctx_->store_infos_));
     ObArenaAllocator allocator("TLD_Coord");
     ObDirectLoadControlPreMergeArg arg;
     allocator.set_tenant_id(MTL_ID());
@@ -943,8 +929,8 @@ int ObTableLoadCoordinator::pre_merge_peers()
         lib::ob_sort(arg.committed_trans_id_array_.begin(), arg.committed_trans_id_array_.end());
       }
     }
-    for (int64_t i = 0; OB_SUCC(ret) && i < all_addr_array.count(); ++i) {
-      const ObAddr &addr = all_addr_array.at(i);
+    for (int64_t i = 0; OB_SUCC(ret) && i < coordinator_ctx_->store_infos_.count(); ++i) {
+      const ObAddr &addr = coordinator_ctx_->store_infos_.at(i).addr_;
       if (ObTableLoadUtils::is_local_addr(addr)) { // local
         ObTableLoadStore store(ctx_);
         if (OB_FAIL(store.init())) {
@@ -963,16 +949,13 @@ int ObTableLoadCoordinator::pre_merge_peers()
 int ObTableLoadCoordinator::start_merge_peers()
 {
   int ret = OB_SUCCESS;
-  ObTableLoadArray<ObAddr> all_addr_array;
-  if (OB_FAIL(coordinator_ctx_->partition_location_.get_all_leader(all_addr_array))) {
-    LOG_WARN("fail to get all addr", KR(ret));
-  } else {
-    LOG_INFO("route_start_merge_peer_request begin", K(all_addr_array.count()));
+  {
+    LOG_INFO("route_start_merge_peer_request begin", K(coordinator_ctx_->store_infos_));
     ObDirectLoadControlStartMergeArg arg;
     arg.table_id_ = param_.table_id_;
     arg.task_id_ = ctx_->ddl_param_.task_id_;
-    for (int64_t i = 0; OB_SUCC(ret) && i < all_addr_array.count(); ++i) {
-      const ObAddr &addr = all_addr_array.at(i);
+    for (int64_t i = 0; OB_SUCC(ret) && i < coordinator_ctx_->store_infos_.count(); ++i) {
+      const ObAddr &addr = coordinator_ctx_->store_infos_.at(i).addr_;
       if (ObTableLoadUtils::is_local_addr(addr)) { // local
         ObTableLoadStore store(ctx_);
         if (OB_FAIL(store.init())) {
@@ -1045,18 +1028,15 @@ int ObTableLoadCoordinator::finish()
 int ObTableLoadCoordinator::check_peers_merge_result(bool &is_finish)
 {
   int ret = OB_SUCCESS;
-  ObTableLoadArray<ObAddr> all_addr_array;
-  if (OB_FAIL(coordinator_ctx_->partition_location_.get_all_leader(all_addr_array))) {
-    LOG_WARN("fail to get all addr", KR(ret));
-  } else {
-    LOG_INFO("route_get_status_peer_request begin", K(all_addr_array.count()));
+  {
+    LOG_INFO("route_get_status_peer_request begin", K(coordinator_ctx_->store_infos_));
     ObDirectLoadControlGetStatusArg arg;
     ObDirectLoadControlGetStatusRes res;
     arg.table_id_ = param_.table_id_;
     arg.task_id_ = ctx_->ddl_param_.task_id_;
     is_finish = true;
-    for (int64_t i = 0; OB_SUCC(ret) && i < all_addr_array.count(); ++i) {
-      const ObAddr &addr = all_addr_array.at(i);
+    for (int64_t i = 0; OB_SUCC(ret) && i < coordinator_ctx_->store_infos_.count(); ++i) {
+      const ObAddr &addr = coordinator_ctx_->store_infos_.at(i).addr_;
       if (ObTableLoadUtils::is_local_addr(addr)) { // local
         ObTableLoadStore store(ctx_);
         if (OB_FAIL(store.init())) {
@@ -1199,20 +1179,18 @@ int ObTableLoadCoordinator::commit_peers(ObTableLoadSqlStatistics &sql_statistic
 {
   int ret = OB_SUCCESS;
   ObTransService *txs = nullptr;
-  ObTableLoadArray<ObAddr> all_addr_array;
   if (OB_ISNULL(MTL(ObTransService *))) {
     ret = OB_ERR_SYS;
     LOG_WARN("trans service is null", KR(ret));
-  } else if (OB_FAIL(coordinator_ctx_->partition_location_.get_all_leader(all_addr_array))) {
-    LOG_WARN("fail to get all addr", KR(ret));
   } else {
-    LOG_INFO("route_commit_peer_request begin", K(all_addr_array.count()));
+    LOG_INFO("route_commit_peer_request begin", K(coordinator_ctx_->store_infos_));
     ObDirectLoadControlCommitArg arg;
     ObDirectLoadControlCommitRes res;
     arg.table_id_ = param_.table_id_;
     arg.task_id_ = ctx_->ddl_param_.task_id_;
-    for (int64_t i = 0; OB_SUCC(ret) && i < all_addr_array.count(); ++i) {
-      const ObAddr &addr = all_addr_array.at(i);
+    for (int64_t i = 0; OB_SUCC(ret) && i < coordinator_ctx_->store_infos_.count(); ++i) {
+      ObTableLoadCoordinatorCtx::StoreInfo &store_info = coordinator_ctx_->store_infos_.at(i);
+      const ObAddr &addr = store_info.addr_;
       if (ObTableLoadUtils::is_local_addr(addr)) { // local
         ObTableLoadStore store(ctx_);
         if (OB_FAIL(store.init())) {
@@ -1236,8 +1214,10 @@ int ObTableLoadCoordinator::commit_peers(ObTableLoadSqlStatistics &sql_statistic
         } else if (OB_FAIL(dml_stats.merge(res.dml_stats_))) {
           LOG_WARN("fail to add result dml stats", KR(ret), K(addr), K(res));
         } else if (ObDirectLoadMethod::is_incremental(param_.method_) &&
-                   txs->add_tx_exec_result(*ctx_->session_info_->get_tx_desc(), res.trans_result_)) {
+                   OB_FAIL(txs->add_tx_exec_result(*ctx_->session_info_->get_tx_desc(), res.trans_result_))) {
           LOG_WARN("fail to add tx exec result", KR(ret));
+        } else {
+          store_info.enable_heart_beat_ = false;
         }
       }
     }
@@ -1379,7 +1359,6 @@ int ObTableLoadCoordinator::commit(ObTableLoadResultInfo &result_info)
     LOG_WARN("fail to check coordinator status", KR(ret));
   } else if (OB_FAIL(commit_peers(sql_statistics, dml_stats))) {
     LOG_WARN("fail to commit peers", KR(ret));
-  } else if (FALSE_IT(coordinator_ctx_->set_enable_heart_beat(false))) {
   } else if (param_.online_opt_stat_gather_ &&
              OB_FAIL(write_sql_stat(sql_statistics, dml_stats))) {
     LOG_WARN("fail to write sql stat", KR(ret));
@@ -1416,16 +1395,13 @@ int ObTableLoadCoordinator::get_status(ObTableLoadStatusType &status, int &error
 int ObTableLoadCoordinator::heart_beat_peer()
 {
   int ret = OB_SUCCESS;
-  ObTableLoadArray<ObAddr> all_addr_array;
-  if (OB_FAIL(coordinator_ctx_->partition_location_.get_all_leader(all_addr_array))) {
-    LOG_WARN("fail to get all addr", KR(ret));
-  } else {
-    LOG_DEBUG("route_heart_beat_peer_request begin", K(all_addr_array.count()));
+  {
+    LOG_DEBUG("route_heart_beat_peer_request begin", K(coordinator_ctx_->store_infos_));
     ObDirectLoadControlHeartBeatArg arg;
     arg.table_id_ = param_.table_id_;
     arg.task_id_ = ctx_->ddl_param_.task_id_;
-    for (int64_t i = 0; i < all_addr_array.count(); ++i) {
-      const ObAddr &addr = all_addr_array.at(i);
+    for (int64_t i = 0; i < coordinator_ctx_->store_infos_.count(); ++i) {
+      const ObAddr &addr = coordinator_ctx_->store_infos_.at(i).addr_;
       if (ObTableLoadUtils::is_local_addr(addr)) { // local
         ObTableLoadStore store(ctx_);
         if (OB_FAIL(store.init())) {
@@ -1464,18 +1440,15 @@ int ObTableLoadCoordinator::heart_beat()
 int ObTableLoadCoordinator::pre_start_trans_peers(ObTableLoadCoordinatorTrans *trans)
 {
   int ret = OB_SUCCESS;
-  ObTableLoadArray<ObAddr> all_addr_array;
-  if (OB_FAIL(coordinator_ctx_->partition_location_.get_all_leader(all_addr_array))) {
-    LOG_WARN("fail to get all addr", KR(ret));
-  } else {
-    LOG_INFO("route_pre_start_trans_peer_request begin", K(all_addr_array.count()));
+  {
+    LOG_INFO("route_pre_start_trans_peer_request begin", K(coordinator_ctx_->store_infos_));
     const ObTableLoadTransId &trans_id = trans->get_trans_id();
     ObDirectLoadControlPreStartTransArg arg;
     arg.table_id_ = param_.table_id_;
     arg.task_id_ = ctx_->ddl_param_.task_id_;
     arg.trans_id_ = trans_id;
-    for (int64_t i = 0; OB_SUCC(ret) && i < all_addr_array.count(); ++i) {
-      const ObAddr &addr = all_addr_array.at(i);
+    for (int64_t i = 0; OB_SUCC(ret) && i < coordinator_ctx_->store_infos_.count(); ++i) {
+      const ObAddr &addr = coordinator_ctx_->store_infos_.at(i).addr_;
       if (ObTableLoadUtils::is_local_addr(addr)) { // local
         ObTableLoadStore store(ctx_);
         if (OB_FAIL(store.init())) {
@@ -1494,18 +1467,15 @@ int ObTableLoadCoordinator::pre_start_trans_peers(ObTableLoadCoordinatorTrans *t
 int ObTableLoadCoordinator::confirm_start_trans_peers(ObTableLoadCoordinatorTrans *trans)
 {
   int ret = OB_SUCCESS;
-  ObTableLoadArray<ObAddr> all_addr_array;
-  if (OB_FAIL(coordinator_ctx_->partition_location_.get_all_leader(all_addr_array))) {
-    LOG_WARN("fail to get all addr", KR(ret));
-  } else {
-    LOG_INFO("route_confirm_start_trans_peer_request begin", K(all_addr_array.count()));
+  {
+    LOG_INFO("route_confirm_start_trans_peer_request begin", K(coordinator_ctx_->store_infos_));
     const ObTableLoadTransId &trans_id = trans->get_trans_id();
     ObDirectLoadControlConfirmStartTransArg arg;
     arg.table_id_ = param_.table_id_;
     arg.task_id_ = ctx_->ddl_param_.task_id_;
     arg.trans_id_ = trans_id;
-    for (int64_t i = 0; OB_SUCC(ret) && i < all_addr_array.count(); ++i) {
-      const ObAddr &addr = all_addr_array.at(i);
+    for (int64_t i = 0; OB_SUCC(ret) && i < coordinator_ctx_->store_infos_.count(); ++i) {
+      const ObAddr &addr = coordinator_ctx_->store_infos_.at(i).addr_;
       if (ObTableLoadUtils::is_local_addr(addr)) { // local
         ObTableLoadStore store(ctx_);
         if (OB_FAIL(store.init())) {
@@ -1577,18 +1547,15 @@ int ObTableLoadCoordinator::start_trans(const ObTableLoadSegmentID &segment_id,
 int ObTableLoadCoordinator::pre_finish_trans_peers(ObTableLoadCoordinatorTrans *trans)
 {
   int ret = OB_SUCCESS;
-  ObTableLoadArray<ObAddr> all_addr_array;
-  if (OB_FAIL(coordinator_ctx_->partition_location_.get_all_leader(all_addr_array))) {
-    LOG_WARN("fail to get all addr", KR(ret));
-  } else {
-    LOG_INFO("route_pre_finish_trans_peer_request begin", K(all_addr_array.count()));
+  {
+    LOG_INFO("route_pre_finish_trans_peer_request begin", K(coordinator_ctx_->store_infos_));
     const ObTableLoadTransId &trans_id = trans->get_trans_id();
     ObDirectLoadControlPreFinishTransArg arg;
     arg.table_id_ = param_.table_id_;
     arg.task_id_ = ctx_->ddl_param_.task_id_;
     arg.trans_id_ = trans_id;
-    for (int64_t i = 0; OB_SUCC(ret) && i < all_addr_array.count(); ++i) {
-      const ObAddr &addr = all_addr_array.at(i);
+    for (int64_t i = 0; OB_SUCC(ret) && i < coordinator_ctx_->store_infos_.count(); ++i) {
+      const ObAddr &addr = coordinator_ctx_->store_infos_.at(i).addr_;
       if (ObTableLoadUtils::is_local_addr(addr)) { // local
         ObTableLoadStore store(ctx_);
         if (OB_FAIL(store.init())) {
@@ -1607,18 +1574,15 @@ int ObTableLoadCoordinator::pre_finish_trans_peers(ObTableLoadCoordinatorTrans *
 int ObTableLoadCoordinator::confirm_finish_trans_peers(ObTableLoadCoordinatorTrans *trans)
 {
   int ret = OB_SUCCESS;
-  ObTableLoadArray<ObAddr> all_addr_array;
-  if (OB_FAIL(coordinator_ctx_->partition_location_.get_all_leader(all_addr_array))) {
-    LOG_WARN("fail to get all addr", KR(ret));
-  } else {
-    LOG_INFO("route_pre_finish_trans_peer_request begin", K(all_addr_array.count()));
+  {
+    LOG_INFO("route_pre_finish_trans_peer_request begin", K(coordinator_ctx_->store_infos_));
     const ObTableLoadTransId &trans_id = trans->get_trans_id();
     ObDirectLoadControlConfirmFinishTransArg arg;
     arg.table_id_ = param_.table_id_;
     arg.task_id_ = ctx_->ddl_param_.task_id_;
     arg.trans_id_ = trans_id;
-    for (int64_t i = 0; OB_SUCC(ret) && i < all_addr_array.count(); ++i) {
-      const ObAddr &addr = all_addr_array.at(i);
+    for (int64_t i = 0; OB_SUCC(ret) && i < coordinator_ctx_->store_infos_.count(); ++i) {
+      const ObAddr &addr = coordinator_ctx_->store_infos_.at(i).addr_;
       if (ObTableLoadUtils::is_local_addr(addr)) { // local
         ObTableLoadStore store(ctx_);
         if (OB_FAIL(store.init())) {
@@ -1663,19 +1627,16 @@ int ObTableLoadCoordinator::finish_trans(const ObTableLoadTransId &trans_id)
 int ObTableLoadCoordinator::check_peers_trans_commit(ObTableLoadCoordinatorTrans *trans, bool &is_commit)
 {
   int ret = OB_SUCCESS;
-  ObTableLoadArray<ObAddr> all_addr_array;
-  if (OB_FAIL(coordinator_ctx_->partition_location_.get_all_leader(all_addr_array))) {
-    LOG_WARN("fail to get all addr", KR(ret));
-  } else {
-    LOG_INFO("route_check_peers_trans_commit begin", K(all_addr_array.count()));
+  {
+    LOG_INFO("route_check_peers_trans_commit begin", K(coordinator_ctx_->store_infos_));
     ObDirectLoadControlGetTransStatusArg arg;
     ObDirectLoadControlGetTransStatusRes res;
     arg.table_id_ = param_.table_id_;
     arg.task_id_ = ctx_->ddl_param_.task_id_;
    arg.trans_id_ = trans->get_trans_id();
     is_commit = true;
-    for (int64_t i = 0; OB_SUCC(ret) && i < all_addr_array.count(); ++i) {
-      const ObAddr &addr = all_addr_array.at(i);
+    for (int64_t i = 0; OB_SUCC(ret) && i < coordinator_ctx_->store_infos_.count(); ++i) {
+      const ObAddr &addr = coordinator_ctx_->store_infos_.at(i).addr_;
       if (ObTableLoadUtils::is_local_addr(addr)) { // local
         ObTableLoadStore store(ctx_);
         if (OB_FAIL(store.init())) {
@@ -1774,18 +1735,15 @@ int ObTableLoadCoordinator::commit_trans(ObTableLoadCoordinatorTrans *trans)
 int ObTableLoadCoordinator::abandon_trans_peers(ObTableLoadCoordinatorTrans *trans)
 {
   int ret = OB_SUCCESS;
-  ObTableLoadArray<ObAddr> all_addr_array;
-  if (OB_FAIL(coordinator_ctx_->partition_location_.get_all_leader(all_addr_array))) {
-    LOG_WARN("fail to get all addr", KR(ret));
-  } else {
-    LOG_INFO("route_abandon_trans_peer_request begin", K(all_addr_array.count()));
+  {
+    LOG_INFO("route_abandon_trans_peer_request begin", K(coordinator_ctx_->store_infos_));
     const ObTableLoadTransId &trans_id = trans->get_trans_id();
     ObDirectLoadControlAbandonTransArg arg;
     arg.table_id_ = param_.table_id_;
     arg.task_id_ = ctx_->ddl_param_.task_id_;
     arg.trans_id_ = trans_id;
-    for (int64_t i = 0; OB_SUCC(ret) && i < all_addr_array.count(); ++i) {
-      const ObAddr &addr = all_addr_array.at(i);
+    for (int64_t i = 0; OB_SUCC(ret) && i < coordinator_ctx_->store_infos_.count(); ++i) {
+      const ObAddr &addr = coordinator_ctx_->store_infos_.at(i).addr_;
       if (ObTableLoadUtils::is_local_addr(addr)) { // local
         ObTableLoadStore store(ctx_);
         if (OB_FAIL(store.init())) {
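abort_peers_ctx keeps its round-based retry, now over StoreInfo pointers: peers whose rpc failed or that are still running are carried into the next round, bounded by max_retry_times so an unreachable store eventually aborts itself via heart beat timeout, while a peer that reports is_stopped_ gets its heart beat flag cleared. A generic sketch of that two-array round swap, with hypothetical Peer/try_abort stand-ins:

// Generic sketch of the bounded round-based retry in abort_peers_ctx.
// Peer and try_abort are hypothetical stand-ins; the real code retries
// StoreInfo pointers and sleeps between rounds (the wait_obj in the patch).
#include <utility>
#include <vector>

struct Peer {
  bool enable_heart_beat = true;
};

// Stand-in for the local abort_ctx call or the abort rpc; returns true
// once the peer reports it has stopped (res.is_stopped_ in the patch).
bool try_abort(Peer &p) {
  (void)p;
  return true; // placeholder: real code issues the abort and checks the result
}

void abort_rounds(std::vector<Peer *> peers, const int max_retry_times) {
  std::vector<Peer *> curr_round = std::move(peers);
  std::vector<Peer *> next_round;
  for (int round = 0; round < max_retry_times && !curr_round.empty(); ++round) {
    next_round.clear();
    for (Peer *p : curr_round) {
      if (try_abort(*p)) {
        p->enable_heart_beat = false; // peer stopped: stop heart-beating it
      } else {
        next_round.push_back(p); // still running or rpc failed: retry
      }
    }
    std::swap(curr_round, next_round);
  }
  // peers still in curr_round are left to heart beat timeout on their side
}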

@@ -46,7 +46,6 @@ ObTableLoadCoordinatorCtx::ObTableLoadCoordinatorCtx(ObTableLoadTableCtx *ctx)
     next_session_id_(0),
     status_(ObTableLoadStatusType::NONE),
     error_code_(OB_SUCCESS),
-    enable_heart_beat_(false),
     is_inited_(false)
 {
   allocator_.set_tenant_id(MTL_ID());
@@ -594,6 +593,31 @@ int ObTableLoadCoordinatorCtx::init_partition_ids(const ObIArray<ObTabletID> &ta
   return ret;
 }
 
+int ObTableLoadCoordinatorCtx::init_partition_location_and_store_infos()
+{
+  int ret = OB_SUCCESS;
+  store_infos_.reset();
+  ObTableLoadArray<ObAddr> all_addr_array;
+  if (OB_FAIL(ObTableLoadPartitionLocation::init_partition_location(partition_ids_,
+                                                                    target_partition_ids_,
+                                                                    partition_location_,
+                                                                    target_partition_location_))) {
+    LOG_WARN("fail to init partition location", KR(ret));
+  } else if (OB_FAIL(partition_location_.get_all_leader(all_addr_array))) {
+    LOG_WARN("fail to get all leader", KR(ret));
+  } else {
+    for (int64_t i = 0; OB_SUCC(ret) && i < all_addr_array.count(); ++i) {
+      StoreInfo store_info;
+      store_info.addr_ = all_addr_array.at(i);
+      store_info.enable_heart_beat_ = false;
+      if (OB_FAIL(store_infos_.push_back(store_info))) {
+        LOG_WARN("fail to push back store info", KR(ret));
+      }
+    }
+  }
+  return ret;
+}
+
 int ObTableLoadCoordinatorCtx::init_empty_insert_tablet_ctx_manager()
 {
   int ret = OB_SUCCESS;

@@ -93,11 +93,6 @@ public:
   int set_status_error(int error_code);
   int set_status_abort();
   int check_status(table::ObTableLoadStatusType status) const;
-  OB_INLINE bool enable_heart_beat() const { return enable_heart_beat_; }
-  OB_INLINE void set_enable_heart_beat(bool enable_heart_beat)
-  {
-    enable_heart_beat_ = enable_heart_beat;
-  }
 private:
   int advance_status(table::ObTableLoadStatusType status);
 public:
@@ -118,6 +113,7 @@ public:
   int check_exist_trans(bool &is_exist) const;
   int check_exist_committed_trans(bool &is_exist) const;
   int init_complete();
+  int init_partition_location_and_store_infos();
 private:
   int alloc_trans_ctx(const table::ObTableLoadTransId &trans_id, ObTableLoadTransCtx *&trans_ctx);
   int alloc_trans(const table::ObTableLoadSegmentID &segment_id,
@@ -154,6 +150,15 @@ public:
     share::AutoincParam autoinc_param_;
   };
   SessionContext *session_ctx_array_;
+  struct StoreInfo
+  {
+    StoreInfo() : enable_heart_beat_(false) {}
+    ~StoreInfo() {}
+    ObAddr addr_;
+    bool enable_heart_beat_;
+    TO_STRING_KV(K_(addr), K_(enable_heart_beat));
+  };
+  common::ObArray<StoreInfo> store_infos_;
 private:
   struct SegmentCtx : public common::LinkHashValue<table::ObTableLoadSegmentID>
   {
@@ -186,7 +191,6 @@ private:
   TransCtxMap trans_ctx_map_;
   SegmentCtxMap segment_ctx_map_;
   common::ObArray<ObTableLoadTransCtx *> commited_trans_ctx_array_;
-  bool enable_heart_beat_;
   bool is_inited_;
 };

@@ -69,8 +69,7 @@ void ObTableLoadService::ObHeartBeatTask::runTimerTask()
       }
       for (int64_t i = 0; i < table_ctx_array.count(); ++i) {
         ObTableLoadTableCtx *table_ctx = table_ctx_array.at(i);
-        if (nullptr != table_ctx->coordinator_ctx_ &&
-            table_ctx->coordinator_ctx_->enable_heart_beat()) {
+        if (nullptr != table_ctx->coordinator_ctx_) {
           ObTableLoadCoordinator coordinator(table_ctx);
           if (OB_FAIL(coordinator.init())) {
             LOG_WARN("fail to init coordinator", KR(ret));
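Finally, the service-level timer task no longer consults a coordinator-wide flag: any table ctx that has a coordinator ctx gets a heart beat pass, and the per-peer StoreInfo state inside the coordinator carries what that pass does. A rough sketch with illustrative types:

// Rough sketch of the revised runTimerTask loop: the ctx-wide
// enable_heart_beat() gate is gone; per-peer flags now carry that state.
// CoordinatorCtx, TableCtx, and heart_beat are illustrative stand-ins.
#include <vector>

struct CoordinatorCtx; // opaque for the sketch

struct TableCtx {
  CoordinatorCtx *coordinator_ctx_ = nullptr; // set only on the coordinator node
};

// Stand-in for ObTableLoadCoordinator::init() + heart_beat().
void heart_beat(TableCtx &table_ctx) { (void)table_ctx; }

void run_timer_task(std::vector<TableCtx *> &table_ctx_array) {
  for (TableCtx *table_ctx : table_ctx_array) {
    if (nullptr != table_ctx->coordinator_ctx_) { // previously also required enable_heart_beat()
      heart_beat(*table_ctx);
    }
  }
}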