fix autoinc seq replay
This commit is contained in:
		
				
					committed by
					
						
						wangzelin.wzl
					
				
			
			
				
	
			
			
			
						parent
						
							e3937271ab
						
					
				
				
					commit
					1432398f97
				
			@ -507,5 +507,37 @@ int ObTabletAutoincSeqRpcHandler::batch_set_tablet_autoinc_seq(
 | 
			
		||||
  return ret;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
int ObTabletAutoincSeqRpcHandler::replay_update_tablet_autoinc_seq(
 | 
			
		||||
    const ObLS *ls,
 | 
			
		||||
    const ObTabletID &tablet_id,
 | 
			
		||||
    const uint64_t autoinc_seq,
 | 
			
		||||
    const int64_t replay_log_ts)
 | 
			
		||||
{
 | 
			
		||||
  int ret = OB_SUCCESS;
 | 
			
		||||
  if (OB_UNLIKELY(ls == nullptr || !tablet_id.is_valid() || autoinc_seq == 0 || replay_log_ts <= 0)) {
 | 
			
		||||
    ret = OB_INVALID_ARGUMENT;
 | 
			
		||||
    LOG_WARN("invalid argument", K(ret), K(tablet_id), K(autoinc_seq), K(replay_log_ts));
 | 
			
		||||
  } else {
 | 
			
		||||
    ObTabletHandle tablet_handle;
 | 
			
		||||
    ObBucketHashWLockGuard guard(bucket_lock_, tablet_id.hash());
 | 
			
		||||
    if (OB_FAIL(ls->replay_get_tablet(tablet_id,
 | 
			
		||||
                                      replay_log_ts,
 | 
			
		||||
                                      tablet_handle))) {
 | 
			
		||||
      if (OB_TABLET_NOT_EXIST == ret) {
 | 
			
		||||
        LOG_INFO("tablet may be deleted, skip this log", K(ret), K(tablet_id), K(replay_log_ts));
 | 
			
		||||
        ret = OB_SUCCESS;
 | 
			
		||||
      } else if (OB_EAGAIN == ret) {
 | 
			
		||||
        // retry replay again
 | 
			
		||||
      } else {
 | 
			
		||||
        LOG_WARN("fail to replay get tablet, retry again", K(ret), K(tablet_id), K(replay_log_ts));
 | 
			
		||||
        ret = OB_EAGAIN;
 | 
			
		||||
      }
 | 
			
		||||
    } else if (OB_FAIL(tablet_handle.get_obj()->update_tablet_autoinc_seq(autoinc_seq, replay_log_ts))) {
 | 
			
		||||
      LOG_WARN("failed to update tablet auto inc seq", K(ret), K(autoinc_seq), K(replay_log_ts));
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
  return ret;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -131,11 +131,16 @@ public:
 | 
			
		||||
  int batch_set_tablet_autoinc_seq(
 | 
			
		||||
      const obrpc::ObBatchSetTabletAutoincSeqArg &arg,
 | 
			
		||||
      obrpc::ObBatchSetTabletAutoincSeqRes &res);
 | 
			
		||||
  int replay_update_tablet_autoinc_seq(
 | 
			
		||||
      const ObLS *ls,
 | 
			
		||||
      const ObTabletID &tablet_id,
 | 
			
		||||
      const uint64_t autoinc_seq,
 | 
			
		||||
      const int64_t replay_log_ts);
 | 
			
		||||
private:
 | 
			
		||||
  ObTabletAutoincSeqRpcHandler();
 | 
			
		||||
  ~ObTabletAutoincSeqRpcHandler();
 | 
			
		||||
private:
 | 
			
		||||
  static const int64_t BUCKET_LOCK_BUCKET_CNT = 100;
 | 
			
		||||
  static const int64_t BUCKET_LOCK_BUCKET_CNT = 10243L;
 | 
			
		||||
  bool is_inited_;
 | 
			
		||||
  common::ObBucketLock bucket_lock_;
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
@ -16,10 +16,12 @@
 | 
			
		||||
#include "storage/ob_sync_tablet_seq_clog.h"
 | 
			
		||||
#include "logservice/ob_log_base_header.h"
 | 
			
		||||
#include "lib/oblog/ob_log_module.h"
 | 
			
		||||
#include "share/ob_tablet_autoincrement_service.h"
 | 
			
		||||
 | 
			
		||||
namespace oceanbase
 | 
			
		||||
{
 | 
			
		||||
 | 
			
		||||
using namespace share;
 | 
			
		||||
namespace storage
 | 
			
		||||
{
 | 
			
		||||
 | 
			
		||||
@ -55,7 +57,7 @@ int ObLSSyncTabletSeqHandler::replay(const void *buffer,
 | 
			
		||||
  ObSyncTabletSeqLog log;
 | 
			
		||||
  int64_t tmp_pos = 0;
 | 
			
		||||
  const char *log_buf = static_cast<const char *>(buffer);
 | 
			
		||||
  ObTabletHandle tablet_handle;
 | 
			
		||||
  ObTabletAutoincSeqRpcHandler &autoinc_seq_handler = ObTabletAutoincSeqRpcHandler::get_instance();
 | 
			
		||||
  if (IS_NOT_INIT) {
 | 
			
		||||
    ret = OB_NOT_INIT;
 | 
			
		||||
    LOG_WARN("ObLSSyncTabletSeqHandler not inited", K(ret));
 | 
			
		||||
@ -63,20 +65,10 @@ int ObLSSyncTabletSeqHandler::replay(const void *buffer,
 | 
			
		||||
    LOG_WARN("log base header deserialize error", K(ret));
 | 
			
		||||
  } else if (OB_FAIL(log.deserialize(log_buf, nbytes, tmp_pos))) {
 | 
			
		||||
    LOG_WARN("ObSyncTabletSeqLog deserialize error", K(ret));
 | 
			
		||||
  } else if (OB_FAIL(ls_->replay_get_tablet(log.get_tablet_id(),
 | 
			
		||||
                                            ts_ns,
 | 
			
		||||
                                            tablet_handle))) {
 | 
			
		||||
    if (OB_TABLET_NOT_EXIST == ret) {
 | 
			
		||||
      LOG_INFO("tablet may be deleted, skip this log", K(ret), "tablet_id", log.get_tablet_id(), K(ts_ns));
 | 
			
		||||
      ret = OB_SUCCESS;
 | 
			
		||||
    } else if (OB_EAGAIN == ret) {
 | 
			
		||||
      // retry replay again
 | 
			
		||||
    } else {
 | 
			
		||||
      LOG_WARN("fail to replay get tablet, retry again", K(ret), K(log), K(ts_ns));
 | 
			
		||||
      ret = OB_EAGAIN;
 | 
			
		||||
    }
 | 
			
		||||
  } else if (OB_FAIL(tablet_handle.get_obj()->update_tablet_autoinc_seq(log.get_autoinc_seq(),
 | 
			
		||||
                                                                        ts_ns))) {
 | 
			
		||||
  } else if (OB_FAIL(autoinc_seq_handler.replay_update_tablet_autoinc_seq(ls_,
 | 
			
		||||
                                                                          log.get_tablet_id(),
 | 
			
		||||
                                                                          log.get_autoinc_seq(),
 | 
			
		||||
                                                                          ts_ns))) {
 | 
			
		||||
    LOG_WARN("failed to update tablet auto inc seq", K(ret), K(log));
 | 
			
		||||
  }
 | 
			
		||||
  return ret;
 | 
			
		||||
 | 
			
		||||
		Reference in New Issue
	
	Block a user