Fix add column with transfer makes insert core bug.
This commit is contained in:
parent
0c3e8eaa9b
commit
867c039a90
@ -1184,7 +1184,8 @@ int ObMicroBlockDecoder::init_decoders()
|
||||
*/
|
||||
if (OB_ISNULL(read_info_) || typeid(ObRowkeyReadInfo) == typeid(*read_info_)) {
|
||||
ObObjMeta col_type;
|
||||
if (OB_UNLIKELY(header_->column_count_ < request_cnt_ && nullptr == read_info_)) {
|
||||
if (OB_UNLIKELY((header_->column_count_ < request_cnt_ && nullptr == read_info_) ||
|
||||
(header_->column_count_ > request_cnt_))) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("for empty read info, request cnt is invalid", KR(ret), KP(read_info_), KPC(header_), K(request_cnt_));
|
||||
}
|
||||
|
@ -34,6 +34,7 @@
|
||||
#include "storage/tx/wrs/ob_weak_read_util.h"
|
||||
#include "rootserver/mview/ob_collect_mv_merge_info_task.h"
|
||||
#include "storage/high_availability/ob_transfer_parallel_build_tablet_info.h"
|
||||
#include "share/schema/ob_tenant_schema_service.h"
|
||||
|
||||
using namespace oceanbase::transaction;
|
||||
using namespace oceanbase::share;
|
||||
@ -537,6 +538,8 @@ int ObTransferHandler::do_with_start_status_(const share::ObTransferTaskInfo &ta
|
||||
LOG_WARN("failed to get dest ls max desided scn", K(ret), K(task_info));
|
||||
} else if (!new_transfer && !enable_kill_trx && OB_FAIL(check_src_ls_has_active_trans_(task_info.src_ls_id_))) {
|
||||
LOG_WARN("failed to check src ls active trans", K(ret), K(task_info));
|
||||
} else if (OB_FAIL(ctx_.build_storage_schema_info(task_info, timeout_ctx))) {
|
||||
LOG_WARN("failed to build latest storage schema", K(ret), K(task_info));
|
||||
} else if (OB_FAIL(update_all_tablet_to_ls_(task_info, trans))) {
|
||||
LOG_WARN("failed to update all tablet to ls", K(ret), K(task_info));
|
||||
} else if (OB_FAIL(lock_tablet_on_dest_ls_for_table_lock_(task_info, trans))) {
|
||||
@ -1424,9 +1427,9 @@ int ObTransferHandler::get_start_trans_timeout_(
|
||||
stmt_timeout = 10_s;
|
||||
const uint64_t tenant_id = MTL_ID();
|
||||
omt::ObTenantConfigGuard tenant_config(TENANT_CONF(tenant_id));
|
||||
const int64_t LOCK_MEMBER_LIST_TIMEOUT = 10_s;
|
||||
const int64_t BASELINE_TIMEOUT = 20_s;
|
||||
if (tenant_config.is_valid()) {
|
||||
stmt_timeout = tenant_config->_transfer_start_trans_timeout + LOCK_MEMBER_LIST_TIMEOUT;
|
||||
stmt_timeout = tenant_config->_transfer_start_trans_timeout + BASELINE_TIMEOUT;
|
||||
if (tenant_config->_enable_balance_kill_transaction) {
|
||||
stmt_timeout += tenant_config->_balance_kill_transaction_threshold;
|
||||
stmt_timeout += tenant_config->_balance_wait_killing_transaction_end_threshold;
|
||||
|
@ -396,7 +396,6 @@ private:
|
||||
const share::ObTransferTaskInfo &task_info);
|
||||
void finish_parallel_tablet_info_dag_(
|
||||
const share::ObTransferTaskInfo &task_info);
|
||||
|
||||
private:
|
||||
static const int64_t INTERVAL_US = 1 * 1000 * 1000; //1s
|
||||
static const int64_t KILL_TX_MAX_RETRY_TIMES = 3;
|
||||
|
@ -22,6 +22,7 @@
|
||||
#include "storage/high_availability/ob_storage_ha_utils.h"
|
||||
#include "storage/tx/ob_ts_mgr.h"
|
||||
#include "storage/high_availability/ob_storage_ha_diagnose_mgr.h"
|
||||
#include "storage/compaction/ob_medium_compaction_func.h"
|
||||
|
||||
using namespace oceanbase;
|
||||
using namespace share;
|
||||
@ -1211,7 +1212,9 @@ int ObTransferRelatedInfo::get_related_info_task_id(share::ObTransferTaskID &tas
|
||||
/******************ObTransferTabletInfoMgr*********************/
|
||||
ObTransferBuildTabletInfoCtx::ObTransferTabletInfoMgr::ObTransferTabletInfoMgr()
|
||||
: lock_(),
|
||||
tablet_info_array_()
|
||||
tablet_info_array_(),
|
||||
storage_schema_mgr_()
|
||||
|
||||
{
|
||||
}
|
||||
|
||||
@ -1219,6 +1222,7 @@ ObTransferBuildTabletInfoCtx::ObTransferTabletInfoMgr::~ObTransferTabletInfoMgr(
|
||||
{
|
||||
common::SpinWLockGuard guard(lock_);
|
||||
tablet_info_array_.reset();
|
||||
storage_schema_mgr_.reset();
|
||||
}
|
||||
|
||||
int ObTransferBuildTabletInfoCtx::ObTransferTabletInfoMgr::add_tablet_info(
|
||||
@ -1253,7 +1257,24 @@ int ObTransferBuildTabletInfoCtx::ObTransferTabletInfoMgr::get_tablet_info(
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("get tablet info get invalid argument", K(ret), K(index));
|
||||
} else {
|
||||
param = &tablet_info_array_.at(index);
|
||||
ObMigrationTabletParam &tmp_param = tablet_info_array_.at(index);
|
||||
ObStorageSchema *storage_schema = nullptr;
|
||||
if (OB_FAIL(storage_schema_mgr_.get_storage_schema(tmp_param.tablet_id_, storage_schema))) {
|
||||
LOG_WARN("failed to get storage schema", K(tmp_param));
|
||||
} else if (OB_ISNULL(storage_schema)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("storage schema should not be NULL", K(ret), K(tmp_param));
|
||||
} else if (storage_schema->column_cnt_ > tmp_param.storage_schema_.column_cnt_) {
|
||||
LOG_INFO("modified storage schema", "new_storage_schema", *storage_schema,
|
||||
"old_storage_schema", tmp_param.storage_schema_);
|
||||
if (OB_FAIL(tmp_param.storage_schema_.assign(tmp_param.allocator_, *storage_schema))) {
|
||||
LOG_WARN("failed to assign storage schema", K(ret), K(tmp_param), KPC(storage_schema));
|
||||
}
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret)) {
|
||||
param = &tablet_info_array_.at(index);
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
@ -1262,6 +1283,23 @@ void ObTransferBuildTabletInfoCtx::ObTransferTabletInfoMgr::reuse()
|
||||
{
|
||||
common::SpinWLockGuard guard(lock_);
|
||||
tablet_info_array_.reset();
|
||||
storage_schema_mgr_.reset();
|
||||
}
|
||||
|
||||
int ObTransferBuildTabletInfoCtx::ObTransferTabletInfoMgr::build_storage_schema(
|
||||
const share::ObTransferTaskInfo &task_info,
|
||||
ObTimeoutCtx &timeout_ctx)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (!task_info.is_valid()) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("build storage schema info get invalid argument", K(ret), K(task_info));
|
||||
} else if (OB_FAIL(storage_schema_mgr_.init(task_info.tablet_list_.count()))) {
|
||||
LOG_WARN("failed to init storage schema mgr", K(ret), K(task_info));
|
||||
} else if (OB_FAIL(storage_schema_mgr_.build_storage_schema(task_info, timeout_ctx))) {
|
||||
LOG_WARN("failed to build storage schema", K(ret), K(task_info));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
/******************ObTransferBuildTabletInfoCtx*********************/
|
||||
@ -1428,3 +1466,199 @@ int64_t ObTransferBuildTabletInfoCtx::get_tablet_info_num() const
|
||||
{
|
||||
return mgr_.get_tablet_info_num();
|
||||
}
|
||||
|
||||
int ObTransferBuildTabletInfoCtx::build_storage_schema_info(
|
||||
const share::ObTransferTaskInfo &task_info,
|
||||
ObTimeoutCtx &timeout_ctx)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
if (!task_info.is_valid()) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("build storage schema info get invalid argument", K(ret), K(task_info));
|
||||
} else if (OB_FAIL(mgr_.build_storage_schema(task_info, timeout_ctx))) {
|
||||
LOG_WARN("failed to build storage schema", K(ret), K(task_info));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
/******************ObTransferStorageSchemaMgr*********************/
|
||||
ObTransferBuildTabletInfoCtx::ObTransferStorageSchemaMgr::ObTransferStorageSchemaMgr()
|
||||
: is_inited_(false),
|
||||
allocator_(),
|
||||
storage_schema_map_()
|
||||
{
|
||||
ObMemAttr attr(MTL_ID(), "TransferSchema");
|
||||
allocator_.set_attr(attr);
|
||||
}
|
||||
|
||||
ObTransferBuildTabletInfoCtx::ObTransferStorageSchemaMgr::~ObTransferStorageSchemaMgr()
|
||||
{
|
||||
reset();
|
||||
}
|
||||
|
||||
void ObTransferBuildTabletInfoCtx::ObTransferStorageSchemaMgr::reset()
|
||||
{
|
||||
FOREACH(iter, storage_schema_map_) {
|
||||
ObStorageSchema *storage_schema = iter->second;
|
||||
storage_schema->~ObStorageSchema();
|
||||
}
|
||||
storage_schema_map_.destroy();
|
||||
allocator_.reset();
|
||||
is_inited_ = false;
|
||||
}
|
||||
|
||||
int ObTransferBuildTabletInfoCtx::ObTransferStorageSchemaMgr::init(const int64_t bucket_num)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObMemAttr attr(MTL_ID(), "TransferSchema");
|
||||
if (is_inited_) {
|
||||
ret = OB_INIT_TWICE;
|
||||
LOG_WARN("transfer storage schema mgr is already init", K(ret));
|
||||
} else if (bucket_num <= 0) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("transfer storage schema mgr init get invalid argument", K(ret), K(bucket_num));
|
||||
} else if (OB_FAIL(storage_schema_map_.create(bucket_num, attr))) {
|
||||
LOG_WARN("failed to create storage schema map", K(ret), K(bucket_num));
|
||||
} else {
|
||||
is_inited_ = true;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTransferBuildTabletInfoCtx::ObTransferStorageSchemaMgr::build_storage_schema(
|
||||
const share::ObTransferTaskInfo &task_info,
|
||||
ObTimeoutCtx &timeout_ctx)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (!is_inited_) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("transfer storage schema mgr do not init", K(ret));
|
||||
} else if (!task_info.is_valid()) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("build storage schema get invalid argument", K(ret), K(task_info));
|
||||
} else if (OB_FAIL(build_latest_storage_schema_(task_info, timeout_ctx))) {
|
||||
LOG_WARN("failed to build latest storage schema", K(ret));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTransferBuildTabletInfoCtx::ObTransferStorageSchemaMgr::get_storage_schema(
|
||||
const ObTabletID &tablet_id,
|
||||
ObStorageSchema *&storage_schema)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
storage_schema = nullptr;
|
||||
|
||||
if (!is_inited_) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("transfer storage schema mgr do not init", K(ret));
|
||||
} else if (!tablet_id.is_valid()) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("get storage schema get invalid argument", K(ret), K(tablet_id));
|
||||
} else if (OB_FAIL(storage_schema_map_.get_refactored(tablet_id, storage_schema))) {
|
||||
LOG_WARN("failed to get storage schema", K(ret), K(tablet_id));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTransferBuildTabletInfoCtx::ObTransferStorageSchemaMgr::build_latest_storage_schema_(
|
||||
const share::ObTransferTaskInfo &task_info,
|
||||
ObTimeoutCtx &timeout_ctx)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObSchemaService *server_schema_service = nullptr;
|
||||
ObMultiVersionSchemaService *schema_service = nullptr;
|
||||
const int64_t start_ts = ObTimeUtil::current_time();
|
||||
ObLSService *ls_service = nullptr;
|
||||
ObLSHandle ls_handle;
|
||||
ObLS *ls = nullptr;
|
||||
|
||||
if (!is_inited_) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("transfer storage schema mgr do not init", K(ret));
|
||||
} else if (OB_ISNULL(GCTX.schema_service_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("schema_service is null", KR(ret));
|
||||
} else if (OB_ISNULL(server_schema_service = GCTX.schema_service_->get_schema_service())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("server_schema_service is null", KR(ret));
|
||||
} else if (OB_ISNULL(schema_service = MTL(ObTenantSchemaService *)->get_schema_service())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("failed to get schema service from MTL", K(ret));
|
||||
} else if (OB_ISNULL(ls_service = MTL(ObLSService*))) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("failed to get ObLSService from MTL", K(ret), KP(ls_service));
|
||||
} else if (OB_FAIL(ls_service->get_ls(task_info.src_ls_id_, ls_handle, ObLSGetMod::HA_MOD))) {
|
||||
LOG_WARN("failed to get ls", K(ret), K(task_info));
|
||||
} else if (OB_ISNULL(ls = ls_handle.get_ls())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("ls should not be NULL", K(ret), KP(ls), K(task_info));
|
||||
} else {
|
||||
ObRefreshSchemaStatus status;
|
||||
status.tenant_id_ = task_info.tenant_id_;
|
||||
int64_t schema_version = 0;
|
||||
|
||||
if (OB_FAIL(server_schema_service->fetch_schema_version(status, *GCTX.sql_proxy_, schema_version))) {
|
||||
LOG_WARN("fail to fetch schema version", KR(ret), K(status));
|
||||
} else {
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < task_info.tablet_list_.count(); ++i) {
|
||||
const ObTabletID &tablet_id = task_info.tablet_list_.at(i).tablet_id_;
|
||||
if (timeout_ctx.is_timeouted()) {
|
||||
ret = OB_TIMEOUT;
|
||||
LOG_WARN("transfer prepare storage schema timeout", K(ret));
|
||||
} else if (OB_FAIL(build_tablet_storage_schema_(task_info, tablet_id, schema_version, ls, *schema_service))) {
|
||||
LOG_WARN("failed to build tablet storage schema", K(ret), K(tablet_id));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
LOG_INFO("finish build storage schema", K(ret), "cost_ts", ObTimeUtil::current_time() - start_ts);
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTransferBuildTabletInfoCtx::ObTransferStorageSchemaMgr::build_tablet_storage_schema_(
|
||||
const share::ObTransferTaskInfo &task_info,
|
||||
const ObTabletID &tablet_id,
|
||||
const int64_t schema_version,
|
||||
ObLS *ls,
|
||||
ObMultiVersionSchemaService &schema_service)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObTabletHandle tablet_handle;
|
||||
ObTablet *tablet = nullptr;
|
||||
ObStorageSchema *storage_schema = nullptr;
|
||||
bool is_skip_merge_index = false;
|
||||
uint64_t compat_version = 0;
|
||||
|
||||
if (!is_inited_) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("transfer storage schema mgr do not init", K(ret));
|
||||
} else if (schema_version < 0 || !tablet_id.is_valid() || OB_ISNULL(ls)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("get table storage schema get invalid argument", K(ret), K(schema_version));
|
||||
} else if (OB_FAIL(ls->get_tablet(tablet_id, tablet_handle, 0,
|
||||
ObMDSGetTabletMode::READ_WITHOUT_CHECK))) {
|
||||
LOG_WARN("failed to get tablet", K(ret), K(task_info));
|
||||
} else if (OB_ISNULL(tablet = tablet_handle.get_obj())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("tablet should not be NULL", K(ret), KP(tablet));
|
||||
} else if (OB_FAIL(GET_MIN_DATA_VERSION(task_info.tenant_id_, compat_version))) {
|
||||
LOG_WARN("fail to get data version", KR(ret), K(task_info));
|
||||
} else if (OB_FAIL(ObStorageSchemaUtil::alloc_storage_schema(allocator_, storage_schema))) {
|
||||
LOG_WARN("failed to alloc storage schema", K(ret));
|
||||
} else if (OB_FAIL(compaction::ObMediumCompactionScheduleFunc::get_table_schema_to_merge(
|
||||
schema_service, *tablet, schema_version, compat_version, allocator_, *storage_schema, is_skip_merge_index))) {
|
||||
LOG_WARN("failed to get table schema to merge", K(ret), KPC(tablet), K(task_info));
|
||||
} else if (OB_FAIL(storage_schema_map_.set_refactored(tablet_id, storage_schema))) {
|
||||
LOG_WARN("failed to push storage schema into map", K(ret), K(tablet_id), KPC(storage_schema));
|
||||
} else {
|
||||
storage_schema = nullptr;
|
||||
}
|
||||
|
||||
if (OB_NOT_NULL(storage_schema)) {
|
||||
storage_schema->~ObStorageSchema();
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
@ -22,6 +22,7 @@
|
||||
#include "storage/tablet/ob_tablet_meta.h"
|
||||
#include "storage/tablet/ob_tablet_create_delete_mds_user_data.h"
|
||||
#include "share/ob_storage_ha_diagnose_struct.h"
|
||||
#include "lib/hash/ob_hashmap.h"
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
@ -360,13 +361,45 @@ public:
|
||||
int add_tablet_info(const ObMigrationTabletParam ¶m);
|
||||
int get_tablet_info(const int64_t index, const ObMigrationTabletParam *¶m);
|
||||
int64_t get_tablet_info_num() const;
|
||||
|
||||
int build_storage_schema_info(
|
||||
const share::ObTransferTaskInfo &task_info,
|
||||
ObTimeoutCtx &timeout_ctx);
|
||||
TO_STRING_KV(K_(index), K_(tablet_info_array), K_(child_task_num), K_(total_tablet_count),
|
||||
K_(result), K_(data_version), K_(task_id));
|
||||
private:
|
||||
bool is_valid_() const;
|
||||
|
||||
private:
|
||||
class ObTransferStorageSchemaMgr final
|
||||
{
|
||||
public:
|
||||
ObTransferStorageSchemaMgr();
|
||||
~ObTransferStorageSchemaMgr();
|
||||
int init(const int64_t bucket_num);
|
||||
int build_storage_schema(
|
||||
const share::ObTransferTaskInfo &task_info,
|
||||
ObTimeoutCtx &timeout_ctx);
|
||||
int get_storage_schema(
|
||||
const ObTabletID &tablet_id,
|
||||
ObStorageSchema *&storage_schema);
|
||||
void reset();
|
||||
private:
|
||||
int build_latest_storage_schema_(
|
||||
const share::ObTransferTaskInfo &task_info,
|
||||
ObTimeoutCtx &timeout_ctx);
|
||||
int build_tablet_storage_schema_(
|
||||
const share::ObTransferTaskInfo &task_info,
|
||||
const ObTabletID &tablet_id,
|
||||
const int64_t schema_version,
|
||||
ObLS *ls,
|
||||
ObMultiVersionSchemaService &schema_service);
|
||||
private:
|
||||
bool is_inited_;
|
||||
common::ObArenaAllocator allocator_;
|
||||
hash::ObHashMap<ObTabletID, ObStorageSchema *> storage_schema_map_;
|
||||
DISALLOW_COPY_AND_ASSIGN(ObTransferStorageSchemaMgr);
|
||||
};
|
||||
|
||||
struct ObTransferTabletInfoMgr final
|
||||
{
|
||||
public:
|
||||
@ -376,11 +409,14 @@ private:
|
||||
int64_t get_tablet_info_num() const;
|
||||
int get_tablet_info(const int64_t index, const ObMigrationTabletParam *¶m);
|
||||
void reuse();
|
||||
|
||||
int build_storage_schema(
|
||||
const share::ObTransferTaskInfo &task_info,
|
||||
ObTimeoutCtx &timeout_ctx);
|
||||
TO_STRING_KV(K_(tablet_info_array));
|
||||
private:
|
||||
common::SpinRWLock lock_;
|
||||
common::ObArray<ObMigrationTabletParam> tablet_info_array_;
|
||||
ObTransferStorageSchemaMgr storage_schema_mgr_;
|
||||
DISALLOW_COPY_AND_ASSIGN(ObTransferTabletInfoMgr);
|
||||
};
|
||||
private:
|
||||
@ -397,6 +433,9 @@ private:
|
||||
DISALLOW_COPY_AND_ASSIGN(ObTransferBuildTabletInfoCtx);
|
||||
};
|
||||
|
||||
|
||||
|
||||
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
Loading…
x
Reference in New Issue
Block a user