[CP] [CP] _parallel_ddl_control

This commit is contained in:
fanfangzhou
2024-06-19 06:47:28 +00:00
committed by ob-robot
parent 6bf3f38ce4
commit dc3831cd1d
11 changed files with 322 additions and 9 deletions

View File

@ -1221,6 +1221,15 @@ bool ObConfigModeItem::set(const char *str)
return valid; return valid;
} }
int ObConfigModeItem::init_mode(ObIConfigMode &mode)
{
int ret = OB_SUCCESS;
ObLatchRGuard r_guard(lock_, ObLatchIds::CONFIG_LOCK);
if (OB_FAIL(mode.set_value(*this))) {
OB_LOG(WARN, "set_value failed", KR(ret));
};
return ret;
}
ObConfigVersionItem::ObConfigVersionItem(ObConfigContainer *container, ObConfigVersionItem::ObConfigVersionItem(ObConfigContainer *container,
Scope::ScopeInfo scope_info, Scope::ScopeInfo scope_info,
const char *name, const char *name,

View File

@ -1255,6 +1255,7 @@ private:
common::ObSArray<ObConfigPair> config_array_; common::ObSArray<ObConfigPair> config_array_;
}; };
class ObIConfigMode;
class ObConfigModeItem: public ObConfigItem class ObConfigModeItem: public ObConfigItem
{ {
public: public:
@ -1275,6 +1276,8 @@ public:
virtual ObConfigItemType get_config_item_type() const { virtual ObConfigItemType get_config_item_type() const {
return ObConfigItemType::OB_CONF_ITEM_TYPE_MODE; return ObConfigItemType::OB_CONF_ITEM_TYPE_MODE;
} }
int init_mode(ObIConfigMode &mode);
static const int64_t MAX_MODE_BYTES = 32;
protected: protected:
//use current value to do input operation //use current value to do input operation
bool set(const char *str); bool set(const char *str);
@ -1297,7 +1300,6 @@ protected:
protected: protected:
static const uint64_t VALUE_BUF_SIZE = 65536UL; static const uint64_t VALUE_BUF_SIZE = 65536UL;
static const int64_t MAX_MODE_BYTES = 32;
ObConfigParser *parser_; ObConfigParser *parser_;
char value_str_[VALUE_BUF_SIZE]; char value_str_[VALUE_BUF_SIZE];
char value_reboot_str_[VALUE_BUF_SIZE]; char value_reboot_str_[VALUE_BUF_SIZE];
@ -1307,6 +1309,16 @@ private:
DISALLOW_COPY_AND_ASSIGN(ObConfigModeItem); DISALLOW_COPY_AND_ASSIGN(ObConfigModeItem);
}; };
class ObIConfigMode
{
public:
ObIConfigMode() {}
~ObIConfigMode() {}
virtual int set_value(const ObConfigModeItem &mode_item) = 0;
private:
DISALLOW_COPY_AND_ASSIGN(ObIConfigMode);
};
} // namespace common } // namespace common
} // namespace oceanbase } // namespace oceanbase

View File

@ -1061,7 +1061,7 @@ bool ObConfigSQLTlsVersionChecker::check(const ObConfigItem &t) const
0 == tmp_str.case_compare("TLSV1.3"); 0 == tmp_str.case_compare("TLSV1.3");
} }
int ObModeConfigParserUitl::parse_item_to_kv(char *item, ObString &key, ObString &value) int ObModeConfigParserUitl::parse_item_to_kv(char *item, ObString &key, ObString &value, const char* delim)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
if (OB_ISNULL(item)) { if (OB_ISNULL(item)) {
@ -1070,7 +1070,7 @@ int ObModeConfigParserUitl::parse_item_to_kv(char *item, ObString &key, ObString
} else { } else {
// key // key
char *save_ptr = NULL; char *save_ptr = NULL;
char *key_ptr = STRTOK_R(item, "=", &save_ptr); char *key_ptr = STRTOK_R(item, delim, &save_ptr);
ObString tmp_key(key_ptr); ObString tmp_key(key_ptr);
key = tmp_key.trim(); key = tmp_key.trim();
// value // value
@ -1118,7 +1118,7 @@ int ObModeConfigParserUitl::format_mode_str(const char *src, int64_t src_len, ch
return ret; return ret;
} }
int ObModeConfigParserUitl::get_kv_list(char *str, ObIArray<std::pair<ObString, ObString>> &kv_list) int ObModeConfigParserUitl::get_kv_list(char *str, ObIArray<std::pair<ObString, ObString>> &kv_list, const char* delim)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
if (OB_ISNULL(str)) { if (OB_ISNULL(str)) {
@ -1136,7 +1136,7 @@ int ObModeConfigParserUitl::get_kv_list(char *str, ObIArray<std::pair<ObString,
uint64_t len = strlen(token); uint64_t len = strlen(token);
while (len > 0 && token[len - 1] == ' ') token[--len] = '\0'; while (len > 0 && token[len - 1] == ' ') token[--len] = '\0';
// check and set mode // check and set mode
if (OB_FAIL(parse_item_to_kv(token, key, value))) { if (OB_FAIL(parse_item_to_kv(token, key, value, delim))) {
OB_LOG(WARN, "fail to check config item", K(ret)); OB_LOG(WARN, "fail to check config item", K(ret));
} else if (OB_FAIL(kv_list.push_back(std::make_pair(key, value)))) { } else if (OB_FAIL(kv_list.push_back(std::make_pair(key, value)))) {
OB_LOG(WARN, "fail to push back key and value pair", K(ret), K(key), K(value)); OB_LOG(WARN, "fail to push back key and value pair", K(ret), K(key), K(value));
@ -1256,5 +1256,60 @@ bool ObConfigArchiveLagTargetChecker::check(const uint64_t tenant_id, const ObAd
return is_valid; return is_valid;
} }
bool ObParallelDDLControlParser::parse(const char *str, uint8_t *arr, int64_t len)
{
bool bret = true;
ObParallelDDLControlMode ddl_mode;
if (OB_ISNULL(str) || OB_ISNULL(arr)) {
bret = false;
OB_LOG_RET(WARN, OB_ERR_UNEXPECTED, "Get config item failed", KP(str), KP(arr));
} else if (strlen(str) == 0) {
// do nothing
} else {
int tmp_ret = OB_SUCCESS;
ObSEArray<std::pair<ObString, ObString>, 1> kv_list;
int64_t str_len = strlen(str);
const int64_t buf_len = 3 * str_len; // need replace ',' to ' , '
char buf[buf_len];
MEMSET(buf, 0, sizeof(buf));
MEMCPY(buf, str, str_len);
if (OB_TMP_FAIL(ObModeConfigParserUitl::format_mode_str(str, str_len, buf, buf_len))) {
bret = false;
OB_LOG_RET(WARN, tmp_ret, "fail to format mode str", K(str));
} else if (OB_TMP_FAIL(ObModeConfigParserUitl::get_kv_list(buf, kv_list, ":"))) {
bret = false;
OB_LOG_RET(WARN, tmp_ret, "fail to get kv list", K(str));
} else {
for (int64_t i = 0; bret && i < kv_list.count(); ++i) {
uint8_t mode = MODE_ON;
if (kv_list.at(i).second.case_compare("on") == 0) {
mode = MODE_ON;
} else if (kv_list.at(i).second.case_compare("off") == 0) {
mode = MODE_OFF;
} else {
bret = false;
OB_LOG_RET(WARN, OB_INVALID_CONFIG, "unknown mode type", K(kv_list.at(i).second));
}
ObParallelDDLControlMode::ObParallelDDLType ddl_type = ObParallelDDLControlMode::MAX_TYPE;
if (!bret) {
// do nothing
} else if (OB_TMP_FAIL(ObParallelDDLControlMode::string_to_ddl_type(kv_list.at(i).first, ddl_type))) {
bret = false;
OB_LOG_RET(WARN, tmp_ret, "fail to trans string ddl_type", K(kv_list.at(i).first));
} else if (OB_TMP_FAIL(ddl_mode.set_parallel_ddl_mode(ddl_type, mode))) {
bret = false;
OB_LOG_RET(WARN, tmp_ret, "fail to set parallel ddl mode", K(ddl_type), K(mode));
}
}
}
}
if (bret) {
for (uint64_t i = 0; i < 8; ++i) {
arr[i] = static_cast<uint8_t>((ddl_mode.get_value() >> (i * 8)) & 0xFF);
}
}
return bret;
}
} // end of namepace common } // end of namepace common
} // end of namespace oceanbase } // end of namespace oceanbase

View File

@ -750,9 +750,9 @@ private:
class ObModeConfigParserUitl class ObModeConfigParserUitl
{ {
public: public:
// parse config item like: "xxx=yyy" // parse config item like: "xxx=yyy", "xxx:yyy"
static int parse_item_to_kv(char *item, ObString &key, ObString &value); static int parse_item_to_kv(char *item, ObString &key, ObString &value, const char* delim = "=");
static int get_kv_list(char *str, ObIArray<std::pair<ObString, ObString>> &kv_list); static int get_kv_list(char *str, ObIArray<std::pair<ObString, ObString>> &kv_list, const char* delim = "=");
// format str for split config item // format str for split config item
static int format_mode_str(const char *src, int64_t src_len, char *dst, int64_t dst_len); static int format_mode_str(const char *src, int64_t src_len, char *dst, int64_t dst_len);
}; };
@ -818,6 +818,21 @@ private:
DISALLOW_COPY_AND_ASSIGN(ObConfigMigrationChooseSourceChecker); DISALLOW_COPY_AND_ASSIGN(ObConfigMigrationChooseSourceChecker);
}; };
class ObParallelDDLControlParser : public ObConfigParser
{
public:
ObParallelDDLControlParser() {}
virtual ~ObParallelDDLControlParser() {}
virtual bool parse(const char *str, uint8_t *arr, int64_t len) override;
public:
static const uint8_t MODE_ON = 0b00;
static const uint8_t MODE_OFF = 0b01;
private:
DISALLOW_COPY_AND_ASSIGN(ObParallelDDLControlParser);
};
typedef __ObConfigContainer<ObConfigStringKey, typedef __ObConfigContainer<ObConfigStringKey,
ObConfigItem, OB_MAX_CONFIG_NUMBER> ObConfigContainer; ObConfigItem, OB_MAX_CONFIG_NUMBER> ObConfigContainer;
} // namespace common } // namespace common

View File

@ -588,6 +588,10 @@ DEF_TIME(recyclebin_object_expire_time, OB_CLUSTER_PARAMETER, "0s", "[0s,)",
"default 0 that means auto purge recyclebin off. Range: [0s, +∞)", "default 0 that means auto purge recyclebin off. Range: [0s, +∞)",
ObParameterAttr(Section::ROOT_SERVICE, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE)); ObParameterAttr(Section::ROOT_SERVICE, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
DEF_MODE_WITH_PARSER(_parallel_ddl_control, OB_TENANT_PARAMETER, "",
common::ObParallelDDLControlParser,
"switch for parallel capability of parallel DDL",
ObParameterAttr(Section::ROOT_SERVICE, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
// ========================= LogService Config Begin ===================== // ========================= LogService Config Begin =====================
DEF_CAP(log_disk_size, OB_CLUSTER_PARAMETER, "0M", "[0M,)", DEF_CAP(log_disk_size, OB_CLUSTER_PARAMETER, "0M", "[0M,)",

View File

@ -1230,6 +1230,109 @@ int ObSchemaUtils::is_drop_column_only(const AlterTableSchema &alter_table_schem
} }
return ret; return ret;
} }
const char* DDLType[]
{
"TRUNCATE_TABLE",
"SET_COMMENT",
"CREATE_INDEX",
"CREATE_VIEW"
};
int ObParallelDDLControlMode::string_to_ddl_type(const ObString &ddl_string, ObParallelDDLType &ddl_type)
{
int ret = OB_SUCCESS;
ddl_type = MAX_TYPE;
STATIC_ASSERT((ARRAYSIZEOF(DDLType)) == MAX_TYPE, "size count not match");
bool find = false;
for (uint64_t i = 0; !find && i < ARRAYSIZEOF(DDLType); i++) {
if (ddl_string.case_compare(DDLType[i]) == 0) {
find = true;
ddl_type = static_cast<ObParallelDDLType>(i);
}
}
if (OB_UNLIKELY(!find)) {
ret = OB_INVALID_ARGUMENT;
OB_LOG(WARN, "unknown ddl_type", KR(ret), K(ddl_string));
}
return ret;
}
int ObParallelDDLControlMode::set_value(const ObConfigModeItem &mode_item)
{
int ret = OB_SUCCESS;
const uint8_t* values = mode_item.get_value();
if (OB_ISNULL(values)) {
ret = OB_ERR_UNEXPECTED;
OB_LOG(WARN, "mode item's value_ is null ptr", KR(ret));
} else {
STATIC_ASSERT(((sizeof(value_)/sizeof(uint8_t) <= ObConfigModeItem::MAX_MODE_BYTES)),
"value_ size overflow");
STATIC_ASSERT( (MAX_TYPE * 2) <= (sizeof(value_) * 8), "type size overflow");
value_ = 0;
for (uint64_t i = 0; i < 8; ++i) {
value_ = (value_ | static_cast<uint64_t>(values[i]) << (8 * i));
}
}
return ret;
}
int ObParallelDDLControlMode::set_parallel_ddl_mode(const ObParallelDDLType type, const uint8_t mode)
{
int ret = OB_SUCCESS;
if ((TRUNCATE_TABLE <= type) && (type < MAX_TYPE)) {
uint64_t shift = static_cast<uint64_t>(type);
if (!check_mode_valid_(mode)) {
ret = OB_INVALID_ARGUMENT;
OB_LOG(WARN, "mode invalid", KR(ret), K(mode));
} else {
uint64_t mask = MASK << (shift * MASK_SIZE);
value_ = (value_ & ~mask) | (static_cast<uint64_t>(mode) << (shift * MASK_SIZE));
}
} else {
ret = OB_INVALID_ARGUMENT;
OB_LOG(WARN, "type invalid", KR(ret), K(type));
}
return ret;
}
int ObParallelDDLControlMode::is_parallel_ddl(const ObParallelDDLType type, bool &is_parallel)
{
int ret = OB_SUCCESS;
is_parallel = true;
if ((TRUNCATE_TABLE <= type) && (type < MAX_TYPE)) {
uint64_t shift = static_cast<uint64_t>(type);
uint8_t value = static_cast<uint8_t>((value_ >> (shift * MASK_SIZE)) & MASK);
if (value == ObParallelDDLControlParser::MODE_OFF) {
is_parallel = false;
} else if (value == ObParallelDDLControlParser::MODE_ON) {
is_parallel = true;
} else {
ret = OB_ERR_UNEXPECTED;
OB_LOG(WARN, "invalid value unexpected", KR(ret), K(value));
}
} else {
ret = OB_INVALID_ARGUMENT;
OB_LOG(WARN, "type invalid", KR(ret), K(type));
}
return ret;
}
int ObParallelDDLControlMode::is_parallel_ddl_enable(const ObParallelDDLType ddl_type, const uint64_t tenant_id, bool &is_parallel)
{
int ret = OB_SUCCESS;
is_parallel = true;
ObParallelDDLControlMode cfg;
omt::ObTenantConfigGuard tenant_config(TENANT_CONF(tenant_id));
if (OB_UNLIKELY(!tenant_config.is_valid())) {
ret = OB_ERR_UNEXPECTED;
OB_LOG(WARN, "invalid tenant config", KR(ret), K(tenant_id));
} else if (OB_FAIL(tenant_config->_parallel_ddl_control.init_mode(cfg))) {
LOG_WARN("init mode failed", KR(ret));
} else if (OB_FAIL(cfg.is_parallel_ddl(ddl_type, is_parallel))) {
LOG_WARN("fail to check is parallel ddl", KR(ret), K(ddl_type));
}
return ret;
}
bool ObSchemaUtils::can_add_column_group(const ObTableSchema &table_schema) bool ObSchemaUtils::can_add_column_group(const ObTableSchema &table_schema)
{ {

View File

@ -23,6 +23,7 @@
#include "share/schema/ob_schema_struct.h" #include "share/schema/ob_schema_struct.h"
#include "share/system_variable/ob_sys_var_class_type.h" #include "share/system_variable/ob_sys_var_class_type.h"
#include "common/sql_mode/ob_sql_mode.h" #include "common/sql_mode/ob_sql_mode.h"
#include "share/config/ob_config.h"
namespace oceanbase namespace oceanbase
{ {
@ -354,6 +355,33 @@ int64_t ObSchemaUtils::get_partition_array_convert_size(
return convert_size; return convert_size;
} }
class ObParallelDDLControlMode final : public ObIConfigMode
{
public:
ObParallelDDLControlMode(): value_(0) {}
enum ObParallelDDLType {
TRUNCATE_TABLE = 0,
SET_COMMENT = 1,
CREATE_INDEX = 2,
CREATE_VIEW = 3,
MAX_TYPE // can not > 32
};
static constexpr uint64_t MASK_SIZE = 2;
static constexpr uint64_t MASK = 0x03;
virtual int set_value(const ObConfigModeItem &mode_item) override;
uint64_t get_value() const { return value_; }
int set_parallel_ddl_mode(const ObParallelDDLType type, const uint8_t mode);
int is_parallel_ddl(const ObParallelDDLType type, bool &is_parallel);
static int is_parallel_ddl_enable(const ObParallelDDLType ddl_type, const uint64_t tenant_id, bool &is_parallel);
static int string_to_ddl_type(const ObString &ddl_string, ObParallelDDLType &ddl_type);
private:
bool check_mode_valid_(uint8_t mode) { return mode > MASK ? false : true; }
uint64_t value_;
DISALLOW_COPY_AND_ASSIGN(ObParallelDDLControlMode);
};
} // end schema } // end schema
} // end share } // end share
} // end oceanbase } // end oceanbase

View File

@ -2088,6 +2088,14 @@ int ObTruncateTableExecutor::check_use_parallel_truncate(const obrpc::ObTruncate
use_parallel_truncate = (table_schema->get_autoinc_column_id() == 0 && compat_version >= DATA_VERSION_4_1_0_0) use_parallel_truncate = (table_schema->get_autoinc_column_id() == 0 && compat_version >= DATA_VERSION_4_1_0_0)
|| compat_version >= DATA_VERSION_4_1_0_2; || compat_version >= DATA_VERSION_4_1_0_2;
} }
if (OB_FAIL(ret)) {
// do nothing
} else if (use_parallel_truncate
&& OB_FAIL(ObParallelDDLControlMode::is_parallel_ddl_enable(
ObParallelDDLControlMode::TRUNCATE_TABLE,
tenant_id, use_parallel_truncate))) {
LOG_WARN("fail to check whether is parallel truncate table", KR(ret), K(tenant_id));
}
return ret; return ret;
} }

View File

@ -401,6 +401,7 @@ _optimizer_better_inlist_costing
_optimizer_group_by_placement _optimizer_group_by_placement
_optimizer_skip_scan_enabled _optimizer_skip_scan_enabled
_optimizer_sortmerge_join_enabled _optimizer_sortmerge_join_enabled
_parallel_ddl_control
_parallel_max_active_sessions _parallel_max_active_sessions
_parallel_min_message_pool _parallel_min_message_pool
_parallel_redo_logging_trigger _parallel_redo_logging_trigger

View File

@ -22,3 +22,4 @@ rs_unittest(test_primary_ls_service)
rs_unittest(test_archive_checkpoint) rs_unittest(test_archive_checkpoint)
rs_unittest(test_import_table) rs_unittest(test_import_table)
rs_unittest(test_heartbeat_service) rs_unittest(test_heartbeat_service)
rs_unittest(test_parallel_ddl_control)

View File

@ -0,0 +1,77 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#include "share/schema/ob_schema_utils.h"
#include "lib/time/ob_time_utility.h"
#include "share/parameter/ob_parameter_macro.h"
#include <gtest/gtest.h>
namespace oceanbase
{
using namespace share;
using namespace share::schema;
namespace common
{
ObConfigContainer l_container;
static ObConfigContainer *local_container()
{
return &l_container;
}
class TestObParallelDDLControl : public ::testing::Test
{
public:
#undef OB_TENANT_PARAMETER
#define OB_TENANT_PARAMETER(args...) args
DEF_MODE_WITH_PARSER(_parallel_ddl_control, OB_TENANT_PARAMETER, "",
common::ObParallelDDLControlParser,
"switch for parallel capability of parallel DDL",
ObParameterAttr(Section::ROOT_SERVICE, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
#undef OB_TENANT_PARAMETER
};
TEST_F(TestObParallelDDLControl, test_parse)
{
ObParallelDDLControlParser parser;
uint8_t arr[32];
ASSERT_EQ(true, parser.parse("", arr, 32));
ASSERT_EQ(arr[0], 0b00000000);
MEMSET(arr, 0, 32);
ASSERT_EQ(true, parser.parse("truncate_table:off", arr, 32));
ASSERT_EQ(arr[0], 0b00000001);
}
TEST_F(TestObParallelDDLControl, testObParallelDDLControlMode)
{
ObParallelDDLControlMode ddl_mode;
_parallel_ddl_control.init_mode(ddl_mode);
bool is_parallel = false;
ASSERT_EQ(OB_SUCCESS, ddl_mode.is_parallel_ddl(ObParallelDDLControlMode::TRUNCATE_TABLE, is_parallel));
ASSERT_EQ(true, is_parallel);
ASSERT_EQ(OB_INVALID_ARGUMENT, ddl_mode.is_parallel_ddl(ObParallelDDLControlMode::MAX_TYPE, is_parallel));
ASSERT_EQ(true, _parallel_ddl_control.set_value("truncate_table:off"));
_parallel_ddl_control.init_mode(ddl_mode);
ASSERT_EQ(OB_SUCCESS, ddl_mode.is_parallel_ddl(ObParallelDDLControlMode::TRUNCATE_TABLE, is_parallel));
ASSERT_EQ(false, is_parallel);
}
} // common
} // oceanbase
int main(int argc, char **argv)
{
oceanbase::common::ObLogger::get_logger().set_log_level("INFO");
testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}