[FEAT MERGE] Parallel create table

Co-authored-by: Tyshawn <tuyunshan@gmail.com>
This commit is contained in:
tino247
2023-08-31 10:40:35 +00:00
committed by ob-robot
parent abb2a6b573
commit 49d54bfc46
101 changed files with 10118 additions and 651 deletions

View File

@ -54,6 +54,7 @@
#include "share/external_table/ob_external_table_file_rpc_processor.h"
#include "share/external_table/ob_external_table_utils.h"
#include "share/ob_debug_sync.h"
#include "share/schema/ob_schema_utils.h"
namespace oceanbase
{
using namespace common;
@ -499,14 +500,17 @@ int ObCreateTableExecutor::execute(ObExecContext &ctx, ObCreateTableStmt &stmt)
ObSelectStmt *select_stmt = stmt.get_sub_select();
ObTableSchema &table_schema = create_table_arg.schema_;
ObSQLSessionInfo *my_session = ctx.get_my_session();
uint64_t tenant_id = table_schema.get_tenant_id();
uint64_t data_version = 0;
if (OB_ISNULL(my_session)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("session is null", K(ret));
} else if (OB_FAIL(stmt.get_first_stmt(first_stmt))) {
LOG_WARN("get first statement failed", K(ret));
} else if (OB_FAIL(GET_MIN_DATA_VERSION(tenant_id, data_version))) {
LOG_WARN("fail to get data version", KR(ret), K(tenant_id));
} else if (table_schema.is_duplicate_table()) {
bool is_compatible = false;
uint64_t tenant_id = table_schema.get_tenant_id();
if (OB_FAIL(ObShareUtil::check_compat_version_for_readonly_replica(tenant_id, is_compatible))) {
LOG_WARN("fail to check data version for duplicate table", KR(ret), K(tenant_id));
} else if (!is_compatible) {
@ -526,6 +530,13 @@ int ObCreateTableExecutor::execute(ObExecContext &ctx, ObCreateTableStmt &stmt)
create_table_arg.is_inner_ = my_session->is_inner();
create_table_arg.consumer_group_id_ = THIS_WORKER.get_group_id();
const_cast<obrpc::ObCreateTableArg&>(create_table_arg).ddl_stmt_str_ = first_stmt;
bool enable_parallel_create_table = false;
{
omt::ObTenantConfigGuard tenant_config(TENANT_CONF(tenant_id));
enable_parallel_create_table = tenant_config.is_valid()
&& tenant_config->_enable_parallel_table_creation;
}
if (OB_ISNULL(task_exec_ctx = GET_TASK_EXECUTOR_CTX(ctx))) {
ret = OB_NOT_INIT;
LOG_WARN("get task executor context failed", K(ret));
@ -541,26 +552,49 @@ int ObCreateTableExecutor::execute(ObExecContext &ctx, ObCreateTableStmt &stmt)
LOG_WARN("common rpc proxy should not be null", K(ret));
} else if (OB_ISNULL(select_stmt)) { // 普通建表的处理
if (OB_FAIL(ctx.get_sql_ctx()->schema_guard_->reset())){
LOG_WARN("schema_guard reset failed", K(ret));
} else if (OB_FAIL(common_rpc_proxy->create_table(create_table_arg, res))) {
LOG_WARN("rpc proxy create table failed", K(ret), "dst", common_rpc_proxy->get_server());
} else {
if (table_schema.is_external_table()) {
//auto refresh after create external table
OZ (ObAlterTableExecutor::update_external_file_list(
table_schema.get_tenant_id(), res.table_id_,
table_schema.get_external_file_location(),
table_schema.get_external_file_location_access_info(),
table_schema.get_external_file_pattern(),
ctx));
LOG_WARN("schema_guard reset failed", KR(ret));
} else if (table_schema.is_view_table()
|| data_version < DATA_VERSION_4_2_1_0
|| !enable_parallel_create_table) {
if (OB_FAIL(common_rpc_proxy->create_table(create_table_arg, res))) {
LOG_WARN("rpc proxy create table failed", KR(ret), "dst", common_rpc_proxy->get_server());
}
} else {
int64_t start_time = ObTimeUtility::current_time();
ObTimeoutCtx ctx;
if (OB_FAIL(ObShareUtil::set_default_timeout_ctx(ctx, GCONF._ob_ddl_timeout))) {
LOG_WARN("fail to set timeout ctx", KR(ret));
} else if (OB_FAIL(common_rpc_proxy->parallel_create_table(create_table_arg, res))) {
LOG_WARN("rpc proxy create table failed", KR(ret), "dst", common_rpc_proxy->get_server());
} else {
int64_t refresh_time = ObTimeUtility::current_time();
if (OB_FAIL(ObSchemaUtils::try_check_parallel_ddl_schema_in_sync(ctx,
tenant_id, res.schema_version_, GCONF._wait_interval_after_truncate))) {
LOG_WARN("fail to check paralleld ddl schema in sync", KR(ret), K(res));
}
int64_t end_time = ObTimeUtility::current_time();
LOG_INFO("[parallel_create_table]", KR(ret),
"cost", end_time - start_time,
"execute_time", refresh_time - start_time,
"wait_schema", end_time - refresh_time,
"table_name", create_table_arg.schema_.get_table_name());
}
}
if (OB_SUCC(ret) && table_schema.is_external_table()) {
//auto refresh after create external table
OZ (ObAlterTableExecutor::update_external_file_list(
table_schema.get_tenant_id(), res.table_id_,
table_schema.get_external_file_location(),
table_schema.get_external_file_location_access_info(),
table_schema.get_external_file_pattern(),
ctx));
}
} else {
if (table_schema.is_external_table()) {
ret = OB_NOT_SUPPORTED;
LOG_USER_ERROR(OB_NOT_SUPPORTED, "create external table as select");
} else if (OB_FAIL(execute_ctas(ctx, stmt, common_rpc_proxy))){ // 查询建表的处理
LOG_WARN("execute create table as select failed", K(ret));
LOG_WARN("execute create table as select failed", KR(ret));
}
}
@ -1902,6 +1936,8 @@ int ObAlterTableExecutor::check_alter_partition(ObExecContext &ctx,
}
} else if (obrpc::ObAlterTableArg::DROP_PARTITION == arg.alter_part_type_
|| obrpc::ObAlterTableArg::DROP_SUB_PARTITION == arg.alter_part_type_
|| obrpc::ObAlterTableArg::RENAME_PARTITION == arg.alter_part_type_
|| obrpc::ObAlterTableArg::RENAME_SUB_PARTITION == arg.alter_part_type_
|| obrpc::ObAlterTableArg::TRUNCATE_PARTITION == arg.alter_part_type_
|| obrpc::ObAlterTableArg::TRUNCATE_SUB_PARTITION == arg.alter_part_type_) {
// do-nothing
@ -2173,25 +2209,9 @@ int ObTruncateTableExecutor::execute(ObExecContext &ctx, ObTruncateTableStmt &st
} else if (!res.is_valid()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("truncate invalid ddl_res", KR(ret), K(res));
} else {
// wait schema_version refreshed on this server
while (OB_SUCC(ret) && ctx.get_timeout() > 0) {
int64_t refreshed_schema_version = OB_INVALID_VERSION;
int64_t consensus_schema_version = OB_INVALID_VERSION;
if (OB_FAIL(GCTX.schema_service_->get_tenant_refreshed_schema_version(res.tenant_id_, refreshed_schema_version))) {
LOG_WARN("get refreshed schema_version fail", KR(ret), K(res.tenant_id_));
} else if (OB_FAIL(GCTX.schema_service_->get_tenant_broadcast_consensus_version(res.tenant_id_, consensus_schema_version))) {
LOG_WARN("get consensus schema_version fail", KR(ret), K(res.tenant_id_));
} else if (refreshed_schema_version >= res.task_id_
&& consensus_schema_version >= res.task_id_) {
break;
} else if (refreshed_schema_version >= res.task_id_
&& ObTimeUtility::current_time() - step_time >= GCONF._wait_interval_after_truncate) {
break;
} else {
ob_usleep(10 * 1000);
}
}
} else if (OB_FAIL(ObSchemaUtils::try_check_parallel_ddl_schema_in_sync(ctx,
tenant_id, res.task_id_, GCONF._wait_interval_after_truncate))) {
LOG_WARN("fail to check parallel ddl schema in sync", KR(ret), K(res));
}
int64_t end_time = ObTimeUtility::current_time();
LOG_INFO("truncate_table_v2", KR(ret), "cost", end_time-start_time,