remove unused code

This commit is contained in:
nroskill
2023-05-22 03:41:34 +00:00
committed by ob-robot
parent f8699503fa
commit b6cb96f6ce
123 changed files with 28 additions and 21432 deletions

View File

@ -1,599 +0,0 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#define USING_LOG_PREFIX SQL_ENG
#include "sql/engine/basic/ob_limit.h"
#include "lib/utility/utility.h"
#include "share/object/ob_obj_cast.h"
#include "sql/session/ob_sql_session_info.h"
#include "sql/engine/ob_physical_plan.h"
#include "sql/engine/expr/ob_sql_expression.h"
#include "sql/engine/ob_exec_context.h"
namespace oceanbase
{
using namespace common;
namespace sql
{
//REGISTER_PHY_OPERATOR(ObLimit, PHY_LIMIT);
class ObLimit::ObLimitCtx : public ObPhyOperatorCtx
{
public:
explicit ObLimitCtx(ObExecContext &ctx)
: ObPhyOperatorCtx(ctx),
limit_(-1),
offset_(0),
input_count_(0),
output_count_(0),
total_count_(0),
is_percent_first_(false),
limit_last_row_(NULL)
{
}
virtual void destroy() { ObPhyOperatorCtx::destroy_base(); }
private:
int64_t limit_;
int64_t offset_;
int64_t input_count_;
int64_t output_count_;
int64_t total_count_;
bool is_percent_first_;
ObNewRow *limit_last_row_;
friend class ObLimit;
};
ObLimit::ObLimit(ObIAllocator &alloc)
: ObSingleChildPhyOperator(alloc),
org_limit_(NULL),
org_offset_(NULL),
org_percent_(NULL),
is_calc_found_rows_(false),
is_top_limit_(false),
is_fetch_with_ties_(false),
sort_columns_(alloc)
{
}
ObLimit::~ObLimit()
{
reset();
}
void ObLimit::reset()
{
org_limit_ = NULL;
org_offset_ = NULL;
org_percent_ = NULL;
is_calc_found_rows_ = false;
is_top_limit_ = false;
is_fetch_with_ties_ = false;
sort_columns_.reset();
ObSingleChildPhyOperator::reset();
}
void ObLimit::reuse()
{
reset();
}
int ObLimit::set_limit(ObSqlExpression *limit, ObSqlExpression *offset, ObSqlExpression *percent)
{
int ret = OB_SUCCESS;
if (limit) {
org_limit_ = limit;
}
if (offset) {
org_offset_ = offset;
}
if (percent) {
org_percent_ = percent;
}
return ret;
}
int ObLimit::get_int_value(ObExecContext &ctx,
const ObSqlExpression *in_val,
int64_t &out_val,
bool &is_null_value) const
{
int ret = OB_SUCCESS;
ObNewRow input_row;
ObObj result;
ObExprCtx expr_ctx;
is_null_value = false;
if (in_val != NULL && !in_val->is_empty()) {
if (OB_FAIL(wrap_expr_ctx(ctx, expr_ctx))) {
LOG_WARN("wrap expr context failed", K(ret));
} else if (OB_FAIL(in_val->calc(expr_ctx, input_row, result))) {
LOG_WARN("Failed to calculate expression", K(ret));
} else if (result.is_int()) {
if (OB_FAIL(result.get_int(out_val))) {
LOG_WARN("get_int error", K(ret), K(result));
}
} else if (result.is_null()) {
out_val = 0;
is_null_value = true;
} else {
EXPR_DEFINE_CAST_CTX(expr_ctx, CM_NONE);
EXPR_GET_INT64_V2(result, out_val);
if (OB_FAIL(ret)) {
LOG_WARN("get_int error", K(ret), K(result));
}
}
}
return ret;
}
int ObLimit::get_double_value(ObExecContext &ctx,
const ObSqlExpression *double_val,
double &out_val) const
{
int ret = OB_SUCCESS;
ObNewRow input_row;
ObObj result;
ObExprCtx expr_ctx;
if (double_val != NULL && !double_val->is_empty()) {
if (OB_FAIL(wrap_expr_ctx(ctx, expr_ctx))) {
LOG_WARN("wrap expr context failed", K(ret));
} else if (OB_FAIL(double_val->calc(expr_ctx, input_row, result))) {
LOG_WARN("Failed to calculate expression", K(ret));
} else if (result.is_double()) {
if (OB_FAIL(result.get_double(out_val))) {
LOG_WARN("get_double error", K(ret), K(result));
}
} else if (result.is_null()) {
out_val = 0.0;
} else {
EXPR_DEFINE_CAST_CTX(expr_ctx, CM_NONE);
EXPR_GET_DOUBLE_V2(result, out_val);
if (OB_FAIL(ret)) {
LOG_WARN("get_double error", K(ret), K(result));
}
}
}
return ret;
}
int ObLimit::get_limit(ObExecContext &ctx, int64_t &limit, int64_t &offset, bool &is_percent_first) const
{
int ret = OB_SUCCESS;
double percent = 0.0;
bool is_null_value = false;
limit = -1;
offset = 0;
if (OB_FAIL(get_int_value(ctx, org_limit_, limit, is_null_value))) {
LOG_WARN("Get limit value failed", K(ret));
} else if (!is_null_value && OB_FAIL(get_int_value(ctx, org_offset_, offset, is_null_value))) {
LOG_WARN("Get offset value failed", K(ret));
} else if (is_null_value) {
offset = 0;
limit = 0;
} else {
//由于下层的block算子大多数是在inner_get_next_row才计算出总的行数,因此这里也需要这样设置
is_percent_first = (org_percent_ != NULL && !org_percent_->is_empty());
//revise limit, offset because rownum < -1 is rewritten as limit -1
//offset 2 rows fetch next -3 rows only --> is meaningless
offset = offset < 0 ? 0 : offset;
if (org_limit_ != NULL && !org_limit_->is_empty()) {//不能统一直接设置为0,因为需要支持仅仅只有offset情形
limit = limit < 0 ? 0 : limit;
}
}
return ret;
}
bool ObLimit::is_valid() const
{
return (get_column_count() > 0 && child_op_ != NULL && child_op_->get_column_count() > 0);
}
int ObLimit::inner_open(ObExecContext &ctx) const
{
int ret = OB_SUCCESS;
ObLimitCtx *limit_ctx = NULL;
if (OB_UNLIKELY(!is_valid())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("limit operator is invalid");
} else if (OB_FAIL(init_op_ctx(ctx))) {
LOG_WARN("initialize operator context failed", K(ret));
} else if (OB_ISNULL(limit_ctx = GET_PHY_OPERATOR_CTX(ObLimitCtx, ctx, get_id()))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get physical operator context failed", K_(id));
} else if (OB_FAIL(get_limit(ctx, limit_ctx->limit_, limit_ctx->offset_, limit_ctx->is_percent_first_))) {
LOG_WARN("Failed to instantiate limit/offset", K(ret));
} else { }
return ret;
}
int ObLimit::rescan(ObExecContext &ctx) const
{
int ret = OB_SUCCESS;
ObLimitCtx *limit_ctx = NULL;
if (OB_FAIL(ObSingleChildPhyOperator::rescan(ctx))) {
LOG_WARN("rescan child physical operator failed", K(ret));
} else if (OB_ISNULL(limit_ctx = GET_PHY_OPERATOR_CTX(ObLimitCtx, ctx, get_id()))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("limit_ctx is null");
} else {
limit_ctx->input_count_ = 0;
limit_ctx->output_count_ = 0;
}
return ret;
}
int ObLimit::inner_close(ObExecContext &ctx) const
{
UNUSED(ctx);
return OB_SUCCESS;
}
int ObLimit::init_op_ctx(ObExecContext &ctx) const
{
int ret = OB_SUCCESS;
ObPhyOperatorCtx *op_ctx = NULL;
if(OB_FAIL(CREATE_PHY_OPERATOR_CTX(ObLimitCtx, ctx, get_id(), get_type(), op_ctx))) {
LOG_WARN("failed to create LimitCtx", K(ret));
} else if (OB_ISNULL(op_ctx)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("op_ctx is null");
} else if (OB_FAIL(init_cur_row(*op_ctx, need_copy_row_for_compute()))) {
LOG_WARN("init current row failed", K(ret));
}
return ret;
}
int ObLimit::inner_get_next_row(ObExecContext &ctx, const ObNewRow *&row) const
{
int ret = OB_SUCCESS;
ObLimitCtx *limit_ctx = NULL;
ObSQLSessionInfo *my_session = NULL;
const ObNewRow *input_row = NULL;
if (!is_valid()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("limit operator is invalid", K(ret));
} else if (OB_ISNULL(limit_ctx = GET_PHY_OPERATOR_CTX(ObLimitCtx, ctx, get_id()))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get physical operator ctx failed");
} else if (OB_ISNULL(my_session = GET_MY_SESSION(ctx))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("fail to get my session", K(ret));
} else { }
while (OB_SUCC(ret) && limit_ctx->input_count_ < limit_ctx->offset_) {
if (OB_FAIL(child_op_->get_next_row(ctx, input_row))) {
if (OB_UNLIKELY(OB_ITER_END != ret)) {
LOG_WARN("child_op failed to get next row",
K(limit_ctx->input_count_), K(limit_ctx->offset_), K(ret));
}
} else if (limit_ctx->is_percent_first_ && OB_FAIL(convert_limit_percent(ctx, limit_ctx))) {
LOG_WARN("failed to convert limit percent", K(ret));
} else {
++limit_ctx->input_count_;
}
} // end while
/*由于支持了oracle 12c的fetch功能,因此下面的执行流程比较复杂,这里简单解释一下:
* 1.is_percent_first_:代表的fetch是否指定的百分比取行数,比如:select * from t1 fetch next 50 percent rows only;
* 取总行数的50%出来,这个时候需要is_percent_first_来表明是否用的百分比,同时我们的下层block算子(sort、hash group by等)
* 都是在get_next_row时指定设置的,因此需要在第一次时去设置对应的limit数量,同时设置完后将is_percent_first_重新设置为false;
* 2.is_fetch_with_ties_:表示在拿到所需要的limit数量时,需要继续下探是否存在按照order by排序列值相等的情形,
* 比如表t1有3行数据 c1 c2 c3
* 1 2 3
* 1 2 4
* 2 2 3
* 这个时候假如按照表t1的c1列排序,同时设置了只输出一列,但是指定了with ties(sql为:select * from t1 order by c1 fetch next 1 rows with ties);
* 那么需要将每次从child op拿取rows同时保存拿到的最后一行数据,等拿到了指定的数量之后,继续按照保存好的最后一行数据
* 下探child op的rows,直到拿到按照order by排序列的值不相等或者拿完了child op的rows为止;比如上述例子中拿到行:1 2 3
* 会继续下探行:1 2 4,发现排序列c1值相等,会继续下探拿行:2 2 3,这个时候排序列c1值不等,整个get_next_row结束。
*
*/
int64_t left_count = 0;
if (OB_SUCC(ret)) {
if (limit_ctx->is_percent_first_ || limit_ctx->output_count_ < limit_ctx->limit_ || limit_ctx->limit_ < 0) {
if (OB_FAIL(child_op_->get_next_row(ctx, input_row))) {
if (OB_ITER_END != ret) {
LOG_WARN("child_op failed to get next row",
K(ret), K_(limit_ctx->limit), K_(limit_ctx->offset),
K_(limit_ctx->input_count), K_(limit_ctx->output_count));
}
} else if (limit_ctx->is_percent_first_ && OB_FAIL(convert_limit_percent(ctx, limit_ctx))) {
LOG_WARN("failed to convert limit percent", K(ret));
} else if (limit_ctx->limit_ == 0) {
ret = OB_ITER_END;
} else {
++limit_ctx->output_count_;
row = input_row;
//如果需要支持fetch with ties功能,需要拷贝limit拿出的最后一行保存下来供后续使用
if (is_fetch_with_ties_ && limit_ctx->output_count_ == limit_ctx->limit_ &&
OB_FAIL(deep_copy_limit_last_rows(limit_ctx, *row))) {
LOG_WARN("failed to deep copy limit last rows");
} else if (OB_FAIL(copy_cur_row(*limit_ctx, row))) {
LOG_WARN("copy current row failed", K(ret));
} else {/*do nothing*/}
}
//说明需要继续判断input rows能否按照order by items等值输出
} else if (limit_ctx->limit_ > 0 && is_fetch_with_ties_) {
bool is_equal = false;
if (OB_FAIL(child_op_->get_next_row(ctx, input_row))) {
if (OB_ITER_END != ret) {
LOG_WARN("child_op failed to get next row",
K(ret), K_(limit_ctx->limit), K_(limit_ctx->offset),
K_(limit_ctx->input_count), K_(limit_ctx->output_count));
}
} else if (OB_FAIL(is_row_order_by_item_value_equal(limit_ctx, input_row, is_equal))) {
LOG_WARN("failed to is row order by item value equal", K(ret));
} else if (is_equal) {
++limit_ctx->output_count_;
row = input_row;
if (OB_FAIL(copy_cur_row(*limit_ctx, row))) {
LOG_WARN("copy current row failed", K(ret));
} else {
LOG_DEBUG("copy cur row", K(*row));
}
} else {
//溢出的按照order by排序相等的row已经找完
ret = OB_ITER_END;
}
} else {
//结果条数已经满足
ret = OB_ITER_END;
if (is_calc_found_rows_) {
const ObNewRow *tmp_row = NULL;
while (OB_SUCC(child_op_->get_next_row(ctx, tmp_row))) {
++left_count;
}
if (OB_ITER_END != ret) {
LOG_WARN("fail to get next row from child", K(ret));
}
}
}
}
if (OB_ITER_END == ret) {
if (is_top_limit_) {
limit_ctx->total_count_ = left_count + limit_ctx->output_count_ + limit_ctx->input_count_;
ObPhysicalPlanCtx *plan_ctx = NULL;
if (OB_ISNULL(plan_ctx = ctx.get_physical_plan_ctx())) {
ret = OB_ERR_NULL_VALUE;
LOG_WARN("get physical plan context failed");
} else {
NG_TRACE_EXT(found_rows, OB_ID(total_count), limit_ctx->total_count_,
OB_ID(input_count), limit_ctx->input_count_);
plan_ctx->set_found_rows(limit_ctx->total_count_);
}
}
}
return ret;
}
int ObLimit::deep_copy_limit_last_rows(ObLimitCtx *limit_ctx, const ObNewRow row) const
{
int ret = OB_SUCCESS;
if (OB_ISNULL(limit_ctx) || OB_ISNULL(limit_ctx->calc_buf_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get unexpected null", K(ret), K(limit_ctx));
} else {
const int64_t buf_len = sizeof(ObNewRow) + row.get_deep_copy_size();
int64_t pos = sizeof(ObNewRow);
char *buf = NULL;
if (OB_ISNULL(buf = static_cast<char *>(limit_ctx->calc_buf_->alloc(buf_len)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("alloc new row failed", K(ret), K(buf_len));
} else if (OB_ISNULL(limit_ctx->limit_last_row_ = new (buf) ObNewRow())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("new_row is null", K(ret), K(buf_len));
} else if (OB_FAIL(limit_ctx->limit_last_row_ ->deep_copy(row, buf, buf_len, pos))) {
LOG_WARN("deep copy row failed", K(ret), K(buf_len), K(pos));
} else {/*do nothing*/}
}
return ret;
}
int ObLimit::is_row_order_by_item_value_equal(ObLimitCtx *limit_ctx,
const ObNewRow *input_row,
bool &is_equal) const
{
int ret = OB_SUCCESS;
ObNewRow *limit_last_row = NULL;
is_equal = false;
if (OB_ISNULL(limit_ctx) || OB_ISNULL(limit_last_row = limit_ctx->limit_last_row_) ||
OB_ISNULL(input_row)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get unexpected null", K(ret), K(limit_ctx), K(input_row), K(limit_last_row));
} else {
is_equal = true;
for (int64_t i = 0; OB_SUCC(ret) && is_equal && i < sort_columns_.count(); ++i) {
if (OB_UNLIKELY(sort_columns_.at(i).index_ >= input_row->count_ ||
sort_columns_.at(i).index_ >= limit_last_row->count_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("order by projector is invalid", K(ret), K(sort_columns_.at(i).index_),
K(input_row->count_), K(limit_last_row->count_));
} else {
is_equal = 0 == input_row->cells_[sort_columns_.at(i).index_].compare(
limit_last_row->cells_[sort_columns_.at(i).index_],
sort_columns_.at(i).cs_type_);
}
}
}
return ret;
}
int64_t ObLimit::to_string_kv(char *buf, const int64_t buf_len) const
{
int64_t pos = 0;
if (org_limit_ && org_offset_) {
J_KV(N_LIMIT, org_limit_, N_OFFSET, org_offset_);
} else if (org_limit_) {
J_KV(N_LIMIT, org_limit_);
} else if (org_offset_) {
J_KV(N_OFFSET, org_offset_);
}
return pos;
}
int ObLimit::add_filter(ObSqlExpression *expr)
{
UNUSED(expr);
LOG_ERROR_RET(OB_NOT_SUPPORTED, "limit operator should have no filter expr");
return OB_NOT_SUPPORTED;
}
int ObLimit::add_sort_columns(ObSortColumn sort_column) {
return sort_columns_.push_back(sort_column);
}
//针对percent需要这里根据总行数转换为对应的limit count
int ObLimit::convert_limit_percent(ObExecContext &ctx, ObLimitCtx *limit_ctx) const
{
int ret = OB_SUCCESS;
double percent = 0.0;
if (OB_ISNULL(limit_ctx)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get unexpected null", K(ret), K(limit_ctx));
} else if (OB_FAIL(get_double_value(ctx, org_percent_, percent))) {
LOG_WARN("failed to get double value", K(ret));
} else if (percent > 0) {
int64_t tot_count = 0;
if (OB_UNLIKELY(limit_ctx->limit_ != -1) || OB_ISNULL(child_op_) ||
OB_UNLIKELY(child_op_->get_type() != PHY_MATERIAL &&
child_op_->get_type() != PHY_SORT)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get unexpected error", K(ret), K(limit_ctx->limit_), K(child_op_));
} else if (child_op_->get_type() == PHY_MATERIAL &&
OB_FAIL(static_cast<ObMaterial *>(child_op_)->get_material_row_count(ctx, tot_count))) {
LOG_WARN("failed to get op row count", K(ret));
} else if (child_op_->get_type() == PHY_SORT &&
OB_FAIL(static_cast<ObSort *>(child_op_)->get_sort_row_count(ctx, tot_count))) {
LOG_WARN("failed to get op row count", K(ret));
} else if (OB_UNLIKELY(tot_count < 0)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get invalid child op row count", K(tot_count), K(ret));
} else if (percent < 100) {
//兼容oracle,向上取整
int64_t percent_int64 = static_cast<int64_t>(percent);
int64_t offset = (tot_count * percent / 100 - tot_count * percent_int64 / 100) > 0 ? 1 : 0;
limit_ctx->limit_ = tot_count * percent_int64 / 100 + offset;
limit_ctx->is_percent_first_ = false;
} else {
limit_ctx->limit_ = tot_count;
limit_ctx->is_percent_first_ = false;
}
} else {
limit_ctx->limit_ = 0;
}
return ret;
}
OB_DEF_SERIALIZE(ObLimit)
{
int ret = OB_SUCCESS;
bool has_limit_count = (org_limit_ != NULL && !org_limit_->is_empty());
bool has_limit_offset = (org_offset_ != NULL && !org_offset_->is_empty());
bool has_limit_percent = (org_percent_ != NULL && !org_percent_->is_empty());
OB_UNIS_ENCODE(is_calc_found_rows_);
OB_UNIS_ENCODE(has_limit_count);
OB_UNIS_ENCODE(has_limit_offset);
OB_UNIS_ENCODE(is_top_limit_);
if (has_limit_count) {
OB_UNIS_ENCODE(*org_limit_);
}
if (has_limit_offset) {
OB_UNIS_ENCODE(*org_offset_);
}
if (OB_SUCC(ret)) {
if (OB_FAIL(ObSingleChildPhyOperator::serialize(buf, buf_len, pos))) {
LOG_WARN("serialize child operator failed", K(ret));
}
}
if (OB_SUCC(ret)) {
OB_UNIS_ENCODE(has_limit_percent);
OB_UNIS_ENCODE(is_fetch_with_ties_);
OB_UNIS_ENCODE(sort_columns_);
if (has_limit_percent) {
OB_UNIS_ENCODE(*org_percent_);
}
}
return ret;
}
OB_DEF_SERIALIZE_SIZE(ObLimit)
{
int64_t len = 0;
bool has_limit_count = (org_limit_ != NULL && !org_limit_->is_empty());
bool has_limit_offset = (org_offset_ != NULL && !org_offset_->is_empty());
bool has_limit_percent = (org_percent_ != NULL && !org_percent_->is_empty());
OB_UNIS_ADD_LEN(is_calc_found_rows_);
OB_UNIS_ADD_LEN(has_limit_count);
OB_UNIS_ADD_LEN(has_limit_offset);
OB_UNIS_ADD_LEN(is_top_limit_);
if (has_limit_count) {
OB_UNIS_ADD_LEN(*org_limit_);
}
if (has_limit_offset) {
OB_UNIS_ADD_LEN(*org_offset_);
}
len += ObSingleChildPhyOperator::get_serialize_size();
OB_UNIS_ADD_LEN(has_limit_percent);
OB_UNIS_ADD_LEN(is_fetch_with_ties_);
OB_UNIS_ADD_LEN(sort_columns_);
if (has_limit_percent) {
OB_UNIS_ADD_LEN(*org_percent_);
}
return len;
}
OB_DEF_DESERIALIZE(ObLimit)
{
int ret = OB_SUCCESS;
bool has_limit_count = false;
bool has_limit_offset = false;
bool has_limit_percent = false;
OB_UNIS_DECODE(is_calc_found_rows_);
OB_UNIS_DECODE(has_limit_count);
OB_UNIS_DECODE(has_limit_offset);
OB_UNIS_DECODE(is_top_limit_);
if (OB_SUCC(ret) && has_limit_count) {
if (OB_FAIL(ObSqlExpressionUtil::make_sql_expr(my_phy_plan_, org_limit_))) {
LOG_WARN("make sql expression failed", K(ret));
} else if (OB_LIKELY(org_limit_ != NULL)) {
OB_UNIS_DECODE(*org_limit_);
}
}
if (OB_SUCC(ret) && has_limit_offset) {
if (OB_FAIL(ObSqlExpressionUtil::make_sql_expr(my_phy_plan_, org_offset_))) {
LOG_WARN("make sql expression failed", K(ret));
} else if (OB_LIKELY(org_offset_ != NULL)) {
OB_UNIS_DECODE(*org_offset_);
}
}
if (OB_SUCC(ret)) {
ret = ObSingleChildPhyOperator::deserialize(buf, data_len, pos);
}
if (OB_SUCC(ret)) {
OB_UNIS_DECODE(has_limit_percent);
OB_UNIS_DECODE(is_fetch_with_ties_);
OB_UNIS_DECODE(sort_columns_);
if (OB_SUCC(ret) && has_limit_percent) {
if (OB_FAIL(ObSqlExpressionUtil::make_sql_expr(my_phy_plan_, org_percent_))) {
LOG_WARN("make sql expression failed", K(ret));
} else if (OB_LIKELY(org_percent_ != NULL)) {
OB_UNIS_DECODE(*org_percent_);
}
}
}
return ret;
}
} // namespace sql
} // namespace oceanbase

View File

@ -1,391 +0,0 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#define USING_LOG_PREFIX SQL_ENG
#include "sql/engine/basic/ob_topk.h"
#include "lib/utility/utility.h"
#include "share/object/ob_obj_cast.h"
#include "sql/session/ob_sql_session_info.h"
#include "sql/engine/ob_physical_plan.h"
#include "sql/engine/expr/ob_sql_expression.h"
#include "sql/engine/sort/ob_sort.h"
#include "sql/engine/basic/ob_material.h"
#include "sql/engine/aggregate/ob_hash_groupby.h"
#include "sql/engine/ob_exec_context.h"
namespace oceanbase
{
using namespace common;
namespace sql
{
class ObTopK::ObTopKCtx : public ObPhyOperatorCtx
{
public:
explicit ObTopKCtx(ObExecContext &ctx)
: ObPhyOperatorCtx(ctx),
topk_final_count_(-1),
output_count_(0)
{
}
virtual void destroy() { ObPhyOperatorCtx::destroy_base(); }
private:
int64_t topk_final_count_;//count of rows that need to be output upforward
int64_t output_count_;
friend class ObTopK;
};
ObTopK::ObTopK(ObIAllocator &alloc)
: ObSingleChildPhyOperator(alloc),
minimum_row_count_(-1),
topk_precision_(-1),
org_limit_(NULL),
org_offset_(NULL)
{
}
ObTopK::~ObTopK()
{
reset();
}
void ObTopK::reset()
{
org_limit_ = NULL;
org_offset_ = NULL;
minimum_row_count_ = -1;
topk_precision_ = -1;
ObSingleChildPhyOperator::reset();
}
void ObTopK::reuse()
{
reset();
}
int ObTopK::set_topk_params(ObSqlExpression *limit, ObSqlExpression *offset,
int64_t minimum_row_count, int64_t topk_precision)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(limit) || minimum_row_count < 0 || topk_precision < 0) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid arguments", KP(limit), K(minimum_row_count), K(topk_precision), K(ret));
} else {
org_limit_ = limit;
org_offset_ = offset;
minimum_row_count_ = minimum_row_count;
topk_precision_ = topk_precision;
}
return ret;
}
int ObTopK::get_int_value(ObExecContext &ctx,
const ObSqlExpression *in_val,
int64_t &out_val,
bool &is_null_value) const
{
int ret = OB_SUCCESS;
ObNewRow input_row;
ObObj result;
ObExprCtx expr_ctx;
is_null_value = false;
if (in_val != NULL && !in_val->is_empty()) {
if (OB_FAIL(wrap_expr_ctx(ctx, expr_ctx))) {
LOG_WARN("wrap expr context failed", K(ret));
} else if (OB_FAIL(in_val->calc(expr_ctx, input_row, result))) {
LOG_WARN("Failed to calculate expression", K(ret));
} else if (OB_LIKELY(result.is_int())) {
if (OB_FAIL(result.get_int(out_val))) {
LOG_WARN("get_int error", K(ret), K(result));
}
} else if (result.is_null()) {
out_val = 0;
is_null_value = true;
} else {
EXPR_DEFINE_CAST_CTX(expr_ctx, CM_NONE);
EXPR_GET_INT64_V2(result, out_val);
if (OB_FAIL(ret)) {
LOG_WARN("get_int error", K(ret), K(result));
}
}
}
return ret;
}
int ObTopK::get_topk_final_count(ObExecContext &ctx, int64_t &topk_final_count) const
{
int ret = OB_SUCCESS;
int64_t limit = -1;
int64_t offset = 0;
bool is_null_value = false;
ObPhysicalPlanCtx *plan_ctx = ctx.get_physical_plan_ctx();
if (OB_ISNULL(plan_ctx)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("invalid plan ctx is NULL", K(ret));
} else if (OB_ISNULL(child_op_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("Invalid child_op_ is NULL", K(ret));
} else if (OB_FAIL(get_int_value(ctx, org_limit_, limit, is_null_value))) {
LOG_WARN("Get limit value failed", K(ret));
} else if (!is_null_value && OB_FAIL(get_int_value(ctx, org_offset_, offset, is_null_value))) {
LOG_WARN("Get offset value failed", K(ret));
} else {
//revise limit, offset because rownum < -1 is rewritten as limit -1
limit = (is_null_value || limit < 0) ? 0 : limit;
offset = (is_null_value || offset < 0) ? 0 : offset;
topk_final_count = std::max(minimum_row_count_, limit + offset);
int64_t row_count = 0;
ObPhyOperatorType op_type = child_op_->get_type();
//TODO(yaoying.yyy): may be we should add one func to paremt class
if (PHY_SORT == op_type) {
ObSort *sort_op = static_cast<ObSort *>(child_op_);
if (OB_ISNULL(sort_op)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("casted sort_op is NULL", K(ret));
} else if (OB_FAIL(sort_op->get_sort_row_count(ctx, row_count))){
LOG_WARN("failed to get sort row count", K(ret));
} else {/*do nothing*/}
} else if (PHY_MATERIAL == op_type) {
ObMaterial *material_op = static_cast<ObMaterial *>(child_op_);
if (OB_ISNULL(material_op)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("casted material_op is NULL", K(ret));
} else if (OB_FAIL(material_op->get_material_row_count(ctx, row_count))){
LOG_WARN("failed to get material row count", K(ret));
} else {/*do nothing*/}
} else if (PHY_HASH_GROUP_BY == op_type) {
ObHashGroupBy *hash_groupby_op = static_cast<ObHashGroupBy *>(child_op_);
if (OB_ISNULL(hash_groupby_op)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("casted hash_groupby_op is NULL", K(ret));
} else if (OB_FAIL(hash_groupby_op->get_hash_groupby_row_count(ctx, row_count))){
LOG_WARN("failed to get material row count", K(ret));
} else {/*do nothing*/}
} else {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("Invalid child_op_", K(op_type), K(ret));
}
if (OB_SUCC(ret)) {
topk_final_count = std::max(topk_final_count, static_cast<int64_t>(row_count * topk_precision_ / 100));
if (topk_final_count >= row_count) {
plan_ctx->set_is_result_accurate(true);
} else {
plan_ctx->set_is_result_accurate(false);
}
}
}
return ret;
}
bool ObTopK::is_valid() const
{
return (get_column_count() > 0 && (NULL != org_limit_)
&& (NULL != child_op_) && child_op_->get_column_count() > 0);
}
int ObTopK::inner_open(ObExecContext &ctx) const
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(!is_valid())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("limit operator is invalid");
} else if (OB_FAIL(init_op_ctx(ctx))) {
LOG_WARN("initialize operator context failed", K(ret));
} else {/*do nothing*/}
return ret;
}
int ObTopK::rescan(ObExecContext &ctx) const
{
int ret = OB_SUCCESS;
ObTopKCtx *topk_ctx = NULL;
if (OB_FAIL(ObSingleChildPhyOperator::rescan(ctx))) {
LOG_WARN("rescan child physical operator failed", K(ret));
} else if (OB_ISNULL(topk_ctx = GET_PHY_OPERATOR_CTX(ObTopKCtx, ctx, get_id()))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("topk_ctx is null");
} else {
topk_ctx->output_count_ = 0;
}
return ret;
}
int ObTopK::inner_close(ObExecContext &ctx) const
{
UNUSED(ctx);
return OB_SUCCESS;
}
int ObTopK::init_op_ctx(ObExecContext &ctx) const
{
int ret = OB_SUCCESS;
ObPhyOperatorCtx *op_ctx = NULL;
if(OB_FAIL(CREATE_PHY_OPERATOR_CTX(ObTopKCtx, ctx, get_id(), get_type(), op_ctx))) {
LOG_WARN("failed to create TopKCtx", K(ret));
} else if (OB_ISNULL(op_ctx)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("op_ctx is null");
} else if (OB_FAIL(init_cur_row(*op_ctx, need_copy_row_for_compute()))) {
LOG_WARN("init current row failed", K(ret));
}
return ret;
}
int ObTopK::inner_get_next_row(ObExecContext &ctx, const ObNewRow *&row) const
{
int ret = OB_SUCCESS;
ObTopKCtx *topk_ctx = NULL;
ObSQLSessionInfo *my_session = NULL;
const ObNewRow *input_row = NULL;
if (!is_valid()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("limit operator is invalid");
} else if (OB_ISNULL(topk_ctx = GET_PHY_OPERATOR_CTX(ObTopKCtx, ctx, get_id()))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get physical operator ctx failed");
} else if (OB_ISNULL(my_session = GET_MY_SESSION(ctx))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("fail to get my session", K(ret));
} else {
if (0 == topk_ctx->output_count_ || topk_ctx->output_count_ < topk_ctx->topk_final_count_) {
if (OB_FAIL(child_op_->get_next_row(ctx, input_row))) {
if (OB_ITER_END != ret) {
LOG_WARN("child_op failed to get next row", K_(topk_ctx->topk_final_count),
K_(topk_ctx->output_count), K(ret));
}
} else {
if (0 == topk_ctx->output_count_) {
if (OB_FAIL(get_topk_final_count(ctx, topk_ctx->topk_final_count_))) {
LOG_WARN("failed to get_topk_final_count", K(ret));
} else if (OB_UNLIKELY(0 == topk_ctx->topk_final_count_)) {
//结果条数已经满足
ret = OB_ITER_END;
} else {/*do nothing*/}
}
if (OB_SUCC(ret)) {
++topk_ctx->output_count_;
row = input_row;
if (OB_FAIL(copy_cur_row(*topk_ctx, row))) {
LOG_WARN("copy current row failed", K(ret));
} else {
LOG_DEBUG("copy cur row", K(*row));
}
}
}
} else {
//结果条数已经满足
ret = OB_ITER_END;
}
}
return ret;
}
int64_t ObTopK::to_string_kv(char *buf, const int64_t buf_len) const
{
int64_t pos = 0;
//TODO(yaoying.yyy):macro define N_TOPK_PRECISION
if (org_limit_ && org_offset_) {
J_KV(N_LIMIT, org_limit_, N_OFFSET, org_offset_, "minimum_row_count", minimum_row_count_, "topk_precision", topk_precision_);
} else if (NULL != org_limit_) {
J_KV(N_LIMIT, org_limit_, "minimum_row_count", minimum_row_count_, "topk_precision", topk_precision_);
} else if (NULL != org_offset_) {
J_KV(N_OFFSET, org_offset_, "minimum_row_count", minimum_row_count_, "topk_precision", topk_precision_);
} else{/*do nothing*/}
return pos;
}
int ObTopK::add_filter(ObSqlExpression *expr)
{
UNUSED(expr);
LOG_ERROR_RET(OB_NOT_SUPPORTED, "limit operator should have no filter expr");
return OB_NOT_SUPPORTED;
}
OB_DEF_SERIALIZE(ObTopK)
{
int ret = OB_SUCCESS;
bool has_limit_count = (org_limit_ != NULL && !org_limit_->is_empty());
bool has_limit_offset = (org_offset_ != NULL && !org_offset_->is_empty());
OB_UNIS_ENCODE(minimum_row_count_);
OB_UNIS_ENCODE(topk_precision_);
OB_UNIS_ENCODE(has_limit_count);
OB_UNIS_ENCODE(has_limit_offset);
if (has_limit_count) {
OB_UNIS_ENCODE(*org_limit_);
}
if (has_limit_offset) {
OB_UNIS_ENCODE(*org_offset_);
}
if (OB_SUCC(ret)) {
if (OB_FAIL(ObSingleChildPhyOperator::serialize(buf, buf_len, pos))) {
LOG_WARN("serialize child operator failed", K(ret));
}
}
return ret;
}
OB_DEF_SERIALIZE_SIZE(ObTopK)
{
int64_t len = 0;
bool has_limit_count = (org_limit_ != NULL && !org_limit_->is_empty());
bool has_limit_offset = (org_offset_ != NULL && !org_offset_->is_empty());
OB_UNIS_ADD_LEN(minimum_row_count_);
OB_UNIS_ADD_LEN(topk_precision_);
OB_UNIS_ADD_LEN(has_limit_count);
OB_UNIS_ADD_LEN(has_limit_offset);
if (has_limit_count) {
OB_UNIS_ADD_LEN(*org_limit_);
}
if (has_limit_offset) {
OB_UNIS_ADD_LEN(*org_offset_);
}
len += ObSingleChildPhyOperator::get_serialize_size();
return len;
}
OB_DEF_DESERIALIZE(ObTopK)
{
int ret = OB_SUCCESS;
bool has_limit_count = false;
bool has_limit_offset = false;
OB_UNIS_DECODE(minimum_row_count_);
OB_UNIS_DECODE(topk_precision_);
OB_UNIS_DECODE(has_limit_count);
OB_UNIS_DECODE(has_limit_offset);
if (OB_SUCC(ret) && has_limit_count) {
if (OB_FAIL(ObSqlExpressionUtil::make_sql_expr(my_phy_plan_, org_limit_))) {
LOG_WARN("make sql expression failed", K(ret));
} else if (OB_LIKELY(org_limit_ != NULL)) {
OB_UNIS_DECODE(*org_limit_);
}
}
if (OB_SUCC(ret) && has_limit_offset) {
if (OB_FAIL(ObSqlExpressionUtil::make_sql_expr(my_phy_plan_, org_offset_))) {
LOG_WARN("make sql expression failed", K(ret));
} else if (OB_LIKELY(org_offset_ != NULL)) {
OB_UNIS_DECODE(*org_offset_);
}
}
if (OB_SUCC(ret)) {
ret = ObSingleChildPhyOperator::deserialize(buf, data_len, pos);
}
return ret;
}
} // namespace sql
} // namespace oceanbase