Add transfer event to __all_server_event_history, record transfer task info.
This commit is contained in:
@ -539,6 +539,7 @@ class ObString;
|
||||
ACT(BEFORE_CHECK_SHRINK_RESOURCE_POOL,)\
|
||||
ACT(STOP_RECOVERY_LS_THREAD0,)\
|
||||
ACT(STOP_RECOVERY_LS_THREAD1,)\
|
||||
ACT(STOP_TRANSFER_LS_LOGICAL_TABLE_REPLACED,)\
|
||||
ACT(MAX_DEBUG_SYNC_POINT,)
|
||||
|
||||
DECLARE_ENUM(ObDebugSyncPoint, debug_sync_point, OB_DEBUG_SYNC_POINT_DEF);
|
||||
|
@ -22,6 +22,7 @@
|
||||
#include "storage/high_availability/ob_transfer_service.h"
|
||||
#include "storage/high_availability/ob_transfer_lock_utils.h"
|
||||
#include "storage/tablet/ob_tablet.h"
|
||||
#include "observer/ob_server_event_history_table_operator.h"
|
||||
|
||||
using namespace oceanbase::common;
|
||||
using namespace oceanbase::share;
|
||||
@ -78,14 +79,14 @@ int ObTxFinishTransfer::init(const ObTransferTaskID &task_id, const uint64_t ten
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTxFinishTransfer::process()
|
||||
int ObTxFinishTransfer::process(int64_t &round)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("tx finish transfer do not init", K(ret));
|
||||
} else if (OB_FAIL(do_tx_transfer_doing_(task_id_, tenant_id_, src_ls_id_, dest_ls_id_))) {
|
||||
LOG_WARN("failed to do tx transfer doing", K(ret), K_(task_id), K_(tenant_id), K_(src_ls_id), K_(dest_ls_id));
|
||||
} else if (OB_FAIL(do_tx_transfer_doing_(task_id_, tenant_id_, src_ls_id_, dest_ls_id_, round))) {
|
||||
LOG_WARN("failed to do tx transfer doing", K(ret), K_(task_id), K_(tenant_id), K_(src_ls_id), K_(dest_ls_id), K(round));
|
||||
} else {
|
||||
LOG_INFO("process tx finish transfer", K_(task_id), K_(tenant_id), K_(src_ls_id), K_(dest_ls_id));
|
||||
}
|
||||
@ -93,7 +94,7 @@ int ObTxFinishTransfer::process()
|
||||
}
|
||||
|
||||
int ObTxFinishTransfer::do_tx_transfer_doing_(const ObTransferTaskID &task_id, const uint64_t tenant_id,
|
||||
const share::ObLSID &src_ls_id, const share::ObLSID &dest_ls_id)
|
||||
const share::ObLSID &src_ls_id, const share::ObLSID &dest_ls_id, int64_t &round)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
@ -119,6 +120,7 @@ int ObTxFinishTransfer::do_tx_transfer_doing_(const ObTransferTaskID &task_id, c
|
||||
ObDisplayTabletList table_lock_tablet_list;
|
||||
transaction::tablelock::ObTableLockOwnerID lock_owner_id;
|
||||
ObTimeoutCtx timeout_ctx;
|
||||
const int64_t tmp_round = round;
|
||||
if (!task_id.is_valid() || OB_INVALID_ID == tenant_id || !src_ls_id.is_valid() || !dest_ls_id.is_valid()) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("get invalid args", K(ret), K(task_id), K(tenant_id), K(src_ls_id), K(dest_ls_id));
|
||||
@ -302,6 +304,8 @@ int ObTxFinishTransfer::do_tx_transfer_doing_(const ObTransferTaskID &task_id, c
|
||||
if (OB_SUCCESS == ret) {
|
||||
ret = tmp_ret;
|
||||
}
|
||||
} else if (is_commit) {
|
||||
round = 0;
|
||||
}
|
||||
// 11. After the dest_ls leader succeeds,
|
||||
// it will report the corresponding results to RS.
|
||||
@ -312,6 +316,9 @@ int ObTxFinishTransfer::do_tx_transfer_doing_(const ObTransferTaskID &task_id, c
|
||||
}
|
||||
}
|
||||
}
|
||||
if (OB_TMP_FAIL(record_server_event_(ret, is_ready, tmp_round))) {
|
||||
LOG_WARN("failed to record server event", K(tmp_ret), K(ret), K(is_ready));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -1092,5 +1099,58 @@ int ObTxFinishTransfer::select_transfer_task_for_update_(const ObTransferTaskID
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTxFinishTransfer::record_server_event_(
|
||||
const int32_t result,
|
||||
const bool is_ready,
|
||||
const int64_t round) const
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObSqlString extra_info_str;
|
||||
const share::ObTransferStatus doing_status(ObTransferStatus::DOING);
|
||||
const share::ObTransferStatus finish_status(ObTransferStatus::COMPLETED);
|
||||
if (OB_SUCCESS == result) {
|
||||
if (is_ready) {
|
||||
if (OB_FAIL(extra_info_str.append_fmt("msg:\"transfer doing success\";"))) {
|
||||
LOG_WARN("fail to printf wait info", K(ret));
|
||||
}
|
||||
} else {
|
||||
if (OB_FAIL(extra_info_str.append_fmt("msg:\"wait for ls logical table replaced\";"))) {
|
||||
LOG_WARN("fail to printf wait info", K(ret));
|
||||
}
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
if (OB_FAIL(extra_info_str.append_fmt("round:%ld;", round))) {
|
||||
LOG_WARN("fail to printf retry time", K(ret));
|
||||
} else {
|
||||
if (OB_SUCCESS == result && is_ready) {
|
||||
if (OB_FAIL(write_server_event_(result, extra_info_str, finish_status))) {
|
||||
LOG_WARN("fail to write server event", K(ret), K(result), K(extra_info_str));
|
||||
}
|
||||
} else {
|
||||
if (REACH_TIME_INTERVAL(10 * 1000 * 1000)) {
|
||||
if (OB_FAIL(write_server_event_(result, extra_info_str, doing_status))) {
|
||||
LOG_WARN("fail to write server event", K(ret), K(result), K(extra_info_str));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTxFinishTransfer::write_server_event_(const int32_t result, const ObSqlString &extra_info, const share::ObTransferStatus &status) const
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
SERVER_EVENT_ADD("storage_ha", "transfer",
|
||||
"tenant_id", tenant_id_,
|
||||
"trace_id", *ObCurTraceId::get_trace_id(),
|
||||
"src_ls", src_ls_id_.id(),
|
||||
"dest_ls", dest_ls_id_.id(),
|
||||
"status", status.str(),
|
||||
"result", result,
|
||||
extra_info.ptr());
|
||||
return ret;
|
||||
}
|
||||
} // namespace storage
|
||||
} // namespace oceanbase
|
||||
|
@ -35,11 +35,11 @@ public:
|
||||
virtual ~ObTxFinishTransfer();
|
||||
int init(const share::ObTransferTaskID &task_id, const uint64_t tenant_id, const share::ObLSID &src_ls_id,
|
||||
const share::ObLSID &dest_ls_id, common::ObMySQLProxy &sql_proxy);
|
||||
int process();
|
||||
int process(int64_t &round);
|
||||
|
||||
private:
|
||||
int do_tx_transfer_doing_(const share::ObTransferTaskID &task_id, const uint64_t tenant_id,
|
||||
const share::ObLSID &src_ls_id, const share::ObLSID &dest_ls_id);
|
||||
const share::ObLSID &src_ls_id, const share::ObLSID &dest_ls_id, int64_t &round);
|
||||
|
||||
// unlock both src and dest ls member list
|
||||
// @param[in]: tenant_id
|
||||
@ -277,6 +277,15 @@ private:
|
||||
// @param[in]: trans
|
||||
int select_transfer_task_for_update_(const share::ObTransferTaskID &task_id, ObMySQLTransaction &trans);
|
||||
|
||||
int record_server_event_(
|
||||
const int32_t result,
|
||||
const bool is_ready,
|
||||
const int64_t round) const;
|
||||
int write_server_event_(
|
||||
const int32_t result,
|
||||
const ObSqlString &extra_info,
|
||||
const share::ObTransferStatus &status) const;
|
||||
|
||||
private:
|
||||
static const int64_t DEFAULT_WAIT_INTERVAL_US = 10 * 1000; // 10ms
|
||||
|
||||
|
@ -638,6 +638,7 @@ int ObTabletBackfillTXTask::get_backfill_tx_minor_sstables_(
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObTableStoreIterator minor_table_iter;
|
||||
DEBUG_SYNC(STOP_TRANSFER_LS_LOGICAL_TABLE_REPLACED);
|
||||
if (!is_inited_) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("tablet backfill tx task do not init", K(ret));
|
||||
|
@ -57,7 +57,8 @@ ObTransferHandler::ObTransferHandler()
|
||||
storage_rpc_(nullptr),
|
||||
sql_proxy_(nullptr),
|
||||
retry_count_(0),
|
||||
transfer_worker_mgr_()
|
||||
transfer_worker_mgr_(),
|
||||
round_(0)
|
||||
{
|
||||
}
|
||||
|
||||
@ -262,6 +263,7 @@ int ObTransferHandler::do_leader_transfer_()
|
||||
LOG_WARN("failed to get transfer task", K(ret), KPC(ls_));
|
||||
}
|
||||
} else {
|
||||
round_++;
|
||||
ObCurTraceId::set(task_info.trace_id_);
|
||||
|
||||
switch (task_info.status_) {
|
||||
@ -405,6 +407,9 @@ int ObTransferHandler::do_with_start_status_(const share::ObTransferTaskInfo &ta
|
||||
LOG_WARN("failed to report to meta table", K(ret), K(task_info));
|
||||
}
|
||||
}
|
||||
if (OB_SUCCESS != (tmp_ret = record_server_event_(ret, round_, task_info))) {
|
||||
LOG_WARN("failed to record server event", K(tmp_ret), K(ret), K(retry_count_), K(task_info));
|
||||
}
|
||||
wakeup_();
|
||||
LOG_INFO("[TRANSFER] finish do with start status", K(ret), K(task_info), "cost_ts", ObTimeUtil::current_time() - start_ts);
|
||||
return ret;
|
||||
@ -1501,6 +1506,7 @@ int ObTransferHandler::report_to_meta_table_(
|
||||
int ObTransferHandler::do_with_doing_status_(const share::ObTransferTaskInfo &task_info)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
ObTxFinishTransfer finish_transfer;
|
||||
const ObTransferTaskID task_id = task_info.task_id_;
|
||||
const uint64_t tenant_id = task_info.tenant_id_;
|
||||
@ -1513,7 +1519,7 @@ int ObTransferHandler::do_with_doing_status_(const share::ObTransferTaskInfo &ta
|
||||
} else if (OB_FAIL(finish_transfer.init(
|
||||
task_id, tenant_id, src_ls_id, dest_ls_id, *sql_proxy_))) {
|
||||
LOG_INFO("[TRANSFER] do with doing status", K(task_info));
|
||||
} else if (OB_FAIL(finish_transfer.process())) {
|
||||
} else if (OB_FAIL(finish_transfer.process(round_))) {
|
||||
LOG_WARN("failed to process", K(ret));
|
||||
}
|
||||
|
||||
@ -1535,7 +1541,7 @@ int ObTransferHandler::do_with_aborted_status_(
|
||||
const share::ObTransferStatus next_status(ObTransferStatus::FAILED);
|
||||
ObTimeoutCtx timeout_ctx;
|
||||
ObMySQLTransaction trans;
|
||||
|
||||
const int64_t tmp_round = round_;
|
||||
if (!is_inited_) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("transfer handler do not init", K(ret));
|
||||
@ -1563,6 +1569,8 @@ int ObTransferHandler::do_with_aborted_status_(
|
||||
if (OB_SUCCESS == ret) {
|
||||
ret = tmp_ret;
|
||||
}
|
||||
} else if (OB_SUCCESS == ret) {
|
||||
round_ = 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1570,8 +1578,15 @@ int ObTransferHandler::do_with_aborted_status_(
|
||||
if (OB_FAIL(ret)) {
|
||||
if (can_retry_(task_info, ret)) {
|
||||
LOG_INFO("transfer task can retry", K(ret), K(task_info));
|
||||
if (REACH_TIME_INTERVAL(10 * 1000 * 1000)) {
|
||||
if (OB_SUCCESS != (tmp_ret = record_server_event_(ret, tmp_round, task_info))) {
|
||||
LOG_WARN("failed to record server event", K(tmp_ret), K(ret), K(retry_count_), K(task_info));
|
||||
}
|
||||
}
|
||||
ob_usleep(INTERVAL_US);
|
||||
}
|
||||
} else if (OB_SUCCESS != (tmp_ret = record_server_event_(ret, tmp_round, task_info))) {
|
||||
LOG_WARN("failed to record server event", K(tmp_ret), K(ret), K(retry_count_), K(task_info));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
@ -1695,6 +1710,28 @@ int ObTransferHandler::unblock_tx_(
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTransferHandler::record_server_event_(const int32_t result, const int64_t round, const share::ObTransferTaskInfo &task_info) const
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObSqlString extra_info_str;
|
||||
if (!task_info.is_valid()) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("task info invalid", K(ret), K(task_info));
|
||||
} else if (OB_FAIL(extra_info_str.append_fmt("round:%ld;", round))) {
|
||||
LOG_WARN("fail to printf round", K(ret), K(round));
|
||||
} else {
|
||||
SERVER_EVENT_ADD("storage_ha", "transfer",
|
||||
"tenant_id", task_info.tenant_id_,
|
||||
"trace_id", task_info.trace_id_,
|
||||
"src_ls", task_info.src_ls_id_.id(),
|
||||
"dest_ls", task_info.dest_ls_id_.id(),
|
||||
"status", task_info.status_.str(),
|
||||
"result", result,
|
||||
extra_info_str.ptr());
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -201,6 +201,7 @@ private:
|
||||
int get_gts_(
|
||||
const uint64_t tenant_id,
|
||||
const share::ObLSID &ls_id);
|
||||
int record_server_event_(const int32_t ret, const int64_t round, const share::ObTransferTaskInfo &task_info) const;
|
||||
private:
|
||||
static const int64_t INTERVAL_US = 1 * 1000 * 1000; //1s
|
||||
static const int64_t KILL_TX_MAX_RETRY_TIMES = 3;
|
||||
@ -214,6 +215,7 @@ private:
|
||||
|
||||
int64_t retry_count_;
|
||||
ObTransferWorkerMgr transfer_worker_mgr_;
|
||||
int64_t round_;
|
||||
DISALLOW_COPY_AND_ASSIGN(ObTransferHandler);
|
||||
};
|
||||
}
|
||||
|
Reference in New Issue
Block a user