fix inaccurate db_time.

This commit is contained in:
raywill
2023-06-12 07:24:29 +00:00
committed by ob-robot
parent 624aed41d5
commit bb98de773c
12 changed files with 41 additions and 101 deletions

View File

@ -754,38 +754,10 @@ int ObHashJoinOp::do_sync_wait_all()
return ret; return ret;
} }
// copy ObOperator::drain_exch // need add sync to wait exit for shared hash join
// It's same as the base operator, but only need add sync to wait exit for shared hash join int ObHashJoinOp::inner_drain_exch()
int ObHashJoinOp::drain_exch()
{ {
int ret = OB_SUCCESS; return do_sync_wait_all();
int tmp_ret = OB_SUCCESS;
/**
* 1. try to open this operator
* 2. try to drain all children
*/
if (OB_FAIL(try_open())) {
LOG_WARN("fail to open operator", K(ret));
} else if (!exch_drained_) {
// don't sync to wait all when operator call drain_exch by self
tmp_ret = do_sync_wait_all();
exch_drained_ = true;
for (int64_t i = 0; i < child_cnt_ && OB_SUCC(ret); i++) {
if (OB_ISNULL(children_[i])) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("NULL child found", K(ret), K(i));
} else if (OB_FAIL(children_[i]->drain_exch())) {
LOG_WARN("drain exch failed", K(ret));
}
}
if (OB_SUCC(ret)) {
ret = tmp_ret;
if (OB_FAIL(ret)) {
LOG_WARN("failed to do sync wait all", K(ret));
}
}
}
return ret;
} }
int ObHashJoinOp::next() int ObHashJoinOp::next()

View File

@ -787,9 +787,7 @@ private:
public: public:
virtual int inner_open() override; virtual int inner_open() override;
virtual int inner_rescan() override; virtual int inner_rescan() override;
virtual int inner_drain_exch() override;
virtual int drain_exch() override;
virtual int inner_get_next_row() override; virtual int inner_get_next_row() override;
virtual int inner_get_next_batch(const int64_t max_row_cnt) override; virtual int inner_get_next_batch(const int64_t max_row_cnt) override;
virtual void destroy() override; virtual void destroy() override;

View File

@ -450,40 +450,13 @@ int ObJoinFilterOp::mark_not_need_send_bf_msg()
return ret; return ret;
} }
// copy ObOperator::drain_exch // need add mark_not_need_send_bf_msg for shared shuffled bloom filter
// It's same as the base operator, but only need add mark_not_need_send_bf_msg for shared shuffled bloom filter int ObJoinFilterOp::inner_drain_exch()
int ObJoinFilterOp::drain_exch()
{ {
uint64_t cpu_begin_time = rdtsc();
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS; if (MY_SPEC.is_create_mode()) {
/** ret = mark_not_need_send_bf_msg();
* 1. try to open this operator
* 2. try to drain all children
*/
if (OB_FAIL(try_open())) {
LOG_WARN("fail to open operator", K(ret));
} else if (!exch_drained_) {
if (MY_SPEC.is_create_mode()) {
tmp_ret = mark_not_need_send_bf_msg();
}
exch_drained_ = true;
for (int64_t i = 0; i < child_cnt_ && OB_SUCC(ret); i++) {
if (OB_ISNULL(children_[i])) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("NULL child found", K(ret), K(i));
} else if (OB_FAIL(children_[i]->drain_exch())) {
LOG_WARN("drain exch failed", K(ret));
}
}
if (OB_SUCC(ret)) {
ret = tmp_ret;
if (OB_FAIL(ret)) {
LOG_WARN("failed to mark_not_need_send_bf_msg", K(ret));
}
}
} }
total_time_ += (rdtsc() - cpu_begin_time);
return ret; return ret;
} }

View File

@ -221,7 +221,7 @@ public:
virtual int inner_rescan() override; virtual int inner_rescan() override;
virtual int inner_get_next_row() override; virtual int inner_get_next_row() override;
virtual int inner_get_next_batch(const int64_t max_row_cnt) override; // for batch virtual int inner_get_next_batch(const int64_t max_row_cnt) override; // for batch
virtual int drain_exch() override; virtual int inner_drain_exch() override;
virtual void destroy() override { virtual void destroy() override {
lucky_devil_champions_.reset(); lucky_devil_champions_.reset();
local_rf_msgs_.reset(); local_rf_msgs_.reset();

View File

@ -1095,7 +1095,7 @@ int ObOperator::get_next_row()
} }
} else if (OB_ITER_END == ret) { } else if (OB_ITER_END == ret) {
row_reach_end_ = true; row_reach_end_ = true;
int tmp_ret = drain_exch(); int tmp_ret = do_drain_exch();
if (OB_SUCCESS != tmp_ret) { if (OB_SUCCESS != tmp_ret) {
LOG_WARN("drain exchange data failed", K(tmp_ret)); LOG_WARN("drain exchange data failed", K(tmp_ret));
} }
@ -1235,7 +1235,7 @@ int ObOperator::get_next_batch(const int64_t max_row_cnt, const ObBatchRows *&ba
got_first_row_ = true; got_first_row_ = true;
} }
if (brs_.end_) { if (brs_.end_) {
int tmp_ret = drain_exch(); int tmp_ret = do_drain_exch();
if (OB_SUCCESS != tmp_ret) { if (OB_SUCCESS != tmp_ret) {
LOG_WARN("drain exchange data failed", K(tmp_ret)); LOG_WARN("drain exchange data failed", K(tmp_ret));
} }
@ -1329,7 +1329,15 @@ int ObOperator::filter_batch_rows(const ObExprPtrIArray &exprs,
// copy ObPhyOperator::drain_exch // copy ObPhyOperator::drain_exch
int ObOperator::drain_exch() int ObOperator::drain_exch()
{ {
int ret = OB_SUCCESS;
uint64_t cpu_begin_time = rdtsc(); uint64_t cpu_begin_time = rdtsc();
ret = do_drain_exch();
total_time_ += (rdtsc() - cpu_begin_time);
return ret;
}
int ObOperator::do_drain_exch()
{
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
/** /**
* 1. try to open this operator * 1. try to open this operator
@ -1338,21 +1346,25 @@ int ObOperator::drain_exch()
if (OB_FAIL(try_open())) { if (OB_FAIL(try_open())) {
LOG_WARN("fail to open operator", K(ret)); LOG_WARN("fail to open operator", K(ret));
} else if (!exch_drained_) { } else if (!exch_drained_) {
int tmp_ret = inner_drain_exch();
exch_drained_ = true; exch_drained_ = true;
for (int64_t i = 0; i < child_cnt_ && OB_SUCC(ret); i++) { if (!spec_.is_receive()) {
if (OB_ISNULL(children_[i])) { for (int64_t i = 0; i < child_cnt_ && OB_SUCC(ret); i++) {
ret = OB_ERR_UNEXPECTED; if (OB_ISNULL(children_[i])) {
LOG_WARN("NULL child found", K(ret), K(i)); ret = OB_ERR_UNEXPECTED;
} else if (OB_FAIL(children_[i]->drain_exch())) { LOG_WARN("NULL child found", K(ret), K(i));
LOG_WARN("drain exch failed", K(ret)); } else if (OB_FAIL(children_[i]->drain_exch())) {
LOG_WARN("drain exch failed", K(ret));
}
} }
} }
if (OB_SUCC(ret)) {
ret = tmp_ret;
}
} }
total_time_ += (rdtsc() - cpu_begin_time);
return ret; return ret;
} }
int ObOperator::get_real_child(ObOperator *&child, const int32_t child_idx) int ObOperator::get_real_child(ObOperator *&child, const int32_t child_idx)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;

View File

@ -501,7 +501,6 @@ public:
ObBatchRows &get_brs() { return brs_; } ObBatchRows &get_brs() { return brs_; }
// Drain exchange in data for PX, or producer DFO will be blocked. // Drain exchange in data for PX, or producer DFO will be blocked.
virtual int drain_exch(); virtual int drain_exch();
void set_pushdown_param_null(const common::ObIArray<ObDynamicParamSetter> &rescan_params); void set_pushdown_param_null(const common::ObIArray<ObDynamicParamSetter> &rescan_params);
void set_feedback_node_idx(int64_t idx) void set_feedback_node_idx(int64_t idx)
{ fb_node_idx_ = idx; } { fb_node_idx_ = idx; }
@ -522,6 +521,7 @@ protected:
// try open operator // try open operator
int try_open() { return opened_ ? common::OB_SUCCESS : open(); } int try_open() { return opened_ ? common::OB_SUCCESS : open(); }
virtual void do_clear_datum_eval_flag(); virtual void do_clear_datum_eval_flag();
void clear_batch_end_flag() { brs_.end_ = false; } void clear_batch_end_flag() { brs_.end_ = false; }
inline void reset_batchrows() inline void reset_batchrows()
@ -571,6 +571,9 @@ private:
int output_expr_sanity_check(); int output_expr_sanity_check();
int output_expr_sanity_check_batch(); int output_expr_sanity_check_batch();
int setup_op_feedback_info(); int setup_op_feedback_info();
int do_drain_exch();
// child can implement this interface, but can't call this directly
virtual int inner_drain_exch() { return common::OB_SUCCESS; };
protected: protected:
const ObOpSpec &spec_; const ObOpSpec &spec_;
ObExecContext &ctx_; ObExecContext &ctx_;

View File

@ -390,15 +390,11 @@ int ObPxReceiveOp::inner_rescan()
return ret; return ret;
} }
int ObPxReceiveOp::drain_exch() int ObPxReceiveOp::inner_drain_exch()
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
uint64_t version = -1; uint64_t version = -1;
if (OB_FAIL(try_open())) { if (iter_end_) {
LOG_WARN("get operator ctx failed", K(ret));
} else if (exch_drained_) {
// has been drained, do noting.
} else if (iter_end_) {
exch_drained_ = true; exch_drained_ = true;
} else if (!exch_drained_) { } else if (!exch_drained_) {
if (IS_PX_COORD(get_spec().get_type())) { if (IS_PX_COORD(get_spec().get_type())) {

View File

@ -114,7 +114,7 @@ public:
virtual int init_dfc(dtl::ObDtlDfoKey &key); virtual int init_dfc(dtl::ObDtlDfoKey &key);
bool channel_linked() { return channel_linked_; } bool channel_linked() { return channel_linked_; }
virtual int drain_exch() override; virtual int inner_drain_exch() override;
int active_all_receive_channel(); int active_all_receive_channel();
int link_ch_sets(ObPxTaskChSet &ch_set, int link_ch_sets(ObPxTaskChSet &ch_set,
common::ObIArray<dtl::ObDtlChannel*> &channels, common::ObIArray<dtl::ObDtlChannel*> &channels,

View File

@ -563,7 +563,6 @@ int ObPxTransmitOp::send_rows_one_by_one(ObSliceIdxCalc &slice_calc)
clear_evaluated_flag(); clear_evaluated_flag();
const ObPxTransmitSpec &spec = static_cast<const ObPxTransmitSpec &>(get_spec()); const ObPxTransmitSpec &spec = static_cast<const ObPxTransmitSpec &>(get_spec());
ret = next_row(); ret = next_row();
uint64_t begin_cpu_time = rdtsc(); // must place after next_row()
if (OB_FAIL(ret)) { if (OB_FAIL(ret)) {
if (OB_UNLIKELY(OB_ITER_END != ret)) { if (OB_UNLIKELY(OB_ITER_END != ret)) {
LOG_WARN("fail to get next row from child op", LOG_WARN("fail to get next row from child op",
@ -604,7 +603,7 @@ int ObPxTransmitOp::send_rows_one_by_one(ObSliceIdxCalc &slice_calc)
get_spec().output_, eval_ctx_, slice_idx_array))) { get_spec().output_, eval_ctx_, slice_idx_array))) {
LOG_WARN("fail get slice idx", K(ret)); LOG_WARN("fail get slice idx", K(ret));
} else if (dfc_.all_ch_drained()) { } else if (dfc_.all_ch_drained()) {
int tmp_ret = drain_exch(); int tmp_ret = child_->drain_exch();
if (OB_SUCCESS != tmp_ret) { if (OB_SUCCESS != tmp_ret) {
LOG_WARN("drain exchange data failed", K(tmp_ret)); LOG_WARN("drain exchange data failed", K(tmp_ret));
} }
@ -643,9 +642,8 @@ int ObPxTransmitOp::send_rows_in_batch(ObSliceIdxCalc &slice_calc)
LOG_WARN("fetch next rows failed", K(ret)); LOG_WARN("fetch next rows failed", K(ret));
break; break;
} }
uint64_t begin_cpu_time = rdtsc();
if (dfc_.all_ch_drained()) { if (dfc_.all_ch_drained()) {
int tmp_ret = drain_exch(); int tmp_ret = child_->drain_exch();
if (OB_SUCCESS != tmp_ret) { if (OB_SUCCESS != tmp_ret) {
LOG_WARN("drain exchange data failed", K(tmp_ret)); LOG_WARN("drain exchange data failed", K(tmp_ret));
} }

View File

@ -103,7 +103,7 @@ public:
return common::OB_ITER_END; return common::OB_ITER_END;
} }
virtual int drain_exch() override virtual int inner_drain_exch() override
{ {
// Drain exchange is used in parallelism execution, // Drain exchange is used in parallelism execution,
// do nothing for old fashion distributed execution. // do nothing for old fashion distributed execution.

View File

@ -1036,17 +1036,6 @@ int ObPxCoordOp::init_batch_info()
return ret; return ret;
} }
int ObPxCoordOp::drain_exch()
{
int ret = OB_SUCCESS;
if (enable_px_batch_rescan()) {
// do nothing
} else if (OB_FAIL(ObPxReceiveOp::drain_exch())) {
LOG_WARN("fail to drain exch", K(ret));
}
return ret;
}
int ObPxCoordOp::batch_rescan() int ObPxCoordOp::batch_rescan()
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;

View File

@ -89,7 +89,6 @@ public:
ObExecContext &ctx, ObDfo &parent, dtl::ObDtlChTotalInfo &ch_info) override; ObExecContext &ctx, ObDfo &parent, dtl::ObDtlChTotalInfo &ch_info) override;
virtual int notify_peers_mock_eof( virtual int notify_peers_mock_eof(
ObDfo *dfo, int64_t timeout_ts, common::ObAddr addr) const; ObDfo *dfo, int64_t timeout_ts, common::ObAddr addr) const;
virtual int drain_exch() override;
protected: protected:
virtual int free_allocator() { return common::OB_SUCCESS; } virtual int free_allocator() { return common::OB_SUCCESS; }
/* destroy all channel */ /* destroy all channel */