[CP] Support sequence currval session sync via obproxy
This commit is contained in:
		@ -700,6 +700,12 @@ public:
 | 
			
		||||
  {
 | 
			
		||||
    return seq_schema_;
 | 
			
		||||
  }
 | 
			
		||||
  uint64_t get_sequence_id() const {
 | 
			
		||||
    return seq_schema_.get_sequence_id();
 | 
			
		||||
  }
 | 
			
		||||
  uint64_t get_tenant_id() const {
 | 
			
		||||
    return seq_schema_.get_tenant_id();
 | 
			
		||||
  }
 | 
			
		||||
  common::ObBitSet<> &get_option_bitset()
 | 
			
		||||
  {
 | 
			
		||||
    return option_bitset_;
 | 
			
		||||
 | 
			
		||||
@ -765,6 +765,14 @@ int ObCmdExecutor::execute(ObExecContext &ctx, ObICmd &cmd)
 | 
			
		||||
      }
 | 
			
		||||
      case stmt::T_DROP_SEQUENCE: {
 | 
			
		||||
        DEFINE_EXECUTE_CMD(ObDropSequenceStmt, ObDropSequenceExecutor);
 | 
			
		||||
        if (OB_SUCC(ret)) {
 | 
			
		||||
          ObDropSequenceStmt &stmt = *(static_cast<ObDropSequenceStmt*>(&cmd));
 | 
			
		||||
          const uint64_t tenant_id = stmt.get_arg().get_tenant_id();
 | 
			
		||||
          const uint64_t sequence_id = stmt.get_arg().get_sequence_id();
 | 
			
		||||
          if (OB_FAIL(my_session->drop_sequence_value_if_exists(tenant_id, sequence_id))) {
 | 
			
		||||
            LOG_WARN("failed to drop sequence value from session", K(ret));
 | 
			
		||||
          }
 | 
			
		||||
        }
 | 
			
		||||
        break;
 | 
			
		||||
      }
 | 
			
		||||
      case stmt::T_ALTER_SEQUENCE: {
 | 
			
		||||
 | 
			
		||||
@ -2083,7 +2083,26 @@ int ObSQLSessionInfo::set_sequence_value(uint64_t tenant_id,
 | 
			
		||||
  } else if (OB_FAIL(sequence_currval_map_.set_refactored(seq_id, value, overwrite_exits))) {
 | 
			
		||||
    LOG_WARN("fail get seq", K(tenant_id), K(seq_id), K(ret));
 | 
			
		||||
  } else {
 | 
			
		||||
    // ok
 | 
			
		||||
    sequence_currval_encoder_.is_changed_ = true;
 | 
			
		||||
  }
 | 
			
		||||
  return ret;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
int ObSQLSessionInfo::drop_sequence_value_if_exists(uint64_t tenant_id, uint64_t seq_id)
 | 
			
		||||
{
 | 
			
		||||
  int ret = OB_SUCCESS;
 | 
			
		||||
  if (OB_UNLIKELY(OB_INVALID_ID == tenant_id ||
 | 
			
		||||
      OB_INVALID_ID == seq_id)) {
 | 
			
		||||
    LOG_WARN("invalid args", K(tenant_id), K(seq_id), K(ret));
 | 
			
		||||
  } else if (OB_FAIL(sequence_currval_map_.erase_refactored(seq_id))) {
 | 
			
		||||
    if (OB_HASH_NOT_EXIST == ret) {
 | 
			
		||||
      LOG_INFO("drop sequence value not exists", K(ret),  K(tenant_id), K(seq_id));
 | 
			
		||||
      ret = OB_SUCCESS;
 | 
			
		||||
    } else {
 | 
			
		||||
      LOG_WARN("drop sequence value failed", K(ret), K(tenant_id), K(seq_id));
 | 
			
		||||
    }
 | 
			
		||||
  } else {
 | 
			
		||||
    sequence_currval_encoder_.is_changed_ = true;
 | 
			
		||||
  }
 | 
			
		||||
  return ret;
 | 
			
		||||
}
 | 
			
		||||
@ -3226,6 +3245,137 @@ int ObAppCtxInfoEncoder::display_sess_info(ObSQLSessionInfo &sess, const char* c
 | 
			
		||||
  return ret;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
int ObSequenceCurrvalEncoder::serialize(ObSQLSessionInfo &sess, char *buf, const int64_t buf_len, int64_t &pos)
 | 
			
		||||
{
 | 
			
		||||
  int ret = OB_SUCCESS;
 | 
			
		||||
  ObSequenceCurrvalMap &map = sess.get_sequence_currval_map();
 | 
			
		||||
  OB_UNIS_ENCODE(map.size());
 | 
			
		||||
  int64_t count = 0;
 | 
			
		||||
  for (auto it = map.begin(); OB_SUCC(ret) && it != map.end(); ++it, ++count) {
 | 
			
		||||
    OB_UNIS_ENCODE(it->first);
 | 
			
		||||
    OB_UNIS_ENCODE(it->second);
 | 
			
		||||
  }
 | 
			
		||||
  CK (count == map.size());
 | 
			
		||||
  return ret;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
int ObSequenceCurrvalEncoder::deserialize(ObSQLSessionInfo &sess, const char *buf, const int64_t data_len, int64_t &pos)
 | 
			
		||||
{
 | 
			
		||||
  int ret = OB_SUCCESS;
 | 
			
		||||
  int64_t map_size = 0;
 | 
			
		||||
  OB_UNIS_DECODE(map_size);
 | 
			
		||||
  ObSequenceCurrvalMap &map = sess.get_sequence_currval_map();
 | 
			
		||||
  OX (sess.reuse_all_sequence_value());
 | 
			
		||||
  uint64_t seq_id = 0;
 | 
			
		||||
  ObSequenceValue seq_val;
 | 
			
		||||
  for (int64_t i = 0; OB_SUCC(ret) && i < map_size; ++i) {
 | 
			
		||||
    OB_UNIS_DECODE(seq_id);
 | 
			
		||||
    OB_UNIS_DECODE(seq_val);
 | 
			
		||||
    OZ (map.set_refactored(seq_id, seq_val, true /*overwrite_exits*/));
 | 
			
		||||
  }
 | 
			
		||||
  return ret;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
int64_t ObSequenceCurrvalEncoder::get_serialize_size(ObSQLSessionInfo& sess) const
 | 
			
		||||
{
 | 
			
		||||
  int64_t len = 0;
 | 
			
		||||
  ObSequenceCurrvalMap &map = sess.get_sequence_currval_map();
 | 
			
		||||
  OB_UNIS_ADD_LEN(map.size());
 | 
			
		||||
  for (auto it = map.begin(); it != map.end(); ++it) {
 | 
			
		||||
    OB_UNIS_ADD_LEN(it->first);
 | 
			
		||||
    OB_UNIS_ADD_LEN(it->second);
 | 
			
		||||
  }
 | 
			
		||||
  return len;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
int ObSequenceCurrvalEncoder::fetch_sess_info(ObSQLSessionInfo &sess, char *buf,
 | 
			
		||||
                                              const int64_t length, int64_t &pos)
 | 
			
		||||
{
 | 
			
		||||
  int ret = OB_SUCCESS;
 | 
			
		||||
  if (OB_FAIL(serialize(sess, buf, length, pos))) {
 | 
			
		||||
    LOG_WARN("failed to fetch session info.", K(ret), K(pos), K(length));
 | 
			
		||||
  }
 | 
			
		||||
  return ret;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
int64_t ObSequenceCurrvalEncoder::get_fetch_sess_info_size(ObSQLSessionInfo& sess)
 | 
			
		||||
{
 | 
			
		||||
  return get_serialize_size(sess);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
int ObSequenceCurrvalEncoder::compare_sess_info(const char* current_sess_buf,
 | 
			
		||||
                                                int64_t current_sess_length,
 | 
			
		||||
                                                const char* last_sess_buf,
 | 
			
		||||
                                                int64_t last_sess_length)
 | 
			
		||||
{
 | 
			
		||||
  int ret = OB_SUCCESS;
 | 
			
		||||
  if (current_sess_length != last_sess_length) {
 | 
			
		||||
    ret = OB_ERR_UNEXPECTED;
 | 
			
		||||
    LOG_WARN("fail to compare session info", K(ret), K(current_sess_length), K(last_sess_length),
 | 
			
		||||
      KPHEX(current_sess_buf, current_sess_length), KPHEX(last_sess_buf, last_sess_length));
 | 
			
		||||
  } else if (memcmp(current_sess_buf, last_sess_buf, current_sess_length) == 0) {
 | 
			
		||||
    LOG_TRACE("success to compare session info", K(ret));
 | 
			
		||||
  } else {
 | 
			
		||||
    ret = OB_ERR_UNEXPECTED;
 | 
			
		||||
    LOG_WARN("fail to compare buf session info", K(ret),
 | 
			
		||||
      KPHEX(current_sess_buf, current_sess_length), KPHEX(last_sess_buf, last_sess_length));
 | 
			
		||||
  }
 | 
			
		||||
  return ret;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
int ObSequenceCurrvalEncoder::display_sess_info(ObSQLSessionInfo &sess,
 | 
			
		||||
                                                const char* current_sess_buf,
 | 
			
		||||
                                                int64_t current_sess_length,
 | 
			
		||||
                                                const char* last_sess_buf,
 | 
			
		||||
                                                int64_t last_sess_length)
 | 
			
		||||
{
 | 
			
		||||
  int ret = OB_SUCCESS;
 | 
			
		||||
  UNUSED(current_sess_buf);
 | 
			
		||||
  UNUSED(current_sess_length);
 | 
			
		||||
  const char *buf = last_sess_buf;
 | 
			
		||||
  int64_t pos = 0;
 | 
			
		||||
  int64_t data_len = last_sess_length;
 | 
			
		||||
  int64_t map_size = 0;
 | 
			
		||||
  OB_UNIS_DECODE(map_size);
 | 
			
		||||
  ObSequenceCurrvalMap &map = sess.get_sequence_currval_map();
 | 
			
		||||
  if (map_size != map.size()) {
 | 
			
		||||
    share::ObTaskController::get().allow_next_syslog();
 | 
			
		||||
    LOG_WARN("Sequence currval map size mismatch", K(ret), "current_map_size", map.size(),
 | 
			
		||||
             "last_map_size", map_size);
 | 
			
		||||
  } else {
 | 
			
		||||
    bool found_mismatch = false;
 | 
			
		||||
    uint64_t seq_id = 0;
 | 
			
		||||
    ObSequenceValue seq_val_decode;
 | 
			
		||||
    ObSequenceValue seq_val_origin;
 | 
			
		||||
    for (int64_t i = 0; OB_SUCC(ret) && !found_mismatch && i < map_size; ++i) {
 | 
			
		||||
      OB_UNIS_DECODE(seq_id);
 | 
			
		||||
      OB_UNIS_DECODE(seq_val_decode);
 | 
			
		||||
      if (OB_SUCC(ret)) {
 | 
			
		||||
        if (OB_FAIL(map.get_refactored(seq_id, seq_val_origin))) {
 | 
			
		||||
          if (ret == OB_HASH_NOT_EXIST) {
 | 
			
		||||
            found_mismatch = true;
 | 
			
		||||
            share::ObTaskController::get().allow_next_syslog();
 | 
			
		||||
            LOG_WARN("Decoded sequence id not found", K(ret), K(i), K(map_size), K(seq_id));
 | 
			
		||||
            ret = OB_SUCCESS;
 | 
			
		||||
          } else {
 | 
			
		||||
            LOG_WARN("Fail to get refactored from map", K(ret), K(seq_id));
 | 
			
		||||
          }
 | 
			
		||||
        } else if (seq_val_decode.val() != seq_val_origin.val()) {
 | 
			
		||||
          found_mismatch = true;
 | 
			
		||||
          share::ObTaskController::get().allow_next_syslog();
 | 
			
		||||
          LOG_WARN("Sequence currval mismatch", K(ret), K(i), K(map_size), K(seq_id),
 | 
			
		||||
                   "current_seq_val", seq_val_origin, "last_seq_val", seq_val_decode);
 | 
			
		||||
        }
 | 
			
		||||
      }
 | 
			
		||||
    }
 | 
			
		||||
    if (OB_SUCC(ret) && !found_mismatch) {
 | 
			
		||||
      share::ObTaskController::get().allow_next_syslog();
 | 
			
		||||
      LOG_WARN("All sequence currval is matched", K(ret), K(map_size));
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
  return ret;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
OB_DEF_SERIALIZE(ObInnerContextMap)
 | 
			
		||||
{
 | 
			
		||||
  int ret = OB_SUCCESS;
 | 
			
		||||
 | 
			
		||||
@ -212,6 +212,7 @@ enum SessionSyncInfoType {
 | 
			
		||||
  SESSION_SYNC_TXN_DYNAMIC_INFO = 6,      // 6: txn dynamic info
 | 
			
		||||
  SESSION_SYNC_TXN_PARTICIPANTS_INFO = 7, // 7: txn dynamic info
 | 
			
		||||
  SESSION_SYNC_TXN_EXTRA_INFO = 8,        // 8: txn dynamic info
 | 
			
		||||
  SESSION_SYNC_SEQUENCE_CURRVAL = 9, // for sequence currval
 | 
			
		||||
  SESSION_SYNC_MAX_TYPE,
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
@ -309,6 +310,21 @@ public:
 | 
			
		||||
          int64_t current_sess_length, const char* last_sess_buf, int64_t last_sess_length);
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
class ObSequenceCurrvalEncoder : public ObSessInfoEncoder {
 | 
			
		||||
public:
 | 
			
		||||
  ObSequenceCurrvalEncoder() : ObSessInfoEncoder() {}
 | 
			
		||||
  virtual ~ObSequenceCurrvalEncoder() {}
 | 
			
		||||
  virtual int serialize(ObSQLSessionInfo &sess, char *buf, const int64_t length, int64_t &pos) override;
 | 
			
		||||
  virtual int deserialize(ObSQLSessionInfo &sess, const char *buf, const int64_t length, int64_t &pos) override;
 | 
			
		||||
  virtual int64_t get_serialize_size(ObSQLSessionInfo &sess) const override;
 | 
			
		||||
  virtual int fetch_sess_info(ObSQLSessionInfo &sess, char *buf, const int64_t length, int64_t &pos) override;
 | 
			
		||||
  virtual int64_t get_fetch_sess_info_size(ObSQLSessionInfo& sess) override;
 | 
			
		||||
  virtual int compare_sess_info(const char* current_sess_buf, int64_t current_sess_length,
 | 
			
		||||
                                const char* last_sess_buf, int64_t last_sess_length) override;
 | 
			
		||||
  virtual int display_sess_info(ObSQLSessionInfo &sess, const char* current_sess_buf,
 | 
			
		||||
                                int64_t current_sess_length, const char* last_sess_buf, int64_t last_sess_length) override;
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
class ObControlInfoEncoder : public ObSessInfoEncoder {
 | 
			
		||||
public:
 | 
			
		||||
  ObControlInfoEncoder() : ObSessInfoEncoder() {}
 | 
			
		||||
@ -923,6 +939,11 @@ public:
 | 
			
		||||
                         uint64_t seq_id,
 | 
			
		||||
                         const share::ObSequenceValue &value);
 | 
			
		||||
 | 
			
		||||
  int drop_sequence_value_if_exists(uint64_t tenant_id, uint64_t seq_id);
 | 
			
		||||
  void reuse_all_sequence_value()
 | 
			
		||||
  {
 | 
			
		||||
    sequence_currval_map_.reuse();
 | 
			
		||||
  }
 | 
			
		||||
  int get_context_values(const common::ObString &context_name,
 | 
			
		||||
                        const common::ObString &attribute,
 | 
			
		||||
                        common::ObString &value,
 | 
			
		||||
@ -977,7 +998,9 @@ public:
 | 
			
		||||
  ObAppCtxInfoEncoder &get_app_ctx_encoder() { return app_ctx_info_encoder_; }
 | 
			
		||||
  ObClientIdInfoEncoder &get_client_info_encoder() { return client_id_info_encoder_;}
 | 
			
		||||
  ObControlInfoEncoder &get_control_info_encoder() { return control_info_encoder_;}
 | 
			
		||||
  ObSequenceCurrvalEncoder &get_sequence_currval_encoder() { return sequence_currval_encoder_; }
 | 
			
		||||
  ObContextsMap &get_contexts_map() { return contexts_map_; }
 | 
			
		||||
  ObSequenceCurrvalMap &get_sequence_currval_map() { return sequence_currval_map_; }
 | 
			
		||||
  int get_mem_ctx_alloc(common::ObIAllocator *&alloc);
 | 
			
		||||
  int update_sess_sync_info(const SessionSyncInfoType sess_sync_info_type,
 | 
			
		||||
                                const char *buf, const int64_t length, int64_t &pos);
 | 
			
		||||
@ -1274,7 +1297,8 @@ private:
 | 
			
		||||
                            &txn_static_info_encoder_,
 | 
			
		||||
                            &txn_dynamic_info_encoder_,
 | 
			
		||||
                            &txn_participants_info_encoder_,
 | 
			
		||||
                            &txn_extra_info_encoder_
 | 
			
		||||
                            &txn_extra_info_encoder_,
 | 
			
		||||
                            &sequence_currval_encoder_
 | 
			
		||||
                            };
 | 
			
		||||
  ObSysVarEncoder sys_var_encoder_;
 | 
			
		||||
  //ObUserVarEncoder usr_var_encoder_;
 | 
			
		||||
@ -1286,6 +1310,7 @@ private:
 | 
			
		||||
  ObTxnDynamicInfoEncoder txn_dynamic_info_encoder_;
 | 
			
		||||
  ObTxnParticipantsInfoEncoder txn_participants_info_encoder_;
 | 
			
		||||
  ObTxnExtraInfoEncoder txn_extra_info_encoder_;
 | 
			
		||||
  ObSequenceCurrvalEncoder sequence_currval_encoder_;
 | 
			
		||||
public:
 | 
			
		||||
  void post_sync_session_info();
 | 
			
		||||
  void prep_txn_free_route_baseline(bool reset_audit = true);
 | 
			
		||||
 | 
			
		||||
		Reference in New Issue
	
	Block a user