fix bug: dynamic param not deep copy in recieve op

This commit is contained in:
obdev 2023-02-13 04:12:03 +00:00 committed by ob-robot
parent 947377240e
commit a5e5719fa7
8 changed files with 92 additions and 28 deletions

View File

@ -517,6 +517,8 @@ public:
OB_INLINE int eval_batch_param_value(ObEvalCtx &ctx, const ObBitVector &skip,
const int64_t size, TS &...args) const;
OB_INLINE int deep_copy_self_datum(ObEvalCtx &ctx) const;
// deep copy %datum to reserve buffer or new allocated buffer if reserved buffer is not enough.
OB_INLINE int deep_copy_datum(ObEvalCtx &ctx, const common::ObDatum &datum) const;
@ -1060,6 +1062,17 @@ OB_INLINE int ObExpr::eval_batch(ObEvalCtx &ctx,
return ret;
}
OB_INLINE int ObExpr::deep_copy_self_datum(ObEvalCtx &ctx) const
{
int ret = OB_SUCCESS;
const ObDatum &datum = locate_expr_datum(ctx);
if (OB_FAIL(deep_copy_datum(ctx, datum))) {
SQL_LOG(WARN, "fail to deep copy datum", K(ret), K(ctx), K(datum));
}
return ret;
}
OB_INLINE int ObExpr::deep_copy_datum(ObEvalCtx &ctx, const common::ObDatum &datum) const
{
int ret = common::OB_SUCCESS;

View File

@ -147,11 +147,13 @@ int ObPxFifoCoordOp::fetch_rows(const int64_t row_cnt)
clear_dynamic_const_parent_flag();
metric_.mark_interval_start();
if (!is_vectorized()) {
ret = row_reader_.get_next_row(MY_SPEC.child_exprs_, eval_ctx_);
ret = row_reader_.get_next_row(MY_SPEC.child_exprs_,
MY_SPEC.dynamic_const_exprs_,
eval_ctx_);
} else {
int64_t read_rows = 0;
ret = row_reader_.get_next_batch(MY_SPEC.child_exprs_, eval_ctx_,
row_cnt, read_rows, stored_rows_);
ret = row_reader_.get_next_batch(MY_SPEC.child_exprs_, MY_SPEC.dynamic_const_exprs_,
eval_ctx_, row_cnt, read_rows, stored_rows_);
brs_.size_ = read_rows;
}
metric_.mark_interval_end(&time_recorder_);

View File

@ -454,7 +454,7 @@ int ObPxMSCoordOp::next_row(ObReceiveRowReader &reader, bool &wait_next_msg)
wait_next_msg = true;
LOG_TRACE("Begin next_row");
metric_.mark_interval_start();
ret = reader.get_next_row(MY_SPEC.child_exprs_, eval_ctx_);
ret = reader.get_next_row(MY_SPEC.child_exprs_, MY_SPEC.dynamic_const_exprs_, eval_ctx_);
metric_.mark_interval_end(&time_recorder_);
if (OB_ITER_END == ret) {
finish_ch_cnt_++;

View File

@ -367,8 +367,8 @@ int ObPxMSReceiveOp::GlobalOrderInput::get_one_row_from_channels(
while (OB_SUCC(ret) && ms_receive_op->row_reader_.has_more()) {
ms_receive_op->clear_evaluated_flag();
ms_receive_op->clear_dynamic_const_parent_flag();
if (OB_FAIL(ms_receive_op->row_reader_.get_next_row(
ms_receive_op->my_spec().child_exprs_, eval_ctx))) {
if (OB_FAIL(ms_receive_op->row_reader_.get_next_row(ms_receive_op->my_spec().child_exprs_,
ms_receive_op->my_spec().dynamic_const_exprs_, eval_ctx))) {
LOG_WARN("get row failed", K(ret));
} else {
processed_cnt_++;
@ -658,7 +658,10 @@ int ObPxMSReceiveOp::inner_get_next_row()
} else if (row_heap_.capacity() == row_heap_.count()) {
if (OB_FAIL(row_heap_.pop(store_row))) {
LOG_WARN("fail pop row from heap", K(ret));
} else if (OB_FAIL(ObReceiveRowReader::to_expr(store_row, MY_SPEC.all_exprs_, eval_ctx_))) {
} else if (OB_FAIL(ObReceiveRowReader::to_expr(store_row,
MY_SPEC.dynamic_const_exprs_,
MY_SPEC.all_exprs_,
eval_ctx_))) {
LOG_WARN("failed to convert store row", K(ret));
} else {
LOG_TRACE("trace output row", K(ret), K(ObToStringExprRow(eval_ctx_, MY_SPEC.all_exprs_)));
@ -773,7 +776,9 @@ int ObPxMSReceiveOp::get_all_rows_from_channels(
clear_evaluated_flag();
clear_dynamic_const_parent_flag();
// Get row to %child_exprs_ instead of %all_exprs_ which contain sort expressions
if (OB_FAIL(row_reader_.get_next_row(MY_SPEC.child_exprs_, eval_ctx_))) {
if (OB_FAIL(row_reader_.get_next_row(MY_SPEC.child_exprs_,
MY_SPEC.dynamic_const_exprs_,
eval_ctx_))) {
LOG_WARN("get row from reader failed", K(ret));
} else {
++processed_cnt_;

View File

@ -255,7 +255,7 @@ int ObPxOrderedCoordOp::next_row(ObReceiveRowReader &reader, bool &wait_next_msg
wait_next_msg = true;
LOG_TRACE("Begin next_row");
metric_.mark_interval_start();
ret = reader.get_next_row(MY_SPEC.child_exprs_, eval_ctx_);
ret = reader.get_next_row(MY_SPEC.child_exprs_, MY_SPEC.dynamic_const_exprs_, eval_ctx_);
metric_.mark_interval_end(&time_recorder_);
if (OB_ITER_END == ret) {
finish_ch_cnt_++;

View File

@ -896,7 +896,9 @@ int ObPxFifoReceiveOp::get_rows_from_channels(const int64_t row_cnt, int64_t tim
clear_evaluated_flag();
clear_dynamic_const_parent_flag();
if (!is_vectorized()) {
if (OB_FAIL(row_reader_.get_next_row(MY_SPEC.child_exprs_, eval_ctx_))) {
if (OB_FAIL(row_reader_.get_next_row(MY_SPEC.child_exprs_,
MY_SPEC.dynamic_const_exprs_,
eval_ctx_))) {
LOG_WARN("get next row from row reader failed", K(ret));
} else {
got_row = true;
@ -904,8 +906,12 @@ int ObPxFifoReceiveOp::get_rows_from_channels(const int64_t row_cnt, int64_t tim
}
} else {
int64_t read_rows = 0;
if (OB_FAIL(row_reader_.get_next_batch(MY_SPEC.child_exprs_, eval_ctx_,
row_cnt, read_rows, stored_rows_))) {
if (OB_FAIL(row_reader_.get_next_batch(MY_SPEC.child_exprs_,
MY_SPEC.dynamic_const_exprs_,
eval_ctx_,
row_cnt,
read_rows,
stored_rows_))) {
LOG_WARN("get next batch failed", K(ret));
} else {
got_row = true;

View File

@ -310,6 +310,7 @@ int ObReceiveRowReader::get_next_row(common::ObNewRow &row)
}
int ObReceiveRowReader::to_expr(const ObChunkDatumStore::StoredRow *srow,
const ObIArray<ObExpr*> &dynamic_const_exprs,
const ObIArray<ObExpr*> &exprs,
ObEvalCtx &eval_ctx)
{
@ -321,11 +322,22 @@ int ObReceiveRowReader::to_expr(const ObChunkDatumStore::StoredRow *srow,
exprs.at(i)->locate_expr_datum(eval_ctx) = srow->cells()[i];
exprs.at(i)->set_evaluated_projected(eval_ctx);
}
// deep copy dynamic const expr datum
if (dynamic_const_exprs.count() > 0) {
for (int64_t i = 0; OB_SUCC(ret) && i < dynamic_const_exprs.count(); i++) {
ObExpr *expr = dynamic_const_exprs.at(i);
if (OB_FAIL(expr->deep_copy_self_datum(eval_ctx))) {
LOG_WARN("fail to deep copy datum", K(ret), K(eval_ctx), K(*expr));
}
}
}
}
return ret;
}
int ObReceiveRowReader::get_next_row(const ObIArray<ObExpr*> &exprs, ObEvalCtx &eval_ctx)
int ObReceiveRowReader::get_next_row(const ObIArray<ObExpr*> &exprs,
const ObIArray<ObExpr*> &dynamic_const_exprs,
ObEvalCtx &eval_ctx)
{
int ret = OB_SUCCESS;
if (NULL != datum_iter_) {
@ -335,7 +347,7 @@ int ObReceiveRowReader::get_next_row(const ObIArray<ObExpr*> &exprs, ObEvalCtx &
LOG_WARN("get next stored row failed", K(ret));
}
} else {
ret = to_expr(srow, exprs, eval_ctx);
ret = to_expr(srow, dynamic_const_exprs, exprs, eval_ctx);
}
} else {
free_iterated_buffers();
@ -344,19 +356,23 @@ int ObReceiveRowReader::get_next_row(const ObIArray<ObExpr*> &exprs, ObEvalCtx &
if (NULL == srow) {
ret = OB_ITER_END;
} else {
ret = to_expr(srow, exprs, eval_ctx);
ret = to_expr(srow, dynamic_const_exprs, exprs, eval_ctx);
}
}
return ret;
}
void ObReceiveRowReader::attach_rows(const common::ObIArray<ObExpr*> &exprs,
ObEvalCtx &eval_ctx,
const ObChunkDatumStore::StoredRow **srows,
const int64_t read_rows)
int ObReceiveRowReader::attach_rows(const common::ObIArray<ObExpr*> &exprs,
const ObIArray<ObExpr*> &dynamic_const_exprs,
ObEvalCtx &eval_ctx,
const ObChunkDatumStore::StoredRow **srows,
const int64_t read_rows)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(srows)) {
// do nothing
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(ret));
} else {
for (int64_t col_idx = 0; col_idx < exprs.count(); col_idx++) {
ObExpr *e = exprs.at(col_idx);
@ -373,10 +389,26 @@ void ObReceiveRowReader::attach_rows(const common::ObIArray<ObExpr*> &exprs,
info.notnull_ = false;
info.point_to_frame_ = false;
}
// deep copy dynamic const expr datum
if (OB_SUCC(ret) && dynamic_const_exprs.count() > 0 && read_rows > 0) {
ObEvalCtx::BatchInfoScopeGuard batch_info_guard(eval_ctx);
batch_info_guard.set_batch_size(read_rows);
batch_info_guard.set_batch_idx(0);
for (int64_t i = 0; OB_SUCC(ret) && i < dynamic_const_exprs.count(); i++) {
ObExpr *expr = dynamic_const_exprs.at(i);
OB_ASSERT(!expr->is_batch_result());
if (OB_FAIL(expr->deep_copy_self_datum(eval_ctx))) {
LOG_WARN("fail to deep copy datum", K(ret), K(eval_ctx), K(*expr));
}
}
}
}
return ret;
}
int ObReceiveRowReader::get_next_batch(const ObIArray<ObExpr*> &exprs,
const ObIArray<ObExpr*> &dynamic_const_exprs,
ObEvalCtx &eval_ctx,
const int64_t max_rows,
int64_t &read_rows,
@ -398,7 +430,7 @@ int ObReceiveRowReader::get_next_batch(const ObIArray<ObExpr*> &exprs,
read_rows = 0;
}
} else {
attach_rows(exprs, eval_ctx, srows, read_rows);
OZ(attach_rows(exprs, dynamic_const_exprs, eval_ctx, srows, read_rows));
}
} else {
free_iterated_buffers();
@ -412,7 +444,7 @@ int ObReceiveRowReader::get_next_batch(const ObIArray<ObExpr*> &exprs,
ret = OB_ITER_END;
} else {
LOG_DEBUG("read rows", K(read_rows), KP(this));
attach_rows(exprs, eval_ctx, srows, read_rows);
OZ(attach_rows(exprs, dynamic_const_exprs, eval_ctx, srows, read_rows));
}
}
return ret;

View File

@ -73,24 +73,30 @@ public:
}
static int to_expr(const ObChunkDatumStore::StoredRow *srow,
const ObIArray<ObExpr*> &dynamic_const_exprs,
const ObIArray<ObExpr*> &exprs,
ObEvalCtx &eval_ctx);
static void attach_rows(const common::ObIArray<ObExpr*> &exprs,
ObEvalCtx &eval_ctx,
const ObChunkDatumStore::StoredRow **srows,
const int64_t read_rows);
static int attach_rows(const common::ObIArray<ObExpr*> &exprs,
const ObIArray<ObExpr*> &dynamic_const_exprs,
ObEvalCtx &eval_ctx,
const ObChunkDatumStore::StoredRow **srows,
const int64_t read_rows);
// get row interface for PX_CHUNK_ROW
int get_next_row(common::ObNewRow &row);
// get row interface for PX_DATUM_ROW
int get_next_row(const ObIArray<ObExpr*> &exprs, ObEvalCtx &eval_ctx);
int get_next_row(const ObIArray<ObExpr*> &exprs,
const ObIArray<ObExpr*> &dynamic_const_exprs,
ObEvalCtx &eval_ctx);
// get next batch rows
// set read row count to %read_rows
// return OB_ITER_END and set %read_rows to zero for iterate end.
int get_next_batch(const ObIArray<ObExpr*> &exprs, ObEvalCtx &eval_ctx,
int get_next_batch(const ObIArray<ObExpr*> &exprs,
const ObIArray<ObExpr*> &dynamic_const_exprs,
ObEvalCtx &eval_ctx,
const int64_t max_rows, int64_t &read_rows,
const ObChunkDatumStore::StoredRow **srows);