Support monotonic cross-machine data_version.

This commit is contained in:
fforkboat
2024-06-24 08:16:40 +00:00
committed by ob-robot
parent b7f4eb6aa2
commit 9865419f35
52 changed files with 2047 additions and 392 deletions

View File

@ -23,6 +23,8 @@ ob_set_subtarget(oblib_common common
ob_target_specific.cpp
ob_timeout_ctx.cpp
ob_zone_type.cpp
ob_version_def.cpp
ob_tenant_data_version_mgr.cpp
)
ob_set_subtarget(oblib_common common_mixed

View File

@ -276,6 +276,7 @@ DEFINE_DESERIALIZE(ObRecordHeader)
} else if (OB_FAIL(serialization::decode_i64(buf, data_len, pos, &data_checksum_))) {
COMMON_LOG(WARN, "encode data failed..", KP(buf), K_(data_checksum), K(ret));
}
// 后续新增成员时,应该使用header_length_作为反序列化长度的限制,以保证升级兼容
return ret;
}

View File

@ -0,0 +1,572 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#include "common/ob_tenant_data_version_mgr.h"
#include "common/ob_record_header.h"
#define DV_ILOG_F(fmt, args...) COMMON_LOG(INFO, "[DATA_VERSION] " fmt, ##args)
#define DV_TLOG(fmt, args...) COMMON_LOG(TRACE, "[DATA_VERSION] " fmt, ##args)
namespace oceanbase
{
namespace common
{
ObTenantDataVersionMgr& ObTenantDataVersionMgr::get_instance()
{
static ObTenantDataVersionMgr mgr;
return mgr;
}
int ObTenantDataVersionMgr::init(bool enable_compatible_monotonic)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(is_inited_)) {
ret = OB_INIT_TWICE;
} else if (OB_FAIL(map_.create(TENANT_DATA_VERSION_BUCKET_NUM, lib::ObLabel("TenantDVMap")))) {
COMMON_LOG(WARN, "fail to create TenantDataVersionMgr map", K(ret));
} else {
is_inited_ = true;
mock_data_version_ = 0;
enable_compatible_monotonic_ = enable_compatible_monotonic;
}
return ret;
}
int ObTenantDataVersionMgr::get(const uint64_t tenant_id, uint64_t &data_version) const
{
int ret = OB_SUCCESS;
ObTenantDataVersion *version = NULL;
data_version = 0;
if (OB_UNLIKELY(mock_data_version_ != 0)) {
data_version = ATOMIC_LOAD(&mock_data_version_);
DV_TLOG("mock_data_version is set", K(tenant_id), K(data_version));
} else if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
COMMON_LOG(WARN, "ObTenantDataVersionMgr doesn't init", K(ret), K(tenant_id));
} else if (OB_FAIL(map_.get_refactored(tenant_id, version))) {
if (OB_HASH_NOT_EXIST == ret) {
if (is_sys_tenant(tenant_id) || is_meta_tenant(tenant_id)) {
data_version = LAST_BARRIER_DATA_VERSION;
ret = OB_SUCCESS;
COMMON_LOG(WARN, "data_version fallback to LAST_BARRIER_DATA_VERSION",
K(tenant_id), K(data_version));
} else {
ret = OB_ENTRY_NOT_EXIST;
}
} else {
COMMON_LOG(WARN, "fail to get data_version", K(ret), K(tenant_id));
}
} else if (NULL == version) {
ret = OB_ERR_UNEXPECTED;
COMMON_LOG(WARN, "data_version is NULL", K(ret));
} else if (version->is_removed()) {
ret = OB_ENTRY_NOT_EXIST;
} else {
data_version = version->get_version();
}
return ret;
}
int ObTenantDataVersionMgr::set(const uint64_t tenant_id, const uint64_t data_version)
{
int ret = OB_SUCCESS;
ObTenantDataVersion *version = NULL;
bool need_to_set = false;
if (OB_UNLIKELY(mock_data_version_ != 0)) {
// do nothing
DV_TLOG("mock_data_version is set", K(tenant_id), K(data_version), K(mock_data_version_));
} else if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
COMMON_LOG(WARN, "ObTenantDataVersionMgr doesn't init", K(ret),
K(tenant_id));
} else if (OB_FAIL(map_.get_refactored(tenant_id, version))) {
if (OB_HASH_NOT_EXIST == ret) {
need_to_set = true;
ret = OB_SUCCESS;
} else {
COMMON_LOG(WARN, "fail to get data_version", K(ret), K(tenant_id));
}
} else if (NULL == version) {
ret = OB_ERR_UNEXPECTED;
COMMON_LOG(WARN, "data_version is NULL", K(ret));
} else if (version->is_removed() || data_version <= version->get_version()) {
// if the tenant is dropped or the new data_version is smaller, then
// no need to set
} else {
need_to_set = true;
}
if (OB_SUCC(ret) && need_to_set) {
SpinWLockGuard guard(lock_);
if (OB_FAIL(set_(tenant_id, data_version))) {
COMMON_LOG(WARN, "fail to set data_version", K(ret), K(tenant_id), K(data_version));
}
}
return ret;
}
int ObTenantDataVersionMgr::remove(const uint64_t tenant_id)
{
int ret = OB_SUCCESS;
ObTenantDataVersion *version = NULL;
const int64_t remove_ts = ObTimeUtility::current_time();
SpinWLockGuard guard(lock_);
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
COMMON_LOG(WARN, "ObTenantDataVersionMgr doesn't init", K(ret), K(tenant_id));
} else if (OB_FAIL(map_.get_refactored(tenant_id, version))) {
if (OB_ENTRY_NOT_EXIST == ret) {
ret = OB_SUCCESS;
} else {
COMMON_LOG(WARN, "fail to get data_version", K(ret), K(tenant_id));
}
} else if (NULL == version) {
ret = OB_ERR_UNEXPECTED;
COMMON_LOG(WARN, "data_version is NULL", K(ret));
} else if (version->is_removed()) {
// already removed
} else if (OB_FAIL(remove_and_dump_to_file_(tenant_id, remove_ts))) {
COMMON_LOG(WARN, "fail to dump data_version file", K(ret), K(tenant_id));
} else {
version->set_removed(true, remove_ts);
DV_ILOG_F("Tenant DATA_VERSION is removed", K(ret), K(tenant_id), K(remove_ts), K(*version));
}
return ret;
}
int ObTenantDataVersionMgr::load_from_file()
{
int ret = OB_SUCCESS;
int fd = 0;
const char *file_path = TENANT_DATA_VERSION_FILE_PATH;
SpinWLockGuard guard(lock_);
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
COMMON_LOG(WARN, "ObTenantDataVersionMgr doesn't init", K(ret));
} else if ((fd = ::open(file_path, O_RDONLY)) < 0) {
if (ENOENT != errno) {
ret = OB_IO_ERROR;
COMMON_LOG(WARN, "fail to open data_version file", K(ret), K(errno), K(file_path));
} else {
// when errno is ENOENT, the file does not exist
COMMON_LOG(WARN, "data_version file doesn't exist, skip load");
}
} else {
char *load_buf = NULL;
int64_t buf_size = TENANT_DATA_VERSION_FILE_MAX_SIZE;
PageArena<> pa;
if (OB_ISNULL(load_buf = pa.alloc(buf_size))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
COMMON_LOG(ERROR, "fail to alloc buf", K(ret), K(buf_size));
} else {
size_t read_len = ::read(fd, load_buf, buf_size);
if (read_len < 0) {
ret = OB_IO_ERROR;
COMMON_LOG(WARN, "fail to read data_version file", K(ret), K(read_len), K(errno), K(fd));
} else {
// deserialize header
// checksum check
// load data_version
ObRecordHeader header;
int64_t pos = 0;
if (OB_FAIL(header.deserialize(load_buf, read_len, pos))) {
COMMON_LOG(ERROR, "deserialize header failed", K(ret), K(read_len), K(pos));
} else {
const int64_t header_length = header.header_length_;
const int64_t data_length = read_len - header_length;
const char *const p_data = load_buf + header_length;
if (data_length <= 0 || data_length != header.data_zlength_) {
ret = OB_INVALID_DATA;
COMMON_LOG(ERROR, "invalid data length", K(ret), K(header_length),
K(data_length), K(buf_size), K(read_len), K(header));
} else if (OB_FAIL(header.check_header_checksum())) {
COMMON_LOG(ERROR, "check header checksum failed", K(ret), K(header));
} else if (OB_CONFIG_MAGIC != header.magic_) {
ret = OB_INVALID_DATA;
COMMON_LOG(ERROR, "check magic number failed", K(ret),
K_(header.magic));
} else if (OB_FAIL(header.check_payload_checksum(p_data, data_length))) {
COMMON_LOG(ERROR, "check data checksum failed", K(ret));
} else {
while (OB_SUCC(ret)) {
ret = load_data_version_(load_buf, pos);
}
if (OB_ITER_END == ret) {
ret = OB_SUCCESS;
}
}
}
}
}
if (0 != close(fd)) {
if (OB_SUCC(ret)) {
ret = OB_IO_ERROR;
COMMON_LOG(WARN, "fail to close data_version file fd", K(ret), K(errno), K(fd));
}
}
}
COMMON_LOG(INFO, "[DATA_VERSION] load data_version file", K(ret), K(map_.size()));
return ret;
}
int ObTenantDataVersionMgr::set_(const uint64_t tenant_id,
const uint64_t data_version)
{
int ret = OB_SUCCESS;
ObTenantDataVersion *version = NULL;
bool need_to_insert = false;
uint64_t old_version = 0;
if (OB_FAIL(map_.get_refactored(tenant_id, version))) {
if (OB_HASH_NOT_EXIST == ret) {
need_to_insert = true;
version = NULL;
old_version = 0;
ret = OB_SUCCESS;
} else {
COMMON_LOG(WARN, "fail to get result from data_version_map", K(ret),
K(tenant_id), K(data_version));
}
} else if (NULL == version) {
ret = OB_ERR_UNEXPECTED;
COMMON_LOG(WARN, "data_version is NULL", K(ret));
} else {
old_version = version->get_version();
}
if (OB_SUCC(ret)) {
if (OB_NOT_NULL(version) && (version->is_removed() || data_version <= old_version)) {
COMMON_LOG(INFO,
"tenant is removed or new data_version is not bigger than old "
"value, no need to update tenant data_version", K(tenant_id),
"is_removed", version->is_removed(),
"old_version", old_version,
"new_version", data_version);
} else if (OB_FAIL(set_and_dump_to_file_(tenant_id, data_version, need_to_insert))) {
COMMON_LOG(WARN, "fail to dump data_version file", K(ret), K(tenant_id));
} else {
if (need_to_insert) {
void *version_buf = NULL;
if (OB_ISNULL(version_buf = allocator_.alloc(sizeof(ObTenantDataVersion)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
COMMON_LOG(ERROR, "fail to alloc buf", K(ret), K(tenant_id),
K(sizeof(ObTenantDataVersion)));
} else if (FALSE_IT(version = new (version_buf) ObTenantDataVersion(data_version))) {
} else if (OB_FAIL(map_.set_refactored(tenant_id, version))) {
COMMON_LOG(WARN, "fail to set data_version", K(ret), K(tenant_id));
}
} else {
version->set_version(data_version);
}
DV_ILOG_F("Tenant DATA_VERSION is changed", K(ret), K(tenant_id), "old_version", old_version,
"new_version", data_version);
}
}
return ret;
}
int ObTenantDataVersionMgr::set_and_dump_to_file_(const uint64_t tenant_id,
const uint64_t data_version,
const bool need_to_insert) {
int ret = OB_SUCCESS;
ObRecordHeader header;
int64_t header_length = header.get_serialize_size();
char *dump_buf = NULL;
// we may need to insert a new entry to the map, so map_size + 1 to ensure the memory is enough
int64_t buf_length = header_length + (map_.size() + 1) * ObTenantDataVersion::MAX_DUMP_BUF_SIZE;
PageArena<> pa;
if (OB_ISNULL(dump_buf = pa.alloc(buf_length))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
COMMON_LOG(ERROR, "fail to alloc buf", K(ret), K(tenant_id), K(buf_length));
} else {
int64_t pos = 0;
const int64_t data_pos = pos + header_length;
pos += header_length;
ObTenantDataVersionMap::const_iterator it = map_.begin();
for (; it != map_.end() && OB_SUCC(ret); ++it) {
const uint64_t iter_tenant_id = it->first;
const ObTenantDataVersion *iter_version = it->second;
if (OB_ISNULL(iter_version)) {
ret = OB_ERR_UNEXPECTED;
COMMON_LOG(WARN, "iter_version is null", K(iter_tenant_id));
} else if (iter_tenant_id == tenant_id) {
if (OB_FAIL(dump_data_version_(
dump_buf, buf_length, pos, iter_tenant_id,
iter_version->is_removed(),
iter_version->get_remove_timestamp(),
data_version))) {
COMMON_LOG(WARN, "fail to dump data_version", K(ret),
K(iter_tenant_id), K(data_version), K(*iter_version));
}
} else {
if (OB_FAIL(dump_data_version_(dump_buf, buf_length, pos, iter_tenant_id,
iter_version->is_removed(),
iter_version->get_remove_timestamp(),
iter_version->get_version()))) {
COMMON_LOG(WARN, "fail to dump data_version", K(ret),
K(iter_tenant_id), K(*iter_version));
}
}
}
if (OB_SUCC(ret) && need_to_insert) {
if (OB_FAIL(dump_data_version_(dump_buf, buf_length, pos, tenant_id,
false /*removed*/, 0 /*remove_ts*/,
data_version))) {
COMMON_LOG(WARN, "fail to dump data_version", K(ret), K(tenant_id), K(data_version));
}
}
if (OB_FAIL(ret)) {
} else if (OB_FAIL(write_to_file_(dump_buf, buf_length, pos - data_pos))) {
COMMON_LOG(WARN, "fail to write data_version file", K(ret), K(tenant_id));
}
}
return ret;
}
int ObTenantDataVersionMgr::remove_and_dump_to_file_(const uint64_t tenant_id,
const int64_t remove_ts) {
int ret = OB_SUCCESS;
ObRecordHeader header;
int64_t header_length = header.get_serialize_size();
char *dump_buf = NULL;
int64_t buf_length = header_length + map_.size() * ObTenantDataVersion::MAX_DUMP_BUF_SIZE;
PageArena<> pa;
if (OB_ISNULL(dump_buf = pa.alloc(buf_length))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
COMMON_LOG(ERROR, "fail to alloc buf", K(ret), K(tenant_id), K(buf_length));
} else {
int64_t pos = 0;
const int64_t data_pos = pos + header_length;
pos += header_length;
ObTenantDataVersionMap::const_iterator it = map_.begin();
for (; it != map_.end() && OB_SUCC(ret); ++it) {
const uint64_t iter_tenant_id = it->first;
const ObTenantDataVersion *iter_version = it->second;
if (OB_ISNULL(iter_version)) {
ret = OB_ERR_UNEXPECTED;
COMMON_LOG(WARN, "iter_version is null", K(iter_tenant_id));
} else if (iter_tenant_id == tenant_id) {
if (OB_FAIL(dump_data_version_(dump_buf, buf_length, pos, iter_tenant_id,
true /*removed*/,
remove_ts,
iter_version->get_version()))) {
COMMON_LOG(WARN, "fail to dump data_version", K(ret),
K(iter_tenant_id), K(remove_ts), K(*iter_version));
}
} else {
if (OB_FAIL(dump_data_version_(dump_buf, buf_length, pos,
iter_tenant_id,
iter_version->is_removed(),
iter_version->get_remove_timestamp(),
iter_version->get_version()))) {
COMMON_LOG(WARN, "fail to dump data_version", K(ret),
K(iter_tenant_id), K(*iter_version));
}
}
}
if (OB_FAIL(ret)) {
} else if (OB_FAIL(write_to_file_(dump_buf, buf_length, pos - data_pos))) {
COMMON_LOG(WARN, "fail to write data_version file", K(ret), K(tenant_id));
}
}
return ret;
}
int64_t ObTenantDataVersionMgr::get_max_dump_buf_size_() const
{
ObRecordHeader header;
int64_t header_size = header.get_serialize_size();
// we may need to insert a new entry to the map
int64_t data_size = (map_.size() + 1) * ObTenantDataVersion::MAX_DUMP_BUF_SIZE;
return header_size + data_size;
}
int ObTenantDataVersionMgr::dump_data_version_(char *buf, int64_t buf_length, int64_t &pos,
const uint64_t tenant_id,
const bool removed,
const int64_t remove_ts,
const uint64_t data_version) {
int ret = OB_SUCCESS;
char tmp_buf[ObTenantDataVersion::MAX_DUMP_BUF_SIZE]{0};
char version_str[OB_SERVER_VERSION_LENGTH]{0};
int res = 0;
if (OB_INVALID_INDEX ==
VersionUtil::print_version_str(version_str, OB_SERVER_VERSION_LENGTH, data_version)) {
ret = OB_INVALID_ARGUMENT;
COMMON_LOG(WARN, "fail to print data_version str", K(ret), K(data_version));
} else if (OB_FAIL(databuff_printf(
buf, buf_length, pos, ObTenantDataVersion::DUMP_BUF_FORMAT,
tenant_id, version_str, data_version, removed, remove_ts))) {
COMMON_LOG(WARN, "fail to printf", K(ret), K(tenant_id), K(buf_length), K(pos));
} else if (pos >= buf_length) {
ret = OB_SIZE_OVERFLOW;
COMMON_LOG(WARN, "buffer size overflow", K(ret), K(tenant_id), K(buf_length), K(pos));
} else {
// we use '\n' as the separator of tenant, we'll use STRTOK to parse this
buf[pos] = '\n';
pos += 1;
}
return ret;
}
int ObTenantDataVersionMgr::load_data_version_(char *buf, int64_t &pos) {
int ret = OB_SUCCESS;
ObTenantDataVersion *version = NULL;
uint64_t tenant_id = 0;
int removed = false;
uint64_t remove_timestamp = 0;
uint64_t version_val = 0;
char version_str[OB_SERVER_VERSION_LENGTH]{0};
int res = 0;
const int expected_item_size = 5;
char *saveptr = NULL;
char *token = NULL;
if (NULL == buf) {
ret = OB_INVALID_ARGUMENT;
COMMON_LOG(WARN, "buf is null", K(ret), K(pos));
} else if (NULL == (token = STRTOK_R(buf + pos, "\n", &saveptr))) {
ret = OB_ITER_END;
} else {
res = sscanf(token, ObTenantDataVersion::DUMP_BUF_FORMAT, &tenant_id,
version_str, &version_val, &removed, &remove_timestamp);
if (res != expected_item_size) {
ret = OB_INVALID_DATA;
COMMON_LOG(ERROR, "fail to parse data_version", K(ret), K(res), K(pos),
K(token), K(tenant_id), K(version_val), K(version_str),
K(removed), K(remove_timestamp));
} else {
COMMON_LOG(INFO, "[DATA_VERSION] successfully parse data_version",
K(tenant_id), K(version_val), K(version_str), K(removed),
K(remove_timestamp), K(pos));
void *version_buf = NULL;
if (OB_ISNULL(version_buf =
allocator_.alloc(sizeof(ObTenantDataVersion)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
COMMON_LOG(ERROR, "fail to alloc buf", K(ret), K(sizeof(ObTenantDataVersion)));
} else if (FALSE_IT(version = new (version_buf) ObTenantDataVersion(
removed, remove_timestamp, version_val))) {
} else if (OB_FAIL(map_.set_refactored(tenant_id, version))) {
COMMON_LOG(WARN, "fail to set data_version", K(ret), K(tenant_id), K(version));
} else {
pos += (saveptr - token);
}
}
}
return ret;
}
int ObTenantDataVersionMgr::write_to_file_(char *buf, int64_t buf_length, int64_t data_length)
{
int ret = OB_SUCCESS;
int fd = 0;
ObRecordHeader header;
const int64_t header_length = header.get_serialize_size();
const int64_t total_length = header_length + data_length;
const int64_t max_length = TENANT_DATA_VERSION_FILE_MAX_SIZE;
int64_t header_pos = 0;
if (total_length > buf_length || total_length > max_length) {
ret = OB_INVALID_DATA;
COMMON_LOG(WARN, "dump buffer overflow", K(ret), K(total_length),
K(data_length), K(buf_length), K(max_length));
} else {
header.magic_ = OB_CONFIG_MAGIC;
header.header_length_ = static_cast<int16_t>(header_length);
header.version_ = OB_CONFIG_VERSION;
header.data_length_ = static_cast<int32_t>(data_length);
header.data_zlength_ = header.data_length_;
header.data_checksum_ = ob_crc64(buf + header_length, data_length);
header.set_header_checksum();
if (OB_FAIL(header.serialize(buf, buf_length, header_pos))) {
COMMON_LOG(WARN, "fail to serialize header", K(ret), K(header), K(buf_length), K(header_pos));
} else {
const char *file_path = TENANT_DATA_VERSION_FILE_PATH;
char tmp_path[MAX_PATH_SIZE]{0};
char hist_path[MAX_PATH_SIZE]{0};
if (OB_FAIL(databuff_printf(tmp_path, MAX_PATH_SIZE, "%s.tmp", file_path))) {
COMMON_LOG(WARN, "fail to printf", K(ret));
} else if (OB_FAIL(databuff_printf(hist_path, MAX_PATH_SIZE, "%s.history", file_path))) {
COMMON_LOG(WARN, "fail to printf", K(ret));
} else if ((fd = ::open(tmp_path, O_WRONLY | O_CREAT | O_TRUNC,
S_IRUSR | S_IWUSR | S_IRGRP)) < 0) {
ret = OB_IO_ERROR;
COMMON_LOG(WARN, "fail to open data_version file", K(ret), K(errno),
K(fd), K(total_length), K(tmp_path));
} else if (total_length != ::write(fd, buf, total_length)) {
ret = OB_IO_ERROR;
COMMON_LOG(WARN, "fail to write data_version file", K(ret), K(errno),
K(fd), K(total_length));
if (0 != ::close(fd)) {
COMMON_LOG(WARN, "fail to close data_version file fd", K(ret), K(errno),
K(fd), K(total_length));
}
} else if (0 != ::fsync(fd)) {
ret = OB_IO_ERROR;
COMMON_LOG(WARN, "fail to sync data_version file", K(ret), K(errno),
K(fd), K(total_length));
if (0 != ::close(fd)) {
COMMON_LOG(WARN, "fail to close data_version file fd", K(ret), K(errno),
K(fd), K(total_length));
}
} else if (0 != ::close(fd)) {
ret = OB_IO_ERROR;
COMMON_LOG(WARN, "fail to close data_version file fd", K(ret), K(errno),
K(fd), K(total_length));
}
if (OB_SUCC(ret)) {
if (0 != ::rename(file_path, hist_path) && errno != ENOENT) {
// it's OK to continue if we fail to backup history file, so we ignore the err ret here
COMMON_LOG(WARN, "fail to backup history config file", KERRMSG, K(ret));
}
// 运行到这里的时候可能掉电,导致没有 conf 文件,需要 DBA 手工拷贝 tmp 文件到这里
if (0 != ::rename(tmp_path, file_path)) {
ret = OB_ERR_SYS;
COMMON_LOG(WARN, "fail to move tmp config file", KERRMSG, K(ret));
}
}
}
COMMON_LOG(INFO, "[DATA_VERSION] write data_version file", K(ret),
K(header_length), K(data_length), K(total_length));
}
return ret;
}
} // namespace common
} // namespace oceanbase

View File

@ -0,0 +1,154 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#ifndef OCEANBASE_OBSERVER_OB_TENANT_DATA_VERSION_H_
#define OCEANBASE_OBSERVER_OB_TENANT_DATA_VERSION_H_
#include "lib/ob_define.h"
#include "lib/hash/ob_hashmap.h"
#include "common/ob_version_def.h"
namespace oceanbase
{
namespace common
{
/**
* ObTenantDataVersionMgr maintains the data_version of all tenants by a
* hashmap<tenant_id, data_version>.
*
* The map<tenant_id, data_version> is persisted in a disk file, and whenever
* the data_version of a tenant need to be updated, the disk file will be
* modified before the map updated in memory, so we can ensure the data_version
* will not go back in this machine.
*
* The path of the disk file is $observer_home/etc/observer.data_version.bin.
* This file is composed of a header part and a data part.
* | ---------------------HEADER (Not Readable)---------------------- |
* | ObRecordHeader(magic_number, length, checksum...) |
* | ------------------------DATA (Readable)------------------------- |
* | tenant_id: version_str version_val removed remove_timestamp\n |
* | 1001: 4.3.0.1 17180065793 0 0\n |
* | 1002: 4.3.0.1 17180065793 0 0\n |
* | ... |
* | ------------------------------------------------------------------
* The insert/update/remove operations will set a mgr-level write lock first,
* however the get operation can access the map directly without a lock,
* therefore the overhead of the get operation is relatively small.
*
*/
class ObTenantDataVersionMgr
{
public:
ObTenantDataVersionMgr()
: is_inited_(false), allocator_(lib::ObLabel("TenantDVMgr")),
mock_data_version_(0), enable_compatible_monotonic_(false) {}
~ObTenantDataVersionMgr() {}
static ObTenantDataVersionMgr& get_instance();
int init(bool enable_compatible_monotonic);
int get(const uint64_t tenant_id, uint64_t &data_version) const;
/**
* update the tenant's data_version, if the tenant is not in the mgr yet,
* insert the entry instead.
*/
int set(const uint64_t tenant_id, const uint64_t data_version);
int remove(const uint64_t tenant_id);
int load_from_file();
bool is_enable_compatible_monotonic() {
return ATOMIC_LOAD(&enable_compatible_monotonic_);
}
void set_enable_compatible_monotonic(bool enable) {
ATOMIC_STORE(&enable_compatible_monotonic_, enable);
}
// for unittest
void set_mock_data_version(const uint64_t data_version) {
ATOMIC_SET(&mock_data_version_, data_version);
}
private:
struct ObTenantDataVersion
{
ObTenantDataVersion(uint64_t version)
: removed_(false), remove_timestamp_(0), version_(version) {}
ObTenantDataVersion(bool removed, uint64_t remove_timestamp_, uint64_t version)
: removed_(removed), remove_timestamp_(remove_timestamp_), version_(version) {}
~ObTenantDataVersion() {}
// tenant_id: version_str version_val removed remove_timestamp
// for removed field, 1 stands for tenant is removed, 0 stands for tenant is active
// e.g. 1001: 4.3.0.1 17180065793 0 0
static constexpr const char *DUMP_BUF_FORMAT = "%lu: %s %lu %d %lu";
// the max length of uint64 decimal format is 20, so:
// tenant_id(20) + version_str(OB_SERVER_VERSION_LENGTH) + version_val(20) +
// removed(1) + remove_timestamp(20) + spaces_and_others(10)
static constexpr int64_t MAX_DUMP_BUF_SIZE = 20 + OB_SERVER_VERSION_LENGTH + 20 + 1 + 20 + 10;
bool is_removed() const
{
return ATOMIC_LOAD(&removed_);
}
uint64_t get_remove_timestamp() const
{
return ATOMIC_LOAD(&remove_timestamp_);
}
void set_removed(bool removed, uint64_t remove_ts)
{
ATOMIC_STORE(&removed_, removed);
ATOMIC_STORE(&remove_timestamp_, remove_ts);
}
uint64_t get_version() const
{
return ATOMIC_LOAD(&version_);
}
void set_version(uint64_t version)
{
ATOMIC_STORE(&version_, version);
}
TO_STRING_KV(K_(removed), K_(remove_timestamp), K_(version));
private:
bool removed_;
uint64_t remove_timestamp_;
uint64_t version_;
};
int set_(const uint64_t tenant_id, const uint64_t data_version);
// for set: set data_version before dump
int set_and_dump_to_file_(const uint64_t tenant_id, const uint64_t data_version, const bool need_to_insert);
// for remove: remove `tenant_id`'s data_version before dump
int remove_and_dump_to_file_(const uint64_t tenant_id, const int64_t remove_ts);
int64_t get_max_dump_buf_size_() const;
int dump_data_version_(char *buf, int64_t buf_length, int64_t &pos, const uint64_t tenant_id,
const bool removed, const int64_t remove_ts,
const uint64_t data_version);
int load_data_version_(char *buf, int64_t &pos);
int write_to_file_(char *buf, int64_t buf_length, int64_t data_length);
private:
// we use NoPthreadDefendMode here, so the hashmap will not lock buckets before get/set.
// to ensure thread safety, we have several promises:
// 1. we only insert new entry into the hashmap, never delete or overwrite existing entry
// 2. before insert new entry, we acquire a global lock in ObTenantDataVersionMgr
typedef hash::ObHashMap<uint64_t, ObTenantDataVersion *, hash::NoPthreadDefendMode> ObTenantDataVersionMap;
static constexpr const char *TENANT_DATA_VERSION_FILE_PATH = "etc/observer.data_version.bin";
static constexpr int64_t TENANT_DATA_VERSION_FILE_MAX_SIZE = 1 << 26; // 64MB
// The number of tenant won't be too large, so 1024 buckets should be enough
static constexpr int64_t TENANT_DATA_VERSION_BUCKET_NUM = 1024;
static constexpr int16_t OB_CONFIG_MAGIC = static_cast<int16_t>(0XBEDE);
static const int16_t OB_CONFIG_VERSION = 1;
bool is_inited_;
ObTenantDataVersionMap map_;
common::SpinRWLock lock_;
common::ObArenaAllocator allocator_;
// for unittest
uint64_t mock_data_version_;
bool enable_compatible_monotonic_;
};
} // namespace common
} // namespace oceanbase
#define ODV_MGR (::oceanbase::common::ObTenantDataVersionMgr::get_instance())
#endif // OCEANBASE_OBSERVER_OB_TENANT_DATA_VERSION_H_

View File

@ -0,0 +1,75 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#include "common/ob_version_def.h"
#include "lib/utility/ob_print_utils.h"
namespace oceanbase
{
namespace common
{
bool VersionUtil::check_version_valid(const uint64_t version)
{
bool bret = true;
const uint32_t major = OB_VSN_MAJOR(version);
const uint16_t minor = OB_VSN_MINOR(version);
const uint8_t major_patch = OB_VSN_MAJOR_PATCH(version);
const uint8_t minor_patch = OB_VSN_MINOR_PATCH(version);
if (major < 3 || (3 == major && minor < 2)) {
// cluster_version is less than "3.2":
// - should be "a.b.0.c";
bret = (0 == major_patch);
} else if (3 == major && 2 == minor) {
// cluster_version's prefix is "3.2":
// - cluster_version == 3.2.0.0/1/2
// - cluster_version >= 3.2.3.x
bret = (0 == major_patch && minor_patch <= 2) || (major_patch >= 3);
} else {
// cluster_version is greator than "3.2"
bret = true;
}
return bret;
}
int64_t VersionUtil::print_version_str(char *buf, const int64_t buf_len, uint64_t version)
{
int ret = OB_SUCCESS;
int64_t pos = 0;
const uint32_t major = OB_VSN_MAJOR(version);
const uint16_t minor = OB_VSN_MINOR(version);
const uint8_t major_patch = OB_VSN_MAJOR_PATCH(version);
const uint8_t minor_patch = OB_VSN_MINOR_PATCH(version);
if (OB_UNLIKELY(!check_version_valid(version))) {
ret = OB_INVALID_ARGUMENT;
COMMON_LOG(ERROR, "invalid cluster version", K(version), K(lbt()));
} else if (major < 3
|| (3 == major && minor < 2)
|| (3 == major && 2 == minor && 0 == major_patch && minor_patch < 3)) {
if (OB_FAIL(databuff_printf(buf, buf_len, pos, "%u.%u.%u",
major, minor, minor_patch))) {
COMMON_LOG(WARN, "fail to print version str", K(ret), K(version));
}
} else {
if (OB_FAIL(databuff_printf(buf, buf_len, pos, "%u.%u.%u.%u",
major, minor, major_patch, minor_patch))) {
COMMON_LOG(WARN, "fail to print version str", K(ret), K(version));
}
}
if (OB_FAIL(ret)) {
pos = OB_INVALID_INDEX;
}
return pos;
}
} // namespace common
} // namespace oceanbase

178
deps/oblib/src/common/ob_version_def.h vendored Normal file
View File

@ -0,0 +1,178 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#ifndef OCEANBASE_OBSERVER_OB_VERSION_DEF_H_
#define OCEANBASE_OBSERVER_OB_VERSION_DEF_H_
#include "lib/ob_define.h"
#include "lib/allocator/page_arena.h"
namespace oceanbase
{
namespace common
{
#define OB_VSN_MAJOR_SHIFT 32
#define OB_VSN_MINOR_SHIFT 16
#define OB_VSN_MAJOR_PATCH_SHIFT 8
#define OB_VSN_MINOR_PATCH_SHIFT 0
#define OB_VSN_MAJOR_MASK 0xffffffff
#define OB_VSN_MINOR_MASK 0xffff
#define OB_VSN_MAJOR_PATCH_MASK 0xff
#define OB_VSN_MINOR_PATCH_MASK 0xff
#define OB_VSN_MAJOR(version) (static_cast<const uint32_t>((version >> OB_VSN_MAJOR_SHIFT) & OB_VSN_MAJOR_MASK))
#define OB_VSN_MINOR(version) (static_cast<const uint16_t>((version >> OB_VSN_MINOR_SHIFT) & OB_VSN_MINOR_MASK))
#define OB_VSN_MAJOR_PATCH(version) (static_cast<const uint8_t>((version >> OB_VSN_MAJOR_PATCH_SHIFT) & OB_VSN_MAJOR_PATCH_MASK))
#define OB_VSN_MINOR_PATCH(version) (static_cast<const uint8_t>(version & OB_VSN_MINOR_PATCH_MASK))
#define CALC_VERSION(major, minor, major_patch, minor_patch) \
(((major) << OB_VSN_MAJOR_SHIFT) + \
((minor) << OB_VSN_MINOR_SHIFT) + \
((major_patch) << OB_VSN_MAJOR_PATCH_SHIFT) + \
((minor_patch)))
constexpr static inline uint64_t
cal_version(const uint64_t major, const uint64_t minor, const uint64_t major_patch, const uint64_t minor_patch)
{
return CALC_VERSION(major, minor, major_patch, minor_patch);
}
#define DEF_MAJOR_VERSION 1
#define DEF_MINOR_VERSION 4
#define DEF_MAJOR_PATCH_VERSION 40
#define DEF_MINOR_PATCH_VERSION 0
#define CLUSTER_VERSION_140 (oceanbase::common::cal_version(1, 4, 0, 0))
#define CLUSTER_VERSION_141 (oceanbase::common::cal_version(1, 4, 0, 1))
#define CLUSTER_VERSION_142 (oceanbase::common::cal_version(1, 4, 0, 2))
#define CLUSTER_VERSION_143 (oceanbase::common::cal_version(1, 4, 0, 3))
#define CLUSTER_VERSION_1431 (oceanbase::common::cal_version(1, 4, 0, 31))
#define CLUSTER_VERSION_1432 (oceanbase::common::cal_version(1, 4, 0, 32))
#define CLUSTER_VERSION_144 (oceanbase::common::cal_version(1, 4, 0, 4))
#define CLUSTER_VERSION_1440 (oceanbase::common::cal_version(1, 4, 0, 40))
#define CLUSTER_VERSION_1450 (oceanbase::common::cal_version(1, 4, 0, 50))
#define CLUSTER_VERSION_1460 (oceanbase::common::cal_version(1, 4, 0, 60))
#define CLUSTER_VERSION_1461 (oceanbase::common::cal_version(1, 4, 0, 61))
#define CLUSTER_VERSION_1470 (oceanbase::common::cal_version(1, 4, 0, 70))
#define CLUSTER_VERSION_1471 (oceanbase::common::cal_version(1, 4, 0, 71))
#define CLUSTER_VERSION_1472 (oceanbase::common::cal_version(1, 4, 0, 72))
#define CLUSTER_VERSION_1500 (oceanbase::common::cal_version(1, 5, 0, 0))
#define CLUSTER_VERSION_2000 (oceanbase::common::cal_version(2, 0, 0, 0))
#define CLUSTER_VERSION_2100 (oceanbase::common::cal_version(2, 1, 0, 0))
#define CLUSTER_VERSION_2110 (oceanbase::common::cal_version(2, 1, 0, 1))
#define CLUSTER_VERSION_2200 (oceanbase::common::cal_version(2, 2, 0, 0))
#define CLUSTER_VERSION_2210 (oceanbase::common::cal_version(2, 2, 0, 1))
/*
* FIXME: cluster_version目前最高是4位,此处定义要和CMakeLists.txt、tools/upgrade、src/share/parameter/ob_parameter_seed.ipp的定义保持一致
* 当最后一位非0时,需要注意。比方说2.2.2版本实际上代表的是2.2.02版本,但实际我们想定义成2.2.20版本,和我们的意图不符。
* 但2.2.1及之前的版本已经发版,为了避免引入兼容性问题,不改历史版本的cluster_version定义。
*/
#define CLUSTER_VERSION_2220 (oceanbase::common::cal_version(2, 2, 0, 20))
#define CLUSTER_VERSION_2230 (oceanbase::common::cal_version(2, 2, 0, 30))
#define CLUSTER_VERSION_2240 (oceanbase::common::cal_version(2, 2, 0, 40))
#define CLUSTER_VERSION_2250 (oceanbase::common::cal_version(2, 2, 0, 50))
#define CLUSTER_VERSION_2260 (oceanbase::common::cal_version(2, 2, 0, 60))
#define CLUSTER_VERSION_2270 (oceanbase::common::cal_version(2, 2, 0, 70))
#define CLUSTER_VERSION_2271 (oceanbase::common::cal_version(2, 2, 0, 71))
#define CLUSTER_VERSION_2272 (oceanbase::common::cal_version(2, 2, 0, 72))
#define CLUSTER_VERSION_2273 (oceanbase::common::cal_version(2, 2, 0, 73))
#define CLUSTER_VERSION_2274 (oceanbase::common::cal_version(2, 2, 0, 74))
#define CLUSTER_VERSION_2275 (oceanbase::common::cal_version(2, 2, 0, 75))
#define CLUSTER_VERSION_2276 (oceanbase::common::cal_version(2, 2, 0, 76))
#define CLUSTER_VERSION_2277 (oceanbase::common::cal_version(2, 2, 0, 77))
#define CLUSTER_VERSION_3000 (oceanbase::common::cal_version(3, 0, 0, 0))
#define CLUSTER_VERSION_3100 (oceanbase::common::cal_version(3, 1, 0, 0))
#define CLUSTER_VERSION_311 (oceanbase::common::cal_version(3, 1, 0, 1))
#define CLUSTER_VERSION_312 (oceanbase::common::cal_version(3, 1, 0, 2))
#define CLUSTER_VERSION_3200 (oceanbase::common::cal_version(3, 2, 0, 0))
#define CLUSTER_VERSION_321 (oceanbase::common::cal_version(3, 2, 0, 1))
#define CLUSTER_VERSION_322 (oceanbase::common::cal_version(3, 2, 0, 2))
// ATTENSION!!!!!!!!!!!!!!!!!
//
// Cluster Version which is less than "3.2.3":
// - 1. It's composed by 3 parts(major、minor、minor_patch)
// - 2. String: cluster version will be format as "major.minor.minor_patch[.0]", and string like "major.minor.x.minor_patch" is invalid.
// - 3. Integer: for compatibility, cluster version will be encoded into "major|minor|x|minor_patch". "x" must be 0, otherwise, it's invalid.
// - 4. Print: cluster version str will be still printed as 3 parts.
//
// Cluster Version which is not less than "3.2.3":
// - 1. It's composed by 4 parts(major、minor、major_patch、minor_patch)
// - 2. String: cluster version will be format as "major.minor.major_patch.minor_patch".
// - 3. Integer: cluster version will be encoded into "major|minor|major_patch|minor_patch".
// - 4. Print: cluster version str will be printed as 4 parts.
#define CLUSTER_VERSION_3_2_3_0 (oceanbase::common::cal_version(3, 2, 3, 0))
#define CLUSTER_VERSION_4_0_0_0 (oceanbase::common::cal_version(4, 0, 0, 0))
#define CLUSTER_VERSION_4_1_0_0 (oceanbase::common::cal_version(4, 1, 0, 0))
#define CLUSTER_VERSION_4_1_0_1 (oceanbase::common::cal_version(4, 1, 0, 1))
#define CLUSTER_VERSION_4_1_0_2 (oceanbase::common::cal_version(4, 1, 0, 2))
#define CLUSTER_VERSION_4_2_0_0 (oceanbase::common::cal_version(4, 2, 0, 0))
#define CLUSTER_VERSION_4_2_1_0 (oceanbase::common::cal_version(4, 2, 1, 0))
#define CLUSTER_VERSION_4_2_1_1 (oceanbase::common::cal_version(4, 2, 1, 1))
#define CLUSTER_VERSION_4_2_1_2 (oceanbase::common::cal_version(4, 2, 1, 2))
#define MOCK_CLUSTER_VERSION_4_2_1_3 (oceanbase::common::cal_version(4, 2, 1, 3))
#define MOCK_CLUSTER_VERSION_4_2_1_4 (oceanbase::common::cal_version(4, 2, 1, 4))
#define MOCK_CLUSTER_VERSION_4_2_1_6 (oceanbase::common::cal_version(4, 2, 1, 6))
#define MOCK_CLUSTER_VERSION_4_2_1_7 (oceanbase::common::cal_version(4, 2, 1, 7))
#define CLUSTER_VERSION_4_2_2_0 (oceanbase::common::cal_version(4, 2, 2, 0))
#define MOCK_CLUSTER_VERSION_4_2_2_1 (oceanbase::common::cal_version(4, 2, 2, 1))
#define MOCK_CLUSTER_VERSION_4_2_3_0 (oceanbase::common::cal_version(4, 2, 3, 0))
#define MOCK_CLUSTER_VERSION_4_2_3_1 (oceanbase::common::cal_version(4, 2, 3, 1))
#define MOCK_CLUSTER_VERSION_4_2_4_0 (oceanbase::common::cal_version(4, 2, 4, 0))
// new data version before 4.3 cannot upgrade to master, must add "MOCK_" prefix
#define CLUSTER_VERSION_4_3_0_0 (oceanbase::common::cal_version(4, 3, 0, 0))
#define CLUSTER_VERSION_4_3_0_1 (oceanbase::common::cal_version(4, 3, 0, 1))
#define CLUSTER_VERSION_4_3_1_0 (oceanbase::common::cal_version(4, 3, 1, 0))
#define CLUSTER_VERSION_4_3_2_0 (oceanbase::common::cal_version(4, 3, 2, 0))
//!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
//TODO: If you update the above version, please update CLUSTER_CURRENT_VERSION.
#define CLUSTER_CURRENT_VERSION CLUSTER_VERSION_4_3_2_0
// ATTENSION !!!!!!!!!!!!!!!!!!!!!!!!!!!
// 1. After 4.0, each cluster_version is corresponed to a data version.
// 2. cluster_version and data_version is not compariable.
// 3. TODO: If you update data_version below, please update DATA_CURRENT_VERSION & ObUpgradeChecker too.
#define DEFAULT_MIN_DATA_VERSION (oceanbase::common::cal_version(0, 0, 0, 1))
#define DATA_VERSION_4_0_0_0 (oceanbase::common::cal_version(4, 0, 0, 0))
#define DATA_VERSION_4_1_0_0 (oceanbase::common::cal_version(4, 1, 0, 0))
#define DATA_VERSION_4_1_0_1 (oceanbase::common::cal_version(4, 1, 0, 1))
#define DATA_VERSION_4_1_0_2 (oceanbase::common::cal_version(4, 1, 0, 2))
#define DATA_VERSION_4_2_0_0 (oceanbase::common::cal_version(4, 2, 0, 0))
#define DATA_VERSION_4_2_1_0 (oceanbase::common::cal_version(4, 2, 1, 0))
#define DATA_VERSION_4_2_1_1 (oceanbase::common::cal_version(4, 2, 1, 1))
#define DATA_VERSION_4_2_1_2 (oceanbase::common::cal_version(4, 2, 1, 2))
#define MOCK_DATA_VERSION_4_2_1_3 (oceanbase::common::cal_version(4, 2, 1, 3))
#define MOCK_DATA_VERSION_4_2_1_4 (oceanbase::common::cal_version(4, 2, 1, 4))
#define MOCK_DATA_VERSION_4_2_1_5 (oceanbase::common::cal_version(4, 2, 1, 5))
#define DATA_VERSION_4_2_2_0 (oceanbase::common::cal_version(4, 2, 2, 0))
#define MOCK_DATA_VERSION_4_2_2_1 (oceanbase::common::cal_version(4, 2, 2, 1))
#define MOCK_DATA_VERSION_4_2_3_0 (oceanbase::common::cal_version(4, 2, 3, 0))
#define MOCK_DATA_VERSION_4_2_3_1 (oceanbase::common::cal_version(4, 2, 3, 1))
#define MOCK_DATA_VERSION_4_2_4_0 (oceanbase::common::cal_version(4, 2, 4, 0))
// new data version before 4.3 cannot upgrade to master, must add "MOCK_" prefix
#define DATA_VERSION_4_3_0_0 (oceanbase::common::cal_version(4, 3, 0, 0))
#define DATA_VERSION_4_3_0_1 (oceanbase::common::cal_version(4, 3, 0, 1))
#define DATA_VERSION_4_3_1_0 (oceanbase::common::cal_version(4, 3, 1, 0))
#define DATA_VERSION_4_3_2_0 (oceanbase::common::cal_version(4, 3, 2, 0))
#define DATA_CURRENT_VERSION DATA_VERSION_4_3_2_0
// ATTENSION !!!!!!!!!!!!!!!!!!!!!!!!!!!
// LAST_BARRIER_DATA_VERSION should be the latest barrier data version before DATA_CURRENT_VERSION
#define LAST_BARRIER_DATA_VERSION DATA_VERSION_4_2_1_0
class VersionUtil
{
public:
static bool check_version_valid(const uint64_t version);
static int64_t print_version_str(char *buf, const int64_t buf_len, uint64_t version);
};
} // namespace common
} // namespace oceanbase
#endif

View File

@ -884,6 +884,9 @@ DEF_NAME(id, "id")
DEF_NAME(sql_update_cost_time, "sql_update_cost_time")
DEF_NAME(sql_update_times, "sql_update_times")
DEF_NAME(last_sql_update_time, "last_sql_update_time")
DEF_NAME(is_data_version_crossed, "is_data_version_crossed")
DEF_NAME(finish_data_version, "finish_data_version")
DEF_NAME(data_version_barrier_scn, "data_version_barrier_scn")
// << add pair events BEFORE this line
DEF_NAME(NAME_COUNT, "invalid")

View File

@ -50,6 +50,12 @@ public:
public:
static int64_t CLUSTER_ID;
static uint64_t CLUSTER_NAME_HASH;
static bool is_self_cluster(int64_t cluster_id)
{
return ObRpcNetHandler::CLUSTER_ID == cluster_id &&
ObRpcNetHandler::CLUSTER_ID != OB_INVALID_CLUSTER_ID &&
cluster_id != OB_INVALID_CLUSTER_ID;
}
protected:
char *easy_alloc(easy_pool_t *pool, int64_t size) const;
private:

View File

@ -16,7 +16,9 @@
#include "lib/ob_define.h"
#include "lib/coro/co_var.h"
#include "common/storage/ob_sequence.h"
#include "common/ob_tenant_data_version_mgr.h"
#include "rpc/obrpc/ob_rpc_net_handler.h"
#include "share/ob_cluster_version.h"
using namespace oceanbase::common::serialization;
using namespace oceanbase::common;
@ -34,7 +36,16 @@ int ObRpcPacketHeader::serialize(char* buf, const int64_t buf_len, int64_t& pos)
int ret = OB_SUCCESS;
if (buf_len - pos >= get_encoded_size()) {
seq_no_ = ObSequence::get_max_seq_no();
LOG_DEBUG("rpc send seq_no ", K_(seq_no));
data_version_ = 0;
if (ObRpcNetHandler::is_self_cluster(dst_cluster_id_) && ODV_MGR.is_enable_compatible_monotonic()) {
int tmp_ret = OB_SUCCESS;
if (OB_TMP_FAIL(ODV_MGR.get(tenant_id_, data_version_))) {
data_version_ = LAST_BARRIER_DATA_VERSION;
LOG_WARN("fail to get data_version", K(tmp_ret), K(tenant_id_));
}
}
LOG_TRACE("rpc send info", K_(tenant_id), K_(data_version), K_(seq_no), K_(dst_cluster_id),
K_(pcode), K(ObRpcNetHandler::CLUSTER_ID));
if (OB_FAIL(encode_i32(buf, buf_len, pos, pcode_))) {
LOG_WARN("Encode error", K(ret), KP(buf), K(buf_len), K(pos));
} else if (OB_FAIL(encode_i8(buf, buf_len, pos, static_cast<int8_t> (get_encoded_size())))) {
@ -83,6 +94,8 @@ int ObRpcPacketHeader::serialize(char* buf, const int64_t buf_len, int64_t& pos)
LOG_WARN("Encode error", K(ret), KP(buf), K(buf_len), K(pos));
} else if (OB_FAIL(encode_i64(buf, buf_len, pos, cluster_name_hash_))) {
LOG_WARN("Encode error", K(ret), KP(buf), K(buf_len), K(pos));
} else if (OB_FAIL(encode_i64(buf, buf_len, pos, data_version_))) {
LOG_WARN("Encode error", K(ret), KP(buf), K(buf_len), K(pos));
} else {
//do nothing
#ifdef ERRSIM
@ -169,6 +182,8 @@ int ObRpcPacketHeader::deserialize(const char* buf, const int64_t data_len, int6
LOG_WARN("Decode error", K(ret), KP(buf), K(hlen_), K(pos));
} else if (hlen_ > pos && OB_FAIL(decode_i64(buf, hlen_, pos, reinterpret_cast<int64_t*>(&cluster_name_hash_)))) {
LOG_WARN("Decode error", K(ret), KP(buf), K(hlen_), K(pos));
} else if (hlen_ > pos && OB_FAIL(decode_i64(buf, data_len, pos, reinterpret_cast<int64_t*>(&data_version_)))) {
LOG_WARN("Decode error", K(ret), KP(buf), K(data_len), K(pos));
} else {
#ifdef ERRSIM
int64_t type = 0;
@ -182,6 +197,21 @@ int ObRpcPacketHeader::deserialize(const char* buf, const int64_t data_len, int6
}
ObSequence::update_max_seq_no(seq_no_);
LOG_DEBUG("rpc receive seq_no ", K_(seq_no), K(ObSequence::get_max_seq_no()));
// for RPC response, if the src_cluster_id is the same as current cluster id, we set the
// data_version here. for RPC request, the failure of setting data_version may cause
// disconnection, to avoid it, we delay setting to the RPC process phase.
if (OB_SUCC(ret) &&
flags_ & ObRpcPacketHeader::RESP_FLAG &&
ObRpcNetHandler::is_self_cluster(src_cluster_id_) && data_version_ > 0) {
if (OB_FAIL(ODV_MGR.set(tenant_id_, data_version_))) {
LOG_WARN("fail to update data_version", K(ret), KP(tenant_id_), K(data_version_));
}
LOG_TRACE("rpc receive data version", K_(tenant_id), K_(data_version), K_(pcode),
K_(src_cluster_id), K(ObRpcNetHandler::CLUSTER_ID));
}
if (OB_ARB_GC_NOTIFY == pcode_ && REACH_TIME_INTERVAL(5000000)) {
LOG_TRACE("receive arb rpc", K_(src_cluster_id), K_(pcode));
}
}
return ret;

View File

@ -180,6 +180,7 @@ public:
int64_t seq_no_;
int32_t group_id_;
uint64_t cluster_name_hash_;
uint64_t data_version_;
#ifdef ERRSIM
ObErrsimModuleType module_type_;
@ -191,12 +192,13 @@ public:
TO_STRING_KV(K(checksum_), K(pcode_), K(hlen_), K(priority_),
K(flags_), K(tenant_id_), K(priv_tenant_id_), K(session_id_),
K(trace_id_), K(timeout_), K_(timestamp), K_(dst_cluster_id), K_(cost_time),
K(compressor_type_), K(original_len_), K(src_cluster_id_), K(seq_no_));
K(compressor_type_), K(original_len_), K(src_cluster_id_), K(seq_no_),
K(data_version_));
ObRpcPacketHeader() { memset(this, 0, sizeof(*this)); flags_ |= (OB_LOG_LEVEL_NONE & OB_LOG_LEVEL_MASK); }
static inline int64_t get_encoded_size()
{
return HEADER_SIZE + ObRpcCostTime::get_encoded_size() + 8 /* for seq no */;
return HEADER_SIZE + ObRpcCostTime::get_encoded_size() + 8 /* for seq no */ + 8 /* for data version*/;
}
};
@ -339,6 +341,7 @@ public:
inline int32_t get_request_level() const;
inline void set_group_id(int32_t group_id);
inline int32_t get_group_id() const;
inline uint64_t get_data_version() const;
int encode_ez_header(char *buf, int64_t len, int64_t &pos);
TO_STRING_KV(K(hdr_), K(chid_), K(clen_), K_(assemble), K_(msg_count), K_(payload));
@ -837,6 +840,11 @@ int32_t ObRpcPacket::get_group_id() const
return hdr_.group_id_;
}
uint64_t ObRpcPacket::get_data_version() const
{
return hdr_.data_version_;
}
RLOCAL_EXTERN(int, g_pcode);
inline ObRpcPacketCode current_pcode()
{

View File

@ -22,6 +22,7 @@
#include "lib/trace/ob_trace_event.h"
#include "lib/trace/ob_trace.h"
#include "common/data_buffer.h"
#include "common/ob_tenant_data_version_mgr.h"
#include "rpc/obrpc/ob_rpc_req_context.h"
#include "rpc/obrpc/ob_rpc_stream_cond.h"
#include "rpc/obrpc/ob_rpc_result_code.h"
@ -76,6 +77,11 @@ int ObRpcProcessorBase::run()
LOG_WARN("req timeout", K(ret));
} else if (OB_FAIL(check_cluster_id())) {
LOG_WARN("checking cluster ID failed", K(ret));
} else if (OB_FAIL(update_data_version())) {
// update_data_version could have been called in RPC deserialization process, however, the
// failure of setting data_version may cause disconnection, so for normal RPC request, we do it
// here
LOG_WARN("fail to update data_version", K(ret));
} else if (OB_FAIL(deserialize())) {
deseri_succ = false;
LOG_WARN("deserialize argument fail", K(ret));
@ -137,6 +143,26 @@ int ObRpcProcessorBase::check_cluster_id()
return ret;
}
int ObRpcProcessorBase::update_data_version()
{
int ret = OB_SUCCESS;
if (OB_ISNULL(rpc_pkt_)) {
ret = OB_ERR_UNEXPECTED;
RPC_OBRPC_LOG(ERROR, "rpc_pkt_ should not be NULL", K(ret));
} else {
int64_t src_cluster_id = rpc_pkt_->get_src_cluster_id();
uint64_t data_version = rpc_pkt_->get_data_version();
if (ObRpcNetHandler::is_self_cluster(src_cluster_id) && data_version > 0) {
if (OB_FAIL(ODV_MGR.set(tenant_id_, data_version))) {
LOG_WARN("fail to update data_version", K(ret), KP(tenant_id_), K(data_version));
}
}
}
return ret;
}
int ObRpcProcessorBase::deserialize()
{
int ret = OB_SUCCESS;
@ -299,6 +325,10 @@ int ObRpcProcessorBase::do_response(const Response &rsp)
packet->set_resp();
// The cluster_id of the response must be the src_cluster_id of the request
packet->set_dst_cluster_id(rpc_pkt_->get_src_cluster_id());
// the tenant_id in response is only used to sync data_version, it has no meaning to the
// RPC framework
packet->set_tenant_id(rpc_pkt_->get_tenant_id());
packet->set_src_cluster_id(ObRpcNetHandler::CLUSTER_ID);
#ifdef ERRSIM
packet->set_module_type(THIS_WORKER.get_module_type());
@ -598,6 +628,11 @@ int ObRpcProcessorBase::flush(int64_t wait_timeout, const ObAddr *src_addr)
req_ = NULL;
is_stream_end_ = true;
RPC_OBRPC_LOG(ERROR, "rpc packet is NULL in stream", K(ret));
} else if (OB_FAIL(update_data_version())) {
// update_data_version could have been called in RPC deserialization process, however, the
// failure of setting data_version may cause disconnection, so for stream RPC request, we do
// it here
RPC_OBRPC_LOG(WARN, "fail to update data_version", K(ret));
} else if (rpc_pkt_->is_stream_last()) {
ret = OB_ITER_END;
} else {

View File

@ -71,7 +71,7 @@ public:
protected:
int check_timeout() { return common::OB_SUCCESS; }
virtual int check_cluster_id();
int update_data_version();
virtual int before_process() { return common::OB_SUCCESS; }
virtual int after_process(int error_code);