check column checksum reported after ddl inner sql finish
This commit is contained in:
		@ -156,6 +156,9 @@ int ObDDLRedefinitionSSTableBuildTask::process()
 | 
				
			|||||||
        if (OB_FAIL(user_sql_proxy->write(tenant_id_, sql_string.ptr(), affected_rows,
 | 
					        if (OB_FAIL(user_sql_proxy->write(tenant_id_, sql_string.ptr(), affected_rows,
 | 
				
			||||||
                oracle_mode ? ObCompatibilityMode::ORACLE_MODE : ObCompatibilityMode::MYSQL_MODE, &session_param, sql_exec_addr))) {
 | 
					                oracle_mode ? ObCompatibilityMode::ORACLE_MODE : ObCompatibilityMode::MYSQL_MODE, &session_param, sql_exec_addr))) {
 | 
				
			||||||
          LOG_WARN("fail to execute build replica sql", K(ret), K(tenant_id_));
 | 
					          LOG_WARN("fail to execute build replica sql", K(ret), K(tenant_id_));
 | 
				
			||||||
 | 
					        } else if (OB_FAIL(ObCheckTabletDataComplementOp::check_finish_report_checksum(tenant_id_, dest_table_id_, execution_id_, task_id_))) {
 | 
				
			||||||
 | 
					          LOG_WARN("fail to check sstable checksum_report_finish",
 | 
				
			||||||
 | 
					            K(ret), K(tenant_id_), K(dest_table_id_), K(execution_id_), K(task_id_));
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
      }
 | 
					      }
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
				
			|||||||
@ -200,8 +200,13 @@ int ObDDLSingleReplicaExecutor::check_build_end(bool &is_end, int64_t &ret_code)
 | 
				
			|||||||
        need_schedule |= build_infos.at(i).need_schedule();
 | 
					        need_schedule |= build_infos.at(i).need_schedule();
 | 
				
			||||||
      }
 | 
					      }
 | 
				
			||||||
      if (OB_SUCC(ret) && build_infos.count() == succ_cnt) {
 | 
					      if (OB_SUCC(ret) && build_infos.count() == succ_cnt) {
 | 
				
			||||||
 | 
					        if (OB_FAIL(ObCheckTabletDataComplementOp::check_finish_report_checksum(
 | 
				
			||||||
 | 
					              tenant_id_, dest_table_id_, execution_id_, task_id_))) {
 | 
				
			||||||
 | 
					          LOG_WARN("fail to check sstable checksum_report_finish",
 | 
				
			||||||
 | 
					            K(ret), K(tenant_id_), K(dest_table_id_), K(execution_id_), K(task_id_));
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
        is_end = true;
 | 
					        is_end = true;
 | 
				
			||||||
        ret_code = OB_SUCCESS;
 | 
					        ret_code = ret;
 | 
				
			||||||
      }
 | 
					      }
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
  }
 | 
					  }
 | 
				
			||||||
 | 
				
			|||||||
@ -132,6 +132,9 @@ int ObIndexSSTableBuildTask::process()
 | 
				
			|||||||
      } else if (OB_FAIL(user_sql_proxy->write(tenant_id_, sql_string.ptr(), affected_rows,
 | 
					      } else if (OB_FAIL(user_sql_proxy->write(tenant_id_, sql_string.ptr(), affected_rows,
 | 
				
			||||||
                  oracle_mode ? ObCompatibilityMode::ORACLE_MODE : ObCompatibilityMode::MYSQL_MODE, &session_param, sql_exec_addr))) {
 | 
					                  oracle_mode ? ObCompatibilityMode::ORACLE_MODE : ObCompatibilityMode::MYSQL_MODE, &session_param, sql_exec_addr))) {
 | 
				
			||||||
        LOG_WARN("fail to execute build replica sql", K(ret), K(tenant_id_));
 | 
					        LOG_WARN("fail to execute build replica sql", K(ret), K(tenant_id_));
 | 
				
			||||||
 | 
					      } else if (OB_FAIL(ObCheckTabletDataComplementOp::check_finish_report_checksum(tenant_id_, dest_table_id_, execution_id_, task_id_))) {
 | 
				
			||||||
 | 
					        LOG_WARN("fail to check sstable checksum_report_finish",
 | 
				
			||||||
 | 
					          K(ret), K(tenant_id_), K(dest_table_id_), K(execution_id_), K(task_id_));
 | 
				
			||||||
      }
 | 
					      }
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
  }
 | 
					  }
 | 
				
			||||||
 | 
				
			|||||||
@ -207,60 +207,26 @@ int ObDDLChecksumOperator::get_column_checksum(const ObSqlString &sql, const uin
 | 
				
			|||||||
  return ret;
 | 
					  return ret;
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
int ObDDLChecksumOperator::construct_tablet_column_map(
 | 
					 | 
				
			||||||
  const uint64_t tablet_id,
 | 
					 | 
				
			||||||
  const int64_t column_id,
 | 
					 | 
				
			||||||
  common::hash::ObHashMap<uint64_t, ObArray<int64_t>> &tablet_columns_map)
 | 
					 | 
				
			||||||
{
 | 
					 | 
				
			||||||
  int ret = OB_SUCCESS;
 | 
					 | 
				
			||||||
  ObArray<int64_t> column_array;
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
  if (!tablet_columns_map.created()) {
 | 
					 | 
				
			||||||
    ret = OB_INVALID_ARGUMENT;
 | 
					 | 
				
			||||||
    LOG_WARN("invalid arguments", K(ret), K(tablet_columns_map.created()));
 | 
					 | 
				
			||||||
  } else if (OB_FAIL(tablet_columns_map.get_refactored(tablet_id, column_array))) {
 | 
					 | 
				
			||||||
    if (OB_HASH_NOT_EXIST == ret) { // empty map, maybe first time to set map
 | 
					 | 
				
			||||||
      ret = OB_SUCCESS;
 | 
					 | 
				
			||||||
      if (OB_FAIL(column_array.push_back(column_id))) {
 | 
					 | 
				
			||||||
        LOG_WARN("fail to push back column id to array", K(ret), K(tablet_id), K(column_id));
 | 
					 | 
				
			||||||
      } else if (OB_FAIL(tablet_columns_map.set_refactored(tablet_id, column_array, false))) {
 | 
					 | 
				
			||||||
        LOG_WARN("fail to set column array", K(ret), K(tablet_id));
 | 
					 | 
				
			||||||
      }
 | 
					 | 
				
			||||||
    } else {
 | 
					 | 
				
			||||||
      LOG_WARN("fail to get column array from map", K(ret), K(tablet_id));
 | 
					 | 
				
			||||||
    }
 | 
					 | 
				
			||||||
  } else if (OB_FAIL(column_array.push_back(column_id))) {
 | 
					 | 
				
			||||||
    LOG_WARN("fail to push back column id to array", K(ret), K(tablet_id), K(column_id));
 | 
					 | 
				
			||||||
  } else if (OB_FAIL(tablet_columns_map.set_refactored(column_id, column_array, true))) {
 | 
					 | 
				
			||||||
    LOG_WARN("fail to set tablet column map", K(ret), K(tablet_id), K(column_id));
 | 
					 | 
				
			||||||
  }
 | 
					 | 
				
			||||||
  return ret;
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
int ObDDLChecksumOperator::get_tablet_checksum_status(
 | 
					int ObDDLChecksumOperator::get_tablet_checksum_status(
 | 
				
			||||||
    const ObSqlString &sql,
 | 
					    const ObSqlString &sql,
 | 
				
			||||||
    const uint64_t tenant_id,
 | 
					    const uint64_t tenant_id,
 | 
				
			||||||
    ObIArray<uint64_t> &batch_tablet_array,
 | 
					    ObIArray<uint64_t> &batch_tablet_array,
 | 
				
			||||||
    ObIArray<ObColDesc> &column_ids,
 | 
					 | 
				
			||||||
    common::ObMySQLProxy &sql_proxy,
 | 
					    common::ObMySQLProxy &sql_proxy,
 | 
				
			||||||
    common::hash::ObHashMap<uint64_t, ObArray<int64_t>> &tablet_columns_map,
 | 
					 | 
				
			||||||
    common::hash::ObHashMap<uint64_t, bool> &tablet_checksum_status_map)
 | 
					    common::hash::ObHashMap<uint64_t, bool> &tablet_checksum_status_map)
 | 
				
			||||||
{
 | 
					{
 | 
				
			||||||
  int ret = OB_SUCCESS;
 | 
					  int ret = OB_SUCCESS;
 | 
				
			||||||
  SMART_VAR(ObMySQLProxy::MySQLResult, res) {
 | 
					  SMART_VAR(ObMySQLProxy::MySQLResult, res) {
 | 
				
			||||||
    sqlclient::ObMySQLResult *result = NULL;
 | 
					    sqlclient::ObMySQLResult *result = NULL;
 | 
				
			||||||
    if (!sql.is_valid() || column_ids.count() <= 0 ||
 | 
					    if (!sql.is_valid() || !tablet_checksum_status_map.created()) {
 | 
				
			||||||
        !tablet_columns_map.created() || !tablet_checksum_status_map.created()) {
 | 
					 | 
				
			||||||
      ret = OB_INVALID_ARGUMENT;
 | 
					      ret = OB_INVALID_ARGUMENT;
 | 
				
			||||||
      LOG_WARN("invalid arguments",
 | 
					      LOG_WARN("invalid arguments", K(ret), K(sql), K(tablet_checksum_status_map.created()));
 | 
				
			||||||
        K(ret), K(sql), K(column_ids.count()), K(tablet_columns_map.created()), K(tablet_checksum_status_map.created()));
 | 
					 | 
				
			||||||
    } else if (OB_FAIL(sql_proxy.read(res, tenant_id, sql.ptr()))) {
 | 
					    } else if (OB_FAIL(sql_proxy.read(res, tenant_id, sql.ptr()))) {
 | 
				
			||||||
      LOG_WARN("fail to execute sql", K(ret));
 | 
					      LOG_WARN("fail to execute sql", K(ret));
 | 
				
			||||||
    } else if (OB_ISNULL(result = res.get_result())) {
 | 
					    } else if (OB_ISNULL(result = res.get_result())) {
 | 
				
			||||||
      ret = OB_ERR_UNEXPECTED;
 | 
					      ret = OB_ERR_UNEXPECTED;
 | 
				
			||||||
      LOG_WARN("error unexpected, query result must not be NULL", K(ret));
 | 
					      LOG_WARN("error unexpected, query result must not be NULL", K(ret));
 | 
				
			||||||
    } else {
 | 
					    } else {
 | 
				
			||||||
      tablet_columns_map.reuse();
 | 
					      bool force_update = true;
 | 
				
			||||||
      // 1. get tablet column checksums from sql result
 | 
					      // 1. get tablet column checksums from sql result
 | 
				
			||||||
      while (OB_SUCC(ret)) {
 | 
					      while (OB_SUCC(ret)) {
 | 
				
			||||||
        if (OB_FAIL(result->next())) {
 | 
					        if (OB_FAIL(result->next())) {
 | 
				
			||||||
@ -271,29 +237,13 @@ int ObDDLChecksumOperator::get_tablet_checksum_status(
 | 
				
			|||||||
            LOG_WARN("fail to get next row", K(ret));
 | 
					            LOG_WARN("fail to get next row", K(ret));
 | 
				
			||||||
          }
 | 
					          }
 | 
				
			||||||
        } else {
 | 
					        } else {
 | 
				
			||||||
          int64_t column_id = 0;
 | 
					          // int64_t column_id = 0;
 | 
				
			||||||
          uint64_t task_id = 0;
 | 
					          int64_t task_id = 0;
 | 
				
			||||||
          EXTRACT_INT_FIELD_MYSQL(*result, "column_id", column_id, int64_t);
 | 
					          // EXTRACT_INT_FIELD_MYSQL(*result, "column_id", column_id, int64_t);
 | 
				
			||||||
          EXTRACT_UINT_FIELD_MYSQL(*result, "task_id", task_id, uint64_t);
 | 
					          EXTRACT_INT_FIELD_MYSQL(*result, "task_id", task_id, int64_t);
 | 
				
			||||||
          if (OB_FAIL(construct_tablet_column_map(task_id, column_id, tablet_columns_map))) {
 | 
					          if (OB_SUCC(ret)
 | 
				
			||||||
            LOG_WARN("fail to construnt tablet column map", K(ret), K(task_id));
 | 
					              && OB_FAIL(tablet_checksum_status_map.set_refactored(task_id, true, force_update))) {
 | 
				
			||||||
          }
 | 
					            LOG_WARN("fail to set tablet column map", K(ret), K(task_id));
 | 
				
			||||||
        }
 | 
					 | 
				
			||||||
      }
 | 
					 | 
				
			||||||
      // 2. check tablet columns checksum exist or not
 | 
					 | 
				
			||||||
      if (OB_SUCC(ret)) {
 | 
					 | 
				
			||||||
        ObArray<int64_t> column_array;
 | 
					 | 
				
			||||||
        bool checksum_status = true;
 | 
					 | 
				
			||||||
        for (int64_t tablet_idx = 0; OB_SUCC(ret) && tablet_idx < batch_tablet_array.count(); ++tablet_idx) {
 | 
					 | 
				
			||||||
          uint64_t task_id = batch_tablet_array.at(tablet_idx);
 | 
					 | 
				
			||||||
          if (OB_FAIL(tablet_columns_map.get_refactored(task_id, column_array))) {
 | 
					 | 
				
			||||||
            LOG_WARN("fail to get tablet columns", K(ret), K(task_id));
 | 
					 | 
				
			||||||
          } else if (column_array.count() != column_ids.count()) { // not all columns checksum reported
 | 
					 | 
				
			||||||
            ret = OB_ERR_UNEXPECTED;
 | 
					 | 
				
			||||||
            LOG_WARN("fail to check column checksum, tablet columns not match",
 | 
					 | 
				
			||||||
              K(ret), K(column_array.count()), K(column_ids.count()));
 | 
					 | 
				
			||||||
          } else if (OB_FAIL(tablet_checksum_status_map.set_refactored(task_id, checksum_status, true))) { // force update
 | 
					 | 
				
			||||||
            LOG_WARN("fail to set tablet column checksum status", K(ret), K(task_id), K(checksum_status));
 | 
					 | 
				
			||||||
          }
 | 
					          }
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
      }
 | 
					      }
 | 
				
			||||||
@ -314,34 +264,18 @@ int ObDDLChecksumOperator::get_tablet_checksum_record(
 | 
				
			|||||||
  int ret = OB_SUCCESS;
 | 
					  int ret = OB_SUCCESS;
 | 
				
			||||||
  ObSqlString sql;
 | 
					  ObSqlString sql;
 | 
				
			||||||
  const uint64_t exec_tenant_id = ObSchemaUtils::get_exec_tenant_id(tenant_id);
 | 
					  const uint64_t exec_tenant_id = ObSchemaUtils::get_exec_tenant_id(tenant_id);
 | 
				
			||||||
  ObMultiVersionSchemaService *schema_service = GCTX.schema_service_;
 | 
					 | 
				
			||||||
  ObSchemaGetterGuard schema_guard;
 | 
					 | 
				
			||||||
  const ObTableSchema *table_schema = nullptr;
 | 
					 | 
				
			||||||
  common::hash::ObHashMap<uint64_t, ObArray<int64_t>> tablet_columns_map;
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
  if (OB_UNLIKELY(OB_INVALID_ID == tenant_id || OB_INVALID_ID == execution_id ||
 | 
					  if (OB_UNLIKELY(OB_INVALID_ID == tenant_id || OB_INVALID_ID == execution_id ||
 | 
				
			||||||
                  OB_INVALID_ID == table_id || OB_INVALID_ID == ddl_task_id ||
 | 
					                  OB_INVALID_ID == table_id || OB_INVALID_ID == ddl_task_id ||
 | 
				
			||||||
                  tablet_ids.count() <= 0 || !tablet_checksum_status_map.created())) {
 | 
					                  tablet_ids.count() <= 0 || !tablet_checksum_status_map.created())) {
 | 
				
			||||||
    ret = OB_INVALID_ARGUMENT;
 | 
					    ret = OB_INVALID_ARGUMENT;
 | 
				
			||||||
    LOG_WARN("invalid argument",
 | 
					    LOG_WARN("invalid argument",
 | 
				
			||||||
      K(ret), K(tenant_id), K(execution_id), K(table_id), K(ddl_task_id), K(tablet_checksum_status_map.created()));
 | 
					      K(ret), K(tenant_id), K(execution_id), K(table_id), K(ddl_task_id),
 | 
				
			||||||
  } else if (OB_FAIL(schema_service->get_tenant_schema_guard(tenant_id, schema_guard))) {
 | 
					      K(tablet_checksum_status_map.created()));
 | 
				
			||||||
    LOG_WARN("get tenant schema guard failed", K(ret), K(tenant_id));
 | 
					 | 
				
			||||||
  } else if (OB_FAIL(schema_guard.get_table_schema(tenant_id, table_id, table_schema))) {
 | 
					 | 
				
			||||||
    LOG_WARN("get table schema failed", K(ret), K(tenant_id), K(table_id));
 | 
					 | 
				
			||||||
  } else if (OB_ISNULL(table_schema)) {
 | 
					 | 
				
			||||||
    ret = OB_TABLE_NOT_EXIST;
 | 
					 | 
				
			||||||
    LOG_INFO("table not exit", K(ret), K(tenant_id), K(table_id));
 | 
					 | 
				
			||||||
  } else {
 | 
					  } else {
 | 
				
			||||||
    ObArray<ObColDesc> column_ids;
 | 
					 | 
				
			||||||
    if (OB_FAIL(table_schema->get_multi_version_column_descs(column_ids))) {
 | 
					 | 
				
			||||||
      LOG_WARN("fail to get column ids", K(ret), K(tenant_id), K(execution_id), K(ddl_task_id));
 | 
					 | 
				
			||||||
    }
 | 
					 | 
				
			||||||
    int64_t batch_size = 100;
 | 
					    int64_t batch_size = 100;
 | 
				
			||||||
    ObArray<uint64_t> batch_tablet_array;
 | 
					    ObArray<uint64_t> batch_tablet_array;
 | 
				
			||||||
    if (OB_FAIL(tablet_columns_map.create(batch_size, ObModIds::OB_SSTABLE_CREATE_INDEX))) {
 | 
					
 | 
				
			||||||
      LOG_WARN("fail to create tablet column map", K(ret));
 | 
					 | 
				
			||||||
    }
 | 
					 | 
				
			||||||
    // check every tablet column checksum, task_id is equal to tablet_id
 | 
					    // check every tablet column checksum, task_id is equal to tablet_id
 | 
				
			||||||
    for (int64_t i = 0; OB_SUCC(ret) && i < tablet_ids.count(); ++i) {
 | 
					    for (int64_t i = 0; OB_SUCC(ret) && i < tablet_ids.count(); ++i) {
 | 
				
			||||||
      uint64_t last_tablet_id_id = tablet_ids.at(i).id();
 | 
					      uint64_t last_tablet_id_id = tablet_ids.at(i).id();
 | 
				
			||||||
@ -350,9 +284,9 @@ int ObDDLChecksumOperator::get_tablet_checksum_record(
 | 
				
			|||||||
      } else {
 | 
					      } else {
 | 
				
			||||||
        if ((i != 0 && i % batch_size == 0) /* reach batch size */ || i == tablet_ids.count() - 1 /* reach end */) {
 | 
					        if ((i != 0 && i % batch_size == 0) /* reach batch size */ || i == tablet_ids.count() - 1 /* reach end */) {
 | 
				
			||||||
          if (OB_FAIL(sql.assign_fmt(
 | 
					          if (OB_FAIL(sql.assign_fmt(
 | 
				
			||||||
              "SELECT task_id, column_id FROM %s "
 | 
					              "SELECT task_id FROM %s "
 | 
				
			||||||
              "WHERE execution_id = %ld AND tenant_id = %ld AND table_id = %ld AND ddl_task_id = %ld AND task_id >= %ld and task_id <= %ld "
 | 
					              "WHERE execution_id = %ld AND tenant_id = %ld AND table_id = %ld AND ddl_task_id = %ld AND task_id >= %ld and task_id <= %ld "
 | 
				
			||||||
              "ORDER BY task_id",
 | 
					              "GROUP BY task_id",
 | 
				
			||||||
              OB_ALL_DDL_CHECKSUM_TNAME,
 | 
					              OB_ALL_DDL_CHECKSUM_TNAME,
 | 
				
			||||||
              execution_id,
 | 
					              execution_id,
 | 
				
			||||||
              ObSchemaUtils::get_extract_tenant_id(exec_tenant_id, tenant_id),
 | 
					              ObSchemaUtils::get_extract_tenant_id(exec_tenant_id, tenant_id),
 | 
				
			||||||
@ -362,7 +296,7 @@ int ObDDLChecksumOperator::get_tablet_checksum_record(
 | 
				
			|||||||
              last_tablet_id_id))) {    // last  tablet id in one batch
 | 
					              last_tablet_id_id))) {    // last  tablet id in one batch
 | 
				
			||||||
            LOG_WARN("fail to assign fmt", K(ret), K(tenant_id), K(execution_id), K(ddl_task_id));
 | 
					            LOG_WARN("fail to assign fmt", K(ret), K(tenant_id), K(execution_id), K(ddl_task_id));
 | 
				
			||||||
          } else if (OB_FAIL(get_tablet_checksum_status(
 | 
					          } else if (OB_FAIL(get_tablet_checksum_status(
 | 
				
			||||||
              sql, tenant_id, batch_tablet_array, column_ids, sql_proxy, tablet_columns_map, tablet_checksum_status_map))) {
 | 
					              sql, tenant_id, batch_tablet_array, sql_proxy, tablet_checksum_status_map))) {
 | 
				
			||||||
            LOG_WARN("fail to get column checksum", K(ret), K(sql));
 | 
					            LOG_WARN("fail to get column checksum", K(ret), K(sql));
 | 
				
			||||||
          } else {
 | 
					          } else {
 | 
				
			||||||
            batch_tablet_array.reset();
 | 
					            batch_tablet_array.reset();
 | 
				
			||||||
@ -371,9 +305,6 @@ int ObDDLChecksumOperator::get_tablet_checksum_record(
 | 
				
			|||||||
      }
 | 
					      }
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
  }
 | 
					  }
 | 
				
			||||||
  if (tablet_columns_map.created()) {
 | 
					 | 
				
			||||||
    tablet_columns_map.destroy();
 | 
					 | 
				
			||||||
  }
 | 
					 | 
				
			||||||
  return ret;
 | 
					  return ret;
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
				
			|||||||
@ -108,14 +108,8 @@ private:
 | 
				
			|||||||
      const ObSqlString &sql,
 | 
					      const ObSqlString &sql,
 | 
				
			||||||
      const uint64_t tenant_id,
 | 
					      const uint64_t tenant_id,
 | 
				
			||||||
      ObIArray<uint64_t> &batch_tablet_ids,
 | 
					      ObIArray<uint64_t> &batch_tablet_ids,
 | 
				
			||||||
      ObIArray<ObColDesc> &column_ids,
 | 
					 | 
				
			||||||
      common::ObMySQLProxy &sql_proxy,
 | 
					      common::ObMySQLProxy &sql_proxy,
 | 
				
			||||||
      common::hash::ObHashMap<uint64_t, ObArray<int64_t>> &tablet_columns_map,
 | 
					 | 
				
			||||||
      common::hash::ObHashMap<uint64_t, bool> &tablet_checksum_status_map);
 | 
					      common::hash::ObHashMap<uint64_t, bool> &tablet_checksum_status_map);
 | 
				
			||||||
  static int construct_tablet_column_map(
 | 
					 | 
				
			||||||
      const uint64_t tablet_id,
 | 
					 | 
				
			||||||
      const int64_t column_id,
 | 
					 | 
				
			||||||
      common::hash::ObHashMap<uint64_t, ObArray<int64_t>> &tablet_columns_map);
 | 
					 | 
				
			||||||
};
 | 
					};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
}  // end namespace share
 | 
					}  // end namespace share
 | 
				
			||||||
 | 
				
			|||||||
@ -1540,10 +1540,10 @@ int ObCheckTabletDataComplementOp::check_tablet_checksum_update_status(
 | 
				
			|||||||
  const uint64_t ddl_task_id,
 | 
					  const uint64_t ddl_task_id,
 | 
				
			||||||
  const int64_t execution_id,
 | 
					  const int64_t execution_id,
 | 
				
			||||||
  ObIArray<ObTabletID> &tablet_ids,
 | 
					  ObIArray<ObTabletID> &tablet_ids,
 | 
				
			||||||
  bool &tablet_checksum_status)
 | 
					  bool &is_checksums_all_report)
 | 
				
			||||||
{
 | 
					{
 | 
				
			||||||
  int ret = OB_SUCCESS;
 | 
					  int ret = OB_SUCCESS;
 | 
				
			||||||
  tablet_checksum_status = false;
 | 
					  is_checksums_all_report = false;
 | 
				
			||||||
  common::hash::ObHashMap<uint64_t, bool> tablet_checksum_status_map;
 | 
					  common::hash::ObHashMap<uint64_t, bool> tablet_checksum_status_map;
 | 
				
			||||||
  int64_t tablet_count = tablet_ids.count();
 | 
					  int64_t tablet_count = tablet_ids.count();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@ -1565,6 +1565,7 @@ int ObCheckTabletDataComplementOp::check_tablet_checksum_update_status(
 | 
				
			|||||||
    LOG_WARN("fail to get tablet checksum status",
 | 
					    LOG_WARN("fail to get tablet checksum status",
 | 
				
			||||||
      K(ret), K(tenant_id), K(execution_id), K(index_table_id), K(ddl_task_id));
 | 
					      K(ret), K(tenant_id), K(execution_id), K(index_table_id), K(ddl_task_id));
 | 
				
			||||||
  } else {
 | 
					  } else {
 | 
				
			||||||
 | 
					    int64_t report_checksum_cnt = 0;
 | 
				
			||||||
    int64_t tablet_idx = 0;
 | 
					    int64_t tablet_idx = 0;
 | 
				
			||||||
    for (tablet_idx = 0; OB_SUCC(ret) && tablet_idx < tablet_count; ++tablet_idx) {
 | 
					    for (tablet_idx = 0; OB_SUCC(ret) && tablet_idx < tablet_count; ++tablet_idx) {
 | 
				
			||||||
      const ObTabletID &tablet_id = tablet_ids.at(tablet_idx);
 | 
					      const ObTabletID &tablet_id = tablet_ids.at(tablet_idx);
 | 
				
			||||||
@ -1572,16 +1573,23 @@ int ObCheckTabletDataComplementOp::check_tablet_checksum_update_status(
 | 
				
			|||||||
      bool status = false;
 | 
					      bool status = false;
 | 
				
			||||||
      if (OB_FAIL(tablet_checksum_status_map.get_refactored(tablet_id_id, status))) {
 | 
					      if (OB_FAIL(tablet_checksum_status_map.get_refactored(tablet_id_id, status))) {
 | 
				
			||||||
        LOG_WARN("fail to get tablet checksum record from map", K(ret), K(tablet_id_id));
 | 
					        LOG_WARN("fail to get tablet checksum record from map", K(ret), K(tablet_id_id));
 | 
				
			||||||
 | 
					        if (OB_HASH_NOT_EXIST == ret) {
 | 
				
			||||||
 | 
					          ret = OB_SUCCESS;
 | 
				
			||||||
 | 
					          break;
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
      } else if (!status) {
 | 
					      } else if (!status) {
 | 
				
			||||||
        break;
 | 
					        break;
 | 
				
			||||||
 | 
					      } else {
 | 
				
			||||||
 | 
					        report_checksum_cnt++;
 | 
				
			||||||
      }
 | 
					      }
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
    if (OB_SUCC(ret)) {
 | 
					    if (OB_SUCC(ret)) {
 | 
				
			||||||
      if (tablet_idx == tablet_count) {
 | 
					      if (report_checksum_cnt == tablet_count) {
 | 
				
			||||||
        tablet_checksum_status = true;
 | 
					        is_checksums_all_report = true;
 | 
				
			||||||
      } else {
 | 
					      } else {
 | 
				
			||||||
        ret = OB_EAGAIN;
 | 
					        ret = OB_EAGAIN;
 | 
				
			||||||
        LOG_INFO("not all tablet has update checksum, will re-check", K(ret), K(tablet_idx), K(tablet_count));
 | 
					        LOG_INFO("not all tablet has update checksum, will re-check",
 | 
				
			||||||
 | 
					          K(ret), K(tablet_idx), K(tablet_count), K(is_checksums_all_report));
 | 
				
			||||||
      }
 | 
					      }
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
  }
 | 
					  }
 | 
				
			||||||
@ -1607,7 +1615,7 @@ int ObCheckTabletDataComplementOp::check_all_tablet_sstable_status(
 | 
				
			|||||||
{
 | 
					{
 | 
				
			||||||
  int ret = OB_SUCCESS;
 | 
					  int ret = OB_SUCCESS;
 | 
				
			||||||
  ObArray<ObTabletID> dest_tablet_ids;
 | 
					  ObArray<ObTabletID> dest_tablet_ids;
 | 
				
			||||||
  bool tablet_checksum_status = false;
 | 
					  bool is_checksums_all_report = false;
 | 
				
			||||||
  is_all_sstable_build_finished = false;
 | 
					  is_all_sstable_build_finished = false;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  if (OB_UNLIKELY(OB_INVALID_ID == tenant_id || OB_INVALID_ID == index_table_id || OB_INVALID_TIMESTAMP == snapshot_version ||
 | 
					  if (OB_UNLIKELY(OB_INVALID_ID == tenant_id || OB_INVALID_ID == index_table_id || OB_INVALID_TIMESTAMP == snapshot_version ||
 | 
				
			||||||
@ -1620,15 +1628,40 @@ int ObCheckTabletDataComplementOp::check_all_tablet_sstable_status(
 | 
				
			|||||||
    LOG_WARN("fail to check tablet merge status.", K(ret), K(tenant_id), K(dest_tablet_ids), K(snapshot_version));
 | 
					    LOG_WARN("fail to check tablet merge status.", K(ret), K(tenant_id), K(dest_tablet_ids), K(snapshot_version));
 | 
				
			||||||
  } else {
 | 
					  } else {
 | 
				
			||||||
    if (is_all_sstable_build_finished) {
 | 
					    if (is_all_sstable_build_finished) {
 | 
				
			||||||
      if (OB_FAIL(check_tablet_checksum_update_status(tenant_id, index_table_id, ddl_task_id, execution_id, dest_tablet_ids, tablet_checksum_status))) {
 | 
					      if (OB_FAIL(check_tablet_checksum_update_status(tenant_id, index_table_id, ddl_task_id, execution_id, dest_tablet_ids, is_checksums_all_report))) {
 | 
				
			||||||
        LOG_WARN("fail to check tablet checksum update status.", K(ret), K(tenant_id), K(dest_tablet_ids), K(execution_id));
 | 
					        LOG_WARN("fail to check tablet checksum update status.", K(ret), K(tenant_id), K(dest_tablet_ids), K(execution_id));
 | 
				
			||||||
      }
 | 
					      }
 | 
				
			||||||
      is_all_sstable_build_finished &= tablet_checksum_status;
 | 
					      is_all_sstable_build_finished &= is_checksums_all_report;
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
  }
 | 
					  }
 | 
				
			||||||
  return ret;
 | 
					  return ret;
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					int ObCheckTabletDataComplementOp::check_finish_report_checksum(
 | 
				
			||||||
 | 
					  const uint64_t tenant_id,
 | 
				
			||||||
 | 
					  const uint64_t index_table_id,
 | 
				
			||||||
 | 
					  const int64_t execution_id,
 | 
				
			||||||
 | 
					  const uint64_t ddl_task_id)
 | 
				
			||||||
 | 
					{
 | 
				
			||||||
 | 
					  int ret = OB_SUCCESS;
 | 
				
			||||||
 | 
					  bool is_checksums_all_report = false;
 | 
				
			||||||
 | 
					  ObArray<ObTabletID> dest_tablet_ids;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  if (OB_UNLIKELY(OB_INVALID_ID == tenant_id || OB_INVALID_ID == index_table_id ||
 | 
				
			||||||
 | 
					      ddl_task_id == OB_INVALID_ID || execution_id < 0)) {
 | 
				
			||||||
 | 
					    ret = OB_INVALID_ARGUMENT;
 | 
				
			||||||
 | 
					    LOG_WARN("fail to check report checksum finished", K(ret), K(tenant_id), K(index_table_id), K(execution_id), K(ddl_task_id));
 | 
				
			||||||
 | 
					  } else if (OB_FAIL(ObDDLUtil::get_tablets(tenant_id, index_table_id, dest_tablet_ids))) {
 | 
				
			||||||
 | 
					    LOG_WARN("fail to get tablets", K(ret), K(tenant_id), K(index_table_id));
 | 
				
			||||||
 | 
					  } else if (OB_FAIL(check_tablet_checksum_update_status(tenant_id, index_table_id, ddl_task_id, execution_id, dest_tablet_ids, is_checksums_all_report))) {
 | 
				
			||||||
 | 
					    LOG_WARN("fail to check tablet checksum update status, maybe EAGAIN", K(ret), K(tenant_id), K(dest_tablet_ids), K(execution_id));
 | 
				
			||||||
 | 
					  } else if (!is_checksums_all_report) {
 | 
				
			||||||
 | 
					    ret = OB_ERR_UNEXPECTED;
 | 
				
			||||||
 | 
					    LOG_WARN("tablets checksum not all report!", K(is_checksums_all_report), K(ret));
 | 
				
			||||||
 | 
					  }
 | 
				
			||||||
 | 
					  return ret;
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
/*
 | 
					/*
 | 
				
			||||||
 * This func is used to check duplicate data completement inner sql
 | 
					 * This func is used to check duplicate data completement inner sql
 | 
				
			||||||
 * if has running inner sql, we should wait until finished. But
 | 
					 * if has running inner sql, we should wait until finished. But
 | 
				
			||||||
 | 
				
			|||||||
@ -367,6 +367,11 @@ public:
 | 
				
			|||||||
      const int64_t schema_version,
 | 
					      const int64_t schema_version,
 | 
				
			||||||
      const int64_t scn,
 | 
					      const int64_t scn,
 | 
				
			||||||
      bool &need_exec_new_inner_sql);
 | 
					      bool &need_exec_new_inner_sql);
 | 
				
			||||||
 | 
					  static int check_finish_report_checksum(
 | 
				
			||||||
 | 
					      const uint64_t tenant_id,
 | 
				
			||||||
 | 
					      const uint64_t index_table_id,
 | 
				
			||||||
 | 
					      const int64_t execution_id,
 | 
				
			||||||
 | 
					      const uint64_t ddl_task_id);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
private:
 | 
					private:
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user