[FEAT MERGE] Support tenant-level global control direct load data

Co-authored-by: fforkboat <fforkboat@gmail.com>
This commit is contained in:
obdev
2024-09-29 06:16:11 +00:00
committed by ob-robot
parent e8ba6ca6d0
commit fdd96032d6
42 changed files with 868 additions and 340 deletions

View File

@ -111,49 +111,45 @@ int ObCreateTableExecutor::ObInsSQLPrinter::inner_print(char *buf, int64_t buf_l
ret = OB_ERR_UNEXPECTED;
LOG_WARN("null stmt", K(ret));
} else {
const char *insert_str = NULL;
const int64_t parallel_str_max_len = 256;
char parallel_str[parallel_str_max_len] = {0};
int64_t parallel_str_pos = 0;
const char *osg_str = NULL;
const char *append_str = NULL;
const int64_t direct_str_max_len = 256;
char direct_str[direct_str_max_len] = {0};
int64_t direct_str_pos = 0;
insert_mode = stmt_->get_insert_mode();
if (insert_mode != 0 &&
insert_mode != 1 &&
insert_mode != 2 ) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected insert_mode", K(insert_mode), K(ret));
} else if (insert_mode == 1 /*ignore*/) {
if (OB_FAIL(databuff_printf(buf, buf_len, pos1,
do_osg_
? "insert ignore /*+ ENABLE_PARALLEL_DML PARALLEL(%lu) GATHER_OPTIMIZER_STATISTICS*/ into %c%.*s%c.%c%.*s%c"
: "insert ignore /*+ ENABLE_PARALLEL_DML PARALLEL(%lu) NO_GATHER_OPTIMIZER_STATISTICS*/ into %c%.*s%c.%c%.*s%c",
stmt_->get_parallelism(),
sep_char,
stmt_->get_database_name().length(),
stmt_->get_database_name().ptr(),
sep_char,
sep_char,
stmt_->get_table_name().length(),
stmt_->get_table_name().ptr(),
sep_char))) {
LOG_WARN("fail to print insert into string", K(ret));
}
} else if (insert_mode == 2 /*replace*/) {
if (OB_FAIL(databuff_printf(buf, buf_len, pos1,
do_osg_
? "replace /*+ ENABLE_PARALLEL_DML PARALLEL(%lu) GATHER_OPTIMIZER_STATISTICS*/ into %c%.*s%c.%c%.*s%c"
: "replace /*+ ENABLE_PARALLEL_DML PARALLEL(%lu) NO_GATHER_OPTIMIZER_STATISTICS*/ into %c%.*s%c.%c%.*s%c",
stmt_->get_parallelism(),
sep_char,
stmt_->get_database_name().length(),
stmt_->get_database_name().ptr(),
sep_char,
sep_char,
stmt_->get_table_name().length(),
stmt_->get_table_name().ptr(),
sep_char))) {
LOG_WARN("fail to print insert into string", K(ret));
} else {
insert_str = insert_mode == 0 ? "insert" : insert_mode == 1 ? "insert ignore" : "replace";
osg_str = do_osg_ ? "GATHER_OPTIMIZER_STATISTICS" : "NO_GATHER_OPTIMIZER_STATISTICS";
append_str = stmt_->get_has_append_hint() ? "append" : "";
const bool has_parallel_hint = stmt_->get_has_parallel_hint();
const ObDirectLoadHint &direct_load_hint = stmt_->get_direct_load_hint();
if (has_parallel_hint &&
OB_FAIL(databuff_printf(parallel_str, parallel_str_max_len, parallel_str_pos,
"PARALLEL(%lu)", stmt_->get_parallelism()))) {
LOG_WARN("fail to print parallel hint", K(ret), K(stmt_->get_parallelism()));
} else if (OB_FAIL(direct_load_hint.print_direct_load_hint(direct_str, direct_str_max_len,
direct_str_pos))) {
LOG_WARN("fail to print direct load hint", K(ret), K(direct_load_hint));
}
}
if (OB_FAIL(ret)) {
} else if (OB_FAIL(databuff_printf(buf, buf_len, pos1,
do_osg_
? "insert /*+ ENABLE_PARALLEL_DML PARALLEL(%lu) GATHER_OPTIMIZER_STATISTICS*/ into %c%.*s%c.%c%.*s%c"
: "insert /*+ ENABLE_PARALLEL_DML PARALLEL(%lu) NO_GATHER_OPTIMIZER_STATISTICS*/ into %c%.*s%c.%c%.*s%c",
stmt_->get_parallelism(),
"%s /*+ ENABLE_PARALLEL_DML %s %s %s %s */ into %c%.*s%c.%c%.*s%c",
insert_str,
parallel_str,
osg_str,
append_str,
direct_str,
sep_char,
stmt_->get_database_name().length(),
stmt_->get_database_name().ptr(),
@ -161,7 +157,8 @@ int ObCreateTableExecutor::ObInsSQLPrinter::inner_print(char *buf, int64_t buf_l
sep_char,
stmt_->get_table_name().length(),
stmt_->get_table_name().ptr(),
sep_char))) {
sep_char
))) {
LOG_WARN("fail to print insert into string", K(ret));
}
}
@ -219,6 +216,7 @@ int ObCreateTableExecutor::ObInsSQLPrinter::inner_print(char *buf, int64_t buf_l
LOG_WARN("fail to print select stmt", K(ret));
} else {
res_len = pos1;
LOG_INFO("[CTAS] successfully print sql", "sql", buf);
}
}
return ret;
@ -452,6 +450,13 @@ int ObCreateTableExecutor::execute_ctas(ObExecContext &ctx,
if (OB_SUCC(ret)) {
bool is_mysql_temp_table = stmt.get_create_table_arg().schema_.is_mysql_tmp_table();
bool in_trans = my_session->is_in_transaction();
omt::ObTenantConfigGuard tenant_config(TENANT_CONF(MTL_ID()));
const ObString &config_str = tenant_config->default_load_mode.get_value_string();
bool is_full_direct_insert =
stmt.get_has_append_hint() ||
stmt.get_direct_load_hint().is_full_load_method() ||
(tenant_config.is_valid() && 0 == config_str.case_compare("FULL_DIRECT_WRITE"));
bool need_start_trans = !is_full_direct_insert && (!is_mysql_temp_table || !in_trans);
ObBasicSessionInfo::UserScopeGuard user_scope_guard(my_session->get_sql_scope_flags());
common::sqlclient::ObISQLConnection *conn = NULL;
const uint64_t tenant_id = my_session->get_effective_tenant_id();
@ -465,8 +470,7 @@ int ObCreateTableExecutor::execute_ctas(ObExecContext &ctx,
} else if (OB_ISNULL(conn)) {
ret = OB_INNER_STAT_ERROR;
LOG_WARN("connection can not be NULL", K(ret));
} else if ((!is_mysql_temp_table || !in_trans)
&& OB_FAIL(conn->start_transaction(tenant_id))) {
} else if (need_start_trans && OB_FAIL(conn->start_transaction(tenant_id))) {
LOG_WARN("failed start transaction", K(ret), K(tenant_id));
} else {
if (OB_FAIL(conn->execute_write(tenant_id, ins_sql.ptr(),
@ -475,7 +479,7 @@ int ObCreateTableExecutor::execute_ctas(ObExecContext &ctx,
}
// transaction started, must commit or rollback
int tmp_ret = OB_SUCCESS;
if (!is_mysql_temp_table || !in_trans) {
if (need_start_trans) {
if (OB_LIKELY(OB_SUCCESS == ret)) {
tmp_ret = conn->commit();
} else {