Add _publish_schema_mode parameter

This commit is contained in:
tino247 2023-09-14 07:14:13 +00:00 committed by ob-robot
parent 864eccfc5c
commit bd31323f98
13 changed files with 88 additions and 40 deletions

View File

@ -256,7 +256,9 @@ int ObHeartBeatProcess::do_heartbeat_event(const ObLeaseResponse &lease_response
}
}
// even try reload schema failed, we should continue do following things
int schema_ret = schema_updater_.try_reload_schema(lease_response.refresh_schema_info_);
const bool set_received_schema_version = false;
int schema_ret = schema_updater_.try_reload_schema(lease_response.refresh_schema_info_,
set_received_schema_version);
if (OB_SUCCESS != schema_ret) {
LOG_WARN("try reload schema failed", "schema_version", lease_response.schema_version_,

View File

@ -404,7 +404,9 @@ int ObServerSchemaUpdater::process_async_refresh_tasks(
return ret;
}
int ObServerSchemaUpdater::try_reload_schema(const ObRefreshSchemaInfo &schema_info)
int ObServerSchemaUpdater::try_reload_schema(
const ObRefreshSchemaInfo &schema_info,
const bool set_received_schema_version)
{
int ret = OB_SUCCESS;
@ -419,9 +421,10 @@ int ObServerSchemaUpdater::try_reload_schema(const ObRefreshSchemaInfo &schema_i
// Here, we ignore error since set_tenant_received_broadcast_version() may fail before tenant firstly refresh schema.
int tmp_ret = OB_SUCCESS;
if (OB_INVALID_TENANT_ID != schema_info.get_tenant_id()
&& schema_info.get_schema_version() > 0 ) {
// && OB_SUCCESS != (tmp_ret = schema_mgr_->set_tenant_received_broadcast_version(
// schema_info.get_tenant_id(), schema_info.get_schema_version()))) {
&& schema_info.get_schema_version() > 0
&& set_received_schema_version
&& OB_TMP_FAIL(schema_mgr_->set_tenant_received_broadcast_version(
schema_info.get_tenant_id(), schema_info.get_schema_version()))) {
LOG_WARN("fail to set tenant received broadcast version", K(tmp_ret), K(schema_info));
}

View File

@ -103,7 +103,8 @@ public:
void stop();
void wait();
int try_reload_schema(const share::schema::ObRefreshSchemaInfo &schema_info);
int try_reload_schema(const share::schema::ObRefreshSchemaInfo &schema_info,
const bool set_received_schema_version);
int try_release_schema();
int async_refresh_schema(
const uint64_t tenant_id,

View File

@ -1305,13 +1305,12 @@ int ObService::switch_schema(
} else if (OB_UNLIKELY(schema_version <= 0)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", KR(ret), K(schema_version));
//FIXME:(yanmu.ztl)
// As a temporary solution to reduce schema error after execute ddl,
// try the best to refresh schema in cluster synchronously.
//} else if (!arg.force_refresh_) {
// if (OB_FAIL(schema_updater_.try_reload_schema(schema_info))) {
// LOG_WARN("reload schema failed", K(schema_info), K(ret));
// }
} else if (arg.is_async_) {
const bool set_received_schema_version = true;
if (OB_FAIL(schema_updater_.try_reload_schema(
schema_info, set_received_schema_version))) {
LOG_WARN("reload schema failed", KR(ret), K(schema_info));
}
} else {
ObSEArray<uint64_t, 1> tenant_ids;
ObMultiVersionSchemaService *schema_service = gctx_.schema_service_;
@ -1328,25 +1327,6 @@ int ObService::switch_schema(
} else if (OB_FAIL(ObShareUtil::get_abs_timeout(GCONF.rpc_timeout, abs_timeout))) {
LOG_WARN("fail to get abs timeout", KR(ret), "default_timeout", static_cast<int64_t>(GCONF.rpc_timeout));
} else {
/*
bool need_retry = arg.force_refresh_; // sync refresh schema should retry until timeout
do {
int tmp_ret = OB_SUCCESS;
if (ObTimeUtility::current_time() >= abs_timeout) {
ret = OB_TIMEOUT;
LOG_WARN("already timeout", KR(ret), K(abs_timeout));
} else if (OB_TMP_FAIL(schema_service->refresh_and_add_schema(tenant_ids))) {
LOG_WARN("fail to refresh schema", KR(tmp_ret), K(schema_info), K(tenant_ids));
if (need_retry) {
ob_usleep(100 * 1000L); // 100ms
} else {
ret = tmp_ret;
}
} else {
break;
}
} while (OB_SUCC(ret));
*/
// To set the received_schema_version period in advance,
// let refresh_schema can execute before analyze_dependencies logic;
int64_t LEFT_TIME = 200 * 1000;// 200ms
@ -1781,7 +1761,9 @@ int ObService::wait_master_key_in_sync(
LOG_WARN("fail to convert tenant max key version", KR(ret), K(wms_in_sync_arg));
} else {
ObRefreshSchemaInfo schema_info;
if (OB_FAIL(schema_updater_.try_reload_schema(schema_info))) {
const bool set_received_schema_version = false;
if (OB_FAIL(schema_updater_.try_reload_schema(
schema_info, set_received_schema_version))) {
LOG_WARN("fail to try reload schema", KR(ret));
} else if (OB_FAIL(trigger_tenant_config(wms_in_sync_arg))) {
LOG_WARN("fail to got versions", KR(ret));

View File

@ -26388,7 +26388,16 @@ int ObDDLService::notify_refresh_schema(const ObAddrIArray &addrs)
schema_info.set_schema_version(schema_version);
}
LOG_INFO("try to notify refresh schema", K(schema_version), K(local_schema_info), K(schema_info));
bool is_async = false;
if (OB_INVALID_TENANT_ID != schema_info.get_tenant_id()) {
omt::ObTenantConfigGuard tenant_config(OTC_MGR.get_tenant_config_with_lock(
schema_info.get_tenant_id()));
if (tenant_config.is_valid()) {
is_async = (0 == tenant_config->_publish_schema_mode.case_compare(PUBLISH_SCHEMA_MODE_ASYNC));
}
}
LOG_INFO("try to notify refresh schema", K(is_async), K(schema_version), K(local_schema_info), K(schema_info));
const int64_t rpc_timeout = GCONF.rpc_timeout;
int64_t timeout = 0;
FOREACH_X(s, server_list, OB_SUCC(ret)) {
@ -26414,6 +26423,9 @@ int ObDDLService::notify_refresh_schema(const ObAddrIArray &addrs)
timeout = std::min(THIS_WORKER.get_timeout_remain(), rpc_timeout);
}
arg.force_refresh_ = found;
if (!arg.force_refresh_) {
arg.is_async_ = is_async;
}
// overwrite ret
if (FAILEDx(proxy.call(*s, timeout, arg))) {
LOG_WARN("send switch schema rpc failed", KR(ret),

View File

@ -174,6 +174,11 @@ public:
ObLatchRGuard rd_guard(const_cast<ObLatch&>(lock_), ObLatchIds::CONFIG_LOCK);
return value_ptr();
}
int case_compare(const char* str) const
{
ObLatchRGuard rd_guard(const_cast<ObLatch&>(lock_), ObLatchIds::CONFIG_LOCK);
return ObString::make_string(value_ptr()).case_compare(str);
}
virtual const char *spfile_str() const
{
const char *ret = nullptr;
@ -920,6 +925,11 @@ public:
ObLatchRGuard rd_guard(const_cast<ObLatch&>(lock_), ObLatchIds::CONFIG_LOCK);
return ObString::make_string(value_str_);
}
int case_compare(const char *str) const
{
ObLatchRGuard rd_guard(const_cast<ObLatch&>(lock_), ObLatchIds::CONFIG_LOCK);
return ObString::make_string(value_str_).case_compare(str);
}
int copy(char *buf, const int64_t buf_len); // '\0' will be added
int deep_copy_value_string(ObIAllocator &allocator, ObString &dst);
virtual ObConfigItemType get_config_item_type() const {

View File

@ -31,6 +31,7 @@
#include "src/observer/ob_server.h"
#include "share/table/ob_table_config_util.h"
#include "share/config/ob_config_mode_name_def.h"
#include "share/schema/ob_schema_struct.h"
namespace oceanbase
{
using namespace share;
@ -380,6 +381,12 @@ bool ObTTLDutyDurationChecker::check(const ObConfigItem& t) const
return OB_SUCCESS == common::ObTTLUtil::parse(t.str(), duty_duration) && duty_duration.is_valid();
}
bool ObConfigPublishSchemaModeChecker::check(const ObConfigItem& t) const
{
return 0 == t.case_compare(PUBLISH_SCHEMA_MODE_BEST_EFFORT)
|| 0 == t.case_compare(PUBLISH_SCHEMA_MODE_ASYNC);
}
bool ObConfigMemoryLimitChecker::check(const ObConfigItem &t) const
{
bool is_valid = false;

View File

@ -472,6 +472,17 @@ private:
DISALLOW_COPY_AND_ASSIGN(ObTTLDutyDurationChecker);
};
class ObConfigPublishSchemaModeChecker
: public ObConfigChecker
{
public:
ObConfigPublishSchemaModeChecker() {}
virtual ~ObConfigPublishSchemaModeChecker() {}
bool check(const ObConfigItem& t) const;
private:
DISALLOW_COPY_AND_ASSIGN(ObConfigPublishSchemaModeChecker);
};
// config item container
class ObConfigStringKey
{

View File

@ -3784,17 +3784,19 @@ void ObSwitchSchemaArg::reset()
{
schema_info_.reset();
force_refresh_ = false;
is_async_ = false;
}
DEF_TO_STRING(ObSwitchSchemaArg)
{
int64_t pos = 0;
J_KV(K_(schema_info),
K_(force_refresh));
K_(force_refresh),
K_(is_async));
return pos;
}
OB_SERIALIZE_MEMBER(ObSwitchSchemaArg, schema_info_, force_refresh_);
OB_SERIALIZE_MEMBER(ObSwitchSchemaArg, schema_info_, force_refresh_, is_async_);
DEF_TO_STRING(ObSwitchLeaderArg)
{

View File

@ -4379,10 +4379,17 @@ struct ObSwitchSchemaArg
{
OB_UNIS_VERSION(1);
public:
explicit ObSwitchSchemaArg() : schema_info_(), force_refresh_(false) {}
explicit ObSwitchSchemaArg(const share::schema::ObRefreshSchemaInfo &schema_info,
bool force_refresh)
: schema_info_(schema_info), force_refresh_(force_refresh) {}
explicit ObSwitchSchemaArg()
: schema_info_(),
force_refresh_(false),
is_async_(false) {}
explicit ObSwitchSchemaArg(
const share::schema::ObRefreshSchemaInfo &schema_info,
const bool force_refresh,
const bool is_async)
: schema_info_(schema_info),
force_refresh_(force_refresh),
is_async_(is_async) {}
~ObSwitchSchemaArg() {}
void reset();
bool is_valid() const { return schema_info_.get_schema_version() > 0; }
@ -4391,6 +4398,7 @@ public:
share::schema::ObRefreshSchemaInfo schema_info_;
bool force_refresh_;
bool is_async_;
};
struct ObLSTabletPair final

View File

@ -138,6 +138,12 @@ DEF_TIME(schema_history_expire_time, OB_CLUSTER_PARAMETER, "7d", "[1m, 30d]",
"with default 7days. Range: [1h, 30d]",
ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
DEF_STR_WITH_CHECKER(_publish_schema_mode, OB_TENANT_PARAMETER, "BEST_EFFORT",
common::ObConfigPublishSchemaModeChecker,
"specify the inspection of schema synchronous status after ddl transaction commits"
"values: BEST_EFFORT, ASYNC",
ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
DEF_STR_WITH_CHECKER(default_compress_func, OB_CLUSTER_PARAMETER, "zstd_1.3.8",
common::ObConfigCompressFuncChecker,
"default compress function name for create new table, "

View File

@ -146,6 +146,9 @@ static const int64_t SCHEMA_MALLOC_BLOCK_SIZE = 128;
static const int64_t SCHEMA_MID_MALLOC_BLOCK_SIZE = 256;
static const int64_t SCHEMA_BIG_MALLOC_BLOCK_SIZE = 1024;
static const char* PUBLISH_SCHEMA_MODE_BEST_EFFORT = "BEST_EFFORT";
static const char* PUBLISH_SCHEMA_MODE_ASYNC = "ASYNC";
//-------enum defenition
enum ObTableLoadType
{

View File

@ -347,6 +347,7 @@ _parallel_server_sleep_time
_pipelined_table_function_memory_limit
_print_sample_ppm
_private_buffer_size
_publish_schema_mode
_pushdown_storage_level
_px_bloom_filter_group_size
_px_chunklist_count_ratio