diff --git a/deps/oblib/src/lib/oblog/ob_warning_buffer.h b/deps/oblib/src/lib/oblog/ob_warning_buffer.h index 685ea6c29a..eb6bfd29a3 100644 --- a/deps/oblib/src/lib/oblog/ob_warning_buffer.h +++ b/deps/oblib/src/lib/oblog/ob_warning_buffer.h @@ -240,7 +240,6 @@ inline ObWarningBuffer *&ob_get_tsi_warning_buffer() { return g_warning_buffer; } - inline const ObString ob_get_tsi_err_msg(int code) { ObString ret; diff --git a/deps/oblib/src/rpc/obmysql/ob_mysql_packet.h b/deps/oblib/src/rpc/obmysql/ob_mysql_packet.h index b854f03a95..5a3930fdf0 100644 --- a/deps/oblib/src/rpc/obmysql/ob_mysql_packet.h +++ b/deps/oblib/src/rpc/obmysql/ob_mysql_packet.h @@ -158,6 +158,7 @@ union ObProxyCapabilityFlags && is_ob_protocol_v2_support(); } bool is_session_var_sync_support() const { return 1 == cap_flags_.OB_CAP_PROXY_SESSION_VAR_SYNC && is_ob_protocol_v2_support(); } + bool is_weak_stale_feedback() const { return 1 == cap_flags_.OB_CAP_PROXY_WEAK_STALE_FEEDBACK; } uint64_t capability_; struct CapabilityFlags @@ -180,14 +181,15 @@ union ObProxyCapabilityFlags uint64_t OB_CAP_PL_ROUTE: 1; uint64_t OB_CAP_PROXY_REROUTE: 1; - + // for session_info sync uint64_t OB_CAP_PROXY_SESSIOIN_SYNC: 1; // for full trace_route uint64_t OB_CAP_PROXY_FULL_LINK_TRACING: 1; uint64_t OB_CAP_PROXY_NEW_EXTRA_INFO: 1; uint64_t OB_CAP_PROXY_SESSION_VAR_SYNC: 1; - uint64_t OB_CAP_RESERVED_NOT_USE: 47; + uint64_t OB_CAP_PROXY_WEAK_STALE_FEEDBACK: 1; + uint64_t OB_CAP_RESERVED_NOT_USE: 46; } cap_flags_; }; diff --git a/src/observer/mysql/obmp_packet_sender.cpp b/src/observer/mysql/obmp_packet_sender.cpp index 04fd5e8b92..5d4f2ef5c0 100644 --- a/src/observer/mysql/obmp_packet_sender.cpp +++ b/src/observer/mysql/obmp_packet_sender.cpp @@ -459,7 +459,8 @@ int ObMPPacketSender::send_error_packet(int err, ok_param.take_trace_id_to_client_ = true; } if (OB_ERR_PROXY_REROUTE == err) { - ok_param.reroute_info_ = static_cast(extra_err_info); + ObFeedbackRerouteInfo *rt_info = static_cast(extra_err_info); + ok_param.reroute_info_ = rt_info; } if (OB_FAIL(send_ok_packet(*session, ok_param, &epacket))) { LOG_WARN("failed to send ok packet", K(ok_param), K(ret)); diff --git a/src/sql/ob_sql.cpp b/src/sql/ob_sql.cpp index 26c30c78a2..bbb65a87d1 100644 --- a/src/sql/ob_sql.cpp +++ b/src/sql/ob_sql.cpp @@ -4944,6 +4944,15 @@ int ObSql::check_need_reroute(ObPlanCacheCtx &pc_ctx, ObSQLSessionInfo &session, if (OB_ISNULL(pc_ctx.sql_ctx_.schema_guard_)) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid null schema guard", K(ret)); + } else if (OB_ISNULL(pc_ctx.sql_ctx_.cur_stmt_)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid null stmt", K(ret)); + } else if (OB_ISNULL(pc_ctx.sql_ctx_.cur_stmt_->get_query_ctx())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid null query context", K(ret)); + } else if (OB_ISNULL(pc_ctx.sql_ctx_.session_info_)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid null session", K(ret)); } else if (should_reroute) { if (DAS_CTX(pc_ctx.exec_ctx_).get_table_loc_list().empty()) { ret = OB_ERR_UNEXPECTED; @@ -4961,7 +4970,7 @@ int ObSql::check_need_reroute(ObPlanCacheCtx &pc_ctx, ObSQLSessionInfo &session, } else if (OB_ISNULL(table_schema)) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid null table schema", K(ret)); - } else if (OB_ISNULL(pc_ctx.sql_ctx_.get_reroute_info())) { + } else if (OB_ISNULL(pc_ctx.sql_ctx_.get_or_create_reroute_info())) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("get reroute info failed", K(ret)); } else if (OB_FAIL(loc_router.get_full_ls_replica_loc(table_schema->get_tenant_id(), @@ -4969,12 +4978,31 @@ int ObSql::check_need_reroute(ObPlanCacheCtx &pc_ctx, ObSQLSessionInfo &session, ls_replica_loc))) { LOG_WARN("get full ls replica location failed", K(ret), KPC(first_tablet_loc)); } else { - pc_ctx.sql_ctx_.get_reroute_info()->server_ = ls_replica_loc.get_server(); - pc_ctx.sql_ctx_.get_reroute_info()->server_.set_port(static_cast(ls_replica_loc.get_sql_port())); - pc_ctx.sql_ctx_.get_reroute_info()->role_ = ls_replica_loc.get_role(); - pc_ctx.sql_ctx_.get_reroute_info()->replica_type_ = ls_replica_loc.get_replica_type(); - pc_ctx.sql_ctx_.get_reroute_info()->set_tbl_name(table_schema->get_table_name()); - pc_ctx.sql_ctx_.get_reroute_info()->tbl_schema_version_ = table_schema->get_schema_version(); + bool is_weak = false; + if (stmt::T_SELECT == pc_ctx.sql_ctx_.cur_stmt_->get_stmt_type()) { + if (pc_ctx.sql_ctx_.is_protocol_weak_read_) { + is_weak = true; + } else if (OB_UNLIKELY(INVALID_CONSISTENCY + != pc_ctx.sql_ctx_.cur_stmt_->get_query_ctx()-> + get_global_hint().read_consistency_)) { + is_weak = (WEAK == pc_ctx.sql_ctx_.cur_stmt_-> + get_query_ctx()->get_global_hint().read_consistency_); + } else { + is_weak = (WEAK == pc_ctx.sql_ctx_.session_info_->get_consistency_level()); + } + } + // if weak delay read, no need to return reroute_info. + if (pc_ctx.sql_ctx_.session_info_-> + get_proxy_cap_flags().is_weak_stale_feedback() && is_weak) { + pc_ctx.sql_ctx_.reset_reroute_info(); + } else { + pc_ctx.sql_ctx_.get_reroute_info()->server_ = ls_replica_loc.get_server(); + pc_ctx.sql_ctx_.get_reroute_info()->server_.set_port(static_cast(ls_replica_loc.get_sql_port())); + pc_ctx.sql_ctx_.get_reroute_info()->role_ = ls_replica_loc.get_role(); + pc_ctx.sql_ctx_.get_reroute_info()->replica_type_ = ls_replica_loc.get_replica_type(); + pc_ctx.sql_ctx_.get_reroute_info()->set_tbl_name(table_schema->get_table_name()); + pc_ctx.sql_ctx_.get_reroute_info()->tbl_schema_version_ = table_schema->get_schema_version(); + } LOG_DEBUG("reroute sql", KPC(pc_ctx.sql_ctx_.get_reroute_info())); need_reroute = true; } diff --git a/src/sql/ob_sql_context.h b/src/sql/ob_sql_context.h index 5722201c47..7ba84054c8 100644 --- a/src/sql/ob_sql_context.h +++ b/src/sql/ob_sql_context.h @@ -435,13 +435,22 @@ public: void reset(); bool handle_batched_multi_stmt() const { return multi_stmt_item_.is_batched_multi_stmt(); } - share::ObFeedbackRerouteInfo *get_reroute_info() + void reset_reroute_info() { + if (nullptr != reroute_info_) { + op_reclaim_free(reroute_info_); + } + reroute_info_ = NULL; + } + share::ObFeedbackRerouteInfo *get_or_create_reroute_info() { if (nullptr == reroute_info_) { reroute_info_ = op_reclaim_alloc(share::ObFeedbackRerouteInfo); } return reroute_info_; } + share::ObFeedbackRerouteInfo *get_reroute_info() { + return reroute_info_; + } // release dynamic allocated memory // https://aone.alibaba-inc.com/issue/19749534 void clear(); diff --git a/src/sql/optimizer/ob_log_plan.cpp b/src/sql/optimizer/ob_log_plan.cpp index c021424a4e..9183378cc0 100644 --- a/src/sql/optimizer/ob_log_plan.cpp +++ b/src/sql/optimizer/ob_log_plan.cpp @@ -2218,7 +2218,8 @@ int ObLogPlan::weak_select_replicas(const ObAddr &local_server, ObIArray &replica_array = phy_part_loc_info.get_partition_location().get_replica_locations(); if (OB_FAIL(route_policy.init_candidate_replicas(replica_array))) { LOG_WARN("fail to init candidate replicas", K(replica_array), K(ret)); - } else if (OB_FAIL(route_policy.calculate_replica_priority(phy_part_loc_info.get_ls_id(), + } else if (OB_FAIL(route_policy.calculate_replica_priority(local_server, + phy_part_loc_info.get_ls_id(), replica_array, route_policy_ctx))) { LOG_WARN("fail to calculate replica priority", K(replica_array), K(route_policy_ctx), K(ret)); diff --git a/src/sql/optimizer/ob_route_policy.cpp b/src/sql/optimizer/ob_route_policy.cpp index a3f1a33c4b..c41eb5a7db 100644 --- a/src/sql/optimizer/ob_route_policy.cpp +++ b/src/sql/optimizer/ob_route_policy.cpp @@ -60,16 +60,20 @@ int ObRoutePolicy::strong_sort_replicas(ObIArray& candi_replic return ret; } -int ObRoutePolicy::filter_replica(const ObLSID &ls_id, +int ObRoutePolicy::filter_replica(const ObAddr &local_server, + const ObLSID &ls_id, ObIArray& candi_replicas, ObRoutePolicyCtx &ctx) { int ret = OB_SUCCESS; ObRoutePolicyType policy_type = get_calc_route_policy_type(ctx); - for (int64_t i = 0; OB_SUCC(ret) && i < candi_replicas.count(); ++i) { + bool need_break = false; + for (int64_t i = 0; !need_break && OB_SUCC(ret) && i < candi_replicas.count(); ++i) { CandidateReplica &cur_replica = candi_replicas.at(i); bool can_read = true; - if (OB_FAIL(ObSqlTransControl::check_ls_readable(ctx.tenant_id_, + bool is_local = cur_replica.get_server() == local_server; + + if (is_local && OB_FAIL(ObSqlTransControl::check_ls_readable(ctx.tenant_id_, ls_id, cur_replica.get_server(), ctx.max_read_stale_time_, @@ -87,12 +91,22 @@ int ObRoutePolicy::filter_replica(const ObLSID &ls_id, || !can_read) { cur_replica.is_filter_ = true; } + + // if is local replica and can read, filter all replicas and only select this replica. + if (is_local && !cur_replica.is_filter_) { + for (int64_t j = 0; j < candi_replicas.count(); ++j) { + candi_replicas.at(i).is_filter_ = true; + } + cur_replica.is_filter_ = false; + need_break = true; + } } } return ret; } -int ObRoutePolicy::calculate_replica_priority(const ObLSID &ls_id, +int ObRoutePolicy::calculate_replica_priority(const ObAddr &local_server, + const ObLSID &ls_id, ObIArray& candi_replicas, ObRoutePolicyCtx &ctx) { @@ -102,7 +116,7 @@ int ObRoutePolicy::calculate_replica_priority(const ObLSID &ls_id, LOG_WARN("not init", K(ret)); } else if (candi_replicas.count() <= 1) {//do nothing } else if (WEAK == ctx.consistency_level_) { - if (OB_FAIL(filter_replica(ls_id, candi_replicas, ctx))) { + if (OB_FAIL(filter_replica(local_server, ls_id, candi_replicas, ctx))) { LOG_WARN("fail to filter replicas", K(candi_replicas), K(ctx), K(ret)); } else if (OB_FAIL(weak_sort_replicas(candi_replicas, ctx))) { LOG_WARN("fail to sort replicas", K(candi_replicas), K(ctx), K(ret)); diff --git a/src/sql/optimizer/ob_route_policy.h b/src/sql/optimizer/ob_route_policy.h index 39ddbff2c8..f4ced6ed2a 100644 --- a/src/sql/optimizer/ob_route_policy.h +++ b/src/sql/optimizer/ob_route_policy.h @@ -170,7 +170,8 @@ public: {} ~ObRoutePolicy() {} int init(); - int calculate_replica_priority(const share::ObLSID &ls_id, + int calculate_replica_priority(const ObAddr &local_server, + const share::ObLSID &ls_id, common::ObIArray& candi_replicas, ObRoutePolicyCtx &ctx); int init_candidate_replicas(common::ObIArray &candi_replicas); @@ -212,7 +213,8 @@ protected: int get_merge_status(const share::ObServerLocality &candi_locality, CandidateReplica &candi_replica); int get_zone_status(const share::ObServerLocality &candi_locality, CandidateReplica &candi_replica); - int filter_replica(const share::ObLSID &ls_id, + int filter_replica(const ObAddr &local_server, + const share::ObLSID &ls_id, common::ObIArray& candi_replicas, ObRoutePolicyCtx &ctx); int weak_sort_replicas(common::ObIArray& candi_replicas, ObRoutePolicyCtx &ctx);