22x Fix remote update pg backup task timeout bug
This commit is contained in:

committed by
LINxiansheng

parent
f700a428f4
commit
ae446e60cf
@ -47,9 +47,11 @@ int ObPGBackupTaskUpdater::update_pg_backup_task_status(
|
|||||||
} else {
|
} else {
|
||||||
while (OB_SUCC(ret) && report_idx < pkeys.count()) {
|
while (OB_SUCC(ret) && report_idx < pkeys.count()) {
|
||||||
ObMySQLTransaction trans;
|
ObMySQLTransaction trans;
|
||||||
|
ObTimeoutCtx timeout_ctx;
|
||||||
const int64_t remain_cnt = pkeys.count() - report_idx;
|
const int64_t remain_cnt = pkeys.count() - report_idx;
|
||||||
int64_t cur_batch_cnt = remain_cnt < MAX_BATCH_COUNT ? remain_cnt : MAX_BATCH_COUNT;
|
int64_t cur_batch_cnt = remain_cnt < MAX_BATCH_COUNT ? remain_cnt : MAX_BATCH_COUNT;
|
||||||
if (OB_FAIL(trans.start(sql_proxy_))) {
|
|
||||||
|
if (OB_FAIL(start_trans_(timeout_ctx, trans))) {
|
||||||
LOG_WARN("failed to start trans", K(ret));
|
LOG_WARN("failed to start trans", K(ret));
|
||||||
} else {
|
} else {
|
||||||
for (int64_t i = 0; OB_SUCC(ret) && i < cur_batch_cnt; ++i) {
|
for (int64_t i = 0; OB_SUCC(ret) && i < cur_batch_cnt; ++i) {
|
||||||
@ -93,9 +95,11 @@ int ObPGBackupTaskUpdater::update_pg_task_info(const common::ObIArray<common::Ob
|
|||||||
} else {
|
} else {
|
||||||
while (OB_SUCC(ret) && report_idx < pkeys.count()) {
|
while (OB_SUCC(ret) && report_idx < pkeys.count()) {
|
||||||
ObMySQLTransaction trans;
|
ObMySQLTransaction trans;
|
||||||
|
ObTimeoutCtx timeout_ctx;
|
||||||
const int64_t remain_cnt = pkeys.count() - report_idx;
|
const int64_t remain_cnt = pkeys.count() - report_idx;
|
||||||
int64_t cur_batch_cnt = remain_cnt < MAX_BATCH_COUNT ? remain_cnt : MAX_BATCH_COUNT;
|
int64_t cur_batch_cnt = remain_cnt < MAX_BATCH_COUNT ? remain_cnt : MAX_BATCH_COUNT;
|
||||||
if (OB_FAIL(trans.start(sql_proxy_))) {
|
|
||||||
|
if (OB_FAIL(start_trans_(timeout_ctx, trans))) {
|
||||||
LOG_WARN("failed to start trans", K(ret));
|
LOG_WARN("failed to start trans", K(ret));
|
||||||
} else {
|
} else {
|
||||||
for (int64_t i = 0; OB_SUCC(ret) && i < cur_batch_cnt; ++i) {
|
for (int64_t i = 0; OB_SUCC(ret) && i < cur_batch_cnt; ++i) {
|
||||||
@ -397,9 +401,11 @@ int ObPGBackupTaskUpdater::update_status_and_result(const common::ObIArray<commo
|
|||||||
} else {
|
} else {
|
||||||
while (OB_SUCC(ret) && report_idx < pkeys.count()) {
|
while (OB_SUCC(ret) && report_idx < pkeys.count()) {
|
||||||
ObMySQLTransaction trans;
|
ObMySQLTransaction trans;
|
||||||
|
ObTimeoutCtx timeout_ctx;
|
||||||
const int64_t remain_cnt = pkeys.count() - report_idx;
|
const int64_t remain_cnt = pkeys.count() - report_idx;
|
||||||
int64_t cur_batch_cnt = remain_cnt < MAX_BATCH_COUNT ? remain_cnt : MAX_BATCH_COUNT;
|
int64_t cur_batch_cnt = remain_cnt < MAX_BATCH_COUNT ? remain_cnt : MAX_BATCH_COUNT;
|
||||||
if (OB_FAIL(trans.start(sql_proxy_))) {
|
|
||||||
|
if (OB_FAIL(start_trans_(timeout_ctx, trans))) {
|
||||||
LOG_WARN("failed to start trans", K(ret));
|
LOG_WARN("failed to start trans", K(ret));
|
||||||
} else {
|
} else {
|
||||||
for (int64_t i = 0; OB_SUCC(ret) && i < cur_batch_cnt; ++i) {
|
for (int64_t i = 0; OB_SUCC(ret) && i < cur_batch_cnt; ++i) {
|
||||||
@ -446,3 +452,21 @@ int ObPGBackupTaskUpdater::cancel_pending_tasks(
|
|||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int ObPGBackupTaskUpdater::start_trans_(ObTimeoutCtx &timeout_ctx, ObMySQLTransaction &trans)
|
||||||
|
{
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
const int64_t MAX_EXECUTE_TIMEOUT_US = 30L * 1000 * 1000; // 30s
|
||||||
|
int64_t stmt_timeout = MAX_EXECUTE_TIMEOUT_US;
|
||||||
|
if (!is_inited_) {
|
||||||
|
ret = OB_NOT_INIT;
|
||||||
|
LOG_WARN("pg backup task updater do not init", K(ret));
|
||||||
|
} else if (OB_FAIL(timeout_ctx.set_trx_timeout_us(stmt_timeout))) {
|
||||||
|
LOG_WARN("fail to set trx timeout", K(ret), K(stmt_timeout));
|
||||||
|
} else if (OB_FAIL(timeout_ctx.set_timeout(stmt_timeout))) {
|
||||||
|
LOG_WARN("set timeout context failed", K(ret));
|
||||||
|
} else if (OB_FAIL(trans.start(sql_proxy_))) {
|
||||||
|
LOG_WARN("failed to start trans", K(ret));
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
@ -58,7 +58,10 @@ public:
|
|||||||
int cancel_pending_tasks(const uint64_t tenant_id, const int64_t incarnation, const int64_t backup_set_id);
|
int cancel_pending_tasks(const uint64_t tenant_id, const int64_t incarnation, const int64_t backup_set_id);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
static const int64_t MAX_BATCH_COUNT = 1024;
|
int start_trans_(ObTimeoutCtx &timeout_ctx, ObMySQLTransaction &trans);
|
||||||
|
|
||||||
|
private:
|
||||||
|
static const int64_t MAX_BATCH_COUNT = 128;
|
||||||
bool is_inited_;
|
bool is_inited_;
|
||||||
common::ObISQLClient* sql_proxy_;
|
common::ObISQLClient* sql_proxy_;
|
||||||
DISALLOW_COPY_AND_ASSIGN(ObPGBackupTaskUpdater);
|
DISALLOW_COPY_AND_ASSIGN(ObPGBackupTaskUpdater);
|
||||||
|
Reference in New Issue
Block a user