Fix: ob_last_schema_version need sync

This commit is contained in:
yaojing624
2023-07-03 04:12:27 +00:00
committed by ob-robot
parent 2f62b5283b
commit d72f24221d
5 changed files with 233 additions and 17 deletions

View File

@ -56,6 +56,8 @@ int ObMPUtils::add_changed_session_info(OMPKOK &ok_pkt, sql::ObSQLSessionInfo &s
encoder->is_changed_ = true;
}
}
// record sys var need sync in error scene.
bool is_exist_error_sync_var = false;
for (int64_t i = 0; OB_SUCC(ret) && i < sys_var.count(); ++i) {
sql::ObBasicSessionInfo::ChangedVar change_var = sys_var.at(i);
ObObj new_val;
@ -73,15 +75,29 @@ int ObMPUtils::add_changed_session_info(OMPKOK &ok_pkt, sql::ObSQLSessionInfo &s
LOG_WARN("failed to get sys vairable new value string", K(ret), K(new_val));
} else if (OB_FAIL(ok_pkt.add_system_var(str_kv))) {
LOG_WARN("failed to add system variable", K(str_kv), K(ret));
} else if (session.is_exist_error_sync_var(change_var.id_) && FALSE_IT(is_exist_error_sync_var = true)) {
// do nothing.
} else {
if (is_exist_error_sync_var) {
ObSessInfoEncoder* encoder = NULL;
if (OB_FAIL(session.get_sess_encoder(SESSION_SYNC_ERROR_SYS_VAR, encoder))) {
LOG_WARN("failed to get session encoder", K(ret));
} else {
encoder->is_changed_ = true;
is_exist_error_sync_var = false;
}
}
if (OB_FAIL(ret)) {
} else {
#ifndef NDEBUG
LOG_TRACE("success add system var to ok pack", K(str_kv), K(change_var), K(new_val),
LOG_TRACE("success add system var to ok pack", K(str_kv), K(change_var), K(new_val),
K(session.get_sessid()), K(session.get_proxy_sessid()));
#else
// for autocommit change record.
LOG_INFO("success add system var to ok pack", K(str_kv), K(change_var), K(new_val),
// for autocommit change record.
LOG_INFO("success add system var to ok pack", K(str_kv), K(change_var), K(new_val),
K(session.get_sessid()), K(session.get_proxy_sessid()), K(change_var.id_));
#endif
}
}
} else {
LOG_TRACE("sys var not actully changed", K(changed), K(change_var), K(new_val),

View File

@ -3755,6 +3755,31 @@ bool ObBasicSessionInfo::is_sync_sys_var(share::ObSysVarClassType sys_var_id) co
return not_need_serialize;
}
bool ObBasicSessionInfo::is_exist_error_sync_var(share::ObSysVarClassType sys_var_id) const
{
bool is_exist = false;
switch (sys_var_id)
{
case SYS_VAR_OB_LAST_SCHEMA_VERSION:
is_exist = true;
break;
default:
break;
}
return is_exist;
}
int ObBasicSessionInfo::get_error_sync_sys_vars(ObIArray<share::ObSysVarClassType>
&sys_var_delta_ids) const
{
int ret = OB_SUCCESS;
sys_var_delta_ids.reset();
if (OB_FAIL(sys_var_delta_ids.push_back(SYS_VAR_OB_LAST_SCHEMA_VERSION))) {
LOG_WARN("fail to push_back id", K(ret));
}
return ret;
}
int ObBasicSessionInfo::get_sync_sys_vars(ObIArray<ObSysVarClassType>
&sys_var_delta_ids) const
{
@ -3791,16 +3816,16 @@ int ObBasicSessionInfo::get_sync_sys_vars(ObIArray<ObSysVarClassType>
K(sessid_), K(proxy_sessid_));
}
}
if (sys_var_delta_ids.count() == 0) {
if (OB_FAIL(sys_var_delta_ids.push_back(ids.at(0)))) {
LOG_WARN("fail to push_back id", K(ret));
} else {
LOG_TRACE("success to get default sync sys vars", K(ret), K(sys_var_delta_ids),
K(sessid_), K(proxy_sessid_));
}
}
}
if (sys_var_delta_ids.count() == 0) {
if (OB_FAIL(sys_var_delta_ids.push_back(ids.at(0)))) {
LOG_WARN("fail to push_back id", K(ret));
} else {
LOG_TRACE("success to get default sync sys vars", K(ret), K(sys_var_delta_ids),
K(sessid_), K(proxy_sessid_));
}
}
return ret;
}
@ -3842,8 +3867,22 @@ int ObBasicSessionInfo::serialize_sync_sys_vars(ObIArray<ObSysVarClassType>
return ret;
}
int ObBasicSessionInfo::deserialize_sync_sys_vars(int64_t &deserialize_sys_var_count,
int ObBasicSessionInfo::deserialize_sync_error_sys_vars(int64_t &deserialize_sys_var_count,
const char *buf, const int64_t &data_len, int64_t &pos)
{
int ret = OB_SUCCESS;
// add is_error_sync for Distinguish between system variable synchronization and
// error scenarios that also require synchronization of system variables
bool is_error_sync = true;
if (OB_FAIL(deserialize_sync_sys_vars(deserialize_sys_var_count, buf, data_len, pos, is_error_sync))) {
LOG_WARN("failed to deserialize sys var delta", K(ret), K(deserialize_sys_var_count),
KPHEX(buf+pos, data_len-pos), K(data_len-pos), K(pos));
}
return ret;
}
int ObBasicSessionInfo::deserialize_sync_sys_vars(int64_t &deserialize_sys_var_count,
const char *buf, const int64_t &data_len, int64_t &pos, bool is_error_sync)
{
int ret = OB_SUCCESS;
LOG_TRACE("before deserialize sync sys vars", "inc var ids", sys_var_inc_info_.get_all_sys_var_ids(),
@ -3881,7 +3920,7 @@ int ObBasicSessionInfo::deserialize_sync_sys_vars(int64_t &deserialize_sys_var_c
}
} else if (OB_FAIL(create_sys_var(sys_var_id, store_idx, sys_var))) {
LOG_WARN("fail to create sys var", K(sys_var_id), K(ret));
} else if (!sys_var_inc_info_.all_has_sys_var_id(sys_var_id) &&
} else if (!is_error_sync && !sys_var_inc_info_.all_has_sys_var_id(sys_var_id) &&
OB_FAIL(sys_var_inc_info_.add_sys_var_id(sys_var_id))) {
LOG_WARN("fail to add sys var id", K(sys_var_id), K(ret));
} else if (OB_ISNULL(sys_var)) {
@ -3902,11 +3941,11 @@ int ObBasicSessionInfo::deserialize_sync_sys_vars(int64_t &deserialize_sys_var_c
K(sessid_), K(proxy_sessid_));
}
// add all deserialize sys_var id.
if (OB_SUCC(ret) && OB_FAIL(tmp_sys_var_inc_info.add_sys_var_id(sys_var_id))) {
if (OB_SUCC(ret) && !is_error_sync && OB_FAIL(tmp_sys_var_inc_info.add_sys_var_id(sys_var_id))) {
LOG_WARN("fail to add sys var id", K(sys_var_id), K(ret));
}
}
if (OB_SUCC(ret)) {
if (OB_SUCC(ret) && !is_error_sync) {
if (OB_FAIL(sync_default_sys_vars(sys_var_inc_info_, tmp_sys_var_inc_info,
is_influence_plan_cache_sys_var))) {
LOG_WARN("fail to sync default sys vars",K(ret));

View File

@ -1205,11 +1205,14 @@ public:
// for SESSION_SYNC_SYS_VAR serialize and deserialize.
int serialize_sync_sys_vars(common::ObIArray<share::ObSysVarClassType> &sys_var_delta_ids, char *buf, const int64_t &buf_len, int64_t &pos);
int deserialize_sync_sys_vars(int64_t &deserialize_sys_var_count, const char *buf, const int64_t &data_len, int64_t &pos);
int deserialize_sync_sys_vars(int64_t &deserialize_sys_var_count, const char *buf, const int64_t &data_len, int64_t &pos, bool is_error_sync = false);
int deserialize_sync_error_sys_vars(int64_t &deserialize_sys_var_count, const char *buf, const int64_t &data_len, int64_t &pos);
int sync_default_sys_vars(SysVarIncInfo sys_var_inc_info_, SysVarIncInfo tmp_sys_var_inc_info, bool &is_influence_plan_cache_sys_var);
int get_sync_sys_vars(common::ObIArray<share::ObSysVarClassType> &sys_var_delta_ids) const;
int get_error_sync_sys_vars(ObIArray<share::ObSysVarClassType> &sys_var_delta_ids) const;
int get_sync_sys_vars_size(common::ObIArray<share::ObSysVarClassType> &sys_var_delta_ids, int64_t &len) const;
bool is_sync_sys_var(share::ObSysVarClassType sys_var_id) const;
bool is_exist_error_sync_var(share::ObSysVarClassType sys_var_id) const;
// nested session and sql execute for foreign key.
bool is_nested_session() const { return nested_count_ > 0; }

View File

@ -2721,6 +2721,142 @@ int ObSQLSessionInfo::update_sess_sync_info(const SessionSyncInfoType sess_sync_
return ret;
}
int ObErrorSyncSysVarEncoder::serialize(ObSQLSessionInfo &sess, char *buf,
const int64_t length, int64_t &pos)
{
int ret = OB_SUCCESS;
ObSEArray<ObSysVarClassType, ObSysVarFactory::ALL_SYS_VARS_COUNT> sys_var_delta_ids;
if (OB_FAIL(sess.get_error_sync_sys_vars(sys_var_delta_ids))) {
LOG_WARN("failed to calc need serialize vars", K(ret));
} else if (OB_FAIL(sess.serialize_sync_sys_vars(sys_var_delta_ids, buf, length, pos))) {
LOG_WARN("failed to serialize sys var delta", K(ret), K(sys_var_delta_ids.count()),
KPHEX(buf+pos, length-pos), K(length-pos), K(pos));
} else {
LOG_TRACE("success serialize sys var delta", K(ret), K(sys_var_delta_ids),
"inc sys var ids", sess.sys_var_inc_info_.get_all_sys_var_ids(),
K(sess.get_sessid()), K(sess.get_proxy_sessid()));
}
return ret;
}
int ObErrorSyncSysVarEncoder::deserialize(ObSQLSessionInfo &sess, const char *buf,
const int64_t length, int64_t &pos)
{
int ret = OB_SUCCESS;
int64_t deserialize_sys_var_count = 0;
if (OB_FAIL(sess.deserialize_sync_error_sys_vars(deserialize_sys_var_count, buf, length, pos))) {
LOG_WARN("failed to deserialize sys var delta", K(ret), K(deserialize_sys_var_count),
KPHEX(buf+pos, length-pos), K(length-pos), K(pos));
} else {
LOG_DEBUG("success deserialize sys var delta", K(ret), K(deserialize_sys_var_count));
}
return ret;
}
int64_t ObErrorSyncSysVarEncoder::get_serialize_size(ObSQLSessionInfo& sess) const {
int ret = OB_SUCCESS;
int64_t len = 0;
ObSEArray<ObSysVarClassType, ObSysVarFactory::ALL_SYS_VARS_COUNT> sys_var_delta_ids;
if (OB_FAIL(sess.get_error_sync_sys_vars(sys_var_delta_ids))) {
LOG_WARN("failed to calc need serialize vars", K(ret));
} else if (OB_FAIL(sess.get_sync_sys_vars_size(sys_var_delta_ids, len))) {
LOG_WARN("failed to serialize size sys var delta", K(ret));
} else {
LOG_DEBUG("success serialize size sys var delta", K(ret), K(sys_var_delta_ids.count()), K(len));
}
return len;
}
int ObErrorSyncSysVarEncoder::fetch_sess_info(ObSQLSessionInfo &sess, char *buf, const int64_t length, int64_t &pos)
{
int ret = OB_SUCCESS;
for (int64_t j = 0; OB_SUCC(ret) && j< share::ObSysVarFactory::ALL_SYS_VARS_COUNT; ++j) {
if (ObSysVariables::get_sys_var_id(j) == SYS_VAR_OB_LAST_SCHEMA_VERSION) {
//need sync sys var
if (OB_FAIL(sess.get_sys_var(j)->serialize(buf, length, pos))) {
LOG_WARN("failed to serialize", K(length), K(ret));
}
} else {
// do nothing.
}
}
return ret;
}
int64_t ObErrorSyncSysVarEncoder::get_fetch_sess_info_size(ObSQLSessionInfo& sess)
{
int64_t size = 0;
for (int64_t j = 0; j< share::ObSysVarFactory::ALL_SYS_VARS_COUNT; ++j) {
if (ObSysVariables::get_sys_var_id(j) == SYS_VAR_OB_LAST_SCHEMA_VERSION) {
// need sync sys var
size += sess.get_sys_var(j)->get_serialize_size();
} else {
// do nothing.
}
}
return size;
}
int ObErrorSyncSysVarEncoder::compare_sess_info(const char* current_sess_buf, int64_t current_sess_length,
const char* last_sess_buf, int64_t last_sess_length)
{
int ret = OB_SUCCESS;
if (current_sess_length != last_sess_length) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("fail to compare session info", K(ret), K(current_sess_length), K(last_sess_length),
KPHEX(current_sess_buf, current_sess_length), KPHEX(last_sess_buf, last_sess_length));
} else if (memcmp(current_sess_buf, last_sess_buf, current_sess_length) == 0) {
LOG_TRACE("success to compare session info", K(ret));
} else {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("fail to compare buf session info", K(ret),
KPHEX(current_sess_buf, current_sess_length), KPHEX(last_sess_buf, last_sess_length));
}
return ret;
}
int ObErrorSyncSysVarEncoder::display_sess_info(ObSQLSessionInfo &sess, const char* current_sess_buf,
int64_t current_sess_length, const char* last_sess_buf, int64_t last_sess_length)
{
int ret = OB_SUCCESS;
UNUSED(current_sess_buf);
UNUSED(current_sess_length);
int64_t pos = 0;
const char *buf = last_sess_buf;
int64_t data_len = last_sess_length;
common::ObArenaAllocator allocator(common::ObModIds::OB_SQL_SESSION,
OB_MALLOC_NORMAL_BLOCK_SIZE,
sess.get_effective_tenant_id());
ObBasicSysVar *last_sess_sys_vars = NULL;
for (int64_t j = 0; OB_SUCC(ret) && j< share::ObSysVarFactory::ALL_SYS_VARS_COUNT; ++j) {
if (ObSysVariables::get_sys_var_id(j) == SYS_VAR_OB_LAST_SCHEMA_VERSION) {
if (OB_FAIL(ObSessInfoVerify::create_tmp_sys_var(sess, ObSysVariables::get_sys_var_id(j),
last_sess_sys_vars, allocator))) {
LOG_WARN("fail to create sys var", K(ret));
} else if (OB_FAIL(last_sess_sys_vars->deserialize(buf, data_len, pos))) {
LOG_WARN("failed to deserialize", K(ret), K(data_len), K(pos));
} else if (!sess.get_sys_var(j)->get_value().can_compare(
last_sess_sys_vars->get_value())) {
share::ObTaskController::get().allow_next_syslog();
LOG_WARN("failed to verify sys vars", K(j), K(ret),
"current_sess_sys_vars", sess.get_sys_var(j)->get_value(),
"last_sess_sys_vars", last_sess_sys_vars->get_value());
} else if (sess.get_sys_var(j)->get_value() != last_sess_sys_vars->get_value()) {
share::ObTaskController::get().allow_next_syslog();
LOG_WARN("failed to verify sys vars", K(j), K(ret),
"current_sess_sys_vars", sess.get_sys_var(j)->get_value(),
"last_sess_sys_vars", last_sess_sys_vars->get_value());
} else {
// do nothing
}
}
}
return ret;
}
int ObSQLSessionInfo::get_mem_ctx_alloc(common::ObIAllocator *&alloc)
{
int ret = OB_SUCCESS;

View File

@ -213,6 +213,7 @@ enum SessionSyncInfoType {
SESSION_SYNC_TXN_PARTICIPANTS_INFO = 7, // 7: txn dynamic info
SESSION_SYNC_TXN_EXTRA_INFO = 8, // 8: txn dynamic info
SESSION_SYNC_SEQUENCE_CURRVAL = 9, // for sequence currval
SESSION_SYNC_ERROR_SYS_VAR = 10, // for error scene need sync sysvar info
SESSION_SYNC_MAX_TYPE,
};
@ -362,6 +363,24 @@ public:
static const int16_t CONINFO_BY_SESS = 0xC078;
};
// The current system variable synchronization will not trigger synchronization in
// the error reporting scenario. A new type is added here for variables that still
// need to be synchronized in the error reporting scenario
class ObErrorSyncSysVarEncoder : public ObSessInfoEncoder {
public:
ObErrorSyncSysVarEncoder() : ObSessInfoEncoder() {}
virtual ~ObErrorSyncSysVarEncoder() {}
virtual int serialize(ObSQLSessionInfo &sess, char *buf, const int64_t length, int64_t &pos) override;
virtual int deserialize(ObSQLSessionInfo &sess, const char *buf, const int64_t length, int64_t &pos) override;
virtual int64_t get_serialize_size(ObSQLSessionInfo &sess) const override;
virtual int fetch_sess_info(ObSQLSessionInfo &sess, char *buf, const int64_t length, int64_t &pos) override;
virtual int64_t get_fetch_sess_info_size(ObSQLSessionInfo& sess) override;
virtual int compare_sess_info(const char* current_sess_buf, int64_t current_sess_length,
const char* last_sess_buf, int64_t last_sess_length) override;
virtual int display_sess_info(ObSQLSessionInfo &sess, const char* current_sess_buf,
int64_t current_sess_length, const char* last_sess_buf, int64_t last_sess_length) override;
};
#define DEF_SESSION_TXN_ENCODER(CLS) \
class CLS final : public ObSessInfoEncoder { \
public: \
@ -1024,6 +1043,7 @@ public:
ObAppCtxInfoEncoder &get_app_ctx_encoder() { return app_ctx_info_encoder_; }
ObClientIdInfoEncoder &get_client_info_encoder() { return client_id_info_encoder_;}
ObControlInfoEncoder &get_control_info_encoder() { return control_info_encoder_;}
ObErrorSyncSysVarEncoder &get_error_sync_sys_var_encoder() { return error_sync_sys_var_encoder_;}
ObSequenceCurrvalEncoder &get_sequence_currval_encoder() { return sequence_currval_encoder_; }
ObContextsMap &get_contexts_map() { return contexts_map_; }
ObSequenceCurrvalMap &get_sequence_currval_map() { return sequence_currval_map_; }
@ -1335,7 +1355,8 @@ private:
&txn_dynamic_info_encoder_,
&txn_participants_info_encoder_,
&txn_extra_info_encoder_,
&sequence_currval_encoder_
&sequence_currval_encoder_,
&error_sync_sys_var_encoder_
};
ObSysVarEncoder sys_var_encoder_;
//ObUserVarEncoder usr_var_encoder_;
@ -1348,6 +1369,7 @@ private:
ObTxnParticipantsInfoEncoder txn_participants_info_encoder_;
ObTxnExtraInfoEncoder txn_extra_info_encoder_;
ObSequenceCurrvalEncoder sequence_currval_encoder_;
ObErrorSyncSysVarEncoder error_sync_sys_var_encoder_;
public:
void post_sync_session_info();
void prep_txn_free_route_baseline(bool reset_audit = true);