Fix invalid rowid rowkey inside rowid

This commit is contained in:
Hongqin-Li
2023-10-16 03:39:21 +00:00
committed by ob-robot
parent 62e21b0cea
commit 8082123b1e
2 changed files with 25 additions and 16 deletions

View File

@ -12,6 +12,7 @@
#include "ob_urowid.h"
#include "common/ob_smart_call.h"
#include "common/ob_tablet_id.h"
#include "common/object/ob_object.h"
#include "lib/encode/ob_base64_encode.h"
@ -447,10 +448,10 @@ int ObURowIDData::decode2urowid(const char* input, const int64_t input_len,
urowid_data.rowid_content_ = decoded_buf;
urowid_data.rowid_len_ = pos;
// check validity after decoding
if (!urowid_data.is_valid_urowid()) {
if (OB_FAIL(urowid_data.check_is_valid_urowid())) {
urowid_data.rowid_content_ = NULL;
urowid_data.rowid_len_ = 0;
ret = OB_INVALID_ROWID;
COMMON_LOG(WARN, "invalid rowid", K(ret));
} else {
// do nothing
}
@ -487,35 +488,44 @@ int ObURowIDData::build_invalid_rowid_obj(ObIAllocator &alloc, ObObj *&rowid_obj
return ret;
}
bool ObURowIDData::is_valid_urowid() const
int ObURowIDData::check_is_valid_urowid() const
{
bool is_valid = true;
int ret = OB_SUCCESS;
const uint8_t version = get_version();
if (HEAP_TABLE_ROWID_VERSION == version) {
is_valid = rowid_len_ == HEAP_ORGANIZED_TABLE_ROWID_CONTENT_BUF_SIZE;
if (rowid_len_ != HEAP_ORGANIZED_TABLE_ROWID_CONTENT_BUF_SIZE) {
ret = OB_INVALID_ROWID;
}
} else if (EXT_HEAP_TABLE_ROWID_VERSION == version) {
is_valid = rowid_len_ == EXT_HEAP_ORGANIZED_TABLE_ROWID_CONTENT_BUF_SIZE;
if (rowid_len_ != EXT_HEAP_ORGANIZED_TABLE_ROWID_CONTENT_BUF_SIZE) {
ret = OB_INVALID_ROWID;
}
} else if (NO_PK_ROWID_VERSION == version
|| PK_ROWID_VERSION == version
|| LOB_NO_PK_ROWID_VERSION == version
|| EXTERNAL_TABLE_ROWID_VERSION == version) {
int64_t pos = get_pk_content_offset();
ObObj obj;
for (; is_valid && pos < rowid_len_; ) {
for (; OB_SUCC(ret) && pos < rowid_len_; ) {
ObObjType obj_type = get_pk_type(pos);
if (OB_UNLIKELY(ob_is_invalid_obj_type(obj_type))) {
is_valid = false;
ret = OB_INVALID_ROWID;
COMMON_LOG(WARN, "invalid obj type in rowid", K(ret), K(obj_type), K(pos), KPC(this));
} else if (OB_FAIL(get_pk_value(obj_type, pos, obj))) {
is_valid = false;
} else {
// do nothing
if (OB_NOT_SUPPORTED == ret) {
ret = OB_INVALID_ROWID; // oracle doesn't have such kind of rowkey
}
COMMON_LOG(WARN, "failed to get pk value", K(ret), K(obj_type), K(pos), KPC(this));
} else if (obj.is_urowid()) {
if (OB_FAIL(SMART_CALL(obj.get_urowid().check_is_valid_urowid()))) {
COMMON_LOG(WARN, "failed to check is valid urowid", K(ret), K(pos));
}
}
}
} else {
is_valid = false;
ret = OB_INVALID_ROWID;
}
return is_valid;
return ret;
}
bool ObURowIDData::is_valid_version(int64_t v)
@ -1028,8 +1038,7 @@ int ObURowIDData::get_pk_vals(ObIArray<ObObj> &pk_vals) const
{
int ret = OB_SUCCESS;
int64_t pos = 0;
if (OB_UNLIKELY(!is_valid_urowid())) {
ret = OB_INVALID_ROWID;
if (OB_FAIL(check_is_valid_urowid())) {
COMMON_LOG(WARN, "invalid rowid", K(ret));
} else if (is_physical_rowid()) {
if (OB_FAIL(get_rowkey_for_heap_organized_table(pk_vals))) {

View File

@ -216,7 +216,7 @@ private:
static bool is_valid_version(int64_t v);
bool is_valid_urowid() const;
int check_is_valid_urowid() const;
static int set_heap_organized_table_rowid_content(const ObIArray<ObObj> &args,
uint8_t *buf,