[BUG.FIX] fix ddl kv losed in table store

This commit is contained in:
Tyshawn 2023-06-30 02:42:08 +00:00 committed by ob-robot
parent deda754b81
commit 96d2098cdf
4 changed files with 150 additions and 4 deletions

View File

@ -853,6 +853,85 @@ int64_t ObMemtableArray::to_string(char *buf, const int64_t buf_len) const
return pos;
}
int ObDDLKVArray::init(
ObArenaAllocator &allocator,
common::ObIArray<ObITable *> &ddl_kvs)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(is_inited_)) {
ret = OB_INIT_TWICE;
LOG_WARN("initialize twice", K(ret), KPC(this));
} else {
count_ = 0;
ddl_kvs_ = nullptr;
if (0 != ddl_kvs.count()) {
const int64_t size = sizeof(ObITable *) * ddl_kvs.count();
if (OB_ISNULL(ddl_kvs_ = static_cast<ObITable **>(allocator.alloc(size)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("fail to allocate ddl kv pointer arrays", K(ret), K(size));
} else {
for (int64_t i = 0; OB_SUCC(ret) && i < ddl_kvs.count(); ++i) {
ObITable *table = ddl_kvs.at(i);
if (OB_UNLIKELY(nullptr == table || !table->is_ddl_sstable())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("table must be ddl kv", K(ret), K(i), KPC(table));
} else {
ddl_kvs_[count_] = table;
++count_;
}
}
}
}
if (OB_SUCC(ret)) {
is_inited_ = true;
}
}
if (OB_UNLIKELY(!is_inited_)) {
reset();
}
return ret;
}
int ObDDLKVArray::deep_copy(
char *dst_buf,
const int64_t buf_size,
int64_t &pos,
ObDDLKVArray &dst) const
{
int ret = OB_SUCCESS;
dst.reset();
const int64_t deep_copy_size = get_deep_copy_size();
if (OB_ISNULL(dst_buf) || OB_UNLIKELY(buf_size - pos < deep_copy_size)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("finvalid argument", K(ret), KP(dst_buf), K(buf_size), K(pos), K(deep_copy_size), K(count_));
} else {
dst.ddl_kvs_ = 0 == count_ ? nullptr : reinterpret_cast<ObITable **>(dst_buf + pos);
const int64_t array_size = count_ * sizeof(ObITable *);
pos += array_size;
for (int64_t i = 0; i < count_; ++i) {
dst.ddl_kvs_[i] = ddl_kvs_[i];
}
dst.count_ = count_;
dst.is_inited_ = is_inited_;
}
return ret;
}
int64_t ObDDLKVArray::to_string(char *buf, const int64_t buf_len) const
{
int64_t pos = 0;
if (OB_ISNULL(buf) || buf_len <= 0) {
// do nothing
} else {
J_OBJ_START();
J_NAME("ObDDLKVArray");
J_KV(KP(this),
K_(count),
"ddl_kv_ptr_array", ObArrayWrap<ObITable *>(ddl_kvs_, count_));
J_OBJ_END();
}
return pos;
}
/* ObTableStoreUtil Section */
bool ObTableStoreUtil::ObITableLogTsRangeCompare::operator()(
const ObITable *ltable, const ObITable *rtable) const

View File

@ -122,6 +122,45 @@ private:
DISALLOW_COPY_AND_ASSIGN(ObMemtableArray);
};
class ObDDLKVArray final
{
public:
static const int64_t DDL_KV_ARRAY_SIZE = 64;
public:
ObDDLKVArray() : is_inited_(false), ddl_kvs_(nullptr), count_(0) {}
~ObDDLKVArray() { reset(); }
OB_INLINE ObITable *operator[](const int64_t pos) const
{
ObITable *ddl_kv = nullptr;
if (OB_UNLIKELY(!is_valid() || pos < 0 || pos >= count_)) {
ddl_kv = nullptr;
} else {
ddl_kv = ddl_kvs_[pos];
}
return ddl_kv;
}
OB_INLINE void reset()
{
is_inited_ = false;
ddl_kvs_ = nullptr;
count_ = 0;
}
OB_INLINE bool count() const { return count_; }
OB_INLINE bool empty() const { return 0 == count_; }
OB_INLINE bool is_valid() const { return 1 == count_ || (is_inited_ && count_ > 1 && nullptr != ddl_kvs_); }
OB_INLINE int64_t get_deep_copy_size() const { return count_ * sizeof(ObITable *); }
int init(ObArenaAllocator &allocator, common::ObIArray<ObITable *> &ddl_kvs);
int deep_copy(char *buf, const int64_t buf_size, int64_t &pos, ObDDLKVArray &dst) const;
int64_t to_string(char *buf, const int64_t buf_len) const;
private:
bool is_inited_;
ObITable **ddl_kvs_;
int64_t count_;
private:
DISALLOW_COPY_AND_ASSIGN(ObDDLKVArray);
};
struct ObTableStoreUtil
{
struct ObITableLogTsRangeCompare {

View File

@ -2541,7 +2541,7 @@ int64_t ObPrintTableStore::to_string(char *buf, const int64_t buf_len) const
print_arr(major_tables_, "MAJOR", buf, buf_len, pos, is_print);
print_arr(minor_tables_, "MINOR", buf, buf_len, pos, is_print);
print_arr(ddl_sstables_, "DDL_DUMP", buf, buf_len, pos, is_print);
print_arr(ddl_mem_sstables_, "DDL_MEM", buf, buf_len, pos, is_print);
print_ddl_mem(ddl_mem_sstables_, "DDL_MEM", buf, buf_len, pos, is_print);
print_mem(memtables_, "MEM", buf, buf_len, pos, is_print);
print_arr(meta_major_tables_, "META_MAJOR", buf, buf_len, pos, is_print);
} else {
@ -2574,6 +2574,27 @@ void ObPrintTableStore::print_mem(
is_print = true;
}
}
void ObPrintTableStore::print_ddl_mem(
const ObDDLKVArray &tables,
const char* table_arr,
char *buf,
const int64_t buf_len,
int64_t &pos,
bool &is_print) const
{
for (int64_t i = 0; i < tables.count(); ++i) {
if (is_print && 0 == i) {
J_NEWLINE();
}
table_to_string(tables[i], i == 0 ? table_arr : " ", buf, buf_len, pos);
if (i < tables.count() - 1) {
J_NEWLINE();
}
}
if (tables.count() > 0) {
is_print = true;
}
}
void ObPrintTableStore::print_arr(
const ObSSTableArray &tables,

View File

@ -88,7 +88,6 @@ public:
OB_INLINE const storage::ObSSTableArray &get_major_sstables() const { return major_tables_; }
OB_INLINE const storage::ObSSTableArray &get_minor_sstables() const { return minor_tables_; }
OB_INLINE const storage::ObSSTableArray &get_ddl_sstables() const { return ddl_sstables_; }
OB_INLINE const storage::ObSSTableArray &get_ddl_memtables() const { return ddl_mem_sstables_; }
OB_INLINE const storage::ObSSTableArray &get_meta_major_sstables() const { return meta_major_tables_; }
OB_INLINE blocksstable::ObSSTable *get_meta_major_sstable() const
{
@ -323,7 +322,7 @@ private:
ObSSTableArray ddl_sstables_;
ObSSTableArray meta_major_tables_;
ObMemtableArray memtables_;
ObSSTableArray ddl_mem_sstables_;
ObDDLKVArray ddl_mem_sstables_;
bool is_ready_for_read_;
bool is_inited_;
@ -362,11 +361,19 @@ private:
int64_t &pos,
bool &is_print) const;
void print_ddl_mem(
const ObDDLKVArray &tables,
const char* table_arr,
char *buf,
const int64_t buf_len,
int64_t &pos,
bool &is_print) const;
private:
const ObSSTableArray &major_tables_;
const ObSSTableArray &minor_tables_;
const ObMemtableArray &memtables_;
const ObSSTableArray &ddl_mem_sstables_;
const ObDDLKVArray &ddl_mem_sstables_;
const ObSSTableArray &ddl_sstables_;
const ObSSTableArray &meta_major_tables_;
const bool is_ready_for_read_;