Fix shared hash join core dump caused by a part count change, and fix the TPC-H performance regression caused by the check-alive call
This commit is contained in:
@ -299,7 +299,21 @@ int ObDtlChannelLoop::process_base(ObIDltChannelLoopPred *pred, int64_t &hinted_
|
||||
}
|
||||
}
|
||||
}
|
||||
if (ignore_interrupt_) {
|
||||
if ((loop_times_ & (SERVER_ALIVE_CHECK_TIMES - 1)) == 0) {
|
||||
for (int64_t i = 0; i < chans_.count(); ++i) {
|
||||
if (nullptr != chans_.at(i)) {
|
||||
ObDtlBasicChannel *ch = static_cast<ObDtlBasicChannel *> (chans_.at(i));
|
||||
if (OB_UNLIKELY(ObPxCheckAlive::is_in_blacklist(ch->get_peer(),
|
||||
get_process_query_time()))) {
|
||||
ret = OB_RPC_CONNECT_ERROR;
|
||||
LOG_WARN("peer no in communication, maybe crashed", K(ret), K(ch->get_peer()),
|
||||
K(static_cast<int64_t>(GCONF.cluster_id)));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (ignore_interrupt_) {
|
||||
// do nothing.
|
||||
} else if ((loop_times_ & (INTERRUPT_CHECK_TIMES - 1)) == 0 && OB_UNLIKELY(IS_INTERRUPTED())) {
|
||||
// Interrupt error handling
|
||||
@ -413,13 +427,7 @@ int ObDtlChannelLoop::process_channel(int64_t &nth_channel)
|
||||
}
|
||||
ObDtlChannel *ch = sentinel_node_.next_link_;
|
||||
while (OB_EAGAIN == ret && ch != &sentinel_node_) {
|
||||
if (OB_UNLIKELY(ObPxCheckAlive::is_in_blacklist(ch->get_peer(),
|
||||
get_process_query_time()))) {
|
||||
ret = OB_RPC_CONNECT_ERROR;
|
||||
LOG_WARN("peer no in communication, maybe crashed", K(ret), K(ch->get_peer()),
|
||||
K(static_cast<int64_t>(GCONF.cluster_id)));
|
||||
break;
|
||||
} else if (OB_SUCC(ch->process1(&process_func_, 0, last_row_in_buffer))) {
|
||||
if (OB_SUCC(ch->process1(&process_func_, 0, last_row_in_buffer))) {
|
||||
nth_channel = ch->get_loop_index();
|
||||
break;
|
||||
} else if (OB_EAGAIN == ret) {
|
||||
@ -442,6 +450,12 @@ int ObDtlChannelLoop::process_channel(int64_t &nth_channel)
|
||||
ObInterruptCode &code = GET_INTERRUPT_CODE();
|
||||
ret = code.code_;
|
||||
LOG_WARN("message loop is interrupted", K(code), K(ret));
|
||||
} else if ((loop_times_ & (SERVER_ALIVE_CHECK_TIMES - 1)) == 0
|
||||
&& OB_UNLIKELY(ObPxCheckAlive::is_in_blacklist(ch->get_peer(),
|
||||
get_process_query_time()))) {
|
||||
ret = OB_RPC_CONNECT_ERROR;
|
||||
LOG_WARN("peer no in communication, maybe crashed", K(ret), K(ch->get_peer()),
|
||||
K(static_cast<int64_t>(GCONF.cluster_id)));
|
||||
}
|
||||
ch = sentinel_node_.next_link_;
|
||||
++n_times;
|
||||
|
||||
@ -123,6 +123,7 @@ public:
|
||||
void set_interm_result(bool flag) { use_interm_result_ = flag; }
|
||||
private:
|
||||
static const int64_t INTERRUPT_CHECK_TIMES = 16;
|
||||
static const int64_t SERVER_ALIVE_CHECK_TIMES = 256;
|
||||
Proc *proc_map_[MAX_PROCS];
|
||||
InterruptProc *interrupt_proc_;
|
||||
common::ObSEArray<ObDtlChannel*, 128> chans_;
|
||||
|
||||
@ -1452,6 +1452,11 @@ int ObHashJoinOp::get_max_memory_size(int64_t input_size)
|
||||
}
|
||||
buf_mgr_->reuse();
|
||||
buf_mgr_->set_reserve_memory_size(remain_data_memory_size_, 1.0);
|
||||
if (OB_SUCC(ret)) {
|
||||
if (OB_FAIL(sync_wait_part_count())) {
|
||||
LOG_WARN("failed to sync part count", K(ret));
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret) && is_vectorized()) {
|
||||
char *buf = NULL;
|
||||
if (part_count_ <= 0) {
|
||||
@ -1467,11 +1472,6 @@ int ObHashJoinOp::get_max_memory_size(int64_t input_size)
|
||||
sizeof(uint16_t) * (MY_SPEC.max_batch_size_ * part_count_));
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
if (OB_FAIL(sync_wait_part_count())) {
|
||||
LOG_WARN("failed to sync part count", K(ret));
|
||||
}
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user