[FEAT MERGE] New minimal mode for tenant.

This commit is contained in:
jiadebinmary@gmail.com
2023-11-27 08:05:35 +00:00
committed by ob-robot
parent fdae2b1ab4
commit fcfd20c623
11 changed files with 484 additions and 43 deletions

View File

@ -1767,14 +1767,36 @@ int ObDelUpdResolver::build_returning_lob_expr(ObColumnRefRawExpr *ref_expr, ObS
return ret;
}
bool ObDelUpdResolver::need_all_columns(const ObTableSchema &table_schema, int64_t binlog_row_image)
int ObDelUpdResolver::need_all_columns(const ObTableSchema &table_schema,
const int64_t binlog_row_image,
const int64_t need_check_uk,
bool &need_all_columns)
{
return (table_schema.is_heap_table() ||
table_schema.get_foreign_key_infos().count() > 0 ||
table_schema.get_trigger_list().count() > 0 ||
table_schema.has_check_constraint() ||
table_schema.has_generated_and_partkey_column() ||
binlog_row_image == ObBinlogRowImage::FULL);
int ret = OB_SUCCESS;
const bool is_binlog_full_mode = (ObBinlogRowImage::FULL == binlog_row_image) ? true : false;
bool has_not_null_uk = false;
if (!is_binlog_full_mode && need_check_uk && table_schema.is_heap_table()) {
// Need check UK for heap table only when binlog_row_image is not FULL.
ObSchemaGetterGuard *schema_guard = NULL;
CK (OB_NOT_NULL(schema_checker_));
CK (OB_NOT_NULL(schema_guard = schema_checker_->get_schema_guard()));
if (OB_SUCC(ret) && OB_FAIL(table_schema.has_not_null_unique_key(*schema_guard, has_not_null_uk))) {
LOG_WARN("has_not_null_unique_key failed", K(ret), K(table_schema));
}
}
// For heap table:
// 1) If it does not have [NOT NULL UK], need record all columns.
// 2) If it has [NOT NULL UK], only need record all columns when UK is one of the updated columns,
// or just need record UK in OldRow.
if (OB_SUCC(ret)) {
need_all_columns = (is_binlog_full_mode ||
(table_schema.is_heap_table() && !has_not_null_uk) ||
table_schema.get_foreign_key_infos().count() > 0 ||
table_schema.get_trigger_list().count() > 0 ||
table_schema.has_check_constraint() ||
table_schema.has_generated_and_partkey_column());
}
return ret;
}
int ObDelUpdResolver::add_all_columns_to_stmt(const TableItem &table_item,
@ -1809,7 +1831,6 @@ int ObDelUpdResolver::add_all_columns_to_stmt(const TableItem &table_item,
}
return ret;
}
int ObDelUpdResolver::add_all_lob_columns_to_stmt(const TableItem &table_item,
ObIArray<ObColumnRefRawExpr*> &column_exprs)
{
@ -1908,6 +1929,72 @@ int ObDelUpdResolver::add_all_rowkey_columns_to_stmt(const TableItem &table_item
return ret;
}
int ObDelUpdResolver::add_all_unique_key_columns_to_stmt(const TableItem &table_item,
ObIArray<ObColumnRefRawExpr*> &column_exprs)
{
int ret = OB_SUCCESS;
const ObTableSchema *table_schema = NULL;
const TableItem &base_table_item = table_item.get_base_table_item();
ObDelUpdStmt *stmt = get_del_upd_stmt();
ObSchemaGetterGuard *schema_guard = NULL;
uint64_t view_id = OB_INVALID_ID;
CK (OB_NOT_NULL(schema_checker_));
CK (OB_NOT_NULL(schema_guard = schema_checker_->get_schema_guard()));
if (OB_ISNULL(stmt)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get unexpected null", K(stmt));
} else if (stmt->has_instead_of_trigger()) {
// do nothing, instead of trigger doesn't have rowkey
} else if (OB_ISNULL(params_.session_info_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("params_.session_info_ is null", K(ret));
} else if (OB_FAIL(schema_checker_->get_table_schema(params_.session_info_->get_effective_tenant_id(),
base_table_item.ref_id_,
table_schema,
base_table_item.is_link_table()))) {
LOG_WARN("table schema not found", K(base_table_item));
} else if (NULL == table_schema) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get invalid table schema", K(table_item));
} else {
// extract all column_ids for unique key, and add them into stmt
ObSEArray<ObAuxTableMetaInfo, 16> simple_index_infos;
if (OB_FAIL(table_schema->get_simple_index_infos(simple_index_infos))) {
LOG_WARN("get simple_index_infos failed", K(ret));
} else {
const uint64_t tenant_id = table_schema->get_tenant_id();
for (int64_t i = 0; OB_SUCCESS == ret && i < simple_index_infos.count(); ++i) {
const ObTableSchema *index_table_schema = NULL;
if (OB_FAIL(schema_guard->get_table_schema(tenant_id,
simple_index_infos.at(i).table_id_, index_table_schema))) {
LOG_WARN("fail to get table schema", K(tenant_id),
K(simple_index_infos.at(i).table_id_), K(ret));
} else if (OB_ISNULL(index_table_schema)) {
ret = OB_TABLE_NOT_EXIST;
LOG_WARN("index table schema must not be NULL", K(ret));
} else if (!index_table_schema->is_unique_index()) {
// not unique index, skip
} else {
ObTableSchema::const_column_iterator iter = index_table_schema->column_begin();
for ( ; iter != index_table_schema->column_end(); iter++) {
const ObColumnSchemaV2 *column_schema = *iter;
if (OB_ISNULL(column_schema)) {
LOG_WARN("unexpected err", KPC(column_schema));
} else if (!column_schema->is_index_column()) {
// skip non index column
} else if (OB_FAIL(add_column_to_stmt(table_item, *column_schema, column_exprs))) {
LOG_WARN("add column to stmt failed", K(ret), K(table_item));
} else { /*do nothing*/ }
}
}
}
}
}
return ret;
}
//for ObDelUpdStmt
// add column's related columns in index to stmt
// if column_id is OB_INVALID_ID, all indexes' columns would be added to stmt
@ -2012,6 +2099,47 @@ int ObDelUpdResolver::add_all_index_rowkey_to_stmt(const TableItem &table_item,
return ret;
}
int ObDelUpdResolver::add_necessary_columns_for_minimal_mode(const TableItem &table_item,
ObIArray<ObColumnRefRawExpr*> &column_exprs)
{
// The timestamp and datetime columns need to be recorded unconditionly for MINIMAL mode.
int ret = OB_SUCCESS;
const ObTableSchema *table_schema = NULL;
const TableItem& base_table_item = table_item.get_base_table_item();
if (OB_ISNULL(params_.session_info_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("params_.session_info_ is null", K(ret));
} else if (OB_FAIL(schema_checker_->get_table_schema(params_.session_info_->get_effective_tenant_id(),
base_table_item.ref_id_,
table_schema,
base_table_item.is_link_table()))) {
LOG_WARN("not find table schema", K(ret), K(base_table_item));
} else if (OB_ISNULL(table_schema)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get unexpected null", K(table_schema), K(ret));
} else {
ObTableSchema::const_column_iterator iter = table_schema->column_begin();
ObTableSchema::const_column_iterator end = table_schema->column_end();
for (; OB_SUCC(ret) && iter != end; ++iter) {
const ObColumnSchemaV2 *column = *iter;
if (OB_ISNULL(column)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("invalid column schema", K(column));
} else if (column->get_meta_type().is_datetime() ||
column->get_meta_type().is_timestamp() ||
column->get_meta_type().is_time() ||
column->get_meta_type().is_date() ||
column->get_meta_type().is_otimestamp_type()) {
// date/datatime and time/timestamp column need to be added to old_row
if (OB_FAIL(add_column_to_stmt(table_item, *column, column_exprs))) {
LOG_WARN("add column item to stmt failed", K(ret));
}
} else { /* do nothing */ }
}
}
return ret;
}
int ObDelUpdResolver::add_all_index_rowkey_to_stmt(const TableItem &table_item,
const ObTableSchema *index_schema,
ObIArray<ObColumnRefRawExpr *> &column_items)
@ -2097,6 +2225,34 @@ int ObDelUpdResolver::add_all_partition_key_columns_to_stmt(const TableItem &tab
return ret;
}
int ObDelUpdResolver::add_udt_hidden_columns_for_minimal_mode(const TableItem &table_item,
const ObTableSchema *table_schema,
ObColumnRefRawExpr *col_expr,
ObIArray<ObColumnRefRawExpr *> &column_items)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(table_schema) || OB_ISNULL(col_expr)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("invalid arguments", K(ret));
} else if (!col_expr->get_result_type().is_user_defined_sql_type()) {
// not udt column ref, do nothing
} else {
ObSEArray<ObColumnSchemaV2 *, 1> hidden_cols;
if (OB_FAIL(table_schema->get_column_schema_in_same_col_group(col_expr->get_column_id(),
col_expr->get_udt_set_id(),
hidden_cols))) {
LOG_WARN("failed to get column schema", K(ret));
} else {
for (int i = 0; i < hidden_cols.count() && OB_SUCC(ret); i++) {
if (OB_FAIL(add_column_to_stmt(table_item, *(hidden_cols[i]), column_items))) {
LOG_WARN("add column to stmt failed", K(ret), K(table_item), K(i), K(*(hidden_cols[i])));
}
}
}
}
return ret;
}
int ObDelUpdResolver::uv_check_key_preserved(const TableItem &table_item, bool &key_preserved)
{
int ret = OB_SUCCESS;

View File

@ -125,15 +125,19 @@ protected:
virtual int process_values_function(ObRawExpr *&expr);
virtual int recursive_values_expr(ObRawExpr *&expr);
bool need_all_columns(const share::schema::ObTableSchema &table_schema, int64_t binlog_row_image);
int need_all_columns(const share::schema::ObTableSchema &table_schema,
const int64_t binlog_row_image,
const int64_t need_check_uk,
bool &need_all_columns);
int add_all_columns_to_stmt(const TableItem &table_item,
common::ObIArray<ObColumnRefRawExpr*> &column_exprs);
int add_all_columns_to_stmt_for_trigger(const TableItem &table_item,
common::ObIArray<ObColumnRefRawExpr*> &column_exprs);
int add_all_rowkey_columns_to_stmt(const TableItem &table_item,
common::ObIArray<ObColumnRefRawExpr*> &column_exprs);
common::ObIArray<ObColumnRefRawExpr*> &column_exprs);
int add_all_unique_key_columns_to_stmt(const TableItem &table_item,
common::ObIArray<ObColumnRefRawExpr*> &column_exprs);
int add_index_related_columns_to_stmt(const TableItem &table_item,
const uint64_t column_id,
common::ObIArray<ObColumnRefRawExpr*> &column_exprs);
@ -224,6 +228,11 @@ protected:
int add_select_items(ObSelectStmt &select_stmt, const ObIArray<SelectItem>& select_items);
int add_select_list_for_set_stmt(ObSelectStmt &select_stmt);
int add_all_lob_columns_to_stmt(const TableItem &table_item, ObIArray<ObColumnRefRawExpr*> &column_exprs);
int add_necessary_columns_for_minimal_mode(const TableItem &table_item, ObIArray<ObColumnRefRawExpr*> &column_exprs);
int add_udt_hidden_columns_for_minimal_mode(const TableItem &table_item,
const ObTableSchema *table_schema,
ObColumnRefRawExpr *col_expr,
ObIArray<ObColumnRefRawExpr *> &column_items);
protected:
int generate_insert_table_info(const TableItem &table_item,
ObInsertTableInfo &table_info,

View File

@ -387,6 +387,7 @@ int ObDeleteResolver::generate_delete_table_info(const TableItem &table_item)
uint64_t index_tid[OB_MAX_INDEX_PER_TABLE];
int64_t gindex_cnt = OB_MAX_INDEX_PER_TABLE;
int64_t binlog_row_image = ObBinlogRowImage::FULL;
bool is_need_all_columns = true;
if (OB_ISNULL(schema_checker_) || OB_ISNULL(params_.session_info_) ||
OB_ISNULL(allocator_) || OB_ISNULL(delete_stmt)) {
ret = OB_ERR_UNEXPECTED;
@ -411,13 +412,16 @@ int ObDeleteResolver::generate_delete_table_info(const TableItem &table_item)
LOG_WARN("failed to allocate table info", K(ret));
} else {
table_info = new(ptr) ObDeleteTableInfo();
const bool need_check_uk = true;
if (OB_FAIL(table_info->part_ids_.assign(base_table_item.part_ids_))) {
LOG_WARN("failed to assign part ids", K(ret));
} else if (!delete_stmt->has_instead_of_trigger()) {
// todo @zimiao error logging also need all columns ?
if (OB_FAIL(add_all_rowkey_columns_to_stmt(table_item, table_info->column_exprs_))) {
LOG_WARN("add all rowkey columns to stmt failed", K(ret));
} else if (need_all_columns(*table_schema, binlog_row_image)) {
} else if (OB_FAIL(need_all_columns(*table_schema, binlog_row_image, need_check_uk, is_need_all_columns))) {
LOG_WARN("call need_all_columns failed", K(ret), K(binlog_row_image));
} else if (is_need_all_columns) {
if (OB_FAIL(add_all_columns_to_stmt(table_item, table_info->column_exprs_))) {
LOG_WARN("fail to add all column to stmt", K(ret), K(table_item));
}

View File

@ -483,6 +483,68 @@ int ObUpdateResolver::resolve_table_list(const ParseNode &parse_tree)
return ret;
}
int ObUpdateResolver::is_table_has_unique_key(const ObTableSchema *table_schema,
bool &is_has_uk) const
{
int ret = OB_SUCCESS;
is_has_uk = false;
ObSchemaGetterGuard *schema_guard = NULL;
CK (OB_NOT_NULL(schema_checker_));
CK (OB_NOT_NULL(schema_guard = schema_checker_->get_schema_guard()));
ObSEArray<ObAuxTableMetaInfo, 16> simple_index_infos;
if (NULL == table_schema) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid table schema", K(table_schema));
} else if (OB_FAIL(table_schema->get_simple_index_infos(simple_index_infos))) {
LOG_WARN("get simple_index_infos failed", K(ret));
} else {
const uint64_t tenant_id = table_schema->get_tenant_id();
for (int64_t i = 0; OB_SUCC(ret) && !is_has_uk && i < simple_index_infos.count(); ++i) {
const ObTableSchema *index_table_schema = NULL;
if (OB_FAIL(schema_guard->get_table_schema(tenant_id,
simple_index_infos.at(i).table_id_, index_table_schema))) {
LOG_WARN("fail to get table schema", K(tenant_id),
K(simple_index_infos.at(i).table_id_), K(ret));
} else if (OB_ISNULL(index_table_schema)) {
ret = OB_TABLE_NOT_EXIST;
LOG_WARN("index table schema must not be NULL", K(ret));
} else if (index_table_schema->is_unique_index()) {
is_has_uk = true;
} else {
// not unique index, skip
}
}
}
return ret;
}
int ObUpdateResolver::check_unique_key_is_updated(const ObTableSchema *table_schema,
const common::ObIArray<ObAssignment> &assigns,
bool &is_updated) const
{
int ret = OB_SUCCESS;
is_updated = false;
ObSchemaGetterGuard *schema_guard = NULL;
if (OB_ISNULL(schema_guard = schema_checker_->get_schema_guard())) {
} else {
for (int64_t i = 0; OB_SUCC(ret) && !is_updated && i < assigns.count(); i++) {
// We cannot use col_expr->is_unique_key_column_ here, which may be false for a unique index column.
ObColumnRefRawExpr *col_expr = assigns.at(i).column_expr_;
bool is_unique_col = false;
if (OB_ISNULL(col_expr)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get unexpected null", K(ret));
} else if (OB_FAIL(table_schema->is_real_unique_index_column(*schema_guard, col_expr->get_column_id(), is_unique_col))) {
LOG_WARN("is_unique_key_column failed", K(ret));
} else if (is_unique_col) {
is_updated = true;
break;
} else { /*do nothing*/ }
}
}
return ret;
}
int ObUpdateResolver::generate_update_table_info(ObTableAssignment &table_assign)
{
int ret = OB_SUCCESS;
@ -494,6 +556,7 @@ int ObUpdateResolver::generate_update_table_info(ObTableAssignment &table_assign
uint64_t index_tid[OB_MAX_INDEX_PER_TABLE];
int64_t gindex_cnt = OB_MAX_INDEX_PER_TABLE;
int64_t binlog_row_image = ObBinlogRowImage::FULL;
bool is_need_all_columns = false;
if (OB_ISNULL(schema_checker_) || OB_ISNULL(params_.session_info_) ||
OB_ISNULL(allocator_) || OB_ISNULL(update_stmt)) {
ret = OB_ERR_UNEXPECTED;
@ -526,13 +589,35 @@ int ObUpdateResolver::generate_update_table_info(ObTableAssignment &table_assign
} else if (OB_FAIL(table_info->part_ids_.assign(table_item->get_base_table_item().part_ids_))) {
LOG_WARN("failed to assign part ids", K(ret));
} else if (!update_stmt->has_instead_of_trigger()) {
const bool is_binlog_full_mode = (ObBinlogRowImage::FULL == binlog_row_image) ? true : false;
is_need_all_columns = is_binlog_full_mode;
bool is_has_uk = false;
bool is_uk_updated = false;
if (OB_FAIL(add_all_rowkey_columns_to_stmt(*table_item, table_info->column_exprs_))) {
LOG_WARN("add all rowkey columns to stmt failed", K(ret));
} else if (need_all_columns(*table_schema, binlog_row_image) ||
update_stmt->is_error_logging()) {
} else if (!is_need_all_columns &&
OB_FAIL(is_table_has_unique_key(table_schema, is_has_uk))) {
LOG_WARN("is_table_has_unique_key failed", K(ret));
} else if (!is_need_all_columns &&
is_has_uk &&
OB_FAIL(check_unique_key_is_updated(table_schema, table_assign.assignments_, is_uk_updated))) {
// Check whether UK is updated only when binlog_row_image is not FULL.
LOG_WARN("check_unique_key_is_updated failed", K(ret));
} else if (!is_need_all_columns &&
OB_FAIL(need_all_columns(*table_schema, binlog_row_image, is_has_uk, is_need_all_columns))) {
LOG_WARN("call need_all_columns failed", K(ret), K(binlog_row_image));
} else if (is_need_all_columns ||
update_stmt->is_error_logging() ||
is_uk_updated) {
if (OB_FAIL(add_all_columns_to_stmt(*table_item, table_info->column_exprs_))) {
LOG_WARN("fail to add all column to stmt", K(ret), K(*table_item));
}
} else if (is_has_uk &&
OB_FAIL(add_all_unique_key_columns_to_stmt(*table_item, table_info->column_exprs_))) {
LOG_WARN("add all unique columns to stmt failed", K(ret));
} else if (!is_binlog_full_mode &&
OB_FAIL(add_necessary_columns_for_minimal_mode(*table_item, table_info->column_exprs_))) {
LOG_WARN("fail to add necessary columns for minimal mode", K(ret), K(*table_item));
} else {
for (int64_t i = 0; OB_SUCC(ret) && i < table_assign.assignments_.count(); ++i) {
ObAssignment &assign = table_assign.assignments_.at(i);
@ -541,6 +626,9 @@ int ObUpdateResolver::generate_update_table_info(ObTableAssignment &table_assign
} else if (OB_FAIL(add_index_related_columns_to_stmt(*table_item,
assign.column_expr_->get_column_id(), table_info->column_exprs_))) {
LOG_WARN("failed to add index columns", K(ret));
} else if (OB_FAIL(add_udt_hidden_columns_for_minimal_mode(*table_item,
table_schema, assign.column_expr_, table_info->column_exprs_))) {
LOG_WARN("failed to add udt hidden columns", K(ret));
} else { /*do nothing*/ }
}
}

View File

@ -58,6 +58,11 @@ private:
int check_safe_update_mode(ObUpdateStmt *update_stmt);
int resolve_update_constraints();
int generate_batched_stmt_info();
int is_table_has_unique_key(const ObTableSchema *table_schema,
bool &is_has_uk) const;
int check_unique_key_is_updated(const ObTableSchema *table_schema,
const common::ObIArray<ObAssignment> &assigns,
bool &is_updated) const;
};
} // namespace sql