Files
oceanbase/src/sql/engine/subquery/ob_unpivot_op.cpp
2023-01-12 19:02:33 +08:00

291 lines
11 KiB
C++

/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#define USING_LOG_PREFIX SQL_ENG
#include "sql/engine/subquery/ob_unpivot_op.h"
#include "sql/engine/ob_exec_context.h"
namespace oceanbase
{
namespace sql
{
ObUnpivotSpec::ObUnpivotSpec(ObIAllocator &alloc, const ObPhyOperatorType type)
: ObOpSpec(alloc, type), max_part_count_(-1), unpivot_info_(false, 0, 0, 0) {}
OB_SERIALIZE_MEMBER((ObUnpivotSpec, ObOpSpec), max_part_count_, unpivot_info_);
ObUnpivotOp::ObUnpivotOp(ObExecContext &exec_ctx, const ObOpSpec &spec, ObOpInput *input)
: ObOperator(exec_ctx, spec, input), curr_part_idx_(-1), curr_cols_idx_(-1),
child_brs_(NULL), multiplex_(NULL) {}
int ObUnpivotOp::inner_open()
{
int ret = OB_SUCCESS;
if (OB_FAIL(ObOperator::inner_open())) {
LOG_WARN("failed to open in base class", K(ret));
} else if (MY_SPEC.is_vectorized()) {
int64_t size = MY_SPEC.max_batch_size_ * MY_SPEC.get_output_count() * sizeof(ObDatum);
multiplex_ = static_cast<ObDatum *>(ctx_.get_allocator().alloc(size));
if (size > 0 && OB_ISNULL(multiplex_)) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("allocate memory failed", K(ret), K(size), K(multiplex_));
}
}
return ret;
}
int ObUnpivotOp::inner_close()
{
return ObOperator::inner_close();
}
int ObUnpivotOp::inner_rescan()
{
reset();
return ObOperator::inner_rescan();
}
int ObUnpivotOp::inner_get_next_row()
{
int ret = OB_SUCCESS;
bool got_next_row = false;
clear_evaluated_flag();
if (curr_part_idx_ < 0 || curr_part_idx_ >= MY_SPEC.max_part_count_) {
if (OB_FAIL(child_->get_next_row())) {
if (OB_ITER_END != ret) {
LOG_WARN("get next row from child operator failed", K(ret));
}
} else {
curr_part_idx_ = 0;
got_next_row = true;
}
} else {
// do nothing
}
if (OB_SUCC(ret)) {
LOG_DEBUG("arrive unpivot", K(ret), K(child_->get_spec().output_), K(MY_SPEC.output_),
K(curr_part_idx_), K(MY_SPEC.max_part_count_), K(MY_SPEC.unpivot_info_));
}
if (OB_SUCC(ret) && !MY_SPEC.unpivot_info_.is_include_null_) {
bool need_try_next_part = true;
while (need_try_next_part && OB_SUCC(ret)) {
if (curr_part_idx_ >= MY_SPEC.max_part_count_) {
if (got_next_row) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("has already get next row once, maybe filter does not work", K(ret),
K(child_->get_spec().output_));
} else if (OB_FAIL(child_->get_next_row())) {
if (OB_ITER_END != ret) {
LOG_WARN("get next row from child operator failed", K(ret));
}
} else {
curr_part_idx_ = 0;
got_next_row = true;
}
}
int64_t null_count = 0;
const int64_t base_idx = curr_part_idx_ * MY_SPEC.unpivot_info_.get_new_column_count()
+ MY_SPEC.unpivot_info_.old_column_count_
+ MY_SPEC.unpivot_info_.for_column_count_;
for (int64_t i = 0; OB_SUCC(ret) && i < MY_SPEC.unpivot_info_.unpivot_column_count_; ++i) {
const int64_t input_idx = i + base_idx;
ObDatum *datum = NULL;
ObExpr *expr = child_->get_spec().output_[input_idx];
if (OB_FAIL(expr->eval(eval_ctx_, datum))) {
LOG_WARN("expr evaluate failed", K(ret), K(expr));
} else if (datum->is_null()) {
++null_count;
}
}
if (OB_SUCC(ret)) {
if (null_count == MY_SPEC.unpivot_info_.unpivot_column_count_) {
LOG_DEBUG("is null, try next row", K(ret), K(curr_part_idx_),
K(MY_SPEC.unpivot_info_), K(child_->get_spec().output_));
++curr_part_idx_;
} else {
need_try_next_part = false;
}
}
} // end of while
} // end of exclude null
if (OB_SUCC(ret)) {
// old_column is the same as child, pass
int64_t output_idx = MY_SPEC.unpivot_info_.old_column_count_;
const int64_t base_idx = curr_part_idx_ * MY_SPEC.unpivot_info_.get_new_column_count();
for (; OB_SUCC(ret) && output_idx < MY_SPEC.get_output_count(); ++output_idx) {
const int64_t input_idx = output_idx + base_idx;
ObDatum *datum = NULL;
ObExpr *expr = child_->get_spec().output_[input_idx];
if (OB_FAIL(expr->eval(eval_ctx_, datum))) {
LOG_WARN("expr evaluate failed", K(ret), K(expr));
} else {
MY_SPEC.output_[output_idx]->locate_expr_datum(eval_ctx_) = *datum;
MY_SPEC.output_[output_idx]->set_evaluated_projected(eval_ctx_);
}
}
}
if (OB_SUCC(ret)) {
LOG_DEBUG("output next row", K(curr_part_idx_), K(MY_SPEC.unpivot_info_),
K(child_->get_spec().output_), K(MY_SPEC.output_));
++curr_part_idx_;
}
return ret;
}
int ObUnpivotOp::inner_get_next_batch(const int64_t max_row_cnt)
{
int ret = OB_SUCCESS;
const ObBatchRows *child_brs = nullptr;
int64_t batch_size = common::min(max_row_cnt, MY_SPEC.max_batch_size_);
clear_evaluated_flag();
if (curr_part_idx_ < 0 || curr_part_idx_ >= MY_SPEC.max_part_count_) {
const ObBatchRows *child_brs = nullptr;
if (OB_FAIL(child_->get_next_batch(batch_size, child_brs))) {
LOG_WARN("get next batch from child operator failed", K(ret));
} else if (child_brs->end_ && 0 == child_brs->size_) {
ret = OB_ITER_END;
} else {
brs_.size_ = child_brs->size_;
child_brs_ = child_brs;
curr_part_idx_ = 0;
curr_cols_idx_ = 0;
int64_t pos = 0;
MEMSET(multiplex_, 0, MY_SPEC.max_batch_size_ * MY_SPEC.get_output_count() * sizeof(ObDatum));
for (int64_t cols = 0; OB_SUCC(ret) && cols < MY_SPEC.get_output_count(); ++cols) {
ObExpr *child_expr = child_->get_spec().output_[cols];
ObDatum *child_datum = child_expr->locate_batch_datums(eval_ctx_);
if (child_expr->is_batch_result()) {
MEMCPY(multiplex_ + pos, child_datum, child_brs->size_ * sizeof(ObDatum));
pos += child_brs->size_;
} else {
for (int64_t rows = 0; rows < child_brs->size_; ++rows) {
MEMCPY(multiplex_ + pos++, child_datum, sizeof(ObDatum));
}
}
}
}
} else {
brs_.size_ = child_brs_->size_;
}
if (OB_SUCC(ret)) {
LOG_DEBUG("arrive unpivot batch", K(ret), K(child_->get_spec().output_), K(MY_SPEC.output_),
K(curr_part_idx_), K(MY_SPEC.max_part_count_), K(MY_SPEC.unpivot_info_));
}
for (int64_t read_piece = 0; OB_SUCC(ret) && read_piece < brs_.size_; ++read_piece) {
int64_t read_cur_row = (curr_part_idx_ * brs_.size_ + read_piece) / MY_SPEC.max_part_count_;
// check for_column whether is all null
if (OB_SUCC(ret) && !MY_SPEC.unpivot_info_.is_include_null_) {
int64_t null_count = 0;
const int64_t base_idx = curr_cols_idx_ * MY_SPEC.unpivot_info_.get_new_column_count()
+ MY_SPEC.unpivot_info_.old_column_count_
+ MY_SPEC.unpivot_info_.for_column_count_;
for (int64_t i = 0; OB_SUCC(ret) && i < MY_SPEC.unpivot_info_.unpivot_column_count_; ++i) {
const int64_t input_idx = i + base_idx;
if (0 == curr_cols_idx_) {
if (multiplex_[input_idx * brs_.size_ + read_cur_row].is_null()) {
++null_count;
}
} else {
ObExpr *child_expr = child_->get_spec().output_[input_idx];
ObDatum &child_datum = child_expr->locate_expr_datum(eval_ctx_, read_cur_row);
if (child_datum.is_null()) {
++null_count;
}
}
}
if (OB_SUCC(ret) && null_count == MY_SPEC.unpivot_info_.unpivot_column_count_) {
brs_.skip_->set(read_piece);
curr_cols_idx_ = (curr_cols_idx_ + 1) % MY_SPEC.max_part_count_;
continue;
}
}
// old_column
int64_t output_idx = 0;
for (; OB_SUCC(ret) && output_idx < MY_SPEC.unpivot_info_.old_column_count_; ++output_idx) {
ObExpr *father_expr = MY_SPEC.output_[output_idx];
ObDatum *father_datum = father_expr->locate_batch_datums(eval_ctx_);
father_datum[read_piece] = multiplex_[output_idx * brs_.size_ + read_cur_row];
}
// new_column
if (0 == curr_cols_idx_) {
for (; OB_SUCC(ret) && output_idx < MY_SPEC.get_output_count(); ++output_idx) {
ObExpr *father_expr = MY_SPEC.output_[output_idx];
ObDatum *father_datum = father_expr->locate_batch_datums(eval_ctx_);
if (father_expr->is_batch_result()) {
father_datum[read_piece] = multiplex_[output_idx * brs_.size_ + read_cur_row];
} else {
father_datum[0] = multiplex_[output_idx * brs_.size_ + read_cur_row];
}
}
} else {
const int64_t base_idx = curr_cols_idx_ * MY_SPEC.unpivot_info_.get_new_column_count();
for (; OB_SUCC(ret) && output_idx < MY_SPEC.get_output_count(); ++output_idx) {
const int64_t input_idx = output_idx + base_idx;
ObExpr *child_expr = child_->get_spec().output_[input_idx];
ObExpr *father_expr = MY_SPEC.output_[output_idx];
ObDatum *child_datum = child_expr->locate_batch_datums(eval_ctx_);
ObDatum *father_datum = father_expr->locate_batch_datums(eval_ctx_);
if (OB_UNLIKELY(!father_expr->is_batch_result())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("output of unpivot should be batch result", K(ret), KPC(father_expr));
} else if (child_expr->is_batch_result()) {
father_datum[read_piece] = child_datum[read_cur_row];
} else {
father_datum[read_piece] = *child_datum;
}
}
}
curr_cols_idx_ = (curr_cols_idx_ + 1) % MY_SPEC.max_part_count_;
}
if (OB_SUCC(ret)) {
++curr_part_idx_;
if (curr_part_idx_ == MY_SPEC.max_part_count_) {
for (int64_t output_idx = 0; output_idx < MY_SPEC.get_output_count(); ++output_idx) {
ObExpr *father_expr = MY_SPEC.output_[output_idx];
father_expr->set_evaluated_projected(eval_ctx_);
}
}
} else if (OB_ITER_END == ret) {
ret = OB_SUCCESS;
brs_.end_ = true;
brs_.size_ = 0;
}
return ret;
}
void ObUnpivotOp::destroy()
{
ObOperator::destroy();
}
void ObUnpivotOp::reset()
{
curr_part_idx_ = -1;
curr_cols_idx_ = -1;
child_brs_ = NULL;
memset(multiplex_, 0, MY_SPEC.max_batch_size_ * MY_SPEC.get_output_count() * sizeof(ObDatum));
}
} // end namespace sql
} // end namespace oceanbase