fix process channels can not exit when peer is crashed && add diag for DTL process_base
This commit is contained in:
@ -53,6 +53,7 @@ void ObVirtualChannelInfo::get_info(ObDtlChannel* dtl_ch)
|
||||
thread_id_ = ch->get_thread_id();
|
||||
owner_mod_ = ch->get_owner_mod();
|
||||
peer_ = ch->get_peer();
|
||||
eof_ = metric.get_eof();
|
||||
}
|
||||
|
||||
int ObVirtualDtlChannelOp::operator()(ObDtlChannel *ch)
|
||||
@ -297,6 +298,10 @@ int ObAllVirtualDtlChannel::get_row(ObVirtualChannelInfo &chan_info, ObNewRow *&
|
||||
cells[cell_idx].set_int(chan_info.peer_.get_port());
|
||||
break;
|
||||
}
|
||||
case DTL_EOF: {
|
||||
cells[cell_idx].set_bool(chan_info.eof_);
|
||||
break;
|
||||
}
|
||||
default: {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unexpected column id", K(col_id));
|
||||
|
||||
@ -30,7 +30,7 @@ public:
|
||||
is_local_(false), is_data_(false), is_transmit_(false), channel_id_(0), op_id_(-1), peer_id_(0), tenant_id_(0), alloc_buffer_cnt_(0),
|
||||
free_buffer_cnt_(0), send_buffer_cnt_(0), recv_buffer_cnt_(0), processed_buffer_cnt_(0), send_buffer_size_(0),
|
||||
hash_val_(0), buffer_pool_id_(0), pins_(0), first_in_ts_(0), first_out_ts_(0), last_in_ts_(0), last_out_ts_(0),
|
||||
state_(0), thread_id_(0), owner_mod_(0), peer_()
|
||||
state_(0), thread_id_(0), owner_mod_(0), peer_(), eof_(false)
|
||||
{}
|
||||
|
||||
void get_info(sql::dtl::ObDtlChannel* ch);
|
||||
@ -61,6 +61,7 @@ public:
|
||||
int64_t thread_id_;
|
||||
int64_t owner_mod_;
|
||||
ObAddr peer_;
|
||||
bool eof_;
|
||||
};
|
||||
|
||||
class ObVirtualDtlChannelOp
|
||||
@ -148,6 +149,7 @@ private:
|
||||
OWNER_MOD,
|
||||
PEER_IP, // OB_APP_MIN_COLUMN_ID + 25
|
||||
PEER_PORT, // OB_APP_MIN_COLUMN_ID + 26
|
||||
DTL_EOF,
|
||||
};
|
||||
int get_row(ObVirtualChannelInfo &chan_info, common::ObNewRow *&row);
|
||||
|
||||
|
||||
@ -5256,7 +5256,7 @@ int ObInnerTableSchema::all_virtual_dtl_channel_schema(ObTableSchema &table_sche
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret)) {
|
||||
ADD_COLUMN_SCHEMA_TS("last_int_ts", //column_name
|
||||
ADD_COLUMN_SCHEMA_TS("last_in_ts", //column_name
|
||||
++column_id, //column_id
|
||||
0, //rowkey_id
|
||||
0, //index_id
|
||||
@ -5361,6 +5361,21 @@ int ObInnerTableSchema::all_virtual_dtl_channel_schema(ObTableSchema &table_sche
|
||||
false, //is_nullable
|
||||
false); //is_autoincrement
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret)) {
|
||||
ADD_COLUMN_SCHEMA("eof", //column_name
|
||||
++column_id, //column_id
|
||||
0, //rowkey_id
|
||||
0, //index_id
|
||||
0, //part_key_pos
|
||||
ObTinyIntType, //column_type
|
||||
CS_TYPE_INVALID, //column_collation_type
|
||||
1, //column_length
|
||||
-1, //column_precision
|
||||
-1, //column_scale
|
||||
false, //is_nullable
|
||||
false); //is_autoincrement
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
table_schema.get_part_option().set_part_num(1);
|
||||
table_schema.set_part_level(PARTITION_LEVEL_ONE);
|
||||
|
||||
@ -8943,13 +8943,14 @@ def_table_schema(
|
||||
('pins', 'int'),
|
||||
('first_in_ts', 'timestamp'),
|
||||
('first_out_ts', 'timestamp'),
|
||||
('last_int_ts', 'timestamp'),
|
||||
('last_in_ts', 'timestamp'),
|
||||
('last_out_ts', 'timestamp'),
|
||||
('status', 'int'),
|
||||
('thread_id', 'int'),
|
||||
('owner_mod', 'int'),
|
||||
('peer_ip', 'varchar:MAX_IP_ADDR_LENGTH'),
|
||||
('peer_port', 'int'),
|
||||
('eof', 'bool'),
|
||||
],
|
||||
partition_columns = ['svr_ip', 'svr_port'],
|
||||
vtable_route_policy = 'distributed',
|
||||
|
||||
@ -328,8 +328,9 @@ int ObDtlBasicChannel::send(const ObDtlMsg &msg, int64_t timeout_ts,
|
||||
if (is_data_msg_) {
|
||||
metric_.mark_first_in();
|
||||
if (channel_is_eof_) {
|
||||
metric_.mark_last_in();
|
||||
metric_.mark_eof();
|
||||
}
|
||||
metric_.set_last_in_ts(::oceanbase::common::ObTimeUtility::current_time());
|
||||
}
|
||||
if (OB_FAIL(write_msg(msg, timeout_ts, eval_ctx, is_eof))) {
|
||||
if (OB_ITER_END != ret) {
|
||||
@ -427,8 +428,9 @@ int ObDtlBasicChannel::attach(ObDtlLinkedBuffer *&linked_buffer, bool is_first_b
|
||||
if (is_data_msg) {
|
||||
metric_.mark_first_in();
|
||||
if (is_eof) {
|
||||
metric_.mark_last_in();
|
||||
metric_.mark_eof();
|
||||
}
|
||||
metric_.set_last_in_ts(::oceanbase::common::ObTimeUtility::current_time());
|
||||
}
|
||||
if (is_first_buffer_cached) {
|
||||
set_first_buffer();
|
||||
@ -598,11 +600,14 @@ int ObDtlBasicChannel::process1(
|
||||
bool transferred = false;
|
||||
ret = proc->process(*buffer, transferred);
|
||||
LOG_DEBUG("process buffer", K(ret), KP(buffer), K(transferred));
|
||||
if (buffer->is_data_msg()) {
|
||||
metric_.set_last_out_ts(::oceanbase::common::ObTimeUtility::current_time());
|
||||
}
|
||||
if (OB_ITER_END == ret || transferred) {
|
||||
inc_processed_buffer_cnt();
|
||||
if (buffer->is_eof()) {
|
||||
if (buffer->is_data_msg()) {
|
||||
metric_.mark_last_out();
|
||||
metric_.mark_eof();
|
||||
}
|
||||
if (!is_eof()) {
|
||||
if (NULL != channel_loop_) {
|
||||
|
||||
@ -17,6 +17,7 @@
|
||||
#include "share/diagnosis/ob_sql_monitor_statname.h"
|
||||
#include "share/ob_server_blacklist.h"
|
||||
#include "observer/omt/ob_th_worker.h"
|
||||
#include "share/ob_occam_time_guard.h"
|
||||
|
||||
using namespace oceanbase::common;
|
||||
|
||||
@ -47,7 +48,9 @@ ObDtlChannelLoop::ObDtlChannelLoop()
|
||||
eof_channel_cnt_(0),
|
||||
loop_times_(0),
|
||||
begin_wait_time_(0),
|
||||
process_query_time_(0)
|
||||
process_query_time_(0),
|
||||
last_dump_channel_time_(0),
|
||||
query_timeout_ts_(0)
|
||||
{
|
||||
op_monitor_info_.otherstat_5_id_ = ObSqlMonitorStatIds::DTL_LOOP_TOTAL_MISS_AFTER_DATA;
|
||||
op_monitor_info_.otherstat_6_id_ = ObSqlMonitorStatIds::DTL_LOOP_TOTAL_MISS;
|
||||
@ -78,7 +81,9 @@ ObDtlChannelLoop::ObDtlChannelLoop(ObMonitorNode &op_monitor_info)
|
||||
eof_channel_cnt_(0),
|
||||
loop_times_(0),
|
||||
begin_wait_time_(0),
|
||||
process_query_time_(0)
|
||||
process_query_time_(0),
|
||||
last_dump_channel_time_(0),
|
||||
query_timeout_ts_(0)
|
||||
{
|
||||
op_monitor_info_.otherstat_5_id_ = ObSqlMonitorStatIds::DTL_LOOP_TOTAL_MISS_AFTER_DATA;
|
||||
op_monitor_info_.otherstat_6_id_ = ObSqlMonitorStatIds::DTL_LOOP_TOTAL_MISS;
|
||||
@ -268,6 +273,32 @@ int ObDtlChannelLoop::process_base(ObIDltChannelLoopPred *pred, int64_t &hinted_
|
||||
}
|
||||
|
||||
++loop_times_;
|
||||
if ((loop_times_ & (INTERRUPT_CHECK_TIMES - 1)) == 0) {
|
||||
last_dump_channel_time_ = last_dump_channel_time_ < process_query_time_ ? process_query_time_ : last_dump_channel_time_;
|
||||
int64_t curr_time = ::oceanbase::common::ObTimeUtility::current_time();
|
||||
if (OB_UNLIKELY(curr_time - last_dump_channel_time_ >= static_cast<int64_t> (100_s))) {
|
||||
last_dump_channel_time_ = curr_time;
|
||||
LOG_WARN("dump channel loop info for query which active for more than 100 seconds", K(process_query_time_), K(curr_time), K(timeout), K(timeout_), K(query_timeout_ts_));
|
||||
int64_t idx = -1;
|
||||
int64_t last_in_msg_time = INT64_MAX;
|
||||
// Find a channel that has not received data for the longest time
|
||||
for (int64_t i = 0; i < chans_.count(); ++i) {
|
||||
if (nullptr != chans_.at(i)) {
|
||||
ObDtlBasicChannel *channel = static_cast<ObDtlBasicChannel *> (chans_.at(i));
|
||||
if (channel->get_op_metric().get_last_in_ts() < last_in_msg_time) {
|
||||
last_in_msg_time = channel->get_op_metric().get_last_in_ts();
|
||||
idx = i;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (-1 == idx) {
|
||||
LOG_WARN("no channel exists");
|
||||
} else {
|
||||
ObDtlBasicChannel *channel = static_cast<ObDtlBasicChannel *> (chans_.at(idx));
|
||||
LOG_WARN("dump channel info for query which active for more than 100 seconds", K(idx), K(channel->get_id()), K(channel->get_peer_id()), K(channel->get_peer()), K(channel->get_op_metric()));
|
||||
}
|
||||
}
|
||||
}
|
||||
if (ignore_interrupt_) {
|
||||
// do nothing.
|
||||
} else if ((loop_times_ & (INTERRUPT_CHECK_TIMES - 1)) == 0 && OB_UNLIKELY(IS_INTERRUPTED())) {
|
||||
@ -328,7 +359,14 @@ int ObDtlChannelLoop::process_channels(ObIDltChannelLoopPred *pred, int64_t &nth
|
||||
LOG_WARN("unexpect next idx", K(next_idx_), K(chan_cnt), K(ret));
|
||||
} else {
|
||||
chan = chans_[next_idx_];
|
||||
if (nullptr == pred || pred->pred_process(next_idx_, chan)) {
|
||||
if (OB_UNLIKELY(share::ObServerBlacklist::get_instance().is_in_blacklist(
|
||||
share::ObCascadMember(chan->get_peer(), GCONF.cluster_id), true,
|
||||
get_process_query_time()))) {
|
||||
ret = OB_RPC_CONNECT_ERROR;
|
||||
LOG_WARN("peer no in communication, maybe crashed", K(ret), K(chan->get_peer()),
|
||||
K(static_cast<int64_t>(GCONF.cluster_id)));
|
||||
break;
|
||||
} else if (nullptr == pred || pred->pred_process(next_idx_, chan)) {
|
||||
if (OB_SUCC(chan->process1(&process_func_, 0, last_row_in_buffer))) {
|
||||
nth_channel = next_idx_;
|
||||
first_data_get_ = true;
|
||||
|
||||
@ -95,6 +95,8 @@ public:
|
||||
}
|
||||
int64_t get_process_query_time() const { return process_query_time_; }
|
||||
void set_process_query_time(int64_t process_query_time) { process_query_time_ = process_query_time; }
|
||||
int64_t get_query_timeout_ts() const { return query_timeout_ts_; }
|
||||
void set_query_timeout_ts(int64_t time) { query_timeout_ts_ = time; }
|
||||
private:
|
||||
int process_channels(ObIDltChannelLoopPred *pred, int64_t &nth_channel);
|
||||
int process_channel(int64_t &nth_channel);
|
||||
@ -146,6 +148,8 @@ private:
|
||||
int64_t loop_times_;
|
||||
int64_t begin_wait_time_; // use rdtsc to record begin dtl wait time
|
||||
int64_t process_query_time_;
|
||||
int64_t last_dump_channel_time_; // Used to track long-term active channels
|
||||
int64_t query_timeout_ts_;
|
||||
};
|
||||
|
||||
OB_INLINE void ObDtlChannelLoop::add_last_data_list(ObDtlChannel *ch)
|
||||
|
||||
@ -148,8 +148,9 @@ int ObDtlLocalChannel::send_shared_message(ObDtlLinkedBuffer *&buf)
|
||||
metric_.mark_first_out();
|
||||
}
|
||||
if (is_eof) {
|
||||
metric_.mark_last_out();
|
||||
metric_.mark_eof();
|
||||
}
|
||||
metric_.set_last_out_ts(::oceanbase::common::ObTimeUtility::current_time());
|
||||
}
|
||||
//统一返回消息
|
||||
msg_response_.on_finish(is_block, ret);
|
||||
|
||||
@ -256,8 +256,9 @@ int ObDtlRpcChannel::feedup(ObDtlLinkedBuffer *&buffer)
|
||||
if (buffer->is_data_msg()) {
|
||||
metric_.mark_first_in();
|
||||
if (buffer->is_eof()) {
|
||||
metric_.mark_last_in();
|
||||
metric_.mark_eof();
|
||||
}
|
||||
metric_.set_last_in_ts(::oceanbase::common::ObTimeUtility::current_time());
|
||||
}
|
||||
IGNORE_RETURN recv_sem_.signal();
|
||||
if (msg_watcher_ != nullptr) {
|
||||
@ -338,8 +339,9 @@ int ObDtlRpcChannel::send_message(ObDtlLinkedBuffer *&buf)
|
||||
if (is_first) {
|
||||
metric_.mark_first_out();
|
||||
}
|
||||
metric_.set_last_out_ts(::oceanbase::common::ObTimeUtility::current_time());
|
||||
if (is_eof) {
|
||||
metric_.mark_last_out();
|
||||
metric_.mark_eof();
|
||||
set_eof();
|
||||
}
|
||||
}
|
||||
|
||||
@ -18,4 +18,4 @@ using namespace oceanbase::common;
|
||||
using namespace oceanbase::sql;
|
||||
|
||||
|
||||
OB_SERIALIZE_MEMBER(ObOpMetric, enable_audit_, id_, type_, first_in_ts_, first_out_ts_, last_in_ts_, last_out_ts_, counter_, exec_time_);
|
||||
OB_SERIALIZE_MEMBER(ObOpMetric, enable_audit_, id_, type_, first_in_ts_, first_out_ts_, last_in_ts_, last_out_ts_, counter_, exec_time_, eof_);
|
||||
|
||||
@ -28,7 +28,7 @@ class ObOpMetric
|
||||
public:
|
||||
ObOpMetric() :
|
||||
enable_audit_(false), id_(-1), type_(MetricType::DEFAULT_MAX), interval_cnt_(0), interval_start_time_(0), interval_end_time_(0),
|
||||
exec_time_(0), flag_(0), first_in_ts_(0), first_out_ts_(0), last_in_ts_(0), last_out_ts_(0), counter_(0)
|
||||
exec_time_(0), flag_(0), first_in_ts_(0), first_out_ts_(0), last_in_ts_(0), last_out_ts_(0), counter_(0), eof_(false)
|
||||
{}
|
||||
virtual ~ObOpMetric() {}
|
||||
|
||||
@ -48,6 +48,7 @@ public:
|
||||
last_in_ts_ = other.last_in_ts_;
|
||||
last_out_ts_ = other.last_out_ts_;
|
||||
counter_ = other.counter_;
|
||||
eof_ = other.eof_;
|
||||
return *this;
|
||||
}
|
||||
|
||||
@ -55,18 +56,18 @@ public:
|
||||
|
||||
void mark_first_in();
|
||||
void mark_first_out();
|
||||
void mark_last_in();
|
||||
void mark_last_out();
|
||||
void mark_eof();
|
||||
|
||||
OB_INLINE void set_first_in_ts(int64_t first_in_ts) { first_in_ts_ = first_in_ts; }
|
||||
OB_INLINE void set_first_out_ts(int64_t first_out_ts ) { first_out_ts_ = first_out_ts; }
|
||||
OB_INLINE void set_last_in_ts(int64_t last_in_ts) { last_in_ts_ = last_in_ts; }
|
||||
OB_INLINE void set_last_out_ts(int64_t last_out_ts) { last_out_ts_ = last_out_ts; }
|
||||
|
||||
OB_INLINE int64_t get_first_in_ts() { return first_in_ts_; }
|
||||
OB_INLINE int64_t get_first_out_ts() { return first_out_ts_; }
|
||||
OB_INLINE int64_t get_last_in_ts() { return last_in_ts_; }
|
||||
OB_INLINE int64_t get_last_out_ts() { return last_out_ts_; }
|
||||
OB_INLINE int64_t get_first_in_ts() const { return first_in_ts_; }
|
||||
OB_INLINE int64_t get_first_out_ts() const { return first_out_ts_; }
|
||||
OB_INLINE int64_t get_last_in_ts() const { return last_in_ts_; }
|
||||
OB_INLINE int64_t get_last_out_ts() const { return last_out_ts_; }
|
||||
OB_INLINE bool get_eof() const { return eof_; }
|
||||
|
||||
OB_INLINE void count() { ++counter_; }
|
||||
int64_t get_counter() { return counter_; }
|
||||
@ -83,12 +84,13 @@ public:
|
||||
void mark_interval_end(int64_t *out_exec_time = nullptr, int64_t interval = 1);
|
||||
OB_INLINE int64_t get_exec_time() { return exec_time_; }
|
||||
|
||||
TO_STRING_KV(K_(id), K_(type), K_(first_in_ts), K_(first_out_ts), K_(last_in_ts), K_(last_out_ts), K_(counter), K_(exec_time));
|
||||
TO_STRING_KV(K_(id), K_(type), K_(first_in_ts), K_(first_out_ts), K_(last_in_ts), K_(last_out_ts), K_(counter), K_(exec_time), K_(eof));
|
||||
private:
|
||||
static const int64_t FIRST_IN = 0x01;
|
||||
static const int64_t FIRST_OUT = 0x02;
|
||||
static const int64_t LAST_IN = 0x4;
|
||||
static const int64_t LAST_OUT = 0x08;
|
||||
static const int64_t DTL_EOF = 0x10;
|
||||
|
||||
bool enable_audit_;
|
||||
int64_t id_;
|
||||
@ -108,6 +110,7 @@ private:
|
||||
int64_t last_out_ts_;
|
||||
|
||||
int64_t counter_;
|
||||
bool eof_;
|
||||
};
|
||||
|
||||
OB_INLINE void ObOpMetric::mark_first_in()
|
||||
@ -126,22 +129,6 @@ OB_INLINE void ObOpMetric::mark_first_out()
|
||||
}
|
||||
}
|
||||
|
||||
OB_INLINE void ObOpMetric::mark_last_in()
|
||||
{
|
||||
if (enable_audit_ && enable_audit_ && !(flag_ & LAST_IN)) {
|
||||
last_in_ts_ = common::ObTimeUtility::current_time();
|
||||
flag_ |= LAST_IN;
|
||||
}
|
||||
}
|
||||
|
||||
OB_INLINE void ObOpMetric::mark_last_out()
|
||||
{
|
||||
if (enable_audit_ && !(flag_ & LAST_OUT)) {
|
||||
last_out_ts_ = common::ObTimeUtility::current_time();
|
||||
flag_ |= LAST_OUT;
|
||||
}
|
||||
}
|
||||
|
||||
OB_INLINE void ObOpMetric::mark_interval_start(int64_t interval)
|
||||
{
|
||||
#ifndef NDEBUG
|
||||
@ -188,6 +175,14 @@ OB_INLINE void ObOpMetric::mark_interval_end(int64_t *out_exec_time, int64_t int
|
||||
#endif
|
||||
}
|
||||
|
||||
OB_INLINE void ObOpMetric::mark_eof()
|
||||
{
|
||||
if (enable_audit_ && !(flag_ & DTL_EOF)) {
|
||||
eof_ = true;
|
||||
flag_ |= DTL_EOF;
|
||||
}
|
||||
}
|
||||
|
||||
} // sql
|
||||
} // oceanbase
|
||||
|
||||
|
||||
@ -514,7 +514,7 @@ int ObPxMSCoordOp::next_row(ObReceiveRowReader &reader, bool &wait_next_msg)
|
||||
op_monitor_info_.otherstat_2_value_ = oceanbase::common::ObClockGenerator::getClock();
|
||||
}
|
||||
all_rows_finish_ = true;
|
||||
metric_.mark_last_out();
|
||||
metric_.mark_eof();
|
||||
} else if (row_heap_.capacity() == row_heap_.count()) {
|
||||
const ObChunkDatumStore::LastStoredRow *pop_row = nullptr;
|
||||
if (OB_FAIL(row_heap_.pop(pop_row))) {
|
||||
@ -527,6 +527,7 @@ int ObPxMSCoordOp::next_row(ObReceiveRowReader &reader, bool &wait_next_msg)
|
||||
}
|
||||
metric_.count();
|
||||
metric_.mark_first_out();
|
||||
metric_.set_last_out_ts(::oceanbase::common::ObTimeUtility::current_time());
|
||||
} else if (row_heap_.capacity() > row_heap_.count()) {
|
||||
} else {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
|
||||
@ -650,7 +650,7 @@ int ObPxMSReceiveOp::inner_get_next_row()
|
||||
if (0 == row_heap_.capacity()) {
|
||||
ret = OB_ITER_END;
|
||||
iter_end_ = true;
|
||||
metric_.mark_last_out();
|
||||
metric_.mark_eof();
|
||||
int release_ret = OB_SUCCESS;
|
||||
if (OB_SUCCESS != (release_ret = release_merge_inputs())) {
|
||||
LOG_WARN("failed to release merge sort and row store", K(ret), K(release_ret));
|
||||
@ -668,6 +668,7 @@ int ObPxMSReceiveOp::inner_get_next_row()
|
||||
}
|
||||
metric_.count();
|
||||
metric_.mark_first_out();
|
||||
metric_.set_last_out_ts(::oceanbase::common::ObTimeUtility::current_time());
|
||||
} else {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("invalid row heap state", K(row_heap_), K(ret));
|
||||
|
||||
@ -254,6 +254,7 @@ int ObPxReceiveOp::init_channel(
|
||||
loop.register_processor(px_row_msg_proc)
|
||||
.register_interrupt_processor(interrupt_proc);
|
||||
loop.set_process_query_time(ctx_.get_my_session()->get_process_query_time());
|
||||
loop.set_query_timeout_ts(ctx_.get_physical_plan_ctx()->get_timeout_timestamp());
|
||||
ObPxSQCProxy *ch_provider = reinterpret_cast<ObPxSQCProxy *>(recv_input.get_ch_provider());
|
||||
int64_t batch_id = ctx_.get_px_batch_id();
|
||||
const bool use_interm_result = ch_provider->get_recieve_use_interm_result();
|
||||
@ -826,6 +827,7 @@ int ObPxFifoReceiveOp::fetch_rows(const int64_t row_cnt)
|
||||
ret = get_rows_from_channels(row_cnt, timeout_ts - get_timestamp());
|
||||
if (OB_SUCCESS == ret) {
|
||||
metric_.mark_first_out();
|
||||
metric_.set_last_out_ts(::oceanbase::common::ObTimeUtility::current_time());
|
||||
LOG_DEBUG("Got one row from channel", K(ret));
|
||||
break; // got one row
|
||||
} else if (OB_ITER_END == ret) {
|
||||
@ -833,7 +835,7 @@ int ObPxFifoReceiveOp::fetch_rows(const int64_t row_cnt)
|
||||
op_monitor_info_.otherstat_2_id_ = ObSqlMonitorStatIds::EXCHANGE_EOF_TIMESTAMP;
|
||||
op_monitor_info_.otherstat_2_value_ = oceanbase::common::ObClockGenerator::getClock();
|
||||
}
|
||||
metric_.mark_last_out();
|
||||
metric_.mark_eof();
|
||||
LOG_TRACE("Got eof row from channel", K(ret));
|
||||
break;
|
||||
} else if (OB_EAGAIN == ret) {
|
||||
|
||||
@ -332,6 +332,7 @@ int ObPxTransmitOp::init_channel(ObPxTransmitOpInput &trans_input)
|
||||
loop_.register_processor(dfc_unblock_msg_proc_)
|
||||
.register_interrupt_processor(interrupt_proc_);
|
||||
loop_.set_process_query_time(ctx_.get_my_session()->get_process_query_time());
|
||||
loop_.set_query_timeout_ts(ctx_.get_physical_plan_ctx()->get_timeout_timestamp());
|
||||
bool use_interm_result = false;
|
||||
int64_t px_batch_id = ctx_.get_px_batch_id();
|
||||
ObPxSQCProxy *sqc_proxy = NULL;
|
||||
|
||||
@ -866,6 +866,7 @@ int ObPxCoordOp::receive_channel_root_dfo(
|
||||
msg_loop_.set_tenant_id(ctx.get_my_session()->get_effective_tenant_id());
|
||||
msg_loop_.set_interm_result(enable_px_batch_rescan());
|
||||
msg_loop_.set_process_query_time(ctx_.get_my_session()->get_process_query_time());
|
||||
msg_loop_.set_query_timeout_ts(ctx_.get_physical_plan_ctx()->get_timeout_timestamp());
|
||||
// root dfo 的 receive channel sets 在本机使用,不需要通过 DTL 发送
|
||||
// 直接注册到 msg_loop 中收取数据即可
|
||||
int64_t cnt = task_channels_.count();
|
||||
@ -937,6 +938,7 @@ int ObPxCoordOp::receive_channel_root_dfo(
|
||||
msg_loop_.set_tenant_id(ctx.get_my_session()->get_effective_tenant_id());
|
||||
msg_loop_.set_interm_result(enable_px_batch_rescan());
|
||||
msg_loop_.set_process_query_time(ctx_.get_my_session()->get_process_query_time());
|
||||
msg_loop_.set_query_timeout_ts(ctx_.get_physical_plan_ctx()->get_timeout_timestamp());
|
||||
// root dfo 的 receive channel sets 在本机使用,不需要通过 DTL 发送
|
||||
// 直接注册到 msg_loop 中收取数据即可
|
||||
int64_t cnt = task_channels_.count();
|
||||
|
||||
@ -132,6 +132,7 @@ int ObPxSQCProxy::link_sqc_qc_channel(ObPxRpcInitSqcArgs &sqc_arg)
|
||||
const ObDtlBasicChannel *basic_channel = static_cast<ObDtlBasicChannel*>(sqc.get_sqc_channel());
|
||||
sqc_ctx_.msg_loop_.set_tenant_id(basic_channel->get_tenant_id());
|
||||
sqc_ctx_.msg_loop_.set_process_query_time(get_process_query_time());
|
||||
sqc_ctx_.msg_loop_.set_query_timeout_ts(get_query_timeout_ts());
|
||||
LOG_TRACE("register sqc-qc channel", K(sqc));
|
||||
}
|
||||
return ret;
|
||||
@ -627,4 +628,13 @@ int64_t ObPxSQCProxy::get_process_query_time()
|
||||
return res;
|
||||
}
|
||||
|
||||
int64_t ObPxSQCProxy::get_query_timeout_ts()
|
||||
{
|
||||
int64_t res = 0;
|
||||
if (OB_NOT_NULL(sqc_arg_.exec_ctx_) && OB_NOT_NULL(sqc_arg_.exec_ctx_->get_physical_plan_ctx())) {
|
||||
res = sqc_arg_.exec_ctx_->get_physical_plan_ctx()->get_timeout_timestamp();
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
int64_t ObPxSQCProxy::get_task_count() const { return sqc_ctx_.get_task_count(); }
|
||||
|
||||
@ -179,6 +179,7 @@ private:
|
||||
bool need_receive_channel_map_via_dtl(int64_t child_dfo_id);
|
||||
int get_whole_msg_provider(uint64_t op_id, dtl::ObDtlMsgType msg_type, ObPxDatahubDataProvider *&provider);
|
||||
int64_t get_process_query_time();
|
||||
int64_t get_query_timeout_ts();
|
||||
/* variables */
|
||||
public:
|
||||
ObSqcCtx &sqc_ctx_;
|
||||
|
||||
Reference in New Issue
Block a user