diff --git a/deps/oblib/src/rpc/obmysql/ob_2_0_protocol_processor.cpp b/deps/oblib/src/rpc/obmysql/ob_2_0_protocol_processor.cpp index 0b942ae0a..aa0f01910 100644 --- a/deps/oblib/src/rpc/obmysql/ob_2_0_protocol_processor.cpp +++ b/deps/oblib/src/rpc/obmysql/ob_2_0_protocol_processor.cpp @@ -407,6 +407,7 @@ inline int Ob20ProtocolProcessor::process_ob20_packet(ObProto20PktContext& conte // If a request is divided into multiple MySQL packages, each MySQL package will also set the re-routing flag ObMySQLRawPacket *input_packet = reinterpret_cast(ipacket); input_packet->set_can_reroute_pkt(pkt20->get_flags().is_proxy_reroute()); + input_packet->set_is_weak_read(pkt20->get_flags().is_weak_read()); input_packet->set_extra_info(pkt20->get_extra_info()); context.reset(); // set again for sending response diff --git a/deps/oblib/src/rpc/obmysql/ob_2_0_protocol_struct.h b/deps/oblib/src/rpc/obmysql/ob_2_0_protocol_struct.h index 3eff9ade1..538473c8f 100644 --- a/deps/oblib/src/rpc/obmysql/ob_2_0_protocol_struct.h +++ b/deps/oblib/src/rpc/obmysql/ob_2_0_protocol_struct.h @@ -30,6 +30,7 @@ union Ob20ProtocolFlags bool is_last_packet() const { return 1 == st_flags_.OB_IS_LAST_PACKET; } bool is_proxy_reroute() const { return 1 == st_flags_.OB_IS_PROXY_REROUTE; } bool is_new_extra_info() const { return 1 == st_flags_.OB_IS_NEW_EXTRA_INFO; } + bool is_weak_read() const { return 1 == st_flags_.OB_IS_WEAK_READ; } uint32_t flags_; struct Protocol20Flags @@ -38,7 +39,8 @@ union Ob20ProtocolFlags uint32_t OB_IS_LAST_PACKET: 1; uint32_t OB_IS_PROXY_REROUTE: 1; uint32_t OB_IS_NEW_EXTRA_INFO: 1; - uint32_t OB_FLAG_RESERVED_NOT_USE: 28; + uint32_t OB_IS_WEAK_READ: 1; + uint32_t OB_FLAG_RESERVED_NOT_USE: 27; } st_flags_; }; diff --git a/deps/oblib/src/rpc/obmysql/ob_mysql_packet.h b/deps/oblib/src/rpc/obmysql/ob_mysql_packet.h index 986ef8fd1..bf0706630 100644 --- a/deps/oblib/src/rpc/obmysql/ob_mysql_packet.h +++ b/deps/oblib/src/rpc/obmysql/ob_mysql_packet.h @@ -459,12 +459,14 @@ class ObMySQLRawPacket public: ObMySQLRawPacket() : ObMySQLPacket(), cmd_(COM_MAX_NUM), can_reroute_pkt_(false), + is_weak_read_(false), extra_info_() {} explicit ObMySQLRawPacket(obmysql::ObMySQLCmd cmd) : ObMySQLPacket(), cmd_(cmd), can_reroute_pkt_(false), + is_weak_read_(false), extra_info_() {} @@ -479,6 +481,9 @@ public: inline void set_can_reroute_pkt(const bool can_rerute); inline bool can_reroute_pkt() const; + inline void set_is_weak_read(const bool v) { is_weak_read_ = v; } + inline bool is_weak_read() const { return is_weak_read_; } + inline void set_extra_info(const Ob20ExtraInfo &extra_info) { extra_info_ = extra_info; } inline const Ob20ExtraInfo &get_extra_info() const { return extra_info_; } bool exist_trace_info() const { return extra_info_.exist_trace_info_; } @@ -489,6 +494,7 @@ public: ObMySQLPacket::reset(); cmd_ = COM_MAX_NUM; can_reroute_pkt_ = false; + is_weak_read_ = false; extra_info_.reset(); } @@ -497,10 +503,11 @@ public: ObMySQLPacket::assign(other); cmd_ = other.cmd_; can_reroute_pkt_ = other.can_reroute_pkt_; + is_weak_read_ = other.is_weak_read_; extra_info_ = other.extra_info_; } - TO_STRING_KV("header", hdr_, "can_reroute", can_reroute_pkt_); + TO_STRING_KV("header", hdr_, "can_reroute", can_reroute_pkt_, "weak_read", is_weak_read_); protected: virtual int serialize(char*, const int64_t, int64_t&) const; @@ -509,6 +516,7 @@ private: private: ObMySQLCmd cmd_; bool can_reroute_pkt_; + bool is_weak_read_; Ob20ExtraInfo extra_info_; }; diff --git a/src/observer/mysql/obmp_base.cpp b/src/observer/mysql/obmp_base.cpp index d879f7a23..0778fc0f4 100644 --- a/src/observer/mysql/obmp_base.cpp +++ b/src/observer/mysql/obmp_base.cpp @@ -349,7 +349,8 @@ int ObMPBase::init_process_var(sql::ObSqlCtx &ctx, // 并且还没开启事务时,这条sql才能二次路由 ctx.can_reroute_sql_ = (pkt.can_reroute_pkt() && get_conn()->is_support_proxy_reroute()); } - LOG_TRACE("recorded sql reroute flag", K(ctx.can_reroute_sql_)); + ctx.is_protocol_weak_read_ = pkt.is_weak_read(); + LOG_TRACE("protocol flag info", K(ctx.can_reroute_sql_), K(ctx.is_protocol_weak_read_)); } return ret; } diff --git a/src/observer/ob_server.cpp b/src/observer/ob_server.cpp index 4f52ee817..f8e92fec0 100644 --- a/src/observer/ob_server.cpp +++ b/src/observer/ob_server.cpp @@ -140,6 +140,7 @@ ObServer::ObServer() ob_service_(gctx_), multi_tenant_(), vt_data_service_(root_service_, self_addr_, &config_), weak_read_service_(), + bl_service_(ObBLService::get_instance()), table_service_(), cgroup_ctrl_(), start_time_(ObTimeUtility::current_time()), @@ -288,6 +289,8 @@ int ObServer::init(const ObServerOptions &opts, const ObPLogWriterCfg &log_cfg) LOG_ERROR("init ts mgr failed", KR(ret)); } else if (OB_FAIL(weak_read_service_.init(net_frame_.get_req_transport()))) { LOG_ERROR("init weak_read_service failed", KR(ret)); + } else if (OB_FAIL(bl_service_.init())) { + LOG_ERROR("init bl_service_ failed", KR(ret)); } else if (OB_FAIL(ObDeviceManager::get_instance().init_devices_env())) { LOG_ERROR("init device manager failed", KR(ret)); } else if (OB_FAIL(ObTenantMutilAllocatorMgr::get_instance().init())) { @@ -488,6 +491,10 @@ void ObServer::destroy() weak_read_service_.destroy(); FLOG_INFO("weak read service destroyed"); + FLOG_INFO("begin to destroy blacklist service"); + bl_service_.destroy(); + FLOG_INFO("blacklist service destroyed"); + FLOG_INFO("begin to destroy net frame"); net_frame_.destroy(); FLOG_INFO("net frame destroyed"); @@ -637,6 +644,12 @@ int ObServer::start() FLOG_INFO("success to start weak read service"); } + if (FAILEDx(bl_service_.start())) { + LOG_ERROR("fail to start blacklist service", KR(ret)); + } else { + FLOG_INFO("success to start blacklist service"); + } + // do not wait clog replay over, avoid blocking other module if (FAILEDx(root_service_monitor_.start())) { LOG_ERROR("fail to start root service monitor", KR(ret)); @@ -971,6 +984,10 @@ int ObServer::stop() weak_read_service_.stop(); FLOG_INFO("weak read service stopped"); + FLOG_INFO("begin to stop blacklist service"); + bl_service_.stop(); + FLOG_INFO("blacklist service stopped"); + FLOG_INFO("begin to stop ts mgr"); OB_TS_MGR.stop(); FLOG_INFO("ts mgr stopped"); @@ -1209,6 +1226,10 @@ int ObServer::wait() weak_read_service_.wait(); FLOG_INFO("wait weak read service success"); + FLOG_INFO("begin to wait blacklist service"); + bl_service_.wait(); + FLOG_INFO("wait blacklist service success"); + FLOG_INFO("begin to wait server checkpoint slog handler"); ObServerCheckpointSlogHandler::get_instance().wait(); FLOG_INFO("wait server checkpoint slog handler success"); diff --git a/src/observer/ob_server.h b/src/observer/ob_server.h index 7123a0db9..c29f4ee15 100644 --- a/src/observer/ob_server.h +++ b/src/observer/ob_server.h @@ -35,6 +35,7 @@ #include "pl/ob_pl.h" #include "storage/tx/wrs/ob_weak_read_service.h" // ObWeakReadService +#include "storage/tx/wrs/ob_black_list.h" // ObBLService #include "storage/ob_long_ops_monitor.h" #include "storage/ob_partition_component_factory.h" @@ -370,6 +371,8 @@ private: // Weakly Consistent Read Service transaction::ObWeakReadService weak_read_service_; + // blacklist service + transaction::ObBLService &bl_service_; // table service ObTableService table_service_; diff --git a/src/observer/table/ob_table_rpc_processor.cpp b/src/observer/table/ob_table_rpc_processor.cpp index 831bd844f..da7e66c1c 100644 --- a/src/observer/table/ob_table_rpc_processor.cpp +++ b/src/observer/table/ob_table_rpc_processor.cpp @@ -27,6 +27,7 @@ #include "observer/mysql/ob_mysql_request_manager.h" #include "share/ob_define.h" #include "storage/tx/ob_trans_service.h" +#include "storage/tx/wrs/ob_weak_read_util.h" using namespace oceanbase::observer; using namespace oceanbase::common; @@ -412,7 +413,9 @@ int ObTableApiProcessorBase::setup_tx_snapshot_(transaction::ObTxDesc &trans_des } } else { SCN weak_read_snapshot; - if (OB_FAIL(txs->get_weak_read_snapshot_version(weak_read_snapshot))) { + if (OB_FAIL(txs->get_weak_read_snapshot_version( + transaction::ObWeakReadUtil::max_stale_time_for_weak_consistency(MTL_ID()), + weak_read_snapshot))) { LOG_WARN("fail to get weak read snapshot", K(ret)); } else { tx_snapshot_.init_weak_read(weak_read_snapshot); diff --git a/src/share/parameter/ob_parameter_seed.ipp b/src/share/parameter/ob_parameter_seed.ipp index a431b5118..a44cb0ef1 100644 --- a/src/share/parameter/ob_parameter_seed.ipp +++ b/src/share/parameter/ob_parameter_seed.ipp @@ -653,7 +653,7 @@ DEF_STR_WITH_CHECKER(_rpc_checksum, OB_CLUSTER_PARAMETER, "Force", DEF_TIME(_ob_trans_rpc_timeout, OB_CLUSTER_PARAMETER, "3s", "[0s, 3600s]", "transaction rpc timeout(s). Range: [0s, 3600s]", ObParameterAttr(Section::TRANS, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE)); -DEF_BOOL(enable_early_lock_release, OB_TENANT_PARAMETER, "False", +DEF_BOOL(enable_early_lock_release, OB_TENANT_PARAMETER, "True", "enable early lock release", ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE)); @@ -1317,4 +1317,4 @@ DEF_BOOL(enable_cgroup, OB_CLUSTER_PARAMETER, "True", ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::STATIC_EFFECTIVE)); DEF_TIME(_ob_plan_cache_auto_flush_interval, OB_CLUSTER_PARAMETER, "0s", "[0s,)", "time interval for auto periodic flush plan cache. Range: [0s, +∞)", - ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE)); \ No newline at end of file + ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE)); diff --git a/src/share/schema/ob_schema_service_sql_impl.cpp b/src/share/schema/ob_schema_service_sql_impl.cpp index c3007f966..8b595bd87 100644 --- a/src/share/schema/ob_schema_service_sql_impl.cpp +++ b/src/share/schema/ob_schema_service_sql_impl.cpp @@ -5153,6 +5153,8 @@ int ObSchemaServiceSQLImpl::sort_table_partition_info( LOG_WARN("fail to mock partition array", KR(ret), K(table_schema)); } else if (OB_FAIL(sort_subpartition_array(table_schema))) { LOG_WARN("failed to sort subpartition array", KR(ret), K(table_schema)); + } else if (OB_FAIL(table_schema.build_list_idx_hash_array())) { + LOG_WARN("fail to build list idx hash array", KR(ret)); } } LOG_TRACE("fetch partition info", KR(ret), K(table_schema)); diff --git a/src/share/schema/ob_schema_struct.cpp b/src/share/schema/ob_schema_struct.cpp index 73d774614..4d6253f82 100644 --- a/src/share/schema/ob_schema_struct.cpp +++ b/src/share/schema/ob_schema_struct.cpp @@ -2920,6 +2920,35 @@ OB_DEF_SERIALIZE_SIZE(ObPrimaryZone) return len; } +ObNewRowKey::ObNewRowKey(const common::ObNewRow &row) +{ + row_ = &row; + hash_val_ = 0; + for (int64_t i = 0; i < row.get_count(); i++) { + const ObObj &obj = row.get_cell(i); + uint64_t tmp_hash_val = obj.hash(); + hash_val_ = common::murmurhash(&tmp_hash_val, sizeof(uint64_t), hash_val_); + } +} + +uint64_t ObNewRowKey::hash() const +{ + return hash_val_; +} + +bool ObNewRowKey::operator==(const ObNewRowKey &other) const +{ + bool bret = false; + if (OB_ISNULL(row_) && OB_ISNULL(other.row_)) { + bret = true; + } else if (OB_NOT_NULL(row_) && OB_NOT_NULL(other.row_)) { + bret = (*(row_) == *(other.row_)); + } else { + bret = false; + } + return bret; +} + /*------------------------------------------------------------------------------------------------- * ------------------------------ObPartitionSchema------------------------------------------- ----------------------------------------------------------------------------------------------------*/ @@ -2977,6 +3006,7 @@ void ObPartitionSchema::reuse_partition_schema() partition_array_ = NULL; partition_array_capacity_ = 0; partition_num_ = 0; + list_idx_hash_array_ = NULL; def_subpartition_array_ = NULL; def_subpartition_num_ = 0; def_subpartition_array_capacity_ = 0; @@ -3079,12 +3109,114 @@ int ObPartitionSchema::assign_partition_schema(const ObPartitionSchema &src_sche LOG_WARN("fail to set transition point", K(ret)); } else if (OB_FAIL(set_interval_range(src_schema.get_interval_range()))) { LOG_WARN("fail to set interval range", K(ret)); + } else if (OB_NOT_NULL(src_schema.list_idx_hash_array_) + && OB_FAIL(build_list_idx_hash_array())) { + LOG_WARN("fail to build list idx hash_array", KR(ret)); } } return ret; } +int ObPartitionSchema::build_list_idx_hash_array() +{ + int ret = OB_SUCCESS; + ObPartitionLevel part_level = get_part_level(); + if ((PARTITION_LEVEL_ONE == part_level + || PARTITION_LEVEL_TWO == part_level) + && is_list_part()) { + + ObPartition **part_array = get_part_array(); + const int64_t partition_num = get_partition_num(); + int64_t list_value_cnt = 0; + if (OB_ISNULL(part_array) + || partition_num <= 0 + || OB_NOT_NULL(list_idx_hash_array_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("invalid arg", KR(ret), KP(part_array), K(partition_num), KP(list_idx_hash_array_)); + } else if (OB_FAIL(get_list_part_value_cnt_(list_value_cnt))) { + LOG_WARN("fail to get list value cnt", KR(ret), KPC(this)); + } else if (list_value_cnt <= 0) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("invalid list_value_cnt", KR(ret), K(list_value_cnt)); + } else { + int64_t mem_size = get_list_idx_hash_array_mem_size_(list_value_cnt); + char *buf = NULL; + if (OB_ISNULL(buf = static_cast(alloc(mem_size)))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + LOG_WARN("alloc mem failed", KR(ret), K(mem_size), K(list_value_cnt)); + } else if (OB_ISNULL(list_idx_hash_array_ = new (buf) ListIdxHashArray(mem_size))) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("fail to new ListIdxHashArray", KR(ret), K(mem_size), K(list_value_cnt)); + } else { + for (int64_t i = 0; OB_SUCC(ret) && i < partition_num; i++) { + ObPartition *part = part_array[i]; + if (OB_ISNULL(part)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("part is null", KR(ret), K(i)); + } else if (part->get_list_row_values().count() <= 0) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("list value is empty", KR(ret), KPC(part)); + } + for (int64_t j = 0; OB_SUCC(ret) && j < part->get_list_row_values().count(); j++) { + const ObNewRow &row = part->get_list_row_values().at(j); + void *tmp_buf = NULL; + ObNewRowValue *value = NULL; + if (OB_ISNULL(tmp_buf = alloc(sizeof(ObNewRowValue)))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + LOG_WARN("alloc mem failed", KR(ret)); + } else if (OB_ISNULL(value = new (tmp_buf) ObNewRowValue(row, i))) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("fail to new ObNewRowValue", KR(ret)); + } else if (OB_FAIL(list_idx_hash_array_->set_refactored(value->get_key(), value))) { + LOG_WARN("fail to set refactored", KR(ret), K(row)); + } + } // end for + } // end for + } + } + } + return ret; +} + +int ObPartitionSchema::get_list_idx_hash_array_mem_size_(const int64_t cnt) const +{ + return common::max(ListIdxHashArray::MIN_HASH_ARRAY_ITEM_COUNT, cnt * 2) * sizeof(void*) + + sizeof(ListIdxHashArray); +} + +int ObPartitionSchema::get_list_part_value_cnt_(int64_t &count) +{ + int ret = OB_SUCCESS; + ObPartitionLevel part_level = get_part_level(); + count = 0; + if ((PARTITION_LEVEL_ONE == part_level + || PARTITION_LEVEL_TWO == part_level) + && is_list_part()) { + ObPartition **part_array = get_part_array(); + const int64_t partition_num = get_partition_num(); + if (OB_ISNULL(part_array) + || partition_num <= 0) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("invalid arg", KR(ret), KP(part_array), K(partition_num)); + } else { + for (int64_t i = 0; OB_SUCC(ret) && i < partition_num; i++) { + ObPartition *part = part_array[i]; + if (OB_ISNULL(part)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("part is null", KR(ret), K(i)); + } else if (part->get_list_row_values().count() <= 0) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("list value is empty", KR(ret), KPC(part)); + } else { + count += part->get_list_row_values().count(); + } + } // end for + } + } + return ret; +} + int ObPartitionSchema::try_assign_part_array( const share::schema::ObPartitionSchema &that) { @@ -3102,6 +3234,10 @@ int ObPartitionSchema::try_assign_part_array( partition_array_ = nullptr; partition_num_ = 0; } + if (NULL != list_idx_hash_array_) { + free(list_idx_hash_array_); + list_idx_hash_array_ = NULL; + } part_level_ = that.get_part_level(); part_option_ = that.get_part_option(); if (OB_FAIL(part_option_.get_err_ret())) { @@ -4528,6 +4664,11 @@ int64_t ObTablegroupSchema::get_convert_size() const hidden_partition_array_, hidden_partition_num_); convert_size += transition_point_.get_deep_copy_size(); convert_size += interval_range_.get_deep_copy_size(); + if (OB_NOT_NULL(list_idx_hash_array_)) { + int64_t cnt = list_idx_hash_array_->item_count(); + convert_size += get_list_idx_hash_array_mem_size_(cnt); + convert_size += cnt * sizeof(ObNewRowValue); + } return convert_size; } @@ -6316,8 +6457,10 @@ int ObPartitionUtils::get_tablet_and_part_id( KR(ret), K(range), K(table_id)); } } else if (table_schema.is_list_part()) { + const ListIdxHashArray *list_idx_hash_array = table_schema.get_list_idx_hash_array(); if (OB_FAIL(ObPartitionUtils::get_list_tablet_and_part_id_( - range, part_array, part_num, partition_indexes))) { + range, part_array, part_num, list_idx_hash_array, + partition_indexes))) { LOG_WARN("fail to fill list tablet_id and part_id", KR(ret), K(range), K(table_id)); } @@ -6376,8 +6519,9 @@ int ObPartitionUtils::get_tablet_and_part_id( KR(ret), K(row), K(table_id)); } } else if (table_schema.is_list_part()) { + const ListIdxHashArray *list_idx_hash_array = table_schema.get_list_idx_hash_array(); if (OB_FAIL(ObPartitionUtils::get_list_tablet_and_part_id_( - row, part_array, part_num, partition_indexes))) { + row, part_array, part_num, list_idx_hash_array, partition_indexes))) { LOG_WARN("fail to fill list tablet_id and part_id", KR(ret), K(row), K(table_id)); } @@ -6797,6 +6941,7 @@ int ObPartitionUtils::get_list_tablet_and_part_id_( const common::ObNewRange &range, ObPartition * const* partition_array, const int64_t partition_num, + const ListIdxHashArray *list_idx_hash_array, common::ObIArray &indexes) { int ret = OB_SUCCESS; @@ -6817,7 +6962,7 @@ int ObPartitionUtils::get_list_tablet_and_part_id_( row.cells_ = const_cast(range.start_key_.get_obj_ptr()); row.count_ = range.start_key_.get_obj_cnt(); if (OB_FAIL(get_list_tablet_and_part_id_( - row, partition_array, partition_num, indexes))) { + row, partition_array, partition_num, list_idx_hash_array, indexes))) { LOG_WARN("fail to get list tablet and part_id", KR(ret), K(row)); } } @@ -6904,6 +7049,7 @@ int ObPartitionUtils::get_list_tablet_and_part_id_( const common::ObNewRow &row, ObPartition * const* partition_array, const int64_t partition_num, + const ListIdxHashArray *list_idx_hash_array, common::ObIArray &indexes) { int ret = OB_SUCCESS; @@ -6916,21 +7062,65 @@ int ObPartitionUtils::get_list_tablet_and_part_id_( } else { int64_t part_idx = OB_INVALID_INDEX; int64_t default_value_idx = OB_INVALID_INDEX; - for (int64_t i = 0; OB_SUCC(ret) && OB_INVALID_INDEX == part_idx && i < partition_num; i++) { - const ObIArray &list_row_values = partition_array[i]->get_list_row_values(); - for (int64_t j = 0; OB_SUCC(ret) && OB_INVALID_INDEX == part_idx && j < list_row_values.count(); j++) { - const ObNewRow &list_row = list_row_values.at(j); - if (row == list_row) { - part_idx = i; + if (OB_NOT_NULL(list_idx_hash_array)) { + // new logic, use hash array + ObNewRowKey key(row); + const ObNewRowValue *value = NULL; + int hash_ret = list_idx_hash_array->get_refactored(key, value); + if (OB_SUCCESS == hash_ret) { + if (OB_ISNULL(value)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("value should not be null", KR(ret), K(row)); + } else { + part_idx = value->get_part_idx(); + LOG_TRACE("hit list idx hash array", K(row), K(part_idx)); } - } // end for - if (list_row_values.count() == 1 - && list_row_values.at(0).get_count() >= 1 - && list_row_values.at(0).get_cell(0).is_max_value()) { - // calc default value position - default_value_idx = i; + } else if (OB_HASH_NOT_EXIST == hash_ret) { + // try use default value + static int64_t ROW_KEY_CNT = 1; + ObObj obj_array[ROW_KEY_CNT]; + obj_array[0].set_max_value(); + ObNewRow default_row(obj_array, 1); + ObNewRowKey default_key(default_row); + hash_ret = list_idx_hash_array->get_refactored(default_key, value); + if (OB_SUCCESS == hash_ret) { + if (OB_ISNULL(value)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("value should not be null", KR(ret), K(default_row)); + } else { + default_value_idx = value->get_part_idx(); + LOG_TRACE("not hit, but hit default value in list idx hash array", K(row), K(default_value_idx)); + } + } else if (OB_HASH_NOT_EXIST == hash_ret) { + // do nothing + LOG_TRACE("not hit any values in list idx hash array", K(row)); + } else { + ret = hash_ret; + LOG_WARN("fail to get value from hash array", KR(ret), K(default_row)); + } + } else { + ret = hash_ret; + LOG_WARN("fail to get value from hash array", KR(ret), K(row)); } - } // end dor + + } else { + // old logic, iterate all partition + for (int64_t i = 0; OB_SUCC(ret) && OB_INVALID_INDEX == part_idx && i < partition_num; i++) { + const ObIArray &list_row_values = partition_array[i]->get_list_row_values(); + for (int64_t j = 0; OB_SUCC(ret) && OB_INVALID_INDEX == part_idx && j < list_row_values.count(); j++) { + const ObNewRow &list_row = list_row_values.at(j); + if (row == list_row) { + part_idx = i; + } + } // end for + if (list_row_values.count() == 1 + && list_row_values.at(0).get_count() >= 1 + && list_row_values.at(0).get_cell(0).is_max_value()) { + // calc default value position + default_value_idx = i; + } + } // end dor + } if (OB_SUCC(ret)) { const ObPartition *partition = NULL; diff --git a/src/share/schema/ob_schema_struct.h b/src/share/schema/ob_schema_struct.h index dfb1d9318..f606f1479 100644 --- a/src/share/schema/ob_schema_struct.h +++ b/src/share/schema/ob_schema_struct.h @@ -28,6 +28,7 @@ #include "share/ob_priv_common.h" #include "lib/worker.h" #include "objit/common/ob_item_type.h" +#include "lib/hash/ob_pointer_hashmap.h" #ifdef __cplusplus extern "C" { @@ -2093,6 +2094,54 @@ private: int64_t subpart_idx_; }; +class ObNewRowKey +{ +public: + ObNewRowKey() : row_(NULL), hash_val_(0) {} + ObNewRowKey(const common::ObNewRow &row); + ~ObNewRowKey() {}; + uint64_t hash() const; + bool operator==(const ObNewRowKey &other) const; +private: + const common::ObNewRow *row_; + uint64_t hash_val_; +}; + +class ObNewRowValue +{ +public: + ObNewRowValue() : key_(), part_idx_(common::OB_INVALID_INDEX) {} + ObNewRowValue(const common::ObNewRow &row, const int64_t part_idx) + : key_(row), part_idx_(part_idx) {} + ~ObNewRowValue() {}; + ObNewRowKey get_key() const { return key_; } + int64_t get_part_idx() const { return part_idx_; } +private: + ObNewRowKey key_; + int64_t part_idx_; +}; + +template +struct ObGetNewRowKey +{ + void operator()(const K &k, const V &v) const + { + UNUSED(k); + UNUSED(v); + } +}; + +template<> +struct ObGetNewRowKey +{ + ObNewRowKey operator()(const ObNewRowValue *value) const + { + return value->get_key(); + } +}; + +typedef common::hash::ObPointerHashArray ListIdxHashArray; + class ObPartitionSchema : public ObSchema { OB_UNIS_VERSION(1); @@ -2304,6 +2353,7 @@ public: partition_array_capacity_ = 0; partition_num_ = 0; partition_array_ = NULL; + list_idx_hash_array_ = NULL; } int64_t get_hidden_partition_num() const { return hidden_partition_num_; } @@ -2356,6 +2406,9 @@ public: // only used for generate part_name int get_max_part_id(int64_t &part_id) const; int get_max_part_idx(int64_t &part_idx) const; + + const ListIdxHashArray *get_list_idx_hash_array() const { return list_idx_hash_array_; } + int build_list_idx_hash_array(); protected: int inner_add_partition(const ObPartition &part); template @@ -2385,6 +2438,8 @@ protected: { return (sub_part_template_flags_ & flag) > 0; } + int get_list_idx_hash_array_mem_size_(const int64_t cnt) const; + int get_list_part_value_cnt_(int64_t &count); protected: static const int64_t DEFAULT_ARRAY_CAPACITY = 128; protected: @@ -2424,6 +2479,10 @@ protected: int64_t hidden_partition_num_; common::ObRowkey transition_point_; common::ObRowkey interval_range_; + + // Record ObNewRow -> part_idx when table's firt part is list like + // won't serialize/deserialize + ListIdxHashArray *list_idx_hash_array_; }; /*TODO: Delete the following interfaces in ObTablegroupSchema and ObDatabaseSchema int ObTablegroupSchema::get_first_primary_zone_inherit() @@ -2794,6 +2853,7 @@ private: const common::ObNewRange &range, ObPartition * const* partition_array, const int64_t partition_num, + const ListIdxHashArray *list_idx_hash_array, common::ObIArray &indexes); // param[@in]: @@ -2827,6 +2887,7 @@ private: const common::ObNewRow &row, ObPartition * const* partition_array, const int64_t partition_num, + const ListIdxHashArray *list_idx_hash_array, common::ObIArray &indexes); // param[@in]: diff --git a/src/share/schema/ob_table_schema.cpp b/src/share/schema/ob_table_schema.cpp index 71249f74a..f46b93f57 100644 --- a/src/share/schema/ob_table_schema.cpp +++ b/src/share/schema/ob_table_schema.cpp @@ -590,6 +590,11 @@ int64_t ObSimpleTableSchemaV2::get_convert_size() const convert_size += link_database_name_.length() + 1; convert_size += transition_point_.get_deep_copy_size(); convert_size += interval_range_.get_deep_copy_size(); + if (OB_NOT_NULL(list_idx_hash_array_)) { + int64_t cnt = list_idx_hash_array_->item_count(); + convert_size += get_list_idx_hash_array_mem_size_(cnt); + convert_size += cnt * sizeof(ObNewRowValue); + } return convert_size; } diff --git a/src/share/system_variable/ob_sys_var_class_type.h b/src/share/system_variable/ob_sys_var_class_type.h index ae986c81b..7a532394b 100644 --- a/src/share/system_variable/ob_sys_var_class_type.h +++ b/src/share/system_variable/ob_sys_var_class_type.h @@ -241,6 +241,7 @@ enum ObSysVarClassType SYS_VAR_OB_ENABLE_RICH_ERROR_MSG = 10134, SYS_VAR_OB_SQL_PLAN_MEMORY_PERCENTAGE = 10135, SYS_VAR_LOG_ROW_VALUE_OPTIONS = 10136, + SYS_VAR_OB_MAX_READ_STALE_TIME = 10137, }; } diff --git a/src/share/system_variable/ob_system_variable_alias.h b/src/share/system_variable/ob_system_variable_alias.h index 9a5c0cfc9..0efbedbc8 100644 --- a/src/share/system_variable/ob_system_variable_alias.h +++ b/src/share/system_variable/ob_system_variable_alias.h @@ -236,6 +236,7 @@ namespace share static const char* const OB_SV_ENABLE_RICH_ERROR_MSG = "ob_enable_rich_error_msg"; static const char* const OB_SV_SQL_PLAN_MEMORY_PERCENTAGE = "ob_sql_plan_memory_percentage"; static const char* const OB_SV_LOG_ROW_VALUE_OPTIONS = "log_row_value_options"; + static const char* const OB_SV_MAX_READ_STALE_TIME = "ob_max_read_stale_time"; } } diff --git a/src/share/system_variable/ob_system_variable_factory.cpp b/src/share/system_variable/ob_system_variable_factory.cpp index 7b1417029..a43395e8d 100644 --- a/src/share/system_variable/ob_system_variable_factory.cpp +++ b/src/share/system_variable/ob_system_variable_factory.cpp @@ -244,6 +244,7 @@ const char *ObSysVarFactory::SYS_VAR_NAMES_SORTED_BY_NAME[] = { "ob_interm_result_mem_limit", "ob_last_schema_version", "ob_log_level", + "ob_max_read_stale_time", "ob_org_cluster_id", "ob_pl_block_timeout", "ob_plan_cache_evict_high_percentage", @@ -467,6 +468,7 @@ const ObSysVarClassType ObSysVarFactory::SYS_VAR_IDS_SORTED_BY_NAME[] = { SYS_VAR_OB_INTERM_RESULT_MEM_LIMIT, SYS_VAR_OB_LAST_SCHEMA_VERSION, SYS_VAR_OB_LOG_LEVEL, + SYS_VAR_OB_MAX_READ_STALE_TIME, SYS_VAR_OB_ORG_CLUSTER_ID, SYS_VAR_OB_PL_BLOCK_TIMEOUT, SYS_VAR_OB_PLAN_CACHE_EVICT_HIGH_PERCENTAGE, @@ -783,7 +785,8 @@ const char *ObSysVarFactory::SYS_VAR_NAMES_SORTED_BY_ID[] = { "_windowfunc_optimization_settings", "ob_enable_rich_error_msg", "ob_sql_plan_memory_percentage", - "log_row_value_options" + "log_row_value_options", + "ob_max_read_stale_time" }; bool ObSysVarFactory::sys_var_name_case_cmp(const char *name1, const ObString &name2) @@ -1171,6 +1174,7 @@ int ObSysVarFactory::create_all_sys_vars() + sizeof(ObSysVarObEnableRichErrorMsg) + sizeof(ObSysVarObSqlPlanMemoryPercentage) + sizeof(ObSysVarLogRowValueOptions) + + sizeof(ObSysVarObMaxReadStaleTime) ; void *ptr = NULL; if (OB_ISNULL(ptr = allocator_.alloc(total_mem_size))) { @@ -3159,6 +3163,15 @@ int ObSysVarFactory::create_all_sys_vars() ptr = (void *)((char *)ptr + sizeof(ObSysVarLogRowValueOptions)); } } + if (OB_SUCC(ret)) { + if (OB_ISNULL(sys_var_ptr = new (ptr)ObSysVarObMaxReadStaleTime())) { + ret = OB_ALLOCATE_MEMORY_FAILED; + LOG_ERROR("fail to new ObSysVarObMaxReadStaleTime", K(ret)); + } else { + store_buf_[ObSysVarsToIdxMap::get_store_idx(static_cast(SYS_VAR_OB_MAX_READ_STALE_TIME))] = sys_var_ptr; + ptr = (void *)((char *)ptr + sizeof(ObSysVarObMaxReadStaleTime)); + } + } } return ret; @@ -5605,6 +5618,17 @@ int ObSysVarFactory::create_sys_var(ObSysVarClassType sys_var_id, ObBasicSysVar } break; } + case SYS_VAR_OB_MAX_READ_STALE_TIME: { + void *ptr = NULL; + if (OB_ISNULL(ptr = allocator_.alloc(sizeof(ObSysVarObMaxReadStaleTime)))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + LOG_ERROR("fail to alloc memory", K(ret), K(sizeof(ObSysVarObMaxReadStaleTime))); + } else if (OB_ISNULL(sys_var_ptr = new (ptr)ObSysVarObMaxReadStaleTime())) { + ret = OB_ALLOCATE_MEMORY_FAILED; + LOG_ERROR("fail to new ObSysVarObMaxReadStaleTime", K(ret)); + } + break; + } default: { ret = OB_ERR_UNEXPECTED; diff --git a/src/share/system_variable/ob_system_variable_factory.h b/src/share/system_variable/ob_system_variable_factory.h index 9abf4320d..1e2b824ea 100644 --- a/src/share/system_variable/ob_system_variable_factory.h +++ b/src/share/system_variable/ob_system_variable_factory.h @@ -1591,6 +1591,13 @@ public: inline virtual ObSysVarClassType get_type() const { return SYS_VAR_LOG_ROW_VALUE_OPTIONS; } inline virtual const common::ObObj &get_global_default_value() const { return ObSysVariables::get_default_value(219); } }; +class ObSysVarObMaxReadStaleTime : public ObIntSysVar +{ +public: + ObSysVarObMaxReadStaleTime() : ObIntSysVar(ObSysVarOnCheckFuncs::check_and_convert_timeout_too_large, NULL, NULL, NULL, NULL) {} + inline virtual ObSysVarClassType get_type() const { return SYS_VAR_OB_MAX_READ_STALE_TIME; } + inline virtual const common::ObObj &get_global_default_value() const { return ObSysVariables::get_default_value(220); } +}; class ObSysVarFactory @@ -1610,7 +1617,7 @@ public: static const common::ObString get_sys_var_name_by_id(ObSysVarClassType sys_var_id); const static int64_t MYSQL_SYS_VARS_COUNT = 97; - const static int64_t OB_SYS_VARS_COUNT = 123; + const static int64_t OB_SYS_VARS_COUNT = 124; const static int64_t ALL_SYS_VARS_COUNT = MYSQL_SYS_VARS_COUNT + OB_SYS_VARS_COUNT; const static int16_t OB_SPECIFIC_SYS_VAR_ID_OFFSET = 10000; diff --git a/src/share/system_variable/ob_system_variable_init.cpp b/src/share/system_variable/ob_system_variable_init.cpp index 7a8ca6809..45a12a384 100644 --- a/src/share/system_variable/ob_system_variable_init.cpp +++ b/src/share/system_variable/ob_system_variable_init.cpp @@ -2892,13 +2892,26 @@ static struct VarsInit{ ObSysVars[219].alias_ = "OB_SV_LOG_ROW_VALUE_OPTIONS" ; }(); + [&] (){ + ObSysVars[220].info_ = "max stale time(us) for weak read query " ; + ObSysVars[220].name_ = "ob_max_read_stale_time" ; + ObSysVars[220].data_type_ = ObIntType ; + ObSysVars[220].value_ = "5000000" ; + ObSysVars[220].flags_ = ObSysVarFlag::GLOBAL_SCOPE | ObSysVarFlag::SESSION_SCOPE | ObSysVarFlag::NEED_SERIALIZE ; + ObSysVars[220].on_check_and_convert_func_ = "ObSysVarOnCheckFuncs::check_and_convert_timeout_too_large" ; + ObSysVars[220].id_ = SYS_VAR_OB_MAX_READ_STALE_TIME ; + cur_max_var_id = MAX(cur_max_var_id, static_cast(SYS_VAR_OB_MAX_READ_STALE_TIME)) ; + ObSysVarsIdToArrayIdx[SYS_VAR_OB_MAX_READ_STALE_TIME] = 220 ; + ObSysVars[220].alias_ = "OB_SV_MAX_READ_STALE_TIME" ; + }(); + if (cur_max_var_id >= ObSysVarFactory::OB_MAX_SYS_VAR_ID) { HasInvalidSysVar = true; } } }vars_init; -static int64_t var_amount = 220; +static int64_t var_amount = 221; int64_t ObSysVariables::get_all_sys_var_count(){ return ObSysVarFactory::ALL_SYS_VARS_COUNT;} ObSysVarClassType ObSysVariables::get_sys_var_id(int64_t i){ return ObSysVars[i].id_;} diff --git a/src/share/system_variable/ob_system_variable_init.json b/src/share/system_variable/ob_system_variable_init.json index ec55550f5..46e0deb4a 100644 --- a/src/share/system_variable/ob_system_variable_init.json +++ b/src/share/system_variable/ob_system_variable_init.json @@ -2921,5 +2921,18 @@ "info_cn": "", "background_cn": "", "ref_url": "https://yuque.antfin-inc.com/ob/product_functionality_review/wan7iw4mox8vgkfm" + }, + "ob_max_read_stale_time": { + "id": 10137, + "name": "ob_max_read_stale_time", + "value": "5000000", + "data_type": "int", + "info": "max stale time(us) for weak read query ", + "flags": "GLOBAL | SESSION | NEED_SERIALIZE", + "on_check_and_convert_func": "ObSysVarOnCheckFuncs::check_and_convert_timeout_too_large", + "publish_version": "410", + "info_cn": "", + "background_cn": "", + "ref_url": "https://yuque.antfin.com/ob/product_functionality_review/ns7okery0azyvlnp" } } diff --git a/src/sql/code_generator/ob_tsc_cg_service.cpp b/src/sql/code_generator/ob_tsc_cg_service.cpp index 171e3770f..9481b13c0 100644 --- a/src/sql/code_generator/ob_tsc_cg_service.cpp +++ b/src/sql/code_generator/ob_tsc_cg_service.cpp @@ -949,10 +949,15 @@ int ObTscCgService::generate_table_loc_meta(uint64_t table_loc_id, loc_meta.ref_table_id_ = table_schema.get_table_id(); loc_meta.is_dup_table_ = table_schema.is_duplicate_table(); bool is_weak_read = false; - if (stmt.get_query_ctx()->has_dml_write_stmt_) { + if (OB_ISNULL(cg_.opt_ctx_) || OB_ISNULL(cg_.opt_ctx_->get_exec_ctx())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", K(cg_.opt_ctx_), K(ret)); + } else if (stmt.get_query_ctx()->has_dml_write_stmt_) { loc_meta.select_leader_ = 1; loc_meta.is_weak_read_ = 0; - } else if (OB_FAIL(ObTableLocation::get_is_weak_read(stmt, &session, is_weak_read))) { + } else if (OB_FAIL(ObTableLocation::get_is_weak_read(stmt, &session, + cg_.opt_ctx_->get_exec_ctx()->get_sql_ctx(), + is_weak_read))) { LOG_WARN("get is weak read failed", K(ret)); } else if (is_weak_read) { loc_meta.is_weak_read_ = 1; diff --git a/src/sql/das/ob_das_location_router.cpp b/src/sql/das/ob_das_location_router.cpp index 414a730c3..526b30d54 100644 --- a/src/sql/das/ob_das_location_router.cpp +++ b/src/sql/das/ob_das_location_router.cpp @@ -20,12 +20,14 @@ #include "share/schema/ob_multi_version_schema_service.h" #include "share/schema/ob_schema_utils.h" #include "sql/das/ob_das_utils.h" +#include "storage/tx/wrs/ob_black_list.h" namespace oceanbase { using namespace common; using namespace share; using namespace share::schema; +using namespace transaction; namespace sql { OB_SERIALIZE_MEMBER(DASRelatedTabletMap::MapEntry, @@ -630,14 +632,24 @@ int ObDASLocationRouter::nonblock_get_readable_replica(const uint64_t tenant_id, } if (OB_UNLIKELY(tablet_loc.need_refresh_)){ + ObAddr strong_leader; + ObBLKey bl_key; + bool in_black_list = true; for (int64_t i = 0; OB_SUCC(ret) && !is_found && i < ls_loc.get_replica_locations().count(); ++i) { const ObLSReplicaLocation &tmp_replica_loc = ls_loc.get_replica_locations().at(i); if (tmp_replica_loc.is_strong_leader()) { - //in version 4.0, if das task in retry, we force to choose the leader replica + strong_leader = tmp_replica_loc.get_server(); + } else if (OB_SUCC(bl_key.init(tmp_replica_loc.get_server(), tenant_id, tablet_loc.ls_id_)) + && OB_SUCC(ObBLService::get_instance().check_in_black_list(bl_key, in_black_list)) + && !in_black_list) { tablet_loc.server_ = tmp_replica_loc.get_server(); is_found = true; } } + if (!is_found && strong_leader.is_valid()) { + tablet_loc.server_ = strong_leader; + is_found = true; + } } for (int64_t i = 0; OB_SUCC(ret) && !is_found && i < ls_loc.get_replica_locations().count(); ++i) { diff --git a/src/sql/engine/ob_exec_context.cpp b/src/sql/engine/ob_exec_context.cpp index 2ca9f851a..e1dfae206 100644 --- a/src/sql/engine/ob_exec_context.cpp +++ b/src/sql/engine/ob_exec_context.cpp @@ -719,10 +719,9 @@ int ObExecContext::init_physical_plan_ctx(const ObPhysicalPlan &plan) { int ret = OB_SUCCESS; int64_t foreign_key_checks = 0; - if (OB_ISNULL(phy_plan_ctx_) || OB_ISNULL(my_session_)) { - ret = OB_NOT_INIT; - LOG_WARN("physical_plan or ctx is NULL", K_(phy_plan_ctx), K_(my_session), K(ret)); - ret = OB_ERR_UNEXPECTED; + if (OB_ISNULL(phy_plan_ctx_) || OB_ISNULL(my_session_) || OB_ISNULL(sql_ctx_)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", K_(phy_plan_ctx), K_(my_session), K(ret)); } else if (OB_FAIL(my_session_->get_foreign_key_checks(foreign_key_checks))) { LOG_WARN("failed to get foreign_key_checks", K(ret)); } else { @@ -747,7 +746,9 @@ int ObExecContext::init_physical_plan_ctx(const ObPhysicalPlan &plan) } if (OB_SUCC(ret)) { if (stmt::T_SELECT == plan.get_stmt_type()) { // select才有weak - if (OB_UNLIKELY(phy_plan_hint.read_consistency_ != INVALID_CONSISTENCY)) { + if (sql_ctx_->is_protocol_weak_read_) { + consistency = WEAK; + } else if (OB_UNLIKELY(phy_plan_hint.read_consistency_ != INVALID_CONSISTENCY)) { consistency = phy_plan_hint.read_consistency_; } else { consistency = my_session_->get_consistency_level(); diff --git a/src/sql/ob_result_set.cpp b/src/sql/ob_result_set.cpp index 77468eace..57ae4d15d 100644 --- a/src/sql/ob_result_set.cpp +++ b/src/sql/ob_result_set.cpp @@ -938,14 +938,17 @@ int ObResultSet::get_read_consistency(ObConsistencyLevel &consistency) consistency = INVALID_CONSISTENCY; int ret = OB_SUCCESS; ObPhysicalPlan* physical_plan_ = static_cast(cache_obj_guard_.get_cache_obj()); - if (OB_ISNULL(physical_plan_)) { - ret = OB_NOT_INIT; - LOG_WARN("physical_plan", K_(physical_plan), K(ret)); - ret = OB_ERR_UNEXPECTED; + if (OB_ISNULL(physical_plan_) + || OB_ISNULL(exec_ctx_) + || OB_ISNULL(exec_ctx_->get_sql_ctx())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("physical_plan", K_(physical_plan), K(exec_ctx_->get_sql_ctx()), K(ret)); } else { const ObPhyPlanHint &phy_hint = physical_plan_->get_phy_plan_hint(); if (stmt::T_SELECT == stmt_type_) { // select才有weak - if (OB_UNLIKELY(phy_hint.read_consistency_ != INVALID_CONSISTENCY)) { + if (exec_ctx_->get_sql_ctx()->is_protocol_weak_read_) { + consistency = WEAK; + } else if (OB_UNLIKELY(phy_hint.read_consistency_ != INVALID_CONSISTENCY)) { consistency = phy_hint.read_consistency_; } else { consistency = my_session_.get_consistency_level(); diff --git a/src/sql/ob_sql_context.cpp b/src/sql/ob_sql_context.cpp index 581e3cfe4..fb7014ede 100644 --- a/src/sql/ob_sql_context.cpp +++ b/src/sql/ob_sql_context.cpp @@ -230,6 +230,7 @@ ObSqlCtx::ObSqlCtx() cur_plan_(nullptr), can_reroute_sql_(false), is_sensitive_(false), + is_protocol_weak_read_(false), flashback_query_expr_(nullptr), is_execute_call_stmt_(false), reroute_info_(nullptr) @@ -267,6 +268,7 @@ void ObSqlCtx::reset() is_ddl_from_primary_ = false; can_reroute_sql_ = false; is_sensitive_ = false; + is_protocol_weak_read_ = false; if (nullptr != reroute_info_) { reroute_info_->reset(); op_reclaim_free(reroute_info_); diff --git a/src/sql/ob_sql_context.h b/src/sql/ob_sql_context.h index 89ac557e6..0933de3e1 100644 --- a/src/sql/ob_sql_context.h +++ b/src/sql/ob_sql_context.h @@ -495,6 +495,7 @@ public: bool can_reroute_sql_; // 是否可以重新路由 bool is_sensitive_; // 是否含有敏感信息,若有则不记入 sql_audit + bool is_protocol_weak_read_; // record whether proxy set weak read for this request in protocol flag common::ObFixedArray multi_stmt_rowkey_pos_; ObRawExpr *flashback_query_expr_; ObSpmCacheCtx spm_ctx_; diff --git a/src/sql/ob_sql_trans_control.cpp b/src/sql/ob_sql_trans_control.cpp index 2b9d84ab7..70cf3582a 100644 --- a/src/sql/ob_sql_trans_control.cpp +++ b/src/sql/ob_sql_trans_control.cpp @@ -35,6 +35,7 @@ #include "observer/ob_server_struct.h" #include "observer/ob_server.h" #include "storage/tx/wrs/ob_weak_read_util.h" //ObWeakReadUtil +#include "storage/tx_storage/ob_ls_service.h" #include "sql/das/ob_das_dml_ctx_define.h" #include "share/deadlock/ob_deadlock_detector_mgr.h" @@ -463,9 +464,7 @@ int ObSqlTransControl::start_stmt(ObExecContext &exec_ctx) OZ (stmt_sanity_check_(session, plan, plan_ctx)); OZ (txs->sql_stmt_start_hook(session->get_xid(), *session->get_tx_desc(), session->get_sessid())); if (OB_SUCC(ret) - && txs->get_tx_elr_util().check_and_update_tx_elr_info( - *session->get_tx_desc(), - session->get_early_lock_release())) { + && txs->get_tx_elr_util().check_and_update_tx_elr_info(*session->get_tx_desc())) { LOG_WARN("check and update tx elr info", K(ret), KPC(session->get_tx_desc())); } uint32_t session_id = 0; @@ -576,7 +575,8 @@ int ObSqlTransControl::stmt_setup_snapshot_(ObSQLSessionInfo *session, auto &snapshot = das_ctx.get_snapshot(); if (cl == ObConsistencyLevel::WEAK || cl == ObConsistencyLevel::FROZEN) { SCN snapshot_version = SCN::min_scn(); - if (OB_FAIL(txs->get_weak_read_snapshot_version(snapshot_version))) { + if (OB_FAIL(txs->get_weak_read_snapshot_version(session->get_ob_max_read_stale_time(), + snapshot_version))) { TRANS_LOG(WARN, "get weak read snapshot fail", KPC(txs)); } else { snapshot.init_weak_read(snapshot_version); @@ -989,5 +989,53 @@ void ObSqlTransControl::clear_xa_branch(const ObXATransID &xid, ObTxDesc *&tx_de MTL(transaction::ObXAService *)->clear_xa_branch(xid, tx_desc); } + +int ObSqlTransControl::check_ls_readable(const uint64_t tenant_id, + const share::ObLSID &ls_id, + const common::ObAddr &addr, + const int64_t max_stale_time_ns, + bool &can_read) +{ + int ret = OB_SUCCESS; + can_read = false; + + if (!ls_id.is_valid() + || !addr.is_valid() + || max_stale_time_ns <= 0) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", K(ls_id), K(addr), K(max_stale_time_ns)); + } else if (observer::ObServer::get_instance().get_self() == addr) { + // distribute plan and check black list + ObBLKey blk; + bool in_black_list = false; + if (OB_FAIL(blk.init(addr, tenant_id, ls_id))) { + LOG_WARN("ObBLKey init error", K(ret), K(addr), K(tenant_id), K(ls_id)); + } else if (OB_FAIL(ObBLService::get_instance().check_in_black_list(blk, in_black_list))) { + LOG_WARN("check in black list error", K(ret), K(blk)); + } else { + can_read = (in_black_list ? false : true); + } + } else { + storage::ObLSService *ls_svr = MTL(storage::ObLSService *); + storage::ObLSHandle handle; + ObLS *ls = nullptr; + + if (OB_ISNULL(ls_svr)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("log stream service is NULL", K(ret)); + } else if (OB_FAIL(ls_svr->get_ls(ls_id, handle, ObLSGetMod::TRANS_MOD))) { + LOG_WARN("get id service log stream failed"); + } else if (OB_ISNULL(ls = handle.get_ls())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("id service log stream not exist"); + } else if (ObTimeUtility::current_time() - max_stale_time_ns / 1000 + < ls->get_ls_wrs_handler()->get_ls_weak_read_ts().convert_to_ts()) { + can_read = true; + } + } + + return ret; +} + }/* ns sql*/ }/* ns oceanbase */ diff --git a/src/sql/ob_sql_trans_control.h b/src/sql/ob_sql_trans_control.h index 3a231b8f3..695885256 100644 --- a/src/sql/ob_sql_trans_control.h +++ b/src/sql/ob_sql_trans_control.h @@ -238,6 +238,11 @@ public: const uint64_t table_id, const transaction::tablelock::ObTableLockMode lock_mode); static void clear_xa_branch(const transaction::ObXATransID &xid, transaction::ObTxDesc *&tx_desc); + static int check_ls_readable(const uint64_t tenant_id, + const share::ObLSID &ls_id, + const common::ObAddr &addr, + const int64_t max_stale_time_ns, + bool &can_read); private: DISALLOW_COPY_AND_ASSIGN(ObSqlTransControl); static int get_trans_expire_ts(const ObSQLSessionInfo &session, diff --git a/src/sql/optimizer/ob_log_plan.cpp b/src/sql/optimizer/ob_log_plan.cpp index 506d95f29..86481a6c6 100644 --- a/src/sql/optimizer/ob_log_plan.cpp +++ b/src/sql/optimizer/ob_log_plan.cpp @@ -2330,9 +2330,13 @@ int ObLogPlan::select_replicas(ObExecContext &exec_ctx, if (OB_FAIL(ret)) { } else if (is_weak && !exec_ctx.get_my_session()->get_is_in_retry()) { + int64_t max_read_stale_time = exec_ctx.get_my_session()->get_ob_max_read_stale_time(); + uint64_t tenant_id = exec_ctx.get_my_session()->get_effective_tenant_id(); if (OB_FAIL(ObLogPlan::weak_select_replicas(local_server, static_cast(route_policy_type), proxy_priority_hit_support, + tenant_id, + max_read_stale_time, phy_tbl_loc_info_list, is_hit_partition, follower_first_feedback))) { LOG_WARN("fail to weak select intersect replicas", K(ret), K(local_server), K(phy_tbl_loc_info_list.count())); @@ -2443,6 +2447,8 @@ int ObLogPlan::strong_select_replicas(const ObAddr &local_server, int ObLogPlan::weak_select_replicas(const ObAddr &local_server, ObRoutePolicyType route_type, bool proxy_priority_hit_support, + uint64_t tenant_id, + int64_t max_read_stale_time, ObIArray &phy_tbl_loc_info_list, bool &is_hit_partition, ObFollowerFirstFeedbackType &follower_first_feedback) @@ -2457,6 +2463,8 @@ int ObLogPlan::weak_select_replicas(const ObAddr &local_server, route_policy_ctx.policy_type_ = route_type; route_policy_ctx.consistency_level_ = WEAK; route_policy_ctx.is_proxy_priority_hit_support_ = proxy_priority_hit_support; + route_policy_ctx.max_read_stale_time_ = max_read_stale_time; + route_policy_ctx.tenant_id_ = tenant_id; if (OB_FAIL(route_policy.init())) { LOG_WARN("fail to init route policy", K(ret)); @@ -2474,7 +2482,9 @@ int ObLogPlan::weak_select_replicas(const ObAddr &local_server, ObIArray &replica_array = phy_part_loc_info.get_partition_location().get_replica_locations(); if (OB_FAIL(route_policy.init_candidate_replicas(replica_array))) { LOG_WARN("fail to init candidate replicas", K(replica_array), K(ret)); - } else if (OB_FAIL(route_policy.calculate_replica_priority(replica_array, route_policy_ctx))) { + } else if (OB_FAIL(route_policy.calculate_replica_priority(phy_part_loc_info.get_ls_id(), + replica_array, + route_policy_ctx))) { LOG_WARN("fail to calculate replica priority", K(replica_array), K(route_policy_ctx), K(ret)); } else if (OB_FAIL(route_policy.select_replica_with_priority(route_policy_ctx, replica_array, phy_part_loc_info))) { LOG_WARN("fail to select replica", K(replica_array), K(ret)); @@ -12519,4 +12529,4 @@ int ObLogPlan::allocate_material_for_recursive_cte_plan(ObIArray &phy_tbl_loc_info_list, bool &is_hit_partition, share::ObFollowerFirstFeedbackType &follower_first_feedback); diff --git a/src/sql/optimizer/ob_phy_table_location_info.h b/src/sql/optimizer/ob_phy_table_location_info.h index d67a83e34..d57ed8c95 100644 --- a/src/sql/optimizer/ob_phy_table_location_info.h +++ b/src/sql/optimizer/ob_phy_table_location_info.h @@ -58,7 +58,7 @@ public: inline common::ObTabletID get_tablet_id() const { return tablet_id_; } - inline share::ObLSID get_ls_id() const { return ls_id_; } + inline const share::ObLSID &get_ls_id() const { return ls_id_; } inline int64_t get_renew_time() const { return renew_time_; } @@ -90,7 +90,7 @@ public: int add_priority_replica_idx(int64_t priority_replica_idx); int64_t get_selected_replica_idx() const { return selected_replica_idx_; } bool has_selected_replica() const { return common::OB_INVALID_INDEX != selected_replica_idx_; } - + const share::ObLSID &get_ls_id() const { return opt_tablet_loc_.get_ls_id(); } int get_selected_replica(share::ObLSReplicaLocation &replica_loc) const; int get_selected_replica(ObRoutePolicy::CandidateReplica &replica_loc) const; int get_priority_replica(int64_t idx, share::ObLSReplicaLocation &replica_loc) const; diff --git a/src/sql/optimizer/ob_route_policy.cpp b/src/sql/optimizer/ob_route_policy.cpp index eecdec451..618edec36 100644 --- a/src/sql/optimizer/ob_route_policy.cpp +++ b/src/sql/optimizer/ob_route_policy.cpp @@ -60,26 +60,40 @@ int ObRoutePolicy::strong_sort_replicas(ObIArray& candi_replic return ret; } -int ObRoutePolicy::filter_replica(ObIArray& candi_replicas, ObRoutePolicyCtx &ctx) +int ObRoutePolicy::filter_replica(const ObLSID &ls_id, + ObIArray& candi_replicas, + ObRoutePolicyCtx &ctx) { int ret = OB_SUCCESS; ObRoutePolicyType policy_type = get_calc_route_policy_type(ctx); for (int64_t i = 0; OB_SUCC(ret) && i < candi_replicas.count(); ++i) { CandidateReplica &cur_replica = candi_replicas.at(i); - if ((policy_type == ONLY_READONLY_ZONE && cur_replica.attr_.zone_type_ == ZONE_TYPE_READWRITE) - || cur_replica.attr_.zone_status_ == ObZoneStatus::INACTIVE - || cur_replica.attr_.server_status_ != ObServerStatus::OB_SERVER_ACTIVE - || cur_replica.attr_.start_service_time_ == 0 - || cur_replica.attr_.server_stop_time_ != 0 - || (0 == cur_replica.get_property().get_memstore_percent() - && is_follower(cur_replica.get_role()))) { // 作为Follower的D副不能选择 - cur_replica.is_filter_ = true; + bool can_read = true; + if (OB_FAIL(ObSqlTransControl::check_ls_readable(ctx.tenant_id_, + ls_id, + cur_replica.get_server(), + ctx.max_read_stale_time_, + can_read))) { + LOG_WARN("fail to check ls readable", K(ctx), K(cur_replica), K(ret)); + } else { + LOG_TRACE("check ls readable", K(ctx), K(ls_id), K(cur_replica.get_server()), K(can_read)); + if ((policy_type == ONLY_READONLY_ZONE && cur_replica.attr_.zone_type_ == ZONE_TYPE_READWRITE) + || cur_replica.attr_.zone_status_ == ObZoneStatus::INACTIVE + || cur_replica.attr_.server_status_ != ObServerStatus::OB_SERVER_ACTIVE + || cur_replica.attr_.start_service_time_ == 0 + || cur_replica.attr_.server_stop_time_ != 0 + || (0 == cur_replica.get_property().get_memstore_percent() + && is_follower(cur_replica.get_role()))// 作为Follower的D副不能选择 + || !can_read) { + cur_replica.is_filter_ = true; + } } } return ret; } -int ObRoutePolicy::calculate_replica_priority(ObIArray& candi_replicas, +int ObRoutePolicy::calculate_replica_priority(const ObLSID &ls_id, + ObIArray& candi_replicas, ObRoutePolicyCtx &ctx) { int ret = OB_SUCCESS; @@ -88,7 +102,7 @@ int ObRoutePolicy::calculate_replica_priority(ObIArray& candi_ LOG_WARN("not init", K(ret)); } else if (candi_replicas.count() <= 1) {//do nothing } else if (WEAK == ctx.consistency_level_) { - if (OB_FAIL(filter_replica(candi_replicas, ctx))) { + if (OB_FAIL(filter_replica(ls_id, candi_replicas, ctx))) { LOG_WARN("fail to filter replicas", K(candi_replicas), K(ctx), K(ret)); } else if (OB_FAIL(weak_sort_replicas(candi_replicas, ctx))) { LOG_WARN("fail to sort replicas", K(candi_replicas), K(ctx), K(ret)); @@ -219,7 +233,7 @@ int ObRoutePolicy::select_replica_with_priority(const ObRoutePolicyCtx &route_po bool same_priority = true; ReplicaAttribute priority_attr; for (int64_t i = 0; OB_SUCC(ret) && same_priority && i < replica_array.count(); ++i) { - if (replica_array.at(i).is_usable()) { + if (replica_array.at(i).is_usable()/*+满足max_read_stale_time事务延迟*/) { if (has_found) { if (priority_attr == replica_array.at(i).attr_) { if (OB_FAIL(phy_part_loc_info.add_priority_replica_idx(i))) { diff --git a/src/sql/optimizer/ob_route_policy.h b/src/sql/optimizer/ob_route_policy.h index 102ba5bfa..39ddbff2c 100644 --- a/src/sql/optimizer/ob_route_policy.h +++ b/src/sql/optimizer/ob_route_policy.h @@ -46,16 +46,22 @@ struct ObRoutePolicyCtx ObRoutePolicyCtx() :policy_type_(POLICY_TYPE_MAX), consistency_level_(common::INVALID_CONSISTENCY), - is_proxy_priority_hit_support_(false) + is_proxy_priority_hit_support_(false), + tenant_id_(OB_INVALID_TENANT_ID), + max_read_stale_time_(0) {} TO_STRING_KV(K(policy_type_), K(consistency_level_), - K(is_proxy_priority_hit_support_)); + K(is_proxy_priority_hit_support_), + K(tenant_id_), + K(max_read_stale_time_)); ObRoutePolicyType policy_type_; common::ObConsistencyLevel consistency_level_; bool is_proxy_priority_hit_support_; + uint64_t tenant_id_; + int64_t max_read_stale_time_; }; class ObRoutePolicy @@ -164,7 +170,9 @@ public: {} ~ObRoutePolicy() {} int init(); - int calculate_replica_priority(common::ObIArray& candi_replicas, ObRoutePolicyCtx &ctx); + int calculate_replica_priority(const share::ObLSID &ls_id, + common::ObIArray& candi_replicas, + ObRoutePolicyCtx &ctx); int init_candidate_replicas(common::ObIArray &candi_replicas); int select_replica_with_priority(const ObRoutePolicyCtx &route_policy_ctx, const common::ObIArray &replica_array, @@ -204,7 +212,9 @@ protected: int get_merge_status(const share::ObServerLocality &candi_locality, CandidateReplica &candi_replica); int get_zone_status(const share::ObServerLocality &candi_locality, CandidateReplica &candi_replica); - int filter_replica(common::ObIArray& candi_replicas, ObRoutePolicyCtx &ctx); + int filter_replica(const share::ObLSID &ls_id, + common::ObIArray& candi_replicas, + ObRoutePolicyCtx &ctx); int weak_sort_replicas(common::ObIArray& candi_replicas, ObRoutePolicyCtx &ctx); int strong_sort_replicas(common::ObIArray& candi_replicas, ObRoutePolicyCtx &ctx); inline ObRoutePolicyType get_calc_route_policy_type(const ObRoutePolicyCtx &ctx) const diff --git a/src/sql/optimizer/ob_table_location.cpp b/src/sql/optimizer/ob_table_location.cpp index cd5ea9f74..adc2c3e60 100644 --- a/src/sql/optimizer/ob_table_location.cpp +++ b/src/sql/optimizer/ob_table_location.cpp @@ -998,7 +998,10 @@ int ObTableLocation::init_table_location(ObExecContext &exec_ctx, } if (OB_SUCC(ret)) { bool is_weak_read = false; - if (OB_FAIL(get_is_weak_read(stmt, exec_ctx.get_my_session(), is_weak_read))) { + if (OB_FAIL(get_is_weak_read(stmt, + exec_ctx.get_my_session(), + exec_ctx.get_sql_ctx(), + is_weak_read))) { LOG_WARN("get is weak read failed", K(ret)); } else if (ObDuplicateScope::DUPLICATE_SCOPE_NONE != table_schema->get_duplicate_scope()) { loc_meta_.is_dup_table_ = 1; @@ -1272,7 +1275,7 @@ int ObTableLocation::init( } if (OB_SUCC(ret)) { bool is_weak_read = false; - if (OB_FAIL(get_is_weak_read(stmt, session_info, is_weak_read))) { + if (OB_FAIL(get_is_weak_read(stmt, session_info, exec_ctx->get_sql_ctx(), is_weak_read))) { LOG_WARN("get is weak read failed", K(ret)); } else if (ObDuplicateScope::DUPLICATE_SCOPE_NONE != table_schema->get_duplicate_scope()) { loc_meta_.is_dup_table_ = 1; @@ -1291,20 +1294,24 @@ int ObTableLocation::init( int ObTableLocation::get_is_weak_read(const ObDMLStmt &dml_stmt, const ObSQLSessionInfo *session, + const ObSqlCtx *sql_ctx, bool &is_weak_read) { int ret = OB_SUCCESS; is_weak_read = false; - if (OB_ISNULL(session)) { + if (OB_ISNULL(session) || OB_ISNULL(sql_ctx)) { ret = OB_ERR_UNEXPECTED; - LOG_ERROR("unexpected null", K(ret), K(session)); + LOG_ERROR("unexpected null", K(ret), K(session), K(sql_ctx)); } else if (dml_stmt.get_query_ctx()->has_dml_write_stmt_) { is_weak_read = false; } else { ObConsistencyLevel consistency_level = INVALID_CONSISTENCY; ObTxConsistencyType trans_consistency_type = ObTxConsistencyType::INVALID; if (stmt::T_SELECT == dml_stmt.get_stmt_type()) { - if (OB_UNLIKELY(INVALID_CONSISTENCY != dml_stmt.get_query_ctx()->get_global_hint().read_consistency_)) { + if (sql_ctx->is_protocol_weak_read_) { + consistency_level = WEAK; + } else if (OB_UNLIKELY(INVALID_CONSISTENCY + != dml_stmt.get_query_ctx()->get_global_hint().read_consistency_)) { consistency_level = dml_stmt.get_query_ctx()->get_global_hint().read_consistency_; } else { consistency_level = session->get_consistency_level(); @@ -1374,8 +1381,10 @@ int ObTableLocation::calculate_partition_ids_by_rowkey(ObSQLSessionInfo &session ObArenaAllocator allocator(ObModIds::OB_SQL_TABLE_LOCATION); SMART_VAR(ObExecContext, exec_ctx, allocator) { ObSqlSchemaGuard sql_schema_guard; + ObSqlCtx sql_ctx; sql_schema_guard.set_schema_guard(&schema_guard); exec_ctx.set_my_session(&session_info); + exec_ctx.set_sql_ctx(&sql_ctx); ObDASTabletMapper tablet_mapper; if (is_non_partition_optimized_ && table_id == loc_meta_.ref_table_id_) { tablet_mapper.set_non_partitioned_table_ids(tablet_id_, object_id_, &related_list_); diff --git a/src/sql/optimizer/ob_table_location.h b/src/sql/optimizer/ob_table_location.h index 8432afe63..f4c568063 100644 --- a/src/sql/optimizer/ob_table_location.h +++ b/src/sql/optimizer/ob_table_location.h @@ -603,6 +603,7 @@ public: static int get_is_weak_read(const ObDMLStmt &dml_stmt, const ObSQLSessionInfo *session_info, + const ObSqlCtx *sql_ctx, bool &is_weak_read); int send_add_interval_partition_rpc(ObExecContext &exec_ctx, diff --git a/src/sql/session/ob_basic_session_info.cpp b/src/sql/session/ob_basic_session_info.cpp index 0a2871d38..baed4d58f 100644 --- a/src/sql/session/ob_basic_session_info.cpp +++ b/src/sql/session/ob_basic_session_info.cpp @@ -2490,6 +2490,12 @@ OB_INLINE int ObBasicSessionInfo::process_session_variable(ObSysVarClassType var } break; } + case SYS_VAR_OB_MAX_READ_STALE_TIME: { + int64_t int_val = 0; + OZ (val.get_int(int_val), val); + OX (sys_vars_cache_.set_ob_max_read_stale_time(int_val)); + break; + } default: { //do nothing } @@ -2871,6 +2877,12 @@ int ObBasicSessionInfo::fill_sys_vars_cache_base_value( } break; } + case SYS_VAR_OB_MAX_READ_STALE_TIME: { + int64_t int_val = 0; + OZ (val.get_int(int_val), val); + OX (sys_vars_cache.set_base_ob_max_read_stale_time(int_val)); + break; + } default: { //do nothing } @@ -3579,7 +3591,8 @@ OB_DEF_SERIALIZE(ObBasicSessionInfo::SysVarsCacheData) nls_formats_[NLS_TIMESTAMP_TZ], ob_trx_lock_timeout_, ob_trace_info_, - ob_plsql_ccflags_); + ob_plsql_ccflags_, + ob_max_read_stale_time_); return ret; } @@ -3604,7 +3617,8 @@ OB_DEF_DESERIALIZE(ObBasicSessionInfo::SysVarsCacheData) nls_formats_[NLS_TIMESTAMP_TZ], ob_trx_lock_timeout_, ob_trace_info_, - ob_plsql_ccflags_); + ob_plsql_ccflags_, + ob_max_read_stale_time_); set_nls_date_format(nls_formats_[NLS_DATE]); set_nls_timestamp_format(nls_formats_[NLS_TIMESTAMP]); set_nls_timestamp_tz_format(nls_formats_[NLS_TIMESTAMP_TZ]); @@ -3634,7 +3648,8 @@ OB_DEF_SERIALIZE_SIZE(ObBasicSessionInfo::SysVarsCacheData) nls_formats_[NLS_TIMESTAMP_TZ], ob_trx_lock_timeout_, ob_trace_info_, - ob_plsql_ccflags_); + ob_plsql_ccflags_, + ob_max_read_stale_time_); return len; } diff --git a/src/sql/session/ob_basic_session_info.h b/src/sql/session/ob_basic_session_info.h index b11f16a6d..4ee3ac13b 100644 --- a/src/sql/session/ob_basic_session_info.h +++ b/src/sql/session/ob_basic_session_info.h @@ -559,6 +559,9 @@ public: { return sys_vars_cache_.get_ob_trx_lock_timeout(); } + int64_t get_ob_max_read_stale_time() { + return sys_vars_cache_.get_ob_max_read_stale_time(); + } int get_sql_throttle_current_priority(int64_t &sql_throttle_current_priority) { sql_throttle_current_priority = sys_vars_cache_.get_sql_throttle_current_priority(); @@ -1473,7 +1476,8 @@ private: nls_collation_(CS_TYPE_INVALID), nls_nation_collation_(CS_TYPE_INVALID), ob_trace_info_(), - ob_plsql_ccflags_() + ob_plsql_ccflags_(), + ob_max_read_stale_time_(0) { for (int64_t i = 0; i < ObNLSFormatEnum::NLS_MAX; ++i) { MEMSET(nls_formats_buf_[i], 0, MAX_NLS_FORMAT_STR_LEN); @@ -1526,6 +1530,7 @@ private: ob_trace_info_.reset(); iso_nls_currency_.reset(); ob_plsql_ccflags_.reset(); + ob_max_read_stale_time_ = 0; } void set_nls_date_format(const common::ObString &format) { @@ -1614,7 +1619,7 @@ private: K_(optimizer_use_sql_plan_baselines), K_(optimizer_capture_sql_plan_baselines), K_(is_result_accurate), K_(character_set_results), K_(character_set_connection), K_(ob_pl_block_timeout), K_(ob_plsql_ccflags), - K_(iso_nls_currency)); + K_(iso_nls_currency), K_(ob_max_read_stale_time)); public: static const int64_t MAX_NLS_FORMAT_STR_LEN = 256; @@ -1665,6 +1670,7 @@ private: char trace_info_buf_[OB_TRACE_BUFFER_SIZE]; ObString ob_plsql_ccflags_; char plsql_ccflags_[OB_TMP_BUF_SIZE_256]; + int64_t ob_max_read_stale_time_; private: char nls_formats_buf_[ObNLSFormatEnum::NLS_MAX][MAX_NLS_FORMAT_STR_LEN]; }; @@ -1770,6 +1776,7 @@ private: DEF_SYS_VAR_CACHE_FUNCS(int64_t, ob_pl_block_timeout); DEF_SYS_VAR_CACHE_FUNCS_STR(plsql_ccflags); DEF_SYS_VAR_CACHE_FUNCS_STR(iso_nls_currency); + DEF_SYS_VAR_CACHE_FUNCS(int64_t, ob_max_read_stale_time); void set_autocommit_info(bool inc_value) { inc_data_.autocommit_ = inc_value; @@ -1831,6 +1838,7 @@ private: bool inc_ob_pl_block_timeout_:1; bool inc_plsql_ccflags_:1; bool inc_iso_nls_currency_:1; + bool inc_ob_max_read_stale_time_:1; }; }; }; diff --git a/src/storage/CMakeLists.txt b/src/storage/CMakeLists.txt index 3bdd2a4a0..225999f7f 100644 --- a/src/storage/CMakeLists.txt +++ b/src/storage/CMakeLists.txt @@ -189,6 +189,7 @@ ob_set_subtarget(ob_storage tablet ) ob_set_subtarget(ob_storage tx_wrs + tx/wrs/ob_black_list.cpp tx/wrs/ob_i_weak_read_service.cpp tx/wrs/ob_tenant_weak_read_cluster_service.cpp tx/wrs/ob_tenant_weak_read_cluster_version_mgr.cpp diff --git a/src/storage/memtable/mvcc/ob_mvcc_ctx.h b/src/storage/memtable/mvcc/ob_mvcc_ctx.h index dbcfeacba..eaacf3069 100644 --- a/src/storage/memtable/mvcc/ob_mvcc_ctx.h +++ b/src/storage/memtable/mvcc/ob_mvcc_ctx.h @@ -96,7 +96,6 @@ public: // for mvcc engine invoke const share::SCN max_trans_version, const transaction::ObTransID &conflict_tx_id) = 0; virtual void on_wlock_retry(const ObMemtableKey& key, const transaction::ObTransID &conflict_tx_id) = 0; - virtual bool is_can_elr() const = 0; virtual void inc_truncate_cnt() = 0; virtual void add_trans_mem_total_size(const int64_t size) = 0; virtual void update_max_submitted_seq_no(const int64_t seq_no) = 0; diff --git a/src/storage/memtable/mvcc/ob_mvcc_row.cpp b/src/storage/memtable/mvcc/ob_mvcc_row.cpp index 4299acddf..b1636e7ea 100644 --- a/src/storage/memtable/mvcc/ob_mvcc_row.cpp +++ b/src/storage/memtable/mvcc/ob_mvcc_row.cpp @@ -748,7 +748,7 @@ void ObMvccRow::lock_begin(ObIMemtableCtx &ctx) const void ObMvccRow::mvcc_write_end(ObIMemtableCtx &ctx, int64_t ret) const { - if (!ctx.is_can_elr() && GCONF.enable_sql_audit) { + if (GCONF.enable_sql_audit) { const int64_t lock_use_time = OB_TSC_TIMESTAMP.current_time() - ctx.get_lock_start_time(); EVENT_ADD(MEMSTORE_WAIT_WRITE_LOCK_TIME, lock_use_time); if (OB_FAIL(ret)) { @@ -995,8 +995,7 @@ int ObMvccRow::mvcc_write_(ObIMemtableCtx &ctx, res.tx_node_ = &writer_node; total_trans_node_cnt_++; } - if (ctx.is_can_elr() - && NULL != writer_node.prev_ + if (NULL != writer_node.prev_ && writer_node.prev_->is_elr()) { ObMemtableCtx &mt_ctx = static_cast(ctx); if (NULL != mt_ctx.get_trans_ctx()) { diff --git a/src/storage/memtable/ob_lock_wait_mgr.cpp b/src/storage/memtable/ob_lock_wait_mgr.cpp index 07bca0fa4..521b36fa1 100644 --- a/src/storage/memtable/ob_lock_wait_mgr.cpp +++ b/src/storage/memtable/ob_lock_wait_mgr.cpp @@ -553,7 +553,6 @@ int ObLockWaitMgr::post_lock(const int tmp_ret, const ObStoreRowkey& row_key, const int64_t timeout, const bool is_remote_sql, - const bool can_elr, const int64_t last_compact_cnt, const int64_t total_trans_node_cnt, const ObTransID &tx_id, @@ -578,7 +577,7 @@ int ObLockWaitMgr::post_lock(const int tmp_ret, TRANS_LOG(WARN, "recheck lock fail", K(key), K(holder_tx_id)); } else if (locked) { auto hash = wait_on_row ? row_hash : tx_hash; - if (is_remote_sql && can_elr) { + if (is_remote_sql) { delay_header_node_run_ts(hash); } node->set((void*)node, @@ -605,7 +604,6 @@ int ObLockWaitMgr::post_lock(const int tmp_ret, const ObLockID &lock_id, const int64_t timeout, const bool is_remote_sql, - const bool can_elr, const int64_t last_compact_cnt, const int64_t total_trans_node_cnt, const transaction::ObTransID &tx_id, @@ -621,7 +619,7 @@ int ObLockWaitMgr::post_lock(const int tmp_ret, } else if (NULL == (node = get_thread_node())) { } else if (OB_TRY_LOCK_ROW_CONFLICT == tmp_ret) { auto hash = hash_lock_id(lock_id); - const bool need_delay = is_remote_sql && can_elr; + const bool need_delay = is_remote_sql; char lock_id_buf[common::MAX_LOCK_ID_BUF_LENGTH]; lock_id.to_string(lock_id_buf, sizeof(lock_id_buf)); lock_id_buf[common::MAX_LOCK_ID_BUF_LENGTH - 1] = '\0'; diff --git a/src/storage/memtable/ob_lock_wait_mgr.h b/src/storage/memtable/ob_lock_wait_mgr.h index a252c20f7..be9842ec5 100644 --- a/src/storage/memtable/ob_lock_wait_mgr.h +++ b/src/storage/memtable/ob_lock_wait_mgr.h @@ -232,7 +232,6 @@ public: const ObStoreRowkey &key, const int64_t timeout, const bool is_remote_sql, - const bool can_elr, const int64_t last_compact_cnt, const int64_t total_trans_node_cnt, const transaction::ObTransID &tx_id, @@ -243,7 +242,6 @@ public: const transaction::tablelock::ObLockID &lock_id, const int64_t timeout, const bool is_remote_sql, - const bool can_elr, const int64_t last_compact_cnt, const int64_t total_trans_node_cnt, const transaction::ObTransID &tx_id, diff --git a/src/storage/memtable/ob_memtable.cpp b/src/storage/memtable/ob_memtable.cpp index fc70961d2..e8408a292 100644 --- a/src/storage/memtable/ob_memtable.cpp +++ b/src/storage/memtable/ob_memtable.cpp @@ -2574,7 +2574,6 @@ int ObMemtable::post_row_write_conflict_(ObMvccAccessCtx &acc_ctx, *row_key.get_rowkey(), lock_wait_expire_ts, remote_tx, - mem_ctx->is_can_elr(), last_compact_cnt, total_trans_node_cnt, tx_id, diff --git a/src/storage/memtable/ob_memtable_context.cpp b/src/storage/memtable/ob_memtable_context.cpp index 20d12467a..59117a02a 100644 --- a/src/storage/memtable/ob_memtable_context.cpp +++ b/src/storage/memtable/ob_memtable_context.cpp @@ -873,15 +873,6 @@ uint64_t ObMemtableCtx::get_tenant_id() const return tenant_id; } -bool ObMemtableCtx::is_can_elr() const -{ - bool bret = false; - if (OB_NOT_NULL(ATOMIC_LOAD(&ctx_))) { - bret = ctx_->is_can_elr(); - } - return bret; -} - void ObMemtableCtx::update_max_submitted_seq_no(const int64_t seq_no) { if (NULL != ATOMIC_LOAD(&ctx_)) { diff --git a/src/storage/memtable/ob_memtable_context.h b/src/storage/memtable/ob_memtable_context.h index e253bed45..2324d94c5 100644 --- a/src/storage/memtable/ob_memtable_context.h +++ b/src/storage/memtable/ob_memtable_context.h @@ -397,7 +397,6 @@ public: virtual void add_trans_mem_total_size(const int64_t size); int64_t get_ref() const { return ATOMIC_LOAD(&ref_); } uint64_t get_tenant_id() const; - bool is_can_elr() const; inline bool has_read_elr_data() const { return read_elr_data_; } int remove_callbacks_for_fast_commit(); int remove_callback_for_uncommited_txn(memtable::ObMemtable* mt); diff --git a/src/storage/tablelock/ob_lock_memtable.cpp b/src/storage/tablelock/ob_lock_memtable.cpp index 46bba9731..6379a8396 100644 --- a/src/storage/tablelock/ob_lock_memtable.cpp +++ b/src/storage/tablelock/ob_lock_memtable.cpp @@ -335,7 +335,6 @@ int ObLockMemtable::post_obj_lock_conflict_(ObMvccAccessCtx &acc_ctx, lock_id, lock_wait_expire_ts, remote_tx, - mem_ctx->is_can_elr(), -1, -1, // total_trans_node_cnt tx_id, diff --git a/src/storage/tx/ob_trans_ctx.h b/src/storage/tx/ob_trans_ctx.h index 1eb5714cc..8d861e926 100644 --- a/src/storage/tx/ob_trans_ctx.h +++ b/src/storage/tx/ob_trans_ctx.h @@ -148,7 +148,6 @@ public: uint32_t get_session_id() const { return session_id_; } void before_unlock(CtxLockArg &arg); void after_unlock(CtxLockArg &arg); - bool is_can_elr() const { return can_elr_; } public: void set_exiting() { is_exiting_ = true; } bool is_exiting() const { return is_exiting_; } diff --git a/src/storage/tx/ob_trans_ctx_mgr_v4.cpp b/src/storage/tx/ob_trans_ctx_mgr_v4.cpp index 4ee73042c..c5e0ae6bd 100644 --- a/src/storage/tx/ob_trans_ctx_mgr_v4.cpp +++ b/src/storage/tx/ob_trans_ctx_mgr_v4.cpp @@ -405,7 +405,6 @@ int ObLSTxCtxMgr::create_tx_ctx_(const ObTxCreateArg &arg, arg.trans_service_, arg.cluster_id_, epoch, - arg.can_elr_, this, arg.for_replay_))) { } else if (FALSE_IT(inc_total_tx_ctx_count())) { diff --git a/src/storage/tx/ob_trans_ctx_mgr_v4.h b/src/storage/tx/ob_trans_ctx_mgr_v4.h index 8764fe109..9455e045b 100644 --- a/src/storage/tx/ob_trans_ctx_mgr_v4.h +++ b/src/storage/tx/ob_trans_ctx_mgr_v4.h @@ -89,8 +89,7 @@ typedef common::LinkHashValue ObLSTxCtxMgrHashValue; struct ObTxCreateArg { - ObTxCreateArg(const bool can_elr, - const bool for_replay, + ObTxCreateArg(const bool for_replay, const uint64_t tenant_id, const ObTransID &trans_id, const share::ObLSID &ls_id, @@ -100,8 +99,7 @@ struct ObTxCreateArg const common::ObAddr &scheduler, const int64_t trans_expired_time, ObTransService *trans_service) - : can_elr_(can_elr), - for_replay_(for_replay), + : for_replay_(for_replay), tenant_id_(tenant_id), tx_id_(trans_id), ls_id_(ls_id), @@ -118,11 +116,10 @@ struct ObTxCreateArg && trans_expired_time_ > 0 && NULL != trans_service_; } - TO_STRING_KV(K_(can_elr), K_(for_replay), + TO_STRING_KV(K_(for_replay), K_(tenant_id), K_(tx_id), K_(ls_id), K_(cluster_id), K_(cluster_version), K_(session_id), K_(scheduler), K_(trans_expired_time), KP_(trans_service)); - bool can_elr_; bool for_replay_; uint64_t tenant_id_; ObTransID tx_id_; diff --git a/src/storage/tx/ob_trans_part_ctx.cpp b/src/storage/tx/ob_trans_part_ctx.cpp index ed6005dd8..3e98c5777 100644 --- a/src/storage/tx/ob_trans_part_ctx.cpp +++ b/src/storage/tx/ob_trans_part_ctx.cpp @@ -63,7 +63,6 @@ int ObPartTransCtx::init(const uint64_t tenant_id, ObTransService *trans_service, const uint64_t cluster_id, const int64_t epoch, - const bool can_elr, ObLSTxCtxMgr *ls_ctx_mgr, const bool for_replay) { @@ -123,7 +122,6 @@ int ObPartTransCtx::init(const uint64_t tenant_id, last_ask_scheduler_status_ts_ = 0; cluster_id_ = cluster_id; epoch_ = epoch; - can_elr_ = can_elr; pending_write_ = 0; set_role_state(for_replay); @@ -154,7 +152,7 @@ int ObPartTransCtx::init(const uint64_t tenant_id, OB_ID(trans_id), trans_id, OB_ID(ctx_ref), get_ref()); TRANS_LOG(TRACE, "part trans ctx init", K(ret), K(tenant_id), K(trans_id), K(trans_expired_time), - K(ls_id), K(cluster_version), KP(trans_service), K(cluster_id), K(epoch), K(can_elr)); + K(ls_id), K(cluster_version), KP(trans_service), K(cluster_id), K(epoch)); return ret; } @@ -254,6 +252,7 @@ void ObPartTransCtx::default_init_() session_id_ = 0; timeout_task_.reset(); trace_info_.reset(); + can_elr_ = false; // TODO ObPartTransCtx clog_encrypt_info_.reset(); @@ -694,6 +693,7 @@ int ObPartTransCtx::commit(const ObLSArray &parts, TRANS_LOG(ERROR, "the size of participant is 0 when commit", KPC(this)); } else if (parts.count() == 1 && parts[0] == ls_id_) { exec_info_.trans_type_ = TransType::SP_TRANS; + can_elr_ = (trans_service_->get_tx_elr_util().is_can_tenant_elr() ? true : false); if (OB_FAIL(one_phase_commit_())) { TRANS_LOG(WARN, "start sp coimit fail", K(ret), KPC(this)); } diff --git a/src/storage/tx/ob_trans_part_ctx.h b/src/storage/tx/ob_trans_part_ctx.h index 2c8320173..c92dc11b8 100644 --- a/src/storage/tx/ob_trans_part_ctx.h +++ b/src/storage/tx/ob_trans_part_ctx.h @@ -116,7 +116,6 @@ public: ObTransService *trans_service, const uint64_t cluster_id, const int64_t epoch, - const bool can_elr, ObLSTxCtxMgr *ls_ctx_mgr, const bool for_replay); void reset() { } diff --git a/src/storage/tx/ob_trans_service_v4.cpp b/src/storage/tx/ob_trans_service_v4.cpp index 3d75fde42..f4306e53a 100644 --- a/src/storage/tx/ob_trans_service_v4.cpp +++ b/src/storage/tx/ob_trans_service_v4.cpp @@ -1132,8 +1132,7 @@ int ObTransService::create_tx_ctx_(const share::ObLSID &ls_id, int ret = OB_SUCCESS; bool existed = false; int64_t epoch = 0; - ObTxCreateArg arg(tx.can_elr_, /* can_elr */ - false, /* for_replay */ + ObTxCreateArg arg(false, /* for_replay */ tx.tenant_id_, tx.tx_id_, ls_id, diff --git a/src/storage/tx/ob_tx_api.cpp b/src/storage/tx/ob_tx_api.cpp index 5d2861c42..4549557e1 100644 --- a/src/storage/tx/ob_tx_api.cpp +++ b/src/storage/tx/ob_tx_api.cpp @@ -671,7 +671,8 @@ int ObTransService::get_ls_read_snapshot_version(const share::ObLSID &local_ls_i return ret; } -int ObTransService::get_weak_read_snapshot_version(SCN &snapshot) +int ObTransService::get_weak_read_snapshot_version(const int64_t max_read_stale_time, + SCN &snapshot) { int ret = OB_SUCCESS; bool monotinic_read = true;; @@ -685,8 +686,10 @@ int ObTransService::get_weak_read_snapshot_version(SCN &snapshot) } else if (OB_FAIL(GCTX.weak_read_service_->get_cluster_version(tenant_id_, snapshot))) { TRANS_LOG(WARN, "get weak read snapshot fail", K(ret), KPC(this)); } else { - const int64_t snapshot_barrier = ObTimeUtility::current_time() - - ObWeakReadUtil::max_stale_time_for_weak_consistency(tenant_id_); + // do nothing + } + if (OB_SUCC(ret)) { + const int64_t snapshot_barrier = ObTimeUtility::current_time() - max_read_stale_time; if (snapshot.convert_to_ts() < snapshot_barrier) { TRANS_LOG(WARN, "weak read snapshot too stale", K(snapshot), K(snapshot_barrier), "delta", (snapshot_barrier - snapshot.convert_to_ts())); diff --git a/src/storage/tx/ob_tx_api.h b/src/storage/tx/ob_tx_api.h index 4154f8efa..b037f7024 100644 --- a/src/storage/tx/ob_tx_api.h +++ b/src/storage/tx/ob_tx_api.h @@ -226,13 +226,15 @@ int get_ls_read_snapshot_version(const share::ObLSID &local_ls_id, /** * get_weak_read_snapshot_version - get snapshot version for weak read * + * max_read_stale_time the minimal threshold of stale snapshot * @snapshot_version: the snapshot acquired * * Return: * OB_SUCCESS - OK * OB_REPLICA_NOT_READABLE - snapshot is too stale */ -int get_weak_read_snapshot_version(share::SCN &snapshot_version); +int get_weak_read_snapshot_version(const int64_t max_read_stale_time, + share::SCN &snapshot_version); /* * release_snapshot - release snapshot * diff --git a/src/storage/tx/ob_tx_data_functor.cpp b/src/storage/tx/ob_tx_data_functor.cpp index a1f2af02c..c0d3843f1 100644 --- a/src/storage/tx/ob_tx_data_functor.cpp +++ b/src/storage/tx/ob_tx_data_functor.cpp @@ -148,7 +148,7 @@ int GetTxStateWithSCNFunctor::operator()(const ObTxData &tx_data, ObTxCCCtx *tx_ // return the transaction state_ according to the merge log ts. // the detailed document is available as follows. // https://yuque.antfin-inc.com/docs/share/a3160d5e-6e1a-4980-a12e-4af653c6cf57?# - if (ObTxData::RUNNING == state) { + if (ObTxData::RUNNING == state || ObTxData::ELR_COMMIT == state) { // Case 1: data is during execution, so we return the running state with // INT64_MAX as version state_ = ObTxData::RUNNING; @@ -168,11 +168,6 @@ int GetTxStateWithSCNFunctor::operator()(const ObTxData &tx_data, ObTxCCCtx *tx_ // ts, so we return the abort state with 0 as txn version state_ = ObTxData::ABORT; trans_version_ = SCN::min_scn(); - } else if (ObTxData::ELR_COMMIT == state) { - // Case 5: data is elr committed and the required state is after the merge log - // ts, it means tx's state is completely decided so it must not be elr commit - ret = OB_ERR_UNEXPECTED; - STORAGE_LOG(ERROR, "unexpected state", K(ret), K(tx_data), KPC(tx_cc_ctx)); } else { ret = OB_ERR_UNEXPECTED; STORAGE_LOG(ERROR, "unexpected transaction state_", K(ret), K(tx_data)); @@ -218,13 +213,8 @@ int LockForReadFunctor::inner_lock_for_read(const ObTxData &tx_data, ObTxCCCtx * is_determined_state_ = true; break; } + case ObTxData::RUNNING: case ObTxData::ELR_COMMIT: { - can_read_ = !tx_data.undo_status_list_.is_contain(data_sql_sequence); - trans_version_ = commit_version; - is_determined_state_ = false; - break; - } - case ObTxData::RUNNING: { // Case 2: data is during execution, so the state is not decided. if (read_latest && reader_tx_id == data_tx_id) { // Case 2.0: read the latest written of current txn @@ -272,15 +262,22 @@ int LockForReadFunctor::inner_lock_for_read(const ObTxData &tx_data, ObTxCCCtx * can_read_ = false; trans_version_ = SCN::min_scn(); } else { - // Case 2.2.3: data is in prepare state and the prepare version is - // smaller than the read txn's snapshot version, then the data's - // commit version may or may not be bigger than the read txn's - // snapshot version, so we are unsure whether we can read it and we - // need wait for the commit version of the data - ret = OB_ERR_SHARED_LOCK_CONFLICT; - if (REACH_TIME_INTERVAL(1 * 1000 * 1000)) { - TRANS_LOG(WARN, "lock_for_read need retry", K(ret), - K(tx_data), K(lock_for_read_arg_), K(tx_cc_ctx)); + // Only dml statement can read elr data + if (ObTxData::ELR_COMMIT == state + && lock_for_read_arg_.mvcc_acc_ctx_.get_tx_id().is_valid()) { + can_read_ = !tx_data.undo_status_list_.is_contain(data_sql_sequence); + trans_version_ = commit_version; + } else { + // Case 2.2.3: data is in prepare state and the prepare version is + // smaller than the read txn's snapshot version, then the data's + // commit version may or may not be bigger than the read txn's + // snapshot version, so we are unsure whether we can read it and we + // need wait for the commit version of the data + ret = OB_ERR_SHARED_LOCK_CONFLICT; + if (REACH_TIME_INTERVAL(1 * 1000 * 1000)) { + TRANS_LOG(WARN, "lock_for_read need retry", K(ret), + K(tx_data), K(lock_for_read_arg_), K(tx_cc_ctx)); + } } } } diff --git a/src/storage/tx/ob_tx_elr_util.cpp b/src/storage/tx/ob_tx_elr_util.cpp index e96f630cf..7ba9c4b95 100644 --- a/src/storage/tx/ob_tx_elr_util.cpp +++ b/src/storage/tx/ob_tx_elr_util.cpp @@ -20,15 +20,14 @@ namespace oceanbase namespace transaction { -int ObTxELRUtil::check_and_update_tx_elr_info(ObTxDesc &tx, const bool can_elr) +int ObTxELRUtil::check_and_update_tx_elr_info(ObTxDesc &tx) { int ret = OB_SUCCESS; - if (can_elr && can_tenant_elr_) { // tenant config enable elr + if (can_tenant_elr_ && OB_SYS_TENANT_ID != MTL_ID()) { // tenant config enable elr tx.set_can_elr(true); TX_STAT_ELR_ENABLE_TRANS_INC(MTL_ID()); - } else { - refresh_elr_tenant_config_(); } + refresh_elr_tenant_config_(); return ret; } diff --git a/src/storage/tx/ob_tx_elr_util.h b/src/storage/tx/ob_tx_elr_util.h index 76d193dcb..74a4fc5ed 100644 --- a/src/storage/tx/ob_tx_elr_util.h +++ b/src/storage/tx/ob_tx_elr_util.h @@ -28,7 +28,8 @@ class ObTxELRUtil public: ObTxELRUtil() : last_refresh_ts_(0), can_tenant_elr_(false) {} - int check_and_update_tx_elr_info(ObTxDesc &tx, const bool can_elr); + int check_and_update_tx_elr_info(ObTxDesc &tx); + bool is_can_tenant_elr() const { return can_tenant_elr_; } void reset() { last_refresh_ts_ = 0; diff --git a/src/storage/tx/ob_tx_replay_executor.cpp b/src/storage/tx/ob_tx_replay_executor.cpp index 53bda2ec8..faa063b64 100644 --- a/src/storage/tx/ob_tx_replay_executor.cpp +++ b/src/storage/tx/ob_tx_replay_executor.cpp @@ -207,8 +207,7 @@ int ObTxReplayExecutor::try_get_tx_ctx_(ObTxLogType type, ret = OB_SUCCESS; bool tx_ctx_existed = false; common::ObAddr scheduler; - ObTxCreateArg arg(false, /* can_elr */ - true, /* for_replay */ + ObTxCreateArg arg(true, /* for_replay */ tenant_id, tx_id, ls_id, diff --git a/src/storage/tx/wrs/ob_black_list.cpp b/src/storage/tx/wrs/ob_black_list.cpp new file mode 100644 index 000000000..dc274a1a5 --- /dev/null +++ b/src/storage/tx/wrs/ob_black_list.cpp @@ -0,0 +1,344 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#include "ob_black_list.h" +#include "share/ob_thread_mgr.h" // set_thread_name +#include "observer/ob_server_struct.h" // for GCTX +#include "deps/oblib/src/common/ob_role.h" // role +#include "src/storage/tx/wrs/ob_weak_read_util.h" // ObWeakReadUtil + +namespace oceanbase +{ +using namespace common; +using namespace storage; + +namespace transaction +{ +int ObBLService::init() +{ + int ret = OB_SUCCESS; + + if (OB_UNLIKELY(is_inited_)) { + ret = OB_INIT_TWICE; + TRANS_LOG(ERROR, "BLService init twice", K(*this)); + } else if (OB_FAIL(ls_bl_mgr_.init())) { + TRANS_LOG(ERROR, "ls_bl_mgr_ init fail", KR(ret)); + } else if (OB_FAIL(ObThreadPool::init())) { + TRANS_LOG(ERROR, "ThreadPool init fail", KR(ret)); + } else { + is_inited_ = true; + TRANS_LOG(INFO, "BLService init success", K(*this)); + } + + return ret; +} + +void ObBLService::reset() +{ + is_running_ = false; + is_inited_ = false; + ObThreadPool::stop(); + ObThreadPool::wait(); + ls_bl_mgr_.reset(); +} + +void ObBLService::destroy() +{ + is_running_ = false; + is_inited_ = false; + ObThreadPool::stop(); + ObThreadPool::wait(); + ls_bl_mgr_.destroy(); +} +int ObBLService::start() +{ + int ret = OB_SUCCESS; + + if (!is_inited_) { + ret = OB_NOT_INIT; + TRANS_LOG(ERROR, "BLService not inited", KR(ret)); + } else if (is_running_) { + ret = OB_ERR_UNEXPECTED; + TRANS_LOG(ERROR, "BLService is already running", KR(ret)); + } else if (OB_FAIL(ObThreadPool::start())) { + TRANS_LOG(ERROR, "ThreadPool start fail", KR(ret)); + } else { + is_running_ = true; + TRANS_LOG(INFO, "BLService start success"); + } + + return ret; +} + +void ObBLService::stop() +{ + ObThreadPool::stop(); + TRANS_LOG(INFO, "BLService stop"); +} + +void ObBLService::wait() +{ + TRANS_LOG(INFO, "BLService wait begin"); + ObThreadPool::wait(); + TRANS_LOG(INFO, "BLService wait end"); +} + +int ObBLService::add(const ObBLKey &bl_key) +{ + int ret = OB_SUCCESS; + + if (OB_UNLIKELY(!is_inited_)) { + ret = OB_NOT_INIT; + TRANS_LOG(ERROR, "BLService not inited", KR(ret)); + } else { + switch(bl_key.get_type()) { + case BLTYPE_LS: { + if (OB_FAIL(ls_bl_mgr_.add(bl_key))) { + TRANS_LOG(WARN, "ls_bl_mgr_ add failed", KR(ret), K(bl_key)); + } + break; + } + default: { + ret = OB_UNKNOWN_OBJ; + TRANS_LOG(ERROR, "unknown key type", K(bl_key)); + } + } + } + return ret; +} + +void ObBLService::remove(const ObBLKey &bl_key) +{ + if (OB_UNLIKELY(!is_inited_)) { + TRANS_LOG(ERROR, "BLService not inited"); + } else { + switch(bl_key.get_type()) { + case BLTYPE_LS: { + ls_bl_mgr_.remove(bl_key); + break; + } + default: { + TRANS_LOG(ERROR, "unknown key type", K(bl_key)); + } + } + } +} + +int ObBLService::check_in_black_list(const ObBLKey &bl_key, bool &in_black_list) const +{ + int ret = OB_SUCCESS; + + if (OB_UNLIKELY(!is_inited_)) { + ret = OB_NOT_INIT; + TRANS_LOG(ERROR, "BLService not inited", KR(ret)); + } else { + switch(bl_key.get_type()) { + case BLTYPE_LS: { + if (OB_FAIL(ls_bl_mgr_.check_in_black_list(bl_key, in_black_list))) { + TRANS_LOG(WARN, "ls_bl_mgr_ check failed", KR(ret), K(bl_key)); + } + break; + } + default: { + ret = OB_UNKNOWN_OBJ; + TRANS_LOG(ERROR, "unknown blacklist key type", K(bl_key)); + } + } + } + return ret; +} + +void ObBLService::run1() +{ + const int64_t thread_index = get_thread_idx(); + int64_t last_print_stat_ts = 0; + int64_t last_clean_up_ts = 0; + lib::set_thread_name("BlackListService"); + TRANS_LOG(INFO, "blacklist refresh thread start", K(thread_index)); + + if (OB_UNLIKELY(!is_inited_)) { + TRANS_LOG(ERROR, "BLService not inited"); + } else { + while (!has_set_stop()) { + int64_t begin_tstamp = ObTimeUtility::current_time(); + do_thread_task_(begin_tstamp, last_print_stat_ts, last_clean_up_ts); + int64_t end_tstamp = ObTimeUtility::current_time(); + int64_t wait_interval = BLACK_LIST_REFRESH_INTERVAL - (end_tstamp - begin_tstamp); + if (wait_interval > 0) { + thread_cond_.timedwait(wait_interval); + } + } + } + TRANS_LOG(INFO, "blacklist refresh thread end"); +} + +void ObBLService::do_thread_task_(const int64_t begin_tstamp, + int64_t &last_print_stat_ts, + int64_t &last_clean_up_ts) +{ + int ret = OB_SUCCESS; + ObSqlString sql; + ObMySQLProxy *sql_proxy = NULL; + sqlclient::ObMySQLResult *result = NULL; + + // 查询ls内部表,根据时间戳信息决定是否将ls其加入黑名单 + SMART_VAR(ObISQLClient::ReadResult, res) { + if (OB_ISNULL((sql_proxy = GCTX.sql_proxy_))) { + ret = OB_ERR_UNEXPECTED; + TRANS_LOG(WARN, "sql_proxy is null", KR(ret)); + } else if (OB_FAIL(sql.assign_fmt(BLACK_LIST_SELECT_LS_INFO_STMT))) { + TRANS_LOG(WARN, "fail to append sql", KR(ret)); + } else if (OB_FAIL(sql_proxy->read(res, OB_SYS_TENANT_ID, sql.ptr()))) { + TRANS_LOG(WARN, "fail to execute sql", KR(ret), K(sql)); + } else if (NULL == (result = res.get_result())) { + ret = OB_ERR_UNEXPECTED; + TRANS_LOG(WARN, "fail to get sql result", KR(ret), K(sql)); + } else if (OB_FAIL(do_black_list_check_(result))) { + TRANS_LOG(WARN, "fail to do black list check", KR(ret), K(sql)); + } else { + // do nothing + } + } + // 如果连续失败多次,黑名单失去了时效性,需要清空黑名单 + static int fail_cnt = 0; + if (OB_SUCCESS != ret) { + fail_cnt++; + if (fail_cnt == BLACK_LIST_MAX_FAIL_COUNT) { + ls_bl_mgr_.reset(); + TRANS_LOG(WARN, "failed too much times, reset blacklist "); + } + } else { + fail_cnt = 0; + } + // 定期清理长久未更新的对象,它们实际上可能已经不存在了 + if (begin_tstamp > last_clean_up_ts + BLACK_LIST_CLEAN_UP_INTERVAL) { + do_clean_up_(); + last_clean_up_ts = begin_tstamp; + } + // 定期打印黑名单内容 + if (ObTimeUtility::current_time() - last_print_stat_ts > BLACK_LIST_PRINT_INTERVAL) { + print_stat_(); + last_print_stat_ts = begin_tstamp; + } +} + +int ObBLService::do_black_list_check_(sqlclient::ObMySQLResult *result) +{ + int ret = OB_SUCCESS; + uint64_t max_stale_time = 0; + uint64_t curr_time_ns = static_cast(ObTimeUtility::current_time_ns()); + + while (OB_SUCC(result->next())) { + ObBLKey bl_key; + ObLsInfo ls_info; + if (OB_FAIL(get_info_from_result_(*result, bl_key, ls_info))) { + TRANS_LOG(WARN, "get_info_from_result_ fail ", KR(ret), K(result)); + } else if (LEADER == ls_info.ls_state_) { + // 该日志流是leader,不能加入黑名单 + } else if (OB_FAIL(get_tenant_max_stale_time_(bl_key.get_tenant_id(), max_stale_time))) { + TRANS_LOG(WARN, "get_tenant_max_stale_time_ fail ", KR(ret), K(bl_key)); + } else { + uint64_t max_stale_time_ns = max_stale_time * 1000; + if (curr_time_ns > ls_info.weak_read_scn_ + max_stale_time_ns) { + // 时间戳落后,将对应日志流加入黑名单 + if (OB_FAIL(ls_bl_mgr_.update(bl_key, ls_info))) { + TRANS_LOG(WARN, "ls_bl_mgr_ add fail ", K(bl_key), K(ls_info)); + } + } else if (curr_time_ns + BLACK_LIST_WHITEWASH_INTERVAL_NS < ls_info.weak_read_scn_ + max_stale_time_ns) { + // 时间戳赶上,将对应日志流从黑名单中移除 + ls_bl_mgr_.remove(bl_key); + } else { + // do nothing + } + } + } + if (OB_ITER_END == ret) { + ret = OB_SUCCESS; + } else { + TRANS_LOG(WARN, "get next result fail ", KR(ret)); + } + + return ret; +} + +int ObBLService::do_clean_up_() +{ + int ret = OB_SUCCESS; + ObBLCleanUpFunctor clean_up_functor(ls_bl_mgr_); + if (OB_FAIL(ls_bl_mgr_.for_each(clean_up_functor))) { + TRANS_LOG(WARN, "clean up blacklist fail ", KR(ret)); + } + return ret; +} + +int ObBLService::get_info_from_result_(sqlclient::ObMySQLResult &result, ObBLKey &bl_key, ObLsInfo &ls_info) +{ + int ret = OB_SUCCESS; + + ObString ip; + int64_t port = 0; + int64_t tenant_id = 0; + int64_t id = ObLSID::INVALID_LS_ID; + ObString ls_state_str; + uint64_t weak_read_scn = 0; + int64_t migrate_status_int = -1; + + (void)GET_COL_IGNORE_NULL(result.get_varchar, "svr_ip", ip); + (void)GET_COL_IGNORE_NULL(result.get_int, "svr_port", port); + (void)GET_COL_IGNORE_NULL(result.get_int, "tenant_id", tenant_id); + (void)GET_COL_IGNORE_NULL(result.get_int, "ls_id", id); + (void)GET_COL_IGNORE_NULL(result.get_varchar, "ls_state", ls_state_str); + (void)GET_COL_IGNORE_NULL(result.get_uint, "weak_read_scn", weak_read_scn); + (void)GET_COL_IGNORE_NULL(result.get_int, "migrate_status", migrate_status_int); + + ObLSID ls_id(id); + common::ObAddr server; + ObRole ls_state = INVALID_ROLE; + ObMigrateStatus migrate_status = ObMigrateStatus(migrate_status_int); + + if (false == server.set_ip_addr(ip, static_cast(port))) { + ret = OB_ERR_UNEXPECTED; + TRANS_LOG(WARN, "invalid server address", K(ip), K(port)); + } else if (OB_FAIL(string_to_role(ls_state_str, ls_state))) { + TRANS_LOG(WARN, "string_to_role fail", K(ls_state_str)); + } else if (OB_FAIL(bl_key.init(server, tenant_id, ls_id))) { + TRANS_LOG(WARN, "bl_key init fail", K(server), K(tenant_id), K(ls_id)); + } else if (OB_FAIL(ls_info.init(ls_state, weak_read_scn, migrate_status))) { + TRANS_LOG(WARN, "ls_info init fail", K(ls_state), K(weak_read_scn), K(migrate_status)); + } + + return ret; +} + +int ObBLService::get_tenant_max_stale_time_(uint64_t tenant_id, uint64_t &max_stale_time) +{ + int ret = OB_SUCCESS; + int64_t max_stale_time_int = ObWeakReadUtil::max_stale_time_for_weak_consistency(tenant_id, + ObWeakReadUtil::IGNORE_TENANT_EXIST_WARN); + if (max_stale_time_int <= 0) { + ret = OB_ERR_UNEXPECTED; + max_stale_time = 0; + } else { + max_stale_time = static_cast(max_stale_time_int); + } + return ret; +} + +void ObBLService::print_stat_() +{ + TRANS_LOG(INFO, "start to print blacklist info"); + ObBLPrintFunctor print_fn; + ls_bl_mgr_.for_each(print_fn); +} + +} // transaction +} // oceanbase diff --git a/src/storage/tx/wrs/ob_black_list.h b/src/storage/tx/wrs/ob_black_list.h new file mode 100644 index 000000000..56bcb3721 --- /dev/null +++ b/src/storage/tx/wrs/ob_black_list.h @@ -0,0 +1,410 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#ifndef OCEANBASE_TRANSACTION_OB_BLACK_LIST_ +#define OCEANBASE_TRANSACTION_OB_BLACK_LIST_ + +#include "common/ob_queue_thread.h" // ObCond +#include "common/ob_role.h" // ObRole +#include "storage/tx/ob_trans_define.h" // ObLSID, LinkHashNode +#include "storage/ob_storage_struct.h" // ObMigrateStatus + +// 定期更新黑名单的时间间隔(us) +#define BLACK_LIST_REFRESH_INTERVAL 1000000 // 1s +// 判断时间戳是否赶上/落后的缓冲时间(ns),避免阈值附近的日志流反复加入/移出黑名单 +#define BLACK_LIST_WHITEWASH_INTERVAL_NS 1000000000 // 1s +// 黑名单信息打印时间间隔(us) +#define BLACK_LIST_PRINT_INTERVAL 10000000 // 10s +// 清理超时对象的时间间隔(us),这些对象不会出现在 SQLResult 中,比如切换server之后旧server上的日志流 +#define BLACK_LIST_CLEAN_UP_INTERVAL 5000000 // 5s +// 最大连续失败次数,连续刷新黑名单失败 达到 该次数则清空黑名单 +#define BLACK_LIST_MAX_FAIL_COUNT 3 + +// 查询 __all_virtual_ls_info 的语句,设置了2s超时时间 +#define BLACK_LIST_SELECT_LS_INFO_STMT \ + "select /*+query_timeout(2000000)*/ svr_ip, svr_port, tenant_id, ls_id, ls_state, \ + weak_read_scn, migrate_status from oceanbase.__all_virtual_ls_info;" + +namespace oceanbase +{ +using namespace common; +using namespace share; +using namespace storage; + +namespace transaction +{ +// blacklist type +enum BLType +{ + BLTYPE_UNKNOWN=0, + BLTYPE_SERVER, + BLTYPE_LS, + BLTYPE_MAX +}; + +// blacklist key +class ObBLKey +{ +public: + ObBLKey() : server_(), tenant_id_(OB_INVALID_TENANT_ID), ls_id_(ObLSID::INVALID_LS_ID) {} + ~ObBLKey() { reset(); } + int init(const ObAddr &server, const uint64_t tenant_id, const ObLSID &ls_id) + { + int ret = OB_SUCCESS; + if (!server.is_valid() || OB_INVALID_TENANT_ID == tenant_id || !ls_id.is_valid()) { + ret = OB_INVALID_ARGUMENT; + } else { + server_ = server; + tenant_id_ = tenant_id; + ls_id_ = ls_id; + } + return ret; + } + void reset() + { + server_.reset(); + tenant_id_ = OB_INVALID_TENANT_ID; + ls_id_ = ObLSID::INVALID_LS_ID; + } + uint64_t hash() const { + uint64_t hash_val = 0; + int64_t server_hash = server_.hash(); + uint64_t ls_hash = ls_id_.hash(); + hash_val = murmurhash(&server_hash, sizeof(int64_t), 0); + hash_val = murmurhash(&tenant_id_, sizeof(uint64_t), hash_val); + hash_val = murmurhash(&ls_hash, sizeof(uint64_t), hash_val); + return hash_val; + } + int compare(const ObBLKey &other) const + { + int ret = 0; + uint64_t hash_1 = hash(); + uint64_t hash_2 = other.hash(); + if (hash_1 > hash_2) { + ret = 1; + } else if (hash_1 < hash_2) { + ret = -1; + } + return ret; + } + uint64_t get_tenant_id() const { return tenant_id_; } + const ObLSID &get_ls_id() const { return ls_id_; } + // TODO: different keys return different types, default return BLTYPE_UNKNOWN + BLType get_type() const { return BLType::BLTYPE_LS; } + bool is_valid() const { + return server_.is_valid() && OB_INVALID_TENANT_ID != tenant_id_ && ls_id_.is_valid(); + } + + TO_STRING_KV(K_(server), K_(tenant_id), K_(ls_id)); + +private: + ObAddr server_; + uint64_t tenant_id_; + ObLSID ls_id_; +}; + +typedef LinkHashNode ObBlackListHashNode; +typedef LinkHashValue ObBlackListHashValue; + +struct ObLsInfo +{ +public: + ObLsInfo() + : ls_state_(INVALID_ROLE), + weak_read_scn_(0), + migrate_status_(OB_MIGRATE_STATUS_MAX) + {} + int init(ObRole ls_state, uint64_t weak_read_scn, ObMigrateStatus migrate_status) + { + int ret = OB_SUCCESS; + if (OB_MIGRATE_STATUS_MAX == migrate_status) { + ret = OB_INVALID_ARGUMENT; + } else { + ls_state_ = ls_state; + weak_read_scn_ = weak_read_scn; + migrate_status_ = migrate_status; + } + return ret; + } + bool is_valid() const + { + return OB_MIGRATE_STATUS_MAX != migrate_status_; + } + TO_STRING_KV(K_(ls_state), K_(weak_read_scn), K_(migrate_status)); + + // 日志流状态(角色):LEADER、FOLLOWER,其他角色对于日志流是没有意义的 + ObRole ls_state_; + // 弱读时间戳,如果落后超过一定时间就要加入黑名单,单位ns + uint64_t weak_read_scn_; + // 迁移状态,正在迁移的日志流一定不可读 + ObMigrateStatus migrate_status_; +}; + +// blacklist value +class ObBLValue : public ObBlackListHashValue +{ +public: + ObBLValue() {} + int init(const ObBLKey &key) + { + int ret = OB_SUCCESS; + if (!key.is_valid()) { + ret = OB_INVALID_ARGUMENT; + } else { + key_ = key; + update_ts_ = ObTimeUtility::current_time(); + } + return ret; + } + int init(const ObBLKey &key, const ObLsInfo ls_info) + { + int ret = OB_SUCCESS; + if (!key.is_valid() || !ls_info.is_valid()) { + ret = OB_INVALID_ARGUMENT; + } else { + key_ = key; + ls_info_ = ls_info; + update_ts_ = ObTimeUtility::current_time(); + } + return ret; + } + int update(const ObLsInfo &ls_info) + { + int ret = OB_SUCCESS; + if (!ls_info.is_valid()) { + ret = OB_INVALID_ARGUMENT; + } else { + ls_info_ = ls_info; + update_ts_ = ObTimeUtility::current_time(); + } + return ret; + } + int64_t get_update_ts() + { + return update_ts_; + } + TO_STRING_KV(K_(key), K_(ls_info)); + +private: + ObBLKey key_; + // ls相关信息 + ObLsInfo ls_info_; + // 当前对象最后一次init/update的时间(us) + int64_t update_ts_; +}; + +class ObBlackListAlloc +{ +public: + ObBLValue *alloc_value() + { + return op_alloc(ObBLValue); + } + void free_value(ObBLValue *val) + { + if (NULL != val) { + op_free(val); + } + } + ObBlackListHashNode* alloc_node(ObBLValue *val) + { + UNUSED(val); + return op_alloc(ObBlackListHashNode); + } + void free_node(ObBlackListHashNode *node) + { + if (NULL != node) { + op_free(node); + } + } +}; + +// blacklist manager +template +class ObBLMgr +{ +public: + ObBLMgr() {} + ~ObBLMgr() { destroy(); } + int init() + { + int ret = OB_SUCCESS; + + if (OB_FAIL(map_.init())) { + TRANS_LOG(ERROR, "BLMgr init failed", KR(ret)); + } + return ret; + } + void reset() + { + map_.reset(); + } + void destroy() + { + map_.destroy(); + } + // create and insert + int add(const BLKey &bl_key) + { + int ret = OB_SUCCESS; + BLValue *value = NULL; + + if (!bl_key.is_valid()) { + ret = OB_INVALID_ARGUMENT; + TRANS_LOG(WARN, "bl_key is invalid", KR(ret), K(bl_key)); + } else if (OB_FAIL(map_.create(bl_key, value))) { + TRANS_LOG(WARN, "map create error", KR(ret), K(bl_key)); + } else { + if (OB_FAIL(value->init(bl_key))) { + TRANS_LOG(WARN, "value init error", KR(ret), K(bl_key), KPC(value)); + map_.del(bl_key); + } + map_.revert(value); + } + return ret; + } + // update or create + int update(const BLKey &bl_key, const ObLsInfo &ls_info) + { + int ret = OB_SUCCESS; + BLValue *value = NULL; + + if (!bl_key.is_valid() || !ls_info.is_valid()) { + ret = OB_INVALID_ARGUMENT; + TRANS_LOG(WARN, "bl_key is invalid", KR(ret), K(bl_key), K(ls_info)); + } else if (OB_FAIL(map_.get(bl_key, value)) && OB_ENTRY_NOT_EXIST != ret) { + TRANS_LOG(WARN, "map get error", KR(ret), K(bl_key), K(ls_info)); + } else if (OB_ENTRY_NOT_EXIST == ret) { + // key不存在,创建value并插入map + if (OB_FAIL(map_.create(bl_key, value))) { + // 可能前面get时还没有这个key,但是在create之前别的线程把这个key插入map了 + TRANS_LOG(WARN, "map create error", KR(ret), K(bl_key), K(ls_info)); + } else { + if (OB_FAIL(value->init(bl_key, ls_info))) { + TRANS_LOG(WARN, "value init error", KR(ret), KPC(value), K(bl_key), K(ls_info)); + map_.del(bl_key); + } + map_.revert(value); + } + // key已存在,直接更新value + } else if (OB_FAIL(value->update(ls_info))) { + // 只要get成功,就要减去value的引用计数 + map_.revert(value); + TRANS_LOG(WARN, "value update error", KR(ret), KPC(value), K(bl_key), K(ls_info)); + } else { + map_.revert(value); + } + return ret; + } + void remove(const BLKey &bl_key) + { + if (bl_key.is_valid()) { + map_.del(bl_key); + } + } + int check_in_black_list(const BLKey &bl_key, bool &in_black_list) const + { + int ret = OB_SUCCESS; + + if (!bl_key.is_valid()) { + ret = OB_INVALID_ARGUMENT; + } else if (OB_ENTRY_EXIST == map_.contains_key(bl_key)) { + in_black_list = true; + } else { + in_black_list = false; + } + return ret; + } + template int for_each(Function &fn) + { + return map_.for_each(fn); + } + +private: + typedef ObLinkHashMap BLHashMap; + BLHashMap map_; +}; + +typedef ObBLMgr ObLsBLMgr; + +class ObBLService : public ObThreadPool +{ +public: + ObBLService() : is_inited_(false), is_running_(false), thread_cond_() {} + ~ObBLService() { destroy(); } + int init(); + void reset(); + void destroy(); + int start(); + void stop(); + void wait(); + + int add(const ObBLKey &bl_key); + void remove(const ObBLKey &bl_key); + int check_in_black_list(const ObBLKey &bl_key, bool &in_black_list) const; + + void run1(); + TO_STRING_KV(K_(is_inited), K_(is_running)); + +public: + static ObBLService &get_instance() + { + static ObBLService instance_; + return instance_; + } + +private: + void do_thread_task_(const int64_t begin_tstamp, int64_t &last_print_stat_ts, int64_t &last_clean_up_ts); + int do_black_list_check_(sqlclient::ObMySQLResult *result); + int do_clean_up_(); + int get_info_from_result_(sqlclient::ObMySQLResult &result, ObBLKey &bl_key, ObLsInfo &ls_info); + int get_tenant_max_stale_time_(uint64_t tenant_id, uint64_t &max_stale_time); + void print_stat_(); + +private: + bool is_inited_; + bool is_running_; + common::ObCond thread_cond_; + ObLsBLMgr ls_bl_mgr_; +}; + +class ObBLPrintFunctor +{ +public: + explicit ObBLPrintFunctor() {} + bool operator()(const ObBLKey &key, ObBLValue *value) + { + TRANS_LOG(INFO, "blacklist info print ", KPC(value)); + return true; + } +}; + +template +class ObBLCleanUpFunctor +{ +public: + explicit ObBLCleanUpFunctor(BLMgr &bl_mgr) : bl_mgr_(bl_mgr) {} + bool operator()(const ObBLKey &key, ObBLValue *value) + { + if (ObTimeUtility::current_time() > value->get_update_ts() + BLACK_LIST_CLEAN_UP_INTERVAL) { + bl_mgr_.remove(key); + } + return true; + } +private: + BLMgr &bl_mgr_; +}; + +} // transaction +} // oceanbase + +#endif // end of OCEANBASE_TRANSACTION_OB_BLACK_LIST_ diff --git a/src/storage/tx_table/ob_tx_ctx_table.cpp b/src/storage/tx_table/ob_tx_ctx_table.cpp index 403dd65f3..03af7832a 100644 --- a/src/storage/tx_table/ob_tx_ctx_table.cpp +++ b/src/storage/tx_table/ob_tx_ctx_table.cpp @@ -97,8 +97,7 @@ int ObTxCtxTableRecoverHelper::recover_one_tx_ctx_(transaction::ObLSTxCtxMgr* ls transaction::ObPartTransCtx *tx_ctx = NULL; bool tx_ctx_existed = true; common::ObAddr scheduler; - transaction::ObTxCreateArg arg(false, /* can_elr */ - true, /* for_replay */ + transaction::ObTxCreateArg arg(true, /* for_replay */ MTL_ID(), ctx_info.tx_id_, ctx_info.ls_id_, diff --git a/unittest/storage/tx/CMakeLists.txt b/unittest/storage/tx/CMakeLists.txt index cd061dd53..84c92297f 100644 --- a/unittest/storage/tx/CMakeLists.txt +++ b/unittest/storage/tx/CMakeLists.txt @@ -34,6 +34,7 @@ tx_unittest(test_simple_tx_ctx) tx_unittest(test_ls_log_writer) tx_unittest(test_ob_trans_hashmap) +storage_unittest(test_ob_black_list) storage_unittest(test_ob_tx_log) storage_unittest(test_ob_timestamp_service) storage_unittest(test_ob_trans_rpc) diff --git a/unittest/storage/tx/test_ob_black_list.cpp b/unittest/storage/tx/test_ob_black_list.cpp new file mode 100644 index 000000000..05461606f --- /dev/null +++ b/unittest/storage/tx/test_ob_black_list.cpp @@ -0,0 +1,289 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#include +#include + +#define protected public +#define private public +#include "storage/tx/wrs/ob_black_list.h" + +namespace oceanbase +{ +using namespace common; +using namespace storage; +using namespace transaction; + +namespace unittest +{ +class TestObBlackList : public ::testing::Test +{ +public : + virtual void SetUp() {} + virtual void TearDown() {} +}; + +TEST_F(TestObBlackList, black_list_init_invalid) +{ + TRANS_LOG(INFO, "called", "func", test_info_->name()); + + // init + ObBLService &bl_service = ObBLService::get_instance(); + EXPECT_EQ(OB_SUCCESS, bl_service.init()); + + ObAddr addr(ObAddr::IPV4, "127.0.0.1", 2077); + ObBLKey key; + ObLSID ls_id(2); + EXPECT_EQ(OB_SUCCESS, key.init(addr, 1006, ls_id)); + + // add + EXPECT_EQ(OB_SUCCESS, bl_service.add(key)); + + // check + bool check; + EXPECT_EQ(OB_SUCCESS, bl_service.check_in_black_list(key, check)); + EXPECT_EQ(true, check); + + // update + ObLsInfo ls_info; + uint64_t curr_time = static_cast(ObTimeUtility::current_time()); + EXPECT_EQ(OB_SUCCESS, ls_info.init(FOLLOWER, curr_time, OB_MIGRATE_STATUS_NONE)); + EXPECT_EQ(OB_SUCCESS, bl_service.ls_bl_mgr_.update(key, ls_info)); + + // check + EXPECT_EQ(OB_SUCCESS, bl_service.check_in_black_list(key, check)); + EXPECT_EQ(true, check); + + // remove + bl_service.remove(key); + + // check + EXPECT_EQ(OB_SUCCESS, bl_service.check_in_black_list(key, check)); + EXPECT_EQ(false, check); + + // reset & init + bl_service.reset(); + EXPECT_EQ(OB_SUCCESS, bl_service.init()); + + // add 100 keys & check + for (int i=1; i<=100; i++) { + EXPECT_EQ(OB_SUCCESS, key.init(addr, 1006, ObLSID(i))); + EXPECT_EQ(OB_SUCCESS, bl_service.add(key)); + EXPECT_EQ(OB_SUCCESS, bl_service.check_in_black_list(key, check)); + EXPECT_EQ(true, check); + } + + // remove 100 keys & check + for (int i=1; i<=100; i++) { + EXPECT_EQ(OB_SUCCESS, key.init(addr, 1006, ObLSID(i))); + bl_service.remove(key); + EXPECT_EQ(OB_SUCCESS, bl_service.check_in_black_list(key, check)); + EXPECT_EQ(false, check); + } + + // destroy + bl_service.destroy(); +} + +TEST_F(TestObBlackList, black_list_error) +{ + TRANS_LOG(INFO, "called", "func", test_info_->name()); + + // init key + ObAddr addr(ObAddr::IPV4, "127.0.0.1", 2077); + ObBLKey key; + ObLSID ls_id(2); + EXPECT_EQ(OB_SUCCESS, key.init(addr, 1006, ls_id)); + + // error: insert before init + ObBLService &bl_service = ObBLService::get_instance(); + EXPECT_EQ(OB_NOT_INIT, bl_service.add(key)); + + // error: init twice + EXPECT_EQ(OB_SUCCESS, bl_service.init()); + EXPECT_EQ(OB_INIT_TWICE, bl_service.init()); + + // erroe: add twice + EXPECT_EQ(OB_SUCCESS, bl_service.add(key)); + EXPECT_EQ(OB_ENTRY_EXIST, bl_service.add(key)); + + // check + bool check; + EXPECT_EQ(OB_SUCCESS, bl_service.check_in_black_list(key, check)); + EXPECT_EQ(true, check); + + // remove twice + bl_service.remove(key); + bl_service.remove(key); + + // check + EXPECT_EQ(OB_SUCCESS, bl_service.check_in_black_list(key, check)); + EXPECT_EQ(false, check); + + // destroy twice + bl_service.destroy(); + bl_service.destroy(); + + // error: access after destroy + EXPECT_EQ(OB_NOT_INIT, bl_service.add(key)); + bl_service.remove(key); + EXPECT_EQ(OB_NOT_INIT, bl_service.check_in_black_list(key, check)); +} + +TEST_F(TestObBlackList, black_list_inc_func) +{ + TRANS_LOG(INFO, "called", "func", test_info_->name()); + + // init + ObBLService &bl_service = ObBLService::get_instance(); + EXPECT_EQ(OB_SUCCESS, bl_service.init()); + + ObAddr addr(ObAddr::IPV4, "127.0.0.1", 2077); + ObBLKey key; + bool check = false; + + // add 100 keys & check + for (int i=1; i<=100; i++) { + EXPECT_EQ(OB_SUCCESS, key.init(addr, 1006, ObLSID(i))); + EXPECT_EQ(OB_SUCCESS, bl_service.add(key)); + EXPECT_EQ(OB_SUCCESS, bl_service.check_in_black_list(key, check)); + EXPECT_EQ(true, check); + } + + // print + bl_service.print_stat_(); + EXPECT_EQ(100, bl_service.ls_bl_mgr_.map_.size()); + + // clean up + bl_service.do_clean_up_(); + EXPECT_EQ(100, bl_service.ls_bl_mgr_.map_.size()); + + // sleep 5s + int64_t clean_ts = ObTimeUtility::current_time(); + usleep(BLACK_LIST_CLEAN_UP_INTERVAL); + + // do_thread_task_ + int64_t curr_ts = ObTimeUtility::current_time(); + int64_t print_ts = curr_ts - 1; + bl_service.do_thread_task_(curr_ts, print_ts, clean_ts); + EXPECT_EQ(curr_ts, clean_ts); + EXPECT_EQ(0, bl_service.ls_bl_mgr_.map_.size()); + + // destroy + bl_service.destroy(); +} + +TEST_F(TestObBlackList, black_list_parallel_1) +{ + TRANS_LOG(INFO, "called", "func", test_info_->name()); + + // init + ObAddr addr(ObAddr::IPV4, "127.0.0.1", 2077); + ObBLService &bl_service = ObBLService::get_instance(); + EXPECT_EQ(OB_SUCCESS, bl_service.init()); + + // parallel test + std::vector ths; + uint64_t loop_cnt = 10000; + uint64_t worker_cnt = 10; + + auto do_worker = [&] (uint64_t id) { + ObBLKey key; + uint64_t base_id = id * loop_cnt; + // pre worker id, the first worker's pre worker is the last worker + uint64_t pre_id = ((id + worker_cnt - 1) % worker_cnt) * loop_cnt; + for (int i = 1; i <= loop_cnt; i++) { + ObLSID ls_id(base_id + i); + EXPECT_EQ(OB_SUCCESS, key.init(addr, 1006, ls_id)); + EXPECT_EQ(OB_SUCCESS, bl_service.add(key)); + ObLSID ls_id_2(pre_id + i); + EXPECT_EQ(OB_SUCCESS, key.init(addr, 1006, ls_id_2)); + bl_service.remove(key); + } + }; + for (int i = 0; i < worker_cnt; i++) { + std::thread th(do_worker, i); + ths.push_back(std::move(th)); + } + for (auto &th : ths) { + th.join(); + } + + // destroy + bl_service.destroy(); +} + +TEST_F(TestObBlackList, black_list_parallel_2) +{ + TRANS_LOG(INFO, "called", "func", test_info_->name()); + + // init + ObAddr addr(ObAddr::IPV4, "127.0.0.1", 2077); + ObBLService &bl_service = ObBLService::get_instance(); + EXPECT_EQ(OB_SUCCESS, bl_service.init()); + + // parallel test + std::vector ths; + uint64_t loop_cnt = 10000; + uint64_t worker_cnt = 10; + bool check = false; + + auto do_worker = [&] (uint64_t id) { + int rand = 0; + ObBLKey key; + ObLsInfo ls_info; + EXPECT_EQ(OB_SUCCESS, ls_info.init(FOLLOWER, 1, OB_MIGRATE_STATUS_NONE)); + std::srand((unsigned)std::time(NULL)); + for (int i = 1; i <= loop_cnt; i++) { + rand = std::rand() % 1000 + 1; + ObLSID ls_id(std::rand() % loop_cnt + 1); + EXPECT_EQ(OB_SUCCESS, key.init(addr, 1006, ls_id)); + + if (rand <= 300) { + bl_service.add(key); + } else if (rand <= 600) { + bl_service.ls_bl_mgr_.update(key, ls_info); + } else if (rand <= 800) { + bl_service.remove(key); + } else { + EXPECT_EQ(OB_SUCCESS, bl_service.check_in_black_list(key, check)); + } + } + }; + for (int i = 0; i < worker_cnt; i++) { + std::thread th(do_worker, i); + ths.push_back(std::move(th)); + } + for (auto &th : ths) { + th.join(); + } + + // destroy + bl_service.destroy(); +} + +} // unittest +} // oceanbase + +using namespace oceanbase; +using namespace oceanbase::common; + +int main(int argc, char **argv) +{ + int ret = 1; + ObLogger &logger = ObLogger::get_logger(); + logger.set_file_name("test_ob_black_list.log", true); + logger.set_log_level(OB_LOG_LEVEL_INFO); + testing::InitGoogleTest(&argc, argv); + ret = RUN_ALL_TESTS(); + return ret; +}