diff --git a/mittest/logservice/env/ob_simple_log_cluster_env.cpp b/mittest/logservice/env/ob_simple_log_cluster_env.cpp index 275cba7ed..b9aa91a4d 100644 --- a/mittest/logservice/env/ob_simple_log_cluster_env.cpp +++ b/mittest/logservice/env/ob_simple_log_cluster_env.cpp @@ -897,9 +897,8 @@ int ObSimpleLogClusterTestEnv::read_log(PalfHandleImplGuard &leader, const LSN & bool is_raw_write = false; if (OB_FAIL(iterator.next())) { } else if (OB_FAIL(iterator.get_entry(buf, buf_len, scn, log_offset, is_raw_write))) { - } else if (true == is_raw_write){ - ret = OB_ERR_UNEXPECTED; - PALF_LOG(ERROR, "the log mustn't be raw write", K(ret), K(iterator), K(log_offset)); + } else { + PALF_LOG(TRACE, "print log entry", K(is_raw_write)); } } } @@ -920,6 +919,8 @@ int ObSimpleLogClusterTestEnv::read_group_log(PalfHandleImplGuard &leader, LSN l } else if (iterator.iterator_impl_.curr_entry_is_raw_write_ != entry.get_header().is_raw_write()) { ret = OB_ERR_UNEXPECTED; PALF_LOG(ERROR, "raw write not match, unexpected error", K(ret), K(iterator)); + } else { + PALF_LOG(TRACE, "print log group entry", K(entry)); } } } diff --git a/mittest/logservice/test_ob_simple_log_single_replica_func.cpp b/mittest/logservice/test_ob_simple_log_single_replica_func.cpp index f7bcd3611..8ccf2a0ea 100644 --- a/mittest/logservice/test_ob_simple_log_single_replica_func.cpp +++ b/mittest/logservice/test_ob_simple_log_single_replica_func.cpp @@ -662,7 +662,7 @@ TEST_F(TestObSimpleLogClusterSingleReplica, test_iterator) // case1: // - 验证mode_version变化后,cache是否清空 // - replayable_point_scn是否生效 - // 当mode version发生变化时,预期case应该清空 + // 当mode version发生变化时,预期cache应该清空 // raw模式下,当replayable_point_scn很小时,直接返回OB_ITER_END PALF_LOG(INFO, "runlin trace case1", K(mode_version_v), K(*mode_version), K(max_scn_case1)); // mode_version_v 为无效值时,预期不清空 @@ -721,19 +721,22 @@ TEST_F(TestObSimpleLogClusterSingleReplica, test_iterator) PALF_LOG(INFO, "runlin trace case3", K(iterator), K(max_scn_case3), K(end_lsn_v), K(max_scn_case2)); SCN first_scn = logts[0]; // 在使用next(replayable_point_scn, &next_log_min_scn)接口时 - // 我们禁止使用LogEntry的头作为迭代器重点 - LSN first_log_start_lsn = lsns[0]; + // 我们禁止使用LogEntry的头作为迭代器终点 + LSN first_log_start_lsn = lsns[0] - sizeof(LogGroupEntryHeader); LSN first_log_end_lsn = lsns[0]+log_size+sizeof(LogEntryHeader); SCN next_log_min_scn; bool iterate_end_by_replayable_point = false; count = 5; - // 模拟提前达到文件终点, 此时curr_entry_size为0,因此next_log_min_scn为max_scn_case2+1 + // 模拟提前达到文件终点, 没有读过新日志,因此next_log_min_scn为prev_entry_scn_+1 end_lsn_v = first_log_start_lsn - 1; CLOG_LOG(INFO, "runlin trace 1", K(iterator), K(end_lsn_v), KPC(end_lsn), K(max_scn_case2), K(first_scn)); EXPECT_EQ(OB_ITER_END, iterator.next(SCN::plus(first_scn, 10000), next_log_min_scn, iterate_end_by_replayable_point)); // file_end_lsn尽管回退了,但curr_entry_已经没有被读取过, 因此next_log_min_scn依旧为first_scn - EXPECT_EQ(SCN::plus(max_scn_case2, 1), next_log_min_scn); + EXPECT_EQ(SCN::plus(iterator.iterator_impl_.prev_entry_scn_, 1), next_log_min_scn); EXPECT_EQ(iterate_end_by_replayable_point, false); + CLOG_LOG(INFO, "runlin trace 3.1", K(iterator), K(end_lsn_v), KPC(end_lsn)); + EXPECT_EQ(first_log_start_lsn, + iterator.iterator_impl_.log_storage_->get_lsn(iterator.iterator_impl_.curr_read_pos_)); // 读取一条日志成功,next_log_min_scn会被重置 // curr_entry为fisrt_log_ts对应的log @@ -742,10 +745,13 @@ TEST_F(TestObSimpleLogClusterSingleReplica, test_iterator) EXPECT_EQ(OB_SUCCESS, iterator.next(first_scn, next_log_min_scn, iterate_end_by_replayable_point)); count--; // iterator 返回成功,next_log_min_scn应该为OB_INVALID_TIMESTAMP EXPECT_EQ(next_log_min_scn.is_valid(), false); + // iterator中的prev_entry_scn_被设置为first_scn + EXPECT_EQ(iterator.iterator_impl_.prev_entry_scn_, first_scn); CLOG_LOG(INFO, "runlin trace 3", K(iterator), K(end_lsn_v), KPC(end_lsn)); { - // 模拟提前达到文件终点, 此时文件终点为file_log_end_lsn预期next_log_min_scn为first_scn对应的日志+1 + // 模拟提前达到文件终点, 此时文件终点为file_log_end_lsn + // 预期next_log_min_scn为first_scn对应的日志+1 SCN second_scn = logts[1]; EXPECT_EQ(OB_ITER_END, iterator.next(second_scn, next_log_min_scn, iterate_end_by_replayable_point)); // iterator返回OB_ITER_END,next_log_min_scn为first_scn+1 @@ -769,6 +775,8 @@ TEST_F(TestObSimpleLogClusterSingleReplica, test_iterator) EXPECT_EQ(OB_ITER_END, iterator.next(replayable_point_scn, next_log_min_scn, iterate_end_by_replayable_point)); EXPECT_EQ(iterate_end_by_replayable_point, true); // iterator返回OB_ITER_END,next_log_min_scn为replayable_point_scn + 1 + PALF_LOG(INFO, "runliun trace 4.1", K(replayable_point_scn), K(next_log_min_scn), + K(iterator)); EXPECT_EQ(next_log_min_scn, SCN::plus(replayable_point_scn, 1)); // 再次调用next,预期next_log_min_scn还是replayable_point_scn+1 EXPECT_EQ(OB_ITER_END, iterator.next(replayable_point_scn, next_log_min_scn, iterate_end_by_replayable_point)); @@ -782,33 +790,42 @@ TEST_F(TestObSimpleLogClusterSingleReplica, test_iterator) } // 模拟file end lsn不是group entry的终点 - { + { + // 设置终点为第三条日志LogEntry对应的起点 + end_lsn_v = lsns[2]+10; + // 设置时间戳为第三条日志 + SCN third_scn = logts[2]; + SCN replayable_point_scn = SCN::plus(third_scn, 10); + CLOG_LOG(INFO, "runlin trace 5.1", K(iterator), K(end_lsn_v), KPC(end_lsn), K(replayable_point_scn)); + // 此时内存中缓存的日志为第三条日志, iterator读取过新日志,但该日志由于end_lsn的原因不可读(此时,由于日志非受控回放,因此curr_read_pos_会被递推56) + // 因此next_log_min_scn会被设置为third_scn + EXPECT_EQ(OB_ITER_END, iterator.next(replayable_point_scn, next_log_min_scn, iterate_end_by_replayable_point)); + CLOG_LOG(INFO, "runlin trace 5.1.1", K(iterator), K(next_log_min_scn), K(replayable_point_scn)); + EXPECT_EQ(next_log_min_scn, third_scn); + EXPECT_EQ(iterate_end_by_replayable_point, false); - // 设置终点为第三条日志LogEntry对应的起点 - end_lsn_v = lsns[2]; - SCN third_scn = logts[2]; - SCN replayable_point_scn = third_scn.minus(third_scn, 1); - CLOG_LOG(INFO, "runlin trace 5.1", K(iterator), K(end_lsn_v), KPC(end_lsn), K(replayable_point_scn)); - // 此时内存中缓存的日志为第三条日志, 但该日志由于end_lsn的原因不可读, - // 因此next_log_min_scn为replayable_point_scn+1 - EXPECT_EQ(OB_ITER_END, iterator.next(replayable_point_scn, next_log_min_scn, iterate_end_by_replayable_point)); - EXPECT_EQ(next_log_min_scn, SCN::plus(replayable_point_scn, 1)); - EXPECT_EQ(iterate_end_by_replayable_point, false); - // 由于replayable_point_scn与curr_entry_之间不可能有日志,同时replayable_point_scn 0) { @@ -817,6 +834,7 @@ TEST_F(TestObSimpleLogClusterSingleReplica, test_iterator) EXPECT_EQ(false, next_log_min_scn.is_valid()); count--; } + CLOG_LOG(INFO, "runlin trace 6.1", K(iterator), K(end_lsn_v), K(max_scn_case3)); // 磁盘上以及受控回放点之后没有可读日志,此时应该返回受控回放点+1 EXPECT_EQ(OB_ITER_END, iterator.next(max_scn_case3, next_log_min_scn, iterate_end_by_replayable_point)); @@ -842,7 +860,7 @@ TEST_F(TestObSimpleLogClusterSingleReplica, test_iterator) PALF_LOG(INFO, "runlin trace 6.4", "raw_write_leader_lsn", raw_write_leader.palf_handle_impl_->get_max_lsn(), "new_leader_lsn", leader.palf_handle_impl_->get_max_lsn()); - // case 7 end_lsn_v 为很大的之后,让内存中有2M数据, 预期iterator next会由于受控回放失败,prev_entry_scn_不变 + // case 7 end_lsn_v 为很大的值之后,让内存中有2M数据, 预期iterator next会由于受控回放失败,prev_entry_scn_不变 // replayable_point_scn 为第一条日志的时间戳-2, next_log_min_scn 为append第一条LogEntry的时间戳 // NB: 如果不将数据读到内存中来,可能会出现读数据报错OB_NEED_RETRY的问题。 end_lsn_v = LSN(1000000000); @@ -852,11 +870,11 @@ TEST_F(TestObSimpleLogClusterSingleReplica, test_iterator) end_lsn_v = lsns_append[1]+2; - // 此时curr_entry_为第二条日志, curr_entry有效但不可读 - // 模拟replayable_point_scn大于curr_entry_ - PALF_LOG(INFO, "runlin trace 7.1", K(iterator), K(replayable_point_scn), K(end_lsn_v), K(logts_append[1])); - replayable_point_scn.convert_from_ts(ObTimeUtility::current_time() + 100000000); + // 此时curr_entry_为第二条日志, curr_entry有效但由于file end lsn不可读 + // 对于append 日志受控回放无效 + replayable_point_scn = SCN::plus(raw_write_leader.palf_handle_impl_->get_max_scn(), 1000000); EXPECT_EQ(OB_ITER_END, iterator.next(replayable_point_scn, next_log_min_scn, iterate_end_by_replayable_point)); + PALF_LOG(INFO, "runlin trace 7.1", K(iterator), K(replayable_point_scn), K(end_lsn_v), K(logts_append[1]), K(replayable_point_scn)); EXPECT_EQ(next_log_min_scn, logts_append[1]); EXPECT_EQ(prev_next_success_scn, iterator.iterator_impl_.prev_entry_scn_); EXPECT_EQ(iterate_end_by_replayable_point, false); @@ -966,28 +984,36 @@ TEST_F(TestObSimpleLogClusterSingleReplica, test_iterator) EXPECT_EQ(OB_ITER_END, iterator.next(replayable_point_scn, next_log_min_scn, iterate_end_by_replayable_point)); EXPECT_EQ(SCN::plus(prev_next_success_scn, 1), next_log_min_scn); - EXPECT_EQ(OB_ITER_END, iterator.next(replayable_point_scn, next_log_min_scn, iterate_end_by_replayable_point)); - EXPECT_EQ(SCN::plus(prev_next_success_scn, 1), next_log_min_scn); - - // 推大受控回放点到第一条日志,但end_lsn_v也变为第一条日志的起点,此时会由于end_lsn_v不可读 - // 预期next_log_min_scn为prev_next_success_scn+1, 由于prev_next_success_scn和replayable_point_scn - // 之间可能存在日志,因此不能将next_Log_min_ts设置为replayable_point_scn+1 - // - // 模拟prev_entry_后没有日志,replayable_point_scn大于prev_entry_scn_ - end_lsn_v = lsns_append[0]; - replayable_point_scn = logts_append[0]; - PALF_LOG(INFO, "runlin trace 8.4", K(iterator), K(replayable_point_scn), K(end_lsn_v), + PALF_LOG(INFO, "runlin trace 8.3.1", K(iterator), K(replayable_point_scn), K(end_lsn_v), K(logts_append[0]), K(prev_next_success_scn)); EXPECT_EQ(OB_ITER_END, iterator.next(replayable_point_scn, next_log_min_scn, iterate_end_by_replayable_point)); EXPECT_EQ(SCN::plus(prev_next_success_scn, 1), next_log_min_scn); + // 推大受控回放点到第一条日志,但end_lsn_v也变为第一条日志的起点,此时会由于end_lsn_v不可读 + // 预期next_min_scn为replayable_point_scn. + // 由于这条日志在此前的next中,不需要受控回放,会推大curr_read_pos_到LogEntry头,再次next不需要读数据直接返回OB_ITER_END + end_lsn_v = lsns_append[0]+10; + replayable_point_scn = logts_append[0]; + PALF_LOG(INFO, "runlin trace 8.4", K(iterator), K(replayable_point_scn), K(end_lsn_v), + K(logts_append[0]), K(prev_next_success_scn)); EXPECT_EQ(OB_ITER_END, iterator.next(replayable_point_scn, next_log_min_scn, iterate_end_by_replayable_point)); - EXPECT_EQ(SCN::plus(prev_next_success_scn, 1), next_log_min_scn); + EXPECT_EQ(replayable_point_scn, next_log_min_scn); + EXPECT_EQ(iterate_end_by_replayable_point, false); - // 模拟prev_entry_后没有日志,replayable_point_scn小于prev_entry_scn_ + PALF_LOG(INFO, "runlin trace 8.4.1", K(iterator), K(replayable_point_scn), K(end_lsn_v), + K(logts_append[0]), K(prev_next_success_scn)); + EXPECT_EQ(OB_ITER_END, iterator.next(replayable_point_scn, next_log_min_scn, iterate_end_by_replayable_point)); + EXPECT_EQ(replayable_point_scn, next_log_min_scn); + EXPECT_EQ(iterate_end_by_replayable_point, false); + + PALF_LOG(INFO, "runlin trace 8.4.2", K(iterator), K(replayable_point_scn), K(end_lsn_v), + K(logts_append[0]), K(prev_next_success_scn)); + // 模拟prev_entry_后没有日志,replayable_point_scn小于prev_entry_scn_, 后续日志都需要受控回放 + // replayable_point_scn回退是不会出现的事,此时next_min_scn会返回prev_entry_scn_+1 replayable_point_scn = SCN::minus(prev_next_success_scn, 100); EXPECT_EQ(OB_ITER_END, iterator.next(replayable_point_scn, next_log_min_scn, iterate_end_by_replayable_point)); EXPECT_EQ(SCN::plus(prev_next_success_scn, 1), next_log_min_scn); + EXPECT_EQ(true, iterate_end_by_replayable_point); EXPECT_EQ(OB_ITER_END, iterator.next(replayable_point_scn, next_log_min_scn, iterate_end_by_replayable_point)); EXPECT_EQ(SCN::plus(prev_next_success_scn, 1), next_log_min_scn); @@ -1116,6 +1142,29 @@ TEST_F(TestObSimpleLogClusterSingleReplica, test_gc_block) EXPECT_EQ(expect_scn, min_block_scn); } +class IOTaskCond : public LogIOTask { +public: + IOTaskCond(const int64_t palf_id, const int64_t palf_epoch) : LogIOTask(palf_id, palf_epoch) {} + virtual int do_task_(int tg_id, IPalfEnvImpl *palf_env_impl) override final + { + PALF_LOG(INFO, "before cond_wait"); + cond_.wait(); + PALF_LOG(INFO, "after cond_wait"); + return OB_SUCCESS; + }; + virtual int after_consume_(IPalfEnvImpl *palf_env_impl) override final + { + return OB_SUCCESS; + } + virtual LogIOTaskType get_io_task_type_() const { return LogIOTaskType::FLUSH_META_TYPE; } + int init(int64_t palf_id) + { + palf_id_ = palf_id; + return OB_SUCCESS; + }; + virtual void free_this_(IPalfEnvImpl *impl) {UNUSED(impl);} + ObCond cond_; +}; TEST_F(TestObSimpleLogClusterSingleReplica, test_iterator_with_flashback) { SET_CASE_LOG_FILE(TEST_NAME, "test_iterator_with_flashback"); @@ -1130,49 +1179,445 @@ TEST_F(TestObSimpleLogClusterSingleReplica, test_iterator_with_flashback) EXPECT_EQ(OB_SUCCESS, create_paxos_group(id_raw_write, leader_idx, raw_write_leader)); EXPECT_EQ(OB_SUCCESS, change_access_mode_to_raw_write(raw_write_leader)); - EXPECT_EQ(OB_SUCCESS, submit_log(leader, 2, leader_idx, 200)); + EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, leader_idx, 200)); SCN max_scn1 = leader.palf_handle_impl_->get_max_scn(); - LSN max_lsn1 = leader.palf_handle_impl_->get_max_lsn(); - sleep(2); - EXPECT_EQ(OB_SUCCESS, submit_log(leader, 2, leader_idx, 200)); + LSN end_pos_of_log1 = leader.palf_handle_impl_->get_max_lsn(); + EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, leader_idx, 200)); + EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(leader, leader.palf_handle_impl_->get_max_lsn())); SCN max_scn2 = leader.palf_handle_impl_->get_max_scn(); + LSN end_pos_of_log2 = leader.palf_handle_impl_->get_max_lsn(); EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(leader, leader.palf_handle_impl_->get_max_lsn())); + // 提交几条日志到raw_write leader EXPECT_EQ(OB_ITER_END, read_and_submit_group_log(leader, raw_write_leader)); EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(raw_write_leader, raw_write_leader.palf_handle_impl_->get_max_lsn())); PalfBufferIterator iterator; EXPECT_EQ(OB_SUCCESS, raw_write_leader.palf_handle_impl_->alloc_palf_buffer_iterator(LSN(0), iterator)); - // 迭代flashacbk之前的日志成功 - EXPECT_EQ(OB_SUCCESS, iterator.next(max_scn1)); - EXPECT_EQ(OB_SUCCESS, iterator.next(max_scn1)); - PALF_LOG(INFO, "runlin trace case1", K(iterator)); - EXPECT_EQ(OB_ITER_END, iterator.next(max_scn1)); + // 迭代flashback之前的日志成功 + SCN next_min_scn; + SCN tmp_scn; tmp_scn.val_ = 1000; + bool iterate_end_by_replayable_point = false; + EXPECT_EQ(OB_SUCCESS, iterator.next(max_scn1, next_min_scn, iterate_end_by_replayable_point)); + EXPECT_EQ(false, iterate_end_by_replayable_point); + EXPECT_EQ(iterator.iterator_impl_.prev_entry_scn_, max_scn1); + EXPECT_EQ(OB_ITER_END, iterator.next( + max_scn1, next_min_scn, iterate_end_by_replayable_point)); + EXPECT_EQ(true, iterate_end_by_replayable_point); + EXPECT_EQ(end_pos_of_log1, iterator.iterator_impl_.log_storage_->get_lsn(iterator.iterator_impl_.curr_read_pos_)); + EXPECT_EQ(SCN::plus(max_scn1, 1), next_min_scn); + PALF_LOG(INFO, "runlin trace case1", K(iterator), K(end_pos_of_log1)); - EXPECT_EQ(OB_SUCCESS, raw_write_leader.palf_handle_impl_->inner_flashback(max_scn1)); + EXPECT_EQ(OB_SUCCESS, raw_write_leader.palf_handle_impl_->inner_flashback(max_scn2)); - EXPECT_EQ(max_scn1, raw_write_leader.palf_handle_impl_->get_max_scn()); - - EXPECT_EQ(OB_ITER_END, iterator.next(max_scn1)); - EXPECT_EQ(iterator.iterator_impl_.log_storage_->get_lsn(iterator.iterator_impl_.curr_read_pos_-sizeof(LogGroupEntryHeader)), - max_lsn1); + EXPECT_EQ(max_scn2, raw_write_leader.palf_handle_impl_->get_max_scn()); int64_t mode_version; switch_flashback_to_append(raw_write_leader, mode_version); - EXPECT_EQ(OB_SUCCESS, submit_log(raw_write_leader, 2, leader_idx, 333)); + // 磁盘上存在三条日志,一条日志已经迭代,另外一条日志没有迭代(raw_write),最后一条日志为Append + EXPECT_EQ(OB_SUCCESS, submit_log(raw_write_leader, 1, leader_idx, 333)); EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(raw_write_leader, raw_write_leader.palf_handle_impl_->get_max_lsn())); + EXPECT_EQ(OB_ITER_END, read_group_log(raw_write_leader, LSN(0))); SCN max_scn3 = raw_write_leader.palf_handle_impl_->get_max_scn(); PALF_LOG(INFO, "runlin trace case2", K(iterator), K(max_scn3), "end_lsn:", raw_write_leader.palf_handle_impl_->get_end_lsn()); LSN iterator_end_lsn = iterator.iterator_storage_.end_lsn_; - // 内存中有两条日志,预期返回成功, 此时会清cache - EXPECT_EQ(OB_SUCCESS, iterator.next(max_scn3)); - EXPECT_FALSE(iterator_end_lsn == iterator.iterator_storage_.end_lsn_); - EXPECT_EQ(OB_SUCCESS, iterator.next(max_scn3)); + // iterator内存中有几条日志,预期返回成功, 此时会清cache, 前一条日志的信息会被清除(raw_write日志) + // 迭代器游标预期依旧指向第一条日志的终点, 由于受控回放,返回iterate_end + EXPECT_EQ(OB_ITER_END, iterator.next( + max_scn1, next_min_scn, iterate_end_by_replayable_point)); + EXPECT_EQ(end_pos_of_log1, iterator.iterator_impl_.log_storage_->get_lsn(iterator.iterator_impl_.curr_read_pos_)); + EXPECT_EQ(true, iterator.iterator_impl_.curr_entry_is_raw_write_); + + // 需要从磁盘上将后面两日志读上来,但由于受控回放不会吐出 + EXPECT_FALSE(iterator_end_lsn == iterator.iterator_storage_.end_lsn_); + EXPECT_EQ(OB_SUCCESS, iterator.next(max_scn2, next_min_scn, iterate_end_by_replayable_point)); + EXPECT_EQ(false, iterate_end_by_replayable_point); + EXPECT_EQ(true, iterator.iterator_impl_.curr_entry_is_raw_write_); + EXPECT_EQ(iterator.iterator_impl_.prev_entry_scn_, max_scn2); + + EXPECT_EQ(OB_SUCCESS, iterator.next(max_scn3, next_min_scn, iterate_end_by_replayable_point)); + EXPECT_EQ(false, iterate_end_by_replayable_point); + EXPECT_EQ(false, iterator.iterator_impl_.curr_entry_is_raw_write_); + EXPECT_EQ(iterator.iterator_impl_.prev_entry_scn_, max_scn3); + + // raw_write_leader已经有三条日志, raw_write(1 log entry), raw_write(1), append(1), + // 模拟一条group entry 中有多条小日志 + LSN last_lsn = raw_write_leader.palf_handle_impl_->get_max_lsn(); + SCN last_scn = raw_write_leader.palf_handle_impl_->get_max_scn(); + + LogIOWorker *io_worker = &raw_write_leader.palf_env_impl_->log_io_worker_; + IOTaskCond cond(id_raw_write, raw_write_leader.palf_env_impl_->last_palf_epoch_); + io_worker->submit_io_task(&cond); + std::vector lsns; + std::vector scns; + EXPECT_EQ(OB_SUCCESS, submit_log(raw_write_leader, 10, 100, id_raw_write, lsns, scns)); + int group_entry_num = 1; + int first_log_entry_index = 0, last_log_entry_index = 0; + for (int i = 1; i < 10; i++) { + if (lsns[i-1]+100+sizeof(LogEntryHeader) == lsns[i]) { + last_log_entry_index = i; + } else { + first_log_entry_index = i; + group_entry_num++; + PALF_LOG(INFO, "group entry", K(i-1)); + } + if (first_log_entry_index - last_log_entry_index > 2) { + break; + } + } + leader.reset(); + if (first_log_entry_index != 1 && last_log_entry_index != 9) { + PALF_LOG(INFO, "no group log has more than 2 log entry", K(first_log_entry_index), K(last_log_entry_index)); + return; + } + + cond.cond_.signal(); + + // 验证从一条包含多条LogEntry中日志中flashback,iterator迭代到中间的LogEntry后,flashback位点前还有几条LogEntry + // LogGroup LogGroup LogGroup LogGroup LogGroup(9条小日志) + EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(raw_write_leader, raw_write_leader.palf_handle_impl_->get_max_lsn())); + { + const int64_t id_new_raw_write = ATOMIC_AAF(&palf_id_, 1); + PalfHandleImplGuard new_raw_write_leader; + EXPECT_EQ(OB_SUCCESS, create_paxos_group(id_new_raw_write, leader_idx, new_raw_write_leader)); + EXPECT_EQ(OB_SUCCESS, change_access_mode_to_raw_write(new_raw_write_leader)); + + EXPECT_EQ(OB_ITER_END, read_and_submit_group_log(raw_write_leader, new_raw_write_leader)); + + PalfBufferIterator buff_iterator; + PalfGroupBufferIterator group_iterator; + + EXPECT_EQ(OB_SUCCESS, new_raw_write_leader.palf_handle_impl_->alloc_palf_buffer_iterator(LSN(0), buff_iterator)); + EXPECT_EQ(OB_SUCCESS, new_raw_write_leader.palf_handle_impl_->alloc_palf_group_buffer_iterator(LSN(0), group_iterator)); + + SCN replayable_point_scn(SCN::min_scn()); + // 验证replayable_point_scn为min_scn + EXPECT_EQ(OB_ITER_END, buff_iterator.next(replayable_point_scn, next_min_scn, iterate_end_by_replayable_point)); + EXPECT_EQ(iterate_end_by_replayable_point, true); + EXPECT_EQ(next_min_scn, SCN::plus(replayable_point_scn, 1)); + EXPECT_EQ(buff_iterator.iterator_impl_.prev_entry_scn_, replayable_point_scn); + EXPECT_EQ(OB_ITER_END, group_iterator.next(replayable_point_scn, next_min_scn, iterate_end_by_replayable_point)); + EXPECT_EQ(iterate_end_by_replayable_point, true); + EXPECT_EQ(next_min_scn, SCN::plus(replayable_point_scn, 1)); + EXPECT_EQ(group_iterator.iterator_impl_.prev_entry_scn_, replayable_point_scn); + + + // replayable_point_scn为第一条日志-1 + replayable_point_scn = SCN::minus(max_scn1, 1); + EXPECT_EQ(OB_ITER_END, buff_iterator.next(replayable_point_scn, next_min_scn, iterate_end_by_replayable_point)); + EXPECT_EQ(iterate_end_by_replayable_point, true); + EXPECT_EQ(next_min_scn, SCN::plus(replayable_point_scn, 1)); + EXPECT_EQ(buff_iterator.iterator_impl_.prev_entry_scn_, replayable_point_scn); + EXPECT_EQ(OB_ITER_END, group_iterator.next(replayable_point_scn, next_min_scn, iterate_end_by_replayable_point)); + EXPECT_EQ(iterate_end_by_replayable_point, true); + EXPECT_EQ(next_min_scn, SCN::plus(replayable_point_scn, 1)); + EXPECT_EQ(group_iterator.iterator_impl_.prev_entry_scn_, replayable_point_scn); + + // replayable_point_scn为第一条日志 + replayable_point_scn = max_scn1; + EXPECT_EQ(OB_SUCCESS, buff_iterator.next(replayable_point_scn, next_min_scn, iterate_end_by_replayable_point)); + EXPECT_EQ(iterate_end_by_replayable_point, false); + EXPECT_EQ(buff_iterator.iterator_impl_.prev_entry_scn_, replayable_point_scn); + EXPECT_EQ(OB_SUCCESS, group_iterator.next(replayable_point_scn, next_min_scn, iterate_end_by_replayable_point)); + EXPECT_EQ(iterate_end_by_replayable_point, false); + EXPECT_EQ(group_iterator.iterator_impl_.prev_entry_scn_, replayable_point_scn); + + // replayable_point_scn为第一条日志 + 1 + replayable_point_scn = SCN::plus(max_scn1, 1); + EXPECT_EQ(OB_ITER_END, buff_iterator.next(replayable_point_scn, next_min_scn, iterate_end_by_replayable_point)); + EXPECT_EQ(next_min_scn, SCN::plus(replayable_point_scn, 1)); + EXPECT_EQ(iterate_end_by_replayable_point, true); + EXPECT_EQ(buff_iterator.iterator_impl_.prev_entry_scn_, replayable_point_scn); + EXPECT_EQ(OB_ITER_END, group_iterator.next(replayable_point_scn, next_min_scn, iterate_end_by_replayable_point)); + EXPECT_EQ(next_min_scn, SCN::plus(replayable_point_scn, 1)); + EXPECT_EQ(iterate_end_by_replayable_point, true); + EXPECT_EQ(group_iterator.iterator_impl_.prev_entry_scn_, replayable_point_scn); + + // 成功迭代第二条日志,第三条日志 + replayable_point_scn = last_scn; + EXPECT_EQ(OB_SUCCESS, buff_iterator.next(replayable_point_scn)); + EXPECT_EQ(OB_SUCCESS, group_iterator.next(replayable_point_scn)); + EXPECT_EQ(OB_SUCCESS, buff_iterator.next(replayable_point_scn)); + EXPECT_EQ(OB_SUCCESS, group_iterator.next(replayable_point_scn)); + + // 第四条日志一定是LogGroupEntry + replayable_point_scn = scns[0]; + EXPECT_EQ(OB_SUCCESS, buff_iterator.next(replayable_point_scn)); + EXPECT_EQ(buff_iterator.iterator_impl_.prev_entry_scn_, replayable_point_scn); + EXPECT_EQ(OB_SUCCESS, group_iterator.next(replayable_point_scn)); + EXPECT_EQ(group_iterator.iterator_impl_.prev_entry_scn_, replayable_point_scn); + + // 迭代第五条LogGroupEntry的第一条LogEntry + EXPECT_EQ(OB_ITER_END, buff_iterator.next(replayable_point_scn, next_min_scn, iterate_end_by_replayable_point)); + EXPECT_EQ(next_min_scn, SCN::plus(replayable_point_scn, 1)); + EXPECT_EQ(iterate_end_by_replayable_point, true); + EXPECT_EQ(buff_iterator.iterator_impl_.prev_entry_scn_, replayable_point_scn); + EXPECT_NE(buff_iterator.iterator_impl_.log_storage_->get_lsn( + buff_iterator.iterator_impl_.curr_read_pos_), lsns[1]); + + EXPECT_EQ(OB_ITER_END, group_iterator.next(replayable_point_scn, next_min_scn, iterate_end_by_replayable_point)); + EXPECT_EQ(next_min_scn, SCN::plus(replayable_point_scn, 1)); + EXPECT_EQ(iterate_end_by_replayable_point, true); + EXPECT_EQ(group_iterator.iterator_impl_.prev_entry_scn_, replayable_point_scn); + EXPECT_NE(group_iterator.iterator_impl_.log_storage_->get_lsn( + group_iterator.iterator_impl_.curr_read_pos_), lsns[1]); + + // 由于被受控回放,buff_iterator以及group_iterator都没有推进curr_read_pos_ + EXPECT_EQ(group_iterator.iterator_impl_.log_storage_->get_lsn( + group_iterator.iterator_impl_.curr_read_pos_), + buff_iterator.iterator_impl_.log_storage_->get_lsn( + buff_iterator.iterator_impl_.curr_read_pos_)); + + // 成功迭代第五条LogGroupEntry的第一条LogEntry + replayable_point_scn = scns[1]; + EXPECT_EQ(OB_SUCCESS, buff_iterator.next(replayable_point_scn, next_min_scn, iterate_end_by_replayable_point)); + EXPECT_EQ(buff_iterator.iterator_impl_.prev_entry_scn_, replayable_point_scn); + EXPECT_EQ(buff_iterator.iterator_impl_.log_storage_->get_lsn( + buff_iterator.iterator_impl_.curr_read_pos_), lsns[1]); + + // group iterator被受控回放, 但由于第五条日志的max_scn大于受控回放点,故受控回放 + EXPECT_EQ(OB_ITER_END, group_iterator.next(replayable_point_scn, next_min_scn, iterate_end_by_replayable_point)); + // 由于受控回放的group entry对应的min scn和replayable_point_scn一样,因此next_min_scn会被设置为replayable_point_scn + EXPECT_EQ(next_min_scn, replayable_point_scn); + EXPECT_EQ(iterate_end_by_replayable_point, true); + EXPECT_EQ(group_iterator.iterator_impl_.prev_entry_scn_, scns[0]); + EXPECT_NE(group_iterator.iterator_impl_.log_storage_->get_lsn( + group_iterator.iterator_impl_.curr_read_pos_), lsns[1]); + + // 由于被第一条LogEntry受控回放,group_iterator没有推进curr_read_pos_, buff_iter推进了curr_read_pos_ + EXPECT_NE(group_iterator.iterator_impl_.log_storage_->get_lsn( + group_iterator.iterator_impl_.curr_read_pos_), + buff_iterator.iterator_impl_.log_storage_->get_lsn( + buff_iterator.iterator_impl_.curr_read_pos_)); + + // buff_iterator的游标到了第五条group_entry的第一条小日志 + // grou_iterator的游标到了第五条group_entry开头 + // sncs[0] 第四条group entry,scns[1] - scns[9]是第二条 + // 第五条group entry的第五条小日志被flashback + EXPECT_EQ(OB_SUCCESS, new_raw_write_leader.palf_handle_impl_->inner_flashback(scns[4])); + EXPECT_EQ(new_raw_write_leader.palf_handle_impl_->get_max_scn(), scns[4]); + EXPECT_EQ(OB_SUCCESS, change_access_mode_to_append(new_raw_write_leader)); + // 提交一条group_entry + // 对于buff_iterator, 存在两条group_entry未读,一条raw_rwrite(包含4条小日志,游标停留在第一条小日志末尾),一条append + // 对于group_iterator, 存在三条group_entry未读,一条raw_rwrite(包含4条小日志,游标停留在group_entry头部),一条append + EXPECT_EQ(OB_SUCCESS, submit_log(new_raw_write_leader, 1, leader_idx, 100)); + EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(new_raw_write_leader, new_raw_write_leader.palf_handle_impl_->get_max_lsn())); + + // 对于buff_iterator + // lsns[2]为第二条小日志开头,即第一条小日志末尾 + // 验证游标起始位置为第一条小日志头部 + // next 返回iterate是否清空cache + // 迭代raw_write写入的小日志 + // 迭代append写入的小日志 + PALF_LOG(INFO, "rulin trace 1", K(lsns[2]), K(lsns[1]), K(lsns[0]), K(buff_iterator)); + EXPECT_EQ(buff_iterator.iterator_impl_.log_storage_->get_lsn(buff_iterator.iterator_impl_.curr_read_pos_), lsns[1]); + EXPECT_EQ(OB_ITER_END, buff_iterator.next(SCN::min_scn(), next_min_scn, iterate_end_by_replayable_point)); + EXPECT_EQ(true, iterate_end_by_replayable_point); + EXPECT_EQ(next_min_scn, SCN::plus(buff_iterator.iterator_impl_.prev_entry_scn_, 1)); + EXPECT_EQ(0, buff_iterator.iterator_impl_.curr_read_pos_); + // 迭代第二条日志 + EXPECT_EQ(OB_SUCCESS, buff_iterator.next(SCN::max_scn())); + // 迭代第三条日志 + EXPECT_EQ(OB_SUCCESS, buff_iterator.next(SCN::max_scn())); + // 迭代第四条日志 + EXPECT_EQ(OB_SUCCESS, buff_iterator.next(SCN::max_scn())); + + // 迭代第五条日志(迭代新的GroupENtry, 非受控回放) + EXPECT_EQ(OB_SUCCESS, buff_iterator.next(SCN::min_scn())); + EXPECT_EQ(OB_ITER_END, buff_iterator.next(SCN::min_scn())); + + // 对于group_iterator + // 验证游标起始位置为raw_write日志开头 + // next 返回iterate是否清空cache + // 迭代raw_write写入的大日志 + // 迭代append写入的大日志 + PALF_LOG(INFO, "rulin trace 2", K(lsns[2]), K(lsns[1]), K(lsns[0]), K(group_iterator)); + EXPECT_EQ(group_iterator.iterator_impl_.log_storage_->get_lsn(group_iterator.iterator_impl_.curr_read_pos_), lsns[1] - sizeof(LogGroupEntryHeader)); + EXPECT_EQ(OB_ITER_END, group_iterator.next(SCN::min_scn(), next_min_scn, iterate_end_by_replayable_point)); + EXPECT_EQ(true, iterate_end_by_replayable_point); + EXPECT_EQ(next_min_scn, SCN::plus(group_iterator.iterator_impl_.prev_entry_scn_, 1)); + + // 迭代raw_write日志 + EXPECT_EQ(OB_SUCCESS, group_iterator.next(SCN::max_scn())); + EXPECT_EQ(OB_SUCCESS, group_iterator.next(SCN::max_scn())); + EXPECT_EQ(OB_ITER_END, group_iterator.next(SCN::max_scn())); + } + + // 验证从一条包含多条LogEntry中日志中flashback,iterator迭代到中间的LogEntry后,flashback位点前没有LogEntry + // LogGroup LogGroup LogGroup LogGroup LogGroup(9条小日志) + EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(raw_write_leader, raw_write_leader.palf_handle_impl_->get_max_lsn())); + { + const int64_t id_new_raw_write = ATOMIC_AAF(&palf_id_, 1); + PalfHandleImplGuard new_raw_write_leader; + EXPECT_EQ(OB_SUCCESS, create_paxos_group(id_new_raw_write, leader_idx, new_raw_write_leader)); + EXPECT_EQ(OB_SUCCESS, change_access_mode_to_raw_write(new_raw_write_leader)); + + EXPECT_EQ(OB_ITER_END, read_and_submit_group_log(raw_write_leader, new_raw_write_leader)); + + PalfBufferIterator buff_iterator; + PalfGroupBufferIterator group_iterator; + + EXPECT_EQ(OB_SUCCESS, new_raw_write_leader.palf_handle_impl_->alloc_palf_buffer_iterator(LSN(0), buff_iterator)); + EXPECT_EQ(OB_SUCCESS, new_raw_write_leader.palf_handle_impl_->alloc_palf_group_buffer_iterator(LSN(0), group_iterator)); + + // 成功迭代第一条日志,第二条日志,第三条日志 + SCN replayable_point_scn(last_scn); + EXPECT_EQ(OB_SUCCESS, buff_iterator.next(replayable_point_scn, next_min_scn, iterate_end_by_replayable_point)); + EXPECT_EQ(OB_SUCCESS, group_iterator.next(replayable_point_scn, next_min_scn, iterate_end_by_replayable_point)); + + EXPECT_EQ(OB_SUCCESS, buff_iterator.next(replayable_point_scn)); + EXPECT_EQ(OB_SUCCESS, group_iterator.next(replayable_point_scn)); + + EXPECT_EQ(OB_SUCCESS, buff_iterator.next(replayable_point_scn)); + EXPECT_EQ(OB_SUCCESS, group_iterator.next(replayable_point_scn)); + + // 第四条日志一定是LogGroupEntry + replayable_point_scn = scns[0]; + EXPECT_EQ(OB_SUCCESS, buff_iterator.next(replayable_point_scn)); + EXPECT_EQ(buff_iterator.iterator_impl_.prev_entry_scn_, replayable_point_scn); + EXPECT_EQ(OB_SUCCESS, group_iterator.next(replayable_point_scn)); + EXPECT_EQ(group_iterator.iterator_impl_.prev_entry_scn_, replayable_point_scn); + + // 迭代第五条LogGroupEntry的第一条LogEntry + EXPECT_EQ(OB_ITER_END, buff_iterator.next(replayable_point_scn, next_min_scn, iterate_end_by_replayable_point)); + EXPECT_EQ(next_min_scn, SCN::plus(replayable_point_scn, 1)); + EXPECT_EQ(iterate_end_by_replayable_point, true); + EXPECT_EQ(buff_iterator.iterator_impl_.prev_entry_scn_, replayable_point_scn); + EXPECT_NE(buff_iterator.iterator_impl_.log_storage_->get_lsn( + buff_iterator.iterator_impl_.curr_read_pos_), lsns[1]); + + EXPECT_EQ(OB_ITER_END, group_iterator.next(replayable_point_scn, next_min_scn, iterate_end_by_replayable_point)); + EXPECT_EQ(next_min_scn, SCN::plus(replayable_point_scn, 1)); + EXPECT_EQ(iterate_end_by_replayable_point, true); + EXPECT_EQ(group_iterator.iterator_impl_.prev_entry_scn_, replayable_point_scn); + EXPECT_NE(group_iterator.iterator_impl_.log_storage_->get_lsn( + group_iterator.iterator_impl_.curr_read_pos_), lsns[1]); + + // 由于被受控回放,buff_iterator以及group_iterator都没有推进curr_read_pos_ + EXPECT_EQ(group_iterator.iterator_impl_.log_storage_->get_lsn( + group_iterator.iterator_impl_.curr_read_pos_), + buff_iterator.iterator_impl_.log_storage_->get_lsn( + buff_iterator.iterator_impl_.curr_read_pos_)); + + // 成功迭代第五条LogGroupEntry的第一条LogEntry + replayable_point_scn = scns[1]; + EXPECT_EQ(OB_SUCCESS, buff_iterator.next(replayable_point_scn, next_min_scn, iterate_end_by_replayable_point)); + EXPECT_EQ(buff_iterator.iterator_impl_.prev_entry_scn_, replayable_point_scn); + EXPECT_EQ(buff_iterator.iterator_impl_.log_storage_->get_lsn( + buff_iterator.iterator_impl_.curr_read_pos_), lsns[1]); + + // group iterator被受控回放, 但由于第五条日志的max_scn大于受控回放点,故受控回放 + EXPECT_EQ(OB_ITER_END, group_iterator.next(replayable_point_scn, next_min_scn, iterate_end_by_replayable_point)); + // 由于受控回放的group entry对应的min scn和replayable_point_scn一样,因此next_min_scn会被设置为replayable_point_scn + EXPECT_EQ(next_min_scn, replayable_point_scn); + EXPECT_EQ(iterate_end_by_replayable_point, true); + EXPECT_EQ(group_iterator.iterator_impl_.prev_entry_scn_, scns[0]); + EXPECT_NE(group_iterator.iterator_impl_.log_storage_->get_lsn( + group_iterator.iterator_impl_.curr_read_pos_), lsns[1]); + + // 由于被第一条LogEntry受控回放,group_iterator没有推进curr_read_pos_, buff_iter推进了curr_read_pos_ + EXPECT_NE(group_iterator.iterator_impl_.log_storage_->get_lsn( + group_iterator.iterator_impl_.curr_read_pos_), + buff_iterator.iterator_impl_.log_storage_->get_lsn( + buff_iterator.iterator_impl_.curr_read_pos_)); + + // 迭代日志发现需要受控回放 + EXPECT_EQ(OB_ITER_END, buff_iterator.next(scns[1], next_min_scn, iterate_end_by_replayable_point)); + + // buff_iterator的游标到了第五条group_entry的第一条小日志末尾 + // grou_iterator的游标到了第五条group_entry开头 + // sncs[0] 第四条group entry,scns[1] - scns[9]是第二条 + // 第五条group entry的第二条小日志被flashback + EXPECT_EQ(OB_SUCCESS, new_raw_write_leader.palf_handle_impl_->inner_flashback(scns[2])); + EXPECT_EQ(new_raw_write_leader.palf_handle_impl_->get_max_scn(), scns[2]); + EXPECT_EQ(OB_SUCCESS, change_access_mode_to_append(new_raw_write_leader)); + // 提交一条group_entry + // 对于buff_iterator, 存在两条group_entry未读,一条raw_rwrite(包含4条小日志,游标停留在第一条小日志末尾),一条append + // 对于group_iterator, 存在三条group_entry未读,一条raw_rwrite(包含4条小日志,游标停留在group_entry头部),一条append + EXPECT_EQ(OB_SUCCESS, submit_log(new_raw_write_leader, 1, leader_idx, 100)); + EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(new_raw_write_leader, new_raw_write_leader.palf_handle_impl_->get_max_lsn())); + + // 对于buff_iterator + // lsns[2]为第二条小日志开头,即第一条小日志末尾 + // 验证游标起始位置为第一条小日志头部 + // next 返回iterate是否清空cache + // 迭代raw_write写入的小日志 + // 迭代append写入的小日志 + PALF_LOG(INFO, "rulin trace 3", K(lsns[2]), K(lsns[1]), K(lsns[0]), K(buff_iterator)); + EXPECT_EQ(buff_iterator.iterator_impl_.log_storage_->get_lsn(buff_iterator.iterator_impl_.curr_read_pos_), lsns[2]); + EXPECT_EQ(OB_ITER_END, buff_iterator.next(SCN::min_scn(), next_min_scn, iterate_end_by_replayable_point)); + EXPECT_EQ(true, iterate_end_by_replayable_point); + EXPECT_EQ(next_min_scn, SCN::plus(buff_iterator.iterator_impl_.prev_entry_scn_, 1)); + EXPECT_EQ(0, buff_iterator.iterator_impl_.curr_read_pos_); + // 迭代第二条小日志 + EXPECT_EQ(OB_SUCCESS, buff_iterator.next(SCN::max_scn())); + // 迭代新写入的LogGroupEntry, 不需要受控回放 + EXPECT_EQ(OB_SUCCESS, buff_iterator.next(SCN::min_scn())); + + EXPECT_EQ(OB_ITER_END, buff_iterator.next(SCN::min_scn())); + + // 对于group_iterator + // 验证游标起始位置为raw_write日志开头 + // next 返回iterate是否清空cache + // 迭代raw_write写入的大日志 + // 迭代append写入的大日志 + PALF_LOG(INFO, "rulin trace 4", K(lsns[2]), K(lsns[1]), K(lsns[0]), K(group_iterator)); + EXPECT_EQ(group_iterator.iterator_impl_.log_storage_->get_lsn(group_iterator.iterator_impl_.curr_read_pos_), lsns[1] - sizeof(LogGroupEntryHeader)); + EXPECT_EQ(OB_ITER_END, group_iterator.next(SCN::min_scn(), next_min_scn, iterate_end_by_replayable_point)); + EXPECT_EQ(true, iterate_end_by_replayable_point); + EXPECT_EQ(next_min_scn, SCN::plus(group_iterator.iterator_impl_.prev_entry_scn_, 1)); + + // 迭代raw_write日志 + EXPECT_EQ(OB_SUCCESS, group_iterator.next(SCN::max_scn())); + // 迭代新的GruopEntry + EXPECT_EQ(OB_SUCCESS, group_iterator.next(SCN::min_scn())); + EXPECT_EQ(OB_ITER_END, group_iterator.next(SCN::min_scn())); + } + + // 验证一条LogGroupEntry需要受控回放,buff iterator不能更新accumlate_checksum和curr_read_pos_ + // LogGroup LogGroup LogGroup LogGroup LogGroup(9条小日志) + // last_scn scns[0] scns[1]... + { + const int64_t id_new_raw_write = ATOMIC_AAF(&palf_id_, 1); + PalfHandleImplGuard new_raw_write_leader; + EXPECT_EQ(OB_SUCCESS, create_paxos_group(id_new_raw_write, leader_idx, new_raw_write_leader)); + EXPECT_EQ(OB_SUCCESS, change_access_mode_to_raw_write(new_raw_write_leader)); + EXPECT_EQ(OB_ITER_END, read_and_submit_group_log(raw_write_leader, new_raw_write_leader)); + PalfBufferIterator iterator; + EXPECT_EQ(OB_SUCCESS, new_raw_write_leader.palf_handle_impl_->alloc_palf_buffer_iterator(LSN(0), iterator)); + SCN replayable_point_scn(last_scn); + + EXPECT_EQ(OB_SUCCESS, iterator.next(replayable_point_scn)); + EXPECT_EQ(OB_SUCCESS, iterator.next(replayable_point_scn)); + EXPECT_EQ(OB_SUCCESS, iterator.next(replayable_point_scn)); + + EXPECT_EQ(OB_ITER_END, iterator.next(replayable_point_scn, next_min_scn, iterate_end_by_replayable_point)); + EXPECT_EQ(iterate_end_by_replayable_point, true); + EXPECT_EQ(next_min_scn, SCN::plus(last_scn, 1)); + + replayable_point_scn = scns[0]; + EXPECT_EQ(OB_SUCCESS, iterator.next(replayable_point_scn)); + // scns[1]对应的日志无法吐出 + EXPECT_EQ(OB_ITER_END, iterator.next(replayable_point_scn, next_min_scn, iterate_end_by_replayable_point)); + EXPECT_EQ(next_min_scn, SCN::plus(scns[0], 1)); + EXPECT_EQ(iterator.iterator_impl_.prev_entry_scn_, scns[0]); + + // flashback到scns[0] + EXPECT_EQ(OB_SUCCESS, new_raw_write_leader.palf_handle_impl_->inner_flashback(scns[0])); + EXPECT_EQ(OB_SUCCESS, change_access_mode_to_append(new_raw_write_leader)); + EXPECT_EQ(OB_SUCCESS, submit_log(new_raw_write_leader, 1, leader_idx, 100)); + EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(new_raw_write_leader, new_raw_write_leader.palf_handle_impl_->get_max_lsn())); + + // scns[0]对应的日志为raw write, 被flashback了, iterator停在scns[0]的末尾 + // 迭代新写入的日志成功 + EXPECT_EQ(OB_SUCCESS, iterator.next(SCN::min_scn(), next_min_scn, iterate_end_by_replayable_point)); + EXPECT_EQ(OB_ITER_END, iterator.next(SCN::min_scn())); + + } - EXPECT_EQ(OB_ITER_END, iterator.next(max_scn3)); } diff --git a/src/logservice/palf/log_iterator_impl.h b/src/logservice/palf/log_iterator_impl.h index 7657a97d8..6d3f74290 100644 --- a/src/logservice/palf/log_iterator_impl.h +++ b/src/logservice/palf/log_iterator_impl.h @@ -46,6 +46,42 @@ enum class LogEntryType }; // =========== LogEntryType end ============= +enum class IterateEndReason { + DUE_TO_REPLAYBLE_POINT_SCN_LOG_GROUP_ENTRY = 0, + DUE_TO_REPLAYBLE_POINT_SCN_LOG_ENTRY = 1, + DUE_TO_FILE_END_LSN_NOT_READ_NEW_DATA = 2, + DUE_TO_FILE_END_LSN_READ_NEW_DATA = 3, + MAX_TYPE = 4 +}; +struct IterateEndInfo { + IterateEndInfo() + { + reset(); + } + ~IterateEndInfo() + { + reset(); + } + void reset() + { + reason_ = IterateEndReason::MAX_TYPE; + log_scn_.reset(); + } + bool is_valid() const + { + return IterateEndReason::MAX_TYPE != reason_ + && log_scn_.is_valid(); + } + bool is_iterate_end_by_replayable_point_scn() const + { + return IterateEndReason::DUE_TO_REPLAYBLE_POINT_SCN_LOG_ENTRY == reason_ + || IterateEndReason::DUE_TO_REPLAYBLE_POINT_SCN_LOG_GROUP_ENTRY == reason_; + } + IterateEndReason reason_; + SCN log_scn_; + TO_STRING_KV(K_(reason), K_(log_scn)); +}; + // LogIteratorImpl provide the ability to iterate all log entry. template class LogIteratorImpl @@ -120,7 +156,8 @@ private: // OB_ITER_END // OB_ERR_OUT_LOWER_BOUND // OB_NEED_RETRY: means the data has been truncate concurrently - int get_next_entry_(); + int get_next_entry_(const SCN &replayable_point_scn, + IterateEndInfo &info); // According to LogEntryType, deserialize different log entry // The log format @@ -130,38 +167,79 @@ private: // @retval // OB_SUCCESS. // OB_BUF_NOT_ENOUGH. - // OB_INVALID_DATA, means log entry is not integrity, need check this - // log entry whether is the last one. - int parse_one_entry_(); + // OB_INVALID_DATA + // -- means log entry is not integrity, need check this log entry whether is the last one. + // OB_CHECKSUM_ERROR + // -- means accumlate checksum is not matched. + // OB_ITER_END + // -- means log entry is iterated end by replayable_point_scn + int parse_one_entry_(const SCN &replayable_point_scn, + IterateEndInfo &info); - template < - class ACTUAL_ENTRY> - int parse_one_specific_entry_(ACTUAL_ENTRY &actual_entry) + int parse_meta_entry_() { int ret = OB_SUCCESS; - const bool matched_type = std::is_same::value; + const bool matched_type = std::is_same::value; int64_t pos = curr_read_pos_; if (true == matched_type) { if (OB_FAIL(curr_entry_.deserialize(buf_, curr_read_buf_end_pos_, pos))) { - // When curr_entry_ is LogGroupEntry, need check accumlate checksum - // and check whether it's raw write - } else if (OB_FAIL(handle_each_log_group_entry_(curr_entry_))) { - PALF_LOG(WARN, "handle_each_log_group_entry_ failed", KPC(this)); } - } else if (OB_FAIL(actual_entry.deserialize(buf_, curr_read_buf_end_pos_, pos))) { - PALF_LOG(TRACE, "deserialize entry failed", K(ret), KPC(this)); - } else if (OB_FAIL(handle_each_log_group_entry_(actual_entry))) { - PALF_LOG(ERROR, "handle_each_log_group_entry_ failed", KPC(this), K(actual_entry)); } else { - ret = OB_EAGAIN; - advance_read_lsn_(actual_entry.get_payload_offset()); - PALF_LOG(TRACE, "advance_read_lsn_ payload offset", K(ret), KPC(this), K(actual_entry), "payload offset", - actual_entry.get_payload_offset()); + ret = OB_ERR_UNEXPECTED; + PALF_LOG(ERROR, "parse LogMetaEntry failed, unexpected error", K(ret), KPC(this)); } return ret; } - int parse_log_block_header_(); + int parse_log_entry_(const SCN &replayable_point_scn, + IterateEndInfo &info) + { + int ret = OB_SUCCESS; + const bool matched_type = std::is_same::value; + int64_t pos = curr_read_pos_; + if (true == matched_type) { + if (OB_FAIL(curr_entry_.deserialize(buf_, curr_read_buf_end_pos_, pos))) { + } else if (curr_entry_is_raw_write_ && curr_entry_.get_scn() > replayable_point_scn) { + ret = OB_ITER_END; + info.log_scn_ = curr_entry_.get_scn(); + info.reason_ = IterateEndReason::DUE_TO_REPLAYBLE_POINT_SCN_LOG_ENTRY; + PALF_LOG(TRACE, "iterate end by replayable_point", KPC(this), K(replayable_point_scn), K(info)); + } + } else { + ret = OB_ERR_UNEXPECTED; + PALF_LOG(ERROR, "parse LogEntry failed, unexpected error", KPC(this), K(replayable_point_scn), K(info)); + } + return ret; + } + + // When entry in file is LogGroupEntry, handle it specifically. + // 1. for raw write LogGroupEntry, if it's controlled by replayable_point_scn, no need update + // 'curr_read_pos_', 'accum_checksum_', because this log may be flashbacked. + // 2. for append LogGroupEntry, handle it normally. + int parse_log_group_entry_(const SCN &replayable_point_scn, + IterateEndInfo &info) + { + int ret = OB_SUCCESS; + const bool matched_type = std::is_same::value; + LogGroupEntry actual_entry; + int64_t pos = curr_read_pos_; + if (true == matched_type) { + if (OB_FAIL(curr_entry_.deserialize(buf_, curr_read_buf_end_pos_, pos))) { + } else if (OB_FAIL(handle_each_log_group_entry_(curr_entry_, replayable_point_scn, info))) { + PALF_LOG(WARN, "handle_each_log_group_entry_ failed", KPC(this), K(info), K(replayable_point_scn)); + } + } else if (OB_FAIL(actual_entry.deserialize(buf_, curr_read_buf_end_pos_, pos))) { + PALF_LOG(TRACE, "deserialize entry failed", K(ret), KPC(this)); + } else if (OB_FAIL(handle_each_log_group_entry_(actual_entry, replayable_point_scn, info))) { + PALF_LOG(WARN, "handle_each_log_group_entry_ failed", KPC(this), K(actual_entry), K(info), K(replayable_point_scn)); + } else { + ret = OB_EAGAIN; + advance_read_lsn_(actual_entry.get_payload_offset()); + PALF_LOG(TRACE, "advance_read_lsn_ payload offset", K(ret), KPC(this), K(actual_entry), "payload offset", + actual_entry.get_payload_offset(), K(info), K(replayable_point_scn)); + } + return ret; + } int get_log_entry_type_(LogEntryType &log_entry_type); @@ -175,29 +253,84 @@ private: void try_clean_up_cache_(); template - int handle_each_log_group_entry_(const T&entry) + // when T is not LogGroupEntry, no need do anything + int handle_each_log_group_entry_(const T&entry, + const SCN &replayable_point_scn, + IterateEndInfo &info) { PALF_LOG(TRACE, "T is not LogGroupEntry, do no thing", K(entry)); return OB_SUCCESS; } template <> - int handle_each_log_group_entry_(const LogGroupEntry&entry) + // When T is LogGroupEntry, need do: + // 1. check accumlate checksum: + // - if accumlate checksum is not match, return OB_CHECKSUM_ERROR + // - if data checksum is not match, return OB_INVALID_DATA + // 2. check this entry whether need control by 'replayable_point_scn': + // - if control by 'replayable_point_scn', return OB_ITER_END, and don't modify + // several fields in LogIteratorImpl('curr_read_pos_', 'curr_entry_is_raw_write_') + // - if not control by 'replayable_point_scn', return OB_SUCCESS. + int handle_each_log_group_entry_(const LogGroupEntry&entry, + const SCN &replayable_point_scn, + IterateEndInfo &info) { int ret = OB_SUCCESS; - PALF_LOG(TRACE, "T is LogGroupEntry, do no thing", K(entry)); - // if (OB_FAIL(verify_accum_checksum_(entry))) { - // PALF_LOG(ERROR, "verify_accum_checksum_ failed", K(ret), KPC(this), K(entry)); - // } else { + bool curr_entry_is_raw_write = entry.get_header().is_raw_write(); + int64_t new_accumlate_checksum = -1; + PALF_LOG(TRACE, "T is LogGroupEntry", K(entry)); + if (OB_FAIL(verify_accum_checksum_(entry, new_accumlate_checksum))) { + PALF_LOG(WARN, "verify_accum_checksum_ failed", K(ret), KPC(this), K(entry)); + // NB: when current entry is raw write, and the log scn of current entry is greater than + // replayable_point_scn, this log may be clean up, therefore we can not update several fields of + // LogIteratorImpl, return OB_ITER_END directlly, otherwise, we may not parse new LogGroupEntryHeader + // after flashback position, this will cause one log which is append, but control by replable_point_scn. + // + // NB: we need check the min scn of LogGroupEntry whether has been greater than + // replayable_point_scn: + // - if LogGroupEntry has two LogEntry, the scn of them are 10, 15 respectively, + // replayable_point_scn is 12. in this case, we can read first LogEntry, and + // we can update several fields like 'curr_entry_is_raw_write_', 'accum_checksum_' + // and the others('curr_read_pos_'...). when the flashback scn is 12, the LogEntry + // after 10 will be truncated, the new LogGroupEntry will be generated, meanwhile, + // we will advanced 'curr_read_pos_' to the end of first LogEntry and read new LogGroupEntry + // correctly. + // - if LogGroupEntry has one LogEntry, the scn of it is 13, the several fields are + // not been updated because of it's controlled by replayable_point_scn, when the flashback + // scn is 12, we don't need rollback these fields. + // + // NB: for PalfGroupBufferIterator, we should use max scn to control replay and use min scn + // as the log_scn_ of info. consider that, replayable_point_scn is 12, the min scn of group log + // is 7 and max scn of group 15, we should not return this log. meanwhile, we should use + // scn 7 to update next_min_scn. + } else if (true == curr_entry_is_raw_write) { + SCN min_scn; + bool is_group_iterator = std::is_same::value; + if (OB_FAIL(entry.get_log_min_scn(min_scn))) { + PALF_LOG(ERROR, "get_log_min_scn failed", K(ret), KPC(this), K(min_scn), + K(entry), K(replayable_point_scn)); + } else if ((is_group_iterator && entry.get_scn() > replayable_point_scn) + || (!is_group_iterator && min_scn > replayable_point_scn)) { + info.log_scn_ = min_scn; + info.reason_ = IterateEndReason::DUE_TO_REPLAYBLE_POINT_SCN_LOG_GROUP_ENTRY; + ret = OB_ITER_END; + PALF_LOG(TRACE, "iterate end by replayable_point", K(ret), KPC(this), K(min_scn), + K(entry), K(replayable_point_scn), K(info), K(is_group_iterator)); + } else { + } + } + if (OB_SUCC(ret)) { curr_entry_is_raw_write_ = entry.get_header().is_raw_write(); - // } - return ret; + accumlate_checksum_ = new_accumlate_checksum; + } + return ret; } // @brief: accumlate checksum verify, only verify checkum when accum_checksum_ is not -1. // ret val: // OB_SUCCESS // OB_CHECKSUM_ERROR - int verify_accum_checksum_(const LogGroupEntry &entry); + int verify_accum_checksum_(const LogGroupEntry &entry, + int64_t &new_accumlate_checksum); private: static constexpr int MAX_READ_TIMES_IN_EACH_NEXT = 2; @@ -278,9 +411,12 @@ int LogIteratorImpl::init(const GetModeVersion &get_mode_version, curr_read_buf_end_pos_ = 0; next_round_pread_size_ = MAX_LOG_BUFFER_SIZE; log_storage_ = log_storage; + curr_entry_.reset(); + curr_entry_is_raw_write_ = false; curr_entry_size_ = 0; init_mode_version_ = PALF_INITIAL_PROPOSAL_ID; get_mode_version_ = get_mode_version; + prev_entry_scn_.reset(); accumlate_checksum_ = -1; is_inited_ = true; PALF_LOG(TRACE, "LogIteratorImpl init success", K(ret), KPC(this)); @@ -295,9 +431,11 @@ void LogIteratorImpl::reuse() curr_read_buf_start_pos_ = 0; curr_read_buf_end_pos_ = 0; next_round_pread_size_ = MAX_LOG_BUFFER_SIZE; + curr_entry_.reset(); + curr_entry_is_raw_write_ = false; curr_entry_size_ = 0; - prev_entry_scn_.reset(); init_mode_version_ = PALF_INITIAL_PROPOSAL_ID; + prev_entry_scn_.reset(); accumlate_checksum_ = -1; } @@ -306,15 +444,19 @@ void LogIteratorImpl::destroy() { if (IS_INIT) { is_inited_ = false; + accumlate_checksum_ = -1; prev_entry_scn_.reset(); + init_mode_version_ = PALF_INITIAL_PROPOSAL_ID; curr_entry_size_ = 0; + curr_entry_is_raw_write_ = false; + curr_entry_.reset(); log_storage_ = NULL; next_round_pread_size_ = 0; + buf_ = NULL; curr_read_buf_end_pos_ = 0; curr_read_buf_start_pos_ = 0; curr_read_pos_ = 0; init_mode_version_ = 0; - accumlate_checksum_ = -1; } } @@ -337,14 +479,17 @@ LSN LogIteratorImpl::get_curr_read_lsn() const // last entry. // NB: for restarting, the committed offset of sliding window is invalid. template -int LogIteratorImpl::get_next_entry_() +int LogIteratorImpl::get_next_entry_(const SCN &replayable_point_scn, + IterateEndInfo &info) { int ret = OB_SUCCESS; // NB: check need read next enty // Assume that read size must greater or equal than 1 in each round. if (true == log_storage_->check_iterate_end(curr_read_pos_ + 1)) { ret = OB_ITER_END; - PALF_LOG(TRACE, "get_next_entry_ iterate end, not read new data", K(ret), KPC(this)); + info.reason_ = IterateEndReason::DUE_TO_FILE_END_LSN_NOT_READ_NEW_DATA; + info.log_scn_.reset(); + PALF_LOG(TRACE, "get_next_entry_ iterate end, not read new data", K(ret), KPC(this), K(info)); } else { // In truncate log case, the 'read_buf_' in LogIteratorStorage will not has // integrity data, to avoid read data in dead loop, return OB_NEED_RETRY. @@ -362,14 +507,16 @@ int LogIteratorImpl::get_next_entry_() int read_times = 0; do { int64_t header_size = 0; - if (OB_SUCC(parse_one_entry_())) { + if (OB_SUCC(parse_one_entry_(replayable_point_scn, info))) { curr_entry_size_ = curr_entry_.get_serialize_size(); } else if (OB_BUF_NOT_ENOUGH == ret) { if (OB_FAIL(read_data_from_storage_()) && OB_ITER_END != ret && OB_ERR_OUT_OF_LOWER_BOUND != ret) { PALF_LOG(WARN, "read_data_from_storage_ failed", K(ret), KPC(this)); } else if (OB_ITER_END == ret) { - PALF_LOG(WARN, "has iterate to end of block", K(ret), KPC(this)); + info.reason_ = IterateEndReason::DUE_TO_FILE_END_LSN_NOT_READ_NEW_DATA; + info.log_scn_.reset(); + PALF_LOG(WARN, "has iterate to end of block", K(ret), KPC(this), K(info)); } else if (OB_ERR_OUT_OF_LOWER_BOUND == ret) { PALF_LOG(WARN, "the block may be unlinked", K(ret), KPC(this)); } else { @@ -386,11 +533,13 @@ int LogIteratorImpl::get_next_entry_() } } while (OB_EAGAIN == ret); - // NB: check curr entry can be readable + // NB: check curr entry can be readable by file end lsn. if (OB_SUCC(ret) && true == log_storage_->check_iterate_end(curr_read_pos_ + curr_entry_size_)) { ret = OB_ITER_END; - PALF_LOG(TRACE, "get_next_entry_ iterate end, read new data", K(ret), KPC(this)); + info.reason_ = IterateEndReason::DUE_TO_FILE_END_LSN_READ_NEW_DATA; + info.log_scn_ = curr_entry_.get_scn(); + PALF_LOG(WARN, "get_next_entry_ iterate end, read new data", K(ret), KPC(this), K(info), K(replayable_point_scn)); } } return ret; @@ -414,25 +563,23 @@ int LogIteratorImpl::next(const share::SCN &replayable_point_scn, advance_read_lsn_(curr_entry_size_); curr_entry_size_ = 0; iterate_end_by_replayable_point = false; + IterateEndInfo info; - // NB: when return OB_ITER_END, we need try to clean up cache, and we should not clean up cache only when + // NB: when return OB_ITER_END, we need try to clean up cache, and we should clean up cache only when // the log ts of curr entry is greater than 'replayable_point_scn', otherwise, we would return some logs // which has been flasback, consider following case: - // 1. T1, 'replayable_point_scn' is 10, the log ts of curr entry is 15, but there is no flashback option. + // 1. T1, 'replayable_point_scn' is 10, the log ts of curr entry is 15, but there is no flashback option.(no any bad effect) // 2. T2, 'replayable_point_scn' is 10, the logs on disk which the log ts after 10 has been flashbacked, and - // return OB_ITER_END because of 'file end lsn'. + // return OB_ITER_END because of 'file end lsn'.(no any bad effect) // 3. T3, 'replayable_point_scn' has been advanced to 16, and write several logs on disk, however, the cache - // of iterator has not been clean up, the old logs will be returned. - // - // 1. T1, 'replayable_point_scn' is 10, the log ts of curr entry is 15, but there is no flashback option. - // 2. T2, 'replayable_point_scn' is 10, the logs on disk which the log ts after 10 has been flashbacked, and - // has append new logs. - // 3. T3, 'replayable_point_scn' has been advanced to 16, however, the cache of iterator has not been clean up, - // the old logs will be returned. + // of iterator has not been clean up, the old logs will be returned.(bad effect) // // Therefore, we should try_clean_up_cache_ in the beginning of each round of next. (void) try_clean_up_cache_(); - if (OB_FAIL(get_next_entry_())) { + if (!replayable_point_scn.is_valid()) { + ret = OB_INVALID_ARGUMENT; + PALF_LOG(WARN, "invalid argumetn", K(replayable_point_scn), KPC(this)); + } else if (OB_FAIL(get_next_entry_(replayable_point_scn, info))) { // NB: if the data which has been corrupted, clean cache. // NB: if the accum_checksum_ is not match, return OB_CHECKSUM_ERROR. if (OB_INVALID_DATA == ret) { @@ -444,68 +591,74 @@ int LogIteratorImpl::next(const share::SCN &replayable_point_scn, if (OB_ITER_END != ret) { PALF_LOG(WARN, "get_next_entry_ failed", K(ret), KPC(this)); } - } else { - // NB: when current entry is raw write, and the log scn of current entry is greater than replayable_point_scn - // clean up cache when mode version has been changed. - if (true == curr_entry_is_raw_write_ && curr_entry_.get_scn() > replayable_point_scn) { - ret = OB_ITER_END; - iterate_end_by_replayable_point = true; - } + PALF_LOG(TRACE, "get_next_entry_ failed", K(ret), KPC(this), K(info), K(replayable_point_scn)); + iterate_end_by_replayable_point = info.is_iterate_end_by_replayable_point_scn(); } // If 'curr_entry_' can be iterate at this round, set 'prev_entry_scn_' to the log ts of 'curr_entry_'. if (OB_SUCC(ret)) { prev_entry_scn_ = curr_entry_.get_scn(); } + // if 'curr_entry_' is not readable, we should set 'next_min_scn' to the scn of 'prev_entry_' + 1, + // and if 'curr_entry_' is not readable due to replayable_point_scn, we should set 'next_min_scn' + // to replayable_point_scn + 1 + // // In case of 'next' return OB_ITER_END, we should set 'next_min_scn' which is the out parameter of 'next' to: // - // 1. if 'prev_entry_scn_' is OB_INVALID_TIMESTAMP and 'curr_entry_' is not valid, means that there is no log before - // 'file end lsn', we should set 'next_min_scn' to OB_INVALID_TIMESTAMP - // - // 2. if 'curr_entry_' is valid but not readable, means that there is no readable log before 'file end lsn' or replayable_point_scn, - // we should set next_min_scn to std::min(replayable_point_scn, the log ts of 'curr_entry_'), however, replayable_point_scn may - // be smaller than 'prev_entry_scn_'(for example, the log entry correspond to'prev_entry_scn_' was writen by APPEND, its' scn - // may be greater than replayable_point_scn), we shoud set next_min_scn to std::max(std::min(replayable_point_scn + 1, the log ts - // of 'curr_entry_'), 'prev_entry_scn_' + 1). - // - // 3. if 'curr_entry_' is not valid and 'prev_entry_scn_' is not OB_INVALID_TIMESTAMP, means there is no log after 'file end lsn' - // or 'replayable_point_scn', we should set 'next_min_scn' to prev_entry_scn_ + 1; + // 1. if 'curr_entry_' iterate end by replayable point scn: + // we should set next_min_scn to std::min(replayable_point_scn+1, the log scn of 'curr_entry_'), + // however, replayable_point_scn may be smaller than 'prev_entry_scn_'(for example, the log + // entry correspond to'prev_entry_scn_' was writen by APPEND, its' scn may be greater than + // replayable_point_scn), we shoud set next_min_scn to std::max( + // std::min(replayable_point_scn + 1, the log scn of 'curr_entry_'), 'prev_entry_scn_' + 1). // + // 2. oterwise, iterate end by file end lsn: + // - case 1: if 'curr_entry_' has been parsed from LogIteratorStorage, however, it's not readable + // due to file end lsn(consider that someone set the file end lsn to the middle of one + // LogGroupEntry), we should set next_min_scn to the max of min(curr_entry_'s scn, + // replayable_point_scn + 1) and prev_entry_scn_. + // - case 2: if 'curr_entry_' has not been parsed from LogIteratorStorage, we just se next_min_scn + // to prev_entry_scn_ + 1. if (OB_ITER_END == ret) { - if (0 == curr_entry_size_ && !prev_entry_scn_.is_valid()) { + if (!info.is_valid() && !prev_entry_scn_.is_valid()) { next_min_scn.reset(); - PALF_LOG(WARN, "there is no readable log, set next_min_scn to OB_INVALID_TIMESTAMP", K(ret), KPC(this)); - } else if (0 != curr_entry_size_) { - if (!curr_entry_.is_valid()) { - ret = OB_ERR_UNEXPECTED; - PALF_LOG(ERROR, "unexpected error, curr_entry_size_ is not zero but curr_entry_ is invalid", K(ret), KPC(this)); - } else { - next_min_scn = MIN( - (replayable_point_scn.is_valid() ? SCN::plus(replayable_point_scn, 1) : SCN::min_scn()), - curr_entry_.get_scn()); - } + PALF_LOG(WARN, "there is no readable log, set next_min_scn to OB_INVALID_TIMESTAMP", + K(ret), KPC(this), K(info), K(replayable_point_scn)); + } else if (iterate_end_by_replayable_point || IterateEndReason::DUE_TO_FILE_END_LSN_READ_NEW_DATA == info.reason_) { + // when there is no log between [replayable_point_scn, 'curr_entry'), we can advance next_log_min_scn. + next_min_scn = MIN( + (replayable_point_scn.is_valid() ? SCN::plus(replayable_point_scn, 1) : SCN::max_scn()), + info.log_scn_); + PALF_LOG(TRACE, "update next_min_scn to min of replayable_point_scn and log_group_entry_min_scn_", KPC(this), K(info)); + next_min_scn = MAX( (prev_entry_scn_.is_valid() ? SCN::plus(prev_entry_scn_, 1) : SCN::min_scn()), next_min_scn); - // NB: we need update 'prev_entry_scn_' to the max of 'replayable_point_scn' and 'prev_entry_scn_' - // when iterate end by replayable point, otherwise, 'next_min_scn' may be smaller than previous - // result, consider following case: - // 1. T1, next return OB_ITER_END because of replayable point, the log ts of curr_entry_ is 10, - // 'prev_entry_scn_' is 5, 'replayable_point_scn' is 9, next_min_scn would be set to 10 because - // of 'curr_entry_' is valid. - // 2. T2, several logs after 9(log_ts) has been truncate, next return OB_ITER_END because of - // 'file end lsn', 'curr_entry_' is invalid, 'replayable_point_scn' is 9, 'prev_entry_scn_' - // is 5, 'next_min_scn' would be set to 5. - if (replayable_point_scn < curr_entry_.get_scn()) { + PALF_LOG(TRACE, "update next_min_scn to max of prev_entry_scn_ and next_min_scn", + KPC(this), K(info), K(replayable_point_scn)); + // NB: To make 'next_min_scn' newly, advance 'prev_entry_scn_' when 'curr_entry_' need control by readable point scn. + // consider follower case: + // T1, iterate an entry successfully, and make prev_entry_scn to 5; + // T2, iterate control by readable point scn, scn of 'curr_entry_' is 10, and readable point scn is 8. + // T3, the logs after 8 have been flashabcked, and no log has been writen. + // if we don't advance 'prev_entry_scn_' to 8, the continuous point of replay service can not update + // to 8. + if (info.log_scn_ > replayable_point_scn) { prev_entry_scn_ = MAX(replayable_point_scn, prev_entry_scn_); + PALF_LOG(TRACE, "update prev_entry_scn_ to replayable_point_scn", KPC(this), K(info), K(replayable_point_scn)); } + // NB: if has not read any log, we should set next_min_scn to invalid. + } else if (IterateEndReason::DUE_TO_FILE_END_LSN_NOT_READ_NEW_DATA == info.reason_) { + next_min_scn = prev_entry_scn_.is_valid() ? SCN::plus(prev_entry_scn_, 1) : SCN::min_scn(); + PALF_LOG(TRACE, "update next_min_scn to prev_entry_scn_ + 1", KPC(this), K(info), K(replayable_point_scn)); } else { - // prev_entry_scn_ must be invalid - next_min_scn = SCN::plus(prev_entry_scn_, 1); + } } if (OB_FAIL(ret)) { curr_entry_size_ = 0; + // To debug easily, don't reset 'curr_entry_' + // curr_entry_.reset(); } return ret; } @@ -528,7 +681,8 @@ int LogIteratorImpl::get_entry(ENTRY &entry, LSN &lsn, bool &is_raw_write } template -int LogIteratorImpl::verify_accum_checksum_(const LogGroupEntry &entry) +int LogIteratorImpl::verify_accum_checksum_(const LogGroupEntry &entry, + int64_t &new_accumlate_checksum) { int ret = OB_SUCCESS; int64_t data_checksum = -1; @@ -537,12 +691,13 @@ int LogIteratorImpl::verify_accum_checksum_(const LogGroupEntry &entry) ret = OB_INVALID_DATA; PALF_LOG(WARN, "invalid data", K(ret), KPC(this), K(entry)); } else if (-1 == accumlate_checksum_) { - accumlate_checksum_ = expected_verify_checksum; - PALF_LOG(INFO, "init accumlate_checksum to first LogGroupEntry", K(entry), KPC(this)); + new_accumlate_checksum = expected_verify_checksum; + PALF_LOG(INFO, "init accumlate_checksum to first LogGroupEntry", K(entry), KPC(this), + K(new_accumlate_checksum)); } else if (OB_FAIL(LogChecksum::verify_accum_checksum( accumlate_checksum_, data_checksum, - expected_verify_checksum, accumlate_checksum_))) { - PALF_LOG(ERROR, "verify checksum failed", K(ret), KPC(this), K(entry)); + expected_verify_checksum, new_accumlate_checksum))) { + PALF_LOG(ERROR, "verify accumlate checksum failed", K(ret), KPC(this), K(entry)); } else { PALF_LOG(TRACE, "verify_accum_checksum_ success", K(ret), KPC(this), K(entry)); } @@ -555,7 +710,8 @@ int LogIteratorImpl::verify_accum_checksum_(const LogGroupEntry &entry) // entry type is not 'wanted_log_entry_type', skip the header of this entry. NB: for // LOG_PADDING, the header size include data_len. template -int LogIteratorImpl::parse_one_entry_() +int LogIteratorImpl::parse_one_entry_(const SCN &replayable_point_scn, + IterateEndInfo &info) { int ret = OB_SUCCESS; const int MAGIC_NUMBER_SIZE = sizeof(int16_t); @@ -567,26 +723,17 @@ int LogIteratorImpl::parse_one_entry_() switch (actual_log_entry_type) { case LogEntryType::GROUP_ENTRY_HEADER: { - LogGroupEntry entry; - ret = parse_one_specific_entry_(entry); + ret = parse_log_group_entry_(replayable_point_scn, info); break; } case LogEntryType::LOG_ENTRY_HEADER: { - LogEntry entry; - ret = parse_one_specific_entry_(entry); + ret = parse_log_entry_(replayable_point_scn, info); break; } case LogEntryType::LOG_META_ENTRY_HEADER: { - LogMetaEntry entry; - ret = parse_one_specific_entry_(entry); - break; - } - case LogEntryType::LOG_INFO_BLOCK_HEADER: - { - ret = parse_log_block_header_(); - if (OB_SUCC(ret)) ret = OB_EAGAIN; + ret = parse_meta_entry_(); break; } default: @@ -599,25 +746,6 @@ int LogIteratorImpl::parse_one_entry_() return ret; } -template -int LogIteratorImpl::parse_log_block_header_() -{ - int ret = OB_SUCCESS; - int64_t pos = curr_read_pos_; - LogBlockHeader actual_entry; - if (OB_FAIL(actual_entry.deserialize(buf_, curr_read_buf_end_pos_, pos))) { - PALF_LOG(TRACE, "deserialize entry failed", K(ret), KPC(this)); - } else { - ret = OB_EAGAIN; - advance_read_lsn_(MAX_INFO_BLOCK_SIZE); - PALF_LOG(INFO, "parse_log_block_header_ success", K(ret), KPC(this)); - } - if (OB_BUF_NOT_ENOUGH == ret) { - OB_ASSERT(true); - } - return ret; -} - template void LogIteratorImpl::advance_read_lsn_(const offset_t step) { @@ -757,12 +885,30 @@ void LogIteratorImpl::try_clean_up_cache_() PALF_LOG_RET(WARN, OB_ERR_UNEXPECTED, "current_mode_version is unexpected", K(current_mode_version), KPC(this)); } else if (init_mode_version_ < current_mode_version) { PALF_LOG_RET(WARN, OB_SUCCESS, "mode version has been changed, need reset cache buf", KPC(this), K(current_mode_version)); - init_mode_version_ = current_mode_version; LSN curr_read_lsn = log_storage_->get_lsn(curr_read_pos_); + // reuse LogIteratorStorage firstly, only the log before 'start_lsn_' + 'curr_read_pos_' + // has been consumed. + // NB: need ensure that we can not update 'curr_read_pos_' to 'curr_read_pos_' + sizeof(LogGroupEntryHeader) + // when the LogGroupEntryHeader after 'curr_read_pos_' need be controlled by replable_point_scn. log_storage_->reuse(curr_read_lsn); curr_read_buf_start_pos_ = 0; curr_read_pos_ = 0; curr_read_buf_end_pos_ = 0; + curr_entry_.reset(); + + // NB: we can not reset curr_entry_is_raw_write_, otherwise, the log entry after replayable_point_scn may no be + // controlled by replable_point_scn. + // curr_entry_is_raw_write_ = false; + curr_entry_size_ = 0; + init_mode_version_ = current_mode_version; + + // we can not reset prev_entry_scn_, otherwise, after flashback, if there is no logs which can be readbale on disk, + // we can not return a valid next_min_scn. + // - prev_entry_.reset(); + + // we need reset accum_checksum_, otherwise, the accum_checksum_ is the log before flashback, and iterate new + // group log will fail. + accumlate_checksum_ = -1; } } } // end namespace palf