diff --git a/src/sql/engine/px/ob_px_row_store.cpp b/src/sql/engine/px/ob_px_row_store.cpp index bbf0008bf4..8279a04437 100644 --- a/src/sql/engine/px/ob_px_row_store.cpp +++ b/src/sql/engine/px/ob_px_row_store.cpp @@ -329,7 +329,14 @@ int ObReceiveRowReader::get_next_row(const ObIArray &exprs, ObEvalCtx & { int ret = OB_SUCCESS; if (NULL != datum_iter_) { - ret = datum_iter_->get_next_row(exprs, eval_ctx); + const ObChunkDatumStore::StoredRow *srow = NULL; + if (OB_FAIL(datum_iter_->get_next_row(srow))) { + if (OB_ITER_END != ret) { + LOG_WARN("get next stored row failed", K(ret)); + } + } else { + ret = to_expr(srow, exprs, eval_ctx); + } } else { free_iterated_buffers(); const ObChunkDatumStore::StoredRow *srow @@ -381,7 +388,18 @@ int ObReceiveRowReader::get_next_batch(const ObIArray &exprs, ret = OB_INVALID_ARGUMENT; LOG_WARN("NULL store rows", K(ret)); } else if (NULL != datum_iter_) { - ret = datum_iter_->get_next_batch(exprs, eval_ctx, max_rows, read_rows, srows); + if (max_rows > eval_ctx.max_batch_size_) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", K(max_rows), K(eval_ctx.max_batch_size_)); + } else if (OB_FAIL(datum_iter_->get_next_batch(srows, max_rows, read_rows))) { + if (OB_ITER_END != ret) { + LOG_WARN("get next batch failed", K(ret), K(max_rows)); + } else { + read_rows = 0; + } + } else { + attach_rows(exprs, eval_ctx, srows, read_rows); + } } else { free_iterated_buffers(); read_rows = 0;