refactor DAS task retry

This commit is contained in:
leslieyuchen
2023-06-16 08:48:27 +00:00
committed by ob-robot
parent f0e14d228f
commit 9df89f3baf
11 changed files with 319 additions and 147 deletions

View File

@ -32,7 +32,7 @@ using namespace oceanbase::transaction;
namespace observer
{
common::hash::ObHashMap<int, std::pair<ObQueryRetryCtrl::retry_func, ObQueryRetryCtrl::retry_func>, common::hash::NoPthreadDefendMode> ObQueryRetryCtrl::map_;
common::hash::ObHashMap<int, ObQueryRetryCtrl::RetryFuncs, common::hash::NoPthreadDefendMode> ObQueryRetryCtrl::map_;
void ObRetryPolicy::try_packet_retry(ObRetryParam &v) const
{
@ -896,10 +896,11 @@ int ObQueryRetryCtrl::init()
// r: error code
// func: processor for obmp* query
// inner_func: processor for inner connection query
// das_func: processor for DAS task retry
#ifndef ERR_RETRY_FUNC
#define ERR_RETRY_FUNC(tag, r, func, inner_func) \
#define ERR_RETRY_FUNC(tag, r, func, inner_func, das_func) \
if (OB_SUCC(ret)) { \
if (OB_SUCCESS != (ret = map_.set_refactored(r, std::make_pair(func, inner_func)))) { \
if (OB_SUCCESS != (ret = map_.set_refactored(r, RetryFuncs(func, inner_func, das_func)))) { \
LOG_ERROR("Duplicated error code registered", "code", #r, KR(ret)); \
} \
}
@ -907,92 +908,92 @@ int ObQueryRetryCtrl::init()
// register your error code retry handler here, no order required
/* schema */
ERR_RETRY_FUNC("SCHEMA", OB_SCHEMA_ERROR, schema_error_proc, empty_proc);
ERR_RETRY_FUNC("SCHEMA", OB_TENANT_EXIST, schema_error_proc, empty_proc);
ERR_RETRY_FUNC("SCHEMA", OB_TENANT_NOT_EXIST, schema_error_proc, empty_proc);
ERR_RETRY_FUNC("SCHEMA", OB_ERR_BAD_DATABASE, schema_error_proc, empty_proc);
ERR_RETRY_FUNC("SCHEMA", OB_DATABASE_EXIST, schema_error_proc, empty_proc);
ERR_RETRY_FUNC("SCHEMA", OB_TABLEGROUP_NOT_EXIST, schema_error_proc, empty_proc);
ERR_RETRY_FUNC("SCHEMA", OB_TABLEGROUP_EXIST, schema_error_proc, empty_proc);
ERR_RETRY_FUNC("SCHEMA", OB_TABLE_NOT_EXIST, schema_error_proc, inner_common_schema_error_proc);
ERR_RETRY_FUNC("SCHEMA", OB_ERR_TABLE_EXIST, schema_error_proc, empty_proc);
ERR_RETRY_FUNC("SCHEMA", OB_ERR_BAD_FIELD_ERROR, schema_error_proc, empty_proc);
ERR_RETRY_FUNC("SCHEMA", OB_ERR_COLUMN_DUPLICATE, schema_error_proc, empty_proc);
ERR_RETRY_FUNC("SCHEMA", OB_ERR_USER_EXIST, schema_error_proc, empty_proc);
ERR_RETRY_FUNC("SCHEMA", OB_ERR_USER_NOT_EXIST, schema_error_proc, empty_proc);
ERR_RETRY_FUNC("SCHEMA", OB_ERR_NO_PRIVILEGE, schema_error_proc, empty_proc);
ERR_RETRY_FUNC("SCHEMA", OB_ERR_NO_DB_PRIVILEGE, schema_error_proc, empty_proc);
ERR_RETRY_FUNC("SCHEMA", OB_ERR_NO_TABLE_PRIVILEGE, schema_error_proc, empty_proc);
ERR_RETRY_FUNC("SCHEMA", OB_ERR_WAIT_REMOTE_SCHEMA_REFRESH, schema_error_proc, inner_schema_error_proc);
ERR_RETRY_FUNC("SCHEMA", OB_ERR_REMOTE_SCHEMA_NOT_FULL, schema_error_proc, inner_schema_error_proc);
ERR_RETRY_FUNC("SCHEMA", OB_PARTITION_IS_BLOCKED, schema_error_proc, empty_proc);
ERR_RETRY_FUNC("SCHEMA", OB_ERR_SP_ALREADY_EXISTS, schema_error_proc, empty_proc);
ERR_RETRY_FUNC("SCHEMA", OB_ERR_SP_DOES_NOT_EXIST, schema_error_proc, empty_proc);
ERR_RETRY_FUNC("SCHEMA", OB_OBJECT_NAME_NOT_EXIST, schema_error_proc, empty_proc);
ERR_RETRY_FUNC("SCHEMA", OB_OBJECT_NAME_EXIST, schema_error_proc, empty_proc);
ERR_RETRY_FUNC("SCHEMA", OB_SCHEMA_EAGAIN, schema_error_proc, inner_schema_error_proc);
ERR_RETRY_FUNC("SCHEMA", OB_SCHEMA_NOT_UPTODATE, schema_error_proc, inner_schema_error_proc);
ERR_RETRY_FUNC("SCHEMA", OB_ERR_PARALLEL_DDL_CONFLICT, schema_error_proc, inner_schema_error_proc);
ERR_RETRY_FUNC("SCHEMA", OB_SCHEMA_ERROR, schema_error_proc, empty_proc, nullptr);
ERR_RETRY_FUNC("SCHEMA", OB_TENANT_EXIST, schema_error_proc, empty_proc, nullptr);
ERR_RETRY_FUNC("SCHEMA", OB_TENANT_NOT_EXIST, schema_error_proc, empty_proc, nullptr);
ERR_RETRY_FUNC("SCHEMA", OB_ERR_BAD_DATABASE, schema_error_proc, empty_proc, nullptr);
ERR_RETRY_FUNC("SCHEMA", OB_DATABASE_EXIST, schema_error_proc, empty_proc, nullptr);
ERR_RETRY_FUNC("SCHEMA", OB_TABLEGROUP_NOT_EXIST, schema_error_proc, empty_proc, nullptr);
ERR_RETRY_FUNC("SCHEMA", OB_TABLEGROUP_EXIST, schema_error_proc, empty_proc, nullptr);
ERR_RETRY_FUNC("SCHEMA", OB_TABLE_NOT_EXIST, schema_error_proc, inner_common_schema_error_proc, nullptr);
ERR_RETRY_FUNC("SCHEMA", OB_ERR_TABLE_EXIST, schema_error_proc, empty_proc, nullptr);
ERR_RETRY_FUNC("SCHEMA", OB_ERR_BAD_FIELD_ERROR, schema_error_proc, empty_proc, nullptr);
ERR_RETRY_FUNC("SCHEMA", OB_ERR_COLUMN_DUPLICATE, schema_error_proc, empty_proc, nullptr);
ERR_RETRY_FUNC("SCHEMA", OB_ERR_USER_EXIST, schema_error_proc, empty_proc, nullptr);
ERR_RETRY_FUNC("SCHEMA", OB_ERR_USER_NOT_EXIST, schema_error_proc, empty_proc, nullptr);
ERR_RETRY_FUNC("SCHEMA", OB_ERR_NO_PRIVILEGE, schema_error_proc, empty_proc, nullptr);
ERR_RETRY_FUNC("SCHEMA", OB_ERR_NO_DB_PRIVILEGE, schema_error_proc, empty_proc, nullptr);
ERR_RETRY_FUNC("SCHEMA", OB_ERR_NO_TABLE_PRIVILEGE, schema_error_proc, empty_proc, nullptr);
ERR_RETRY_FUNC("SCHEMA", OB_ERR_WAIT_REMOTE_SCHEMA_REFRESH, schema_error_proc, inner_schema_error_proc, nullptr);
ERR_RETRY_FUNC("SCHEMA", OB_ERR_REMOTE_SCHEMA_NOT_FULL, schema_error_proc, inner_schema_error_proc, nullptr);
ERR_RETRY_FUNC("SCHEMA", OB_ERR_SP_ALREADY_EXISTS, schema_error_proc, empty_proc, nullptr);
ERR_RETRY_FUNC("SCHEMA", OB_ERR_SP_DOES_NOT_EXIST, schema_error_proc, empty_proc, nullptr);
ERR_RETRY_FUNC("SCHEMA", OB_OBJECT_NAME_NOT_EXIST, schema_error_proc, empty_proc, nullptr);
ERR_RETRY_FUNC("SCHEMA", OB_OBJECT_NAME_EXIST, schema_error_proc, empty_proc, nullptr);
ERR_RETRY_FUNC("SCHEMA", OB_SCHEMA_EAGAIN, schema_error_proc, inner_schema_error_proc, nullptr);
ERR_RETRY_FUNC("SCHEMA", OB_SCHEMA_NOT_UPTODATE, schema_error_proc, inner_schema_error_proc, nullptr);
ERR_RETRY_FUNC("SCHEMA", OB_ERR_PARALLEL_DDL_CONFLICT, schema_error_proc, inner_schema_error_proc, nullptr);
/* location */
ERR_RETRY_FUNC("LOCATION", OB_LOCATION_LEADER_NOT_EXIST, location_error_nothing_readable_proc, inner_location_error_nothing_readable_proc);
ERR_RETRY_FUNC("LOCATION", OB_LS_LOCATION_LEADER_NOT_EXIST, location_error_nothing_readable_proc, inner_location_error_nothing_readable_proc);
ERR_RETRY_FUNC("LOCATION", OB_NO_READABLE_REPLICA, location_error_nothing_readable_proc, inner_location_error_nothing_readable_proc);
ERR_RETRY_FUNC("LOCATION", OB_NOT_MASTER, location_error_proc, inner_location_error_proc);
ERR_RETRY_FUNC("LOCATION", OB_RS_NOT_MASTER, location_error_proc, inner_location_error_proc);
ERR_RETRY_FUNC("LOCATION", OB_RS_SHUTDOWN, location_error_proc, inner_location_error_proc);
ERR_RETRY_FUNC("LOCATION", OB_PARTITION_NOT_EXIST, location_error_proc, inner_location_error_proc);
ERR_RETRY_FUNC("LOCATION", OB_LOCATION_NOT_EXIST, location_error_proc, inner_location_error_proc);
ERR_RETRY_FUNC("LOCATION", OB_PARTITION_IS_STOPPED, location_error_proc, inner_location_error_proc);
ERR_RETRY_FUNC("LOCATION", OB_SERVER_IS_INIT, location_error_proc, inner_location_error_proc);
ERR_RETRY_FUNC("LOCATION", OB_SERVER_IS_STOPPING, location_error_proc, inner_location_error_proc);
ERR_RETRY_FUNC("LOCATION", OB_TENANT_NOT_IN_SERVER, location_error_proc, inner_location_error_proc);
ERR_RETRY_FUNC("LOCATION", OB_TRANS_RPC_TIMEOUT, location_error_proc, inner_location_error_proc);
ERR_RETRY_FUNC("LOCATION", OB_USE_DUP_FOLLOW_AFTER_DML, location_error_proc, inner_location_error_proc);
ERR_RETRY_FUNC("LOCATION", OB_TRANS_STMT_NEED_RETRY, location_error_proc, inner_location_error_proc);
ERR_RETRY_FUNC("LOCATION", OB_LS_NOT_EXIST, location_error_proc, inner_location_error_proc);
ERR_RETRY_FUNC("LOCATION", OB_LOCATION_LEADER_NOT_EXIST, location_error_nothing_readable_proc, inner_location_error_nothing_readable_proc, ObDASRetryCtrl::tablet_location_retry_proc);
ERR_RETRY_FUNC("LOCATION", OB_LS_LOCATION_LEADER_NOT_EXIST, location_error_nothing_readable_proc, inner_location_error_nothing_readable_proc, ObDASRetryCtrl::tablet_location_retry_proc);
ERR_RETRY_FUNC("LOCATION", OB_NO_READABLE_REPLICA, location_error_nothing_readable_proc, inner_location_error_nothing_readable_proc, ObDASRetryCtrl::tablet_location_retry_proc);
ERR_RETRY_FUNC("LOCATION", OB_NOT_MASTER, location_error_proc, inner_location_error_proc, ObDASRetryCtrl::tablet_location_retry_proc);
ERR_RETRY_FUNC("LOCATION", OB_RS_NOT_MASTER, location_error_proc, inner_location_error_proc, ObDASRetryCtrl::tablet_location_retry_proc);
ERR_RETRY_FUNC("LOCATION", OB_RS_SHUTDOWN, location_error_proc, inner_location_error_proc, ObDASRetryCtrl::tablet_location_retry_proc);
ERR_RETRY_FUNC("LOCATION", OB_PARTITION_NOT_EXIST, location_error_proc, inner_location_error_proc, ObDASRetryCtrl::tablet_location_retry_proc);
ERR_RETRY_FUNC("LOCATION", OB_LOCATION_NOT_EXIST, location_error_proc, inner_location_error_proc, ObDASRetryCtrl::tablet_location_retry_proc);
ERR_RETRY_FUNC("LOCATION", OB_PARTITION_IS_STOPPED, location_error_proc, inner_location_error_proc, ObDASRetryCtrl::tablet_location_retry_proc);
ERR_RETRY_FUNC("LOCATION", OB_SERVER_IS_INIT, location_error_proc, inner_location_error_proc, ObDASRetryCtrl::tablet_location_retry_proc);
ERR_RETRY_FUNC("LOCATION", OB_SERVER_IS_STOPPING, location_error_proc, inner_location_error_proc, ObDASRetryCtrl::tablet_location_retry_proc);
ERR_RETRY_FUNC("LOCATION", OB_TENANT_NOT_IN_SERVER, location_error_proc, inner_location_error_proc, ObDASRetryCtrl::tablet_location_retry_proc);
ERR_RETRY_FUNC("LOCATION", OB_TRANS_RPC_TIMEOUT, location_error_proc, inner_location_error_proc, nullptr);
ERR_RETRY_FUNC("LOCATION", OB_USE_DUP_FOLLOW_AFTER_DML, location_error_proc, inner_location_error_proc, ObDASRetryCtrl::tablet_location_retry_proc);
ERR_RETRY_FUNC("LOCATION", OB_TRANS_STMT_NEED_RETRY, location_error_proc, inner_location_error_proc, nullptr);
ERR_RETRY_FUNC("LOCATION", OB_LS_NOT_EXIST, location_error_proc, inner_location_error_proc, ObDASRetryCtrl::tablet_location_retry_proc);
// OB_TABLET_NOT_EXIST may be caused by old version schema or incorrect location.
// Just use location_error_proc to retry sql and a new schema guard will be obtained during the retry process.
ERR_RETRY_FUNC("LOCATION", OB_TABLET_NOT_EXIST, location_error_proc, inner_location_error_proc);
ERR_RETRY_FUNC("LOCATION", OB_LS_LOCATION_NOT_EXIST, location_error_proc, inner_location_error_proc);
ERR_RETRY_FUNC("LOCATION", OB_MAPPING_BETWEEN_TABLET_AND_LS_NOT_EXIST, location_error_proc,inner_location_error_proc);
ERR_RETRY_FUNC("LOCATION", OB_TABLET_NOT_EXIST, location_error_proc, inner_location_error_proc, ObDASRetryCtrl::tablet_not_exist_retry_proc);
ERR_RETRY_FUNC("LOCATION", OB_LS_LOCATION_NOT_EXIST, location_error_proc, inner_location_error_proc, ObDASRetryCtrl::tablet_location_retry_proc);
ERR_RETRY_FUNC("LOCATION", OB_PARTITION_IS_BLOCKED, location_error_proc, inner_location_error_proc, ObDASRetryCtrl::tablet_nothing_readable_proc);
ERR_RETRY_FUNC("LOCATION", OB_MAPPING_BETWEEN_TABLET_AND_LS_NOT_EXIST, location_error_proc,inner_location_error_proc, ObDASRetryCtrl::tablet_location_retry_proc);
ERR_RETRY_FUNC("LOCATION", OB_GET_LOCATION_TIME_OUT, location_error_proc, inner_table_location_error_proc);
ERR_RETRY_FUNC("LOCATION", OB_GET_LOCATION_TIME_OUT, location_error_proc, inner_table_location_error_proc, ObDASRetryCtrl::tablet_location_retry_proc);
/* network */
ERR_RETRY_FUNC("NETWORK", OB_RPC_CONNECT_ERROR, peer_server_status_uncertain_proc, inner_peer_server_status_uncertain_proc);
ERR_RETRY_FUNC("NETWORK", OB_RPC_SEND_ERROR, peer_server_status_uncertain_proc, inner_peer_server_status_uncertain_proc);
ERR_RETRY_FUNC("NETWORK", OB_RPC_POST_ERROR, peer_server_status_uncertain_proc, inner_peer_server_status_uncertain_proc);
ERR_RETRY_FUNC("NETWORK", OB_RPC_CONNECT_ERROR, peer_server_status_uncertain_proc, inner_peer_server_status_uncertain_proc, ObDASRetryCtrl::task_network_retry_proc);
ERR_RETRY_FUNC("NETWORK", OB_RPC_SEND_ERROR, peer_server_status_uncertain_proc, inner_peer_server_status_uncertain_proc, ObDASRetryCtrl::task_network_retry_proc);
ERR_RETRY_FUNC("NETWORK", OB_RPC_POST_ERROR, peer_server_status_uncertain_proc, inner_peer_server_status_uncertain_proc, ObDASRetryCtrl::task_network_retry_proc);
/* storage */
ERR_RETRY_FUNC("STORAGE", OB_SNAPSHOT_DISCARDED, snapshot_discard_proc, short_wait_retry_proc);
ERR_RETRY_FUNC("STORAGE", OB_DATA_NOT_UPTODATE, long_wait_retry_proc, short_wait_retry_proc);
ERR_RETRY_FUNC("STORAGE", OB_REPLICA_NOT_READABLE, long_wait_retry_proc, short_wait_retry_proc);
ERR_RETRY_FUNC("STORAGE", OB_PARTITION_IS_SPLITTING, short_wait_retry_proc, short_wait_retry_proc);
ERR_RETRY_FUNC("STORAGE", OB_DISK_HUNG, nonblock_location_error_proc, empty_proc);
ERR_RETRY_FUNC("STORAGE", OB_SNAPSHOT_DISCARDED, snapshot_discard_proc, short_wait_retry_proc, nullptr);
ERR_RETRY_FUNC("STORAGE", OB_DATA_NOT_UPTODATE, long_wait_retry_proc, short_wait_retry_proc, nullptr);
ERR_RETRY_FUNC("STORAGE", OB_REPLICA_NOT_READABLE, long_wait_retry_proc, short_wait_retry_proc, ObDASRetryCtrl::tablet_nothing_readable_proc);
ERR_RETRY_FUNC("STORAGE", OB_PARTITION_IS_SPLITTING, short_wait_retry_proc, short_wait_retry_proc, nullptr);
ERR_RETRY_FUNC("STORAGE", OB_DISK_HUNG, nonblock_location_error_proc, empty_proc, nullptr);
/* trx */
ERR_RETRY_FUNC("TRX", OB_TRY_LOCK_ROW_CONFLICT, try_lock_row_conflict_proc, inner_try_lock_row_conflict_proc);
ERR_RETRY_FUNC("TRX", OB_TRANSACTION_SET_VIOLATION, trx_set_violation_proc, trx_set_violation_proc);
ERR_RETRY_FUNC("TRX", OB_TRANS_CANNOT_SERIALIZE, trx_can_not_serialize_proc, trx_can_not_serialize_proc);
ERR_RETRY_FUNC("TRX", OB_GTS_NOT_READY, short_wait_retry_proc, short_wait_retry_proc);
ERR_RETRY_FUNC("TRX", OB_GTI_NOT_READY, short_wait_retry_proc, short_wait_retry_proc);
ERR_RETRY_FUNC("TRX", OB_TRANS_WEAK_READ_VERSION_NOT_READY, short_wait_retry_proc, short_wait_retry_proc);
ERR_RETRY_FUNC("TRX", OB_TRY_LOCK_ROW_CONFLICT, try_lock_row_conflict_proc, inner_try_lock_row_conflict_proc, nullptr);
ERR_RETRY_FUNC("TRX", OB_TRANSACTION_SET_VIOLATION, trx_set_violation_proc, trx_set_violation_proc, nullptr);
ERR_RETRY_FUNC("TRX", OB_TRANS_CANNOT_SERIALIZE, trx_can_not_serialize_proc, trx_can_not_serialize_proc, nullptr);
ERR_RETRY_FUNC("TRX", OB_GTS_NOT_READY, short_wait_retry_proc, short_wait_retry_proc, nullptr);
ERR_RETRY_FUNC("TRX", OB_GTI_NOT_READY, short_wait_retry_proc, short_wait_retry_proc, nullptr);
ERR_RETRY_FUNC("TRX", OB_TRANS_WEAK_READ_VERSION_NOT_READY, short_wait_retry_proc, short_wait_retry_proc, nullptr);
/* sql */
ERR_RETRY_FUNC("SQL", OB_ERR_INSUFFICIENT_PX_WORKER, px_thread_not_enough_proc, short_wait_retry_proc);
ERR_RETRY_FUNC("SQL", OB_ERR_INSUFFICIENT_PX_WORKER, px_thread_not_enough_proc, short_wait_retry_proc, nullptr);
// create a new interval part when inserting a row which has no matched part,
// wait and retry, will see new part
ERR_RETRY_FUNC("SQL", OB_NO_PARTITION_FOR_INTERVAL_PART, short_wait_retry_proc, short_wait_retry_proc);
ERR_RETRY_FUNC("SQL", OB_SQL_RETRY_SPM, force_local_retry_proc, force_local_retry_proc);
ERR_RETRY_FUNC("SQL", OB_NEED_SWITCH_CONSUMER_GROUP, switch_consumer_group_retry_proc, empty_proc);
ERR_RETRY_FUNC("SQL", OB_NO_PARTITION_FOR_INTERVAL_PART, short_wait_retry_proc, short_wait_retry_proc, nullptr);
ERR_RETRY_FUNC("SQL", OB_SQL_RETRY_SPM, force_local_retry_proc, force_local_retry_proc, nullptr);
ERR_RETRY_FUNC("SQL", OB_NEED_SWITCH_CONSUMER_GROUP, switch_consumer_group_retry_proc, empty_proc, nullptr);
/* timeout */
ERR_RETRY_FUNC("SQL", OB_TIMEOUT, timeout_proc, timeout_proc);
ERR_RETRY_FUNC("SQL", OB_TRANS_TIMEOUT, timeout_proc, timeout_proc);
ERR_RETRY_FUNC("SQL", OB_TRANS_STMT_TIMEOUT, timeout_proc, timeout_proc);
ERR_RETRY_FUNC("SQL", OB_TIMEOUT, timeout_proc, timeout_proc, nullptr);
ERR_RETRY_FUNC("SQL", OB_TRANS_TIMEOUT, timeout_proc, timeout_proc, nullptr);
ERR_RETRY_FUNC("SQL", OB_TRANS_STMT_TIMEOUT, timeout_proc, timeout_proc, nullptr);
/* ddl */
@ -1023,17 +1024,32 @@ ObQueryRetryCtrl::~ObQueryRetryCtrl()
{
}
int ObQueryRetryCtrl::get_das_retry_func(int err, ObDASRetryCtrl::retry_func &retry_func)
{
int ret = OB_SUCCESS;
retry_func = nullptr;
RetryFuncs funcs;
if (OB_FAIL(map_.get_refactored(err, funcs))) {
if (OB_HASH_NOT_EXIST == ret) {
ret = OB_SUCCESS;
}
} else {
retry_func = funcs.element<2>();
}
return ret;
}
int ObQueryRetryCtrl::get_func(int err, bool is_inner_sql, retry_func &func)
{
int ret = OB_SUCCESS;
std::pair<retry_func, retry_func> funcs;
RetryFuncs funcs;
if (OB_FAIL(map_.get_refactored(err, funcs))) {
if (OB_HASH_NOT_EXIST == ret) {
func = empty_proc;
ret = OB_SUCCESS;
}
} else {
func = is_inner_sql ? funcs.second : funcs.first;
func = is_inner_sql ? funcs.element<1>() : funcs.element<0>();
}
return ret;
}

View File

@ -17,6 +17,8 @@
#include "lib/time/ob_time_utility.h"
#include "sql/ob_sql_context.h"
#include "sql/session/ob_basic_session_info.h"
#include "lib/container/ob_tuple.h"
#include "sql/das/ob_das_retry_ctrl.h"
namespace oceanbase
{
namespace sql
@ -259,6 +261,7 @@ public:
return (isolation == transaction::ObTxIsolationLevel::RR
|| isolation == transaction::ObTxIsolationLevel::SERIAL);
}
static int get_das_retry_func(int err, sql::ObDASRetryCtrl::retry_func &retry_func);
public:
// schema类型的错误最多在本线程重试5次。
// 5是拍脑袋决定的,之后还要看统计数据的反馈再修改。TODO qianfu.zpf
@ -314,7 +317,8 @@ private:
/* variables */
// map_ is used to fast lookup the error code retry processor
static common::hash::ObHashMap<int, std::pair<retry_func, retry_func>, common::hash::NoPthreadDefendMode> map_;
typedef common::ObTuple<retry_func, retry_func, sql::ObDASRetryCtrl::retry_func> RetryFuncs;
static common::hash::ObHashMap<int, RetryFuncs, common::hash::NoPthreadDefendMode> map_;
int64_t curr_query_tenant_local_schema_version_; // Query开始、Loc刷新前的普通租户shm ver
int64_t curr_query_tenant_global_schema_version_; // Query开始时的普通租户schema version
int64_t curr_query_sys_local_schema_version_; // Query开始、Loc刷新前的系统租户shm ver

View File

@ -1219,6 +1219,28 @@ int ObSimpleTableSchemaV2::get_tablet_id_by_object_id(
return ret;
}
int ObSimpleTableSchemaV2::check_if_tablet_exists(const ObTabletID &tablet_id, bool &exists) const
{
int ret = OB_SUCCESS;
const ObCheckPartitionMode mode = CHECK_PARTITION_MODE_NORMAL;
ObPartitionSchemaIter iter(*this, mode);
ObPartitionSchemaIter::Info info;
exists = false;
while (OB_SUCC(ret) && !exists) {
if (OB_FAIL(iter.next_partition_info(info))) {
if (OB_ITER_END != ret) {
LOG_WARN("iter partition failed", KR(ret));
}
} else if (info.tablet_id_ == tablet_id) {
exists = true;
}
}
if (OB_ITER_END == ret) {
ret = OB_SUCCESS;
}
return ret;
}
ObTableSchema::ObTableSchema()
: ObSimpleTableSchemaV2()
{

View File

@ -663,6 +663,7 @@ public:
int check_is_all_server_readonly_replica(
share::schema::ObSchemaGetterGuard &guard,
bool &is) const;
int check_if_tablet_exists(const common::ObTabletID &tablet_id, bool &exists) const;
int add_simple_foreign_key_info(const uint64_t tenant_id,
const uint64_t database_id,

View File

@ -63,6 +63,7 @@ ob_set_subtarget(ob_sql das
das/ob_das_id_cache.cpp
das/ob_das_task_result.cpp
das/ob_das_spatial_index_lookup_op.cpp
das/ob_das_retry_ctrl.cpp
)
ob_set_subtarget(ob_sql dtl

View File

@ -808,6 +808,8 @@ int ObDASLocationRouter::nonblock_get_readable_replica(const uint64_t tenant_id,
} else if (OB_FAIL(remote_replicas.push_back(&tmp_replica_loc))) {
LOG_WARN("store tmp replica failed", K(ret));
}
} else {
LOG_INFO("this replica is in the blacklist, thus filtered it", K(bl_key));
}
}
if (OB_SUCC(ret)) {

View File

@ -0,0 +1,107 @@
/**
* Copyright (c) 2022 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#define USING_LOG_PREFIX SQL_DAS
#include "sql/das/ob_das_retry_ctrl.h"
#include "sql/das/ob_das_task.h"
#include "sql/das/ob_das_ref.h"
#include "sql/engine/ob_exec_context.h"
namespace oceanbase {
using namespace common;
using namespace share;
using namespace share::schema;
namespace sql {
void ObDASRetryCtrl::tablet_location_retry_proc(ObDASRef &das_ref,
ObIDASTaskOp &task_op,
bool &need_retry)
{
need_retry = false;
int ret = OB_SUCCESS;
ObTableID ref_table_id = task_op.get_ref_table_id();
ObDASLocationRouter &loc_router = DAS_CTX(das_ref.get_exec_ctx()).get_location_router();
const ObDASTabletLoc *tablet_loc = task_op.get_tablet_loc();
if (is_virtual_table(ref_table_id)) {
//the location of the virtual table can't be refreshed,
//so when a location exception occurs, virtual table is not retryable
need_retry = false;
} else if (OB_ISNULL(tablet_loc)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("tablet loc is nullptr", K(ret));
} else {
loc_router.refresh_location_cache(tablet_loc->tablet_id_, true, task_op.get_errcode());
need_retry = true;
const ObDASTableLocMeta *loc_meta = tablet_loc->loc_meta_;
LOG_INFO("refresh tablet location cache and retry DAS task",
"errcode", task_op.get_errcode(), KPC(loc_meta), KPC(tablet_loc));
}
}
void ObDASRetryCtrl::tablet_nothing_readable_proc(ObDASRef &, ObIDASTaskOp &task_op, bool &need_retry)
{
if (is_virtual_table(task_op.get_ref_table_id())) {
need_retry = false;
} else {
need_retry = true;
}
}
void ObDASRetryCtrl::task_network_retry_proc(ObDASRef &, ObIDASTaskOp &, bool &need_retry)
{
need_retry = true;
}
/**
* The storage throws 4725 to the DAS in two cases:
* 1. When a table or partition is dropped, the tablet is recycled, which is caused by DDL and cannot be retried.
* 2. When a partition is transfered, but the tablet location cache is not updated,
* the TSC operation is sent to the old server, and the storage reports 4725, this case needs to be retried.
* The DAS cannot unconditionally retry 4725,
* and needs to determine whether the real cause of the 4725 error is a drop table or a transfer.
**/
void ObDASRetryCtrl::tablet_not_exist_retry_proc(ObDASRef &das_ref,
ObIDASTaskOp &task_op,
bool &need_retry)
{
int ret = OB_SUCCESS;
need_retry = false;
ObTableID ref_table_id = task_op.get_ref_table_id();
bool tablet_exist = false;
schema::ObSchemaGetterGuard schema_guard;
const schema::ObTableSchema *table_schema = nullptr;
const ObDASTabletLoc *tablet_loc = task_op.get_tablet_loc();
if (OB_ISNULL(GCTX.schema_service_) || OB_ISNULL(tablet_loc)) {
LOG_WARN("invalid schema service", KR(ret), K(GCTX.schema_service_), K(tablet_loc));
} else if (OB_FAIL(GCTX.schema_service_->get_tenant_schema_guard(MTL_ID(), schema_guard))) {
// tenant could be deleted
task_op.set_errcode(ret);
LOG_WARN("get tenant schema guard fail", KR(ret), K(MTL_ID()));
} else if (OB_FAIL(schema_guard.get_table_schema(MTL_ID(), ref_table_id, table_schema))) {
task_op.set_errcode(ret);
LOG_WARN("failed to get table schema", KR(ret));
} else if (OB_ISNULL(table_schema)) {
//table could be dropped
task_op.set_errcode(OB_TABLE_NOT_EXIST);
LOG_WARN("table not exist, fast fail das task", K(ref_table_id));
} else if (table_schema->is_vir_table()) {
need_retry = false;
} else if (OB_FAIL(table_schema->check_if_tablet_exists(tablet_loc->tablet_id_, tablet_exist))) {
LOG_WARN("check if tablet exists failed", K(ret), K(tablet_loc), K(ref_table_id));
} else if (!tablet_exist) {
task_op.set_errcode(OB_PARTITION_NOT_EXIST);
LOG_WARN("partition not exist, maybe dropped by DDL", K(ret), K(tablet_loc), K(ref_table_id));
} else {
tablet_location_retry_proc(das_ref, task_op, need_retry);
}
}
} // namespace sql
} // namespace oceanbase

View File

@ -0,0 +1,42 @@
/**
* Copyright (c) 2023 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#ifndef DEV_SRC_SQL_DAS_OB_DAS_RETRY_CTRL_H_
#define DEV_SRC_SQL_DAS_OB_DAS_RETRY_CTRL_H_
namespace oceanbase {
namespace sql {
class ObIDASTaskOp;
class ObDASRef;
class ObDASRetryCtrl
{
public:
/**
* retry_func: is to determine whether tablet-level retry is necessary based on the status of the task
* and maybe change some status of the das task in this function,
* such as refresh_partition_location_cache,
* DAS retry only can be attempted within the current thread
* [param in]: ObDASRef &: the das context reference
* [param in/out] ObIDASTaskOp &: which das task need to retry
* [param out] bool &: whether need DAS retry
* */
typedef void (*retry_func)(ObDASRef &, ObIDASTaskOp &, bool &);
static void tablet_location_retry_proc(ObDASRef &, ObIDASTaskOp &, bool &);
static void tablet_nothing_readable_proc(ObDASRef &, ObIDASTaskOp &, bool &);
static void task_network_retry_proc(ObDASRef &, ObIDASTaskOp &, bool &);
static void tablet_not_exist_retry_proc(ObDASRef &, ObIDASTaskOp &, bool &);
};
} // namespace sql
} // namespace oceanbase
#endif /* DEV_SRC_SQL_DAS_OB_DAS_RETRY_CTRL_H_ */

View File

@ -148,9 +148,10 @@ public:
DASRtDefFixedArray &get_related_rtdefs() { return related_rtdefs_; }
ObTabletIDFixedArray &get_related_tablet_ids() { return related_tablet_ids_; }
virtual int dump_data() const { return common::OB_SUCCESS; }
const DasTaskNode &get_node() const { return das_task_node_; };
DasTaskNode &get_node() { return das_task_node_; };
int get_errcode() const { return errcode_; };
const DasTaskNode &get_node() const { return das_task_node_; }
DasTaskNode &get_node() { return das_task_node_; }
int get_errcode() const { return errcode_; }
void set_errcode(int errcode) { errcode_ = errcode; }
VIRTUAL_TO_STRING_KV(K_(tenant_id),
K_(task_id),
K_(op_type),

View File

@ -12,6 +12,7 @@
#define USING_LOG_PREFIX SQL_DAS
#include "observer/ob_srv_network_frame.h"
#include "observer/mysql/ob_query_retry_ctrl.h"
#include "sql/das/ob_data_access_service.h"
#include "sql/das/ob_das_define.h"
#include "sql/das/ob_das_extra_data.h"
@ -20,6 +21,7 @@
#include "sql/das/ob_das_utils.h"
#include "sql/ob_phy_table_location.h"
#include "sql/engine/ob_exec_context.h"
#include "sql/das/ob_das_retry_ctrl.h"
#include "storage/tx/ob_trans_service.h"
namespace oceanbase
{
@ -27,6 +29,7 @@ using namespace share;
using namespace storage;
using namespace common;
using namespace transaction;
using namespace observer;
namespace sql
{
@ -210,17 +213,12 @@ int ObDataAccessService::clear_task_exec_env(ObDASRef &das_ref, ObIDASTaskOp &ta
return ret;
}
int ObDataAccessService::refresh_partition_location(ObDASRef &das_ref,
ObIDASTaskOp &task_op,
int err_no)
int ObDataAccessService::refresh_task_location_info(ObDASRef &das_ref, ObIDASTaskOp &task_op)
{
int ret = OB_SUCCESS;
ObExecContext &exec_ctx = das_ref.get_exec_ctx();
ObDASBaseRtDef *das_rtdef = task_op.get_rtdef();
ObDASTableLoc *table_loc = das_rtdef->table_loc_;
ObDASTabletLoc *tablet_loc = const_cast<ObDASTabletLoc*>(task_op.get_tablet_loc());
int64_t retry_cnt = DAS_CTX(exec_ctx).get_location_router().get_retry_cnt();
DAS_CTX(exec_ctx).get_location_router().refresh_location_cache(tablet_loc->tablet_id_, true, err_no);
if (OB_FAIL(ObDASUtils::wait_das_retry(retry_cnt))) {
LOG_WARN("wait das retry failed", K(ret));
} else if (OB_FAIL(DAS_CTX(exec_ctx).get_location_router().get_tablet_loc(*tablet_loc->loc_meta_,
@ -230,7 +228,6 @@ int ObDataAccessService::refresh_partition_location(ObDASRef &das_ref,
} else {
task_op.set_ls_id(tablet_loc->ls_id_);
}
LOG_INFO("LOCATION: refresh tablet cache", K(ret), KPC(table_loc), KPC(tablet_loc));
return ret;
}
@ -239,36 +236,50 @@ int ObDataAccessService::retry_das_task(ObDASRef &das_ref, ObIDASTaskOp &task_op
int ret = task_op.errcode_;
ObArenaAllocator tmp_alloc;
ObDasAggregatedTasks das_task_wrapper(tmp_alloc);
while ((is_master_changed_error(ret) ||
is_partition_change_error(ret) ||
OB_REPLICA_NOT_READABLE == ret)
&& !is_virtual_table(task_op.get_ref_table_id())) {
int tmp_ret = ret;
if (!can_fast_fail(task_op)) {
task_op.in_part_retry_ = true;
ObDASLocationRouter &location_router = DAS_CTX(das_ref.get_exec_ctx()).get_location_router();
location_router.set_last_errno(ret);
location_router.inc_retry_cnt();
if (OB_FAIL(clear_task_exec_env(das_ref, task_op))) {
LOG_WARN("clear task execution environment", K(ret));
} else if (OB_FAIL(das_ref.get_exec_ctx().check_status())) {
LOG_WARN("query is timeout, terminate retry", K(ret));
} else if (OB_FAIL(refresh_partition_location(das_ref, task_op, task_op.errcode_))) {
LOG_WARN("refresh partition location failed", K(ret), "ori_err_code", tmp_ret, K(lbt()));
} else if (FALSE_IT(das_task_wrapper.reuse())) {
} else if (FALSE_IT(task_op.set_task_status(ObDasTaskStatus::UNSTART))) {
} else if (OB_FAIL(das_task_wrapper.push_back_task(&task_op))) {
LOG_WARN("failed to push back task", K(ret));
} else if (OB_FAIL(execute_dist_das_task(das_ref, das_task_wrapper, false))) {
bool retry_continue = false;
ObDASLocationRouter &location_router = DAS_CTX(das_ref.get_exec_ctx()).get_location_router();
do {
ObDASRetryCtrl::retry_func retry_func = nullptr;
retry_continue = false;
int tmp_ret = OB_SUCCESS;
if (OB_TMP_FAIL(ObQueryRetryCtrl::get_das_retry_func(task_op.errcode_, retry_func))) {
LOG_WARN("get das retry func failed", K(tmp_ret), K(task_op.errcode_));
} else if (retry_func != nullptr) {
bool need_retry = false;
retry_func(das_ref, task_op, need_retry);
LOG_INFO("[DAS RETRY] check if need tablet level retry",
KR(task_op.errcode_), K(need_retry),
"retry_cnt", location_router.get_retry_cnt());
if (need_retry) {
task_op.in_part_retry_ = true;
location_router.set_last_errno(task_op.get_errcode());
location_router.inc_retry_cnt();
if (OB_TMP_FAIL(clear_task_exec_env(das_ref, task_op))) {
LOG_WARN("clear task execution environment failed", K(tmp_ret));
}
if (OB_FAIL(das_ref.get_exec_ctx().check_status())) {
LOG_WARN("query is timeout, terminate retry", K(ret));
} else if (OB_FAIL(refresh_task_location_info(das_ref, task_op))) {
LOG_WARN("refresh task location failed", K(ret));
} else {
LOG_INFO("start to retry DAS task now", KPC(task_op.get_tablet_loc()));
das_task_wrapper.reuse();
task_op.set_task_status(ObDasTaskStatus::UNSTART);
if (OB_FAIL(das_task_wrapper.push_back_task(&task_op))) {
LOG_WARN("failed to push back task", K(ret));
} else if (OB_FAIL(execute_dist_das_task(das_ref, das_task_wrapper, false))) {
LOG_WARN("execute dist DAS task failed", K(ret));
}
}
task_op.errcode_ = ret;
LOG_WARN("execute dist das task failed", K(ret));
retry_continue = (OB_SUCCESS != ret);
} else {
LOG_DEBUG("retry das task success!", K(task_op));
ret = task_op.errcode_;
}
} else {
break;
}
}
} while (retry_continue);
if (OB_FAIL(ret)) {
int tmp_ret = OB_SUCCESS;
if (OB_TMP_FAIL(task_op.state_advance())) {
@ -280,37 +291,6 @@ int ObDataAccessService::retry_das_task(ObDASRef &das_ref, ObIDASTaskOp &task_op
return ret;
}
bool ObDataAccessService::can_fast_fail(const ObIDASTaskOp &task_op) const
{
bool bret = false;
int ret = OB_SUCCESS; // no need to pass ret outside.
const common::ObTableID &table_id = IS_DAS_DML_OP(task_op)
? static_cast<const ObDASDMLBaseCtDef *>(task_op.get_ctdef())->table_id_
: static_cast<const ObDASScanCtDef *>(task_op.get_ctdef())->ref_table_id_;
int64_t schema_version = IS_DAS_DML_OP(task_op)
? static_cast<const ObDASDMLBaseCtDef *>(task_op.get_ctdef())->schema_version_
: static_cast<const ObDASScanCtDef *>(task_op.get_ctdef())->schema_version_;
schema::ObSchemaGetterGuard schema_guard;
const schema::ObTableSchema *table_schema = nullptr;
if (OB_ISNULL(GCTX.schema_service_)) {
LOG_ERROR("invalid schema service", KR(ret));
} else if (OB_FAIL(GCTX.schema_service_->get_tenant_schema_guard(MTL_ID(), schema_guard))) {
// tenant could be deleted
bret = true;
LOG_WARN("get tenant schema guard fail", KR(ret), K(MTL_ID()));
} else if (OB_FAIL(schema_guard.get_table_schema(MTL_ID(), table_id, table_schema))) {
LOG_WARN("failed to get table schema", KR(ret));
} else if (OB_ISNULL(table_schema)) {
bret = true;
LOG_WARN("table not exist, fast fail das task");
} else if (table_schema->get_schema_version() != schema_version) {
bret = true;
LOG_WARN("schema version changed, fast fail das task", "current schema version",
table_schema->get_schema_version(), "query schema version", schema_version);
}
return bret;
}
int ObDataAccessService::end_das_task(ObDASRef &das_ref, ObIDASTaskOp &task_op)
{
int ret = OB_SUCCESS;
@ -340,11 +320,8 @@ int ObDataAccessService::rescan_das_task(ObDASRef &das_ref, ObDASScanOp &scan_op
OB_ASSERT(scan_op.errcode_ == ret);
if (OB_FAIL(ret) && GCONF._enable_partition_level_retry && scan_op.can_part_retry()) {
//only fast select can be retry with partition level
int tmp_ret = retry_das_task(das_ref, scan_op);
if (OB_SUCCESS == tmp_ret) {
ret = OB_SUCCESS;
} else {
LOG_WARN("failed to retry das task", K(tmp_ret));
if (OB_FAIL(retry_das_task(das_ref, scan_op))) {
LOG_WARN("failed to retry das task", K(ret));
}
}
return ret;

View File

@ -58,12 +58,11 @@ private:
int execute_dist_das_task(ObDASRef &das_ref,
ObDasAggregatedTasks &task_ops, bool async = true);
int clear_task_exec_env(ObDASRef &das_ref, ObIDASTaskOp &task_op);
int refresh_partition_location(ObDASRef &das_ref, ObIDASTaskOp &task_op, int err_no);
int refresh_task_location_info(ObDASRef &das_ref, ObIDASTaskOp &task_op);
int do_local_das_task(ObDASRef &das_ref, ObDASTaskArg &task_arg);
int do_async_remote_das_task(ObDASRef &das_ref, ObDasAggregatedTasks &aggregated_tasks, ObDASTaskArg &task_arg);
int do_sync_remote_das_task(ObDASRef &das_ref, ObDasAggregatedTasks &aggregated_tasks, ObDASTaskArg &task_arg);
int collect_das_task_info(ObDASTaskArg &task_arg, ObDASRemoteInfo &remote_info);
bool can_fast_fail(const ObIDASTaskOp &task_op) const;
void calc_das_task_parallelism(const ObDASRef &das_ref, const ObDasAggregatedTasks &task_ops, int &target_parallelism);
private:
obrpc::ObDASRpcProxy das_rpc_proxy_;