Fix slog reader doesnt switch file when theres no switch log at EOF
This commit is contained in:
@ -437,10 +437,12 @@ int ObStorageLogReader::load_buf()
|
|||||||
int ObStorageLogReader::check_switch_file(const int get_ret, const LogCommand cmd)
|
int ObStorageLogReader::check_switch_file(const int get_ret, const LogCommand cmd)
|
||||||
{
|
{
|
||||||
int ret = get_ret;
|
int ret = get_ret;
|
||||||
// Check switch file when meet below situation:
|
const bool fetch_log_again = (OB_READ_NOTHING == get_ret);
|
||||||
// 1) Meet switch log command
|
// Check switch file when meet below 2 situation:
|
||||||
|
// 1) Meet switch log command, or
|
||||||
|
// 2) read the last log (which may be incomplete and ignored)
|
||||||
// Otherwise directly ret
|
// Otherwise directly ret
|
||||||
if (common::OB_SUCCESS == get_ret && OB_LOG_SWITCH_LOG == cmd) {
|
if ((common::OB_SUCCESS == get_ret && OB_LOG_SWITCH_LOG == cmd) || fetch_log_again) {
|
||||||
STORAGE_REDO_LOG(INFO, "reach the end of log", K_(file_id));
|
STORAGE_REDO_LOG(INFO, "reach the end of log", K_(file_id));
|
||||||
if (OB_FAIL(close())) {
|
if (OB_FAIL(close())) {
|
||||||
STORAGE_REDO_LOG(ERROR, "close error", K(ret));
|
STORAGE_REDO_LOG(ERROR, "close error", K(ret));
|
||||||
|
|||||||
@ -863,9 +863,100 @@ TEST_F(TestStorageLogReaderWriter, seek_to_end)
|
|||||||
ASSERT_EQ(1, cnt);
|
ASSERT_EQ(1, cnt);
|
||||||
ret = reader.get_next_cursor(read_cursor);
|
ret = reader.get_next_cursor(read_cursor);
|
||||||
ASSERT_EQ(OB_SUCCESS, ret);
|
ASSERT_EQ(OB_SUCCESS, ret);
|
||||||
ASSERT_EQ(LOG_FILE_ID, read_cursor.file_id_);
|
ASSERT_EQ(LOG_FILE_ID + 1, read_cursor.file_id_);
|
||||||
ASSERT_EQ(LOG_START_SEQ + 2, read_cursor.log_id_);
|
ASSERT_EQ(LOG_START_SEQ + 2, read_cursor.log_id_);
|
||||||
ASSERT_EQ(4096 * 2, read_cursor.offset_);
|
ASSERT_EQ(0, read_cursor.offset_);
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_F(TestStorageLogReaderWriter, read_multiple_files)
|
||||||
|
{
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
const char LOG_DIR[512] = "./test_storage_log_rw";
|
||||||
|
const int64_t LOG_FILE_SIZE = 64 << 20; // 64MB
|
||||||
|
const int64_t CONCURRENT_TRANS_CNT = 128;
|
||||||
|
const int64_t LOG_BUFFER_SIZE = 1966080L; // 1.875MB
|
||||||
|
const int64_t log_cnt_per_file = 10;
|
||||||
|
|
||||||
|
// write part
|
||||||
|
ObLogCursor start_cursor;
|
||||||
|
start_cursor.file_id_ = 1;
|
||||||
|
start_cursor.log_id_ = 1;
|
||||||
|
start_cursor.offset_ = 0;
|
||||||
|
|
||||||
|
char write_data[128] = "this is read multiple files test.";
|
||||||
|
ObBaseStorageLogBuffer log_buf;
|
||||||
|
ret = log_buf.assign(write_data, 1024);
|
||||||
|
ASSERT_EQ(OB_SUCCESS, ret);
|
||||||
|
ret = log_buf.set_pos(strlen(write_data));
|
||||||
|
ASSERT_EQ(OB_SUCCESS, ret);
|
||||||
|
|
||||||
|
ObStorageLogWriter writer;
|
||||||
|
ret = writer.init(LOG_DIR, LOG_FILE_SIZE, LOG_BUFFER_SIZE, CONCURRENT_TRANS_CNT);
|
||||||
|
ASSERT_EQ(OB_SUCCESS, ret);
|
||||||
|
ret = writer.start_log(start_cursor);
|
||||||
|
ASSERT_EQ(OB_SUCCESS, ret);
|
||||||
|
for (int64_t i = 0; i < log_cnt_per_file; ++i) {
|
||||||
|
ret = writer.flush_log(LogCommand::OB_LOG_DUMMY_LOG, log_buf, start_cursor);
|
||||||
|
ASSERT_EQ(OB_SUCCESS, ret);
|
||||||
|
}
|
||||||
|
start_cursor.file_id_ = 2;
|
||||||
|
start_cursor.offset_ = 0;
|
||||||
|
start_cursor.log_id_ = writer.get_cur_cursor().log_id_;
|
||||||
|
writer.destroy();
|
||||||
|
|
||||||
|
// manually switch file
|
||||||
|
ret = writer.init(LOG_DIR, LOG_FILE_SIZE, LOG_BUFFER_SIZE, CONCURRENT_TRANS_CNT);
|
||||||
|
ASSERT_EQ(OB_SUCCESS, ret);
|
||||||
|
ret = writer.start_log(start_cursor);
|
||||||
|
ASSERT_EQ(OB_SUCCESS, ret);
|
||||||
|
for (int64_t i = 0; i < log_cnt_per_file; ++i) {
|
||||||
|
ret = writer.flush_log(LogCommand::OB_LOG_DUMMY_LOG, log_buf, start_cursor);
|
||||||
|
ASSERT_EQ(OB_SUCCESS, ret);
|
||||||
|
}
|
||||||
|
writer.destroy();
|
||||||
|
|
||||||
|
// read part
|
||||||
|
LogCommand cmd = LogCommand::OB_LOG_UNKNOWN;
|
||||||
|
uint64_t seq = 0;
|
||||||
|
int64_t read_len = 0;
|
||||||
|
char *read_data = NULL;
|
||||||
|
ObLogCursor read_cursor;
|
||||||
|
|
||||||
|
ObStorageLogReader reader;
|
||||||
|
ret = reader.init(LOG_DIR, 1, 0);
|
||||||
|
ASSERT_EQ(OB_SUCCESS, ret);
|
||||||
|
|
||||||
|
while (OB_SUCCESS == ret) {
|
||||||
|
// read dummy log
|
||||||
|
ret = reader.read_log(cmd, seq, read_data, read_len);
|
||||||
|
}
|
||||||
|
ASSERT_EQ(OB_READ_NOTHING, ret);
|
||||||
|
|
||||||
|
ret = reader.get_cursor(read_cursor);
|
||||||
|
ASSERT_EQ(OB_SUCCESS, ret);
|
||||||
|
|
||||||
|
ASSERT_EQ(start_cursor.file_id_ + 1, read_cursor.file_id_);
|
||||||
|
ASSERT_EQ(0, read_cursor.offset_);
|
||||||
|
ASSERT_EQ(start_cursor.log_id_ + 1, read_cursor.log_id_);
|
||||||
|
reader.reset();
|
||||||
|
|
||||||
|
// read start from the end of file 1
|
||||||
|
ret = reader.init(LOG_DIR, 1, 21);
|
||||||
|
ASSERT_EQ(OB_SUCCESS, ret);
|
||||||
|
|
||||||
|
while (OB_SUCCESS == ret) {
|
||||||
|
// read dummy log
|
||||||
|
ret = reader.read_log(cmd, seq, read_data, read_len);
|
||||||
|
}
|
||||||
|
ASSERT_EQ(OB_READ_NOTHING, ret);
|
||||||
|
|
||||||
|
ret = reader.get_cursor(read_cursor);
|
||||||
|
ASSERT_EQ(OB_SUCCESS, ret);
|
||||||
|
|
||||||
|
ASSERT_EQ(start_cursor.file_id_ + 1, read_cursor.file_id_);
|
||||||
|
ASSERT_EQ(0, read_cursor.offset_);
|
||||||
|
ASSERT_EQ(start_cursor.log_id_ + 1, read_cursor.log_id_);
|
||||||
|
reader.reset();
|
||||||
}
|
}
|
||||||
} // namespace blocksstable
|
} // namespace blocksstable
|
||||||
} // namespace oceanbase
|
} // namespace oceanbase
|
||||||
|
|||||||
Reference in New Issue
Block a user