diff --git a/src/observer/virtual_table/ob_all_virtual_dtl_channel.cpp b/src/observer/virtual_table/ob_all_virtual_dtl_channel.cpp index d7ffe1dd48..9cbcdc2f07 100644 --- a/src/observer/virtual_table/ob_all_virtual_dtl_channel.cpp +++ b/src/observer/virtual_table/ob_all_virtual_dtl_channel.cpp @@ -53,6 +53,7 @@ void ObVirtualChannelInfo::get_info(ObDtlChannel* dtl_ch) thread_id_ = ch->get_thread_id(); owner_mod_ = ch->get_owner_mod(); peer_ = ch->get_peer(); + eof_ = metric.get_eof(); } int ObVirtualDtlChannelOp::operator()(ObDtlChannel *ch) @@ -297,6 +298,10 @@ int ObAllVirtualDtlChannel::get_row(ObVirtualChannelInfo &chan_info, ObNewRow *& cells[cell_idx].set_int(chan_info.peer_.get_port()); break; } + case DTL_EOF: { + cells[cell_idx].set_bool(chan_info.eof_); + break; + } default: { ret = OB_ERR_UNEXPECTED; LOG_WARN("unexpected column id", K(col_id)); diff --git a/src/observer/virtual_table/ob_all_virtual_dtl_channel.h b/src/observer/virtual_table/ob_all_virtual_dtl_channel.h index f54ecca818..ef7c8464f0 100644 --- a/src/observer/virtual_table/ob_all_virtual_dtl_channel.h +++ b/src/observer/virtual_table/ob_all_virtual_dtl_channel.h @@ -30,7 +30,7 @@ public: is_local_(false), is_data_(false), is_transmit_(false), channel_id_(0), op_id_(-1), peer_id_(0), tenant_id_(0), alloc_buffer_cnt_(0), free_buffer_cnt_(0), send_buffer_cnt_(0), recv_buffer_cnt_(0), processed_buffer_cnt_(0), send_buffer_size_(0), hash_val_(0), buffer_pool_id_(0), pins_(0), first_in_ts_(0), first_out_ts_(0), last_in_ts_(0), last_out_ts_(0), - state_(0), thread_id_(0), owner_mod_(0), peer_() + state_(0), thread_id_(0), owner_mod_(0), peer_(), eof_(false) {} void get_info(sql::dtl::ObDtlChannel* ch); @@ -61,6 +61,7 @@ public: int64_t thread_id_; int64_t owner_mod_; ObAddr peer_; + bool eof_; }; class ObVirtualDtlChannelOp @@ -148,6 +149,7 @@ private: OWNER_MOD, PEER_IP, // OB_APP_MIN_COLUMN_ID + 25 PEER_PORT, // OB_APP_MIN_COLUMN_ID + 26 + DTL_EOF, }; int get_row(ObVirtualChannelInfo &chan_info, common::ObNewRow *&row); diff --git a/src/share/inner_table/ob_inner_table_schema.12101_12150.cpp b/src/share/inner_table/ob_inner_table_schema.12101_12150.cpp index 56f06a51a7..d51fdfbae1 100644 --- a/src/share/inner_table/ob_inner_table_schema.12101_12150.cpp +++ b/src/share/inner_table/ob_inner_table_schema.12101_12150.cpp @@ -5256,7 +5256,7 @@ int ObInnerTableSchema::all_virtual_dtl_channel_schema(ObTableSchema &table_sche } if (OB_SUCC(ret)) { - ADD_COLUMN_SCHEMA_TS("last_int_ts", //column_name + ADD_COLUMN_SCHEMA_TS("last_in_ts", //column_name ++column_id, //column_id 0, //rowkey_id 0, //index_id @@ -5361,6 +5361,21 @@ int ObInnerTableSchema::all_virtual_dtl_channel_schema(ObTableSchema &table_sche false, //is_nullable false); //is_autoincrement } + + if (OB_SUCC(ret)) { + ADD_COLUMN_SCHEMA("eof", //column_name + ++column_id, //column_id + 0, //rowkey_id + 0, //index_id + 0, //part_key_pos + ObTinyIntType, //column_type + CS_TYPE_INVALID, //column_collation_type + 1, //column_length + -1, //column_precision + -1, //column_scale + false, //is_nullable + false); //is_autoincrement + } if (OB_SUCC(ret)) { table_schema.get_part_option().set_part_num(1); table_schema.set_part_level(PARTITION_LEVEL_ONE); diff --git a/src/share/inner_table/ob_inner_table_schema_def.py b/src/share/inner_table/ob_inner_table_schema_def.py index bb26d0970c..d8494abbb5 100644 --- a/src/share/inner_table/ob_inner_table_schema_def.py +++ b/src/share/inner_table/ob_inner_table_schema_def.py @@ -8943,13 +8943,14 @@ def_table_schema( ('pins', 'int'), ('first_in_ts', 'timestamp'), ('first_out_ts', 'timestamp'), - ('last_int_ts', 'timestamp'), + ('last_in_ts', 'timestamp'), ('last_out_ts', 'timestamp'), ('status', 'int'), ('thread_id', 'int'), ('owner_mod', 'int'), ('peer_ip', 'varchar:MAX_IP_ADDR_LENGTH'), ('peer_port', 'int'), + ('eof', 'bool'), ], partition_columns = ['svr_ip', 'svr_port'], vtable_route_policy = 'distributed', diff --git a/src/sql/dtl/ob_dtl_basic_channel.cpp b/src/sql/dtl/ob_dtl_basic_channel.cpp index 3b1d3010c1..811e680100 100644 --- a/src/sql/dtl/ob_dtl_basic_channel.cpp +++ b/src/sql/dtl/ob_dtl_basic_channel.cpp @@ -328,8 +328,9 @@ int ObDtlBasicChannel::send(const ObDtlMsg &msg, int64_t timeout_ts, if (is_data_msg_) { metric_.mark_first_in(); if (channel_is_eof_) { - metric_.mark_last_in(); + metric_.mark_eof(); } + metric_.set_last_in_ts(::oceanbase::common::ObTimeUtility::current_time()); } if (OB_FAIL(write_msg(msg, timeout_ts, eval_ctx, is_eof))) { if (OB_ITER_END != ret) { @@ -427,8 +428,9 @@ int ObDtlBasicChannel::attach(ObDtlLinkedBuffer *&linked_buffer, bool is_first_b if (is_data_msg) { metric_.mark_first_in(); if (is_eof) { - metric_.mark_last_in(); + metric_.mark_eof(); } + metric_.set_last_in_ts(::oceanbase::common::ObTimeUtility::current_time()); } if (is_first_buffer_cached) { set_first_buffer(); @@ -598,11 +600,14 @@ int ObDtlBasicChannel::process1( bool transferred = false; ret = proc->process(*buffer, transferred); LOG_DEBUG("process buffer", K(ret), KP(buffer), K(transferred)); + if (buffer->is_data_msg()) { + metric_.set_last_out_ts(::oceanbase::common::ObTimeUtility::current_time()); + } if (OB_ITER_END == ret || transferred) { inc_processed_buffer_cnt(); if (buffer->is_eof()) { if (buffer->is_data_msg()) { - metric_.mark_last_out(); + metric_.mark_eof(); } if (!is_eof()) { if (NULL != channel_loop_) { diff --git a/src/sql/dtl/ob_dtl_channel_loop.cpp b/src/sql/dtl/ob_dtl_channel_loop.cpp index 5362d7a9e0..0a07d44ad9 100644 --- a/src/sql/dtl/ob_dtl_channel_loop.cpp +++ b/src/sql/dtl/ob_dtl_channel_loop.cpp @@ -17,6 +17,7 @@ #include "share/diagnosis/ob_sql_monitor_statname.h" #include "share/ob_server_blacklist.h" #include "observer/omt/ob_th_worker.h" +#include "share/ob_occam_time_guard.h" using namespace oceanbase::common; @@ -47,7 +48,9 @@ ObDtlChannelLoop::ObDtlChannelLoop() eof_channel_cnt_(0), loop_times_(0), begin_wait_time_(0), - process_query_time_(0) + process_query_time_(0), + last_dump_channel_time_(0), + query_timeout_ts_(0) { op_monitor_info_.otherstat_5_id_ = ObSqlMonitorStatIds::DTL_LOOP_TOTAL_MISS_AFTER_DATA; op_monitor_info_.otherstat_6_id_ = ObSqlMonitorStatIds::DTL_LOOP_TOTAL_MISS; @@ -78,7 +81,9 @@ ObDtlChannelLoop::ObDtlChannelLoop(ObMonitorNode &op_monitor_info) eof_channel_cnt_(0), loop_times_(0), begin_wait_time_(0), - process_query_time_(0) + process_query_time_(0), + last_dump_channel_time_(0), + query_timeout_ts_(0) { op_monitor_info_.otherstat_5_id_ = ObSqlMonitorStatIds::DTL_LOOP_TOTAL_MISS_AFTER_DATA; op_monitor_info_.otherstat_6_id_ = ObSqlMonitorStatIds::DTL_LOOP_TOTAL_MISS; @@ -268,6 +273,32 @@ int ObDtlChannelLoop::process_base(ObIDltChannelLoopPred *pred, int64_t &hinted_ } ++loop_times_; + if ((loop_times_ & (INTERRUPT_CHECK_TIMES - 1)) == 0) { + last_dump_channel_time_ = last_dump_channel_time_ < process_query_time_ ? process_query_time_ : last_dump_channel_time_; + int64_t curr_time = ::oceanbase::common::ObTimeUtility::current_time(); + if (OB_UNLIKELY(curr_time - last_dump_channel_time_ >= static_cast (100_s))) { + last_dump_channel_time_ = curr_time; + LOG_WARN("dump channel loop info for query which active for more than 100 seconds", K(process_query_time_), K(curr_time), K(timeout), K(timeout_), K(query_timeout_ts_)); + int64_t idx = -1; + int64_t last_in_msg_time = INT64_MAX; + // Find a channel that has not received data for the longest time + for (int64_t i = 0; i < chans_.count(); ++i) { + if (nullptr != chans_.at(i)) { + ObDtlBasicChannel *channel = static_cast (chans_.at(i)); + if (channel->get_op_metric().get_last_in_ts() < last_in_msg_time) { + last_in_msg_time = channel->get_op_metric().get_last_in_ts(); + idx = i; + } + } + } + if (-1 == idx) { + LOG_WARN("no channel exists"); + } else { + ObDtlBasicChannel *channel = static_cast (chans_.at(idx)); + LOG_WARN("dump channel info for query which active for more than 100 seconds", K(idx), K(channel->get_id()), K(channel->get_peer_id()), K(channel->get_peer()), K(channel->get_op_metric())); + } + } + } if (ignore_interrupt_) { // do nothing. } else if ((loop_times_ & (INTERRUPT_CHECK_TIMES - 1)) == 0 && OB_UNLIKELY(IS_INTERRUPTED())) { @@ -328,7 +359,14 @@ int ObDtlChannelLoop::process_channels(ObIDltChannelLoopPred *pred, int64_t &nth LOG_WARN("unexpect next idx", K(next_idx_), K(chan_cnt), K(ret)); } else { chan = chans_[next_idx_]; - if (nullptr == pred || pred->pred_process(next_idx_, chan)) { + if (OB_UNLIKELY(share::ObServerBlacklist::get_instance().is_in_blacklist( + share::ObCascadMember(chan->get_peer(), GCONF.cluster_id), true, + get_process_query_time()))) { + ret = OB_RPC_CONNECT_ERROR; + LOG_WARN("peer no in communication, maybe crashed", K(ret), K(chan->get_peer()), + K(static_cast(GCONF.cluster_id))); + break; + } else if (nullptr == pred || pred->pred_process(next_idx_, chan)) { if (OB_SUCC(chan->process1(&process_func_, 0, last_row_in_buffer))) { nth_channel = next_idx_; first_data_get_ = true; diff --git a/src/sql/dtl/ob_dtl_channel_loop.h b/src/sql/dtl/ob_dtl_channel_loop.h index 11925eab9e..9c9083d5a7 100644 --- a/src/sql/dtl/ob_dtl_channel_loop.h +++ b/src/sql/dtl/ob_dtl_channel_loop.h @@ -95,6 +95,8 @@ public: } int64_t get_process_query_time() const { return process_query_time_; } void set_process_query_time(int64_t process_query_time) { process_query_time_ = process_query_time; } + int64_t get_query_timeout_ts() const { return query_timeout_ts_; } + void set_query_timeout_ts(int64_t time) { query_timeout_ts_ = time; } private: int process_channels(ObIDltChannelLoopPred *pred, int64_t &nth_channel); int process_channel(int64_t &nth_channel); @@ -146,6 +148,8 @@ private: int64_t loop_times_; int64_t begin_wait_time_; // use rdtsc to record begin dtl wait time int64_t process_query_time_; + int64_t last_dump_channel_time_; // Used to track long-term active channels + int64_t query_timeout_ts_; }; OB_INLINE void ObDtlChannelLoop::add_last_data_list(ObDtlChannel *ch) diff --git a/src/sql/dtl/ob_dtl_local_channel.cpp b/src/sql/dtl/ob_dtl_local_channel.cpp index 2f0673a092..b6f0ac893c 100644 --- a/src/sql/dtl/ob_dtl_local_channel.cpp +++ b/src/sql/dtl/ob_dtl_local_channel.cpp @@ -148,8 +148,9 @@ int ObDtlLocalChannel::send_shared_message(ObDtlLinkedBuffer *&buf) metric_.mark_first_out(); } if (is_eof) { - metric_.mark_last_out(); + metric_.mark_eof(); } + metric_.set_last_out_ts(::oceanbase::common::ObTimeUtility::current_time()); } //统一返回消息 msg_response_.on_finish(is_block, ret); diff --git a/src/sql/dtl/ob_dtl_rpc_channel.cpp b/src/sql/dtl/ob_dtl_rpc_channel.cpp index 6b5e96c799..25052903af 100644 --- a/src/sql/dtl/ob_dtl_rpc_channel.cpp +++ b/src/sql/dtl/ob_dtl_rpc_channel.cpp @@ -256,8 +256,9 @@ int ObDtlRpcChannel::feedup(ObDtlLinkedBuffer *&buffer) if (buffer->is_data_msg()) { metric_.mark_first_in(); if (buffer->is_eof()) { - metric_.mark_last_in(); + metric_.mark_eof(); } + metric_.set_last_in_ts(::oceanbase::common::ObTimeUtility::current_time()); } IGNORE_RETURN recv_sem_.signal(); if (msg_watcher_ != nullptr) { @@ -338,8 +339,9 @@ int ObDtlRpcChannel::send_message(ObDtlLinkedBuffer *&buf) if (is_first) { metric_.mark_first_out(); } + metric_.set_last_out_ts(::oceanbase::common::ObTimeUtility::current_time()); if (is_eof) { - metric_.mark_last_out(); + metric_.mark_eof(); set_eof(); } } diff --git a/src/sql/dtl/ob_op_metric.cpp b/src/sql/dtl/ob_op_metric.cpp index 188ad41d04..46b3ec80d4 100644 --- a/src/sql/dtl/ob_op_metric.cpp +++ b/src/sql/dtl/ob_op_metric.cpp @@ -18,4 +18,4 @@ using namespace oceanbase::common; using namespace oceanbase::sql; -OB_SERIALIZE_MEMBER(ObOpMetric, enable_audit_, id_, type_, first_in_ts_, first_out_ts_, last_in_ts_, last_out_ts_, counter_, exec_time_); +OB_SERIALIZE_MEMBER(ObOpMetric, enable_audit_, id_, type_, first_in_ts_, first_out_ts_, last_in_ts_, last_out_ts_, counter_, exec_time_, eof_); diff --git a/src/sql/dtl/ob_op_metric.h b/src/sql/dtl/ob_op_metric.h index 6dff138d2c..633a72ba11 100644 --- a/src/sql/dtl/ob_op_metric.h +++ b/src/sql/dtl/ob_op_metric.h @@ -28,7 +28,7 @@ class ObOpMetric public: ObOpMetric() : enable_audit_(false), id_(-1), type_(MetricType::DEFAULT_MAX), interval_cnt_(0), interval_start_time_(0), interval_end_time_(0), - exec_time_(0), flag_(0), first_in_ts_(0), first_out_ts_(0), last_in_ts_(0), last_out_ts_(0), counter_(0) + exec_time_(0), flag_(0), first_in_ts_(0), first_out_ts_(0), last_in_ts_(0), last_out_ts_(0), counter_(0), eof_(false) {} virtual ~ObOpMetric() {} @@ -48,6 +48,7 @@ public: last_in_ts_ = other.last_in_ts_; last_out_ts_ = other.last_out_ts_; counter_ = other.counter_; + eof_ = other.eof_; return *this; } @@ -55,18 +56,18 @@ public: void mark_first_in(); void mark_first_out(); - void mark_last_in(); - void mark_last_out(); + void mark_eof(); OB_INLINE void set_first_in_ts(int64_t first_in_ts) { first_in_ts_ = first_in_ts; } OB_INLINE void set_first_out_ts(int64_t first_out_ts ) { first_out_ts_ = first_out_ts; } OB_INLINE void set_last_in_ts(int64_t last_in_ts) { last_in_ts_ = last_in_ts; } OB_INLINE void set_last_out_ts(int64_t last_out_ts) { last_out_ts_ = last_out_ts; } - OB_INLINE int64_t get_first_in_ts() { return first_in_ts_; } - OB_INLINE int64_t get_first_out_ts() { return first_out_ts_; } - OB_INLINE int64_t get_last_in_ts() { return last_in_ts_; } - OB_INLINE int64_t get_last_out_ts() { return last_out_ts_; } + OB_INLINE int64_t get_first_in_ts() const { return first_in_ts_; } + OB_INLINE int64_t get_first_out_ts() const { return first_out_ts_; } + OB_INLINE int64_t get_last_in_ts() const { return last_in_ts_; } + OB_INLINE int64_t get_last_out_ts() const { return last_out_ts_; } + OB_INLINE bool get_eof() const { return eof_; } OB_INLINE void count() { ++counter_; } int64_t get_counter() { return counter_; } @@ -83,12 +84,13 @@ public: void mark_interval_end(int64_t *out_exec_time = nullptr, int64_t interval = 1); OB_INLINE int64_t get_exec_time() { return exec_time_; } - TO_STRING_KV(K_(id), K_(type), K_(first_in_ts), K_(first_out_ts), K_(last_in_ts), K_(last_out_ts), K_(counter), K_(exec_time)); + TO_STRING_KV(K_(id), K_(type), K_(first_in_ts), K_(first_out_ts), K_(last_in_ts), K_(last_out_ts), K_(counter), K_(exec_time), K_(eof)); private: static const int64_t FIRST_IN = 0x01; static const int64_t FIRST_OUT = 0x02; static const int64_t LAST_IN = 0x4; static const int64_t LAST_OUT = 0x08; + static const int64_t DTL_EOF = 0x10; bool enable_audit_; int64_t id_; @@ -108,6 +110,7 @@ private: int64_t last_out_ts_; int64_t counter_; + bool eof_; }; OB_INLINE void ObOpMetric::mark_first_in() @@ -126,22 +129,6 @@ OB_INLINE void ObOpMetric::mark_first_out() } } -OB_INLINE void ObOpMetric::mark_last_in() -{ - if (enable_audit_ && enable_audit_ && !(flag_ & LAST_IN)) { - last_in_ts_ = common::ObTimeUtility::current_time(); - flag_ |= LAST_IN; - } -} - -OB_INLINE void ObOpMetric::mark_last_out() -{ - if (enable_audit_ && !(flag_ & LAST_OUT)) { - last_out_ts_ = common::ObTimeUtility::current_time(); - flag_ |= LAST_OUT; - } -} - OB_INLINE void ObOpMetric::mark_interval_start(int64_t interval) { #ifndef NDEBUG @@ -188,6 +175,14 @@ OB_INLINE void ObOpMetric::mark_interval_end(int64_t *out_exec_time, int64_t int #endif } +OB_INLINE void ObOpMetric::mark_eof() +{ + if (enable_audit_ && !(flag_ & DTL_EOF)) { + eof_ = true; + flag_ |= DTL_EOF; + } +} + } // sql } // oceanbase diff --git a/src/sql/engine/px/exchange/ob_px_ms_coord_op.cpp b/src/sql/engine/px/exchange/ob_px_ms_coord_op.cpp index 8b5c0239b0..a9592def02 100644 --- a/src/sql/engine/px/exchange/ob_px_ms_coord_op.cpp +++ b/src/sql/engine/px/exchange/ob_px_ms_coord_op.cpp @@ -514,7 +514,7 @@ int ObPxMSCoordOp::next_row(ObReceiveRowReader &reader, bool &wait_next_msg) op_monitor_info_.otherstat_2_value_ = oceanbase::common::ObClockGenerator::getClock(); } all_rows_finish_ = true; - metric_.mark_last_out(); + metric_.mark_eof(); } else if (row_heap_.capacity() == row_heap_.count()) { const ObChunkDatumStore::LastStoredRow *pop_row = nullptr; if (OB_FAIL(row_heap_.pop(pop_row))) { @@ -527,6 +527,7 @@ int ObPxMSCoordOp::next_row(ObReceiveRowReader &reader, bool &wait_next_msg) } metric_.count(); metric_.mark_first_out(); + metric_.set_last_out_ts(::oceanbase::common::ObTimeUtility::current_time()); } else if (row_heap_.capacity() > row_heap_.count()) { } else { ret = OB_ERR_UNEXPECTED; diff --git a/src/sql/engine/px/exchange/ob_px_ms_receive_op.cpp b/src/sql/engine/px/exchange/ob_px_ms_receive_op.cpp index 817b85b760..0213676896 100644 --- a/src/sql/engine/px/exchange/ob_px_ms_receive_op.cpp +++ b/src/sql/engine/px/exchange/ob_px_ms_receive_op.cpp @@ -650,7 +650,7 @@ int ObPxMSReceiveOp::inner_get_next_row() if (0 == row_heap_.capacity()) { ret = OB_ITER_END; iter_end_ = true; - metric_.mark_last_out(); + metric_.mark_eof(); int release_ret = OB_SUCCESS; if (OB_SUCCESS != (release_ret = release_merge_inputs())) { LOG_WARN("failed to release merge sort and row store", K(ret), K(release_ret)); @@ -668,6 +668,7 @@ int ObPxMSReceiveOp::inner_get_next_row() } metric_.count(); metric_.mark_first_out(); + metric_.set_last_out_ts(::oceanbase::common::ObTimeUtility::current_time()); } else { ret = OB_ERR_UNEXPECTED; LOG_WARN("invalid row heap state", K(row_heap_), K(ret)); diff --git a/src/sql/engine/px/exchange/ob_px_receive_op.cpp b/src/sql/engine/px/exchange/ob_px_receive_op.cpp index a3ca859ff6..0bc7302f01 100644 --- a/src/sql/engine/px/exchange/ob_px_receive_op.cpp +++ b/src/sql/engine/px/exchange/ob_px_receive_op.cpp @@ -254,6 +254,7 @@ int ObPxReceiveOp::init_channel( loop.register_processor(px_row_msg_proc) .register_interrupt_processor(interrupt_proc); loop.set_process_query_time(ctx_.get_my_session()->get_process_query_time()); + loop.set_query_timeout_ts(ctx_.get_physical_plan_ctx()->get_timeout_timestamp()); ObPxSQCProxy *ch_provider = reinterpret_cast(recv_input.get_ch_provider()); int64_t batch_id = ctx_.get_px_batch_id(); const bool use_interm_result = ch_provider->get_recieve_use_interm_result(); @@ -826,6 +827,7 @@ int ObPxFifoReceiveOp::fetch_rows(const int64_t row_cnt) ret = get_rows_from_channels(row_cnt, timeout_ts - get_timestamp()); if (OB_SUCCESS == ret) { metric_.mark_first_out(); + metric_.set_last_out_ts(::oceanbase::common::ObTimeUtility::current_time()); LOG_DEBUG("Got one row from channel", K(ret)); break; // got one row } else if (OB_ITER_END == ret) { @@ -833,7 +835,7 @@ int ObPxFifoReceiveOp::fetch_rows(const int64_t row_cnt) op_monitor_info_.otherstat_2_id_ = ObSqlMonitorStatIds::EXCHANGE_EOF_TIMESTAMP; op_monitor_info_.otherstat_2_value_ = oceanbase::common::ObClockGenerator::getClock(); } - metric_.mark_last_out(); + metric_.mark_eof(); LOG_TRACE("Got eof row from channel", K(ret)); break; } else if (OB_EAGAIN == ret) { diff --git a/src/sql/engine/px/exchange/ob_px_transmit_op.cpp b/src/sql/engine/px/exchange/ob_px_transmit_op.cpp index 50ffdb5197..593bd6e45d 100644 --- a/src/sql/engine/px/exchange/ob_px_transmit_op.cpp +++ b/src/sql/engine/px/exchange/ob_px_transmit_op.cpp @@ -332,6 +332,7 @@ int ObPxTransmitOp::init_channel(ObPxTransmitOpInput &trans_input) loop_.register_processor(dfc_unblock_msg_proc_) .register_interrupt_processor(interrupt_proc_); loop_.set_process_query_time(ctx_.get_my_session()->get_process_query_time()); + loop_.set_query_timeout_ts(ctx_.get_physical_plan_ctx()->get_timeout_timestamp()); bool use_interm_result = false; int64_t px_batch_id = ctx_.get_px_batch_id(); ObPxSQCProxy *sqc_proxy = NULL; diff --git a/src/sql/engine/px/ob_px_coord_op.cpp b/src/sql/engine/px/ob_px_coord_op.cpp index e350a401b9..e9fbaef0ae 100644 --- a/src/sql/engine/px/ob_px_coord_op.cpp +++ b/src/sql/engine/px/ob_px_coord_op.cpp @@ -866,6 +866,7 @@ int ObPxCoordOp::receive_channel_root_dfo( msg_loop_.set_tenant_id(ctx.get_my_session()->get_effective_tenant_id()); msg_loop_.set_interm_result(enable_px_batch_rescan()); msg_loop_.set_process_query_time(ctx_.get_my_session()->get_process_query_time()); + msg_loop_.set_query_timeout_ts(ctx_.get_physical_plan_ctx()->get_timeout_timestamp()); // root dfo 的 receive channel sets 在本机使用,不需要通过 DTL 发送 // 直接注册到 msg_loop 中收取数据即可 int64_t cnt = task_channels_.count(); @@ -937,6 +938,7 @@ int ObPxCoordOp::receive_channel_root_dfo( msg_loop_.set_tenant_id(ctx.get_my_session()->get_effective_tenant_id()); msg_loop_.set_interm_result(enable_px_batch_rescan()); msg_loop_.set_process_query_time(ctx_.get_my_session()->get_process_query_time()); + msg_loop_.set_query_timeout_ts(ctx_.get_physical_plan_ctx()->get_timeout_timestamp()); // root dfo 的 receive channel sets 在本机使用,不需要通过 DTL 发送 // 直接注册到 msg_loop 中收取数据即可 int64_t cnt = task_channels_.count(); diff --git a/src/sql/engine/px/ob_px_sqc_proxy.cpp b/src/sql/engine/px/ob_px_sqc_proxy.cpp index 8a2d0d6e13..76cad037f0 100644 --- a/src/sql/engine/px/ob_px_sqc_proxy.cpp +++ b/src/sql/engine/px/ob_px_sqc_proxy.cpp @@ -132,6 +132,7 @@ int ObPxSQCProxy::link_sqc_qc_channel(ObPxRpcInitSqcArgs &sqc_arg) const ObDtlBasicChannel *basic_channel = static_cast(sqc.get_sqc_channel()); sqc_ctx_.msg_loop_.set_tenant_id(basic_channel->get_tenant_id()); sqc_ctx_.msg_loop_.set_process_query_time(get_process_query_time()); + sqc_ctx_.msg_loop_.set_query_timeout_ts(get_query_timeout_ts()); LOG_TRACE("register sqc-qc channel", K(sqc)); } return ret; @@ -627,4 +628,13 @@ int64_t ObPxSQCProxy::get_process_query_time() return res; } +int64_t ObPxSQCProxy::get_query_timeout_ts() +{ + int64_t res = 0; + if (OB_NOT_NULL(sqc_arg_.exec_ctx_) && OB_NOT_NULL(sqc_arg_.exec_ctx_->get_physical_plan_ctx())) { + res = sqc_arg_.exec_ctx_->get_physical_plan_ctx()->get_timeout_timestamp(); + } + return res; +} + int64_t ObPxSQCProxy::get_task_count() const { return sqc_ctx_.get_task_count(); } diff --git a/src/sql/engine/px/ob_px_sqc_proxy.h b/src/sql/engine/px/ob_px_sqc_proxy.h index 170e9768c1..20b44efa4e 100644 --- a/src/sql/engine/px/ob_px_sqc_proxy.h +++ b/src/sql/engine/px/ob_px_sqc_proxy.h @@ -179,6 +179,7 @@ private: bool need_receive_channel_map_via_dtl(int64_t child_dfo_id); int get_whole_msg_provider(uint64_t op_id, dtl::ObDtlMsgType msg_type, ObPxDatahubDataProvider *&provider); int64_t get_process_query_time(); + int64_t get_query_timeout_ts(); /* variables */ public: ObSqcCtx &sqc_ctx_;