[CP] fix some ctas exception handle bugs

This commit is contained in:
obdev
2022-07-04 10:14:19 +08:00
committed by wangzelin.wzl
parent a963f52add
commit 491d36e26a
17 changed files with 252 additions and 154 deletions

View File

@ -245,26 +245,20 @@ int ObCreateTableExecutor::execute_ctas(
{
int ret = OB_SUCCESS;
int64_t affected_rows = 0;
int64_t dummy = 0;
ObMySQLProxy* sql_proxy = ctx.get_sql_proxy();
common::ObCommonSqlProxy* user_sql_proxy;
common::ObOracleSqlProxy oracle_sql_proxy;
ObSQLSessionInfo* my_session = ctx.get_my_session();
ObPhysicalPlanCtx* plan_ctx = ctx.get_physical_plan_ctx();
ObSchemaGetterGuard schema_guard;
obrpc::ObAlterTableArg alter_table_arg;
obrpc::ObDropTableArg drop_table_arg;
obrpc::ObTableItem table_item;
ObSqlString ins_sql;
bool is_auto_commit = false;
bool need_clean_up = false;
int first_err_code = OB_SUCCESS;
const observer::ObGlobalContext& gctx = observer::ObServer::get_instance().get_gctx();
const ObTableSchema* table_schema = NULL;
obrpc::UInt64 table_id;
common::sqlclient::ObISQLConnection* conn = NULL;
obrpc::ObCreateTableRes create_table_res;
obrpc::ObCreateTableArg& create_table_arg = stmt.get_create_table_arg();
create_table_arg.is_inner_ = my_session->is_inner();
bool need_clean = true;
CK(OB_NOT_NULL(sql_proxy),
OB_NOT_NULL(my_session),
OB_NOT_NULL(gctx.schema_service_),
@ -283,97 +277,126 @@ int ObCreateTableExecutor::execute_ctas(
LOG_WARN("failed to prepare alter table arg", K(ret));
} else if (OB_FAIL(prepare_drop_arg(stmt, my_session, table_item, drop_table_arg))) {
LOG_WARN("failed to prepare drop table arg", K(ret));
} else if (OB_FAIL(common_rpc_proxy->create_table(create_table_arg, table_id))) {
} else if (OB_FAIL(common_rpc_proxy->create_table(create_table_arg, create_table_res))) {
LOG_WARN("rpc proxy create table failed", K(ret), "dst", common_rpc_proxy->get_server());
} else if (OB_INVALID_ID != table_id) {
const int64_t s1 = ObTimeUtility::current_time();
int64_t s2 = 0;
const int64_t DDL_WAIT_TIME = 1 * 1000 * 1000; // 1s
const int64_t SLEEP_ON_NEED_RETRY = 10 * 1000; // 10ms
obrpc::ObAlterTableRes res;
while (true) {
if (OB_FAIL(
gctx.schema_service_->get_tenant_schema_guard(my_session->get_effective_tenant_id(), schema_guard))) {
LOG_WARN("failed to get schema guard", K(ret));
break;
} else if (OB_INVALID_ID != create_table_res.table_id_) {
if (OB_INVALID_VERSION == create_table_res.schema_version_) {
// During upgrade, high version server send create table rpc to low version RS. RS will
// return a struct UINT64. Try deserialize UINT64 to ObCreateTableRes will only get correct
// table_id. And schema_version remain default value OB_INVALID_VERSION.
// In this scenario, using old strategy.
ObSchemaGetterGuard schema_guard;
const ObTableSchema* table_schema = NULL;
const int64_t s1 = ObTimeUtility::current_time();
int64_t s2 = 0;
const int64_t DDL_WAIT_TIME = 1 * 1000 * 1000; // 1s
const int64_t SLEEP_ON_NEED_RETRY = 10 * 1000; // 10ms
while (true) {
if (OB_FAIL(
gctx.schema_service_->get_tenant_schema_guard(my_session->get_effective_tenant_id(), schema_guard))) {
LOG_WARN("failed to get schema guard", K(ret));
break;
}
if (OB_FAIL(schema_guard.get_table_schema(create_table_res.table_id_, table_schema))) {
LOG_WARN("failed to get table schema", K(ret), K(create_table_res.table_id_));
break;
}
s2 = ObTimeUtility::current_time();
if (OB_NOT_NULL(table_schema)) {
LOG_DEBUG("CTAS refresh table schema succeed!", K(ret), K(s2 - s1));
break;
}
if (s2 - s1 < DDL_WAIT_TIME) {
ret = OB_SUCCESS;
usleep(SLEEP_ON_NEED_RETRY);
LOG_DEBUG("CTAS refresh table schema failed, try again", K(ret), K(create_table_res.table_id_), K(s2 - s1));
} else {
LOG_DEBUG("CTAS refresh table schema timeout!", K(ret), K(create_table_res.table_id_), K(s2 - s1));
break;
}
}
if (OB_FAIL(schema_guard.get_table_schema(table_id, table_schema))) {
LOG_WARN("failed to get table schema", K(ret), K(table_id));
break;
}
s2 = ObTimeUtility::current_time();
if (OB_NOT_NULL(table_schema)) {
LOG_DEBUG("CTAS refresh table schema succeed!", K(ret), K(s2 - s1));
break;
}
if (s2 - s1 < DDL_WAIT_TIME) {
ret = OB_SUCCESS;
usleep(SLEEP_ON_NEED_RETRY);
LOG_DEBUG("CTAS refresh table schema failed, try again", K(ret), K(table_id), K(s2 - s1));
} else {
LOG_DEBUG("CTAS refresh table schema timeout!", K(ret), K(table_id), K(s2 - s1));
break;
}
}
if (share::is_oracle_mode()) {
user_sql_proxy = &oracle_sql_proxy;
} else {
user_sql_proxy = sql_proxy;
uint64_t tenant_id = my_session->get_effective_tenant_id();
if (is_inner_table(create_table_res.table_id_)) {
tenant_id = OB_SYS_TENANT_ID;
}
if (OB_FAIL(gctx.schema_service_->async_refresh_schema(tenant_id, create_table_res.schema_version_))) {
LOG_WARN("failed to async refresh schema", K(ret));
}
}
#ifdef ERRSIM
{
int tmp_ret = E(EventTable::EN_CTAS_FAIL_NO_DROP_ERROR) OB_SUCCESS;
if (OB_FAIL(tmp_ret)) {
ret = tmp_ret;
need_clean = false;
}
}
#else
// do nothing...
#endif
if (OB_SUCC(ret)) {
common::sqlclient::ObISQLConnection *conn = NULL;
if (share::is_oracle_mode()) {
user_sql_proxy = &oracle_sql_proxy;
} else {
user_sql_proxy = sql_proxy;
}
if (OB_FAIL(pool->acquire(my_session, conn))) {
LOG_WARN("failed to acquire inner connection", K(ret));
} else if (OB_ISNULL(conn)) {
ret = OB_INNER_STAT_ERROR;
LOG_WARN("connection can not be NULL", K(ret));
} else if (OB_FAIL(conn->start_transaction())) {
LOG_WARN("failed start transaction", K(ret));
} else {
if (OB_FAIL(conn->execute_write(my_session->get_effective_tenant_id(), ins_sql.ptr(),
affected_rows, true))) {
LOG_WARN("failed to exec sql", K(ins_sql), K(ret));
}
// transaction started, must commit or rollback
int tmp_ret = OB_SUCCESS;
if (OB_LIKELY(OB_SUCCESS == ret)) {
tmp_ret = conn->commit();
} else {
tmp_ret = conn->rollback();
}
if (OB_UNLIKELY(OB_SUCCESS != tmp_ret)) {
ret = (OB_SUCCESS == ret) ? tmp_ret : ret;
LOG_WARN("fail to end transaction", K(ret), K(tmp_ret));
}
}
if (OB_NOT_NULL(conn)) {
user_sql_proxy->close(conn, true);
}
}
if (OB_SUCC(ret)) {
obrpc::ObAlterTableRes res;
if (OB_FAIL(common_rpc_proxy->alter_table(alter_table_arg, res))) {
LOG_WARN("failed to update table session", K(ret), K(alter_table_arg));
}
}
if (OB_FAIL(ret)) {
// do nothing
} else if (OB_FAIL(pool->acquire(my_session, conn))) {
need_clean_up = true;
first_err_code = ret;
LOG_WARN("failed to acquire inner connection", K(ret));
} else if (OB_FAIL(
conn->execute_write(my_session->get_effective_tenant_id(), ins_sql.ptr(), affected_rows, true))) {
need_clean_up = true;
first_err_code = ret;
LOG_WARN("failed to exec sql", K(ins_sql), K(ret));
} else if (OB_FAIL(my_session->get_autocommit(is_auto_commit))) {
need_clean_up = true;
first_err_code = ret;
LOG_WARN("failed to get auto commit", K(ret));
} else if (!is_auto_commit &&
OB_FAIL(conn->execute_write(my_session->get_effective_tenant_id(), "commit;", dummy, true))) {
need_clean_up = true;
first_err_code = ret;
LOG_WARN("failed to exec commit", K(ins_sql), K(ret));
} else if (OB_FAIL(common_rpc_proxy->alter_table(alter_table_arg, res))) {
need_clean_up = true;
first_err_code = ret;
LOG_WARN("failed to update table session", K(ret), K(alter_table_arg));
my_session->update_last_active_time();
if (OB_LIKELY(need_clean)) {
int tmp_ret = OB_SUCCESS;
if (OB_SUCCESS != (tmp_ret = common_rpc_proxy->drop_table(drop_table_arg))) {
LOG_WARN("failed to drop table", K(drop_table_arg), K(ret));
} else {
LOG_INFO("table is created and dropped due to error ", K(ret));
}
}
} else {
plan_ctx->set_affected_rows(affected_rows);
LOG_DEBUG("CTAS all done", K(ins_sql), K(affected_rows), K(share::is_oracle_mode()));
}
#ifdef ERRSIM
{
int test_used_ret = OB_SUCCESS;
test_used_ret = E(EventTable::EN_CTAS_FAIL_NO_DROP_ERROR) OB_SUCCESS;
if (OB_FAIL(test_used_ret)) {
need_clean_up = false;
}
}
#else
// do nothing...
#endif
if (need_clean_up) {
if (OB_FAIL(common_rpc_proxy->drop_table(drop_table_arg))) {
LOG_WARN("failed to drop table", K(drop_table_arg), K(ret));
} else {
ret = first_err_code;
LOG_INFO("table is created and dropped due to error ", K(ret));
}
}
if (OB_NOT_NULL(conn)) {
user_sql_proxy->close(conn, true);
}
} else {
LOG_DEBUG("table exists, no need to CTAS", K(table_id));
LOG_DEBUG("table exists, no need to CTAS", K(create_table_res.table_id_));
}
}
return ret;
@ -384,7 +407,7 @@ int ObCreateTableExecutor::execute(ObExecContext& ctx, ObCreateTableStmt& stmt)
int ret = OB_SUCCESS;
ObTaskExecutorCtx* task_exec_ctx = NULL;
obrpc::ObCommonRpcProxy* common_rpc_proxy = NULL;
obrpc::UInt64 table_id;
obrpc::ObCreateTableRes res;
obrpc::ObCreateTableArg& create_table_arg = stmt.get_create_table_arg();
ObString first_stmt;
ObSelectStmt* select_stmt = stmt.get_sub_select();
@ -411,7 +434,7 @@ int ObCreateTableExecutor::execute(ObExecContext& ctx, ObCreateTableStmt& stmt)
ret = OB_ERR_UNEXPECTED;
LOG_WARN("common rpc proxy should not be null", K(ret));
} else if (OB_ISNULL(select_stmt)) {
if (OB_FAIL(common_rpc_proxy->create_table(create_table_arg, table_id))) {
if (OB_FAIL(common_rpc_proxy->create_table(create_table_arg, res))) {
LOG_WARN("rpc proxy create table failed", K(ret), "dst", common_rpc_proxy->get_server());
} else { /* do nothing */
}
@ -419,8 +442,10 @@ int ObCreateTableExecutor::execute(ObExecContext& ctx, ObCreateTableStmt& stmt)
LOG_WARN("execute create table as select failed", K(ret));
}
if (OB_SUCC(ret) && 0 != table_schema.get_session_id()) {
LOG_DEBUG("CTAS or temporary table create detected", K(table_schema));
// only CTAS or create temperary table will make session_id != 0. If such table detected, set
// need ctas cleanup task anyway to do some cleanup jobs
if (0 != table_schema.get_session_id()) {
LOG_TRACE("CTAS or temporary table create detected", K(table_schema));
ATOMIC_STORE(&OBSERVER.need_ctas_cleanup_, true);
}
}