revert fix shared hash join + channel sync hang
This commit is contained in:
@ -1207,6 +1207,10 @@ int ObOperator::get_next_row()
|
|||||||
}
|
}
|
||||||
} else if (OB_ITER_END == ret) {
|
} else if (OB_ITER_END == ret) {
|
||||||
row_reach_end_ = true;
|
row_reach_end_ = true;
|
||||||
|
int tmp_ret = do_drain_exch();
|
||||||
|
if (OB_SUCCESS != tmp_ret) {
|
||||||
|
LOG_WARN("drain exchange data failed", K(tmp_ret));
|
||||||
|
}
|
||||||
if (got_first_row_) {
|
if (got_first_row_) {
|
||||||
op_monitor_info_.last_row_time_ = oceanbase::common::ObClockGenerator::getClock();
|
op_monitor_info_.last_row_time_ = oceanbase::common::ObClockGenerator::getClock();
|
||||||
}
|
}
|
||||||
@ -1227,12 +1231,6 @@ int ObOperator::get_next_row()
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (OB_FAIL(ret)) {
|
|
||||||
int tmp_ret = do_drain_exch();
|
|
||||||
if (OB_SUCCESS != tmp_ret) {
|
|
||||||
LOG_WARN("drain exchange data failed", K(tmp_ret));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
end_ash_line_id_reg();
|
end_ash_line_id_reg();
|
||||||
end_cpu_time_counting();
|
end_cpu_time_counting();
|
||||||
return ret;
|
return ret;
|
||||||
@ -1241,7 +1239,6 @@ int ObOperator::get_next_row()
|
|||||||
int ObOperator::get_next_batch(const int64_t max_row_cnt, const ObBatchRows *&batch_rows)
|
int ObOperator::get_next_batch(const int64_t max_row_cnt, const ObBatchRows *&batch_rows)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
bool need_drain = false;
|
|
||||||
begin_cpu_time_counting();
|
begin_cpu_time_counting();
|
||||||
begin_ash_line_id_reg();
|
begin_ash_line_id_reg();
|
||||||
|
|
||||||
@ -1369,7 +1366,10 @@ int ObOperator::get_next_batch(const int64_t max_row_cnt, const ObBatchRows *&ba
|
|||||||
got_first_row_ = true;
|
got_first_row_ = true;
|
||||||
}
|
}
|
||||||
if (brs_.end_) {
|
if (brs_.end_) {
|
||||||
need_drain = true;
|
int tmp_ret = do_drain_exch();
|
||||||
|
if (OB_SUCCESS != tmp_ret) {
|
||||||
|
LOG_WARN("drain exchange data failed", K(tmp_ret));
|
||||||
|
}
|
||||||
op_monitor_info_.last_row_time_ = ObClockGenerator::getClock();
|
op_monitor_info_.last_row_time_ = ObClockGenerator::getClock();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1395,12 +1395,7 @@ int ObOperator::get_next_batch(const int64_t max_row_cnt, const ObBatchRows *&ba
|
|||||||
if (!spec_.use_rich_format_ && PHY_TABLE_SCAN != spec_.type_) {
|
if (!spec_.use_rich_format_ && PHY_TABLE_SCAN != spec_.type_) {
|
||||||
brs_.set_all_rows_active(false);
|
brs_.set_all_rows_active(false);
|
||||||
}
|
}
|
||||||
if (need_drain || OB_FAIL(ret)) {
|
|
||||||
int tmp_ret = do_drain_exch();
|
|
||||||
if (OB_SUCCESS != tmp_ret) {
|
|
||||||
LOG_WARN("drain exchange data failed", K(tmp_ret));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
end_ash_line_id_reg();
|
end_ash_line_id_reg();
|
||||||
end_cpu_time_counting();
|
end_cpu_time_counting();
|
||||||
return ret;
|
return ret;
|
||||||
|
|||||||
Reference in New Issue
Block a user