[FEAT MERGE] 4.2 add trans_debug_info for 4377

Co-authored-by: Handora <qcdsr970209@gmail.com>
This commit is contained in:
yishenglanlingzui
2023-05-06 06:08:35 +00:00
committed by ob-robot
parent 711f4d5cd0
commit 9e328378c8
63 changed files with 1013 additions and 169 deletions

View File

@ -112,10 +112,10 @@ int ObMultipleMerge::init(
STORAGE_LOG(WARN, "Failed to init datum row", K(ret));
} else if (OB_FAIL(nop_pos_.init(*context.stmt_allocator_, param.get_max_out_col_cnt()))) {
STORAGE_LOG(WARN, "Fail to init nop pos, ", K(ret));
} else if (NULL != param.op_ && (NULL == param.output_exprs_ || NULL == param.row2exprs_projector_
} else if (NULL != param.get_op() && (NULL == param.output_exprs_ || NULL == param.row2exprs_projector_
|| OB_FAIL(param.row2exprs_projector_->init(
*param.output_exprs_,
*param.op_,
*param.get_op(),
*param.iter_param_.out_cols_project_)))) {
if (OB_SUCCESS == ret) {
ret = OB_ERR_UNEXPECTED;
@ -359,6 +359,9 @@ int ObMultipleMerge::get_next_row(ObDatumRow *&row)
LOG_WARN("Failed to fill iter idx", K(ret), KPC(access_param_), K(unprojected_row_));
} else if (OB_FAIL(process_fuse_row(not_using_static_engine, unprojected_row_, row))) {
LOG_WARN("get row from fuse failed", K(ret), K(unprojected_row_));
} else if (OB_NOT_NULL(access_param_->get_op()) &&
OB_FAIL(access_param_->get_op()->write_trans_info_datum(unprojected_row_))) {
LOG_WARN("write trans_info to expr datum failed", K(ret), K(unprojected_row_));
} else if (nullptr != row) {
break;
}
@ -400,11 +403,11 @@ int ObMultipleMerge::get_next_rows(int64_t &count, int64_t capacity)
} else if (ObQRIterType::T_SINGLE_GET == get_type()) {
ObDatumRow *row = nullptr;
sql::ObEvalCtx *eval_ctx = nullptr;
if (OB_ISNULL(access_param_->op_)) {
if (OB_ISNULL(access_param_->get_op())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("Unexpected access param: null op", K(ret));
} else if (FALSE_IT(eval_ctx = &access_param_->op_->get_eval_ctx())) {
} else if (FALSE_IT(eval_ctx->reuse(min(capacity, access_param_->op_->get_batch_size())))) {
} else if (FALSE_IT(eval_ctx = &access_param_->get_op()->get_eval_ctx())) {
} else if (FALSE_IT(eval_ctx->reuse(min(capacity, access_param_->get_op()->get_batch_size())))) {
} else if (OB_FAIL(get_next_row(row))) {
if (OB_ITER_END != ret) {
LOG_WARN("failed to get single row", K(ret));
@ -430,13 +433,13 @@ int ObMultipleMerge::get_next_normal_rows(int64_t &count, int64_t capacity)
if (OB_UNLIKELY(!inited_)) {
ret = OB_NOT_INIT;
LOG_WARN("The ObMultipleMerge has not been inited, ", K(ret));
} else if (OB_UNLIKELY(nullptr == access_param_->op_
|| !access_param_->op_->is_vectorized()
} else if (OB_UNLIKELY(nullptr == access_param_->get_op()
|| !access_param_->get_op()->is_vectorized()
|| !access_param_->iter_param_.vectorized_enabled_
|| nullptr == block_row_store_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpect pushdown operator in vectorized", K(ret), K(access_param_->iter_param_.pd_storage_flag_),
K(access_param_->op_), K(access_param_->op_->is_vectorized()), K(block_row_store_),
K(access_param_->get_op()), K(access_param_->get_op()->is_vectorized()), K(block_row_store_),
K(access_param_->iter_param_.vectorized_enabled_));
}
@ -445,7 +448,7 @@ int ObMultipleMerge::get_next_normal_rows(int64_t &count, int64_t capacity)
LOG_WARN("fail to refresh table on demand", K(ret));
} else {
ObVectorStore *vector_store = reinterpret_cast<ObVectorStore *>(block_row_store_);
int64_t batch_size = min(capacity, access_param_->op_->get_batch_size());
int64_t batch_size = min(capacity, access_param_->get_op()->get_batch_size());
vector_store->reuse_capacity(batch_size);
if (need_padding_) {
padding_allocator_.reuse();
@ -500,8 +503,10 @@ int ObMultipleMerge::get_next_normal_rows(int64_t &count, int64_t capacity)
} else if (OB_FAIL(process_fuse_row(nullptr == access_param_->output_exprs_, unprojected_row_, out_row))) {
LOG_WARN("get row from fuse failed", K(ret), K(unprojected_row_));
} else if (nullptr != out_row) {
if (OB_FAIL(access_param_->op_->deep_copy(access_param_->output_exprs_, vector_store->get_row_count()))) {
if (OB_FAIL(access_param_->get_op()->deep_copy(access_param_->output_exprs_, vector_store->get_row_count()))) {
LOG_WARN("fail to deep copy row", K(ret));
} else if (OB_FAIL(access_param_->get_op()->write_trans_info_datum(unprojected_row_))) {
LOG_WARN("write trans_info to expr datum failed", K(ret), K(unprojected_row_));
} else if (OB_FAIL(vector_store->fill_row(unprojected_row_))) {
LOG_WARN("fail to aggregate row", K(ret));
}
@ -539,7 +544,7 @@ int ObMultipleMerge::get_next_aggregate_row(ObDatumRow *&row)
} else if (OB_UNLIKELY(nullptr == block_row_store_ || access_param_->iter_param_.need_fill_group_idx())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected aggregate pushdown status", K(ret),
K(access_param_->op_), K(block_row_store_), K(access_param_->iter_param_.need_fill_group_idx()));
K(access_param_->get_op()), K(block_row_store_), K(access_param_->iter_param_.need_fill_group_idx()));
} else if (OB_UNLIKELY(nullptr != access_ctx_->range_array_pos_ &&
access_ctx_->range_array_pos_->count() > 1)) {
ret = OB_ERR_UNEXPECTED;
@ -551,17 +556,17 @@ int ObMultipleMerge::get_next_aggregate_row(ObDatumRow *&row)
} else {
ObAggregatedStore *agg_row_store = reinterpret_cast<ObAggregatedStore *>(block_row_store_);
agg_row_store->reuse_aggregated_row();
if (OB_NOT_NULL(access_param_->op_)) {
int64_t batch_size = max(1, access_param_->op_->get_batch_size());
access_param_->op_->get_eval_ctx().reuse(batch_size);
if (OB_NOT_NULL(access_param_->get_op())) {
int64_t batch_size = max(1, access_param_->get_op()->get_batch_size());
access_param_->get_op()->get_eval_ctx().reuse(batch_size);
}
reuse_lob_locator();
while (OB_SUCC(ret) && !agg_row_store->is_end()) {
bool can_batch = false;
// clear evaluated flag for every row
// all rows will be touched in this loop
if (NULL != access_param_->op_) {
access_param_->op_->clear_datum_eval_flag();
if (NULL != access_param_->get_op()) {
access_param_->get_op()->clear_datum_eval_flag();
}
if (OB_FAIL(refresh_table_on_demand())) {
LOG_WARN("fail to refresh table on demand", K(ret));
@ -710,8 +715,8 @@ int ObMultipleMerge::process_fuse_row(const bool not_using_static_engine,
} else if (nullptr != access_ctx_->limit_param_
&& access_ctx_->out_cnt_ < access_ctx_->limit_param_->offset_) {
// clear evaluated flag for next row.
if (NULL != access_param_->op_) {
access_param_->op_->clear_datum_eval_flag();
if (NULL != access_param_->get_op()) {
access_param_->get_op()->clear_datum_eval_flag();
}
++access_ctx_->out_cnt_;
} else {
@ -865,8 +870,8 @@ int ObMultipleMerge::alloc_row_store(ObTableAccessContext &context, const ObTabl
LOG_WARN("fail to alloc aggregated store", K(ret));
} else {
block_row_store_ = new (buf) ObAggregatedStore(
param.iter_param_.vectorized_enabled_ ? param.op_->get_batch_size() : ObAggregatedStore::BATCH_SIZE,
param.op_->get_eval_ctx(),
param.iter_param_.vectorized_enabled_ ? param.get_op()->get_batch_size() : ObAggregatedStore::BATCH_SIZE,
param.get_op()->get_eval_ctx(),
context);
}
} else if (ObQRIterType::T_SINGLE_GET != get_type()) {
@ -876,8 +881,8 @@ int ObMultipleMerge::alloc_row_store(ObTableAccessContext &context, const ObTabl
LOG_WARN("fail to alloc vector store", K(ret));
} else {
block_row_store_ = new (buf) ObVectorStore(
param.op_->get_batch_size(),
param.op_->get_eval_ctx(),
param.get_op()->get_batch_size(),
param.get_op()->get_eval_ctx(),
context);
}
} else if (param.iter_param_.enable_pd_blockscan()) {
@ -940,9 +945,9 @@ int ObMultipleMerge::fuse_default(ObDatumRow &row)
if (!def_cell.is_nop_value()) {
sql::ObExpr *expr = access_param_->output_exprs_->at(pos);
sql::ObDatum &datum = expr->locate_datum_for_write(
access_param_->op_->get_eval_ctx());
access_param_->get_op()->get_eval_ctx());
sql::ObEvalInfo &eval_info = expr->get_eval_info(
access_param_->op_->get_eval_ctx());
access_param_->get_op()->get_eval_ctx());
if (OB_FAIL(datum.from_obj(def_cell, expr->obj_datum_map_))) {
LOG_WARN("convert obj to datum failed", K(ret));
} else if (is_lob_storage(def_cell.get_type()) &&
@ -1009,10 +1014,10 @@ int ObMultipleMerge::pad_columns(ObDatumRow &row)
// do nothing for virtual column with no data read,
// datum is filled && padded in fill_virtual_columns().
} else {
if (OB_ISNULL(access_param_->op_)) {
if (OB_ISNULL(access_param_->get_op())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("Unexpected access param: null op", K(ret));
} else if (OB_FAIL(pad_column(col_param->get_accuracy(), access_param_->op_->get_eval_ctx(), *e))) {
} else if (OB_FAIL(pad_column(col_param->get_accuracy(), access_param_->get_op()->get_eval_ctx(), *e))) {
LOG_WARN("pad column failed", K(ret), K(*col_param));
}
}
@ -1024,7 +1029,7 @@ int ObMultipleMerge::pad_columns(ObDatumRow &row)
int ObMultipleMerge::fill_virtual_columns(ObDatumRow &row)
{
int ret = OB_SUCCESS;
if (NULL != access_param_->op_) {
if (NULL != access_param_->get_op()) {
for (int64_t i = 0; OB_SUCC(ret) && i < nop_pos_.count(); i++) {
int64_t pos = 0;
if (OB_FAIL(nop_pos_.get_nop_pos(i, pos))) {
@ -1034,16 +1039,16 @@ int ObMultipleMerge::fill_virtual_columns(ObDatumRow &row)
// table scan access exprs is column reference expr, only virtual column has argument.
if (expr->arg_cnt_ > 0) {
ObDatum *datum = NULL;
access_param_->op_->clear_datum_eval_flag();
if (OB_FAIL(expr->eval(access_param_->op_->get_eval_ctx(), datum))) {
access_param_->get_op()->clear_datum_eval_flag();
if (OB_FAIL(expr->eval(access_param_->get_op()->get_eval_ctx(), datum))) {
LOG_WARN("evaluate virtual column failed", K(ret));
} else if (need_padding_ && expr->obj_meta_.is_fixed_len_char_type()) {
const int64_t col_idx = access_param_->iter_param_.out_cols_project_->at(pos);
if (OB_ISNULL(access_param_->op_)) {
if (OB_ISNULL(access_param_->get_op())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("Unexpected access param: null op", K(ret));
} else if (OB_FAIL(pad_column(access_param_->iter_param_.get_col_params()->at(col_idx)->get_accuracy(),
access_param_->op_->get_eval_ctx(),
access_param_->get_op()->get_eval_ctx(),
*expr))) {
LOG_WARN("pad column failed", K(ret));
}
@ -1069,7 +1074,7 @@ int ObMultipleMerge::check_filtered(const ObDatumRow &row, bool &filtered)
&& !access_param_->op_filters_->empty()) {
// Execute filter in sql static typing engine.
// %row is already projected to output expressions for main table scan.
if (OB_FAIL(access_param_->op_->filter_row_outside(*access_param_->op_filters_, filtered))) {
if (OB_FAIL(access_param_->get_op()->filter_row_outside(*access_param_->op_filters_, filtered))) {
LOG_WARN("filter row failed", K(ret));
}
}