reform order autoinc

This commit is contained in:
obdev
2023-07-05 10:54:10 +00:00
committed by ob-robot
parent 6fb782096e
commit 34bc7336c0
24 changed files with 389 additions and 210 deletions

View File

@ -321,6 +321,7 @@ int ObAutoincrementService::get_handle_order(AutoincParam &param, CacheHandle *&
const uint64_t increment = param.autoinc_increment_;
const uint64_t max_value = get_max_value(column_type);
const int64_t auto_increment_cache_size = param.auto_increment_cache_size_;
const int64_t autoinc_version = param.autoinc_version_;
uint64_t desired_count = 0;
// calc nb_desired_values in MySQL
@ -353,8 +354,8 @@ int ObAutoincrementService::get_handle_order(AutoincParam &param, CacheHandle *&
LOG_WARN("reach max autoinc", K(ret), K(table_auto_increment));
} else if (OB_FAIL(global_autoinc_service_.get_value(key, offset, increment, max_value,
table_auto_increment, batch_count,
auto_increment_cache_size, sync_value,
start_inclusive, end_inclusive))) {
auto_increment_cache_size, autoinc_version,
sync_value, start_inclusive, end_inclusive))) {
LOG_WARN("fail get value", K(ret));
} else if (OB_UNLIKELY(sync_value > max_value || start_inclusive > max_value)) {
ret = OB_ERR_REACH_AUTOINC_MAX;
@ -657,18 +658,21 @@ int ObAutoincrementService::lock_autoinc_row(const uint64_t &tenant_id,
return ret;
}
//This implement is only for Truncate, table need to reset autoinc version after truncate
int ObAutoincrementService::reset_autoinc_row(const uint64_t &tenant_id,
const uint64_t &table_id,
const uint64_t &column_id,
common::ObMySQLTransaction &trans)
const uint64_t &table_id,
const uint64_t &column_id,
const int64_t &autoinc_version,
common::ObMySQLTransaction &trans)
{
int ret = OB_SUCCESS;
ObSqlString update_sql;
int64_t affected_rows = 0;
if (OB_FAIL(update_sql.assign_fmt("UPDATE %s SET sequence_value = 1, sync_value = 0 ",
OB_ALL_AUTO_INCREMENT_TNAME))) {
if (OB_FAIL(update_sql.assign_fmt("UPDATE %s SET sequence_value = 1, sync_value = 0, truncate_version = %ld",
OB_ALL_AUTO_INCREMENT_TNAME,
autoinc_version))) {
LOG_WARN("failed to assign sql", KR(ret));
} else if (OB_FAIL(update_sql.append_fmt("WHERE sequence_key = %lu AND tenant_id = %lu AND column_id = %lu",
} else if (OB_FAIL(update_sql.append_fmt(" WHERE sequence_key = %lu AND tenant_id = %lu AND column_id = %lu",
ObSchemaUtils::get_extract_schema_id(tenant_id, table_id),
ObSchemaUtils::get_extract_tenant_id(tenant_id, tenant_id),
column_id))) {
@ -686,13 +690,14 @@ int ObAutoincrementService::reset_autoinc_row(const uint64_t &tenant_id,
int ObAutoincrementService::reinit_autoinc_row(const uint64_t &tenant_id,
const uint64_t &table_id,
const uint64_t &column_id,
const int64_t &autoinc_version,
common::ObMySQLTransaction &trans)
{
int ret = OB_SUCCESS;
if (OB_FAIL(lock_autoinc_row(tenant_id, table_id, column_id, trans))) {
LOG_WARN("failed to select for update", KR(ret));
} else if (OB_FAIL(reset_autoinc_row(tenant_id, table_id, column_id, trans))) {
} else if (OB_FAIL(reset_autoinc_row(tenant_id, table_id, column_id, autoinc_version, trans))) {
LOG_WARN("failed to update auto increment", KR(ret));
}
return ret;
@ -701,8 +706,7 @@ int ObAutoincrementService::reinit_autoinc_row(const uint64_t &tenant_id,
int ObAutoincrementService::clear_autoinc_cache_all(const uint64_t tenant_id,
const uint64_t table_id,
const uint64_t column_id,
const bool autoinc_is_order,
const common::ObArray<ObAddr>* alive_server_list/*nullptr*/)
const bool autoinc_is_order)
{
int ret = OB_SUCCESS;
if (OB_SUCC(ret)) {
@ -716,22 +720,8 @@ int ObAutoincrementService::clear_autoinc_cache_all(const uint64_t tenant_id,
ObHashSet<ObAddr> server_set;
if (OB_FAIL(server_set.create(PARTITION_LOCATION_SET_BUCKET_NUM))) {
LOG_WARN("failed to create hash set", K(ret));
//to do
//fix can not get all server bug
//
} else if (OB_ISNULL(alive_server_list)) {
if (OB_FAIL(get_server_set(tenant_id, table_id, server_set, true))) {
SHARE_LOG(WARN, "failed to get table partitions server set", K(ret));
}
} else {
for (int64_t i = 0; OB_SUCC(ret) && i < alive_server_list->count(); i++) {
int err = OB_SUCCESS;
err = server_set.set_refactored(alive_server_list->at(i));
if (OB_SUCCESS != err && OB_HASH_EXIST != err) {
ret = err;
LOG_WARN("failed to add element to set", "server", alive_server_list->at(i), K(ret));
}
}
} else if (OB_FAIL(get_server_set(tenant_id, table_id, server_set, true))) {
SHARE_LOG(WARN, "failed to get table partitions server set", K(ret));
}
if (OB_SUCC(ret)) {
const int64_t sync_timeout = SYNC_OP_TIMEOUT + TIME_SKEW;
@ -840,7 +830,7 @@ int ObAutoincrementService::get_table_node(const AutoincParam &param, TableNode
key.tenant_id_ = tenant_id;
key.table_id_ = table_id;
key.column_id_ = column_id;
int64_t autoinc_version = get_modify_autoinc_version(param.autoinc_version_);
if (OB_FAIL(node_map_.get(key, table_node))) {
if (ret != OB_ENTRY_NOT_EXIST) {
LOG_ERROR("get from map failed", K(ret));
@ -857,7 +847,7 @@ int ObAutoincrementService::get_table_node(const AutoincParam &param, TableNode
LOG_ERROR("failed to init table node", K(param), K(ret));
} else {
table_node->prefetch_node_.reset();
table_node->autoinc_version_ = param.autoinc_version_;
table_node->autoinc_version_ = autoinc_version;
lib::ObMutexGuard guard(map_mutex_);
if (OB_FAIL(node_map_.insert_and_get(key, table_node))) {
LOG_WARN("failed to create table node", K(param), K(ret));
@ -875,15 +865,20 @@ int ObAutoincrementService::get_table_node(const AutoincParam &param, TableNode
if (OB_FAIL(alloc_autoinc_try_lock(table_node->alloc_mutex_))) {
LOG_WARN("failed to get lock", K(ret));
} else {
if (OB_UNLIKELY(param.autoinc_version_ != table_node->autoinc_version_)) {
LOG_INFO("start reset table node", K(*table_node), K(param));
// local cache is expired
if (OB_UNLIKELY(autoinc_version > table_node->autoinc_version_)) {
LOG_INFO("start to reset table node", K(*table_node), K(param));
table_node->next_value_ = 0;
table_node->local_sync_ = 0;
table_node->curr_node_.reset();
table_node->prefetch_node_.reset();
table_node->prefetching_ = false;
table_node->curr_node_state_is_pending_ = true;
table_node->autoinc_version_ = param.autoinc_version_;
table_node->autoinc_version_ = autoinc_version;
// old request cannot get table node, it should retry
} else if (OB_UNLIKELY(autoinc_version < table_node->autoinc_version_)) {
ret = OB_SCHEMA_ERROR;
LOG_WARN("old reqeust can not get table node, it should retry", KR(ret), K(autoinc_version), K(table_node->autoinc_version_));
}
table_node->alloc_mutex_.unlock();
}
@ -927,6 +922,7 @@ int ObAutoincrementService::fetch_table_node(const AutoincParam &param,
const uint64_t desired_count = param.autoinc_desired_count_;
const uint64_t offset = param.autoinc_offset_;
const uint64_t increment = param.autoinc_increment_;
const int64_t autoinc_version = param.autoinc_version_;
if (part_num <= 0 || ObNullType == column_type) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(part_num), K(column_type), K(ret));
@ -958,7 +954,7 @@ int ObAutoincrementService::fetch_table_node(const AutoincParam &param,
LOG_WARN("reach max autoinc", K(ret), K(table_auto_increment));
} else if (OB_FAIL(distributed_autoinc_service_.get_value(
key, offset, increment, max_value, table_auto_increment,
batch_count, auto_increment_cache_size, sync_value,
batch_count, auto_increment_cache_size, autoinc_version, sync_value,
start_inclusive, end_inclusive))) {
LOG_WARN("fail get value", K(ret));
} else if (sync_value > max_value || start_inclusive > max_value) {
@ -1122,6 +1118,7 @@ int ObAutoincrementService::sync_insert_value_order(AutoincParam &param,
const ObObjType column_type = param.autoinc_col_type_;
const uint64_t max_value = get_max_value(column_type);
const uint64_t part_num = param.autoinc_table_part_num_;
const int64_t autoinc_version = param.autoinc_version_;
uint64_t global_sync_value = 0;
AutoincKey key(tenant_id, table_id, column_id);
@ -1130,7 +1127,7 @@ int ObAutoincrementService::sync_insert_value_order(AutoincParam &param,
value_to_sync = max_value;
}
if (OB_FAIL(global_autoinc_service_.local_push_to_global_value(
key, max_value, value_to_sync, global_sync_value))) {
key, max_value, value_to_sync, autoinc_version, global_sync_value))) {
LOG_WARN("fail sync value to global", K(key), K(insert_value), K(ret));
} else if (NULL != cache_handle) {
LOG_DEBUG("insert value, generate next val",
@ -1173,6 +1170,7 @@ int ObAutoincrementService::sync_insert_value_noorder(AutoincParam &param,
const ObObjType column_type = param.autoinc_col_type_;
const uint64_t max_value = get_max_value(column_type);
const uint64_t part_num = param.autoinc_table_part_num_;
const int64_t autoinc_version = param.autoinc_version_;
TableNode *table_node = NULL;
LOG_DEBUG("begin to sync insert value globally", K(param), K(insert_value));
if (OB_FAIL(get_table_node(param, table_node))) {
@ -1202,7 +1200,7 @@ int ObAutoincrementService::sync_insert_value_noorder(AutoincParam &param,
const uint64_t local_cache_size = is_order ? 1 : param.auto_increment_cache_size_;
uint64_t value_to_sync = calc_next_cache_boundary(insert_value, local_cache_size, max_value);
if (OB_FAIL(distributed_autoinc_service_.local_push_to_global_value(key, max_value,
value_to_sync, global_sync_value))) {
value_to_sync, autoinc_version, global_sync_value))) {
LOG_WARN("fail sync value to global", K(key), K(insert_value), K(ret));
} else {
if (OB_FAIL(alloc_autoinc_try_lock(table_node->alloc_mutex_))) {
@ -1470,7 +1468,7 @@ int ObAutoincrementService::fetch_global_sync(const uint64_t tenant_id,
if (OB_SUCC(ret)) {
AutoincKey key(tenant_id, table_id, column_id);
if (OB_FAIL(distributed_autoinc_service_.local_sync_with_global_value(key, sync_value))) {
if (OB_FAIL(distributed_autoinc_service_.local_sync_with_global_value(key, table_node.autoinc_version_, sync_value))) {
LOG_WARN("fail refresh global value", K(ret));
} else {
atomic_update(table_node.local_sync_, sync_value);
@ -1583,6 +1581,7 @@ int ObAutoincrementService::get_sequence_value(const uint64_t tenant_id,
const uint64_t table_id,
const uint64_t column_id,
const bool is_order,
const int64_t autoinc_version,
uint64_t &seq_value)
{
int ret = OB_SUCCESS;
@ -1590,10 +1589,10 @@ int ObAutoincrementService::get_sequence_value(const uint64_t tenant_id,
key.tenant_id_ = tenant_id;
key.table_id_ = table_id;
key.column_id_ = column_id;
if (is_order && OB_FAIL(global_autoinc_service_.get_sequence_value(key, seq_value))) {
if (is_order && OB_FAIL(global_autoinc_service_.get_sequence_value(key, autoinc_version, seq_value))) {
LOG_WARN("global autoinc service get sequence value failed", K(ret));
} else if (!is_order &&
OB_FAIL(distributed_autoinc_service_.get_sequence_value(key, seq_value))) {
OB_FAIL(distributed_autoinc_service_.get_sequence_value(key, autoinc_version, seq_value))) {
LOG_WARN("distributed autoinc service get sequence value failed", K(ret));
}
return ret;
@ -1604,14 +1603,16 @@ int ObAutoincrementService::get_sequence_values(
const uint64_t tenant_id,
const ObIArray<AutoincKey> &order_autokeys,
const ObIArray<AutoincKey> &noorder_autokeys,
const ObIArray<int64_t> &order_autoinc_versions,
const ObIArray<int64_t> &noorder_autoinc_versions,
ObHashMap<AutoincKey, uint64_t> &seq_values)
{
int ret = OB_SUCCESS;
if (OB_FAIL(global_autoinc_service_.get_auto_increment_values(
tenant_id, order_autokeys, seq_values))) {
tenant_id, order_autokeys, order_autoinc_versions, seq_values))) {
LOG_WARN("global_autoinc_service_ get sequence values failed", K(ret));
} else if (OB_FAIL(distributed_autoinc_service_.get_auto_increment_values(
tenant_id, noorder_autokeys, seq_values))) {
tenant_id, noorder_autokeys, noorder_autoinc_versions, seq_values))) {
LOG_WARN("distributed_autoinc_service_ get sequence values failed", K(ret));
}
return ret;
@ -1625,6 +1626,7 @@ int ObInnerTableGlobalAutoIncrementService::get_value(
const uint64_t table_auto_increment,
const uint64_t desired_count,
const uint64_t cache_size,
const int64_t &autoinc_version,
uint64_t &sync_value,
uint64_t &start_inclusive,
uint64_t &end_inclusive)
@ -1633,8 +1635,8 @@ int ObInnerTableGlobalAutoIncrementService::get_value(
int ret = OB_SUCCESS;
uint64_t sync_value_from_inner_table = 0;
ret = inner_table_proxy_.next_autoinc_value(
key, offset, increment, table_auto_increment, max_value, desired_count,
true /*for_update*/, start_inclusive, end_inclusive, sync_value_from_inner_table);
key, offset, increment, table_auto_increment, max_value, desired_count, autoinc_version,
start_inclusive, end_inclusive, sync_value_from_inner_table);
if (OB_SUCC(ret)) {
if (table_auto_increment != 0 && table_auto_increment - 1 > sync_value_from_inner_table) {
sync_value = table_auto_increment -1;
@ -1646,17 +1648,20 @@ int ObInnerTableGlobalAutoIncrementService::get_value(
}
int ObInnerTableGlobalAutoIncrementService::get_sequence_value(const AutoincKey &key,
const int64_t &autoinc_version,
uint64_t &sequence_value)
{
uint64_t sync_value = 0; // unused
return inner_table_proxy_.get_autoinc_value(key, sequence_value, sync_value);
return inner_table_proxy_.get_autoinc_value(key, autoinc_version, sequence_value, sync_value);
}
int ObInnerTableGlobalAutoIncrementService::get_auto_increment_values(
const uint64_t tenant_id,
const common::ObIArray<AutoincKey> &autoinc_keys,
const common::ObIArray<int64_t> &autoinc_versions,
common::hash::ObHashMap<AutoincKey, uint64_t> &seq_values)
{
UNUSED(autoinc_versions);
return inner_table_proxy_.get_autoinc_value_in_batch(tenant_id, autoinc_keys, seq_values);
}
@ -1664,19 +1669,21 @@ int ObInnerTableGlobalAutoIncrementService::local_push_to_global_value(
const AutoincKey &key,
const uint64_t max_value,
const uint64_t insert_value,
const int64_t &autoinc_version,
uint64_t &sync_value)
{
uint64_t seq_value = 0; // unused, * MUST * set seq_value to 0 here.
return inner_table_proxy_.sync_autoinc_value(key, insert_value, max_value, true, /*for_update*/
seq_value, sync_value);
return inner_table_proxy_.sync_autoinc_value(key, insert_value, max_value, autoinc_version,
seq_value, sync_value);
}
int ObInnerTableGlobalAutoIncrementService::local_sync_with_global_value(
const AutoincKey &key,
const int64_t &autoinc_version,
uint64_t &sync_value)
{
uint64_t seq_value = 0; // unused
return inner_table_proxy_.get_autoinc_value(key, seq_value, sync_value);
return inner_table_proxy_.get_autoinc_value(key, autoinc_version, seq_value, sync_value);
}
int ObRpcGlobalAutoIncrementService::init(
@ -1708,44 +1715,49 @@ int ObRpcGlobalAutoIncrementService::get_value(
const uint64_t table_auto_increment,
const uint64_t desired_count,
const uint64_t cache_size,
const int64_t &autoinc_version,
uint64_t &sync_value,
uint64_t &start_inclusive,
uint64_t &end_inclusive)
{
return gais_client_.get_value(key, offset, increment, max_value, table_auto_increment,
desired_count, cache_size, sync_value, start_inclusive,
end_inclusive);
desired_count, cache_size, autoinc_version,sync_value,
start_inclusive, end_inclusive);
}
int ObRpcGlobalAutoIncrementService::get_sequence_value(const AutoincKey &key,
const int64_t &autoinc_version,
uint64_t &sequence_value)
{
return gais_client_.get_sequence_value(key, sequence_value);
return gais_client_.get_sequence_value(key, autoinc_version, sequence_value);
}
int ObRpcGlobalAutoIncrementService::get_auto_increment_values(
const uint64_t tenant_id,
const common::ObIArray<AutoincKey> &autoinc_keys,
common::hash::ObHashMap<AutoincKey, uint64_t> &seq_values)
const common::ObIArray<int64_t> &autoinc_versions,
common::hash::ObHashMap<AutoincKey, uint64_t> &seq_valuesm)
{
UNUSED(tenant_id);
return gais_client_.get_auto_increment_values(autoinc_keys, seq_values);
return gais_client_.get_auto_increment_values(autoinc_keys, autoinc_versions, seq_valuesm);
}
int ObRpcGlobalAutoIncrementService::local_push_to_global_value(
const AutoincKey &key,
const uint64_t max_value,
const uint64_t value,
const int64_t &autoinc_version,
uint64_t &global_sync_value)
{
return gais_client_.local_push_to_global_value(key, max_value, value, global_sync_value);
return gais_client_.local_push_to_global_value(key, max_value, value, autoinc_version, global_sync_value);
}
int ObRpcGlobalAutoIncrementService::local_sync_with_global_value(
const AutoincKey &key,
const int64_t &autoinc_version,
uint64_t &value)
{
return gais_client_.local_sync_with_global_value(key, value);
return gais_client_.local_sync_with_global_value(key, autoinc_version, value);
}
int ObRpcGlobalAutoIncrementService::clear_global_autoinc_cache(const AutoincKey &key)
@ -1753,13 +1765,35 @@ int ObRpcGlobalAutoIncrementService::clear_global_autoinc_cache(const AutoincKey
return gais_client_.clear_global_autoinc_cache(key);
}
int ObAutoIncInnerTableProxy::check_inner_autoinc_version(const int64_t &request_autoinc_version,
const int64_t &inner_autoinc_version,
const AutoincKey &key)
{
int ret = OB_SUCCESS;
if (0 == request_autoinc_version || 0 == inner_autoinc_version) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("autoinc version is zero", KR(ret), K(request_autoinc_version), K(inner_autoinc_version));
// inner table did not update
} else if (OB_UNLIKELY(inner_autoinc_version < request_autoinc_version)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("inner_autoinc_version can not less than autoinc_version", KR(ret), K(key),
K(inner_autoinc_version), K(request_autoinc_version));
// old request
} else if (OB_UNLIKELY(inner_autoinc_version > request_autoinc_version)) {
ret = OB_SCHEMA_ERROR;
LOG_WARN("inner_autoinc_version is greater than autoinc_version, request needs retry", KR(ret), K(key),
K(inner_autoinc_version), K(request_autoinc_version));
}
return ret;
}
int ObAutoIncInnerTableProxy::next_autoinc_value(const AutoincKey &key,
const uint64_t offset,
const uint64_t increment,
const uint64_t base_value,
const uint64_t max_value,
const uint64_t desired_count,
const bool for_update,
const int64_t &autoinc_version,
uint64_t &start_inclusive,
uint64_t &end_inclusive,
uint64_t &sync_value)
@ -1771,6 +1805,8 @@ int ObAutoIncInnerTableProxy::next_autoinc_value(const AutoincKey &key,
const uint64_t table_id = key.table_id_;
const uint64_t column_id = key.column_id_;
uint64_t sequence_value = 0;
int64_t inner_autoinc_version = OB_INVALID_VERSION;
int64_t tmp_autoinc_version = get_modify_autoinc_version(autoinc_version);
if (OB_ISNULL(mysql_proxy_)) {
ret = OB_NOT_INIT;
LOG_WARN("mysql proxy is null", K(ret));
@ -1782,13 +1818,11 @@ int ObAutoIncInnerTableProxy::next_autoinc_value(const AutoincKey &key,
const uint64_t exec_tenant_id = tenant_id;
const char *table_name = OB_ALL_AUTO_INCREMENT_TNAME;
sql_len = snprintf(sql, OB_MAX_SQL_LENGTH,
" SELECT sequence_key, sequence_value, sync_value FROM %s"
" WHERE tenant_id = %lu AND sequence_key = %lu AND column_id = %lu %s",
" SELECT sequence_key, sequence_value, sync_value, truncate_version FROM %s WHERE tenant_id = %lu AND sequence_key = %lu AND column_id = %lu FOR UPDATE",
table_name,
ObSchemaUtils::get_extract_tenant_id(tenant_id, tenant_id),
ObSchemaUtils::get_extract_schema_id(tenant_id, table_id),
column_id,
for_update ? "FOR UPDATE" : "");
column_id);
if (sql_len >= OB_MAX_SQL_LENGTH || sql_len <= 0) {
ret = OB_SIZE_OVERFLOW;
LOG_WARN("failed to format sql. size not enough");
@ -1821,6 +1855,10 @@ int ObAutoIncInnerTableProxy::next_autoinc_value(const AutoincKey &key,
LOG_WARN("fail to get int_value.", K(ret));
} else if (OB_FAIL(result->get_uint(2l, sync_value))) {
LOG_WARN("fail to get int_value.", K(ret));
} else if (OB_FAIL(result->get_int(3l, inner_autoinc_version))) {
LOG_WARN("fail to get inner_autoinc_version.", K(ret));
} else if (OB_FAIL(check_inner_autoinc_version(tmp_autoinc_version, inner_autoinc_version, key))) {
LOG_WARN("fail to check inner_autoinc_version", KR(ret));
} else {
if (sync_value >= max_value) {
sequence_value = max_value;
@ -1873,12 +1911,13 @@ int ObAutoIncInnerTableProxy::next_autoinc_value(const AutoincKey &key,
sql_len = snprintf(sql, OB_MAX_SQL_LENGTH,
"UPDATE %s SET sequence_value = %lu, gmt_modified = now(6)"
" WHERE tenant_id = %lu AND sequence_key = %lu AND column_id = %lu",
" WHERE tenant_id = %lu AND sequence_key = %lu AND column_id = %lu AND truncate_version = %ld",
table_name,
next_sequence_value,
OB_INVALID_TENANT_ID,
table_id,
column_id);
column_id,
inner_autoinc_version);
int64_t affected_rows = 0;
if (sql_len >= OB_MAX_SQL_LENGTH || sql_len <= 0) {
ret = OB_SIZE_OVERFLOW;
@ -1914,18 +1953,20 @@ int ObAutoIncInnerTableProxy::next_autoinc_value(const AutoincKey &key,
}
int ObAutoIncInnerTableProxy::get_autoinc_value(const AutoincKey &key,
const int64_t &autoinc_version,
uint64_t &seq_value,
uint64_t &sync_value)
{
int ret = OB_SUCCESS;
const uint64_t tenant_id = key.tenant_id_;
const int64_t tmp_autoinc_version = get_modify_autoinc_version(autoinc_version);
SMART_VARS_2((ObMySQLProxy::MySQLResult, res), (char[OB_MAX_SQL_LENGTH], sql)) {
ObMySQLResult *result = NULL;
int sql_len = 0;
const uint64_t exec_tenant_id = tenant_id;
const char *table_name = OB_ALL_AUTO_INCREMENT_TNAME;
sql_len = snprintf(sql, OB_MAX_SQL_LENGTH,
"SELECT sequence_value, sync_value FROM %s"
" SELECT sequence_value, sync_value, truncate_version FROM %s"
" WHERE tenant_id = %lu AND sequence_key = %lu AND column_id = %lu",
table_name,
ObSchemaUtils::get_extract_tenant_id(exec_tenant_id, tenant_id),
@ -1957,10 +1998,15 @@ int ObAutoIncInnerTableProxy::get_autoinc_value(const AutoincKey &key,
LOG_WARN("fail get next value", K(key), K(ret));
}
} else {
int64_t inner_autoinc_version = OB_INVALID_VERSION;
if (OB_FAIL(result->get_uint(0l, seq_value))) {
LOG_WARN("fail to get int_value.", K(ret));
} else if (OB_FAIL(result->get_uint(1l, sync_value))) {
LOG_WARN("fail to get int_value.", K(ret));
} else if (OB_FAIL(result->get_int(2l, inner_autoinc_version))) {
LOG_WARN("fail to get truncate_version.", K(ret));
} else if (OB_FAIL(check_inner_autoinc_version(tmp_autoinc_version, inner_autoinc_version, key))) {
LOG_WARN("fail to check inner_autoinc_version", KR(ret));
}
if (OB_SUCC(ret)) {
int tmp_ret = OB_SUCCESS;
@ -1979,6 +2025,9 @@ int ObAutoIncInnerTableProxy::get_autoinc_value(const AutoincKey &key,
return ret;
}
// TODO: (xingrui.cwh)
// If this interface is used by another function except for __tenant_virtual_all_table,
// this interface will need to verfiy autoinc_version for correctness
int ObAutoIncInnerTableProxy::get_autoinc_value_in_batch(
const uint64_t tenant_id,
const common::ObIArray<AutoincKey> &keys,
@ -2071,7 +2120,7 @@ int ObAutoIncInnerTableProxy::get_autoinc_value_in_batch(
int ObAutoIncInnerTableProxy::sync_autoinc_value(const AutoincKey &key,
const uint64_t insert_value,
const uint64_t max_value,
const bool for_update,
const int64_t autoinc_version,
uint64_t &seq_value,
uint64_t &sync_value)
{
@ -2083,6 +2132,8 @@ int ObAutoIncInnerTableProxy::sync_autoinc_value(const AutoincKey &key,
ObSqlString sql;
bool with_snap_shot = true;
uint64_t fetch_seq_value = 0;
int64_t inner_autoinc_version = OB_INVALID_VERSION;
int64_t tmp_autoinc_version = get_modify_autoinc_version(autoinc_version);
if (OB_ISNULL(mysql_proxy_)) {
ret = OB_NOT_INIT;
LOG_WARN("mysql proxy is null", K(ret));
@ -2092,14 +2143,12 @@ int ObAutoIncInnerTableProxy::sync_autoinc_value(const AutoincKey &key,
const uint64_t exec_tenant_id = tenant_id;
const char *table_name = OB_ALL_AUTO_INCREMENT_TNAME;
int64_t fetch_table_id = OB_INVALID_ID;
if (OB_FAIL(sql.assign_fmt(" SELECT sequence_key, sequence_value, sync_value FROM %s"
" WHERE tenant_id = %lu AND sequence_key = %lu"
" AND column_id = %lu %s",
if (OB_FAIL(sql.assign_fmt(" SELECT sequence_key, sequence_value, sync_value, truncate_version FROM %s WHERE tenant_id = %lu AND sequence_key = %lu"
" AND column_id = %lu FOR UPDATE",
table_name,
ObSchemaUtils::get_extract_tenant_id(exec_tenant_id, tenant_id),
ObSchemaUtils::get_extract_schema_id(exec_tenant_id, table_id),
column_id,
for_update ? "FOR UPDATE" : ""))) {
column_id))) {
LOG_WARN("failed to assign sql", K(ret));
}
if (OB_SUCC(ret)) {
@ -2128,6 +2177,10 @@ int ObAutoIncInnerTableProxy::sync_autoinc_value(const AutoincKey &key,
LOG_WARN("failed to get int_value.", K(ret));
} else if (OB_FAIL(result->get_uint(2l, sync_value))) {
LOG_WARN("failed to get int_value.", K(ret));
} else if (OB_FAIL(result->get_int(3l, inner_autoinc_version))) {
LOG_WARN("failed to get inner_autoinc_version.", K(ret));
} else if (OB_FAIL(check_inner_autoinc_version(tmp_autoinc_version, inner_autoinc_version, key))) {
LOG_WARN("failed to check inner_autoinc_version", KR(ret));
}
if (OB_SUCC(ret)) {
int tmp_ret = OB_SUCCESS;
@ -2171,9 +2224,9 @@ int ObAutoIncInnerTableProxy::sync_autoinc_value(const AutoincKey &key,
// > I can't get MAX_VALUE in DDL context. auto inc column type is needed.
if (OB_FAIL(sql.assign_fmt(
"UPDATE %s SET sync_value = %lu, sequence_value = %lu, gmt_modified = now(6) "
"WHERE tenant_id=%lu AND sequence_key=%lu AND column_id=%lu",
"WHERE tenant_id=%lu AND sequence_key=%lu AND column_id=%lu AND truncate_version=%ld",
table_name, sync_value, new_seq_value,
OB_INVALID_TENANT_ID, table_id, column_id))) {
OB_INVALID_TENANT_ID, table_id, column_id, inner_autoinc_version))) {
LOG_WARN("failed to assign sql", K(ret));
} else if (GCTX.is_standby_cluster() && OB_SYS_TENANT_ID != exec_tenant_id) {
ret = OB_OP_NOT_ALLOW;