persist lob access share same table param
This commit is contained in:
parent
218b4fec80
commit
98db7a6727
@ -147,8 +147,8 @@ private:
|
||||
allocator_(tenant_id),
|
||||
lob_ctxs_(),
|
||||
lob_ctx_(),
|
||||
meta_manager_(),
|
||||
piece_manager_()
|
||||
meta_manager_(tenant_id),
|
||||
piece_manager_(tenant_id)
|
||||
{}
|
||||
public:
|
||||
~ObLobManager() { destroy(); }
|
||||
|
@ -147,7 +147,9 @@ private:
|
||||
|
||||
class ObLobMetaManager {
|
||||
public:
|
||||
ObLobMetaManager() {}
|
||||
explicit ObLobMetaManager(const uint64_t tenant_id) :
|
||||
persistent_lob_adapter_(tenant_id)
|
||||
{}
|
||||
~ObLobMetaManager() {}
|
||||
// write one lob meta row
|
||||
int write(ObLobAccessParam& param, ObLobMetaInfo& in_row);
|
||||
|
@ -30,10 +30,132 @@ namespace oceanbase
|
||||
namespace storage
|
||||
{
|
||||
|
||||
ObPersistentLobApator::~ObPersistentLobApator()
|
||||
{
|
||||
destroy();
|
||||
}
|
||||
|
||||
void ObPersistentLobApator::destroy()
|
||||
{
|
||||
STORAGE_LOG(INFO, "[LOB] destroy lob persist", K(tenant_id_));
|
||||
if (OB_NOT_NULL(meta_table_param_)) {
|
||||
meta_table_param_->reset();
|
||||
meta_table_param_->~ObTableParam();
|
||||
allocator_.free(meta_table_param_);
|
||||
meta_table_param_ = nullptr;
|
||||
}
|
||||
if (OB_NOT_NULL(meta_table_dml_param_)) {
|
||||
meta_table_dml_param_->reset();
|
||||
meta_table_dml_param_->~ObTableDMLParam();
|
||||
allocator_.free(meta_table_dml_param_);
|
||||
meta_table_dml_param_ = nullptr;
|
||||
}
|
||||
allocator_.reset();
|
||||
}
|
||||
|
||||
int ObPersistentLobApator::init_meta_column_ids(ObSEArray<uint64_t, 6> &meta_column_ids)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
for (uint32_t i = 0; OB_SUCC(ret) && i < ObLobMetaUtil::LOB_META_COLUMN_CNT; i++) {
|
||||
if (OB_FAIL(meta_column_ids.push_back(OB_APP_MIN_COLUMN_ID + i))) {
|
||||
LOG_WARN("push col id fail", K(ret), K(i));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObPersistentLobApator::get_meta_table_param(const ObTableParam *&table_param)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (! ATOMIC_LOAD(&table_param_inited_)) {
|
||||
ObLockGuard<ObSpinLock> guard(lock_);
|
||||
if (! ATOMIC_LOAD(&table_param_inited_)) {
|
||||
if (OB_FAIL(init_table_param())) {
|
||||
LOG_ERROR("init_table_param fail", KR(ret));
|
||||
} else {
|
||||
LOG_INFO("init_table_param success", KR(ret));
|
||||
}
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
table_param = ATOMIC_LOAD(&meta_table_param_);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
int ObPersistentLobApator::get_meta_table_dml_param(const ObTableDMLParam *&table_param)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (! ATOMIC_LOAD(&table_param_inited_)) {
|
||||
ObLockGuard<ObSpinLock> guard(lock_);
|
||||
if (! ATOMIC_LOAD(&table_param_inited_)) {
|
||||
if (OB_FAIL(init_table_param())) {
|
||||
LOG_ERROR("init_table_param fail", KR(ret));
|
||||
} else {
|
||||
LOG_INFO("init_table_param success", KR(ret));
|
||||
}
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
table_param = ATOMIC_LOAD(&meta_table_dml_param_);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObPersistentLobApator::init_table_param()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObArenaAllocator tmp_allocator("TmpLobPersist", OB_MALLOC_NORMAL_BLOCK_SIZE, tenant_id_);
|
||||
ObSEArray<uint64_t, 6> meta_column_ids;
|
||||
HEAP_VAR(ObTableSchema, meta_schema, &tmp_allocator) {
|
||||
if (ATOMIC_LOAD(&table_param_inited_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_ERROR("init again", KR(ret), K(table_param_inited_));
|
||||
} else if (nullptr != meta_table_param_ || nullptr != meta_table_dml_param_) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_ERROR("init again", KR(ret), KP(meta_table_param_), KP(meta_table_dml_param_));
|
||||
} else if (OB_FAIL(share::ObInnerTableSchema::all_column_aux_lob_meta_schema(meta_schema))) {
|
||||
LOG_ERROR("get lob meta schema fail", KR(ret));
|
||||
} else if (OB_FAIL(init_meta_column_ids(meta_column_ids))) {
|
||||
LOG_ERROR("init_meta_column_ids fail", KR(ret));
|
||||
} else if (OB_FALSE_IT(ATOMIC_STORE(&meta_table_param_, OB_NEWx(ObTableParam, &allocator_, allocator_)))) {
|
||||
} else if (OB_ISNULL(meta_table_param_)) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
LOG_ERROR("alloc meta_table_param fail", KR(ret), "size", sizeof(ObTableParam));
|
||||
} else if (OB_FAIL(meta_table_param_->convert(meta_schema, meta_column_ids, ObStoragePushdownFlag()))) {
|
||||
LOG_ERROR("Fail to convert table param", KR(ret), K(meta_schema));
|
||||
} else if (OB_FALSE_IT(ATOMIC_STORE(&meta_table_dml_param_, OB_NEWx(ObTableDMLParam, &allocator_, allocator_)))) {
|
||||
} else if (OB_ISNULL(meta_table_dml_param_)) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
LOG_ERROR("alloc meta_table_param fail", KR(ret), "size", sizeof(ObTableDMLParam));
|
||||
} else if (OB_FAIL(meta_table_dml_param_->convert(&meta_schema, meta_schema.get_schema_version(), meta_column_ids))) {
|
||||
LOG_ERROR("Fail to convert table param", KR(ret), K(meta_schema));
|
||||
} else {
|
||||
ATOMIC_STORE(&table_param_inited_, true);
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObPersistentLobApator::prepare_table_param(
|
||||
const ObLobAccessParam ¶m,
|
||||
ObTableScanParam &scan_param,
|
||||
bool is_meta)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (is_meta) {
|
||||
if (OB_UNLIKELY(scan_param.table_param_ != NULL)) {
|
||||
} else if (OB_FAIL(get_meta_table_param(scan_param.table_param_))) {
|
||||
LOG_WARN("get_meta_table_param fail", KR(ret));
|
||||
}
|
||||
} else if (OB_FAIL(prepare_piece_table_param(param, scan_param))) {
|
||||
LOG_WARN("prepare_piece_table_param fail", K(ret));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObPersistentLobApator::prepare_piece_table_param(
|
||||
const ObLobAccessParam ¶m,
|
||||
ObTableScanParam &scan_param)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
void *buf = NULL;
|
||||
@ -42,9 +164,7 @@ int ObPersistentLobApator::prepare_table_param(
|
||||
// FIXME: use convert with ObStorageSchema intead of hard-code schema
|
||||
if (OB_UNLIKELY(scan_param.table_param_ != NULL)) {
|
||||
//do nothing
|
||||
} else if (is_meta && OB_FAIL(share::ObInnerTableSchema::all_column_aux_lob_meta_schema(table_schema))) {
|
||||
LOG_WARN("get lob meta schema failed", K(ret));
|
||||
} else if (!is_meta && OB_FAIL(share::ObInnerTableSchema::all_column_aux_lob_piece_schema(table_schema))) {
|
||||
} else if (OB_FAIL(share::ObInnerTableSchema::all_column_aux_lob_piece_schema(table_schema))) {
|
||||
LOG_WARN("get lob piece schema failed", K(ret));
|
||||
} else {
|
||||
// table_schema.set_tablet_id();
|
||||
@ -330,8 +450,8 @@ int ObPersistentLobApator::build_lob_meta_table_dml(
|
||||
LOG_WARN("invalid seq no from param.", K(ret), K(param));
|
||||
}
|
||||
|
||||
HEAP_VAR(ObTableSchema, tbl_schema, param.allocator_) {
|
||||
ObTableSchema* table_schema = param.meta_table_schema_;
|
||||
{
|
||||
const ObTableSchema* table_schema = param.meta_table_schema_;
|
||||
|
||||
for (int i = 0; OB_SUCC(ret) && i < ObLobMetaUtil::LOB_META_COLUMN_CNT; ++i) {
|
||||
if (OB_FAIL(column_ids.push_back(OB_APP_MIN_COLUMN_ID + i))) {
|
||||
@ -341,12 +461,7 @@ int ObPersistentLobApator::build_lob_meta_table_dml(
|
||||
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (table_schema == nullptr) {
|
||||
table_schema = &tbl_schema;
|
||||
if (OB_FAIL(get_lob_tablet_schema(tenant_id, true, *table_schema, dml_base_param.tenant_schema_version_))) {
|
||||
LOG_WARN("failed get lob tablet schema.", K(ret));
|
||||
} else {
|
||||
dml_base_param.schema_version_ = lob_meta_tablet.get_obj()->get_tablet_meta().max_sync_storage_schema_version_;
|
||||
}
|
||||
dml_base_param.schema_version_ = lob_meta_tablet.get_obj()->get_tablet_meta().max_sync_storage_schema_version_;
|
||||
} else {
|
||||
/**
|
||||
* for test current
|
||||
@ -356,10 +471,8 @@ int ObPersistentLobApator::build_lob_meta_table_dml(
|
||||
}
|
||||
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (OB_FAIL(dml_param->convert(table_schema, dml_base_param.tenant_schema_version_, column_ids))) {
|
||||
LOG_WARN("failed to convert dml param.", K(ret));
|
||||
} else {
|
||||
dml_base_param.table_param_ = dml_param;
|
||||
} else if (OB_FAIL(get_meta_table_dml_param(dml_base_param.table_param_))) {
|
||||
LOG_WARN("get_meta_table_dml_param fail", KR(ret));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
|
@ -46,7 +46,20 @@ private:
|
||||
class ObPersistentLobApator : public ObILobApator
|
||||
{
|
||||
public:
|
||||
ObPersistentLobApator() {}
|
||||
explicit ObPersistentLobApator(const uint64_t tenant_id):
|
||||
tenant_id_(tenant_id),
|
||||
allocator_(lib::ObMemAttr(tenant_id, "LobPersist", ObCtxIds::LOB_CTX_ID)),
|
||||
table_param_inited_(false),
|
||||
meta_table_param_(nullptr),
|
||||
meta_table_dml_param_(nullptr)
|
||||
{}
|
||||
|
||||
virtual ~ObPersistentLobApator();
|
||||
|
||||
virtual void destroy();
|
||||
|
||||
int get_meta_table_param(const ObTableParam *&table_param);
|
||||
int get_meta_table_dml_param(const ObTableDMLParam *&table_param);
|
||||
virtual int scan_lob_meta(ObLobAccessParam ¶m,
|
||||
ObTableScanParam &scan_param,
|
||||
common::ObNewRowIterator *&meta_iter) override;
|
||||
@ -139,6 +152,22 @@ private:
|
||||
ObNewRow& new_row,
|
||||
common::ObSingleRowIteratorWrapper* new_row_iter,
|
||||
ObLobPieceInfo& in_row);
|
||||
|
||||
int init_table_param();
|
||||
int init_meta_column_ids(ObSEArray<uint64_t, 6> &meta_column_ids);
|
||||
int prepare_piece_table_param(
|
||||
const ObLobAccessParam ¶m,
|
||||
ObTableScanParam &scan_param);
|
||||
|
||||
private:
|
||||
|
||||
const uint64_t tenant_id_;
|
||||
ObArenaAllocator allocator_;
|
||||
mutable ObSpinLock lock_;
|
||||
bool table_param_inited_;
|
||||
ObTableParam *meta_table_param_;
|
||||
ObTableDMLParam *meta_table_dml_param_;
|
||||
|
||||
private:
|
||||
static const uint64_t LOB_EXPIRE_TIME_US = 3 * 1000 * 1000; // 3s
|
||||
};
|
||||
|
@ -40,7 +40,9 @@ private:
|
||||
class ObLobPieceManager
|
||||
{
|
||||
public:
|
||||
ObLobPieceManager() {}
|
||||
explicit ObLobPieceManager(const uint64_t tenant_id):
|
||||
persistent_lob_adapter_(tenant_id)
|
||||
{}
|
||||
~ObLobPieceManager() {}
|
||||
int get(ObLobAccessParam& param, uint64_t piece_id, ObLobPieceInfo& info);
|
||||
int write(ObLobAccessParam& param, ObLobPieceInfo& in_row);
|
||||
|
Loading…
x
Reference in New Issue
Block a user