[FEAT MERGE] DAS iterator refactor and keep order optimization

Co-authored-by: saltonz <saltonzh@gmail.com>
Co-authored-by: zhenhan.gong@gmail.com <zhenhan.gong@gmail.com>
Co-authored-by: Tyshawn <tuyunshan@gmail.com>
This commit is contained in:
pe-99y
2024-06-24 13:57:14 +00:00
committed by ob-robot
parent 7f3ce430fb
commit 5c5e6da6ce
88 changed files with 7062 additions and 2432 deletions

View File

@ -105,6 +105,9 @@ int ObTscCgService::generate_tsc_ctdef(ObLogTableScan &op, ObTableScanCtDef &tsc
root_ctdef = &scan_ctdef;
}
}
if (OB_SUCC(ret) && op.das_need_keep_ordering()) {
tsc_ctdef.is_das_keep_order_ = true;
}
//to generate dynamic tsc partition pruning info
if (OB_SUCC(ret)) {
@ -166,7 +169,8 @@ int ObTscCgService::generate_tsc_ctdef(ObLogTableScan &op, ObTableScanCtDef &tsc
if (OB_SUCC(ret) && op.is_multivalue_index_scan()) {
ObDASIRAuxLookupCtDef *aux_lookup_ctdef = nullptr;
if (OB_FAIL(generate_doc_id_lookup_ctdef(op, tsc_ctdef, root_ctdef, aux_lookup_ctdef))) {
ObExpr *doc_id_col_expr = scan_ctdef.result_output_.at(scan_ctdef.result_output_.count() - 1);
if (OB_FAIL(generate_doc_id_lookup_ctdef(op, tsc_ctdef, root_ctdef, doc_id_col_expr, aux_lookup_ctdef))) {
LOG_WARN("failed to generate text ir ctdef", K(ret));
} else {
root_ctdef = aux_lookup_ctdef;
@ -194,14 +198,15 @@ int ObTscCgService::generate_tsc_ctdef(ObLogTableScan &op, ObTableScanCtDef &tsc
return ret;
}
int ObTscCgService::generate_table_param(const ObLogTableScan &op, ObDASScanCtDef &scan_ctdef)
int ObTscCgService::generate_table_param(const ObLogTableScan &op,
ObDASScanCtDef &scan_ctdef,
common::ObIArray<uint64_t> &tsc_out_cols)
{
int ret = OB_SUCCESS;
ObTableID index_id = scan_ctdef.ref_table_id_;
const ObTableSchema *table_schema = NULL;
const bool pd_agg = scan_ctdef.pd_expr_spec_.pd_storage_flag_.is_aggregate_pushdown();
const bool pd_group_by = scan_ctdef.pd_expr_spec_.pd_storage_flag_.is_group_by_pushdown();
ObArray<uint64_t> tsc_out_cols;
ObSqlSchemaGuard *schema_guard = cg_.opt_ctx_->get_sql_schema_guard();
CK(OB_NOT_NULL(schema_guard));
if (OB_UNLIKELY(pd_agg && 0 == scan_ctdef.aggregate_column_ids_.count()) ||
@ -242,8 +247,6 @@ int ObTscCgService::generate_table_param(const ObLogTableScan &op, ObDASScanCtDe
scan_ctdef.pd_expr_spec_.pd_storage_flag_))) {
LOG_WARN("convert group by failed", K(ret), K(*table_schema),
K(scan_ctdef.aggregate_column_ids_), K(scan_ctdef.group_by_column_ids_));
} else if (OB_FAIL(generate_das_result_output(tsc_out_cols, scan_ctdef, op.get_trans_info_expr(), pd_agg))) {
LOG_WARN("failed to init result outputs", K(ret));
}
return ret;
}
@ -297,6 +300,7 @@ int ObTscCgService::generate_das_result_output(const ObIArray<uint64_t> &output_
scan_ctdef.trans_info_expr_ = e;
}
}
return ret;
}
@ -594,15 +598,20 @@ int ObTscCgService::extract_das_access_exprs(const ObLogTableScan &op,
LOG_WARN("append the data table rowkey expr failed", K(ret), K(op.get_rowkey_exprs()));
} else if (OB_FAIL(append_array_no_dup(access_exprs, op.get_part_exprs()))) {
LOG_WARN("append the data table part expr failed", K(ret), K(op.get_part_exprs()));
} else if (NULL != op.get_group_id_expr() && op.use_batch()
&& OB_FAIL(add_var_to_array_no_dup(access_exprs,
const_cast<ObRawExpr *>(op.get_group_id_expr())))) {
LOG_WARN("fail to add group id", K(ret));
}
}
} else if (OB_FAIL(access_exprs.assign(op.get_access_exprs()))) {
LOG_WARN("assign access exprs failed", K(ret));
}
// store group_id_expr when use group id
if (OB_SUCC(ret) && op.use_group_id()) {
const ObRawExpr* group_id_expr = op.get_group_id_expr();
if (group_id_expr != nullptr &&
OB_FAIL(add_var_to_array_no_dup(access_exprs, const_cast<ObRawExpr*>(group_id_expr)))) {
LOG_WARN("failed to push back group id expr", K(ret));
}
}
if (OB_SUCC(ret) && is_oracle_mapping_real_virtual_table(op.get_ref_table_id())) {
//the access exprs are the agent virtual table columns, but das need the real table columns
//now to replace the real table column
@ -897,7 +906,7 @@ int ObTscCgService::generate_das_scan_ctdef(const ObLogTableScan &op,
}
//4. generate batch scan ctdef
if (OB_SUCC(ret) && op.use_batch()) {
if (OB_SUCC(ret) && op.use_group_id()) {
if (OB_FAIL(cg_.generate_rt_expr(*op.get_group_id_expr(), scan_ctdef.group_id_expr_))) {
LOG_WARN("generate group id expr failed", K(ret));
}
@ -911,11 +920,19 @@ int ObTscCgService::generate_das_scan_ctdef(const ObLogTableScan &op,
}
}
//6. generate table param
ObArray<uint64_t> tsc_out_cols;
if (OB_SUCC(ret)) {
if (OB_FAIL(generate_table_param(op, scan_ctdef))) {
if (OB_FAIL(generate_table_param(op, scan_ctdef, tsc_out_cols))) {
LOG_WARN("generate table param failed", K(ret));
}
}
//7. generate das result output
if (OB_SUCC(ret)) {
const bool pd_agg = scan_ctdef.pd_expr_spec_.pd_storage_flag_.is_aggregate_pushdown();
if (OB_FAIL(generate_das_result_output(tsc_out_cols, scan_ctdef, op.get_trans_info_expr(), pd_agg))) {
LOG_WARN("failed to init result outputs", K(ret));
}
}
return ret;
}
@ -945,7 +962,7 @@ int ObTscCgService::extract_das_output_column_ids(const ObLogTableScan &op,
LOG_WARN("unexpected nullptr to table schema", K(ret));
} else if (OB_FAIL(table_schema->get_rowkey_column_ids(output_cids))) {
LOG_WARN("get rowkey column ids failed", K(ret));
} else if (nullptr != op.get_group_id_expr() && op.use_batch()) {
} else if (nullptr != op.get_group_id_expr() && op.use_group_id()) {
if (OB_FAIL(output_cids.push_back(OB_HIDDEN_GROUP_IDX_COLUMN_ID))) {
LOG_WARN("store group column id failed", K(ret));
}
@ -986,6 +1003,7 @@ int ObTscCgService::extract_das_output_column_ids(const ObLogTableScan &op,
LOG_WARN("store colum id failed", K(ret));
}
}
//column expr in non-pushdown filter need to be output,
//because filter_row will use it in TSC operator
} else if (OB_FAIL(extract_tsc_access_columns(op, das_output_cols))) {
@ -1003,7 +1021,7 @@ int ObTscCgService::extract_das_output_column_ids(const ObLogTableScan &op,
LOG_WARN("store real output colum id failed", K(ret), KPC(mapping_expr));
}
}
} else if (nullptr != op.get_group_id_expr() && op.use_batch() &&
} else if (nullptr != op.get_group_id_expr() && op.use_group_id() &&
OB_FAIL(das_output_cols.push_back(const_cast<ObRawExpr*>(op.get_group_id_expr())))) {
LOG_WARN("store group id expr failed", K(ret));
} else if (OB_FAIL(extract_das_column_ids(das_output_cols, output_cids))) {
@ -1114,6 +1132,7 @@ int ObTscCgService::generate_text_ir_ctdef(const ObLogTableScan &op,
ObSqlSchemaGuard *schema_guard = cg_.opt_ctx_->get_sql_schema_guard();
ObDASIRScanCtDef *ir_scan_ctdef = nullptr;
ObDASSortCtDef *sort_ctdef = nullptr;
ObExpr *index_back_doc_id_column = nullptr;
const bool use_approx_pre_agg = true; // TODO: support differentiate use approx agg or not
if (OB_ISNULL(match_against) || OB_ISNULL(schema_guard)) {
ret = OB_ERR_UNEXPECTED;
@ -1221,6 +1240,7 @@ int ObTscCgService::generate_text_ir_ctdef(const ObLogTableScan &op,
partition_row_cnt = est_cost_info->table_meta_info_->table_row_count_ / est_cost_info->table_meta_info_->part_count_;
}
ir_scan_ctdef->estimated_total_doc_cnt_ = partition_row_cnt;
index_back_doc_id_column = ir_scan_ctdef->inv_scan_doc_id_col_;
}
}
@ -1245,7 +1265,8 @@ int ObTscCgService::generate_text_ir_ctdef(const ObLogTableScan &op,
ObDASIRAuxLookupCtDef *aux_lookup_ctdef = nullptr;
ObDASBaseCtDef *ir_output_ctdef = nullptr == sort_ctdef ?
static_cast<ObDASBaseCtDef *>(ir_scan_ctdef) : static_cast<ObDASBaseCtDef *>(sort_ctdef);
if (OB_FAIL(generate_doc_id_lookup_ctdef(op, tsc_ctdef, ir_output_ctdef, aux_lookup_ctdef))) {
if (OB_FAIL(generate_doc_id_lookup_ctdef(
op, tsc_ctdef, ir_output_ctdef, index_back_doc_id_column, aux_lookup_ctdef))) {
LOG_WARN("generate doc id lookup ctdef failed", K(ret));
} else if (OB_FAIL(append_fts_relavence_project_col(aux_lookup_ctdef, ir_scan_ctdef))) {
LOG_WARN("failed to append fts relavence project col", K(ret));
@ -1511,6 +1532,7 @@ int ObTscCgService::generate_text_ir_spec_exprs(const ObLogTableScan &op,
int ObTscCgService::generate_doc_id_lookup_ctdef(const ObLogTableScan &op,
ObTableScanCtDef &tsc_ctdef,
ObDASBaseCtDef *ir_scan_ctdef,
ObExpr *doc_id_expr,
ObDASIRAuxLookupCtDef *&aux_lookup_ctdef)
{
int ret = OB_SUCCESS;
@ -1560,6 +1582,13 @@ int ObTscCgService::generate_doc_id_lookup_ctdef(const ObLogTableScan &op,
LOG_WARN("generate das lookup scan ctdef failed", K(ret));
} else if (OB_FAIL(result_outputs.assign(scan_ctdef->result_output_))) {
LOG_WARN("construct aux lookup ctdef failed", K(ret));
} else if (OB_ISNULL(doc_id_expr) || OB_UNLIKELY(!scan_ctdef->rowkey_exprs_.empty())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected doc id expr status", K(ret), KPC(doc_id_expr), KPC(scan_ctdef));
} else if (OB_FAIL(scan_ctdef->rowkey_exprs_.reserve(1))) {
LOG_WARN("failed to reserve doc id lookup rowkey exprs", K(ret));
} else if (OB_FAIL(scan_ctdef->rowkey_exprs_.push_back(doc_id_expr))) {
LOG_WARN("failed to append doc id rowkey expr", K(ret));
} else if (OB_FAIL(generate_table_loc_meta(op.get_table_id(),
*op.get_stmt(),
*index_schema,
@ -1637,6 +1666,17 @@ int ObTscCgService::generate_table_lookup_ctdef(const ObLogTableScan &op,
LOG_WARN("fail to generate rowkey exprs", K(ret));
}
}
if (OB_SUCC(ret) && op.get_index_back()) {
ObArray<ObRawExpr*> rowkey_exprs;
if (OB_FAIL(rowkey_exprs.assign(op.get_rowkey_exprs()))) {
LOG_WARN("failed to assign rowkey exprs", K(ret));
} else if (!op.get_is_index_global() && OB_FAIL(mapping_oracle_real_agent_virtual_exprs(op, rowkey_exprs))) {
LOG_WARN("failed to mapping oracle real virtual exprs", K(ret));
} else if (OB_FAIL(cg_.generate_rt_exprs(rowkey_exprs, tsc_ctdef.lookup_ctdef_->rowkey_exprs_))) {
LOG_WARN("failed to generate main table rowkey exprs", K(ret));
}
}
}
if (OB_SUCC(ret)) {
if (OB_FAIL(ObDASTaskFactory::alloc_das_ctdef(DAS_OP_TABLE_LOOKUP, allocator, lookup_ctdef))) {
@ -1828,5 +1868,33 @@ int ObTscCgService::generate_das_sort_ctdef(
return ret;
}
int ObTscCgService::mapping_oracle_real_agent_virtual_exprs(const ObLogTableScan &op,
common::ObIArray<ObRawExpr*> &access_exprs)
{
int ret = OB_SUCCESS;
if (is_oracle_mapping_real_virtual_table(op.get_ref_table_id())) {
//the access exprs are the agent virtual table columns, but das need the real table columns
//now to replace the real table column
for (int64_t i = 0; OB_SUCC(ret) && i < access_exprs.count(); ++i) {
ObRawExpr *expr = access_exprs.at(i);
ObRawExpr *mapping_expr = nullptr;
uint64_t column_id = UINT64_MAX;
if (OB_ISNULL(expr)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected nullptr expr", K(expr), K(ret));
} else if (T_ORA_ROWSCN == expr->get_expr_type()) {
// keep orign expr as access expr
} else if (OB_ISNULL(mapping_expr = op.get_real_expr(expr))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("mapping expr is null", K(ret), KPC(expr));
} else {
//replace the agent virtual table column expr
access_exprs.at(i) = mapping_expr;
}
}
}
return ret;
}
} // namespace sql
} // namespace oceanbase

View File

@ -53,7 +53,7 @@ private:
int generate_access_ctdef(const ObLogTableScan &op, ObDASScanCtDef &scan_ctdef, bool &has_rowscn);
int generate_pushdown_aggr_ctdef(const ObLogTableScan &op, ObDASScanCtDef &scan_ctdef);
int generate_das_scan_ctdef(const ObLogTableScan &op, ObDASScanCtDef &scan_ctdef, bool &has_rowscn);
int generate_table_param(const ObLogTableScan &op, ObDASScanCtDef &scan_ctdef);
int generate_table_param(const ObLogTableScan &op, ObDASScanCtDef &scan_ctdef, common::ObIArray<uint64_t> &tsc_out_cols);
int extract_das_output_column_ids(const ObLogTableScan &op,
ObDASScanCtDef &scan_ctdef,
const ObTableSchema &index_schema,
@ -79,6 +79,7 @@ private:
int generate_doc_id_lookup_ctdef(const ObLogTableScan &op,
ObTableScanCtDef &tsc_ctdef,
ObDASBaseCtDef *ir_scan_ctdef,
ObExpr *doc_id_expr,
ObDASIRAuxLookupCtDef *&aux_lookup_ctdef);
int generate_table_lookup_ctdef(const ObLogTableScan &op,
ObTableScanCtDef &tsc_ctdef,
@ -98,6 +99,9 @@ private:
ObRawExpr *topk_offset_expr,
ObDASBaseCtDef *child_ctdef,
ObDASSortCtDef *&sort_ctdef);
int mapping_oracle_real_agent_virtual_exprs(const ObLogTableScan &op,
common::ObIArray<ObRawExpr*> &access_exprs);
private:
ObStaticEngineCG &cg_;
};