From 4deb96eab49cfd3a07edb7ec086d2274a454e8b2 Mon Sep 17 00:00:00 2001 From: obdev Date: Fri, 16 Jul 2021 14:30:14 +0800 Subject: [PATCH] Fix bug that sleep wrongly when get dtl data --- src/share/diagnosis/ob_sql_monitor_statname.h | 10 ++++++++-- src/sql/dtl/ob_dtl_basic_channel.cpp | 19 ++++++++++++++++++- src/sql/dtl/ob_dtl_channel.h | 17 +++++------------ src/sql/dtl/ob_dtl_channel_loop.cpp | 3 ++- src/sql/dtl/ob_dtl_flow_control.cpp | 2 ++ .../px/exchange/ob_px_fifo_coord_op.cpp | 5 ++++- .../engine/px/exchange/ob_px_ms_coord_op.cpp | 4 ++++ .../engine/px/exchange/ob_px_receive_op.cpp | 11 +++++++++-- .../engine/px/exchange/ob_px_transmit_op.cpp | 14 ++++++++++++++ src/sql/engine/px/ob_px_coord_op.cpp | 8 ++++++++ 10 files changed, 74 insertions(+), 19 deletions(-) diff --git a/src/share/diagnosis/ob_sql_monitor_statname.h b/src/share/diagnosis/ob_sql_monitor_statname.h index b6c734b5e..8677b08f0 100644 --- a/src/share/diagnosis/ob_sql_monitor_statname.h +++ b/src/share/diagnosis/ob_sql_monitor_statname.h @@ -36,8 +36,14 @@ SQL_MONITOR_STATNAME_DEF( PDML_PARTITION_FLUSH_COUNT, "times write to storage", "total times writing data to storage by pdml op") // reshuffle SQL_MONITOR_STATNAME_DEF( - EXCHANGE_DROP_ROW_COUNT, "drop row count", "total row dropped by exchange out op for unmatched partition") -// end + EXCHANGE_DROP_ROW_COUNT, "drop row count", "total row dropped by exchange out op for unmatched partition") +// DTL +SQL_MONITOR_STATNAME_DEF( + DTL_SEND_RECV_COUNT, "the count of buffer received or sended", "the count of dtl buffer that received or sended") +SQL_MONITOR_STATNAME_DEF( + EXCHANGE_EOF_TIMESTAMP, "the timestamp of send eof or receive eof", "the timestamp of send eof or receive eof") +//end + SQL_MONITOR_STATNAME_DEF(MONITOR_STATNAME_END, "monitor end", "monitor stat name end") #endif diff --git a/src/sql/dtl/ob_dtl_basic_channel.cpp b/src/sql/dtl/ob_dtl_basic_channel.cpp index 51236bae9..562038b94 100644 --- a/src/sql/dtl/ob_dtl_basic_channel.cpp +++ b/src/sql/dtl/ob_dtl_basic_channel.cpp @@ -795,6 +795,10 @@ int ObDtlBasicChannel::wait_unblocking() } else if (OB_FAIL(channel_loop_->find(this, idx))) { LOG_WARN("channel not exists in channel loop", K(ret)); } else { + int64_t start_t = ObTimeUtility::current_time(); + int64_t interval_t = 1 * 60 * 1000000; // 1min + int64_t last_t = 0; + int64_t print_log_t = 10 * 60 * 1000000; block_proc_.set_ch_idx_var(&idx); LOG_TRACE("wait unblocking", K(ret), K(dfc_->is_block()), KP(id_), K(peer_)); do { @@ -809,7 +813,8 @@ int ObDtlBasicChannel::wait_unblocking() &block_proc_, timeout_ts - ObTimeUtility::current_time(), got_channel_idx))) { // no msg, then don't process if (OB_EAGAIN == ret) { - if (ObTimeUtility::current_time() > timeout_ts) { + int64_t end_t = ObTimeUtility::current_time(); + if (end_t > timeout_ts) { ret = OB_TIMEOUT; LOG_WARN("get row from channel timeout", K(ret), K(timeout_ts)); } else { @@ -819,6 +824,18 @@ int ObDtlBasicChannel::wait_unblocking() LOG_WARN("worker interrupt", K(tmp_ret), K(ret)); break; } + if (end_t - start_t > print_log_t) { + bool print_log = false; + if (0 == last_t || end_t - last_t > interval_t) { + print_log = true; + last_t = end_t; + } + if (print_log) { + LOG_INFO("wait unblocking", K(id_), K(peer_id_), K(peer_id_), + K(recv_buffer_cnt_), K(processed_buffer_cnt_), K(send_buffer_cnt_), + K(idx), K(got_channel_idx)); + } + } ret = OB_SUCCESS; } } else { diff --git a/src/sql/dtl/ob_dtl_channel.h b/src/sql/dtl/ob_dtl_channel.h index 8a1dd2fe3..7b2ca19b5 100644 --- a/src/sql/dtl/ob_dtl_channel.h +++ b/src/sql/dtl/ob_dtl_channel.h @@ -213,18 +213,11 @@ public: } } while (1); } - void set_drain() - { - set_status(DTL_CHAN_DARIN); - } - void set_eof() - { - set_status(DTL_CHAN_EOF); - } - void set_first_buffer() - { - set_status(DTL_CHAN_FIRST_BUF); - } + + void set_drain() { set_status(DTL_CHAN_DARIN); } + void set_eof() { set_status(DTL_CHAN_EOF); } + void set_first_buffer() { set_status(DTL_CHAN_FIRST_BUF); } + void set_blocked() { set_status(DTL_CHAN_BLOCKED); } int set_bcast_chan_ids(common::ObIArray& chans); int get_bcast_chan_ids(common::ObIArray& chans); diff --git a/src/sql/dtl/ob_dtl_channel_loop.cpp b/src/sql/dtl/ob_dtl_channel_loop.cpp index 5ef22f484..9590ebc78 100644 --- a/src/sql/dtl/ob_dtl_channel_loop.cpp +++ b/src/sql/dtl/ob_dtl_channel_loop.cpp @@ -339,8 +339,9 @@ int ObDtlChannelLoop::process_channel(int64_t& nth_channel) remove_data_list(ch); } if (n_times > 100) { + // it's maybe unexpected ! LOG_WARN("loop times", K(n_times)); - sleep(10); + usleep(1); } ++loop_times_; if (ignore_interrupt_) { diff --git a/src/sql/dtl/ob_dtl_flow_control.cpp b/src/sql/dtl/ob_dtl_flow_control.cpp index d5829d4b9..f4774ff8a 100644 --- a/src/sql/dtl/ob_dtl_flow_control.cpp +++ b/src/sql/dtl/ob_dtl_flow_control.cpp @@ -237,6 +237,7 @@ int ObDtlFlowControl::block_channel(ObDtlChannel* ch) LOG_WARN("channel is blocked", K(ret), K(idx)); } else { set_block(idx); + ch->set_blocked(); LOG_TRACE("transmit set channel block trace", K(ch), KP(ch->get_id()), K(ch->get_peer()), K(idx)); } } @@ -257,6 +258,7 @@ int ObDtlFlowControl::unblock_channel(ObDtlChannel* ch) LOG_WARN("failed to clear response block info", K(ret)); } else if (is_block(idx)) { unblock(idx); + ch->set_blocked(); } LOG_TRACE("channel unblock", K(ch), KP(ch->get_id()), K(ch->get_peer()), K(idx), K(is_block(idx))); } diff --git a/src/sql/engine/px/exchange/ob_px_fifo_coord_op.cpp b/src/sql/engine/px/exchange/ob_px_fifo_coord_op.cpp index d62d0fed4..d73fb48fc 100644 --- a/src/sql/engine/px/exchange/ob_px_fifo_coord_op.cpp +++ b/src/sql/engine/px/exchange/ob_px_fifo_coord_op.cpp @@ -99,7 +99,6 @@ int ObPxFifoCoordOp::inner_get_next_row() msg_loop_.set_tenant_id(ctx_.get_my_session()->get_effective_tenant_id()); first_row_fetched_ = true; } - bool wait_next_msg = true; while (OB_SUCC(ret) && wait_next_msg) { // SQC-QC control channel and TASKs-QC data channel are registered in loop. @@ -191,6 +190,10 @@ int ObPxFifoCoordOp::next_row(bool& wait_next_msg) LOG_TRACE("All channel finish", "finish_ch_cnt", finish_ch_cnt_, K(ret)); all_rows_finish_ = true; ret = OB_SUCCESS; + if (GCONF.enable_sql_audit) { + op_monitor_info_.otherstat_2_id_ = ObSqlMonitorStatIds::EXCHANGE_EOF_TIMESTAMP; + op_monitor_info_.otherstat_2_value_ = oceanbase::common::ObClockGenerator::getClock(); + } } } else if (OB_SUCCESS == ret) { wait_next_msg = false; diff --git a/src/sql/engine/px/exchange/ob_px_ms_coord_op.cpp b/src/sql/engine/px/exchange/ob_px_ms_coord_op.cpp index baa89b2e2..f0cc8144f 100644 --- a/src/sql/engine/px/exchange/ob_px_ms_coord_op.cpp +++ b/src/sql/engine/px/exchange/ob_px_ms_coord_op.cpp @@ -339,6 +339,10 @@ int ObPxMSCoordOp::next_row(bool& wait_next_msg) if (OB_SUCC(ret)) { if (0 == row_heap_.capacity()) { + if (GCONF.enable_sql_audit) { + op_monitor_info_.otherstat_2_id_ = ObSqlMonitorStatIds::EXCHANGE_EOF_TIMESTAMP; + op_monitor_info_.otherstat_2_value_ = oceanbase::common::ObClockGenerator::getClock(); + } all_rows_finish_ = true; metric_.mark_last_out(); } else if (row_heap_.capacity() == row_heap_.count()) { diff --git a/src/sql/engine/px/exchange/ob_px_receive_op.cpp b/src/sql/engine/px/exchange/ob_px_receive_op.cpp index dbfe9bbea..d95944653 100644 --- a/src/sql/engine/px/exchange/ob_px_receive_op.cpp +++ b/src/sql/engine/px/exchange/ob_px_receive_op.cpp @@ -327,10 +327,12 @@ int ObPxFifoReceiveOp::inner_close() LOG_WARN("release dtl channel failed", K(release_channel_ret)); } ObDTLIntermResultKey key; - ObDtlBasicChannel* channel = NULL; + ObDtlBasicChannel *channel = NULL; + int64_t recv_cnt = 0; for (int i = 0; i < task_channels_.count(); ++i) { channel = static_cast(task_channels_.at(i)); key.channel_id_ = channel->get_id(); + recv_cnt += channel->get_recv_buffer_cnt(); if (channel->use_interm_result()) { release_channel_ret = ObDTLIntermResultManager::getInstance().erase_interm_result_info(key); if (release_channel_ret != common::OB_SUCCESS) { @@ -338,7 +340,8 @@ int ObPxFifoReceiveOp::inner_close() } } } - + op_monitor_info_.otherstat_3_id_ = ObSqlMonitorStatIds::DTL_SEND_RECV_COUNT; + op_monitor_info_.otherstat_3_value_ = recv_cnt; release_channel_ret = msg_loop_.unregister_all_channel(); if (release_channel_ret != common::OB_SUCCESS) { // the following unlink actions is not safe is any unregister failure happened @@ -378,6 +381,10 @@ int ObPxFifoReceiveOp::inner_get_next_row() LOG_DEBUG("Got one row from channel", K(ret)); break; // got one row } else if (OB_ITER_END == ret) { + if (GCONF.enable_sql_audit) { + op_monitor_info_.otherstat_2_id_ = ObSqlMonitorStatIds::EXCHANGE_EOF_TIMESTAMP; + op_monitor_info_.otherstat_2_value_ = oceanbase::common::ObClockGenerator::getClock(); + } metric_.mark_last_out(); LOG_TRACE("Got eof row from channel", K(ret)); break; diff --git a/src/sql/engine/px/exchange/ob_px_transmit_op.cpp b/src/sql/engine/px/exchange/ob_px_transmit_op.cpp index e63b14d9d..623b053dd 100644 --- a/src/sql/engine/px/exchange/ob_px_transmit_op.cpp +++ b/src/sql/engine/px/exchange/ob_px_transmit_op.cpp @@ -253,6 +253,14 @@ int ObPxTransmitOp::inner_close() int ret = OB_SUCCESS; /* we must release channel even if there is some error happen before */ chs_agent_.destroy(); + ObDtlBasicChannel *ch = nullptr; + int64_t recv_cnt = 0; + for (int i = 0; i < task_channels_.count(); ++i) { + ch = static_cast(task_channels_.at(i)); + recv_cnt += ch->get_send_buffer_cnt(); + } + op_monitor_info_.otherstat_3_id_ = ObSqlMonitorStatIds::DTL_SEND_RECV_COUNT; + op_monitor_info_.otherstat_3_value_ = recv_cnt; int release_channel_ret = loop_.unregister_all_channel(); if (release_channel_ret != common::OB_SUCCESS) { // the following unlink actions is not safe is any unregister failure happened @@ -336,6 +344,9 @@ int ObPxTransmitOp::send_eof_row() task_channels_, ch_info_, true, phy_plan_ctx->get_timeout_timestamp(), &eval_ctx_); if (OB_FAIL(eof_asyn_sender.asyn_send())) { LOG_WARN("failed to asyn send drain", K(ret), K(lbt())); + } else if (GCONF.enable_sql_audit) { + op_monitor_info_.otherstat_2_id_ = ObSqlMonitorStatIds::EXCHANGE_EOF_TIMESTAMP; + op_monitor_info_.otherstat_2_value_ = oceanbase::common::ObClockGenerator::getClock(); } } return ret; @@ -459,6 +470,9 @@ int ObPxTransmitOp::broadcast_eof_row() LOG_WARN("unexpected NULL ptr", K(ret)); } else if (OB_FAIL(chs_agent_.flush())) { LOG_WARN("fail flush row to slice channel", K(ret)); + } else if (GCONF.enable_sql_audit) { + op_monitor_info_.otherstat_2_id_ = ObSqlMonitorStatIds::EXCHANGE_EOF_TIMESTAMP; + op_monitor_info_.otherstat_2_value_ = oceanbase::common::ObClockGenerator::getClock(); } return ret; } diff --git a/src/sql/engine/px/ob_px_coord_op.cpp b/src/sql/engine/px/ob_px_coord_op.cpp index 5b8405392..6c88185e0 100644 --- a/src/sql/engine/px/ob_px_coord_op.cpp +++ b/src/sql/engine/px/ob_px_coord_op.cpp @@ -554,6 +554,14 @@ int ObPxCoordOp::destroy_all_channel() // note: must unregister channel from msg_loop first. This enable unlink_channel safely. // Otherwise, channnel may receive data while unlink_channel, // and result in memory leak or something else. + int64_t recv_cnt = 0; + ObDtlBasicChannel *ch = nullptr; + for (int i = 0; i < task_channels_.count(); ++i) { + ch = static_cast(task_channels_.at(i)); + recv_cnt += ch->get_recv_buffer_cnt(); + } + op_monitor_info_.otherstat_3_id_ = ObSqlMonitorStatIds::DTL_SEND_RECV_COUNT; + op_monitor_info_.otherstat_3_value_ = recv_cnt; int tmp_ret = OB_SUCCESS; if (OB_SUCCESS != (tmp_ret = msg_loop_.unregister_all_channel())) { LOG_WARN("fail unregister channels from msg_loop. ignore", KR(tmp_ret));