Copy datums for dynamic const exprs in remote stage

This commit is contained in:
obdev 2023-05-10 04:52:14 +00:00 committed by ob-robot
parent dc8b7ebdb7
commit 25b2740486
4 changed files with 62 additions and 18 deletions

View File

@ -2951,7 +2951,15 @@ int ObStaticEngineCG::generate_spec(ObLogExchange &op, ObDirectReceiveSpec &spec
UNUSED(op);
UNUSED(spec);
UNUSED(in_root_job);
return OB_SUCCESS;
int ret = OB_SUCCESS;
ObSEArray<ObExpr *, 2> dynamic_consts;
for (int64_t i = 0; OB_SUCC(ret) && i < spec.output_.count(); i++) {
if (spec.output_.at(i)->is_dynamic_const_ && !spec.output_.at(i)->is_static_const_) {
OZ(dynamic_consts.push_back(spec.output_.at(i)));
}
}
OZ(spec.dynamic_const_exprs_.assign(dynamic_consts));
return ret;
}
int ObStaticEngineCG::generate_spec(ObLogExchange &op, ObPxMSReceiveSpec &spec, const bool in_root_job)

View File

@ -319,8 +319,12 @@ int ObReceiveRowReader::to_expr(const ObChunkDatumStore::StoredRow *srow,
ret = OB_ERR_UNEXPECTED;
} else {
for (uint32_t i = 0; i < srow->cnt_; ++i) {
exprs.at(i)->locate_expr_datum(eval_ctx) = srow->cells()[i];
exprs.at(i)->set_evaluated_projected(eval_ctx);
if (exprs.at(i)->is_static_const_) {
continue;
} else {
exprs.at(i)->locate_expr_datum(eval_ctx) = srow->cells()[i];
exprs.at(i)->set_evaluated_projected(eval_ctx);
}
}
// deep copy dynamic const expr datum
if (dynamic_const_exprs.count() > 0) {
@ -377,19 +381,23 @@ int ObReceiveRowReader::attach_rows(const common::ObIArray<ObExpr*> &exprs,
LOG_WARN("invalid argument", K(ret));
} else {
for (int64_t col_idx = 0; col_idx < exprs.count(); col_idx++) {
ObExpr *e = exprs.at(col_idx);
ObDatum *datums = e->locate_batch_datums(eval_ctx);
if (!e->is_batch_result()) {
datums[0] = srows[0]->cells()[col_idx];
if (exprs.at(col_idx)->is_static_const_) {
continue;
} else {
for (int64_t i = 0; i < read_rows; i++) {
datums[i] = srows[i]->cells()[col_idx];
ObExpr *e = exprs.at(col_idx);
ObDatum *datums = e->locate_batch_datums(eval_ctx);
if (!e->is_batch_result()) {
datums[0] = srows[0]->cells()[col_idx];
} else {
for (int64_t i = 0; i < read_rows; i++) {
datums[i] = srows[i]->cells()[col_idx];
}
}
e->set_evaluated_projected(eval_ctx);
ObEvalInfo &info = e->get_eval_info(eval_ctx);
info.notnull_ = false;
info.point_to_frame_ = false;
}
e->set_evaluated_projected(eval_ctx);
ObEvalInfo &info = e->get_eval_info(eval_ctx);
info.notnull_ = false;
info.point_to_frame_ = false;
}
// deep copy dynamic const expr datum
if (OB_SUCC(ret) && dynamic_const_exprs.count() > 0 && read_rows > 0) {

View File

@ -26,7 +26,7 @@ namespace oceanbase
{
namespace sql
{
OB_SERIALIZE_MEMBER((ObDirectReceiveSpec, ObOpSpec));
OB_SERIALIZE_MEMBER((ObDirectReceiveSpec, ObOpSpec), dynamic_const_exprs_);
ObDirectReceiveOp::ObDirectReceiveOp(ObExecContext &exec_ctx,
const ObOpSpec &spec,
@ -318,8 +318,24 @@ int ObDirectReceiveOp::get_next_row_from_cur_scanner()
ret = OB_ERR_UNEXPECTED;
} else {
for (uint32_t i = 0; i < tmp_sr->cnt_; ++i) {
MY_SPEC.output_.at(i)->locate_expr_datum(eval_ctx_) = tmp_sr->cells()[i];
MY_SPEC.output_.at(i)->set_evaluated_projected(eval_ctx_);
if (MY_SPEC.output_.at(i)->is_static_const_) {
continue;
} else {
MY_SPEC.output_.at(i)->locate_expr_datum(eval_ctx_) = tmp_sr->cells()[i];
MY_SPEC.output_.at(i)->set_evaluated_projected(eval_ctx_);
}
}
// deep copy dynamic const expr datum
clear_dynamic_const_parent_flag();
if (MY_SPEC.dynamic_const_exprs_.count() > 0) {
for (int64_t i = 0; OB_SUCC(ret) && i < MY_SPEC.dynamic_const_exprs_.count(); i++) {
ObExpr *expr = MY_SPEC.dynamic_const_exprs_.at(i);
if (0 == expr->res_buf_off_) {
// for compat 4.0, do nothing
} else if (OB_FAIL(expr->deep_copy_self_datum(eval_ctx_))) {
LOG_WARN("fail to deep copy datum", K(ret), K(eval_ctx_), K(*expr));
}
}
}
LOG_DEBUG("direct receive next row", "row", ROWEXPR2STR(eval_ctx_, MY_SPEC.output_));
}

View File

@ -26,9 +26,9 @@ class ObDirectReceiveSpec : public ObReceiveSpec
OB_UNIS_VERSION_V(1);
public:
ObDirectReceiveSpec(common::ObIAllocator &alloc, const ObPhyOperatorType type)
: ObReceiveSpec(alloc, type)
: ObReceiveSpec(alloc, type), dynamic_const_exprs_(alloc)
{}
ExprFixedArray dynamic_const_exprs_; // const expr which contain dynamic param
virtual ~ObDirectReceiveSpec() {};
};
@ -49,6 +49,18 @@ private:
int setup_next_scanner();
int get_next_row_from_cur_scanner();
int update_user_var();
// clear dynamic const expr parent evaluate flag, because when dynmaic param datum
// changed, if we don't clear dynamic const expr parent expr evaluate flag, the
// parent expr datum ptr may point to the last dynamic param datum memory which
// is invalid now
OB_INLINE void clear_dynamic_const_parent_flag()
{
const ObDirectReceiveSpec &spec = static_cast<const ObDirectReceiveSpec &>(spec_);
for (int64_t i = 0; i < spec.dynamic_const_exprs_.count(); i++) {
ObDynamicParamSetter::clear_parent_evaluated_flag(
eval_ctx_, *spec.dynamic_const_exprs_.at(i));
}
}
private:
common::ObScanner *scanner_;
ObChunkDatumStore::Iterator scanner_iter_;