Fix bug that sleep wrongly when get dtl data
This commit is contained in:
@ -99,7 +99,6 @@ int ObPxFifoCoordOp::inner_get_next_row()
|
||||
msg_loop_.set_tenant_id(ctx_.get_my_session()->get_effective_tenant_id());
|
||||
first_row_fetched_ = true;
|
||||
}
|
||||
|
||||
bool wait_next_msg = true;
|
||||
while (OB_SUCC(ret) && wait_next_msg) {
|
||||
// SQC-QC control channel and TASKs-QC data channel are registered in loop.
|
||||
@ -191,6 +190,10 @@ int ObPxFifoCoordOp::next_row(bool& wait_next_msg)
|
||||
LOG_TRACE("All channel finish", "finish_ch_cnt", finish_ch_cnt_, K(ret));
|
||||
all_rows_finish_ = true;
|
||||
ret = OB_SUCCESS;
|
||||
if (GCONF.enable_sql_audit) {
|
||||
op_monitor_info_.otherstat_2_id_ = ObSqlMonitorStatIds::EXCHANGE_EOF_TIMESTAMP;
|
||||
op_monitor_info_.otherstat_2_value_ = oceanbase::common::ObClockGenerator::getClock();
|
||||
}
|
||||
}
|
||||
} else if (OB_SUCCESS == ret) {
|
||||
wait_next_msg = false;
|
||||
|
||||
@ -339,6 +339,10 @@ int ObPxMSCoordOp::next_row(bool& wait_next_msg)
|
||||
|
||||
if (OB_SUCC(ret)) {
|
||||
if (0 == row_heap_.capacity()) {
|
||||
if (GCONF.enable_sql_audit) {
|
||||
op_monitor_info_.otherstat_2_id_ = ObSqlMonitorStatIds::EXCHANGE_EOF_TIMESTAMP;
|
||||
op_monitor_info_.otherstat_2_value_ = oceanbase::common::ObClockGenerator::getClock();
|
||||
}
|
||||
all_rows_finish_ = true;
|
||||
metric_.mark_last_out();
|
||||
} else if (row_heap_.capacity() == row_heap_.count()) {
|
||||
|
||||
@ -327,10 +327,12 @@ int ObPxFifoReceiveOp::inner_close()
|
||||
LOG_WARN("release dtl channel failed", K(release_channel_ret));
|
||||
}
|
||||
ObDTLIntermResultKey key;
|
||||
ObDtlBasicChannel* channel = NULL;
|
||||
ObDtlBasicChannel *channel = NULL;
|
||||
int64_t recv_cnt = 0;
|
||||
for (int i = 0; i < task_channels_.count(); ++i) {
|
||||
channel = static_cast<ObDtlBasicChannel*>(task_channels_.at(i));
|
||||
key.channel_id_ = channel->get_id();
|
||||
recv_cnt += channel->get_recv_buffer_cnt();
|
||||
if (channel->use_interm_result()) {
|
||||
release_channel_ret = ObDTLIntermResultManager::getInstance().erase_interm_result_info(key);
|
||||
if (release_channel_ret != common::OB_SUCCESS) {
|
||||
@ -338,7 +340,8 @@ int ObPxFifoReceiveOp::inner_close()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
op_monitor_info_.otherstat_3_id_ = ObSqlMonitorStatIds::DTL_SEND_RECV_COUNT;
|
||||
op_monitor_info_.otherstat_3_value_ = recv_cnt;
|
||||
release_channel_ret = msg_loop_.unregister_all_channel();
|
||||
if (release_channel_ret != common::OB_SUCCESS) {
|
||||
// the following unlink actions is not safe is any unregister failure happened
|
||||
@ -378,6 +381,10 @@ int ObPxFifoReceiveOp::inner_get_next_row()
|
||||
LOG_DEBUG("Got one row from channel", K(ret));
|
||||
break; // got one row
|
||||
} else if (OB_ITER_END == ret) {
|
||||
if (GCONF.enable_sql_audit) {
|
||||
op_monitor_info_.otherstat_2_id_ = ObSqlMonitorStatIds::EXCHANGE_EOF_TIMESTAMP;
|
||||
op_monitor_info_.otherstat_2_value_ = oceanbase::common::ObClockGenerator::getClock();
|
||||
}
|
||||
metric_.mark_last_out();
|
||||
LOG_TRACE("Got eof row from channel", K(ret));
|
||||
break;
|
||||
|
||||
@ -253,6 +253,14 @@ int ObPxTransmitOp::inner_close()
|
||||
int ret = OB_SUCCESS;
|
||||
/* we must release channel even if there is some error happen before */
|
||||
chs_agent_.destroy();
|
||||
ObDtlBasicChannel *ch = nullptr;
|
||||
int64_t recv_cnt = 0;
|
||||
for (int i = 0; i < task_channels_.count(); ++i) {
|
||||
ch = static_cast<ObDtlBasicChannel *>(task_channels_.at(i));
|
||||
recv_cnt += ch->get_send_buffer_cnt();
|
||||
}
|
||||
op_monitor_info_.otherstat_3_id_ = ObSqlMonitorStatIds::DTL_SEND_RECV_COUNT;
|
||||
op_monitor_info_.otherstat_3_value_ = recv_cnt;
|
||||
int release_channel_ret = loop_.unregister_all_channel();
|
||||
if (release_channel_ret != common::OB_SUCCESS) {
|
||||
// the following unlink actions is not safe is any unregister failure happened
|
||||
@ -336,6 +344,9 @@ int ObPxTransmitOp::send_eof_row()
|
||||
task_channels_, ch_info_, true, phy_plan_ctx->get_timeout_timestamp(), &eval_ctx_);
|
||||
if (OB_FAIL(eof_asyn_sender.asyn_send())) {
|
||||
LOG_WARN("failed to asyn send drain", K(ret), K(lbt()));
|
||||
} else if (GCONF.enable_sql_audit) {
|
||||
op_monitor_info_.otherstat_2_id_ = ObSqlMonitorStatIds::EXCHANGE_EOF_TIMESTAMP;
|
||||
op_monitor_info_.otherstat_2_value_ = oceanbase::common::ObClockGenerator::getClock();
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
@ -459,6 +470,9 @@ int ObPxTransmitOp::broadcast_eof_row()
|
||||
LOG_WARN("unexpected NULL ptr", K(ret));
|
||||
} else if (OB_FAIL(chs_agent_.flush())) {
|
||||
LOG_WARN("fail flush row to slice channel", K(ret));
|
||||
} else if (GCONF.enable_sql_audit) {
|
||||
op_monitor_info_.otherstat_2_id_ = ObSqlMonitorStatIds::EXCHANGE_EOF_TIMESTAMP;
|
||||
op_monitor_info_.otherstat_2_value_ = oceanbase::common::ObClockGenerator::getClock();
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -554,6 +554,14 @@ int ObPxCoordOp::destroy_all_channel()
|
||||
// note: must unregister channel from msg_loop first. This enable unlink_channel safely.
|
||||
// Otherwise, channnel may receive data while unlink_channel,
|
||||
// and result in memory leak or something else.
|
||||
int64_t recv_cnt = 0;
|
||||
ObDtlBasicChannel *ch = nullptr;
|
||||
for (int i = 0; i < task_channels_.count(); ++i) {
|
||||
ch = static_cast<ObDtlBasicChannel *>(task_channels_.at(i));
|
||||
recv_cnt += ch->get_recv_buffer_cnt();
|
||||
}
|
||||
op_monitor_info_.otherstat_3_id_ = ObSqlMonitorStatIds::DTL_SEND_RECV_COUNT;
|
||||
op_monitor_info_.otherstat_3_value_ = recv_cnt;
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
if (OB_SUCCESS != (tmp_ret = msg_loop_.unregister_all_channel())) {
|
||||
LOG_WARN("fail unregister channels from msg_loop. ignore", KR(tmp_ret));
|
||||
|
||||
Reference in New Issue
Block a user