remove ObOperator::get_next_row() in ObSelectIntoOp::inner_open()

This commit is contained in:
obdev 2022-12-23 07:37:51 +00:00 committed by ob-robot
parent 190932ebb3
commit 35eec7741d
2 changed files with 82 additions and 78 deletions

View File

@ -5752,6 +5752,7 @@ int ObStaticEngineCG::generate_spec(ObLogSelectInto &op, ObSelectIntoSpec &spec,
spec.into_type_ = op.get_into_type();
spec.closed_cht_ = op.get_closed_cht();
spec.is_optional_ = op.get_is_optional();
spec.plan_->need_drive_dml_query_ = true;
}
}
return ret;

View File

@ -137,24 +137,90 @@ int ObSelectIntoOp::inner_open()
}
}
}
return ret;
}
int ObSelectIntoOp::inner_get_next_row()
{
int ret = OB_SUCCESS;
int64_t row_count = 0;
const ObItemType into_type = MY_SPEC.into_type_;
ObPhysicalPlanCtx *phy_plan_ctx = NULL;
if (OB_ISNULL(phy_plan_ctx = ctx_.get_physical_plan_ctx())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get phy_plan_ctx failed", K(ret));
}
while (OB_SUCC(ret) && row_count < top_limit_cnt_) {
if (is_vectorized()) {
int64_t batch_size = MIN(spec_.max_batch_size_, top_limit_cnt_ - row_count);
const ObBatchRows *brs = NULL;
if (OB_FAIL(ObOperator::get_next_batch(batch_size, brs))) {
LOG_WARN("get next batch failed", K(ret));
} else if (brs->size_ > 0) {
row_count += brs->size_ - brs->skip_->accumulate_bit_cnt(brs->size_);
clear_evaluated_flag();
if (OB_FAIL(child_->get_next_row())) {
if (OB_LIKELY(OB_ITER_END == ret)) {
} else {
LOG_WARN("get next row failed", K(ret));
}
} else {
++row_count;
if (T_INTO_VARIABLES == into_type) {
if (OB_FAIL(into_varlist())) {
LOG_WARN("into varlist failed", K(ret));
}
} else if (T_INTO_OUTFILE == into_type) {
if (OB_FAIL(into_outfile())) {
LOG_WARN("into outfile failed", K(ret));
}
} else {
if (OB_FAIL(into_dumpfile())) {
LOG_WARN("into dumpfile failed", K(ret));
}
}
}
if (OB_SUCC(ret) || OB_ITER_END == ret) { // if into user variables or into dumpfile, must be one row
if ((T_INTO_VARIABLES == into_type || T_INTO_DUMPFILE == into_type) && row_count > 1) {
ret = OB_ERR_TOO_MANY_ROWS;
LOG_WARN("more than one row for into variables or into dumpfile", K(ret), K(row_count));
}
}
} //end while
if (OB_ITER_END == ret || OB_SUCC(ret)) { // set affected rows
phy_plan_ctx->set_affected_rows(row_count);
}
return ret;
}
int ObSelectIntoOp::inner_get_next_batch(const int64_t max_row_cnt)
{
int ret = OB_SUCCESS;
const ObBatchRows *child_brs = NULL;
int64_t batch_size = min(max_row_cnt, MY_SPEC.max_batch_size_);
int64_t row_count = 0;
const ObItemType into_type = MY_SPEC.into_type_;
ObPhysicalPlanCtx *phy_plan_ctx = NULL;
if (OB_ISNULL(phy_plan_ctx = ctx_.get_physical_plan_ctx())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get phy_plan_ctx failed", K(ret));
}
bool stop_loop = false;
bool is_iter_end = false;
while (OB_SUCC(ret) && !stop_loop) {
clear_evaluated_flag();
int64_t rowkey_batch_size = min(batch_size, top_limit_cnt_ - row_count);
if (OB_FAIL(child_->get_next_batch(rowkey_batch_size, child_brs))) {
LOG_WARN("get next batch failed", K(ret));
} else {
brs_.size_ = child_brs->size_;
brs_.end_ = child_brs->end_;
is_iter_end = brs_.end_ && 0 == brs_.size_;
if (brs_.size_ > 0) {
brs_.skip_->deep_copy(*(child_brs->skip_), brs_.size_);
row_count += brs_.size_ - brs_.skip_->accumulate_bit_cnt(brs_.size_);
if (T_INTO_OUTFILE == into_type) {
if (OB_FAIL(into_outfile_batch(*brs))) {
if (OB_FAIL(into_outfile_batch(brs_))) {
LOG_WARN("into outfile batch failed", K(ret));
}
} else {
ObEvalCtx::BatchInfoScopeGuard guard(eval_ctx_);
guard.set_batch_size(brs->size_);
for (int64_t i = 0; OB_SUCC(ret) && i < brs->size_; i++) {
if (brs->skip_->contain(i)) {
guard.set_batch_size(brs_.size_);
for (int64_t i = 0; OB_SUCC(ret) && i < brs_.size_; i++) {
if (brs_.skip_->contain(i)) {
continue;
}
guard.set_batch_idx(i);
@ -170,86 +236,23 @@ int ObSelectIntoOp::inner_open()
}
}
}
if (OB_SUCC(ret) && brs->end_) {
ret = OB_ITER_END;
}
} else {
if (OB_FAIL(ObOperator::get_next_row())) {
if (OB_LIKELY(OB_ITER_END == ret)) {
} else {
LOG_WARN("get next row failed", K(ret));
}
} else {
++row_count;
if (T_INTO_VARIABLES == into_type) {
if (OB_FAIL(into_varlist())) {
LOG_WARN("into varlist failed", K(ret));
}
} else if (T_INTO_OUTFILE == into_type) {
if (OB_FAIL(into_outfile())) {
LOG_WARN("into outfile failed", K(ret));
}
} else {
if (OB_FAIL(into_dumpfile())) {
LOG_WARN("into dumpfile failed", K(ret));
}
}
}
}
if (OB_SUCC(ret) || OB_ITER_END == ret) { // if into user variables or into dumpfile, must be one row
if (is_iter_end || row_count >= top_limit_cnt_) {
stop_loop = true;
}
if (OB_SUCC(ret) || is_iter_end) { // if into user variables or into dumpfile, must be one row
if ((T_INTO_VARIABLES == into_type || T_INTO_DUMPFILE == into_type) && row_count > 1) {
ret = OB_ERR_TOO_MANY_ROWS;
LOG_WARN("more than one row for into variables or into dumpfile", K(ret), K(row_count));
}
}
} //end while
if (OB_ITER_END == ret) {
ret = OB_SUCCESS;
}
if (OB_SUCC(ret)) { // set affected rows
phy_plan_ctx->set_affected_rows(row_count);
}
return ret;
}
int ObSelectIntoOp::inner_get_next_row()
{
int ret = OB_SUCCESS;
clear_evaluated_flag();
if (OB_FAIL(child_->get_next_row())) {
if (OB_ITER_END != ret) {
LOG_WARN("get next row failed", K(ret));
} else {
// OB_ITER_END
}
}
return ret;
}
int ObSelectIntoOp::inner_get_next_batch(const int64_t max_row_cnt)
{
int ret = OB_SUCCESS;
clear_evaluated_flag();
const ObBatchRows *child_brs = NULL;
if (OB_FAIL(child_->get_next_batch(max_row_cnt, child_brs))) {
LOG_WARN("get next batch failed", K(ret));
} else {
brs_.size_ = child_brs->size_;
brs_.end_ = child_brs->end_;
if (brs_.size_ > 0) {
brs_.skip_->deep_copy(*(child_brs->skip_), brs_.size_);
const ObIArray<ObExpr*> &select_exprs = MY_SPEC.output_;
for (int64_t i = 0; i < select_exprs.count() && OB_SUCC(ret); i++) {
if (OB_FAIL(select_exprs.at(i)->eval_batch(eval_ctx_, *brs_.skip_, brs_.size_))) {
LOG_WARN("eval expr failed", K(ret));
}
}
}
}
return ret;
}
int ObSelectIntoOp::inner_rescan()
{
int ret = OB_SUCCESS;