diff --git a/src/sql/dtl/ob_dtl_channel_loop.cpp b/src/sql/dtl/ob_dtl_channel_loop.cpp index 17bb3baae1..8249a90000 100644 --- a/src/sql/dtl/ob_dtl_channel_loop.cpp +++ b/src/sql/dtl/ob_dtl_channel_loop.cpp @@ -299,7 +299,21 @@ int ObDtlChannelLoop::process_base(ObIDltChannelLoopPred *pred, int64_t &hinted_ } } } - if (ignore_interrupt_) { + if ((loop_times_ & (SERVER_ALIVE_CHECK_TIMES - 1)) == 0) { + for (int64_t i = 0; i < chans_.count(); ++i) { + if (nullptr != chans_.at(i)) { + ObDtlBasicChannel *ch = static_cast (chans_.at(i)); + if (OB_UNLIKELY(ObPxCheckAlive::is_in_blacklist(ch->get_peer(), + get_process_query_time()))) { + ret = OB_RPC_CONNECT_ERROR; + LOG_WARN("peer no in communication, maybe crashed", K(ret), K(ch->get_peer()), + K(static_cast(GCONF.cluster_id))); + } + } + } + } + if (OB_FAIL(ret)) { + } else if (ignore_interrupt_) { // do nothing. } else if ((loop_times_ & (INTERRUPT_CHECK_TIMES - 1)) == 0 && OB_UNLIKELY(IS_INTERRUPTED())) { // 中断错误处理 @@ -413,13 +427,7 @@ int ObDtlChannelLoop::process_channel(int64_t &nth_channel) } ObDtlChannel *ch = sentinel_node_.next_link_; while (OB_EAGAIN == ret && ch != &sentinel_node_) { - if (OB_UNLIKELY(ObPxCheckAlive::is_in_blacklist(ch->get_peer(), - get_process_query_time()))) { - ret = OB_RPC_CONNECT_ERROR; - LOG_WARN("peer no in communication, maybe crashed", K(ret), K(ch->get_peer()), - K(static_cast(GCONF.cluster_id))); - break; - } else if (OB_SUCC(ch->process1(&process_func_, 0, last_row_in_buffer))) { + if (OB_SUCC(ch->process1(&process_func_, 0, last_row_in_buffer))) { nth_channel = ch->get_loop_index(); break; } else if (OB_EAGAIN == ret) { @@ -442,6 +450,12 @@ int ObDtlChannelLoop::process_channel(int64_t &nth_channel) ObInterruptCode &code = GET_INTERRUPT_CODE(); ret = code.code_; LOG_WARN("message loop is interrupted", K(code), K(ret)); + } else if ((loop_times_ & (SERVER_ALIVE_CHECK_TIMES - 1)) == 0 + && OB_UNLIKELY(ObPxCheckAlive::is_in_blacklist(ch->get_peer(), + get_process_query_time()))) { + ret = OB_RPC_CONNECT_ERROR; + LOG_WARN("peer no in communication, maybe crashed", K(ret), K(ch->get_peer()), + K(static_cast(GCONF.cluster_id))); } ch = sentinel_node_.next_link_; ++n_times; diff --git a/src/sql/dtl/ob_dtl_channel_loop.h b/src/sql/dtl/ob_dtl_channel_loop.h index 9c9083d5a7..49f9ba1c10 100644 --- a/src/sql/dtl/ob_dtl_channel_loop.h +++ b/src/sql/dtl/ob_dtl_channel_loop.h @@ -123,6 +123,7 @@ public: void set_interm_result(bool flag) { use_interm_result_ = flag; } private: static const int64_t INTERRUPT_CHECK_TIMES = 16; + static const int64_t SERVER_ALIVE_CHECK_TIMES = 256; Proc *proc_map_[MAX_PROCS]; InterruptProc *interrupt_proc_; common::ObSEArray chans_; diff --git a/src/sql/engine/join/ob_hash_join_op.cpp b/src/sql/engine/join/ob_hash_join_op.cpp index 3c447c4b5c..f796b75f8f 100644 --- a/src/sql/engine/join/ob_hash_join_op.cpp +++ b/src/sql/engine/join/ob_hash_join_op.cpp @@ -1452,6 +1452,11 @@ int ObHashJoinOp::get_max_memory_size(int64_t input_size) } buf_mgr_->reuse(); buf_mgr_->set_reserve_memory_size(remain_data_memory_size_, 1.0); + if (OB_SUCC(ret)) { + if (OB_FAIL(sync_wait_part_count())) { + LOG_WARN("failed to sync part count", K(ret)); + } + } if (OB_SUCC(ret) && is_vectorized()) { char *buf = NULL; if (part_count_ <= 0) { @@ -1467,11 +1472,6 @@ int ObHashJoinOp::get_max_memory_size(int64_t input_size) sizeof(uint16_t) * (MY_SPEC.max_batch_size_ * part_count_)); } } - if (OB_SUCC(ret)) { - if (OB_FAIL(sync_wait_part_count())) { - LOG_WARN("failed to sync part count", K(ret)); - } - } return ret; }