switch slog file for new comming slog data in any way
This commit is contained in:
		
				
					committed by
					
						
						wangzelin.wzl
					
				
			
			
				
	
			
			
			
						parent
						
							b5fd1cecbc
						
					
				
				
					commit
					9812af88fb
				
			@ -569,9 +569,13 @@ int ObStorageLogReplayer::replay_after_ckpt(
 | 
				
			|||||||
        LOG_WARN("Fail to get write_start_cursor, ", K(ret));
 | 
					        LOG_WARN("Fail to get write_start_cursor, ", K(ret));
 | 
				
			||||||
      }
 | 
					      }
 | 
				
			||||||
    } else {
 | 
					    } else {
 | 
				
			||||||
      finish_cursor_.file_id_ = replay_start_cursor.file_id_;
 | 
					      // if last_entry_seq is -1,
 | 
				
			||||||
 | 
					      // it means we never read one single slog after replay_start_cursor(the checkpoint cursor)
 | 
				
			||||||
 | 
					      // this is because this slog file may have reach the end,
 | 
				
			||||||
 | 
					      // in this situation we should move forward to next file.
 | 
				
			||||||
 | 
					      finish_cursor_.file_id_ = replay_start_cursor.file_id_ + 1;
 | 
				
			||||||
      finish_cursor_.log_id_ = replay_start_cursor.log_id_;
 | 
					      finish_cursor_.log_id_ = replay_start_cursor.log_id_;
 | 
				
			||||||
      finish_cursor_.offset_ = replay_start_cursor.offset_;
 | 
					      finish_cursor_.offset_ = 0;
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
  }
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
				
			|||||||
@ -23,6 +23,59 @@ using namespace ::testing;
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
namespace oceanbase {
 | 
					namespace oceanbase {
 | 
				
			||||||
namespace blocksstable {
 | 
					namespace blocksstable {
 | 
				
			||||||
 | 
					class TestLogData
 | 
				
			||||||
 | 
					{
 | 
				
			||||||
 | 
					public:
 | 
				
			||||||
 | 
					  TestLogData(char* data, int len)
 | 
				
			||||||
 | 
					    : data_(data), len_(len), locally_allocated(false)
 | 
				
			||||||
 | 
					  {
 | 
				
			||||||
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  TestLogData(int len)
 | 
				
			||||||
 | 
					    : data_(nullptr), len_(len), locally_allocated(true)
 | 
				
			||||||
 | 
					  {
 | 
				
			||||||
 | 
					    data_ = new char[len]();
 | 
				
			||||||
 | 
					    MEMSET(data_, 'T', len);
 | 
				
			||||||
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  ~TestLogData()
 | 
				
			||||||
 | 
					  {
 | 
				
			||||||
 | 
					    if (locally_allocated)
 | 
				
			||||||
 | 
					    {
 | 
				
			||||||
 | 
					      delete[] data_;
 | 
				
			||||||
 | 
					      data_ = nullptr;
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					  }
 | 
				
			||||||
 | 
					public:
 | 
				
			||||||
 | 
					  int serialize(char *buf, int64_t limit, int64_t &pos) const
 | 
				
			||||||
 | 
					  {
 | 
				
			||||||
 | 
					    int ret = OB_SUCCESS;
 | 
				
			||||||
 | 
					    if (OB_ISNULL(buf) || limit < 0 || pos < 0) {
 | 
				
			||||||
 | 
					      ret = OB_INVALID_ARGUMENT;
 | 
				
			||||||
 | 
					    } else if (pos + len_ > limit) {
 | 
				
			||||||
 | 
					      ret = OB_BUF_NOT_ENOUGH;
 | 
				
			||||||
 | 
					    } else {
 | 
				
			||||||
 | 
					      MEMCPY(buf + pos, data_, len_);
 | 
				
			||||||
 | 
					      pos += len_;
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					    return ret;
 | 
				
			||||||
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  int deserialize(const char *buf, int64_t limit, int64_t &pos)
 | 
				
			||||||
 | 
					  {
 | 
				
			||||||
 | 
					    UNUSED(buf);
 | 
				
			||||||
 | 
					    UNUSED(limit);
 | 
				
			||||||
 | 
					    UNUSED(pos);
 | 
				
			||||||
 | 
					    return OB_NOT_SUPPORTED;
 | 
				
			||||||
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  int get_serialize_size() const { return len_; }
 | 
				
			||||||
 | 
					public:
 | 
				
			||||||
 | 
					  char *data_;
 | 
				
			||||||
 | 
					  int len_;
 | 
				
			||||||
 | 
					  bool locally_allocated;
 | 
				
			||||||
 | 
					};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
class TestStorageLogReaderWriter : public ::testing::Test {
 | 
					class TestStorageLogReaderWriter : public ::testing::Test {
 | 
				
			||||||
public:
 | 
					public:
 | 
				
			||||||
  TestStorageLogReaderWriter()
 | 
					  TestStorageLogReaderWriter()
 | 
				
			||||||
@ -31,6 +84,8 @@ public:
 | 
				
			|||||||
  {}
 | 
					  {}
 | 
				
			||||||
  virtual void SetUp();
 | 
					  virtual void SetUp();
 | 
				
			||||||
  virtual void TearDown();
 | 
					  virtual void TearDown();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  ObBaseStorageLogBuffer get_buffer(const TestLogData& log_data);
 | 
				
			||||||
};
 | 
					};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
void TestStorageLogReaderWriter::SetUp()
 | 
					void TestStorageLogReaderWriter::SetUp()
 | 
				
			||||||
@ -45,6 +100,14 @@ void TestStorageLogReaderWriter::TearDown()
 | 
				
			|||||||
  system("rm -rf ./test_storage_log_rw");
 | 
					  system("rm -rf ./test_storage_log_rw");
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					ObBaseStorageLogBuffer TestStorageLogReaderWriter::get_buffer(const TestLogData& log_data)
 | 
				
			||||||
 | 
					{
 | 
				
			||||||
 | 
					  ObBaseStorageLogBuffer buffer;
 | 
				
			||||||
 | 
					  buffer.assign(log_data.data_, log_data.len_);
 | 
				
			||||||
 | 
					  buffer.set_pos(log_data.len_);
 | 
				
			||||||
 | 
					  return buffer;
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
TEST_F(TestStorageLogReaderWriter, normal)
 | 
					TEST_F(TestStorageLogReaderWriter, normal)
 | 
				
			||||||
{
 | 
					{
 | 
				
			||||||
  int ret = OB_SUCCESS;
 | 
					  int ret = OB_SUCCESS;
 | 
				
			||||||
@ -748,6 +811,62 @@ TEST_F(TestStorageLogReaderWriter, seek_second_log)
 | 
				
			|||||||
  ASSERT_EQ(OB_SUCCESS, ret);
 | 
					  ASSERT_EQ(OB_SUCCESS, ret);
 | 
				
			||||||
  OB_LOG(INFO, "read finish ", K(read_cursor));
 | 
					  OB_LOG(INFO, "read finish ", K(read_cursor));
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					TEST_F(TestStorageLogReaderWriter, seek_to_end)
 | 
				
			||||||
 | 
					{
 | 
				
			||||||
 | 
					  int ret = OB_SUCCESS;
 | 
				
			||||||
 | 
					  char LOG_DIR[512] = { 0 };
 | 
				
			||||||
 | 
					  strcpy(LOG_DIR, "./test_storage_log_rw");
 | 
				
			||||||
 | 
					  const int64_t LOG_FILE_SIZE = 64 << 20; //64MB
 | 
				
			||||||
 | 
					  const int64_t CONCURRENT_TRANS_CNT = 128;
 | 
				
			||||||
 | 
					  const int64_t LOG_BUFFER_SIZE = 1966080L;  // 1.875MB
 | 
				
			||||||
 | 
					  const int64_t LOG_FILE_ID = 615;
 | 
				
			||||||
 | 
					  const int64_t LOG_START_SEQ = 320168236;
 | 
				
			||||||
 | 
					  const int64_t CKPT_BUFFER_SIZE = 4096 - 32 - 12;
 | 
				
			||||||
 | 
					  // write part
 | 
				
			||||||
 | 
					  ObLogCursor start_cursor;
 | 
				
			||||||
 | 
					  start_cursor.file_id_ = LOG_FILE_ID;
 | 
				
			||||||
 | 
					  start_cursor.log_id_ = LOG_START_SEQ;
 | 
				
			||||||
 | 
					  start_cursor.offset_ = 0;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  ObStorageLogWriter writer;
 | 
				
			||||||
 | 
					  writer.init(LOG_DIR, LOG_FILE_SIZE, LOG_BUFFER_SIZE, CONCURRENT_TRANS_CNT);
 | 
				
			||||||
 | 
					  ret = writer.start_log(start_cursor);
 | 
				
			||||||
 | 
					  ASSERT_EQ(OB_SUCCESS, ret);
 | 
				
			||||||
 | 
					  // write checkpoint log
 | 
				
			||||||
 | 
					  TestLogData CKPT_LOG_DATA(CKPT_BUFFER_SIZE);
 | 
				
			||||||
 | 
					  start_cursor.reset();
 | 
				
			||||||
 | 
					  ret = writer.flush_log(LogCommand::OB_LOG_CHECKPOINT, get_buffer(CKPT_LOG_DATA), start_cursor);
 | 
				
			||||||
 | 
					  ASSERT_EQ(OB_SUCCESS, ret);
 | 
				
			||||||
 | 
					  ASSERT_EQ(LOG_FILE_ID, start_cursor.file_id_);
 | 
				
			||||||
 | 
					  ASSERT_EQ(LOG_START_SEQ, start_cursor.log_id_);
 | 
				
			||||||
 | 
					  // read part
 | 
				
			||||||
 | 
					  LogCommand cmd = LogCommand::OB_LOG_UNKNOWN;
 | 
				
			||||||
 | 
					  uint64_t seq = 0;
 | 
				
			||||||
 | 
					  int64_t read_len = 0;
 | 
				
			||||||
 | 
					  char *read_data = nullptr;
 | 
				
			||||||
 | 
					  ObLogCursor read_cursor;
 | 
				
			||||||
 | 
					  ObStorageLogReader reader;
 | 
				
			||||||
 | 
					  ret = reader.init(LOG_DIR, LOG_FILE_ID, LOG_START_SEQ);
 | 
				
			||||||
 | 
					  ASSERT_EQ(OB_SUCCESS, ret);
 | 
				
			||||||
 | 
					  int64_t last_entry_seq = -1;
 | 
				
			||||||
 | 
					  int64_t cnt = 0;
 | 
				
			||||||
 | 
					  while (OB_SUCC(ret) && OB_SUCC(reader.read_log(cmd, seq, read_data, read_len))) {
 | 
				
			||||||
 | 
					    ++cnt;
 | 
				
			||||||
 | 
					    if (OB_SUCC(ret)) {
 | 
				
			||||||
 | 
					      last_entry_seq = seq;
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					  }
 | 
				
			||||||
 | 
					  ASSERT_EQ(OB_READ_NOTHING, ret);
 | 
				
			||||||
 | 
					  // last_entry_seq is not -1 beacuse there's nop log
 | 
				
			||||||
 | 
					  ASSERT_EQ(LOG_START_SEQ + 1, last_entry_seq);
 | 
				
			||||||
 | 
					  ASSERT_EQ(1, cnt);
 | 
				
			||||||
 | 
					  ret = reader.get_next_cursor(read_cursor);
 | 
				
			||||||
 | 
					  ASSERT_EQ(OB_SUCCESS, ret);
 | 
				
			||||||
 | 
					  ASSERT_EQ(LOG_FILE_ID, read_cursor.file_id_);
 | 
				
			||||||
 | 
					  ASSERT_EQ(LOG_START_SEQ + 2, read_cursor.log_id_);
 | 
				
			||||||
 | 
					  ASSERT_EQ(4096 * 2, read_cursor.offset_);
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
}  // namespace blocksstable
 | 
					}  // namespace blocksstable
 | 
				
			||||||
}  // namespace oceanbase
 | 
					}  // namespace oceanbase
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user