[CP] Fix: session sync related bug

This commit is contained in:
yaojing624
2023-09-26 06:10:06 +00:00
committed by ob-robot
parent 0be5791bd5
commit 8f6d380ccd
4 changed files with 68 additions and 49 deletions

View File

@ -3052,9 +3052,8 @@ int ObErrorSyncSysVarEncoder::deserialize(ObSQLSessionInfo &sess, const char *bu
return ret;
}
int64_t ObErrorSyncSysVarEncoder::get_serialize_size(ObSQLSessionInfo& sess) const {
int ObErrorSyncSysVarEncoder::get_serialize_size(ObSQLSessionInfo& sess, int64_t &len) const {
int ret = OB_SUCCESS;
int64_t len = 0;
ObSEArray<ObSysVarClassType, ObSysVarFactory::ALL_SYS_VARS_COUNT> sys_var_delta_ids;
if (OB_FAIL(sess.get_error_sync_sys_vars(sys_var_delta_ids))) {
LOG_WARN("failed to calc need serialize vars", K(ret));
@ -3063,7 +3062,7 @@ int64_t ObErrorSyncSysVarEncoder::get_serialize_size(ObSQLSessionInfo& sess) con
} else {
LOG_DEBUG("success serialize size sys var delta", K(ret), K(sys_var_delta_ids.count()), K(len));
}
return len;
return ret;
}
int ObErrorSyncSysVarEncoder::fetch_sess_info(ObSQLSessionInfo &sess, char *buf, const int64_t length, int64_t &pos)
@ -3198,9 +3197,8 @@ int ObSysVarEncoder::deserialize(ObSQLSessionInfo &sess, const char *buf,
return ret;
}
int64_t ObSysVarEncoder::get_serialize_size(ObSQLSessionInfo& sess) const {
int ObSysVarEncoder::get_serialize_size(ObSQLSessionInfo& sess, int64_t &len) const {
int ret = OB_SUCCESS;
int64_t len = 0;
ObSEArray<ObSysVarClassType, ObSysVarFactory::ALL_SYS_VARS_COUNT> sys_var_delta_ids;
if (OB_FAIL(sess.get_sync_sys_vars(sys_var_delta_ids))) {
LOG_WARN("failed to calc need serialize vars", K(ret));
@ -3209,7 +3207,7 @@ int64_t ObSysVarEncoder::get_serialize_size(ObSQLSessionInfo& sess) const {
} else {
LOG_DEBUG("success serialize size sys var delta", K(ret), K(sys_var_delta_ids.count()), K(len));
}
return len;
return ret;
}
int ObSysVarEncoder::fetch_sess_info(ObSQLSessionInfo &sess, char *buf, const int64_t length, int64_t &pos)
@ -3224,7 +3222,8 @@ int ObSysVarEncoder::fetch_sess_info(ObSQLSessionInfo &sess, char *buf, const in
if (ObSysVariables::get_sys_var_id(j) == SYS_VAR_SERVER_UUID ||
ObSysVariables::get_sys_var_id(j) == SYS_VAR_OB_PROXY_PARTITION_HIT ||
ObSysVariables::get_sys_var_id(j) == SYS_VAR_OB_STATEMENT_TRACE_ID ||
ObSysVariables::get_sys_var_id(j) == SYS_VAR_VERSION_COMMENT) {
ObSysVariables::get_sys_var_id(j) == SYS_VAR_VERSION_COMMENT ||
ObSysVariables::get_sys_var_id(j) == SYS_VAR__OB_PROXY_WEAKREAD_FEEDBACK) {
// no need sync sys var
continue;
}
@ -3245,7 +3244,8 @@ int64_t ObSysVarEncoder::get_fetch_sess_info_size(ObSQLSessionInfo& sess)
if (ObSysVariables::get_sys_var_id(j) == SYS_VAR_SERVER_UUID ||
ObSysVariables::get_sys_var_id(j) == SYS_VAR_OB_PROXY_PARTITION_HIT ||
ObSysVariables::get_sys_var_id(j) == SYS_VAR_OB_STATEMENT_TRACE_ID ||
ObSysVariables::get_sys_var_id(j) == SYS_VAR_VERSION_COMMENT) {
ObSysVariables::get_sys_var_id(j) == SYS_VAR_VERSION_COMMENT ||
ObSysVariables::get_sys_var_id(j) == SYS_VAR__OB_PROXY_WEAKREAD_FEEDBACK) {
// no need sync sys var
continue;
}
@ -3297,7 +3297,8 @@ int ObSysVarEncoder::display_sess_info(ObSQLSessionInfo &sess, const char* curre
if (ObSysVariables::get_sys_var_id(j) == SYS_VAR_SERVER_UUID ||
ObSysVariables::get_sys_var_id(j) == SYS_VAR_OB_PROXY_PARTITION_HIT ||
ObSysVariables::get_sys_var_id(j) == SYS_VAR_OB_STATEMENT_TRACE_ID ||
ObSysVariables::get_sys_var_id(j) == SYS_VAR_VERSION_COMMENT) {
ObSysVariables::get_sys_var_id(j) == SYS_VAR_VERSION_COMMENT ||
ObSysVariables::get_sys_var_id(j) == SYS_VAR__OB_PROXY_WEAKREAD_FEEDBACK) {
// no need sync sys var
continue;
}
@ -3391,8 +3392,10 @@ int ObAppInfoEncoder::deserialize(ObSQLSessionInfo &sess, const char *buf, const
return ret;
}
int64_t ObAppInfoEncoder::get_serialize_size(ObSQLSessionInfo& sess) const {
return sess.get_client_app_info().get_serialize_size();
int ObAppInfoEncoder::get_serialize_size(ObSQLSessionInfo& sess, int64_t &len) const {
int ret = OB_SUCCESS;
len = sess.get_client_app_info().get_serialize_size();
return ret;
}
int ObAppInfoEncoder::fetch_sess_info(ObSQLSessionInfo &sess, char *buf, const int64_t length, int64_t &pos)
@ -3406,7 +3409,9 @@ int ObAppInfoEncoder::fetch_sess_info(ObSQLSessionInfo &sess, char *buf, const i
int64_t ObAppInfoEncoder::get_fetch_sess_info_size(ObSQLSessionInfo& sess)
{
return get_serialize_size(sess);
int64_t len = 0;
get_serialize_size(sess, len);
return len;
}
int ObAppInfoEncoder::compare_sess_info(const char* current_sess_buf, int64_t current_sess_length,
@ -3523,11 +3528,11 @@ int ObClientIdInfoEncoder::deserialize(ObSQLSessionInfo &sess, const char *buf,
}
return ret;
}
int64_t ObClientIdInfoEncoder::get_serialize_size(ObSQLSessionInfo& sess) const
int ObClientIdInfoEncoder::get_serialize_size(ObSQLSessionInfo& sess, int64_t &len) const
{
int64_t len = 0;
int ret = OB_SUCCESS;
OB_UNIS_ADD_LEN(sess.get_client_identifier());
return len;
return ret;
}
int ObClientIdInfoEncoder::fetch_sess_info(ObSQLSessionInfo &sess, char *buf, const int64_t length, int64_t &pos)
@ -3541,7 +3546,9 @@ int ObClientIdInfoEncoder::fetch_sess_info(ObSQLSessionInfo &sess, char *buf, co
int64_t ObClientIdInfoEncoder::get_fetch_sess_info_size(ObSQLSessionInfo& sess)
{
return get_serialize_size(sess);
int64_t len = 0;
get_serialize_size(sess, len);
return len;
}
int ObClientIdInfoEncoder::compare_sess_info(const char* current_sess_buf, int64_t current_sess_length,
@ -3622,15 +3629,15 @@ int ObAppCtxInfoEncoder::deserialize(ObSQLSessionInfo &sess, const char *buf, co
}
return ret;
}
int64_t ObAppCtxInfoEncoder::get_serialize_size(ObSQLSessionInfo& sess) const
int ObAppCtxInfoEncoder::get_serialize_size(ObSQLSessionInfo& sess, int64_t &len) const
{
int64_t len = 0;
int ret = OB_SUCCESS;
ObContextsMap &map = sess.get_contexts_map();
OB_UNIS_ADD_LEN(map.size());
for (auto it = map.begin(); it != map.end(); ++it) {
OB_UNIS_ADD_LEN(*it->second);
}
return len;
return ret;
}
int ObAppCtxInfoEncoder::fetch_sess_info(ObSQLSessionInfo &sess, char *buf, const int64_t length, int64_t &pos)
@ -3644,7 +3651,9 @@ int ObAppCtxInfoEncoder::fetch_sess_info(ObSQLSessionInfo &sess, char *buf, cons
int64_t ObAppCtxInfoEncoder::get_fetch_sess_info_size(ObSQLSessionInfo& sess)
{
return get_serialize_size(sess);
int64_t len = 0;
get_serialize_size(sess, len);
return len;
}
int ObAppCtxInfoEncoder::compare_sess_info(const char* current_sess_buf, int64_t current_sess_length,
@ -3772,9 +3781,9 @@ int ObSequenceCurrvalEncoder::deserialize(ObSQLSessionInfo &sess, const char *bu
return ret;
}
int64_t ObSequenceCurrvalEncoder::get_serialize_size(ObSQLSessionInfo& sess) const
int ObSequenceCurrvalEncoder::get_serialize_size(ObSQLSessionInfo& sess, int64_t &len) const
{
int64_t len = 0;
int ret = OB_SUCCESS;
ObSequenceCurrvalMap &map = sess.get_sequence_currval_map();
OB_UNIS_ADD_LEN(map.size());
for (auto it = map.begin(); it != map.end(); ++it) {
@ -3789,7 +3798,7 @@ int64_t ObSequenceCurrvalEncoder::get_serialize_size(ObSQLSessionInfo& sess) con
OB_UNIS_ADD_LEN(it->first.dblink_id_);
OB_UNIS_ADD_LEN(it->second);
}
return len;
return ret;
}
int ObSequenceCurrvalEncoder::fetch_sess_info(ObSQLSessionInfo &sess, char *buf,
@ -3804,7 +3813,9 @@ int ObSequenceCurrvalEncoder::fetch_sess_info(ObSQLSessionInfo &sess, char *buf,
int64_t ObSequenceCurrvalEncoder::get_fetch_sess_info_size(ObSQLSessionInfo& sess)
{
return get_serialize_size(sess);
int64_t len = 0;
get_serialize_size(sess, len);
return len;
}
int ObSequenceCurrvalEncoder::compare_sess_info(const char* current_sess_buf,
@ -4040,9 +4051,11 @@ int ObControlInfoEncoder::deserialize(ObSQLSessionInfo &sess, const char *buf, c
return ret;
}
int64_t ObControlInfoEncoder::get_serialize_size(ObSQLSessionInfo& sess) const
int ObControlInfoEncoder::get_serialize_size(ObSQLSessionInfo& sess, int64_t &len) const
{
return sess.get_control_info().get_serialize_size() + 6 + sizeof(bool);
int ret = OB_SUCCESS;
len = sess.get_control_info().get_serialize_size() + 6 + sizeof(bool);
return ret;
}
int ObControlInfoEncoder::fetch_sess_info(ObSQLSessionInfo &sess, char *buf, const int64_t length, int64_t &pos)
@ -4056,7 +4069,9 @@ int ObControlInfoEncoder::fetch_sess_info(ObSQLSessionInfo &sess, char *buf, con
int64_t ObControlInfoEncoder::get_fetch_sess_info_size(ObSQLSessionInfo& sess)
{
return get_serialize_size(sess);
int64_t len = 0;
get_serialize_size(sess, len);
return len;
}
int ObControlInfoEncoder::compare_sess_info(const char* current_sess_buf, int64_t current_sess_length,
@ -4123,9 +4138,11 @@ int CLS::deserialize(ObSQLSessionInfo &sess, const char *buf, const int64_t leng
{ \
return ObSqlTransControl::update_txn_##func##_state(sess, buf, length, pos); \
} \
int64_t CLS::get_serialize_size(ObSQLSessionInfo &sess) const \
int CLS::get_serialize_size(ObSQLSessionInfo &sess, int64_t &len) const \
{ \
return ObSqlTransControl::get_txn_##func##_state_serialize_size(sess); \
int ret = OB_SUCCESS; \
len = ObSqlTransControl::get_txn_##func##_state_serialize_size(sess); \
return ret; \
} \
int CLS::fetch_sess_info(ObSQLSessionInfo &sess, char *buf, const int64_t data_len, int64_t &pos) \
{ \