diff --git a/src/observer/mysql/ob_query_retry_ctrl.cpp b/src/observer/mysql/ob_query_retry_ctrl.cpp index 80628383c9..e7f019d9d2 100644 --- a/src/observer/mysql/ob_query_retry_ctrl.cpp +++ b/src/observer/mysql/ob_query_retry_ctrl.cpp @@ -32,7 +32,7 @@ using namespace oceanbase::transaction; namespace observer { -common::hash::ObHashMap, common::hash::NoPthreadDefendMode> ObQueryRetryCtrl::map_; +common::hash::ObHashMap ObQueryRetryCtrl::map_; void ObRetryPolicy::try_packet_retry(ObRetryParam &v) const { @@ -896,10 +896,11 @@ int ObQueryRetryCtrl::init() // r: error code // func: processor for obmp* query // inner_func: processor for inner connection query + // das_func: processor for DAS task retry #ifndef ERR_RETRY_FUNC -#define ERR_RETRY_FUNC(tag, r, func, inner_func) \ +#define ERR_RETRY_FUNC(tag, r, func, inner_func, das_func) \ if (OB_SUCC(ret)) { \ - if (OB_SUCCESS != (ret = map_.set_refactored(r, std::make_pair(func, inner_func)))) { \ + if (OB_SUCCESS != (ret = map_.set_refactored(r, RetryFuncs(func, inner_func, das_func)))) { \ LOG_ERROR("Duplicated error code registered", "code", #r, KR(ret)); \ } \ } @@ -907,92 +908,92 @@ int ObQueryRetryCtrl::init() // register your error code retry handler here, no order required /* schema */ - ERR_RETRY_FUNC("SCHEMA", OB_SCHEMA_ERROR, schema_error_proc, empty_proc); - ERR_RETRY_FUNC("SCHEMA", OB_TENANT_EXIST, schema_error_proc, empty_proc); - ERR_RETRY_FUNC("SCHEMA", OB_TENANT_NOT_EXIST, schema_error_proc, empty_proc); - ERR_RETRY_FUNC("SCHEMA", OB_ERR_BAD_DATABASE, schema_error_proc, empty_proc); - ERR_RETRY_FUNC("SCHEMA", OB_DATABASE_EXIST, schema_error_proc, empty_proc); - ERR_RETRY_FUNC("SCHEMA", OB_TABLEGROUP_NOT_EXIST, schema_error_proc, empty_proc); - ERR_RETRY_FUNC("SCHEMA", OB_TABLEGROUP_EXIST, schema_error_proc, empty_proc); - ERR_RETRY_FUNC("SCHEMA", OB_TABLE_NOT_EXIST, schema_error_proc, inner_common_schema_error_proc); - ERR_RETRY_FUNC("SCHEMA", OB_ERR_TABLE_EXIST, schema_error_proc, empty_proc); - ERR_RETRY_FUNC("SCHEMA", OB_ERR_BAD_FIELD_ERROR, schema_error_proc, empty_proc); - ERR_RETRY_FUNC("SCHEMA", OB_ERR_COLUMN_DUPLICATE, schema_error_proc, empty_proc); - ERR_RETRY_FUNC("SCHEMA", OB_ERR_USER_EXIST, schema_error_proc, empty_proc); - ERR_RETRY_FUNC("SCHEMA", OB_ERR_USER_NOT_EXIST, schema_error_proc, empty_proc); - ERR_RETRY_FUNC("SCHEMA", OB_ERR_NO_PRIVILEGE, schema_error_proc, empty_proc); - ERR_RETRY_FUNC("SCHEMA", OB_ERR_NO_DB_PRIVILEGE, schema_error_proc, empty_proc); - ERR_RETRY_FUNC("SCHEMA", OB_ERR_NO_TABLE_PRIVILEGE, schema_error_proc, empty_proc); - ERR_RETRY_FUNC("SCHEMA", OB_ERR_WAIT_REMOTE_SCHEMA_REFRESH, schema_error_proc, inner_schema_error_proc); - ERR_RETRY_FUNC("SCHEMA", OB_ERR_REMOTE_SCHEMA_NOT_FULL, schema_error_proc, inner_schema_error_proc); - ERR_RETRY_FUNC("SCHEMA", OB_PARTITION_IS_BLOCKED, schema_error_proc, empty_proc); - ERR_RETRY_FUNC("SCHEMA", OB_ERR_SP_ALREADY_EXISTS, schema_error_proc, empty_proc); - ERR_RETRY_FUNC("SCHEMA", OB_ERR_SP_DOES_NOT_EXIST, schema_error_proc, empty_proc); - ERR_RETRY_FUNC("SCHEMA", OB_OBJECT_NAME_NOT_EXIST, schema_error_proc, empty_proc); - ERR_RETRY_FUNC("SCHEMA", OB_OBJECT_NAME_EXIST, schema_error_proc, empty_proc); - ERR_RETRY_FUNC("SCHEMA", OB_SCHEMA_EAGAIN, schema_error_proc, inner_schema_error_proc); - ERR_RETRY_FUNC("SCHEMA", OB_SCHEMA_NOT_UPTODATE, schema_error_proc, inner_schema_error_proc); - ERR_RETRY_FUNC("SCHEMA", OB_ERR_PARALLEL_DDL_CONFLICT, schema_error_proc, inner_schema_error_proc); + ERR_RETRY_FUNC("SCHEMA", OB_SCHEMA_ERROR, schema_error_proc, empty_proc, nullptr); + ERR_RETRY_FUNC("SCHEMA", OB_TENANT_EXIST, schema_error_proc, empty_proc, nullptr); + ERR_RETRY_FUNC("SCHEMA", OB_TENANT_NOT_EXIST, schema_error_proc, empty_proc, nullptr); + ERR_RETRY_FUNC("SCHEMA", OB_ERR_BAD_DATABASE, schema_error_proc, empty_proc, nullptr); + ERR_RETRY_FUNC("SCHEMA", OB_DATABASE_EXIST, schema_error_proc, empty_proc, nullptr); + ERR_RETRY_FUNC("SCHEMA", OB_TABLEGROUP_NOT_EXIST, schema_error_proc, empty_proc, nullptr); + ERR_RETRY_FUNC("SCHEMA", OB_TABLEGROUP_EXIST, schema_error_proc, empty_proc, nullptr); + ERR_RETRY_FUNC("SCHEMA", OB_TABLE_NOT_EXIST, schema_error_proc, inner_common_schema_error_proc, nullptr); + ERR_RETRY_FUNC("SCHEMA", OB_ERR_TABLE_EXIST, schema_error_proc, empty_proc, nullptr); + ERR_RETRY_FUNC("SCHEMA", OB_ERR_BAD_FIELD_ERROR, schema_error_proc, empty_proc, nullptr); + ERR_RETRY_FUNC("SCHEMA", OB_ERR_COLUMN_DUPLICATE, schema_error_proc, empty_proc, nullptr); + ERR_RETRY_FUNC("SCHEMA", OB_ERR_USER_EXIST, schema_error_proc, empty_proc, nullptr); + ERR_RETRY_FUNC("SCHEMA", OB_ERR_USER_NOT_EXIST, schema_error_proc, empty_proc, nullptr); + ERR_RETRY_FUNC("SCHEMA", OB_ERR_NO_PRIVILEGE, schema_error_proc, empty_proc, nullptr); + ERR_RETRY_FUNC("SCHEMA", OB_ERR_NO_DB_PRIVILEGE, schema_error_proc, empty_proc, nullptr); + ERR_RETRY_FUNC("SCHEMA", OB_ERR_NO_TABLE_PRIVILEGE, schema_error_proc, empty_proc, nullptr); + ERR_RETRY_FUNC("SCHEMA", OB_ERR_WAIT_REMOTE_SCHEMA_REFRESH, schema_error_proc, inner_schema_error_proc, nullptr); + ERR_RETRY_FUNC("SCHEMA", OB_ERR_REMOTE_SCHEMA_NOT_FULL, schema_error_proc, inner_schema_error_proc, nullptr); + ERR_RETRY_FUNC("SCHEMA", OB_ERR_SP_ALREADY_EXISTS, schema_error_proc, empty_proc, nullptr); + ERR_RETRY_FUNC("SCHEMA", OB_ERR_SP_DOES_NOT_EXIST, schema_error_proc, empty_proc, nullptr); + ERR_RETRY_FUNC("SCHEMA", OB_OBJECT_NAME_NOT_EXIST, schema_error_proc, empty_proc, nullptr); + ERR_RETRY_FUNC("SCHEMA", OB_OBJECT_NAME_EXIST, schema_error_proc, empty_proc, nullptr); + ERR_RETRY_FUNC("SCHEMA", OB_SCHEMA_EAGAIN, schema_error_proc, inner_schema_error_proc, nullptr); + ERR_RETRY_FUNC("SCHEMA", OB_SCHEMA_NOT_UPTODATE, schema_error_proc, inner_schema_error_proc, nullptr); + ERR_RETRY_FUNC("SCHEMA", OB_ERR_PARALLEL_DDL_CONFLICT, schema_error_proc, inner_schema_error_proc, nullptr); /* location */ - ERR_RETRY_FUNC("LOCATION", OB_LOCATION_LEADER_NOT_EXIST, location_error_nothing_readable_proc, inner_location_error_nothing_readable_proc); - ERR_RETRY_FUNC("LOCATION", OB_LS_LOCATION_LEADER_NOT_EXIST, location_error_nothing_readable_proc, inner_location_error_nothing_readable_proc); - ERR_RETRY_FUNC("LOCATION", OB_NO_READABLE_REPLICA, location_error_nothing_readable_proc, inner_location_error_nothing_readable_proc); - ERR_RETRY_FUNC("LOCATION", OB_NOT_MASTER, location_error_proc, inner_location_error_proc); - ERR_RETRY_FUNC("LOCATION", OB_RS_NOT_MASTER, location_error_proc, inner_location_error_proc); - ERR_RETRY_FUNC("LOCATION", OB_RS_SHUTDOWN, location_error_proc, inner_location_error_proc); - ERR_RETRY_FUNC("LOCATION", OB_PARTITION_NOT_EXIST, location_error_proc, inner_location_error_proc); - ERR_RETRY_FUNC("LOCATION", OB_LOCATION_NOT_EXIST, location_error_proc, inner_location_error_proc); - ERR_RETRY_FUNC("LOCATION", OB_PARTITION_IS_STOPPED, location_error_proc, inner_location_error_proc); - ERR_RETRY_FUNC("LOCATION", OB_SERVER_IS_INIT, location_error_proc, inner_location_error_proc); - ERR_RETRY_FUNC("LOCATION", OB_SERVER_IS_STOPPING, location_error_proc, inner_location_error_proc); - ERR_RETRY_FUNC("LOCATION", OB_TENANT_NOT_IN_SERVER, location_error_proc, inner_location_error_proc); - ERR_RETRY_FUNC("LOCATION", OB_TRANS_RPC_TIMEOUT, location_error_proc, inner_location_error_proc); - ERR_RETRY_FUNC("LOCATION", OB_USE_DUP_FOLLOW_AFTER_DML, location_error_proc, inner_location_error_proc); - ERR_RETRY_FUNC("LOCATION", OB_TRANS_STMT_NEED_RETRY, location_error_proc, inner_location_error_proc); - ERR_RETRY_FUNC("LOCATION", OB_LS_NOT_EXIST, location_error_proc, inner_location_error_proc); + ERR_RETRY_FUNC("LOCATION", OB_LOCATION_LEADER_NOT_EXIST, location_error_nothing_readable_proc, inner_location_error_nothing_readable_proc, ObDASRetryCtrl::tablet_location_retry_proc); + ERR_RETRY_FUNC("LOCATION", OB_LS_LOCATION_LEADER_NOT_EXIST, location_error_nothing_readable_proc, inner_location_error_nothing_readable_proc, ObDASRetryCtrl::tablet_location_retry_proc); + ERR_RETRY_FUNC("LOCATION", OB_NO_READABLE_REPLICA, location_error_nothing_readable_proc, inner_location_error_nothing_readable_proc, ObDASRetryCtrl::tablet_location_retry_proc); + ERR_RETRY_FUNC("LOCATION", OB_NOT_MASTER, location_error_proc, inner_location_error_proc, ObDASRetryCtrl::tablet_location_retry_proc); + ERR_RETRY_FUNC("LOCATION", OB_RS_NOT_MASTER, location_error_proc, inner_location_error_proc, ObDASRetryCtrl::tablet_location_retry_proc); + ERR_RETRY_FUNC("LOCATION", OB_RS_SHUTDOWN, location_error_proc, inner_location_error_proc, ObDASRetryCtrl::tablet_location_retry_proc); + ERR_RETRY_FUNC("LOCATION", OB_PARTITION_NOT_EXIST, location_error_proc, inner_location_error_proc, ObDASRetryCtrl::tablet_location_retry_proc); + ERR_RETRY_FUNC("LOCATION", OB_LOCATION_NOT_EXIST, location_error_proc, inner_location_error_proc, ObDASRetryCtrl::tablet_location_retry_proc); + ERR_RETRY_FUNC("LOCATION", OB_PARTITION_IS_STOPPED, location_error_proc, inner_location_error_proc, ObDASRetryCtrl::tablet_location_retry_proc); + ERR_RETRY_FUNC("LOCATION", OB_SERVER_IS_INIT, location_error_proc, inner_location_error_proc, ObDASRetryCtrl::tablet_location_retry_proc); + ERR_RETRY_FUNC("LOCATION", OB_SERVER_IS_STOPPING, location_error_proc, inner_location_error_proc, ObDASRetryCtrl::tablet_location_retry_proc); + ERR_RETRY_FUNC("LOCATION", OB_TENANT_NOT_IN_SERVER, location_error_proc, inner_location_error_proc, ObDASRetryCtrl::tablet_location_retry_proc); + ERR_RETRY_FUNC("LOCATION", OB_TRANS_RPC_TIMEOUT, location_error_proc, inner_location_error_proc, nullptr); + ERR_RETRY_FUNC("LOCATION", OB_USE_DUP_FOLLOW_AFTER_DML, location_error_proc, inner_location_error_proc, ObDASRetryCtrl::tablet_location_retry_proc); + ERR_RETRY_FUNC("LOCATION", OB_TRANS_STMT_NEED_RETRY, location_error_proc, inner_location_error_proc, nullptr); + ERR_RETRY_FUNC("LOCATION", OB_LS_NOT_EXIST, location_error_proc, inner_location_error_proc, ObDASRetryCtrl::tablet_location_retry_proc); // OB_TABLET_NOT_EXIST may be caused by old version schema or incorrect location. // Just use location_error_proc to retry sql and a new schema guard will be obtained during the retry process. - ERR_RETRY_FUNC("LOCATION", OB_TABLET_NOT_EXIST, location_error_proc, inner_location_error_proc); - ERR_RETRY_FUNC("LOCATION", OB_LS_LOCATION_NOT_EXIST, location_error_proc, inner_location_error_proc); - ERR_RETRY_FUNC("LOCATION", OB_MAPPING_BETWEEN_TABLET_AND_LS_NOT_EXIST, location_error_proc,inner_location_error_proc); + ERR_RETRY_FUNC("LOCATION", OB_TABLET_NOT_EXIST, location_error_proc, inner_location_error_proc, ObDASRetryCtrl::tablet_not_exist_retry_proc); + ERR_RETRY_FUNC("LOCATION", OB_LS_LOCATION_NOT_EXIST, location_error_proc, inner_location_error_proc, ObDASRetryCtrl::tablet_location_retry_proc); + ERR_RETRY_FUNC("LOCATION", OB_PARTITION_IS_BLOCKED, location_error_proc, inner_location_error_proc, ObDASRetryCtrl::tablet_nothing_readable_proc); + ERR_RETRY_FUNC("LOCATION", OB_MAPPING_BETWEEN_TABLET_AND_LS_NOT_EXIST, location_error_proc,inner_location_error_proc, ObDASRetryCtrl::tablet_location_retry_proc); - ERR_RETRY_FUNC("LOCATION", OB_GET_LOCATION_TIME_OUT, location_error_proc, inner_table_location_error_proc); + ERR_RETRY_FUNC("LOCATION", OB_GET_LOCATION_TIME_OUT, location_error_proc, inner_table_location_error_proc, ObDASRetryCtrl::tablet_location_retry_proc); /* network */ - ERR_RETRY_FUNC("NETWORK", OB_RPC_CONNECT_ERROR, peer_server_status_uncertain_proc, inner_peer_server_status_uncertain_proc); - ERR_RETRY_FUNC("NETWORK", OB_RPC_SEND_ERROR, peer_server_status_uncertain_proc, inner_peer_server_status_uncertain_proc); - ERR_RETRY_FUNC("NETWORK", OB_RPC_POST_ERROR, peer_server_status_uncertain_proc, inner_peer_server_status_uncertain_proc); + ERR_RETRY_FUNC("NETWORK", OB_RPC_CONNECT_ERROR, peer_server_status_uncertain_proc, inner_peer_server_status_uncertain_proc, ObDASRetryCtrl::task_network_retry_proc); + ERR_RETRY_FUNC("NETWORK", OB_RPC_SEND_ERROR, peer_server_status_uncertain_proc, inner_peer_server_status_uncertain_proc, ObDASRetryCtrl::task_network_retry_proc); + ERR_RETRY_FUNC("NETWORK", OB_RPC_POST_ERROR, peer_server_status_uncertain_proc, inner_peer_server_status_uncertain_proc, ObDASRetryCtrl::task_network_retry_proc); /* storage */ - ERR_RETRY_FUNC("STORAGE", OB_SNAPSHOT_DISCARDED, snapshot_discard_proc, short_wait_retry_proc); - ERR_RETRY_FUNC("STORAGE", OB_DATA_NOT_UPTODATE, long_wait_retry_proc, short_wait_retry_proc); - ERR_RETRY_FUNC("STORAGE", OB_REPLICA_NOT_READABLE, long_wait_retry_proc, short_wait_retry_proc); - ERR_RETRY_FUNC("STORAGE", OB_PARTITION_IS_SPLITTING, short_wait_retry_proc, short_wait_retry_proc); - ERR_RETRY_FUNC("STORAGE", OB_DISK_HUNG, nonblock_location_error_proc, empty_proc); + ERR_RETRY_FUNC("STORAGE", OB_SNAPSHOT_DISCARDED, snapshot_discard_proc, short_wait_retry_proc, nullptr); + ERR_RETRY_FUNC("STORAGE", OB_DATA_NOT_UPTODATE, long_wait_retry_proc, short_wait_retry_proc, nullptr); + ERR_RETRY_FUNC("STORAGE", OB_REPLICA_NOT_READABLE, long_wait_retry_proc, short_wait_retry_proc, ObDASRetryCtrl::tablet_nothing_readable_proc); + ERR_RETRY_FUNC("STORAGE", OB_PARTITION_IS_SPLITTING, short_wait_retry_proc, short_wait_retry_proc, nullptr); + ERR_RETRY_FUNC("STORAGE", OB_DISK_HUNG, nonblock_location_error_proc, empty_proc, nullptr); /* trx */ - ERR_RETRY_FUNC("TRX", OB_TRY_LOCK_ROW_CONFLICT, try_lock_row_conflict_proc, inner_try_lock_row_conflict_proc); - ERR_RETRY_FUNC("TRX", OB_TRANSACTION_SET_VIOLATION, trx_set_violation_proc, trx_set_violation_proc); - ERR_RETRY_FUNC("TRX", OB_TRANS_CANNOT_SERIALIZE, trx_can_not_serialize_proc, trx_can_not_serialize_proc); - ERR_RETRY_FUNC("TRX", OB_GTS_NOT_READY, short_wait_retry_proc, short_wait_retry_proc); - ERR_RETRY_FUNC("TRX", OB_GTI_NOT_READY, short_wait_retry_proc, short_wait_retry_proc); - ERR_RETRY_FUNC("TRX", OB_TRANS_WEAK_READ_VERSION_NOT_READY, short_wait_retry_proc, short_wait_retry_proc); + ERR_RETRY_FUNC("TRX", OB_TRY_LOCK_ROW_CONFLICT, try_lock_row_conflict_proc, inner_try_lock_row_conflict_proc, nullptr); + ERR_RETRY_FUNC("TRX", OB_TRANSACTION_SET_VIOLATION, trx_set_violation_proc, trx_set_violation_proc, nullptr); + ERR_RETRY_FUNC("TRX", OB_TRANS_CANNOT_SERIALIZE, trx_can_not_serialize_proc, trx_can_not_serialize_proc, nullptr); + ERR_RETRY_FUNC("TRX", OB_GTS_NOT_READY, short_wait_retry_proc, short_wait_retry_proc, nullptr); + ERR_RETRY_FUNC("TRX", OB_GTI_NOT_READY, short_wait_retry_proc, short_wait_retry_proc, nullptr); + ERR_RETRY_FUNC("TRX", OB_TRANS_WEAK_READ_VERSION_NOT_READY, short_wait_retry_proc, short_wait_retry_proc, nullptr); /* sql */ - ERR_RETRY_FUNC("SQL", OB_ERR_INSUFFICIENT_PX_WORKER, px_thread_not_enough_proc, short_wait_retry_proc); + ERR_RETRY_FUNC("SQL", OB_ERR_INSUFFICIENT_PX_WORKER, px_thread_not_enough_proc, short_wait_retry_proc, nullptr); // create a new interval part when inserting a row which has no matched part, // wait and retry, will see new part - ERR_RETRY_FUNC("SQL", OB_NO_PARTITION_FOR_INTERVAL_PART, short_wait_retry_proc, short_wait_retry_proc); - ERR_RETRY_FUNC("SQL", OB_SQL_RETRY_SPM, force_local_retry_proc, force_local_retry_proc); - ERR_RETRY_FUNC("SQL", OB_NEED_SWITCH_CONSUMER_GROUP, switch_consumer_group_retry_proc, empty_proc); + ERR_RETRY_FUNC("SQL", OB_NO_PARTITION_FOR_INTERVAL_PART, short_wait_retry_proc, short_wait_retry_proc, nullptr); + ERR_RETRY_FUNC("SQL", OB_SQL_RETRY_SPM, force_local_retry_proc, force_local_retry_proc, nullptr); + ERR_RETRY_FUNC("SQL", OB_NEED_SWITCH_CONSUMER_GROUP, switch_consumer_group_retry_proc, empty_proc, nullptr); /* timeout */ - ERR_RETRY_FUNC("SQL", OB_TIMEOUT, timeout_proc, timeout_proc); - ERR_RETRY_FUNC("SQL", OB_TRANS_TIMEOUT, timeout_proc, timeout_proc); - ERR_RETRY_FUNC("SQL", OB_TRANS_STMT_TIMEOUT, timeout_proc, timeout_proc); + ERR_RETRY_FUNC("SQL", OB_TIMEOUT, timeout_proc, timeout_proc, nullptr); + ERR_RETRY_FUNC("SQL", OB_TRANS_TIMEOUT, timeout_proc, timeout_proc, nullptr); + ERR_RETRY_FUNC("SQL", OB_TRANS_STMT_TIMEOUT, timeout_proc, timeout_proc, nullptr); /* ddl */ @@ -1023,17 +1024,32 @@ ObQueryRetryCtrl::~ObQueryRetryCtrl() { } +int ObQueryRetryCtrl::get_das_retry_func(int err, ObDASRetryCtrl::retry_func &retry_func) +{ + int ret = OB_SUCCESS; + retry_func = nullptr; + RetryFuncs funcs; + if (OB_FAIL(map_.get_refactored(err, funcs))) { + if (OB_HASH_NOT_EXIST == ret) { + ret = OB_SUCCESS; + } + } else { + retry_func = funcs.element<2>(); + } + return ret; +} + int ObQueryRetryCtrl::get_func(int err, bool is_inner_sql, retry_func &func) { int ret = OB_SUCCESS; - std::pair funcs; + RetryFuncs funcs; if (OB_FAIL(map_.get_refactored(err, funcs))) { if (OB_HASH_NOT_EXIST == ret) { func = empty_proc; ret = OB_SUCCESS; } } else { - func = is_inner_sql ? funcs.second : funcs.first; + func = is_inner_sql ? funcs.element<1>() : funcs.element<0>(); } return ret; } diff --git a/src/observer/mysql/ob_query_retry_ctrl.h b/src/observer/mysql/ob_query_retry_ctrl.h index ffd8fa347c..71157aef23 100644 --- a/src/observer/mysql/ob_query_retry_ctrl.h +++ b/src/observer/mysql/ob_query_retry_ctrl.h @@ -17,6 +17,8 @@ #include "lib/time/ob_time_utility.h" #include "sql/ob_sql_context.h" #include "sql/session/ob_basic_session_info.h" +#include "lib/container/ob_tuple.h" +#include "sql/das/ob_das_retry_ctrl.h" namespace oceanbase { namespace sql @@ -259,6 +261,7 @@ public: return (isolation == transaction::ObTxIsolationLevel::RR || isolation == transaction::ObTxIsolationLevel::SERIAL); } + static int get_das_retry_func(int err, sql::ObDASRetryCtrl::retry_func &retry_func); public: // schema类型的错误最多在本线程重试5次。 // 5是拍脑袋决定的,之后还要看统计数据的反馈再修改。TODO qianfu.zpf @@ -314,7 +317,8 @@ private: /* variables */ // map_ is used to fast lookup the error code retry processor - static common::hash::ObHashMap, common::hash::NoPthreadDefendMode> map_; + typedef common::ObTuple RetryFuncs; + static common::hash::ObHashMap map_; int64_t curr_query_tenant_local_schema_version_; // Query开始、Loc刷新前的普通租户shm ver int64_t curr_query_tenant_global_schema_version_; // Query开始时的普通租户schema version int64_t curr_query_sys_local_schema_version_; // Query开始、Loc刷新前的系统租户shm ver diff --git a/src/share/schema/ob_table_schema.cpp b/src/share/schema/ob_table_schema.cpp index e91bc0bc91..9f934c3050 100644 --- a/src/share/schema/ob_table_schema.cpp +++ b/src/share/schema/ob_table_schema.cpp @@ -1219,6 +1219,28 @@ int ObSimpleTableSchemaV2::get_tablet_id_by_object_id( return ret; } +int ObSimpleTableSchemaV2::check_if_tablet_exists(const ObTabletID &tablet_id, bool &exists) const +{ + int ret = OB_SUCCESS; + const ObCheckPartitionMode mode = CHECK_PARTITION_MODE_NORMAL; + ObPartitionSchemaIter iter(*this, mode); + ObPartitionSchemaIter::Info info; + exists = false; + while (OB_SUCC(ret) && !exists) { + if (OB_FAIL(iter.next_partition_info(info))) { + if (OB_ITER_END != ret) { + LOG_WARN("iter partition failed", KR(ret)); + } + } else if (info.tablet_id_ == tablet_id) { + exists = true; + } + } + if (OB_ITER_END == ret) { + ret = OB_SUCCESS; + } + return ret; +} + ObTableSchema::ObTableSchema() : ObSimpleTableSchemaV2() { diff --git a/src/share/schema/ob_table_schema.h b/src/share/schema/ob_table_schema.h index 742ddc4dca..2ae0993da5 100644 --- a/src/share/schema/ob_table_schema.h +++ b/src/share/schema/ob_table_schema.h @@ -663,6 +663,7 @@ public: int check_is_all_server_readonly_replica( share::schema::ObSchemaGetterGuard &guard, bool &is) const; + int check_if_tablet_exists(const common::ObTabletID &tablet_id, bool &exists) const; int add_simple_foreign_key_info(const uint64_t tenant_id, const uint64_t database_id, diff --git a/src/sql/CMakeLists.txt b/src/sql/CMakeLists.txt index 527e192b21..2fe76cad50 100644 --- a/src/sql/CMakeLists.txt +++ b/src/sql/CMakeLists.txt @@ -63,6 +63,7 @@ ob_set_subtarget(ob_sql das das/ob_das_id_cache.cpp das/ob_das_task_result.cpp das/ob_das_spatial_index_lookup_op.cpp + das/ob_das_retry_ctrl.cpp ) ob_set_subtarget(ob_sql dtl diff --git a/src/sql/das/ob_das_location_router.cpp b/src/sql/das/ob_das_location_router.cpp index 748bbff0bb..3bbc866547 100644 --- a/src/sql/das/ob_das_location_router.cpp +++ b/src/sql/das/ob_das_location_router.cpp @@ -808,6 +808,8 @@ int ObDASLocationRouter::nonblock_get_readable_replica(const uint64_t tenant_id, } else if (OB_FAIL(remote_replicas.push_back(&tmp_replica_loc))) { LOG_WARN("store tmp replica failed", K(ret)); } + } else { + LOG_INFO("this replica is in the blacklist, thus filtered it", K(bl_key)); } } if (OB_SUCC(ret)) { diff --git a/src/sql/das/ob_das_retry_ctrl.cpp b/src/sql/das/ob_das_retry_ctrl.cpp new file mode 100644 index 0000000000..68660555d7 --- /dev/null +++ b/src/sql/das/ob_das_retry_ctrl.cpp @@ -0,0 +1,107 @@ +/** + * Copyright (c) 2022 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ +#define USING_LOG_PREFIX SQL_DAS +#include "sql/das/ob_das_retry_ctrl.h" +#include "sql/das/ob_das_task.h" +#include "sql/das/ob_das_ref.h" +#include "sql/engine/ob_exec_context.h" + +namespace oceanbase { +using namespace common; +using namespace share; +using namespace share::schema; +namespace sql { + +void ObDASRetryCtrl::tablet_location_retry_proc(ObDASRef &das_ref, + ObIDASTaskOp &task_op, + bool &need_retry) +{ + need_retry = false; + int ret = OB_SUCCESS; + ObTableID ref_table_id = task_op.get_ref_table_id(); + ObDASLocationRouter &loc_router = DAS_CTX(das_ref.get_exec_ctx()).get_location_router(); + const ObDASTabletLoc *tablet_loc = task_op.get_tablet_loc(); + if (is_virtual_table(ref_table_id)) { + //the location of the virtual table can't be refreshed, + //so when a location exception occurs, virtual table is not retryable + need_retry = false; + } else if (OB_ISNULL(tablet_loc)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("tablet loc is nullptr", K(ret)); + } else { + loc_router.refresh_location_cache(tablet_loc->tablet_id_, true, task_op.get_errcode()); + need_retry = true; + const ObDASTableLocMeta *loc_meta = tablet_loc->loc_meta_; + LOG_INFO("refresh tablet location cache and retry DAS task", + "errcode", task_op.get_errcode(), KPC(loc_meta), KPC(tablet_loc)); + } +} + +void ObDASRetryCtrl::tablet_nothing_readable_proc(ObDASRef &, ObIDASTaskOp &task_op, bool &need_retry) +{ + if (is_virtual_table(task_op.get_ref_table_id())) { + need_retry = false; + } else { + need_retry = true; + } +} + +void ObDASRetryCtrl::task_network_retry_proc(ObDASRef &, ObIDASTaskOp &, bool &need_retry) +{ + need_retry = true; +} + +/** + * The storage throws 4725 to the DAS in two cases: + * 1. When a table or partition is dropped, the tablet is recycled, which is caused by DDL and cannot be retried. + * 2. When a partition is transfered, but the tablet location cache is not updated, + * the TSC operation is sent to the old server, and the storage reports 4725, this case needs to be retried. + * The DAS cannot unconditionally retry 4725, + * and needs to determine whether the real cause of the 4725 error is a drop table or a transfer. + **/ +void ObDASRetryCtrl::tablet_not_exist_retry_proc(ObDASRef &das_ref, + ObIDASTaskOp &task_op, + bool &need_retry) +{ + int ret = OB_SUCCESS; + need_retry = false; + ObTableID ref_table_id = task_op.get_ref_table_id(); + bool tablet_exist = false; + schema::ObSchemaGetterGuard schema_guard; + const schema::ObTableSchema *table_schema = nullptr; + const ObDASTabletLoc *tablet_loc = task_op.get_tablet_loc(); + if (OB_ISNULL(GCTX.schema_service_) || OB_ISNULL(tablet_loc)) { + LOG_WARN("invalid schema service", KR(ret), K(GCTX.schema_service_), K(tablet_loc)); + } else if (OB_FAIL(GCTX.schema_service_->get_tenant_schema_guard(MTL_ID(), schema_guard))) { + // tenant could be deleted + task_op.set_errcode(ret); + LOG_WARN("get tenant schema guard fail", KR(ret), K(MTL_ID())); + } else if (OB_FAIL(schema_guard.get_table_schema(MTL_ID(), ref_table_id, table_schema))) { + task_op.set_errcode(ret); + LOG_WARN("failed to get table schema", KR(ret)); + } else if (OB_ISNULL(table_schema)) { + //table could be dropped + task_op.set_errcode(OB_TABLE_NOT_EXIST); + LOG_WARN("table not exist, fast fail das task", K(ref_table_id)); + } else if (table_schema->is_vir_table()) { + need_retry = false; + } else if (OB_FAIL(table_schema->check_if_tablet_exists(tablet_loc->tablet_id_, tablet_exist))) { + LOG_WARN("check if tablet exists failed", K(ret), K(tablet_loc), K(ref_table_id)); + } else if (!tablet_exist) { + task_op.set_errcode(OB_PARTITION_NOT_EXIST); + LOG_WARN("partition not exist, maybe dropped by DDL", K(ret), K(tablet_loc), K(ref_table_id)); + } else { + tablet_location_retry_proc(das_ref, task_op, need_retry); + } +} +} // namespace sql +} // namespace oceanbase diff --git a/src/sql/das/ob_das_retry_ctrl.h b/src/sql/das/ob_das_retry_ctrl.h new file mode 100644 index 0000000000..c02ee19e43 --- /dev/null +++ b/src/sql/das/ob_das_retry_ctrl.h @@ -0,0 +1,42 @@ +/** + * Copyright (c) 2023 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ +#ifndef DEV_SRC_SQL_DAS_OB_DAS_RETRY_CTRL_H_ +#define DEV_SRC_SQL_DAS_OB_DAS_RETRY_CTRL_H_ + +namespace oceanbase { +namespace sql { +class ObIDASTaskOp; +class ObDASRef; +class ObDASRetryCtrl +{ +public: + /** + * retry_func: is to determine whether tablet-level retry is necessary based on the status of the task + * and maybe change some status of the das task in this function, + * such as refresh_partition_location_cache, + * DAS retry only can be attempted within the current thread + * [param in]: ObDASRef &: the das context reference + * [param in/out] ObIDASTaskOp &: which das task need to retry + * [param out] bool &: whether need DAS retry + * */ + typedef void (*retry_func)(ObDASRef &, ObIDASTaskOp &, bool &); + + static void tablet_location_retry_proc(ObDASRef &, ObIDASTaskOp &, bool &); + static void tablet_nothing_readable_proc(ObDASRef &, ObIDASTaskOp &, bool &); + static void task_network_retry_proc(ObDASRef &, ObIDASTaskOp &, bool &); + static void tablet_not_exist_retry_proc(ObDASRef &, ObIDASTaskOp &, bool &); +}; + +} // namespace sql +} // namespace oceanbase + +#endif /* DEV_SRC_SQL_DAS_OB_DAS_RETRY_CTRL_H_ */ diff --git a/src/sql/das/ob_das_task.h b/src/sql/das/ob_das_task.h index e2d99e4781..2feafd57fa 100644 --- a/src/sql/das/ob_das_task.h +++ b/src/sql/das/ob_das_task.h @@ -148,9 +148,10 @@ public: DASRtDefFixedArray &get_related_rtdefs() { return related_rtdefs_; } ObTabletIDFixedArray &get_related_tablet_ids() { return related_tablet_ids_; } virtual int dump_data() const { return common::OB_SUCCESS; } - const DasTaskNode &get_node() const { return das_task_node_; }; - DasTaskNode &get_node() { return das_task_node_; }; - int get_errcode() const { return errcode_; }; + const DasTaskNode &get_node() const { return das_task_node_; } + DasTaskNode &get_node() { return das_task_node_; } + int get_errcode() const { return errcode_; } + void set_errcode(int errcode) { errcode_ = errcode; } VIRTUAL_TO_STRING_KV(K_(tenant_id), K_(task_id), K_(op_type), diff --git a/src/sql/das/ob_data_access_service.cpp b/src/sql/das/ob_data_access_service.cpp index 3c5a2683d5..a60f0aaea4 100644 --- a/src/sql/das/ob_data_access_service.cpp +++ b/src/sql/das/ob_data_access_service.cpp @@ -12,6 +12,7 @@ #define USING_LOG_PREFIX SQL_DAS #include "observer/ob_srv_network_frame.h" +#include "observer/mysql/ob_query_retry_ctrl.h" #include "sql/das/ob_data_access_service.h" #include "sql/das/ob_das_define.h" #include "sql/das/ob_das_extra_data.h" @@ -20,6 +21,7 @@ #include "sql/das/ob_das_utils.h" #include "sql/ob_phy_table_location.h" #include "sql/engine/ob_exec_context.h" +#include "sql/das/ob_das_retry_ctrl.h" #include "storage/tx/ob_trans_service.h" namespace oceanbase { @@ -27,6 +29,7 @@ using namespace share; using namespace storage; using namespace common; using namespace transaction; +using namespace observer; namespace sql { @@ -210,17 +213,12 @@ int ObDataAccessService::clear_task_exec_env(ObDASRef &das_ref, ObIDASTaskOp &ta return ret; } -int ObDataAccessService::refresh_partition_location(ObDASRef &das_ref, - ObIDASTaskOp &task_op, - int err_no) +int ObDataAccessService::refresh_task_location_info(ObDASRef &das_ref, ObIDASTaskOp &task_op) { int ret = OB_SUCCESS; ObExecContext &exec_ctx = das_ref.get_exec_ctx(); - ObDASBaseRtDef *das_rtdef = task_op.get_rtdef(); - ObDASTableLoc *table_loc = das_rtdef->table_loc_; ObDASTabletLoc *tablet_loc = const_cast(task_op.get_tablet_loc()); int64_t retry_cnt = DAS_CTX(exec_ctx).get_location_router().get_retry_cnt(); - DAS_CTX(exec_ctx).get_location_router().refresh_location_cache(tablet_loc->tablet_id_, true, err_no); if (OB_FAIL(ObDASUtils::wait_das_retry(retry_cnt))) { LOG_WARN("wait das retry failed", K(ret)); } else if (OB_FAIL(DAS_CTX(exec_ctx).get_location_router().get_tablet_loc(*tablet_loc->loc_meta_, @@ -230,7 +228,6 @@ int ObDataAccessService::refresh_partition_location(ObDASRef &das_ref, } else { task_op.set_ls_id(tablet_loc->ls_id_); } - LOG_INFO("LOCATION: refresh tablet cache", K(ret), KPC(table_loc), KPC(tablet_loc)); return ret; } @@ -239,36 +236,50 @@ int ObDataAccessService::retry_das_task(ObDASRef &das_ref, ObIDASTaskOp &task_op int ret = task_op.errcode_; ObArenaAllocator tmp_alloc; ObDasAggregatedTasks das_task_wrapper(tmp_alloc); - while ((is_master_changed_error(ret) || - is_partition_change_error(ret) || - OB_REPLICA_NOT_READABLE == ret) - && !is_virtual_table(task_op.get_ref_table_id())) { - int tmp_ret = ret; - if (!can_fast_fail(task_op)) { - task_op.in_part_retry_ = true; - ObDASLocationRouter &location_router = DAS_CTX(das_ref.get_exec_ctx()).get_location_router(); - location_router.set_last_errno(ret); - location_router.inc_retry_cnt(); - if (OB_FAIL(clear_task_exec_env(das_ref, task_op))) { - LOG_WARN("clear task execution environment", K(ret)); - } else if (OB_FAIL(das_ref.get_exec_ctx().check_status())) { - LOG_WARN("query is timeout, terminate retry", K(ret)); - } else if (OB_FAIL(refresh_partition_location(das_ref, task_op, task_op.errcode_))) { - LOG_WARN("refresh partition location failed", K(ret), "ori_err_code", tmp_ret, K(lbt())); - } else if (FALSE_IT(das_task_wrapper.reuse())) { - } else if (FALSE_IT(task_op.set_task_status(ObDasTaskStatus::UNSTART))) { - } else if (OB_FAIL(das_task_wrapper.push_back_task(&task_op))) { - LOG_WARN("failed to push back task", K(ret)); - } else if (OB_FAIL(execute_dist_das_task(das_ref, das_task_wrapper, false))) { + bool retry_continue = false; + ObDASLocationRouter &location_router = DAS_CTX(das_ref.get_exec_ctx()).get_location_router(); + do { + ObDASRetryCtrl::retry_func retry_func = nullptr; + + retry_continue = false; + int tmp_ret = OB_SUCCESS; + if (OB_TMP_FAIL(ObQueryRetryCtrl::get_das_retry_func(task_op.errcode_, retry_func))) { + LOG_WARN("get das retry func failed", K(tmp_ret), K(task_op.errcode_)); + } else if (retry_func != nullptr) { + bool need_retry = false; + retry_func(das_ref, task_op, need_retry); + LOG_INFO("[DAS RETRY] check if need tablet level retry", + KR(task_op.errcode_), K(need_retry), + "retry_cnt", location_router.get_retry_cnt()); + if (need_retry) { + task_op.in_part_retry_ = true; + location_router.set_last_errno(task_op.get_errcode()); + location_router.inc_retry_cnt(); + if (OB_TMP_FAIL(clear_task_exec_env(das_ref, task_op))) { + LOG_WARN("clear task execution environment failed", K(tmp_ret)); + } + if (OB_FAIL(das_ref.get_exec_ctx().check_status())) { + LOG_WARN("query is timeout, terminate retry", K(ret)); + } else if (OB_FAIL(refresh_task_location_info(das_ref, task_op))) { + LOG_WARN("refresh task location failed", K(ret)); + } else { + LOG_INFO("start to retry DAS task now", KPC(task_op.get_tablet_loc())); + das_task_wrapper.reuse(); + task_op.set_task_status(ObDasTaskStatus::UNSTART); + if (OB_FAIL(das_task_wrapper.push_back_task(&task_op))) { + LOG_WARN("failed to push back task", K(ret)); + } else if (OB_FAIL(execute_dist_das_task(das_ref, das_task_wrapper, false))) { + LOG_WARN("execute dist DAS task failed", K(ret)); + } + } task_op.errcode_ = ret; - LOG_WARN("execute dist das task failed", K(ret)); + retry_continue = (OB_SUCCESS != ret); } else { - LOG_DEBUG("retry das task success!", K(task_op)); + ret = task_op.errcode_; } - } else { - break; } - } + } while (retry_continue); + if (OB_FAIL(ret)) { int tmp_ret = OB_SUCCESS; if (OB_TMP_FAIL(task_op.state_advance())) { @@ -280,37 +291,6 @@ int ObDataAccessService::retry_das_task(ObDASRef &das_ref, ObIDASTaskOp &task_op return ret; } -bool ObDataAccessService::can_fast_fail(const ObIDASTaskOp &task_op) const -{ - bool bret = false; - int ret = OB_SUCCESS; // no need to pass ret outside. - const common::ObTableID &table_id = IS_DAS_DML_OP(task_op) - ? static_cast(task_op.get_ctdef())->table_id_ - : static_cast(task_op.get_ctdef())->ref_table_id_; - int64_t schema_version = IS_DAS_DML_OP(task_op) - ? static_cast(task_op.get_ctdef())->schema_version_ - : static_cast(task_op.get_ctdef())->schema_version_; - schema::ObSchemaGetterGuard schema_guard; - const schema::ObTableSchema *table_schema = nullptr; - if (OB_ISNULL(GCTX.schema_service_)) { - LOG_ERROR("invalid schema service", KR(ret)); - } else if (OB_FAIL(GCTX.schema_service_->get_tenant_schema_guard(MTL_ID(), schema_guard))) { - // tenant could be deleted - bret = true; - LOG_WARN("get tenant schema guard fail", KR(ret), K(MTL_ID())); - } else if (OB_FAIL(schema_guard.get_table_schema(MTL_ID(), table_id, table_schema))) { - LOG_WARN("failed to get table schema", KR(ret)); - } else if (OB_ISNULL(table_schema)) { - bret = true; - LOG_WARN("table not exist, fast fail das task"); - } else if (table_schema->get_schema_version() != schema_version) { - bret = true; - LOG_WARN("schema version changed, fast fail das task", "current schema version", - table_schema->get_schema_version(), "query schema version", schema_version); - } - return bret; -} - int ObDataAccessService::end_das_task(ObDASRef &das_ref, ObIDASTaskOp &task_op) { int ret = OB_SUCCESS; @@ -340,11 +320,8 @@ int ObDataAccessService::rescan_das_task(ObDASRef &das_ref, ObDASScanOp &scan_op OB_ASSERT(scan_op.errcode_ == ret); if (OB_FAIL(ret) && GCONF._enable_partition_level_retry && scan_op.can_part_retry()) { //only fast select can be retry with partition level - int tmp_ret = retry_das_task(das_ref, scan_op); - if (OB_SUCCESS == tmp_ret) { - ret = OB_SUCCESS; - } else { - LOG_WARN("failed to retry das task", K(tmp_ret)); + if (OB_FAIL(retry_das_task(das_ref, scan_op))) { + LOG_WARN("failed to retry das task", K(ret)); } } return ret; diff --git a/src/sql/das/ob_data_access_service.h b/src/sql/das/ob_data_access_service.h index 99c5583e1b..5561ad1351 100644 --- a/src/sql/das/ob_data_access_service.h +++ b/src/sql/das/ob_data_access_service.h @@ -58,12 +58,11 @@ private: int execute_dist_das_task(ObDASRef &das_ref, ObDasAggregatedTasks &task_ops, bool async = true); int clear_task_exec_env(ObDASRef &das_ref, ObIDASTaskOp &task_op); - int refresh_partition_location(ObDASRef &das_ref, ObIDASTaskOp &task_op, int err_no); + int refresh_task_location_info(ObDASRef &das_ref, ObIDASTaskOp &task_op); int do_local_das_task(ObDASRef &das_ref, ObDASTaskArg &task_arg); int do_async_remote_das_task(ObDASRef &das_ref, ObDasAggregatedTasks &aggregated_tasks, ObDASTaskArg &task_arg); int do_sync_remote_das_task(ObDASRef &das_ref, ObDasAggregatedTasks &aggregated_tasks, ObDASTaskArg &task_arg); int collect_das_task_info(ObDASTaskArg &task_arg, ObDASRemoteInfo &remote_info); - bool can_fast_fail(const ObIDASTaskOp &task_op) const; void calc_das_task_parallelism(const ObDASRef &das_ref, const ObDasAggregatedTasks &task_ops, int &target_parallelism); private: obrpc::ObDASRpcProxy das_rpc_proxy_;