fix ddl rpc timeout bug
This commit is contained in:
@ -1100,11 +1100,19 @@ int ObDDLTask::batch_release_snapshot(
|
||||
ObMySQLTransaction trans;
|
||||
ObRootService *root_service = GCTX.root_service_;
|
||||
SCN snapshot_scn;
|
||||
ObTimeoutCtx timeout_ctx;
|
||||
int64_t timeout = 0;
|
||||
if (OB_ISNULL(root_service)) {
|
||||
ret = OB_ERR_SYS;
|
||||
LOG_WARN("error sys, root service must not be nullptr", K(ret));
|
||||
} else if (OB_FAIL(snapshot_scn.convert_for_tx(snapshot_version))) {
|
||||
LOG_WARN("failed to convert scn", K(snapshot_scn), K(ret));
|
||||
} else if (OB_FAIL(ObDDLUtil::get_ddl_tx_timeout(tablet_ids.count(), timeout))) {
|
||||
LOG_WARN("get ddl tx timeout failed", K(ret));
|
||||
} else if (OB_FAIL(timeout_ctx.set_trx_timeout_us(timeout))) {
|
||||
LOG_WARN("set timeout ctx failed", K(ret));
|
||||
} else if (OB_FAIL(timeout_ctx.set_timeout(timeout))) {
|
||||
LOG_WARN("set timeout failed", K(ret));
|
||||
} else if (OB_FAIL(trans.start(&root_service->get_ddl_service().get_sql_proxy(), tenant_id_))) {
|
||||
LOG_WARN("fail to start trans", K(ret));
|
||||
} else if (OB_FAIL(root_service->get_ddl_service().get_snapshot_mgr().batch_release_snapshot_in_trans(
|
||||
@ -1195,9 +1203,11 @@ int ObDDLTask::push_execution_id(const uint64_t tenant_id, const int64_t task_id
|
||||
// The length of [min_dt, max_dt] controls the execution rate of ddl tasks.
|
||||
void ObDDLTask::calc_next_schedule_ts(const int ret_code, const int64_t total_task_cnt)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int64_t ddl_rpc_timeout = ObDDLUtil::get_default_ddl_rpc_timeout();
|
||||
if (OB_TIMEOUT == ret_code) {
|
||||
const int64_t SEC = 1000000;
|
||||
const int64_t max_delay = total_task_cnt * ObDDLUtil::get_ddl_rpc_timeout() * 10;
|
||||
const int64_t max_delay = total_task_cnt * ddl_rpc_timeout * 10;
|
||||
delay_schedule_time_ = std::min(delay_schedule_time_ * 6/5 + SEC/10, max_delay);
|
||||
const int64_t max_dt = delay_schedule_time_;
|
||||
const int64_t min_dt = max_dt / 2;
|
||||
@ -1536,7 +1546,7 @@ int group_tablets_leader_addr(const uint64_t tenant_id, const ObIArray<ObTabletI
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid argument", K(ret), K(tenant_id), K(tablet_ids.count()));
|
||||
} else {
|
||||
const int64_t rpc_timeout = ObDDLUtil::get_ddl_rpc_timeout();
|
||||
int64_t rpc_timeout = ObDDLUtil::get_default_ddl_rpc_timeout();
|
||||
if (OB_FAIL(group_items.reserve(tablet_ids.count()))) {
|
||||
LOG_WARN("reserve send array failed", K(ret), K(tablet_ids.count()));
|
||||
}
|
||||
@ -1569,6 +1579,7 @@ int check_trans_end(const ObArray<SendItem> &send_array,
|
||||
transaction::ObTransID &pending_tx_id)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int64_t rpc_timeout = 0;
|
||||
ret_array.reuse();
|
||||
snapshot_array.reuse();
|
||||
hash::ObHashMap<obrpc::ObLSTabletPair, obrpc::ObCheckTransElapsedResult> result_map;
|
||||
@ -1581,10 +1592,11 @@ int check_trans_end(const ObArray<SendItem> &send_array,
|
||||
LOG_WARN("copy send array failed", K(ret), K(send_array.count()));
|
||||
} else if (OB_FAIL(result_map.create(send_array.count(), "check_trans_map"))) {
|
||||
LOG_WARN("create return code map failed", K(ret));
|
||||
} else if (OB_FAIL(ObDDLUtil::get_ddl_rpc_timeout(arg.tablets_.count(), rpc_timeout))) {
|
||||
LOG_WARN("get_ddl_rpc_timeout_failed", K(ret));
|
||||
} else {
|
||||
// group by leader addr and send batch rpc
|
||||
std::sort(tmp_send_array.begin(), tmp_send_array.end());
|
||||
const int64_t rpc_timeout = ObDDLUtil::get_ddl_rpc_timeout();
|
||||
ObAddr last_addr;
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < tmp_send_array.count(); ++i) {
|
||||
const SendItem &send_item = tmp_send_array.at(i);
|
||||
@ -1744,6 +1756,7 @@ int ObDDLWaitTransEndCtx::check_sstable_trans_end(
|
||||
int ObDDLWaitTransEndCtx::try_wait(bool &is_trans_end, int64_t &snapshot_version, const bool need_wait_trans_end)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int64_t tablet_count = 0;
|
||||
is_trans_end = false;
|
||||
snapshot_version = 0;
|
||||
if (OB_UNLIKELY(!is_inited_)) {
|
||||
@ -1792,6 +1805,7 @@ int ObDDLWaitTransEndCtx::try_wait(bool &is_trans_end, int64_t &snapshot_version
|
||||
K(check_count), K(ret_codes.count()), K(tmp_snapshots.count()));
|
||||
} else {
|
||||
int64_t succ_count = 0;
|
||||
tablet_count = check_count;
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < check_count; ++i) {
|
||||
if (OB_SUCCESS == ret_codes.at(i) && tmp_snapshots.at(i) > 0) {
|
||||
snapshot_array_.at(tablet_pos_indexes.at(i)) = tmp_snapshots.at(i);
|
||||
@ -1832,7 +1846,7 @@ int ObDDLWaitTransEndCtx::get_snapshot(int64_t &snapshot_version)
|
||||
ObRootService *root_service = nullptr;
|
||||
ObFreezeInfoProxy freeze_info_proxy(tenant_id_);
|
||||
ObSimpleFrozenStatus frozen_status;
|
||||
const int64_t timeout = ObDDLUtil::get_ddl_rpc_timeout();
|
||||
const int64_t timeout_us = ObDDLUtil::get_default_ddl_rpc_timeout();
|
||||
SCN curr_ts;
|
||||
bool is_external_consistent = false;
|
||||
if (OB_UNLIKELY(!is_inited_)) {
|
||||
@ -1851,10 +1865,10 @@ int ObDDLWaitTransEndCtx::get_snapshot(int64_t &snapshot_version)
|
||||
// for performance, everywhere calls get_ts_sync should ensure using correct tenant ctx
|
||||
tenant_guard.switch_to(tenant_id_);
|
||||
if (OB_FAIL(OB_TS_MGR.get_ts_sync(tenant_id_,
|
||||
timeout,
|
||||
timeout_us,
|
||||
curr_ts,
|
||||
is_external_consistent))) {
|
||||
LOG_WARN("fail to get gts sync", K(ret), K(tenant_id_), K(timeout), K(curr_ts), K(is_external_consistent));
|
||||
LOG_WARN("fail to get gts sync", K(ret), K(tenant_id_), K(timeout_us), K(curr_ts), K(is_external_consistent));
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
@ -2126,8 +2140,11 @@ int send_batch_calc_rpc(obrpc::ObSrvRpcProxy &rpc_proxy,
|
||||
int64_t &send_succ_count)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
const int64_t rpc_timeout = ObDDLUtil::get_ddl_rpc_timeout();
|
||||
if (OB_FAIL(rpc_proxy.to(leader_addr)
|
||||
int64_t rpc_timeout = 0;
|
||||
const int64_t tablet_count = arg.calc_items_.count();
|
||||
if (OB_FAIL(ObDDLUtil::get_ddl_rpc_timeout(tablet_count, rpc_timeout))) {
|
||||
LOG_WARN("get ddl rpc timeout failed", K(ret));
|
||||
} else if (OB_FAIL(rpc_proxy.to(leader_addr)
|
||||
.by(arg.tenant_id_)
|
||||
.timeout(rpc_timeout)
|
||||
.calc_column_checksum_request(arg, res))) {
|
||||
@ -2174,7 +2191,7 @@ int ObDDLWaitColumnChecksumCtx::send_calc_rpc(int64_t &send_succ_count)
|
||||
LOG_WARN("root service or location_cache is null", K(ret), KP(root_service), KP(location_service));
|
||||
} else {
|
||||
ObLSID ls_id;
|
||||
const int64_t rpc_timeout = ObDDLUtil::get_ddl_rpc_timeout();
|
||||
int64_t rpc_timeout = ObDDLUtil::get_default_ddl_rpc_timeout();
|
||||
ObArray<SendItem> send_array;
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < stat_array_.count(); ++i) {
|
||||
PartitionColChecksumStat &item = stat_array_.at(i);
|
||||
|
||||
Reference in New Issue
Block a user