replace ts related to ddl with scn.

This commit is contained in:
obdev
2022-11-28 02:21:13 +00:00
committed by ob-robot
parent bbec6aff49
commit 8a4d14122f
539 changed files with 17685 additions and 173434 deletions

View File

@ -295,87 +295,91 @@ int ObOpSpec::create_operator(ObExecContext &exec_ctx, ObOperator *&op) const
int ObOpSpec::create_operator_recursive(ObExecContext &exec_ctx, ObOperator *&op) const
{
int ret = OB_SUCCESS;
ObOperatorKit *kit = exec_ctx.get_operator_kit(id_);
int64_t create_child_cnt = child_cnt_;
if (OB_ISNULL(kit) || (child_cnt_ > 0 && NULL == children_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("operator kit should be created before create operator "
"and children must be valid",
K(ret), K(id_), KP(kit), KP(children_), K(create_child_cnt), K(type_));
if (OB_FAIL(check_stack_overflow())) {
LOG_WARN("failed to check stack overflow", K(ret));
} else {
kit->spec_ = this;
LOG_TRACE("trace create spec", K(ret), K(id_), K(type_));
for (int64_t i = 0; OB_SUCC(ret) && i < child_cnt_; i++) {
if (NULL == children_[i]) {
// 这里如果有child但为nullptr,说明是receive算子
if (!IS_PX_RECEIVE(type_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("only receive is leaf in px", K(ret), K(type_), K(id_));
} else {
create_child_cnt = 0;
}
} else if (IS_TRANSMIT(children_[i]->type_)) {
OB_ASSERT(1 == child_cnt_);
// only create context for current DFO
create_child_cnt = 0;
}
}
}
// create operator input
if (OB_SUCC(ret)) {
// Operator input may created in scheduler, no need to create again.
if (NULL == kit->input_ && ObOperatorFactory::has_op_input(type_)) {
if (OB_FAIL(ObOperatorFactory::alloc_op_input(
exec_ctx.get_allocator(), exec_ctx, *this, kit->input_))) {
LOG_WARN("create operator input failed", K(ret), K(*this));
} else if (OB_ISNULL(kit->input_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("NULL input returned", K(ret));
} else {
LOG_TRACE("trace create input", K(ret), K(id_), K(type_));
}
}
}
// create operator
if (OB_SUCC(ret)) {
if (OB_FAIL(ObOperatorFactory::alloc_operator(
exec_ctx.get_allocator(), exec_ctx, *this,
kit->input_, create_child_cnt, kit->op_))
|| OB_ISNULL(kit->op_)) {
ret = OB_SUCCESS == ret ? OB_ERR_UNEXPECTED : ret;
LOG_WARN("create operator failed", K(ret), KP(kit->op_), K(*this));
ObOperatorKit *kit = exec_ctx.get_operator_kit(id_);
int64_t create_child_cnt = child_cnt_;
if (OB_ISNULL(kit) || (child_cnt_ > 0 && NULL == children_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("operator kit should be created before create operator "
"and children must be valid",
K(ret), K(id_), KP(kit), KP(children_), K(create_child_cnt), K(type_));
} else {
op = kit->op_;
op->get_monitor_info().set_operator_id(id_);
op->get_monitor_info().set_operator_type(type_);
op->get_monitor_info().set_plan_depth(plan_depth_);
op->get_monitor_info().set_tenant_id(GET_MY_SESSION(exec_ctx)->get_effective_tenant_id());
op->get_monitor_info().open_time_ = oceanbase::common::ObClockGenerator::getClock();
}
}
// create child operator
if (OB_SUCC(ret)) {
if (create_child_cnt > 0) {
for (int64_t i = 0; OB_SUCC(ret) && i < create_child_cnt; i++) {
ObOperator *child_op = NULL;
if (nullptr == children_[i]) {
kit->spec_ = this;
LOG_TRACE("trace create spec", K(ret), K(id_), K(type_));
for (int64_t i = 0; OB_SUCC(ret) && i < child_cnt_; i++) {
if (NULL == children_[i]) {
// 这里如果有child但为nullptr,说明是receive算子
if (!IS_PX_RECEIVE(type_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("only receive is leaf in px", K(ret), K(type_), K(id_));
} else {
create_child_cnt = 0;
}
} else if (OB_FAIL(children_[i]->create_operator_recursive(exec_ctx, child_op))) {
LOG_WARN("create operator failed", K(ret));
} else if (OB_FAIL(op->set_child(i, child_op))) {
LOG_WARN("set child operator failed", K(ret));
} else if (IS_TRANSMIT(children_[i]->type_)) {
OB_ASSERT(1 == child_cnt_);
// only create context for current DFO
create_child_cnt = 0;
}
}
} else if (child_cnt_ > 0) {
if (OB_FAIL(assign_spec_ptr_recursive(exec_ctx))) {
LOG_WARN("assign spec ptr failed", K(ret));
}
// create operator input
if (OB_SUCC(ret)) {
// Operator input may created in scheduler, no need to create again.
if (NULL == kit->input_ && ObOperatorFactory::has_op_input(type_)) {
if (OB_FAIL(ObOperatorFactory::alloc_op_input(
exec_ctx.get_allocator(), exec_ctx, *this, kit->input_))) {
LOG_WARN("create operator input failed", K(ret), K(*this));
} else if (OB_ISNULL(kit->input_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("NULL input returned", K(ret));
} else {
LOG_TRACE("trace create input", K(ret), K(id_), K(type_));
}
}
}
// create operator
if (OB_SUCC(ret)) {
if (OB_FAIL(ObOperatorFactory::alloc_operator(
exec_ctx.get_allocator(), exec_ctx, *this,
kit->input_, create_child_cnt, kit->op_))
|| OB_ISNULL(kit->op_)) {
ret = OB_SUCCESS == ret ? OB_ERR_UNEXPECTED : ret;
LOG_WARN("create operator failed", K(ret), KP(kit->op_), K(*this));
} else {
op = kit->op_;
op->get_monitor_info().set_operator_id(id_);
op->get_monitor_info().set_operator_type(type_);
op->get_monitor_info().set_plan_depth(plan_depth_);
op->get_monitor_info().set_tenant_id(GET_MY_SESSION(exec_ctx)->get_effective_tenant_id());
op->get_monitor_info().open_time_ = oceanbase::common::ObClockGenerator::getClock();
}
}
// create child operator
if (OB_SUCC(ret)) {
if (create_child_cnt > 0) {
for (int64_t i = 0; OB_SUCC(ret) && i < create_child_cnt; i++) {
ObOperator *child_op = NULL;
if (nullptr == children_[i]) {
// 这里如果有child但为nullptr,说明是receive算子
if (!IS_PX_RECEIVE(type_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("only receive is leaf in px", K(ret), K(type_), K(id_));
}
} else if (OB_FAIL(children_[i]->create_operator_recursive(exec_ctx, child_op))) {
LOG_WARN("create operator failed", K(ret));
} else if (OB_FAIL(op->set_child(i, child_op))) {
LOG_WARN("set child operator failed", K(ret));
}
}
} else if (child_cnt_ > 0) {
if (OB_FAIL(assign_spec_ptr_recursive(exec_ctx))) {
LOG_WARN("assign spec ptr failed", K(ret));
}
}
}
}
@ -476,7 +480,8 @@ ObOperator::ObOperator(ObExecContext &exec_ctx, const ObOpSpec &spec, ObOpInput
cpu_begin_time_(0),
batch_reach_end_(false),
row_reach_end_(false),
output_batches_b4_rescan_(0)
output_batches_b4_rescan_(0),
check_stack_overflow_(false)
{
eval_ctx_.max_batch_size_ = spec.max_batch_size_;
eval_ctx_.batch_size_ = spec.max_batch_size_;
@ -534,94 +539,111 @@ int ObOperator::init()
return OB_SUCCESS;
}
int ObOperator::check_stack_once()
{
int ret = OB_SUCCESS;
if (check_stack_overflow_) {
} else if (OB_FAIL(common::check_stack_overflow())) {
LOG_WARN("failed to check stack overflow", K(ret));
} else {
check_stack_overflow_ = true;
}
return ret;
}
// copy from ob_phy_operator.cpp
int ObOperator::open()
{
int ret = OB_SUCCESS;
OperatorOpenOrder open_order = get_operator_open_order();
if (!spec_.is_vectorized()) {
/*
for non-vectorized operator, need set batch size 1;
if (OB_FAIL(check_stack_overflow())) {
LOG_WARN("failed to check stack overflow", K(ret));
} else {
OperatorOpenOrder open_order = get_operator_open_order();
if (!spec_.is_vectorized()) {
/*
for non-vectorized operator, need set batch size 1;
case: vectorize.select_basic_vec_oracle
OceanBase(TEST@TEST)>explain insert into dlt6 select * from dlt4\G
*************************** 1. row ***************************
Query Plan: =======================================
|ID|OPERATOR |NAME |EST. ROWS|COST|
---------------------------------------
|0 |INSERT | |7 |97 |
|1 | SUBPLAN SCAN|VIEW1|7 |46 |
|2 | TABLE SCAN |DLT4 |7 |46 |
=======================================
when vectorizition enable, batch_result_ of column_conv expr in insert_op is true,
if batch size of insert eval_ctx is 0, when column_conv eval, reset_ptr will do nothing,
caused datum ptr is null, and core when execution;
*/
eval_ctx_.set_batch_size(1);
eval_ctx_.set_batch_idx(0);
}
if (ctx_.get_my_session()->is_user_session()) {
IGNORE_RETURN try_register_rt_monitor_node(0);
}
while (OB_SUCC(ret) && open_order != OPEN_EXIT) {
switch (open_order) {
case OPEN_CHILDREN_FIRST:
case OPEN_CHILDREN_LATER: {
for (int64_t i = 0; OB_SUCC(ret) && i < child_cnt_; ++i) {
// children_ pointer is checked before operator open, no need check again.
if (OB_FAIL(children_[i]->open())) {
case: vectorize.select_basic_vec_oracle
OceanBase(TEST@TEST)>explain insert into dlt6 select * from dlt4\G
*************************** 1. row ***************************
Query Plan: =======================================
|ID|OPERATOR |NAME |EST. ROWS|COST|
---------------------------------------
|0 |INSERT | |7 |97 |
|1 | SUBPLAN SCAN|VIEW1|7 |46 |
|2 | TABLE SCAN |DLT4 |7 |46 |
=======================================
when vectorizition enable, batch_result_ of column_conv expr in insert_op is true,
if batch size of insert eval_ctx is 0, when column_conv eval, reset_ptr will do nothing,
caused datum ptr is null, and core when execution;
*/
eval_ctx_.set_batch_size(1);
eval_ctx_.set_batch_idx(0);
}
if (ctx_.get_my_session()->is_user_session()) {
IGNORE_RETURN try_register_rt_monitor_node(0);
}
while (OB_SUCC(ret) && open_order != OPEN_EXIT) {
switch (open_order) {
case OPEN_CHILDREN_FIRST:
case OPEN_CHILDREN_LATER: {
for (int64_t i = 0; OB_SUCC(ret) && i < child_cnt_; ++i) {
// children_ pointer is checked before operator open, no need check again.
if (OB_FAIL(children_[i]->open())) {
if (OB_TRY_LOCK_ROW_CONFLICT != ret && OB_TRANSACTION_SET_VIOLATION != ret) {
LOG_WARN("Open child operator failed", K(ret), "op_type", op_name());
}
}
}
open_order = (OPEN_CHILDREN_FIRST == open_order) ? OPEN_SELF_LATER : OPEN_EXIT;
break;
}
case OPEN_SELF_FIRST:
case OPEN_SELF_LATER:
case OPEN_SELF_ONLY: {
if (OB_FAIL(init_evaluated_flags())) {
LOG_WARN("init evaluate flags failed", K(ret));
} else if (OB_FAIL(init_skip_vector())) {
LOG_WARN("init skip vector failed", K(ret));
} else if (OB_FAIL(inner_open())) {
if (OB_TRY_LOCK_ROW_CONFLICT != ret && OB_TRANSACTION_SET_VIOLATION != ret) {
LOG_WARN("Open child operator failed", K(ret), "op_type", op_name());
LOG_WARN("Open this operator failed", K(ret), "op_type", op_name());
}
}
open_order = (OPEN_SELF_FIRST == open_order) ? OPEN_CHILDREN_LATER : OPEN_EXIT;
break;
}
case OPEN_EXIT: {
open_order = OPEN_EXIT;
break;
}
default:
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected open order type", K(open_order));
break;
}
}
if (OB_SUCC(ret)) {
opened_ = true;
clear_batch_end_flag();
if (spec_.need_check_output_datum_) {
void * ptr = ctx_.get_allocator().alloc(sizeof(ObBatchResultHolder));
if (OB_ISNULL(ptr)) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("allocation failed for brs_checker_", K(ret), K(sizeof(ObBatchResultHolder)));
} else {
// replace new the object
brs_checker_ = new(ptr) ObBatchResultHolder();
if (OB_FAIL(brs_checker_->init(spec_.output_, eval_ctx_))) {
LOG_WARN("brs_holder init failed", K(ret));
}
}
}
open_order = (OPEN_CHILDREN_FIRST == open_order) ? OPEN_SELF_LATER : OPEN_EXIT;
break;
}
case OPEN_SELF_FIRST:
case OPEN_SELF_LATER:
case OPEN_SELF_ONLY: {
if (OB_FAIL(init_evaluated_flags())) {
LOG_WARN("init evaluate flags failed", K(ret));
} else if (OB_FAIL(init_skip_vector())) {
LOG_WARN("init skip vector failed", K(ret));
} else if (OB_FAIL(inner_open())) {
if (OB_TRY_LOCK_ROW_CONFLICT != ret && OB_TRANSACTION_SET_VIOLATION != ret) {
LOG_WARN("Open this operator failed", K(ret), "op_type", op_name());
}
}
open_order = (OPEN_SELF_FIRST == open_order) ? OPEN_CHILDREN_LATER : OPEN_EXIT;
break;
}
case OPEN_EXIT: {
open_order = OPEN_EXIT;
break;
}
default:
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected open order type", K(open_order));
break;
}
}
if (OB_SUCC(ret)) {
opened_ = true;
clear_batch_end_flag();
if (spec_.need_check_output_datum_) {
void * ptr = ctx_.get_allocator().alloc(sizeof(ObBatchResultHolder));
if (OB_ISNULL(ptr)) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("allocation failed for brs_checker_", K(ret), K(sizeof(ObBatchResultHolder)));
} else {
// replace new the object
brs_checker_ = new(ptr) ObBatchResultHolder();
if (OB_FAIL(brs_checker_->init(spec_.output_, eval_ctx_))) {
LOG_WARN("brs_holder init failed", K(ret));
}
}
}
LOG_DEBUG("open op", K(ret), "op_type", op_name(), "op_id", spec_.id_, K(open_order));
}
LOG_DEBUG("open op", K(ret), "op_type", op_name(), "op_id", spec_.id_, K(open_order));
return ret;
}
@ -820,26 +842,30 @@ int ObOperator::close()
{
int ret = OB_SUCCESS;
int child_ret = OB_SUCCESS;
OperatorOpenOrder open_order = get_operator_open_order();
if (OPEN_SELF_ONLY != open_order) {
//first call close of children
for (int64_t i = 0; i < child_cnt_; ++i) {
// children_ pointer is checked before operator open, no need check again.
int tmp_ret = children_[i]->close();
if (OB_SUCCESS != tmp_ret) {
ret = OB_SUCCESS == ret ? tmp_ret : ret;
LOG_WARN("Close child operator failed", K(child_ret), "op_type", op_name());
if (OB_FAIL(check_stack_overflow())) {
LOG_WARN("failed to check stack overflow", K(ret));
} else {
OperatorOpenOrder open_order = get_operator_open_order();
if (OPEN_SELF_ONLY != open_order) {
//first call close of children
for (int64_t i = 0; i < child_cnt_; ++i) {
// children_ pointer is checked before operator open, no need check again.
int tmp_ret = children_[i]->close();
if (OB_SUCCESS != tmp_ret) {
ret = OB_SUCCESS == ret ? tmp_ret : ret;
LOG_WARN("Close child operator failed", K(child_ret), "op_type", op_name());
}
}
}
}
// no matter what, must call operator's close function
int tmp_ret = inner_close();
if (OB_SUCCESS != tmp_ret) {
ret = tmp_ret; // overwrite child's error code.
LOG_WARN("Close this operator failed", K(ret), "op_type", op_name());
// no matter what, must call operator's close function
int tmp_ret = inner_close();
if (OB_SUCCESS != tmp_ret) {
ret = tmp_ret; // overwrite child's error code.
LOG_WARN("Close this operator failed", K(ret), "op_type", op_name());
}
IGNORE_RETURN submit_op_monitor_node();
}
IGNORE_RETURN submit_op_monitor_node();
return ret;
}
@ -871,207 +897,215 @@ int ObOperator::submit_op_monitor_node()
int ObOperator::get_next_row()
{
int ret = OB_SUCCESS;
begin_cpu_time_counting();
if (ctx_.get_my_session()->is_user_session()) {
IGNORE_RETURN try_register_rt_monitor_node(1);
}
if (row_reach_end_) {
ret = OB_ITER_END;
} else if (OB_UNLIKELY(get_spec().is_vectorized())) {
// Operator itself supports vectorization, while parent operator does NOT.
// Use vectorize method to get next row.
if (OB_FAIL(get_next_row_vectorizely())) {
// do nothing
}
if (OB_FAIL(check_stack_once())) {
LOG_WARN("too deep recusive", K(ret));
} else {
if (OB_UNLIKELY(!startup_passed_)) {
bool filtered = false;
if (OB_FAIL(startup_filter(filtered))) {
LOG_WARN("do startup filter failed", K(ret), "op", op_name());
} else {
if (filtered) {
ret = OB_ITER_END;
} else {
startup_passed_ = true;
}
}
begin_cpu_time_counting();
if (ctx_.get_my_session()->is_user_session()) {
IGNORE_RETURN try_register_rt_monitor_node(1);
}
while (OB_SUCC(ret)) {
if (OB_FAIL(inner_get_next_row())) {
if (OB_ITER_END != ret) {
LOG_WARN("inner get next row failed", K(ret), "type", spec_.type_, "op", op_name(),
"op_id", spec_.id_);
}
} else {
if (!spec_.filters_.empty()) {
bool filtered = false;
if (OB_FAIL(filter_row(filtered))) {
LOG_WARN("filter row failed", K(ret), "type", spec_.type_, "op", op_name());
if (row_reach_end_) {
ret = OB_ITER_END;
} else if (OB_UNLIKELY(get_spec().is_vectorized())) {
// Operator itself supports vectorization, while parent operator does NOT.
// Use vectorize method to get next row.
if (OB_FAIL(get_next_row_vectorizely())) {
// do nothing
}
} else {
if (OB_UNLIKELY(!startup_passed_)) {
bool filtered = false;
if (OB_FAIL(startup_filter(filtered))) {
LOG_WARN("do startup filter failed", K(ret), "op", op_name());
} else {
if (filtered) {
ret = OB_ITER_END;
} else {
if (filtered) {
continue;
}
startup_passed_ = true;
}
}
}
break;
}
if (OB_SUCCESS == ret) {
op_monitor_info_.output_row_count_++;
if (!got_first_row_) {
op_monitor_info_.first_row_time_ = oceanbase::common::ObClockGenerator::getClock();
got_first_row_ = true;
}
} else if (OB_ITER_END == ret) {
row_reach_end_ = true;
int tmp_ret = drain_exch();
if (OB_SUCCESS != tmp_ret) {
LOG_WARN("drain exchange data failed", K(tmp_ret));
}
if (got_first_row_) {
op_monitor_info_.last_row_time_ = oceanbase::common::ObClockGenerator::getClock();
}
}
}
if (OB_ITER_END == ret) {
row_reach_end_ = true;
} else if (OB_SUCC(ret)) {
if (spec_.get_phy_plan()->is_vectorized()) {
ObDatum *res = NULL;
FOREACH_CNT_X(e, spec_.output_, OB_SUCC(ret)) {
if (OB_FAIL((*e)->eval(eval_ctx_, res))) {
LOG_WARN("expr evaluate failed", K(ret), KPC(*e), K_(eval_ctx));
while (OB_SUCC(ret)) {
if (OB_FAIL(inner_get_next_row())) {
if (OB_ITER_END != ret) {
LOG_WARN("inner get next row failed", K(ret), "type", spec_.type_, "op", op_name(),
"op_id", spec_.id_);
}
} else {
(*e)->get_eval_info(eval_ctx_).projected_ = true;
if (!spec_.filters_.empty()) {
bool filtered = false;
if (OB_FAIL(filter_row(filtered))) {
LOG_WARN("filter row failed", K(ret), "type", spec_.type_, "op", op_name());
} else {
if (filtered) {
continue;
}
}
}
}
break;
}
if (OB_SUCCESS == ret) {
op_monitor_info_.output_row_count_++;
if (!got_first_row_) {
op_monitor_info_.first_row_time_ = oceanbase::common::ObClockGenerator::getClock();
got_first_row_ = true;
}
} else if (OB_ITER_END == ret) {
row_reach_end_ = true;
int tmp_ret = drain_exch();
if (OB_SUCCESS != tmp_ret) {
LOG_WARN("drain exchange data failed", K(tmp_ret));
}
if (got_first_row_) {
op_monitor_info_.last_row_time_ = oceanbase::common::ObClockGenerator::getClock();
}
}
}
if (OB_ITER_END == ret) {
row_reach_end_ = true;
} else if (OB_SUCC(ret)) {
if (spec_.get_phy_plan()->is_vectorized()) {
ObDatum *res = NULL;
FOREACH_CNT_X(e, spec_.output_, OB_SUCC(ret)) {
if (OB_FAIL((*e)->eval(eval_ctx_, res))) {
LOG_WARN("expr evaluate failed", K(ret), KPC(*e), K_(eval_ctx));
} else {
(*e)->get_eval_info(eval_ctx_).projected_ = true;
}
}
}
}
end_cpu_time_counting();
}
end_cpu_time_counting();
return ret;
}
int ObOperator::get_next_batch(const int64_t max_row_cnt, const ObBatchRows *&batch_rows)
{
int ret = OB_SUCCESS;
begin_cpu_time_counting();
if (OB_UNLIKELY(spec_.need_check_output_datum_ && brs_checker_)) {
if (OB_FAIL(brs_checker_->check_datum_modified())) {
LOG_WARN("check output datum failed", K(ret), "id", spec_.get_id(), "op_name", op_name());
}
}
reset_batchrows();
int64_t skipped_rows_count = 0;
batch_rows = &brs_;
if (batch_reach_end_) {
// generated all data at last batch, just drain exch in this iterate
brs_.end_ = true;
brs_.size_ = 0;
}
if (OB_FAIL(ret)) {
// do nothing
} else if (OB_LIKELY(get_spec().is_vectorized())) {
if (OB_UNLIKELY(!startup_passed_) && !brs_.end_) {
ObEvalCtx::BatchInfoScopeGuard guard(eval_ctx_);
guard.set_batch_size(1);
guard.set_batch_idx(0);
bool filtered = false;
// TODO bin.lb: vectorized startup filter? no ObExpr::eval() in vectorization?
if (OB_FAIL(startup_filter(filtered))) {
LOG_WARN("do startup filter failed", K(ret), K_(eval_ctx), "op", op_name());
} else {
if (filtered) {
brs_.end_ = true;
} else {
startup_passed_ = true;
}
}
}
while (OB_SUCC(ret) && !brs_.end_) {
if (OB_FAIL(inner_get_next_batch(max_row_cnt))) {
LOG_WARN("get next batch failed", K(ret), K_(eval_ctx), "id", spec_.get_id(), "op_name", op_name());
} else {
LOG_DEBUG("inner get next batch", "id", spec_.get_id(), "op_name", op_name(), K(brs_));
}
if (OB_SUCC(ret)) {
// FIXME bin.lb: accumulate bit count is CPU consuming, disable in perf mode?
skipped_rows_count = brs_.skip_->accumulate_bit_cnt(brs_.size_);
if (OB_UNLIKELY(brs_.size_ == skipped_rows_count)) {
reset_batchrows();
continue;
}
}
if (OB_SUCC(ret) && ctx_.get_my_session()->is_user_session()) {
IGNORE_RETURN try_register_rt_monitor_node(brs_.size_);
}
if (OB_FAIL(ret)) {
} else if (OB_FAIL(try_check_status_by_rows(brs_.size_))) {
LOG_WARN("check status failed", K(ret));
} else if (!spec_.filters_.empty()) {
bool all_filtered = false;
if (OB_FAIL(filter_batch_rows(spec_.filters_,
*brs_.skip_,
brs_.size_,
all_filtered))) {
LOG_WARN("filter batch rows failed", K(ret), K_(eval_ctx));
} else if (all_filtered) {
brs_.skip_->reset(brs_.size_);
brs_.size_ = 0;
// keep brs_.end_ unchanged.
continue;
}
}
break;
}
// do project
if (OB_SUCC(ret) && brs_.size_ > 0) {
FOREACH_CNT_X(e, spec_.output_, OB_SUCC(ret)) {
if (OB_FAIL((*e)->eval_batch(eval_ctx_, *brs_.skip_, brs_.size_))) {
LOG_WARN("expr evaluate failed", K(ret), KPC(*e), K_(eval_ctx));
} else {
(*e)->get_eval_info(eval_ctx_).projected_ = true;
}
}
}
if (OB_SUCC(ret)) {
if (brs_.end_ && brs_.size_ >= 0) {
batch_reach_end_ = true; // prepare to be end
// if no data in batch, end iterate immediately, otherwise wait for next iterate
brs_.end_ = !brs_.size_;
}
op_monitor_info_.output_row_count_ += brs_.size_ - skipped_rows_count;
op_monitor_info_.skipped_rows_count_ += skipped_rows_count; // for batch
++op_monitor_info_.output_batches_; // for batch
if (!got_first_row_ && !brs_.end_) {
op_monitor_info_.first_row_time_ = ObClockGenerator::getClock();;
got_first_row_ = true;
}
if (brs_.end_) {
int tmp_ret = drain_exch();
if (OB_SUCCESS != tmp_ret) {
LOG_WARN("drain exchange data failed", K(tmp_ret));
}
op_monitor_info_.last_row_time_ = ObClockGenerator::getClock();
}
}
if (OB_FAIL(check_stack_once())) {
LOG_WARN("too deep recusive", K(ret));
} else {
// Operator does NOT support vectorization, while its parent does. Return
// the batch with only 1 row
if (OB_FAIL(get_next_batch_with_onlyone_row())) {
begin_cpu_time_counting();
if (OB_UNLIKELY(spec_.need_check_output_datum_ && brs_checker_)) {
if (OB_FAIL(brs_checker_->check_datum_modified())) {
LOG_WARN("check output datum failed", K(ret), "id", spec_.get_id(), "op_name", op_name());
}
}
reset_batchrows();
int64_t skipped_rows_count = 0;
batch_rows = &brs_;
if (batch_reach_end_) {
// generated all data at last batch, just drain exch in this iterate
brs_.end_ = true;
brs_.size_ = 0;
}
if (OB_FAIL(ret)) {
// do nothing
} else if (OB_LIKELY(get_spec().is_vectorized())) {
if (OB_UNLIKELY(!startup_passed_) && !brs_.end_) {
ObEvalCtx::BatchInfoScopeGuard guard(eval_ctx_);
guard.set_batch_size(1);
guard.set_batch_idx(0);
bool filtered = false;
// TODO bin.lb: vectorized startup filter? no ObExpr::eval() in vectorization?
if (OB_FAIL(startup_filter(filtered))) {
LOG_WARN("do startup filter failed", K(ret), K_(eval_ctx), "op", op_name());
} else {
if (filtered) {
brs_.end_ = true;
} else {
startup_passed_ = true;
}
}
}
while (OB_SUCC(ret) && !brs_.end_) {
if (OB_FAIL(inner_get_next_batch(max_row_cnt))) {
LOG_WARN("get next batch failed", K(ret), K_(eval_ctx), "id", spec_.get_id(), "op_name", op_name());
} else {
LOG_DEBUG("inner get next batch", "id", spec_.get_id(), "op_name", op_name(), K(brs_));
}
if (OB_SUCC(ret)) {
// FIXME bin.lb: accumulate bit count is CPU consuming, disable in perf mode?
skipped_rows_count = brs_.skip_->accumulate_bit_cnt(brs_.size_);
if (OB_UNLIKELY(brs_.size_ == skipped_rows_count)) {
reset_batchrows();
continue;
}
}
if (OB_SUCC(ret) && ctx_.get_my_session()->is_user_session()) {
IGNORE_RETURN try_register_rt_monitor_node(brs_.size_);
}
if (OB_FAIL(ret)) {
} else if (OB_FAIL(try_check_status_by_rows(brs_.size_))) {
LOG_WARN("check status failed", K(ret));
} else if (!spec_.filters_.empty()) {
bool all_filtered = false;
if (OB_FAIL(filter_batch_rows(spec_.filters_,
*brs_.skip_,
brs_.size_,
all_filtered))) {
LOG_WARN("filter batch rows failed", K(ret), K_(eval_ctx));
} else if (all_filtered) {
brs_.skip_->reset(brs_.size_);
brs_.size_ = 0;
// keep brs_.end_ unchanged.
continue;
}
}
break;
}
// do project
if (OB_SUCC(ret) && brs_.size_ > 0) {
FOREACH_CNT_X(e, spec_.output_, OB_SUCC(ret)) {
if (OB_FAIL((*e)->eval_batch(eval_ctx_, *brs_.skip_, brs_.size_))) {
LOG_WARN("expr evaluate failed", K(ret), KPC(*e), K_(eval_ctx));
} else {
(*e)->get_eval_info(eval_ctx_).projected_ = true;
}
}
}
if (OB_SUCC(ret)) {
if (brs_.end_ && brs_.size_ >= 0) {
batch_reach_end_ = true; // prepare to be end
// if no data in batch, end iterate immediately, otherwise wait for next iterate
brs_.end_ = !brs_.size_;
}
op_monitor_info_.output_row_count_ += brs_.size_ - skipped_rows_count;
op_monitor_info_.skipped_rows_count_ += skipped_rows_count; // for batch
++op_monitor_info_.output_batches_; // for batch
if (!got_first_row_ && !brs_.end_) {
op_monitor_info_.first_row_time_ = ObClockGenerator::getClock();;
got_first_row_ = true;
}
if (brs_.end_) {
int tmp_ret = drain_exch();
if (OB_SUCCESS != tmp_ret) {
LOG_WARN("drain exchange data failed", K(tmp_ret));
}
op_monitor_info_.last_row_time_ = ObClockGenerator::getClock();
}
}
} else {
// Operator does NOT support vectorization, while its parent does. Return
// the batch with only 1 row
if (OB_FAIL(get_next_batch_with_onlyone_row())) {
// do nothing
}
}
}
if (OB_SUCC(ret)) {
if (OB_UNLIKELY(spec_.need_check_output_datum_) && brs_checker_ && !brs_.end_ && brs_.size_ > 0) {
OZ(brs_checker_->save(brs_.size_));
if (OB_SUCC(ret)) {
if (OB_UNLIKELY(spec_.need_check_output_datum_) && brs_checker_ && !brs_.end_ && brs_.size_ > 0) {
OZ(brs_checker_->save(brs_.size_));
}
LOG_DEBUG("get next batch", "id", spec_.get_id(), "op_name", op_name(), K(brs_));
}
LOG_DEBUG("get next batch", "id", spec_.get_id(), "op_name", op_name(), K(brs_));
end_cpu_time_counting();
}
end_cpu_time_counting();
return ret;
}