diff --git a/src/share/ob_ddl_checksum.cpp b/src/share/ob_ddl_checksum.cpp index 77c9046788..86f45289f1 100644 --- a/src/share/ob_ddl_checksum.cpp +++ b/src/share/ob_ddl_checksum.cpp @@ -16,6 +16,8 @@ #include "lib/string/ob_sql_string.h" #include "share/inner_table/ob_inner_table_schema.h" #include "share/schema/ob_schema_utils.h" +#include "share/schema/ob_multi_version_schema_service.h" +#include "observer/ob_server_struct.h" using namespace oceanbase::common; using namespace oceanbase::common::hash; @@ -205,24 +207,61 @@ int ObDDLChecksumOperator::get_column_checksum(const ObSqlString &sql, const uin return ret; } +int ObDDLChecksumOperator::construct_tablet_column_map( + const uint64_t tablet_id, + const int64_t column_id, + common::hash::ObHashMap> &tablet_columns_map) +{ + int ret = OB_SUCCESS; + ObArray column_array; + + if (!tablet_columns_map.created()) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid arguments", K(ret), K(tablet_columns_map.created())); + } else if (OB_FAIL(tablet_columns_map.get_refactored(tablet_id, column_array))) { + if (OB_HASH_NOT_EXIST == ret) { // empty map, maybe first time to set map + ret = OB_SUCCESS; + if (OB_FAIL(column_array.push_back(column_id))) { + LOG_WARN("fail to push back column id to array", K(ret), K(tablet_id), K(column_id)); + } else if (OB_FAIL(tablet_columns_map.set_refactored(tablet_id, column_array, false))) { + LOG_WARN("fail to set column array", K(ret), K(tablet_id)); + } + } else { + LOG_WARN("fail to get column array from map", K(ret), K(tablet_id)); + } + } else if (OB_FAIL(column_array.push_back(column_id))) { + LOG_WARN("fail to push back column id to array", K(ret), K(tablet_id), K(column_id)); + } else if (OB_FAIL(tablet_columns_map.set_refactored(column_id, column_array, true))) { + LOG_WARN("fail to set tablet column map", K(ret), K(tablet_id), K(column_id)); + } + return ret; +} + int ObDDLChecksumOperator::get_tablet_checksum_status( const ObSqlString &sql, const uint64_t tenant_id, + ObIArray &batch_tablet_array, + ObIArray &column_ids, common::ObMySQLProxy &sql_proxy, - common::hash::ObHashMap &tablet_checksum_map) + common::hash::ObHashMap> &tablet_columns_map, + common::hash::ObHashMap &tablet_checksum_status_map) { int ret = OB_SUCCESS; SMART_VAR(ObMySQLProxy::MySQLResult, res) { sqlclient::ObMySQLResult *result = NULL; - if (!sql.is_valid() || !tablet_checksum_map.created()) { + if (!sql.is_valid() || column_ids.count() <= 0 || + !tablet_columns_map.created() || !tablet_checksum_status_map.created()) { ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid arguments", K(ret), K(sql), K(tablet_checksum_map.created())); + LOG_WARN("invalid arguments", + K(ret), K(sql), K(column_ids.count()), K(tablet_columns_map.created()), K(tablet_checksum_status_map.created())); } else if (OB_FAIL(sql_proxy.read(res, tenant_id, sql.ptr()))) { LOG_WARN("fail to execute sql", K(ret)); } else if (OB_ISNULL(result = res.get_result())) { ret = OB_ERR_UNEXPECTED; LOG_WARN("error unexpected, query result must not be NULL", K(ret)); } else { + tablet_columns_map.reuse(); + // 1. get tablet column checksums from sql result while (OB_SUCC(ret)) { if (OB_FAIL(result->next())) { if (OB_ITER_END == ret) { @@ -232,10 +271,29 @@ int ObDDLChecksumOperator::get_tablet_checksum_status( LOG_WARN("fail to get next row", K(ret)); } } else { - uint64_t tablet_id_id = 0; - EXTRACT_UINT_FIELD_MYSQL(*result, "task_id", tablet_id_id, uint64_t); - if (OB_FAIL(tablet_checksum_map.set_refactored(tablet_id_id, true))) { - LOG_WARN("fail to set column checksum to map", K(ret)); + int64_t column_id = 0; + uint64_t task_id = 0; + EXTRACT_INT_FIELD_MYSQL(*result, "column_id", column_id, int64_t); + EXTRACT_UINT_FIELD_MYSQL(*result, "task_id", task_id, uint64_t); + if (OB_FAIL(construct_tablet_column_map(task_id, column_id, tablet_columns_map))) { + LOG_WARN("fail to construnt tablet column map", K(ret), K(task_id)); + } + } + } + // 2. check tablet columns checksum exist or not + if (OB_SUCC(ret)) { + ObArray column_array; + bool checksum_status = true; + for (int64_t tablet_idx = 0; OB_SUCC(ret) && tablet_idx < batch_tablet_array.count(); ++tablet_idx) { + uint64_t task_id = batch_tablet_array.at(tablet_idx); + if (OB_FAIL(tablet_columns_map.get_refactored(task_id, column_array))) { + LOG_WARN("fail to get tablet columns", K(ret), K(task_id)); + } else if (column_array.count() != column_ids.count()) { // not all columns checksum reported + ret = OB_ERR_UNEXPECTED; + LOG_WARN("fail to check column checksum, tablet columns not match", + K(ret), K(column_array.count()), K(column_ids.count())); + } else if (OB_FAIL(tablet_checksum_status_map.set_refactored(task_id, checksum_status, true))) { // force update + LOG_WARN("fail to set tablet column checksum status", K(ret), K(task_id), K(checksum_status)); } } } @@ -249,26 +307,72 @@ int ObDDLChecksumOperator::get_tablet_checksum_record( const uint64_t execution_id, const uint64_t table_id, const int64_t ddl_task_id, + ObIArray &tablet_ids, ObMySQLProxy &sql_proxy, - common::hash::ObHashMap &tablet_checksum_map) + common::hash::ObHashMap &tablet_checksum_status_map) { int ret = OB_SUCCESS; ObSqlString sql; const uint64_t exec_tenant_id = ObSchemaUtils::get_exec_tenant_id(tenant_id); - if (OB_UNLIKELY(OB_INVALID_ID == tenant_id || OB_INVALID_ID == execution_id || OB_INVALID_ID == table_id - || OB_INVALID_ID == ddl_task_id || !tablet_checksum_map.created())) { + ObMultiVersionSchemaService *schema_service = GCTX.schema_service_; + ObSchemaGetterGuard schema_guard; + const ObTableSchema *table_schema = nullptr; + common::hash::ObHashMap> tablet_columns_map; + + if (OB_UNLIKELY(OB_INVALID_ID == tenant_id || OB_INVALID_ID == execution_id || + OB_INVALID_ID == table_id || OB_INVALID_ID == ddl_task_id || + tablet_ids.count() <= 0 || !tablet_checksum_status_map.created())) { ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid argument", K(ret), K(tenant_id), K(execution_id), K(table_id), K(ddl_task_id), - K(tablet_checksum_map.created())); - } else if (OB_FAIL(sql.assign_fmt( - "SELECT task_id FROM %s " - "WHERE execution_id = %ld AND tenant_id = %ld AND table_id = %ld AND ddl_task_id = %ld " - "ORDER BY task_id", OB_ALL_DDL_CHECKSUM_TNAME, - execution_id, ObSchemaUtils::get_extract_tenant_id(exec_tenant_id, tenant_id), - ObSchemaUtils::get_extract_schema_id(exec_tenant_id, table_id), ddl_task_id))) { - LOG_WARN("fail to assign fmt", K(ret)); - } else if (OB_FAIL(get_tablet_checksum_status(sql, tenant_id, sql_proxy, tablet_checksum_map))) { - LOG_WARN("fail to get column checksum", K(ret), K(sql)); + LOG_WARN("invalid argument", + K(ret), K(tenant_id), K(execution_id), K(table_id), K(ddl_task_id), K(tablet_checksum_status_map.created())); + } else if (OB_FAIL(schema_service->get_tenant_schema_guard(tenant_id, schema_guard))) { + LOG_WARN("get tenant schema guard failed", K(ret), K(tenant_id)); + } else if (OB_FAIL(schema_guard.get_table_schema(tenant_id, table_id, table_schema))) { + LOG_WARN("get table schema failed", K(ret), K(tenant_id), K(table_id)); + } else if (OB_ISNULL(table_schema)) { + ret = OB_TABLE_NOT_EXIST; + LOG_INFO("table not exit", K(ret), K(tenant_id), K(table_id)); + } else { + ObArray column_ids; + if (OB_FAIL(table_schema->get_multi_version_column_descs(column_ids))) { + LOG_WARN("fail to get column ids", K(ret), K(tenant_id), K(execution_id), K(ddl_task_id)); + } + int64_t batch_size = 100; + ObArray batch_tablet_array; + if (OB_FAIL(tablet_columns_map.create(batch_size, ObModIds::OB_SSTABLE_CREATE_INDEX))) { + LOG_WARN("fail to create tablet column map", K(ret)); + } + // check every tablet column checksum, task_id is equal to tablet_id + for (int64_t i = 0; OB_SUCC(ret) && i < tablet_ids.count(); ++i) { + uint64_t last_tablet_id_id = tablet_ids.at(i).id(); + if (OB_FAIL(batch_tablet_array.push_back(last_tablet_id_id))) { + LOG_WARN("fail to push back tablet_id_id", K(ret), K(tenant_id), K(execution_id), K(ddl_task_id)); + } else { + if ((i != 0 && i % batch_size == 0) /* reach batch size */ || i == tablet_ids.count() - 1 /* reach end */) { + if (OB_FAIL(sql.assign_fmt( + "SELECT task_id, column_id FROM %s " + "WHERE execution_id = %ld AND tenant_id = %ld AND table_id = %ld AND ddl_task_id = %ld AND task_id >= %ld and task_id <= %ld" + "ORDER BY task_id", + OB_ALL_DDL_CHECKSUM_TNAME, + execution_id, + ObSchemaUtils::get_extract_tenant_id(exec_tenant_id, tenant_id), + ObSchemaUtils::get_extract_schema_id(exec_tenant_id, table_id), + ddl_task_id, + batch_tablet_array.at(0), // first tablet_id in one batch + last_tablet_id_id))) { // last tablet id in one batch + LOG_WARN("fail to assign fmt", K(ret), K(tenant_id), K(execution_id), K(ddl_task_id)); + } else if (OB_FAIL(get_tablet_checksum_status( + sql, tenant_id, batch_tablet_array, column_ids, sql_proxy, tablet_columns_map, tablet_checksum_status_map))) { + LOG_WARN("fail to get column checksum", K(ret), K(sql)); + } else { + batch_tablet_array.reset(); + } + } + } + } + } + if (tablet_columns_map.created()) { + tablet_columns_map.destroy(); } return ret; } diff --git a/src/share/ob_ddl_checksum.h b/src/share/ob_ddl_checksum.h index a8e783df6a..119b9063d1 100644 --- a/src/share/ob_ddl_checksum.h +++ b/src/share/ob_ddl_checksum.h @@ -17,6 +17,7 @@ #include "lib/container/ob_array.h" #include "lib/hash/ob_hashmap.h" #include "share/ob_dml_sql_splicer.h" +#include "share/schema/ob_table_param.h" namespace oceanbase { @@ -75,6 +76,7 @@ public: const uint64_t execution_id, const uint64_t table_id, const int64_t ddl_task_id, + ObIArray &tablet_ids, ObMySQLProxy &sql_proxy, common::hash::ObHashMap &tablet_checksum_map); static int check_column_checksum( @@ -103,8 +105,15 @@ private: static int get_tablet_checksum_status( const ObSqlString &sql, const uint64_t tenant_id, + ObIArray &batch_tablet_ids, + ObIArray &column_ids, common::ObMySQLProxy &sql_proxy, - common::hash::ObHashMap &tablet_checksum_map); + common::hash::ObHashMap> &tablet_columns_map, + common::hash::ObHashMap &tablet_checksum_status_map); + static int construct_tablet_column_map( + const uint64_t tablet_id, + const int64_t column_id, + common::hash::ObHashMap> &tablet_columns_map); }; } // end namespace share diff --git a/src/share/ob_ddl_common.cpp b/src/share/ob_ddl_common.cpp index fe62e2b1f0..62e6a13619 100644 --- a/src/share/ob_ddl_common.cpp +++ b/src/share/ob_ddl_common.cpp @@ -1293,7 +1293,7 @@ int ObCheckTabletDataComplementOp::do_check_tablets_merge_status( } // handle every addr tablet for (hash::ObHashMap>::const_iterator ip_iter = ip_tablets_map.begin(); - ip_iter != ip_tablets_map.end() && OB_SUCC(ret); ++ip_iter) { + OB_SUCC(ret) && ip_iter != ip_tablets_map.end(); ++ip_iter) { const ObAddr & dest_ip = ip_iter->first; const ObArray &tablet_array = ip_iter->second; if (OB_FAIL(arg.tablet_ids_.assign(tablet_array))) { @@ -1307,13 +1307,14 @@ int ObCheckTabletDataComplementOp::do_check_tablets_merge_status( } } } - if (OB_SUCC(ret)) { // handle batch result - int tmp_ret = OB_SUCCESS; - common::ObArray return_ret_array; - if (OB_SUCCESS != (tmp_ret = proxy.wait_all(return_ret_array))) { - LOG_WARN("rpc proxy wait failed", K(tmp_ret)); - ret = OB_SUCCESS == ret ? tmp_ret : ret; - } else if (return_ret_array.count() != ip_tablets_map.size()) { + // handle batch result + int tmp_ret = OB_SUCCESS; + common::ObArray return_ret_array; + if (OB_SUCCESS != (tmp_ret = proxy.wait_all(return_ret_array))) { + LOG_WARN("rpc proxy wait failed", K(tmp_ret)); + ret = OB_SUCCESS == ret ? tmp_ret : ret; + } else if (OB_SUCC(ret)) { + if (return_ret_array.count() != ip_tablets_map.size()) { ret = OB_ERR_UNEXPECTED; LOG_WARN("rpc proxy rsp size not equal to send size", K(ret), K(return_ret_array.count()), K(ip_tablets_map.size())); } else { @@ -1398,7 +1399,7 @@ int ObCheckTabletDataComplementOp::check_tablet_merge_status( int64_t one_batch_build_succ_count = 0; for (int64_t i = 0; OB_SUCC(ret) && i < tablet_ids.count(); ++i) { const ObTabletID &tablet_id = tablet_ids.at(i); - if (construct_ls_tablet_map(tenant_id, tablet_id, ls_tablets_map)) { + if (OB_FAIL(construct_ls_tablet_map(tenant_id, tablet_id, ls_tablets_map))) { LOG_WARN("construct_tablet_ls_map fail", K(ret), K(tenant_id), K(tablet_id)); } else { if ((i != 0 && i % batch_size == 0) /* reach batch size */ || i == tablet_ids.count() - 1 /* reach end */) { @@ -1448,7 +1449,7 @@ int ObCheckTabletDataComplementOp::check_tablet_checksum_update_status( { int ret = OB_SUCCESS; tablet_checksum_status = false; - common::hash::ObHashMap tablet_checksum_map; + common::hash::ObHashMap tablet_checksum_status_map; int64_t tablet_count = tablet_ids.count(); if (OB_UNLIKELY(OB_INVALID_ID == tenant_id || OB_INVALID_ID == index_table_id || @@ -1456,24 +1457,25 @@ int ObCheckTabletDataComplementOp::check_tablet_checksum_update_status( ret = OB_INVALID_ARGUMENT; LOG_WARN("fail to check and wait complement task", K(ret), K(tenant_id), K(index_table_id), K(tablet_ids), K(execution_id), K(ddl_task_id)); - } else if (OB_FAIL(tablet_checksum_map.create(tablet_count, ObModIds::OB_SSTABLE_CREATE_INDEX))) { + } else if (OB_FAIL(tablet_checksum_status_map.create(tablet_count, ObModIds::OB_SSTABLE_CREATE_INDEX))) { LOG_WARN("fail to create column checksum map", K(ret)); } else if (OB_FAIL(ObDDLChecksumOperator::get_tablet_checksum_record( tenant_id, execution_id, index_table_id, ddl_task_id, + tablet_ids, GCTX.root_service_->get_sql_proxy(), - tablet_checksum_map))) { + tablet_checksum_status_map))) { LOG_WARN("fail to get tablet checksum status", K(ret), K(tenant_id), K(execution_id), K(index_table_id), K(ddl_task_id)); } else { - int tablet_idx = 0; + int64_t tablet_idx = 0; for (tablet_idx = 0; OB_SUCC(ret) && tablet_idx < tablet_count; ++tablet_idx) { const ObTabletID &tablet_id = tablet_ids.at(tablet_idx); uint64_t tablet_id_id = tablet_id.id(); bool status = false; - if (OB_FAIL(tablet_checksum_map.get_refactored(tablet_id_id, status))) { + if (OB_FAIL(tablet_checksum_status_map.get_refactored(tablet_id_id, status))) { LOG_WARN("fail to get tablet checksum record from map", K(ret), K(tablet_id_id)); } else if (!status) { break; @@ -1488,8 +1490,8 @@ int ObCheckTabletDataComplementOp::check_tablet_checksum_update_status( } } } - if (tablet_checksum_map.created()) { - tablet_checksum_map.destroy(); + if (tablet_checksum_status_map.created()) { + tablet_checksum_status_map.destroy(); } return ret; }