fix truncate table publish_schema and farm case

This commit is contained in:
obdev
2023-02-07 20:39:21 +08:00
committed by ob-robot
parent 28126a0a5a
commit bf3ace128e
6 changed files with 73 additions and 29 deletions

View File

@ -16150,7 +16150,8 @@ int ObDDLService::generate_table_schemas(const ObIArray<const ObTableSchema*> &o
int ObDDLService::new_truncate_table_in_trans(const ObIArray<const ObTableSchema*> &orig_table_schemas,
ObDDLSQLTransaction &trans,
const ObString *ddl_stmt_str)
const ObString *ddl_stmt_str,
obrpc::ObDDLRes &ddl_res)
{
int ret = OB_SUCCESS;
ObString table_name;
@ -16262,6 +16263,13 @@ int ObDDLService::new_truncate_table_in_trans(const ObIArray<const ObTableSchema
if (FAILEDx(trans.serialize_inc_schemas(first_schema_version - 1))) {
LOG_WARN("fail to serialize inc schemas", KR(ret), K(tenant_id), "start_schema_version", first_schema_version - 1);
}
if (OB_SUCC(ret)) {
ddl_res.tenant_id_ = tenant_id;
ddl_res.schema_id_ = new_table_schemas.at(0)->get_table_id();
ddl_res.task_id_ = boundary_schema_version;
}
} // else
@ -16286,21 +16294,16 @@ int ObDDLService::new_truncate_table_in_trans(const ObIArray<const ObTableSchema
// For table has auto_increment
// Sequence will not start from 1 when table not refresh newest schema
//To protect sequence value, we should synchronous refresh schema when we finish truncate table
if (OB_SUCC(ret)) {
uint64_t column_id = orig_table_schemas.at(0)->get_autoinc_column_id();
if (0 != column_id) {
if (OB_FAIL(publish_schema(tenant_id))) {
LOG_WARN("publish_schema failed", KR(ret), K(table_name), K(tenant_id));
}
}
}
int64_t finish_truncate_in_trans = ObTimeUtility::current_time();
//if (OB_SUCC(ret)) {
// if (OB_FAIL(publish_schema(tenant_id))) {
// LOG_WARN("publish_schema failed", KR(ret), K(table_name), K(tenant_id));
// }
//}
LOG_INFO("truncate cost after truncate_in_trans finish", KR(ret), K(task_id),
"cost", finish_truncate_in_trans - start_time,
"trans_cost", trans_end - start_time,
"fetch_schema_cost", before_wait_task - before_fetch_schema,
"wait_task_cost", wait_task - before_wait_task,
"trans_end_cost", trans_end - wait_task,
"publish_schema_cost", finish_truncate_in_trans - trans_end);
"trans_end_cost", trans_end - wait_task);
return ret;
}
@ -16499,7 +16502,7 @@ int ObDDLService::check_table_schema_is_legal(const ObDatabaseSchema & database_
}
int ObDDLService::new_truncate_table(const obrpc::ObTruncateTableArg &arg,
const obrpc::ObDDLRes &ddl_res,
obrpc::ObDDLRes &ddl_res,
const SCN &frozen_version)
{
int ret = OB_SUCCESS;
@ -16581,7 +16584,7 @@ int ObDDLService::new_truncate_table(const obrpc::ObTruncateTableArg &arg,
int64_t after_get_schema = ObTimeUtility::current_time();
LOG_INFO("truncate cost after get schema and check legal",
KR(ret), "cost_ts", after_get_schema - after_table_lock);
if (FAILEDx(new_truncate_table_in_trans(table_schema_array, trans, &arg.ddl_stmt_str_))) {
if (FAILEDx(new_truncate_table_in_trans(table_schema_array, trans, &arg.ddl_stmt_str_, ddl_res))) {
LOG_WARN("truncate table in trans failed",
KR(ret), K(arg.table_name_), K(table_id), K(orig_table_schema.get_schema_version()));
}

View File

@ -455,7 +455,7 @@ public:
ObArenaAllocator &allocator,
int64_t &task_id);
virtual int new_truncate_table(const obrpc::ObTruncateTableArg &arg,
const obrpc::ObDDLRes &ddl_res,
obrpc::ObDDLRes &ddl_res,
const share::SCN &frozen_scn);
int drop_not_null_cst_in_column_flag(const ObTableSchema &orig_table_schema,
const AlterTableSchema &alter_table_schema,
@ -1518,7 +1518,8 @@ private:
const common::ObString &db_name);
int new_truncate_table_in_trans(const ObIArray<const share::schema::ObTableSchema*> &orig_table_schemas,
ObDDLSQLTransaction &trans,
const ObString *ddl_stmt_str);
const ObString *ddl_stmt_str,
obrpc::ObDDLRes &ddl_res);
int restore_obj_priv_after_truncation(
ObDDLOperator &ddl_operator,
ObMySQLTransaction &trans,

View File

@ -37,7 +37,7 @@ TG_DEF(RLMGR, RLMGR, "", TG_STATIC, OB_THREAD_POOL, ThreadCountPair(1, 1))
TG_DEF(LeaseQueueTh, LeaseQueueTh, "", TG_STATIC, OB_THREAD_POOL, ThreadCountPair(observer::ObSrvDeliver::LEASE_TASK_THREAD_CNT, observer::ObSrvDeliver::MINI_MODE_LEASE_TASK_THREAD_CNT))
TG_DEF(DDLQueueTh, DDLQueueTh, "", TG_STATIC, OB_THREAD_POOL, ThreadCountPair(observer::ObSrvDeliver::DDL_TASK_THREAD_CNT, observer::ObSrvDeliver::DDL_TASK_THREAD_CNT))
TG_DEF(MysqlQueueTh, MysqlQueueTh, "", TG_STATIC, OB_THREAD_POOL, ThreadCountPair(GET_MYSQL_THREAD_COUNT(observer::ObSrvDeliver::MYSQL_TASK_THREAD_CNT), GET_MYSQL_THREAD_COUNT(observer::ObSrvDeliver::MINI_MODE_MYSQL_TASK_THREAD_CNT)))
TG_DEF(DDLPQueueTh, DDLPQueueTh, "", TG_STATIC, OB_THREAD_POOL, ThreadCountPair(GET_THREAD_NUM_BY_NPROCESSORS_WITH_LIMIT(3, 24), 2))
TG_DEF(DDLPQueueTh, DDLPQueueTh, "", TG_STATIC, OB_THREAD_POOL, ThreadCountPair(GET_THREAD_NUM_BY_NPROCESSORS_WITH_LIMIT(2, 24), 2))
TG_DEF(DiagnoseQueueTh, DiagnoseQueueTh, "", TG_STATIC, OB_THREAD_POOL, ThreadCountPair(observer::ObSrvDeliver::MYSQL_DIAG_TASK_THREAD_CNT, observer::ObSrvDeliver::MINI_MODE_MYSQL_DIAG_TASK_THREAD_CNT))
TG_DEF(DdlBuild, DdlBuild, "", TG_STATIC, ASYNC_TASK_QUEUE, ThreadCountPair(16, 1), 4 << 10)
TG_DEF(LSService, LSService, "", TG_STATIC, REENTRANT_THREAD_POOL, ThreadCountPair(2 ,2))

View File

@ -35,10 +35,8 @@ int ObDDLTransController::init(share::schema::ObMultiVersionSchemaService *schem
LOG_WARN("hashset create fail", KR(ret));
} else if (OB_FAIL(tenant_for_ddl_trans_new_lock_.create(10))) {
LOG_WARN("hashset create fail", KR(ret));
} else if (OB_FAIL(timer_.init("RefreshSchema"))) {
LOG_WARN("init timer fail", KR(ret));
} else if (OB_FAIL(timer_.schedule(*this, 200 * 1000, true))) {
LOG_WARN("schedule refresh task fail", KR(ret));
} else if (OB_FAIL(ObThreadPool::start())) {
LOG_WARN("thread start fail", KR(ret));
} else {
schema_service_ = schema_service;
inited_ = true;
@ -47,10 +45,22 @@ int ObDDLTransController::init(share::schema::ObMultiVersionSchemaService *schem
return ret;
}
void ObDDLTransController::runTimerTask()
ObDDLTransController::~ObDDLTransController()
{
ObThreadPool::stop();
wait_cond_.signal();
ObThreadPool::wait();
ObThreadPool::destroy();
schema_service_ = NULL;
tenant_for_ddl_trans_new_lock_.destroy();
inited_ = false;
}
void ObDDLTransController::run1()
{
int ret = OB_SUCCESS;
while (OB_SUCC(ret)) {
lib::set_thread_name("DDLTransCtr");
while (!has_set_stop()) {
ObArray<uint64_t> tenant_ids;
{
SpinWLockGuard guard(lock_);
@ -78,8 +88,9 @@ void ObDDLTransController::runTimerTask()
}
}
}
} else {
break;
}
if (tenant_ids.empty()) {
wait_cond_.timedwait(100 * 1000);
}
}
}
@ -193,6 +204,8 @@ int ObDDLTransController::remove_task(int64_t task_id)
LOG_WARN("remove_task fail", KR(ret), K(task_id));
} else if (OB_FAIL(tenants_.set_refactored(tenant_id, 1, 0, 1))) {
LOG_WARN("set_refactored fail", KR(ret), K(tenant_id));
} else {
wait_cond_.signal();
}
break;
}

View File

@ -17,6 +17,7 @@
#include "lib/lock/ob_thread_cond.h"
#include "lib/task/ob_timer.h"
#include "lib/hash/ob_hashset.h"
#include "common/ob_queue_thread.h"
namespace oceanbase
{
@ -36,10 +37,11 @@ struct TaskDesc
};
// impl for ddl schema change trans commit in order with schema_version
class ObDDLTransController : public common::ObTimerTask
class ObDDLTransController : public lib::ThreadPool
{
public:
ObDDLTransController() : inited_(false), schema_service_(NULL) {}
~ObDDLTransController();
int init(share::schema::ObMultiVersionSchemaService *schema_service);
static const int DDL_TASK_COND_SLOT = 128;
int create_task_and_assign_schema_version(const uint64_t tenant_id,
@ -51,7 +53,7 @@ public:
int check_enable_ddl_trans_new_lock(int64_t tenant_id, bool &res);
int set_enable_ddl_trans_new_lock(int64_t tenant_id);
private:
virtual void runTimerTask() override;
virtual void run1() override;
int check_task_ready(int64_t task_id, bool &ready);
private:
bool inited_;
@ -66,6 +68,8 @@ private:
// for compat
common::SpinRWLock lock_for_tenant_set_;
common::hash::ObHashSet<uint64_t> tenant_for_ddl_trans_new_lock_;
common::ObCond wait_cond_;
};
} // end schema

View File

@ -1732,8 +1732,31 @@ int ObTruncateTableExecutor::execute(ObExecContext &ctx, ObTruncateTableStmt &st
break;
}
}
int64_t step_time = ObTimeUtility::current_time();
LOG_INFO("truncate_table_v2 finish trans", K(ret), "cost", step_time-start_time, "table_name", truncate_table_arg.table_name_, K(res));
if (OB_FAIL(ret)) {
} else if (!res.is_valid()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("truncate invalid ddl_res", KR(ret), K(res));
} else {
// wait schema_version refreshed on this server
while (OB_SUCC(ret) && ctx.get_timeout() > 0) {
int64_t refreshed_schema_version = OB_INVALID_VERSION;
if (OB_FAIL(GCTX.schema_service_->get_tenant_refreshed_schema_version(res.tenant_id_, refreshed_schema_version))) {
LOG_WARN("get schema_version fail", KR(ret), K(res.tenant_id_));
} else if (refreshed_schema_version >= res.task_id_) {
break;
} else {
ob_usleep(10 * 1000);
}
}
}
int64_t end_time = ObTimeUtility::current_time();
LOG_INFO("truncate_table_v2", K(ret), "cost", end_time-start_time, "table_name", truncate_table_arg.table_name_);
LOG_INFO("truncate_table_v2", K(ret), "cost", end_time-start_time,
"trans_cost", step_time - start_time,
"wait_refresh", end_time - step_time,
"table_name", truncate_table_arg.table_name_,
K(res));
}
}