oceanbase/mittest/palf_cluster/palf-cluster.diff
Snapshot of 2023-09-27 08:43:51 +00:00 (unified diff, 365 lines, 18 KiB)

diff --git a/build.sh b/build.sh
--- a/build.sh
+++ b/build.sh
@@ -188,7 +188,7 @@ function build
set -- "${BUILD_ARGS[@]}"
case "x$1" in
xrelease)
- do_build "$@" -DCMAKE_BUILD_TYPE=RelWithDebInfo -DOB_USE_LLD=$LLD_OPTION
+ do_build "$@" -DCMAKE_BUILD_TYPE=RelWithDebInfo -DENABLE_AUTO_FDO=ON -DOB_USE_LLD=$LLD_OPTION  # palf-cluster patch: only the release target gains -DENABLE_AUTO_FDO=ON; xrelease_no_unity below is unchanged
;;
xrelease_no_unity)
do_build "$@" -DCMAKE_BUILD_TYPE=RelWithDebInfo -DOB_USE_LLD=$LLD_OPTION -DOB_ENABLE_UNITY=OFF
diff --git a/deps/oblib/src/rpc/obrpc/ob_rpc_packet_list.h b/deps/oblib/src/rpc/obrpc/ob_rpc_packet_list.h
--- a/deps/oblib/src/rpc/obrpc/ob_rpc_packet_list.h
+++ b/deps/oblib/src/rpc/obrpc/ob_rpc_packet_list.h
@@ -968,6 +968,9 @@ PCODE_DEF(OB_LOG_FORCE_CLEAR_ARB_CLUSTER_INFO, 0x1521)
PCODE_DEF(OB_LOG_GET_ARB_MEMBER_INFO, 0x1522)
#endif
PCODE_DEF(OB_LOG_BATCH_FETCH_RESP, 0X1523)
+PCODE_DEF(OB_LOG_CREATE_REPLICA_CMD, 0x1524)  // palf-cluster test pcodes occupy 0x1524-0x1526: after OB_LOG_BATCH_FETCH_RESP, before the obesi range (0x1531-)
+PCODE_DEF(OB_LOG_SUBMIT_LOG_CMD, 0x1525)  // presumably the palf_cluster harness's submit-log request -- confirm against its RPC processors
+PCODE_DEF(OB_LOG_SUBMIT_LOG_CMD_RESP, 0x1526)  // presumably the reply to OB_LOG_SUBMIT_LOG_CMD -- confirm
// 1531-1550 for obesi
// PCODE_DEF(OB_ESI_IS_EXIST, 0x1531)
diff --git a/src/logservice/applyservice/ob_log_apply_service.cpp b/src/logservice/applyservice/ob_log_apply_service.cpp
--- a/src/logservice/applyservice/ob_log_apply_service.cpp
+++ b/src/logservice/applyservice/ob_log_apply_service.cpp
@@ -28,6 +28,9 @@ using namespace storage;
using namespace share;
namespace logservice
{
+int64_t p_append_cnt = 0;  // palf-cluster benchmark stats: callbacks counted since the last "result:" report
+int64_t p_log_body_size = 0;  // sum of cb->get_log_size() since the last report
+int64_t p_rt = 0;  // sum of (current_time() - append_start_ts) since the last report; presumably microseconds
//---------------ObApplyFsCb---------------//
ObApplyFsCb::ObApplyFsCb()
: apply_status_(NULL)
@@ -482,6 +485,26 @@ int ObApplyStatus::try_handle_cb_queue(ObApplyServiceQueueTask *cb_queue,
scn = cb->__get_scn();
get_cb_trace_(cb, append_start_time, append_finish_time, cb_first_handle_time, cb_start_time);
CLOG_LOG(TRACE, "cb on_success", K(lsn), K(scn), KP(link->next_), KPC(cb_queue), KPC(this));
+ // Benchmark instrumentation: accumulate count, body size and append-to-apply latency of callbacks about to run on_success(), and report averages once per REACH_TIME_INTERVAL.
+ ATOMIC_INC(&p_append_cnt);
+ ATOMIC_FAA(&p_log_body_size, cb->get_log_size());
+ int64_t tmp_rt = ObTimeUtility::current_time() - cb->get_append_start_ts();  // latency since the callback was submitted to palf
+ ATOMIC_FAA(&p_rt, tmp_rt);
+
+ if (REACH_TIME_INTERVAL(1000 *1000)) {  // NOTE(review): counters are loaded and then zeroed non-atomically; increments landing in between are dropped from the report
+ int64_t l_append_cnt = ATOMIC_LOAD(&p_append_cnt);
+ if (l_append_cnt == 0) l_append_cnt = 1;  // avoid division by zero in the averages below (the logged count then reads 1)
+ int64_t l_log_body_size = ATOMIC_LOAD(&p_log_body_size);
+ int64_t l_rt = ATOMIC_LOAD(&p_rt);
+
+ ATOMIC_STORE(&p_append_cnt, 0);
+ ATOMIC_STORE(&p_log_body_size, 0);
+ ATOMIC_STORE(&p_rt, 0);
+
+ CLOG_LOG(ERROR, "result:", K(l_append_cnt), K(l_log_body_size), K(l_rt),  // ERROR level presumably so the stats survive the benchmark's log-level filter; not a real error
+ "avg_body_size", l_log_body_size/l_append_cnt, "avg_rt", l_rt/l_append_cnt);
+ }
+
if (OB_FAIL(cb->on_success())) {
// 不处理此类失败情况
CLOG_LOG(ERROR, "cb on_success failed", KP(cb), K(ret), KPC(this));
@@ -1044,7 +1067,7 @@ int ObLogApplyService::init(PalfEnv *palf_env,
CLOG_LOG(WARN, "invalid argument", K(ret), KP(palf_env), K(ls_adapter));
} else if (OB_FAIL(TG_CREATE_TENANT(lib::TGDefIDs::ApplyService, tg_id_))) {
CLOG_LOG(WARN, "fail to create thread group", K(ret));
- } else if (OB_FAIL(MTL_REGISTER_THREAD_DYNAMIC(0.5, tg_id_))) {
+ } else if (OB_FAIL(MTL_REGISTER_THREAD_DYNAMIC(1, tg_id_))) {  // palf-cluster patch: raised from 0.5
CLOG_LOG(WARN, "MTL_REGISTER_THREAD_DYNAMIC failed", K(ret), K(tg_id_));
} else if (OB_FAIL(apply_status_map_.init("APPLY_STATUS", MAP_TENANT_ID))) {
CLOG_LOG(WARN, "apply_status_map_ init error", K(ret));
diff --git a/src/logservice/ob_append_callback.h b/src/logservice/ob_append_callback.h
--- a/src/logservice/ob_append_callback.h
+++ b/src/logservice/ob_append_callback.h
@@ -60,6 +60,7 @@ public:
append_start_ts_ = OB_INVALID_TIMESTAMP;
append_finish_ts_ = OB_INVALID_TIMESTAMP;
cb_first_handle_ts_ = OB_INVALID_TIMESTAMP;
+ log_size_ = 0;  // cleared together with the trace timestamps above
}
virtual int on_success() = 0;
virtual int on_failure() = 0;
@@ -69,10 +70,16 @@ public:
int64_t get_append_start_ts() const { return append_start_ts_; }
int64_t get_append_finish_ts() const { return append_finish_ts_; }
int64_t get_cb_first_handle_ts() const { return cb_first_handle_ts_; }
+ void set_log_size(const int64_t size)  // palf-cluster stats: ObLogHandler::append() stores the appended nbytes here
+ {
+ log_size_ = size;
+ }
+ int64_t get_log_size() const { return log_size_; }  // summed by the apply-service stats in ObApplyStatus::try_handle_cb_queue()
public:
int64_t append_start_ts_; //提交到palf的起始时刻
int64_t append_finish_ts_; //palf提交完成时刻,即提交到apply service起始时刻
int64_t cb_first_handle_ts_; //cb第一次被处理的时刻,不一定调用on_success
+ int64_t log_size_;  // size of the appended log body (the nbytes passed to ObLogHandler::append()); 0 after reset
};
} // end namespace logservice
diff --git a/src/logservice/ob_log_handler.cpp b/src/logservice/ob_log_handler.cpp
--- a/src/logservice/ob_log_handler.cpp
+++ b/src/logservice/ob_log_handler.cpp
@@ -237,6 +237,7 @@ int ObLogHandler::append(const void *buffer,
cb->set_append_finish_ts(ObTimeUtility::fast_current_time());
cb->__set_lsn(lsn);
cb->__set_scn(scn);
+ cb->set_log_size(nbytes);  // palf-cluster stats: remember the appended size for the apply-service throughput report
ret = apply_status_->push_append_cb(cb);
CLOG_LOG(TRACE, "palf_handle_ push_append_cb success", K(lsn), K(scn), K(ret), K(id_));
}
diff --git a/src/logservice/palf/log_block_handler.cpp b/src/logservice/palf/log_block_handler.cpp
--- a/src/logservice/palf/log_block_handler.cpp
+++ b/src/logservice/palf/log_block_handler.cpp
@@ -437,12 +437,26 @@ int LogBlockHandler::inner_writev_once_(const offset_t offset,
return ret;
}
+int64_t p_io_cnt = 0;  // palf-cluster benchmark stats: inner_write_impl_() calls since the last "io result:" report
+int64_t p_log_size = 0;  // sum of the requested write sizes (count) since the last report
+
int LogBlockHandler::inner_write_impl_(const int fd, const char *buf, const int64_t count, const int64_t offset)
{
int ret = OB_SUCCESS;
int64_t start_ts = ObTimeUtility::fast_current_time();
int64_t write_size = 0;
int64_t time_interval = OB_INVALID_TIMESTAMP;
+ ATOMIC_INC(&p_io_cnt);  // counted once per call, before (and independent of) the pwrite retry loop below
+ ATOMIC_FAA(&p_log_size, count);
+ if (REACH_TIME_INTERVAL(1000 * 1000)) {
+ int64_t l_io_cnt = ATOMIC_LOAD(&p_io_cnt);
+ int64_t l_log_size = ATOMIC_LOAD(&p_log_size);
+
+ ATOMIC_STORE(&p_io_cnt, 0);  // NOTE(review): load-then-zero is not atomic; increments landing in between are lost from the report
+ ATOMIC_STORE(&p_log_size, 0);
+ // Guard the average: if another thread zeroed p_io_cnt between this thread's increment and load, l_io_cnt reads 0 and the division would fault.
+ CLOG_LOG(ERROR, "io result:", K(l_io_cnt), K(l_log_size), "avg_size", l_log_size / (l_io_cnt > 0 ? l_io_cnt : 1));
+ }
do {
if (count != (write_size = ob_pwrite(fd, buf, count, offset))) {
if (palf_reach_time_interval(1000 * 1000, time_interval)) {
diff --git a/src/logservice/palf/log_define.h b/src/logservice/palf/log_define.h
--- a/src/logservice/palf/log_define.h
+++ b/src/logservice/palf/log_define.h
@@ -102,7 +102,7 @@ const int64_t PALF_LEADER_RECONFIRM_SYNC_TIMEOUT_US = 10 * 1000 * 1000L; //
const int64_t PREPARE_LOG_BUFFER_SIZE = 2048;
const int64_t PALF_LEADER_ACTIVE_SYNC_TIMEOUT_US = 10 * 1000 * 1000L; // 10s
const int32_t PALF_MAX_REPLAY_TIMEOUT = 500 * 1000;
-const int32_t DEFAULT_LOG_LOOP_INTERVAL_US = 100 * 1000; // 100ms
+const int32_t DEFAULT_LOG_LOOP_INTERVAL_US = 500; // 500us (palf-cluster tuning; the pre-patch value was 100 * 1000, i.e. 100ms)
const int32_t LOG_LOOP_INTERVAL_FOR_PERIOD_FREEZE_US = 1 * 1000; // 1ms
const int64_t PALF_SLIDING_WINDOW_SIZE = 1 << 11; // must be 2^n(n>0), default 2^11 = 2048
const int64_t PALF_MAX_LEADER_SUBMIT_LOG_COUNT = PALF_SLIDING_WINDOW_SIZE / 2; // max number of concurrent submitting group log in leader
diff --git a/src/logservice/palf/log_io_task.cpp b/src/logservice/palf/log_io_task.cpp
--- a/src/logservice/palf/log_io_task.cpp
+++ b/src/logservice/palf/log_io_task.cpp
@@ -623,7 +623,8 @@ int BatchLogIOFlushLogTask::push_flush_cb_to_thread_pool_(int tg_id, IPalfEnvImp
PALF_LOG(ERROR, "LogIOFlushMetaTask not inited!!!", K(ret), KPC(this));
} else {
const int64_t current_time = ObTimeUtility::current_time();
- for (int64_t i = 0; i < count && OB_SUCC(ret); i++) {
+ // for (int64_t i = 0; i < count && OB_SUCC(ret); i++) {
+ int64_t i = count - 1;  // palf-cluster: per-task loop disabled, only io_task_array_[count - 1] is handled below; NOTE(review): assumes count > 0, count == 0 would read io_task_array_[-1]
LogIOFlushLogTask *io_task = io_task_array_[i];
if (NULL == io_task) {
PALF_LOG(WARN, "io_task is nullptr, may be its' epoch has changed", K(ret), KP(io_task),
@@ -637,6 +638,9 @@ int BatchLogIOFlushLogTask::push_flush_cb_to_thread_pool_(int tg_id, IPalfEnvImp
// again.
io_task_array_[i] = NULL;
}
+ // }  (end of the disabled per-task loop)
+ for (int64_t i = 0; i < count && OB_SUCC(ret); i++) {  // clear every slot of the batch, including those not handled above; no slot is cleared if ret is already an error
io_task_array_[i] = NULL;
}
}
return ret;
diff --git a/src/logservice/palf/log_sliding_window.cpp b/src/logservice/palf/log_sliding_window.cpp
--- a/src/logservice/palf/log_sliding_window.cpp
+++ b/src/logservice/palf/log_sliding_window.cpp
@@ -132,7 +132,8 @@ LogSlidingWindow::LogSlidingWindow()
accum_log_cnt_(0),
accum_group_log_size_(0),
last_record_group_log_id_(FIRST_VALID_LOG_ID - 1),
- freeze_mode_(FEEDBACK_FREEZE_MODE),
+ freeze_mode_(PERIOD_FREEZE_MODE),
+ avg_log_batch_cnt_(0), // smoothed batch count, read by check_and_switch_freeze_mode()
is_inited_(false)
{}
@@ -448,6 +449,7 @@ int LogSlidingWindow::submit_log(const char *buf,
const int64_t start_log_id = get_start_id();
const int64_t log_id_upper_bound = start_log_id + PALF_MAX_LEADER_SUBMIT_LOG_COUNT - 1;
LSN tmp_lsn, lsn_upper_bound;
+ bool need_freeze_self = false; // passed to alloc_lsn_scn() and forwarded to generate_new_group_log_()
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
} else if (NULL == buf || buf_len <= 0 || buf_len > MAX_LOG_BODY_SIZE || (!ref_scn.is_valid())) {
@@ -462,7 +464,7 @@ int LogSlidingWindow::submit_log(const char *buf,
}
// sw_ cannot submit larger log
} else if (OB_FAIL(lsn_allocator_.alloc_lsn_scn(ref_scn, valid_log_size, log_id_upper_bound, lsn_upper_bound,
- tmp_lsn, log_id, scn, is_new_log, need_gen_padding_entry, padding_size))) {
+ tmp_lsn, log_id, scn, is_new_log, need_gen_padding_entry, padding_size, need_freeze_self))) {
PALF_LOG(WARN, "alloc_lsn_scn failed", K(ret), K_(palf_id), K_(self));
} else if (OB_FAIL(leader_wait_sw_slot_ready_(log_id))) {
PALF_LOG(WARN, "leader_wait_sw_slot_ready_ failed", K(ret), K_(palf_id), K_(self), K(log_id));
@@ -480,7 +482,7 @@ int LogSlidingWindow::submit_log(const char *buf,
K(padding_size), K(is_new_log), K(valid_log_size));
} else if (is_need_handle && FALSE_IT(is_need_handle_next |= is_need_handle)) {
} else if (OB_FAIL(generate_new_group_log_(tmp_lsn, log_id, scn, padding_entry_body_size, LOG_PADDING, \
- NULL, padding_entry_body_size, is_need_handle))) {
+ NULL, padding_entry_body_size, need_freeze_self, is_need_handle))) {
PALF_LOG(ERROR, "generate_new_group_log_ failed", K(ret), K_(palf_id), K_(self), K(log_id), K(tmp_lsn), K(padding_size),
K(is_new_log), K(valid_log_size));
} else if (is_need_handle && FALSE_IT(is_need_handle_next |= is_need_handle)) {
@@ -503,7 +505,7 @@ int LogSlidingWindow::submit_log(const char *buf,
PALF_LOG(WARN, "try_freeze_prev_log_ failed", K(ret), K_(palf_id), K_(self), K(log_id));
} else if (is_need_handle && FALSE_IT(is_need_handle_next |= is_need_handle)) {
} else if (OB_FAIL(generate_new_group_log_(tmp_lsn, log_id, scn, valid_log_size, LOG_SUBMIT, \
- buf, buf_len, is_need_handle))) {
+ buf, buf_len, need_freeze_self, is_need_handle))) {
PALF_LOG(WARN, "generate_new_group_log_ failed", K(ret), K_(palf_id), K_(self), K(log_id));
} else if (is_need_handle && FALSE_IT(is_need_handle_next |= is_need_handle)) {
} else {
@@ -673,6 +675,7 @@ int LogSlidingWindow::generate_new_group_log_(const LSN &lsn,
const LogType &log_type,
const char *log_data,
const int64_t data_len,
+ const bool need_freeze_self,
bool &is_need_handle)
{
int ret = OB_SUCCESS;
@@ -712,6 +715,9 @@ int LogSlidingWindow::generate_new_group_log_(const LSN &lsn,
} else if (OB_FAIL(log_task->set_initial_header_info(header_info))) {
PALF_LOG(WARN, "set_initial_header_info failed", K(ret), K_(palf_id), K_(self), K(log_id), KPC(log_task));
} else {
+ if (need_freeze_self) { // set end_lsn_ to this group's own end so try_freeze_by_myself() below need not wait for the next log to set it
+ log_task->set_end_lsn(lsn + log_body_size + LogGroupEntryHeader::HEADER_SER_SIZE);
+ }
// The first log is responsible to try freezing self, if its end_lsn_ has been set by next log.
log_task->try_freeze_by_myself();
}
@@ -1048,6 +1054,10 @@ int LogSlidingWindow::handle_next_submit_log_(bool &is_committed_lsn_updated)
const int64_t avg_group_log_size = total_group_log_size / total_group_log_cnt;
PALF_LOG(INFO, "[PALF STAT GROUP LOG INFO]", K_(palf_id), K_(self), "role", role_to_string(role),
K(total_group_log_cnt), K(avg_log_batch_cnt), K(total_group_log_size), K(avg_group_log_size));
+ if (total_log_cnt > 0) {
+ // EWMA of avg_log_batch_cnt: keep 20% of the previous value, take 80% of the latest sample.
+ avg_log_batch_cnt_ = (avg_log_batch_cnt_ * 2 + avg_log_batch_cnt * 8)/10;
+ }
}
ATOMIC_STORE(&accum_log_cnt_, 0);
ATOMIC_STORE(&accum_group_log_size_, 0);
@@ -1235,19 +1245,31 @@ int LogSlidingWindow::check_and_switch_freeze_mode()
{
int ret = OB_SUCCESS;
int64_t total_append_cnt = 0;
- for (int i = 0; i < APPEND_CNT_ARRAY_SIZE; ++i) {
- total_append_cnt += ATOMIC_LOAD(&append_cnt_array_[i]);
- ATOMIC_STORE(&append_cnt_array_[i], 0);
- }
+ // for (int i = 0; i < APPEND_CNT_ARRAY_SIZE; ++i) {
+ // total_append_cnt += ATOMIC_LOAD(&append_cnt_array_[i]);
+ // ATOMIC_STORE(&append_cnt_array_[i], 0);
+ // }
+
+ // The append_cnt_array_ sampling above is disabled (total_append_cnt stays 0); the mode is chosen from avg_log_batch_cnt_, which handle_next_submit_log_() smooths.
+ // adaptive (active): switch on the smoothed batch count
+ const int64_t avg_log_batch_cnt = avg_log_batch_cnt_;
+ // always period: uncomment to pin PERIOD_FREEZE_MODE (20 > SWITCH_BARRIER)
+ // const int64_t avg_log_batch_cnt = 20;
+ // always feedback: uncomment to pin FEEDBACK_FREEZE_MODE (0 <= SWITCH_BARRIER)
+ // const int64_t avg_log_batch_cnt = 0;
+ const int64_t SWITCH_BARRIER = 10;
+
if (FEEDBACK_FREEZE_MODE == freeze_mode_) {
- if (total_append_cnt >= APPEND_CNT_LB_FOR_PERIOD_FREEZE) {
+ // if (total_append_cnt >= APPEND_CNT_LB_FOR_PERIOD_FREEZE) {
+ if (avg_log_batch_cnt > SWITCH_BARRIER) {
freeze_mode_ = PERIOD_FREEZE_MODE;
- PALF_LOG(INFO, "switch freeze_mode to period", K_(palf_id), K_(self), K(total_append_cnt));
+ PALF_LOG(ERROR, "switch freeze_mode to period", K_(palf_id), K_(self), K(total_append_cnt), K(avg_log_batch_cnt));
}
} else if (PERIOD_FREEZE_MODE == freeze_mode_) {
- if (total_append_cnt < APPEND_CNT_LB_FOR_PERIOD_FREEZE) {
+ // if (total_append_cnt < APPEND_CNT_LB_FOR_PERIOD_FREEZE) {
+ if (avg_log_batch_cnt <= SWITCH_BARRIER) {
freeze_mode_ = FEEDBACK_FREEZE_MODE;
- PALF_LOG(INFO, "switch freeze_mode to feedback", K_(palf_id), K_(self), K(total_append_cnt));
+ PALF_LOG(ERROR, "switch freeze_mode to feedback", K_(palf_id), K_(self), K(total_append_cnt), K(avg_log_batch_cnt));
(void) feedback_freeze_last_log_();
}
} else {}
diff --git a/src/logservice/palf/log_sliding_window.h b/src/logservice/palf/log_sliding_window.h
--- a/src/logservice/palf/log_sliding_window.h
+++ b/src/logservice/palf/log_sliding_window.h
@@ -396,6 +396,7 @@ private:
const LogType &log_type,
const char *log_data,
const int64_t data_len,
+ const bool need_freeze_self,  // when true, the new group log's end_lsn_ is set to its own end so it can freeze by itself
bool &is_need_handle);
int append_to_group_log_(const LSN &lsn,
const int64_t log_id,
@@ -608,6 +609,7 @@ private:
int64_t last_record_group_log_id_;
int64_t append_cnt_array_[APPEND_CNT_ARRAY_SIZE];
FreezeMode freeze_mode_;
+ int64_t avg_log_batch_cnt_;  // EWMA of avg_log_batch_cnt, updated in handle_next_submit_log_() and compared against SWITCH_BARRIER in check_and_switch_freeze_mode()
bool is_inited_;
private:
DISALLOW_COPY_AND_ASSIGN(LogSlidingWindow);
diff --git a/src/logservice/palf/lsn_allocator.cpp b/src/logservice/palf/lsn_allocator.cpp
--- a/src/logservice/palf/lsn_allocator.cpp
+++ b/src/logservice/palf/lsn_allocator.cpp
@@ -297,7 +297,8 @@ int LSNAllocator::alloc_lsn_scn(const SCN &base_scn,
SCN &scn,
bool &is_new_group_log,
bool &need_gen_padding_entry,
- int64_t &padding_len)
+ int64_t &padding_len,
+ bool &need_freeze_self)  // palf-cluster patch: the caller forwards this flag to generate_new_group_log_()
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
@@ -439,6 +440,11 @@ int LSNAllocator::alloc_lsn_scn(const SCN &base_scn,
is_next_need_cut = true;
}
}
+ // freeze immediately: force is_new_group_log and is_next_need_cut regardless of the checks above
+ if (need_freeze_self) {  // NOTE(review): no visible hunk assigns this out-param and LogSlidingWindow::submit_log() passes false, so this branch looks unreachable unless it is set earlier in this function -- confirm
+ is_new_group_log = true;
+ is_next_need_cut = true;
+ }
if (is_new_group_log) {
tmp_next_log_id_delta++;
}
diff --git a/src/logservice/palf/lsn_allocator.h b/src/logservice/palf/lsn_allocator.h
--- a/src/logservice/palf/lsn_allocator.h
+++ b/src/logservice/palf/lsn_allocator.h
@@ -64,7 +64,8 @@ public:
share::SCN &scn,
bool &is_new_log,
bool &need_gen_padding_entry,
- int64_t &padding_len);
+ int64_t &padding_len,
+ bool &need_freeze_self);  // palf-cluster patch: when true, the allocation is forced to start a new group log
// 更新last_lsn和log_timestamp
// receive_log/append_disk_log 时调用
int inc_update_last_log_info(const LSN &lsn, const int64_t log_id, const share::SCN &scn);
diff --git a/src/share/ob_thread_define.h b/src/share/ob_thread_define.h
--- a/src/share/ob_thread_define.h
+++ b/src/share/ob_thread_define.h
@@ -164,4 +164,5 @@ TG_DEF(SvrStartupHandler, SvrStartupHandler, QUEUE_THREAD,
TG_DEF(TenantTTLManager, TTLManager, TIMER)
TG_DEF(TenantTabletTTLMgr, TTLTabletMgr, TIMER)
TG_DEF(TntSharedTimer, TntSharedTimer, TIMER)
+TG_DEF(LogServerTest, LogServerTest, THREAD_POOL, ThreadCountPair(10, 10))  // palf-cluster test thread pool; presumably used by the mittest/palf_cluster log server -- confirm
#endif