[FEAT MERGE] ELR & WeakRead feature enhancement

This commit is contained in:
obdev 2022-12-27 05:08:57 +00:00 committed by ob-robot
parent 9f3038df4f
commit 5aed6bd3d8
61 changed files with 1664 additions and 142 deletions

View File

@ -407,6 +407,7 @@ inline int Ob20ProtocolProcessor::process_ob20_packet(ObProto20PktContext& conte
// If a request is divided into multiple MySQL packages, each MySQL package will also set the re-routing flag
ObMySQLRawPacket *input_packet = reinterpret_cast<ObMySQLRawPacket *>(ipacket);
input_packet->set_can_reroute_pkt(pkt20->get_flags().is_proxy_reroute());
input_packet->set_is_weak_read(pkt20->get_flags().is_weak_read());
input_packet->set_extra_info(pkt20->get_extra_info());
context.reset();
// set again for sending response

View File

@ -30,6 +30,7 @@ union Ob20ProtocolFlags
bool is_last_packet() const { return 1 == st_flags_.OB_IS_LAST_PACKET; }
bool is_proxy_reroute() const { return 1 == st_flags_.OB_IS_PROXY_REROUTE; }
bool is_new_extra_info() const { return 1 == st_flags_.OB_IS_NEW_EXTRA_INFO; }
bool is_weak_read() const { return 1 == st_flags_.OB_IS_WEAK_READ; }
uint32_t flags_;
struct Protocol20Flags
@ -38,7 +39,8 @@ union Ob20ProtocolFlags
uint32_t OB_IS_LAST_PACKET: 1;
uint32_t OB_IS_PROXY_REROUTE: 1;
uint32_t OB_IS_NEW_EXTRA_INFO: 1;
uint32_t OB_FLAG_RESERVED_NOT_USE: 28;
uint32_t OB_IS_WEAK_READ: 1;
uint32_t OB_FLAG_RESERVED_NOT_USE: 27;
} st_flags_;
};

View File

@ -459,12 +459,14 @@ class ObMySQLRawPacket
public:
ObMySQLRawPacket() : ObMySQLPacket(), cmd_(COM_MAX_NUM),
can_reroute_pkt_(false),
is_weak_read_(false),
extra_info_()
{}
explicit ObMySQLRawPacket(obmysql::ObMySQLCmd cmd)
: ObMySQLPacket(), cmd_(cmd),
can_reroute_pkt_(false),
is_weak_read_(false),
extra_info_()
{}
@ -479,6 +481,9 @@ public:
inline void set_can_reroute_pkt(const bool can_rerute);
inline bool can_reroute_pkt() const;
inline void set_is_weak_read(const bool v) { is_weak_read_ = v; }
inline bool is_weak_read() const { return is_weak_read_; }
inline void set_extra_info(const Ob20ExtraInfo &extra_info) { extra_info_ = extra_info; }
inline const Ob20ExtraInfo &get_extra_info() const { return extra_info_; }
bool exist_trace_info() const { return extra_info_.exist_trace_info_; }
@ -489,6 +494,7 @@ public:
ObMySQLPacket::reset();
cmd_ = COM_MAX_NUM;
can_reroute_pkt_ = false;
is_weak_read_ = false;
extra_info_.reset();
}
@ -497,10 +503,11 @@ public:
ObMySQLPacket::assign(other);
cmd_ = other.cmd_;
can_reroute_pkt_ = other.can_reroute_pkt_;
is_weak_read_ = other.is_weak_read_;
extra_info_ = other.extra_info_;
}
TO_STRING_KV("header", hdr_, "can_reroute", can_reroute_pkt_);
TO_STRING_KV("header", hdr_, "can_reroute", can_reroute_pkt_, "weak_read", is_weak_read_);
protected:
virtual int serialize(char*, const int64_t, int64_t&) const;
@ -509,6 +516,7 @@ private:
private:
ObMySQLCmd cmd_;
bool can_reroute_pkt_;
bool is_weak_read_;
Ob20ExtraInfo extra_info_;
};

View File

@ -349,7 +349,8 @@ int ObMPBase::init_process_var(sql::ObSqlCtx &ctx,
// 并且还没开启事务时,这条sql才能二次路由
ctx.can_reroute_sql_ = (pkt.can_reroute_pkt() && get_conn()->is_support_proxy_reroute());
}
LOG_TRACE("recorded sql reroute flag", K(ctx.can_reroute_sql_));
ctx.is_protocol_weak_read_ = pkt.is_weak_read();
LOG_TRACE("protocol flag info", K(ctx.can_reroute_sql_), K(ctx.is_protocol_weak_read_));
}
return ret;
}

View File

@ -140,6 +140,7 @@ ObServer::ObServer()
ob_service_(gctx_),
multi_tenant_(), vt_data_service_(root_service_, self_addr_, &config_),
weak_read_service_(),
bl_service_(ObBLService::get_instance()),
table_service_(),
cgroup_ctrl_(),
start_time_(ObTimeUtility::current_time()),
@ -288,6 +289,8 @@ int ObServer::init(const ObServerOptions &opts, const ObPLogWriterCfg &log_cfg)
LOG_ERROR("init ts mgr failed", KR(ret));
} else if (OB_FAIL(weak_read_service_.init(net_frame_.get_req_transport()))) {
LOG_ERROR("init weak_read_service failed", KR(ret));
} else if (OB_FAIL(bl_service_.init())) {
LOG_ERROR("init bl_service_ failed", KR(ret));
} else if (OB_FAIL(ObDeviceManager::get_instance().init_devices_env())) {
LOG_ERROR("init device manager failed", KR(ret));
} else if (OB_FAIL(ObTenantMutilAllocatorMgr::get_instance().init())) {
@ -488,6 +491,10 @@ void ObServer::destroy()
weak_read_service_.destroy();
FLOG_INFO("weak read service destroyed");
FLOG_INFO("begin to destroy blacklist service");
bl_service_.destroy();
FLOG_INFO("blacklist service destroyed");
FLOG_INFO("begin to destroy net frame");
net_frame_.destroy();
FLOG_INFO("net frame destroyed");
@ -637,6 +644,12 @@ int ObServer::start()
FLOG_INFO("success to start weak read service");
}
if (FAILEDx(bl_service_.start())) {
LOG_ERROR("fail to start blacklist service", KR(ret));
} else {
FLOG_INFO("success to start blacklist service");
}
// do not wait clog replay over, avoid blocking other module
if (FAILEDx(root_service_monitor_.start())) {
LOG_ERROR("fail to start root service monitor", KR(ret));
@ -971,6 +984,10 @@ int ObServer::stop()
weak_read_service_.stop();
FLOG_INFO("weak read service stopped");
FLOG_INFO("begin to stop blacklist service");
bl_service_.stop();
FLOG_INFO("blacklist service stopped");
FLOG_INFO("begin to stop ts mgr");
OB_TS_MGR.stop();
FLOG_INFO("ts mgr stopped");
@ -1209,6 +1226,10 @@ int ObServer::wait()
weak_read_service_.wait();
FLOG_INFO("wait weak read service success");
FLOG_INFO("begin to wait blacklist service");
bl_service_.wait();
FLOG_INFO("wait blacklist service success");
FLOG_INFO("begin to wait server checkpoint slog handler");
ObServerCheckpointSlogHandler::get_instance().wait();
FLOG_INFO("wait server checkpoint slog handler success");

View File

@ -35,6 +35,7 @@
#include "pl/ob_pl.h"
#include "storage/tx/wrs/ob_weak_read_service.h" // ObWeakReadService
#include "storage/tx/wrs/ob_black_list.h" // ObBLService
#include "storage/ob_long_ops_monitor.h"
#include "storage/ob_partition_component_factory.h"
@ -370,6 +371,8 @@ private:
// Weakly Consistent Read Service
transaction::ObWeakReadService weak_read_service_;
// blacklist service
transaction::ObBLService &bl_service_;
// table service
ObTableService table_service_;

View File

@ -27,6 +27,7 @@
#include "observer/mysql/ob_mysql_request_manager.h"
#include "share/ob_define.h"
#include "storage/tx/ob_trans_service.h"
#include "storage/tx/wrs/ob_weak_read_util.h"
using namespace oceanbase::observer;
using namespace oceanbase::common;
@ -412,7 +413,9 @@ int ObTableApiProcessorBase::setup_tx_snapshot_(transaction::ObTxDesc &trans_des
}
} else {
SCN weak_read_snapshot;
if (OB_FAIL(txs->get_weak_read_snapshot_version(weak_read_snapshot))) {
if (OB_FAIL(txs->get_weak_read_snapshot_version(
transaction::ObWeakReadUtil::max_stale_time_for_weak_consistency(MTL_ID()),
weak_read_snapshot))) {
LOG_WARN("fail to get weak read snapshot", K(ret));
} else {
tx_snapshot_.init_weak_read(weak_read_snapshot);

View File

@ -653,7 +653,7 @@ DEF_STR_WITH_CHECKER(_rpc_checksum, OB_CLUSTER_PARAMETER, "Force",
DEF_TIME(_ob_trans_rpc_timeout, OB_CLUSTER_PARAMETER, "3s", "[0s, 3600s]",
"transaction rpc timeout(s). Range: [0s, 3600s]",
ObParameterAttr(Section::TRANS, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
DEF_BOOL(enable_early_lock_release, OB_TENANT_PARAMETER, "False",
DEF_BOOL(enable_early_lock_release, OB_TENANT_PARAMETER, "True",
"enable early lock release",
ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
@ -1317,4 +1317,4 @@ DEF_BOOL(enable_cgroup, OB_CLUSTER_PARAMETER, "True",
ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::STATIC_EFFECTIVE));
DEF_TIME(_ob_plan_cache_auto_flush_interval, OB_CLUSTER_PARAMETER, "0s", "[0s,)",
"time interval for auto periodic flush plan cache. Range: [0s, +∞)",
ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));

View File

@ -5153,6 +5153,8 @@ int ObSchemaServiceSQLImpl::sort_table_partition_info(
LOG_WARN("fail to mock partition array", KR(ret), K(table_schema));
} else if (OB_FAIL(sort_subpartition_array(table_schema))) {
LOG_WARN("failed to sort subpartition array", KR(ret), K(table_schema));
} else if (OB_FAIL(table_schema.build_list_idx_hash_array())) {
LOG_WARN("fail to build list idx hash array", KR(ret));
}
}
LOG_TRACE("fetch partition info", KR(ret), K(table_schema));

View File

@ -2920,6 +2920,35 @@ OB_DEF_SERIALIZE_SIZE(ObPrimaryZone)
return len;
}
// Builds a lookup key over `row`.
// Only the address of `row` is kept (the row is not copied), and operator==
// dereferences that address, so `row` has to stay alive while the key is used.
// The hash is computed once here: each cell's hash is folded into hash_val_
// through murmurhash, using the running value as the seed.
ObNewRowKey::ObNewRowKey(const common::ObNewRow &row)
  : row_(&row),
    hash_val_(0)
{
  const int64_t cell_cnt = row.get_count();
  for (int64_t idx = 0; idx < cell_cnt; ++idx) {
    uint64_t cell_hash = row.get_cell(idx).hash();
    hash_val_ = common::murmurhash(&cell_hash, sizeof(cell_hash), hash_val_);
  }
}
// Returns the hash value stored in hash_val_ (computed in the constructor);
// no hashing work is done here.
uint64_t ObNewRowKey::hash() const
{
return hash_val_;
}
// Two keys are equal when both reference a row and the rows compare equal,
// or when neither references a row. A key with a row never equals one without.
bool ObNewRowKey::operator==(const ObNewRowKey &other) const
{
  bool is_equal = false;
  if (OB_NOT_NULL(row_) && OB_NOT_NULL(other.row_)) {
    // both sides carry a row: delegate to the row comparison
    is_equal = (*row_ == *other.row_);
  } else {
    // at least one side is empty: equal only if both are empty
    is_equal = (OB_ISNULL(row_) && OB_ISNULL(other.row_));
  }
  return is_equal;
}
/*-------------------------------------------------------------------------------------------------
* ------------------------------ObPartitionSchema-------------------------------------------
----------------------------------------------------------------------------------------------------*/
@ -2977,6 +3006,7 @@ void ObPartitionSchema::reuse_partition_schema()
partition_array_ = NULL;
partition_array_capacity_ = 0;
partition_num_ = 0;
list_idx_hash_array_ = NULL;
def_subpartition_array_ = NULL;
def_subpartition_num_ = 0;
def_subpartition_array_capacity_ = 0;
@ -3079,12 +3109,114 @@ int ObPartitionSchema::assign_partition_schema(const ObPartitionSchema &src_sche
LOG_WARN("fail to set transition point", K(ret));
} else if (OB_FAIL(set_interval_range(src_schema.get_interval_range()))) {
LOG_WARN("fail to set interval range", K(ret));
} else if (OB_NOT_NULL(src_schema.list_idx_hash_array_)
&& OB_FAIL(build_list_idx_hash_array())) {
LOG_WARN("fail to build list idx hash_array", KR(ret));
}
}
return ret;
}
// Builds list_idx_hash_array_, which maps every list value row of every
// partition in the part array to that partition's index in the array.
//
// Does nothing and returns OB_SUCCESS unless the part level is
// PARTITION_LEVEL_ONE or PARTITION_LEVEL_TWO and is_list_part() holds.
//
// Returns:
//   OB_ERR_UNEXPECTED          - part array is NULL/empty, the hash array is
//                                already built, a partition is NULL, or a
//                                partition has no list values
//   OB_ALLOCATE_MEMORY_FAILED  - alloc() returned NULL
//   otherwise the error code of get_list_part_value_cnt_() / set_refactored()
//
// NOTE(review): when a failure happens after list_idx_hash_array_ has been
// assigned, the member is left non-NULL and possibly partially filled; nothing
// in this function resets it. Confirm callers discard the schema on failure.
int ObPartitionSchema::build_list_idx_hash_array()
{
int ret = OB_SUCCESS;
ObPartitionLevel part_level = get_part_level();
if ((PARTITION_LEVEL_ONE == part_level
|| PARTITION_LEVEL_TWO == part_level)
&& is_list_part()) {
ObPartition **part_array = get_part_array();
const int64_t partition_num = get_partition_num();
int64_t list_value_cnt = 0;
// Building twice is rejected: list_idx_hash_array_ must still be NULL here.
if (OB_ISNULL(part_array)
|| partition_num <= 0
|| OB_NOT_NULL(list_idx_hash_array_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("invalid arg", KR(ret), KP(part_array), K(partition_num), KP(list_idx_hash_array_));
} else if (OB_FAIL(get_list_part_value_cnt_(list_value_cnt))) {
LOG_WARN("fail to get list value cnt", KR(ret), KPC(this));
} else if (list_value_cnt <= 0) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("invalid list_value_cnt", KR(ret), K(list_value_cnt));
} else {
// The hash array is sized from the total number of list values and is
// constructed in place inside a buffer obtained from alloc().
int64_t mem_size = get_list_idx_hash_array_mem_size_(list_value_cnt);
char *buf = NULL;
if (OB_ISNULL(buf = static_cast<char*>(alloc(mem_size)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("alloc mem failed", KR(ret), K(mem_size), K(list_value_cnt));
} else if (OB_ISNULL(list_idx_hash_array_ = new (buf) ListIdxHashArray(mem_size))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("fail to new ListIdxHashArray", KR(ret), K(mem_size), K(list_value_cnt));
} else {
for (int64_t i = 0; OB_SUCC(ret) && i < partition_num; i++) {
ObPartition *part = part_array[i];
if (OB_ISNULL(part)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("part is null", KR(ret), K(i));
} else if (part->get_list_row_values().count() <= 0) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("list value is empty", KR(ret), KPC(part));
}
// OB_SUCC(ret) is tested first in the loop condition, so `part` is
// presumably not dereferenced when one of the checks above has failed.
for (int64_t j = 0; OB_SUCC(ret) && j < part->get_list_row_values().count(); j++) {
// `row` refers to the element stored in the partition; the ObNewRowValue
// built from it keeps a key over that same row object and records the
// partition index i.
const ObNewRow &row = part->get_list_row_values().at(j);
void *tmp_buf = NULL;
ObNewRowValue *value = NULL;
if (OB_ISNULL(tmp_buf = alloc(sizeof(ObNewRowValue)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("alloc mem failed", KR(ret));
} else if (OB_ISNULL(value = new (tmp_buf) ObNewRowValue(row, i))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("fail to new ObNewRowValue", KR(ret));
} else if (OB_FAIL(list_idx_hash_array_->set_refactored(value->get_key(), value))) {
LOG_WARN("fail to set refactored", KR(ret), K(row));
}
} // end for
} // end for
}
}
}
return ret;
}
// Size in bytes of the buffer used to construct a ListIdxHashArray holding
// `cnt` items: one pointer-sized slot for each of
// max(ListIdxHashArray::MIN_HASH_ARRAY_ITEM_COUNT, 2 * cnt) entries, plus
// sizeof(ListIdxHashArray) itself.
// NOTE(review): the size is computed in 64-bit arithmetic but returned as int,
// so the value is narrowed on return; confirm cnt is always small enough.
int ObPartitionSchema::get_list_idx_hash_array_mem_size_(const int64_t cnt) const
{
  const int64_t slot_cnt = common::max(ListIdxHashArray::MIN_HASH_ARRAY_ITEM_COUNT, cnt * 2);
  return slot_cnt * sizeof(void*) + sizeof(ListIdxHashArray);
}
int ObPartitionSchema::get_list_part_value_cnt_(int64_t &count)
{
int ret = OB_SUCCESS;
ObPartitionLevel part_level = get_part_level();
count = 0;
if ((PARTITION_LEVEL_ONE == part_level
|| PARTITION_LEVEL_TWO == part_level)
&& is_list_part()) {
ObPartition **part_array = get_part_array();
const int64_t partition_num = get_partition_num();
if (OB_ISNULL(part_array)
|| partition_num <= 0) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("invalid arg", KR(ret), KP(part_array), K(partition_num));
} else {
for (int64_t i = 0; OB_SUCC(ret) && i < partition_num; i++) {
ObPartition *part = part_array[i];
if (OB_ISNULL(part)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("part is null", KR(ret), K(i));
} else if (part->get_list_row_values().count() <= 0) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("list value is empty", KR(ret), KPC(part));
} else {
count += part->get_list_row_values().count();
}
} // end for
}
}
return ret;
}
int ObPartitionSchema::try_assign_part_array(
const share::schema::ObPartitionSchema &that)
{
@ -3102,6 +3234,10 @@ int ObPartitionSchema::try_assign_part_array(
partition_array_ = nullptr;
partition_num_ = 0;
}
if (NULL != list_idx_hash_array_) {
free(list_idx_hash_array_);
list_idx_hash_array_ = NULL;
}
part_level_ = that.get_part_level();
part_option_ = that.get_part_option();
if (OB_FAIL(part_option_.get_err_ret())) {
@ -4528,6 +4664,11 @@ int64_t ObTablegroupSchema::get_convert_size() const
hidden_partition_array_, hidden_partition_num_);
convert_size += transition_point_.get_deep_copy_size();
convert_size += interval_range_.get_deep_copy_size();
if (OB_NOT_NULL(list_idx_hash_array_)) {
int64_t cnt = list_idx_hash_array_->item_count();
convert_size += get_list_idx_hash_array_mem_size_(cnt);
convert_size += cnt * sizeof(ObNewRowValue);
}
return convert_size;
}
@ -6316,8 +6457,10 @@ int ObPartitionUtils::get_tablet_and_part_id(
KR(ret), K(range), K(table_id));
}
} else if (table_schema.is_list_part()) {
const ListIdxHashArray *list_idx_hash_array = table_schema.get_list_idx_hash_array();
if (OB_FAIL(ObPartitionUtils::get_list_tablet_and_part_id_(
range, part_array, part_num, partition_indexes))) {
range, part_array, part_num, list_idx_hash_array,
partition_indexes))) {
LOG_WARN("fail to fill list tablet_id and part_id",
KR(ret), K(range), K(table_id));
}
@ -6376,8 +6519,9 @@ int ObPartitionUtils::get_tablet_and_part_id(
KR(ret), K(row), K(table_id));
}
} else if (table_schema.is_list_part()) {
const ListIdxHashArray *list_idx_hash_array = table_schema.get_list_idx_hash_array();
if (OB_FAIL(ObPartitionUtils::get_list_tablet_and_part_id_(
row, part_array, part_num, partition_indexes))) {
row, part_array, part_num, list_idx_hash_array, partition_indexes))) {
LOG_WARN("fail to fill list tablet_id and part_id",
KR(ret), K(row), K(table_id));
}
@ -6797,6 +6941,7 @@ int ObPartitionUtils::get_list_tablet_and_part_id_(
const common::ObNewRange &range,
ObPartition * const* partition_array,
const int64_t partition_num,
const ListIdxHashArray *list_idx_hash_array,
common::ObIArray<PartitionIndex> &indexes)
{
int ret = OB_SUCCESS;
@ -6817,7 +6962,7 @@ int ObPartitionUtils::get_list_tablet_and_part_id_(
row.cells_ = const_cast<ObObj*>(range.start_key_.get_obj_ptr());
row.count_ = range.start_key_.get_obj_cnt();
if (OB_FAIL(get_list_tablet_and_part_id_(
row, partition_array, partition_num, indexes))) {
row, partition_array, partition_num, list_idx_hash_array, indexes))) {
LOG_WARN("fail to get list tablet and part_id", KR(ret), K(row));
}
}
@ -6904,6 +7049,7 @@ int ObPartitionUtils::get_list_tablet_and_part_id_(
const common::ObNewRow &row,
ObPartition * const* partition_array,
const int64_t partition_num,
const ListIdxHashArray *list_idx_hash_array,
common::ObIArray<PartitionIndex> &indexes)
{
int ret = OB_SUCCESS;
@ -6916,21 +7062,65 @@ int ObPartitionUtils::get_list_tablet_and_part_id_(
} else {
int64_t part_idx = OB_INVALID_INDEX;
int64_t default_value_idx = OB_INVALID_INDEX;
for (int64_t i = 0; OB_SUCC(ret) && OB_INVALID_INDEX == part_idx && i < partition_num; i++) {
const ObIArray<common::ObNewRow> &list_row_values = partition_array[i]->get_list_row_values();
for (int64_t j = 0; OB_SUCC(ret) && OB_INVALID_INDEX == part_idx && j < list_row_values.count(); j++) {
const ObNewRow &list_row = list_row_values.at(j);
if (row == list_row) {
part_idx = i;
if (OB_NOT_NULL(list_idx_hash_array)) {
// new logic, use hash array
ObNewRowKey key(row);
const ObNewRowValue *value = NULL;
int hash_ret = list_idx_hash_array->get_refactored(key, value);
if (OB_SUCCESS == hash_ret) {
if (OB_ISNULL(value)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("value should not be null", KR(ret), K(row));
} else {
part_idx = value->get_part_idx();
LOG_TRACE("hit list idx hash array", K(row), K(part_idx));
}
} // end for
if (list_row_values.count() == 1
&& list_row_values.at(0).get_count() >= 1
&& list_row_values.at(0).get_cell(0).is_max_value()) {
// calc default value position
default_value_idx = i;
} else if (OB_HASH_NOT_EXIST == hash_ret) {
// try use default value
static int64_t ROW_KEY_CNT = 1;
ObObj obj_array[ROW_KEY_CNT];
obj_array[0].set_max_value();
ObNewRow default_row(obj_array, 1);
ObNewRowKey default_key(default_row);
hash_ret = list_idx_hash_array->get_refactored(default_key, value);
if (OB_SUCCESS == hash_ret) {
if (OB_ISNULL(value)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("value should not be null", KR(ret), K(default_row));
} else {
default_value_idx = value->get_part_idx();
LOG_TRACE("not hit, but hit default value in list idx hash array", K(row), K(default_value_idx));
}
} else if (OB_HASH_NOT_EXIST == hash_ret) {
// do nothing
LOG_TRACE("not hit any values in list idx hash array", K(row));
} else {
ret = hash_ret;
LOG_WARN("fail to get value from hash array", KR(ret), K(default_row));
}
} else {
ret = hash_ret;
LOG_WARN("fail to get value from hash array", KR(ret), K(row));
}
} // end dor
} else {
// old logic, iterate all partition
for (int64_t i = 0; OB_SUCC(ret) && OB_INVALID_INDEX == part_idx && i < partition_num; i++) {
const ObIArray<common::ObNewRow> &list_row_values = partition_array[i]->get_list_row_values();
for (int64_t j = 0; OB_SUCC(ret) && OB_INVALID_INDEX == part_idx && j < list_row_values.count(); j++) {
const ObNewRow &list_row = list_row_values.at(j);
if (row == list_row) {
part_idx = i;
}
} // end for
if (list_row_values.count() == 1
&& list_row_values.at(0).get_count() >= 1
&& list_row_values.at(0).get_cell(0).is_max_value()) {
// calc default value position
default_value_idx = i;
}
} // end dor
}
if (OB_SUCC(ret)) {
const ObPartition *partition = NULL;

View File

@ -28,6 +28,7 @@
#include "share/ob_priv_common.h"
#include "lib/worker.h"
#include "objit/common/ob_item_type.h"
#include "lib/hash/ob_pointer_hashmap.h"
#ifdef __cplusplus
extern "C" {
@ -2093,6 +2094,54 @@ private:
int64_t subpart_idx_;
};
// Hash key wrapping a common::ObNewRow.
// The key holds a pointer to the row (it does not copy or free it) together
// with a hash value computed once in the constructor.
class ObNewRowKey
{
public:
// Empty key: no row, hash value 0.
ObNewRowKey() : row_(NULL), hash_val_(0) {}
// Keeps the address of `row` and precomputes its hash (defined out of line).
// Not declared explicit, so an ObNewRow converts implicitly to a key.
ObNewRowKey(const common::ObNewRow &row);
~ObNewRowKey() {};
// Returns the precomputed hash value.
uint64_t hash() const;
// Equal when both keys reference rows that compare equal, or both are empty.
bool operator==(const ObNewRowKey &other) const;
private:
// Referenced row; not owned by the key (the destructor is empty).
const common::ObNewRow *row_;
// Hash of the row's cells, computed in the constructor.
uint64_t hash_val_;
};
// Value stored in ListIdxHashArray: a row key plus the partition index
// (part_idx) recorded for that row.
class ObNewRowValue
{
public:
// Empty value: empty key and OB_INVALID_INDEX as the partition index.
ObNewRowValue() : key_(), part_idx_(common::OB_INVALID_INDEX) {}
// Builds the key over `row` and records `part_idx` for it.
ObNewRowValue(const common::ObNewRow &row, const int64_t part_idx)
: key_(row), part_idx_(part_idx) {}
~ObNewRowValue() {};
// Returns a copy of the key (the copy references the same row).
ObNewRowKey get_key() const { return key_; }
int64_t get_part_idx() const { return part_idx_; }
private:
ObNewRowKey key_;
int64_t part_idx_;
};
// Primary template of the key-extraction functor used by ListIdxHashArray.
// Its operator() ignores both arguments and returns nothing; the usable
// behavior comes from the <ObNewRowKey, const ObNewRowValue *> specialization.
template<class K, class V>
struct ObGetNewRowKey
{
void operator()(const K &k, const V &v) const
{
UNUSED(k);
UNUSED(v);
}
};
// Specialization that extracts the key from a stored value pointer.
// `value` is dereferenced without a NULL check, so it must not be NULL.
template<>
struct ObGetNewRowKey<ObNewRowKey, const ObNewRowValue *>
{
ObNewRowKey operator()(const ObNewRowValue *value) const
{
return value->get_key();
}
};
// Pointer hash array keyed by ObNewRowKey that stores const ObNewRowValue *
// entries; ObGetNewRowKey is the functor that obtains the key from an entry.
typedef common::hash::ObPointerHashArray<ObNewRowKey, const ObNewRowValue *, ObGetNewRowKey> ListIdxHashArray;
class ObPartitionSchema : public ObSchema
{
OB_UNIS_VERSION(1);
@ -2304,6 +2353,7 @@ public:
partition_array_capacity_ = 0;
partition_num_ = 0;
partition_array_ = NULL;
list_idx_hash_array_ = NULL;
}
int64_t get_hidden_partition_num() const { return hidden_partition_num_; }
@ -2356,6 +2406,9 @@ public:
// only used for generate part_name
int get_max_part_id(int64_t &part_id) const;
int get_max_part_idx(int64_t &part_idx) const;
const ListIdxHashArray *get_list_idx_hash_array() const { return list_idx_hash_array_; }
int build_list_idx_hash_array();
protected:
int inner_add_partition(const ObPartition &part);
template<class T>
@ -2385,6 +2438,8 @@ protected:
{
return (sub_part_template_flags_ & flag) > 0;
}
int get_list_idx_hash_array_mem_size_(const int64_t cnt) const;
int get_list_part_value_cnt_(int64_t &count);
protected:
static const int64_t DEFAULT_ARRAY_CAPACITY = 128;
protected:
@ -2424,6 +2479,10 @@ protected:
int64_t hidden_partition_num_;
common::ObRowkey transition_point_;
common::ObRowkey interval_range_;
// Records the ObNewRow -> part_idx mapping when the table's first-level partitioning is list-like.
// Not serialized or deserialized.
ListIdxHashArray *list_idx_hash_array_;
};
/*TODO: Delete the following interfaces in ObTablegroupSchema and ObDatabaseSchema
int ObTablegroupSchema::get_first_primary_zone_inherit()
@ -2794,6 +2853,7 @@ private:
const common::ObNewRange &range,
ObPartition * const* partition_array,
const int64_t partition_num,
const ListIdxHashArray *list_idx_hash_array,
common::ObIArray<PartitionIndex> &indexes);
// param[@in]:
@ -2827,6 +2887,7 @@ private:
const common::ObNewRow &row,
ObPartition * const* partition_array,
const int64_t partition_num,
const ListIdxHashArray *list_idx_hash_array,
common::ObIArray<PartitionIndex> &indexes);
// param[@in]:

View File

@ -590,6 +590,11 @@ int64_t ObSimpleTableSchemaV2::get_convert_size() const
convert_size += link_database_name_.length() + 1;
convert_size += transition_point_.get_deep_copy_size();
convert_size += interval_range_.get_deep_copy_size();
if (OB_NOT_NULL(list_idx_hash_array_)) {
int64_t cnt = list_idx_hash_array_->item_count();
convert_size += get_list_idx_hash_array_mem_size_(cnt);
convert_size += cnt * sizeof(ObNewRowValue);
}
return convert_size;
}

View File

@ -241,6 +241,7 @@ enum ObSysVarClassType
SYS_VAR_OB_ENABLE_RICH_ERROR_MSG = 10134,
SYS_VAR_OB_SQL_PLAN_MEMORY_PERCENTAGE = 10135,
SYS_VAR_LOG_ROW_VALUE_OPTIONS = 10136,
SYS_VAR_OB_MAX_READ_STALE_TIME = 10137,
};
}

View File

@ -236,6 +236,7 @@ namespace share
static const char* const OB_SV_ENABLE_RICH_ERROR_MSG = "ob_enable_rich_error_msg";
static const char* const OB_SV_SQL_PLAN_MEMORY_PERCENTAGE = "ob_sql_plan_memory_percentage";
static const char* const OB_SV_LOG_ROW_VALUE_OPTIONS = "log_row_value_options";
static const char* const OB_SV_MAX_READ_STALE_TIME = "ob_max_read_stale_time";
}
}

View File

@ -244,6 +244,7 @@ const char *ObSysVarFactory::SYS_VAR_NAMES_SORTED_BY_NAME[] = {
"ob_interm_result_mem_limit",
"ob_last_schema_version",
"ob_log_level",
"ob_max_read_stale_time",
"ob_org_cluster_id",
"ob_pl_block_timeout",
"ob_plan_cache_evict_high_percentage",
@ -467,6 +468,7 @@ const ObSysVarClassType ObSysVarFactory::SYS_VAR_IDS_SORTED_BY_NAME[] = {
SYS_VAR_OB_INTERM_RESULT_MEM_LIMIT,
SYS_VAR_OB_LAST_SCHEMA_VERSION,
SYS_VAR_OB_LOG_LEVEL,
SYS_VAR_OB_MAX_READ_STALE_TIME,
SYS_VAR_OB_ORG_CLUSTER_ID,
SYS_VAR_OB_PL_BLOCK_TIMEOUT,
SYS_VAR_OB_PLAN_CACHE_EVICT_HIGH_PERCENTAGE,
@ -783,7 +785,8 @@ const char *ObSysVarFactory::SYS_VAR_NAMES_SORTED_BY_ID[] = {
"_windowfunc_optimization_settings",
"ob_enable_rich_error_msg",
"ob_sql_plan_memory_percentage",
"log_row_value_options"
"log_row_value_options",
"ob_max_read_stale_time"
};
bool ObSysVarFactory::sys_var_name_case_cmp(const char *name1, const ObString &name2)
@ -1171,6 +1174,7 @@ int ObSysVarFactory::create_all_sys_vars()
+ sizeof(ObSysVarObEnableRichErrorMsg)
+ sizeof(ObSysVarObSqlPlanMemoryPercentage)
+ sizeof(ObSysVarLogRowValueOptions)
+ sizeof(ObSysVarObMaxReadStaleTime)
;
void *ptr = NULL;
if (OB_ISNULL(ptr = allocator_.alloc(total_mem_size))) {
@ -3159,6 +3163,15 @@ int ObSysVarFactory::create_all_sys_vars()
ptr = (void *)((char *)ptr + sizeof(ObSysVarLogRowValueOptions));
}
}
if (OB_SUCC(ret)) {
if (OB_ISNULL(sys_var_ptr = new (ptr)ObSysVarObMaxReadStaleTime())) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_ERROR("fail to new ObSysVarObMaxReadStaleTime", K(ret));
} else {
store_buf_[ObSysVarsToIdxMap::get_store_idx(static_cast<int64_t>(SYS_VAR_OB_MAX_READ_STALE_TIME))] = sys_var_ptr;
ptr = (void *)((char *)ptr + sizeof(ObSysVarObMaxReadStaleTime));
}
}
}
return ret;
@ -5605,6 +5618,17 @@ int ObSysVarFactory::create_sys_var(ObSysVarClassType sys_var_id, ObBasicSysVar
}
break;
}
case SYS_VAR_OB_MAX_READ_STALE_TIME: {
void *ptr = NULL;
if (OB_ISNULL(ptr = allocator_.alloc(sizeof(ObSysVarObMaxReadStaleTime)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_ERROR("fail to alloc memory", K(ret), K(sizeof(ObSysVarObMaxReadStaleTime)));
} else if (OB_ISNULL(sys_var_ptr = new (ptr)ObSysVarObMaxReadStaleTime())) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_ERROR("fail to new ObSysVarObMaxReadStaleTime", K(ret));
}
break;
}
default: {
ret = OB_ERR_UNEXPECTED;

View File

@ -1591,6 +1591,13 @@ public:
inline virtual ObSysVarClassType get_type() const { return SYS_VAR_LOG_ROW_VALUE_OPTIONS; }
inline virtual const common::ObObj &get_global_default_value() const { return ObSysVariables::get_default_value(219); }
};
// Integer system variable class for SYS_VAR_OB_MAX_READ_STALE_TIME.
// Values are checked with
// ObSysVarOnCheckFuncs::check_and_convert_timeout_too_large.
class ObSysVarObMaxReadStaleTime : public ObIntSysVar
{
public:
ObSysVarObMaxReadStaleTime() : ObIntSysVar(ObSysVarOnCheckFuncs::check_and_convert_timeout_too_large, NULL, NULL, NULL, NULL) {}
inline virtual ObSysVarClassType get_type() const { return SYS_VAR_OB_MAX_READ_STALE_TIME; }
// 220 is the default-value slot passed to ObSysVariables::get_default_value;
// presumably it must match this variable's ObSysVars[] index — TODO confirm.
inline virtual const common::ObObj &get_global_default_value() const { return ObSysVariables::get_default_value(220); }
};
class ObSysVarFactory
@ -1610,7 +1617,7 @@ public:
static const common::ObString get_sys_var_name_by_id(ObSysVarClassType sys_var_id);
const static int64_t MYSQL_SYS_VARS_COUNT = 97;
const static int64_t OB_SYS_VARS_COUNT = 123;
const static int64_t OB_SYS_VARS_COUNT = 124;
const static int64_t ALL_SYS_VARS_COUNT = MYSQL_SYS_VARS_COUNT + OB_SYS_VARS_COUNT;
const static int16_t OB_SPECIFIC_SYS_VAR_ID_OFFSET = 10000;

View File

@ -2892,13 +2892,26 @@ static struct VarsInit{
ObSysVars[219].alias_ = "OB_SV_LOG_ROW_VALUE_OPTIONS" ;
}();
[&] (){
ObSysVars[220].info_ = "max stale time(us) for weak read query " ;
ObSysVars[220].name_ = "ob_max_read_stale_time" ;
ObSysVars[220].data_type_ = ObIntType ;
ObSysVars[220].value_ = "5000000" ;
ObSysVars[220].flags_ = ObSysVarFlag::GLOBAL_SCOPE | ObSysVarFlag::SESSION_SCOPE | ObSysVarFlag::NEED_SERIALIZE ;
ObSysVars[220].on_check_and_convert_func_ = "ObSysVarOnCheckFuncs::check_and_convert_timeout_too_large" ;
ObSysVars[220].id_ = SYS_VAR_OB_MAX_READ_STALE_TIME ;
cur_max_var_id = MAX(cur_max_var_id, static_cast<int64_t>(SYS_VAR_OB_MAX_READ_STALE_TIME)) ;
ObSysVarsIdToArrayIdx[SYS_VAR_OB_MAX_READ_STALE_TIME] = 220 ;
ObSysVars[220].alias_ = "OB_SV_MAX_READ_STALE_TIME" ;
}();
if (cur_max_var_id >= ObSysVarFactory::OB_MAX_SYS_VAR_ID) {
HasInvalidSysVar = true;
}
}
}vars_init;
static int64_t var_amount = 220;
static int64_t var_amount = 221;
int64_t ObSysVariables::get_all_sys_var_count(){ return ObSysVarFactory::ALL_SYS_VARS_COUNT;}
ObSysVarClassType ObSysVariables::get_sys_var_id(int64_t i){ return ObSysVars[i].id_;}

View File

@ -2921,5 +2921,18 @@
"info_cn": "",
"background_cn": "",
"ref_url": "https://yuque.antfin-inc.com/ob/product_functionality_review/wan7iw4mox8vgkfm"
},
"ob_max_read_stale_time": {
"id": 10137,
"name": "ob_max_read_stale_time",
"value": "5000000",
"data_type": "int",
"info": "max stale time(us) for weak read query ",
"flags": "GLOBAL | SESSION | NEED_SERIALIZE",
"on_check_and_convert_func": "ObSysVarOnCheckFuncs::check_and_convert_timeout_too_large",
"publish_version": "410",
"info_cn": "",
"background_cn": "",
"ref_url": "https://yuque.antfin.com/ob/product_functionality_review/ns7okery0azyvlnp"
}
}

View File

@ -949,10 +949,15 @@ int ObTscCgService::generate_table_loc_meta(uint64_t table_loc_id,
loc_meta.ref_table_id_ = table_schema.get_table_id();
loc_meta.is_dup_table_ = table_schema.is_duplicate_table();
bool is_weak_read = false;
if (stmt.get_query_ctx()->has_dml_write_stmt_) {
if (OB_ISNULL(cg_.opt_ctx_) || OB_ISNULL(cg_.opt_ctx_->get_exec_ctx())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(cg_.opt_ctx_), K(ret));
} else if (stmt.get_query_ctx()->has_dml_write_stmt_) {
loc_meta.select_leader_ = 1;
loc_meta.is_weak_read_ = 0;
} else if (OB_FAIL(ObTableLocation::get_is_weak_read(stmt, &session, is_weak_read))) {
} else if (OB_FAIL(ObTableLocation::get_is_weak_read(stmt, &session,
cg_.opt_ctx_->get_exec_ctx()->get_sql_ctx(),
is_weak_read))) {
LOG_WARN("get is weak read failed", K(ret));
} else if (is_weak_read) {
loc_meta.is_weak_read_ = 1;

View File

@ -20,12 +20,14 @@
#include "share/schema/ob_multi_version_schema_service.h"
#include "share/schema/ob_schema_utils.h"
#include "sql/das/ob_das_utils.h"
#include "storage/tx/wrs/ob_black_list.h"
namespace oceanbase
{
using namespace common;
using namespace share;
using namespace share::schema;
using namespace transaction;
namespace sql
{
OB_SERIALIZE_MEMBER(DASRelatedTabletMap::MapEntry,
@ -630,14 +632,24 @@ int ObDASLocationRouter::nonblock_get_readable_replica(const uint64_t tenant_id,
}
if (OB_UNLIKELY(tablet_loc.need_refresh_)){
ObAddr strong_leader;
ObBLKey bl_key;
bool in_black_list = true;
for (int64_t i = 0; OB_SUCC(ret) && !is_found && i < ls_loc.get_replica_locations().count(); ++i) {
const ObLSReplicaLocation &tmp_replica_loc = ls_loc.get_replica_locations().at(i);
if (tmp_replica_loc.is_strong_leader()) {
//in version 4.0, if das task in retry, we force to choose the leader replica
strong_leader = tmp_replica_loc.get_server();
} else if (OB_SUCC(bl_key.init(tmp_replica_loc.get_server(), tenant_id, tablet_loc.ls_id_))
&& OB_SUCC(ObBLService::get_instance().check_in_black_list(bl_key, in_black_list))
&& !in_black_list) {
tablet_loc.server_ = tmp_replica_loc.get_server();
is_found = true;
}
}
if (!is_found && strong_leader.is_valid()) {
tablet_loc.server_ = strong_leader;
is_found = true;
}
}
for (int64_t i = 0; OB_SUCC(ret) && !is_found && i < ls_loc.get_replica_locations().count(); ++i) {

View File

@ -719,10 +719,9 @@ int ObExecContext::init_physical_plan_ctx(const ObPhysicalPlan &plan)
{
int ret = OB_SUCCESS;
int64_t foreign_key_checks = 0;
if (OB_ISNULL(phy_plan_ctx_) || OB_ISNULL(my_session_)) {
ret = OB_NOT_INIT;
LOG_WARN("physical_plan or ctx is NULL", K_(phy_plan_ctx), K_(my_session), K(ret));
ret = OB_ERR_UNEXPECTED;
if (OB_ISNULL(phy_plan_ctx_) || OB_ISNULL(my_session_) || OB_ISNULL(sql_ctx_)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K_(phy_plan_ctx), K_(my_session), K(ret));
} else if (OB_FAIL(my_session_->get_foreign_key_checks(foreign_key_checks))) {
LOG_WARN("failed to get foreign_key_checks", K(ret));
} else {
@ -747,7 +746,9 @@ int ObExecContext::init_physical_plan_ctx(const ObPhysicalPlan &plan)
}
if (OB_SUCC(ret)) {
if (stmt::T_SELECT == plan.get_stmt_type()) { // select才有weak
if (OB_UNLIKELY(phy_plan_hint.read_consistency_ != INVALID_CONSISTENCY)) {
if (sql_ctx_->is_protocol_weak_read_) {
consistency = WEAK;
} else if (OB_UNLIKELY(phy_plan_hint.read_consistency_ != INVALID_CONSISTENCY)) {
consistency = phy_plan_hint.read_consistency_;
} else {
consistency = my_session_->get_consistency_level();

View File

@ -938,14 +938,17 @@ int ObResultSet::get_read_consistency(ObConsistencyLevel &consistency)
// Resolve the read consistency of this statement. For SELECT the order is:
// protocol weak-read flag of the request, then the plan hint, then the session level.
consistency = INVALID_CONSISTENCY;
int ret = OB_SUCCESS;
ObPhysicalPlan* physical_plan_ = static_cast<ObPhysicalPlan*>(cache_obj_guard_.get_cache_obj());
if (OB_ISNULL(physical_plan_)) {
ret = OB_NOT_INIT;
LOG_WARN("physical_plan", K_(physical_plan), K(ret));
ret = OB_ERR_UNEXPECTED;
if (OB_ISNULL(physical_plan_)
|| OB_ISNULL(exec_ctx_)
|| OB_ISNULL(exec_ctx_->get_sql_ctx())) {
ret = OB_INVALID_ARGUMENT;
// exec_ctx_ may be the very pointer that failed the NULL check above, so log the
// pointer value instead of dereferencing it to fetch the sql ctx.
LOG_WARN("physical_plan", K_(physical_plan), KP(exec_ctx_), K(ret));
} else {
const ObPhyPlanHint &phy_hint = physical_plan_->get_phy_plan_hint();
if (stmt::T_SELECT == stmt_type_) { // only SELECT can be a weak read
if (OB_UNLIKELY(phy_hint.read_consistency_ != INVALID_CONSISTENCY)) {
if (exec_ctx_->get_sql_ctx()->is_protocol_weak_read_) {
// the weak-read flag carried by the request protocol overrides hint and session
consistency = WEAK;
} else if (OB_UNLIKELY(phy_hint.read_consistency_ != INVALID_CONSISTENCY)) {
consistency = phy_hint.read_consistency_;
} else {
consistency = my_session_.get_consistency_level();

View File

@ -230,6 +230,7 @@ ObSqlCtx::ObSqlCtx()
cur_plan_(nullptr),
can_reroute_sql_(false),
is_sensitive_(false),
is_protocol_weak_read_(false),
flashback_query_expr_(nullptr),
is_execute_call_stmt_(false),
reroute_info_(nullptr)
@ -267,6 +268,7 @@ void ObSqlCtx::reset()
is_ddl_from_primary_ = false;
can_reroute_sql_ = false;
is_sensitive_ = false;
is_protocol_weak_read_ = false;
if (nullptr != reroute_info_) {
reroute_info_->reset();
op_reclaim_free(reroute_info_);

View File

@ -495,6 +495,7 @@ public:
bool can_reroute_sql_; // 是否可以重新路由
bool is_sensitive_; // 是否含有敏感信息,若有则不记入 sql_audit
bool is_protocol_weak_read_; // record whether proxy set weak read for this request in protocol flag
common::ObFixedArray<int64_t, common::ObIAllocator> multi_stmt_rowkey_pos_;
ObRawExpr *flashback_query_expr_;
ObSpmCacheCtx spm_ctx_;

View File

@ -35,6 +35,7 @@
#include "observer/ob_server_struct.h"
#include "observer/ob_server.h"
#include "storage/tx/wrs/ob_weak_read_util.h" //ObWeakReadUtil
#include "storage/tx_storage/ob_ls_service.h"
#include "sql/das/ob_das_dml_ctx_define.h"
#include "share/deadlock/ob_deadlock_detector_mgr.h"
@ -463,9 +464,7 @@ int ObSqlTransControl::start_stmt(ObExecContext &exec_ctx)
OZ (stmt_sanity_check_(session, plan, plan_ctx));
OZ (txs->sql_stmt_start_hook(session->get_xid(), *session->get_tx_desc(), session->get_sessid()));
if (OB_SUCC(ret)
&& txs->get_tx_elr_util().check_and_update_tx_elr_info(
*session->get_tx_desc(),
session->get_early_lock_release())) {
&& txs->get_tx_elr_util().check_and_update_tx_elr_info(*session->get_tx_desc())) {
LOG_WARN("check and update tx elr info", K(ret), KPC(session->get_tx_desc()));
}
uint32_t session_id = 0;
@ -576,7 +575,8 @@ int ObSqlTransControl::stmt_setup_snapshot_(ObSQLSessionInfo *session,
auto &snapshot = das_ctx.get_snapshot();
if (cl == ObConsistencyLevel::WEAK || cl == ObConsistencyLevel::FROZEN) {
SCN snapshot_version = SCN::min_scn();
if (OB_FAIL(txs->get_weak_read_snapshot_version(snapshot_version))) {
if (OB_FAIL(txs->get_weak_read_snapshot_version(session->get_ob_max_read_stale_time(),
snapshot_version))) {
TRANS_LOG(WARN, "get weak read snapshot fail", KPC(txs));
} else {
snapshot.init_weak_read(snapshot_version);
@ -989,5 +989,53 @@ void ObSqlTransControl::clear_xa_branch(const ObXATransID &xid, ObTxDesc *&tx_de
MTL(transaction::ObXAService *)->clear_xa_branch(xid, tx_desc);
}
/**
 * check_ls_readable - decide whether the replica of log stream `ls_id` located on
 * server `addr` may serve a weak read.
 *
 * A replica on another server is accepted when it is not in the black list.
 * A replica on this server is accepted when the weak read timestamp of the local
 * log stream is newer than (current time - allowed staleness).
 *
 * @tenant_id:         tenant used to build the black list key
 * @ls_id:             log stream to check, must be valid
 * @addr:              server holding the candidate replica, must be valid
 * @max_stale_time_ns: maximum tolerated staleness, must be > 0
 * @can_read:          output; reset to false on entry, true only when the check passes
 *
 * Return:
 * OB_SUCCESS          - check finished, the verdict is in can_read
 * OB_INVALID_ARGUMENT - ls_id / addr is invalid or max_stale_time_ns <= 0
 * OB_ERR_UNEXPECTED   - the local log stream service or the log stream is NULL
 * others              - error reported by ObBLKey::init, check_in_black_list or get_ls
 */
int ObSqlTransControl::check_ls_readable(const uint64_t tenant_id,
                                         const share::ObLSID &ls_id,
                                         const common::ObAddr &addr,
                                         const int64_t max_stale_time_ns,
                                         bool &can_read)
{
  int ret = OB_SUCCESS;
  can_read = false;

  if (!ls_id.is_valid()
      || !addr.is_valid()
      || max_stale_time_ns <= 0) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("invalid argument", K(ls_id), K(addr), K(max_stale_time_ns));
  } else if (observer::ObServer::get_instance().get_self() != addr) {
    // distribute plan and check black list: the replica is on another server, so the
    // local log stream service cannot be asked about it.
    ObBLKey blk;
    bool in_black_list = false;
    if (OB_FAIL(blk.init(addr, tenant_id, ls_id))) {
      LOG_WARN("ObBLKey init error", K(ret), K(addr), K(tenant_id), K(ls_id));
    } else if (OB_FAIL(ObBLService::get_instance().check_in_black_list(blk, in_black_list))) {
      LOG_WARN("check in black list error", K(ret), K(blk));
    } else {
      can_read = !in_black_list;
    }
  } else {
    // The replica is on this server: inspect the local log stream directly.
    storage::ObLSService *ls_svr = MTL(storage::ObLSService *);
    storage::ObLSHandle handle;
    ObLS *ls = nullptr;
    if (OB_ISNULL(ls_svr)) {
      ret = OB_ERR_UNEXPECTED;
      LOG_WARN("log stream service is NULL", K(ret));
    } else if (OB_FAIL(ls_svr->get_ls(ls_id, handle, ObLSGetMod::TRANS_MOD))) {
      LOG_WARN("get id service log stream failed", K(ret), K(ls_id));
    } else if (OB_ISNULL(ls = handle.get_ls())) {
      ret = OB_ERR_UNEXPECTED;
      LOG_WARN("id service log stream not exist", K(ret), K(ls_id));
    // NOTE(review): the bound is divided by 1000 before being compared with
    // ObTimeUtility::current_time(), while get_weak_read_snapshot_version() subtracts
    // the session's max read stale time without scaling -- confirm the unit callers pass.
    } else if (ObTimeUtility::current_time() - max_stale_time_ns / 1000
               < ls->get_ls_wrs_handler()->get_ls_weak_read_ts().convert_to_ts()) {
      can_read = true;
    }
  }
  return ret;
}
}/* ns sql*/
}/* ns oceanbase */

View File

@ -238,6 +238,11 @@ public:
const uint64_t table_id,
const transaction::tablelock::ObTableLockMode lock_mode);
static void clear_xa_branch(const transaction::ObXATransID &xid, transaction::ObTxDesc *&tx_desc);
static int check_ls_readable(const uint64_t tenant_id,
const share::ObLSID &ls_id,
const common::ObAddr &addr,
const int64_t max_stale_time_ns,
bool &can_read);
private:
DISALLOW_COPY_AND_ASSIGN(ObSqlTransControl);
static int get_trans_expire_ts(const ObSQLSessionInfo &session,

View File

@ -2330,9 +2330,13 @@ int ObLogPlan::select_replicas(ObExecContext &exec_ctx,
if (OB_FAIL(ret)) {
} else if (is_weak && !exec_ctx.get_my_session()->get_is_in_retry()) {
int64_t max_read_stale_time = exec_ctx.get_my_session()->get_ob_max_read_stale_time();
uint64_t tenant_id = exec_ctx.get_my_session()->get_effective_tenant_id();
if (OB_FAIL(ObLogPlan::weak_select_replicas(local_server,
static_cast<ObRoutePolicyType>(route_policy_type),
proxy_priority_hit_support,
tenant_id,
max_read_stale_time,
phy_tbl_loc_info_list,
is_hit_partition, follower_first_feedback))) {
LOG_WARN("fail to weak select intersect replicas", K(ret), K(local_server), K(phy_tbl_loc_info_list.count()));
@ -2443,6 +2447,8 @@ int ObLogPlan::strong_select_replicas(const ObAddr &local_server,
int ObLogPlan::weak_select_replicas(const ObAddr &local_server,
ObRoutePolicyType route_type,
bool proxy_priority_hit_support,
uint64_t tenant_id,
int64_t max_read_stale_time,
ObIArray<ObCandiTableLoc*> &phy_tbl_loc_info_list,
bool &is_hit_partition,
ObFollowerFirstFeedbackType &follower_first_feedback)
@ -2457,6 +2463,8 @@ int ObLogPlan::weak_select_replicas(const ObAddr &local_server,
route_policy_ctx.policy_type_ = route_type;
route_policy_ctx.consistency_level_ = WEAK;
route_policy_ctx.is_proxy_priority_hit_support_ = proxy_priority_hit_support;
route_policy_ctx.max_read_stale_time_ = max_read_stale_time;
route_policy_ctx.tenant_id_ = tenant_id;
if (OB_FAIL(route_policy.init())) {
LOG_WARN("fail to init route policy", K(ret));
@ -2474,7 +2482,9 @@ int ObLogPlan::weak_select_replicas(const ObAddr &local_server,
ObIArray<ObRoutePolicy::CandidateReplica> &replica_array = phy_part_loc_info.get_partition_location().get_replica_locations();
if (OB_FAIL(route_policy.init_candidate_replicas(replica_array))) {
LOG_WARN("fail to init candidate replicas", K(replica_array), K(ret));
} else if (OB_FAIL(route_policy.calculate_replica_priority(replica_array, route_policy_ctx))) {
} else if (OB_FAIL(route_policy.calculate_replica_priority(phy_part_loc_info.get_ls_id(),
replica_array,
route_policy_ctx))) {
LOG_WARN("fail to calculate replica priority", K(replica_array), K(route_policy_ctx), K(ret));
} else if (OB_FAIL(route_policy.select_replica_with_priority(route_policy_ctx, replica_array, phy_part_loc_info))) {
LOG_WARN("fail to select replica", K(replica_array), K(ret));
@ -12519,4 +12529,4 @@ int ObLogPlan::allocate_material_for_recursive_cte_plan(ObIArray<ObLogicalOperat
}
}
return ret;
}
}

View File

@ -1671,6 +1671,8 @@ private: // member functions
static int weak_select_replicas(const common::ObAddr &local_server,
ObRoutePolicyType route_type,
bool proxy_priority_hit_support,
uint64_t tenant_id,
int64_t max_read_stale_time,
common::ObIArray<ObCandiTableLoc*> &phy_tbl_loc_info_list,
bool &is_hit_partition,
share::ObFollowerFirstFeedbackType &follower_first_feedback);

View File

@ -58,7 +58,7 @@ public:
inline common::ObTabletID get_tablet_id() const { return tablet_id_; }
inline share::ObLSID get_ls_id() const { return ls_id_; }
inline const share::ObLSID &get_ls_id() const { return ls_id_; }
inline int64_t get_renew_time() const { return renew_time_; }
@ -90,7 +90,7 @@ public:
int add_priority_replica_idx(int64_t priority_replica_idx);
int64_t get_selected_replica_idx() const { return selected_replica_idx_; }
bool has_selected_replica() const { return common::OB_INVALID_INDEX != selected_replica_idx_; }
const share::ObLSID &get_ls_id() const { return opt_tablet_loc_.get_ls_id(); }
int get_selected_replica(share::ObLSReplicaLocation &replica_loc) const;
int get_selected_replica(ObRoutePolicy::CandidateReplica &replica_loc) const;
int get_priority_replica(int64_t idx, share::ObLSReplicaLocation &replica_loc) const;

View File

@ -60,26 +60,40 @@ int ObRoutePolicy::strong_sort_replicas(ObIArray<CandidateReplica>& candi_replic
return ret;
}
// Marks every candidate replica that must not be chosen by setting its is_filter_ flag.
// The first signature line below is the previous form; the three-line signature that
// follows it additionally receives the log stream id used for the readability check.
// Returns OB_SUCCESS, or the error reported by ObSqlTransControl::check_ls_readable,
// in which case the loop stops at the failing replica.
int ObRoutePolicy::filter_replica(ObIArray<CandidateReplica>& candi_replicas, ObRoutePolicyCtx &ctx)
int ObRoutePolicy::filter_replica(const ObLSID &ls_id,
ObIArray<CandidateReplica>& candi_replicas,
ObRoutePolicyCtx &ctx)
{
int ret = OB_SUCCESS;
ObRoutePolicyType policy_type = get_calc_route_policy_type(ctx);
for (int64_t i = 0; OB_SUCC(ret) && i < candi_replicas.count(); ++i) {
CandidateReplica &cur_replica = candi_replicas.at(i);
if ((policy_type == ONLY_READONLY_ZONE && cur_replica.attr_.zone_type_ == ZONE_TYPE_READWRITE)
|| cur_replica.attr_.zone_status_ == ObZoneStatus::INACTIVE
|| cur_replica.attr_.server_status_ != ObServerStatus::OB_SERVER_ACTIVE
|| cur_replica.attr_.start_service_time_ == 0
|| cur_replica.attr_.server_stop_time_ != 0
|| (0 == cur_replica.get_property().get_memstore_percent()
&& is_follower(cur_replica.get_role()))) { // a D replica acting as Follower cannot be selected
cur_replica.is_filter_ = true;
// can_read is the readability verdict for this replica; check_ls_readable resets it
// to false on entry and sets it to true only when the replica passes its check.
bool can_read = true;
if (OB_FAIL(ObSqlTransControl::check_ls_readable(ctx.tenant_id_,
ls_id,
cur_replica.get_server(),
ctx.max_read_stale_time_,
can_read))) {
LOG_WARN("fail to check ls readable", K(ctx), K(cur_replica), K(ret));
} else {
LOG_TRACE("check ls readable", K(ctx), K(ls_id), K(cur_replica.get_server()), K(can_read));
// Same attribute checks as before, plus the readability verdict obtained above.
if ((policy_type == ONLY_READONLY_ZONE && cur_replica.attr_.zone_type_ == ZONE_TYPE_READWRITE)
|| cur_replica.attr_.zone_status_ == ObZoneStatus::INACTIVE
|| cur_replica.attr_.server_status_ != ObServerStatus::OB_SERVER_ACTIVE
|| cur_replica.attr_.start_service_time_ == 0
|| cur_replica.attr_.server_stop_time_ != 0
|| (0 == cur_replica.get_property().get_memstore_percent()
&& is_follower(cur_replica.get_role()))// a D replica acting as Follower cannot be selected
|| !can_read) {
cur_replica.is_filter_ = true;
}
}
}
return ret;
}
int ObRoutePolicy::calculate_replica_priority(ObIArray<CandidateReplica>& candi_replicas,
int ObRoutePolicy::calculate_replica_priority(const ObLSID &ls_id,
ObIArray<CandidateReplica>& candi_replicas,
ObRoutePolicyCtx &ctx)
{
int ret = OB_SUCCESS;
@ -88,7 +102,7 @@ int ObRoutePolicy::calculate_replica_priority(ObIArray<CandidateReplica>& candi_
LOG_WARN("not init", K(ret));
} else if (candi_replicas.count() <= 1) {//do nothing
} else if (WEAK == ctx.consistency_level_) {
if (OB_FAIL(filter_replica(candi_replicas, ctx))) {
if (OB_FAIL(filter_replica(ls_id, candi_replicas, ctx))) {
LOG_WARN("fail to filter replicas", K(candi_replicas), K(ctx), K(ret));
} else if (OB_FAIL(weak_sort_replicas(candi_replicas, ctx))) {
LOG_WARN("fail to sort replicas", K(candi_replicas), K(ctx), K(ret));
@ -219,7 +233,7 @@ int ObRoutePolicy::select_replica_with_priority(const ObRoutePolicyCtx &route_po
bool same_priority = true;
ReplicaAttribute priority_attr;
for (int64_t i = 0; OB_SUCC(ret) && same_priority && i < replica_array.count(); ++i) {
if (replica_array.at(i).is_usable()) {
if (replica_array.at(i).is_usable()/*+满足max_read_stale_time事务延迟*/) {
if (has_found) {
if (priority_attr == replica_array.at(i).attr_) {
if (OB_FAIL(phy_part_loc_info.add_priority_replica_idx(i))) {

View File

@ -46,16 +46,22 @@ struct ObRoutePolicyCtx
ObRoutePolicyCtx()
:policy_type_(POLICY_TYPE_MAX),
consistency_level_(common::INVALID_CONSISTENCY),
is_proxy_priority_hit_support_(false)
is_proxy_priority_hit_support_(false),
tenant_id_(OB_INVALID_TENANT_ID),
max_read_stale_time_(0)
{}
TO_STRING_KV(K(policy_type_),
K(consistency_level_),
K(is_proxy_priority_hit_support_));
K(is_proxy_priority_hit_support_),
K(tenant_id_),
K(max_read_stale_time_));
ObRoutePolicyType policy_type_;
common::ObConsistencyLevel consistency_level_;
bool is_proxy_priority_hit_support_;
uint64_t tenant_id_;
int64_t max_read_stale_time_;
};
class ObRoutePolicy
@ -164,7 +170,9 @@ public:
{}
~ObRoutePolicy() {}
int init();
int calculate_replica_priority(common::ObIArray<CandidateReplica>& candi_replicas, ObRoutePolicyCtx &ctx);
int calculate_replica_priority(const share::ObLSID &ls_id,
common::ObIArray<CandidateReplica>& candi_replicas,
ObRoutePolicyCtx &ctx);
int init_candidate_replicas(common::ObIArray<CandidateReplica> &candi_replicas);
int select_replica_with_priority(const ObRoutePolicyCtx &route_policy_ctx,
const common::ObIArray<CandidateReplica> &replica_array,
@ -204,7 +212,9 @@ protected:
int get_merge_status(const share::ObServerLocality &candi_locality, CandidateReplica &candi_replica);
int get_zone_status(const share::ObServerLocality &candi_locality, CandidateReplica &candi_replica);
int filter_replica(common::ObIArray<CandidateReplica>& candi_replicas, ObRoutePolicyCtx &ctx);
int filter_replica(const share::ObLSID &ls_id,
common::ObIArray<CandidateReplica>& candi_replicas,
ObRoutePolicyCtx &ctx);
int weak_sort_replicas(common::ObIArray<CandidateReplica>& candi_replicas, ObRoutePolicyCtx &ctx);
int strong_sort_replicas(common::ObIArray<CandidateReplica>& candi_replicas, ObRoutePolicyCtx &ctx);
inline ObRoutePolicyType get_calc_route_policy_type(const ObRoutePolicyCtx &ctx) const

View File

@ -998,7 +998,10 @@ int ObTableLocation::init_table_location(ObExecContext &exec_ctx,
}
if (OB_SUCC(ret)) {
bool is_weak_read = false;
if (OB_FAIL(get_is_weak_read(stmt, exec_ctx.get_my_session(), is_weak_read))) {
if (OB_FAIL(get_is_weak_read(stmt,
exec_ctx.get_my_session(),
exec_ctx.get_sql_ctx(),
is_weak_read))) {
LOG_WARN("get is weak read failed", K(ret));
} else if (ObDuplicateScope::DUPLICATE_SCOPE_NONE != table_schema->get_duplicate_scope()) {
loc_meta_.is_dup_table_ = 1;
@ -1272,7 +1275,7 @@ int ObTableLocation::init(
}
if (OB_SUCC(ret)) {
bool is_weak_read = false;
if (OB_FAIL(get_is_weak_read(stmt, session_info, is_weak_read))) {
if (OB_FAIL(get_is_weak_read(stmt, session_info, exec_ctx->get_sql_ctx(), is_weak_read))) {
LOG_WARN("get is weak read failed", K(ret));
} else if (ObDuplicateScope::DUPLICATE_SCOPE_NONE != table_schema->get_duplicate_scope()) {
loc_meta_.is_dup_table_ = 1;
@ -1291,20 +1294,24 @@ int ObTableLocation::init(
int ObTableLocation::get_is_weak_read(const ObDMLStmt &dml_stmt,
const ObSQLSessionInfo *session,
const ObSqlCtx *sql_ctx,
bool &is_weak_read)
{
int ret = OB_SUCCESS;
is_weak_read = false;
if (OB_ISNULL(session)) {
if (OB_ISNULL(session) || OB_ISNULL(sql_ctx)) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("unexpected null", K(ret), K(session));
LOG_ERROR("unexpected null", K(ret), K(session), K(sql_ctx));
} else if (dml_stmt.get_query_ctx()->has_dml_write_stmt_) {
is_weak_read = false;
} else {
ObConsistencyLevel consistency_level = INVALID_CONSISTENCY;
ObTxConsistencyType trans_consistency_type = ObTxConsistencyType::INVALID;
if (stmt::T_SELECT == dml_stmt.get_stmt_type()) {
if (OB_UNLIKELY(INVALID_CONSISTENCY != dml_stmt.get_query_ctx()->get_global_hint().read_consistency_)) {
if (sql_ctx->is_protocol_weak_read_) {
consistency_level = WEAK;
} else if (OB_UNLIKELY(INVALID_CONSISTENCY
!= dml_stmt.get_query_ctx()->get_global_hint().read_consistency_)) {
consistency_level = dml_stmt.get_query_ctx()->get_global_hint().read_consistency_;
} else {
consistency_level = session->get_consistency_level();
@ -1374,8 +1381,10 @@ int ObTableLocation::calculate_partition_ids_by_rowkey(ObSQLSessionInfo &session
ObArenaAllocator allocator(ObModIds::OB_SQL_TABLE_LOCATION);
SMART_VAR(ObExecContext, exec_ctx, allocator) {
ObSqlSchemaGuard sql_schema_guard;
ObSqlCtx sql_ctx;
sql_schema_guard.set_schema_guard(&schema_guard);
exec_ctx.set_my_session(&session_info);
exec_ctx.set_sql_ctx(&sql_ctx);
ObDASTabletMapper tablet_mapper;
if (is_non_partition_optimized_ && table_id == loc_meta_.ref_table_id_) {
tablet_mapper.set_non_partitioned_table_ids(tablet_id_, object_id_, &related_list_);

View File

@ -603,6 +603,7 @@ public:
static int get_is_weak_read(const ObDMLStmt &dml_stmt,
const ObSQLSessionInfo *session_info,
const ObSqlCtx *sql_ctx,
bool &is_weak_read);
int send_add_interval_partition_rpc(ObExecContext &exec_ctx,

View File

@ -2490,6 +2490,12 @@ OB_INLINE int ObBasicSessionInfo::process_session_variable(ObSysVarClassType var
}
break;
}
case SYS_VAR_OB_MAX_READ_STALE_TIME: {
int64_t int_val = 0;
OZ (val.get_int(int_val), val);
OX (sys_vars_cache_.set_ob_max_read_stale_time(int_val));
break;
}
default: {
//do nothing
}
@ -2871,6 +2877,12 @@ int ObBasicSessionInfo::fill_sys_vars_cache_base_value(
}
break;
}
case SYS_VAR_OB_MAX_READ_STALE_TIME: {
int64_t int_val = 0;
OZ (val.get_int(int_val), val);
OX (sys_vars_cache.set_base_ob_max_read_stale_time(int_val));
break;
}
default: {
//do nothing
}
@ -3579,7 +3591,8 @@ OB_DEF_SERIALIZE(ObBasicSessionInfo::SysVarsCacheData)
nls_formats_[NLS_TIMESTAMP_TZ],
ob_trx_lock_timeout_,
ob_trace_info_,
ob_plsql_ccflags_);
ob_plsql_ccflags_,
ob_max_read_stale_time_);
return ret;
}
@ -3604,7 +3617,8 @@ OB_DEF_DESERIALIZE(ObBasicSessionInfo::SysVarsCacheData)
nls_formats_[NLS_TIMESTAMP_TZ],
ob_trx_lock_timeout_,
ob_trace_info_,
ob_plsql_ccflags_);
ob_plsql_ccflags_,
ob_max_read_stale_time_);
set_nls_date_format(nls_formats_[NLS_DATE]);
set_nls_timestamp_format(nls_formats_[NLS_TIMESTAMP]);
set_nls_timestamp_tz_format(nls_formats_[NLS_TIMESTAMP_TZ]);
@ -3634,7 +3648,8 @@ OB_DEF_SERIALIZE_SIZE(ObBasicSessionInfo::SysVarsCacheData)
nls_formats_[NLS_TIMESTAMP_TZ],
ob_trx_lock_timeout_,
ob_trace_info_,
ob_plsql_ccflags_);
ob_plsql_ccflags_,
ob_max_read_stale_time_);
return len;
}

View File

@ -559,6 +559,9 @@ public:
{
return sys_vars_cache_.get_ob_trx_lock_timeout();
}
// Returns the ob_max_read_stale_time value held in this session's system variable cache.
int64_t get_ob_max_read_stale_time()
{
  const int64_t max_read_stale_time = sys_vars_cache_.get_ob_max_read_stale_time();
  return max_read_stale_time;
}
int get_sql_throttle_current_priority(int64_t &sql_throttle_current_priority)
{
sql_throttle_current_priority = sys_vars_cache_.get_sql_throttle_current_priority();
@ -1473,7 +1476,8 @@ private:
nls_collation_(CS_TYPE_INVALID),
nls_nation_collation_(CS_TYPE_INVALID),
ob_trace_info_(),
ob_plsql_ccflags_()
ob_plsql_ccflags_(),
ob_max_read_stale_time_(0)
{
for (int64_t i = 0; i < ObNLSFormatEnum::NLS_MAX; ++i) {
MEMSET(nls_formats_buf_[i], 0, MAX_NLS_FORMAT_STR_LEN);
@ -1526,6 +1530,7 @@ private:
ob_trace_info_.reset();
iso_nls_currency_.reset();
ob_plsql_ccflags_.reset();
ob_max_read_stale_time_ = 0;
}
void set_nls_date_format(const common::ObString &format)
{
@ -1614,7 +1619,7 @@ private:
K_(optimizer_use_sql_plan_baselines), K_(optimizer_capture_sql_plan_baselines),
K_(is_result_accurate), K_(character_set_results),
K_(character_set_connection), K_(ob_pl_block_timeout), K_(ob_plsql_ccflags),
K_(iso_nls_currency));
K_(iso_nls_currency), K_(ob_max_read_stale_time));
public:
static const int64_t MAX_NLS_FORMAT_STR_LEN = 256;
@ -1665,6 +1670,7 @@ private:
char trace_info_buf_[OB_TRACE_BUFFER_SIZE];
ObString ob_plsql_ccflags_;
char plsql_ccflags_[OB_TMP_BUF_SIZE_256];
int64_t ob_max_read_stale_time_;
private:
char nls_formats_buf_[ObNLSFormatEnum::NLS_MAX][MAX_NLS_FORMAT_STR_LEN];
};
@ -1770,6 +1776,7 @@ private:
DEF_SYS_VAR_CACHE_FUNCS(int64_t, ob_pl_block_timeout);
DEF_SYS_VAR_CACHE_FUNCS_STR(plsql_ccflags);
DEF_SYS_VAR_CACHE_FUNCS_STR(iso_nls_currency);
DEF_SYS_VAR_CACHE_FUNCS(int64_t, ob_max_read_stale_time);
void set_autocommit_info(bool inc_value)
{
inc_data_.autocommit_ = inc_value;
@ -1831,6 +1838,7 @@ private:
bool inc_ob_pl_block_timeout_:1;
bool inc_plsql_ccflags_:1;
bool inc_iso_nls_currency_:1;
bool inc_ob_max_read_stale_time_:1;
};
};
};

View File

@ -189,6 +189,7 @@ ob_set_subtarget(ob_storage tablet
)
ob_set_subtarget(ob_storage tx_wrs
tx/wrs/ob_black_list.cpp
tx/wrs/ob_i_weak_read_service.cpp
tx/wrs/ob_tenant_weak_read_cluster_service.cpp
tx/wrs/ob_tenant_weak_read_cluster_version_mgr.cpp

View File

@ -96,7 +96,6 @@ public: // for mvcc engine invoke
const share::SCN max_trans_version,
const transaction::ObTransID &conflict_tx_id) = 0;
virtual void on_wlock_retry(const ObMemtableKey& key, const transaction::ObTransID &conflict_tx_id) = 0;
virtual bool is_can_elr() const = 0;
virtual void inc_truncate_cnt() = 0;
virtual void add_trans_mem_total_size(const int64_t size) = 0;
virtual void update_max_submitted_seq_no(const int64_t seq_no) = 0;

View File

@ -748,7 +748,7 @@ void ObMvccRow::lock_begin(ObIMemtableCtx &ctx) const
void ObMvccRow::mvcc_write_end(ObIMemtableCtx &ctx, int64_t ret) const
{
if (!ctx.is_can_elr() && GCONF.enable_sql_audit) {
if (GCONF.enable_sql_audit) {
const int64_t lock_use_time = OB_TSC_TIMESTAMP.current_time() - ctx.get_lock_start_time();
EVENT_ADD(MEMSTORE_WAIT_WRITE_LOCK_TIME, lock_use_time);
if (OB_FAIL(ret)) {
@ -995,8 +995,7 @@ int ObMvccRow::mvcc_write_(ObIMemtableCtx &ctx,
res.tx_node_ = &writer_node;
total_trans_node_cnt_++;
}
if (ctx.is_can_elr()
&& NULL != writer_node.prev_
if (NULL != writer_node.prev_
&& writer_node.prev_->is_elr()) {
ObMemtableCtx &mt_ctx = static_cast<ObMemtableCtx &>(ctx);
if (NULL != mt_ctx.get_trans_ctx()) {

View File

@ -553,7 +553,6 @@ int ObLockWaitMgr::post_lock(const int tmp_ret,
const ObStoreRowkey& row_key,
const int64_t timeout,
const bool is_remote_sql,
const bool can_elr,
const int64_t last_compact_cnt,
const int64_t total_trans_node_cnt,
const ObTransID &tx_id,
@ -578,7 +577,7 @@ int ObLockWaitMgr::post_lock(const int tmp_ret,
TRANS_LOG(WARN, "recheck lock fail", K(key), K(holder_tx_id));
} else if (locked) {
auto hash = wait_on_row ? row_hash : tx_hash;
if (is_remote_sql && can_elr) {
if (is_remote_sql) {
delay_header_node_run_ts(hash);
}
node->set((void*)node,
@ -605,7 +604,6 @@ int ObLockWaitMgr::post_lock(const int tmp_ret,
const ObLockID &lock_id,
const int64_t timeout,
const bool is_remote_sql,
const bool can_elr,
const int64_t last_compact_cnt,
const int64_t total_trans_node_cnt,
const transaction::ObTransID &tx_id,
@ -621,7 +619,7 @@ int ObLockWaitMgr::post_lock(const int tmp_ret,
} else if (NULL == (node = get_thread_node())) {
} else if (OB_TRY_LOCK_ROW_CONFLICT == tmp_ret) {
auto hash = hash_lock_id(lock_id);
const bool need_delay = is_remote_sql && can_elr;
const bool need_delay = is_remote_sql;
char lock_id_buf[common::MAX_LOCK_ID_BUF_LENGTH];
lock_id.to_string(lock_id_buf, sizeof(lock_id_buf));
lock_id_buf[common::MAX_LOCK_ID_BUF_LENGTH - 1] = '\0';

View File

@ -232,7 +232,6 @@ public:
const ObStoreRowkey &key,
const int64_t timeout,
const bool is_remote_sql,
const bool can_elr,
const int64_t last_compact_cnt,
const int64_t total_trans_node_cnt,
const transaction::ObTransID &tx_id,
@ -243,7 +242,6 @@ public:
const transaction::tablelock::ObLockID &lock_id,
const int64_t timeout,
const bool is_remote_sql,
const bool can_elr,
const int64_t last_compact_cnt,
const int64_t total_trans_node_cnt,
const transaction::ObTransID &tx_id,

View File

@ -2574,7 +2574,6 @@ int ObMemtable::post_row_write_conflict_(ObMvccAccessCtx &acc_ctx,
*row_key.get_rowkey(),
lock_wait_expire_ts,
remote_tx,
mem_ctx->is_can_elr(),
last_compact_cnt,
total_trans_node_cnt,
tx_id,

View File

@ -873,15 +873,6 @@ uint64_t ObMemtableCtx::get_tenant_id() const
return tenant_id;
}
bool ObMemtableCtx::is_can_elr() const
{
bool bret = false;
if (OB_NOT_NULL(ATOMIC_LOAD(&ctx_))) {
bret = ctx_->is_can_elr();
}
return bret;
}
void ObMemtableCtx::update_max_submitted_seq_no(const int64_t seq_no)
{
if (NULL != ATOMIC_LOAD(&ctx_)) {

View File

@ -397,7 +397,6 @@ public:
virtual void add_trans_mem_total_size(const int64_t size);
int64_t get_ref() const { return ATOMIC_LOAD(&ref_); }
uint64_t get_tenant_id() const;
bool is_can_elr() const;
inline bool has_read_elr_data() const { return read_elr_data_; }
int remove_callbacks_for_fast_commit();
int remove_callback_for_uncommited_txn(memtable::ObMemtable* mt);

View File

@ -335,7 +335,6 @@ int ObLockMemtable::post_obj_lock_conflict_(ObMvccAccessCtx &acc_ctx,
lock_id,
lock_wait_expire_ts,
remote_tx,
mem_ctx->is_can_elr(),
-1,
-1, // total_trans_node_cnt
tx_id,

View File

@ -148,7 +148,6 @@ public:
uint32_t get_session_id() const { return session_id_; }
void before_unlock(CtxLockArg &arg);
void after_unlock(CtxLockArg &arg);
bool is_can_elr() const { return can_elr_; }
public:
void set_exiting() { is_exiting_ = true; }
bool is_exiting() const { return is_exiting_; }

View File

@ -405,7 +405,6 @@ int ObLSTxCtxMgr::create_tx_ctx_(const ObTxCreateArg &arg,
arg.trans_service_,
arg.cluster_id_,
epoch,
arg.can_elr_,
this,
arg.for_replay_))) {
} else if (FALSE_IT(inc_total_tx_ctx_count())) {

View File

@ -89,8 +89,7 @@ typedef common::LinkHashValue<share::ObLSID> ObLSTxCtxMgrHashValue;
struct ObTxCreateArg
{
ObTxCreateArg(const bool can_elr,
const bool for_replay,
ObTxCreateArg(const bool for_replay,
const uint64_t tenant_id,
const ObTransID &trans_id,
const share::ObLSID &ls_id,
@ -100,8 +99,7 @@ struct ObTxCreateArg
const common::ObAddr &scheduler,
const int64_t trans_expired_time,
ObTransService *trans_service)
: can_elr_(can_elr),
for_replay_(for_replay),
: for_replay_(for_replay),
tenant_id_(tenant_id),
tx_id_(trans_id),
ls_id_(ls_id),
@ -118,11 +116,10 @@ struct ObTxCreateArg
&& trans_expired_time_ > 0
&& NULL != trans_service_;
}
TO_STRING_KV(K_(can_elr), K_(for_replay),
TO_STRING_KV(K_(for_replay),
K_(tenant_id), K_(tx_id),
K_(ls_id), K_(cluster_id), K_(cluster_version),
K_(session_id), K_(scheduler), K_(trans_expired_time), KP_(trans_service));
bool can_elr_;
bool for_replay_;
uint64_t tenant_id_;
ObTransID tx_id_;

View File

@ -63,7 +63,6 @@ int ObPartTransCtx::init(const uint64_t tenant_id,
ObTransService *trans_service,
const uint64_t cluster_id,
const int64_t epoch,
const bool can_elr,
ObLSTxCtxMgr *ls_ctx_mgr,
const bool for_replay)
{
@ -123,7 +122,6 @@ int ObPartTransCtx::init(const uint64_t tenant_id,
last_ask_scheduler_status_ts_ = 0;
cluster_id_ = cluster_id;
epoch_ = epoch;
can_elr_ = can_elr;
pending_write_ = 0;
set_role_state(for_replay);
@ -154,7 +152,7 @@ int ObPartTransCtx::init(const uint64_t tenant_id,
OB_ID(trans_id), trans_id,
OB_ID(ctx_ref), get_ref());
TRANS_LOG(TRACE, "part trans ctx init", K(ret), K(tenant_id), K(trans_id), K(trans_expired_time),
K(ls_id), K(cluster_version), KP(trans_service), K(cluster_id), K(epoch), K(can_elr));
K(ls_id), K(cluster_version), KP(trans_service), K(cluster_id), K(epoch));
return ret;
}
@ -254,6 +252,7 @@ void ObPartTransCtx::default_init_()
session_id_ = 0;
timeout_task_.reset();
trace_info_.reset();
can_elr_ = false;
// TODO ObPartTransCtx
clog_encrypt_info_.reset();
@ -694,6 +693,7 @@ int ObPartTransCtx::commit(const ObLSArray &parts,
TRANS_LOG(ERROR, "the size of participant is 0 when commit", KPC(this));
} else if (parts.count() == 1 && parts[0] == ls_id_) {
exec_info_.trans_type_ = TransType::SP_TRANS;
can_elr_ = (trans_service_->get_tx_elr_util().is_can_tenant_elr() ? true : false);
if (OB_FAIL(one_phase_commit_())) {
TRANS_LOG(WARN, "start sp coimit fail", K(ret), KPC(this));
}

View File

@ -116,7 +116,6 @@ public:
ObTransService *trans_service,
const uint64_t cluster_id,
const int64_t epoch,
const bool can_elr,
ObLSTxCtxMgr *ls_ctx_mgr,
const bool for_replay);
void reset() { }

View File

@ -1132,8 +1132,7 @@ int ObTransService::create_tx_ctx_(const share::ObLSID &ls_id,
int ret = OB_SUCCESS;
bool existed = false;
int64_t epoch = 0;
ObTxCreateArg arg(tx.can_elr_, /* can_elr */
false, /* for_replay */
ObTxCreateArg arg(false, /* for_replay */
tx.tenant_id_,
tx.tx_id_,
ls_id,

View File

@ -671,7 +671,8 @@ int ObTransService::get_ls_read_snapshot_version(const share::ObLSID &local_ls_i
return ret;
}
int ObTransService::get_weak_read_snapshot_version(SCN &snapshot)
int ObTransService::get_weak_read_snapshot_version(const int64_t max_read_stale_time,
SCN &snapshot)
{
int ret = OB_SUCCESS;
bool monotinic_read = true;;
@ -685,8 +686,10 @@ int ObTransService::get_weak_read_snapshot_version(SCN &snapshot)
} else if (OB_FAIL(GCTX.weak_read_service_->get_cluster_version(tenant_id_, snapshot))) {
TRANS_LOG(WARN, "get weak read snapshot fail", K(ret), KPC(this));
} else {
const int64_t snapshot_barrier = ObTimeUtility::current_time() -
ObWeakReadUtil::max_stale_time_for_weak_consistency(tenant_id_);
// do nothing
}
if (OB_SUCC(ret)) {
const int64_t snapshot_barrier = ObTimeUtility::current_time() - max_read_stale_time;
if (snapshot.convert_to_ts() < snapshot_barrier) {
TRANS_LOG(WARN, "weak read snapshot too stale", K(snapshot),
K(snapshot_barrier), "delta", (snapshot_barrier - snapshot.convert_to_ts()));

View File

@ -226,13 +226,15 @@ int get_ls_read_snapshot_version(const share::ObLSID &local_ls_id,
/**
* get_weak_read_snapshot_version - get snapshot version for weak read
*
* @max_read_stale_time: the maximum staleness tolerated for the acquired snapshot
* @snapshot_version: the snapshot acquired
*
* Return:
* OB_SUCCESS - OK
* OB_REPLICA_NOT_READABLE - snapshot is too stale
*/
int get_weak_read_snapshot_version(share::SCN &snapshot_version);
int get_weak_read_snapshot_version(const int64_t max_read_stale_time,
share::SCN &snapshot_version);
/*
* release_snapshot - release snapshot
*

View File

@ -148,7 +148,7 @@ int GetTxStateWithSCNFunctor::operator()(const ObTxData &tx_data, ObTxCCCtx *tx_
// return the transaction state_ according to the merge log ts.
// the detailed document is available as follows.
// https://yuque.antfin-inc.com/docs/share/a3160d5e-6e1a-4980-a12e-4af653c6cf57?#
if (ObTxData::RUNNING == state) {
if (ObTxData::RUNNING == state || ObTxData::ELR_COMMIT == state) {
// Case 1: data is during execution, so we return the running state with
// INT64_MAX as version
state_ = ObTxData::RUNNING;
@ -168,11 +168,6 @@ int GetTxStateWithSCNFunctor::operator()(const ObTxData &tx_data, ObTxCCCtx *tx_
// ts, so we return the abort state with 0 as txn version
state_ = ObTxData::ABORT;
trans_version_ = SCN::min_scn();
} else if (ObTxData::ELR_COMMIT == state) {
// Case 5: data is elr committed and the required state is after the merge log
// ts, it means tx's state is completely decided so it must not be elr commit
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(ERROR, "unexpected state", K(ret), K(tx_data), KPC(tx_cc_ctx));
} else {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(ERROR, "unexpected transaction state_", K(ret), K(tx_data));
@ -218,13 +213,8 @@ int LockForReadFunctor::inner_lock_for_read(const ObTxData &tx_data, ObTxCCCtx *
is_determined_state_ = true;
break;
}
case ObTxData::RUNNING:
case ObTxData::ELR_COMMIT: {
can_read_ = !tx_data.undo_status_list_.is_contain(data_sql_sequence);
trans_version_ = commit_version;
is_determined_state_ = false;
break;
}
case ObTxData::RUNNING: {
// Case 2: data is during execution, so the state is not decided.
if (read_latest && reader_tx_id == data_tx_id) {
// Case 2.0: read the latest written of current txn
@ -272,15 +262,22 @@ int LockForReadFunctor::inner_lock_for_read(const ObTxData &tx_data, ObTxCCCtx *
can_read_ = false;
trans_version_ = SCN::min_scn();
} else {
// Case 2.2.3: data is in prepare state and the prepare version is
// smaller than the read txn's snapshot version, then the data's
// commit version may or may not be bigger than the read txn's
// snapshot version, so we are unsure whether we can read it and we
// need wait for the commit version of the data
ret = OB_ERR_SHARED_LOCK_CONFLICT;
if (REACH_TIME_INTERVAL(1 * 1000 * 1000)) {
TRANS_LOG(WARN, "lock_for_read need retry", K(ret),
K(tx_data), K(lock_for_read_arg_), K(tx_cc_ctx));
// Only dml statement can read elr data
if (ObTxData::ELR_COMMIT == state
&& lock_for_read_arg_.mvcc_acc_ctx_.get_tx_id().is_valid()) {
can_read_ = !tx_data.undo_status_list_.is_contain(data_sql_sequence);
trans_version_ = commit_version;
} else {
// Case 2.2.3: data is in prepare state and the prepare version is
// smaller than the read txn's snapshot version, then the data's
// commit version may or may not be bigger than the read txn's
// snapshot version, so we are unsure whether we can read it and we
// need wait for the commit version of the data
ret = OB_ERR_SHARED_LOCK_CONFLICT;
if (REACH_TIME_INTERVAL(1 * 1000 * 1000)) {
TRANS_LOG(WARN, "lock_for_read need retry", K(ret),
K(tx_data), K(lock_for_read_arg_), K(tx_cc_ctx));
}
}
}
}

View File

@ -20,15 +20,14 @@ namespace oceanbase
namespace transaction
{
int ObTxELRUtil::check_and_update_tx_elr_info(ObTxDesc &tx, const bool can_elr)
int ObTxELRUtil::check_and_update_tx_elr_info(ObTxDesc &tx)
{
int ret = OB_SUCCESS;
if (can_elr && can_tenant_elr_) { // tenant config enable elr
if (can_tenant_elr_ && OB_SYS_TENANT_ID != MTL_ID()) { // tenant config enable elr
tx.set_can_elr(true);
TX_STAT_ELR_ENABLE_TRANS_INC(MTL_ID());
} else {
refresh_elr_tenant_config_();
}
refresh_elr_tenant_config_();
return ret;
}

View File

@ -28,7 +28,8 @@ class ObTxELRUtil
public:
ObTxELRUtil() : last_refresh_ts_(0),
can_tenant_elr_(false) {}
int check_and_update_tx_elr_info(ObTxDesc &tx, const bool can_elr);
int check_and_update_tx_elr_info(ObTxDesc &tx);
bool is_can_tenant_elr() const { return can_tenant_elr_; }
void reset()
{
last_refresh_ts_ = 0;

View File

@ -207,8 +207,7 @@ int ObTxReplayExecutor::try_get_tx_ctx_(ObTxLogType type,
ret = OB_SUCCESS;
bool tx_ctx_existed = false;
common::ObAddr scheduler;
ObTxCreateArg arg(false, /* can_elr */
true, /* for_replay */
ObTxCreateArg arg(true, /* for_replay */
tenant_id,
tx_id,
ls_id,

View File

@ -0,0 +1,344 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#include "ob_black_list.h"
#include "share/ob_thread_mgr.h" // set_thread_name
#include "observer/ob_server_struct.h" // for GCTX
#include "deps/oblib/src/common/ob_role.h" // role
#include "src/storage/tx/wrs/ob_weak_read_util.h" // ObWeakReadUtil
namespace oceanbase
{
using namespace common;
using namespace storage;
namespace transaction
{
// Initializes the blacklist service: first the log-stream blacklist manager,
// then the base thread pool.
// Returns OB_INIT_TWICE if the service is already initialized, otherwise the
// error code of the first failing step, or OB_SUCCESS.
int ObBLService::init()
{
  int ret = OB_SUCCESS;
  if (OB_UNLIKELY(is_inited_)) {
    ret = OB_INIT_TWICE;
    TRANS_LOG(ERROR, "BLService init twice", K(*this));
  } else {
    if (OB_FAIL(ls_bl_mgr_.init())) {
      TRANS_LOG(ERROR, "ls_bl_mgr_ init fail", KR(ret));
    } else if (OB_FAIL(ObThreadPool::init())) {
      TRANS_LOG(ERROR, "ThreadPool init fail", KR(ret));
    }
    // Only mark the service usable once both sub-components are ready.
    if (OB_SUCCESS == ret) {
      is_inited_ = true;
      TRANS_LOG(INFO, "BLService init success", K(*this));
    }
  }
  return ret;
}
// Puts the service back into the "not initialized" state: clears both state
// flags, stops and joins the thread pool, then resets the blacklist manager.
void ObBLService::reset()
{
  // Flags are cleared before the pool is stopped, so API calls made from now
  // on are rejected with "not inited".
  is_running_ = false;
  is_inited_ = false;

  ObThreadPool::stop();
  ObThreadPool::wait();

  ls_bl_mgr_.reset();
}
// Tears the service down: clears both state flags, stops and joins the thread
// pool, then destroys the blacklist manager. Also invoked by the destructor.
void ObBLService::destroy()
{
  is_running_ = false;
  is_inited_ = false;

  ObThreadPool::stop();
  ObThreadPool::wait();

  ls_bl_mgr_.destroy();
}
// Starts the refresh thread.
// Returns OB_NOT_INIT when init() has not succeeded, OB_ERR_UNEXPECTED when the
// service is already marked as running, or the thread pool's start error.
int ObBLService::start()
{
  int ret = OB_SUCCESS;
  if (!is_inited_) {
    ret = OB_NOT_INIT;
    TRANS_LOG(ERROR, "BLService not inited", KR(ret));
  } else if (is_running_) {
    ret = OB_ERR_UNEXPECTED;
    TRANS_LOG(ERROR, "BLService is already running", KR(ret));
  } else {
    ret = ObThreadPool::start();
    if (OB_SUCCESS != ret) {
      TRANS_LOG(ERROR, "ThreadPool start fail", KR(ret));
    } else {
      is_running_ = true;
      TRANS_LOG(INFO, "BLService start success");
    }
  }
  return ret;
}
// Asks the thread pool to stop. Does not wait for the thread to exit (see
// wait()) and does not touch is_inited_ / is_running_.
void ObBLService::stop()
{
  ObThreadPool::stop();
  TRANS_LOG(INFO, "BLService stop");
}
// Blocks in ObThreadPool::wait(), logging before and after the call.
void ObBLService::wait()
{
  TRANS_LOG(INFO, "BLService wait begin");
  ObThreadPool::wait();
  TRANS_LOG(INFO, "BLService wait end");
}
// Inserts bl_key into the blacklist that matches the key's type.
// Returns OB_NOT_INIT before init(), OB_UNKNOWN_OBJ for a key type that has no
// manager, or whatever the log-stream blacklist manager returns.
int ObBLService::add(const ObBLKey &bl_key)
{
  int ret = OB_SUCCESS;
  if (OB_UNLIKELY(!is_inited_)) {
    ret = OB_NOT_INIT;
    TRANS_LOG(ERROR, "BLService not inited", KR(ret));
  } else if (BLTYPE_LS == bl_key.get_type()) {
    // Log-stream keys are the only kind handled here.
    if (OB_FAIL(ls_bl_mgr_.add(bl_key))) {
      TRANS_LOG(WARN, "ls_bl_mgr_ add failed", KR(ret), K(bl_key));
    }
  } else {
    ret = OB_UNKNOWN_OBJ;
    TRANS_LOG(ERROR, "unknown key type", K(bl_key));
  }
  return ret;
}
// Removes bl_key from the blacklist that matches the key's type. Failures
// (service not initialized, unknown key type) are only logged.
void ObBLService::remove(const ObBLKey &bl_key)
{
  if (OB_UNLIKELY(!is_inited_)) {
    TRANS_LOG(ERROR, "BLService not inited");
  } else if (BLTYPE_LS == bl_key.get_type()) {
    ls_bl_mgr_.remove(bl_key);
  } else {
    TRANS_LOG(ERROR, "unknown key type", K(bl_key));
  }
}
// Reports whether bl_key is currently blacklisted.
//
// @param bl_key         key to look up
// @param in_black_list  [out] true iff the key is present; always written, and
//                       false on every error path so that a caller which
//                       ignores the return code never reads an indeterminate
//                       value
// @return OB_SUCCESS, OB_NOT_INIT before init(), OB_UNKNOWN_OBJ for a key type
//         without a manager, or the manager's own error code
int ObBLService::check_in_black_list(const ObBLKey &bl_key, bool &in_black_list) const
{
  int ret = OB_SUCCESS;
  // Give the out-param a defined value before any early failure.
  in_black_list = false;
  if (OB_UNLIKELY(!is_inited_)) {
    ret = OB_NOT_INIT;
    TRANS_LOG(ERROR, "BLService not inited", KR(ret));
  } else {
    switch (bl_key.get_type()) {
      case BLTYPE_LS: {
        if (OB_FAIL(ls_bl_mgr_.check_in_black_list(bl_key, in_black_list))) {
          TRANS_LOG(WARN, "ls_bl_mgr_ check failed", KR(ret), K(bl_key));
        }
        break;
      }
      default: {
        ret = OB_UNKNOWN_OBJ;
        TRANS_LOG(ERROR, "unknown blacklist key type", K(bl_key));
      }
    }
  }
  return ret;
}
void ObBLService::run1()
{
const int64_t thread_index = get_thread_idx();
int64_t last_print_stat_ts = 0;
int64_t last_clean_up_ts = 0;
lib::set_thread_name("BlackListService");
TRANS_LOG(INFO, "blacklist refresh thread start", K(thread_index));
if (OB_UNLIKELY(!is_inited_)) {
TRANS_LOG(ERROR, "BLService not inited");
} else {
while (!has_set_stop()) {
int64_t begin_tstamp = ObTimeUtility::current_time();
do_thread_task_(begin_tstamp, last_print_stat_ts, last_clean_up_ts);
int64_t end_tstamp = ObTimeUtility::current_time();
int64_t wait_interval = BLACK_LIST_REFRESH_INTERVAL - (end_tstamp - begin_tstamp);
if (wait_interval > 0) {
thread_cond_.timedwait(wait_interval);
}
}
}
TRANS_LOG(INFO, "blacklist refresh thread end");
}
// One refresh round:
//   1. query the LS info virtual table and update the blacklist from it;
//   2. track consecutive failures and clear the blacklist when the limit is hit;
//   3. periodically purge entries that have not been refreshed;
//   4. periodically dump the blacklist to the log.
//
// @param begin_tstamp        start time of this round (us)
// @param last_print_stat_ts  [in,out] time of the last dump
// @param last_clean_up_ts    [in,out] time of the last purge
void ObBLService::do_thread_task_(const int64_t begin_tstamp,
                                  int64_t &last_print_stat_ts,
                                  int64_t &last_clean_up_ts)
{
  int ret = OB_SUCCESS;
  ObSqlString sql;
  ObMySQLProxy *sql_proxy = NULL;
  sqlclient::ObMySQLResult *result = NULL;

  // Query the LS inner table; the timestamps in it decide whether a log stream
  // is put on the blacklist.
  SMART_VAR(ObISQLClient::ReadResult, res) {
    if (OB_ISNULL((sql_proxy = GCTX.sql_proxy_))) {
      ret = OB_ERR_UNEXPECTED;
      TRANS_LOG(WARN, "sql_proxy is null", KR(ret));
    } else if (OB_FAIL(sql.assign_fmt(BLACK_LIST_SELECT_LS_INFO_STMT))) {
      TRANS_LOG(WARN, "fail to append sql", KR(ret));
    } else if (OB_FAIL(sql_proxy->read(res, OB_SYS_TENANT_ID, sql.ptr()))) {
      TRANS_LOG(WARN, "fail to execute sql", KR(ret), K(sql));
    } else if (OB_ISNULL((result = res.get_result()))) {
      ret = OB_ERR_UNEXPECTED;
      TRANS_LOG(WARN, "fail to get sql result", KR(ret), K(sql));
    } else if (OB_FAIL(do_black_list_check_(result))) {
      TRANS_LOG(WARN, "fail to do black list check", KR(ret), K(sql));
    }
  }

  // After several failures in a row the blacklist is no longer trustworthy,
  // so it is cleared. The counter is a function-local static.
  static int fail_cnt = 0;
  if (OB_SUCCESS == ret) {
    fail_cnt = 0;
  } else if (++fail_cnt == BLACK_LIST_MAX_FAIL_COUNT) {
    ls_bl_mgr_.reset();
    TRANS_LOG(WARN, "failed too much times, reset blacklist ");
  }

  // Periodically purge entries that have not been updated for a long time;
  // they may in fact no longer exist.
  if (begin_tstamp > last_clean_up_ts + BLACK_LIST_CLEAN_UP_INTERVAL) {
    do_clean_up_();
    last_clean_up_ts = begin_tstamp;
  }

  // Periodically print the blacklist content.
  if (ObTimeUtility::current_time() - last_print_stat_ts > BLACK_LIST_PRINT_INTERVAL) {
    print_stat_();
    last_print_stat_ts = begin_tstamp;
  }
}
// Walks every row of the LS info query result and adjusts the blacklist:
// a non-leader log stream whose weak-read timestamp lags the current time by
// more than the tenant's max stale time is added (or refreshed); one that is
// ahead of that threshold by more than BLACK_LIST_WHITEWASH_INTERVAL_NS is
// removed; anything in between is left untouched.
//
// A failure on a single row is logged and the iteration moves on, because the
// loop condition reassigns ret on every step.
// Returns OB_SUCCESS when the iteration ended with OB_ITER_END, otherwise the
// iterator's error code.
int ObBLService::do_black_list_check_(sqlclient::ObMySQLResult *result)
{
  int ret = OB_SUCCESS;
  uint64_t max_stale_time = 0;
  const uint64_t now_ns = static_cast<uint64_t>(ObTimeUtility::current_time_ns());
  while (OB_SUCC(result->next())) {
    ObBLKey bl_key;
    ObLsInfo ls_info;
    if (OB_FAIL(get_info_from_result_(*result, bl_key, ls_info))) {
      TRANS_LOG(WARN, "get_info_from_result_ fail ", KR(ret), K(result));
    } else if (LEADER == ls_info.ls_state_) {
      // This log stream is the leader; it must not be blacklisted.
    } else if (OB_FAIL(get_tenant_max_stale_time_(bl_key.get_tenant_id(), max_stale_time))) {
      TRANS_LOG(WARN, "get_tenant_max_stale_time_ fail ", KR(ret), K(bl_key));
    } else {
      // max_stale_time is in us; the timestamps are in ns.
      const uint64_t stale_deadline_ns = ls_info.weak_read_scn_ + max_stale_time * 1000;
      if (now_ns > stale_deadline_ns) {
        // Timestamp has fallen behind: put the log stream on the blacklist.
        if (OB_FAIL(ls_bl_mgr_.update(bl_key, ls_info))) {
          TRANS_LOG(WARN, "ls_bl_mgr_ add fail ", K(bl_key), K(ls_info));
        }
      } else if (now_ns + BLACK_LIST_WHITEWASH_INTERVAL_NS < stale_deadline_ns) {
        // Timestamp has caught up: take the log stream off the blacklist.
        ls_bl_mgr_.remove(bl_key);
      }
    }
  }
  if (OB_ITER_END != ret) {
    TRANS_LOG(WARN, "get next result fail ", KR(ret));
  } else {
    ret = OB_SUCCESS;
  }
  return ret;
}
// Applies the clean-up functor to every entry of the log-stream blacklist and
// returns the traversal's result code.
int ObBLService::do_clean_up_()
{
  ObBLCleanUpFunctor<ObLsBLMgr> clean_up_functor(ls_bl_mgr_);
  int ret = ls_bl_mgr_.for_each(clean_up_functor);
  if (OB_SUCCESS != ret) {
    TRANS_LOG(WARN, "clean up blacklist fail ", KR(ret));
  }
  return ret;
}
// Decodes the current row of the LS info query into a blacklist key and the
// associated log-stream info.
//
// @param result   result set positioned on the row to decode
// @param bl_key   [out] initialized from svr_ip / svr_port / tenant_id / ls_id
// @param ls_info  [out] initialized from ls_state / weak_read_scn / migrate_status
// @return OB_SUCCESS, OB_ERR_UNEXPECTED for an unusable server address, or the
//         error code of the first failing conversion / init step
int ObBLService::get_info_from_result_(sqlclient::ObMySQLResult &result, ObBLKey &bl_key, ObLsInfo &ls_info)
{
  int ret = OB_SUCCESS;
  ObString ip;
  int64_t port = 0;
  int64_t tenant_id = 0;
  int64_t id = ObLSID::INVALID_LS_ID;
  ObString ls_state_str;
  uint64_t weak_read_scn = 0;
  int64_t migrate_status_int = -1;

  (void)GET_COL_IGNORE_NULL(result.get_varchar, "svr_ip", ip);
  (void)GET_COL_IGNORE_NULL(result.get_int, "svr_port", port);
  (void)GET_COL_IGNORE_NULL(result.get_int, "tenant_id", tenant_id);
  (void)GET_COL_IGNORE_NULL(result.get_int, "ls_id", id);
  (void)GET_COL_IGNORE_NULL(result.get_varchar, "ls_state", ls_state_str);
  (void)GET_COL_IGNORE_NULL(result.get_uint, "weak_read_scn", weak_read_scn);
  (void)GET_COL_IGNORE_NULL(result.get_int, "migrate_status", migrate_status_int);

  ObLSID ls_id(id);
  common::ObAddr server;
  ObRole ls_state = INVALID_ROLE;
  ObMigrateStatus migrate_status = ObMigrateStatus(migrate_status_int);

  // NOTE(review): GET_COL_IGNORE_NULL is defined outside this file; it appears
  // to leave a non-NULL-value read failure in `ret`. Check it here, because the
  // OB_FAIL() calls below would otherwise overwrite that code. If the macro
  // never touches `ret`, this branch is simply never taken.
  if (OB_SUCCESS != ret) {
    TRANS_LOG(WARN, "fail to read column from result", KR(ret));
  } else if (false == server.set_ip_addr(ip, static_cast<uint32_t>(port))) {
    ret = OB_ERR_UNEXPECTED;
    TRANS_LOG(WARN, "invalid server address", KR(ret), K(ip), K(port));
  } else if (OB_FAIL(string_to_role(ls_state_str, ls_state))) {
    TRANS_LOG(WARN, "string_to_role fail", KR(ret), K(ls_state_str));
  } else if (OB_FAIL(bl_key.init(server, tenant_id, ls_id))) {
    TRANS_LOG(WARN, "bl_key init fail", KR(ret), K(server), K(tenant_id), K(ls_id));
  } else if (OB_FAIL(ls_info.init(ls_state, weak_read_scn, migrate_status))) {
    TRANS_LOG(WARN, "ls_info init fail", KR(ret), K(ls_state), K(weak_read_scn), K(migrate_status));
  }
  return ret;
}
// Fetches the tenant's max stale time for weak-consistency reads.
//
// @param tenant_id       tenant to look up
// @param max_stale_time  [out] the configured value, or 0 on failure
// @return OB_SUCCESS, or OB_ERR_UNEXPECTED when the looked-up value is not positive
int ObBLService::get_tenant_max_stale_time_(uint64_t tenant_id, uint64_t &max_stale_time)
{
  int ret = OB_SUCCESS;
  const int64_t max_stale_time_int =
      ObWeakReadUtil::max_stale_time_for_weak_consistency(tenant_id,
                                                          ObWeakReadUtil::IGNORE_TENANT_EXIST_WARN);
  if (max_stale_time_int > 0) {
    max_stale_time = static_cast<uint64_t>(max_stale_time_int);
  } else {
    ret = OB_ERR_UNEXPECTED;
    max_stale_time = 0;
  }
  return ret;
}
// Logs a header line, then every entry of the log-stream blacklist. The
// traversal's return code is deliberately discarded, as before.
void ObBLService::print_stat_()
{
  TRANS_LOG(INFO, "start to print blacklist info");
  ObBLPrintFunctor print_fn;
  (void)ls_bl_mgr_.for_each(print_fn);
}
} // transaction
} // oceanbase

View File

@ -0,0 +1,410 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#ifndef OCEANBASE_TRANSACTION_OB_BLACK_LIST_
#define OCEANBASE_TRANSACTION_OB_BLACK_LIST_
#include "common/ob_queue_thread.h" // ObCond
#include "common/ob_role.h" // ObRole
#include "storage/tx/ob_trans_define.h" // ObLSID, LinkHashNode
#include "storage/ob_storage_struct.h" // ObMigrateStatus
// Interval between two blacklist refresh rounds (us)
#define BLACK_LIST_REFRESH_INTERVAL 1000000 // 1s
// Buffer (ns) applied when judging whether a timestamp has caught up or fallen behind,
// so that a log stream near the threshold is not repeatedly added to / removed from the blacklist
#define BLACK_LIST_WHITEWASH_INTERVAL_NS 1000000000 // 1s
// Interval between two dumps of the blacklist content (us)
#define BLACK_LIST_PRINT_INTERVAL 10000000 // 10s
// Interval for cleaning up timed-out entries (us); such entries no longer appear in the
// SQL result, e.g. log streams on the old server after a server switch
#define BLACK_LIST_CLEAN_UP_INTERVAL 5000000 // 5s
// Maximum number of consecutive failures: once refreshing the blacklist has failed this
// many times in a row, the blacklist is cleared
#define BLACK_LIST_MAX_FAIL_COUNT 3
// Statement used to query __all_virtual_ls_info, with a 2s query timeout
#define BLACK_LIST_SELECT_LS_INFO_STMT \
"select /*+query_timeout(2000000)*/ svr_ip, svr_port, tenant_id, ls_id, ls_state, \
weak_read_scn, migrate_status from oceanbase.__all_virtual_ls_info;"
namespace oceanbase
{
using namespace common;
using namespace share;
using namespace storage;
namespace transaction
{
// Kind of object a blacklist key refers to. BLTYPE_MAX is the upper bound of
// the range, not a real type.
enum BLType
{
  BLTYPE_UNKNOWN = 0,
  BLTYPE_SERVER = 1,
  BLTYPE_LS = 2,
  BLTYPE_MAX = 3
};
// blacklist key
class ObBLKey
{
public:
ObBLKey() : server_(), tenant_id_(OB_INVALID_TENANT_ID), ls_id_(ObLSID::INVALID_LS_ID) {}
~ObBLKey() { reset(); }
int init(const ObAddr &server, const uint64_t tenant_id, const ObLSID &ls_id)
{
int ret = OB_SUCCESS;
if (!server.is_valid() || OB_INVALID_TENANT_ID == tenant_id || !ls_id.is_valid()) {
ret = OB_INVALID_ARGUMENT;
} else {
server_ = server;
tenant_id_ = tenant_id;
ls_id_ = ls_id;
}
return ret;
}
void reset()
{
server_.reset();
tenant_id_ = OB_INVALID_TENANT_ID;
ls_id_ = ObLSID::INVALID_LS_ID;
}
uint64_t hash() const {
uint64_t hash_val = 0;
int64_t server_hash = server_.hash();
uint64_t ls_hash = ls_id_.hash();
hash_val = murmurhash(&server_hash, sizeof(int64_t), 0);
hash_val = murmurhash(&tenant_id_, sizeof(uint64_t), hash_val);
hash_val = murmurhash(&ls_hash, sizeof(uint64_t), hash_val);
return hash_val;
}
int compare(const ObBLKey &other) const
{
int ret = 0;
uint64_t hash_1 = hash();
uint64_t hash_2 = other.hash();
if (hash_1 > hash_2) {
ret = 1;
} else if (hash_1 < hash_2) {
ret = -1;
}
return ret;
}
uint64_t get_tenant_id() const { return tenant_id_; }
const ObLSID &get_ls_id() const { return ls_id_; }
// TODO: different keys return different types, default return BLTYPE_UNKNOWN
BLType get_type() const { return BLType::BLTYPE_LS; }
bool is_valid() const {
return server_.is_valid() && OB_INVALID_TENANT_ID != tenant_id_ && ls_id_.is_valid();
}
TO_STRING_KV(K_(server), K_(tenant_id), K_(ls_id));
private:
ObAddr server_;
uint64_t tenant_id_;
ObLSID ls_id_;
};
typedef LinkHashNode<ObBLKey> ObBlackListHashNode;
typedef LinkHashValue<ObBLKey> ObBlackListHashValue;
// Per-log-stream information read from the LS info query and stored alongside
// a blacklist entry.
struct ObLsInfo
{
public:
  ObLsInfo()
    : ls_state_(INVALID_ROLE),
      weak_read_scn_(0),
      migrate_status_(OB_MIGRATE_STATUS_MAX)
  {}

  // Fills in all fields. Rejects OB_MIGRATE_STATUS_MAX with OB_INVALID_ARGUMENT
  // and leaves the object untouched in that case.
  int init(ObRole ls_state, uint64_t weak_read_scn, ObMigrateStatus migrate_status)
  {
    int ret = OB_SUCCESS;
    if (OB_MIGRATE_STATUS_MAX != migrate_status) {
      ls_state_ = ls_state;
      weak_read_scn_ = weak_read_scn;
      migrate_status_ = migrate_status;
    } else {
      ret = OB_INVALID_ARGUMENT;
    }
    return ret;
  }

  // An object is valid once it carries a real migrate status.
  bool is_valid() const
  {
    return !(OB_MIGRATE_STATUS_MAX == migrate_status_);
  }

  TO_STRING_KV(K_(ls_state), K_(weak_read_scn), K_(migrate_status));

  // Log stream state (role): LEADER or FOLLOWER; other roles are meaningless
  // for a log stream
  ObRole ls_state_;
  // Weak-read timestamp, in ns; a log stream lagging behind by more than a
  // certain time is added to the blacklist
  uint64_t weak_read_scn_;
  // Migration status; a log stream that is being migrated is never readable
  ObMigrateStatus migrate_status_;
};
// blacklist value
class ObBLValue : public ObBlackListHashValue
{
public:
ObBLValue() {}
int init(const ObBLKey &key)
{
int ret = OB_SUCCESS;
if (!key.is_valid()) {
ret = OB_INVALID_ARGUMENT;
} else {
key_ = key;
update_ts_ = ObTimeUtility::current_time();
}
return ret;
}
int init(const ObBLKey &key, const ObLsInfo ls_info)
{
int ret = OB_SUCCESS;
if (!key.is_valid() || !ls_info.is_valid()) {
ret = OB_INVALID_ARGUMENT;
} else {
key_ = key;
ls_info_ = ls_info;
update_ts_ = ObTimeUtility::current_time();
}
return ret;
}
int update(const ObLsInfo &ls_info)
{
int ret = OB_SUCCESS;
if (!ls_info.is_valid()) {
ret = OB_INVALID_ARGUMENT;
} else {
ls_info_ = ls_info;
update_ts_ = ObTimeUtility::current_time();
}
return ret;
}
int64_t get_update_ts()
{
return update_ts_;
}
TO_STRING_KV(K_(key), K_(ls_info));
private:
ObBLKey key_;
// ls相关信息
ObLsInfo ls_info_;
// 当前对象最后一次init/update的时间(us)
int64_t update_ts_;
};
// Allocator policy handed to the blacklist hash map: values and nodes are
// obtained through op_alloc and released through op_free.
class ObBlackListAlloc
{
public:
  ObBLValue *alloc_value()
  {
    return op_alloc(ObBLValue);
  }

  // Releasing NULL is a no-op.
  void free_value(ObBLValue *val)
  {
    if (!(NULL == val)) {
      op_free(val);
    }
  }

  // The value is not needed to allocate its node.
  ObBlackListHashNode* alloc_node(ObBLValue *val)
  {
    UNUSED(val);
    return op_alloc(ObBlackListHashNode);
  }

  // Releasing NULL is a no-op.
  void free_node(ObBlackListHashNode *node)
  {
    if (!(NULL == node)) {
      op_free(node);
    }
  }
};
// blacklist manager
template<typename BLKey, typename BLValue>
class ObBLMgr
{
public:
ObBLMgr() {}
~ObBLMgr() { destroy(); }
int init()
{
int ret = OB_SUCCESS;
if (OB_FAIL(map_.init())) {
TRANS_LOG(ERROR, "BLMgr init failed", KR(ret));
}
return ret;
}
void reset()
{
map_.reset();
}
void destroy()
{
map_.destroy();
}
// create and insert
int add(const BLKey &bl_key)
{
int ret = OB_SUCCESS;
BLValue *value = NULL;
if (!bl_key.is_valid()) {
ret = OB_INVALID_ARGUMENT;
TRANS_LOG(WARN, "bl_key is invalid", KR(ret), K(bl_key));
} else if (OB_FAIL(map_.create(bl_key, value))) {
TRANS_LOG(WARN, "map create error", KR(ret), K(bl_key));
} else {
if (OB_FAIL(value->init(bl_key))) {
TRANS_LOG(WARN, "value init error", KR(ret), K(bl_key), KPC(value));
map_.del(bl_key);
}
map_.revert(value);
}
return ret;
}
// update or create
int update(const BLKey &bl_key, const ObLsInfo &ls_info)
{
int ret = OB_SUCCESS;
BLValue *value = NULL;
if (!bl_key.is_valid() || !ls_info.is_valid()) {
ret = OB_INVALID_ARGUMENT;
TRANS_LOG(WARN, "bl_key is invalid", KR(ret), K(bl_key), K(ls_info));
} else if (OB_FAIL(map_.get(bl_key, value)) && OB_ENTRY_NOT_EXIST != ret) {
TRANS_LOG(WARN, "map get error", KR(ret), K(bl_key), K(ls_info));
} else if (OB_ENTRY_NOT_EXIST == ret) {
// key不存在,创建value并插入map
if (OB_FAIL(map_.create(bl_key, value))) {
// 可能前面get时还没有这个key,但是在create之前别的线程把这个key插入map了
TRANS_LOG(WARN, "map create error", KR(ret), K(bl_key), K(ls_info));
} else {
if (OB_FAIL(value->init(bl_key, ls_info))) {
TRANS_LOG(WARN, "value init error", KR(ret), KPC(value), K(bl_key), K(ls_info));
map_.del(bl_key);
}
map_.revert(value);
}
// key已存在,直接更新value
} else if (OB_FAIL(value->update(ls_info))) {
// 只要get成功,就要减去value的引用计数
map_.revert(value);
TRANS_LOG(WARN, "value update error", KR(ret), KPC(value), K(bl_key), K(ls_info));
} else {
map_.revert(value);
}
return ret;
}
void remove(const BLKey &bl_key)
{
if (bl_key.is_valid()) {
map_.del(bl_key);
}
}
int check_in_black_list(const BLKey &bl_key, bool &in_black_list) const
{
int ret = OB_SUCCESS;
if (!bl_key.is_valid()) {
ret = OB_INVALID_ARGUMENT;
} else if (OB_ENTRY_EXIST == map_.contains_key(bl_key)) {
in_black_list = true;
} else {
in_black_list = false;
}
return ret;
}
template <typename Function> int for_each(Function &fn)
{
return map_.for_each(fn);
}
private:
typedef ObLinkHashMap<BLKey, BLValue, ObBlackListAlloc, RefHandle, 128> BLHashMap;
BLHashMap map_;
};
typedef ObBLMgr<ObBLKey, ObBLValue> ObLsBLMgr;
// Blacklist service: owns the log-stream blacklist manager and, as an
// ObThreadPool, the thread that refreshes it (see run1()).
// Accessed as a process-wide singleton through get_instance().
class ObBLService : public ObThreadPool
{
public:
ObBLService() : is_inited_(false), is_running_(false), thread_cond_() {}
~ObBLService() { destroy(); }
// Lifecycle: init() -> start() -> stop() -> wait() -> destroy().
// reset() returns the object to the pre-init state so init() can be called again.
int init();
void reset();
void destroy();
int start();
void stop();
void wait();
// Blacklist operations; add() and check_in_black_list() return OB_NOT_INIT
// before init(), remove() only logs in that case.
int add(const ObBLKey &bl_key);
void remove(const ObBLKey &bl_key);
int check_in_black_list(const ObBLKey &bl_key, bool &in_black_list) const;
// Thread body of the refresh thread.
void run1();
TO_STRING_KV(K_(is_inited), K_(is_running));
public:
// Returns the singleton, a function-local static.
static ObBLService &get_instance()
{
static ObBLService instance_;
return instance_;
}
private:
// One refresh round: query, update blacklist, periodic clean-up and print.
void do_thread_task_(const int64_t begin_tstamp, int64_t &last_print_stat_ts, int64_t &last_clean_up_ts);
// Updates the blacklist from every row of the query result.
int do_black_list_check_(sqlclient::ObMySQLResult *result);
// Removes entries that have not been refreshed recently.
int do_clean_up_();
// Decodes the current result row into a key and log-stream info.
int get_info_from_result_(sqlclient::ObMySQLResult &result, ObBLKey &bl_key, ObLsInfo &ls_info);
// Looks up the tenant's max stale time for weak-consistency reads.
int get_tenant_max_stale_time_(uint64_t tenant_id, uint64_t &max_stale_time);
// Logs every blacklist entry.
void print_stat_();
private:
bool is_inited_;
bool is_running_;
// Used by run1() for the timed wait between two refresh rounds.
common::ObCond thread_cond_;
ObLsBLMgr ls_bl_mgr_;
};
class ObBLPrintFunctor
{
public:
explicit ObBLPrintFunctor() {}
bool operator()(const ObBLKey &key, ObBLValue *value)
{
TRANS_LOG(INFO, "blacklist info print ", KPC(value));
return true;
}
};
template<typename BLMgr>
class ObBLCleanUpFunctor
{
public:
explicit ObBLCleanUpFunctor(BLMgr &bl_mgr) : bl_mgr_(bl_mgr) {}
bool operator()(const ObBLKey &key, ObBLValue *value)
{
if (ObTimeUtility::current_time() > value->get_update_ts() + BLACK_LIST_CLEAN_UP_INTERVAL) {
bl_mgr_.remove(key);
}
return true;
}
private:
BLMgr &bl_mgr_;
};
} // transaction
} // oceanbase
#endif // end of OCEANBASE_TRANSACTION_OB_BLACK_LIST_

View File

@ -97,8 +97,7 @@ int ObTxCtxTableRecoverHelper::recover_one_tx_ctx_(transaction::ObLSTxCtxMgr* ls
transaction::ObPartTransCtx *tx_ctx = NULL;
bool tx_ctx_existed = true;
common::ObAddr scheduler;
transaction::ObTxCreateArg arg(false, /* can_elr */
true, /* for_replay */
transaction::ObTxCreateArg arg(true, /* for_replay */
MTL_ID(),
ctx_info.tx_id_,
ctx_info.ls_id_,

View File

@ -34,6 +34,7 @@ tx_unittest(test_simple_tx_ctx)
tx_unittest(test_ls_log_writer)
tx_unittest(test_ob_trans_hashmap)
storage_unittest(test_ob_black_list)
storage_unittest(test_ob_tx_log)
storage_unittest(test_ob_timestamp_service)
storage_unittest(test_ob_trans_rpc)

View File

@ -0,0 +1,289 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#include <gtest/gtest.h>
#include <thread>
#define protected public
#define private public
#include "storage/tx/wrs/ob_black_list.h"
namespace oceanbase
{
using namespace common;
using namespace storage;
using namespace transaction;
namespace unittest
{
class TestObBlackList : public ::testing::Test
{
public :
virtual void SetUp() {}
virtual void TearDown() {}
};
// Basic life cycle on the singleton service: add / check / update / remove one
// key, then reset + re-init and repeat add / remove for 100 keys.
TEST_F(TestObBlackList, black_list_init_invalid)
{
  TRANS_LOG(INFO, "called", "func", test_info_->name());

  // Bring the service up and build one valid key.
  ObBLService &bl_service = ObBLService::get_instance();
  EXPECT_EQ(OB_SUCCESS, bl_service.init());
  ObAddr addr(ObAddr::IPV4, "127.0.0.1", 2077);
  ObLSID ls_id(2);
  ObBLKey key;
  EXPECT_EQ(OB_SUCCESS, key.init(addr, 1006, ls_id));

  // After add(), the key is reported as blacklisted.
  EXPECT_EQ(OB_SUCCESS, bl_service.add(key));
  bool check;
  EXPECT_EQ(OB_SUCCESS, bl_service.check_in_black_list(key, check));
  EXPECT_EQ(true, check);

  // Updating an existing entry keeps it blacklisted.
  ObLsInfo ls_info;
  const uint64_t curr_time = static_cast<uint64_t>(ObTimeUtility::current_time());
  EXPECT_EQ(OB_SUCCESS, ls_info.init(FOLLOWER, curr_time, OB_MIGRATE_STATUS_NONE));
  EXPECT_EQ(OB_SUCCESS, bl_service.ls_bl_mgr_.update(key, ls_info));
  EXPECT_EQ(OB_SUCCESS, bl_service.check_in_black_list(key, check));
  EXPECT_EQ(true, check);

  // After remove(), the key is gone.
  bl_service.remove(key);
  EXPECT_EQ(OB_SUCCESS, bl_service.check_in_black_list(key, check));
  EXPECT_EQ(false, check);

  // reset() followed by init() yields a usable service again.
  bl_service.reset();
  EXPECT_EQ(OB_SUCCESS, bl_service.init());

  // Add 100 keys, checking each one right after insertion.
  for (int ls = 1; ls <= 100; ++ls) {
    EXPECT_EQ(OB_SUCCESS, key.init(addr, 1006, ObLSID(ls)));
    EXPECT_EQ(OB_SUCCESS, bl_service.add(key));
    EXPECT_EQ(OB_SUCCESS, bl_service.check_in_black_list(key, check));
    EXPECT_EQ(true, check);
  }

  // Remove the same 100 keys, checking each one right after removal.
  for (int ls = 1; ls <= 100; ++ls) {
    EXPECT_EQ(OB_SUCCESS, key.init(addr, 1006, ObLSID(ls)));
    bl_service.remove(key);
    EXPECT_EQ(OB_SUCCESS, bl_service.check_in_black_list(key, check));
    EXPECT_EQ(false, check);
  }

  bl_service.destroy();
}
// Error paths: use before init, double init, double add, double remove,
// double destroy, and use after destroy.
TEST_F(TestObBlackList, black_list_error)
{
  TRANS_LOG(INFO, "called", "func", test_info_->name());

  // A valid key to work with.
  ObAddr addr(ObAddr::IPV4, "127.0.0.1", 2077);
  ObLSID ls_id(2);
  ObBLKey key;
  EXPECT_EQ(OB_SUCCESS, key.init(addr, 1006, ls_id));

  // error: insert before init
  ObBLService &bl_service = ObBLService::get_instance();
  EXPECT_EQ(OB_NOT_INIT, bl_service.add(key));

  // error: init twice
  EXPECT_EQ(OB_SUCCESS, bl_service.init());
  EXPECT_EQ(OB_INIT_TWICE, bl_service.init());

  // error: add twice
  EXPECT_EQ(OB_SUCCESS, bl_service.add(key));
  EXPECT_EQ(OB_ENTRY_EXIST, bl_service.add(key));

  // The key is still present after the rejected second add.
  bool check;
  EXPECT_EQ(OB_SUCCESS, bl_service.check_in_black_list(key, check));
  EXPECT_EQ(true, check);

  // Removing twice is harmless, and the key is gone afterwards.
  bl_service.remove(key);
  bl_service.remove(key);
  EXPECT_EQ(OB_SUCCESS, bl_service.check_in_black_list(key, check));
  EXPECT_EQ(false, check);

  // Destroying twice is harmless.
  bl_service.destroy();
  bl_service.destroy();

  // error: access after destroy
  EXPECT_EQ(OB_NOT_INIT, bl_service.add(key));
  bl_service.remove(key);
  EXPECT_EQ(OB_NOT_INIT, bl_service.check_in_black_list(key, check));
}
// Internal helpers: print, clean-up on fresh entries (nothing removed), and a
// full refresh round after the clean-up interval has passed (all removed).
TEST_F(TestObBlackList, black_list_inc_func)
{
  TRANS_LOG(INFO, "called", "func", test_info_->name());

  ObBLService &bl_service = ObBLService::get_instance();
  EXPECT_EQ(OB_SUCCESS, bl_service.init());
  ObAddr addr(ObAddr::IPV4, "127.0.0.1", 2077);
  ObBLKey key;
  bool check = false;

  // Add 100 keys, checking each one right after insertion.
  for (int ls = 1; ls <= 100; ++ls) {
    EXPECT_EQ(OB_SUCCESS, key.init(addr, 1006, ObLSID(ls)));
    EXPECT_EQ(OB_SUCCESS, bl_service.add(key));
    EXPECT_EQ(OB_SUCCESS, bl_service.check_in_black_list(key, check));
    EXPECT_EQ(true, check);
  }

  // Printing does not change the content.
  bl_service.print_stat_();
  EXPECT_EQ(100, bl_service.ls_bl_mgr_.map_.size());

  // Entries are fresh, so a clean-up pass keeps all of them.
  bl_service.do_clean_up_();
  EXPECT_EQ(100, bl_service.ls_bl_mgr_.map_.size());

  // Let the clean-up interval (5s) elapse.
  int64_t clean_ts = ObTimeUtility::current_time();
  usleep(BLACK_LIST_CLEAN_UP_INTERVAL);

  // A full round now triggers the clean-up, records its time and drops every
  // entry.
  const int64_t curr_ts = ObTimeUtility::current_time();
  int64_t print_ts = curr_ts - 1;
  bl_service.do_thread_task_(curr_ts, print_ts, clean_ts);
  EXPECT_EQ(curr_ts, clean_ts);
  EXPECT_EQ(0, bl_service.ls_bl_mgr_.map_.size());

  bl_service.destroy();
}
// Concurrency test: ten workers each add their own range of keys while
// removing keys from the preceding worker's range.
TEST_F(TestObBlackList, black_list_parallel_1)
{
  TRANS_LOG(INFO, "called", "func", test_info_->name());

  ObAddr addr(ObAddr::IPV4, "127.0.0.1", 2077);
  ObBLService &bl_service = ObBLService::get_instance();
  EXPECT_EQ(OB_SUCCESS, bl_service.init());

  uint64_t loop_cnt = 10000;
  uint64_t worker_cnt = 10;
  std::vector<std::thread> ths;

  auto do_worker = [&] (uint64_t id) {
    ObBLKey key;
    // First ls id (exclusive) of this worker's own range.
    const uint64_t base_id = id * loop_cnt;
    // Same for the previous worker; the first worker's predecessor is the last
    // worker.
    const uint64_t pre_id = ((id + worker_cnt - 1) % worker_cnt) * loop_cnt;
    for (uint64_t off = 1; off <= loop_cnt; ++off) {
      ObLSID ls_id(base_id + off);
      EXPECT_EQ(OB_SUCCESS, key.init(addr, 1006, ls_id));
      EXPECT_EQ(OB_SUCCESS, bl_service.add(key));

      ObLSID ls_id_2(pre_id + off);
      EXPECT_EQ(OB_SUCCESS, key.init(addr, 1006, ls_id_2));
      bl_service.remove(key);
    }
  };

  for (uint64_t w = 0; w < worker_cnt; ++w) {
    ths.emplace_back(do_worker, w);
  }
  for (auto &th : ths) {
    th.join();
  }

  bl_service.destroy();
}
// Concurrency test: ten workers issue a random mix of add / update / remove /
// check operations on a shared set of keys. Only check_in_black_list() is
// asserted on; the other operations may legitimately fail depending on the
// interleaving.
TEST_F(TestObBlackList, black_list_parallel_2)
{
  TRANS_LOG(INFO, "called", "func", test_info_->name());
  // init
  ObAddr addr(ObAddr::IPV4, "127.0.0.1", 2077);
  ObBLService &bl_service = ObBLService::get_instance();
  EXPECT_EQ(OB_SUCCESS, bl_service.init());
  // parallel test
  std::vector<std::thread> ths;
  uint64_t loop_cnt = 10000;
  uint64_t worker_cnt = 10;
  auto do_worker = [&] (uint64_t id) {
    int rand = 0;
    // The result flag is local to each worker. It used to be one variable
    // captured by reference and written by all ten threads at once, which is a
    // data race.
    bool check = false;
    ObBLKey key;
    ObLsInfo ls_info;
    EXPECT_EQ(OB_SUCCESS, ls_info.init(FOLLOWER, 1, OB_MIGRATE_STATUS_NONE));
    // NOTE(review): std::srand / std::rand are shared across the workers and
    // are not guaranteed to be thread-safe; kept as-is because the test only
    // needs an arbitrary operation mix.
    std::srand((unsigned)std::time(NULL));
    for (uint64_t i = 1; i <= loop_cnt; i++) {
      rand = std::rand() % 1000 + 1;
      ObLSID ls_id(std::rand() % loop_cnt + 1);
      EXPECT_EQ(OB_SUCCESS, key.init(addr, 1006, ls_id));
      if (rand <= 300) {
        bl_service.add(key);
      } else if (rand <= 600) {
        bl_service.ls_bl_mgr_.update(key, ls_info);
      } else if (rand <= 800) {
        bl_service.remove(key);
      } else {
        EXPECT_EQ(OB_SUCCESS, bl_service.check_in_black_list(key, check));
      }
    }
  };
  for (uint64_t i = 0; i < worker_cnt; i++) {
    std::thread th(do_worker, i);
    ths.push_back(std::move(th));
  }
  for (auto &th : ths) {
    th.join();
  }
  // destroy
  bl_service.destroy();
}
} // unittest
} // oceanbase
using namespace oceanbase;
using namespace oceanbase::common;
// Test entry point: routes logging to test_ob_black_list.log at INFO level,
// then runs every registered test and returns gtest's result.
int main(int argc, char **argv)
{
  ObLogger &logger = ObLogger::get_logger();
  logger.set_file_name("test_ob_black_list.log", true);
  logger.set_log_level(OB_LOG_LEVEL_INFO);

  testing::InitGoogleTest(&argc, argv);
  return RUN_ALL_TESTS();
}