dml adapt with minmal

This commit is contained in:
yishenglanlingzui 2023-12-19 13:13:24 +00:00 committed by ob-robot
parent 8a90731161
commit 513f1ed008
3 changed files with 705 additions and 67 deletions

View File

@ -1007,6 +1007,7 @@ int ObDmlCgService::generate_scan_ctdef(ObLogInsert &op,
return ret;
}
int ObDmlCgService::generate_dml_column_ids(const ObLogicalOperator &op,
const ObIArray<ObColumnRefRawExpr*> &columns_exprs,
ObIArray<uint64_t> &column_ids)
@ -1179,9 +1180,535 @@ int ObDmlCgService::add_geo_col_projector(const ObIArray<ExprType*> &cur_row,
return ret;
}
int ObDmlCgService::append_all_pk_column_id(ObSchemaGetterGuard *schema_guard,
const ObTableSchema *table_schema,
ObIArray<uint64_t> &minimal_column_ids)
{
int ret = OB_SUCCESS;
const ObRowkeyInfo &rowkey_info = table_schema->get_rowkey_info();
for (int64_t i = 0; OB_SUCC(ret) && i < rowkey_info.get_size(); ++i) {
const ObRowkeyColumn *rowkey_column = rowkey_info.get_column(i);
if (OB_FAIL(add_var_to_array_no_dup(minimal_column_ids, rowkey_column->column_id_))) {
LOG_WARN("store column id failed", K(ret), K(rowkey_column->column_id_));
}
}
return ret;
}
int ObDmlCgService::append_heap_table_part_id(const ObTableSchema *table_schema,
ObIArray<uint64_t> &part_key_ids)
{
int ret = OB_SUCCESS;
const ObPartitionKeyInfo &partition_keys = table_schema->get_partition_key_info();
const ObPartitionKeyInfo &subpartition_keys = table_schema->get_subpartition_key_info();
if (partition_keys.is_valid() && OB_FAIL(partition_keys.get_column_ids(part_key_ids))) {
LOG_WARN("fail to get column ids from partition keys", K(ret));
} else if (subpartition_keys.is_valid() && OB_FAIL(subpartition_keys.get_column_ids(part_key_ids))) {
LOG_WARN("fail to get column ids from subpartition keys", K(ret));
}
return ret;
}
int ObDmlCgService::append_heap_table_part_key_dependcy_column(const ObTableSchema *table_schema,
ObIArray<uint64_t> &minimal_column_ids)
{
int ret = OB_SUCCESS;
ObSEArray<uint64_t, 4> part_key_column_ids;
if (OB_FAIL(append_heap_table_part_id(table_schema, part_key_column_ids))) {
LOG_WARN("fail to append heap table part_id", K(ret));
} else {
for (int64_t i = 0; OB_SUCC(ret) && i < part_key_column_ids.count(); i++) {
const ObColumnSchemaV2 *column_schema = nullptr;
if (OB_ISNULL(column_schema = table_schema->get_column_schema(part_key_column_ids.at(i)))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected null ptr", K(ret), K(part_key_column_ids.at(i)));
} else if (column_schema->is_generated_column()) {
ObArray<uint64_t> cascaded_columns;
if (OB_FAIL(column_schema->get_cascaded_column_ids(cascaded_columns))) {
LOG_WARN("get cascaded column ids failed", K(ret));
} else if (OB_FAIL(append_array_no_dup(minimal_column_ids, cascaded_columns))) {
LOG_WARN("fail to append cascaded_columns to minimal_column_ids", K(cascaded_columns), K(minimal_column_ids));
} else if (column_schema->is_stored_generated_column()) {
if (OB_FAIL(add_var_to_array_no_dup(minimal_column_ids, part_key_column_ids.at(i)))) {
LOG_WARN("fail to add part_key_id to minimal_column_ids", K(ret));
}
}
} else if (OB_FAIL(add_var_to_array_no_dup(minimal_column_ids, part_key_column_ids.at(i)))) {
LOG_WARN("fail to add part_key_id to minimal_column_ids", K(ret));
}
}
}
return ret;
}
int ObDmlCgService::check_unique_key_is_updated(ObSchemaGetterGuard *schema_guard,
const ObTableSchema *table_schema,
const IndexDMLInfo &index_dml_info,
bool &is_updated)
{
int ret = OB_SUCCESS;
is_updated = false;
const ObAssignments &assignments = index_dml_info.assignments_;
for (int64_t i = 0; OB_SUCC(ret) && !is_updated && i < assignments.count(); i++) {
// We cannot use col_expr->is_unique_key_column_ here, which may be false for a unique index column.
ObColumnRefRawExpr *col_expr = assignments.at(i).column_expr_;
bool is_unique_col = false;
if (OB_ISNULL(col_expr)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get unexpected null", K(ret));
} else if (OB_FAIL(table_schema->is_real_unique_index_column(*schema_guard,
col_expr->get_column_id(),
is_unique_col))) {
LOG_WARN("is_unique_key_column failed", K(ret));
} else if (is_unique_col) {
is_updated = true;
}
}
return ret;
}
int ObDmlCgService::heap_table_has_not_null_uk(ObSchemaGetterGuard *schema_guard,
const ObTableSchema *table_schema,
bool &need_all_columns)
{
int ret = OB_SUCCESS;
need_all_columns = false;
bool has_not_null_uk = false;
if (table_schema->is_heap_table()) {
if (OB_FAIL(table_schema->has_not_null_unique_key(*schema_guard, has_not_null_uk))) {
LOG_WARN("fail to check has not null UK", K(ret));
} else if (!has_not_null_uk) {
LOG_TRACE("this table don't has not null UK", K(table_schema->get_table_id()));
need_all_columns = true;
}
}
return ret;
}
int ObDmlCgService::append_time_type_column_id(const ObTableSchema *table_schema,
ObIArray<uint64_t> &minimal_column_ids)
{
int ret = OB_SUCCESS;
ObTableSchema::const_column_iterator iter = table_schema->column_begin();
for (; OB_SUCC(ret) && iter != table_schema->column_end(); ++iter) {
const ObColumnSchemaV2 *column = *iter;
if (OB_ISNULL(column)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("invalid column schema", K(column));
} else if (column->get_meta_type().is_datetime() ||
column->get_meta_type().is_timestamp() ||
column->get_meta_type().is_time() ||
column->get_meta_type().is_date() ||
column->get_meta_type().is_otimestamp_type()) {
// date/datatime and time/timestamp column need to be added to old_row
if (OB_FAIL(add_var_to_array_no_dup(minimal_column_ids, column->get_column_id()))) {
LOG_WARN("add time type column_id failed", K(ret), K(column->get_column_id()));
}
}
}
return ret;
}
int ObDmlCgService::append_udt_hidden_column_id(const ObTableSchema *table_schema,
const uint64_t column_id,
const uint64_t udt_set_id,
ObIArray<uint64_t> &minimal_column_ids)
{
int ret = OB_SUCCESS;
ObSEArray<ObColumnSchemaV2 *, 1> hidden_cols;
if (OB_FAIL(table_schema->get_column_schema_in_same_col_group(column_id,
udt_set_id,
hidden_cols))) {
LOG_WARN("failed to get column schema", K(ret));
} else {
for (int j = 0; OB_SUCC(ret) && j < hidden_cols.count(); j++) {
if (OB_FAIL(add_var_to_array_no_dup(minimal_column_ids, hidden_cols.at(j)->get_column_id()))) {
LOG_WARN("add column id failed", K(ret), K(minimal_column_ids), K(hidden_cols.at(j)->get_column_id()));
}
}
}
return ret;
}
int ObDmlCgService::check_has_upd_rowkey(ObLogicalOperator &op,
const ObTableSchema *table_schema,
const IndexDMLInfo &index_dml_info,
bool &upd_rowkey)
{
int ret = OB_SUCCESS;
upd_rowkey = false;
ObSEArray<uint64_t, 8> pk_ids;
const ObDMLStmt *stmt = op.get_stmt();
const ObAssignments &assignment = index_dml_info.assignments_;
if (OB_ISNULL(stmt)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected nullptr", K(ret), K(op));
} else if (OB_FAIL(table_schema->get_rowkey_info().get_column_ids(pk_ids))) {
LOG_WARN("failed to get rowkey column ids", K(ret));
}
for (int64_t i = 0; !upd_rowkey && OB_SUCC(ret) && i < assignment.count(); ++i) {
ColumnItem *column_item = nullptr;
const ObColumnRefRawExpr *column_expr = assignment.at(i).column_expr_;
if (OB_ISNULL(column_expr)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get null column expr", K(ret));
} else if (OB_ISNULL(column_item = stmt->get_column_item_by_id(column_expr->get_table_id(),
column_expr->get_column_id()))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get null column item", K(ret), KPC(column_expr));
} else if (has_exist_in_array(pk_ids, column_item->base_cid_)) {
upd_rowkey = true;
}
}
return ret;
}
int ObDmlCgService::append_udt_hidden_col_id(ObLogicalOperator &op,
const ObTableSchema *table_schema,
const IndexDMLInfo &index_dml_info,
ObIArray<uint64_t> &minimal_column_ids)
{
int ret = OB_SUCCESS;
const ObDMLStmt *stmt = op.get_stmt();
const ObAssignments &assignment = index_dml_info.assignments_;
if (OB_ISNULL(stmt)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected nullptr", K(ret), K(op));
}
for (int64_t i = 0; OB_SUCC(ret) && i < assignment.count(); ++i) {
ColumnItem *column_item = nullptr;
const ObColumnRefRawExpr *column_expr = assignment.at(i).column_expr_;
if (OB_ISNULL(column_expr)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get null column expr", K(ret));
} else if (OB_ISNULL(column_item = stmt->get_column_item_by_id(column_expr->get_table_id(),
column_expr->get_column_id()))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get null column item", K(ret), KPC(column_expr));
} else if (column_expr->get_result_type().is_user_defined_sql_type()) {
// udt column is updated, then must record the hidden column
if (OB_FAIL(append_udt_hidden_column_id(table_schema,
column_item->base_cid_,
column_expr->get_udt_set_id(),
minimal_column_ids))) {
LOG_WARN("append udt hidden column id failed", K(ret),
K(column_item->base_cid_), K(column_expr->get_udt_set_id()), K(minimal_column_ids));
}
}
}
return ret;
}
int ObDmlCgService::append_upd_assignment_column_id(const ObTableSchema *table_schema,
ObDASUpdCtDef &das_upd_ctdef,
ObIArray<uint64_t> &minimal_column_ids)
{
int ret = OB_SUCCESS;
if (OB_FAIL(append_array_no_dup(minimal_column_ids, das_upd_ctdef.updated_column_ids_))) {
LOG_WARN("fail to append update_column_id",
K(ret), K(minimal_column_ids), K(das_upd_ctdef.updated_column_ids_));
}
return ret;
}
int ObDmlCgService::is_table_has_unique_key(ObSchemaGetterGuard *schema_guard,
const ObTableSchema *table_schema,
bool &is_has_uk)
{
int ret = OB_SUCCESS;
is_has_uk = false;
ObSEArray<ObAuxTableMetaInfo, 16> simple_index_infos;
if (NULL == table_schema) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid table schema", K(table_schema));
} else if (OB_FAIL(table_schema->get_simple_index_infos(simple_index_infos))) {
LOG_WARN("get simple_index_infos failed", K(ret));
} else {
const uint64_t tenant_id = table_schema->get_tenant_id();
for (int64_t i = 0; OB_SUCC(ret) && !is_has_uk && i < simple_index_infos.count(); ++i) {
const ObTableSchema *index_table_schema = NULL;
if (OB_FAIL(schema_guard->get_table_schema(tenant_id,
simple_index_infos.at(i).table_id_, index_table_schema))) {
LOG_WARN("fail to get table schema", K(tenant_id),
K(simple_index_infos.at(i).table_id_), K(ret));
} else if (OB_ISNULL(index_table_schema)) {
ret = OB_TABLE_NOT_EXIST;
LOG_WARN("index table schema must not be NULL", K(ret));
} else if (index_table_schema->is_unique_index()) {
is_has_uk = true;
} else {
// not unique index, skip
}
}
}
return ret;
}
int ObDmlCgService::check_upd_need_all_columns(ObLogicalOperator &op,
ObSchemaGetterGuard *schema_guard,
const ObTableSchema *table_schema,
const IndexDMLInfo &index_dml_info,
bool is_primary_index,
bool &need_all_columns)
{
int ret = OB_SUCCESS;
bool has_uk = false;
bool is_uk_updated = false;
bool is_update_pk = false;
need_all_columns = false;
int64_t binlog_row_image = ObBinlogRowImage::FULL;
ObSQLSessionInfo *session = cg_.opt_ctx_->get_session_info();
if (OB_ISNULL(session)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected null ptr", K(ret));
} else if (OB_FAIL(session->get_binlog_row_image(binlog_row_image))) {
LOG_WARN("fail to get binlog image", K(ret));
} else if (binlog_row_image == ObBinlogRowImage::FULL) {
// full mode
need_all_columns = true;
} else if (!is_primary_index) {
// index_table if update PK, also need record all_columns
if (OB_FAIL(check_has_upd_rowkey(op, table_schema, index_dml_info, is_update_pk))) {
LOG_WARN("fail to check has update UK", K(ret));
} else if (is_update_pk) {
need_all_columns = true;
LOG_TRACE("is update pk, need all columns", K(table_schema->get_table_id()));
}
} else if (OB_FAIL(is_table_has_unique_key(schema_guard, table_schema, has_uk))) {
LOG_WARN("fail to check table has UK", K(ret));
} else if (has_uk &&
OB_FAIL(check_unique_key_is_updated(schema_guard, table_schema, index_dml_info, is_uk_updated))) {
LOG_WARN("fail to check unique key is updated", K(ret), K(index_dml_info));
} else if (is_uk_updated) {
// need all columns
need_all_columns = true;
LOG_TRACE("update primary_table unique key, need all columns", K(need_all_columns));
} else if (OB_FAIL(heap_table_has_not_null_uk(schema_guard, table_schema, need_all_columns))) {
LOG_WARN("fail to check table whether has not null UK", K(ret));
} else if (need_all_columns) {
// need all columns
LOG_TRACE("is heap table and don't has not_null UK, need all columns", K(need_all_columns));
} else if (OB_FAIL(check_has_upd_rowkey(op, table_schema, index_dml_info, is_update_pk))) {
LOG_TRACE("update primary_table primary key, need all columns", K(need_all_columns));
} else if (is_update_pk) {
// rowkey is changed, need all columns
need_all_columns = true;
LOG_TRACE("update primary_table primary key, need all columns", K(table_schema->get_table_name_str()));
}
return ret;
}
int ObDmlCgService::append_upd_old_row_cid(ObLogicalOperator &op,
ObSchemaGetterGuard *schema_guard,
const ObTableSchema *table_schema,
bool is_primary_index,
ObDASUpdCtDef &das_upd_ctdef,
const IndexDMLInfo &index_dml_info,
ObIArray<uint64_t> &minimal_column_ids)
{
int ret = OB_SUCCESS;
bool has_uk = false;
if (OB_FAIL(append_all_pk_column_id(schema_guard, table_schema, minimal_column_ids))) {
// append all PK
LOG_WARN("fail to append all pk to column_id", K(ret), K(table_schema->get_table_name_str()));
} else if (!is_primary_index) {
// append update column
if (OB_FAIL(append_upd_assignment_column_id(table_schema, das_upd_ctdef, minimal_column_ids))) {
LOG_WARN("fail to append upd assignment column_id", K(ret), K(index_dml_info));
}
} else if (OB_FAIL(is_table_has_unique_key(schema_guard, table_schema, has_uk))) {
LOG_WARN("fail to check table has UK", K(ret));
} else if (OB_FAIL(append_udt_hidden_col_id(op, table_schema, index_dml_info, minimal_column_ids))) {
// append UDT hidden column
LOG_WARN("fail to append upd assignment column_id", K(ret), K(index_dml_info));
} else if (OB_FAIL(append_upd_assignment_column_id(table_schema,
das_upd_ctdef,
minimal_column_ids))) {
// append update column
LOG_WARN("fail to append upd assignment column_id", K(ret), K(index_dml_info));
} else if (has_uk &&
OB_FAIL(append_all_uk_column_id(schema_guard, table_schema, minimal_column_ids))) {
// append UK
LOG_WARN("fail to append all uk column_id", K(ret));
} else if (OB_FAIL(append_time_type_column_id(table_schema, minimal_column_ids))) {
// append time_type column
LOG_WARN("fail to append time type column_id", K(ret));
} else if (table_schema->is_heap_table() &&
OB_FAIL(append_heap_table_part_key_dependcy_column(table_schema, minimal_column_ids))) {
// append heap table part_key_column_id and dependency column
LOG_WARN("fail to append heap table part_id", K(ret));
}
return ret;
}
int ObDmlCgService::generate_minimal_upd_old_row_cid(ObLogicalOperator &op,
ObTableID index_tid,
ObDASUpdCtDef &das_upd_ctdef,
const IndexDMLInfo &index_dml_info,
bool is_primary_index,
bool &need_all_columns,
ObIArray<uint64_t> &minimal_column_ids)
{
int ret = OB_SUCCESS;
need_all_columns = false;
ObSchemaGetterGuard *schema_guard = NULL;
const ObTableSchema *table_schema = NULL;
bool has_uk = false;
bool is_uk_updated = false;
bool has_not_null_uk = false;
bool is_update_pk = false;
if (OB_ISNULL(schema_guard = cg_.opt_ctx_->get_schema_guard())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("NULL schema guard", K(ret));
} else if (OB_FAIL(schema_guard->get_table_schema(MTL_ID(), index_tid, table_schema))) {
LOG_WARN("get schema fail", KR(ret), K(index_tid));
} else if (OB_ISNULL(table_schema)) {
ret = OB_TABLE_NOT_EXIST;
LOG_WARN("table not exist", KR(ret), K(index_tid));
} else if (OB_FAIL(check_upd_need_all_columns(op,
schema_guard,
table_schema,
index_dml_info,
is_primary_index,
need_all_columns))) {
LOG_WARN("fail to check whether update need all columns", K(ret), K(index_tid));
} else if (need_all_columns) {
if (OB_FAIL(minimal_column_ids.assign(das_upd_ctdef.column_ids_))) {
LOG_WARN("fail to assign column_id", K(ret), K(das_upd_ctdef.column_ids_));
}
} else if (OB_FAIL(append_upd_old_row_cid(op,
schema_guard,
table_schema,
is_primary_index,
das_upd_ctdef,
index_dml_info,
minimal_column_ids))) {
LOG_WARN("fail to append update old_row column_id", K(ret), K(index_tid));
}
return ret;
}
int ObDmlCgService::check_del_need_all_columns(ObSchemaGetterGuard *schema_guard,
const ObTableSchema *table_schema,
bool &need_all_columns)
{
int ret = OB_SUCCESS;
need_all_columns = false;
bool has_not_null_uk = false;
int64_t binlog_row_image = ObBinlogRowImage::FULL;
ObSQLSessionInfo *session = cg_.opt_ctx_->get_session_info();
if (OB_ISNULL(session)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected null ptr", K(ret));
} else if (OB_FAIL(session->get_binlog_row_image(binlog_row_image))) {
LOG_WARN("fail to get binlog image", K(ret));
} else if (binlog_row_image == ObBinlogRowImage::FULL) {
// full mode
need_all_columns = true;
} else if (table_schema->is_heap_table()) {
if (OB_FAIL(table_schema->has_not_null_unique_key(*schema_guard, has_not_null_uk))) {
LOG_WARN("fail to check whether has not null unique key", K(ret), K(table_schema->get_table_name_str()));
} else if (!has_not_null_uk) {
need_all_columns = true;
LOG_TRACE("is heap_table and don't has not_null uk", K(table_schema->get_table_name_str()));
}
}
return ret;
}
int ObDmlCgService::generate_minimal_delete_old_row_cid(ObTableID index_tid,
bool is_primary_index,
ObDASDelCtDef &das_del_ctdef,
ObIArray<uint64_t> &minimal_column_ids)
{
// The rules of minmal mode are as follows: delete semantic index table only needs to provide the primary key
// Primary table:
// heap_table and there is no non-empty UK (non-empty UK means that all columns of the key are not null),
// you need to remember all columns
// If there is a primary key table, old_row records PK and all UK
// The hidden primary key/partition key/partition key dependent columns of heap_table need to be retained
int ret = OB_SUCCESS;
ObSchemaGetterGuard *schema_guard = NULL;
bool need_all_columns = false;
const ObTableSchema *table_schema = NULL;
if (OB_ISNULL(schema_guard = cg_.opt_ctx_->get_schema_guard())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("NULL schema guard", K(ret));
} else if (OB_FAIL(schema_guard->get_table_schema(MTL_ID(), index_tid, table_schema))) {
LOG_WARN("get schema fail", KR(ret), K(index_tid));
} else if (OB_ISNULL(table_schema)) {
ret = OB_TABLE_NOT_EXIST;
LOG_WARN("table not exist", KR(ret), K(index_tid));
} else if (OB_FAIL(check_del_need_all_columns(schema_guard, table_schema, need_all_columns))) {
LOG_WARN("fail to check del need all columns", K(ret), K(is_primary_index), K(index_tid));
} else if (need_all_columns) {
if (OB_FAIL(minimal_column_ids.assign(das_del_ctdef.column_ids_))) {
LOG_WARN("fail to assig delete column_id", K(ret));
}
} else if (OB_FAIL(append_all_pk_column_id(schema_guard, table_schema, minimal_column_ids))) {
// append PK
LOG_WARN("fail to append all pk to column_id", K(ret), K(index_tid));
} else if (!is_primary_index) {
// index_table only record PK
} else if (OB_FAIL(append_all_uk_column_id(schema_guard, table_schema, minimal_column_ids))) {
// append unique key
LOG_WARN("fail to append all unique key column_id");
} else if (table_schema->is_heap_table()) {
if (OB_FAIL(append_heap_table_part_key_dependcy_column(table_schema, minimal_column_ids))) {
// append heap table part_key_column_id and dependcy column
LOG_WARN("fail to append heap table part_id", K(ret));
}
}
return ret;
}
int ObDmlCgService::append_all_uk_column_id(ObSchemaGetterGuard *schema_guard,
const ObTableSchema *table_schema,
ObIArray<uint64_t> &minimal_column_ids)
{
int ret = OB_SUCCESS;
// extract all column_ids for unique key, and add them into stmt
ObSEArray<ObAuxTableMetaInfo, 16> simple_index_infos;
if (OB_FAIL(table_schema->get_simple_index_infos(simple_index_infos))) {
LOG_WARN("get simple_index_infos failed", K(ret));
}
for (int64_t i = 0; OB_SUCC(ret) && i < simple_index_infos.count(); ++i) {
const ObTableSchema *index_table_schema = NULL;
if (OB_FAIL(schema_guard->get_table_schema(MTL_ID(), simple_index_infos.at(i).table_id_, index_table_schema))) {
LOG_WARN("fail to get table schema", K(ret), K(MTL_ID()), K(simple_index_infos.at(i).table_id_));
} else if (OB_ISNULL(index_table_schema)) {
ret = OB_TABLE_NOT_EXIST;
LOG_WARN("index table schema must not be NULL", K(ret));
} else if (!index_table_schema->is_unique_index()) {
// not unique index, skip
} else {
ObTableSchema::const_column_iterator iter = index_table_schema->column_begin();
for ( ; iter != index_table_schema->column_end(); iter++) {
const ObColumnSchemaV2 *column_schema = *iter;
if (OB_ISNULL(column_schema)) {
LOG_WARN("unexpected err", KPC(column_schema));
} else if (!column_schema->is_index_column()) {
// skip non index column
} else if (OB_FAIL(add_var_to_array_no_dup(minimal_column_ids, column_schema->get_column_id()))) {
LOG_WARN("add column_id to list failed",
K(ret), K(column_schema->get_column_id()), K(minimal_column_ids));
}
}
}
} // end simple_index_infos for
return ret;
}
template<typename OldExprType, typename NewExprType>
int ObDmlCgService::generate_das_projector(const ObIArray<uint64_t> &dml_column_ids,
const ObIArray<uint64_t> &storage_column_ids,
const ObIArray<uint64_t> &written_column_ids,
const ObIArray<OldExprType*> &old_row,
const ObIArray<NewExprType*> &new_row,
const ObIArray<ObRawExpr*> &full_row,
@ -1201,24 +1728,29 @@ int ObDmlCgService::generate_das_projector(const ObIArray<uint64_t> &dml_column_
}
for (int64_t i = 0; OB_SUCC(ret) && i < storage_column_ids.count(); ++i) {
uint64_t storage_cid = storage_column_ids.at(i);
uint64_t ref_cid = is_shadow_column(storage_cid) ?
storage_cid - OB_MIN_SHADOW_COLUMN_ID :
storage_cid;
if (is_mlog_reference_column(storage_cid)
&& !das_ctdef.is_access_mlog_as_master_table_) {
ref_cid = ObTableSchema::gen_ref_col_id_from_mlog_col_id(storage_cid);
}
int64_t column_idx = OB_INVALID_INDEX;
int64_t projector_idx = OB_INVALID_INDEX;
old_row_projector.at(i) = OB_INVALID_INDEX;
if (has_exist_in_array(dml_column_ids, ref_cid, &column_idx)) {
ObRawExpr *column_expr = old_row.at(column_idx);
if (!has_exist_in_array(full_row, column_expr, &projector_idx)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("row column not found in full row columns", K(ret),
K(column_idx), KPC(old_row.at(column_idx)));
} else {
old_row_projector.at(i) = projector_idx;
// the column_id of shadow_pk in storage_column_ids and written_column_ids
// don't subtract offset of OB_MIN_SHADOW_COLUMN_ID.
// but the column_id of shadow_pk in dml_column_ids subtract the offset of OB_MIN_SHADOW_COLUMN_ID
if (has_exist_in_array(written_column_ids, storage_cid, &column_idx)) {
uint64_t ref_cid = is_shadow_column(storage_cid) ?
storage_cid - OB_MIN_SHADOW_COLUMN_ID :
storage_cid;
if (is_mlog_reference_column(storage_cid)
&& !das_ctdef.is_access_mlog_as_master_table_) {
ref_cid = ObTableSchema::gen_ref_col_id_from_mlog_col_id(storage_cid);
}
if (has_exist_in_array(dml_column_ids, ref_cid, &column_idx)) {
ObRawExpr *column_expr = old_row.at(column_idx);
if (!has_exist_in_array(full_row, column_expr, &projector_idx)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("row column not found in full row columns", K(ret),
K(column_idx), KPC(old_row.at(column_idx)));
} else {
old_row_projector.at(i) = projector_idx;
}
}
}
}
@ -1236,24 +1768,29 @@ int ObDmlCgService::generate_das_projector(const ObIArray<uint64_t> &dml_column_
}
for (int64_t i = 0; OB_SUCC(ret) && i < storage_column_ids.count(); ++i) {
uint64_t storage_cid = storage_column_ids.at(i);
uint64_t ref_cid = is_shadow_column(storage_cid) ?
storage_cid - OB_MIN_SHADOW_COLUMN_ID :
storage_cid;
if (is_mlog_reference_column(storage_cid)
&& !das_ctdef.is_access_mlog_as_master_table_) {
ref_cid = ObTableSchema::gen_ref_col_id_from_mlog_col_id(storage_cid);
}
int64_t column_idx = OB_INVALID_INDEX;
int64_t projector_idx = OB_INVALID_INDEX;
new_row_projector.at(i) = OB_INVALID_INDEX;
if (has_exist_in_array(dml_column_ids, ref_cid, &column_idx)) {
ObRawExpr *column_expr = new_row.at(column_idx);
if (!has_exist_in_array(full_row, column_expr, &projector_idx)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("row column not found in full row columns", K(ret),
K(column_idx), KPC(new_row.at(column_idx)));
} else {
new_row_projector.at(i) = projector_idx;
// the column_id of shadow_pk in storage_column_ids and written_column_ids
// don't subtract offset of OB_MIN_SHADOW_COLUMN_ID.
// but the column_id of shadow_pk in dml_column_ids subtract the offset of OB_MIN_SHADOW_COLUMN_ID
if (has_exist_in_array(written_column_ids, storage_cid, &column_idx)) {
uint64_t ref_cid = is_shadow_column(storage_cid) ?
storage_cid - OB_MIN_SHADOW_COLUMN_ID :
storage_cid;
if (is_mlog_reference_column(storage_cid)
&& !das_ctdef.is_access_mlog_as_master_table_) {
ref_cid = ObTableSchema::gen_ref_col_id_from_mlog_col_id(storage_cid);
}
if (has_exist_in_array(dml_column_ids, ref_cid, &column_idx)) {
ObRawExpr *column_expr = new_row.at(column_idx);
if (!has_exist_in_array(full_row, column_expr, &projector_idx)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("row column not found in full row columns", K(ret),
K(column_idx), KPC(new_row.at(column_idx)));
} else {
new_row_projector.at(i) = projector_idx;
}
}
}
}
@ -1263,6 +1800,9 @@ int ObDmlCgService::generate_das_projector(const ObIArray<uint64_t> &dml_column_
LOG_WARN("add geo column projector failed", K(ret));
}
}
LOG_TRACE("print dml_column_ids", K(dml_column_ids), K(storage_column_ids),
K(written_column_ids), K(das_ctdef.old_row_projector_), K(das_ctdef.new_row_projector_));
return ret;
}
@ -1397,6 +1937,7 @@ int ObDmlCgService::generate_das_ins_ctdef(ObLogDelUpd &op,
} else if (OB_FAIL(generate_dml_column_ids(op, index_dml_info.column_exprs_, dml_column_ids))) {
LOG_WARN("generate dml column ids failed", K(ret));
} else if (OB_FAIL(generate_das_projector(dml_column_ids,
das_ins_ctdef.column_ids_,
das_ins_ctdef.column_ids_,
empty_old_row, new_row, new_row,
das_ins_ctdef))) {
@ -1445,16 +1986,30 @@ int ObDmlCgService::generate_das_del_ctdef(ObLogDelUpd &op,
int ret = OB_SUCCESS;
ObArray<uint64_t> dml_column_ids;
ObArray<ObRawExpr*> empty_new_row;
ObArray<uint64_t> minimal_column_ids;
bool need_all_columns = false;
bool is_primary_table = false;
if (index_dml_info.is_primary_index_ && index_tid == index_dml_info.ref_table_id_) {
is_primary_table = true;
}
if (OB_FAIL(generate_das_dml_ctdef(op, index_tid, index_dml_info, das_del_ctdef))) {
LOG_WARN("generate das dml ctdef failed", K(ret));
} else if (OB_FAIL(generate_dml_column_ids(op, index_dml_info.column_exprs_, dml_column_ids))) {
LOG_WARN("generate dml column ids failed", K(ret));
} else if (OB_FAIL(generate_minimal_delete_old_row_cid(index_tid,
is_primary_table,
das_del_ctdef,
minimal_column_ids))) {
LOG_WARN("generate dml column ids failed", K(ret), K(is_primary_table), K(index_tid));
} else if (OB_FAIL(generate_das_projector(dml_column_ids,
das_del_ctdef.column_ids_,
minimal_column_ids,
old_row, empty_new_row, old_row,
das_del_ctdef))) {
LOG_WARN("add old row projector failed", K(ret), K(old_row));
}
return ret;
}
@ -1501,6 +2056,12 @@ int ObDmlCgService::generate_das_upd_ctdef(ObLogDelUpd &op,
int ret = OB_SUCCESS;
const ObAssignments &assigns = index_dml_info.assignments_;
ObArray<uint64_t> dml_column_ids;
ObArray<uint64_t> minimal_column_ids;
bool need_all_columns = false;
bool is_primary_table = false;
if (index_dml_info.is_primary_index_ && index_tid == index_dml_info.ref_table_id_) {
is_primary_table = true;
}
if (OB_FAIL(generate_das_dml_ctdef(op, index_tid, index_dml_info, das_upd_ctdef))) {
LOG_WARN("generate das dml base ctdef failed", K(ret), K(index_dml_info));
} else if (OB_FAIL(generate_updated_column_ids(op, assigns, das_upd_ctdef.column_ids_,
@ -1509,8 +2070,17 @@ int ObDmlCgService::generate_das_upd_ctdef(ObLogDelUpd &op,
LOG_WARN("add updated column ids failed", K(ret), K(assigns));
} else if (OB_FAIL(generate_dml_column_ids(op, index_dml_info.column_exprs_, dml_column_ids))) {
LOG_WARN("generate dml column ids failed", K(ret));
} else if (OB_FAIL(generate_minimal_upd_old_row_cid(op,
index_tid,
das_upd_ctdef,
index_dml_info,
is_primary_table,
need_all_columns,
minimal_column_ids))) {
LOG_WARN("fail to project update row column_id");
} else if (OB_FAIL(generate_das_projector(dml_column_ids,
das_upd_ctdef.column_ids_,
minimal_column_ids,
old_row, new_row, full_row,
das_upd_ctdef))) {
LOG_WARN("add old row projector failed", K(ret), K(old_row), K(full_row));
@ -1520,6 +2090,7 @@ int ObDmlCgService::generate_das_upd_ctdef(ObLogDelUpd &op,
ObString column_name;
OZ(ob_write_string(cg_.phy_plan_->get_allocator(), col->get_column_name(), column_name));
}
return ret;
}
@ -1584,6 +2155,7 @@ int ObDmlCgService::generate_das_lock_ctdef(ObLogicalOperator &op,
} else if (OB_FAIL(generate_dml_column_ids(op, index_dml_info.column_exprs_, dml_column_ids))) {
LOG_WARN("generate dml column ids failed", K(ret));
} else if (OB_FAIL(generate_das_projector(dml_column_ids,
das_lock_ctdef.column_ids_,
das_lock_ctdef.column_ids_,
old_row, empty_new_row, old_row,
das_lock_ctdef))) {
@ -1857,7 +2429,7 @@ int ObDmlCgService::need_fire_update_event(const ObTableSchema &table_schema,
const ObDMLStmt *stmt = log_op.get_stmt();
for (int64_t i = 0; OB_SUCC(ret) && i < assignments.count(); ++i) {
ObColumnRefRawExpr *column_expr = assignments.at(i).column_expr_;
ColumnItem *column_item = nullptr;
const ColumnItem *column_item = nullptr;
if (OB_ISNULL(column_expr)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get null column expr", K(ret));

View File

@ -120,9 +120,93 @@ private:
int convert_dml_column_info(common::ObTableID index_tid,
bool only_rowkey,
ObDASDMLBaseCtDef &das_dml_info);
int generate_minimal_upd_old_row_cid(ObLogicalOperator &op,
ObTableID index_tid,
ObDASUpdCtDef &das_upd_ctdef,
const IndexDMLInfo &index_dml_info,
bool is_primary_index,
bool &need_all_columns,
ObIArray<uint64_t> &minimal_column_ids);
int append_upd_old_row_cid(ObLogicalOperator &op,
ObSchemaGetterGuard *schema_guard,
const ObTableSchema *table_schema,
bool is_primary_index,
ObDASUpdCtDef &das_upd_ctdef,
const IndexDMLInfo &index_dml_info,
ObIArray<uint64_t> &minimal_column_ids);
int check_upd_need_all_columns(ObLogicalOperator &op,
ObSchemaGetterGuard *schema_guard,
const ObTableSchema *table_schema,
const IndexDMLInfo &index_dml_info,
bool is_primary_index,
bool &need_all_columns);
int is_table_has_unique_key(ObSchemaGetterGuard *schema_guard,
const ObTableSchema *table_schema,
bool &is_has_uk);
int append_upd_assignment_column_id(const ObTableSchema *table_schema,
ObDASUpdCtDef &das_upd_ctdef,
ObIArray<uint64_t> &minimal_column_ids);
int append_udt_hidden_col_id(ObLogicalOperator &op,
const ObTableSchema *table_schema,
const IndexDMLInfo &index_dml_info,
ObIArray<uint64_t> &minimal_column_ids);
int check_has_upd_rowkey(ObLogicalOperator &op,
const ObTableSchema *table_schema,
const IndexDMLInfo &index_dml_info,
bool &upd_rowkey);
int append_udt_hidden_column_id(const ObTableSchema *table_schema,
const uint64_t column_id,
const uint64_t udt_set_id,
ObIArray<uint64_t> &minimal_column_ids);
int check_unique_key_is_updated(ObSchemaGetterGuard *schema_guard,
const ObTableSchema *table_schema,
const IndexDMLInfo &index_dml_info,
bool &is_updated);
int append_time_type_column_id(const ObTableSchema *table_schema,
ObIArray<uint64_t> &minimal_column_ids);
int heap_table_has_not_null_uk(ObSchemaGetterGuard *schema_guard,
const ObTableSchema *table_schema,
bool &need_all_columns);
int generate_minimal_delete_old_row_cid(ObTableID index_tid,
bool is_primary_index,
ObDASDelCtDef &das_del_ctdef,
ObIArray<uint64_t> &minimal_column_ids);
int check_del_need_all_columns(ObSchemaGetterGuard *schema_guard,
const ObTableSchema *table_schema,
bool &need_all_columns);
int append_all_uk_column_id(ObSchemaGetterGuard *schema_guard,
const ObTableSchema *table_schema,
ObIArray<uint64_t> &minimal_column_ids);
int append_all_pk_column_id(ObSchemaGetterGuard *schema_guard,
const ObTableSchema *table_schema,
ObIArray<uint64_t> &minimal_column_ids);
int append_heap_table_part_id(const ObTableSchema *table_schema,
ObIArray<uint64_t> &minimal_column_ids);
int append_heap_table_part_key_dependcy_column(const ObTableSchema *table_schema,
ObIArray<uint64_t> &minimal_column_ids);
template<typename OldExprType, typename NewExprType>
int generate_das_projector(const common::ObIArray<uint64_t> &dml_column_ids,
const common::ObIArray<uint64_t> &storage_column_ids,
const common::ObIArray<uint64_t> &written_column_ids,
const common::ObIArray<OldExprType*> &old_row,
const common::ObIArray<NewExprType*> &new_row,
const common::ObIArray<ObRawExpr*> &full_row,

View File

@ -2041,42 +2041,24 @@ int ObDelUpdLogPlan::generate_index_column_exprs(const uint64_t table_id,
}
}
if (OB_SUCC(ret)) {
if (is_modify_key || ObBinlogRowImage::FULL == binlog_row_image) {
//modify rowkey, need add all columns in schema
ObTableSchema::const_column_iterator iter = index_schema.column_begin();
ObTableSchema::const_column_iterator end = index_schema.column_end();
for (; OB_SUCC(ret) && iter != end; ++iter) {
const ObColumnSchemaV2 *column = *iter;
// skip all rowkeys
if (OB_ISNULL(column)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("invalid column schema", K(column));
} else if (column->is_rowkey_column()) {
// do nothing
} else if (OB_ISNULL(col_item = ObResolverUtils::find_col_by_base_col_id(*stmt, table_id,
column->get_column_id(),
OB_INVALID_ID, true))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get column item by id failed", K(ret), K(table_id), K(column->get_column_id()));
} else if (OB_FAIL(column_exprs.push_back(col_item->expr_))) {
LOG_WARN("store column expr to column exprs failed", K(ret));
}
}
} else {
//没有修改主键,只添加自己被修改的列及主表的rowkey
for (int64_t i = 0; OB_SUCC(ret) && i < assignments.count(); ++i) {
col_expr = const_cast<ObColumnRefRawExpr*>(assignments.at(i).column_expr_);
if (OB_ISNULL(col_expr)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("assignment column expr is null");
} else if (OB_ISNULL(index_schema.get_column_schema(col_expr->get_column_id()))) {
//该列不存在于该表的schema中,忽略掉
} else if (OB_FAIL(column_exprs.push_back(col_expr))) {
LOG_WARN("store column expr to column exprs failed", K(ret));
}
}
if (FAILEDx(append_array_no_dup(column_exprs, spk_related_columns))) {
LOG_WARN("failed to append array no dup", K(ret));
//modify rowkey, need add all columns in schema
ObTableSchema::const_column_iterator iter = index_schema.column_begin();
ObTableSchema::const_column_iterator end = index_schema.column_end();
for (; OB_SUCC(ret) && iter != end; ++iter) {
const ObColumnSchemaV2 *column = *iter;
// skip all rowkeys
if (OB_ISNULL(column)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("invalid column schema", K(column));
} else if (column->is_rowkey_column()) {
// do nothing
} else if (OB_ISNULL(col_item = ObResolverUtils::find_col_by_base_col_id(*stmt, table_id,
column->get_column_id(),
OB_INVALID_ID, true))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get column item by id failed", K(ret), K(table_id), K(column->get_column_id()));
} else if (OB_FAIL(column_exprs.push_back(col_item->expr_))) {
LOG_WARN("store column expr to column exprs failed", K(ret));
}
}
}