BUGFIX: break infinite loop of write throttle
This commit is contained in:
@ -10,6 +10,7 @@
|
|||||||
* See the Mulan PubL v2 for more details.
|
* See the Mulan PubL v2 for more details.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
#define USING_LOG_PREFIX COMMON
|
||||||
#include "ob_fifo_arena.h"
|
#include "ob_fifo_arena.h"
|
||||||
#ifdef OB_USE_ASAN
|
#ifdef OB_USE_ASAN
|
||||||
#include <malloc.h>
|
#include <malloc.h>
|
||||||
@ -365,23 +366,48 @@ int64_t ObFifoArena::expected_wait_time(const int64_t seq) const
|
|||||||
return expected_wait_time;
|
return expected_wait_time;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// how much memory we can get after dt time.
|
||||||
int64_t ObFifoArena::calc_mem_limit(const int64_t cur_mem_hold, const int64_t trigger_mem_limit, const int64_t dt) const
|
int64_t ObFifoArena::calc_mem_limit(const int64_t cur_mem_hold, const int64_t trigger_mem_limit, const int64_t dt) const
|
||||||
{
|
{
|
||||||
double cur_chunk_seq = static_cast<double>(((cur_mem_hold - trigger_mem_limit) + MEM_SLICE_SIZE - 1)/ (MEM_SLICE_SIZE));
|
int ret = OB_SUCCESS;
|
||||||
int64_t mem_can_be_assigned = 0;
|
int64_t mem_can_be_assigned = 0;
|
||||||
|
|
||||||
int64_t allocate_size_in_the_page = MEM_SLICE_SIZE - (cur_mem_hold - trigger_mem_limit) % MEM_SLICE_SIZE;
|
int64_t init_seq = ((cur_mem_hold - trigger_mem_limit) + MEM_SLICE_SIZE - 1) / (MEM_SLICE_SIZE);
|
||||||
int64_t accumulate_interval = 0;
|
int64_t init_page_left_size = MEM_SLICE_SIZE - (cur_mem_hold - trigger_mem_limit) % MEM_SLICE_SIZE;
|
||||||
int64_t the_page_interval = 0;
|
double init_page_left_interval = (1.0 * throttle_info_.decay_factor_ * pow(init_seq, 3) *
|
||||||
while (accumulate_interval < dt) {
|
init_page_left_size / MEM_SLICE_SIZE);
|
||||||
the_page_interval = static_cast<int64_t>(throttle_info_.decay_factor_ * cur_chunk_seq * cur_chunk_seq * cur_chunk_seq) * allocate_size_in_the_page / MEM_SLICE_SIZE;
|
double past_interval = throttle_info_.decay_factor_ * pow(init_seq, 2) * pow(init_seq + 1, 2) / 4;
|
||||||
accumulate_interval += the_page_interval;
|
double last_page_interval = 0;
|
||||||
|
double mid_result = 0;
|
||||||
|
double approx_max_chunk_seq = 0;
|
||||||
|
int64_t max_seq = 0;
|
||||||
|
double accumulate_interval = 0;
|
||||||
|
if (init_page_left_interval > dt) {
|
||||||
|
last_page_interval = throttle_info_.decay_factor_ * pow(init_seq, 3);
|
||||||
|
mem_can_be_assigned = dt / last_page_interval * MEM_SLICE_SIZE;
|
||||||
|
} else {
|
||||||
|
mid_result = 4.0 * (dt + past_interval - init_page_left_interval) / throttle_info_.decay_factor_;
|
||||||
|
approx_max_chunk_seq = pow(mid_result, 0.25);
|
||||||
|
max_seq = floor(approx_max_chunk_seq);
|
||||||
|
for (int i = 0; i < 2; i++) {
|
||||||
|
if (pow(max_seq, 2) * pow(max_seq + 1, 2) < mid_result) {
|
||||||
|
max_seq = max_seq + 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
accumulate_interval = pow(max_seq, 2) * pow(max_seq + 1, 2) * throttle_info_.decay_factor_ / 4 - past_interval + init_page_left_interval;
|
||||||
|
mem_can_be_assigned = init_page_left_size + (max_seq - init_seq) * MEM_SLICE_SIZE;
|
||||||
|
if (accumulate_interval > dt) {
|
||||||
|
last_page_interval = throttle_info_.decay_factor_ * pow(max_seq, 3);
|
||||||
|
mem_can_be_assigned -= (accumulate_interval - dt) / last_page_interval * MEM_SLICE_SIZE;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
mem_can_be_assigned += (accumulate_interval > dt ?
|
// defensive code
|
||||||
allocate_size_in_the_page - (accumulate_interval - dt) * allocate_size_in_the_page / the_page_interval :
|
if (pow(max_seq, 2) * pow(max_seq + 1, 2) < mid_result) {
|
||||||
allocate_size_in_the_page);
|
LOG_ERROR("unexpected result", K(max_seq), K(mid_result));
|
||||||
allocate_size_in_the_page = MEM_SLICE_SIZE;
|
}
|
||||||
cur_chunk_seq += double(1);
|
if (cur_mem_hold < trigger_mem_limit) {
|
||||||
|
LOG_ERROR("we should not limit speed now", K(cur_mem_hold), K(trigger_mem_limit));
|
||||||
}
|
}
|
||||||
|
|
||||||
return mem_can_be_assigned;
|
return mem_can_be_assigned;
|
||||||
|
|||||||
@ -103,7 +103,8 @@ ObStorageTableGuard::~ObStorageTableGuard()
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (REACH_TIME_INTERVAL(100 * 1000L)) {
|
if (REACH_TIME_INTERVAL(100 * 1000L) &&
|
||||||
|
sleep_time > 0) {
|
||||||
int64_t cost_time = ObTimeUtility::current_time() - init_ts_;
|
int64_t cost_time = ObTimeUtility::current_time() - init_ts_;
|
||||||
LOG_INFO("throttle situation", K(sleep_time), K(time), K(seq), K(for_replay_), K(cost_time));
|
LOG_INFO("throttle situation", K(sleep_time), K(time), K(seq), K(for_replay_), K(cost_time));
|
||||||
}
|
}
|
||||||
|
|||||||
@ -462,7 +462,7 @@ int ObOBJLock::fast_lock(
|
|||||||
lock_mode_in_same_trans,
|
lock_mode_in_same_trans,
|
||||||
need_retry,
|
need_retry,
|
||||||
conflict_tx_set))) {
|
conflict_tx_set))) {
|
||||||
if (OB_TRY_LOCK_ROW_CONFLICT != ret) {
|
if (OB_TRY_LOCK_ROW_CONFLICT != ret && OB_EAGAIN != ret) {
|
||||||
LOG_WARN("try fast lock failed", KR(ret), K(lock_op));
|
LOG_WARN("try fast lock failed", KR(ret), K(lock_op));
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
|||||||
Reference in New Issue
Block a user